AI工作流自动化实战:用n8n与LangGraph搭建企业级智能任务链
用n8n与LangGraph搭建企业级AI工作流,包含可视化客服和多步骤智能体完整实战代码。
# AI工作流自动化实战:用n8n与LangGraph搭建企业级智能任务链
前言
在企业数字化转型过程中,越来越多的团队希望将大语言模型(LLM)能力嵌入到日常业务流程中——智能客服、文档审核、数据提取、自动化报告生成……这些场景的共同特点是:单一LLM调用无法完成,需要多个环节串联、条件分支、人工审批与外部系统集成。这正是"AI工作流自动化"要解决的核心问题。
本文将从概念讲起,经过工具选型对比,带你完成两个完整的实战项目:一个基于 n8n 的可视化AI客服工作流,一个基于 LangGraph 的多步骤Python智能体。所有代码均可直接复制运行。
一、AI工作流核心概念
在动手之前,先厘清几个关键术语:
| 概念 | 说明 |
|---|---|
| Workflow(工作流) | 由多个节点(Node)按有向无环图(DAG)或状态机方式连接而成的任务执行链路 |
| Agent(智能体) | 具备自主决策能力的AI实体,可根据上下文选择调用不同工具或跳转到不同分支 |
| Tool / Function Calling | LLM通过结构化输出来调用外部函数(如查询数据库、发送邮件) |
| State(状态) | 工作流在执行过程中传递和维护的数据结构,每个节点可读写状态 |
| Human-in-the-loop | 在关键节点插入人工审批,确保AI输出可控 |
一个典型的企业AI工作流长这样:
bash
用户输入 → 意图识别(LLM) → [分支A: 查询知识库] / [分支B: 转人工]
↓
结果汇总(LLM) → 格式化输出 → 通知推送
理解这些概念后,我们来看工具选型。
二、工具选型对比:n8n vs Dify vs LangGraph
| 维度 | n8n | Dify | LangGraph |
|---|---|---|---|
| 定位 | 通用自动化平台(支持AI节点) | AI应用开发平台 | Python代码级AI Agent框架 |
| 使用方式 | 可视化拖拽 + 少量JS/表达式 | 可视化编排 + Prompt管理 | 纯Python代码 |
| 学习曲线 | 低(可视化为主) | 低 | 中高(需Python + LangChain基础) |
| 灵活度 | 中(节点能力有限制) | 中(平台封装较多) | 极高(完全自定义) |
| 适合场景 | IT运维、客服、表单处理、定时任务 | 对话机器人、知识库问答 | 复杂多步推理、研究型Agent |
| 部署方式 | Docker自托管 / 云服务 | Docker自托管 / 云服务 | 随Python应用部署 |
| 社区生态 | 400+集成节点 | 国内生态活跃 | LangChain生态 |
选型建议:
- 如果你需要快速上线、团队中非技术人员参与维护 → 选 n8n
- 如果你需要面向终端用户的对话产品 → 选 Dify
- 如果你需要复杂的多步推理和完全自定义控制流 → 选 LangGraph
本文选择 n8n + LangGraph 的组合,覆盖"低门槛快速搭建"和"高灵活度深度定制"两个极端。
三、实战1:用n8n搭建AI客服工作流
3.1 场景描述
搭建一个智能客服工作流:
- 用户通过Webhook发送问题
- AI识别意图(技术支持 / 售前咨询 / 投诉)
- 根据意图路由到不同处理分支
- 调用外部知识库获取答案
- 返回结构化响应
3.2 环境准备
bash
# 使用Docker快速启动n8n
docker run -d \
--name n8n \
-p 5678:5678 \
-v n8n_data:/home/node/.n8n \
-e N8N_BASIC_AUTH_ACTIVE=true \
-e N8N_BASIC_AUTH_USER=admin \
-e N8N_BASIC_AUTH_PASSWORD=your_password \
n8nio/n8n:latest
启动后访问 http://localhost:5678,用配置的账号密码登录。
3.3 工作流节点配置步骤
步骤1:创建Webhook触发节点
bash
http://localhost:5678/webhook/customer-support
- 保存后,n8n会自动生成Webhook URL,格式为:
Webhook节点会接收用户发送的JSON数据,预期格式为:
{"user_id": "u001", "message": "我的订单什么时候发货?"}
步骤2:添加AI意图分类节点
text
你是一个意图分类器。根据用户消息,判断其属于以下哪类意图,仅输出类别名称:
- tech_support(技术支持)
- presale(售前咨询)
- complaint(投诉)
用户消息:{{ $json.body.message }}
- Resource:
Chat - Operation:
Generate - Model:
gpt-4o-mini - System Message:
步骤3:添加Switch路由节点
- Route 1(技术支持):条件
{{ $json.message }} contains 'tech_support' - Route 2(售前咨询):条件
{{ $json.message }} contains 'presale' - Route 3(投诉):条件
{{ $json.message }} contains 'complaint'
n8n的Switch节点在画布上会显示为菱形,每个输出端口对应一个分支,非常直观。
步骤4:添加知识库查询节点(以HTTP Request为例)
每个分支下添加 "HTTP Request" 节点,调用你的知识库API:
bash
Method: POST
URL: https://your-api.example.com/knowledge/search
Body (JSON):
{
"query": "{{ $('Webhook').item.json.body.message }}",
"category": "{{ $json.category }}"
}
Headers:
Authorization: Bearer {{ $env.KB_API_KEY }}
步骤5:添加AI回答生成节点
在每个分支的知识库查询后,添加OpenAI节点生成自然语言回答:
bash
System Message:
你是一个专业的客服助手。根据以下知识库返回的信息,用友好、专业的语气回答用户问题。
如果是投诉类问题,请表达歉意并给出解决方案。
知识库信息:
{{ $json.answer }}
用户原始问题:{{ $('Webhook').item.json.body.message }}
步骤6:添加Respond to Webhook节点
在所有分支汇聚后,添加 "Respond to Webhook" 节点,将AI回答返回给调用方:
bash
Respond With:
{
"status": "success",
"intent": "{{ $json.intent }}",
"reply": "{{ $json.message }}",
"timestamp": "{{ $now.toISO() }}"
}
3.4 完整工作流JSON(可导入n8n)
将以下JSON复制到n8n的导入功能中即可还原完整工作流:
json
{
"name": "AI Customer Support Workflow",
"nodes": [
{
"parameters": {
"httpMethod": "POST",
"path": "customer-support",
"responseMode": "responseNode"
},
"id": "webhook-1",
"name": "Webhook Trigger",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2,
"position": [0, 300]
},
{
"parameters": {
"model": "gpt-4o-mini",
"messages": {
"values": [
{
"content": "你是一个意图分类器。根据用户消息,判断其属于以下哪类意图,仅输出类别名称:\n- tech_support(技术支持)\n- presale(售前咨询)\n- complaint(投诉)\n\n用户消息:{{ $json.body.message }}"
}
]
}
},
"id": "intent-classifier",
"name": "Intent Classifier",
"type": "@n8n/n8n-nodes-langchain.lmChatOpenAi",
"typeVersion": 1,
"position": [250, 300]
},
{
"parameters": {
"rules": {
"rules": [
{ "outputKey": "tech", "conditions": { "conditions": [{ "leftValue": "={{ $json.message }}", "rightValue": "tech_support", "operator": { "type": "string", "operation": "contains" } }] } },
{ "outputKey": "presale", "conditions": { "conditions": [{ "leftValue": "={{ $json.message }}", "rightValue": "presale", "operator": { "type": "string", "operation": "contains" } }] } },
{ "outputKey": "complaint", "conditions": { "conditions": [{ "leftValue": "={{ $json.message }}", "rightValue": "complaint", "operator": { "type": "string", "operation": "contains" } }] } }
]
}
},
"id": "switch-1",
"name": "Intent Router",
"type": "n8n-nodes-base.switch",
"typeVersion": 3,
"position": [500, 300]
}
],
"connections": {
"Webhook Trigger": { "main": [[{ "node": "Intent Classifier", "type": "main", "index": 0 }]] },
"Intent Classifier": { "main": [[{ "node": "Intent Router", "type": "main", "index": 0 }]] }
}
}
上述JSON为精简版骨架,实际部署时建议在n8n画布中手动补充知识库查询和回答生成节点,并配置真实的API密钥。
3.5 测试工作流
使用curl命令测试:
bash
curl -X POST http://localhost:5678/webhook/customer-support \
-H "Content-Type: application/json" \
-d '{
"user_id": "u001",
"message": "我的App一直闪退怎么办?"
}'
预期返回:
json
{
"status": "success",
"intent": "tech_support",
"reply": "您好!App闪退通常与缓存积压有关,建议您尝试以下步骤:1) 清除App缓存;2) 更新至最新版本;3) 重启设备后重试。如果问题持续,请提供您的设备型号,我们会进一步排查。",
"timestamp": "2026-07-02T16:30:00.000Z"
}
四、实战2:用LangGraph构建多步骤智能体
4.1 场景描述
构建一个自动化研究报告生成智能体,包含以下步骤:
- 分析需求 → 理解用户的研究主题
- 信息检索 → 调用搜索工具获取资料
- 内容整合 → LLM综合信息并生成大纲
- 章节撰写 → 逐章节展开详细内容
- 质量审查 → LLM自审并优化输出
- 格式化输出 → 生成最终Markdown报告
4.2 环境安装
bash
pip install langgraph langchain-openai langchain-community
4.3 完整代码实现
python
"""
LangGraph 多步骤研究报告智能体
功能:接收研究主题 → 检索信息 → 生成大纲 → 逐章撰写 → 自审优化 → 输出报告
"""
import json
import operator
from typing import TypedDict, Annotated, List, Dict
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from langchain.tools import tool
# ========== 1. 定义状态 ==========
class ResearchState(TypedDict):
"""工作流状态:所有节点共享的数据结构"""
topic: str # 研究主题
search_results: str # 搜索结果原始文本
outline: str # 生成的大纲
sections: Annotated[list, operator.add] # 各章节内容(累加)
final_report: str # 最终报告
review_feedback: str # 审查反馈
iteration_count: int # 审查迭代次数
# ========== 2. 定义工具 ==========
@tool
def search_web(query: str) -> str:
"""
模拟网络搜索(生产环境替换为真实搜索API,如Tavily/SerpAPI)
"""
# 生产环境替换为:
# from langchain_community.utilities import SerpAPIWrapper
# search = SerpAPIWrapper()
# return search.run(query)
mock_results = {
"AI工作流": "2026年AI工作流市场预计增长45%,主要驱动力包括企业数字化转型..."
"n8n是最流行的开源工作流自动化工具,支持400+集成节点..."
"LangGraph成为构建复杂AI Agent的首选框架...",
"大模型应用": "大模型在企业中的应用场景已从客服扩展到代码生成、数据分析..."
"多Agent协作模式正在成为主流架构范式...",
"default": "相关领域正在快速发展,多项研究表明AI技术正在重塑工作方式..."
}
for key, value in mock_results.items():
if key in query:
return value
return mock_results["default"]
# ========== 3. 初始化LLM ==========
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.7,
max_tokens=4096,
)
# ========== 4. 定义工作流节点 ==========
def analyze_requirement(state: ResearchState) -> dict:
"""节点1:分析用户需求,明确研究方向"""
prompt = (
f"你是一个研究分析师。用户想研究以下主题:'{state['topic']}'\n"
"请分析这个主题的:\n"
"1. 核心关键词(用于搜索)\n"
"2. 研究范围和边界\n"
"3. 目标读者\n"
"输出一段简洁的分析说明(100字以内)。"
)
response = llm.invoke([HumanMessage(content=prompt)])
return {"search_results": f"[需求分析] {response.content}"}
def retrieve_information(state: ResearchState) -> dict:
"""节点2:调用搜索工具获取信息"""
topic = state["topic"]
# 构造搜索查询
queries = [
f"{topic} 最新趋势 2026",
f"{topic} 实践案例",
f"{topic} 技术挑战与解决方案",
]
all_results = []
for q in queries:
result = search_web.invoke({"query": q})
all_results.append(f"查询'{q}':\n{result}")
combined = "\n\n".join(all_results)
previous = state.get("search_results", "")
return {"search_results": f"{previous}\n\n[检索结果]\n{combined}"}
def generate_outline(state: ResearchState) -> dict:
"""节点3:基于检索结果生成报告大纲"""
prompt = (
f"基于以下信息,为'{state['topic']}'生成一份结构化报告大纲。\n"
f"信息来源:\n{state['search_results']}\n\n"
"要求:\n"
"1. 包含3-5个主要章节\n"
"2. 每章包含2-3个子节\n"
"3. 大纲以Markdown格式输出\n"
"4. 确保内容逻辑连贯"
)
response = llm.invoke([HumanMessage(content=prompt)])
return {"outline": response.content}
def write_sections(state: ResearchState) -> dict:
"""节点4:按大纲逐章节撰写"""
outline = state["outline"]
search_results = state["search_results"]
prompt = (
f"你是一个专业的内容撰写者。请根据以下大纲和参考资料,撰写完整的报告内容。\n\n"
f"大纲:\n{outline}\n\n"
f"参考资料:\n{search_results}\n\n"
"要求:\n"
"1. 按大纲结构逐章展开\n"
"2. 每章300-500字\n"
"3. 包含具体数据和案例\n"
"4. 以Markdown格式输出完整报告\n"
"5. 语言专业但易懂"
)
response = llm.invoke([HumanMessage(content=prompt)])
# 将报告按章节拆分
sections = response.content.split("\n## ")
section_list = [f"## {s}" for s in sections if s.strip()]
return {"sections": section_list, "final_report": response.content}
def review_and_refine(state: ResearchState) -> dict:
"""节点5:质量审查与优化"""
report = state["final_report"]
iteration = state.get("iteration_count", 0)
prompt = (
f"你是一个严格的内容审核编辑。请审查以下报告并提出改进建议。\n\n"
f"报告内容:\n{report}\n\n"
"审查维度:\n"
"1. 内容准确性:是否有明显错误?\n"
"2. 结构完整性:各章节是否均衡?\n"
"3. 语言质量:表达是否清晰专业?\n"
"4. 信息密度:是否包含足够的数据和案例?\n\n"
"如果需要改进,输出具体的修改建议(Markdown格式)。\n"
"如果已经足够好,输出:'APPROVED'"
)
response = llm.invoke([HumanMessage(content=prompt)])
feedback = response.content
# 如果审查通过或迭代次数超过2次,进入输出
if "APPROVED" in feedback or iteration >= 2:
if iteration >= 2 and "APPROVED" not in feedback:
# 超过迭代上限但仍有反馈,强制通过但记录意见
return {
"review_feedback": feedback,
"iteration_count": iteration + 1,
"final_report": report + f"\n\n---\n*注:以下审查意见待后续优化*\n{feedback}"
}
return {
"review_feedback": feedback,
"iteration_count": iteration + 1,
}
# 如果需要修改,重新生成
refine_prompt = (
f"根据以下审查反馈,优化报告内容。\n\n"
f"原报告:\n{report}\n\n"
f"审查反馈:\n{feedback}\n\n"
"请直接输出优化后的完整报告(Markdown格式)。"
)
refined = llm.invoke([HumanMessage(content=refine_prompt)])
return {
"review_feedback": feedback,
"final_report": refined.content,
"iteration_count": iteration + 1,
}
def format_output(state: ResearchState) -> dict:
"""节点6:最终格式化"""
report = state["final_report"]
topic = state["topic"]
final = f"""# {topic} - 研究报告
> 本报告由AI智能体自动生成
> 生成时间:2026-07-02
> 审查迭代次数:{state.get('iteration_count', 0)}
---
{report}
---
*报告结束*
"""
return {"final_report": final}
# ========== 5. 条件路由:决定是否需要再次审查 ==========
def should_continue_review(state: ResearchState) -> str:
"""审查后决定下一步:继续审查 or 进入输出"""
feedback = state.get("review_feedback", "")
iteration = state.get("iteration_count", 0)
if "APPROVED" in feedback or iteration >= 2:
return "format_output"
return "review_and_refine"
# ========== 6. 构建状态图 ==========
def build_graph():
"""构建LangGraph工作流"""
workflow = StateGraph(ResearchState)
# 添加节点
workflow.add_node("analyze", analyze_requirement)
workflow.add_node("retrieve", retrieve_information)
workflow.add_node("outline", generate_outline)
workflow.add_node("write", write_sections)
workflow.add_node("review", review_and_refine)
workflow.add_node("format", format_output)
# 设置入口
workflow.set_entry_point("analyze")
# 添加边(线性流程)
workflow.add_edge("analyze", "retrieve")
workflow.add_edge("retrieve", "outline")
workflow.add_edge("outline", "write")
workflow.add_edge("write", "review")
# 审查后条件路由
workflow.add_conditional_edges(
"review",
should_continue_review,
{
"review_and_refine": "review",
"format_output": "format",
}
)
workflow.add_edge("format", END)
return workflow.compile()
# ========== 7. 执行 ==========
if __name__ == "__main__":
import os
# 设置API Key
os.environ["OPENAI_API_KEY"] = "your-openai-api-key-here"
# 构建并执行工作流
graph = build_graph()
# 初始状态
initial_state = {
"topic": "AI工作流自动化在企业中的应用",
"search_results": "",
"outline": "",
"sections": [],
"final_report": "",
"review_feedback": "",
"iteration_count": 0,
}
print("=" * 60)
print("开始执行研究报告生成工作流...")
print("=" * 60)
# 执行并打印每个节点的输出
for event in graph.stream(initial_state):
for node_name, node_output in event.items():
print(f"\n{'─' * 40}")
print(f"[节点] {node_name}")
print(f"{'─' * 40}")
# 只打印关键信息,避免过长输出
if "outline" in node_output and node_output["outline"]:
preview = node_output["outline"][:200]
print(f"大纲预览: {preview}...")
if "review_feedback" in node_output and node_output["review_feedback"]:
print(f"审查反馈: {node_output['review_feedback'][:150]}...")
if "iteration_count" in node_output:
print(f"迭代次数: {node_output['iteration_count']}")
# 打印最终报告
print("\n" + "=" * 60)
print("最终报告已生成!")
print("=" * 60)
print(graph.invoke(initial_state)["final_report"])
4.4 代码结构说明
bash
工作流执行流程:
analyze → retrieve → outline → write → review ─┬→ review (循环优化,最多2次)
└→ format → END
关键设计点:
- 状态管理:通过
ResearchStateTypedDict 统一管理所有节点间传递的数据 - 累加操作:
sections字段使用Annotated[list, operator.add]实现列表累加 - 条件路由:
should_continue_review函数决定审查后是继续优化还是输出 - 迭代上限:通过
iteration_count防止无限循环,最多迭代2次 - 工具封装:
@tool装饰器将函数注册为LangChain工具,方便后续扩展
4.5 运行方式
bash
# 设置API Key
export OPENAI_API_KEY="sk-your-key-here"
# 运行
python research_agent.py
程序会依次执行每个节点,最终输出一份完整的Markdown格式研究报告。
五、生产部署注意事项
5.1 安全与权限控制
- API Key管理:使用环境变量或密钥管理服务(如HashiCorp Vault),严禁硬编码
- 输入校验:在Webhook入口处做schema验证,防止注入攻击
- 输出过滤:对LLM输出做敏感信息脱敏处理
python
# n8n中可在Webhook节点前添加"Set"节点做校验
# 或在LangGraph入口节点添加校验逻辑
def validate_input(state: ResearchState) -> dict:
if not state["topic"] or len(state["topic"]) > 500:
raise ValueError("研究主题不能为空且不超过500字符")
return state
5.2 错误处理与重试
python
# n8n:在节点设置中配置"Retry on Fail",推荐3次,指数退避
# LangGraph:使用try-except包装节点函数
def safe_retrieve(state: ResearchState) -> dict:
try:
return retrieve_information(state)
except Exception as e:
# 记录错误并返回降级结果
return {
"search_results": f"[检索失败: {str(e)}] 请使用已有信息继续分析。",
}
5.3 可观测性
- n8n:内置Execution Log,可查看每次工作流的完整执行记录和耗时
- LangGraph:使用LangSmith进行追踪,或手动记录日志
python
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("research_agent")
def analyze_with_logging(state: ResearchState) -> dict:
logger.info(f"开始分析主题: {state['topic']}")
result = analyze_requirement(state)
logger.info(f"需求分析完成,结果长度: {len(result['search_results'])}")
return result
5.4 性能优化
- 对LLM调用启用流式输出,减少首字延迟
- 高频工作流考虑缓存LLM响应(语义缓存)
- n8n中善用
Split in Batches节点处理大批量数据 - LangGraph中可使用
RunnableLambda并行化独立步骤
5.5 成本控制
- 意图分类等简单任务使用
gpt-4o-mini,复杂推理使用gpt-4o - 设置Token使用上限,避免异常调用导致费用飙升
- n8n中可在Settings → Executions中配置每日执行上限
六、常见问题FAQ
Q1:n8n和LangGraph可以配合使用吗?
可以,而且这是推荐的企业架构。 n8n作为"编排层"负责Webhook接收、定时触发、通知推送、数据库写入等外围任务;LangGraph作为"大脑层"处理复杂的LLM推理逻辑。两者通过HTTP API对接:n8n的HTTP Request节点调用LangGraph暴露的FastAPI接口。
python
# LangGraph暴露为FastAPI接口
from fastapi import FastAPI
from fastapi.responses import JSONResponse
app = FastAPI()
graph = build_graph()
@app.post("/api/research")
async def run_research(topic: str):
result = graph.invoke({"topic": topic, ...})
return JSONResponse(content=result)
Q2:LLM调用失败导致工作流中断怎么办?
n8n:在节点Settings中开启"Continue on Fail",并在下游节点中检查上游是否失败。也可以配置Error Trigger工作流,在主流程失败时自动发送告警通知。
LangGraph:使用 interrupt_before 和 interrupt_after 实现断点续传,或通过try-except包装每个节点函数,确保即使单步失败也不会中断整个流程。
Q3:如何实现"人工审批"环节(Human-in-the-loop)?
n8n:使用 "Wait" 节点暂停工作流,等待外部Webhook回调来恢复执行。适合审批、人工确认等场景。
LangGraph:使用 interrupt 机制:
python
from langgraph.types import interrupt
def human_review_node(state: ResearchState):
# 工作流在此暂停,等待人工输入
feedback = interrupt("请审查报告大纲,输入'approve'或修改意见:")
return {"review_feedback": feedback}
恢复执行时通过 Command(resume=human_input) 传入人工输入。
Q4:工作流执行时间太长,用户体验如何优化?
对于同步等待的场景,建议:
- 异步执行 + 回调通知:n8n中工作流改为异步执行,完成后通过邮件/Slack/企微通知用户
- 流式输出:LangGraph支持
astream_events实时推送每个节点的执行状态到前端 - 拆分工作流:将长流程拆为多个子工作流,先返回初步结果,后台继续处理
Q5:如何保护Prompt不被逆向工程?
- 核心Prompt存储在服务端(n8n Credential / 环境变量),不经过前端
- n8n工作流JSON导出时注意Prompt会被包含在内,分享前做脱敏
- LangGraph的Prompt直接写在代码中,跟随代码仓库的访问权限管理
- 可考虑对Prompt进行加密存储,运行时解密
Q6:n8n免费版够用吗?能否用于生产环境?
n8n的Fair-code许可允许自托管免费使用(包括商业用途),但以下功能需要付费:
- SSO单点登录(企业必需)
- 多用户权限管理
- 环境变量管理
对于个人项目或小团队,免费版完全可以用于生产环境。对于中大型企业,建议购买n8n Cloud或Self-hosted Enterprise许可证。
结语
AI工作流自动化正在成为企业AI落地的关键桥梁。本文通过两个实战项目展示了两种不同范式的实现路径:
- n8n:低代码、可视化、快速上线,适合标准化业务流程
- LangGraph:纯代码、高灵活度、完全可控,适合复杂推理场景
实际项目中,建议根据团队技术栈和场景复杂度灵活选型,甚至将两者组合使用——用n8n处理外围集成和流程编排,用LangGraph处理核心AI推理逻辑。这种架构既兼顾了开发效率,又保证了系统灵活性。
接下来你可以尝试:
- 将本文的客服工作流接入你的实际业务系统
- 为研究报告Agent接入真实的搜索API(如Tavily)
- 探索多Agent协作模式——让不同Agent分别负责研究、写作、审核
持续实践,你会发现AI工作流自动化的边界远比想象中更宽广。
本文首发于 1630.top,转载请注明出处。