Nanbeige4.1-3B vLLM批量推理教程:支持16并发请求的吞吐压测方案

张开发
2026/4/14 0:47:08 15 分钟阅读

分享文章

Nanbeige4.1-3B vLLM批量推理教程:支持16并发请求的吞吐压测方案
Nanbeige4.1-3B vLLM批量推理教程支持16并发请求的吞吐压测方案1. 引言为什么需要关注批量推理性能如果你已经成功部署了Nanbeige4.1-3B模型并且通过Chainlit前端验证了它的基础对话能力那么恭喜你你已经迈出了第一步。但接下来一个更实际的问题摆在我们面前这个部署好的模型在实际应用中能承受多大的访问压力想象一下这样的场景你的应用上线后突然有几十个用户同时提问模型会不会卡顿响应时间会不会从几秒变成几十秒这就是我们今天要解决的问题——如何对部署好的模型进行批量推理压力测试确保它能在真实的生产环境中稳定运行。本文将带你从零开始为使用vLLM部署的Nanbeige4.1-3B模型设计并实施一套支持16个并发请求的吞吐量压测方案。无论你是开发者、运维工程师还是对模型性能感兴趣的技术爱好者都能通过本教程掌握实用的压测技能。2. 压测前的准备工作2.1 确认你的部署环境在开始压测之前我们需要先确认几个关键信息。请打开你的WebShell执行以下命令# 查看模型服务日志确认服务状态 cat /root/workspace/llm.log | tail -20你应该能看到类似这样的输出表明vLLM服务正在运行INFO 01-01 10:00:00 llm_engine.py:200] Initializing an LLM engine... INFO 01-01 10:00:05 llm_engine.py:350] Model loaded successfully. INFO 01-01 10:00:05 api_server.py:100] Starting API server on port 8000...2.2 了解vLLM的API接口vLLM部署后会提供一个标准的OpenAI兼容的API接口这是我们进行压测的基础。主要用到的接口有两个健康检查接口GET /health文本生成接口POST /v1/completions你可以先手动测试一下接口是否正常# 测试健康检查 curl http://localhost:8000/health # 测试单次文本生成 curl http://localhost:8000/v1/completions \ -H Content-Type: application/json \ -d { model: nanbeige-4.1-3b, prompt: Hello, how are you?, max_tokens: 50 }如果两个请求都能正常返回说明API服务运行正常可以开始准备压测了。2.3 安装压测工具我们将使用Python的asyncio和aiohttp库来编写并发请求脚本。如果你的环境还没有这些库需要先安装pip install aiohttp httpx numpy pandas3. 设计16并发压测方案3.1 理解并发压测的核心概念在开始写代码之前我们先搞清楚几个关键概念并发数同时向服务器发送的请求数量。我们目标是16并发。吞吐量单位时间内服务器处理的请求数量通常用请求/秒或令牌/秒来衡量。延迟从发送请求到收到完整响应的时间。思考时间在真实场景中用户不会毫不停歇地发送请求所以我们需要模拟用户思考的间隔。3.2 设计压测脚本的结构我们的压测脚本需要包含以下几个部分请求生成器创建不同长度和内容的测试提示词并发控制器控制16个请求同时发送结果收集器记录每个请求的响应时间、成功/失败状态数据分析器计算吞吐量、平均延迟、错误率等关键指标3.3 准备测试数据集为了模拟真实场景我们需要准备一些有代表性的测试提示词。这里我准备了一个简单的提示词生成函数import random def generate_test_prompts(num_prompts100): 生成测试用的提示词列表 # 基础问题模板 templates [ 请用中文回答{}, 请详细解释一下{}, 如何{}请给出具体步骤。, 比较一下{}和{}的优缺点。, 写一篇关于{}的短文字数在200字左右。 ] # 主题词库 topics [ 人工智能, 机器学习, 深度学习, 自然语言处理, 计算机视觉, 大语言模型, 神经网络, 强化学习, Python编程, 数据科学, 算法设计, 软件开发 ] prompts [] for i in range(num_prompts): template random.choice(templates) if {} in template: # 根据模板中的占位符数量填充主题 placeholders template.count({}) selected_topics random.sample(topics, min(placeholders, len(topics))) prompt template.format(*selected_topics) else: prompt template prompts.append(prompt) return prompts4. 实现16并发压测脚本4.1 完整的压测脚本代码下面是一个完整的16并发压测脚本你可以直接复制使用import asyncio import aiohttp import time import json import random from typing import List, Dict, Any import numpy as np import pandas as pd from datetime import datetime class VLLMPressureTester: vLLM模型压力测试器 def __init__(self, base_url: str http://localhost:8000, model_name: str nanbeige-4.1-3b): self.base_url base_url self.model_name model_name self.results [] self.lock asyncio.Lock() async def send_single_request(self, session: aiohttp.ClientSession, prompt: str, request_id: int) - Dict[str, Any]: 发送单个请求到vLLM API start_time time.time() status success error_msg tokens_generated 0 try: # 构建请求数据 payload { model: self.model_name, prompt: prompt, max_tokens: random.randint(50, 150), # 随机生成50-150个token temperature: 0.7, top_p: 0.9, frequency_penalty: 0.1, presence_penalty: 0.1 } # 发送请求 async with session.post( f{self.base_url}/v1/completions, jsonpayload, timeoutaiohttp.ClientTimeout(total30) # 30秒超时 ) as response: end_time time.time() latency end_time - start_time if response.status 200: result await response.json() tokens_generated len(result[choices][0][text].split()) return { request_id: request_id, status: success, latency: latency, tokens: tokens_generated, prompt_length: len(prompt), start_time: start_time, end_time: end_time } else: error_msg fHTTP {response.status}: {await response.text()} status failed except asyncio.TimeoutError: status timeout error_msg 请求超时30秒 except Exception as e: status error error_msg str(e) end_time time.time() latency end_time - start_time return { request_id: request_id, status: status, latency: latency, tokens: 0, prompt_length: len(prompt), start_time: start_time, end_time: end_time, error: error_msg } async def concurrent_requests(self, prompts: List[str], concurrency: int 16) - List[Dict[str, Any]]: 并发发送多个请求 # 创建连接池 connector aiohttp.TCPConnector(limitconcurrency * 2) async with aiohttp.ClientSession(connectorconnector) as session: # 创建任务列表 tasks [] for i, prompt in enumerate(prompts[:concurrency]): # 只取前concurrency个提示词 task self.send_single_request(session, prompt, i) tasks.append(task) # 等待所有任务完成 results await asyncio.gather(*tasks, return_exceptionsTrue) # 处理结果 valid_results [] for i, result in enumerate(results): if isinstance(result, Exception): valid_results.append({ request_id: i, status: exception, latency: 0, tokens: 0, prompt_length: len(prompts[i]), error: str(result) }) else: valid_results.append(result) return valid_results def calculate_metrics(self, results: List[Dict[str, Any]]) - Dict[str, Any]: 计算性能指标 successful_results [r for r in results if r[status] success] failed_results [r for r in results if r[status] ! success] if not successful_results: return { total_requests: len(results), successful_requests: 0, failed_requests: len(failed_results), success_rate: 0.0, avg_latency: 0, min_latency: 0, max_latency: 0, throughput: 0, avg_tokens_per_second: 0 } # 计算延迟统计 latencies [r[latency] for r in successful_results] avg_latency np.mean(latencies) min_latency np.min(latencies) max_latency np.max(latencies) # 计算吞吐量请求/秒 start_times [r[start_time] for r in successful_results] end_times [r[end_time] for r in successful_results] total_time max(end_times) - min(start_times) if total_time 0: throughput len(successful_results) / total_time else: throughput 0 # 计算令牌生成速度 total_tokens sum(r[tokens] for r in successful_results) total_latency sum(r[latency] for r in successful_results) if total_latency 0: avg_tokens_per_second total_tokens / total_latency else: avg_tokens_per_second 0 return { total_requests: len(results), successful_requests: len(successful_results), failed_requests: len(failed_results), success_rate: len(successful_results) / len(results), avg_latency: avg_latency, min_latency: min_latency, max_latency: max_latency, throughput: throughput, avg_tokens_per_second: avg_tokens_per_second, total_tokens_generated: total_tokens } async def run_pressure_test(self, num_requests: int 100, concurrency: int 16, think_time: float 0.1): 运行完整的压力测试 print(f开始压力测试总请求数{num_requests}并发数{concurrency}) print( * 60) # 生成测试提示词 prompts generate_test_prompts(num_requests) all_results [] metrics_history [] # 分批发送请求模拟真实场景 for batch_start in range(0, num_requests, concurrency): batch_end min(batch_start concurrency, num_requests) batch_prompts prompts[batch_start:batch_end] print(f发送批次 {batch_start//concurrency 1}: 请求 {batch_start1}-{batch_end}) # 发送并发请求 batch_results await self.concurrent_requests(batch_prompts, concurrency) all_results.extend(batch_results) # 计算当前批次的指标 batch_metrics self.calculate_metrics(batch_results) metrics_history.append(batch_metrics) # 打印当前批次结果 print(f 成功: {batch_metrics[successful_requests]}/{len(batch_results)}) print(f 平均延迟: {batch_metrics[avg_latency]:.2f}秒) print(f 吞吐量: {batch_metrics[throughput]:.2f} 请求/秒) # 模拟用户思考时间 if batch_end num_requests: await asyncio.sleep(think_time) # 计算总体指标 overall_metrics self.calculate_metrics(all_results) print(\n * 60) print(压力测试完成总体结果) print( * 60) self.print_metrics_table(overall_metrics) # 保存详细结果到文件 self.save_results_to_csv(all_results, overall_metrics) return all_results, overall_metrics, metrics_history def print_metrics_table(self, metrics: Dict[str, Any]): 以表格形式打印性能指标 print(f{指标:25} {数值:15} {说明:30}) print(- * 70) print(f{总请求数:25} {metrics[total_requests]:15} {发送的总请求数量:30}) print(f{成功请求数:25} {metrics[successful_requests]:15} {成功响应的请求数量:30}) print(f{失败请求数:25} {metrics[failed_requests]:15} {失败或超时的请求数量:30}) print(f{成功率:25} {metrics[success_rate]*100:15.2f}% {请求成功百分比:30}) print(f{平均延迟:25} {metrics[avg_latency]:15.2f}秒 {从发送到接收的平均时间:30}) print(f{最小延迟:25} {metrics[min_latency]:15.2f}秒 {最快响应时间:30}) print(f{最大延迟:25} {metrics[max_latency]:15.2f}秒 {最慢响应时间:30}) print(f{吞吐量:25} {metrics[throughput]:15.2f}请求/秒 {每秒处理的请求数:30}) print(f{令牌生成速度:25} {metrics[avg_tokens_per_second]:15.2f}令牌/秒 {每秒生成的令牌数:30}) print(f{总生成令牌:25} {metrics[total_tokens_generated]:15} {测试期间生成的总令牌数:30}) def save_results_to_csv(self, results: List[Dict[str, Any]], metrics: Dict[str, Any]): 保存测试结果到CSV文件 timestamp datetime.now().strftime(%Y%m%d_%H%M%S) # 保存详细结果 df_results pd.DataFrame(results) results_filename fpressure_test_results_{timestamp}.csv df_results.to_csv(results_filename, indexFalse, encodingutf-8-sig) # 保存指标汇总 metrics_df pd.DataFrame([metrics]) metrics_filename fpressure_test_metrics_{timestamp}.csv metrics_df.to_csv(metrics_filename, indexFalse, encodingutf-8-sig) print(f\n详细结果已保存到: {results_filename}) print(f性能指标已保存到: {metrics_filename}) # 提示词生成函数前面定义的 def generate_test_prompts(num_prompts100): 生成测试用的提示词列表 # ...使用前面定义的函数... async def main(): 主函数 # 创建测试器实例 tester VLLMPressureTester( base_urlhttp://localhost:8000, # 根据你的实际地址修改 model_namenanbeige-4.1-3b ) # 运行压力测试 # 参数说明 # num_requests: 总请求数建议100-500 # concurrency: 并发数我们测试16并发 # think_time: 批次间的思考时间秒 await tester.run_pressure_test( num_requests100, concurrency16, think_time0.1 ) if __name__ __main__: asyncio.run(main())4.2 脚本使用说明这个脚本做了以下几件重要的事情并发控制使用asyncio和aiohttp实现真正的异步并发请求错误处理对超时、网络错误、服务器错误等情况都有处理性能统计自动计算延迟、吞吐量、成功率等关键指标结果保存将详细结果保存为CSV文件方便后续分析进度显示实时显示测试进度和当前批次的性能5. 运行压测并分析结果5.1 执行压测脚本将上面的完整脚本保存为pressure_test.py然后在WebShell中运行python pressure_test.py你会看到类似这样的输出开始压力测试总请求数100并发数16 发送批次 1: 请求 1-16 成功: 16/16 平均延迟: 1.23秒 吞吐量: 13.01 请求/秒 发送批次 2: 请求 17-32 成功: 16/16 平均延迟: 1.31秒 吞吐量: 12.21 请求/秒 ... 压力测试完成总体结果 指标 数值 说明 ---------------------------------------------------------------------- 总请求数 100 发送的总请求数量 成功请求数 98 成功响应的请求数量 失败请求数 2 失败或超时的请求数量 成功率 98.00% 请求成功百分比 平均延迟 1.28秒 从发送到接收的平均时间 最小延迟 0.89秒 最快响应时间 最大延迟 2.45秒 最慢响应时间 吞吐量 12.45请求/秒 每秒处理的请求数 令牌生成速度 45.67令牌/秒 每秒生成的令牌数 总生成令牌 4567 测试期间生成的总令牌数 详细结果已保存到: pressure_test_results_20240101_143022.csv 性能指标已保存到: pressure_test_metrics_20240101_143022.csv5.2 结果分析要点看到这些数字后我们应该关注什么成功率这是最重要的指标。如果成功率低于95%说明服务不稳定。平均延迟对于对话应用1-3秒的响应时间是可以接受的。如果超过5秒用户体验会受影响。吞吐量这个数字告诉你模型能同时服务多少用户。12.45请求/秒意味着理论上可以支持每秒12个用户的请求。最大延迟关注最坏情况。如果最大延迟远高于平均延迟说明有些请求被严重阻塞。5.3 常见问题排查如果在测试中遇到问题可以按以下步骤排查问题1大量请求失败或超时# 检查vLLM服务状态 ps aux | grep vllm # 检查服务日志 tail -f /root/workspace/llm.log # 检查系统资源 top -b -n 1 | head -20 free -h问题2延迟过高可能的原因和解决方案GPU内存不足vLLM需要足够的GPU内存来缓存KV。可以尝试减少max_model_len或使用量化。批处理大小不合适vLLM的--max_num_batched_tokens参数可能需要调整。提示词过长过长的提示词会显著增加计算时间。问题3吞吐量过低优化建议# 调整vLLM启动参数 # 在启动vLLM时尝试这些参数 vllm serve nanbeige-4.1-3b \ --max_num_batched_tokens 4096 \ # 增加批处理token数 --max_num_seqs 32 \ # 增加同时处理的序列数 --gpu-memory-utilization 0.9 # 提高GPU内存利用率6. 进阶压测技巧6.1 不同负载场景测试真实的用户访问模式是多样的我们可以模拟几种典型场景async def test_different_scenarios(tester): 测试不同负载场景 scenarios [ {name: 低负载, concurrency: 4, think_time: 1.0}, {name: 中等负载, concurrency: 8, think_time: 0.5}, {name: 高负载, concurrency: 16, think_time: 0.1}, {name: 峰值负载, concurrency: 32, think_time: 0.05}, ] all_results {} for scenario in scenarios: print(f\n测试场景: {scenario[name]}) print(f并发数: {scenario[concurrency]}, 思考时间: {scenario[think_time]}秒) results, metrics, _ await tester.run_pressure_test( num_requests100, concurrencyscenario[concurrency], think_timescenario[think_time] ) all_results[scenario[name]] metrics # 对比不同场景的结果 print(\n *60) print(不同负载场景性能对比) print(*60) comparison_data [] for name, metrics in all_results.items(): comparison_data.append({ 场景: name, 成功率: f{metrics[success_rate]*100:.1f}%, 平均延迟: f{metrics[avg_latency]:.2f}秒, 吞吐量: f{metrics[throughput]:.2f}请求/秒, 令牌速度: f{metrics[avg_tokens_per_second]:.1f}令牌/秒 }) df_comparison pd.DataFrame(comparison_data) print(df_comparison.to_string(indexFalse))6.2 长时间稳定性测试对于生产环境还需要进行长时间稳定性测试async def stability_test(tester, duration_minutes30): 长时间稳定性测试 print(f开始{duration_minutes}分钟稳定性测试...) start_time time.time() end_time start_time duration_minutes * 60 all_results [] while time.time() end_time: # 每5分钟测试一次 results, metrics, _ await tester.run_pressure_test( num_requests50, concurrency8, think_time0.2 ) all_results.extend(results) # 记录当前时间点的性能 current_time time.strftime(%H:%M:%S, time.localtime()) print(f[{current_time}] 当前性能: f延迟{metrics[avg_latency]:.2f}s, f吞吐{metrics[throughput]:.2f}/s, f成功率{metrics[success_rate]*100:.1f}%) # 等待4分钟55秒后进行下一轮测试 await asyncio.sleep(295) print(f\n稳定性测试完成总运行时间: {duration_minutes}分钟) print(f总请求数: {len(all_results)}) # 分析性能波动 # ...这里可以添加更详细的分析代码...6.3 监控系统资源在压测过程中同时监控系统资源使用情况# 在另一个终端窗口运行资源监控 watch -n 1 echo GPU使用情况 ; nvidia-smi --query-gpuutilization.gpu,memory.used,memory.total --formatcsv; echo ; echo 内存使用情况 ; free -h | head -2; echo ; echo CPU使用情况 ; top -b -n 1 | head -57. 总结与优化建议7.1 测试结果解读通过这次16并发压测你应该得到了Nanbeige4.1-3B模型在你的部署环境下的性能基线。记住几个关键数字你的吞吐量极限当并发数增加到多少时性能开始显著下降你的延迟边界在什么并发水平下延迟会超过可接受范围你的错误模式失败请求主要是超时还是其他错误这些数字将成为你后续优化和容量规划的基础。7.2 性能优化建议根据测试结果你可以考虑以下优化方向如果吞吐量不足调整vLLM的批处理参数--max_num_batched_tokens,--max_num_seqs考虑使用量化版本如GPTQ、AWQ减少显存占用升级GPU硬件或增加GPU数量如果延迟过高限制生成的最大token数使用流式输出改善用户体验实现请求队列和优先级调度如果稳定性有问题增加服务监控和自动重启机制实现负载均衡和多实例部署设置合理的超时和重试策略7.3 生产环境部署建议对于生产环境我建议设置性能监控持续监控延迟、吞吐量、错误率等关键指标实现自动扩缩容根据负载自动调整实例数量建立容量规划基于压测结果规划硬件资源准备降级方案在高负载时提供简化版服务7.4 下一步学习方向掌握了基础压测后你可以进一步探索混合负载测试模拟真实用户的不同请求模式混沌工程测试模拟网络波动、服务重启等异常情况成本效益分析找到性能与成本的最佳平衡点A/B测试框架对比不同模型或参数配置的效果记住压测不是一次性的任务而应该成为持续集成和持续部署的一部分。定期进行压测确保你的服务始终能满足用户的需求。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。

更多文章