多智能体系统(MAS)通过多个自主智能体的交互来解决复杂的任务,在现代人工智能领域具有广泛的应用前景。本文将深入探讨如何使用 LangGraph 库在 Python 中实现一个“计划与执行”(Plan and Execute)的框架。在此过程中,我们将解释一些重要的概念,包括 Agent(智能体)、State(状态)、Graph(图)、如何构建和更新状态、以及如何在图中定义节点和边(条件与非条件)。通过逐步搭建该框架,您将了解如何使用LangGraph实现复杂的多智能体任务。

一、“Plan-and-Execute” 框架的诞生

1、 发明背景与动机

“Plan-and-Execute” 框架的概念最早可以追溯到20世纪70年代,源自人工智能(AI)和机器人研究中的任务规划问题。在此之前,大多数 AI 系统采用反应式(Reactive)模型,即智能体根据即时环境的变化直接做出决策。这种方式虽然简单,但面对复杂任务时常常会遇到瓶颈,主要表现为以下问题:

  • 缺乏全局视野:反应式系统只能处理当前状态和条件,而不能为长期目标进行合理的规划。复杂任务往往需要多步骤的执行,如果每个步骤都基于即时反应,系统容易陷入局部最优,难以达到全局最优。
  • 难以应对任务依赖:许多复杂任务具有相互依赖的多个步骤,比如在机器人任务中,先需要抓取物体,再进行搬运;在无人驾驶中,先需要规划路线,再执行。这些任务要求系统具备对多个子任务的全局规划能力,而不仅仅是对当前步骤的即时反应。

为了解决这些问题,研究者提出了将任务分为计划(Plan)和执行(Execute)两部分的思路这样,智能体可以在执行任务前先全局规划,再逐步执行计划,并在执行过程中根据环境变化进行适当的调整。此框架的诞生是为了在动态环境中执行复杂任务时,提供一种兼具全局规划性局部调整性的解决方案。

2、 关键发展历程

1970年代是自动规划领域的重要时期。当时,人工智能研究者发明了STRIPS(Stanford Research Institute Problem Solver)规划算法,这是早期用于任务规划的经典模型。STRIPS 能够定义智能体的初始状态目标状态以及达到目标的操作集合。通过这个方法,AI 可以自动生成从初始状态到目标状态的计划,确定执行步骤。

然而,STRIPS 以及早期的规划模型存在一些限制:

  • 静态假设:这些早期系统假设环境是静态且可预测的。一旦生成计划,环境不发生变化,系统就按照计划执行。这种假设在动态环境中显然不现实。
  • 执行与规划分离不彻底:这些系统的执行过程无法有效应对突发情况或环境的变化。因此,虽然它们能够制定详细的计划,但在遇到障碍或变化时很容易失败。

1980年代到1990年代,AI领域开始注重执行控制(Execution Monitoring)和反馈机制的发展。研究人员意识到,单纯的计划是不够的,系统还需要能够实时感知环境,并根据感知结果调整计划,这直接推动了“Plan-and-Execute”框架的演化。此时,许多新的框架和理论提出了以下重要改进:

  • 计划执行的闭环系统:系统在执行过程中不断监控执行情况,若发现与预期有偏差,可以重新规划或调整。
  • 实时反应与计划的结合:通过结合反应式和规划式系统,系统不仅可以根据当前环境做出即时反应,还能参考长期目标。

其中,著名的框架包括PRS(Procedural Reasoning System),它是一种能够根据当前任务环境动态调整计划的系统模型。这种系统已经具备了“Plan-and-Execute”框架的雏形,并在机器人控制、军事模拟等领域得到了广泛应用。

3、 为什么“Plan-and-Execute”框架重要?

在实际任务中,复杂系统不仅需要对未来进行合理规划,还要具备灵活应对突发事件的能力。这就是“Plan-and-Execute”框架的核心价值所在:它能够在静态规划和动态执行之间找到平衡。

  • 高效管理复杂任务:通过预先的全局规划,系统能够清晰地定义任务的多个步骤,以及任务之间的依赖关系。这对于那些多步骤、多阶段的任务尤其关键,能够减少任务的重复执行或错误步骤。
  • 应对动态环境:在执行过程中,框架允许智能体根据外部反馈信息重新规划,确保任务顺利完成。例如,在无人机任务中,若路径上出现障碍物,无人机能够依据当前环境重新规划飞行路线,而不需要从头开始。
  • 避免计算瓶颈:相比传统的反应式系统,“Plan-and-Execute”模型通过将规划与执行分离,减少了即时计算的压力。系统可以事先计算出一条路径或步骤链,然后在执行时只做微调,而不是每一步都需要复杂计算。

4、 现代应用

随着多智能体系统(MAS)和机器人控制的飞速发展,“Plan-and-Execute”框架被广泛应用于各类复杂任务中:

  • 机器人控制与自动化:在制造和物流领域,机器人往往需要执行复杂的任务链。通过“Plan-and-Execute”框架,机器人可以提前规划好任务步骤(如取货、装配、搬运等),并在实际执行过程中根据传感器数据实时调整。
  • 无人机编队:无人机任务执行中需要对飞行路线和任务顺序进行精确规划。通过这个框架,系统可以生成完整的飞行路线并逐步执行。如果遇到突发情况(如天气变化、障碍物出现),无人机能够动态调整任务。
  • 自动驾驶:自动驾驶汽车不仅要规划路线,还要应对复杂的交通状况。通过“Plan-and-Execute”模型,车辆可以预先规划路径,并在执行过程中根据实时交通情况(如前方堵车、路障等)调整行驶策略。
  • 游戏与模拟系统:在许多复杂的游戏或仿真系统中,角色需要按照预定任务执行一系列行为。这个框架允许角色进行高层次的任务规划,并根据游戏中的动态变化灵活调整策略。

二、核心概念

上文,我们对Plan-and-Execute框架有了一个初步的认识,在进入正式的代码实现之前。我们还需要了解一些相关的核心概念~

1、什么是 Agent(智能体)?

专业解释: 智能体的本质在于其自主性和交互性,它们根据感知的信息自主做出决策。多智能体系统中的智能体可以是合作的(为共同目标工作),也可以是竞争的(如对手关系)。在计算机科学中,Agent 通常指能够根据内部模型、规则或学习算法自主做出反应的程序或实体。

笔者总结: Agent(智能体)可以简单理解为一种“自主执行任务的小程序”。这些程序具备独立感知环境、做出决策并执行特定任务的能力。你可以把它想象成一位自动工作的助手,它不仅能自己处理问题,还能和其他助手合作或竞争,共同完成任务。

Agent 的核心能力包括:

  • 感知:智能体能通过“传感器”或输入信息,感知周围的环境变化,比如检测物体、接收数据等。
  • 决策:根据感知到的信息,智能体自主选择下一步行动,比如移动、与其他智能体互动或更新数据。
  • 执行:智能体根据决策采取行动,如调整位置、发送消息或改变周围环境。
  • 学习与适应:一些智能体通过学习和积累经验,不断优化自身行为。

简化类比: 你可以把智能体看作自动驾驶汽车,汽车感知道路、交通状况和障碍物,然后自主决定加速、减速或转弯,所有这些决策都是根据当前的环境和目的地来做出的。

2、什么是 State(状态)?

专业解释: 在多智能体系统中,状态是智能体的一个核心组成部分,表示智能体在某一时刻的内部信息集合。状态的更新依赖于智能体的感知与外界交互,它可以包含多个方面的信息,例如任务进度、资源状况、以及智能体对环境的理解。状态在多智能体系统中的重要性在于它决定了智能体的行动方向,并通过与环境的不断交互来进行更新​。

