diff --git a/lib/python/advancedav.py b/lib/python/advancedav.py
index d3c7747..a3dd0aa 100644
--- a/lib/python/advancedav.py
+++ b/lib/python/advancedav.py
@@ -35,9 +35,9 @@ from typing import Iterable, Mapping, Sequence, Iterator, MutableMapping
 from pathlib import Path, PurePath
 
 
-__all__ = "AdvancedAVError", "AdvancedAV", "SimpleAV"
+__all__ = "AdvancedAVError", "AdvancedAV", "SimpleAV", "MultiAV"
 
-version_info = 2, 99, 0
+version_info = 2, 99, 5
 
 # Constants
 DEFAULT_CONTAINER = "matroska"
@@ -73,6 +73,75 @@ def ffmpeg_int(no: str) -> int:
     return int(no)
 
 
+class Future:
+    """
+    Minimal single-threaded future used to report the outcome of a task
+
+    Consumers register callbacks with then() and catch();
+    providers settle the future with complete() or fail().
+    """
+
+    def __init__(self):
+        self.result = None
+        self.finished = False
+        self.exception = None
+
+        self._then = []
+        self._catch = []
+
+    # Consumer
+    def then(self, fn):
+        """ Call fn(result) on success; runs immediately if already completed """
+        if not self.finished:
+            self._then.append(fn)
+        elif self.exception is None:
+            fn(self.result)
+        return self
+
+    def catch(self, fn):
+        """ Call fn(exception) on failure; runs immediately if already failed """
+        if not self.finished:
+            self._catch.append(fn)
+        elif self.exception is not None:
+            fn(self.exception)
+        return self
+
+    # Provider
+    def complete(self, result=None):
+        """ Mark the future as successful and notify then() callbacks """
+        self.result = result
+        self.finished = True
+        for c in self._then:
+            c(result)
+        return self
+
+    def fail(self, exception):
+        """ Mark the future as failed and notify catch() callbacks """
+        self.exception = exception
+        self.finished = True
+        for c in self._catch:
+            c(exception)
+        return self
+
+    def __enter__(self):
+        """ Context manager entry: yields the complete() method """
+        return self.complete
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        """
+        Fail the future if the with-block ended without settling it
+
+        The context manager protocol always passes three arguments; the
+        exception (if any) is recorded and still propagates to the caller.
+        """
+        if not self.finished:
+            if exc_value is not None:
+                self.fail(exc_value)
+            else:
+                self.fail(RuntimeError("Future not completed in context"))
+        return False
+
+
 # == Base Classes ==
 class ObjectWithOptions:
     """
@@ -366,6 +435,10 @@ class OutputStream(Stream, ObjectWithOptions, ObjectWithMetadata):
     bitrate = OptionProperty("b", type=ffmpeg_int)
 
 
+class OutputAudioStream(OutputStream):
+    channels = OptionProperty("ac")
+
+
 class OutputVideoStream(OutputStream):
     def downscale(self, width, height):
         # Scale while keeping aspect ratio; never upscale.
@@ -375,6 +430,7 @@ class OutputVideoStream(OutputStream):
 
 def output_stream_factory(file, source, *args, **more):
     return {
+        S_AUDIO: OutputAudioStream,
         S_VIDEO: OutputVideoStream,
     }.get(source.type, OutputStream)(file, source, *args, **more)
 
@@ -525,10 +581,7 @@ class InputFile(File):
     ffprobe_args = "-show_format", "-show_streams", "-show_chapters", "-print_format", "json"
 
     def _initialize_info(self):
-        probe = self.pp.call_probe(tuple(Task.argv_options(self.options))
-                                   + self.ffprobe_args
-                                   + ("-i", self.filename))
-        self._information = json.loads(probe)
+        self._information = self.pp.probe_file(self, ffprobe_args_hint=self.ffprobe_args)
 
     def _initialize_streams(self):
         """ Parse the ffprobe output
@@ -733,52 +786,21 @@ class OutputFile(File, ObjectWithMetadata):
 
 
 # === Task Classes ===
-class Task:
+class BaseTask:
     """
-    Holds information about an AV-processing Task.
-
-    A Task is a collection of Input- and Output-Files and related options.
-    While OutputFiles are bound to one task at a time, InputFiles can be reused across Tasks.
+    Task base class
     """
