From 7c46de9fa487c4b2032bb21769af223860f86a04 Mon Sep 17 00:00:00 2001
From: Taeyeon Mori
Date: Tue, 23 Nov 2021 04:31:10 +0100
Subject: [PATCH] patchdir: add update mode, remove backup dirs

---
Review notes (commentary only; ignored by `git am`):

* This patch was recovered from a whitespace-collapsed copy. Indentation and
  the position of blank context lines were reconstructed from the syntax and
  the hunk line counts. NOTE(review): verify the context lines against blob
  3ad0552 before applying; the `-C` help string ("Operate in ") looks
  truncated and was left exactly as found.
* Fixes made to added (`+`) lines. Every fix is in-line, so the hunk headers
  and the diffstat are unchanged:
  - main: `any(check)` / `all(check)` referenced an undefined name; the list
    is called `checks`.
  - main: `operations` maps path -> command, so the two replay/consistency
    loops now iterate `operations.items()` as `(path, cmd)`.
  - copy_all_entries: `bufsize`, `ARCHIVE_EOF` and `entry_pathname_w` were
    undefined; `bufsize` is now a keyword argument with the same default as
    copy_entry and the other two are taken from `ffi` as in list_all_entries.
  - read_script: blank lines and `#` comment lines after the tag are skipped
    (the generated script always contains them), the meta-command test looks
    at `cmd[0]`, line numbers are 1-based, and the second error message is a
    real f-string.
  - main: the "missing in backup archive" warning is now an f-string.
  - generated shell: the `-k` / PATCHDIR_KEEP_BACKUP test was missing `then`.
  - help/confirm texts: spelling ("Archive", "uninstall-script", "Should"),
    "-B and -u", and the unbalanced quote in the -A help.

 bin/patchdir | 309 ++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 243 insertions(+), 66 deletions(-)

diff --git a/bin/patchdir b/bin/patchdir
index 3ad0552..edf82a9 100755
--- a/bin/patchdir
+++ b/bin/patchdir
@@ -4,11 +4,13 @@
 #pylint: disable=missing-module-docstring,missing-function-docstring
 
 import argparse
+import fnmatch
 import pathlib
 import contextlib
 import os
 import ctypes
 import textwrap
+import re
 import shlex
 
 import libarchive
@@ -38,10 +40,10 @@ except AttributeError:
         ffi.c_int, ffi.check_int)
 
 try:
-    read_extract = ffi.read_extract
+    read_extract2 = ffi.read_extract2
 except AttributeError:
