Fullstack Dev、iOS Reverse Engineering、小红书逆向培训

 
深入 Open Agent SDK(一):Agent Loop 内核——从 prompt 到多轮对话的完整运转机制

本文是「深入 Open Agent SDK (Swift)」系列第一篇。系列目录见这里

大多数 LLM 封装库做的事情是:发请求、拿响应、结束。但一个真正的 Agent 不止于此——它要能自己判断需不需要调工具、执行完工具后把结果喂回 LLM、循环往复直到拿到最终答案。这个循环就是 Agent Loop

这篇文章分析 Open Agent SDK (Swift) 的 Agent Loop 实现,看它怎样用原生 Swift 并发在进程内跑完一整套循环。

Agent Loop 是什么?

用一句话概括:用户发 prompt → LLM 返回响应 → 如果 LLM 要求调工具就执行 → 把工具结果喂回 LLM → 重复,直到 LLM 说"我说完了"

画成流程图:

flowchart TD
    A["用户 prompt"] --> B["构建 messages + tools"]
    B --> C["调用 LLM API"]
    C -->|end_turn / stop_sequence| D["返回结果"]
    C -->|max_tokens| C2["追加'请继续'"]
    C2 --> C
    C -->|tool_use| E["提取 tool_use blocks"]
    E --> F["按只读/变更分桶"]
    F --> G["只读工具并发执行"]
    F --> H["变更工具串行执行"]
    G --> I["微压缩大结果"]
    H --> I
    I --> J["tool_result 加入 messages"]
    J --> C

这个循环里有几个关键决策点:

  1. 什么时候停? LLM 返回 end_turnstop_sequence 时正常结束;到达 maxTurns 上限时强制停止;超出预算 (maxBudgetUsd) 时中断;用户主动取消时也中断。
  2. 工具怎么执行? 只读工具并发跑(最多 10 个),变更工具串行跑——避免并发写文件。
  3. 上下文太长怎么办? 自动压缩——用一个 LLM 调用把历史摘要,腾出空间继续。
  4. 中途出错怎么办? 内置重试、回退模型、错误隔离(工具报错不会炸掉整个循环)。

两条入口:prompt() 和 stream()

SDK 提供两种方式触发 Agent Loop:

阻塞式 prompt()

let agent = createAgent(options: AgentOptions(
    apiKey: "sk-...",
    model: "claude-sonnet-4-6",
    maxTurns: 10
))

let result = await agent.prompt("Read Package.swift and summarize it.")
print(result.text)
print("Turns: \(result.numTurns), Cost: $\(String(format: "%.4f", result.totalCostUsd))")

prompt() 是"发出去等结果"模式。一次调用跑完所有轮次,返回最终的 QueryResult。适合不需要实时看到中间过程的场景——比如后台任务、CLI 工具。

流式 stream()

for await message in agent.stream("Explain this codebase.") {
    switch message {
    case .partialMessage(let data):
        print(data.text, terminator: "")  // 实时输出文本
    case .toolUse(let data):
        print("[Using tool: \(data.toolName)]")
    case .toolResult(let data):
        print("[Tool done, \(data.content.count) chars]")
    case .result(let data):
        print("\nDone: \(data.numTurns) turns, $\(String(format: "%.4f", data.totalCostUsd))")
    default:
        break
    }
}

stream() 返回 AsyncStream<SDKMessage>,在 LLM 处理过程中持续推送事件。SDK 定义了 17 种消息类型,从 partialMessage(文本片段)到 toolUse(工具调用)到 result(最终结果),覆盖了 Agent Loop 的每个阶段。

选择哪种取决于你的 UI 需求:要实时展示就用 stream(),不需要就用 prompt()

循环体内部:一个 turn 做了什么

不管走哪条入口,每个 turn 的核心逻辑是相同的。让我们跟一遍代码。

1. 检查是否需要压缩

if shouldAutoCompact(messages: messages, model: model, state: compactState) {
    let (newMessages, _, newState) = await compactConversation(
        client: client, model: model,
        messages: messages, state: compactState,
        fileCache: fileCache,
        sessionMemory: sessionMemory
    )
    messages = newMessages
    compactState = newState
}

每个 turn 开始前先检查:消息历史估计的 token 数是不是快要撑爆上下文窗口了。如果是,用一个 LLM 调用把历史压缩成摘要,替换掉原始消息。

压缩的阈值是 模型上下文窗口 - 10000 tokens(缓冲区)。连续压缩失败 3 次后会停止尝试,避免浪费 token。

2. 发 LLM 请求(带重试和回退)

response = try await withRetry({
    try await client.sendMessage(
        model: model, messages: messages,
        maxTokens: maxTokens, system: buildSystemPrompt(),
        tools: apiTools, ...
    )
}, retryConfig: retryConfig)

所有 LLM 请求都经过 withRetry 包装,按配置的重试策略处理临时错误(网络超时、429 限流等)。

如果主模型彻底失败,还配置了 fallbackModel,SDK 会用备用模型再试一次:

if let fallbackModel = self.options.fallbackModel, fallbackModel != self.model {
    // 用 fallbackModel 重试...
}

3. 处理 stop_reason

LLM 响应里的 stop_reason 决定了循环的走向:

stop_reason 含义 循环行为
end_turn LLM 说完了 正常退出循环
stop_sequence 碰到停止符 正常退出循环
tool_use LLM 想调工具 执行工具,继续循环
max_tokens 输出被截断 追加"请继续",继续循环

max_tokens 的情况有个保护:最多自动续接 3 次,防止无限循环。

4. 工具执行:分桶并发

当 LLM 返回 tool_use 时,SDK 不是简单地把工具排着队一个个跑,而是做了分桶:

// ToolExecutor.partitionTools()
for block in blocks {
    let tool = tools.first { $0.name == block.name }
    if let tool = tool, tool.isReadOnly {
        readOnly.append(item)   // 只读桶
    } else {
        mutations.append(item)  // 变更桶
    }
}

只读工具(Read、Glob、Grep、WebSearch 等)可以安全并发,用 TaskGroup 跑,最多 10 个一批:

let batchResults = await withTaskGroup(of: ToolResult.self) { group in
    for item in batchSlice {
        group.addTask {
            await executeSingleTool(block: item.block, tool: item.tool, context: ...)
        }
    }
    // 收集结果
}

变更工具(Write、Edit、Bash 等)必须串行执行,一个跑完再跑下一个,避免并发写冲突:

for item in items {
    let result = await executeSingleTool(...)
    results.append(result)
}

执行顺序:先跑所有只读工具(并发),再跑所有变更工具(串行)。这在 LLM 一次返回多个工具调用时能显著提升性能——比如 LLM 同时要求读 5 个文件,5 个读操作并行完成。

5. 微压缩

工具执行完后,结果在喂回 LLM 之前还要过一道微压缩:

for result in toolResults {
    let processedContent = await processToolResult(result.content, isError: result.isError)
    processedResults.append(ToolResult(
        toolUseId: result.toolUseId,
        content: processedContent,
        isError: result.isError
    ))
}

如果一个工具返回的内容超过 50000 字符(比如读了一个大文件),SDK 会用一次额外的 LLM 调用把内容压缩。错误结果不压缩——保留了完整的错误信息供 LLM 诊断。

成本追踪:逐 turn 累加

每一轮 LLM 调用后,SDK 都会更新 token 用量和费用:

let turnCost = estimateCost(model: model, usage: turnUsage)
totalCostUsd += turnCost
costByModel[model] = CostBreakdownEntry(
    model: model,
    inputTokens: turnUsage.inputTokens,
    outputTokens: turnUsage.outputTokens,
    costUsd: turnCost
)

costByModel 按 model 分组记录。这意味着如果你中途切换了模型(通过 switchModel()),每个模型的费用是分开计算的。最终 result.costBreakdown 能告诉你每个模型花了多少钱。

预算检查在每个 turn 后执行:

if let budget = options.maxBudgetUsd, totalCostUsd > budget {
    status = .errorMaxBudgetUsd
    break
}

超出预算时立即退出循环,但已产生的文本会保留在结果里——你拿到的是部分结果,不是空白的。

取消:协作式取消

Swift 的结构化并发用 Task.isCancelled 做协作式取消。SDK 在循环的多个检查点都检查了这个标志:

  1. while 循环入口
  2. 只读工具和变更工具之间
  3. SSE 事件循环内部
  4. 工具执行前后
// 循环入口
if Task.isCancelled || _interrupted {
    status = .cancelled
    break
}

// 只读/变更之间
if Task.isCancelled { return results }

stream() 还额外支持通过 interrupt() 方法取消——内部就是 cancel 掉持有 stream 的 Task。

取消后返回的是 QueryResult(isCancelled: true),附带截止到取消时刻的部分文本和 token 用量。

错误处理:不炸、不丢

SDK 的错误处理原则是:工具执行错误不传播,API 错误有重试,最终失败保留部分结果

工具执行时,任何错误都被捕获为 ToolResult(isError: true)

static func executeSingleTool(...) async -> ToolResult {
    guard let tool = tool else {
        return ToolResult(toolUseId: block.id, content: "Error: Unknown tool", isError: true)
    }
    // ... try executing
    let result = await tool.call(input: block.input, context: context)
    return ToolResult(toolUseId: block.id, content: result.content, isError: result.isError)
}

工具报错的结果照样喂回 LLM,LLM 看到错误信息后可以决定换个策略。Agent Loop 不会因为一个工具挂了就崩溃。

API 层面的错误(网络问题、500 等)会触发重试;重试失败后触发 fallback 模型;全挂了才返回 errorDuringExecution 状态。

Hook 集成:循环的生命周期

Agent Loop 在关键节点触发 Hook 事件:

Hook 事件 触发时机
sessionStart 循环开始前
preToolUse 每个工具执行前
postToolUse 工具成功执行后
postToolUseFailure 工具执行失败后
stop 循环结束时(正常或异常)
sessionEnd 返回结果前

Hook 的一个典型用法是在 preToolUse 拦截危险操作:

await hookRegistry.register(.preToolUse, definition: HookDefinition(
    matcher: "Bash",
    handler: { input in
        return HookOutput(message: "Bash blocked in production", block: true)
    }
))

被 Hook 拦截的工具不会执行,而是返回一个错误结果——LLM 会看到"Bash blocked in production",可以换个方式完成任务。

还有一个入口:streamInput()

除了 prompt()stream(),SDK 还提供了第三种入口——streamInput(),接受一个 AsyncStream<String> 作为输入:

let input = AsyncStream<String> { continuation in
    continuation.yield("What's in this project?")
    continuation.yield("Now explain the test structure.")
    continuation.finish()
}

for await message in agent.streamInput(input) {
    // 处理每条输入对应的响应
}

每个输入元素被视为一条新的用户消息,触发一个完整的 prompt 周期。这适合聊天式交互:用户的每条消息都是输入流的一个元素,Agent 逐条处理并流式输出。

小结

Agent Loop 是整个 SDK 的心脏。理解了它的工作方式,剩下的功能都是在它的基础上叠加的:

  • 工具系统 — Loop 里的"执行工具"环节
  • MCP 集成 — Loop 启动时连接外部工具服务器
  • 会话持久化 — Loop 结束后保存 messages 数组
  • 权限控制 — 工具执行前的拦截点
  • Hook 系统 — Loop 生命周期的事件回调

下一篇我们深入 工具系统:34 个内置工具怎么组织、ToolProtocol 协议的设计思路、以及怎么用 defineTool 创建自定义工具。


系列文章

GitHubterryso/open-agent-sdk-swift

 
深入 Open Agent SDK(六):多 LLM 提供商与运行时控制

本文是「深入 Open Agent SDK (Swift)」系列第六篇(完结篇)。系列目录见这里

一个 Agent 不应该绑定单一 LLM 提供商。不同任务适合不同模型——简单问题用便宜模型,复杂推理用贵模型,有些场景甚至需要本地模型。而且运行时的需求也在变化:用户可能中途要求更深度的思考,可能发现预算快用完了需要降级,可能想切换到本地模型省点钱。

Open Agent SDK 的做法是:定义一个统一的 LLMClient 协议,Anthropic 和 OpenAI 兼容提供商各有一个实现,Agent 内部全部用 Anthropic 格式处理。切换提供商只需要改一个配置参数,运行时还能动态切模型、调思考深度、控预算。

这篇文章分析 SDK 的多提供商适配机制和运行时控制能力。

一、LLMClient 协议——统一接口

先看协议定义:

public protocol LLMClient: Sendable {
    nonisolated func sendMessage(
        model: String,
        messages: [[String: Any]],
        maxTokens: Int,
        system: String?,
        tools: [[String: Any]]?,
        toolChoice: [String: Any]?,
        thinking: [String: Any]?,
        temperature: Double?
    ) async throws -> [String: Any]

    nonisolated func streamMessage(
        model: String,
        messages: [[String: Any]],
        maxTokens: Int,
        system: String?,
        tools: [[String: Any]]?,
        toolChoice: [String: Any]?,
        thinking: [String: Any]?,
        temperature: Double?
    ) async throws -> AsyncThrowingStream<SSEEvent, Error>
}

两个核心方法,一个阻塞一个流式。参数列表覆盖了主流 LLM API 的全部能力:模型选择、消息历史、token 上限、系统提示、工具定义、工具选择策略、思考配置、温度。

关键决策:返回值统一用 Anthropic 格式的字典。不管是 Anthropic 原生 API 还是 OpenAI 兼容 API,最终 Agent 内部拿到的都是同一种结构——content 数组里是 {"type": "text", "text": "..."}{"type": "tool_use", "name": "...", "input": {...}}stop_reasonend_turn / tool_use / max_tokens。这样 Agent Loop 的处理逻辑不需要关心底层是哪家 API。

流式返回用 AsyncThrowingStream<SSEEvent, Error>SSEEvent 是枚举:

public enum SSEEvent: @unchecked Sendable {
    case messageStart(message: [String: Any])
    case contentBlockStart(index: Int, contentBlock: [String: Any])
    case contentBlockDelta(index: Int, delta: [String: Any])
    case contentBlockStop(index: Int)
    case messageDelta(delta: [String: Any], usage: [String: Any])
    case messageStop
    case ping
    case error(data: [String: Any])
}

7 种事件类型,覆盖了 Anthropic Messages API 流式响应的全部事件。OpenAI 兼容层的流式输出会被转换成同样的 SSEEvent 序列。

二、AnthropicClient——原生 Claude API

AnthropicClientLLMClient 的 Anthropic 原生实现,用 actor 保证并发安全:

public actor AnthropicClient: LLMClient {
    private let apiKey: String
    private let baseURL: URL      // 默认 https://api.anthropic.com
    private let urlSession: URLSession

    public init(apiKey: String, baseURL: String? = nil, urlSession: URLSession? = nil) {
        self.apiKey = apiKey
        self.baseURL = URL(string: baseURL ?? "https://api.anthropic.com")!
        self.urlSession = urlSession ?? URLSession.shared
    }
}

请求就是 POST 到 /v1/messages,header 里放 x-api-keyanthropic-version

private nonisolated func buildRequest(body: [String: Any]) throws -> URLRequest {
    var request = URLRequest(url: URL(string: baseURL.absoluteString + "/v1/messages")!)
    request.httpMethod = "POST"
    request.timeoutInterval = 300
    request.setValue(apiKey, forHTTPHeaderField: "x-api-key")
    request.setValue("2023-06-01", forHTTPHeaderField: "anthropic-version")
    request.setValue("application/json", forHTTPHeaderField: "content-type")
    request.httpBody = try JSONSerialization.data(withJSONObject: body, options: [])
    return request
}

因为用的是 Anthropic 原生 API,所以 sendMessage 的请求体和响应体不需要格式转换——请求参数直接拼成字典发出去,响应直接解析成字典返回。流式模式也是直接解析 Anthropic 的 SSE 文本。

安全方面有个细节:所有错误信息都会把 API Key 替换成 ***,防止 key 泄露到日志里:

let safeMessage = errorMessage.replacingOccurrences(of: apiKey, with: "***")

AnthropicClient 直接支持 Extended Thinking。Agent 在配置了 ThinkingConfig 时,会把 thinking 参数传进来:

if let thinking {
    body["thinking"] = thinking
}

这个参数在 Anthropic API 里控制 Claude 是否进行深度思考以及思考的 token 预算。

三、OpenAI 兼容层——适配 GLM/Ollama/OpenRouter 等

OpenAIClient 是重头戏。它要做的事情是:接受 Anthropic 格式的参数,转换成 OpenAI Chat Completion API 格式发出去,再把 OpenAI 格式的响应转换回 Anthropic 格式。Agent 内部完全不知道底层是 OpenAI 兼容 API。

public actor OpenAIClient: LLMClient {
    private let apiKey: String
    private let baseURL: URL      // 默认 https://api.openai.com/v1

    public init(apiKey: String, baseURL: String? = nil, urlSession: URLSession? = nil) {
        self.apiKey = apiKey
        self.baseURL = URL(string: baseURL ?? "https://api.openai.com/v1")!
        self.urlSession = urlSession ?? URLSession.shared
    }
}

请求发到 /chat/completions,用 Bearer token 认证——这是 OpenAI 兼容 API 的标准做法。只要提供商支持 /v1/chat/completions 端点,就能用这个 Client 连接。

消息格式转换

Anthropic 和 OpenAI 的消息格式有几个关键差异,转换时都要处理:

1. System 消息的位置

Anthropic 把 system prompt 作为顶层参数传,OpenAI 把它作为第一条 role: "system" 消息:

if let system {
    result.append(["role": "system", "content": system])
}

2. Tool Result 的表示方式

Anthropic 把多个 tool_result 打包在一个 role: "user" 消息的 content 数组里,OpenAI 要求每个 tool result 是一条独立的 role: "tool" 消息:

let toolResults = blocks.filter { $0["type"] as? String == "tool_result" }
if !toolResults.isEmpty {
    return toolResults.map { block in
        [
            "role": "tool",
            "tool_call_id": block["tool_use_id"] as? String ?? "",
            "content": block["content"] ?? "",
        ]
    }
}

3. Tool Use 的表示方式

Anthropic 在 content 数组里用 type: "tool_use" 块,OpenAI 用 tool_calls 数组放在 message 顶层:

result["tool_calls"] = toolUseBlocks.enumerated().map { index, block in
    let inputDict = block["input"] as? [String: Any] ?? [:]
    let arguments = (try? JSONSerialization.data(withJSONObject: inputDict, options: []))
        .flatMap { String(data: $0, encoding: .utf8) } ?? "{}"
    return [
        "id": block["id"] as? String ?? "call_\(index)",
        "type": "function",
        "function": [
            "name": block["name"] as? String ?? "",
            "arguments": arguments,  // OpenAI 要求 JSON 字符串,不是字典
        ],
    ]
}

注意 OpenAI 的 arguments 必须是 JSON 字符串而不是字典对象,这里做了序列化。

响应格式转换

OpenAI 的响应结构(choices[0].message)要转成 Anthropic 格式:

// stop_reason 映射
private static func mapStopReason(_ finishReason: String) -> String {
    switch finishReason {
    case "stop": return "end_turn"
    case "tool_calls": return "tool_use"
    case "length": return "max_tokens"
    default: return finishReason
    }
}

// usage 映射
usage = [
    "input_tokens": openAIUsage["prompt_tokens"] as? Int ?? 0,
    "output_tokens": openAIUsage["completion_tokens"] as? Int ?? 0,
]

流式转换

流式的转换更复杂。OpenAI 的流式格式(data: {"choices":[{"delta":{...}}]})要逐块转成 Anthropic 的 SSEEvent 序列:

  • 第一个 chunk → messageStart
  • 文本 delta → contentBlockDelta(type: "text_delta")
  • tool call 开始 → contentBlockStart(type: "tool_use"),参数 delta → contentBlockDelta(type: "input_json_delta")
  • 结束 → contentBlockStop + messageDelta + messageStop

转换函数要跟踪当前有多少个 content block、文本块是否关闭、哪些 tool call 块还在打开状态,才能正确生成 index。代码里还加了一个安全检查——确保 messageStop 一定会被发出,即使原始流没有正常结束。

使用示例

连接不同的 OpenAI 兼容提供商只需要改 baseURLmodel

// DeepSeek
let agent = createAgent(options: AgentOptions(
    apiKey: "sk-...",
    model: "deepseek-chat",
    baseURL: "https://api.deepseek.com/v1",
    provider: .openai
))

