""" AV1 视频批量转码脚本 使用方法: python main.py -i <输入目录> -o <输出目录> [-w <并行数>] [-n <进程别名>] 参数说明: -i, --input 输入目录路径(必需) -o, --output 输出目录路径(必需) -w, --workers 并行任务数,默认为 4 -n, --process-name 进程别名(同时重命名 Python 主进程和 ffmpeg 子进程) 示例: python main.py -i /path/to/input -o /path/to/output python main.py -i ./videos -o ./converted -w 2 python main.py -i ./videos -o ./converted -w 2 -n my_stream_1 """ import argparse import ctypes import ctypes.util import queue import shlex import subprocess import sys from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from tqdm import tqdm # ================= 配置区域 ================= DEFAULT_WORKERS = 4 VIDEO_EXTS = {".mp4", ".mkv", ".mov", ".avi", ".flv", ".ts", ".webm"} # ============================================ # 每个 worker 进度条的格式 SLOT_BAR_FORMAT = "{desc:<50} {percentage:3.0f}%|{bar}| {n:.0f}/{total:.0f}s" def _set_proc_title(title: str) -> None: """ 通过两种方式彻底隐藏进程命令行: 1. prctl(PR_SET_NAME) — 改 /proc/self/comm(top 默认显示的短名) 2. 直接覆盖 argv 内存 — 改 /proc/self/cmdline(ps aux COMMAND 列) """ # 1. 改短名(/proc/self/comm,最多 15 字节) try: libc_name = ctypes.util.find_library("c") if libc_name: libc = ctypes.CDLL(libc_name, use_errno=True) libc.prctl(15, title.encode()[:15], 0, 0, 0) # PR_SET_NAME = 15 except Exception: pass # 2. 覆盖 argv 内存,清除 /proc/self/cmdline 里的参数 try: argc = ctypes.c_int(0) argv = ctypes.POINTER(ctypes.c_char_p)() ctypes.pythonapi.Py_GetArgcArgv(ctypes.byref(argc), ctypes.byref(argv)) if argc.value == 0: return # argv 字符串在内存中连续排列,计算总占用字节数 start = ctypes.cast(argv[0], ctypes.c_void_p).value end = ctypes.cast(argv[argc.value - 1], ctypes.c_void_p).value end += len(argv[argc.value - 1]) + 1 size = end - start # 用 title 填充开头,其余全部清零 enc = title.encode()[:size - 1] buf = (ctypes.c_char * size).from_address(start) buf[:len(enc)] = enc buf[len(enc):] = b"\x00" * (size - len(enc)) except Exception: pass def get_duration(input_path): """用 ffprobe 获取视频时长(秒),失败返回 None""" try: result = subprocess.run( [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", str(input_path), ], capture_output=True, text=True, ) return float(result.stdout.strip()) except Exception: return None def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue, process_name): """ 单个文件转码,实时更新对应 slot 的进度条。 file_info: (输入完整路径, 输出完整路径) """ input_path, output_path = file_info # 从队列中拿到一个空闲 slot slot = slot_queue.get() bar = slot_bars[slot] try: output_path.parent.mkdir(parents=True, exist_ok=True) duration = get_duration(input_path) total_secs = int(duration) if duration else 1 bar.reset(total=total_secs) short_name = input_path.name if len(short_name) > 45: short_name = short_name[:42] + "..." bar.set_description(f" {short_name}") # 构建 ffmpeg 命令 # -progress pipe:1 将结构化进度输出到 stdout(key=value 格式) ffmpeg_args = [ "-y", "-progress", "pipe:1", "-loglevel", "error", "-i", str(input_path), "-c:v", "libsvtav1", "-crf", "30", "-preset", "6", "-pix_fmt", "yuv420p10le", "-c:a", "copy", str(output_path), ] if process_name: # 用 bash 的 exec -a 重命名 ffmpeg 进程的 argv[0] # ffmpeg 通过 PATH 查找,避免 execve 直接调用 symlink 的兼容问题 shell_args = " ".join(shlex.quote(str(a)) for a in ffmpeg_args) proc = subprocess.Popen( f"exec -a {shlex.quote(process_name)} ffmpeg {shell_args}", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) else: proc = subprocess.Popen( ["ffmpeg"] + ffmpeg_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) # 实时解析 -progress 输出(out_time_us 字段,单位微秒) last_secs = 0.0 for raw_line in proc.stdout: line = raw_line.decode(errors="replace").strip() if line.startswith("out_time_us="): try: us = int(line.split("=", 1)[1]) if duration and us > 0: cur_secs = us / 1_000_000 delta = cur_secs - last_secs if delta > 0: # 避免浮点误差超出 total delta = min(delta, bar.total - bar.n) bar.update(delta) last_secs = cur_secs except (ValueError, IndexError): pass proc.wait() if proc.returncode != 0: err = proc.stderr.read().decode(errors="replace") tqdm.write(f"\n[错误] 转码失败: {input_path}\n{err}") return False return True except Exception as e: tqdm.write(f"\n[异常] {input_path}: {e}") return False finally: # 重置 slot 进度条并归还 slot bar.set_description(" [空闲]") bar.reset(total=1) overall_bar.update(1) slot_queue.put(slot) def parse_args(): parser = argparse.ArgumentParser( description="AV1 视频批量转码脚本", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("-i", "--input", type=str, required=True, help="输入目录路径") parser.add_argument("-o", "--output", type=str, required=True, help="输出目录路径") parser.add_argument( "-w", "--workers", type=int, default=DEFAULT_WORKERS, help=f"并行任务数(默认: {DEFAULT_WORKERS})", ) parser.add_argument( "-n", "--process-name", type=str, default=None, dest="process_name", help="ffmpeg 进程别名,用于混淆进程名(如 my_stream_1)", ) return parser.parse_args() def main(): args = parse_args() input_root = Path(args.input).resolve() output_root = Path(args.output).resolve() max_workers = args.workers process_name = args.process_name if not input_root.exists(): print(f"错误: 输入目录 {input_root} 不存在") return # 扫描所有待处理文件 tasks = [] print(f"正在扫描目录: {input_root} ...") for file in input_root.rglob("*"): if file.name.startswith("._"): continue if file.suffix.lower() in VIDEO_EXTS: rel_path = file.relative_to(input_root) out_file = output_root / rel_path.with_suffix(".mp4") tasks.append((file, out_file)) if not tasks: print("未发现匹配的视频文件。") return print(f"共发现 {len(tasks)} 个文件,准备使用 {max_workers} 个并行任务进行转码...") if process_name: _set_proc_title(process_name) print(f"进程别名: {process_name}") # 初始化进度条 # position=0: 总进度;position=1..N: 各 worker 当前文件进度 overall_bar = tqdm( total=len(tasks), desc="总进度", position=0, leave=True, unit="文件", ) slot_queue: queue.Queue[int] = queue.Queue() slot_bars: list[tqdm] = [] for i in range(max_workers): bar = tqdm( total=1, desc=" [空闲]", position=i + 1, leave=True, unit="s", bar_format=SLOT_BAR_FORMAT, ) slot_bars.append(bar) slot_queue.put(i) # 并行执行转码 with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [ executor.submit( transcode_one_file, task, overall_bar, slot_bars, slot_queue, process_name, ) for task in tasks ] for f in as_completed(futures): try: f.result() except Exception as e: tqdm.write(f"[未捕获异常] {e}") for bar in slot_bars: bar.close() overall_bar.close() print("\n任务全部完成!") if __name__ == "__main__": main()