Communication Patterns

Adora is a dataflow framework based on pub/sub message passing. On top of basic topics, the framework supports service (request/reply), action (goal/feedback/result), and streaming (session/segment/chunk) patterns using well-known metadata keys. No changes to the daemon, coordinator, or YAML syntax are required – the patterns are implemented as conventions at the node API level.

1. Topic (pub/sub)

The default pattern. A node publishes data on an output, and any node that subscribes to that output receives it.

nodes:
  - id: publisher
    outputs:
      - data
  - id: subscriber
    inputs:
      data: publisher/data

Use when: streaming sensor data, periodic status, fire-and-forget events.

2. Service (request/reply)

A client sends a request and expects exactly one response, correlated by a request_id metadata key.

Well-known metadata keys

| Key | Constant | Description |
| --- | --- | --- |
| request_id | adora_node_api::REQUEST_ID | UUID v7 correlating request and response |

YAML

nodes:
  - id: client
    inputs:
      tick: adora/timer/millis/500
      response: server/response
    outputs:
      - request

  - id: server
    inputs:
      request: client/request
    outputs:
      - response

Node API helpers

// Client: send request with auto-generated request_id
let rid = node.send_service_request("request".into(), params, data)?;

// Server: pass through metadata.parameters (includes request_id)
node.send_service_response("response".into(), metadata.parameters, result)?;

The server MUST pass through the request_id from the incoming request’s metadata parameters into the response. The client matches responses to requests using this key.
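The correlation rule can be sketched without the framework as a table of pending requests keyed by request_id. This is an illustration only: the `PendingRequests` class, the `serve` function, and the event shape (a dict carrying `metadata["parameters"]`) are assumptions made for the example, not part of the Adora API.

```python
import uuid


class PendingRequests:
    """Tracks in-flight requests by request_id (hypothetical helper)."""

    def __init__(self):
        self._pending = {}

    def register(self, context):
        # uuid4 keeps the sketch portable; any unique string works for correlation.
        request_id = str(uuid.uuid4())
        self._pending[request_id] = context
        return {"request_id": request_id}

    def resolve(self, event):
        # Returns the stored context, or None when the request_id is unknown
        # (for example a duplicate response).
        params = event.get("metadata", {}).get("parameters", {})
        return self._pending.pop(params.get("request_id"), None)


def serve(request_event, result):
    # Server side: copy the incoming parameters unchanged into the response.
    passthrough = dict(request_event["metadata"]["parameters"])
    return {"value": result, "metadata": {"parameters": passthrough}}


pending = PendingRequests()
params = pending.register(context="add 2 + 3")
request = {"value": [2, 3], "metadata": {"parameters": params}}
response = serve(request, sum(request["value"]))
matched = pending.resolve(response)
```

If the server dropped or regenerated request_id, `resolve` would return None and the client could not tell which request the response answers.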

Example: examples/service-example/

3. Action (goal/feedback/result)

A client sends a goal and receives periodic feedback plus a final result. Actions support cancellation.

Well-known metadata keys

| Key | Constant | Description |
| --- | --- | --- |
| goal_id | adora_node_api::GOAL_ID | UUID v7 identifying the goal |
| goal_status | adora_node_api::GOAL_STATUS | Final status of the goal |

Goal status values:

| Value | Constant | Meaning |
| --- | --- | --- |
| succeeded | GOAL_STATUS_SUCCEEDED | Goal completed successfully |
| aborted | GOAL_STATUS_ABORTED | Goal aborted by server |
| canceled | GOAL_STATUS_CANCELED | Goal canceled by client |

YAML

nodes:
  - id: client
    inputs:
      tick: adora/timer/millis/2000
      feedback: server/feedback
      result: server/result
    outputs:
      - goal
      - cancel

  - id: server
    inputs:
      goal: client/goal
      cancel: client/cancel
    outputs:
      - feedback
      - result

Cancel pattern

The client sends a message on the cancel output with goal_id in the metadata. The server checks for cancel requests between processing steps and sends a result with goal_status = "canceled".
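A minimal sketch of the server side of this pattern, written as plain Python with no framework calls. The `run_goal` function, the `cancel_requested` set, and the `progress` feedback field are hypothetical names chosen for the example; a real node would fill the set from messages on its cancel input.

```python
def run_goal(goal_id, steps, cancel_requested):
    """Run `steps` in order, checking for cancellation between steps.

    `cancel_requested` is a set of goal_ids seen on the cancel input.
    Returns (feedback_messages, result_parameters).
    """
    feedback = []
    for index, step in enumerate(steps):
        if goal_id in cancel_requested:
            # Stop before the next step and report the canceled status.
            return feedback, {"goal_id": goal_id, "goal_status": "canceled"}
        step()
        feedback.append({"goal_id": goal_id, "progress": (index + 1) / len(steps)})
    return feedback, {"goal_id": goal_id, "goal_status": "succeeded"}


# Simulate a cancel request that arrives while the second step is running.
cancels = set()
steps = [lambda: None, lambda: cancels.add("goal-1"), lambda: None]
feedback, result = run_goal("goal-1", steps, cancels)
```

Here the third step never runs: the cancel is noticed at the next check, after two feedback messages have been produced.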

Example: examples/action-example/

4. Streaming (session/segment/chunk)

Use for real-time pipelines (voice, video, sensor streams) in which a user can interrupt mid-stream and any queued data must then be discarded.

Well-known metadata keys

| Key | Type | Constant | Description |
| --- | --- | --- | --- |
| session_id | String | SESSION_ID | Identifies the conversation/session |
| segment_id | Integer | SEGMENT_ID | Logical unit within a session (e.g. one utterance) |
| seq | Integer | SEQ | Chunk sequence number within a segment |
| fin | Bool | FIN | true on the last chunk of a segment |
| flush | Bool | FLUSH | true to discard older queued messages on this input |

YAML

nodes:
  - id: asr
    inputs:
      mic: mic-source/audio
    outputs:
      - text

  - id: llm
    inputs:
      text: asr/text
    outputs:
      - tokens

  - id: tts
    inputs:
      tokens: llm/tokens
    outputs:
      - audio

Node API

use adora_node_api::{StreamSegment, AdoraNode};

let mut seg = StreamSegment::new();

// Send chunks with auto-incrementing seq (e.g. inside an ASR node)
node.send_stream_chunk("text".into(), &mut seg, false, chunk_data)?;
// Mark final chunk of a segment
node.send_stream_chunk("text".into(), &mut seg, true, last_chunk)?;

// On user interruption: flush downstream queues and start a new segment.
// The prior segment ends without a fin=true signal -- old data is discarded.
let flush_params = seg.flush();
node.send_output("text".into(), flush_params, empty_data)?;

Queue flush behavior

When a message arrives with flush: true in its metadata, the receiver’s input queue is cleared of all older messages before the flush message is delivered. This enables instant interruption in voice pipelines – when the user speaks over TTS output, the ASR node sends a new segment with flush: true, and the TTS node immediately discards any queued audio chunks from the previous response.

Note: flush discards all queued messages on the input regardless of session_id. Do not multiplex independent sessions on a single input when using flush.
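The flush semantics described above can be modelled with a toy queue. The `InputQueue` class is a hypothetical stand-in for a node input, written only to show the ordering rule and why independent sessions should not share an input; it is not how the framework is implemented.

```python
from collections import deque


class InputQueue:
    """Toy model of a single node input (illustration only)."""

    def __init__(self):
        self._items = deque()

    def push(self, data, parameters):
        if parameters.get("flush") is True:
            # Drop everything queued before the flush message,
            # regardless of which session_id it belongs to.
            self._items.clear()
        self._items.append((data, parameters))

    def drain(self):
        items = list(self._items)
        self._items.clear()
        return items


queue = InputQueue()
queue.push("old-1", {"session_id": "a", "segment_id": 1, "seq": 0})
queue.push("other", {"session_id": "b", "segment_id": 1, "seq": 0})
queue.push("", {"session_id": "a", "segment_id": 2, "seq": 0, "flush": True})
queue.push("new-1", {"session_id": "a", "segment_id": 2, "seq": 1})
delivered = [data for data, _ in queue.drain()]
```

In this model the message from session "b" is lost along with the stale data from session "a", which is the hazard the note warns about.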

Python

# Streaming metadata is a plain dict
params = {
    "session_id": session_id,
    "segment_id": 1,
    "seq": 0,
    "fin": False,
    "flush": True,  # flush older queued messages
}
node.send_output("text", data, metadata={"parameters": params})
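Because the metadata is a plain dict, a Python node has to track segment_id and seq itself. The sketch below shows one way to do that bookkeeping. The `SegmentState` class is hypothetical, and its numbering (segments start at 0; a flush or a final chunk starts the next segment) is an assumption modelled on the comments in the Rust example, not a description of `StreamSegment` internals.

```python
import uuid


class SegmentState:
    """Builds streaming parameters with auto-incrementing seq (hypothetical)."""

    def __init__(self, session_id=None):
        self.session_id = session_id or str(uuid.uuid4())
        self.segment_id = 0
        self.seq = 0

    def _params(self, fin, flush):
        params = {
            "session_id": self.session_id,
            "segment_id": self.segment_id,
            "seq": self.seq,
            "fin": fin,
            "flush": flush,
        }
        self.seq += 1
        return params

    def _next_segment(self):
        self.segment_id += 1
        self.seq = 0

    def chunk(self, fin=False):
        params = self._params(fin=fin, flush=False)
        if fin:
            # A final chunk closes the segment; later chunks belong to the next one.
            self._next_segment()
        return params

    def flush(self):
        # Interruption: abandon the current segment and open a new one with flush set.
        self._next_segment()
        return self._params(fin=False, flush=True)


seg = SegmentState(session_id="s1")
first = seg.chunk()
second = seg.chunk()
interrupt = seg.flush()
last = seg.chunk(fin=True)
# In a node: node.send_output("text", data, metadata={"parameters": seg.chunk()})
```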

5. Choosing a pattern

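| Need a response? | Long-running? | Cancelable? | Real-time stream? | Pattern |
| --- | --- | --- | --- | --- |
| No | - | - | No | Topic |
| Yes | No | No | No | Service |
| Yes | Yes | Optional | No | Action |
| No | Yes | Via flush | Yes | Streaming |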

6. Important details

  • goal_status matching is case-sensitive. Always use the exact lowercase values: "succeeded", "aborted", "canceled". The ROS2 bridge defaults to Aborted for unrecognised values.
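One way to guard against casing or spelling mistakes is to check the value before sending a result. This is a small sketch; `VALID_GOAL_STATUSES` and `check_goal_status` are hypothetical names, not framework helpers.

```python
VALID_GOAL_STATUSES = frozenset({"succeeded", "aborted", "canceled"})


def check_goal_status(value):
    """Return `value` unchanged if it is an exact well-known status, else raise."""
    if value not in VALID_GOAL_STATUSES:
        expected = ", ".join(sorted(VALID_GOAL_STATUSES))
        raise ValueError(f"unknown goal_status {value!r}; expected one of: {expected}")
    return value


result_params = {"goal_id": "goal-1", "goal_status": check_goal_status("canceled")}
```

The check is deliberately strict: "Canceled" and the British spelling "cancelled" are both rejected rather than normalized, so the mistake surfaces in the sending node.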

7. Python compatibility

Python nodes use the same metadata conventions. Parameters are plain dicts with string keys:

import uuid

# Service client (uuid7 for time-ordered IDs, matching Rust API)
params = {"request_id": str(uuid.uuid7())}
node.send_output("request", data, metadata={"parameters": params})

# Service server -- pass through parameters
node.send_output("response", result, metadata=event["metadata"])

Note: uuid.uuid7() requires Python 3.14+. On older versions, use the uuid_utils package, or fall back to uuid.uuid4(): random v4 IDs still work for correlation but lose time-ordering.
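A version-independent way to pick the ID generator is to detect uuid7 at runtime and fall back to uuid4. The `new_correlation_id` helper is a hypothetical name used for this sketch.

```python
import uuid


def new_correlation_id():
    """Return a UUID string, time-ordered (v7) when the runtime provides it."""
    uuid7 = getattr(uuid, "uuid7", None)
    if uuid7 is not None:
        return str(uuid7())
    # Fallback: random v4. Fine for correlation, but not time-ordered.
    return str(uuid.uuid4())


params = {"request_id": new_correlation_id()}
```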