Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Rust API 参考

本文档介绍用于构建 Adora 数据流组件的两个主要 Rust crate:

  • adora-node-api – 用于独立节点可执行文件
  • adora-operator-api – 用于由 Adora 运行时管理的进程内算子

节点 API (adora-node-api)

添加到你的 Cargo.toml

[dependencies]
adora-node-api = { workspace = true }

AdoraNode

用于发送输出和获取节点信息的主要结构体。通过以下初始化函数之一获取。

初始化

#![allow(unused)]
fn main() {
// 推荐:自动检测环境(守护进程、测试或交互模式)。
pub fn init_from_env() -> NodeResult<(Self, EventStream)>

// 与 init_from_env 相同,但出错时不回退到交互模式。
pub fn init_from_env_force() -> NodeResult<(Self, EventStream)>

// 用于动态节点:通过节点 ID 连接到守护进程。
pub fn init_from_node_id(node_id: NodeId) -> NodeResult<(Self, EventStream)>

// 先尝试 init_from_env;回退到 init_from_node_id。
pub fn init_flexible(node_id: NodeId) -> NodeResult<(Self, EventStream)>

// 独立交互模式(在终端提示输入)。
pub fn init_interactive() -> NodeResult<(Self, EventStream)>

// 使用合成输入/输出的集成测试模式。
pub fn init_testing(
    input: TestingInput,
    output: TestingOutput,
    options: TestingOptions,
) -> NodeResult<(Self, EventStream)>
}

init_from_env 是推荐的入口点。它按顺序检查:

  1. setup_integration_testing 设置的线程局部测试状态
  2. ADORA_NODE_CONFIG 环境变量(由守护进程设置)
  3. ADORA_TEST_WITH_INPUTS 环境变量(基于文件的集成测试)
  4. 交互终端回退(仅当 stdin 为 TTY 时)

发送输出

所有发送方法会静默忽略未在数据流 YAML 中声明的输出 ID。

#![allow(unused)]
fn main() {
// 发送 Arrow 数组。在有利时将数据复制到共享内存。
pub fn send_output(
    &mut self,
    output_id: DataId,
    parameters: MetadataParameters,
    data: impl Array,
) -> NodeResult<()>

// 发送原始字节。在有利时复制到共享内存。
pub fn send_output_bytes(
    &mut self,
    output_id: DataId,
    parameters: MetadataParameters,
    data_len: usize,
    data: &[u8],
) -> NodeResult<()>

// 通过闭包发送原始字节以实现零拷贝写入。
pub fn send_output_raw<F>(
    &mut self,
    output_id: DataId,
    parameters: MetadataParameters,
    data_len: usize,
    data: F,
) -> NodeResult<()>
where
    F: FnOnce(&mut [u8])

// 发送带有显式 Arrow 类型信息的原始字节。
pub fn send_typed_output<F>(
    &mut self,
    output_id: DataId,
    type_info: ArrowTypeInfo,
    parameters: MetadataParameters,
    data_len: usize,
    data: F,
) -> NodeResult<()>
where
    F: FnOnce(&mut [u8])

// 发送带有类型信息的预分配 DataSample。
pub fn send_output_sample(
    &mut self,
    output_id: DataId,
    type_info: ArrowTypeInfo,
    parameters: MetadataParameters,
    sample: Option<DataSample>,
) -> NodeResult<()>

// 将输出 ID 报告为已关闭。不再允许对这些 ID 发送。
pub fn close_outputs(&mut self, outputs_ids: Vec<DataId>) -> NodeResult<()>
}

Service, Action, and Streaming Helpers

Higher-level methods for the communication patterns. These use well-known metadata keys to correlate requests, goals, responses, and streaming segments.

