当前位置: 首页 > news >正文

Temporal 服务器源码架构分析

Temporal 服务器源码架构分析

基于 temporalio/temporal 仓库源码深度分析
仓库地址:https://github.com/temporalio/temporal


目录

  1. 概述
  2. 项目结构
  3. 四大核心服务
  4. 事件溯源与 Workflow 历史
  5. 持久化层架构
  6. History Service 深度分析
    • 6.1 History 分片
    • 6.2 Mutable State
    • 6.3 状态转换
    • 6.4 队列处理
    • 6.5 一致性保证
  7. Matching Service
    • 7.1 Task Queue 分区
    • 7.2 转发机制
  8. Frontend Service
  9. Worker Service 与内部 Worker
    • 9.1 架构定位
    • 9.2 源码结构
    • 9.3 WorkerComponent 注册机制
    • 9.4 各组件职责
    • 9.5 为什么需要内部 SDK Worker
  10. 补充机制
    • 10.1 任务调度框架
    • 10.2 CHASM 框架
    • 10.3 重试与容错
    • 10.4 Speculative Workflow Task & Workflow Update
    • 10.5 Nexus RPC
  11. Temporal Web UI
    • 11.1 数据获取架构
    • 11.2 调用的 API
    • 11.3 Workflow 列表
    • 11.4 Workflow 详情页
    • 11.5 Schedule 查看
    • 11.6 操作能力
    • 11.7 与任务看板的区别
  12. Temporal + AI Agent 实践
    • 12.1 为什么 Temporal 适合 Agent 编排
    • 12.2 架构模式:Agent-as-Workflow
    • 12.3 主流 Agent 框架集成生态
    • 12.4 业界成功实践
    • 12.5 Temporal AI Cookbook
    • 12.6 与 Hermes Agent 的类比
  13. 总结
    • 13.1 架构模式对照表
    • 13.2 关键技术指标
    • 13.3 演进趋势

1. 概述

Temporal 是一个持久化执行平台(Durable Execution Platform),源自 Uber 的 Cadence 项目分支,由 Temporal Technologies 公司开发维护。它允许开发者以代码方式定义 Workflow,平台自动处理间歇性故障、重试失败操作,保证 Workflow 在进程崩溃、网络中断等场景下仍能正确执行。

核心设计决策:

维度 决策
持久化模型 事件溯源(Event Sourcing)—— 每个 Workflow 执行维护一个只增的 History Event 序列
用户代码隔离 Workflow 代码必须确定性(Deterministic),Activity 代码要求幂等
代码执行位置 用户代码在用户自有的 Worker 进程中执行,而非服务器
服务架构 微服务:Frontend / History / Matching / Worker 四大服务
编程语言 Go(服务器),SDK 支持 Go/Java/Python/TypeScript 等
许可证 MIT

2. 项目结构

temporal/
├── api/                  # Protobuf 定义和生成的 Go 代码
│   ├── adminservice/     # 管理服务 API
│   ├── workflowservice/  # 面向用户的 Workflow API
│   └── ...
├── chasm/                # CHASM 框架(新的 ASM 框架,用于 Scheduler 等)
├── client/               # 服务间通信的 gRPC 客户端
├── cmd/                  # CLI 入口 ── cmd/server/main.go
├── common/               # 跨服务共享模块
│   ├── persistence/      # 持久化层抽象和实现(Cassandra/SQL/SQLite)
│   ├── tasks/            # 通用任务调度框架
│   ├── dynamicconfig/    # 动态配置系统
│   ├── membership/       # 集群成员管理(Ringpop)
│   ├── metrics/          # 指标定义和收集
│   └── namespace/        # Namespace 缓存和工具
├── components/           # 组件化服务(nexusoperations, callbacks)
├── config/               # 配置文件和模板
├── docs/                 # 文档(含 architecture/ 核心架构文档)
├── proto/                # 内部服务 Protobuf 定义
├── schema/               # 数据库 Schema(Cassandra/SQL)
├── service/              # 四大核心服务
│   ├── frontend/         # 前端服务——用户入口
│   ├── history/          # 历史服务——Workflow 核心引擎
│   │   └── api/          # 70+ gRPC handler(每 API 独立子目录)
│   ├── matching/         # 匹配服务——Task Queue 管理
│   └── worker/           # 内部 Worker
├── temporal/             # 服务器启动和生命周期
└── temporaltest/         # 测试工具

关键技术栈:

技术 用途
Go 1.26 主开发语言(模块路径 go.temporal.io/server
gRPC + Protobuf 服务间通信和 API 定义(temporal/apiproto/internal
uber-go/fx 依赖注入框架(各 Service 用 fx.Module 组装)
Cassandra / MySQL / PostgreSQL / SQLite 四种持久化后端
Ringpop 集群成员管理和分片协调
OpenTelemetry 分布式追踪 + Prometheus 指标导出
gorilla/mux HTTP 路由(Nexus/Callback 端点)

3. 四大核心服务

Temporal 服务器由四个独立部署的微服务组成,由 temporal/temporal.go 通过 fx 统一组装启动:

                    ┌──────────────────┐│   Temporal CLI   │── cmd/server/main.go└────────┬─────────┘│┌────────▼─────────┐│   temporal.New()  │── uber-go/fx DI 容器└────────┬─────────┘│┌───────────────────┼───────────────────┐▼                   ▼                   ▼┌──────────┐       ┌──────────┐       ┌──────────┐│ Frontend │◄─────▶│ History  │◄─────▶│ Matching ││  Service  │       │  Service  │       │  Service  │└────┬─────┘       └────┬─────┘       └────┬─────┘│                  │                  │└──────────────────┴──────────────────┘│┌───────┴───────┐│  Persistence  │└───────────────┘┌──────────────────┐│  User Worker     │ (用户自有进程)│  (SDK Runtime)   │└──────────────────┘
服务 职责 关键文件 源码规模
Frontend 用户入口,接收 gRPC/HTTP 请求,路由到 History/Matching service/frontend/handler.go 单一 handler,~2600 行
History Workflow 执行引擎,管理状态和事件序列,驱动完整生命周期 service/history/handler.go 最大模块,80+ 子目录
Matching 管理 Task Queue,接收 Frontend 转发的 Worker Poll 请求并匹配任务 service/matching/ 中等规模
Worker 内部系统 Worker(归档、可见性等),非用户 Worker service/worker/ 最小模块

完整请求流(Start → 执行 Activity → Complete):

用户 ──StartWorkflowExecution──▶ Frontend│ shard 路由▼History: 初始化事件 + Transfer Task│ QueueProcessor 异步消费▼Matching: AddWorkflowTask│Worker ──PollWorkflowTask──▶ Frontend└── gRPC → MatchingWorker ◀── WorkflowTask ─── Frontend└── Matching 响应Worker ──RespondWorkflowTaskCompleted(ScheduleActivity)──▶ History│ 追加事件 + Transfer Task▼Matching: AddActivityTaskWorker ──PollActivityTask──▶ Frontend└── gRPC → MatchingWorker ◀── ActivityTask ─── Frontend└── Matching 响应Worker 执行 ActivityWorker ──RespondActivityTaskCompleted──▶ History → 循环直至 Complete

3.1 依赖注入架构

整个服务器使用 uber-go/fx 组装:

temporal/temporal.go 中的 Server:
├── resource.Module            # 共享资源(日志、指标、成员管理)
├── persistenceClient.Module   # 持久化客户端
├── frontend.Module            # Frontend gRPC + HTTP 服务
├── history.Module             # History 服务(含 shard、队列、cache)
├── matching.Module            # Matching 服务
└── worker.Module              # Worker 服务

每个 Service Module 内部进一步拆分(以 History 为例):

var Module = fx.Options(resource.Module,fx.Provide(hsm.NewRegistry),       // HSM 状态机注册表shard.Module,                       // 分片管理events.Module,                      // 事件缓存cache.Module,                       // Workflow 缓存archival.Module,                    // 归档ChasmEngineModule,                  // CHASM 引擎fx.Provide(ConfigProvider),fx.Provide(workflow.NewCommandHandlerRegistry),// ... 多个 Interceptor Provider
)

4. 事件溯源与 Workflow 历史

Temporal 最核心的设计模式是事件溯源(Event Sourcing)。每个 Workflow 执行维护一个只增的 History Event 序列,所有所需状态都可以通过重放(Replay)这个历史来重建。

4.1 History Event

Event 类型定义在 temporal/api/enums/v1/event_type.proto,常见的包括:

Event 类型 触发场景
WorkflowExecutionStarted Workflow 启动
WorkflowTaskScheduled / Started / Completed Workflow Task 生命周期
ActivityTaskScheduled / Started / Completed Activity 生命周期
TimerStarted / TimerFired 定时器触发
WorkflowExecutionCompleted / Failed / TimedOut Workflow 结束
WorkflowTaskFailed / ActivityTaskFailed 任务失败

注意:Temporal 中的 "event" 特指 Workflow History Event,不同于事件驱动架构中的"事件"——它是服务器收到外部输入后内部计算出的 Workflow 执行状态精确描述,而非系统间异步消息。

4.2 三种持久化策略

策略 描述 场景
History Events 不可变事件序列,追加写入 所有状态变更
Mutable State 从事件计算的当前状态快照 快速读取(Activity 列表、Timer 等)
Tasks 待处理的异步工作单元 Transfer、Timer、Replication 等

Event Sourcing 的三个核心优势:

  • 持久化容错:所有状态可从历史重建
  • 确定性重放:Worker 通过重放历史恢复 Workflow 状态
  • 审计追踪:完整的不可变执行记录

5. 持久化层架构

在深入具体服务之前,先理解存储层——这是所有服务依赖的基础设施。持久化层位于 common/persistence/,提供统一的接口抽象,支持四种后端。

5.1 接口分层

应用层(History/Matching Service)│▼
接口层(data_interfaces.go 定义接口)│┌───────┬────────┬────────┐▼       ▼        ▼        ▼Cassandra  MySQL  PostgreSQL  SQLite(gocql)   (mysql)  (pgx)   (modernc)

核心接口文件:

common/persistence/
├── data_interfaces.go      # ExecutionManager / TaskManager / HistoryManager 等接口
├── execution_manager.go    # CreateWorkflowExecution / UpdateWorkflowExecution
├── history_manager.go      # AppendHistoryNodes / ReadHistoryBranch
├── shard.go                # GetShard / UpdateShard
├── task_store.go           # CreateTask / GetTasks / CompleteTask
├── cluster_metadata.go     # 集群元数据
├── nexus_endpoint_store.go # Nexus Endpoint 存储
├── cassandra/              # Cassandra 实现
├── sql/                    # SQL 实现(含 factory 按 driver 类型创建)
│   └── sqlplugin/          # mysql/ postgresql/ sqlite 插件
└── client/                 # 客户端封装(重试、缓存)

5.2 关键数据库表(Cassandra 为例)

-- executions 表:Mutable State + Shard 元数据
schema/cassandra/temporal/schema.cql:7
-- 核心列: shard_id, type, namespace_id, workflow_id, run_id, data, data_encoding-- history_node / history_tree:History Event 序列
schema/cassandra/temporal/schema.cql:56,70
-- history_node: tree_id, branch_id, node_id, prev_txn_id, data, data_encoding
-- history_tree: shard_id, tree_id, branch_id, data-- 可见性存储:Workflow 搜索属性
schema/cassandra/visibility/schema.cql

5.3 主存储接口设计

// data_interfaces.go - 核心接口
type ExecutionManager interface {CreateWorkflowExecution(ctx, request) (response, error)UpdateWorkflowExecution(ctx, request) (response, error)  // 核心写入路径GetWorkflowExecution(ctx, request) (response, error)
}type HistoryManager interface {AppendHistoryNodes(ctx, request) (response, error)ReadHistoryBranch(ctx, request) (response, error)ForkHistoryBranch(ctx, request) (response, error)      // Reset/分支DeleteHistoryBranch(ctx, request) error
}

6. History Service 深度分析

History Service 是 Temporal 最核心的组件,负责管理每个 Workflow 执行的生命周期。其架构围绕 Shard → Engine → QueueProcessor → Executor 层层展开。

6.1 History 分片

目的:支撑水平扩展,管理数百万并发 Workflow 执行。

Temporal Cluster│├── History Host 1 ──── Shard 0, Shard 1, Shard 3├── History Host 2 ──── Shard 2, Shard 4, Shard 5└── History Host 3 ──── Shard 6, Shard 7, ...
  • Shard 总数在集群创建时固定(不可更改)
  • 分片所有权由 ShardController + Ringpop(Uber 开源的 SWIM 协议实现)协调
  • 每个 History 进程启动时调用 historyEngine.Start()service/history/history_engine.go:288),为每个所属 Shard 启动队列处理器

Shard 核心元数据:

字段 说明
RangeID 单调递增的世代号,用于 fencing(防脑裂)
队列状态 各内部队列的已确认/已处理位置(ack level)

在 Cassandra 中,Shard 对应 executions 表的一个分区(schema/cassandra/temporal/schema.cql:52)。

Shard 所有权的获取流程(源码跟踪):

History Service 启动└─ shard.NewController()           # service/history/shard/controller.go└─ controller.Start()          # 加入 Ringpop 环└─ ringpop.Lookup(key)    # 一致性哈希确定 owner└─ AcquireShard()    # 用 RangeID 做乐观锁获取└─ historyEngine.Start()  # 启动队列处理器

当 Shard 所有权丢失(如节点宕机),AcquireShard 返回 ShardOwnershipLostError,触发重新获取。

6.2 Mutable State

Mutable State 是对 Workflow 当前状态的汇总快照,虽然理论上可以从 History 重新计算,但为了提高性能而持久化缓存。定义在 service/history/workflow/mutable_state_impl.go:112

包含的信息:

  • 正在进行中的 Activity 列表(ActivityID、TaskQueue、超时时间)
  • 活跃的 Timer(TimerID、过期时间)
  • Child Workflow 状态
  • 待处理的 Signal
  • 版本信息(Worker 版本化兼容)
  • Update 信息
// 核心接口定义(简化)
type MutableState interface {GetWorkflowState() enumspb.WorkflowExecutionStateGetActivityInfo(activityID string) (*persistencespb.ActivityInfo, bool)GetTimerInfo(timerID string) (*persistencespb.TimerInfo, bool)AddActivityTaskScheduledEvent(...) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error)AddTimerStartedEvent(...) (*historypb.HistoryEvent, *persistencespb.TimerInfo, error)IsWorkflowExecutionRunning() boolHasPendingTasks() bool
}