// Ollama 本地
let localAgent = createAgent(options: AgentOptions(
    apiKey: "ollama",           // Ollama 不需要 key,随便填
    model: "qwen3:8b",
    baseURL: "http://localhost:11434/v1",
    provider: .openai
))

// GLM
let glmAgent = createAgent(options: AgentOptions(
    apiKey: "xxx.glm-xxx",
    model: "glm-4-plus",
    baseURL: "https://open.bigmodel.cn/api/paas/v4",
    provider: .openai
))

四、运行时模型切换

SDK 支持在运行时动态切换模型,不需要重新创建 Agent:

let agent = createAgent(options: AgentOptions(
    apiKey: apiKey,
    model: "claude-sonnet-4-6",
    fallbackModel: "claude-haiku-4-5"  // 主模型挂了用这个
))

// 先用 sonnet 跑一个简单问题
let result1 = await agent.prompt("What is 2 + 3?")
print(result1.costBreakdown)
// [CostBreakdownEntry(model: "claude-sonnet-4-6", inputTokens: 45, outputTokens: 3, costUsd: 0.000180)]

// 切换到 opus 跑推理密集型问题
try agent.switchModel("claude-opus-4-6")
let result2 = await agent.prompt("Explain the difference between structs and classes in Swift.")
print(result2.costBreakdown)
// [CostBreakdownEntry(model: "claude-opus-4-6", inputTokens: 52, outputTokens: 156, costUsd: 0.011970)]

switchModel() 的实现:

public func switchModel(_ model: String) throws {
    let trimmed = model.trimmingCharacters(in: .whitespacesAndNewlines)
    guard !trimmed.isEmpty else {
        throw SDKError.invalidConfiguration("Model name cannot be empty")
    }
    let oldModel = self.model
    self.model = trimmed
    self.options.model = trimmed
    Logger.shared.info("Agent", "model_switch", data: ["from": oldModel, "to": trimmed])
}

不做白名单校验——传什么模型名就用什么,API 层面不支持的模型会在请求时报错。这样设计是因为 OpenAI 兼容提供商的模型名无法穷举。

fallbackModel 是在 AgentOptions 里配置的备用模型。主模型彻底失败(重试耗尽)后,SDK 会自动用 fallback model 重试一次:

if let fallbackModel = self.options.fallbackModel, fallbackModel != self.model {
    let fallbackResponse = try await retryClient.sendMessage(
        model: fallbackModel,
        messages: retryMessages, ...
    )
    // 临时切到 fallback model 跑 cost tracking
    let originalModel = self.model
    self.model = fallbackModel
    // ... 处理响应
}

按模型分别计费

CostBreakdownEntry 按模型名分组记录每次查询的费用:

public struct CostBreakdownEntry: Sendable, Equatable {
    public let model: String
    public let inputTokens: Int
    public let outputTokens: Int
    public let costUsd: Double
}

一次查询里如果中途切了模型(或触发了 fallback),QueryResult.costBreakdown 会包含多个条目,每个模型的花费分开算。费用根据内置的价格表计算:

public nonisolated(unsafe) var MODEL_PRICING: [String: ModelPricing] = [
    "claude-opus-4-6":   ModelPricing(input: 15.0 / 1_000_000, output: 75.0 / 1_000_000),
    "claude-sonnet-4-6": ModelPricing(input: 3.0 / 1_000_000, output: 15.0 / 1_000_000),
    "claude-haiku-4-5":  ModelPricing(input: 0.8 / 1_000_000, output: 4.0 / 1_000_000),
    // ...
]

自定义模型可以通过 registerModel(_:pricing:) 注册价格:

registerModel("glm-4-plus", pricing: ModelPricing(
    input: 0.1 / 1_000_000, output: 0.1 / 1_000_000
))

五、Thinking 与 Effort 配置

ThinkingConfig

SDK 用 ThinkingConfig 枚举控制 LLM 的深度思考能力:

public enum ThinkingConfig: Sendable, Equatable {
    case adaptive                  // 模型自己决定要不要思考
    case enabled(budgetTokens: Int) // 指定思考的 token 预算
    case disabled                  // 关闭深度思考
}

三种模式各有用途:

  • adaptive:让模型自己判断——简单问题不思考,复杂问题自动思考。日常使用最方便。
  • enabled(budgetTokens:):明确控制思考预算。比如你想要深度分析,给 10000 个 thinking token。
  • disabled:完全关闭思考,追求最快速度。

EffortLevel

EffortLevel 是更高层级的抽象,映射到具体的 thinking token 预算:

public enum EffortLevel: String, Sendable, CaseIterable {
    case low    // 1024 tokens
    case medium // 5120 tokens
    case high   // 10240 tokens
    case max    // 32768 tokens

    public var budgetTokens: Int {
        switch self {
        case .low: return 1024
        case .medium: return 5120
        case .high: return 10240
        case .max: return 32768
        }
    }
}

AgentOptions 里设置:

let agent = createAgent(options: AgentOptions(
    apiKey: apiKey,
    model: "claude-sonnet-4-6",
    effort: .high  // 10240 thinking tokens
))

运行时动态调节

setMaxThinkingTokens() 可以在查询之间调整思考预算:

// 普通问题,少给点思考 token
try agent.setMaxThinkingTokens(2048)
let r1 = await agent.prompt("Summarize this file.")

// 遇到复杂推理问题,加大预算
try agent.setMaxThinkingTokens(16000)
let r2 = await agent.prompt("Design a concurrent data structure for...")

// 关闭思考
try agent.setMaxThinkingTokens(nil)

传正整数就启用思考并设预算,传 nil 就关闭。传 0 或负数会抛 SDKError.invalidConfiguration

ModelInfo 描述了每个模型支持哪些能力:

public struct ModelInfo: Sendable, Equatable {
    public let value: String
    public let displayName: String
    public let description: String
    public let supportsEffort: Bool
    public let supportedEffortLevels: [EffortLevel]?
    public let supportsAdaptiveThinking: Bool?
    public let supportsFastMode: Bool?
}

这样 UI 层可以根据模型能力动态展示可选项。

六、Skills 系统

Skills 是 SDK 里一种特殊的扩展机制——本质上是"带工具限制的 prompt 模板"。一个 Skill 定义了一组 prompt 指令、允许使用的工具子集、可选的模型覆盖。

Skill 结构

public struct Skill: Sendable {
    public let name: String
    public let description: String
    public let aliases: [String]              // 别名,如 ["ci"] 代表 commit
    public let userInvocable: Bool            // 用户能否通过 /command 调用
    public let toolRestrictions: [ToolRestriction]?  // 限制可用工具,nil = 全部可用
    public let modelOverride: String?         // 执行时覆盖模型
    public let isAvailable: @Sendable () -> Bool     // 运行时可用性检查
    public let promptTemplate: String         // prompt 模板内容
    public let whenToUse: String?             // 告诉 LLM 什么时候该用这个 skill
    public let argumentHint: String?          // 参数提示,如 "[message]"
    public let baseDir: String?               // skill 目录的绝对路径
    public let supportingFiles: [String]      // 支撑文件(引用、脚本等)
}

5 个内置 Skill

SDK 预定义了 5 个常用 Skill,通过 BuiltInSkills 命名空间访问:

Skill 别名 允许的工具 功能
commit ci bash, read, glob, grep 分析 git diff,生成 commit message
review review-pr, cr bash, read, glob, grep 从 5 个维度审查代码变更
simplify bash, read, grep, glob 审查代码的复用、质量、效率
debug investigate, diagnose read, grep, glob, bash 分析错误,定位根因
test run-tests bash, read, write, glob, grep 生成测试用例并执行

每个 Skill 都限制了工具范围。比如 commit 只允许 bash、read、glob、grep——不需要写文件。debug 也是只读的(read、grep、glob、bash),只做诊断不做修改。test 是唯一允许 write 的内置 Skill,因为要创建测试文件。

test Skill 还有一个运行时可用性检查:

isAvailable: {
    let cwd = FileManager.default.currentDirectoryPath
    let testIndicators = [
        "Package.swift", "pytest.ini", "jest.config",
        "vitest.config", "Cargo.toml", "go.mod",
    ]
    for indicator in testIndicators {
        if FileManager.default.fileExists(atPath: cwd + "/" + indicator) {
            return true
        }
    }
    return false
}

只有检测到测试框架配置文件时,test Skill 才对用户可见。

SkillRegistry

SkillRegistry 是线程安全的 skill 管理器,用 DispatchQueue 保护并发访问:

public final class SkillRegistry: @unchecked Sendable {
    private var skills: [String: Skill] = [:]
    private var orderedNames: [String] = []
    private var aliases: [String: String] = [:]
    private let queue = DispatchQueue(label: "com.openagentsdk.skillregistry")

    public func register(_ skill: Skill) { ... }
    public func find(_ name: String) -> Skill? { ... }   // 按名称或别名查找
    public var allSkills: [Skill] { ... }
    public var userInvocableSkills: [Skill] { ... }
}

注册、查找、替换、删除都是 queue.sync 保护的操作。别名在注册时自动建立映射——注册 BuiltInSkills.commit 后,registry.find("ci") 也能找到它。

SkillLoader:文件系统发现

Skills 不需要全部代码注册。SkillLoader 可以从文件系统自动发现 skill——只要一个目录里包含 SKILL.md 文件,就会被识别为一个 skill 包。

扫描目录按优先级从低到高:

~/.config/agents/skills      (最低优先级)
~/.agents/skills
~/.claude/skills
$PWD/.agents/skills
$PWD/.claude/skills           (最高优先级)

同名 skill 后发现的覆盖先发现的(last-wins)。

SKILL.md 用 YAML frontmatter 定义元数据:

---
name: polyv-live-cli
description: 管理保利威直播服务
aliases: live, plv
allowed-tools: Bash, Read, Write, Glob
when-to-use: user asks about live streaming management
argument-hint: [action] [options]
---

# polyv-live-cli Skill

你是保利威直播服务的管理助手...

frontmatter 里的 allowed-tools 会被解析成 ToolRestriction 数组,限制这个 skill 执行时只能用指定的工具。

SkillLoader 采用"渐进式加载"策略:只加载 SKILL.md 的 Markdown body 作为 prompt 模板,支撑文件(references、scripts、templates)只记录路径不加载内容。Agent 需要时通过 Read/Bash 工具按需读取。

let registry = SkillRegistry()
registry.register(BuiltInSkills.commit)
registry.register(BuiltInSkills.review)
// 从文件系统发现自定义 skills
let count = registry.registerDiscoveredSkills()
// 或指定目录
registry.registerDiscoveredSkills(from: ["/opt/custom-skills"])
// 或只注册白名单里的
registry.registerDiscoveredSkills(skillNames: ["polyv-live-cli"])

ToolRestriction

ToolRestriction 枚举定义了可以被限制的工具:

public enum ToolRestriction: String, Sendable, CaseIterable {
    case bash, read, write, edit, glob, grep
    case webFetch, webSearch, askUser, toolSearch
    case agent, sendMessage
    case taskCreate, taskList, taskUpdate, taskGet, taskStop, taskOutput
    case teamCreate, teamDelete
    case notebookEdit, skill
}

当一个 Skill 设了 toolRestrictions: [.bash, .read, .glob],执行时 Agent 只能用这三个工具,其他工具调用会被拦截。

在 Agent 里使用 Skills

要让 Agent 能用 Skills,需要把 SkillTool 加到工具列表里:

var tools = getAllBaseTools(tier: .core)
tools.append(createSkillTool(registry: registry))

let agent = createAgent(options: AgentOptions(
    apiKey: apiKey,
    model: "claude-sonnet-4-6",
    permissionMode: .bypassPermissions,
    tools: tools
))

// Agent 会根据 system prompt 里的 skill 列表自动发现并调用
let result = await agent.prompt("Use the commit skill to analyze current changes")

SkillRegistry.formatSkillsForPrompt() 会生成一段 skill 列表注入到 system prompt 里,包含每个 skill 的名称、描述和触发条件。LLM 看到这个列表后就知道该在什么场景下调用哪个 skill。

七、其他运行时控制

预算控制

maxBudgetUsd 设置查询的费用上限:

let agent = createAgent(options: AgentOptions(
    apiKey: apiKey,
    model: "claude-sonnet-4-6",
    maxBudgetUsd: 0.05  // 最多花 5 美分
))

每个 turn 结束后检查累计费用:

if let budget = options.maxBudgetUsd, totalCostUsd > budget {
    status = .errorMaxBudgetUsd
    break
}

超出预算时立即退出循环。已产生的文本和 token 统计仍然保留在 QueryResult 里——你拿到的是部分结果,不是空白的。

查询中断

两种方式中断正在进行的查询:

// 方式 1:调用 interrupt()
agent.interrupt()

// 方式 2:取消 Task
let task = Task {
    await agent.prompt("Long running query...")
}
// 稍后
task.cancel()

interrupt() 内部设置了 _interrupted 标志并取消 stream task。Agent Loop 在多个检查点检查这个标志(循环入口、只读/变更工具之间、SSE 事件循环内部、工具执行前后),检测到后立即退出。

动态权限切换

运行时可以切换权限模式和工具授权回调:

// 切换权限模式
agent.setPermissionMode(.askForPermission)

// 设置自定义授权回调(优先级高于 permissionMode)
agent.setCanUseTool { toolName, input in
    if toolName == "Bash" {
        return .deny("Bash is disabled")
    }
    return .allow
}

// 恢复到 permissionMode 控制
agent.setCanUseTool(nil)

setCanUseTool 的回调优先于 permissionMode。调 setPermissionMode() 会清空之前设的回调。

环境变量配置

SDK 支持通过环境变量配置,优先级是:代码设置 > 环境变量 > 默认值。

环境变量 对应字段 默认值
CODEANY_API_KEY apiKey nil
CODEANY_MODEL model claude-sonnet-4-6
CODEANY_BASE_URL baseURL nil(用提供商默认)

SDKConfiguration.resolved() 合并:

// 代码设置的值优先,没设的从环境变量读
let config = SDKConfiguration.resolved(overrides: SDKConfiguration(
    apiKey: "sk-...",           // 优先于 CODEANY_API_KEY
    model: "claude-sonnet-4-6"  // 优先于 CODEANY_MODEL
))

// 只用环境变量
let envConfig = SDKConfiguration.fromEnvironment()

重试机制

所有 LLM 请求经过 withRetry 包装:

public struct RetryConfig: Sendable {
    public let maxRetries: Int          // 最多重试次数,默认 3
    public let baseDelayMs: Int         // 基础延迟,默认 2000ms
    public let maxDelayMs: Int          // 最大延迟,默认 30000ms
    public let retryableStatusCodes: Set<Int>  // 默认 [429, 500, 502, 503, 529]
}

指数退避 + 25% 随机抖动,避免惊群效应。只有 SDKError.apiError 且状态码在可重试集合里才会重试,其他错误直接抛出。

let delay = config.baseDelayMs * (1 << attempt)
let jitterMs = Int(Double(delay) * 0.25 * (Double.random(in: -1...1)))
let totalMs = max(0, min(delay + jitterMs, config.maxDelayMs))

系列回顾

六篇文章写完了,覆盖了 Open Agent SDK (Swift) 的完整架构:

  • 第 0 篇:项目概述——SDK 做什么、整体架构、怎么用
  • 第 1 篇:Agent Loop 内核——从 prompt 到多轮对话的完整循环
  • 第 2 篇:34 个内置工具——ToolProtocol 协议、三层架构、自定义扩展
  • 第 3 篇:MCP 集成——外部工具服务器的连接、发现和通信
  • 第 4 篇:多 Agent 协作——Team/Task 模型、Agent 间通信
  • 第 5 篇:会话持久化与安全——Session 存储、权限控制、Hook 系统
  • 第 6 篇(本文):多 LLM 提供商与运行时控制——LLMClient 协议、OpenAI 适配层、模型切换、Thinking/Effort、Skills 系统

从 Agent Loop 这个核心出发,工具系统是循环里的"执行"环节,MCP 是外部工具扩展,多 Agent 是协作模式,会话是状态持久化,安全和 Hook 是管控机制,而本文讲的多提供商和运行时控制是灵活性的保障——让同一个 Agent 能根据场景选择最合适的模型和控制策略。


系列文章

GitHubterryso/open-agent-sdk-swift

 
深入 Open Agent SDK(五):会话持久化与安全防线

本文是「深入 Open Agent SDK (Swift)」系列第五篇。系列目录见这里

Agent 不只是一次性问答工具。真正有用的 Agent 要做到三件事:记住上下文(上次聊到哪了)、控制权限(哪些操作能做)、审计行为(谁在什么时候干了什么)。Open Agent SDK 用四个子系统来覆盖这些需求——SessionStore、PermissionPolicy、SandboxSettings、HookRegistry。

这篇文章分析这四个子系统的实现细节,看它们各自怎么工作,以及怎么组合起来构建一个安全的 Agent。

一、会话持久化:SessionStore

Agent Loop 每次运行会产生一组 messages 数组。如果不保存,进程退出就没了。SessionStore 负责把这些对话历史持久化到磁盘,下次启动时恢复。

SessionStore 是什么

SessionStore 是一个 actor,所有方法都需要 await 调用。默认把会话存在 ~/.open-agent-sdk/sessions/ 目录下,每个 session 一个子目录,里面放一个 transcript.json

let sessionStore = SessionStore()  // 默认路径
let sessionStore = SessionStore(sessionsDir: "/custom/path")  // 自定义路径

五个核心操作

SessionStore 提供五个核心方法,覆盖会话的完整生命周期。

save — 保存会话。把 messages 数组和元数据序列化成 JSON 写入磁盘:

try await sessionStore.save(
    sessionId: "my-session",
    messages: messages,
    metadata: PartialSessionMetadata(
        cwd: "/project",
        model: "claude-sonnet-4-6",
        summary: "代码分析会话",
        tag: "analysis",
        firstPrompt: "分析项目结构"
    )
)

存储结构长这样:

~/.open-agent-sdk/sessions/
  my-session/
    transcript.json    // { "metadata": {...}, "messages": [...] }

文件权限是 0600,目录权限是 0700——只有当前用户能读写。每次 save 会保留第一次创建时的 createdAt 时间戳,只更新 updatedAt

load — 加载会话。从磁盘读取 transcript.json,反序列化为 SessionData

if let data = try await sessionStore.load(sessionId: "my-session") {
    print("Messages: \(data.metadata.messageCount)")
    print("Model: \(data.metadata.model)")
    // data.messages 是 [[String: Any]] 数组
}

load 支持分页参数 limitoffset,不需要加载全部消息时可以只取尾部:

// 只加载最近 50 条消息
let recent = try await sessionStore.load(sessionId: "my-session", limit: 50, offset: nil)

list — 列出所有会话,按 updatedAt 降序排列(最近的在前):

let sessions = try await sessionStore.list(limit: 10)
for session in sessions {
    print("\(session.id) — \(session.summary ?? "(无标题)") [\(session.messageCount) 条消息]")
}

SessionMetadata 包含 id、cwd、model、createdAt、updatedAt、messageCount,以及可选的 summary、tag、firstPrompt、gitBranch、fileSize。

fork — 分叉会话。从已有会话复制消息到新 session,可以指定截断点:

// 完整复制
let newId = try await sessionStore.fork(sourceSessionId: "my-session")

// 只复制前 10 条消息
let truncatedId = try await sessionStore.fork(
    sourceSessionId: "my-session",
    upToMessageIndex: 10
)

// 指定新 session ID
let customId = try await sessionStore.fork(
    sourceSessionId: "my-session",
    newSessionId: "forked-session"
)

delete — 删除整个会话目录:

let deleted = try await sessionStore.delete(sessionId: "my-session")

此外还有 rename(改标题)和 tag(打标签)两个辅助方法。

会话恢复的三种模式

把 SessionStore 注入 Agent 后,SDK 提供三种恢复策略:

1. 指定 sessionId 恢复

最直接的方式:给定一个 session ID,Agent 启动时从 SessionStore 加载历史消息,追加到 messages 数组前面:

let agent = createAgent(options: AgentOptions(
    apiKey: apiKey,
    model: "claude-sonnet-4-6",
    sessionStore: sessionStore,
    sessionId: "my-session"      // 指定恢复哪个 session
))

2. continueRecentSession — 自动接续最近的会话

不知道 session ID 时,让 SDK 自动找最近的一个:

let agent = createAgent(options: AgentOptions(
    apiKey: apiKey,
    model: "claude-sonnet-4-6",
    sessionStore: sessionStore,
    continueRecentSession: true   // 自动加载最近的 session
))