#![allow(unused)]
fn main() {
// 生成唯一的、时间有序的 ID(UUID v7)用于关联。
pub fn new_request_id() -> String
pub fn new_goal_id() -> String   // new_request_id 的别名

// 发送服务请求。将 `request_id` 注入参数并返回。
pub fn send_service_request(
    &mut self,
    output_id: DataId,
    parameters: MetadataParameters,
    data: impl Array,
) -> NodeResult<String>

// 发送服务响应。send_output 的语义别名。
// 调用者必须传递来自传入请求元数据的 request_id。
pub fn send_service_response(
    &mut self,
    output_id: DataId,
    parameters: MetadataParameters,
    data: impl Array,
) -> NodeResult<()>
}

服务示例(客户端发送请求,服务端回复):

#![allow(unused)]
fn main() {
// 客户端:自动生成并注入 request_id
let rid = node.send_service_request("request".into(), params, data)?;

// Server: pass through metadata.parameters (includes request_id)
node.send_service_response("response".into(), metadata.parameters, result)?;
}

动作示例(客户端发送目标,服务端流式传输反馈 + 结果):

#![allow(unused)]
fn main() {
use adora_node_api::{GOAL_ID, GOAL_STATUS, GOAL_STATUS_SUCCEEDED, Parameter};

// 客户端:生成 goal_id,附加到参数
let goal_id = AdoraNode::new_goal_id();
params.insert(GOAL_ID.to_string(), Parameter::String(goal_id));
node.send_output("goal".into(), params, data)?;

// 服务端:提取 goal_id,发送带 goal_status 的反馈/结果
let gid = get_string_param(&metadata.parameters, GOAL_ID);
}

Streaming example (real-time voice/video pipeline with interruption):

#![allow(unused)]
fn main() {
use adora_node_api::StreamSegment;

// Create a streaming segment builder (auto-generates session_id)
let mut seg = StreamSegment::new();

// Send chunks with auto-incrementing seq
node.send_stream_chunk("text".into(), &mut seg, false, chunk_data)?;
// Mark final chunk of a segment
node.send_stream_chunk("text".into(), &mut seg, true, last_chunk)?;

// On user interruption: flush downstream queues and start a new segment
let flush_params = seg.flush();
node.send_output("text".into(), flush_params, empty_data)?;
}

See patterns.md for the full guide and examples/service-example and examples/action-example for working code.

数据分配

#![allow(unused)]
fn main() {
// 分配给定大小的 DataSample。
// 对 >= ZERO_COPY_THRESHOLD (4096 字节) 的数据使用共享内存。
pub fn allocate_data_sample(&mut self, data_len: usize) -> NodeResult<DataSample>
}

节点信息

#![allow(unused)]
fn main() {
// 数据流 YAML 中的节点 ID。
pub fn id(&self) -> &NodeId

// 本次数据流运行的唯一标识符。
pub fn dataflow_id(&self) -> &DataflowId

// 此节点的输入/输出配置。
pub fn node_config(&self) -> &NodeRunConfig

// 如果此节点在之前退出或故障后被重启则为 true。
pub fn is_restart(&self) -> bool

// 此节点被重启的次数(首次运行为 0)。
pub fn restart_count(&self) -> u32

// 解析后的数据流 YAML 描述符。
pub fn dataflow_descriptor(&self) -> NodeResult<&Descriptor>
}

日志

Rust nodes have two ways to emit structured logs. Both produce identical structured log entries in the daemon.

Option 1: Node API (recommended for most cases)

All log methods emit structured JSONL to stdout, which the daemon parses automatically. Works with min_log_level filtering, send_logs_as routing, and adora/logs subscribers.

#![allow(unused)]
fn main() {
// 通用结构化日志。级别:"error"、"warn"、"info"、"debug"、"trace"。
pub fn log(&self, level: &str, message: &str, target: Option<&str>)

// 带有附加键值字段的结构化日志。
pub fn log_with_fields(
    &self,
    level: &str,
    message: &str,
    target: Option<&str>,
    fields: Option<&BTreeMap<String, String>>,
)

// 便捷方法(无 target 参数)。
pub fn log_error(&self, message: &str)
pub fn log_warn(&self, message: &str)
pub fn log_info(&self, message: &str)
pub fn log_debug(&self, message: &str)
pub fn log_trace(&self, message: &str)
}

