首页 / AI工具 / AI工作流自动化实战:用n8n与Lang...

AI工作流自动化实战:用n8n与LangGraph搭建企业级智能任务链

AI工作流自动化实战:用n8n与LangGraph搭建企业级智能任务链

用n8n与LangGraph搭建企业级AI工作流,包含可视化客服和多步骤智能体完整实战代码。

# AI工作流自动化实战:用n8n与LangGraph搭建企业级智能任务链

前言

在企业数字化转型过程中,越来越多的团队希望将大语言模型(LLM)能力嵌入到日常业务流程中——智能客服、文档审核、数据提取、自动化报告生成……这些场景的共同特点是:单一LLM调用无法完成,需要多个环节串联、条件分支、人工审批与外部系统集成。这正是"AI工作流自动化"要解决的核心问题。

本文将从概念讲起,经过工具选型对比,带你完成两个完整的实战项目:一个基于 n8n 的可视化AI客服工作流,一个基于 LangGraph 的多步骤Python智能体。所有代码均可直接复制运行。


一、AI工作流核心概念

在动手之前,先厘清几个关键术语:

概念说明
Workflow(工作流)由多个节点(Node)按有向无环图(DAG)或状态机方式连接而成的任务执行链路
Agent(智能体)具备自主决策能力的AI实体,可根据上下文选择调用不同工具或跳转到不同分支
Tool / Function CallingLLM通过结构化输出来调用外部函数(如查询数据库、发送邮件)
State(状态)工作流在执行过程中传递和维护的数据结构,每个节点可读写状态
Human-in-the-loop在关键节点插入人工审批,确保AI输出可控

一个典型的企业AI工作流长这样:

bash
用户输入 → 意图识别(LLM) → [分支A: 查询知识库] / [分支B: 转人工]
                                    ↓
                              结果汇总(LLM) → 格式化输出 → 通知推送

理解这些概念后,我们来看工具选型。


二、工具选型对比:n8n vs Dify vs LangGraph

维度n8nDifyLangGraph
定位通用自动化平台(支持AI节点)AI应用开发平台Python代码级AI Agent框架
使用方式可视化拖拽 + 少量JS/表达式可视化编排 + Prompt管理纯Python代码
学习曲线低(可视化为主)中高(需Python + LangChain基础)
灵活度中(节点能力有限制)中(平台封装较多)极高(完全自定义)
适合场景IT运维、客服、表单处理、定时任务对话机器人、知识库问答复杂多步推理、研究型Agent
部署方式Docker自托管 / 云服务Docker自托管 / 云服务随Python应用部署
社区生态400+集成节点国内生态活跃LangChain生态

选型建议:

本文选择 n8n + LangGraph 的组合,覆盖"低门槛快速搭建"和"高灵活度深度定制"两个极端。


三、实战1:用n8n搭建AI客服工作流

3.1 场景描述

搭建一个智能客服工作流:

  1. 用户通过Webhook发送问题
  2. AI识别意图(技术支持 / 售前咨询 / 投诉)
  3. 根据意图路由到不同处理分支
  4. 调用外部知识库获取答案
  5. 返回结构化响应

3.2 环境准备

bash
# 使用Docker快速启动n8n
docker run -d \
  --name n8n \
  -p 5678:5678 \
  -v n8n_data:/home/node/.n8n \
  -e N8N_BASIC_AUTH_ACTIVE=true \
  -e N8N_BASIC_AUTH_USER=admin \
  -e N8N_BASIC_AUTH_PASSWORD=your_password \
  n8nio/n8n:latest

启动后访问 http://localhost:5678,用配置的账号密码登录。

3.3 工作流节点配置步骤

步骤1:创建Webhook触发节点

bash
   http://localhost:5678/webhook/customer-support
  1. 保存后,n8n会自动生成Webhook URL,格式为:

Webhook节点会接收用户发送的JSON数据,预期格式为:{"user_id": "u001", "message": "我的订单什么时候发货?"}

步骤2:添加AI意图分类节点