笔者总结: State(状态)是智能体当前的“情况描述”。它是某一时刻智能体所感知到的所有信息的集合,包括当前的任务、环境和资源情况。状态会随着时间推移不断变化,这也是智能体根据环境做出决策的基础。

状态的动态变化:

  • 感知更新:智能体感知到周围的变化,比如遇到障碍物,收到新的指令。
  • 内部状态更新:智能体根据感知到的信息调整自己,例如改变任务进度或路径。
  • 决策与行动:基于更新的状态,智能体做出新的决策,并继续执行任务。

简化类比: 想象你在导航软件中查看驾驶路线,导航的状态是实时更新的。如果前方出现交通堵塞,导航会重新规划路线,更新后的状态会影响接下来的驾驶决策。

3、什么是 Graph(图)?

专业解释: 图是多智能体系统中的基础数据结构之一。通过图,开发者可以直观地描述智能体在任务执行中的状态和状态转换。图中的节点代表任务的不同阶段或智能体的当前状态,边则表示智能体完成某项任务或满足某个条件后的状态转换。图的有向特性使得任务规划中的路径选择更加清晰,而无环图(DAG)在确保任务按照顺序执行方面尤为重要。

笔者总结: Graph(图)是一种非常实用的数据结构,通常用来表示任务的流程或决策路径。一个图由节点(Node)和边(Edge)组成,节点代表不同的状态或目标,边表示从一个状态到另一个状态的可能路径。

图的构建:

  • 节点:代表智能体的某个状态或任务。例如,一个机器人完成某个子任务后进入新的状态。
  • 边:表示状态之间的转换。边可以是有条件的(需要满足某个条件才能转换)或无条件的(自由转换)。

简化类比: 如果把任务比作一张地图,节点就是地图上的不同地点,边就是从一个地点到另一个地点的道路。有些道路可能需要特定条件才能通行(如道路畅通),而有些道路则可以随时使用。

4、什么是有向无环图(DAG)?

专业解释: 有向无环图(DAG)在多智能体系统中广泛应用于任务调度与规划。DAG 中的每条边都有明确的方向,并且没有回路,这确保了任务依赖关系的线性执行。在多任务环境中,DAG 可以清晰地表示任务之间的依赖关系,确保先完成必要的子任务,再执行后续任务。例如,在机器人操作中,DAG 可以用于确保机器人在到达目标前按顺序完成所有步骤

笔者总结: 在构建多智能体系统时,我们常用有向无环图(DAG,Directed Acyclic Graph)来表示任务流程。有向无环图是一种特殊的图,它的边有方向性,并且不存在从一个节点回到自身的循环路径。

DAG 的关键特性:

  • 有向性:图中的边有方向,表示任务从一个状态向另一个状态推进。
  • 无环性:图中没有从一个节点出发经过若干步骤后回到原点的路径,确保任务顺序执行而不会出现死循环。

简化类比: DAG 可以看作是一个“计划表”,你按照规定的顺序一步步执行任务,不会重复回到已经完成的任务。

5、LangGraph 库概述

专业解释: LangGraph 是多智能体系统的任务管理工具,通过图形结构为智能体提供明确的状态和任务规划。该库的核心功能是允许开发者使用 Python 定义图结构,并结合智能体的感知和决策机制,实现复杂任务的分解和执行。它简化了智能体任务的管理,确保智能体能够有效执行计划中的任务,并根据环境变化动态调整

笔者总结: LangGraph 是一个专为 Python 设计的库,帮助开发者轻松构建多智能体系统中的任务规划和图结构。通过 LangGraph,开发者可以为智能体定义任务状态和转换路径,确保智能体能够根据条件逐步完成任务。

核心功能:

  • 任务图结构:开发者可以通过节点和边构建智能体的任务流,定义状态间的转换规则。
  • 条件与无条件边:LangGraph 支持有条件和无条件的状态转换,确保任务执行的灵活性。
  • 任务执行模型:通过图模型,智能体可以在不同状态之间切换,按照规划路径完成任务。

6、什么是 Plan-and-Execute?

专业解释: Plan-and-Execute 模型将任务执行过程分为两个主要部分:规划和执行。在规划阶段,智能体根据任务目标生成一个多步骤的计划,所有子任务明确标记出要完成的步骤。然后在执行阶段,智能体逐个执行计划中的任务,并通过执行结果对计划进行动态更新和调整。这种模型的灵活性体现在能够处理任务变化或外界干扰,同时确保任务的最终顺利完成。

笔者总结: Plan-and-Execute 是一种用于任务分解和执行的智能体模型,其核心思想是先生成一个完整的多步骤计划(Plan),然后通过代理逐步执行每一项任务(Execute)。在每个任务执行完成后,智能体会对计划进行状态更新,并根据任务结果动态调整计划,确保任务按计划逐步完成。

所图所示:

Plan-and-Execute 的特点:

  • 长期规划:通过事先制定的计划,智能体能够更好地管理复杂、多阶段任务。它比简单的逐步反应式模型(ReAct)更有组织性,适合需要多步骤且相互依赖的任务。
  • 灵活的任务调整:在每一步任务执行后,Plan-and-Execute 模型会根据完成的结果更新状态,并决定是否需要修改后续的计划。这种动态调整的能力使它适应不断变化的环境。
  • 分布式计算与资源节约:Plan-and-Execute 可以在不同阶段使用不同强度的模型。例如,规划时可以调用强大的大模型,执行阶段则可以用较轻量的模型,从而优化资源使用。

在实际应用中,Plan-and-Execute 代理能通过规划步骤解决复杂任务,每个任务完成后,智能体会更新任务状态,并在必要时重新规划任务。这种方法特别适用于需要多步骤、长期规划的任务,例如信息查询、多阶段问题解决等。

三、搭建官网plan_and_execute的示例demo

在本节中,我们将展示如何使用 LangGraph 库来构建一个简化的“计划与执行”系统。这个系统中的智能体能够根据任务规划在不同的状态之间进行转换,直到完成其目标任务。你可以从官方文档中查看详细的代码示例:Plan_and_Eexcute。接下来我们将详细介绍这个示例的核心步骤和实现逻辑。

1、项目整体架构

笔者根据官网示例,进行了不同文件的划分。其中agent.py中定义了在线搜索等功能性结点,graph.py中定义了不同结点之间边的连接关系, plan.py中定义了Plan_and_Execute的plan以及replan结点。

2、 环境配置与基础设置

首先,我们需要安装必要的软件包:

pip install --quiet -U langgraph langchain-community langchain-openai tavily-python

然后,需要设置 OpenAI, Tavily, LangSmith的API密钥,用于处理语言模型调用,搜索功能和监控LLM 应用。创建.env文件:

OPENAI_API_KEY=sk-proj-xxx
TAVILY_API_KEY=tvly-xxx
LANGSMITH_API_KEY=lsv2_sk_xxx

3、agent.py——自定义Agent结点

在这个示例中,我们使用 Tavily 搜索工具作为执行代理的工具之一。

1)环境变量的加载

from dotenv import load_dotenv
load_dotenv()

这部分代码用于加载环境变量文件(.env),其中可能存储了敏感信息,如 API 密钥。这些密钥在运行时会从环境变量中获取,而不是硬编码在代码中,确保了安全性。

2)引入依赖

from langchain import hub  
from langchain_openai import ChatOpenAI  
from langgraph.prebuilt import create_react_agent  
from langchain_community.tools.tavily_search import TavilySearchResults  

