WebSocket 控制面
Adora 的控制面使用 WebSocket 连接进行 CLI、协调器和守护进程之间的所有通信。单个 Axum 服务器在一个端口上暴露三个路由,取代了之前的多端口 TCP 设计。JSON 文本帧承载基于 UUID 关联的请求/应答协议,以及用于日志流的即发即弃事件。
功能一览
| 特性 | 详情 |
|---|---|
| 路由 | /api/control(CLI)、/api/daemon(守护进程)、/health |
| 传输格式 | JSON 文本帧 + 用于主题数据的二进制帧 |
| 协议 | UUID 关联的请求/应答 + 即发即弃事件 |
| 消息大小限制 | 1 MiB(MAX_CONTROL_MESSAGE_BYTES) |
| 并发限制 | 256 个连接(MAX_WS_CONNECTIONS) |
| 服务器框架 | Axum + Tower 中间件 |
| 客户端库 | tokio-tungstenite(集成测试、守护进程),自定义 WsSession(CLI) |
| 安全 | 重注册保护、守护进程 ID 验证、机器 ID 长度限制 |
架构
Single Axum server (one port)
┌────────────────────────────┐
│ /api/control (CLI) │
CLI ──── WS ────────>│ /api/daemon (Daemons) │
│ /health (HTTP GET) │
Daemon ── WS ───────>│ │
└──────────┬─────────────────┘
│ mpsc::Sender<Event>
v
Coordinator
(event loop)
协调器绑定单个 TcpListener 并提供 Axum 路由。每次 WebSocket 升级都生成一个处理任务,通过 mpsc::Sender<Event> 通道与协调器的主事件循环通信。
关键源文件
| File | Role |
|---|---|
binaries/coordinator/src/ws_server.rs | 路由、serve()、常量、ShutdownTrigger |
binaries/coordinator/src/ws_control.rs | /api/control 处理器 |
binaries/coordinator/src/ws_daemon.rs | /api/daemon 处理器、安全、事件转换 |
binaries/cli/src/ws_client.rs | WsSession 同步客户端包装器 |
libraries/message/src/ws_protocol.rs | WsRequest、WsResponse、WsEvent、WsMessage 类型 |
线路协议
所有消息都是 JSON 文本帧。存在三种消息形式:
WsRequest(客户端 -> 服务端)
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"method": "control",
"params": { "List": null }
}
| Field | 类型 | 描述 |
|---|---|---|
id | UUID | 用于应答关联的唯一请求标识符 |
method | string | "control" 用于 CLI 请求,"daemon_event" / "daemon_command" 用于守护进程 |
params | object | 序列化的 ControlRequest 或 Timestamped<CoordinatorRequest> |
WsResponse(服务端 -> 客户端)
Success:
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"result": { "DataflowList": [] }
}
Error:
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"error": "no running dataflow with id ..."
}
| Field | 类型 | 描述 |
|---|---|---|
id | UUID | 匹配发起请求的 id |
result | object? | 成功时存在(序列化的 ControlRequestReply) |
error | string? | 失败时存在 |
WsEvent(双向)
{
"event": "log",
"payload": { "message": "sensor started", "level": "info" }
}
在 LogSubscribe/BuildLogSubscribe 被确认后用于日志流传输。
Dispatch
每个处理器使用自己的策略解析传入帧以保持 u128 保真度(见 u128 序列化):
- CLI(
ws_client.rs):对result/payload字段使用扁平的IncomingFrame结构体配合serde_json::value::RawValue,完全避免使用serde_json::Value。通过event(日志推送)或id(响应)的存在来区分。 - 协调器控制处理器(
ws_control.rs):解析为WsRequest(始终是来自 CLI 的请求)。 - 协调器守护进程处理器(
ws_daemon.rs):检查"method"键以区分请求和响应。对请求使用DaemonWsRequestRaw辅助结构。 - 守护进程(
coordinator.rs):使用CoordinatorCommandRaw/RegisterReplyRaw辅助结构直接从原始 JSON 文本解析。
ws_protocol.rs 中定义了一个 WsMessage 无标签枚举用于通用分发,但生产处理器未使用:
#![allow(unused)]
fn main() {
#[serde(untagged)]
pub enum WsMessage {
Request(WsRequest),
Response(WsResponse),
Event(WsEvent),
}
}
CLI 控制面(/api/control)
CLI 连接到 /api/control 发送 ControlRequest 命令并接收 ControlRequestReply 响应。
连接生命周期
- 连接 —— HTTP 升级到 WebSocket
- 请求-应答 —— CLI 发送
WsRequest,协调器处理ControlRequest,发送WsResponse - 日志订阅(可选)—— CLI 发送
LogSubscribe/BuildLogSubscribe,协调器以WsResponse确认,然后推送WsEvent{event:"log"}帧 - 关闭 —— CLI 发送
Close帧或断开连接
支持的 ControlRequest 变体
| Variant | 描述 |
|---|---|
List | 列出所有运行中的数据流 |
Build | 触发数据流构建 |
WaitForBuild | 阻塞直到构建完成 |
Start | 启动数据流 |
WaitForSpawn | 阻塞直到节点生成 |
Stop / StopByName | 停止运行中的数据流 |
Reload | 热重载节点/算子 |
Check | 检查数据流状态 |
Destroy | 关闭所有守护进程 |
Logs | 获取历史日志 |
Info | 获取数据流详情 |
DaemonConnected | 检查是否有守护进程连接 |
ConnectedMachines | 列出已连接的守护进程 |
LogSubscribe | 订阅实时数据流日志 |
BuildLogSubscribe | 订阅实时构建日志 |
CliAndDefaultDaemonOnSameMachine | 检查共置 |
GetNodeInfo | 获取节点元数据 |
TopicSubscribe | Subscribe to live topic data via binary WS frames (details) |
TopicUnsubscribe | 取消主题订阅 |
日志订阅流程
CLI Coordinator
│ │
│─── WsRequest{LogSubscribe} ─>│
│ │ (check dataflow exists)
│<── WsResponse{subscribed} ───│
│ │
│<── WsEvent{event:"log"} ────│ (repeated)
│<── WsEvent{event:"log"} ────│
│ │
│─── Close ───────────────────>│ (log_subscribers dropped)
如果找不到数据流,协调器返回带有错误的 WsResponse,不发送事件。
WsSession(CLI 客户端)
WsSession 是一个同步包装器,将阻塞的 CLI 代码桥接到异步 WebSocket 连接。它创建内部的 tokio::runtime::Runtime(当前线程)并生成异步 session_loop 任务。
CLI thread (sync) session_loop (async)
│ │
│── SessionCommand::Request ────────────>│── WsRequest ──> server
│ │<── WsResponse ──
│<── oneshot reply ─────────────────────│
│ │
│── SessionCommand::SubscribeLogs ──────>│── WsRequest ──> server
│ │<── WsResponse (ack)
│<── oneshot ack ───────────────────────│
│<── std_mpsc log events ───────────────│<── WsEvent ──
会话循环维护:
pending_requests: HashMap<Uuid, oneshot::Sender>—— 用于请求-应答关联pending_subscribes: HashMap<Uuid, (ack_tx, log_tx)>—— 用于订阅确认路由log_subscribers: Vec<std_mpsc::Sender>—— 用于广播日志事件pending_topic_subscribes: HashMap<Uuid, (ack_tx, data_tx)>—— 用于主题订阅确认路由topic_subscribers: HashMap<Uuid, std_mpsc::Sender>—— 用于按订阅 UUID 分发二进制帧
Binary WS frames (topic data) are dispatched separately from text frames. See WebSocket Topic Data Channel for details.
断开连接时,所有待处理请求通过其 oneshot 通道收到错误。
守护进程面(/api/daemon)
守护进程连接到 /api/daemon 进行注册、事件报告和接收协调器命令。
注册流程
Daemon Coordinator
│ │
│── WsRequest{Register} ─────>│
│ │ (validate, assign daemon_id)
│ │ (track connection + cmd channel)
│ │
│── WsRequest{Event{...}} ───>│ (subsequent events)
- 守护进程发送包含
DaemonRegisterRequest(版本 + 机器 ID)的Register请求 - 协调器验证版本兼容性和机器 ID 长度
- 协调器分配
DaemonId并存储DaemonConnection(包含用于向守护进程发送命令的cmd_tx通道) - 连接通过
tracked_daemon_id跟踪,用于断开时的清理
事件转换
守护进程事件被转换为协调器内部 Event 变体:
| DaemonEvent | 协调器事件 |
|---|---|
AllNodesReady | Event::Dataflow { ReadyOnDaemon } |
AllNodesFinished | Event::Dataflow { DataflowFinishedOnDaemon } |
Heartbeat | Event::DaemonHeartbeat |
Log(message) | Event::Log(message) |
Exit | Event::DaemonExit |
NodeMetrics | Event::NodeMetrics |
BuildResult | Event::DataflowBuildResult |
SpawnResult | Event::DataflowSpawnResult |
双向通信
协调器可以通过存储在 DaemonConnection 中的 cmd_tx 通道向守护进程发送命令。守护进程处理器维护 pending_replies: HashMap<Uuid, oneshot::Sender> 来关联守护进程响应与协调器发起的请求。
守护进程处理器上的消息路由:
- 帧有
"method"键 -> 守护进程请求(注册或事件) - 帧缺少
"method"键 -> 守护进程对协调器命令的响应
u128 序列化变通方案
uhlc::ID 包含一个 NonZeroU128,超出了 serde_json::Value::Number 范围(仅 i64/u64/f64)。使用 serde_json::to_value() 会报 “number out of range” 错误,而 serde_json::from_slice::<Value>() 会通过存储为 f64 静默丢失精度。
所有生产代码对包含 uhlc::Timestamp 的数据绕过 serde_json::Value:
| Component | Serialization | Deserialization |
|---|---|---|
守护进程(coordinator.rs) | to_string + format! | 辅助结构体(RegisterReplyRaw、CoordinatorCommandRaw)+ from_str |
协调器控制(ws_control.rs) | to_string + format! 用于应答 | 不适用(CLI 请求不包含 u128) |
协调器守护进程(ws_daemon.rs) | N/A | DaemonWsRequestRaw + from_str |
协调器状态(state.rs) | str::from_utf8 + format!(原始字节嵌入) | N/A |
CLI(ws_client.rs) | 不适用(请求不包含 u128) | IncomingFrame 配合 serde_json::value::RawValue |
集成测试同样通过 format!() + serde_json::to_string()(非 to_value())手动构造 WsRequest JSON 字符串以匹配真实的线路格式。
安全
重注册保护
每个守护进程 WebSocket 连接只允许一次 Register 请求。如果连接尝试第二次注册,协调器记录警告并关闭连接:
daemon attempted re-register on same connection, rejecting
守护进程 ID 验证
注册后,每条 Event 消息必须包含与注册时分配的 daemon_id 匹配的 ID。不匹配的 ID 会导致连接终止:
daemon sent event with mismatched id: expected `X`, got `Y` -- closing connection
机器 ID 长度验证
DaemonRegisterRequest 中的 machine_id 字段限制为 256 字节。超大值会导致连接终止。
连接和消息限制
| Limit | 值 | 实施方 |
|---|---|---|
| 最大消息大小 | 1 MiB | WebSocketUpgrade::max_message_size |
| 最大并发连接数 | 256 | Tower ConcurrencyLimitLayer |
连接生命周期和保活
Establishment
/api/control 和 /api/daemon 都使用标准 HTTP/1.1 WebSocket 升级。Axum WebSocketUpgrade 提取器处理握手。
Ping/pong
两个处理器都用包含相同载荷的 Pong 帧响应 Ping 帧:
#![allow(unused)]
fn main() {
Ok(Message::Ping(data)) => {
let _ = ws_tx.send(Message::Pong(data)).await;
continue;
}
}
优雅关闭
收到 Close 帧时:
- 控制处理器:中断处理循环,丢弃日志订阅者通道
- 守护进程处理器:中断循环,然后发出
Event::DaemonExit { daemon_id }以立即清理
断开时的清理
控制连接:
log_tx通道被丢弃,停止向该客户端转发日志- 无需清理协调器状态(控制连接是无状态的)
守护进程连接:
- 如果
daemon_id被跟踪则发出DaemonExit事件 cmd_tx和pending_replies被丢弃- 协调器从其连接映射中移除守护进程
WsSession(CLI 客户端):
pending_requests中的所有条目收到Err("WS connection closed")pending_subscribes中的所有条目收到Err("WS connection closed")
消息流示例
CLI 列出数据流
CLI WsSession Coordinator
│ │ │
│── request(&List) ───────────>│ │
│ │── WsRequest ────────────────>│
│ │ id: "abc-123" │
│ │ method: "control" │
│ │ params: "List" │
│ │ │
│ │ ControlEvent::IncomingRequest
│ │ reply via oneshot
│ │ │
│ │<── WsResponse ──────────────│
│ │ id: "abc-123" │
│ │ result: {DataflowList:[]} │
│ │ │
│<── ControlRequestReply ─────│ │
守护进程注册
Daemon Coordinator
│ │
│── WsRequest ─────────────────────────────>│
│ method: "daemon_event" │
│ params: {inner: Register{...}, │
│ timestamp: ...} │
│ │ validate version
│ │ validate machine_id
│ │ assign daemon_id
│ │ store DaemonConnection
│ │
│── WsRequest{Event{Heartbeat}} ──────────>│
│ │ Event::DaemonHeartbeat
│ │
│ (on WS close) ────>│ Event::DaemonExit
日志订阅生命周期
CLI WsSession Coordinator
│ │ │
│── subscribe_logs() ───>│ │
│ │── WsRequest ──────────>│
│ │ params: LogSubscribe │
│ │ │ find dataflow
│ │<── WsResponse ────────│ {subscribed: true}
│<── ack (Ok) ──────────│ │
│ │ │
│ │<── WsEvent{log} ──────│ (node produces log)
│<── log_rx.recv() ─────│ │
│ │<── WsEvent{log} ──────│
│<── log_rx.recv() ─────│ │
│ │ │
│ (drop session) ─────>│── Close ─────────────>│ (log_subscribers dropped)
测试覆盖
测试层级
| Tier | Location | Tests | 覆盖内容 |
|---|---|---|---|
| 单元测试(协议) | libraries/message/src/ws_protocol.rs | 10 | 往返序列化、无标签分发、错误情况 |
| 单元测试(客户端) | binaries/cli/src/ws_client.rs | 6 | 响应路由、订阅确认、主题订阅确认、孤立处理、断开 |
| 集成测试(控制) | binaries/coordinator/tests/ws_control_tests.rs | 11 | 健康检查、列表、无效 JSON/参数、销毁、DaemonConnected、ping/pong、并发请求、连接关闭、日志订阅 |
| 集成测试(守护进程) | binaries/coordinator/tests/ws_daemon_tests.rs | 4 | 注册、注册后状态、断开清理、ping/pong |
| 端到端测试(WsSession) | tests/ws-cli-e2e.rs | 4 | WsSession + 协调器:列表、状态、停止、多请求 |
| Total | 35 |
Key test patterns
轮询超时机制:集成测试通过轮询协调器状态(例如 DaemonConnected),设置 2 秒截止时间和 20 毫秒休眠间隔,避免不稳定的时序假设。
禁止嵌套运行时:端到端测试在后台 std::thread 上运行协调器并使用独立的 tokio 运行时,而 WsSession(会创建自己的当前线程运行时)则在测试主线程上运行。这样可以避免 “cannot start a runtime from within a runtime” 恐慌错误。
测试中的 u128 变通方案:守护进程测试辅助函数通过 format!() + serde_json::to_string()(而非 serde_json::to_value())手动构造 WsRequest JSON 字符串,以保留线上传输的 uhlc::ID u128 值。
测试协调器设置:集成测试和端到端测试均使用 adora_coordinator::start_testing(),该函数绑定到端口 0(由操作系统分配)并接受空的外部事件流。
Configuration Reference
Constants
| Constant | 值 | File | 用途 |
|---|---|---|---|
MAX_CONTROL_MESSAGE_BYTES | 1 MiB (1,048,576) | ws_server.rs | WebSocket 最大帧大小 |
MAX_WS_CONNECTIONS | 256 | ws_server.rs | Tower concurrency limit |
Server setup
#![allow(unused)]
fn main() {
// Production: called by coordinator's main startup
let (port, shutdown, future) = ws_server::serve(bind_addr, event_tx, clock).await?;
tokio::spawn(future);
// ...
shutdown.shutdown(); // graceful stop
}
Test setup
#![allow(unused)]
fn main() {
// Binds to port 0, returns (port, future)
let (port, future) = adora_coordinator::start_testing(
"127.0.0.1:0".parse().unwrap(),
futures::stream::empty(),
).await?;
}
Shutdown
ShutdownTrigger 封装了一个 oneshot::Sender<()>。调用 .shutdown() 会发送信号,Axum 服务器通过 with_graceful_shutdown 接收该信号。正在处理的请求会继续完成,新的连接将被拒绝。