Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

日志

Adora 为实时机器人和 AI 数据流提供结构化日志系统。日志按节点捕获为结构化 JSONL 文件,转发到协调器进行实时流式传输,并可选择性地通过数据流图作为数据消息路由。

Which Logging Approach Should I Use?

Start here if you’re unsure which approach fits your use case.

I want to…Approach配置
Log from PythonUse Python’s logging module (auto-bridged)Nothing – just import logging
Log from RustUse node.log_info() / node.log_error() etc.Nothing – works out of the box
Log from C/C++Use adora_log() / log_message()Nothing – works out of the box
Filter noisy nodesSet min_log_level in YAMLPer-node YAML field
Watch all logs in one placeSubscribe to adora/logs virtual inputinputs: logs: adora/logs
Process one node’s logs as dataUse send_logs_as on that nodePer-node YAML + wire the output
Rotate log filesSet max_log_size in YAMLPer-node YAML field
Build a custom log sinkUse adora-log-utils crateRust dependency
Filter CLI displayUse --log-level / --log-filter flagsCLI flags or env vars

Language-Specific Quick Start

Python – the simplest path is Python’s built-in logging module:

import logging
from adora import Node

node = Node()  # Automatically bridges Python logging -> adora

logging.info("Sensor started")       # Captured as structured "info" log
logging.warning("High temp: 42C")    # Captured as structured "warn" log
print("raw debug output")            # Captured as "stdout" level

When Node() is created, it installs a handler that routes all Python logging calls through Rust’s tracing system. The daemon parses these as structured log entries with level, message, file, and line number. No extra configuration needed.

You can also use the explicit API for structured fields:

node.log_info("Reading acquired")
node.log("info", "Reading acquired", fields={"sensor_id": "temp-01"})

Rust – use the node API convenience methods:

#![allow(unused)]
fn main() {
let (node, mut events) = AdoraNode::init_from_env()?;

// Convenience methods (recommended for most cases)
node.log_info("Sensor started");
node.log_warn("High temperature");

// With structured fields
let mut fields = BTreeMap::new();
fields.insert("sensor_id".into(), "temp-01".into());
node.log_with_fields("info", "Reading acquired", None, Some(&fields));
}

Alternatively, Rust nodes can use the tracing crate. When adora’s tracing subscriber is initialized (via init_tracing()), tracing::info!() etc. output structured JSON to stdout, which the daemon parses automatically:

#![allow(unused)]
fn main() {
// Also works -- parsed as structured logs by the daemon
tracing::info!("Sensor started");
tracing::warn!(sensor_id = "temp-01", "High temperature");
}

Use node.log_*() when you want explicit control over the log format. Use tracing::*!() when you want ecosystem integration (spans, instrumentation, OpenTelemetry). Both produce identical structured log entries in the daemon.

C – use the adora_log() function:

adora_log(ctx, "info", 4, "Sensor started", 14);

C++ – use the log_message() function:

log_message(node.send_output, "info", "Sensor started");

功能一览

特性范围配置
日志级别过滤CLI 显示--log-levelADORA_LOG_LEVEL
输出格式CLI 显示--log-formatADORA_LOG_FORMAT
按节点级别覆盖CLI 显示--log-filterADORA_LOG_FILTER
源级别过滤按节点 YAMLmin_log_level
标准输出转数据路由按节点 YAMLsend_stdout_as
结构化日志路由按节点 YAMLsend_logs_as
日志文件轮转按节点 YAMLmax_log_size
轮转文件限制按节点 YAMLmax_rotated_files
节点日志 APIRust/Python/C/C++ 节点node.log()adora_log()
日志工具库Rust crateadora-log-utils
Log aggregationDataflow inputadora/logs virtual input
时间范围过滤adora logs--since--until
实时日志流adora logs--follow
文本搜索adora logs--grep
本地日志读取adora logs--local--all-nodes

日志文件格式

每个节点在以下路径生成一个 JSONL 文件(每行一个 JSON 对象):

<working_dir>/out/<dataflow_uuid>/log_<node_id>.jsonl

每行具有以下结构:

{
  "timestamp": "2024-01-15T10:30:00.123Z",
  "level": "info",
  "node_id": "sensor",
  "message": "Starting sensor...",
  "target": "sensor::module",
  "fields": { "key": "value" }
}
Field类型描述
timestampstringRFC3339 时间戳,精确到毫秒
levelstring"error""warn""info""debug""trace""stdout"
node_idstring节点 ID
messagestring日志消息文本
targetstring?Rust 模块目标(例如 "sensor::module"),缺失时为 null
fieldsobject?Structured key-value fields from the logging framework. Trust model: fields originate from node stdout and are passed through without sanitization. In mixed-trust environments, log consumers should validate field contents before acting on them

节点输出如何变为日志条目

守护进程捕获节点进程的每行 stdout/stderr 并尝试将其解析为结构化日志消息(包含 levelmessagetimestamp 和可选 fields 的 JSON)。解析成功时保留结构化字段。解析失败时,原始行变为 "stdout"-level 条目。

这意味着使用 Rust 的 tracinglog crate 并输出 JSON 的节点会自动获得完整的结构化日志。仅使用 println! 的节点则产生 "stdout"-level 条目。


