diff --git a/lib/python/propex.py b/lib/python/propex.py index 3993d2c..e6a68c4 100644 --- a/lib/python/propex.py +++ b/lib/python/propex.py @@ -1,9 +1,18 @@ # Custom property-like classes -from typing import Dict, Literal, MutableMapping, NewType, Optional, Sequence, Generic, TypeVar, Callable, Type, Any, Mapping, overload, Union +from typing import MutableMapping, NewType, Optional, Sequence, Generic, TypeVar, Callable, Type, Any, overload, Union + +try: + from typing import Self +except ImportError: + try: + from typing_extensions import Self + except ImportError: + Self = Any T = TypeVar('T') O = TypeVar('O') +Ow = TypeVar('Ow', covariant=True) class CustomProperty(property, Generic[T]): @@ -23,7 +32,7 @@ class CustomProperty(property, Generic[T]): self.property_name = name @overload # type: ignore - def __get__(self, obj: None, cls: Type[O]) -> 'CustomProperty[T]': ... + def __get__(self, obj: None, cls: Type[O]) -> Self: ... @overload def __get__(self, obj: O, cls: Type[O]) -> T: ... @@ -39,7 +48,7 @@ class CustomProperty(property, Generic[T]): raise AttributeError(f"Cannot delete property {self.property_name} of {obj!r}") -class CachedProperty(CustomProperty[T]): +class CachedProperty(CustomProperty[T], Generic[T, Ow]): """ A property that is only computed once per instance and then replaces itself with an ordinary attribute. Deleting the attribute resets the property. @@ -47,20 +56,23 @@ class CachedProperty(CustomProperty[T]): Source: https://github.com/bottlepy/bottle/commit/fa7733e075da0d790d809aa3d2f53071897e6f76 """ - def __init__(self, func: Callable[[O], T]): + def __init__(self, func: Callable[[Ow], T]): self.__doc__ = getattr(func, '__doc__') self.func = func self.property_name = func.__name__ - def __get__(self, obj: Optional[O], cls: Type[O]): # type: ignore + def __get__(self, obj: Optional[Ow], cls: Type[Ow]): # type: ignore[override] if obj is None: return self value = obj.__dict__[self.property_name] = self.func(obj) return value + def __delete__(self, obj: Ow): # type: ignore[override,misc] + del obj.__dict__[self.property_name] -class SettableCachedProperty(CachedProperty[T]): - def __set__(self, obj: O, value: T): + +class SettableCachedProperty(CachedProperty[T, O]): + def __set__(self, obj: O, value: T): #type: ignore[override] obj.__dict__[self.property_name] = value @@ -113,4 +125,10 @@ class DictPathProperty(DictPathRoProperty[T]): del self._get_parent(obj)[self.path[-1]] -__all__ = ['CustomProperty', 'CachedProperty', 'SettableCachedProperty', 'DictPathRoProperty', 'DictPathProperty'] +# functools.cached_property polyfill +try: + from functools import cached_property +except ImportError: + cached_property = CachedProperty # type: ignore[assignment,misc] + +__all__ = ['CustomProperty', 'CachedProperty', 'SettableCachedProperty', 'DictPathRoProperty', 'DictPathProperty', 'cached_property'] diff --git a/lib/python/steamsync.py b/lib/python/steamsync.py index eff848e..e53b1d7 100644 --- a/lib/python/steamsync.py +++ b/lib/python/steamsync.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 -import sys, os +import sys +import os import fnmatch import re import itertools @@ -13,11 +14,11 @@ import time from abc import ABCMeta, abstractmethod from copy import copy from getpass import getuser -from pathlib import PurePath, Path, PureWindowsPath -from typing import Iterable, Tuple, Dict, List, Union, Set, Callable, Any, Optional, TypeVar, Generic, Sequence, overload +from pathlib import PurePath, Path +from typing import Iterable, Tuple, Dict, List, Union, Set, Callable, Any, Optional, TypeVar, Generic, Sequence, overload, Literal, TypedDict from warnings import warn -from propex import CachedProperty, SettableCachedProperty +from propex import SettableCachedProperty, cached_property from steamutil import Steam, App @@ -122,9 +123,15 @@ class SyncPath(Cloneable): class _SyncSetCommon(metaclass=ABCMeta): - files_from_local: CachedProperty[Set[Path]] - files_from_target: CachedProperty[Set[Path]] - files_unmodified: CachedProperty[Set[Path]] + @property + @abstractmethod + def files_from_local(self) -> Set[Path]: ... + @property + @abstractmethod + def files_from_target(self) -> Set[Path]: ... + @property + @abstractmethod + def files_unmodified(self) -> Set[Path]: ... def show_confirm(self, skip=True) -> bool: # XXX: move to SyncOp? @@ -221,15 +228,15 @@ class SyncSet(_SyncSetCommon): if f not in dst_files or sst.st_mtime > dst_files[f][1].st_mtime } - @CachedProperty + @cached_property def files_from_local(self) -> Set[Path]: return self._sync_set(self.local, self.target) - @CachedProperty + @cached_property def files_from_target(self) -> Set[Path]: return self._sync_set(self.target, self.local) - @CachedProperty + @cached_property def files_unmodified(self) -> Set[Path]: return (self.local.keys() | self.target.keys()) - (self.files_from_local | self.files_from_target) @@ -267,15 +274,15 @@ class SyncMultiSet(list, _SyncSetCommon): return set() return functools.reduce(operator.or_, map(operator.attrgetter(attrname), self)) - @CachedProperty + @cached_property def files_from_local(self) -> Set[Path]: return self._union_set("files_from_local") - @CachedProperty + @cached_property def files_from_target(self) -> Set[Path]: return self._union_set("files_from_target") - @CachedProperty + @cached_property def files_unmodified(self) -> Set[Path]: return self._union_set("files_unmodified") @@ -321,12 +328,18 @@ class AbstractCommonPaths: is_windows: bool = True is_native_linux: bool = False + # abstract attribute @property @abstractmethod - def drive_c(self) -> P: pass + def drive_c(self) -> P: ... - # abstract attribute - my_documents: CachedProperty[P] + @property + @abstractmethod + def my_documents(self) -> P: ... + + @property + @abstractmethod + def appdata_roaming(self) -> P: ... class Windows(WindowsCommon[P]): is_native_windows: bool = True @@ -336,19 +349,27 @@ class AbstractCommonPaths: def drive_c(self) -> P: return self._path_factory("C:\\") - @CachedProperty + # Win32 API + CSIDL_PERSONAL = 0x0005 + CSIDL_APPDATA = 0x001a + + @staticmethod + def SHGetFolderPath(csidl: int) -> str: + import ctypes.wintypes + SHGFP_TYPE_CURRENT = 0 # Get current, not default value + buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH) + shell32 = ctypes.windll.shell32 # type: ignore[attr-defined] # Windows only + shell32.SHGetFolderPathW(None, csidl, None, SHGFP_TYPE_CURRENT, buf) + return buf.value + + @cached_property def my_documents(self) -> P: """ Get the Windows "My Documents" folder """ - def get_my_documents(): - import ctypes.wintypes - CSIDL_PERSONAL = 5 # My Documents - SHGFP_TYPE_CURRENT = 0 # Get current, not default value - - buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH) - ctypes.windll.shell32.SHGetFolderPathW(None, CSIDL_PERSONAL, None, SHGFP_TYPE_CURRENT, buf) + return self._path_factory(self.SHGetFolderPath(self.CSIDL_PERSONAL)) - return buf.value - return self._path_factory(get_my_documents()) + @cached_property + def appdata_roaming(self) -> P: + return self._path_factory(self.SHGetFolderPath(self.CSIDL_APPDATA)) class Wine(WindowsCommon[P]): is_native_windows: bool = False @@ -379,6 +400,10 @@ class AbstractCommonPaths: @staticmethod def _find_file_ci(path: Path, candidates: Optional[Sequence[str]]=None, exclude: Optional[Sequence[str]]=None) -> List[Path]: + """ Find directory entry with casefolding + Note: candidates must already be lowercase """ + if not path.exists(): + return [] entries: Dict[str, Path] = {p.name.lower(): p for p in path.iterdir() if p.is_dir()} results: List[Path] = [] if candidates is not None: @@ -392,7 +417,7 @@ class AbstractCommonPaths: results.extend((path for name, path in entries.items() if name not in exclude and path not in results)) return results - @CachedProperty + @cached_property def _wine_prefix_userprofile(self) -> Path: ## Try to find out the username in the prefix ## usually, this is the same as the system user, but @@ -409,14 +434,23 @@ class AbstractCommonPaths: def home(self) -> P: return self._path_factory(self._wine_prefix_userprofile) - @CachedProperty + @cached_property def my_documents(self) -> P: """ Get the Windows "My Documents" folder """ - ppath = self._wine_prefix_userprofile - # BUG: mypy#7781 overload staticmethod is broken when called on instance - candidates = self.__class__._find_file_ci(ppath, ['my documents', 'documents']) + candidates = self._find_file_ci(self._wine_prefix_userprofile, ['my documents', 'documents']) + if not candidates: + raise FileNotFoundError(f"Could not find 'My Documents' folder in profile at '{self._wine_prefix_userprofile}'") + return self._path_factory(candidates[0]) + + @cached_property + def appdata_roaming(self) -> P: + candidates = self._find_file_ci(self._wine_prefix_userprofile, ['appdata', 'application data']) if not candidates: - raise FileNotFoundError(f"Could not find 'My Documents' folder in profile at '{ppath}'") + raise FileNotFoundError(f"Could not find 'AppData/Roaming' folder in profile at '{self._wine_prefix_userprofile}'") + for candidate in candidates: + roaming = self._find_file_ci(candidate, ['roaming']) + if roaming: + return self._path_factory(roaming[0]) return self._path_factory(candidates[0]) class Linux(Common[P]): @@ -427,11 +461,11 @@ class AbstractCommonPaths: ## XDG # XXX: make it methods and search all locations? - @CachedProperty + @cached_property def xdg_config_dir(self) -> P: raise NotImplementedError() - @CachedProperty + @cached_property def xdg_data_dir(self) -> P: raise NotImplementedError() @@ -441,9 +475,12 @@ class CommonPaths: def _path_factory(self, p: PathOrStr) -> Path: return Path(p) - class LinuxPaths(AbstractCommonPaths.Linux[Path], Mixin): pass - class WindowsPaths(AbstractCommonPaths.Windows[Path], Mixin): pass - class WinePaths(AbstractCommonPaths.Wine[Path], Mixin): pass + class LinuxPaths(AbstractCommonPaths.Linux[Path], Mixin): + pass + class WindowsPaths(AbstractCommonPaths.Windows[Path], Mixin): + pass + class WinePaths(AbstractCommonPaths.Wine[Path], Mixin): + pass Paths = Union[LinuxPaths, WindowsPaths, WinePaths] NativePaths = Union[LinuxPaths, WindowsPaths] @@ -477,9 +514,12 @@ class CommonSyncPaths: def _path_factory(self, p: PathOrStr) -> SyncPath: return SyncPath(self.op, p) - class LinuxPaths(AbstractCommonPaths.Linux[SyncPath], Mixin): pass - class WindowsPaths(AbstractCommonPaths.Windows[SyncPath], Mixin): pass - class WinePaths(AbstractCommonPaths.Wine[SyncPath], Mixin): pass + class LinuxPaths(AbstractCommonPaths.Linux[SyncPath], Mixin): + pass + class WindowsPaths(AbstractCommonPaths.Windows[SyncPath], Mixin): + pass + class WinePaths(AbstractCommonPaths.Wine[SyncPath], Mixin): + pass Paths = Union[LinuxPaths, WindowsPaths, WinePaths] @@ -498,6 +538,7 @@ class CommonSyncPaths: ### ----------------------------------------------------------------- _AbstractSyncOp = TypeVar("_AbstractSyncOp", bound="AbstractSyncOp") + class AbstractSyncOp(ISyncOp): parent: ISyncContext name: str # Abstract @@ -506,7 +547,7 @@ class AbstractSyncOp(ISyncOp): self.parent = parent # Paths - @CachedProperty + @cached_property def paths(self) -> CommonSyncPaths.Paths: return CommonSyncPaths.create(self, None) @@ -515,7 +556,7 @@ class AbstractSyncOp(ISyncOp): # Properties @SettableCachedProperty - def slug(self): + def slug(self) -> str: """ Name of the destination folder """ return self.name @@ -546,7 +587,7 @@ class AbstractSyncOp(ISyncOp): % (self.name, self.__class__.__name__.replace("SyncOp", ""))) def report_error(self, msg: Iterable[str]): - print("\033[31m"+"\n".join(" " + l for l in msg)+"\033[0m") + print("\033[31m"+"\n".join(" " + ln for ln in msg)+"\033[0m") class SteamSyncOp(AbstractSyncOp): @@ -557,7 +598,7 @@ class SteamSyncOp(AbstractSyncOp): super().__init__(ssync) self.app = app - @CachedProperty + @cached_property def paths(self) -> CommonSyncPaths.Paths: return CommonSyncPaths.create(self, self.app.compat_prefix if self.app.is_proton_app else None) @@ -650,7 +691,7 @@ class WineSyncOp(GenericFoundSyncOp): super().__init__(parent, name, found) self._wine_prefix = prefix - @CachedProperty + @cached_property def paths(self) -> CommonSyncPaths.Paths: return CommonSyncPaths.create(self, self._wine_prefix) @@ -712,12 +753,12 @@ class NoSteamSync(ISyncContext): class SteamSync(NoSteamSync): steam: Steam - def __init__(self, target_path: Path, *, steam_path: Path = None): + def __init__(self, target_path: Path, *, steam_path: Optional[Path] = None): super().__init__(target_path) self.steam = Steam(steam_path) # Get Information - @CachedProperty + @cached_property def apps(self) -> List[App]: return list(self.steam.apps) diff --git a/lib/python/steamutil.py b/lib/python/steamutil.py index 5c98277..ce780cb 100644 --- a/lib/python/steamutil.py +++ b/lib/python/steamutil.py @@ -1,14 +1,17 @@ # Discover Steam install and games # (c) 2020 Taeyeon Mori CC-BY-SA -import sys, os -import re, fnmatch, datetime +import datetime +import fnmatch +import os +import re +import sys from pathlib import Path -from typing import List, Iterable, Dict, Literal, Mapping, Tuple, Callable, Optional, Union, Any, cast, overload +from typing import List, Iterable, Dict, Literal, Mapping, Tuple, Optional, Union, Any, cast, overload from vdfparser import VdfParser, DeepDict, AppInfoFile, LowerCaseNormalizingDict, dd_getpath -from propex import CachedProperty, SettableCachedProperty, DictPathProperty, DictPathRoProperty +from propex import SettableCachedProperty, DictPathProperty, DictPathRoProperty, cached_property _vdf = VdfParser() @@ -37,7 +40,7 @@ class AppInfo: installed = False # AppInfo - @CachedProperty + @cached_property def appinfo(self): # FIXME: properly close AppInfoFile but also deal with always-open appinfo return self.steam.appinfo[self.appid] @@ -60,7 +63,7 @@ class AppInfo: def is_native(self): return sys.platform in self.oslist - @CachedProperty + @cached_property def compat_tool(self) -> dict: mapping = self.steam.compat_tool_mapping appid = str(self.appid) @@ -115,13 +118,13 @@ class App(AppInfo): language = DictPathRoProperty[Optional[str]]("manifest", ("AppState", "UserConfig", "language"), None) install_dir = DictPathRoProperty[Optional[str]]("manifest", ("AppState", "installdir"), None) - @CachedProperty + @cached_property def install_path(self) -> Path: return self.steamapps_path / "common" / self.install_dir # Workshop # TODO - @CachedProperty + @cached_property def workshop_path(self) -> Path: return self.steamapps_path / "workshop" / "content" / str(self.appid) @@ -137,15 +140,15 @@ class App(AppInfo): return uc["platform_override_source"] == "windows" and uc["platform_override_dest"] == "linux" return None - @CachedProperty + @cached_property def compat_path(self) -> Path: return self.steamapps_path / "compatdata" / str(self.appid) - @CachedProperty + @cached_property def compat_prefix(self) -> Path: return self.compat_path / "pfx" - @CachedProperty + @cached_property def compat_drive(self) -> Path: return self.compat_prefix / "drive_c" @@ -176,7 +179,7 @@ class LibraryFolder: return "" % self.path # Paths - @CachedProperty + @cached_property def steamapps_path(self) -> Path: steamapps = self.path / "steamapps" if not steamapps.exists(): @@ -284,7 +287,7 @@ class LoginUser: account_name = DictPathRoProperty[str]("info", ("AccountName",)) username = DictPathRoProperty[str]("info", ("PersonaName",)) - @CachedProperty + @cached_property def userdata_path(self) -> Path: return self.steam.get_userdata_path(self) @@ -292,7 +295,7 @@ class LoginUser: def localconfig_vdf(self) -> Path: return self.userdata_path / "config" / "localconfig.vdf" - @CachedProperty + @cached_property def localconfig(self) -> DeepDict: with open(self.localconfig_vdf, encoding="utf-8") as f: return _vdf.parse(f) @@ -367,7 +370,7 @@ class Steam: return self.root / "config" / "loginusers.vdf" # Users - @CachedProperty + @cached_property def most_recent_user(self) -> Optional[LoginUser]: try: # Apparently, Steam doesn't care about case in the config/*.vdf keys @@ -387,7 +390,7 @@ class Steam: return self.root / "userdata" / str(user_id) # Config - @CachedProperty + @cached_property def config(self) -> DeepDict: with open(self.config_vdf, encoding="utf-8") as f: return _vdf.parse(f) @@ -397,11 +400,11 @@ class Steam: compat_tool_mapping = DictPathProperty[Dict]("config_software_steam", ("CompatToolMapping",)) # AppInfo cache - @CachedProperty + @cached_property def appinfo_vdf(self): return self.root / "appcache" / "appinfo.vdf" - @property + @cached_property def appinfo(self) -> AppInfoFile: return AppInfoFile.open(self.appinfo_vdf) @@ -410,7 +413,7 @@ class Steam: with self.appinfo as info: return info[891390]["appinfo"] - @CachedProperty + @cached_property def compat_tools(self) -> Dict[str, Dict]: tools = {} # Find official proton installs @@ -440,7 +443,7 @@ class Steam: return tools # Game/App Library - @CachedProperty + @cached_property def library_folder_paths(self) -> List[Path]: with open(self.libraryfolders_vdf, encoding="utf-8") as f: data = _vdf_ci.parse(f) @@ -455,7 +458,7 @@ class Steam: raise ValueError("Unknown format of libraryfolders.vdf") return list(gen()) - @CachedProperty + @cached_property def library_folders(self) -> List[LibraryFolder]: return [LibraryFolder(self, self.root)] + [LibraryFolder(self, p) for p in self.library_folder_paths] diff --git a/lib/python/vdfparser.py b/lib/python/vdfparser.py index 7d5c64c..3963cdc 100644 --- a/lib/python/vdfparser.py +++ b/lib/python/vdfparser.py @@ -5,31 +5,44 @@ from __future__ import unicode_literals +import datetime import io import struct -import datetime - -from typing import Any, Dict, Optional, Sequence, Type, TypeVar, Union, Mapping, Tuple, NewType, cast, overload +from typing import (Any, Dict, Iterator, Mapping, NewType, Optional, Sequence, + Tuple, Type, TypeVar, Union, overload) + +try: + from functools import cached_property +except ImportError: + from propex import cached_property +try: + from typing import Self +except ImportError: + try: + from typing_extensions import Self + except ImportError: + Self = Any #### Nested dictionary support # Mypy doesn't support recursive types :( -DeepDict = Mapping[str, Union[Mapping[str, Any], str]] +DeepDict = Mapping[str, Union['DeepDict', str]] +DeepDictPath = Sequence[Union[str, Sequence[str]]] _NoDefault = NewType('_NoDefault', object) _nodefault = _NoDefault(object()) _DefaultT = TypeVar('_DefaultT', DeepDict, str, None) -_DDCastT = TypeVar('_DDCastT', DeepDict, str) +_DDCastT = TypeVar('_DDCastT', DeepDict, str, Dict[str, str]) @overload -def dd_getpath(dct: DeepDict, path: Sequence[str], default: _NoDefault=_nodefault, *, t: None=None) -> Union[DeepDict, str]: ... +def dd_getpath(dct: DeepDict, path: DeepDictPath, default: _NoDefault=_nodefault, *, t: None=None) -> Union[DeepDict, str]: ... @overload -def dd_getpath(dct: DeepDict, path: Sequence[str], default: _DefaultT, *, t: None=None) -> Union[DeepDict, str, _DefaultT]: ... +def dd_getpath(dct: DeepDict, path: DeepDictPath, default: _DefaultT, *, t: None=None) -> Union[DeepDict, str, _DefaultT]: ... @overload -def dd_getpath(dct: DeepDict, path: Sequence[str], default: _NoDefault=_nodefault, *, t: Type[_DDCastT]) -> _DDCastT: ... +def dd_getpath(dct: DeepDict, path: DeepDictPath, default: _NoDefault=_nodefault, *, t: Type[_DDCastT]) -> _DDCastT: ... @overload -def dd_getpath(dct: DeepDict, path: Sequence[str], default: _DefaultT, *, t: Type[_DDCastT]) -> Union[_DDCastT, _DefaultT]: ... +def dd_getpath(dct: DeepDict, path: DeepDictPath, default: _DefaultT, *, t: Type[_DDCastT]) -> Union[_DDCastT, _DefaultT]: ... -def dd_getpath(dct: DeepDict, path: Sequence[str], default: Union[_DefaultT, _NoDefault]=_nodefault, *, t: Optional[Type[_DDCastT]]=None): +def dd_getpath(dct: DeepDict, path: DeepDictPath, default: Union[_DefaultT, _NoDefault]=_nodefault, *, t: Optional[Type[_DDCastT]]=None) -> Any: # type: ignore[misc] """ Retrieve a value from inside a nested dictionary. @param dct The nested mapping @@ -40,7 +53,18 @@ def dd_getpath(dct: DeepDict, path: Sequence[str], default: Union[_DefaultT, _No d: Any = dct try: for pc in path: - d = d[pc] + if isinstance(pc, str): + d = d[pc] + else: + for candidate in pc: + try: + d = d[candidate] + except KeyError: + continue + else: + break + else: + raise KeyError("Dictionary has none of key candidates %s" % pc) # XXX: runtime type check assert (t is None or isinstance(d, t)), f"Expected value at path {path} to be {t}, not {type(d)}" return d @@ -157,7 +181,7 @@ class VdfParser: escape = True elif c == self.begin_char: finish() - if len(tokens) / 2 == len(tokens) // 2 and (self.strict or self.factory == dict): + if len(tokens) / 2 == len(tokens) // 2 and (self.strict or self.factory is dict): raise ValueError("Sub-dictionary cannot be a key") tokens.append(self._parse_map(fd, True)) elif c == self.end_char: @@ -339,17 +363,15 @@ class AppInfoFile: S_INT4 = struct.Struct(" "AppInfoFile": + def open(cls, filename) -> Self: return cls(open(filename, "br"), close=True) def __init__(self, file, bvdf_parser=None, close=True): self.file = file self.parser = bvdf_parser if bvdf_parser is not None else BinaryVdfParser() self._close_file = close - self._universe = None - self._apps = None - def _load_map(self, offset: int) -> DeepDict: + def _load_offset(self, offset: int) -> DeepDict: self.file.seek(offset, io.SEEK_SET) return self.parser.parse(self.file) @@ -371,20 +393,13 @@ class AppInfoFile: def __getitem__(self, key): if self._data is None: - self._data = self.appinfo._load_map(self.offset) + self._data = self.appinfo._load_offset(self.offset) return self._data[key] - def __getattr__(self, attr): - if attr in dir(dict): - if self._data is None: - self._data = self.appinfo._load_map(self.offset) - return getattr(self._data, attr) - raise AttributeError(attr) - @property - def dict(self): + def data(self): if self._data is None: - self._data = self.appinfo._load_map(self.offset) + self._data = self.appinfo._load_offset(self.offset) return self._data def _read_exactly(self, s: int) -> bytes: @@ -396,7 +411,7 @@ class AppInfoFile: def _read_int(self) -> int: return self.S_INT4.unpack(self._read_exactly(self.S_INT4.size))[0] - def _load(self): + def _load_index(self) -> Tuple[int, Dict[int, App]]: magic = self._read_exactly(4) if magic == b"\x28\x44\x56\x07": header_struct = self.S_APP_HEADER_V2 @@ -405,8 +420,8 @@ class AppInfoFile: else: raise ValueError("Unknown appinfo.vdf magic") - self._universe = self._read_int() - self._apps = {} + universe = self._read_int() + apps = {} buffer = bytearray(header_struct.size) @@ -420,45 +435,34 @@ class AppInfoFile: appid, size, *_ = struct if appid == 0: - return # Done + break # Done elif read < header_struct.size: raise EOFError() - self._apps[appid] = self.App(self, self.file.tell(), struct) + apps[appid] = self.App(self, self.file.tell(), struct) self.file.seek(size - (header_struct.size - 8), io.SEEK_CUR) - @property - def universe(self): - if self._universe is None: - self._load() - return self._universe + return universe, apps - def __getattr__(self, attr): - if attr in dir(dict): - if self._apps is None: - self._load() - return getattr(self._apps, attr) - raise AttributeError(attr) + @cached_property + def universe(self) -> int: + universe, self.apps = self._load_index() + return universe - def __getitem__(self, key): - if self._apps is None: - self._load() - return self._apps[key] + @cached_property + def apps(self) -> Dict[int, App]: + self.universe, apps = self._load_index() + return apps - def __iter__(self): - if self._apps is None: - self._load() - return iter(self._apps) + def __getitem__(self, key: int) -> App: + return self.apps[key] - @property - def dict(self): - if self._apps is None: - self._load() - return self._apps + def __iter__(self) -> Iterator[App]: + return iter(self.apps.values()) # Cleanup - def __enter__(self): + def __enter__(self) -> Self: return self def __exit__(self, exc, tp, tb):