#!/usr/bin/env python3
# (c) 2017-2021 Taeyeon Mori
# Requires libarchive-c
#pylint: disable=missing-module-docstring,missing-function-docstring
"""Patch a folder structure with files from an archive.

Existing files are replaced by archive members of the same name. The old
versions can be moved to a backup directory (-b) and/or stored in a backup
archive (-B), and a shell script that reverts the changes can be
generated (-u).
"""

import argparse
import pathlib
import contextlib
import os
import ctypes
import sys
import textwrap
import shlex

import libarchive

# Extend libarchive-c
from libarchive import ffi

# Each binding below is taken from libarchive-c when it already exposes it and
# is declared through ffi.ffi() otherwise.
try:
    entry_copy_sourcepath = ffi.entry_copy_sourcepath
except AttributeError:
    entry_copy_sourcepath = ffi.ffi(
        "entry_copy_sourcepath",
        [ffi.c_archive_entry_p, ffi.c_char_p],
        None)

try:
    read_disk_entry_from_file = ffi.read_disk_entry_from_file
except AttributeError:
    read_disk_entry_from_file = ffi.ffi(
        "read_disk_entry_from_file",
        [ffi.c_archive_p, ffi.c_archive_entry_p, ffi.c_int, ffi.c_void_p],
        ffi.c_int, ffi.check_int)

try:
    write_set_format_filter_by_ext = ffi.write_set_format_filter_by_ext
except AttributeError:
    write_set_format_filter_by_ext = ffi.ffi(
        "write_set_format_filter_by_ext",
        [ffi.c_archive_p, ffi.c_char_p],
        ffi.c_int, ffi.check_int)

try:
    read_extract = ffi.read_extract
except AttributeError:
    read_extract = ffi.ffi(
        "read_extract",
        [ffi.c_archive_p, ffi.c_archive_entry_p, ffi.c_int],
        ffi.c_int, ffi.check_int)


def to_bytes(path, encoding="utf-8"):
    """Convert *path* (str, bytes or os.PathLike) to bytes.

    With ``encoding="fsencode"`` the conversion is delegated to
    ``os.fsencode``; otherwise str values are encoded with *encoding* and
    bytes values are returned unchanged.

    Raises:
        ValueError: if *path* is none of the supported types.
    """
    if encoding == "fsencode":
        return os.fsencode(path)
    if isinstance(path, os.PathLike):
        path = os.fspath(path)
    if isinstance(path, bytes):
        return path
    if isinstance(path, str):
        return path.encode(encoding)
    raise ValueError(f"Don't know how to convert {path} to bytes")


class _DiskRead(libarchive.read.ArchiveRead):
    """ArchiveRead wrapper around a read_disk handle (see disk_reader())."""

    def add_file_to_archive(self, archive: libarchive.write.ArchiveWrite,
                            path: os.PathLike,
                            archive_name: os.PathLike = None,
                            *, bufsize=ffi.page_size*8):
        """Append the file at *path* to the open *archive*.

        The entry is stored under *archive_name* when given, else under
        *path*. Its header is filled in by read_disk_entry_from_file() from
        the opened file, then the contents are copied in chunks of *bufsize*
        bytes.
        """
        archive_p = archive._pointer #pylint:disable=protected-access

        with open(path, 'rb') as fp: #pylint:disable=invalid-name
            with libarchive.entry.new_archive_entry() as entry_p:
                if archive_name:
                    entry_copy_sourcepath(entry_p, to_bytes(path, "fsencode"))
                    ffi.entry_update_pathname_utf8(entry_p, to_bytes(archive_name, "utf-8"))
                else:
                    ffi.entry_update_pathname_utf8(entry_p, to_bytes(path, "utf-8"))

                read_disk_entry_from_file(self._pointer, entry_p, fp.fileno(), None)
                ffi.write_header(archive_p, entry_p)

                # One reusable buffer; readinto() fills it in place and
                # buf_ptr keeps pointing at its start.
                buf = bytearray(bufsize)
                buf_ptr = ctypes.byref(ctypes.c_char.from_buffer(buf))
                write_data = ffi.write_data

                while (bytes_read := fp.readinto(buf)) > 0:
                    write_data(archive_p, buf_ptr, bytes_read)

                ffi.write_finish_entry(archive_p)


@contextlib.contextmanager
def disk_reader(path=None, flags=0, lookup=True):
    """Context manager yielding a _DiskRead over a new read_disk handle.

    *flags* is passed to read_disk_set_behavior(); *lookup* enables
    read_disk_set_standard_lookup(); *path*, when given, is opened with
    read_disk_open_w(). The handle is freed on exit.
    """
    ard_p = ffi.read_disk_new()
    try:
        ffi.read_disk_set_behavior(ard_p, flags)
        if lookup:
            ffi.read_disk_set_standard_lookup(ard_p)
        if path:
            ffi.read_disk_open_w(ard_p, path)
        yield _DiskRead(ard_p)
    finally:
        ffi.read_free(ard_p)