查看日志:adora run

使用 adora run 运行数据流时,所有节点的日志会在终端上实时显示。

Flags

adora run dataflow.yml [OPTIONS]
标志默认环境变量描述
--log-level LEVELstdoutADORA_LOG_LEVEL最低显示级别
--log-format FORMATprettyADORA_LOG_FORMAT输出格式:prettyjsoncompact
--log-filter FILTERnoneADORA_LOG_FILTER按节点级别覆盖

日志级别

从最详细到最简洁:

Level描述
stdout包括节点原始 stdout 在内的所有内容(默认)
trace细粒度诊断消息
debug开发者级别诊断消息
info一般信息消息
warn警告条件
error仅错误条件

设置 --log-level info 会隐藏 stdouttracedebug 消息。stdout 级别是一个特殊的全通级别,放行所有内容。

级别过滤逻辑

级别过滤使用 LogLevelOrStdout::passes()

Message level    Filter level    Displayed?
─────────────    ────────────    ──────────
stdout           stdout          yes
stdout           info            no       (stdout only passes stdout filter)
info             stdout          yes      (any log level passes stdout filter)
debug            info            no       (debug is more verbose than info)
error            info            yes      (error is less verbose than info)

按节点覆盖

--log-filter 标志允许您为不同节点设置不同级别:

adora run dataflow.yml --log-level info --log-filter "sensor=debug,planner=warn"

这会为所有节点显示 info 及以上级别,但 sensor(显示 debug 及以上)和 planner(显示 warn 及以上)除外。

格式:"node1=level,node2=level"(逗号分隔的 name=level 对)。

输出格式

Pretty(默认)– 彩色、人类可读:

10:30:00 INFO   sensor: Starting sensor...

10:30:01 INFO   [adora]: spawning node processor

10:30:01 stdout sensor: raw output line
  • 本地时区的时间戳(HH:MM:SS
  • 级别着色:ERROR(红色)、WARN(黄色)、INFO(绿色)、DEBUG(蓝色)、TRACE(暗色)、stdout(斜体暗蓝)
  • 节点名称加粗并根据名称显示唯一颜色
  • 系统消息以 [adora] 为前缀
  • 生命周期消息(spawningnode finishedstopping)通过空行进行视觉分隔

Json – 完整的 LogMessage 结构体输出为 JSON,每行一条:

{"build_id":null,"dataflow_id":"abc-123","node_id":"sensor","level":"INFO","message":"Starting...","timestamp":"2024-01-15T10:30:00Z",...}

适用于通过管道传递给 jq 或导入日志聚合系统。

Compact – 精简,无色彩:

10:30:00 INFO sensor: Starting sensor...

适用于 CI/CD 环境和日志文件。


查看日志:adora logs

读取历史日志或从运行中的数据流实时流式传输日志。

基本用法

# Read logs for a specific node (via coordinator)
adora logs <dataflow_uuid> <node_name>

# Read local log files directly
adora logs --local <node_name>
adora logs --local --all-nodes

# Stream live logs
adora logs <dataflow_uuid> <node_name> --follow
adora logs --local <node_name> --follow

Flags

标志Short默认描述
--localfalse从本地 out/ 目录读取而非从协调器
--all-nodesfalse合并所有节点的日志,按时间戳排序
--tail N-nall仅显示最后 N 行
--follow-ffalse在新日志条目到达时实时流式输出
--since DURATIONnone仅显示指定时间之后的日志
--until DURATIONnone仅显示指定时间之前的日志
--level LEVELstdout最低日志级别(环境变量:ADORA_LOG_LEVEL
--grep PATTERNnone不区分大小写的文本搜索
--coordinator-addr IP127.0.0.1协调器地址
--coordinator-port PORTdefault协调器控制端口

时间过滤

--since--until 接受相对于当前时间的持续时间字符串:

# Logs from the last 5 minutes
adora logs --local sensor --since 5m

# Logs from 1 hour ago to 30 minutes ago
adora logs --local sensor --since 1h --until 30m

# Last 10 errors from the past hour
adora logs --local sensor --since 1h --level error --tail 10

支持的时间格式:30(秒)、30s5m1h2d

文本搜索

--grep 对以下内容执行不区分大小写的子串匹配:

  • 日志消息文本
  • 节点 ID
  • 模块目标
# Find all timeout-related messages
adora logs --local --all-nodes --grep "timeout"

# Find errors from a specific module
adora logs --local sensor --grep "camera::driver" --level error

过滤流水线

所有过滤器按以下顺序应用:

Read/Parse -> Time Filters -> Grep -> Tail -> Display

在协调器模式下使用 --since--until--grep 时,CLI 从服务器获取所有日志(忽略服务端的 --tail)并在客户端应用所有过滤器。这确保了组合使用过滤器时结果的正确性。

本地模式 vs 协调器模式

本地模式--local)直接从当前工作目录的 out/ 目录读取 JSONL 文件。无需运行协调器或守护进程。如果使用 --all-nodes 或未指定节点名称,所有日志文件将合并并按时间戳排序。

协调器模式(默认)通过 WebSocket 连接到运行中的协调器。协调器从守护进程的工作目录读取日志文件并将其回传。适用于本地和分布式部署。

跟踪模式

本地跟踪--local --follow):每 200ms 轮询日志文件获取新内容。新行被解析、经 --grep 过滤后打印。时间/尾部过滤器仅应用于初始历史输出。

