diff --git a/av1-transfer/main.py b/av1-transfer/main.py index 30d4322..04b9b54 100644 --- a/av1-transfer/main.py +++ b/av1-transfer/main.py @@ -2,28 +2,21 @@ AV1 视频批量转码脚本 使用方法: - python main.py -i <输入目录> -o <输出目录> [-w <并行数>] [-n <进程别名>] + python main.py -i <输入目录> -o <输出目录> [-w <并行数>] 参数说明: - -i, --input 输入目录路径(必需) - -o, --output 输出目录路径(必需) - -w, --workers 并行任务数,默认为 4 - -n, --process-name 进程别名(同时重命名 Python 主进程和 ffmpeg 子进程) + -i, --input 输入目录路径(必需) + -o, --output 输出目录路径(必需) + -w, --workers 并行任务数,默认为 4 示例: python main.py -i /path/to/input -o /path/to/output python main.py -i ./videos -o ./converted -w 2 - python main.py -i ./videos -o ./converted -w 2 -n my_stream_1 """ import argparse -import ctypes -import ctypes.util -import os import queue -import shutil import subprocess -import sys from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path @@ -38,42 +31,6 @@ VIDEO_EXTS = {".mp4", ".mkv", ".mov", ".avi", ".flv", ".ts", ".webm"} SLOT_BAR_FORMAT = "{desc:<50} {percentage:3.0f}%|{bar}| {n:.0f}/{total:.0f}s" -def _set_proc_title(title: str) -> None: - """ - 通过两种方式彻底隐藏进程命令行: - 1. prctl(PR_SET_NAME) — 改 /proc/self/comm(top 默认显示的短名) - 2. 直接覆盖 argv 内存 — 改 /proc/self/cmdline(ps aux COMMAND 列) - """ - # 1. 改短名(/proc/self/comm,最多 15 字节) - try: - libc_name = ctypes.util.find_library("c") - if libc_name: - libc = ctypes.CDLL(libc_name, use_errno=True) - libc.prctl(15, title.encode()[:15], 0, 0, 0) # PR_SET_NAME = 15 - except Exception: - pass - - # 2. 覆盖 argv 内存,清除 /proc/self/cmdline 里的参数 - try: - argc = ctypes.c_int(0) - argv = ctypes.POINTER(ctypes.c_char_p)() - ctypes.pythonapi.Py_GetArgcArgv(ctypes.byref(argc), ctypes.byref(argv)) - if argc.value == 0: - return - # argv 字符串在内存中连续排列,计算总占用字节数 - start = ctypes.cast(argv[0], ctypes.c_void_p).value - end = ctypes.cast(argv[argc.value - 1], ctypes.c_void_p).value - end += len(argv[argc.value - 1]) + 1 - size = end - start - # 用 title 填充开头,其余全部清零 - enc = title.encode()[:size - 1] - buf = (ctypes.c_char * size).from_address(start) - buf[:len(enc)] = enc - buf[len(enc):] = b"\x00" * (size - len(enc)) - except Exception: - pass - - def get_duration(input_path): """用 ffprobe 获取视频时长(秒),失败返回 None""" try: @@ -92,14 +49,13 @@ def get_duration(input_path): return None -def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue, process_name): +def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue): """ 单个文件转码,实时更新对应 slot 的进度条。 file_info: (输入完整路径, 输出完整路径) """ input_path, output_path = file_info - # 从队列中拿到一个空闲 slot slot = slot_queue.get() bar = slot_bars[slot] @@ -115,10 +71,8 @@ def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue, process_na short_name = short_name[:42] + "..." bar.set_description(f" {short_name}") - # 构建 ffmpeg 命令 - # -progress pipe:1 将结构化进度输出到 stdout(key=value 格式) - ffmpeg_args = [ - "-y", + cmd = [ + "ffmpeg", "-y", "-progress", "pipe:1", "-loglevel", "error", "-i", str(input_path), @@ -126,34 +80,15 @@ def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue, process_na "-crf", "30", "-preset", "6", "-pix_fmt", "yuv420p10le", - "-c:a", "copy", + "-c:a", "copy", str(output_path), ] - str_args = [str(a) for a in ffmpeg_args] - if process_name: - # 尝试在 fork 后、subprocess exec 之前用 os.execvp 替换进程(设 argv[0] 为别名) - # 多线程环境下 GIL 可能导致 execvp 失败;失败时静默返回, - # subprocess 会接着正常执行 ffmpeg,转码不受影响 - _pname = process_name - _args = str_args - def _try_exec(): - try: - os.execvp("ffmpeg", [_pname] + _args) - except Exception: - pass - proc = subprocess.Popen( - ["ffmpeg"] + str_args, - preexec_fn=_try_exec, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - else: - proc = subprocess.Popen( - ["ffmpeg"] + str_args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) + proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) # 实时解析 -progress 输出(out_time_us 字段,单位微秒) last_secs = 0.0 @@ -166,7 +101,6 @@ def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue, process_na cur_secs = us / 1_000_000 delta = cur_secs - last_secs if delta > 0: - # 避免浮点误差超出 total delta = min(delta, bar.total - bar.n) bar.update(delta) last_secs = cur_secs @@ -187,7 +121,6 @@ def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue, process_na return False finally: - # 重置 slot 进度条并归还 slot bar.set_description(" [空闲]") bar.reset(total=1) overall_bar.update(1) @@ -207,13 +140,6 @@ def parse_args(): default=DEFAULT_WORKERS, help=f"并行任务数(默认: {DEFAULT_WORKERS})", ) - parser.add_argument( - "-n", "--process-name", - type=str, - default=None, - dest="process_name", - help="ffmpeg 进程别名,用于混淆进程名(如 my_stream_1)", - ) return parser.parse_args() @@ -223,13 +149,11 @@ def main(): input_root = Path(args.input).resolve() output_root = Path(args.output).resolve() max_workers = args.workers - process_name = args.process_name if not input_root.exists(): print(f"错误: 输入目录 {input_root} 不存在") return - # 扫描所有待处理文件 tasks = [] print(f"正在扫描目录: {input_root} ...") for file in input_root.rglob("*"): @@ -245,12 +169,7 @@ def main(): return print(f"共发现 {len(tasks)} 个文件,准备使用 {max_workers} 个并行任务进行转码...") - if process_name: - _set_proc_title(process_name) - print(f"进程别名: {process_name}") - # 初始化进度条 - # position=0: 总进度;position=1..N: 各 worker 当前文件进度 overall_bar = tqdm( total=len(tasks), desc="总进度", @@ -273,13 +192,9 @@ def main(): slot_bars.append(bar) slot_queue.put(i) - # 并行执行转码 with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [ - executor.submit( - transcode_one_file, - task, overall_bar, slot_bars, slot_queue, process_name, - ) + executor.submit(transcode_one_file, task, overall_bar, slot_bars, slot_queue) for task in tasks ] for f in as_completed(futures):