Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Getting Started with Python

This guide walks you through writing Python nodes and operators for adora dataflows.

Prerequisites

cargo install adora-cli    # CLI (adora command)
pip install adora-rs       # Python node/operator API

The adora-rs package includes pyarrow as a dependency.

Building from source (instead of pip install adora-rs):

pip install maturin  # requires >= 1.8
cd apis/python/node && maturin develop --uv && cd ../../..

Hello World: Sender and Receiver

Create three files:

sender.py – sends 100 numbered messages:

import pyarrow as pa
from adora import Node

node = Node()
for i in range(100):
    node.send_output("message", pa.array([i]))

receiver.py – receives and prints messages:

from adora import Node

node = Node()
for event in node:
    if event["type"] == "INPUT":
        values = event["value"].to_pylist()
        print(f"Received {event['id']}: {values}")
    elif event["type"] == "STOP":
        break

dataflow.yml – connects sender to receiver:

nodes:
  - id: sender
    path: sender.py
    outputs:
      - message

  - id: receiver
    path: receiver.py
    inputs:
      message: sender/message

Run it:

adora run dataflow.yml

Events

Every call to node.next() or iteration over for event in node returns an event dictionary:

KeyTypeDescription
typestr"INPUT", "INPUT_CLOSED", "STOP", or "ERROR"
idstrInput name (e.g. "message") – only for INPUT events
valuepyarrow.Array or NoneThe data payload
metadatadictTracing/routing metadata

Handle events by checking event["type"]:

for event in node:
    match event["type"]:
        case "INPUT":
            process(event["id"], event["value"])
        case "INPUT_CLOSED":
            print(f"Input {event['id']} closed")
        case "STOP":
            break

Working with Arrow Data

All data flows through adora as Apache Arrow arrays. Common patterns:

import pyarrow as pa

# Simple values
node.send_output("count", pa.array([42]))
node.send_output("names", pa.array(["alice", "bob"]))

# Read values back
values = event["value"].to_pylist()  # [42] or ["alice", "bob"]

# Structured data
struct = pa.StructArray.from_arrays(
    [pa.array([1.5]), pa.array(["hello"])],
    names=["x", "y"],
)
node.send_output("point", struct)

# Raw bytes (images, serialized data, etc.)
node.send_output("frame", pa.array(raw_bytes))

Operators

Operators are lightweight alternatives to nodes. They run inside the adora runtime process (no separate OS process), making them faster for simple transformations.

Define an Operator class with an on_event method:

# doubler_op.py
import pyarrow as pa
from adora import AdoraStatus

class Operator:
    def on_event(self, event, send_output) -> AdoraStatus:
        if event["type"] == "INPUT":
            value = event["value"].to_pylist()[0]
            send_output("doubled", pa.array([value * 2]), event["metadata"])
        return AdoraStatus.CONTINUE

Reference it in YAML with operator instead of path:

nodes:
  - id: timer
    path: adora/timer/millis/500
    outputs:
      - tick

  - id: doubler
    operator:
      python: doubler_op.py
      inputs:
        tick: timer/tick
      outputs:
        - doubled

When to use operators vs nodes:

NodesOperators
Process modelSeparate OS processIn-process (shared runtime)
Startup costHigherLower
IsolationFull process isolationShared memory space
Best forLong-running, heavy computeLightweight transforms, filters

Async Nodes

For nodes that need async I/O (HTTP calls, database queries, etc.), use recv_async():

import asyncio
from adora import Node

async def main():
    node = Node()
    for _ in range(50):
        event = await node.recv_async()
        if event["type"] == "STOP":
            break
        # Do async work here
        result = await fetch_data(event["value"])
        node.send_output("result", result)

asyncio.run(main())

See examples/python-async for a complete example.

Logging

Use node.log() for structured logging that integrates with adora logs:

node.log("info", "Processing item", {"count": str(i)})

Or use Python’s standard logging module – adora captures stdout/stderr automatically:

import logging
logging.info("Processing item %d", i)

See examples/python-logging for logging module integration.

Timers

Built-in timer nodes generate periodic ticks without writing any code:

nodes:
  - id: tick-source
    path: adora/timer/millis/100    # tick every 100ms
    outputs:
      - tick

  - id: my-node
    path: my_node.py
    inputs:
      tick: tick-source/tick

Also available: adora/timer/hz/30 for 30 Hz.

Next Steps