协调器跟踪--follow):向协调器打开 WebSocket 订阅。协调器实时转发守护进程的日志消息。级别过滤在服务端应用以提高效率。--grep--since 在客户端对流应用。


环境变量

所有环境变量作为备选值 – CLI 标志始终优先。

变量使用者Values描述
ADORA_LOG_LEVELadora runadora logserrorwarninfodebugtracestdout默认最低日志级别
ADORA_LOG_FORMATadora runprettyjsoncompact默认输出格式
ADORA_LOG_FILTERadora run"node1=level,node2=level"默认的按节点覆盖
ADORA_QUIETdaemon任意值抑制日志转发显示(文件写入继续)

Example:

# Set defaults for a development session
export ADORA_LOG_LEVEL=info
export ADORA_LOG_FORMAT=pretty
export ADORA_LOG_FILTER="sensor=debug"

# These are equivalent:
adora run dataflow.yml
adora run dataflow.yml --log-level info --log-format pretty --log-filter "sensor=debug"

# CLI flag overrides env var:
adora run dataflow.yml --log-level debug   # overrides ADORA_LOG_LEVEL=info

YAML 配置

min_log_level

在日志到达日志文件、协调器或 send_logs_as 路由之前,在源端(守护进程端)过滤日志。

nodes:
  - id: noisy-sensor
    path: ./target/debug/sensor
    min_log_level: info    # 抑制此节点的 debug/trace/stdout

有效值:errorwarninfodebugtracestdout

设置后,守护进程在解析后立即丢弃低于此级别的日志消息。这减少了磁盘 I/O、网络流量和日志文件大小。过滤使用与 CLI 显示过滤器相同的 passes() 逻辑。

send_stdout_as

将原始 stdout/stderr 行路由为数据流输出消息。

nodes:
  - id: legacy-node
    path: ./legacy-script.py
    send_stdout_as: raw_output
    outputs:
      - raw_output
      - data

  - id: log-consumer
    inputs:
      logs: legacy-node/raw_output

每行 stdout/stderr 作为 Arrow 编码的字符串发送。这对于集成在 stdout 上输出数据的旧节点很有用(例如使用 print() 的 Python 脚本)。

send_stdout_as 和正常日志文件写入同时进行 – stdout 路由不会抑制日志文件。

send_logs_as

将解析的结构化日志条目路由为数据流输出消息。

nodes:
  - id: sensor
    path: ./target/debug/sensor
    send_logs_as: log_entries
    outputs:
      - data
      - log_entries

  - id: log-aggregator
    inputs:
      sensor_logs: sensor/log_entries

send_stdout_as 不同,这仅发送成功解析为结构化日志的行(非原始 stdout)。每个条目序列化为完整的 JSON LogMessage 字符串。min_log_level 过滤器在路由前应用 – 被抑制的消息不会发送。

使用此功能在数据流内部构建日志聚合、告警或监控节点。

adora/logs – Automatic Log Aggregation

Subscribe to logs from all nodes with a single input line – no manual wiring needed:

nodes:
  - id: sensor
    path: sensor.py
    inputs:
      tick: adora/timer/millis/200
    outputs:
      - reading

  - id: processor
    path: processor.py
    inputs:
      reading: sensor/reading
    outputs:
      - result

  - id: log-viewer
    path: log_viewer.py
    inputs:
      logs: adora/logs              # all nodes, all levels
      errors: adora/logs/error      # only error+ from all nodes
      sensor: adora/logs/info/sensor  # info+ from one node

The adora/logs virtual input works like adora/timer – the daemon handles subscription internally. Each log message arrives as a JSON-encoded LogMessage string in an Arrow array. To prevent infinite loops, a node never receives its own log messages.

Syntax:

Input描述
adora/logsAll logs from all nodes
adora/logs/<level>Logs at <level> or above from all nodes
adora/logs/<level>/<node-id>Logs at <level> or above from a specific node

Levels: stdout, error, warn, info, debug, trace.

When to use adora/logs vs send_logs_as:

adora/logssend_logs_as
范围All nodes at onceOne node at a time
YAML changesOnly the consumerEach source node
Adding a nodeZero wiring changesMust update consumer
用例Dashboard, monitoringPer-node log processing

See examples/log-aggregator/ for a complete working example.

max_log_size

启用基于大小的日志文件轮转。

nodes:
  - id: sensor
    path: ./target/debug/sensor
    max_log_size: "50MB"
Bytes
"1KB""1K"1,024
"50MB""50M"52,428,800
"1GB""1G"1,073,741,824
"1000"1,000(纯数字 = 字节)

当活动日志文件超过配置大小时,守护进程会:

  1. 刷新并关闭当前文件
  2. 重命名现有轮转文件:.4.jsonl -> .5.jsonl.3.jsonl -> .4.jsonl
  3. 重命名当前文件:log_sensor.jsonl -> log_sensor.1.jsonl
  4. 创建新的 log_sensor.jsonl
  5. 删除超出轮转限制的文件(默认 5,可通过 max_rotated_files 配置)

