Files
scripts-public/av1-transfer/main.py
FlintyLemming 79e19229af 🐛 fix(av1-transfer): use preexec_fn+os.execvp to rename ffmpeg argv[0]
execve with explicit paths fails on this system (linuxbrew symlink issue).
os.execvp in preexec_fn uses PATH lookup which works, and since it replaces
the child process before subprocess's own exec runs, pipes are inherited correctly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 11:19:11 +08:00

296 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AV1 视频批量转码脚本
使用方法:
python main.py -i <输入目录> -o <输出目录> [-w <并行数>] [-n <进程别名>]
参数说明:
-i, --input 输入目录路径(必需)
-o, --output 输出目录路径(必需)
-w, --workers 并行任务数,默认为 4
-n, --process-name 进程别名(同时重命名 Python 主进程和 ffmpeg 子进程)
示例:
python main.py -i /path/to/input -o /path/to/output
python main.py -i ./videos -o ./converted -w 2
python main.py -i ./videos -o ./converted -w 2 -n my_stream_1
"""
import argparse
import ctypes
import ctypes.util
import os
import queue
import shutil
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from tqdm import tqdm
# ================= Configuration =================
# Default number of parallel transcode jobs, used when -w/--workers is omitted.
DEFAULT_WORKERS = 4
# File suffixes treated as video inputs; compared against the lower-cased suffix.
VIDEO_EXTS = {".mp4", ".mkv", ".mov", ".avi", ".flv", ".ts", ".webm"}
# =================================================
# tqdm bar_format string used for each worker's per-file progress bar.
SLOT_BAR_FORMAT = "{desc:<50} {percentage:3.0f}%|{bar}| {n:.0f}/{total:.0f}s"
def _set_proc_title(title: str) -> None:
    """
    Best-effort attempt to hide this process's command line, in two steps:

    1. prctl(PR_SET_NAME) -- changes /proc/self/comm (the short name that
       ``top`` shows by default).
    2. Overwrite the argv memory in place -- intended to change
       /proc/self/cmdline (the COMMAND column of ``ps aux``).

    Any exception raised by either step is caught and ignored, so a failure
    simply leaves the corresponding name unchanged.
    """
    # 1. Change the short name (/proc/self/comm holds at most 15 bytes).
    try:
        libc_name = ctypes.util.find_library("c")
        if libc_name:
            libc = ctypes.CDLL(libc_name, use_errno=True)
            libc.prctl(15, title.encode()[:15], 0, 0, 0)  # PR_SET_NAME = 15
    except Exception:
        pass
    # 2. Overwrite the argv memory to wipe the arguments from /proc/self/cmdline.
    # NOTE(review): CPython 3 declares Py_GetArgcArgv with ``wchar_t ***argv``,
    # and indexing a POINTER(c_char_p) yields Python ``bytes`` objects, so the
    # addresses computed below may not be the kernel-visible argv block.
    # Confirm on the target interpreter: a wrong range here would overwrite
    # unrelated memory rather than the command line.
    try:
        argc = ctypes.c_int(0)
        argv = ctypes.POINTER(ctypes.c_char_p)()
        ctypes.pythonapi.Py_GetArgcArgv(ctypes.byref(argc), ctypes.byref(argv))
        if argc.value == 0:
            return
        # Assumes the argv strings are laid out contiguously in memory: the
        # region runs from the start of argv[0] to the NUL that terminates
        # the last argument.
        start = ctypes.cast(argv[0], ctypes.c_void_p).value
        end = ctypes.cast(argv[argc.value - 1], ctypes.c_void_p).value
        end += len(argv[argc.value - 1]) + 1
        size = end - start
        # Write the title at the start of the region (truncated so at least
        # one trailing NUL remains) and zero-fill everything after it.
        enc = title.encode()[:size - 1]
        buf = (ctypes.c_char * size).from_address(start)
        buf[:len(enc)] = enc
        buf[len(enc):] = b"\x00" * (size - len(enc))
    except Exception:
        pass
def get_duration(input_path):
    """Return the duration of *input_path* in seconds, or None on any failure.

    Runs ``ffprobe`` and parses the single ``format=duration`` value it
    prints. A missing ffprobe binary, an unreadable file, or output that is
    not a number all yield None instead of an exception.
    """
    try:
        probe_cmd = ["ffprobe", "-v", "error"]
        probe_cmd += ["-show_entries", "format=duration"]
        probe_cmd += ["-of", "default=noprint_wrappers=1:nokey=1"]
        probe_cmd.append(str(input_path))
        completed = subprocess.run(
            probe_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        seconds = float(completed.stdout.strip())
    except Exception:
        return None
    return seconds
def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue, process_name):
    """
    Transcode a single file to AV1 with ffmpeg, updating a progress bar live.

    Args:
        file_info: ``(input_path, output_path)`` pair of full paths; both are
            used as ``pathlib.Path`` objects (``.parent``, ``.name``).
        overall_bar: Bar counting processed files; advanced by one on exit
            whether or not the transcode succeeded.
        slot_bars: Per-worker progress bars, indexed by slot number.
        slot_queue: Queue of free slot indices. One is taken for the duration
            of the call and always put back.
        process_name: Optional alias used as ffmpeg's argv[0]; falsy to start
            ffmpeg under its own name.

    Returns:
        True if ffmpeg exited with status 0, otherwise False. Failures and
        exceptions are reported through ``tqdm.write`` instead of being raised.
    """
    import tempfile  # local import: only needed for the stderr spool below

    input_path, output_path = file_info
    # Claim a free slot (blocks until one is available).
    slot = slot_queue.get()
    bar = slot_bars[slot]
    try:
        output_path.parent.mkdir(parents=True, exist_ok=True)

        duration = get_duration(input_path)
        # Only a finite, positive duration is usable for progress reporting.
        # The chained comparison is False for NaN and infinity, which would
        # otherwise make int() raise and skip the file entirely.
        if duration is not None and 0 < duration < float("inf"):
            # Sub-second clips would give total=0; keep the bar at >= 1.
            total_secs = max(1, int(duration))
        else:
            duration = None
            total_secs = 1
        bar.reset(total=total_secs)

        short_name = input_path.name
        if len(short_name) > 45:
            short_name = short_name[:42] + "..."
        bar.set_description(f" {short_name}")

        # Build the ffmpeg arguments.
        # "-progress pipe:1" writes structured key=value progress to stdout.
        ffmpeg_args = [
            "-y",
            "-progress", "pipe:1",
            "-loglevel", "error",
            "-i", str(input_path),
            "-c:v", "libsvtav1",
            "-crf", "30",
            "-preset", "6",
            "-pix_fmt", "yuv420p10le",
            "-c:a", "copy",
            str(output_path),
        ]

        popen_kwargs = {}
        if process_name:
            # After fork and before subprocess performs its own exec, replace
            # the child with os.execvp:
            #   - execvp locates ffmpeg through PATH;
            #   - argv[0] becomes the alias, so ps/top show it instead of ffmpeg;
            #   - the stdout/stderr redirections are already in place and are
            #     inherited across the exec.
            # NOTE(review): the subprocess docs warn that preexec_fn is not
            # safe in multi-threaded programs, and this function runs in
            # worker threads. Popen([alias, ...], executable="ffmpeg") is the
            # documented alternative -- confirm whether it works on the
            # target system before switching.
            child_argv = [process_name] + ffmpeg_args
            popen_kwargs["preexec_fn"] = lambda: os.execvp("ffmpeg", child_argv)

        # stderr goes to a temporary file rather than a pipe: the loop below
        # reads only stdout, so a pipe-backed stderr could fill up and block
        # ffmpeg forever if it printed enough error output.
        with tempfile.TemporaryFile() as err_file:
            proc = subprocess.Popen(
                ["ffmpeg"] + ffmpeg_args,
                stdout=subprocess.PIPE,
                stderr=err_file,
                **popen_kwargs,
            )
            try:
                # Parse the -progress stream (out_time_us is in microseconds).
                last_secs = 0.0
                with proc.stdout:
                    for raw_line in proc.stdout:
                        line = raw_line.decode(errors="replace").strip()
                        if not line.startswith("out_time_us="):
                            continue
                        try:
                            us = int(line.split("=", 1)[1])
                        except ValueError:
                            continue  # non-numeric value; ignore this sample
                        if duration is None or us <= 0:
                            continue
                        cur_secs = us / 1_000_000
                        delta = cur_secs - last_secs
                        if delta > 0:
                            # Clamp so rounding never pushes the bar past total.
                            delta = min(delta, bar.total - bar.n)
                            bar.update(delta)
                            last_secs = cur_secs
                returncode = proc.wait()
            finally:
                # If anything above raised, do not leave ffmpeg running.
                if proc.poll() is None:
                    proc.kill()
                    proc.wait()

            if returncode != 0:
                err_file.seek(0)
                err = err_file.read().decode(errors="replace")
                tqdm.write(f"\n[错误] 转码失败: {input_path}\n{err}")
                return False
        return True
    except Exception as e:
        tqdm.write(f"\n[异常] {input_path}: {e}")
        return False
    finally:
        # Reset the slot's bar to idle and hand the slot back.
        bar.set_description(" [空闲]")
        bar.reset(total=1)
        overall_bar.update(1)
        slot_queue.put(slot)
def _positive_int(value):
    """argparse ``type`` callback: parse *value* as an integer that is >= 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 1:
        raise argparse.ArgumentTypeError(f"must be at least 1, got {number}")
    return number


