Create a Rust workspace
- Initiate the workspace with:
mkdir my_first_dataflow
cd my_first_dataflow
- Create the Cargo.toml file that will configure the entire workspace:
Cargo.toml
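[workspace]
members = [
    "rust-dataflow-example-node",
]
resolver = "2"

# Inherited by member crates via `version.workspace = true` / `license.workspace = true`.
[workspace.package]
version = "0.1.0"
license = "Apache-2.0" # choose the license of your own project

# Inherited by member crates via `<dependency> = { workspace = true }`.
# Use the dora version that matches your installed `dora-daemon`.
[workspace.dependencies]
dora-node-api = "0.2"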
Write your first node
Let's write a node that periodically sends a random number and exits after 100 iterations. The other nodes and operators will then exit as well, because all of their sources have closed.
- Generate a new Rust binary (application):
cargo new rust-dataflow-example-node
with Cargo.toml:
[package]
name = "rust-dataflow-example-node"
version.workspace = true
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"
futures = "0.3.21"
rand = "0.8.5"
tokio = { version = "1.24.2", features = ["rt", "macros"] }
with src/main.rs:
use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event}; fn main() -> eyre::Result<()> { println!("hello"); let output = DataId::from("random".to_owned()); let (mut node, mut events) = DoraNode::init_from_env()?; for i in 0..100 { let event = match events.recv() { Some(input) => input, None => break, }; match event { Event::Input { id, metadata, data: _, } => match id.as_str() { "tick" => { let random: u64 = rand::random(); println!("tick {i}, sending {random:#x}"); let data: &[u8] = &random.to_le_bytes(); node.send_output(output.clone(), metadata.parameters, data.len(), |out| { out.copy_from_slice(data); })?; } other => eprintln!("Ignoring unexpected input `{other}`"), }, Event::Stop => println!("Received manual stop"), other => eprintln!("Received unexpected input: {other:?}"), } } Ok(()) }
Write your first operator
- Generate a new Rust library:
cargo new rust-dataflow-example-operator --lib
with Cargo.toml:
[package]
name = "rust-dataflow-example-operator"
version.workspace = true
edition = "2021"
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
crate-type = ["cdylib"]
[dependencies]
dora-operator-api = { workspace = true }
with src/lib.rs:
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};

// Registers `ExampleOperator` as the operator exported by this shared library
// (the crate is built with `crate-type = ["cdylib"]`).
register_operator!(ExampleOperator);

/// Example operator.
///
/// Counts `tick` inputs. For every `random` input it sends a status message on
/// the `status` output that reports the received value and the number of ticks
/// seen so far.
#[derive(Debug, Default)]
struct ExampleOperator {
    /// Number of `tick` inputs received so far.
    ticks: usize,
}

impl DoraOperator for ExampleOperator {
    /// Handles a single event.
    ///
    /// Returns `DoraStatus::Stop` once the `random` input is closed and
    /// `DoraStatus::Continue` otherwise. Returns `Err` if a `random` payload is
    /// not exactly 8 bytes or if sending the status output fails.
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut DoraOutputSender,
    ) -> Result<DoraStatus, String> {
        match event {
            Event::Input { id, data } => match *id {
                "tick" => {
                    self.ticks += 1;
                }
                "random" => {
                    // The source node sends a `u64` encoded as 8 little-endian bytes.
                    let parsed = {
                        let data: [u8; 8] =
                            (*data).try_into().map_err(|_| "unexpected random data")?;
                        u64::from_le_bytes(data)
                    };
                    let output = format!(
                        "operator received random value {parsed:#x} after {} ticks",
                        self.ticks
                    );
                    output_sender.send("status".into(), output.into_bytes())?;
                }
                other => eprintln!("ignoring unexpected input {other}"),
            },
            Event::Stop => {}
            Event::InputClosed { id } => {
                println!("input `{id}` was closed");
                // Without random values there is nothing left to report, so stop.
                if *id == "random" {
                    println!("`random` input was closed -> exiting");
                    return Ok(DoraStatus::Stop);
                }
            }
            other => {
                println!("received unknown event {other:?}");
            }
        }
        Ok(DoraStatus::Continue)
    }
}
- And modify the root Cargo.toml:
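[workspace]
members = [
    "rust-dataflow-example-node",
    "rust-dataflow-example-operator",
]
resolver = "2"

# Inherited by member crates via `version.workspace = true` / `license.workspace = true`.
[workspace.package]
version = "0.1.0"
license = "Apache-2.0" # choose the license of your own project

# Inherited by member crates via `<dependency> = { workspace = true }`.
# Use the dora version that matches your installed `dora-daemon`.
[workspace.dependencies]
dora-node-api = "0.2"
dora-operator-api = "0.2"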
Write your sink node
Let's write a logger which will print incoming data.
- Generate a new Rust binary (application):
cargo new rust-dataflow-example-sink
with Cargo.toml:
[package]
name = "rust-dataflow-example-sink"
version.workspace = true
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"
with src/main.rs:
use dora_node_api::{self, DoraNode, Event}; use eyre::{bail, Context, ContextCompat}; fn main() -> eyre::Result<()> { let (_node, mut events) = DoraNode::init_from_env()?; while let Some(event) = events.recv() { match event { Event::Input { id, metadata: _, data, } => match id.as_str() { "message" => { let data = data.wrap_err("no data")?; let received_string = std::str::from_utf8(&data) .wrap_err("received message was not utf8-encoded")?; println!("sink received message: {}", received_string); if !received_string.starts_with("operator received random value ") { bail!("unexpected message format (should start with 'operator received random value')") } if !received_string.ends_with(" ticks") { bail!("unexpected message format (should end with 'ticks')") } } other => eprintln!("Ignoring unexpected input `{other}`"), }, Event::Stop => { println!("Received manual stop"); } Event::InputClosed { id } => { println!("Input `{id}` was closed"); } other => eprintln!("Received unexpected input: {other:?}"), } } Ok(()) }
- And modify the root Cargo.toml:
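[workspace]
members = [
    "rust-dataflow-example-node",
    "rust-dataflow-example-operator",
    "rust-dataflow-example-sink",
]
resolver = "2"

# Inherited by member crates via `version.workspace = true` / `license.workspace = true`.
[workspace.package]
version = "0.1.0"
license = "Apache-2.0" # choose the license of your own project

# Inherited by member crates via `<dependency> = { workspace = true }`.
# Use the dora version that matches your installed `dora-daemon`.
[workspace.dependencies]
dora-node-api = "0.2"
dora-operator-api = "0.2"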
Compile everything
cargo build --all --release
Write a graph definition
Let's write the graph definition so that the nodes know who to communicate with.
dataflow.yml
nodes:
  - id: rust-node
    custom:
      build: cargo build -p rust-dataflow-example-node --release
      source: target/release/rust-dataflow-example-node
      inputs:
        tick: dora/timer/millis/10
      outputs:
        - random
  - id: runtime-node
    operators:
      - id: rust-operator
        build: cargo build -p rust-dataflow-example-operator --release
        shared-library: target/release/rust_dataflow_example_operator
        inputs:
          tick: dora/timer/millis/100
          random: rust-node/random
        outputs:
          - status
  - id: rust-sink
    custom:
      build: cargo build -p rust-dataflow-example-sink --release
      source: target/release/rust-dataflow-example-sink
      inputs:
        message: runtime-node/rust-operator/status
Run it!
- Run the dataflow:
dora-daemon --run-dataflow dataflow.yml