命名约定:

log_sensor.jsonl       # current (active)
log_sensor.1.jsonl     # previous
log_sensor.2.jsonl     # older
log_sensor.3.jsonl
log_sensor.4.jsonl
log_sensor.5.jsonl     # oldest (deleted on next rotation)

每个节点的最大磁盘使用量:max_log_size * (1 + max_rotated_files)(1 个活动文件 + N 个轮转文件)。

没有 max_log_size 时,日志文件会无限增长。对于长时间运行的数据流,请始终设置此项。

adora logs --local 命令自动读取节点的所有轮转文件并按时间顺序合并(最旧的轮转文件在前,当前文件在后)。

max_rotated_files

控制保留多少个轮转日志文件(默认:5,范围:1-100)。

nodes:
  - id: sensor
    path: ./target/debug/sensor
    max_log_size: "50MB"
    max_rotated_files: 10    # keep 10 rotated files instead of 5

max_rotated_files: 10max_log_size: "50MB" 时,每个节点最大磁盘使用量为 50MB * 11 = 550MB。较低的值节省磁盘空间;较高的值保留更多历史记录。

运行时节点限制

对于运行时节点(算子),每个运行时只允许每个日志字段有一个:

# OK -- single operator
nodes:
  - id: runtime-node
    operator:
      python: process.py
      send_logs_as: logs
      min_log_level: info
      max_log_size: "100MB"

# ERROR -- 多个算子具有冲突的配置
nodes:
  - id: runtime-node
    operators:
      - id: op1
        python: a.py
        send_logs_as: logs1
      - id: op2
        python: b.py
        send_logs_as: logs2    # Error: multiple send_logs_as

当运行时中的单个算子设置这些字段时,输出名称以算子 ID 为前缀(例如 op1/logs)。


节点日志 API

节点可以使用节点 API 以编程方式发出结构化日志消息。这等同于将 JSON 格式的日志行写入 stdout – 守护进程以相同方式解析。

Rust

#![allow(unused)]
fn main() {
use adora_node_api::AdoraNode;
use std::collections::BTreeMap;

let (node, mut events) = AdoraNode::init_from_env()?;

// 指定级别字符串和可选目标的通用日志
node.log("info", "sensor initialized", Some("sensor::init"));

// 便捷方法(无 target 参数)
node.log_error("connection failed");
node.log_warn("temperature elevated");
node.log_info("reading acquired");
node.log_debug("raw bytes received");
node.log_trace("entering loop iteration");

// 结构化字段(键值上下文通过 send_logs_as 保留)
let mut fields = BTreeMap::new();
fields.insert("sensor_id".to_string(), "temp-01".to_string());
fields.insert("reading".to_string(), "42.5".to_string());
node.log_with_fields("info", "reading acquired", None, Some(&fields));
}

level 参数接受 "error""warn"(或 "warning")、"info""debug""trace"。未知级别默认为 "info"。字段总量上限为 60 KB,以匹配下游 64 KB 的解析限制。

Python

Python nodes have three ways to log, all producing structured log entries:

from adora import Node
import logging

node = Node()

# Option 1: Python's logging module (recommended -- auto-bridged by Node())
logging.info("sensor initialized")
logging.warning("temperature elevated")
logging.debug("raw bytes: %s", data)

# Option 2: Explicit adora API with level string
node.log("info", "sensor initialized", target="sensor.init")
node.log("info", "reading acquired", fields={"sensor_id": "temp-01", "reading": "42.5"})

# Option 3: Convenience methods
node.log_error("connection failed")
node.log_warn("temperature elevated")
node.log_info("reading acquired")
node.log_debug("raw bytes received")
node.log_trace("entering loop iteration")

# This also works but produces "stdout"-level entries (no structure):
print("raw output")

How the Python logging bridge works: When Node() is created, it installs a custom logging.Handler that routes all Python logging calls through Rust’s tracing system. The daemon parses these as structured log entries with level, message, file path, and line number. This happens automatically – no configuration needed.

方法Structured?Fields support?When to use
logging.info()No (use extra= for custom formatters)General-purpose logging
node.log("info", msg, fields={...})When you need structured key-value context
node.log_info(msg)Quick one-liner, same as node.log("info", msg)
print()No (stdout level)Legacy code, quick debugging

Common pitfall: Do not call logging.basicConfig() before creating Node(). The node constructor sets up the logging bridge; calling basicConfig() first may install a conflicting handler. If you need custom formatters, configure them after Node() creation.

C

#include "node_api.h"

void *ctx = init_adora_context_from_env();
const char *level = "info";
const char *msg = "sensor initialized";
adora_log(ctx, level, strlen(level), msg, strlen(msg));

C++

// 通过 cxx bridge
auto node = init_adora_node();
log_message(node.send_output, "info", "sensor initialized");