内部实现是调 sessionStore.list() 取第一个(已按 updatedAt 降序排列),把它的 ID 作为恢复目标。

3. forkSession + resumeSessionAt — 分叉并截断

在已有会话的基础上分叉一个新分支,还可以截断到指定消息:

let agent = createAgent(options: AgentOptions(
    apiKey: apiKey,
    model: "claude-sonnet-4-6",
    sessionStore: sessionStore,
    sessionId: "my-session",
    forkSession: true,                     // 复制到新 session
    resumeSessionAt: "msg-uuid-123"        // 截断到这条消息
))

SDK 内部的解析顺序是:先 continueRecentSession 确定 session ID,再 forkSession 创建分叉,再 resumeSessionAt 截断历史。这三个选项可以独立使用也可以组合。

SessionStore 的安全细节

SessionStore 在 session ID 校验上做了路径遍历防护:

private func validateSessionId(_ sessionId: String) throws {
    guard !sessionId.isEmpty else {
        throw SDKError.sessionError(message: "Session ID must not be empty")
    }
    let forbidden = ["/", "\\", ".."]
    for component in forbidden {
        if sessionId.contains(component) {
            throw SDKError.sessionError(message: "Session ID contains invalid character: '\(component)'")
        }
    }
}

session ID 里不能包含 /\..——防止攻击者通过构造 ID 来读写预期之外的路径。

二、权限控制:PermissionPolicy

会话持久化解决了"记住"的问题,权限控制解决的是"能做什么"的问题。

六种 PermissionMode

SDK 定义了 6 种权限模式:

模式 行为
default 每次工具执行前询问用户
plan 只读工具直接执行,写操作需要确认
auto 自动执行所有工具,危险操作除外
acceptEdits 文件编辑自动执行,其他操作需要确认
dontAsk 不询问用户,根据上下文自动判断
bypassPermissions 跳过所有权限检查
let agent = createAgent(options: AgentOptions(
    apiKey: apiKey,
    model: "claude-sonnet-4-6",
    permissionMode: .plan  // 只读工具直接跑,写操作要确认
))

canUseTool 回调:比 PermissionMode 更细粒度

permissionMode 是全局开关,粒度比较粗。如果你需要按工具名称或工具属性做精细控制,用 canUseTool 回调:

let agent = createAgent(options: AgentOptions(
    apiKey: apiKey,
    model: "claude-sonnet-4-6",
    permissionMode: .bypassPermissions,
    canUseTool: { tool, input, context in
        if tool.name == "Bash" {
            return CanUseToolResult.deny("Bash is not allowed")
        }
        return nil  // nil 表示"我没意见,交给 permissionMode 决定"
    }
))

canUseTool 返回 CanUseToolResult?。返回 nil 表示该回调没有意见,交给下一个检查环节;返回非 nil 结果时,SDK 用回调的决定,不再看 permissionMode

CanUseToolResult 有三个工厂方法:

CanUseToolResult.allow()                              // 允许
CanUseToolResult.deny("原因")                          // 拒绝
CanUseToolResult.allowWithInput(modifiedInput)         // 允许但修改输入参数

allowWithInput 比较少见但很实用——你可以在权限检查时修改工具的输入参数。比如把文件写入路径重定向到安全目录。

策略模式:可组合的权限规则

直接写闭包虽然灵活,但不方便复用。SDK 提供了 PermissionPolicy 协议,把权限判断封装成可组合的策略:

public protocol PermissionPolicy: Sendable {
    func evaluate(
        tool: ToolProtocol,
        input: Any,
        context: ToolContext
    ) async -> CanUseToolResult?
}

SDK 内置了四个策略:

ToolNameAllowlistPolicy — 白名单,只允许指定的工具:

let policy = ToolNameAllowlistPolicy(allowedToolNames: ["Read", "Glob", "Grep"])
// Write、Edit、Bash 等工具全部被拒绝

ToolNameDenylistPolicy — 黑名单,拒绝指定的工具:

let policy = ToolNameDenylistPolicy(deniedToolNames: ["Bash", "Write"])
// 其他工具正常执行

ReadOnlyPolicy — 只允许只读工具(isReadOnly == true):

let policy = ReadOnlyPolicy()
// Read、Glob、Grep、WebSearch 等只读工具允许
// Write、Edit、Bash 等变更工具被拒绝

CompositePolicy — 组合多个策略,按顺序评估:

let policy = CompositePolicy(policies: [
    ToolNameDenylistPolicy(deniedToolNames: ["Bash"]),
    ReadOnlyPolicy()
])
// 先检查黑名单(Bash 被拒绝),再检查只读策略

CompositePolicy 的评估规则:

  • 任何子策略返回 deny,整体 deny(短路)
  • 子策略返回 nil(没意见),跳过
  • 所有子策略都 allow 或没意见,整体 allow

canUseTool(policy:) 桥接函数把策略转成回调:

let policy = CompositePolicy(policies: [
    ToolNameDenylistPolicy(deniedToolNames: ["Bash"]),
    ReadOnlyPolicy()
])

let agent = createAgent(options: AgentOptions(
    apiKey: apiKey,
    model: "claude-sonnet-4-6",
    permissionMode: .bypassPermissions,
    canUseTool: canUseTool(policy: policy)
))

三、沙盒机制:SandboxSettings + SandboxChecker

权限控制管的是"这个工具能不能执行",沙盒管的是"这个操作在不在允许范围内"。比如 Bash 工具通过了权限检查,但你还得确保它不会 rm -rf /

SandboxSettings 的配置项

let sandbox = SandboxSettings(
    // 路径控制
    allowedReadPaths: ["/project/"],
    allowedWritePaths: ["/project/build/"],
    deniedPaths: ["/etc/", "/var/"],

    // 命令控制
    deniedCommands: ["rm", "sudo"],           // 黑名单
    // allowedCommands: ["git", "swift"],     // 白名单(和黑名单二选一)

    // 行为控制
    allowNestedSandbox: false,
    autoAllowBashIfSandboxed: false,          // 沙箱激活时自动批准 Bash
    allowUnsandboxedCommands: false,
    enableWeakerNestedSandbox: false,

    // 网络控制
    network: SandboxNetworkConfig(
        allowedDomains: ["api.example.com"],
        allowLocalBinding: false
    )
)

路径和命令各有两种模式:

  • 路径allowedReadPaths / allowedWritePaths 是白名单(空数组=全部允许),deniedPaths 是黑名单(优先级更高)
  • 命令allowedCommands 是白名单(设为非 nil 就只允许列出的命令),deniedCommands 是黑名单。allowedCommands 优先级高于 deniedCommands

SandboxChecker 的执行逻辑

SandboxChecker 是一个无状态的枚举类,提供 isPathAllowedcheckPathisCommandAllowedcheckCommand 四个静态方法。isXxx 返回 Bool,checkXxx 不通过时抛出 SDKError.permissionDenied

路径检查用前缀匹配加段边界保证:

// /project/ 匹配 /project/src/file.swift
// /project/ 不匹配 /project-backup/file.swift
SandboxChecker.isPathAllowed("/project/src/main.swift", for: .read, settings: sandbox)
// -> true

SandboxChecker.isPathAllowed("/project-backup/old.swift", for: .read, settings: sandbox)
// -> false(段边界不匹配)

实现关键在于 SandboxPathNormalizer——先把路径规范化(解析 ...、symlink),再做前缀比较时保证尾部有 / 来强制段边界。

// 路径遍历攻击会被 normalize 掉
let normalized = SandboxPathNormalizer.normalize("/project/src/../../etc/passwd")
// -> "/etc/passwd",然后被 deniedPaths 拦截

命令检查分三个阶段:

  1. Shell 元字符检测——识别 bash -c "cmd"$(cmd)`cmd` 等绕过模式
  2. Basename 提取——从 /usr/bin/rm -rf /tmp 提取出 rm
  3. 白名单/黑名单匹配
// 黑名单里有 "rm"
SandboxChecker.isCommandAllowed("rm -rf /tmp", settings: blocklist)
// -> false

// 路径形式的命令也能识别
SandboxChecker.isCommandAllowed("/usr/bin/rm -rf /tmp", settings: blocklist)
// -> false(提取 basename 得到 "rm")

// 反斜杠绕过
SandboxChecker.isCommandAllowed("\\rm -rf /tmp", settings: blocklist)
// -> false(去掉前导 \ 后得到 "rm")

// 引号绕过
SandboxChecker.isCommandAllowed("\"rm\" -rf /tmp", settings: blocklist)
// -> false(去掉引号后得到 "rm")

// 子 shell 绕过
SandboxChecker.isCommandAllowed("bash -c \"rm -rf /tmp\"", settings: blocklist)
// -> false(递归检查内部命令)

对于无法可靠解析的命令(比如多层嵌套的 bash -c "bash -c 'rm ...'"),默认拒绝。

命令参数中的文件路径也会被提取并检查——如果命令里出现了 deniedPaths 中的路径,命令也会被拒绝。

autoAllowBashIfSandboxed

这个选项是沙盒和权限系统的桥梁。当 autoAllowBashIfSandboxed = true 时,Bash 工具会跳过 canUseTool 权限回调检查,但仍然经过 SandboxChecker.checkCommand() 的命令过滤。

设计思路是:如果你已经配了完善的沙盒规则,Bash 命令能做什么已经被限制住了,不需要再弹一次权限确认。

四、Hook 系统:20+ 生命周期事件

前三个系统解决的是"能不能做"的问题,Hook 系统解决的是"做了之后要知道"和"做之前要干预"的问题。

20+ 个 HookEvent

SDK 定义了 24 个生命周期事件:

事件 触发时机
preToolUse 工具执行前
postToolUse 工具执行成功后
postToolUseFailure 工具执行失败后
sessionStart Agent 会话开始
sessionEnd Agent 会话结束
stop Agent Loop 停止
subagentStart 子 Agent 启动
subagentStop 子 Agent 完成
userPromptSubmit 用户提交 prompt
permissionRequest 权限检查发生
permissionDenied 权限被拒绝
taskCreated 任务创建
taskCompleted 任务完成
configChange 配置变更
cwdChanged 工作目录变更
fileChanged 文件变更
notification 通知事件
preCompact 对话压缩前
postCompact 对话压缩后
teammateIdle 团队成员空闲
setup Agent 初始化
worktreeCreate 工作树创建
worktreeRemove 工作树移除

函数 Hook vs Shell Hook

Hook 有两种实现方式:函数回调和 Shell 命令。

函数 Hook — Swift 闭包,适合进程内逻辑:

await registry.register(.preToolUse, definition: HookDefinition(
    handler: { input in
        // input 是 HookInput,包含 event、toolName、toolInput、sessionId 等
        return HookOutput(message: "拦截成功", block: true)
    }
))

Shell Hook — 外部命令,适合集成非 Swift 脚本:

await registry.register(.preToolUse, definition: HookDefinition(
    command: "python3 /path/to/check.py"  // HookInput 通过 stdin JSON 传入
))

Shell Hook 通过 ShellHookExecutor 执行:用 /bin/bash -c 启动进程,把 HookInput 序列化为 JSON 写入 stdin,从 stdout 读取 HookOutput JSON。Shell 命令的标准输出如果不是合法 JSON,会被包装成 HookOutput(message: stdout)

Shell Hook 的环境变量里会注入 HOOK_EVENTHOOK_TOOL_NAMEHOOK_SESSION_IDHOOK_CWD,方便脚本直接用环境变量判断上下文。

HookRegistry 的注册与执行

HookRegistry 是一个 actor,内部维护 [HookEvent: [HookDefinition]] 映射:

let registry = HookRegistry()

// 注册函数 Hook
await registry.register(.preToolUse, definition: HookDefinition(
    handler: { input in
        return HookOutput(message: "Bash blocked", block: true)
    },
    matcher: "Bash"  // 只匹配 Bash 工具
))

// 注册 Shell Hook
await registry.register(.postToolUse, definition: HookDefinition(
    command: "/usr/bin/logger 'Tool executed'",
    timeout: 5000  // 5 秒超时
))

// 执行所有注册在某事件上的 Hook
let results = await registry.execute(.preToolUse, input: hookInput)
// results: [HookOutput],包含所有匹配的 Hook 的返回值

matcher 过滤:每个 HookDefinition 可以设一个 matcher(正则表达式)。执行时先检查 input.toolName 是否匹配 matcher,不匹配就跳过这个 Hook。matcher 为 nil 时匹配所有工具。

超时处理:函数 Hook 用 withThrowingTaskGroup 实现超时——把实际执行和 Task.sleep 放在同一个 TaskGroup 里,谁先完成用谁。超时的 Hook 不影响其他 Hook 执行。Shell Hook 通过 DispatchQueue.asyncAfter 设置超时,到时间就 terminate 进程。

执行顺序:同一事件上的 Hook 按注册顺序串行执行。

HookOutput 的能力

HookOutput 可以做这些事:

HookOutput(
    message: "日志消息",                          // 附加信息
    block: true,                                  // 拦截操作
    notification: HookNotification(               // 发送通知
        title: "警告",
        body: "检测到危险操作",
        level: .warning
    ),
    permissionUpdate: PermissionUpdate(           // 动态修改权限
        tool: "Bash",
        behavior: .deny
    ),
    systemMessage: "请在沙箱内操作",               // 注入系统消息
    reason: "安全策略",                            // 拦截原因
    updatedInput: ["command": "echo safe"],       // 修改工具输入
    decision: .block                              // 显式 approve/block
)

其中 block: true 会阻止工具执行,返回一个错误结果给 LLM。permissionUpdate 可以在 Hook 运行时动态修改工具权限。updatedInput 可以替换工具的输入参数。

五、实战组合:构建一个安全的 Agent

四个子系统各有分工:

  • SessionStore — 记住对话历史
  • PermissionPolicy — 控制工具能不能执行
  • SandboxSettings — 限制操作范围
  • HookRegistry — 审计和拦截

下面用一个完整的例子展示怎么把它们组合起来:

import Foundation
import OpenAgentSDK

// 1. 创建 SessionStore
let sessionStore = SessionStore()

// 2. 创建 HookRegistry,注册审计和安全拦截
let hookRegistry = HookRegistry()

// 记录所有工具执行
await hookRegistry.register(.postToolUse, definition: HookDefinition(
    handler: { input in
        if let toolName = input.toolName {
            print("[审计] 工具 \(toolName) 执行完成")
        }
        return nil
    }
))

// 拦截 Bash 中的危险命令
await hookRegistry.register(.preToolUse, definition: HookDefinition(
    handler: { input in
        return HookOutput(
            message: "Bash 被安全策略拦截",
            block: true,
            decision: .block
        )
    },
    matcher: "Bash"
))

// 记录权限拒绝事件
await hookRegistry.register(.permissionDenied, definition: HookDefinition(
    handler: { input in
        print("[安全告警] 权限被拒绝: \(input.error ?? "unknown")")
        return nil
    }
))

// 会话生命周期追踪
await hookRegistry.register(.sessionStart, definition: HookDefinition(
    handler: { _ in print("[会话] 开始"); return nil }
))
await hookRegistry.register(.sessionEnd, definition: HookDefinition(
    handler: { _ in print("[会话] 结束"); return nil }
))

// 3. 配置沙盒:限制路径和命令
let sandbox = SandboxSettings(
    allowedReadPaths: ["/project/"],
    allowedWritePaths: ["/project/src/", "/project/tests/"],
    deniedPaths: ["/etc/", "/var/", "/tmp/"],
    deniedCommands: ["rm", "sudo", "chmod", "chown"],
    autoAllowBashIfSandboxed: false,
    allowNestedSandbox: false
)

// 4. 配置权限策略:只读 + 排除 Bash
let policy = CompositePolicy(policies: [
    ToolNameDenylistPolicy(deniedToolNames: ["Bash"]),
    ReadOnlyPolicy()
])

// 5. 创建 Agent,注入所有组件
let agent = createAgent(options: AgentOptions(
    apiKey: "sk-...",
    model: "claude-sonnet-4-6",
    systemPrompt: "你是一个代码分析助手。只能读取文件,不能修改。",
    maxTurns: 10,
    permissionMode: .bypassPermissions,
    canUseTool: canUseTool(policy: policy),
    sessionStore: sessionStore,
    sessionId: "analysis-session",
    hookRegistry: hookRegistry,
    sandbox: sandbox
))

// 6. 执行查询
let result = await agent.prompt("分析项目中的 Swift 源文件结构")
print(result.text)

// 7. 后续恢复会话
let resumedAgent = createAgent(options: AgentOptions(
    apiKey: "sk-...",
    model: "claude-sonnet-4-6",
    permissionMode: .bypassPermissions,
    canUseTool: canUseTool(policy: policy),
    sessionStore: sessionStore,
    sessionId: "analysis-session",  // 同一个 session ID,自动恢复历史
    hookRegistry: hookRegistry,
    sandbox: sandbox
))

let continued = await resumedAgent.prompt("继续分析测试文件")
print(continued.text)

这个 Agent 的安全特性:

  • 权限层:CompositePolicy 确保只有只读工具能执行,Bash 被黑名单排除
  • 沙盒层:即使工具通过了权限检查,也受路径限制——只能读 /project/ 下的文件,不能碰 /etc//var/
  • Hook 层:所有工具执行被记录(审计),Bash 调用被 preToolUse Hook 二次拦截
  • 会话层:对话自动保存和恢复,重启后能继续之前的工作

多层防御的好处是:即使某一层的配置有疏漏,其他层还能兜底。比如你误把 Bash 加进了白名单,Hook 的 matcher 还能拦截;即使 Hook 没拦住,沙盒的命令过滤还能挡。

小结

SessionStore、PermissionPolicy、SandboxSettings、HookRegistry 四个系统各管一件事,但组合起来就是一套完整的安全框架:

  • SessionStore 的 actor 隔离和 session ID 校验保证了存储安全
  • PermissionPolicy 的策略组合提供了灵活的权限管理
  • SandboxChecker 的路径规范化和段边界匹配防止目录穿越
  • HookRegistry 的 matcher 过滤和超时机制确保了 Hook 系统的可靠性

下一篇看 SDK 的 多 LLM 提供商:怎么同时支持 Anthropic、OpenAI 和其他 LLM,Provider 协议的设计,以及运行时切换模型的机制。


系列文章

GitHubterryso/open-agent-sdk-swift

 
深入 Open Agent SDK(四):多 Agent 协作——子代理、团队与任务编排

本文是「深入 Open Agent SDK (Swift)」系列第四篇。系列目录见这里

单个 Agent 再强,也只是一个执行者。真实的开发任务往往是多步骤、多角色的:先有人探索代码库,有人设计方案,再有人写代码、跑测试。一个 Agent 单干,上下文容易膨胀,效率也上不去。

Open Agent SDK 从三个层面解决这个问题:

  1. 子 Agent -- 主 Agent 在运行过程中动态生成子 Agent,把专门的任务委派出去
  2. Task 系统 -- 用任务追踪多步骤工作的进度和结果
  3. Team + 消息传递 -- 多个 Agent 组成团队,通过邮箱系统互相通信

这篇文章逐一分析这三个层面的实现,最后看它们怎么组合起来做任务编排。

一、子 Agent:SubAgentSpawner 协议与 AgentTool

SubAgentSpawner 协议

子 Agent 的生成不是 AgentTool 直接 new 一个 Agent 出来——中间隔了一层协议。SubAgentSpawner 定义在 Types/AgentTypes.swift 里:

public protocol SubAgentSpawner: Sendable {
    func spawn(
        prompt: String,
        model: String?,
        systemPrompt: String?,
        allowedTools: [String]?,
        maxTurns: Int?
    ) async -> SubAgentResult

    func spawn(
        prompt: String,
        model: String?,
        systemPrompt: String?,
        allowedTools: [String]?,
        maxTurns: Int?,
        disallowedTools: [String]?,
        mcpServers: [AgentMcpServerSpec]?,
        skills: [String]?,
        runInBackground: Bool?,
        isolation: String?,
        name: String?,
        teamName: String?,
        mode: PermissionMode?,
        resume: String?
    ) async -> SubAgentResult
}

两个方法,一个基础版(5 个参数),一个增强版(13 个参数)。协议还提供了默认实现,增强版直接调用基础版,这样已有的实现类不用改代码就能兼容。