Option 2: Rust tracing crate

When adora’s tracing subscriber is initialized (via init_tracing() or the default feature), tracing::info!() etc. output structured JSON to stdout that the daemon parses identically:

#![allow(unused)]
fn main() {
tracing::info!("Sensor started");
tracing::warn!(sensor_id = "temp-01", "High temperature");
}

Use tracing when you want ecosystem integration (spans, instrumentation, OpenTelemetry). Use node.log_*() when you want explicit control or structured fields as BTreeMap.

方法Structured?Fields?OpenTelemetry?最适用于
node.log_info(msg)Quick one-liner
node.log_with_fields(...)Yes (BTreeMap)Structured key-value context
tracing::info!(key = val, msg)Yes (spans)Ecosystem integration, OTel
println!()No (stdout level)Quick debugging

EventStream

此节点传入事件的异步迭代器。实现了 futures::Stream trait。

收到 Stop 事件后,事件流会自行关闭。节点应在流结束后退出。

#![allow(unused)]
fn main() {
// 阻塞直到下一个事件到达。流关闭时返回 None。
// 使用内部 EventScheduler,可能为公平性重新排序事件。
pub fn recv(&mut self) -> Option<Event>

// 带超时的阻塞。超时时返回 Event::Error。
pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event>

// 带 EventScheduler 重排序的异步接收。
pub async fn recv_async(&mut self) -> Option<Event>

// 带超时的异步接收。超时时返回 Event::Error。
pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event>

// 非阻塞接收。无可用数据时返回 TryRecvError::Empty。
pub fn try_recv(&mut self) -> Result<Event, TryRecvError>

// 非阻塞地排空所有缓冲事件。
// 无可用数据时返回 Some(Vec::new());流关闭时返回 None。
pub fn drain(&mut self) -> Option<Vec<Event>>

// 如果调度器或接收器中没有缓冲事件则为 true。
pub fn is_empty(&self) -> bool

// Returns and resets accumulated drop counts per input ID.
// For `drop_oldest` inputs, drops happen at `queue_size`.
// For `backpressure` inputs, drops happen at 10x `queue_size` (hard safety cap).
pub fn drain_drop_counts(&mut self) -> HashMap<DataId, u64>
}

EventStream 还实现了 futures::Stream<Item = Event>,因此可以与 StreamExt::next() 和其他组合器一起使用。与 recv/recv_async 不同,Stream 实现使用 EventScheduler,保留事件的时间顺序。


Event

表示传入事件。此枚举为 #[non_exhaustive]——忽略未知变体以保持向前兼容。

#![allow(unused)]
fn main() {
#[non_exhaustive]
pub enum Event {
    // 从另一个节点接收到输入。
    Input {
        id: DataId,           // YAML 中的输入 ID(非发送者的输出 ID)
        metadata: Metadata,   // 时间戳和类型信息
        data: ArrowData,      // Apache Arrow 数据
    },

    // 映射到此输入的发送者已退出;不会再有数据到达。
    InputClosed { id: DataId },

    // 先前关闭的输入已恢复(例如,上游节点在超时后恢复)。
    InputRecovered { id: DataId },

    // 上游节点已重启。适用于重置缓存或状态。
    NodeRestarted { id: NodeId },

    // 事件流即将关闭。原因见 StopCause。
    Stop(StopCause),

    // 指示节点重新加载算子(运行时内部使用)。
    Reload { operator_id: Option<OperatorId> },

    // 意外的内部错误。记录日志用于调试。
    Error(String),
}
}

StopCause

