首页 / AI工具 / MCP 协议详解:让 AI 连接任意数据...

MCP 协议详解:让 AI 连接任意数据源的完整实战

MCP 协议详解:让 AI 连接任意数据源的完整实战

深入解析 Model Context Protocol (MCP) 协议原理,手把手教你自建 MCP Server,实现 AI 与数据库、API、文件系统的无缝连接。

什么是 MCP?

Model Context Protocol(MCP)是 Anthropic 于 2024 年底开源的开放协议,旨在标准化 AI 模型与外部数据源、工具之间的通信方式。到 2026 年,MCP 已经成为 AI 应用生态的事实标准,被 Claude、Cursor、Cline 等众多 AI 编程工具广泛支持。

可以把 MCP 理解为 AI 世界的「USB-C 接口」—— 统一、开放、即插即用。

MCP 的核心架构

MCP 采用客户端-服务器架构,包含三个核心概念:


            ┌─────────────────────────────────────────────────────────┐
            │                    MCP Host (Claude/Cursor)              │
            │  ┌───────────────────────────────────────────────────┐  │
            │  │              MCP Client (协议客户端)                │  │
            │  │         stdio / SSE / HTTP 传输层                 │  │
            │  └───────────────────────────────────────────────────┘  │
            └─────────────────────────────────────────────────────────┘
                                      │
                      ┌───────────────┼───────────────┐
                      ▼               ▼               ▼
                ┌─────────┐    ┌─────────┐    ┌─────────┐
                │MCP Server│    │MCP Server│    │MCP Server│
                │ (文件系统)│    │ (数据库) │    │ (Git)   │
                └─────────┘    └─────────┘    └─────────┘
            

核心原语

|------|------|------|

| Resources | 暴露只读数据 | 文件内容、数据库记录、API 响应 |

| Tools | 暴露可执行操作 | 执行 SQL、发送邮件、运行命令 |

| Prompts | 暴露可复用模板 | 代码审查模板、日志分析模板 |

实战:搭建第一个 MCP Server

Step 1: 项目初始化

bash
            mkdir mcp-weather-server && cd mcp-weather-server
            npm init -y
            npm install @modelcontextprotocol/sdk zod
            npm install -D @types/node typescript
            npx tsc --init
            

Step 2: 实现天气查询 MCP Server

