Rust API
Operator
The operator API is a framework for you to implement. The implemented operator will be managed by dora. This framework enable us to make optimisation and provide advanced features. It is the recommended way of using dora.
An operator requires to be registered and implement the DoraOperator trait. It is composed of an on_event method that defines the behaviour of the operator when there is an event such as receiving an input for example.
#![allow(unused)] #![warn(unsafe_op_in_unsafe_fn)] fn main() { use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event}; register_operator!(ExampleOperator); #[derive(Debug, Default)] struct ExampleOperator { ticks: usize, } impl DoraOperator for ExampleOperator { fn on_event( &mut self, event: &Event, output_sender: &mut DoraOutputSender, ) -> Result<DoraStatus, String> { }
Try it out!
- Generate a new Rust library
cargo new rust-dataflow-example-operator --lib
Cargo.toml
[package]
name = "rust-dataflow-example-operator"
version.workspace = true
edition = "2021"
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
crate-type = ["cdylib"]
[dependencies]
dora-operator-api = { workspace = true }
src/lib.rs
#![allow(unused)] #![warn(unsafe_op_in_unsafe_fn)] fn main() { use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event}; register_operator!(ExampleOperator); #[derive(Debug, Default)] struct ExampleOperator { ticks: usize, } impl DoraOperator for ExampleOperator { fn on_event( &mut self, event: &Event, output_sender: &mut DoraOutputSender, ) -> Result<DoraStatus, String> { match event { Event::Input { id, data } => match *id { "tick" => { self.ticks += 1; } "random" => { let parsed = { let data: [u8; 8] = (*data).try_into().map_err(|_| "unexpected random data")?; u64::from_le_bytes(data) }; let output = format!( "operator received random value {parsed:#x} after {} ticks", self.ticks ); output_sender.send("status".into(), output.into_bytes())?; } other => eprintln!("ignoring unexpected input {other}"), }, Event::Stop => {} Event::InputClosed { id } => { println!("input `{id}` was closed"); if *id == "random" { println!("`random` input was closed -> exiting"); return Ok(DoraStatus::Stop); } } other => { println!("received unknown event {other:?}"); } } Ok(DoraStatus::Continue) } } }
- Build it:
cargo build --release
- Link it in your graph as:
build: cargo build -p rust-dataflow-example-operator
shared-library: ../../target/debug/rust_dataflow_example_operator
inputs:
tick: dora/timer/millis/100
random: rust-node/random
outputs:
- status
- id: rust-sink
custom:
This example can be found in examples.
Custom Node
The custom node API allow you to integrate dora into your application. It allows you to retrieve input and send output in any fashion you want.
DoraNode::init_from_env()
DoraNode::init_from_env() initiate a node from environment variables set by dora-coordinator
#![allow(unused)] fn main() { let (mut node, mut events) = DoraNode::init_from_env()?; }
.recv()
.recv() wait for the next event on the events stream.
#![allow(unused)] fn main() { let event = events.recv(); }
.send_output(...)
send_output send data from the node to the other nodes.
We take a closure as an input to enable zero copy on send.
#![allow(unused)] fn main() { node.send_output( &data_id, metadata.parameters, data.len(), |out| { out.copy_from_slice(data); })?; }
Try it out!
- Generate a new Rust binary (application):
cargo new rust-dataflow-example-node
[package]
name = "rust-dataflow-example-node"
version.workspace = true
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"
futures = "0.3.21"
rand = "0.8.5"
tokio = { version = "1.24.2", features = ["rt", "macros"] }
src/main.rs
use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event}; fn main() -> eyre::Result<()> { println!("hello"); let output = DataId::from("random".to_owned()); let (mut node, mut events) = DoraNode::init_from_env()?; for i in 0..100 { let event = match events.recv() { Some(input) => input, None => break, }; match event { Event::Input { id, metadata, data: _, } => match id.as_str() { "tick" => { let random: u64 = rand::random(); println!("tick {i}, sending {random:#x}"); let data: &[u8] = &random.to_le_bytes(); node.send_output(output.clone(), metadata.parameters, data.len(), |out| { out.copy_from_slice(data); })?; } other => eprintln!("Ignoring unexpected input `{other}`"), }, Event::Stop => println!("Received manual stop"), other => eprintln!("Received unexpected input: {other:?}"), } } Ok(()) }
- Link it in your graph as:
inputs:
tick: dora/timer/millis/10
outputs:
- random
- id: runtime-node
operators:
- id: rust-operator