py/propex,vdfparser,steamutil,steamsync: Improve typing

master
Taeyeon Mori 2 months ago
parent 79a50b5c4d
commit c47c5c78db
  1. 34
      lib/python/propex.py
  2. 133
      lib/python/steamsync.py
  3. 45
      lib/python/steamutil.py
  4. 116
      lib/python/vdfparser.py

@ -1,9 +1,18 @@
# Custom property-like classes # Custom property-like classes
from typing import Dict, Literal, MutableMapping, NewType, Optional, Sequence, Generic, TypeVar, Callable, Type, Any, Mapping, overload, Union from typing import MutableMapping, NewType, Optional, Sequence, Generic, TypeVar, Callable, Type, Any, overload, Union
try:
from typing import Self
except ImportError:
try:
from typing_extensions import Self
except ImportError:
Self = Any
T = TypeVar('T') T = TypeVar('T')
O = TypeVar('O') O = TypeVar('O')
Ow = TypeVar('Ow', covariant=True)
class CustomProperty(property, Generic[T]): class CustomProperty(property, Generic[T]):
@ -23,7 +32,7 @@ class CustomProperty(property, Generic[T]):
self.property_name = name self.property_name = name
@overload # type: ignore @overload # type: ignore
def __get__(self, obj: None, cls: Type[O]) -> 'CustomProperty[T]': ... def __get__(self, obj: None, cls: Type[O]) -> Self: ...
@overload @overload
def __get__(self, obj: O, cls: Type[O]) -> T: ... def __get__(self, obj: O, cls: Type[O]) -> T: ...
@ -39,7 +48,7 @@ class CustomProperty(property, Generic[T]):
raise AttributeError(f"Cannot delete property {self.property_name} of {obj!r}") raise AttributeError(f"Cannot delete property {self.property_name} of {obj!r}")
class CachedProperty(CustomProperty[T]): class CachedProperty(CustomProperty[T], Generic[T, Ow]):
""" A property that is only computed once per instance and then replaces """ A property that is only computed once per instance and then replaces
itself with an ordinary attribute. Deleting the attribute resets the itself with an ordinary attribute. Deleting the attribute resets the
property. property.
@ -47,20 +56,23 @@ class CachedProperty(CustomProperty[T]):
Source: https://github.com/bottlepy/bottle/commit/fa7733e075da0d790d809aa3d2f53071897e6f76 Source: https://github.com/bottlepy/bottle/commit/fa7733e075da0d790d809aa3d2f53071897e6f76
""" """
def __init__(self, func: Callable[[O], T]): def __init__(self, func: Callable[[Ow], T]):
self.__doc__ = getattr(func, '__doc__') self.__doc__ = getattr(func, '__doc__')
self.func = func self.func = func
self.property_name = func.__name__ self.property_name = func.__name__
def __get__(self, obj: Optional[O], cls: Type[O]): # type: ignore def __get__(self, obj: Optional[Ow], cls: Type[Ow]): # type: ignore[override]
if obj is None: if obj is None:
return self return self
value = obj.__dict__[self.property_name] = self.func(obj) value = obj.__dict__[self.property_name] = self.func(obj)
return value return value
def __delete__(self, obj: Ow): # type: ignore[override,misc]
del obj.__dict__[self.property_name]
class SettableCachedProperty(CachedProperty[T]):
def __set__(self, obj: O, value: T): class SettableCachedProperty(CachedProperty[T, O]):
def __set__(self, obj: O, value: T): #type: ignore[override]
obj.__dict__[self.property_name] = value obj.__dict__[self.property_name] = value
@ -113,4 +125,10 @@ class DictPathProperty(DictPathRoProperty[T]):
del self._get_parent(obj)[self.path[-1]] del self._get_parent(obj)[self.path[-1]]
__all__ = ['CustomProperty', 'CachedProperty', 'SettableCachedProperty', 'DictPathRoProperty', 'DictPathProperty'] # functools.cached_property polyfill
try:
from functools import cached_property
except ImportError:
cached_property = CachedProperty # type: ignore[assignment,misc]
__all__ = ['CustomProperty', 'CachedProperty', 'SettableCachedProperty', 'DictPathRoProperty', 'DictPathProperty', 'cached_property']

