跳到主要内容

MCP 异步任务

任务(Tasks)是 MCP 2025-11-25 版本引入的实验性功能,用于处理长时间运行的操作。任务允许客户端发起请求后立即返回,通过轮询或通知机制获取最终结果。

实验性功能

任务是 MCP 的实验性功能,其设计和行为可能会在未来的协议版本中发生变化。在生产环境使用前,请仔细评估风险。

为什么需要任务?

传统的 MCP 请求-响应模式适合快速操作,但对于以下场景存在局限:

  • 长时间计算:数据分析、模型训练等耗时操作
  • 批量处理:处理大量数据或文件
  • 外部作业集成:与 AWS Lambda、Celery 等外部作业系统集成
  • 用户交互等待:需要用户输入才能继续的操作

任务提供了"立即调用,延迟获取结果"的能力,解决了这些问题。

核心概念

请求者与接收者

任务涉及两个角色:

  • 请求者(Requestor):发起任务增强请求的一方,可以是客户端或服务器
  • 接收者(Receiver):接收并执行任务的一方,可以是客户端或服务器

根据场景不同,客户端和服务器都可能成为请求者或接收者。

任务状态

任务有五种状态:

状态说明
working任务正在执行中
input_required需要请求者提供额外输入
completed任务成功完成,结果可用
failed任务执行失败
cancelled任务被取消

状态转换遵循严格规则:

能力声明

支持任务的服务器和客户端必须在初始化时声明 tasks 能力。

服务器能力

from mcp.types import ServerCapabilities

server_capabilities = ServerCapabilities(
tasks={
"list": {}, # 支持 tasks/list 操作
"cancel": {}, # 支持 tasks/cancel 操作
"requests": {
"tools": {
"call": {} # 支持工具调用的任务增强
}
}
}
)

客户端能力

client_capabilities = {
"tasks": {
"list": {},
"cancel": {},
"requests": {
"sampling": {
"createMessage": {}
},
"elicitation": {
"create": {}
}
}
}
}

工具级别协商

工具可以声明是否支持任务执行:

from mcp.types import Tool

Tool(
name="analyze_large_dataset",
description="分析大型数据集",
inputSchema={...},
execution={
"taskSupport": "required" # 必须、可选或禁止
}
)

taskSupport 的取值:

  • "required":必须作为任务调用
  • "optional":可以普通调用或任务调用
  • "forbidden":禁止作为任务调用(默认值)

协议消息

创建任务

在请求中包含 task 参数来创建任务:

{
"jsonrpc": "2.0",
"id": "task-1",
"method": "tools/call",
"params": {
"name": "analyze_dataset",
"arguments": {
"dataset_id": "ds-123",
"analysis_type": "clustering"
},
"task": {
"ttl": 3600000
}
}
}

服务器返回 CreateTaskResult

{
"jsonrpc": "2.0",
"id": "task-1",
"result": {
"task": {
"taskId": "task-abc123",
"status": "working",
"createdAt": "2025-11-25T10:00:00Z",
"lastUpdatedAt": "2025-11-25T10:00:00Z",
"ttl": 3600000
},
"_meta": {
"io.modelcontextprotocol/model-immediate-response": "正在分析数据集,预计需要 5 分钟..."
}
}
}

查询任务状态

通过 tasks/get 轮询任务状态:

{
"jsonrpc": "2.0",
"id": "get-1",
"method": "tasks/get",
"params": {
"taskId": "task-abc123"
}
}

响应:

{
"jsonrpc": "2.0",
"id": "get-1",
"result": {
"task": {
"taskId": "task-abc123",
"status": "working",
"statusMessage": "已处理 45% 的数据",
"createdAt": "2025-11-25T10:00:00Z",
"lastUpdatedAt": "2025-11-25T10:02:30Z",
"ttl": 3600000,
"pollInterval": 5000
}
}
}

获取任务结果

任务完成后,通过 tasks/result 获取结果:

{
"jsonrpc": "2.0",
"id": "result-1",
"method": "tasks/result",
"params": {
"taskId": "task-abc123"
}
}

响应:

{
"jsonrpc": "2.0",
"id": "result-1",
"result": {
"content": [
{
"type": "text",
"text": "数据分析完成。发现 3 个聚类,详细信息见附件报告。"
}
]
},
"_meta": {
"io.modelcontextprotocol/related-task": {
"taskId": "task-abc123"
}
}
}

列出任务

获取当前所有任务:

{
"jsonrpc": "2.0",
"id": "list-1",
"method": "tasks/list",
"params": {
"cursor": null
}
}

响应:

{
"jsonrpc": "2.0",
"id": "list-1",
"result": {
"tasks": [
{
"taskId": "task-abc123",
"status": "completed",
"createdAt": "2025-11-25T10:00:00Z",
"lastUpdatedAt": "2025-11-25T10:05:00Z"
},
{
"taskId": "task-def456",
"status": "working",
"createdAt": "2025-11-25T10:10:00Z",
"lastUpdatedAt": "2025-11-25T10:12:00Z"
}
],
"nextCursor": "eyJvZmZzZXQiOjIwfQ=="
}
}

