python/steamutil,protontool: Add protontool.

A tool to inspect local steam metadata (like app manifests)
Extends steamutil python library significantly
master
Taeyeon Mori 4 years ago
parent 908fe47ff5
commit 0df69011e4
  1. 364
      bin/protontool
  2. 155
      lib/python/steamutil.py
  3. 233
      lib/python/vdfparser.py

@ -0,0 +1,364 @@
#!/usr/bin/env python3
# Protontool
# Make it easier to deal with proton games
# (c) 2020 Taeyeon Mori
import steamutil
import argparse
import sys, os
from pathlib import Path
def print_error(*msg):
print("\033[1;31mError\033[22m:", *msg, "\033[0m")
def print_info(*msg):
print("\033[1;33mInfo\033[0m:", *msg)
def print_warn(*msg):
print("\033[1;33mWarning\033[22m", *msg, "\033[0m")
def format_size(bytes):
units = ["B", "kiB", "MiB", "GiB"]
unit = 0
while bytes > 750:
bytes /= 1024
unit += 1
return "%3.2f %s" % (bytes, units[unit])
def die(int, msg):
print_error(msg)
sys.exit(int)
class ProtonTool:
steam: steamutil.Steam
app: steamutil.App
def __init__(self, args):
self.args = args
self.steam = steamutil.Steam(install_path=args.steam_path)
if not isinstance(args.game, str):
self.app = args.game
if args.game.isdigit():
self.app = self.steam.get_app(int(args.game), installed=False)
if self.app is None:
die(1, "Could not find App #%s." % args.game)
else:
apps = list(self.steam.find_apps(args.game))
if len(apps) > 1:
print_error("Found more than one installed game matching /%s/, please be more specific" % args.game)
for app in apps:
print("\t", app)
sys.exit(2)
elif not apps:
die(1, "Could not find any App matching /%s/. Make sure it is installed." % args.game)
self.app = apps[0]
print_info("Found game: %s" % self.app.name)
@steamutil.CachedProperty
def user(self) -> steamutil.LoginUser:
if "userid" in self.args:
die(255, "userid not implemented")
else:
user = self.steam.most_recent_user
if not user:
die(10, "Could not detect Steam user. Have you logged in on this Computer before?")
print_info("Using steam profile: %s (%s)" % (user.username, user.account_name))
return user
class ListAction(argparse.Action):
def __init__(self, option_strings, dest, help=None):
super().__init__(option_strings, dest, nargs=0, help=help)
def __call__(self, parser, ns, values, opt):
steam = steamutil.Steam(install_path=ns.steam_path if "steam_path" in ns else None)
for name, info in steam.compat_tools.items():
proton, path = ProtonTool._format_tool_info(steam, {"name": name}, info)
print(proton)
print(" ", path)
parser.exit(0)
@classmethod
def parse_args(cls, args, prog=None):
parser = argparse.ArgumentParser(prog=prog)
parser.add_argument("game", help="Can be either the appid or a name pattern to look for")
parser.add_argument("--steam-path", help="Steam install path", default=None)
parser.add_argument("-l", "--list-tools", action=cls.ListAction, help="List all known compatibility tools")
commands = parser.add_subparsers()
info = commands.add_parser("info", description="Get information about a game")
info.set_defaults(cmd=cls.cmd_info)
exec = commands.add_parser("exec", description="Run a command (e.g. winetricks) with the correct environment")
exec.set_defaults(cmd=cls.cmd_exec)
exec.add_argument("cmdline", nargs="+", help="Command and arguments to run")
run = commands.add_parser("run", description="Run game")
run.set_defaults(cmd=cls.cmd_run)
run.add_argument("--waitforexit", action="store_true", help="use waitforexitandrun action")
run.add_argument("--tool", help="Override compatibility tool selection")
run.add_argument("--config", help="Override compatibility tool config")
appinfo = commands.add_parser("appinfo", description="Dump the Steam product info")
appinfo.set_defaults(cmd=cls.cmd_appinfo)
appinfo.add_argument("--json", action="store_true", help="Output JSON")
proton = commands.add_parser("proton", description="Run configured proton with custom arguments")
proton.set_defaults(cmd=cls.cmd_proton, cmd_proton="proton")
proton.add_argument("args", nargs="+", help="Proton arguments")
proton.add_argument("--tool", help="Override compatibility tool selection")
proton.add_argument("--config", help="Override compatibility tool config")
wine = commands.add_parser("wine", description="Run appropriate wine binary")
wine.set_defaults(cmd=cls.cmd_proton, cmd_proton="wine")
wine.add_argument("args", nargs="+", help="Wine arguments")
wine.add_argument("--tool", help="Override compatibility tool selection")
wine.add_argument("--config", help="Override compatibility tool config")
#enter = commands.add_parser("wine", description="Run the correct wine in the correct prefix")
#enter.add_argument("command", nargs="+")
#config = commands.add_parser("config")
return parser.parse_args(args)
@steamutil.CachedProperty
def appinfo(self):
return self.steam.appinfo
@property
def compat_tools(self) -> dict:
self.steam.steamplay_manifest = self.appinfo[891390]["appinfo"]
return self.steam.compat_tools
@property
def compat_tool(self) -> dict:
return self.app.compat_tool
@classmethod
def _format_tool_info(cls, steam, tool, toolinfo=None) -> (str, str):
if toolinfo is None:
toolinfo = steam.compat_tools.get(tool["name"], None)
proton = []
if not toolinfo:
proton.append("\033[31mNot found:\033[0m")
if toolinfo and "display_name" in toolinfo and toolinfo["display_name"] != tool["name"]:
proton.append("\033[1m%s\033[0m (%s)" % (toolinfo["display_name"], tool["name"]))
else:
proton.append("'\033[1m%s\033[0m'" % tool["name"])
if toolinfo and "appid" in toolinfo:
proton.append("#%s" % toolinfo["appid"])
if "source" in tool:
if tool["source"] == "user":
proton.append("[per-game override]")
elif tool["source"] == "valve":
proton.append("[official]")
elif tool["source"] == "default":
proton.append("[user default]")
if "config" in tool and tool["config"]:
proton.append("{%s}" % tool["config"])
if toolinfo and "install_path" in toolinfo:
proton_path = toolinfo["install_path"]
if not os.path.exists(toolinfo["install_path"]):
proton_path += " <<Not Accessible>>"
else:
proton_path = "<<Not Installed>>"
proton = " ".join(proton)
return proton, proton_path
def cmd_info(self):
uc = self.user.get_app_config(self.app)
if self.app.installed:
installed = "[%s] %s" % (self.app.language, format_size(self.app.declared_install_size))
if self.app.is_proton_app:
# Find out version
tool = self.compat_tool
proton, proton_path = self._format_tool_info(self.steam, tool)
else:
proton = "No"
proton_path = None
else:
installed = "No"
print("\033[1;35mName \033[0;1m:", self.app.name)
print("\033[1;35mAppID \033[0m:", self.app.appid)
print("\033[1;35mInstalled \033[0m:", installed)
if self.app.installed:
print(" @", self.app.install_path)
print("\033[1;35mLaunchOpt.\033[0m:", uc.launch_options)
print("\033[1;35mProton \033[0m:", proton)
if proton_path:
print(" @", proton_path)
if self.app.is_proton_app:
print("\033[1;35mProtonDrv.\033[0m:", self.app.compat_drive)
def setup_proton_env(self, tool, wine=False):
info = self.compat_tools[tool["name"]]
if not info:
die(20, "Compatibility tool '%s' could not be found. Is it installed?" % tool["name"])
os.environ["STEAM_COMPAT_CLIENT_INSTALL_PATH"] = str(self.steam.root)
os.environ["STEAM_COMPAT_DATA_PATH"] = str(self.app.compat_path)
os.environ["STEAM_COMPAT_CONFIG"] = tool.get("config", "")
os.environ["SteamAppId"] = str(self.app.appid)
os.environ["SteamGameId"] = str(self.app.appid)
os.environ["WINEPREFIX"] = str(self.app.compat_path / "pfx")
proton_dir = info["install_path"]
os.environ["WINEROOT"] = str(proton_dir / "dist")
if wine:
# FIXME: correctly detect WINEESYNC
os.environ["WINEESYNC"] = "1"
os.environ["LD_LIBRARY_PATH"] = ":".join(x for x in [
str(proton_dir / "dist/lib64"),
str(proton_dir / "dist/lib"),
os.environ.get("LD_LIBRARY_PATH", None)] if x)
os.environ["PATH"] = ":".join((str(Path(info["install_path"]) / "dist" / "bin"), os.environ["PATH"]))
def cmd_exec(self):
tool = self.compat_tool
self.setup_proton_env(tool, wine=True)
return os.execvp(self.args.cmdline[0], self.args.cmdline)
def cmd_run(self):
if not self.app.installed:
print_error("App is not installed.")
return 50
if self.args.tool:
tool = {"name": self.args.tool}
elif self.app.is_proton_app:
tool = self.compat_tool
else:
tool = None
if tool is not None:
if self.args.config:
tool["config"] = self.args.config
toolinfo = self.compat_tools[tool["name"]]
info_tool, info_path = self._format_tool_info(self.steam, tool, toolinfo)
print_info("Using tool", info_tool)
if self.args.waitforexit:
verb = "commandline_waitforexitandrun"
else:
verb = "commandline"
with open(toolinfo["install_path"] / "toolmanifest.vdf") as f:
mfst = steamutil._vdf.parse(f)
cmdline = mfst["manifest"][verb]
path, *args = cmdline.split()
path = str(toolinfo["install_path"]) + path
self.setup_proton_env(tool)
for cfg in self.app.launch_configs:
# TODO: Handle multiple
if cfg["type"] == "default" and toolinfo.get("from_oslist", "windows") in cfg["config"]["oslist"]:
argv = [path, *args, str(self.app.install_path / cfg["executable"])]
print_info("Executing", " ".join(argv))
return os.execv(path, argv)
else:
# TODO windows support?
for cfg in self.app.launch_configs:
if cfg["type"] == "default" and "linux" in cfg["config"]["oslist"]:
return os.execv(self.app.install_path / cfg["executable"], [cfg["executable"]])
def cmd_proton(self):
# proton and wine subcommands:
# Run proton respective wine with arguments
if not self.app.installed or not self.app.is_proton_app:
print_error("App not installed or not a Proton app")
return 51
if self.args.tool:
tool = {"name": self.args.tool, "source": "args"}
else:
tool = self.compat_tool
if self.args.config:
tool["config"] = self.args.config
try:
toolinfo = self.compat_tools[tool["name"]]
except KeyError:
print_error("Tool not found: '%s'" % tool["name"])
info_tool, _ = self._format_tool_info(self.steam, tool, toolinfo)
print_info("Using tool", info_tool)
if self.args.cmd_proton == "proton":
path = str(toolinfo["install_path"] / "proton")
argv = ["proton"] + self.args.args
elif self.args.cmd_proton == "wine":
path = str(toolinfo["install_path"] / "dist/bin/wine")
argv = [path] + self.args.args
else:
print_error("Unknown cmd_proton verb:", self.args.cmd_proton)
return 111
if not os.path.exists(path):
print_error("No such file:", path)
return 53
self.setup_proton_env(tool, wine=self.args.cmd_proton=="wine")
print_info("Running", path, *argv[1:])
return os.execv(path, argv)
def cmd_appinfo(self):
info = self.appinfo.get(self.app.appid, {})
if self.args.json:
import json
json.dump(info.dict, sys.stdout)
else:
print("AppId:", info.id)
print("AppState:", info.state)
print("Last Update:", info.last_update)
print()
self._pretty_dump(info.dict)
def _pretty_dump(self, d: dict, level=0):
for key, value in d.items():
print(" " * level, "\033[1m", key, "\033[0m", end="", sep="")
if isinstance(value, dict):
print(":")
self._pretty_dump(value, level+1)
else:
print(" = ", value, sep="")
@classmethod
def main(cls, argv: [str]) -> int:
args = cls.parse_args(argv[1:], argv[0])
self = cls(args)
if "cmd" not in args or not args.cmd:
print_error("No command given. see %s --help" % argv[0])
return 255
return args.cmd(self)
if __name__ == "__main__":
sys.exit(ProtonTool.main(sys.argv))

