python/steamutil,vdfparser: Fix mypy issues

master
Taeyeon Mori 3 years ago
parent 2786987f57
commit a8e3ae2ae7
  1. 116
      lib/python/propex.py
  2. 171
      lib/python/steamutil.py
  3. 45
      lib/python/vdfparser.py

@ -0,0 +1,116 @@
# Custom property-like classes
from typing import Dict, Literal, MutableMapping, NewType, Optional, Sequence, Generic, TypeVar, Callable, Type, Any, Mapping, overload, Union
T = TypeVar('T')
O = TypeVar('O')
class CustomProperty(property, Generic[T]):
""" Subclass property for IDE support.
Subclasses need not call super().__init__(),
but might want to initialize self.property_name if such information is available """
property_name: str = "<<name unknown>>"
def __init__(self):
# Overwrite property constructor
pass
def __set_name__(self, owner: Type[O], name: str):
""" Set name given in class body.
May not be called if assigned outside class definition """
self.property_name = name
@overload # type: ignore
def __get__(self, obj: None, cls: Type[O]) -> 'CustomProperty[T]': ...
@overload
def __get__(self, obj: O, cls: Type[O]) -> T: ...
def __get__(self, obj: Optional[O], cls: Type[O]):
if obj is None:
return self
raise AttributeError(f"Cannot read property {self.property_name} of {obj:r}")
def __set__(self, obj: O, value: T):
raise AttributeError(f"Cannot write property {self.property_name} of {obj:r}")
def __delete__(self, obj: O):
raise AttributeError(f"Cannot delete property {self.property_name} of {obj:r}")
class CachedProperty(CustomProperty[T]):
""" A property that is only computed once per instance and then replaces
itself with an ordinary attribute. Deleting the attribute resets the
property.
Source: https://github.com/bottlepy/bottle/commit/fa7733e075da0d790d809aa3d2f53071897e6f76
"""
def __init__(self, func: Callable[[O], T]):
self.__doc__ = getattr(func, '__doc__')
self.func = func
self.property_name = func.__name__
def __get__(self, obj: Optional[O], cls: Type[O]): # type: ignore
if obj is None:
return self
value = obj.__dict__[self.property_name] = self.func(obj)
return value
class SettableCachedProperty(CachedProperty[T]):
def __set__(self, obj: O, value: T):
obj.__dict__[self.property_name] = value
class DictPathRoProperty(CustomProperty[T]):
_NoDefault = NewType("_NoDefault", object)
_nodefault = _NoDefault(object())
def __init__(self, source_member: str, path: Sequence[str],
default: Union[T, _NoDefault]=_nodefault, type: Callable[[Any], T]=lambda x: x):
self.source_member = source_member
self.path = path
self.default = default
self.type = type
def _get_parent(self, obj: O, *, create=False) -> MutableMapping[str, Any]:
d: MutableMapping[str, Any] = getattr(obj, self.source_member)
for pc in self.path[:-1]:
try:
d = d[pc]
except KeyError:
if not create:
raise
nd: MutableMapping[str, Any] = {}
d[pc] = nd
d = nd
return d
def __get__(self, obj: Optional[O], cls: Type[O]): # type: ignore
if obj is None:
return self
try:
val = self._get_parent(obj)[self.path[-1]]
except KeyError:
if self.default is not self._nodefault:
return self.default
raise
else:
return self.type(val)
class DictPathProperty(DictPathRoProperty[T]):
def __init__(self, *args, allow_create_parents=True, **kwds):
super().__init__(*args, **kwds)
self.allow_create_parents = allow_create_parents
def __set__(self, obj, value):
self._get_parent(obj, create=self.allow_create_parents)[self.path[-1]] = value
def __delete__(self, obj):
del self._get_parent(obj)[self.path[-1]]
__all__ = ['CustomProperty', 'CachedProperty', 'SettableCachedProperty', 'DictPathRoProperty', 'DictPathProperty']

