Python API
Operator
The operator API is a framework for you to implement. The implemented operator will be managed by dora. This framework enables us to make optimisations and provide advanced features. It is the recommended way of using dora.
An operator requires an on_event method, which must return a DoraStatus depending on whether the operator needs to continue or stop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Callable

import cv2
import numpy as np
import pyarrow as pa
import torch
from dora import DoraStatus

pa.array([])

CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480


class Operator:
    """
    Inferring objects from images
    """

    def __init__(self):
        # Load the pretrained YOLOv5 nano model from the PyTorch hub
        self.model = torch.hub.load("ultralytics/yolov5", "yolov5n")
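The class above stops at __init__, so the on_event method that the text requires is not shown. The following is a minimal sketch of what such a method could look like, not dora's reference implementation. It makes three assumptions: on_event receives the event and a send_output callback, the event is a dict with a "type" key as in the node examples, and the status enum has CONTINUE and STOP members. DoraStatus is replaced by a local stand-in so the sketch runs without dora installed, and the "echo" output name is hypothetical.

```python
from enum import Enum
from typing import Callable


class DoraStatus(Enum):
    """Local stand-in for dora's DoraStatus (assumption, for illustration only)."""

    CONTINUE = 0
    STOP = 1


class Operator:
    """Hypothetical operator that forwards every input and stops on a STOP event."""

    def on_event(self, dora_event: dict, send_output: Callable) -> DoraStatus:
        # Assumed event shape: a dict with a "type" key.
        if dora_event["type"] == "INPUT":
            # "echo" is a made-up output id used only in this sketch.
            send_output("echo", dora_event["data"], dora_event["metadata"])
            return DoraStatus.CONTINUE
        if dora_event["type"] == "STOP":
            return DoraStatus.STOP
        # Any other event type is ignored and the operator keeps running.
        return DoraStatus.CONTINUE


# Drive the operator by hand with a recording callback instead of a dataflow.
sent = []
operator = Operator()
status_input = operator.on_event(
    {"type": "INPUT", "id": "image", "data": b"abc", "metadata": {}},
    lambda *args: sent.append(args),
)
status_stop = operator.on_event({"type": "STOP"}, lambda *args: sent.append(args))
```

Calling on_event directly like this is also a convenient way to unit-test operator logic before wiring it into a graph.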
For Python, we recommend allocating each operator to its own runtime. Operators placed in the same runtime share a single GIL, which makes them run almost sequentially. See: https://docs.rs/pyo3/latest/pyo3/marker/struct.Python.html#deadlocks
Try it out!
- Create an operator Python file called object_detection.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Callable
from dora import Node

import cv2
import numpy as np
import torch

model = torch.hub.load("ultralytics/yolov5", "yolov5n")

node = Node()

for event in node:
    match event["type"]:
        case "INPUT":
            match event["id"]:
                case "image":
                    print("[object detection] received image input")
                    frame = np.frombuffer(event["data"], dtype="uint8")
                    frame = cv2.imdecode(frame, -1)
                    frame = frame[:, :, ::-1]  # OpenCV image (BGR to RGB)
                    results = model(frame)  # includes NMS
                    arrays = np.array(results.xyxy[0].cpu()).tobytes()

                    node.send_output("bbox", arrays, event["metadata"])
                case other:
                    print("[object detection] ignoring unexpected input:", other)
        case "STOP":
            print("[object detection] received stop")
        case "ERROR":
            print("[object detection] error: ", event["error"])
        case other:
            print("[object detection] received unexpected event:", other)
- Link it in your graph as:
      source: ./object_detection.py
      inputs:
        image: webcam/image
      outputs:
        - bbox

  - id: plot
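The bbox output is sent as raw bytes, so a downstream node such as plot has to rebuild the rows itself. The sketch below shows one way to do that with the standard library only. It assumes the payload is a flat sequence of native-endian float32 values with six fields per detection (x1, y1, x2, y2, confidence, class), which is the layout YOLOv5 uses for results.xyxy. The helper name decode_bboxes is hypothetical.

```python
from array import array

# Assumed row layout of a YOLOv5 xyxy result: x1, y1, x2, y2, confidence, class.
BBOX_FIELDS = 6


def decode_bboxes(payload: bytes) -> list[list[float]]:
    """Rebuild detection rows from a flat float32 byte payload."""
    values = array("f")  # "f" is a 4-byte float, matching float32
    values.frombytes(payload)
    if len(values) % BBOX_FIELDS != 0:
        raise ValueError("payload does not contain whole bounding boxes")
    return [
        list(values[start:start + BBOX_FIELDS])
        for start in range(0, len(values), BBOX_FIELDS)
    ]


# Simulate what the sender produces for a single detection.
payload = array("f", [10.0, 20.0, 110.0, 220.0, 0.5, 0.0]).tobytes()
boxes = decode_bboxes(payload)
empty = decode_bboxes(b"")
```

With NumPy available, the same decoding is usually written as np.frombuffer(payload, dtype=np.float32).reshape(-1, 6).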
Custom Node
The custom node API allows you to integrate dora into your application. It lets you retrieve inputs and send outputs in any fashion you want.
Node()
Node() initiates a node from the environment variables set by dora-coordinator.
from dora import Node
node = Node()
.next() or __next__() as an iterator
.next() gives you the next input that the node has received. It blocks until the next input becomes available. It returns None when all senders have been dropped.
event = node.next()
# or
for event in node:
    print(event["type"])
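To make the two access styles concrete without a running dataflow, here is a small sketch built around a stand-in class. FakeNode is hypothetical and not part of dora. It only mimics the contract described above: next() returns None once nothing more can arrive, and iteration ends at that point. The event shape, a dict with a "type" key, follows the other examples on this page.

```python
from collections import deque


class FakeNode:
    """Hypothetical stand-in for dora's Node, used only to run loops offline."""

    def __init__(self, events):
        self._events = deque(events)

    def next(self):
        # Mirrors the documented contract: None once no more input can arrive.
        return self._events.popleft() if self._events else None

    def __iter__(self):
        return self

    def __next__(self):
        event = self.next()
        if event is None:
            raise StopIteration
        return event


def count_inputs(node) -> int:
    """Drain a node with explicit next() calls and count its INPUT events."""
    seen = 0
    while (event := node.next()) is not None:
        if event["type"] == "INPUT":
            seen += 1
    return seen


events = [
    {"type": "INPUT", "id": "tick"},
    {"type": "INPUT", "id": "tick"},
    {"type": "STOP"},
]
explicit_count = count_inputs(FakeNode(events))
iterated_types = [event["type"] for event in FakeNode(events)]
```

The explicit loop has to check for None itself, while the for loop ends on its own once the stand-in runs out of events.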
.send_output(output_id, data)
send_output sends data from the node. The call below also passes a metadata dictionary as a third argument.
node.send_output("string", b"string", {"open_telemetry_context": "7632e76"})
Try it out!
- Install the Python node API:
pip install dora-rs
- Create a Python file called webcam.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time

import cv2
from dora import Node

node = Node()

video_capture = cv2.VideoCapture(0)

start = time.time()

# Run for 10 seconds
while time.time() - start < 10:
    # Wait for the next dora input
    event = node.next()
    if event is None:
        # All senders have been dropped, so no further input will arrive
        break
    match event["type"]:
        case "INPUT":
            ret, frame = video_capture.read()
            if ret:
                # Send the frame as JPEG-encoded bytes
                node.send_output(
                    "image",
                    cv2.imencode(".jpg", frame)[1].tobytes(),
                    event["metadata"],
                )
        case "STOP":
            print("received stop")
            break
        case other:
            print("received unexpected event:", other)
            break

video_capture.release()
- Link it in your graph as:
        tick:
          source: dora/timer/millis/100
          queue_size: 1000
      outputs:
        - image

  - id: object_detection