跳过主要内容

Python API

算子

算子 API 是供您实现的框架。 实现的算子将由 dora 管理。 该框架使我们能够进行优化并提供高级特性。 这是使用 dora 的推荐方法。

算子需要一个 on_event 方法,并且需要返回一个 DoraStatus ,这取决于它是否需要继续或停止。

事件

当前有 4 种事件类型可供 on_event 方法接收:

  • STOP:表示算子收到信号要停止。
  • INPUT: 表示已收到一个输入。
    • 您可以使用 dora_event['id'] 获得 id。
    • 您可以使用 dora_event['data'] 以字节数组形式获取数据。
    • 您可以使用 dora_event['value'],获取 arrow 数组形式的数据。
    • 您可以使用 dora_event['metadata'] 获取元数据。
  • INPUT_CLOSED: 表示输入源已关闭。 如果输入对算子的行为是关键的,这会更有用。
  • ERROR: 表示错误信息已接收。
  • UNKNOWN: 表示未知信息已接收。

send_output

从算子发送一个输出,使用 send_output: Callable[[str, bytes | pa.Array, dict], None] 输入方法:

  • 第一个参数是 output_id 定义在您的数据流。
  • 第二个参数是字节数组或pyarrow形式的数据。零拷贝的数组。
  • 第三个参数是dora元数据,如果您要追加链接跟踪一个输入输出过程。 例如:send_output("bbox", pa.array([100], type=pa.uint8()), dora_event["metadata"])

示例

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import numpy as np
import pyarrow as pa

from dora import DoraStatus
from ultralytics import YOLO

pa.array([])

CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480


class Operator:
"""
从图像推断对象
"""

def __init__(self):
self.model = YOLO("yolov8n.pt")

def on_event(
self,
dora_event,
send_output,
) -> DoraStatus:
"""图像句柄
参数:
dora_input (dict) 含有 "id", value, 和 "metadata"
send_output Callable[[str, bytes | pa.Array, Optional[dict]], None]:
函数发送输出至数据流:
- 第一个参数是 `output_id`
- 第二个参数是类型为 bytes 或 `pa.Array` 的数据
- 第三个数据是 dora元数据字典(metadata dict)
例如: `send_output("bbox", pa.array([100], type=pa.uint8()), dora_event["metadata"])`
"""
if dora_event["type"] == "INPUT":


frame = dora_input["value"].to_numpy().reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB)
results = self.model(frame) # 包含 NMS
# 处理结果
boxes = np.array(results[0].boxes.xyxy.cpu())
conf = np.array(results[0].boxes.conf.cpu())
label = np.array(results[0].boxes.cls.cpu())
# 将他们连接在一起
arrays = np.concatenate((boxes, conf[:, None], label[:, None]), axis=1)

send_output("bbox", pa.array(arrays.ravel()), dora_input["metadata"])
return DoraStatus.CONTINUE

对于 Python,我们建议在单个运行时上分配算子。 运行时将与多个算子共享相同的 GIL,使这些算子几乎按顺序运行。 指引: https://docs.rs/pyo3/latest/pyo3/marker/struct.Python.html#deadlocks

自定义节点

Node()

自定义节点可以使您集成 dora 到您的应用中。 它允许您以任何您想要的方式检索输入和发送输出。

使用导入实例:

from dora import Node

node = Node()

.next()

.next() 为您提供已接收输入的下一个节点。 它会阻塞,直到下一个事件可用。 当全部发送者被移除它将返回“None”。

event = node.next()

您还可以通过循环在事件流上迭代

for event in node:
match event["type"]:
case "INPUT":
match event["id"]:
case "image":

.send_output(output_id, data, metadata)

send_output 从节点发送数据。

参数:
output_id: str,
data: Bytes|Arrow,
metadata: Option[Dict],
node.send_output("string", b"string", {"open_telemetry_context": "7632e76"})

.dataflow_descriptor()

返回此节点所属的完整数据流描述符。

此方法返回已分析的数据流 YAML 文件。

.__version__

返回 dora python API 的当前版本。

这个命令将显示 dora 的当前版本。