[python] add steamutil, steamsync

Taeyeon Mori 5 years ago
parent 8e814cf891
commit 7c0cd9ca95
  1. 42
  2. 246
  3. 374
  4. 14

@ -0,0 +1,42 @@
#!/usr/bin/env python3
from pathlib import Path
from steamsync import SteamSync, SyncOp
# Find home
steamns_home = Path("~/.local/steam/home/taeyeon/").expanduser()
if steamns_home.exists():
home = steamns_home
home = Path("~").expanduser()
sync = SteamSync(Path("~/Nextcloud/Misc/Savegames").expanduser())
def zanzarah(op):
with op.game_directory.prefix("Save") as set:
set += "*"
def fs19(op):
with op.my_documents.prefix("My Games/FarmingSimulator2019") as set:
#@sync.by_name("Fell Seal")
def fell_seal(op):
with op.from_(home).prefix("Fell Seal") as set:
set += "saves"
set += "customdata"

@ -0,0 +1,246 @@
#!/usr/bin/env python3
import sys, os
import fnmatch
import re
import itertools
import shutil
from pathlib import Path
from typing import Tuple, Dict, List, Union, Set, Callable, Any
from steamutil import Steam, App, CachedProperty
class AppNotFoundType:
def __call__(self, func) -> None:
def __bool__(self) -> bool:
return False
AppNotFound = AppNotFoundType()
class SyncPath:
__slots__ = "op", "local", "common"
op: 'SyncOp'
local: Path
common: Path
def __init__(self, op, local, common="."):
self.op = op
self.local = Path(local)
self.common = Path(common)
def prefix(self, component: Union[str, Path]) -> 'SyncPath':
return SyncPath(self.op, self.local / component, self.common)
def __div__(self, component: Union[str, Path]) -> 'SyncPath':
return SyncPath(self.op, self.local, self.common / component)
def path(self) -> Path:
return self.local / self.common
def target_path(self) -> Path:
return self.op.target_path / self.common
def __enter__(self) -> 'SyncSet':
return SyncSet(self)
def __exit__(self, type, value, traceback):
# Todo: auto-commit?
class SyncSet:
FileStatSet = Dict[Path, Tuple[Path, os.stat_result]]
op: 'SyncOp'
spath: SyncPath
local: FileStatSet
target: FileStatSet
changeset: int
def __init__(self, path):
self.op = path.op
self.spath = path
self.local = {}
self.target = {}
def path(self) -> Path:
return self.spath.path
def target_path(self) -> Path:
return self.spath.target_path
# Modify inclusion
def _collect_files(anchor: Path, patterns: List[str]):
files: SyncSet.FileStatSet = {}
def add_file(f):
if f.is_file():
files[f.relative_to(anchor)] = f, f.stat()
elif f.is_dir():
for f in f.iterdir():
for f in set(itertools.chain.from_iterable(anchor.glob(g) for g in patterns)):
return files
def add(self, *patterns):
self.local.update(self._collect_files(self.path, patterns))
self.target.update(self._collect_files(self.target_path, patterns))
def __iadd__(self, pattern: str) -> 'SyncSet':
return self
# Calculate changes
def _inval(self):
for cache in "files_from_local", "files_from_target", "files_unmodified":
if cache in self.__dict__:
del self.__dict__[cache]
def _sync_set(src_files: FileStatSet, dst_files: FileStatSet) -> Set[Path]:
Return a set of files that need to be updated from src to dst.
return {f
for f, (_, sst) in src_files.items()
if f not in dst_files or sst.st_mtime > dst_files[f][1].st_mtime
def files_from_local(self) -> Set[Path]:
return self._sync_set(self.local, self.target)
def files_from_target(self) -> Set[Path]:
return self._sync_set(self.target, self.local)
def files_unmodified(self) -> Set[Path]:
return (self.local.keys() | self.target.keys()) - (self.files_from_local | self.files_from_target)
def show_confirm(self) -> bool:
# XXX: move to SyncOp?
print(" Local is newer: ", ", ".join(map(str, self.files_from_local)))
print(" Target is newer: ", ", ".join(map(str, self.files_from_target)))
print(" Unmodified: ", ", ".join(map(str, self.files_unmodified)))
print("Press enter to continue")
return True # TODO: Proper thingey
def execute(self, *, make_inconsistent=False) -> bool:
operations = []
if self.files_from_local:
if self.files_from_target and not make_inconsistent:
self.op.report_error(["Both sides have changed files. Synchronizing would lead to inconsistent and possibly broken savegames."])
return False
operations += [(self.path / p, self.target_path / p) for p in self.files_from_local] #pylint:disable=not-an-iterable
if self.files_from_target:
operations += [(self.target_path / p, self.path / p) for p in self.files_from_target] #pylint:disable=not-an-iterable
return self.op._do_copy(operations)
class SyncOp:
parent: 'SteamSync'
app: App
def __init__(self, ssync, app):
self.parent = ssync
self.app = app
def __call__(self, func: Callable[['SyncOp'],Any]):
# For decorator use
return func(self)
def _do_copy(self, ops: List[Tuple[Path, Path]]) -> bool:
for src, dst in ops:
if not dst.parent.exists():
print(" \033[36m%s -> %s\033[0m" % (src, dst))
shutil.copy2(src, dst)
return True
# UI
def _report_begin(self):
print("\033[34mNow Synchronizing App %s\033[0m" % self.app.name)
def report_error(self, msg: List[str]):
print("\033[31m"+"\n".join(" " + l for l in msg)+"\033[0m")
def target_path(self) -> Path:
return self.parent.target_path / self.app.install_dir
def my_documents(self) -> SyncPath:
""" Get the Windows "My Documents" folder """
if sys.platform.startswith("linux"):
# TODO: what about native games?
return SyncPath(self, self.app.compat_drive / "users/steamuser/My Documents")
elif sys.platform == "win32":
def get_my_documents():
import ctypes.wintypes
CSIDL_PERSONAL = 5 # My Documents
SHGFP_TYPE_CURRENT = 0 # Get current, not default value
buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
ctypes.windll.shell32.SHGetFolderPathW(None, CSIDL_PERSONAL, None, SHGFP_TYPE_CURRENT, buf)
return buf.value
return SyncPath(self, get_my_documents())
raise Exception("Platform not supported")
def game_directory(self) -> SyncPath:
return SyncPath(self, self.app.install_path)
def from_(self, path: Path) -> SyncPath:
return SyncPath(self, path)
class SteamSync:
target_path: Path
steam: Steam
def __init__(self, target_path: Path, *, steam_path: Path = None):
self.target_path = Path(target_path)
self.steam = Steam(steam_path)
def apps(self) -> List[App]:
return list(self.steam.apps)
def by_id(self, appid):
app = self.steam.get_app(appid)
if app is not None:
return SyncOp(self, app)
return AppNotFound
def by_name(self, pattern):
pt = re.compile(fnmatch.translate(pattern).rstrip("\\Z"), re.IGNORECASE)
app = None
for candidate in self.apps: #pylint:disable=not-an-iterable
if pt.search(candidate.name):
if app is not None:
raise Exception("Encountered more than one possible App matching '%s'" % pattern)
app = candidate
return SyncOp(self, app)

@ -0,0 +1,374 @@
# Discover Steam install and games
# (c) 2020 Taeyeon Mori CC-BY-SA
import sys, os
import re, fnmatch, datetime
from pathlib import Path
from typing import List, Iterable, Dict, Tuple, Callable, Optional, Union
from vdfparser import VdfParser, DeepDict
class CachedProperty:
""" A property that is only computed once per instance and then replaces
itself with an ordinary attribute. Deleting the attribute resets the
Source: https://github.com/bottlepy/bottle/commit/fa7733e075da0d790d809aa3d2f53071897e6f76
def __init__(self, func):
self.__doc__ = getattr(func, '__doc__')
self.func = func
def __get__(self, obj, cls):
if obj is None:
return self
value = obj.__dict__[self.func.__name__] = self.func(obj)
return value
class DictPathRoProperty:
__slots__ = "property", "path", "default"
_nodefault = object()
def __init__(self, property: str, path: Tuple[str], default=_nodefault):
self.property = property
self.path = path
self.default = default
def __get__(self, obj, cls):
if obj is None:
return self
d = getattr(obj, self.property)
for pc in self.path:
d = d[pc]
except KeyError:
if self.default is not self._nodefault:
return self.default
return d
class DictPathProperty(DictPathRoProperty):
__slots__ = ()
def _get_create_parent(self, obj) -> Dict:
d = getattr(obj, self.property)
for pc in self.path[:-1]:
d = d[pc]
except KeyError:
nd = {}
d[pc] = nd
d = nd
return d
def __set__(self, obj, value):
self._get_create_parent(obj)[self.path[-1]] = value
def __delete__(self, obj):
del self._get_create_parent(obj)[self.path[-1]]
_vdf = VdfParser()
class App:
steam: 'Steam'
library_folder: 'LibraryFolder'
steamapps_path: Path
manifest_path: Path
manifest: DeepDict
def __init__(self, libfolder, manifest_path: Path, *, manifest_data=None):
self.steam = libfolder.steam
self.library_folder = libfolder
self.steamapps_path = libfolder.steamapps_path
self.manifest_path = manifest_path
if manifest_data is None:
with open(manifest_path) as f:
self.manifest = _vdf.parse(f)
self.manifest = manifest_data
def __repr__(self):
return "<steamutil.App %d '%s' @ \"%s\">" % (self.appid, self.name, self.install_path)
# Basic info
def appid(self) -> int:
return int(self.manifest["AppState"]["appid"])
name = DictPathRoProperty("manifest", ("AppState", "name"))
language = DictPathRoProperty("manifest", ("AppState", "UserConfig", "language"), None)
install_dir = DictPathRoProperty("manifest", ("AppState", "installdir"))
def install_path(self) -> Path:
return self.steamapps_path / "common" / self.install_dir
def get_userdata_path(self, user_id: Union[int, 'LoginUser']) -> Path:
return self.steam.get_userdata_path(user_id) / str(self.appid)
# Workshop
def workshop_path(self) -> Path:
return self.steamapps_path / "workshop" / "content" / str(self.appid)
# Steam Play info
def compat_path(self) -> Path:
return self.steamapps_path / "compatdata" / str(self.appid)
def compat_drive(self) -> Path:
return self.compat_path / "pfx" / "drive_c"
# Install size
declared_install_size = DictPathRoProperty("manifest", ("AppState", "SizeOnDisk"), 0)
def compute_install_size(self) -> int:
def sum_size(p: Path):
acc = 0
for x in p.iterdir():
if x.is_dir():
acc += sum_size(x)
acc += x.stat().st_size
return acc
return sum_size(self.install_path)
class LibraryFolder:
steam: 'Steam'
path: Path
def __init__(self, steam: 'Steam', path: Path):
self.steam = steam
self.path = path
def __repr__(self):
return "<steamutil.LibraryFolder @ \"%s\">" % self.path
# Paths
def steamapps_path(self) -> Path:
steamapps = self.path / "steamapps"
if not steamapps.exists():
# Emulate case-insensitivity
cased = self.path / "SteamApps"
if cased.exists():
steamapps = cased
# try to find other variation
found = [d for d in self.path.iterdir() if d.is_dir() and d.name.lower() == "steamapps"]
if len(found) > 1:
raise Exception("More than one steamapps folder in library folder", self.path)
elif found:
return found[0]
# if none exists, return non-existant default name
return steamapps
def common_path(self) -> Path:
return self.steamapps_path / "common"
def appmanifests(self) -> Iterable[Path]:
return self.steamapps_path.glob("appmanifest_*.acf") # pylint:disable=no-member
def apps(self) -> Iterable[App]:
return (App(self, mf) for mf in self.appmanifests)
def get_app(self, appid: int) -> Optional[App]:
manifest = self.steamapps_path / ("appmanifest_%d.acf" % appid)
if manifest.exists():
return App(self, manifest)
def find_apps_re(self, regexp: str) -> Iterable[App]:
reg = re.compile(r'"name"\s+".*%s.*"' % regexp, re.IGNORECASE)
for manifest in self.appmanifests: #pylint:disable=not-an-iterable
with open(manifest) as f:
content = f.read()
if reg.search(content):
yield App(self, manifest, manifest_data=_vdf.parse_string(content))
def find_apps(self, pattern: str) -> Iterable[App]:
return self.find_apps_re(fnmatch.translate(pattern).rstrip("\\Z"))
class UserAppConfig:
user: 'LoginUser'
appid: int
def __init__(self, user, appid):
self.user = user
self.appid = appid
def __repr__(self):
return "<steamutil.UserAppConfig appid=%d for account %s>" % (self.appid, self.user.account_name)
def _data(self):
return self.user.localconfig["UserLocalConfigStore"]["Software"]["Valve"]["Steam"]["Apps"][str(self.appid)]
except KeyError:
return {} # TODO
def last_played(self) -> datetime.datetime:
return datetime.datetime.fromtimestamp(int(self._data.get("LastPlayed", "0")))
def playtime(self) -> datetime.time:
t = int(self._data.get("Playtime", "0"))
return datetime.time(t // 60, t % 60)
def playtime_two_weeks(self) -> datetime.time:
t = int(self._data.get("Playtime2wks", "0"))
return datetime.time(t // 60, t % 60)
launch_options = DictPathProperty("_data", ("LaunchOptions",), None)
class LoginUser:
steam: 'Steam'
id: int
info: Dict[str, str]
def __init__(self, steam, id: int, info: Dict):
self.steam = steam
self.id = id
self.info = info
def __repr__(self):
return "<steamutil.LoginUser %d %s '%s'>" % (self.id , self.account_name, self.username)
def account_id(self):
""" 32-bit account ID """
return self.id & 0xffffffff
account_name = DictPathRoProperty("info", ("AccountName",))
username = DictPathRoProperty("info", ("PersonaName",))
def userdata_path(self) -> Path:
return self.steam.get_userdata_path(self)
def localconfig_vdf(self) -> Path:
return self.userdata_path / "config" / "localconfig.vdf"
def localconfig(self) -> DeepDict:
with open(self.localconfig_vdf) as f:
return _vdf.parse(f)
# Game config
def get_app_config(self, app: Union[int, App]) -> Optional[UserAppConfig]:
if isinstance(app, App):
app = app.appid
return UserAppConfig(self, app)
class Steam:
root: Path
def __init__(self, install_path=None):
self.root = install_path if install_path is not None else self.find_install_path()
if self.root is None:
raise Exception("Could not find Steam")
def __repr__(self):
return "<steamutil.Steam @ \"%s\">" % self.root
def find_install_path() -> Optional[Path]:
# TODO: Windows
# Linux
if sys.platform.startswith("linux"):
# Try ~/.steam first
dotsteam = Path(os.path.expanduser("~/.steam"))
if dotsteam.exists():
steamroot = (dotsteam / "root").resolve()
if steamroot.exists():
return steamroot
# Try ~/.local/share/Steam, classic ~/Steam
data_dir = Path(os.environ.get("XDG_DATA_HOME", "~/.local/share")).expanduser()
for path in data_dir, Path("~").expanduser():
for name in "Steam", "SteamBeta":
steamroot = path / name
if steamroot.exists():
return steamroot
# Various paths
def libraryfolders_vdf(self) -> Path:
""" The libraryfolders.vdf file listing all configured library locations """
return self.root / "steamapps" / "libraryfolders.vdf"
def config_vdf(self) -> Path:
return self.root / "config" / "config.vdf"
def loginusers_vdf(self) -> Path:
return self.root / "config" / "loginusers.vdf"
# Users
def most_recent_user(self) -> Optional[LoginUser]:
with open(self.loginusers_vdf) as f:
data = _vdf.parse(f)
for id, info in data["users"].items():
if info["mostrecent"] == "1":
return LoginUser(self, int(id), info)
except KeyError:
return None
def get_userdata_path(self, user_id: Union[int, LoginUser]) -> Path:
if isinstance(user_id, LoginUser):
user_id = user_id.account_id
return self.root / "userdata" / str(user_id)
# Game/App Library
def library_folder_paths(self) -> List[Path]:
with open(self.libraryfolders_vdf) as f:
return [Path(v) for k,v in _vdf.parse(f)["LibraryFolders"].items() if k.isdigit()]
def library_folders(self) -> List[LibraryFolder]:
return [LibraryFolder(self, self.root)] + [LibraryFolder(self, p) for p in self.library_folder_paths] #pylint:disable=not-an-iterable
def apps(self) -> Iterable[App]:
for lf in self.library_folders: #pylint:disable=not-an-iterable
yield from lf.apps
def get_app(self, id: int) -> Optional[App]:
for lf in self.library_folders: #pylint:disable=not-an-iterable
app = lf.get_app(id)
if app is not None:
return app
def find_apps(self, pattern: str) -> Iterable[App]:
for lf in self.library_folders: #pylint:disable=not-an-iterable
yield from lf.find_apps(pattern)
def find_app(self, pattern: str) -> Optional[App]:
for app in self.find_apps(pattern):
return app

@ -1,12 +1,16 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# Parse Steam/Source VDF Files # Parse Steam/Source VDF Files
# Reference: https://developer.valvesoftware.com/wiki/KeyValues#File_Format # Reference: https://developer.valvesoftware.com/wiki/KeyValues#File_Format
# (c) 2015 Taeyeon Mori; CC-BY-SA # (c) 2015-2020 Taeyeon Mori; CC-BY-SA
from __future__ import unicode_literals from __future__ import unicode_literals
import io import io
from typing import Dict, Union
DeepDict = Dict[str, Union[str, "DeepDict"]]
class VdfParser: class VdfParser:
""" """
@ -113,13 +117,13 @@ class VdfParser:
else: else:
current.append(c) current.append(c)
def parse(self, fd): def parse(self, fd) -> DeepDict:
""" """
Parse a VDF file into a python dictionary Parse a VDF file into a python dictionary
""" """
return self._parse_map(fd) return self._parse_map(fd)
def parse_string(self, content): def parse_string(self, content) -> DeepDict:
""" """
Parse the content of a VDF file Parse the content of a VDF file
""" """
@ -137,7 +141,7 @@ class VdfParser:
def write(str=None, i=False, d=False, nl=False): def write(str=None, i=False, d=False, nl=False):
if str: if str:
fd.write(str) fd.write(str)
if delim: if d:
fd.write(" ") fd.write(" ")
else: else:
@ -165,7 +169,7 @@ class VdfParser:
write(self._make_literal(v)) write(self._make_literal(v))
write(d=1, nl=1) write(d=1, nl=1)
def write(self, fd, dictionary, *, pretty=True): def write(self, fd, dictionary: DeepDict, *, pretty=True):
""" """
Write a dictionary to a file in VDF format Write a dictionary to a file in VDF format
""" """
