首页 / AI工具 / 从零构建 AI Agent:基于 ReAct 模式的智能任务...

从零构建 AI Agent:基于 ReAct 模式的智能任务调度系统实战

从零构建 AI Agent:基于 ReAct 模式的智能任务调度系统实战

AI Agent(智能体)正在从概念走向生产环境。不同于单次对话的大模型调用,Agent 具备自主规划、工具调用、记忆迭代的能力,能够独立完成复杂任务。本文将手把手教你基于 ReAct(Reasoning + Acting)模式,从零构建一个支持工具调用的智能任务调度系统,涵盖完整代码、部署步骤与性能优化。

---

一、什么是 ReAct 模式?

ReAct 由 Google 在 2022 年提出,核心思想是让模型交替进行推理(Reasoning)和行动(Acting)

1. Thought(思考):分析当前状态,决定下一步策略 2. Action(行动):调用外部工具(搜索、计算、API 等) 3. Observation(观察):获取工具返回的结果 4. 循环:重复上述步骤直至完成任务

相比 Chain-of-Thought 仅输出推理过程,ReAct 将推理与外部世界交互结合,大幅提升了模型解决实际问题的能力。

---

二、系统架构设计

我们将构建一个轻量级 Agent 框架,核心组件包括:

组件职责
LLM Engine负责生成 Thought、选择 Action、产出 Final Answer
Tool Registry工具注册中心,支持动态加载工具
Memory短期对话记忆与长期向量记忆
Executor循环执行 ReAct 流程,处理工具调用与结果回注

技术栈

---

三、完整代码实现

3.1 基础工具定义

import json
import os
from typing import Callable, Dict, List, Optional
from openai import OpenAI

class Tool:
    """工具基类,每个工具需实现 name、description、parameters 和 run 方法"""
    def __init__(self, name: str, description: str, parameters: dict, func: Callable):
        self.name = name
        self.description = description
        self.parameters = parameters
        self.func = func

    def run(self, **kwargs):
        return self.func(**kwargs)

# 定义三个示例工具
def search_knowledge(query: str, top_k: int = 3) -> str:
    """模拟企业内部知识库检索"""
    kb = {
        "退款政策": "订单支付后7天内可申请退款,需联系客服并提供订单号。",
        "发货时间": "工作日16:00前下单当天发货,节假日顺延。",
        "会员权益": "VIP会员享受全场95折、专属客服、优先发货权益。"
    }
    results = [v for k, v in kb.items() if query in k or query in v]
    return "\n".join(results[:top_k]) if results else "未找到相关知识,建议联系人工客服。"

def calculate(expr: str) -> str:
    """安全数学计算工具"""
    allowed = set("0123456789+-*/(). ")
    if not all(c in allowed for c in expr):
        return "错误:表达式包含非法字符"
    try:
        result = eval(expr, {"__builtins__": {}}, {})
        return str(result)
    except Exception as e:
        return f"计算错误: {e}"

def send_alert(message: str, level: str = "info") -> str:
    """模拟发送告警通知"""
    levels = {"info": "【信息】", "warning": "【警告】", "critical": "【紧急】"}
    prefix = levels.get(level, "【信息】")
    return f"{prefix} 告警已发送: {message}"

# 注册工具
tools = [
    Tool("search_knowledge", "检索企业内部知识库,输入问题关键词返回相关政策说明", 
         {"query": {"type": "string", "description": "检索关键词"}}, search_knowledge),
    Tool("calculate", "执行数学计算,输入合法算术表达式返回结果", 
         {"expr": {"type": "string", "description": "算术表达式如 15*23+8"}}, calculate),
    Tool("send_alert", "发送系统告警通知", 
         {"message": {"type": "string"}, "level": {"type": "string", "enum": ["info", "warning", "critical"]}}, send_alert),
]

3.2 ReAct Agent 核心引擎

class ReActAgent:
    def __init__(self, llm_client: OpenAI, model: str = "gpt-4o", max_steps: int = 10):
        self.client = llm_client
        self.model = model
        self.max_steps = max_steps
        self.tools: Dict[str, Tool] = {}
        self.memory: List[dict] = []  # 对话历史

    def register_tool(self, tool: Tool):
        self.tools[tool.name] = tool

    def build_system_prompt(self) -> str:
        tool_descs = "\n".join(
            f"- {name}: {t.description}, 参数: {json.dumps(t.parameters, ensure_ascii=False)}"
            for name, t in self.tools.items()
        )
        return f"""你是一个智能任务调度助手。请严格按 ReAct 模式解决问题:

可用工具:
{tool_descs}

输出格式要求(严格遵循):
Thought: [你的思考过程,分析当前状态]
Action: [工具名]([JSON格式参数])
Observation: [等待系统填入工具返回结果]
...
当获得足够信息时,输出:
Final Answer: [最终答案]

规则:
1. 每次只能输出一个 Thought + Action
2. 若工具参数不完整,先 Thought 分析需要什么信息
3. 禁止编造 Observation,必须等待真实工具返回"""

    def parse_action(self, text: str) -> Optional[tuple]:
        import re
        m = re.search(r'Action:\s*(\w+)\((.*)\)', text, re.DOTALL)
        if not m:
            return None
        tool_name, args_str = m.group(1), m.group(2)
        try:
            args = json.loads(args_str) if args_str.strip() else {}
        except json.JSONDecodeError:
            args = {}
        return tool_name, args

    def run(self, user_query: str) -> str:
        self.memory = [{"role": "system", "content": self.build_system_prompt()}]
        self.memory.append({"role": "user", "content": f"用户问题: {user_query}\n请开始解决。"})

        for step in range(self.max_steps):
            resp = self.client.chat.completions.create(
                model=self.model,
                messages=self.memory,
                temperature=0.2,
                max_tokens=800
            )
            assistant_msg = resp.choices[0].message.content
            self.memory.append({"role": "assistant", "content": assistant_msg})

            if "Final Answer:" in assistant_msg:
                return assistant_msg.split("Final Answer:", 1)[1].strip()

            action = self.parse_action(assistant_msg)
            if not action:
                self.memory.append({"role": "user", "content": "Observation: 未检测到合法 Action,请重新思考并输出正确格式。"})
                continue

            tool_name, args = action
            if tool_name not in self.tools:
                obs = f"Observation: 工具 '{tool_name}' 不存在,可用工具: {list(self.tools.keys())}"
            else:
                try:
                    result = self.tools[tool_name].run(**args)
                    obs = f"Observation: {result}"
                except Exception as e:
                    obs = f"Observation: 工具调用异常: {e}"

            self.memory.append({"role": "user", "content": obs})
            print(f"[Step {step+1}] {tool_name}({args}) -> {obs[:120]}...")

        return "Agent 达到最大步数限制,未能完成任务。"

