9.7.3 Agent 间通信
- 理解多 Agent 通信为什么是系统成败关键
- 分清消息传递、共享状态、事件总线三类常见通信方式
- 看懂一个最小事件总线示例
- 理解同步通信和异步通信的工程差别
为什么通信会成为多 Agent 系统的核心问题?
Section titled “为什么通信会成为多 Agent 系统的核心问题?”多 Agent 最大的风险不是“不会干活”,而是“互相没对齐”
Section titled “多 Agent 最大的风险不是“不会干活”,而是“互相没对齐””即使每个 Agent 单独都很强,系统也可能因为通信设计差而出问题:
- 重复劳动
- 消息丢失
- 信息理解不一致
- 任务已经完成还在继续讨论
一个很直观的类比
Section titled “一个很直观的类比”多 Agent 很像一个小团队协作:
- 分工只是第一步
- 真正决定效率的,往往是开会、交接、同步、回传这些沟通机制
这就是为什么通信不是“附属模块”,而是核心结构。
最常见的三种通信方式
Section titled “最常见的三种通信方式”直接消息传递(message passing)
Section titled “直接消息传递(message passing)”一个 Agent 明确给另一个 Agent 发消息。
优点:
- 简单
- 清晰
- 好追踪
缺点:
- Agent 之间耦合比较强
共享状态(shared state / blackboard)
Section titled “共享状态(shared state / blackboard)”所有 Agent 都往一个共享工作区写入和读取信息。
优点:
- 不需要每次显式点对点发送
- 很适合多方协同观察同一个任务状态
缺点:
- 更容易写乱
- 权限和冲突更难控
事件总线(event bus)
Section titled “事件总线(event bus)”Agent 不一定直接知道彼此,而是把消息发到总线,由订阅者接收。
优点:
- 更解耦
- 更适合复杂系统
缺点:
- 调试更复杂
先看最简单的点对点消息传递
Section titled “先看最简单的点对点消息传递”一个最小示例
Section titled “一个最小示例”message = { "from": "planner", "to": "worker", "type": "task_assignment", "content": "请整理退款政策的关键条件"}
print(message)预期输出:
{'from': 'planner', 'to': 'worker', 'type': 'task_assignment', 'content': '请整理退款政策的关键条件'}为什么这已经很重要?
Section titled “为什么这已经很重要?”因为它把通信的几个关键元素都显式化了:
- 谁发的
- 发给谁
- 消息类型
- 消息内容
这比“随便传一段自然语言”稳很多。
消息格式为什么要标准化?
Section titled “消息格式为什么要标准化?”一个坏消息格式
Section titled “一个坏消息格式”bad_message = "帮我做这个任务"print(bad_message)预期输出:
帮我做这个任务问题在于:
- 不知道谁发的
- 不知道任务类型
- 不知道上下文
- 不知道下一步怎么处理
一个更稳的消息结构
Section titled “一个更稳的消息结构”good_message = { "from": "planner", "to": "researcher", "type": "search_request", "task_id": "task_001", "payload": { "query": "退款政策" }}
print(good_message)预期输出:
{'from': 'planner', 'to': 'researcher', 'type': 'search_request', 'task_id': 'task_001', 'payload': {'query': '退款政策'}}这才更像一个能进入系统流水线的消息。

一个最小事件总线示例
Section titled “一个最小事件总线示例”from collections import defaultdict
class EventBus: def __init__(self): self.handlers = defaultdict(list)
def subscribe(self, event_type, handler): self.handlers[event_type].append(handler)
def publish(self, event_type, payload): for handler in self.handlers[event_type]: handler(payload)
def planner_handler(payload): print("[planner] 收到结果:", payload)
def worker_handler(payload): print("[worker] 收到任务:", payload) result = { "task_id": payload["task_id"], "summary": f"已完成对 {payload['query']} 的检索" } bus.publish("task_done", result)
bus = EventBus()bus.subscribe("task_assignment", worker_handler)bus.subscribe("task_done", planner_handler)
bus.publish("task_assignment", { "task_id": "task_001", "query": "退款政策"})预期输出:
[worker] 收到任务: {'task_id': 'task_001', 'query': '退款政策'}[planner] 收到结果: {'task_id': 'task_001', 'summary': '已完成对 退款政策 的检索'}这段代码真正教了什么?
Section titled “这段代码真正教了什么?”它教你:
- 通信不一定非要点对点耦合
- 可以通过事件类型来解耦
- 完成消息和结果消息可以走同一套基础设施
这已经非常接近真实系统的通信主线了。
共享状态:什么时候更适合?
Section titled “共享状态:什么时候更适合?”一个很典型的场景
Section titled “一个很典型的场景”如果多个 Agent 都围绕同一个任务工作,比如:
- 规划者写计划
- 检索者写资料
- 撰写者生成草稿
- 审核者写评审意见
这时很多信息都可以放在共享工作区里。
一个最小示例
Section titled “一个最小示例”shared_state = { "goal": "完成退款政策总结", "plan": [], "evidence": [], "draft": None, "review": None}
# plannershared_state["plan"] = ["查政策", "整理要点", "输出总结"]
# retrievershared_state["evidence"].append("购买后 7 天内且学习进度低于 20% 可退款")
# writershared_state["draft"] = "退款条件包括时间限制和学习进度限制。"
print(shared_state)预期输出:
{'goal': '完成退款政策总结', 'plan': ['查政策', '整理要点', '输出总结'], 'evidence': ['购买后 7 天内且学习进度低于 20% 可退款'], 'draft': '退款条件包括时间限制和学习进度限制。', 'review': None}这种方式的优缺点
Section titled “这种方式的优缺点”优点:
- 大家都能看同一块黑板
- 状态更集中
缺点:
- 谁能写什么要控制
- 很容易出现覆盖冲突
同步通信和异步通信怎么理解?
Section titled “同步通信和异步通信怎么理解?”一个 Agent 发出请求后,要等对方回复,自己才能继续。
优点:
- 简单
- 容易理解
缺点:
- 容易堵塞
发出消息后先继续做别的事,等对方完成后再回来处理结果。
优点:
- 更灵活
- 更适合复杂系统和高并发
缺点:
- 状态管理更复杂
一个很实用的工程直觉
Section titled “一个很实用的工程直觉”如果你的任务链很短、流程很清楚,先同步。 如果任务很长、等待时间不稳定,再考虑异步。
Agent 间通信最常见的失败点
Section titled “Agent 间通信最常见的失败点”消息格式不统一
Section titled “消息格式不统一”今天叫 task_id,明天叫 id,后天叫 job_id,系统会越来越乱。
消息发出去了,但没人处理
Section titled “消息发出去了,但没人处理”这是事件系统里很常见的问题:
- 发布了
- 但没有订阅者
多个 Agent 理解同一条消息的方式不同
Section titled “多个 Agent 理解同一条消息的方式不同”例如:
- 一个 Agent 觉得这是“检索请求”
- 另一个 Agent 觉得这是“总结请求”
这就会导致系统跑偏。
没有超时和重试
Section titled “没有超时和重试”如果一个 Agent 卡住,整个系统可能就一直等下去。
真实系统里怎样让通信更稳?
Section titled “真实系统里怎样让通信更稳?”统一消息协议
Section titled “统一消息协议”至少统一:
fromtotypetask_idpayload
统一状态追踪
Section titled “统一状态追踪”每条任务最好都有唯一 ID,便于:
- 追踪完整链路
- 回放
- 排错
统一超时和失败策略
Section titled “统一超时和失败策略”例如:
- 超时自动回退
- 失败转人工
- 多次重试后终止
学完这一页,至少保留这张证据卡:
- 角色
- 负责人、执行者、评审者,或专家职责
- 消息契约
- artifact、request、response 和交接状态
- 协同
- 路由、任务拆分、冲突解决和最终负责人
- 失败检查
- 重复工作、上下文丢失、没有明确负责人或消息循环
- 评估动作
- 将多 Agent 结果与单 Agent 基线对比
这一节最重要的不是记住“消息传递、事件总线、共享状态”这些词,而是理解:
多 Agent 通信的关键,不只是把消息发出去,而是让消息结构稳定、责任清晰、失败可控。
只有通信层做稳了,多 Agent 系统才不会因为“组织混乱”而把模型能力浪费掉。
- 给事件总线示例再加一个
reviewer_handler,让它订阅task_done。 - 设计一份你自己的统一消息协议,至少包含
type、task_id和payload。 - 想一想:什么时候你会更倾向于共享状态,而不是点对点消息?
- 用自己的话解释:为什么多 Agent 系统里,通信设计往往和任务分工同样重要?
参考实现与讲解
reviewer_handler应订阅task_done,读取 payload,检查结果是否满足标准,然后发布 review 事件,或把 review 状态写回 shared state。- 一个有用的协议可以包含
type、task_id、from、to、payload、evidence、status、timestamp。字段可以调整,但消息语义要稳定。 - 当多个 Agent 需要同一份不断变化的 artifact,或点对点消息会重复大量上下文时,shared state 更合适;简单交接和窄请求更适合直接消息。
- 通信设计重要,是因为即使角色本身不错,如果输入含糊、证据丢失、重复劳动、无法判断任务是否完成,系统仍然会失败。