diff --git a/av1-transfer/main.py b/av1-transfer/main.py
index 9b10c81..dcac2dc 100644
--- a/av1-transfer/main.py
+++ b/av1-transfer/main.py
@@ -2,82 +2,210 @@
-AV1 视频批量转码脚本
+Batch-transcode videos to AV1 with ffmpeg (libsvtav1).
 
-使用方法:
-    python main.py -i <输入目录> -o <输出目录> [-w <并行数>]
+Usage:
+    python main.py -i <input dir> -o <output dir> [-w <workers>] [-n <alias>]
 
-参数说明:
-    -i, --input     输入目录路径(必需)
-    -o, --output    输出目录路径(必需)
-    -w, --workers   并行任务数,默认为 4
+Arguments:
+    -i, --input          input directory (required)
+    -o, --output         output directory (required)
+    -w, --workers        number of parallel jobs, default 4
+    -n, --process-name   alias for the ffmpeg process (disguises the process
+                         name, equivalent to `exec -a <alias> ffmpeg`)
 
-示例:
+Examples:
     python main.py -i /path/to/input -o /path/to/output
     python main.py -i ./videos -o ./converted -w 2
+    python main.py -i ./videos -o ./converted -w 2 -n my_stream_1
 """
 
 import argparse
+import queue
+import shutil
 import subprocess
+import tempfile
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from pathlib import Path
-from concurrent.futures import ThreadPoolExecutor
+
 from tqdm import tqdm
 
-# ================= 配置区域 =================
-DEFAULT_WORKERS = 4  # 默认并行任务数
+# ================= Configuration =================
+DEFAULT_WORKERS = 4
 VIDEO_EXTS = {".mp4", ".mkv", ".mov", ".avi", ".flv", ".ts", ".webm"}
 # ============================================
 
-def transcode_one_file(file_info):
+# Layout of each worker's per-file progress bar
+SLOT_BAR_FORMAT = "{desc:<50} {percentage:3.0f}%|{bar}| {n:.0f}/{total:.0f}s"
+
+
+def get_duration(input_path):
+    """Return the media duration in seconds as reported by ffprobe.
+
+    Args:
+        input_path: Path of the media file to probe.
+
+    Returns:
+        The duration as a positive float, or None when ffprobe cannot be
+        started or its output is not a positive number.
+    """
+    try:
+        result = subprocess.run(
+            [
+                "ffprobe", "-v", "error",
+                "-show_entries", "format=duration",
+                "-of", "default=noprint_wrappers=1:nokey=1",
+                str(input_path),
+            ],
+            capture_output=True,
+            text=True,
+        )
+        duration = float(result.stdout.strip())
+    except (OSError, ValueError):
+        # OSError: ffprobe is missing or cannot be executed.
+        # ValueError: empty or non-numeric output.
+        return None
+    return duration if duration > 0 else None
+
+
+def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue, process_name):
     """
