Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

WebSocket 控制面

Adora 的控制面使用 WebSocket 连接进行 CLI、协调器和守护进程之间的所有通信。单个 Axum 服务器在一个端口上暴露三个路由,取代了之前的多端口 TCP 设计。JSON 文本帧承载基于 UUID 关联的请求/应答协议,以及用于日志流的即发即弃事件。

功能一览

特性详情
路由/api/control(CLI)、/api/daemon(守护进程)、/health
传输格式JSON 文本帧 + 用于主题数据的二进制帧
协议UUID 关联的请求/应答 + 即发即弃事件
消息大小限制1 MiB(MAX_CONTROL_MESSAGE_BYTES
并发限制256 个连接(MAX_WS_CONNECTIONS
服务器框架Axum + Tower 中间件
客户端库tokio-tungstenite(集成测试、守护进程),自定义 WsSession(CLI)
安全重注册保护、守护进程 ID 验证、机器 ID 长度限制

架构

                        Single Axum server (one port)
                       ┌────────────────────────────┐
                       │  /api/control   (CLI)       │
  CLI ──── WS ────────>│  /api/daemon    (Daemons)   │
                       │  /health        (HTTP GET)  │
  Daemon ── WS ───────>│                             │
                       └──────────┬─────────────────┘
                                  │ mpsc::Sender<Event>
                                  v
                            Coordinator
                          (event loop)

协调器绑定单个 TcpListener 并提供 Axum 路由。每次 WebSocket 升级都生成一个处理任务,通过 mpsc::Sender<Event> 通道与协调器的主事件循环通信。

关键源文件

FileRole
binaries/coordinator/src/ws_server.rs路由、serve()、常量、ShutdownTrigger
binaries/coordinator/src/ws_control.rs/api/control 处理器
binaries/coordinator/src/ws_daemon.rs/api/daemon 处理器、安全、事件转换
binaries/cli/src/ws_client.rsWsSession 同步客户端包装器
libraries/message/src/ws_protocol.rsWsRequestWsResponseWsEventWsMessage 类型

线路协议

所有消息都是 JSON 文本帧。存在三种消息形式:

WsRequest(客户端 -> 服务端)

{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "method": "control",
  "params": { "List": null }
}
Field类型描述
idUUID用于应答关联的唯一请求标识符
methodstring"control" 用于 CLI 请求,"daemon_event" / "daemon_command" 用于守护进程
paramsobject序列化的 ControlRequestTimestamped<CoordinatorRequest>

WsResponse(服务端 -> 客户端)

Success:

{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "result": { "DataflowList": [] }
}

Error:

{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "error": "no running dataflow with id ..."
}
Field类型描述
idUUID匹配发起请求的 id
resultobject?成功时存在(序列化的 ControlRequestReply
errorstring?失败时存在

WsEvent(双向)

{
  "event": "log",
  "payload": { "message": "sensor started", "level": "info" }
}

LogSubscribe/BuildLogSubscribe 被确认后用于日志流传输。

Dispatch

每个处理器使用自己的策略解析传入帧以保持 u128 保真度(见 u128 序列化):

  • CLI(ws_client.rs:对 result/payload 字段使用扁平的 IncomingFrame 结构体配合 serde_json::value::RawValue,完全避免使用 serde_json::Value。通过 event(日志推送)或 id(响应)的存在来区分。
  • 协调器控制处理器(ws_control.rs:解析为 WsRequest(始终是来自 CLI 的请求)。
  • 协调器守护进程处理器(ws_daemon.rs:检查 "method" 键以区分请求和响应。对请求使用 DaemonWsRequestRaw 辅助结构。
  • 守护进程(coordinator.rs:使用 CoordinatorCommandRaw / RegisterReplyRaw 辅助结构直接从原始 JSON 文本解析。

ws_protocol.rs 中定义了一个 WsMessage 无标签枚举用于通用分发,但生产处理器未使用:

#![allow(unused)]
fn main() {
#[serde(untagged)]
pub enum WsMessage {
    Request(WsRequest),
    Response(WsResponse),
    Event(WsEvent),
}
}

CLI 控制面(/api/control

CLI 连接到 /api/control 发送 ControlRequest 命令并接收 ControlRequestReply 响应。

连接生命周期

  1. 连接 —— HTTP 升级到 WebSocket
  2. 请求-应答 —— CLI 发送 WsRequest,协调器处理 ControlRequest,发送 WsResponse
  3. 日志订阅(可选)—— CLI 发送 LogSubscribe/BuildLogSubscribe,协调器以 WsResponse 确认,然后推送 WsEvent{event:"log"}
  4. 关闭 —— CLI 发送 Close 帧或断开连接

支持的 ControlRequest 变体

Variant描述
List列出所有运行中的数据流
Build触发数据流构建
WaitForBuild阻塞直到构建完成
Start启动数据流
WaitForSpawn阻塞直到节点生成
Stop / StopByName停止运行中的数据流
Reload热重载节点/算子
Check检查数据流状态
Destroy关闭所有守护进程
Logs获取历史日志
Info获取数据流详情
DaemonConnected检查是否有守护进程连接
ConnectedMachines列出已连接的守护进程
LogSubscribe订阅实时数据流日志
BuildLogSubscribe订阅实时构建日志
CliAndDefaultDaemonOnSameMachine检查共置
GetNodeInfo获取节点元数据
TopicSubscribeSubscribe to live topic data via binary WS frames (details)
TopicUnsubscribe取消主题订阅

日志订阅流程

CLI                         Coordinator
 │                              │
 │─── WsRequest{LogSubscribe} ─>│
 │                              │  (check dataflow exists)
 │<── WsResponse{subscribed} ───│
 │                              │
 │<── WsEvent{event:"log"} ────│  (repeated)
 │<── WsEvent{event:"log"} ────│
 │                              │
 │─── Close ───────────────────>│  (log_subscribers dropped)

如果找不到数据流,协调器返回带有错误的 WsResponse,不发送事件。

WsSession(CLI 客户端)

WsSession 是一个同步包装器,将阻塞的 CLI 代码桥接到异步 WebSocket 连接。它创建内部的 tokio::runtime::Runtime(当前线程)并生成异步 session_loop 任务。

CLI thread (sync)                       session_loop (async)
     │                                        │
     │── SessionCommand::Request ────────────>│── WsRequest ──> server
     │                                        │<── WsResponse ──
     │<── oneshot reply ─────────────────────│
     │                                        │
     │── SessionCommand::SubscribeLogs ──────>│── WsRequest ──> server
     │                                        │<── WsResponse (ack)
     │<── oneshot ack ───────────────────────│
     │<── std_mpsc log events ───────────────│<── WsEvent ──

会话循环维护:

  • pending_requests: HashMap<Uuid, oneshot::Sender> —— 用于请求-应答关联
  • pending_subscribes: HashMap<Uuid, (ack_tx, log_tx)> —— 用于订阅确认路由
  • log_subscribers: Vec<std_mpsc::Sender> —— 用于广播日志事件
  • pending_topic_subscribes: HashMap<Uuid, (ack_tx, data_tx)> —— 用于主题订阅确认路由
  • topic_subscribers: HashMap<Uuid, std_mpsc::Sender> —— 用于按订阅 UUID 分发二进制帧

Binary WS frames (topic data) are dispatched separately from text frames. See WebSocket Topic Data Channel for details.

断开连接时,所有待处理请求通过其 oneshot 通道收到错误。


守护进程面(/api/daemon

守护进程连接到 /api/daemon 进行注册、事件报告和接收协调器命令。

注册流程

Daemon                       Coordinator
  │                              │
  │── WsRequest{Register} ─────>│
  │                              │  (validate, assign daemon_id)
  │                              │  (track connection + cmd channel)
  │                              │
  │── WsRequest{Event{...}} ───>│  (subsequent events)
  1. 守护进程发送包含 DaemonRegisterRequest(版本 + 机器 ID)的 Register 请求
  2. 协调器验证版本兼容性和机器 ID 长度
  3. 协调器分配 DaemonId 并存储 DaemonConnection(包含用于向守护进程发送命令的 cmd_tx 通道)
  4. 连接通过 tracked_daemon_id 跟踪,用于断开时的清理

事件转换

守护进程事件被转换为协调器内部 Event 变体:

DaemonEvent协调器事件
AllNodesReadyEvent::Dataflow { ReadyOnDaemon }
AllNodesFinishedEvent::Dataflow { DataflowFinishedOnDaemon }
HeartbeatEvent::DaemonHeartbeat
Log(message)Event::Log(message)
ExitEvent::DaemonExit
NodeMetricsEvent::NodeMetrics
BuildResultEvent::DataflowBuildResult
SpawnResultEvent::DataflowSpawnResult

双向通信

协调器可以通过存储在 DaemonConnection 中的 cmd_tx 通道向守护进程发送命令。守护进程处理器维护 pending_replies: HashMap<Uuid, oneshot::Sender> 来关联守护进程响应与协调器发起的请求。

守护进程处理器上的消息路由:

  • 帧有 "method" 键 -> 守护进程请求(注册或事件)
  • 帧缺少 "method" 键 -> 守护进程对协调器命令的响应

u128 序列化变通方案

uhlc::ID 包含一个 NonZeroU128,超出了 serde_json::Value::Number 范围(仅 i64/u64/f64)。使用 serde_json::to_value() 会报 “number out of range” 错误,而 serde_json::from_slice::<Value>() 会通过存储为 f64 静默丢失精度。

所有生产代码对包含 uhlc::Timestamp 的数据绕过 serde_json::Value

ComponentSerializationDeserialization
守护进程(coordinator.rsto_string + format!辅助结构体(RegisterReplyRawCoordinatorCommandRaw)+ from_str
协调器控制(ws_control.rsto_string + format! 用于应答不适用(CLI 请求不包含 u128)
协调器守护进程(ws_daemon.rsN/ADaemonWsRequestRaw + from_str
协调器状态(state.rsstr::from_utf8 + format!(原始字节嵌入)N/A
CLI(ws_client.rs不适用(请求不包含 u128)IncomingFrame 配合 serde_json::value::RawValue

集成测试同样通过 format!() + serde_json::to_string()(非 to_value())手动构造 WsRequest JSON 字符串以匹配真实的线路格式。


安全

重注册保护

每个守护进程 WebSocket 连接只允许一次 Register 请求。如果连接尝试第二次注册,协调器记录警告并关闭连接:

daemon attempted re-register on same connection, rejecting

守护进程 ID 验证

注册后,每条 Event 消息必须包含与注册时分配的 daemon_id 匹配的 ID。不匹配的 ID 会导致连接终止:

daemon sent event with mismatched id: expected `X`, got `Y` -- closing connection

机器 ID 长度验证

DaemonRegisterRequest 中的 machine_id 字段限制为 256 字节。超大值会导致连接终止。

连接和消息限制

Limit实施方
最大消息大小1 MiBWebSocketUpgrade::max_message_size
最大并发连接数256Tower ConcurrencyLimitLayer

连接生命周期和保活

Establishment

/api/control/api/daemon 都使用标准 HTTP/1.1 WebSocket 升级。Axum WebSocketUpgrade 提取器处理握手。

Ping/pong

两个处理器都用包含相同载荷的 Pong 帧响应 Ping 帧:

#![allow(unused)]
fn main() {
Ok(Message::Ping(data)) => {
    let _ = ws_tx.send(Message::Pong(data)).await;
    continue;
}
}

优雅关闭

收到 Close 帧时:

  • 控制处理器:中断处理循环,丢弃日志订阅者通道
  • 守护进程处理器:中断循环,然后发出 Event::DaemonExit { daemon_id } 以立即清理

断开时的清理

控制连接

  • log_tx 通道被丢弃,停止向该客户端转发日志
  • 无需清理协调器状态(控制连接是无状态的)

守护进程连接

  • 如果 daemon_id 被跟踪则发出 DaemonExit 事件
  • cmd_txpending_replies 被丢弃
  • 协调器从其连接映射中移除守护进程

WsSession(CLI 客户端)

  • pending_requests 中的所有条目收到 Err("WS connection closed")
  • pending_subscribes 中的所有条目收到 Err("WS connection closed")

消息流示例

CLI 列出数据流

CLI                          WsSession                    Coordinator
 │                              │                              │
 │── request(&List) ───────────>│                              │
 │                              │── WsRequest ────────────────>│
 │                              │   id: "abc-123"              │
 │                              │   method: "control"          │
 │                              │   params: "List"             │
 │                              │                              │
 │                              │                    ControlEvent::IncomingRequest
 │                              │                    reply via oneshot
 │                              │                              │
 │                              │<── WsResponse ──────────────│
 │                              │   id: "abc-123"              │
 │                              │   result: {DataflowList:[]}  │
 │                              │                              │
 │<── ControlRequestReply ─────│                              │

守护进程注册

Daemon                                    Coordinator
  │                                           │
  │── WsRequest ─────────────────────────────>│
  │   method: "daemon_event"                  │
  │   params: {inner: Register{...},          │
  │            timestamp: ...}                │
  │                                           │  validate version
  │                                           │  validate machine_id
  │                                           │  assign daemon_id
  │                                           │  store DaemonConnection
  │                                           │
  │── WsRequest{Event{Heartbeat}} ──────────>│
  │                                           │  Event::DaemonHeartbeat
  │                                           │
  │                        (on WS close) ────>│  Event::DaemonExit

日志订阅生命周期

CLI                    WsSession              Coordinator
 │                        │                        │
 │── subscribe_logs() ───>│                        │
 │                        │── WsRequest ──────────>│
 │                        │   params: LogSubscribe │
 │                        │                        │  find dataflow
 │                        │<── WsResponse ────────│  {subscribed: true}
 │<── ack (Ok) ──────────│                        │
 │                        │                        │
 │                        │<── WsEvent{log} ──────│  (node produces log)
 │<── log_rx.recv() ─────│                        │
 │                        │<── WsEvent{log} ──────│
 │<── log_rx.recv() ─────│                        │
 │                        │                        │
 │   (drop session) ─────>│── Close ─────────────>│  (log_subscribers dropped)

测试覆盖

测试层级

TierLocationTests覆盖内容
单元测试(协议)libraries/message/src/ws_protocol.rs10往返序列化、无标签分发、错误情况
单元测试(客户端)binaries/cli/src/ws_client.rs6响应路由、订阅确认、主题订阅确认、孤立处理、断开
集成测试(控制)binaries/coordinator/tests/ws_control_tests.rs11健康检查、列表、无效 JSON/参数、销毁、DaemonConnected、ping/pong、并发请求、连接关闭、日志订阅
集成测试(守护进程)binaries/coordinator/tests/ws_daemon_tests.rs4注册、注册后状态、断开清理、ping/pong
端到端测试(WsSession)tests/ws-cli-e2e.rs4WsSession + 协调器:列表、状态、停止、多请求
Total35

Key test patterns

轮询超时机制:集成测试通过轮询协调器状态(例如 DaemonConnected),设置 2 秒截止时间和 20 毫秒休眠间隔,避免不稳定的时序假设。

禁止嵌套运行时:端到端测试在后台 std::thread 上运行协调器并使用独立的 tokio 运行时,而 WsSession(会创建自己的当前线程运行时)则在测试主线程上运行。这样可以避免 “cannot start a runtime from within a runtime” 恐慌错误。

测试中的 u128 变通方案:守护进程测试辅助函数通过 format!() + serde_json::to_string()(而非 serde_json::to_value())手动构造 WsRequest JSON 字符串,以保留线上传输的 uhlc::ID u128 值。

测试协调器设置:集成测试和端到端测试均使用 adora_coordinator::start_testing(),该函数绑定到端口 0(由操作系统分配)并接受空的外部事件流。


Configuration Reference

Constants

ConstantFile用途
MAX_CONTROL_MESSAGE_BYTES1 MiB (1,048,576)ws_server.rsWebSocket 最大帧大小
MAX_WS_CONNECTIONS256ws_server.rsTower concurrency limit

Server setup

#![allow(unused)]
fn main() {
// Production: called by coordinator's main startup
let (port, shutdown, future) = ws_server::serve(bind_addr, event_tx, clock).await?;
tokio::spawn(future);
// ...
shutdown.shutdown(); // graceful stop
}

Test setup

#![allow(unused)]
fn main() {
// Binds to port 0, returns (port, future)
let (port, future) = adora_coordinator::start_testing(
    "127.0.0.1:0".parse().unwrap(),
    futures::stream::empty(),
).await?;
}

Shutdown

ShutdownTrigger 封装了一个 oneshot::Sender<()>。调用 .shutdown() 会发送信号,Axum 服务器通过 with_graceful_shutdown 接收该信号。正在处理的请求会继续完成,新的连接将被拒绝。