芥末
发布于 2025-12-08 / 0 阅读
0
0

MCP 原理解析与数据库智能查询实践

MCP(Model Context Protocol,模型上下文协议)解决的是一个很现实的问题:大语言模型本身只会生成文本,但真实业务系统里的数据、工具和服务都在模型外部。模型要查数据库、读文件、访问接口、分析日志、调用运维工具,就必须有一套稳定、统一、安全的连接方式。

没有统一协议时,每个 AI 应用都要为每个工具单独适配。接入 MySQL 要写一套调用逻辑,接入知识库要写一套调用逻辑,换一个模型或者换一个客户端,又可能重写一遍。MCP 的目标就是把这部分连接方式标准化,让 AI 应用像接入统一接口一样使用外部能力。

可以把 MCP 理解成大语言模型时代的“工具和数据接入协议”。它不负责替代模型,也不负责替代数据库,而是规定:

  • AI 应用怎样发现外部工具;
  • 工具怎样声明自己的参数;
  • 客户端怎样调用工具;
  • 服务端怎样返回结构化结果;
  • 模型能够访问哪些资源,哪些操作必须经过用户确认;
  • 本地进程、远程服务、HTTP 流式通信怎样承载这些消息。

MCP 解决了什么问题

大语言模型(LLM,Large Language Model)有两类典型短板。

一类是知识边界问题。模型训练完成后,无法天然知道企业内部文档、实时数据库记录、最新日志和业务系统状态。RAG(Retrieval-Augmented Generation,检索增强生成)主要解决这一类问题。

另一类是行动能力问题。模型可以判断“应该查询订单表”,但它不能自己连接数据库执行 SQL。Function Calling(函数调用)主要解决这一类问题。

MCP 则进一步把“检索数据”和“执行工具”都纳入统一协议中。

flowchart LR
    User[用户问题] --> App[AI 应用 / MCP Host]
    App --> LLM[大语言模型]
    LLM --> Decision{需要外部能力吗}
    Decision -->|不需要| Answer[直接生成回答]
    Decision -->|需要| Client[MCP Client]
    Client --> Server[MCP Server]
    Server --> Tools[工具 / API / 数据库 / 文件]
    Tools --> Server
    Server --> Client
    Client --> LLM
    LLM --> Answer

RAG、Function Calling 与 MCP 的关系

RAG 的核心是“给模型补充知识”。用户提出问题后,系统先从知识库、向量数据库或文档系统中检索相关内容,再把检索结果拼到提示词中交给模型生成答案。

典型流程如下:

flowchart LR
    Q[用户问题] --> E[问题向量化]
    E --> V[(向量数据库 / 知识库)]
    V --> C[检索相关上下文]
    C --> P[拼接提示词]
    P --> LLM[大语言模型]
    LLM --> A[生成回答]

Function Calling 的核心是“让模型决定调用哪个函数”。应用把可用函数列表、函数描述和参数结构告诉模型,模型根据用户意图输出函数名和参数,应用执行函数后再把结果交回模型。

sequenceDiagram
    participant U as 用户
    participant A as AI 应用
    participant M as 大语言模型
    participant F as 外部函数 / API

    U->>A: 查询某个订单状态
    A->>M: 用户问题 + 可用函数列表
    M-->>A: 调用 get_order_status(order_id)
    A->>F: 执行函数
    F-->>A: 返回订单状态
    A->>M: 函数结果
    M-->>A: 组织自然语言回答
    A-->>U: 返回结果

Function Calling 的问题在于适配成本高。不同模型的工具调用格式可能不同,不同业务工具也要分别封装,参数变更后还要同步修改客户端逻辑。MCP 把这些能力抽象成标准的 Client/Server 模型,工具和资源由 Server 暴露,AI 应用通过 Client 发现并调用。

能力主要解决的问题典型方式局限
RAG模型不知道外部知识检索文档、拼接上下文偏向读取信息,不擅长执行动作
Function Calling模型不能操作外部系统模型输出函数名和参数,应用执行工具适配分散,跨平台复用差
MCP统一连接数据源和工具Server 暴露资源、工具、提示词,Client 标准调用需要额外维护 MCP Server 和权限边界

MCP 可以承载 RAG 的数据源,也可以承载 Function Calling 的外部工具。它更像一个标准化连接层,让工具提供方和 AI 应用方按照同一套协议协作。

MCP 的整体架构

MCP 采用客户端—服务器模式,核心组件有三个。

组件作用常见形态
MCP Host承载大模型交互的宿主应用Claude Desktop、Cursor、Cline、Cherry Studio、IDE 插件
MCP Client集成在 Host 中的协议客户端,负责连接 Server、发现能力、发送调用请求Host 内置模块或自建客户端
MCP Server对外暴露资源、工具和提示词的轻量服务本地 Python/Node.js 进程、远程 HTTP 服务、企业内部工具网关

整体关系如下:

flowchart LR
    subgraph Host[MCP Host:AI 应用]
        UI[用户界面]
        LLM[大语言模型]
        Client[MCP Client]
    end

    subgraph Server[MCP Server]
        Resources[Resources 资源]
        Tools[Tools 工具]
        Prompts[Prompts 提示词]
    end

    UI --> LLM
    LLM --> Client
    Client <--> Server
    Resources --> DB[(数据库)]
    Tools --> API[外部 API / 运维工具]
    Prompts --> Templates[任务模板]

当用户在 Host 中输入问题时,Host 会让模型判断是否需要外部能力。如果需要,MCP Client 根据 Server 暴露出来的工具描述和参数结构发起调用。Server 执行对应逻辑后,把结构化结果返回给 Client,Client 再把结果交给模型生成最终回答。

典型运行链路如下:

sequenceDiagram
    participant U as 用户
    participant H as MCP Host
    participant M as 大语言模型
    participant C as MCP Client
    participant S as MCP Server
    participant D as 数据库 / 外部系统

    U->>H: 输入自然语言问题
    H->>M: 提交问题和可用工具信息
    M-->>H: 判断需要调用工具
    H->>U: 请求确认敏感操作
    U-->>H: 同意
    H->>C: 发起工具调用
    C->>S: JSON-RPC tools/call
    S->>D: 执行查询或调用接口
    D-->>S: 返回结果
    S-->>C: 返回结构化数据
    C-->>H: 返回调用结果
    H->>M: 问题 + 工具结果
    M-->>H: 生成回答
    H-->>U: 展示结果

如果使用 Cline、Cursor 这类已经支持 MCP 的 Host,通常不需要自己写 MCP Client,只需要开发 MCP Server,然后在客户端配置 JSON 即可。

MCP Server 的三类核心能力

MCP Server 主要暴露三类能力:

  • Resources:资源;
  • Tools:工具;
  • Prompts:提示词。

三者职责不同,但经常配合使用。

flowchart TB
    Resources[资源 Resources<br/>提供可读取数据] --> Tools[工具 Tools<br/>执行动作或查询]
    Prompts[提示词 Prompts<br/>约束任务流程和输出格式] --> Tools
    Prompts --> Resources
    Tools --> Result[结构化结果]

Resources:把外部数据交给模型使用

资源是 MCP Server 暴露给客户端的可读取数据。它类似 REST API 里的 GET 接口,强调读取数据,不应执行大量计算,也不应产生明显副作用。

资源可以是:

  • 文件内容,例如 Markdown、JSON、XML、源代码、配置文件;
  • 数据库元数据或查询结果;
  • 日志、监控指标、系统状态;
  • 图片、截图、音频等二进制数据;
  • 企业内部知识库中的文档片段。

资源通过 URI(Uniform Resource Identifier,统一资源标识符)寻址,例如:

file:///home/app/config.yaml
logs://session-123/10
mysql://demo_db/users
postgres://warehouse/orders/schema

一个查询日志资源可以这样定义:

from typing import Any, Dict

@mcp.resource("logs://{session_id}/{limit}")
def get_query_logs(limit: str = "5", session_id: str = "anonymous") -> Dict[str, Any]:
    """读取指定会话最近的 SQL 查询日志。"""
    try:
        limit_val = int(limit)
        if limit_val <= 0:
            return {
                "success": False,
                "error": "limit must be a positive integer",
            }

        logs = query_logger.get_logs(
            session_id=session_id,
            limit=limit_val,
        )
        total = query_logger.total_query_count(session_id=session_id)

        return {
            "success": True,
            "logs": logs,
            "total_queries": total,
        }
    except Exception as exc:
        return {
            "success": False,
            "error": str(exc),
        }

资源的关键点是“可控读取”。模型不能随意扫描服务器上的所有文件,也不能直接访问任意数据库表。Server 暴露什么 URI,Client 才能发现什么资源;应用层或用户决定哪些资源可以注入给模型。

Tools:把外部动作封装成可调用函数

工具是 MCP Server 暴露的可执行能力。它可以查询数据库、调用接口、分析 SQL、检查表结构、读取慢查询日志,也可以执行运维诊断动作。

工具通过名称、描述和 JSON Schema 声明参数。模型根据工具描述判断是否调用,并生成符合 Schema 的参数。

一个数据库连接工具的参数声明可以这样设计:

from mcp.types import Tool

def as_tool(self) -> Tool:
    return Tool(
        name=self.name,
        description=self.description,
        inputSchema={
            "type": "object",
            "properties": {
                "host": {
                    "type": "string",
                    "description": "数据库主机地址",
                },
                "port": {
                    "type": "integer",
                    "description": "数据库端口",
                },
                "user": {
                    "type": "string",
                    "description": "数据库用户名",
                },
                "password": {
                    "type": "string",
                    "description": "数据库密码",
                },
                "database": {
                    "type": "string",
                    "description": "数据库名称",
                },
            },
            "required": ["host", "port", "user", "password", "database"],
        },
    )

数据库查询工具可以限制为只读事务,避免模型执行写操作:

from typing import Any, Dict
import pymysql

@mcp.tool()
def query_data(sql: str, session_id: str = "anonymous") -> Dict[str, Any]:
    """执行只读 SQL 查询。"""
    conn = get_connection()
    cursor = None

    try:
        cursor = conn.cursor(pymysql.cursors.DictCursor)

        cursor.execute("SET TRANSACTION READ ONLY")
        cursor.execute("START TRANSACTION")

        try:
            cursor.execute(sql)
            results = cursor.fetchall()
            conn.commit()

            log_query(
                operation=sql,
                success=True,
                session_id=session_id,
            )

            return {
                "success": True,
                "results": results,
                "rowCount": len(results),
            }
        except Exception as exc:
            conn.rollback()
            log_query(
                operation=sql,
                success=False,
                error=str(exc),
                session_id=session_id,
            )
            return {
                "success": False,
                "error": str(exc),
            }
    finally:
        if cursor:
            cursor.close()
        conn.close()

工具调用通常采用“模型判断 + 用户确认”的双轨机制:

环节控制方目的
是否需要调用工具大语言模型根据用户意图选择工具
是否允许执行敏感操作用户或应用策略避免越权、误删、误改
工具具体执行逻辑MCP Server统一鉴权、审计、限流、参数校验
结果解释大语言模型把结构化结果转成用户可理解的回答

Prompts:把任务流程固化成模板

提示词是服务端定义的可复用任务模板。它可以约束模型如何理解任务、如何选择工具、如何生成 SQL、如何输出结果。

对于数据库智能查询,提示词尤其重要。因为用户通常说的是自然语言,例如:

查一下最近一周失败订单数量最高的商品。