-    单个文件转码函数
-    file_info: (输入完整路径, 输出完整路径)
+    Transcode one file to AV1, streaming progress into a free slot bar.
+
+    Args:
+        file_info: (input path, output path) pair of pathlib.Path objects.
+        overall_bar: tqdm bar counting processed files; advanced by one
+            when this call ends, whether it succeeded or not.
+        slot_bars: per-worker tqdm bars, indexed by slot number.
+        slot_queue: queue of free slot numbers; one is taken for the
+            duration of the call and put back at the end.
+        process_name: optional argv[0] alias for the ffmpeg process; when
+            falsy, ffmpeg runs under its own name.
+
+    Returns:
+        True when ffmpeg exits with status 0; False on a non-zero exit or
+        on any exception (both are reported through tqdm.write).
     """
     input_path, output_path = file_info
 
-    # 确保输出目录存在
-    output_path.parent.mkdir(parents=True, exist_ok=True)
-
-    # ffmpeg 命令
-    # -y: 覆盖已存在文件
-    # -loglevel error: 减少日志输出,避免干扰进度条
-    cmd = [
-        "ffmpeg", "-y", "-i", str(input_path),
-        "-c:v", "libsvtav1",
-        "-crf", "30",
-        "-preset", "6",
-        "-c:a", "copy",
-        str(output_path)
-    ]
+    # Claim a free slot; blocks until one is available
+    slot = slot_queue.get()
+    bar = slot_bars[slot]
 
     try:
-        # 使用 subprocess 运行,捕获 stderr 避免刷屏
-        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
+        output_path.parent.mkdir(parents=True, exist_ok=True)
+
+        duration = get_duration(input_path)
+        # A sub-second clip truncates to 0; keep the bar total at least 1
+        total_secs = max(1, int(duration)) if duration else 1
+
+        bar.reset(total=total_secs)
+        short_name = input_path.name
+        if len(short_name) > 45:
+            short_name = short_name[:42] + "..."
+        bar.set_description(f" {short_name}")
+
+        # -progress pipe:1 makes ffmpeg write key=value progress to stdout
+        ffmpeg_args = [
+            "-y",
+            "-progress", "pipe:1",
+            "-loglevel", "error",
+            "-i", str(input_path),
+            "-c:v", "libsvtav1",
+            "-crf", "30",
+            "-preset", "6",
+            "-pix_fmt", "yuv420p10le",
+            "-c:a", "copy",
+            str(output_path),
+        ]
+
+        # With an alias, argv[0] carries the alias and `executable` names the
+        # real binary, like the shell's `exec -a <alias> ffmpeg ...`. With
+        # executable=None, Popen runs argv[0] itself.
+        ffmpeg_exe = None
+        argv0 = "ffmpeg"
+        if process_name:
+            ffmpeg_exe = shutil.which("ffmpeg")
+            if not ffmpeg_exe:
+                raise RuntimeError("找不到 ffmpeg 可执行文件")
+            argv0 = process_name
+
+        # stderr goes to a temp file rather than a pipe: nothing reads it while
+        # stdout is being parsed, so a full stderr pipe would stall ffmpeg.
+        # stdin is detached so ffmpeg cannot read from the terminal.
+        with tempfile.TemporaryFile() as err_file:
+            proc = subprocess.Popen(
+                [argv0] + ffmpeg_args,
+                executable=ffmpeg_exe,
+                stdin=subprocess.DEVNULL,
+                stdout=subprocess.PIPE,
+                stderr=err_file,
+            )
+            try:
+                # out_time_us is the output position in microseconds
+                last_secs = 0.0
+                for raw_line in proc.stdout:
+                    line = raw_line.decode(errors="replace").strip()
+                    if not line.startswith("out_time_us="):
+                        continue
+                    try:
+                        us = int(line.split("=", 1)[1])
+                    except ValueError:
+                        continue
+                    if not duration or us <= 0:
+                        continue
+                    cur_secs = us / 1_000_000
+                    delta = cur_secs - last_secs
+                    if delta > 0:
+                        # Clamp so the bar never runs past its total
+                        bar.update(min(delta, bar.total - bar.n))
+                        last_secs = cur_secs
+                proc.wait()
+            except BaseException:
+                # Never leave ffmpeg running when progress handling fails
+                proc.kill()
+                proc.wait()
+                raise
+            finally:
+                proc.stdout.close()
+
+            if proc.returncode != 0:
+                err_file.seek(0)
+                err = err_file.read().decode(errors="replace")
+                tqdm.write(f"\n[错误] 转码失败: {input_path}\n{err}")
+                return False
+
         return True
-    except subprocess.CalledProcessError as e:
-        print(f"\n[错误] 转码失败: {input_path}\n原因: {e.stderr.decode()}")
+
+    except Exception as e:
+        tqdm.write(f"\n[异常] {input_path}: {e}")
         return False
 
+    finally:
+        # Mark the slot idle, count the file as processed and free the slot
+        bar.set_description(" [空闲]")
+        bar.reset(total=1)
+        overall_bar.update(1)
+        slot_queue.put(slot)
+
+
 def parse_args():
-    """解析命令行参数"""
+    """Parse the command-line arguments and return the argparse namespace."""
     parser = argparse.ArgumentParser(
         description="AV1 视频批量转码脚本",
-        formatter_class=argparse.RawDescriptionHelpFormatter
-    )
-    parser.add_argument(
-        "-i", "--input",
-        type=str,
-        required=True,
-        help="输入目录路径"
-    )
-    parser.add_argument(
-        "-o", "--output",
-        type=str,
-        required=True,
-        help="输出目录路径"
+        formatter_class=argparse.RawDescriptionHelpFormatter,
     )
+    parser.add_argument("-i", "--input", type=str, required=True, help="输入目录路径")
+    parser.add_argument("-o", "--output", type=str, required=True, help="输出目录路径")
     parser.add_argument(
         "-w", "--workers",
         type=int,
         default=DEFAULT_WORKERS,
-        help=f"并行任务数(默认: {DEFAULT_WORKERS})"
+        help=f"并行任务数(默认: {DEFAULT_WORKERS})",
+    )
+    parser.add_argument(
+        "-n", "--process-name",
+        type=str,
+        default=None,
+        dest="process_name",
+        help="ffmpeg 进程别名,用于混淆进程名(如 my_stream_1)",
     )
     return parser.parse_args()
 
@@ -88,19 +216,19 @@ def main():
     input_root = Path(args.input).resolve()
     output_root = Path(args.output).resolve()
     max_workers = args.workers
+    process_name = args.process_name
 
     if not input_root.exists():
         print(f"错误: 输入目录 {input_root} 不存在")
         return
 
-    # 1. 扫描所有待处理文件
+    # Scan for every file to process
     tasks = []
     print(f"正在扫描目录: {input_root} ...")
     for file in input_root.rglob("*"):
         if file.name.startswith("._"):
             continue
         if file.suffix.lower() in VIDEO_EXTS:
-            # 计算相对路径并更换后缀为 .mp4
             rel_path = file.relative_to(input_root)
             out_file = output_root / rel_path.with_suffix(".mp4")
             tasks.append((file, out_file))
@@ -110,14 +238,54 @@ def main():
         return
 
     print(f"共发现 {len(tasks)} 个文件,准备使用 {max_workers} 个并行任务进行转码...")
+    if process_name:
+        print(f"进程别名: {process_name}")
 
-    # 2. 使用线程池并行执行
-    # 注意:ffmpeg 内部是多线程的,max_workers 不宜设置得过大
+    # Set up the progress bars
+    # position=0: overall progress; position=1..N: current file of each worker
+    overall_bar = tqdm(
+        total=len(tasks),
+        desc="总进度",
+        position=0,
+        leave=True,
+        unit="文件",
+    )
+
+    slot_queue: queue.Queue[int] = queue.Queue()
+    slot_bars: list[tqdm] = []
+    for i in range(max_workers):
+        bar = tqdm(
+            total=1,
+            desc=" [空闲]",
+            position=i + 1,
+            leave=True,
+            unit="s",
+            bar_format=SLOT_BAR_FORMAT,
+        )
+        slot_bars.append(bar)
+        slot_queue.put(i)
+
+    # Run the transcodes in parallel
     with ThreadPoolExecutor(max_workers=max_workers) as executor:
-        # 使用 tqdm 显示总进度
-        list(tqdm(executor.map(transcode_one_file, tasks), total=len(tasks), desc="转码进度"))
+        futures = [
+            executor.submit(
+                transcode_one_file,
+                task, overall_bar, slot_bars, slot_queue, process_name,
+            )
+            for task in tasks
+        ]
+        for f in as_completed(futures):
+            try:
+                f.result()
+            except Exception as e:
+                tqdm.write(f"[未捕获异常] {e}")
+
+    for bar in slot_bars:
+        bar.close()
+    overall_bar.close()
 
     print("\n任务全部完成!")
+
 
 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()