From a8e3ae2ae7a2091ad608bc2e9fd289a395ba7f3f Mon Sep 17 00:00:00 2001 From: Taeyeon Mori Date: Thu, 16 Sep 2021 21:41:17 +0200 Subject: [PATCH] python/steamutil,vdfparser: Fix mypy issues --- lib/python/propex.py | 116 +++++++++++++++++++++++++++ lib/python/steamutil.py | 173 ++++++++++++++++------------------------ lib/python/vdfparser.py | 45 ++++++++++- 3 files changed, 225 insertions(+), 109 deletions(-) create mode 100644 lib/python/propex.py diff --git a/lib/python/propex.py b/lib/python/propex.py new file mode 100644 index 0000000..73afc68 --- /dev/null +++ b/lib/python/propex.py @@ -0,0 +1,116 @@ +# Custom property-like classes +from typing import Dict, Literal, MutableMapping, NewType, Optional, Sequence, Generic, TypeVar, Callable, Type, Any, Mapping, overload, Union + + +T = TypeVar('T') +O = TypeVar('O') + + +class CustomProperty(property, Generic[T]): + """ Subclass property for IDE support. + Subclasses need not call super().__init__(), + but might want to initialize self.property_name if such information is available """ + + property_name: str = "<>" + + def __init__(self): + # Overwrite property constructor + pass + + def __set_name__(self, owner: Type[O], name: str): + """ Set name given in class body. + May not be called if assigned outside class definition """ + self.property_name = name + + @overload # type: ignore + def __get__(self, obj: None, cls: Type[O]) -> 'CustomProperty[T]': ... + @overload + def __get__(self, obj: O, cls: Type[O]) -> T: ... + + def __get__(self, obj: Optional[O], cls: Type[O]): + if obj is None: + return self + raise AttributeError(f"Cannot read property {self.property_name} of {obj:r}") + + def __set__(self, obj: O, value: T): + raise AttributeError(f"Cannot write property {self.property_name} of {obj:r}") + + def __delete__(self, obj: O): + raise AttributeError(f"Cannot delete property {self.property_name} of {obj:r}") + + +class CachedProperty(CustomProperty[T]): + """ A property that is only computed once per instance and then replaces + itself with an ordinary attribute. Deleting the attribute resets the + property. + + Source: https://github.com/bottlepy/bottle/commit/fa7733e075da0d790d809aa3d2f53071897e6f76 + """ + + def __init__(self, func: Callable[[O], T]): + self.__doc__ = getattr(func, '__doc__') + self.func = func + self.property_name = func.__name__ + + def __get__(self, obj: Optional[O], cls: Type[O]): # type: ignore + if obj is None: + return self + value = obj.__dict__[self.property_name] = self.func(obj) + return value + + +class SettableCachedProperty(CachedProperty[T]): + def __set__(self, obj: O, value: T): + obj.__dict__[self.property_name] = value + + +class DictPathRoProperty(CustomProperty[T]): + _NoDefault = NewType("_NoDefault", object) + _nodefault = _NoDefault(object()) + + def __init__(self, source_member: str, path: Sequence[str], + default: Union[T, _NoDefault]=_nodefault, type: Callable[[Any], T]=lambda x: x): + self.source_member = source_member + self.path = path + self.default = default + self.type = type + + def _get_parent(self, obj: O, *, create=False) -> MutableMapping[str, Any]: + d: MutableMapping[str, Any] = getattr(obj, self.source_member) + for pc in self.path[:-1]: + try: + d = d[pc] + except KeyError: + if not create: + raise + nd: MutableMapping[str, Any] = {} + d[pc] = nd + d = nd + return d + + def __get__(self, obj: Optional[O], cls: Type[O]): # type: ignore + if obj is None: + return self + try: + val = self._get_parent(obj)[self.path[-1]] + except KeyError: + if self.default is not self._nodefault: + return self.default + raise + else: + return self.type(val) + + +class DictPathProperty(DictPathRoProperty[T]): + def __init__(self, *args, allow_create_parents=True, **kwds): + super().__init__(*args, **kwds) + self.allow_create_parents = allow_create_parents + + def __set__(self, obj, value): + self._get_parent(obj, create=self.allow_create_parents)[self.path[-1]] = value + + def __delete__(self, obj): + del self._get_parent(obj)[self.path[-1]] + + +__all__ = ['CustomProperty', 'CachedProperty', 'SettableCachedProperty', 'DictPathRoProperty', 'DictPathProperty'] diff --git a/lib/python/steamutil.py b/lib/python/steamutil.py index fd4916c..ae83d0b 100644 --- a/lib/python/steamutil.py +++ b/lib/python/steamutil.py @@ -5,78 +5,14 @@ import sys, os import re, fnmatch, datetime from pathlib import Path -from typing import List, Iterable, Dict, Tuple, Callable, Optional, Union +from typing import List, Iterable, Dict, Literal, Mapping, Tuple, Callable, Optional, Union, Any, cast, overload -from vdfparser import VdfParser, DeepDict, AppInfoFile, LowerCaseNormalizingDict - - -class CachedProperty: - """ A property that is only computed once per instance and then replaces - itself with an ordinary attribute. Deleting the attribute resets the - property. - - Source: https://github.com/bottlepy/bottle/commit/fa7733e075da0d790d809aa3d2f53071897e6f76 - """ - - def __init__(self, func): - self.__doc__ = getattr(func, '__doc__') - self.func = func - - def __get__(self, obj, cls): - if obj is None: - return self - value = obj.__dict__[self.func.__name__] = self.func(obj) - return value - - -class DictPathRoProperty: - __slots__ = "property", "path", "default", "type" - _nodefault = object() - _id = lambda x: x - - def __init__(self, property: str, path: Tuple[str], default=_nodefault, type=_id): - self.property = property - self.path = path - self.default = default - self.type = type - - def __get__(self, obj, cls): - if obj is None: - return self - d = getattr(obj, self.property) - try: - for pc in self.path: - d = d[pc] - except KeyError: - if self.default is not self._nodefault: - return self.default - raise - else: - return self.type(d) - - -class DictPathProperty(DictPathRoProperty): - __slots__ = () - - def _get_create_parent(self, obj) -> Dict: - d = getattr(obj, self.property) - for pc in self.path[:-1]: - try: - d = d[pc] - except KeyError: - nd = {} - d[pc] = nd - d = nd - return d - - def __set__(self, obj, value): - self._get_create_parent(obj)[self.path[-1]] = value - - def __delete__(self, obj): - del self._get_create_parent(obj)[self.path[-1]] +from vdfparser import VdfParser, DeepDict, AppInfoFile, LowerCaseNormalizingDict, dd_getpath +from propex import CachedProperty, DictPathProperty, DictPathRoProperty _vdf = VdfParser() +_vdf_ci = VdfParser(factory=LowerCaseNormalizingDict) class MalformedManifestError(Exception): @@ -110,11 +46,11 @@ class AppInfo: def launch_configs(self): return self.appinfo["appinfo"]["config"]["launch"].values() - name = DictPathRoProperty("appinfo", ("appinfo", "common", "name"), default=None) - oslist = DictPathRoProperty("appinfo", ("appinfo", "common", "oslist"), type=lambda s: s.split(",")) - install_dir = DictPathRoProperty("appinfo", ("appinfo", "config", "installdir"), default=None) - languages = DictPathRoProperty("appinfo", ("appinfo", "common", "supported_languages")) - gameid = DictPathRoProperty("appinfo", ("appinfo", "common", "gameid"), type=int) + name = DictPathRoProperty[Optional[str]]("appinfo", ("appinfo", "common", "name"), default=None) + oslist = DictPathRoProperty[List[str]] ("appinfo", ("appinfo", "common", "oslist"), type=lambda s: s.split(",")) + install_dir = DictPathRoProperty[Optional[str]]("appinfo", ("appinfo", "config", "installdir"), default=None) + languages = DictPathRoProperty[Any] ("appinfo", ("appinfo", "common", "supported_languages")) + gameid = DictPathRoProperty[int] ("appinfo", ("appinfo", "common", "gameid"), type=int) # Misc. def get_userdata_path(self, user_id: Union[int, 'LoginUser']) -> Path: @@ -167,7 +103,7 @@ class App(AppInfo): if "AppState" not in self.manifest: raise MalformedManifestError("App manifest doesn't have AppState key", self.manifest_path) - super().__init__(libfolder.steam, int(self.manifest["AppState"]["appid"])) + super().__init__(libfolder.steam, int(dd_getpath(self.manifest, ("AppState", "appid"), t=str))) installed = True @@ -175,9 +111,9 @@ class App(AppInfo): return "" % (self.appid, self.name, self.install_path) # Basic info - name = DictPathRoProperty("manifest", ("AppState", "name")) - language = DictPathRoProperty("manifest", ("AppState", "UserConfig", "language"), None) - install_dir = DictPathRoProperty("manifest", ("AppState", "installdir")) + name = DictPathRoProperty[Optional[str]]("manifest", ("AppState", "name"), None) + language = DictPathRoProperty[Optional[str]]("manifest", ("AppState", "UserConfig", "language"), None) + install_dir = DictPathRoProperty[Optional[str]]("manifest", ("AppState", "installdir"), None) @CachedProperty def install_path(self) -> Path: @@ -191,27 +127,30 @@ class App(AppInfo): # Steam Play info @property - def is_steam_play(self): - uc = self.manifest["AppState"].get("UserConfig") - if uc and "platform_override_source" in uc: - return uc["platform_override_source"] + def is_steam_play(self) -> Optional[str]: + return dd_getpath(self.manifest, ("AppState", "UserConfig", "platform_override_source"), None, t=str) @property - def is_proton_app(self): - uc = self.manifest["AppState"].get("UserConfig") + def is_proton_app(self) -> Optional[bool]: + uc = dd_getpath(self.manifest, ("AppState", "UserConfig"), None, t=dict) if uc and "platform_override_source" in uc: return uc["platform_override_source"] == "windows" and uc["platform_override_dest"] == "linux" + return None @CachedProperty def compat_path(self) -> Path: return self.steamapps_path / "compatdata" / str(self.appid) + + @CachedProperty + def compat_prefix(self) -> Path: + return self.compat_path / "pfx" @CachedProperty def compat_drive(self) -> Path: - return self.compat_path / "pfx" / "drive_c" + return self.compat_prefix / "drive_c" # Install size - declared_install_size = DictPathRoProperty("manifest", ("AppState", "SizeOnDisk"), 0, type=int) + declared_install_size = DictPathRoProperty[int]("manifest", ("AppState", "SizeOnDisk"), 0, type=int) def compute_install_size(self) -> int: def sum_size(p: Path): @@ -275,10 +214,11 @@ class LibraryFolder: manifest = self.steamapps_path / ("appmanifest_%d.acf" % appid) if manifest.exists(): return App(self, manifest) + return None def find_apps_re(self, regexp: str) -> Iterable[App]: reg = re.compile(r'"name"\s+".*%s.*"' % regexp, re.IGNORECASE) - for manifest in self.appmanifests: #pylint:disable=not-an-iterable + for manifest in self.appmanifests: with open(manifest, encoding="utf-8") as f: content = f.read() if reg.search(content): @@ -320,7 +260,7 @@ class UserAppConfig: t = int(self._data.get("Playtime2wks", "0")) return datetime.time(t // 60, t % 60) - launch_options = DictPathProperty("_data", ("LaunchOptions",), None) + launch_options = DictPathProperty[Optional[DeepDict]]("_data", ("LaunchOptions",), None) class LoginUser: @@ -341,8 +281,8 @@ class LoginUser: """ 32-bit account ID """ return self.id & 0xffffffff - account_name = DictPathRoProperty("info", ("AccountName",)) - username = DictPathRoProperty("info", ("PersonaName",)) + account_name = DictPathRoProperty[str]("info", ("AccountName",)) + username = DictPathRoProperty[str]("info", ("PersonaName",)) @CachedProperty def userdata_path(self) -> Path: @@ -410,6 +350,7 @@ class Steam: path /= "Steam" if path.exists(): return path + return None # Various paths @property @@ -433,7 +374,7 @@ class Steam: vdf_ci = VdfParser(factory=LowerCaseNormalizingDict) with open(self.loginusers_vdf, encoding="utf-8") as f: data = vdf_ci.parse(f) - for id, info in data["users"].items(): + for id, info in cast(Mapping[str, Dict], data["users"]).items(): if info["mostrecent"] == "1": return LoginUser(self, int(id), info) except KeyError: @@ -451,9 +392,9 @@ class Steam: with open(self.config_vdf, encoding="utf-8") as f: return _vdf.parse(f) - config_install_store = DictPathProperty("config", ("InstallConfigStore",)) - config_software_steam = DictPathProperty("config", ("InstallConfigStore", "Software", "Valve", "Steam")) - compat_tool_mapping = DictPathProperty("config_software_steam", ("CompatToolMapping",)) + config_install_store = DictPathProperty[Dict]("config", ("InstallConfigStore",)) + config_software_steam = DictPathProperty[Dict]("config", ("InstallConfigStore", "Software", "Valve", "Steam")) + compat_tool_mapping = DictPathProperty[Dict]("config_software_steam", ("CompatToolMapping",)) # AppInfo cache @CachedProperty @@ -470,7 +411,7 @@ class Steam: return info[891390]["appinfo"] @CachedProperty - def compat_tools(self) -> {str:{}}: + def compat_tools(self) -> Dict[str, Dict]: tools = {} # Find official proton installs valve = self.steamplay_manifest["extended"]["compat_tools"] @@ -492,7 +433,7 @@ class Steam: for mfst_path in manifests: with open(mfst_path, encoding="utf-8") as f: mfst = _vdf.parse(f) - for name, t in mfst["compatibilitytools"]["compat_tools"].items(): + for name, t in dd_getpath(mfst, ("compatibilitytools", "compat_tools"), t=dict).items(): # TODO warn duplicate name t["install_path"] = mfst_path.parent / t["install_path"] tools[name] = t @@ -502,19 +443,34 @@ class Steam: @CachedProperty def library_folder_paths(self) -> List[Path]: with open(self.libraryfolders_vdf, encoding="utf-8") as f: - return [Path(v) for k,v in _vdf.parse(f)["LibraryFolders"].items() if k.isdigit()] + data = _vdf_ci.parse(f) + def gen(): + for k, v in dd_getpath(data, ("LibraryFolders",), t=dict).items(): + if k.isdigit(): + if isinstance(v, str): + yield Path(v) + elif 'path' in v: + yield Path(v['path']) + else: + raise ValueError("Unknown format of libraryfolders.vdf") + return list(gen()) @CachedProperty def library_folders(self) -> List[LibraryFolder]: - return [LibraryFolder(self, self.root)] + [LibraryFolder(self, p) for p in self.library_folder_paths] #pylint:disable=not-an-iterable + return [LibraryFolder(self, self.root)] + [LibraryFolder(self, p) for p in self.library_folder_paths] @property def apps(self) -> Iterable[App]: - for lf in self.library_folders: #pylint:disable=not-an-iterable + for lf in self.library_folders: yield from lf.apps - - def get_app(self, id: int, installed=True) -> Optional[App]: - for lf in self.library_folders: #pylint:disable=not-an-iterable + + @overload + def get_app(self, id: int, installed: Literal[True]=True) -> Optional[App]: ... + @overload + def get_app(self, id: int, installed: Literal[False]) -> Optional[AppInfo]: ... + + def get_app(self, id: int, installed=True) -> Optional[AppInfo]: + for lf in self.library_folders: app = lf.get_app(id) if app is not None: return app @@ -522,8 +478,14 @@ class Steam: for appid, appinfo in self.appinfo.items(): if appid == id: return AppInfo(self, appid, appinfo_data=appinfo) + return None + + @overload + def find_apps_re(self, regexp: str, installed: Literal[True]) -> Iterable[App]: ... + @overload + def find_apps_re(self, regexp: str, installed: Literal[False]) -> Iterable[AppInfo]: ... - def find_apps_re(self, regexp: str, installed=True) -> Iterable[App]: + def find_apps_re(self, regexp: str, installed=True) -> Iterable[AppInfo]: """ Find all apps by regular expression """ if not installed: # Search whole appinfo cache @@ -538,7 +500,7 @@ class Steam: broken_ids.add(appid) continue if reg.search(name): - for lf in self.library_folders: #pylint:disable=not-an-iterable + for lf in self.library_folders: app = lf.get_app(appid) if app: yield app @@ -556,8 +518,8 @@ class Steam: print("[SteamUtil] Warning: found broken entries in appinfo cache:", ",".join(map(str, broken_ids))) # Search local manifests directly reg = re.compile(r'"name"\s+".*%s.*"' % regexp, re.IGNORECASE) - for lf in self.library_folders: #pylint:disable=not-an-iterable - for manifest in lf.appmanifests: #pylint:disable=not-an-iterable + for lf in self.library_folders: + for manifest in lf.appmanifests: with open(manifest, encoding="utf-8") as f: content = f.read() if reg.search(content): @@ -569,3 +531,4 @@ class Steam: def find_app(self, pattern: str, installed=True) -> Optional[App]: for app in self.find_apps(pattern, installed=installed): return app + return None diff --git a/lib/python/vdfparser.py b/lib/python/vdfparser.py index 9e108cb..e5cdb07 100644 --- a/lib/python/vdfparser.py +++ b/lib/python/vdfparser.py @@ -7,14 +7,51 @@ from __future__ import unicode_literals import io import struct -import collections import datetime -from typing import Dict, Union, Mapping, Tuple +from typing import Any, Dict, Optional, Sequence, Type, TypeVar, Union, Mapping, Tuple, NewType, cast, overload -DeepDict = Mapping[str, Union[str, "DeepDict"]] +#### Nested dictionary support +# Mypy doesn't support recursive types :( +DeepDict = Mapping[str, Union[Mapping[str, Any], str]] +_NoDefault = NewType('_NoDefault', object) +_nodefault = _NoDefault(object()) +_DefaultT = TypeVar('_DefaultT', DeepDict, str, None) +_DDCastT = TypeVar('_DDCastT', DeepDict, str) +@overload +def dd_getpath(dct: DeepDict, path: Sequence[str], default: _NoDefault=_nodefault, *, t: None=None) -> Union[DeepDict, str]: ... +@overload +def dd_getpath(dct: DeepDict, path: Sequence[str], default: _DefaultT, *, t: None=None) -> Union[DeepDict, str, _DefaultT]: ... +@overload +def dd_getpath(dct: DeepDict, path: Sequence[str], default: _NoDefault=_nodefault, *, t: Type[_DDCastT]) -> _DDCastT: ... +@overload +def dd_getpath(dct: DeepDict, path: Sequence[str], default: _DefaultT, *, t: Type[_DDCastT]) -> Union[_DDCastT, _DefaultT]: ... + +def dd_getpath(dct: DeepDict, path: Sequence[str], default: Union[_DefaultT, _NoDefault]=_nodefault, *, t: Optional[Type[_DDCastT]]=None): + """ + Retrieve a value from inside a nested dictionary. + @param dct The nested mapping + @param path The path to retrieve. Represented by a tuple of strings. + @param default A default value. Raises KeyError if omitted. + @param t Result type for built-in typing.cast(), specify 'str' or 'dict' + """ + d: Any = dct + try: + for pc in path: + d = d[pc] + # XXX: runtime type check + assert (t is None or isinstance(d, t)), f"Expected value at path {path} to be {t}, not {type(d)}" + return d + except KeyError: + if default is not _nodefault: + return default + raise + + +#### Case-Insensitive dictionary. +# Unfortunately, Valve seems to play it a little loose with casing in their .vdf files class LowerCaseNormalizingDict(dict): def __init__(self, *args, **kwds): super().__init__() @@ -268,7 +305,7 @@ class BinaryVdfParser: key, value = self._read_item(fd, t) map[key] = value - def _read_item(self, fd: io.BufferedIOBase, t: int) -> (str, DeepDict): + def _read_item(self, fd: io.BufferedIOBase, t: bytes) -> Tuple[str, Union[str, DeepDict]]: key = self._read_cstring(fd) if t == self.T_SKEY: