Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

WebSocket 主题数据通道

主题数据通道扩展了 WebSocket 控制面,将实时数据流消息从协调器代理到 CLI 客户端。CLI 命令(如 topic echotopic hztopic info)无需直接访问 Zenoh 网络,而是通过现有的 WebSocket 连接以二进制帧的形式接收消息数据。

动机

场景Before (Zenoh direct)After (WS proxy)
CLI 与守护进程在同一台机器上WorksWorks
CLI 远程访问,Zenoh 可达WorksWorks
CLI 远程访问,无法访问 ZenohFailsWorks
Browser-based web UIImpossiblePossible
嵌入式目标设备,无本地磁盘Cannot record locally--proxy 将数据流式传输到 CLI

关键设计思路:CLI 和未来的 Web UI 通过 WebSocket 连接到协调器。由协调器代为订阅 Zenoh 并以二进制帧转发消息,使主题检查功能在 WebSocket 连接可达的任何地方都能使用。


架构

CLI  ──── WS (binary frames) ────>  Coordinator  ──── Zenoh sub ────>  Daemon
                                    (Zenoh proxy)                      (debug publish)

协调器充当 Zenoh 代理:

  1. CLI 通过现有的文本帧 WS 协议发送 TopicSubscribe 请求
  2. 协调器验证数据流并打开 Zenoh 订阅者
  3. 协调器将每个 Zenoh 采样以二进制 WS 帧转发回 CLI
  4. CLI 按订阅 UUID 将二进制帧分发到相应的消费者

关键源文件

FileRole
libraries/message/src/cli_to_coordinator.rsTopicSubscribeTopicUnsubscribe 请求变体
libraries/message/src/coordinator_to_cli.rsTopicSubscribed reply variant
binaries/coordinator/src/ws_control.rsZenoh 代理:订阅并转发二进制帧
binaries/coordinator/src/control.rsControlEvent::TopicSubscribe for validation
binaries/cli/src/ws_client.rsWsSession::subscribe_topics(),二进制帧分发
binaries/cli/src/command/topic/echo.rs通过 WS 进行主题回显
binaries/cli/src/command/topic/hz.rs通过 WS 进行主题频率测量
binaries/cli/src/command/topic/info.rs通过 WS 获取主题元数据/统计信息
binaries/cli/src/command/record.rs--proxy 标志用于基于 WS 的录制

线路协议

订阅握手(JSON 文本帧)

订阅使用现有的基于 UUID 关联的请求-应答协议:

请求(CLI -> 协调器):

{
  "id": "abc-123",
  "method": "control",
  "params": {
    "TopicSubscribe": {
      "dataflow_id": "550e8400-...",
      "topics": [["camera_node", "image"], ["lidar_node", "points"]]
    }
  }
}

响应(协调器 -> CLI):

{
  "id": "abc-123",
  "result": {
    "TopicSubscribed": {
      "subscription_id": "7f1b3a00-..."
    }
  }
}

取消订阅(CLI -> 协调器):

{
  "id": "def-456",
  "method": "control",
  "params": {
    "TopicUnsubscribe": {
      "subscription_id": "7f1b3a00-..."
    }
  }
}

Binary data frames

握手完成后,协调器推送二进制 WS 帧。每个帧都有一个固定大小的头部:

 0                   16                              N
 ├───────────────────┼──────────────────────────────┤
 │  subscription_id  │  Timestamped<InterDaemonEvent>│
 │  (16 bytes UUID)  │  (bincode serialized)         │
 └───────────────────┴──────────────────────────────┘
FieldSize描述
subscription_id16 bytesTopicSubscribed 确认匹配的 UUID,用于多路复用
payloadvariable来自 Zenoh 的原始 Timestamped<InterDaemonEvent> bincode 字节

16 字节的 UUID 前缀允许在单个 WS 连接上多路复用多个订阅,而无需额外的帧开销。


Data Flow

CLI                         WsSession                     Coordinator
 │                              │                              │
 │── subscribe_topics() ───────>│                              │
 │                              │── WsRequest{TopicSubscribe} >│
 │                              │                              │ validate dataflow
 │                              │                              │ open Zenoh session (lazy)
 │                              │                              │ spawn subscriber tasks
 │                              │<── WsResponse{TopicSubscribed}│
 │<── (sub_id, data_rx) ───────│                              │
 │                              │                              │
 │                              │       ┌── Zenoh sample ──────│ Daemon publishes
 │                              │<──────│ Binary frame         │
 │<── data_rx.recv() ──────────│       │ (sub_id + payload)   │
 │                              │       │                      │
 │                              │<──────│ Binary frame         │
 │<── data_rx.recv() ──────────│       │                      │
 │                              │       └                      │
 │                              │                              │
 │   (drop session) ───────────>│── Close ────────────────────>│ abort subscriber tasks

Coordinator internals

  1. 验证ControlEvent::TopicSubscribe 被发送到协调器事件循环,由其检查数据流是否存在以及是否启用了 publish_all_messages_to_zenoh: true
  2. Zenoh 延迟初始化:协调器的 Zenoh 会话在第一次 TopicSubscribe 请求时打开,并在同一 WS 连接的后续订阅中复用
  3. 按主题分配任务:每个 (node_id, data_id) 对会生成一个 tokio 任务,订阅对应的 Zenoh 主题并将采样转发到二进制帧通道
  4. 背压:二进制帧通道容量为 64。使用 try_send —— 如果通道已满(消费者过慢),采样将被静默丢弃,而不是阻塞 Zenoh 订阅者
  5. 清理:当 WS 连接关闭时,所有订阅者任务将被终止

