Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Adora CLI 参考

Adora(AI-Dora,面向数据流的机器人架构)是一个 100% Rust 框架,用于构建实时机器人和 AI 应用。本文档从终端用户和开发者两个角度介绍 adora CLI。

目录


快速开始

# Create a new project
adora new my-robot --kind dataflow --lang rust

# Run locally (no coordinator/daemon needed)
adora run dataflow.yml

# Or use coordinator/daemon for production
adora up
adora start dataflow.yml --attach
# Ctrl-C to stop
adora down

安装

cargo install adora-cli

从源码安装

cargo install --path binaries/cli --locked

验证

adora --version
adora status

核心概念

数据流

数据流是由类型化数据通道连接的节点有向图。节点产生输出,其他节点将其作为输入消费。框架负责数据路由、序列化(Apache Arrow)和生命周期管理。

执行模式

模式命令基础设施用例
本地adora run开发、测试、单机
分布式adora up + adora start协调器 + 守护进程生产、多机

组件角色

CLI  -->  Coordinator  -->  Daemon(s)  -->  Nodes / Operators
              (control plane)  (per machine)    (user code)
  • CLI:用户界面。发送命令、显示日志。
  • 协调器:跨机器编排数据流生命周期。
  • 守护进程:生成节点进程、管理 IPC、收集指标。
  • 节点:产生和消费 Arrow 数据的独立进程。
  • 算子:在共享运行时内运行的进程内代码(比节点更低延迟)。

数据格式

所有数据以 Apache Arrow 列式数组的形式在系统中流转。这使得同机节点间零拷贝共享内存传输和零序列化开销成为可能。


数据流描述符

数据流在 YAML 文件中定义。以下是完整的 schema:

最小示例

nodes:
  - id: sender
    path: sender.py
    outputs:
      - message

  - id: receiver
    path: receiver.py
    inputs:
      message: sender/message

完整模式

# Dataflow-level settings
health_check_interval: 5.0    # health check sweep interval in seconds (default: 5.0)

nodes:
  - id: my-node                 # 唯一标识符(必填)
    name: "My Node"             # 人类可读名称(可选)
    description: "..."          # 描述(可选)

    # --- Source (pick one) ---
    path: ./target/debug/my-node          # 本地可执行文件
    # path: https://example.com/node.zip  # 从 URL 下载
    # git: https://github.com/org/repo.git  # 从 git 构建
    #   branch: main            # git 分支(与 tag/rev 互斥)
    #   tag: v1.0               # git 标签
    #   rev: abc123             # git 提交哈希

    # --- Build ---
    build: cargo build -p my-node   # 构建用的 shell 命令(可选)

    # --- Inputs ---
    inputs:
      # Short form: source_node/output_id
      tick: adora/timer/millis/100
      data: other-node/output

      # Long form with options
      sensor_data:
        source: sensor/frames
        queue_size: 10            # input buffer size (default: 10)
        queue_policy: drop_oldest # or "backpressure" (buffers up to 10x queue_size)
        input_timeout: 5.0        # circuit breaker timeout in seconds

    # --- Outputs ---
    outputs:
      - processed
      - status

    # --- Environment ---
    env:
      MY_VAR: "value"
      FROM_ENV:
        __adora_env: HOST_VAR     # 从宿主环境读取
    args: "--verbose"             # 命令行参数

    # --- Fault tolerance ---
    restart_policy: on-failure    # never(默认)| on-failure | always
    max_restarts: 5               # 0 = unlimited
    restart_delay: 1.0            # initial backoff in seconds
    max_restart_delay: 30.0       # backoff cap in seconds
    restart_window: 300.0         # reset counter after N seconds
    health_check_timeout: 30.0    # kill if no activity for N seconds

    # --- Logging ---
    min_log_level: info           # 源级别过滤(守护进程端)
    send_stdout_as: raw_output    # 将原始标准输出路由为数据输出
    send_logs_as: log_entries     # 将结构化日志路由为数据输出
    max_log_size: "50MB"          # 达到此大小时轮转日志文件
    max_rotated_files: 5          # number of rotated files to keep (1-100)

    # --- Deployment ---
    _unstable_deploy:
      machine: A                  # 目标机器/守护进程 ID

# 调试设置
_unstable_debug:
  publish_all_messages_to_zenoh: true   # required for topic echo/hz/info

内置定时器节点

定时器是以固定间隔发出 tick 的虚拟节点:

inputs:
  tick: adora/timer/millis/100   # 每 100ms
  slow: adora/timer/millis/1000  # 每 1s
  fast: adora/timer/hz/30        # 30 Hz(约 33ms)

算子节点

算子在共享运行时中进程内运行(无独立进程):

nodes:
  # Single operator (shorthand)
  - id: detector
    operator:
      python: detect.py
      build: pip install -r requirements.txt
      inputs:
        image: camera/frames
      outputs:
        - bbox

  # Multiple operators sharing a runtime
  - id: runtime-node
    operators:
      - id: preprocessor
        shared-library: ../../target/debug/libpreprocess
        inputs:
          raw: sensor/data
        outputs:
          - processed
      - id: analyzer
        shared-library: ../../target/debug/libanalyze
        inputs:
          data: runtime-node/preprocessor/processed
        outputs:
          - result