-
-    output_factory = OutputFile
-
     def __init__(self, pp: "AdvancedAV"):
         super().__init__()
 
         self.pp = pp
 
-        self.inputs = []
-        """ :type: list[InputFile] """
-        self.inputs_by_name = {}
-        """ :type: dict[str, InputFile] """
-
-        self.outputs = []
-        """ :type: list[OutputFile] """
-
-    # -- Manage Inputs
-    def add_input(self, file: "str | InputFile") -> InputFile:
-        """ Register an input file
-
-        When \param file is already registered as input file to this Task, do nothing.
-
-        :param file: Can be either the filename of an input file or an InputFile object.
-                      The latter will be created if the former is passed.
-        """
-        if isinstance(file, PurePath):  # Pathlib support
-            file = str(file)
-        if isinstance(file, str):
-            if file in self.inputs_by_name:
-                return self.inputs_by_name[file]
-
-            file = self.pp.create_input(file)
-
-        if file not in self.inputs:
-            self.pp.to_debug("Adding input file #%i: %s", len(self.inputs), file.name)
-            self.inputs.append(file)
-            self.inputs_by_name[file.filename] = file
+    # -- Inputs
+    # inputs: Sequence[InputFile]
 
-        return file
+    @property
+    def inputs_by_name(self) -> Mapping[str, InputFile]:
+        return {i.filename: i for i in self.inputs}
 
     def qualified_input_stream_spec(self, stream: InputStream) -> str:
         """ Construct the qualified input stream spec (combination of input file number and stream spec)
@@ -789,43 +811,28 @@ class Task:
         if file_index >= 0:
             return "{}:{}".format(file_index, stream.stream_spec)
 
-    # -- Manage Outputs
-    def add_output(self, filename: str, container: str=DEFAULT_CONTAINER, options: Mapping=None) -> OutputFile:
-        """ Add an output file
-
-        NOTE: Contrary to add_input this will NOT take an OutputFile instance and return it.
-        """
-        for outfile in self.outputs:
-            if outfile.filename == filename:
-                raise AdvancedAVError("Output File '%s' already added." % filename)
-        else:
-            outfile = self.output_factory(self, filename, container, options)
-            self.pp.to_debug("New output file #%i: %s", len(self.outputs), filename)
-            self.outputs.append(outfile)
-            return outfile
-
-    # -- Manage Streams
-    def iter_video_streams(self) -> Iterator:
+    # -- Input Streams
+    def iter_video_streams(self) -> Iterator[InputStream]:
         for input_ in self.inputs:
             yield from input_.video_streams
 
-    def iter_audio_streams(self) -> Iterator:
+    def iter_audio_streams(self) -> Iterator[InputAudioStream]:
         for input_ in self.inputs:
             yield from input_.audio_streams
 
-    def iter_subtitle_streams(self) -> Iterator:
+    def iter_subtitle_streams(self) -> Iterator[InputStream]:
         for input_ in self.inputs:
             yield from input_.subtitle_streams
 
-    def iter_attachment_streams(self) -> Iterator:
+    def iter_attachment_streams(self) -> Iterator[InputStream]:
         for input_ in self.inputs:
             yield from input_.attachment_streams
 
-    def iter_data_streams(self) -> Iterator:
+    def iter_data_streams(self) -> Iterator[InputStream]:
         for input_ in self.inputs:
             yield from input_.data_streams
 
-    def iter_streams(self) -> Iterator:
+    def iter_streams(self) -> Iterator[InputStream]:
         for input_ in self.inputs:
             yield from input_.streams
 
@@ -833,9 +840,12 @@ class Task:
         for input_ in self.inputs:
             yield from input_.chapters
 
+    # -- Outputs
+    # outputs: Sequence[OutputFile]
+
     # -- FFmpeg
     @staticmethod
-    def argv_options(options: Mapping, qualifier: str=None) -> Iterator:
+    def argv_options(options: Mapping, qualifier: str=None) -> Iterator[str]:
         """ Yield arbitrary options
 
         :type options: Mapping[str, str]
@@ -856,7 +866,7 @@ class Task:
                 yield str(value)
 
     @staticmethod
