diff --git a/lib/python/vdfparser.py b/lib/python/vdfparser.py index 3963cdc..f62a8e8 100644 --- a/lib/python/vdfparser.py +++ b/lib/python/vdfparser.py @@ -1,14 +1,14 @@ #!/usr/bin/env python3 # Parse Steam/Source VDF Files # Reference: https://developer.valvesoftware.com/wiki/KeyValues#File_Format -# (c) 2015-2020 Taeyeon Mori; CC-BY-SA +# (c) 2015-2024 Taeyeon Mori; CC-BY-SA from __future__ import unicode_literals import datetime import io import struct -from typing import (Any, Dict, Iterator, Mapping, NewType, Optional, Sequence, +from typing import (Any, BinaryIO, Dict, Iterator, List, Mapping, NewType, Optional, Sequence, Tuple, Type, TypeVar, Union, overload) try: @@ -96,6 +96,7 @@ class LowerCaseNormalizingDict(dict): return super().get(key.lower(), default=default) +#### Text VDF parser. class VdfParser: """ Simple Steam/Source VDF parser @@ -262,6 +263,41 @@ class VdfParser: self._write_map(fd, dictionary, 0 if pretty else None) +#### Binary parsing utils +def _read_exactly(fd: BinaryIO, s: int) -> bytes: + cs = fd.read(s) + if len(cs) < s: + raise EOFError() + return cs + +def _read_int(fd: BinaryIO, size: int=4, signed=False) -> int: + return int.from_bytes(_read_exactly(fd, size), 'little', signed=signed) + +def _read_struct(fd: BinaryIO, s: struct.Struct): + return s.unpack(fd.read(s.size)) + +def _read_until(fd: BinaryIO, delim: bytes, bufsize: int=64) -> bytes: + pieces = [] + piece: bytes + end = -1 + + while end == -1: + piece = fd.read(bufsize) + if not piece: + raise EOFError() + + end = piece.find(delim) + pieces.append(piece[:end]) + + fd.seek(end - len(piece) + len(delim), io.SEEK_CUR) + + return b"".join(pieces) + +def _read_cstring(fd: BinaryIO) -> str: + return _read_until(fd, b'\0').decode("utf-8", "replace") + + +#### Binary VDF parser class BinaryVdfParser: # Type codes T_SKEY = b'\x00' # Subkey @@ -271,109 +307,86 @@ class BinaryVdfParser: T_PNTR = b'\x04' # 32-bit pointer T_WSTR = b'\x05' # 0-delimited wide string T_COLR = b'\x06' # 32-bit color - T_UNT8 = b'\x07' # 64-bit unsigned int + T_INT8 = b'\x07' # 64-bit int T_END = b'\x08' # End of subkey - T_INT8 = b'\x0A' # 64-bit signed int + T_SIN8 = b'\x0A' # 64-bit signed int T_END2 = b'\x0B' # Alternative end of subkey tag # Unpack binary types - S_INT4 = struct.Struct(" bytes: - pieces = [] - buf = bytearray(64) - end = -1 - - while end == -1: - read = fd.readinto(buf) - - if not read: - raise EOFError() - - end = buf.find(delim, 0, read) - pieces.append(bytes(buf[:read if end < 0 else end])) - - fd.seek(end - read + len(delim), io.SEEK_CUR) - - return b"".join(pieces) - - @staticmethod - def _read_struct(fd: io.BufferedIOBase, s: struct.Struct): - return s.unpack(fd.read(s.size)) - - def _read_cstring(self, fd: io.BufferedIOBase) -> str: - return self._read_until(fd, b'\0').decode("utf-8", "replace") - - def _read_wstring(self, fd: io.BufferedIOBase) -> str: - return self._read_until(fd, b'\0\0').decode("utf-16") - - def _read_map(self, fd: io.BufferedIOBase) -> DeepDict: + def _read_map(self, fd: BinaryIO, key_table: Optional[List[str]]=None) -> DeepDict: map = self.factory() while True: t = fd.read(1) - if not len(t): + if not t: raise EOFError() if t in (self.T_END, self.T_END2): return map - key, value = self._read_item(fd, t) - map[key] = value + if key_table is not None: + key = key_table[_read_int(fd, 4)] + else: + key = _read_cstring(fd) - def _read_item(self, fd: io.BufferedIOBase, t: bytes) -> Tuple[str, Union[str, DeepDict]]: - key = self._read_cstring(fd) + value = self._read_value(fd, t, key_table=key_table) + map[key] = value + + def _read_value(self, fd: BinaryIO, t: bytes, key_table: Optional[List[str]]=None) -> Union[str, int, float, DeepDict]: if t == self.T_SKEY: - return key, self._read_map(fd) + return self._read_map(fd, key_table=key_table) elif t == self.T_CSTR: - return key, self._read_cstring(fd) + return _read_cstring(fd) elif t == self.T_WSTR: - return key, self._read_wstring(fd) + length = _read_int(fd, 2) + return _read_exactly(fd, length).decode("utf-16") elif t in (self.T_INT4, self.T_PNTR, self.T_COLR): - return key, self._read_struct(fd, self.S_INT4)[0] - elif t == self.T_UNT8: - return key, self._read_struct(fd, self.S_UNT8)[0] + return _read_int(fd, 4) elif t == self.T_INT8: - return key, self._read_struct(fd, self.S_INT8)[0] + return _read_int(fd, 8) + elif t == self.T_SIN8: + return _read_int(fd, 8, True) elif t == self.T_FLT4: - return key, self._read_struct(fd, self.S_FLT4)[0] + return _read_struct(fd, self.S_FLT4)[0] else: raise ValueError("Unknown data type", fd.tell(), t) - def parse(self, fd: io.BufferedIOBase) -> DeepDict: - return self._read_map(fd) + def parse(self, fd: BinaryIO, key_table: Optional[List[str]]=None) -> DeepDict: + return self._read_map(fd, key_table=key_table) - def parse_bytes(self, data: bytes) -> DeepDict: + def parse_bytes(self, data: bytes, key_table: Optional[List[str]]=None) -> DeepDict: with io.BytesIO(data) as fd: - return self.parse(fd) + return self.parse(fd, key_table=key_table) class AppInfoFile: S_APP_HEADER = struct.Struct(" Self: return cls(open(filename, "br"), close=True) - def __init__(self, file, bvdf_parser=None, close=True): + def __init__(self, file: BinaryIO, bvdf_parser=None, close=True): self.file = file self.parser = bvdf_parser if bvdf_parser is not None else BinaryVdfParser() + self.key_table = None self._close_file = close def _load_offset(self, offset: int) -> DeepDict: self.file.seek(offset, io.SEEK_SET) - return self.parser.parse(self.file) + return self.parser.parse(self.file, key_table=self.key_table) class App: __slots__ = "appinfo", "offset", "id", "size", "state", "last_update", "token", "hash", "changeset", "hash_bin", "_data" @@ -391,6 +404,9 @@ class AppInfoFile: self.offset = offset self._data = None + def __repr__(self) -> str: + return f"<{self.__class__.__qualname__}@{id(self):08x}: {self.id} @{self.offset:08x}>" + def __getitem__(self, key): if self._data is None: self._data = self.appinfo._load_offset(self.offset) @@ -402,43 +418,60 @@ class AppInfoFile: self._data = self.appinfo._load_offset(self.offset) return self._data - def _read_exactly(self, s: int) -> bytes: - cs = self.file.read(s) - if len(cs) < s: - raise EOFError() - return cs + def _read_string_table_from(self, offset: int) -> List[str]: + # preserve offset + _offset = self.file.tell() + self.file.seek(offset) + count = _read_int(self.file, 4) + + stable: List[str] = [] + rest: List[bytes] = [] + buf = b'' + for _ in range(count): + while (end := buf.find(b'\0')) < 0: + rest.append(buf) + buf = self.file.read(4096) + if not buf: + raise EOFError() + if rest: + cs = b''.join((*rest, buf[:end])) + rest.clear() + else: + cs = buf[:end] + stable.append(cs.decode("utf-8")) + buf = buf[end+1:] - def _read_int(self) -> int: - return self.S_INT4.unpack(self._read_exactly(self.S_INT4.size))[0] + self.file.seek(_offset) + return stable def _load_index(self) -> Tuple[int, Dict[int, App]]: - magic = self._read_exactly(4) - if magic == b"\x28\x44\x56\x07": + magic = _read_exactly(self.file, 4) + universe = _read_int(self.file, 4) + + if magic == b"\x29\x44\x56\x07": + header_struct = self.S_APP_HEADER_V2 + # read key table + kto = _read_int(self.file, 8) + self.key_table = self._read_string_table_from(kto) + elif magic == b"\x28\x44\x56\x07": header_struct = self.S_APP_HEADER_V2 elif magic == b"\x27\x44\x56\x07": header_struct = self.S_APP_HEADER else: - raise ValueError("Unknown appinfo.vdf magic") + raise ValueError(f"Unknown appinfo.vdf magic {magic.hex()}") - universe = self._read_int() apps = {} - buffer = bytearray(header_struct.size) - while True: - read = self.file.readinto(buffer) - - if read < 4: + buf = self.file.read(header_struct.size) + if buf.startswith(b"\0\0\0\0"): + break # Done + if len(buf) < header_struct.size: raise EOFError() - struct = header_struct.unpack(buffer) + struct = header_struct.unpack(buf) appid, size, *_ = struct - if appid == 0: - break # Done - elif read < header_struct.size: - raise EOFError() - apps[appid] = self.App(self, self.file.tell(), struct) self.file.seek(size - (header_struct.size - 8), io.SEEK_CUR)