取消任务

取消正在执行的任务:

{
"jsonrpc": "2.0",
"id": "cancel-1",
"method": "tasks/cancel",
"params": {
"taskId": "task-abc123"
}
}

响应:

{
"jsonrpc": "2.0",
"id": "cancel-1",
"result": {}
}

任务状态通知

接收者可以主动发送状态变更通知:

{
"jsonrpc": "2.0",
"method": "notifications/tasks/status",
"params": {
"task": {
"taskId": "task-abc123",
"status": "completed",
"statusMessage": "分析完成",
"createdAt": "2025-11-25T10:00:00Z",
"lastUpdatedAt": "2025-11-25T10:05:00Z"
}
}
}

Python 实现

基础任务服务器

import asyncio
from datetime import datetime
from mcp.server import Server
from mcp.types import Tool, TextContent, Task, TaskStatus

app = Server("task-server")

# 任务存储
tasks: dict[str, Task] = {}
task_results: dict[str, any] = {}

@app.list_tools()
async def list_tools():
return [
Tool(
name="long_analysis",
description="执行长时间数据分析",
inputSchema={
"type": "object",
"properties": {
"data_size": {"type": "integer", "description": "数据量"}
},
"required": ["data_size"]
},
execution={"taskSupport": "optional"}
)
]

@app.call_tool()
async def call_tool(name: str, arguments: dict | None, task: dict | None = None):
if name == "long_analysis":
# 检查是否为任务调用
if task:
return await handle_as_task(arguments, task)
else:
return await handle_normal(arguments)

async def handle_as_task(arguments: dict, task_params: dict):
"""作为任务处理"""
data_size = arguments["data_size"]

# 创建任务
task_id = generate_task_id()
now = datetime.utcnow().isoformat() + "Z"

task = Task(
taskId=task_id,
status=TaskStatus.WORKING,
createdAt=now,
lastUpdatedAt=now,
ttl=task_params.get("ttl", 3600000)
)

tasks[task_id] = task

# 启动后台任务
asyncio.create_task(run_analysis(task_id, data_size))

# 返回任务创建结果
return {
"task": task.model_dump(),
"_meta": {
"io.modelcontextprotocol/model-immediate-response":
f"开始分析 {data_size} 条数据,请稍候..."
}
}

async def run_analysis(task_id: str, data_size: int):
"""后台执行分析任务"""
try:
# 模拟长时间处理
for i in range(10):
await asyncio.sleep(2)

# 更新状态
tasks[task_id].statusMessage = f"已处理 {i+1}0% 的数据"
tasks[task_id].lastUpdatedAt = datetime.utcnow().isoformat() + "Z"

# 完成任务
tasks[task_id].status = TaskStatus.COMPLETED
tasks[task_id].lastUpdatedAt = datetime.utcnow().isoformat() + "Z"

# 存储结果
task_results[task_id] = [
TextContent(
type="text",
text=f"分析完成!处理了 {data_size} 条数据,发现 5 个模式。"
)
]

except Exception as e:
tasks[task_id].status = TaskStatus.FAILED
tasks[task_id].statusMessage = str(e)
tasks[task_id].lastUpdatedAt = datetime.utcnow().isoformat() + "Z"

任务管理端点

from mcp.types import McpError

@app.get_task()
async def get_task(task_id: str) -> dict:
"""获取任务状态"""
if task_id not in tasks:
raise McpError(code=-32602, message=f"Task not found: {task_id}")

task = tasks[task_id]
return {
"task": task.model_dump()
}

@app.get_task_result()
async def get_task_result(task_id: str) -> dict:
"""获取任务结果"""
if task_id not in tasks:
raise McpError(code=-32602, message=f"Task not found: {task_id}")

task = tasks[task_id]

# 等待任务完成
while task.status not in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED]:
await asyncio.sleep(1)

if task.status == TaskStatus.FAILED:
raise McpError(code=-32603, message=task.statusMessage or "Task failed")

if task.status == TaskStatus.CANCELLED:
raise McpError(code=-32603, message="Task was cancelled")

return {
"content": task_results.get(task_id, []),
"_meta": {
"io.modelcontextprotocol/related-task": {
"taskId": task_id
}
}
}

@app.list_tasks()
async def list_tasks(cursor: str | None = None) -> dict:
"""列出所有任务"""
task_list = list(tasks.values())
return {
"tasks": [t.model_dump() for t in task_list[:20]],
"nextCursor": None if len(task_list) <= 20 else "more"
}

@app.cancel_task()
async def cancel_task(task_id: str) -> dict:
"""取消任务"""
if task_id not in tasks:
raise McpError(code=-32602, message=f"Task not found: {task_id}")

task = tasks[task_id]

if task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED]:
raise McpError(code=-32602, message="Cannot cancel task in terminal state")

