Python 快速入门
本指南将引导你使用 Python 编写 adora 数据流的节点和算子。
前提条件
cargo install adora-cli # CLI(adora 命令)
pip install adora-rs # Python 节点/算子 API
adora-rs 包已包含 pyarrow 作为依赖。
从源码构建(替代 pip install adora-rs):
pip install maturin # requires >= 1.8
cd apis/python/node && maturin develop --uv && cd ../../..
Hello World:发送者与接收者
创建三个文件:
sender.py – 发送 100 条带编号的消息:
import pyarrow as pa
from adora import Node
node = Node()
for i in range(100):
node.send_output("message", pa.array([i]))
receiver.py – 接收并打印消息:
from adora import Node
node = Node()
for event in node:
if event["type"] == "INPUT":
values = event["value"].to_pylist()
print(f"Received {event['id']}: {values}")
elif event["type"] == "STOP":
break
dataflow.yml – 将发送者连接到接收者:
nodes:
- id: sender
path: sender.py
outputs:
- message
- id: receiver
path: receiver.py
inputs:
message: sender/message
运行:
adora run dataflow.yml
Events
每次调用 node.next() 或通过 for event in node 迭代都会返回一个事件字典:
| Key | 类型 | 描述 |
|---|---|---|
type | str | "INPUT", "INPUT_CLOSED", "STOP", 或 "ERROR" |
id | str | 输入名称(例如 "message")– 仅适用于 INPUT 事件 |
value | pyarrow.Array 或 None | 数据载荷 |
metadata | dict | 追踪/路由元数据 |
通过检查 event["type"] 来处理事件:
for event in node:
match event["type"]:
case "INPUT":
process(event["id"], event["value"])
case "INPUT_CLOSED":
print(f"Input {event['id']} closed")
case "STOP":
break
使用 Arrow 数据
所有数据在 adora 中以 Apache Arrow 数组的形式流动。常见模式:
import pyarrow as pa
# Simple values
node.send_output("count", pa.array([42]))
node.send_output("names", pa.array(["alice", "bob"]))
# Read values back
values = event["value"].to_pylist() # [42] or ["alice", "bob"]
# Structured data
struct = pa.StructArray.from_arrays(
[pa.array([1.5]), pa.array(["hello"])],
names=["x", "y"],
)
node.send_output("point", struct)
# Raw bytes (images, serialized data, etc.)
node.send_output("frame", pa.array(raw_bytes))
Operators
算子是节点的轻量级替代方案。它们在 adora 运行时进程内运行(无需单独的操作系统进程),因此对于简单的转换操作更加高效。
定义一个包含 on_event 方法的 Operator 类:
# doubler_op.py
import pyarrow as pa
from adora import AdoraStatus
class Operator:
def on_event(self, event, send_output) -> AdoraStatus:
if event["type"] == "INPUT":
value = event["value"].to_pylist()[0]
send_output("doubled", pa.array([value * 2]), event["metadata"])
return AdoraStatus.CONTINUE
在 YAML 中使用 operator 而非 path 来引用它:
nodes:
- id: timer
path: adora/timer/millis/500
outputs:
- tick
- id: doubler
operator:
python: doubler_op.py
inputs:
tick: timer/tick
outputs:
- doubled
何时使用算子 vs 节点:
| Nodes | Operators | |
|---|---|---|
| 进程模型 | 独立的操作系统进程 | 进程内(共享运行时) |
| 启动开销 | Higher | Lower |
| Isolation | 完全的进程隔离 | 共享内存空间 |
| 最适用于 | 长时间运行、重计算任务 | 轻量级转换、过滤 |
异步节点
对于需要异步 I/O(HTTP 调用、数据库查询等)的节点,请使用 recv_async():
import asyncio
from adora import Node
async def main():
node = Node()
for _ in range(50):
event = await node.recv_async()
if event["type"] == "STOP":
break
# Do async work here
result = await fetch_data(event["value"])
node.send_output("result", result)
asyncio.run(main())
参见 examples/python-async 获取完整示例。
日志
使用 node.log() 进行结构化日志记录,可与 adora logs 集成:
node.log("info", "Processing item", {"count": str(i)})
或者使用 Python 标准的 logging 模块 – adora 会自动捕获 stdout/stderr:
import logging
logging.info("Processing item %d", i)
参见 examples/python-logging 了解日志模块集成。
Timers
内置的定时器节点无需编写任何代码即可生成周期性的定时信号:
nodes:
- id: tick-source
path: adora/timer/millis/100 # tick every 100ms
outputs:
- tick
- id: my-node
path: my_node.py
inputs:
tick: tick-source/tick
也可使用:adora/timer/hz/30 表示 30 Hz。
下一步
- Python API 参考 – Node、Operator、DataflowBuilder、CUDA 的完整 API 文档
- 通信模式 – 服务(请求/应答)和动作(目标/反馈/结果)模式
- 示例 – python-dataflow、python-async、python-drain、python-concurrent-rw、python-multiple-arrays
- 分布式部署 – 使用
adora up跨多台机器运行