@ -1,6 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import sys, os import sys
import os
import fnmatch import fnmatch
import re import re
import itertools import itertools
@ -13,11 +14,11 @@ import time
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from copy import copy from copy import copy
from getpass import getuser from getpass import getuser
from pathlib import PurePath, Path, PureWindowsPath from pathlib import PurePath, Path
from typing import Iterable, Tuple, Dict, List, Union, Set, Callable, Any, Optional, TypeVar, Generic, Sequence, overload from typing import Iterable, Tuple, Dict, List, Union, Set, Callable, Any, Optional, TypeVar, Generic, Sequence, overload, Literal, TypedDict
from warnings import warn from warnings import warn
from propex import CachedProperty, SettableCachedProperty from propex import SettableCachedProperty, cached_property
from steamutil import Steam, App from steamutil import Steam, App
@ -122,9 +123,15 @@ class SyncPath(Cloneable):
class _SyncSetCommon(metaclass=ABCMeta): class _SyncSetCommon(metaclass=ABCMeta):
files_from_local: CachedProperty[Set[Path]] @property
files_from_target: CachedProperty[Set[Path]] @abstractmethod
files_unmodified: CachedProperty[Set[Path]] def files_from_local(self) -> Set[Path]: ...
@property
@abstractmethod
def files_from_target(self) -> Set[Path]: ...
@property
@abstractmethod
def files_unmodified(self) -> Set[Path]: ...
def show_confirm(self, skip=True) -> bool: def show_confirm(self, skip=True) -> bool:
# XXX: move to SyncOp? # XXX: move to SyncOp?
@ -221,15 +228,15 @@ class SyncSet(_SyncSetCommon):
if f not in dst_files or sst.st_mtime > dst_files[f][1].st_mtime if f not in dst_files or sst.st_mtime > dst_files[f][1].st_mtime
} }
@CachedProperty @cached_property
def files_from_local(self) -> Set[Path]: def files_from_local(self) -> Set[Path]:
return self._sync_set(self.local, self.target) return self._sync_set(self.local, self.target)
@CachedProperty @cached_property
def files_from_target(self) -> Set[Path]: def files_from_target(self) -> Set[Path]:
return self._sync_set(self.target, self.local) return self._sync_set(self.target, self.local)
@CachedProperty @cached_property
def files_unmodified(self) -> Set[Path]: def files_unmodified(self) -> Set[Path]:
return (self.local.keys() | self.target.keys()) - (self.files_from_local | self.files_from_target) return (self.local.keys() | self.target.keys()) - (self.files_from_local | self.files_from_target)
@ -267,15 +274,15 @@ class SyncMultiSet(list, _SyncSetCommon):
return set() return set()
return functools.reduce(operator.or_, map(operator.attrgetter(attrname), self)) return functools.reduce(operator.or_, map(operator.attrgetter(attrname), self))
@CachedProperty @cached_property
def files_from_local(self) -> Set[Path]: def files_from_local(self) -> Set[Path]:
return self._union_set("files_from_local") return self._union_set("files_from_local")
@CachedProperty @cached_property
def files_from_target(self) -> Set[Path]: def files_from_target(self) -> Set[Path]:
return self._union_set("files_from_target") return self._union_set("files_from_target")
@CachedProperty @cached_property
def files_unmodified(self) -> Set[Path]: def files_unmodified(self) -> Set[Path]:
return self._union_set("files_unmodified") return self._union_set("files_unmodified")
@ -321,12 +328,18 @@ class AbstractCommonPaths:
is_windows: bool = True is_windows: bool = True
is_native_linux: bool = False is_native_linux: bool = False
# abstract attribute
@property @property
@abstractmethod @abstractmethod
def drive_c(self) -> P: pass def drive_c(self) -> P: ...
# abstract attribute @property
my_documents: CachedProperty[P] @abstractmethod
def my_documents(self) -> P: ...
@property
@abstractmethod
def appdata_roaming(self) -> P: ...
class Windows(WindowsCommon[P]): class Windows(WindowsCommon[P]):
is_native_windows: bool = True is_native_windows: bool = True
@ -336,19 +349,27 @@ class AbstractCommonPaths:
def drive_c(self) -> P: def drive_c(self) -> P:
return self._path_factory("C:\\") return self._path_factory("C:\\")
@CachedProperty # Win32 API
def my_documents(self) -> P: CSIDL_PERSONAL = 0x0005
""" Get the Windows "My Documents" folder """ CSIDL_APPDATA = 0x001a
def get_my_documents():
@staticmethod
def SHGetFolderPath(csidl: int) -> str:
import ctypes.wintypes import ctypes.wintypes
CSIDL_PERSONAL = 5 # My Documents
SHGFP_TYPE_CURRENT = 0 # Get current, not default value SHGFP_TYPE_CURRENT = 0 # Get current, not default value
buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH) buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
ctypes.windll.shell32.SHGetFolderPathW(None, CSIDL_PERSONAL, None, SHGFP_TYPE_CURRENT, buf) shell32 = ctypes.windll.shell32 # type: ignore[attr-defined] # Windows only
shell32.SHGetFolderPathW(None, csidl, None, SHGFP_TYPE_CURRENT, buf)
return buf.value return buf.value
return self._path_factory(get_my_documents())
@cached_property
def my_documents(self) -> P:
""" Get the Windows "My Documents" folder """
return self._path_factory(self.SHGetFolderPath(self.CSIDL_PERSONAL))
@cached_property
def appdata_roaming(self) -> P:
return self._path_factory(self.SHGetFolderPath(self.CSIDL_APPDATA))
class Wine(WindowsCommon[P]): class Wine(WindowsCommon[P]):
is_native_windows: bool = False is_native_windows: bool = False
@ -379,6 +400,10 @@ class AbstractCommonPaths:
@staticmethod @staticmethod
def _find_file_ci(path: Path, candidates: Optional[Sequence[str]]=None, exclude: Optional[Sequence[str]]=None) -> List[Path]: def _find_file_ci(path: Path, candidates: Optional[Sequence[str]]=None, exclude: Optional[Sequence[str]]=None) -> List[Path]:
""" Find directory entry with casefolding
Note: candidates must already be lowercase """
if not path.exists():
return []
entries: Dict[str, Path] = {p.name.lower(): p for p in path.iterdir() if p.is_dir()} entries: Dict[str, Path] = {p.name.lower(): p for p in path.iterdir() if p.is_dir()}
results: List[Path] = [] results: List[Path] = []
if candidates is not None: if candidates is not None:
@ -392,7 +417,7 @@ class AbstractCommonPaths:
results.extend((path for name, path in entries.items() if name not in exclude and path not in results)) results.extend((path for name, path in entries.items() if name not in exclude and path not in results))
return results return results
@CachedProperty @cached_property
def _wine_prefix_userprofile(self) -> Path: def _wine_prefix_userprofile(self) -> Path:
## Try to find out the username in the prefix ## Try to find out the username in the prefix
## usually, this is the same as the system user, but ## usually, this is the same as the system user, but
@ -409,14 +434,23 @@ class AbstractCommonPaths:
def home(self) -> P: def home(self) -> P:
return self._path_factory(self._wine_prefix_userprofile) return self._path_factory(self._wine_prefix_userprofile)
@CachedProperty @cached_property
def my_documents(self) -> P: def my_documents(self) -> P:
""" Get the Windows "My Documents" folder """ """ Get the Windows "My Documents" folder """
ppath = self._wine_prefix_userprofile candidates = self._find_file_ci(self._wine_prefix_userprofile, ['my documents', 'documents'])
# BUG: mypy#7781 overload staticmethod is broken when called on instance if not candidates:
candidates = self.__class__._find_file_ci(ppath, ['my documents', 'documents']) raise FileNotFoundError(f"Could not find 'My Documents' folder in profile at '{self._wine_prefix_userprofile}'")
return self._path_factory(candidates[0])
@cached_property
def appdata_roaming(self) -> P:
candidates = self._find_file_ci(self._wine_prefix_userprofile, ['appdata', 'application data'])
if not candidates: if not candidates:
raise FileNotFoundError(f"Could not find 'My Documents' folder in profile at '{ppath}'") raise FileNotFoundError(f"Could not find 'AppData/Roaming' folder in profile at '{self._wine_prefix_userprofile}'")
for candidate in candidates:
roaming = self._find_file_ci(candidate, ['roaming'])
if roaming:
return self._path_factory(roaming[0])
return self._path_factory(candidates[0]) return self._path_factory(candidates[0])
class Linux(Common[P]): class Linux(Common[P]):
@ -427,11 +461,11 @@ class AbstractCommonPaths:
## XDG ## XDG
# XXX: make it methods and search all locations? # XXX: make it methods and search all locations?
@CachedProperty @cached_property
def xdg_config_dir(self) -> P: def xdg_config_dir(self) -> P:
raise NotImplementedError() raise NotImplementedError()
@CachedProperty @cached_property
def xdg_data_dir(self) -> P: def xdg_data_dir(self) -> P:
raise NotImplementedError() raise NotImplementedError()
@ -441,9 +475,12 @@ class CommonPaths:
def _path_factory(self, p: PathOrStr) -> Path: def _path_factory(self, p: PathOrStr) -> Path:
return Path(p) return Path(p)
class LinuxPaths(AbstractCommonPaths.Linux[Path], Mixin): pass class LinuxPaths(AbstractCommonPaths.Linux[Path], Mixin):
class WindowsPaths(AbstractCommonPaths.Windows[Path], Mixin): pass pass
class WinePaths(AbstractCommonPaths.Wine[Path], Mixin): pass class WindowsPaths(AbstractCommonPaths.Windows[Path], Mixin):
pass
class WinePaths(AbstractCommonPaths.Wine[Path], Mixin):
pass
Paths = Union[LinuxPaths, WindowsPaths, WinePaths] Paths = Union[LinuxPaths, WindowsPaths, WinePaths]
NativePaths = Union[LinuxPaths, WindowsPaths] NativePaths = Union[LinuxPaths, WindowsPaths]
@ -477,9 +514,12 @@ class CommonSyncPaths:
def _path_factory(self, p: PathOrStr) -> SyncPath: def _path_factory(self, p: PathOrStr) -> SyncPath:
return SyncPath(self.op, p) return SyncPath(self.op, p)
class LinuxPaths(AbstractCommonPaths.Linux[SyncPath], Mixin): pass class LinuxPaths(AbstractCommonPaths.Linux[SyncPath], Mixin):
class WindowsPaths(AbstractCommonPaths.Windows[SyncPath], Mixin): pass pass
class WinePaths(AbstractCommonPaths.Wine[SyncPath], Mixin): pass class WindowsPaths(AbstractCommonPaths.Windows[SyncPath], Mixin):
pass
class WinePaths(AbstractCommonPaths.Wine[SyncPath], Mixin):
pass
Paths = Union[LinuxPaths, WindowsPaths, WinePaths] Paths = Union[LinuxPaths, WindowsPaths, WinePaths]
@ -498,6 +538,7 @@ class CommonSyncPaths:
### ----------------------------------------------------------------- ### -----------------------------------------------------------------
_AbstractSyncOp = TypeVar("_AbstractSyncOp", bound="AbstractSyncOp") _AbstractSyncOp = TypeVar("_AbstractSyncOp", bound="AbstractSyncOp")
class AbstractSyncOp(ISyncOp): class AbstractSyncOp(ISyncOp):
parent: ISyncContext parent: ISyncContext
name: str # Abstract name: str # Abstract
@ -506,7 +547,7 @@ class AbstractSyncOp(ISyncOp):
self.parent = parent self.parent = parent
# Paths # Paths
@CachedProperty @cached_property
def paths(self) -> CommonSyncPaths.Paths: def paths(self) -> CommonSyncPaths.Paths:
return CommonSyncPaths.create(self, None) return CommonSyncPaths.create(self, None)
@ -515,7 +556,7 @@ class AbstractSyncOp(ISyncOp):
# Properties # Properties
@SettableCachedProperty @SettableCachedProperty
def slug(self): def slug(self) -> str:
""" Name of the destination folder """ """ Name of the destination folder """
return self.name return self.name
@ -546,7 +587,7 @@ class AbstractSyncOp(ISyncOp):
% (self.name, self.__class__.__name__.replace("SyncOp", ""))) % (self.name, self.__class__.__name__.replace("SyncOp", "")))
def report_error(self, msg: Iterable[str]): def report_error(self, msg: Iterable[str]):
print("\033[31m"+"\n".join(" " + l for l in msg)+"\033[0m") print("\033[31m"+"\n".join(" " + ln for ln in msg)+"\033[0m")
class SteamSyncOp(AbstractSyncOp): class SteamSyncOp(AbstractSyncOp):
@ -557,7 +598,7 @@ class SteamSyncOp(AbstractSyncOp):
super().__init__(ssync) super().__init__(ssync)
self.app = app self.app = app
@CachedProperty @cached_property
def paths(self) -> CommonSyncPaths.Paths: def paths(self) -> CommonSyncPaths.Paths:
return CommonSyncPaths.create(self, self.app.compat_prefix if self.app.is_proton_app else None) return CommonSyncPaths.create(self, self.app.compat_prefix if self.app.is_proton_app else None)
@ -650,7 +691,7 @@ class WineSyncOp(GenericFoundSyncOp):
super().__init__(parent, name, found) super().__init__(parent, name, found)
self._wine_prefix = prefix self._wine_prefix = prefix
@CachedProperty @cached_property
def paths(self) -> CommonSyncPaths.Paths: def paths(self) -> CommonSyncPaths.Paths:
return CommonSyncPaths.create(self, self._wine_prefix) return CommonSyncPaths.create(self, self._wine_prefix)
@ -712,12 +753,12 @@ class NoSteamSync(ISyncContext):
class SteamSync(NoSteamSync): class SteamSync(NoSteamSync):
steam: Steam steam: Steam
def __init__(self, target_path: Path, *, steam_path: Path = None): def __init__(self, target_path: Path, *, steam_path: Optional[Path] = None):
super().__init__(target_path) super().__init__(target_path)
self.steam = Steam(steam_path) self.steam = Steam(steam_path)
# Get Information # Get Information
@CachedProperty @cached_property
def apps(self) -> List[App]: def apps(self) -> List[App]:
return list(self.steam.apps) return list(self.steam.apps)

