#!/usr/bin/env python3
"""
xconv FFmpeg wrapper based on AdvancedAV
-----------------------------------------------------------
    AdvancedAV helps with constructing FFmpeg commandline arguments.

    It can automatically parse input files with the help of FFmpeg's ffprobe tool (WiP)
    and allows programmatically mapping streams to output files and setting metadata on them.
-----------------------------------------------------------
    Copyright (c) 2015-2016 Taeyeon Mori

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

from advancedav import version_info as aav_version_info

from argparse import ArgumentParser, Action
from abc import ABCMeta, abstractmethod
from itertools import chain
from functools import wraps
from os.path import isdir, join as build_path, basename, dirname, splitext, exists, abspath
from os import environ, makedirs, mkdir

version_info = 0, 1, 3


# == Misc. helpers ==
def __update(f, name, update):
    """
    Merge the dict *update* into the dict stored in attribute *name* of *f*.
    If *f* has no such attribute yet, *update* itself is stored there.
    """
    if hasattr(f, name):
        getattr(f, name).update(update)
    else:
        setattr(f, name, update)


def __defaults(obj, **defs):
    """
    Set every keyword as an attribute on *obj*, unless *obj* already has it.
    """
    for k, v in defs.items():
        if not hasattr(obj, k):
            setattr(obj, k, v)


# == Profile Decorators ==
# Registry of all known profiles, keyed by the profile function's name
profiles = {}


def profile(f):
    """
    Define a XConv Profile
    Note: Should be outermost decorator

    Fills in the attributes the other decorators may have left unset
    (description, container, ext, defines, features) and registers *f*
    in `profiles` under its function name.
    """
    __defaults(f,
        description=None,
        container=None,
        ext=None,
        defines={},
        features={})
    profiles[f.__name__] = f
    return f


def description(desc):
    """ Add a profile description """
    def apply(f):
        f.description = desc
        return f
    return apply


def output(container=None, ext=None):
    """
    Add output file information

    Setting a container also derives a file extension from it
    ("mkv" for "matroska", the container name otherwise); an explicit
    *ext* overrides that.
    """
    def apply(f):
        if container:
            f.container = container
            f.ext = "mkv" if container == "matroska" else container
        if ext:
            f.ext = ext
        return f
    return apply


def defines(**defs):
    """ Document supported defines with description """
    def apply(f):
        __update(f, "defines", defs)
        return f
    return apply


def features(**features):
    """ Set opaque feature flags """
    def apply(f):
        __update(f, "features", features)
        return f
    return apply


def singleaudio(profile):
    """
    Operate on a single audio stream (The first one found)
    The stream will be passed to the decorated function in the "stream" keyword

    If the task has no audio stream, a message is printed and the wrapper
    returns False without calling the profile.
    """
    @wraps(profile)
    def wrapper(task, **kwds):
        try:
            audio_stream = next(task.iter_audio_streams())
        except StopIteration:
            print("No audio track in '%s'" % "', '".join(f.name for f in task.inputs))
            return False
        return profile(task, stream=audio_stream, **kwds)
    __update(wrapper, "features", {"singleaudio": None})
    return wrapper


# == Extend AAV ==
import advancedav


class OutputFile(advancedav.OutputFile):
    """ AdvancedAV output file that can switch container/extension after creation """

    def change_format(self, format=None, ext=None):
        """
        Override the container and/or the file extension of this output.

        format: new container name; left untouched if falsy
        ext:    new file extension (without dot); left untouched if falsy
        """
        # Diverge from decorated format.
        # Watch out for args.genout!!
        if format:
            self.container = format
        if ext:
            self.name = splitext(self.name)[0] + "." + ext


class SimpleTask(advancedav.SimpleTask):
    """ AdvancedAV SimpleTask that creates the extended OutputFile above """
    output_factory = OutputFile


# == Profile definitions ==
@profile
@description("First Video H.264 Main fastdecode animation, max 1280x800; Audio AAC; Keep subtitles")
@output(container="matroska", ext="mkv")
def laptop(task):
    # add first video stream
    for s in task.iter_video_streams():
        (task.map_stream(s)
            .set(codec="libx264",
                 tune=("fastdecode", "animation"),
                 profile="main",
                 preset="fast")
            .downscale(1280, 800))
        break
    # Add all audio streams (reencode to aac if necessary)
    for s in task.iter_audio_streams():
        out = task.map_stream(s)
        if s.codec != "aac":
            out.set(codec="aac")
    # add all subtitle and attachment streams
    for s in chain(task.iter_subtitle_streams(), task.iter_attachment_streams()):
        task.map_stream(s)
    # go
    return True


@profile
@description("Save the first audio track as FLAC.")
@output(ext="flac")
@singleaudio
def flac(task, stream):
    if stream.codec == "ape":
        stream.file.set(max_samples="all")  # Monkey's insane preset is insane.
    (task.map_stream(stream)
        .set(codec="flac",
             compression_level="10"))
    return True


@profile
@description("Encode Opus Audio")
@output(ext="mka", container="matroska")
@defines(ogg="Use Ogg/Opus output container",
         bitrate="Target bitrate (Default 96k)")
@features(argshax=None)
@singleaudio
def opus(task, stream, defines, args):
    (task.map_stream(stream)
        .set(codec="libopus",
             vbr="on")
    # Defines
        .apply(defines, bitrate="b"))
    # Output format
    if "ogg" in defines:
        # Only touch the extension when xconv generated the output name itself
        task.change_format("ogg", "opus" if args.genout else None)
    return True


@profile
@description("Encode Opus Audiobook")
@output(container="ogg", ext="ogg")
@defines(stereo="Keep stereo channels")
@singleaudio
def audiobook(task, stream, defines):
    out = (task.map_stream(stream)
        .set(codec="libopus",
             vbr="on",
             b="32k",
             ac="1",
             application="voip"))
    if "stereo" in defines:
        out.set(ac="2",
                b="36k")
    return True


@profile
@description("Copy all streams to a new container")
@defines(format="Container format",
         fext="File extension")
@features(argshax=None)
def remux(task, defines, args):
    task.change_format(
        defines["format"] if "format" in defines else None,
        # Only touch the extension when xconv generated the output name itself
        defines["fext"] if "fext" in defines and args.genout else None)
    return all(task.map_stream(s) for s in task.iter_streams())


# == Support code ==
class ProfilesAction(Action):
    """ argparse action: list all registered profiles and exit """

    def __call__(self, parser, *a, **b):
        print("Available Profiles:")
        for name, profile in sorted(profiles.items()):
            print(" %-20s %s" % (name, profile.description or ""))
        parser.exit()


class ProfileInfoAction(Action):
    """
    argparse action: print details about the profile selected with -p and exit

    Options are processed in commandline order, so -p has to precede -i.
    """

    def __call__(self, parser, args, *a, **b):
        if not args.profile:
            print("-i must come after -p")
            parser.exit()
        # todo
        profile = profiles[args.profile]
        print("Profile '%s':" % args.profile)
        if profile.description:
            print(" Description: %s" % profile.description)
        output_info = []
        if profile.container:
            output_info.append("Format: %s" % profile.container)
        if profile.ext:
            output_info.append("File extension: %s" % profile.ext)
        if output_info:
            print(" Output: %s" % "; ".join(output_info))
        if profile.features:
            # Flags without a value are shown bare, others as name(value)
            print(" Flags: %s" % ", ".join("%s(%r)" % (k, v) if v is not None else k
                                            for k, v in profile.features.items()))
        if profile.defines:
            print(" Supported defines:")
            for define in sorted(profile.defines.items()):
                print(" %s: %s" % define)
        parser.exit()


class DefineAction(Action):
    """
    argparse action collecting NAME[=VALUE] options into a dict

    "NAME=VALUE" stores VALUE (a string) under NAME, a bare "NAME" stores True.
    """

    def __init__(self, option_strings, dest, nargs=None, default=None, **kwargs):
        # Always takes exactly one argument; the passed nargs is ignored
        super().__init__(option_strings, dest, nargs=1, default=default or {}, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        value = values[0]
        dest = getattr(namespace, self.dest)
        # Split on the first "=" only, so that values may contain "=" themselves
        key, sep, val = value.partition("=")
        dest[key] = val if sep else True


def parse_args(argv):
    """
    Parse the commandline.

    argv: full argument vector; argv[0] is used as the program name
    Returns the argparse namespace.
    """
    parser = ArgumentParser(prog=argv[0])
    parser.add_argument("-v", "--verbose", help="Enable verbose output", action="store_true")
    parser.add_argument("-q", "--quiet", help="Be less verbose", action="store_true")

    files = parser.add_argument_group("Files")
    files.add_argument("inputs", nargs="+", help="The input file(s)")
    files.add_argument("output", help="The output filename or directory")
    multimode = files.add_mutually_exclusive_group()
    multimode.add_argument("-m", "--merge", help="Merge streams from all inputs instead of mapping each input to an output", action="store_true")
    multimode.add_argument("-C", "--concat", help="Concatenate streams from inputs instead of mapping", action="store_true")
    files.add_argument("-u", "--update", help="Only work on files that don't already exist", action="store_true")
    files.add_argument("-c", "--create-parents", help="Create containing folders if they don't exist", action="store_true")
    files.add_argument("-cc", "--create-folder", help="Create the output folder if it doesn't exist", action="store_true")

    profile = parser.add_argument_group("Profile")
    profile.add_argument("-p", "--profile", choices=profiles.keys(), required=True, help="Specify the profile. See the source code.")
    profile.add_argument("-l", "--list-profiles", action=ProfilesAction, nargs=0, help="List profiles and exit")
    profile.add_argument("-i", "--profile-info", action=ProfileInfoAction, nargs=0, help="Give info about a profile")
    profile.add_argument("-D", "--define", help="Define an option to be used by the profile", action=DefineAction, metavar="NAME[=VALUE]")

    progs = parser.add_argument_group("Programs")
    progs.add_argument("--ffmpeg", default="ffmpeg", help="Path to the ffmpeg executable")
    progs.add_argument("--ffprobe", default="ffprobe", help="Path to the ffprobe executable")

    return parser.parse_args(argv[1:])


def make_outfile(args, profile, infile):
    """
    Determine the output path for *infile*.

    If args.genout is set (the output argument is a directory), the name is
    generated inside that directory from the input's basename and the
    profile's extension ("bin" if the profile has none). Otherwise
    args.output is used as-is.
    """
    if not args.genout:
        return args.output
    if hasattr(profile, "ext"):
        stem = splitext(basename(infile))[0]
        return build_path(args.output, ".".join((stem, profile.ext if profile.ext else "bin")))
    return build_path(args.output, basename(infile))


def _ffconcat_quote(path):
    """
    Quote *path* for a `file` directive in an ffconcat script.

    FFmpeg cannot escape a single quote inside a single-quoted string, so
    the quote is closed, a backslash-escaped quote emitted, and the quote
    reopened.
    """
    return "'%s'" % path.replace("'", "'\\''")


def main(argv):
    """
    Program entry point.

    argv: full argument vector including the program name
    Returns the process exit code: 0 on success, 1 if the profile could not
    be applied to a task, -1 if several inputs were given but the output is
    not a directory.
    """
    import logging
    import os
    import tempfile

    # Parse commandline
    args = parse_args(argv)

    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)

    profile = profiles[args.profile]

    # dirname() is "" for a bare relative name; the parent then is the
    # current directory and there is nothing to create.
    parent = dirname(args.output)
    if (args.create_parents or args.create_folder) and parent and not isdir(parent):
        makedirs(parent)
    if args.create_folder and not isdir(args.output):
        mkdir(args.output)

    # An existing output directory means output names are generated per input
    args.genout = isdir(args.output)

    print("\033[36mXConv %s (AdvancedAV %s) (c) Taeyeon Mori\033[0m" % (".".join(map(str, version_info)), ".".join(map(str, aav_version_info))))
    print("\033[34mProfile: %s\033[0m" % args.profile)

    # Initialize AAV
    aav = advancedav.SimpleAV(ffmpeg=args.ffmpeg, ffprobe=args.ffprobe)

    if args.quiet:
        aav.global_conv_args = "-loglevel", "warning"

    aav.global_args += "-hide_banner",

    # Path of the temporary ffconcat script (only set in --concat mode)
    concat_script = None

    try:
        # Collect Tasks
        tasks = []

        print("\033[35mCollecting Tasks..\033[0m")

        if args.merge:
            task = SimpleTask(aav, make_outfile(args, profile, args.inputs[0]), profile.container)

            for infile in args.inputs:
                task.add_input(infile)

            tasks.append(task)

        elif args.concat:
            # delete=False: the file must outlive the with block so ffmpeg can read it
            with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp:
                concat_script = tmp.name
                tmp.write("ffconcat version 1.0\n")
                tmp.write("# XConv concat file\n")
                for path in map(abspath, args.inputs):
                    print("\033[36m Concatenating %s\033[0m" % basename(path))
                    tmp.write("file %s\n" % _ffconcat_quote(path))

            task = SimpleTask(aav, make_outfile(args, profile, args.inputs[0]), profile.container)

            # safe=0 is needed because the script lists absolute paths
            task.add_input(concat_script).set(f="concat", safe="0")

            tasks.append(task)

        else:
            if not args.genout and len(args.inputs) > 1:
                print("\033[31mOutput path '%s' is not a directory.\033[0m" % args.output)
                return -1

            for infile in args.inputs:
                out = make_outfile(args, profile, infile)
                if args.update and exists(out):
                    continue
                task = SimpleTask(aav, out, profile.container)
                task.add_input(infile)
                tasks.append(task)

        print("\033[35mPreparing Tasks..\033[0m")

        # Prepare profile parameters
        pkw = {}
        if profile.defines:
            pkw["defines"] = args.define
        if profile.features:
            if "argshax" in profile.features:
                pkw["args"] = args

        # Apply profile
        for task in tasks:
            print("\033[32m Applying profile for '%s'\033[0m" % basename(task.name), end="\033[K\r")
            res = profile(task, **pkw)
            if not res:
                print("\033[31m Failed to apply profile for '%s'\033[0m\033[K" % basename(task.name))
                return 1

        print("\033[35mExecuting Tasks..\033[0m\033[K")

        # Commit
        for task in tasks:
            print("\033[32m Processing '%s'\033[0m" % basename(task.name))
            task.commit()

        return 0

    finally:
        # Clean up, also on failure and on exceptions
        if concat_script is not None:
            os.unlink(concat_script)


if __name__ == "__main__":
    import sys
    sys.exit(main(sys.argv))