Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Dataflow YAML Specification

Dataflows are defined in YAML files. Each file describes a graph of nodes, their inputs/outputs, and execution parameters.

A JSON Schema is available at the repo root (adora-schema.json) for editor autocompletion and validation.

快速开始

nodes:
  - id: sender
    path: sender.py
    outputs:
      - message

  - id: receiver
    path: receiver.py
    inputs:
      message: sender/message

Run with adora run dataflow.yml (local mode) or adora up && adora start dataflow.yml (networked mode).

Editor Setup

Add a schema comment at the top of your YAML file for VS Code autocompletion (requires the YAML extension):

# yaml-language-server: $schema=https://raw.githubusercontent.com/dora-rs/adora/main/adora-schema.json
nodes:
  - id: my-node
    # ... autocompletion works here

Root-Level Fields

Field类型默认描述
nodeslistrequiredList of node configurations
strict_typesboolfalseTreat type warnings as errors in validate and build
type_ruleslist[]User-defined type compatibility rules (see Type Annotations)
health_check_intervalfloat5.0Seconds between daemon health check sweeps. For each node with health_check_timeout set, the daemon checks whether the node has communicated within its timeout; if not, the node is killed and its restart_policy is evaluated
_unstable_deployobject--Root-level deployment config (see Deployment)
_unstable_debugobject--Debug options (see Debug)

Node Configuration

Every node requires an id. All other fields are optional (though most nodes need at least path or operator/operators).

Identity

Field类型描述
idstringRequired. Unique identifier. Must not contain /. Whitespace is discouraged
namestringHuman-readable display name (metadata only, used in tooling and logs)
descriptionstringDocumentation string (metadata only, not used at runtime)

Source

A node’s executable comes from a local path, a git repository, a module reference, or is implicit (operator/ROS2 nodes).

Field类型描述
pathstringPath to executable or script. Can also be a URL (legacy)
modulestringPath to a module definition file (mutually exclusive with path). See Modules Guide
gitstringGit repo URL. adora build clones it and uses the clone dir as working directory
branchstringBranch to checkout (requires git, mutually exclusive with tag/rev)
tagstringTag to checkout (requires git, mutually exclusive with branch/rev)
revstringCommit hash to checkout (requires git, mutually exclusive with branch/tag)
buildstringBuild commands run during adora build. Each line runs separately. pip/pip3 lines use uv when --uv is passed
argsstringCommand-line arguments (space-separated)

Example with git source:

- id: rust-node
  git: https://github.com/dora-rs/adora.git
  branch: main
  build: cargo build -p example-node --release
  path: target/release/example-node

Data I/O

Inputs

Inputs subscribe to another node’s output using the format <node-id>/<output-id>:

inputs:
  # Short form
  image: camera/frames
  tick: adora/timer/millis/100

  # Long form with options
  sensor_data:
    source: sensor/frames
    queue_size: 10
    queue_policy: drop_oldest
    input_timeout: 5.0

  # Lossless input (blocks sender when full)
  commands:
    source: controller/cmd
    queue_size: 100
    queue_policy: backpressure
Input option类型默认描述
sourcestringrequired<node-id>/<output-id> or timer path
queue_sizeinteger10Input buffer size
queue_policystringdrop_oldestdrop_oldest: drops oldest message when full. backpressure: buffers up to 10x queue_size without dropping (drops with ERROR log at hard cap)
input_timeoutfloat--Circuit breaker timeout in seconds. If no message arrives within this period, the daemon closes the input and the node receives an InputClosed event for graceful degradation

Built-in Timers

定时器是以固定间隔发出 tick 的虚拟节点:

inputs:
  tick: adora/timer/millis/100   # 每 100ms
  slow: adora/timer/millis/1000  # 每 1s
  fast: adora/timer/hz/30        # 30 Hz(约 33ms)

Built-in Log Aggregation

Subscribe to structured log messages from all (or filtered) nodes:

inputs:
  all_logs: adora/logs               # all nodes, all levels
  errors:   adora/logs/error         # error+ from all nodes
  sensor:   adora/logs/info/sensor   # info+ from specific node

Each message arrives as a JSON-encoded LogMessage string. See Logging for details.

Outputs

A list of output identifiers the node produces:

outputs:
  - processed_image
  - metadata

运维

Optional type annotations for inputs and outputs. Types are never required – unannotated ports remain fully dynamic.

- id: camera
  path: camera.py
  outputs:
    - image
    - depth
  output_types:
    image: std/media/v1/Image
    depth: std/media/v1/Image

