元素码农
基础
UML建模
数据结构
算法
设计模式
网络
TCP/IP协议
HTTPS安全机制
WebSocket实时通信
数据库
sqlite
postgresql
clickhouse
后端
rust
go
java
php
mysql
redis
mongodb
etcd
nats
zincsearch
前端
浏览器
javascript
typescript
vue3
react
游戏
unity
unreal
C++
C#
Lua
App
android
ios
flutter
react-native
安全
Web安全
测试
软件测试
自动化测试 - Playwright
人工智能
Python
langChain
langGraph
运维
linux
docker
工具
git
svn
🌞
🌙
目录
▶
LangGraph基础概念
什么是LangGraph
核心特性解析
典型应用场景
▶
快速入门指南
环境安装配置
第一个LangGraph应用
执行流程演示
▶
核心组件解析
图结构基础
节点(Node)详解
边(Edge)的类型
执行引擎原理
路由策略配置
状态容器使用
错误处理机制
输入输出管道
配置管理系统
发布时间:
2025-04-01 15:12
↑
☰
# 执行流程演示 ## LangGraph执行流程详解 本文将通过一个实际示例,详细演示LangGraph的执行流程,帮助您深入理解LangGraph如何处理状态、执行节点以及控制应用流程。我们将构建一个多步骤的数据分析助手,展示LangGraph的执行机制。 ## 示例应用:数据分析助手 我们将创建一个数据分析助手,它能够: 1. 接收用户的数据分析请求 2. 分析请求并制定分析计划 3. 执行数据处理步骤 4. 生成分析结果 5. 根据需要进行结果优化 ## 完整代码 首先,让我们看一下完整的代码实现: ```python from typing import TypedDict, List, Dict, Any, Literal, Optional from langgraph.graph import StateGraph, END from langchain_openai import ChatOpenAI from langchain.schema import HumanMessage, AIMessage, SystemMessage import json import time # 定义状态类型 class AnalysisState(TypedDict): messages: List[Dict[str, Any]] # 对话历史 query: str # 用户查询 plan: Optional[List[str]] # 分析计划 current_step: int # 当前执行步骤 results: Optional[Dict[str, Any]] # 分析结果 feedback: Optional[str] # 结果反馈 # 初始化LLM llm = ChatOpenAI(model="gpt-3.5-turbo") # 节点1: 理解用户查询 def understand_query(state: AnalysisState) -> AnalysisState: print("\n执行节点: 理解用户查询") query = state["query"] messages = [ SystemMessage(content="你是一个数据分析助手。请理解用户的分析需求。"), HumanMessage(content=query) ] response = llm.invoke(messages) return {"messages": state["messages"] + [{"role": "assistant", "content": response.content}]} # 节点2: 制定分析计划 def create_plan(state: AnalysisState) -> AnalysisState: print("\n执行节点: 制定分析计划") messages = [ SystemMessage(content="你是一个数据分析专家。根据用户的需求,制定一个分步骤的数据分析计划。以JSON格式返回计划,格式为["步骤1", "步骤2", ...]。"), HumanMessage(content=state["query"]) ] response = llm.invoke(messages) # 解析计划 try: # 提取JSON部分 json_content = response.content if "```json" in json_content: json_content = json_content.split("```json")[1].split("```")[0].strip() elif "```" in json_content: json_content = json_content.split("```")[1].split("```")[0].strip() plan = json.loads(json_content) except Exception as e: # 如果解析失败,使用简单的文本分割 plan = [step.strip() for step in response.content.split("\n") if step.strip()] return { "plan": plan, "current_step": 0, "messages": state["messages"] + [{"role": "assistant", "content": f"分析计划已制定: {plan}"}] } # 节点3: 执行分析步骤 def execute_step(state: AnalysisState) -> AnalysisState: current_step = state["current_step"] plan = state["plan"] if current_step >= len(plan): return state step_description = plan[current_step] print(f"\n执行节点: 执行分析步骤 {current_step + 1}/{len(plan)} - {step_description}") # 模拟执行步骤 time.sleep(1) # 模拟处理时间 messages = [ SystemMessage(content=f"你是一个数据分析专家。执行以下分析步骤并返回结果: {step_description}"), HumanMessage(content=state["query"]) ] response = llm.invoke(messages) return { "current_step": current_step + 1, "messages": state["messages"] + [{ "role": "assistant", "content": f"步骤 {current_step + 1} 执行结果: {response.content}" }] } # 节点4: 生成分析结果 def generate_results(state: AnalysisState) -> AnalysisState: print("\n执行节点: 生成分析结果") # 收集所有步骤的结果 step_results = [msg["content"] for msg in state["messages"] if msg["role"] == "assistant" and "步骤" in msg["content"]] messages = [ SystemMessage(content="你是一个数据分析专家。根据以下分析步骤的结果,生成最终的分析报告。"), HumanMessage(content=f"原始查询: {state['query']}\n\n步骤结果:\n{'\n'.join(step_results)}") ] response = llm.invoke(messages) results = {"summary": response.content} return { "results": results, "messages": state["messages"] + [{"role": "assistant", "content": f"分析结果: {response.content}"}] } # 节点5: 评估结果质量 def evaluate_results(state: AnalysisState) -> AnalysisState: print("\n执行节点: 评估结果质量") results = state["results"] messages = [ SystemMessage(content="你是一个数据分析质量评估专家。评估以下分析结果的质量,并给出改进建议。"), HumanMessage(content=f"原始查询: {state['query']}\n\n分析结果: {results['summary']}") ] response = llm.invoke(messages) return { "feedback": response.content, "messages": state["messages"] + [{"role": "assistant", "content": f"质量评估: {response.content}"}] } # 节点6: 路由决策 def router(state: AnalysisState) -> Literal["execute_step", "generate_results", "refine_results", "end"]: # 如果还有步骤未执行,继续执行 if state["current_step"] < len(state["plan"]): return "execute_step" # 如果没有结果,生成结果 if "results" not in state or state["results"] is None: return "generate_results" # 如果有反馈但没有优化过,优化结果 if "feedback" in state and state["feedback"] and "refined" not in state["results"]: return "refine_results" # 否则结束 return "end" # 节点7: 优化结果 def refine_results(state: AnalysisState) -> AnalysisState: print("\n执行节点: 优化结果") feedback = state["feedback"] original_results = state["results"]["summary"] messages = [ SystemMessage(content="你是一个数据分析专家。根据反馈优化分析结果。"), HumanMessage(content=f"原始结果: {original_results}\n\n反馈: {feedback}\n\n请提供优化后的结果。") ] response = llm.invoke(messages) # 更新结果 refined_results = state["results"].copy() refined_results["refined"] = response.content return { "results": refined_results, "messages": state["messages"] + [{"role": "assistant", "content": f"优化结果: {response.content}"}] } # 创建图结构 graph = StateGraph(AnalysisState) # 添加节点 graph.add_node("understand_query", understand_query) graph.add_node("create_plan", create_plan) graph.add_node("execute_step", execute_step) graph.add_node("generate_results", generate_results) graph.add_node("evaluate_results", evaluate_results) graph.add_node("refine_results", refine_results) # 添加边 graph.add_edge("understand_query", "create_plan") graph.add_edge("create_plan", "execute_step") graph.add_conditional_edges( "execute_step", router, { "execute_step": "execute_step", "generate_results": "generate_results", "refine_results": "refine_results", "end": END } ) graph.add_edge("generate_results", "evaluate_results") graph.add_conditional_edges( "evaluate_results", router, { "execute_step": "execute_step", "generate_results": "generate_results", "refine_results": "refine_results", "end": END } ) graph.add_conditional_edges( "refine_results", router, { "execute_step": "execute_step", "generate_results": "generate_results", "refine_results": "refine_results", "end": END } ) # 设置入口节点 graph.set_entry_point("understand_query") # 编译图 app = graph.compile() # 执行图 def run_analysis(query: str): # 初始化状态 initial_state = { "messages": [], "query": query, "current_step": 0 } # 执行图 for state in app.stream(initial_state): # 打印当前状态 if "results" in state and state["results"] is not None: if "refined" in state["results"]: print(f"\n最终优化结果:\n{state['results']['refined']}") else: print(f"\n最终结果:\n{state['results']['summary']}") return state # 示例运行 if __name__ == "__main__": query = "分析过去一个月的销售数据,找出最畅销的产品类别和增长最快的区域。" final_state = run_analysis(query) ``` ## 执行流程详解 让我们详细解析LangGraph的执行流程: ### 1. 图结构定义 在LangGraph中,我们通过定义节点和边来构建应用流程: - **节点定义**:每个节点是一个函数,接收当前状态并返回更新后的状态。 - **边定义**:边定义了节点之间的连接关系,决定了执行流程。 - **条件路由**:通过router函数实现条件路由,根据状态动态决定下一步执行哪个节点。 ### 2. 状态管理 状态是LangGraph的核心概念,它在整个执行过程中传递和更新: - **状态定义**:使用TypedDict定义状态结构,确保类型安全。 - **状态更新**:每个节点函数返回部分状态更新,LangGraph自动合并这些更新。 - **状态流转**:状态在节点之间流转,保持应用的上下文信息。 ### 3. 执行机制 LangGraph的执行机制包括: - **图编译**:通过`graph.compile()`将定义好的图结构编译成可执行的应用。 - **流式执行**:使用`app.stream()`方法流式执行图,每次状态更新都会产生一个新的状态。 - **终止条件**:当路由返回`END`常量时,执行终止。 ### 4. 动态决策 我们的示例展示了LangGraph强大的动态决策能力: - **条件路由**:根据当前执行进度和结果质量,动态决定下一步操作。 - **循环执行**:通过路由回到同一节点实现循环,如执行多个分析步骤。 - **分支选择**:根据状态条件选择不同的执行路径。 ## 实际运行示例 当我们运行上面的示例代码,执行流程如下: 1. **理解用户查询**:分析用户的数据分析需求。 2. **制定分析计划**:根据需求生成步骤化的分析计划。 3. **执行分析步骤**:逐步执行每个分析步骤,直到所有步骤完成。 4. **生成分析结果**:汇总所有步骤的结果,生成最终分析报告。 5. **评估结果质量**:评估分析结果的质量,提供改进建议。 6. **优化结果**(如需要):根据评估反馈优化分析结果。 ### 示例输出 执行示例查询后,可能的输出如下: ``` 执行节点: 理解用户查询 执行节点: 制定分析计划 执行节点: 执行分析步骤 1/4 - 收集过去一个月的销售数据 执行节点: 执行分析步骤 2/4 - 按产品类别分析销售量 执行节点: 执行分析步骤 3/4 - 按区域分析销售增长率 执行节点: 执行分析步骤 4/4 - 识别最畅销产品类别和增长最快区域 执行节点: 生成分析结果 执行节点: 评估结果质量 执行节点: 优化结果 最终优化结果: 根据过去一个月的销售数据分析,我们发现: 1. 最畅销产品类别:电子产品,特别是智能手机和笔记本电脑,占总销售额的42%。 2. 增长最快的区域:华东地区,环比增长率达到23.5%,主要由上海和杭州市场驱动。 3. 值得注意的趋势: - 家居产品类别虽然排名第三,但增长率达到18%,显示出强劲的上升势头。 - 西南地区的二线城市市场渗透率提升显著,增长率为19.8%。 4. 建议行动: - 增加电子产品的库存,特别是畅销型号。 - 在华东地区增加营销投入,扩大市场份额。 - 关注家居产品类别的发展潜力,考虑扩大产品线。 - 制定针对西南地区二线城市的专项营销策略。 ``` ## 总结 LangGraph提供了一种强大而灵活的方式来构建复杂的AI应用流程: 1. **模块化设计**:将复杂流程拆分为独立节点,便于开发和维护。 2. **状态管理**:自动处理状态传递和更新,简化开发。 3. **动态决策**:支持条件路由和动态执行路径,实现智能流程控制。 4. **可视化潜力**:图结构便于可视化和理解应用流程。 通过LangGraph,我们可以构建更复杂、更智能的AI应用,实现超越简单问答的交互体验。