text
   你是一个意图分类器。根据用户消息,判断其属于以下哪类意图,仅输出类别名称:
   - tech_support(技术支持)
   - presale(售前咨询)
   - complaint(投诉)

   用户消息:{{ $json.body.message }}

步骤3:添加Switch路由节点

n8n的Switch节点在画布上会显示为菱形,每个输出端口对应一个分支,非常直观。

步骤4:添加知识库查询节点(以HTTP Request为例)

每个分支下添加 "HTTP Request" 节点,调用你的知识库API:

bash
Method: POST
URL: https://your-api.example.com/knowledge/search
Body (JSON):
{
  "query": "{{ $('Webhook').item.json.body.message }}",
  "category": "{{ $json.category }}"
}
Headers:
  Authorization: Bearer {{ $env.KB_API_KEY }}

步骤5:添加AI回答生成节点

在每个分支的知识库查询后,添加OpenAI节点生成自然语言回答:

bash
System Message:
你是一个专业的客服助手。根据以下知识库返回的信息,用友好、专业的语气回答用户问题。
如果是投诉类问题,请表达歉意并给出解决方案。

知识库信息:
{{ $json.answer }}

用户原始问题:{{ $('Webhook').item.json.body.message }}

步骤6:添加Respond to Webhook节点

在所有分支汇聚后,添加 "Respond to Webhook" 节点,将AI回答返回给调用方:

bash
Respond With:
{
  "status": "success",
  "intent": "{{ $json.intent }}",
  "reply": "{{ $json.message }}",
  "timestamp": "{{ $now.toISO() }}"
}

3.4 完整工作流JSON(可导入n8n)

将以下JSON复制到n8n的导入功能中即可还原完整工作流:

json
{
  "name": "AI Customer Support Workflow",
  "nodes": [
    {
      "parameters": {
        "httpMethod": "POST",
        "path": "customer-support",
        "responseMode": "responseNode"
      },
      "id": "webhook-1",
      "name": "Webhook Trigger",
      "type": "n8n-nodes-base.webhook",
      "typeVersion": 2,
      "position": [0, 300]
    },
    {
      "parameters": {
        "model": "gpt-4o-mini",
        "messages": {
          "values": [
            {
              "content": "你是一个意图分类器。根据用户消息,判断其属于以下哪类意图,仅输出类别名称:\n- tech_support(技术支持)\n- presale(售前咨询)\n- complaint(投诉)\n\n用户消息:{{ $json.body.message }}"
            }
          ]
        }
      },
      "id": "intent-classifier",
      "name": "Intent Classifier",
      "type": "@n8n/n8n-nodes-langchain.lmChatOpenAi",
      "typeVersion": 1,
      "position": [250, 300]
    },
    {
      "parameters": {
        "rules": {
          "rules": [
            { "outputKey": "tech", "conditions": { "conditions": [{ "leftValue": "={{ $json.message }}", "rightValue": "tech_support", "operator": { "type": "string", "operation": "contains" } }] } },
            { "outputKey": "presale", "conditions": { "conditions": [{ "leftValue": "={{ $json.message }}", "rightValue": "presale", "operator": { "type": "string", "operation": "contains" } }] } },
            { "outputKey": "complaint", "conditions": { "conditions": [{ "leftValue": "={{ $json.message }}", "rightValue": "complaint", "operator": { "type": "string", "operation": "contains" } }] } }
          ]
        }
      },
      "id": "switch-1",
      "name": "Intent Router",
      "type": "n8n-nodes-base.switch",
      "typeVersion": 3,
      "position": [500, 300]
    }
  ],
  "connections": {
    "Webhook Trigger": { "main": [[{ "node": "Intent Classifier", "type": "main", "index": 0 }]] },
    "Intent Classifier": { "main": [[{ "node": "Intent Router", "type": "main", "index": 0 }]] }
  }
}

上述JSON为精简版骨架,实际部署时建议在n8n画布中手动补充知识库查询和回答生成节点,并配置真实的API密钥。

