C API 参考
本文档涵盖 Adora 框架提供的两个 C API:用于独立 C 进程的节点 API 和用于 Adora 运行时加载的共享库算子的算子 API。
目录
节点 API (adora-node-api-c)
头文件:apis/c/node/node_api.h Crate:adora-node-api-c(构建为 staticlib)
节点 API 供作为外部进程参与 Adora 数据流的独立 C 可执行文件使用。守护进程生成进程并设置节点在初始化期间读取的环境变量。
初始化
init_adora_context_from_env
void *init_adora_context_from_env();
从守护进程设置的环境变量初始化 Adora 节点上下文。成功时返回指向上下文的不透明指针,失败时返回 NULL。
返回的指针必须传递给所有期望上下文参数的后续节点 API 调用。节点完成后,使用 free_adora_context 释放。
free_adora_context
void free_adora_context(void *adora_context);
释放先前由 init_adora_context_from_env 创建的上下文。每个上下文必须恰好释放一次。释放后不得再次使用该指针。
事件循环
adora_next_event
void *adora_next_event(void *adora_context);
阻塞直到此节点有可用的下一个事件。返回指向事件的不透明指针,当所有事件流关闭时返回 NULL(表示节点应退出)。
返回的指针不能直接解引用。使用 read_adora_* 函数提取事件类型和载荷。完成后使用 free_adora_event 释放事件。
free_adora_event
void free_adora_event(void *adora_event);
释放先前由 adora_next_event 返回的事件。每个事件必须恰好释放一次。释放后,事件指针和所有派生指针(来自 read_adora_input_id、read_adora_input_data)都将失效。
事件检查
read_adora_event_type
enum AdoraEventType read_adora_event_type(void *adora_event);
返回给定事件的类型。可能的值请参见 AdoraEventType。
read_adora_input_id
void read_adora_input_id(void *adora_event, char **out_ptr, size_t *out_len);
从 AdoraEventType_Input 事件读取输入 ID。将字符串起始指针写入 *out_ptr,字节长度写入 *out_len。字符串是有效的 UTF-8 但不以 null 结尾;使用 out_len 确定其边界。
如果事件不是输入事件,则设置 *out_ptr = NULL 和 *out_len = 0。
返回的指针借用自事件。调用 free_adora_event 后将失效。
read_adora_input_data
void read_adora_input_data(void *adora_event, char **out_ptr, size_t *out_len);
从 AdoraEventType_Input 事件读取原始数据字节。将数据起始指针写入 *out_ptr,字节长度写入 *out_len。
如果事件不是输入事件或输入不携带数据,则设置 *out_ptr = NULL 和 *out_len = 0。
目前仅支持 UInt8 Arrow 数组。其他 Arrow 数据类型会导致运行时 panic。未来版本将使用 Arrow C Data Interface 以支持所有类型。
返回的指针借用自事件。调用 free_adora_event 后将失效。
read_adora_input_timestamp
unsigned long long read_adora_input_timestamp(void *adora_event);
以 uint64 值形式返回输入事件元数据中的混合逻辑时钟时间戳。如果事件不是输入事件则返回 0。
Output
adora_send_output
int adora_send_output(
void *adora_context,
const char *id_ptr,
size_t id_len,
const char *data_ptr,
size_t data_len
);
向所有下游订阅者发送输出数据。输出 ID(id_ptr/id_len)必须是有效的 UTF-8 字符串,与数据流 YAML 中节点声明的某个输出匹配。数据(data_ptr/data_len)作为原始字节(UInt8 Arrow 数组)发送。
成功返回 0,错误返回 -1。错误通过 tracing 记录。
如果任何指针参数为 NULL,则立即返回 -1。
日志
adora_log
int adora_log(
void *adora_context,
const char *level_ptr,
size_t level_len,
const char *msg_ptr,
size_t msg_len
);
通过 Adora 日志管道发送结构化日志消息。level 和 msg 都必须是有效的 UTF-8 字符串。
有效的日志级别:"error"、"warn"、"info"、"debug"、"trace"。
成功返回 0,错误返回 -1。如果任何指针参数为 NULL,则立即返回 -1。
Enums
AdoraEventType
enum AdoraEventType {
AdoraEventType_Stop, // 请求优雅关闭
AdoraEventType_Input, // 有新输入数据可用
AdoraEventType_InputClosed, // 输入流已关闭
AdoraEventType_Error, // 发生错误
AdoraEventType_Unknown, // 无法识别的事件类型
};
算子 API (adora-operator-api-c)
头文件:apis/c/operator/operator_api.h、apis/c/operator/operator_types.h Crate:adora-operator-api-c
算子 API 供加载到 Adora 运行时进程中的共享库(.so/.dylib/.dll)使用。与节点不同,算子没有自己的 main 函数。它们导出三个函数,由运行时在适当的生命周期点调用。
operator_types.h 头文件由 safer-ffi 自动生成,定义了所有 C 兼容的结构体和枚举类型。
生命周期函数
adora_init_operator
AdoraInitResult_t adora_init_operator(void);
运行时加载算子时调用一次。分配并初始化所有算子状态,然后通过 operator_context 字段返回。运行时在每次后续调用中传回此指针。
成功时返回 .result.error = NULL 的 AdoraInitResult_t。
adora_drop_operator
AdoraResult_t adora_drop_operator(void *operator_context);
算子被卸载时调用一次。释放与 operator_context 关联的所有资源。
成功时返回 .error = NULL 的 AdoraResult_t。
事件处理
adora_on_event
OnEventResult_t adora_on_event(
RawEvent_t *event,
const SendOutput_t *send_output,
void *operator_context
);
每当此算子有事件到达时由运行时调用。检查 event 字段以确定事件类型:
| Field | Meaning |
|---|---|
event->input != NULL | 有新输入可用 |
event->stop == true | 请求优雅关闭 |
event->error.ptr != NULL | 发生错误(UTF-8 字符串位于 error.ptr/error.len) |
event->input_closed.ptr != NULL | 输入流已关闭(输入 ID 位于 input_closed.ptr/input_closed.len) |
使用 send_output 向下游节点发送数据(见 adora_send_operator_output)。返回带有适当 AdoraStatus_t 的 OnEventResult_t 以控制算子生命周期。
输入读取
adora_read_input_id
char *adora_read_input_id(const Input_t *input);
返回新分配的以 null 结尾的字符串,包含输入 ID。调用者必须使用 adora_free_input_id 释放。
adora_read_data
Vec_uint8_t adora_read_data(Input_t *input);
将输入数据作为字节数组读取。从输入中消费底层 Arrow 数组(每个事件只能读取一次数据)。如果输入没有数据或数据已被消费,则返回 .ptr = NULL 的 Vec_uint8_t。
调用者必须使用 adora_free_data 释放返回的数据。
输出发送
adora_send_operator_output
AdoraResult_t adora_send_operator_output(
const SendOutput_t *send_output,
const char *id,
const uint8_t *data_ptr,
size_t data_len
);
向下游订阅者发送输出数据。id 必须是以 null 结尾的字符串,与算子声明的某个输出匹配。数据(data_ptr/data_len)在内部被转换为 UInt8 Arrow 数组。
成功时返回 .error = NULL 的 AdoraResult_t。
内存管理
算子 API 分配的内存必须由调用者使用相应的函数释放:
| 分配来源 | 释放函数 |
|---|---|
adora_read_input_id | adora_free_input_id |
adora_read_data | adora_free_data |
void adora_free_input_id(char *input_id);
void adora_free_data(Vec_uint8_t data);
不调用这些函数会导致内存泄漏。不要对这些分配使用 free()——它们由 Rust 运行时分配,必须通过 API 释放。
Structs
Vec_uint8_t
typedef struct Vec_uint8 {
uint8_t *ptr;
size_t len;
size_t cap;
} Vec_uint8_t;
Rust 分配的字节向量。从 ptr 开始访问 len 个字节。不要修改 cap。使用 adora_free_data 释放。
AdoraResult_t
typedef struct AdoraResult {
Vec_uint8_t *error; // 成功时为 NULL,失败时指向错误字符串
} AdoraResult_t;
通用结果类型。NULL 错误指针表示成功。非 NULL 时,错误指针包含 UTF-8 错误消息。
AdoraInitResult_t
typedef struct AdoraInitResult {
AdoraResult_t result;
void *operator_context; // 算子状态的不透明指针
} AdoraInitResult_t;
由 adora_init_operator 返回。成功时 result.error 为 NULL,operator_context 持有算子状态指针。
OnEventResult_t
typedef struct OnEventResult {
AdoraResult_t result;
AdoraStatus_t status;
} OnEventResult_t;
由 adora_on_event 返回。包含错误/成功结果和控制算子生命周期的状态码。
RawEvent_t
typedef struct RawEvent {
Input_t *input; // 输入事件时为非 NULL
Vec_uint8_t input_closed; // 输入流关闭时为非空
bool stop; // 请求关闭时为 true
Vec_uint8_t error; // 错误时为非空
} RawEvent_t;
表示传递给算子的事件。多个字段可能同时被设置;按优先级顺序检查它们。
Input_t
typedef struct Input Input_t; // 不透明
表示输入事件数据的不透明类型。使用 adora_read_input_id 和 adora_read_data 提取其内容。
Output_t
typedef struct Output Output_t; // 不透明
由 adora_send_operator_output 内部使用的不透明类型。不由用户代码直接创建。
SendOutput_t
typedef struct SendOutput {
ArcDynFn1_AdoraResult_Output_t send_output;
} SendOutput_t;
传递给 adora_on_event 的回调句柄。将其传递给 adora_send_operator_output 以发送数据。不要将其存储在当前 adora_on_event 调用的范围之外。
Metadata_t
typedef struct Metadata {
Vec_uint8_t open_telemetry_context;
} Metadata_t;
包含 OpenTelemetry 追踪上下文字符串的事件元数据。
算子枚举
AdoraStatus_t
enum AdoraStatus {
ADORA_STATUS_CONTINUE = 0, // 继续运行
ADORA_STATUS_STOP = 1, // 停止此算子
ADORA_STATUS_STOP_ALL = 2, // 停止整个数据流
};
typedef uint8_t AdoraStatus_t;
在 OnEventResult_t 中返回,用于在处理事件后控制算子生命周期。
节点示例
一个接收定时器 tick 并发送输出消息的完整 C 节点:
#include <stdio.h>
#include <string.h>
#include "node_api.h"
int main() {
void *ctx = init_adora_context_from_env();
if (ctx == NULL) {
fprintf(stderr, "failed to init adora context\n");
return 1;
}
for (int i = 0; i < 100; i++) {
void *event = adora_next_event(ctx);
if (event == NULL)
break; // 所有流已关闭
enum AdoraEventType ty = read_adora_event_type(event);
if (ty == AdoraEventType_Input) {
char *id;
size_t id_len;
read_adora_input_id(event, &id, &id_len);
// 发送响应
char out_id[] = "message";
char out_data[64];
int out_len = snprintf(out_data, sizeof(out_data),
"iteration %d", i);
adora_send_output(ctx, out_id, strlen(out_id),
out_data, out_len);
} else if (ty == AdoraEventType_Stop) {
free_adora_event(event);
break;
}
free_adora_event(event);
}
free_adora_context(ctx);
return 0;
}
节点的数据流 YAML:
nodes:
- id: c_node
path: build/c_node
inputs:
timer: adora/timer/millis/100
outputs:
- message
算子示例
一个读取输入、维护状态并发送输出的完整 C 算子:
#include "operator_api.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
AdoraInitResult_t adora_init_operator(void) {
// 分配算子状态(一个简单的计数器)
int *counter = (int *)calloc(1, sizeof(int));
AdoraInitResult_t result = {.operator_context = counter};
return result;
}
AdoraResult_t adora_drop_operator(void *operator_context) {
free(operator_context);
AdoraResult_t result = {.error = NULL};
return result;
}
OnEventResult_t adora_on_event(
RawEvent_t *event,
const SendOutput_t *send_output,
void *operator_context)
{
OnEventResult_t result = {.status = ADORA_STATUS_CONTINUE};
int *counter = (int *)operator_context;
if (event->input != NULL) {
char *id = adora_read_input_id(event->input);
Vec_uint8_t data = adora_read_data(event->input);
if (data.ptr != NULL) {
*counter += 1;
printf("received input '%s', counter: %d\n", id, *counter);
// 将计数器值作为字符串发送
char buf[64];
int len = snprintf(buf, sizeof(buf), "count=%d", *counter);
result.result = adora_send_operator_output(
send_output, "counter", (uint8_t *)buf, len);
adora_free_data(data);
}
adora_free_input_id(id);
}
if (event->stop) {
result.status = ADORA_STATUS_STOP;
}
return result;
}
算子的数据流 YAML:
nodes:
- id: runtime-node
operators:
- id: c_operator
shared-library: build/operator
inputs:
data: source_node/output
outputs:
- counter
构建和链接
节点(静态库)
C 节点链接 adora-node-api-c,它构建为静态库。
步骤 1:构建静态库
cargo build -p adora-node-api-c --release
这会生成 target/release/libadora_node_api_c.a(Windows 上为 .lib)。
步骤 2:编译和链接
clang node.c -ladora_node_api_c -L ../../target/release -o build/c_node <FLAGS>
平台特定的链接器标志:
| Platform | Flags |
|---|---|
| Linux | -lm -lrt -ldl -pthread |
| macOS | -framework CoreServices -framework Security -lSystem -lresolv -lpthread -lc -lm |
| Windows | -ladvapi32 -luserenv -lkernel32 -lws2_32 -lbcrypt -lncrypt -lschannel -lntdll -liphlpapi -lcfgmgr32 -lcredui -lcrypt32 -lcryptnet -lfwpuclnt -lgdi32 -lmsimg32 -lmswsock -lole32 -lopengl32 -lsecur32 -lshell32 -lsynchronization -luser32 -lwinspool -Wl,-nodefaultlib:libcmt -D_DLL -lmsvcrt |
在 Windows 上,为输出文件添加 .exe 扩展名。
算子(共享库)
C 算子被编译为共享库,由 Adora 运行时在启动时加载。
步骤 1:编译为目标文件
clang -c operator.c -o build/operator.o -fdeclspec -fPIC
Windows 上省略 -fPIC。
步骤 2:链接为共享库
# Linux
clang -shared build/operator.o -o build/liboperator.so
# macOS
clang -shared build/operator.o -o build/liboperator.dylib
# Windows
clang -shared build/operator.o -o build/operator.dll
步骤 3:在数据流 YAML 中引用
operators:
- id: c_operator
shared-library: build/operator # 不含 lib 前缀或扩展名
inputs:
data: source/output
outputs:
- result
shared-library 路径省略平台特定的前缀(lib)和扩展名(.so/.dylib/.dll)。运行时为当前平台解析正确的文件。
头文件路径
节点 API 头文件位于 apis/c/node/node_api.h。算子 API 头文件位于 apis/c/operator/operator_api.h 和 apis/c/operator/operator_types.h。相应调整你的头文件路径:
# Node
clang -I path/to/adora/apis/c/node node.c ...
# Operator
clang -I path/to/adora/apis/c/operator operator.c ...
C++ 兼容性
两组头文件都包含 extern "C" 保护(算子头文件中)或使用 C 兼容的声明(节点头文件中),因此可以直接从 C++ 源文件包含。