> ## Documentation Index
> Fetch the complete documentation index at: https://ccb.agent-aura.top/llms.txt
> Use this file to discover all available pages before exploring further.

# 流式响应机制 - Claude Code 打字机效果原理

> 解析 Claude Code 流式响应实现：如何通过 SSE 逐 token 接收 AI 输出，实现实时打字机效果，提升用户等待体验。

## 为什么需要流式

想象 AI 需要 30 秒才能生成完整回答——如果等 30 秒后才一次性显示，用户体验是灾难性的。

流式响应让用户**实时看到 AI 的思考过程**：

* 文字逐字出现，用户能提前判断方向是否正确
* 工具调用的参数在生成过程中就能预览
* 长时间任务不会让用户觉得"卡死了"

## `BetaRawMessageStreamEvent` 核心事件类型

流式 API 返回的是一系列 `BetaRawMessageStreamEvent`，每种事件类型对应流式响应的不同阶段（`src/services/api/claude.ts`）：

```
message_start           ← 消息开始，包含 model、usage 初始值
  ├── content_block_start   ← 内容块开始（text / tool_use / thinking）
  │   ├── content_block_delta  ← 增量数据（text_delta / input_json_delta / thinking_delta）
  │   ├── content_block_delta  ← ... 持续到达
  │   └── content_block_stop   ← 内容块结束，yield AssistantMessage
  ├── content_block_start   ← 下一个内容块...
  │   └── ...
  └── message_delta       ← stop_reason + 最终 usage
message_stop            ← 消息结束
```

### 事件处理状态机

`src/services/api/claude.ts` 中 `queryModelWithStreaming()` 函数的事件处理循环实现了一个基于 `switch(part.type)` 的状态机：

| 事件类型                  | 处理逻辑                                | 状态变更                       |
| --------------------- | ----------------------------------- | -------------------------- |
| `message_start`       | 初始化 `partialMessage`，记录 TTFT（首字节延迟） | `usage` 初始化                |
| `content_block_start` | 按 `part.index` 创建对应类型的内容块           | `contentBlocks[index]` 初始化 |
| `content_block_delta` | 按子类型增量追加数据                          | text / thinking / input 累加 |
| `content_block_stop`  | 构建完整 `AssistantMessage` 并 yield     | 消息推入 `newMessages`         |
| `message_delta`       | 更新 stop\_reason 和最终 usage           | 写回最后一条消息                   |
| `message_stop`        | 无操作（流结束标记）                          | —                          |

### 内容块类型及其增量数据

`content_block_start` 中的 `content_block.type` 决定了如何处理后续 delta：

| 内容块类型             | Delta 类型                             | 累加逻辑                                                       |
| ----------------- | ------------------------------------ | ---------------------------------------------------------- |
| `text`            | `text_delta`                         | `text += delta.text`                                       |
| `thinking`        | `thinking_delta` + `signature_delta` | `thinking += delta.thinking`，`signature = delta.signature` |
| `tool_use`        | `input_json_delta`                   | `input += delta.partial_json`（JSON 字符串增量拼接）                |
| `server_tool_use` | `input_json_delta`                   | 同 tool\_use                                                |
| `connector_text`  | `connector_text_delta`               | 特殊连接器文本（feature flag 控制）                                   |

关键设计：`content_block_start` 时所有文本字段初始化为空字符串，只通过 `content_block_delta` 累加。这是因为 SDK 有时在 start 和 delta 中重复发送相同文本。

## 文本 chunk 和 tool\_use block 的交织

一次 AI 响应可能包含多个内容块，交替出现：

```
content_block_start (text, index=0)     "我来帮你修复这个 bug。"
content_block_delta  (text_delta)       "首先..."
content_block_stop  (index=0)
content_block_start (tool_use, index=1) { name: "Read", input: "..." }
content_block_delta  (input_json_delta) '{"file_p' → 'ath":' → '"src/foo.ts"}'
content_block_stop  (index=1)
content_block_start (text, index=2)     "我已经看到了问题所在..."
content_block_stop  (index=2)
```

每个 `content_block_stop` 触发一次 `yield`，将完整的 AssistantMessage 推送给消费者。这意味着一个 AI 响应会产生**多条** `AssistantMessage`——文本消息和工具调用消息交替产出。

`stop_reason` 要等到 `message_delta` 才确定（可能是 `end_turn`、`tool_use`、`max_tokens` 等），所以最后一条消息的 `stop_reason` 是**回写**的：

```typescript theme={null}
// claude.ts — stop_reason 回写逻辑（直接属性修改，不用对象替换）
// 因为 transcript 写队列持有 message.message 的引用
const lastMsg = newMessages.at(-1)
if (lastMsg) {
  lastMsg.message.usage = usage
  lastMsg.message.stop_reason = stopReason
}
```

## 流式中的错误处理

### 网络断开

流式连接依赖 SSE（Server-Sent Events）。当连接中断时，系统有两层检测机制：

