patchdir: add update mode, remove backup dirs

Branch: master
Author: Taeyeon Mori, 3 years ago
Parent: 96c90145ec
Commit: 7c46de9fa4

Changed files:
  bin/patchdir (297)

@@ -4,11 +4,13 @@
 #pylint: disable=missing-module-docstring,missing-function-docstring
 import argparse
+import fnmatch
 import pathlib
 import contextlib
 import os
 import ctypes
 import textwrap
+import re
 import shlex
 import libarchive
@@ -38,10 +40,10 @@ except AttributeError:
         ffi.c_int, ffi.check_int)
 try:
-    read_extract = ffi.read_extract
+    read_extract2 = ffi.read_extract2
 except AttributeError:
-    read_extract = ffi.ffi("read_extract",
-        [ffi.c_archive_p, ffi.c_archive_entry_p, ffi.c_int],
+    read_extract2 = ffi.ffi("read_extract2",
+        [ffi.c_archive_p, ffi.c_archive_entry_p, ffi.c_archive_p],
         ffi.c_int, ffi.check_int)

 def to_bytes(path, encoding="utf-8"):
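Note: libarchive's archive_read_extract2() takes a destination archive_write_disk handle as its third argument instead of a flags integer, which is why the fallback binding's last argument type changes from c_int to c_archive_p: the extraction flags move onto the write-disk handle. A minimal sketch of the call pattern, assuming a python-libarchive-c version that already exposes ffi.read_extract2 (older versions need the fallback binding above); extract_first_entry is a hypothetical helper, not part of patchdir:

    import libarchive
    import libarchive.extract
    from libarchive import ffi

    def extract_first_entry(archive_path: str):
        # The flags now live on the archive_write_disk handle, not in the call itself.
        flags = libarchive.extract.EXTRACT_TIME | libarchive.extract.EXTRACT_UNLINK
        with libarchive.file_reader(archive_path) as archive, \
             libarchive.extract.new_archive_write_disk(flags) as dst_p:
            for entry in archive:
                # read_extract2(read_archive, current_entry, write_disk_archive)
                ffi.read_extract2(entry._archive_p, entry._entry_p, dst_p)  # pylint:disable=protected-access
                break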
@@ -89,11 +91,14 @@ def disk_reader(path=None, flags=0, lookup=True):
         ffi.read_free(ard_p)

 @contextlib.contextmanager
-def file_writer_ext(filename):
+def file_writer_ext(filename, target_filename=None):
     archive_p = ffi.write_new()
     try:
         bfn = to_bytes(filename, "fsencode")
-        write_set_format_filter_by_ext(archive_p, bfn)
+        if target_filename is None:
+            write_set_format_filter_by_ext(archive_p, bfn)
+        else:
+            write_set_format_filter_by_ext(archive_p, to_bytes(target_filename, "fsencode"))
         ffi.write_open_filename(archive_p, bfn)
         try:
             yield libarchive.write.ArchiveWrite(archive_p)
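Note: the new target_filename parameter lets the writer be opened on one path (a temporary file, or /dev/null during a dry run) while the archive format and compression filter are still chosen from the final file name's extension. A hedged usage sketch; the file names are examples and file_writer_ext is the helper defined above:

    import os

    # Write the refreshed backup next to the real one, but format it as .tar.xz,
    # then rename it into place once all entries have been added.
    with file_writer_ext("patchdir_backup.tar.xz.tmp", "patchdir_backup.tar.xz") as backup:
        pass  # entries are added here
    os.rename("patchdir_backup.tar.xz.tmp", "patchdir_backup.tar.xz")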
@@ -102,40 +107,150 @@ def file_writer_ext(filename):
     finally:
         ffi.write_free(archive_p)

-def extract_to_disk(entry: libarchive.entry.ArchiveEntry, path: os.PathLike=None):
+def extract_entry(entry: libarchive.entry.ArchiveEntry, dst_p: libarchive.ffi.c_archive_p, path: os.PathLike=None):
     archive_p, entry_p = entry._archive_p, entry._entry_p #pylint:disable=protected-access
     if path:
         ffi.entry_update_pathname_utf8(entry_p, to_bytes(path, "utf-8"))
-    read_extract(archive_p, entry_p,
-        libarchive.extract.EXTRACT_TIME|libarchive.extract.EXTRACT_UNLINK)
+    read_extract2(archive_p, entry_p, dst_p)
+
+def copy_entry(entry: libarchive.entry.ArchiveEntry, dst: libarchive.write.ArchiveWrite, path: os.PathLike=None,
+               *, bufsize=ffi.page_size*8):
+    archive_p, entry_p, dst_p = entry._archive_p, entry._entry_p, dst._pointer #pylint:disable=protected-access
+    if path:
+        ffi.entry_update_pathname_utf8(entry_p, to_bytes(path, "utf-8"))
+    # read_extract2 and write_data_block only supported with archive_write_disk :(
+    #read_extract2(archive_p, entry_p, dst_p)
+    ffi.write_header(dst_p, entry_p)
+    buf_p = ctypes.create_string_buffer(bufsize)
+    read_data, write_data = ffi.read_data, ffi.write_data
+    while (bytes_read := read_data(archive_p, buf_p, bufsize)) > 0:
+        write_data(dst_p, buf_p, bytes_read)
+    ffi.write_finish_entry(dst_p)
+
+def copy_all_entries(src: libarchive.read.ArchiveRead, dst: libarchive.write.ArchiveWrite,
+                     *, bufsize=ffi.page_size*8) -> list[pathlib.Path]:
+    entries = []
+    with libarchive.entry.new_archive_entry() as entry_p:
+        archive_p, dst_p = src._pointer, dst._pointer
+        buf_p = ctypes.create_string_buffer(bufsize)
+        read_data, write_data = ffi.read_data, ffi.write_data
+        while ffi.read_next_header2(archive_p, entry_p) != ffi.ARCHIVE_EOF:
+            entries.append(pathlib.Path(ffi.entry_pathname_w(entry_p)))
+            #read_extract2(archive_p, entry_p, dst_p)
+            ffi.write_header(dst_p, entry_p)
+            while (bytes_read := read_data(archive_p, buf_p, bufsize)) > 0:
+                write_data(dst_p, buf_p, bytes_read)
+            ffi.write_finish_entry(dst_p)
+    return entries
+
+def list_all_entries(src: libarchive.read.ArchiveRead) -> list[pathlib.Path]:
+    entries = []
+    with libarchive.entry.new_archive_entry() as entry_p:
+        src_p = src._pointer
+        read_next_header2, ARCHIVE_EOF, entry_pathname_w, Path = \
+            ffi.read_next_header2, ffi.ARCHIVE_EOF, ffi.entry_pathname_w, pathlib.Path
+        while read_next_header2(src_p, entry_p) != ARCHIVE_EOF:
+            entries.append(Path(entry_pathname_w(entry_p)))
+    return entries
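Note: copy_entry/copy_all_entries stream entries from a read archive into another write archive rather than to disk; because read_extract2 and write_data_block only work against an archive_write_disk handle, the data is pumped manually through read_data/write_data. A standalone sketch of the same loop, assuming python-libarchive-c's file_reader/file_writer(path, format_name, filter_name) API; copy_archive and the pax/xz choice are illustrative, not part of patchdir:

    import ctypes
    import pathlib
    import libarchive
    import libarchive.entry
    from libarchive import ffi

    def copy_archive(src_path: str, dst_path: str, bufsize: int = ffi.page_size * 8) -> list[pathlib.Path]:
        # Copy every entry (header plus data blocks) from src_path into dst_path.
        copied = []
        with libarchive.file_reader(src_path) as src, \
             libarchive.file_writer(dst_path, "pax", "xz") as dst, \
             libarchive.entry.new_archive_entry() as entry_p:
            src_p, dst_p = src._pointer, dst._pointer  # pylint:disable=protected-access
            buf = ctypes.create_string_buffer(bufsize)
            while ffi.read_next_header2(src_p, entry_p) != ffi.ARCHIVE_EOF:
                copied.append(pathlib.Path(ffi.entry_pathname_w(entry_p)))
                ffi.write_header(dst_p, entry_p)
                while (n := ffi.read_data(src_p, buf, bufsize)) > 0:
                    ffi.write_data(dst_p, buf, n)
                ffi.write_finish_entry(dst_p)
        return copied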
 # Main
+def confirm(msg, code=0):
+    print(msg)
+    reply = input("Continue? [n] ")
+    if not (res := reply.lower().startswith("y")) and code != 0:
+        sys.exit(code)
+    return res
+
+def glob_compile(pattern):
+    return re.compile(fnmatch.translate(pattern))
+
 def parse_args(argv):
     parser = argparse.ArgumentParser(prog=argv[0], description="""
         Patch a folder structure with files from an archive.
         This will replace existing files with those of the same name in an archive,
         with the option to back up the old versions and generate a script to revert the changes.
         """)
-    parser.add_argument("-p", "--strip", type=int, default=0,
-        help="Strip NUM leading components from archived file names.")
+    parser.add_argument("archive", type=pathlib.Path,
+        help="Archive file name")
     parser.add_argument("-C", "--directory", type=pathlib.Path, default=pathlib.Path("."),
         help="Operate in <direcory>")
-    parser.add_argument("-b", "--backup", type=pathlib.Path, default=None,
-        help="Create backup copies of overwritten files")
-    parser.add_argument("-B", "--backup-archive", type=pathlib.Path, default=None,
-        help="Create an archive of the original files")
-    parser.add_argument("-m", "--match", default=None,
-        help="Only extract files matching GLOB", metavar="GLOB")
-    parser.add_argument("-u", "--uninstall-script", type=pathlib.Path, default=None,
-        help="Filename to save an uninstall-scipt to.", metavar="FILE")
     parser.add_argument("-n", "--dry-run", action="store_true",
         help="Perform a dry run")
-    parser.add_argument("archive", help="Achive file name")
-    return parser.parse_args(argv[1:])
+    parser.add_argument("--noconfirm", action="store_true",
+        help="Don't ask for confirmation on possibly dangerous operations")
+
+    target = parser.add_argument_group("Target")
+    target.add_argument("-p", "--strip", type=int, default=0,
+        help="Strip N leading path components from archived file names.", metavar="N")
+    target.add_argument("-P", "--prefix", type=pathlib.Path, default=None,
+        help="Prepend leading path components to archived file names.")
+    fmatch = target.add_mutually_exclusive_group()
+    fmatch.add_argument("-m", "--match", dest="match", type=glob_compile, default=None,
+        help="Only extract files matching GLOB", metavar="GLOB")
+    fmatch.add_argument("-M", "--regex", dest="match", type=re.compile, default=None,
+        help="Only extract files matching REGEX", metavar="REGEX")
+
+    backup = parser.add_argument_group("Backup")
+    backup.add_argument("-B", "--backup-archive", type=pathlib.Path, default=None,
+        help="Create an archive of the original files", metavar="FILE")
+    backup.add_argument("-u", "--uninstall-script", type=pathlib.Path, default=None,
+        help="Filename to save an uninstall-script to. Should be combined with -B", metavar="FILE")
+    backup.add_argument("-a", "--append", action="store_true", default=False,
+        help="Update existing backup files")
+
+    preset = parser.add_mutually_exclusive_group()
+    preset.add_argument("-U", "--uninstall-preset", action="store_const", dest="preset", const="uninstall",
+        help="Short-hand for '-B %%n.uninstall.tar.xz -u %%n.uninstall.sh', %%n being the archive basename")
+    preset.add_argument("-A", "--append-preset", action="store_const", dest="preset", const="append",
+        help="Short-hand for '-a -B patchdir_backup.tar.xz -u patchdir_restore.sh'")
+
+    args = parser.parse_args(argv[1:])
+
+    if args.preset == "uninstall":
+        # Ignore when combined with explicit -u and -B; allows general-purpose alias patchdir='patchdir -U'
+        if args.backup_archive is None:
+            args.backup_archive = pathlib.Path(f"{args.archive.stem}.uninstall.tar.xz")
+        if args.uninstall_script is None:
+            args.uninstall_script = pathlib.Path(f"{args.archive.stem}.uninstall.sh")
+    elif args.preset == "append":
+        args.append = True
+        if args.backup_archive is None:
+            args.backup_archive = pathlib.Path("patchdir_backup.tar.xz")
+        if args.uninstall_script is None:
+            args.uninstall_script = pathlib.Path("patchdir_restore.sh")
+
+    if not args.noconfirm:
+        if not args.uninstall_script or not args.backup_archive:
+            confirm("Original files will be lost without -B and -u. Consider using -U.", 1)
+
+    return args
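Note: the -U/-A presets only fill in backup_archive/uninstall_script when those were not given explicitly, which keeps an alias like patchdir='patchdir -U' composable. A quick hedged illustration of the expansion, assuming parse_args above were importable (the file names are examples):

    import pathlib

    args = parse_args(["patchdir", "-U", "mod-v2.tar.gz"])
    # The stem of "mod-v2.tar.gz" is "mod-v2.tar", so the preset yields:
    assert args.backup_archive == pathlib.Path("mod-v2.tar.uninstall.tar.xz")
    assert args.uninstall_script == pathlib.Path("mod-v2.tar.uninstall.sh")

    args = parse_args(["patchdir", "-A", "hotfix.tar.gz"])
    assert args.append is True
    assert args.backup_archive == pathlib.Path("patchdir_backup.tar.xz")
    assert args.uninstall_script == pathlib.Path("patchdir_restore.sh")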
+# Parsing uninstall scripts
+SCRIPT_TAG = "#> patchdir restore script v1 <#"
+SCRIPT_BACKUP_CMDS = {"restore"}
+SCRIPT_REMOVE_CMDS = {"remove"}
+SCRIPT_DIR_CMDS = {"remove_dir"}
+SCRIPT_META_CMDS = {"cleanup"}
+SCRIPT_CMDS = SCRIPT_BACKUP_CMDS|SCRIPT_REMOVE_CMDS|SCRIPT_DIR_CMDS|SCRIPT_META_CMDS
+
+def read_script(path):
+    # Relies on ordered dictionary implementation
+    is_script = False
+    operations = {}
+    with open(path) as f:
+        for ln, line in enumerate(f):
+            if line.strip() == SCRIPT_TAG:
+                is_script = True
+            elif is_script:
+                cmd = shlex.split(line, comments=True)
+                if not cmd:
+                    continue
+                if len(cmd) != 2 or cmd[0] not in SCRIPT_CMDS:
+                    raise ValueError(f"Failed to read invalid line {ln} in script {path}", line)
+                if cmd[0] not in SCRIPT_META_CMDS:
+                    operations[pathlib.Path(cmd[1])] = cmd[0]
+    if not is_script:
+        raise ValueError(f"Script for appending {path} doesn't seem to be a patchdir script")
+    return operations
+
+def write_script(fp, cmd, path):
+    fp.write(f"{cmd:10s} {shlex.quote(str(path))}\n")
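Note: everything after the SCRIPT_TAG marker doubles as a machine-readable log: one "command path" pair per line, shell-quoted with shlex.quote, which read_script parses back into a path-to-command map for --append. A small standard-library sketch of the on-disk format produced by write_script (the paths are examples):

    import io
    import shlex

    body = io.StringIO()
    body.write("#> patchdir restore script v1 <#\n")   # SCRIPT_TAG; parsing starts after this line
    for cmd, path in [("restore", "usr/bin/tool"),
                      ("remove", "usr/share/new file.dat"),
                      ("remove_dir", "usr/share/newdir")]:
        body.write(f"{cmd:10s} {shlex.quote(path)}\n")  # same format string as write_script()
    print(body.getvalue())
    # #> patchdir restore script v1 <#
    # restore    usr/bin/tool
    # remove     'usr/share/new file.dat'
    # remove_dir usr/share/newdir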
+
+# Helpers
 def makedirs(path, dryrun=False):
     if path.is_dir():
         return set()
@@ -156,27 +271,51 @@ def makedirs(path, dryrun=False):
     return created

 def main(argv):
     args = parse_args(argv)

     with contextlib.ExitStack() as ctx:
         # archive name is the only argument not affected by -C
-        archive = ctx.enter_context(libarchive.file_reader(args.archive))
+        archive = ctx.enter_context(libarchive.file_reader(str(args.archive)))

         # Change directory to target
         os.chdir(args.directory)

-        if args.backup_archive and os.path.exists(args.backup_archive):
-            print("\033[31mError: Backup archive file already exist\033[0m")
-            return 3
+        do_append = False
+        if args.append:
+            # Check consistency for append
+            checks = [p.exists()
+                      for p in (args.backup_archive, args.uninstall_script)
+                      if p is not None]
+            do_append = any(checks)
+            if do_append and not all(checks):
+                raise FileExistsError("Inconsistency in existing files to append to.")
+        else:
+            if args.backup_archive and args.backup_archive.exists():
+                if args.noconfirm:
+                    print("\033[31mError: Backup archive file already exists\033[0m")
+                    return 3
+                else:
+                    confirm("Backup archive already exists.", 3)

         uninstall_script = None
+        operations = {}
         folders = set()
         files = set()

         if args.uninstall_script:
-            uninstall_script = ctx.enter_context(open(args.uninstall_script, "x"))
-            os.chmod(uninstall_script.fileno(), 0o755)
+            if do_append:
+                operations = read_script(args.uninstall_script)
+            elif args.uninstall_script.exists():
+                if args.noconfirm:
+                    print("\033[31mError: Uninstall script file already exists\033[0m")
+                    return 3
+                else:
+                    confirm("Uninstall script already exists. Overwrite?", 3)
+            if not args.dry_run:
+                uninstall_script = ctx.enter_context(open(args.uninstall_script, "w"))
+                os.chmod(uninstall_script.fileno(), 0o755)
+            else:
+                uninstall_script = ctx.enter_context(open("/dev/null", "w"))

             uninstall_script_dir = args.uninstall_script.parent.resolve()
             uninstall_script.write(textwrap.dedent(f"""\
@@ -186,7 +325,6 @@ def main(argv):
            cd "`dirname "$0"`/{pathlib.Path.cwd().relative_to(uninstall_script_dir)}"

-            BACKUP_DIR={f"'{args.backup}'" if args.backup else ' # No backup directory was created (-b)'}
            BACKUP_ARCHIVE={f"'{args.backup_archive}'" if args.backup_archive else ' # No backup archive was created (-B)'}

            remove() {{
@@ -196,12 +334,7 @@ def main(argv):
            remove_dir() {{
                rmdir "$1" || echo "\\033[34mFailed to remove folder $1\\033[0m"
            }}
-            if [ -n "$BACKUP_DIR" -a -d "$BACKUP_DIR" ]; then
-                restore() {{
-                    echo "Restoring $1 from $BACKUP_DIR"
-                    mv "$BACKUP_DIR/$1" "$1"
-                }}
-            elif [ -n "$BACKUP_ARCHIVE" -a -f "$BACKUP_ARCHIVE" ]; then
+            if [ -n "$BACKUP_ARCHIVE" -a -f "$BACKUP_ARCHIVE" ]; then
                restore() {{
                    echo "Restoring $1 from $BACKUP_ARCHIVE"
                    bsdtar -Uqxf "$BACKUP_ARCHIVE" "$1"
@@ -211,36 +344,84 @@ def main(argv):
                    echo "Removing $1 \\033[31m[Previously existing file is lost]\\033[0m"
                    rm "$1"
                }}
-            elif ! grep -qE "restore\\s+[\\"'].+[\\"']" "$0"; then
+            elif ! grep -qE "restore\\s+([\\"'].+[\\"']|(?\\!script)\\S+\\s*(#.*)?\\$)" "$0"; then
                : No files were overwritten
            else
                echo "\\033[31mError: Cannot restore original files because no backup is available\\033[0m"
                echo "Set PATCHDIR_LOSE_FILES=1 to remove changed files without restoring originals"
                exit 66
            fi
+
+            if [ "$1" = "-k" -o -n "$PATCHDIR_KEEP_BACKUP" ]; then
+                cleanup() {{
+                    echo "Keeping $1"
+                }}
+            else
+                cleanup() {{
+                    echo "Cleaning up $1"
+                    rm "$1"
+                }}
+            fi
+
+            {SCRIPT_TAG}
            # Restore Files #
            """))

-        backup_dir = None
-        backup_folders = set()
-        if args.backup:
-            backup_dir = args.backup
+        if operations:
+            for path, cmd in operations.items():
+                if cmd not in SCRIPT_DIR_CMDS:
+                    write_script(uninstall_script, cmd, path)

         backup_archive = None
+        backup_archive_prepended = set()
         if args.backup_archive:
-            backup_archive = ctx.enter_context(file_writer_ext(args.backup_archive))
+            if do_append:
+                # Prepend old contents
+                backup_archive_tmpname = args.backup_archive.with_name(args.backup_archive.name + ".tmp")
+                if not args.dry_run:
+                    backup_archive = ctx.enter_context(file_writer_ext(backup_archive_tmpname, args.backup_archive))
+                else:
+                    backup_archive = ctx.enter_context(file_writer_ext("/dev/null", args.backup_archive))
+                with libarchive.file_reader(str(args.backup_archive)) as ar:
+                    backup_archive_prepended.update(copy_all_entries(ar, backup_archive))
+            else:
+                if not args.dry_run:
+                    backup_archive = ctx.enter_context(file_writer_ext(args.backup_archive))
+                else:
+                    backup_archive = ctx.enter_context(file_writer_ext("/dev/null", args.backup_archive))

         disk_rdr = ctx.enter_context(disk_reader())

-        for entry in archive:
-            epath = pathlib.Path(entry.path)
-
-            if args.match and not epath.match(args.match):
-                continue
+        # Check consistency
+        prev_restore_files = set()
+        prev_remove_files = set()
+        for path, op in operations.items():
+            if op in SCRIPT_BACKUP_CMDS:
+                if backup_archive and path not in backup_archive_prepended:
+                    print(f"\033[34mWarn: {path} referenced in uninstall script but missing in backup archive\033[0m")
+                prev_restore_files.add(path)
+            elif op in SCRIPT_REMOVE_CMDS:
+                prev_remove_files.add(path)
+            elif op == "remove_dir":
+                folders.add(path)
+
+        # Open disk for writing
+        if not args.dry_run:
+            extract = ctx.enter_context(libarchive.extract.new_archive_write_disk(
+                libarchive.extract.EXTRACT_TIME|libarchive.extract.EXTRACT_UNLINK))
+
+        root = pathlib.PurePath('/')
+
+        for entry in archive:
+            if args.match:
+                if not args.match.match('/'+entry.path):
+                    continue
+
+            epath = pathlib.Path(entry.path)
             if args.strip:
                 epath = pathlib.Path(*epath.parts[args.strip:])
+            if args.prefix:
+                epath = args.prefix / epath

             if entry.isdir:
                 folders |= makedirs(epath, args.dry_run)
             else:
@@ -254,37 +435,33 @@ def main(argv):
                    # Backup
                    if backup_archive:
                        print(f"Backing up {epath} to {args.backup_archive}")
-                        disk_rdr.add_file_to_archive(backup_archive, epath)
+                        # Skip if an older version in archive or it originally didn't exist
+                        if epath not in backup_archive_prepended and epath not in prev_remove_files:
+                            disk_rdr.add_file_to_archive(backup_archive, epath)

-                    if backup_dir:
-                        print(f"Moving old {epath} to {args.backup}")
-                        bpath = args.backup.joinpath(epath)
-                        backup_folders |= makedirs(bpath.parent, args.dry_run)
-                        if not args.dry_run:
-                            os.rename(epath, bpath)
-                        if uninstall_script:
-                            uninstall_script.write(f"restore '{epath}'\n")
+                    if uninstall_script and epath not in operations:
+                        write_script(uninstall_script, "restore", epath)
                elif uninstall_script:
-                    uninstall_script.write(f"remove '{epath}'\n")
+                    write_script(uninstall_script, "remove", epath)

                print(f"Extracting {epath}")
                if not args.dry_run:
-                    extract_to_disk(entry, epath)
+                    extract_entry(entry, extract, epath)

-        if uninstall_script and (folders or backup_folders):
-            uninstall_script.write("\n# Remove folders #\n")
-            if backup_folders:
-                uninstall_script.write('if [ -n "$BACKUP_DIR" -a -d "$BACKUP_DIR" ]; then\n')
-                for dname in sorted(backup_folders, key=lambda x: len(x.parts), reverse=True):
-                    uninstall_script.write(f"\tremove_dir '{dname}'\n")
-                uninstall_script.write('fi\n')
+        if do_append and backup_archive and not args.dry_run:
+            os.rename(backup_archive_tmpname, args.backup_archive)
+
+        if uninstall_script and folders:
+            uninstall_script.write("\n# Remove folders #\n")
            if folders:
                for dname in sorted(folders, key=lambda x: len(x.parts), reverse=True):
-                    uninstall_script.write(f"remove_dir '{dname}'\n")
+                    write_script(uninstall_script, "remove_dir", dname)

        if uninstall_script:
-            uninstall_script.write(f"\n# Remove script #\nremove '{args.uninstall_script}'\n")
+            uninstall_script.write(f"\n# Remove script #\n")
+            write_script(uninstall_script, "cleanup", args.uninstall_script)
+            if args.backup_archive:
+                write_script(uninstall_script, "cleanup", args.backup_archive)

        return 0
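Note: per entry, the new loop filters first (the compiled -m/-M pattern is matched against '/' plus the archived name), then applies -p/--strip, then -P/--prefix. A standalone sketch of that order using only the standard library; select_path and the sample values are illustrative, not part of patchdir:

    import fnmatch
    import pathlib
    import re

    def select_path(entry_path, match, strip, prefix):
        # Mirror of the per-entry path handling in main(): filter, strip, prefix.
        if match and not match.match('/' + entry_path):
            return None
        epath = pathlib.Path(entry_path)
        if strip:
            epath = pathlib.Path(*epath.parts[strip:])
        if prefix:
            epath = prefix / epath
        return epath

    # Roughly what `-m '/usr/share/*' -p 1 -P opt/app` would do to one entry:
    pattern = re.compile(fnmatch.translate("/usr/share/*"))   # glob_compile() equivalent
    print(select_path("usr/share/doc/readme", pattern, 1, pathlib.Path("opt/app")))
    # -> opt/app/share/doc/readme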
