WebSocket 主题数据通道
主题数据通道扩展了 WebSocket 控制面,将实时数据流消息从协调器代理到 CLI 客户端。CLI 命令(如 topic echo、topic hz 和 topic info)无需直接访问 Zenoh 网络,而是通过现有的 WebSocket 连接以二进制帧的形式接收消息数据。
动机
| 场景 | Before (Zenoh direct) | After (WS proxy) |
|---|---|---|
| CLI 与守护进程在同一台机器上 | Works | Works |
| CLI 远程访问,Zenoh 可达 | Works | Works |
| CLI 远程访问,无法访问 Zenoh | Fails | Works |
| Browser-based web UI | Impossible | Possible |
| 嵌入式目标设备,无本地磁盘 | Cannot record locally | --proxy 将数据流式传输到 CLI |
关键设计思路:CLI 和未来的 Web UI 通过 WebSocket 连接到协调器。由协调器代为订阅 Zenoh 并以二进制帧转发消息,使主题检查功能在 WebSocket 连接可达的任何地方都能使用。
架构
CLI ──── WS (binary frames) ────> Coordinator ──── Zenoh sub ────> Daemon
(Zenoh proxy) (debug publish)
协调器充当 Zenoh 代理:
- CLI 通过现有的文本帧 WS 协议发送
TopicSubscribe请求 - 协调器验证数据流并打开 Zenoh 订阅者
- 协调器将每个 Zenoh 采样以二进制 WS 帧转发回 CLI
- CLI 按订阅 UUID 将二进制帧分发到相应的消费者
关键源文件
| File | Role |
|---|---|
libraries/message/src/cli_to_coordinator.rs | TopicSubscribe、TopicUnsubscribe 请求变体 |
libraries/message/src/coordinator_to_cli.rs | TopicSubscribed reply variant |
binaries/coordinator/src/ws_control.rs | Zenoh 代理:订阅并转发二进制帧 |
binaries/coordinator/src/control.rs | ControlEvent::TopicSubscribe for validation |
binaries/cli/src/ws_client.rs | WsSession::subscribe_topics(),二进制帧分发 |
binaries/cli/src/command/topic/echo.rs | 通过 WS 进行主题回显 |
binaries/cli/src/command/topic/hz.rs | 通过 WS 进行主题频率测量 |
binaries/cli/src/command/topic/info.rs | 通过 WS 获取主题元数据/统计信息 |
binaries/cli/src/command/record.rs | --proxy 标志用于基于 WS 的录制 |
线路协议
订阅握手(JSON 文本帧)
订阅使用现有的基于 UUID 关联的请求-应答协议:
请求(CLI -> 协调器):
{
"id": "abc-123",
"method": "control",
"params": {
"TopicSubscribe": {
"dataflow_id": "550e8400-...",
"topics": [["camera_node", "image"], ["lidar_node", "points"]]
}
}
}
响应(协调器 -> CLI):
{
"id": "abc-123",
"result": {
"TopicSubscribed": {
"subscription_id": "7f1b3a00-..."
}
}
}
取消订阅(CLI -> 协调器):
{
"id": "def-456",
"method": "control",
"params": {
"TopicUnsubscribe": {
"subscription_id": "7f1b3a00-..."
}
}
}
Binary data frames
握手完成后,协调器推送二进制 WS 帧。每个帧都有一个固定大小的头部:
0 16 N
├───────────────────┼──────────────────────────────┤
│ subscription_id │ Timestamped<InterDaemonEvent>│
│ (16 bytes UUID) │ (bincode serialized) │
└───────────────────┴──────────────────────────────┘
| Field | Size | 描述 |
|---|---|---|
subscription_id | 16 bytes | 与 TopicSubscribed 确认匹配的 UUID,用于多路复用 |
| payload | variable | 来自 Zenoh 的原始 Timestamped<InterDaemonEvent> bincode 字节 |
16 字节的 UUID 前缀允许在单个 WS 连接上多路复用多个订阅,而无需额外的帧开销。
Data Flow
CLI WsSession Coordinator
│ │ │
│── subscribe_topics() ───────>│ │
│ │── WsRequest{TopicSubscribe} >│
│ │ │ validate dataflow
│ │ │ open Zenoh session (lazy)
│ │ │ spawn subscriber tasks
│ │<── WsResponse{TopicSubscribed}│
│<── (sub_id, data_rx) ───────│ │
│ │ │
│ │ ┌── Zenoh sample ──────│ Daemon publishes
│ │<──────│ Binary frame │
│<── data_rx.recv() ──────────│ │ (sub_id + payload) │
│ │ │ │
│ │<──────│ Binary frame │
│<── data_rx.recv() ──────────│ │ │
│ │ └ │
│ │ │
│ (drop session) ───────────>│── Close ────────────────────>│ abort subscriber tasks
Coordinator internals
- 验证:
ControlEvent::TopicSubscribe被发送到协调器事件循环,由其检查数据流是否存在以及是否启用了publish_all_messages_to_zenoh: true - Zenoh 延迟初始化:协调器的 Zenoh 会话在第一次
TopicSubscribe请求时打开,并在同一 WS 连接的后续订阅中复用 - 按主题分配任务:每个
(node_id, data_id)对会生成一个 tokio 任务,订阅对应的 Zenoh 主题并将采样转发到二进制帧通道 - 背压:二进制帧通道容量为 64。使用
try_send—— 如果通道已满(消费者过慢),采样将被静默丢弃,而不是阻塞 Zenoh 订阅者 - 清理:当 WS 连接关闭时,所有订阅者任务将被终止
WsSession (CLI side)
The WsSession::subscribe_topics() method:
- 序列化
TopicSubscribe请求 - 通过内部命令通道发送
SessionCommand::SubscribeTopics - 异步
session_loop将其包装为WsRequest并发送 - 收到
TopicSubscribed确认后,将data_tx发送者注册到以subscription_id为键的topic_subscribers中 - 二进制帧的分发方式是:提取前 16 字节作为 UUID,将剩余部分发送到匹配的
data_tx
session_loop 中维护的状态:
pending_topic_subscribes: HashMap<Uuid, (ack_tx, data_tx)>—— 等待确认topic_subscribers: HashMap<Uuid, Sender>—— 正在接收二进制数据的活跃订阅
前提条件
数据流描述符必须启用调试消息发布:
_unstable_debug:
publish_all_messages_to_zenoh: true
如果未启用,协调器将拒绝 TopicSubscribe 并返回:
dataflow {id} not found or publish_all_messages_to_zenoh not enabled
CLI Commands
adora topic echo
将主题数据实时流式输出到终端。
# Echo a single topic
adora topic echo -d my-dataflow camera_node/image
# Echo multiple topics
adora topic echo -d my-dataflow robot1/pose robot2/vel
# JSON output for piping
adora topic echo -d my-dataflow robot1/pose --format json
内部实现:调用 session.subscribe_topics(),从 data_rx 通道接收 Timestamped<InterDaemonEvent>,反序列化 Arrow 数据,并以表格或 JSON 格式渲染。
adora topic hz
交互式 TUI,显示每个主题的发布频率统计信息。
# All topics
adora topic hz -d my-dataflow --window 10
# Specific topics
adora topic hz -d my-dataflow robot1/pose robot2/vel --window 5
使用 ratatui 构建 TUI。后台 std::thread 从 data_rx 接收事件,并通过 BTreeMap<(node_id, data_id), index> 查找分发到每个主题的 HzStats 跟踪器。
adora topic info
一次性获取主题元数据和统计信息。
adora topic info -d my-dataflow camera_node/image --duration 5
在 --duration 指定的秒数内收集消息,然后显示类型信息、发布者、订阅者(来自描述符)、消息数量和带宽。
adora record --proxy
通过 WebSocket 流式传输数据流数据以进行本地录制。
# Start dataflow first
adora start dataflow.yml --detach
# Record via proxy (data streams through coordinator to CLI)
adora record dataflow.yml --proxy -o capture.adorec
# Record specific topics
adora record dataflow.yml --proxy --topics sensor/image,lidar/points
使用场景:目标机器(运行守护进程的机器)没有本地磁盘或存储空间有限。--proxy 标志将数据通过协调器 WebSocket 路由到 CLI 所在的机器,在本地写入 .adorec 文件。
不使用 --proxy(默认行为)时,会向数据流中注入一个录制节点,直接在守护进程所在的机器上录制。
Zenoh Topic Format
协调器使用 adora_core::topics::zenoh_output_publish_topic() 中的格式订阅 Zenoh 主题:
adora/{dataflow_id}/{node_id}/{data_id}
每个主题的负载为 Timestamped<InterDaemonEvent>,使用 bincode 序列化。协调器将这些字节原样转发(前置订阅 UUID)—— 无需重新序列化。
Backpressure and Performance
| 参数 | 值 | Rationale |
|---|---|---|
| 二进制帧通道容量 | 64 | 在延迟和内存之间取得平衡 |
| Drop policy | Drop on full | 优先保证新鲜度而非完整性 |
| Binary format | 原始 bincode(不使用 base64) | 避免大负载 33% 的额外开销 |
对于高吞吐量的主题(摄像头图像、点云),如果 WS 连接较慢,二进制帧通道可能会被填满。丢弃的采样是静默的 —— CLI 会在 topic hz 中显示降低的频率,但不会卡住。
错误处理
| Error | Source | Response |
|---|---|---|
| Dataflow not found | Coordinator validation | 带有错误消息的 WsResponse |
publish_all_messages_to_zenoh not enabled | Coordinator validation | 带有错误消息的 WsResponse |
| Zenoh 会话打开失败 | Coordinator | 带有错误消息的 WsResponse |
| Zenoh subscriber failure | Per-topic task | 输出警告日志,任务退出 |
| 二进制帧过短(<16 字节) | CLI session_loop | 输出警告日志,帧被丢弃 |
| Unknown subscription UUID | CLI session_loop | Frame dropped silently |
| WS connection closed | Either side | 所有任务被终止,待处理的确认收到错误 |
测试覆盖
| Tier | Location | 覆盖内容 |
|---|---|---|
| 单元测试(客户端) | binaries/cli/src/ws_client.rs | handle_response_topic_subscribe_ack —— 验证确认路由和订阅者注册 |
| Unit (all existing) | binaries/cli/src/ws_client.rs | 已更新,通过 handle_response 传递主题订阅状态 |
The TopicSubscribe / binary frame path is primarily validated via integration testing with a running coordinator and Zenoh session. See Testing Guide for smoke test instructions.