claude-mem 源码分析:Worker 服务与 AI 处理管道

02 · Worker 服务与 AI 处理管道

长驻后台 HTTP Worker 协调 AI 压缩管道(Claude/Gemini/OpenRouter 多提供商),将工具调用转为结构化 observation 并持久化到 SQLite,PID 单例保障进程唯一性。

概览

WorkerService 是 claude-mem 的"大脑"——一个长驻后台的 HTTP 服务,负责:

  1. 接收所有生命周期 hook 发来的请求(session-init / observation / summarize)
  2. 调度 AI 提供商(Claude SDK / Gemini / OpenRouter)压缩工具调用为结构化 observation
  3. 将结果持久化到 SQLite 并通过 SSE 实时广播给 Viewer UI

源文件:src/services/worker-service.ts(993 行,是整个系统的入口)


1. WorkerService 作为 HTTP 服务器

1.1 启动链

1
2
3
4
5
6
7
8
bun-runner.js  →  worker-service.cjs  start 子命令
   └─ ensureWorkerStarted(port)          // worker-spawner.ts
        ├─ 检查端口是否已被占用           // isPortInUse()
        ├─ 若未运行 → spawnDaemon()       // ProcessManager.ts:408
        │     ├─ Linux: setsid + bun --daemon
        │     └─ Windows: PowerShell Start-Process Hidden
        └─ waitForHealth(port, timeout)  // 轮询 /api/health 直到就绪

💡 Tip 为什么用 setsidsetsid 创建新 session 组,使守护进程脱离父进程的控制终端(TTY)。这样即使 hook 进程退出,worker 也继续运行。Windows 用 Start-Process -WindowStyle Hidden 实现同等效果。

1.2 --daemon 模式的单例保证

worker-service.ts:936-951 — 守护进程入口(--daemon 或默认 case):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// 双重保险:PID 文件 + 端口检测
const existingPidInfo = readPidFile();
if (verifyPidFileOwnership(existingPidInfo)) {
  logger.info('SYSTEM', 'Worker already running (PID alive), refusing to start duplicate');
  process.exit(0);          // 已有存活进程,直接退出
}

if (await isPortInUse(port)) {
  logger.info('SYSTEM', 'Port already in use, refusing to start duplicate', { port });
  process.exit(0);           // 端口被占,安静退出
}

verifyPidFileOwnership 会比对 PID 文件中的 startToken(进程启动时间戳),防止 PID 复用导致误判。

1.3 WorkerService 构造函数:依赖注入图

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// worker-service.ts:107-203
constructor() {
  this.dbManager    = new DatabaseManager();
  this.sessionManager = new SessionManager(this.dbManager);
  this.sseBroadcaster = new SSEBroadcaster();
  this.sdkAgent     = new ClaudeProvider(this.dbManager, this.sessionManager);
  this.geminiAgent  = new GeminiProvider(this.dbManager, this.sessionManager);
  this.openRouterAgent = new OpenRouterProvider(this.dbManager, this.sessionManager);
  this.completionHandler = new SessionCompletionHandler(...);
  // ...
  this.server = new Server({ getInitializationComplete, getAiStatus, ... });
  this.registerRoutes();
}