3.5 测试工作流

使用curl命令测试:

bash
curl -X POST http://localhost:5678/webhook/customer-support \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "u001",
    "message": "我的App一直闪退怎么办?"
  }'

预期返回:

json
{
  "status": "success",
  "intent": "tech_support",
  "reply": "您好!App闪退通常与缓存积压有关,建议您尝试以下步骤:1) 清除App缓存;2) 更新至最新版本;3) 重启设备后重试。如果问题持续,请提供您的设备型号,我们会进一步排查。",
  "timestamp": "2026-07-02T16:30:00.000Z"
}

四、实战2:用LangGraph构建多步骤智能体

4.1 场景描述

构建一个自动化研究报告生成智能体,包含以下步骤:

  1. 分析需求 → 理解用户的研究主题
  2. 信息检索 → 调用搜索工具获取资料
  3. 内容整合 → LLM综合信息并生成大纲
  4. 章节撰写 → 逐章节展开详细内容
  5. 质量审查 → LLM自审并优化输出
  6. 格式化输出 → 生成最终Markdown报告

4.2 环境安装

bash
pip install langgraph langchain-openai langchain-community

4.3 完整代码实现

python
"""
LangGraph 多步骤研究报告智能体
功能:接收研究主题 → 检索信息 → 生成大纲 → 逐章撰写 → 自审优化 → 输出报告
"""

import json
import operator
from typing import TypedDict, Annotated, List, Dict
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from langchain.tools import tool


# ========== 1. 定义状态 ==========
class ResearchState(TypedDict):
    """工作流状态:所有节点共享的数据结构"""
    topic: str                    # 研究主题
    search_results: str           # 搜索结果原始文本
    outline: str                  # 生成的大纲
    sections: Annotated[list, operator.add]  # 各章节内容(累加)
    final_report: str             # 最终报告
    review_feedback: str          # 审查反馈
    iteration_count: int         # 审查迭代次数


# ========== 2. 定义工具 ==========
@tool
def search_web(query: str) -> str:
    """
    模拟网络搜索(生产环境替换为真实搜索API,如Tavily/SerpAPI)
    """
    # 生产环境替换为:
    # from langchain_community.utilities import SerpAPIWrapper
    # search = SerpAPIWrapper()
    # return search.run(query)

    mock_results = {
        "AI工作流": "2026年AI工作流市场预计增长45%,主要驱动力包括企业数字化转型..."
        "n8n是最流行的开源工作流自动化工具,支持400+集成节点..."
        "LangGraph成为构建复杂AI Agent的首选框架...",

        "大模型应用": "大模型在企业中的应用场景已从客服扩展到代码生成、数据分析..."
        "多Agent协作模式正在成为主流架构范式...",

        "default": "相关领域正在快速发展,多项研究表明AI技术正在重塑工作方式..."
    }

    for key, value in mock_results.items():
        if key in query:
            return value
    return mock_results["default"]


# ========== 3. 初始化LLM ==========
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.7,
    max_tokens=4096,
)


# ========== 4. 定义工作流节点 ==========
def analyze_requirement(state: ResearchState) -> dict:
    """节点1:分析用户需求,明确研究方向"""
    prompt = (
        f"你是一个研究分析师。用户想研究以下主题:'{state['topic']}'\n"
        "请分析这个主题的:\n"
        "1. 核心关键词(用于搜索)\n"
        "2. 研究范围和边界\n"
        "3. 目标读者\n"
        "输出一段简洁的分析说明(100字以内)。"
    )
    response = llm.invoke([HumanMessage(content=prompt)])
    return {"search_results": f"[需求分析] {response.content}"}


def retrieve_information(state: ResearchState) -> dict:
    """节点2:调用搜索工具获取信息"""
    topic = state["topic"]
    # 构造搜索查询
    queries = [
        f"{topic} 最新趋势 2026",
        f"{topic} 实践案例",
        f"{topic} 技术挑战与解决方案",
    ]

    all_results = []
    for q in queries:
        result = search_web.invoke({"query": q})
        all_results.append(f"查询'{q}':\n{result}")

    combined = "\n\n".join(all_results)
    previous = state.get("search_results", "")
    return {"search_results": f"{previous}\n\n[检索结果]\n{combined}"}


