From 010ea8b43fe90ccd32fd8bcf72755c833521eaf9 Mon Sep 17 00:00:00 2001 From: Taeyeon Mori Date: Sat, 11 Nov 2017 23:17:08 +0900 Subject: [PATCH] xconv: Run ffmpeg concurrently --- lib/python/xconv/app.py | 34 ++++++++++++++++++++++------ lib/python/xconv/cmdline.py | 2 ++ lib/python/xconv/profiles/getsubs.py | 2 +- 3 files changed, 30 insertions(+), 8 deletions(-) diff --git a/lib/python/xconv/app.py b/lib/python/xconv/app.py index 3690850..8a2ec9d 100644 --- a/lib/python/xconv/app.py +++ b/lib/python/xconv/app.py @@ -53,11 +53,19 @@ class SimpleTask(advancedav.SimpleTask): class AdvancedTask(advancedav.Task): output_factory = OutputFile - def __init__(self, aav, basename): - self.output_basename = basename + def __init__(self, aav, output_prefix): + self.output_prefix = output_prefix + self.output_directory = os.path.dirname(output_prefix) + self.output_basename = os.path.basename(output_prefix) super().__init__(aav) +class Manager(advancedav.MultiAV): + def _spawn_next(self, **b): + print("\033[32m Processing '%s'\033[0m" % task_name(self.queue[0][1])) + return super()._spawn_next(**b) + + # == App == def make_basename(path, infile): return build_path(path, splitext(basename(infile))[0]) @@ -125,12 +133,12 @@ def main(argv): return -1 # Initialize AAV - aav = advancedav.SimpleAV(ffmpeg=args.ffmpeg, ffprobe=args.ffprobe) + aav = Manager(ffmpeg=args.ffmpeg, ffprobe=args.ffprobe, workers=args.concurrent) if args.quiet: aav.global_conv_args = "-loglevel", "warning" - aav.global_args += "-hide_banner", + aav.global_args += "-hide_banner", "-stats" # Collect Tasks tasks = [] @@ -190,10 +198,22 @@ def main(argv): print("\033[35mExecuting Tasks..\033[0m\033[K") + # Paralellize + if args.concurrent > 1 and not args.merge and not args.concat: + tasks = sum([task.split(args.concurrent) for task in tasks], []) + # Commit - for task in tasks: - print("\033[32m Processing '%s'\033[0m" % task_name(task)) - task.commit() + for task in tasks: + task.commit2().then(lambda x: print("\033[32m Finished '%s'\033[0m" % task_name(task)))\ + .catch(lambda e: print("\033[31m Failed '%s': %s\033[0m" % (task_name(task), e))) + + aav.process_queue() + aav.wait() + + else: + for task in tasks: + print("\033[32m Processing '%s'\033[0m" % task_name(task)) + task.commit() # Clean up if args.concat: diff --git a/lib/python/xconv/cmdline.py b/lib/python/xconv/cmdline.py index 337852a..674b7d9 100644 --- a/lib/python/xconv/cmdline.py +++ b/lib/python/xconv/cmdline.py @@ -33,6 +33,7 @@ from advancedav import version_info as aav_version_info from argparse import ArgumentParser, Action from pathlib import Path from os.path import basename +from multiprocessing import cpu_count version = "%s (AdvancedAV %s)" % (".".join(map(str, version_info)), ".".join(map(str, aav_version_info))) @@ -122,6 +123,7 @@ def parse_args(argv): # Available Options parser.add_argument("-v", "--verbose", help="Enable verbose output", action="store_true") parser.add_argument("-q", "--quiet", help="Be less verbose", action="store_true") + parser.add_argument("-j", "--concurrent", help="Run ffmpeg concurrently using at most N instances [%(default)s]", metavar="N", default=cpu_count()) profile = parser.add_argument_group("Profile") profile.add_argument("-l", "--list-profiles", help="List profiles and quit", action=ProfilesAction) profile.add_argument("-i", "--profile-info", help="Give info about a profile and quit", metavar="PROFILE", action=ProfileInfoAction) diff --git a/lib/python/xconv/profiles/getsubs.py b/lib/python/xconv/profiles/getsubs.py index 366ebf6..0a04232 100644 --- a/lib/python/xconv/profiles/getsubs.py +++ b/lib/python/xconv/profiles/getsubs.py @@ -34,7 +34,7 @@ from ..profile import * @features(no_single_output=True) def getsubs(task, defines): for stream in task.iter_subtitle_streams(): - of = task.add_output("%s.%s.%s" % (task.output_basename, task.inputs.index(stream.file), stream.stream_spec), None) # TODO get real file extension + of = task.add_output("%s.%s.%s" % (task.output_prefix, task.inputs.index(stream.file), stream.stream_spec), None) # TODO get real file extension os = of.map_stream(stream) if "format" in defines: os.codec = defines["format"]