- id: detector
  path: detect.py
  inputs:
    image: camera/image
  input_types:
    image: std/media/v1/Image
  outputs:
    - bbox
  output_types:
    bbox: std/vision/v1/BoundingBox
Field类型默认描述
output_typesobject{}Maps output IDs to type URNs. Keys must match entries in outputs
input_typesobject{}Maps input IDs to expected type URNs. Keys must match entries in inputs
output_metadataobject{}Maps output IDs to lists of required metadata keys
patternstring--Communication pattern shorthand: service-server, service-client, action-server, action-client

Type URNs use the format std/<category>/v<version>/<TypeName> and support parameters (e.g. std/media/v1/AudioFrame[sample_type=f32]). See the Type Annotations Guide for the full standard type library, parameterized types, compatibility rules, and user-defined types.

Run adora validate <file> to check type annotations statically. For runtime checking, set ADORA_RUNTIME_TYPE_CHECK=warn or error:

adora validate dataflow.yml
ADORA_RUNTIME_TYPE_CHECK=warn adora run dataflow.yml

Types also appear on adora graph edge labels when annotated.

Module Parameters

When using module:, pass configuration values via params::

- id: fast_pipeline
  module: modules/transform.module.yml
  inputs:
    data: sender/value
  params:
    speed: "2.0"
    mode: turbo

Inside the module, params are available as $PARAM_<UPPERCASE_KEY> in args: and as environment variables. See the Modules Guide for full documentation.

Environment

env:
  MY_VAR: "value"          # string
  DEBUG: true               # boolean
  PORT: 8080                # integer
  RATE: 1.5                 # float
  FROM_HOST:
    __adora_env: HOST_VAR   # read from host environment at runtime

Environment variables apply to both build commands and node execution. Values support $VAR expansion syntax.

日志

Field类型默认描述
send_stdout_asstring--Route raw stdout/stderr lines as a data output. Each line is sent as a separate Arrow message
send_logs_asstring--Route structured log entries as a data output. Each entry is a JSON string with fields: timestamp, level, node_id, message, target, fields
min_log_levelstring--Suppress logs below this level from file output, coordinator forwarding, and send_logs_as. Levels from most to least verbose: stdout (all output including raw stdout), trace, debug, info, warn, error
max_log_sizestring--Rotate log file at this size (e.g. "50MB", "1GB")
max_rotated_filesinteger5Number of rotated log files to keep

Example:

- id: sensor
  path: ./sensor
  min_log_level: info
  send_stdout_as: raw_output
  send_logs_as: log_entries
  max_log_size: "100MB"
  max_rotated_files: 3
  outputs:
    - data
    - raw_output
    - log_entries

When using send_stdout_as or send_logs_as, include the output name in the outputs list so downstream nodes can subscribe to it.

For a complete guide to all logging features, see Logging.

容错

Field类型默认描述
restart_policystringnevernever, on-failure, or always
max_restartsinteger0Max restart attempts. 0 = unlimited
restart_delayfloat--Initial backoff in seconds. Doubles each attempt
max_restart_delayfloat--Cap for exponential backoff
restart_windowfloat--Time window for counting restarts. The counter resets after this many seconds since the first restart in the current window. Enables “N restarts per M seconds” semantics with max_restarts
health_check_timeoutfloat--If the node does not communicate with the daemon (send outputs, subscribe, etc.) for this many seconds, the daemon kills the process and evaluates the restart_policy

Restart policies:

  • never (default): no automatic restart
  • on-failure: restart only on non-zero exit code
  • always: restart on any exit, except when stopped by user or all inputs closed with success

Example with exponential backoff:

- id: sensor
  path: ./sensor
  restart_policy: on-failure
  max_restarts: 5
  restart_delay: 1.0         # 1s, 2s, 4s, 8s, 16s
  max_restart_delay: 30.0    # capped at 30s
  restart_window: 300.0      # 5 restarts per 5 minutes
  health_check_timeout: 30.0

Deployment

使用 _unstable_deploy 将节点分配到特定机器:

- id: camera-driver
  _unstable_deploy:
    machine: robot-arm
  path: ./target/debug/camera
  outputs:
    - frames

- id: ml-inference
  _unstable_deploy:
    machine: gpu-server
    labels:
      gpu: "true"
    distribute: scp
  path: ./target/debug/inference
  inputs:
    frames: camera-driver/frames
