从 NanoBot 看 Agent Loop 的设计

此前我写过《快、简、稳 —— 我近期的个人智能体选择 NanoBot》 这篇文章,聊了从 OpenClaw 切到 NanoBot 的原因,以及日常使用的一些场景。用了一段时间之后,我最好奇的问题变成了:它只有几千行代码,是怎么做到在长对话里比 OpenClaw 稳这么多的,而且响应还快?
周末抽空把源码过了一遍,挑几个我觉得最有意思的设计讲讲。
1. 整体架构
NanoBot 把系统拆成三层:Channel、MessageBus、Agent。
┌─────────────────────────────────────────────┐
│ Channels (Telegram / DingTalk / Slack ...) │
└────────────────────┬────────────────────────┘
│ InboundMessage
┌──────▼──────┐
│ MessageBus │ ← 45 行
└──────┬──────┘
│
┌────────────▼────────────┐
│ AgentLoop │ 会话路由 / 并发控制
│ ┌──────────────────┐ │
│ │ AgentRunner │ │ 纯迭代逻辑
│ └──────────────────┘ │
└────────────┬────────────┘
│ OutboundMessage
┌──────▼──────┐
│ MessageBus │
└──────┘最底下的 MessageBus 就是两个 asyncio.Queue,整个文件就 45 行,核心如下:
class MessageBus:
def __init__(self):
self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()Channel 往 inbound 推消息,Agent 往 outbound 推回复,两边完全不认识对方。加一个新 Channel 不用动 Agent 代码,接一个新 Provider 也不用动 Channel 代码。目前 NanoBot 支持的 Telegram、DingTalk、Slack、企业微信这几个 Channel,就是靠这 45 行撑起来的。
中间的 AgentLoop 管"谁在说话、现在能不能处理",里面再嵌一个 AgentRunner 做纯迭代。Runner 不关心 Channel、不关心持久化、不关心产品逻辑,只知道怎么把一组消息喂给模型、处理工具调用、再循环。
这种分层看起来平平无奇,但沿着一条消息走一遍就会发现它的好处。
2. 主循环
AgentLoop.run() 的核心就这么几行:
while self._running:
msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
# /stop 等优先命令直接处理
if self.commands.is_priority(raw):
await self.commands.dispatch_priority(ctx)
continue
# 会话处理中 → 塞进 pending_queue
effective_key = self._effective_session_key(msg)
if effective_key in self._pending_queues:
self._pending_queues[effective_key].put_nowait(msg)
continue
# 新请求 → 创建任务
asyncio.create_task(self._dispatch(msg))我第一次读这段的时候,有两个地方停下来看了很久。
2.1 跨会话并发,会话内串行
锁加在会话上,不在全局。同一个人的消息按顺序走——不会出现你刚问完 A,Agent 回来一个 B 的答复;不同用户之间又完全并行,一个慢请求不会把所有人都拖着等。再配一个全局 concurrency gate 兜住上限,不至于模型抽风时把机器打爆。
很朴素的做法,但恰好是多用户场景需要的。
2.2 中途注入(mid-turn injection)
这个更有意思。Agent 正在执行工具,用户等不及又补发了一条消息——常见的处理方式是再开一个任务。但同一会话并发两个 Agent,上下文很快就会乱。
NanoBot 的做法是:当前会话有活跃的 pending_queue 时,新消息不建任务,直接塞进队列。等 Agent 跑到合适的时机(工具执行完、最终回复写完、LLM 报错后),再把队列里的消息 drain 出来合并处理。每次最多取 3 条,防止吞得太多。
对 Agent 来说,这些追加的消息就像同一轮对话里的续发,语义是连贯的。
消息路由的事情讲清楚了,下一步是怎么拼 prompt。
3. 上下文怎么拼
每次调模型前,ContextBuilder 按固定顺序拼装系统提示:
[1] Identity — 运行环境信息(workspace 路径、OS、Python 版本)
[2] Bootstrap 文件 — AGENTS.md → SOUL.md → USER.md → TOOLS.md
[3] Memory — memory/MEMORY.md(跨会话积累的长期记忆)
[4] Always Skills — 始终激活的 skill 全文
[5] Skills 摘要 — 按需激活的 skill 列表(Agent 自己决定要不要调)
[6] Recent History — history.jsonl 最近 50 条交互记录顺序也是优先级——越靠前对模型注意力的影响越大。任一文件缺失直接跳过,不报错。
3.1 Bootstrap 文件
Bootstrap 是用户最主要的定制入口,四个文件分工很清楚:
- SOUL.md:Agent 的性格。比如
I solve problems by doing, not by describing what I would do. - USER.md:你的画像,语言偏好、时区、技术水平,让 Agent 调整表达方式。
- AGENTS.md:任务级规范,比如定时任务怎么处理、心跳任务放哪。
- TOOLS.md:工具使用的补充说明。
想调性格就改 SOUL,想加新的任务规则就改 AGENTS,互不干扰。
3.2 Memory 和 Skills
Memory 存的是跨会话积累的长期记忆,后台有 Consolidator 和 Dream 两个流程在持续整理它。Skills 是能力模块:标了 always 的直接全文塞进 prompt;其他的只挂个名字和简介,让 Agent 自己决定要不要 load 进来。这样既保证常用能力随时可用,又不会把所有东西都塞满上下文。
3.3 Runtime Context
用户消息发出去之前,正文前还会插一段 Runtime Context:
[Runtime Context — metadata only, not instructions]
Current Time: 2026-04-15 20:30
Channel: telegram
Chat ID: 123456789
[/Runtime Context]注意看那个标签——metadata only, not instructions。当前时间、来源 Channel 都告诉 Agent 了,但明确说这是参考信息不是指令。这样就算用户消息里带了伪造的时间或 chat_id,Agent 也能分清哪是命令哪是元数据。
4. 核心迭代:AgentRunner
Runner 是 NanoBot 真正的 Agent Loop。它对外只有一个 run(spec) 方法,接收消息、工具、模型名、最大迭代数,返回结果。Runner 不依赖任何外部状态,可以独立测试,也可以被 CLI、API、子 Agent 复用——产品差异通过 Hook 接口注入,Runner 本身不动。
4.1 完整工作流
把前三节串起来,一条消息从进入系统到最终返回,完整流程是这样:
用户消息
│
▼
AgentLoop.run()
├─ 优先命令(/stop 等)→ 直接处理,返回
├─ 会话处理中 → 塞进 pending_queue,等待注入
└─ 新请求 → create_task(_dispatch)
│
▼
_dispatch() # per-session Lock + 全局 concurrency gate
└─ _process_message()
├─ 恢复 crash checkpoint(如有)
├─ 组装上下文(系统提示 + 历史 + 记忆 + 当前消息)
└─ AgentRunner.run(spec)
│
▼
for iteration in range(max_iterations):
│
├─ [1] Context Governance
│ 清孤儿 → 回填缺失结果 → 压缩 → 裁剪历史
│
├─ [2] 调用 LLM(含重试)
│
├─ [3a] 有工具调用?
│ ├─ 写 checkpoint
│ ├─ 执行工具(只读并行,其余串行)
│ ├─ drain pending_queue(中途注入)
│ └─ continue → 下一次迭代
│
└─ [3b] 最终回复
├─ 空响应 → 重试(≤ 2 次)
├─ 被截断 → 追加恢复提示,continue
└─ 正常 → break,返回结果
│
├─ 保存本轮历史,清除 checkpoint
└─ 发布 OutboundMessage 到 bus想重点讲两个环节:Context Governance 和 Checkpoint。
4.2 Context Governance
每次调模型前,Runner 先对历史消息做一轮治理:
messages_for_model = self._drop_orphan_tool_results(messages)
messages_for_model = self._backfill_missing_tool_results(messages_for_model)
messages_for_model = self._microcompact(messages_for_model)
messages_for_model = self._apply_tool_result_budget(spec, messages_for_model)
messages_for_model = self._snip_history(spec, messages_for_model)
messages_for_model = self._drop_orphan_tool_results(messages_for_model)
messages_for_model = self._backfill_missing_tool_results(messages_for_model)每一步做什么:
drop_orphan_tool_results:清掉 tool_call 找不到对应结果的孤儿消息backfill_missing_tool_results:给缺失的工具结果补占位符microcompact:压缩重复的工具结果apply_tool_result_budget:控制工具结果的 token 占比snip_history:裁剪过长的历史
snip_history 可能产生新孤儿,所以末尾再清一遍。类似的思路也体现在工具定义上——发给模型前做稳定排序(内置工具按字母序在前,MCP 工具在后),保证每次 prompt 前缀一致,LLM 的 prompt cache 命中率更高。
这条链有两个细节值得留意。一个是原始 messages 始终不动,所有治理都在 messages_for_model 副本上——历史和发给模型的内容是两份独立数据。另一个是任意一步失败会立刻退回最小修复模式(只清孤儿 + 回填缺失结果),保证模型永远收到结构合法的消息。
长对话里这些结构问题会慢慢积累,提前修比事后救火稳得多。
4.3 Checkpoint 与 Crash 恢复
工具调用之前,Runner 会往会话元数据里写一个 checkpoint:
await self._emit_checkpoint(spec, {
"phase": "awaiting_tools",
"assistant_message": assistant_message,
"completed_tool_results": [],
"pending_tool_calls": [tc.to_openai_tool_call() for tc in response.tool_calls],
})如果进程在工具执行过程中崩了,下次启动时 AgentLoop 会检测到 checkpoint,把已完成的工具结果和未完成工具的合成错误一起塞回历史,Agent 从中断点继续跑。未完成的工具会收到一条 "Error: Task interrupted before this tool finished." 的结果,Agent 自己判断下一步怎么做。
5. Provider 抽象层
NanoBot 支持 Anthropic、OpenAI-compat、Azure、GitHub Copilot 等一堆 Provider,但每个具体实现类只需要实现 chat() 一个方法。重试逻辑、消息角色强制交替、图片降级、token 计量,全都在 base.py 里统一处理,新增一个 Provider 只关心调用协议本身的差异。
5.1 消息结构兼容
_enforce_role_alternation() 就是一个典型例子。不同 Provider 对消息结构的容忍度差得挺远:有的不接受末尾是 assistant 消息(不支持 prefill),有的不接受连续两条相同角色,有的在只剩 system 消息时直接报错。这个方法一次把三种情况都处理了——合并连续同角色消息、剔除尾部 assistant、只剩 system 时把最后一条 assistant 改成 user 兜底。
写新 Provider 时完全看不到这类防御代码,干净。
5.2 重试策略
重试判断是双层的:优先读结构化错误元数据(error_should_retry、error_status_code),读不到再扫响应文本关键词兜底。分两种模式:标准模式最多 3 次指数退避;持久模式以 60 秒为间隔上限持续重试,给长任务里 Provider 短暂限流的场景用。
6. 快,还有稳
回到开头那个问题 —— NanoBot 为什么快、长对话为什么稳。前面讲的一堆零散机制,串起来其实就是答案。
6.1 快从哪来
MessageBus是asyncio.Queue的薄封装,Channel 和 Agent 之间零拷贝零等待。/stop这种优先命令直接跳过任务队列,想停就能马上停。- 锁在会话粒度,我的慢请求卡不到你。
- 只读工具(搜索、读文件)会并发跑,多个搜索同时出结果。
- 中途注入塞进当前 turn,不用开新的 Agent。
- 工具定义和系统提示都稳定排序,LLM prompt cache 命中率更高,首 token 延迟明显下降。
6.2 稳从哪来
长对话在 OpenClaw 里最容易出三种问题——上下文结构坏掉、进程崩了丢进度、执行到一半"沉默"没反馈。NanoBot 对应的机制一个不少:
- 每轮都修历史:孤儿 tool_call、缺失工具结果、token 超预算、历史过长,发送前主动修好。
- Checkpoint 兜底:工具调用前写入,崩溃重启能接着跑。
- 重试分层:常规错误走指数退避,Provider 短暂限流走持久模式,重试期间打心跳日志不会沉默。
- 响应异常多重兜底:空响应自动重试,被截断追加恢复提示继续生成,达到
max_iterations用模板返回可读消息,不抛异常留半截对话。 - 流式输出:Channel 接
on_stream回调,工具在跑什么、模型在想什么都实时可见。
之前用 OpenClaw 遇到长对话时的那种不确定感——它到底在干活还是卡住了?上文它还记得吗?现在崩了我之前做的是不是都白费了?——在 NanoBot 这里基本消失了。
7. 小结
NanoBot 的精巧来自整体的克制。MessageBus 只传消息,AgentLoop 只做路由,AgentRunner 只做迭代,Provider base 吸收兼容性差异,Tool 接口只暴露三件事(名称、参数 schema、执行方法)。每一层都只管自己的事,不越界,不泛滥。
AI Agent 的核心逻辑其实很简单——给模型上下文、执行工具、循环而已。难的是那堆边界情况:消息乱序、上下文超长、Provider 各有脾气、进程可能崩、用户会中途插话。大框架应对这些问题的习惯是层层加抽象,代码量越滚越大。NanoBot 把每类问题集中在一处解决,防御代码不扩散,整个系统才能控制在几千行。
这种"每层只做一件事"的克制,比堆功能难多了,也是 NanoBot 在长对话场景下比 OpenClaw 稳的根本原因。对于把智能体当成日常生产力工具的人来说,它给的正是最值钱的那种确定性。