#![allow(unused)]
fn main() {
#[non_exhaustive]
pub enum StopCause {
    // 通过 `adora stop` 或 Ctrl-C 显式停止。请尽快退出,否则将被终止。
    Manual,

    // 所有输入已关闭(上游节点已退出)。仅在节点有输入时发送。
    AllInputsClosed,
}
}

辅助类型

DataSample

适合作为输出消息发送的数据区域。对 >= ZERO_COPY_THRESHOLD 的数据使用共享内存以实现零拷贝传输。

实现了 Deref<Target = [u8]>DerefMut,用于读写底层字节。

Metadata 和 MetadataParameters

#![allow(unused)]
fn main() {
// 附加到每个输入事件的完整元数据。
pub struct Metadata {
    // 包含时间戳、Arrow 类型信息和用户自定义参数。
}

// 发送输出时附加的用户控制的元数据字段。
// BTreeMap<String, Parameter> 的类型别名。
// 默认为空。传递输入的 metadata.parameters 以转发元数据。
pub type MetadataParameters = BTreeMap<String, Parameter>;

// 单个元数据参数值。
pub enum Parameter {
    Bool(bool), Integer(i64), Float(f64), String(String),
    ListInt(Vec<i64>), ListFloat(Vec<f64>), ListString(Vec<String>),
    Timestamp(DateTime<Utc>),
}

// Extract typed parameters, returning None if missing or wrong type.
pub fn get_string_param<'a>(params: &'a MetadataParameters, key: &str) -> Option<&'a str>
pub fn get_integer_param(params: &MetadataParameters, key: &str) -> Option<i64>
pub fn get_bool_param(params: &MetadataParameters, key: &str) -> Option<bool>
}

Well-known metadata keys (for communication patterns):

Constant使用方
REQUEST_ID"request_id"服务请求/响应关联
GOAL_ID"goal_id"动作目标标识
GOAL_STATUS"goal_status"动作结果状态
GOAL_STATUS_SUCCEEDED"succeeded"目标成功完成
GOAL_STATUS_ABORTED"aborted"目标被服务端中止
GOAL_STATUS_CANCELED"canceled"目标被客户端取消
SESSION_ID"session_id"Streaming session identifier
SEGMENT_ID"segment_id"Streaming segment within a session
SEQ"seq"Streaming chunk sequence number
FIN"fin"Last chunk of a streaming segment
FLUSH"flush"Discard older queued messages on input

所有常量均从 adora_node_api 重新导出。

标识类型

#![allow(unused)]
fn main() {
// 运行中数据流实例的唯一标识符(UUID v4)。
pub struct DataflowId(/* ... */);

// 数据流 YAML 中定义的节点标识符。
pub struct NodeId(/* ... */);

// 数据流 YAML 中定义的输入/输出标识符。
pub struct DataId(/* ... */);
}

错误类型

#![allow(unused)]
fn main() {
#[derive(Debug, Error)]
pub enum NodeError {
    Init(String),        // 配置解析、环境变量、守护进程握手
    Connection(String),  // 守护进程连接丢失
    Output(String),      // 发送或关闭失败
    Data(String),        // 分配或描述符解析
    Internal(eyre::Report),  // 意外错误的兜底
}

pub type NodeResult<T> = Result<T, NodeError>;
}

TryRecvError

#![allow(unused)]
fn main() {
pub enum TryRecvError {
    Empty,   // 当前没有可用事件
    Closed,  // 事件流已关闭
}
}

ZERO_COPY_THRESHOLD

#![allow(unused)]
fn main() {
pub const ZERO_COPY_THRESHOLD: usize = 4096;
}

小于此阈值的消息通过 TCP 发送。等于或超过此大小的消息使用共享内存进行零拷贝传输。

ArrowData

#![allow(unused)]
fn main() {
// arrow::array::ArrayRef 的包装器。实现了到内部 ArrayRef 的 Deref。
pub struct ArrowData(pub arrow::array::ArrayRef);
}