-    def argv_metadata(metadata: Mapping, qualifier: str=None) -> Iterator:
+    def argv_metadata(metadata: Mapping, qualifier: str=None) -> Iterator[str]:
         """ Yield arbitrary metadata
 
         :type metadata: Mapping[str, str]
@@ -870,7 +880,7 @@ class Task:
             yield opt
             yield "%s=%s" % meta
 
-    def generate_args(self) -> Iterator:
+    def generate_args(self) -> Iterator[str]:
         """ Generate the ffmpeg commandline for this task
 
         :rtype: Iterator[str]
@@ -917,7 +927,7 @@ class Task:
             out_fn = output.filename
             yield out_fn if out_fn[0] != "-" else "./" + out_fn
 
-    def commit(self, additional_args: Iterable=()):
+    def commit(self, additional_args: Iterable=(), immediate=True, **args):
         """
         Commit the changes.
 
@@ -926,7 +936,122 @@ class Task:
         :type additional_args: Iterable[str]
         :raises: AdvancedAVError when FFmpeg fails
         """
-        self.pp.call_conv(itertools.chain(additional_args, self.generate_args()))
+        f = self.pp.commit_task(self, add_ffmpeg_args=additional_args, immediate=immediate, **args)
+        if f.finished:
+            if f.exception:
+                raise f.exception
+        elif immediate:
+            raise RuntimeError("Requested immediate commit but result was deferred")
+
+    def commit2(self, **args) -> Future:
+        """
+        Commit the changes.
+
+        add_ffmpeg_args can be used to pass global arguments to ffmpeg. (like -y)
+
+        :type add_ffmpeg_args: Iterable[str]
+        :returns: a Future
+        """
+        return self.pp.commit_task(self, **args)
+
+    # -- Managing the task
+    def split(self, pieces=0) -> Sequence["PartialTask"]:
+        """
+        Split a task into min(pieces, len(outputs)) partial tasks
+        """
+        parts = []
+
+        if pieces > 0:
+            for i in range(min(len(self.outputs), pieces)):
+                parts.append([])
+
+            for i, output in enumerate(self.outputs):
+                parts[i % pieces].append(output)
+
+        else:
+            parts = [[output] for output in self.outputs]
+
+        return [PartialTask(self, outset) for outset in parts]
+
+
+class PartialTask(BaseTask):
+    def __init__(self, parent, outs):
+        super().__init__(parent.pp)
+
+        self.parent = parent
+
+        self.outputs = outs
+
+    @property
+    def inputs(self):
+        return self.parent.inputs
+
+    @property
+    def inputs_by_name(self):
+        return self.parent.inputs_by_name
+
+
+class Task(BaseTask):
+    """
+    Holds information about an AV-processing Task.
+
+    A Task is a collection of Input- and Output-Files and related options.
+    While OutputFiles are bound to one task at a time, InputFiles can be reused across Tasks.
+    """
+
+    output_factory = OutputFile
+
+    inputs_by_name = None
+
+    def __init__(self, pp: "AdvancedAV"):
+        super().__init__(pp)
+
+        self.inputs = []
+        """ :type: list[InputFile] """
+        self.inputs_by_name = {}
+        """ :type: dict[str, InputFile] """
+
+        self.outputs = []
+        """ :type: list[OutputFile] """
+
+    # -- Manage Inputs
+    def add_input(self, file: "str | InputFile") -> InputFile:
+        """ Register an input file
+
+        When \param file is already registered as input file to this Task, do nothing.
+
+        :param file: Can be either the filename of an input file or an InputFile object.
+                      The latter will be created if the former is passed.
+        """
+        if isinstance(file, PurePath):  # Pathlib support
+            file = str(file)
+        if isinstance(file, str):
+            if file in self.inputs_by_name:
+                return self.inputs_by_name[file]
+
+            file = self.pp.create_input(file)
+
+        if file not in self.inputs:
+            self.pp.to_debug("Adding input file #%i: %s", len(self.inputs), file.name)
+            self.inputs.append(file)
+            self.inputs_by_name[file.filename] = file
+
+        return file
+
+    # -- Manage Outputs
+    def add_output(self, filename: str, container: str=DEFAULT_CONTAINER, options: Mapping=None) -> OutputFile:
+        """ Add an output file
+
+        NOTE: Contrary to add_input this will NOT take an OutputFile instance and return it.
+        """
+        for outfile in self.outputs:
+            if outfile.filename == filename:
+                raise AdvancedAVError("Output File '%s' already added." % filename)
+        else:
+            outfile = self.output_factory(self, filename, container, options)
+            self.pp.to_debug("New output file #%i: %s", len(self.outputs), filename)
+            self.outputs.append(outfile)
+            return outfile
 
 
 class SimpleTask(Task):
