Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

C API 参考

本文档涵盖 Adora 框架提供的两个 C API:用于独立 C 进程的节点 API 和用于 Adora 运行时加载的共享库算子的算子 API

目录


节点 API (adora-node-api-c)

头文件:apis/c/node/node_api.h Crate:adora-node-api-c(构建为 staticlib

节点 API 供作为外部进程参与 Adora 数据流的独立 C 可执行文件使用。守护进程生成进程并设置节点在初始化期间读取的环境变量。

初始化

init_adora_context_from_env

void *init_adora_context_from_env();

从守护进程设置的环境变量初始化 Adora 节点上下文。成功时返回指向上下文的不透明指针,失败时返回 NULL

返回的指针必须传递给所有期望上下文参数的后续节点 API 调用。节点完成后,使用 free_adora_context 释放。

free_adora_context

void free_adora_context(void *adora_context);

释放先前由 init_adora_context_from_env 创建的上下文。每个上下文必须恰好释放一次。释放后不得再次使用该指针。

事件循环

adora_next_event

void *adora_next_event(void *adora_context);

阻塞直到此节点有可用的下一个事件。返回指向事件的不透明指针,当所有事件流关闭时返回 NULL(表示节点应退出)。

返回的指针不能直接解引用。使用 read_adora_* 函数提取事件类型和载荷。完成后使用 free_adora_event 释放事件。

free_adora_event

void free_adora_event(void *adora_event);

释放先前由 adora_next_event 返回的事件。每个事件必须恰好释放一次。释放后,事件指针和所有派生指针(来自 read_adora_input_idread_adora_input_data)都将失效。

事件检查

read_adora_event_type

enum AdoraEventType read_adora_event_type(void *adora_event);

返回给定事件的类型。可能的值请参见 AdoraEventType

read_adora_input_id

void read_adora_input_id(void *adora_event, char **out_ptr, size_t *out_len);

AdoraEventType_Input 事件读取输入 ID。将字符串起始指针写入 *out_ptr,字节长度写入 *out_len。字符串是有效的 UTF-8 但以 null 结尾;使用 out_len 确定其边界。

如果事件不是输入事件,则设置 *out_ptr = NULL*out_len = 0

返回的指针借用自事件。调用 free_adora_event 后将失效。

read_adora_input_data

void read_adora_input_data(void *adora_event, char **out_ptr, size_t *out_len);

AdoraEventType_Input 事件读取原始数据字节。将数据起始指针写入 *out_ptr,字节长度写入 *out_len

如果事件不是输入事件或输入不携带数据,则设置 *out_ptr = NULL*out_len = 0

目前仅支持 UInt8 Arrow 数组。其他 Arrow 数据类型会导致运行时 panic。未来版本将使用 Arrow C Data Interface 以支持所有类型。

返回的指针借用自事件。调用 free_adora_event 后将失效。

read_adora_input_timestamp

unsigned long long read_adora_input_timestamp(void *adora_event);

uint64 值形式返回输入事件元数据中的混合逻辑时钟时间戳。如果事件不是输入事件则返回 0

Output

adora_send_output

int adora_send_output(
    void *adora_context,
    const char *id_ptr,
    size_t id_len,
    const char *data_ptr,
    size_t data_len
);

向所有下游订阅者发送输出数据。输出 ID(id_ptr/id_len)必须是有效的 UTF-8 字符串,与数据流 YAML 中节点声明的某个输出匹配。数据(data_ptr/data_len)作为原始字节(UInt8 Arrow 数组)发送。

成功返回 0,错误返回 -1。错误通过 tracing 记录。

如果任何指针参数为 NULL,则立即返回 -1

日志

adora_log

int adora_log(
    void *adora_context,
    const char *level_ptr,
    size_t level_len,
    const char *msg_ptr,
    size_t msg_len
);

通过 Adora 日志管道发送结构化日志消息。levelmsg 都必须是有效的 UTF-8 字符串。

有效的日志级别:"error""warn""info""debug""trace"

成功返回 0,错误返回 -1。如果任何指针参数为 NULL,则立即返回 -1

Enums

AdoraEventType

enum AdoraEventType {
    AdoraEventType_Stop,        // 请求优雅关闭
    AdoraEventType_Input,       // 有新输入数据可用
    AdoraEventType_InputClosed, // 输入流已关闭
    AdoraEventType_Error,       // 发生错误
    AdoraEventType_Unknown,     // 无法识别的事件类型
};

算子 API (adora-operator-api-c)

头文件:apis/c/operator/operator_api.hapis/c/operator/operator_types.h Crate:adora-operator-api-c

算子 API 供加载到 Adora 运行时进程中的共享库(.so/.dylib/.dll)使用。与节点不同,算子没有自己的 main 函数。它们导出三个函数,由运行时在适当的生命周期点调用。

operator_types.h 头文件由 safer-ffi 自动生成,定义了所有 C 兼容的结构体和枚举类型。

生命周期函数

adora_init_operator

AdoraInitResult_t adora_init_operator(void);

运行时加载算子时调用一次。分配并初始化所有算子状态,然后通过 operator_context 字段返回。运行时在每次后续调用中传回此指针。

成功时返回 .result.error = NULLAdoraInitResult_t

adora_drop_operator

AdoraResult_t adora_drop_operator(void *operator_context);

算子被卸载时调用一次。释放与 operator_context 关联的所有资源。

成功时返回 .error = NULLAdoraResult_t

事件处理

adora_on_event

OnEventResult_t adora_on_event(
    RawEvent_t *event,
    const SendOutput_t *send_output,
    void *operator_context
);

每当此算子有事件到达时由运行时调用。检查 event 字段以确定事件类型:

FieldMeaning
event->input != NULL有新输入可用
event->stop == true请求优雅关闭
event->error.ptr != NULL发生错误(UTF-8 字符串位于 error.ptr/error.len
event->input_closed.ptr != NULL输入流已关闭(输入 ID 位于 input_closed.ptr/input_closed.len

使用 send_output 向下游节点发送数据(见 adora_send_operator_output)。返回带有适当 AdoraStatus_tOnEventResult_t 以控制算子生命周期。

输入读取

adora_read_input_id

char *adora_read_input_id(const Input_t *input);

返回新分配的以 null 结尾的字符串,包含输入 ID。调用者必须使用 adora_free_input_id 释放。

adora_read_data

Vec_uint8_t adora_read_data(Input_t *input);

将输入数据作为字节数组读取。从输入中消费底层 Arrow 数组(每个事件只能读取一次数据)。如果输入没有数据或数据已被消费,则返回 .ptr = NULLVec_uint8_t

调用者必须使用 adora_free_data 释放返回的数据。

输出发送

adora_send_operator_output

AdoraResult_t adora_send_operator_output(
    const SendOutput_t *send_output,
    const char *id,
    const uint8_t *data_ptr,
    size_t data_len
);

向下游订阅者发送输出数据。id 必须是以 null 结尾的字符串,与算子声明的某个输出匹配。数据(data_ptr/data_len)在内部被转换为 UInt8 Arrow 数组。

成功时返回 .error = NULLAdoraResult_t

内存管理

算子 API 分配的内存必须由调用者使用相应的函数释放:

分配来源释放函数
adora_read_input_idadora_free_input_id
adora_read_dataadora_free_data
void adora_free_input_id(char *input_id);
void adora_free_data(Vec_uint8_t data);

不调用这些函数会导致内存泄漏。不要对这些分配使用 free()——它们由 Rust 运行时分配,必须通过 API 释放。

Structs

Vec_uint8_t

typedef struct Vec_uint8 {
    uint8_t *ptr;
    size_t len;
    size_t cap;
} Vec_uint8_t;

Rust 分配的字节向量。从 ptr 开始访问 len 个字节。不要修改 cap。使用 adora_free_data 释放。

AdoraResult_t

typedef struct AdoraResult {
    Vec_uint8_t *error;  // 成功时为 NULL,失败时指向错误字符串
} AdoraResult_t;

通用结果类型。NULL 错误指针表示成功。非 NULL 时,错误指针包含 UTF-8 错误消息。

AdoraInitResult_t

typedef struct AdoraInitResult {
    AdoraResult_t result;
    void *operator_context;  // 算子状态的不透明指针
} AdoraInitResult_t;

adora_init_operator 返回。成功时 result.errorNULLoperator_context 持有算子状态指针。

OnEventResult_t

typedef struct OnEventResult {
    AdoraResult_t result;
    AdoraStatus_t status;
} OnEventResult_t;

adora_on_event 返回。包含错误/成功结果和控制算子生命周期的状态码。

RawEvent_t

typedef struct RawEvent {
    Input_t *input;           // 输入事件时为非 NULL
    Vec_uint8_t input_closed; // 输入流关闭时为非空
    bool stop;                // 请求关闭时为 true
    Vec_uint8_t error;        // 错误时为非空
} RawEvent_t;

表示传递给算子的事件。多个字段可能同时被设置;按优先级顺序检查它们。

Input_t

typedef struct Input Input_t;  // 不透明

表示输入事件数据的不透明类型。使用 adora_read_input_idadora_read_data 提取其内容。

Output_t

typedef struct Output Output_t;  // 不透明

adora_send_operator_output 内部使用的不透明类型。不由用户代码直接创建。

SendOutput_t

typedef struct SendOutput {
    ArcDynFn1_AdoraResult_Output_t send_output;
} SendOutput_t;

传递给 adora_on_event 的回调句柄。将其传递给 adora_send_operator_output 以发送数据。不要将其存储在当前 adora_on_event 调用的范围之外。

Metadata_t

typedef struct Metadata {
    Vec_uint8_t open_telemetry_context;
} Metadata_t;

包含 OpenTelemetry 追踪上下文字符串的事件元数据。

算子枚举

AdoraStatus_t

enum AdoraStatus {
    ADORA_STATUS_CONTINUE = 0,  // 继续运行
    ADORA_STATUS_STOP     = 1,  // 停止此算子
    ADORA_STATUS_STOP_ALL = 2,  // 停止整个数据流
};
typedef uint8_t AdoraStatus_t;

OnEventResult_t 中返回,用于在处理事件后控制算子生命周期。


节点示例

一个接收定时器 tick 并发送输出消息的完整 C 节点:

#include <stdio.h>
#include <string.h>
#include "node_api.h"

int main() {
    void *ctx = init_adora_context_from_env();
    if (ctx == NULL) {
        fprintf(stderr, "failed to init adora context\n");
        return 1;
    }

    for (int i = 0; i < 100; i++) {
        void *event = adora_next_event(ctx);
        if (event == NULL)
            break;  // 所有流已关闭

        enum AdoraEventType ty = read_adora_event_type(event);

        if (ty == AdoraEventType_Input) {
            char *id;
            size_t id_len;
            read_adora_input_id(event, &id, &id_len);

            // 发送响应
            char out_id[] = "message";
            char out_data[64];
            int out_len = snprintf(out_data, sizeof(out_data),
                                   "iteration %d", i);

            adora_send_output(ctx, out_id, strlen(out_id),
                              out_data, out_len);
        } else if (ty == AdoraEventType_Stop) {
            free_adora_event(event);
            break;
        }

        free_adora_event(event);
    }

    free_adora_context(ctx);
    return 0;
}

节点的数据流 YAML:

nodes:
  - id: c_node
    path: build/c_node
    inputs:
      timer: adora/timer/millis/100
    outputs:
      - message

算子示例

一个读取输入、维护状态并发送输出的完整 C 算子:

#include "operator_api.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

AdoraInitResult_t adora_init_operator(void) {
    // 分配算子状态(一个简单的计数器)
    int *counter = (int *)calloc(1, sizeof(int));

    AdoraInitResult_t result = {.operator_context = counter};
    return result;
}

AdoraResult_t adora_drop_operator(void *operator_context) {
    free(operator_context);
    AdoraResult_t result = {.error = NULL};
    return result;
}

OnEventResult_t adora_on_event(
    RawEvent_t *event,
    const SendOutput_t *send_output,
    void *operator_context)
{
    OnEventResult_t result = {.status = ADORA_STATUS_CONTINUE};
    int *counter = (int *)operator_context;

    if (event->input != NULL) {
        char *id = adora_read_input_id(event->input);
        Vec_uint8_t data = adora_read_data(event->input);

        if (data.ptr != NULL) {
            *counter += 1;
            printf("received input '%s', counter: %d\n", id, *counter);

            // 将计数器值作为字符串发送
            char buf[64];
            int len = snprintf(buf, sizeof(buf), "count=%d", *counter);
            result.result = adora_send_operator_output(
                send_output, "counter", (uint8_t *)buf, len);

            adora_free_data(data);
        }

        adora_free_input_id(id);
    }

    if (event->stop) {
        result.status = ADORA_STATUS_STOP;
    }

    return result;
}

算子的数据流 YAML:

nodes:
  - id: runtime-node
    operators:
      - id: c_operator
        shared-library: build/operator
        inputs:
          data: source_node/output
        outputs:
          - counter

构建和链接

节点(静态库)

C 节点链接 adora-node-api-c,它构建为静态库。

步骤 1:构建静态库

cargo build -p adora-node-api-c --release

这会生成 target/release/libadora_node_api_c.a(Windows 上为 .lib)。

步骤 2:编译和链接

clang node.c -ladora_node_api_c -L ../../target/release -o build/c_node <FLAGS>

平台特定的链接器标志:

PlatformFlags
Linux-lm -lrt -ldl -pthread
macOS-framework CoreServices -framework Security -lSystem -lresolv -lpthread -lc -lm
Windows-ladvapi32 -luserenv -lkernel32 -lws2_32 -lbcrypt -lncrypt -lschannel -lntdll -liphlpapi -lcfgmgr32 -lcredui -lcrypt32 -lcryptnet -lfwpuclnt -lgdi32 -lmsimg32 -lmswsock -lole32 -lopengl32 -lsecur32 -lshell32 -lsynchronization -luser32 -lwinspool -Wl,-nodefaultlib:libcmt -D_DLL -lmsvcrt

在 Windows 上,为输出文件添加 .exe 扩展名。

算子(共享库)

C 算子被编译为共享库,由 Adora 运行时在启动时加载。

步骤 1:编译为目标文件

clang -c operator.c -o build/operator.o -fdeclspec -fPIC

Windows 上省略 -fPIC

步骤 2:链接为共享库

# Linux
clang -shared build/operator.o -o build/liboperator.so

# macOS
clang -shared build/operator.o -o build/liboperator.dylib

# Windows
clang -shared build/operator.o -o build/operator.dll

步骤 3:在数据流 YAML 中引用

operators:
  - id: c_operator
    shared-library: build/operator   # 不含 lib 前缀或扩展名
    inputs:
      data: source/output
    outputs:
      - result

shared-library 路径省略平台特定的前缀(lib)和扩展名(.so/.dylib/.dll)。运行时为当前平台解析正确的文件。

头文件路径

节点 API 头文件位于 apis/c/node/node_api.h。算子 API 头文件位于 apis/c/operator/operator_api.hapis/c/operator/operator_types.h。相应调整你的头文件路径:

# Node
clang -I path/to/adora/apis/c/node node.c ...

# Operator
clang -I path/to/adora/apis/c/operator operator.c ...

C++ 兼容性

两组头文件都包含 extern "C" 保护(算子头文件中)或使用 C 兼容的声明(节点头文件中),因此可以直接从 C++ 源文件包含。