import asyncio import time import random import string import argparse import pandas as pd import aiohttp import json import os from datetime import datetime import uuid try: import tiktoken tokenizer = tiktoken.get_encoding("cl100k_base") HAS_TOKENIZER = True except: print("警告: tiktoken未安装,将使用近似token计数方法") HAS_TOKENIZER = False # API配置 API_BASE = "http://10.0.24.40:8000/v1" MODEL_NAME = "DeepSeek-V3-0324" API_KEY = "1111" # 计算文本token数 def count_tokens(text): if HAS_TOKENIZER: return len(tokenizer.encode(text)) else: # 近似:英文文本约4个字符/token return len(text) // 4 # 生成指定token长度的文本(会引导模型生成更长的回复) def generate_text_with_target_tokens(target_tokens): # 基础主题(用于多样性) topics = [ "人工智能的发展已经改变了各行各业的工作方式和效率。", "气候变化对全球生态系统带来了重大挑战。", "太空探索不断揭示我们宇宙的新见解。", "医学研究的进步大大提高了人类寿命。", "全球经济在数字时代面临机遇与挑战。", "文化交流一直在塑造人类历史上的社会。", "教育系统不断适应不断变化的技术环境。", "生物多样性保护仍然是重要的环境优先事项。", "城市规划必须解决可持续性和人口增长问题。", "意识哲学提出了关于存在的基本问题。" ] # 从随机主题开始 text = random.choice(topics) # 添加唯一标识符以防止KV缓存命中 unique_id = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) text = f"[会话ID: {unique_id}] {text}" current_tokens = count_tokens(text) # 用于扩展提示的附加句子 sentences = [ "这种观点近年来受到了广泛关注。", "研究人员继续探索这一现象的各个方面。", "其影响超出了直接环境。", "多种因素导致了这种复杂的动态。", "历史先例为这个问题提供了宝贵的见解。", "关于这个话题已经出现了不同的观点。", "长期影响仍然是持续辩论的主题。", "技术创新加速了这一领域的发展。", "政策考虑必须考虑到各利益相关者的利益。", "实证证据表明有几种可能的解释。", "理论框架提供了结构化的理解方法。", "实际应用继续随着新发现而发展。", "与其他领域的交叉创造了有趣的协同效应。", "文化视角为这一讨论增添了重要维度。", "经济因素在塑造结果方面起着重要作用。", "道德考虑在决策过程中仍然至关重要。", "全球背景影响了这个问题的地方表现。", "系统分析揭示了不易察觉的模式。", "替代方法已经成为可行的策略。", "平衡相互竞争的优先事项仍然是一个挑战。" ] # 添加句子直到达到目标token计数 while current_tokens < target_tokens: # 添加随机句子 sentence = random.choice(sentences) text += " " + sentence # 偶尔添加一些随机数字或文本以增加多样性 if random.random() < 0.3: random_element = f" 案例{random.randint(1000, 9999)}: 示例类型{random.choice(string.ascii_uppercase)}。" text += random_element current_tokens = count_tokens(text) return text, current_tokens # 创建指定token长度的多样化提示 def create_diverse_prompts(token_length, count=10): prompts = [] actual_lengths = [] for _ in range(count): text, actual_length = generate_text_with_target_tokens(token_length) prompts.append(text) actual_lengths.append(actual_length) avg_length = sum(actual_lengths) / len(actual_lengths) if actual_lengths else 0 print(f" 创建了{len(prompts)}个提示,平均token长度: {avg_length:.1f}") return prompts # 创建结果目录 def create_results_directory(base_dir="benchmark_results"): timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") result_dir = f"{base_dir}_{timestamp}" if not os.path.exists(result_dir): os.makedirs(result_dir) print(f"创建结果目录: {result_dir}") # 为各类结果创建子目录 requests_dir = os.path.join(result_dir, "requests") if not os.path.exists(requests_dir): os.makedirs(requests_dir) return result_dir, requests_dir # 向vLLM服务器发送API请求 async def make_request(session, prompt, request_id, max_tokens=500, min_tokens=300): url = f"{API_BASE}/completions" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {API_KEY}" } # 在提示末尾添加指令,要求模型生成详细回复 instruction = "\n\n请针对上述内容提供详细分析和见解,至少500字的深入回应。包括多个方面的讨论、可能的影响、历史背景和未来展望。" augmented_prompt = prompt + instruction payload = { "model": MODEL_NAME, "prompt": augmented_prompt, "max_tokens": max(max_tokens, 500), # 确保max_tokens足够大 "temperature": 0.7, "min_tokens": 300, # 尝试使用min_tokens (如果API支持) "stop": None, # 不设置停止标记 } request_start_time = time.time() try: async with session.post(url, headers=headers, json=payload) as response: response_content = await response.text() request_end_time = time.time() try: response_json = json.loads(response_content) if response.status == 200: input_tokens = response_json.get("usage", {}).get("prompt_tokens", 0) output_tokens = response_json.get("usage", {}).get("completion_tokens", 0) total_tokens = input_tokens + output_tokens elapsed_time = request_end_time - request_start_time # 计算吞吐量 - 只考虑输出tokens tokens_per_second = output_tokens / elapsed_time if elapsed_time > 0 else 0 # 同时保存总吞吐量(用于参考) total_tokens_per_second = total_tokens / elapsed_time if elapsed_time > 0 else 0 # 获取响应文本 completion_text = response_json.get("choices", [{}])[0].get("text", "") result = { "request_id": request_id, "input_tokens": input_tokens, "output_tokens": output_tokens, "total_tokens": total_tokens, "elapsed_time": elapsed_time, "tokens_per_second": tokens_per_second, # 只考虑输出tokens的速率 "total_tokens_per_second": total_tokens_per_second, # 总tokens速率(参考) "prompt": prompt[:200] + "..." if len(prompt) > 200 else prompt, "completion": completion_text[:200] + "..." if len(completion_text) > 200 else completion_text, "status": "success", "timestamp": datetime.now().isoformat(), "full_response": response_json } else: error_msg = str(response_json) print(f"错误 {response.status}: {error_msg[:100]}...") result = { "request_id": request_id, "status": "error", "error_code": response.status, "error": error_msg, "elapsed_time": request_end_time - request_start_time, "timestamp": datetime.now().isoformat(), "prompt": prompt[:200] + "..." if len(prompt) > 200 else prompt } except json.JSONDecodeError: result = { "request_id": request_id, "status": "error", "error_code": response.status, "error": "JSON解析错误", "raw_response": response_content[:500] + "..." if len(response_content) > 500 else response_content, "elapsed_time": request_end_time - request_start_time, "timestamp": datetime.now().isoformat(), "prompt": prompt[:200] + "..." if len(prompt) > 200 else prompt } except Exception as e: result = { "request_id": request_id, "status": "exception", "error": str(e), "elapsed_time": time.time() - request_start_time, "timestamp": datetime.now().isoformat(), "prompt": prompt[:200] + "..." if len(prompt) > 200 else prompt } print(f"请求异常: {str(e)}") return result # 保存单个请求的结果 def save_request_result(result, requests_dir): request_id = result["request_id"] file_path = os.path.join(requests_dir, f"request_{request_id}.json") with open(file_path, 'w', encoding='utf-8') as f: json.dump(result, f, ensure_ascii=False, indent=2) return file_path # 运行并发测试 async def run_concurrent_tests(prompts, concurrency, requests_dir, max_tokens=500, min_tokens=300): if len(prompts) < concurrency: print(f"警告: 提示数量({len(prompts)})不足以满足并发级别({concurrency})") print(f" 开始执行{concurrency}个并发请求...") test_start_time = time.time() async with aiohttp.ClientSession() as session: tasks = [] for i in range(concurrency): # 确保为每个并发请求使用不同的提示 prompt_idx = i % len(prompts) # 为每个请求生成唯一ID request_id = str(uuid.uuid4()) tasks.append(make_request(session, prompts[prompt_idx], request_id, max_tokens, min_tokens)) # 等待所有请求完成 results = await asyncio.gather(*tasks) test_end_time = time.time() total_test_time = test_end_time - test_start_time print(f" 所有{concurrency}个请求已完成,总耗时: {total_test_time:.2f}秒") # 保存每个请求的详细结果 for result in results: save_request_result(result, requests_dir) # 过滤出成功的请求 successful_results = [r for r in results if r.get("status") == "success"] if not successful_results: return { "concurrency": concurrency, "total_tokens_per_second": 0, "avg_tokens_per_second": 0, "success_rate": 0, "total_tokens": 0, "total_output_tokens": 0, "max_elapsed_time": 0, "avg_elapsed_time": 0, "total_test_time": total_test_time } # 计算所有请求处理的总token数 total_tokens = sum(r["total_tokens"] for r in successful_results) total_output_tokens = sum(r["output_tokens"] for r in successful_results) # 计算最长请求和平均请求时间 max_elapsed_time = max(r["elapsed_time"] for r in successful_results) avg_elapsed_time = sum(r["elapsed_time"] for r in successful_results) / len(successful_results) # 计算总吞吐量(所有并发请求的每秒token数) # 这里使用总测试时间(从第一个请求开始到最后一个请求结束) total_tokens_per_second = total_output_tokens / total_test_time if total_test_time > 0 else 0 # 计算每个请求的平均吞吐量 - 只考虑输出tokens # 这里使用每个请求的单独耗时 avg_tokens_per_second = sum(r["tokens_per_second"] for r in successful_results) / len(successful_results) # 成功率 success_rate = len(successful_results) / concurrency summary = { "concurrency": concurrency, "total_tokens": total_tokens, "total_output_tokens": total_output_tokens, "max_elapsed_time": max_elapsed_time, "avg_elapsed_time": avg_elapsed_time, "total_test_time": total_test_time, "total_tokens_per_second": total_tokens_per_second, "avg_tokens_per_second": avg_tokens_per_second, "success_rate": success_rate, "requests_completed": len(successful_results), "requests_failed": concurrency - len(successful_results) } # 将测试摘要保存为单独的文件 summary_file = os.path.join(requests_dir, f"summary_concurrency_{concurrency}.json") with open(summary_file, 'w', encoding='utf-8') as f: json.dump(summary, f, ensure_ascii=False, indent=2) return summary # 主基准测试函数 async def run_benchmark(input_token_lengths, concurrency_levels, result_dir, requests_dir, max_output_tokens=500, min_output_tokens=300, prompts_per_length=20): results = [] for token_length in input_token_lengths: print(f"\n===== 测试输入token长度: ~{token_length} =====") # 为这个token长度创建多样化提示 # 生成比最大并发数更多的提示以确保多样性 prompts = create_diverse_prompts(token_length, count=max(prompts_per_length, max(concurrency_levels))) # 保存生成的提示到文件 prompts_file = os.path.join(result_dir, f"prompts_length_{token_length}.txt") with open(prompts_file, 'w', encoding='utf-8') as f: for i, prompt in enumerate(prompts): f.write(f"--- Prompt {i+1} ---\n") f.write(f"{prompt}\n\n") # 为这个token长度创建子目录 token_dir = os.path.join(requests_dir, f"token_length_{token_length}") if not os.path.exists(token_dir): os.makedirs(token_dir) for concurrency in concurrency_levels: print(f"\n----- 测试并发数: {concurrency} -----") # 创建并发级别子目录 concurrency_dir = os.path.join(token_dir, f"concurrency_{concurrency}") if not os.path.exists(concurrency_dir): os.makedirs(concurrency_dir) # 使用此并发级别运行测试 test_result = await run_concurrent_tests(prompts, concurrency, concurrency_dir, max_output_tokens, min_output_tokens) test_result["input_token_length"] = token_length results.append(test_result) print(f" 总tokens/s: {test_result['total_tokens_per_second']:.2f}") print(f" 每请求平均tokens/s: {test_result['avg_tokens_per_second']:.2f}") print(f" 成功率: {test_result['success_rate'] * 100:.1f}%") print(f" 平均请求时间: {test_result['avg_elapsed_time']:.2f}秒") # 测试之间添加延迟,确保系统稳定 print(f" 等待系统冷却10秒...") await asyncio.sleep(10) return results # 将结果保存到文件 def save_results(results, result_dir): # 保存原始结果CSV results_file = os.path.join(result_dir, "benchmark_results.csv") df = pd.DataFrame(results) df.to_csv(results_file, index=False) print(f"\n结果已保存到 {results_file}") # 创建数据透视表 total_pivot_file = os.path.join(result_dir, "benchmark_total_tokens_per_second.csv") total_pivot = df.pivot(index="input_token_length", columns="concurrency", values="total_tokens_per_second") total_pivot.to_csv(total_pivot_file) avg_pivot_file = os.path.join(result_dir, "benchmark_avg_tokens_per_second.csv") avg_pivot = df.pivot(index="input_token_length", columns="concurrency", values="avg_tokens_per_second") avg_pivot.to_csv(avg_pivot_file) # 另外保存一个文件,标明这些指标只考虑了输出tokens with open(os.path.join(result_dir, "tokens_per_second_metrics_info.txt"), 'w', encoding='utf-8') as f: f.write("注意:tokens_per_second指标只考虑了LLM生成的输出tokens,不包括输入tokens。\n") f.write("total_tokens_per_second: 所有请求生成的总输出tokens / 测试总耗时\n") f.write("avg_tokens_per_second: 每个请求的(输出tokens / 请求耗时)的平均值\n") # 保存JSON格式的结果 json_file = os.path.join(result_dir, "benchmark_results.json") with open(json_file, 'w', encoding='utf-8') as f: json.dump(results, f, ensure_ascii=False, indent=2) print(f"汇总表已保存到:") print(f"- {total_pivot_file}") print(f"- {avg_pivot_file}") print(f"- {json_file}") # 返回DataFrame以供进一步分析 return df # 生成图表 def generate_charts(df, result_dir, token_lengths, concurrency_levels): try: import matplotlib.pyplot as plt # 设置使用英文标题和标签,避免中文显示问题 # 绘制不同输入长度下总tokens/s与并发数的关系 plt.figure(figsize=(12, 8)) for token_length in token_lengths: subset = df[df["input_token_length"] == token_length] plt.plot(subset["concurrency"], subset["total_tokens_per_second"], marker='o', label=f"Input Length: {token_length}") plt.xlabel('Concurrency') plt.ylabel('Total Output Tokens/s') plt.title('vLLM Performance: Total Output Tokens/s vs Concurrency') plt.grid(True) plt.legend() chart_file = os.path.join(result_dir, 'total_tokens_per_second_vs_concurrency.png') plt.savefig(chart_file) # 绘制不同输入长度下平均tokens/s与并发数的关系 plt.figure(figsize=(12, 8)) for token_length in token_lengths: subset = df[df["input_token_length"] == token_length] plt.plot(subset["concurrency"], subset["avg_tokens_per_second"], marker='o', label=f"Input Length: {token_length}") plt.xlabel('Concurrency') plt.ylabel('Average Output Tokens/s per Request') plt.title('vLLM Performance: Average Output Tokens/s vs Concurrency') plt.grid(True) plt.legend() avg_chart_file = os.path.join(result_dir, 'avg_tokens_per_second_vs_concurrency.png') plt.savefig(avg_chart_file) # 绘制成功率图表 plt.figure(figsize=(12, 8)) for token_length in token_lengths: subset = df[df["input_token_length"] == token_length] plt.plot(subset["concurrency"], subset["success_rate"] * 100, marker='o', label=f"Input Length: {token_length}") plt.xlabel('Concurrency') plt.ylabel('Success Rate (%)') plt.title('vLLM Performance: Request Success Rate vs Concurrency') plt.grid(True) plt.ylim(0, 105) # 设置y轴范围为0-105% plt.legend() success_chart_file = os.path.join(result_dir, 'success_rate_vs_concurrency.png') plt.savefig(success_chart_file) print("\n性能图表已保存到:") print(f"- {chart_file}") print(f"- {avg_chart_file}") print(f"- {success_chart_file}") except ImportError: print("\nMatplotlib不可用,跳过图表生成。") # 主函数 async def main(): # 声明全局变量 global API_BASE, MODEL_NAME, API_KEY parser = argparse.ArgumentParser(description="vLLM服务器性能基准测试") parser.add_argument("--token-lengths", type=int, nargs="+", default=[10, 100, 500, 1000, 2000], help="要测试的输入token长度列表") parser.add_argument("--concurrency-levels", type=int, nargs="+", default=[1, 2, 4, 8, 16, 32], help="要测试的并发级别列表") parser.add_argument("--max-output-tokens", type=int, default=500, help="每个请求的最大输出token数") parser.add_argument("--min-output-tokens", type=int, default=300, help="每个请求的最小期望输出token数") parser.add_argument("--prompts-per-length", type=int, default=50, help="每个token长度生成的多样化提示数量") parser.add_argument("--api-base", type=str, default=API_BASE, help="vLLM API的基础URL") parser.add_argument("--model", type=str, default=MODEL_NAME, help="用于测试的模型名称") parser.add_argument("--api-key", type=str, default=API_KEY, help="用于认证的API密钥") parser.add_argument("--result-dir", type=str, default=None, help="结果输出目录 (默认为带时间戳的目录)") args = parser.parse_args() # 从命令行参数更新全局变量 API_BASE = args.api_base MODEL_NAME = args.model API_KEY = args.api_key print(f"\n===== vLLM 性能基准测试 =====") print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"API基础URL: {API_BASE}") print(f"模型: {MODEL_NAME}") print(f"输入token长度: {args.token_lengths}") print(f"并发级别: {args.concurrency_levels}") print(f"最大输出token数: {args.max_output_tokens}") print(f"最小输出token数: {args.min_output_tokens}") print(f"每长度提示数: {args.prompts_per_length}") # 创建结果目录 if args.result_dir: result_dir = args.result_dir if not os.path.exists(result_dir): os.makedirs(result_dir) requests_dir = os.path.join(result_dir, "requests") if not os.path.exists(requests_dir): os.makedirs(requests_dir) else: result_dir, requests_dir = create_results_directory() print(f"结果将保存到: {result_dir}") # 保存测试配置 config = { "timestamp": datetime.now().isoformat(), "api_base": API_BASE, "model": MODEL_NAME, "token_lengths": args.token_lengths, "concurrency_levels": args.concurrency_levels, "max_output_tokens": args.max_output_tokens, "min_output_tokens": args.min_output_tokens, "prompts_per_length": args.prompts_per_length } config_file = os.path.join(result_dir, "test_config.json") with open(config_file, 'w', encoding='utf-8') as f: json.dump(config, f, ensure_ascii=False, indent=2) # 运行基准测试 print("\n开始执行基准测试...\n") results = await run_benchmark( args.token_lengths, args.concurrency_levels, result_dir, requests_dir, args.max_output_tokens, args.min_output_tokens, args.prompts_per_length ) # 保存并显示结果 df = save_results(results, result_dir) # 打印汇总表 print("\n按输入长度和并发级别的总tokens/s汇总 (仅计算输出tokens):") total_summary = df.pivot(index="input_token_length", columns="concurrency", values="total_tokens_per_second") print(total_summary.round(2)) print("\n按输入长度和并发级别的每请求平均tokens/s汇总 (仅计算输出tokens):") avg_summary = df.pivot(index="input_token_length", columns="concurrency", values="avg_tokens_per_second") print(avg_summary.round(2)) # 生成图表 generate_charts(df, result_dir, args.token_lengths, args.concurrency_levels) print(f"\n基准测试已完成!所有结果已保存到: {result_dir}") if __name__ == "__main__": asyncio.run(main())