@ -7,7 +7,7 @@ import re, fnmatch, datetime
from pathlib import Path from pathlib import Path
from typing import List, Iterable, Dict, Tuple, Callable, Optional, Union from typing import List, Iterable, Dict, Tuple, Callable, Optional, Union
from vdfparser import VdfParser, DeepDict from vdfparser import VdfParser, DeepDict, AppInfoFile
class CachedProperty: class CachedProperty:
@ -30,13 +30,15 @@ class CachedProperty:
class DictPathRoProperty: class DictPathRoProperty:
__slots__ = "property", "path", "default" __slots__ = "property", "path", "default", "type"
_nodefault = object() _nodefault = object()
_id = lambda x: x
def __init__(self, property: str, path: Tuple[str], default=_nodefault): def __init__(self, property: str, path: Tuple[str], default=_nodefault, type=_id):
self.property = property self.property = property
self.path = path self.path = path
self.default = default self.default = default
self.type = type
def __get__(self, obj, cls): def __get__(self, obj, cls):
if obj is None: if obj is None:
@ -50,7 +52,7 @@ class DictPathRoProperty:
return self.default return self.default
raise raise
else: else:
return d return self.type(d)
class DictPathProperty(DictPathRoProperty): class DictPathProperty(DictPathRoProperty):
@ -83,7 +85,65 @@ class MalformedManifestError(Exception):
return self.args[1] return self.args[1]
class App: class AppInfo:
steam: 'Steam'
appid: int
def __init__(self, steam, appid, *, appinfo_data=None):
self.steam = steam
self.appid = appid
if appinfo_data is not None:
self.__dict__["appinfo"] = appinfo_data
installed = False
# AppInfo
@CachedProperty
def appinfo(self):
# FIXME: properly close AppInfoFile but also deal with always-open appinfo
return self.steam.appinfo[self.appid]
@property
def launch_configs(self):
return self.appinfo["appinfo"]["config"]["launch"].values()
name = DictPathRoProperty("appinfo", ("appinfo", "common", "name"))
oslist = DictPathRoProperty("appinfo", ("appinfo", "common", "oslist"), type=lambda s: s.split(","))
install_dir = DictPathRoProperty("appinfo", ("appinfo", "config", "installdir"))
languages = DictPathRoProperty("appinfo", ("appinfo", "common", "supported_languages"))
gameid = DictPathRoProperty("appinfo", ("appinfo", "common", "gameid"), type=int)
# Misc.
def get_userdata_path(self, user_id: Union[int, 'LoginUser']) -> Path:
return self.steam.get_userdata_path(user_id) / str(self.appid)
@property
def is_native(self):
return sys.platform in self.oslist
@CachedProperty
def compat_tool(self) -> dict:
mapping = self.steam.compat_tool_mapping
appid = str(self.appid)
# User override
if appid in mapping and mapping[appid]["name"]:
tool = dict(mapping[appid])
tool["source"] = "user"
return tool
# Steam play manifest
manifest = self.steam.steamplay_manifest["extended"]["app_mappings"]
if appid in manifest:
tool = dict(manifest[appid])
tool["name"] = tool["tool"]
tool["source"] = "valve"
return tool
# User default
tool = dict(mapping["0"])
tool["source"] = "default"
return tool
class App(AppInfo):
steam: 'Steam' steam: 'Steam'
library_folder: 'LibraryFolder' library_folder: 'LibraryFolder'
steamapps_path: Path steamapps_path: Path
@ -91,7 +151,6 @@ class App:
manifest: DeepDict manifest: DeepDict
def __init__(self, libfolder, manifest_path: Path, *, manifest_data=None): def __init__(self, libfolder, manifest_path: Path, *, manifest_data=None):
self.steam = libfolder.steam
self.library_folder = libfolder self.library_folder = libfolder
self.steamapps_path = libfolder.steamapps_path self.steamapps_path = libfolder.steamapps_path
@ -105,14 +164,14 @@ class App:
if "AppState" not in self.manifest: if "AppState" not in self.manifest:
raise MalformedManifestError("App manifest doesn't have AppState key", self.manifest_path) raise MalformedManifestError("App manifest doesn't have AppState key", self.manifest_path)
super().__init__(libfolder.steam, int(self.manifest["AppState"]["appid"]))
installed = True
def __repr__(self): def __repr__(self):
return "<steamutil.App %d '%s' @ \"%s\">" % (self.appid, self.name, self.install_path) return "<steamutil.App %d '%s' @ \"%s\">" % (self.appid, self.name, self.install_path)
# Basic info # Basic info
@property
def appid(self) -> int:
return int(self.manifest["AppState"]["appid"])
name = DictPathRoProperty("manifest", ("AppState", "name")) name = DictPathRoProperty("manifest", ("AppState", "name"))
language = DictPathRoProperty("manifest", ("AppState", "UserConfig", "language"), None) language = DictPathRoProperty("manifest", ("AppState", "UserConfig", "language"), None)
install_dir = DictPathRoProperty("manifest", ("AppState", "installdir")) install_dir = DictPathRoProperty("manifest", ("AppState", "installdir"))
@ -121,9 +180,6 @@ class App:
def install_path(self) -> Path: def install_path(self) -> Path:
return self.steamapps_path / "common" / self.install_dir return self.steamapps_path / "common" / self.install_dir
def get_userdata_path(self, user_id: Union[int, 'LoginUser']) -> Path:
return self.steam.get_userdata_path(user_id) / str(self.appid)
# Workshop # Workshop
# TODO # TODO
@CachedProperty @CachedProperty
@ -131,6 +187,18 @@ class App:
return self.steamapps_path / "workshop" / "content" / str(self.appid) return self.steamapps_path / "workshop" / "content" / str(self.appid)
# Steam Play info # Steam Play info
@property
def is_steam_play(self):
uc = self.manifest["AppState"].get("UserConfig")
if uc and "platform_override_source" in uc:
return uc["platform_override_source"]
@property
def is_proton_app(self):
uc = self.manifest["AppState"].get("UserConfig")
if uc and "platform_override_source" in uc:
return uc["platform_override_source"] == "windows" and uc["platform_override_dest"] == "linux"
@CachedProperty @CachedProperty
def compat_path(self) -> Path: def compat_path(self) -> Path:
return self.steamapps_path / "compatdata" / str(self.appid) return self.steamapps_path / "compatdata" / str(self.appid)
@ -140,7 +208,7 @@ class App:
return self.compat_path / "pfx" / "drive_c" return self.compat_path / "pfx" / "drive_c"
# Install size # Install size
declared_install_size = DictPathRoProperty("manifest", ("AppState", "SizeOnDisk"), 0) declared_install_size = DictPathRoProperty("manifest", ("AppState", "SizeOnDisk"), 0, type=int)
def compute_install_size(self) -> int: def compute_install_size(self) -> int:
def sum_size(p: Path): def sum_size(p: Path):
@ -372,6 +440,59 @@ class Steam:
user_id = user_id.account_id user_id = user_id.account_id
return self.root / "userdata" / str(user_id) return self.root / "userdata" / str(user_id)
# Config
@CachedProperty
def config(self) -> DeepDict:
with open(self.config_vdf) as f:
return _vdf.parse(f)
config_install_store = DictPathProperty("config", ("InstallConfigStore",))
config_software_steam = DictPathProperty("config", ("InstallConfigStore", "Software", "Valve", "Steam"))
compat_tool_mapping = DictPathProperty("config_software_steam", ("CompatToolMapping",))
# AppInfo cache
@CachedProperty
def appinfo_vdf(self):
return self.root / "appcache" / "appinfo.vdf"
@property
def appinfo(self) -> AppInfoFile:
return AppInfoFile.open(self.appinfo_vdf)
@CachedProperty
def steamplay_manifest(self) -> DeepDict:
with self.appinfo as info:
return info[891390]["appinfo"]
@CachedProperty
def compat_tools(self) -> {str:{}}:
tools = {}
# Find official proton installs
valve = self.steamplay_manifest["extended"]["compat_tools"]
for name, t in valve.items():
app = self.get_app(t["appid"])
if app:
tool = dict(t)
tool["install_path"] = app.install_path
tools[name] = tool
# Find custom compat tools
manifests = []
for p in (self.root / "compatibilitytools.d").iterdir():
if p.suffix == ".vdf":
manifests.append(p)
elif p.is_dir():
c = p / "compatibilitytool.vdf"
if c.exists():
manifests.append(c)
for mfst_path in manifests:
with open(mfst_path) as f:
mfst = _vdf.parse(f)
for name, t in mfst["compatibilitytools"]["compat_tools"].items():
# TODO warn duplicate name
t["install_path"] = mfst_path.parent / t["install_path"]
tools[name] = t
return tools
# Game/App Library # Game/App Library
@CachedProperty @CachedProperty
def library_folder_paths(self) -> List[Path]: def library_folder_paths(self) -> List[Path]:
@ -387,11 +508,15 @@ class Steam:
for lf in self.library_folders: #pylint:disable=not-an-iterable for lf in self.library_folders: #pylint:disable=not-an-iterable
yield from lf.apps yield from lf.apps
def get_app(self, id: int) -> Optional[App]: def get_app(self, id: int, installed=True) -> Optional[App]:
for lf in self.library_folders: #pylint:disable=not-an-iterable for lf in self.library_folders: #pylint:disable=not-an-iterable
app = lf.get_app(id) app = lf.get_app(id)
if app is not None: if app is not None:
return app return app
if not installed:
for appid, appinfo in self.appinfo.items():
if appid == id:
return AppInfo(self, appid, appinfo_data=appinfo)
def find_apps(self, pattern: str) -> Iterable[App]: def find_apps(self, pattern: str) -> Iterable[App]:
for lf in self.library_folders: #pylint:disable=not-an-iterable for lf in self.library_folders: #pylint:disable=not-an-iterable

