Dora CLI Reference
Dora (AI-Dora, Dataflow-Oriented Robotic Architecture) is a 100%-Rust framework for real-time robotics and AI applications. This document covers the dora CLI from both the end-user and developer perspectives.
Table of Contents
- Quick Start
- Installation
- Core Concepts
- Dataflow Descriptor
- Command Reference
- Environment Variables
- Architecture Guide
- Writing Nodes
- Writing Operators
- Distributed Deployments – see also Distributed Deployment Guide for cluster management, scheduling, and operations
- Troubleshooting
- Debugging and Observability – standalone guide covering record/replay, topic inspection, log analysis, and resource monitoring
- API References: Rust | Python | C | C++
Quick Start
# Create a new project
dora new my-robot --kind dataflow --lang rust
# Run locally (no coordinator/daemon needed)
dora run dataflow.yml
# Or use coordinator/daemon for production
dora up
dora start dataflow.yml --attach
# Ctrl-C to stop
dora down
Installation
From crates.io (recommended)
cargo install dora-cli
From source
cargo install --path binaries/cli --locked
Verify
dora --version
dora status
Core Concepts
Dataflows
A dataflow is a directed graph of nodes connected by typed data channels. Nodes produce outputs that other nodes consume as inputs. The framework handles data routing, serialization (Apache Arrow), and lifecycle management.
Execution Modes
| Mode | Command | Infrastructure | Use case |
|---|---|---|---|
| Local | dora run | None | Development, testing, single machine |
| Distributed | dora up + dora start | Coordinator + Daemon(s) | Production, multi-machine |
Component Roles
CLI --> Coordinator --> Daemon(s) --> Nodes / Operators
        (control plane)  (per machine)  (user code)
- CLI: the user interface. Sends commands, displays logs.
- Coordinator: orchestrates dataflow lifecycles across machines.
- Daemon: spawns node processes, manages IPC, collects metrics.
- Nodes: independent processes that produce and consume Arrow data.
- Operators: in-process code running inside a shared runtime (lower latency than nodes).
Data Format
All data flows through the system as Apache Arrow columnar arrays. This enables zero-copy shared-memory transfers between nodes on the same machine, with no serialization overhead.
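The zero-copy idea itself is not Dora-specific; a minimal illustration with a plain Python `memoryview` (standard library only, not the Dora API):

```python
import array

# A contiguous columnar buffer of f64 values, analogous to how an
# Arrow array stores one column of data.
samples = array.array("d", [1.0, 2.0, 3.0, 4.0])

# A memoryview shares the same underlying memory: no bytes are copied.
view = memoryview(samples)
assert view[2] == 3.0

# Writes to the buffer are visible through the view, because both
# reference the same memory region.
samples[2] = 30.0
assert view[2] == 30.0
```

Dora applies the same principle across process boundaries by placing the Arrow buffers in shared memory.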
Dataflow Descriptor
Dataflows are defined in YAML files. Here is the complete schema:
Minimal Example
nodes:
  - id: sender
    path: sender.py
    outputs:
      - message
  - id: receiver
    path: receiver.py
    inputs:
      message: sender/message
Full Schema
# Dataflow-level settings
health_check_interval: 5.0 # health check sweep interval in seconds (default: 5.0)

nodes:
  - id: my-node          # unique identifier (required)
    name: "My Node"      # human-readable name (optional)
    description: "..."   # description (optional)

    # --- Source (pick one) ---
    path: ./target/debug/my-node           # local executable
    # path: https://example.com/node.zip   # download from URL
    # git: https://github.com/org/repo.git # build from git
    # branch: main   # git branch (mutually exclusive with tag/rev)
    # tag: v1.0      # git tag
    # rev: abc123    # git commit hash

    # --- Build ---
    build: cargo build -p my-node # shell command to build (optional)

    # --- Inputs ---
    inputs:
      # Short form: source_node/output_id
      tick: dora/timer/millis/100
      data: other-node/output
      # Long form with options
      sensor_data:
        source: sensor/frames
        queue_size: 10            # input buffer size (default: 10)
        queue_policy: drop_oldest # or "backpressure" (buffers up to 10x queue_size)
        input_timeout: 5.0        # circuit breaker timeout in seconds

    # --- Outputs ---
    outputs:
      - processed
      - status

    # --- Environment ---
    env:
      MY_VAR: "value"
      FROM_ENV:
        __dora_env: HOST_VAR # read from host environment
    args: "--verbose" # command-line arguments

    # --- Fault tolerance ---
    restart_policy: on-failure # never (default) | on-failure | always
    max_restarts: 5            # 0 = unlimited
    restart_delay: 1.0         # initial backoff in seconds
    max_restart_delay: 30.0    # backoff cap in seconds
    restart_window: 300.0      # reset counter after N seconds
    health_check_timeout: 30.0 # kill if no activity for N seconds

    # --- Logging ---
    min_log_level: info        # source-level filter (daemon-side)
    send_stdout_as: raw_output # route raw stdout as data output
    send_logs_as: log_entries  # route structured logs as data output
    max_log_size: "50MB"       # rotate log files at this size
    max_rotated_files: 5       # number of rotated files to keep (1-100)

    # --- Deployment ---
    _unstable_deploy:
      machine: A # target machine/daemon ID

    # Debug settings
    _unstable_debug:
      publish_all_messages_to_zenoh: true # required for topic echo/hz/info
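The fault-tolerance fields describe an exponential backoff schedule: delays start at `restart_delay`, grow, and are capped by `max_restart_delay`. A hedged sketch of such a schedule (illustrative arithmetic only, not Dora's actual restart code):

```python
def restart_delay(attempt: int, initial: float = 1.0, cap: float = 30.0) -> float:
    """Delay before restart attempt N: initial * 2^(N-1), capped."""
    return min(initial * 2 ** (attempt - 1), cap)

# With restart_delay: 1.0 and max_restart_delay: 30.0:
delays = [restart_delay(n) for n in range(1, 8)]
assert delays == [1, 2, 4, 8, 16, 30, 30]  # doubles until the cap
```

Under this reading, `restart_window` would reset the attempt counter after a long-enough stretch of healthy operation.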
Built-in Timer Nodes
Timers are virtual nodes that emit ticks at a fixed interval:
inputs:
  tick: dora/timer/millis/100 # every 100ms
  slow: dora/timer/millis/1000 # every 1s
  fast: dora/timer/hz/30 # 30 Hz (~33ms)
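The two spellings are interchangeable: `hz/N` is simply a period of `1000/N` milliseconds.

```python
def hz_to_millis(hz: float) -> float:
    """Convert a rate in Hz to a timer period in milliseconds."""
    return 1000.0 / hz

assert hz_to_millis(1) == 1000.0           # hz/1 is the same as millis/1000
assert round(hz_to_millis(30), 1) == 33.3  # hz/30 ticks roughly every 33 ms
```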
Operator Nodes
Operators run in-process inside a shared runtime (no separate process):
nodes:
  # Single operator (shorthand)
  - id: detector
    operator:
      python: detect.py
      build: pip install -r requirements.txt
      inputs:
        image: camera/frames
      outputs:
        - bbox

  # Multiple operators sharing a runtime
  - id: runtime-node
    operators:
      - id: preprocessor
        shared-library: ../../target/debug/libpreprocess
        inputs:
          raw: sensor/data
        outputs:
          - processed
      - id: analyzer
        shared-library: ../../target/debug/libanalyze
        inputs:
          data: runtime-node/preprocessor/processed
        outputs:
          - result
Distributed Deployment
Use _unstable_deploy to assign nodes to specific machines:
nodes:
  - id: camera-driver
    _unstable_deploy:
      machine: robot-arm
    path: ./target/debug/camera
    outputs:
      - frames
  - id: ml-inference
    _unstable_deploy:
      machine: gpu-server
    path: ./target/debug/inference
    inputs:
      frames: camera-driver/frames
    outputs:
      - predictions
When nodes are placed on different machines, communication automatically switches from shared memory to Zenoh publish/subscribe.
Command Reference
Lifecycle Commands
dora run
Runs a dataflow locally without a coordinator or daemon. Ideal for development and testing.
dora run <PATH> [OPTIONS]
| Argument/Flag | Default | Description |
|---|---|---|
<PATH> | required | Path to dataflow descriptor YAML |
--stop-after <DURATION> | | Auto-stop after duration (e.g., 30s, 5m)
--uv | false | Use uv for Python node management |
--debug | false | Enable debug topics (equivalent to publish_all_messages_to_zenoh: true) |
--allow-shell-nodes | false | Enable shell-based node execution |
--log-level <LEVEL> | stdout | Min display level: error|warn|info|debug|trace|stdout |
--log-format <FORMAT> | pretty | Output format: pretty|json|compact |
--log-filter <FILTER> | | Per-node level overrides: "node1=debug,node2=warn"
Examples:
# Basic run
dora run dataflow.yml
# Stop after 10 seconds, only show warnings
dora run dataflow.yml --stop-after 10s --log-level warn
# Python dataflow with uv
dora run dataflow.yml --uv
# Debug one node, silence others
dora run dataflow.yml --log-level warn --log-filter "sensor=debug"
# JSON output for CI pipelines
dora run dataflow.yml --log-format json --stop-after 30s 2>test.json
dora up
Starts the coordinator and daemon in local mode.
dora up
Spawns dora coordinator and dora daemon as background processes. Waits for both to be ready before returning. Idempotent: if already running, does nothing.
dora down (alias: dora destroy)
Tears down the coordinator and daemon, stopping all running dataflows first.
dora down [OPTIONS]
| Flag | Default | Description |
|---|---|---|
--coordinator-addr <IP> | 127.0.0.1 | Coordinator address |
--coordinator-port <PORT> | 6013 | Coordinator port |
dora build
Runs the build commands defined in the dataflow descriptor.
dora build <PATH> [OPTIONS]
| Flag | Default | Description |
|---|---|---|
<PATH> | required | Dataflow descriptor path |
--uv | false | Use uv for Python builds |
--local | false | Force local build (skip coordinator) |
--strict-types | false | Treat type warnings as errors (non-zero exit code) |
Type checking: After expanding modules, build runs the same type checks as validate. Warnings are printed by default; use --strict-types (or set strict_types: true in the YAML) to fail the build on type mismatches. User-defined types in a types/ directory next to the dataflow are loaded automatically.
Build strategy: If nodes have _unstable_deploy sections and a coordinator is reachable, builds are distributed to target machines. Otherwise, builds run locally.
Git sources: Nodes with a git: field are cloned/updated before building. The build command runs from the git repository root.
dora start
Starts a dataflow on a running coordinator.
dora start <PATH> [OPTIONS]
| Flag | Default | Description |
|---|---|---|
<PATH> | required | Dataflow descriptor path |
--name <NAME>, -n | | Assign a name to the dataflow
--attach | auto | Attach to log stream and wait for completion |
--detach | auto | Return immediately after spawn |
--debug | false | Enable debug topics (equivalent to publish_all_messages_to_zenoh: true) |
--hot-reload | false | Watch Python files and reload on change |
--uv | false | Use uv for Python nodes |
--coordinator-addr <IP> | 127.0.0.1 | Coordinator address |
--coordinator-port <PORT> | 6013 | Coordinator port |
If neither --attach nor --detach is specified: attaches if running in a TTY, detaches otherwise.
Attach mode: Streams logs, handles Ctrl-C gracefully (first = stop, second = force kill).
Hot reload: Watches Python operator source files. On change, sends a reload request to the coordinator which propagates to the daemon.
dora stop
Stops a running dataflow.
dora stop [UUID_OR_NAME] [OPTIONS]
| Flag | Default | Description |
|---|---|---|
[UUID_OR_NAME] | interactive | Dataflow UUID or name |
--name <NAME>, -n | | Alternative name specification
--grace-duration <DURATION> | | Graceful shutdown timeout
--force, -f | false | Immediate termination |
--coordinator-addr <IP> | 127.0.0.1 | Coordinator address |
--coordinator-port <PORT> | 6013 | Coordinator port |
If no identifier is given and running in a TTY, presents an interactive picker.
Stop sequence: Send Event::Stop -> wait grace duration -> SIGTERM -> hard kill.
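The same escalation pattern can be sketched with the standard library (a generic illustration of staged shutdown, not Dora's internal code):

```python
import subprocess
import sys

def stop_with_grace(proc: subprocess.Popen, grace: float) -> str:
    """Escalate: wait for a clean exit, then SIGTERM, then hard kill."""
    try:
        proc.wait(timeout=grace)   # stage 1: let the process exit on its own
        return "graceful"
    except subprocess.TimeoutExpired:
        proc.terminate()           # stage 2: SIGTERM
    try:
        proc.wait(timeout=grace)
        return "terminated"
    except subprocess.TimeoutExpired:
        proc.kill()                # stage 3: hard kill, last resort
        proc.wait()
        return "killed"

# A child that never exits on its own is stopped at the SIGTERM stage.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
print(stop_with_grace(child, grace=0.5))  # prints "terminated"
```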
dora restart
Restarts a running dataflow (stop + re-start using the stored descriptor). No YAML path is needed – the coordinator keeps the original descriptor.
dora restart [UUID] [OPTIONS]
| Flag | Default | Description |
|---|---|---|
[UUID] | | Dataflow UUID
--name <NAME>, -n | | Restart by name instead of UUID
--grace-duration <DURATION> | | Graceful shutdown timeout for the stop phase
--force, -f | false | Force kill before restart |
--coordinator-addr <IP> | 127.0.0.1 | Coordinator address |
--coordinator-port <PORT> | 6013 | Coordinator port |
Examples:
# Restart by name
dora restart --name my-app
# Restart by UUID with forced stop
dora restart a1b2c3d4-... --force
dora record
Record dataflow messages to an .adorec file for offline replay. See Debugging Guide for full workflows.
dora record <DATAFLOW_YAML> [OPTIONS]
| Flag | Default | Description |
|---|---|---|
<DATAFLOW_YAML> | required | Path to dataflow descriptor |
-o, --output <PATH> | recording_{timestamp}.adorec | Output file path |
--topics <TOPICS> | all | Comma-separated node/output topics to record |
--proxy | false | Stream via WebSocket instead of recording on target |
--output-yaml <PATH> | | Write modified YAML without running (dry run)
Default mode injects a record node into the dataflow. --proxy mode requires a running dataflow and publish_all_messages_to_zenoh: true.
dora replay
Replay a recorded .adorec file by replacing source nodes with replay nodes. See Debugging Guide for full workflows.
dora replay <FILE> [OPTIONS]
| Flag | Default | Description |
|---|---|---|
<FILE> | required | Path to .adorec recording |
--speed <FLOAT> | 1.0 | Playback speed (0 = max speed) |
--loop | false | Loop the recording |
--replace <NODE_IDS> | all recorded | Comma-separated nodes to replace |
--output-yaml <PATH> | | Write modified YAML without running (dry run)
Monitoring Commands
dora list (alias: dora ps)
Lists running dataflows with metrics.
dora list [OPTIONS]
| Flag | Default | Description |
|---|---|---|
--format <FMT>, -f | table | Output format: table|json |
--status <STATUS> | | Filter: running|finished|failed
--name <PATTERN> | | Filter by name (case-insensitive substring)
--sort-by <FIELD> | | Sort by: cpu|memory
--quiet, -q | false | Print only UUIDs |
--coordinator-addr <IP> | 127.0.0.1 | Coordinator address |
--coordinator-port <PORT> | 6013 | Coordinator port |
Output columns: UUID, Name, Status, Nodes, CPU, Memory
dora logs
Shows and follows logs for dataflows and nodes.
dora logs [UUID_OR_NAME] [NODE] [OPTIONS]
| Flag | Default | Description |
|---|---|---|
[UUID_OR_NAME] | | Dataflow UUID or name
[NODE] | | Node name (required unless --all-nodes)
--all-nodes | false | Merge logs from all nodes by timestamp |
--tail <N> | all | Show last N lines |
--follow, -f | false | Stream new log entries |
--local | false | Read from local out/ directory |
--since <DURATION> | | Show logs newer than duration ago
--until <DURATION> | | Show logs older than duration ago
--level <LEVEL> | stdout | Min log level |
--log-format <FORMAT> | pretty | Output format |
--log-filter <FILTER> | | Per-node level overrides
--grep <PATTERN> | | Case-insensitive text search
--coordinator-addr <IP> | 127.0.0.1 | Coordinator address |
--coordinator-port <PORT> | 6013 | Coordinator port |
Filter pipeline: Read/Parse -> Time filters -> Grep -> Tail -> Display
Examples:
# Follow all nodes live
dora logs my-dataflow --all-nodes --follow
# Last 50 errors from a specific node
dora logs my-dataflow sensor --level error --tail 50
# Search logs from last 5 minutes
dora logs my-dataflow --all-nodes --since 5m --grep "timeout"
# Read local files (no coordinator needed)
dora logs --local --all-nodes --tail 100
# Post-mortem analysis: errors in time window
dora logs --local sensor --since 1h --until 30m --level error
Duration formats: 30 (seconds), 30s, 5m, 1h, 2d
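This grammar is simple enough to parse by hand; an illustrative parser (not the CLI's actual implementation):

```python
def parse_duration(s: str) -> float:
    """Parse '30', '30s', '5m', '1h', '2d' into seconds."""
    units = {"s": 1, "m": 60, "h": 3600, "d": 86400}
    if s and s[-1] in units:
        return float(s[:-1]) * units[s[-1]]
    return float(s)  # a bare number is taken as seconds

assert parse_duration("30") == 30
assert parse_duration("5m") == 300
assert parse_duration("1h") == 3600
assert parse_duration("2d") == 172800
```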
dora inspect top (alias: dora top)
Real-time TUI monitor for node resource usage (like top).
dora inspect top [OPTIONS]
dora top [OPTIONS]
| Flag | Default | Description |
|---|---|---|
--refresh-interval <SECONDS> | 2 | Update interval (min: 1) |
--once | false | Print a single JSON snapshot and exit (for scripting/CI) |
--coordinator-addr <IP> | 127.0.0.1 | Coordinator address |
--coordinator-port <PORT> | 6013 | Coordinator port |
Requires an interactive terminal (unless --once is used).
| Key | Action |
|---|---|
q / Esc | Quit |
Up / k | Select previous node |
Down / j | Select next node |
n | Sort by node name |
c | Sort by CPU |
m | Sort by memory |
r | Force refresh |
Columns: NODE, STATUS, DATAFLOW, PID, CPU%, MEMORY (MB), RESTARTS, QUEUE, NET TX, NET RX, I/O READ (MB/s), I/O WRITE (MB/s)
- STATUS: Running, Restarting, Degraded (broken inputs), or Failed
- RESTARTS: Current restart count per node
- QUEUE: Pending messages in the node’s input queue
- NET TX/RX: Cumulative cross-daemon network bytes sent/received via Zenoh
CPU values are per-core (can exceed 100% with multiple cores). Metrics come from daemons, so this works for distributed deployments.
Scripting example:
# JSON snapshot for CI/monitoring pipelines
dora top --once | jq '.[].cpu_usage'
dora topic list
List all topics (outputs) in a running dataflow.
dora topic list [OPTIONS]
| Flag | Default | Description |
|---|---|---|
-d <DATAFLOW>, --dataflow | interactive | Dataflow UUID or name |
--format <FMT> | table | Output format: table|json |
dora topic echo
Subscribe to topics and display messages in real-time.
dora topic echo [OPTIONS] [DATA...]
| Flag | Default | Description |
|---|---|---|
-d <DATAFLOW>, --dataflow | required | Dataflow UUID or name |
[DATA...] | all outputs | Topics to echo (e.g., node1/output) |
--format <FMT> | table | Output format: table|json |
Requires _unstable_debug.publish_all_messages_to_zenoh: true in the descriptor.
dora topic hz
Measure topic publish frequency with a TUI dashboard.
dora topic hz [OPTIONS] [DATA...]
| Flag | Default | Description |
|---|---|---|
-d <DATAFLOW>, --dataflow | required | Dataflow UUID or name |
[DATA...] | all outputs | Topics to measure |
--window <SECONDS> | 10 | Sliding window (min: 1) |
Requires an interactive terminal. Displays: Avg (ms), Avg (Hz), Min (ms), Max (ms), Std (ms), plus a rate sparkline and histogram for the selected topic.
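The displayed statistics follow from the inter-arrival deltas within the window; a hedged sketch of the arithmetic (illustrative only, not the command's source):

```python
import statistics

def rate_stats(timestamps: list[float]) -> dict[str, float]:
    """Avg/Min/Max/Std of inter-arrival times (ms) plus the average rate (Hz)."""
    deltas = [b - a for a, b in zip(timestamps, timestamps[1:])]
    avg = statistics.mean(deltas)
    return {
        "avg_ms": avg * 1000,
        "avg_hz": 1.0 / avg,
        "min_ms": min(deltas) * 1000,
        "max_ms": max(deltas) * 1000,
        "std_ms": statistics.stdev(deltas) * 1000 if len(deltas) > 1 else 0.0,
    }

# Messages arriving every 100 ms correspond to a 10 Hz topic.
stats = rate_stats([0.0, 0.1, 0.2, 0.3, 0.4])
assert round(stats["avg_hz"]) == 10
```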
dora topic info
Show detailed metadata of a single topic.
dora topic info [OPTIONS] DATA
| Flag | Default | Description |
|---|---|---|
-d <DATAFLOW>, --dataflow | required | Dataflow UUID or name |
DATA | required | Single topic (e.g., camera/image) |
--duration <SECONDS> | 5 | Collection duration (min: 1) |
Subscribes to the topic for the specified duration and reports: type (Arrow schema), publisher, subscribers, message count, bandwidth.
dora node
Manage and inspect dataflow nodes.
dora node list
dora node list [OPTIONS]
Lists nodes in a running dataflow with their status, CPU, memory, and restart count.
Columns: NODE, STATUS, PID, CPU%, MEMORY (MB), RESTARTS, DATAFLOW
dora node info
Show detailed information about a specific node including status, inputs, outputs, and metrics.
dora node info <NODE> [OPTIONS]
| Flag | Default | Description |
|---|---|---|
<NODE> | required | Node ID to inspect |
-d <DATAFLOW>, --dataflow | interactive | Dataflow UUID or name |
-f <FORMAT>, --format | table | Output format: table|json |
dora node restart
Restart a single node within a running dataflow. The daemon stops the node process and respawns it.
dora node restart <NODE> [OPTIONS]
| Flag | Default | Description |
|---|---|---|
<NODE> | required | Node ID to restart |
-d <DATAFLOW>, --dataflow | interactive | Dataflow UUID or name |
--grace <DURATION> | | Grace period before force-killing the node
dora node stop
Stop a single node within a running dataflow without stopping the entire dataflow.
dora node stop <NODE> [OPTIONS]
| Flag | Default | Description |
|---|---|---|
<NODE> | required | Node ID to stop |
-d <DATAFLOW>, --dataflow | interactive | Dataflow UUID or name |
--grace <DURATION> | | Grace period before force-killing the node
dora topic pub
Publish JSON data to a topic in a running dataflow. Requires publish_all_messages_to_zenoh: true.
dora topic pub <TOPIC> [DATA] [OPTIONS]
| Flag | Default | Description |
|---|---|---|
<TOPIC> | required | Topic to publish to (format: node_id/output_id) |
[DATA] | | JSON data to publish (required unless --file)
--file <PATH> | | Read data from a JSON file instead of the command line
--count <N> | 1 | Number of messages to publish |
-d <DATAFLOW>, --dataflow | required | Dataflow UUID or name |
Examples:
# Publish a single value
dora topic pub -d my-app sensor/threshold '[42]'
# Publish from file, 10 times
dora topic pub -d my-app sensor/config --file config.json --count 10
dora param
Manage runtime parameters for nodes. Parameters are persisted in the coordinator store and optionally forwarded to running nodes.
dora param list
List all runtime parameters for a node.
dora param list <NODE> [OPTIONS]
| Flag | Default | Description |
|---|---|---|
<NODE> | required | Node ID |
-d <DATAFLOW>, --dataflow | interactive | Dataflow UUID or name |
--format <FMT> | table | Output format: table|json |
dora param get
Get a single runtime parameter value.
dora param get <NODE> <KEY> [OPTIONS]
| Flag | Default | Description |
|---|---|---|
<NODE> | required | Node ID |
<KEY> | required | Parameter key |
-d <DATAFLOW>, --dataflow | interactive | Dataflow UUID or name |
dora param set
Set a runtime parameter. The value is JSON. The parameter is stored in the coordinator and forwarded to the node if it is running.
dora param set <NODE> <KEY> <VALUE> [OPTIONS]
| Flag | Default | Description |
|---|---|---|
<NODE> | required | Node ID |
<KEY> | required | Parameter key (max 256 bytes) |
<VALUE> | required | Parameter value as JSON (max 64KB serialized) |
-d <DATAFLOW>, --dataflow | interactive | Dataflow UUID or name |
Examples:
# Set a numeric parameter
dora param set -d my-app sensor threshold 42
# Set a string parameter
dora param set -d my-app camera resolution '"1080p"'
# Set a complex parameter
dora param set -d my-app detector config '{"confidence": 0.8, "nms": 0.5}'
dora param delete
Delete a runtime parameter.
dora param delete <NODE> <KEY> [OPTIONS]
| Flag | Default | Description |
|---|---|---|
<NODE> | required | Node ID |
<KEY> | required | Parameter key |
-d <DATAFLOW>, --dataflow | interactive | Dataflow UUID or name |
dora doctor
Diagnose environment, coordinator/daemon connectivity, and optionally validate a dataflow YAML.
dora doctor [OPTIONS]
| Flag | Default | Description |
|---|---|---|
--dataflow <PATH> | | Path to a dataflow YAML to validate
Checks performed:
- Coordinator reachability
- Daemon connectivity
- Active dataflow status
- Dataflow YAML validation (if `--dataflow` provided)
Examples:
# Basic health check
dora doctor
# Check environment + validate a dataflow
dora doctor --dataflow dataflow.yml
dora trace list
List recent traces captured by the coordinator. The coordinator captures spans from dora_coordinator and dora_core crates in-memory (up to 4096 spans). No external tracing infrastructure required.
dora trace list [OPTIONS]
| Flag | Default | Description |
|---|---|---|
--coordinator-addr <IP> | 127.0.0.1 | Coordinator address |
--coordinator-port <PORT> | 6013 | Coordinator port |
Output columns: TRACE ID (first 12 chars), ROOT SPAN, SPANS, STARTED, DURATION
Example:
dora trace list
TRACE ID ROOT SPAN SPANS STARTED DURATION
a1b2c3d4e5f6 spawn_dataflow 12 2026-03-01 10:30:05 1.234s
f8e7d6c5b4a3 build_dataflow 5 2026-03-01 10:29:58 0.500s
dora trace view
View spans for a specific trace as an indented tree. Supports prefix matching on trace IDs.
dora trace view <TRACE_ID> [OPTIONS]
| Argument/Flag | Default | Description |
|---|---|---|
<TRACE_ID> | required | Full trace ID or unique prefix |
--coordinator-addr <IP> | 127.0.0.1 | Coordinator address |
--coordinator-port <PORT> | 6013 | Coordinator port |
Example:
dora trace view a1b2c3d4

spawn_dataflow [INFO 1.234s] {build_id="abc", session_id="def"}
  build_dataflow [INFO 0.500s]
    download_node [DEBUG 0.200s] {url="..."}
  start_inner [INFO 0.734s]
    spawn_node [INFO 0.100s] {node_id="camera"}
    spawn_node [INFO 0.080s] {node_id="detector"}
Trace IDs are prefix-matched: if the prefix uniquely identifies a trace, it resolves automatically. If ambiguous, you’ll be prompted to use a longer prefix.
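The resolution rule is easy to state precisely; an illustrative version:

```python
def resolve_trace(prefix: str, trace_ids: list[str]) -> str:
    """Resolve a trace ID prefix: a unique match wins, otherwise it's an error."""
    matches = [t for t in trace_ids if t.startswith(prefix)]
    if len(matches) == 1:
        return matches[0]
    if not matches:
        raise ValueError(f"no trace matches {prefix!r}")
    raise ValueError(f"ambiguous prefix {prefix!r}; use a longer one")

ids = ["a1b2c3d4e5f6", "f8e7d6c5b4a3"]
assert resolve_trace("a1b2", ids) == "a1b2c3d4e5f6"
```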
Setup Commands
dora status (alias: dora check)
Checks system health and connectivity.
dora status [OPTIONS]
Reports coordinator connectivity, daemon status, and active dataflow count.
dora new
Generates a new project or node from a template.
dora new <NAME> [OPTIONS]
| Flag | Default | Description |
|---|---|---|
<NAME> | required | Project or node name |
--kind <KIND> | dataflow | dataflow|node |
--lang <LANG> | rust | rust|python|c|cxx |
dora expand
Expand module references in a dataflow and print the resulting flat YAML. Useful for debugging module composition.
dora expand <PATH> [OPTIONS]
| Flag | Default | Description |
|---|---|---|
<PATH> | required | Dataflow descriptor (or module file with --module) |
--module | false | Validate a standalone module file instead of a full dataflow |
Examples:
# Expand a dataflow with modules
dora expand dataflow.yml
# Validate a module file
dora expand --module modules/navigation.module.yml
See the Modules Guide for full documentation on module composition.
dora graph
Visualizes a dataflow as a graph.
dora graph <PATH> [OPTIONS]
| Flag | Default | Description |
|---|---|---|
<PATH> | required | Dataflow descriptor path |
--mermaid | false | Output Mermaid diagram text |
--open | false | Open HTML in browser |
Without --mermaid, generates an interactive HTML file using mermaid.js. When outputs have type annotations, edge labels include the type name (e.g. image [Image]).
# Generate HTML
dora graph dataflow.yml --open
# Generate Mermaid for GitHub markdown
dora graph dataflow.yml --mermaid
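To get a feel for what the Mermaid text contains, here is a hedged sketch that derives a similar flowchart from the edges of a descriptor-like structure (a plain dict here; the real command parses the full YAML schema):

```python
def to_mermaid(nodes: list[dict]) -> str:
    """Emit one Mermaid edge per input connection: source -- input --> target."""
    lines = ["flowchart TB"]
    for node in nodes:
        for input_id, source in node.get("inputs", {}).items():
            src_node = source.rsplit("/", 1)[0]  # strip the output_id suffix
            lines.append(f"  {src_node} -- {input_id} --> {node['id']}")
    return "\n".join(lines)

nodes = [
    {"id": "sender", "outputs": ["message"]},
    {"id": "receiver", "inputs": {"message": "sender/message"}},
]
print(to_mermaid(nodes))
# flowchart TB
#   sender -- message --> receiver
```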
dora validate
Validate a dataflow YAML file and check type annotations.
dora validate <PATH> [OPTIONS]
| Flag | Default | Description |
|---|---|---|
<PATH> | required | Dataflow descriptor path |
--strict-types | false | Treat warnings as errors (non-zero exit code for CI) |
Checks:
- Key existence: `output_types`/`input_types` keys exist in the corresponding `outputs`/`inputs` lists
- URN resolution: All type URNs resolve in the standard or user-defined type library
- Edge compatibility: Connected edges have compatible types (exact match, widening, or user-defined rules)
- Parameterized types: Parameter mismatches (e.g. `AudioFrame[sample_type=f32]` vs `AudioFrame[sample_type=i16]`)
- Timer auto-typing: Timer inputs are automatically typed as `std/core/v1/UInt64`
- Type inference: When only the upstream side annotates a type, it is inferred on the downstream input
- Metadata patterns: `output_metadata` keys and `pattern` shorthands are validated
- Schema compatibility: Struct types are checked at the field level (missing/wrong fields)
User-defined types in a types/ directory next to the dataflow are loaded automatically.
# Validate with warnings
dora validate dataflow.yml
# Strict mode for CI (exit 1 on warnings)
dora validate --strict-types dataflow.yml
See the Type Annotations Guide for the full type library and usage details.
Utility Commands
dora completion
Generates shell completion scripts.
dora completion [SHELL]
Shell is auto-detected if omitted. Supported: bash, zsh, fish, elvish, powershell.
# Bash
eval "$(dora completion bash)"
echo 'eval "$(dora completion bash)"' >> ~/.bashrc
# Zsh
eval "$(dora completion zsh)"
echo 'eval "$(dora completion zsh)"' >> ~/.zshrc
# Fish
dora completion fish > ~/.config/fish/completions/dora.fish
dora system
System management commands.
dora system status [OPTIONS]
Currently provides status as a subcommand (equivalent to dora status).
Self-Management Commands
dora self update
Checks for and installs CLI updates.
dora self update [--check-only]
Downloads from GitHub releases (dora-rs/dora).
dora self uninstall
Removes the CLI from the system.
dora self uninstall [--force]
Without --force, prompts for confirmation (requires a TTY). Tries uv pip uninstall first, then pip uninstall, then binary self-delete.
Environment Variables
All environment variables serve as fallbacks. CLI flags always take precedence.
| Variable | Default | Commands | Description |
|---|---|---|---|
DORA_COORDINATOR_ADDR | 127.0.0.1 | All coordinator commands | Coordinator IP address |
DORA_COORDINATOR_PORT | 6013 | All coordinator commands | Coordinator WebSocket port |
DORA_LOG_LEVEL | stdout | run, logs | Default minimum log level |
DORA_LOG_FORMAT | pretty | run, logs | Default output format |
DORA_LOG_FILTER | | run, logs | Default per-node level overrides
DORA_ALLOW_SHELL_NODES | | run | Enable shell node execution
DORA_RUNTIME_TYPE_CHECK | | run, start | Runtime type checking: warn (log mismatches) or error (fail on mismatch). See Type Annotations
# Set defaults for a development session
export DORA_COORDINATOR_ADDR=192.168.1.10
export DORA_LOG_LEVEL=info
export DORA_LOG_FORMAT=compact
Architecture Guide
This section is for developers who want to understand the framework internals, extend it, or debug issues.
Communication Stack
┌─────────────────────────────────────┐
│ CLI (dora) │
│ WebSocket (JSON request/reply) │
└─────────────┬───────────────────────┘
│
┌─────────────▼───────────────────────┐
│ Coordinator │
│ WebSocket control + daemon mgmt │
│ State: InMemoryStore | RedbStore │
└──┬──────────────────────────────┬───┘
│ │
┌────────────▼──────────┐ ┌─────────────▼──────────┐
│ Daemon A │ │ Daemon B │
│ (machine: robot) │ │ (machine: gpu-server) │
│ │ │ │
│ ┌─────┐ ┌─────┐ │ │ ┌──────┐ ┌───────┐ │
│ │Node1│ │Node2│ │ │ │Node3 │ │Node4 │ │
│ └──┬──┘ └──┬──┘ │ │ └──┬───┘ └───┬───┘ │
│ │shmem │shmem │ │ │shmem │shmem │
│ └────┬────┘ │ │ └─────┬─────┘ │
└──────────┼────────────┘ └───────────┼────────────┘
│ │
└──────── Zenoh pub/sub ────────┘
(cross-machine)
Protocol Layers
| Layer | Transport | Format | Use |
|---|---|---|---|
| CLI <-> Coordinator | WebSocket | JSON (ControlRequest/Reply) | Commands, log streaming |
| Coordinator <-> Daemon | WebSocket | JSON (DaemonCoordinatorEvent) | Node lifecycle, metrics |
| Daemon <-> Node (small) | TCP / Unix socket | Custom binary | Control messages, small data |
| Daemon <-> Node (large) | Shared memory | Zero-copy Arrow | Data messages > 4KB |
| Daemon <-> Daemon | Zenoh pub/sub | Arrow + metadata | Cross-machine data routing |
Coordinator Internals
The coordinator is an event-driven async server:
Event Sources:
- CLI WebSocket connections (ControlRequest)
- Daemon WebSocket connections (DaemonEvent)
- Heartbeat timer (3s interval)
- External events (for embedding)
Event Loop:
merge_all(cli_events, daemon_events, heartbeat, external)
-> handle_event()
-> update state
-> persist to store (if redb)
-> send replies
Key types:
// State
RunningDataflow { uuid, name, descriptor, daemons, node_metrics, ... }
RunningBuild { build_id, errors, log_subscribers, pending_results, ... }
DaemonConnection { sender, pending_replies, last_heartbeat }

// Store trait
trait CoordinatorStore: Send + Sync {
    fn put_dataflow(&self, record: &DataflowRecord) -> Result<()>;
    fn get_dataflow(&self, uuid: &Uuid) -> Result<Option<DataflowRecord>>;
    fn list_dataflows(&self) -> Result<Vec<DataflowRecord>>;
    // ... daemon and build methods
}
Store backends:
- `memory` (default): In-memory, lost on restart.
- `redb`: Persistent to disk (`~/.dora/coordinator.redb`). Survives crashes. Requires the `redb-backend` feature.
dora coordinator --store redb
dora coordinator --store redb:/custom/path.redb
Daemon Internals
The daemon manages node processes on a single machine:
Per Node:
1. Build (if build command specified)
2. Spawn process with DORA_NODE_CONFIG env var
3. Node registers via TCP/shmem handshake
4. Route inputs/outputs between nodes
5. Collect metrics (CPU, memory, I/O)
6. Handle restart policy on exit
7. Forward logs to coordinator
Communication:
- Shared memory for messages > 4KB (zero-copy)
- TCP for control messages and small data
- flume channels for internal event routing
Metrics collection:
struct NodeMetrics {
    pid: u32,
    cpu_usage: f32, // per-core percentage
    memory_mb: f64,
    disk_read_mb_s: Option<f64>,
    disk_write_mb_s: Option<f64>,
    status: NodeStatus, // Running | Restarting | Degraded | Failed
    restart_count: u32,
    pending_messages: u64,
}
Message Types
All inter-component messages are defined in libraries/message/:
// Node identification
struct NodeId(String); // [a-zA-Z0-9_.-]
struct DataId(String); // same validation
type DataflowId = uuid::Uuid;

// Data metadata
struct Metadata {
    timestamp: uhlc::Timestamp, // hybrid logical clock
    type_info: ArrowTypeInfo, // Arrow schema
    parameters: MetadataParameters, // custom key-value pairs
}

// Node events (daemon -> node)
enum NodeEvent {
    Stop,
    Reload { operator_id },
    Input { id, metadata, data },
    InputClosed { id },
    InputRecovered { id },
    NodeRestarted { id },
    AllInputsClosed,
}
Timestamps
Dora uses a Unified Hybrid Logical Clock (UHLC) for distributed causality. Every message carries a uhlc::Timestamp that preserves causal ordering across machines without synchronized clocks.
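A hedged sketch of the hybrid-logical-clock idea (the uhlc crate implements a more complete algorithm, including merging timestamps received from remote peers):

```python
import time

class HybridLogicalClock:
    """Wall time plus a logical counter: strictly increasing timestamps."""

    def __init__(self) -> None:
        self.last_physical = 0
        self.counter = 0

    def now(self) -> tuple[int, int]:
        physical = time.time_ns()
        if physical > self.last_physical:
            self.last_physical, self.counter = physical, 0
        else:
            # Wall clock stalled or stepped backwards: bump the logical part.
            self.counter += 1
        return (self.last_physical, self.counter)

clk = HybridLogicalClock()
a, b = clk.now(), clk.now()
assert a < b  # causal order holds even within a single clock tick
```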
Zero-Copy Shared Memory
For large messages (> 4KB), the daemon uses shared memory regions:
- Sender node requests a shared memory slot from daemon
- Daemon allocates a region and returns the ID
- Sender writes Arrow data directly into shared memory
- Daemon notifies receiver node of the region ID
- Receiver reads directly from shared memory (zero-copy)
- Receiver sends a drop token when done
This achieves 10-17x lower latency than ROS2 for large payloads.
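The handoff can be mimicked with `multiprocessing.shared_memory` from the standard library (a generic illustration of the pattern, not Dora's implementation):

```python
from multiprocessing import shared_memory

# "Sender": allocate a region and write the payload directly into it.
payload = b"arrow-encoded-bytes"
region = shared_memory.SharedMemory(create=True, size=len(payload))
region.buf[: len(payload)] = payload

# "Receiver": attach to the same region by name and read without copying.
peer = shared_memory.SharedMemory(name=region.name)
assert bytes(peer.buf[: len(payload)]) == payload

# Cleanup plays the role of the receiver's drop token.
peer.close()
region.close()
region.unlink()
```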
Writing Nodes
Rust Nodes
use dora_node_api::{DoraNode, Event, IntoArrow};
use dora_core::config::DataId;

fn main() -> eyre::Result<()> {
    let (mut node, mut events) = DoraNode::init_from_env()?;
    let output = DataId::from("result".to_owned());

    while let Some(event) = events.recv() {
        match event {
            Event::Input { id, metadata, data } => {
                // Process input data (Arrow array)
                let result: u64 = 42;
                node.send_output(
                    output.clone(),
                    metadata.parameters,
                    result.into_arrow(),
                )?;
            }
            Event::Stop(_) => break,
            Event::InputClosed { id } => {
                eprintln!("input {id} closed");
            }
            Event::InputRecovered { id } => {
                eprintln!("input {id} recovered");
            }
            _ => {}
        }
    }
    Ok(())
}
Cargo.toml:
[dependencies]
dora-node-api = { workspace = true }
eyre = "0.6"
Python Nodes
import pyarrow as pa
from dora import Node

node = Node()

for event in node:
    if event["type"] == "INPUT":
        # event["value"] is a PyArrow array
        values = event["value"].to_pylist()
        result = pa.array([sum(values)])
        node.send_output("result", result)
    elif event["type"] == "STOP":
        break
C Nodes
#include "node_api.h"

int main() {
    void *ctx = init_dora_context_from_env();
    // ... event loop using dora_next_event / dora_send_output
    free_dora_context(ctx);
    return 0;
}
Node Logging
Nodes can emit structured logs:
Rust:
// Via tracing (recommended)
tracing::info!("processing frame {}", frame_id);

// Via node API
node.log_info("processing complete");
node.log_with_fields("info", "reading", None, Some(&fields));
Python:
import logging
logging.info("processing frame %d", frame_id)
# Or via node API
node.log("info", "processing complete")
Writing Operators
Operators run in-process inside a shared runtime, avoiding process spawn overhead.
Rust Operators
use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};

#[register_operator]
#[derive(Default)]
pub struct MyOperator {
    counter: u32,
}

impl DoraOperator for MyOperator {
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut DoraOutputSender,
    ) -> Result<DoraStatus, String> {
        match event {
            Event::Input { id, data } => {
                self.counter += 1;
                output_sender.send(
                    "count".to_string(),
                    arrow::array::UInt32Array::from(vec![self.counter]),
                )?;
                Ok(DoraStatus::Continue)
            }
            Event::Stop => Ok(DoraStatus::Stop),
            _ => Ok(DoraStatus::Continue),
        }
    }
}
Cargo.toml:
[lib]
crate-type = ["cdylib"]
[dependencies]
dora-operator-api = { workspace = true }
arrow = "53"
Python Operators
nodes:
  - id: my-node
    operator:
      python: my_operator.py
      inputs:
        data: source/output
      outputs:
        - result

# my_operator.py
import pyarrow as pa

class Operator:
    def __init__(self):
        self.counter = 0

    def on_event(self, event, send_output):
        if event["type"] == "INPUT":
            self.counter += 1
            send_output("result", pa.array([self.counter]))
Distributed Deployment
Setup
# Machine A (coordinator + daemon)
dora up
# Machine B (daemon only, pointing to coordinator on Machine A)
dora daemon --interface 0.0.0.0 --coordinator-addr 192.168.1.10 --machine-id B
# Machine C (same)
dora daemon --interface 0.0.0.0 --coordinator-addr 192.168.1.10 --machine-id C
Dataflow with Machine Assignments
nodes:
  - id: camera
    _unstable_deploy:
      machine: robot
    path: ./camera-driver
    outputs:
      - frames
  - id: inference
    _unstable_deploy:
      machine: gpu-server
    path: ./ml-model
    inputs:
      frames: camera/frames
    outputs:
      - predictions
  - id: actuator
    _unstable_deploy:
      machine: robot
    path: ./actuator-driver
    inputs:
      commands: inference/predictions
Build and Start
# From any machine with coordinator access
dora build dataflow.yml # distributed build on target machines
dora start dataflow.yml --name my-robot --attach
Monitoring
# Resource usage across all machines
dora top
# Logs from any node regardless of machine
dora logs my-robot inference --follow
# List all dataflows
dora list
Coordinator Persistence
For production, use the redb store backend so the coordinator survives restarts:
dora coordinator --store redb
State is persisted to ~/.dora/coordinator.redb. On restart, stale dataflows are marked as failed and the coordinator resumes normal operation.
For managed cluster deployments (cluster.yml, SSH-based lifecycle, label scheduling, systemd services, rolling upgrades), see the Distributed Deployment Guide.
Troubleshooting
For a comprehensive debugging guide covering record/replay workflows, topic inspection, resource monitoring, and end-to-end debugging scenarios, see Debugging and Observability Guide.
Common Issues
“Could not connect to dora-coordinator”
- Run `dora up` first, or check `DORA_COORDINATOR_ADDR`/`DORA_COORDINATOR_PORT`
- Verify with `dora status`
“publish_all_messages_to_zenoh not enabled”
- Use the `--debug` flag: `dora start dataflow.yml --debug` or `dora run dataflow.yml --debug`
- Or add to your dataflow YAML: `_unstable_debug: publish_all_messages_to_zenoh: true`
- Required for `topic echo`, `topic hz`, `topic info`
“dora top requires an interactive terminal”
- These TUI commands need a real terminal (not piped output)
- Same applies to `topic hz`
Node not receiving inputs
- Check that output names match: `source_node/output_id`
- Verify the source node lists the output in its `outputs:` array
- Check `dora topic list` for available topics
Logs not appearing
- Check the `--log-level` setting (the default `stdout` shows everything)
- Check `min_log_level` in the YAML (filters at the source)
- For distributed setups: verify coordinator/daemon connectivity
Build fails with git source
- Verify the `git:` URL is accessible
- Check that the `branch`, `tag`, or `rev` exists
- The build command runs from the git repo root, not the dataflow directory
Debugging Workflow
# 1. Full environment diagnosis
dora doctor --dataflow dataflow.yml
# 2. Start with verbose logging and debug topics
dora run dataflow.yml --log-level trace --debug
# 3. Inspect a specific node
dora node info -d my-dataflow problem-node
# 4. Monitor specific node logs
dora logs my-dataflow problem-node --follow --level debug
# 5. Check resource usage
dora top
# 6. Inspect topic data
dora topic echo -d my-dataflow problem-node/output
# 7. Publish test data to a topic
dora topic pub -d my-dataflow problem-node/input '[1, 2, 3]'
# 8. Measure frequencies
dora topic hz -d my-dataflow --window 5
# 9. View/modify runtime parameters
dora param list -d my-dataflow problem-node
dora param set -d my-dataflow problem-node threshold 42
# 10. Restart a misbehaving node without stopping the dataflow
dora node restart -d my-dataflow problem-node
# 11. View coordinator traces (no external infra needed)
dora trace list
dora trace view <trace-id-prefix>
# 12. Visualize dataflow graph
dora graph dataflow.yml --open
Log File Locations
out/
  <dataflow-uuid>/
    log_<node-id>.jsonl    # current log
    log_<node-id>.1.jsonl  # rotated (previous)
    log_<node-id>.2.jsonl  # rotated (older)
Read directly with:
dora logs --local --all-nodes
dora logs --local <node-name> --tail 50
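Because the files are plain JSON Lines, they are also easy to post-process directly. An illustrative filter — the `level`/`msg` field names here are assumptions for the sketch, not the documented log schema:

```python
import json

def filter_logs(lines, min_level="info", tail=None):
    """Keep entries at or above min_level; optionally only the last N.
    Assumes each entry has a 'level' field (hypothetical schema)."""
    order = {"trace": 0, "debug": 1, "info": 2, "warn": 3, "error": 4}
    entries = [json.loads(line) for line in lines if line.strip()]
    kept = [e for e in entries if order.get(e.get("level"), 2) >= order[min_level]]
    return kept[-tail:] if tail else kept

raw = [
    '{"level": "debug", "msg": "tick"}',
    '{"level": "error", "msg": "sensor timeout"}',
]
assert [e["msg"] for e in filter_logs(raw, "warn")] == ["sensor timeout"]
```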