scripts-public/llm-api-benchmark/vllm_benchmark.py
2025-04-05 00:18:15 +08:00
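
"""
Benchmark the throughput of a vLLM OpenAI-compatible /v1/completions endpoint.

The script generates diverse prompts at several target input token lengths,
fires batches of concurrent requests, and records per-request and aggregate
output-token throughput, success rates, and timings under a results directory.

Illustrative invocation (argument values are examples only; every flag is
optional and falls back to the defaults defined in main() below):

    python vllm_benchmark.py \
        --api-base http://10.0.24.40:8000/v1 \
        --model DeepSeek-V3-0324 \
        --api-key 1111 \
        --token-lengths 100 500 1000 \
        --concurrency-levels 1 4 16 \
        --max-output-tokens 500
"""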

import asyncio
import time
import random
import string
import argparse
import pandas as pd
import aiohttp
import json
import os
from datetime import datetime
import uuid

try:
    import tiktoken
    tokenizer = tiktoken.get_encoding("cl100k_base")
    HAS_TOKENIZER = True
except ImportError:
    print("Warning: tiktoken is not installed; falling back to approximate token counting")
    HAS_TOKENIZER = False

# API configuration (can be overridden via command-line arguments)
API_BASE = "http://10.0.24.40:8000/v1"
MODEL_NAME = "DeepSeek-V3-0324"
API_KEY = "1111"

# Count the number of tokens in a text
def count_tokens(text):
    if HAS_TOKENIZER:
        return len(tokenizer.encode(text))
    else:
        # Approximation: English text averages about 4 characters per token
        return len(text) // 4

# Generate text with an (approximate) target token length; the prompts are
# designed to steer the model toward longer replies.
def generate_text_with_target_tokens(target_tokens):
    # Base topics used as prompt seed text (for diversity)
    topics = [
        "人工智能的发展已经改变了各行各业的工作方式和效率。",
        "气候变化对全球生态系统带来了重大挑战。",
        "太空探索不断揭示我们宇宙的新见解。",
        "医学研究的进步大大提高了人类寿命。",
        "全球经济在数字时代面临机遇与挑战。",
        "文化交流一直在塑造人类历史上的社会。",
        "教育系统不断适应不断变化的技术环境。",
        "生物多样性保护仍然是重要的环境优先事项。",
        "城市规划必须解决可持续性和人口增长问题。",
        "意识哲学提出了关于存在的基本问题。"
    ]
    # Start from a random topic
    text = random.choice(topics)
    # Prepend a unique identifier to avoid KV-cache hits
    unique_id = ''.join(random.choices(string.ascii_letters + string.digits, k=8))
    text = f"[会话ID: {unique_id}] {text}"
    current_tokens = count_tokens(text)
    # Additional sentences used to pad the prompt
    sentences = [
        "这种观点近年来受到了广泛关注。",
        "研究人员继续探索这一现象的各个方面。",
        "其影响超出了直接环境。",
        "多种因素导致了这种复杂的动态。",
        "历史先例为这个问题提供了宝贵的见解。",
        "关于这个话题已经出现了不同的观点。",
        "长期影响仍然是持续辩论的主题。",
        "技术创新加速了这一领域的发展。",
        "政策考虑必须考虑到各利益相关者的利益。",
        "实证证据表明有几种可能的解释。",
        "理论框架提供了结构化的理解方法。",
        "实际应用继续随着新发现而发展。",
        "与其他领域的交叉创造了有趣的协同效应。",
        "文化视角为这一讨论增添了重要维度。",
        "经济因素在塑造结果方面起着重要作用。",
        "道德考虑在决策过程中仍然至关重要。",
        "全球背景影响了这个问题的地方表现。",
        "系统分析揭示了不易察觉的模式。",
        "替代方法已经成为可行的策略。",
        "平衡相互竞争的优先事项仍然是一个挑战。"
    ]
    # Append sentences until the target token count is reached
    while current_tokens < target_tokens:
        # Add a random sentence
        sentence = random.choice(sentences)
        text += " " + sentence
        # Occasionally add random numbers/text for extra diversity
        if random.random() < 0.3:
            random_element = f" 案例{random.randint(1000, 9999)}: 示例类型{random.choice(string.ascii_uppercase)}"
            text += random_element
        current_tokens = count_tokens(text)
    return text, current_tokens

# Create diverse prompts of a given token length
def create_diverse_prompts(token_length, count=10):
    prompts = []
    actual_lengths = []
    for _ in range(count):
        text, actual_length = generate_text_with_target_tokens(token_length)
        prompts.append(text)
        actual_lengths.append(actual_length)
    avg_length = sum(actual_lengths) / len(actual_lengths) if actual_lengths else 0
    print(f"  Created {len(prompts)} prompts, average token length: {avg_length:.1f}")
    return prompts

# Create the results directory
def create_results_directory(base_dir="benchmark_results"):
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    result_dir = f"{base_dir}_{timestamp}"
    if not os.path.exists(result_dir):
        os.makedirs(result_dir)
    print(f"Created results directory: {result_dir}")
    # Subdirectory for per-request results
    requests_dir = os.path.join(result_dir, "requests")
    if not os.path.exists(requests_dir):
        os.makedirs(requests_dir)
    return result_dir, requests_dir
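
# Illustrative layout of the files this script writes (names taken from the
# functions below; the timestamp suffix is only an example and applies when
# --result-dir is not given):
#
#   benchmark_results_20250405_001815/
#     test_config.json
#     prompts_length_<N>.txt
#     benchmark_results.csv / benchmark_results.json
#     benchmark_total_tokens_per_second.csv
#     benchmark_avg_tokens_per_second.csv
#     tokens_per_second_metrics_info.txt
#     *_vs_concurrency.png                      (only if matplotlib is available)
#     requests/
#       token_length_<N>/
#         concurrency_<C>/
#           request_<uuid>.json
#           summary_concurrency_<C>.json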

# Send one API request to the vLLM server
async def make_request(session, prompt, request_id, max_tokens=500, min_tokens=300):
    url = f"{API_BASE}/completions"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {API_KEY}"
    }
    # Append an instruction asking the model for a detailed reply
    instruction = "\n\n请针对上述内容提供详细分析和见解至少500字的深入回应。包括多个方面的讨论、可能的影响、历史背景和未来展望。"
    augmented_prompt = prompt + instruction
    payload = {
        "model": MODEL_NAME,
        "prompt": augmented_prompt,
        "max_tokens": max(max_tokens, 500),  # make sure max_tokens is large enough
        "temperature": 0.7,
        "min_tokens": min_tokens,  # honored only if the API supports min_tokens
        "stop": None,  # no stop sequences
    }
    request_start_time = time.time()
    try:
        async with session.post(url, headers=headers, json=payload) as response:
            response_content = await response.text()
            request_end_time = time.time()
            try:
                response_json = json.loads(response_content)
                if response.status == 200:
                    input_tokens = response_json.get("usage", {}).get("prompt_tokens", 0)
                    output_tokens = response_json.get("usage", {}).get("completion_tokens", 0)
                    total_tokens = input_tokens + output_tokens
                    elapsed_time = request_end_time - request_start_time
                    # Throughput counting output tokens only
                    tokens_per_second = output_tokens / elapsed_time if elapsed_time > 0 else 0
                    # Also keep the total (input + output) throughput for reference
                    total_tokens_per_second = total_tokens / elapsed_time if elapsed_time > 0 else 0
                    # Extract the generated text
                    completion_text = response_json.get("choices", [{}])[0].get("text", "")
                    result = {
                        "request_id": request_id,
                        "input_tokens": input_tokens,
                        "output_tokens": output_tokens,
                        "total_tokens": total_tokens,
                        "elapsed_time": elapsed_time,
                        "tokens_per_second": tokens_per_second,  # output tokens only
                        "total_tokens_per_second": total_tokens_per_second,  # input + output, for reference
                        "prompt": prompt[:200] + "..." if len(prompt) > 200 else prompt,
                        "completion": completion_text[:200] + "..." if len(completion_text) > 200 else completion_text,
                        "status": "success",
                        "timestamp": datetime.now().isoformat(),
                        "full_response": response_json
                    }
                else:
                    error_msg = str(response_json)
                    print(f"Error {response.status}: {error_msg[:100]}...")
                    result = {
                        "request_id": request_id,
                        "status": "error",
                        "error_code": response.status,
                        "error": error_msg,
                        "elapsed_time": request_end_time - request_start_time,
                        "timestamp": datetime.now().isoformat(),
                        "prompt": prompt[:200] + "..." if len(prompt) > 200 else prompt
                    }
            except json.JSONDecodeError:
                result = {
                    "request_id": request_id,
                    "status": "error",
                    "error_code": response.status,
                    "error": "JSON decode error",
                    "raw_response": response_content[:500] + "..." if len(response_content) > 500 else response_content,
                    "elapsed_time": request_end_time - request_start_time,
                    "timestamp": datetime.now().isoformat(),
                    "prompt": prompt[:200] + "..." if len(prompt) > 200 else prompt
                }
    except Exception as e:
        result = {
            "request_id": request_id,
            "status": "exception",
            "error": str(e),
            "elapsed_time": time.time() - request_start_time,
            "timestamp": datetime.now().isoformat(),
            "prompt": prompt[:200] + "..." if len(prompt) > 200 else prompt
        }
        print(f"Request exception: {str(e)}")
    return result

# Save a single request's result to disk
def save_request_result(result, requests_dir):
    request_id = result["request_id"]
    file_path = os.path.join(requests_dir, f"request_{request_id}.json")
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False, indent=2)
    return file_path

# Run one batch of concurrent requests
async def run_concurrent_tests(prompts, concurrency, requests_dir, max_tokens=500, min_tokens=300):
    if len(prompts) < concurrency:
        print(f"Warning: number of prompts ({len(prompts)}) is smaller than the concurrency level ({concurrency})")
    print(f"  Launching {concurrency} concurrent requests...")
    test_start_time = time.time()
    # Disable aiohttp's default 5-minute total timeout so long generations are not cut off
    timeout = aiohttp.ClientTimeout(total=None)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        tasks = []
        for i in range(concurrency):
            # Use a different prompt for each concurrent request where possible
            prompt_idx = i % len(prompts)
            # Give every request a unique ID
            request_id = str(uuid.uuid4())
            tasks.append(make_request(session, prompts[prompt_idx], request_id, max_tokens, min_tokens))
        # Wait for all requests to finish
        results = await asyncio.gather(*tasks)
    test_end_time = time.time()
    total_test_time = test_end_time - test_start_time
    print(f"  All {concurrency} requests finished, total time: {total_test_time:.2f}s")
    # Save the detailed result of every request
    for result in results:
        save_request_result(result, requests_dir)
    # Keep only the successful requests
    successful_results = [r for r in results if r.get("status") == "success"]
    if not successful_results:
        return {
            "concurrency": concurrency,
            "total_tokens_per_second": 0,
            "avg_tokens_per_second": 0,
            "success_rate": 0,
            "total_tokens": 0,
            "total_output_tokens": 0,
            "max_elapsed_time": 0,
            "avg_elapsed_time": 0,
            "total_test_time": total_test_time
        }
    # Total number of tokens processed across all successful requests
    total_tokens = sum(r["total_tokens"] for r in successful_results)
    total_output_tokens = sum(r["output_tokens"] for r in successful_results)
    # Longest and average request durations
    max_elapsed_time = max(r["elapsed_time"] for r in successful_results)
    avg_elapsed_time = sum(r["elapsed_time"] for r in successful_results) / len(successful_results)
    # Aggregate throughput: output tokens per second across all concurrent requests,
    # measured over the whole batch (first request start to last request end)
    total_tokens_per_second = total_output_tokens / total_test_time if total_test_time > 0 else 0
    # Average per-request throughput (output tokens only, per-request elapsed time)
    avg_tokens_per_second = sum(r["tokens_per_second"] for r in successful_results) / len(successful_results)
    # Success rate
    success_rate = len(successful_results) / concurrency
    summary = {
        "concurrency": concurrency,
        "total_tokens": total_tokens,
        "total_output_tokens": total_output_tokens,
        "max_elapsed_time": max_elapsed_time,
        "avg_elapsed_time": avg_elapsed_time,
        "total_test_time": total_test_time,
        "total_tokens_per_second": total_tokens_per_second,
        "avg_tokens_per_second": avg_tokens_per_second,
        "success_rate": success_rate,
        "requests_completed": len(successful_results),
        "requests_failed": concurrency - len(successful_results)
    }
    # Save the batch summary as its own file
    summary_file = os.path.join(requests_dir, f"summary_concurrency_{concurrency}.json")
    with open(summary_file, 'w', encoding='utf-8') as f:
        json.dump(summary, f, ensure_ascii=False, indent=2)
    return summary

# Main benchmark driver
async def run_benchmark(input_token_lengths, concurrency_levels, result_dir, requests_dir, max_output_tokens=500, min_output_tokens=300, prompts_per_length=20):
    results = []
    for token_length in input_token_lengths:
        print(f"\n===== Testing input token length: ~{token_length} =====")
        # Build a diverse prompt pool for this token length;
        # generate more prompts than the largest concurrency level to keep them varied
        prompts = create_diverse_prompts(token_length, count=max(prompts_per_length, max(concurrency_levels)))
        # Save the generated prompts to a file
        prompts_file = os.path.join(result_dir, f"prompts_length_{token_length}.txt")
        with open(prompts_file, 'w', encoding='utf-8') as f:
            for i, prompt in enumerate(prompts):
                f.write(f"--- Prompt {i+1} ---\n")
                f.write(f"{prompt}\n\n")
        # Subdirectory for this token length
        token_dir = os.path.join(requests_dir, f"token_length_{token_length}")
        if not os.path.exists(token_dir):
            os.makedirs(token_dir)
        for concurrency in concurrency_levels:
            print(f"\n----- Testing concurrency: {concurrency} -----")
            # Subdirectory for this concurrency level
            concurrency_dir = os.path.join(token_dir, f"concurrency_{concurrency}")
            if not os.path.exists(concurrency_dir):
                os.makedirs(concurrency_dir)
            # Run the test at this concurrency level
            test_result = await run_concurrent_tests(prompts, concurrency, concurrency_dir, max_output_tokens, min_output_tokens)
            test_result["input_token_length"] = token_length
            results.append(test_result)
            print(f"  Total tokens/s: {test_result['total_tokens_per_second']:.2f}")
            print(f"  Average tokens/s per request: {test_result['avg_tokens_per_second']:.2f}")
            print(f"  Success rate: {test_result['success_rate'] * 100:.1f}%")
            print(f"  Average request time: {test_result['avg_elapsed_time']:.2f}s")
            # Pause between tests so the system settles
            print("  Waiting 10 seconds for the system to cool down...")
            await asyncio.sleep(10)
    return results

# Save results to files
def save_results(results, result_dir):
    # Raw results as CSV
    results_file = os.path.join(result_dir, "benchmark_results.csv")
    df = pd.DataFrame(results)
    df.to_csv(results_file, index=False)
    print(f"\nResults saved to {results_file}")
    # Pivot tables
    total_pivot_file = os.path.join(result_dir, "benchmark_total_tokens_per_second.csv")
    total_pivot = df.pivot(index="input_token_length", columns="concurrency", values="total_tokens_per_second")
    total_pivot.to_csv(total_pivot_file)
    avg_pivot_file = os.path.join(result_dir, "benchmark_avg_tokens_per_second.csv")
    avg_pivot = df.pivot(index="input_token_length", columns="concurrency", values="avg_tokens_per_second")
    avg_pivot.to_csv(avg_pivot_file)
    # Also write a note explaining that these metrics count output tokens only
    with open(os.path.join(result_dir, "tokens_per_second_metrics_info.txt"), 'w', encoding='utf-8') as f:
        f.write("Note: the tokens_per_second metrics count only the output tokens generated by the LLM, not the input tokens.\n")
        f.write("total_tokens_per_second: total output tokens across all requests / total test time\n")
        f.write("avg_tokens_per_second: mean of (output tokens / request time) over all requests\n")
    # Results in JSON format
    json_file = os.path.join(result_dir, "benchmark_results.json")
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    print("Summary tables saved to:")
    print(f"- {total_pivot_file}")
    print(f"- {avg_pivot_file}")
    print(f"- {json_file}")
    # Return the DataFrame for further analysis
    return df

# Generate charts
def generate_charts(df, result_dir, token_lengths, concurrency_levels):
    try:
        import matplotlib.pyplot as plt
        # English titles and labels are used to avoid CJK font issues
        # Total output tokens/s vs concurrency, one line per input length
        plt.figure(figsize=(12, 8))
        for token_length in token_lengths:
            subset = df[df["input_token_length"] == token_length]
            plt.plot(subset["concurrency"], subset["total_tokens_per_second"],
                     marker='o', label=f"Input Length: {token_length}")
        plt.xlabel('Concurrency')
        plt.ylabel('Total Output Tokens/s')
        plt.title('vLLM Performance: Total Output Tokens/s vs Concurrency')
        plt.grid(True)
        plt.legend()
        chart_file = os.path.join(result_dir, 'total_tokens_per_second_vs_concurrency.png')
        plt.savefig(chart_file)
        # Average per-request output tokens/s vs concurrency
        plt.figure(figsize=(12, 8))
        for token_length in token_lengths:
            subset = df[df["input_token_length"] == token_length]
            plt.plot(subset["concurrency"], subset["avg_tokens_per_second"],
                     marker='o', label=f"Input Length: {token_length}")
        plt.xlabel('Concurrency')
        plt.ylabel('Average Output Tokens/s per Request')
        plt.title('vLLM Performance: Average Output Tokens/s vs Concurrency')
        plt.grid(True)
        plt.legend()
        avg_chart_file = os.path.join(result_dir, 'avg_tokens_per_second_vs_concurrency.png')
        plt.savefig(avg_chart_file)
        # Success rate vs concurrency
        plt.figure(figsize=(12, 8))
        for token_length in token_lengths:
            subset = df[df["input_token_length"] == token_length]
            plt.plot(subset["concurrency"], subset["success_rate"] * 100,
                     marker='o', label=f"Input Length: {token_length}")
        plt.xlabel('Concurrency')
        plt.ylabel('Success Rate (%)')
        plt.title('vLLM Performance: Request Success Rate vs Concurrency')
        plt.grid(True)
        plt.ylim(0, 105)  # y axis from 0 to 105%
        plt.legend()
        success_chart_file = os.path.join(result_dir, 'success_rate_vs_concurrency.png')
        plt.savefig(success_chart_file)
        print("\nPerformance charts saved to:")
        print(f"- {chart_file}")
        print(f"- {avg_chart_file}")
        print(f"- {success_chart_file}")
    except ImportError:
        print("\nMatplotlib is not available; skipping chart generation.")

# Main entry point
async def main():
    # Declare globals so they can be overridden by command-line arguments
    global API_BASE, MODEL_NAME, API_KEY
    parser = argparse.ArgumentParser(description="vLLM server performance benchmark")
    parser.add_argument("--token-lengths", type=int, nargs="+", default=[10, 100, 500, 1000, 2000],
                        help="List of input token lengths to test")
    parser.add_argument("--concurrency-levels", type=int, nargs="+", default=[1, 2, 4, 8, 16, 32],
                        help="List of concurrency levels to test")
    parser.add_argument("--max-output-tokens", type=int, default=500,
                        help="Maximum number of output tokens per request")
    parser.add_argument("--min-output-tokens", type=int, default=300,
                        help="Minimum desired number of output tokens per request")
    parser.add_argument("--prompts-per-length", type=int, default=50,
                        help="Number of diverse prompts to generate per token length")
    parser.add_argument("--api-base", type=str, default=API_BASE,
                        help="Base URL of the vLLM API")
    parser.add_argument("--model", type=str, default=MODEL_NAME,
                        help="Model name to benchmark")
    parser.add_argument("--api-key", type=str, default=API_KEY,
                        help="API key used for authentication")
    parser.add_argument("--result-dir", type=str, default=None,
                        help="Output directory for results (defaults to a timestamped directory)")
    args = parser.parse_args()
    # Update globals from command-line arguments
    API_BASE = args.api_base
    MODEL_NAME = args.model
    API_KEY = args.api_key
    print("\n===== vLLM Performance Benchmark =====")
    print(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"API base URL: {API_BASE}")
    print(f"Model: {MODEL_NAME}")
    print(f"Input token lengths: {args.token_lengths}")
    print(f"Concurrency levels: {args.concurrency_levels}")
    print(f"Max output tokens: {args.max_output_tokens}")
    print(f"Min output tokens: {args.min_output_tokens}")
    print(f"Prompts per length: {args.prompts_per_length}")
    # Create the results directory
    if args.result_dir:
        result_dir = args.result_dir
        if not os.path.exists(result_dir):
            os.makedirs(result_dir)
        requests_dir = os.path.join(result_dir, "requests")
        if not os.path.exists(requests_dir):
            os.makedirs(requests_dir)
    else:
        result_dir, requests_dir = create_results_directory()
    print(f"Results will be saved to: {result_dir}")
    # Save the test configuration
    config = {
        "timestamp": datetime.now().isoformat(),
        "api_base": API_BASE,
        "model": MODEL_NAME,
        "token_lengths": args.token_lengths,
        "concurrency_levels": args.concurrency_levels,
        "max_output_tokens": args.max_output_tokens,
        "min_output_tokens": args.min_output_tokens,
        "prompts_per_length": args.prompts_per_length
    }
    config_file = os.path.join(result_dir, "test_config.json")
    with open(config_file, 'w', encoding='utf-8') as f:
        json.dump(config, f, ensure_ascii=False, indent=2)
    # Run the benchmark
    print("\nStarting benchmark...\n")
    results = await run_benchmark(
        args.token_lengths,
        args.concurrency_levels,
        result_dir,
        requests_dir,
        args.max_output_tokens,
        args.min_output_tokens,
        args.prompts_per_length
    )
    # Save and display the results
    df = save_results(results, result_dir)
    # Print summary tables
    print("\nTotal tokens/s by input length and concurrency level (output tokens only):")
    total_summary = df.pivot(index="input_token_length", columns="concurrency", values="total_tokens_per_second")
    print(total_summary.round(2))
    print("\nAverage tokens/s per request by input length and concurrency level (output tokens only):")
    avg_summary = df.pivot(index="input_token_length", columns="concurrency", values="avg_tokens_per_second")
    print(avg_summary.round(2))
    # Generate charts
    generate_charts(df, result_dir, args.token_lengths, args.concurrency_levels)
    print(f"\nBenchmark finished! All results saved to: {result_dir}")

if __name__ == "__main__":
    asyncio.run(main())