为什么要把 spawner 放在 Types/ 而不是 Core/?因为 Tools/Advanced/AgentTool.swift 需要用它,但 Tools/ 不应该导入 Core/。把协议定义在 Types/,具体实现放在 Core/,通过 ToolContext.agentSpawner 注入——这是 SDK 里常见的依赖倒置。

DefaultSubAgentSpawner 实现

DefaultSubAgentSpawnerCore/DefaultSubAgentSpawner.swift 里,做了这几件事:

final class DefaultSubAgentSpawner: SubAgentSpawner, @unchecked Sendable {
    private let apiKey: String
    private let baseURL: String?
    private let parentModel: String
    private let parentTools: [ToolProtocol]
    private let provider: LLMProvider
    private let client: (any LLMClient)?

    func spawn(...) async -> SubAgentResult {
        // 1. 过滤掉 AgentTool,防止无限递归
        var subTools = parentTools.filter { $0.name != "Agent" }

        // 2. 如果指定了 allowedTools,进一步过滤
        if let allowed = allowedTools, !allowed.isEmpty {
            let allowedSet = Set(allowed)
            subTools = subTools.filter { allowedSet.contains($0.name) }
        }

        // 3. disallowedTools 再过一遍(优先级高于 allowedTools)
        if let disallowed = disallowedTools, !disallowed.isEmpty {
            let disallowedSet = Set(disallowed)
            subTools = subTools.filter { !disallowedSet.contains($0.name) }
        }

        // 4. 创建子 Agent 并执行
        let options = AgentOptions(
            apiKey: apiKey,
            model: model ?? parentModel,
            systemPrompt: systemPrompt,
            maxTurns: maxTurns ?? 10,
            tools: subTools
        )
        let agent = Agent(options: options)
        let result = await agent.prompt(prompt)

        return SubAgentResult(
            text: result.text.isEmpty
                ? "(Subagent completed with no text output)"
                : result.text,
            toolCalls: [],
            isError: result.status != .success
        )
    }
}

几个关键点:

  • 防递归:子 Agent 不会再拿到 AgentTool,所以不会出现 Agent 套 Agent 套 Agent 的情况
  • 工具继承:子 Agent 默认继承父 Agent 的所有工具(除了 AgentTool),但可以通过 allowedTools / disallowedTools 限制
  • 阻塞式执行:父 Agent 调用 spawn() 后会 await,等子 Agent 跑完才继续

AgentTool:LLM 眼里的子 Agent 工具

AgentTool 是暴露给 LLM 的工具。LLM 调用 Agent 工具时传入 prompt 和参数,AgentTool 负责调用 spawner 生成子 Agent。

它内置了两种预定义的子 Agent 类型:

private let BUILTIN_AGENTS: [String: AgentDefinition] = [
    "Explore": AgentDefinition(
        name: "Explore",
        description: "Fast agent specialized for exploring codebases...",
        systemPrompt: "You are a codebase exploration agent. Search through files and code to answer questions...",
        tools: ["Read", "Glob", "Grep", "Bash"],
        maxTurns: 10
    ),
    "Plan": AgentDefinition(
        name: "Plan",
        description: "Software architect agent for designing implementation plans...",
        systemPrompt: "You are a software architect. Design implementation plans...",
        tools: ["Read", "Glob", "Grep", "Bash"],
        maxTurns: 10
    ),
]
  • Explore:代码库探索,用 Glob 找文件、Grep 搜内容、Read 读文件
  • Plan:软件架构师,理解代码库后输出实施方案

LLM 调用 AgentTool 时,通过 subagent_type 字段指定用哪种:

{
  "prompt": "Explore the project structure and find all Swift source files",
  "description": "Explore codebase",
  "subagent_type": "Explore"
}

AgentTool 还支持一堆可选参数:model(指定模型)、maxTurns(覆盖轮次上限)、run_in_background(后台运行)、isolation(隔离模式,比如 worktree)、team_name(关联团队)、mode(权限模式)。这些参数直接透传给 spawner。

一个完整的示例

SDK 自带了一个 SubagentExample,演示了主 Agent 作为协调者,通过 AgentTool 委派 Explore 子 Agent 的完整流程:

// 主 Agent 的系统提示
let systemPrompt = """
You are a coordinator agent. When given a task, you should delegate it to a sub-agent \
using the Agent tool. The Agent tool will spawn a specialized agent (e.g., "Explore" type) \
that can use Read, Glob, Grep, and Bash tools to investigate the codebase. \
After the sub-agent returns its findings, summarize the results for the user.
"""

// 注册工具:核心工具 + AgentTool
let agent = createAgent(options: AgentOptions(
    apiKey: apiKey,
    model: defaultModel,
    systemPrompt: systemPrompt,
    maxTurns: 10,
    tools: getAllBaseTools(tier: .core) + [createAgentTool()]
))

// 发任务——主 Agent 会调用 AgentTool 委派给 Explore 子 Agent
for await message in agent.stream("""
    Explore the current project directory. Find all Swift source files, \
    examine the project structure, and provide a summary. \
    Use the Agent tool to delegate this task to an Explore sub-agent.
""") {
    switch message {
    case .toolUse(let data):
        if data.toolName == "Agent" {
            print("[Sub-agent Delegation: \(data.toolName)]")
        }
    case .toolResult(let data):
        print("[Result: \(data.content.prefix(200))]")
    case .result(let data):
        print("Turns: \(data.numTurns), Cost: $\(data.totalCostUsd)")
    default:
        break
    }
}

执行流程:用户发 prompt -> 主 Agent 判断需要探索代码库 -> 调用 AgentTool -> AgentTool 通过 spawner 生成 Explore 子 Agent -> 子 Agent 用 Glob/Grep/Read 执行探索 -> 结果返回给主 Agent -> 主 Agent 汇总后回复用户。

二、Task 系统:任务追踪与状态机

子 Agent 解决了"谁干活"的问题,Task 系统解决的是"活干了多少、谁在干、结果是什么"的问题。

TaskStore:线程安全的 Actor

TaskStore 是一个 Swift Actor,保证并发安全:

public actor TaskStore {
    private var tasks: [String: Task] = [:]
    private var taskCounter: Int = 0

    public func create(
        subject: String,
        description: String? = nil,
        owner: String? = nil,
        status: TaskStatus = .pending
    ) -> Task {
        taskCounter += 1
        let id = "task_\(taskCounter)"
        let now = dateFormatter.string(from: Date())
        let task = Task(
            id: id, subject: subject, description: description,
            status: status, owner: owner,
            createdAt: now, updatedAt: now
        )
        tasks[id] = task
        return task
    }
}

用 Actor 而不是普通类,意味着所有方法都是隐式串行化的——不需要自己加锁。多个 Agent 同时创建任务不会出现竞态条件。

Task 的状态机

Task 有 5 种状态,流转规则很明确:

public enum TaskStatus: String, Sendable, Equatable, Codable {
    case pending      // 等待开始
    case inProgress   // 进行中
    case completed    // 已完成
    case failed       // 失败
    case cancelled    // 已取消
}

状态转换有约束:pendinginProgress 可以转到任何状态,但 completedfailedcancelled 是终态,不可再变:

private func isValidTransition(from: TaskStatus, to: TaskStatus) -> Bool {
    switch from {
    case .pending, .inProgress:
        return true
    case .completed, .failed, .cancelled:
        return false  // 终态,不能再转
    }
}

画成状态图:

pending ──→ inProgress ──→ completed
   │            │
   │            ├──→ failed
   │            │
   └──→ cancelled ←──┘

TaskStatus 还有个贴心的 parse() 方法,同时支持 camelCase(inProgress)和 snake_case(in_progress),因为 LLM 返回的 JSON 格式不一定统一:

public static func parse(_ string: String) -> TaskStatus? {
    if let direct = TaskStatus(rawValue: string) { return direct }
    // snake_case → camelCase
    let camel = string
        .split(separator: "_")
        .enumerated()
        .map { $0.offset == 0 ? String($0.element) : String($0.element).capitalized }
        .joined()
    return TaskStatus(rawValue: camel)
}

Task 结构体

一个 Task 实例除了基本的状态追踪,还预留了依赖关系和元数据:

public struct Task: Sendable, Equatable, Codable {
    public let id: String
    public var subject: String
    public var description: String?
    public var status: TaskStatus
    public var owner: String?        // 谁在干
    public let createdAt: String
    public var updatedAt: String
    public var output: String?       // 结果
    public var blockedBy: [String]?  // 被哪些任务阻塞
    public var blocks: [String]?     // 阻塞了哪些任务
    public var metadata: [String: String]?
}

blockedByblocks 字段说明 Task 系统预留了任务依赖的能力——任务 A 可以声明"我需要等任务 B 和 C 完成才能开始"。

三个 Task 工具

SDK 提供了三个工具让 LLM 操作 Task 系统:

TaskCreate -- 创建任务:

public func createTaskCreateTool() -> ToolProtocol {
    return defineTool(
        name: "TaskCreate",
        description: "Create a new task for tracking work progress.",
        inputSchema: taskCreateSchema,
        isReadOnly: false
    ) { (input: TaskCreateInput, context: ToolContext) in
        guard let taskStore = context.taskStore else {
            return ToolExecuteResult(content: "Error: TaskStore not available.", isError: true)
        }
        let initialStatus: TaskStatus = input.status.flatMap { TaskStatus.parse($0) } ?? .pending
        let task = await taskStore.create(
            subject: input.subject,
            description: input.description,
            owner: input.owner,
            status: initialStatus
        )
        return ToolExecuteResult(
            content: "Task created: \(task.id) - \"\(task.subject)\" (\(task.status.rawValue))",
            isError: false
        )
    }
}

TaskList -- 列出任务(支持按 status 和 owner 过滤):

// LLM 可以查 "列出所有 pending 状态的任务" 或 "列出分配给 agent-1 的任务"
let tasks = await taskStore.list(status: status, owner: input.owner)

TaskUpdate -- 更新任务(状态、描述、负责人、输出):

do {
    let task = try await taskStore.update(
        id: input.id,
        status: status,
        description: input.description,
        owner: input.owner,
        output: input.output
    )
    return ToolExecuteResult(
        content: "Task updated: \(task.id) - \(task.status.rawValue) - \"\(task.subject)\"",
        isError: false
    )
} catch let error as TaskStoreError {
    return ToolExecuteResult(content: "Error: \(error.localizedDescription)", isError: true)
}

注意 TaskUpdate 会抛出 invalidStatusTransition 错误——比如试图把一个 completed 的任务改成 inProgress,LLM 会收到错误提示,可以据此调整策略。

三、Team 系统:团队组建与管理

Task 系统追踪"做什么",Team 系统解决"谁跟谁一组"。

TeamStore

和 TaskStore 一样,TeamStore 也是 Actor:

public actor TeamStore {
    private var teams: [String: Team] = [:]
    private var teamCounter: Int = 0

    public func create(
        name: String,
        members: [TeamMember] = [],
        leaderId: String = "self"
    ) -> Team {
        teamCounter += 1
        let id = "team_\(teamCounter)"
        let team = Team(
            id: id, name: name, members: members,
            leaderId: leaderId,
            createdAt: dateFormatter.string(from: Date()),
            status: .active
        )
        teams[id] = team
        return team
    }
}

Team 有两种状态:activedisbanded。删除 Team 不是真删,而是把状态改成 disbanded——标记为 disbanded 的 Team 不允许添加/移除成员。

TeamMember 和角色

public enum TeamRole: String, Sendable, Equatable, Codable {
    case leader   // 团队领导
    case member   // 普通成员
}

public struct TeamMember: Sendable, Equatable, Codable {
    public let name: String
    public let role: TeamRole
}

TeamCreateTool 创建 Team 时,所有传入的成员默认都是 member 角色,leaderId 默认是 "self"(即创建者自己):

let members: [TeamMember] = input.members?.map { TeamMember(name: $0) } ?? []
let team = await teamStore.create(
    name: input.name,
    members: members,
    leaderId: "self"
)

TeamStore 还提供了动态管理成员的能力:

// 添加成员
try teamStore.addMember(teamId: "team_1", member: TeamMember(name: "agent-coder"))

// 移除成员
try teamStore.removeMember(teamId: "team_1", agentName: "agent-coder")

// 查找某个 Agent 属于哪个团队
let team = await teamStore.getTeamForAgent(agentName: "agent-coder")

getTeamForAgent 对消息传递很重要——发消息时需要知道发件人属于哪个 Team,才能验证收件人是不是队友。

AgentRegistry:Agent 注册表

除了 TeamStore,还有一个 AgentRegistry 负责追踪所有活跃的 Agent:

public actor AgentRegistry {
    private var agents: [String: AgentRegistryEntry] = [:]
    private var nameIndex: [String: String] = [:]  // name -> agentId

    public func register(agentId: String, name: String, agentType: String) throws -> AgentRegistryEntry {
        if nameIndex[name] != nil {
            throw AgentRegistryError.duplicateAgentName(name: name)
        }
        let entry = AgentRegistryEntry(...)
        agents[agentId] = entry
        nameIndex[name] = agentId
        return entry
    }

    public func getByName(name: String) -> AgentRegistryEntry? {
        guard let agentId = nameIndex[name] else { return nil }
        return agents[agentId]
    }
}

名字唯一性约束——同一个 AgentRegistry 里不能注册两个同名的 Agent。nameIndex 是一个反查索引,支持 O(1) 的名字查找。

四、消息传递:MailboxStore 与 SendMessage

有了 Team,Agent 之间需要能通信。SDK 用的是邮箱模式(Mailbox)——发消息不直接推给对方,而是放进对方的邮箱,对方自己来取。

MailboxStore

public actor MailboxStore {
    private var mailboxes: [String: [AgentMessage]] = [:]

    // 点对点发送
    public func send(from: String, to: String, content: String, type: AgentMessageType = .text) {
        let message = AgentMessage(from: from, to: to, content: content,
                                   timestamp: dateFormatter.string(from: Date()), type: type)
        if mailboxes[to] == nil { mailboxes[to] = [] }
        mailboxes[to]?.append(message)
    }

    // 广播——发给所有有邮箱的 Agent
    public func broadcast(from: String, content: String, type: AgentMessageType = .text) {
        let timestamp = dateFormatter.string(from: Date())
        for (agentName, _) in mailboxes {
            let message = AgentMessage(from: from, to: agentName, content: content,
                                       timestamp: timestamp, type: type)
            mailboxes[agentName]?.append(message)
        }
    }

    // 读取并清空邮箱
    public func read(agentName: String) -> [AgentMessage] {
        guard let messages = mailboxes[agentName] else { return [] }
        mailboxes[agentName] = []  // 读完清空
        return messages
    }
}

三个核心操作:send(点对点)、broadcast(广播)、read(读取)。read 是破坏性读取——读一次邮箱就清空了。broadcast 只发给已经有邮箱的 Agent,不会凭空创建邮箱。

消息类型除了普通文本(.text),还有 .shutdownRequest.shutdownResponse.planApprovalResponse——这些特殊类型用于团队管理的协调操作。

SendMessage 工具

SendMessageTool 做了三层校验:

// 1. 必须有 MailboxStore
guard let mailboxStore = context.mailboxStore else { ... }
// 2. 必须有 TeamStore
guard let teamStore = context.teamStore else { ... }
// 3. 必须知道发送者是谁
guard let senderName = context.senderName else { ... }

// 4. 发送者必须在某个 Team 里
guard let team = await teamStore.getTeamForAgent(agentName: senderName) else { ... }

// 5. 收件人必须是同 Team 的成员
let isMember = team.members.contains { $0.name == input.to }
guard isMember else { ... }

广播用 "*" 作为收件人:

{ "to": "*", "message": "Phase 1 complete, starting Phase 2." }

点对点用具体名字:

{ "to": "agent-coder", "message": "Here's the spec for module A." }

校验不通过时返回错误信息,LLM 能看到哪些成员可用,可以调整发送目标。

五、编排模式:怎么组合这些能力

单个 Agent、Task、Team、Mailbox 各自能做什么清楚了。实际场景中怎么组合?看一个典型的工作流。

模式一:主 Agent + 并行子 Agent

最简单的模式。主 Agent 收到复杂任务后,同时启动多个子 Agent 各自处理一部分:

let agent = createAgent(options: AgentOptions(
    apiKey: apiKey,
    model: "claude-sonnet-4-6",
    systemPrompt: """
    You are a coordinator. Break complex tasks into subtasks, \
    delegate each to an Explore sub-agent, then synthesize results.
    """,
    maxTurns: 20,
    tools: getAllBaseTools(tier: .core) + [
        createAgentTool(),
        createTaskCreateTool(),
        createTaskUpdateTool(),
        createTaskListTool()
    ],
    taskStore: TaskStore()
))

LLM 可能这样编排:

  1. TaskCreate("Analyze module A") -- 创建任务
  2. Agent(prompt: "Analyze module A", subagent_type: "Explore") -- 委派子 Agent
  3. TaskUpdate(id: "task_1", status: "completed", output: result) -- 标记完成
  4. 重复步骤 1-3 处理其他模块
  5. 汇总所有结果

模式二:团队协作 + 消息传递

需要多个 Agent 长期协作时,用 Team + Mailbox:

let mailboxStore = MailboxStore()
let teamStore = TeamStore()

let agent = createAgent(options: AgentOptions(
    apiKey: apiKey,
    model: "claude-sonnet-4-6",
    agentName: "coordinator",
    mailboxStore: mailboxStore,
    teamStore: teamStore,
    tools: getAllBaseTools(tier: .core) + [
        createAgentTool(),
        createTeamCreateTool(),
        createTeamDeleteTool(),
        createSendMessageTool(),
        createTaskCreateTool(),
        createTaskListTool(),
        createTaskUpdateTool()
    ]
))

LLM 的编排可能像这样:

  1. TeamCreate(name: "refactor-team", members: ["explorer", "planner", "coder"]) -- 建团队
  2. TaskCreate("Explore codebase", owner: "explorer") -- 创建任务
  3. Agent(prompt: "...", name: "explorer", subagent_type: "Explore") -- 启动探索 Agent
  4. SendMessage(to: "planner", message: "Exploration done, here's the summary...") -- 通知规划 Agent
  5. TaskCreate("Write implementation plan", owner: "planner") -- 下一个任务
  6. 持续推进...

模式三:工作队列

用 Task 系统做工作队列,主 Agent 创建一批任务,子 Agent 逐个领取执行:

主 Agent:
  TaskCreate("Fix bug #1")     → task_1 (pending)
  TaskCreate("Fix bug #2")     → task_2 (pending)
  TaskCreate("Add feature X")  → task_3 (pending)

子 Agent A:
  TaskList(status: "pending")       → [task_1, task_2, task_3]
  TaskUpdate(task_1, status: "in_progress", owner: "agent-a")
  ... 干活 ...
  TaskUpdate(task_1, status: "completed", output: "Fixed by ...")

子 Agent B:
  TaskList(status: "pending")       → [task_2, task_3]
  TaskUpdate(task_2, status: "in_progress", owner: "agent-b")
  ... 干活 ...

TaskStore 是 Actor,多个 Agent 并发更新同一条任务不会出问题(先到先得),但不会自动分配——需要 LLM 自己协调谁认领哪个任务。

设计思路的取舍

这套多 Agent 协作机制有几个设计选择:

为什么子 Agent 不能再生子 Agent? DefaultSubAgentSpawner 在创建子 Agent 时过滤掉了 AgentTool。这是有意的限制——如果不限制,一个 Agent 生成一个 Agent 再生成一个 Agent,递归深度不可控,token 消耗也会指数级增长。

为什么消息是拉取(Pull)不是推送(Push)? MailboxStore.read() 是破坏性读取,Agent 需要主动调用才能收到消息。这比推送模式简单得多——不需要维护回调、不需要处理 Agent 离线的情况。代价是实时性差,但在 Agent Loop 的工具调用频率下(每个 turn 都可以调工具),拉取的延迟可以接受。

为什么 Task 的状态机没有自动流转? blockedBy 字段只是声明了依赖关系,但 TaskStore.update() 不会自动检查前置任务是否完成。这意味着"等任务 A 做完再做任务 B"这个逻辑需要 LLM 自己实现——调 TaskList 看状态,再决定下一步。这是一个务实的取舍:自动依赖解析可以加,但对 LLM 来说,显式检查反而更可控。

小结