模型要完成这个问题,需要知道:

  • 当前数据库名称;
  • 有哪些表;
  • 每张表的字段;
  • 字段含义和类型;
  • SQL 只能查询,不能更新;
  • 返回结果应该是表格、JSON 还是 Markdown;
  • 是否需要给出执行计划和索引建议。

服务端可以把数据库元数据动态填入提示词模板:

@mcp.prompt()
def generate_db_gpt_prompt() -> str:
    """生成数据库问答任务提示词。"""
    tables_info = get_tables()
    database_name = tables_info["database"]
    tables = tables_info["tables"]

    table_definitions = []
    for table in tables:
        table_desc = get_table_description(table)
        if table_desc.get("success"):
            table_definitions.append(table_desc["table_definition"])

    return DB_GPT_SYSTEM_PROMPT.format(
        database_name=database_name,
        table_definitions="\n".join(table_definitions),
    )

一个简化后的数据库提示词模板如下:

DB_GPT_SYSTEM_PROMPT = """
你是一个数据库查询助手,需要根据用户问题生成安全、正确的 MySQL 查询语句。

数据库名:
{database_name}

可用表结构:
{table_definitions}

约束:
1. 只能生成 DQL 查询语句,例如 SELECT。
2. 只能使用已提供的表和字段。
3. 查询结果最多限制为 10000 行。
4. 生成 SQL 后需要检查语法和字段是否存在。
5. 如果可以给出索引优化建议,需要结合表结构和查询条件说明原因。
6. 输出必须是 JSON。

用户问题:
{user_question}

返回格式:
{{
  "thoughts": "分析思路",
  "sql": "SQL 查询语句",
  "display_type": "Table 或 Text",
  "index_advice": "索引建议"
}}
"""

提示词适合描述“应该怎样完成任务”,工具适合执行“确定的外部动作”,资源适合提供“可读取上下文”。三者配合后,MCP Server 才能从一个函数集合变成一个可编排的能力服务。

MCP Client 怎样和 Server 交互

MCP Client 负责建立连接、初始化会话、发现工具和资源、调用工具、读取资源。使用 Python SDK 时,核心对象通常是 ClientSession

一个客户端连接本地 Server 脚本的流程如下:

from contextlib import AsyncExitStack
from typing import Optional

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


class MCPClient:
    def __init__(self):
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()

    async def connect_to_server(self, server_script_path: str):
        is_python = server_script_path.endswith(".py")
        is_js = server_script_path.endswith(".js")

        if not (is_python or is_js):
            raise ValueError("server script must be a .py or .js file")

        command = "python" if is_python else "node"

        server_params = StdioServerParameters(
            command=command,
            args=[server_script_path],
            env=None,
        )

        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )

        read_stream, write_stream = stdio_transport

        self.session = await self.exit_stack.enter_async_context(
            ClientSession(read_stream, write_stream)
        )

        await self.session.initialize()

        tools_response = await self.session.list_tools()
        print("tools:", [tool.name for tool in tools_response.tools])

        resources_response = await self.session.list_resources()
        print("resources:", [resource.uri for resource in resources_response.resources])

        prompts_response = await self.session.list_prompts()
        print("prompts:", [prompt.name for prompt in prompts_response.prompts])

客户端和服务端之间的常见交互可以拆成三类:

操作MCP 方法作用
初始化initialize建立协议会话,交换能力信息
工具发现tools/list获取 Server 暴露的工具
工具调用tools/call调用某个工具并传入参数
资源发现resources/list获取可读取资源列表
资源读取resources/read根据 URI 读取资源
提示词发现prompts/list获取提示词模板列表
提示词获取prompts/get获取动态填充后的提示词

数据库问答场景中,Client 的核心工作流程通常是:

flowchart TB
    Q[用户自然语言问题] --> P[Client 获取数据库提示词]
    P --> LLM[调用大语言模型]
    LLM --> J[模型返回 JSON<br/>包含 thoughts / sql / display_type]
    J --> HasSQL{是否包含 SQL}
    HasSQL -->|否| Direct[直接返回模型回答]
    HasSQL -->|是| Call[Client 调用 query_data 工具]
    Call --> Server[MCP Server 执行只读 SQL]
    Server --> Result[返回结构化查询结果]
    Result --> Merge[合并思路、SQL、结果]
    Merge --> A[返回给用户]

对应代码可以这样写:

import json

async def process_query(self, query: str) -> str:
    """使用大语言模型处理数据库相关问题。"""
    prompt_result = await self.session.get_prompt("generate_db_gpt_prompt")
    system_prompt = prompt_result.messages[0].content.text

    llm_response = self.llm.chat(
        system_prompt=system_prompt,
        content=query,
        response_format="json_object",
        conversation_history=self.conversation_history,
    )

    response_data = json.loads(llm_response)

    self.conversation_history.append({
        "role": "user",
        "content": query,
    })
    self.conversation_history.append({
        "role": "assistant",
        "content": llm_response,
    })

    if not response_data.get("sql"):
        return json.dumps(response_data, ensure_ascii=False, indent=2)

    query_result = await self.session.call_tool(
        "query_data",
        {
            "sql": response_data["sql"],
            "session_id": self.session_id,
        },
    )

    final_response = {
        "thoughts": response_data.get("thoughts"),
        "sql": response_data["sql"],
        "display_type": response_data.get("display_type", "Table"),
        "results": (
            json.loads(query_result.content[0].text)
            if query_result.content and query_result.content[0].text
            else None
        ),
    }

    return json.dumps(final_response, ensure_ascii=False, indent=2)

这个流程完成了自然语言到 SQL、SQL 到工具调用、工具结果到自然语言解释的闭环。

MCP 的底层通信:JSON-RPC 2.0

MCP 使用 JSON-RPC 2.0 作为消息格式。JSON-RPC 是一种轻量级远程过程调用协议,消息是 JSON 对象,适合跨语言、跨进程、跨网络传输。

