MCP 异步任务
任务(Tasks)是 MCP 2025-11-25 版本引入的实验性功能,用于处理长时间运行的操作。任务允许客户端发起请求后立即返回,通过轮询或通知机制获取最终结果。
实验性功能
任务是 MCP 的实验性功能,其设计和行为可能会在未来的协议版本中发生变化。在生产环境使用前,请仔细评估风险。
为什么需要任务?
传统的 MCP 请求-响应模式适合快速操作,但对于以下场景存在局限:
- 长时间计算:数据分析、模型训练等耗时操作
- 批量处理:处理大量数据或文件
- 外部作业集成:与 AWS Lambda、Celery 等外部作业系统集成
- 用户交互等待:需要用户输入才能继续的操作
任务提供了"立即调用,延迟获取结果"的能力,解决了这些问题。
核心概念
请求者与接收者
任务涉及两个角色:
- 请求者(Requestor):发起任务增强请求的一方,可以是客户端或服务器
- 接收者(Receiver):接收并执行任务的一方,可以是客户端或服务器
根据场景不同,客户端和服务器都可能成为请求者或接收者。
任务状态
任务有五种状态:
| 状态 | 说明 |
|---|---|
working | 任务正在执行中 |
input_required | 需要请求者提供额外输入 |
completed | 任务成功完成,结果可用 |
failed | 任务执行失败 |
cancelled | 任务被取消 |
状态转换遵循严格规则:
能力声明
支持任务的服务器和客户端必须在初始化时声明 tasks 能力。
服务器能力
from mcp.types import ServerCapabilities
server_capabilities = ServerCapabilities(
tasks={
"list": {}, # 支持 tasks/list 操作
"cancel": {}, # 支持 tasks/cancel 操作
"requests": {
"tools": {
"call": {} # 支持工具调用的任务增强
}
}
}
)
客户端能力
client_capabilities = {
"tasks": {
"list": {},
"cancel": {},
"requests": {
"sampling": {
"createMessage": {}
},
"elicitation": {
"create": {}
}
}
}
}
工具级别协商
工具可以声明是否支持任务执行:
from mcp.types import Tool
Tool(
name="analyze_large_dataset",
description="分析大型数据集",
inputSchema={...},
execution={
"taskSupport": "required" # 必须、可选或禁止
}
)
taskSupport 的取值:
"required":必须作为任务调用"optional":可以普通调用或任务调用"forbidden":禁止作为任务调用(默认值)
协议消息
创建任务
在请求中包含 task 参数来创建任务:
{
"jsonrpc": "2.0",
"id": "task-1",
"method": "tools/call",
"params": {
"name": "analyze_dataset",
"arguments": {
"dataset_id": "ds-123",
"analysis_type": "clustering"
},
"task": {
"ttl": 3600000
}
}
}
服务器返回 CreateTaskResult:
{
"jsonrpc": "2.0",
"id": "task-1",
"result": {
"task": {
"taskId": "task-abc123",
"status": "working",
"createdAt": "2025-11-25T10:00:00Z",
"lastUpdatedAt": "2025-11-25T10:00:00Z",
"ttl": 3600000
},
"_meta": {
"io.modelcontextprotocol/model-immediate-response": "正在分析数据集,预计需要 5 分钟..."
}
}
}
查询任务状态
通过 tasks/get 轮询任务状态:
{
"jsonrpc": "2.0",
"id": "get-1",
"method": "tasks/get",
"params": {
"taskId": "task-abc123"
}
}
响应:
{
"jsonrpc": "2.0",
"id": "get-1",
"result": {
"task": {
"taskId": "task-abc123",
"status": "working",
"statusMessage": "已处理 45% 的数据",
"createdAt": "2025-11-25T10:00:00Z",
"lastUpdatedAt": "2025-11-25T10:02:30Z",
"ttl": 3600000,
"pollInterval": 5000
}
}
}
获取任务结果
任务完成后,通过 tasks/result 获取结果:
{
"jsonrpc": "2.0",
"id": "result-1",
"method": "tasks/result",
"params": {
"taskId": "task-abc123"
}
}
响应:
{
"jsonrpc": "2.0",
"id": "result-1",
"result": {
"content": [
{
"type": "text",
"text": "数据分析完成。发现 3 个聚类,详细信息见附件报告。"
}
]
},
"_meta": {
"io.modelcontextprotocol/related-task": {
"taskId": "task-abc123"
}
}
}
列出任务
获取当前所有任务:
{
"jsonrpc": "2.0",
"id": "list-1",
"method": "tasks/list",
"params": {
"cursor": null
}
}
响应:
{
"jsonrpc": "2.0",
"id": "list-1",
"result": {
"tasks": [
{
"taskId": "task-abc123",
"status": "completed",
"createdAt": "2025-11-25T10:00:00Z",
"lastUpdatedAt": "2025-11-25T10:05:00Z"
},
{
"taskId": "task-def456",
"status": "working",
"createdAt": "2025-11-25T10:10:00Z",
"lastUpdatedAt": "2025-11-25T10:12:00Z"
}
],
"nextCursor": "eyJvZmZzZXQiOjIwfQ=="
}
}
取消任务
取消正在执行的任务:
{
"jsonrpc": "2.0",
"id": "cancel-1",
"method": "tasks/cancel",
"params": {
"taskId": "task-abc123"
}
}
响应:
{
"jsonrpc": "2.0",
"id": "cancel-1",
"result": {}
}
任务状态通知
接收者可以主动发送状态变更通知:
{
"jsonrpc": "2.0",
"method": "notifications/tasks/status",
"params": {
"task": {
"taskId": "task-abc123",
"status": "completed",
"statusMessage": "分析完成",
"createdAt": "2025-11-25T10:00:00Z",
"lastUpdatedAt": "2025-11-25T10:05:00Z"
}
}
}
Python 实现
基础任务服务器
import asyncio
from datetime import datetime
from mcp.server import Server
from mcp.types import Tool, TextContent, Task, TaskStatus
app = Server("task-server")
# 任务存储
tasks: dict[str, Task] = {}
task_results: dict[str, any] = {}
@app.list_tools()
async def list_tools():
return [
Tool(
name="long_analysis",
description="执行长时间数据分析",
inputSchema={
"type": "object",
"properties": {
"data_size": {"type": "integer", "description": "数据量"}
},
"required": ["data_size"]
},
execution={"taskSupport": "optional"}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict | None, task: dict | None = None):
if name == "long_analysis":
# 检查是否为任务调用
if task:
return await handle_as_task(arguments, task)
else:
return await handle_normal(arguments)
async def handle_as_task(arguments: dict, task_params: dict):
"""作为任务处理"""
data_size = arguments["data_size"]
# 创建任务
task_id = generate_task_id()
now = datetime.utcnow().isoformat() + "Z"
task = Task(
taskId=task_id,
status=TaskStatus.WORKING,
createdAt=now,
lastUpdatedAt=now,
ttl=task_params.get("ttl", 3600000)
)
tasks[task_id] = task
# 启动后台任务
asyncio.create_task(run_analysis(task_id, data_size))
# 返回任务创建结果
return {
"task": task.model_dump(),
"_meta": {
"io.modelcontextprotocol/model-immediate-response":
f"开始分析 {data_size} 条数据,请稍候..."
}
}
async def run_analysis(task_id: str, data_size: int):
"""后台执行分析任务"""
try:
# 模拟长时间处理
for i in range(10):
await asyncio.sleep(2)
# 更新状态
tasks[task_id].statusMessage = f"已处理 {i+1}0% 的数据"
tasks[task_id].lastUpdatedAt = datetime.utcnow().isoformat() + "Z"
# 完成任务
tasks[task_id].status = TaskStatus.COMPLETED
tasks[task_id].lastUpdatedAt = datetime.utcnow().isoformat() + "Z"
# 存储结果
task_results[task_id] = [
TextContent(
type="text",
text=f"分析完成!处理了 {data_size} 条数据,发现 5 个模式。"
)
]
except Exception as e:
tasks[task_id].status = TaskStatus.FAILED
tasks[task_id].statusMessage = str(e)
tasks[task_id].lastUpdatedAt = datetime.utcnow().isoformat() + "Z"
任务管理端点
from mcp.types import McpError
@app.get_task()
async def get_task(task_id: str) -> dict:
"""获取任务状态"""
if task_id not in tasks:
raise McpError(code=-32602, message=f"Task not found: {task_id}")
task = tasks[task_id]
return {
"task": task.model_dump()
}
@app.get_task_result()
async def get_task_result(task_id: str) -> dict:
"""获取任务结果"""
if task_id not in tasks:
raise McpError(code=-32602, message=f"Task not found: {task_id}")
task = tasks[task_id]
# 等待任务完成
while task.status not in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED]:
await asyncio.sleep(1)
if task.status == TaskStatus.FAILED:
raise McpError(code=-32603, message=task.statusMessage or "Task failed")
if task.status == TaskStatus.CANCELLED:
raise McpError(code=-32603, message="Task was cancelled")
return {
"content": task_results.get(task_id, []),
"_meta": {
"io.modelcontextprotocol/related-task": {
"taskId": task_id
}
}
}
@app.list_tasks()
async def list_tasks(cursor: str | None = None) -> dict:
"""列出所有任务"""
task_list = list(tasks.values())
return {
"tasks": [t.model_dump() for t in task_list[:20]],
"nextCursor": None if len(task_list) <= 20 else "more"
}
@app.cancel_task()
async def cancel_task(task_id: str) -> dict:
"""取消任务"""
if task_id not in tasks:
raise McpError(code=-32602, message=f"Task not found: {task_id}")
task = tasks[task_id]
if task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED]:
raise McpError(code=-32602, message="Cannot cancel task in terminal state")
task.status = TaskStatus.CANCELLED
task.lastUpdatedAt = datetime.utcnow().isoformat() + "Z"
return {}
客户端使用
发起任务调用
from mcp import ClientSession
async def call_tool_as_task(client: ClientSession):
# 调用工具并创建任务
result = await client.call_tool(
name="long_analysis",
arguments={"data_size": 10000},
task={"ttl": 3600000} # 1小时 TTL
)
task_id = result.task.taskId
print(f"任务已创建: {task_id}")
# 轮询任务状态
while True:
status = await client.get_task(task_id)
task = status.task
print(f"状态: {task.status}, 消息: {task.statusMessage}")
if task.status in ["completed", "failed", "cancelled"]:
break
# 使用建议的轮询间隔
poll_interval = task.pollInterval or 5000
await asyncio.sleep(poll_interval / 1000)
# 获取结果
if task.status == "completed":
result = await client.get_task_result(task_id)
print(f"结果: {result.content}")
使用通知
async def call_with_notifications(client: ClientSession):
# 设置状态变更回调
async def on_task_status(notification):
task = notification.params["task"]
print(f"任务 {task['taskId']} 状态变更: {task['status']}")
client.on_notification("notifications/tasks/status", on_task_status)
# 创建任务
result = await client.call_tool(
name="long_analysis",
arguments={"data_size": 10000},
task={}
)
task_id = result.task.taskId
# 等待结果(会被通知唤醒)
result = await client.get_task_result(task_id)
return result
处理 input_required 状态
当任务需要额外输入时,会进入 input_required 状态:
async def handle_input_required(client: ClientSession, task_id: str):
"""处理需要输入的任务"""
while True:
status = await client.get_task(task_id)
if status.task.status == "input_required":
# 任务需要输入,调用 tasks/result 获取输入请求
input_request = await client.get_task_result(task_id)
# input_request 包含服务器需要的输入信息
# 例如:需要用户确认敏感操作
user_input = await get_user_input(input_request)
# 提供输入后,任务会继续执行
# 注意:提供输入的方式取决于具体实现
elif status.task.status in ["completed", "failed", "cancelled"]:
break
await asyncio.sleep(1)
最佳实践
服务器端
- 设置合理的 TTL:根据任务预计执行时间设置 TTL
- 提供即时响应:在
_meta中提供用户友好的即时消息 - 更新状态消息:定期更新
statusMessage让用户了解进度 - 建议轮询间隔:设置合理的
pollInterval避免过度轮询 - 清理过期任务:定期清理 TTL 已过期的任务
客户端
- 尊重轮询间隔:使用服务器建议的
pollInterval - 处理所有状态:正确处理所有可能的任务状态
- 超时控制:设置总超时时间,避免无限等待
- 优雅取消:在不需要结果时主动取消任务
- 错误处理:正确处理任务失败和取消的情况
安全考虑
任务隔离
任务 ID 是访问任务状态和结果的主要机制。必须确保:
import secrets
def generate_task_id() -> str:
"""生成安全的任务 ID"""
# 使用加密安全的随机 ID
return secrets.token_urlsafe(32)
def validate_task_access(user_id: str, task_id: str) -> bool:
"""验证用户是否有权访问任务"""
task = tasks.get(task_id)
if not task:
return False
# 检查任务是否属于该用户
return task.owner_id == user_id
资源管理
MAX_CONCURRENT_TASKS = 10
MAX_TTL = 86400000 # 24小时
@app.call_tool()
async def call_tool(name, arguments, task=None):
# 检查并发任务数
user_tasks = get_user_tasks(current_user)
if len(user_tasks) >= MAX_CONCURRENT_TASKS:
raise McpError(
code=-32603,
message=f"Maximum concurrent tasks ({MAX_CONCURRENT_TASKS}) reached"
)
# 限制 TTL
if task and "ttl" in task:
task["ttl"] = min(task["ttl"], MAX_TTL)
# ...
与外部作业系统集成
任务 API 设计为可以与各种外部作业系统集成:
AWS Lambda 示例
import boto3
lambda_client = boto3.client("lambda")
async def start_lambda_task(function_name: str, payload: dict) -> str:
"""启动 Lambda 函数作为任务"""
response = await lambda_client.invoke_async(
FunctionName=function_name,
InvokeArgs=json.dumps(payload)
)
# 使用 Lambda 请求 ID 作为任务 ID
return response["ResponseMetadata"]["RequestId"]
Celery 示例
from celery import Celery
celery_app = Celery("tasks", broker="redis://localhost:6379")
@celery_app.task
def long_analysis_task(data_size: int):
"""Celery 任务"""
# 执行分析...
return {"result": "analysis complete"}
async def start_celery_task(arguments: dict) -> str:
"""启动 Celery 任务"""
result = long_analysis_task.delay(arguments["data_size"])
return result.id # 使用 Celery 任务 ID