日志工具库(adora-log-utils

adora-log-utils crate 提供解析、合并、过滤和格式化工具,用于在自定义 sink 节点中处理 LogMessage 条目。在构建通过 send_logs_as 消费日志数据的节点时使用。

API

#![allow(unused)]
fn main() {
use adora_log_utils;

// 从 JSON 解析 LogMessage(从 send_logs_as 接收)
let log = adora_log_utils::parse_log(json_str)?;

// 直接从 Arrow 输入数据解析(事件处理器的便捷方法)
let log = adora_log_utils::parse_log_from_arrow(&data)?;

// 将多个日志流合并为单一时间线
let merged = adora_log_utils::merge_by_timestamp(vec![stream_a, stream_b]);

// 按最低级别过滤
let errors = adora_log_utils::filter_by_level(&logs, &min_level);

// 格式化为 JSON(单行,无尾部换行)
let json = adora_log_utils::format_json(&log);

// 格式化为紧凑单行:"<timestamp> <node> <LEVEL>: <message>"
let compact = adora_log_utils::format_compact(&log);

// 格式化为美观输出:"[<timestamp>][<LEVEL>][<node>] <message>"
let pretty = adora_log_utils::format_pretty(&log);
}

Dependency

添加到 sink 节点的 Cargo.toml

[dependencies]
adora-log-utils = { workspace = true }

日志 Sink 示例

三个示例 sink 节点演示如何消费通过 send_logs_as 路由的日志并转发到外部目的地。

文件 Sink(examples/log-sink-file/

将多个节点的日志流合并为单个 JSONL 文件。适用于统一日志收集。

nodes:
  - id: sensor
    path: sensor.py
    send_logs_as: log_entries
    inputs:
      tick: adora/timer/millis/200
    outputs:
      - reading
      - log_entries

  - id: processor
    path: processor.py
    send_logs_as: log_entries
    inputs:
      reading: sensor/reading
    outputs:
      - result
      - log_entries

  - id: file_sink
    path: log-sink-file
    inputs:
      sensor_logs: sensor/log_entries
      processor_logs: processor/log_entries
    env:
      LOG_FILE: "./combined.jsonl"

文件 sink 从环境变量读取 LOG_FILE(默认 ./combined.jsonl),使用 adora_log_utils::parse_log_from_arrow() 解析每条传入的 Arrow 消息,格式化为 JSON 并追加到文件。

TCP Sink(examples/log-sink-tcp/

通过 TCP 套接字将日志条目转发到远程日志收集器。适用于缺少本地文件系统且需要将日志流式传输到设备外的嵌入式系统。

nodes:
  - id: source
    path: source.py
    send_logs_as: log_entries
    inputs:
      tick: adora/timer/millis/500
    outputs:
      - data
      - log_entries

  - id: tcp_sink
    path: log-sink-tcp
    inputs:
      logs: source/log_entries
    env:
      SINK_ADDR: "127.0.0.1:9876"

TCP sink 从环境变量读取 SINK_ADDR(默认 127.0.0.1:9876),启动时连接到服务器,并将每条日志条目作为 JSON 行发送。写入失败时自动重连。

告警路由器(examples/log-sink-alert/

按严重程度拆分传入的日志条目。所有日志转发到 all_logs 输出;仅 error 和 warn 日志转发到 alerts 输出。这使下游节点能够差异化处理告警(例如触发通知、写入专用文件)。

nodes:
  - id: source
    path: my_node.py
    send_stdout_as: log_entries
    inputs:
      tick: adora/timer/millis/200
    outputs:
      - log_entries

  - id: alert_router
    path: log-sink-alert
    inputs:
      logs: source/log_entries
    outputs:
      - all_logs
      - alerts

源节点使用 send_stdout_as 将其 stdout 行路由为 Arrow 字符串数据。路由器使用 adora_log_utils::parse_log_from_arrow() 解析每条日志条目,检查级别,并使用 node.send_output() 将数据转发到相应输出。使用节点 API 的节点也可以使用 send_logs_as 来路由 node.log() 的结构化日志。

构建自定义 Sink

要构建自己的 sink 节点,请遵循以下模式:

use adora_node_api::{AdoraNode, Event};

fn main() -> eyre::Result<()> {
    let (_node, mut events) = AdoraNode::init_from_env()?;

    while let Some(event) = events.recv() {
        match event {
            Event::Input { data, .. } => {
                let log = adora_log_utils::parse_log_from_arrow(&data)?;
                // 处理日志条目:写入文件、通过网络发送等
                let json = adora_log_utils::format_json(&log);
                println!("{json}");
            }
            Event::Stop(_) => break,
            _ => {}
        }
    }
    Ok(())
}

守护进程如何处理日志

理解内部流水线有助于调试和调优。守护进程为每个节点运行一个专用的异步任务,按顺序处理日志行:

Node Process (stdout/stderr)
    |
    v
[1] Capture: lines buffered in mpsc channel (capacity 100)
    |
    v
[2] send_stdout_as: raw line -> Arrow data -> dataflow output
    |
    v
[3] Parse: try JSON structured log, fall back to Stdout-level
    |
    v
[4] min_log_level filter: drop messages below threshold
    |
    v
[5] send_logs_as: LogMessage -> JSON -> Arrow data -> dataflow output
    |
    v
[6] Write JSONL: compact format to log file, track bytes written
    |
    v
[7] Rotation check: if bytes_written >= max_log_size, rotate files
    |
    v
[8] Forward: send LogMessage to display channel (unless ADORA_QUIET)
    |
    v
[9] Sync: fsync log file to disk

关键细节:

  • 步骤 2 在解析之前发生,因此 send_stdout_as 捕获每一行包括非结构化输出
  • 步骤 4 在步骤 5-8 之前发生,因此 min_log_level 会抑制所有下游处理的消息
  • 步骤 5 仅对成功解析的结构化日志触发(步骤 3 成功路径)
  • 步骤 8 发送到 flume 通道(adora run 直接模式)或协调器(分布式模式)
  • 步骤 9 每次写入后调用 sync_all(),以一定的 I/O 开销为代价确保持久性

结构化日志解析

当节点发出 JSON 格式的日志输出(例如使用 JSON 格式化的 tracing-subscriber)时,守护进程提取:

  • level:日志严重程度
  • message:日志文本
  • target:模块路径
  • timestamp:日志发出时间
  • fields:任意键值对
  • build_iddataflow_idnode_iddaemon_id:作为后备从字段中提取

守护进程还会在所有消息上设置 dataflow_idnode_iddaemon_id,以确保它们始终存在于日志文件中。


协调器日志流协议

当守护进程在协调器下运行(分布式模式)时,日志转发通过 WebSocket 工作:

  1. 守护进程 -> 协调器:每条 LogMessage 被封装在 DaemonEvent::Log(message) 中并通过守护进程的 WebSocket 连接发送
  2. 协调器存储:协调器存储/转发日志
  3. CLI 订阅:CLI 通过其 WebSocket 连接发送 ControlRequest::LogSubscribe { dataflow_id, level }
  4. 服务端过滤:协调器仅转发 msg_level <= subscription_level 的消息。这减少了过滤订阅的网络流量
  5. CLI 接收:消息作为序列化的 LogMessage 结构体到达

--level 标志映射到 log::LevelFilter

  • stdout -> LevelFilter::Trace(最宽松,接收所有内容)
  • info -> LevelFilter::Info(接收 Error、Warn、Info)
  • etc.

完整 YAML 参考

nodes:
  - id: sensor
    path: ./target/debug/sensor
    outputs:
      - data
      - raw_output       # 用于 send_stdout_as
      - log_entries       # 用于 send_logs_as

    # Source-level log filtering (daemon-side)
    min_log_level: info          # 抑制 debug/trace/stdout

    # Route stdout to dataflow
    send_stdout_as: raw_output   # 每行 stdout 变为数据消息

    # Route structured logs to dataflow
    send_logs_as: log_entries    # 解析的日志条目变为数据消息

    # Log file rotation
    max_log_size: "50MB"         # 文件超过 50MB 时轮转
    max_rotated_files: 5         # keep 5 rotated files (default, range 1-100)

    inputs:
      tick: adora/timer/millis/100

完整示例

examples/python-logging/ 目录包含一个可运行的三节点流水线,演练了每个日志功能:

sensor (noisy, high-volume) --> processor (structured logs) --> monitor (log aggregator)

数据流配置要点:

nodes:
  - id: sensor
    path: sensor.py
    min_log_level: info       # 在源端抑制 debug 噪声
    max_log_size: "1KB"       # 演示用小值(快速触发轮转)
    inputs:
      tick: adora/timer/millis/50
    outputs:
      - reading

  - id: processor
    path: processor.py
    send_logs_as: log_entries  # 将结构化日志路由为数据
    inputs:
      reading: sensor/reading
    outputs:
      - result
      - log_entries

  - id: monitor
    path: monitor.py
    inputs:
      logs: processor/log_entries
      reading: sensor/reading

每个节点演示的内容:

  • sensor – 混合使用 print()(原始 stdout)、logging.info()logging.debug()logging.warning()。设置 min_log_level: info 后,debug 消息在到达日志文件前被守护进程丢弃。设置 max_log_size: "1KB" 后,几秒钟后日志轮转开始。
  • processor – 使用 send_logs_as: log_entries 将其结构化日志条目路由为数据流数据。原始 print() 输出_不_会被路由(仅路由解析后的结构化条目)。
  • monitor – 订阅 processor/log_entries 并统计警告/错误,演示数据流内日志聚合。

直接模式adora run – 单进程,适合快速测试):

# Basic run
adora run examples/python-logging/dataflow.yml --stop-after 5s

# Only warnings and above
adora run examples/python-logging/dataflow.yml --log-level warn --stop-after 5s

# Per-node overrides
adora run examples/python-logging/dataflow.yml --log-filter "monitor=debug,sensor=warn" --stop-after 5s

# JSON output for machine parsing
adora run examples/python-logging/dataflow.yml --log-format json --stop-after 3s

# Environment variable control
ADORA_LOG_LEVEL=warn adora run examples/python-logging/dataflow.yml --stop-after 5s

分布式模式adora up + adora start – 协调器/守护进程架构,多机部署必需):

# Start infrastructure
adora up

# Start attached (live log stream)
adora start examples/python-logging/dataflow.yml --attach

# Or start detached and query logs separately
adora start examples/python-logging/dataflow.yml
adora logs <dataflow-id> sensor --follow                    # stream one node
adora logs <dataflow-id> sensor --follow --level warn       # only warnings
adora logs <dataflow-id> --all-nodes --tail 20              # last 20 lines
adora logs <dataflow-id> processor --grep "error" --since 5m  # targeted search

在分布式模式下,日志流经 节点 -> 守护进程 -> 协调器 -> CLI(通过 WebSocket)。协调器缓冲日志消息直到订阅者连接,因此即使延迟附加也不会丢失日志。YAML 级别设置(min_log_levelsend_logs_asmax_log_size)工作方式相同,因为它们在守护进程端应用。

adora runadora start
显示过滤--log-level--log-format--log-filteradora logs--level
按节点覆盖--log-filter "sensor=debug"每个节点单独 adora logs
远程节点
实时流始终附加--attachadora logs --follow

运行后日志分析(两种模式工作方式相同):

# Read all local logs
adora logs --local --all-nodes --tail 20

# Search for warnings in sensor logs
adora logs --local sensor --grep "high temp"

# Check that rotation created multiple files
ls -la out/*/log_sensor*.jsonl

使用场景

1. 调试嘈杂的传感器流水线

摄像头传感器节点用 debug 消息淹没了日志,使得难以看到其他节点的错误。

nodes:
  - id: camera
    path: ./target/debug/camera
    min_log_level: warn          # 在源端抑制 info/debug/trace
    max_log_size: "10MB"         # 限制磁盘使用

  - id: detector
    path: ./target/debug/detector

  - id: planner
    path: ./target/debug/planner
# During development: see everything from detector, only warnings from camera
adora run dataflow.yml --log-level debug --log-filter "camera=warn,detector=debug"

# In production: only errors
export ADORA_LOG_LEVEL=error
adora run dataflow.yml

发生了什么:

  • 摄像头节点的 debug/info 消息在到达日志文件前被守护进程丢弃(min_log_level: warn
  • CLI 根据 --log-filter 进一步过滤显示
  • 日志文件在 10MB 时轮转,摄像头节点最多在磁盘上保留 60MB

2. 数据流内日志聚合

构建数据流内的日志监控节点,监视多个节点的错误并发送告警。

nodes:
  - id: camera
    path: ./target/debug/camera
    send_logs_as: logs
    outputs:
      - frames
      - logs

  - id: detector
    path: ./target/debug/detector
    send_logs_as: logs
    outputs:
      - detections
      - logs

  - id: log-monitor
    path: ./target/debug/log-monitor
    inputs:
      camera_logs: camera/logs
      detector_logs: detector/logs
    outputs:
      - alerts

日志监控器中的节点端处理(使用 adora-log-utils):

#![allow(unused)]
fn main() {
use adora_node_api::{AdoraNode, Event};
use adora_message::common::{LogLevel, LogLevelOrStdout};

let (mut node, mut events) = AdoraNode::init_from_env()?;
while let Some(event) = events.recv() {
    match event {
        Event::Input { data, .. } => {
            let log = adora_log_utils::parse_log_from_arrow(&data)?;

            let is_error = matches!(log.level,
                LogLevelOrStdout::LogLevel(LogLevel::Error));

            if is_error || log.message.contains("timeout") {
                // 向下游发送告警
                node.send_output("alerts", /* ... */)?;
            }
        }
        Event::Stop(_) => break,
        _ => {}
    }
}
}

