python/steamsync: Add proper support for Wine

Previously only supported through Steam Proton
master
Taeyeon Mori 3 years ago
parent a8e3ae2ae7
commit d8d4401707
  1. 495
      lib/python/steamsync.py

@ -1,7 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import sys, os import sys, os
import fnmatch import fnmatch
import re import re
import itertools import itertools
@ -11,16 +10,60 @@ import shutil
import tarfile import tarfile
import time import time
from pathlib import Path from abc import ABCMeta, abstractmethod
from typing import Tuple, Dict, List, Union, Set, Callable, Any from copy import copy
from getpass import getuser
from pathlib import PurePath, Path, PureWindowsPath
from typing import Iterable, Tuple, Dict, List, Union, Set, Callable, Any, Optional, TypeVar, Generic, Sequence, overload
from warnings import warn
from steamutil import Steam, App, CachedProperty, MalformedManifestError from propex import CachedProperty, SettableCachedProperty
from steamutil import Steam, App
### -----------------------------------------------------------------
# Clone Abstraction
### -----------------------------------------------------------------
_Clone = TypeVar("_Clone", bound="Cloneable")
class Cloneable:
__slots__ = ()
def clone(self: _Clone, **update) -> _Clone:
# XXX: Implement directly?
obj = copy(self)
for k, v in update.items():
setattr(obj, k, v)
return obj
### ----------------------------------------------------------------- ### -----------------------------------------------------------------
# Sync Abstractions # Sync Abstractions
### ----------------------------------------------------------------- ### -----------------------------------------------------------------
class SyncPath: PathOrStr = Union[PurePath, os.PathLike, str]
_SyncPath = TypeVar("_SyncPath", bound="SyncPath")
class ISyncContext:
target_path: Path
home_path: Path
class ISyncOp(metaclass=ABCMeta):
parent: ISyncContext
@property
@abstractmethod
def target_path(self) -> Path: ...
@abstractmethod
def _do_copy(self, ops: List[Tuple[Path, Path]]) -> bool: ...
@abstractmethod
def report_error(self, msg: Sequence[str]): ...
class SyncPath(Cloneable):
""" """
A SyncPath represents a pair of paths: A SyncPath represents a pair of paths:
local/common ; target/common local/common ; target/common
@ -38,33 +81,31 @@ class SyncPath:
""" """
__slots__ = "op", "local", "common" __slots__ = "op", "local", "common"
op: 'AbstractSyncOp' op: ISyncOp
local: Path local: Path
common: Path common: PurePath
def __init__(self, op, local, common="."): def __init__(self, op: ISyncOp, local: PathOrStr, common: PathOrStr="."):
self.op = op self.op = op
self.local = Path(local) self.local = Path(local)
self.common = Path(common) self.common = PurePath(common)
## Change paths
def prefix(self, component: Union[str, Path]) -> 'SyncPath':
""" Return a new SyncPath that has a component prefixed to the local path """
return SyncPath(self.op, self.local / component, self.common)
def __truediv__(self, component: Union[str, Path]) -> 'SyncPath':
""" Return a new SyncPath that nas a component added """
return SyncPath(self.op, self.local, self.common / component)
## Retrieve paths
@property @property
def path(self) -> Path: def path(self) -> Path:
""" Get the local path """ return Path(self.local, self.common)
return self.local / self.common
def exists(self) -> bool: def exists(self) -> bool:
""" Chech whether local path exists """ """ Chech whether local path exists """
return self.path.exists() return self.path.exists()
## Change paths
def prefix(self: _SyncPath, component: PathOrStr) -> _SyncPath:
""" Return a new SyncPath that has a component prefixed to the local path """
return self.clone(local=self.local/component)
def __truediv__(self: _SyncPath, component: PathOrStr) -> _SyncPath:
""" Return a new SyncPath that nas a component added """
return self.clone(common=self.common/component)
@property @property
def target_path(self) -> Path: def target_path(self) -> Path:
@ -80,7 +121,11 @@ class SyncPath:
pass pass
class _SyncSetCommon: class _SyncSetCommon(metaclass=ABCMeta):
files_from_local: CachedProperty[Set[Path]]
files_from_target: CachedProperty[Set[Path]]
files_unmodified: CachedProperty[Set[Path]]
def show_confirm(self, skip=True) -> bool: def show_confirm(self, skip=True) -> bool:
# XXX: move to SyncOp? # XXX: move to SyncOp?
print(" Local is newer: ", ", ".join(map(str, self.files_from_local))) print(" Local is newer: ", ", ".join(map(str, self.files_from_local)))
@ -96,6 +141,13 @@ class _SyncSetCommon:
if resp.lower() in ("y", "yes", ""): if resp.lower() in ("y", "yes", ""):
return True return True
return False return False
@abstractmethod
def commit(self, *, make_inconsistent=False) -> bool: ...
def execute(self, *, make_inconsistent=False) -> bool:
warn("SyncSet.execute() was renamed to commit()", DeprecationWarning)
return self.commit(make_inconsistent=make_inconsistent)
class SyncSet(_SyncSetCommon): class SyncSet(_SyncSetCommon):
@ -105,7 +157,7 @@ class SyncSet(_SyncSetCommon):
""" """
FileStatSet = Dict[Path, Tuple[Path, os.stat_result]] FileStatSet = Dict[Path, Tuple[Path, os.stat_result]]
op: 'AbstractSyncOp' op: ISyncOp
spath: SyncPath spath: SyncPath
local: FileStatSet local: FileStatSet
target: FileStatSet target: FileStatSet
@ -193,7 +245,7 @@ class SyncSet(_SyncSetCommon):
for name, (path, _) in self.target.items(): for name, (path, _) in self.target.items():
tf.add(path, name) tf.add(path, name)
def execute(self, *, make_inconsistent=False) -> bool: def commit(self, *, make_inconsistent=False) -> bool:
operations = [] operations = []
if self.files_from_local: if self.files_from_local:
if self.files_from_target and not make_inconsistent: if self.files_from_target and not make_inconsistent:
@ -209,58 +261,268 @@ class SyncSet(_SyncSetCommon):
class SyncMultiSet(list, _SyncSetCommon): class SyncMultiSet(list, _SyncSetCommon):
""" Provides a convenient interface to a number of SyncSets """ """ Provides a convenient interface to a number of SyncSets """
### XXX: Is this safe with how files_* do relative paths??
def _union_set(self, attrname) -> Set[Path]: def _union_set(self, attrname) -> Set[Path]:
if not self: if not self:
return set() return set()
return functools.reduce(operator.or_, map(operator.attrgetter(attrname), self)) return functools.reduce(operator.or_, map(operator.attrgetter(attrname), self))
@property @CachedProperty
def files_from_local(self) -> Set[Path]: def files_from_local(self) -> Set[Path]:
return self._union_set("files_from_local") return self._union_set("files_from_local")
@property @CachedProperty
def files_from_target(self) -> Set[Path]: def files_from_target(self) -> Set[Path]:
return self._union_set("files_from_target") return self._union_set("files_from_target")
@property @CachedProperty
def files_unmodified(self) -> Set[Path]: def files_unmodified(self) -> Set[Path]:
return self._union_set("files_unmodified") return self._union_set("files_unmodified")
def execute(self, *, make_inconsistent=False) -> bool: def commit(self, *, make_inconsistent=False) -> bool:
res = True
for sset in self: for sset in self:
sset.execute(make_inconsistent=make_inconsistent) res = res and sset.execute(make_inconsistent=make_inconsistent)
return res
### -----------------------------------------------------------------
# Common Paths
### -----------------------------------------------------------------
P = TypeVar('P', Path, SyncPath)
class AbstractCommonPaths:
class Common(Generic[P], metaclass=ABCMeta):
## Abstract
@abstractmethod
def _path_factory(self, path: PathOrStr) -> P: pass
## Basic
parent: ISyncContext
def __init__(self, *, parent):
self.parent = parent
## Platform
is_wine: bool
is_windows: bool
is_native_windows: bool
is_native_linux: bool
## Common paths
@property
def home(self) -> P:
return self._path_factory(self.parent.home_path)
def from_(self, path: PathOrStr) -> P:
return self._path_factory(path)
class WindowsCommon(Common[P]):
is_windows: bool = True
is_native_linux: bool = False
@property
@abstractmethod
def drive_c(self) -> P: pass
# abstract attribute
my_documents: CachedProperty[P]
class Windows(WindowsCommon[P]):
is_native_windows: bool = True
is_wine: bool = False
@property
def drive_c(self) -> P:
return self._path_factory("C:\\")
@CachedProperty
def my_documents(self) -> P:
""" Get the Windows "My Documents" folder """
def get_my_documents():
import ctypes.wintypes
CSIDL_PERSONAL = 5 # My Documents
SHGFP_TYPE_CURRENT = 0 # Get current, not default value
buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
ctypes.windll.shell32.SHGetFolderPathW(None, CSIDL_PERSONAL, None, SHGFP_TYPE_CURRENT, buf)
return buf.value
return self._path_factory(get_my_documents())
class Wine(Common[P]):
_wine_prefix: Path
def __init__(self, *, prefix, **kwds):
super().__init__(**kwds)
self._wine_prefix = prefix
@property
def wine_prefix(self) -> P:
return self._path_factory(self._wine_prefix)
@property
def drive_c(self) -> P:
return self._path_factory(self._wine_prefix / "drive_c")
@overload
@staticmethod
def _find_file_ci(path: Path, candidates: Sequence[str], exclude: None=None) -> List[Path]: ...
@overload
@staticmethod
def _find_file_ci(path: Path, candidates: None, exclude: Sequence[str]) -> List[Path]: ...
@overload
@staticmethod
def _find_file_ci(path: Path, candidates: Sequence[str], exclude: Sequence[str]) -> List[Path]: ...
@staticmethod
def _find_file_ci(path: Path, candidates: Optional[Sequence[str]]=None, exclude: Optional[Sequence[str]]=None) -> List[Path]:
entries: Dict[str, Path] = {p.name.lower(): p for p in path.iterdir() if p.is_dir()}
results: List[Path] = []
if candidates is not None:
def gen():
for name in candidates:
p = entries.get(name)
if p is not None:
yield p
results.extend(gen())
if exclude is not None:
results.extend((path for name, path in entries.items() if name not in exclude and path not in results))
return results
@CachedProperty
def _wine_prefix_userprofile(self) -> Path:
## Try to find out the username in the prefix
## usually, this is the same as the system user, but
## e.g. Proton always uses 'steamuser'
# XXX: make user name configurable or at least cache it?
# BUG: mypy#7781 overload staticmethod is broken when called on instance
candidates = self.__class__._find_file_ci(self._wine_prefix / "drive_c" / "users", [getuser().lower(), 'steamuser'], ['public'])
if not candidates:
raise FileNotFoundError(f"Could not detect userprofile path in wine prefix {self.wine_prefix}")
# XXX: be smarter?
return candidates[0]
@property
def home(self) -> P:
return self._path_factory(self._wine_prefix_userprofile)
@CachedProperty
def my_documents(self) -> P:
""" Get the Windows "My Documents" folder """
ppath = self._wine_prefix_userprofile
# BUG: mypy#7781 overload staticmethod is broken when called on instance
candidates = self.__class__._find_file_ci(ppath, ['my documents', 'documents'])
if not candidates:
raise FileNotFoundError(f"Could not find 'My Documents' folder in profile at '{ppath}'")
return self._path_factory(candidates[0])
class Linux(Common[P]):
is_native_linux: bool = True
is_native_windows: bool = False
is_windows: bool = False
is_wine: bool = False
## XDG
# XXX: make it methods and search all locations?
@CachedProperty
def xdg_config_dir(self) -> P:
raise NotImplementedError()
@CachedProperty
def xdg_data_dir(self) -> P:
raise NotImplementedError()
class CommonPaths:
class Mixin(AbstractCommonPaths.Common[Path]):
def _path_factory(self, p: PathOrStr) -> Path:
return Path(p)
class LinuxPaths(AbstractCommonPaths.Linux[Path], Mixin): pass
class WindowsPaths(AbstractCommonPaths.Windows[Path], Mixin): pass
class WinePaths(AbstractCommonPaths.Wine[Path], Mixin): pass
Paths = Union[LinuxPaths, WindowsPaths, WinePaths]
NativePaths = Union[LinuxPaths, WindowsPaths]
@overload
@classmethod
def create(c, parent: ISyncContext, wine_prefix: None) -> NativePaths: ...
@overload
@classmethod
def create(c, parent: ISyncContext, wine_prefix: Path) -> WinePaths: ...
@classmethod
def create(c, parent: ISyncContext, wine_prefix: Optional[Path]) -> Paths:
if wine_prefix is not None:
return c.WinePaths(parent=parent, prefix=wine_prefix)
elif sys.platform == 'win32':
return c.WindowsPaths(parent=parent)
else:
return c.LinuxPaths(parent=parent)
class CommonSyncPaths:
class Mixin(AbstractCommonPaths.Common[SyncPath]):
op: 'AbstractSyncOp'
def __init__(self, *, op: 'AbstractSyncOp', **kwds):
# Not sure why this complains. Maybe because of the **kwds?
super().__init__(parent=op.parent, **kwds) #type: ignore
self.op = op
def _path_factory(self, p: PathOrStr) -> SyncPath:
return SyncPath(self.op, p)
class LinuxPaths(AbstractCommonPaths.Linux[SyncPath], Mixin): pass
class WindowsPaths(AbstractCommonPaths.Windows[SyncPath], Mixin): pass
class WinePaths(AbstractCommonPaths.Wine[SyncPath], Mixin): pass
Paths = Union[LinuxPaths, WindowsPaths, WinePaths]
@classmethod
def create(c, op: 'AbstractSyncOp', wine_prefix: Optional[Path]) -> Paths:
if wine_prefix is not None:
return c.WinePaths(op=op, prefix=wine_prefix)
elif sys.platform == 'win32':
return c.WindowsPaths(op=op)
else:
return c.LinuxPaths(op=op)
### ----------------------------------------------------------------- ### -----------------------------------------------------------------
# Sync Operation # Sync Operation
### ----------------------------------------------------------------- ### -----------------------------------------------------------------
class AbstractSyncOp: _AbstractSyncOp = TypeVar("_AbstractSyncOp", bound="AbstractSyncOp")
parent: 'SteamSync'
class AbstractSyncOp(ISyncOp):
parent: ISyncContext
name: str # Abstract
def __init__(self, parent: 'SteamSync'): def __init__(self, parent: ISyncContext):
self.parent = parent self.parent = parent
# Paths
@CachedProperty
def paths(self) -> CommonSyncPaths.Paths:
return CommonSyncPaths.create(self, None)
# Properties def __getattr__(self, name):
@property return getattr(self.paths, name)
def name(self):
""" Name of the app """
raise NotImplementedError()
@property # Properties
@SettableCachedProperty
def slug(self): def slug(self):
""" Name of the destination folder """ """ Name of the destination folder """
return self.name return self.name
@slug.setter
def slug(self, value: str):
dict(self)["slug"] = value
@property @property
def target_path(self) -> Path: def target_path(self) -> Path:
""" Full path to copy saves to """ """ Full path to copy saves to """
return self.parent.target_path / self.slug return self.parent.target_path / self.slug
def __call__(self, func: Callable[['AbstractSyncOp'], Any]): def __call__(self: _AbstractSyncOp, func: Callable[[_AbstractSyncOp], Any]):
# For decorator use # For decorator use
self._report_begin() self._report_begin()
return func(self) return func(self)
@ -281,43 +543,9 @@ class AbstractSyncOp:
print("\033[34mNow Synchronizing App \033[36m%s\033[34m (%s)\033[0m" print("\033[34mNow Synchronizing App \033[36m%s\033[34m (%s)\033[0m"
% (self.name, self.__class__.__name__.replace("SyncOp", ""))) % (self.name, self.__class__.__name__.replace("SyncOp", "")))
def report_error(self, msg: List[str]): def report_error(self, msg: Iterable[str]):
print("\033[31m"+"\n".join(" " + l for l in msg)+"\033[0m") print("\033[31m"+"\n".join(" " + l for l in msg)+"\033[0m")
# Start from here
@property
def home(self):
return SyncPath(self, self.parent.home_path)
@CachedProperty
def my_documents(self) -> SyncPath:
""" Get the Windows "My Documents" folder """
if sys.platform == "win32":
def get_my_documents():
import ctypes.wintypes
CSIDL_PERSONAL = 5 # My Documents
SHGFP_TYPE_CURRENT = 0 # Get current, not default value
buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
ctypes.windll.shell32.SHGetFolderPathW(None, CSIDL_PERSONAL, None, SHGFP_TYPE_CURRENT, buf)
return buf.value
return SyncPath(self, get_my_documents())
else:
raise RuntimeError("Platform has unknown My Documents location")
def from_(self, path: Path) -> SyncPath:
return SyncPath(self, path)
class GenericSyncOp(AbstractSyncOp):
""" Generic Sync Operation for Non-Steam Apps """
name: str = None
def __init__(self, parent, name):
super().__init__(parent)
self.name = name
class SteamSyncOp(AbstractSyncOp): class SteamSyncOp(AbstractSyncOp):
""" Sync Operation for Steam Apps """ """ Sync Operation for Steam Apps """
@ -326,6 +554,10 @@ class SteamSyncOp(AbstractSyncOp):
def __init__(self, ssync, app): def __init__(self, ssync, app):
super().__init__(ssync) super().__init__(ssync)
self.app = app self.app = app
@CachedProperty
def paths(self) -> CommonSyncPaths.Paths:
return CommonSyncPaths.create(self, self.app.compat_prefix if self.app.is_proton_app else None)
## Implement AbstractSyncOp ## Implement AbstractSyncOp
@property @property
@ -341,18 +573,6 @@ class SteamSyncOp(AbstractSyncOp):
def game_directory(self) -> SyncPath: def game_directory(self) -> SyncPath:
return SyncPath(self, self.app.install_path) return SyncPath(self, self.app.install_path)
@CachedProperty
def my_documents(self) -> SyncPath:
if sys.platform.startswith("linux"):
# TODO: what about native games?
return SyncPath(self, self.app.compat_drive / "users/steamuser/My Documents")
else:
return super().my_documents
@property
def user_home(self) -> SyncPath:
return SyncPath(self, self.parent.home_path)
## Steam Cloud ## Steam Cloud
def steam_cloud_ufs(self) -> SyncMultiSet: def steam_cloud_ufs(self) -> SyncMultiSet:
if "ufs" not in self.app.appinfo["appinfo"] or "savefiles" not in self.app.appinfo["appinfo"]["ufs"]: if "ufs" not in self.app.appinfo["appinfo"] or "savefiles" not in self.app.appinfo["appinfo"]["ufs"]:
@ -374,9 +594,9 @@ class SteamSyncOp(AbstractSyncOp):
# Find root anchor # Find root anchor
root = ufs_def["root"] root = ufs_def["root"]
if root == "WinMyDocuments": if root == "WinMyDocuments":
path = self.my_documents path = self.paths.my_documents
elif root in ("LinuxHome", "MacHome"): elif root in ("LinuxHome", "MacHome"):
path = self.user_home path = self.paths.home
else: else:
raise NotImplementedError("Steam Cloud UFS root %s not implemented for %r" % (root, self.app)) raise NotImplementedError("Steam Cloud UFS root %s not implemented for %r" % (root, self.app))
@ -401,6 +621,38 @@ class SteamSyncOp(AbstractSyncOp):
return sms return sms
class GenericSyncOp(AbstractSyncOp):
""" Generic Sync Operation for Non-Steam Apps """
def __init__(self, parent, name):
super().__init__(parent)
self.name = name
class GenericFoundSyncOp(GenericSyncOp):
_found: Path
def __init__(self, parent, name, found: Path):
super().__init__(parent, name)
self._found = found
@property
def found(self) -> SyncPath:
return SyncPath(self, self._found)
class WineSyncOp(GenericFoundSyncOp):
_wine_prefix: Path
def __init__(self, parent, name, prefix: Path, found: Path):
super().__init__(parent, name, found)
self._wine_prefix = prefix
@CachedProperty
def paths(self) -> CommonSyncPaths.Paths:
return CommonSyncPaths.create(self, self._wine_prefix)
class SyncNoOp: class SyncNoOp:
""" No-Op Sync Operation """ """ No-Op Sync Operation """
def __call__(self, func) -> None: def __call__(self, func) -> None:
@ -415,23 +667,60 @@ AppNotFound = SyncNoOp()
### ----------------------------------------------------------------- ### -----------------------------------------------------------------
# Main Sync manager class # Main Sync manager class
### ----------------------------------------------------------------- ### -----------------------------------------------------------------
class SteamSync: class NoSteamSync(ISyncContext):
target_path: Path target_path: Path
steam: Steam
home_path: Path home_path: Path
def __init__(self, target_path: Path, *, steam_path: Path = None): def __init__(self, target_path: Path):
self.target_path = Path(target_path) self.target_path = Path(target_path)
self.steam = Steam(steam_path)
self.home_path = Path.home() self.home_path = Path.home()
@CachedProperty
def paths(self) -> CommonPaths.NativePaths:
return CommonPaths.create(self, None)
def generic(self, name, find: Optional[Callable[[CommonPaths.NativePaths], Path]], *, platform=None) -> Union[GenericSyncOp, SyncNoOp]:
""" Non-Steam App """
if platform is None or platform in sys.platform:
if find is None:
return GenericSyncOp(self, name)
search_path = find(self.paths)
if search_path.exists():
return GenericFoundSyncOp(self, name, search_path)
return AppNotFound
def wine(self, name, prefixes: Sequence[PathOrStr], find: Callable[[CommonPaths.Paths], Path]) -> Union[WineSyncOp, GenericFoundSyncOp, SyncNoOp]:
"""
Works the same as .generic() on Windows, but additionally searches any number of Wine-Prefixes when not running on Windows
"""
if sys.platform == 'win32':
search_path = find(self.paths)
if search_path.exists():
return GenericFoundSyncOp(self, name, search_path)
else:
for prefix in prefixes:
prefixpath = Path(prefix)
paths = CommonPaths.create(self, prefixpath)
search_path = find(paths)
if search_path.exists():
return WineSyncOp(self, name, prefixpath, search_path)
return AppNotFound
class SteamSync(NoSteamSync):
steam: Steam
def __init__(self, target_path: Path, *, steam_path: Path = None):
super().__init__(target_path)
self.steam = Steam(steam_path)
# Get Information # Get Information
@CachedProperty @CachedProperty
def apps(self) -> List[App]: def apps(self) -> List[App]:
return list(self.steam.apps) return list(self.steam.apps)
# Get Sync Operation for a specific App # Get Sync Operation for a specific App
def by_id(self, appid): def by_id(self, appid: int) -> Union[SteamSyncOp, SyncNoOp]:
""" Steam App by AppID """ """ Steam App by AppID """
app = self.steam.get_app(appid) app = self.steam.get_app(appid)
if app is not None: if app is not None:
@ -439,11 +728,11 @@ class SteamSync:
else: else:
return AppNotFound return AppNotFound
def by_name(self, pattern): def by_name(self, pattern: str) -> Union[SteamSyncOp, SyncNoOp]:
""" Steam App by Name """ """ Steam App by Name """
pt = re.compile(fnmatch.translate(pattern).rstrip("\\Z"), re.IGNORECASE) pt = re.compile(fnmatch.translate(pattern).rstrip("\\Z"), re.IGNORECASE)
app = None app = None
for candidate in self.apps: #pylint:disable=not-an-iterable for candidate in self.apps:
if pt.search(candidate.name): if pt.search(candidate.name):
if app is not None: if app is not None:
raise Exception("Encountered more than one possible App matching '%s'" % pattern) raise Exception("Encountered more than one possible App matching '%s'" % pattern)
@ -451,9 +740,3 @@ class SteamSync:
if app is None: if app is None:
return AppNotFound return AppNotFound
return SteamSyncOp(self, app) return SteamSyncOp(self, app)
def generic(self, name, *, platform=None):
""" Non-Steam App """
if platform is not None and platform not in sys.platform:
return AppNotFound
return GenericSyncOp(self, name)

Loading…
Cancel
Save