xconv: make the executable use the package

master
Taeyeon Mori 7 years ago
parent a1fe0e7401
commit dc883a9415
  1. 399
      bin/xconv

@ -1,13 +1,13 @@
#!/usr/bin/env python3
"""
xconv ffpmeg wrapper based on AdvancedAV
xconv ffmpeg wrapper based on AdvancedAV
-----------------------------------------------------------
AdvancedAV helps with constructing FFmpeg commandline arguments.
It can automatically parse input files with the help of FFmpeg's ffprobe tool (WiP)
and allows programatically mapping streams to output files and setting metadata on them.
-----------------------------------------------------------
Copyright (c) 2015-2016 Taeyeon Mori
Copyright (c) 2015-2017 Taeyeon Mori
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -23,397 +23,8 @@ xconv ffpmeg wrapper based on AdvancedAV
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from advancedav import version_info as aav_version_info
from xconv.app import main
from argparse import ArgumentParser, Action
from abc import ABCMeta, abstractmethod
from itertools import chain
from functools import wraps
from os.path import isdir, join as build_path, basename, dirname, splitext, exists, abspath
from os import environ, makedirs, mkdir
from sys import argv, exit
version_info = 0, 1, 3
# == Misc. helpers ==
def __update(f, name, update):
if hasattr(f, name):
getattr(f, name).update(update);
else:
setattr(f, name, update)
def __defaults(obj, **defs):
for k, v in defs.items():
if not hasattr(obj, k):
setattr(obj, k, v)
# == Profile Decorators ==
profiles = {}
def profile(f):
"""
Define a XConv Profile
Note: Should be outermost decorator
"""
__defaults(f,
description=None,
container=None,
ext=None,
defines={},
features={})
profiles[f.__name__] = f
return f
def description(desc):
""" Add a profile description """
def apply(f):
f.description = desc
return f
return apply
def output(container=None, ext=None):
""" Add output file information """
def apply(f):
if container:
f.container = container
f.ext = "mkv" if container == "matroska" else container
if ext:
f.ext = ext
return f
return apply
def defines(**defs):
""" Document supported defines with description """
def apply(f):
__update(f, "defines", defs)
return f
return apply
def features(**features):
""" Set opaque feature flags """
def apply(f):
__update(f, "features", features)
return f
return apply
def singleaudio(profile):
"""
Operate on a single audio stream (The first one found)
The stream will be passed to the decorated function in the "stream" keyword
"""
@wraps(profile)
def wrapper(task, **kwds):
try:
audio_stream = next(task.iter_audio_streams())
except StopIteration:
print("No audio track in '%s'" % "', '".join(map(lambda x: x.name, task.inputs)))
return False
return profile(task, stream=audio_stream, **kwds)
__update(wrapper, "features", {"singleaudio": None})
return wrapper
# == Extend AAV ==
import advancedav
class OutputFile(advancedav.OutputFile):
def change_format(self, format=None, ext=None):
# Diverge from decorated format.
# Watch out for args.genout!!
if format:
self.container = format
if ext:
self.name = splitext(self.name)[0] + "." + ext
class SimpleTask(advancedav.SimpleTask):
output_factory = OutputFile
# == Profile definitions ==
@profile
@description("First Video H.264 Main fastdecode animation, max 1280x800; Audio AAC; Keep subtitles")
@output(container="matroska", ext="mkv")
def laptop(task):
# add first video stream
for s in task.iter_video_streams():
(task.map_stream(s)
.set(codec="libx264",
tune=("fastdecode", "animation"),
profile="main",
preset="fast")
.downscale(1280, 800))
break
# Add all audio streams (reencode to aac if necessary)
for s in task.iter_audio_streams():
os = task.map_stream(s)
if s.codec != "aac":
os.set(codec="aac")
# add all subtitle and attachment streams
for s in chain(task.iter_subtitle_streams(), task.iter_attachment_streams()):
task.map_stream(s)
# go
return True
@profile
@description("Save the first audio track as FLAC.")
@output(ext="flac")
@singleaudio
def flac(task, stream):
if stream.codec == "ape":
stream.file.set(max_samples="all") # Monkey's insane preset is insane.
(task.map_stream(stream)
.set(codec="flac",
compression_level="10"))
return True
@profile
@description("Encode Opus Audio")
@output(ext="mka", container="matroska")
@defines(ogg="Use Ogg/Opus output container",
bitrate="Target bitrate (Default 96k)")
@features(argshax=None)
@singleaudio
def opus(task, stream, defines, args):
os = (task.map_stream(stream)
.set(codec="libopus",
vbr="on")
# Defines
.apply(defines, bitrate="b"))
# Output format
if "ogg" in defines:
task.change_format("ogg", "opus" if args.genout else None)
return True
@profile
@description("Encode Opus Audiobook")
@output(container="ogg", ext="ogg")
@defines(stereo="Keep stereo channels")
@singleaudio
def audiobook(task, stream, defines):
out = (task.map_stream(stream)
.set(codec="libopus",
vbr="on",
b="32k",
ac="1",
application="voip"))
if "stereo" in defines:
out.set(ac="2",
b="36k")
return True
@profile
@description("Copy all streams to a new container")
@defines(format="Container format",
fext="File extension")
@features(argshax=None)
def remux(task, defines, args):
task.change_format(
defines["format"] if "format" in defines else None,
defines["fext"] if "fext" in defines and args.genout else None)
return all(task.map_stream(s) for s in task.iter_streams())
# == Support code ==
class ProfilesAction(Action):
def __call__(self, parser, *a, **b):
print("Available Profiles:")
for name, profile in sorted(profiles.items()):
print(" %-20s %s" % (name, profile.description if profile.description else ""))
parser.exit()
class ProfileInfoAction(Action):
def __call__(self, parser, args, *a, **b):
if not args.profile:
print("-i must come after -p")
parser.exit() # todo
profile = profiles[args.profile]
print("Profile '%s':" % args.profile)
if profile.description:
print(" Description: %s" % profile.description)
output_info = []
if profile.container:
output_info.append("Format: %s" % profile.container)
if profile.ext:
output_info.append("File extension: %s" % profile.ext)
if output_info:
print(" Output: %s" % "; ".join(output_info))
if profile.features:
print(" Flags: %s" % ", ".join("%s(%r)" % (k, v) if v is not None else k for k, v in profile.features.items()))
if profile.defines:
print(" Supported defines:")
for define in sorted(profile.defines.items()):
print(" %s: %s" % define)
parser.exit()
class DefineAction(Action):
def __init__(self, option_strings, dest, nargs=None, default=None, **kwargs):
super().__init__(option_strings, dest, nargs=1, default=default or {}, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
value = values[0]
dest = getattr(namespace, self.dest)
if "=" in value:
k, v = value.split("=")
dest[k] = v
else:
dest[value] = True
def parse_args(argv):
parser = ArgumentParser(prog=argv[0])
parser.add_argument("-v", "--verbose", help="Enable verbose output", action="store_true")
parser.add_argument("-q", "--quiet", help="Be less verbose", action="store_true")
files = parser.add_argument_group("Files")
files.add_argument("inputs", nargs="+", help="The input file(s)")
files.add_argument("output", help="The output filename or directory")
multimode = files.add_mutually_exclusive_group()
multimode.add_argument("-m", "--merge", help="Merge streams from all inputs instead of mapping each input to an output", action="store_true")
multimode.add_argument("-C", "--concat", help="Concatenate streams from inputs instead of mapping", action="store_true")
files.add_argument("-u", "--update", help="Only work on files that don't already exist", action="store_true")
files.add_argument("-c", "--create-parents", help="Create containing folders if they don't exist", action="store_true")
files.add_argument("-cc", "--create-folder", help="Create the output folder if it doesn't exist", action="store_true")
profile = parser.add_argument_group("Profile")
profile.add_argument("-p", "--profile", choices=profiles.keys(), required=True, help="Specify the profile. See the source code.")
profile.add_argument("-l", "--list-profiles", action=ProfilesAction, nargs=0, help="List profiles and exit")
profile.add_argument("-i", "--profile-info", action=ProfileInfoAction, nargs=0, help="Give info about a profile")
profile.add_argument("-D", "--define", help="Define an option to be used by the profile", action=DefineAction, metavar="NAME[=VALUE]")
progs = parser.add_argument_group("Programs")
progs.add_argument("--ffmpeg", default="ffmpeg", help="Path to the ffmpeg executable")
progs.add_argument("--ffprobe", default="ffprobe", help="Path to the ffprobe executable")
return parser.parse_args(argv[1:])
def make_outfile(args, profile, infile):
if args.genout:
if hasattr(profile, "ext"):
return build_path(args.output, ".".join((splitext(basename(infile))[0], profile.ext if profile.ext else "bin")))
else:
return build_path(args.output, basename(infile))
else:
return args.output
def main(argv):
import logging
# Parse commandline
args = parse_args(argv)
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
profile = profiles[args.profile]
if (args.create_parents or args.create_folder) and not isdir(dirname(args.output)):
makedirs(dirname(args.output))
if args.create_folder and not isdir(args.output):
mkdir(args.output)
args.genout = isdir(args.output)
print("\033[36mXConv %s (AdvancedAV %s) (c) Taeyeon Mori\033[0m" % (".".join(map(str, version_info)), ".".join(map(str, aav_version_info))))
print("\033[34mProfile: %s\033[0m" % args.profile)
# Initialize AAV
aav = advancedav.SimpleAV(ffmpeg=args.ffmpeg, ffprobe=args.ffprobe)
if args.quiet:
aav.global_conv_args = "-loglevel", "warning"
aav.global_args += "-hide_banner",
# Collect Tasks
tasks = []
print("\033[35mCollecting Tasks..\033[0m")
if args.merge:
task = SimpleTask(aav, make_outfile(args, profile, args.inputs[0]), profile.container)
for input in args.inputs:
task.add_input(input)
tasks.append(task)
elif args.concat:
import tempfile, os
tmp = tempfile.NamedTemporaryFile(mode="w", delete=False)
with tmp:
tmp.write("ffconcat version 1.0\n")
tmp.write("# XConv concat file\n")
for f in map(abspath, args.inputs):
print("\033[36m Concatenating %s\033[0m" % basename(f))
tmp.write("file '%s'\n" % f)
task = SimpleTask(aav, make_outfile(args, profile, args.inputs[0]), profile.container)
options(task.add_input(tmp.name),
f="concat",
safe="0")
tasks.append(task)
else:
if not args.genout and len(args.inputs) > 1:
print("\033[31mOutput path '%s' is not a directory.\033[0m" % args.output)
return -1
for input in args.inputs:
out = make_outfile(args, profile, input)
if args.update and exists(out):
continue
task = SimpleTask(aav, out, profile.container)
task.add_input(input)
tasks.append(task)
print("\033[35mPreparing Tasks..\033[0m")
# Prepare profile parameters
pkw = {}
if profile.defines:
pkw["defines"] = args.define
if profile.features:
if "argshax" in profile.features:
pkw["args"] = args
# Apply profile
for task in tasks:
print("\033[32m Applying profile for '%s'\033[0m" % basename(task.name), end="\033[K\r")
res = profile(task, **pkw)
if not res:
print("\033[31m Failed to apply profile for '%s'\033[0m\033[K" % basename(task.name))
return 1
print("\033[35mExecuting Tasks..\033[0m\033[K")
# Commit
for task in tasks:
print("\033[32m Processing '%s'\033[0m" % basename(task.name))
task.commit()
# Clean up
if args.concat:
os.unlink(tmp.name)
return 0
if __name__ == "__main__":
import sys
sys.exit(main(sys.argv))
exit(main(argv))

Loading…
Cancel
Save