typescript
            // src/index.ts
            import { Server } from "@modelcontextprotocol/sdk/server/index.js";
            import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
            import {
              CallToolRequestSchema,
              ListToolsRequestSchema,
              Tool,
            } from "@modelcontextprotocol/sdk/types.js";
            
            // 定义工具列表
            const TOOLS: Tool[] = [
              {
                name: "get_weather",
                description: "获取指定城市的实时天气信息",
                inputSchema: {
                  type: "object",
                  properties: {
                    city: {
                      type: "string",
                      description: "城市名称,如 \"北京\", \"上海\"",
                    },
                    units: {
                      type: "string",
                      enum: ["celsius", "fahrenheit"],
                      description: "温度单位",
                      default: "celsius",
                    },
                  },
                  required: ["city"],
                },
              },
              {
                name: "get_forecast",
                description: "获取未来 7 天天气预报",
                inputSchema: {
                  type: "object",
                  properties: {
                    city: { type: "string", description: "城市名称" },
                    days: {
                      type: "number",
                      description: "预报天数 (1-7)",
                      minimum: 1,
                      maximum: 7,
                    },
                  },
                  required: ["city", "days"],
                },
              },
            ];
            
            // 模拟天气数据(生产环境应调用真实 API)
            async function fetchWeather(city: string, units: string = "celsius") {
              // 实际项目中调用天气 API,如 OpenWeatherMap
              const mockData: Record<string, any> = {
                "北京": { temp: 28, condition: "晴", humidity: 45, wind: 12 },
                "上海": { temp: 30, condition: "多云", humidity: 65, wind: 18 },
                "广州": { temp: 33, condition: "雷阵雨", humidity: 80, wind: 10 },
                "深圳": { temp: 32, condition: "阴", humidity: 75, wind: 15 },
              };
              
              const data = mockData[city] || { temp: 25, condition: "未知", humidity: 50, wind: 10 };
              const temp = units === "fahrenheit" ? data.temp * 9/5 + 32 : data.temp;
              const unit = units === "fahrenheit" ? "°F" : "°C";
              
              return {
                city,
                temperature: `${temp}${unit}`,
                condition: data.condition,
                humidity: `${data.humidity}%`,
                windSpeed: `${data.wind} km/h`,
                updatedAt: new Date().toISOString(),
              };
            }
            
            async function fetchForecast(city: string, days: number) {
              const conditions = ["晴", "多云", "阴", "小雨", "雷阵雨"];
              const forecast = [];
              for (let i = 0; i < days; i++) {
                const date = new Date();
                date.setDate(date.getDate() + i);
                forecast.push({
                  date: date.toISOString().split("T")[0],
                  condition: conditions[Math.floor(Math.random() * conditions.length)],
                  highTemp: 25 + Math.floor(Math.random() * 10),
                  lowTemp: 18 + Math.floor(Math.random() * 8),
                });
              }
              return { city, forecast };
            }
            
            // 创建 Server 实例
            const server = new Server(
              { name: "weather-mcp-server", version: "1.0.0" },
              { capabilities: { tools: {} } }
            );
            
            // 处理工具列表请求
            server.setRequestHandler(ListToolsRequestSchema, async () => {
              return { tools: TOOLS };
            });
            
            // 处理工具调用请求
            server.setRequestHandler(CallToolRequestSchema, async (request) => {
              const { name, arguments: args } = request.params;
            
              try {
                if (name === "get_weather") {
                  const { city, units = "celsius" } = args as { city: string; units?: string };
                  const weather = await fetchWeather(city, units);
                  return {
                    content: [
                      {
                        type: "text",
                        text: JSON.stringify(weather, null, 2),
                      },
                    ],
                  };
                }
            
                if (name === "get_forecast") {
                  const { city, days } = args as { city: string; days: number };
                  const forecast = await fetchForecast(city, days);
                  return {
                    content: [
                      {
                        type: "text",
                        text: JSON.stringify(forecast, null, 2),
                      },
                    ],
                  };
                }
            
                throw new Error(`未知工具: ${name}`);
              } catch (error) {
                return {
                  content: [
                    {
                      type: "text",
                      text: `错误: ${error instanceof Error ? error.message : String(error)}`,
                    },
                  ],
                  isError: true,
                };
              }
            });
            
            // 启动 stdio 传输
            async function main() {
              const transport = new StdioServerTransport();
              await server.connect(transport);
              console.error("Weather MCP Server 已启动 (stdio 模式)");
            }
            
            main().catch(console.error);
            

Step 3: 编译与配置

json
            // package.json
            {
              "name": "mcp-weather-server",
              "version": "1.0.0",
              "type": "module",
              "bin": {
                "weather-mcp": "./dist/index.js"
              },
              "scripts": {
                "build": "tsc",
                "start": "node dist/index.js"
              },
              "dependencies": {
                "@modelcontextprotocol/sdk": "^1.0.0",
                "zod": "^3.22.0"
              }
            }
            
json
            // tsconfig.json
            {
              "compilerOptions": {
                "target": "ES2022",
                "module": "Node16",
                "moduleResolution": "Node16",
                "outDir": "./dist",
                "rootDir": "./src",
                "strict": true,
                "esModuleInterop": true,
                "skipLibCheck": true
              }
            }
            
bash
            npm run build
            

Step 4: 在 Claude Code 中配置

json
            // ~/Library/Application Support/Claude/settings.json (macOS)
            // 或 ~/.config/claude/settings.json (Linux)
            {
              "mcpServers": {
                "weather": {
                  "command": "node",
                  "args": ["/absolute/path/to/mcp-weather-server/dist/index.js"]
                }
              }
            }
            

配置完成后,在 Claude Code 中即可直接调用:


            User: 帮我查一下北京今天的天气
            Claude: 我来为您查询北京的天气信息...
            [调用 get_weather 工具]
            北京今天晴,气温 28°C,湿度 45%,风速 12 km/h
            

进阶实战:数据库 MCP Server

连接 PostgreSQL 数据库,让 AI 直接查询数据:

typescript
            // src/db-server.ts
            import { Server } from "@modelcontextprotocol/sdk/server/index.js";
            import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
            import {
              CallToolRequestSchema,
              ListToolsRequestSchema,
            } from "@modelcontextprotocol/sdk/types.js";
            import { Pool } from "pg";
            
            const pool = new Pool({
              host: process.env.DB_HOST || "localhost",
              port: parseInt(process.env.DB_PORT || "5432"),
              database: process.env.DB_NAME,
              user: process.env.DB_USER,
              password: process.env.DB_PASSWORD,
            });
            
            const server = new Server(
              { name: "postgres-mcp", version: "1.0.0" },
              { capabilities: { tools: {} } }
            );
            
            server.setRequestHandler(ListToolsRequestSchema, async () => ({
              tools: [
                {
                  name: "query",
                  description: "执行只读 SQL 查询(SELECT 语句)",
                  inputSchema: {
                    type: "object",
                    properties: {
                      sql: {
                        type: "string",
                        description: "SELECT SQL 语句",
                      },
                      params: {
                        type: "array",
                        description: "查询参数",
                        items: { type: "string" },
                      },
                    },
                    required: ["sql"],
                  },
                },
                {
                  name: "list_tables",
                  description: "列出数据库中的所有表",
                  inputSchema: {
                    type: "object",
                    properties: {},
                  },
                },
                {
                  name: "describe_table",
                  description: "获取表结构信息",
                  inputSchema: {
                    type: "object",
                    properties: {
                      table: { type: "string", description: "表名" },
                    },
                    required: ["table"],
                  },
                },
              ],
            }));
            
            server.setRequestHandler(CallToolRequestSchema, async (request) => {
              const { name, arguments: args } = request.params;
            
              try {
                if (name === "query") {
                  const { sql, params = [] } = args as { sql: string; params?: string[] };
                  
                  // 安全检查:只允许 SELECT
                  const normalizedSql = sql.trim().toLowerCase();
                  if (!normalizedSql.startsWith("select")) {
                    throw new Error("只允许执行 SELECT 查询");
                  }
            
                  const result = await pool.query(sql, params);
                  return {
                    content: [
                      {
                        type: "text",
                        text: JSON.stringify(result.rows, null, 2),
                      },
                    ],
                  };
                }
            
                if (name === "list_tables") {
                  const result = await pool.query(`
                    SELECT table_name 
                    FROM information_schema.tables 
                    WHERE table_schema = 'public'
                  `);
                  return {
                    content: [
                      {
                        type: "text",
                        text: result.rows.map((r) => r.table_name).join("\n"),
                      },
                    ],
                  };
                }
            
                if (name === "describe_table") {
                  const { table } = args as { table: string };
                  const result = await pool.query(`
                    SELECT column_name, data_type, is_nullable
                    FROM information_schema.columns
                    WHERE table_name = $1
                  `, [table]);
                  return {
                    content: [
                      {
                        type: "text",
                        text: JSON.stringify(result.rows, null, 2),
                      },
                    ],
                  };
                }
            
                throw new Error(`未知工具: ${name}`);
              } catch (error) {
                return {
                  content: [
                    {
                      type: "text",
                      text: `错误: ${error instanceof Error ? error.message : String(error)}`,
                    },
                  ],
                  isError: true,
                };
              }
            });
            
            async function main() {
              const transport = new StdioServerTransport();
              await server.connect(transport);
              console.error("PostgreSQL MCP Server 已启动");
            }
            
            main().catch(console.error);
            

MCP 协议传输层详解

stdio 传输(最常用)

适用于本地工具,通过标准输入输出通信:

typescript
            import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
            const transport = new StdioServerTransport();
            

优点:简单、无需网络、适合命令行工具

SSE 传输(Server-Sent Events)

适用于远程服务,基于 HTTP:

typescript
            import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
            import express from "express";
            
            const app = express();
            
            app.get("/sse", async (req, res) => {
              const transport = new SSEServerTransport("/message", res);
              await server.connect(transport);
            });
            
            app.post("/message", async (req, res) => {
              // 处理客户端消息
            });
            
            app.listen(3000);
            

实操步骤:自建 MCP Server 完整流程