def generate_outline(state: ResearchState) -> dict:
    """节点3:基于检索结果生成报告大纲"""
    prompt = (
        f"基于以下信息,为'{state['topic']}'生成一份结构化报告大纲。\n"
        f"信息来源:\n{state['search_results']}\n\n"
        "要求:\n"
        "1. 包含3-5个主要章节\n"
        "2. 每章包含2-3个子节\n"
        "3. 大纲以Markdown格式输出\n"
        "4. 确保内容逻辑连贯"
    )
    response = llm.invoke([HumanMessage(content=prompt)])
    return {"outline": response.content}


def write_sections(state: ResearchState) -> dict:
    """节点4:按大纲逐章节撰写"""
    outline = state["outline"]
    search_results = state["search_results"]

    prompt = (
        f"你是一个专业的内容撰写者。请根据以下大纲和参考资料,撰写完整的报告内容。\n\n"
        f"大纲:\n{outline}\n\n"
        f"参考资料:\n{search_results}\n\n"
        "要求:\n"
        "1. 按大纲结构逐章展开\n"
        "2. 每章300-500字\n"
        "3. 包含具体数据和案例\n"
        "4. 以Markdown格式输出完整报告\n"
        "5. 语言专业但易懂"
    )
    response = llm.invoke([HumanMessage(content=prompt)])

    # 将报告按章节拆分
    sections = response.content.split("\n## ")
    section_list = [f"## {s}" for s in sections if s.strip()]

    return {"sections": section_list, "final_report": response.content}


def review_and_refine(state: ResearchState) -> dict:
    """节点5:质量审查与优化"""
    report = state["final_report"]
    iteration = state.get("iteration_count", 0)

    prompt = (
        f"你是一个严格的内容审核编辑。请审查以下报告并提出改进建议。\n\n"
        f"报告内容:\n{report}\n\n"
        "审查维度:\n"
        "1. 内容准确性:是否有明显错误?\n"
        "2. 结构完整性:各章节是否均衡?\n"
        "3. 语言质量:表达是否清晰专业?\n"
        "4. 信息密度:是否包含足够的数据和案例?\n\n"
        "如果需要改进,输出具体的修改建议(Markdown格式)。\n"
        "如果已经足够好,输出:'APPROVED'"
    )
    response = llm.invoke([HumanMessage(content=prompt)])
    feedback = response.content

    # 如果审查通过或迭代次数超过2次,进入输出
    if "APPROVED" in feedback or iteration >= 2:
        if iteration >= 2 and "APPROVED" not in feedback:
            # 超过迭代上限但仍有反馈,强制通过但记录意见
            return {
                "review_feedback": feedback,
                "iteration_count": iteration + 1,
                "final_report": report + f"\n\n---\n*注:以下审查意见待后续优化*\n{feedback}"
            }
        return {
            "review_feedback": feedback,
            "iteration_count": iteration + 1,
        }

    # 如果需要修改,重新生成
    refine_prompt = (
        f"根据以下审查反馈,优化报告内容。\n\n"
        f"原报告:\n{report}\n\n"
        f"审查反馈:\n{feedback}\n\n"
        "请直接输出优化后的完整报告(Markdown格式)。"
    )
    refined = llm.invoke([HumanMessage(content=refine_prompt)])

    return {
        "review_feedback": feedback,
        "final_report": refined.content,
        "iteration_count": iteration + 1,
    }


def format_output(state: ResearchState) -> dict:
    """节点6:最终格式化"""
    report = state["final_report"]
    topic = state["topic"]

    final = f"""# {topic} - 研究报告

> 本报告由AI智能体自动生成
> 生成时间:2026-07-02
> 审查迭代次数:{state.get('iteration_count', 0)}

---

{report}

---

*报告结束*
"""
    return {"final_report": final}


