通信模式
Adora is a dataflow framework based on pub/sub message passing. On top of basic topics, the framework supports service (request/reply), action (goal/feedback/result), and streaming (session/segment/chunk) patterns using well-known metadata keys. No changes to the daemon, coordinator, or YAML syntax are required – the patterns are implemented as conventions at the node API level.
1. 主题(发布/订阅)
默认模式。一个节点在输出上发布数据,订阅该输出的任何节点都会接收到它。
nodes:
- id: 发布者
outputs:
- data
- id: 订阅者
inputs:
data: publisher/data
适用场景:流式传感器数据、周期性状态、即发即忘事件。
2. 服务(请求/应答)
客户端发送请求并期望收到恰好一个响应,通过 request_id 元数据键进行关联。
约定的元数据键
| Key | Constant | 描述 |
|---|---|---|
request_id | adora_node_api::REQUEST_ID | 用于关联请求和响应的 UUID v7 |
YAML
nodes:
- id: 客户端
inputs:
tick: adora/timer/millis/500
response: server/response
outputs:
- request
- id: 服务端
inputs:
request: client/request
outputs:
- response
节点 API 辅助函数
#![allow(unused)]
fn main() {
// Client: send request with auto-generated request_id
let rid = node.send_service_request("request".into(), params, data)?;
// Server: pass through metadata.parameters (includes request_id)
node.send_service_response("response".into(), metadata.parameters, result)?;
}
服务端必须将传入请求的元数据参数中的 request_id 透传到响应中。客户端使用该键将响应与请求进行匹配。
示例:examples/service-example/
3. 动作(目标/反馈/结果)
客户端发送一个目标并接收周期性的反馈以及最终结果。Action 支持取消操作。
约定的元数据键
| Key | Constant | 描述 |
|---|---|---|
goal_id | adora_node_api::GOAL_ID | 用于标识目标的 UUID v7 |
goal_status | adora_node_api::GOAL_STATUS | 目标的最终状态 |
目标状态值:
| 值 | Constant | Meaning |
|---|---|---|
succeeded | GOAL_STATUS_SUCCEEDED | 目标成功完成 |
aborted | GOAL_STATUS_ABORTED | 目标被服务端中止 |
canceled | GOAL_STATUS_CANCELED | 目标被客户端取消 |
YAML
nodes:
- id: 客户端
inputs:
tick: adora/timer/millis/2000
feedback: server/feedback
result: server/result
outputs:
- goal
- cancel
- id: 服务端
inputs:
goal: client/goal
cancel: client/cancel
outputs:
- feedback
- result
取消模式
客户端在 cancel 输出上发送带有 goal_id 元数据的消息。服务端在处理步骤之间检查取消请求,并发送 goal_status = "canceled" 的结果。
示例:examples/action-example/
4. Streaming (session/segment/chunk)
For real-time pipelines (voice, video, sensor streams) where a user can interrupt mid-stream and queued data must be discarded.
约定的元数据键
| Key | 类型 | Constant | 描述 |
|---|---|---|---|
session_id | String | SESSION_ID | Identifies the conversation/session |
segment_id | Integer | SEGMENT_ID | Logical unit within a session (e.g. one utterance) |
seq | Integer | SEQ | Chunk sequence number within a segment |
fin | Bool | FIN | true on the last chunk of a segment |
flush | Bool | FLUSH | true to discard older queued messages on this input |
YAML
nodes:
- id: asr
inputs:
mic: mic-source/audio
outputs:
- text
- id: llm
inputs:
text: asr/text
outputs:
- tokens
- id: tts
inputs:
tokens: llm/tokens
outputs:
- audio
节点 API
#![allow(unused)]
fn main() {
use adora_node_api::{StreamSegment, AdoraNode};
let mut seg = StreamSegment::new();
// Send chunks with auto-incrementing seq (e.g. inside an ASR node)
node.send_stream_chunk("text".into(), &mut seg, false, chunk_data)?;
// Mark final chunk of a segment
node.send_stream_chunk("text".into(), &mut seg, true, last_chunk)?;
// On user interruption: flush downstream queues and start a new segment.
// The prior segment ends without a fin=true signal -- old data is discarded.
let flush_params = seg.flush();
node.send_output("text".into(), flush_params, empty_data)?;
}
Queue flush behavior
When a message arrives with flush: true in its metadata, the receiver’s input queue is cleared of all older messages before the flush message is delivered. This enables instant interruption in voice pipelines – when the user speaks over TTS output, the ASR node sends a new segment with flush: true, and the TTS node immediately discards any queued audio chunks from the previous response.
Note: flush discards all queued messages on the input regardless of session_id. Do not multiplex independent sessions on a single input when using flush.
Python
# Streaming metadata is a plain dict
params = {
"session_id": session_id,
"segment_id": 1,
"seq": 0,
"fin": False,
"flush": True, # flush older queued messages
}
node.send_output("text", data, metadata={"parameters": params})
5. Choosing a pattern
| 需要响应? | Long-running? | Cancelable? | Real-time stream? | 模式 |
|---|---|---|---|---|
| 否 | - | - | 否 | Topic |
| 是 | 否 | 否 | 否 | Service |
| 是 | 是 | Optional | 否 | Action |
| 否 | 是 | Via flush | 是 | Streaming |
6. Important details
goal_status匹配区分大小写。 请始终使用精确的小写值:"succeeded"、"aborted"、"canceled"。ROS2 桥接对无法识别的值默认使用Aborted。
7. Python compatibility
Python 节点使用相同的元数据约定。参数是键为字符串的普通字典:
import uuid
# Service client (uuid7 for time-ordered IDs, matching Rust API)
params = {"request_id": str(uuid.uuid7())}
node.send_output("request", data, metadata={"parameters": params})
# Service server -- pass through parameters
node.send_output("response", result, metadata=event["metadata"])
注意:
uuid.uuid7()需要 Python 3.13+。在旧版本中,请使用uuid_utils包或uuid.uuid4()(随机 v4 也可用于关联,但会丢失时间排序特性)。