Python Quick Start
This guide walks you through writing nodes and operators for adora dataflows in Python.
Prerequisites
cargo install adora-cli # CLI (the adora command)
pip install adora-rs # Python node/operator API
The adora-rs package already includes pyarrow as a dependency.
To build from source (instead of pip install adora-rs):
pip install maturin # requires >= 1.8
cd apis/python/node && maturin develop --uv && cd ../../..
Hello World: Sender and Receiver
Create three files:
sender.py – sends 100 numbered messages:
import pyarrow as pa
from adora import Node
node = Node()
for i in range(100):
    node.send_output("message", pa.array([i]))
receiver.py – receives and prints messages:
from adora import Node
node = Node()
for event in node:
    if event["type"] == "INPUT":
        values = event["value"].to_pylist()
        print(f"Received {event['id']}: {values}")
    elif event["type"] == "STOP":
        break
dataflow.yml – connects sender to receiver:
nodes:
  - id: sender
    path: sender.py
    outputs:
      - message
  - id: receiver
    path: receiver.py
    inputs:
      message: sender/message
Run it:
adora run dataflow.yml
Events
Each call to node.next(), and each iteration of a for event in node loop, returns an event dictionary:
| Key | Type | Description |
|---|---|---|
| type | str | "INPUT", "INPUT_CLOSED", "STOP", or "ERROR" |
| id | str | Input name (e.g. "message"); present for INPUT and INPUT_CLOSED events |
| value | pyarrow.Array or None | The data payload |
| metadata | dict | Tracing/routing metadata |
Handle events by checking event["type"] (the match statement requires Python 3.10 or later):
for event in node:
    match event["type"]:
        case "INPUT":
            process(event["id"], event["value"])  # process() is your own handler
        case "INPUT_CLOSED":
            print(f"Input {event['id']} closed")
        case "STOP":
            break
Working with Arrow Data
All data flows through adora as Apache Arrow arrays. Common patterns:
import pyarrow as pa
# Simple values
node.send_output("count", pa.array([42]))
node.send_output("names", pa.array(["alice", "bob"]))
# Read values back
values = event["value"].to_pylist() # [42] or ["alice", "bob"]
# Structured data
struct = pa.StructArray.from_arrays(
    [pa.array([1.5]), pa.array(["hello"])],
    names=["x", "y"],
)
node.send_output("point", struct)
# Raw bytes (images, serialized data, etc.); raw_bytes is a bytes object
node.send_output("frame", pa.array(list(raw_bytes), type=pa.uint8()))
Operators
Operators are lightweight alternatives to nodes. They run inside the adora runtime process instead of as separate OS processes, which makes them cheaper to start and well suited to simple transformations.
Define an Operator class with an on_event method:
# doubler_op.py
import pyarrow as pa
from adora import AdoraStatus
class Operator:
    def on_event(self, event, send_output) -> AdoraStatus:
        if event["type"] == "INPUT":
            value = event["value"].to_pylist()[0]
            send_output("doubled", pa.array([value * 2]), event["metadata"])
        return AdoraStatus.CONTINUE
Reference it in YAML with operator instead of path:
nodes:
  - id: timer
    path: adora/timer/millis/500
    outputs:
      - tick
  - id: doubler
    operator:
      python: doubler_op.py
      inputs:
        tick: timer/tick
      outputs:
        - doubled
When to use operators vs nodes:
| | Nodes | Operators |
|---|---|---|
| Process model | Separate OS process | In-process (shared runtime) |
| Startup cost | Higher | Lower |
| Isolation | Full process isolation | Shared memory space |
| Best for | Long-running, heavy compute | Lightweight transforms, filters |
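An operator is an ordinary Python class, so you can call on_event directly, outside any dataflow. Pass it a hand-built event and a send_output callback that records what it receives. The sketch below rests on a few assumptions so that it runs without adora or pyarrow:

- Status is a local stand-in for AdoraStatus.
- FakeArray stands in for a pyarrow array; the operator only needs to_pylist().
- The doubler logic is copied inline with FakeArray in place of pa.array.
- run_operator is a hypothetical helper.

None of these names come from the adora API. In a real test you would import Operator from doubler_op.py and pass genuine pyarrow arrays.

```python
from enum import Enum


class Status(Enum):
    """Local stand-in for adora.AdoraStatus (only CONTINUE is used here)."""
    CONTINUE = 0


class FakeArray:
    """Stand-in for pyarrow.Array: the operator only calls to_pylist()."""

    def __init__(self, values):
        self._values = list(values)

    def to_pylist(self):
        return list(self._values)


class Operator:
    """Same logic as doubler_op.py, with FakeArray in place of pa.array."""

    def on_event(self, event, send_output):
        if event["type"] == "INPUT":
            value = event["value"].to_pylist()[0]
            send_output("doubled", FakeArray([value * 2]), event["metadata"])
        return Status.CONTINUE


def run_operator(op, events):
    """Feed events to op.on_event; collect (output_id, values) and statuses."""
    sent = []

    def send_output(output_id, data, metadata=None):
        sent.append((output_id, data.to_pylist()))

    statuses = [op.on_event(event, send_output) for event in events]
    return sent, statuses


events = [
    {"type": "INPUT", "id": "number", "value": FakeArray([21]), "metadata": {}},
    {"type": "INPUT_CLOSED", "id": "number", "value": None, "metadata": {}},
]
sent, statuses = run_operator(Operator(), events)
print(sent)  # [('doubled', [42])]
```

Only the INPUT event produces output. The INPUT_CLOSED event falls through to the final return, so the operator keeps running.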
Async Nodes
For nodes that need async I/O (HTTP calls, database queries, etc.), use recv_async():
import asyncio
from adora import Node
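async def main():
    node = Node()
    for _ in range(50):
        event = await node.recv_async()
        if event["type"] == "STOP":
            break
        if event["type"] != "INPUT":
            continue  # skip INPUT_CLOSED and other events without a payload
        # Do async work here; fetch_data is a placeholder for your own
        # coroutine and should return a pyarrow array
        result = await fetch_data(event["value"])
        node.send_output("result", result)
asyncio.run(main())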
See examples/python-async for a complete example.
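You can try the async loop shape without a running dataflow. The sketch below swaps the node for a hypothetical FakeNode whose recv_async() pops events from an asyncio.Queue. It also uses a hypothetical fetch_data coroutine that sleeps briefly and scales the input. Neither is part of adora; they stand in for the real node and for your own I/O. Plain lists replace pyarrow arrays so the sketch needs only the standard library.

```python
import asyncio


class FakeNode:
    """Hypothetical stand-in for adora's Node: recv_async() reads from a queue."""

    def __init__(self, events):
        self._queue = asyncio.Queue()
        for event in events:
            self._queue.put_nowait(event)
        self.sent = []

    async def recv_async(self):
        return await self._queue.get()

    def send_output(self, output_id, data, metadata=None):
        self.sent.append((output_id, data))


async def fetch_data(value):
    """Hypothetical async I/O: pretend to call a service, then return a result."""
    await asyncio.sleep(0.01)
    return [v * 10 for v in value]


async def main(node):
    # Same loop shape as the example above, with the node passed in.
    for _ in range(50):
        event = await node.recv_async()
        if event["type"] == "STOP":
            break
        if event["type"] != "INPUT":
            continue
        result = await fetch_data(event["value"])
        node.send_output("result", result)
    return node.sent


async def demo():
    # Build the queue-backed node inside the running event loop.
    events = [
        {"type": "INPUT", "id": "request", "value": [1, 2], "metadata": {}},
        {"type": "INPUT_CLOSED", "id": "request", "value": None, "metadata": {}},
        {"type": "INPUT", "id": "request", "value": [3], "metadata": {}},
        {"type": "STOP", "value": None, "metadata": {}},
    ]
    return await main(FakeNode(events))


sent = asyncio.run(demo())
print(sent)  # [('result', [10, 20]), ('result', [30])]
```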
Logging
Use node.log() for structured logging that integrates with adora logs:
node.log("info", "Processing item", {"count": str(i)})
Or use Python’s standard logging module – adora captures stdout/stderr automatically:
import logging
logging.basicConfig(level=logging.INFO)  # the root logger defaults to WARNING
logging.info("Processing item %d", i)
See examples/python-logging for logging module integration.
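Because adora collects whatever the process writes to stdout/stderr, the main thing to get right with the logging module is configuration. The root logger defaults to WARNING, so logging.info() calls are dropped unless you lower the level. The sketch below configures a named logger that puts the level and logger name on each line. It writes to an in-memory stream only so the output can be checked; in a node you would use the default stream (stderr). The logger name "my_node" and the helper make_node_logger are illustrative, not part of adora.

```python
import io
import logging


def make_node_logger(stream, name="my_node"):
    """Return a logger writing INFO+ records as 'LEVEL name: message' lines.

    Call once per process: each call adds another handler to the same logger.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
    logger.addHandler(handler)
    logger.propagate = False  # avoid duplicate lines via the root logger
    return logger


buffer = io.StringIO()
log = make_node_logger(buffer)
for i in range(3):
    log.info("Processing item %d", i)
log.debug("not shown: below INFO")
print(buffer.getvalue())
```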
Timers
Built-in timer nodes generate periodic ticks without writing any code:
nodes:
  - id: tick-source
    path: adora/timer/millis/100 # tick every 100ms
    outputs:
      - tick
  - id: my-node
    path: my_node.py
    inputs:
      tick: tick-source/tick
Also available: adora/timer/hz/30 for 30 Hz.
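The two timer forms describe the same thing from opposite sides. millis/N fires every N milliseconds, while hz/F fires F times per second, i.e. every 1000/F ms. So 100 ms corresponds to 10 Hz, and 30 Hz to about 33.3 ms. The hypothetical helper below is not part of adora. It parses such a path and returns the period in seconds, which can help when a node reasons about the expected time between ticks.

```python
def timer_period_seconds(path: str) -> float:
    """Return the tick period for an 'adora/timer/<unit>/<n>' path.

    Hypothetical helper for illustration; it only understands the two
    forms shown above: 'millis' (period in ms) and 'hz' (frequency).
    """
    prefix, unit, amount = path.rsplit("/", 2)
    if prefix != "adora/timer":
        raise ValueError(f"not a timer path: {path!r}")
    n = float(amount)
    if unit == "millis":
        return n / 1000.0
    if unit == "hz":
        return 1.0 / n
    raise ValueError(f"unknown timer unit: {unit!r}")


print(timer_period_seconds("adora/timer/millis/100"))  # 0.1
print(round(timer_period_seconds("adora/timer/hz/30") * 1000, 1))  # 33.3
```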
Next Steps
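- Python API Reference – full API documentation for Node, Operator, DataflowBuilder, and CUDA
- Communication Patterns – service (request/reply) and action (goal/feedback/result) patterns
- Examples – python-dataflow, python-async, python-drain, python-concurrent-rw, python-multiple-arrays
- Distributed Deployment – run across multiple machines with adora up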