feat(av1-transfer): add process name alias and per-task progress bars

- Add -n/--process-name flag to set ffmpeg argv[0] alias (exec -a equivalent)
- Show real-time per-file progress via ffmpeg -progress pipe:1
- Use slot queue to manage per-worker tqdm bars (position 1..N)
- Add -pix_fmt yuv420p10le for 10-bit output

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
FlintyLemming
2026-02-28 10:29:36 +08:00
parent 10388ebe6d
commit 5bf9f2f89e

View File

@@ -2,82 +2,173 @@
AV1 视频批量转码脚本 AV1 视频批量转码脚本
使用方法: 使用方法:
python main.py -i <输入目录> -o <输出目录> [-w <并行数>] python main.py -i <输入目录> -o <输出目录> [-w <并行数>] [-n <进程别名>]
参数说明: 参数说明:
-i, --input 输入目录路径(必需) -i, --input 输入目录路径(必需)
-o, --output 输出目录路径(必需) -o, --output 输出目录路径(必需)
-w, --workers 并行任务数,默认为 4 -w, --workers 并行任务数,默认为 4
-n, --process-name ffmpeg 进程别名(混淆进程名,等效于 exec -a <别名> ffmpeg
示例: 示例:
python main.py -i /path/to/input -o /path/to/output python main.py -i /path/to/input -o /path/to/output
python main.py -i ./videos -o ./converted -w 2 python main.py -i ./videos -o ./converted -w 2
python main.py -i ./videos -o ./converted -w 2 -n my_stream_1
""" """
import argparse import argparse
import queue
import shutil
import subprocess import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm from tqdm import tqdm
# ================= 配置区域 ================= # ================= 配置区域 =================
DEFAULT_WORKERS = 4 # 默认并行任务数 DEFAULT_WORKERS = 4
VIDEO_EXTS = {".mp4", ".mkv", ".mov", ".avi", ".flv", ".ts", ".webm"} VIDEO_EXTS = {".mp4", ".mkv", ".mov", ".avi", ".flv", ".ts", ".webm"}
# ============================================ # ============================================
def transcode_one_file(file_info): # 每个 worker 进度条的格式
SLOT_BAR_FORMAT = "{desc:<50} {percentage:3.0f}%|{bar}| {n:.0f}/{total:.0f}s"
def get_duration(input_path):
"""用 ffprobe 获取视频时长(秒),失败返回 None"""
result = subprocess.run(
[
"ffprobe", "-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
str(input_path),
],
capture_output=True,
text=True,
)
try:
return float(result.stdout.strip())
except (ValueError, AttributeError):
return None
def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue, process_name):
""" """
单个文件转码函数 单个文件转码,实时更新对应 slot 的进度条。
file_info: (输入完整路径, 输出完整路径) file_info: (输入完整路径, 输出完整路径)
""" """
input_path, output_path = file_info input_path, output_path = file_info
# 确保输出目录存在 # 从队列中拿到一个空闲 slot
slot = slot_queue.get()
bar = slot_bars[slot]
try:
output_path.parent.mkdir(parents=True, exist_ok=True) output_path.parent.mkdir(parents=True, exist_ok=True)
# ffmpeg 命令 duration = get_duration(input_path)
# -y: 覆盖已存在文件 total_secs = int(duration) if duration else 1
# -loglevel error: 减少日志输出,避免干扰进度条
cmd = [ bar.reset(total=total_secs)
"ffmpeg", "-y", "-i", str(input_path), short_name = input_path.name
if len(short_name) > 45:
short_name = short_name[:42] + "..."
bar.set_description(f" {short_name}")
# 构建 ffmpeg 命令
# -progress pipe:1 将结构化进度输出到 stdoutkey=value 格式)
ffmpeg_args = [
"-y",
"-progress", "pipe:1",
"-loglevel", "error",
"-i", str(input_path),
"-c:v", "libsvtav1", "-c:v", "libsvtav1",
"-crf", "30", "-crf", "30",
"-preset", "6", "-preset", "6",
"-pix_fmt", "yuv420p10le",
"-c:a", "copy", "-c:a", "copy",
str(output_path) str(output_path),
] ]
if process_name:
# 用 executable 指定真实路径argv[0] 设为别名
# 等效于 shell 的: exec -a <process_name> ffmpeg ...
ffmpeg_exe = shutil.which("ffmpeg")
if not ffmpeg_exe:
raise RuntimeError("找不到 ffmpeg 可执行文件")
cmd = [process_name] + ffmpeg_args
proc = subprocess.Popen(
cmd,
executable=ffmpeg_exe,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
else:
cmd = ["ffmpeg"] + ffmpeg_args
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# 实时解析 -progress 输出out_time_us 字段,单位微秒)
last_secs = 0.0
for raw_line in proc.stdout:
line = raw_line.decode(errors="replace").strip()
if line.startswith("out_time_us="):
try: try:
# 使用 subprocess 运行,捕获 stderr 避免刷屏 us = int(line.split("=", 1)[1])
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE) if duration and us > 0:
return True cur_secs = us / 1_000_000
except subprocess.CalledProcessError as e: delta = cur_secs - last_secs
print(f"\n[错误] 转码失败: {input_path}\n原因: {e.stderr.decode()}") if delta > 0:
# 避免浮点误差超出 total
delta = min(delta, bar.total - bar.n)
bar.update(delta)
last_secs = cur_secs
except (ValueError, IndexError):
pass
proc.wait()
if proc.returncode != 0:
err = proc.stderr.read().decode(errors="replace")
tqdm.write(f"\n[错误] 转码失败: {input_path}\n{err}")
return False return False
return True
except Exception as e:
tqdm.write(f"\n[异常] {input_path}: {e}")
return False
finally:
# 重置 slot 进度条并归还 slot
bar.set_description(" [空闲]")
bar.reset(total=1)
overall_bar.update(1)
slot_queue.put(slot)
def parse_args(): def parse_args():
"""解析命令行参数"""
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="AV1 视频批量转码脚本", description="AV1 视频批量转码脚本",
formatter_class=argparse.RawDescriptionHelpFormatter formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"-i", "--input",
type=str,
required=True,
help="输入目录路径"
)
parser.add_argument(
"-o", "--output",
type=str,
required=True,
help="输出目录路径"
) )
parser.add_argument("-i", "--input", type=str, required=True, help="输入目录路径")
parser.add_argument("-o", "--output", type=str, required=True, help="输出目录路径")
parser.add_argument( parser.add_argument(
"-w", "--workers", "-w", "--workers",
type=int, type=int,
default=DEFAULT_WORKERS, default=DEFAULT_WORKERS,
help=f"并行任务数(默认: {DEFAULT_WORKERS}" help=f"并行任务数(默认: {DEFAULT_WORKERS}",
)
parser.add_argument(
"-n", "--process-name",
type=str,
default=None,
dest="process_name",
help="ffmpeg 进程别名,用于混淆进程名(如 my_stream_1",
) )
return parser.parse_args() return parser.parse_args()
@@ -88,19 +179,19 @@ def main():
input_root = Path(args.input).resolve() input_root = Path(args.input).resolve()
output_root = Path(args.output).resolve() output_root = Path(args.output).resolve()
max_workers = args.workers max_workers = args.workers
process_name = args.process_name
if not input_root.exists(): if not input_root.exists():
print(f"错误: 输入目录 {input_root} 不存在") print(f"错误: 输入目录 {input_root} 不存在")
return return
# 1. 扫描所有待处理文件 # 扫描所有待处理文件
tasks = [] tasks = []
print(f"正在扫描目录: {input_root} ...") print(f"正在扫描目录: {input_root} ...")
for file in input_root.rglob("*"): for file in input_root.rglob("*"):
if file.name.startswith("._"): if file.name.startswith("._"):
continue continue
if file.suffix.lower() in VIDEO_EXTS: if file.suffix.lower() in VIDEO_EXTS:
# 计算相对路径并更换后缀为 .mp4
rel_path = file.relative_to(input_root) rel_path = file.relative_to(input_root)
out_file = output_root / rel_path.with_suffix(".mp4") out_file = output_root / rel_path.with_suffix(".mp4")
tasks.append((file, out_file)) tasks.append((file, out_file))
@@ -110,14 +201,54 @@ def main():
return return
print(f"共发现 {len(tasks)} 个文件,准备使用 {max_workers} 个并行任务进行转码...") print(f"共发现 {len(tasks)} 个文件,准备使用 {max_workers} 个并行任务进行转码...")
if process_name:
print(f"进程别名: {process_name}")
# 2. 使用线程池并行执行 # 初始化进度条
# 注意ffmpeg 内部是多线程的max_workers 不宜设置得过大 # position=0: 总进度position=1..N: 各 worker 当前文件进度
overall_bar = tqdm(
total=len(tasks),
desc="总进度",
position=0,
leave=True,
unit="文件",
)
slot_queue: queue.Queue[int] = queue.Queue()
slot_bars: list[tqdm] = []
for i in range(max_workers):
bar = tqdm(
total=1,
desc=" [空闲]",
position=i + 1,
leave=True,
unit="s",
bar_format=SLOT_BAR_FORMAT,
)
slot_bars.append(bar)
slot_queue.put(i)
# 并行执行转码
with ThreadPoolExecutor(max_workers=max_workers) as executor: with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 使用 tqdm 显示总进度 futures = [
list(tqdm(executor.map(transcode_one_file, tasks), total=len(tasks), desc="转码进度")) executor.submit(
transcode_one_file,
task, overall_bar, slot_bars, slot_queue, process_name,
)
for task in tasks
]
for f in as_completed(futures):
try:
f.result()
except Exception as e:
tqdm.write(f"[未捕获异常] {e}")
for bar in slot_bars:
bar.close()
overall_bar.close()
print("\n任务全部完成!") print("\n任务全部完成!")
if __name__ == "__main__": if __name__ == "__main__":
main() main()