@ -5,78 +5,14 @@ import sys, os
import re, fnmatch, datetime import re, fnmatch, datetime
from pathlib import Path from pathlib import Path
from typing import List, Iterable, Dict, Tuple, Callable, Optional, Union from typing import List, Iterable, Dict, Literal, Mapping, Tuple, Callable, Optional, Union, Any, cast, overload
from vdfparser import VdfParser, DeepDict, AppInfoFile, LowerCaseNormalizingDict from vdfparser import VdfParser, DeepDict, AppInfoFile, LowerCaseNormalizingDict, dd_getpath
from propex import CachedProperty, DictPathProperty, DictPathRoProperty
class CachedProperty:
""" A property that is only computed once per instance and then replaces
itself with an ordinary attribute. Deleting the attribute resets the
property.
Source: https://github.com/bottlepy/bottle/commit/fa7733e075da0d790d809aa3d2f53071897e6f76
"""
def __init__(self, func):
self.__doc__ = getattr(func, '__doc__')
self.func = func
def __get__(self, obj, cls):
if obj is None:
return self
value = obj.__dict__[self.func.__name__] = self.func(obj)
return value
class DictPathRoProperty:
__slots__ = "property", "path", "default", "type"
_nodefault = object()
_id = lambda x: x
def __init__(self, property: str, path: Tuple[str], default=_nodefault, type=_id):
self.property = property
self.path = path
self.default = default
self.type = type
def __get__(self, obj, cls):
if obj is None:
return self
d = getattr(obj, self.property)
try:
for pc in self.path:
d = d[pc]
except KeyError:
if self.default is not self._nodefault:
return self.default
raise
else:
return self.type(d)
class DictPathProperty(DictPathRoProperty):
__slots__ = ()
def _get_create_parent(self, obj) -> Dict:
d = getattr(obj, self.property)
for pc in self.path[:-1]:
try:
d = d[pc]
except KeyError:
nd = {}
d[pc] = nd
d = nd
return d
def __set__(self, obj, value):
self._get_create_parent(obj)[self.path[-1]] = value
def __delete__(self, obj):
del self._get_create_parent(obj)[self.path[-1]]
_vdf = VdfParser() _vdf = VdfParser()
_vdf_ci = VdfParser(factory=LowerCaseNormalizingDict)
class MalformedManifestError(Exception): class MalformedManifestError(Exception):
@ -110,11 +46,11 @@ class AppInfo:
def launch_configs(self): def launch_configs(self):
return self.appinfo["appinfo"]["config"]["launch"].values() return self.appinfo["appinfo"]["config"]["launch"].values()
name = DictPathRoProperty("appinfo", ("appinfo", "common", "name"), default=None) name = DictPathRoProperty[Optional[str]]("appinfo", ("appinfo", "common", "name"), default=None)
oslist = DictPathRoProperty("appinfo", ("appinfo", "common", "oslist"), type=lambda s: s.split(",")) oslist = DictPathRoProperty[List[str]] ("appinfo", ("appinfo", "common", "oslist"), type=lambda s: s.split(","))
install_dir = DictPathRoProperty("appinfo", ("appinfo", "config", "installdir"), default=None) install_dir = DictPathRoProperty[Optional[str]]("appinfo", ("appinfo", "config", "installdir"), default=None)
languages = DictPathRoProperty("appinfo", ("appinfo", "common", "supported_languages")) languages = DictPathRoProperty[Any] ("appinfo", ("appinfo", "common", "supported_languages"))
gameid = DictPathRoProperty("appinfo", ("appinfo", "common", "gameid"), type=int) gameid = DictPathRoProperty[int] ("appinfo", ("appinfo", "common", "gameid"), type=int)
# Misc. # Misc.
def get_userdata_path(self, user_id: Union[int, 'LoginUser']) -> Path: def get_userdata_path(self, user_id: Union[int, 'LoginUser']) -> Path:
@ -167,7 +103,7 @@ class App(AppInfo):
if "AppState" not in self.manifest: if "AppState" not in self.manifest:
raise MalformedManifestError("App manifest doesn't have AppState key", self.manifest_path) raise MalformedManifestError("App manifest doesn't have AppState key", self.manifest_path)
super().__init__(libfolder.steam, int(self.manifest["AppState"]["appid"])) super().__init__(libfolder.steam, int(dd_getpath(self.manifest, ("AppState", "appid"), t=str)))
installed = True installed = True
@ -175,9 +111,9 @@ class App(AppInfo):
return "<steamutil.App #%7d '%s' @ \"%s\">" % (self.appid, self.name, self.install_path) return "<steamutil.App #%7d '%s' @ \"%s\">" % (self.appid, self.name, self.install_path)
# Basic info # Basic info
name = DictPathRoProperty("manifest", ("AppState", "name")) name = DictPathRoProperty[Optional[str]]("manifest", ("AppState", "name"), None)
language = DictPathRoProperty("manifest", ("AppState", "UserConfig", "language"), None) language = DictPathRoProperty[Optional[str]]("manifest", ("AppState", "UserConfig", "language"), None)
install_dir = DictPathRoProperty("manifest", ("AppState", "installdir")) install_dir = DictPathRoProperty[Optional[str]]("manifest", ("AppState", "installdir"), None)
@CachedProperty @CachedProperty
def install_path(self) -> Path: def install_path(self) -> Path:
@ -191,27 +127,30 @@ class App(AppInfo):
# Steam Play info # Steam Play info
@property @property
def is_steam_play(self): def is_steam_play(self) -> Optional[str]:
uc = self.manifest["AppState"].get("UserConfig") return dd_getpath(self.manifest, ("AppState", "UserConfig", "platform_override_source"), None, t=str)
if uc and "platform_override_source" in uc:
return uc["platform_override_source"]
@property @property
def is_proton_app(self): def is_proton_app(self) -> Optional[bool]:
uc = self.manifest["AppState"].get("UserConfig") uc = dd_getpath(self.manifest, ("AppState", "UserConfig"), None, t=dict)
if uc and "platform_override_source" in uc: if uc and "platform_override_source" in uc:
return uc["platform_override_source"] == "windows" and uc["platform_override_dest"] == "linux" return uc["platform_override_source"] == "windows" and uc["platform_override_dest"] == "linux"
return None
@CachedProperty @CachedProperty
def compat_path(self) -> Path: def compat_path(self) -> Path:
return self.steamapps_path / "compatdata" / str(self.appid) return self.steamapps_path / "compatdata" / str(self.appid)
@CachedProperty
def compat_prefix(self) -> Path:
return self.compat_path / "pfx"
@CachedProperty @CachedProperty
def compat_drive(self) -> Path: def compat_drive(self) -> Path:
return self.compat_path / "pfx" / "drive_c" return self.compat_prefix / "drive_c"
# Install size # Install size
declared_install_size = DictPathRoProperty("manifest", ("AppState", "SizeOnDisk"), 0, type=int) declared_install_size = DictPathRoProperty[int]("manifest", ("AppState", "SizeOnDisk"), 0, type=int)
def compute_install_size(self) -> int: def compute_install_size(self) -> int:
def sum_size(p: Path): def sum_size(p: Path):
@ -275,10 +214,11 @@ class LibraryFolder:
manifest = self.steamapps_path / ("appmanifest_%d.acf" % appid) manifest = self.steamapps_path / ("appmanifest_%d.acf" % appid)
if manifest.exists(): if manifest.exists():
return App(self, manifest) return App(self, manifest)
return None
def find_apps_re(self, regexp: str) -> Iterable[App]: def find_apps_re(self, regexp: str) -> Iterable[App]:
reg = re.compile(r'"name"\s+".*%s.*"' % regexp, re.IGNORECASE) reg = re.compile(r'"name"\s+".*%s.*"' % regexp, re.IGNORECASE)
for manifest in self.appmanifests: #pylint:disable=not-an-iterable for manifest in self.appmanifests:
with open(manifest, encoding="utf-8") as f: with open(manifest, encoding="utf-8") as f:
content = f.read() content = f.read()
if reg.search(content): if reg.search(content):
@ -320,7 +260,7 @@ class UserAppConfig:
t = int(self._data.get("Playtime2wks", "0")) t = int(self._data.get("Playtime2wks", "0"))
return datetime.time(t // 60, t % 60) return datetime.time(t // 60, t % 60)
launch_options = DictPathProperty("_data", ("LaunchOptions",), None) launch_options = DictPathProperty[Optional[DeepDict]]("_data", ("LaunchOptions",), None)
class LoginUser: class LoginUser:
@ -341,8 +281,8 @@ class LoginUser:
""" 32-bit account ID """ """ 32-bit account ID """
return self.id & 0xffffffff return self.id & 0xffffffff
account_name = DictPathRoProperty("info", ("AccountName",)) account_name = DictPathRoProperty[str]("info", ("AccountName",))
username = DictPathRoProperty("info", ("PersonaName",)) username = DictPathRoProperty[str]("info", ("PersonaName",))
@CachedProperty @CachedProperty
def userdata_path(self) -> Path: def userdata_path(self) -> Path:
@ -410,6 +350,7 @@ class Steam:
path /= "Steam" path /= "Steam"
if path.exists(): if path.exists():
return path return path
return None
# Various paths # Various paths
@property @property
@ -433,7 +374,7 @@ class Steam:
vdf_ci = VdfParser(factory=LowerCaseNormalizingDict) vdf_ci = VdfParser(factory=LowerCaseNormalizingDict)
with open(self.loginusers_vdf, encoding="utf-8") as f: with open(self.loginusers_vdf, encoding="utf-8") as f:
data = vdf_ci.parse(f) data = vdf_ci.parse(f)
for id, info in data["users"].items(): for id, info in cast(Mapping[str, Dict], data["users"]).items():
if info["mostrecent"] == "1": if info["mostrecent"] == "1":
return LoginUser(self, int(id), info) return LoginUser(self, int(id), info)
except KeyError: except KeyError:
@ -451,9 +392,9 @@ class Steam:
with open(self.config_vdf, encoding="utf-8") as f: with open(self.config_vdf, encoding="utf-8") as f:
return _vdf.parse(f) return _vdf.parse(f)
config_install_store = DictPathProperty("config", ("InstallConfigStore",)) config_install_store = DictPathProperty[Dict]("config", ("InstallConfigStore",))
config_software_steam = DictPathProperty("config", ("InstallConfigStore", "Software", "Valve", "Steam")) config_software_steam = DictPathProperty[Dict]("config", ("InstallConfigStore", "Software", "Valve", "Steam"))
compat_tool_mapping = DictPathProperty("config_software_steam", ("CompatToolMapping",)) compat_tool_mapping = DictPathProperty[Dict]("config_software_steam", ("CompatToolMapping",))
# AppInfo cache # AppInfo cache
@CachedProperty @CachedProperty
@ -470,7 +411,7 @@ class Steam:
return info[891390]["appinfo"] return info[891390]["appinfo"]
@CachedProperty @CachedProperty
def compat_tools(self) -> {str:{}}: def compat_tools(self) -> Dict[str, Dict]:
tools = {} tools = {}
# Find official proton installs # Find official proton installs
valve = self.steamplay_manifest["extended"]["compat_tools"] valve = self.steamplay_manifest["extended"]["compat_tools"]
@ -492,7 +433,7 @@ class Steam:
for mfst_path in manifests: for mfst_path in manifests:
with open(mfst_path, encoding="utf-8") as f: with open(mfst_path, encoding="utf-8") as f:
mfst = _vdf.parse(f) mfst = _vdf.parse(f)
for name, t in mfst["compatibilitytools"]["compat_tools"].items(): for name, t in dd_getpath(mfst, ("compatibilitytools", "compat_tools"), t=dict).items():
# TODO warn duplicate name # TODO warn duplicate name
t["install_path"] = mfst_path.parent / t["install_path"] t["install_path"] = mfst_path.parent / t["install_path"]
tools[name] = t tools[name] = t
@ -502,19 +443,34 @@ class Steam:
@CachedProperty @CachedProperty
def library_folder_paths(self) -> List[Path]: def library_folder_paths(self) -> List[Path]:
with open(self.libraryfolders_vdf, encoding="utf-8") as f: with open(self.libraryfolders_vdf, encoding="utf-8") as f:
return [Path(v) for k,v in _vdf.parse(f)["LibraryFolders"].items() if k.isdigit()] data = _vdf_ci.parse(f)
def gen():
for k, v in dd_getpath(data, ("LibraryFolders",), t=dict).items():
if k.isdigit():
if isinstance(v, str):
yield Path(v)
elif 'path' in v:
yield Path(v['path'])
else:
raise ValueError("Unknown format of libraryfolders.vdf")
return list(gen())
@CachedProperty @CachedProperty
def library_folders(self) -> List[LibraryFolder]: def library_folders(self) -> List[LibraryFolder]:
return [LibraryFolder(self, self.root)] + [LibraryFolder(self, p) for p in self.library_folder_paths] #pylint:disable=not-an-iterable return [LibraryFolder(self, self.root)] + [LibraryFolder(self, p) for p in self.library_folder_paths]
@property @property
def apps(self) -> Iterable[App]: def apps(self) -> Iterable[App]:
for lf in self.library_folders: #pylint:disable=not-an-iterable for lf in self.library_folders:
yield from lf.apps yield from lf.apps
def get_app(self, id: int, installed=True) -> Optional[App]: @overload
for lf in self.library_folders: #pylint:disable=not-an-iterable def get_app(self, id: int, installed: Literal[True]=True) -> Optional[App]: ...
@overload
def get_app(self, id: int, installed: Literal[False]) -> Optional[AppInfo]: ...
def get_app(self, id: int, installed=True) -> Optional[AppInfo]:
for lf in self.library_folders:
app = lf.get_app(id) app = lf.get_app(id)
if app is not None: if app is not None:
return app return app
@ -522,8 +478,14 @@ class Steam:
for appid, appinfo in self.appinfo.items(): for appid, appinfo in self.appinfo.items():
if appid == id: if appid == id:
return AppInfo(self, appid, appinfo_data=appinfo) return AppInfo(self, appid, appinfo_data=appinfo)
return None
@overload
def find_apps_re(self, regexp: str, installed: Literal[True]) -> Iterable[App]: ...
@overload
def find_apps_re(self, regexp: str, installed: Literal[False]) -> Iterable[AppInfo]: ...
def find_apps_re(self, regexp: str, installed=True) -> Iterable[App]: def find_apps_re(self, regexp: str, installed=True) -> Iterable[AppInfo]:
""" Find all apps by regular expression """ """ Find all apps by regular expression """
if not installed: if not installed:
# Search whole appinfo cache # Search whole appinfo cache
@ -538,7 +500,7 @@ class Steam:
broken_ids.add(appid) broken_ids.add(appid)
continue continue
if reg.search(name): if reg.search(name):
for lf in self.library_folders: #pylint:disable=not-an-iterable for lf in self.library_folders:
app = lf.get_app(appid) app = lf.get_app(appid)
if app: if app:
yield app yield app
@ -556,8 +518,8 @@ class Steam:
print("[SteamUtil] Warning: found broken entries in appinfo cache:", ",".join(map(str, broken_ids))) print("[SteamUtil] Warning: found broken entries in appinfo cache:", ",".join(map(str, broken_ids)))
# Search local manifests directly # Search local manifests directly
reg = re.compile(r'"name"\s+".*%s.*"' % regexp, re.IGNORECASE) reg = re.compile(r'"name"\s+".*%s.*"' % regexp, re.IGNORECASE)
for lf in self.library_folders: #pylint:disable=not-an-iterable for lf in self.library_folders:
for manifest in lf.appmanifests: #pylint:disable=not-an-iterable for manifest in lf.appmanifests:
with open(manifest, encoding="utf-8") as f: with open(manifest, encoding="utf-8") as f:
content = f.read() content = f.read()
if reg.search(content): if reg.search(content):
@ -569,3 +531,4 @@ class Steam:
def find_app(self, pattern: str, installed=True) -> Optional[App]: def find_app(self, pattern: str, installed=True) -> Optional[App]:
for app in self.find_apps(pattern, installed=installed): for app in self.find_apps(pattern, installed=installed):
return app return app
return None

@ -7,14 +7,51 @@ from __future__ import unicode_literals
import io import io
import struct import struct
import collections
import datetime import datetime
from typing import Dict, Union, Mapping, Tuple from typing import Any, Dict, Optional, Sequence, Type, TypeVar, Union, Mapping, Tuple, NewType, cast, overload
DeepDict = Mapping[str, Union[str, "DeepDict"]] #### Nested dictionary support
# Mypy doesn't support recursive types :(
DeepDict = Mapping[str, Union[Mapping[str, Any], str]]
_NoDefault = NewType('_NoDefault', object)
_nodefault = _NoDefault(object())
_DefaultT = TypeVar('_DefaultT', DeepDict, str, None)
_DDCastT = TypeVar('_DDCastT', DeepDict, str)
@overload
def dd_getpath(dct: DeepDict, path: Sequence[str], default: _NoDefault=_nodefault, *, t: None=None) -> Union[DeepDict, str]: ...
@overload
def dd_getpath(dct: DeepDict, path: Sequence[str], default: _DefaultT, *, t: None=None) -> Union[DeepDict, str, _DefaultT]: ...
@overload
def dd_getpath(dct: DeepDict, path: Sequence[str], default: _NoDefault=_nodefault, *, t: Type[_DDCastT]) -> _DDCastT: ...
@overload
def dd_getpath(dct: DeepDict, path: Sequence[str], default: _DefaultT, *, t: Type[_DDCastT]) -> Union[_DDCastT, _DefaultT]: ...
def dd_getpath(dct: DeepDict, path: Sequence[str], default: Union[_DefaultT, _NoDefault]=_nodefault, *, t: Optional[Type[_DDCastT]]=None):
"""
Retrieve a value from inside a nested dictionary.
@param dct The nested mapping
@param path The path to retrieve. Represented by a tuple of strings.
@param default A default value. Raises KeyError if omitted.
@param t Result type for built-in typing.cast(), specify 'str' or 'dict'
"""
d: Any = dct
try:
for pc in path:
d = d[pc]
# XXX: runtime type check
assert (t is None or isinstance(d, t)), f"Expected value at path {path} to be {t}, not {type(d)}"
return d
except KeyError:
if default is not _nodefault:
return default
raise
#### Case-Insensitive dictionary.
# Unfortunately, Valve seems to play it a little loose with casing in their .vdf files
class LowerCaseNormalizingDict(dict): class LowerCaseNormalizingDict(dict):
def __init__(self, *args, **kwds): def __init__(self, *args, **kwds):
super().__init__() super().__init__()
@ -268,7 +305,7 @@ class BinaryVdfParser:
key, value = self._read_item(fd, t) key, value = self._read_item(fd, t)
map[key] = value map[key] = value
def _read_item(self, fd: io.BufferedIOBase, t: int) -> (str, DeepDict): def _read_item(self, fd: io.BufferedIOBase, t: bytes) -> Tuple[str, Union[str, DeepDict]]:
key = self._read_cstring(fd) key = self._read_cstring(fd)
if t == self.T_SKEY: if t == self.T_SKEY:

Loading…
Cancel
Save