# ========== 5. 条件路由:决定是否需要再次审查 ==========
def should_continue_review(state: ResearchState) -> str:
    """审查后决定下一步:继续审查 or 进入输出"""
    feedback = state.get("review_feedback", "")
    iteration = state.get("iteration_count", 0)

    if "APPROVED" in feedback or iteration >= 2:
        return "format_output"
    return "review_and_refine"


# ========== 6. 构建状态图 ==========
def build_graph():
    """构建LangGraph工作流"""
    workflow = StateGraph(ResearchState)

    # 添加节点
    workflow.add_node("analyze", analyze_requirement)
    workflow.add_node("retrieve", retrieve_information)
    workflow.add_node("outline", generate_outline)
    workflow.add_node("write", write_sections)
    workflow.add_node("review", review_and_refine)
    workflow.add_node("format", format_output)

    # 设置入口
    workflow.set_entry_point("analyze")

    # 添加边(线性流程)
    workflow.add_edge("analyze", "retrieve")
    workflow.add_edge("retrieve", "outline")
    workflow.add_edge("outline", "write")
    workflow.add_edge("write", "review")

    # 审查后条件路由
    workflow.add_conditional_edges(
        "review",
        should_continue_review,
        {
            "review_and_refine": "review",
            "format_output": "format",
        }
    )

    workflow.add_edge("format", END)

    return workflow.compile()


# ========== 7. 执行 ==========
if __name__ == "__main__":
    import os

    # 设置API Key
    os.environ["OPENAI_API_KEY"] = "your-openai-api-key-here"

    # 构建并执行工作流
    graph = build_graph()

    # 初始状态
    initial_state = {
        "topic": "AI工作流自动化在企业中的应用",
        "search_results": "",
        "outline": "",
        "sections": [],
        "final_report": "",
        "review_feedback": "",
        "iteration_count": 0,
    }

    print("=" * 60)
    print("开始执行研究报告生成工作流...")
    print("=" * 60)

    # 执行并打印每个节点的输出
    for event in graph.stream(initial_state):
        for node_name, node_output in event.items():
            print(f"\n{'─' * 40}")
            print(f"[节点] {node_name}")
            print(f"{'─' * 40}")

            # 只打印关键信息,避免过长输出
            if "outline" in node_output and node_output["outline"]:
                preview = node_output["outline"][:200]
                print(f"大纲预览: {preview}...")
            if "review_feedback" in node_output and node_output["review_feedback"]:
                print(f"审查反馈: {node_output['review_feedback'][:150]}...")
            if "iteration_count" in node_output:
                print(f"迭代次数: {node_output['iteration_count']}")

    # 打印最终报告
    print("\n" + "=" * 60)
    print("最终报告已生成!")
    print("=" * 60)
    print(graph.invoke(initial_state)["final_report"])

4.4 代码结构说明

bash
工作流执行流程:

analyze → retrieve → outline → write → review ─┬→ review (循环优化,最多2次)
                                               └→ format → END

关键设计点:

  1. 状态管理:通过 ResearchState TypedDict 统一管理所有节点间传递的数据
  2. 累加操作sections 字段使用 Annotated[list, operator.add] 实现列表累加
  3. 条件路由should_continue_review 函数决定审查后是继续优化还是输出
  4. 迭代上限:通过 iteration_count 防止无限循环,最多迭代2次
  5. 工具封装@tool 装饰器将函数注册为LangChain工具,方便后续扩展

4.5 运行方式

bash
# 设置API Key
export OPENAI_API_KEY="sk-your-key-here"

# 运行
python research_agent.py

程序会依次执行每个节点,最终输出一份完整的Markdown格式研究报告。


五、生产部署注意事项

5.1 安全与权限控制

python
# n8n中可在Webhook节点前添加"Set"节点做校验
# 或在LangGraph入口节点添加校验逻辑
def validate_input(state: ResearchState) -> dict:
    if not state["topic"] or len(state["topic"]) > 500:
        raise ValueError("研究主题不能为空且不超过500字符")
    return state