-    read_extract = ffi.ffi("read_extract",
-        [ffi.c_archive_p, ffi.c_archive_entry_p, ffi.c_int],
+    read_extract2 = ffi.ffi("read_extract2",
+        [ffi.c_archive_p, ffi.c_archive_entry_p, ffi.c_archive_p],
         ffi.c_int, ffi.check_int)
 
 def to_bytes(path, encoding="utf-8"):
@@ -89,11 +91,14 @@ def disk_reader(path=None, flags=0, lookup=True):
         ffi.read_free(ard_p)
 
 @contextlib.contextmanager
-def file_writer_ext(filename):
+def file_writer_ext(filename, target_filename=None):
     archive_p = ffi.write_new()
     try:
         bfn = to_bytes(filename, "fsencode")
-        write_set_format_filter_by_ext(archive_p, bfn)
+        if target_filename is None:
+            write_set_format_filter_by_ext(archive_p, bfn)
+        else:
+            write_set_format_filter_by_ext(archive_p, to_bytes(target_filename, "fsencode"))
         ffi.write_open_filename(archive_p, bfn)
         try:
             yield libarchive.write.ArchiveWrite(archive_p)
@@ -102,40 +107,150 @@ def file_writer_ext(filename):
     finally:
         ffi.write_free(archive_p)
 
-def extract_to_disk(entry: libarchive.entry.ArchiveEntry, path: os.PathLike=None):
+def extract_entry(entry: libarchive.entry.ArchiveEntry, dst_p: libarchive.ffi.c_archive_p, path: os.PathLike=None):
     archive_p, entry_p = entry._archive_p, entry._entry_p #pylint:disable=protected-access
     if path:
         ffi.entry_update_pathname_utf8(entry_p, to_bytes(path, "utf-8"))
-    read_extract(archive_p, entry_p,
-        libarchive.extract.EXTRACT_TIME|libarchive.extract.EXTRACT_UNLINK)
+    read_extract2(archive_p, entry_p, dst_p)
 
+def copy_entry(entry: libarchive.entry.ArchiveEntry, dst: libarchive.write.ArchiveWrite, path: os.PathLike=None,
+               *, bufsize=ffi.page_size*8):
+    archive_p, entry_p, dst_p = entry._archive_p, entry._entry_p, dst._pointer #pylint:disable=protected-access
+    if path:
+        ffi.entry_update_pathname_utf8(entry_p, to_bytes(path, "utf-8"))
+    # read_extract2 and write_data_block only supported with archive_write_disk :(
+    #read_extract2(archive_p, entry_p, dst_p)
+    ffi.write_header(dst_p, entry_p)
+    buf_p = ctypes.create_string_buffer(bufsize)
+    read_data, write_data = ffi.read_data, ffi.write_data
+    while (bytes_read := read_data(archive_p, buf_p, bufsize)) > 0:
+        write_data(dst_p, buf_p, bytes_read)
+    ffi.write_finish_entry(dst_p)
+
+def copy_all_entries(src: libarchive.read.ArchiveRead, dst: libarchive.write.ArchiveWrite, *, bufsize=ffi.page_size*8) -> list[pathlib.Path]:
+    entries = []
+    with libarchive.entry.new_archive_entry() as entry_p:
+        archive_p, dst_p = src._pointer, dst._pointer
+        buf_p = ctypes.create_string_buffer(bufsize)
+        read_data, write_data = ffi.read_data, ffi.write_data
+        while ffi.read_next_header2(archive_p, entry_p) != ffi.ARCHIVE_EOF:
+            entries.append(pathlib.Path(ffi.entry_pathname_w(entry_p)))
+            #read_extract2(archive_p, entry_p, dst_p)
+            ffi.write_header(dst_p, entry_p)
+            while (bytes_read := read_data(archive_p, buf_p, bufsize)) > 0:
+                write_data(dst_p, buf_p, bytes_read)
+            ffi.write_finish_entry(dst_p)
+    return entries
+
+def list_all_entries(src: libarchive.read.ArchiveRead) -> list[pathlib.Path]:
+    entries = []
+    with libarchive.entry.new_archive_entry() as entry_p:
+        src_p = src._pointer
+        read_next_header2, ARCHIVE_EOF, entry_pathname_w, Path = \
+            ffi.read_next_header2, ffi.ARCHIVE_EOF, ffi.entry_pathname_w, pathlib.Path
+        while read_next_header2(src_p, entry_p) != ffi.ARCHIVE_EOF:
+            entries.append(Path(entry_pathname_w(entry_p)))
+    return entries
 
 # Main
+def confirm(msg, code=0):
+    print(msg)
+    reply = input("Continue? [n] ")
+    if not (res := reply.lower().startswith("y")) and code != 0:
+        sys.exit(code)
+    return res
+
+def glob_compile(pattern):
+    return re.compile(fnmatch.translate(pattern))
+
 def parse_args(argv):
     parser = argparse.ArgumentParser(prog=argv[0], description="""
         Patch a folder structure with files from an archive.
         This will replace existing files with those of the same name in an archive,
         with the option to back up the old versions and generate a script to revert the changes.
         """)
-    parser.add_argument("-p", "--strip", type=int, default=0,
-        help="Strip NUM leading components from archived file names.")
+    parser.add_argument("archive", type=pathlib.Path,
+        help="Archive file name")
     parser.add_argument("-C", "--directory", type=pathlib.Path, default=pathlib.Path("."),
         help="Operate in ")
-    parser.add_argument("-b", "--backup", type=pathlib.Path, default=None,
-        help="Create backup copies of overwritten files")
-    parser.add_argument("-B", "--backup-archive", type=pathlib.Path, default=None,
-        help="Create an archive of the original files")
-    parser.add_argument("-m", "--match", default=None,
-        help="Only extract files matching GLOB", metavar="GLOB")
-    parser.add_argument("-u", "--uninstall-script", type=pathlib.Path, default=None,
-        help="Filename to save an uninstall-scipt to.", metavar="FILE")
     parser.add_argument("-n", "--dry-run", action="store_true",
         help="Perform a dry run")
-    parser.add_argument("archive", help="Achive file name")
-
-    return parser.parse_args(argv[1:])
-
-
+    parser.add_argument("--noconfirm", action="store_true",
+        help="Don't ask for confirmation on possibly dangerous operations")
+    target = parser.add_argument_group("Target")
+    target.add_argument("-p", "--strip", type=int, default=0,
+        help="Strip N leading path components from archived file names.", metavar="N")
+    target.add_argument("-P", "--prefix", type=pathlib.Path, default=None,
+        help="Prepend leading path components to archived file names.")
+    fmatch = target.add_mutually_exclusive_group()
+    fmatch.add_argument("-m", "--match", dest="match", type=glob_compile, default=None,
+        help="Only extract files matching GLOB", metavar="GLOB")
+    fmatch.add_argument("-M", "--regex", dest="match", type=re.compile, default=None,
+        help="Only extract files matching REGEX", metavar="REGEX")
+    backup = parser.add_argument_group("Backup")
+    backup.add_argument("-B", "--backup-archive", type=pathlib.Path, default=None,
+        help="Create an archive of the original files", metavar="FILE")
+    backup.add_argument("-u", "--uninstall-script", type=pathlib.Path, default=None,
+        help="Filename to save an uninstall-script to. Should be combined with -B", metavar="FILE")
+    backup.add_argument("-a", "--append", action="store_true", default=False,
+        help="Update existing backup files")
+    preset = parser.add_mutually_exclusive_group()
+    preset.add_argument("-U", "--uninstall-preset", action="store_const", dest="preset", const="uninstall",
+        help="Short-hand for '-B %%n.uninstall.tar.xz -u %%n.uninstall.sh', %%n being the archive basename")
+    preset.add_argument("-A", "--append-preset", action="store_const", dest="preset", const="append",
+        help="Short-hand for '-a -B patchdir_backup.tar.xz -u patchdir_restore.sh'")
+
+    args = parser.parse_args(argv[1:])
+
+    if args.preset == "uninstall":
+        # Ignore when combined with explicit -u and -B; allows general-purpose alias patchdir='patchdir -U'
+        if args.backup_archive is None:
+            args.backup_archive = pathlib.Path(f"{args.archive.stem}.uninstall.tar.xz")
+        if args.uninstall_script is None:
+            args.uninstall_script = pathlib.Path(f"{args.archive.stem}.uninstall.sh")
+    elif args.preset == "append":
+        args.append = True
+        if args.backup_archive is None:
+            args.backup_archive = pathlib.Path("patchdir_backup.tar.xz")
+        if args.uninstall_script is None:
+            args.uninstall_script = pathlib.Path("patchdir_restore.sh")
+
+    if not args.noconfirm:
+        if not args.uninstall_script or not args.backup_archive:
+            confirm("Original files will be lost without -B and -u. Consider using -U.", 1)
+
+    return args
+
+# Parsing uninstall scripts
+SCRIPT_TAG = "#> patchdir restore script v1 <#"
+SCRIPT_BACKUP_CMDS = {"restore"}
+SCRIPT_REMOVE_CMDS = {"remove"}
+SCRIPT_DIR_CMDS = {"remove_dir"}
+SCRIPT_META_CMDS = {"cleanup"}
+SCRIPT_CMDS = SCRIPT_BACKUP_CMDS|SCRIPT_REMOVE_CMDS|SCRIPT_DIR_CMDS|SCRIPT_META_CMDS
+
+def read_script(path):
+    # Relies on ordered dictionary implementation
+    is_script = False
+    operations = {}
+    with open(path) as f:
+        for ln, line in enumerate(f, 1):
+            if line.strip() == SCRIPT_TAG:
+                is_script = True
+            elif is_script:
+                cmd = shlex.split(line, comments=True)
+                if cmd and (len(cmd) != 2 or cmd[0] not in SCRIPT_CMDS):
+                    raise ValueError(f"Failed to read invalid line {ln} in script {path}", line)
+                if cmd and cmd[0] not in SCRIPT_META_CMDS:
+                    operations[pathlib.Path(cmd[1])] = cmd[0]
+    if not is_script:
+        raise ValueError(f"Script for appending {path} doesn't seem to be a patchdir script")
+    return operations
+
+def write_script(fp, cmd, path):
+    fp.write(f"{cmd:10s} {shlex.quote(str(path))}\n")
+
+# Helpers
 def makedirs(path, dryrun=False):
     if path.is_dir():
         return set()
@@ -156,27 +271,51 @@ def makedirs(path, dryrun=False):
 
     return created
 
-
 def main(argv):
     args = parse_args(argv)
 
     with contextlib.ExitStack() as ctx:
         # archive name is the only argument not affected by -C
-        archive = ctx.enter_context(libarchive.file_reader(args.archive))
+        archive = ctx.enter_context(libarchive.file_reader(str(args.archive)))
 
         # Change directory to target
         os.chdir(args.directory)
 
-        if args.backup_archive and os.path.exists(args.backup_archive):
-            print("\033[31mError: Backup archive file already exist\033[0m")
-            return 3
+        do_append = False
+        if args.append:
+            # Check consistency for append
+            checks = [p.exists()
+                      for p in (args.backup_archive, args.uninstall_script)
+                      if p is not None]
+            do_append = any(checks)
+            if do_append and not all(checks):
+                raise FileExistsError("Inconsistency in existing files to append to.")
+        else:
+            if args.backup_archive and args.backup_archive.exists():
+                if args.noconfirm:
+                    print("\033[31mError: Backup archive file already exist\033[0m")
+                    return 3
+                else:
+                    confirm("Backup archive already exists.", 3)
 
         uninstall_script = None
+        operations = {}
         folders = set()
         files = set()
         if args.uninstall_script:
-            uninstall_script = ctx.enter_context(open(args.uninstall_script, "x"))
-            os.chmod(uninstall_script.fileno(), 0o755)
+            if do_append:
+                operations = read_script(args.uninstall_script)
+            elif args.uninstall_script.exists():
+                if args.noconfirm:
+                    print("\033[31mError: Uninstall script file already exist\033[0m")
+                    return 3
+                else:
+                    confirm("Uninstall script already exists. Overwrite?", 3)
+            if not args.dry_run:
+                uninstall_script = ctx.enter_context(open(args.uninstall_script, "w"))
+                os.chmod(uninstall_script.fileno(), 0o755)
+            else:
+                uninstall_script = ctx.enter_context(open("/dev/null", "w"))
 
             uninstall_script_dir = args.uninstall_script.parent.resolve()
             uninstall_script.write(textwrap.dedent(f"""\
@@ -186,7 +325,6 @@ def main(argv):
 
                 cd "`dirname "$0"`/{pathlib.Path.cwd().relative_to(uninstall_script_dir)}"
 
-                BACKUP_DIR={f"'{args.backup}'" if args.backup else ' # No backup directory was created (-b)'}
                 BACKUP_ARCHIVE={f"'{args.backup_archive}'" if args.backup_archive else ' # No backup archive was created (-B)'}
 
                 remove() {{
@@ -196,12 +334,7 @@ def main(argv):
                 remove_dir() {{
                     rmdir "$1" || echo "\\033[34mFailed to remove folder $1\\033[0m"
                 }}
-                if [ -n "$BACKUP_DIR" -a -d "$BACKUP_DIR" ]; then
-                    restore() {{
-                        echo "Restoring $1 from $BACKUP_DIR"
-                        mv "$BACKUP_DIR/$1" "$1"
-                    }}
-                elif [ -n "$BACKUP_ARCHIVE" -a -f "$BACKUP_ARCHIVE" ]; then
+                if [ -n "$BACKUP_ARCHIVE" -a -f "$BACKUP_ARCHIVE" ]; then
                     restore() {{
                         echo "Restoring $1 from $BACKUP_ARCHIVE"
                         bsdtar -Uqxf "$BACKUP_ARCHIVE" "$1"
@@ -211,36 +344,84 @@ def main(argv):
                         echo "Removing $1 \\033[31m[Previously existing file is lost]\\033[0m"
                         rm "$1"
                     }}
-                elif ! grep -qE "restore\\s+[\\"'].+[\\"']" "$0"; then
+                elif ! grep -qE "restore\\s+([\\"'].+[\\"']|(?\\!script)\\S+\\s*(#.*)?\\$)" "$0"; then
                     : No files were overwritten
                 else
                     echo "\\033[31mError: Cannot restore original files because no backup is available\\033[0m"
                     echo "Set PATCHDIR_LOSE_FILES=1 to remove changed files without restoring originals"
                     exit 66
                 fi
+                if [ "$1" = "-k" -o -n "$PATCHDIR_KEEP_BACKUP" ]; then
+                    cleanup() {{
+                        echo "Keeping $1"
+                    }}
+                else
+                    cleanup() {{
+                        echo "Cleaning up $1"
+                        rm "$1"
+                    }}
+                fi
 
+                {SCRIPT_TAG}
                 # Restore Files #
                 """))
 
-        backup_dir = None
-        backup_folders = set()
-        if args.backup:
-            backup_dir = args.backup
+        if operations:
+            for path, cmd in operations.items():
+                if cmd not in SCRIPT_DIR_CMDS:
+                    write_script(uninstall_script, cmd, path)
         backup_archive = None
+        backup_archive_prepended = set()
         if args.backup_archive:
-            backup_archive = ctx.enter_context(file_writer_ext(args.backup_archive))
+            if do_append:
+                # Prepend old contents
+                backup_archive_tmpname = args.backup_archive.with_name(args.backup_archive.name + ".tmp")
+                if not args.dry_run:
+                    backup_archive = ctx.enter_context(file_writer_ext(backup_archive_tmpname, args.backup_archive))
+                else:
+                    backup_archive = ctx.enter_context(file_writer_ext("/dev/null", args.backup_archive))
+                with libarchive.file_reader(str(args.backup_archive)) as ar:
+                    backup_archive_prepended.update(copy_all_entries(ar, backup_archive))
+            else:
+                if not args.dry_run:
+                    backup_archive = ctx.enter_context(file_writer_ext(args.backup_archive))
+                else:
+                    backup_archive = ctx.enter_context(file_writer_ext("/dev/null", args.backup_archive))
 
         disk_rdr = ctx.enter_context(disk_reader())
 
+        # Check consistency
+        prev_restore_files = set()
+        prev_remove_files = set()
+        for path, op in operations.items():
+            if op in SCRIPT_BACKUP_CMDS:
+                if backup_archive and path not in backup_archive_prepended:
+                    print(f"\033[34mWarn: {path} referenced in uninstall script but missing in backup archive\033[0m")
+                prev_restore_files.add(path)
+            elif op in SCRIPT_REMOVE_CMDS:
+                prev_remove_files.add(path)
+            elif op == "remove_dir":
+                folders.add(path)
+
+        # Open disk for writing
+        if not args.dry_run:
+            extract = ctx.enter_context(libarchive.extract.new_archive_write_disk(
+                libarchive.extract.EXTRACT_TIME|libarchive.extract.EXTRACT_UNLINK))
+
+        root = pathlib.PurePath('/')
         for entry in archive:
-            epath = pathlib.Path(entry.path)
+            if args.match:
+                if not args.match.match('/'+entry.path):
+                    continue
 
-            if args.match and not epath.match(args.match):
-                continue
+            epath = pathlib.Path(entry.path)
 
             if args.strip:
                 epath = pathlib.Path(*epath.parts[args.strip:])
 
+            if args.prefix:
+                epath = args.prefix / epath
+
             if entry.isdir:
                 folders |= makedirs(epath, args.dry_run)
             else:
@@ -254,37 +435,33 @@ def main(argv):
                     # Backup
                     if backup_archive:
                         print(f"Backing up {epath} to {args.backup_archive}")
-                        disk_rdr.add_file_to_archive(backup_archive, epath)
-                    if backup_dir:
-                        print(f"Moving old {epath} to {args.backup}")
-                        bpath = args.backup.joinpath(epath)
-                        backup_folders |= makedirs(bpath.parent, args.dry_run)
-                        if not args.dry_run:
-                            os.rename(epath, bpath)
-                    if uninstall_script:
-                        uninstall_script.write(f"restore '{epath}'\n")
+                        # Skip if an older version in archive or it originally didn't exist
+                        if epath not in backup_archive_prepended and epath not in prev_remove_files:
+                            disk_rdr.add_file_to_archive(backup_archive, epath)
+                    if uninstall_script and epath not in operations:
+                        write_script(uninstall_script, "restore", epath)
                 elif uninstall_script:
-                    uninstall_script.write(f"remove '{epath}'\n")
+                    write_script(uninstall_script, "remove", epath)
 
                 print(f"Extracting {epath}")
                 if not args.dry_run:
-                    extract_to_disk(entry, epath)
+                    extract_entry(entry, extract, epath)
 
-        if uninstall_script and (folders or backup_folders):
-            uninstall_script.write("\n# Remove folders #\n")
+        if do_append and backup_archive and not args.dry_run:
+            os.rename(backup_archive_tmpname, args.backup_archive)
 
-            if backup_folders:
-                uninstall_script.write('if [ -n "$BACKUP_DIR" -a -d "$BACKUP_DIR" ]; then\n')
-                for dname in sorted(backup_folders, key=lambda x: len(x.parts), reverse=True):
-                    uninstall_script.write(f"\tremove_dir '{dname}'\n")
-                uninstall_script.write('fi\n')
+        if uninstall_script and folders:
+            uninstall_script.write("\n# Remove folders #\n")
 
             if folders:
                 for dname in sorted(folders, key=lambda x: len(x.parts), reverse=True):
-                    uninstall_script.write(f"remove_dir '{dname}'\n")
+                    write_script(uninstall_script, "remove_dir", dname)
 
         if uninstall_script:
-            uninstall_script.write(f"\n# Remove script #\nremove '{args.uninstall_script}'\n")
+            uninstall_script.write(f"\n# Remove script #\n")
+            write_script(uninstall_script, "cleanup", args.uninstall_script)
+            if args.backup_archive:
+                write_script(uninstall_script, "cleanup", args.backup_archive)
 
     return 0
 