这里引入了几个模块和库:

  • langchain:一个用于处理链式代理和自然语言处理任务的库。
  • langchain_openai:OpenAI 的模型接口,我们在这里使用它来创建 GPT-4 模型实例。
  • langgraph.prebuilt:提供预定义的 Agent 相关功能,其中 create_react_agent 函数用于创建一个可以执行工具操作的代理。
  • langchain_community.tools.tavily_search:这是一个工具库,提供 Tavily 搜索功能,它允许你进行网络搜索,并返回相关的搜索结果。

3)定义搜索工具

tools = [TavilySearchResults(max_results=3)] 

在这里,我们实例化了 TavilySearchResults,这是一个用于进行网络搜索的工具。设置 max_results=3 意味着 Tavily 搜索每次最多返回 3 个搜索结果。这个工具将被用在代理中。

4)提示语(Prompt)

prompt = "你是一个有用的Agent"

我们为代理定义了一个简单的提示语(即 prompt),告诉代理 "你是一个有用的Agent"。这种提示语可以是引导代理行为的有效方式,尤其是通过大语言模型驱动的系统时,它有助于框定模型的行为和回答风格。

5)创建 LLM 实例

llm = ChatOpenAI(model="gpt-4-turbo-preview")

我们实例化了 ChatOpenAI 类,使用了 gpt-4-turbo-preview 模型。这个模型基于 OpenAI 的 GPT-4,能够处理复杂的语言任务。你可以根据需求切换不同版本的模型,但 GPT-4 通常能提供较好的性能,尤其是当需要复杂推理和交互时。

6)创建代理执行器

agent_executor = create_react_agent(llm, tools, state_modifier=prompt)

create_react_agent 是我们用来构造 Agent 的核心函数,它接收三个参数:

  • llm:GPT 模型实例,用于生成回答。
  • tools:可以用来辅助模型执行任务的工具(这里是 TavilySearchResults 搜索工具)。
  • state_modifier:指定一个提示语,帮助模型了解其作用和任务。

代理执行器 (agent_executor) 就是最终生成的代理对象,它整合了 LLM 和搜索工具。

7)调用代理并打印结果

response = agent_executor.invoke({"messages": [("user", "who is the winner of the us open")]})
print(response)

我们调用 agent_executor 来处理一个用户问题,询问 "谁赢得了 US Open"(美国网球公开赛)。该代理会使用 Tavily 搜索工具查询网络并返回结果。invoke 方法接收一个包含 messages 的参数,该参数结构模拟对话,其中用户消息是搜索问题。

8)完整代码

# 设置 API 密钥
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()


from langchain import hub  # 从 langchain 导入 hub 模块
from langchain_openai import ChatOpenAI  # 从 langchain_openai 导入 ChatOpenAI 类
from langgraph.prebuilt import create_react_agent  # 从 langgraph.prebuilt 导入 create_react_agent 函数
from langchain_community.tools.tavily_search import TavilySearchResults  # 从 langchain_community.tools 导入 TavilySearchResults 类

# 定义工具,使用 Tavily 搜索工具
tools = [TavilySearchResults(max_results=3)]  # 设置最大结果为 3

# 获取提示
prompt = "你是一个有用的Agent"  # 定义代理的提示
print(prompt)  # 打印提示

# 创建 LLM 实例
llm = ChatOpenAI(model="gpt-4-turbo-preview")  # 实例化 ChatOpenAI,使用 gpt-4-turbo-preview 模型
agent_executor = create_react_agent(llm, tools, state_modifier=prompt)  # 创建代理执行器

# 可选:调用代理并打印响应
response = agent_executor.invoke({"messages": [("user", "who is the winner of the us open")]})  # 调用代理,询问 US Open 胜者
print(response)  # 打印响应

这段代码展示了如何将 OpenAI 的 GPT 模型与搜索工具整合在一起,通过提示语引导代理完成特定任务。在实际应用中,你可以根据需求修改提示语、模型版本或者引入更多工具——将在下面小节中阐述。

4、plany.py——定义系统节点plan、replan

1)环境变量加载

from dotenv import load_dotenv
# 加载环境变量
load_dotenv()

这部分代码通过 dotenv 库加载环境变量文件(通常为 .env 文件),其中可能存放诸如 API 密钥等敏感信息。这一步确保了你的 API 密钥在代码运行时被安全地加载,而不会暴露在代码中。

2)定义计划执行模型 PlanExecute

import operator
from typing import Annotated, List, Tuple
from typing_extensions import TypedDict

class PlanExecute(TypedDict):
    input: str
    plan: List[str]
    past_steps: Annotated[List[Tuple], operator.add]
    response: str

PlanExecute 是一个 TypedDict,它定义了一个用于管理计划执行的结构化数据。它包含:

  • input: 用户提供的原始输入。
  • plan: 系统生成的执行步骤列表。
  • past_steps: 一个带注解的列表,用于记录过去已经执行的步骤。operator.add 表示每次新增步骤时,该列表会扩展。
  • response: 最终的用户响应。 这种结构便于系统在执行复杂任务时跟踪状态。

3)定义计划模型 Plan

from pydantic import BaseModel, Field

class Plan(BaseModel):
    """Plan to follow in future"""
    steps: List[str] = Field(
        description="different steps to follow, should be in sorted order"
    )

这里定义了一个 Plan 模型,继承自 pydantic.BaseModel,用于表示任务的执行步骤。模型中的字段 steps 是一个有序的步骤列表,字段描述表明这些步骤应该按照执行顺序排列。

4)规划器提示模板 planner_prompt

from langchain_core.prompts import ChatPromptTemplate

planner_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.""",
        ),
        ("placeholder", "{messages}"),
    ]
)
  • ChatPromptTemplate.from_messages 用于创建一个提示模板,定义了 system 角色的提示信息。
  • 提示语告诉系统:对于给定的目标,生成一个简单的分步计划,确保每一步足够清晰完整,并且一步一步地解决问题,避免不必要的步骤。
  • 系统的最终输出必须直接回答问题。

5)创建规划器 planner

planner = planner_prompt | ChatOpenAI(
    model="gpt-4o", temperature=0
).with_structured_output(Plan)
  • planner 是一个通过管道操作符 | 创建的规划器,它将 planner_prompt 连接到 ChatOpenAI 模型(这里使用的是 gpt-4o 模型)。
  • temperature=0 表示生成结果时趋向于确定性输出,即不会有太多的随机性。
  • with_structured_output(Plan) 表明系统将输出与前面定义的 Plan 结构对齐。

6)规划器调用

planner.invoke(
    {
        "messages": [
            ("user", "what is the hometown of the current Australia open winner?")
        ]
    }
)
  • 调用 planner.invoke 来执行计划生成器,这里用户的输入是:"当前澳大利亚网球公开赛冠军的家乡在哪里?"。
  • 系统会根据这个问题生成一个步骤清晰的计划。

7)定义响应和行动模型

from typing import Union

class Response(BaseModel):
    """Response to user."""
    response: str

class Act(BaseModel):
    """Action to perform."""
    action: Union[Response, Plan] = Field(
        description="Action to perform. If you want to respond to user, use Response. "
        "If you need to further use tools to get the answer, use Plan."
    )

这里定义了两个模型:

  • Response:用于表示直接返回用户的响应。
  • Act:表示一个需要执行的行动。Act 可以是一个 Response(如果已知答案可以直接响应),也可以是一个 Plan(如果需要进一步的步骤来获取答案)。

8)重新规划提示模板 replanner_prompt

replanner_prompt = ChatPromptTemplate.from_template(
    """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.

Your objective was this:
{input}

