Python Quick Start

This guide walks you through writing nodes and operators for adora dataflows in Python.

Prerequisites

cargo install adora-cli    # CLI (the adora command)
pip install adora-rs       # Python node/operator API

The adora-rs package already includes pyarrow as a dependency.

To build from source (as an alternative to pip install adora-rs):

pip install maturin  # requires >= 1.8
cd apis/python/node && maturin develop --uv && cd ../../..

Hello World: Sender and Receiver

Create three files:

sender.py – sends 100 numbered messages:

import pyarrow as pa
from adora import Node

node = Node()
for i in range(100):
    node.send_output("message", pa.array([i]))

receiver.py – receives and prints messages:

from adora import Node

node = Node()
for event in node:
    if event["type"] == "INPUT":
        values = event["value"].to_pylist()
        print(f"Received {event['id']}: {values}")
    elif event["type"] == "STOP":
        break

dataflow.yml – connects sender to receiver:

nodes:
  - id: sender
    path: sender.py
    outputs:
      - message

  - id: receiver
    path: receiver.py
    inputs:
      message: sender/message
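
Each entry under inputs has the shape `<input name>: <node id>/<output name>`, so `message: sender/message` subscribes the receiver to the sender's `message` output. As a minimal sketch of that rule (the `check_wiring` helper and the dict that mirrors dataflow.yml are hypothetical and not part of adora), the following checks that every input points at an output some node declares:

```python
# Hypothetical helper, not part of the adora API: a plain-dict mirror of
# dataflow.yml, used to check that each input points at a declared output.
dataflow = {
    "nodes": [
        {"id": "sender", "path": "sender.py", "outputs": ["message"]},
        {"id": "receiver", "path": "receiver.py",
         "inputs": {"message": "sender/message"}},
    ]
}


def check_wiring(spec):
    """Return a list of human-readable problems; empty means all inputs resolve."""
    declared = {
        (node["id"], output)
        for node in spec["nodes"]
        for output in node.get("outputs", [])
    }
    problems = []
    for node in spec["nodes"]:
        for input_name, source in node.get("inputs", {}).items():
            # Split at the first slash: "sender/message" -> ("sender", "message")
            source_node, _, source_output = source.partition("/")
            if (source_node, source_output) not in declared:
                problems.append(
                    f"{node['id']}.{input_name}: no output {source!r} declared"
                )
    return problems


print(check_wiring(dataflow))
```

An empty list means every input resolves. The check only covers the wiring shown on this page; it does not validate anything else adora may accept in a dataflow file.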

Run it:

adora run dataflow.yml

Events

Each call to node.next(), and each iteration of for event in node, yields an event dictionary with the following keys:

| Key | Type | Description |
|---|---|---|
| type | str | "INPUT", "INPUT_CLOSED", "STOP", or "ERROR" |
| id | str | Input name (e.g. "message") – only for INPUT events |
| value | pyarrow.Array or None | The data payload |
| metadata | dict | Tracing/routing metadata |

Handle events by checking event["type"]:

for event in node:
    match event["type"]:  # match requires Python 3.10+
        case "INPUT":
            # process() is a placeholder for your own handler
            process(event["id"], event["value"])
        case "INPUT_CLOSED":
            print(f"Input {event['id']} closed")
        case "STOP":
            break
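
On interpreters older than Python 3.10, where match is unavailable, the same dispatch can be written as a dictionary lookup. The sketch below is an illustration only: the events are hand-built dicts with plain lists as payloads (real events come from Node and carry pyarrow arrays), and the handler names are hypothetical.

```python
# Illustration only: simulated events stand in for what `for event in node` yields.
# Real payloads are pyarrow arrays; plain lists are used so this runs without adora.
simulated_events = [
    {"type": "INPUT", "id": "message", "value": [0], "metadata": {}},
    {"type": "INPUT", "id": "message", "value": [1], "metadata": {}},
    {"type": "INPUT_CLOSED", "id": "message", "value": None, "metadata": {}},
    {"type": "STOP", "value": None, "metadata": {}},
    {"type": "INPUT", "id": "message", "value": [2], "metadata": {}},  # never reached
]

received = []


def on_input(event):
    received.append((event["id"], event["value"]))


def on_input_closed(event):
    print(f"Input {event['id']} closed")


# Hypothetical handler table keyed by event type.
handlers = {"INPUT": on_input, "INPUT_CLOSED": on_input_closed}

for event in simulated_events:
    if event["type"] == "STOP":
        break  # stop consuming events
    handler = handlers.get(event["type"])
    if handler is not None:
        handler(event)

print(received)
```

Event types without a handler are skipped silently here; whether to log or raise on them instead is a design choice for the node author.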

Working with Arrow Data

All data flows through adora as Apache Arrow arrays. Common patterns:

import pyarrow as pa

# Simple values
node.send_output("count", pa.array([42]))
node.send_output("names", pa.array(["alice", "bob"]))

# Read values back
values = event["value"].to_pylist()  # [42] or ["alice", "bob"]

# Structured data
struct = pa.StructArray.from_arrays(
    [pa.array([1.5]), pa.array(["hello"])],
    names=["x", "y"],
)
node.send_output("point", struct)

# Raw bytes (images, serialized data, etc.)
node.send_output("frame", pa.array(raw_bytes))

Operators

Operators are lightweight alternatives to nodes. They run inside the adora runtime process rather than in a separate OS process, which lowers startup cost and makes them a good fit for simple transformations.

Define an Operator class with an on_event method:

# doubler_op.py
import pyarrow as pa
from adora import AdoraStatus

class Operator:
    def on_event(self, event, send_output) -> AdoraStatus:
        if event["type"] == "INPUT":
            value = event["value"].to_pylist()[0]
            send_output("doubled", pa.array([value * 2]), event["metadata"])
        return AdoraStatus.CONTINUE
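
Because on_event receives send_output as an argument, the operator logic can be exercised without the runtime. The sketch below assumes nothing about adora internals: AdoraStatus is replaced by a local stand-in enum, payloads are plain lists instead of pyarrow arrays, and fake_send_output is a hypothetical recorder.

```python
from enum import Enum


class AdoraStatus(Enum):
    """Local stand-in for adora.AdoraStatus so this sketch runs without adora."""
    CONTINUE = 0


class Operator:
    def on_event(self, event, send_output) -> AdoraStatus:
        if event["type"] == "INPUT":
            value = event["value"][0]  # plain list here, not a pyarrow array
            send_output("doubled", [value * 2], event["metadata"])
        return AdoraStatus.CONTINUE


sent = []


def fake_send_output(output_id, data, metadata):
    """Hypothetical recorder standing in for the runtime's send_output."""
    sent.append((output_id, data))


op = Operator()
status = op.on_event(
    {"type": "INPUT", "id": "tick", "value": [21], "metadata": {}},
    fake_send_output,
)
print(status, sent)
```

Passing the callback in, rather than having the operator hold a node handle, is what makes this kind of offline check possible.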

Reference it in YAML with operator instead of path:

nodes:
  - id: timer
    path: adora/timer/millis/500
    outputs:
      - tick

  - id: doubler
    operator:
      python: doubler_op.py
      inputs:
        tick: timer/tick
      outputs:
        - doubled

When to use operators vs nodes:

| | Nodes | Operators |
|---|---|---|
| Process model | Separate OS process | In-process (shared runtime) |
| Startup cost | Higher | Lower |
| Isolation | Full process isolation | Shared memory space |
| Best for | Long-running, heavy compute | Lightweight transforms, filters |

Async Nodes

For nodes that need async I/O (HTTP calls, database queries, etc.), use recv_async():

import asyncio
from adora import Node

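async def main():
    node = Node()
    for _ in range(50):  # handle at most 50 events, then exit
        event = await node.recv_async()
        if event["type"] == "STOP":
            break
        if event["type"] != "INPUT":
            continue  # only INPUT events carry a payload to process
        # Do async work here; fetch_data is a placeholder for your own coroutine
        result = await fetch_data(event["value"])
        node.send_output("result", result)

asyncio.run(main())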

See examples/python-async for a complete example.
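
To make the control flow concrete without the runtime, the sketch below swaps Node for a hypothetical FakeNode backed by asyncio.Queue and uses asyncio.sleep in place of real I/O. Only the recv_async and send_output names come from the example above; everything else is an assumption for illustration.

```python
import asyncio


class FakeNode:
    """Hypothetical stand-in for adora's Node, backed by a pre-filled queue."""

    def __init__(self, events):
        self._queue = asyncio.Queue()
        for event in events:
            self._queue.put_nowait(event)
        self.outputs = []

    async def recv_async(self):
        return await self._queue.get()

    def send_output(self, output_id, data):
        self.outputs.append((output_id, data))


async def fetch_data(value):
    """Placeholder for real async I/O such as an HTTP call."""
    await asyncio.sleep(0.01)
    return [v * 10 for v in value]


async def main():
    node = FakeNode([
        {"type": "INPUT", "id": "message", "value": [1, 2]},
        {"type": "INPUT_CLOSED", "id": "message", "value": None},
        {"type": "STOP", "value": None},
    ])
    while True:
        event = await node.recv_async()
        if event["type"] == "STOP":
            break
        if event["type"] != "INPUT":
            continue  # no payload to process
        node.send_output("result", await fetch_data(event["value"]))
    return node.outputs


outputs = asyncio.run(main())
print(outputs)
```

While fetch_data is awaiting, the event loop is free to run other tasks, which is the reason to prefer recv_async() over the blocking iterator in I/O-heavy nodes.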

Logging

Use node.log() for structured logging that integrates with adora logs:

node.log("info", "Processing item", {"count": str(i)})

Or use Python’s standard logging module – adora captures stdout/stderr automatically:

import logging

logging.basicConfig(level=logging.INFO)  # the root logger defaults to WARNING
logging.info("Processing item %d", i)

See examples/python-logging for logging module integration.
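
Since adora captures stdout/stderr, an ordinary logging setup is enough for messages to be picked up. A minimal sketch using only the standard library, assuming default Python behavior (INFO records are dropped unless a level is configured) and a hypothetical logger name:

```python
import io
import logging

# The stream is a StringIO only so the result can be inspected here;
# in a node you would keep the default (stderr) so the output can be captured.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))

logger = logging.getLogger("my_node")  # hypothetical logger name
logger.setLevel(logging.INFO)          # without this, INFO records are dropped
logger.addHandler(handler)
logger.propagate = False               # avoid duplicate lines via the root logger

for i in range(2):
    logger.info("Processing item %d", i)

print(stream.getvalue(), end="")
```

A named logger per node keeps the origin of each line visible when several nodes write to the same log view.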

Timers

Built-in timer nodes generate periodic ticks without writing any code:

nodes:
  - id: tick-source
    path: adora/timer/millis/100    # tick every 100ms
    outputs:
      - tick

  - id: my-node
    path: my_node.py
    inputs:
      tick: tick-source/tick

Also available: adora/timer/hz/30 for 30 Hz.
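
The two timer forms express the same thing in different units: a rate of f Hz corresponds to a period of 1000 / f milliseconds. A small sketch of that arithmetic follows; period_ms and timer_path are hypothetical helpers, and only the adora/timer/millis/N and adora/timer/hz/N path shapes come from this page.

```python
def period_ms(hz):
    """Period in milliseconds for a tick rate given in Hz."""
    return 1000 / hz


def timer_path(*, millis=None, hz=None):
    """Build a timer path string in one of the two forms shown above."""
    if (millis is None) == (hz is None):
        raise ValueError("pass exactly one of millis or hz")
    if millis is not None:
        return f"adora/timer/millis/{millis}"
    return f"adora/timer/hz/{hz}"


print(timer_path(millis=100))      # adora/timer/millis/100
print(timer_path(hz=30))           # adora/timer/hz/30
print(round(period_ms(30), 2))     # 33.33
```

So a 30 Hz timer ticks roughly every 33 ms, about three times as often as the 100 ms timer in the example above.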

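Next Steps

  • Python API Reference – complete API documentation for Node, Operator, DataflowBuilder, and CUDA
  • Communication Patterns – service (request/reply) and action (goal/feedback/result) patterns
  • Examples – python-dataflow, python-async, python-drain, python-concurrent-rw, python-multiple-arrays
  • Distributed Deployment – run across multiple machines with adora up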