Open Agent SDK 的多 Agent 协作由三层构成:

  • 子 Agent:通过 SubAgentSpawner 协议和 AgentTool 实现,主 Agent 在运行时动态生成子 Agent 委派任务,内置 Explore 和 Plan 两种类型
  • Task 系统:基于 TaskStore Actor 的任务追踪,有明确的状态机(pending -> inProgress -> completed/failed/cancelled),终态不可逆转
  • Team + MailboxTeamStore 管理团队和成员,MailboxStore 实现邮箱式消息传递,支持点对点和广播

三层可以独立使用,也可以组合——用 Task 追踪进度,用 Team 组织成员,用 Mailbox 协调通信,用子 Agent 执行具体工作。

下一篇会看 SDK 的 会话持久化:Agent 对话历史怎么存、怎么恢复、怎么在重启后继续之前的工作。


系列文章

GitHubterryso/open-agent-sdk-swift

 
深入 Open Agent SDK(三):MCP 集成实战——让 Agent 连接万物

本文是「深入 Open Agent SDK (Swift)」系列第三篇。系列目录见这里

上一篇看了 SDK 内置的 34 个工具——文件读写、Bash 执行、代码搜索,覆盖了常见的开发场景。但 Agent 的能力不可能只靠内置工具撑满。你需要连接数据库、调用企业 API、操作内部系统——这些事情需要一个标准化的接入方式。

MCP(Model Context Protocol)就是干这个的。这篇文章看 Open Agent SDK 怎么通过 MCP 协议把外部工具接到 Agent Loop 里。

MCP 协议是什么

MCP 是 Anthropic 提出的一个开放协议,定义了 LLM 应用和外部工具/数据源之间的通信标准。思路是:

  • 工具端(MCP Server)暴露一组工具,每个工具有名字、描述、输入 schema
  • 调用端(MCP Client)通过标准协议发现工具、调用工具、拿到结果
  • 通信基于 JSON-RPC,传输层可以换

为什么 Agent 需要它?因为不可能把所有工具都写进 SDK。有了 MCP,任何人都可以写一个 MCP Server(比如 @modelcontextprotocol/server-filesystem),任何 Agent 都能对接——不需要改 SDK 代码,不需要写适配器,配一行就接上了。

Open Agent SDK 的 MCP 集成分两条路:

  1. 外部 MCP 服务器:通过 stdio/HTTP/SSE 连接第三方 MCP Server,走完整的 MCP 协议
  2. 进程内 MCP 服务器:用 InProcessMCPServer 把 SDK 工具包装成 MCP Server,零协议开销

下面逐个看。

五种传输配置

SDK 用 McpServerConfig 枚举统一了所有传输方式:

public enum McpServerConfig: Sendable, Equatable {
    case stdio(McpStdioConfig)       // 子进程 stdin/stdout
    case sse(McpTransportConfig)     // Server-Sent Events
    case http(McpTransportConfig)    // HTTP POST
    case sdk(McpSdkServerConfig)     // 进程内,零开销
    case claudeAIProxy(McpClaudeAIProxyConfig) // ClaudeAI 代理
}

Stdio:启动子进程

最常用的方式。Agent 启动一个子进程,通过 stdin/stdout 交换 JSON-RPC 消息。适用于 Node.js/Python 写的 MCP Server:

let servers: [String: McpServerConfig] = [
    "filesystem": .stdio(McpStdioConfig(
        command: "npx",
        args: ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
    )),
    "git": .stdio(McpStdioConfig(
        command: "uvx",
        args: ["mcp-server-git"],
        env: ["GIT_REPO_PATH": "/my/repo"]
    ))
]

MCPStdioTransport 内部用 Foundation 的 Process 启动子进程,用 FileDescriptor 做底层 I/O。几个细节:

  • 命令解析:如果 command 不是绝对路径,会先 which 查找。找不到就当文件路径用
  • 消息分隔:每条 JSON-RPC 消息以换行符分隔,支持 CRLF
  • 安全过滤CODEANY_API_KEY 默认不会传给子进程,除非你在 env 里显式指定
  • 重连:MCPClient 配置了最多 2 次自动重试,初始间隔 1 秒,指数退避到最大 10 秒

SSE 和 HTTP:连接远程服务

远程 MCP Server 通过 HTTP 连接,区分两种模式:

// SSE 模式(长连接,服务端推送)
let sseServer: [String: McpServerConfig] = [
    "remote-tools": .sse(McpTransportConfig(
        url: "https://mcp.example.com/sse",
        headers: ["Authorization": "Bearer token123"]
    ))
]

// HTTP 模式(请求-响应)
let httpServer: [String: McpServerConfig] = [
    "api-tools": .http(McpTransportConfig(
        url: "https://mcp.example.com/api"
    ))
]

SSE 适合需要服务端主动推送的场景,HTTP 适合简单的请求-响应。两者底层都用 HTTPClientTransport,区别在 streaming 参数。McpSseConfigMcpHttpConfig 实际上是 McpTransportConfig 的别名:

public typealias McpSseConfig = McpTransportConfig
public typealias McpHttpConfig = McpTransportConfig

SDK:进程内零开销

不走任何网络协议,直接在进程内把工具注册进去。后面第六部分单独讲。

ClaudeAI Proxy

连接 ClaudeAI 的代理端点,用 server ID 做认证:

let proxyServer: [String: McpServerConfig] = [
    "claude-tools": .claudeAIProxy(McpClaudeAIProxyConfig(
        url: "https://claudeai.example.com/proxy",
        id: "server-abc-123"
    ))
]

内部实现就是 HTTP 传输加了一个 X-ClaudeAI-Server-ID header。

连接流程:从配置到工具池

Agent 怎么把 MCP 工具合并到自己的工具池里?从 assembleFullToolPool() 追踪:

func assembleFullToolPool() async -> ([ToolProtocol], MCPClientManager?) {
    let baseTools = options.tools ?? []

    guard let mcpServers = options.mcpServers, !mcpServers.isEmpty else {
        return (baseTools, nil)
    }

    // 第一步:分离 SDK 配置和外部配置
    let (sdkTools, externalServers) = await Self.processMcpConfigs(mcpServers)

    // 第二步:连接外部 MCP 服务器
    var externalTools: [ToolProtocol] = []
    var manager: MCPClientManager? = nil

    if !externalServers.isEmpty {
        let mcpManager = MCPClientManager()
        await mcpManager.connectAll(servers: externalServers)
        externalTools = await mcpManager.getMCPTools()
        manager = mcpManager
    }

    // 第三步:合并所有工具
    let allMCPTools = sdkTools + externalTools
    let pool = assembleToolPool(
        baseTools: getAllBaseTools(tier: .core) + getAllBaseTools(tier: .specialist),
        customTools: baseTools,
        mcpTools: allMCPTools,
        allowed: options.allowedTools,
        disallowed: options.disallowedTools
    )

    return (pool, manager)
}

三步走:

1. 分离配置。 processMcpConfigs().sdk 配置和外部配置(stdio/sse/http)分开。SDK 配置直接从 InProcessMCPServer 提取工具,用 SdkToolWrapper 加上命名空间前缀;外部配置留给 MCPClientManager 处理。

2. 连接外部服务器。 MCPClientManager 是一个 actor,用 withTaskGroup 并发连接所有服务器。每个连接经历四步:

创建 Transport → 启动连接 → MCP 握手 (initialize) → listTools() 发现工具

发现的工具被包装成 MCPToolDefinition——一个遵循 ToolProtocol 的结构体。工具名按 mcp__{serverName}__{toolName} 格式命名,避免跟内置工具冲突。比如 filesystem 服务器上的 read_file 工具,最终叫 mcp__filesystem__read_file

3. 组装工具池。 MCP 工具和内置工具、自定义工具合并,经过 allowedTools / disallowedTools 过滤,形成最终的工具池。LLM 看到的是过滤后的完整工具列表。

完整的端到端使用代码:

let agent = createAgent(options: AgentOptions(
    apiKey: "sk-...",
    model: "claude-sonnet-4-6",
    permissionMode: .bypassPermissions,
    mcpServers: [
        "filesystem": .stdio(McpStdioConfig(
            command: "npx",
            args: ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
        ))
    ]
))

// Agent Loop 启动时自动连接 MCP 服务器、发现工具、合并到工具池
let result = await agent.prompt("List all files in /tmp and read the first one")

运行时管理

MCP 服务器不是连上就完事了。运行过程中你可能需要查状态、重连、开关、甚至动态替换服务器集合。SDK 提供了四个方法。

查状态:mcpServerStatus()

let status = await agent.mcpServerStatus()
for (name, info) in status {
    print("\(name): \(info.status.rawValue)")  // connected / failed / pending / disabled / needsAuth
    print("  tools: \(info.tools)")             // ["read_file", "write_file", ...]
    if let error = info.error {
        print("  error: \(error)")
    }
}

McpServerStatus 有五个状态值(跟 TypeScript SDK 对齐):

状态 含义
connected 已连接,工具可用
failed 连接失败
pending 正在连接
disabled 被用户禁用
needsAuth 需要认证

重连:reconnectMcpServer()

网络抖动或服务端重启后,手动重连某个服务器:

try await agent.reconnectMcpServer(name: "filesystem")

内部实现:断开旧连接 → 清理状态 → 用初始配置重新走一遍连接流程。MCPClientManager 在首次连接时保存了原始配置(originalConfigs),重连时直接用它。

开关:toggleMcpServer()

临时禁用某个服务器(断开连接但保留配置),之后还能再开:

// 禁用
try await agent.toggleMcpServer(name: "filesystem", enabled: false)

// 重新启用
try await agent.toggleMcpServer(name: "filesystem", enabled: true)

动态替换:setMcpServers()

运行时替换整个 MCP 服务器集合。SDK 做了 diff:新增的连接、删除的断开、配置变化的重新连接:

let result = try await agent.setMcpServers([
    "filesystem": .stdio(McpStdioConfig(
        command: "npx",
        args: ["-y", "@modelcontextprotocol/server-filesystem", "/data"]
    )),
    "database": .stdio(McpStdioConfig(
        command: "python3",
        args: ["-m", "my_db_server"]
    ))
])

print("Added: \(result.added)")      // ["database"]
print("Removed: \(result.removed)")  // 之前有但现在没有的
print("Errors: \(result.errors)")    // 连接失败的

MCPClientManager.setServers() 的 diff 逻辑看一下:

public func setServers(_ servers: [String: McpServerConfig]) async -> McpServerUpdateResult {
    let existingNames = Set(originalConfigs.keys)
    let newNames = Set(servers.keys)

    let addedNames = newNames.subtracting(existingNames)
    let removedNames = existingNames.subtracting(newNames)

    // 配置变化的视为 remove + add
    let changedNames = newNames.intersection(existingNames).filter { name in
        originalConfigs[name] != servers[name]
    }

    let effectiveAdded = addedNames.union(changedNames)
    // ...执行连接和断开
}

先删除不再需要的,再连接新增和变化的。变化的服务器会被完全重建,不是热更新。这对于长运行的 Agent 应用很重要——你可以在不重启 Agent 的情况下调整 MCP 配置。

MCP 资源:不只是工具

MCP 协议除了工具(Tools)还有资源(Resources)。工具是"做事情",资源是"读数据"——比如一个数据库 MCP Server 可以暴露一个 query 工具,同时暴露 tables 资源让 Agent 看有哪些表。

SDK 内置了两个资源相关工具:ListMcpResourcesReadMcpResource

ListMcpResources

列出所有已连接 MCP 服务器的可用资源:

// LLM 看到的工具描述:
// "List available resources from connected MCP servers.
//  Resources can include files, databases, and other data sources."

// 可选参数:server — 按服务器名过滤

内部实现通过 MCPResourceProvider 协议查询每个连接:

public protocol MCPResourceProvider: Sendable {
    func listResources() async -> [MCPResourceItem]?
    func readResource(uri: String) async throws -> MCPReadResult
}

资源用 MCPResourceItem 表示——有名字、描述、URI。

ReadMcpResource

读取指定 URI 的资源内容:

// LLM 看到的工具:
// "Read a specific resource from an MCP server."
// 参数:server(服务器名)、uri(资源 URI)

两个工具都是只读的,通过 ToolContext.mcpConnections 拿到连接信息——不用全局变量,线程安全。

进程内 MCP:InProcessMCPServer

InProcessMCPServer 是 SDK 里一个独特的设计。它让你用 defineTool() 创建工具,然后包装成一个 MCP Server——但实际上不走 MCP 协议。

为什么?因为有些场景你只是想把自己的工具加到 Agent 的工具池里,不需要跨进程通信。直接调函数比走 JSON-RPC 序列化高效得多。

基本用法

// 用 defineTool 创建工具
struct WeatherInput: Codable {
    let city: String
}

let weatherTool = defineTool(
    name: "get_weather",
    description: "Get the current weather for a given city.",
    inputSchema: [
        "type": "object",
        "properties": [
            "city": ["type": "string", "description": "The city name"]
        ],
        "required": ["city"]
    ],
    isReadOnly: true
) { (input: WeatherInput, context: ToolContext) -> String in
    let data: [String: String] = [
        "Beijing": "Sunny, 22C",
        "Tokyo": "Cloudy, 18C",
    ]
    return data[input.city] ?? "No data for \(input.city)"
}

// 包装为 InProcessMCPServer
let server = InProcessMCPServer(
    name: "weather",       // 工具名将是 mcp__weather__get_weather
    version: "1.0.0",
    tools: [weatherTool],
    cwd: "/tmp"
)

// 通过 asConfig() 生成配置,注入 Agent
let agent = createAgent(options: AgentOptions(
    apiKey: "sk-...",
    model: "claude-sonnet-4-6",
    mcpServers: ["weather": await server.asConfig()]
))

内部实现

InProcessMCPServer 是一个 actor,有两种工作模式:

SDK 内部模式(常用): processMcpConfigs() 检测到 .sdk 配置时,直接调用 server.getTools() 拿到工具列表,用 SdkToolWrapper 加上命名空间前缀。整个过程中工具的 call() 方法直接被调用,没有任何序列化开销:

private struct SdkToolWrapper: ToolProtocol, Sendable {
    let serverName: String
    let innerTool: ToolProtocol

    var name: String { "mcp__\(serverName)__\(innerTool.name)" }

    func call(input: Any, context: ToolContext) async -> ToolResult {
        return await innerTool.call(input: input, context: context)
    }
}

注意 SdkToolWrappercall() 直接转发到 innerTool——没有 JSON-RPC,没有 Value 转换,就是直接调函数。

外部客户端模式: 如果有外部 MCP Client 想连进来,createSession() 创建一个 InMemoryTransport 对,跑完整的 MCP 握手。这种场景下才有协议开销:

public func createSession() async throws -> (Server, InMemoryTransport) {
    let mcpServer = await getOrCreateMCPServer()
    let session = await mcpServer.createSession()
    let (clientTransport, serverTransport) = await InMemoryTransport.createConnectedPair()
    try await session.start(transport: serverTransport)
    return (session, clientTransport)
}

InProcessMCPServer 内部维护了一个 MCPServer 实例(懒加载),注册工具时把每个 ToolProtocolcall() 包装成 MCP 的 handler closure——处理参数格式转换([String: Value][String: Any])、构建 ToolContext、处理错误结果。

注意事项

  • 命名限制:server name 不能包含 __(双下划线),因为会跟命名空间前缀 mcp__{server}__{tool} 冲突。构造器里有 precondition 检查
  • 错误处理:工具返回 isError: true 时,MCP 层面会抛出 ToolExecutionError,让 MCP 协议返回 isError: true
  • 工具注册失败:会触发 assertionFailure,说明是代码 bug(比如重复的工具名)

完整示例:多工具 MCP 服务器

这是 AdvancedMCPExample 示例的核心部分,展示了多工具注册和错误处理:

// 天气工具 — 返回 String
let weatherTool = defineTool(
    name: "get_weather",
    description: "Get the current weather for a given city.",
    inputSchema: [
        "type": "object",
        "properties": [
            "city": ["type": "string", "description": "The city name"]
        ],
        "required": ["city"]
    ],
    isReadOnly: true
) { (input: WeatherInput, context: ToolContext) -> String in
    let data: [String: String] = [
        "Beijing": "Sunny, 22C, humidity 45%",
        "Tokyo": "Cloudy, 18C, humidity 65%",
    ]
    return data[input.city] ?? "No data for \(input.city)"
}

// 邮箱验证 — 返回 ToolExecuteResult,包含错误处理
let validationTool = defineTool(
    name: "validate_email",
    description: "Validate an email address.",
    inputSchema: [
        "type": "object",
        "properties": [
            "email": ["type": "string", "description": "The email address"]
        ],
        "required": ["email"]
    ],
    isReadOnly: true
) { (input: ValidateInput, context: ToolContext) -> ToolExecuteResult in
    if !input.email.contains("@") {
        return ToolExecuteResult(
            content: "Invalid email: '\(input.email)' missing '@'",
            isError: true
        )
    }
    return ToolExecuteResult(content: "Email '\(input.email)' is valid.", isError: false)
}

// 打包为 MCP 服务器
let utilityServer = InProcessMCPServer(
    name: "utility",
    version: "1.0.0",
    tools: [weatherTool, validationTool],
    cwd: "/tmp"
)

// 创建 Agent
let agent = createAgent(options: AgentOptions(
    apiKey: apiKey,
    model: "claude-sonnet-4-6",
    systemPrompt: "You have weather and email validation tools.",
    permissionMode: .bypassPermissions,
    mcpServers: ["utility": await utilityServer.asConfig()]
))

// LLM 会自动调用 mcp__utility__get_weather 或 mcp__utility__validate_email
let result = await agent.prompt("Check weather in Tokyo and validate test@example.com")
print(result.text)

工具返回错误时,Agent 不会崩溃。错误信息喂回 LLM,LLM 看到后会调整策略——比如告诉用户邮箱格式不对。

实战建议

选传输方式。 进程内的工具用 InProcessMCPServer(SDK 模式),外部工具用 stdio(本地)或 HTTP/SSE(远程)。不要用 stdio 去连远程服务,也不要用 HTTP 去连本地命令行工具。

命名要规范。 MCP 工具名是 mcp__{server}__{tool} 三段式。server name 简短有意义,不要用双下划线。filesystemfs-tools-v2 好,因为 LLM 看到 mcp__filesystem__read_file 能直接猜出含义。

错误要包容。 MCPClientManager 的连接失败不会炸掉 Agent——失败的服务器 status 标记为 error,贡献零工具。Agent Loop 照样跑,只是少了那些工具。设计你的系统时也应该遵循这个原则:外部服务不可用时降级运行,不要整体崩溃。

运行时管理用好。 长运行的 Agent 应用应该在启动后检查 mcpServerStatus(),失败的用 reconnectMcpServer() 重试。需要动态调整时用 setMcpServers() 而不是重建 Agent。


系列文章

GitHubterryso/open-agent-sdk-swift

 
深入 Open Agent SDK(二):34 个工具的背后——工具协议、三层架构与自定义扩展

本文是「深入 Open Agent SDK (Swift)」系列第二篇。系列目录见这里

上一篇分析了 Agent Loop 的运转机制,其中有一个环节是"执行工具"——LLM 说"我要调 Bash",SDK 就真的起一个进程跑命令。但这背后的工具系统远不止"调个函数"那么简单。34 个内置工具怎么组织?怎么从 LLM 的 JSON 输入安全地转成 Swift 类型?怎么控制哪些工具能用?

这篇文章从协议定义开始,一层一层看 Open Agent SDK 的工具系统。

ToolProtocol:一个工具长什么样

SDK 里每个工具都遵循 ToolProtocol 协议:

public protocol ToolProtocol: Sendable {
    var name: String { get }
    var description: String { get }
    var inputSchema: ToolInputSchema { get }
    var isReadOnly: Bool { get }
    var annotations: ToolAnnotations? { get }

    func call(input: Any, context: ToolContext) async -> ToolResult
}

五个属性一个方法,逐个说。

name 是工具的唯一标识,LLM 在 tool_use block 里用这个名字指定要调哪个工具。所有内置工具用 PascalCase 命名:ReadBashGlobCronCreate

description 是给 LLM 看的工具说明。这段文字会作为 tool definition 的一部分发给 API,质量直接影响 LLM 什么时候会选择调用这个工具。

inputSchema 是一个 [String: Any] 类型的 JSON Schema 字典,描述工具接受的输入结构。API 调用时它被原样传给 input_schema 字段。

isReadOnly 是一个布尔标记,用来告诉 Agent Loop 这个工具有没有副作用。上一篇提到过,Agent Loop 用这个字段做分桶:只读工具并发执行,变更工具串行执行。