Your original plan was this:
{plan}

You have currently done the follow steps:
{past_steps}

Update your plan accordingly. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan."""
)
  • replanner_prompt 用于重新规划任务。如果在执行过程中需要更新计划,该模板会生成新的步骤。
  • 提示语询问的是:当前的目标是什么?原计划是什么?已经完成了哪些步骤?并根据这些信息更新计划。

9)创建重新规划器 replanner

replanner = replanner_prompt | ChatOpenAI(
    model="gpt-4o", temperature=0
).with_structured_output(Act)
  • replanner 是基于 replanner_prompt 和 ChatOpenAI 模型生成的重新规划器。它允许系统在任务的中途重新生成或调整计划。
  • with_structured_output(Act) 表示系统将生成的输出会符合 Act 模型的结构。这意味着输出可以是两种情况之一:一个直接的 Response,用来回答用户的问题。一个新的或更新的 Plan,如果需要进一步行动。

10)完整代码

# 设置 API 密钥
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()

import operator
from typing import Annotated, List, Tuple
from langchain_openai import ChatOpenAI
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from typing import Union

# 定义计划执行模型
class PlanExecute(TypedDict):
    input: str
    plan: List[str]
    past_steps: Annotated[List[Tuple], operator.add]
    response: str

# 定义计划模型
class Plan(BaseModel):
    """Plan to follow in future"""
    steps: List[str] = Field(
        description="different steps to follow, should be in sorted order"
    )

# 创建计划提示模板
planner_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.""",
        ),
        ("placeholder", "{messages}"),
    ]
)

# 计划生成器
planner = planner_prompt | ChatOpenAI(
    model="gpt-4o", temperature=0
).with_structured_output(Plan)

# 调用计划生成器
planner.invoke(
    {
        "messages": [
            ("user", "what is the hometown of the current Australia open winner?")
        ]
    }
)

# 定义响应和行动模型
class Response(BaseModel):
    """Response to user."""
    response: str

class Act(BaseModel):
    """Action to perform."""
    action: Union[Response, Plan] = Field(
        description="Action to perform. If you want to respond to user, use Response. "
        "If you need to further use tools to get the answer, use Plan."
    )

# 重新规划提示模板
replanner_prompt = ChatPromptTemplate.from_template(
    """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.

Your objective was this:
{input}

Your original plan was this:
{plan}

You have currently done the follow steps:
{past_steps}

Update your plan accordingly. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan."""
)

# 重新规划器
replanner = replanner_prompt | ChatOpenAI(
    model="gpt-4o", temperature=0
).with_structured_output(Act)

完整功能解释:

  1. 加载环境变量:通过 load_dotenv(),加载环境变量中的 API 密钥等信息。这使得 API 密钥等敏感数据能够在运行时安全地获取。

  2. 计划生成:在用户提出目标任务后,系统会通过 planner 模块生成一个分步计划(Plan)。这个计划会包含所有必要的步骤来完成任务,确保每一步都有足够的上下文信息来推动任务前进。

  3. 计划执行:在计划生成后,系统可以根据当前的执行进度(即已经完成的步骤)和剩余步骤来调整任务。PlanExecute 定义了跟踪执行状态的结构。

  4. 响应和行动:Act 模型定义了系统的两种可能操作方式:

  5. 如果任务的计划已经执行完毕,系统可以直接生成一个 Response,将最终答案返回给用户。

  6. 如果还有未完成的步骤,系统会返回一个 Plan,以便继续执行。

  7. 重新规划:如果在执行过程中,某些步骤需要更新或调整,系统会使用 replanner 来重新生成一个任务计划,并基于已经完成的步骤更新剩余任务。replanner 允许系统根据实际进展灵活调整任务,而不必从头重新开始。

5、graph.py——定义不同结点之间边的连接关系

现在我们构建完了所需的所有结点,但使其运作起来,还需要构建一个基于状态图(StateGraph)的自动化执行流程。通过计划生成、步骤执行、重新规划等一系列流程,系统能够动态地执行任务,并在任务中途根据实际进展进行调整。这里plan_and_execute使用的就是有向无环图

1)加载环境变量与导入依赖

from dotenv import load_dotenv
from agent import agent_executor
# 加载环境变量
load_dotenv()

import plan
  • dotenv 模块用于加载环境变量,其中可能存储了 API 密钥等信息。
  • agent_executor 是从 agent.py 导入的,是执行搜索任务的代理执行器。

2)定义异步步骤执行函数 execute_step

async def execute_step(state: PlanExecute):
    plan = state["plan"]
    plan_str = "\n".join(f"{i+1}. {step}" for i, step in enumerate(plan))
    task = plan[0]
    task_formatted = f"""For the following plan:
{plan_str}\n\nYou are tasked with executing step {1}, {task}."""
    agent_response = await agent_executor.ainvoke(
        {"messages": [("user", task_formatted)]}
    )
    return {
        "past_steps": [(task, agent_response["messages"][-1].content)],
    }
  • 这个函数负责执行计划中的某一个步骤。它将当前的计划步骤格式化为一个文本并传递给 agent_executor,该代理使用 GPT 模型来处理具体任务。
  • agent_executor.ainvoke() 是一个异步调用,它返回执行步骤后的结果,并将该步骤的结果添加到 past_steps 中。

3)定义异步的计划生成函数 plan_step

async def plan_step(state: PlanExecute):
    plan = await planner.ainvoke({"messages": [("user", state["input"])]})
    return {"plan": plan.steps}

plan_step 是一个生成任务计划的函数。它通过调用 planner 来生成并返回一个步骤列表,planner 会根据用户的输入(如目标任务)生成一个有序的计划步骤。

4)定义异步的重新规划函数 replan_step

async def replan_step(state: PlanExecute):
    output = await replanner.ainvoke(state)
    if isinstance(output.action, Response):
        return {"response": output.action.response}
    else:
        return {"plan": output.action.steps}
  • 如果在执行任务时需要重新规划,这个函数会调用 replanner 来根据当前状态更新计划。返回的可能是一个直接的 response(任务已经完成),或者是一个新的计划。

5)定义结束判断函数 should_end

def should_end(state: PlanExecute) -> Literal["agent", "__end__"]:
    if "response" in state and state["response"]:
        return "__end__"
    else:
        return "agent"
  • 这个函数用于判断任务是否已经结束。如果任务已经产生了一个有效的 response,流程将结束;否则,它将继续执行任务。

6)定义状态图 workflow

from langgraph.graph import StateGraph, START

workflow = StateGraph(PlanExecute)

# Add the plan node
workflow.add_node("planner", plan_step)

# Add the execution step
workflow.add_node("agent", execute_step)

# Add a replan node
workflow.add_node("replan", replan_step)

workflow.add_edge(START, "planner")
workflow.add_edge("planner", "agent")
workflow.add_edge("agent", "replan")

workflow.add_conditional_edges(
    "replan",
    should_end,
)
  • 通过 StateGraph 定义了任务的执行流程(状态图),每个步骤都是一个 node。任务从 planner 节点开始,生成计划;接着进入 agent 节点,执行该计划;如果需要调整,则进入 replan 节点。
  • 边(edges)连接了这些节点,指示任务的执行顺序。最后,通过 add_conditional_edges 方法,决定是否结束任务。

7)编译状态图

app = workflow.compile()
  • workflow.compile() 将整个状态图编译成可执行的应用。这个应用程序可以像其他普通的可运行函数一样使用。

8)可视化状态图

img_path = "graph.png"
graph_image = app.get_graph(xray=True).draw_mermaid_png()

with open(img_path, "wb") as img_file:
    img_file.write(graph_image)

