Python API 参考
本文档涵盖用于构建 adora 节点、算子和数据流的 Python API。安装方式:
pip install adora-rs
目录
节点 API
from adora import Node
Node 类是自定义节点的主要接口。它连接到运行中的数据流,接收输入事件并发送输出。
Node 类
__init__(node_id=None)
创建新节点并连接到运行中的数据流。
# Standard: node ID is read from environment variables set by the daemon
node = Node()
# Dynamic: connect to a running dataflow by explicit node ID
node = Node(node_id="my-dynamic-node")
Parameters:
node_id(str,可选)—— 动态节点的显式节点 ID。省略时,节点从 adora 守护进程设置的环境变量中读取身份。
异常: 如果节点无法连接到数据流,则抛出 RuntimeError。
next(timeout=None)
从事件流中获取下一个事件。阻塞直到有事件可用或超时过期。
event = node.next() # block indefinitely
event = node.next(timeout=2.0) # block up to 2 seconds
Parameters:
timeout(float,可选)—— 最大等待时间(秒)。
返回: dict —— 一个事件字典,如果所有发送者已被释放或超时过期则为 None。
drain()
非阻塞地获取所有缓冲事件。
events = node.drain()
for event in events:
print(event["type"])
返回: list[dict] —— 事件字典列表。如果没有缓冲事件则返回空列表。
try_recv()
非阻塞接收。如果有可用的缓冲事件则返回。
event = node.try_recv()
if event is not None:
print(event["type"])
返回: dict | None —— 一个事件字典,如果没有缓冲事件则为 None。
recv_async(timeout=None)
异步接收。配合 asyncio 使用。
event = await node.recv_async()
event = await node.recv_async(timeout=5.0)
Parameters:
timeout(float,可选)—— 最大等待时间(秒)。超时时返回错误。
返回: dict | None —— 一个事件字典,如果所有发送者已被释放则为 None。
注意: 此方法为实验性质。PyO3 异步(Rust-Python FFI)集成仍在开发中。
is_empty()
检查事件流中是否有任何缓冲事件。
if not node.is_empty():
event = node.try_recv()
返回: bool
send_output(output_id, data, metadata=None)
在输出通道上发送数据。
import pyarrow as pa
# Send raw bytes
node.send_output("status", b"OK")
# Send an Apache Arrow array (zero-copy capable)
node.send_output("values", pa.array([1, 2, 3]))
# Send with metadata
node.send_output("image", pa.array(pixels), {"camera_id": "front"})
Parameters:
output_id(str)—— 数据流 YAML 中声明的输出名称。data(bytes | pyarrow.Array)—— 载荷。对简单数据使用bytes,对零拷贝共享内存传输使用pyarrow.Array。metadata(dict,可选)—— 附加到消息的键值对。支持的值类型:bool、int、float、str、list[int]、list[float]、list[str]、datetime.datetime。
异常: 如果 data 既不是 bytes 也不是 pyarrow.Array,则抛出 RuntimeError。
Service, action, and streaming patterns
Python nodes use the same metadata key conventions as Rust for communication patterns. Parameters are plain dicts with string keys.
约定的元数据键:
| Key | 描述 |
|---|---|
"request_id" | 服务请求/响应关联(UUID v7) |
"goal_id" | 动作目标标识(UUID v7) |
"goal_status" | 动作结果状态:"succeeded"、"aborted" 或 "canceled" |
"session_id" | Streaming session identifier |
"segment_id" | Streaming segment within a session (integer) |
"seq" | Streaming chunk sequence number (integer) |
"fin" | Last chunk of a streaming segment (bool) |
"flush" | Discard older queued messages on input (bool) |
服务客户端示例:
import uuid
# Send a request with a unique request_id
request_id = str(uuid.uuid7()) # Python 3.13+; use uuid_utils or uuid.uuid4() on older versions
node.send_output("request", data, {"request_id": request_id})
服务服务端示例:
# Pass through the metadata (includes request_id) from the incoming request
node.send_output("response", result, event["metadata"])
动作客户端示例:
goal_id = str(uuid.uuid7())
node.send_output("goal", data, {"goal_id": goal_id})
Streaming example (flush downstream queues on user interruption):
params = {
"session_id": session_id,
"segment_id": 1,
"seq": 0,
"fin": False,
"flush": True,
}
node.send_output("text", data, metadata={"parameters": params})
See patterns.md for the full guide.
日志
Python nodes can log using either Python’s built-in logging module (recommended) or the explicit node API.
Python logging module (auto-bridged):
When Node() is created, it automatically installs a handler that routes Python’s logging module through the adora daemon. No configuration needed:
import logging
from adora import Node
node = Node() # Installs the logging bridge
logging.info("Sensor initialized") # -> structured "info" log entry
logging.warning("High temperature") # -> structured "warn" log entry
logging.debug("Raw bytes: %s", data) # -> structured "debug" log entry
These log entries are captured with full metadata (level, message, file path, line number) and work with min_log_level filtering, send_logs_as routing, and adora/logs subscribers.
Note: Do not call
logging.basicConfig()before creatingNode(). The constructor sets up the bridge; callingbasicConfig()first may install a conflicting handler.
Explicit node API:
log(level, message, target=None, fields=None)
Emit a structured log message with optional target and key-value fields.
node.log("info", "Processing frame", target="vision")
node.log("error", "Sensor timeout", fields={"sensor": "lidar", "retry": "3"})
Parameters:
level(str)—— 日志级别:"error"、"warn"、"info"、"debug"或"trace"。message(str)—— 日志消息。target(str,可选)—— 目标模块或子系统名称。fields(dict[str, str],可选)—— 结构化键值上下文字段。
Works with the daemon’s min_log_level filtering, send_logs_as routing, and adora/logs subscribers.
log_error(message), log_warn(message), log_info(message), log_debug(message), log_trace(message)
Convenience methods for common log levels:
node.log_error("Connection failed")
node.log_warn("Temperature elevated")
node.log_info("Sensor initialized")
node.log_debug("Raw bytes received")
node.log_trace("Entering loop iteration")
Each is equivalent to node.log(level, message).
When to use which:
| 方法 | Structured? | Fields? | 最适用于 |
|---|---|---|---|
logging.info() | 是 | 否 | General-purpose logging |
node.log("info", msg, fields={...}) | 是 | 是 | Structured context (sensor_id, etc.) |
node.log_info(msg) | 是 | 否 | Quick one-liner |
print() | 否 | 否 | Legacy code, quick debugging |
dataflow_descriptor()
以 Python 字典形式返回完整的数据流描述符(解析后的数据流 YAML)。
descriptor = node.dataflow_descriptor()
print(descriptor["nodes"])
返回: dict
node_config()
从数据流描述符返回此节点的配置块。
config = node.node_config()
model_path = config.get("model", "default.pt")
返回: dict
dataflow_id()
返回运行中数据流的唯一标识符。
print(node.dataflow_id()) # e.g. "a1b2c3d4-..."
返回: str
is_restart()
检查此节点是否在之前的退出或故障后被重启。用于决定是恢复保存的状态还是重新开始。
if node.is_restart():
restore_checkpoint()
返回: bool
restart_count()
返回此节点被重启的次数。首次运行时返回 0,第一次重启后返回 1,以此类推。
print(f"Restart #{node.restart_count()}")
返回: int
merge_external_events(subscription)
将 ROS2 订阅流合并到节点的主事件循环中。调用此方法后,ROS2 消息以 kind 设置为 "external" 的事件形式到达。
from adora import Node, Ros2Context, Ros2Node, Ros2NodeOptions, Ros2Topic
node = Node()
ros2_context = Ros2Context()
ros2_node = ros2_context.new_node("listener", Ros2NodeOptions())
topic = Ros2Topic("/chatter", "std_msgs/String", ros2_node)
subscription = ros2_node.create_subscription(topic)
node.merge_external_events(subscription)
for event in node:
if event["kind"] == "external":
print("ROS2:", event["value"])
elif event["type"] == "INPUT":
print("Adora:", event["id"])
Parameters:
subscription(adora.Ros2Subscription)—— 通过 adora ROS2 桥接创建的 ROS2 订阅。
迭代支持
Node 类实现了 __iter__ 和 __next__,因此可以直接迭代:
for event in node:
match event["type"]:
case "INPUT":
process(event["value"])
case "STOP":
break
迭代器在每次迭代时无超时调用 next()。当事件流关闭时产生 None,从而终止循环。
事件字典
事件以普通 Python 字典形式返回。结构取决于事件类型。
INPUT
从另一个节点收到输入消息。
{
"type": "INPUT",
"id": "camera_image", # input ID as declared in the dataflow YAML
"kind": "adora", # "adora" for dataflow events, "external" for ROS2
"value": <pyarrow.Array>, # the payload as an Apache Arrow array
"metadata": {
"timestamp": datetime, # UTC-aware datetime.datetime
"open_telemetry_context": "...", # tracing context (if enabled)
... # any user-supplied metadata
},
}
访问数据:
values = event["value"].to_pylist() # convert to Python list
array = event["value"].to_numpy() # convert to NumPy array
INPUT_CLOSED
输入通道已关闭(上游节点已完成)。
{
"type": "INPUT_CLOSED",
"id": "camera_image",
"kind": "adora",
}
STOP
数据流正在关闭。
{
"type": "STOP",
"id": "MANUAL" | "ALL_INPUTS_CLOSED", # stop cause
"kind": "adora",
}
ERROR
运行时发生错误。
{
"type": "ERROR",
"error": "description of the error",
"kind": "adora",
}
外部(ROS2)
使用 merge_external_events 时,ROS2 消息以如下形式到达:
{
"kind": "external",
"value": <pyarrow.Array>, # the ROS2 message as an Arrow array
}
AdoraStatus 枚举
用作算子 on_event 方法的返回值以控制事件循环。
from adora import AdoraStatus
| 值 | Meaning |
|---|---|
AdoraStatus.CONTINUE | 继续处理事件(值 0) |
AdoraStatus.STOP | 停止此算子(值 1) |
AdoraStatus.STOP_ALL | 停止整个数据流(值 2) |
算子 API
算子在 adora 运行时进程内运行(无单独的操作系统进程)。它们被定义为名为 Operator 的 Python 类,具有 on_event 方法。
Operator 类(用户定义)
创建一个包含 Operator 类的 Python 文件:
from adora import AdoraStatus
class Operator:
def __init__(self):
# Initialize state here
self.count = 0
def on_event(self, adora_event, send_output) -> AdoraStatus:
if adora_event["type"] == "INPUT":
self.count += 1
# Process the input and optionally send output
send_output("result", b"processed", adora_event["metadata"])
return AdoraStatus.CONTINUE
Methods:
__init__(self)—— 算子加载时调用一次。在此初始化任何状态或模型。on_event(self, adora_event, send_output) -> AdoraStatus—— 对每个传入事件调用。必须返回AdoraStatus值。
on_event 的参数:
adora_event(dict)—— 一个事件字典。send_output(callable)—— 发送输出数据的回调(见下文)。
运行时还会在算子实例上设置 self.dataflow_descriptor,值为解析后的数据流 YAML 字典。
send_output 回调
send_output 回调传递给 on_event,用于从算子发送数据。
send_output(output_id, data, metadata=None)
Parameters:
output_id(str)—— 数据流 YAML 中声明的输出名称。data(bytes | pyarrow.Array)—— 载荷。metadata(dict,可选)—— 要附加的元数据。传递adora_event["metadata"]以传播追踪上下文。
Example:
import pyarrow as pa
from adora import AdoraStatus
class Operator:
def on_event(self, adora_event, send_output) -> AdoraStatus:
if adora_event["type"] == "INPUT":
result = pa.array([42], type=pa.int64())
send_output("output", result, adora_event["metadata"])
return AdoraStatus.CONTINUE
DataflowBuilder
from adora.builder import DataflowBuilder, Node, Operator, Output
在 Python 中以编程方式构建数据流 YAML。
DataflowBuilder 类
__init__(name="adora-dataflow")
创建新的数据流构建器。
flow = DataflowBuilder("my-robot")
Parameters:
name(str,可选)—— 数据流名称。默认为"adora-dataflow"。
add_node(id, **kwargs) -> Node
向数据流添加节点。返回 Node 对象以供进一步配置。
sender = flow.add_node("sender")
Parameters:
id(str)—— 唯一节点标识符。**kwargs—— 传递到 YAML 的额外节点配置。
返回: Node(构建器)
to_yaml(path=None) -> str | None
生成数据流的 YAML 表示。如果提供了 path,则写入文件并返回 None。否则返回 YAML 字符串。
# Write to file
flow.to_yaml("dataflow.yml")
# Get as string
yaml_str = flow.to_yaml()
Parameters:
path(str,可选)—— 写入 YAML 的文件路径。
返回: str | None
上下文管理器
DataflowBuilder 支持 with 语句:
with DataflowBuilder("my-flow") as flow:
flow.add_node("sender").path("sender.py")
flow.to_yaml("dataflow.yml")
Node 类(构建器)
由 DataflowBuilder.add_node() 返回。所有 setter 方法返回 self 以支持链式调用。
path(path) -> Node
设置节点可执行文件或脚本的路径。
node.path("my_node.py")
args(args) -> Node
设置节点的命令行参数。
node.args("--verbose --port 8080")
env(env) -> Node
设置节点的环境变量。
node.env({"MODEL_PATH": "/models/yolo.pt"})
build(command) -> Node
设置节点的构建命令(启动前运行)。
node.build("pip install -r requirements.txt")
git(url, branch=None, tag=None, rev=None) -> Node
将 Git 仓库设置为节点的源。
node.git("https://github.com/org/repo.git", branch="main")
add_operator(operator) -> Node
将 Operator 附加到此节点。
op = Operator("detector", python="object_detection.py")
node.add_operator(op)
add_output(output_id) -> Output
在此节点上声明一个输出,并返回 Output 引用以用作输入源。
output = sender.add_output("data")
add_input(input_id, source, queue_size=None, queue_policy=None) -> Node
将此节点订阅到另一个节点的输出。
# Using an Output object
output = sender.add_output("data")
receiver.add_input("data", output)
# Using a string reference
receiver.add_input("tick", "adora/timer/millis/100")
# With a custom queue size
receiver.add_input("images", camera_output, queue_size=2)
# Lossless input (blocks sender when full)
receiver.add_input("commands", cmd_output, queue_size=100, queue_policy="backpressure")
Parameters:
input_id(str)—— 此节点上的输入名称。source(str | Output)—— 字符串("node_id/output_id")或Output对象。queue_size(int,可选)—— 此输入的最大缓冲消息数。queue_policy(str, optional) –"drop_oldest"(default) or"backpressure"(buffers up to 10xqueue_sizebefore dropping).
to_dict() -> dict
返回节点的字典表示,用于 YAML 序列化。
Output 类(构建器)
由 Node.add_output() 返回。表示节点输出的引用,用作 add_input() 中的源。
output = sender.add_output("data")
receiver.add_input("sensor_data", output)
str(output) # "sender/data"
Operator 类(构建器)
定义用于嵌入节点 YAML 配置中的算子。
__init__(id, name=None, description=None, build=None, python=None, shared_library=None, send_stdout_as=None)
op = Operator(
id="detector",
python="object_detection.py",
send_stdout_as="detection_text",
)
Parameters:
id(str)—— 唯一算子标识符。name(str,可选)—— 显示名称。description(str,可选)—— 可读的描述。build(str,可选)—— 加载前运行的构建命令。python(str,可选)—— Python 算子文件的路径。shared_library(str,可选)—— 共享库算子的路径。send_stdout_as(str,可选)—— 将算子的 stdout 作为具有此 ID 的输出路由。
to_dict() -> dict
返回用于 YAML 序列化的字典表示。
CUDA 模块
from adora.cuda import torch_to_ipc_buffer, ipc_buffer_to_ipc_handle, open_ipc_handle
通过 CUDA IPC 实现节点间零拷贝 GPU 张量共享的实用工具。需要支持 CUDA 的 PyTorch 和 Numba。
torch_to_ipc_buffer(tensor) -> tuple[pyarrow.Array, dict]
将 PyTorch CUDA 张量转换为包含 CUDA IPC 句柄的 Arrow 数组和元数据字典。通过数据流发送两者以无需复制即可共享 GPU 内存。
import torch
import pyarrow as pa
from adora import Node
from adora.cuda import torch_to_ipc_buffer
node = Node()
tensor = torch.randn(1024, 768, device="cuda")
ipc_buffer, metadata = torch_to_ipc_buffer(tensor)
node.send_output("gpu_data", ipc_buffer, metadata)
Parameters:
tensor(torch.Tensor)—— CUDA 张量。
返回: tuple[pyarrow.Array, dict] —— IPC 句柄(int8 Arrow 数组),以及包含形状、步长、数据类型、大小、偏移和来源信息的元数据。
ipc_buffer_to_ipc_handle(handle_buffer, metadata) -> IpcHandle
从接收到的 Arrow 缓冲区和元数据重建 CUDA IPC 句柄。
from adora.cuda import ipc_buffer_to_ipc_handle
event = node.next()
ipc_handle = ipc_buffer_to_ipc_handle(event["value"], event["metadata"])
Parameters:
handle_buffer(pyarrow.Array)—— 来自event["value"]的 Arrow 数组。metadata(dict)—— 来自event["metadata"]的元数据。
返回: numba.cuda.cudadrv.driver.IpcHandle
open_ipc_handle(ipc_handle, metadata) -> ContextManager[torch.Tensor]
打开 CUDA IPC 句柄并产生 PyTorch 张量。作为上下文管理器使用以确保正确清理。
from adora.cuda import ipc_buffer_to_ipc_handle, open_ipc_handle
event = node.next()
ipc_handle = ipc_buffer_to_ipc_handle(event["value"], event["metadata"])
with open_ipc_handle(ipc_handle, event["metadata"]) as tensor:
result = tensor * 2 # use the GPU tensor directly
Parameters:
ipc_handle(IpcHandle)—— 来自ipc_buffer_to_ipc_handle的句柄。metadata(dict)—— 包含形状、步长和数据类型信息的元数据字典。
返回: 产生 CUDA 上 torch.Tensor 的上下文管理器。
快速开始示例
一个接收图像、处理并发送结果的完整节点:
#!/usr/bin/env python3
"""示例节点:接收消息、转换并发送输出。"""
import logging
import pyarrow as pa
from adora import Node
def main():
node = Node()
for event in node:
if event["type"] == "INPUT":
input_id = event["id"]
if input_id == "message":
values = event["value"].to_pylist()
number = values[0]
# Create a struct array with multiple fields
result = pa.StructArray.from_arrays(
[
pa.array([number * 2]),
pa.array([f"Message #{number}"]),
],
names=["doubled", "description"],
)
node.send_output("transformed", result)
logging.info("Transformed message %d", number)
elif event["type"] == "STOP":
logging.info("Node stopping")
break
if __name__ == "__main__":
main()
运行方式:
adora run dataflow.yml
DataflowBuilder 示例
以编程方式构建数据流,而非手动编写 YAML:
#!/usr/bin/env python3
"""构建一个简单的 sender -> receiver 数据流。"""
from adora.builder import DataflowBuilder, Operator
flow = DataflowBuilder("example-flow")
# Add a timer-driven sender node
sender = flow.add_node("sender")
sender.path("sender.py")
tick_output = sender.add_output("message")
# Add a receiver that subscribes to the sender
receiver = flow.add_node("receiver")
receiver.path("receiver.py")
receiver.add_input("message", tick_output)
# Add a node with a timer input
timed_node = flow.add_node("periodic")
timed_node.path("periodic.py")
timed_node.add_input("tick", "adora/timer/millis/100")
# Add a node with an operator
runtime_node = flow.add_node("runtime-node")
op = Operator("detector", python="object_detection.py")
runtime_node.add_operator(op)
runtime_node.add_input("image", "camera/image")
# Write or print the YAML
flow.to_yaml("dataflow.yml")
print(flow.to_yaml())