WsSession (CLI side)

The WsSession::subscribe_topics() method:

  1. 序列化 TopicSubscribe 请求
  2. 通过内部命令通道发送 SessionCommand::SubscribeTopics
  3. 异步 session_loop 将其包装为 WsRequest 并发送
  4. 收到 TopicSubscribed 确认后,将 data_tx 发送者注册到以 subscription_id 为键的 topic_subscribers
  5. 二进制帧的分发方式是:提取前 16 字节作为 UUID,将剩余部分发送到匹配的 data_tx

session_loop 中维护的状态:

  • pending_topic_subscribes: HashMap<Uuid, (ack_tx, data_tx)> —— 等待确认
  • topic_subscribers: HashMap<Uuid, Sender> —— 正在接收二进制数据的活跃订阅

前提条件

数据流描述符必须启用调试消息发布:

_unstable_debug:
  publish_all_messages_to_zenoh: true

如果未启用,协调器将拒绝 TopicSubscribe 并返回:

dataflow {id} not found or publish_all_messages_to_zenoh not enabled

CLI Commands

adora topic echo

将主题数据实时流式输出到终端。

# Echo a single topic
adora topic echo -d my-dataflow camera_node/image

# Echo multiple topics
adora topic echo -d my-dataflow robot1/pose robot2/vel

# JSON output for piping
adora topic echo -d my-dataflow robot1/pose --format json

内部实现:调用 session.subscribe_topics(),从 data_rx 通道接收 Timestamped<InterDaemonEvent>,反序列化 Arrow 数据,并以表格或 JSON 格式渲染。

adora topic hz

交互式 TUI,显示每个主题的发布频率统计信息。

# All topics
adora topic hz -d my-dataflow --window 10

# Specific topics
adora topic hz -d my-dataflow robot1/pose robot2/vel --window 5

使用 ratatui 构建 TUI。后台 std::threaddata_rx 接收事件,并通过 BTreeMap<(node_id, data_id), index> 查找分发到每个主题的 HzStats 跟踪器。

adora topic info

一次性获取主题元数据和统计信息。

adora topic info -d my-dataflow camera_node/image --duration 5

--duration 指定的秒数内收集消息,然后显示类型信息、发布者、订阅者(来自描述符)、消息数量和带宽。

adora record --proxy

通过 WebSocket 流式传输数据流数据以进行本地录制。

# Start dataflow first
adora start dataflow.yml --detach

# Record via proxy (data streams through coordinator to CLI)
adora record dataflow.yml --proxy -o capture.adorec

# Record specific topics
adora record dataflow.yml --proxy --topics sensor/image,lidar/points

使用场景:目标机器(运行守护进程的机器)没有本地磁盘或存储空间有限。--proxy 标志将数据通过协调器 WebSocket 路由到 CLI 所在的机器,在本地写入 .adorec 文件。

不使用 --proxy(默认行为)时,会向数据流中注入一个录制节点,直接在守护进程所在的机器上录制。


Zenoh Topic Format

协调器使用 adora_core::topics::zenoh_output_publish_topic() 中的格式订阅 Zenoh 主题:

adora/{dataflow_id}/{node_id}/{data_id}

每个主题的负载为 Timestamped<InterDaemonEvent>,使用 bincode 序列化。协调器将这些字节原样转发(前置订阅 UUID)—— 无需重新序列化。


Backpressure and Performance

参数Rationale
二进制帧通道容量64在延迟和内存之间取得平衡
Drop policyDrop on full优先保证新鲜度而非完整性
Binary format原始 bincode(不使用 base64)避免大负载 33% 的额外开销

对于高吞吐量的主题(摄像头图像、点云),如果 WS 连接较慢,二进制帧通道可能会被填满。丢弃的采样是静默的 —— CLI 会在 topic hz 中显示降低的频率,但不会卡住。


错误处理

ErrorSourceResponse
Dataflow not foundCoordinator validation带有错误消息的 WsResponse
publish_all_messages_to_zenoh not enabledCoordinator validation带有错误消息的 WsResponse
Zenoh 会话打开失败Coordinator带有错误消息的 WsResponse
Zenoh subscriber failurePer-topic task输出警告日志,任务退出
二进制帧过短(<16 字节)CLI session_loop输出警告日志,帧被丢弃
Unknown subscription UUIDCLI session_loopFrame dropped silently
WS connection closedEither side所有任务被终止,待处理的确认收到错误

测试覆盖

TierLocation覆盖内容
单元测试(客户端)binaries/cli/src/ws_client.rshandle_response_topic_subscribe_ack —— 验证确认路由和订阅者注册
Unit (all existing)binaries/cli/src/ws_client.rs已更新,通过 handle_response 传递主题订阅状态

The TopicSubscribe / binary frame path is primarily validated via integration testing with a running coordinator and Zenoh session. See Testing Guide for smoke test instructions.