from IPython.display import Image, display
display(Image(img_path))
  • 这一段代码用于可视化状态图,生成并展示了任务的执行流程图。draw_mermaid_png() 将状态图转换为 PNG 图像。

9)运行应用程序

config = {"recursion_limit": 50}
inputs = {"input": "what is the hometown of the mens 2024 Australia open winner?"}

async def main():
    async for event in app.astream(inputs, config=config):
        for k, v in event.items():
            if k != "__end__":
                print(v)
  • config 设置了递归限制,防止执行过多次循环。
  • inputs 是输入数据(问题)。在这个例子中,询问的是 "2024年澳大利亚网球公开赛男单冠军的家乡是哪儿?"。
  • main() 函数异步运行整个流程,捕获并处理任务执行过程中的事件。

10)启动主程序

if __name__ == "__main__":
    asyncio.run(main())
  • 这是标准的 Python 主程序入口。asyncio.run(main()) 启动异步主循环,运行整个任务执行流程。

11)完整代码

# 设置 API 密钥
from dotenv import load_dotenv
from agent import agent_executor
# 加载环境变量
load_dotenv()

import plan

import asyncio
from typing import Literal
from plan import PlanExecute, replanner, Response, planner
from langgraph.graph import StateGraph, START


async def execute_step(state: PlanExecute):
    plan = state["plan"]
    plan_str = "\n".join(f"{i+1}. {step}" for i, step in enumerate(plan))
    task = plan[0]
    task_formatted = f"""For the following plan:
{plan_str}\n\nYou are tasked with executing step {1}, {task}."""
    agent_response = await agent_executor.ainvoke(
        {"messages": [("user", task_formatted)]}
    )
    return {
        "past_steps": [(task, agent_response["messages"][-1].content)],
    }


async def plan_step(state: PlanExecute):
    plan = await planner.ainvoke({"messages": [("user", state["input"])]})
    return {"plan": plan.steps}


async def replan_step(state: PlanExecute):
    output = await replanner.ainvoke(state)
    if isinstance(output.action, Response):
        return {"response": output.action.response}
    else:
        return {"plan": output.action.steps}


def should_end(state: PlanExecute) -> Literal["agent", "__end__"]:
    if "response" in state and state["response"]:
        return "__end__"
    else:
        return "agent"


from langgraph.graph import StateGraph, START

workflow = StateGraph(PlanExecute)

# Add the plan node
workflow.add_node("planner", plan_step)

# Add the execution step
workflow.add_node("agent", execute_step)

# Add a replan node
workflow.add_node("replan", replan_step)

workflow.add_edge(START, "planner")
workflow.add_edge("planner", "agent")
workflow.add_edge("agent", "replan")

workflow.add_conditional_edges(
    "replan",
    should_end,
)

# Compile the workflow
app = workflow.compile()

# 可视化状态图
img_path = "graph.png"
graph_image = app.get_graph(xray=True).draw_mermaid_png()

with open(img_path, "wb") as img_file:
    img_file.write(graph_image)

from IPython.display import Image, display
display(Image(img_path))

# 配置和输入
config = {"recursion_limit": 50}
inputs = {"input": "what is the hometown of the mens 2024 Australia open winner?"}


# 处理事件流
async def main():
    async for event in app.astream(inputs, config=config):
        for k, v in event.items():
            if k != "__end__":
                print(v)

# 运行主程序
if __name__ == "__main__":
    asyncio.run(main())

6、运行结果及分析

1)graph.png

运行 graph.py 文件后,项目文件夹中会生成一张 graph.png 图片,该图片展示了当前项目的整体架构。此架构图以有向无环图(DAG)的形式呈现,其中:

  • 实线 表示无条件的边,表示流程中节点之间的固定连接。
  • 虚线 表示有条件的边,表示根据某些条件决定是否执行特定的节点。

通过这张图,我们可以直观地看到任务的计划(plan)与执行(execute)流程,以及它们在工作流中的交互关系。

图片没生成的同学,可以直接打开磁盘目录看看~

2)运行结果分析

同时,我们能够在控制台看到如下输出:

{'plan': ["Identify the winner of the men's 2024 Australian Open.", 'Research the hometown of the identified winner.']}
{'past_steps': [("Identify the winner of the men's 2024 Australian Open.", "The winner of the men's singles tennis title at the 2024 Australian Open was Jannik Sinner. He defeated Daniil Medvedev in the final with scores of 3-6, 3-6, 6-4, 6-4, 6-3 to win his first major singles title.")]}
{'plan': ['Research the hometown of Jannik Sinner.']}
{'past_steps': [('Research the hometown of Jannik Sinner.', "Jannik Sinner's hometown is Sexten, which is located in northern Italy.")]}
{'response': "The hometown of the men's 2024 Australian Open winner, Jannik Sinner, is Sexten, located in northern Italy."}

整个执行流程如下:

📌生成任务计划:

{'plan': ["Identify the winner of the men's 2024 Australian Open.", 'Research the hometown of the identified winner.']}
  • 节点调用: 流程从 START 节点开始,进入 planner 节点,调用了 plan_step() 函数。
  • 功能: plan_step() 函数根据用户问题生成任务计划。在本例中,计划生成了两个步骤:识别 2024 年澳网男单冠军是谁,查找该冠军的家乡。
  • 实线(无条件边): 计划生成后,实线将任务传递给下一个节点,无条件进入执行阶段。

📌执行第一个步骤:识别冠军

{'past_steps': [("Identify the winner of the men's 2024 Australian Open.", "The winner of the men's singles tennis title at the 2024 Australian Open was Jannik Sinner. He defeated Daniil Medvedev in the final with scores of 3-6, 3-6, 6-4, 6-4, 6-3 to win his first major singles title.")]}
  • 节点调用: 进入 agent 节点,调用了 execute_step() 函数,执行计划中的第一个任务:识别冠军。
  • 功能: execute_step() 调用代理(如 OpenAI 的 GPT 模型)来查询答案,成功返回了 Jannik Sinner 是 2024 年澳网男单冠军。
  • 结果存储: 任务结果被记录在 past_steps 中,表示该步骤已完成。
  • 实线(无条件边): 执行第一个步骤后,实线将流程连接到下一步,通常这时候会进行重新规划。

📌重新规划(Replan Step)

{'plan': ['Research the hometown of Jannik Sinner.']}
  • 节点调用: 进入 replan 节点,调用 replan_step() 函数。由于第一个步骤已经执行,系统根据结果重新规划任务。
  • 功能: 在 replan_step() 中,系统更新了原计划,将第二步调整为“查找 Jannik Sinner 的家乡”,这是基于第一个步骤完成后的上下文信息。
  • 虚线(条件边): replan 节点通过虚线边与执行节点连接,这表示有条件的执行,只有在执行完成并需要调整计划时才触发重新规划。

📌执行第二个步骤(Execute Step)

{'past_steps': [('Research the hometown of Jannik Sinner.', "Jannik Sinner's hometown is Sexten, which is located in northern Italy.")]}
  • 节点调用: 再次进入 agent 节点,调用 execute_step() 函数,执行更新后的任务:“查找 Jannik Sinner 的家乡”。
  • 功能: 系统通过代理查询,得到了结果:Jannik Sinner 的家乡是 Sexten,位于意大利北部。
  • 结果存储: 此步骤结果也被记录在 past_steps 中。
  • 实线(无条件边): 步骤被无条件执行完毕后,流程进入结束判断阶段。

📌结束判断

