Files
scripts-public/av1-transfer/main.py
FlintyLemming 48b4817b9d add script
2026-01-22 22:28:40 +08:00

121 lines
3.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AV1 视频批量转码脚本
使用方法:
python main.py -i <输入目录> -o <输出目录> [-w <并行数>]
参数说明:
-i, --input 输入目录路径(必需)
-o, --output 输出目录路径(必需)
-w, --workers 并行任务数,默认为 4
示例:
python main.py -i /path/to/input -o /path/to/output
python main.py -i ./videos -o ./converted -w 2
"""
import argparse
import subprocess
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
# ================= 配置区域 =================
DEFAULT_WORKERS = 4 # 默认并行任务数
VIDEO_EXTS = {".mp4", ".mkv", ".mov", ".avi", ".flv", ".ts", ".webm"}
# ============================================
def transcode_one_file(file_info):
"""
单个文件转码函数
file_info: (输入完整路径, 输出完整路径)
"""
input_path, output_path = file_info
# 确保输出目录存在
output_path.parent.mkdir(parents=True, exist_ok=True)
# ffmpeg 命令
# -y: 覆盖已存在文件
# -loglevel error: 减少日志输出,避免干扰进度条
cmd = [
"ffmpeg", "-y", "-i", str(input_path),
"-c:v", "libsvtav1",
"-crf", "30",
"-preset", "6",
"-c:a", "copy",
str(output_path)
]
try:
# 使用 subprocess 运行,捕获 stderr 避免刷屏
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
return True
except subprocess.CalledProcessError as e:
print(f"\n[错误] 转码失败: {input_path}\n原因: {e.stderr.decode()}")
return False
def parse_args():
"""解析命令行参数"""
parser = argparse.ArgumentParser(
description="AV1 视频批量转码脚本",
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
"-i", "--input",
type=str,
required=True,
help="输入目录路径"
)
parser.add_argument(
"-o", "--output",
type=str,
required=True,
help="输出目录路径"
)
parser.add_argument(
"-w", "--workers",
type=int,
default=DEFAULT_WORKERS,
help=f"并行任务数(默认: {DEFAULT_WORKERS}"
)
return parser.parse_args()
def main():
args = parse_args()
input_root = Path(args.input).resolve()
output_root = Path(args.output).resolve()
max_workers = args.workers
if not input_root.exists():
print(f"错误: 输入目录 {input_root} 不存在")
return
# 1. 扫描所有待处理文件
tasks = []
print(f"正在扫描目录: {input_root} ...")
for file in input_root.rglob("*"):
if file.suffix.lower() in VIDEO_EXTS:
# 计算相对路径并更换后缀为 .mp4
rel_path = file.relative_to(input_root)
out_file = output_root / rel_path.with_suffix(".mp4")
tasks.append((file, out_file))
if not tasks:
print("未发现匹配的视频文件。")
return
print(f"共发现 {len(tasks)} 个文件,准备使用 {max_workers} 个并行任务进行转码...")
# 2. 使用线程池并行执行
# 注意ffmpeg 内部是多线程的max_workers 不宜设置得过大
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 使用 tqdm 显示总进度
list(tqdm(executor.map(transcode_one_file, tasks), total=len(tasks), desc="转码进度"))
print("\n任务全部完成!")
if __name__ == "__main__":
main()