task.status = TaskStatus.CANCELLED
task.lastUpdatedAt = datetime.utcnow().isoformat() + "Z"

return {}

客户端使用

发起任务调用

from mcp import ClientSession

async def call_tool_as_task(client: ClientSession):
# 调用工具并创建任务
result = await client.call_tool(
name="long_analysis",
arguments={"data_size": 10000},
task={"ttl": 3600000} # 1小时 TTL
)

task_id = result.task.taskId
print(f"任务已创建: {task_id}")

# 轮询任务状态
while True:
status = await client.get_task(task_id)
task = status.task

print(f"状态: {task.status}, 消息: {task.statusMessage}")

if task.status in ["completed", "failed", "cancelled"]:
break

# 使用建议的轮询间隔
poll_interval = task.pollInterval or 5000
await asyncio.sleep(poll_interval / 1000)

# 获取结果
if task.status == "completed":
result = await client.get_task_result(task_id)
print(f"结果: {result.content}")

使用通知

async def call_with_notifications(client: ClientSession):
# 设置状态变更回调
async def on_task_status(notification):
task = notification.params["task"]
print(f"任务 {task['taskId']} 状态变更: {task['status']}")

client.on_notification("notifications/tasks/status", on_task_status)

# 创建任务
result = await client.call_tool(
name="long_analysis",
arguments={"data_size": 10000},
task={}
)

task_id = result.task.taskId

# 等待结果(会被通知唤醒)
result = await client.get_task_result(task_id)
return result

处理 input_required 状态

当任务需要额外输入时,会进入 input_required 状态:

async def handle_input_required(client: ClientSession, task_id: str):
"""处理需要输入的任务"""
while True:
status = await client.get_task(task_id)

if status.task.status == "input_required":
# 任务需要输入,调用 tasks/result 获取输入请求
input_request = await client.get_task_result(task_id)

# input_request 包含服务器需要的输入信息
# 例如:需要用户确认敏感操作
user_input = await get_user_input(input_request)

# 提供输入后,任务会继续执行
# 注意:提供输入的方式取决于具体实现

elif status.task.status in ["completed", "failed", "cancelled"]:
break

await asyncio.sleep(1)

最佳实践

服务器端

  1. 设置合理的 TTL:根据任务预计执行时间设置 TTL
  2. 提供即时响应:在 _meta 中提供用户友好的即时消息
  3. 更新状态消息:定期更新 statusMessage 让用户了解进度
  4. 建议轮询间隔:设置合理的 pollInterval 避免过度轮询
  5. 清理过期任务:定期清理 TTL 已过期的任务

客户端

  1. 尊重轮询间隔:使用服务器建议的 pollInterval
  2. 处理所有状态:正确处理所有可能的任务状态
  3. 超时控制:设置总超时时间,避免无限等待
  4. 优雅取消:在不需要结果时主动取消任务
  5. 错误处理:正确处理任务失败和取消的情况

安全考虑

任务隔离

任务 ID 是访问任务状态和结果的主要机制。必须确保:

import secrets

def generate_task_id() -> str:
"""生成安全的任务 ID"""
# 使用加密安全的随机 ID
return secrets.token_urlsafe(32)

def validate_task_access(user_id: str, task_id: str) -> bool:
"""验证用户是否有权访问任务"""
task = tasks.get(task_id)
if not task:
return False

# 检查任务是否属于该用户
return task.owner_id == user_id

资源管理

MAX_CONCURRENT_TASKS = 10
MAX_TTL = 86400000 # 24小时

@app.call_tool()
async def call_tool(name, arguments, task=None):
# 检查并发任务数
user_tasks = get_user_tasks(current_user)
if len(user_tasks) >= MAX_CONCURRENT_TASKS:
raise McpError(
code=-32603,
message=f"Maximum concurrent tasks ({MAX_CONCURRENT_TASKS}) reached"
)

# 限制 TTL
if task and "ttl" in task:
task["ttl"] = min(task["ttl"], MAX_TTL)

# ...

与外部作业系统集成

任务 API 设计为可以与各种外部作业系统集成:

AWS Lambda 示例

import boto3

lambda_client = boto3.client("lambda")

async def start_lambda_task(function_name: str, payload: dict) -> str:
"""启动 Lambda 函数作为任务"""
response = await lambda_client.invoke_async(
FunctionName=function_name,
InvokeArgs=json.dumps(payload)
)

# 使用 Lambda 请求 ID 作为任务 ID
return response["ResponseMetadata"]["RequestId"]

Celery 示例

from celery import Celery

celery_app = Celery("tasks", broker="redis://localhost:6379")

@celery_app.task
def long_analysis_task(data_size: int):
"""Celery 任务"""
# 执行分析...
return {"result": "analysis complete"}

async def start_celery_task(arguments: dict) -> str:
"""启动 Celery 任务"""
result = long_analysis_task.delay(arguments["data_size"])
return result.id # 使用 Celery 任务 ID

下一步