@@ -980,27 +1105,6 @@ class AdvancedAV(metaclass=ABCMeta):
     def to_debug(self, text, *fmt):
         self.get_logger().debug(text % fmt)
 
-    # ---- FFmpeg ----
-    @abstractmethod
-    def call_conv(self, args: Iterable):
-        """
-        Call ffmpeg.
-        :param args: Iterable[str] The ffprobe arguments
-        It should throw an AdvancedAVError if the call fails
-        """
-        pass
-
-    @abstractmethod
-    def call_probe(self, args: Iterable) -> str:
-        """
-        Call ffprobe.
-        :param args: Iterable[str] The ffprobe arguments
-        :return: str the standard output
-        It should throw an AdvancedAVError if the call fails
-        NOTE: Make sure the locale is set to C so the regexes match
-        """
-        pass
-
     # ---- Create Tasks ----
     def create_task(self) -> Task:
         """
@@ -1018,6 +1122,29 @@ class AdvancedAV(metaclass=ABCMeta):
         """
         return SimpleTask(self, filename, container, options)
 
+    # ---- Process Tasks ----
+    @abstractmethod
+    def commit_task(self, task: Task, *, add_ffmpeg_args: Sequence[str]=None, immediate: bool=False) -> Future:
+        """
+        Execute a task
+
+        :param add_ffmpeg_args: List[str] arguments to add to ffmpeg call, if ffmpeg is used
+        :param immediate: Request that the task is executed synchronously
+        :return: A simple (possibly finished) future object describing the result
+        """
+
+    # ---- Analyze Files ----
+    @abstractmethod
+    def probe_file(self, path, *, ffprobe_args_hint: Sequence[str]=None) -> Mapping[str, object]:
+        """
+        Analyze a media file
+
+        :param path: The file path
+        :param ffprobe_args_hint: A hint as to which arguments would need to be passed to ffprobe to
+                                  supply all needed information
+        :return: The media information, in parsed ffmpeg JSON format
+        """
+
     # ---- Create InputFiles ----
     def create_input(self, filename: str, options=None):
         """
@@ -1056,23 +1183,42 @@ class SimpleAV(AdvancedAV):
     _posix_env = dict(os.environ)
     _posix_env["LANG"] = _posix_env["LC_ALL"] = "C"
 
-    def call_conv(self, args: Iterable):
-        """ Actually call ffmpeg
+    def make_conv_argv(self, task, add_ffmpeg_args):
+        """ Build the complete ffmpeg argv for a task """
+        return tuple(itertools.chain((self._ffmpeg,), self.global_args, self.global_conv_args,
+                                     add_ffmpeg_args, task.generate_args()))
 
-        :type args: Iterable[str]
-        """
-        argv = tuple(itertools.chain((self._ffmpeg,), self.global_args, self.global_conv_args, args))
+    def commit_task(self, task, *, add_ffmpeg_args=(), immediate=True):
+        """
+        Run ffmpeg synchronously for the task
+
+        :param add_ffmpeg_args: additional global arguments for ffmpeg
+        :param immediate: ignored; this implementation always runs synchronously
+        :return: a finished Future; it is failed with an AdvancedAVError
+                 if ffmpeg exits with a non-zero status
+        """
+        argv = self.make_conv_argv(task, add_ffmpeg_args)
 
-        self.to_debug("Running Command: %s", argv)
+        self.to_debug("Running Command: %s", argv)
 
-        output = None if self.ffmpeg_output else subprocess.DEVNULL
+        output = None if self.ffmpeg_output else subprocess.DEVNULL
 
-        subprocess.call(argv, stdout=output, stderr=output)
+        returncode = subprocess.call(argv, stdout=output, stderr=output)
 
-    def call_probe(self, args: Iterable):
-        """ Call ffprobe (With LANG=LC_ALL=C)
+        f = Future()
+        if returncode != 0:
+            f.fail(AdvancedAVError("ffmpeg returned %d" % returncode))
+        else:
+            f.complete()
+        return f
 
