Welcome to dora!
dora's goal is to be a low-latency, composable, and distributed dataflow framework. With dora, you can define robotic applications as a graph of nodes that can easily be swapped and replaced. Those nodes can be shared and implemented in different languages such as Rust, Python, or C. dora then connects those nodes and provides as many features as possible to facilitate the dataflow.
✨ Features
Composability:
- YAML declarative programming
- Polyglot nodes:
  - Rust
  - C
  - C++
  - Python
- Isolated operators and custom nodes that can be reused
Low latency:
- Written in ...Cough... blazingly fast ...Cough... Rust
- Pub/sub communication with shared memory!
- Zero-copy on read!
Distributed:
- Pub/sub communication between machines with zenoh
- Distributed telemetry with OpenTelemetry
⚖️ LICENSE
This project is licensed under Apache-2.0. Check out NOTICE.md for more information.
Installation
This project is in early development; many features have yet to be implemented and breaking changes should be expected. Please don't take the current design for granted. The installation process will be streamlined in the future.
Download the binaries from GitHub
Install dora binaries from GitHub releases:
wget https://github.com/dora-rs/dora/releases/download/<version>/dora-<version>-x86_64-Linux.zip
unzip dora-<version>-x86_64-Linux.zip
python3 -m pip install dora-rs==<version> ## For Python API
PATH=$PATH:$(pwd)
dora --help
Or compile from source
Build it using:
git clone https://github.com/dora-rs/dora.git
cd dora
cargo build --all --release
PATH=$PATH:$(pwd)/target/release
Create a Rust workspace
- Initialize the workspace with:
mkdir my_first_dataflow
cd my_first_dataflow
- Create the Cargo.toml file that will configure the entire workspace:
Cargo.toml
[workspace]
members = [
"rust-dataflow-example-node",
]
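Note: the member crates in this guide use version.workspace = true, license.workspace = true, and { workspace = true } dependencies, so the workspace root also needs [workspace.package] and [workspace.dependencies] tables along these lines (a sketch; the dependency versions are placeholders, pick the releases matching your installed dora version):
[workspace.package]
version = "0.1.0"
license = "Apache-2.0"

[workspace.dependencies]
# Placeholder versions; use the dora-node-api / dora-operator-api
# releases that match your installed dora version.
dora-node-api = "0.2"
dora-operator-api = "0.2"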
Write your first node
Let's write a node that sends a random number each time it receives a tick, and stops after 100 iterations. The other nodes/operators will then exit as well, because all their sources have closed.
- Generate a new Rust binary (application):
cargo new rust-dataflow-example-node
with Cargo.toml:
[package]
name = "rust-dataflow-example-node"
version.workspace = true
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"
futures = "0.3.21"
rand = "0.8.5"
tokio = { version = "1.24.2", features = ["rt", "macros"] }
with src/main.rs:
use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event};

fn main() -> eyre::Result<()> {
    println!("hello");
    let output = DataId::from("random".to_owned());
    let (mut node, mut events) = DoraNode::init_from_env()?;

    for i in 0..100 {
        let event = match events.recv() {
            Some(input) => input,
            None => break,
        };
        match event {
            Event::Input {
                id,
                metadata,
                data: _,
            } => match id.as_str() {
                "tick" => {
                    let random: u64 = rand::random();
                    println!("tick {i}, sending {random:#x}");
                    let data: &[u8] = &random.to_le_bytes();
                    node.send_output(output.clone(), metadata.parameters, data.len(), |out| {
                        out.copy_from_slice(data);
                    })?;
                }
                other => eprintln!("Ignoring unexpected input `{other}`"),
            },
            Event::Stop => println!("Received manual stop"),
            other => eprintln!("Received unexpected input: {other:?}"),
        }
    }

    Ok(())
}
Write your first operator
- Generate a new Rust library:
cargo new rust-dataflow-example-operator --lib
with Cargo.toml:
[package]
name = "rust-dataflow-example-operator"
version.workspace = true
edition = "2021"
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
crate-type = ["cdylib"]
[dependencies]
dora-operator-api = { workspace = true }
with src/lib.rs:
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};

register_operator!(ExampleOperator);

#[derive(Debug, Default)]
struct ExampleOperator {
    ticks: usize,
}

impl DoraOperator for ExampleOperator {
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut DoraOutputSender,
    ) -> Result<DoraStatus, String> {
        match event {
            Event::Input { id, data } => match *id {
                "tick" => {
                    self.ticks += 1;
                }
                "random" => {
                    let parsed = {
                        let data: [u8; 8] =
                            (*data).try_into().map_err(|_| "unexpected random data")?;
                        u64::from_le_bytes(data)
                    };
                    let output = format!(
                        "operator received random value {parsed:#x} after {} ticks",
                        self.ticks
                    );
                    output_sender.send("status".into(), output.into_bytes())?;
                }
                other => eprintln!("ignoring unexpected input {other}"),
            },
            Event::Stop => {}
            Event::InputClosed { id } => {
                println!("input `{id}` was closed");
                if *id == "random" {
                    println!("`random` input was closed -> exiting");
                    return Ok(DoraStatus::Stop);
                }
            }
            other => {
                println!("received unknown event {other:?}");
            }
        }
        Ok(DoraStatus::Continue)
    }
}
- And modify the root Cargo.toml:
[workspace]
members = [
"rust-dataflow-example-node",
"rust-dataflow-example-operator",
]
Write your sink node
Let's write a logger that prints incoming data.
- Generate a new Rust binary (application):
cargo new rust-dataflow-example-sink
with Cargo.toml:
[package]
name = "rust-dataflow-example-sink"
version.workspace = true
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"
with src/main.rs:
use dora_node_api::{self, DoraNode, Event};
use eyre::{bail, Context, ContextCompat};

fn main() -> eyre::Result<()> {
    let (_node, mut events) = DoraNode::init_from_env()?;

    while let Some(event) = events.recv() {
        match event {
            Event::Input {
                id,
                metadata: _,
                data,
            } => match id.as_str() {
                "message" => {
                    let data = data.wrap_err("no data")?;
                    let received_string = std::str::from_utf8(&data)
                        .wrap_err("received message was not utf8-encoded")?;
                    println!("sink received message: {}", received_string);
                    if !received_string.starts_with("operator received random value ") {
                        bail!("unexpected message format (should start with 'operator received random value')")
                    }
                    if !received_string.ends_with(" ticks") {
                        bail!("unexpected message format (should end with 'ticks')")
                    }
                }
                other => eprintln!("Ignoring unexpected input `{other}`"),
            },
            Event::Stop => {
                println!("Received manual stop");
            }
            Event::InputClosed { id } => {
                println!("Input `{id}` was closed");
            }
            other => eprintln!("Received unexpected input: {other:?}"),
        }
    }

    Ok(())
}
- And modify the root Cargo.toml:
[workspace]
members = [
"rust-dataflow-example-node",
"rust-dataflow-example-operator",
"rust-dataflow-example-sink"
]
Compile everything
cargo build --all --release
Write a graph definition
Let's write the graph definition so that the nodes know who to communicate with.
dataflow.yml
nodes:
  - id: rust-node
    custom:
      build: cargo build -p rust-dataflow-example-node
      source: ../../target/debug/rust-dataflow-example-node
      inputs:
        tick: dora/timer/millis/10
      outputs:
        - random
  - id: runtime-node
    operators:
      - id: rust-operator
        build: cargo build -p rust-dataflow-example-operator
        shared-library: ../../target/debug/rust_dataflow_example_operator
        inputs:
          tick: dora/timer/millis/100
          random: rust-node/random
        outputs:
          - status
  - id: rust-sink
    custom:
      build: cargo build -p rust-dataflow-example-sink
      source: ../../target/debug/rust-dataflow-example-sink
      inputs:
        message: runtime-node/rust-operator/status
Run it!
- Run the dataflow:
dora-daemon --run-dataflow dataflow.yml
Design Overview
The dora framework is structured into the following main components:
- Nodes: Dora nodes are separate, isolated processes that communicate with other nodes through the dora library. A node can be either a custom, user-specified program or a dora runtime node, which runs dora operators. Nodes implement their own main function and thus have full control over their execution. Nodes use the dora library to communicate with other nodes; it is available for multiple languages (Rust, C; maybe Python, WASM). Communication happens through a communication layer, which will be zenoh in our first version. We plan to add more options in the future. All communication layer implementations should be robust against disconnections, so operators should be able to keep running even if they lose the connection to the coordinator.
- Operators: Operators are lightweight, cooperative, library-based components that are executed by a dora runtime node. They must implement a specific interface, depending on the language used. Operators can use a wide range of advanced features provided by the dora runtime, for example priority scheduling or native deadline support.
- Coordinator: The coordinator is responsible for reading the dataflow from a YAML file, verifying it, and deploying the nodes and operators to the specified or automatically determined machines. It monitors the operators' health and implements high-level cluster management functionality. For example, we could implement automatic scaling for cloud nodes or operator replication and restarts. The coordinator can be controlled through a command-line interface (CLI).
Operators vs Custom Nodes
There are two ways to implement an operation in dora: either as a dora operator or as a custom node. Both approaches have advantages and drawbacks, as explained below. In general, it is recommended to create dora operators and to use custom nodes only when necessary.
Operators have the following advantages:
- They can use a wide range of advanced functionality provided by the dora runtime nodes. This includes special scheduling strategies and features such as deadlines.
- They are lightweight, so they occupy only minimal amounts of memory. This makes it possible to run thousands of operators on the same machine.
- They can use runtime-managed state storage, for robustness or for sharing state with other operators.
- They share the address space with other operators on the same node, which makes communication much faster.
Custom nodes provide a different set of advantages:
- Each node is a separate, isolated process, which can be important for security-critical operations.
- They support pinned resources. For example, a CPU core can be pinned to a custom node through the dataflow configuration file.
- They have full control over their execution, which allows them to implement complex input and wake-up rules.
Dataflow Specification
Dataflows are specified through a YAML file. This section presents our current draft for the file format. It only includes basic functionality for now; we will extend it later when we introduce more advanced features.
Dataflow
Dataflows are specified through the following format:
nodes:
  - id: foo
    # ... (see below)
  - id: bar
    # ... (see below)
Inputs and Outputs
Each operator or custom node has a separate namespace for its outputs. To refer to outputs, the
Input operands are specified using the
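For example, the dataflow used earlier in this guide wires inputs up like this (a consolidated illustration, not a complete node definition):
inputs:
  tick: dora/timer/millis/100                  # a built-in dora timer
  random: rust-node/random                     # output `random` of the custom node `rust-node`
  message: runtime-node/rust-operator/status   # output `status` of operator `rust-operator`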
Nodes
Nodes are defined using the following format:
nodes:
  - id: some-unique-id
    # For nodes with multiple operators
    operators:
      - id: operator-1
        # ... (see below)
      - id: operator-2
        # ... (see below)
  - id: some-unique-id-2
    custom:
      source: path/to/timestamp
      env:
        ENVIRONMENT_VARIABLE_1: true
      working-directory: some/path
      inputs:
        input_1: operator_2/output_4
        input_2: custom_node_2/output_4
      outputs:
        - output_1
  # Unique operator
  - id: some-unique-id-3
    operator:
      # ... (see below)
Nodes must provide either an operators field or a custom field, but not both. Nodes with an operators field run a dora runtime process, which runs and manages the specified operators. Nodes with a custom field run a custom executable.
Custom Nodes
Custom nodes specify the executable name and arguments, like a normal shell command, through the source field. Through the optional env field, it is possible to set environment variables for the process. The optional working-directory field allows overriding the directory in which the program is started.
To integrate with the rest of the dora dataflow, custom nodes must specify their inputs and outputs, similar to operators. They can reference outputs of both operators and other custom nodes.
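Putting these fields together, a custom node entry can look like this (a sketch assembled from the node example above; all paths and names are illustrative):
- id: my-custom-node
  custom:
    source: path/to/executable
    env:
      ENVIRONMENT_VARIABLE_1: true
    working-directory: some/path
    inputs:
      input_1: some-operator/output_1
    outputs:
      - output_1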
Operators
Operators are defined through the following format:
- id: unique-operator-id
  name: Human-Readable Operator Name
  description: An optional description of the operator's purpose.
  inputs:
    input_1: source_operator_2/output_1
    input_2: custom_node_1/output_1
  outputs:
    - output_1
  ## ONE OF:
  shared-library: "path/to/shared_lib" # file extension and `lib` prefix are added automatically
  python: "path/to/python_file.py"
  wasm: "path/to/wasm_file.wasm"
Operators must list all their inputs and outputs. Inputs can be linked to arbitrary outputs of other operators or custom nodes.
There are multiple ways to implement an operator:
- as a C-compatible shared library
- as a Python object
- as a WebAssembly (WASM) module
Each operator must specify exactly one implementation. The implementation must follow a specific format that is specified by dora.
Example
nodes:
  - id: rust-node
    custom:
      build: cargo build -p rust-dataflow-example-node
      source: ../../target/debug/rust-dataflow-example-node
      inputs:
        tick: dora/timer/millis/10
      outputs:
        - random
  - id: runtime-node
    operators:
      - id: rust-operator
        build: cargo build -p rust-dataflow-example-operator
        shared-library: ../../target/debug/rust_dataflow_example_operator
        inputs:
          tick: dora/timer/millis/100
          random: rust-node/random
        outputs:
          - status
  - id: rust-sink
    custom:
      build: cargo build -p rust-dataflow-example-sink
      source: ../../target/debug/rust-dataflow-example-sink
      inputs:
        message: runtime-node/rust-operator/status
TODO: Integration with ROS 1/2
To integrate dora-rs operators with ROS 1 or ROS 2 operators, we plan to provide special bridge operators. These operators act as a sink in one dataflow framework and push all messages to a different dataflow framework, where they act as a source.
For example, we plan to provide a to_ros_2 operator, which takes a single data input and publishes it to a specified ROS 2 dataflow.
Rust API
Operator
The operator API is a framework for you to implement. The implemented operator will be managed by dora. This framework enables us to apply optimizations and provide advanced features. It is the recommended way of using dora.
An operator must be registered and must implement the DoraOperator trait. The trait consists of an on_event method that defines the operator's behaviour whenever an event arrives, for example a new input.
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};

register_operator!(ExampleOperator);

#[derive(Debug, Default)]
struct ExampleOperator {
    ticks: usize,
}

impl DoraOperator for ExampleOperator {
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut DoraOutputSender,
    ) -> Result<DoraStatus, String> {
        // ... (see the full example below)
    }
}
Try it out!
- Generate a new Rust library
cargo new rust-dataflow-example-operator --lib
Cargo.toml
[package]
name = "rust-dataflow-example-operator"
version.workspace = true
edition = "2021"
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
crate-type = ["cdylib"]
[dependencies]
dora-operator-api = { workspace = true }
src/lib.rs
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event};

register_operator!(ExampleOperator);

#[derive(Debug, Default)]
struct ExampleOperator {
    ticks: usize,
}

impl DoraOperator for ExampleOperator {
    fn on_event(
        &mut self,
        event: &Event,
        output_sender: &mut DoraOutputSender,
    ) -> Result<DoraStatus, String> {
        match event {
            Event::Input { id, data } => match *id {
                "tick" => {
                    self.ticks += 1;
                }
                "random" => {
                    let parsed = {
                        let data: [u8; 8] =
                            (*data).try_into().map_err(|_| "unexpected random data")?;
                        u64::from_le_bytes(data)
                    };
                    let output = format!(
                        "operator received random value {parsed:#x} after {} ticks",
                        self.ticks
                    );
                    output_sender.send("status".into(), output.into_bytes())?;
                }
                other => eprintln!("ignoring unexpected input {other}"),
            },
            Event::Stop => {}
            Event::InputClosed { id } => {
                println!("input `{id}` was closed");
                if *id == "random" {
                    println!("`random` input was closed -> exiting");
                    return Ok(DoraStatus::Stop);
                }
            }
            other => {
                println!("received unknown event {other:?}");
            }
        }
        Ok(DoraStatus::Continue)
    }
}
- Build it:
cargo build --release
- Link it in your graph as:
- id: runtime-node
  operators:
    - id: rust-operator
      build: cargo build -p rust-dataflow-example-operator
      shared-library: ../../target/debug/rust_dataflow_example_operator
      inputs:
        tick: dora/timer/millis/100
        random: rust-node/random
      outputs:
        - status
- id: rust-sink
  custom:
This example can be found in the examples directory.
Custom Node
The custom node API allows you to integrate dora into your application. It allows you to retrieve inputs and send outputs in any fashion you want.
DoraNode::init_from_env()
DoraNode::init_from_env() initializes a node from the environment variables set by dora-coordinator:
let (mut node, mut events) = DoraNode::init_from_env()?;
.recv()
.recv() waits for the next event on the events stream:
let event = events.recv();
.send_output(...)
send_output sends data from the node to the other nodes. It takes a closure as input so that the data can be written directly into the output buffer, enabling zero-copy on send:
node.send_output(&data_id, metadata.parameters, data.len(), |out| {
    out.copy_from_slice(data);
})?;
Try it out!
- Generate a new Rust binary (application):
cargo new rust-dataflow-example-node
Cargo.toml
[package]
name = "rust-dataflow-example-node"
version.workspace = true
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"
futures = "0.3.21"
rand = "0.8.5"
tokio = { version = "1.24.2", features = ["rt", "macros"] }
src/main.rs
use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event};

fn main() -> eyre::Result<()> {
    println!("hello");
    let output = DataId::from("random".to_owned());
    let (mut node, mut events) = DoraNode::init_from_env()?;

    for i in 0..100 {
        let event = match events.recv() {
            Some(input) => input,
            None => break,
        };
        match event {
            Event::Input {
                id,
                metadata,
                data: _,
            } => match id.as_str() {
                "tick" => {
                    let random: u64 = rand::random();
                    println!("tick {i}, sending {random:#x}");
                    let data: &[u8] = &random.to_le_bytes();
                    node.send_output(output.clone(), metadata.parameters, data.len(), |out| {
                        out.copy_from_slice(data);
                    })?;
                }
                other => eprintln!("Ignoring unexpected input `{other}`"),
            },
            Event::Stop => println!("Received manual stop"),
            other => eprintln!("Received unexpected input: {other:?}"),
        }
    }

    Ok(())
}
- Link it in your graph as:
- id: rust-node
  custom:
    build: cargo build -p rust-dataflow-example-node
    source: ../../target/debug/rust-dataflow-example-node
    inputs:
      tick: dora/timer/millis/10
    outputs:
      - random
- id: runtime-node
  operators:
    - id: rust-operator
C API
Operator
The operator API is a framework for you to implement. The implemented operator will be managed by dora. This framework enables us to apply optimizations and provide advanced features.
The operator definition is composed of three functions: dora_init_operator, which initializes the operator and its context; dora_drop_operator, which frees the memory; and dora_on_event, which runs the operator's logic when an event such as an input arrives.
#include "../../apis/c/operator/operator_api.h"
#include <assert.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
DoraInitResult_t dora_init_operator(void)
{
    void *context = malloc(1);
    char *context_char = (char *)context;
    *context_char = 0;

    DoraInitResult_t result = {.operator_context = context};
    return result;
}

DoraResult_t dora_drop_operator(void *operator_context)
{
    free(operator_context);

    DoraResult_t result = {};
    return result;
}
OnEventResult_t dora_on_event(
    const RawEvent_t *event,
    const SendOutput_t *send_output,
    void *operator_context)
{
    // ... (see the full example below)
}
Try it out!
- Create an operator.c file:
#include "../../apis/c/operator/operator_api.h"

#include <assert.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

DoraInitResult_t dora_init_operator(void)
{
    void *context = malloc(1);
    char *context_char = (char *)context;
    *context_char = 0;

    DoraInitResult_t result = {.operator_context = context};
    return result;
}

DoraResult_t dora_drop_operator(void *operator_context)
{
    free(operator_context);

    DoraResult_t result = {};
    return result;
}

OnEventResult_t dora_on_event(
    const RawEvent_t *event,
    const SendOutput_t *send_output,
    void *operator_context)
{
    char *counter = (char *)operator_context;

    if (event->input != NULL)
    {
        // input event
        Input_t *input = event->input;

        char id[input->id.len + 1];
        memcpy(id, input->id.ptr, input->id.len);
        id[input->id.len] = 0;

        if (strcmp(id, "message") == 0)
        {
            char data[input->data.len + 1];
            memcpy(data, input->data.ptr, input->data.len);
            data[input->data.len] = 0;

            *counter += 1;
            printf("C operator received message `%s`, counter: %i\n", data, *counter);

            char *out_id = "counter";
            char *out_id_heap = strdup(out_id);

            int data_alloc_size = 100;
            char *out_data = (char *)malloc(data_alloc_size);
            int count = snprintf(out_data, data_alloc_size, "The current counter value is %d", *counter);
            assert(count >= 0 && count < 100);

            Output_t output = {.id = {
                                   .ptr = (uint8_t *)out_id_heap,
                                   .len = strlen(out_id_heap),
                                   .cap = strlen(out_id_heap) + 1,
                               },
                               .data = {.ptr = (uint8_t *)out_data, .len = strlen(out_data), .cap = data_alloc_size}};
            DoraResult_t res = (send_output->send_output.call)(send_output->send_output.env_ptr, output);

            OnEventResult_t result = {.result = res, .status = DORA_STATUS_CONTINUE};
            return result;
        }
        else
        {
            printf("C operator received unexpected input %s, context: %i\n", id, *counter);
        }
    }
    if (event->stop)
    {
        printf("C operator received stop event\n");
    }

    OnEventResult_t result = {.status = DORA_STATUS_CONTINUE};
    return result;
}
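To compile the operator into a shared library that the dora runtime can load, a step along these lines should work (a sketch, assuming clang on Linux; recall that the shared-library field in the graph expects the path without the lib prefix and file extension, so build/operator resolves to build/liboperator.so):
clang -c operator.c -o build/operator.o -fPIC
clang -shared build/operator.o -o build/liboperator.so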
- Link it in your graph as:
- id: runtime-node
  operators:
    - id: c_operator
      shared-library: build/operator
      inputs:
        message: c_node/message
      outputs:
        - counter
- id: c_sink
  custom:
    source: build/c_sink
    inputs:
      counter: runtime-node/c_operator/counter
Custom Node
The custom node API allows you to integrate dora into your application. It allows you to retrieve inputs and send outputs in any fashion you want.
init_dora_context_from_env
init_dora_context_from_env initializes a node from the environment variables set by dora-coordinator:
void *dora_context = init_dora_context_from_env();
dora_next_event
dora_next_event waits for the next event (e.g. an input). Use read_dora_event_type to read the event's type. Inputs have the type DoraEventType_Input. To extract the ID and data of an input event, use read_dora_input_id and read_dora_input_data on the returned pointer. It is safe to ignore events and handle only those that are relevant to the node:
void *event = dora_next_event(dora_context);

// read out the ID as a UTF8-encoded string
char *id;
size_t id_len;
read_dora_input_id(event, &id, &id_len);

// read out the data as a byte array
char *data;
size_t data_len;
read_dora_input_data(event, &data, &data_len);
dora_send_output
dora_send_output sends data from the node:
char out_id[] = "tick";
char out_data[50];
int out_data_len = snprintf(out_data, sizeof(out_data), "some data");
dora_send_output(dora_context, out_id, strlen(out_id), out_data, out_data_len);
Try it out!
- Create a node.c file:
#include <stdio.h>
#include <string.h>
#include <assert.h>

#include "../../apis/c/node/node_api.h"

// sleep
#ifdef _WIN32
#include <Windows.h>
#else
#include <unistd.h>
#endif

int main()
{
    printf("[c node] Hello World\n");

    void *dora_context = init_dora_context_from_env();
    if (dora_context == NULL)
    {
        fprintf(stderr, "failed to init dora context\n");
        return -1;
    }

    printf("[c node] dora context initialized\n");

    for (char i = 0; i < 100; i++)
    {
        void *event = dora_next_event(dora_context);
        if (event == NULL)
        {
            printf("[c node] ERROR: unexpected end of event\n");
            return -1;
        }

        enum DoraEventType ty = read_dora_event_type(event);

        if (ty == DoraEventType_Input)
        {
            char *data;
            size_t data_len;
            read_dora_input_data(event, &data, &data_len);
            assert(data_len == 0);

            char out_id[] = "message";
            char out_data[50];
            int out_data_len = sprintf(out_data, "loop iteration %d", i);
            dora_send_output(dora_context, out_id, strlen(out_id), out_data, out_data_len);
        }
        else if (ty == DoraEventType_Stop)
        {
            printf("[c node] received stop event\n");
        }
        else
        {
            printf("[c node] received unexpected event: %d\n", ty);
        }

        free_dora_event(event);
    }

    printf("[c node] received 100 events\n");

    free_dora_context(dora_context);

    printf("[c node] finished successfully\n");

    return 0;
}
- Create a build folder in this directory (i.e., next to the node.c file)
- Compile the dora-node-api-c crate into a static library:
  - Run cargo build -p dora-node-api-c --release
  - The resulting staticlib is then available under ../../target/release/libdora_node_api_c.a
- Compile the node.c (e.g. using clang) and link the staticlib:
  - For example, use the following command:
    clang node.c <FLAGS> -ldora_node_api_c -L ../../target/release --output build/c_node
  - The <FLAGS> depend on the operating system and the libraries that the C node uses. On Windows, for example, the following system libraries need to be linked:
    -ladvapi32 -luserenv -lkernel32 -lws2_32 -lbcrypt -lncrypt -lschannel -lntdll -liphlpapi
    -lcfgmgr32 -lcredui -lcrypt32 -lcryptnet -lfwpuclnt -lgdi32 -lmsimg32 -lmswsock -lole32
    -lopengl32 -lsecur32 -lshell32 -lsynchronization -luser32 -lwinspool
    -Wl,-nodefaultlib:libcmt -D_DLL -lmsvcrt
  - Also, on Windows the output file should have an .exe extension: --output build/c_node.exe
- Repeat the previous steps for the sink.c executable
- Link it in your graph as:
- id: c_node
  custom:
    source: build/c_node
    inputs:
      timer: dora/timer/millis/50
    outputs:
      - message
- id: runtime-node
  operators:
    - id: c_operator
      shared-library: build/operator
Python API
Operator
The operator API is a framework for you to implement. The implemented operator will be managed by dora. This framework enables us to apply optimizations and provide advanced features. It is the recommended way of using dora.
An operator requires an on_event method that returns a DoraStatus, indicating whether it should continue or stop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Callable

import cv2
import numpy as np
import pyarrow as pa
import torch

from dora import DoraStatus

pa.array([])

CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480


class Operator:
    """
    Inferring objects from images
    """

    def __init__(self):
        self.model = torch.hub.load("ultralytics/yolov5", "yolov5n")
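The on_event method is cut off in the listing above; modeled on the script version in the Try it out section below, it could look roughly like this (a sketch, not verbatim dora example code):

    def on_event(
        self,
        dora_event: dict,
        send_output: Callable[[str, bytes, dict], None],
    ) -> DoraStatus:
        # Sketch: decode the image, run inference, and send the bounding boxes.
        if dora_event["type"] == "INPUT":
            frame = np.frombuffer(dora_event["data"], dtype="uint8")
            frame = cv2.imdecode(frame, -1)
            frame = frame[:, :, ::-1]  # OpenCV image (BGR to RGB)
            results = self.model(frame)  # includes NMS
            arrays = np.array(results.xyxy[0].cpu()).tobytes()
            send_output("bbox", arrays, dora_event["metadata"])
        return DoraStatus.CONTINUE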
For Python, we recommend giving each operator its own runtime: a runtime shares the same GIL between its operators, which makes those operators run almost sequentially. See: https://docs.rs/pyo3/latest/pyo3/marker/struct.Python.html#deadlocks
Try it out!
- Create an operator Python file called object_detection.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Callable

from dora import Node

import cv2
import numpy as np
import torch

model = torch.hub.load("ultralytics/yolov5", "yolov5n")

node = Node()

for event in node:
    match event["type"]:
        case "INPUT":
            match event["id"]:
                case "image":
                    print("[object detection] received image input")
                    frame = np.frombuffer(event["data"], dtype="uint8")
                    frame = cv2.imdecode(frame, -1)
                    frame = frame[:, :, ::-1]  # OpenCV image (BGR to RGB)
                    results = model(frame)  # includes NMS
                    arrays = np.array(results.xyxy[0].cpu()).tobytes()
                    node.send_output("bbox", arrays, event["metadata"])
                case other:
                    print("[object detection] ignoring unexpected input:", other)
        case "STOP":
            print("[object detection] received stop")
        case "ERROR":
            print("[object detection] error: ", event["error"])
        case other:
            print("[object detection] received unexpected event:", other)
- Link it in your graph as:
- id: object_detection
  custom:
    source: ./object_detection.py
    inputs:
      image: webcam/image
    outputs:
      - bbox
- id: plot
Custom Node
The custom node API allows you to integrate dora into your application. It allows you to retrieve inputs and send outputs in any fashion you want.
Node()
Node() initializes a node from the environment variables set by dora-coordinator:
from dora import Node
node = Node()
.next() or __next__() as an iterator
.next() gives you the next input that the node has received. It blocks until the next input becomes available. It returns None when all senders have been dropped:
input_id, value, metadata = node.next()
# or
for input_id, value, metadata in node:
.send_output(output_id, data)
send_output sends data from the node:
node.send_output("string", b"string", {"open_telemetry_context": "7632e76"})
Try it out!
- Install the Python node API:
pip install dora-rs
- Create a Python file called webcam.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time

import cv2

from dora import Node

node = Node()
video_capture = cv2.VideoCapture(0)

start = time.time()

# Run for 10 seconds
while time.time() - start < 10:
    # Wait for the next dora input
    event = node.next()
    match event["type"]:
        case "INPUT":
            ret, frame = video_capture.read()
            if ret:
                node.send_output(
                    "image",
                    cv2.imencode(".jpg", frame)[1].tobytes(),
                    event["metadata"],
                )
        case "STOP":
            print("received stop")
            break
        case other:
            print("received unexpected event:", other)
            break

video_capture.release()
- Link it in your graph as:
- id: webcam
  custom:
    source: ./webcam.py
    inputs:
      tick:
        source: dora/timer/millis/100
        queue_size: 1000
    outputs:
      - image
- id: object_detection
State Management
Most operations require keeping some sort of state between calls. This document describes the different ways to handle state in dora.
Internal State
Operators are struct or object instances, so they can keep internal state between invocations. This state is private to the operator. When an operator exits or crashes, its internal state is lost.
Saving State
To make themselves resilient against crashes, operators can use dora's state management. The dora runtime provides each operator with a private key-value store (KVS). Operators can save serialized state into the KVS by using the save_state function of the runtime:
fn save_state(key: &str, value: Vec<u8>)
The runtime only stores the latest value for each key, so subsequent writes to the same key replace the earlier values. Serialization is required because the state must be self-contained (i.e. no pointers to other memory) and consistent (i.e. no half-updated state). Otherwise, state recovery might not be possible after an operator crash.
To keep the performance overhead of this function low, it is recommended to use a suitable serialization format that stores the data with minimal memory and compute overhead. Text-based formats such as JSON are not recommended. Also, fast-changing state should be stored under a separate key to minimize the amount of state that needs to be written.
State Recovery
When an operator crashes, the dora runtime restarts it and supplies it with the last version of the saved state. It does this by calling the operator's restore_state method:
fn restore_state(&mut self, state: HashMap<String, Vec<u8>>)
In this method, the operator should deserialize and apply all state entries, and perform all custom consistency checks that are necessary.
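For illustration, here is a sketch of an operator persisting a fast-changing counter under its own key, using the save_state/restore_state signatures above (the operator struct, field names, and encoding are illustrative, not part of the dora API):
use std::collections::HashMap;

struct Counter {
    ticks: u64,
}

impl Counter {
    fn on_tick(&mut self) {
        self.ticks += 1;
        // A compact little-endian encoding instead of JSON keeps the
        // serialization overhead low; this key holds only this one value.
        save_state("ticks", self.ticks.to_le_bytes().to_vec());
    }

    fn restore_state(&mut self, state: HashMap<String, Vec<u8>>) {
        // Deserialize and apply each state entry; here there is only one.
        if let Some(bytes) = state.get("ticks") {
            if let Ok(arr) = <[u8; 8]>::try_from(bytes.as_slice()) {
                self.ticks = u64::from_le_bytes(arr);
            }
        }
    }
}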
Sharing State
To share state between operators, dora provides access to a node-local key-value store:
fn kvs_write(key: &str, value: Vec<u8>)
fn kvs_read(key: &str) -> Vec<u8>
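For example (a sketch using the two functions above; the key and value are illustrative):
// One operator publishes its latest pose estimate...
kvs_write("robot_pose", pose_bytes);
// ...and another operator (or a custom node) reads it back.
let latest_pose = kvs_read("robot_pose");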
Todo:
- Consistency?
- Anna?
Custom Nodes
Custom nodes have full control over their execution, so they can implement their own state management. Shared state can be accessed through the kvs_read and kvs_write functions of the dora library, which are equivalent to the respective functions provided by the dora runtime.
Since custom nodes cannot use the recovery feature of the dora runtime, the save_state/restore_state functions are not available to them.
Framework
- Runtime process
  - Talks with other runtime processes
    - Across machines
  - Loop:
    - listen for inputs
    - invoke corresponding operator(s)
    - collect and forward outputs
- Operators
  - Connected to runtime
    - Via TCP socket (can be a separate process)
      - Single connection with high-level message format, or
      - Separate connection per input/output
    - Dynamically linked as shared library
      - Runtime invokes the specific handler function directly with input(s)
      - Outputs either:
        - Return a collection as result
        - Call runtime function to send out result
  - Input aggregation (i.e. waiting until multiple inputs are available)
    - by runtime -> aggregation specified in config file
    - by operator -> custom handling possible
Library
- All sources/operators/sinks are separate processes that link a runtime library
- "Orchestrator" process
  - reads config file
  - launches processes accordingly
  - passes node config
    - as argument, or
    - via env variable
    - including input and output names
- Runtime library provides (async) functions to
  - wait for one or multiple inputs
    - with timeouts
  - send out outputs
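As a sketch, the node-side library API outlined above could take the following shape in Rust (illustrative names only; nothing here is an existing dora API):
use std::time::Duration;

// Illustrative input type.
struct Input {
    id: String,
    data: Vec<u8>,
}

// A possible async node-side API matching the notes above.
trait NodeRuntime {
    // Wait for the next input, giving up after `timeout`.
    async fn next_input(&mut self, timeout: Duration) -> Option<Input>;
    // Send an output to downstream nodes.
    async fn send_output(&mut self, id: &str, data: &[u8]) -> Result<(), String>;
}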
Middleware (communication) layer abstraction (MLA)
dora needs to implement the MLA as a separate crate that provides a middleware abstraction layer enabling scalable, high-performance communication between async tasks, between OS threads within a process, between processes on a single machine, and between different machines in a computer network. The MLA needs to support different communication patterns:
- publish/subscribe push-push pattern: the published message is pushed to subscribers
- publish/subscribe push-pull pattern: the published message is written to storage and later pulled by subscribers
- request/reply pattern
- point-to-point pattern
- client/server pattern
The MLA needs to abstract over the following details:
- inter-async-task (e.g., tokio channels), intra-process (OS threads, e.g., shared memory), inter-process, and inter-host / inter-network communication
- different transport layer implementations (shared memory, UDP, TCP)
- built-in support for multiple serialization/deserialization protocols, e.g., capnproto, protobuf, flatbuffers, etc.
- different language bindings to Rust: Python, C, C++, etc.
- telemetry tools for logs, metrics, distributed tracing, live data monitoring (e.g., tapping live data), recording and replay
The Rust ecosystem has abundant crates that provide the underlying communications, e.g.:
- tokio / crossbeam provide different types of channels serving different purposes: mpsc, oneshot, broadcast, watch, etc.
- Tonic provides gRPC services
- Tower provides request/reply services
- Zenoh provides many different pub/sub capabilities
The MLA also needs to provide high-level APIs:
- publish(topic, value, optional fields): the optional fields may contain the sender's identity to help the MLA logic satisfy the above requirements
- subscribe(topic, optional fields) -> future stream
- put(key, value, optional fields)
- get(key, optional fields) -> value
- send(key, msg, optional fields)
- recv(key, optional fields) -> value
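A rough sketch of how these APIs could be shaped as a Rust trait (illustrative only; a blocking iterator stands in for the future stream, and no such crate exists in dora yet):
use std::collections::HashMap;

type Payload = Vec<u8>;
type OptionalFields = HashMap<String, String>;

// Illustrative middleware abstraction matching the API list above.
trait MiddlewareLayer {
    // Pub/sub push-push: published messages are pushed to subscribers.
    fn publish(&self, topic: &str, value: Payload, fields: OptionalFields);
    fn subscribe(&self, topic: &str, fields: OptionalFields) -> Box<dyn Iterator<Item = Payload>>;

    // Pub/sub push-pull: published messages are stored and pulled later.
    fn put(&self, key: &str, value: Payload, fields: OptionalFields);
    fn get(&self, key: &str, fields: OptionalFields) -> Option<Payload>;

    // Point-to-point.
    fn send(&self, key: &str, msg: Payload, fields: OptionalFields);
    fn recv(&self, key: &str, fields: OptionalFields) -> Option<Payload>;
}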
More info here: #53