5.2 错误处理与重试

python
# n8n:在节点设置中配置"Retry on Fail",推荐3次,指数退避
# LangGraph:使用try-except包装节点函数
def safe_retrieve(state: ResearchState) -> dict:
    try:
        return retrieve_information(state)
    except Exception as e:
        # 记录错误并返回降级结果
        return {
            "search_results": f"[检索失败: {str(e)}] 请使用已有信息继续分析。",
        }

5.3 可观测性

python
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("research_agent")

def analyze_with_logging(state: ResearchState) -> dict:
    logger.info(f"开始分析主题: {state['topic']}")
    result = analyze_requirement(state)
    logger.info(f"需求分析完成,结果长度: {len(result['search_results'])}")
    return result

5.4 性能优化

5.5 成本控制


六、常见问题FAQ

Q1:n8n和LangGraph可以配合使用吗?

可以,而且这是推荐的企业架构。 n8n作为"编排层"负责Webhook接收、定时触发、通知推送、数据库写入等外围任务;LangGraph作为"大脑层"处理复杂的LLM推理逻辑。两者通过HTTP API对接:n8n的HTTP Request节点调用LangGraph暴露的FastAPI接口。

python
# LangGraph暴露为FastAPI接口
from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()
graph = build_graph()

@app.post("/api/research")
async def run_research(topic: str):
    result = graph.invoke({"topic": topic, ...})
    return JSONResponse(content=result)

Q2:LLM调用失败导致工作流中断怎么办?

n8n:在节点Settings中开启"Continue on Fail",并在下游节点中检查上游是否失败。也可以配置Error Trigger工作流,在主流程失败时自动发送告警通知。

LangGraph:使用 interrupt_beforeinterrupt_after 实现断点续传,或通过try-except包装每个节点函数,确保即使单步失败也不会中断整个流程。

Q3:如何实现"人工审批"环节(Human-in-the-loop)?

n8n:使用 "Wait" 节点暂停工作流,等待外部Webhook回调来恢复执行。适合审批、人工确认等场景。

LangGraph:使用 interrupt 机制:

python
from langgraph.types import interrupt

def human_review_node(state: ResearchState):
    # 工作流在此暂停,等待人工输入
    feedback = interrupt("请审查报告大纲,输入'approve'或修改意见:")
    return {"review_feedback": feedback}

恢复执行时通过 Command(resume=human_input) 传入人工输入。

Q4:工作流执行时间太长,用户体验如何优化?

对于同步等待的场景,建议:

  1. 异步执行 + 回调通知:n8n中工作流改为异步执行,完成后通过邮件/Slack/企微通知用户
  2. 流式输出:LangGraph支持 astream_events 实时推送每个节点的执行状态到前端
  3. 拆分工作流:将长流程拆为多个子工作流,先返回初步结果,后台继续处理

Q5:如何保护Prompt不被逆向工程?

Q6:n8n免费版够用吗?能否用于生产环境?

n8n的Fair-code许可允许自托管免费使用(包括商业用途),但以下功能需要付费:

对于个人项目或小团队,免费版完全可以用于生产环境。对于中大型企业,建议购买n8n Cloud或Self-hosted Enterprise许可证。


结语

AI工作流自动化正在成为企业AI落地的关键桥梁。本文通过两个实战项目展示了两种不同范式的实现路径:

实际项目中,建议根据团队技术栈和场景复杂度灵活选型,甚至将两者组合使用——用n8n处理外围集成和流程编排,用LangGraph处理核心AI推理逻辑。这种架构既兼顾了开发效率,又保证了系统灵活性。

接下来你可以尝试:

  1. 将本文的客服工作流接入你的实际业务系统
  2. 为研究报告Agent接入真实的搜索API(如Tavily)
  3. 探索多Agent协作模式——让不同Agent分别负责研究、写作、审核

持续实践,你会发现AI工作流自动化的边界远比想象中更宽广。


本文首发于 1630.top,转载请注明出处。