Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Python 快速入门

本指南将引导你使用 Python 编写 adora 数据流的节点和算子。

前提条件

cargo install adora-cli    # CLI(adora 命令)
pip install adora-rs       # Python 节点/算子 API

adora-rs 包已包含 pyarrow 作为依赖。

从源码构建(替代 pip install adora-rs):

pip install maturin  # requires >= 1.8
cd apis/python/node && maturin develop --uv && cd ../../..

Hello World:发送者与接收者

创建三个文件:

sender.py – 发送 100 条带编号的消息:

import pyarrow as pa
from adora import Node

node = Node()
for i in range(100):
    node.send_output("message", pa.array([i]))

receiver.py – 接收并打印消息:

from adora import Node

node = Node()
for event in node:
    if event["type"] == "INPUT":
        values = event["value"].to_pylist()
        print(f"Received {event['id']}: {values}")
    elif event["type"] == "STOP":
        break

dataflow.yml – 将发送者连接到接收者:

nodes:
  - id: sender
    path: sender.py
    outputs:
      - message

  - id: receiver
    path: receiver.py
    inputs:
      message: sender/message

运行:

adora run dataflow.yml

Events

每次调用 node.next() 或通过 for event in node 迭代都会返回一个事件字典:

Key类型描述
typestr"INPUT", "INPUT_CLOSED", "STOP", 或 "ERROR"
idstr输入名称(例如 "message")– 仅适用于 INPUT 事件
valuepyarrow.Array 或 None数据载荷
metadatadict追踪/路由元数据

通过检查 event["type"] 来处理事件:

for event in node:
    match event["type"]:
        case "INPUT":
            process(event["id"], event["value"])
        case "INPUT_CLOSED":
            print(f"Input {event['id']} closed")
        case "STOP":
            break

使用 Arrow 数据

所有数据在 adora 中以 Apache Arrow 数组的形式流动。常见模式:

import pyarrow as pa

# Simple values
node.send_output("count", pa.array([42]))
node.send_output("names", pa.array(["alice", "bob"]))

# Read values back
values = event["value"].to_pylist()  # [42] or ["alice", "bob"]

# Structured data
struct = pa.StructArray.from_arrays(
    [pa.array([1.5]), pa.array(["hello"])],
    names=["x", "y"],
)
node.send_output("point", struct)

# Raw bytes (images, serialized data, etc.)
node.send_output("frame", pa.array(raw_bytes))

Operators

算子是节点的轻量级替代方案。它们在 adora 运行时进程内运行(无需单独的操作系统进程),因此对于简单的转换操作更加高效。

定义一个包含 on_event 方法的 Operator 类:

# doubler_op.py
import pyarrow as pa
from adora import AdoraStatus

class Operator:
    def on_event(self, event, send_output) -> AdoraStatus:
        if event["type"] == "INPUT":
            value = event["value"].to_pylist()[0]
            send_output("doubled", pa.array([value * 2]), event["metadata"])
        return AdoraStatus.CONTINUE

在 YAML 中使用 operator 而非 path 来引用它:

nodes:
  - id: timer
    path: adora/timer/millis/500
    outputs:
      - tick

  - id: doubler
    operator:
      python: doubler_op.py
      inputs:
        tick: timer/tick
      outputs:
        - doubled

何时使用算子 vs 节点:

NodesOperators
进程模型独立的操作系统进程进程内(共享运行时)
启动开销HigherLower
Isolation完全的进程隔离共享内存空间
最适用于长时间运行、重计算任务轻量级转换、过滤

异步节点

对于需要异步 I/O(HTTP 调用、数据库查询等)的节点,请使用 recv_async()

import asyncio
from adora import Node

async def main():
    node = Node()
    for _ in range(50):
        event = await node.recv_async()
        if event["type"] == "STOP":
            break
        # Do async work here
        result = await fetch_data(event["value"])
        node.send_output("result", result)

asyncio.run(main())

参见 examples/python-async 获取完整示例。

日志

使用 node.log() 进行结构化日志记录,可与 adora logs 集成:

node.log("info", "Processing item", {"count": str(i)})

或者使用 Python 标准的 logging 模块 – adora 会自动捕获 stdout/stderr:

import logging
logging.info("Processing item %d", i)

参见 examples/python-logging 了解日志模块集成。

Timers

内置的定时器节点无需编写任何代码即可生成周期性的定时信号:

nodes:
  - id: tick-source
    path: adora/timer/millis/100    # tick every 100ms
    outputs:
      - tick

  - id: my-node
    path: my_node.py
    inputs:
      tick: tick-source/tick

也可使用:adora/timer/hz/30 表示 30 Hz。

下一步

  • Python API 参考 – Node、Operator、DataflowBuilder、CUDA 的完整 API 文档
  • 通信模式 – 服务(请求/应答)和动作(目标/反馈/结果)模式
  • 示例 – python-dataflow、python-async、python-drain、python-concurrent-rw、python-multiple-arrays
  • 分布式部署 – 使用 adora up 跨多台机器运行