一个请求消息通常包含:

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "query_data",
    "arguments": {
      "sql": "SELECT * FROM users LIMIT 10",
      "session_id": "session-001"
    }
  }
}

一个成功响应通常包含:

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "{\"success\": true, \"rowCount\": 10}"
      }
    ]
  }
}

一个错误响应通常包含:

{
  "jsonrpc": "2.0",
  "id": 1,
  "error": {
    "code": -32602,
    "message": "Invalid params"
  }
}

MCP 中常见的 JSON-RPC 消息类型如下:

类型是否需要响应用途
Request初始化、工具调用、资源读取
Response返回请求结果
Error返回错误码和错误信息
Notification发送异步事件,例如进度通知、状态变化

在 Stdio 传输中,消息发送时会被序列化成 JSON 字符串,并追加换行符:

json_payload = session_message.message.model_dump_json(
    by_alias=True,
    exclude_none=True,
)

await process.stdin.send((json_payload + "\n").encode("utf-8"))

接收方按行读取,再反序列化成 JSON-RPC 消息:

line = await read_line()

try:
    message = types.JSONRPCMessage.model_validate_json(line)
except Exception as exc:
    await read_stream_writer.send(exc)
else:
    await read_stream_writer.send(SessionMessage(message))

JSON-RPC 只规定消息格式,不规定消息走哪种传输通道。MCP 在传输层支持 Stdio、SSE、Streamable HTTP 等方式。

MCP 的传输方式

不同部署场景适合不同传输方式。

传输方式通信载体适合场景特点
Stdio标准输入 / 标准输出本地插件、本地脚本、IDE 集成简单、低延迟、只适合本机
SSEHTTP POST + Server-Sent Events远程服务、云端工具基于 HTTP,服务端可推送消息
Streamable HTTPHTTP POST + 可选 SSE 流远程 MCP 服务、需要会话管理的场景支持有状态会话、恢复、流式响应
WebSocket双向长连接需要强实时双向通信的系统MCP 生态中不一定作为首选标准传输

Stdio:本地进程通信

Stdio 适合 Host 启动本地 MCP Server。Client 把 Server 当成子进程启动,然后通过标准输入输出交换 JSON-RPC 消息。

sequenceDiagram
    participant C as MCP Client
    participant P as Server 子进程
    participant S as MCP Server

    C->>P: 启动子进程
    C->>S: 向 stdin 写入 JSON-RPC 请求
    S->>S: 解析请求并执行工具
    S-->>C: 从 stdout 输出 JSON-RPC 响应
    C->>P: 关闭 stdin / 终止进程

服务端可以这样启动 Stdio 模式:

import asyncio
from mcp.server.stdio import stdio_server

async def run_stdio():
    await initialize_global_resources()

    try:
        async with stdio_server() as (read_stream, write_stream):
            await app.run(
                read_stream,
                write_stream,
                app.create_initialization_options(),
            )
    finally:
        await close_global_resources()


if mode == "stdio":
    asyncio.run(run_stdio())

Cline 这类客户端可以通过 JSON 配置启动本地 Server:

{
  "mcp_db": {
    "timeout": 60,
    "type": "stdio",
    "command": "uv",
    "args": [
      "--directory",
      "/path/to/mcp_for_db/src",
      "run",
      "-m",
      "server.mcp.server_mysql",
      "--mode",
      "stdio"
    ],
    "env": {
      "MYSQL_HOST": "localhost",
      "MYSQL_PORT": "3306",
      "MYSQL_USER": "root",
      "MYSQL_PASSWORD": "password",
      "MYSQL_DATABASE": "mcp_db",
      "MYSQL_ROLE": "admin",
      "PYTHONPATH": "/path/to/MCP-DB"
    }
  }
}

Stdio 的优势是简单,不需要额外启动 Web 服务。缺点是只能用于本机进程间通信,不适合作为多用户共享的远程服务。

SSE:用 HTTP 模拟双向通信

SSE(Server-Sent Events,服务器发送事件)本身是服务端到客户端的单向推送。MCP 使用 HTTP POST 承载客户端到服务端的消息,用 SSE 承载服务端到客户端的消息,从而组合出双向通信效果。

sequenceDiagram
    participant C as MCP Client
    participant S as MCP Server

    C->>S: GET /sse 建立 SSE 连接
    S-->>C: endpoint 事件,返回 POST 地址
    C->>S: POST /messages 发送 JSON-RPC 请求
    S-->>C: HTTP 状态码表示已接收
    S-->>C: SSE message 事件返回 JSON-RPC 响应
    C->>S: 关闭 SSE 连接

SSE 消息通常包含事件类型和数据:

event: message
data: {"jsonrpc":"2.0","id":1,"result":{"content":[]}}

服务端核心逻辑可以这样组织:

from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.responses import Response
import uvicorn

from mcp.server.sse import SseServerTransport

def run_sse():
    sse = SseServerTransport("/messages/")

    async def handle_sse(request):
        async with sse.connect_sse(
            request.scope,
            request.receive,
            request.send,
        ) as streams:
            await app.run(
                streams[0],
                streams[1],
                app.create_initialization_options(),
            )

        return Response(status_code=204)

    starlette_app = Starlette(
        routes=[
            Route("/sse", endpoint=handle_sse),
            Mount("/messages/", app=sse.handle_post_message),
        ],
    )

    config = uvicorn.Config(
        app=starlette_app,
        host="0.0.0.0",
        port=9000,
        loop="asyncio",
    )

    uvicorn.Server(config).run()

客户端配置可以简化为:

{
  "mysql_mcp_server": {
    "timeout": 60,
    "type": "sse",
    "url": "http://localhost:9000/sse"
  }
}

SSE 适合远程部署,尤其适合需要服务端推送进度、结果或异步事件的场景。

Streamable HTTP:更完整的 HTTP 传输

