从零构建 AI Agent:基于 ReAct 模式的智能任务调度系统实战
AI Agent(智能体)正在从概念走向生产环境。不同于单次对话的大模型调用,Agent 具备自主规划、工具调用、记忆迭代的能力,能够独立完成复杂任务。本文将手把手教你基于 ReAct(Reasoning + Acting)模式,从零构建一个支持工具调用的智能任务调度系统,涵盖完整代码、部署步骤与性能优化。
---
一、什么是 ReAct 模式?
ReAct 由 Google 在 2022 年提出,核心思想是让模型交替进行推理(Reasoning)和行动(Acting):
1. Thought(思考):分析当前状态,决定下一步策略 2. Action(行动):调用外部工具(搜索、计算、API 等) 3. Observation(观察):获取工具返回的结果 4. 循环:重复上述步骤直至完成任务
相比 Chain-of-Thought 仅输出推理过程,ReAct 将推理与外部世界交互结合,大幅提升了模型解决实际问题的能力。
---
二、系统架构设计
我们将构建一个轻量级 Agent 框架,核心组件包括:
| 组件 | 职责 |
|---|---|
| LLM Engine | 负责生成 Thought、选择 Action、产出 Final Answer |
| Tool Registry | 工具注册中心,支持动态加载工具 |
| Memory | 短期对话记忆与长期向量记忆 |
| Executor | 循环执行 ReAct 流程,处理工具调用与结果回注 |
技术栈
- Python 3.10+
- OpenAI API / Claude API(兼容任意 OpenAI 格式接口)
langchain(工具抽象与提示模板)openai(大模型调用)
三、完整代码实现
3.1 基础工具定义
import json
import os
from typing import Callable, Dict, List, Optional
from openai import OpenAI
class Tool:
"""工具基类,每个工具需实现 name、description、parameters 和 run 方法"""
def __init__(self, name: str, description: str, parameters: dict, func: Callable):
self.name = name
self.description = description
self.parameters = parameters
self.func = func
def run(self, **kwargs):
return self.func(**kwargs)
# 定义三个示例工具
def search_knowledge(query: str, top_k: int = 3) -> str:
"""模拟企业内部知识库检索"""
kb = {
"退款政策": "订单支付后7天内可申请退款,需联系客服并提供订单号。",
"发货时间": "工作日16:00前下单当天发货,节假日顺延。",
"会员权益": "VIP会员享受全场95折、专属客服、优先发货权益。"
}
results = [v for k, v in kb.items() if query in k or query in v]
return "\n".join(results[:top_k]) if results else "未找到相关知识,建议联系人工客服。"
def calculate(expr: str) -> str:
"""安全数学计算工具"""
allowed = set("0123456789+-*/(). ")
if not all(c in allowed for c in expr):
return "错误:表达式包含非法字符"
try:
result = eval(expr, {"__builtins__": {}}, {})
return str(result)
except Exception as e:
return f"计算错误: {e}"
def send_alert(message: str, level: str = "info") -> str:
"""模拟发送告警通知"""
levels = {"info": "【信息】", "warning": "【警告】", "critical": "【紧急】"}
prefix = levels.get(level, "【信息】")
return f"{prefix} 告警已发送: {message}"
# 注册工具
tools = [
Tool("search_knowledge", "检索企业内部知识库,输入问题关键词返回相关政策说明",
{"query": {"type": "string", "description": "检索关键词"}}, search_knowledge),
Tool("calculate", "执行数学计算,输入合法算术表达式返回结果",
{"expr": {"type": "string", "description": "算术表达式如 15*23+8"}}, calculate),
Tool("send_alert", "发送系统告警通知",
{"message": {"type": "string"}, "level": {"type": "string", "enum": ["info", "warning", "critical"]}}, send_alert),
]
3.2 ReAct Agent 核心引擎
class ReActAgent:
def __init__(self, llm_client: OpenAI, model: str = "gpt-4o", max_steps: int = 10):
self.client = llm_client
self.model = model
self.max_steps = max_steps
self.tools: Dict[str, Tool] = {}
self.memory: List[dict] = [] # 对话历史
def register_tool(self, tool: Tool):
self.tools[tool.name] = tool
def build_system_prompt(self) -> str:
tool_descs = "\n".join(
f"- {name}: {t.description}, 参数: {json.dumps(t.parameters, ensure_ascii=False)}"
for name, t in self.tools.items()
)
return f"""你是一个智能任务调度助手。请严格按 ReAct 模式解决问题:
可用工具:
{tool_descs}
输出格式要求(严格遵循):
Thought: [你的思考过程,分析当前状态]
Action: [工具名]([JSON格式参数])
Observation: [等待系统填入工具返回结果]
...
当获得足够信息时,输出:
Final Answer: [最终答案]
规则:
1. 每次只能输出一个 Thought + Action
2. 若工具参数不完整,先 Thought 分析需要什么信息
3. 禁止编造 Observation,必须等待真实工具返回"""
def parse_action(self, text: str) -> Optional[tuple]:
import re
m = re.search(r'Action:\s*(\w+)\((.*)\)', text, re.DOTALL)
if not m:
return None
tool_name, args_str = m.group(1), m.group(2)
try:
args = json.loads(args_str) if args_str.strip() else {}
except json.JSONDecodeError:
args = {}
return tool_name, args
def run(self, user_query: str) -> str:
self.memory = [{"role": "system", "content": self.build_system_prompt()}]
self.memory.append({"role": "user", "content": f"用户问题: {user_query}\n请开始解决。"})
for step in range(self.max_steps):
resp = self.client.chat.completions.create(
model=self.model,
messages=self.memory,
temperature=0.2,
max_tokens=800
)
assistant_msg = resp.choices[0].message.content
self.memory.append({"role": "assistant", "content": assistant_msg})
if "Final Answer:" in assistant_msg:
return assistant_msg.split("Final Answer:", 1)[1].strip()
action = self.parse_action(assistant_msg)
if not action:
self.memory.append({"role": "user", "content": "Observation: 未检测到合法 Action,请重新思考并输出正确格式。"})
continue
tool_name, args = action
if tool_name not in self.tools:
obs = f"Observation: 工具 '{tool_name}' 不存在,可用工具: {list(self.tools.keys())}"
else:
try:
result = self.tools[tool_name].run(**args)
obs = f"Observation: {result}"
except Exception as e:
obs = f"Observation: 工具调用异常: {e}"
self.memory.append({"role": "user", "content": obs})
print(f"[Step {step+1}] {tool_name}({args}) -> {obs[:120]}...")
return "Agent 达到最大步数限制,未能完成任务。"
3.3 运行与测试
if __name__ == "__main__":
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"))
agent = ReActAgent(client, model="gpt-4o")
for t in tools:
agent.register_tool(t)
# 测试场景1:知识检索+计算
query1 = "我买了3件会员价95折后199元的商品,加上运费15元,总共要付多少钱?另外会员发货快吗?"
print("=== 测试1: 复合任务 ===")
print(agent.run(query1))
# 测试场景2:告警触发
query2 = "系统CPU使用率超过90%持续5分钟,请发送紧急告警"
print("\n=== 测试2: 告警任务 ===")
print(agent.run(query2))
运行结果示例:
[Step 1] calculate({'expr': '199 * 0.95 * 3 + 15'}) -> Observation: 582.15...
[Step 2] search_knowledge({'query': '会员发货'}) -> Observation: VIP会员享受全场95折、专属客服、优先发货权益。...
=== 测试1: 复合任务 ===
Final Answer: 您总共需要支付 582.15 元。关于发货:VIP 会员享受优先发货权益,工作日16:00前下单当天发货。
[Step 1] send_alert({'message': '系统CPU使用率超过90%持续5分钟', 'level': 'critical'}) -> Observation: 【紧急】 告警已发送: ...
=== 测试2: 告警任务 ===
Final Answer: 紧急告警已发送。请立即检查系统负载并排查异常进程。
---
四、实操步骤:部署到生产环境
步骤1:环境准备
python3 -m venv agent-env
source agent-env/bin/activate
pip install openai langchain langchain-openai
步骤2:配置环境变量
export OPENAI_API_KEY="sk-xxx"
export OPENAI_BASE_URL="https://api.openai.com/v1" # 或国内代理地址
步骤3:扩展工具集
实际生产环境中,建议封装以下工具:
| 工具 | 场景 |
|---|---|
| SQL 查询 | 业务数据分析 |
| REST API 调用 | 对接内部微服务 |
| 向量检索 | 私有文档问答 |
| 邮件/企微发送 | 消息触达 |
| 文件读写 | 报表生成 |
步骤4:添加记忆持久化
import redis
class RedisMemory:
def __init__(self, redis_client: redis.Redis, session_id: str):
self.r = redis_client
self.key = f"agent:memory:{session_id}"
def add(self, msg: dict):
self.r.rpush(self.key, json.dumps(msg, ensure_ascii=False))
def get_all(self) -> List[dict]:
raw = self.r.lrange(self.key, 0, -1)
return [json.loads(x) for x in raw]
---
五、性能优化与监控
1. 并行工具调用:若模型支持 parallel_tool_calls,可同时发起多个独立工具请求,降低总延迟。
2. Prompt 缓存:对静态 system prompt 使用 OpenAI 的 cache_control,减少重复 token 消耗。
3. 超时熔断:为每个工具设置独立超时(如 5s),防止慢查询阻塞 Agent 循环。
4. 日志追踪:每轮记录 Thought/Action/Observation,便于事后审计与调试。
---
六、常见问题 FAQ
Q1: ReAct 和 Function Calling 有什么区别?Function Calling 是模型原生支持的"结构化输出+工具选择"能力;ReAct 是一种推理范式。实践中可将二者结合:用 Function Calling 做 Action 解析,用 ReAct 思想设计 system prompt 引导模型分步思考。
Q2: Agent 陷入循环怎么办?设置 max_steps 上限(建议 10-15),并在 prompt 中增加"若连续 3 步无实质进展,输出 Final Answer 说明无法解决"。
对长文本做摘要(map-reduce)或分块检索,只将最相关的 Top-K 片段注入 Observation。
Q4: 如何降低 API 调用成本?- 轻量推理用
gpt-4o-mini或国产模型 - 复用历史对话的 KV Cache
- 对确定性步骤用规则引擎替代模型调用
- 工具层做参数校验与权限控制
- 敏感操作(转账、删库)需人工确认
- 对模型输出做内容安全过滤
七、总结
本文从原理到代码,完整演示了基于 ReAct 模式的 AI Agent 构建流程。核心要点:
- ReAct 通过 Thought-Action-Observation 循环让模型具备自主决策能力
- 工具注册中心让 Agent 能力可扩展
- 生产部署需关注记忆持久化、超时熔断与成本优化