Files
scripts-public/av1-transfer/main.py
FlintyLemming 3f5a69c7af ♻️ refactor(av1-transfer): remove process name obfuscation feature
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 11:45:43 +08:00

215 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AV1 视频批量转码脚本
使用方法:
python main.py -i <输入目录> -o <输出目录> [-w <并行数>]
参数说明:
-i, --input 输入目录路径(必需)
-o, --output 输出目录路径(必需)
-w, --workers 并行任务数,默认为 4
示例:
python main.py -i /path/to/input -o /path/to/output
python main.py -i ./videos -o ./converted -w 2
"""
import argparse
import queue
import subprocess
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

from tqdm import tqdm
# ================= Configuration =================
# Default number of parallel transcode jobs; used when -w/--workers is omitted.
DEFAULT_WORKERS = 4
# Lower-case file suffixes treated as video input; compared against
# ``file.suffix.lower()`` during the directory scan.
VIDEO_EXTS = {".mp4", ".mkv", ".mov", ".avi", ".flv", ".ts", ".webm"}
# ============================================
# tqdm ``bar_format`` used for each per-worker progress bar.
SLOT_BAR_FORMAT = "{desc:<50} {percentage:3.0f}%|{bar}| {n:.0f}/{total:.0f}s"
def get_duration(input_path):
    """Return the container duration of a media file in seconds.

    Runs ``ffprobe`` on ``input_path`` and parses the ``format=duration``
    value it prints on stdout.

    Args:
        input_path: Path (``str`` or path-like) of the media file to probe.

    Returns:
        The duration as a positive, finite ``float``, or ``None`` when
        ``ffprobe`` cannot be started, its output is empty or not a number,
        or the reported duration is zero, negative, NaN or infinite.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        str(input_path),
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
        duration = float(result.stdout.strip())
    except (OSError, ValueError, subprocess.SubprocessError):
        # OSError: ffprobe is missing or not executable.
        # ValueError: empty / non-numeric output (also covers decode errors).
        return None
    # NaN fails both comparisons, so it is rejected together with inf and
    # non-positive values; callers can then safely call int() on the result.
    if not 0 < duration < float("inf"):
        return None
    return duration
def transcode_one_file(file_info, overall_bar, slot_bars, slot_queue):
    """Transcode one file to AV1 with ffmpeg while driving a per-worker bar.

    A free slot index is taken from ``slot_queue``; the bar at that index in
    ``slot_bars`` shows the progress of this file. When the function
    finishes, whatever the outcome, the bar is reset to idle, ``overall_bar``
    is advanced by one and the slot index is put back on the queue.

    Args:
        file_info: ``(input_path, output_path)`` pair of ``Path`` objects.
        overall_bar: Progress bar counting processed files.
        slot_bars: Per-worker progress bars, indexed by slot number.
        slot_queue: Queue holding the indices of the currently idle slots.

    Returns:
        ``True`` if ffmpeg exited with status 0; ``False`` if it exited with
        a non-zero status or an exception was raised. Both failure cases are
        reported through ``tqdm.write``.
    """
    input_path, output_path = file_info
    slot = slot_queue.get()
    bar = slot_bars[slot]
    try:
        output_path.parent.mkdir(parents=True, exist_ok=True)

        duration = get_duration(input_path)
        # Clamp to >= 1 so a sub-second clip does not produce a zero-length bar.
        total_secs = max(1, int(duration)) if duration else 1
        bar.reset(total=total_secs)

        short_name = input_path.name
        if len(short_name) > 45:
            short_name = short_name[:42] + "..."
        bar.set_description(f" {short_name}")

        cmd = [
            "ffmpeg", "-y",
            "-progress", "pipe:1",
            "-loglevel", "error",
            "-i", str(input_path),
            "-c:v", "libsvtav1",
            "-crf", "30",
            "-preset", "6",
            "-pix_fmt", "yuv420p10le",
            "-c:a", "copy",
            str(output_path),
        ]

        # stderr is sent to a temporary file rather than a pipe: a pipe that is
        # only read after the process exits can fill up and block ffmpeg while
        # this thread is still waiting on stdout.
        with tempfile.TemporaryFile() as err_file:
            with subprocess.Popen(
                cmd,
                # Keep parallel ffmpeg processes from reading the terminal.
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=err_file,
            ) as proc:
                try:
                    # Parse the -progress stream in real time; the
                    # out_time_us field is expressed in microseconds.
                    last_secs = 0.0
                    for raw_line in proc.stdout:
                        line = raw_line.decode(errors="replace").strip()
                        if not line.startswith("out_time_us="):
                            continue
                        try:
                            us = int(line.split("=", 1)[1])
                        except ValueError:
                            # Value is not an integer; ignore this sample.
                            continue
                        if not duration or us <= 0:
                            continue
                        cur_secs = us / 1_000_000
                        delta = cur_secs - last_secs
                        if delta > 0:
                            # Never push the bar past its total.
                            bar.update(min(delta, bar.total - bar.n))
                            last_secs = cur_secs
                except BaseException:
                    # Do not leave an orphaned encoder running if reading
                    # the progress stream fails.
                    proc.kill()
                    raise
            # Leaving the Popen context closed the pipe and waited for exit.
            if proc.returncode != 0:
                err_file.seek(0)
                err = err_file.read().decode(errors="replace")
                tqdm.write(f"\n[错误] 转码失败: {input_path}\n{err}")
                return False
        return True
    except Exception as e:
        tqdm.write(f"\n[异常] {input_path}: {e}")
        return False
    finally:
        # Return the slot to the idle state and hand it back to the pool.
        bar.set_description(" [空闲]")
        bar.reset(total=1)
        overall_bar.update(1)
        slot_queue.put(slot)