来自 Event::Input 的数据以 ArrowData 形式到达。使用 TryFrom 转换或 Arrow API 提取类型化的值。


InputTracker

用于跟踪输入健康状态和缓存每个输入最后接收值的辅助工具。在上游节点超时时用于优雅降级。

#![allow(unused)]
fn main() {
pub struct InputTracker { /* ... */ }

impl InputTracker {
    pub fn new() -> Self

    // 从事件更新状态。如果事件相关则返回 true。
    pub fn process_event(&mut self, event: &Event) -> bool

    // 输入的当前状态(Healthy 或 Closed),如果被追踪的话。
    pub fn state(&self, id: &DataId) -> Option<InputState>

    // 如果输入当前已关闭则为 true。
    pub fn is_closed(&self, id: &DataId) -> bool

    // 输入最后接收到的值。即使关闭后仍可用。
    pub fn last_value(&self, id: &DataId) -> Option<&ArrowData>

    // 所有当前处于 Closed 状态的输入。
    pub fn closed_inputs(&self) -> Vec<&DataId>

    // 如果任何被追踪的输入已关闭则为 true。
    pub fn any_closed(&self) -> bool
}

pub enum InputState {
    Healthy,  // 正常接收数据
    Closed,   // 上游退出或超时
}
}

集成测试

integration_testing 模块提供了无需运行守护进程即可测试节点的工具。

setup_integration_testing

设置线程局部状态,使同一线程上下一次调用 AdoraNode::init_from_env 时以测试模式初始化。

#![allow(unused)]
fn main() {
pub fn setup_integration_testing(
    input: TestingInput,
    output: TestingOutput,
    options: TestingOptions,
)
}

TestingInput

#![allow(unused)]
fn main() {
pub enum TestingInput {
    // 从 JSON 文件加载事件(必须反序列化为 IntegrationTestInput)。
    FromJsonFile(PathBuf),

    // 直接提供事件。
    Input(IntegrationTestInput),
}
}

TestingOutput

#![allow(unused)]
fn main() {
pub enum TestingOutput {
    // 将输出写入 JSONL 文件(创建或覆盖)。
    ToFile(PathBuf),

    // 将输出作为 JSONL 写入任意 writer。
    ToWriter(Box<dyn std::io::Write + Send>),

    // 将每个输出作为 JSON 对象发送到 flume 通道。
    ToChannel(flume::Sender<serde_json::Map<String, serde_json::Value>>),
}
}

TestingOptions

#![allow(unused)]
fn main() {
#[derive(Debug, Clone, Default)]
pub struct TestingOptions {
    // 跳过输出中的时间偏移以进行确定性比较。
    pub skip_output_time_offsets: bool,
}
}

环境变量测试

使用 init_from_env 的节点也支持通过环境变量进行基于文件的测试:

变量描述
ADORA_TEST_WITH_INPUTSJSON 输入文件的路径(IntegrationTestInput 格式)
ADORA_TEST_WRITE_OUTPUTS_TO输出 JSONL 文件的路径(默认:输入文件旁的 outputs.jsonl
ADORA_TEST_NO_OUTPUT_TIME_OFFSET如果设置,省略时间偏移以获得确定性输出

算子 API (adora-operator-api)

算子是由 Adora 运行时管理的进程内组件。它们被编译为共享库(.so/.dylib/.dll)并由运行时加载。

添加到你的 Cargo.toml

[dependencies]
adora-operator-api = { workspace = true }

[lib]
crate-type = ["cdylib"]

AdoraOperator Trait

#![allow(unused)]
fn main() {
pub trait AdoraOperator: Default {
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut AdoraOutputSender,
    ) -> Result<AdoraStatus, String>;
}
}

实现此 trait 以定义算子的行为。运行时对每个传入事件调用 on_event。返回 AdoraStatus 以控制执行流程。

Event(算子)

算子的 Event 枚举比节点的 Event 更简单,使用 &str 作为 ID。