6.3 状态转换

状态转换是 History Service 的核心操作模式,对所有类型的输入使用统一的代码路径

通用状态转换函数(核心入口):

// service/history/api/update_workflow_util.go:37
func GetAndUpdateWorkflowWithNew(ctx, shardContext, workflowConsistencyChecker, workflowKey, updateAction,
) error

封装了完整的"读锁 → 刷新 Mutable State → 执行转换 → 持久化 → 释放锁"流程。

状态转换的通用形式:

输入 ──▶ [原子事务]├── 新 Mutable State├── 新的 History Event(s)└── 新的 History Task(s)

可触发状态转换的四种输入:

  1. 来自用户应用的 RPC(Start / Signal / Update / Query / Cancel / Reset)
  2. 来自 Worker 的 RPC(WorkflowTask / ActivityTask 完成)
  3. 定时器触发(Timer Task 到期)
  4. 其他 Workflow 执行(StartChildWorkflow / SignalExternalWorkflow)

三层调用链:

gRPC Handler (service/history/api/*/api.go)└─ GetAndUpdateWorkflowWithNew()└─ updateAction (具体业务逻辑)└─ MutableState.Add*Event()  → 创建 Event + 更新状态└─ TaskGenerator.*()          → 创建 Transfer/Timer Task└─ UpdateWorkflowExecutionAsActive()└─ persistence.ExecutionManager.UpdateWorkflowExecution()  → 事务提交

关键代码入口:

触发源 入口函数 文件
用户 Signal Invoke service/history/api/signalworkflow/api.go:38
Worker 完成 WFT Invoke service/history/api/respondworkflowtaskcompleted/api.go:110
Timer 触发 executeUserTimerTimeoutTask service/history/timer_queue_active_task_executor.go:136
Activity 超时 executeActivityTimeoutTask service/history/timer_queue_active_task_executor.go

6.4 队列处理

每个 History Shard 管理多个内部任务队列,通过 QueueProcessor 异步消费。所有队列处理器在 historyEngine.Start() 时启动(service/history/history_engine.go:303)。

队列类型:

队列 调度类型 用途
Transfer Task Queue 立即执行 向 Matching 发送 RPC(创建 Workflow/Activity Task)
Timer Task Queue 定时执行 Timer 到期触发状态转换
Visibility Task Queue 立即执行 更新可见性存储(Elasticsearch/SQL)
Archival Task Queue 立即执行 归档历史到长期存储(S3/GCS)
Outbound Queue 立即执行(隔离) Nexus/Callback 出站 HTTP 请求(新增)

Transfer Task 处理流程:

QueueProcessor 轮询读取 Persistence└─ Execute() 分发(transfer_queue_active_task_executor.go:114)├─ processActivityTask()  → Matching.AddActivityTask()├─ processWorkflowTask()  → Matching.AddWorkflowTask()└─ ...其他类型

Timer Task 类型:

Timer 类型 触发后行为
UserTimerTimeout TimerFired 事件 + 创建 WFT Transfer Task
ActivityTimeout ActivityTaskTimedOut 事件
WorkflowTaskTimeout WorkflowTaskTimedOut 事件
ActivityRetryTimer 直接调 Matching 添加 Activity Task

6.5 一致性保证

Temporal 在三个层次保证一致性:

  1. Mutable State ↔ History Task —— 数据库事务

    • UpdateWorkflowExecutionAsActive()persistence.ExecutionManager.UpdateWorkflowExecution()
    • SQL 用 BEGIN ... COMMIT,Cassandra 用 LWT(Lightweight Transaction)
  2. Mutable State ↔ History Events —— Mutable State 中记录 LastEventID

    • 每次持久化后刷新,不一致时重新从 Persistence 加载
  3. History Service ↔ Matching Service —— Transactional Outbox 模式

    • History Shard 写入 Transfer Task → QueueProcessor 异步读取 → RPC 调 Matching
    • 即使 RPC 失败,QueueProcessor 重试直至成功

7. Matching Service

Matching Service 负责管理 Task Queue(Workflow/Activity/Nexus),处理来自 Frontend 转发的 Worker 长轮询请求,被 History Service 的 Transfer Queue Processor 调用(AddWorkflowTask/AddActivityTask)。

7.1 Task Queue 分区

Task Queue "my-queue"│┌───┴───┐│ Root  │  (分区 0)└───┬───┘│┌────┴────┐│         │┌─┴─┐     ┌─┴─┐│ P1│     │ P2│  (子分区,默认 4 分区)└───┘     └───┘
属性 说明
默认分区数 4,可配置
分区所有权 可重新分配
负载/卸载 分区元数据和任务积压可按需加载/卸载

7.2 转发机制

Poller → Poll 分区 P1(空)├── 有任务 → 返回└── 无任务 → ForwardToParent → Root 分区Task → 写入分区 P1├── 有 Poller → 立即同步匹配└── 无 Poller → ForwardToParent → Root 分区
  • 子分区空时,Poller/Task 转发到父分区,提高匹配效率
  • 如果根分区被加载,强制加载该 Task Queue 的所有子分区

8. Frontend Service

Frontend 是用户和系统的唯一入口,暴露三类 gRPC 服务 + HTTP 端点。用户的所有 SDK 请求(Start/Signal/Query/Update)都先到达 Frontend,再由它路由到 History 或 Matching。

服务 接口 说明
WorkflowService workflowservice.v1 Start/Signal/Query/Update/Describe/Reset 等
OperatorService operatorservice.v1 Namespace CRUD、Nexus Endpoint 管理
AdminService adminservice.v1 跨集群复制、重新同步等运维操作

关键代码结构:

service/frontend/
├── handler.go                    # 主 WorkflowService gRPC handler(~2600 行)
├── namespace_handler.go          # Namespace 管理逻辑
├── nexus_handler.go              # Nexus HTTP handler
├── nexus_completion_http_handler.go  # Nexus Callback 回调处理
├── http_api_server.go            # HTTP API 服务器(gorilla/mux)
└── fx.go                         # DI 模块

请求处理流程(以 StartWorkflow 为例):

用户 gRPC 请求 → Frontend 验证(Namespace 存在性、请求校验)→ 一致性哈希选择 History Shard→ gRPC 调用 History Service→ 等待 Response 返回用户

Frontend 还通过 gorilla/mux 运行 HTTP 服务器(默认 7243 端口),提供 Nexus 端点:

POST /namespaces/{namespace}/task-queues/{tq}/nexus-services  # 分发 Nexus 任务
POST /nexus/endpoints/{endpoint}/services                      # 通过 Endpoint 分发
POST /namespaces/{namespace}/nexus/callback                    # Nexus 回调

9. Worker Service 与内部 Worker

Worker Service 是 Temporal 服务器内部自带的 Temporal SDK Worker——它使用 Temporal Go SDK 在服务器进程中启动 Worker,执行系统级 Workflow 和 Activity。相当于"Temporal 运行在 Temporal 之上"。

9.1 架构定位

┌─────────────────────────────────────────────────┐
│              Temporal Server                      │
│                                                    │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐          │
│  │ Frontend │ │ History  │ │ Matching │          │
│  └──────────┘ └──────────┘ └──────────┘          │
│                                                    │
│  ┌──────────────────────────────────────────┐    │
│  │        Worker Service                     │    │
│  │                                          │    │
│  │  ┌──────────────────────────────────┐   │    │
│  │  │  SDK Worker (系统内部 Worker)     │   │    │
│  │  │  ├── Replicator Workflow         │   │    │
│  │  │  ├── Scanner Workflow            │   │    │
│  │  │  ├── Archiver Workflow/Activity  │   │    │
│  │  │  ├── Batcher Workflow            │   │    │
│  │  │  └── ParentClosePolicy Workflow  │   │    │
│  │  └──────────────────────────────────┘   │    │
│  └──────────────────────────────────────────┘    │
└─────────────────────────────────────────────────┘│ 通过 SDK 客户端调用自身 gRPC API▼(Frontend / History / Matching)

Worker Service 与用户 Worker 的关键区别:

对比 Worker Service(内部) 用户 Worker(外部)
运行位置 Temporal 服务器进程中 用户自己的进程中
执行的任务 系统级 Workflow(复制、扫描、归档) 用户的业务 Workflow/Activity
启动方式 fx.Module 由服务器自动启动 用户手动启动 temporal worker start
SDK Temporal Go SDK(go.temporal.io/sdk/worker 任意语言 SDK
任务队列 系统内部 Task Queue(temporal-system 用户指定的 Task Queue

9.2 源码结构

service/worker/
├── worker.go                  # workerManager:管理多个 SDK Worker 实例
├── service.go                 # Service:Worker Service 的 gRPC + 生命周期
├── fx.go                      # DI 模块
│
├── replicator/                # 跨集群复制
│   ├── replicator.go          # Replicator:消费复制任务并应用到本集群
│   └── replication_message_processor.go
│
├── scanner/                   # 后台扫描器
│   ├── scanner.go             # Scanner:定时健康检查
│   ├── executions/            # 检查 Workflow 执行完整性
│   ├── build_ids/             # 清理过期 Build ID
│   ├── taskqueue/             # 检查 Task Queue 健康
│   ├── scheduleinvariants/    # Schedule 变更检测
│   └── history/               # 历史数据扫描
│
├── batcher/                   # 批量操作
│   ├── workflow.go            # 批量操作 Workflow 定义
│   └── activities.go          # 批量操作具体执行逻辑
│
├── parentclosepolicy/         # 子 Workflow 关闭策略
├── addsearchattributes/       # 添加搜索属性
├── deletenamespace/           # Namespace 删除
├── scheduler/                 # 内部 Schedule 管理
├── workflowdeployment/        # Worker Deployment 版本管理
├── migration/                 # 数据迁移
└── common/                    # 共享组件接口(WorkerComponent)

9.3 WorkerComponent 注册机制

每个系统级功能实现 WorkerComponent 接口,注册其 Workflow 和 Activity:

// service/worker/common/ - WorkerComponent 接口
type WorkerComponent interface {RegisterWorkflow(worker sdkworker.Worker)RegisterActivities(worker sdkworker.Worker)DedicatedWorkflowWorkerOptions() *DedicatedWorkerOptions  // nil = 用默认 WorkerDedicatedActivityWorkerOptions() *DedicatedWorkerOptions
}

workerManager.Start() 遍历所有 WorkerComponent,注册到 SDK Worker:

func (wm *workerManager) Start() {// 创建默认 SDK Worker(监听 temporal-system Task Queue)defaultWorker := wm.sdkClientFactory.NewWorker(sdkClient, "temporal-system", ...)for _, wc := range wm.workerComponents {if needDedicatedWorker(wc) {dedicatedWorker := wm.sdkClientFactory.NewWorker(sdkClient, wc.TaskQueue(), ...)wc.RegisterWorkflow(dedicatedWorker)wm.workers = append(wm.workers, dedicatedWorker)} else {wc.RegisterWorkflow(defaultWorker)}wc.RegisterActivities(defaultWorker)}
}

9.4 各组件职责

组件 职责 运行方式
Replicator 从远程集群接收复制任务,应用到本集群(跨集群容灾的核心) 持续运行的 Workflow
Scanner 定时扫描系统健康状态——检测孤儿 Execution、清理过期 Build ID、校验 Task Queue、检查 Schedule 一致性 定时触发的 Workflow(cron: * * * * *
Batcher 执行批量操作(如批量给大量 Workflow 发 Signal、批量 Terminate) 按需启动的 Workflow
Archiver 将已完成 Workflow 的历史转移到 S3/GCS 等长期存储 Worker 中注册的 Activity
ParentClosePolicy 父 Workflow 关闭时,根据 Policy 自动处理子 Workflow Activity
AddSearchAttributes 向 Elasticsearch/SQL 中添加新的搜索属性字段 Workflow
DeleteNamespace 删除 Namespace 及其关联数据 Workflow
Scheduler 管理内部 Schedule 的触发和执行 Workflow
WorkerDeployment 管理 Worker 版本化的部署生命周期 Workflow

其中 Scanner 最有意思——它像数据库的 AutoVacuum,通过周期性 Workflow 自我修复:

Scanner Workflow(每小时触发)├─ Executions Scan: 检查是否有丢失的 Workflow 执行记录├─ Build IDs Scan: 清理不再使用的 Worker Build ID 版本├─ Task Queue Scan: 检测积压或异常的任务队列└─ Schedule Invariants: 检查 Schedule 元数据一致性

9.5 为什么需要内部 SDK Worker

Temporal 的 Worker Service 选择用自己 SDK 启动内部 Worker(而非直接函数调用),原因:

  1. 利用自身能力:系统级任务也需要持久化、重试、定时——正是 Temporal 自身的强项
  2. 统一编程模型:内部系统 Workflow 和用户 Workflow 用同一套 SDK API 编写
  3. 解耦与隔离:每个系统组件独立注册,可以单独配置自己的 Task Queue、重试策略
  4. 利用现有基础设施:系统 Workflow 也走完整的 Frontend → History → Matching 路径,获得一致的可观测性

10. 补充机制

10.1 任务调度框架

common/tasks/ 提供通用的 Go 协程调度框架,被 History 的 QueueProcessor 使用。所有 Scheduler 统一实现 Scheduler 接口,接收 Executable 任务,执行后回调 Ack/Nack

common/tasks/
├── fifo_scheduler.go                    # FIFO 顺序调度
├── sequential_scheduler.go             # 每任务队列保序
├── dynamic_worker_pool_scheduler.go    # 动态按需创建/销毁 goroutine
├── group_by_scheduler.go               # 按源 NS + 目标分组(Outbound 用)
├── rate_limited_scheduler.go           # 限速
└── interleaved_weighted_round_robin.go # 加权轮询

10.2 CHASM 框架

CHASM(Coordinated Heterogeneous Application State Machines)是 Temporal 引入的新型框架(chasm/ 目录),将 Workflow 视为众多 ASM 中的一种。

概念 说明
ASM Application State Machine:注册的状态机类型
Component 定义了状态(Fields)和行为的类型
Execution ASM 的运行时实例
Field 框架管理的持久化状态容器(Field[T], Map[K,T], ParentPtr

已有 Library:workflowscheduler(已迁移)、nexusoperation。CHASM 的意义在于用更轻量的 ASM 替代传统 Workflow 做简单状态机逻辑,直接在 History Service 内部运行。

10.3 重试与容错

三层重试架构:

Frontend gRPC Handler Retry  (NewRetryableInterceptor, server-side)
Frontend gRPC Client Retry   (NewRetryableClient, can switch node)
History gRPC Handler Retry   (NewRetryableInterceptor, server-side)

错误体系common/backoff/):

backoff.ThrottleRetryContext(ctx, operation, policy, isRetryable)
ServiceError interface { error; Status() *status.Status }// 关键错误类型
NotFound | Unavailable | NamespaceNotActive | ShardOwnershipLost | TaskAlreadyStarted

10.4 Speculative Workflow Task & Workflow Update

Speculative Workflow Task 是一种优化:对 Workflow Update 请求,服务器第一次尝试不写数据库,乐观假设成功。失败则像从未存在过一样丢弃。

三种 WFT 对比:

类型 持久化 失败处理
Normal 写 DB 写 Failure 事件 → 重试
Transient 不写 S/S 事件(临时) 增加尝试计数
Speculative 完全不写 DB 直接丢弃

Workflow Update 是 "Signal + Query" 的结合体:可被 Workflow 拒绝且拒绝不产生事件,接受后 API 调用者可立即获知结果。依赖四个基础机制:Speculative WFT、Message Protocol、In-memory Timer Queue、effect Package。

10.5 Nexus RPC

Nexus 是 Temporal 的跨 Namespace、跨集群通信协议。核心组件:

组件 角色
Nexus Endpoint Registry 集群全局 Endpoint 注册表(UUID + 名称唯一)
Nexus Operations Operation 生命周期状态机
Callbacks Workflow 完成时的回调机制
Outbound Queue 出站 HTTP 请求队列(Circuit Breaker + 分组隔离)

11. Temporal Web UI

独立项目:temporalio/web,不属于 temporalio/temporal 仓库,通过 gRPC-web 调用 Frontend Service 的 API。

Temporal Web UI 是一个用 TypeScript + React 编写的调试和观测工具。temporal server start-dev 启动后默认监听 http://localhost:8233

11.1 数据获取架构

浏览器 (React)                           Temporal Server│                                          ││  HTTP/gRPC-web                           ││──────────────── 轮询 ───────────────────▶│ Frontend Service│◀─────────────── 响应 ────────────────────│     ││                                          │     ├── History  → History Events│                                          │     ├── Describe → Mutable State│                                          │     ├── Query    → Worker 栈追踪│                                          │     └── List     → Visibility Store

Web UI 没有 WebSocket 或任何推送通道,所有数据通过定时轮询获取。

11.2 调用的 API

页面/功能 gRPC API 数据来源 刷新方式
Workflow 列表 ListWorkflowExecutions Visibility 存储(ES/SQL) 每 5-10s 自动轮询
Workflow 计数 CountWorkflowExecutions Visibility 存储 同上
详情-基本面板 DescribeWorkflowExecution Mutable State(History Service) 手动/自动刷新
详情-事件时间线 GetWorkflowExecutionHistory History Event 存储 每 5s 轮询
详情-栈追踪 QueryWorkflow("__stack_trace") Worker 运行时 手动点击触发
操作(Signal/Terminate/Reset) 对应 Mutate API History Service 操作完成自动刷新

11.3 Workflow 列表

┌─────────────────────────────────────────────────────────────┐
│  temporal  │  Namespace: default  │  Status: Running  ▼     │
├─────────────────────────────────────────────────────────────┤
│  Workflow ID      │  Type         │  Status │  Start Time   │
│───────────────────┼───────────────┼─────────┼───────────────│
│  rental-car-abc   │  BookingWf    │  Running│  2 min ago    │
│  order-xyz-789    │  OrderWf      │  Compld │  5 min ago    │
│  payment-failed   │  PaymentWf    │  Failed │  10 min ago   │
└─────────────────────────────────────────────────────────────┘

11.4 Workflow 详情页

基本信息面板:

┌──────────────────────────────────────────┐
│  Status:  ● Running                      │
│  Workflow ID:  rental-car-abc123         │
│  Run ID:        1111-2222-3333-4444      │
│  Type:          BookingWorkflow          │
│  Task Queue:    booking-tasks            │
│  Start Time:    2026-06-18 10:00:00      │
│  Execution Time:  2m 34s                │
│  Attempt:        1 (max 5)              │
├──────────────────────────────────────────┤
│  [ Signal ] [ Terminate ] [ Reset ]      │
│  [ Stack Trace ]                         │
└──────────────────────────────────────────┘

History Event 时间线(核心功能): 每行一个事件,按 ID 递增排列。点击任一行展开显示完整 Protobuf JSON,包含 Payload 数据。对 AI Agent 场景,可直接看到每次 LLM 调用的请求/响应 Payload。

Pending Activities 面板:

Activity ID  │  Type           │  Status     │  Attempt │  Timeout
─────────────┼─────────────────┼─────────────┼──────────┼─────────5       │  ChargeCard     │  Started    │    1     │  50s left

11.5 Schedule 查看

Schedule 触发日历和执行记录,显示下次触发时间、历史完成/失败记录。

11.6 操作能力

操作 对应 API
Signal SignalWorkflowExecution
Terminate TerminateWorkflowExecution
Reset ResetWorkflowExecution
Cancel RequestCancelWorkflowExecution

所有操作调用 Frontend Service 的 gRPC API,与 SDK 调用没有区别。

11.7 与任务看板的区别

维度 Temporal Web UI 任务看板(Jira/Trello)
定位 调试 & 可观测性 任务分配 & 进度管理
数据源 Visibility + History Events 用户自定义字段
操作 Signal / Terminate / Reset 拖拽卡片、编辑状态
状态 固定的事件类型链 自定义阶段列
刷新 定时轮询(5-10s) 实时或近实时
受众 开发者 团队成员

12. Temporal + AI Agent:持久化执行平台驱动智能代理

Temporal 的持久化执行模型天然适合 AI Agent 的编排。Agent 的典型运行模式——多步推理、工具调用、LLM 回退重试、人工审批循环——与 Temporal Workflow 的容错、状态持久化、重试机制高度匹配。

12.1 为什么 Temporal 适合 Agent 编排

Agent 痛点 Temporal 解决方式
LLM API 不稳定(超时、限速、返回异常) 自动重试 + Backoff 策略,Activity 级别的重试配置
Agent 运行几分钟到几小时,中途崩溃丢状态 Durable Execution,状态由事件溯源持久化,崩溃后自动恢复
多步推理需要中间状态持久化 Mutable State 自动维护,无需手动保存/恢复上下文
工具调用结果不可靠 每个工具调用作为 Activity,独立重试、超时控制
需要人工介入审批/验证 Workflow 天然支持 Signal/Update 式 Human-in-the-Loop
多 Agent 协作复杂 Workflow 编排子 Workflow,确定性的编排逻辑保证一致性
调试困难、不可观测 Temporal Web UI 可查看每一步的输入输出、重放历史

12.2 架构模式:Agent-as-Workflow

┌──────────────────────────────────────────────────────┐
│                   Temporal Cluster                     │
│                                                        │
│  ┌───────────────── Workflow ──────────────────────┐  │
│  │  Agent Workflow (每个 Agent 实例是一个 Workflow)  │  │
│  │                                                   │  │
│  │  Step 1: 接收用户请求 (Signal/Update/Start)       │  │
│  │     ↓                                             │  │
│  │  Step 2: LLM 推理 (Activity: 调用 OpenAI/Gemini)  │  │
│  │     ↓ (自动重试 3次, 指数退避, 应对 429)           │  │
│  │  Step 3: 工具调用循环 (Activity Loop)              │  │
│  │     ├── 工具 A: 搜索 (Activity)                    │  │
│  │     ├── 工具 B: 读取文件 (Activity)                │  │
│  │     └── 工具 C: 执行代码 (Activity)                │  │
│  │     ↓                                             │  │
│  │  Step 4: 人工审批 (Signal/Update, 等待 Human)      │  │
│  │     ↓                                             │  │
│  │  Step 5: 返回最终结果 (Complete)                   │  │
│  └───────────────────────────────────────────────────┘  │
│                                                        │
│  ┌───────────── Worker ──────────────┐                 │
│  │  Activity 执行 LLM 调用 + 工具    │                 │
│  └───────────────────────────────────┘                 │
└──────────────────────────────────────────────────────┘

关键代码模式(Python SDK 示意):

from temporalio import workflow, activity@activity.defn
async def llm_infer(prompt: str) -> str:return await openai_client.chat.completions.create(...)@activity.defn
async def call_tool(tool: str, args: dict) -> str:return await execute_tool(tool, args)@workflow.defn
class AgentWorkflow:@workflow.runasync def run(self, user_request: str) -> str:result = await workflow.execute_activity(llm_infer, user_request,schedule_to_close_timeout=timedelta(minutes=2),retry_policy=RetryPolicy(maximum_attempts=3))while need_tool_call(result):tool, args = parse_tool_call(result)tool_result = await workflow.execute_activity(call_tool, tool, args,start_to_close_timeout=timedelta(seconds=30))result = await workflow.execute_activity(llm_infer, f"{result}\nTool Result: {tool_result}")await workflow.wait_condition(lambda: self.approved,timeout=timedelta(hours=24))return result

12.3 主流 Agent 框架集成生态

框架/平台 集成方式 说明
OpenAI Agents SDK 官方集成 Temporal 作为 Durable Execution 后端,自动保存 Agent 状态
LangChain / LangGraph 社区 + 官方 LangGraph Checkpoint 可被 Temporal 替代
Google ADK 官方集成 Google Agent Development Kit 原生集成 Temporal
Mastra 官方指南 Mastra 框架提供 Temporal 部署指南
CrewAI 社区集成 Crew/Agent → Workflow/Activity
AutoGen 社区集成 多 Agent 对话用 Workflow 编排
MCP (Model Context Protocol) Temporal 官方 构建 Durable MCP Server

OpenAI Agents SDK + Temporal(2025年7月发布,Public Preview):

openai-agents-sdk└── Temporal Durable Execution Runner├── Agent Run → Workflow├── Handoff → Child Workflow├── Tool Call → Activity(自动重试 + 超时)└── Guardrail → Workflow 检查点

12.4 业界成功实践

Replit Agent

维度 详情
规模 数百万次 Agent 运行/天
迁移 2024 年底,2 周内完成迁移
架构 每个 Agent 实例 = 一个 Temporal Workflow
效果 零重大事故

核心场景:Agent 控制平面(Workflow ID 唯一性)、Agent 生命周期编排、Human-in-the-Loop 确认弹窗、多产品复用。

"Temporal gives us a lot more confidence to build the product and know that it's not going to have lots of edge cases." —— Connor Brewster, Lead Engineer, Replit

Retool Agents

仅 10 个工程师支撑 Agent + 基础设施,几个月内上线,每天数千次 Agent 运行。

"Without Temporal, we probably would've missed the deadline... or we would've had to hire up a big team." —— Lizzie Siegrist, Product Manager, Retool

Gorgias

"All LLM use cases are workflows." —— Romain Niveau, Senior Engineering Manager, Gorgias

12.5 Temporal AI Cookbook

官方开源 AI Cookbook 提供完整示例:Hello World (LiteLLM)、Agentic Loop with Tool Calling、Tool Calling Agent、Durable MCP Server。

12.6 与 Hermes Agent 的类比

Hermes Agent (本对话的 CLI Agent)    Temporal Agent 模型
──────────────────────────────      ─────────────────
单条消息执行(无状态)                 Workflow(有状态持久化)
异步任务委托(delegate_task)         Child Workflow / Activity
Memory 存储事实                      Mutable State + History Events
技能/Skill 复用                      Activity 定义复用
会话搜索(session_search)           Temporal Web UI 历史查询
手动重试(用户介入)                   自动 Retry Policy

对于需要长期运行多步协调容错恢复的 Agent 场景,Temporal 提供的是 Agent 框架之下的持久化编排底座——不替代 Agent 框架,而是保证任何 Agent 框架运行时的可靠性。


13. 总结

13.1 架构模式对照表

模式 Temporal 实现
事件溯源 Workflow 执行历史作为不可变事件序列
CQRS 读写分离——历史写入 vs 查询
Transactional Outbox Transfer Task 队列确保与 Matching 的一致
分片 History Shard + Ringpop 协调
Fencing RangeID 单调递增防脑裂
状态机 Update、Nexus Operation、Scheduler 均用状态机驱动
重试 + Backoff 三层重试体系(Handler + Client + Handler)
熔断器 Outbound Queue 的 gobreaker Circuit Breaker
Saga Workflow 的补偿逻辑
依赖注入 uber-go/fx 管理模块依赖

13.2 关键技术指标

指标 说明
编程语言 Go(服务器)+ 多语言 SDK
持久化后端 Cassandra、MySQL、PostgreSQL、SQLite
服务间通信 gRPC + Protobuf
HTTP 端点 gorilla/mux(Nexus/Callback)
配置系统 静态 YAML + 动态配置
集群协调 Ringpop
追踪/指标 OpenTelemetry + Prometheus
框架 uber-go/fx 依赖注入
代码规模 ~80 万行 Go(含 api/ 生成代码)

13.3 演进趋势

  1. CHASM 化:Scheduler 已迁移到 CHASM,Workflow/NexusOperation 也在迁移,逐步替代传统的直接状态管理方式
  2. 组件化:Nexus Operations 和 Callbacks 以组件形式存在,取代直接嵌入 History Service
  3. Speculative 执行:零写入拒绝模式从 Update 扩展到更多场景,减少不必要的持久化
  4. Nexus 生态:跨集群、跨 Namespace 通信能力,使 Temporal 连接更广泛的服务网格
  5. Worker Command:服务器通过 Nexus 主动向 Worker 推送指令(取消活动等),不再依赖轮询
  6. AI Agent 底座:与 OpenAI、Google ADK、Mastra 等 Agent 框架集成,成为 Agent 持久化编排的标准层

本文基于 temporalio/temporal 仓库源码和官方架构文档撰写
仓库版本:main 分支(commit 截至 2026-06-18)

http://www.jsqmd.com/news/1036162/

相关文章:

  • 2026年河南汝瓷礼品定制哪家好?源头工厂深度横评与官方对接指南 - 优质企业观察收录
  • Android AlarmManager - AlarmManager 初识、精确闹钟权限、闹钟覆盖
  • 011、反激变压器的匝比计算
  • 2026年河南汝瓷礼品定制哪家好?源头工厂深度横评与企业采购避坑指南 - 优质企业观察收录
  • 儒意电影发布超级娱乐空间2.0战略,以超级场景与超级IP双轮驱动,AI赋能文娱全产业链
  • 【课程设计/毕业设计】基于 Python+Django 的高校请假信息统计可视化平台的设计与实现 基于 Python+Django 的大学生请假台账可视化管理系统【附源码、数据库、万字文档】
  • Grok 4 Heavy深度解析:多智能体协同如何重构AI工程实践
  • 3个颠覆性功能:重新定义你的音频创作体验
  • StarCore SC100链接器深度解析:从符号解析到缓存优化的嵌入式DSP开发实践
  • 如何解决网盘下载限速问题?8大平台直链下载助手终极指南
  • MQX RTOS任务管理、调度与内存同步机制深度解析
  • 我的AI Agent7天零基础入门实战计划
  • VALMET ND9106HX8T 阀门定位器实战应用与故障排查指南
  • 6月淮北黄金回收市场实测观察:卖金套路多,市民如何避坑? - 微城市网络
  • 终极宝可梦合法性解决方案:PKHeX自动合规插件完全指南
  • 无锡视频拍摄公司排行:基于服务与案例的客观盘点 - 起跑123
  • 制袋机行业标杆:正威制袋机技术领先的绿色包装选择 - 速递信息
  • 强烈推荐:智能电视最全ADB软件合集!可以给电视或者盒子安装软件的工具!安卓端电脑端都有
  • 深入解析I2C总线:从基础协议到多控制器通信与实战调试
  • 2026 上海小型冷库安装公司电话,保鲜冷库安装服务咨询指南 - 品牌2026
  • 2026年6月重磅速报:高端腕表养护必读——北京亨得利手表维修收费价格表深度拆解 - 亨得利官方售后
  • CIO 的第一视角:旧式企业 IM 正在拖慢业务决策的三重死结
  • Python爬虫接入站大爷代理IP完整教程(附可运行代码)|两种授权模式全覆盖,自带反爬防封禁策略
  • 解决“找不到求解器”错误:环境变量PATH配置与跨平台调试指南
  • 5分钟打造你的Obsidian个人知识管理中心:告别笔记混乱,开启高效学习新纪元
  • 【多智能体控制】基于预定时间非干扰形成控制开放多智能体系统Matlab仿真
  • 2026年全国家庭教育优质课程服务机构排行及适配指南 - 互联网科技品牌测评
  • 2026年,热门AI搜索优化企业名声几何?
  • 2026年上海防水补漏服务商全景评测:从AI漏点检测到15年质保的完整选型指南 - 优质企业观察收录
  • ALMA望远镜揭示原行星盘与行星形成的奥秘