def parse_args(argv=None):
    """Parse the command-line options of the batch transcoder.

    Args:
        argv: Argument list to parse. ``None`` (the default) lets argparse
            read ``sys.argv[1:]``, which is what existing callers get.

    Returns:
        An ``argparse.Namespace`` with ``input`` and ``output`` (``str``) and
        ``workers`` (``int``, at least 1 when given on the command line).
    """

    def positive_int(text):
        """Convert ``text`` to an int, rejecting anything below 1."""
        try:
            value = int(text)
        except ValueError:
            raise argparse.ArgumentTypeError(f"无效的整数: {text!r}") from None
        if value < 1:
            # ThreadPoolExecutor refuses max_workers <= 0, so fail early with
            # a proper usage error instead of a traceback later on.
            raise argparse.ArgumentTypeError("并行任务数必须大于 0")
        return value

    parser = argparse.ArgumentParser(
        description="AV1 视频批量转码脚本",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("-i", "--input", type=str, required=True, help="输入目录路径")
    parser.add_argument("-o", "--output", type=str, required=True, help="输出目录路径")
    parser.add_argument(
        "-w", "--workers",
        type=positive_int,
        default=DEFAULT_WORKERS,
        help=f"并行任务数(默认: {DEFAULT_WORKERS})",
    )
    return parser.parse_args(argv)
def main():
    """Scan the input directory and transcode every video found in parallel.

    Each file under the input directory whose suffix is in ``VIDEO_EXTS``
    (names starting with ``._`` are ignored) is mapped to the same relative
    path under the output directory with a ``.mp4`` suffix. Inputs whose
    output would overwrite the input itself, or collide with the output of
    an earlier input, are skipped with a message. The remaining files are
    handed to ``transcode_one_file`` on a thread pool, with one overall
    progress bar plus one bar per worker.

    Returns:
        ``None``. Problems are reported on the terminal.
    """
    args = parse_args()
    input_root = Path(args.input).resolve()
    output_root = Path(args.output).resolve()
    max_workers = args.workers

    if not input_root.is_dir():
        print(f"错误: 输入目录 {input_root} 不存在或不是目录")
        return
    if max_workers < 1:
        # ThreadPoolExecutor raises ValueError for max_workers <= 0.
        print(f"错误: 并行任务数必须大于 0(当前: {max_workers})")
        return

    tasks = []
    claimed_outputs = set()
    print(f"正在扫描目录: {input_root} ...")
    # Sorted so that the choice of which colliding input wins is deterministic.
    for file in sorted(input_root.rglob("*")):
        if file.name.startswith("._"):
            continue
        if file.suffix.lower() not in VIDEO_EXTS:
            continue
        if not file.is_file():
            # e.g. a directory that happens to be named "clips.mp4"
            continue
        rel_path = file.relative_to(input_root)
        out_file = output_root / rel_path.with_suffix(".mp4")
        if out_file == file:
            # Would make ffmpeg write over the file it is reading.
            print(f"跳过: 输出路径与输入文件相同 {file}")
            continue
        if out_file in claimed_outputs:
            # e.g. a.mkv and a.mp4 both map to a.mp4; two workers must not
            # write the same output concurrently.
            print(f"跳过: 输出路径冲突 {file} -> {out_file}")
            continue
        claimed_outputs.add(out_file)
        tasks.append((file, out_file))

    if not tasks:
        print("未发现匹配的视频文件。")
        return

    print(f"共发现 {len(tasks)} 个文件,准备使用 {max_workers} 个并行任务进行转码...")

    overall_bar = tqdm(
        total=len(tasks),
        desc="总进度",
        position=0,
        leave=True,
        unit="文件",
    )
    slot_queue: queue.Queue[int] = queue.Queue()
    slot_bars: list[tqdm] = []
    failed = 0
    try:
        # One bar per worker; the queue hands out the index of an idle bar.
        for i in range(max_workers):
            bar = tqdm(
                total=1,
                desc=" [空闲]",
                position=i + 1,
                leave=True,
                unit="s",
                bar_format=SLOT_BAR_FORMAT,
            )
            slot_bars.append(bar)
            slot_queue.put(i)

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(transcode_one_file, task, overall_bar, slot_bars, slot_queue)
                for task in tasks
            ]
            for f in as_completed(futures):
                try:
                    if not f.result():
                        failed += 1
                except Exception as e:
                    failed += 1
                    tqdm.write(f"[未捕获异常] {e}")
    finally:
        # Close the bars even if the run is interrupted.
        for bar in slot_bars:
            bar.close()
        overall_bar.close()

    print("\n任务全部完成!")
    if failed:
        print(f"其中 {failed} 个文件转码失败。")
# Run the batch transcode only when executed as a script, not when imported.
if __name__ == "__main__":
    main()