def parse_args():
    """
    Parse the command-line options of the batch transcoder.

    Returns:
        argparse.Namespace with ``input`` and ``output`` (required strings),
        ``workers`` (int >= 1, defaulting to DEFAULT_WORKERS) and
        ``process_name`` (string or None).

    argparse terminates the program with a usage message when the arguments
    are invalid, including a ``--workers`` value below 1.
    """
    parser = argparse.ArgumentParser(
        description="AV1 视频批量转码脚本",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("-i", "--input", type=str, required=True, help="输入目录路径")
    parser.add_argument("-o", "--output", type=str, required=True, help="输出目录路径")
    parser.add_argument(
        "-w", "--workers",
        # Reject 0/negative counts here; the thread pool would otherwise
        # fail later with a bare ValueError traceback.
        type=_positive_int,
        default=DEFAULT_WORKERS,
        help=f"并行任务数(默认: {DEFAULT_WORKERS}",
    )
    parser.add_argument(
        "-n", "--process-name",
        type=str,
        default=None,
        dest="process_name",
        help="ffmpeg 进程别名,用于混淆进程名(如 my_stream_1",
    )
    return parser.parse_args()
def main():
    """
    Entry point: scan the input tree, then transcode every video in parallel.

    Steps:
        1. Parse the CLI arguments and resolve the input/output roots.
        2. Collect every file under the input root whose suffix is in
           VIDEO_EXTS (skipping names starting with "._"), mapping each to a
           ``.mp4`` path at the same relative location under the output root.
        3. Optionally rename this process via ``_set_proc_title``.
        4. Run ``transcode_one_file`` for each task on a thread pool, with one
           overall progress bar plus one bar per worker slot.

    Returns None. Problems are reported on stdout / via ``tqdm.write``.
    """
    args = parse_args()
    input_root = Path(args.input).resolve()
    output_root = Path(args.output).resolve()
    max_workers = args.workers
    process_name = args.process_name

    # is_dir() rather than exists(): a regular file is not a usable input root.
    if not input_root.is_dir():
        print(f"错误: 输入目录 {input_root} 不存在")
        return

    # Scan for files to process.
    tasks = []
    claimed_outputs = set()
    print(f"正在扫描目录: {input_root} ...")
    # Sorted so that output naming is deterministic when names collide.
    for file in sorted(input_root.rglob("*")):
        if file.name.startswith("._"):
            continue
        if file.suffix.lower() not in VIDEO_EXTS:
            continue
        # A directory may carry a video-like suffix; only real files qualify.
        if not file.is_file():
            continue
        rel_path = file.relative_to(input_root)
        out_file = output_root / rel_path.with_suffix(".mp4")
        if out_file in claimed_outputs:
            # Sources that differ only by extension (clip.mkv / clip.mp4) would
            # otherwise be written to the same output concurrently; keep the
            # full source name to tell them apart (clip.mp4 -> clip.mp4.mp4).
            out_file = out_file.with_name(f"{file.name}.mp4")
        claimed_outputs.add(out_file)
        tasks.append((file, out_file))

    if not tasks:
        print("未发现匹配的视频文件。")
        return
    print(f"共发现 {len(tasks)} 个文件,准备使用 {max_workers} 个并行任务进行转码...")

    if process_name:
        _set_proc_title(process_name)
        print(f"进程别名: {process_name}")

    # Progress bars: position 0 is the overall bar, positions 1..N show the
    # file each worker slot is currently processing.
    overall_bar = tqdm(
        total=len(tasks),
        desc="总进度",
        position=0,
        leave=True,
        unit="文件",
    )
    slot_queue: queue.Queue[int] = queue.Queue()
    slot_bars: list[tqdm] = []
    try:
        for i in range(max_workers):
            bar = tqdm(
                total=1,
                desc=" [空闲]",
                position=i + 1,
                leave=True,
                unit="s",
                bar_format=SLOT_BAR_FORMAT,
            )
            slot_bars.append(bar)
            slot_queue.put(i)

        # Run the transcodes in parallel.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(
                    transcode_one_file,
                    task, overall_bar, slot_bars, slot_queue, process_name,
                )
                for task in tasks
            ]
            for f in as_completed(futures):
                try:
                    f.result()
                except Exception as e:
                    tqdm.write(f"[未捕获异常] {e}")
    finally:
        # Close the bars even if the pool could not be created or run.
        for bar in slot_bars:
            bar.close()
        overall_bar.close()
    print("\n任务全部完成!")
# Run the batch transcoder only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()