Rust API Reference

This document covers the two main Rust crates for building Adora dataflow components:

  • adora-node-api – for standalone node executables
  • adora-operator-api – for in-process operators managed by the Adora runtime

Node API (adora-node-api)

Add to your Cargo.toml:

[dependencies]
adora-node-api = { workspace = true }

AdoraNode

The primary struct for sending outputs and retrieving node information. Obtained through one of the initialization functions below.

Initialization

#![allow(unused)]
fn main() {
// Recommended: auto-detect environment (daemon, testing, or interactive).
pub fn init_from_env() -> NodeResult<(Self, EventStream)>

// Same as init_from_env but errors instead of falling back to interactive mode.
pub fn init_from_env_force() -> NodeResult<(Self, EventStream)>

// For dynamic nodes: connect to the daemon by node ID.
pub fn init_from_node_id(node_id: NodeId) -> NodeResult<(Self, EventStream)>

// Try init_from_env first; fall back to init_from_node_id.
pub fn init_flexible(node_id: NodeId) -> NodeResult<(Self, EventStream)>

// Standalone interactive mode (prompts for inputs on the terminal).
pub fn init_interactive() -> NodeResult<(Self, EventStream)>

// Integration test mode with synthetic inputs/outputs.
pub fn init_testing(
    input: TestingInput,
    output: TestingOutput,
    options: TestingOptions,
) -> NodeResult<(Self, EventStream)>
}

init_from_env is the recommended entry point. It checks, in order:

  1. Thread-local testing state set by setup_integration_testing
  2. ADORA_NODE_CONFIG environment variable (set by the daemon)
  3. ADORA_TEST_WITH_INPUTS environment variable (file-based integration testing)
  4. Interactive terminal fallback (only if stdin is a TTY)

Sending Outputs

All send methods silently ignore output IDs not declared in the dataflow YAML.

#![allow(unused)]
fn main() {
// Send an Arrow array. Copies data into shared memory when beneficial.
pub fn send_output(
    &mut self,
    output_id: DataId,
    parameters: MetadataParameters,
    data: impl Array,
) -> NodeResult<()>

// Send raw bytes. Copies into shared memory when beneficial.
pub fn send_output_bytes(
    &mut self,
    output_id: DataId,
    parameters: MetadataParameters,
    data_len: usize,
    data: &[u8],
) -> NodeResult<()>

// Send raw bytes via a closure for zero-copy writing.
pub fn send_output_raw<F>(
    &mut self,
    output_id: DataId,
    parameters: MetadataParameters,
    data_len: usize,
    data: F,
) -> NodeResult<()>
where
    F: FnOnce(&mut [u8])

// Send raw bytes with explicit Arrow type information.
pub fn send_typed_output<F>(
    &mut self,
    output_id: DataId,
    type_info: ArrowTypeInfo,
    parameters: MetadataParameters,
    data_len: usize,
    data: F,
) -> NodeResult<()>
where
    F: FnOnce(&mut [u8])

// Send a pre-allocated DataSample with type information.
pub fn send_output_sample(
    &mut self,
    output_id: DataId,
    type_info: ArrowTypeInfo,
    parameters: MetadataParameters,
    sample: Option<DataSample>,
) -> NodeResult<()>

// Report output IDs as closed. No further sends allowed for those IDs.
pub fn close_outputs(&mut self, outputs_ids: Vec<DataId>) -> NodeResult<()>
}
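The closure-based senders above let you write bytes directly into the runtime-allocated buffer instead of building an intermediate `Vec`. The sketch below shows the shape of such a fill closure; the `send_output_raw` call itself is shown only in a comment (it needs a live node handle), and the `fill_samples` helper, the `"audio"` output ID, and the sample payload are illustrative, not part of the API.

```rust
/// Serialize f32 samples as little-endian bytes into a caller-provided buffer.
/// The buffer length must be exactly `samples.len() * 4`.
fn fill_samples(buf: &mut [u8], samples: &[f32]) {
    assert_eq!(buf.len(), samples.len() * 4, "buffer size mismatch");
    for (chunk, sample) in buf.chunks_exact_mut(4).zip(samples) {
        chunk.copy_from_slice(&sample.to_le_bytes());
    }
}

fn main() {
    let samples = [0.0_f32, 0.5, 1.0];
    let data_len = samples.len() * 4;

    // With a real node handle, the closure writes directly into the
    // runtime-allocated buffer (shared memory for large messages):
    //
    // node.send_output_raw(
    //     "audio".into(),
    //     Default::default(),
    //     data_len,
    //     |buf| fill_samples(buf, &samples),
    // )?;

    // Stand-in buffer so this sketch runs without the runtime:
    let mut buf = vec![0u8; data_len];
    fill_samples(&mut buf, &samples);
    println!("first sample bytes: {:?}", &buf[..4]);
}
```

Because the closure receives the destination slice, data written this way is never copied again before transfer.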

Service, Action, and Streaming Helpers

Higher-level helpers for common communication patterns. They use well-known metadata keys to correlate requests, goals, responses, and streaming segments.

#![allow(unused)]
fn main() {
// Generate a unique, time-ordered ID (UUID v7) for correlation.
pub fn new_request_id() -> String
pub fn new_goal_id() -> String   // alias for new_request_id

// Send a service request. Injects a `request_id` into parameters and returns it.
pub fn send_service_request(
    &mut self,
    output_id: DataId,
    parameters: MetadataParameters,
    data: impl Array,
) -> NodeResult<String>

// Send a service response. Semantic alias for send_output.
// Caller must pass through the request_id from the incoming request's metadata.
pub fn send_service_response(
    &mut self,
    output_id: DataId,
    parameters: MetadataParameters,
    data: impl Array,
) -> NodeResult<()>
}

Service example (client sends request, server replies):

#![allow(unused)]
fn main() {
// Client: auto-generates and injects request_id
let rid = node.send_service_request("request".into(), params, data)?;

// Server: pass through metadata.parameters (includes request_id)
node.send_service_response("response".into(), metadata.parameters, result)?;
}

Action example (client sends goal, server streams feedback + result):

#![allow(unused)]
fn main() {
use adora_node_api::{GOAL_ID, GOAL_STATUS, GOAL_STATUS_SUCCEEDED, Parameter};

// Client: generate goal_id, attach to params
let goal_id = AdoraNode::new_goal_id();
params.insert(GOAL_ID.to_string(), Parameter::String(goal_id));
node.send_output("goal".into(), params, data)?;

// Server: extract goal_id, send feedback/result with goal_status
let gid = get_string_param(&metadata.parameters, GOAL_ID);
}

Streaming example (real-time voice/video pipeline with interruption):

#![allow(unused)]
fn main() {
use adora_node_api::StreamSegment;

// Create a streaming segment builder (auto-generates session_id)
let mut seg = StreamSegment::new();

// Send chunks with auto-incrementing seq
node.send_stream_chunk("text".into(), &mut seg, false, chunk_data)?;
// Mark final chunk of a segment
node.send_stream_chunk("text".into(), &mut seg, true, last_chunk)?;

// On user interruption: flush downstream queues and start a new segment
let flush_params = seg.flush();
node.send_output("text".into(), flush_params, empty_data)?;
}

See patterns.md for the full guide and examples/service-example and examples/action-example for working code.

Data Allocation

#![allow(unused)]
fn main() {
// Allocate a DataSample of the given size.
// Uses shared memory for data >= ZERO_COPY_THRESHOLD (4096 bytes).
pub fn allocate_data_sample(&mut self, data_len: usize) -> NodeResult<DataSample>
}
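The allocation rule is the one documented for ZERO_COPY_THRESHOLD: samples of at least 4096 bytes live in shared memory, smaller ones go over TCP. The sketch below mirrors that rule in a local helper so it can run standalone; the `allocate_data_sample`/`send_output_sample` calls are shown in comments (they need a live node handle), and `uses_shared_memory`, the payload, and the `"out"` output ID are illustrative.

```rust
// Documented threshold: data >= 4096 bytes is placed in shared memory.
const ZERO_COPY_THRESHOLD: usize = 4096;

/// Mirrors the documented allocation rule, for illustration only.
fn uses_shared_memory(data_len: usize) -> bool {
    data_len >= ZERO_COPY_THRESHOLD
}

fn main() {
    // With a real node handle you would allocate and fill a sample:
    //
    // let mut sample = node.allocate_data_sample(8192)?;
    // sample.copy_from_slice(&payload); // DataSample derefs to &mut [u8]
    // node.send_output_sample("out".into(), type_info, Default::default(), Some(sample))?;

    assert!(!uses_shared_memory(1024)); // small: sent via TCP
    assert!(uses_shared_memory(8192));  // large: shared memory, zero-copy
    println!("threshold = {ZERO_COPY_THRESHOLD} bytes");
}
```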

Node Information

#![allow(unused)]
fn main() {
// Node ID from the dataflow YAML.
pub fn id(&self) -> &NodeId

// Unique identifier for this dataflow run.
pub fn dataflow_id(&self) -> &DataflowId

// Input/output configuration for this node.
pub fn node_config(&self) -> &NodeRunConfig

// True if this node was restarted after a previous exit or failure.
pub fn is_restart(&self) -> bool

// Number of times this node has been restarted (0 on first run).
pub fn restart_count(&self) -> u32

// Parsed dataflow YAML descriptor.
pub fn dataflow_descriptor(&self) -> NodeResult<&Descriptor>
}

Logging

Rust nodes have two ways to emit structured logs. Both produce identical structured log entries in the daemon.

Option 1: Node API (recommended for most cases)

All log methods emit structured JSONL to stdout, which the daemon parses automatically. Works with min_log_level filtering, send_logs_as routing, and adora/logs subscribers.

#![allow(unused)]
fn main() {
// General structured log. Level: "error", "warn", "info", "debug", "trace".
pub fn log(&self, level: &str, message: &str, target: Option<&str>)

// Structured log with additional key-value fields.
pub fn log_with_fields(
    &self,
    level: &str,
    message: &str,
    target: Option<&str>,
    fields: Option<&BTreeMap<String, String>>,
)

// Convenience methods (no target parameter).
pub fn log_error(&self, message: &str)
pub fn log_warn(&self, message: &str)
pub fn log_info(&self, message: &str)
pub fn log_debug(&self, message: &str)
pub fn log_trace(&self, message: &str)
}
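The `fields` argument of `log_with_fields` is a plain `BTreeMap<String, String>`. A minimal sketch of building that context map (the `log_with_fields` call itself is commented out since it needs a live node handle; the field names and values are invented):

```rust
use std::collections::BTreeMap;

fn main() {
    // Extra key-value context to attach to a structured log entry.
    let mut fields = BTreeMap::new();
    fields.insert("sensor_id".to_string(), "temp-01".to_string());
    fields.insert("reading_c".to_string(), "73.4".to_string());

    // With a real node handle:
    //
    // node.log_with_fields("warn", "High temperature", Some("sensor"), Some(&fields));

    assert_eq!(fields.get("sensor_id").map(String::as_str), Some("temp-01"));
    println!("{} fields attached", fields.len());
}
```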

Option 2: Rust tracing crate

When adora’s tracing subscriber is initialized (via init_tracing() or the default feature), tracing::info!() etc. output structured JSON to stdout that the daemon parses identically:

#![allow(unused)]
fn main() {
tracing::info!("Sensor started");
tracing::warn!(sensor_id = "temp-01", "High temperature");
}

Use tracing when you want ecosystem integration (spans, instrumentation, OpenTelemetry). Use node.log_*() when you want explicit control or structured fields as a BTreeMap.

| Method | Structured? | Fields? | OpenTelemetry? | Best for |
|--------|-------------|---------|----------------|----------|
| node.log_info(msg) | Yes | No | No | Quick one-liner |
| node.log_with_fields(...) | Yes | Yes (BTreeMap) | No | Structured key-value context |
| tracing::info!(key = val, msg) | Yes | Yes (spans) | Yes | Ecosystem integration, OTel |
| println!() | No (plain stdout) | No | No | Quick debugging |

EventStream

Asynchronous iterator over incoming events destined for this node. Implements the futures::Stream trait.

The event stream closes itself after a Stop event is received. Nodes should exit once the stream ends.

#![allow(unused)]
fn main() {
// Block until the next event arrives. Returns None when the stream closes.
// Uses an internal EventScheduler that may reorder events for fairness.
pub fn recv(&mut self) -> Option<Event>

// Block with a timeout. Returns an Event::Error on timeout.
pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event>

// Async receive with EventScheduler reordering.
pub async fn recv_async(&mut self) -> Option<Event>

// Async receive with a timeout. Returns Event::Error on timeout.
pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event>

// Non-blocking receive. Returns TryRecvError::Empty if nothing is ready.
pub fn try_recv(&mut self) -> Result<Event, TryRecvError>

// Drain all buffered events without blocking.
// Returns Some(Vec::new()) if nothing is ready; None if the stream is closed.
pub fn drain(&mut self) -> Option<Vec<Event>>

// True if no events are buffered in the scheduler or receiver.
pub fn is_empty(&self) -> bool

// Returns and resets accumulated drop counts per input ID.
// For `drop_oldest` inputs, drops happen at `queue_size`.
// For `backpressure` inputs, drops happen at 10x `queue_size` (hard safety cap).
pub fn drain_drop_counts(&mut self) -> HashMap<DataId, u64>
}

EventStream also implements futures::Stream<Item = Event>, so it can be used with StreamExt::next() and other combinators. Unlike recv/recv_async, the Stream implementation does not use the EventScheduler, preserving chronological event order.


Event

Represents an incoming event. This enum is #[non_exhaustive] – ignore unknown variants to stay forward-compatible.

#![allow(unused)]
fn main() {
#[non_exhaustive]
pub enum Event {
    // An input was received from another node.
    Input {
        id: DataId,           // input ID from the YAML (not the sender's output ID)
        metadata: Metadata,   // timestamp and type information
        data: ArrowData,      // Apache Arrow data
    },

    // The sender mapped to this input exited; no more data will arrive.
    InputClosed { id: DataId },

    // A previously closed input recovered (e.g., upstream node came back after timeout).
    InputRecovered { id: DataId },

    // An upstream node has restarted. Useful for resetting caches or state.
    NodeRestarted { id: NodeId },

    // The event stream is about to close. See StopCause for the reason.
    Stop(StopCause),

    // Instructs the node to reload an operator (used internally by the runtime).
    Reload { operator_id: Option<OperatorId> },

    // An unexpected internal error. Log it for debugging.
    Error(String),
}
}

StopCause

#![allow(unused)]
fn main() {
#[non_exhaustive]
pub enum StopCause {
    // Explicit stop via `adora stop` or Ctrl-C. Exit promptly or be killed.
    Manual,

    // All inputs were closed (upstream nodes exited). Only sent if the node has inputs.
    AllInputsClosed,
}
}

Supporting Types

DataSample

A data region suitable for sending as an output message. Uses shared memory for data >= ZERO_COPY_THRESHOLD to enable zero-copy transfer.

Implements Deref<Target = [u8]> and DerefMut for reading and writing the underlying bytes.

Metadata and MetadataParameters

#![allow(unused)]
fn main() {
// Full metadata attached to every input event.
pub struct Metadata {
    // Contains timestamp, Arrow type info, and user-defined parameters.
}

// User-controlled metadata fields attached when sending outputs.
// Type alias for BTreeMap<String, Parameter>.
// Default is empty. Pass metadata.parameters from an input to forward metadata.
pub type MetadataParameters = BTreeMap<String, Parameter>;

// A single metadata parameter value.
pub enum Parameter {
    Bool(bool), Integer(i64), Float(f64), String(String),
    ListInt(Vec<i64>), ListFloat(Vec<f64>), ListString(Vec<String>),
    Timestamp(DateTime<Utc>),
}

// Extract typed parameters, returning None if missing or wrong type.
pub fn get_string_param<'a>(params: &'a MetadataParameters, key: &str) -> Option<&'a str>
pub fn get_integer_param(params: &MetadataParameters, key: &str) -> Option<i64>
pub fn get_bool_param(params: &MetadataParameters, key: &str) -> Option<bool>
}
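A common use of these helpers is a service server reading the `request_id` it must forward. So that the sketch below compiles without the crate, it defines a stripped-down stand-in for `Parameter` and re-implements `get_string_param` with the documented behavior (None when the key is missing or holds a non-string value); in real code, use the types re-exported from adora_node_api. The key name `"request_id"` matches the REQUEST_ID constant below; the ID value is invented.

```rust
use std::collections::BTreeMap;

// Stand-in for the crate's Parameter enum (illustration only).
#[derive(Clone, Debug)]
enum Parameter {
    String(String),
    Integer(i64),
}

type MetadataParameters = BTreeMap<String, Parameter>;

// Same contract as the documented helper: None if the key is missing
// or the value is not a string.
fn get_string_param<'a>(params: &'a MetadataParameters, key: &str) -> Option<&'a str> {
    match params.get(key) {
        Some(Parameter::String(s)) => Some(s),
        _ => None,
    }
}

fn main() {
    // Metadata as it might arrive on a service request input.
    let mut incoming = MetadataParameters::new();
    incoming.insert(
        "request_id".to_string(),
        Parameter::String("0190-example".to_string()),
    );
    incoming.insert("attempt".to_string(), Parameter::Integer(1));

    // The server extracts the ID, then passes `incoming` (i.e.
    // metadata.parameters) unchanged to send_service_response.
    let rid = get_string_param(&incoming, "request_id");
    assert_eq!(rid, Some("0190-example"));
    println!("correlating response to request {}", rid.unwrap());
}
```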

Well-known metadata keys (for communication patterns):

| Constant | Value | Used by |
|----------|-------|---------|
| REQUEST_ID | "request_id" | Service request/response correlation |
| GOAL_ID | "goal_id" | Action goal identification |
| GOAL_STATUS | "goal_status" | Action result status |
| GOAL_STATUS_SUCCEEDED | "succeeded" | Goal completed successfully |
| GOAL_STATUS_ABORTED | "aborted" | Goal aborted by server |
| GOAL_STATUS_CANCELED | "canceled" | Goal canceled by client |
| SESSION_ID | "session_id" | Streaming session identifier |
| SEGMENT_ID | "segment_id" | Streaming segment within a session |
| SEQ | "seq" | Streaming chunk sequence number |
| FIN | "fin" | Last chunk of a streaming segment |
| FLUSH | "flush" | Discard older queued messages on input |

All constants are re-exported from adora_node_api.

Identity Types

#![allow(unused)]
fn main() {
// Unique identifier for a running dataflow instance (UUID v4).
pub struct DataflowId(/* ... */);

// Node identifier, as defined in the dataflow YAML.
pub struct NodeId(/* ... */);

// Input/output identifier, as defined in the dataflow YAML.
pub struct DataId(/* ... */);
}

Error Types

#![allow(unused)]
fn main() {
#[derive(Debug, Error)]
pub enum NodeError {
    Init(String),        // config parsing, env vars, daemon handshake
    Connection(String),  // daemon connection lost
    Output(String),      // send or close failure
    Data(String),        // allocation or descriptor parsing
    Internal(eyre::Report),  // catch-all for unexpected errors
}

pub type NodeResult<T> = Result<T, NodeError>;
}

TryRecvError

#![allow(unused)]
fn main() {
pub enum TryRecvError {
    Empty,   // no event available right now
    Closed,  // event stream has been closed
}
}

ZERO_COPY_THRESHOLD

#![allow(unused)]
fn main() {
pub const ZERO_COPY_THRESHOLD: usize = 4096;
}

Messages smaller than this threshold are sent via TCP. Messages at or above this size use shared memory for zero-copy transfer.

ArrowData

#![allow(unused)]
fn main() {
// Wrapper around arrow::array::ArrayRef. Implements Deref to the inner ArrayRef.
pub struct ArrowData(pub arrow::array::ArrayRef);
}

Data from Event::Input arrives as ArrowData. Use TryFrom conversions or Arrow APIs to extract typed values.


InputTracker

Helper for tracking input health and caching the last received value per input. Useful for graceful degradation when upstream nodes time out.

#![allow(unused)]
fn main() {
pub struct InputTracker { /* ... */ }

impl InputTracker {
    pub fn new() -> Self

    // Update state from an event. Returns true if the event was relevant.
    pub fn process_event(&mut self, event: &Event) -> bool

    // Current state of an input (Healthy or Closed), if tracked.
    pub fn state(&self, id: &DataId) -> Option<InputState>

    // True if the input is currently closed.
    pub fn is_closed(&self, id: &DataId) -> bool

    // Last received value for an input. Available even when closed.
    pub fn last_value(&self, id: &DataId) -> Option<&ArrowData>

    // All inputs currently in Closed state.
    pub fn closed_inputs(&self) -> Vec<&DataId>

    // True if any tracked input is closed.
    pub fn any_closed(&self) -> bool
}

pub enum InputState {
    Healthy,  // receiving data normally
    Closed,   // upstream exited or timed out
}
}

Integration Testing

The integration_testing module provides tools for testing nodes without a running daemon.

setup_integration_testing

Sets up thread-local state so that the next call to AdoraNode::init_from_env on the same thread initializes in test mode.

#![allow(unused)]
fn main() {
pub fn setup_integration_testing(
    input: TestingInput,
    output: TestingOutput,
    options: TestingOptions,
)
}

TestingInput

#![allow(unused)]
fn main() {
pub enum TestingInput {
    // Load events from a JSON file (must deserialize to IntegrationTestInput).
    FromJsonFile(PathBuf),

    // Provide events directly.
    Input(IntegrationTestInput),
}
}

TestingOutput

#![allow(unused)]
fn main() {
pub enum TestingOutput {
    // Write outputs to a JSONL file (created or overwritten).
    ToFile(PathBuf),

    // Write outputs as JSONL to any writer.
    ToWriter(Box<dyn std::io::Write + Send>),

    // Send each output as a JSON object to a flume channel.
    ToChannel(flume::Sender<serde_json::Map<String, serde_json::Value>>),
}
}

TestingOptions

#![allow(unused)]
fn main() {
#[derive(Debug, Clone, Default)]
pub struct TestingOptions {
    // Skip time offsets in outputs for deterministic comparison.
    pub skip_output_time_offsets: bool,
}
}

Environment Variable Testing

Nodes using init_from_env also support file-based testing via environment variables:

| Variable | Description |
|----------|-------------|
| ADORA_TEST_WITH_INPUTS | Path to a JSON input file (IntegrationTestInput format) |
| ADORA_TEST_WRITE_OUTPUTS_TO | Path for the output JSONL file (default: outputs.jsonl next to inputs) |
| ADORA_TEST_NO_OUTPUT_TIME_OFFSET | If set, omit time offsets for deterministic outputs |
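The default-output-path rule from the table (outputs.jsonl next to the inputs file, used when ADORA_TEST_WRITE_OUTPUTS_TO is unset) can be sketched as a small path helper. The `default_outputs_path` function and the file paths are illustrative, not part of the API:

```rust
use std::path::{Path, PathBuf};

/// Default output location when ADORA_TEST_WRITE_OUTPUTS_TO is unset:
/// outputs.jsonl in the same directory as the inputs file.
fn default_outputs_path(inputs: &Path) -> PathBuf {
    inputs.with_file_name("outputs.jsonl")
}

fn main() {
    // Typical invocation from a shell, shown for reference:
    //   ADORA_TEST_WITH_INPUTS=tests/inputs.json ./target/debug/my-node
    let inputs = Path::new("tests/inputs.json");
    let outputs = default_outputs_path(inputs);
    assert_eq!(outputs, PathBuf::from("tests/outputs.jsonl"));
    println!("{}", outputs.display());
}
```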

Operator API (adora-operator-api)

Operators are in-process components managed by the Adora runtime. They are compiled as shared libraries (.so/.dylib/.dll) and loaded by the runtime.

Add to your Cargo.toml:

[dependencies]
adora-operator-api = { workspace = true }

[lib]
crate-type = ["cdylib"]

AdoraOperator Trait

#![allow(unused)]
fn main() {
pub trait AdoraOperator: Default {
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut AdoraOutputSender,
    ) -> Result<AdoraStatus, String>;
}
}

Implement this trait to define your operator’s behavior. The runtime calls on_event for each incoming event. Return AdoraStatus to control execution flow.

Event (Operator)

The operator Event enum is simpler than the node Event and uses &str for IDs.

#![allow(unused)]
fn main() {
#[non_exhaustive]
pub enum Event<'a> {
    // An input was received.
    Input { id: &'a str, data: ArrowData },

    // Failed to parse the input data as an Arrow array.
    InputParseError { id: &'a str, error: String },

    // An input was closed by the sender.
    InputClosed { id: &'a str },

    // The operator should stop.
    Stop,
}
}

AdoraOutputSender

#![allow(unused)]
fn main() {
pub struct AdoraOutputSender<'a>(/* ... */);

impl AdoraOutputSender<'_> {
    // Send an output. `id` is the output ID from your dataflow YAML.
    pub fn send(&mut self, id: String, data: impl Array) -> Result<(), String>
}
}

AdoraStatus

Returned from on_event to control the operator lifecycle.

#![allow(unused)]
fn main() {
pub enum AdoraStatus {
    Continue,  // keep running, wait for the next event
    Stop,      // stop this operator
    StopAll,   // stop the entire dataflow
}
}

register_operator! Macro

Generates the FFI entry points required by the Adora runtime to load and call your operator.

#![allow(unused)]
fn main() {
use adora_operator_api::register_operator;

register_operator!(MyOperator);
}

This must be called exactly once per crate, at the top level, with the type that implements AdoraOperator.


Quick Start Example: Node

A minimal node that receives tick inputs and sends a random number as output.

use adora_node_api::{AdoraNode, Event, IntoArrow, adora_core::config::DataId};

fn main() -> eyre::Result<()> {
    let (mut node, mut events) = AdoraNode::init_from_env()?;

    let output = DataId::from("random".to_owned());

    while let Some(event) = events.recv() {
        match event {
            Event::Input { id, metadata, data } => {
                if id.as_str() == "tick" {
                    let value: u64 = fastrand::u64(..);
                    node.send_output(
                        output.clone(),
                        metadata.parameters,
                        value.into_arrow(),
                    )?;
                }
            }
            Event::Stop(_) => {}
            _ => {}
        }
    }

    Ok(())
}

Corresponding dataflow YAML:

nodes:
  - id: timer
    path: adora/timer/millis/100
    outputs:
      - tick

  - id: my-node
    path: ./target/debug/my-node
    inputs:
      tick: timer/tick
    outputs:
      - random

  - id: sink
    path: ./target/debug/sink
    inputs:
      data: my-node/random

Quick Start Example: Operator

A minimal operator that counts ticks and forwards formatted messages.

#![allow(unused)]
#![warn(unsafe_op_in_unsafe_fn)]

fn main() {
use adora_operator_api::{
    AdoraOperator, AdoraOutputSender, AdoraStatus, Event, IntoArrow, register_operator,
};

register_operator!(MyOperator);

#[derive(Debug, Default)]
struct MyOperator {
    ticks: usize,
}

impl AdoraOperator for MyOperator {
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut AdoraOutputSender,
    ) -> Result<AdoraStatus, String> {
        match event {
            Event::Input { id, data } => match *id {
                "tick" => {
                    self.ticks += 1;
                    let msg = format!("tick count: {}", self.ticks);
                    output_sender.send("status".into(), msg.into_arrow())?;
                }
                other => eprintln!("ignoring unexpected input {other}"),
            },
            Event::InputClosed { id } => {
                if *id == "tick" {
                    return Ok(AdoraStatus::Stop);
                }
            }
            Event::Stop => {}
            other => {
                eprintln!("received unknown event {other:?}");
            }
        }

        Ok(AdoraStatus::Continue)
    }
}
}

Corresponding dataflow YAML:

nodes:
  - id: timer
    path: adora/timer/millis/500
    outputs:
      - tick

  - id: runtime-node
    operator:
      shared_library: ./target/debug/libmy_operator
      inputs:
        tick: timer/tick
      outputs:
        - status