Streamable HTTP 在 HTTP POST 的基础上支持流式响应和会话管理。它可以工作在有状态模式,也可以工作在无状态模式。

flowchart TB
    Init[Client POST initialize] --> Session[Server 分配 mcp-session-id]
    Session --> Post[Client POST JSON-RPC 请求]
    Post --> Mode{响应方式}
    Mode -->|JSON| JsonResp[直接返回 JSON-RPC 响应]
    Mode -->|SSE Stream| Stream[返回 SSE 流式响应]
    Session --> Get[Client GET 建立服务器推送流]
    Get --> Push[Server 主动推送请求或通知]

有状态模式会维护会话 ID,适合连接恢复、长任务和多轮交互。无状态模式每个请求创建独立传输实例,实现简单,但无法保留会话上下文。

客户端使用方式类似:

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async with streamablehttp_client("http://localhost:9000/mcp") as (
    read_stream,
    write_stream,
    _,
):
    async with ClientSession(read_stream, write_stream) as session:
        result = await session.initialize()
        tools = await session.list_tools()

MCP 与 REST API、WebSocket 的差异

MCP、REST API 和 WebSocket 不是同一层面的完全替代关系。REST API 偏向资源接口风格,WebSocket 偏向传输通道,MCP 则定义了 AI 应用和工具服务之间的语义协议。

对比项MCPREST APIWebSocket
核心定位AI 工具和上下文协议通用 HTTP 接口风格双向通信通道
是否定义工具发现通常需要额外文档
是否定义资源/提示词
消息格式JSON-RPC 2.0HTTP 方法 + JSON 等自定义消息
适合模型调用高,工具描述和参数 Schema 标准化中,需要额外适配中,需要自定义协议
典型场景LLM 调用工具、读取上下文业务系统接口实时聊天、推送、协同编辑

REST API 可以作为 MCP Server 背后的真实工具。也就是说,MCP Server 可以把某个 REST API 封装成 MCP 工具,提供给模型调用。

搭建一个 MCP 项目环境

Python 版 MCP SDK 要求 Python 3.10 或更高版本,常用依赖管理工具是 uv。

macOS 安装 uv:

curl -LsSf https://astral.sh/uv/install.sh | sh
source "$HOME/.local/bin/env"

uv --version

创建项目并安装 MCP CLI:

uv init MCP-DB
cd MCP-DB

uv add "mcp[cli]"

uv run mcp --help

一个最小 MCP Server 可以这样写:

# server.py
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("demo-server")


@mcp.tool()
def add(a: int, b: int) -> int:
    """计算两个整数之和。"""
    return a + b


@mcp.resource("config://app")
def get_config() -> str:
    """读取应用配置。"""
    return "env=dev\nversion=1.0.0"


@mcp.prompt()
def explain_prompt(topic: str) -> str:
    return f"请用工程师能理解的方式解释:{topic}"


if __name__ == "__main__":
    mcp.run()

通过 uv 启动:

uv run python server.py

真实项目通常不会只写一个文件,而是把 Server、Tools、Resources、Prompts、配置、安全、日志拆开管理。

数据库 MCP Server 的工程化设计

数据库场景非常适合 MCP,但也更需要控制风险。一个数据库 MCP Server 不应该只是简单执行模型给出的 SQL,而要具备元数据读取、SQL 鉴权、连接池、日志审计、提示词编排和多用户隔离能力。

一个较完整的目录可以这样组织:

DW-DBA-MCP/
├── datas/
│   ├── files/                  # 工具执行过的 SQL 文件
│   ├── logs/                   # 服务运行日志
│   └── version/
├── mcp_for_db/
│   ├── client/
│   │   ├── api.py              # FastAPI 接口
│   │   └── client.py           # 自建 MCP Client
│   ├── debug/
│   │   └── mcp_logger.py       # 记录 Client 与 Server 通信数据
│   ├── envs/
│   │   ├── common.env
│   │   ├── dify.env
│   │   └── mysql.env
│   ├── server/
│   │   ├── common/
│   │   │   ├── prompts.py      # 提示词模板
│   │   │   └── tools.py        # 工具描述
│   │   ├── core/
│   │   │   ├── base_server.py
│   │   │   ├── config_manager.py
│   │   │   ├── env_distribute.py
│   │   │   └── service_manager.py
│   │   ├── server_mysql/
│   │   │   ├── mysql_server.py
│   │   │   ├── prompts/
│   │   │   ├── resources/
│   │   │   └── tools/
│   │   ├── server_dify/
│   │   │   ├── dify_server.py
│   │   │   └── tools/
│   │   └── shared/
│   │       ├── oauth/
│   │       ├── security/       # SQL 鉴权
│   │       ├── templates/
│   │       └── utils/
├── pyproject.toml
├── requirements.txt
└── uv.lock

整体结构可以抽象成:

flowchart TB
    API[FastAPI 接口] --> Client[自建 MCP Client]
    Client --> Manager[多 MCP Server 管理器]

    Manager --> MySQL[MySQL MCP Server]
    Manager --> Dify[DiFy 知识库 MCP Server]
    Manager --> Other[其他 MCP Server]

    MySQL --> Tools[数据库工具]
    MySQL --> Resources[数据库资源]
    MySQL --> Prompts[数据库提示词]

    Tools --> Security[SQL 鉴权 / 拦截]
    Security --> Pool[数据库连接池]
    Pool --> DB[(MySQL)]

    MySQL --> Logs[执行日志 / 审计日志]

数据库侧可以暴露哪些能力

能力示例风险控制
表结构查询获取库表、字段、注释、索引只读
SQL 执行执行 SELECT 查询限制 DQL,禁止 DDL/DML
执行计划分析EXPLAIN SELECT ...参数校验,超时控制
慢查询分析读取慢日志、给出优化建议敏感字段脱敏
表锁分析查看当前锁等待、阻塞链路限制系统表访问范围
健康检查连接数、事务、缓存命中率聚合指标,避免暴露敏感配置
知识库检索访问数据库规范、业务词典权限隔离

DQL(Data Query Language,数据查询语言)通常指 SELECT 查询。数据库 MCP Server 面向生产环境时,应默认只允许 DQL,把 INSERTUPDATEDELETEDROPALTER 等操作拦截掉。

自动注册 Resources、Tools 和 Prompts

工程化项目中,如果每新增一个工具都要手动到主 Server 注册,维护成本会越来越高。更好的方式是定义基类和注册表,让子类加载时自动注册。

资源自动注册的核心结构如下:

classDiagram
    class BaseResource {
        +name: str
        +description: str
        +uri: AnyUrl
        +mimeType: str
        +auto_register: bool
        +get_resource_descriptions()
        +read_resource(uri)
    }

    class ResourceRegistry {
        +register(resource_class)
        +register_instance(resource)
        +get_resource(uri)
        +get_all_resources()
    }

    class MySQLResource {
        +get_resource_descriptions()
        +read_resource(uri)
    }

    class TableResource {
        +read_resource(uri)
        +get_table_metadata(table_name)
    }

    BaseResource <|-- MySQLResource
    BaseResource <|-- TableResource
    ResourceRegistry --> BaseResource

Server 基类负责接入 MCP 路由

多服务基类只需要把 MCP 的 list_resourcesread_resource 转发给资源注册表:

from typing import List
from pydantic import AnyUrl
from mcp.types import Resource

async def setup_server(self):
    """设置 MCP Server 路由。"""
    if self.server_setup_completed:
        self.logger.debug("server routes already configured")
        return

    @self.server.list_resources()
    async def handle_list_resources() -> List[Resource]:
        registry = self.get_resource_registry()
        if registry is None:
            return []

        if hasattr(registry, "get_all_resources"):
            return await registry.get_all_resources()

        return []

    @self.server.read_resource()
    async def handle_read_resource(uri: AnyUrl) -> str:
        registry = self.get_resource_registry()
        if registry is None:
            raise ValueError("resource registry is not initialized")

        content = await registry.get_resource(uri)
        return content if content is not None else "null"

注册表负责管理资源实例

from typing import ClassVar, Dict, List, Type
from urllib.parse import urlparse
from pydantic import AnyUrl
from mcp.types import Resource

class ResourceRegistry:
    """管理所有资源实例。"""

    _resources: ClassVar[Dict[str, "BaseResource"]] = {}

    @classmethod
    def register(cls, resource_class: Type["BaseResource"]):
        resource = resource_class()
        cls._resources[str(resource.uri)] = resource

    @classmethod
    def register_instance(cls, resource: "BaseResource"):
        cls._resources[str(resource.uri)] = resource

    @classmethod
    async def get_resource(cls, uri: AnyUrl) -> str:
        parsed = urlparse(str(uri))
        path_parts = parsed.path.strip("/").split("/")

        if not path_parts or not path_parts[0]:
            raise ValueError(f"invalid resource uri: {uri}")

        uri_str = f"{parsed.scheme}://{parsed.netloc}/{parsed.path}"

        for resource in cls._resources.values():
            if str(resource.uri) == uri_str:
                return await resource.read_resource(uri)

        for resource in cls._resources.values():
            if str(resource.uri).endswith(path_parts[0]):
                return await resource.read_resource(uri)

        raise ValueError(f"unregistered resource: {uri}")

    @classmethod
    async def get_all_resources(cls) -> List[Resource]:
        result = []

        # 避免遍历过程中动态注册表资源导致字典变化。
        resources_copy = list(cls._resources.values())

        for resource in resources_copy:
            descriptions = await resource.get_resource_descriptions()
            result.extend(descriptions)

        return result

基类通过 __init_subclass__ 自动注册

from typing import List
from pydantic import AnyUrl
from mcp.types import Resource

class BaseResource:
    """资源基类。"""

    name: str = ""
    description: str = ""
    uri: AnyUrl | None = None
    mimeType: str = "text/plain"
    auto_register: bool = True

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)

        if cls.auto_register and cls.uri is not None:
            ResourceRegistry.register(cls)

    async def get_resource_descriptions(self) -> List[Resource]:
        raise NotImplementedError

    async def read_resource(self, uri: AnyUrl) -> str:
        raise NotImplementedError

有了这个基类,只要定义资源子类并被包导入,就会自动注册。

MySQL 表资源的实现思路

数据库资源可以分成两层:

  • 根资源:代表一个数据库,负责扫描表列表;
  • 表资源:代表具体表,负责读取表数据或表元数据。
flowchart TB
    MySQLResource[MySQLResource<br/>数据库根资源] --> Scan[扫描 information_schema.tables]
    Scan --> Table1[TableResource: users]
    Scan --> Table2[TableResource: orders]
    Scan --> Table3[TableResource: products]

    Table1 --> Read1[读取表数据 / 字段元数据]
    Table2 --> Read2[读取表数据 / 字段元数据]
    Table3 --> Read3[读取表数据 / 字段元数据]

表资源示例:

import aiomysql
from pydantic import AnyUrl

class TableResource(BaseResource):
    """代表具体数据库表。"""

    auto_register = False

    TABLE_EXISTS_QUERY = """
        SELECT COUNT(*) AS table_exists
        FROM information_schema.tables
        WHERE table_schema = %s AND table_name = %s
    """

    COLUMN_METADATA_QUERY = """
        SELECT COLUMN_NAME, DATA_TYPE
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ORDINAL_POSITION
    """

    def __init__(self, db_name: str, table_name: str, description: str):
        self.db_name = db_name
        self.table_name = table_name
        self.name = f"table: {table_name}"
        self.uri = AnyUrl(f"mysql://{db_name}/{table_name}")
        self.description = description
        self.mimeType = "text/csv"

    async def get_resource_descriptions(self):
        return []

    async def read_resource(self, uri: AnyUrl) -> str:
        table_name = extract_table_name(uri)
        column_metadata = await self.get_table_metadata(table_name)

        async with get_current_database_manager().get_connection() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                safe_query = build_safe_select_query(table_name)
                await cursor.execute(safe_query)

                columns = [col[0] for col in cursor.description]
                rows = await cursor.fetchall()

                return generate_csv(columns, rows, column_metadata)

    async def get_table_metadata(self, table_name: str):
        db_name = get_current_database_manager().get_current_config()["database"]

        async with get_current_database_manager().get_connection() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                await cursor.execute(
                    self.TABLE_EXISTS_QUERY,
                    (db_name, table_name),
                )
                exists = await cursor.fetchone()

                if not exists or not exists["table_exists"]:
                    raise ValueError(
                        f"table {table_name!r} does not exist in database {db_name!r}"
                    )

                await cursor.execute(
                    self.COLUMN_METADATA_QUERY,
                    (db_name, table_name),
                )

                rows = await cursor.fetchall()
                return [
                    (row["COLUMN_NAME"], row["DATA_TYPE"])
                    for row in rows
                ]

数据库根资源负责发现所有表,并把每张表注册成 TableResource

from mcp.types import Resource
from pydantic import AnyUrl

class MySQLResource(BaseResource):
    """MySQL 数据库根资源。"""

    name = "MySQL 数据库"
    uri = AnyUrl("mysql://localhost/default")
    description = "提供 MySQL 数据库表访问能力"
    mimeType = "text/csv"
    auto_register = True

    TABLE_QUERY = """
        SELECT
            TABLE_NAME AS table_name,
            TABLE_COMMENT AS table_comment,
            TABLE_ROWS AS estimated_rows
        FROM information_schema.tables
        WHERE table_schema = %s
    """

    def __init__(self):
        self.cache = {}

    async def get_resource_descriptions(self):
        db_manager = get_current_database_manager()
        if db_manager is None:
            return []

        db_name = db_manager.get_current_config().get("database")
        if not db_name:
            return []

        if "table_descriptions" in self.cache:
            return self.cache["table_descriptions"]

        async with db_manager.get_connection() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                await cursor.execute(self.TABLE_QUERY, (db_name,))
                tables = await cursor.fetchall()

                resources = []

                for table in tables:
                    table_name = table["table_name"]
                    description = table["table_comment"] or f"{table_name} 表"

                    if table["estimated_rows"]:
                        description += f" (~{table['estimated_rows']} 行)"

                    table_resource = TableResource(
                        db_name=db_name,
                        table_name=table_name,
                        description=description,
                    )

                    ResourceRegistry.register_instance(table_resource)

                    resources.append(
                        Resource(
                            uri=table_resource.uri,
                            name=table_resource.name,
                            mimeType=table_resource.mimeType,
                            description=table_resource.description,
                        )
                    )

                self.cache["table_descriptions"] = resources
                return resources

    async def read_resource(self, uri: AnyUrl) -> str:
        return json.dumps(
            {
                "name": self.name,
                "uri": str(self.uri),
                "description": self.description,
                "type": "database_root",
            },
            ensure_ascii=False,
        )

新增资源时,只需要实现对应子类,并在包初始化文件里导入:

from .db_resource import MySQLResource, TableResource

__all__ = [
    "MySQLResource",
    "TableResource",
]

工具和提示词也可以采用类似的注册表机制,把“新增能力”变成“新增一个类或一个模块”,避免所有逻辑堆在主服务入口里。

数据库 MCP 的安全控制

数据库 MCP Server 的最大风险来自模型生成的 SQL。模型可能因为理解错误生成错误表名,也可能把用户的危险指令转成高危 SQL。服务端必须把安全控制放在执行层,而不是只依赖提示词约束。

一条 SQL 执行前至少要经过这些检查:

flowchart TB
    SQL[模型生成 SQL] --> Parse[SQL 解析]
    Parse --> Type{语句类型}
    Type -->|非 SELECT| Reject[拒绝执行]
    Type -->|SELECT| TableCheck[表权限检查]
    TableCheck --> ColumnCheck[字段权限检查 / 脱敏]
    ColumnCheck --> LimitCheck[LIMIT / 超时 / 行数限制]
    LimitCheck --> Explain[可选 EXPLAIN 分析]
    Explain --> Execute[只读事务执行]
    Execute --> Audit[记录审计日志]
    Audit --> Return[返回结果]

关键安全策略如下:

风险控制方式
执行写操作只允许 DQL,拦截 UPDATEDELETEINSERTDROPALTER
SQL 注入表名白名单、字段白名单、参数化查询、SQL AST 解析
查询过大强制 LIMIT、超时控制、最大返回行数
敏感字段泄露字段级权限、手机号/身份证/地址脱敏
越权访问库表会话级配置、用户级数据源隔离
不可追溯记录用户、会话、SQL、工具名、执行结果和错误
模型误判高危操作二次确认,失败后返回明确错误

即使提示词中写了“只能生成 SELECT”,服务端也不能信任模型输出。提示词是软约束,鉴权和拦截才是硬约束。

数据库智能查询的实际链路

以“查询用户表里张三的数据”为例,较完整的链路如下:

sequenceDiagram
    participant U as 用户
    participant API as FastAPI / Host
    participant C as MCP Client
    participant M as 大语言模型
    participant S as MySQL MCP Server
    participant DB as MySQL

    U->>API: 查询用户表里张三的数据
    API->>C: 提交问题
    C->>S: prompts/get 获取数据库提示词
    S-->>C: 返回数据库名、表结构、约束
    C->>M: 用户问题 + 提示词
    M-->>C: 返回 SQL JSON
    C->>S: tools/call query_data(sql)
    S->>S: SQL 鉴权、表权限检查、只读事务
    S->>DB: 执行 SELECT
    DB-->>S: 返回结果集
    S->>S: 记录审计日志
    S-->>C: 返回结构化结果
    C-->>API: 合并 SQL、结果和分析
    API-->>U: 展示回答

当用户明确给出库表名时,模型生成 SQL 的准确率通常更高。如果用户只给出中文业务描述,例如“查一下告警信息”,就需要先根据表注释、字段注释或业务词典定位可能的表,再生成 SQL。这个过程可以通过工具编排完成:

1. 根据用户描述调用 get_table_name,查找候选表。
2. 调用 get_table_desc,获取表结构和字段说明。
3. 结合用户条件生成 SELECT SQL。
4. 调用 explain_sql,检查执行计划。
5. 调用 query_data,执行只读查询。
6. 返回 SQL、查询结果和必要的优化建议。

这种编排可以写在提示词里,也可以由客户端代码控制。写在提示词里实现简单,但依赖模型是否稳定遵循;写在客户端里更可控,但灵活性较低。生产环境通常会把关键安全步骤放在代码里,把业务理解和输出格式放在提示词里。

自建 Client 与 FastAPI 封装

如果不希望用户直接面对 Cline 或 Claude Desktop,可以自建 MCP Client,并在外层提供 FastAPI 接口。

flowchart LR
    Web[Web / 内部平台] --> FastAPI[FastAPI]
    FastAPI --> Client[自建 MCP Client]
    Client --> LLM[大语言模型 API]
    Client --> MCP[MCP Server]
    MCP --> DB[(MySQL)]
    MCP --> KB[知识库 / 文档系统]

接口示例:

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
mcp_client = MCPClient()


class QueryRequest(BaseModel):
    question: str
    user_id: str
    database: str | None = None


@app.post("/db/query")
async def query_database(req: QueryRequest):
    result = await mcp_client.process_query(
        query=req.question,
        user_id=req.user_id,
        database=req.database,
    )

    return {
        "success": True,
        "data": result,
    }

这种封装方式有几个好处:

  • 可以统一接入公司内部登录态和权限系统;
  • 可以在 API 层记录用户、请求来源和访问库名;
  • 可以把多个 MCP Server 藏在服务端,由 Client 自动路由;
  • 可以把结果集转成前端更容易展示的结构;
  • 可以在接口层做限流、超时、熔断和审计。

适合与不适合使用 MCP 的场景

场景是否适合原因
AI 助手读取数据库元数据适合表结构、字段、索引都可以做成资源或工具
自然语言生成 SQL 并查询适合,但要强鉴权MCP 工具可封装只读查询和审计
IDE 插件调用本地脚本适合Stdio 简单直接
企业内部工具统一接入模型适合MCP Server 可以作为工具网关
高频低延迟核心交易链路不适合直接使用LLM 推理和工具调用不可控,延迟高
需要强确定性的批处理任务不适合作为主执行器模型输出存在不确定性,应由规则系统主导
允许模型直接改生产数据不适合风险过高,必须加入审批和权限系统

MCP 适合把模型接入“辅助决策、查询、诊断、分析、编排”类场景,不适合让模型无监督地执行不可逆操作。

AI4DB 与 DB4AI 的延伸

数据库和 AI 的结合可以分成两个方向。

AI4DB(AI for Database)关注用 AI 提升数据库运维和治理能力,例如:

  • 慢 SQL 分析;
  • 索引推荐;
  • 容量预测;
  • 锁等待诊断;
  • 异常 SQL 识别;
  • 数据库健康巡检;
  • 故障自愈建议。

DB4AI(Database for AI)关注数据库怎样支撑 AI 场景,例如:

  • 向量检索;
  • 多模态数据存储;
  • 文本、图片、结构化数据统一管理;
  • 训练数据治理;
  • 推理结果落库;
  • 强事务和分析查询并存。

MCP 在这两个方向中都可以作为连接层。在 AI4DB 中,它把数据库诊断工具暴露给模型;在 DB4AI 中,它把知识库、向量库和业务数据源标准化接入 AI 应用。

关键实践建议

MCP 用在数据库场景时,重点不在“让模型能调用 SQL 工具”,而在“让模型只能以安全、可审计、可控的方式使用数据库能力”。

几个原则值得固定下来:

  1. 工具描述要具体
    工具名称、参数说明、返回结构都要清晰。模型依赖这些信息判断是否调用工具。

  2. 提示词只做任务约束,不承担安全边界
    安全边界必须在服务端代码里实现,例如 SQL 解析、权限校验、只读事务和审计。

  3. 资源不要暴露过宽
    不要把整个数据库无差别暴露给模型。应按用户、角色、环境控制可见库表和字段。

  4. 高危操作默认拒绝
    生产环境中,模型生成的 UPDATEDELETEDROPALTER 应默认拦截。确实需要执行时,应进入独立审批流程。

  5. 保留完整调用链路
    日志至少记录用户、会话、工具、参数、SQL、耗时、返回行数、错误信息。出了问题才能追溯。

  6. 客户端和服务端职责要分清
    Client 负责任务编排、模型交互和结果组织;Server 负责真实能力执行、权限控制和资源管理。

  7. 优先从只读查询和诊断类场景落地
    表结构问答、SQL 解释、慢查询分析、索引建议是低风险高价值入口。写操作、自动变更、自动修复要谨慎推进。

MCP 的价值不只是多了一种调用工具的方式,而是给大语言模型和外部系统之间建立了一条标准化、可扩展、可治理的通道。数据库场景天然具备大量结构化数据和运维工具,只要把权限、安全和审计设计好,MCP 可以成为数据库智能助手、SQL 分析助手和运维诊断助手的基础协议层。


评论