Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

通信模式

Adora is a dataflow framework based on pub/sub message passing. On top of basic topics, the framework supports service (request/reply), action (goal/feedback/result), and streaming (session/segment/chunk) patterns using well-known metadata keys. No changes to the daemon, coordinator, or YAML syntax are required – the patterns are implemented as conventions at the node API level.

1. 主题(发布/订阅)

默认模式。一个节点在输出上发布数据,订阅该输出的任何节点都会接收到它。

nodes:
  - id: 发布者
    outputs:
      - data
  - id: 订阅者
    inputs:
      data: publisher/data

适用场景:流式传感器数据、周期性状态、即发即忘事件。

2. 服务(请求/应答)

客户端发送请求并期望收到恰好一个响应,通过 request_id 元数据键进行关联。

约定的元数据键

KeyConstant描述
request_idadora_node_api::REQUEST_ID用于关联请求和响应的 UUID v7

YAML

nodes:
  - id: 客户端
    inputs:
      tick: adora/timer/millis/500
      response: server/response
    outputs:
      - request

  - id: 服务端
    inputs:
      request: client/request
    outputs:
      - response

节点 API 辅助函数

#![allow(unused)]
fn main() {
// Client: send request with auto-generated request_id
let rid = node.send_service_request("request".into(), params, data)?;

// Server: pass through metadata.parameters (includes request_id)
node.send_service_response("response".into(), metadata.parameters, result)?;
}

服务端必须将传入请求的元数据参数中的 request_id 透传到响应中。客户端使用该键将响应与请求进行匹配。

示例examples/service-example/

3. 动作(目标/反馈/结果)

客户端发送一个目标并接收周期性的反馈以及最终结果。Action 支持取消操作。

约定的元数据键

KeyConstant描述
goal_idadora_node_api::GOAL_ID用于标识目标的 UUID v7
goal_statusadora_node_api::GOAL_STATUS目标的最终状态

目标状态值:

ConstantMeaning
succeededGOAL_STATUS_SUCCEEDED目标成功完成
abortedGOAL_STATUS_ABORTED目标被服务端中止
canceledGOAL_STATUS_CANCELED目标被客户端取消

YAML

nodes:
  - id: 客户端
    inputs:
      tick: adora/timer/millis/2000
      feedback: server/feedback
      result: server/result
    outputs:
      - goal
      - cancel

  - id: 服务端
    inputs:
      goal: client/goal
      cancel: client/cancel
    outputs:
      - feedback
      - result

取消模式

客户端在 cancel 输出上发送带有 goal_id 元数据的消息。服务端在处理步骤之间检查取消请求,并发送 goal_status = "canceled" 的结果。

示例examples/action-example/

4. Streaming (session/segment/chunk)

For real-time pipelines (voice, video, sensor streams) where a user can interrupt mid-stream and queued data must be discarded.

约定的元数据键

Key类型Constant描述
session_idStringSESSION_IDIdentifies the conversation/session
segment_idIntegerSEGMENT_IDLogical unit within a session (e.g. one utterance)
seqIntegerSEQChunk sequence number within a segment
finBoolFINtrue on the last chunk of a segment
flushBoolFLUSHtrue to discard older queued messages on this input

YAML

nodes:
  - id: asr
    inputs:
      mic: mic-source/audio
    outputs:
      - text

  - id: llm
    inputs:
      text: asr/text
    outputs:
      - tokens

  - id: tts
    inputs:
      tokens: llm/tokens
    outputs:
      - audio

节点 API

#![allow(unused)]
fn main() {
use adora_node_api::{StreamSegment, AdoraNode};

let mut seg = StreamSegment::new();

// Send chunks with auto-incrementing seq (e.g. inside an ASR node)
node.send_stream_chunk("text".into(), &mut seg, false, chunk_data)?;
// Mark final chunk of a segment
node.send_stream_chunk("text".into(), &mut seg, true, last_chunk)?;

// On user interruption: flush downstream queues and start a new segment.
// The prior segment ends without a fin=true signal -- old data is discarded.
let flush_params = seg.flush();
node.send_output("text".into(), flush_params, empty_data)?;
}

Queue flush behavior

When a message arrives with flush: true in its metadata, the receiver’s input queue is cleared of all older messages before the flush message is delivered. This enables instant interruption in voice pipelines – when the user speaks over TTS output, the ASR node sends a new segment with flush: true, and the TTS node immediately discards any queued audio chunks from the previous response.

Note: flush discards all queued messages on the input regardless of session_id. Do not multiplex independent sessions on a single input when using flush.

Python

# Streaming metadata is a plain dict
params = {
    "session_id": session_id,
    "segment_id": 1,
    "seq": 0,
    "fin": False,
    "flush": True,  # flush older queued messages
}
node.send_output("text", data, metadata={"parameters": params})

5. Choosing a pattern

需要响应?Long-running?Cancelable?Real-time stream?模式
--Topic
Service
OptionalAction
Via flushStreaming

6. Important details

  • goal_status 匹配区分大小写。 请始终使用精确的小写值:"succeeded""aborted""canceled"。ROS2 桥接对无法识别的值默认使用 Aborted

7. Python compatibility

Python 节点使用相同的元数据约定。参数是键为字符串的普通字典:

import uuid

# Service client (uuid7 for time-ordered IDs, matching Rust API)
params = {"request_id": str(uuid.uuid7())}
node.send_output("request", data, metadata={"parameters": params})

# Service server -- pass through parameters
node.send_output("response", result, metadata=event["metadata"])

注意uuid.uuid7() 需要 Python 3.13+。在旧版本中,请使用 uuid_utils 包或 uuid.uuid4()(随机 v4 也可用于关联,但会丢失时间排序特性)。