问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

MCP核心概念与架构

创作时间:
作者:
@小白创作中心

MCP核心概念与架构

引用
CSDN
1.
https://blog.csdn.net/lovechris00/article/details/146327169

模型上下文协议(Model Context Protocol,简称MCP)是一个开放协议,它标准化了应用程序如何向大型语言模型提供上下文。将MCP想象成AI应用程序的USB-C接口。就像USB-C提供了一种标准化的方式将您的设备连接到各种外围设备和配件一样,MCP也提供了一种标准化的方式将AI模型连接到不同的数据源和工具。

MCP核心概念与架构

一、关于 MCP

MCP 是一个开放协议,它标准化了应用程序如何向大型语言模型提供上下文。将 MCP 想象成 AI 应用程序的 USB-C 接口。就像 USB-C 提供了一种标准化的方式将您的设备连接到各种外围设备和配件一样,MCP 也提供了一种标准化的方式将 AI 模型连接到不同的数据源和工具。

二、为什么选择MCP?

MCP 帮助你在 LLMs 之上构建代理和复杂的流程。LLMs 经常需要与数据和工具集成,MCP 提供:

  • 一个不断增长的预构建集成列表,您的LLM可以直接接入
  • 能够在LLM提供者和供应商之间进行切换的灵活性
  • 最佳实践:在您的基础设施中保护您的数据

通用架构

在其核心,MCP遵循客户端-服务器架构,其中主机应用程序可以连接到多个服务器:

  • MCP 主机: 像Claude Desktop、IDE或希望通过MCP访问数据的AI工具等程序
  • MCP 客户端: 与服务器保持1:1连接的协议客户端
  • MCP 服务器: 每个服务器通过标准化的模型上下文协议公开特定功能的轻量级程序
  • 本地数据源: MCP服务器可以安全访问的您的计算机文件、数据库和服务
  • 远程服务: 通过互联网(例如,通过API)可用的外部系统,MCP服务器可以连接到

MCP核心组件

一、协议层

协议层处理消息封装、请求/响应连接以及高级通信模式。

TypeScript

class Protocol<Request, Notification, Result> {
    // Handle incoming requests
    setRequestHandler<T>(schema: T, handler: (request: T, extra: RequestHandlerExtra) => Promise<Result>): void
    // Handle incoming notifications
    setNotificationHandler<T>(schema: T, handler: (notification: T) => Promise<void>): void
    // Send requests and await responses
    request<T>(request: Request, schema: T, options?: RequestOptions): Promise<T>
    // Send one-way notifications
    notification(notification: Notification): Promise<void>
}

Python

class Session(BaseSession[RequestT, NotificationT, ResultT]):
    async def send_request(
        self,
        request: RequestT,
        result_type: type[Result]
    ) -> Result:
        """
        Send request and wait for response. Raises McpError if response contains error.
        """
        # Request handling implementation
    async def send_notification(
        self,
        notification: NotificationT
    ) -> None:
        """Send one-way notification that doesn't expect response."""
        # Notification handling implementation
    async def _received_request(
        self,
        responder: RequestResponder[ReceiveRequestT, ResultT]
    ) -> None:
        """Handle incoming request from other side."""
        # Request handling implementation
    async def _received_notification(
        self,
        notification: ReceiveNotificationT
    ) -> None:
        """Handle incoming notification from other side."""
        # Notification handling implementation

关键类包括:

  • Protocol
  • Client
  • Server

二、传输层

传输层处理客户端和服务器之间的实际通信。MCP支持多种传输机制:

  1. 标准输入输出传输
  • 使用标准输入/输出进行通信
  • 适用于本地进程

  1. 使用SSE传输的HTTP
  • 使用服务器发送事件进行服务器到客户端的消息
  • HTTP POST 用于客户端到服务器的消息

所有传输都使用JSON-RPC2.0 来交换消息。有关模型上下文协议消息格式的详细信息,请参阅规范。

三、消息类型

MCP主要有以下几种消息类型:

  1. Requests期待从另一端收到响应:
interface Request {
  method: string;
  params?: { ... };
}
  1. 结果是请求的成功响应:
interface Result {
  [key: string]: unknown;
}
  1. 错误表示请求失败:
interface Error {
  code: number;
  message: string;
  data?: unknown;
}
  1. 通知是单向消息,不需要响应:
interface Notification {
  method: string;
  params?: { ... };
}

连接生命周期

1. 初始化

  1. 客户端发送带有协议版本和功能的 initialize 请求
  2. 服务器响应其协议版本和功能
  3. 客户端发送 initialized 通知作为确认
  4. 正常的消息交换开始

2. 消息交换

初始化之后,以下模式被支持:

  • 请求-响应: 客户端或服务器发送请求,另一方做出响应
  • 通知:任一方发送单向消息

3. 终止

任何一方都可以终止连接:

  • 通过 close() 进行干净关闭
  • 传输断开
  • 错误条件

错误处理

MCP 定义了以下标准错误代码:

enum ErrorCode {
  // Standard JSON-RPC error codes
  ParseError = -32700,
  InvalidRequest = -32600,
  MethodNotFound = -32601,
  InvalidParams = -32602,
  InternalError = -32603
}

SDKs 和应用程序可以定义它们自己的错误代码,其值大于 -32000。错误通过以下方式传播:

  • 请求的错误响应
  • 错误事件在传输上
  • 协议级错误处理器

实现示例

Here’s a basic example of implementing an MCP server:

Python

import asyncio
import mcp.types as types
from mcp.server import Server
from mcp.server.stdio import stdio_server
app = Server("example-server")
@app.list_resources()
async def list_resources() -> list[types.Resource]:
    return [
        types.Resource(
            uri="example://resource",
            name="Example Resource"
        )
    ]
async def main():
    async with stdio_server() as streams:
        await app.run(
            streams[0],
            streams[1],
            app.create_initialization_options()
        )
if __name__ == "__main__":
    asyncio.run(main)

TypeScript

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
const server = new Server({
  name: "example-server",
![](https://wy-static.wenxiaobai.com/chat-rag-image/487865130461273956)
  version: "1.0.0"
}, {
  capabilities: {
    resources: {}
  }
});
// Handle requests
server.setRequestHandler(ListResourcesRequestSchema, async () => {
  return {
    resources: [
      {
        uri: "example://resource",
        name: "Example Resource"
      }
    ]
  };
});
// Connect transport
const transport = new StdioServerTransport();
await server.connect(transport);

最佳实践

1. 运输选择

  1. 本地通信
  • 使用stdio传输方式用于本地进程
  • 针对同一机器通信高效
  • 简单的过程管理
  1. 远程通信
  • 在需要HTTP兼容性的场景中使用SSE
  • 考虑安全性影响,包括身份验证和授权

2. 消息处理

  1. 请求处理
  • 仔细验证输入
  • 使用类型安全的模式
  • 优雅地处理错误
  • 实现超时
  1. 进度报告
  • 使用进度令牌进行长时间操作
  • 逐步报告进度
  • 当知道总进度时,包括总进度
  1. 错误管理
  • 使用适当的错误代码
  • 包含有用的错误消息
  • 错误发生时清理资源

安全考虑

  1. 传输安全
  • 使用 TLS 进行远程连接
  • 验证连接来源
  • 需要时实现身份验证
  1. 消息验证
  • 验证所有传入消息
  • 清理输入
  • 检查消息大小限制
  • 验证 JSON-RPC 格式
  1. 资源保护
  • 实施访问控制
  • 验证资源路径
  • 监控资源使用
  • 限制请求速率
  1. 错误处理
  • 不要泄露敏感信息
  • 记录与安全相关的错误
  • 实施适当的清理
  • 处理 DoS 场景

调试和监控

  1. 日志记录
  • 记录协议事件
  • 跟踪消息流程
  • 监控性能
  • 记录错误
  1. 诊断
  • 实施健康检查
  • 监控连接状态
  • 跟踪资源使用
  • 分析性能
  1. 测试
  • 测试不同的传输方式
  • 验证错误处理
  • 检查边缘情况
  • 进行服务器负载测试
© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号