分布式部署

使用 _unstable_deploy 将节点分配到特定机器:

nodes:
  - id: camera-driver
    _unstable_deploy:
      machine: robot-arm
    path: ./target/debug/camera
    outputs:
      - frames

  - id: ml-inference
    _unstable_deploy:
      machine: gpu-server
    path: ./target/debug/inference
    inputs:
      frames: camera-driver/frames
    outputs:
      - predictions

当节点位于不同机器时,通信自动从共享内存切换到 Zenoh 发布/订阅。


命令参考

生命周期命令

adora run

在本地运行数据流,无需协调器或守护进程。适合开发和测试。

adora run <PATH> [OPTIONS]
参数/标志默认描述
<PATH>必需数据流描述符 YAML 的路径
--stop-after <DURATION>在指定时间后自动停止(例如 30s5m
--uvfalse使用 uv 进行 Python 节点管理
--debugfalse启用调试主题(等同于 publish_all_messages_to_zenoh: true
--allow-shell-nodesfalse启用基于 shell 的节点执行
--log-level <LEVEL>stdout最低显示级别:error|warn|info|debug|trace|stdout
--log-format <FORMAT>pretty输出格式:pretty|json|compact
--log-filter <FILTER>按节点级别覆盖:"node1=debug,node2=warn"

Examples:

# Basic run
adora run dataflow.yml

# Stop after 10 seconds, only show warnings
adora run dataflow.yml --stop-after 10s --log-level warn

# Python dataflow with uv
adora run dataflow.yml --uv

# Debug one node, silence others
adora run dataflow.yml --log-level warn --log-filter "sensor=debug"

# JSON output for CI pipelines
adora run dataflow.yml --log-format json --stop-after 30s 2>test.json

adora up

在本地模式下启动协调器和守护进程。

adora up

以后台进程方式启动 adora coordinatoradora daemon。等待两者就绪后返回。幂等操作:如果已在运行,则不执行任何操作。

adora down(别名:adora destroy

拆卸协调器和守护进程。首先停止所有运行中的数据流。

adora down [OPTIONS]
标志默认描述
--coordinator-addr <IP>127.0.0.1协调器地址
--coordinator-port <PORT>6013协调器端口

adora build

运行数据流描述符中定义的构建命令。

adora build <PATH> [OPTIONS]
标志默认描述
<PATH>必需数据流描述符路径
--uvfalse使用 uv 构建 Python
--localfalse强制本地构建(跳过协调器)
--strict-typesfalseTreat type warnings as errors (non-zero exit code)

Type checking: After expanding modules, build runs the same type checks as validate. Warnings are printed by default; use --strict-types (or set strict_types: true in the YAML) to fail the build on type mismatches. User-defined types in a types/ directory next to the dataflow are loaded automatically.

**构建策略:**如果节点有 _unstable_deploy 部分且协调器可达,构建将分发到目标机器。否则在本地构建。

**Git 源:**带有 git: 字段的节点在构建前会被克隆/更新。构建命令从 git 仓库根目录运行。

adora start

在运行中的协调器上启动数据流。

adora start <PATH> [OPTIONS]
标志默认描述
<PATH>必需数据流描述符路径
--name <NAME>, -n为数据流指定名称
--attachauto附加到日志流并等待完成
--detachauto启动后立即返回
--debugfalse启用调试主题(等同于 publish_all_messages_to_zenoh: true
--hot-reloadfalse监视 Python 文件并在变更时重新加载
--uvfalse使用 uv 管理 Python 节点
--coordinator-addr <IP>127.0.0.1协调器地址
--coordinator-port <PORT>6013协调器端口

如果既未指定 --attach 也未指定 --detach:在 TTY 中运行时附加,否则分离。

**附加模式:**流式输出日志,优雅处理 Ctrl-C(第一次 = 停止,第二次 = 强制终止)。

**热重载:**监视 Python 算子源文件。文件变更时,向协调器发送重载请求,协调器将其传播到守护进程。

adora stop

停止运行中的数据流。

adora stop [UUID_OR_NAME] [OPTIONS]
标志默认描述
[UUID_OR_NAME]interactive数据流 UUID 或名称
--name <NAME>, -n替代名称指定方式
--grace-duration <DURATION>优雅关闭超时时间
--force, -ffalse立即终止
--coordinator-addr <IP>127.0.0.1协调器地址
--coordinator-port <PORT>6013协调器端口

如果未提供标识符且在 TTY 中运行,将显示交互式选择器。

**停止序列:**发送 Event::Stop -> 等待优雅关闭时间 -> SIGTERM -> 强制终止。

adora restart

重启运行中的数据流(停止 + 使用存储的描述符重新启动)。无需 YAML 路径 – 协调器保留了原始描述符。

adora restart [UUID] [OPTIONS]
标志默认描述
[UUID]数据流 UUID
--name <NAME>, -n按名称而非 UUID 重启
--grace-duration <DURATION>停止阶段的优雅关闭超时时间
--force, -ffalse重启前强制终止
--coordinator-addr <IP>127.0.0.1协调器地址
--coordinator-port <PORT>6013协调器端口

Examples:

# Restart by name
adora restart --name my-app

# Restart by UUID with forced stop
adora restart a1b2c3d4-... --force

adora record

将数据流消息录制到 .adorec 文件以供离线回放。完整工作流请参阅调试指南

adora record <DATAFLOW_YAML> [OPTIONS]
标志默认描述
<DATAFLOW_YAML>必需数据流描述符路径
-o, --output <PATH>recording_{timestamp}.adorec输出文件路径
--topics <TOPICS>all逗号分隔的 node/output 主题用于录制
--proxyfalse通过 WebSocket 流式传输而非在目标机器上录制
--output-yaml <PATH>写入修改后的 YAML 但不运行(干运行)

默认模式向数据流注入录制节点。--proxy 模式需要正在运行的数据流和 publish_all_messages_to_zenoh: true

adora replay

通过将源节点替换为回放节点来回放录制的 .adorec 文件。完整工作流请参阅调试指南

adora replay <FILE> [OPTIONS]
标志默认描述
<FILE>必需.adorec 录制文件路径
--speed <FLOAT>1.0回放速度(0 = 最大速度)
--loopfalse循环播放录制
--replace <NODE_IDS>所有已录制的逗号分隔的要替换的节点
--output-yaml <PATH>写入修改后的 YAML 但不运行(干运行)

监控命令

adora list(别名:adora ps

列出运行中的数据流及指标。

adora list [OPTIONS]
标志默认描述
--format <FMT>, -ftable输出格式:table|json
--status <STATUS>过滤:running|finished|failed
--name <PATTERN>按名称过滤(不区分大小写的子串匹配)
--sort-by <FIELD>排序:cpu|memory
--quiet, -qfalse仅打印 UUID
--coordinator-addr <IP>127.0.0.1协调器地址
--coordinator-port <PORT>6013协调器端口

输出列: UUID、Name、Status、Nodes、CPU、Memory

adora logs

显示和跟踪数据流与节点的日志。

adora logs [UUID_OR_NAME] [NODE] [OPTIONS]
标志默认描述
[UUID_OR_NAME]数据流 UUID 或名称
[NODE]节点名称(除非使用 --all-nodes,否则必填)
--all-nodesfalse按时间戳合并所有节点的日志
--tail <N>all显示最后 N 行
--follow, -ffalse流式输出新日志条目
--localfalse从本地 out/ 目录读取
--since <DURATION>显示指定时间之后的日志
--until <DURATION>显示指定时间之前的日志
--level <LEVEL>stdout最低日志级别
--log-format <FORMAT>pretty输出格式
--log-filter <FILTER>按节点级别覆盖
--grep <PATTERN>不区分大小写的文本搜索
--coordinator-addr <IP>127.0.0.1协调器地址
--coordinator-port <PORT>6013协调器端口

**过滤流水线:**读取/解析 -> 时间过滤 -> Grep -> Tail -> 显示

Examples:

# Follow all nodes live
adora logs my-dataflow --all-nodes --follow

# Last 50 errors from a specific node
adora logs my-dataflow sensor --level error --tail 50

# Search logs from last 5 minutes
adora logs my-dataflow --all-nodes --since 5m --grep "timeout"

# Read local files (no coordinator needed)
adora logs --local --all-nodes --tail 100

# Post-mortem analysis: errors in time window
adora logs --local sensor --since 1h --until 30m --level error

时间格式: 30(秒)、30s5m1h2d

adora inspect top(别名:adora top

节点资源使用的实时 TUI 监控器(类似 top)。

adora inspect top [OPTIONS]
adora top [OPTIONS]
标志默认描述
--refresh-interval <SECONDS>2更新间隔(最小值:1)
--oncefalse打印单次 JSON 快照并退出(用于脚本/CI)
--coordinator-addr <IP>127.0.0.1协调器地址
--coordinator-port <PORT>6013协调器端口

需要交互式终端(除非使用 --once)。

Key动作
q / EscQuit
Up / k选择上一个节点
Down / j选择下一个节点
n按节点名称排序
c按 CPU 排序
m按内存排序
r强制刷新

列: NODE、STATUS、DATAFLOW、PID、CPU%、MEMORY (MB)、RESTARTS、QUEUE、NET TX、NET RX、I/O READ (MB/s)、I/O WRITE (MB/s)

  • STATUS:Running、Restarting、Degraded(输入中断)或 Failed
  • RESTARTS:每个节点的当前重启次数
  • QUEUE:节点输入队列中的待处理消息
  • NET TX/RX:通过 Zenoh 跨守护进程发送/接收的累计网络字节数

CPU 值为单核百分比(多核情况下可超过 100%)。指标来自守护进程,因此适用于分布式部署。

脚本示例:

# JSON snapshot for CI/monitoring pipelines
adora top --once | jq '.[].cpu_usage'

adora topic list

列出运行中数据流的所有主题(输出)。

adora topic list [OPTIONS]
标志默认描述
-d <DATAFLOW>, --dataflowinteractive数据流 UUID 或名称
--format <FMT>table输出格式:table|json

adora topic echo

订阅主题并实时显示消息。

adora topic echo [OPTIONS] [DATA...]
标志默认描述
-d <DATAFLOW>, --dataflow必需数据流 UUID 或名称
[DATA...]所有输出要回显的主题(例如 node1/output
--format <FMT>table输出格式:table|json

需要在描述符中设置 _unstable_debug.publish_all_messages_to_zenoh: true

adora topic hz

使用 TUI 仪表盘测量主题发布频率。

adora topic hz [OPTIONS] [DATA...]
标志默认描述
-d <DATAFLOW>, --dataflow必需数据流 UUID 或名称
[DATA...]所有输出要测量的主题
--window <SECONDS>10滑动窗口(最小值:1)

**需要交互式终端。**显示:Avg (ms)、Avg (Hz)、Min (ms)、Max (ms)、Std (ms),以及所选主题的速率火花图和直方图。

adora topic info

显示单个主题的详细元数据。

adora topic info [OPTIONS] DATA
标志默认描述
-d <DATAFLOW>, --dataflow必需数据流 UUID 或名称
DATA必需单个主题(例如 camera/image
--duration <SECONDS>5采集持续时间(最小值:1)

订阅主题指定的持续时间并报告:类型(Arrow schema)、发布者、订阅者、消息计数、带宽。

adora node

管理和检查数据流节点。

adora node list
adora node list [OPTIONS]

列出运行中数据流的节点及其状态、CPU、内存和重启次数。

列: NODE、STATUS、PID、CPU%、MEMORY (MB)、RESTARTS、DATAFLOW

adora node info

Show detailed information about a specific node including status, inputs, outputs, and metrics.

adora node info <NODE> [OPTIONS]
标志默认描述
<NODE>必需Node ID to inspect
-d <DATAFLOW>, --dataflowinteractive数据流 UUID 或名称
-f <FORMAT>, --formattable输出格式:table|json
adora node restart

Restart a single node within a running dataflow. The daemon stops the node process and respawns it.

adora node restart <NODE> [OPTIONS]
标志默认描述
<NODE>必需Node ID to restart
-d <DATAFLOW>, --dataflowinteractive数据流 UUID 或名称
--grace <DURATION>Grace period before force-killing the node
adora node stop

Stop a single node within a running dataflow without stopping the entire dataflow.

adora node stop <NODE> [OPTIONS]
标志默认描述
<NODE>必需Node ID to stop
-d <DATAFLOW>, --dataflowinteractive数据流 UUID 或名称
--grace <DURATION>Grace period before force-killing the node

adora topic pub

Publish JSON data to a topic in a running dataflow. Requires publish_all_messages_to_zenoh: true.

adora topic pub <TOPIC> [DATA] [OPTIONS]
标志默认描述
<TOPIC>必需Topic to publish to (format: node_id/output_id)
[DATA]JSON data to publish (required unless --file)
--file <PATH>Read data from a JSON file instead of command line
--count <N>1Number of messages to publish
-d <DATAFLOW>, --dataflow必需数据流 UUID 或名称

Examples:

# Publish a single value
adora topic pub -d my-app sensor/threshold '[42]'

# Publish from file, 10 times
adora topic pub -d my-app sensor/config --file config.json --count 10

adora param

Manage runtime parameters for nodes. Parameters are persisted in the coordinator store and optionally forwarded to running nodes.

adora param list

List all runtime parameters for a node.

adora param list <NODE> [OPTIONS]
标志默认描述
<NODE>必需节点 ID
-d <DATAFLOW>, --dataflowinteractive数据流 UUID 或名称
--format <FMT>table输出格式:table|json
adora param get

Get a single runtime parameter value.

adora param get <NODE> <KEY> [OPTIONS]
标志默认描述
<NODE>必需节点 ID
<KEY>必需Parameter key
-d <DATAFLOW>, --dataflowinteractive数据流 UUID 或名称
adora param set

Set a runtime parameter. The value is JSON. The parameter is stored in the coordinator and forwarded to the node if it is running.

adora param set <NODE> <KEY> <VALUE> [OPTIONS]
标志默认描述
<NODE>必需节点 ID
<KEY>必需Parameter key (max 256 bytes)
<VALUE>必需Parameter value as JSON (max 64KB serialized)
-d <DATAFLOW>, --dataflowinteractive数据流 UUID 或名称

Examples:

# Set a numeric parameter
adora param set -d my-app sensor threshold 42

# Set a string parameter
adora param set -d my-app camera resolution '"1080p"'

# Set a complex parameter
adora param set -d my-app detector config '{"confidence": 0.8, "nms": 0.5}'
adora param delete

Delete a runtime parameter.

adora param delete <NODE> <KEY> [OPTIONS]
标志默认描述
<NODE>必需节点 ID
<KEY>必需Parameter key
-d <DATAFLOW>, --dataflowinteractive数据流 UUID 或名称

adora doctor

Diagnose environment, coordinator/daemon connectivity, and optionally validate a dataflow YAML.

adora doctor [OPTIONS]
标志默认描述
--dataflow <PATH>Path to a dataflow YAML to validate

Checks performed:

  1. Coordinator reachability
  2. Daemon connectivity
  3. Active dataflow status
  4. Dataflow YAML validation (if --dataflow provided)

Examples:

# Basic health check
adora doctor

# Check environment + validate a dataflow
adora doctor --dataflow dataflow.yml

adora trace list

列出协调器捕获的最近追踪。协调器在内存中捕获 adora_coordinatoradora_core crate 的 span(最多 4096 个 span)。无需外部追踪基础设施。

adora trace list [OPTIONS]
标志默认描述
--coordinator-addr <IP>127.0.0.1协调器地址
--coordinator-port <PORT>6013协调器端口

输出列: TRACE ID(前 12 个字符)、ROOT SPAN、SPANS、STARTED、DURATION

Example:

adora trace list
TRACE ID      ROOT SPAN          SPANS  STARTED              DURATION
a1b2c3d4e5f6  spawn_dataflow     12     2026-03-01 10:30:05  1.234s
f8e7d6c5b4a3  build_dataflow     5      2026-03-01 10:29:58  0.500s

adora trace view

以缩进树形式查看特定追踪的 span。支持追踪 ID 的前缀匹配。

adora trace view <TRACE_ID> [OPTIONS]
参数/标志默认描述
<TRACE_ID>必需完整追踪 ID 或唯一前缀
--coordinator-addr <IP>127.0.0.1协调器地址
--coordinator-port <PORT>6013协调器端口

Example:

adora trace view a1b2c3d4
spawn_dataflow [INFO 1.234s] {build_id="abc", session_id="def"}
  build_dataflow [INFO 0.500s]
    download_node [DEBUG 0.200s] {url="..."}
  start_inner [INFO 0.734s]
    spawn_node [INFO 0.100s] {node_id="camera"}
    spawn_node [INFO 0.080s] {node_id="detector"}

追踪 ID 支持前缀匹配:如果前缀唯一标识一个追踪,则自动解析。如果有歧义,系统会提示您使用更长的前缀。


设置命令

adora status(别名:adora check

检查系统健康状态和连接性。

adora status [OPTIONS]

报告协调器连接状态、守护进程状态和活跃数据流数量。

adora new

从模板生成新的项目或节点。

adora new <NAME> [OPTIONS]
标志默认描述
<NAME>必需项目或节点名称
--kind <KIND>dataflowdataflow|node
--lang <LANG>rustrust|python|c|cxx

adora expand

Expand module references in a dataflow and print the resulting flat YAML. Useful for debugging module composition.

adora expand <PATH> [OPTIONS]
标志默认描述
<PATH>必需Dataflow descriptor (or module file with --module)
--modulefalseValidate a standalone module file instead of a full dataflow

Examples:

# Expand a dataflow with modules
adora expand dataflow.yml

# Validate a module file
adora expand --module modules/navigation.module.yml

See the Modules Guide for full documentation on module composition.

adora graph

以图形方式可视化数据流。

adora graph <PATH> [OPTIONS]
标志默认描述
<PATH>必需数据流描述符路径
--mermaidfalse输出 Mermaid 图表文本
--openfalse在浏览器中打开 HTML

Without --mermaid, generates an interactive HTML file using mermaid.js. When outputs have type annotations, edge labels include the type name (e.g. image [Image]).

# Generate HTML
adora graph dataflow.yml --open

# Generate Mermaid for GitHub markdown
adora graph dataflow.yml --mermaid

adora validate

Validate a dataflow YAML file and check type annotations.

adora validate <PATH> [OPTIONS]
标志默认描述
<PATH>必需数据流描述符路径
--strict-typesfalseTreat warnings as errors (non-zero exit code for CI)

Checks:

  1. Key existence: output_types/input_types keys exist in the corresponding outputs/inputs lists
  2. URN resolution: All type URNs resolve in the standard or user-defined type library
  3. Edge compatibility: Connected edges have compatible types (exact match, widening, or user-defined rules)
  4. Parameterized types: Parameter mismatches (e.g. AudioFrame[sample_type=f32] vs AudioFrame[sample_type=i16])
  5. Timer auto-typing: Timer inputs are automatically typed as std/core/v1/UInt64
  6. Type inference: When only upstream annotates a type, it is inferred on the downstream input
  7. Metadata patterns: output_metadata keys and pattern shorthands are validated
  8. Schema compatibility: Struct types are checked at the field level (missing/wrong fields)

User-defined types in a types/ directory next to the dataflow are loaded automatically.

# Validate with warnings
adora validate dataflow.yml

# Strict mode for CI (exit 1 on warnings)
adora validate --strict-types dataflow.yml

See the Type Annotations Guide for the full type library and usage details.


实用命令

adora completion

生成 shell 补全脚本。

adora completion [SHELL]

省略时自动检测 shell。支持:bash、zsh、fish、elvish、powershell。

# Bash
eval "$(adora completion bash)"
echo 'eval "$(adora completion bash)"' >> ~/.bashrc

# Zsh
eval "$(adora completion zsh)"
echo 'eval "$(adora completion zsh)"' >> ~/.zshrc

# Fish
adora completion fish > ~/.config/fish/completions/adora.fish

adora system

系统管理命令。

adora system status [OPTIONS]

目前提供 status 作为子命令(等同于 adora status)。


自管理命令

adora self update

检查并安装 CLI 更新。

adora self update [--check-only]

从 GitHub releases(dora-rs/adora)下载。

adora self uninstall

从系统中移除 CLI。

adora self uninstall [--force]

不使用 --force 时,会提示确认(需要 TTY)。依次尝试 uv pip uninstallpip uninstall,然后二进制自删除。


环境变量

所有环境变量作为备选值。CLI 标志始终优先。

变量默认命令描述
ADORA_COORDINATOR_ADDR127.0.0.1所有协调器命令协调器 IP 地址
ADORA_COORDINATOR_PORT6013所有协调器命令协调器 WebSocket 端口
ADORA_LOG_LEVELstdoutrunlogs默认最低日志级别
ADORA_LOG_FORMATprettyrunlogs默认输出格式
ADORA_LOG_FILTERrunlogs默认的按节点级别覆盖
ADORA_ALLOW_SHELL_NODESrun启用 shell 节点执行
ADORA_RUNTIME_TYPE_CHECKrun, startRuntime type checking: warn (log mismatches) or error (fail on mismatch). See Type Annotations
# Set defaults for a development session
export ADORA_COORDINATOR_ADDR=192.168.1.10
export ADORA_LOG_LEVEL=info
export ADORA_LOG_FORMAT=compact

架构指南

本节面向希望了解框架内部机制、扩展框架或调试问题的开发者。

通信栈

                    ┌─────────────────────────────────────┐
                    │           CLI (adora)                │
                    │   WebSocket (JSON request/reply)     │
                    └─────────────┬───────────────────────┘
                                  │
                    ┌─────────────▼───────────────────────┐
                    │        Coordinator                   │
                    │   WebSocket control + daemon mgmt    │
                    │   State: InMemoryStore | RedbStore   │
                    └──┬──────────────────────────────┬───┘
                       │                              │
          ┌────────────▼──────────┐     ┌─────────────▼──────────┐
          │     Daemon A          │     │     Daemon B           │
          │  (machine: robot)     │     │  (machine: gpu-server) │
          │                       │     │                        │
          │  ┌─────┐  ┌─────┐    │     │  ┌──────┐  ┌───────┐  │
          │  │Node1│  │Node2│    │     │  │Node3 │  │Node4  │  │
          │  └──┬──┘  └──┬──┘    │     │  └──┬───┘  └───┬───┘  │
          │     │shmem    │shmem  │     │     │shmem      │shmem │
          │     └────┬────┘       │     │     └─────┬─────┘      │
          └──────────┼────────────┘     └───────────┼────────────┘
                     │                              │
                     └──────── Zenoh pub/sub ────────┘
                              (cross-machine)

协议层

层级TransportFormatUse
CLI <-> 协调器WebSocketJSON(ControlRequest/Reply)命令、日志流
协调器 <-> 守护进程WebSocketJSON(DaemonCoordinatorEvent)节点生命周期、指标
守护进程 <-> 节点(小消息)TCP / Unix socket自定义二进制控制消息、小数据
守护进程 <-> 节点(大消息)共享内存零拷贝 Arrow数据消息 > 4KB
守护进程 <-> 守护进程Zenoh 发布/订阅Arrow + 元数据跨机器数据路由

协调器内部机制

协调器是一个事件驱动的异步服务器:

Event Sources:
  - CLI WebSocket connections (ControlRequest)
  - Daemon WebSocket connections (DaemonEvent)
  - Heartbeat timer (3s interval)
  - External events (for embedding)

Event Loop:
  merge_all(cli_events, daemon_events, heartbeat, external)
    -> handle_event()
    -> update state
    -> persist to store (if redb)
    -> send replies

关键类型:

#![allow(unused)]
fn main() {
// State
RunningDataflow { uuid, name, descriptor, daemons, node_metrics, ... }
RunningBuild    { build_id, errors, log_subscribers, pending_results, ... }
DaemonConnection { sender, pending_replies, last_heartbeat }

// Store trait
trait CoordinatorStore: Send + Sync {
    fn put_dataflow(&self, record: &DataflowRecord) -> Result<()>;
    fn get_dataflow(&self, uuid: &Uuid) -> Result<Option<DataflowRecord>>;
    fn list_dataflows(&self) -> Result<Vec<DataflowRecord>>;
    // ... daemon and build methods
}
}

存储后端:

  • memory(默认):内存存储,重启后丢失。
  • redb:持久化到磁盘(~/.adora/coordinator.redb)。崩溃后可恢复。需要 redb-backend feature。
adora coordinator --store redb
adora coordinator --store redb:/custom/path.redb

守护进程内部机制

守护进程管理单台机器上的节点进程:

Per Node:
  1. Build (if build command specified)
  2. Spawn process with ADORA_NODE_CONFIG env var
  3. Node registers via TCP/shmem handshake
  4. Route inputs/outputs between nodes
  5. Collect metrics (CPU, memory, I/O)
  6. Handle restart policy on exit
  7. Forward logs to coordinator

Communication:
  - Shared memory for messages > 4KB (zero-copy)
  - TCP for control messages and small data
  - flume channels for internal event routing

指标采集:

#![allow(unused)]
fn main() {
struct NodeMetrics {
    pid: u32,
    cpu_usage: f32,      // 单核百分比
    memory_mb: f64,
    disk_read_mb_s: Option<f64>,
    disk_write_mb_s: Option<f64>,
    status: NodeStatus,  // Running | Restarting | Degraded | Failed
    restart_count: u32,
    pending_messages: u64,
}
}

消息类型

所有组件间消息定义在 libraries/message/ 中:

#![allow(unused)]
fn main() {
// 节点标识
struct NodeId(String);      // [a-zA-Z0-9_.-]
struct DataId(String);      // 相同验证
type DataflowId = uuid::Uuid;

// 数据元数据
struct Metadata {
    timestamp: uhlc::Timestamp,    // 混合逻辑时钟
    type_info: ArrowTypeInfo,      // Arrow schema
    parameters: MetadataParameters, // 自定义键值对
}

// 节点事件(守护进程 -> 节点)
enum NodeEvent {
    Stop,
    Reload { operator_id },
    Input { id, metadata, data },
    InputClosed { id },
    InputRecovered { id },
    NodeRestarted { id },
    AllInputsClosed,
}
}

时间戳

Adora 使用统一混合逻辑时钟(UHLC)实现分布式因果关系。每条消息携带一个 uhlc::Timestamp,无需同步时钟即可在跨机器间保持因果顺序。

零拷贝共享内存

对于大消息(> 4KB),守护进程使用共享内存区域:

  1. 发送节点向守护进程请求共享内存槽位
  2. 守护进程分配区域并返回 ID
  3. 发送方将 Arrow 数据直接写入共享内存
  4. 守护进程通知接收节点区域 ID
  5. 接收方直接从共享内存读取(零拷贝)
  6. 接收方完成后发送释放令牌

对于大数据载荷,这比 ROS2 实现了 10-17 倍的延迟降低。


编写节点

Rust 节点

use adora_node_api::{AdoraNode, Event, IntoArrow};
use adora_core::config::DataId;

fn main() -> eyre::Result<()> {
    let (mut node, mut events) = AdoraNode::init_from_env()?;

    let output = DataId::from("result".to_owned());

    while let Some(event) = events.recv() {
        match event {
            Event::Input { id, metadata, data } => {
                // 处理输入数据(Arrow 数组)
                let result: u64 = 42;
                node.send_output(
                    output.clone(),
                    metadata.parameters,
                    result.into_arrow(),
                )?;
            }
            Event::Stop(_) => break,
            Event::InputClosed { id } => {
                eprintln!("input {id} closed");
            }
            Event::InputRecovered { id } => {
                eprintln!("input {id} recovered");
            }
            _ => {}
        }
    }
    Ok(())
}

Cargo.toml:

[dependencies]
adora-node-api = { workspace = true }
eyre = "0.6"

Python 节点

import pyarrow as pa
from adora import Node

node = Node()

for event in node:
    if event["type"] == "INPUT":
        # event["value"] is a PyArrow array
        values = event["value"].to_pylist()
        result = pa.array([sum(values)])
        node.send_output("result", result)
    elif event["type"] == "STOP":
        break

C 节点

#include "node_api.h"

int main() {
    void *ctx = init_adora_context_from_env();
    // ... 使用 adora_next_event / adora_send_output 的事件循环
    free_adora_context(ctx);
    return 0;
}

节点日志

节点可以发出结构化日志:

Rust:

#![allow(unused)]
fn main() {
// 通过 tracing(推荐)
tracing::info!("processing frame {}", frame_id);

// 通过节点 API
node.log_info("processing complete");
node.log_with_fields("info", "reading", None, Some(&fields));
}

Python:

import logging
logging.info("processing frame %d", frame_id)

# Or via node API
node.log("info", "processing complete")

编写算子

算子在共享运行时内以进程内方式运行,避免了进程启动开销。

Rust 算子

#![allow(unused)]
fn main() {
use adora_operator_api::{register_operator, AdoraOperator, AdoraOutputSender, AdoraStatus, Event};

#[register_operator]
#[derive(Default)]
pub struct MyOperator {
    counter: u32,
}

impl AdoraOperator for MyOperator {
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut AdoraOutputSender,
    ) -> Result<AdoraStatus, String> {
        match event {
            Event::Input { id, data } => {
                self.counter += 1;
                output_sender.send(
                    "count".to_string(),
                    arrow::array::UInt32Array::from(vec![self.counter]),
                )?;
                Ok(AdoraStatus::Continue)
            }
            Event::Stop => Ok(AdoraStatus::Stop),
            _ => Ok(AdoraStatus::Continue),
        }
    }
}
}

Cargo.toml:

[lib]
crate-type = ["cdylib"]

[dependencies]
adora-operator-api = { workspace = true }
arrow = "53"

Python 算子

nodes:
  - id: my-node
    operator:
      python: my_operator.py
      inputs:
        data: source/output
      outputs:
        - result
# my_operator.py
class Operator:
    def __init__(self):
        self.counter = 0

    def on_event(self, event, send_output):
        if event["type"] == "INPUT":
            self.counter += 1
            send_output("result", pa.array([self.counter]))

分布式部署

设置

# Machine A (coordinator + daemon)
adora up

# Machine B (daemon only, pointing to coordinator on Machine A)
adora daemon --interface 0.0.0.0 --coordinator-addr 192.168.1.10 --machine-id B

# Machine C (same)
adora daemon --interface 0.0.0.0 --coordinator-addr 192.168.1.10 --machine-id C

带机器分配的数据流

nodes:
  - id: camera
    _unstable_deploy:
      machine: robot
    path: ./camera-driver
    outputs:
      - frames

  - id: inference
    _unstable_deploy:
      machine: gpu-server
    path: ./ml-model
    inputs:
      frames: camera/frames
    outputs:
      - predictions

  - id: actuator
    _unstable_deploy:
      machine: robot
    path: ./actuator-driver
    inputs:
      commands: inference/predictions

构建和启动

# From any machine with coordinator access
adora build dataflow.yml       # distributed build on target machines
adora start dataflow.yml --name my-robot --attach

监控

# Resource usage across all machines
adora top

# Logs from any node regardless of machine
adora logs my-robot inference --follow

# List all dataflows
adora list

协调器持久化

在生产环境中,使用 redb 存储后端以使协调器在重启后存续:

adora coordinator --store redb

状态持久化到 ~/.adora/coordinator.redb。重启时,过期的数据流会被标记为失败,协调器恢复正常运行。

For managed cluster deployments (cluster.yml, SSH-based lifecycle, label scheduling, systemd services, rolling upgrades), see the Distributed Deployment Guide.


故障排除

关于涵盖录制/回放工作流、主题检查、资源监控和端到端调试场景的综合调试指南,请参阅调试与可观测性指南

常见问题

“Could not connect to adora-coordinator”

  • 先运行 adora up,或检查 ADORA_COORDINATOR_ADDR/ADORA_COORDINATOR_PORT
  • 使用 adora status 验证

“publish_all_messages_to_zenoh not enabled”

  • 使用 --debug 标志:adora start dataflow.yml --debugadora run dataflow.yml --debug

  • 或在数据流 YAML 中添加:

    _unstable_debug:
      publish_all_messages_to_zenoh: true
    
  • topic echotopic hztopic info 需要此设置

adora top requires an interactive terminal”

  • 这些 TUI 命令需要真实终端(非管道输出)
  • 同样适用于 topic hz

节点未接收到输入

  • 检查输出名称是否匹配:source_node/output_id
  • 验证源节点在其 outputs: 数组中列出了该输出
  • 使用 adora topic list 检查可用主题

日志未显示

  • 检查 --log-level 设置(默认 stdout 显示所有内容)
  • 检查 YAML 中的 min_log_level(在源端过滤)
  • 分布式环境下:验证协调器/守护进程的连接性

使用 git 源构建失败

  • 验证 git: URL 是否可访问
  • 检查 branchtagrev 是否存在
  • 构建命令从 git 仓库根目录运行,而非数据流目录

调试工作流

# 1. Full environment diagnosis
adora doctor --dataflow dataflow.yml

# 2. Start with verbose logging and debug topics
adora run dataflow.yml --log-level trace --debug

# 3. Inspect a specific node
adora node info -d my-dataflow problem-node

# 4. Monitor specific node logs
adora logs my-dataflow problem-node --follow --level debug

# 5. Check resource usage
adora top

# 6. Inspect topic data
adora topic echo -d my-dataflow problem-node/output

# 7. Publish test data to a topic
adora topic pub -d my-dataflow problem-node/input '[1, 2, 3]'

# 8. Measure frequencies
adora topic hz -d my-dataflow --window 5

# 9. View/modify runtime parameters
adora param list -d my-dataflow problem-node
adora param set -d my-dataflow problem-node threshold 42

# 10. Restart a misbehaving node without stopping the dataflow
adora node restart -d my-dataflow problem-node

# 11. View coordinator traces (no external infra needed)
adora trace list
adora trace view <trace-id-prefix>

# 12. Visualize dataflow graph
adora graph dataflow.yml --open

日志文件位置

out/
  <dataflow-uuid>/
    log_<node-id>.jsonl          # current log
    log_<node-id>.1.jsonl        # rotated (previous)
    log_<node-id>.2.jsonl        # rotated (older)

直接读取:

adora logs --local --all-nodes
adora logs --local <node-name> --tail 50