@ -6,10 +6,13 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import io import io
import struct
import collections
import datetime
from typing import Dict, Union from typing import Dict, Union, Mapping, Tuple
DeepDict = Dict[str, Union[str, "DeepDict"]] DeepDict = Mapping[str, Union[str, "DeepDict"]]
class VdfParser: class VdfParser:
@ -176,3 +179,229 @@ class VdfParser:
if self.encoding: if self.encoding:
raise NotImplementedError("Writing in binary mode is not implemented yet.") # TODO (maybe) raise NotImplementedError("Writing in binary mode is not implemented yet.") # TODO (maybe)
self._write_map(fd, dictionary, 0 if pretty else None) self._write_map(fd, dictionary, 0 if pretty else None)
class BinaryVdfParser:
# Type codes
T_SKEY = b'\x00' # Subkey
T_CSTR = b'\x01' # 0-delimited string
T_INT4 = b'\x02' # 32-bit int
T_FLT4 = b'\x03' # 32-bit float
T_PNTR = b'\x04' # 32-bit pointer
T_WSTR = b'\x05' # 0-delimited wide string
T_COLR = b'\x06' # 32-bit color
T_UNT8 = b'\x07' # 64-bit unsigned int
T_END = b'\x08' # End of subkey
T_INT8 = b'\x0A' # 64-bit signed int
T_END2 = b'\x0B' # Alternative end of subkey tag
# Unpack binary types
S_INT4 = struct.Struct("<i")
S_FLT4 = struct.Struct("<f")
S_INT8 = struct.Struct("<q")
S_UNT8 = struct.Struct("<Q")
def __init__(self, factory=dict):
self.factory = factory
@staticmethod
def _read_until(fd: io.BufferedIOBase, delim: bytes) -> bytes:
pieces = []
buf = bytearray(64)
end = -1
while end == -1:
read = fd.readinto(buf)
if not read:
raise EOFError()
end = buf.find(delim, 0, read)
pieces.append(bytes(buf[:read if end < 0 else end]))
fd.seek(end - read + len(delim), io.SEEK_CUR)
return b"".join(pieces)
@staticmethod
def _read_struct(fd: io.BufferedIOBase, s: struct.Struct):
return s.unpack(fd.read(s.size))
def _read_cstring(self, fd: io.BufferedIOBase) -> str:
return self._read_until(fd, b'\0').decode("utf-8", "replace")
def _read_wstring(self, fd: io.BufferedIOBase) -> str:
return self._read_until(fd, b'\0\0').decode("utf-16")
def _read_map(self, fd: io.BufferedIOBase) -> DeepDict:
map = self.factory()
while True:
t = fd.read(1)
if not len(t):
raise EOFError()
if t in (self.T_END, self.T_END2):
return map
key, value = self._read_item(fd, t)
map[key] = value
def _read_item(self, fd: io.BufferedIOBase, t: int) -> (str, DeepDict):
key = self._read_cstring(fd)
if t == self.T_SKEY:
return key, self._read_map(fd)
elif t == self.T_CSTR:
return key, self._read_cstring(fd)
elif t == self.T_WSTR:
return key, self._read_wstring(fd)
elif t in (self.T_INT4, self.T_PNTR, self.T_COLR):
return key, self._read_struct(fd, self.S_INT4)[0]
elif t == self.T_UNT8:
return key, self._read_struct(fd, self.S_UNT8)[0]
elif t == self.T_INT8:
return key, self._read_struct(fd, self.S_INT8)[0]
elif t == self.T_FLT4:
return key, self._read_struct(fd, self.S_FLT4)[0]
else:
raise ValueError("Unknown data type", fd.tell(), t)
def parse(self, fd: io.BufferedIOBase) -> DeepDict:
return self._read_map(fd)
def parse_bytes(self, data: bytes) -> DeepDict:
with io.BytesIO(data) as fd:
return self.parse(fd)
class AppInfoFile:
S_APP_HEADER = struct.Struct("<IIIIQ20sI")
S_INT4 = struct.Struct("<I")
@classmethod
def open(cls, filename) -> "AppInfoFile":
return cls(open(filename, "br"), close=True)
def __init__(self, file, bvdf_parser=None, close=True):
self.file = file
self.parser = bvdf_parser if bvdf_parser is not None else BinaryVdfParser()
self._close_file = close
self._universe = None
self._apps = None
def _load_map(self, offset: int) -> DeepDict:
self.file.seek(offset, io.SEEK_SET)
return self.parser.parse(self.file)
class App:
__slots__ = "appinfo", "offset", "id", "size", "state", "last_update", "token", "hash", "changeset", "_data"
def __init__(self, appinfo, offset, struct):
self.id = struct[0]
self.size = struct[1]
self.state = struct[2]
self.last_update = datetime.datetime.fromtimestamp(struct[3])
self.token = struct[4]
self.hash = struct[5]
self.changeset = struct[6]
self.appinfo = appinfo
self.offset = offset
self._data = None
def __getitem__(self, key):
if self._data is None:
self._data = self.appinfo._load_map(self.offset)
return self._data[key]
def __getattr__(self, attr):
if attr in dir(dict):
if self._data is None:
self._data = self.appinfo._load_map(self.offset)
return getattr(self._data, attr)
raise AttributeError(attr)
@property
def dict(self):
if self._data is None:
self._data = self.appinfo._load_map(self.offset)
return self._data
def _read_exactly(self, s: int) -> bytes:
cs = self.file.read(s)
if len(cs) < s:
raise EOFError()
return cs
def _read_int(self) -> int:
return self.S_INT4.unpack(self._read_exactly(self.S_INT4.size))[0]
def _load(self):
magic = self._read_exactly(4)
if magic != b"\x27\x44\x56\x07":
raise ValueError("Wrong appinfo.vdf magic")
self._universe = self._read_int()
self._apps = {}
buffer = bytearray(self.S_APP_HEADER.size)
while True:
read = self.file.readinto(buffer)
if read < 4:
raise EOFError()
struct = self.S_APP_HEADER.unpack(buffer)
appid, size, *_ = struct
if appid == 0:
return # Done
elif read < self.S_APP_HEADER.size:
raise EOFError()
self._apps[appid] = self.App(self, self.file.tell(), struct)
self.file.seek(size - (self.S_APP_HEADER.size - 8), io.SEEK_CUR)
@property
def universe(self):
if self._universe is None:
self._load()
return self._universe
def __getattr__(self, attr):
if attr in dir(dict):
if self._apps is None:
self._load()
return getattr(self._apps, attr)
raise AttributeError(attr)
def __getitem__(self, key):
if self._apps is None:
self._load()
return self._apps[key]
def __iter__(self):
if self._apps is None:
self._load()
return iter(self._apps)
@property
def dict(self):
if self._apps is None:
self._load()
return self._apps
# Cleanup
def __enter__(self):
return self
def __exit__(self, exc, tp, tb):
self.close()
def close(self):
if self._close_file:
self.file.close()

Loading…
Cancel
Save