diff --git a/bin/sync_savegames b/bin/sync_savegames new file mode 100755 index 0000000..39777f3 --- /dev/null +++ b/bin/sync_savegames @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +#pylint:disable=no-member + +from pathlib import Path +from steamsync import SteamSync, SyncOp + + +# Find home +steamns_home = Path("~/.local/steam/home/taeyeon/").expanduser() + +if steamns_home.exists(): + home = steamns_home +else: + home = Path("~").expanduser() + +sync = SteamSync(Path("~/Nextcloud/Misc/Savegames").expanduser()) + + +@sync.by_name("zanzarah") +def zanzarah(op): + with op.game_directory.prefix("Save") as set: + set += "*" + set.execute() + +#@sync.by_id(787860) +def fs19(op): + with op.my_documents.prefix("My Games/FarmingSimulator2019") as set: + set.add( + "music/streamingInternetRadios.xml", + "savegame[1-9]", + "savegame[1-2][0-9]" + "VERSION", + ) + set.execute() + +#@sync.by_name("Fell Seal") +def fell_seal(op): + with op.from_(home).prefix("Fell Seal") as set: + set += "saves" + set += "customdata" + set.execute() + diff --git a/lib/python/steamsync.py b/lib/python/steamsync.py new file mode 100644 index 0000000..80c402e --- /dev/null +++ b/lib/python/steamsync.py @@ -0,0 +1,246 @@ +#!/usr/bin/env python3 + +import sys, os + +import fnmatch +import re +import itertools +import shutil + +from pathlib import Path +from typing import Tuple, Dict, List, Union, Set, Callable, Any + +from steamutil import Steam, App, CachedProperty + + +class AppNotFoundType: + def __call__(self, func) -> None: + pass + + def __bool__(self) -> bool: + return False + +AppNotFound = AppNotFoundType() + + +class SyncPath: + __slots__ = "op", "local", "common" + + op: 'SyncOp' + local: Path + common: Path + + def __init__(self, op, local, common="."): + self.op = op + self.local = Path(local) + self.common = Path(common) + + def prefix(self, component: Union[str, Path]) -> 'SyncPath': + return SyncPath(self.op, self.local / component, self.common) + + def __div__(self, component: Union[str, Path]) -> 'SyncPath': + return SyncPath(self.op, self.local, self.common / component) + + @property + def path(self) -> Path: + return self.local / self.common + + @property + def target_path(self) -> Path: + return self.op.target_path / self.common + + def __enter__(self) -> 'SyncSet': + return SyncSet(self) + + def __exit__(self, type, value, traceback): + # Todo: auto-commit? + pass + + +class SyncSet: + FileStatSet = Dict[Path, Tuple[Path, os.stat_result]] + + op: 'SyncOp' + spath: SyncPath + local: FileStatSet + target: FileStatSet + changeset: int + + def __init__(self, path): + self.op = path.op + self.spath = path + self.local = {} + self.target = {} + + @property + def path(self) -> Path: + return self.spath.path + + @property + def target_path(self) -> Path: + return self.spath.target_path + + # Modify inclusion + @staticmethod + def _collect_files(anchor: Path, patterns: List[str]): + files: SyncSet.FileStatSet = {} + def add_file(f): + if f.is_file(): + files[f.relative_to(anchor)] = f, f.stat() + elif f.is_dir(): + for f in f.iterdir(): + add_file(f) + for f in set(itertools.chain.from_iterable(anchor.glob(g) for g in patterns)): + add_file(f) + return files + + def add(self, *patterns): + self.local.update(self._collect_files(self.path, patterns)) + self.target.update(self._collect_files(self.target_path, patterns)) + self._inval() + + def __iadd__(self, pattern: str) -> 'SyncSet': + self.add(pattern) + return self + + # Calculate changes + def _inval(self): + for cache in "files_from_local", "files_from_target", "files_unmodified": + if cache in self.__dict__: + del self.__dict__[cache] + + @staticmethod + def _sync_set(src_files: FileStatSet, dst_files: FileStatSet) -> Set[Path]: + """ + Return a set of files that need to be updated from src to dst. + """ + return {f + for f, (_, sst) in src_files.items() + if f not in dst_files or sst.st_mtime > dst_files[f][1].st_mtime + } + + @CachedProperty + def files_from_local(self) -> Set[Path]: + return self._sync_set(self.local, self.target) + + @CachedProperty + def files_from_target(self) -> Set[Path]: + return self._sync_set(self.target, self.local) + + @CachedProperty + def files_unmodified(self) -> Set[Path]: + return (self.local.keys() | self.target.keys()) - (self.files_from_local | self.files_from_target) + + def show_confirm(self) -> bool: + # XXX: move to SyncOp? + print(" Local is newer: ", ", ".join(map(str, self.files_from_local))) + print(" Target is newer: ", ", ".join(map(str, self.files_from_target))) + print(" Unmodified: ", ", ".join(map(str, self.files_unmodified))) + + print("Press enter to continue") + input() + return True # TODO: Proper thingey + + def execute(self, *, make_inconsistent=False) -> bool: + operations = [] + if self.files_from_local: + if self.files_from_target and not make_inconsistent: + self.op.report_error(["Both sides have changed files. Synchronizing would lead to inconsistent and possibly broken savegames."]) + return False + operations += [(self.path / p, self.target_path / p) for p in self.files_from_local] #pylint:disable=not-an-iterable + + if self.files_from_target: + operations += [(self.target_path / p, self.path / p) for p in self.files_from_target] #pylint:disable=not-an-iterable + + return self.op._do_copy(operations) + + +class SyncOp: + parent: 'SteamSync' + app: App + + def __init__(self, ssync, app): + self.parent = ssync + self.app = app + + def __call__(self, func: Callable[['SyncOp'],Any]): + # For decorator use + self._report_begin() + return func(self) + + def _do_copy(self, ops: List[Tuple[Path, Path]]) -> bool: + for src, dst in ops: + if not dst.parent.exists(): + dst.parent.mkdir(parents=True) + + print(" \033[36m%s -> %s\033[0m" % (src, dst)) + shutil.copy2(src, dst) + return True + + # UI + def _report_begin(self): + print("\033[34mNow Synchronizing App %s\033[0m" % self.app.name) + + def report_error(self, msg: List[str]): + print("\033[31m"+"\n".join(" " + l for l in msg)+"\033[0m") + + @CachedProperty + def target_path(self) -> Path: + return self.parent.target_path / self.app.install_dir + + @CachedProperty + def my_documents(self) -> SyncPath: + """ Get the Windows "My Documents" folder """ + if sys.platform.startswith("linux"): + # TODO: what about native games? + return SyncPath(self, self.app.compat_drive / "users/steamuser/My Documents") + elif sys.platform == "win32": + def get_my_documents(): + import ctypes.wintypes + CSIDL_PERSONAL = 5 # My Documents + SHGFP_TYPE_CURRENT = 0 # Get current, not default value + + buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH) + ctypes.windll.shell32.SHGetFolderPathW(None, CSIDL_PERSONAL, None, SHGFP_TYPE_CURRENT, buf) + + return buf.value + return SyncPath(self, get_my_documents()) + else: + raise Exception("Platform not supported") + + @property + def game_directory(self) -> SyncPath: + return SyncPath(self, self.app.install_path) + + def from_(self, path: Path) -> SyncPath: + return SyncPath(self, path) + + +class SteamSync: + target_path: Path + steam: Steam + + def __init__(self, target_path: Path, *, steam_path: Path = None): + self.target_path = Path(target_path) + self.steam = Steam(steam_path) + + @CachedProperty + def apps(self) -> List[App]: + return list(self.steam.apps) + + def by_id(self, appid): + app = self.steam.get_app(appid) + if app is not None: + return SyncOp(self, app) + else: + return AppNotFound + + def by_name(self, pattern): + pt = re.compile(fnmatch.translate(pattern).rstrip("\\Z"), re.IGNORECASE) + app = None + for candidate in self.apps: #pylint:disable=not-an-iterable + if pt.search(candidate.name): + if app is not None: + raise Exception("Encountered more than one possible App matching '%s'" % pattern) + app = candidate + return SyncOp(self, app) diff --git a/lib/python/steamutil.py b/lib/python/steamutil.py new file mode 100644 index 0000000..2a08a24 --- /dev/null +++ b/lib/python/steamutil.py @@ -0,0 +1,374 @@ +# Discover Steam install and games +# (c) 2020 Taeyeon Mori CC-BY-SA + +import sys, os +import re, fnmatch, datetime + +from pathlib import Path +from typing import List, Iterable, Dict, Tuple, Callable, Optional, Union + +from vdfparser import VdfParser, DeepDict + + +class CachedProperty: + """ A property that is only computed once per instance and then replaces + itself with an ordinary attribute. Deleting the attribute resets the + property. + + Source: https://github.com/bottlepy/bottle/commit/fa7733e075da0d790d809aa3d2f53071897e6f76 + """ + + def __init__(self, func): + self.__doc__ = getattr(func, '__doc__') + self.func = func + + def __get__(self, obj, cls): + if obj is None: + return self + value = obj.__dict__[self.func.__name__] = self.func(obj) + return value + + +class DictPathRoProperty: + __slots__ = "property", "path", "default" + _nodefault = object() + + def __init__(self, property: str, path: Tuple[str], default=_nodefault): + self.property = property + self.path = path + self.default = default + + def __get__(self, obj, cls): + if obj is None: + return self + d = getattr(obj, self.property) + try: + for pc in self.path: + d = d[pc] + except KeyError: + if self.default is not self._nodefault: + return self.default + raise + else: + return d + + +class DictPathProperty(DictPathRoProperty): + __slots__ = () + + def _get_create_parent(self, obj) -> Dict: + d = getattr(obj, self.property) + for pc in self.path[:-1]: + try: + d = d[pc] + except KeyError: + nd = {} + d[pc] = nd + d = nd + return d + + def __set__(self, obj, value): + self._get_create_parent(obj)[self.path[-1]] = value + + def __delete__(self, obj): + del self._get_create_parent(obj)[self.path[-1]] + + +_vdf = VdfParser() + + +class App: + steam: 'Steam' + library_folder: 'LibraryFolder' + steamapps_path: Path + manifest_path: Path + manifest: DeepDict + + def __init__(self, libfolder, manifest_path: Path, *, manifest_data=None): + self.steam = libfolder.steam + self.library_folder = libfolder + self.steamapps_path = libfolder.steamapps_path + + self.manifest_path = manifest_path + if manifest_data is None: + with open(manifest_path) as f: + self.manifest = _vdf.parse(f) + else: + self.manifest = manifest_data + + def __repr__(self): + return "" % (self.appid, self.name, self.install_path) + + # Basic info + @property + def appid(self) -> int: + return int(self.manifest["AppState"]["appid"]) + + name = DictPathRoProperty("manifest", ("AppState", "name")) + language = DictPathRoProperty("manifest", ("AppState", "UserConfig", "language"), None) + install_dir = DictPathRoProperty("manifest", ("AppState", "installdir")) + + @CachedProperty + def install_path(self) -> Path: + return self.steamapps_path / "common" / self.install_dir + + def get_userdata_path(self, user_id: Union[int, 'LoginUser']) -> Path: + return self.steam.get_userdata_path(user_id) / str(self.appid) + + # Workshop + # TODO + @CachedProperty + def workshop_path(self) -> Path: + return self.steamapps_path / "workshop" / "content" / str(self.appid) + + # Steam Play info + @CachedProperty + def compat_path(self) -> Path: + return self.steamapps_path / "compatdata" / str(self.appid) + + @CachedProperty + def compat_drive(self) -> Path: + return self.compat_path / "pfx" / "drive_c" + + # Install size + declared_install_size = DictPathRoProperty("manifest", ("AppState", "SizeOnDisk"), 0) + + def compute_install_size(self) -> int: + def sum_size(p: Path): + acc = 0 + for x in p.iterdir(): + if x.is_dir(): + acc += sum_size(x) + else: + acc += x.stat().st_size + return acc + return sum_size(self.install_path) + + +class LibraryFolder: + steam: 'Steam' + path: Path + + def __init__(self, steam: 'Steam', path: Path): + self.steam = steam + self.path = path + + def __repr__(self): + return "" % self.path + + # Paths + @CachedProperty + def steamapps_path(self) -> Path: + steamapps = self.path / "steamapps" + if not steamapps.exists(): + # Emulate case-insensitivity + cased = self.path / "SteamApps" + if cased.exists(): + steamapps = cased + else: + # try to find other variation + found = [d for d in self.path.iterdir() if d.is_dir() and d.name.lower() == "steamapps"] + if len(found) > 1: + raise Exception("More than one steamapps folder in library folder", self.path) + elif found: + return found[0] + # if none exists, return non-existant default name + return steamapps + + @property + def common_path(self) -> Path: + return self.steamapps_path / "common" + + @property + def appmanifests(self) -> Iterable[Path]: + return self.steamapps_path.glob("appmanifest_*.acf") # pylint:disable=no-member + + @property + def apps(self) -> Iterable[App]: + return (App(self, mf) for mf in self.appmanifests) + + def get_app(self, appid: int) -> Optional[App]: + manifest = self.steamapps_path / ("appmanifest_%d.acf" % appid) + if manifest.exists(): + return App(self, manifest) + + def find_apps_re(self, regexp: str) -> Iterable[App]: + reg = re.compile(r'"name"\s+".*%s.*"' % regexp, re.IGNORECASE) + for manifest in self.appmanifests: #pylint:disable=not-an-iterable + with open(manifest) as f: + content = f.read() + if reg.search(content): + yield App(self, manifest, manifest_data=_vdf.parse_string(content)) + + def find_apps(self, pattern: str) -> Iterable[App]: + return self.find_apps_re(fnmatch.translate(pattern).rstrip("\\Z")) + + +class UserAppConfig: + user: 'LoginUser' + appid: int + + def __init__(self, user, appid): + self.user = user + self.appid = appid + + def __repr__(self): + return "" % (self.appid, self.user.account_name) + + @property + def _data(self): + try: + return self.user.localconfig["UserLocalConfigStore"]["Software"]["Valve"]["Steam"]["Apps"][str(self.appid)] + except KeyError: + return {} # TODO + + @property + def last_played(self) -> datetime.datetime: + return datetime.datetime.fromtimestamp(int(self._data.get("LastPlayed", "0"))) + + @property + def playtime(self) -> datetime.time: + t = int(self._data.get("Playtime", "0")) + return datetime.time(t // 60, t % 60) + + @property + def playtime_two_weeks(self) -> datetime.time: + t = int(self._data.get("Playtime2wks", "0")) + return datetime.time(t // 60, t % 60) + + launch_options = DictPathProperty("_data", ("LaunchOptions",), None) + + +class LoginUser: + steam: 'Steam' + id: int + info: Dict[str, str] + + def __init__(self, steam, id: int, info: Dict): + self.steam = steam + self.id = id + self.info = info + + def __repr__(self): + return "" % (self.id , self.account_name, self.username) + + @property + def account_id(self): + """ 32-bit account ID """ + return self.id & 0xffffffff + + account_name = DictPathRoProperty("info", ("AccountName",)) + username = DictPathRoProperty("info", ("PersonaName",)) + + @CachedProperty + def userdata_path(self) -> Path: + return self.steam.get_userdata_path(self) + + @property + def localconfig_vdf(self) -> Path: + return self.userdata_path / "config" / "localconfig.vdf" + + @CachedProperty + def localconfig(self) -> DeepDict: + with open(self.localconfig_vdf) as f: + return _vdf.parse(f) + + # Game config + def get_app_config(self, app: Union[int, App]) -> Optional[UserAppConfig]: + if isinstance(app, App): + app = app.appid + return UserAppConfig(self, app) + + +class Steam: + root: Path + + def __init__(self, install_path=None): + self.root = install_path if install_path is not None else self.find_install_path() + if self.root is None: + raise Exception("Could not find Steam") + + def __repr__(self): + return "" % self.root + + @staticmethod + def find_install_path() -> Optional[Path]: + # TODO: Windows + # Linux + if sys.platform.startswith("linux"): + # Try ~/.steam first + dotsteam = Path(os.path.expanduser("~/.steam")) + if dotsteam.exists(): + steamroot = (dotsteam / "root").resolve() + if steamroot.exists(): + return steamroot + # Try ~/.local/share/Steam, classic ~/Steam + data_dir = Path(os.environ.get("XDG_DATA_HOME", "~/.local/share")).expanduser() + for path in data_dir, Path("~").expanduser(): + for name in "Steam", "SteamBeta": + steamroot = path / name + if steamroot.exists(): + return steamroot + + # Various paths + @property + def libraryfolders_vdf(self) -> Path: + """ The libraryfolders.vdf file listing all configured library locations """ + return self.root / "steamapps" / "libraryfolders.vdf" + + @property + def config_vdf(self) -> Path: + return self.root / "config" / "config.vdf" + + @property + def loginusers_vdf(self) -> Path: + return self.root / "config" / "loginusers.vdf" + + # Users + @CachedProperty + def most_recent_user(self) -> Optional[LoginUser]: + try: + with open(self.loginusers_vdf) as f: + data = _vdf.parse(f) + for id, info in data["users"].items(): + if info["mostrecent"] == "1": + return LoginUser(self, int(id), info) + except KeyError: + pass + return None + + def get_userdata_path(self, user_id: Union[int, LoginUser]) -> Path: + if isinstance(user_id, LoginUser): + user_id = user_id.account_id + return self.root / "userdata" / str(user_id) + + # Game/App Library + @CachedProperty + def library_folder_paths(self) -> List[Path]: + with open(self.libraryfolders_vdf) as f: + return [Path(v) for k,v in _vdf.parse(f)["LibraryFolders"].items() if k.isdigit()] + + @CachedProperty + def library_folders(self) -> List[LibraryFolder]: + return [LibraryFolder(self, self.root)] + [LibraryFolder(self, p) for p in self.library_folder_paths] #pylint:disable=not-an-iterable + + @property + def apps(self) -> Iterable[App]: + for lf in self.library_folders: #pylint:disable=not-an-iterable + yield from lf.apps + + def get_app(self, id: int) -> Optional[App]: + for lf in self.library_folders: #pylint:disable=not-an-iterable + app = lf.get_app(id) + if app is not None: + return app + + def find_apps(self, pattern: str) -> Iterable[App]: + for lf in self.library_folders: #pylint:disable=not-an-iterable + yield from lf.find_apps(pattern) + + def find_app(self, pattern: str) -> Optional[App]: + for app in self.find_apps(pattern): + return app + + diff --git a/lib/python/vdfparser.py b/lib/python/vdfparser.py index 6336bf1..8624a6d 100644 --- a/lib/python/vdfparser.py +++ b/lib/python/vdfparser.py @@ -1,12 +1,16 @@ #!/usr/bin/env python3 # Parse Steam/Source VDF Files # Reference: https://developer.valvesoftware.com/wiki/KeyValues#File_Format -# (c) 2015 Taeyeon Mori; CC-BY-SA +# (c) 2015-2020 Taeyeon Mori; CC-BY-SA from __future__ import unicode_literals import io +from typing import Dict, Union + +DeepDict = Dict[str, Union[str, "DeepDict"]] + class VdfParser: """ @@ -113,13 +117,13 @@ class VdfParser: else: current.append(c) - def parse(self, fd): + def parse(self, fd) -> DeepDict: """ Parse a VDF file into a python dictionary """ return self._parse_map(fd) - def parse_string(self, content): + def parse_string(self, content) -> DeepDict: """ Parse the content of a VDF file """ @@ -137,7 +141,7 @@ class VdfParser: def write(str=None, i=False, d=False, nl=False): if str: fd.write(str) - if delim: + if d: fd.write(" ") else: @@ -165,7 +169,7 @@ class VdfParser: write(self._make_literal(v)) write(d=1, nl=1) - def write(self, fd, dictionary, *, pretty=True): + def write(self, fd, dictionary: DeepDict, *, pretty=True): """ Write a dictionary to a file in VDF format """