另请参阅日志 Sink 示例部分获取完整可运行示例。

3. 崩溃后的事后调试

数据流崩溃后,调查最后几分钟发生了什么。

# Find available dataflows
ls out/

# Read the last 50 lines from all nodes around the crash
adora logs --local --all-nodes --tail 50

# Focus on errors in the last 5 minutes
adora logs --local --all-nodes --since 5m --level error

# Search for a specific error pattern
adora logs --local --all-nodes --grep "out of memory"

# Drill into a specific node
adora logs --local detector --since 2m

# Export as JSON for external analysis
adora run dataflow.yml --log-format json 2>logs.json

4. 长时间运行的生产数据流

数据流运行数天或数周。没有日志轮转,磁盘空间会被填满。

nodes:
  - id: ingest
    path: ./target/debug/ingest
    min_log_level: info        # 生产环境无 debug 噪声
    max_log_size: "100MB"      # 每个节点最大约 600MB(100MB * 6)
    restart_policy: always
    inputs:
      tick: adora/timer/millis/1000
    outputs:
      - data

  - id: processor
    path: ./target/debug/processor
    min_log_level: warn        # 仅警告和错误
    max_log_size: "50MB"
    restart_policy: on-failure
    inputs:
      data: ingest/data
    outputs:
      - results

  - id: writer
    path: ./target/debug/writer
    min_log_level: error       # 最少日志
    max_log_size: "20MB"
    inputs:
      results: processor/results

