跳过主要内容

C API

算子

算子 API 是供您实现的框架。 实现的算子将由 dora 管理。 该框架使我们能够进行优化并提供高级特性。

算子定义由 3 个函数组成,dora_init_operator用于初始化算子及其上下文。 dora_drop_operator 用于释放内存,dora_on_event 用于在接收输入时算子的动作逻辑。

#include "../../apis/c/operator/operator_api.h"
#include <assert.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

DoraInitResult_t dora_init_operator(void)
{
void *context = malloc(1);
char *context_char = (char *)context;
*context_char = 0;

DoraInitResult_t result = {.operator_context = context};
return result;
}

DoraResult_t dora_drop_operator(void *operator_context)
{
free(operator_context);

DoraResult_t result = {};
return result;
}

OnEventResult_t dora_on_event(
const RawEvent_t *event,
const SendOutput_t *send_output,
void *operator_context)
{

试试看!

  • 建立一个 operator.c 文件:
#include "../../apis/c/operator/operator_api.h"
#include <assert.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

DoraInitResult_t dora_init_operator(void)
{
void *context = malloc(1);
char *context_char = (char *)context;
*context_char = 0;

DoraInitResult_t result = {.operator_context = context};
return result;
}

DoraResult_t dora_drop_operator(void *operator_context)
{
free(operator_context);

DoraResult_t result = {};
return result;
}

OnEventResult_t dora_on_event(
const RawEvent_t *event,
const SendOutput_t *send_output,
void *operator_context)
{
char *counter = (char *)operator_context;

if (event->input != NULL)
{
// input event
Input_t *input = event->input;

char id[input->id.len + 1];
memcpy(id, input->id.ptr, input->id.len);
id[input->id.len] = 0;

if (strcmp(id, "message") == 0)
{
char data[input->data.len + 1];
memcpy(data, input->data.ptr, input->data.len);
data[input->data.len] = 0;

*counter += 1;
printf("C operator received message `%s`, counter: %i\n", data, *counter);

char *out_id = "counter";
char *out_id_heap = strdup(out_id);

int data_alloc_size = 100;
char *out_data = (char *)malloc(data_alloc_size);
int count = snprintf(out_data, data_alloc_size, "The current counter value is %d", *counter);
assert(count >= 0 && count < 100);

Output_t output = {.id = {
.ptr = (uint8_t *)out_id_heap,
.len = strlen(out_id_heap),
.cap = strlen(out_id_heap) + 1,
},
.data = {.ptr = (uint8_t *)out_data, .len = strlen(out_data), .cap = data_alloc_size}};
DoraResult_t res = (send_output->send_output.call)(send_output->send_output.env_ptr, output);

OnEventResult_t result = {.result = res, .status = DORA_STATUS_CONTINUE};
return result;
}
else
{
printf("C operator received unexpected input %s, context: %i\n", id, *counter);
}
}
if (event->stop)
{
printf("C operator received stop event\n");
}

OnEventResult_t result = {.status = DORA_STATUS_CONTINUE};
return result;
}

建立算子:

  • 编译 operator.c 文件到一个共享库

    • 例如,使用如下命令:
      clang -c operator.c -o build/operator.o -fdeclspec -fPIC
      clang -shared build/operator.o -o build/liboperator.so
      在 Windows 上省略 -fPIC 参数。 使用匹配你的OS的共享库标准库的 前缀/扩展名 替换 liboperator.so 名字,如:在Windows 是 .dll
  • 在您的图里链接为:

  - id: runtime-node
operators:
- id: c_operator
shared-library: build/operator
inputs:
message: c_node/message
outputs:
- counter

自定义节点

自定义节点可以使您集成 dora 到您的应用中。 它允许您以任何您想要的方式检索输入和发送输出。

init_dora_context_from_env

init_dora_context_from_env 从环境变量设置 dora-coordinator 初始化一个节点。

void *dora_context = init_dora_context_from_env();

dora_next_event

dora_next_event 等待下一个事件(如:输入事件)。 使用 read_dora_event_type 读到事件类型。 输入类型为 DoraEventType_Input 。 提取输入事件的 ID 和数据,在返回的指针上使用 read_dora_input_idread_dora_input_data。 忽略任何事件并仅处理与节点相关的事件是安全的。

void *input = dora_next_input(dora_context);

// 以 UTF8 编码字符串的形式读出 ID
char *id;
size_t id_len;
read_dora_input_id(input, &id, &id_len);

// 以字节数组的形式读出数据
char *data;
size_t data_len;
read_dora_input_data(input, &data, &data_len);

dora_send_output

dora_send_output 从节点发送数据。

char out_id[] = "tick";
char out_data[50];
dora_send_output(dora_context, out_id, strlen(out_id), out_data, out_data_len);

试试看!

  • 建立一个 node.c 文件。
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include "../../apis/c/node/node_api.h"

// sleep
#ifdef _WIN32
#include <Windows.h>
#else
#include <unistd.h>
#endif

int main()
{
printf("[c node] Hello World\n");

void *dora_context = init_dora_context_from_env();
if (dora_context == NULL)
{
fprintf(stderr, "failed to init dora context\n");
return -1;
}

printf("[c node] dora context initialized\n");

for (char i = 0; i < 100; i++)
{
void *event = dora_next_event(dora_context);
if (event == NULL)
{
printf("[c node] ERROR: unexpected end of event\n");
return -1;
}

enum DoraEventType ty = read_dora_event_type(event);

if (ty == DoraEventType_Input)
{
char *data;
size_t data_len;
read_dora_input_data(event, &data, &data_len);

assert(data_len == 0);

char out_id[] = "message";
char out_data[50];
int out_data_len = sprintf(out_data, "loop iteration %d", i);

dora_send_output(dora_context, out_id, strlen(out_id), out_data, out_data_len);
}
else if (ty == DoraEventType_Stop)
{
printf("[c node] received stop event\n");
}
else
{
printf("[c node] received unexpected event: %d\n", ty);
}

free_dora_event(event);
}

printf("[c node] received 10 events\n");

free_dora_context(dora_context);

printf("[c node] finished successfully\n");

return 0;
}

建立自定义节点:

  • 在这个目录,建 立一个 build 文件夹 (即, 下一个文件 node.c )

  • 编译 dora-node-api-c crate 到一个静态库。

    • 运行 cargo build -p dora-node-api-c --release
    • 可用的静态库结果生成在 ../../target/release/libdora-node-api-c.a
  • 编译 node.c (例如使用 clang) 并链接静态库

    • 例如,使用如下命令:
      clang node.c <FLAGS> -ldora_node_api_c -L ../../target/release --output build/c_node
    • <FLAGS> 取决于C节点使用的操作系统和库。 每个操作系统都需要以下标志:
      • Linux: -lm -lrt -ldl -pthread
      • macOS: -framework CoreServices -framework Security -l System -l resolv -l pthread -l c -l m
      • Windows:
        -ladvapi32 -luserenv -lkernel32 -lws2_32 -lbcrypt -lncrypt -lschannel -lntdll -liphlpapi
        -lcfgmgr32 -lcredui -lcrypt32 -lcryptnet -lfwpuclnt -lgdi32 -lmsimg32 -lmswsock -lole32
        -lopengl32 -lsecur32 -lshell32 -lsynchronization -luser32 -lwinspool
        -Wl,-nodefaultlib:libcmt -D_DLL -lmsvcrt
        另外:在 Windows 上,输出文件应具有.exe扩展名:--output build/c_node.exe
  • sink.c 重复执行上一步

  • 在您的图中将其链接为:

  - id: c_sink
custom:
source: build/c_sink
inputs:
counter: runtime-node/c_operator/counter