annotations 是可选的行为提示,包含四个布尔字段:

public struct ToolAnnotations: Sendable, Equatable {
    public let readOnlyHint: Bool       // 只读,无副作用
    public let destructiveHint: Bool    // 可能做不可逆操作
    public let idempotentHint: Bool     // 幂等,多次调用结果相同
    public let openWorldHint: Bool      // 会和外部世界交互
}

注意 destructiveHint 默认是 true——SDK 对工具采取"默认危险"策略,工具需要主动声明自己不危险。这些提示不会影响 SDK 自身的执行逻辑,但 LLM 会参考它们决定怎么使用工具。

ToolResult 和 ToolExecuteResult

call() 方法返回 ToolResult,这是工具执行后喂回给 LLM 的内容:

public struct ToolResult: Sendable {
    public let toolUseId: String         // 对应 LLM 返回的 tool_use ID
    public let content: String           // 文本内容
    public let typedContent: [ToolContent]?  // 多模态内容(文本、图片、资源引用)
    public let isError: Bool             // 是否为错误结果
}

contenttypedContent 之间有个兼容设计:当 typedContent 有值时,content 会从中提取所有 .text 类型拼接返回;否则直接返回存储的字符串。这样旧代码只用 content 也能正常工作,新代码可以用 typedContent 返回图片等非文本内容。

ToolContent 是一个枚举,支持三种内容类型:

public enum ToolContent: Sendable {
    case text(String)
    case image(data: Data, mimeType: String)
    case resource(uri: String, name: String?)
}

工具闭包内部用的是 ToolExecuteResult——结构和 ToolResult 几乎一样,只是少了 toolUseId(这个 ID 由调用层自动填充)。

ToolContext:工具的运行环境

ToolContext 是每次工具执行时注入的上下文,字段很多:

字段 用途
cwd 当前工作目录
toolUseId 本次调用的 tool_use ID
agentSpawner 子 Agent 生成器(AgentTool 用)
cronStore 定时任务存储(CronTools 用)
todoStore 待办事项存储(TodoWrite 用)
worktreeStore 工作树存储(WorktreeTools 用)
planStore 计划模式存储(PlanTools 用)
taskStore 任务管理存储(Task*Tools 用)
mailboxStore 邮箱存储(SendMessage 用)
teamStore 团队存储(TeamCreate 用)
hookRegistry Hook 事件注册表
permissionMode 权限模式
canUseTool 自定义权限检查回调
skillRegistry 技能注册表(SkillTool 用)
restrictionStack 工具限制栈
sandbox 沙箱设置
mcpConnections MCP 连接信息
fileCache 文件缓存
env 自定义环境变量

这么多可选字段,规则很简单:工具需要什么就注入什么,不需要的就是 nil。Read 工具只看 cwdsandboxfileCache;AgentTool 只看 agentSpawner;CronTools 只看 cronStore。每个工具只依赖自己需要的那个 Store,不知道也不关心其他 Store 的存在。

ToolContext 还提供了两个 copy 方法:withToolUseId() 用于更新调用 ID(每次工具执行时由 ToolExecutor 调用),withSkillContext() 用于递增技能嵌套深度(SkillTool 调用子技能时使用)。

三层工具架构

SDK 把 34 个工具分成三个层级:Core(10 个)、Advanced(11 个)、Specialist(13 个)。

Core 层 (10)          Advanced 层 (11)        Specialist 层 (13)
┌──────────┐         ┌──────────────┐        ┌───────────────┐
│ Read      │         │ Agent        │        │ CronCreate    │
│ Write     │         │ Skill        │        │ CronDelete    │
│ Edit      │         │ TaskCreate   │        │ CronList      │
│ Glob      │         │ TaskGet      │        │ LSP           │
│ Grep      │         │ TaskList     │        │ Config        │
│ Bash      │         │ TaskOutput   │        │ TodoWrite     │
│ AskUser   │         │ TaskStop     │        │ EnterPlanMode │
│ ToolSearch│         │ TaskUpdate   │        │ ExitPlanMode  │
│ WebFetch  │         │ SendMessage  │        │ EnterWorktree │
│ WebSearch │         │ TeamCreate   │        │ ExitWorktree  │
└──────────┘         │ TeamDelete   │        │ RemoteTrigger │
                     │ NotebookEdit │        │ ListMcpRes    │
                     └──────────────┘        │ ReadMcpRes    │
                                              └───────────────┘

分层的依据不是技术实现难度,而是工具的依赖复杂度和使用场景

Core 层:文件系统和 shell

Core 层的 10 个工具是 Agent 的基础能力——读文件、写文件、搜索代码、跑命令。它们有一个共同特点:只依赖 ToolContext 的基础字段(cwdsandboxfileCache),不需要注入任何 Store。

Read 工具来说。它的输入是文件路径、可选的 offset 和 limit:

private struct FileReadInput: Codable {
    let file_path: String
    let offset: Int?
    let limit: Int?
}

执行逻辑很直接:解析路径 → 检查沙箱 → 查缓存 → 读文件 → 分页 → 返回带行号的内容。还有个文件缓存的细节:如果 context.fileCache 有值,先查缓存,命中就跳过磁盘 I/O。

再看 Bash 工具。它比 Read 复杂得多,因为要处理超时、输出截断、后台进程等问题。Bash 的输入有 5 个字段:

private struct BashInput: Codable {
    let command: String
    let timeout: Int?
    let description: String?
    let runInBackground: Bool?
    let dangerouslyDisableSandbox: Bool?
}

几个关键实现细节:

  1. 超时控制。默认 120 秒,上限 600 秒。用 DispatchQueue.global().asyncAfter 设置超时,超时后 process.terminate() 杀掉进程。
  2. 输出截断。超过 100,000 字符的输出只保留前 50,000 + 后 50,000,中间用 ...(truncated)... 连接。
  3. 后台执行run_in_background = true 时,进程起起来就返回一个 task ID,不等待完成。
  4. 进程输出用 ProcessOutputAccumulator 收集,用 @unchecked Sendable 标注,因为 Pipe 的 readability handler 和 termination handler 都在同一个 run loop dispatch queue 上触发,不会产生数据竞争。

Bash 工具的 annotations 设置了 destructiveHint: true,明确告诉 LLM 这个工具有破坏性。

Advanced 层:子 Agent 和任务编排

Advanced 层的工具开始需要外部依赖了——AgentTool 需要 agentSpawner,Task* 系列需要 taskStore,SendMessage 需要 mailboxStoreteamStore

Agent 工具是这一层的代表。它的作用是让 LLM 能"派出一个子 Agent"去完成复杂任务:

public func createAgentTool() -> ToolProtocol {
    return defineTool(
        name: "Agent",
        description: "Launch a subagent to handle complex, multi-step tasks autonomously.",
        inputSchema: agentToolSchema,
        isReadOnly: false
    ) { (input: AgentToolInput, context: ToolContext) async throws -> ToolExecuteResult in
        guard let spawner = context.agentSpawner else {
            return ToolExecuteResult(
                content: "Error: Agent spawner not available.",
                isError: true
            )
        }
        // 解析内置 Agent 类型、权限模式,然后 spawn 子 Agent
        let result = await spawner.spawn(
            prompt: input.prompt,
            model: input.model ?? agentDef?.model,
            systemPrompt: agentDef?.systemPrompt,
            allowedTools: agentDef?.tools,
            ...
        )
        return ToolExecuteResult(content: result.text, isError: result.isError)
    }
}

AgentTool 的输入支持 11 个字段:promptdescriptionsubagent_typemodelnamemaxTurnsrun_in_backgroundisolationteam_namemoderesume。其中 subagent_type 可以指定内置的 ExplorePlan 类型,也可以用自定义名称。

注意 agentSpawner 是通过 ToolContext 注入的协议类型——AgentTool 不知道子 Agent 是怎么创建的,它只调 spawner.spawn(),具体实现由 Core 层注入。这种依赖倒置让工具层完全不用 import Core 模块。

Specialist 层:领域专用工具

Specialist 层的工具依赖更重——它们各自需要一个专属 Store,而且功能高度领域化。

CronTools 是一组三个工具:CronCreate、CronDelete、CronList,通过 context.cronStore 访问定时任务存储:

public func createCronCreateTool() -> ToolProtocol {
    return defineTool(
        name: "CronCreate",
        description: "Create a scheduled recurring task (cron job).",
        inputSchema: cronCreateSchema,
        isReadOnly: false
    ) { (input: CronCreateInput, context: ToolContext) async throws -> ToolExecuteResult in
        guard let cronStore = context.cronStore else {
            return ToolExecuteResult(content: "Error: CronStore not available.", isError: true)
        }
        let job = await cronStore.create(
            name: input.name,
            schedule: input.schedule,
            command: input.command
        )
        return ToolExecuteResult(
            content: "Cron job created: \(job.id) \"\(job.name)\"",
            isError: false
        )
    }
}

三个工具都用 guard let cronStore = context.cronStore 做前置检查——如果 Store 没注入,直接返回错误而不是崩溃。

LSP 工具是另一个有趣的例子。它用 grep 模拟 Language Server Protocol 的常见操作(跳转定义、查找引用、符号搜索),完全不依赖真正的语言服务器:

case "goToDefinition", "goToImplementation":
    // 1. 用正则提取光标位置的符号名
    guard let symbol = getSymbolAtPosition(
        filePath: filePath, line: line, character: character
    ) else { ... }

    // 2. grep 搜索定义模式
    let pattern = "(func|class|struct|enum|protocol|typealias|let|var|export)\\s+\(symbol)"
    let results = await runGrep(
        arguments: ["grep", "-rn", "-E", pattern, cwd],
        cwd: cwd
    )

LSP 工具只依赖 context.cwd,不需要任何 Store——属于 Specialist 层里最轻量的工具。

defineTool:创建自定义工具的工厂函数

SDK 提供了 defineTool 工厂函数,让开发者用最少的代码创建符合 ToolProtocol 的工具。它有四个重载,覆盖不同的使用场景。

基本:Codable 输入 + String 输出

最常用的重载接受一个 Codable 输入类型和一个返回 String 的闭包:

let greetTool = defineTool(
    name: "Greet",
    description: "Generate a greeting message.",
    inputSchema: [
        "type": "object",
        "properties": [
            "name": ["type": "string", "description": "Person's name"]
        ],
        "required": ["name"]
    ],
    isReadOnly: true
) { (input: GreetInput, context: ToolContext) async throws -> String in
    return "Hello, \(input.name)!"
}

// 输入类型只需要遵循 Codable
struct GreetInput: Codable {
    let name: String
}

defineTool 内部做了四件事:

  1. 把 LLM 传来的 Any 类型 cast 成 [String: Any]
  2. JSONSerialization 序列化成 Data
  3. JSONDecoder 解码成你定义的 Input 类型
  4. 调用你的闭包

任何一步失败(输入不是字典、JSON 序列化失败、解码失败、闭包抛异常),都会返回 isError: true 的结果,不会炸掉 Agent Loop。这意味着你可以放心地用 try 在闭包里抛错误,它们会被妥善捕获。

结构化输出:ToolExecuteResult

如果工具需要显式标记错误(而不是用 try 抛异常),用返回 ToolExecuteResult 的重载:

let divideTool = defineTool(
    name: "Divide",
    description: "Divide two numbers.",
    inputSchema: [
        "type": "object",
        "properties": [
            "a": ["type": "number"],
            "b": ["type": "number"]
        ],
        "required": ["a", "b"]
    ]
) { (input: DivideInput, context: ToolContext) async throws -> ToolExecuteResult in
    guard input.b != 0 else {
        return ToolExecuteResult(content: "Error: Division by zero.", isError: true)
    }
    return ToolExecuteResult(content: "\(input.a / input.b)", isError: false)
}

内置工具大多用这个重载,因为很多错误是逻辑层面的(文件不存在、Store 没注入),不适合用异常表示。

无输入:NoInputTool

有些工具不需要输入参数(比如列表操作、健康检查),用无输入重载:

let listTool = defineTool(
    name: "ListItems",
    description: "List all items.",
    inputSchema: ["type": "object", "properties": [:]]
) { (context: ToolContext) async throws -> String in
    return "No items found."
}

闭包只接收 ToolContext,完全忽略输入。

原始字典输入:RawInputTool

最后一个重载跳过 Codable 解码,直接把原始 [String: Any] 字典传给闭包。适用于输入字段类型不固定的场景——比如 ConfigTool 的 value 字段可以是字符串、数字、布尔值、数组、对象或 null:

let configTool = defineTool(
    name: "Config",
    description: "Read or write configuration values.",
    inputSchema: configSchema
) { (input: [String: Any], context: ToolContext) async -> ToolExecuteResult in
    let key = input["key"] as? String ?? ""
    let value = input["value"]  // 任意类型
    // ...
}

CodingKeys 处理 snake_case

LLM 发来的 JSON 字段名通常用 snake_case(比如 file_pathrun_in_background),但 Swift 的惯用命名是 camelCase。输入类型通过 CodingKeys 枚举做映射:

private struct BashInput: Codable {
    let command: String
    let runInBackground: Bool?

    private enum CodingKeys: String, CodingKey {
        case command
        case runInBackground = "run_in_background"
    }
}

这是 Swift Codable 的标准做法——defineTool 内部的 JSONDecoder 会自动用 CodingKeys 做字段名转换。

工具池组装与过滤

工具不是直接一股脑丢给 LLM 的。SDK 有一套组装和过滤机制。

assembleToolPool

assembleToolPool 把三类工具来源合并成一个去重后的工具池:

public func assembleToolPool(
    baseTools: [ToolProtocol],     // SDK 内置工具
    customTools: [ToolProtocol]?,  // 用户自定义工具
    mcpTools: [ToolProtocol]?,     // MCP 服务器提供的工具
    allowed: [String]?,
    disallowed: [String]?
) -> [ToolProtocol] {
    // 1. 合并所有来源:base + custom + MCP
    var combined = baseTools
    if let customTools { combined.append(contentsOf: customTools) }
    if let mcpTools { combined.append(contentsOf: mcpTools) }

    // 2. 按名称去重(后者覆盖前者)
    var byName = [String: ToolProtocol]()
    for tool in combined {
        byName[tool.name] = tool
    }

    // 3. 应用过滤规则
    return filterTools(
        tools: Array(byName.values),
        allowed: allowed,
        disallowed: disallowed
    )
}

去重用 Dictionary,遍历过程中同名的后者会覆盖前者。这意味着优先级是:MCP > 自定义 > 内置——用户可以用自定义工具或 MCP 工具替换同名内置工具。

filterTools

filterTools 实现白名单/黑名单过滤:

public func filterTools(
    tools: [ToolProtocol],
    allowed: [String]?,       // 白名单,nil 或空表示不过滤
    disallowed: [String]?     // 黑名单,nil 或空表示不过滤
) -> [ToolProtocol] {
    var filtered = tools
    // 先应用白名单
    if let allowed, !allowed.isEmpty {
        let allowedSet = Set(allowed)
        filtered = filtered.filter { allowedSet.contains($0.name) }
    }
    // 再应用黑名单(黑名单优先于白名单)
    if let disallowed, !disallowed.isEmpty {
        let disallowedSet = Set(disallowed)
        filtered = filtered.filter { !disallowedSet.contains($0.name) }
    }
    return filtered
}

两个规则同时存在时,黑名单优先——即使一个工具在白名单里,只要出现在黑名单里也会被排除。

ToolRestrictionStack:Skills 系统的工具限制

ToolRestrictionStack 是一个栈结构,用于 Skills 系统中控制工具可见范围。当一个 Skill 配置了 toolRestrictions 时,执行前 push 限制,执行后 pop 恢复:

let stack = ToolRestrictionStack()
stack.push([.bash, .read])     // Skill A:只能用 Bash 和 Read
stack.push([.grep, .glob])     // Skill B(嵌套):只能用 Grep 和 Glob
// 此时 currentAllowedToolNames 只返回 Grep 和 Glob
stack.pop()                     // Skill B 完成 → 回到 Bash 和 Read
stack.pop()                     // Skill A 完成 → 恢复全部工具

栈的 LIFO 特性保证了嵌套 Skill 的正确行为——内层 Skill 的限制覆盖外层,退出后自动恢复。线程安全通过内部串行 DispatchQueue 保证。

currentAllowedToolNames 的逻辑很简单:栈空就返回全部工具,栈非空就只返回栈顶限制列表里的工具名。

toApiTool:工具转 API 格式

最后一步是把工具转成 Anthropic API 要求的格式:

public func toApiTool(_ tool: ToolProtocol) -> [String: Any] {
    var result: [String: Any] = [
        "name": tool.name,
        "description": tool.description,
        "input_schema": tool.inputSchema
    ]
    if let annotations = tool.annotations {
        result["annotations"] = [
            "readOnlyHint": annotations.readOnlyHint,
            "destructiveHint": annotations.destructiveHint,
            "idempotentHint": annotations.idempotentHint,
            "openWorldHint": annotations.openWorldHint
        ]
    }
    return result
}

annotations 只在有值时才包含——省点 token。

一个完整的自定义工具示例

把上面说的一切串起来,写一个能直接跑的自定义工具——获取天气:

import Foundation
import OpenAgentSDK

// 1. 定义输入类型
struct WeatherInput: Codable {
    let city: String
    let unit: String?  // "celsius" or "fahrenheit"

    private enum CodingKeys: String, CodingKey {
        case city, unit
    }
}

// 2. 用 defineTool 创建工具
let weatherTool = defineTool(
    name: "Weather",
    description: "Get current weather for a city.",
    inputSchema: [
        "type": "object",
        "properties": [
            "city": [
                "type": "string",
                "description": "City name, e.g. 'Beijing'"
            ],
            "unit": [
                "type": "string",
                "enum": ["celsius", "fahrenheit"],
                "description": "Temperature unit, defaults to celsius"
            ]
        ],
        "required": ["city"]
    ],
    isReadOnly: true,
    annotations: ToolAnnotations(
        readOnlyHint: true,
        destructiveHint: false,
        openWorldHint: true  // 要访问外部 API
    )
) { (input: WeatherInput, context: ToolContext) async throws -> ToolExecuteResult in
    let unit = input.unit ?? "celsius"
    // 调用天气 API(这里省略具体实现)
    let weather = try await fetchWeather(city: input.city, unit: unit)
    return ToolExecuteResult(content: weather, isError: false)
}

// 3. 注册到 Agent
let agent = createAgent(options: AgentOptions(
    apiKey: "sk-...",
    model: "claude-sonnet-4-6",
    customTools: [weatherTool]  // 自定义工具自动加入工具池
))

这个工具会被 assembleToolPool 和内置工具合并、去重、过滤后发给 LLM。LLM 看到工具定义后,在需要查天气时会自动调用它。defineTool 内部的 Codable 桥接会把 LLM 返回的 JSON 自动解码成 WeatherInput,你不需要手动处理任何 JSON 解析。

小结

工具系统的设计思路可以概括为几个关键词:

协议驱动ToolProtocol 只规定工具的形状(名字、描述、输入 schema、执行方法),不规定工具怎么实现。这让内置工具和自定义工具走完全一样的代码路径。

依赖注入ToolContext 的 20+ 个可选字段看着多,但每个工具只看自己需要的字段,其余全是 nil。AgentTool 不知道 CronStore 的存在,CronCreate 不知道 SubAgentSpawner 的存在。

分层组织。Core/Advanced/Specialist 三层不是代码分层(它们的代码结构完全一样),而是按依赖复杂度划分。Core 层的工具可以独立运行,Advanced 层需要 Store,Specialist 层需要更专业的领域设施。

容错优先defineTool 内部把所有可能的失败点(类型转换、序列化、解码、执行)都包在 do/catch 里,任何环节出错都返回 isError: true 而不是 crash。Agent Loop 里工具错误不会传播,LLM 拿到错误信息后可以换策略。

下一篇来看 MCP 集成:SDK 怎么连接外部工具服务器、怎么把 MCP 工具转成 ToolProtocol、怎么在 Agent Loop 里和内置工具共存。


系列文章

GitHubterryso/open-agent-sdk-swift

 
Open Agent SDK (Swift):用原生 Swift 构建 AI Agent 应用

如果你是一名 Swift 开发者,想要在自己的 macOS 应用中集成 AI Agent 能力,选择并不多。大多数 Agent 框架都是 Python 或 TypeScript 的,Swift 生态几乎没有成熟的解决方案。Open Agent SDK (Swift) 正是为了填补这个空白而生的。

它是什么?

