Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

容错

Adora 为机器人和 AI 数据流提供了内置的容错能力。节点可以在故障时自动重启、检测上游连接过期、在输入不可用时优雅降级,同时协调器可以将状态持久化到磁盘以便在崩溃和重启后恢复。

功能一览

特性范围配置
重启策略Per-noderestart_policymax_restartsrestart_delay、…
健康监控Per-nodehealth_check_timeouthealth_check_interval(数据流级别)
输入超时Per-inputinput_timeout
熔断器Automaticinput_timeout 触发,自动恢复
NodeRestarted 事件下游节点上游重启时自动触发
InputTracker APIRust 节点adora_node_api::InputTracker
ObservabilityDaemon-wide原子计数器定期记录日志
分布式健康Multi-daemon协调器心跳监控
协调器状态持久化Coordinator--store redb(需要 redb-backend 特性)

重启策略

控制节点退出或崩溃时的行为。

配置

nodes:
  - id: my-node
    path: ./target/debug/my-node
    restart_policy: on-failure  # never | on-failure | always
    max_restarts: 5             # 0 = unlimited (default: 0)
    restart_delay: 1.0          # initial delay in seconds
    max_restart_delay: 30.0     # cap for exponential backoff
    restart_window: 300.0       # reset counter after this many seconds

策略类型

never(默认)—— 节点不会被重启。故障正常传播。

on-failure —— 仅在节点以非零退出码退出时重启。正常退出(退出码 0)不会重启。

always —— 任何退出都会重启,以下情况除外:

  • 用户停止了数据流(adora stop 或 Ctrl-C)
  • 所有输入已关闭且节点以非零退出码退出

重启的内部工作原理

