♻️ refactor(av1-transfer): remove process name obfuscation feature
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -2,28 +2,21 @@
|
||||
AV1 视频批量转码脚本
|
||||
|
||||
使用方法:
|
||||
python main.py -i <输入目录> -o <输出目录> [-w <并行数>] [-n <进程别名>]
|
||||
python main.py -i <输入目录> -o <输出目录> [-w <并行数>]
|
||||
|
||||
参数说明:
|
||||
-i, --input 输入目录路径(必需)
|
||||
-o, --output 输出目录路径(必需)
|
||||
-w, --workers 并行任务数,默认为 4
|
||||
-n, --process-name 进程别名(同时重命名 Python 主进程和 ffmpeg 子进程)
|
||||
-i, --input 输入目录路径(必需)
|
||||
-o, --output 输出目录路径(必需)
|
||||
-w, --workers 并行任务数,默认为 4
|
||||
|
||||
示例:
|
||||
python main.py -i /path/to/input -o /path/to/output
|
||||
python main.py -i ./videos -o ./converted -w 2
|
||||
python main.py -i ./videos -o ./converted -w 2 -n my_stream_1
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import ctypes
|
||||
import ctypes.util
|
||||
import os
|
||||
import queue
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from pathlib import Path
|
||||
|
||||
@@ -38,42 +31,6 @@ VIDEO_EXTS = {".mp4", ".mkv", ".mov", ".avi", ".flv", ".ts", ".webm"}
|
||||
SLOT_BAR_FORMAT = "{desc:<50} {percentage:3.0f}%|{bar}| {n:.0f}/{total:.0f}s"
|
||||
|
||||
|
||||
def _set_proc_title(title: str) -> None:
|
||||
"""
|
||||
通过两种方式彻底隐藏进程命令行:
|
||||
1. prctl(PR_SET_NAME) — 改 /proc/self/comm(top 默认显示的短名)
|
||||
2. 直接覆盖 argv 内存 — 改 /proc/self/cmdline(ps aux COMMAND 列)
|
||||
"""
|
||||
# 1. 改短名(/proc/self/comm,最多 15 字节)
|
||||
try:
|
||||
libc_name = ctypes.util.find_library("c")
|
||||
if libc_name:
|
||||
libc = ctypes.CDLL(libc_name, use_errno=True)
|
||||
libc.prctl(15, title.encode()[:15], 0, 0, 0) # PR_SET_NAME = 15
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 2. 覆盖 argv 内存,清除 /proc/self/cmdline 里的参数
|
||||
try:
|
||||
argc = ctypes.c_int(0)
|
||||
argv = ctypes.POINTER(ctypes.c_char_p)()
|
||||
ctypes.pythonapi.Py_GetArgcArgv(ctypes.byref(argc), ctypes.byref(argv))
|
||||
if argc.value == 0:
|
||||
return
|
||||
# argv 字符串在内存中连续排列,计算总占用字节数
|
||||
start = ctypes.cast(argv[0], ctypes.c_void_p).value
|
||||
end = ctypes.cast(argv[argc.value - 1], ctypes.c_void_p).value
|
||||
end += len(argv[argc.value - 1]) + 1
|
||||
size = end - start
|
||||
# 用 title 填充开头,其余全部清零
|
||||
enc = title.encode()[:size - 1]
|
||||
buf = (ctypes.c_char * size).from_address(start)
|
||||
buf[:len(enc)] = enc
|
||||
buf[len(enc):] = b"\x00" * (size - len(enc))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def get_duration(input_path):
|
||||
"""用 ffprobe 获取视频时长(秒),失败返回 None"""
|
||||
try:
|
||||
@@ -92,14 +49,13 @@ def get_duration(input_path):
|
||||
return None
|
||||
|
||||
|
||||
def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue, process_name):
|
||||
def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue):
|
||||
"""
|
||||
单个文件转码,实时更新对应 slot 的进度条。
|
||||
file_info: (输入完整路径, 输出完整路径)
|
||||
"""
|
||||
input_path, output_path = file_info
|
||||
|
||||
# 从队列中拿到一个空闲 slot
|
||||
slot = slot_queue.get()
|
||||
bar = slot_bars[slot]
|
||||
|
||||
@@ -115,10 +71,8 @@ def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue, process_na
|
||||
short_name = short_name[:42] + "..."
|
||||
bar.set_description(f" {short_name}")
|
||||
|
||||
# 构建 ffmpeg 命令
|
||||
# -progress pipe:1 将结构化进度输出到 stdout(key=value 格式)
|
||||
ffmpeg_args = [
|
||||
"-y",
|
||||
cmd = [
|
||||
"ffmpeg", "-y",
|
||||
"-progress", "pipe:1",
|
||||
"-loglevel", "error",
|
||||
"-i", str(input_path),
|
||||
@@ -126,34 +80,15 @@ def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue, process_na
|
||||
"-crf", "30",
|
||||
"-preset", "6",
|
||||
"-pix_fmt", "yuv420p10le",
|
||||
"-c:a", "copy",
|
||||
"-c:a", "copy",
|
||||
str(output_path),
|
||||
]
|
||||
|
||||
str_args = [str(a) for a in ffmpeg_args]
|
||||
if process_name:
|
||||
# 尝试在 fork 后、subprocess exec 之前用 os.execvp 替换进程(设 argv[0] 为别名)
|
||||
# 多线程环境下 GIL 可能导致 execvp 失败;失败时静默返回,
|
||||
# subprocess 会接着正常执行 ffmpeg,转码不受影响
|
||||
_pname = process_name
|
||||
_args = str_args
|
||||
def _try_exec():
|
||||
try:
|
||||
os.execvp("ffmpeg", [_pname] + _args)
|
||||
except Exception:
|
||||
pass
|
||||
proc = subprocess.Popen(
|
||||
["ffmpeg"] + str_args,
|
||||
preexec_fn=_try_exec,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
else:
|
||||
proc = subprocess.Popen(
|
||||
["ffmpeg"] + str_args,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
proc = subprocess.Popen(
|
||||
cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
|
||||
# 实时解析 -progress 输出(out_time_us 字段,单位微秒)
|
||||
last_secs = 0.0
|
||||
@@ -166,7 +101,6 @@ def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue, process_na
|
||||
cur_secs = us / 1_000_000
|
||||
delta = cur_secs - last_secs
|
||||
if delta > 0:
|
||||
# 避免浮点误差超出 total
|
||||
delta = min(delta, bar.total - bar.n)
|
||||
bar.update(delta)
|
||||
last_secs = cur_secs
|
||||
@@ -187,7 +121,6 @@ def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue, process_na
|
||||
return False
|
||||
|
||||
finally:
|
||||
# 重置 slot 进度条并归还 slot
|
||||
bar.set_description(" [空闲]")
|
||||
bar.reset(total=1)
|
||||
overall_bar.update(1)
|
||||
@@ -207,13 +140,6 @@ def parse_args():
|
||||
default=DEFAULT_WORKERS,
|
||||
help=f"并行任务数(默认: {DEFAULT_WORKERS})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-n", "--process-name",
|
||||
type=str,
|
||||
default=None,
|
||||
dest="process_name",
|
||||
help="ffmpeg 进程别名,用于混淆进程名(如 my_stream_1)",
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
@@ -223,13 +149,11 @@ def main():
|
||||
input_root = Path(args.input).resolve()
|
||||
output_root = Path(args.output).resolve()
|
||||
max_workers = args.workers
|
||||
process_name = args.process_name
|
||||
|
||||
if not input_root.exists():
|
||||
print(f"错误: 输入目录 {input_root} 不存在")
|
||||
return
|
||||
|
||||
# 扫描所有待处理文件
|
||||
tasks = []
|
||||
print(f"正在扫描目录: {input_root} ...")
|
||||
for file in input_root.rglob("*"):
|
||||
@@ -245,12 +169,7 @@ def main():
|
||||
return
|
||||
|
||||
print(f"共发现 {len(tasks)} 个文件,准备使用 {max_workers} 个并行任务进行转码...")
|
||||
if process_name:
|
||||
_set_proc_title(process_name)
|
||||
print(f"进程别名: {process_name}")
|
||||
|
||||
# 初始化进度条
|
||||
# position=0: 总进度;position=1..N: 各 worker 当前文件进度
|
||||
overall_bar = tqdm(
|
||||
total=len(tasks),
|
||||
desc="总进度",
|
||||
@@ -273,13 +192,9 @@ def main():
|
||||
slot_bars.append(bar)
|
||||
slot_queue.put(i)
|
||||
|
||||
# 并行执行转码
|
||||
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||
futures = [
|
||||
executor.submit(
|
||||
transcode_one_file,
|
||||
task, overall_bar, slot_bars, slot_queue, process_name,
|
||||
)
|
||||
executor.submit(transcode_one_file, task, overall_bar, slot_bars, slot_queue)
|
||||
for task in tasks
|
||||
]
|
||||
for f in as_completed(futures):
|
||||
|
||||
Reference in New Issue
Block a user