advancedav: Make it easy to split tasks across multiple ffmpeg workers

MultiAV can manage multiple instances of ffmepeg running in parallel.
Task.split can also split a task with many outputs into multiple smaller ones to distribute them across workers
master
Taeyeon Mori 7 years ago
parent 5790ea24ca
commit 4cfa9a8d82
  1. 422
      lib/python/advancedav.py

@ -35,9 +35,9 @@ from typing import Iterable, Mapping, Sequence, Iterator, MutableMapping
from pathlib import Path, PurePath from pathlib import Path, PurePath
__all__ = "AdvancedAVError", "AdvancedAV", "SimpleAV" __all__ = "AdvancedAVError", "AdvancedAV", "SimpleAV", "MultiAV"
version_info = 2, 99, 0 version_info = 2, 99, 5
# Constants # Constants
DEFAULT_CONTAINER = "matroska" DEFAULT_CONTAINER = "matroska"
@ -73,6 +73,57 @@ def ffmpeg_int(no: str) -> int:
return int(no) return int(no)
class Future:
def __init__(self):
self.result = None
self.finished = False
self.exception = None
self._then = []
self._catch = []
# Consumer
def then(self, fn):
if self.finished:
if not self.exception:
fn(self.result)
else:
self._then.append(fn)
return self
def catch(self, fn):
if self.finished:
if self.exception:
fn(self.exception)
else:
self._catch.append(fn)
return self
# Provider
def complete(self, result=None):
self.result = result
self.finished = True
for c in self._then:
c(result)
return self
def fail(self, exception):
self.exception = exception
self.finished = True
for c in self._catch:
c(exception)
def __enter__(self):
return self.complete
def __exit__(self, exc):
if not self.finished:
if exc:
self.fail(exc)
else:
self.fail(RuntimeError("Future not completed in context"))
# == Base Classes == # == Base Classes ==
class ObjectWithOptions: class ObjectWithOptions:
""" """
@ -366,6 +417,10 @@ class OutputStream(Stream, ObjectWithOptions, ObjectWithMetadata):
bitrate = OptionProperty("b", type=ffmpeg_int) bitrate = OptionProperty("b", type=ffmpeg_int)
class OutputAudioStream(OutputStream):
channels = OptionProperty("ac")
class OutputVideoStream(OutputStream): class OutputVideoStream(OutputStream):
def downscale(self, width, height): def downscale(self, width, height):
# Scale while keeping aspect ratio; never upscale. # Scale while keeping aspect ratio; never upscale.
@ -375,6 +430,7 @@ class OutputVideoStream(OutputStream):
def output_stream_factory(file, source, *args, **more): def output_stream_factory(file, source, *args, **more):
return { return {
S_AUDIO: OutputAudioStream,
S_VIDEO: OutputVideoStream, S_VIDEO: OutputVideoStream,
}.get(source.type, OutputStream)(file, source, *args, **more) }.get(source.type, OutputStream)(file, source, *args, **more)
@ -525,10 +581,7 @@ class InputFile(File):
ffprobe_args = "-show_format", "-show_streams", "-show_chapters", "-print_format", "json" ffprobe_args = "-show_format", "-show_streams", "-show_chapters", "-print_format", "json"
def _initialize_info(self): def _initialize_info(self):
probe = self.pp.call_probe(tuple(Task.argv_options(self.options)) self._information = self.pp.probe_file(self, ffprobe_args_hint=self.ffprobe_args)
+ self.ffprobe_args
+ ("-i", self.filename))
self._information = json.loads(probe)
def _initialize_streams(self): def _initialize_streams(self):
""" Parse the ffprobe output """ Parse the ffprobe output
@ -733,52 +786,21 @@ class OutputFile(File, ObjectWithMetadata):
# === Task Classes === # === Task Classes ===
class Task: class BaseTask:
""" """
Holds information about an AV-processing Task. Task base class
A Task is a collection of Input- and Output-Files and related options.
While OutputFiles are bound to one task at a time, InputFiles can be reused across Tasks.
""" """
output_factory = OutputFile
def __init__(self, pp: "AdvancedAV"): def __init__(self, pp: "AdvancedAV"):
super().__init__() super().__init__()
self.pp = pp self.pp = pp
self.inputs = [] # -- Inputs
""" :type: list[InputFile] """ # inputs: Sequence[InputFile]
self.inputs_by_name = {}
""" :type: dict[str, InputFile] """
self.outputs = []
""" :type: list[OutputFile] """
# -- Manage Inputs
def add_input(self, file: "str | InputFile") -> InputFile:
""" Register an input file
When \param file is already registered as input file to this Task, do nothing.
:param file: Can be either the filename of an input file or an InputFile object.
The latter will be created if the former is passed.
"""
if isinstance(file, PurePath): # Pathlib support
file = str(file)
if isinstance(file, str):
if file in self.inputs_by_name:
return self.inputs_by_name[file]
file = self.pp.create_input(file)
if file not in self.inputs:
self.pp.to_debug("Adding input file #%i: %s", len(self.inputs), file.name)
self.inputs.append(file)
self.inputs_by_name[file.filename] = file
return file @property
def inputs_by_name(self) -> Mapping[str, InputFile]:
return {i.name: i for i in self.inputs}
def qualified_input_stream_spec(self, stream: InputStream) -> str: def qualified_input_stream_spec(self, stream: InputStream) -> str:
""" Construct the qualified input stream spec (combination of input file number and stream spec) """ Construct the qualified input stream spec (combination of input file number and stream spec)
@ -789,43 +811,28 @@ class Task:
if file_index >= 0: if file_index >= 0:
return "{}:{}".format(file_index, stream.stream_spec) return "{}:{}".format(file_index, stream.stream_spec)
# -- Manage Outputs # -- Input Streams
def add_output(self, filename: str, container: str=DEFAULT_CONTAINER, options: Mapping=None) -> OutputFile: def iter_video_streams(self) -> Iterator[InputStream]:
""" Add an output file
NOTE: Contrary to add_input this will NOT take an OutputFile instance and return it.
"""
for outfile in self.outputs:
if outfile.filename == filename:
raise AdvancedAVError("Output File '%s' already added." % filename)
else:
outfile = self.output_factory(self, filename, container, options)
self.pp.to_debug("New output file #%i: %s", len(self.outputs), filename)
self.outputs.append(outfile)
return outfile
# -- Manage Streams
def iter_video_streams(self) -> Iterator:
for input_ in self.inputs: for input_ in self.inputs:
yield from input_.video_streams yield from input_.video_streams
def iter_audio_streams(self) -> Iterator: def iter_audio_streams(self) -> Iterator[InputAudioStream]:
for input_ in self.inputs: for input_ in self.inputs:
yield from input_.audio_streams yield from input_.audio_streams
def iter_subtitle_streams(self) -> Iterator: def iter_subtitle_streams(self) -> Iterator[InputStream]:
for input_ in self.inputs: for input_ in self.inputs:
yield from input_.subtitle_streams yield from input_.subtitle_streams
def iter_attachment_streams(self) -> Iterator: def iter_attachment_streams(self) -> Iterator[InputStream]:
for input_ in self.inputs: for input_ in self.inputs:
yield from input_.attachment_streams yield from input_.attachment_streams
def iter_data_streams(self) -> Iterator: def iter_data_streams(self) -> Iterator[InputStream]:
for input_ in self.inputs: for input_ in self.inputs:
yield from input_.data_streams yield from input_.data_streams
def iter_streams(self) -> Iterator: def iter_streams(self) -> Iterator[InputStream]:
for input_ in self.inputs: for input_ in self.inputs:
yield from input_.streams yield from input_.streams
@ -833,9 +840,12 @@ class Task:
for input_ in self.inputs: for input_ in self.inputs:
yield from input_.chapters yield from input_.chapters
# -- Outputs
# outputs: Sequence[OutputFile]
# -- FFmpeg # -- FFmpeg
@staticmethod @staticmethod
def argv_options(options: Mapping, qualifier: str=None) -> Iterator: def argv_options(options: Mapping, qualifier: str=None) -> Iterator[str]:
""" Yield arbitrary options """ Yield arbitrary options
:type options: Mapping[str, str] :type options: Mapping[str, str]
@ -856,7 +866,7 @@ class Task:
yield str(value) yield str(value)
@staticmethod @staticmethod
def argv_metadata(metadata: Mapping, qualifier: str=None) -> Iterator: def argv_metadata(metadata: Mapping, qualifier: str=None) -> Iterator[str]:
""" Yield arbitrary metadata """ Yield arbitrary metadata
:type metadata: Mapping[str, str] :type metadata: Mapping[str, str]
@ -870,7 +880,7 @@ class Task:
yield opt yield opt
yield "%s=%s" % meta yield "%s=%s" % meta
def generate_args(self) -> Iterator: def generate_args(self) -> Iterator[str]:
""" Generate the ffmpeg commandline for this task """ Generate the ffmpeg commandline for this task
:rtype: Iterator[str] :rtype: Iterator[str]
@ -917,7 +927,7 @@ class Task:
out_fn = output.filename out_fn = output.filename
yield out_fn if out_fn[0] != "-" else "./" + out_fn yield out_fn if out_fn[0] != "-" else "./" + out_fn
def commit(self, additional_args: Iterable=()): def commit(self, additional_args: Iterable=(), immediate=True, **args):
""" """
Commit the changes. Commit the changes.
@ -926,7 +936,122 @@ class Task:
:type additional_args: Iterable[str] :type additional_args: Iterable[str]
:raises: AdvancedAVError when FFmpeg fails :raises: AdvancedAVError when FFmpeg fails
""" """
self.pp.call_conv(itertools.chain(additional_args, self.generate_args())) f = self.pp.commit_task(self, add_ffmpeg_args=additional_args, immediate=immediate, **args)
if f.finished:
if f.exception:
raise f.exception
elif immediate:
raise RuntimeError("Requested immediate commit but result was deferred")
def commit2(self, **args) -> Future:
"""
Commit the changes.
add_ffmpeg_args can be used to pass global arguments to ffmpeg. (like -y)
:type additional_args: Iterable[str]
:returns: a Future
"""
return self.pp.commit_task(self, **args)
# -- Managing the task
def split(self, pieces=0) -> Sequence["PartialTask"]:
"""
Split a task into min(pieces, len(outputs)) partial tasks
"""
parts = []
if pieces > 0:
for i in range(min(len(self.outputs), pieces)):
parts.append([])
for i, output in enumerate(self.outputs):
parts[i % pieces].append(output)
else:
parts = [[output] for output in self.outputs]
return [PartialTask(self, outset) for outset in parts]
class PartialTask(BaseTask):
def __init__(self, parent, outs):
super().__init__(parent.pp)
self.parent = parent
self.outputs = outs
@property
def inputs(self):
return self.parent.inputs
@property
def inputs_by_name(self):
return self.parent.inputs_by_name
class Task(BaseTask):
"""
Holds information about an AV-processing Task.
A Task is a collection of Input- and Output-Files and related options.
While OutputFiles are bound to one task at a time, InputFiles can be reused across Tasks.
"""
output_factory = OutputFile
inputs_by_name = None
def __init__(self, pp: "AdvancedAV"):
super().__init__(pp)
self.inputs = []
""" :type: list[InputFile] """
self.inputs_by_name = {}
""" :type: dict[str, InputFile] """
self.outputs = []
""" :type: list[OutputFile] """
# -- Manage Inputs
def add_input(self, file: "str | InputFile") -> InputFile:
""" Register an input file
When \param file is already registered as input file to this Task, do nothing.
:param file: Can be either the filename of an input file or an InputFile object.
The latter will be created if the former is passed.
"""
if isinstance(file, PurePath): # Pathlib support
file = str(file)
if isinstance(file, str):
if file in self.inputs_by_name:
return self.inputs_by_name[file]
file = self.pp.create_input(file)
if file not in self.inputs:
self.pp.to_debug("Adding input file #%i: %s", len(self.inputs), file.name)
self.inputs.append(file)
self.inputs_by_name[file.filename] = file
return file
# -- Manage Outputs
def add_output(self, filename: str, container: str=DEFAULT_CONTAINER, options: Mapping=None) -> OutputFile:
""" Add an output file
NOTE: Contrary to add_input this will NOT take an OutputFile instance and return it.
"""
for outfile in self.outputs:
if outfile.filename == filename:
raise AdvancedAVError("Output File '%s' already added." % filename)
else:
outfile = self.output_factory(self, filename, container, options)
self.pp.to_debug("New output file #%i: %s", len(self.outputs), filename)
self.outputs.append(outfile)
return outfile
class SimpleTask(Task): class SimpleTask(Task):
@ -980,27 +1105,6 @@ class AdvancedAV(metaclass=ABCMeta):
def to_debug(self, text, *fmt): def to_debug(self, text, *fmt):
self.get_logger().debug(text % fmt) self.get_logger().debug(text % fmt)
# ---- FFmpeg ----
@abstractmethod
def call_conv(self, args: Iterable):
"""
Call ffmpeg.
:param args: Iterable[str] The ffprobe arguments
It should throw an AdvancedAVError if the call fails
"""
pass
@abstractmethod
def call_probe(self, args: Iterable) -> str:
"""
Call ffprobe.
:param args: Iterable[str] The ffprobe arguments
:return: str the standard output
It should throw an AdvancedAVError if the call fails
NOTE: Make sure the locale is set to C so the regexes match
"""
pass
# ---- Create Tasks ---- # ---- Create Tasks ----
def create_task(self) -> Task: def create_task(self) -> Task:
""" """
@ -1018,6 +1122,29 @@ class AdvancedAV(metaclass=ABCMeta):
""" """
return SimpleTask(self, filename, container, options) return SimpleTask(self, filename, container, options)
# ---- Process Tasks ----
@abstractmethod
def commit_task(self, task: Task, *, add_ffmpeg_args: Sequence[str]=None, immediate: bool=False) -> Future:
"""
Execute a task
:param add_ffmpeg_args: List[str] arguments to add to ffmpeg call, if ffmpeg is used
:param immediate: Request that the task is executed synchronously
:return: A simple (possibly finished) future object describing the result
"""
# ---- Analyze Files ----
@abstractmethod
def probe_file(self, path, *, ffprobe_args_hint: Sequence[str]=None) -> Mapping[str, object]:
"""
Analyze a media file
:param path: The file path
:param ffprobe_args_hint: A hint as to which arguments would need to be passed to ffprobe to
supply all needed information
:return: The media information, in parsed ffmpeg JSON format
"""
# ---- Create InputFiles ---- # ---- Create InputFiles ----
def create_input(self, filename: str, options=None): def create_input(self, filename: str, options=None):
""" """
@ -1056,23 +1183,29 @@ class SimpleAV(AdvancedAV):
_posix_env = dict(os.environ) _posix_env = dict(os.environ)
_posix_env["LANG"] = _posix_env["LC_ALL"] = "C" _posix_env["LANG"] = _posix_env["LC_ALL"] = "C"
def call_conv(self, args: Iterable): def make_conv_argv(self, task, add_ffmpeg_args):
""" Actually call ffmpeg return tuple(itertools.chain((self._ffmpeg,), self.global_args, self.global_conv_args,
add_ffmpeg_args, task.generate_args()))
:type args: Iterable[str] def commit_task(self, task, *, add_ffmpeg_args=(), immediate=True):
""" with Future() as f:
argv = tuple(itertools.chain((self._ffmpeg,), self.global_args, self.global_conv_args, args)) argv = self.make_conv_argv(task, add_ffmpeg_args)
self.to_debug("Running Command: %s", argv) self.to_debug("Running Command: %s", argv)
output = None if self.ffmpeg_output else subprocess.DEVNULL output = None if self.ffmpeg_output else subprocess.DEVNULL
subprocess.call(argv, stdout=output, stderr=output) subprocess.call(argv, stdout=output, stderr=output)
def call_probe(self, args: Iterable): return f()
""" Call ffprobe (With LANG=LC_ALL=C)
:type args: Iterable[str] def call_probe(self, args: Iterable):
"""
Call ffprobe.
:param args: Iterable[str] The ffprobe arguments
:return: str the standard output
It should throw an AdvancedAVError if the call fails
NOTE: Make sure the locale is set to C so the regexes match
""" """
argv = tuple(itertools.chain((self._ffprobe,), self.global_args, self.global_probe_args, args)) argv = tuple(itertools.chain((self._ffprobe,), self.global_args, self.global_probe_args, args))
@ -1088,3 +1221,86 @@ class SimpleAV(AdvancedAV):
raise AdvancedAVError(msg) raise AdvancedAVError(msg)
return out.decode("utf-8", "replace") return out.decode("utf-8", "replace")
def probe_file(self, file, *, ffprobe_args_hint=None):
probe = self.call_probe(tuple(BaseTask.argv_options(file.options))
+ ffprobe_args_hint
+ ("-i", file.filename))
return json.loads(probe)
class MultiAV(SimpleAV):
def __init__(self, workers=1, ffmpeg=None, ffprobe=None):
super().__init__(ffmpeg=ffmpeg, ffprobe=ffprobe)
self.concurrent = workers
self.workers = {}
self.queue = collections.deque()
# Enqueue
def commit_task(self, task, *, add_ffmpeg_args=(), immediate=False):
if immediate:
return super().commit_task(task, add_ffmpeg_args=add_ffmpeg_args)
else:
f = Future()
self.queue.append((f, task, add_ffmpeg_args))
return f
# Process
def process_queue(self):
"""
Process tasks until queue is empty.
Note that the last few tasks may still be running in the background when this returns
"""
from time import sleep
while self.queue:
self.manage_workers()
sleep(.250)
def manage_workers(self):
"""
Make a single run over available workers and see to it that they have work if available
"""
for id in range(self.concurrent):
if not self.poll_worker(id) and self.queue:
self.workers[id] = self._spawn_next()
def wait(self):
""" Wait for processing to finish up """
while self.workers:
for id, (worker, f) in list(self.workers.items()):
worker.wait()
self.poll_worker(id)
def process_serial(self):
""" Process the queue one task at a time """
while self.queue:
p, f = self.spawn_next()
with f:
p.wait()
p.complete()
def poll_worker(self, id):
""" See if a worker is still running and clean it up otherwise """
if id in self.workers:
worker, future = self.workers[id]
if worker.poll() is not None:
if worker.returncode != 0:
future.fail(AdvancedAVError("ffmpeg returned %d" % worker.returncode))
else:
future.complete()
del self.workers[id]
else:
return True
return False
def _spawn_next(self, **b):
""" Spawn next worker """
f, task, add_ffmpeg_args = self.queue.popleft()
argv = self.make_conv_argv(task, add_ffmpeg_args)
self.to_debug("Running: %s" % (argv,))
return subprocess.Popen(self.make_conv_argv(task, add_ffmpeg_args), **b), f

Loading…
Cancel
Save