diff --git a/lib/python/steamsync.py b/lib/python/steamsync.py index 5a1ca95..ecd1c1e 100644 --- a/lib/python/steamsync.py +++ b/lib/python/steamsync.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import sys, os - import fnmatch import re import itertools @@ -11,16 +10,60 @@ import shutil import tarfile import time -from pathlib import Path -from typing import Tuple, Dict, List, Union, Set, Callable, Any +from abc import ABCMeta, abstractmethod +from copy import copy +from getpass import getuser +from pathlib import PurePath, Path, PureWindowsPath +from typing import Iterable, Tuple, Dict, List, Union, Set, Callable, Any, Optional, TypeVar, Generic, Sequence, overload +from warnings import warn -from steamutil import Steam, App, CachedProperty, MalformedManifestError +from propex import CachedProperty, SettableCachedProperty +from steamutil import Steam, App + + +### ----------------------------------------------------------------- +# Clone Abstraction +### ----------------------------------------------------------------- +_Clone = TypeVar("_Clone", bound="Cloneable") + +class Cloneable: + __slots__ = () + + def clone(self: _Clone, **update) -> _Clone: + # XXX: Implement directly? + obj = copy(self) + for k, v in update.items(): + setattr(obj, k, v) + return obj ### ----------------------------------------------------------------- # Sync Abstractions ### ----------------------------------------------------------------- -class SyncPath: +PathOrStr = Union[PurePath, os.PathLike, str] +_SyncPath = TypeVar("_SyncPath", bound="SyncPath") + + +class ISyncContext: + target_path: Path + home_path: Path + + +class ISyncOp(metaclass=ABCMeta): + parent: ISyncContext + + @property + @abstractmethod + def target_path(self) -> Path: ... + + @abstractmethod + def _do_copy(self, ops: List[Tuple[Path, Path]]) -> bool: ... + + @abstractmethod + def report_error(self, msg: Sequence[str]): ... + + +class SyncPath(Cloneable): """ A SyncPath represents a pair of paths: local/common ; target/common @@ -38,33 +81,31 @@ class SyncPath: """ __slots__ = "op", "local", "common" - op: 'AbstractSyncOp' + op: ISyncOp local: Path - common: Path + common: PurePath - def __init__(self, op, local, common="."): + def __init__(self, op: ISyncOp, local: PathOrStr, common: PathOrStr="."): self.op = op self.local = Path(local) - self.common = Path(common) - - ## Change paths - def prefix(self, component: Union[str, Path]) -> 'SyncPath': - """ Return a new SyncPath that has a component prefixed to the local path """ - return SyncPath(self.op, self.local / component, self.common) - - def __truediv__(self, component: Union[str, Path]) -> 'SyncPath': - """ Return a new SyncPath that nas a component added """ - return SyncPath(self.op, self.local, self.common / component) - - ## Retrieve paths + self.common = PurePath(common) + @property def path(self) -> Path: - """ Get the local path """ - return self.local / self.common + return Path(self.local, self.common) def exists(self) -> bool: """ Chech whether local path exists """ return self.path.exists() + + ## Change paths + def prefix(self: _SyncPath, component: PathOrStr) -> _SyncPath: + """ Return a new SyncPath that has a component prefixed to the local path """ + return self.clone(local=self.local/component) + + def __truediv__(self: _SyncPath, component: PathOrStr) -> _SyncPath: + """ Return a new SyncPath that nas a component added """ + return self.clone(common=self.common/component) @property def target_path(self) -> Path: @@ -80,7 +121,11 @@ class SyncPath: pass -class _SyncSetCommon: +class _SyncSetCommon(metaclass=ABCMeta): + files_from_local: CachedProperty[Set[Path]] + files_from_target: CachedProperty[Set[Path]] + files_unmodified: CachedProperty[Set[Path]] + def show_confirm(self, skip=True) -> bool: # XXX: move to SyncOp? print(" Local is newer: ", ", ".join(map(str, self.files_from_local))) @@ -96,6 +141,13 @@ class _SyncSetCommon: if resp.lower() in ("y", "yes", ""): return True return False + + @abstractmethod + def commit(self, *, make_inconsistent=False) -> bool: ... + + def execute(self, *, make_inconsistent=False) -> bool: + warn("SyncSet.execute() was renamed to commit()", DeprecationWarning) + return self.commit(make_inconsistent=make_inconsistent) class SyncSet(_SyncSetCommon): @@ -105,7 +157,7 @@ class SyncSet(_SyncSetCommon): """ FileStatSet = Dict[Path, Tuple[Path, os.stat_result]] - op: 'AbstractSyncOp' + op: ISyncOp spath: SyncPath local: FileStatSet target: FileStatSet @@ -193,7 +245,7 @@ class SyncSet(_SyncSetCommon): for name, (path, _) in self.target.items(): tf.add(path, name) - def execute(self, *, make_inconsistent=False) -> bool: + def commit(self, *, make_inconsistent=False) -> bool: operations = [] if self.files_from_local: if self.files_from_target and not make_inconsistent: @@ -209,58 +261,268 @@ class SyncSet(_SyncSetCommon): class SyncMultiSet(list, _SyncSetCommon): """ Provides a convenient interface to a number of SyncSets """ + ### XXX: Is this safe with how files_* do relative paths?? def _union_set(self, attrname) -> Set[Path]: if not self: return set() return functools.reduce(operator.or_, map(operator.attrgetter(attrname), self)) - @property + @CachedProperty def files_from_local(self) -> Set[Path]: return self._union_set("files_from_local") - @property + @CachedProperty def files_from_target(self) -> Set[Path]: return self._union_set("files_from_target") - @property + @CachedProperty def files_unmodified(self) -> Set[Path]: return self._union_set("files_unmodified") - def execute(self, *, make_inconsistent=False) -> bool: + def commit(self, *, make_inconsistent=False) -> bool: + res = True for sset in self: - sset.execute(make_inconsistent=make_inconsistent) + res = res and sset.execute(make_inconsistent=make_inconsistent) + return res + + +### ----------------------------------------------------------------- +# Common Paths +### ----------------------------------------------------------------- +P = TypeVar('P', Path, SyncPath) + +class AbstractCommonPaths: + class Common(Generic[P], metaclass=ABCMeta): + ## Abstract + @abstractmethod + def _path_factory(self, path: PathOrStr) -> P: pass + + ## Basic + parent: ISyncContext + + def __init__(self, *, parent): + self.parent = parent + + ## Platform + is_wine: bool + is_windows: bool + is_native_windows: bool + is_native_linux: bool + + ## Common paths + @property + def home(self) -> P: + return self._path_factory(self.parent.home_path) + + def from_(self, path: PathOrStr) -> P: + return self._path_factory(path) + + class WindowsCommon(Common[P]): + is_windows: bool = True + is_native_linux: bool = False + + @property + @abstractmethod + def drive_c(self) -> P: pass + + # abstract attribute + my_documents: CachedProperty[P] + + class Windows(WindowsCommon[P]): + is_native_windows: bool = True + is_wine: bool = False + + @property + def drive_c(self) -> P: + return self._path_factory("C:\\") + + @CachedProperty + def my_documents(self) -> P: + """ Get the Windows "My Documents" folder """ + def get_my_documents(): + import ctypes.wintypes + CSIDL_PERSONAL = 5 # My Documents + SHGFP_TYPE_CURRENT = 0 # Get current, not default value + + buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH) + ctypes.windll.shell32.SHGetFolderPathW(None, CSIDL_PERSONAL, None, SHGFP_TYPE_CURRENT, buf) + + return buf.value + return self._path_factory(get_my_documents()) + + class Wine(Common[P]): + _wine_prefix: Path + + def __init__(self, *, prefix, **kwds): + super().__init__(**kwds) + self._wine_prefix = prefix + + @property + def wine_prefix(self) -> P: + return self._path_factory(self._wine_prefix) + + @property + def drive_c(self) -> P: + return self._path_factory(self._wine_prefix / "drive_c") + + @overload + @staticmethod + def _find_file_ci(path: Path, candidates: Sequence[str], exclude: None=None) -> List[Path]: ... + @overload + @staticmethod + def _find_file_ci(path: Path, candidates: None, exclude: Sequence[str]) -> List[Path]: ... + @overload + @staticmethod + def _find_file_ci(path: Path, candidates: Sequence[str], exclude: Sequence[str]) -> List[Path]: ... + + @staticmethod + def _find_file_ci(path: Path, candidates: Optional[Sequence[str]]=None, exclude: Optional[Sequence[str]]=None) -> List[Path]: + entries: Dict[str, Path] = {p.name.lower(): p for p in path.iterdir() if p.is_dir()} + results: List[Path] = [] + if candidates is not None: + def gen(): + for name in candidates: + p = entries.get(name) + if p is not None: + yield p + results.extend(gen()) + if exclude is not None: + results.extend((path for name, path in entries.items() if name not in exclude and path not in results)) + return results + + @CachedProperty + def _wine_prefix_userprofile(self) -> Path: + ## Try to find out the username in the prefix + ## usually, this is the same as the system user, but + ## e.g. Proton always uses 'steamuser' + # XXX: make user name configurable or at least cache it? + # BUG: mypy#7781 overload staticmethod is broken when called on instance + candidates = self.__class__._find_file_ci(self._wine_prefix / "drive_c" / "users", [getuser().lower(), 'steamuser'], ['public']) + if not candidates: + raise FileNotFoundError(f"Could not detect userprofile path in wine prefix {self.wine_prefix}") + # XXX: be smarter? + return candidates[0] + + @property + def home(self) -> P: + return self._path_factory(self._wine_prefix_userprofile) + + @CachedProperty + def my_documents(self) -> P: + """ Get the Windows "My Documents" folder """ + ppath = self._wine_prefix_userprofile + # BUG: mypy#7781 overload staticmethod is broken when called on instance + candidates = self.__class__._find_file_ci(ppath, ['my documents', 'documents']) + if not candidates: + raise FileNotFoundError(f"Could not find 'My Documents' folder in profile at '{ppath}'") + return self._path_factory(candidates[0]) + + class Linux(Common[P]): + is_native_linux: bool = True + is_native_windows: bool = False + is_windows: bool = False + is_wine: bool = False + + ## XDG + # XXX: make it methods and search all locations? + @CachedProperty + def xdg_config_dir(self) -> P: + raise NotImplementedError() + + @CachedProperty + def xdg_data_dir(self) -> P: + raise NotImplementedError() + + +class CommonPaths: + class Mixin(AbstractCommonPaths.Common[Path]): + def _path_factory(self, p: PathOrStr) -> Path: + return Path(p) + + class LinuxPaths(AbstractCommonPaths.Linux[Path], Mixin): pass + class WindowsPaths(AbstractCommonPaths.Windows[Path], Mixin): pass + class WinePaths(AbstractCommonPaths.Wine[Path], Mixin): pass + + Paths = Union[LinuxPaths, WindowsPaths, WinePaths] + NativePaths = Union[LinuxPaths, WindowsPaths] + + @overload + @classmethod + def create(c, parent: ISyncContext, wine_prefix: None) -> NativePaths: ... + @overload + @classmethod + def create(c, parent: ISyncContext, wine_prefix: Path) -> WinePaths: ... + + @classmethod + def create(c, parent: ISyncContext, wine_prefix: Optional[Path]) -> Paths: + if wine_prefix is not None: + return c.WinePaths(parent=parent, prefix=wine_prefix) + elif sys.platform == 'win32': + return c.WindowsPaths(parent=parent) + else: + return c.LinuxPaths(parent=parent) + + +class CommonSyncPaths: + class Mixin(AbstractCommonPaths.Common[SyncPath]): + op: 'AbstractSyncOp' + + def __init__(self, *, op: 'AbstractSyncOp', **kwds): + # Not sure why this complains. Maybe because of the **kwds? + super().__init__(parent=op.parent, **kwds) #type: ignore + self.op = op + + def _path_factory(self, p: PathOrStr) -> SyncPath: + return SyncPath(self.op, p) + + class LinuxPaths(AbstractCommonPaths.Linux[SyncPath], Mixin): pass + class WindowsPaths(AbstractCommonPaths.Windows[SyncPath], Mixin): pass + class WinePaths(AbstractCommonPaths.Wine[SyncPath], Mixin): pass + + Paths = Union[LinuxPaths, WindowsPaths, WinePaths] + + @classmethod + def create(c, op: 'AbstractSyncOp', wine_prefix: Optional[Path]) -> Paths: + if wine_prefix is not None: + return c.WinePaths(op=op, prefix=wine_prefix) + elif sys.platform == 'win32': + return c.WindowsPaths(op=op) + else: + return c.LinuxPaths(op=op) ### ----------------------------------------------------------------- # Sync Operation ### ----------------------------------------------------------------- -class AbstractSyncOp: - parent: 'SteamSync' +_AbstractSyncOp = TypeVar("_AbstractSyncOp", bound="AbstractSyncOp") + +class AbstractSyncOp(ISyncOp): + parent: ISyncContext + name: str # Abstract - def __init__(self, parent: 'SteamSync'): + def __init__(self, parent: ISyncContext): self.parent = parent + + # Paths + @CachedProperty + def paths(self) -> CommonSyncPaths.Paths: + return CommonSyncPaths.create(self, None) - # Properties - @property - def name(self): - """ Name of the app """ - raise NotImplementedError() + def __getattr__(self, name): + return getattr(self.paths, name) - @property + # Properties + @SettableCachedProperty def slug(self): """ Name of the destination folder """ return self.name - @slug.setter - def slug(self, value: str): - dict(self)["slug"] = value - @property def target_path(self) -> Path: """ Full path to copy saves to """ return self.parent.target_path / self.slug - def __call__(self, func: Callable[['AbstractSyncOp'], Any]): + def __call__(self: _AbstractSyncOp, func: Callable[[_AbstractSyncOp], Any]): # For decorator use self._report_begin() return func(self) @@ -281,43 +543,9 @@ class AbstractSyncOp: print("\033[34mNow Synchronizing App \033[36m%s\033[34m (%s)\033[0m" % (self.name, self.__class__.__name__.replace("SyncOp", ""))) - def report_error(self, msg: List[str]): + def report_error(self, msg: Iterable[str]): print("\033[31m"+"\n".join(" " + l for l in msg)+"\033[0m") - # Start from here - @property - def home(self): - return SyncPath(self, self.parent.home_path) - - @CachedProperty - def my_documents(self) -> SyncPath: - """ Get the Windows "My Documents" folder """ - if sys.platform == "win32": - def get_my_documents(): - import ctypes.wintypes - CSIDL_PERSONAL = 5 # My Documents - SHGFP_TYPE_CURRENT = 0 # Get current, not default value - - buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH) - ctypes.windll.shell32.SHGetFolderPathW(None, CSIDL_PERSONAL, None, SHGFP_TYPE_CURRENT, buf) - - return buf.value - return SyncPath(self, get_my_documents()) - else: - raise RuntimeError("Platform has unknown My Documents location") - - def from_(self, path: Path) -> SyncPath: - return SyncPath(self, path) - - -class GenericSyncOp(AbstractSyncOp): - """ Generic Sync Operation for Non-Steam Apps """ - name: str = None - - def __init__(self, parent, name): - super().__init__(parent) - self.name = name - class SteamSyncOp(AbstractSyncOp): """ Sync Operation for Steam Apps """ @@ -326,6 +554,10 @@ class SteamSyncOp(AbstractSyncOp): def __init__(self, ssync, app): super().__init__(ssync) self.app = app + + @CachedProperty + def paths(self) -> CommonSyncPaths.Paths: + return CommonSyncPaths.create(self, self.app.compat_prefix if self.app.is_proton_app else None) ## Implement AbstractSyncOp @property @@ -341,18 +573,6 @@ class SteamSyncOp(AbstractSyncOp): def game_directory(self) -> SyncPath: return SyncPath(self, self.app.install_path) - @CachedProperty - def my_documents(self) -> SyncPath: - if sys.platform.startswith("linux"): - # TODO: what about native games? - return SyncPath(self, self.app.compat_drive / "users/steamuser/My Documents") - else: - return super().my_documents - - @property - def user_home(self) -> SyncPath: - return SyncPath(self, self.parent.home_path) - ## Steam Cloud def steam_cloud_ufs(self) -> SyncMultiSet: if "ufs" not in self.app.appinfo["appinfo"] or "savefiles" not in self.app.appinfo["appinfo"]["ufs"]: @@ -374,9 +594,9 @@ class SteamSyncOp(AbstractSyncOp): # Find root anchor root = ufs_def["root"] if root == "WinMyDocuments": - path = self.my_documents + path = self.paths.my_documents elif root in ("LinuxHome", "MacHome"): - path = self.user_home + path = self.paths.home else: raise NotImplementedError("Steam Cloud UFS root %s not implemented for %r" % (root, self.app)) @@ -401,6 +621,38 @@ class SteamSyncOp(AbstractSyncOp): return sms +class GenericSyncOp(AbstractSyncOp): + """ Generic Sync Operation for Non-Steam Apps """ + + def __init__(self, parent, name): + super().__init__(parent) + self.name = name + + +class GenericFoundSyncOp(GenericSyncOp): + _found: Path + + def __init__(self, parent, name, found: Path): + super().__init__(parent, name) + self._found = found + + @property + def found(self) -> SyncPath: + return SyncPath(self, self._found) + + +class WineSyncOp(GenericFoundSyncOp): + _wine_prefix: Path + + def __init__(self, parent, name, prefix: Path, found: Path): + super().__init__(parent, name, found) + self._wine_prefix = prefix + + @CachedProperty + def paths(self) -> CommonSyncPaths.Paths: + return CommonSyncPaths.create(self, self._wine_prefix) + + class SyncNoOp: """ No-Op Sync Operation """ def __call__(self, func) -> None: @@ -415,23 +667,60 @@ AppNotFound = SyncNoOp() ### ----------------------------------------------------------------- # Main Sync manager class ### ----------------------------------------------------------------- -class SteamSync: +class NoSteamSync(ISyncContext): target_path: Path - steam: Steam home_path: Path - def __init__(self, target_path: Path, *, steam_path: Path = None): + def __init__(self, target_path: Path): self.target_path = Path(target_path) - self.steam = Steam(steam_path) self.home_path = Path.home() + @CachedProperty + def paths(self) -> CommonPaths.NativePaths: + return CommonPaths.create(self, None) + + def generic(self, name, find: Optional[Callable[[CommonPaths.NativePaths], Path]], *, platform=None) -> Union[GenericSyncOp, SyncNoOp]: + """ Non-Steam App """ + if platform is None or platform in sys.platform: + if find is None: + return GenericSyncOp(self, name) + search_path = find(self.paths) + if search_path.exists(): + return GenericFoundSyncOp(self, name, search_path) + return AppNotFound + + def wine(self, name, prefixes: Sequence[PathOrStr], find: Callable[[CommonPaths.Paths], Path]) -> Union[WineSyncOp, GenericFoundSyncOp, SyncNoOp]: + """ + Works the same as .generic() on Windows, but additionally searches any number of Wine-Prefixes when not running on Windows + """ + if sys.platform == 'win32': + search_path = find(self.paths) + if search_path.exists(): + return GenericFoundSyncOp(self, name, search_path) + else: + for prefix in prefixes: + prefixpath = Path(prefix) + paths = CommonPaths.create(self, prefixpath) + search_path = find(paths) + if search_path.exists(): + return WineSyncOp(self, name, prefixpath, search_path) + return AppNotFound + + +class SteamSync(NoSteamSync): + steam: Steam + + def __init__(self, target_path: Path, *, steam_path: Path = None): + super().__init__(target_path) + self.steam = Steam(steam_path) + # Get Information @CachedProperty def apps(self) -> List[App]: return list(self.steam.apps) # Get Sync Operation for a specific App - def by_id(self, appid): + def by_id(self, appid: int) -> Union[SteamSyncOp, SyncNoOp]: """ Steam App by AppID """ app = self.steam.get_app(appid) if app is not None: @@ -439,11 +728,11 @@ class SteamSync: else: return AppNotFound - def by_name(self, pattern): + def by_name(self, pattern: str) -> Union[SteamSyncOp, SyncNoOp]: """ Steam App by Name """ pt = re.compile(fnmatch.translate(pattern).rstrip("\\Z"), re.IGNORECASE) app = None - for candidate in self.apps: #pylint:disable=not-an-iterable + for candidate in self.apps: if pt.search(candidate.name): if app is not None: raise Exception("Encountered more than one possible App matching '%s'" % pattern) @@ -451,9 +740,3 @@ class SteamSync: if app is None: return AppNotFound return SteamSyncOp(self, app) - - def generic(self, name, *, platform=None): - """ Non-Steam App """ - if platform is not None and platform not in sys.platform: - return AppNotFound - return GenericSyncOp(self, name)