💡 Tip 两阶段初始化 start() 先让 HTTP server 开始监听,异步跑 initializeBackground()(初始化 DB、搜索索引等)。在 DB 就绪前,/api/* 端点返回 503,只有 /health/readiness/version 提前可用。这样 hook 可以快速收到 “worker ready” 响应,不会因为 DB 迁移而超时。

1.4 HTTP 端点全景

路由模块 路径前缀 核心功能
SessionRoutes /api/sessions/ init / observations / summarize / status
DataRoutes /api/data/ 查询历史 observations & summaries,SSE
SearchRoutes /api/search/ 语义搜索(Chroma + SQLite 混合)
ViewerRoutes / + /api/stream 前端静态资源 + SSE 实时推送
SettingsRoutes /api/settings/ 读写 settings.json
MemoryRoutes /api/memory/ 直接写入 memory observation
CorpusRoutes /api/corpus/ 知识语料库构建与查询
ChromaRoutes /api/chroma/ 向量索引管理
LogsRoutes /api/logs/ 日志查看
Server 内置 /api/context/inject 注入 context 到 session(UserPromptSubmit hook)
Server 内置 /api/instructions 返回 SKILL.md 给 MCP 客户端
Server 内置 /api/health, /api/readiness 存活检测

2. AI 压缩管道:从 PostToolUse 到 observation 入库

2.1 整体数据流

2.2 Prompt 结构:buildObservationPrompt

src/sdk/prompts.ts:81-113

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
return `<observed_from_primary_session>
  <what_happened>${obs.tool_name}</what_happened>
  <occurred_at>${new Date(obs.created_at_epoch).toISOString()}</occurred_at>
  <working_directory>${obs.cwd}</working_directory>
  <parameters>${JSON.stringify(toolInput, null, 2)}</parameters>
  <outcome>${JSON.stringify(toolOutput, null, 2)}</outcome>
</observed_from_primary_session>

Return either one or more <observation>...</observation> blocks, or an empty response if
this tool use should be skipped.
Concrete debugging findings ... count as durable discoveries and should be recorded.
Never reply with prose such as "Skipping" ... Non-XML text is discarded.`;

关键设计决策

  • 用 XML 标签而非自然语言描述:让 AI 响应也用 XML 格式,方便精确解析
  • 明确说明"空响应优于解释性文字":避免 AI 输出 prose 导致解析失败
  • tool_name 作为 what_happened:AI 能理解这是一次工具调用事件

2.3 AI 会话的消息流(createMessageGenerator)

ClaudeProvider.ts:351-438 — 一个 async generator,向 SDK 持续 yield 消息:

1
2
3
第1条消息: buildInitPrompt()        → 告知 AI 身份(Observer 角色)+ 用户原始请求
后续消息: buildObservationPrompt()  → 逐条发送工具调用
Stop hook: buildSummaryPrompt()     → 触发会话摘要

💡 Tip 为什么用 async generator 而不是一次性发所有消息? 工具调用是流式到来的——用户在和 Claude 主会话交互时,hook 实时发来每一次工具调用。generator 可以"暂停等待"新的 pending message,等消息到来再 yield 给 SDK。这样实现了流式处理而不需要等主会话结束才批量处理。

2.4 SDK 会话恢复机制(Resume)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// ClaudeProvider.ts:140-195
const shouldResume = hasRealMemorySessionId && session.lastPromptNumber > 1 && !session.forceInit;

const queryResult = query({
  prompt: messageGenerator,
  options: {
    model: modelId,
    ...(shouldResume && session.memorySessionId ? { resume: session.memorySessionId } : {}),
    // ...
  }
});

memorySessionId 是 Claude Agent SDK 分配的内部会话 ID,通过 message.session_id 捕获并写入 SQLite。这样 Worker 重启后能恢复 AI 的对话上下文,而不是每次都从头 init。

⚠️ Warning Worker 重启的对齐问题 Worker 重启后 lastPromptNumber 可能 > 1(DB 里有记录),但 SDK 进程已死,memorySessionId 对应的会话上下文不存在。所以 promptNumber === 1 时强制走 fresh start,即使 DB 里有 memorySessionId:

1
2
3
4
5
6
// ClaudeProvider.ts:170-176
if (session.lastPromptNumber > 1) {
  // 尝试 resume
} else {
  // 第一个 prompt 永远 fresh start(防止 stale resume)
}

3. Stop Hook:SessionCompletionHandler 与会话摘要

3.1 触发时序

1
2
3
4
5
6
Claude 主会话结束
  → Stop hook (bun-runner.js)
  → POST /api/sessions/summarize
  → SessionRoutes.handleSummarizeByClaudeId()
  → sessionManager.queueSummarize(sessionDbId, lastAssistantMessage)
  → ensureGeneratorRunning(sessionDbId, 'summarize')

3.2 摘要 Prompt:buildSummaryPrompt

src/sdk/prompts.ts:115-147

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
--- MODE SWITCH: PROGRESS SUMMARY ---
⚠️ CRITICAL TAG REQUIREMENT:
• 你 MUST 用 <summary>...</summary> 包裹整个响应
• 禁止用 <observation> 标签,会被丢弃

<summary>
  <request>...</request>           ← 用户做了什么
  <investigated>...</investigated> ← 探索了什么
  <learned>...</learned>           ← 学到了什么
  <completed>...</completed>       ← 完成了什么
  <next_steps>...</next_steps>     ← 下一步
  <notes>...</notes>               ← 备注
</summary>

摘要 vs Observation 的本质区别

维度 Observation Summary
触发时机 PostToolUse(每次工具调用后) Stop hook(会话结束时)
粒度 单次工具调用的压缩 整个会话的高层总结
XML 标签 <observation> <summary>
字段 type/title/facts/narrative/concepts/files request/investigated/learned/completed/next_steps
用途 细粒度历史检索 快速理解一次会话做了什么

3.3 SessionCompletionHandler

src/services/worker/session/SessionCompletionHandler.ts

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
finalizeSession(sessionDbId: number): void {
  sessionStore.markSessionCompleted(sessionDbId);         // 写 DB
  pendingStore.clearPendingForSession(sessionDbId);       // 清空队列中的遗留消息
  this.eventBroadcaster.broadcastSessionCompleted(sessionDbId); // SSE 通知 UI
}

async completeByDbId(sessionDbId: number): Promise<void> {
  this.finalizeSession(sessionDbId);
  await this.sessionManager.deleteSession(sessionDbId);   // 从内存 Map 移除
}

⚠️ Warning 为什么要 clearPendingForSession? Stop hook 和最后一次 PostToolUse hook 可能竞争到达 Worker。如果 summarize 先处理完,队列里还残留着未处理的 observation messages,会触发 generator 重启陷入循环。finalizeSession 强制清理它们,防止僵尸 generator。


4. Worker 单例机制详解

4.1 PID 文件

src/services/infrastructure/ProcessManager.ts:134-168

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 写入(启动时调用)
export function writePidFile(info: PidInfo): void {
  const resolvedToken = captureProcessStartToken(info.pid); // 用 ps/tasklist 获取进程启动时间
  const payload = { ...info, startToken: resolvedToken };
  writeFileSync(PID_FILE, JSON.stringify(payload, null, 2));
}
// PID_FILE = ~/.claude-mem/worker.pid(由 CLAUDE_MEM_DATA_DIR 驱动)

// 验证(防 PID 复用)
export function verifyPidFileOwnership(pidInfo): boolean {
  // 对比进程实际启动时间与 startToken,两者一致才认为是"我们的"进程
}

4.2 Supervisor 单例

src/supervisor/index.ts:141 — 模块级单例:

1
const supervisorSingleton = new Supervisor(getProcessRegistry());

Supervisor 负责:

  • start() 时检查 PID 文件(validateWorkerPidFile)防止双启
  • 注册 SIGTERM / SIGINT 信号处理器
  • --daemon 模式下忽略 SIGHUP(防止终端关闭导致退出)
  • 维护 ProcessRegistry(记录所有受管子进程,如 SDK 进程、MCP server)

4.3 端口计算

1
2
默认端口 = 37700 + (uid % 100)
可覆盖:CLAUDE_MEM_WORKER_PORT 环境变量

💡 Tip 为什么用 uid 取模? 同一台机器多个 OS 用户(如 CI 服务多租户)自动获得不同端口,无需手动配置。同一 UID 的多 profile 场景再用 CLAUDE_MEM_WORKER_PORT 手动区分。


5. 多 AI 提供商抽象

5.1 提供商选择逻辑

worker-service.ts:481-488SessionRoutes.ts:41-58

1
2
3
4
5
private getActiveAgent(): ClaudeProvider | GeminiProvider | OpenRouterProvider {
  if (isOpenRouterSelected() && isOpenRouterAvailable()) return this.openRouterAgent;
  if (isGeminiSelected()     && isGeminiAvailable())     return this.geminiAgent;
  return this.sdkAgent;  // 默认 Claude
}

三个提供商都实现相同接口:startSession(session, worker): Promise<void>,内部处理各自的 API 调用差异。

5.2 错误分类体系(ClassifiedProviderError)

src/services/worker/provider-errors.ts

1
2
3
4
5
6
7
type ProviderErrorClass =
  | 'transient'        // 网络抖动,可重试
  | 'unrecoverable'    // 上下文溢出、spawn 失败,不重试
  | 'rate_limit'       // 429,可重试(含 Retry-After)
  | 'quota_exhausted'  // 配额耗尽,停止处理
  | 'auth_invalid'     // API key 无效,停止处理
  | (string & {});     // 开放联合类型,提供商可扩展

每个提供商有自己的 classifier

提供商 函数 特殊判断
Claude classifyClaudeError SDK OverloadedError、ENOENT spawn 失败
Gemini classifyGeminiError 500 + body 含 “quota exceeded”(Gemini quirk)
OpenRouter classifyOpenRouterError OpenRouter 特有的错误码

worker-service.ts:550-586 中的分发逻辑:

1
2
3
4
5
6
7
8
9
const classified = isClassified(error)
  ? error                                    // 已在提供商边界分类
  : this.reclassifyAtDispatch(error, agent); // 安全网:兜底分类

if (dispatchKind === 'unrecoverable' || dispatchKind === 'auth_invalid' || dispatchKind === 'quota_exhausted') {
  hadUnrecoverableError = true;
  return;  // 停止重试
}
// 其余 transient / rate_limit → 会触发 generator 重启

💡 Tip 为什么不用原始 HTTP 状态码判断? 不同提供商的相同语义用不同状态码表达(Gemini 的 quota 可能是 500)。用 ClassifiedProviderError.kind 这层抽象,上层分发逻辑无需关心具体状态码,也方便未来添加新提供商。

5.3 重试机制(retry.ts)

src/services/worker/retry.ts:指数退避 + jitter

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// POST 接口非幂等,最多重试 2 次
const DEFAULT_OPTIONS = {
  maxRetries: 2,
  perAttemptTimeoutMs: 30_000,
  baseDelayMs: 100,
  maxDelayMs: 30_000
};

// 退避公式:100ms * 2^attempt + random(0~50ms),上限 30s
computeBackoffMs(attempt) = min(100 * 2^attempt + jitter, 30000)

GeminiProvider / OpenRouterProvider 用 withRetry() 包装 fetch 调用;ClaudeProvider 走 Agent SDK,有自己的重试逻辑。


6. 关键设计亮点小结

6.1 响应先行,处理异步

1
2
3
HTTP 请求到来 → 同步写入 pending_messages → 立即返回 {"status": "queued"}
                             异步:generator 消费队列,调用 AI,写结果

Hook 的执行时间有 timeout 限制,如果 AI 处理是同步阻塞的,hook 就会超时失败。这种"入队立即返回"的设计让 hook 响应时间 < 100ms,AI 处理可以花几秒。

6.2 三层 Session ID 分离

  • contentSessionId:Claude 主会话 ID(来自 Claude Code,所有 hook 用这个对齐)
  • memorySessionId:Agent SDK 内部会话 ID(用于 AI 对话 resume)
  • sessionDbId:SQLite 自增主键(本地数据库用)

三个 ID 各司其职,前者关联 hook 请求,中者管理 AI 上下文延续,后者是数据库主键。

6.3 content_hash 去重

storeObservations 对 observation 内容做 hash,相同内容只存一次。这解决了 processAgentResponse 注释中提到的"同一 parsed observation 被映射到同一 DB 行"导致 Chroma 重复同步的问题(issue #2240)。

6.4 Fallback 提供商链

当 ClaudeProvider 因 SDK 进程终止而失败时(isSessionTerminatedError),runFallbackForTerminatedSession 会自动尝试 Gemini → OpenRouter,保证 observation 不丢失(worker-service.ts:665-710)。


7. 流程图:完整 observation 压缩链路


阅读提示:重点代码路径 worker-service.ts → SessionRoutes → ClaudeProvider.createMessageGenerator → sdk/prompts.ts → ResponseProcessor → SessionStore,这条链路覆盖了 80% 的核心逻辑。