{'response': "The hometown of the men's 2024 Australian Open winner, Jannik Sinner, is Sexten, located in northern Italy."}
  • 节点调用: 调用 should_end() 函数,检查是否完成了所有任务步骤。
  • 功能: should_end() 函数判断 response 字段是否已填充结果。如果任务已经有最终答案(这里是 Jannik Sinner 的家乡),那么流程结束。
  • 条件边: 如果任务完成,通过条件边返回最终结果,流程终止。

planner结点只会执行一次,就是负责初始化plan,plan中包含多个step步骤。plan的更新是通过replan结点实现的,它通过用户的输入input、上一步的plan、当前执行的step,三方面重新制定新的plan。同时,结点间的流向由条件边或者无条件边决定。

肯定有小伙伴有疑问,replan和agent之间不是形成环路了吗,怎么是有向无环图了呢?这里注意,有向无环图指的是,从起始节点(如 START)出发,永远不会回到这个起始节点。在这种情况下,图的整体是无环的。在局部的节点之间,可能会存在节点之间的反复调用(比如 agent 和 replan 之间),这种局部的反复执行属于流程的一部分,但并不违反有向无环图的定义。

四、拓展Plan_and_execute实现多agent

我们已经完成了Plan_and_Execute的demo搭建示例,对Plan_and_Execute有了一个较为深刻的认识,但是只有一个agent结点,并不能算真正意义上的多代理系统,我们还需要加入更多的agent结点。以下是修改的细节:

1、项目整体架构

2、agent.py——加入更多的结点

这里我们新增了两个结点执行器,draw_agent_executor 给自己发送邮件,email_agent 调用OpenAI绘图接口。完整代码如下:

# 设置 API 密钥
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import state

import tool


import os  # 导入操作系统模块

import requests
from langchain_core.messages import HumanMessage  # 从 langchain_core 导入 HumanMessage 类

# 设置 API 密钥
from langchain_core.tools import BaseTool
from pydantic import PrivateAttr, BaseModel

from langchain import hub  # 从 langchain 导入 hub 模块
from langchain_openai import ChatOpenAI  # 从 langchain_openai 导入 ChatOpenAI 类
from langgraph.prebuilt import create_react_agent  # 从 langgraph.prebuilt 导入 create_react_agent 函数
from langchain_community.tools.tavily_search import TavilySearchResults  # 从 langchain_community.tools 导入 TavilySearchResults 类

# 定义工具,使用 Tavily 搜索工具
tools = [TavilySearchResults(max_results=3)]  # 设置最大结果为 3

# 获取提示
prompt = "你是一个有用的Agent"  # 定义代理的提示
print(prompt)  # 打印提示

# 创建 LLM 实例
llm = ChatOpenAI(model="gpt-4-turbo-preview")  # 实例化 ChatOpenAI,使用 gpt-4-turbo-preview 模型
search_agent_executor = create_react_agent(llm, tools, state_modifier=prompt)  # 创建代理执行器

# 可选:调用代理并打印响应
# response = agent_executor.invoke({"messages": [("user", "who is the winner of the us open")]})  # 调用代理,询问 US Open 胜者
# print(response)  # 打印响应
# 定义绘图工具
# 定义符合 LangChain 规范的绘图工具
class DrawTool(BaseTool):
    name: str = "generate_image"  # 添加类型注解
    description: str = "Generates an image based on the input text."  # 添加类型注解

    # 工具的调用函数
    def _run(self, prompt: str) -> str:
        """根据给定的提示生成图片"""

        # 在调用时获取 API key,避免在初始化时处理
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            return "API key is missing. Please set the OPENAI_API_KEY environment variable."


        """根据给定的提示生成图片"""
        url = "https://api.openai.com/v1/images/generations"

        headers = {
            "Authorization": f"Bearer sk-proj-xxx",
            "Content-Type": "application/json"
        }

        # 请求体
        payload = {
            "prompt": prompt,
            "n": 1,  # 生成一张图片
            "size": "1024x1024"  # 设置图片的大小
        }

        try:
            response = requests.post(url, json=payload, headers=headers)
            response.raise_for_status()  # 检查请求是否成功

            image_url = response.json()["data"][0]["url"]  # 获取生成的图片 URL
            return image_url

        except requests.exceptions.RequestException as e:
            print(f"请求失败: {e}")
            return "图像生成失败"

    async def _arun(self, prompt: str) -> str:
        """异步版本的运行函数"""
        # 直接调用同步版本的 _run 方法
        return self._run(prompt)

# 创建绘图工具实例
draw_tool = DrawTool()

# 将绘图工具作为代理的工具之一
draw_tools = [draw_tool]  # 将绘图工具放入工具列表

# 创建 LLM 实例
llm = ChatOpenAI(model="gpt-4o")  # 实例化 ChatOpenAI,使用 gpt-4-turbo-preview 模型

# 创建绘图代理
draw_agent_executor = create_react_agent(llm, draw_tools, state_modifier="你是一个有用的Agent,可以调用tool可以画图")

# 可选:调用代理并打印响应
# response = draw_agent_executor.invoke({"messages": [("user", "画一张帅哥图")]})
# print(response)  # 打印响应



class EmailTool(BaseTool):
    name: str = "send_email"
    description: str = "Sends an email based on extracted subject, body, and URL."

    # 工具的调用函数
    def _run(self, prompt: str) -> str:
        class email_parameter(BaseModel):  # 定义用于存储邮件参数的模型
            subject: str  # 邮件主题
            body: str  # 邮件内容
            url: str  # 链接

            # 创建邮件提示模板

        email_prompt = ChatPromptTemplate.from_messages(
            [
                'system', "Extract the email subject and body for me based on the input:",  # 系统消息,要求提取邮件主题和内容
                MessagesPlaceholder(variable_name='messages'),  # 占位符,用于动态插入消息
            ]
        )

        # 把 prompt 封装成字典,以符合 chain 的调用要求
        input_data = {
            "messages": [("user", prompt)]  # 把 prompt 作为 user 消息
        }
        # 创建处理邮件的链,将提示与 LLM 结合,期望结构化输出
        chain = (email_prompt | llm.with_structured_output(email_parameter))
        result = chain.invoke(input_data)  # 执行链并获取结果

        # 发送邮件,使用提取的主题和内容,收件人是指定邮箱
        tool.send_email(subject=result.subject, body=result.body + result.url, to_email="663800595@qq.com")

    async def _arun(self, prompt: str) -> str:
        """异步版本的运行函数"""
        return self._run(prompt)

# 创建发送邮件工具实例
email_tool = EmailTool()

# 将发送邮件工具作为代理的工具之一
email_tools = [email_tool]  # 将发送邮件工具放入工具列表

# 创建 LLM 实例
llm = ChatOpenAI(model="gpt-4o")  # 实例化 ChatOpenAI,使用 gpt-4-turbo-preview 模型

# 创建邮件代理
email_agent_executor = create_react_agent(llm, email_tools, state_modifier="你是一个有用的Agent,可以将用户的话直接传入,调用tool发送邮件,不需要主题和正文,都将在tool中处理")

# 可选:调用代理并打印响应
# response = email_agent_executor.invoke({"messages": [("user", "将这段内容发送到我的邮箱:你好,这是测试")]})
# print(response)  # 打印响应

3、tool.py——发送邮件函数

这里编写一个工具方法,用于给自己发送邮件。完整代码如下:

from dotenv import load_dotenv, find_dotenv  # 从 dotenv 库导入加载和查找环境变量的函数

load_dotenv(find_dotenv())  # 加载环境变量

import smtplib
from email.mime.text import MIMEText


import os  # 导入操作系统模块,用于访问环境变量

import requests


