Adora CLI 参考
Adora(AI-Dora,面向数据流的机器人架构)是一个 100% Rust 框架,用于构建实时机器人和 AI 应用。本文档从终端用户和开发者两个角度介绍 adora CLI。
目录
- 快速开始
- Installation
- 核心概念
- 数据流描述符
- 命令参考
- 环境变量
- 架构指南
- 编写节点
- 编写算子
- Distributed Deployments – see also Distributed Deployment Guide for cluster management, scheduling, and operations
- Troubleshooting
- 调试与可观测性 – 独立指南,涵盖录制/回放、主题检查、日志分析和资源监控
- API References: Rust | Python | C | C++
快速开始
# Create a new project
adora new my-robot --kind dataflow --lang rust
# Run locally (no coordinator/daemon needed)
adora run dataflow.yml
# Or use coordinator/daemon for production
adora up
adora start dataflow.yml --attach
# Ctrl-C to stop
adora down
安装
From crates.io (recommended)
cargo install adora-cli
从源码安装
cargo install --path binaries/cli --locked
验证
adora --version
adora status
核心概念
数据流
数据流是由类型化数据通道连接的节点有向图。节点产生输出,其他节点将其作为输入消费。框架负责数据路由、序列化(Apache Arrow)和生命周期管理。
执行模式
| 模式 | 命令 | 基础设施 | 用例 |
|---|---|---|---|
| 本地 | adora run | 无 | 开发、测试、单机 |
| 分布式 | adora up + adora start | 协调器 + 守护进程 | 生产、多机 |
组件角色
CLI --> Coordinator --> Daemon(s) --> Nodes / Operators
(control plane) (per machine) (user code)
- CLI:用户界面。发送命令、显示日志。
- 协调器:跨机器编排数据流生命周期。
- 守护进程:生成节点进程、管理 IPC、收集指标。
- 节点:产生和消费 Arrow 数据的独立进程。
- 算子:在共享运行时内运行的进程内代码(比节点更低延迟)。
数据格式
所有数据以 Apache Arrow 列式数组的形式在系统中流转。这使得同机节点间零拷贝共享内存传输和零序列化开销成为可能。
数据流描述符
数据流在 YAML 文件中定义。以下是完整的 schema:
最小示例
nodes:
- id: sender
path: sender.py
outputs:
- message
- id: receiver
path: receiver.py
inputs:
message: sender/message
完整模式
# Dataflow-level settings
health_check_interval: 5.0 # health check sweep interval in seconds (default: 5.0)
nodes:
- id: my-node # 唯一标识符(必填)
name: "My Node" # 人类可读名称(可选)
description: "..." # 描述(可选)
# --- Source (pick one) ---
path: ./target/debug/my-node # 本地可执行文件
# path: https://example.com/node.zip # 从 URL 下载
# git: https://github.com/org/repo.git # 从 git 构建
# branch: main # git 分支(与 tag/rev 互斥)
# tag: v1.0 # git 标签
# rev: abc123 # git 提交哈希
# --- Build ---
build: cargo build -p my-node # 构建用的 shell 命令(可选)
# --- Inputs ---
inputs:
# Short form: source_node/output_id
tick: adora/timer/millis/100
data: other-node/output
# Long form with options
sensor_data:
source: sensor/frames
queue_size: 10 # input buffer size (default: 10)
queue_policy: drop_oldest # or "backpressure" (buffers up to 10x queue_size)
input_timeout: 5.0 # circuit breaker timeout in seconds
# --- Outputs ---
outputs:
- processed
- status
# --- Environment ---
env:
MY_VAR: "value"
FROM_ENV:
__adora_env: HOST_VAR # 从宿主环境读取
args: "--verbose" # 命令行参数
# --- Fault tolerance ---
restart_policy: on-failure # never(默认)| on-failure | always
max_restarts: 5 # 0 = unlimited
restart_delay: 1.0 # initial backoff in seconds
max_restart_delay: 30.0 # backoff cap in seconds
restart_window: 300.0 # reset counter after N seconds
health_check_timeout: 30.0 # kill if no activity for N seconds
# --- Logging ---
min_log_level: info # 源级别过滤(守护进程端)
send_stdout_as: raw_output # 将原始标准输出路由为数据输出
send_logs_as: log_entries # 将结构化日志路由为数据输出
max_log_size: "50MB" # 达到此大小时轮转日志文件
max_rotated_files: 5 # number of rotated files to keep (1-100)
# --- Deployment ---
_unstable_deploy:
machine: A # 目标机器/守护进程 ID
# 调试设置
_unstable_debug:
publish_all_messages_to_zenoh: true # required for topic echo/hz/info
内置定时器节点
定时器是以固定间隔发出 tick 的虚拟节点:
inputs:
tick: adora/timer/millis/100 # 每 100ms
slow: adora/timer/millis/1000 # 每 1s
fast: adora/timer/hz/30 # 30 Hz(约 33ms)
算子节点
算子在共享运行时中进程内运行(无独立进程):
nodes:
# Single operator (shorthand)
- id: detector
operator:
python: detect.py
build: pip install -r requirements.txt
inputs:
image: camera/frames
outputs:
- bbox
# Multiple operators sharing a runtime
- id: runtime-node
operators:
- id: preprocessor
shared-library: ../../target/debug/libpreprocess
inputs:
raw: sensor/data
outputs:
- processed
- id: analyzer
shared-library: ../../target/debug/libanalyze
inputs:
data: runtime-node/preprocessor/processed
outputs:
- result
分布式部署
使用 _unstable_deploy 将节点分配到特定机器:
nodes:
- id: camera-driver
_unstable_deploy:
machine: robot-arm
path: ./target/debug/camera
outputs:
- frames
- id: ml-inference
_unstable_deploy:
machine: gpu-server
path: ./target/debug/inference
inputs:
frames: camera-driver/frames
outputs:
- predictions
当节点位于不同机器时,通信自动从共享内存切换到 Zenoh 发布/订阅。
命令参考
生命周期命令
adora run
在本地运行数据流,无需协调器或守护进程。适合开发和测试。
adora run <PATH> [OPTIONS]
| 参数/标志 | 默认 | 描述 |
|---|---|---|
<PATH> | 必需 | 数据流描述符 YAML 的路径 |
--stop-after <DURATION> | 在指定时间后自动停止(例如 30s、5m) | |
--uv | false | 使用 uv 进行 Python 节点管理 |
--debug | false | 启用调试主题(等同于 publish_all_messages_to_zenoh: true) |
--allow-shell-nodes | false | 启用基于 shell 的节点执行 |
--log-level <LEVEL> | stdout | 最低显示级别:error|warn|info|debug|trace|stdout |
--log-format <FORMAT> | pretty | 输出格式:pretty|json|compact |
--log-filter <FILTER> | 按节点级别覆盖:"node1=debug,node2=warn" |
Examples:
# Basic run
adora run dataflow.yml
# Stop after 10 seconds, only show warnings
adora run dataflow.yml --stop-after 10s --log-level warn
# Python dataflow with uv
adora run dataflow.yml --uv
# Debug one node, silence others
adora run dataflow.yml --log-level warn --log-filter "sensor=debug"
# JSON output for CI pipelines
adora run dataflow.yml --log-format json --stop-after 30s 2>test.json
adora up
在本地模式下启动协调器和守护进程。
adora up
以后台进程方式启动 adora coordinator 和 adora daemon。等待两者就绪后返回。幂等操作:如果已在运行,则不执行任何操作。
adora down(别名:adora destroy)
拆卸协调器和守护进程。首先停止所有运行中的数据流。
adora down [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
--coordinator-addr <IP> | 127.0.0.1 | 协调器地址 |
--coordinator-port <PORT> | 6013 | 协调器端口 |
adora build
运行数据流描述符中定义的构建命令。
adora build <PATH> [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
<PATH> | 必需 | 数据流描述符路径 |
--uv | false | 使用 uv 构建 Python |
--local | false | 强制本地构建(跳过协调器) |
--strict-types | false | Treat type warnings as errors (non-zero exit code) |
Type checking: After expanding modules, build runs the same type checks as validate. Warnings are printed by default; use --strict-types (or set strict_types: true in the YAML) to fail the build on type mismatches. User-defined types in a types/ directory next to the dataflow are loaded automatically.
**构建策略:**如果节点有 _unstable_deploy 部分且协调器可达,构建将分发到目标机器。否则在本地构建。
**Git 源:**带有 git: 字段的节点在构建前会被克隆/更新。构建命令从 git 仓库根目录运行。
adora start
在运行中的协调器上启动数据流。
adora start <PATH> [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
<PATH> | 必需 | 数据流描述符路径 |
--name <NAME>, -n | 为数据流指定名称 | |
--attach | auto | 附加到日志流并等待完成 |
--detach | auto | 启动后立即返回 |
--debug | false | 启用调试主题(等同于 publish_all_messages_to_zenoh: true) |
--hot-reload | false | 监视 Python 文件并在变更时重新加载 |
--uv | false | 使用 uv 管理 Python 节点 |
--coordinator-addr <IP> | 127.0.0.1 | 协调器地址 |
--coordinator-port <PORT> | 6013 | 协调器端口 |
如果既未指定 --attach 也未指定 --detach:在 TTY 中运行时附加,否则分离。
**附加模式:**流式输出日志,优雅处理 Ctrl-C(第一次 = 停止,第二次 = 强制终止)。
**热重载:**监视 Python 算子源文件。文件变更时,向协调器发送重载请求,协调器将其传播到守护进程。
adora stop
停止运行中的数据流。
adora stop [UUID_OR_NAME] [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
[UUID_OR_NAME] | interactive | 数据流 UUID 或名称 |
--name <NAME>, -n | 替代名称指定方式 | |
--grace-duration <DURATION> | 优雅关闭超时时间 | |
--force, -f | false | 立即终止 |
--coordinator-addr <IP> | 127.0.0.1 | 协调器地址 |
--coordinator-port <PORT> | 6013 | 协调器端口 |
如果未提供标识符且在 TTY 中运行,将显示交互式选择器。
**停止序列:**发送 Event::Stop -> 等待优雅关闭时间 -> SIGTERM -> 强制终止。
adora restart
重启运行中的数据流(停止 + 使用存储的描述符重新启动)。无需 YAML 路径 – 协调器保留了原始描述符。
adora restart [UUID] [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
[UUID] | 数据流 UUID | |
--name <NAME>, -n | 按名称而非 UUID 重启 | |
--grace-duration <DURATION> | 停止阶段的优雅关闭超时时间 | |
--force, -f | false | 重启前强制终止 |
--coordinator-addr <IP> | 127.0.0.1 | 协调器地址 |
--coordinator-port <PORT> | 6013 | 协调器端口 |
Examples:
# Restart by name
adora restart --name my-app
# Restart by UUID with forced stop
adora restart a1b2c3d4-... --force
adora record
将数据流消息录制到 .adorec 文件以供离线回放。完整工作流请参阅调试指南。
adora record <DATAFLOW_YAML> [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
<DATAFLOW_YAML> | 必需 | 数据流描述符路径 |
-o, --output <PATH> | recording_{timestamp}.adorec | 输出文件路径 |
--topics <TOPICS> | all | 逗号分隔的 node/output 主题用于录制 |
--proxy | false | 通过 WebSocket 流式传输而非在目标机器上录制 |
--output-yaml <PATH> | 写入修改后的 YAML 但不运行(干运行) |
默认模式向数据流注入录制节点。--proxy 模式需要正在运行的数据流和 publish_all_messages_to_zenoh: true。
adora replay
通过将源节点替换为回放节点来回放录制的 .adorec 文件。完整工作流请参阅调试指南。
adora replay <FILE> [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
<FILE> | 必需 | .adorec 录制文件路径 |
--speed <FLOAT> | 1.0 | 回放速度(0 = 最大速度) |
--loop | false | 循环播放录制 |
--replace <NODE_IDS> | 所有已录制的 | 逗号分隔的要替换的节点 |
--output-yaml <PATH> | 写入修改后的 YAML 但不运行(干运行) |
监控命令
adora list(别名:adora ps)
列出运行中的数据流及指标。
adora list [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
--format <FMT>, -f | table | 输出格式:table|json |
--status <STATUS> | 过滤:running|finished|failed | |
--name <PATTERN> | 按名称过滤(不区分大小写的子串匹配) | |
--sort-by <FIELD> | 排序:cpu|memory | |
--quiet, -q | false | 仅打印 UUID |
--coordinator-addr <IP> | 127.0.0.1 | 协调器地址 |
--coordinator-port <PORT> | 6013 | 协调器端口 |
输出列: UUID、Name、Status、Nodes、CPU、Memory
adora logs
显示和跟踪数据流与节点的日志。
adora logs [UUID_OR_NAME] [NODE] [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
[UUID_OR_NAME] | 数据流 UUID 或名称 | |
[NODE] | 节点名称(除非使用 --all-nodes,否则必填) | |
--all-nodes | false | 按时间戳合并所有节点的日志 |
--tail <N> | all | 显示最后 N 行 |
--follow, -f | false | 流式输出新日志条目 |
--local | false | 从本地 out/ 目录读取 |
--since <DURATION> | 显示指定时间之后的日志 | |
--until <DURATION> | 显示指定时间之前的日志 | |
--level <LEVEL> | stdout | 最低日志级别 |
--log-format <FORMAT> | pretty | 输出格式 |
--log-filter <FILTER> | 按节点级别覆盖 | |
--grep <PATTERN> | 不区分大小写的文本搜索 | |
--coordinator-addr <IP> | 127.0.0.1 | 协调器地址 |
--coordinator-port <PORT> | 6013 | 协调器端口 |
**过滤流水线:**读取/解析 -> 时间过滤 -> Grep -> Tail -> 显示
Examples:
# Follow all nodes live
adora logs my-dataflow --all-nodes --follow
# Last 50 errors from a specific node
adora logs my-dataflow sensor --level error --tail 50
# Search logs from last 5 minutes
adora logs my-dataflow --all-nodes --since 5m --grep "timeout"
# Read local files (no coordinator needed)
adora logs --local --all-nodes --tail 100
# Post-mortem analysis: errors in time window
adora logs --local sensor --since 1h --until 30m --level error
时间格式: 30(秒)、30s、5m、1h、2d
adora inspect top(别名:adora top)
节点资源使用的实时 TUI 监控器(类似 top)。
adora inspect top [OPTIONS]
adora top [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
--refresh-interval <SECONDS> | 2 | 更新间隔(最小值:1) |
--once | false | 打印单次 JSON 快照并退出(用于脚本/CI) |
--coordinator-addr <IP> | 127.0.0.1 | 协调器地址 |
--coordinator-port <PORT> | 6013 | 协调器端口 |
需要交互式终端(除非使用 --once)。
| Key | 动作 |
|---|---|
q / Esc | Quit |
Up / k | 选择上一个节点 |
Down / j | 选择下一个节点 |
n | 按节点名称排序 |
c | 按 CPU 排序 |
m | 按内存排序 |
r | 强制刷新 |
列: NODE、STATUS、DATAFLOW、PID、CPU%、MEMORY (MB)、RESTARTS、QUEUE、NET TX、NET RX、I/O READ (MB/s)、I/O WRITE (MB/s)
- STATUS:Running、Restarting、Degraded(输入中断)或 Failed
- RESTARTS:每个节点的当前重启次数
- QUEUE:节点输入队列中的待处理消息
- NET TX/RX:通过 Zenoh 跨守护进程发送/接收的累计网络字节数
CPU 值为单核百分比(多核情况下可超过 100%)。指标来自守护进程,因此适用于分布式部署。
脚本示例:
# JSON snapshot for CI/monitoring pipelines
adora top --once | jq '.[].cpu_usage'
adora topic list
列出运行中数据流的所有主题(输出)。
adora topic list [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
-d <DATAFLOW>, --dataflow | interactive | 数据流 UUID 或名称 |
--format <FMT> | table | 输出格式:table|json |
adora topic echo
订阅主题并实时显示消息。
adora topic echo [OPTIONS] [DATA...]
| 标志 | 默认 | 描述 |
|---|---|---|
-d <DATAFLOW>, --dataflow | 必需 | 数据流 UUID 或名称 |
[DATA...] | 所有输出 | 要回显的主题(例如 node1/output) |
--format <FMT> | table | 输出格式:table|json |
需要在描述符中设置 _unstable_debug.publish_all_messages_to_zenoh: true。
adora topic hz
使用 TUI 仪表盘测量主题发布频率。
adora topic hz [OPTIONS] [DATA...]
| 标志 | 默认 | 描述 |
|---|---|---|
-d <DATAFLOW>, --dataflow | 必需 | 数据流 UUID 或名称 |
[DATA...] | 所有输出 | 要测量的主题 |
--window <SECONDS> | 10 | 滑动窗口(最小值:1) |
**需要交互式终端。**显示:Avg (ms)、Avg (Hz)、Min (ms)、Max (ms)、Std (ms),以及所选主题的速率火花图和直方图。
adora topic info
显示单个主题的详细元数据。
adora topic info [OPTIONS] DATA
| 标志 | 默认 | 描述 |
|---|---|---|
-d <DATAFLOW>, --dataflow | 必需 | 数据流 UUID 或名称 |
DATA | 必需 | 单个主题(例如 camera/image) |
--duration <SECONDS> | 5 | 采集持续时间(最小值:1) |
订阅主题指定的持续时间并报告:类型(Arrow schema)、发布者、订阅者、消息计数、带宽。
adora node
管理和检查数据流节点。
adora node list
adora node list [OPTIONS]
列出运行中数据流的节点及其状态、CPU、内存和重启次数。
列: NODE、STATUS、PID、CPU%、MEMORY (MB)、RESTARTS、DATAFLOW
adora node info
Show detailed information about a specific node including status, inputs, outputs, and metrics.
adora node info <NODE> [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
<NODE> | 必需 | Node ID to inspect |
-d <DATAFLOW>, --dataflow | interactive | 数据流 UUID 或名称 |
-f <FORMAT>, --format | table | 输出格式:table|json |
adora node restart
Restart a single node within a running dataflow. The daemon stops the node process and respawns it.
adora node restart <NODE> [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
<NODE> | 必需 | Node ID to restart |
-d <DATAFLOW>, --dataflow | interactive | 数据流 UUID 或名称 |
--grace <DURATION> | Grace period before force-killing the node |
adora node stop
Stop a single node within a running dataflow without stopping the entire dataflow.
adora node stop <NODE> [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
<NODE> | 必需 | Node ID to stop |
-d <DATAFLOW>, --dataflow | interactive | 数据流 UUID 或名称 |
--grace <DURATION> | Grace period before force-killing the node |
adora topic pub
Publish JSON data to a topic in a running dataflow. Requires publish_all_messages_to_zenoh: true.
adora topic pub <TOPIC> [DATA] [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
<TOPIC> | 必需 | Topic to publish to (format: node_id/output_id) |
[DATA] | JSON data to publish (required unless --file) | |
--file <PATH> | Read data from a JSON file instead of command line | |
--count <N> | 1 | Number of messages to publish |
-d <DATAFLOW>, --dataflow | 必需 | 数据流 UUID 或名称 |
Examples:
# Publish a single value
adora topic pub -d my-app sensor/threshold '[42]'
# Publish from file, 10 times
adora topic pub -d my-app sensor/config --file config.json --count 10
adora param
Manage runtime parameters for nodes. Parameters are persisted in the coordinator store and optionally forwarded to running nodes.
adora param list
List all runtime parameters for a node.
adora param list <NODE> [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
<NODE> | 必需 | 节点 ID |
-d <DATAFLOW>, --dataflow | interactive | 数据流 UUID 或名称 |
--format <FMT> | table | 输出格式:table|json |
adora param get
Get a single runtime parameter value.
adora param get <NODE> <KEY> [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
<NODE> | 必需 | 节点 ID |
<KEY> | 必需 | Parameter key |
-d <DATAFLOW>, --dataflow | interactive | 数据流 UUID 或名称 |
adora param set
Set a runtime parameter. The value is JSON. The parameter is stored in the coordinator and forwarded to the node if it is running.
adora param set <NODE> <KEY> <VALUE> [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
<NODE> | 必需 | 节点 ID |
<KEY> | 必需 | Parameter key (max 256 bytes) |
<VALUE> | 必需 | Parameter value as JSON (max 64KB serialized) |
-d <DATAFLOW>, --dataflow | interactive | 数据流 UUID 或名称 |
Examples:
# Set a numeric parameter
adora param set -d my-app sensor threshold 42
# Set a string parameter
adora param set -d my-app camera resolution '"1080p"'
# Set a complex parameter
adora param set -d my-app detector config '{"confidence": 0.8, "nms": 0.5}'
adora param delete
Delete a runtime parameter.
adora param delete <NODE> <KEY> [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
<NODE> | 必需 | 节点 ID |
<KEY> | 必需 | Parameter key |
-d <DATAFLOW>, --dataflow | interactive | 数据流 UUID 或名称 |
adora doctor
Diagnose environment, coordinator/daemon connectivity, and optionally validate a dataflow YAML.
adora doctor [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
--dataflow <PATH> | Path to a dataflow YAML to validate |
Checks performed:
- Coordinator reachability
- Daemon connectivity
- Active dataflow status
- Dataflow YAML validation (if
--dataflowprovided)
Examples:
# Basic health check
adora doctor
# Check environment + validate a dataflow
adora doctor --dataflow dataflow.yml
adora trace list
列出协调器捕获的最近追踪。协调器在内存中捕获 adora_coordinator 和 adora_core crate 的 span(最多 4096 个 span)。无需外部追踪基础设施。
adora trace list [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
--coordinator-addr <IP> | 127.0.0.1 | 协调器地址 |
--coordinator-port <PORT> | 6013 | 协调器端口 |
输出列: TRACE ID(前 12 个字符)、ROOT SPAN、SPANS、STARTED、DURATION
Example:
adora trace list
TRACE ID ROOT SPAN SPANS STARTED DURATION
a1b2c3d4e5f6 spawn_dataflow 12 2026-03-01 10:30:05 1.234s
f8e7d6c5b4a3 build_dataflow 5 2026-03-01 10:29:58 0.500s
adora trace view
以缩进树形式查看特定追踪的 span。支持追踪 ID 的前缀匹配。
adora trace view <TRACE_ID> [OPTIONS]
| 参数/标志 | 默认 | 描述 |
|---|---|---|
<TRACE_ID> | 必需 | 完整追踪 ID 或唯一前缀 |
--coordinator-addr <IP> | 127.0.0.1 | 协调器地址 |
--coordinator-port <PORT> | 6013 | 协调器端口 |
Example:
adora trace view a1b2c3d4
spawn_dataflow [INFO 1.234s] {build_id="abc", session_id="def"}
build_dataflow [INFO 0.500s]
download_node [DEBUG 0.200s] {url="..."}
start_inner [INFO 0.734s]
spawn_node [INFO 0.100s] {node_id="camera"}
spawn_node [INFO 0.080s] {node_id="detector"}
追踪 ID 支持前缀匹配:如果前缀唯一标识一个追踪,则自动解析。如果有歧义,系统会提示您使用更长的前缀。
设置命令
adora status(别名:adora check)
检查系统健康状态和连接性。
adora status [OPTIONS]
报告协调器连接状态、守护进程状态和活跃数据流数量。
adora new
从模板生成新的项目或节点。
adora new <NAME> [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
<NAME> | 必需 | 项目或节点名称 |
--kind <KIND> | dataflow | dataflow|node |
--lang <LANG> | rust | rust|python|c|cxx |
adora expand
Expand module references in a dataflow and print the resulting flat YAML. Useful for debugging module composition.
adora expand <PATH> [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
<PATH> | 必需 | Dataflow descriptor (or module file with --module) |
--module | false | Validate a standalone module file instead of a full dataflow |
Examples:
# Expand a dataflow with modules
adora expand dataflow.yml
# Validate a module file
adora expand --module modules/navigation.module.yml
See the Modules Guide for full documentation on module composition.
adora graph
以图形方式可视化数据流。
adora graph <PATH> [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
<PATH> | 必需 | 数据流描述符路径 |
--mermaid | false | 输出 Mermaid 图表文本 |
--open | false | 在浏览器中打开 HTML |
Without --mermaid, generates an interactive HTML file using mermaid.js. When outputs have type annotations, edge labels include the type name (e.g. image [Image]).
# Generate HTML
adora graph dataflow.yml --open
# Generate Mermaid for GitHub markdown
adora graph dataflow.yml --mermaid
adora validate
Validate a dataflow YAML file and check type annotations.
adora validate <PATH> [OPTIONS]
| 标志 | 默认 | 描述 |
|---|---|---|
<PATH> | 必需 | 数据流描述符路径 |
--strict-types | false | Treat warnings as errors (non-zero exit code for CI) |
Checks:
- Key existence:
output_types/input_typeskeys exist in the correspondingoutputs/inputslists - URN resolution: All type URNs resolve in the standard or user-defined type library
- Edge compatibility: Connected edges have compatible types (exact match, widening, or user-defined rules)
- Parameterized types: Parameter mismatches (e.g.
AudioFrame[sample_type=f32]vsAudioFrame[sample_type=i16]) - Timer auto-typing: Timer inputs are automatically typed as
std/core/v1/UInt64 - Type inference: When only upstream annotates a type, it is inferred on the downstream input
- Metadata patterns:
output_metadatakeys andpatternshorthands are validated - Schema compatibility: Struct types are checked at the field level (missing/wrong fields)
User-defined types in a types/ directory next to the dataflow are loaded automatically.
# Validate with warnings
adora validate dataflow.yml
# Strict mode for CI (exit 1 on warnings)
adora validate --strict-types dataflow.yml
See the Type Annotations Guide for the full type library and usage details.
实用命令
adora completion
生成 shell 补全脚本。
adora completion [SHELL]
省略时自动检测 shell。支持:bash、zsh、fish、elvish、powershell。
# Bash
eval "$(adora completion bash)"
echo 'eval "$(adora completion bash)"' >> ~/.bashrc
# Zsh
eval "$(adora completion zsh)"
echo 'eval "$(adora completion zsh)"' >> ~/.zshrc
# Fish
adora completion fish > ~/.config/fish/completions/adora.fish
adora system
系统管理命令。
adora system status [OPTIONS]
目前提供 status 作为子命令(等同于 adora status)。
自管理命令
adora self update
检查并安装 CLI 更新。
adora self update [--check-only]
从 GitHub releases(dora-rs/adora)下载。
adora self uninstall
从系统中移除 CLI。
adora self uninstall [--force]
不使用 --force 时,会提示确认(需要 TTY)。依次尝试 uv pip uninstall、pip uninstall,然后二进制自删除。
环境变量
所有环境变量作为备选值。CLI 标志始终优先。
| 变量 | 默认 | 命令 | 描述 |
|---|---|---|---|
ADORA_COORDINATOR_ADDR | 127.0.0.1 | 所有协调器命令 | 协调器 IP 地址 |
ADORA_COORDINATOR_PORT | 6013 | 所有协调器命令 | 协调器 WebSocket 端口 |
ADORA_LOG_LEVEL | stdout | run、logs | 默认最低日志级别 |
ADORA_LOG_FORMAT | pretty | run、logs | 默认输出格式 |
ADORA_LOG_FILTER | run、logs | 默认的按节点级别覆盖 | |
ADORA_ALLOW_SHELL_NODES | run | 启用 shell 节点执行 | |
ADORA_RUNTIME_TYPE_CHECK | run, start | Runtime type checking: warn (log mismatches) or error (fail on mismatch). See Type Annotations |
# Set defaults for a development session
export ADORA_COORDINATOR_ADDR=192.168.1.10
export ADORA_LOG_LEVEL=info
export ADORA_LOG_FORMAT=compact
架构指南
本节面向希望了解框架内部机制、扩展框架或调试问题的开发者。
通信栈
┌─────────────────────────────────────┐
│ CLI (adora) │
│ WebSocket (JSON request/reply) │
└─────────────┬───────────────────────┘
│
┌─────────────▼───────────────────────┐
│ Coordinator │
│ WebSocket control + daemon mgmt │
│ State: InMemoryStore | RedbStore │
└──┬──────────────────────────────┬───┘
│ │
┌────────────▼──────────┐ ┌─────────────▼──────────┐
│ Daemon A │ │ Daemon B │
│ (machine: robot) │ │ (machine: gpu-server) │
│ │ │ │
│ ┌─────┐ ┌─────┐ │ │ ┌──────┐ ┌───────┐ │
│ │Node1│ │Node2│ │ │ │Node3 │ │Node4 │ │
│ └──┬──┘ └──┬──┘ │ │ └──┬───┘ └───┬───┘ │
│ │shmem │shmem │ │ │shmem │shmem │
│ └────┬────┘ │ │ └─────┬─────┘ │
└──────────┼────────────┘ └───────────┼────────────┘
│ │
└──────── Zenoh pub/sub ────────┘
(cross-machine)
协议层
| 层级 | Transport | Format | Use |
|---|---|---|---|
| CLI <-> 协调器 | WebSocket | JSON(ControlRequest/Reply) | 命令、日志流 |
| 协调器 <-> 守护进程 | WebSocket | JSON(DaemonCoordinatorEvent) | 节点生命周期、指标 |
| 守护进程 <-> 节点(小消息) | TCP / Unix socket | 自定义二进制 | 控制消息、小数据 |
| 守护进程 <-> 节点(大消息) | 共享内存 | 零拷贝 Arrow | 数据消息 > 4KB |
| 守护进程 <-> 守护进程 | Zenoh 发布/订阅 | Arrow + 元数据 | 跨机器数据路由 |
协调器内部机制
协调器是一个事件驱动的异步服务器:
Event Sources:
- CLI WebSocket connections (ControlRequest)
- Daemon WebSocket connections (DaemonEvent)
- Heartbeat timer (3s interval)
- External events (for embedding)
Event Loop:
merge_all(cli_events, daemon_events, heartbeat, external)
-> handle_event()
-> update state
-> persist to store (if redb)
-> send replies
关键类型:
#![allow(unused)]
fn main() {
// State
RunningDataflow { uuid, name, descriptor, daemons, node_metrics, ... }
RunningBuild { build_id, errors, log_subscribers, pending_results, ... }
DaemonConnection { sender, pending_replies, last_heartbeat }
// Store trait
trait CoordinatorStore: Send + Sync {
fn put_dataflow(&self, record: &DataflowRecord) -> Result<()>;
fn get_dataflow(&self, uuid: &Uuid) -> Result<Option<DataflowRecord>>;
fn list_dataflows(&self) -> Result<Vec<DataflowRecord>>;
// ... daemon and build methods
}
}
存储后端:
memory(默认):内存存储,重启后丢失。redb:持久化到磁盘(~/.adora/coordinator.redb)。崩溃后可恢复。需要redb-backendfeature。
adora coordinator --store redb
adora coordinator --store redb:/custom/path.redb
守护进程内部机制
守护进程管理单台机器上的节点进程:
Per Node:
1. Build (if build command specified)
2. Spawn process with ADORA_NODE_CONFIG env var
3. Node registers via TCP/shmem handshake
4. Route inputs/outputs between nodes
5. Collect metrics (CPU, memory, I/O)
6. Handle restart policy on exit
7. Forward logs to coordinator
Communication:
- Shared memory for messages > 4KB (zero-copy)
- TCP for control messages and small data
- flume channels for internal event routing
指标采集:
#![allow(unused)]
fn main() {
struct NodeMetrics {
pid: u32,
cpu_usage: f32, // 单核百分比
memory_mb: f64,
disk_read_mb_s: Option<f64>,
disk_write_mb_s: Option<f64>,
status: NodeStatus, // Running | Restarting | Degraded | Failed
restart_count: u32,
pending_messages: u64,
}
}
消息类型
所有组件间消息定义在 libraries/message/ 中:
#![allow(unused)]
fn main() {
// 节点标识
struct NodeId(String); // [a-zA-Z0-9_.-]
struct DataId(String); // 相同验证
type DataflowId = uuid::Uuid;
// 数据元数据
struct Metadata {
timestamp: uhlc::Timestamp, // 混合逻辑时钟
type_info: ArrowTypeInfo, // Arrow schema
parameters: MetadataParameters, // 自定义键值对
}
// 节点事件(守护进程 -> 节点)
enum NodeEvent {
Stop,
Reload { operator_id },
Input { id, metadata, data },
InputClosed { id },
InputRecovered { id },
NodeRestarted { id },
AllInputsClosed,
}
}
时间戳
Adora 使用统一混合逻辑时钟(UHLC)实现分布式因果关系。每条消息携带一个 uhlc::Timestamp,无需同步时钟即可在跨机器间保持因果顺序。
零拷贝共享内存
对于大消息(> 4KB),守护进程使用共享内存区域:
- 发送节点向守护进程请求共享内存槽位
- 守护进程分配区域并返回 ID
- 发送方将 Arrow 数据直接写入共享内存
- 守护进程通知接收节点区域 ID
- 接收方直接从共享内存读取(零拷贝)
- 接收方完成后发送释放令牌
对于大数据载荷,这比 ROS2 实现了 10-17 倍的延迟降低。
编写节点
Rust 节点
use adora_node_api::{AdoraNode, Event, IntoArrow};
use adora_core::config::DataId;
fn main() -> eyre::Result<()> {
let (mut node, mut events) = AdoraNode::init_from_env()?;
let output = DataId::from("result".to_owned());
while let Some(event) = events.recv() {
match event {
Event::Input { id, metadata, data } => {
// 处理输入数据(Arrow 数组)
let result: u64 = 42;
node.send_output(
output.clone(),
metadata.parameters,
result.into_arrow(),
)?;
}
Event::Stop(_) => break,
Event::InputClosed { id } => {
eprintln!("input {id} closed");
}
Event::InputRecovered { id } => {
eprintln!("input {id} recovered");
}
_ => {}
}
}
Ok(())
}
Cargo.toml:
[dependencies]
adora-node-api = { workspace = true }
eyre = "0.6"
Python 节点
import pyarrow as pa
from adora import Node
node = Node()
for event in node:
if event["type"] == "INPUT":
# event["value"] is a PyArrow array
values = event["value"].to_pylist()
result = pa.array([sum(values)])
node.send_output("result", result)
elif event["type"] == "STOP":
break
C 节点
#include "node_api.h"
int main() {
void *ctx = init_adora_context_from_env();
// ... 使用 adora_next_event / adora_send_output 的事件循环
free_adora_context(ctx);
return 0;
}
节点日志
节点可以发出结构化日志:
Rust:
#![allow(unused)]
fn main() {
// 通过 tracing(推荐)
tracing::info!("processing frame {}", frame_id);
// 通过节点 API
node.log_info("processing complete");
node.log_with_fields("info", "reading", None, Some(&fields));
}
Python:
import logging
logging.info("processing frame %d", frame_id)
# Or via node API
node.log("info", "processing complete")
编写算子
算子在共享运行时内以进程内方式运行,避免了进程启动开销。
Rust 算子
#![allow(unused)]
fn main() {
use adora_operator_api::{register_operator, AdoraOperator, AdoraOutputSender, AdoraStatus, Event};
#[register_operator]
#[derive(Default)]
pub struct MyOperator {
counter: u32,
}
impl AdoraOperator for MyOperator {
fn on_event(
&mut self,
event: &Event,
output_sender: &mut AdoraOutputSender,
) -> Result<AdoraStatus, String> {
match event {
Event::Input { id, data } => {
self.counter += 1;
output_sender.send(
"count".to_string(),
arrow::array::UInt32Array::from(vec![self.counter]),
)?;
Ok(AdoraStatus::Continue)
}
Event::Stop => Ok(AdoraStatus::Stop),
_ => Ok(AdoraStatus::Continue),
}
}
}
}
Cargo.toml:
[lib]
crate-type = ["cdylib"]
[dependencies]
adora-operator-api = { workspace = true }
arrow = "53"
Python 算子
nodes:
- id: my-node
operator:
python: my_operator.py
inputs:
data: source/output
outputs:
- result
# my_operator.py
class Operator:
def __init__(self):
self.counter = 0
def on_event(self, event, send_output):
if event["type"] == "INPUT":
self.counter += 1
send_output("result", pa.array([self.counter]))
分布式部署
设置
# Machine A (coordinator + daemon)
adora up
# Machine B (daemon only, pointing to coordinator on Machine A)
adora daemon --interface 0.0.0.0 --coordinator-addr 192.168.1.10 --machine-id B
# Machine C (same)
adora daemon --interface 0.0.0.0 --coordinator-addr 192.168.1.10 --machine-id C
带机器分配的数据流
nodes:
- id: camera
_unstable_deploy:
machine: robot
path: ./camera-driver
outputs:
- frames
- id: inference
_unstable_deploy:
machine: gpu-server
path: ./ml-model
inputs:
frames: camera/frames
outputs:
- predictions
- id: actuator
_unstable_deploy:
machine: robot
path: ./actuator-driver
inputs:
commands: inference/predictions
构建和启动
# From any machine with coordinator access
adora build dataflow.yml # distributed build on target machines
adora start dataflow.yml --name my-robot --attach
监控
# Resource usage across all machines
adora top
# Logs from any node regardless of machine
adora logs my-robot inference --follow
# List all dataflows
adora list
协调器持久化
在生产环境中,使用 redb 存储后端以使协调器在重启后存续:
adora coordinator --store redb
状态持久化到 ~/.adora/coordinator.redb。重启时,过期的数据流会被标记为失败,协调器恢复正常运行。
For managed cluster deployments (cluster.yml, SSH-based lifecycle, label scheduling, systemd services, rolling upgrades), see the Distributed Deployment Guide.
故障排除
关于涵盖录制/回放工作流、主题检查、资源监控和端到端调试场景的综合调试指南,请参阅调试与可观测性指南。
常见问题
“Could not connect to adora-coordinator”
- 先运行
adora up,或检查ADORA_COORDINATOR_ADDR/ADORA_COORDINATOR_PORT - 使用
adora status验证
“publish_all_messages_to_zenoh not enabled”
-
使用
--debug标志:adora start dataflow.yml --debug或adora run dataflow.yml --debug -
或在数据流 YAML 中添加:
_unstable_debug: publish_all_messages_to_zenoh: true -
topic echo、topic hz、topic info需要此设置
“adora top requires an interactive terminal”
- 这些 TUI 命令需要真实终端(非管道输出)
- 同样适用于
topic hz
节点未接收到输入
- 检查输出名称是否匹配:
source_node/output_id - 验证源节点在其
outputs:数组中列出了该输出 - 使用
adora topic list检查可用主题
日志未显示
- 检查
--log-level设置(默认stdout显示所有内容) - 检查 YAML 中的
min_log_level(在源端过滤) - 分布式环境下:验证协调器/守护进程的连接性
使用 git 源构建失败
- 验证
git:URL 是否可访问 - 检查
branch、tag或rev是否存在 - 构建命令从 git 仓库根目录运行,而非数据流目录
调试工作流
# 1. Full environment diagnosis
adora doctor --dataflow dataflow.yml
# 2. Start with verbose logging and debug topics
adora run dataflow.yml --log-level trace --debug
# 3. Inspect a specific node
adora node info -d my-dataflow problem-node
# 4. Monitor specific node logs
adora logs my-dataflow problem-node --follow --level debug
# 5. Check resource usage
adora top
# 6. Inspect topic data
adora topic echo -d my-dataflow problem-node/output
# 7. Publish test data to a topic
adora topic pub -d my-dataflow problem-node/input '[1, 2, 3]'
# 8. Measure frequencies
adora topic hz -d my-dataflow --window 5
# 9. View/modify runtime parameters
adora param list -d my-dataflow problem-node
adora param set -d my-dataflow problem-node threshold 42
# 10. Restart a misbehaving node without stopping the dataflow
adora node restart -d my-dataflow problem-node
# 11. View coordinator traces (no external infra needed)
adora trace list
adora trace view <trace-id-prefix>
# 12. Visualize dataflow graph
adora graph dataflow.yml --open
日志文件位置
out/
<dataflow-uuid>/
log_<node-id>.jsonl # current log
log_<node-id>.1.jsonl # rotated (previous)
log_<node-id>.2.jsonl # rotated (older)
直接读取:
adora logs --local --all-nodes
adora logs --local <node-name> --tail 50