@ -1,14 +1,17 @@
# Discover Steam install and games # Discover Steam install and games
# (c) 2020 Taeyeon Mori CC-BY-SA # (c) 2020 Taeyeon Mori CC-BY-SA
import sys, os import datetime
import re, fnmatch, datetime import fnmatch
import os
import re
import sys
from pathlib import Path from pathlib import Path
from typing import List, Iterable, Dict, Literal, Mapping, Tuple, Callable, Optional, Union, Any, cast, overload from typing import List, Iterable, Dict, Literal, Mapping, Tuple, Optional, Union, Any, cast, overload
from vdfparser import VdfParser, DeepDict, AppInfoFile, LowerCaseNormalizingDict, dd_getpath from vdfparser import VdfParser, DeepDict, AppInfoFile, LowerCaseNormalizingDict, dd_getpath
from propex import CachedProperty, SettableCachedProperty, DictPathProperty, DictPathRoProperty from propex import SettableCachedProperty, DictPathProperty, DictPathRoProperty, cached_property
_vdf = VdfParser() _vdf = VdfParser()
@ -37,7 +40,7 @@ class AppInfo:
installed = False installed = False
# AppInfo # AppInfo
@CachedProperty @cached_property
def appinfo(self): def appinfo(self):
# FIXME: properly close AppInfoFile but also deal with always-open appinfo # FIXME: properly close AppInfoFile but also deal with always-open appinfo
return self.steam.appinfo[self.appid] return self.steam.appinfo[self.appid]
@ -60,7 +63,7 @@ class AppInfo:
def is_native(self): def is_native(self):
return sys.platform in self.oslist return sys.platform in self.oslist
@CachedProperty @cached_property
def compat_tool(self) -> dict: def compat_tool(self) -> dict:
mapping = self.steam.compat_tool_mapping mapping = self.steam.compat_tool_mapping
appid = str(self.appid) appid = str(self.appid)
@ -115,13 +118,13 @@ class App(AppInfo):
language = DictPathRoProperty[Optional[str]]("manifest", ("AppState", "UserConfig", "language"), None) language = DictPathRoProperty[Optional[str]]("manifest", ("AppState", "UserConfig", "language"), None)
install_dir = DictPathRoProperty[Optional[str]]("manifest", ("AppState", "installdir"), None) install_dir = DictPathRoProperty[Optional[str]]("manifest", ("AppState", "installdir"), None)
@CachedProperty @cached_property
def install_path(self) -> Path: def install_path(self) -> Path:
return self.steamapps_path / "common" / self.install_dir return self.steamapps_path / "common" / self.install_dir
# Workshop # Workshop
# TODO # TODO
@CachedProperty @cached_property
def workshop_path(self) -> Path: def workshop_path(self) -> Path:
return self.steamapps_path / "workshop" / "content" / str(self.appid) return self.steamapps_path / "workshop" / "content" / str(self.appid)
@ -137,15 +140,15 @@ class App(AppInfo):
return uc["platform_override_source"] == "windows" and uc["platform_override_dest"] == "linux" return uc["platform_override_source"] == "windows" and uc["platform_override_dest"] == "linux"
return None return None
@CachedProperty @cached_property
def compat_path(self) -> Path: def compat_path(self) -> Path:
return self.steamapps_path / "compatdata" / str(self.appid) return self.steamapps_path / "compatdata" / str(self.appid)
@CachedProperty @cached_property
def compat_prefix(self) -> Path: def compat_prefix(self) -> Path:
return self.compat_path / "pfx" return self.compat_path / "pfx"
@CachedProperty @cached_property
def compat_drive(self) -> Path: def compat_drive(self) -> Path:
return self.compat_prefix / "drive_c" return self.compat_prefix / "drive_c"
@ -176,7 +179,7 @@ class LibraryFolder:
return "<steamutil.LibraryFolder @ \"%s\">" % self.path return "<steamutil.LibraryFolder @ \"%s\">" % self.path
# Paths # Paths
@CachedProperty @cached_property
def steamapps_path(self) -> Path: def steamapps_path(self) -> Path:
steamapps = self.path / "steamapps" steamapps = self.path / "steamapps"
if not steamapps.exists(): if not steamapps.exists():
@ -284,7 +287,7 @@ class LoginUser:
account_name = DictPathRoProperty[str]("info", ("AccountName",)) account_name = DictPathRoProperty[str]("info", ("AccountName",))
username = DictPathRoProperty[str]("info", ("PersonaName",)) username = DictPathRoProperty[str]("info", ("PersonaName",))
@CachedProperty @cached_property
def userdata_path(self) -> Path: def userdata_path(self) -> Path:
return self.steam.get_userdata_path(self) return self.steam.get_userdata_path(self)
@ -292,7 +295,7 @@ class LoginUser:
def localconfig_vdf(self) -> Path: def localconfig_vdf(self) -> Path:
return self.userdata_path / "config" / "localconfig.vdf" return self.userdata_path / "config" / "localconfig.vdf"
@CachedProperty @cached_property
def localconfig(self) -> DeepDict: def localconfig(self) -> DeepDict:
with open(self.localconfig_vdf, encoding="utf-8") as f: with open(self.localconfig_vdf, encoding="utf-8") as f:
return _vdf.parse(f) return _vdf.parse(f)
@ -367,7 +370,7 @@ class Steam:
return self.root / "config" / "loginusers.vdf" return self.root / "config" / "loginusers.vdf"
# Users # Users
@CachedProperty @cached_property
def most_recent_user(self) -> Optional[LoginUser]: def most_recent_user(self) -> Optional[LoginUser]:
try: try:
# Apparently, Steam doesn't care about case in the config/*.vdf keys # Apparently, Steam doesn't care about case in the config/*.vdf keys
@ -387,7 +390,7 @@ class Steam:
return self.root / "userdata" / str(user_id) return self.root / "userdata" / str(user_id)
# Config # Config
@CachedProperty @cached_property
def config(self) -> DeepDict: def config(self) -> DeepDict:
with open(self.config_vdf, encoding="utf-8") as f: with open(self.config_vdf, encoding="utf-8") as f:
return _vdf.parse(f) return _vdf.parse(f)
@ -397,11 +400,11 @@ class Steam:
compat_tool_mapping = DictPathProperty[Dict]("config_software_steam", ("CompatToolMapping",)) compat_tool_mapping = DictPathProperty[Dict]("config_software_steam", ("CompatToolMapping",))
# AppInfo cache # AppInfo cache
@CachedProperty @cached_property
def appinfo_vdf(self): def appinfo_vdf(self):
return self.root / "appcache" / "appinfo.vdf" return self.root / "appcache" / "appinfo.vdf"
@property @cached_property
def appinfo(self) -> AppInfoFile: def appinfo(self) -> AppInfoFile:
return AppInfoFile.open(self.appinfo_vdf) return AppInfoFile.open(self.appinfo_vdf)
@ -410,7 +413,7 @@ class Steam:
with self.appinfo as info: with self.appinfo as info:
return info[891390]["appinfo"] return info[891390]["appinfo"]
@CachedProperty @cached_property
def compat_tools(self) -> Dict[str, Dict]: def compat_tools(self) -> Dict[str, Dict]:
tools = {} tools = {}
# Find official proton installs # Find official proton installs
@ -440,7 +443,7 @@ class Steam:
return tools return tools
# Game/App Library # Game/App Library
@CachedProperty @cached_property
def library_folder_paths(self) -> List[Path]: def library_folder_paths(self) -> List[Path]:
with open(self.libraryfolders_vdf, encoding="utf-8") as f: with open(self.libraryfolders_vdf, encoding="utf-8") as f:
data = _vdf_ci.parse(f) data = _vdf_ci.parse(f)
@ -455,7 +458,7 @@ class Steam:
raise ValueError("Unknown format of libraryfolders.vdf") raise ValueError("Unknown format of libraryfolders.vdf")
return list(gen()) return list(gen())
@CachedProperty @cached_property
def library_folders(self) -> List[LibraryFolder]: def library_folders(self) -> List[LibraryFolder]:
return [LibraryFolder(self, self.root)] + [LibraryFolder(self, p) for p in self.library_folder_paths] return [LibraryFolder(self, self.root)] + [LibraryFolder(self, p) for p in self.library_folder_paths]