1. **被动停滞检测**（`src/services/api/claude.ts` 中 stall 检测逻辑）：当下一个事件到达时，计算与上一个事件的时间间隔。超过阈值（30 秒，`STALL_THRESHOLD_MS = 30_000`）记录为一次 stall，累积计数并写入遥测日志。这是被动检测——仅在下一个 chunk 到达时才触发，不会主动中断流。
2. **主动空闲超时看门狗**（`src/services/api/claude.ts` 中 `STREAM_IDLE_TIMEOUT_MS` 看门狗逻辑）：使用 `setTimeout` 设置 90 秒（可通过 `CLAUDE_STREAM_IDLE_TIMEOUT_MS` 环境变量覆盖）的硬性超时。如果在此期间没有收到任何事件，主动终止流并抛出错误进入重试流程。
3. **非流式降级**：作为最后手段，设置 `didFallBackToNonStreaming` 标志，通过 `executeNonStreamingRequest()` 回退到非流式请求（一次性获取完整响应）。

```typescript theme={null}
// claude.ts — 被动停滞检测
const STALL_THRESHOLD_MS = 30_000  // 30 秒无事件视为停滞
let totalStallTime = 0
let stallCount = 0

// claude.ts — 主动空闲超时
const STREAM_IDLE_TIMEOUT_MS =
  parseInt(process.env.CLAUDE_STREAM_IDLE_TIMEOUT_MS || '', 10) || 90_000
```

### API 限流

当 API 返回限流错误时，系统使用 `withRetry` 包装器进行指数退避重试。重试逻辑考虑了：

* 错误类型（429 限流 vs 500 服务器错误）
* 重试次数上限
* 退避间隔

### Token 超限

两种 token 超限场景有不同的处理：

| 场景          | stop\_reason                    | 处理方式                                        |
| ----------- | ------------------------------- | ------------------------------------------- |
| **输出超限**    | `max_tokens`                    | 生成错误消息，建议设置 `CLAUDE_CODE_MAX_OUTPUT_TOKENS` |
| **上下文窗口超限** | `model_context_window_exceeded` | 触发 compaction 压缩对话历史后重试                     |

```typescript theme={null}
// claude.ts — stop_reason 处理
if (stopReason === 'max_tokens') {
  yield createAssistantAPIErrorMessage({ error: 'max_output_tokens', ... })
}
if (stopReason === 'model_context_window_exceeded') {
  // 复用 max_output_tokens 的恢复路径
  yield createAssistantAPIErrorMessage({ error: 'max_output_tokens', ... })
}
```

### 流式停滞检测

系统持续监控事件到达间隔，检测"停滞"（stall）：

```typescript theme={null}
// claude.ts — stall 检测逻辑
const STALL_THRESHOLD_MS = 30_000  // 30 秒无事件视为停滞
if (timeSinceLastEvent > STALL_THRESHOLD_MS) {
  stallCount++
  totalStallTime += timeSinceLastEvent
  logEvent('tengu_streaming_stall', { stall_duration_ms, stall_count, ... })
}
```

这是**被动检测**——仅在下一个 chunk 到达时才触发比较。与之互补的是 90 秒主动空闲超时看门狗（`STREAM_IDLE_TIMEOUT_MS`），会直接中断长时间无响应的流。

## 工具执行的流式反馈

BashTool 的命令执行也是流式的——通过 `onProgress` 回调逐行推送输出：

```
BashTool.call() → runShellCommand() → AsyncGenerator
  ├── 每秒轮询输出文件 → onProgress(lastLines, allLines, ...)
  ├── yield { type: 'progress', output, fullOutput, elapsedTimeSeconds }
  └── return { code, stdout, interrupted, ... }
```

UI 层通过 `useToolCallProgress` hook 实时展示命令输出，而不是等命令完全结束。长时间运行的命令还支持自动后台化（`shouldAutoBackground`）。

## 多 Provider 适配

| Provider                          | 流式协议             | 特殊处理                     |
| --------------------------------- | ---------------- | ------------------------ |
| **firstParty** (Anthropic Direct) | 原生 SSE           | 延迟最低，TTFT 最快             |
| **AWS Bedrock**                   | AWS SDK 流式接口     | 需要额外的 beta header 和认证    |
| **Google Vertex**                 | gRPC → 事件流       | 通过 `getMergedBetas()` 适配 |
| **foundry**                       | Anthropic 兼容 API | 内部部署                     |
| **openai**                        | OpenAI 流式适配器     | 转换为 Anthropic 内部格式       |
| **gemini**                        | Gemini 流式适配器     | 转换为 Anthropic 内部格式       |
| **grok** (xAI)                    | Grok 流式适配器       | 转换为 Anthropic 内部格式       |

所有 Provider 通过统一的 `Stream<BetaRawMessageStreamEvent>` 抽象层屏蔽差异。上层代码（QueryEngine、REPL）不需要关心底层用的是哪个 Provider。

### Provider 选择

`src/utils/model/providers.ts` 中的 `getAPIProvider()` 根据配置决定使用哪个 Provider：

```typescript theme={null}
// 根据 api_provider 配置选择：
// "anthropic" → 直连
// "bedrock"   → AWS SDK
// "vertex"    → Google SDK
// 第三方 base URL → 自动检测
```

每个 Provider 需要适配的细节包括：认证方式、beta header、请求参数格式、错误码映射——但这些差异在 `claude.ts` 的 `queryStream()` 函数中被统一处理。