磁盘预算:

  • ingest:最多 600MB(100MB x 6 个文件)
  • processor:最多 300MB(50MB x 6 个文件)
  • writer:最多 120MB(20MB x 6 个文件)
  • 总计:所有日志最大磁盘使用约 1GB

5. 分布式部署的实时监控

多个守护进程在不同机器上运行,从中央工作站进行监控。

# Start infrastructure (coordinator + local daemon)
adora up

# On remote machines, start a daemon pointing to the coordinator:
#   adora daemon --coordinator-addr 192.168.1.10

# Start the dataflow (detached)
adora start dataflow.yml

# Open targeted log streams in separate terminals:

# Terminal 1: all sensor warnings
adora logs <dataflow-id> sensor --follow --level warn

# Terminal 2: processor errors with text search
adora logs <dataflow-id> processor --follow --level error --grep "timeout"

# Terminal 3: all nodes merged
adora logs <dataflow-id> --all-nodes --follow

# Terminal 4: historical + live (errors from the last hour, then stream)
adora logs <dataflow-id> processor --since 1h --level error --follow

# Monitor a remote coordinator from another machine:
adora logs <dataflow-id> sensor --follow --coordinator-addr 192.168.1.10

内部工作原理:

  1. CLI 连接到协调器(默认 localhost:6013,或 --coordinator-addr
  2. 对于历史日志:请求/应答,过滤在客户端应用(--since--grep--tail
  3. 使用 --follow 时:向协调器打开 WebSocket 订阅
  4. 协调器在转发前按 --level 在服务端过滤(减少网络流量)
  5. CLI 在实时流上客户端应用 --grep--since
  6. 协调器缓冲日志消息直到订阅者连接,因此后加入的订阅者可以看到近期历史

6. 带结构化日志的 CI/CD 流水线

在 CI 中,使用 JSON 格式获取机器可解析的输出,使用 compact 格式获取可读日志。

# Machine-parseable logs for CI tooling
adora run dataflow.yml --log-format json --stop-after 30s 2>test-logs.json

# Compact logs for CI console output
adora run dataflow.yml --log-format compact --log-level info --stop-after 30s

# Post-run analysis: count errors per node
adora logs --local --all-nodes --level error | wc -l

使用 JSON 格式时,每行都是完整的 LogMessage,可由 jq、日志聚合器或自定义脚本处理:

# Extract error messages with jq
cat test-logs.json | jq -r 'select(.level == "ERROR") | "\(.node_id): \(.message)"'

Performance Considerations

Logging adds I/O overhead proportional to log volume. Here’s how to tune it:

min_log_level is the most impactful setting. It filters at the daemon before any I/O: no log file write, no coordinator forwarding, no send_logs_as routing. A node emitting 1000 debug lines/sec at min_log_level: info generates zero overhead for those lines.

send_logs_as adds a dataflow message per log line. Each parsed log entry is serialized to JSON, converted to Arrow, and sent through the dataflow. For high-volume nodes, this can consume significant bandwidth. Use min_log_level to limit what gets routed.

adora/logs subscribers share a single serialization. The daemon converts each log line to Arrow once and clones the result for each subscriber. The cost scales linearly with subscriber count, not log volume x subscriber count. For most dataflows (1-3 log subscribers), this is negligible.

Log line size is capped at 1 MB. Lines longer than 1 MB from node stdout/stderr are truncated to prevent heap exhaustion. This protects against buggy nodes that dump large binary data to stdout.

Log file rotation is recommended for long-running dataflows. Without max_log_size, log files grow unbounded. A node emitting 100 lines/sec at ~200 bytes/line fills 1 GB in ~14 hours.

Recommended production settings:

nodes:
  - id: my-node
    path: ./my-node
    min_log_level: info        # drop debug/trace at source
    max_log_size: "50MB"       # rotate at 50MB
    max_rotated_files: 5       # keep 5 rotated files (300MB max)

最佳实践

**在生产环境中设置 min_log_level。**守护进程端的源级别过滤防止 debug 噪声到达日志文件和网络。这是减少日志量最有效的方式,因为它在任何 I/O 之前进行过滤。

**对于长时间运行的数据流,始终设置 max_log_size。**没有轮转,单个嘈杂的节点就能填满磁盘。从 "50MB"(轮转后每节点总计 300MB)开始,根据存储预算调整。使用 max_rotated_files 调节保留多少历史记录(默认 5,范围 1-100)。

**使用环境变量设置团队默认值。**在 shell 配置文件或 CI 配置中设置 ADORA_LOG_LEVELADORA_LOG_FORMAT。个人开发者可以使用 CLI 标志覆盖。

**在开发期间使用 --log-filter。**无需更改 YAML 配置,使用按节点显示覆盖来聚焦正在调试的节点:--log-filter "my-node=debug"

**使用 send_logs_as 进行运维监控。**构建监控节点来监视错误模式、计算错误率或转发告警。这将监控逻辑保持在数据流图内。使用 adora-log-utils 在自定义 sink 节点中解析和格式化日志条目(参见 examples/log-sink-file/examples/log-sink-tcp/)。

对于结构化数据,优先使用 send_logs_as 而非 send_stdout_assend_stdout_as 捕获每行 stdout(包括原始 print),而 send_logs_as 仅捕获带完整元数据的已解析结构化日志条目。

**使用 --local 进行事后调试。**崩溃后,adora logs --local --all-nodes 无需运行中的协调器即可工作,并按时间顺序合并所有节点日志。

**组合 --since--grep 进行针对性调试。**无需滚动数千行,缩小窗口:adora logs --local sensor --since 5m --grep "error"

**在日志流水线中使用 JSON 格式。**向外部系统(ELK、Grafana Loki、Datadog)导入日志时,使用 --log-format json 进行结构化摄取。