@ -5,31 +5,44 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import datetime
import io import io
import struct import struct
import datetime from typing import (Any, Dict, Iterator, Mapping, NewType, Optional, Sequence,
Tuple, Type, TypeVar, Union, overload)
from typing import Any, Dict, Optional, Sequence, Type, TypeVar, Union, Mapping, Tuple, NewType, cast, overload
try:
from functools import cached_property
except ImportError:
from propex import cached_property
try:
from typing import Self
except ImportError:
try:
from typing_extensions import Self
except ImportError:
Self = Any
#### Nested dictionary support #### Nested dictionary support
# Mypy doesn't support recursive types :( # Mypy doesn't support recursive types :(
DeepDict = Mapping[str, Union[Mapping[str, Any], str]] DeepDict = Mapping[str, Union['DeepDict', str]]
DeepDictPath = Sequence[Union[str, Sequence[str]]]
_NoDefault = NewType('_NoDefault', object) _NoDefault = NewType('_NoDefault', object)
_nodefault = _NoDefault(object()) _nodefault = _NoDefault(object())
_DefaultT = TypeVar('_DefaultT', DeepDict, str, None) _DefaultT = TypeVar('_DefaultT', DeepDict, str, None)
_DDCastT = TypeVar('_DDCastT', DeepDict, str) _DDCastT = TypeVar('_DDCastT', DeepDict, str, Dict[str, str])
@overload @overload
def dd_getpath(dct: DeepDict, path: Sequence[str], default: _NoDefault=_nodefault, *, t: None=None) -> Union[DeepDict, str]: ... def dd_getpath(dct: DeepDict, path: DeepDictPath, default: _NoDefault=_nodefault, *, t: None=None) -> Union[DeepDict, str]: ...
@overload @overload
def dd_getpath(dct: DeepDict, path: Sequence[str], default: _DefaultT, *, t: None=None) -> Union[DeepDict, str, _DefaultT]: ... def dd_getpath(dct: DeepDict, path: DeepDictPath, default: _DefaultT, *, t: None=None) -> Union[DeepDict, str, _DefaultT]: ...
@overload @overload
def dd_getpath(dct: DeepDict, path: Sequence[str], default: _NoDefault=_nodefault, *, t: Type[_DDCastT]) -> _DDCastT: ... def dd_getpath(dct: DeepDict, path: DeepDictPath, default: _NoDefault=_nodefault, *, t: Type[_DDCastT]) -> _DDCastT: ...
@overload @overload
def dd_getpath(dct: DeepDict, path: Sequence[str], default: _DefaultT, *, t: Type[_DDCastT]) -> Union[_DDCastT, _DefaultT]: ... def dd_getpath(dct: DeepDict, path: DeepDictPath, default: _DefaultT, *, t: Type[_DDCastT]) -> Union[_DDCastT, _DefaultT]: ...
def dd_getpath(dct: DeepDict, path: Sequence[str], default: Union[_DefaultT, _NoDefault]=_nodefault, *, t: Optional[Type[_DDCastT]]=None): def dd_getpath(dct: DeepDict, path: DeepDictPath, default: Union[_DefaultT, _NoDefault]=_nodefault, *, t: Optional[Type[_DDCastT]]=None) -> Any: # type: ignore[misc]
""" """
Retrieve a value from inside a nested dictionary. Retrieve a value from inside a nested dictionary.
@param dct The nested mapping @param dct The nested mapping
@ -40,7 +53,18 @@ def dd_getpath(dct: DeepDict, path: Sequence[str], default: Union[_DefaultT, _No
d: Any = dct d: Any = dct
try: try:
for pc in path: for pc in path:
if isinstance(pc, str):
d = d[pc] d = d[pc]
else:
for candidate in pc:
try:
d = d[candidate]
except KeyError:
continue
else:
break
else:
raise KeyError("Dictionary has none of key candidates %s" % pc)
# XXX: runtime type check # XXX: runtime type check
assert (t is None or isinstance(d, t)), f"Expected value at path {path} to be {t}, not {type(d)}" assert (t is None or isinstance(d, t)), f"Expected value at path {path} to be {t}, not {type(d)}"
return d return d
@ -157,7 +181,7 @@ class VdfParser:
escape = True escape = True
elif c == self.begin_char: elif c == self.begin_char:
finish() finish()
if len(tokens) / 2 == len(tokens) // 2 and (self.strict or self.factory == dict): if len(tokens) / 2 == len(tokens) // 2 and (self.strict or self.factory is dict):
raise ValueError("Sub-dictionary cannot be a key") raise ValueError("Sub-dictionary cannot be a key")
tokens.append(self._parse_map(fd, True)) tokens.append(self._parse_map(fd, True))
elif c == self.end_char: elif c == self.end_char:
@ -339,17 +363,15 @@ class AppInfoFile:
S_INT4 = struct.Struct("<I") S_INT4 = struct.Struct("<I")
@classmethod @classmethod
def open(cls, filename) -> "AppInfoFile": def open(cls, filename) -> Self:
return cls(open(filename, "br"), close=True) return cls(open(filename, "br"), close=True)
def __init__(self, file, bvdf_parser=None, close=True): def __init__(self, file, bvdf_parser=None, close=True):
self.file = file self.file = file
self.parser = bvdf_parser if bvdf_parser is not None else BinaryVdfParser() self.parser = bvdf_parser if bvdf_parser is not None else BinaryVdfParser()
self._close_file = close self._close_file = close
self._universe = None
self._apps = None
def _load_map(self, offset: int) -> DeepDict: def _load_offset(self, offset: int) -> DeepDict:
self.file.seek(offset, io.SEEK_SET) self.file.seek(offset, io.SEEK_SET)
return self.parser.parse(self.file) return self.parser.parse(self.file)
@ -371,20 +393,13 @@ class AppInfoFile:
def __getitem__(self, key): def __getitem__(self, key):
if self._data is None: if self._data is None:
self._data = self.appinfo._load_map(self.offset) self._data = self.appinfo._load_offset(self.offset)
return self._data[key] return self._data[key]
def __getattr__(self, attr):
if attr in dir(dict):
if self._data is None:
self._data = self.appinfo._load_map(self.offset)
return getattr(self._data, attr)
raise AttributeError(attr)
@property @property
def dict(self): def data(self):
if self._data is None: if self._data is None:
self._data = self.appinfo._load_map(self.offset) self._data = self.appinfo._load_offset(self.offset)
return self._data return self._data
def _read_exactly(self, s: int) -> bytes: def _read_exactly(self, s: int) -> bytes:
@ -396,7 +411,7 @@ class AppInfoFile:
def _read_int(self) -> int: def _read_int(self) -> int:
return self.S_INT4.unpack(self._read_exactly(self.S_INT4.size))[0] return self.S_INT4.unpack(self._read_exactly(self.S_INT4.size))[0]
def _load(self): def _load_index(self) -> Tuple[int, Dict[int, App]]:
magic = self._read_exactly(4) magic = self._read_exactly(4)
if magic == b"\x28\x44\x56\x07": if magic == b"\x28\x44\x56\x07":
header_struct = self.S_APP_HEADER_V2 header_struct = self.S_APP_HEADER_V2
@ -405,8 +420,8 @@ class AppInfoFile:
else: else:
raise ValueError("Unknown appinfo.vdf magic") raise ValueError("Unknown appinfo.vdf magic")
self._universe = self._read_int() universe = self._read_int()
self._apps = {} apps = {}
buffer = bytearray(header_struct.size) buffer = bytearray(header_struct.size)
@ -420,45 +435,34 @@ class AppInfoFile:
appid, size, *_ = struct appid, size, *_ = struct
if appid == 0: if appid == 0:
return # Done break # Done
elif read < header_struct.size: elif read < header_struct.size:
raise EOFError() raise EOFError()
self._apps[appid] = self.App(self, self.file.tell(), struct) apps[appid] = self.App(self, self.file.tell(), struct)
self.file.seek(size - (header_struct.size - 8), io.SEEK_CUR) self.file.seek(size - (header_struct.size - 8), io.SEEK_CUR)
@property return universe, apps
def universe(self):
if self._universe is None:
self._load()
return self._universe
def __getattr__(self, attr):
if attr in dir(dict):
if self._apps is None:
self._load()
return getattr(self._apps, attr)
raise AttributeError(attr)
def __getitem__(self, key): @cached_property
if self._apps is None: def universe(self) -> int:
self._load() universe, self.apps = self._load_index()
return self._apps[key] return universe
def __iter__(self): @cached_property
if self._apps is None: def apps(self) -> Dict[int, App]:
self._load() self.universe, apps = self._load_index()
return iter(self._apps) return apps
@property def __getitem__(self, key: int) -> App:
def dict(self): return self.apps[key]
if self._apps is None:
self._load() def __iter__(self) -> Iterator[App]:
return self._apps return iter(self.apps.values())
# Cleanup # Cleanup
def __enter__(self): def __enter__(self) -> Self:
return self return self
def __exit__(self, exc, tp, tb): def __exit__(self, exc, tp, tb):

Loading…
Cancel
Save