Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

C++ API 参考

Adora 通过 CXX(Rust-C++ 互操作)为独立节点和进程内算子提供 C++ 绑定。CXX 桥接从 Rust 定义生成类型安全的 C++ 头文件——无需原始 FFI 或手动 extern "C" 声明。

两个 crate 提供 C++ 接口:

Crate用例
adora-node-api-cxxlibadora_node_api_cxx.a独立节点可执行文件
adora-operator-api-cxxlibadora_operator_api_cxx.a由运行时加载的共享库算子

生成的头文件:adora-node-api.hadora-operator-api.h


节点 API(adora-node-api-cxx

初始化

#include "adora-node-api.h"

// 从 Adora 守护进程设置的环境变量初始化节点。
// 返回包含事件流和输出发送器的 AdoraNode 结构体。
// 失败时抛出异常。
AdoraNode init_adora_node();

AdoraNode

init_adora_node() 返回。在节点的生命周期内拥有事件流和输出发送器。

struct AdoraNode {
    rust::Box<Events>        events;       // 事件流(阻塞接收器)
    rust::Box<OutputSender>  send_output;  // 输出发送器
};

Events

暴露给 C++ 的不透明 Rust 类型。提供对节点传入事件的阻塞迭代。

// 成员函数——直接在 boxed 对象上调用。
rust::Box<AdoraEvent> Events::next();

// 自由函数形式——等价于 events->next()。
rust::Box<AdoraEvent> next_event(rust::Box<Events>& events);

两种形式都会阻塞直到下一个事件到达,并返回一个拥有的 AdoraEvent

AdoraEvent

不透明 Rust 类型。使用 event_type() 检查其类型,然后使用 event_as_input()event_as_arrow_input() 进行向下转换。

// 确定事件类型。
AdoraEventType event_type(const rust::Box<AdoraEvent>& event);

// 向下转换为原始字节输入。如果事件不是 Input 则抛出异常。
AdoraInput event_as_input(rust::Box<AdoraEvent> event);

// 向下转换为 Arrow FFI 输入(写入 Arrow C Data Interface 结构体)。
// out_array 和 out_schema 必须指向有效的 ArrowArray / ArrowSchema 结构体。
// 成功时返回 error 为空的 AdoraResult。
AdoraResult event_as_arrow_input(
    rust::Box<AdoraEvent> event,
    uint8_t* out_array,
    uint8_t* out_schema);

// 与上面相同,但还返回输入 ID 和元数据。
ArrowInputInfo event_as_arrow_input_with_info(
    rust::Box<AdoraEvent> event,
    uint8_t* out_array,
    uint8_t* out_schema);

AdoraEventType

enum class AdoraEventType : uint8_t {
    Stop,             // 请求优雅关闭
    Input,            // 输入上有新数据到达
    InputClosed,      // 单个输入已关闭
    Error,            // 发生错误
    Unknown,          // 无法识别的事件变体
    AllInputsClosed,  // 所有输入已关闭(流已结束)
};

AdoraInput

event_as_input() 返回。包含原始字节。

struct AdoraInput {
    rust::String     id;    // 输入标识符(如 "tick"、"image")
    rust::Vec<uint8_t> data;  // 原始载荷字节
};

ArrowInputInfo

event_as_arrow_input_with_info() 返回。包含输入 ID、元数据和错误字符串。

struct ArrowInputInfo {
    rust::String       id;        // 输入标识符
    rust::Box<Metadata> metadata; // 附加的元数据
    rust::String       error;     // 成功时为空
};

AdoraResult

由输出发送函数返回。检查 error 字段——为空表示成功。

struct AdoraResult {
    rust::String error;  // 成功时为空字符串
};

OutputSender

不透明 Rust 类型。所有方法以 rust::Box<OutputSender>& 作为第一个参数(来自 AdoraNode::send_output 的发送器)。

send_output

在命名输出上发送原始字节。

AdoraResult send_output(
    rust::Box<OutputSender>& sender,
    rust::String id,
    rust::Slice<const uint8_t> data);

send_output_with_metadata

发送带有附加元数据的原始字节。

AdoraResult send_output_with_metadata(
    rust::Box<OutputSender>& sender,
    rust::String id,
    rust::Slice<const uint8_t> data,
    rust::Box<Metadata> metadata);

send_arrow_output

通过 C Data Interface 发送 Arrow 数组。指针必须引用有效的 ArrowArrayArrowSchema 结构体。成功时 Arrow 数据的所有权转移到 Rust。

AdoraResult send_arrow_output(
    rust::Box<OutputSender>& sender,
    rust::String id,
    uint8_t* array_ptr,
    uint8_t* schema_ptr);

// 带元数据的重载(通过 cxx_name 属性使用相同的 C++ 名称)。
AdoraResult send_arrow_output(
    rust::Box<OutputSender>& sender,
    rust::String id,
    uint8_t* array_ptr,
    uint8_t* schema_ptr,
    rust::Box<Metadata> metadata);

log_message

通过 Adora 日志系统发送日志消息。

AdoraResult log_message(
    const rust::Box<OutputSender>& sender,
    rust::String level,    // 如 "info"、"warn"、"error"
    rust::String message);

元数据

用于向输出附加类型化键值对的不透明 Rust 类型。

Construction

rust::Box<Metadata> new_metadata();

Reading

uint64_t     Metadata::timestamp() const;

bool         Metadata::get_bool(const rust::Str key) const;        // 缺失或类型错误时抛出异常
int64_t      Metadata::get_int(const rust::Str key) const;
double       Metadata::get_float(const rust::Str key) const;
rust::String Metadata::get_str(const rust::Str key) const;

rust::Vec<int64_t>      Metadata::get_list_int(const rust::Str key) const;
rust::Vec<double>       Metadata::get_list_float(const rust::Str key) const;
rust::Vec<rust::String> Metadata::get_list_string(const rust::Str key) const;

int64_t      Metadata::get_timestamp(const rust::Str key) const;   // 自纪元以来的纳秒数
rust::String Metadata::get_json(const rust::Str key) const;        // 单个值作为 JSON 字符串

Writing

所有 setter 在失败时抛出异常。

void Metadata::set_bool(const rust::Str key, bool value);
void Metadata::set_int(const rust::Str key, int64_t value);
void Metadata::set_float(const rust::Str key, double value);
void Metadata::set_string(const rust::Str key, rust::String value);

void Metadata::set_list_int(const rust::Str key, rust::Vec<int64_t> value);
void Metadata::set_list_float(const rust::Str key, rust::Vec<double> value);
void Metadata::set_list_string(const rust::Str key, rust::Vec<rust::String> value);

void Metadata::set_timestamp(const rust::Str key, int64_t nanos);  // 自纪元以来的纳秒数

Introspection

MetadataValueType Metadata::type(const rust::Str key) const;  // 键缺失时抛出异常
rust::String      Metadata::to_json() const;                   // 完整元数据作为 JSON
rust::Vec<rust::String> Metadata::list_keys() const;

MetadataValueType

enum class MetadataValueType : uint8_t {
    Bool,
    Integer,
    Float,
    String,
    ListInt,
    ListFloat,
    ListString,
    Timestamp,
};

Service, Action, and Streaming Patterns

C++ nodes can implement communication patterns using the metadata API. The well-known metadata keys are:

Key描述
"request_id"服务请求/响应关联(UUID v7)
"goal_id"动作目标标识(UUID v7)
"goal_status"动作结果状态:"succeeded""aborted""canceled"
"session_id"Streaming session identifier
"segment_id"Streaming segment within a session (integer)
"seq"Streaming chunk sequence number (integer)
"fin"Last chunk of a streaming segment (bool)
"flush"Discard older queued messages on input (bool)
// 服务服务端:传递输入元数据中的 request_id
auto input_metadata = event_as_arrow_input_with_info(event);
send_output_with_metadata(sender, "response", result, std::move(input_metadata.metadata));

// 动作服务端:在结果上设置 goal_id 和 goal_status
auto meta = new_metadata();
meta->set_string("goal_id", goal_id);
meta->set_string("goal_status", "succeeded");
send_output_with_metadata(sender, "result", result_data, std::move(meta));

CombinedEvents(ROS2 集成)

使用可选的 ros2-bridge 特性时,节点事件和 ROS2 订阅事件可以合并为单个流。

// 将 Adora 事件转换为合并流。
CombinedEvents adora_events_into_combined(rust::Box<Events> events);

// 创建空的合并流(用于仅 ROS2 的节点)。
CombinedEvents empty_combined_events();

CombinedEvents 结构体

struct CombinedEvents {
    rust::Box<MergedEvents> events;

    CombinedEvent next();  // 阻塞——返回下一个合并事件
};

CombinedEvent 结构体

struct CombinedEvent {
    rust::Box<MergedAdoraEvent> event;

    bool is_adora() const;  // 如果这是标准 Adora 事件则为 true
};

// 将合并事件向下转换为 AdoraEvent。如果不是 Adora 事件则抛出异常。
rust::Box<AdoraEvent> downcast_adora(CombinedEvent event);

ROS2 subscriptions add their own events to the merged stream. Use subscription->matches(event) and subscription->downcast(event) to handle ROS2-specific events (see the ROS2 Bridge docs).


算子 API(adora-operator-api-cxx

算子是由 Adora 运行时加载的共享库。C++ 端实现两个由 CXX 桥接调用的函数。

必需的 C++ 接口

你必须提供头文件 operator.h 和实现文件。头文件声明一个 Operator 类和两个自由函数:

// operator.h
#pragma once
#include <memory>
#include "adora-operator-api.h"

class Operator {
public:
    Operator();
    // 添加算子需要的任何状态。
};

std::unique_ptr<Operator> new_operator();

AdoraOnInputResult on_input(
    Operator& op,
    rust::Str id,
    rust::Slice<const uint8_t> data,
    OutputSender& output_sender);
  • new_operator() —— 启动时调用一次;返回算子实例。
  • on_input() —— 对每个输入事件调用;处理数据并可选地发送输出。

OutputSender(算子)

on_input() 内可用。在命名输出上发送数据。

AdoraSendOutputResult send_output(
    OutputSender& sender,
    rust::Str id,
    rust::Slice<const uint8_t> data);

结果类型

struct AdoraOnInputResult {
    rust::String error;  // 成功时为空
    bool         stop;   // 为 true 时请求优雅关闭
};

struct AdoraSendOutputResult {
    rust::String error;  // 成功时为空
};

快速开始:节点示例

一个接收定时器 tick 并发送计数器的最小节点。

#include "adora-node-api.h"
#include <iostream>
#include <vector>

int main() {
    auto adora_node = init_adora_node();
    unsigned char counter = 0;

    for (;;) {
        auto event = next_event(adora_node.events);
        auto ty = event_type(event);

        if (ty == AdoraEventType::AllInputsClosed) {
            break;
        }
        if (ty == AdoraEventType::Stop) {
            break;
        }
        if (ty == AdoraEventType::Input) {
            auto input = event_as_input(std::move(event));
            counter += 1;

            std::cout << "Input: " << std::string(input.id)
                      << " counter=" << (int)counter << std::endl;

            std::vector<unsigned char> out{counter};
            rust::Slice<const uint8_t> slice{out.data(), out.size()};
            auto result = send_output(adora_node.send_output, "counter", slice);
            if (!result.error.empty()) {
                std::cerr << "Send error: " << std::string(result.error) << std::endl;
                return 1;
            }
        }
    }
    return 0;
}

数据流 YAML:

nodes:
  - id: cxx-node
    path: build/my_node
    inputs:
      tick: adora/timer/millis/300
    outputs:
      - counter

快速开始:Arrow 节点示例

一个通过 C Data Interface 接收和发送 Arrow 数组的节点,带有元数据。

#include "adora-node-api.h"
#include <arrow/api.h>
#include <arrow/c/bridge.h>
#include <iostream>

int main() {
    auto adora_node = init_adora_node();

    for (int i = 0; i < 10; i++) {
        auto event = adora_node.events->next();
        auto ty = event_type(event);

        if (ty == AdoraEventType::AllInputsClosed || ty == AdoraEventType::Stop) {
            break;
        }
        if (ty == AdoraEventType::Input) {
            // 接收带元数据的 Arrow 输入
            struct ArrowArray c_array;
            struct ArrowSchema c_schema;
            auto info = event_as_arrow_input_with_info(
                std::move(event),
                reinterpret_cast<uint8_t*>(&c_array),
                reinterpret_cast<uint8_t*>(&c_schema));

            if (!info.error.empty()) {
                std::cerr << std::string(info.error) << std::endl;
                continue;
            }

            std::cout << "Input: " << std::string(info.id)
                      << " ts=" << info.metadata->timestamp() << std::endl;

            auto imported = arrow::ImportArray(&c_array, &c_schema);
            auto array = imported.ValueOrDie();
            std::cout << "Arrow: " << array->ToString() << std::endl;

            // 构建输出 Arrow 数组
            arrow::Int32Builder builder;
            builder.Append(i * 10);
            std::shared_ptr<arrow::Array> out_array;
            builder.Finish(&out_array);

            // 导出并带元数据发送
            struct ArrowArray out_c_array;
            struct ArrowSchema out_c_schema;
            arrow::ExportArray(*out_array, &out_c_array, &out_c_schema);

            auto meta = new_metadata();
            meta->set_string("source", "cpp-arrow-node");
            meta->set_int("iteration", i);

            auto result = send_arrow_output(
                adora_node.send_output, "counter",
                reinterpret_cast<uint8_t*>(&out_c_array),
                reinterpret_cast<uint8_t*>(&out_c_schema),
                std::move(meta));

            if (!result.error.empty()) {
                std::cerr << "Send error: " << std::string(result.error) << std::endl;
            }
        }
    }
    return 0;
}

快速开始:算子示例

一个最小的算子共享库。

// operator.cc
#include "operator.h"
#include <iostream>
#include <vector>

Operator::Operator() {}

std::unique_ptr<Operator> new_operator() {
    return std::make_unique<Operator>();
}

AdoraOnInputResult on_input(
    Operator& op,
    rust::Str id,
    rust::Slice<const uint8_t> data,
    OutputSender& output_sender)
{
    op.counter += 1;

    std::vector<unsigned char> out{op.counter};
    rust::Slice<const uint8_t> slice{out.data(), out.size()};
    auto send_result = send_output(output_sender, rust::Str("status"), slice);

    return AdoraOnInputResult{send_result.error, false};
}

数据流 YAML:

nodes:
  - id: runtime-node
    operators:
      - id: my-operator
        shared-library: build/my_operator
        inputs:
          data: some-node/output
        outputs:
          - status

构建集成(CMake)

推荐的构建方式是使用 CMake 配合 DoraTargets.cmake 辅助脚本(见 examples/cmake-dataflow/)。

项目结构

my-project/
  CMakeLists.txt
  DoraTargets.cmake       # copied from examples/cmake-dataflow/
  node/main.cc
  operator/operator.h
  operator/operator.cc
  dataflow.yml

CMakeLists.txt

cmake_minimum_required(VERSION 3.21)
project(my-dataflow LANGUAGES C CXX)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_FLAGS "-fPIC")

include(DoraTargets.cmake)
link_directories(${adora_link_dirs})

# Standalone node (executable)
add_executable(my_node node/main.cc ${node_bridge})
add_dependencies(my_node Adora_cxx)
target_include_directories(my_node PRIVATE ${adora_cxx_include_dir})
target_link_libraries(my_node adora_node_api_cxx)

# Operator (shared library)
add_library(my_operator SHARED
    operator/operator.cc ${operator_bridge})
add_dependencies(my_operator Adora_cxx)
target_include_directories(my_operator PRIVATE
    ${adora_cxx_include_dir} ${adora_c_include_dir}
    ${CMAKE_CURRENT_SOURCE_DIR}/operator)
target_link_libraries(my_operator adora_operator_api_cxx)

install(TARGETS my_node DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/bin)
install(TARGETS my_operator DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/lib)

DoraTargets.cmake 提供的内容

变量描述
adora_cxx_include_dir生成的 CXX 头文件路径(adora-node-api.hadora-operator-api.h
adora_c_include_dirC API 头文件路径(用于混合 C/C++ 项目)
adora_link_dirslibadora_node_api_cxx.a / libadora_operator_api_cxx.a 的库搜索路径
node_bridge为节点生成的 CXX 桥接源文件(node_bridge.cc
operator_bridge为算子生成的 CXX 桥接源文件(operator_bridge.cc
Adora_cxx构建 CXX crate 的 CMake 目标依赖

构建步骤

# Option A: Build against local Adora source
mkdir build && cd build
cmake .. -DDORA_ROOT_DIR=/path/to/adora
cmake --build .

# Option B: Build against Adora from GitHub (cloned automatically)
mkdir build && cd build
cmake ..
cmake --build .

要求

  • C++20 编译器
  • Rust 工具链(用于通过 Cargo 构建 Adora 静态库)
  • CMake 3.21+
  • Arrow 集成需要:Apache Arrow C++ 库

CXX 桥接注意事项

  • 所有 Rust 不透明类型(EventsOutputSenderAdoraEventMetadataMergedEventsMergedAdoraEvent)通过 rust::Box<T> 访问。
  • rust::Stringrust::Vec<T>rust::Slice<const T> 是 CXX 桥接类型,与对应的 C++ 标准库类型互操作。参见 CXX 类型参考
  • 在 Rust 中返回 Result<T> 的函数在错误路径上抛出 C++ 异常。
  • Arrow FFI 函数(event_as_arrow_inputsend_arrow_output)在 Rust 侧是 unsafe 的。调用者必须传递转换为 uint8_t* 的有效 ArrowArray / ArrowSchema 结构体指针。
  • 节点库是静态归档(staticlib)。使用 -ladora_node_api_cxx 将其链接到你的可执行文件。
  • 算子库也是静态归档。使用 -ladora_operator_api_cxx 将其链接到你的共享库。