advancedav: Work towards 3.0: Actually use ffprobe properly

master
Taeyeon Mori 7 years ago
parent 6af03b925d
commit 8d5bbcbd03
  1. 492
      lib/python/advancedav.py

@ -1,5 +1,5 @@
"""
AdvancedAV FFmpeg commandline generator v2.0 [Library Edition]
AdvancedAV FFmpeg commandline generator v3.0 [Library Edition]
-----------------------------------------------------------
AdvancedAV helps with constructing FFmpeg commandline arguments.
@ -23,23 +23,21 @@ AdvancedAV FFmpeg commandline generator v2.0 [Library Edition]
"""
import os
import re
import sys
import json
import logging
import subprocess
import collections
import itertools
from abc import ABCMeta, abstractmethod
from collections.abc import Iterable, Mapping, Sequence, Iterator, MutableMapping
try:
from pathlib import Path, PurePath
except ImportError:
pass
from typing import Iterable, Mapping, Sequence, Iterator, MutableMapping
from pathlib import Path, PurePath
__all__ = "AdvancedAVError", "AdvancedAV", "SimpleAV"
version_info = 2, 1, 1
version_info = 2, 99, 0
# Constants
DEFAULT_CONTAINER = "matroska"
@ -57,8 +55,40 @@ class AdvancedAVError(Exception):
pass
# == Helpers ==
def ffmpeg_int(no: str) -> int:
if isinstance(no, str):
factor = 1
base = 1000
if no[-1].lower() == "b":
factor *= 8
no = no[:-1]
if no[-1].lower() == "i":
base = 1024
no = no[:-1]
if not no[-1].isdigit():
factor *= base ** (["k", "m", "g"].index(no[-1].lower()) + 1)
no = no[:-1]
return int(no) * factor
return int(no)
# == Base Classes ==
class ObjectWithOptions:
"""
Options refer to ffmpeg commandline arguments, referring to a specific Task, File or Stream.
Option values can be:
- str: pass value
- int: convert to string, pass value
- list, tuple: pass option multiple times, with different values
- True: pass option without value
- None: pass option without value (deprecated)
For option names, refer to the FFmpeg documentation.
Subclasses must provide an 'options' slot.
"""
__slots__ = ()
def __init__(self, *, options=None, **more):
@ -66,6 +96,14 @@ class ObjectWithOptions:
self.options = options or {}
def apply(self, source, *names, **omap):
"""
Selectively apply options from a dictionary.
Option names passed as strings will be applied as-is,
option names passed as keyword arguments will be applied as though they were named the argument's value
:return: self, for method chaining
"""
for name in names:
if name in source:
self.options[name] = source[name]
@ -76,6 +114,13 @@ class ObjectWithOptions:
return self
def set(self, **options):
"""
Set options on this object
Applies all keyword arguments as options
:return: self, for method chaining
"""
self.options.update(options)
return self
@ -102,6 +147,89 @@ class ObjectWithMetadata:
return self
# == Descriptors ==
class DescriptorBase:
__slots__ = "owner", "name"
def __init__(self, default_name="(Name Unknown)"):
self.owner = None
self.name = default_name
def __set_name__(self, owner, name):
self.owner = owner
self.name = name
repr_info = ""
def __repr__(self):
return "<%s %s of %s%s>" % (type(self).__name__, self.name, self.owner, self.repr_info)
class InformationProperty(DescriptorBase):
"""
A read-only property referring ffprobe information
"""
__slots__ = "path", "type"
def __init__(self, *path, type=lambda x: x):
super().__init__()
self.path = path
self.type = type
@property
def repr_info(self):
return " referring to %s" % self.path
def __get__(self, object, obj_type=None):
info = object.information
try:
for seg in self.path:
info = info[seg]
except (KeyError, IndexError):
return None
else:
return self.type(info)
class OptionProperty(DescriptorBase):
"""
A read-write descriptor referring to ffmpeg options
Unset options will return None,
setting an option to None will unset it.
Note: This differs from deprecated behaviour when setting options directly,
which will cause the option to be passed without arguments.
"""
__slots__ = "candidates", "type"
def __init__(self, *candidates, type=lambda x: x):
super().__init__()
self.candidates = candidates
self.type = type
@property
def repr_info(self):
return " referencing option %s" % self.candidates[0]
def __get__(self, object, obj_type=None):
for candidate in self.candidates:
if candidate in object.options:
return self.type(object.options[candidate])
else:
return None
def __set__(self, object, value):
for candidate in self.candidates:
if candidate in object.options:
del object.options[candidate]
if value is not None:
object.options[self.candidates[0]] = value
def __delete__(self, object):
self.__set__(object, None)
# === Stream Classes ===
class Stream:
"""
@ -109,23 +237,23 @@ class Stream:
One continuous stream of data muxed into a container format
"""
__slots__ = "file", "type", "index", "pertype_index", "codec", "profile"
__slots__ = "file",
def __init__(self, file: "File", type: str, index: int=None, pertype_index: int=None,
codec: str=None, profile: str=None, **more):
def __init__(self, file: "File", **more):
super().__init__(**more)
self.file = file
self.type = type
self.index = index
self.pertype_index = pertype_index
self.codec = codec
self.profile = profile
def _update_indices(self, index: int, pertype_index: int=None):
""" Update the Stream indices """
self.index = index
if pertype_index is not None:
self.pertype_index = pertype_index
@property
def index(self):
return 0
@property
def pertype_index(self):
return None
@property
def type(self):
return S_UNKNOWN
@property
def stream_spec(self):
@ -135,47 +263,108 @@ class Stream:
else:
return str(self.index)
def __str__(self):
return "<%s %s#%i: %s %s (%s)>" % (type(self).__name__, self.file.name, self.index,
self.type, self.codec, self.profile)
def __repr__(self):
return "<%s \"%s\"#%i (%s#%i)>" % (type(self).__name__, self.file.name, self.index, self.type, self.pertype_index)
# Input Streams
class InputStream(Stream):
"""
Holds information about an input stream
"""
__slots__ = "language"
__slots__ = "information", "pertype_index"
def __init__(self, file: "InputFile", type_: str, index: int, language: str, codec: str, profile: str):
super().__init__(file, type_, index, codec=codec, profile=profile)
self.file = file
self.language = language
def __init__(self, file: "InputFile", info: dict, pertype_index: int=None):
super().__init__(file)
def _update_indices(self, index: int, pertype_index: int=None):
""" InputStreams should not have their indices changed. """
if index != self.index:
raise ValueError("Cannot update indices on InputStreams! (This might mean there are bogus ids in the input")
# pertype_index gets updated by File._add_stream() so we don't throw up if it gets updated
self.information = info
self.pertype_index = pertype_index
@property
def type(self):
return self.information["codec_type"][0]
index = InformationProperty("index", type=int)
codec = InformationProperty("codec_name")
codec_name = InformationProperty("codec_long_name")
profile = InformationProperty("profile")
duration = InformationProperty("duration", type=float)
duration_ts = InformationProperty("duration_ts", type=int)
start_time = InformationProperty("start_time")
bitrate = InformationProperty("bit_rate", type=int)
max_bitrate = InformationProperty("max_bit_rate", type=int)
nb_frames = InformationProperty("nb_frames", type=int)
@property
def disposition(self):
try:
return tuple(k for k, v in self.information["disposition"].items() if v)
except KeyError:
return ()
language = InformationProperty("tags", "language")
class InputAudioStream(InputStream):
__slots__ = ()
def __init__(self, file: "InputFile", info: dict, pertype_index: int=None):
if info["codec_type"][0] != S_AUDIO:
raise ValueError("Cannot create %s from stream info of type %s" % (type(self).__name__, info["codec_type"]))
super().__init__(file, info)
@property
def type(self):
return S_AUDIO
sample_format = InformationProperty("sample_format")
sample_rate = InformationProperty("sample_rate", type=int)
channels = InformationProperty("channels", type=int)
channel_layout = InformationProperty("channel_layout")
def input_stream_factory(file, info, pertype_index=None):
return {
"audio": InputAudioStream,
}.get(info["codec_type"], InputStream)(file, info, pertype_index)
# Output Streams
class OutputStream(Stream, ObjectWithOptions, ObjectWithMetadata):
"""
Holds information about a mapped output stream
"""
__slots__ = "source", "options", "metadata"
__slots__ = "index", "pertype_index", "source", "options", "metadata"
# TODO: support other parameters like frame resolution
# Override polymorphic types
#file = None
""" :type: OutputFile """
def __init__(self, file: "OutputFile", source: InputStream, stream_id: int, stream_pertype_id: int=None,
codec: str=None, options: Mapping=None, metadata: MutableMapping=None):
super().__init__(file=file, type=source.type, index=stream_id, pertype_index=stream_pertype_id,
codec=codec, options=options, metadata=metadata)
options: Mapping=None, metadata: MutableMapping=None):
super().__init__(file=file, options=options, metadata=metadata)
self.index = stream_id
self.pertype_index = stream_pertype_id
self.source = source
@property
def type(self):
return self.source.type
def _update_indices(self, index: int, pertype_index: int=None):
""" Update the Stream indices """
self.index = index
if pertype_index is not None:
self.pertype_index = pertype_index
codec = OptionProperty("codec", "c")
bitrate = OptionProperty("b", type=ffmpeg_int)
class OutputVideoStream(OutputStream):
def downscale(self, width, height):
@ -185,7 +374,9 @@ class OutputVideoStream(OutputStream):
def output_stream_factory(file, source, *args, **more):
return (OutputVideoStream if source.type == S_VIDEO else OutputStream)(file, source, *args, **more)
return {
S_VIDEO: OutputVideoStream,
}.get(source.type, OutputStream)(file, source, *args, **more)
# === File Classes ===
@ -193,20 +384,12 @@ class File(ObjectWithOptions):
"""
ABC for Input- and Output-Files
"""
__slots__ = "name", "_streams", "_streams_by_type", "options", "path"
__slots__ = "_streams", "_streams_by_type", "options", "path"
def __init__(self, name: str, options: dict=None, **more):
def __init__(self, path: Path, options: dict=None, **more):
super().__init__(options=options, **more)
if Path: # Need to
self.path = Path(name)
self.name = str(name)
else:
self.name = name
self.options = options if options is not None else {}
""" :type: dict[str, str] """
self.path = Path(path)
self._streams = []
""" :type: list[Stream] """
@ -214,6 +397,31 @@ class File(ObjectWithOptions):
self._streams_by_type = collections.defaultdict(list)
""" :type: dict[str, list[Stream]] """
# Filename
@property
def name(self):
"""
The file's name
Changed in 3.0: previously, full path
"""
return self.path.name
@name.setter
def name(self, value):
self.path = self.path.with_name(value)
@property
def filename(self):
"""
The file's full path as string
"""
return str(self.path)
@filename.setter
def filename(self, value):
self.path = Path(value)
# Streams
def _add_stream(self, stream: Stream):
""" Add a stream """
stream._update_indices(len(self._streams), len(self._streams_by_type[stream.type]))
@ -268,13 +476,26 @@ class File(ObjectWithOptions):
"""
return self._streams_by_type[S_DATA]
@property
def filename(self) -> str:
""" Alias for .name """
return self.name
def __repr__(self):
return "<%s \"%s\">" % (type(self).__name__, self.name)
class InputFileChapter:
__slots__ = "file", "information"
def __init__(self, file, info):
self.file = file
self.information = info
def __repr__(self):
return "<InputFileChapter #%i of %s from %.0fs to %.0fs (%s)>" \
% (self.index, self.file, self.start_time, self.end_time, self.title)
start_time = InformationProperty("start_time", type=float)
end_time = InformationProperty("end_time", type=float)
def __str__(self):
return "<%s %s>" % (type(self).__name__, self.name)
index = InformationProperty("id", type=int)
title = InformationProperty("tags", "title")
class InputFile(File):
@ -284,65 +505,50 @@ class InputFile(File):
:note: Modifying the options after accessing the streams results in undefined
behaviour! (Currently: Changes will only apply to conv call)
"""
__slots__ = "pp", "_streams_initialized"
__slots__ = "pp", "_information"
stream_factory = InputStream
stream_factory = staticmethod(input_stream_factory)
def __init__(self, pp: "AdvancedAV", filename: str, options: Mapping=None):
super().__init__(name=filename, options=dict(options.items()) if options else None)
def __init__(self, pp: "AdvancedAV", path: str, options: Mapping=None, info=None):
super().__init__(path, options=dict(options.items()) if options else None)
self.pp = pp
self._information = info
self._streams_initialized = False
@property
def information(self):
if self._information is None:
self._initialize_info()
return self._information
# -- Probe streams
_reg_probe_streams = re.compile(
r"Stream #0:(?P<id>\d+)(?:\((?P<lang>[^\)]+)\))?:\s+(?P<type>\w+):\s+(?P<codec>[\w_\d]+)"
r"(?:\s+\((?P<profile>[^\)]+)\))?(?:\s+(?P<extra>.+))?"
)
# -- Initialize
ffprobe_args = "-show_format", "-show_streams", "-show_chapters", "-print_format", "json"
@staticmethod
def _stream_type(type_: str) -> str:
""" Convert the ff-/avprobe type output to the notation used on the ffmpeg/avconv commandline """
lookup = {
"Audio": S_AUDIO,
"Video": S_VIDEO,
"Subtitle": S_SUBTITLE,
"Attachment": S_ATTACHMENT,
"Data": S_DATA
}
return lookup.get(type_, S_UNKNOWN)
def _initialize_streams(self, probe: str=None) -> Iterator:
def _initialize_info(self):
probe = self.pp.call_probe(tuple(Task.argv_options(self.options))
+ self.ffprobe_args
+ ("-i", self.filename))
self._information = json.loads(probe)
def _initialize_streams(self):
""" Parse the ffprobe output
The locale of the probe output in \param probe should be C!
:rtype: Iterator[InputStream]
"""
if probe is None:
if self.options:
probe = self.pp.call_probe(itertools.chain(Task.argv_options(self.options), ("-i", self.name)))
else:
probe = self.pp.call_probe(("-i", self.name))
for match in self._reg_probe_streams.finditer(probe):
self._add_stream(self.stream_factory(self,
self._stream_type(match.group("type")),
int(match.group("id")),
match.group("lang"),
match.group("codec"),
match.group("profile")))
self._streams_initialized = True
for sinfo in self.information["streams"]:
stype = sinfo["codec_type"][0]
stream = self.stream_factory(self, sinfo, len(self._streams_by_type[stype]))
self._streams.append(stream)
self._streams_by_type[stype].append(stream)
# -- Streams
@property
def streams(self) -> Sequence:
""" Collect the available streams
:rtype: Sequence[InputStream]
"""
if not self._streams_initialized:
if not self._streams:
self._initialize_streams()
return self._streams
@ -352,7 +558,7 @@ class InputFile(File):
:rtype: Sequence[InputStream]
"""
if not self._streams_initialized:
if not self._streams:
self._initialize_streams()
return self._streams_by_type[S_VIDEO]
@ -362,7 +568,7 @@ class InputFile(File):
:rtype: Sequence[InputStream]
"""
if not self._streams_initialized:
if not self._streams:
self._initialize_streams()
return self._streams_by_type[S_AUDIO]
@ -372,7 +578,7 @@ class InputFile(File):
:rtype: Sequence[InputStream]
"""
if not self._streams_initialized:
if not self._streams:
self._initialize_streams()
return self._streams_by_type[S_SUBTITLE]
@ -382,7 +588,7 @@ class InputFile(File):
:rtype: Sequence[InputStream]
"""
if not self._streams_initialized:
if not self._streams:
self._initialize_streams()
return self._streams_by_type[S_ATTACHMENT]
@ -392,10 +598,30 @@ class InputFile(File):
:rtype: Sequence[InputStream]
"""
if not self._streams_initialized:
if not self._streams:
self._initialize_streams()
return self._streams_by_type[S_DATA]
# Information
nb_streams = InformationProperty("format", "nb_streams", type=int)
duration = InformationProperty("format", "duration", type=float)
size = InformationProperty("format", "size", type=int)
bitrate = InformationProperty("format", "bit_rate", type=int)
# Metadata
metadata = InformationProperty("format", "tags")
title = InformationProperty("format", "tags", "title")
artist = InformationProperty("format", "tags", "artist")
album = InformationProperty("format", "tags", "album")
# Chapters
@property
def chapters(self) -> Sequence[InputFileChapter]:
return list(InputFileChapter(self, i) for i in self.information["chapters"])
class OutputFile(File, ObjectWithMetadata):
"""
@ -539,7 +765,7 @@ class Task:
:param file: Can be either the filename of an input file or an InputFile object.
The latter will be created if the former is passed.
"""
if PurePath and isinstance(file, PurePath): # Pathlib support
if isinstance(file, PurePath): # Pathlib support
file = str(file)
if isinstance(file, str):
if file in self.inputs_by_name:
@ -550,7 +776,7 @@ class Task:
if file not in self.inputs:
self.pp.to_debug("Adding input file #%i: %s", len(self.inputs), file.name)
self.inputs.append(file)
self.inputs_by_name[file.name] = file
self.inputs_by_name[file.filename] = file
return file
@ -570,7 +796,7 @@ class Task:
NOTE: Contrary to add_input this will NOT take an OutputFile instance and return it.
"""
for outfile in self.outputs:
if outfile.name == filename:
if outfile.filename == filename:
raise AdvancedAVError("Output File '%s' already added." % filename)
else:
outfile = self.output_factory(self, filename, container, options)
@ -603,6 +829,10 @@ class Task:
for input_ in self.inputs:
yield from input_.streams
def iter_chapters(self) -> Iterator[InputFileChapter]:
for input_ in self.inputs:
yield from input_.chapters
# -- FFmpeg
@staticmethod
def argv_options(options: Mapping, qualifier: str=None) -> Iterator:
@ -618,12 +848,12 @@ class Task:
for option, value in options.items():
yield opt_fmt % option
if isinstance(value, (tuple, list)):
yield value[0]
yield str(value[0])
for x in value[1:]:
yield opt_fmt % option
yield x
elif value is not None:
yield value
yield str(x)
elif value is not True and value is not None:
yield str(value)
@staticmethod
def argv_metadata(metadata: Mapping, qualifier: str=None) -> Iterator:
@ -652,7 +882,7 @@ class Task:
# Add Input
yield "-i"
filename = input_.name
filename = input_.filename
if filename[0] == '-':
yield "./" + filename
else:
@ -684,7 +914,7 @@ class Task:
yield output.container
# Output Filename, prevent it from being interpreted as option
out_fn = output.name
out_fn = output.filename
yield out_fn if out_fn[0] != "-" else "./" + out_fn
def commit(self, additional_args: Iterable=()):
@ -726,7 +956,8 @@ class SimpleTask(Task):
container = _redir("output", "container")
metadata = _redir("output", "metadata")
options = _redir("output", "options")
name = _redir("output", "name")
name = _redir("output", "name") # Deprecated! use filename instead. 'name' will be reused in the future
filename = _redir("output", "name")
del _redir
@ -737,14 +968,17 @@ class AdvancedAV(metaclass=ABCMeta):
# ---- Output ----
@abstractmethod
def to_screen(self, text: str, *fmt):
""" Log messages to the user """
def get_logger(self):
"""
Get a stdlib logger to output to
"""
pass
@abstractmethod
def to_debug(self, text: str, *fmt):
""" Process verbose messages """
pass
def to_screen(self, text, *fmt):
self.get_logger().log(text % fmt)
def to_debug(self, text, *fmt):
self.get_logger().debug(text % fmt)
# ---- FFmpeg ----
@abstractmethod
@ -793,7 +1027,7 @@ class AdvancedAV(metaclass=ABCMeta):
:return: A InputFile instance
NOTE that Task.add_input is usually the preferred way to create inputs
"""
return self.input_factory(pp=self, filename=filename, options=options)
return self.input_factory(self, filename, options=options)
class SimpleAV(AdvancedAV):
@ -808,7 +1042,6 @@ class SimpleAV(AdvancedAV):
def __init__(self, *, ffmpeg="ffmpeg", ffprobe="ffprobe", logger=None, ffmpeg_output=True):
if logger is None:
import logging
self.logger = logging.getLogger("advancedav.SimpleAV")
else:
self.logger = logger
@ -817,11 +1050,8 @@ class SimpleAV(AdvancedAV):
self.ffmpeg_output = ffmpeg_output
self.logger.debug("SimpleAV initialized.")
def to_screen(self, text, *fmt):
self.logger.log(text % fmt)
def to_debug(self, text, *fmt):
self.logger.debug(text % fmt)
def get_logger(self):
return self.logger
_posix_env = dict(os.environ)
_posix_env["LANG"] = _posix_env["LC_ALL"] = "C"
@ -857,4 +1087,4 @@ class SimpleAV(AdvancedAV):
msg = err.strip().split('\n')[-1]
raise AdvancedAVError(msg)
return err.decode("utf-8", "replace")
return out.decode("utf-8", "replace")

Loading…
Cancel
Save