WebSocket Topic Data Channel

The topic data channel extends the WebSocket control plane to proxy live dataflow messages from the coordinator to CLI clients. Instead of requiring direct Zenoh network access, CLI commands like topic echo, topic hz, and topic info receive message data over the existing WebSocket connection as binary frames.

Motivation

| Scenario | Before (Zenoh direct) | After (WS proxy) |
| --- | --- | --- |
| CLI on same machine as daemon | Works | Works |
| CLI remote, Zenoh reachable | Works | Works |
| CLI remote, no Zenoh access | Fails | Works |
| Browser-based web UI | Impossible | Possible |
| Embedded target, no local disk | Cannot record locally | --proxy streams to CLI |

The key insight: the CLI and future web UIs already connect to the coordinator via WebSocket. Because the coordinator subscribes to Zenoh on their behalf and forwards messages as binary frames, topic inspection works anywhere the WebSocket connection reaches.


Architecture

CLI  ──── WS (binary frames) ────>  Coordinator  ──── Zenoh sub ────>  Daemon
                                    (Zenoh proxy)                      (debug publish)

The coordinator acts as a Zenoh proxy:

  1. CLI sends a TopicSubscribe request over the existing text-frame WS protocol
  2. Coordinator validates the dataflow and opens Zenoh subscribers
  3. Coordinator forwards each Zenoh sample as a binary WS frame back to the CLI
  4. CLI dispatches binary frames by subscription UUID to the appropriate consumer

Key source files

| File | Role |
| --- | --- |
| libraries/message/src/cli_to_coordinator.rs | TopicSubscribe, TopicUnsubscribe request variants |
| libraries/message/src/coordinator_to_cli.rs | TopicSubscribed reply variant |
| binaries/coordinator/src/ws_control.rs | Zenoh proxy: subscribe, forward binary frames |
| binaries/coordinator/src/control.rs | ControlEvent::TopicSubscribe for validation |
| binaries/cli/src/ws_client.rs | WsSession::subscribe_topics(), binary frame dispatch |
| binaries/cli/src/command/topic/echo.rs | Topic echo via WS |
| binaries/cli/src/command/topic/hz.rs | Topic frequency measurement via WS |
| binaries/cli/src/command/topic/info.rs | Topic metadata/stats via WS |
| binaries/cli/src/command/record.rs | --proxy flag for WS-based recording |

Wire Protocol

Subscription handshake (JSON text frames)

The subscription uses the existing UUID-correlated request-reply protocol:

Request (CLI -> Coordinator):

{
  "id": "abc-123",
  "method": "control",
  "params": {
    "TopicSubscribe": {
      "dataflow_id": "550e8400-...",
      "topics": [["camera_node", "image"], ["lidar_node", "points"]]
    }
  }
}

Response (Coordinator -> CLI):

{
  "id": "abc-123",
  "result": {
    "TopicSubscribed": {
      "subscription_id": "7f1b3a00-..."
    }
  }
}

Unsubscribe (CLI -> Coordinator):

{
  "id": "def-456",
  "method": "control",
  "params": {
    "TopicUnsubscribe": {
      "subscription_id": "7f1b3a00-..."
    }
  }
}

Binary data frames

After the handshake, the coordinator pushes binary WS frames. Each frame starts with a fixed 16-byte header, followed by a variable-length payload:

 0                   16                              N
 ├───────────────────┼──────────────────────────────┤
 │  subscription_id  │  Timestamped<InterDaemonEvent>│
 │  (16 bytes UUID)  │  (bincode serialized)         │
 └───────────────────┴──────────────────────────────┘

| Field | Size | Description |
| --- | --- | --- |
| subscription_id | 16 bytes | UUID matching the TopicSubscribed ack, for multiplexing |
| payload | variable | Raw Timestamped<InterDaemonEvent> bincode bytes from Zenoh |

The 16-byte UUID prefix allows multiplexing multiple subscriptions on a single WS connection without additional framing overhead.
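
As an illustration of this layout, the following Rust sketch builds and splits a frame using only the standard library. The names encode_frame and split_frame are hypothetical, the UUID is represented as a plain [u8; 16] instead of a Uuid type, and the payload is treated as opaque bytes (no bincode decoding is attempted).

```rust
/// Length of the subscription UUID prefix, per the frame layout above.
const SUBSCRIPTION_ID_LEN: usize = 16;

/// Prepend the 16-byte subscription id to an already-serialized payload.
/// (Hypothetical helper; the payload bytes are copied unchanged.)
fn encode_frame(subscription_id: [u8; SUBSCRIPTION_ID_LEN], payload: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(SUBSCRIPTION_ID_LEN + payload.len());
    frame.extend_from_slice(&subscription_id);
    frame.extend_from_slice(payload);
    frame
}

/// Split a binary frame into (subscription id, payload).
/// Returns None when the frame is shorter than the 16-byte header.
fn split_frame(frame: &[u8]) -> Option<([u8; SUBSCRIPTION_ID_LEN], &[u8])> {
    if frame.len() < SUBSCRIPTION_ID_LEN {
        return None;
    }
    let (head, payload) = frame.split_at(SUBSCRIPTION_ID_LEN);
    let mut id = [0u8; SUBSCRIPTION_ID_LEN];
    id.copy_from_slice(head);
    Some((id, payload))
}
```

A frame of exactly 16 bytes is accepted here and yields an empty payload; whether the real client treats that as valid is not stated above.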


Data Flow

CLI                         WsSession                     Coordinator
 │                              │                              │
 │── subscribe_topics() ───────>│                              │
 │                              │── WsRequest{TopicSubscribe} >│
 │                              │                              │ validate dataflow
 │                              │                              │ open Zenoh session (lazy)
 │                              │                              │ spawn subscriber tasks
 │                              │<── WsResponse{TopicSubscribed}│
 │<── (sub_id, data_rx) ───────│                              │
 │                              │                              │
 │                              │       ┌── Zenoh sample ──────│ Daemon publishes
 │                              │<──────│ Binary frame         │
 │<── data_rx.recv() ──────────│       │ (sub_id + payload)   │
 │                              │       │                      │
 │                              │<──────│ Binary frame         │
 │<── data_rx.recv() ──────────│       │                      │
 │                              │       └                      │
 │                              │                              │
 │   (drop session) ───────────>│── Close ────────────────────>│ abort subscriber tasks

Coordinator internals

  1. Validation: ControlEvent::TopicSubscribe is sent to the coordinator event loop, which checks that the dataflow exists and has publish_all_messages_to_zenoh set to true
  2. Lazy Zenoh: The coordinator’s Zenoh session is opened on the first TopicSubscribe request and reused for subsequent subscriptions on the same WS connection
  3. Per-topic tasks: Each (node_id, data_id) pair spawns a tokio task that subscribes to the corresponding Zenoh topic and forwards samples to the binary frame channel
  4. Backpressure: The binary frame channel has capacity 64. try_send is used – if the channel is full (slow consumer), samples are silently dropped rather than blocking the Zenoh subscriber
  5. Cleanup: When the WS connection closes, all subscriber tasks are aborted
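
The drop-on-full policy in step 4 can be sketched with a bounded channel and a non-blocking send. This is a minimal illustration, not the coordinator's code: it uses std::sync::mpsc::sync_channel as a stand-in for whatever async channel the coordinator actually uses, and the Forward enum and forward_sample function are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Capacity of the binary frame channel, as stated above.
const FRAME_CHANNEL_CAPACITY: usize = 64;

/// Outcome of attempting to forward one sample (hypothetical type).
#[derive(Debug, PartialEq)]
enum Forward {
    Sent,
    DroppedFull,
    ConsumerGone,
}

/// Create the bounded channel that carries ready-to-send binary frames.
fn frame_channel() -> (SyncSender<Vec<u8>>, Receiver<Vec<u8>>) {
    sync_channel(FRAME_CHANNEL_CAPACITY)
}

/// Forward a frame without ever blocking the caller.
/// A full channel means the consumer is slow, so the sample is dropped.
fn forward_sample(tx: &SyncSender<Vec<u8>>, frame: Vec<u8>) -> Forward {
    match tx.try_send(frame) {
        Ok(()) => Forward::Sent,
        Err(TrySendError::Full(_)) => Forward::DroppedFull,
        Err(TrySendError::Disconnected(_)) => Forward::ConsumerGone,
    }
}
```

Because try_send returns immediately, a slow WebSocket writer only costs dropped samples; the subscriber side never waits on it.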

WsSession (CLI side)

The WsSession::subscribe_topics() method:

  1. Serializes a TopicSubscribe request
  2. Sends SessionCommand::SubscribeTopics through the internal command channel
  3. The async session_loop wraps it as a WsRequest and sends it
  4. On receiving the TopicSubscribed ack, registers the data_tx sender in topic_subscribers keyed by subscription_id
  5. Binary frames are dispatched by extracting the first 16 bytes as UUID and sending the remainder to the matching data_tx

State maintained in session_loop:

  • pending_topic_subscribes: HashMap<Uuid, (ack_tx, data_tx)> – awaiting ack
  • topic_subscribers: HashMap<Uuid, Sender> – active subscriptions receiving binary data
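
The two maps and the transition between them can be sketched as follows. This is a simplified model under several assumptions: request and subscription ids are plain integers and byte arrays instead of Uuid, the ack channel (ack_tx) is omitted, std::sync::mpsc stands in for the real channel type, and the names TopicState, Dispatch, on_ack, and on_binary are hypothetical. Removing a subscription when its consumer has gone away is also an assumption.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

type RequestId = u64; // stand-in for the request UUID
type SubscriptionId = [u8; 16]; // stand-in for the subscription UUID

#[derive(Debug, PartialEq)]
enum Dispatch {
    Delivered,
    TooShort,
    UnknownSubscription,
    ConsumerGone,
}

#[derive(Default)]
struct TopicState {
    /// Requests sent but not yet acknowledged.
    pending: HashMap<RequestId, Sender<Vec<u8>>>,
    /// Active subscriptions receiving binary data.
    active: HashMap<SubscriptionId, Sender<Vec<u8>>>,
}

impl TopicState {
    /// Record an outgoing subscribe request and hand back the data receiver.
    fn request(&mut self, request_id: RequestId) -> Receiver<Vec<u8>> {
        let (data_tx, data_rx) = channel();
        self.pending.insert(request_id, data_tx);
        data_rx
    }

    /// On TopicSubscribed: move the sender from pending to active.
    fn on_ack(&mut self, request_id: RequestId, subscription_id: SubscriptionId) -> bool {
        match self.pending.remove(&request_id) {
            Some(data_tx) => {
                self.active.insert(subscription_id, data_tx);
                true
            }
            None => false,
        }
    }

    /// On a binary frame: route the payload by its 16-byte prefix.
    fn on_binary(&mut self, frame: &[u8]) -> Dispatch {
        if frame.len() < 16 {
            return Dispatch::TooShort;
        }
        let (head, payload) = frame.split_at(16);
        let mut id = [0u8; 16];
        id.copy_from_slice(head);
        let delivered = match self.active.get(&id) {
            None => return Dispatch::UnknownSubscription,
            Some(data_tx) => data_tx.send(payload.to_vec()).is_ok(),
        };
        if delivered {
            Dispatch::Delivered
        } else {
            // Assumption: forget subscriptions whose receiver was dropped.
            self.active.remove(&id);
            Dispatch::ConsumerGone
        }
    }
}
```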

Prerequisites

The dataflow descriptor must enable debug message publishing:

_unstable_debug:
  publish_all_messages_to_zenoh: true

Without this, the coordinator rejects the TopicSubscribe with:

dataflow {id} not found or publish_all_messages_to_zenoh not enabled
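
Note that a single message covers both failure causes. A minimal sketch of such a check is shown below; DataflowInfo and validate_topic_subscribe are hypothetical names, and the real coordinator state is certainly richer than a map of flags.

```rust
use std::collections::HashMap;

/// Hypothetical per-dataflow state: only the debug flag matters here.
struct DataflowInfo {
    publish_all_messages_to_zenoh: bool,
}

/// Accept the subscription only if the dataflow exists and has the flag set.
/// Both failure causes produce the same error message.
fn validate_topic_subscribe(
    dataflows: &HashMap<String, DataflowInfo>,
    dataflow_id: &str,
) -> Result<(), String> {
    match dataflows.get(dataflow_id) {
        Some(info) if info.publish_all_messages_to_zenoh => Ok(()),
        _ => Err(format!(
            "dataflow {} not found or publish_all_messages_to_zenoh not enabled",
            dataflow_id
        )),
    }
}
```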

CLI Commands

adora topic echo

Stream topic data to the terminal in real-time.

# Echo a single topic
adora topic echo -d my-dataflow camera_node/image

# Echo multiple topics
adora topic echo -d my-dataflow robot1/pose robot2/vel

# JSON output for piping
adora topic echo -d my-dataflow robot1/pose --format json

Internally, the command calls session.subscribe_topics(), receives Timestamped<InterDaemonEvent> values from the data_rx channel, deserializes the Arrow data, and renders it as a table or JSON.
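
The commands above name topics as node/output strings, while TopicSubscribe carries [node_id, data_id] pairs. The sketch below shows one way that conversion could look. The function names are hypothetical, and splitting on the first slash (so that an output id may itself contain slashes) is an assumption, not documented behavior.

```rust
/// Parse one `NODE/OUTPUT` argument into a (node_id, data_id) pair.
/// Assumption: the split happens at the first '/', and neither part may be empty.
fn parse_topic_spec(spec: &str) -> Result<(String, String), String> {
    match spec.split_once('/') {
        Some((node_id, data_id)) if !node_id.is_empty() && !data_id.is_empty() => {
            Ok((node_id.to_string(), data_id.to_string()))
        }
        _ => Err(format!("expected NODE/OUTPUT, got `{}`", spec)),
    }
}

/// Parse all positional arguments, failing on the first malformed one.
fn parse_topic_specs(specs: &[&str]) -> Result<Vec<(String, String)>, String> {
    specs.iter().map(|spec| parse_topic_spec(spec)).collect()
}
```

With these helpers, the arguments robot1/pose robot2/vel would map to the pairs ("robot1", "pose") and ("robot2", "vel").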

adora topic hz

Interactive TUI displaying per-topic publish frequency statistics.

# All topics
adora topic hz -d my-dataflow --window 10

# Specific topics
adora topic hz -d my-dataflow robot1/pose robot2/vel --window 5

Uses ratatui for the TUI. A background std::thread receives events from data_rx and dispatches to per-topic HzStats trackers via a BTreeMap<(node_id, data_id), index> lookup.
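
The lookup-then-update pattern can be sketched as below. Only the BTreeMap<(node_id, data_id), index> shape comes from the description above; the fields of HzStats, the HzTable wrapper, and the mean-rate formula are assumptions for illustration. In particular, the sliding window selected by --window is left out, and timestamps are passed in as Duration offsets from an arbitrary start.

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// Hypothetical per-topic tracker: message count plus first and last arrival.
#[derive(Default)]
struct HzStats {
    count: u64,
    first: Option<Duration>,
    last: Option<Duration>,
}

impl HzStats {
    fn record(&mut self, at: Duration) {
        self.count += 1;
        if self.first.is_none() {
            self.first = Some(at);
        }
        self.last = Some(at);
    }

    /// Mean rate: (messages - 1) intervals over the elapsed span.
    fn mean_hz(&self) -> Option<f64> {
        let span = self.last?.checked_sub(self.first?)?.as_secs_f64();
        if self.count < 2 || span == 0.0 {
            None
        } else {
            Some((self.count - 1) as f64 / span)
        }
    }
}

/// Maps (node_id, data_id) to an index into a vector of trackers.
struct HzTable {
    index: BTreeMap<(String, String), usize>,
    stats: Vec<HzStats>,
}

impl HzTable {
    fn new(topics: &[(&str, &str)]) -> Self {
        let mut index = BTreeMap::new();
        let mut stats = Vec::new();
        for (node_id, data_id) in topics {
            index.insert((node_id.to_string(), data_id.to_string()), stats.len());
            stats.push(HzStats::default());
        }
        HzTable { index, stats }
    }

    fn lookup(&self, node_id: &str, data_id: &str) -> Option<usize> {
        self.index
            .get(&(node_id.to_string(), data_id.to_string()))
            .copied()
    }

    /// Route one event to its tracker; returns false for unknown topics.
    fn record(&mut self, node_id: &str, data_id: &str, at: Duration) -> bool {
        match self.lookup(node_id, data_id) {
            Some(i) => {
                self.stats[i].record(at);
                true
            }
            None => false,
        }
    }

    fn mean_hz(&self, node_id: &str, data_id: &str) -> Option<f64> {
        self.lookup(node_id, data_id)
            .and_then(|i| self.stats[i].mean_hz())
    }
}
```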

adora topic info

One-shot topic metadata and statistics.

adora topic info -d my-dataflow camera_node/image --duration 5

Collects messages for --duration seconds, then displays type information, publisher, subscribers (from descriptor), message count, and bandwidth.

adora record --proxy

Stream dataflow data through WebSocket for local recording.

# Start dataflow first
adora start dataflow.yml --detach

# Record via proxy (data streams through coordinator to CLI)
adora record dataflow.yml --proxy -o capture.adorec

# Record specific topics
adora record dataflow.yml --proxy --topics sensor/image,lidar/points

Use case: the target machine (running the daemon) has no local disk or limited storage. The --proxy flag routes data through the coordinator WebSocket to the CLI machine, where the .adorec file is written locally.

Without --proxy (default), a record node is injected into the dataflow and records directly on the daemon’s machine.


Zenoh Topic Format

The coordinator subscribes to Zenoh topics using the format from adora_core::topics::zenoh_output_publish_topic():

adora/{dataflow_id}/{node_id}/{data_id}

Each topic carries Timestamped<InterDaemonEvent> as its payload, serialized with bincode. The coordinator forwards these bytes as-is (prepended with subscription UUID) – no re-serialization.
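
For illustration, the key format can be reproduced with a small helper. topic_key is a hypothetical stand-in: the actual signature of zenoh_output_publish_topic() is not shown here, and ids are taken as plain strings.

```rust
/// Build a key in the documented format `adora/{dataflow_id}/{node_id}/{data_id}`.
/// Hypothetical helper, not the real adora_core function.
fn topic_key(dataflow_id: &str, node_id: &str, data_id: &str) -> String {
    format!("adora/{}/{}/{}", dataflow_id, node_id, data_id)
}

/// One key per requested (node_id, data_id) pair, matching the
/// one-subscriber-per-topic model described earlier.
fn topic_keys(dataflow_id: &str, topics: &[(&str, &str)]) -> Vec<String> {
    topics
        .iter()
        .map(|(node_id, data_id)| topic_key(dataflow_id, node_id, data_id))
        .collect()
}
```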


Backpressure and Performance

| Parameter | Value | Rationale |
| --- | --- | --- |
| Binary frame channel capacity | 64 | Balance between latency and memory |
| Drop policy | Drop on full | Prefer freshness over completeness |
| Binary format | Raw bincode (no base64) | Avoid 33% overhead for large payloads |

For high-throughput topics (camera images, point clouds), the binary frame channel may fill up if the WS connection is slow. Samples are dropped silently: the CLI shows a reduced frequency in topic hz but does not stall.
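
The 33% figure follows from base64 encoding every 3 input bytes as 4 output characters. The sketch below compares the two encodings for a given payload size. Both helpers are illustrative, not part of the codebase, and the JSON envelope that a text frame would additionally need is ignored.

```rust
/// Length of standard padded base64 (RFC 4648) for `n` input bytes:
/// every group of up to 3 bytes becomes 4 characters.
fn base64_len(n: usize) -> usize {
    (n + 2) / 3 * 4
}

/// Length of a binary frame: 16-byte subscription id plus the raw payload.
fn binary_frame_len(n: usize) -> usize {
    16 + n
}
```

For a 3,000,000-byte payload, base64_len gives 4,000,000 characters, while binary_frame_len gives 3,000,016 bytes, so the binary framing adds a constant 16 bytes instead of a third of the payload size.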


Error Handling

| Error | Source | Response |
| --- | --- | --- |
| Dataflow not found | Coordinator validation | WsResponse with error message |
| publish_all_messages_to_zenoh not enabled | Coordinator validation | WsResponse with error message |
| Zenoh session open failure | Coordinator | WsResponse with error message |
| Zenoh subscriber failure | Per-topic task | Warning log, task exits |
| Binary frame too short (<16 bytes) | CLI session_loop | Warning log, frame dropped |
| Unknown subscription UUID | CLI session_loop | Frame dropped silently |
| WS connection closed | Either side | All tasks aborted, pending acks get error |

Test Coverage

| Tier | Location | What's covered |
| --- | --- | --- |
| Unit (client) | binaries/cli/src/ws_client.rs | handle_response_topic_subscribe_ack – verifies ack routing and subscriber registration |
| Unit (all existing) | binaries/cli/src/ws_client.rs | Updated to pass topic subscribe state through handle_response |

The TopicSubscribe / binary frame path is primarily validated via integration testing with a running coordinator and Zenoh session. See the Testing Guide for smoke test instructions.