feat(mcp): implement mcp server for kag (#594)

* fix ProjectCreateRequest param userNo name

* fix kag solver pipeline name

* add kag mcp server

* add readme for kag mcp server example

* update baike readme links

* format code and readme
xionghuaidong 2025-06-19 16:27:13 +08:00 committed by GitHub
parent fc98ad136e
commit 4f078f2545
28 changed files with 355 additions and 17 deletions

Binary image file added (11 KiB); content not shown.

Binary image file added (50 KiB); content not shown.

Binary image file added (15 KiB); content not shown.

Binary image file added (52 KiB); content not shown.

View File

@@ -12,6 +12,12 @@
from kag.bin.commands.info import ListRegisterInfo
from kag.bin.commands.builder import BuilderJobSubmit
from kag.bin.commands.benchmark import KAGBenchmark, RunBenchmark
from kag.bin.commands.mcp_server import RunKagMcpServer
__all__ = ["ListRegisterInfo", "BuilderJobSubmit", "KAGBenchmark", "RunBenchmark"]
__all__ = [
    "ListRegisterInfo",
    "BuilderJobSubmit",
    "KAGBenchmark",
    "RunBenchmark",
    "RunKagMcpServer",
]

View File

@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.

import argparse

from kag.bin.base import Command


@Command.register("run_kag_mcp_server")
class RunKagMcpServer(Command):
    def add_to_parser(self, subparsers: argparse._SubParsersAction):  # noqa
        from kag.mcp.server.kag_mcp_server import KagMcpServer

        parser = subparsers.add_parser("mcp-server", help="Run KAG MCP server")
        KagMcpServer.add_options(parser)
        parser.set_defaults(func=self.get_handler())

    @staticmethod
    def handler(args: argparse.Namespace):
        from kag.mcp.server.kag_mcp_server import KagMcpServer

        KagMcpServer.run(args)

View File

@@ -21,7 +21,12 @@ def build_parser():
        help="subcommands supported by kag",
    )
    # add registered commands to parser
    cmds = ["register_info", "submit_builder_job", "run_benchmark"]
    cmds = [
        "register_info",
        "submit_builder_job",
        "run_benchmark",
        "run_kag_mcp_server",
    ]
    add_commands(subparsers, cmds)
    return parser
return parser

View File

@@ -53,6 +53,8 @@ Execute [eval.py](./solver/eval.py) in the [solver](./solver) directory to ask d
cd solver && python eval.py && cd ..
```
We have implemented an MCP server in KAG, allowing the knowledge base built by KAG to be exposed via the MCP server for integration with agents that support the MCP protocol. Please refer to [KAG MCP Server Example: BaiKe](./mcp_server.md).
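For reference, the server described in that example is launched from the project directory (the one containing `kag_config.yaml`) with:
```bash
kag mcp-server
```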
### Step 7: (Optional) Cleanup
To delete the checkpoints, execute the following command.

View File

@@ -53,6 +53,8 @@ cd builder && python indexer.py && cd ..
cd solver && python eval.py && cd ..
```
We have implemented an MCP server in KAG, which exposes the knowledge base built by KAG through the MCP server so that agents supporting the MCP protocol can integrate with it. See [KAG MCP Server Example: BaiKe](./mcp_server_cn.md).
### Step 7: (Optional) Cleanup
To delete the checkpoints, run the following command.

View File

@@ -154,7 +154,7 @@ py_code_based_math_executor: &py_code_based_math_executor_conf
  type: py_code_based_math_executor
  llm: *openie_llm

solver_pipeline:
kag_solver_pipeline:
  type: kag_static_pipeline
  planner:
    type: kag_static_planner

View File

@@ -0,0 +1,61 @@
# KAG MCP Server Example: BaiKe
[English](./mcp_server.md) |
[简体中文](./mcp_server_cn.md)
We have implemented an MCP server in KAG, allowing the knowledge base built by KAG to be exposed via the MCP server for integration with agents that support the MCP protocol.
## 1. Prerequisites
Please refer to [KAG Example: BaiKe](./README.md) to build the knowledge base and ensure the solver can produce answers successfully.
## 2. Launch the MCP server
Navigate to the directory containing the KAG project's configuration file [kag_config.yaml](./kag_config.yaml), then run the following command to launch the MCP server.
```bash
kag mcp-server
```
![Launch KAG MCP server](/_static/images/examples/baike/kag-launch-mcp-server.png)
By default, the KAG MCP server starts on port 3000 and uses Server-Sent Events (SSE) as the transport. You can use the ``--help`` option to view all supported ``mcp-server`` options.
```bash
kag mcp-server --help
```
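For example, based on the options defined by ``KagMcpServer.add_options`` in this commit, you can change the transport or port, or enable every supported tool (treat the exact values below as illustrative):
```bash
# serve over SSE on a non-default port
kag mcp-server --transport sse --port 3001

# serve over stdio, for clients that launch the server as a subprocess
kag mcp-server --transport stdio

# enable both supported tools: qa-pipeline and retrieve
kag mcp-server --enabled-tools all
```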
## 3. Configure Cursor to connect to the KAG MCP server
In Cursor, use the following configuration to connect to the KAG MCP server:
```json
{
"mcpServers": {
"kag": {
"url": "http://127.0.0.1:3000/sse"
}
}
}
```
Restart Cursor and verify that the connection to the KAG MCP server is successful.
![Configure KAG MCP server in Cursor](/_static/images/examples/baike/kag-configure-in-cursor.png)
## 4. Test the KAG MCP Server in a Cursor Chat Session
Create a new chat session in Cursor and ask questions related to the knowledge base, for example:
```text
查询知识库回答:周星驰的姓名有何含义?
```
When you see the prompt ``Calling qa_pipeline``, click ``Run tool`` to execute the tool.
![Approve KAG MCP server call](/_static/images/examples/baike/kag-mcp-server-call-approve.png)
Below is a screenshot showing a successful invocation of the KAG MCP server.
![KAG MCP server call succeeded](/_static/images/examples/baike/kag-mcp-server-call-succeed.png)
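Note that by default only the ``qa_pipeline`` tool is exposed. The server in this commit also implements a ``retrieve`` tool that returns retrieved SPO triples and document chunks as JSON; it must be enabled explicitly, for example:
```bash
kag mcp-server --enabled-tools qa-pipeline,retrieve
```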

View File

@@ -0,0 +1,61 @@
# KAG MCP Server Example: BaiKe
[English](./mcp_server.md) |
[简体中文](./mcp_server_cn.md)
We have implemented an MCP server in KAG, which exposes the knowledge base built by KAG through the MCP server so that agents supporting the MCP protocol can integrate with it.
## 1. Prerequisites
Refer to [KAG Example: BaiKe](./README_cn.md) to build the knowledge base and make sure the solver can produce answers.
## 2. Launch the MCP server
Go to the directory containing the KAG project's configuration file [kag_config.yaml](./kag_config.yaml), then run the following command to launch the MCP server.
```bash
kag mcp-server
```
![Launch KAG MCP server](/_static/images/examples/baike/kag-launch-mcp-server.png)
By default, the KAG MCP server serves on port 3000 using Server-Sent Events (SSE). Use the ``--help`` option to view the options supported by ``mcp-server``.
```bash
kag mcp-server --help
```
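For example, based on the options defined by ``KagMcpServer.add_options`` in this commit, the transport and port can be overridden:
```bash
kag mcp-server --transport sse --port 3001
```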
## 3. Configure Cursor to connect to the KAG MCP server
Use the following configuration in Cursor to connect to the KAG MCP server:
```json
{
"mcpServers": {
"kag": {
"url": "http://127.0.0.1:3000/sse"
}
}
}
```
Restart Cursor and confirm that the connection to the KAG MCP server succeeds.
![Configure KAG MCP server in Cursor](/_static/images/examples/baike/kag-configure-in-cursor.png)
## 4. Use the KAG MCP server in a Cursor chat session
Create a new chat session in Cursor and ask a question related to the knowledge base, for example:
```text
查询知识库回答:周星驰的姓名有何含义?
```
When prompted with ``Calling qa_pipeline``, click ``Run tool`` to run the tool.
![Approve KAG MCP server call](/_static/images/examples/baike/kag-mcp-server-call-approve.png)
Below is a screenshot of a successful call to the KAG MCP server.
![KAG MCP server call succeeded](/_static/images/examples/baike/kag-mcp-server-call-succeed.png)

View File

@@ -5,7 +5,7 @@ from kag.open_benchmark.utils.eval_qa import EvalQa
async def qa(query):
    qa_obj = EvalQa(task_name="baike", solver_pipeline_name="solver_pipeline")
    qa_obj = EvalQa(task_name="baike", solver_pipeline_name="kag_solver_pipeline")
    answer, trace = await qa_obj.qa(query=query, gold="")
    print(f"{query} is {answer}")

View File

@@ -157,7 +157,7 @@ py_code_based_math_executor: &py_code_based_math_executor_conf
  type: py_code_based_math_executor
  llm: *chat_llm

solver_pipeline:
kag_solver_pipeline:
  type: kag_static_pipeline
  planner:
    type: lf_kag_static_planner

View File

@@ -10,7 +10,7 @@ logger = logging.getLogger(__name__)
class CsQaEvaluator(EvalQa):
    def __init__(self, solver_pipeline_name="solver_pipeline"):
    def __init__(self, solver_pipeline_name="kag_solver_pipeline"):
        self.task_name = "csqa"
        super().__init__(self.task_name, solver_pipeline_name)
        self.solver_pipeline_name = solver_pipeline_name

View File

@@ -178,7 +178,7 @@ py_code_based_math_executor: &py_code_based_math_executor_conf
  type: py_code_based_math_executor
  llm: *chat_llm

solver_pipeline:
kag_solver_pipeline:
  type: kag_static_pipeline
  planner:
    type: lf_kag_static_planner

View File

@@ -9,7 +9,7 @@ logger = logging.getLogger()
async def qa(query):
    reporter: TraceLogReporter = TraceLogReporter()
    resp = SolverPipelineABC.from_config(KAG_CONFIG.all_config["solver_pipeline"])
    resp = SolverPipelineABC.from_config(KAG_CONFIG.all_config["kag_solver_pipeline"])
    answer = await resp.ainvoke(query, reporter=reporter)
    logger.info(f"\n\nso the answer for '{query}' is: {answer}\n\n")

View File

@@ -154,7 +154,7 @@ py_code_based_math_executor: &py_code_based_math_executor_conf
  type: py_code_based_math_executor
  llm: *chat_llm

solver_pipeline:
kag_solver_pipeline:
  type: kag_static_pipeline
  planner:
    type: lf_kag_static_planner

View File

@@ -184,7 +184,7 @@ py_code_based_math_executor: &py_code_based_math_executor_conf
  type: py_code_based_math_executor
  llm: *chat_llm

solver_pipeline:
kag_solver_pipeline:
  type: kag_static_pipeline
  planner:
    type: lf_kag_static_planner

View File

@@ -15,7 +15,9 @@ class MedicineDemo:
    async def qa(self, query):
        reporter: TraceLogReporter = TraceLogReporter()
        resp = SolverPipelineABC.from_config(KAG_CONFIG.all_config["solver_pipeline"])
        resp = SolverPipelineABC.from_config(
            KAG_CONFIG.all_config["kag_solver_pipeline"]
        )
        answer = await resp.ainvoke(query, reporter=reporter)
        logger.info(f"\n\nso the answer for '{query}' is: {answer}\n\n")

View File

@@ -127,7 +127,7 @@ py_code_based_math_executor: &py_code_based_math_executor_conf
  type: py_code_based_math_executor
  llm: *chat_llm

solver_pipeline:
kag_solver_pipeline:
  type: kag_static_pipeline
  planner:
    type: lf_kag_static_planner

View File

@@ -17,7 +17,9 @@ class EvaQA:
    async def qa(self, query):
        reporter: TraceLogReporter = TraceLogReporter()
        resp = SolverPipelineABC.from_config(KAG_CONFIG.all_config["solver_pipeline"])
        resp = SolverPipelineABC.from_config(
            KAG_CONFIG.all_config["kag_solver_pipeline"]
        )
        answer = await resp.ainvoke(query, reporter=reporter)
        logger.info(f"\n\nso the answer for '{query}' is: {answer}\n\n")

View File

@@ -126,7 +126,7 @@ py_code_based_math_executor: &py_code_based_math_executor_conf
  type: py_code_based_math_executor
  llm: *chat_llm

solver_pipeline:
kag_solver_pipeline:
  type: kag_static_pipeline
  planner:
    type: lf_kag_static_planner

View File

@@ -16,7 +16,9 @@ class SupplyChainDemo:
    async def qa(self, query):
        reporter: TraceLogReporter = TraceLogReporter()
        resp = SolverPipelineABC.from_config(KAG_CONFIG.all_config["solver_pipeline"])
        resp = SolverPipelineABC.from_config(
            KAG_CONFIG.all_config["kag_solver_pipeline"]
        )
        answer = await resp.ainvoke(query, reporter=reporter)
        logger.info(f"\n\nso the answer for '{query}' is: {answer}\n\n")

kag/mcp/__init__.py Normal file (empty)
View File

View File

View File

@@ -0,0 +1,165 @@
# -*- coding: utf-8 -*-
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.

import argparse
import json
from typing import List


class KagMcpServer(object):
    _supported_tools = "qa-pipeline", "retrieve"
    _default_server_name = "kag"
    _default_sse_port = 3000

    @classmethod
    def add_options(cls, parser: argparse.ArgumentParser) -> None:
        parser.add_argument(
            "-t",
            "--transport",
            help="specify MCP server transport; default to sse",
            type=str,
            default="sse",
            choices=("sse", "stdio"),
        )
        parser.add_argument(
            "-p",
            "--port",
            help="specify sse server port; default to %d" % cls._default_sse_port,
            type=int,
            default=cls._default_sse_port,
        )
        all_supported_tools = ",".join(cls._supported_tools)
        parser.add_argument(
            "--enabled-tools",
            help="specify enabled tools, a comma separated list; "
            "default to qa-pipeline; "
            "use 'all' for all the supported tools: %s" % all_supported_tools,
            type=str,
            default="qa-pipeline",
        )

    @classmethod
    def run(cls, args: argparse.Namespace) -> None:
        transport = args.transport
        port = args.port
        enabled_tools = args.enabled_tools
        server = cls(transport=transport, port=port, enabled_tools=enabled_tools)
        server.serve()

    def __init__(self, transport: str, port: int, enabled_tools: str) -> None:
        self._transport = transport
        self._port = port
        self._enabled_tools = tuple(self._get_enabled_tools(enabled_tools))
        self._check_mcp_package()
        self._create_mcp_server()

    @classmethod
    def _get_enabled_tools(cls, spec: str) -> List[str]:
        if spec == "all":
            return list(cls._supported_tools)
        tools = []
        names = spec.split(",")
        for name in names:
            if name in cls._supported_tools:
                tools.append(name)
            else:
                message = "unknown tool %s" % name
                raise RuntimeError(message)
        return tools

    @classmethod
    def _check_mcp_package(cls):
        import importlib.util

        if importlib.util.find_spec("mcp") is None:
            message = "Please install 'mcp' to use KAG MCP server: `python -m pip install mcp`"
            raise ModuleNotFoundError(message)

    def _create_mcp_server(self) -> None:
        from mcp.server.fastmcp import FastMCP  # noqa

        if self._transport == "sse":
            mcp_server = FastMCP(self._default_server_name, port=self._port)
        else:
            mcp_server = FastMCP(self._default_server_name)
        self._mcp_server = mcp_server
        self._add_mcp_tools()

    def _add_mcp_tools(self) -> None:
        for name in self._enabled_tools:
            if name == "qa-pipeline":
                self._add_qa_pipeline_tool()
            elif name == "retrieve":
                self._add_retrieve_tool()
            else:
                assert False

    def _add_qa_pipeline_tool(self) -> None:
        async def qa_pipeline(query: str) -> str:
            """
            Query the knowledge-base with `query`.

            Args:
                query: question to ask
            """
            from kag.open_benchmark.utils.eval_qa import EvalQa

            qa_obj = EvalQa(task_name="qa", solver_pipeline_name="kag_solver_pipeline")
            answer, trace = await qa_obj.qa(query=query, gold="")
            return answer

        self._mcp_server.add_tool(qa_pipeline)

    def _add_retrieve_tool(self) -> None:
        async def retrieve(query: str) -> str:
            """
            Query the knowledge-base with `query` to retrieve SPO-triples and document chunks.

            Args:
                query: query to execute
            """
            from kag.common.conf import KAG_CONFIG
            from kag.interface import ExecutorABC
            from kag.interface import Task
            from kag.interface import Context

            executor = ExecutorABC.from_config(
                KAG_CONFIG.all_config["kag_hybrid_executor"]
            )
            executor_schema = executor.schema()
            executor_name = executor_schema["name"]
            executor_arguments = {
                "query": query,
            }
            task = Task(executor=executor_name, arguments=executor_arguments)
            context = Context()
            await executor.ainvoke(query=query, task=task, context=context)
            data = {
                "summary": task.result.summary,
                "references": task.result.to_reference_list(),
            }
            result = json.dumps(
                data, separators=(",", ": "), indent=4, ensure_ascii=False
            )
            return result

        self._mcp_server.add_tool(retrieve)

    def serve(self) -> None:
        if self._transport == "sse":
            self._mcp_server.run(transport="sse")
        elif self._transport == "stdio":
            self._mcp_server.run(transport="stdio")
        else:
            assert False

View File

@@ -68,7 +68,7 @@ class ProjectClient(Client):
        desc = kwargs.get("desc", None)
        userNo = kwargs.get("userNo", "openspg")
        project_create_request = rest.ProjectCreateRequest(
            name=name, desc=desc, namespace=namespace, config=config, auto_schema=auto_schema, visibility=visibility, tag=tag, user_no=userNo
            name=name, desc=desc, namespace=namespace, config=config, auto_schema=auto_schema, visibility=visibility, tag=tag, userNo=userNo
        )
        project = self._rest_client.project_create_post(