本文将详细介绍如何使用 Model Context Protocol (MCP) 在 Python 中实现基于 STDIO 通信的 Client 与 Server。MCP 是一个开放协议,它使 LLM 应用与外部数据源和工具之间的无缝集成成为可能。无论你是构建 AI 驱动的 IDE、改善 chat 交互,还是构建自定义的 AI 工作流,MCP 提供了一种标准化的方式,将 LLM 与它们所需的上下文连接起来。部分灵感来源:Se7en。
提示
在开始前,请确保你已经安装了必要的依赖包:pip install openai mcp
- 1
本文中,我们将介绍如何配置环境、编写 MCP Server 以及实现 MCP Client。
环境配置
在使用 MCP 之前,需要先配置相关环境变量,以便 Client 与 Server 都能正确加载所需的参数。你可以在项目根目录下创建一个 .env
文件,并写入以下内容:
此外,创建一个 .env
文件来存储您的配置:
MODEL_NAME=deepseek-chat
BASE_URL=https://api.deepseek.com/v1
API_KEY=your_api_key_here
- 1
- 2
- 3
上述配置中,MODEL_NAME
表示使用的 OpenAI 模型名称(例如 “deepseek-chat”),BASE_URL
指向 OpenAI API 的基础地址,而 API_KEY
则为访问 API 所需的密钥。
书写 Server 的规范
构建 MCP Server(特别是基于 stdio
通信)时,推荐遵循统一规范,提升可读性、可维护性与复用性。
- 服务命名统一
使用MCP_SERVER_NAME
作为唯一名称,贯穿日志、初始化等环节。 - 日志配置清晰
统一使用logging
模块,推荐 INFO 级别,便于调试和追踪。 - 工具注册规范
通过@mcp.tool()
装饰器注册工具函数,要求:- 命名清晰
- 参数有类型注解
- 注释说明参数与返回值(推荐中文)
- 加入边界检查或异常处理
- 使用标准 stdio 启动方式
通过async with stdio_server()
获取输入输出流,统一调用_mcp_server.run(...)
启动服务。 - 初始化选项规范
使用InitializationOptions
设置服务名、版本及能力声明(通常由FastMCP
提供)。
-
通用模板
import asyncio import logging from mcp.server.fastmcp import FastMCP from mcp.server import InitializationOptions, NotificationOptions from mcp.server.stdio import stdio_server # STDIO 通信方式 # 定义唯一服务名称 MCP_SERVER_NAME = "your-stdio-server-name" # 配置日志输出 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(MCP_SERVER_NAME) # 创建 FastMCP 实例 mcp = FastMCP(MCP_SERVER_NAME) # 定义工具 @mcp.tool() def your_tool_name(param1: type, param2: type) -> return_type: """ 工具描述。 参数: - param1 (type): 描述 - param2 (type): 描述 返回: - return_type: 描述 """ # 工具实现 pass # 启动 MCP Server 主函数 async def main(): # 创建 stdio 通信通道 async with stdio_server() as (read_stream, write_stream): # 构建初始化选项 init_options = InitializationOptions( server_name=MCP_SERVER_NAME, server_version="1.0.0", capabilities=mcp._mcp_server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={} ) ) logger.info("MCP Server 以 STDIO 模式启动中...") # 启动 Server await mcp._mcp_server.run(read_stream, write_stream, init_options) # 主程序入口 if __name__ == "__main__": asyncio.run(main())
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
编写 MCP Server
MCP Server 的实现主要基于标准输入输出(STDIO)进行通信。服务端通过注册工具,向外界提供如加法、减法、乘法以及除法等计算功能。下面简述服务端的主要实现步骤:
- 初始化 FastMCP 实例
服务端首先创建一个 FastMCP 实例,并为其命名(例如 “math-stdio-server”)。 - 工具注册
使用装饰器的方式注册加法、减法、乘法、除法等工具,每个工具均包含详细的参数说明和返回值说明。 - 日志配置
通过 Python 标准日志模块对服务端进行日志配置,以便记录服务运行状态和错误信息。 - 建立 STDIO 通信
使用stdio_server()
函数建立基于 STDIO 的通信,并构造初始化选项,包含服务器名称、版本以及能力说明。随后,调用 MCP 内部的服务启动函数开始监听和处理来自 Client 的请求。
import asyncio
import logging
from mcp.server.fastmcp import FastMCP
from mcp.server import InitializationOptions, NotificationOptions
from mcp.server.stdio import stdio_server # 直接导入 stdio_server 函数
# 定义服务器名称
MCP_SERVER_NAME = "math-stdio-server"
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(MCP_SERVER_NAME)
# 初始化 FastMCP 实例
mcp = FastMCP(MCP_SERVER_NAME)
# 注册加法工具
@mcp.tool()
def add(a: float, b: float) -> float:
"""
加法工具
参数:
- a (float): 第一个数字(必填)
- b (float): 第二个数字(必填)
返回:
- float: a 与 b 的和
"""
return a + b
# 注册减法工具
@mcp.tool()
def subtract(a: float, b: float) -> float:
"""
减法工具
参数:
- a (float): 被减数(必填)
- b (float): 减数(必填)
返回:
- float: a 与 b 的差
"""
return a - b
# 注册乘法工具
@mcp.tool()
def multiply(a: float, b: float) -> float:
"""
乘法工具
参数:
- a (float): 第一个数字(必填)
- b (float): 第二个数字(必填)
返回:
- float: a 与 b 的积
"""
return a * b
# 注册除法工具
@mcp.tool()
def divide(a: float, b: float) -> float:
"""
除法工具
参数:
- a (float): 分子(必填)
- b (float): 分母(必填,且不能为零)
返回:
- float: a 与 b 的商
"""
if b == 0:
raise ValueError("除数不能为零")
return a / b
async def main():
# 使用 stdio_server 建立 STDIO 通信
async with stdio_server() as (read_stream, write_stream):
# 构造初始化选项
init_options = InitializationOptions(
server_name=MCP_SERVER_NAME,
server_version="1.0.0",
capabilities=mcp._mcp_server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={}
)
)
logger.info("通过 STDIO 模式启动 MCP Server ...")
# 使用内部的 _mcp_server 运行服务
await mcp._mcp_server.run(read_stream, write_stream, init_options)
if __name__ == "__main__":
asyncio.run(main())
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
编写 MCP Client
MCP Client 主要实现与多个基于 STDIO 的服务器建立连接,并通过 OpenAI API 对用户的自然语言查询进行处理,调用相应工具获得最终结果。客户端的主要逻辑可以分为以下部分:
- 初始化客户端
在 MCPClient 类的构造函数中,传入所需的模型名称、OpenAI API 基础地址、API 密钥以及包含服务端脚本路径的列表。客户端将使用这些参数初始化 OpenAI 异步客户端,同时准备一个 AsyncExitStack 来管理所有异步上下文。 - 建立多个 STDIO 连接
通过遍历服务器脚本列表,为每个脚本生成唯一标识符(如server0
、server1
等),然后依次调用stdio_client
函数建立连接,并通过 ClientSession 完成初始化。在连接成功后,从每个服务器获取可用工具列表,并将工具名称加上前缀(例如server0_add
)保存到映射表中,避免工具名称冲突。 - 处理用户查询
在process_query
方法中,客户端首先根据用户的输入构造消息,然后汇总所有连接服务器提供的工具,传递给 OpenAI API 进行处理。当 API 返回调用工具的请求时,客户端根据工具名称找到对应服务器会话,并执行相应的工具调用,收集返回结果后再交由 API 生成后续回复,直至所有工具调用处理完成。 - 交互式对话循环
客户端提供一个简单的命令行交互循环,用户输入查询后,调用process_query
方法获取最终回复,并打印在终端上。如果用户输入quit
或使用 Ctrl+C 中断,则客户端将平滑退出并释放所有资源。 - 资源清理
最后,在退出前,通过 AsyncExitStack 统一关闭所有连接,确保资源不会泄露。
import asyncio
import json
import os
import sys
from typing import List
from contextlib import AsyncExitStack
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import AsyncOpenAI
class MCPClient:
def __init__(self, model_name: str, base_url: str, api_key: str, server_scripts: List[str]):
"""
初始化 MCP 客户端,支持多个 stdio 服务器。
:param model_name: OpenAI 模型名称,例如 "deepseek-chat"。
:param base_url: OpenAI API 基础地址,例如 "https://api.deepseek.com/v1"。
:param api_key: OpenAI API 密钥。
:param server_scripts: stdio 服务脚本路径列表。
"""
self.model_name = model_name
self.base_url = base_url
self.api_key = api_key
self.server_scripts = server_scripts
self.sessions = {} # server_id -> (session, session_ctx, stdio_ctx)
self.tool_mapping = {} # 带前缀的工具名 -> (session, 原始工具名)
self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)
self.exit_stack = AsyncExitStack()
async def initialize_sessions(self):
"""初始化所有 stdio 服务器连接,并收集工具映射。"""
for i, script in enumerate(self.server_scripts):
if not (os.path.exists(script) and script.endswith(".py")):
print(f"脚本 {script} 不存在或不是 .py 文件,跳过。")
continue
server_id = f"server{i}"
params = StdioServerParameters(command="python", args=[script], env=None)
try:
stdio_ctx = stdio_client(params)
stdio = await self.exit_stack.enter_async_context(stdio_ctx)
session_ctx = ClientSession(*stdio)
session = await self.exit_stack.enter_async_context(session_ctx)
await session.initialize()
self.sessions[server_id] = (session, session_ctx, stdio_ctx)
response = await session.list_tools()
for tool in response.tools:
self.tool_mapping[f"{server_id}_{tool.name}"] = (session, tool.name)
print(f"已连接到 {script},工具:{[tool.name for tool in response.tools]}")
except Exception as e:
print(f"连接 {script} 失败:{e}")
async def cleanup(self):
"""释放所有资源。"""
try:
await self.exit_stack.aclose()
print("所有连接资源已释放")
except asyncio.CancelledError:
pass
except Exception as e:
print(f"清理资源时异常:{e}")
async def _gather_available_tools(self):
"""汇总所有服务器的工具列表。"""
tools = []
for server_id, (session, _, _) in self.sessions.items():
response = await session.list_tools()
for tool in response.tools:
tools.append({
"type": "function",
"function": {
"name": f"{server_id}_{tool.name}",
"description": tool.description,
"parameters": tool.inputSchema,
}
})
return tools
async def process_query(self, query: str) -> str:
"""处理查询,调用 OpenAI API 和相应工具后返回结果。"""
messages = [{"role": "user", "content": query}]
available_tools = await self._gather_available_tools()
try:
response = await self.client.chat.completions.create(
model=self.model_name, messages=messages, tools=available_tools
)
except Exception as e:
return f"调用 OpenAI API 失败:{e}"
final_text = [response.choices[0].message.content or ""]
message = response.choices[0].message
# 当有工具调用时循环处理
while message.tool_calls:
for call in message.tool_calls:
tool_name = call.function.name
if tool_name not in self.tool_mapping:
final_text.append(f"未找到工具:{tool_name}")
continue
session, original_tool = self.tool_mapping[tool_name]
tool_args = json.loads(call.function.arguments)
try:
result = await session.call_tool(original_tool, tool_args)
final_text.append(f"[调用 {tool_name} 参数: {tool_args}]")
final_text.append(f"工具结果: {result.content}")
except Exception as e:
final_text.append(f"调用 {tool_name} 出错:{e}")
continue
messages += [
{"role": "assistant", "tool_calls": [{
"id": call.id,
"type": "function",
"function": {"name": tool_name, "arguments": json.dumps(tool_args)}
}]},
{"role": "tool", "tool_call_id": call.id, "content": str(result.content)}
]
try:
response = await self.client.chat.completions.create(
model=self.model_name, messages=messages, tools=available_tools
)
except Exception as e:
final_text.append(f"调用 OpenAI API 失败:{e}")
break
message = response.choices[0].message
if message.content:
final_text.append(message.content)
return "\n".join(final_text)
async def chat_loop(self):
"""交互式对话循环,捕获中断平滑退出。"""
print("MCP 客户端已启动,输入问题,输入 'quit' 退出。")
while True:
try:
query = input("问题: ").strip()
if query.lower() == "quit":
break
result = await self.process_query(query)
print("\n" + result)
except KeyboardInterrupt:
print("\n检测到中断信号,退出。")
break
except Exception as e:
print(f"发生错误:{e}")
async def main():
model_name = os.getenv("MODEL_NAME", "deepseek-chat")
base_url = os.getenv("BASE_URL", "https://api.deepseek.com/v1")
api_key = os.getenv("API_KEY")
if not api_key:
print("未设置 API_KEY 环境变量")
sys.exit(1)
# 示例:使用两个 stdio 脚本
server_scripts = ["server.py"]
client = MCPClient(model_name, base_url, api_key, server_scripts)
try:
await client.initialize_sessions()
await client.chat_loop()
except KeyboardInterrupt:
print("\n收到中断信号")
finally:
await client.cleanup()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("程序已终止。")
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 161
- 162
- 163
- 164
- 165
- 166
- 167
- 168
- 169
- 170
- 171
- 172
- 173
- 174
- 175
- 176
- 177
总结
通过本文的介绍,我们了解了如何使用 MCP 协议在 Python 中构建基于 STDIO 通信的 Client 与 Server。服务端通过注册多个工具为外部应用提供计算能力,而客户端则利用 OpenAI API 和工具调用的方式,将自然语言查询转化为对具体工具的调用,最终将结果反馈给用户。
这种基于 STDIO 的通信方式不仅简化了服务端与客户端之间的连接,还能方便地支持多服务器同时运行,为构建灵活高效的 LLM 应用提供了坚实的基础。
评论记录:
回复评论: