上周折腾 OpenClaw 折腾到第三次配置不兼容的时候,我就在想,这玩意儿为什么不能自己写一个。
我要的其实很简单:一个长期在线的 bot,能聊天、能记事、能跑代码、能设定时任务、能从 Telegram 接消息也能从终端接消息。但凡用过现成方案就知道,所有这类平台都有一个共同问题——抽象太厚。你想改一个行为得翻三层配置,想加一个工具得学一套它们自己造的 DSL,出了 bug 栈回溯能追到火星上去。
所以这次我决定从头写。项目叫 Memoo,现在已经开源了:
这篇文章不是教程,是一个 architecture walkthrough——我想记录下在”从零写一个 AI agent”这个过程中,哪些决策是踩坑踩出来的,哪些是从一开始就想清楚的。下次想改造的时候,读这一篇就够了。
第一性原理:tool_use 就是骨架
先问一个最基本的问题:一个 AI agent 需要什么?
拆到最底层,只有三样东西:
- 一个能调用工具的 LLM
- 一个能执行工具的 runtime
- 一个能记住对话的存储
MCP、LangChain、各种 agent framework 做的事情,本质上都是在这三样东西外面套壳。但 Anthropic 的 Claude API 本身就自带 tool_use——模型会返回结构化的 tool_calls,你的代码执行完再把结果塞回去。这已经是一个完整的 agentic loop 了,我为什么还要再套一层?
所以 Memoo 的核心 loop 就非常直白。core/agent.py 里的 run() 方法基本就是一个 while True:
while True: response = await self._chat_with_fallback(messages, tools=tool_schemas, ...)
if not response.has_tool_calls: # 模型说完了,解析结构化输出 return self._parse_final_response(response)
# 有工具调用就执行 if len(response.tool_calls) > 1: results = await asyncio.gather(*[ self._execute_one_tool(tc, ctx) for tc in response.tool_calls ]) else: results = [await self._execute_one_tool(response.tool_calls[0], ctx)]
for tc, result in zip(response.tool_calls, results): messages.append(Message(role="tool_result", content=result, tool_call_id=tc.id))就这么几行,感知(LLM 读历史)、决策(模型选择 tool)、行动(执行 tool)、反思(结果进 history 下一轮再看)四个阶段全在里面了。没有 ReAct prompt、没有 chain-of-thought 硬编码、没有 graph 抽象。
单 tool call 绕过 asyncio.gather 是个刻意的微优化——gather 单个 coroutine 有 overhead,而单 call 是最常见的热路径。
结构化输出:让 API 帮你做语法约束
传统 agent 有一个永恒问题:模型最后”说完了”之后,你怎么知道它想说什么?如果你想从它的回复里提取多个字段(比如”reply”、“要不要压缩 memory”、“当前话题是什么”),一般做法是再加一次 LLM 调用做 structured extraction。贵、慢、脆弱。
Claude API 现在支持 output_schema 参数,可以直接用 JSON Schema 约束最终输出 token-by-token。所以我给 Memoo 定义了这么一个 schema:
RESPONSE_SCHEMA = { "type": "object", "properties": { "reply": {"type": "string", "description": "Reply to the user. Empty = NO_OP."}, "memory_notes": {"type": "array", "items": {"type": "string"}}, "current_topic": {"type": "string"}, "should_compress": {"type": "boolean"}, "did_success": {"type": "boolean"}, }, "required": ["reply", "memory_notes", "current_topic", "should_compress", "did_success"],}五个字段一次拿到。did_success 尤其妙——让模型自己报告成功/失败,orchestrator 不用再解析自由文本来判断任务状态。should_compress 也是由模型决定的,模型认为历史不再相关时会自己举手说”可以压缩了”,比设固定阈值靠谱得多。
NOTE结构化输出不是 prompt 层面的”请输出 JSON 格式”,是 API 层面的 grammar constraint,模型生成时被硬约束到 schema 允许的 token 集。不会出现 hallucinate 格式的情况。
Tool 注册:从 docstring 自动生成 schema
写 agent 的人都知道,给每个 tool 手动维护一份 JSON Schema 是噩梦。参数改了一个,schema 忘了同步,模型就开始瞎调。
Memoo 的 ToolRegistry 直接从 Python 的 type hints 和 Google 风格 docstring 反推 schema:
@registry.toolasync def web_search(query: str, max_results: int = 5) -> str: """Search the web for information.
Args: query: Search query string max_results: Maximum number of results to return """ ...decorator 内部做的事情:get_type_hints() 拿到参数类型 → _TYPE_MAP 查表转成 JSON Schema 类型 → 用 inspect.signature 找出哪些参数有默认值(有默认值的不进 required)→ 从 docstring 的 Args: 段落提取描述。
支持 list[str] 这种 generic,也支持 str | None 这种 Python 3.10+ 的 union 语法。关键代码就几十行:
def _python_type_to_json_schema(py_type: type) -> dict[str, Any]: origin = getattr(py_type, "__origin__", None) if origin is list: inner = py_type.__args__[0] return {"type": "array", "items": _python_type_to_json_schema(inner)} if isinstance(py_type, types.UnionType): # str | None non_none = [t for t in py_type.__args__ if t is not type(None)] return _python_type_to_json_schema(non_none[0]) return {"type": _TYPE_MAP.get(py_type, "string")}更妙的是 @registry.tool decorator 返回的是原函数本身,不做 wrapping——注册完 schema 之后,tool 依然是一个普通的 Python async function,单元测试里可以直接调。
Tool 本身也不用全局 import,tools/ 目录下任何带 register(registry, **deps) 的模块都会被 auto_discover_tools() 自动加载。依赖(memory、scheduler、config、app 本身)通过 deps dict 注入。写一个新 tool 只需要新建一个文件,不用改任何已有代码。
ContextVar 传递工具上下文
这是个小细节,但值得单独说。
Agent 执行 tool 的时候,tool 通常需要知道一些 session 信息:当前 chat_id、这个用户的 sandbox_dir、sub-agent 的递归深度等等。最粗暴的做法是把这些作为参数全部塞进 tool 签名,但那样每个 tool 都得写一堆用不上的参数,schema 也会污染到模型视野里。
Memoo 用 ContextVar:
_tool_context: ContextVar[dict[str, Any]] = ContextVar("tool_context", default={})
def set_context(ctx): _tool_context.set(ctx)def get_context(): return _tool_context.get()Agent 在 tool 执行前 set_context(ctx),tool 在内部 get_context() 拿数据。关键点在于:asyncio.create_task() 默认会复制当前的 ContextVar 状态到新 task。这意味着 sub-agent 用 create_task spawn 出去之后,它会有一份自己的 context 副本,修改 _agent_depth 不会污染父 context。多个并发 chat_id 之间也不会串。
这是个 Python 原生机制,比线程 thread-local 干净得多,我觉得很少有人用对。
两层记忆 + 混合 RAG
聊天 bot 的记忆永远是痛点。全存?context 窗口爆炸。滚动窗口?过一会儿就失忆了。
Memoo 用两层:
Tier 1 — messages 表:活跃消息,最多 200 条,就是 LLM 每轮看到的工作上下文。按时间排序。
Tier 2 — archive 表:被压缩的历史对话。每条 archive 包含 topic、summary、full_messages(原文备份)、token_count、importance(0-1 分)、embedding(JSON 浮点向量)。
检索的时候用混合 ranking,三个信号加权:
semantic_score = max(0.0, cosine_similarity(query_vec, entry_vec))keyword_score = 1.0 if query_lower in text else partial_match_ratiofinal = 0.5 * semantic_score + 0.3 * keyword_score + 0.2 * importance权重的设计理由:embedding 相似度是语义层面最强的信号(0.5),keyword 匹配处理 embedding fallback 或低质量情况(0.3),importance 给”发生过重要事情的对话”加权(0.2)。
importance 打分本身也是个很简单但有效的启发式:
score = 0.5 # baselinescore += min(0.15, len(messages) * 0.01) # 消息数score += min(0.15, tool_msgs * 0.05) # 用了 tool 的对话更重要score += min(0.10, len(total_content) / 50000) # 长度score += min(0.10, hits * 0.03) # 动作关键词命中动作关键词是手工维护的一个小集合:decided, created, fixed, installed, configured, remember, important。“做了事”的对话得分更高,后面检索时会被优先返回。
TIPFTS5 用的是 SQLite 官方推荐的 “external content table” 模式:
archive_fts虚拟表通过content='archive'只引用不复制数据,trigger 自动同步。省一半存储空间,索引还能用。
Dream Cycle:把记忆固化成事实
这个是我个人最喜欢的设计。
人类睡觉的时候会做记忆巩固——大脑回放白天的经历,把零散事件提炼成长期记忆。Memoo 有一个类似的 “dream” 进程(core/dream.py),定期跑:读取自上次 dream 以来的 archive 条目,让 LLM 分两步把里面的事实固化到 memory/MEMORY.md 和 memory/USER.md。
两步分开,是刻意的角色分离:
- Phase 1 — Analyst:一个 system prompt 专门做分析。输入是 archive 内容,输出是自由文本:“发现了什么新事实、哪些旧条目需要更新、有什么值得注意的模式”。
- Phase 2 — Editor:另一个 system prompt 专门做编辑。输入是 Phase 1 的分析 + 当前的 MEMORY.md/USER.md,输出是严格的 JSON
{"memory": "...", "user": "..."},直接覆写文件。
为什么分两步?因为”观察到什么”和”应该写什么”是两种不同的认知任务。合在一起模型会把二者混淆——要么分析不够深就急着改文件,要么改文件的时候把分析内容当成事实写进去。分开之后,Phase 1 可以放开胆子随便分析,Phase 2 只管执行编辑动作。
然后是 cost 优化。Anthropic 的 Batch API 提供整整 50% 的折扣(这个我 fact-check 过了,不是销售话术),代价是异步提交、24 小时内返回。dream 本来就是异步跑的,延迟无所谓,所以完全可以走 batch:
def _build_batch_request(custom_id, model, system, context_block, user_content, max_tokens): return { "custom_id": custom_id, "params": { "model": model, "system": [ {"type": "text", "text": system, "cache_control": {"type": "ephemeral"}}, ], "messages": [{ "role": "user", "content": [ {"type": "text", "text": context_block, "cache_control": {"type": "ephemeral"}}, {"type": "text", "text": user_content}, ], }], }, }注意 cache_control: ephemeral。Phase 1 和 Phase 2 共享同一个 context_block(当前的 MEMORY.md + USER.md),Phase 2 能命中 Phase 1 写的 cache,cached token 按正常 input 价格的 10% 收费。Batch 的 50% 折扣和 prompt cache 的 90% 折扣可以叠加——实际成本能压到原价的个位数百分比。
Cursor 用的是一个纯文本的 .dream_cursor 文件,存一个 int(上次处理到的 archive.id)。每次 dream 跑 WHERE id > cursor,处理完更新 cursor。幂等,简单,不会重复处理。
Sub-agent 与 context 传递
一个 agent 够用的时候就用一个,不够用就 spawn 一个 sub-agent 去干脏活。tools/subagent.py 提供的 spawn_agent 工具长这样:
async def spawn_agent( prompt: str, model: str = "", context_mode: str = "none", # full | summary | none readonly: bool = False, network_access: bool = True, background: str = "block", # block | bg timeout: int = 0, timeout_action: str = "background",) -> strcontext_mode 三挡:
full:把父对话完整传进去。贵但最完整。summary:用 compressor LLM(默认 Haiku,便宜)把父对话压缩成一段[Parent context]。none:白板,sub-agent 只看 prompt。
深度限制是硬的:默认最多 3 层,通过 _agent_depth context var 传递。sub-agent 要继续 spawn 的时候会被同一个 hook 拦住。
取消的传播是我折腾了一会儿才写对的。parent 被取消的时候,怎么通知所有活着的 sub-agent?用一个专门的 watcher task:
parent_cancel = ctx.get("_cancel_event")if parent_cancel: async def _propagate_cancel() -> None: await parent_cancel.wait() sub_agent.cancel() run._cancel_watcher = asyncio.create_task(_propagate_cancel())watcher 就一个 await,父 event 被 set 就立即给 sub_agent 发 cancel。sub_agent 自己的 run loop 每轮都会检查 cancel event,干净地退出。整个 cancel 树像信号链一样传下去。
还有一个我特别喜欢的细节——elastic timeout。timeout > 0 的时候用 asyncio.wait(..., timeout=...) 而不是 asyncio.wait_for:
done, pending = await asyncio.wait({task}, timeout=timeout)if not done and timeout_action == "background": return json.dumps({"run_id": run_id, "status": "moved_to_background"})wait_for 会在超时的时候 cancel task,wait 只是”停止等待”。如果 sub-agent 没跑完,它就从前台被踢到后台继续跑,主 agent 拿到 run_id 可以稍后用 read_agent_output(run_id) 查结果。不会白白杀掉一个跑了一半的任务。
Sandbox:真 OS 级隔离
让 agent 跑任意代码这件事,没人心里不打鼓。Python 层的 sandbox 全是假的——import os 两行绕过。
Memoo 的 core/sandbox.py 把 macOS 的 sandbox-exec 和 Linux 的 bubblewrap 藏在同一套 API 后面,运行时 platform.system() 自动选 backend。不支持的只有 Windows——没有等价的原生工具,而且我也不用 Windows 开发。
macOS 这边用的是 sandbox-exec(以前叫 Seatbelt),能用 SBPL(Sandbox Profile Language,其实是一种 Scheme 方言)定义极细粒度的 OS 级权限。动态生成的 profile 起手式是 (deny default)——先拒绝一切——然后只 allow 必要的文件访问和系统调用:
(version 1)(deny default)(allow process-fork)(allow process-exec)(allow file-read* (subpath "/usr/lib") (subpath "/System"))(allow file-read* file-write* (subpath "/path/to/sandbox/CHAT_ID"))(allow network* (remote ip)) ;; 可通过 network_access=false 关掉每个 chat_id 有独立的 sandbox 目录 sandbox/{chat_id}/,path hook 用 os.path.realpath 解析符号链接防止 escape:
abs_sandbox = os.path.realpath(session_dir)abs_path = os.path.realpath(os.path.join(abs_sandbox, path))if not abs_path.startswith(abs_sandbox + os.sep) and abs_path != abs_sandbox: return False, f"Path escapes sandbox: {path}"用 realpath 而不是简单 startswith 这一点很关键——否则一个指向 /etc 的 symlink 就能把你的 sandbox 穿个洞。
Linux backend 用 bubblewrap(bwrap)—— Chromium 和 Flatpak 底下都用的那个——通过 --unshare-* 系列 flag 做 namespace 隔离,效果上等价于 sandbox-exec 的 deny-default 语义。启动前会跑一次 smoke test(bwrap --ro-bind / / echo ok)确认二进制能用:
apt install bubblewrapdnf install bubblewrapWARNINGWindows 没有对应的 backend。原生缺少类似 sandbox-exec / bwrap 的 deny-default 进程级隔离机制,我暂时没打算折腾 WSL 的中间层方案。想在 Windows 上跑的话可以考虑 WSL2 + bubblewrap。
Mid-turn Injection:对话里插话
这个功能没人会主动想要,但用过一次就回不去了。
普通 bot 的交互是回合制的:你发消息 → bot 思考 → bot 回复 → 你发下一条。但 agent 的一个”回合”可能包含 10 几次 tool call,跑好几分钟。中间你想补充一句”对了顺便也做一下 X”,只能等它先结束。
Memoo 有一个 inject() 方法:
async def inject(self, run_id: str, text: str) -> bool: inbox = self._inboxes.get(run_id) if inbox is None: return False await inbox.put(text) return True每个 run 有一个 asyncio.Queue(inbox)。用户在 turn 中途发新消息时,handle_message 会先检查有没有 active task,如果有就直接 inject 到它的 inbox——而不是创建新 turn。
agent loop 在每次 tool execution 之后、下次 LLM 调用之前会 drain 这个 queue:
# 工具执行完之后while not inbox.empty(): extra = inbox.get_nowait() messages.append(Message(role="user", content=extra))下一轮 LLM 调用时就会看到追加的用户消息,不用重启 turn 也不用丢弃中间状态。从用户视角看就是”说错了 / 想补充”能丝滑插入。
还有一堆没展开的细节
这些是这篇文章塞不下但代码里都有的:
- Heartbeat 系统:
heartbeat/*.md文件带 YAML frontmatter(name, interval, enabled),定时触发 agent 跑一些自主任务(自检、整理笔记、发送日报)。 - Gateway:JSON-over-TCP 服务器,支持 token auth 和可选 mTLS。stream tool_start / tool_done / reply 事件。启动时生成一次性 token 写到
.gateway-token(mode 0o600),关机时删除。 - Crash boundary:
@crash_boundary("component")decorator 包住所有 async handler,异常写结构化 JSON 到.logs/crashes/,触发 webhook,排队 autofix。 - Skills 三级渐进式加载:L1 元数据常驻 system prompt(约 100 token/skill),L2 instructions 通过
load_skill()按需加载,L3 resources 通过load_skill_resource()。 - Advisor tool:Anthropic 新出的
advisor_20260301beta——让便宜的 Sonnet 在遇到硬问题时同步咨询 Opus,一次 API 请求内完成。Memoo 的 Anthropic provider 原生支持。 - LLM fallback chain:Anthropic 挂了自动切 OpenAI。最后一个 provider 才 report crash,前面的失败只 log warning。
为什么开源
其实一开始没打算开源。这是我自己用的东西,配置散在 config.yaml 和 .env 里,bind code 这种东西我自己知道就行。
但架构写完之后回头看,发现有几个东西我很少看见别人写对:ContextVar 在 async tool context 里的用法、FTS5 external content table、Batch API + prompt cache 叠加、cancel 通过 asyncio event 树传播、mid-turn injection 的实现。这些放着烂在私有仓库里太可惜。
另一方面,Memoo 刻意保持薄。没有 LangChain 那种 100 层抽象,没有独立的 graph DSL,核心代码加起来也就两三千行。你 clone 下来一下午就能读完整个 loop 是怎么跑的,改起来门槛极低。我希望它能成为一个”想自己写 agent 的人可以参考的最小实现”。
仓库已经 public:
MIT license,Python 3.12+,macOS 和 Linux 都能跑,uv sync && python main.py 起步。配好 ANTHROPIC_API_KEY 和 TELEGRAM_BOT_TOKEN 之后 /bind <code> 就能绑定。下一步准备折腾一个更好的 TUI,再给 scheduler 加几个常用的 heartbeat preset。
想折腾的人欢迎提 issue 和 PR。如果你也是”看到现成方案就想拆掉重写一遍”的那种人,那这个仓库应该正好对你胃口。