当节点进程退出时,守护进程按以下顺序评估重启决策:

  1. 策略检查:重启策略是否允许?
    • Never -> 不重启
    • OnFailure -> 仅在退出码 != 0 时重启
    • Always -> 重启
  2. 禁用检查disable_restart 是否已设置?(当所有输入关闭或通过 stop_all 手动停止时设置)
  3. 窗口检查:如果设置了 restart_window 且窗口从首次重启开始已过期,则将计数器重置为 0
  4. 限制检查:如果 max_restarts > 0 且窗口计数器超过该值,则永久放弃
  5. 退避:如果设置了 restart_delay,则休眠计算出的延迟时间(唤醒后重新检查 disable_restart
  6. 重新生成:使用相同配置重新生成节点进程

守护进程在 spawn/prepared.rs 生命周期循环中跟踪每个节点实例的重启状态。每个节点运行在自己的 tokio 任务中,因此重启不会阻塞其他节点。

Backoff

当设置了 restart_delay 时,守护进程在重启前会等待。延迟每次尝试加倍(指数退避),并受 max_restart_delay 限制。

退避指数在内部限制为 16,以防止溢出(2^16 = 65536x 倍数)。

restart_delay: 1.0max_restart_delay: 10.0 为例:

Attempt 1: wait 1s    (1.0 * 2^0)
Attempt 2: wait 2s    (1.0 * 2^1)
Attempt 3: wait 4s    (1.0 * 2^2)
Attempt 4: wait 8s    (1.0 * 2^3)
Attempt 5: wait 10s   (capped at max_restart_delay)
Attempt 6: wait 10s   (capped)

在退避休眠期间,守护进程持续监控 disable_restart 标志。如果在节点等待重启时所有输入都关闭了,重启将被取消,并输出日志消息:“restart cancelled: inputs closed during backoff wait”。

重启窗口

当设置了 restart_window 时,重启计数器在窗口过期后重置(从当前窗口的首次重启开始计算)。这实现了 “每 M 秒最多 N 次重启” 的语义。

例如:max_restarts: 5restart_window: 300.0 表示 “每 5 分钟最多 5 次重启”。如果窗口过期前未达到限制,计数器重置,节点获得另外 5 次机会。

关闭期间禁用重启

当守护进程停止数据流(通过 stop_all)时,它在发送 Stop 事件之前对每个节点调用 disable_restart()。这防止了重启机制与关闭过程冲突。disable_restart 标志是一个在守护进程事件循环和节点生成生命周期任务之间共享的 Arc<AtomicBool>

NodeRestarted 事件

当节点重启时,守护进程向所有消费其输出的下游节点发送 NodeRestarted 事件。这使下游节点能够:

  • 重置内部状态或缓存
  • 记录上游恢复
  • 重新初始化连接或会话

该事件携带重启节点的 NodeId。下游节点通过事件流自动接收:

#![allow(unused)]
fn main() {
match event {
    Event::NodeRestarted { id } => {
        println!("upstream node {id} restarted, resetting state");
        // 清除旧节点实例的所有缓存状态
    }
    _ => {}
}
}

守护进程通过 dataflow.mappings 查找下游节点,该映射将每个节点的输出映射到所有订阅的 (receiver_node, input_id) 对。每个唯一的接收者在每次重启时获得一个 NodeRestarted 事件。


健康监测

被动监控检测停止与守护进程通信的挂起节点。

health_check_interval: 2.0  # seconds (default: 5.0, dataflow-level)
nodes:
  - id: my-node
    path: ./target/debug/my-node
    health_check_timeout: 30.0  # seconds (per-node)
    restart_policy: on-failure

可配置的健康检查间隔

health_check_interval 是一个数据流级别的设置,控制守护进程检查节点健康状态的频率。默认值为 5.0 秒。较小的值可以更快检测到挂起的节点,但会增加更多开销。在数据流 YAML 的顶层设置此项,而非按节点设置。

内部工作原理

守护进程按配置的 health_check_interval 运行健康检查扫描(通过发出 Event::NodeHealthCheckInterval 的 tokio 间隔流)。

每个 RunningNode 有一个 last_activity: Arc<AtomicU64> 字段,存储最后一次通信的时间戳(自纪元以来的毫秒数)。每当节点向守护进程发送任何请求(事件订阅、输出发送等)时,节点的通信处理器(node_communication/mod.rs)会原子地更新该字段。

健康检查函数(check_node_health)遍历所有运行中的节点:

  1. 跳过未设置 health_check_timeout 的节点
  2. 跳过 last_activity == 0 的节点(尚未连接)
  3. 计算 elapsed_ms = now - last_activity
  4. 如果 elapsed_ms > timeout_ms,记录警告并终止节点进程

终止后,正常的退出处理流程会运行,评估重启策略。这意味着 health_check_timeout 配合 restart_policy: on-failure 可以自动恢复挂起的节点。

什么算作 “活动”

从节点到守护进程的任何消息都算:

  • 事件订阅请求
  • 输出数据发送(通过共享内存或 TCP)
  • 定时器 tick 确认

从其他节点接收的正常输入数据不会重置计时器——节点必须主动与守护进程通信。


输入超时和熔断器

按输入超时检测上游节点何时停止产生数据。

配置

nodes:
  - id: downstream-node
    path: ./target/debug/downstream
    inputs:
      sensor_data:
        source: camera-node/frames
        input_timeout: 5.0  # seconds

input_timeout 按输入设置,而非按节点设置。不同的输入可以有不同的超时时间。

内部工作原理

守护进程为每个设置了超时的输入维护一个 InputDeadline

struct InputDeadline {
    timeout: Duration,        // configured timeout
    last_received: Instant,   // last time data arrived
}

这些存储在 RunningDataflow.input_deadlines 中,以 (NodeId, DataId) 为键。

超时检测在相同的 5 秒健康检查间隔内运行。check_input_timeouts 函数:

  1. 扫描所有 input_deadlines 条目
  2. 如果 last_received.elapsed() > timeout,则输入被标记为 “broken”
  3. (node_id, input_id) 对从 input_deadlines 移动到 broken_inputs
  4. 守护进程调用 break_input(),向下游节点发送 InputClosed { id }
  5. 如果节点的所有输入现在都已关闭(且没有处于 broken/可恢复状态的),则发送 AllInputsClosed 并禁用节点的重启

截止时间重置:每当数据到达某个输入时,其 last_received 被重置为 Instant::now()

熔断器:自动恢复

熔断器在 RunningDataflow.broken_inputs 中跟踪断开的输入。当新数据到达断开的输入时:

  1. 数据正常传递给节点
  2. broken_inputs 条目被移除
  3. 输入被重新添加到 open_inputs
  4. 创建新的 InputDeadline(重新开始超时计时)
  5. 向节点发送 InputRecovered { id } 事件
  6. circuit_breaker_recoveries 计数器递增

这意味着恢复是完全自动的。如果上游节点重启(通过重启策略)并重新开始产生数据,下游节点将无缝恢复接收。

节点端处理

在 Rust 节点中,在事件循环中处理这些事件:

#![allow(unused)]
fn main() {
use adora_node_api::{AdoraNode, Event};

let (mut node, mut events) = AdoraNode::init_from_env()?;
while let Some(event) = events.recv() {
    match event {
        Event::Input { id, data, .. } => {
            // 正常处理
        }
        Event::InputClosed { id } => {
            // 上游停止在此输入上产生数据。
            // 你可以:使用缓存数据、跳过处理、通知操作员等。
        }
        Event::InputRecovered { id } => {
            // 上游在此输入上恢复在线。
            // 恢复正常处理。
        }
        Event::Stop(_) => break,
        _ => {}
    }
}
}

InputTracker API(Rust)

InputTracker 辅助工具跟踪输入健康状态并缓存每个输入最后接收到的值,使优雅降级变得简单。

#![allow(unused)]
fn main() {
use adora_node_api::{AdoraNode, Event, InputTracker, InputState};

let (mut node, mut events) = AdoraNode::init_from_env()?;
let mut tracker = InputTracker::new();

while let Some(event) = events.recv() {
    tracker.process_event(&event);

    match event {
        Event::Input { id, data, .. } => {
            // 有新数据可用
        }
        Event::InputClosed { id } => {
            // 输入超时——回退到缓存数据
            if let Some(stale_data) = tracker.last_value(&id) {
                // 使用过期数据作为降级方案
            }
        }
        Event::Stop(_) => break,
        _ => {}
    }

    // 检查整体健康状态
    if tracker.any_closed() {
        let closed: Vec<_> = tracker.closed_inputs();
        // 记录日志或调整行为
    }
}
}

内部设计

InputTracker 维护两个 HashMap

  • states: HashMap<DataId, InputState> —— 每个输入的当前状态(Healthy 或 Closed)
  • cache: HashMap<DataId, ArrowData> —— 每个输入最后接收到的值

收到 Event::Input 时,两个映射都会更新(状态 = Healthy,缓存 = 数据克隆)。收到 Event::InputClosed 时,仅状态变化(缓存保留)。收到 Event::InputRecovered 时,状态恢复为 Healthy。缓存永远不会被清除,因此 last_value() 即使在输入关闭后也始终返回最近的数据。

注意:ArrowData 包装了 Arc<dyn arrow::array::Array>,因此缓存克隆是引用计数的(低开销)。

API 参考

方法返回值描述
new()InputTracker创建空的追踪器
process_event(&Event)bool更新状态。如果事件相关则返回 true
state(&DataId)Option<InputState>当前状态(Healthy 或 Closed)
is_closed(&DataId)bool检查输入是否已关闭
last_value(&DataId)Option<&ArrowData>最后接收到的值(即使关闭后仍可用)
closed_inputs()Vec<&DataId>所有当前已关闭的输入
any_closed()bool如果任何被追踪的输入已关闭则为 true

Observability

守护进程使用原子计数器(FaultToleranceStats)跟踪容错事件,并在健康检查间隔期间每 5 秒记录一次摘要。

Counters

Counter类型递增条件
restartsAtomicU64节点重启被启动(在生成生命周期中)
health_check_killsAtomicU64节点被健康检查终止(无响应)
input_timeoutsAtomicU64输入超时触发(熔断器跳闸)
circuit_breaker_recoveriesAtomicU64数据到达断开的输入(自动恢复)

所有计数器使用 Ordering::Relaxed,因为它们仅用于信息展示,不需要严格的排序保证。

日志输出

当任何计数器非零时,守护进程发出结构化日志行:

INFO fault tolerance stats restarts=3 health_kills=0 input_timeouts=1 cb_recoveries=1

这些计数器在守护进程生命周期内是累积的。它们不会在数据流之间重置。


分布式健康

在多守护进程部署中,协调器监控守护进程心跳。

协议

  • 心跳间隔:3 秒(协调器向每个守护进程发送心跳)
  • 断开阈值:30 秒无响应
  • 检测:在每次心跳扫描时,协调器移除在阈值内未响应的守护进程
  • 通知:协调器向所有剩余的守护进程广播 PeerDaemonDisconnected { daemon_id }

DaemonInfo

ConnectedMachines CLI 查询返回 Vec<DaemonInfo>

#![allow(unused)]
fn main() {
pub struct DaemonInfo {
    pub daemon_id: DaemonId,
    pub last_heartbeat_ago_ms: u64,  // 自上次心跳以来的毫秒数
}
}

这使监控工具能够检测存活但响应缓慢的守护进程。

守护进程端处理

当守护进程收到 PeerDaemonDisconnected 时,它记录一条结构化警告:

WARN peer daemon disconnected daemon_id=machine-B

目前这仅用于信息展示。未来的工作可能包括从断开的守护进程自动迁移节点。


协调器状态持久化

默认情况下,协调器将所有状态保存在内存中。如果协调器进程崩溃或重启,所有运行中数据流的信息都会丢失——守护进程继续运行但成为孤立状态,用户必须手动重新运行数据流。

redb 存储后端通过使用 redb 将协调器状态持久化到磁盘上的单个文件来解决这个问题。redb 是一个纯 Rust 嵌入式键值存储,采用写时复制 B 树,设计上具有崩溃安全性。

设计:无状态协调器与有状态后端

协调器本身在 K8s 意义上保持无状态——它可以随时停止和重启。所有持久状态存储在 CoordinatorStore trait 背后的存储后端中:

Coordinator (stateless process)
    |
    v
CoordinatorStore trait
    |
    +-- InMemoryStore (default, no persistence)
    +-- RedbStore     (persists to ~/.adora/coordinator.redb)

这种分离意味着:

  • 协调器事件循环在正常运行期间从不读取文件系统(仅在启动恢复时)
  • 所有状态变更在明确定义的持久化点写入存储
  • 可以在不改变协调器逻辑的情况下更换存储

启用持久化

# Use default path (~/.adora/coordinator.redb)
adora coordinator --store redb

# Use custom path
adora coordinator --store redb:/path/to/coordinator.redb

# Default: in-memory only (no persistence)
adora coordinator --store memory

redb 后端需要 redb-backend Cargo 特性,默认 CLI 构建中已启用。

持久化内容

存储跟踪三种记录类型:

RecordKey持久化字段
DataflowRecordUUID(16 字节)uuid、name、descriptor(JSON)、status、daemon IDs、generation 计数器、创建/更新时间戳
BuildRecordUUID(16 字节)build ID、status、errors、创建/更新时间戳
DaemonInfoDaemonId(bincode)daemon ID、machine ID

记录使用 bincode 序列化,实现紧凑、快速的编码。

数据流状态生命周期

协调器在每次状态转换时持久化数据流状态:

Start command     -->  Pending
All daemons ready -->  Running
Stop command      -->  Stopping
All nodes finish  -->  Succeeded  or  Failed { error }
Spawn failure     -->  Failed { error: "spawn failed: ..." }

每次持久化调用递增记录的 generation 计数器,提供用于冲突检测的单调版本。

持久化点

协调器在事件循环的以下时刻写入存储:

  1. 数据流启动 (ControlRequest::Start) —— 创建状态为 Pending 的记录
  2. 数据流已生成 (所有守护进程的 DataflowSpawnResult 成功) —— 更新为 Running
  3. 生成失败 (DataflowSpawnResult 错误) —— 更新为 Failed 并附带实际错误消息
  4. 请求停止 (ControlRequest::StopStopByName) —— 更新为 Stopping
  5. 所有节点完成 (DataflowFinishedOnDaemon) —— 更新为 SucceededFailed 并附带每个节点的错误详情
  6. 优雅关闭(Ctrl-C 或 Destroy 命令)—— 在发送停止消息之前将所有运行中的数据流标记为 Stopping

如果存储写入失败,协调器记录警告并继续使用内存状态运行。这防止存储故障阻塞数据流生命周期。

启动恢复

当协调器使用包含上次运行数据的 redb 存储启动时,它执行恢复:

  1. 通过 store.list_dataflows() 读取所有持久化的数据流记录
  2. 对于任何非终态状态(PendingRunningStopping)的记录:
    • 将其标记为 Failed { error: "coordinator restarted" }
    • 递增 generation 计数器
    • 将更新后的记录写回存储
  3. 终态记录(SucceededFailed)保持不变

这确保了崩溃的协调器遗留的过期数据流不会与正在运行的数据流混淆。运行这些数据流的守护进程将独立检测到协调器断开。

错误详情保留

当数据流失败时,Failed 状态包含实际的每节点错误消息,而非通用字符串:

Failed { error: "node-1: exited with code 137; node-2: failed to spawn node: binary not found" }

错误从所有守护进程的 DataflowDaemonResult.node_results 收集,格式为 node_id: error_message,以 ; 连接。

Schema 版本控制

redb 数据库包含一个带有 schema_version 键的 meta 表。打开时:

  • 如果不存在版本(新数据库),则写入当前版本
  • 如果存储的版本与二进制文件的版本匹配,数据库正常打开
  • 如果不匹配,数据库将以错误被拒绝

这防止了在 Adora 版本之间存储记录的序列化格式发生变化时的静默数据损坏。当前 schema 版本为 1

文件安全

在 Unix 系统上:

  • 数据库文件在创建后被设置为 0600(仅所有者可读写)
  • 默认目录(~/.adora/)被设置为 0700(仅所有者可访问)
  • 通过 redb:/path 提供的自定义路径会经过验证以拒绝 .. 组件

内部架构

#![allow(unused)]
fn main() {
// Store trait (libraries/coordinator-store/src/lib.rs)
pub trait CoordinatorStore: Send + Sync {
    fn put_dataflow(&self, record: &DataflowRecord) -> Result<()>;
    fn get_dataflow(&self, uuid: &Uuid) -> Result<Option<DataflowRecord>>;
    fn list_dataflows(&self) -> Result<Vec<DataflowRecord>>;
    fn delete_dataflow(&self, uuid: &Uuid) -> Result<()>;
    // ... daemon and build methods
}
}

RedbStore 实现使用三个 redb 表(daemonsdataflowsbuilds),以基于 UUID 的二进制键和 bincode 序列化的值。所有操作都是同步的(redb 是同步库);协调器直接从异步事件循环中调用它们,因为它们是快速的进程内操作。

bincode 反序列化限制为 64 MiB,以防止损坏的数据在长度前缀中编码巨大的分配大小。


完整 YAML 参考

# Dataflow-level settings
health_check_interval: 2.0    # health check sweep interval (default: 5.0s)

nodes:
  - id: sensor-node
    path: ./target/debug/sensor
    inputs:
      tick: adora/timer/millis/100
    outputs:
      - frames

  - id: processor
    path: ./target/debug/processor

    # Restart policy
    restart_policy: on-failure    # never | on-failure | always
    max_restarts: 5               # 0 = unlimited
    restart_delay: 1.0            # initial backoff delay (seconds)
    max_restart_delay: 30.0       # max backoff cap (seconds)
    restart_window: 300.0         # reset counter after N seconds

    # Health monitoring
    health_check_timeout: 30.0    # kill if no activity for N seconds

    inputs:
      frames:
        source: sensor-node/frames
        input_timeout: 5.0        # circuit breaker timeout (seconds)
        queue_size: 10            # input buffer size (default: 10)
    outputs:
      - result

使用场景

1. 间歇性硬件故障的摄像头流水线

摄像头驱动节点偶尔因 USB 断开而崩溃。处理流水线应在这些中断中存活,并在摄像头重新连接时恢复。

nodes:
  - id: camera-driver
    path: ./target/debug/camera-driver
    restart_policy: on-failure
    max_restarts: 0               # unlimited -- hardware failures are expected
    restart_delay: 2.0            # wait for USB to re-enumerate
    max_restart_delay: 30.0
    inputs:
      tick: adora/timer/millis/33  # ~30 FPS
    outputs:
      - frames

  - id: object-detector
    path: ./target/debug/detector
    inputs:
      frames:
        source: camera-driver/frames
        input_timeout: 5.0        # tolerate 5s camera outage
    outputs:
      - detections

  - id: planner
    path: ./target/debug/planner
    inputs:
      detections:
        source: object-detector/detections
        input_timeout: 10.0       # longer tolerance -- can plan with stale data
      lidar:
        source: lidar-driver/points
        input_timeout: 3.0

摄像头崩溃时会发生什么:

  1. camera-driver 以非零退出码退出
  2. 守护进程评估 on-failure 策略 -> 2 秒退避后重启
  3. 在中断期间,object-detector 在 5 秒后收到 InputClosed { id: "frames" }
  4. planner 在 10 秒后收到 InputClosed { id: "detections" }
  5. 摄像头重启,开始产生帧
  6. object-detector 收到新帧数据 + InputRecovered { id: "frames" }(熔断器恢复)
  7. planner 收到检测结果 + InputRecovered { id: "detections" }

规划节点端的处理:

#![allow(unused)]
fn main() {
use adora_node_api::{AdoraNode, Event, InputTracker};

let (mut node, mut events) = AdoraNode::init_from_env()?;
let mut tracker = InputTracker::new();

while let Some(event) = events.recv() {
    tracker.process_event(&event);

    match event {
        Event::Input { id, data, .. } => match id.as_ref() {
            "detections" => plan_with_detections(&data),
            "lidar" => update_lidar_map(&data),
            _ => {}
        },
        Event::InputClosed { id } => match id.as_ref() {
            "detections" => {
                // 摄像头流水线中断——仅使用 lidar 进行规划
                plan_lidar_only();
            }
            "lidar" => {
                // LiDAR 中断——使用最近已知的检测数据
                if let Some(stale) = tracker.last_value(&"detections".into()) {
                    plan_with_stale_detections(stale);
                }
            }
            _ => {}
        },
        Event::Stop(_) => break,
        _ => {}
    }
}
}

2. 带 OOM 崩溃的 ML 推理节点

ML 推理节点偶尔在大型输入上耗尽内存。它应该快速重启,但在重复失败后放弃(表明存在系统性问题)。

nodes:
  - id: ml-inference
    path: ./target/debug/ml-inference
    restart_policy: on-failure
    max_restarts: 3
    restart_delay: 0.5
    restart_window: 60.0          # 3 restarts per minute
    health_check_timeout: 60.0    # ML inference can be slow
    inputs:
      images:
        source: preprocessor/images
    outputs:
      - predictions

Behavior:

  • 节点因 OOM 崩溃 -> 0.5 秒后重启
  • 在另一个大型输入上再次崩溃 -> 1.0 秒后重启
  • 第三次崩溃 -> 2.0 秒后重启
  • 在 60 秒内第四次崩溃 -> 超过 max_restarts,节点永久失败
  • 如果节点在首次崩溃后稳定运行 60 秒,重启窗口重置,获得另外 3 次机会

3. 带优雅降级的多传感器融合

机器人融合来自多个传感器的数据。个别传感器可能会故障,但系统应以降低的能力继续运行。

nodes:
  - id: sensor-fusion
    path: ./target/debug/sensor-fusion
    inputs:
      camera:
        source: camera-node/frames
        input_timeout: 3.0
      lidar:
        source: lidar-node/points
        input_timeout: 3.0
      imu:
        source: imu-node/readings
        input_timeout: 1.0        # IMU is critical, short timeout
      gps:
        source: gps-node/fix
        input_timeout: 10.0       # GPS can be intermittent
    outputs:
      - fused-state

使用 InputTracker 的节点端:

#![allow(unused)]
fn main() {
use adora_node_api::{AdoraNode, Event, InputTracker};

let (mut node, mut events) = AdoraNode::init_from_env()?;
let mut tracker = InputTracker::new();

while let Some(event) = events.recv() {
    tracker.process_event(&event);

    match event {
        Event::Input { id, data, .. } => {
            // 处理来自任何传感器的新数据
            update_sensor(&id, &data);
            compute_and_send_fusion(&mut node, &tracker);
        }
        Event::InputClosed { id } => {
            // 传感器离线——调整融合权重
            eprintln!("sensor {id} offline, degrading");
            compute_and_send_fusion(&mut node, &tracker);
        }
        Event::InputRecovered { id } => {
            // 传感器恢复在线
            eprintln!("sensor {id} recovered");
        }
        Event::Stop(_) => break,
        _ => {}
    }
}

fn compute_and_send_fusion(node: &mut AdoraNode, tracker: &InputTracker) {
    // 有新数据时使用新数据,降级传感器使用过期缓存
    let camera = tracker.last_value(&"camera".into());
    let lidar = tracker.last_value(&"lidar".into());
    let imu = tracker.last_value(&"imu".into());

    if tracker.is_closed(&"imu".into()) {
        // IMU 是关键传感器——切换到紧急模式
        emergency_stop(node);
        return;
    }

    // 融合可用传感器,活跃传感器赋予更高权重
    let closed = tracker.closed_inputs();
    let active_count = 4 - closed.len();
    // ... 使用 active_count 进行置信度加权的融合逻辑
}
}

4. 长期运行的数据处理流水线

批处理流水线持续运行。处理节点偶尔因第三方库 bug 而挂起。健康监控检测并从这些挂起中恢复。

nodes:
  - id: data-ingest
    path: ./target/debug/ingest
    restart_policy: always        # always restart -- this is a long-running service
    max_restarts: 0               # unlimited
    restart_delay: 1.0
    inputs:
      tick: adora/timer/millis/1000
    outputs:
      - records

  - id: processor
    path: ./target/debug/processor
    restart_policy: on-failure
    max_restarts: 10
    restart_delay: 0.5
    restart_window: 600.0         # 10 restarts per 10 minutes
    health_check_timeout: 30.0    # kill if hung for 30s
    inputs:
      records: data-ingest/records
    outputs:
      - results

  - id: writer
    path: ./target/debug/writer
    restart_policy: on-failure
    max_restarts: 5
    restart_delay: 2.0            # give DB time to recover
    max_restart_delay: 60.0
    inputs:
      results:
        source: processor/results
        input_timeout: 60.0       # processor may be slow

处理器挂起时会发生什么:

  1. 处理器停止与守护进程通信
  2. 30 秒后,健康检查检测到挂起并终止进程
  3. health_check_kills 计数器递增
  4. 守护进程评估 on-failure -> 0.5 秒后重启
  5. 新处理器实例启动,从 data-ingest 恢复消费
  6. writer 可能在 60 秒超时期间收到了 InputClosed——如果重启足够快也可能未收到
  7. 如果 writer 确实收到了 InputClosed,当新结果到达时会收到 InputRecovered

5. 带守护进程故障检测的分布式部署

协调器监控守护进程健康状态的多机器部署。

Machine A (coordinator + daemon):  camera-driver, preprocessor
Machine B (daemon):                ml-inference, postprocessor
Machine C (daemon):                planner, actuator-driver

当机器 B 失去网络时会发生什么:

  1. 协调器到机器 B 的心跳失败
  2. 30 秒无响应后,协调器将机器 B 从活跃守护进程中移除
  3. 协调器向机器 A 和机器 C 广播 PeerDaemonDisconnected { daemon_id: "machine-B" }
  4. A 和 C 上的守护进程记录:WARN peer daemon disconnected daemon_id=machine-B
  5. A 和 C 上从机器 B 节点接收输入的节点收到 InputClosed 事件(通过其输入超时)
  6. CLI 查询 ConnectedMachines 仅显示 A 和 C 及其 last_heartbeat_ago_ms

6. 使用 redb 持久化的协调器崩溃恢复

长期运行的多守护进程部署,协调器必须在重启后不丢失数据流历史记录。

# Start coordinator with persistent store
adora coordinator --store redb

# In another terminal, start a dataflow
adora start examples/rust-dataflow/dataflow.yml --name my-pipeline --detach

# Coordinator crashes or is killed (e.g., OOM, hardware failure)
# ... time passes ...

# Restart coordinator with the same store
adora coordinator --store redb

重启时会发生什么:

  1. 协调器打开 ~/.adora/coordinator.redb 并读取持久化的数据流记录
  2. 发现 my-pipeline 状态为 Running
  3. 将其标记为 Failed { error: "coordinator restarted" },递增 generation
  4. 记录日志:INFO recovering stale dataflow <uuid> ("my-pipeline") -> marking as Failed
  5. adora list 现在显示 my-pipeline 及其最终状态和时间戳
  6. 守护进程独立检测到协调器断开并停止其节点
  7. 用户可以启动新的数据流——协调器已完全恢复运行

关键优势:协调器在重启后保留完整的数据流生命周期事件历史。如果不使用 --store redb,所有状态都会丢失,运维人员将无法得知崩溃前正在运行什么。

7. 使用 Always-Restart 的周期性批处理任务

一个处理批次并在完成后退出的节点。它应该重启以处理下一个批次。

nodes:
  - id: batch-processor
    path: ./target/debug/batch-proc
    restart_policy: always        # restart even on clean exit
    max_restarts: 0               # unlimited
    restart_delay: 10.0           # wait 10s between batches
    max_restart_delay: 10.0       # no exponential growth
    inputs:
      trigger: adora/timer/millis/1  # immediate first trigger
    outputs:
      - batch-result

节点处理一个批次,以退出码 0 退出,等待 10 秒,然后重启处理下一个。always 策略确保即使成功也会重启。设置 restart_delay == max_restart_delay 可获得恒定延迟。


最佳实践

on-failure 开始。仅对预期退出并重启的节点(如周期性批处理任务)使用 always

设置 max_restarts。无限重启可能掩盖 bug。从 3-5 开始,必要时增加。仅对崩溃不可避免的节点(硬件驱动、外部 API 客户端)使用 max_restarts: 0

使用 restart_window。防止永久重启循环。60-300 秒的窗口是典型值。没有窗口时,启动时崩溃的节点会立即耗尽其重启预算。

调优 restart_delay。从 0.5-1.0 秒开始。太短会导致抖动;太长会延迟恢复。将延迟匹配到节点的典型启动时间和故障根因:

  • USB/硬件重连:2-5 秒
  • 网络服务重连:1-3 秒
  • OOM/瞬态 bug:0.5-1.0 秒

宽裕地设置 health_check_timeout。应至少为节点最长预期处理时间的 2-3 倍。ML 推理节点可能需要 60 秒以上。如果太短,健康的节点会在正常处理期间被终止。

按输入设置 input_timeout。不是所有输入都需要相同的超时时间。对高频输入(IMU、摄像头)使用较短超时,对慢速/突发源(GPS、批处理结果)使用较长超时。一个好的起点是预期发布间隔的 3-5 倍。

对关键路径使用 InputTracker。当节点必须在输入降级时仍继续运行,使用 InputTracker 回退到缓存数据。这对传感器融合、规划和控制节点至关重要。

生产部署使用 --store redb。redb 后端确保协调器在崩溃和重启后保留数据流历史。内存默认值适合开发,但退出时会丢失所有状态。redb 文件很小(与数据流记录数量成比例),开销可忽略不计。

组合特性实现纵深防御

  • restart_policy + restart_delay -> 从节点崩溃中恢复
  • health_check_timeout -> 从挂起的节点中恢复
  • input_timeout -> 检测过期的上游数据
  • InputTracker -> 节点代码中的优雅降级
  • --store redb -> 协调器崩溃后存活