Step 1: 确定功能范围

明确你的 MCP Server 要提供什么能力:

Step 2: 设计工具接口

使用 JSON Schema 定义输入参数:

typescript
            const toolSchema = {
              name: "send_email",
              description: "发送邮件",
              inputSchema: {
                type: "object",
                properties: {
                  to: { type: "string", format: "email" },
                  subject: { type: "string", minLength: 1 },
                  body: { type: "string" },
                },
                required: ["to", "subject", "body"],
              },
            };
            

Step 3: 实现业务逻辑

封装具体的业务操作,保持 MCP Server 层薄、业务层厚。

Step 4: 安全加固

typescript
            // 输入验证
            import { z } from "zod";
            
            const QuerySchema = z.object({
              sql: z.string().regex(/^select/i, "只允许 SELECT 语句"),
              params: z.array(z.string()).optional(),
            });
            
            // 执行前验证
            QuerySchema.parse(args);
            

Step 5: 测试与部署

bash
            # 本地测试
            node dist/index.js < test-input.json
            
            # 部署为系统服务
            npx mcp-server-deployer ./dist/index.js --name my-server
            

常见问题 FAQ

Q1: MCP 和 Function Calling 有什么区别?

A: Function Calling 是模型层面的能力,由 AI 模型决定是否调用函数;MCP 是协议层面的标准,定义了 AI 与外部系统通信的规范。MCP 可以封装 Function Calling,但适用范围更广。

Q2: 一个 MCP Server 可以暴露多个工具吗?

A: 可以。一个 MCP Server 可以同时暴露多个 Tools、Resources 和 Prompts,通过 ListToolsRequestSchema 返回全部列表。

Q3: MCP 支持哪些编程语言?

A: 官方提供 TypeScript/JavaScript、Python SDK,社区还有 Go、Rust、Java 的实现。协议基于 JSON-RPC,任何语言都可以实现。

Q4: 如何处理 MCP Server 的身份验证?

A: 传输层负责身份验证。stdio 模式依赖操作系统权限;SSE/HTTP 模式可以在 Header 中传递 Token,或在连接建立时进行 OAuth 交换。

Q5: MCP Server 崩溃会影响 AI 客户端吗?

A: 不会。MCP 设计为松耦合,Server 崩溃后客户端会收到断开通知,AI 仍然可以正常对话,只是相关工具不可用。

Q6: 如何调试 MCP Server?

A: 将日志输出到 stderr(stdio 模式下 stdout 被协议占用):

typescript
            console.error("调试信息:", data);
            

Q7: 可以在浏览器中使用 MCP 吗?

A: 可以。通过 SSE 或 WebSocket 传输,浏览器端 MCP Client 可以连接远程 MCP Server。

Q8: 现有 API 如何快速包装成 MCP Server?

A: 使用转换层模式:

typescript
            // 保持原有 API 不变,添加 MCP 适配层
            server.setRequestHandler(CallToolRequestSchema, async (req) => {
              const result = await existingApi.call(req.params.arguments);
              return { content: [{ type: "text", text: JSON.stringify(result) }] };
            });
            

MCP 生态与未来展望

截至 2026 年,MCP 生态已经涌现出大量开源 Server:

原语作用示例

|------|----------|------|

| 文件系统 | mcp-filesystem | 读写本地文件 |

| 数据库 | mcp-postgres, mcp-mysql | SQL 查询 |

| 浏览器 | mcp-puppeteer | 网页操作 |

| 版本控制 | mcp-git | Git 命令 |

| 云服务 | mcp-aws, mcp-gcp | 云资源管理 |

MCP 的未来发展方向包括:

  1. 多模态支持:处理图片、音频、视频资源
  2. 联邦架构:多个 MCP Server 自动协调
  3. 安全沙箱:限制 Server 的访问权限
  4. 标准化市场:类似应用商店的 MCP Server 分发

结语

MCP 协议正在重塑 AI 与外部世界的连接方式。通过本文的实战演练,你已经掌握了从基础概念到数据库集成的完整能力。下一步,尝试为你的业务系统封装一个 MCP Server,让 AI 真正「触达」你的数据。


本文首发于 1630.top,转载请注明出处。

类别代表项目功能