Rust API
Operator
The operator API is a framework that you implement; dora then manages the resulting operator. This lets dora apply optimisations and provide advanced features. It is the recommended way of using dora.
An operator must be registered and must implement the DoraOperator trait. The trait consists of an on_event method that defines how the operator behaves when an event occurs, for example when it receives an input.
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};

register_operator!(ExampleOperator);

#[derive(Debug, Default)]
struct ExampleOperator {
    ticks: usize,
}

impl DoraOperator for ExampleOperator {
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut DoraOutputSender,
    ) -> Result<DoraStatus, String> {
        // Handle the event here (see the full example below).
        Ok(DoraStatus::Continue)
    }
}
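To make the callback contract more concrete, here is a minimal std-only model of an operator that keeps state between events and tells its driver when to stop. Everything in it (`Status`, `Event`, `Operator`, `TickCounter`, `drive`) is a local stand-in invented for illustration; it is not the dora_operator_api API and says nothing about how the dora runtime is actually implemented.

```rust
/// Local stand-ins for illustration; these are NOT the `dora_operator_api` types.
#[derive(Debug, PartialEq)]
enum Status {
    Continue,
    Stop,
}

#[derive(Debug)]
enum Event<'a> {
    Input { id: &'a str, data: &'a [u8] },
    InputClosed { id: &'a str },
}

/// Hypothetical trait with the same shape as an `on_event` callback.
trait Operator {
    fn on_event(&mut self, event: &Event) -> Result<Status, String>;
}

#[derive(Debug, Default)]
struct TickCounter {
    ticks: usize,
}

impl Operator for TickCounter {
    fn on_event(&mut self, event: &Event) -> Result<Status, String> {
        match event {
            // State lives in `self`, so it survives across calls.
            Event::Input { id, .. } if *id == "tick" => self.ticks += 1,
            Event::Input { id, data } => {
                eprintln!("ignoring input `{id}` ({} bytes)", data.len())
            }
            // In this model, any closed input asks the driver to stop.
            Event::InputClosed { .. } => return Ok(Status::Stop),
        }
        Ok(Status::Continue)
    }
}

/// Hypothetical driver: feed events to the operator until it asks to stop.
/// Returns how many events were delivered.
fn drive<O: Operator>(operator: &mut O, events: &[Event]) -> Result<usize, String> {
    let mut delivered = 0;
    for event in events {
        delivered += 1;
        if operator.on_event(event)? == Status::Stop {
            break;
        }
    }
    Ok(delivered)
}
```

Calling `drive` with two `tick` inputs followed by an `InputClosed` event delivers three events and leaves `ticks` at 2; events after the stop request are never delivered.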
Try it out!
- Generate a new Rust library:
cargo new rust-dataflow-example-operator --lib
Cargo.toml
[package]
name = "rust-dataflow-example-operator"
version.workspace = true
edition = "2021"
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
crate-type = ["cdylib"]
[dependencies]
dora-operator-api = { workspace = true }
src/lib.rs
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};

register_operator!(ExampleOperator);

#[derive(Debug, Default)]
struct ExampleOperator {
    ticks: usize,
}

impl DoraOperator for ExampleOperator {
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut DoraOutputSender,
    ) -> Result<DoraStatus, String> {
        match event {
            Event::Input { id, data } => match *id {
                "tick" => {
                    self.ticks += 1;
                }
                "random" => {
                    let parsed = {
                        let data: [u8; 8] =
                            (*data).try_into().map_err(|_| "unexpected random data")?;
                        u64::from_le_bytes(data)
                    };
                    let output = format!(
                        "operator received random value {parsed:#x} after {} ticks",
                        self.ticks
                    );
                    output_sender.send("status".into(), output.into_bytes())?;
                }
                other => eprintln!("ignoring unexpected input {other}"),
            },
            Event::Stop => {}
            Event::InputClosed { id } => {
                println!("input `{id}` was closed");
                if *id == "random" {
                    println!("`random` input was closed -> exiting");
                    return Ok(DoraStatus::Stop);
                }
            }
            other => {
                println!("received unknown event {other:?}");
            }
        }

        Ok(DoraStatus::Continue)
    }
}
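The `random` branch above relies on the payload being exactly eight little-endian bytes. The following std-only sketch isolates that decoding step so it can be tried without dora; `parse_random` and `status_message` are hypothetical helper names introduced here, not part of any dora crate.

```rust
use std::convert::TryInto;

/// Decode the 8-byte little-endian payload carried by the `random` input.
/// `parse_random` is a hypothetical helper name, not part of dora.
fn parse_random(data: &[u8]) -> Result<u64, String> {
    // `try_into` succeeds only when the slice is exactly 8 bytes long.
    let bytes: [u8; 8] = data
        .try_into()
        .map_err(|_| format!("expected 8 bytes, got {}", data.len()))?;
    Ok(u64::from_le_bytes(bytes))
}

/// Build the same status text that the operator above sends on `status`.
fn status_message(value: u64, ticks: usize) -> String {
    format!("operator received random value {value:#x} after {ticks} ticks")
}
```

A value encoded with `u64::to_le_bytes` decodes back to the same number, while a slice of any other length is rejected with an error instead of panicking.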
- Build it:
cargo build --release
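- Link it in your graph as (the snippet below points at a debug build under target/debug; if you built with --release, the library is under target/release instead):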
        build: cargo build -p rust-dataflow-example-operator
        shared-library: ../../target/debug/rust_dataflow_example_operator
        inputs:
          tick: dora/timer/millis/100
          random: rust-node/random
        outputs:
          - status
  - id: rust-sink
    custom:
This example can be found in examples.
Custom Node
The custom node API allows you to integrate dora into your application. It lets you retrieve inputs and send outputs in any fashion you want.
DoraNode::init_from_env()
DoraNode::init_from_env() initializes a node from the environment variables set by dora-coordinator.
let (mut node, mut events) = DoraNode::init_from_env()?;
.recv()
.recv() waits for the next event on the events stream.
let event = events.recv();
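In the full node example further down, `recv()` yields an `Option`, and the loop ends once it returns `None`. The sketch below models that loop shape with a standard-library channel so it runs without dora. `Event`, `EventStream`, `event_channel`, and `count_ticks` are stand-ins made up for this illustration, and breaking out of the loop on `Stop` is a choice made here rather than documented dora behaviour.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Local stand-in for the events a node can receive; not the dora `Event` type.
#[derive(Debug)]
enum Event {
    Input { id: String, data: Vec<u8> },
    Stop,
}

/// Hypothetical wrapper whose `recv()` yields `None` once the stream has ended.
struct EventStream {
    rx: Receiver<Event>,
}

impl EventStream {
    fn recv(&mut self) -> Option<Event> {
        // `Receiver::recv` blocks until an event arrives and returns an
        // error once every sender has been dropped.
        self.rx.recv().ok()
    }
}

/// Create a sender for feeding events and the stream that receives them.
fn event_channel() -> (Sender<Event>, EventStream) {
    let (tx, rx) = channel();
    (tx, EventStream { rx })
}

/// Drain the stream, counting `tick` inputs, and stop early on `Event::Stop`.
fn count_ticks(events: &mut EventStream) -> usize {
    let mut ticks = 0;
    while let Some(event) = events.recv() {
        match event {
            Event::Input { id, .. } if id == "tick" => ticks += 1,
            Event::Input { id, data } => {
                eprintln!("ignoring input `{id}` ({} bytes)", data.len())
            }
            Event::Stop => break,
        }
    }
    ticks
}
```

Dropping the sender plays the role of the stream closing: `count_ticks` returns as soon as `recv()` reports `None`.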
.send_output(...)
send_output sends data from the node to the other nodes.
It takes a closure as an argument to enable zero-copy sends: the closure writes the data directly into the output buffer.
node.send_output(
    &data_id,
    metadata.parameters,
    data.len(),
    |out| {
        out.copy_from_slice(data);
    },
)?;
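The closure-based signature can be illustrated with a small std-only model: the sender reserves a buffer of the requested length and hands it to the caller to fill in place, so the caller never has to build an owned message that is copied afterwards. `Outbox` is a hypothetical stand-in, and the plain `Vec` buffer is an assumption for the sketch; it does not show what dora allocates internally.

```rust
/// Hypothetical stand-in for a node's output side; not the dora type.
struct Outbox {
    /// Messages that have been "sent", stored as (output id, payload) pairs.
    sent: Vec<(String, Vec<u8>)>,
}

impl Outbox {
    fn new() -> Self {
        Outbox { sent: Vec::new() }
    }

    /// Reserve `len` bytes up front, then let the caller fill them in place.
    fn send_output<F>(&mut self, id: &str, len: usize, fill: F)
    where
        F: FnOnce(&mut [u8]),
    {
        // In this sketch the destination is a heap buffer (an assumption);
        // the point is that the caller writes straight into it.
        let mut buffer = vec![0u8; len];
        fill(&mut buffer);
        self.sent.push((id.to_owned(), buffer));
    }
}
```

A call such as `outbox.send_output("random", data.len(), |out| out.copy_from_slice(&data))` mirrors the shape of the dora call above, minus the metadata parameters.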
Try it out!
- Generate a new Rust binary (application):
cargo new rust-dataflow-example-node
Cargo.toml
[package]
name = "rust-dataflow-example-node"
version.workspace = true
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"
futures = "0.3.21"
rand = "0.8.5"
tokio = { version = "1.24.2", features = ["rt", "macros"] }
src/main.rs
use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event};

fn main() -> eyre::Result<()> {
    println!("hello");

    let output = DataId::from("random".to_owned());

    let (mut node, mut events) = DoraNode::init_from_env()?;

    for i in 0..100 {
        let event = match events.recv() {
            Some(input) => input,
            None => break,
        };

        match event {
            Event::Input {
                id,
                metadata,
                data: _,
            } => match id.as_str() {
                "tick" => {
                    let random: u64 = rand::random();
                    println!("tick {i}, sending {random:#x}");
                    let data: &[u8] = &random.to_le_bytes();
                    node.send_output(output.clone(), metadata.parameters, data.len(), |out| {
                        out.copy_from_slice(data);
                    })?;
                }
                other => eprintln!("Ignoring unexpected input `{other}`"),
            },
            Event::Stop => println!("Received manual stop"),
            other => eprintln!("Received unexpected input: {other:?}"),
        }
    }

    Ok(())
}
- Link it in your graph as:
      inputs:
        tick: dora/timer/millis/10
      outputs:
        - random
  - id: runtime-node
    operators:
      - id: rust-operator