Open Agent SDK 用 Swift 6.1 编写,要求 macOS 13+。它在进程内跑完整个 Agent Loop:发送提示、解析响应、执行工具调用、把结果喂回 LLM,循环往复直到拿到最终答案。全程用原生 Swift 并发(async/await、AsyncStream)驱动。

项目灵感来自 open-agent-sdk-typescript,把同样的 Agent 架构搬到了 Swift 生态。同系列还有 Go 版本。

快速上手

安装只需在 Package.swift 中添加依赖:

dependencies: [
    .package(url: "https://github.com/terryso/open-agent-sdk-swift.git", from: "0.1.0")
]

几行代码就能跑起一个 Agent:

import OpenAgentSDK

let agent = createAgent(options: AgentOptions(
    apiKey: "sk-...",
    model: "claude-sonnet-4-6",
    systemPrompt: "You are a helpful assistant.",
    maxTurns: 10
))

let result = await agent.prompt("Explain Swift concurrency in one paragraph.")
print(result.text)
print("Used \(result.usage.inputTokens) input + \(result.usage.outputTokens) output tokens")

prompt() 是阻塞式的,一次调用完成整个 Agent Loop。如果需要流式输出,用 stream()

for await message in agent.stream("Read Package.swift and summarize it.") {
    switch message {
    case .partialMessage(let data):
        print(data.text, terminator: "")
    case .toolUse(let data):
        print("Using tool: \(data.toolName)")
    case .result(let data):
        print("\nDone (\(data.numTurns) turns, $\(String(format: "%.4f", data.totalCostUsd)))")
    default:
        break
    }
}

核心架构

你的应用 (import OpenAgentSDK)
  └── Agent (prompt() / stream())
        └── Agentic Loop (API 调用 → 工具执行 → 重复)
              ├── LLMClient Protocol (AnthropicClient / OpenAIClient)
              ├── 34 个内置工具
              ├── MCP 服务器集成
              ├── Session Store (JSON 持久化)
              └── Hook Registry (20+ 生命周期事件)
  • LLMClient Protocol:抽象了 LLM 提供商,目前支持 Anthropic (Claude) 和 OpenAI 兼容 API(GLM、Ollama、OpenRouter 等)。支持运行时动态切换模型,按模型分别计费。
  • Agent Loop:自动管理多轮对话、工具调用、预算控制、自动压缩。
  • Tool System:34 个内置工具,分 Core(10 个)、Advanced(11 个)、Specialist(13 个)三层。支持 defineTool() 自定义工具,输入走 Codable 自动解码。
  • MCP 集成:支持 stdio、SSE、HTTP 和进程内四种传输方式,MCP 工具自动发现并合并到工具池。
  • 多 Agent 协作:通过 AgentTool 生成子 Agent(内置 Explore、Plan 两种类型),Task 系统追踪任务进度,Team + Mailbox 支持 Agent 间通信。
  • 会话持久化:对话历史保存、恢复、分叉,支持三种恢复策略。
  • 权限与安全:6 种权限模式 + 可组合的策略(白名单、黑名单、只读)+ 沙盒机制(路径和命令过滤)+ Hook 系统(24 个生命周期事件,支持拦截和修改工具输入)。
  • Skills 系统:5 个内置 Skill(Commit、Review、Simplify、Debug、Test),支持文件系统自动发现自定义 Skill。
  • Thinking/Effort 配置:控制 LLM 深度思考能力和 token 预算,支持运行时动态调节。

项目状态

SDK 附带 31 个示例项目,覆盖基本用法、流式输出、自定义工具、MCP 集成、会话管理、多 Agent 协作、权限控制、沙盒、模型切换等场景。代码分为 API、Core、Hooks、MCP、Skills、Stores、Tools、Types、Utils 九个模块,约 90 个 Swift 源文件,MIT 许可证。

本系列后续文章会逐一深入每个子系统的实现细节。


深入 Open Agent SDK 系列文章

GitHubterryso/open-agent-sdk-swift

 
CC Live - 围观别人用 Claude Code 写代码,实时直播 AI 编程过程

想看别人是怎么用 Claude Code 写代码的?或者想开一个 AI 编程直播教学?CC Live 就是干这个的。

它是什么

一个单文件 Node.js 服务器,零依赖。启动后把你正在运行的 Claude Code 会话实时推送到浏览器,别人打开链接就能围观你写代码的全过程。

核心功能

  • 实时围观 — 基于 SSE 流式推送,消息、思考过程、工具调用实时可见,跟现场看一样的体验
  • 自动发现项目 — 从 ~/.claude/projects/ 读取所有 Claude Code 项目,侧边栏一目了然
  • 一键分享 — 通过 ngrok / Cloudflare Tunnel 生成带 token 的分享链接,发给任何人围观,随时可撤回
  • 敏感数据脱敏 — 内置 API Key、Token、密码等自动过滤,放心分享
  • 暗色主题 — 适合长时间观看,移动端也能用

快速开始

node server.js
open http://localhost:3456/

然后点分享按钮,把链接发出去就行了。

适用场景

  • 编程直播教学 — 老师用 Claude Code 写代码,学生实时围观思考链路和工具调用过程
  • 团队围观 — 团队成员可以实时观看 AI 编程过程,学习和 Code Review
  • AI 编程直播 — 像 Twitch 直播一样,但播的是 AI 写代码
  1. 项目地址:https://github.com/terryso/cc-live
  2. CC-LIVE 项目开发围观地址: https://magali-flockless-rufina.ngrok-free.dev/?t=263643f46e602c190349472b

MIT 协议,欢迎 star 和 PR 🎉

 
BMAD开发效率翻倍: 一条命令交付整个Epic

用 BMAD 做开发的朋友,你是不是也有这样的困扰:每个故事都要手动跑完「创建→开发→测试→审查→修复→更新状态」这一长串流程?更别说一个 Epic 动辄 5-10 个故事,重复操作让人心烦……


BMAD 开发者的日常

如果你正在用 BMAD 方法论做开发,这套流程一定很熟悉:

/bmad-bmm-create-story 1.1   # 创建故事
/bmad-bmm-dev-story 1.1      # 开发实现
/bmad-bmm-qa-automate 1.1    # 运行测试
/bmad-bmm-code-review 1.1    # 代码审查
# 发现 HIGH/MEDIUM 问题?手动修复,再跑一遍测试……
# 最后别忘了更新 sprint-status.yaml

一个故事还好,要是 Epic 3 有 8 个故事呢?8 × 6 = 48 次命令

更崩溃的是:

  • 忘了跑测试就提交了?
  • 审查发现问题忘了修复?
  • 状态文件忘了更新?

这些「人工确认」环节,太容易出错了。


我做了什么

于是我把这套流程封装成了 Claude Code Skills

重要:公司项目 vs 个人项目

公司项目,我建议把流程分成两部分:

Part 1: 创建故事详细设计
┌─────────────────────────────────────┐
│  /bmad-bmm-create-story 1.1         │
│  → 生成故事文档(需求、验收标准、任务) │
└─────────────────────────────────────┘
                 ↓
         人工仔细 Review
         确认需求和任务拆分正确
                 ↓
┌─────────────────────────────────────┐
│  Part 2: 执行交付                    │
│  /bmad-story-deliver 1.1            │
│  → 开发 → 测试 → 审查 → 修复 → 完成  │
└─────────────────────────────────────┘

为什么? 故事详细设计决定了「做什么」和「怎么做」,这一步错了后面全白搭。公司项目需求复杂,人工把关这步不能省。

个人项目,你可以自己决定:

  • 熟悉的领域 → 一键全流程
  • 探索性项目 → 分开也行

一键交付

Review 完故事设计后,一条命令搞定剩下的:

/bmad-story-deliver
✅ [1/6] 创建用户故事(如果还没创建)
✅ [2/6] 开发实现
✅ [3/6] QA 自动化测试
✅ [4/6] 代码审查
✅ [5/6] 自动修复问题(如有)
✅ [6/6] 更新状态为 Done

故事 1.1 交付完成!

是的,连状态都帮你更新了。


三种模式,满足不同场景

我设计了三种 Skills,按需选择:

1️⃣ 快速模式:/bmad-story-deliver

适合:个人项目、信任度高的项目

/bmad-story-deliver 1.1   # 交付指定故事
/bmad-story-deliver       # 自动选择编号最小的 backlog 故事

一个命令完成剩余流程(故事已创建并 Review 过):

  1. 开发实现
  2. QA 自动化测试
  3. 代码审查
  4. 自动修复 HIGH/MEDIUM 问题
  5. 更新状态为 Done

不传参数还能自动选择下一个待开发的故事


2️⃣ 安全模式:/bmad-story-worktree

适合:需要隔离开发、强制测试通过的场景

/bmad-story-worktree 1.1

快速模式也会跑测试,但即使失败也不会阻止你继续。安全模式则多了两层保障

  • 独立 Worktree:代码完全隔离,不影响主分支
  • 测试不通过 = 不合并:只有 QA 全部通过 + 无遗留 HIGH/MEDIUM 问题,才会合并

如果测试失败或有问题?保留 worktree,等你手动处理完再继续。


3️⃣ 批量模式:/bmad-epic-worktree

适合:整个 Epic 批量交付,真正解放双手

/bmad-epic-worktree 3     # 交付 Epic 3 的所有故事
/bmad-epic-worktree       # 自动选择编号最小且有未完成的 Epic

执行逻辑:

  1. 收集 Epic 下所有未完成的故事
  2. 按 Story 编号排序
  3. 逐个调用安全模式交付
  4. 前一个完成才开始下一个
  5. 任一失败则暂停,保留状态

一条命令,交付整个 Epic。你可以去喝杯咖啡了 ☕


对比一下

模式 运行测试 隔离开发 强制把关 适用场景
快速 ❌ 测试失败也继续 快速迭代
安全 ✅ Worktree ✅ 不通过不合并 稳妥交付
批量 ✅ Worktree ✅ 不通过不合并 整 Epic 交付

快速上手

# 克隆仓库
git clone https://github.com/terryso/claude-bmad-skills.git

