Python API
算子
算子 API 是供您实现的框架。 实现的算子将由 dora 管理。 该框架使我们能够进行优化并提供高级特性。 这是使用 dora
的推荐方法。
算子需要一个 on_event 方法,并且需要返回一个 DoraStatus ,这取决于它是否需要继续或停止。
事件
当前有 4 种事件类型可供 on_event 方法接收:
- STOP:表示算子收到信号要停止。
- INPUT: 表示已收到一个输入。
- 您可以使用 dora_event['id'] 获得 id。
- 您可以使用 dora_event['data'] 以字节数组形式获取数据。
- 您可以使用
dora_event['value']
,获取 arrow 数组形式的数据。 - 您可以使用
dora_event['metadata']
获取元数据。
INPUT_CLOSED
: 表示输入源已关闭。 如果输入对算子的行为是关键的,这会更有用。ERROR
: 表示错误信息已接收。UNKNOWN
: 表示未知信息已接收。
send_output
从算子发送一个输出,使用 send_output: Callable[[str, bytes | pa.Array, dict], None]
输入方法:
- 第一个参数是
output_id
定义在您的数据流。 - 第二个参数是字节数组或pyarrow形式的数据。零拷贝的数组。
- 第三个参数是dora元数据,如果您要追加链接跟踪一个输入输出过程。
例如:send_output("bbox", pa.array([100], type=pa.uint8()), dora_event["metadata"])
示例
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import numpy as np
import pyarrow as pa
from dora import DoraStatus
from ultralytics import YOLO
pa.array([])
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
class Operator:
"""
从图像推断对象
"""
def __init__(self):
self.model = YOLO("yolov8n.pt")
def on_event(
self,
dora_event,
send_output,
) -> DoraStatus:
"""图像句柄
参数:
dora_input (dict) 含有 "id", value, 和 "metadata"
send_output Callable[[str, bytes | pa.Array, Optional[dict]], None]:
函数发送输出至数据流:
- 第一个参数是 `output_id`
- 第二个参数是类型为 bytes 或 `pa.Array` 的数据
- 第三个数据是 dora元数据字典(metadata dict)
例如: `send_output("bbox", pa.array([100], type=pa.uint8()), dora_event["metadata"])`
"""
if dora_event["type"] == "INPUT":
frame = dora_input["value"].to_numpy().reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB)
results = self.model(frame) # 包含 NMS
# 处理结果
boxes = np.array(results[0].boxes.xyxy.cpu())
conf = np.array(results[0].boxes.conf.cpu())
label = np.array(results[0].boxes.cls.cpu())
# 将他们连接在一起
arrays = np.concatenate((boxes, conf[:, None], label[:, None]), axis=1)
send_output("bbox", pa.array(arrays.ravel()), dora_input["metadata"])
return DoraStatus.CONTINUE
对于 Python,我们建议在单个运行时上分配算子。 运行时将与多个算子共享相同的 GIL,使这些算子几乎按顺序运行。 指引: https://docs.rs/pyo3/latest/pyo3/marker/struct.Python.html#deadlocks
自定义节点
Node()
自定义节点可以使您集成 dora
到您的应用中。 它允许您以任何您想要的方式检索输入和发送输出。
使用导入实例:
from dora import Node
node = Node()
.next()
.next()
为您提供已接收输入的下一个节点。 它会阻塞,直到下一个事件可用。 当全部发送者被移除它将返回“None”。
event = node.next()
您还可以通过循环在事件流上迭代
for event in node:
match event["type"]:
case "INPUT":
match event["id"]:
case "image":
.send_output(output_id, data, metadata)
send_output
从节点发送数据。
参数:
output_id: str,
data: Bytes|Arrow,
metadata: Option[Dict],
node.send_output("string", b"string", {"open_telemetry_context": "7632e76"})
.dataflow_descriptor()
返回此节点所属的完整数据流描述符。
此方法返回已分析的数据流 YAML 文件。
.__version__
返回 dora python API 的当前版本。
这个命令将显示 dora 的当前版本。