def send_email(subject, body, to_email):  # 定义发送邮件的函数,接受主题、内容和收件人邮箱
    # SMTP 服务器配置
    smtp_server = 'smtp.qq.com'  # SMTP 服务器地址
    smtp_port = 465  # 通常用于 SSL 的端口
    sender_email = 'xxx@qq.com'  # 发件人邮箱
    # password = 'xxx'  # 小心处理密码安全


    # 创建邮件内容
    message = MIMEText(body, 'plain')  # 创建纯文本邮件
    message['From'] = sender_email  # 设置发件人
    message['To'] = to_email  # 设置收件人
    message['Subject'] = subject  # 设置邮件主题

    # 发送邮件
    try:
        server = smtplib.SMTP_SSL(smtp_server, smtp_port)  # 使用 SSL 创建 SMTP 服务器连接
        server.login(sender_email, password)  # 登录到邮件服务器
        server.sendmail(sender_email, to_email, message.as_string())  # 发送邮件
        server.quit()  # 退出服务器
        print("Email sent successfully!")  # 打印发送成功的消息
    except Exception as e:  # 捕捉异常
        print(f"Failed to send email: {e}")  # 打印错误信息


def create_openai_connection(url, json_request_body):
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}"  # 从环境变量中获取API密钥
    }

    # 发送POST请求
    response = requests.post(url, json=json_request_body, headers=headers)
    response.raise_for_status()  # 检查响应状态
    return response.json()

4、plan.py——修改planner、replan结点

对于现在的多个Agent结点,我们在总结Plan的同时,还需要明确知道每个Step步骤需要调用到哪个Agent结点执行,因此我们需要修改planner以及replan结点的提示词,在总结Step的基础上,额外总结出Step所需要执行的结点名称。这样一来,我们构建条件边也有了思路,即通过判断Step中包含的结点名称来进行直接路由。

1)这里我们需要修改plan的提示词

这里我们需要在生成plan的时候,同时提取出各个Step需要使用到的Agent的名称,以此通过有条件边路由,去执行对应结点功能。

# agent_list 列表
agent_list = ["generate_image_agent","search_online_agent","send_email_agent"]  # 示例代理列表

agent_list_further = {
    "generate_image_agent (for generating an image)",
    "search_online_agent (for searching information)",
    "send_email_agent (for sending email)"
}

# 定义规划提示模板
planner_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            f"""For the given objective, come up with a clear, simple step-by-step plan. \
Each step must include both a detailed task description and the corresponding agent that will execute that step. \
The available agents are: generate_image_agent (for generating an image), search_online_agent (for searching information), and send_email_agent (for sending an email). \
You **must** provide the task description in detail and specify which agent should handle each task.

**Your output must strictly follow this structure**:
1. Agent: [Agent Name from generate_image_agent, search_online_agent, send_email_agent] - Step X: [Detailed Task Description]
2. Agent: [Agent Name from generate_image_agent, search_online_agent, send_email_agent] - Step X: [Detailed Task Description]

For example:
1. Agent: search_online_agent - Step 1: Search for the most famous animal in Chengdu.
2. Agent: generate_image_agent - Step 2: Generate an image of the most famous animal in Chengdu.
3. Agent: send_email_agent - Step 3: Send the generated image to the user’s email.

Ensure that:
1. Every step has a clearly described task.
2. The agent handling each task is explicitly mentioned.
3. The final step should conclude with the final answer to the user's query.""",
        ),
        ("placeholder", "{messages}"),
    ]
).partial(options=str(agent_list), agent_list=", ".join(agent_list))

2)修改提取的数据结构

这里明确让AI提取出数据,包含Agent名称以及每一步的执行步骤内容。

# 定义计划模型
class Plan(BaseModel):
    """Plan to follow in future"""  # 用于未来执行的计划

    steps: List[str] = Field(
        description="the agent name the step will use and different steps to follow (should be in sorted order)"  # 描述计划的步骤,应该是有序的
    )

3)完整代码

# 设置 API 密钥
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

# 导入所需的库
import operator
import os
from typing import Annotated, List, Tuple, TypedDict
from langchain_openai import ChatOpenAI  # 导入OpenAI的聊天模型

# 定义状态类
class PlanExecute(TypedDict):
    input: str  # 用户输入
    plan: List[str]  # 计划的步骤列表
    past_steps: Annotated[List[Tuple], operator.add]  # 已执行的步骤
    response: str  # 最终响应

from pydantic import BaseModel, Field  # 导入Pydantic用于数据模型

# 假设你有一个 agent_list 列表
agent_list = ["generate_image_agent","search_online_agent","send_email_agent"]  # 示例代理列表

# 定义计划模型
class Plan(BaseModel):
    """Plan to follow in future"""  # 用于未来执行的计划

    steps: List[str] = Field(
        description="the agent name the step will use and different steps to follow (should be in sorted order)"  # 描述计划的步骤,应该是有序的
    )

from langchain_core.prompts import ChatPromptTemplate  # 导入提示模板


agent_list_further = {
    "generate_image_agent (for generating an image)",
    "search_online_agent (for searching information)",
    "send_email_agent (for sending email)"
}

# 定义规划提示模板
planner_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            f"""For the given objective, come up with a clear, simple step-by-step plan. \
Each step must include both a detailed task description and the corresponding agent that will execute that step. \
The available agents are: generate_image_agent (for generating an image), search_online_agent (for searching information), and send_email_agent (for sending an email). \
You **must** provide the task description in detail and specify which agent should handle each task.

**Your output must strictly follow this structure**:
1. Agent: [Agent Name from generate_image_agent, search_online_agent, send_email_agent] - Step X: [Detailed Task Description]
2. Agent: [Agent Name from generate_image_agent, search_online_agent, send_email_agent] - Step X: [Detailed Task Description]

For example:
1. Agent: search_online_agent - Step 1: Search for the most famous animal in Chengdu.
2. Agent: generate_image_agent - Step 2: Generate an image of the most famous animal in Chengdu.
3. Agent: send_email_agent - Step 3: Send the generated image to the user’s email.

Ensure that:
1. Every step has a clearly described task.
2. The agent handling each task is explicitly mentioned.
3. The final step should conclude with the final answer to the user's query.""",
        ),
        ("placeholder", "{messages}"),
    ]
).partial(options=str(agent_list), agent_list=", ".join(agent_list))



# 创建规划实例
planner = planner_prompt | ChatOpenAI(
    model="gpt-4o", temperature=0  # 使用OpenAI的GPT模型,设置温度为0以确保一致性
).with_structured_output(Plan)

# 调用规划器生成计划
planner.invoke(
    {
        "messages": [
            ("user", "what is the hometown of the current Australia open winner?")  # 用户询问的问题
        ]
    }
)

# 重新规划部分
from typing import Union  # 导入Union类型

# 定义响应类
class Response(BaseModel):
    """Response to user."""  # 响应用户的类

    response: str  # 响应内容

# 定义动作类
class Act(BaseModel):
    """Action to perform."""  # 要执行的动作类

    action: Union[Response, Plan] = Field(
        description="Action to perform. If you want to respond to user, use Response. "
        "If you need to further use tools to get the answer, use Plan."  # 描述动作的类型
    )


# 定义重新规划的提示模板
replanner_prompt = ChatPromptTemplate.from_template(
    """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed (In particular, some url addresses generated need to be carried) - do not skip steps.

Your objective was this:
{input}

Your original plan was this:
{plan}

You have currently done the follow steps:
{past_steps}

Update your plan accordingly. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan ,in particular, url addresses generated need to be filled). Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan."""
)