-        :type args: Iterable[str]
+    def call_probe(self, args: Iterable):
+        """
+        Call ffprobe.
+        :param args: Iterable[str] The ffprobe arguments
+        :return: str the standard output
+        It should throw an AdvancedAVError if the call fails
+        NOTE: Make sure the locale is set to C so the regexes match
         """
         argv = tuple(itertools.chain((self._ffprobe,), self.global_args, self.global_probe_args, args))
 
@@ -1088,3 +1234,111 @@ class SimpleAV(AdvancedAV):
             raise AdvancedAVError(msg)
 
         return out.decode("utf-8", "replace")
+
+    def probe_file(self, file, *, ffprobe_args_hint=None):
+        """
+        Analyze a media file by running ffprobe
+
+        :param file: the input file object; its options and filename are passed to ffprobe
+        :param ffprobe_args_hint: extra ffprobe arguments; None means no extra arguments
+        :return: the parsed ffprobe JSON output
+        """
+        probe = self.call_probe(tuple(BaseTask.argv_options(file.options))
+                                + tuple(ffprobe_args_hint or ())
+                                + ("-i", file.filename))
+        return json.loads(probe)
+
+
+class MultiAV(SimpleAV):
+    """
+    SimpleAV variant that queues tasks and runs up to `workers` ffmpeg processes concurrently
+
+    Tasks committed with immediate=False are only queued; call process_queue()
+    (followed by wait()) or process_serial() to actually run them.
+    """
+
+    def __init__(self, workers=1, ffmpeg=None, ffprobe=None):
+        super().__init__(ffmpeg=ffmpeg, ffprobe=ffprobe)
+
+        self.concurrent = workers
+
+        self.workers = {}
+        self.queue = collections.deque()
+
+    # Enqueue
+    def commit_task(self, task, *, add_ffmpeg_args=(), immediate=False):
+        """ Run the task right away if immediate, otherwise queue it and return a pending Future """
+        if immediate:
+            return super().commit_task(task, add_ffmpeg_args=add_ffmpeg_args)
+        else:
+            f = Future()
+            self.queue.append((f, task, add_ffmpeg_args))
+            return f
+
+    # Process
+    def process_queue(self):
+        """
+        Process tasks until queue is empty.
+        Note that the last few tasks may still be running in the background when this returns
+        """
+        from time import sleep
+        while self.queue:
+            self.manage_workers()
+            sleep(.250)
+
+    def manage_workers(self):
+        """
+        Make a single run over available workers and see to it that they have work if available
+        """
+        for id in range(self.concurrent):
+            if not self.poll_worker(id) and self.queue:
+                self.workers[id] = self._spawn_next()
+
+    def wait(self):
+        """ Wait for processing to finish up """
+        while self.workers:
+            for id, (worker, f) in list(self.workers.items()):
+                worker.wait()
+                self.poll_worker(id)
+
+    def process_serial(self):
+        """ Process the queue one task at a time, settling each task's Future """
+        while self.queue:
+            p, f = self._spawn_next()
+            self._finish(f, p.wait())
+
+    def poll_worker(self, id):
+        """ See if a worker is still running and clean it up otherwise """
+        if id in self.workers:
+            worker, future = self.workers[id]
+
+            if worker.poll() is not None:
+                self._finish(future, worker.returncode)
+                del self.workers[id]
+            else:
+                return True
+        return False
+
+    @staticmethod
+    def _finish(future, returncode):
+        """ Settle a task's future according to the ffmpeg exit status """
+        if returncode != 0:
+            future.fail(AdvancedAVError("ffmpeg returned %d" % returncode))
+        else:
+            future.complete()
+
+    def _spawn_next(self, **b):
+        """
+        Spawn next worker
+
+        :param b: extra keyword arguments for subprocess.Popen
+        :return: tuple of (Popen process, Future of the task)
+        """
+        f, task, add_ffmpeg_args = self.queue.popleft()
+
+        argv = self.make_conv_argv(task, add_ffmpeg_args)
+        # Pass argv as a format argument: to_debug applies % itself, so a
+        # pre-formatted message containing '%' would break the logging call
+        self.to_debug("Running: %s", argv)
+
+        return subprocess.Popen(argv, **b), f