Deploy field类型默认描述
machinestring--Target machine/daemon ID. The coordinator routes the node to the daemon registered with this ID
working_dirstring--Working directory on the target machine
labelsobject--Key-value labels for scheduling. The coordinator matches these against labels reported by each daemon at registration
distributestringlocalHow built binaries reach the target daemon: local – each daemon builds from source independently; scp – CLI pushes the built binary via SSH/SCP before spawn; http – daemon pulls the binary from the coordinator’s HTTP artifact store

当节点位于不同机器时,通信自动从共享内存切换到 Zenoh 发布/订阅。

算子节点

Operators run in-process inside a shared runtime (no separate process). Use operator for a single operator or operators for multiple.

Single Operator

The id field is optional for single operators (defaults to the node id):

- id: detector
  operator:
    python: detect.py
    build: pip install -r requirements.txt
    inputs:
      image: camera/frames
    outputs:
      - bbox

Multiple Operators

Each operator in operators requires a unique id:

- id: runtime-node
  operators:
    - id: preprocessor
      shared-library: ../../target/debug/libpreprocess
      inputs:
        raw: sensor/data
      outputs:
        - processed
    - id: analyzer
      shared-library: ../../target/debug/libanalyze
      inputs:
        data: runtime-node/preprocessor/processed
      outputs:
        - result

Operator Source Types

Field描述
pythonPython script path, or {source: "script.py", conda_env: "myenv"}
shared-libraryPath to a shared library (.so/.dylib/.dll)

Operators also support inputs, outputs, build, send_stdout_as, send_logs_as, min_log_level, max_log_size, and max_rotated_files with the same semantics as node-level fields.

ROS2 桥接

Declare a node as a ROS2 bridge to automatically convert between ROS2 DDS messages and Adora’s Arrow format. No custom code needed.

Single Topic

- id: camera_bridge
  ros2:
    topic: /camera/image_raw
    message_type: sensor_msgs/Image
    direction: subscribe
  outputs:
    - image

Multiple Topics

- id: robot_bridge
  ros2:
    topics:
      - topic: /camera/image_raw
        message_type: sensor_msgs/Image
        direction: subscribe
        output: image
      - topic: /cmd_vel
        message_type: geometry_msgs/Twist
        direction: publish
        input: velocity
    qos:
      reliable: true
  inputs:
    velocity: planner/cmd_vel
  outputs:
    - image

服务桥接

- id: add_service
  ros2:
    service: /add_two_ints
    service_type: example_interfaces/AddTwoInts
    role: 服务端
  inputs:
    request: client_node/request
  outputs:
    - response

动作桥接

- id: nav_action
  ros2:
    action: /navigate
    action_type: nav2_msgs/NavigateToPose
    role: 客户端
  inputs:
    goal: planner/goal
  outputs:
    - feedback
    - result

QoS Configuration

QoS can be set at the bridge level (applies to all topics) or per-topic:

QoS field类型默认描述
reliableboolfalseReliable vs best-effort transport
durabilitystringvolatilevolatile or transient_local
livelinessstringautomaticautomatic, manual_by_participant, manual_by_topic
lease_durationfloatinfinityLease duration in seconds
max_blocking_timefloat--Max blocking time for reliable transport
keep_lastinteger1History depth (KeepLast policy)
keep_allboolfalseUse KeepAll history instead of KeepLast

Other ROS2 Fields

Field类型默认描述
namespacestring/ROS2 namespace
node_namestringnode idROS2 node name

Debug

_unstable_debug:
  publish_all_messages_to_zenoh: true

Required for adora topic echo, adora topic hz, and adora topic info commands.

通信模式

Adora supports four communication patterns built on top of the dataflow:

  • Topic (default): pub/sub dataflow
  • Service: request/reply via request_id metadata
  • Action: goal/feedback/result via goal_id/goal_status metadata, with cancellation support
  • Streaming: session/segment/chunk via session_id/segment_id/seq/fin/flush metadata, with queue flush for interruption

See Communication Patterns for details and examples.

Full Example

health_check_interval: 10.0

_unstable_debug:
  publish_all_messages_to_zenoh: true

nodes:
  - id: webcam
    operator:
      python: webcam.py
      inputs:
        tick: adora/timer/millis/100
      outputs:
        - image

  - id: detector
    operator:
      python: detect.py
      build: pip install ultralytics
      inputs:
        image: webcam/image
      outputs:
        - bbox

  - id: plotter
    operator:
      python: plot.py
      inputs:
        image: webcam/image
        bbox: detector/bbox

  - id: logger
    path: ./logger
    inputs:
      bbox: detector/bbox
    send_stdout_as: logs
    min_log_level: info
    restart_policy: on-failure
    max_restarts: 3
    outputs:
      - logs