From e1fe8215441ddebc2f818d4b6bd8da80232839f9 Mon Sep 17 00:00:00 2001 From: FlintyLemming Date: Sat, 5 Apr 2025 00:18:15 +0800 Subject: [PATCH] Create llm-api-benchmark --- llm-api-benchmark/readme.md | 126 +++++++ llm-api-benchmark/vllm_benchmark.py | 565 ++++++++++++++++++++++++++++ 2 files changed, 691 insertions(+) create mode 100644 llm-api-benchmark/readme.md create mode 100644 llm-api-benchmark/vllm_benchmark.py diff --git a/llm-api-benchmark/readme.md b/llm-api-benchmark/readme.md new file mode 100644 index 0000000..94aad9f --- /dev/null +++ b/llm-api-benchmark/readme.md @@ -0,0 +1,126 @@ +# LLM API 性能基准测试脚本 + +## 概述 + +本 Python 脚本旨在使用 `asyncio` 和 `aiohttp` 对 vLLM(或兼容 OpenAI API 格式的)服务器进行性能基准测试。它的核心目标是通过解决常见的基准测试陷阱,生成更真实、更稳健的性能评估结果。 + +该脚本通过测试服务器在不同**输入提示长度**和**并发级别**下的表现,帮助了解服务器在不同负载条件下的扩展性。 + +## 主要特点 + +此脚本包含多项特性,以确保基准测试结果更具参考价值: + +1. **通过多样化提示避免缓存 (Cache Avoidance):** + * 针对每个测试的输入 Token 长度,脚本会生成一批*独特*的提示。 + * 每个提示都包含一个随机的唯一标识符 (`[会话ID: ...]`),并结合了不同的主题句和随机元素。 + * 在并发测试期间,尽可能为不同的并发请求分配不同的提示。 + * 这显著降低了命中服务器端 KV 缓存优化的可能性,从而更好地衡量原始生成性能,而非缓存读取速度。 + +2. **控制最小输出长度 (Controlled Output Length):** + * 脚本通过增强提示(例如加入:“请针对上述内容提供详细分析和见解...”)明确指示语言模型生成详细、较长的回复。 + * 利用 `max_tokens` 参数,并尝试使用 `min_tokens` 参数(如果 API 端点支持),以鼓励模型产生足够长的输出。 + * 这可以防止因模型快速生成极短、无意义回复而导致 `Token/秒` 指标虚高。测试重点在于衡量有意义的生成任务的吞吐量。 + +3. **可变输入提示长度 (Variable Input Lengths):** + * 基准测试可以配置为在一系列不同的输入提示 Token 长度(例如 `[10, 100, 500, 1000, 2000]`)上运行。 + * 测试不同的输入大小至关重要,因为 LLM 的性能(尤其是首个 Token 生成时间和整体吞吐量)会因所提供的上下文大小而显著变化。这避免了仅测试极短提示(例如 < 50 Tokens)可能带来的误导性结果。 + +4. **异步并发请求:** 使用 `asyncio` 和 `aiohttp` 高效处理高并发请求。 +5. **可配置参数:** 通过命令行参数轻松配置 API 端点、模型名称、API 密钥、Token 长度、并发级别和输出 Token 限制。 +6. **详细结果输出:** 以多种格式保存结果: + * 原始 CSV (`benchmark_results.csv`) + * 便于比较的透视表 (`benchmark_total_tokens_per_second.csv`, `benchmark_avg_tokens_per_second.csv`) + * 整体 JSON 摘要 (`benchmark_results.json`) + * 用于细粒度分析和调试的单个请求详情 (`requests/` 子目录中的 JSON 文件) + * 测试配置 (`test_config.json`) + * 生成的提示 (`prompts_length_*.txt`) +7. **侧重输出的吞吐量指标:** 主要基于生成的**输出 Tokens** 计算 `tokens_per_second`,更清晰地衡量模型的生成速度。同时报告了总吞吐量(所有并发请求)和平均每请求吞吐量。 +8. **可选性能图表:** 如果安装了 `matplotlib`,则生成可视化性能与并发关系的图表。 +9. **分词器灵活性:** 如果 `tiktoken` 可用,则使用它进行精确的 Token 计数;否则回退到基于字符的近似计数。 + +## 环境要求 + +* Python 3.7+ +* 所需的 Python 库: + * `aiohttp` + * `pandas` + * `tiktoken` (可选,强烈推荐用于精确 Token 计数) + * `matplotlib` (可选,用于生成性能图表) + +## 安装 + +克隆仓库或下载脚本文件。然后安装必要的库: + +```bash +pip install aiohttp pandas tiktoken matplotlib +``` + +# 或者,不安装可选库: +# pip install aiohttp pandas + +## 配置 + +您可以在脚本顶部直接修改配置参数,或者更灵活地通过命令行参数进行配置。关键参数包括: + +* `API_BASE`: 您的 vLLM 或兼容 API 端点的基础 URL (例如 `http://localhost:8000/v1`)。 +* `MODEL_NAME`: 部署的模型名称 (例如 `DeepSeek-V3-0324`)。 +* `API_KEY`: 您的 API 密钥 (如果服务器需要)。 + +## 使用方法 + +在终端中运行脚本: `python your_script_name.py` + +**命令行参数:** + +* `--token-lengths`: 要测试的输入 Token 长度列表 (例如 `--token-lengths 100 500 1000`)。默认: `[10, 100, 500, 1000, 2000]` +* `--concurrency-levels`: 要测试的并发级别(同时请求数)列表 (例如 `--concurrency-levels 1 4 8 16`)。默认: `[1, 2, 4, 8, 16, 32]` +* `--max-output-tokens`: 每个请求生成的最大 Token 数。默认: `500` +* `--min-output-tokens`: 每个请求期望生成的最小 Token 数(用于提示增强,如果 API 支持也作为参数)。默认: `300` +* `--prompts-per-length`: 为每个 Token 长度生成多少个不同的提示。默认: `50` +* `--api-base`: 覆盖默认的 API 基础 URL。 +* `--model`: 覆盖默认的模型名称。 +* `--api-key`: 覆盖默认的 API 密钥。 +* `--result-dir`: 指定保存结果的目录 (否则将创建一个带时间戳的目录)。 + +**示例:** + +```bash +python benchmark_vllm.py \ + --token-lengths 100 500 1000 2000 \ + --concurrency-levels 1 4 8 16 32 64 \ + --max-output-tokens 1024 \ + --min-output-tokens 500 \ + --api-base http://10.10.10.10:8000/v1 \ + --model DeepSeek-V3-0324 \ + --api-key "your-optional-api-key" \ + --result-dir ./my_benchmark_run_1 +``` + +## 输出内容 + +脚本将创建一个结果目录(由 `--result-dir` 指定或名为 `benchmark_results_YYYYMMDD_HHMMSS`)。在此目录中,您将找到: + +* `benchmark_results.csv`: 每个输入长度和并发组合的详细结果。 +* `benchmark_total_tokens_per_second.csv`: 显示总输出 Tokens/秒 的透视表。 +* `benchmark_avg_tokens_per_second.csv`: 显示平均每请求输出 Tokens/秒 的透视表。 +* `benchmark_results.json`: JSON 格式的所有结果数据。 +* `test_config.json`: 本次基准测试运行使用的配置参数。 +* `tokens_per_second_metrics_info.txt`: 关于 Tokens/秒 指标如何计算的说明。 +* `prompts_length_*.txt`: 为每个输入长度生成并用于测试的实际提示文本。 +* `requests/`: 一个子目录,包含每个 `token_length_X` 和 `concurrency_Y` 的嵌套文件夹,其中包含每个 API 请求的详细日志(单个 `request_*.json` 文件)。 +* `*.png` 文件 (如果安装了 `matplotlib`): 可视化图表: + * 总输出 Tokens/秒 vs. 并发数 + * 平均每请求输出 Tokens/秒 vs. 并发数 + * 成功率 vs. 并发数 + +## 关键指标说明 + +* **`total_tokens_per_second` (总吞吐量)**: 计算方式为(所有成功的并发请求生成的 `output_tokens` 之和)/(整个并发批次的总耗时)。这代表了服务器在该特定负载下的整体处理能力。 +* **`avg_tokens_per_second` (平均每请求吞吐量)**: 计算方式为批次内每个成功请求的 (`output_tokens` / `elapsed_time`) 的平均值。这代表了在该负载下单个客户端请求所体验到的平均生成速度。 +* **注意:** 默认情况下,这两个指标都侧重于 **输出 Tokens**,因为这通常反映了用户感知的生成速度。API 的原始响应(保存在单个请求的 JSON 文件 `full_response` 中)也包含输入 Token 计数 (`usage.prompt_tokens`)。 + +## 注意事项 + +* Token 计数的准确性依赖于 API 服务器在响应的 `usage` 字段中正确返回 `prompt_tokens` 和 `completion_tokens`。 +* `min_tokens` 参数的有效性取决于目标 API 端点是否支持它。提示增强是作为一种备用策略。 +* 如果未安装 `tiktoken`,Token 计数将基于字符长度进行近似估算,这对于非英语文本或代码来说准确性较低。 \ No newline at end of file diff --git a/llm-api-benchmark/vllm_benchmark.py b/llm-api-benchmark/vllm_benchmark.py new file mode 100644 index 0000000..76487ae --- /dev/null +++ b/llm-api-benchmark/vllm_benchmark.py @@ -0,0 +1,565 @@ +import asyncio +import time +import random +import string +import argparse +import pandas as pd +import aiohttp +import json +import os +from datetime import datetime +import uuid + +try: + import tiktoken + tokenizer = tiktoken.get_encoding("cl100k_base") + HAS_TOKENIZER = True +except: + print("警告: tiktoken未安装,将使用近似token计数方法") + HAS_TOKENIZER = False + +# API配置 +API_BASE = "http://10.0.24.40:8000/v1" +MODEL_NAME = "DeepSeek-V3-0324" +API_KEY = "1111" + +# 计算文本token数 +def count_tokens(text): + if HAS_TOKENIZER: + return len(tokenizer.encode(text)) + else: + # 近似:英文文本约4个字符/token + return len(text) // 4 + +# 生成指定token长度的文本(会引导模型生成更长的回复) +def generate_text_with_target_tokens(target_tokens): + # 基础主题(用于多样性) + topics = [ + "人工智能的发展已经改变了各行各业的工作方式和效率。", + "气候变化对全球生态系统带来了重大挑战。", + "太空探索不断揭示我们宇宙的新见解。", + "医学研究的进步大大提高了人类寿命。", + "全球经济在数字时代面临机遇与挑战。", + "文化交流一直在塑造人类历史上的社会。", + "教育系统不断适应不断变化的技术环境。", + "生物多样性保护仍然是重要的环境优先事项。", + "城市规划必须解决可持续性和人口增长问题。", + "意识哲学提出了关于存在的基本问题。" + ] + + # 从随机主题开始 + text = random.choice(topics) + + # 添加唯一标识符以防止KV缓存命中 + unique_id = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) + text = f"[会话ID: {unique_id}] {text}" + + current_tokens = count_tokens(text) + + # 用于扩展提示的附加句子 + sentences = [ + "这种观点近年来受到了广泛关注。", + "研究人员继续探索这一现象的各个方面。", + "其影响超出了直接环境。", + "多种因素导致了这种复杂的动态。", + "历史先例为这个问题提供了宝贵的见解。", + "关于这个话题已经出现了不同的观点。", + "长期影响仍然是持续辩论的主题。", + "技术创新加速了这一领域的发展。", + "政策考虑必须考虑到各利益相关者的利益。", + "实证证据表明有几种可能的解释。", + "理论框架提供了结构化的理解方法。", + "实际应用继续随着新发现而发展。", + "与其他领域的交叉创造了有趣的协同效应。", + "文化视角为这一讨论增添了重要维度。", + "经济因素在塑造结果方面起着重要作用。", + "道德考虑在决策过程中仍然至关重要。", + "全球背景影响了这个问题的地方表现。", + "系统分析揭示了不易察觉的模式。", + "替代方法已经成为可行的策略。", + "平衡相互竞争的优先事项仍然是一个挑战。" + ] + + # 添加句子直到达到目标token计数 + while current_tokens < target_tokens: + # 添加随机句子 + sentence = random.choice(sentences) + text += " " + sentence + + # 偶尔添加一些随机数字或文本以增加多样性 + if random.random() < 0.3: + random_element = f" 案例{random.randint(1000, 9999)}: 示例类型{random.choice(string.ascii_uppercase)}。" + text += random_element + + current_tokens = count_tokens(text) + + return text, current_tokens + +# 创建指定token长度的多样化提示 +def create_diverse_prompts(token_length, count=10): + prompts = [] + actual_lengths = [] + + for _ in range(count): + text, actual_length = generate_text_with_target_tokens(token_length) + prompts.append(text) + actual_lengths.append(actual_length) + + avg_length = sum(actual_lengths) / len(actual_lengths) if actual_lengths else 0 + print(f" 创建了{len(prompts)}个提示,平均token长度: {avg_length:.1f}") + + return prompts + +# 创建结果目录 +def create_results_directory(base_dir="benchmark_results"): + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + result_dir = f"{base_dir}_{timestamp}" + + if not os.path.exists(result_dir): + os.makedirs(result_dir) + print(f"创建结果目录: {result_dir}") + + # 为各类结果创建子目录 + requests_dir = os.path.join(result_dir, "requests") + if not os.path.exists(requests_dir): + os.makedirs(requests_dir) + + return result_dir, requests_dir + +# 向vLLM服务器发送API请求 +async def make_request(session, prompt, request_id, max_tokens=500, min_tokens=300): + url = f"{API_BASE}/completions" + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {API_KEY}" + } + + # 在提示末尾添加指令,要求模型生成详细回复 + instruction = "\n\n请针对上述内容提供详细分析和见解,至少500字的深入回应。包括多个方面的讨论、可能的影响、历史背景和未来展望。" + augmented_prompt = prompt + instruction + + payload = { + "model": MODEL_NAME, + "prompt": augmented_prompt, + "max_tokens": max(max_tokens, 500), # 确保max_tokens足够大 + "temperature": 0.7, + "min_tokens": 300, # 尝试使用min_tokens (如果API支持) + "stop": None, # 不设置停止标记 + } + + request_start_time = time.time() + + try: + async with session.post(url, headers=headers, json=payload) as response: + response_content = await response.text() + request_end_time = time.time() + + try: + response_json = json.loads(response_content) + + if response.status == 200: + input_tokens = response_json.get("usage", {}).get("prompt_tokens", 0) + output_tokens = response_json.get("usage", {}).get("completion_tokens", 0) + total_tokens = input_tokens + output_tokens + elapsed_time = request_end_time - request_start_time + + # 计算吞吐量 - 只考虑输出tokens + tokens_per_second = output_tokens / elapsed_time if elapsed_time > 0 else 0 + + # 同时保存总吞吐量(用于参考) + total_tokens_per_second = total_tokens / elapsed_time if elapsed_time > 0 else 0 + + # 获取响应文本 + completion_text = response_json.get("choices", [{}])[0].get("text", "") + + result = { + "request_id": request_id, + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "total_tokens": total_tokens, + "elapsed_time": elapsed_time, + "tokens_per_second": tokens_per_second, # 只考虑输出tokens的速率 + "total_tokens_per_second": total_tokens_per_second, # 总tokens速率(参考) + "prompt": prompt[:200] + "..." if len(prompt) > 200 else prompt, + "completion": completion_text[:200] + "..." if len(completion_text) > 200 else completion_text, + "status": "success", + "timestamp": datetime.now().isoformat(), + "full_response": response_json + } + else: + error_msg = str(response_json) + print(f"错误 {response.status}: {error_msg[:100]}...") + result = { + "request_id": request_id, + "status": "error", + "error_code": response.status, + "error": error_msg, + "elapsed_time": request_end_time - request_start_time, + "timestamp": datetime.now().isoformat(), + "prompt": prompt[:200] + "..." if len(prompt) > 200 else prompt + } + except json.JSONDecodeError: + result = { + "request_id": request_id, + "status": "error", + "error_code": response.status, + "error": "JSON解析错误", + "raw_response": response_content[:500] + "..." if len(response_content) > 500 else response_content, + "elapsed_time": request_end_time - request_start_time, + "timestamp": datetime.now().isoformat(), + "prompt": prompt[:200] + "..." if len(prompt) > 200 else prompt + } + except Exception as e: + result = { + "request_id": request_id, + "status": "exception", + "error": str(e), + "elapsed_time": time.time() - request_start_time, + "timestamp": datetime.now().isoformat(), + "prompt": prompt[:200] + "..." if len(prompt) > 200 else prompt + } + print(f"请求异常: {str(e)}") + + return result + +# 保存单个请求的结果 +def save_request_result(result, requests_dir): + request_id = result["request_id"] + file_path = os.path.join(requests_dir, f"request_{request_id}.json") + + with open(file_path, 'w', encoding='utf-8') as f: + json.dump(result, f, ensure_ascii=False, indent=2) + + return file_path + +# 运行并发测试 +async def run_concurrent_tests(prompts, concurrency, requests_dir, max_tokens=500, min_tokens=300): + if len(prompts) < concurrency: + print(f"警告: 提示数量({len(prompts)})不足以满足并发级别({concurrency})") + + print(f" 开始执行{concurrency}个并发请求...") + test_start_time = time.time() + + async with aiohttp.ClientSession() as session: + tasks = [] + for i in range(concurrency): + # 确保为每个并发请求使用不同的提示 + prompt_idx = i % len(prompts) + # 为每个请求生成唯一ID + request_id = str(uuid.uuid4()) + tasks.append(make_request(session, prompts[prompt_idx], request_id, max_tokens, min_tokens)) + + # 等待所有请求完成 + results = await asyncio.gather(*tasks) + + test_end_time = time.time() + total_test_time = test_end_time - test_start_time + + print(f" 所有{concurrency}个请求已完成,总耗时: {total_test_time:.2f}秒") + + # 保存每个请求的详细结果 + for result in results: + save_request_result(result, requests_dir) + + # 过滤出成功的请求 + successful_results = [r for r in results if r.get("status") == "success"] + + if not successful_results: + return { + "concurrency": concurrency, + "total_tokens_per_second": 0, + "avg_tokens_per_second": 0, + "success_rate": 0, + "total_tokens": 0, + "total_output_tokens": 0, + "max_elapsed_time": 0, + "avg_elapsed_time": 0, + "total_test_time": total_test_time + } + + # 计算所有请求处理的总token数 + total_tokens = sum(r["total_tokens"] for r in successful_results) + total_output_tokens = sum(r["output_tokens"] for r in successful_results) + + # 计算最长请求和平均请求时间 + max_elapsed_time = max(r["elapsed_time"] for r in successful_results) + avg_elapsed_time = sum(r["elapsed_time"] for r in successful_results) / len(successful_results) + + # 计算总吞吐量(所有并发请求的每秒token数) + # 这里使用总测试时间(从第一个请求开始到最后一个请求结束) + total_tokens_per_second = total_output_tokens / total_test_time if total_test_time > 0 else 0 + + # 计算每个请求的平均吞吐量 - 只考虑输出tokens + # 这里使用每个请求的单独耗时 + avg_tokens_per_second = sum(r["tokens_per_second"] for r in successful_results) / len(successful_results) + + # 成功率 + success_rate = len(successful_results) / concurrency + + summary = { + "concurrency": concurrency, + "total_tokens": total_tokens, + "total_output_tokens": total_output_tokens, + "max_elapsed_time": max_elapsed_time, + "avg_elapsed_time": avg_elapsed_time, + "total_test_time": total_test_time, + "total_tokens_per_second": total_tokens_per_second, + "avg_tokens_per_second": avg_tokens_per_second, + "success_rate": success_rate, + "requests_completed": len(successful_results), + "requests_failed": concurrency - len(successful_results) + } + + # 将测试摘要保存为单独的文件 + summary_file = os.path.join(requests_dir, f"summary_concurrency_{concurrency}.json") + with open(summary_file, 'w', encoding='utf-8') as f: + json.dump(summary, f, ensure_ascii=False, indent=2) + + return summary + +# 主基准测试函数 +async def run_benchmark(input_token_lengths, concurrency_levels, result_dir, requests_dir, max_output_tokens=500, min_output_tokens=300, prompts_per_length=20): + results = [] + + for token_length in input_token_lengths: + print(f"\n===== 测试输入token长度: ~{token_length} =====") + + # 为这个token长度创建多样化提示 + # 生成比最大并发数更多的提示以确保多样性 + prompts = create_diverse_prompts(token_length, count=max(prompts_per_length, max(concurrency_levels))) + + # 保存生成的提示到文件 + prompts_file = os.path.join(result_dir, f"prompts_length_{token_length}.txt") + with open(prompts_file, 'w', encoding='utf-8') as f: + for i, prompt in enumerate(prompts): + f.write(f"--- Prompt {i+1} ---\n") + f.write(f"{prompt}\n\n") + + # 为这个token长度创建子目录 + token_dir = os.path.join(requests_dir, f"token_length_{token_length}") + if not os.path.exists(token_dir): + os.makedirs(token_dir) + + for concurrency in concurrency_levels: + print(f"\n----- 测试并发数: {concurrency} -----") + + # 创建并发级别子目录 + concurrency_dir = os.path.join(token_dir, f"concurrency_{concurrency}") + if not os.path.exists(concurrency_dir): + os.makedirs(concurrency_dir) + + # 使用此并发级别运行测试 + test_result = await run_concurrent_tests(prompts, concurrency, concurrency_dir, max_output_tokens, min_output_tokens) + test_result["input_token_length"] = token_length + results.append(test_result) + + print(f" 总tokens/s: {test_result['total_tokens_per_second']:.2f}") + print(f" 每请求平均tokens/s: {test_result['avg_tokens_per_second']:.2f}") + print(f" 成功率: {test_result['success_rate'] * 100:.1f}%") + print(f" 平均请求时间: {test_result['avg_elapsed_time']:.2f}秒") + + # 测试之间添加延迟,确保系统稳定 + print(f" 等待系统冷却10秒...") + await asyncio.sleep(10) + + return results + +# 将结果保存到文件 +def save_results(results, result_dir): + # 保存原始结果CSV + results_file = os.path.join(result_dir, "benchmark_results.csv") + df = pd.DataFrame(results) + df.to_csv(results_file, index=False) + print(f"\n结果已保存到 {results_file}") + + # 创建数据透视表 + total_pivot_file = os.path.join(result_dir, "benchmark_total_tokens_per_second.csv") + total_pivot = df.pivot(index="input_token_length", columns="concurrency", values="total_tokens_per_second") + total_pivot.to_csv(total_pivot_file) + + avg_pivot_file = os.path.join(result_dir, "benchmark_avg_tokens_per_second.csv") + avg_pivot = df.pivot(index="input_token_length", columns="concurrency", values="avg_tokens_per_second") + avg_pivot.to_csv(avg_pivot_file) + + # 另外保存一个文件,标明这些指标只考虑了输出tokens + with open(os.path.join(result_dir, "tokens_per_second_metrics_info.txt"), 'w', encoding='utf-8') as f: + f.write("注意:tokens_per_second指标只考虑了LLM生成的输出tokens,不包括输入tokens。\n") + f.write("total_tokens_per_second: 所有请求生成的总输出tokens / 测试总耗时\n") + f.write("avg_tokens_per_second: 每个请求的(输出tokens / 请求耗时)的平均值\n") + + # 保存JSON格式的结果 + json_file = os.path.join(result_dir, "benchmark_results.json") + with open(json_file, 'w', encoding='utf-8') as f: + json.dump(results, f, ensure_ascii=False, indent=2) + + print(f"汇总表已保存到:") + print(f"- {total_pivot_file}") + print(f"- {avg_pivot_file}") + print(f"- {json_file}") + + # 返回DataFrame以供进一步分析 + return df + +# 生成图表 +def generate_charts(df, result_dir, token_lengths, concurrency_levels): + try: + import matplotlib.pyplot as plt + + # 设置使用英文标题和标签,避免中文显示问题 + + # 绘制不同输入长度下总tokens/s与并发数的关系 + plt.figure(figsize=(12, 8)) + for token_length in token_lengths: + subset = df[df["input_token_length"] == token_length] + plt.plot(subset["concurrency"], subset["total_tokens_per_second"], + marker='o', label=f"Input Length: {token_length}") + + plt.xlabel('Concurrency') + plt.ylabel('Total Output Tokens/s') + plt.title('vLLM Performance: Total Output Tokens/s vs Concurrency') + plt.grid(True) + plt.legend() + chart_file = os.path.join(result_dir, 'total_tokens_per_second_vs_concurrency.png') + plt.savefig(chart_file) + + # 绘制不同输入长度下平均tokens/s与并发数的关系 + plt.figure(figsize=(12, 8)) + for token_length in token_lengths: + subset = df[df["input_token_length"] == token_length] + plt.plot(subset["concurrency"], subset["avg_tokens_per_second"], + marker='o', label=f"Input Length: {token_length}") + + plt.xlabel('Concurrency') + plt.ylabel('Average Output Tokens/s per Request') + plt.title('vLLM Performance: Average Output Tokens/s vs Concurrency') + plt.grid(True) + plt.legend() + avg_chart_file = os.path.join(result_dir, 'avg_tokens_per_second_vs_concurrency.png') + plt.savefig(avg_chart_file) + + # 绘制成功率图表 + plt.figure(figsize=(12, 8)) + for token_length in token_lengths: + subset = df[df["input_token_length"] == token_length] + plt.plot(subset["concurrency"], subset["success_rate"] * 100, + marker='o', label=f"Input Length: {token_length}") + + plt.xlabel('Concurrency') + plt.ylabel('Success Rate (%)') + plt.title('vLLM Performance: Request Success Rate vs Concurrency') + plt.grid(True) + plt.ylim(0, 105) # 设置y轴范围为0-105% + plt.legend() + success_chart_file = os.path.join(result_dir, 'success_rate_vs_concurrency.png') + plt.savefig(success_chart_file) + + print("\n性能图表已保存到:") + print(f"- {chart_file}") + print(f"- {avg_chart_file}") + print(f"- {success_chart_file}") + except ImportError: + print("\nMatplotlib不可用,跳过图表生成。") + +# 主函数 +async def main(): + # 声明全局变量 + global API_BASE, MODEL_NAME, API_KEY + + parser = argparse.ArgumentParser(description="vLLM服务器性能基准测试") + parser.add_argument("--token-lengths", type=int, nargs="+", default=[10, 100, 500, 1000, 2000], + help="要测试的输入token长度列表") + parser.add_argument("--concurrency-levels", type=int, nargs="+", default=[1, 2, 4, 8, 16, 32], + help="要测试的并发级别列表") + parser.add_argument("--max-output-tokens", type=int, default=500, + help="每个请求的最大输出token数") + parser.add_argument("--min-output-tokens", type=int, default=300, + help="每个请求的最小期望输出token数") + parser.add_argument("--prompts-per-length", type=int, default=50, + help="每个token长度生成的多样化提示数量") + parser.add_argument("--api-base", type=str, default=API_BASE, + help="vLLM API的基础URL") + parser.add_argument("--model", type=str, default=MODEL_NAME, + help="用于测试的模型名称") + parser.add_argument("--api-key", type=str, default=API_KEY, + help="用于认证的API密钥") + parser.add_argument("--result-dir", type=str, default=None, + help="结果输出目录 (默认为带时间戳的目录)") + + args = parser.parse_args() + + # 从命令行参数更新全局变量 + API_BASE = args.api_base + MODEL_NAME = args.model + API_KEY = args.api_key + + print(f"\n===== vLLM 性能基准测试 =====") + print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print(f"API基础URL: {API_BASE}") + print(f"模型: {MODEL_NAME}") + print(f"输入token长度: {args.token_lengths}") + print(f"并发级别: {args.concurrency_levels}") + print(f"最大输出token数: {args.max_output_tokens}") + print(f"最小输出token数: {args.min_output_tokens}") + print(f"每长度提示数: {args.prompts_per_length}") + + # 创建结果目录 + if args.result_dir: + result_dir = args.result_dir + if not os.path.exists(result_dir): + os.makedirs(result_dir) + requests_dir = os.path.join(result_dir, "requests") + if not os.path.exists(requests_dir): + os.makedirs(requests_dir) + else: + result_dir, requests_dir = create_results_directory() + + print(f"结果将保存到: {result_dir}") + + # 保存测试配置 + config = { + "timestamp": datetime.now().isoformat(), + "api_base": API_BASE, + "model": MODEL_NAME, + "token_lengths": args.token_lengths, + "concurrency_levels": args.concurrency_levels, + "max_output_tokens": args.max_output_tokens, + "min_output_tokens": args.min_output_tokens, + "prompts_per_length": args.prompts_per_length + } + + config_file = os.path.join(result_dir, "test_config.json") + with open(config_file, 'w', encoding='utf-8') as f: + json.dump(config, f, ensure_ascii=False, indent=2) + + # 运行基准测试 + print("\n开始执行基准测试...\n") + results = await run_benchmark( + args.token_lengths, + args.concurrency_levels, + result_dir, + requests_dir, + args.max_output_tokens, + args.min_output_tokens, + args.prompts_per_length + ) + + # 保存并显示结果 + df = save_results(results, result_dir) + + # 打印汇总表 + print("\n按输入长度和并发级别的总tokens/s汇总 (仅计算输出tokens):") + total_summary = df.pivot(index="input_token_length", columns="concurrency", values="total_tokens_per_second") + print(total_summary.round(2)) + + print("\n按输入长度和并发级别的每请求平均tokens/s汇总 (仅计算输出tokens):") + avg_summary = df.pivot(index="input_token_length", columns="concurrency", values="avg_tokens_per_second") + print(avg_summary.round(2)) + + # 生成图表 + generate_charts(df, result_dir, args.token_lengths, args.concurrency_levels) + + print(f"\n基准测试已完成!所有结果已保存到: {result_dir}") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file