Python MCP Client多服务器并行连接实战:SSE机制下的工具池管理与调用

张开发
2026/5/16 18:14:07 15 分钟阅读
Python MCP Client多服务器并行连接实战:SSE机制下的工具池管理与调用
1. 多服务器连接的需求与挑战在构建AI工具编排系统时经常需要同时对接多个MCP Server。比如你可能有一个专门处理自然语言的Server另一个负责图像分析的Server还有一个做数据清洗的Server。这时候如果每个Server都要单独连接和管理代码会变得非常臃肿。我去年就遇到过这种情况当时项目需要同时连接5个不同的MCP Server。最开始我用了最笨的方法 - 为每个Server单独写连接代码。结果发现不仅代码重复严重而且工具调用时经常出现命名冲突。后来改用工具池的方案代码量直接减少了70%维护起来也轻松多了。SSE(Server-Sent Events)机制在这种场景下特别有用。它允许客户端保持长连接实时接收服务端推送的工具更新。相比传统的轮询方式SSE更节省资源响应也更快。实测下来在100个并发请求的场景下SSE连接的内存占用只有轮询方式的1/3左右。2. 初始化多SSE连接2.1 连接池的建立要同时连接多个Server首先要解决的是连接管理问题。Python的AsyncExitStack是个很好的工具它可以帮我们统一管理多个异步上下文。下面是我在实际项目中使用的连接初始化代码async with AsyncExitStack() as stack: sessions [] for server_url in [http://server1, http://server2]: reader, writer await stack.enter_async_context( sse_client(urlserver_url) ) session await stack.enter_async_context( ClientSession(reader, writer) ) await session.initialize() sessions.append(session)这段代码有几个关键点使用AsyncExitStack确保所有连接都能正确关闭sse_client负责建立SSE连接ClientSession封装了工具调用接口2.2 连接异常处理多服务器连接时难免会遇到某个Server不可用的情况。这时候不能因为一个Server挂了就导致整个客户端崩溃。我的经验是给每个连接加上超时和重试机制from async_timeout import timeout async def create_session(server_url): try: async with timeout(10): reader, writer await sse_client(urlserver_url) return await ClientSession(reader, writer).initialize() except Exception as e: logger.warning(f连接{server_url}失败: {str(e)}) return None这样即使某个Server暂时不可用其他Server仍然可以正常工作。等故障Server恢复后还可以通过定时任务自动重连。3. 工具池的管理策略3.1 工具命名冲突解决当多个Server提供同名工具时直接覆盖显然不行。我试过几种方案添加Server前缀比如server1.tool_name使用工具版本号tool_namev1完全禁止同名工具最终选择了方案1因为这样既保留了工具语义又能清晰区分来源。实现代码大概是这样tool_pool {} for session in sessions: tools await session.list_tools() for tool in tools.tools: full_name f{session.server_id}.{tool.name} tool_pool[full_name] (session, tool)3.2 工具动态更新SSE的一个优势是可以实时接收工具变更通知。我们需要监听Server发来的工具更新事件async def watch_tool_updates(session): async for event in session.subscribe_updates(): if event.type tool_added: tool_pool[event.tool.name] (session, event.tool) elif event.type tool_removed: tool_pool.pop(event.tool.name, None)这样工具池就能始终保持最新状态不需要手动刷新。在实际项目中这个功能帮我们省去了很多维护工作。4. 并行与串行调用模式4.1 并行调用实现当多个工具可以并行执行时使用asyncio.gather能显著提高效率async def parallel_call(tool_names, args): tasks [] for name in tool_names: session, tool tool_pool[name] tasks.append(session.call_tool(tool.name, args)) return await asyncio.gather(*tasks)实测在10个工具并行调用的场景下总耗时只比单个工具多20%左右而串行调用要慢10倍。4.2 串行调用控制有些场景必须串行执行比如前一个工具的输出是后一个工具的输入。这时候就需要严格控制执行顺序async def sequential_call(tool_names, initial_args): result initial_args for name in tool_names: session, tool tool_pool[name] result await session.call_tool(tool.name, result) return result这里有个小技巧可以在每个工具调用之间加入短暂的sleep给Server留出处理时间。我一般用0.1秒效果不错。5. 性能优化实践5.1 连接复用策略频繁创建销毁连接很耗资源。我的做法是维护一个连接池class ConnectionPool: def __init__(self): self._pool {} async def get(self, server_url): if server_url not in self._pool: self._pool[server_url] await create_session(server_url) return self._pool[server_url]连接闲置超过5分钟就自动关闭避免占用过多资源。这个优化让我们的API响应时间降低了40%。5.2 批量请求处理当需要调用多个工具处理批量数据时单个请求单个处理效率很低。我开发了一个批量处理模式async def batch_process(tool_name, data_list): session, tool tool_pool[tool_name] batch_size 10 # 根据Server性能调整 results [] for i in range(0, len(data_list), batch_size): batch data_list[i:ibatch_size] results.extend(await session.batch_call(tool.name, batch)) return results这个技巧在处理上千条数据时特别有用总耗时能减少60%以上。6. 错误处理与日志6.1 错误分类处理不同错误需要不同处理方式网络错误自动重试3次工具不存在立即失败并通知调用方参数错误记录日志并返回友好提示我的错误处理代码结构try: return await session.call_tool(...) except NetworkError as e: if retry_count 3: return await call_with_retry(...) raise except ToolNotFound: logger.error(工具不存在) raise except InvalidParams: logger.warning(参数错误) return {error: 参数错误}6.2 日志优化技巧好的日志要包含足够信息但又不冗余。我建议记录工具调用开始/结束时间关键参数摘要(去掉敏感数据)执行耗时错误堆栈(仅错误时)logger.info(f开始调用{tool_name}, params: {sanitize(args)}) start time.time() try: result await session.call_tool(...) logger.info(f调用成功, 耗时{time.time()-start:.2f}s) return result except Exception: logger.exception(f调用失败, 耗时{time.time()-start:.2f}s) raise这种日志格式在我们排查线上问题时特别有用。

更多文章