#![allow(unused)]
fn main() {
#[non_exhaustive]
pub enum Event<'a> {
    // 收到一个输入。
    Input { id: &'a str, data: ArrowData },

    // 将输入数据解析为 Arrow 数组失败。
    InputParseError { id: &'a str, error: String },

    // 输入被发送者关闭。
    InputClosed { id: &'a str },

    // 算子应停止。
    Stop,
}
}

AdoraOutputSender

#![allow(unused)]
fn main() {
pub struct AdoraOutputSender<'a>(/* ... */);

impl AdoraOutputSender<'_> {
    // 发送输出。`id` 是数据流 YAML 中的输出 ID。
    pub fn send(&mut self, id: String, data: impl Array) -> Result<(), String>
}
}

AdoraStatus

on_event 返回以控制算子生命周期。

#![allow(unused)]
fn main() {
pub enum AdoraStatus {
    Continue,  // 继续运行,等待下一个事件
    Stop,      // 停止此算子
    StopAll,   // 停止整个数据流
}
}

register_operator! 宏

生成 Adora 运行时加载和调用算子所需的 FFI 入口点。

#![allow(unused)]
fn main() {
use adora_operator_api::register_operator;

register_operator!(MyOperator);
}

每个 crate 必须在顶层精确调用一次,传入实现了 AdoraOperator 的类型。


快速开始示例:节点

一个接收 tick 输入并发送随机数作为输出的最小节点。

use adora_node_api::{AdoraNode, Event, IntoArrow, adora_core::config::DataId};

fn main() -> eyre::Result<()> {
    let (mut node, mut events) = AdoraNode::init_from_env()?;

    let output = DataId::from("random".to_owned());

    while let Some(event) = events.recv() {
        match event {
            Event::Input { id, metadata, data } => {
                if id.as_str() == "tick" {
                    let value: u64 = fastrand::u64(..);
                    node.send_output(
                        output.clone(),
                        metadata.parameters,
                        value.into_arrow(),
                    )?;
                }
            }
            Event::Stop(_) => {}
            _ => {}
        }
    }

    Ok(())
}

对应的数据流 YAML:

nodes:
  - id: timer
    path: adora/timer/millis/100
    outputs:
      - tick

  - id: my-node
    path: ./target/debug/my-node
    inputs:
      tick: timer/tick
    outputs:
      - random

  - id: sink
    path: ./target/debug/sink
    inputs:
      data: my-node/random

快速开始示例:算子

一个计数 tick 并转发格式化消息的最小算子。

#![allow(unused)]
#![warn(unsafe_op_in_unsafe_fn)]

fn main() {
use adora_operator_api::{
    AdoraOperator, AdoraOutputSender, AdoraStatus, Event, IntoArrow, register_operator,
};

register_operator!(MyOperator);

#[derive(Debug, Default)]
struct MyOperator {
    ticks: usize,
}

impl AdoraOperator for MyOperator {
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut AdoraOutputSender,
    ) -> Result<AdoraStatus, String> {
        match event {
            Event::Input { id, data } => match *id {
                "tick" => {
                    self.ticks += 1;
                    let msg = format!("tick count: {}", self.ticks);
                    output_sender.send("status".into(), msg.into_arrow())?;
                }
                other => eprintln!("ignoring unexpected input {other}"),
            },
            Event::InputClosed { id } => {
                if *id == "tick" {
                    return Ok(AdoraStatus::Stop);
                }
            }
            Event::Stop => {}
            other => {
                eprintln!("received unknown event {other:?}");
            }
        }

        Ok(AdoraStatus::Continue)
    }
}
}

对应的数据流 YAML:

nodes:
  - id: timer
    path: adora/timer/millis/500
    outputs:
      - tick

  - id: runtime-node
    operator:
      shared_library: ./target/debug/libmy_operator
      inputs:
        tick: timer/tick
      outputs:
        - status