Drop the setproctitle dependency. Use prctl(PR_SET_NAME) to change /proc/self/comm and directly overwrite argv memory via Py_GetArgcArgv to clear /proc/self/cmdline, so ps aux shows no arguments. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
295 lines
9.1 KiB
Python
295 lines
9.1 KiB
Python
"""
|
||
AV1 视频批量转码脚本
|
||
|
||
使用方法:
|
||
python main.py -i <输入目录> -o <输出目录> [-w <并行数>] [-n <进程别名>]
|
||
|
||
参数说明:
|
||
-i, --input 输入目录路径(必需)
|
||
-o, --output 输出目录路径(必需)
|
||
-w, --workers 并行任务数,默认为 4
|
||
-n, --process-name 进程别名(同时重命名 Python 主进程和 ffmpeg 子进程)
|
||
|
||
示例:
|
||
python main.py -i /path/to/input -o /path/to/output
|
||
python main.py -i ./videos -o ./converted -w 2
|
||
python main.py -i ./videos -o ./converted -w 2 -n my_stream_1
|
||
"""
|
||
|
||
import argparse
|
||
import ctypes
|
||
import ctypes.util
|
||
import queue
|
||
import shutil
|
||
import subprocess
|
||
import sys
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from pathlib import Path
|
||
|
||
from tqdm import tqdm
|
||
|
||
# ================= 配置区域 =================
|
||
DEFAULT_WORKERS = 4
|
||
VIDEO_EXTS = {".mp4", ".mkv", ".mov", ".avi", ".flv", ".ts", ".webm"}
|
||
# ============================================
|
||
|
||
# 每个 worker 进度条的格式
|
||
SLOT_BAR_FORMAT = "{desc:<50} {percentage:3.0f}%|{bar}| {n:.0f}/{total:.0f}s"
|
||
|
||
|
||
def _set_proc_title(title: str) -> None:
|
||
"""
|
||
通过两种方式彻底隐藏进程命令行:
|
||
1. prctl(PR_SET_NAME) — 改 /proc/self/comm(top 默认显示的短名)
|
||
2. 直接覆盖 argv 内存 — 改 /proc/self/cmdline(ps aux COMMAND 列)
|
||
"""
|
||
# 1. 改短名(/proc/self/comm,最多 15 字节)
|
||
try:
|
||
libc_name = ctypes.util.find_library("c")
|
||
if libc_name:
|
||
libc = ctypes.CDLL(libc_name, use_errno=True)
|
||
libc.prctl(15, title.encode()[:15], 0, 0, 0) # PR_SET_NAME = 15
|
||
except Exception:
|
||
pass
|
||
|
||
# 2. 覆盖 argv 内存,清除 /proc/self/cmdline 里的参数
|
||
try:
|
||
argc = ctypes.c_int(0)
|
||
argv = ctypes.POINTER(ctypes.c_char_p)()
|
||
ctypes.pythonapi.Py_GetArgcArgv(ctypes.byref(argc), ctypes.byref(argv))
|
||
if argc.value == 0:
|
||
return
|
||
# argv 字符串在内存中连续排列,计算总占用字节数
|
||
start = ctypes.cast(argv[0], ctypes.c_void_p).value
|
||
end = ctypes.cast(argv[argc.value - 1], ctypes.c_void_p).value
|
||
end += len(argv[argc.value - 1]) + 1
|
||
size = end - start
|
||
# 用 title 填充开头,其余全部清零
|
||
enc = title.encode()[:size - 1]
|
||
buf = (ctypes.c_char * size).from_address(start)
|
||
buf[:len(enc)] = enc
|
||
buf[len(enc):] = b"\x00" * (size - len(enc))
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def get_duration(input_path):
|
||
"""用 ffprobe 获取视频时长(秒),失败返回 None"""
|
||
result = subprocess.run(
|
||
[
|
||
"ffprobe", "-v", "error",
|
||
"-show_entries", "format=duration",
|
||
"-of", "default=noprint_wrappers=1:nokey=1",
|
||
str(input_path),
|
||
],
|
||
capture_output=True,
|
||
text=True,
|
||
)
|
||
try:
|
||
return float(result.stdout.strip())
|
||
except (ValueError, AttributeError):
|
||
return None
|
||
|
||
|
||
def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue, process_name):
|
||
"""
|
||
单个文件转码,实时更新对应 slot 的进度条。
|
||
file_info: (输入完整路径, 输出完整路径)
|
||
"""
|
||
input_path, output_path = file_info
|
||
|
||
# 从队列中拿到一个空闲 slot
|
||
slot = slot_queue.get()
|
||
bar = slot_bars[slot]
|
||
|
||
try:
|
||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||
|
||
duration = get_duration(input_path)
|
||
total_secs = int(duration) if duration else 1
|
||
|
||
bar.reset(total=total_secs)
|
||
short_name = input_path.name
|
||
if len(short_name) > 45:
|
||
short_name = short_name[:42] + "..."
|
||
bar.set_description(f" {short_name}")
|
||
|
||
# 构建 ffmpeg 命令
|
||
# -progress pipe:1 将结构化进度输出到 stdout(key=value 格式)
|
||
ffmpeg_args = [
|
||
"-y",
|
||
"-progress", "pipe:1",
|
||
"-loglevel", "error",
|
||
"-i", str(input_path),
|
||
"-c:v", "libsvtav1",
|
||
"-crf", "30",
|
||
"-preset", "6",
|
||
"-pix_fmt", "yuv420p10le",
|
||
"-c:a", "copy",
|
||
str(output_path),
|
||
]
|
||
|
||
if process_name:
|
||
# 用 executable 指定真实路径,argv[0] 设为别名
|
||
# 等效于 shell 的: exec -a <process_name> ffmpeg ...
|
||
ffmpeg_exe = shutil.which("ffmpeg")
|
||
if not ffmpeg_exe:
|
||
raise RuntimeError("找不到 ffmpeg 可执行文件")
|
||
cmd = [process_name] + ffmpeg_args
|
||
proc = subprocess.Popen(
|
||
cmd,
|
||
executable=ffmpeg_exe,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
)
|
||
else:
|
||
cmd = ["ffmpeg"] + ffmpeg_args
|
||
proc = subprocess.Popen(
|
||
cmd,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.PIPE,
|
||
)
|
||
|
||
# 实时解析 -progress 输出(out_time_us 字段,单位微秒)
|
||
last_secs = 0.0
|
||
for raw_line in proc.stdout:
|
||
line = raw_line.decode(errors="replace").strip()
|
||
if line.startswith("out_time_us="):
|
||
try:
|
||
us = int(line.split("=", 1)[1])
|
||
if duration and us > 0:
|
||
cur_secs = us / 1_000_000
|
||
delta = cur_secs - last_secs
|
||
if delta > 0:
|
||
# 避免浮点误差超出 total
|
||
delta = min(delta, bar.total - bar.n)
|
||
bar.update(delta)
|
||
last_secs = cur_secs
|
||
except (ValueError, IndexError):
|
||
pass
|
||
|
||
proc.wait()
|
||
|
||
if proc.returncode != 0:
|
||
err = proc.stderr.read().decode(errors="replace")
|
||
tqdm.write(f"\n[错误] 转码失败: {input_path}\n{err}")
|
||
return False
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
tqdm.write(f"\n[异常] {input_path}: {e}")
|
||
return False
|
||
|
||
finally:
|
||
# 重置 slot 进度条并归还 slot
|
||
bar.set_description(" [空闲]")
|
||
bar.reset(total=1)
|
||
overall_bar.update(1)
|
||
slot_queue.put(slot)
|
||
|
||
|
||
def parse_args():
|
||
parser = argparse.ArgumentParser(
|
||
description="AV1 视频批量转码脚本",
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
)
|
||
parser.add_argument("-i", "--input", type=str, required=True, help="输入目录路径")
|
||
parser.add_argument("-o", "--output", type=str, required=True, help="输出目录路径")
|
||
parser.add_argument(
|
||
"-w", "--workers",
|
||
type=int,
|
||
default=DEFAULT_WORKERS,
|
||
help=f"并行任务数(默认: {DEFAULT_WORKERS})",
|
||
)
|
||
parser.add_argument(
|
||
"-n", "--process-name",
|
||
type=str,
|
||
default=None,
|
||
dest="process_name",
|
||
help="ffmpeg 进程别名,用于混淆进程名(如 my_stream_1)",
|
||
)
|
||
return parser.parse_args()
|
||
|
||
|
||
def main():
|
||
args = parse_args()
|
||
|
||
input_root = Path(args.input).resolve()
|
||
output_root = Path(args.output).resolve()
|
||
max_workers = args.workers
|
||
process_name = args.process_name
|
||
|
||
if not input_root.exists():
|
||
print(f"错误: 输入目录 {input_root} 不存在")
|
||
return
|
||
|
||
# 扫描所有待处理文件
|
||
tasks = []
|
||
print(f"正在扫描目录: {input_root} ...")
|
||
for file in input_root.rglob("*"):
|
||
if file.name.startswith("._"):
|
||
continue
|
||
if file.suffix.lower() in VIDEO_EXTS:
|
||
rel_path = file.relative_to(input_root)
|
||
out_file = output_root / rel_path.with_suffix(".mp4")
|
||
tasks.append((file, out_file))
|
||
|
||
if not tasks:
|
||
print("未发现匹配的视频文件。")
|
||
return
|
||
|
||
print(f"共发现 {len(tasks)} 个文件,准备使用 {max_workers} 个并行任务进行转码...")
|
||
if process_name:
|
||
_set_proc_title(process_name)
|
||
print(f"进程别名: {process_name}")
|
||
|
||
# 初始化进度条
|
||
# position=0: 总进度;position=1..N: 各 worker 当前文件进度
|
||
overall_bar = tqdm(
|
||
total=len(tasks),
|
||
desc="总进度",
|
||
position=0,
|
||
leave=True,
|
||
unit="文件",
|
||
)
|
||
|
||
slot_queue: queue.Queue[int] = queue.Queue()
|
||
slot_bars: list[tqdm] = []
|
||
for i in range(max_workers):
|
||
bar = tqdm(
|
||
total=1,
|
||
desc=" [空闲]",
|
||
position=i + 1,
|
||
leave=True,
|
||
unit="s",
|
||
bar_format=SLOT_BAR_FORMAT,
|
||
)
|
||
slot_bars.append(bar)
|
||
slot_queue.put(i)
|
||
|
||
# 并行执行转码
|
||
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||
futures = [
|
||
executor.submit(
|
||
transcode_one_file,
|
||
task, overall_bar, slot_bars, slot_queue, process_name,
|
||
)
|
||
for task in tasks
|
||
]
|
||
for f in as_completed(futures):
|
||
try:
|
||
f.result()
|
||
except Exception as e:
|
||
tqdm.write(f"[未捕获异常] {e}")
|
||
|
||
for bar in slot_bars:
|
||
bar.close()
|
||
overall_bar.close()
|
||
|
||
print("\n任务全部完成!")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|