3.3 运行与测试

if __name__ == "__main__":
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"))
    agent = ReActAgent(client, model="gpt-4o")
    
    for t in tools:
        agent.register_tool(t)

    # 测试场景1:知识检索+计算
    query1 = "我买了3件会员价95折后199元的商品,加上运费15元,总共要付多少钱?另外会员发货快吗?"
    print("=== 测试1: 复合任务 ===")
    print(agent.run(query1))

    # 测试场景2:告警触发
    query2 = "系统CPU使用率超过90%持续5分钟,请发送紧急告警"
    print("\n=== 测试2: 告警任务 ===")
    print(agent.run(query2))

运行结果示例:

[Step 1] calculate({'expr': '199 * 0.95 * 3 + 15'}) -> Observation: 582.15...
[Step 2] search_knowledge({'query': '会员发货'}) -> Observation: VIP会员享受全场95折、专属客服、优先发货权益。...
=== 测试1: 复合任务 ===
Final Answer: 您总共需要支付 582.15 元。关于发货:VIP 会员享受优先发货权益,工作日16:00前下单当天发货。

[Step 1] send_alert({'message': '系统CPU使用率超过90%持续5分钟', 'level': 'critical'}) -> Observation: 【紧急】 告警已发送: ...
=== 测试2: 告警任务 ===
Final Answer: 紧急告警已发送。请立即检查系统负载并排查异常进程。

---

四、实操步骤:部署到生产环境

步骤1:环境准备

python3 -m venv agent-env
source agent-env/bin/activate
pip install openai langchain langchain-openai

步骤2:配置环境变量

export OPENAI_API_KEY="sk-xxx"
export OPENAI_BASE_URL="https://api.openai.com/v1"  # 或国内代理地址

步骤3:扩展工具集

实际生产环境中,建议封装以下工具:

工具场景
SQL 查询业务数据分析
REST API 调用对接内部微服务
向量检索私有文档问答
邮件/企微发送消息触达
文件读写报表生成

步骤4:添加记忆持久化

import redis

class RedisMemory:
    def __init__(self, redis_client: redis.Redis, session_id: str):
        self.r = redis_client
        self.key = f"agent:memory:{session_id}"
    
    def add(self, msg: dict):
        self.r.rpush(self.key, json.dumps(msg, ensure_ascii=False))
    
    def get_all(self) -> List[dict]:
        raw = self.r.lrange(self.key, 0, -1)
        return [json.loads(x) for x in raw]

---

五、性能优化与监控

1. 并行工具调用:若模型支持 parallel_tool_calls,可同时发起多个独立工具请求,降低总延迟。 2. Prompt 缓存:对静态 system prompt 使用 OpenAI 的 cache_control,减少重复 token 消耗。 3. 超时熔断:为每个工具设置独立超时(如 5s),防止慢查询阻塞 Agent 循环。 4. 日志追踪:每轮记录 Thought/Action/Observation,便于事后审计与调试。

---

六、常见问题 FAQ

Q1: ReAct 和 Function Calling 有什么区别?

Function Calling 是模型原生支持的"结构化输出+工具选择"能力;ReAct 是一种推理范式。实践中可将二者结合:用 Function Calling 做 Action 解析,用 ReAct 思想设计 system prompt 引导模型分步思考。

Q2: Agent 陷入循环怎么办?

设置 max_steps 上限(建议 10-15),并在 prompt 中增加"若连续 3 步无实质进展,输出 Final Answer 说明无法解决"。

Q3: 工具返回结果过长,超出模型上下文怎么办?

对长文本做摘要(map-reduce)或分块检索,只将最相关的 Top-K 片段注入 Observation。

Q4: 如何降低 API 调用成本? Q5: Agent 的安全性如何保障? ---

七、总结

本文从原理到代码,完整演示了基于 ReAct 模式的 AI Agent 构建流程。核心要点:

你可以基于此框架快速扩展企业内部智能客服、运维助手、数据分析 Agent 等场景。