@contextlib.contextmanager
def file_writer_ext(filename):
    """Context manager yielding an ArchiveWrite that writes to *filename*.

    Format and filter are configured from the file name through
    write_set_format_filter_by_ext(). The archive is closed and freed on
    exit.
    """
    archive_p = ffi.write_new()
    try:
        bfn = to_bytes(filename, "fsencode")
        write_set_format_filter_by_ext(archive_p, bfn)
        ffi.write_open_filename(archive_p, bfn)
        try:
            yield libarchive.write.ArchiveWrite(archive_p)
        finally:
            ffi.write_close(archive_p)
    finally:
        ffi.write_free(archive_p)


def extract_to_disk(entry: libarchive.entry.ArchiveEntry, path: os.PathLike = None):
    """Extract *entry* to disk, under *path* instead of its own name if given.

    Extraction uses the EXTRACT_TIME and EXTRACT_UNLINK flags.
    """
    archive_p, entry_p = entry._archive_p, entry._entry_p #pylint:disable=protected-access

    if path:
        ffi.entry_update_pathname_utf8(entry_p, to_bytes(path, "utf-8"))

    read_extract(archive_p, entry_p,
                 libarchive.extract.EXTRACT_TIME|libarchive.extract.EXTRACT_UNLINK)


# Main
def parse_args(argv):
    """Parse the command line; *argv* includes the program name at index 0."""
    parser = argparse.ArgumentParser(prog=argv[0], description="""
        Patch a folder structure with files from an archive.
        This will replace existing files with those of the same name in an archive,
        with the option to back up the old versions and generate a script to revert the changes.
    """)
    parser.add_argument("-p", "--strip", type=int, default=0, metavar="NUM",
                        help="Strip NUM leading components from archived file names.")
    parser.add_argument("-C", "--directory", type=pathlib.Path, default=pathlib.Path("."),
                        metavar="DIR", help="Operate in DIR")
    parser.add_argument("-b", "--backup", type=pathlib.Path, default=None,
                        help="Create backup copies of overwritten files")
    parser.add_argument("-B", "--backup-archive", type=pathlib.Path, default=None,
                        help="Create an archive of the original files")
    parser.add_argument("-m", "--match", default=None,
                        help="Only extract files matching GLOB", metavar="GLOB")
    parser.add_argument("-u", "--uninstall-script", type=pathlib.Path, default=None,
                        help="Filename to save an uninstall-script to.", metavar="FILE")
    parser.add_argument("-n", "--dry-run", action="store_true",
                        help="Perform a dry run")
    parser.add_argument("archive", help="Archive file name")

    return parser.parse_args(argv[1:])


def makedirs(path, dryrun=False):
    """Create directory *path* and any missing parents.

    Returns the set of directories that were created. With *dryrun* nothing
    is created and the set of directories that would be created is returned.

    Raises:
        IOError: if a path component exists but is not a directory.
    """
    if path.is_dir():
        return set()

    created = set()
    # Walk up until an existing parent is found, then create on the way down.
    stack = [path]
    while stack:
        path = stack[-1]
        if path.parent.is_dir():
            if path.exists():
                raise IOError("Exists but not a directory: '%s'" % path)
            if dryrun:
                return set(stack)
            os.mkdir(path)
            created.add(stack.pop())
        else:
            stack.append(path.parent)

    return created


def _sh_quote(value):
    """Return *value* as one single-quoted POSIX shell word.

    The result is always wrapped in single quotes (embedded single quotes
    are spliced in as '"'"') because the generated uninstall script greps
    itself for quoted ``restore`` arguments.
    """
    return "'" + str(value).replace("'", "'\"'\"'") + "'"


# Dedented before the values are filled in, so that interpolated text can
# never influence the dedent. Literal shell braces are doubled for
# str.format().
_UNINSTALL_HEADER = textwrap.dedent("""\
    #!/bin/sh -eu
    # Uninstall script generated by:
    #   {command}

    cd "`dirname "$0"`"/{target_dir}

    BACKUP_DIR={backup_dir}
    BACKUP_ARCHIVE={backup_archive}

    remove() {{
        echo "Removing $1"
        rm "$1"
    }}

    remove_dir() {{
        rmdir "$1" || echo "\\033[34mFailed to remove folder $1\\033[0m"
    }}

    if [ -n "$BACKUP_DIR" -a -d "$BACKUP_DIR" ]; then
        restore() {{
            echo "Restoring $1 from $BACKUP_DIR"
            mv "$BACKUP_DIR/$1" "$1"
        }}
    elif [ -n "$BACKUP_ARCHIVE" -a -f "$BACKUP_ARCHIVE" ]; then
        restore() {{
            echo "Restoring $1 from $BACKUP_ARCHIVE"
            bsdtar -Uqxf "$BACKUP_ARCHIVE" "$1"
        }}
    elif [ -n "${{PATCHDIR_LOSE_FILES-}}" ]; then
        restore() {{
            echo "Removing $1 \\033[31m[Previously existing file is lost]\\033[0m"
            rm "$1"
        }}
    elif ! grep -qE "restore\\s+[\\"'].+[\\"']" "$0"; then
        : No files were overwritten
    else
        echo "\\033[31mError: Cannot restore original files because no backup is available\\033[0m"
        echo "Set PATCHDIR_LOSE_FILES=1 to remove changed files without restoring originals"
        exit 66
    fi

    # Restore Files #
    """)


