Dotfiles
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

744 lines
24 KiB

#!/usr/bin/env python3
import sys, os
import fnmatch
import re
import itertools
import functools
import operator
import shutil
import tarfile
import time
from abc import ABCMeta, abstractmethod
from copy import copy
from getpass import getuser
from pathlib import PurePath, Path, PureWindowsPath
from typing import Iterable, Tuple, Dict, List, Union, Set, Callable, Any, Optional, TypeVar, Generic, Sequence, overload
from warnings import warn
from propex import CachedProperty, SettableCachedProperty
from steamutil import Steam, App
### -----------------------------------------------------------------
# Clone Abstraction
### -----------------------------------------------------------------
_Clone = TypeVar("_Clone", bound="Cloneable")
class Cloneable:
__slots__ = ()
def clone(self: _Clone, **update) -> _Clone:
# XXX: Implement directly?
obj = copy(self)
for k, v in update.items():
setattr(obj, k, v)
return obj
### -----------------------------------------------------------------
# Sync Abstractions
### -----------------------------------------------------------------
PathOrStr = Union[PurePath, os.PathLike, str]
_SyncPath = TypeVar("_SyncPath", bound="SyncPath")
class ISyncContext:
target_path: Path
home_path: Path
class ISyncOp(metaclass=ABCMeta):
parent: ISyncContext
@property
@abstractmethod
def target_path(self) -> Path: ...
@abstractmethod
def _do_copy(self, ops: List[Tuple[Path, Path]]) -> bool: ...
@abstractmethod
def report_error(self, msg: Sequence[str]): ...
class SyncPath(Cloneable):
"""
A SyncPath represents a pair of paths:
local/common ; target/common
Whereby target is the location being synched to and local is
the prefix the data is synched from on the local machine.
Common has components common to both paths.
Usually, you'd set the local prefix and then the common part.
e.g.: op.home.prefix(".my_game") / "Savegames"
whereby "Savegames" is included in the resulting target
path, but ".my_game" is not.
Note that SyncPath should be considered immutable. Relevant
methods return a new instance.
"""
__slots__ = "op", "local", "common"
op: ISyncOp
local: Path
common: PurePath
def __init__(self, op: ISyncOp, local: PathOrStr, common: PathOrStr="."):
self.op = op
self.local = Path(local)
self.common = PurePath(common)
@property
def path(self) -> Path:
return Path(self.local, self.common)
def exists(self) -> bool:
""" Chech whether local path exists """
return self.path.exists()
## Change paths
def prefix(self: _SyncPath, component: PathOrStr) -> _SyncPath:
""" Return a new SyncPath that has a component prefixed to the local path """
return self.clone(local=self.local/component)
def __truediv__(self: _SyncPath, component: PathOrStr) -> _SyncPath:
""" Return a new SyncPath that nas a component added """
return self.clone(common=self.common/component)
@property
def target_path(self) -> Path:
""" Get the sync target path """
return self.op.target_path / self.common
## Begin a SyncSet
def __enter__(self) -> 'SyncSet':
return SyncSet(self)
def __exit__(self, type, value, traceback):
# Todo: auto-commit?
pass
class _SyncSetCommon(metaclass=ABCMeta):
files_from_local: CachedProperty[Set[Path]]
files_from_target: CachedProperty[Set[Path]]
files_unmodified: CachedProperty[Set[Path]]
def show_confirm(self, skip=True) -> bool:
# XXX: move to SyncOp?
print(" Local is newer: ", ", ".join(map(str, self.files_from_local)))
print(" Target is newer: ", ", ".join(map(str, self.files_from_target)))
print(" Unmodified: ", ", ".join(map(str, self.files_unmodified)))
if skip and not self.files_from_local and not self.files_from_target:
print(" \033[31mNoting to do!\033[0m")
return False
print("Continue? <Y/n> ", end="")
resp = input().strip()
if resp.lower() in ("y", "yes", ""):
return True
return False
@abstractmethod
def commit(self, *, make_inconsistent=False) -> bool: ...
def execute(self, *, make_inconsistent=False) -> bool:
warn("SyncSet.execute() was renamed to commit()", DeprecationWarning)
return self.commit(make_inconsistent=make_inconsistent)
class SyncSet(_SyncSetCommon):
"""
A SyncSet represents a set of files to be synchronized
from a local to a target location represented by a SyncPath
"""
FileStatSet = Dict[Path, Tuple[Path, os.stat_result]]
op: ISyncOp
spath: SyncPath
local: FileStatSet
target: FileStatSet
changeset: int
backup_dir: str = "_backup"
def __init__(self, path):
self.op = path.op
self.spath = path
self.local = {}
self.target = {}
@property
def path(self) -> Path:
""" The local path """
return self.spath.path
@property
def target_path(self) -> Path:
""" The target path """
return self.spath.target_path
# Modify inclusion
def _collect_files(self, anchor: Path, patterns: List[str]):
files: SyncSet.FileStatSet = {}
def add_file(f):
if f.is_file():
relative = f.relative_to(anchor)
if relative.parts[0] == self.backup_dir:
return
files[relative] = f, f.stat()
elif f.is_dir():
for f in f.iterdir():
add_file(f)
for f in set(itertools.chain.from_iterable(anchor.glob(g) for g in patterns)):
add_file(f)
return files
def add(self, *patterns):
self.local.update(self._collect_files(self.path, patterns))
self.target.update(self._collect_files(self.target_path, patterns))
self._inval()
def __iadd__(self, pattern: str) -> 'SyncSet':
self.add(pattern)
return self
# Calculate changes
def _inval(self):
for cache in "files_from_local", "files_from_target", "files_unmodified":
if cache in self.__dict__:
del self.__dict__[cache]
@staticmethod
def _sync_set(src_files: FileStatSet, dst_files: FileStatSet) -> Set[Path]:
"""
Return a set of files that need to be updated from src to dst.
"""
return {f
for f, (_, sst) in src_files.items()
if f not in dst_files or sst.st_mtime > dst_files[f][1].st_mtime
}
@CachedProperty
def files_from_local(self) -> Set[Path]:
return self._sync_set(self.local, self.target)
@CachedProperty
def files_from_target(self) -> Set[Path]:
return self._sync_set(self.target, self.local)
@CachedProperty
def files_unmodified(self) -> Set[Path]:
return (self.local.keys() | self.target.keys()) - (self.files_from_local | self.files_from_target)
def backup(self):
if not self.files_from_local:
print(" \033[35mBackup not needed\033[0m")
else:
backup_file = self.target_path / self.backup_dir / time.strftime("%Y%m%d_%H%M.tar.xz")
print(" \033[35mBacking up to \033[36m%s\033[35m...\033[0m" % backup_file)
backup_file.parent.mkdir(parents=True,exist_ok=True)
with tarfile.open(backup_file, "x:xz") as tf:
for name, (path, _) in self.target.items():
tf.add(path, name)
def commit(self, *, make_inconsistent=False) -> bool:
operations = []
if self.files_from_local:
if self.files_from_target and not make_inconsistent:
self.op.report_error(["Both sides have changed files. Synchronizing would lead to inconsistent and possibly broken savegames."])
return False
operations += [(self.path / p, self.target_path / p) for p in self.files_from_local] #pylint:disable=not-an-iterable
if self.files_from_target:
operations += [(self.target_path / p, self.path / p) for p in self.files_from_target] #pylint:disable=not-an-iterable
return self.op._do_copy(operations)
class SyncMultiSet(list, _SyncSetCommon):
""" Provides a convenient interface to a number of SyncSets """
### XXX: Is this safe with how files_* do relative paths??
def _union_set(self, attrname) -> Set[Path]:
if not self:
return set()
return functools.reduce(operator.or_, map(operator.attrgetter(attrname), self))
@CachedProperty
def files_from_local(self) -> Set[Path]:
return self._union_set("files_from_local")
@CachedProperty
def files_from_target(self) -> Set[Path]:
return self._union_set("files_from_target")
@CachedProperty
def files_unmodified(self) -> Set[Path]:
return self._union_set("files_unmodified")
def commit(self, *, make_inconsistent=False) -> bool:
res = True
for sset in self:
res = res and sset.execute(make_inconsistent=make_inconsistent)
return res
### -----------------------------------------------------------------
# Common Paths
### -----------------------------------------------------------------
P = TypeVar('P', Path, SyncPath)
class AbstractCommonPaths:
class Common(Generic[P], metaclass=ABCMeta):
## Abstract
@abstractmethod
def _path_factory(self, path: PathOrStr) -> P: pass
## Basic
parent: ISyncContext
def __init__(self, *, parent):
self.parent = parent
## Platform
is_wine: bool
is_windows: bool
is_native_windows: bool
is_native_linux: bool
## Common paths
@property
def home(self) -> P:
return self._path_factory(self.parent.home_path)
def from_(self, path: PathOrStr) -> P:
return self._path_factory(path)
class WindowsCommon(Common[P]):
is_windows: bool = True
is_native_linux: bool = False
@property
@abstractmethod
def drive_c(self) -> P: pass
# abstract attribute
my_documents: CachedProperty[P]
class Windows(WindowsCommon[P]):
is_native_windows: bool = True
is_wine: bool = False
@property
def drive_c(self) -> P:
return self._path_factory("C:\\")
@CachedProperty
def my_documents(self) -> P:
""" Get the Windows "My Documents" folder """
def get_my_documents():
import ctypes.wintypes
CSIDL_PERSONAL = 5 # My Documents
SHGFP_TYPE_CURRENT = 0 # Get current, not default value
buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
ctypes.windll.shell32.SHGetFolderPathW(None, CSIDL_PERSONAL, None, SHGFP_TYPE_CURRENT, buf)
return buf.value
return self._path_factory(get_my_documents())
class Wine(WindowsCommon[P]):
is_native_windows: bool = False
is_wine: bool = True
_wine_prefix: Path
def __init__(self, *, prefix, **kwds):
super().__init__(**kwds)
self._wine_prefix = prefix
@property
def wine_prefix(self) -> P:
return self._path_factory(self._wine_prefix)
@property
def drive_c(self) -> P:
return self._path_factory(self._wine_prefix / "drive_c")
@overload
@staticmethod
def _find_file_ci(path: Path, candidates: Sequence[str], exclude: None=None) -> List[Path]: ...
@overload
@staticmethod
def _find_file_ci(path: Path, candidates: None, exclude: Sequence[str]) -> List[Path]: ...
@overload
@staticmethod
def _find_file_ci(path: Path, candidates: Sequence[str], exclude: Sequence[str]) -> List[Path]: ...
@staticmethod
def _find_file_ci(path: Path, candidates: Optional[Sequence[str]]=None, exclude: Optional[Sequence[str]]=None) -> List[Path]:
entries: Dict[str, Path] = {p.name.lower(): p for p in path.iterdir() if p.is_dir()}
results: List[Path] = []
if candidates is not None:
def gen():
for name in candidates:
p = entries.get(name)
if p is not None:
yield p
results.extend(gen())
if exclude is not None:
results.extend((path for name, path in entries.items() if name not in exclude and path not in results))
return results
@CachedProperty
def _wine_prefix_userprofile(self) -> Path:
## Try to find out the username in the prefix
## usually, this is the same as the system user, but
## e.g. Proton always uses 'steamuser'
# XXX: make user name configurable or at least cache it?
# BUG: mypy#7781 overload staticmethod is broken when called on instance
candidates = self.__class__._find_file_ci(self._wine_prefix / "drive_c" / "users", [getuser().lower(), 'steamuser'], ['public'])
if not candidates:
raise FileNotFoundError(f"Could not detect userprofile path in wine prefix {self.wine_prefix}")
# XXX: be smarter?
return candidates[0]
@property
def home(self) -> P:
return self._path_factory(self._wine_prefix_userprofile)
@CachedProperty
def my_documents(self) -> P:
""" Get the Windows "My Documents" folder """
ppath = self._wine_prefix_userprofile
# BUG: mypy#7781 overload staticmethod is broken when called on instance
candidates = self.__class__._find_file_ci(ppath, ['my documents', 'documents'])
if not candidates:
raise FileNotFoundError(f"Could not find 'My Documents' folder in profile at '{ppath}'")
return self._path_factory(candidates[0])
class Linux(Common[P]):
is_native_linux: bool = True
is_native_windows: bool = False
is_windows: bool = False
is_wine: bool = False
## XDG
# XXX: make it methods and search all locations?
@CachedProperty
def xdg_config_dir(self) -> P:
raise NotImplementedError()
@CachedProperty
def xdg_data_dir(self) -> P:
raise NotImplementedError()
class CommonPaths:
class Mixin(AbstractCommonPaths.Common[Path]):
def _path_factory(self, p: PathOrStr) -> Path:
return Path(p)
class LinuxPaths(AbstractCommonPaths.Linux[Path], Mixin): pass
class WindowsPaths(AbstractCommonPaths.Windows[Path], Mixin): pass
class WinePaths(AbstractCommonPaths.Wine[Path], Mixin): pass
Paths = Union[LinuxPaths, WindowsPaths, WinePaths]
NativePaths = Union[LinuxPaths, WindowsPaths]
@overload
@classmethod
def create(c, parent: ISyncContext, wine_prefix: None) -> NativePaths: ...
@overload
@classmethod
def create(c, parent: ISyncContext, wine_prefix: Path) -> WinePaths: ...
@classmethod
def create(c, parent: ISyncContext, wine_prefix: Optional[Path]) -> Paths:
if wine_prefix is not None:
return c.WinePaths(parent=parent, prefix=wine_prefix)
elif sys.platform == 'win32':
return c.WindowsPaths(parent=parent)
else:
return c.LinuxPaths(parent=parent)
class CommonSyncPaths:
class Mixin(AbstractCommonPaths.Common[SyncPath]):
op: 'AbstractSyncOp'
def __init__(self, *, op: 'AbstractSyncOp', **kwds):
# Not sure why this complains. Maybe because of the **kwds?
super().__init__(parent=op.parent, **kwds) #type: ignore
self.op = op
def _path_factory(self, p: PathOrStr) -> SyncPath:
return SyncPath(self.op, p)
class LinuxPaths(AbstractCommonPaths.Linux[SyncPath], Mixin): pass
class WindowsPaths(AbstractCommonPaths.Windows[SyncPath], Mixin): pass
class WinePaths(AbstractCommonPaths.Wine[SyncPath], Mixin): pass
Paths = Union[LinuxPaths, WindowsPaths, WinePaths]
@classmethod
def create(c, op: 'AbstractSyncOp', wine_prefix: Optional[Path]) -> Paths:
if wine_prefix is not None:
return c.WinePaths(op=op, prefix=wine_prefix)
elif sys.platform == 'win32':
return c.WindowsPaths(op=op)
else:
return c.LinuxPaths(op=op)
### -----------------------------------------------------------------
# Sync Operation
### -----------------------------------------------------------------
_AbstractSyncOp = TypeVar("_AbstractSyncOp", bound="AbstractSyncOp")
class AbstractSyncOp(ISyncOp):
parent: ISyncContext
name: str # Abstract
def __init__(self, parent: ISyncContext):
self.parent = parent
# Paths
@CachedProperty
def paths(self) -> CommonSyncPaths.Paths:
return CommonSyncPaths.create(self, None)
def __getattr__(self, name):
return getattr(self.paths, name)
# Properties
@SettableCachedProperty
def slug(self):
""" Name of the destination folder """
return self.name
@property
def target_path(self) -> Path:
""" Full path to copy saves to """
return self.parent.target_path / self.slug
def __call__(self: _AbstractSyncOp, func: Callable[[_AbstractSyncOp], Any]):
# For decorator use
self._report_begin()
return func(self)
# Actual Copy Logic
@staticmethod
def _do_copy(ops: List[Tuple[Path, Path]]) -> bool:
for src, dst in ops:
if not dst.parent.exists():
dst.parent.mkdir(parents=True)
print(" \033[36m%s -> %s\033[0m" % (src, dst))
shutil.copy2(src, dst)
return True
# UI
def _report_begin(self):
print("\033[34mNow Synchronizing App \033[36m%s\033[34m (%s)\033[0m"
% (self.name, self.__class__.__name__.replace("SyncOp", "")))
def report_error(self, msg: Iterable[str]):
print("\033[31m"+"\n".join(" " + l for l in msg)+"\033[0m")
class SteamSyncOp(AbstractSyncOp):
""" Sync Operation for Steam Apps """
app: App
def __init__(self, ssync, app):
super().__init__(ssync)
self.app = app
@CachedProperty
def paths(self) -> CommonSyncPaths.Paths:
return CommonSyncPaths.create(self, self.app.compat_prefix if self.app.is_proton_app else None)
## Implement AbstractSyncOp
@property
def name(self):
return self.app.name
@property
def slug(self):
return self.app.install_dir
## Addidtional information available through Steam
@property
def game_directory(self) -> SyncPath:
return SyncPath(self, self.app.install_path)
## Steam Cloud
def steam_cloud_ufs(self) -> SyncMultiSet:
if "ufs" not in self.app.appinfo["appinfo"] or "savefiles" not in self.app.appinfo["appinfo"]["ufs"]:
raise ValueError("%r doesn't support Steam Cloud by way of UFS" % self.app)
sms = SyncMultiSet()
if sys.platform.startswith("win") or self.app.is_proton_app:
ufs_platform = "Windows"
elif sys.platform.startswith("linux"):
ufs_platform = "Linux"
else:
raise NotImplementedError("Steam Cloud UFS not (yet) supported on platform %s" % sys.platform)
for ufs_def in self.app.appinfo["appinfo"]["ufs"]["savefiles"].values():
# Filter by platform
if "platforms" in ufs_def and ufs_platform not in ufs_def["platforms"].values():
continue
# Find root anchor
root = ufs_def["root"]
if root == "WinMyDocuments":
path = self.paths.my_documents
elif root in ("LinuxHome", "MacHome"):
path = self.paths.home
else:
raise NotImplementedError("Steam Cloud UFS root %s not implemented for %r" % (root, self.app))
# Add relative path
# XXX: Should path be prefixed or included in the target. Are there even apps with multiple ufs entries?
# For now, take last component?
if "path" in ufs_def and ufs_def["path"]:
rpath = Path(ufs_def["path"])
if rpath.anchor:
# Fix paths with leading slash/backslash XXX: is this valid?
rpath = rpath.relative_to(rpath.anchor)
if len(rpath.parts) > 1:
path = path.prefix(rpath.parent)
path /= rpath.name
# Add files by pattern
sset = SyncSet(path)
sset.add(ufs_def["pattern"])
# XXX: what about platform and recursive keys?
sms.append(sset)
return sms
class GenericSyncOp(AbstractSyncOp):
""" Generic Sync Operation for Non-Steam Apps """
def __init__(self, parent, name):
super().__init__(parent)
self.name = name
class GenericFoundSyncOp(GenericSyncOp):
_found: Path
def __init__(self, parent, name, found: Path):
super().__init__(parent, name)
self._found = found
@property
def found(self) -> SyncPath:
return SyncPath(self, self._found)
class WineSyncOp(GenericFoundSyncOp):
_wine_prefix: Path
def __init__(self, parent, name, prefix: Path, found: Path):
super().__init__(parent, name, found)
self._wine_prefix = prefix
@CachedProperty
def paths(self) -> CommonSyncPaths.Paths:
return CommonSyncPaths.create(self, self._wine_prefix)
class SyncNoOp:
""" No-Op Sync Operation """
def __call__(self, func) -> None:
pass
def __bool__(self) -> bool:
return False
AppNotFound = SyncNoOp()
### -----------------------------------------------------------------
# Main Sync manager class
### -----------------------------------------------------------------
class NoSteamSync(ISyncContext):
target_path: Path
home_path: Path
def __init__(self, target_path: Path):
self.target_path = Path(target_path)
self.home_path = Path.home()
@CachedProperty
def paths(self) -> CommonPaths.NativePaths:
return CommonPaths.create(self, None)
def generic(self, name, find: Optional[Callable[[CommonPaths.NativePaths], Path]], *, platform=None) -> Union[GenericSyncOp, SyncNoOp]:
""" Non-Steam App """
if platform is None or platform in sys.platform:
if find is None:
return GenericSyncOp(self, name)
search_path = find(self.paths)
if search_path.exists():
return GenericFoundSyncOp(self, name, search_path)
return AppNotFound
def wine(self, name, prefixes: Sequence[PathOrStr], find: Callable[[CommonPaths.Paths], Path]) -> Union[WineSyncOp, GenericFoundSyncOp, SyncNoOp]:
"""
Works the same as .generic() on Windows, but additionally searches any number of Wine-Prefixes when not running on Windows
"""
if sys.platform == 'win32':
search_path = find(self.paths)
if search_path.exists():
return GenericFoundSyncOp(self, name, search_path)
else:
for prefix in prefixes:
prefixpath = Path(prefix)
paths = CommonPaths.create(self, prefixpath)
search_path = find(paths)
if search_path.exists():
return WineSyncOp(self, name, prefixpath, search_path)
return AppNotFound
class SteamSync(NoSteamSync):
steam: Steam
def __init__(self, target_path: Path, *, steam_path: Path = None):
super().__init__(target_path)
self.steam = Steam(steam_path)
# Get Information
@CachedProperty
def apps(self) -> List[App]:
return list(self.steam.apps)
# Get Sync Operation for a specific App
def by_id(self, appid: int) -> Union[SteamSyncOp, SyncNoOp]:
""" Steam App by AppID """
app = self.steam.get_app(appid)
if app is not None:
return SteamSyncOp(self, app)
else:
return AppNotFound
def by_name(self, pattern: str) -> Union[SteamSyncOp, SyncNoOp]:
""" Steam App by Name """
pt = re.compile(fnmatch.translate(pattern).rstrip("\\Z"), re.IGNORECASE)
app = None
for candidate in self.apps:
if pt.search(candidate.name):
if app is not None:
raise Exception("Encountered more than one possible App matching '%s'" % pattern)
app = candidate
if app is None:
return AppNotFound
return SteamSyncOp(self, app)