智算多多
官方邮箱:service@zsdodo.com

公司地址:北京市丰台区南四环西路188号总部基地三区国联股份数字经济总部


在人工智能和分布式系统领域,Multi-Agent(多智能体)系统正逐渐成为解决复杂问题的关键范式。想象一下,当你面对一个需要协调多个专业团队才能完成的大型项目时,你会怎么做?你可能会将整个项目分解成若干个阶段,每个阶段再细分为具体的任务,然后分配给不同的团队去执行。
Multi-Agent 任务分解框架正是基于这一思想,将一个复杂的、难以直接解决的目标,系统地分解为一系列可管理、可执行的子任务,然后由多个智能体协作完成。这不仅提高了问题解决的效率,还增强了系统的鲁棒性和灵活性。
在这篇文章中,我们将深入探讨 Multi-Agent 任务分解的核心概念、算法原理、数学模型以及实际应用。我们将通过具体的代码示例和架构设计,帮助你构建一个完整的任务分解框架。无论你是 AI 研究者、软件工程师还是系统架构师,相信这篇文章都能为你提供有价值的参考。
Multi-Agent 系统(MAS)是由多个相互作用的智能体(Agent)组成的计算机系统。每个智能体都是一个自治的实体,能够感知环境、做出决策并采取行动。智能体之间通过通信、协作和协商来共同完成复杂的任务。
智能体的核心特性:
任务分解(Task Decomposition)是指将一个复杂的任务或目标,按照一定的规则和策略,分解为若干个相对简单、相互关联的子任务的过程。在 Multi-Agent 系统中,任务分解是实现有效协作的第一步。
为什么任务分解如此重要?
根据不同的标准,任务分解可以分为多种类型:
按分解方式分类:
按时间关系分类:
按知识依赖分类:
随着技术的发展,我们面临的问题越来越复杂。从供应链管理到智慧城市,从灾难响应到太空探索,这些问题往往具有以下特点:
传统的集中式系统在面对这些问题时往往显得力不从心,而 Multi-Agent 系统则提供了一种更灵活、更鲁棒的解决方案。任务分解作为 Multi-Agent 系统的核心技术之一,其重要性不言而喻。
任务分解的粒度(Granularity)是指子任务的大小和复杂度。粒度过粗,子任务仍然难以处理;粒度过细,则会增加协调和通信的开销。如何找到一个合适的平衡点,是任务分解面临的第一个挑战。
子任务之间往往存在复杂的依赖关系,包括数据依赖、控制依赖和时间依赖等。如何识别、表示和处理这些依赖关系,确保任务的正确执行顺序,是另一个重要挑战。
在现实世界中,任务的执行环境往往是不确定的。子任务可能会失败,资源可能会不可用,需求可能会发生变化。如何在任务分解阶段就考虑到这些不确定性,设计出具有适应性和容错性的分解方案,是一个极具挑战性的问题。
不同的智能体具有不同的能力和资源。如何将子任务分配给最合适的智能体,充分发挥每个智能体的优势,同时避免资源浪费和瓶颈,是任务分配和调度需要解决的核心问题。
任务分解不是一个一劳永逸的过程。在任务执行过程中,我们可能需要根据实际情况对分解方案进行动态调整和优化。如何设计一个能够持续学习和改进的分解框架,是一个长期的研究课题。
| 要素名称 | 描述 | 关键功能 |
|---|---|---|
| 目标分析器 | 负责理解和分析原始目标 | 目标解析、优先级排序、约束识别 |
| 任务分解器 | 将目标分解为子任务 | 分解策略选择、粒度控制、依赖关系建立 |
| 任务分配器 | 将子任务分配给合适的智能体 | 智能体能力匹配、负载均衡、优化分配 |
| 任务监控器 | 监控任务执行状态 | 进度跟踪、异常检测、状态更新 |
| 协调与通信模块 | 处理智能体之间的交互 | 消息传递、协议协商、冲突解决 |
| 学习与优化模块 | 从经验中学习并优化分解策略 | 性能评估、策略更新、知识积累 |
为了更好地理解这些核心要素之间的关系,我们可以使用 ER 实体关系图来表示:
分解为
包含
分配给
具有
有
依赖于
监控
优化
GOAL
TASK
SUBTASK
AGENT
CAPABILITY
DEPENDENCY
MONITOR
LEARNING_MODULE
接下来,让我们看一下任务分解框架的交互流程和数据流向:
学习与优化模块任务监控器智能体任务分配器任务分解器目标分析器用户学习与优化模块任务监控器智能体任务分配器任务分解器目标分析器用户提交原始目标分析目标与约束传递分析后的目标生成子任务与依赖关系传递任务分解结果查询能力信息返回能力描述进行任务分配分配子任务执行子任务汇报执行状态提供性能数据反馈优化建议报告最终结果
层次任务网络(Hierarchical Task Network, HTN)是一种经典的任务分解方法,它通过任务分解规则将复杂任务逐步分解为原始任务(可直接执行的任务)。
HTN 分解算法的基本思想是从初始任务网络开始,不断选择复合任务,应用合适的方法将其分解为子任务,直到任务网络中只剩下原始任务为止。
让我们用 Python 代码来实现一个简化版的 HTN 分解算法:
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from enum import Enum
class TaskType(Enum):
PRIMITIVE = "primitive"
COMPOUND = "compound"
@dataclass
class Task:
name: str
task_type: TaskType
parameters: Dict[str, Any] = field(default_factory=dict)
preconditions: List[str] = field(default_factory=list)
effects: List[str] = field(default_factory=list)
@dataclass
class Method:
name: str
task: str # 该方法可分解的复合任务名称
preconditions: List[str] = field(default_factory=list)
subtasks: List[Task] = field(default_factory=list)
constraints: List[str] = field(default_factory=list)
class HTNPlanner:
def __init__(self):
self.methods: Dict[str, List[Method]] = {}
self.operators: Dict[str, Task] = {}
def add_method(self, method: Method):
"""添加一个任务分解方法"""
if method.task not in self.methods:
self.methods[method.task] = []
self.methods[method.task].append(method)
def add_operator(self, operator: Task):
"""添加一个原始任务操作符"""
assert operator.task_type == TaskType.PRIMITIVE
self.operators[operator.name] = operator
def is_applicable(self, conditions: List[str], state: set) -> bool:
"""检查前提条件是否满足"""
return all(cond in state for cond in conditions)
def apply_effects(self, effects: List[str], state: set) -> set:
"""应用任务效果,更新状态"""
new_state = state.copy()
for effect in effects:
if effect.startswith("~"):
# 删除负效果
new_state.discard(effect[1:])
else:
# 添加正效果
new_state.add(effect)
return new_state
def decompose(self, task: Task, state: set) -> Optional[List[Task]]:
"""
递归分解任务,返回原始任务列表
"""
if task.task_type == TaskType.PRIMITIVE:
# 原始任务,检查是否可执行
if self.is_applicable(task.preconditions, state):
return [task]
else:
return None
# 复合任务,尝试应用方法进行分解
if task.name not in self.methods:
return None
for method in self.methods[task.name]:
if not self.is_applicable(method.preconditions, state):
continue
# 尝试分解该方法的所有子任务
current_state = state.copy()
plan = []
success = True
for subtask in method.subtasks:
subtask_plan = self.decompose(subtask, current_state)
if subtask_plan is None:
success = False
break
# 更新状态
for primitive_task in subtask_plan:
if primitive_task.name in self.operators:
op = self.operators[primitive_task.name]
current_state = self.apply_effects(op.effects, current_state)
plan.extend(subtask_plan)
if success:
return plan
return None
def plan(self, initial_tasks: List[Task], initial_state: set) -> Optional[List[Task]]:
"""
生成完整的计划
"""
plan = []
current_state = initial_state.copy()
for task in initial_tasks:
task_plan = self.decompose(task, current_state)
if task_plan is None:
return None
# 更新状态
for primitive_task in task_plan:
if primitive_task.name in self.operators:
op = self.operators[primitive_task.name]
current_state = self.apply_effects(op.effects, current_state)
plan.extend(task_plan)
return plan
除了 HTN 方法外,基于图论的方法也是任务分解的重要途径。这种方法将任务表示为图中的节点,任务之间的依赖关系表示为边,然后通过图算法进行分解和优化。
任务依赖图是一种有向无环图(DAG),其中:
在任务依赖图中,关键路径(Critical Path)是指从起始节点到结束节点的最长路径,它决定了整个任务的最短完成时间。识别关键路径对于任务调度和优化至关重要。
让我们用 Python 实现关键路径算法:
from typing import List, Dict, Set, Tuple
from dataclasses import dataclass, field
default_factory=list
@dataclass
class TaskNode:
id: str
duration: float # 任务持续时间
dependencies: List[str] = field(default_factory=list) # 依赖的任务ID列表
# 以下属性将在算法中计算
earliest_start: float = 0.0
earliest_finish: float = 0.0
latest_start: float = float('inf')
latest_finish: float = float('inf')
slack: float = 0.0
class CriticalPathAnalyzer:
def __init__(self, tasks: List[TaskNode]):
self.tasks: Dict[str, TaskNode] = {task.id: task for task in tasks}
# 构建反向依赖图(用于反向遍历)
self.reverse_dependencies: Dict[str, List[str]] = {task.id: [] for task in tasks}
for task in tasks:
for dep_id in task.dependencies:
self.reverse_dependencies[dep_id].append(task.id)
def topological_sort(self) -> List[str]:
"""
拓扑排序,返回按依赖关系排序的任务ID列表
"""
visited: Set[str] = set()
temp_mark: Set[str] = set()
result: List[str] = []
def visit(node_id: str):
if node_id in temp_mark:
raise ValueError("任务依赖图存在环")
if node_id not in visited:
temp_mark.add(node_id)
for dep_id in self.tasks[node_id].dependencies:
visit(dep_id)
temp_mark.remove(node_id)
visited.add(node_id)
result.insert(0, node_id)
for task_id in self.tasks:
if task_id not in visited:
visit(task_id)
return result
def calculate_earliest_times(self, topo_order: List[str]):
"""
计算最早开始时间和最早完成时间(正向遍历)
"""
for task_id in topo_order:
task = self.tasks[task_id]
if not task.dependencies:
# 没有依赖的任务,最早开始时间为0
task.earliest_start = 0.0
else:
# 最早开始时间为所有依赖任务的最早完成时间的最大值
task.earliest_start = max(
self.tasks[dep_id].earliest_finish
for dep_id in task.dependencies
)
task.earliest_finish = task.earliest_start + task.duration
def calculate_latest_times(self, topo_order: List[str]):
"""
计算最晚开始时间和最晚完成时间(反向遍历)
"""
# 找到最大的最早完成时间作为项目完成时间
project_finish_time = max(task.earliest_finish for task in self.tasks.values())
# 反向拓扑排序
reverse_topo_order = list(reversed(topo_order))
# 初始化没有后继任务的节点的最晚完成时间
for task_id in reverse_topo_order:
task = self.tasks[task_id]
if not self.reverse_dependencies[task_id]:
task.latest_finish = project_finish_time
# 反向遍历计算最晚时间
for task_id in reverse_topo_order:
task = self.tasks[task_id]
if task.latest_finish == float('inf'):
# 有后继任务的节点,最晚完成时间为所有后继任务的最晚开始时间的最小值
task.latest_finish = min(
self.tasks[next_id].latest_start
for next_id in self.reverse_dependencies[task_id]
)
task.latest_start = task.latest_finish - task.duration
task.slack = task.latest_start - task.earliest_start
def find_critical_path(self) -> Tuple[List[str], float]:
"""
找到关键路径
返回:(关键路径任务ID列表, 项目最短完成时间)
"""
topo_order = self.topological_sort()
self.calculate_earliest_times(topo_order)
self.calculate_latest_times(topo_order)
# 关键路径上的任务是松弛时间为0的任务
critical_tasks = [task_id for task_id, task in self.tasks.items() if task.slack == 0]
# 构建关键路径(需要确保顺序正确)
# 首先找到起点(没有依赖的关键任务)
start_tasks = [task_id for task_id in critical_tasks if not self.tasks[task_id].dependencies]
def build_path(current: str, path: List[str]) -> List[str]:
path.append(current)
# 找到下一个关键任务
next_tasks = [
next_id for next_id in self.reverse_dependencies[current]
if next_id in critical_tasks and next_id not in path
]
if not next_tasks:
return path
# 选择最早完成时间最大的那个(确保是关键路径)
next_task = max(next_tasks, key=lambda x: self.tasks[x].earliest_finish)
return build_path(next_task, path)
critical_path = build_path(start_tasks[0], [])
project_duration = self.tasks[critical_path[-1]].earliest_finish
return critical_path, project_duration
我们可以用数学语言来形式化描述任务分解问题。首先,定义一些基本概念:
定义 1(任务):一个任务 T 可以表示为一个五元组:
T=⟨id,D,R,P,E⟩
其中:
定义 2(任务分解):任务分解是一个函数 D,它将一个任务 T 映射到一个任务集合 {T1,T2,…,Tn},满足:
D(T)={T1,T2,…,Tn}
并且这些子任务的组合能够完成原始任务 T。
定义 3(任务依赖关系):任务之间的依赖关系可以用偏序关系 ≺ 表示,其中 Ti≺Tj 表示任务 Ti 必须在任务 Tj 开始之前完成。
在 Multi-Agent 系统中,任务分配是将分解后的子任务分配给合适的智能体的过程。我们可以将其建模为一个优化问题。
假设我们有 m 个智能体 A={a1,a2,…,am} 和 n 个任务 T={t1,t2,…,tn}。每个智能体 ai 执行任务 tj 的成本为 cij,所需时间为 τij。我们的目标是找到一个任务分配方案,使得总成本最小,同时满足时间约束。
决策变量:
xij={1如果任务 tj 分配给智能体 ai0否则
目标函数:
min∑i=1m∑j=1ncijxij
约束条件:
在不确定环境中,我们可以使用马尔可夫决策过程(MDP)来建模任务分解问题。
定义 4(任务分解 MDP):一个任务分解 MDP 是一个五元组:
M=⟨S,A,P,R,γ⟩
其中:
我们的目标是找到一个策略 π:S→A,使得期望累积奖励最大化:
maxπE[∑t=0∞γtR(st,π(st))]
否
是
否
是
是
否
开始
输入原始目标
目标分析与建模
目标是否可行?
细化/调整目标
选择分解策略
执行任务分解
生成任务依赖关系
分解方案是否一致?
调整分解方案
任务分配与调度
执行与监控
是否需要调整?
动态调整分解
任务完成
是
否
是
否
是
否
否
是
否
是
是
否
开始
初始化: 任务网络TN, 状态S
TN是否为空?
返回成功
选择非原始任务t
t是否为原始任务?
前提条件是否满足?
返回失败
应用操作符, 更新S
从TN中移除t
获取t的所有方法
是否有未尝试的方法?
选择一个方法m
m的前提条件是否满足?
用m的子任务替换t
记录当前状态
递归处理新的TN
递归是否成功?
恢复状态
在这个项目中,我们将构建一个简化但功能完整的 Multi-Agent 任务分解系统。这个系统将模拟一个智能客服中心,能够处理用户的各种请求,如查询信息、预订服务、处理投诉等。
系统的主要功能包括:
我们将使用 Python 3.8+ 进行开发,主要依赖以下库:
pydantic:用于数据验证和设置管理networkx:用于图算法和可视化matplotlib:用于绘图fastapi:用于构建 API 接口(可选)安装依赖:
pip install pydantic networkx matplotlib fastapi uvicorn
我们的系统将包含以下核心功能模块:
智能体层
智能体管理层
任务处理层
用户交互层
发送请求
转发请求
分析结果
子任务
查询可用智能体
智能体信息
分配任务
分配任务
分配任务
分配任务
状态更新
状态更新
状态更新
状态更新
任务结果
最终响应
返回结果
用户
API 网关
请求分析器
任务分解器
结果聚合器
智能体注册中心
任务分配器
执行监控器
信息查询智能体
预订服务智能体
投诉处理智能体
支付处理智能体
让我们逐步实现这个系统。首先,定义一些基础的数据结构:
from typing import List, Dict, Any, Optional, Set, Callable
from enum import Enum
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
import uuid
import time
from collections import deque
# 任务状态枚举
class TaskStatus(Enum):
PENDING = "pending"
ASSIGNED = "assigned"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
# 智能体状态枚举
class AgentStatus(Enum):
IDLE = "idle"
BUSY = "busy"
OFFLINE = "offline"
@dataclass
class Task:
"""任务数据结构"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
name: str
description: str
task_type: str
parameters: Dict[str, Any] = field(default_factory=dict)
dependencies: List[str] = field(default_factory=list) # 依赖的任务ID
status: TaskStatus = TaskStatus.PENDING
assigned_agent: Optional[str] = None # 分配的智能体ID
result: Any = None
error: Optional[str] = None
created_at: float = field(default_factory=time.time)
started_at: Optional[float] = None
completed_at: Optional[float] = None
priority: int = 0 # 优先级,数字越大优先级越高
@dataclass
class AgentCapability:
"""智能体能力描述"""
task_type: str
skill_level: float = 1.0 # 技能水平,0-1之间
max_concurrent_tasks: int = 1
average_processing_time: float = 1.0 # 平均处理时间(秒)
@dataclass
class Agent:
"""智能体数据结构"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
name: str
capabilities: List[AgentCapability]
status: AgentStatus = AgentStatus.IDLE
current_tasks: List[str] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
class BaseAgent(ABC):
"""智能体基类"""
def __init__(self, agent: Agent):
self.agent = agent
@abstractmethod
def can_handle(self, task: Task) -> bool:
"""判断是否能处理某个任务"""
pass
@abstractmethod
async def execute(self, task: Task) -> Any:
"""执行任务"""
pass
def get_capability(self, task_type: str) -> Optional[AgentCapability]:
"""获取处理特定类型任务的能力"""
for capability in self.agent.capabilities:
if capability.task_type == task_type:
return capability
return None
class TaskDecomposer(ABC):
"""任务分解器基类"""
@abstractmethod
def decompose(self, task: Task) -> List[Task]:
"""将任务分解为子任务"""
pass
class HTNDecomposer(TaskDecomposer):
"""基于HTN的任务分解器"""
def __init__(self):
self.methods: Dict[str, List[Callable[[Task], List[Task]]]] = {}
def register_method(self, task_type: str, method: Callable[[Task], List[Task]]):
"""注册分解方法"""
if task_type not in self.methods:
self.methods[task_type] = []
self.methods[task_type].append(method)
def decompose(self, task: Task) -> List[Task]:
"""分解任务"""
if task.task_type not in self.methods:
# 没有分解方法,返回原任务
return [task]
# 尝试所有方法,选择第一个成功的
for method in self.methods[task.task_type]:
try:
subtasks = method(task)
if subtasks:
return subtasks
except Exception as e:
print(f"分解方法执行失败: {e}")
continue
# 所有方法都失败,返回原任务
return [task]
class TaskAllocator(ABC):
"""任务分配器基类"""
@abstractmethod
def allocate(self, tasks: List[Task], agents: List[Agent]) -> Dict[str, str]:
"""分配任务,返回任务ID到智能体ID的映射"""
pass
class GreedyAllocator(TaskAllocator):
"""贪心任务分配器"""
def allocate(self, tasks: List[Task], agents: List[Agent]) -> Dict[str, str]:
"""
贪心分配策略:将任务分配给最有能力且空闲的智能体
"""
allocation = {}
available_agents = [agent for agent in agents if agent.status == AgentStatus.IDLE]
# 按优先级排序任务
sorted_tasks = sorted(tasks, key=lambda t: t.priority, reverse=True)
for task in sorted_tasks:
best_agent = None
best_score = -1
for agent in available_agents:
# 检查智能体是否有能力处理该任务
capability = next((c for c in agent.capabilities if c.task_type == task.task_type), None)
if capability:
# 计算评分:技能水平 * (1 - 当前任务数/最大并发任务数)
current_load = len(agent.current_tasks)
load_factor = 1 - (current_load / capability.max_concurrent_tasks)
score = capability.skill_level * load_factor
if score > best_score:
best_score = score
best_agent = agent
if best_agent:
allocation[task.id] = best_agent.id
# 更新智能体状态
best_agent.current_tasks.append(task.id)
if len(best_agent.current_tasks) >= next(
c.max_concurrent_tasks for c in best_agent.capabilities
if c.task_type == task.task_type
):
best_agent.status = AgentStatus.BUSY
return allocation
class MultiAgentSystem:
"""多智能体系统主类"""
def __init__(self):
self.agents: Dict[str, BaseAgent] = {}
self.tasks: Dict[str, Task] = {}
self.task_queue: deque = deque()
self.decomposer: TaskDecomposer = HTNDecomposer()
self.allocator: TaskAllocator = GreedyAllocator()
self.completed_tasks: List[Task] = []
def register_agent(self, agent: BaseAgent):
"""注册智能体"""
self.agents[agent.agent.id] = agent
def set_decomposer(self, decomposer: TaskDecomposer):
"""设置任务分解器"""
self.decomposer = decomposer
def set_allocator(self, allocator: TaskAllocator):
"""设置任务分配器"""
self.allocator = allocator
def submit_task(self, task: Task) -> str:
"""提交任务"""
self.tasks[task.id] = task
self.task_queue.append(task.id)
return task.id
async def process_tasks(self):
"""处理任务队列"""
while self.task_queue:
task_id = self.task_queue.popleft()
task = self.tasks.get(task_id)
if not task:
continue
# 1. 分解任务
subtasks = self.decomposer.decompose(task)
# 如果分解出多个子任务,将它们加入队列,并设置依赖关系
if len(subtasks) > 1:
# 假设是顺序依赖,简化处理
for i, subtask in enumerate(subtasks):
if i > 0:
subtask.dependencies = [subtasks[i-1].id]
self.tasks[subtask.id] = subtask
self.task_queue.appendleft(subtask.id) # 优先处理子任务
continue
# 2. 分配任务
available_agents = [agent.agent for agent in self.agents.values()]
allocation = self.allocator.allocate([task], available_agents)
if task.id in allocation:
agent_id = allocation[task.id]
agent = self.agents[agent_id]
# 更新任务状态
task.status = TaskStatus.ASSIGNED
task.assigned_agent = agent_id
task.started_at = time.time()
# 执行任务
try:
result = await agent.execute(task)
task.result = result
task.status = TaskStatus.COMPLETED
except Exception as e:
task.error = str(e)
task.status = TaskStatus.FAILED
task.completed_at = time.time()
# 更新智能体状态
agent.agent.current_tasks.remove(task.id)
if agent.agent.status == AgentStatus.BUSY:
# 检查是否还有未完成的任务
has_pending_tasks = any(
self.tasks[t].status in [TaskStatus.ASSIGNED, TaskStatus.IN_PROGRESS]
for t in agent.agent.current_tasks
)
if not has_pending_tasks:
agent.agent.status = AgentStatus.IDLE
# 处理依赖该任务的其他任务
self._handle_dependent_tasks(task)
# 保存完成的任务
self.completed_tasks.append(task)
def _handle_dependent_tasks(self, completed_task: Task):
"""处理依赖已完成任务的其他任务"""
for task in self.tasks.values():
if task.status == TaskStatus.PENDING and completed_task.id in task.dependencies:
# 检查所有依赖是否都已完成
all_dependencies_completed = all(
self.tasks[dep_id].status == TaskStatus.COMPLETED
for dep_id in task.dependencies
)
if all_dependencies_completed:
# 将任务加入队列头部
self.task_queue.appendleft(task.id)
现在,让我们创建一些具体的智能体和分解方法,来构建一个简单的旅行预订系统:
import asyncio
# 创建具体的智能体类
class InformationAgent(BaseAgent):
"""信息查询智能体"""
def can_handle(self, task: Task) -> bool:
return task.task_type == "information_query"
async def execute(self, task: Task) -> Any:
# 模拟信息查询
query = task.parameters.get("query", "")
await asyncio.sleep(1) # 模拟处理时间
return f"关于 '{query}' 的查询结果: 找到了相关信息"
class BookingAgent(BaseAgent):
"""预订服务智能体"""
def can_handle(self, task: Task) -> bool:
return task.task_type == "booking"
async def execute(self, task: Task) -> Any:
# 模拟预订过程
service_type = task.parameters.get("service_type", "")
details = task.parameters.get("details", {})
await asyncio.sleep(2) # 模拟处理时间
return f"{service_type}预订成功: {details}"
class PaymentAgent(BaseAgent):
"""支付处理智能体"""
def can_handle(self, task: Task) -> bool:
return task.task_type == "payment"
async def execute(self, task: Task) -> Any:
# 模拟支付过程
amount = task.parameters.get("amount", 0)
payment_method = task.parameters.get("payment_method", "")
await asyncio.sleep(1.5) # 模拟处理时间
return f"支付成功: {amount}元 via {payment_method}"
# 创建智能体实例
info_capability = AgentCapability(
task_type="information_query",
skill_level=0.9,
max_concurrent_tasks=3,
average_processing_time=1.0
)
info_agent_data = Agent(
name="信息查询助手",
capabilities=[info_capability]
)
info_agent = InformationAgent(info_agent_data)
booking_capability = AgentCapability(
task_type="booking",
skill_level=0.85,
max_concurrent_tasks=2,
average_processing_time=2.0
)
booking_agent_data = Agent(
name="预订服务助手",
capabilities=[booking_capability]
)
booking_agent = BookingAgent(booking_agent_data)
payment_capability = AgentCapability(
task_type="payment",
skill_level=0.95,
max_concurrent_tasks=2,
average_processing_time=1.5
)
payment_agent_data = Agent(
name="支付处理助手",
capabilities=[payment_capability]
)
payment_agent = PaymentAgent(payment_agent_data)
# 创建系统并注册智能体
system = MultiAgentSystem()
system.register_agent(info_agent)
system.register_agent(booking_agent)
system.register_agent(payment_agent)
# 创建并注册任务分解方法
def decompose_travel_booking(task: Task) -> List[Task]:
"""分解旅行预订任务"""
destination = task.parameters.get("destination", "")
date = task.parameters.get("date", "")
budget = task.parameters.get("budget", 0)
# 1. 查询目的地信息
info_task = Task(
name=f"查询{destination}信息",
description=f"查询关于{destination}的旅行信息",
task_type="information_query",
parameters={"query": f"{destination}旅行推荐"}
)
# 2. 预订机票