def _uninstall_script_header(argv, args):
    """Return the fixed preamble of the uninstall script.

    Must be called with the target directory as the current directory: the
    script changes from its own location to that directory, so the path
    between the two is computed here.
    """
    script_dir = args.uninstall_script.parent.resolve()
    # relpath() also copes with a script stored below the target directory,
    # where Path.relative_to() would raise ValueError.
    target_dir = os.path.relpath(pathlib.Path.cwd(), script_dir)

    return _UNINSTALL_HEADER.format(
        # Keep every line of the recorded command inside the comment.
        command=shlex.join(argv).replace("\n", "\n#   "),
        target_dir=_sh_quote(target_dir),
        backup_dir=(_sh_quote(args.backup) if args.backup
                    else " # No backup directory was created (-b)"),
        backup_archive=(_sh_quote(args.backup_archive) if args.backup_archive
                        else " # No backup archive was created (-B)"),
    )


def main(argv):
    """Run the tool with the command line *argv* (program name first).

    Returns the process exit status: 0 on success, 3 if the backup archive
    already exists.
    """
    args = parse_args(argv)

    with contextlib.ExitStack() as ctx:
        # archive name is the only argument not affected by -C
        archive = ctx.enter_context(libarchive.file_reader(args.archive))

        # Change directory to target
        os.chdir(args.directory)

        if args.backup_archive and os.path.exists(args.backup_archive):
            print("\033[31mError: Backup archive file already exists\033[0m")
            return 3

        uninstall_script = None
        folders = set()
        files = set()
        backup_folders = set()

        # NOTE: the uninstall script is written even with --dry-run.
        if args.uninstall_script:
            uninstall_script = ctx.enter_context(open(args.uninstall_script, "x"))
            os.chmod(uninstall_script.fileno(), 0o755)
            uninstall_script.write(_uninstall_script_header(argv, args))

        backup_dir = args.backup if args.backup else None

        # A dry run must not create or fill the backup archive.
        backup_archive = None
        disk_rdr = None
        if args.backup_archive and not args.dry_run:
            backup_archive = ctx.enter_context(file_writer_ext(args.backup_archive))
            disk_rdr = ctx.enter_context(disk_reader())

        for entry in archive:
            # NOTE(review): entry paths come from the archive and are not
            # checked for absolute paths or ".." components here.
            epath = pathlib.Path(entry.path)

            if args.match and not epath.match(args.match):
                continue

            if args.strip:
                epath = pathlib.Path(*epath.parts[args.strip:])
                # Nothing is left of entries with no more than --strip
                # components; skip them instead of operating on ".".
                if not epath.parts:
                    continue

            if entry.isdir:
                folders |= makedirs(epath, args.dry_run)
            else:
                folders |= makedirs(epath.parent, args.dry_run)

                # Archive might have multiple versions of the same file.
                # Use the last one, but don't overwrite the backup with intermediate copies
                if epath not in files:
                    files.add(epath)

                    if epath.exists():
                        # Backup
                        if args.backup_archive:
                            print(f"Backing up {epath} to {args.backup_archive}")
                            if backup_archive is not None:
                                disk_rdr.add_file_to_archive(backup_archive, epath)
                        if backup_dir:
                            print(f"Moving old {epath} to {args.backup}")
                            bpath = args.backup.joinpath(epath)
                            backup_folders |= makedirs(bpath.parent, args.dry_run)
                            if not args.dry_run:
                                os.rename(epath, bpath)
                        if uninstall_script:
                            uninstall_script.write(f"restore {_sh_quote(epath)}\n")
                    elif uninstall_script:
                        uninstall_script.write(f"remove {_sh_quote(epath)}\n")

                print(f"Extracting {epath}")
                if not args.dry_run:
                    extract_to_disk(entry, epath)

        if uninstall_script and (folders or backup_folders):
            uninstall_script.write("\n# Remove folders #\n")

            # Deepest directories first, so children go before their parents.
            if backup_folders:
                uninstall_script.write('if [ -n "$BACKUP_DIR" -a -d "$BACKUP_DIR" ]; then\n')
                for dname in sorted(backup_folders, key=lambda x: len(x.parts), reverse=True):
                    uninstall_script.write(f"\tremove_dir {_sh_quote(dname)}\n")
                uninstall_script.write('fi\n')

            if folders:
                for dname in sorted(folders, key=lambda x: len(x.parts), reverse=True):
                    uninstall_script.write(f"remove_dir {_sh_quote(dname)}\n")

        if uninstall_script:
            uninstall_script.write(
                f"\n# Remove script #\nremove {_sh_quote(args.uninstall_script)}\n")

    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))