Rust API 参考
本文档介绍用于构建 Adora 数据流组件的两个主要 Rust crate:
adora-node-api– 用于独立节点可执行文件adora-operator-api– 用于由 Adora 运行时管理的进程内算子
节点 API (adora-node-api)
添加到你的 Cargo.toml:
[dependencies]
adora-node-api = { workspace = true }
AdoraNode
用于发送输出和获取节点信息的主要结构体。通过以下初始化函数之一获取。
初始化
#![allow(unused)]
fn main() {
// 推荐:自动检测环境(守护进程、测试或交互模式)。
pub fn init_from_env() -> NodeResult<(Self, EventStream)>
// 与 init_from_env 相同,但出错时不回退到交互模式。
pub fn init_from_env_force() -> NodeResult<(Self, EventStream)>
// 用于动态节点:通过节点 ID 连接到守护进程。
pub fn init_from_node_id(node_id: NodeId) -> NodeResult<(Self, EventStream)>
// 先尝试 init_from_env;回退到 init_from_node_id。
pub fn init_flexible(node_id: NodeId) -> NodeResult<(Self, EventStream)>
// 独立交互模式(在终端提示输入)。
pub fn init_interactive() -> NodeResult<(Self, EventStream)>
// 使用合成输入/输出的集成测试模式。
pub fn init_testing(
input: TestingInput,
output: TestingOutput,
options: TestingOptions,
) -> NodeResult<(Self, EventStream)>
}
init_from_env 是推荐的入口点。它按顺序检查:
- 由
setup_integration_testing设置的线程局部测试状态 ADORA_NODE_CONFIG环境变量(由守护进程设置)ADORA_TEST_WITH_INPUTS环境变量(基于文件的集成测试)- 交互终端回退(仅当 stdin 为 TTY 时)
发送输出
所有发送方法会静默忽略未在数据流 YAML 中声明的输出 ID。
#![allow(unused)]
fn main() {
// 发送 Arrow 数组。在有利时将数据复制到共享内存。
pub fn send_output(
&mut self,
output_id: DataId,
parameters: MetadataParameters,
data: impl Array,
) -> NodeResult<()>
// 发送原始字节。在有利时复制到共享内存。
pub fn send_output_bytes(
&mut self,
output_id: DataId,
parameters: MetadataParameters,
data_len: usize,
data: &[u8],
) -> NodeResult<()>
// 通过闭包发送原始字节以实现零拷贝写入。
pub fn send_output_raw<F>(
&mut self,
output_id: DataId,
parameters: MetadataParameters,
data_len: usize,
data: F,
) -> NodeResult<()>
where
F: FnOnce(&mut [u8])
// 发送带有显式 Arrow 类型信息的原始字节。
pub fn send_typed_output<F>(
&mut self,
output_id: DataId,
type_info: ArrowTypeInfo,
parameters: MetadataParameters,
data_len: usize,
data: F,
) -> NodeResult<()>
where
F: FnOnce(&mut [u8])
// 发送带有类型信息的预分配 DataSample。
pub fn send_output_sample(
&mut self,
output_id: DataId,
type_info: ArrowTypeInfo,
parameters: MetadataParameters,
sample: Option<DataSample>,
) -> NodeResult<()>
// 将输出 ID 报告为已关闭。不再允许对这些 ID 发送。
pub fn close_outputs(&mut self, outputs_ids: Vec<DataId>) -> NodeResult<()>
}
Service, Action, and Streaming Helpers
Higher-level methods for the communication patterns. These use well-known metadata keys to correlate requests, goals, responses, and streaming segments.
#![allow(unused)]
fn main() {
// 生成唯一的、时间有序的 ID(UUID v7)用于关联。
pub fn new_request_id() -> String
pub fn new_goal_id() -> String // new_request_id 的别名
// 发送服务请求。将 `request_id` 注入参数并返回。
pub fn send_service_request(
&mut self,
output_id: DataId,
parameters: MetadataParameters,
data: impl Array,
) -> NodeResult<String>
// 发送服务响应。send_output 的语义别名。
// 调用者必须传递来自传入请求元数据的 request_id。
pub fn send_service_response(
&mut self,
output_id: DataId,
parameters: MetadataParameters,
data: impl Array,
) -> NodeResult<()>
}
服务示例(客户端发送请求,服务端回复):
#![allow(unused)]
fn main() {
// 客户端:自动生成并注入 request_id
let rid = node.send_service_request("request".into(), params, data)?;
// Server: pass through metadata.parameters (includes request_id)
node.send_service_response("response".into(), metadata.parameters, result)?;
}
动作示例(客户端发送目标,服务端流式传输反馈 + 结果):
#![allow(unused)]
fn main() {
use adora_node_api::{GOAL_ID, GOAL_STATUS, GOAL_STATUS_SUCCEEDED, Parameter};
// 客户端:生成 goal_id,附加到参数
let goal_id = AdoraNode::new_goal_id();
params.insert(GOAL_ID.to_string(), Parameter::String(goal_id));
node.send_output("goal".into(), params, data)?;
// 服务端:提取 goal_id,发送带 goal_status 的反馈/结果
let gid = get_string_param(&metadata.parameters, GOAL_ID);
}
Streaming example (real-time voice/video pipeline with interruption):
#![allow(unused)]
fn main() {
use adora_node_api::StreamSegment;
// Create a streaming segment builder (auto-generates session_id)
let mut seg = StreamSegment::new();
// Send chunks with auto-incrementing seq
node.send_stream_chunk("text".into(), &mut seg, false, chunk_data)?;
// Mark final chunk of a segment
node.send_stream_chunk("text".into(), &mut seg, true, last_chunk)?;
// On user interruption: flush downstream queues and start a new segment
let flush_params = seg.flush();
node.send_output("text".into(), flush_params, empty_data)?;
}
See patterns.md for the full guide and examples/service-example and examples/action-example for working code.
数据分配
#![allow(unused)]
fn main() {
// 分配给定大小的 DataSample。
// 对 >= ZERO_COPY_THRESHOLD (4096 字节) 的数据使用共享内存。
pub fn allocate_data_sample(&mut self, data_len: usize) -> NodeResult<DataSample>
}
节点信息
#![allow(unused)]
fn main() {
// 数据流 YAML 中的节点 ID。
pub fn id(&self) -> &NodeId
// 本次数据流运行的唯一标识符。
pub fn dataflow_id(&self) -> &DataflowId
// 此节点的输入/输出配置。
pub fn node_config(&self) -> &NodeRunConfig
// 如果此节点在之前退出或故障后被重启则为 true。
pub fn is_restart(&self) -> bool
// 此节点被重启的次数(首次运行为 0)。
pub fn restart_count(&self) -> u32
// 解析后的数据流 YAML 描述符。
pub fn dataflow_descriptor(&self) -> NodeResult<&Descriptor>
}
日志
Rust nodes have two ways to emit structured logs. Both produce identical structured log entries in the daemon.
Option 1: Node API (recommended for most cases)
All log methods emit structured JSONL to stdout, which the daemon parses automatically. Works with min_log_level filtering, send_logs_as routing, and adora/logs subscribers.
#![allow(unused)]
fn main() {
// 通用结构化日志。级别:"error"、"warn"、"info"、"debug"、"trace"。
pub fn log(&self, level: &str, message: &str, target: Option<&str>)
// 带有附加键值字段的结构化日志。
pub fn log_with_fields(
&self,
level: &str,
message: &str,
target: Option<&str>,
fields: Option<&BTreeMap<String, String>>,
)
// 便捷方法(无 target 参数)。
pub fn log_error(&self, message: &str)
pub fn log_warn(&self, message: &str)
pub fn log_info(&self, message: &str)
pub fn log_debug(&self, message: &str)
pub fn log_trace(&self, message: &str)
}
Option 2: Rust tracing crate
When adora’s tracing subscriber is initialized (via init_tracing() or the default feature), tracing::info!() etc. output structured JSON to stdout that the daemon parses identically:
#![allow(unused)]
fn main() {
tracing::info!("Sensor started");
tracing::warn!(sensor_id = "temp-01", "High temperature");
}
Use tracing when you want ecosystem integration (spans, instrumentation, OpenTelemetry). Use node.log_*() when you want explicit control or structured fields as BTreeMap.
| 方法 | Structured? | Fields? | OpenTelemetry? | 最适用于 |
|---|---|---|---|---|
node.log_info(msg) | 是 | 否 | 否 | Quick one-liner |
node.log_with_fields(...) | 是 | Yes (BTreeMap) | 否 | Structured key-value context |
tracing::info!(key = val, msg) | 是 | Yes (spans) | 是 | Ecosystem integration, OTel |
println!() | No (stdout level) | 否 | 否 | Quick debugging |
EventStream
此节点传入事件的异步迭代器。实现了 futures::Stream trait。
收到 Stop 事件后,事件流会自行关闭。节点应在流结束后退出。
#![allow(unused)]
fn main() {
// 阻塞直到下一个事件到达。流关闭时返回 None。
// 使用内部 EventScheduler,可能为公平性重新排序事件。
pub fn recv(&mut self) -> Option<Event>
// 带超时的阻塞。超时时返回 Event::Error。
pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event>
// 带 EventScheduler 重排序的异步接收。
pub async fn recv_async(&mut self) -> Option<Event>
// 带超时的异步接收。超时时返回 Event::Error。
pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event>
// 非阻塞接收。无可用数据时返回 TryRecvError::Empty。
pub fn try_recv(&mut self) -> Result<Event, TryRecvError>
// 非阻塞地排空所有缓冲事件。
// 无可用数据时返回 Some(Vec::new());流关闭时返回 None。
pub fn drain(&mut self) -> Option<Vec<Event>>
// 如果调度器或接收器中没有缓冲事件则为 true。
pub fn is_empty(&self) -> bool
// Returns and resets accumulated drop counts per input ID.
// For `drop_oldest` inputs, drops happen at `queue_size`.
// For `backpressure` inputs, drops happen at 10x `queue_size` (hard safety cap).
pub fn drain_drop_counts(&mut self) -> HashMap<DataId, u64>
}
EventStream 还实现了 futures::Stream<Item = Event>,因此可以与 StreamExt::next() 和其他组合器一起使用。与 recv/recv_async 不同,Stream 实现不使用 EventScheduler,保留事件的时间顺序。
Event
表示传入事件。此枚举为 #[non_exhaustive]——忽略未知变体以保持向前兼容。
#![allow(unused)]
fn main() {
#[non_exhaustive]
pub enum Event {
// 从另一个节点接收到输入。
Input {
id: DataId, // YAML 中的输入 ID(非发送者的输出 ID)
metadata: Metadata, // 时间戳和类型信息
data: ArrowData, // Apache Arrow 数据
},
// 映射到此输入的发送者已退出;不会再有数据到达。
InputClosed { id: DataId },
// 先前关闭的输入已恢复(例如,上游节点在超时后恢复)。
InputRecovered { id: DataId },
// 上游节点已重启。适用于重置缓存或状态。
NodeRestarted { id: NodeId },
// 事件流即将关闭。原因见 StopCause。
Stop(StopCause),
// 指示节点重新加载算子(运行时内部使用)。
Reload { operator_id: Option<OperatorId> },
// 意外的内部错误。记录日志用于调试。
Error(String),
}
}
StopCause
#![allow(unused)]
fn main() {
#[non_exhaustive]
pub enum StopCause {
// 通过 `adora stop` 或 Ctrl-C 显式停止。请尽快退出,否则将被终止。
Manual,
// 所有输入已关闭(上游节点已退出)。仅在节点有输入时发送。
AllInputsClosed,
}
}
辅助类型
DataSample
适合作为输出消息发送的数据区域。对 >= ZERO_COPY_THRESHOLD 的数据使用共享内存以实现零拷贝传输。
实现了 Deref<Target = [u8]> 和 DerefMut,用于读写底层字节。
Metadata 和 MetadataParameters
#![allow(unused)]
fn main() {
// 附加到每个输入事件的完整元数据。
pub struct Metadata {
// 包含时间戳、Arrow 类型信息和用户自定义参数。
}
// 发送输出时附加的用户控制的元数据字段。
// BTreeMap<String, Parameter> 的类型别名。
// 默认为空。传递输入的 metadata.parameters 以转发元数据。
pub type MetadataParameters = BTreeMap<String, Parameter>;
// 单个元数据参数值。
pub enum Parameter {
Bool(bool), Integer(i64), Float(f64), String(String),
ListInt(Vec<i64>), ListFloat(Vec<f64>), ListString(Vec<String>),
Timestamp(DateTime<Utc>),
}
// Extract typed parameters, returning None if missing or wrong type.
pub fn get_string_param<'a>(params: &'a MetadataParameters, key: &str) -> Option<&'a str>
pub fn get_integer_param(params: &MetadataParameters, key: &str) -> Option<i64>
pub fn get_bool_param(params: &MetadataParameters, key: &str) -> Option<bool>
}
Well-known metadata keys (for communication patterns):
| Constant | 值 | 使用方 |
|---|---|---|
REQUEST_ID | "request_id" | 服务请求/响应关联 |
GOAL_ID | "goal_id" | 动作目标标识 |
GOAL_STATUS | "goal_status" | 动作结果状态 |
GOAL_STATUS_SUCCEEDED | "succeeded" | 目标成功完成 |
GOAL_STATUS_ABORTED | "aborted" | 目标被服务端中止 |
GOAL_STATUS_CANCELED | "canceled" | 目标被客户端取消 |
SESSION_ID | "session_id" | Streaming session identifier |
SEGMENT_ID | "segment_id" | Streaming segment within a session |
SEQ | "seq" | Streaming chunk sequence number |
FIN | "fin" | Last chunk of a streaming segment |
FLUSH | "flush" | Discard older queued messages on input |
所有常量均从 adora_node_api 重新导出。
标识类型
#![allow(unused)]
fn main() {
// 运行中数据流实例的唯一标识符(UUID v4)。
pub struct DataflowId(/* ... */);
// 数据流 YAML 中定义的节点标识符。
pub struct NodeId(/* ... */);
// 数据流 YAML 中定义的输入/输出标识符。
pub struct DataId(/* ... */);
}
错误类型
#![allow(unused)]
fn main() {
#[derive(Debug, Error)]
pub enum NodeError {
Init(String), // 配置解析、环境变量、守护进程握手
Connection(String), // 守护进程连接丢失
Output(String), // 发送或关闭失败
Data(String), // 分配或描述符解析
Internal(eyre::Report), // 意外错误的兜底
}
pub type NodeResult<T> = Result<T, NodeError>;
}
TryRecvError
#![allow(unused)]
fn main() {
pub enum TryRecvError {
Empty, // 当前没有可用事件
Closed, // 事件流已关闭
}
}
ZERO_COPY_THRESHOLD
#![allow(unused)]
fn main() {
pub const ZERO_COPY_THRESHOLD: usize = 4096;
}
小于此阈值的消息通过 TCP 发送。等于或超过此大小的消息使用共享内存进行零拷贝传输。
ArrowData
#![allow(unused)]
fn main() {
// arrow::array::ArrayRef 的包装器。实现了到内部 ArrayRef 的 Deref。
pub struct ArrowData(pub arrow::array::ArrayRef);
}
来自 Event::Input 的数据以 ArrowData 形式到达。使用 TryFrom 转换或 Arrow API 提取类型化的值。
InputTracker
用于跟踪输入健康状态和缓存每个输入最后接收值的辅助工具。在上游节点超时时用于优雅降级。
#![allow(unused)]
fn main() {
pub struct InputTracker { /* ... */ }
impl InputTracker {
pub fn new() -> Self
// 从事件更新状态。如果事件相关则返回 true。
pub fn process_event(&mut self, event: &Event) -> bool
// 输入的当前状态(Healthy 或 Closed),如果被追踪的话。
pub fn state(&self, id: &DataId) -> Option<InputState>
// 如果输入当前已关闭则为 true。
pub fn is_closed(&self, id: &DataId) -> bool
// 输入最后接收到的值。即使关闭后仍可用。
pub fn last_value(&self, id: &DataId) -> Option<&ArrowData>
// 所有当前处于 Closed 状态的输入。
pub fn closed_inputs(&self) -> Vec<&DataId>
// 如果任何被追踪的输入已关闭则为 true。
pub fn any_closed(&self) -> bool
}
pub enum InputState {
Healthy, // 正常接收数据
Closed, // 上游退出或超时
}
}
集成测试
integration_testing 模块提供了无需运行守护进程即可测试节点的工具。
setup_integration_testing
设置线程局部状态,使同一线程上下一次调用 AdoraNode::init_from_env 时以测试模式初始化。
#![allow(unused)]
fn main() {
pub fn setup_integration_testing(
input: TestingInput,
output: TestingOutput,
options: TestingOptions,
)
}
TestingInput
#![allow(unused)]
fn main() {
pub enum TestingInput {
// 从 JSON 文件加载事件(必须反序列化为 IntegrationTestInput)。
FromJsonFile(PathBuf),
// 直接提供事件。
Input(IntegrationTestInput),
}
}
TestingOutput
#![allow(unused)]
fn main() {
pub enum TestingOutput {
// 将输出写入 JSONL 文件(创建或覆盖)。
ToFile(PathBuf),
// 将输出作为 JSONL 写入任意 writer。
ToWriter(Box<dyn std::io::Write + Send>),
// 将每个输出作为 JSON 对象发送到 flume 通道。
ToChannel(flume::Sender<serde_json::Map<String, serde_json::Value>>),
}
}
TestingOptions
#![allow(unused)]
fn main() {
#[derive(Debug, Clone, Default)]
pub struct TestingOptions {
// 跳过输出中的时间偏移以进行确定性比较。
pub skip_output_time_offsets: bool,
}
}
环境变量测试
使用 init_from_env 的节点也支持通过环境变量进行基于文件的测试:
| 变量 | 描述 |
|---|---|
ADORA_TEST_WITH_INPUTS | JSON 输入文件的路径(IntegrationTestInput 格式) |
ADORA_TEST_WRITE_OUTPUTS_TO | 输出 JSONL 文件的路径(默认:输入文件旁的 outputs.jsonl) |
ADORA_TEST_NO_OUTPUT_TIME_OFFSET | 如果设置,省略时间偏移以获得确定性输出 |
算子 API (adora-operator-api)
算子是由 Adora 运行时管理的进程内组件。它们被编译为共享库(.so/.dylib/.dll)并由运行时加载。
添加到你的 Cargo.toml:
[dependencies]
adora-operator-api = { workspace = true }
[lib]
crate-type = ["cdylib"]
AdoraOperator Trait
#![allow(unused)]
fn main() {
pub trait AdoraOperator: Default {
fn on_event(
&mut self,
event: &Event,
output_sender: &mut AdoraOutputSender,
) -> Result<AdoraStatus, String>;
}
}
实现此 trait 以定义算子的行为。运行时对每个传入事件调用 on_event。返回 AdoraStatus 以控制执行流程。
Event(算子)
算子的 Event 枚举比节点的 Event 更简单,使用 &str 作为 ID。
#![allow(unused)]
fn main() {
#[non_exhaustive]
pub enum Event<'a> {
// 收到一个输入。
Input { id: &'a str, data: ArrowData },
// 将输入数据解析为 Arrow 数组失败。
InputParseError { id: &'a str, error: String },
// 输入被发送者关闭。
InputClosed { id: &'a str },
// 算子应停止。
Stop,
}
}
AdoraOutputSender
#![allow(unused)]
fn main() {
pub struct AdoraOutputSender<'a>(/* ... */);
impl AdoraOutputSender<'_> {
// 发送输出。`id` 是数据流 YAML 中的输出 ID。
pub fn send(&mut self, id: String, data: impl Array) -> Result<(), String>
}
}
AdoraStatus
从 on_event 返回以控制算子生命周期。
#![allow(unused)]
fn main() {
pub enum AdoraStatus {
Continue, // 继续运行,等待下一个事件
Stop, // 停止此算子
StopAll, // 停止整个数据流
}
}
register_operator! 宏
生成 Adora 运行时加载和调用算子所需的 FFI 入口点。
#![allow(unused)]
fn main() {
use adora_operator_api::register_operator;
register_operator!(MyOperator);
}
每个 crate 必须在顶层精确调用一次,传入实现了 AdoraOperator 的类型。
快速开始示例:节点
一个接收 tick 输入并发送随机数作为输出的最小节点。
use adora_node_api::{AdoraNode, Event, IntoArrow, adora_core::config::DataId};
fn main() -> eyre::Result<()> {
let (mut node, mut events) = AdoraNode::init_from_env()?;
let output = DataId::from("random".to_owned());
while let Some(event) = events.recv() {
match event {
Event::Input { id, metadata, data } => {
if id.as_str() == "tick" {
let value: u64 = fastrand::u64(..);
node.send_output(
output.clone(),
metadata.parameters,
value.into_arrow(),
)?;
}
}
Event::Stop(_) => {}
_ => {}
}
}
Ok(())
}
对应的数据流 YAML:
nodes:
- id: timer
path: adora/timer/millis/100
outputs:
- tick
- id: my-node
path: ./target/debug/my-node
inputs:
tick: timer/tick
outputs:
- random
- id: sink
path: ./target/debug/sink
inputs:
data: my-node/random
快速开始示例:算子
一个计数 tick 并转发格式化消息的最小算子。
#![allow(unused)]
#![warn(unsafe_op_in_unsafe_fn)]
fn main() {
use adora_operator_api::{
AdoraOperator, AdoraOutputSender, AdoraStatus, Event, IntoArrow, register_operator,
};
register_operator!(MyOperator);
#[derive(Debug, Default)]
struct MyOperator {
ticks: usize,
}
impl AdoraOperator for MyOperator {
fn on_event(
&mut self,
event: &Event,
output_sender: &mut AdoraOutputSender,
) -> Result<AdoraStatus, String> {
match event {
Event::Input { id, data } => match *id {
"tick" => {
self.ticks += 1;
let msg = format!("tick count: {}", self.ticks);
output_sender.send("status".into(), msg.into_arrow())?;
}
other => eprintln!("ignoring unexpected input {other}"),
},
Event::InputClosed { id } => {
if *id == "tick" {
return Ok(AdoraStatus::Stop);
}
}
Event::Stop => {}
other => {
eprintln!("received unknown event {other:?}");
}
}
Ok(AdoraStatus::Continue)
}
}
}
对应的数据流 YAML:
nodes:
- id: timer
path: adora/timer/millis/500
outputs:
- tick
- id: runtime-node
operator:
shared_library: ./target/debug/libmy_operator
inputs:
tick: timer/tick
outputs:
- status