diff --git a/bin/protontool b/bin/protontool new file mode 100755 index 0000000..24a379c --- /dev/null +++ b/bin/protontool @@ -0,0 +1,364 @@ +#!/usr/bin/env python3 +# Protontool +# Make it easier to deal with proton games +# (c) 2020 Taeyeon Mori + + +import steamutil +import argparse +import sys, os +from pathlib import Path + + +def print_error(*msg): + print("\033[1;31mError\033[22m:", *msg, "\033[0m") + +def print_info(*msg): + print("\033[1;33mInfo\033[0m:", *msg) + +def print_warn(*msg): + print("\033[1;33mWarning\033[22m", *msg, "\033[0m") + +def format_size(bytes): + units = ["B", "kiB", "MiB", "GiB"] + unit = 0 + while bytes > 750: + bytes /= 1024 + unit += 1 + return "%3.2f %s" % (bytes, units[unit]) + + +def die(int, msg): + print_error(msg) + sys.exit(int) + + +class ProtonTool: + steam: steamutil.Steam + app: steamutil.App + + def __init__(self, args): + self.args = args + self.steam = steamutil.Steam(install_path=args.steam_path) + + if not isinstance(args.game, str): + self.app = args.game + if args.game.isdigit(): + self.app = self.steam.get_app(int(args.game), installed=False) + if self.app is None: + die(1, "Could not find App #%s." % args.game) + else: + apps = list(self.steam.find_apps(args.game)) + if len(apps) > 1: + print_error("Found more than one installed game matching /%s/, please be more specific" % args.game) + for app in apps: + print("\t", app) + sys.exit(2) + elif not apps: + die(1, "Could not find any App matching /%s/. Make sure it is installed." % args.game) + self.app = apps[0] + + print_info("Found game: %s" % self.app.name) + + @steamutil.CachedProperty + def user(self) -> steamutil.LoginUser: + if "userid" in self.args: + die(255, "userid not implemented") + else: + user = self.steam.most_recent_user + if not user: + die(10, "Could not detect Steam user. Have you logged in on this Computer before?") + print_info("Using steam profile: %s (%s)" % (user.username, user.account_name)) + return user + + class ListAction(argparse.Action): + def __init__(self, option_strings, dest, help=None): + super().__init__(option_strings, dest, nargs=0, help=help) + + def __call__(self, parser, ns, values, opt): + steam = steamutil.Steam(install_path=ns.steam_path if "steam_path" in ns else None) + for name, info in steam.compat_tools.items(): + proton, path = ProtonTool._format_tool_info(steam, {"name": name}, info) + print(proton) + print(" ", path) + parser.exit(0) + + + @classmethod + def parse_args(cls, args, prog=None): + parser = argparse.ArgumentParser(prog=prog) + parser.add_argument("game", help="Can be either the appid or a name pattern to look for") + parser.add_argument("--steam-path", help="Steam install path", default=None) + parser.add_argument("-l", "--list-tools", action=cls.ListAction, help="List all known compatibility tools") + + commands = parser.add_subparsers() + + info = commands.add_parser("info", description="Get information about a game") + info.set_defaults(cmd=cls.cmd_info) + + exec = commands.add_parser("exec", description="Run a command (e.g. winetricks) with the correct environment") + exec.set_defaults(cmd=cls.cmd_exec) + exec.add_argument("cmdline", nargs="+", help="Command and arguments to run") + + run = commands.add_parser("run", description="Run game") + run.set_defaults(cmd=cls.cmd_run) + run.add_argument("--waitforexit", action="store_true", help="use waitforexitandrun action") + run.add_argument("--tool", help="Override compatibility tool selection") + run.add_argument("--config", help="Override compatibility tool config") + + appinfo = commands.add_parser("appinfo", description="Dump the Steam product info") + appinfo.set_defaults(cmd=cls.cmd_appinfo) + appinfo.add_argument("--json", action="store_true", help="Output JSON") + + proton = commands.add_parser("proton", description="Run configured proton with custom arguments") + proton.set_defaults(cmd=cls.cmd_proton, cmd_proton="proton") + proton.add_argument("args", nargs="+", help="Proton arguments") + proton.add_argument("--tool", help="Override compatibility tool selection") + proton.add_argument("--config", help="Override compatibility tool config") + + wine = commands.add_parser("wine", description="Run appropriate wine binary") + wine.set_defaults(cmd=cls.cmd_proton, cmd_proton="wine") + wine.add_argument("args", nargs="+", help="Wine arguments") + wine.add_argument("--tool", help="Override compatibility tool selection") + wine.add_argument("--config", help="Override compatibility tool config") + + #enter = commands.add_parser("wine", description="Run the correct wine in the correct prefix") + #enter.add_argument("command", nargs="+") + + #config = commands.add_parser("config") + + return parser.parse_args(args) + + @steamutil.CachedProperty + def appinfo(self): + return self.steam.appinfo + + @property + def compat_tools(self) -> dict: + self.steam.steamplay_manifest = self.appinfo[891390]["appinfo"] + return self.steam.compat_tools + + @property + def compat_tool(self) -> dict: + return self.app.compat_tool + + @classmethod + def _format_tool_info(cls, steam, tool, toolinfo=None) -> (str, str): + if toolinfo is None: + toolinfo = steam.compat_tools.get(tool["name"], None) + + proton = [] + + if not toolinfo: + proton.append("\033[31mNot found:\033[0m") + + if toolinfo and "display_name" in toolinfo and toolinfo["display_name"] != tool["name"]: + proton.append("\033[1m%s\033[0m (%s)" % (toolinfo["display_name"], tool["name"])) + else: + proton.append("'\033[1m%s\033[0m'" % tool["name"]) + + if toolinfo and "appid" in toolinfo: + proton.append("#%s" % toolinfo["appid"]) + if "source" in tool: + if tool["source"] == "user": + proton.append("[per-game override]") + elif tool["source"] == "valve": + proton.append("[official]") + elif tool["source"] == "default": + proton.append("[user default]") + if "config" in tool and tool["config"]: + proton.append("{%s}" % tool["config"]) + if toolinfo and "install_path" in toolinfo: + proton_path = toolinfo["install_path"] + if not os.path.exists(toolinfo["install_path"]): + proton_path += " <>" + else: + proton_path = "<>" + proton = " ".join(proton) + + return proton, proton_path + + def cmd_info(self): + uc = self.user.get_app_config(self.app) + + if self.app.installed: + installed = "[%s] %s" % (self.app.language, format_size(self.app.declared_install_size)) + + if self.app.is_proton_app: + # Find out version + tool = self.compat_tool + proton, proton_path = self._format_tool_info(self.steam, tool) + else: + proton = "No" + proton_path = None + else: + installed = "No" + + print("\033[1;35mName \033[0;1m:", self.app.name) + print("\033[1;35mAppID \033[0m:", self.app.appid) + print("\033[1;35mInstalled \033[0m:", installed) + if self.app.installed: + print(" @", self.app.install_path) + print("\033[1;35mLaunchOpt.\033[0m:", uc.launch_options) + print("\033[1;35mProton \033[0m:", proton) + if proton_path: + print(" @", proton_path) + if self.app.is_proton_app: + print("\033[1;35mProtonDrv.\033[0m:", self.app.compat_drive) + + def setup_proton_env(self, tool, wine=False): + info = self.compat_tools[tool["name"]] + if not info: + die(20, "Compatibility tool '%s' could not be found. Is it installed?" % tool["name"]) + os.environ["STEAM_COMPAT_CLIENT_INSTALL_PATH"] = str(self.steam.root) + os.environ["STEAM_COMPAT_DATA_PATH"] = str(self.app.compat_path) + os.environ["STEAM_COMPAT_CONFIG"] = tool.get("config", "") + os.environ["SteamAppId"] = str(self.app.appid) + os.environ["SteamGameId"] = str(self.app.appid) + os.environ["WINEPREFIX"] = str(self.app.compat_path / "pfx") + proton_dir = info["install_path"] + os.environ["WINEROOT"] = str(proton_dir / "dist") + if wine: + # FIXME: correctly detect WINEESYNC + os.environ["WINEESYNC"] = "1" + os.environ["LD_LIBRARY_PATH"] = ":".join(x for x in [ + str(proton_dir / "dist/lib64"), + str(proton_dir / "dist/lib"), + os.environ.get("LD_LIBRARY_PATH", None)] if x) + os.environ["PATH"] = ":".join((str(Path(info["install_path"]) / "dist" / "bin"), os.environ["PATH"])) + + def cmd_exec(self): + tool = self.compat_tool + + self.setup_proton_env(tool, wine=True) + + return os.execvp(self.args.cmdline[0], self.args.cmdline) + + def cmd_run(self): + if not self.app.installed: + print_error("App is not installed.") + return 50 + + if self.args.tool: + tool = {"name": self.args.tool} + elif self.app.is_proton_app: + tool = self.compat_tool + else: + tool = None + + if tool is not None: + if self.args.config: + tool["config"] = self.args.config + + toolinfo = self.compat_tools[tool["name"]] + + info_tool, info_path = self._format_tool_info(self.steam, tool, toolinfo) + + print_info("Using tool", info_tool) + + if self.args.waitforexit: + verb = "commandline_waitforexitandrun" + else: + verb = "commandline" + + with open(toolinfo["install_path"] / "toolmanifest.vdf") as f: + mfst = steamutil._vdf.parse(f) + cmdline = mfst["manifest"][verb] + + path, *args = cmdline.split() + path = str(toolinfo["install_path"]) + path + + self.setup_proton_env(tool) + + for cfg in self.app.launch_configs: + # TODO: Handle multiple + if cfg["type"] == "default" and toolinfo.get("from_oslist", "windows") in cfg["config"]["oslist"]: + argv = [path, *args, str(self.app.install_path / cfg["executable"])] + print_info("Executing", " ".join(argv)) + return os.execv(path, argv) + + else: + # TODO windows support? + for cfg in self.app.launch_configs: + if cfg["type"] == "default" and "linux" in cfg["config"]["oslist"]: + return os.execv(self.app.install_path / cfg["executable"], [cfg["executable"]]) + + def cmd_proton(self): + # proton and wine subcommands: + # Run proton respective wine with arguments + if not self.app.installed or not self.app.is_proton_app: + print_error("App not installed or not a Proton app") + return 51 + + if self.args.tool: + tool = {"name": self.args.tool, "source": "args"} + else: + tool = self.compat_tool + + if self.args.config: + tool["config"] = self.args.config + + try: + toolinfo = self.compat_tools[tool["name"]] + except KeyError: + print_error("Tool not found: '%s'" % tool["name"]) + + info_tool, _ = self._format_tool_info(self.steam, tool, toolinfo) + + print_info("Using tool", info_tool) + + if self.args.cmd_proton == "proton": + path = str(toolinfo["install_path"] / "proton") + argv = ["proton"] + self.args.args + elif self.args.cmd_proton == "wine": + path = str(toolinfo["install_path"] / "dist/bin/wine") + argv = [path] + self.args.args + else: + print_error("Unknown cmd_proton verb:", self.args.cmd_proton) + return 111 + + if not os.path.exists(path): + print_error("No such file:", path) + return 53 + + self.setup_proton_env(tool, wine=self.args.cmd_proton=="wine") + + print_info("Running", path, *argv[1:]) + + return os.execv(path, argv) + + def cmd_appinfo(self): + info = self.appinfo.get(self.app.appid, {}) + if self.args.json: + import json + json.dump(info.dict, sys.stdout) + else: + print("AppId:", info.id) + print("AppState:", info.state) + print("Last Update:", info.last_update) + + print() + + self._pretty_dump(info.dict) + + def _pretty_dump(self, d: dict, level=0): + for key, value in d.items(): + print(" " * level, "\033[1m", key, "\033[0m", end="", sep="") + if isinstance(value, dict): + print(":") + self._pretty_dump(value, level+1) + else: + print(" = ", value, sep="") + + @classmethod + def main(cls, argv: [str]) -> int: + args = cls.parse_args(argv[1:], argv[0]) + self = cls(args) + if "cmd" not in args or not args.cmd: + print_error("No command given. see %s --help" % argv[0]) + return 255 + return args.cmd(self) + + +if __name__ == "__main__": + sys.exit(ProtonTool.main(sys.argv)) diff --git a/lib/python/steamutil.py b/lib/python/steamutil.py index a4b8409..10626fc 100644 --- a/lib/python/steamutil.py +++ b/lib/python/steamutil.py @@ -7,7 +7,7 @@ import re, fnmatch, datetime from pathlib import Path from typing import List, Iterable, Dict, Tuple, Callable, Optional, Union -from vdfparser import VdfParser, DeepDict +from vdfparser import VdfParser, DeepDict, AppInfoFile class CachedProperty: @@ -30,13 +30,15 @@ class CachedProperty: class DictPathRoProperty: - __slots__ = "property", "path", "default" + __slots__ = "property", "path", "default", "type" _nodefault = object() + _id = lambda x: x - def __init__(self, property: str, path: Tuple[str], default=_nodefault): + def __init__(self, property: str, path: Tuple[str], default=_nodefault, type=_id): self.property = property self.path = path self.default = default + self.type = type def __get__(self, obj, cls): if obj is None: @@ -50,7 +52,7 @@ class DictPathRoProperty: return self.default raise else: - return d + return self.type(d) class DictPathProperty(DictPathRoProperty): @@ -83,7 +85,65 @@ class MalformedManifestError(Exception): return self.args[1] -class App: +class AppInfo: + steam: 'Steam' + appid: int + + def __init__(self, steam, appid, *, appinfo_data=None): + self.steam = steam + self.appid = appid + if appinfo_data is not None: + self.__dict__["appinfo"] = appinfo_data + + installed = False + + # AppInfo + @CachedProperty + def appinfo(self): + # FIXME: properly close AppInfoFile but also deal with always-open appinfo + return self.steam.appinfo[self.appid] + + @property + def launch_configs(self): + return self.appinfo["appinfo"]["config"]["launch"].values() + + name = DictPathRoProperty("appinfo", ("appinfo", "common", "name")) + oslist = DictPathRoProperty("appinfo", ("appinfo", "common", "oslist"), type=lambda s: s.split(",")) + install_dir = DictPathRoProperty("appinfo", ("appinfo", "config", "installdir")) + languages = DictPathRoProperty("appinfo", ("appinfo", "common", "supported_languages")) + gameid = DictPathRoProperty("appinfo", ("appinfo", "common", "gameid"), type=int) + + # Misc. + def get_userdata_path(self, user_id: Union[int, 'LoginUser']) -> Path: + return self.steam.get_userdata_path(user_id) / str(self.appid) + + @property + def is_native(self): + return sys.platform in self.oslist + + @CachedProperty + def compat_tool(self) -> dict: + mapping = self.steam.compat_tool_mapping + appid = str(self.appid) + # User override + if appid in mapping and mapping[appid]["name"]: + tool = dict(mapping[appid]) + tool["source"] = "user" + return tool + # Steam play manifest + manifest = self.steam.steamplay_manifest["extended"]["app_mappings"] + if appid in manifest: + tool = dict(manifest[appid]) + tool["name"] = tool["tool"] + tool["source"] = "valve" + return tool + # User default + tool = dict(mapping["0"]) + tool["source"] = "default" + return tool + + +class App(AppInfo): steam: 'Steam' library_folder: 'LibraryFolder' steamapps_path: Path @@ -91,7 +151,6 @@ class App: manifest: DeepDict def __init__(self, libfolder, manifest_path: Path, *, manifest_data=None): - self.steam = libfolder.steam self.library_folder = libfolder self.steamapps_path = libfolder.steamapps_path @@ -105,14 +164,14 @@ class App: if "AppState" not in self.manifest: raise MalformedManifestError("App manifest doesn't have AppState key", self.manifest_path) + super().__init__(libfolder.steam, int(self.manifest["AppState"]["appid"])) + + installed = True + def __repr__(self): return "" % (self.appid, self.name, self.install_path) # Basic info - @property - def appid(self) -> int: - return int(self.manifest["AppState"]["appid"]) - name = DictPathRoProperty("manifest", ("AppState", "name")) language = DictPathRoProperty("manifest", ("AppState", "UserConfig", "language"), None) install_dir = DictPathRoProperty("manifest", ("AppState", "installdir")) @@ -121,9 +180,6 @@ class App: def install_path(self) -> Path: return self.steamapps_path / "common" / self.install_dir - def get_userdata_path(self, user_id: Union[int, 'LoginUser']) -> Path: - return self.steam.get_userdata_path(user_id) / str(self.appid) - # Workshop # TODO @CachedProperty @@ -131,6 +187,18 @@ class App: return self.steamapps_path / "workshop" / "content" / str(self.appid) # Steam Play info + @property + def is_steam_play(self): + uc = self.manifest["AppState"].get("UserConfig") + if uc and "platform_override_source" in uc: + return uc["platform_override_source"] + + @property + def is_proton_app(self): + uc = self.manifest["AppState"].get("UserConfig") + if uc and "platform_override_source" in uc: + return uc["platform_override_source"] == "windows" and uc["platform_override_dest"] == "linux" + @CachedProperty def compat_path(self) -> Path: return self.steamapps_path / "compatdata" / str(self.appid) @@ -140,7 +208,7 @@ class App: return self.compat_path / "pfx" / "drive_c" # Install size - declared_install_size = DictPathRoProperty("manifest", ("AppState", "SizeOnDisk"), 0) + declared_install_size = DictPathRoProperty("manifest", ("AppState", "SizeOnDisk"), 0, type=int) def compute_install_size(self) -> int: def sum_size(p: Path): @@ -248,7 +316,7 @@ class UserAppConfig: def playtime_two_weeks(self) -> datetime.time: t = int(self._data.get("Playtime2wks", "0")) return datetime.time(t // 60, t % 60) - + launch_options = DictPathProperty("_data", ("LaunchOptions",), None) @@ -371,6 +439,59 @@ class Steam: if isinstance(user_id, LoginUser): user_id = user_id.account_id return self.root / "userdata" / str(user_id) + + # Config + @CachedProperty + def config(self) -> DeepDict: + with open(self.config_vdf) as f: + return _vdf.parse(f) + + config_install_store = DictPathProperty("config", ("InstallConfigStore",)) + config_software_steam = DictPathProperty("config", ("InstallConfigStore", "Software", "Valve", "Steam")) + compat_tool_mapping = DictPathProperty("config_software_steam", ("CompatToolMapping",)) + + # AppInfo cache + @CachedProperty + def appinfo_vdf(self): + return self.root / "appcache" / "appinfo.vdf" + + @property + def appinfo(self) -> AppInfoFile: + return AppInfoFile.open(self.appinfo_vdf) + + @CachedProperty + def steamplay_manifest(self) -> DeepDict: + with self.appinfo as info: + return info[891390]["appinfo"] + + @CachedProperty + def compat_tools(self) -> {str:{}}: + tools = {} + # Find official proton installs + valve = self.steamplay_manifest["extended"]["compat_tools"] + for name, t in valve.items(): + app = self.get_app(t["appid"]) + if app: + tool = dict(t) + tool["install_path"] = app.install_path + tools[name] = tool + # Find custom compat tools + manifests = [] + for p in (self.root / "compatibilitytools.d").iterdir(): + if p.suffix == ".vdf": + manifests.append(p) + elif p.is_dir(): + c = p / "compatibilitytool.vdf" + if c.exists(): + manifests.append(c) + for mfst_path in manifests: + with open(mfst_path) as f: + mfst = _vdf.parse(f) + for name, t in mfst["compatibilitytools"]["compat_tools"].items(): + # TODO warn duplicate name + t["install_path"] = mfst_path.parent / t["install_path"] + tools[name] = t + return tools # Game/App Library @CachedProperty @@ -387,11 +508,15 @@ class Steam: for lf in self.library_folders: #pylint:disable=not-an-iterable yield from lf.apps - def get_app(self, id: int) -> Optional[App]: + def get_app(self, id: int, installed=True) -> Optional[App]: for lf in self.library_folders: #pylint:disable=not-an-iterable app = lf.get_app(id) if app is not None: return app + if not installed: + for appid, appinfo in self.appinfo.items(): + if appid == id: + return AppInfo(self, appid, appinfo_data=appinfo) def find_apps(self, pattern: str) -> Iterable[App]: for lf in self.library_folders: #pylint:disable=not-an-iterable diff --git a/lib/python/vdfparser.py b/lib/python/vdfparser.py index 8624a6d..2b4cb0f 100644 --- a/lib/python/vdfparser.py +++ b/lib/python/vdfparser.py @@ -6,10 +6,13 @@ from __future__ import unicode_literals import io +import struct +import collections +import datetime -from typing import Dict, Union +from typing import Dict, Union, Mapping, Tuple -DeepDict = Dict[str, Union[str, "DeepDict"]] +DeepDict = Mapping[str, Union[str, "DeepDict"]] class VdfParser: @@ -176,3 +179,229 @@ class VdfParser: if self.encoding: raise NotImplementedError("Writing in binary mode is not implemented yet.") # TODO (maybe) self._write_map(fd, dictionary, 0 if pretty else None) + + +class BinaryVdfParser: + # Type codes + T_SKEY = b'\x00' # Subkey + T_CSTR = b'\x01' # 0-delimited string + T_INT4 = b'\x02' # 32-bit int + T_FLT4 = b'\x03' # 32-bit float + T_PNTR = b'\x04' # 32-bit pointer + T_WSTR = b'\x05' # 0-delimited wide string + T_COLR = b'\x06' # 32-bit color + T_UNT8 = b'\x07' # 64-bit unsigned int + T_END = b'\x08' # End of subkey + T_INT8 = b'\x0A' # 64-bit signed int + T_END2 = b'\x0B' # Alternative end of subkey tag + + # Unpack binary types + S_INT4 = struct.Struct(" bytes: + pieces = [] + buf = bytearray(64) + end = -1 + + while end == -1: + read = fd.readinto(buf) + + if not read: + raise EOFError() + + end = buf.find(delim, 0, read) + pieces.append(bytes(buf[:read if end < 0 else end])) + + fd.seek(end - read + len(delim), io.SEEK_CUR) + + return b"".join(pieces) + + @staticmethod + def _read_struct(fd: io.BufferedIOBase, s: struct.Struct): + return s.unpack(fd.read(s.size)) + + def _read_cstring(self, fd: io.BufferedIOBase) -> str: + return self._read_until(fd, b'\0').decode("utf-8", "replace") + + def _read_wstring(self, fd: io.BufferedIOBase) -> str: + return self._read_until(fd, b'\0\0').decode("utf-16") + + def _read_map(self, fd: io.BufferedIOBase) -> DeepDict: + map = self.factory() + + while True: + t = fd.read(1) + + if not len(t): + raise EOFError() + + if t in (self.T_END, self.T_END2): + return map + + key, value = self._read_item(fd, t) + map[key] = value + + def _read_item(self, fd: io.BufferedIOBase, t: int) -> (str, DeepDict): + key = self._read_cstring(fd) + + if t == self.T_SKEY: + return key, self._read_map(fd) + elif t == self.T_CSTR: + return key, self._read_cstring(fd) + elif t == self.T_WSTR: + return key, self._read_wstring(fd) + elif t in (self.T_INT4, self.T_PNTR, self.T_COLR): + return key, self._read_struct(fd, self.S_INT4)[0] + elif t == self.T_UNT8: + return key, self._read_struct(fd, self.S_UNT8)[0] + elif t == self.T_INT8: + return key, self._read_struct(fd, self.S_INT8)[0] + elif t == self.T_FLT4: + return key, self._read_struct(fd, self.S_FLT4)[0] + else: + raise ValueError("Unknown data type", fd.tell(), t) + + def parse(self, fd: io.BufferedIOBase) -> DeepDict: + return self._read_map(fd) + + def parse_bytes(self, data: bytes) -> DeepDict: + with io.BytesIO(data) as fd: + return self.parse(fd) + + +class AppInfoFile: + S_APP_HEADER = struct.Struct(" "AppInfoFile": + return cls(open(filename, "br"), close=True) + + def __init__(self, file, bvdf_parser=None, close=True): + self.file = file + self.parser = bvdf_parser if bvdf_parser is not None else BinaryVdfParser() + self._close_file = close + self._universe = None + self._apps = None + + def _load_map(self, offset: int) -> DeepDict: + self.file.seek(offset, io.SEEK_SET) + return self.parser.parse(self.file) + + class App: + __slots__ = "appinfo", "offset", "id", "size", "state", "last_update", "token", "hash", "changeset", "_data" + + def __init__(self, appinfo, offset, struct): + self.id = struct[0] + self.size = struct[1] + self.state = struct[2] + self.last_update = datetime.datetime.fromtimestamp(struct[3]) + self.token = struct[4] + self.hash = struct[5] + self.changeset = struct[6] + self.appinfo = appinfo + self.offset = offset + self._data = None + + def __getitem__(self, key): + if self._data is None: + self._data = self.appinfo._load_map(self.offset) + return self._data[key] + + def __getattr__(self, attr): + if attr in dir(dict): + if self._data is None: + self._data = self.appinfo._load_map(self.offset) + return getattr(self._data, attr) + raise AttributeError(attr) + + @property + def dict(self): + if self._data is None: + self._data = self.appinfo._load_map(self.offset) + return self._data + + def _read_exactly(self, s: int) -> bytes: + cs = self.file.read(s) + if len(cs) < s: + raise EOFError() + return cs + + def _read_int(self) -> int: + return self.S_INT4.unpack(self._read_exactly(self.S_INT4.size))[0] + + def _load(self): + magic = self._read_exactly(4) + if magic != b"\x27\x44\x56\x07": + raise ValueError("Wrong appinfo.vdf magic") + + self._universe = self._read_int() + self._apps = {} + + buffer = bytearray(self.S_APP_HEADER.size) + + while True: + read = self.file.readinto(buffer) + + if read < 4: + raise EOFError() + + struct = self.S_APP_HEADER.unpack(buffer) + appid, size, *_ = struct + + if appid == 0: + return # Done + elif read < self.S_APP_HEADER.size: + raise EOFError() + + self._apps[appid] = self.App(self, self.file.tell(), struct) + + self.file.seek(size - (self.S_APP_HEADER.size - 8), io.SEEK_CUR) + + @property + def universe(self): + if self._universe is None: + self._load() + return self._universe + + def __getattr__(self, attr): + if attr in dir(dict): + if self._apps is None: + self._load() + return getattr(self._apps, attr) + raise AttributeError(attr) + + def __getitem__(self, key): + if self._apps is None: + self._load() + return self._apps[key] + + def __iter__(self): + if self._apps is None: + self._load() + return iter(self._apps) + + @property + def dict(self): + if self._apps is None: + self._load() + return self._apps + + # Cleanup + def __enter__(self): + return self + + def __exit__(self, exc, tp, tb): + self.close() + + def close(self): + if self._close_file: + self.file.close() +