replanner = replanner_prompt | ChatOpenAI(
    model="gpt-4o", temperature=0
).with_structured_output(Act)

5、graph.py——以结点名称判断,搭建有条件边

这里的其他步骤大同小异,稍微注意一下结点之间边的连接,我们采用自定义有条件边进行连接,不是无条件边直接连接了。完整代码如下:

# 设置 API 密钥
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()

import plan


import asyncio
from typing import Literal
from agent import search_agent_executor, draw_agent_executor, email_agent_executor
from plan import PlanExecute, replanner, Response, planner
from langgraph.graph import StateGraph, START

async def execute_step(state: PlanExecute):
    plan = state["plan"]  # 获取当前的计划
    plan_str = "\n".join(f"{i+1}. {step}" for i, step in enumerate(plan))  # 将计划格式化为字符串
    task = plan[0]  # 获取当前的任务
    task_formatted = f"""For the following plan:
{plan_str}\n\nYou are tasked with executing step {1}, {task}."""  # 构造执行任务的提示
    agent_response = await search_agent_executor.ainvoke(
        {"messages": [("user", task_formatted)]}  # 发送消息给代理执行任务
    )
    return {
        "past_steps": [(task, agent_response["messages"][-1].content)],  # 返回过去的步骤
    }

async def plan_step(state: PlanExecute):
    plan = await planner.ainvoke({"messages": [("user", state["input"])]})  # 调用 planner 获取计划
    return {"plan": plan.steps}  # 返回计划步骤

async def replan_step(state: PlanExecute):
    # print("Replan step triggered")  # 打印日志以查看何时触发
    # print(f"Current plan: {state['plan']}")  # 打印当前计划内容
    output = await replanner.ainvoke(state)  # 调用 replanner 进行重新规划
    if isinstance(output.action, Response):
        return {"response": output.action.response}
    else:
        # print("Update plan:"+ str(output.action.steps))
        return {"plan": output.action.steps}

def should_end_all_agents(state: PlanExecute) -> Literal["search_online_agent", "generate_image_agent", "send_email_agent", "__end__"]:
    """
    检查当前计划中的任务,选择下一个需要执行的 agent。如果所有任务已完成,则返回 "__end__"。
    """
    plan = state.get("plan", [])
    if not plan:
        return "__end__"  # 如果计划为空,直接结束
    if "response" in state and state["response"]:  # 检查是否有响应
        return "__end__"  # 如果有,结束流程
    # 判断哪个 agent 需要继续执行
    for step in plan:
        if "search_online_agent" in step:
            return "search_online_agent"
        elif "generate_image_agent" in step:
            return "generate_image_agent"
        elif "send_email_agent" in step:
            return "send_email_agent"

    return "__end__"  # 如果没有需要执行的任务,返回结束



async def draw_step(state: PlanExecute):
    plan = state["plan"]  # 获取当前的计划
    plan_str = "\n".join(f"{i+1}. {step}" for i, step in enumerate(plan))  # 将计划格式化为字符串
    task = plan[0]  # 获取当前的任务
    task_formatted = f"""For the following plan:
    {plan_str}\n\nYou are tasked with executing step {1}, which is to generate a visual representation of {task}. Please generate the image directly."""



    agent_response = await draw_agent_executor.ainvoke(
        {"messages": [("user", task_formatted)]}  # 发送消息给代理执行任务
    )
    # 提取生成的图片 URL 并将其存储到 state 中
    image_url = agent_response["messages"][-1].content  # 假设最后的响应是生成的图片 URL
    state["image_url"] = image_url  # 存储图片 URL 到 state 中

    return {
        "past_steps": [(task, agent_response["messages"][-1].content)],  # 返回过去的步骤
    }

async def send_email_step(state: PlanExecute):
    plan = state["plan"]  # 获取当前的计划
    plan_str = "\n".join(f"{i+1}. {step}" for i, step in enumerate(plan))  # 将计划格式化为字符串
    task = plan[0]  # 获取当前的任务
    task_formatted = f"""For the following plan:
    {plan_str}\n\nYou are tasked with executing step {1}, which is to send email of {task}."""
    agent_response = await email_agent_executor.ainvoke(
        {"messages": [("user", task_formatted)]}  # 发送消息给代理执行任务
    )
    return {
        "past_steps": [(task, agent_response["messages"][-1].content)],  # 返回过去的步骤
    }

# 创建状态图
workflow = StateGraph(PlanExecute)

# 添加计划节点
workflow.add_node("planner", plan_step)

# 添加执行步骤
workflow.add_node("search_online_agent", execute_step)

# 添加重新规划节点
workflow.add_node("replan", replan_step)

# 添加执行步骤
workflow.add_node("generate_image_agent", draw_step)

# 添加执行步骤
workflow.add_node("send_email_agent", send_email_step)


workflow.add_edge(START, "planner")  # 从起始节点到计划节点


# 从执行节点到重新规划节点
workflow.add_edge("search_online_agent", "replan")
# 从执行节点到重新规划节点
workflow.add_edge("generate_image_agent", "replan")
# 从执行节点到重新规划节点
workflow.add_edge("send_email_agent", "replan")

workflow.add_conditional_edges("replan", should_end_all_agents)  # 添加条件边,判断是否结束



# 定义自定义条件函数,根据 plan 中的字符串来决定执行哪个代理
def get_next_agent_name(plan_steps, agent_list):
    if len(plan_steps) > 0:
        step_description = plan_steps[0]  # 假设 plan.steps 是字符串列表,获取第一个步骤的描述
        for agent in agent_list:
            if agent in step_description:  # 检查代理名称是否在步骤描述中
                return agent
    return "__end__"  # 如果没有匹配的代理,返回结束标志 "__end__"

# 创建自定义的 condition_map,代理名称作为键值
condition_map = {agent: agent for agent in plan.agent_list}  # 创建条件映射
condition_map["__end__"] = "__end__"  # 添加结束条件

# 添加条件边到 workflow
workflow.add_conditional_edges(
    "planner",
    lambda x: get_next_agent_name(x["plan"], plan.agent_list),  # 使用自定义包含逻辑
    condition_map
)


# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile()

# 可视化状态图
img_path = "graph.png"
graph_image = app.get_graph(xray=True).draw_mermaid_png()  # 直接获取图像数据

with open(img_path, "wb") as img_file:
    img_file.write(graph_image)  # 保存为文件

from IPython.display import Image, display
display(Image(img_path))  # 读取并显示文件



# 运行应用程序
config = {"recursion_limit": 50}
inputs = {"input": "成都最出名的动物?你再画它的图片,然后将绘图后的结果发送到我的邮箱中"}


# 处理事件流
async def main():
    async for event in app.astream(inputs, config=config):
        for k, v in event.items():
            if k != "__end__":
                print(v)


# 运行主程序
if __name__ == "__main__":
    asyncio.run(main())

6、运行及测试

现在我们运行graph.py文件,能够看到在项目根目录下生成了graph.py,我们可以清晰的看到目前图的整体架构:

并且,查看邮箱,我们将能够收到生成熊猫的图片:

还挺可爱的是吧,不过一些抽象的画图描述以及对人物的绘画,很难评,有条件还是直接对接midjourney接口更好。

这里需要注意,OpenAI的DALL.E生成的图片会有过期时间,记得及时保存到本地或者上传到自己的服务器。

至此我们完成了Plan_and_Eexcute框架的基本搭建,如果想要融入其他Agent结点,实现更多拓展功能,在此基础上编写即可。

有疑惑或者想要与我进行技术探讨,欢迎在评论区留言,我看到都会及时回复~