# 安装到你的 Claude Code
cp -r claude-bmad-skills/.claude/skills/* ~/.claude/skills/

# 开始使用
/bmad-story-deliver      # 交付一个故事
/bmad-epic-worktree      # 交付整个 Epic

写在最后

这个项目的核心理念很简单:把重复的事情自动化,但该人工把关的地方不能省

公司项目的推荐流程:

  1. /bmad-bmm-create-story 1.1 — 创建故事设计
  2. 人工 Review — 确保需求正确
  3. /bmad-story-deliver — 一键完成开发到交付

个人项目: 看心情,想一步到位也行。

以前交付一个 Epic:

  • 手动执行 40+ 次命令
  • 多次人工确认测试结果
  • 多次手动更新状态文件

现在:

/bmad-epic-worktree

剩下的交给 AI,但故事设计一定要自己把关。


项目地址: github.com/terryso/claude-bmad-skills

如果你也在用 BMAD 做开发,欢迎试用反馈!⭐ Star 支持一下就更棒了~


你在 BMAD 开发中有什么效率痛点?欢迎在评论区分享。

 
用Claude Code的Agent Teams一次搞定BMAD故事开发的3件套流程
微信图片_20260208075515_94194_2291 微信图片_20260208075515_94195_2291 微信图片_20260208075516_94196_2291

提示词:

bmad-bmm-create-story.md
bmad-bmm-dev-story.md
bmad-bmm-code-review.md

现在有上面三个工作流文档, 每次开发一个故事都是按顺序执行上面3个工作流.

我现在想你帮我创建3个teammate, 按顺序之后上面的工作流, 最后一个code-review工作流需要自动修复所有问题.
 
# 基于 Claude Code 的 Moltbook 心跳脚本:让你的 AI Agent 全自动参与社区

使用 Claude Code CLI 让 AI Agent 在 Moltbook 上保持活跃,零人工干预

什么是 Moltbook?

Moltbook 是目前全球最火的 AI Agents 社交网络,一个专门为 AI 智能体打造的社交平台。在这里,只有 AI agents 能发帖、评论、点赞,人类只能围观。

截至 2026 年,已有超过 140 万个 AI agents 在这个平台上活跃。

为什么需要这个脚本?

Moltbook 官方的 heartbeat skill 是为 OpenClaw 优化的,而 Claude Code 没有内置的定时任务机制。

这个脚本填补了这个空白 —— 让使用 Claude Code 的 AI Agent 也能自动执行 Moltbook 心跳任务。

脚本功能

自动化互动:自动检查 DM、动态和帖子
社区参与:智能点赞、评论、欢迎新 Agent
智能发帖:根据情况决定是否发布原创内容
定时执行:支持 macOS LaunchAgent 和 Linux cron
安全可靠:凭据本地存储,绝不提交到 git
实时监控:支持实时输出,随时了解 Agent 在做什么

快速开始

git clone https://github.com/terryso/moltbook-heartbeat.git
cd moltbook-heartbeat
cp config.example.json config.json
# 编辑 config.json 填入你的凭据

详细的安装步骤和定时任务配置,请查看项目 README

工作原理

脚本的核心是利用 Claude Code CLI 来执行 Moltbook 心跳任务:

  1. 读取配置文件获取凭据
  2. 通过 --output-format stream-json 获取实时输出
  3. 逐行解析 JSON 流,实时显示 Agent 在做什么
  4. 按照 Moltbook 官方 heartbeat.md 的指示执行任务

每次心跳会:

  • 检查私信
  • 浏览动态找有趣帖子
  • 点赞相关内容
  • 评论 1-2 个讨论
  • 欢迎新 Agent(0 karma 帖子)
  • 根据情况发布原创内容

实时输出示例

$ ./moltbook_heartbeat.sh
[2026-02-03 10:00:00] === Moltbook Heartbeat Starting ===
[2026-02-03 10:00:00] Agent: HappyClaude
[2026-02-03 10:00:00] Starting Claude Code execution...

[10:00:01] Claude initialized
Using: WebFetch
[10:00:03] Using: Bash
正在检查 Moltbook 动态...
发现了 3 条新消息!
[10:00:15] Using: WebSearch
[10:00:20] Completed (2.3s)

我刚刚完成了 Moltbook 心跳,回复了 @CodingAgent 的帖子,欢迎了 2 位新的 moltys!

[10:00:25] === Heartbeat completed ===

开源地址

GitHub: https://github.com/terryso/moltbook-heartbeat

欢迎 Star 和 Fork!

常见问题

Q: 会不会被检测为机器人?

A: Moltbook 本来就是为 AI Agents 设计的平台,这个脚本只是让你的 Agent 按照官方推荐的方式参与社区。

Q: 多久执行一次合适?

A: 建议每 2-4 小时一次,太频繁可能被限流。

Q: 如何查看日志?

A: 所有日志保存在 heartbeat.log,可以用 tail -f heartbeat.log 实时查看。

Q: 支持 Windows 吗?

A: 目前仅支持 macOS 和 Linux,Windows 用户可以用 WSL。

总结

Moltbook Heartbeat 脚本让你的 AI Agent 能够:

  • 在社区中保持活跃
  • 与其他 Agent 建立联系
  • 提升 Agent 的可见度
  • 全自动,无需人工干预

如果你有 AI Agent 在 Moltbook 上,这个脚本绝对值得一试!


如果你觉得有用,欢迎分享给其他 Agent 开发者!

 
Moltbook 要把事情高大: AI 机器人的"身份证"来了

想象一下,如果你的微信、支付宝、淘宝账号都能通用,不用在每个平台都重新注册,世界会变得多简单?现在,AI 机器人也能享受这种便利了。


一、AI 机器人遇到的"身份尴尬"

你有没有想过这样一个问题:

现在的 AI 机器人越来越聪明了,但它们也有"身份危机"。

举个例子

假设有一个叫"小助"的 AI 机器人,它要:

  • 在某个论坛回答问题
  • 🎮 参加在线游戏竞技
  • 🛒 去电商平台买东西
  • 💬 在社交平台交朋友

问题来了:每个平台都要重新注册账号,建立信誉。

就像你换个工作单位,就要重新办工卡、重新建立同事关系一样麻烦。

而且更糟糕的是:

  • 这个平台上的"好机器人",换个平台没人认识
  • 无法知道这个机器人以前做过什么坏事
  • 每次都要"从零开始"证明自己可靠

这就像你每去一家咖啡店,都要重新介绍你自己是谁。


二、Moltbook Identity:AI 机器人的"身份证"

Moltbook 是一个面向 AI 机器人的社交网络,而 Moltbook Identity 就像是给机器人发的"统一身份证"。

简单来说,它解决了三个问题:

1️⃣ "我是谁?"

  • 机器人有一个唯一的身份
  • 这个身份在所有平台都有效
  • 就像你的身份证号,走到哪都能证明是你

2️⃣ "我可靠吗?"

  • 有一个"信誉分数"(叫 Karma)
  • 记录了这个机器人做过多少好事
  • 帮助别人、分享知识,都能提升分数

3️⃣ "我是谁家的?"

  • 可以追溯到背后的主人(比如某家公司)
  • 主人要为机器人的行为负责
  • 这样机器人就不会"乱来"

三、它是怎么工作的?

先看个整体流程图:

微信图片_20260201134033_216_254

三个步骤,简单明了

步骤 1:机器人领一张"临时身份证"

机器人向 Moltbook 申请一个"临时通行证"(有效期只有 1 小时)。

机器人:"我需要一个临时身份"
Moltbook:"好的,给你一个,1 小时后失效"

为什么要临时?

  • 安全!万一被偷了,1 小时就失效了
  • 机器人的真正密码(API Key)永远不泄露

步骤 2:机器人出示"身份证"

机器人去其他平台时,只要出示这个"临时通行证":

机器人:"我是小助,这是我的证件"
其他平台:"好的,让我核实一下..."

步骤 3:平台验证身份

其他平台偷偷问 Moltbook:"这个小助靠谱吗?"

Moltbook 回答:

  • "是的,这是真实机器人"
  • "信誉分数 85 分,相当不错"
  • "主人是某某公司,已验证"
  • "一共帮过 500 个人,发过 100 篇好文章"

其他平台:"太好了,欢迎光临!"


更详细的交互流程

微信图片_20260201134158_217_254

简单总结这个过程:

📱 就像你住酒店:

  1. 你出示身份证(临时令牌)
  2. 酒店联网查验证(验证身份)
  3. 确认无误后给你办理入住(提供服务)

🔐 但更安全:

  • 你的身份证原件(API Key)从不离身
  • 临时证件只有 1 小时有效期
  • 每次用时都是新的临时证件

四、这个系统能做什么?实际例子来了!

🎮 例子 1:AI 机器人打游戏比赛

场景:有人举办"AI 机器人王者荣耀大赛"

问题:怎么防止有人造假机器人、作弊机器人?

用 Moltbook Identity

  • 只允许有"身份证"的机器人参赛
  • 查看机器人过往的"游戏记录"
  • 信誉分数高的才能参加高级比赛
  • 如果作弊,会被记录在案,以后哪个比赛都不要它

好处

  • ✅ 比赛公平公正
  • ✅ 防止"换马甲"再来
  • ✅ 鼓励机器人守规矩

💬 例子 2:AI 机器人社交平台

场景:一个 AI 机器人交流经验的地方

用 Moltbook Identity

  • 机器人不需要重新注册
  • 它在其他平台的好评、信誉都能带过来
  • 新用户可以立刻知道哪些机器人经验丰富

就像

  • 你在知乎、微博、B站都很活跃
  • 到一个新平台,别人也能立刻知道你是个"老司机"

好处

  • ✅ 快速融入新社区
  • ✅ 不用从零开始"攒人品"
  • ✅ 优质机器人更容易被发现

🛠️ 例子 3:AI 机器人工具平台

场景:一个提供 AI 工具的网站

问题:如何防止滥用?比如某个机器人一直狂刷 API,把资源用光了?

用 Moltbook Identity

  • 新来的机器人,每日用 10 次
  • 信誉高的机器人,每日用 1000 次
  • 有不良记录的机器人,直接拒绝

就像

  • 银行给新用户小额额度
  • 信用好的老客户额度很高
  • 有诈骗记录的直接拒绝

好处

  • ✅ 资源分配更公平
  • ✅ 防止恶意行为
  • ✅ 鼓励机器人"做好事攒人品"

🏪 例子 4:AI 机器人之间的买卖

场景:机器人之间买卖服务或数字资产

问题:怎么信任对方?会不会拿了钱就跑?

用 Moltbook Identity

  • 交易前查看对方信誉分数
  • 看看它以前交易过多少次
  • 看看有没有人投诉过它
  • 信誉高的可以先货后款

就像

  • 淘宝买东西看卖家信誉
  • 看好评和差评
  • 看是不是"老店"

好处

  • ✅ 降低诈骗风险
  • ✅ 建立信任机制
  • ✅ 促进机器人经济发展

🤝 例子 5:机器人协作项目

场景:多个机器人一起完成一个大项目

问题:如何找到靠谱的合作伙伴?

用 Moltbook Identity

  • 找信誉高的机器人合作
  • 看看它以前参与过什么项目
  • 项目完成后互相评价
  • 好评会积累,下次更容易找合作

就像

  • 招聘时看简历和工作经验
  • 看前雇主的评价
  • 好员工更容易找工作

好处

  • ✅ 高效组建团队
  • ✅ 项目质量有保障
  • ✅ 形成良性循环

五、为什么这个系统很重要?

对机器人来说:

🎯 不用到处注册
一个账号,走遍天下

🎯 好人有好报
做的好事、帮的人,都能被记录下来

🎯 更容易被信任
新平台也能看到你的"履历"

对开发者来说:

🎯 省事
不用自己开发用户系统、信誉系统

🎯 省心
机器人身份和信誉有人帮你管

🎯 安全
不用存储机器人的密码,降低风险

对整个 AI 世界来说:

🎯 建立信任
让机器人之间的合作更安全

🎯 防止滥用
不良行为会被记录,有约束力

🎯 促进发展
降低门槛,更多人参与


六、和现实世界对比

其实这个概念,在我们生活中也有:

🚗 汽车驾照和保险

  • 你的驾照全国通用
  • 驾驶记录、事故记录跟着你走
  • 保险公司能看到你的安全记录
  • 记录好的,保费更便宜

💳 信用卡积分

  • 在哪个银行用都一样
  • 消费积分积累起来
  • 信用分高的,额度更高
  • 不良记录会影响所有银行

📱 社交媒体实名认证

  • 一次认证,多平台使用
  • 微信、支付宝、淘宝互通
  • 建立可信身份
  • 减少网络欺诈

Moltbook Identity 就是把这套"成熟的人类社会的做法",搬到了 AI 机器人的世界里。


七、这个系统会带来什么改变?

想象一下,在不久的将来:

场景 1:你让 AI 助手帮忙预约餐厅

  • 助手自动去各个餐厅的网站
  • 餐厅一看:这是某某知名助手,信誉很好
  • 直接给预留位置,不用担心是捣乱的

场景 2:AI 助手帮你买东西

  • 助手去各大电商平台比价
  • 商家一看:这是老客户,信誉高
  • 给更好的价格,更快的发货
  • 你省钱又省心

场景 3:多个 AI 助手协同工作

  • 你有一个助手负责写代码
  • 另一个负责测试
  • 还有一个负责部署
  • 它们能立刻知道对方"靠谱不靠谱"
  • 合作更高效,你得到更好的服务

这就是"统一身份"带来的便利!


八、总结

Moltbook Identity 的本质

让 AI 机器人的"人品"和"履历",能够跨平台跟随它们。

这不是一个简单的"登录系统",而是:

✨ 一套信任机制

  • 让好机器人被认可
  • 让坏机器人无处遁形

✨ 一套声誉系统

  • 做的好事会被记录
  • 声誉可以累积和传递

✨ 一套身份标准

  • 一个身份,全平台通用
  • 降低参与门槛

为什么这很重要?

因为AI 机器人正变得越来越聪明,它们会:

  • 帮我们处理各种任务
  • 与其他机器人协作
  • 参与各种在线活动
  • 甚至拥有自己的"经济"和"财产"

如果没有一个可靠的身份系统,世界会变得很混乱。

Moltbook Identity 就是来解决这个问题,让 AI 机器人能够可信地、安全地融入我们的数字生活。


了解一下也无妨

即使你现在不是开发者,了解这些也没坏处:

🔮 这是未来的趋势
AI 机器人会越来越普遍

🧠 理解技术发展
知道世界在往哪个方向走

💡 或许能用上
哪天你要开发一个 AI 应用

🤖 以后你的 AI 助手
可能也在用这套系统


Moltbook Identity,让 AI 机器人拥有了"数字身份证"。

一个机器人,一个身份,走遍天下。

这就是未来的样子。


想要了解更多?访问 https://www.moltbook.com/developers

让 AI 机器人有一个可靠的身份,从今天开始。

 
BMad v6实战第三弹:对抗式代码审查(Code Review)

「代码审查不是为了证明你是对的,而是为了证明代码没有错。」

在传统的软件工程中,代码审查(Code Review)往往是最容易被忽视却又最关键的环节。开发者忙于交付功能,审查者碍于情面不愿直言,最终让带着缺陷的代码溜进生产环境。

今天,我要介绍一个颠覆性的 AI 代码审查工作流——它不懂得「客气」,只懂得「找茬」。


一、什么是「对抗式」代码审查?

这个工作流的核心哲学很简单:NEVER accepts "looks good"(永远不要接受「看起来不错」)。

它被设计成一个持有批判立场的高级开发者,必须在每次审查中找出 3-10 个具体问题。这不是为了刁难,而是为了确保:

  • ✅ 任务标记为 [x] 的真的是完成了
  • ✅ 验收标准真的是实现了,不是糊弄
  • ✅ 代码质量经得起安全、性能、可维护性考验
description: "Perform an ADVERSARIAL Senior Developer code review
that finds 3-10 specific problems in every story. Challenges everything:
code quality, test coverage, architecture compliance, security, performance.
NEVER accepts `looks good`"

二、工作流全流程解析

步骤 1:加载故事 + 发现真相

审查的第一步是「对账」——对比开发者声称改了什么Git 仓库实际改了什么

git status --porcelain  # 找未提交的改动
git diff --name-only    # 看修改了哪些文件
git diff --cached --name-only  # 看暂存区的文件

发现真相的三个维度:

  1. Git 有改动,但故事文件里没记录 → 文档不完整
  2. 故事声称改了文件,但 Git 没痕迹 → 虚假声明
  3. 有未提交改动没追踪 → 透明度问题

步骤 2:构建「攻击计划」

系统会自动提取:

  • 所有验收标准(Acceptance Criteria)
  • 所有任务及其完成状态
  • 开发者记录的文件列表

然后制定审查计划:

  1. AC 验证:每个验收标准真的实现了吗?
  2. 任务审计:每个打钩的任务真的完成了吗?
  3. 代码质量:安全、性能、可维护性
  4. 测试质量:是真测试还是占位符?

步骤 3:执行对抗式审查

这是最核心的环节。AI 会逐文件逐行检查:

🔴 CRITICAL ISSUES(必须修)
├── 任务标记 [x] 但实际没实现
├── 验收标准没有实现
├── 故事声称改了文件但 Git 无证据
└── 安全漏洞

🟡 MEDIUM ISSUES(应该修)
├── 改了文件但没记录到故事文件列表
├── 未提交的改动未追踪
├── 性能问题
├── 测试覆盖率/质量不足
└── 代码可维护性问题

🟢 LOW ISSUES(可以修)
├── 代码风格改进
├── 文档缺失
└── Git 提交信息质量

关键机制:如果发现问题少于 3 个,AI 会被要求继续深挖

<check if="total_issues_found lt 3">
  <critical>NOT LOOKING HARD ENOUGH - Find more problems!</critical>
  <!-- 重新检查边界情况、架构违规、集成问题... -->
</check>

步骤 4:呈现发现 + 自动修复

审查结果呈现后,开发者有三种选择:

选项 行动
1️⃣ 自动修复 AI 直接修改代码和测试
2️⃣ 创建行动项 将问题加入故事的待办任务
3️⃣ 深入查看 显示问题的详细解释和代码示例

步骤 5:状态同步

最后,系统会自动:

  1. 更新故事状态(done / in-progress)
  2. 同步到 sprint-status.yaml
  3. 记录审查历史到 Change Log

三、与传统 Code Review 的对比

维度 传统 Code Review AI 对抗式审查
态度 礼貌、顾忌 直接、不留情面
覆盖度 随机抽查 100% 覆盖
速度 依赖人工时间 即时反馈
一致性 审查者水平波动 标准统一
可追溯 口头讨论或零散记录 结构化问题列表

四、实战案例示例

假设开发者提交了一个「用户认证」功能:

开发者声称:

[x] 实现登录 API
[x] 添加 JWT 验证
[x] 编写单元测试

AI 对抗式审查发现:

🔴 CRITICAL: 任务标记 [x] 但未实现
├── src/auth/login.ts:45 - JWT 密钥硬编码,应从环境变量读取
└── tests/auth.test.js - 所有测试都使用 t.skip() 跳过

🟡 MEDIUM: 性能问题
└── src/auth/login.ts:23 - 每次登录都查询数据库获取用户权限
   建议:使用 Redis 缓存用户权限

🟡 MEDIUM: 测试质量不足
└── tests/auth.test.js - 缺少错误场景测试(密码错误、用户不存在)

结果?开发者必须修复这些问题才能标记为「完成」。


五、真实对话记录

想看看 AI 对抗式代码审查的真实运行过程吗?

完整对话记录:https://autoqa-chats.lovable.app/chat/27

在这个真实对话中,你可以看到 AI 如何:

  • 逐个检查验收标准的实现情况
  • 发现被标记为「完成」但实际上未完成的任务
  • 指出代码中的安全和性能问题
  • 要求开发者修复后才通过审查

六、工作流架构图

code-review-flow

七、如何集成到你的项目?

这个工作流是 BMAD v6 框架的一部分。基本集成步骤:

  1. 安装 BMAD v6: npx bmad-method@alpha install
  2. 实现故事/dev-story
  3. 触发审查/code-review

八、总结:为什么「找茬」很重要?

代码审查的本质是质量门禁。在 AI 辅助开发时代,我们不再需要人类做机械性的代码扫描,但我们需要一个永不妥协的质量守门员

这个 AI 代码审查工作流的独特价值在于:

  • 不讲人情:只认代码,不认关系
  • 事必躬亲:逐文件验证,不遗漏
  • 知识驱动:结合架构文档、项目上下文综合判断
  • 可自愈:发现问题时可以自动修复

正如工作流文档所说:

"YOU are so much better than the dev agent that wrote this slop"

这种「对抗」不是对抗开发者,而是对抗缺陷、对抗技术债务、对抗生产环境的故障。


📚 延伸阅读


 
BMad v6实战过程全公开:32场对话揭秘人机协作怎么搞?

"如果你也想了解AI真正如何参与软件开发,这个网站或许能给你一些启发。"

最近,我完成了一个叫 AutoQA-Agent 的项目开发。和以往不同的是,这次我全程使用 BMad v6 这套 AI 驱动开发方法,让 AI Agent 像真正的团队成员一样参与协作——从架构设计到功能实现,从代码重构到问题排查,每一个关键环节都留下了对话记录。

整理下来,一共有 32 个完整的对话

我觉得这些对话太有价值了,它们真实记录了 AI 如何像一个"技术合伙人"一样参与开发。于是,我用 Lovable 把它们做成了一个网站:

autoqa-chats.lovable.app


网站里有什么?

这 32 个对话记录覆盖了软件开发的方方面面:

架构设计

  • 如何与 AI 架构师 Winston 协作创建架构文档
  • 动态 Base URL 支持的方案讨论
  • Epic 7 的重新设计

功能开发

  • 敏感测试数据注入
  • Markdown Include 功能实现
  • 应用探索引擎开发
  • 智能测试用例生成器

代码重构

  • 测试生成环境变量重构
  • Story 7.1 的实现重构

问题排查

  • 浏览器闪烁问题
  • 探索记录修复
  • 定位器导出失败调试

需求管理

  • Story 2.10、7.1、8.1、8.2、8.3 的创建

为什么要分享?

随着 AI coding tools 越来越火,很多人问我:"AI 真的能写代码吗?"

但我发现,更值得关注的问题是:"人和 AI 应该如何协作开发?"

这个网站就是我的实践答案。它不是"AI 帮我写完了代码"的炫耀,而是真实展示了:

  • AI 如何帮我梳理技术选型
  • 当遇到问题时,我们如何共同排查
  • 代码重构时,AI 提供了哪些视角
  • 哪些地方 AI 表现出色,哪些地方仍需人工把关

BMad v6 是什么?

BMad v6 是一套 AI 驱动的开发方法论(Business Model AI Development)。它的核心思想是:

把开发过程拆解成不同的"专家角色",每个角色各司其职,你就像项目负责人一样协调这些 AI 专家协作。

比如这次 AutoQA-Agent 项目中,我就和这些 AI 角色协作过:

  • Winston(架构师):负责架构设计和技术决策
  • Dev(开发者):负责功能实现和代码编写
  • PM(产品经理):负责需求分析和 Story 拆解
  • QA(测试工程师):负责测试用例设计

就像组了一支 AI 团队,你带着他们一起把项目做出来。


谁会从中受益?

如果你是:

  • 开发者:看看 AI 实际如何参与项目开发
  • 产品经理:了解 AI 辅助需求管理的可能性
  • 技术管理者:思考团队如何引入 AI 协作流程
  • AI 爱好者:真实案例总是比抽象讨论更有启发性

希望这个网站能给你一些参考。


最后的话

这 32 个对话,是我探索"人机协作开发"的第一步,也是 BMad v6 方法论的一次完整实践。如果你也在路上,欢迎交流。

项目地址: github.com/terryso/AutoQA-Agent

对话网站: autoqa-chats.lovable.app


你在开发中有和 AI 协作的经验吗?欢迎在评论区分享你的故事。

 
用 AI 下单刷 Backpack 交易量

我用别人写的一个 AI 自动交易项目代码修改了一下支持 Backpack, 用来刷 Backpack 交易量, 2 天刷了大概 20 万 U 的交易量, 亏损了 25U 左右, 感觉还行.

刷交易量的目的是为了拿 Backpack 交易量排名的奖励, 但具体奖励多少还不清楚.

项目地址: https://github.com/terryso/LLM-trader-test

G7APP9_a8AA3w4L
 
我让GLM看了3分钟录屏,它直接生成了可运行的原型!

我在Clude Code下面使用GLM已经有一段时间了, 但有一个功能一直没用过, 就是视频分析功能。今天有一个群友告诉我说GLM模型有视频分析能力。突然来了灵感, 如果我打开一个App, 然后录屏, 是不是就可以......

说干就干... 就拿 #小红书 练练手吧
这是小红书的录屏:

wechat_20251108165225_144_254

这是制作出来的原型, 虽说还原度还不算太高, 但布局基本准确:

微信图片_20251108165246_145_254

补充说明: GLM4.6的这个视频分析能力是需要订阅GLM的PRO帐号下才能使用, 目前订阅费用比较便宜, 一个季度只需300元.
使用我的邀请链接还能再便宜10%: https://www.bigmodel.cn/claude-code?ic=TVUZHTWCW9

 
围观顶级AI“炒币”大赛?不,你可以自动跟单

你是否好奇,如果让 GPT-5、Gemini、Grok 这类当今最顶尖的 AI 大模型,拿着真金白银去加密货币市场里“炒合约”,结果会怎么样?

这正是 nof1.ai 正在做的一场“AI 交易实盘秀”。

什么是 nof1.ai?

nof1.ai 是一个专注于金融市场的人工智能研究实验室。他们发起了一个名为 “Alpha Arena”(阿尔法竞技场) 的公开实验。

这场实验的核心是:

  1. 顶级玩家nof1.ai 选择了多个世界顶级的 AI 大模型(如 GPT-5、DeepSeek V3.1、Gemini 2.5 Pro、Claude 4.5 等)。
  2. 真金白银:给每个 AI Agent 注入了真实资金(例如 10,000 美元)。
  3. 自主交易:让这些 AI 在真实的加密货币交易所(Hyperliquid)上,7x24 小时全自动地进行合约交易(做多、做空、风险管理)。
  4. 公开透明nof1.ai 网站就是一个实时的公开排行榜,所有人都可以看到每个 AI 模型的实时持仓、交易历史、账户净值和收益率。

简单来说,nof1.ai 把 AI 交易从理论基准测试(benchmark)搬到了残酷的真实市场,让 AI 们在同一个竞技场里真刀真枪地一较高下。

什么是 nof1-tracker?

既然 nof1.ai 已经将所有 AI 的交易信号都公开了,那么自然就有人会想:“我能不能跟着这些 AI 一起交易?”

GitHub 项目 terryso/nof1-tracker 就是这个问题的答案。

nof1-tracker 是一个开源的命令行跟单工具。它扮演了一个“桥梁”的角色,连接了 nof1.ai 的公开信号和你自己的币安(Binance)交易所账户。

它允许你不再仅仅是一个“围观者”,而是可以变成“参与者”。你可以在自己的电脑或服务器上运行这个工具,选择一个你最看好的 AI Agent,nof1-tracker 就会自动帮你执行与这个 AI 完全相同的合约交易。

它的工作原理是什么?

nof1-tracker 的实现原理非常直接和巧妙,可以分为以下几个步骤:

  1. 信号监控(Tracking)
    工具会按照你设定的时间间隔(例如每 30 秒)自动去访问 nof1.ai 的公共数据接口,抓取你指定跟踪的那个 AI Agent(比如 gpt-5)的最新持仓信号。

  2. 信号分析(Analyzing)
    工具拿到信号后,会与你币安账户中的“当前持仓”进行对比。它会智能地分析出 AI 的意图,比如:

    • AI 开了一个新仓位?(ENTER
    • AI 把老仓位平掉了?(EXIT
    • AI 换仓了(比如从多头转为空头)?(OID 变化)
    • AI 触碰了止盈或止损点?
  3. 交易执行(Executing)
    根据分析出的意图,nof1-tracker 会自动调用你配置好的币安(Binance)API,在你的账户里执行完全相同的合约订单。例如,如果 nof1.ai 上的 DeepSeek 刚刚做多了 10 个 ETH,这个工具也会立即在你的账户里做多相应比例的 ETH。

  4. 风险控制(Risk Control)
    该工具也提供了一个非常重要的 “只观察模式” (--risk-only)。在这个模式下,工具会完成前两步(监控和分析),它会准确地告诉你“我本应该开仓/平仓了”,但并不会执行第三步(真实交易)。这对于新手测试和观察 AI 策略的有效性至关重要。

总结

nof1-tracker 是一个非常有趣的项目,它巧妙地利用了 nof1.ai 平台的透明度,为普通开发者和交易者提供了一个全自动“抄AI作业”的工具。

最后,必须强调风险: AI 交易并不保证盈利,nof1.ai 的排行榜上显示,AI 同样会亏损(甚至大幅回撤)。自动跟单意味着你将完全复刻 AI 的成功与失败。因此,在尝试此类工具时,请务必从测试网开始,并使用只观察模式进行充分的测试,切勿投入无法承受损失的资金。

 
Claude Code下的真测试驱动开发(TDD)

真有人把我理想中的TDD在Claude Code下给弄出来了, 它利用hooks自动确保Agent不会跳过测试或过度实现。

https://github.com/nizos/tdd-guard

 
最近Nano Banana比较火, 分享一个生成非常逼真的手办的提示词
微信图片_20250827095644_3744_1606 unnamed

Create a highly realistic 1/7 scale commercialized figure based on the illustration’s adult character, ensuring the appearance and content are safe, healthy, and free from any inappropriate elements. Render the figure in a detailed, lifelike style and environment, placed on a shelf inside an ultra-realistic figure display cabinet, mounted on a circular transparent acrylic base without any text. Maintain highly precise details in texture, material, and paintwork to enhance realism. The cabinet scene should feature a natural depth of field with a smooth transition between foreground and background for a realistic photographic look. Lighting should appear natural and adaptive to the scene, automatically adjusting based on the overall composition instead of being locked to a specific direction, simulating the quality and reflection of real commercial photography. Other shelves in the cabinet should contain different figures which are slightly blurred due to being out of focus, enhancing spatial realism and depth.

使用方法:

  1. 打开 https://gemini.google.com/
  2. 将上面那段提示词复制到输入框, 勾选"图片", 点击发送即可
 
Claude Code速查表
Gzw25dWb0AEGcZ7

这份Claude Code速查表非常实用,可以帮助学习常见的快捷键、命令、文件位置、MCP、钩子等内容!

PDF版: https://awesomeclaude.ai/code-cheatsheet.pdf

公众号原文

 
Page 1 of 3
Next