ROS2 桥接
Dora talks to the ROS2 ecosystem (topics, services, actions, parameters) over a pure-Rust DDS/RTPS stack (ros2-client + rustdds) – it never links rcl/rclcpp, and data crosses as Apache Arrow StructArray, converted to/from ROS2 CDR at the boundary.
本指南介绍使用桥接的两种方式:
- YAML / 动态桥接 —— 在节点上添加
ros2:键,框架便会派生一个桥接二进制;你的节点代码无需引入 ROS2,只交换纯 Arrow 数据。与语言无关、零代码。本指南的主体内容即此。 - 原生代码 API —— 通过各语言的类型化 API 直接从节点代码调用 ROS2:Python(
dora.Ros2Context)接口,以及由代码生成的 Rust / C++ 接口。当你想在节点中以命令式方式驱动 ROS2 时使用。参见下文的 原生代码 API。
功能一览
| 特性 | 配置 | 描述 |
|---|---|---|
| 主题订阅 | topic + direction: subscribe | 从 ROS2 接收,转发为 Arrow |
| 主题发布 | topic + direction: publish | 接收 Arrow,发布到 ROS2 |
| Multi-topic | topics | 在单个 ROS2 节点上处理多个话题 |
| 服务客户端 | service + role: client | Send requests, receive responses |
| 服务服务端 | service + role: server | Receive requests, send responses |
| 动作客户端 | action + role: client | Send goals, receive feedback + result |
| 动作服务端 | action + role: server | Receive goals, send feedback + result |
| QoS policies | qos | Reliability, durability, history, liveliness |
| Auto-spawn | Automatic | 由守护进程作为 Custom 节点派生的桥接二进制 |
Two Surfaces, One Stack
Dora 节点如何抵达 ROS2 线路,以及每种接口提供什么:
┌──────────────────────────────────────────────────────────────────────────┐
│ USER CODE (dora node) Rust · C++ · Python │
└───────────────┬──────────────────────────────────┬───────────────────────┘
│ │
┌────────────▼─────────────┐ ┌─────────────▼──────────────────────┐
│ (a) NATIVE CODE API │ │ (b) YAML / DYNAMIC BRIDGE │
│ compile-time typed │ │ YAML-driven, language-agnostic │
│ structs from .msg/.srv/ │ │ dora-ros2-bridge-node binary │
│ .action (codegen) │ │ + pyo3 (Python, in-process) │
│ → Rust / C++ │ │ Apache Arrow + thread-local │
│ + pyo3 classes (Python) │ │ TypeInfo │
└────────────┬─────────────┘ └─────────────┬──────────────────────┘
│ │
└──────────────┬─────────────────────┘
│ serialize ↔ Arrow ↔ CDR
┌────────────▼─────────────┐
│ PURE-RUST DDS/RTPS STACK │ ← never links rcl / rclcpp
│ ros2-client · rustdds │
└────────────┬─────────────┘
│ RTPS over UDP (SPDP discovery)
╔══════════════════════▼═══════════════════════════════════╗
║ DDS WIRE ── ROS2 ECOSYSTEM (rclcpp / rclpy nodes, ║
║ turtlesim, ros2 CLI, Nav2, ...) ║
╚═══════════════════════════════════════════════════════════╝
两种接口都基于同一套 ros2-client / rustdds 栈。“客户端” = 调用 ROS2 的 Dora 节点(dora→ROS2);“服务端” = 为 ROS2 客户端提供服务的 Dora 节点(ROS2→dora)。
Capability × surface (all supported):
| topic pub/sub | service client | service server | action client | action server | parameters | |
|---|---|---|---|---|---|---|
| YAML bridge | ✅ | ✅ | ✅ | ✅ | ✅ | (node-hosted) |
| Rust (codegen) | ✅ | ✅ | ✅ | ✅ | ✅ | 通过 ros2-client |
| C++ (codegen) | ✅ | ✅ | ✅ | ✅ | ✅ | 通过 ros2-client |
| Python (pyo3) | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
每个单元格对应的可运行示例都在 examples/ros2-bridge/ 下(rust/、python/、c++/ 以及 yaml-bridge* 数据流)。
架构
当 Dora 描述符解析器在节点上遇到 ros2: 键时,会将其转换为指向 dora-ros2-bridge-node 二进制的 Custom 节点。桥接配置以 JSON 形式序列化到 DORA_ROS2_BRIDGE_CONFIG 环境变量中。
User Node <--(Arrow/SharedMem)--> Bridge Binary <--(CDR/DDS)--> ROS2
桥接二进制:
- 读取
AMENT_PREFIX_PATH以定位已安装的 ROS2 消息包 - 在启动时解析 message/service/action 定义
- 创建
ros2_client节点以及相应的发布者、订阅者、客户端或服务端 - Converts incoming ROS2 CDR messages to Arrow
StructArray(subscribe/response/feedback) - Converts incoming Arrow
StructArrayto ROS2 CDR messages (publish/request/goal)
你的用户节点从不链接 ROS2 —— 所有 ROS2 通信都隔离在桥接二进制中。
前提条件
- 已 source ROS2 环境:必须设置
AMENT_PREFIX_PATH并指向包含所需消息包的工作区 - Message packages installed: e.g.,
turtlesim,geometry_msgs,example_interfaces - 对于服务客户端:必须有一个 ROS2 服务端在运行(或使用配套的服务端数据流)
- 对于动作客户端:必须在启动数据流之前运行一个 ROS2 动作服务端(没有
wait_for_action_server机制) - 对于动作服务端:由 ROS2 动作客户端向桥接发送目标(例如
ros2 action send_goal)
Topic Bridge
单话题(订阅)
订阅一个 ROS2 话题,并将消息作为 Arrow 数据转发给下游 Dora 节点。
nodes:
- id: pose_bridge
ros2:
topic: /turtle1/pose
message_type: turtlesim/Pose
direction: subscribe # default, can be omitted
outputs:
- pose
The bridge creates a ROS2 subscription on /turtle1/pose, deserializes each incoming turtlesim/Pose message into an Arrow StructArray, and sends it on the pose output.
单话题(发布)
从 Dora 节点接收 Arrow 数据并发布到 ROS2 话题。
nodes:
- id: cmd_bridge
ros2:
topic: /turtle1/cmd_vel
message_type: geometry_msgs/Twist
direction: publish
inputs:
cmd_vel: planner/cmd_vel
桥接在 cmd_vel 输入上接收 Arrow 数据,将其序列化为 geometry_msgs/Twist CDR,并发布到 /turtle1/cmd_vel。
Multi-Topic
在单个 ROS2 节点上下文中桥接多个话题,可混合订阅与发布方向。
nodes:
- id: turtle_bridge
ros2:
topics:
- topic: /turtle1/pose
message_type: turtlesim/Pose
direction: subscribe
output: pose
- topic: /turtle1/cmd_vel
message_type: geometry_msgs/Twist
direction: publish
input: velocity
qos:
reliable: true
keep_last: 10
inputs:
velocity: planner/cmd_vel
outputs:
- pose
多话题模式下,每个桥接节点最多支持 64 个话题。
Input/Output ID Mapping
默认情况下,话题名会通过去掉开头的 / 并将其余 / 替换为 _ 来转换为 Dora ID:
| ROS2 Topic | 默认 Dora ID |
|---|---|
/turtle1/pose | turtle1_pose |
/camera/image_raw | camera_image_raw |
在多话题模式下,你可以用显式的 output(订阅)或 input(发布)字段覆盖此映射。在单话题模式下,直接使用节点声明的 outputs 或 inputs。
Service Bridge
Service Client
从 Dora 向外部 ROS2 服务发送请求并接收响应。
nodes:
- id: add_client
ros2:
service: /add_two_ints
service_type: example_interfaces/AddTwoInts
role: 客户端
inputs:
request: requester/data
outputs:
- response
桥接会等待服务可用(最多重试 10 次,每次 2 秒),然后对接收到的每个 Arrow 输入:
- 将 Arrow 数据序列化为
AddTwoInts_RequestCDR 消息 - 将请求发送到 ROS2 服务
- 等待响应(30 秒超时)
- 将响应反序列化为 Arrow 并在
response输出上发送
Service Server
将一个 Dora 处理节点暴露为 ROS2 服务,供外部 ROS2 客户端调用。
nodes:
- id: add_server
ros2:
service: /dora_add_two_ints
service_type: example_interfaces/AddTwoInts
role: 服务端
inputs:
response: handler/result
outputs:
- request
- id: handler
path: path/to/handler-node
inputs:
request: add_server/request
outputs:
- result
桥接接收 ROS2 服务请求,为每个请求分配唯一的 request_id(UUID v7),将请求数据作为 Arrow 在 request 输出上转发(元数据中带 request_id),并等待处理节点在 response 输入上带着相同的 request_id 回送响应。随后响应会被返回给正确的 ROS2 客户端。
可运行示例参见 examples/ros2-bridge/yaml-bridge-service/。
请求 ID 关联
每个传入的 ROS2 请求都会被分配一个 request_id 元数据参数。处理节点在发送响应时必须在元数据中包含相同的 request_id。最简单的做法是透传 metadata.parameters:
#![allow(unused)]
fn main() {
Event::Input { id, metadata, data } => {
// metadata.parameters contains request_id
let result = compute(data);
node.send_service_response("response".into(), metadata.parameters, result)?;
}
}
响应可以以任意顺序到达 —— 桥接按 request_id 而非到达顺序进行关联。超过 30 秒未处理的陈旧请求会被清除。待处理请求队列上限为 64 —— 队列满时多余的请求会被丢弃。
服务等待与超时
| Behavior | 值 |
|---|---|
| 服务客户端:等待可用 | 重试 10 次,每次 2 秒(共 20 秒) |
| 服务客户端:响应超时 | 30 seconds |
| 服务服务端:待处理请求上限 | 64 |
Action Bridge
Action Client
从 Dora 向外部 ROS2 动作服务端发送目标,并接收反馈与结果。
nodes:
- id: fib_client
ros2:
action: /fibonacci
action_type: example_interfaces/Fibonacci
role: 客户端
inputs:
goal: goal_sender/goal
outputs:
- feedback
- result
对每个 Arrow 目标输入:
- 将 Arrow 数据序列化为
Fibonacci_GoalCDR 消息 - 将目标发送到动作服务端(30 秒超时)
- 若被接受,则为反馈和结果派生后台线程
- 反馈消息随流式到达而在
feedback输出上送出 - 最终结果在
result输出上到达(5 分钟超时)
反馈流与结果流
动作桥接在不同的输出上分别发送反馈和结果:
feedback:每当动作服务端送来反馈消息时流式输出。以 Arrow 形式包含该动作的反馈消息(例如 Fibonacci 的{partial_sequence: int32[]})result:在动作完成时发送一次。以 Arrow 形式包含该动作的结果消息(例如 Fibonacci 的{sequence: int32[]})
Concurrent Goals
桥接最多支持 8 个并发进行中的目标(MAX_CONCURRENT_GOALS)。多余的目标会被丢弃并记录警告。每个目标会派生专门的反馈和结果读取线程。
Timeouts
| Behavior | 值 |
|---|---|
| 目标发送超时 | 30 seconds |
| Result retrieval timeout | 5 minutes |
| Feedback | 无超时(持续流式直到动作完成) |
Action Server
将一个 Dora 处理节点暴露为 ROS2 动作服务端,供外部 ROS2 客户端调用。
nodes:
- id: fib_server
ros2:
action: /fibonacci
action_type: example_interfaces/Fibonacci
role: 服务端
inputs:
feedback: handler/feedback
result: handler/result
outputs:
- goal
- id: handler
path: path/to/handler-node
inputs:
goal: fib_server/goal
outputs:
- feedback
- result
桥接接收来自 ROS2 客户端的目标,自动接受它们,并在 goal 输出上转发目标数据。处理节点计算反馈和结果,并在 feedback 和 result 输入上回送。
可运行的 Fibonacci 示例参见 examples/ros2-bridge/yaml-bridge-action-server/。
目标 ID 元数据
每个目标由作为 goal_id 元数据参数传递的 UUID 字符串标识。桥接会在每个 goal 输出上设置 goal_id。处理节点在发送 feedback 和 result 时必须包含相同的 goal_id,以便桥接将它们关联到正确的目标。
最简单的做法是从目标事件透传 metadata.parameters:
#![allow(unused)]
fn main() {
Event::Input { id, metadata, data } => match id.as_str() {
"goal" => {
let params = metadata.parameters; // contains goal_id
// ... compute ...
node.send_output("feedback".into(), params.clone(), feedback)?;
node.send_output("result".into(), params, result)?;
}
// ...
}
}
动作服务端生命周期
- ROS2 客户端发送目标请求
- 桥接自动接受目标并开始执行
- 桥接在
goal输出上发送目标数据,元数据中带goal_id - 处理节点带着相同的
goal_id发送feedback(零次或多次) - 处理节点带着相同的
goal_id发送result(一次);桥接将其返回给 ROS2 客户端 - 如果客户端始终不请求结果,结果发送会在 5 分钟后超时
不含数据或无法转发给处理节点的目标会被自动中止 —— 桥接向 ROS2 客户端回送 Aborted 状态,使其不会无限期挂起。
Goal Status
默认情况下,结果以 Succeeded 状态返回。处理节点可通过在结果输出上设置 goal_status 元数据参数来覆盖:
goal_status value | ROS2 Status | 用例 |
|---|---|---|
"succeeded" (or omitted) | Succeeded | 目标成功完成 |
"aborted" | Aborted | 目标在执行期间失败 |
"canceled" | Canceled | 目标被处理节点取消 |
无法识别的 goal_status 值默认为 Aborted 并记录警告。完全省略 goal_status 则默认为 Succeeded。
Rust example:
#![allow(unused)]
fn main() {
use dora_node_api::{GOAL_STATUS, GOAL_STATUS_ABORTED, Parameter};
let mut params = metadata.parameters; // contains goal_id
params.insert(GOAL_STATUS.to_string(), Parameter::String(GOAL_STATUS_ABORTED.to_string()));
node.send_output("result".into(), params, error_result)?;
}
动作服务端限制
| Behavior | 值 |
|---|---|
| Max concurrent goals | 8 (additional goals receive Aborted status) |
| Auto-accept | 所有目标都被自动接受 |
| Result send timeout | 5 minutes |
Python 动作服务端处理节点
Python 节点以 PyArrow 数组形式接收目标数据,元数据字典中带 goal_id。在反馈/结果输出上将其透传:
for event in node:
if event["type"] == "INPUT" and event["id"] == "goal":
goal_id = event["metadata"]["goal_id"]
order = event["value"]["order"][0].as_py()
# Send feedback
node.send_output("feedback", feedback_array, {"goal_id": goal_id})
# Send result (with optional status)
node.send_output("result", result_array, {
"goal_id": goal_id,
"goal_status": "succeeded", # or "aborted", "canceled"
})
C++ 动作服务端处理节点
C++ nodes access goal_id via type-safe metadata accessors:
auto goal_id = metadata->get_str("goal_id");
// Send feedback with goal_id
auto fb_metadata = new_metadata();
fb_metadata->set_string("goal_id", goal_id);
send_arrow_output_with_metadata("feedback", feedback_data, fb_metadata);
// Send result with goal_id
auto res_metadata = new_metadata();
res_metadata->set_string("goal_id", goal_id);
send_arrow_output_with_metadata("result", result_data, res_metadata);
服务质量(QoS)
配置
可在桥接层级设置 QoS(应用于所有话题/通道),或在多话题模式下按话题设置。
nodes:
- id: my_bridge
ros2:
topic: /sensor/data
message_type: sensor_msgs/LaserScan
qos:
reliable: true
durability: transient_local
keep_last: 10
liveliness: automatic
lease_duration: 5.0
max_blocking_time: 0.5
Defaults
| Field | 默认 |
|---|---|
reliable | false (best effort) |
durability | volatile |
liveliness | automatic |
lease_duration | infinity |
max_blocking_time | 100ms(仅在 reliable: true 时生效) |
keep_last | 1 |
keep_all | false |
按话题覆盖 QoS
在多话题模式下,每个话题都可以覆盖桥接层级的 QoS:
ros2:
topics:
- topic: /fast_sensor
message_type: sensor_msgs/Imu
direction: subscribe
qos:
reliable: false # override: best effort for this topic
keep_last: 1
- topic: /cmd
message_type: geometry_msgs/Twist
direction: publish
# inherits bridge-level QoS (reliable: true)
qos:
reliable: true # default for all topics
keep_last: 10
Validation Rules
| Field | Valid Values |
|---|---|
reliable | true, false |
durability | "volatile", "transient_local" |
liveliness | "automatic", "manual_by_participant", "manual_by_topic" |
keep_last | 1 to 10000 |
keep_all | true, false (mutually exclusive intent with keep_last) |
lease_duration | Finite non-negative float (seconds) |
max_blocking_time | Finite non-negative float (seconds) |
Data Format: Arrow Structs
All data exchanged between your nodes and the bridge uses Arrow StructArray with a single row. Each field in the ROS2 message becomes a column in the struct.
如何构建 Arrow 消息
Rust 示例:构建一个 AddTwoInts_Request({a: i64, b: i64}):
#![allow(unused)]
fn main() {
use std::sync::Arc;
use arrow::array::{Array, Int64Array, StructArray};
use arrow::datatypes::{DataType, Field};
fn make_add_request(a: i64, b: i64) -> StructArray {
let fields = vec![
Arc::new(Field::new("a", DataType::Int64, false)),
Arc::new(Field::new("b", DataType::Int64, false)),
];
let arrays: Vec<Arc<dyn Array>> = vec![
Arc::new(Int64Array::from(vec![a])),
Arc::new(Int64Array::from(vec![b])),
];
StructArray::try_new(fields.into(), arrays, None)
.expect("failed to create struct array")
}
}
读取响应({sum: i64}):
#![allow(unused)]
fn main() {
use arrow::array::{Int64Array, StructArray};
fn read_response(data: &dyn arrow::array::Array) -> i64 {
let struct_array = data
.as_any()
.downcast_ref::<StructArray>()
.expect("expected struct array");
struct_array
.column_by_name("sum")
.expect("missing 'sum' field")
.as_any()
.downcast_ref::<Int64Array>()
.expect("expected Int64Array")
.value(0)
}
}
ROS2 类型到 Arrow 类型的映射
| ROS2 Type | Arrow Type | Rust Arrow Array |
|---|---|---|
bool | Boolean | BooleanArray |
int8 | Int8 | Int8Array |
int16 | Int16 | Int16Array |
int32 | Int32 | Int32Array |
int64 | Int64 | Int64Array |
uint8 / byte / char | UInt8 | UInt8Array |
uint16 | UInt16 | UInt16Array |
uint32 | UInt32 | UInt32Array |
uint64 | UInt64 | UInt64Array |
float32 | Float32 | Float32Array |
float64 | Float64 | Float64Array |
string | Utf8 | StringArray |
wstring | Utf8 (encoded as UTF-16 on CDR side) | StringArray |
| Nested message | Struct | StructArray |
序列与数组
| ROS2 Type | Arrow Type | Rust Arrow Array |
|---|---|---|
Variable-length sequence (int32[]) | List | ListArray |
Bounded sequence (int32[<=10]) | List (length validated) | ListArray |
Fixed-size array (int32[3]) | FixedSizeList | FixedSizeListArray |
示例:从 Fibonacci 反馈({partial_sequence: int32[]})读取一个 ListArray:
#![allow(unused)]
fn main() {
use arrow::array::{Int32Array, ListArray, StructArray};
let struct_array = data.as_any().downcast_ref::<StructArray>().unwrap();
let list = struct_array
.column_by_name("partial_sequence")
.unwrap()
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let values = list
.value(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.values()
.to_vec();
}
Complete YAML Reference
nodes:
- id: my_bridge
ros2:
# --- Mode (exactly one required) ---
# Single topic mode
topic: /topic_name # ROS2 topic name
message_type: package/TypeName # ROS2 message type
direction: subscribe # subscribe (default) | publish
# Multi-topic mode (mutually exclusive with topic)
topics:
- topic: /topic_a
message_type: package/TypeA
direction: subscribe
output: custom_output_id # override default ID mapping
qos: # per-topic QoS override
reliable: true
- topic: /topic_b
message_type: package/TypeB
direction: publish
input: custom_input_id # override default ID mapping
# Service mode (mutually exclusive with topic/topics/action)
service: /service_name # ROS2 service name
service_type: package/TypeName # ROS2 service type
role: client # client | server
# Action mode (mutually exclusive with topic/topics/service)
action: /action_name # ROS2 action name
action_type: package/TypeName # ROS2 action type
role: client # client | server
# --- QoS (optional, applies to all channels) ---
qos:
reliable: false # true | false (default: false = best effort)
durability: volatile # volatile (default) | transient_local
liveliness: automatic # automatic | manual_by_participant | manual_by_topic
lease_duration: 5.0 # seconds (default: infinity)
max_blocking_time: 0.1 # seconds (default: 0.1, reliable only)
keep_last: 1 # 1-10000 (default: 1)
keep_all: false # true | false (default: false)
# --- Optional ROS2 node config ---
namespace: / # ROS2 namespace (default: "/")
node_name: my_ros_node # ROS2 node name (default: dora node id)
# --- Standard Dora node fields ---
inputs:
input_id: source_node/output_id
outputs:
- output_id
Use Case Scenarios
1. 订阅传感器数据(turtlesim 位姿)
nodes:
- id: pose_bridge
ros2:
topic: /turtle1/pose
message_type: turtlesim/Pose
outputs:
- pose
- id: my_processor
path: ./target/debug/my-processor
inputs:
pose: pose_bridge/pose
#![allow(unused)]
fn main() {
// In my_processor: receive turtlesim/Pose as Arrow
Event::Input { id, data, .. } if id.as_str() == "pose" => {
let s = data.as_any().downcast_ref::<StructArray>().unwrap();
let x = s.column_by_name("x").unwrap()
.as_any().downcast_ref::<Float32Array>().unwrap().value(0);
let y = s.column_by_name("y").unwrap()
.as_any().downcast_ref::<Float32Array>().unwrap().value(0);
println!("Turtle at ({x}, {y})");
}
}
2. Publish Velocity Commands
nodes:
- id: planner
path: ./target/debug/planner
inputs:
tick: dora/timer/millis/100
outputs:
- cmd_vel
- id: cmd_bridge
ros2:
topic: /turtle1/cmd_vel
message_type: geometry_msgs/Twist
direction: publish
inputs:
cmd_vel: planner/cmd_vel
#![allow(unused)]
fn main() {
// In planner: send geometry_msgs/Twist as Arrow
// Twist has nested Vector3 fields: linear {x,y,z} and angular {x,y,z}
fn make_twist(linear_x: f64, angular_z: f64) -> StructArray {
let vec3_fields = vec![
Arc::new(Field::new("x", DataType::Float64, false)),
Arc::new(Field::new("y", DataType::Float64, false)),
Arc::new(Field::new("z", DataType::Float64, false)),
];
let linear = StructArray::try_new(
vec3_fields.clone().into(),
vec![
Arc::new(Float64Array::from(vec![linear_x])) as _,
Arc::new(Float64Array::from(vec![0.0])) as _,
Arc::new(Float64Array::from(vec![0.0])) as _,
],
None,
).unwrap();
let angular = StructArray::try_new(
vec3_fields.into(),
vec![
Arc::new(Float64Array::from(vec![0.0])) as _,
Arc::new(Float64Array::from(vec![0.0])) as _,
Arc::new(Float64Array::from(vec![angular_z])) as _,
],
None,
).unwrap();
let fields = vec![
Arc::new(Field::new("linear", linear.data_type().clone(), false)),
Arc::new(Field::new("angular", angular.data_type().clone(), false)),
];
StructArray::try_new(
fields.into(),
vec![Arc::new(linear) as _, Arc::new(angular) as _],
None,
).unwrap()
}
}
3. 多话题双向桥接
在单个 ROS2 节点上订阅位姿并发布速度。
nodes:
- id: turtle_bridge
ros2:
topics:
- topic: /turtle1/pose
message_type: turtlesim/Pose
direction: subscribe
output: pose
- topic: /turtle1/cmd_vel
message_type: geometry_msgs/Twist
direction: publish
input: velocity
qos:
reliable: true
keep_last: 10
inputs:
velocity: planner/cmd_vel
outputs:
- pose
- id: planner
path: ./target/debug/planner
inputs:
pose: turtle_bridge/pose
tick: dora/timer/millis/100
outputs:
- cmd_vel
4. 服务客户端:调用外部 ROS2 服务
nodes:
- id: requester
path: ./target/debug/requester
inputs:
tick: dora/timer/millis/1000
response: add_client/response
outputs:
- request
- id: add_client
ros2:
service: /add_two_ints
service_type: example_interfaces/AddTwoInts
role: 客户端
inputs:
request: requester/request
outputs:
- response
前提条件:先运行一个 ROS2 服务:
ros2 run examples_rclcpp_minimal_service service_main
5. 服务服务端:将 Dora 处理节点暴露为 ROS2 服务
nodes:
- id: add_server
ros2:
service: /add_two_ints
service_type: example_interfaces/AddTwoInts
role: 服务端
inputs:
response: handler/response
outputs:
- request
- id: handler
path: ./target/debug/handler
inputs:
request: add_server/request
outputs:
- response
处理节点以 Arrow 形式接收 {a: i64, b: i64},计算结果,并回送 {sum: i64}。外部 ROS2 客户端可以调用此服务:
ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 3, b: 5}"
6. 动作客户端:长时间运行的 Fibonacci 目标
nodes:
- id: goal_sender
path: ./target/debug/goal-sender
inputs:
tick: dora/timer/millis/5000
feedback: fib_client/feedback
result: fib_client/result
outputs:
- goal
- id: fib_client
ros2:
action: /fibonacci
action_type: example_interfaces/Fibonacci
role: 客户端
inputs:
goal: goal_sender/goal
outputs:
- feedback
- result
前提条件:在数据流之前启动动作服务端:
ros2 run examples_rclcpp_action_server fibonacci_action_server
目标节点发送 {order: int32},接收流式的 {partial_sequence: int32[]} 反馈,以及最终的 {sequence: int32[]} 结果。
7. 动作服务端:将 Dora 处理节点暴露为 ROS2 动作
nodes:
- id: fib_server
ros2:
action: /fibonacci
action_type: example_interfaces/Fibonacci
role: 服务端
inputs:
feedback: handler/feedback
result: handler/result
outputs:
- goal
- id: handler
path: ./target/debug/handler
inputs:
goal: fib_server/goal
outputs:
- feedback
- result
处理节点接收带 goal_id 元数据的 {order: int32} 目标,发送 {partial_sequence: int32[]} 反馈,以及最终的 {sequence: int32[]} 结果 —— 全部在元数据中带相同的 goal_id。外部 ROS2 客户端可以发送目标:
ros2 action send_goal /fibonacci example_interfaces/action/Fibonacci "{order: 10}"
Native Code APIs (Rust / Python / C++)
上面的 YAML 桥接与语言无关,节点中无需任何 ROS2 代码。当你想从节点代码中以命令式方式驱动 ROS2 —— 类型化结构体、显式请求/响应、目标句柄 —— 时,请使用各语言的原生 API。它们封装同一套 ros2-client 栈;消息类型在构建时从 AMENT_PREFIX_PATH 上的 .msg/.srv/.action 文件生成。
三种语言都支持话题发布/订阅、服务客户端 + 服务端,以及动作客户端 + 服务端。完整的可运行示例位于 examples/ros2-bridge/{rust,python,c++}/。
Python (dora.Ros2Context)
Python 节点创建一个 ROS2 上下文 + 节点,然后创建发布者/订阅、服务/动作的客户端/服务端,或参数。数据为 PyArrow。
from dora import Node, Ros2Context, Ros2NodeOptions, Ros2QosPolicies
import pyarrow as pa
dora_node = Node() # connect to the dora daemon FIRST
ctx = Ros2Context()
node = ctx.new_node("my_node", "/", Ros2NodeOptions(rosout=True))
qos = Ros2QosPolicies(reliable=True, keep_last=1)
| 方法 | 用途 |
|---|---|
ctx.new_node(name, namespace, options, parameters=None) | 创建 ROS2 节点;parameters 声明初始 ROS2 参数 |
node.create_topic(name, type, qos) / create_publisher(topic) / create_subscription(topic) | Topic pub/sub |
node.create_service_client(name, type, qos=None) → Ros2ServiceClient | .call(request, timeout_s=None) |
node.create_service_server(name, type, qos=None) → Ros2ServiceServer | .take_request() → (id, req) / None, then .send_response(id, resp) |
node.create_action_client(name, type, qos) → Ros2ActionClient | .send_goal(goal) → goal_id, .take_feedback(goal_id), .take_result(goal_id), .cancel(goal_id=None) |
node.create_action_server(name, type, qos) → Ros2ActionServer | .take_goal() → (goal_id, goal), .send_feedback(goal_id, fb), .send_result(goal_id, result, status=None), .take_cancel() |
node.set_parameter / get_parameter / list_parameters / has_parameter | 运行时参数(同时提供给 ros2 param) |
服务客户端示例(调用外部 ROS2 AddTwoInts):
client = node.create_service_client("/add_two_ints", "example_interfaces/AddTwoInts", qos)
client.wait_for_service()
resp = client.call(pa.array([{"a": 2, "b": 3}]))
print(resp[0]["sum"].as_py()) # 5
参数(在节点创建时声明,随后提供给 ros2 param):
node = ctx.new_node("demo", "/", Ros2NodeOptions(rosout=True),
parameters={"speed": 1.5, "name": "robot"})
node.set_parameter("speed", 2.0)
print(node.get_parameter("speed"), node.list_parameters())
阻塞调用(call、take_*)在等待时会释放 GIL。参见 examples/ros2-bridge/python/{service-client,service-server,action-client,action-server,parameter}/。
Rust (dora-ros2-bridge)
A Rust node uses ros2-client directly with the generated message structs from dora_ros2_bridge::messages::<package>. Topic, service, and action client/server are all native ros2-client calls; the action server uses AsyncActionServer<A> (no extra codegen needed).
#![allow(unused)]
fn main() {
use dora_ros2_bridge::{ros2_client, messages::example_interfaces::action::Fibonacci};
let server = ros_node.create_action_server::<Fibonacci>(
ros2_client::ServiceMapping::Enhanced,
&ros2_client::Name::new("/", "fibonacci").unwrap(),
&ros2_client::ActionTypeName::new("example_interfaces", "Fibonacci"),
action_qos,
)?;
}
参见 examples/ros2-bridge/rust/{topic-pub,topic-sub,service-client,service-server,action-client,action-server,turtle}/。
C++ (cxx codegen)
A C++ node includes the generated headers and calls package-namespaced creation functions; data is rust::Box/generated structs. Service and action servers are driven by a matches / downcast event loop, mirroring the clients.
#include "ros2-bridge/msg/example_interfaces.h"
auto server = example_interfaces::create_Fibonacci_action_server(
*node, "/", "fibonacci", qos, merged_events);
// in the event loop:
if (server->matches(event)) {
auto goal_event = server->downcast(std::move(event));
auto goal = goal_event->get_goal();
// ... compute ...
server->publish_feedback(goal_event->get_goal_id(), feedback);
server->send_result(goal_event->get_goal_id(), ActionStatusEnum::Succeeded, result);
}
| C++ surface | Creation function |
|---|---|
| topic | create_topic_<pkg>_<Type> + create_publisher / create_subscription |
| service client | create_client_<pkg>_<Name> → wait_for_service / send_request / matches / downcast |
| service server | create_service_server_<pkg>_<Name> → matches / downcast (get_request / get_id) / send_response |
| action client | <pkg>::create_<Name>_action_client → wait_for_action / send_goal / request_result / matches_* / downcast_* |
| action server | <pkg>::create_<Name>_action_server → matches / downcast (get_goal / get_goal_id) / publish_feedback / send_result |
参见 examples/ros2-bridge/c++/{turtle,service-server,action-client,action-server}/。
Discovery & RMW notes (native servers)
- 对于动作(action),由 Dora 托管的服务端无法被真实的
rcl/rclcpp/rclpy客户端发现(ros2-client#4):因此动作示例会在同一数据流中将 Dora 服务端与 Dora 客户端配对。相比之下,服务(service)服务端可以被真实的ros2客户端正常发现。 - RMW 必须匹配。 服务/动作的关联机制在 Fast-DDS(
Enhanced)与 Cyclone 映射之间不同;桥接根据RMW_IMPLEMENTATION选择映射(原生 API 在代码生成时确定,YAML 桥接在运行时确定)。不匹配会导致调用挂起。请在两端一致地设置RMW_IMPLEMENTATION。 ros2-client中没有wait_for_action_server;如果没有服务端存在,客户端的首次send_goal只会超时。
Limitations and Known Constraints
这些约束适用于 YAML / 动态桥接。原生代码 API 的相关说明见 发现与 RMW 注意事项(特别是:Python 动作客户端/服务端确实支持取消)。
- 动作服务端自动接受:所有传入的目标都会被自动接受。处理节点无法在执行开始前拒绝目标。
- 不支持动作取消(YAML 桥接):YAML 动作桥接不处理 ROS2 取消请求(Python 原生动作 API 支持)。
- 没有
wait_for_action_server:ros2_client库不提供此 API。请在数据流之前启动动作服务端。若服务端不可用,首个目标会超时(30 秒)。 - 原生动作服务端发现(ros2-client#4):由 Dora 托管的动作服务端无法被真实的基于
rcl的客户端发现;请将其与 Dora 客户端配对。服务(service)服务端不受影响。 - 单飞服务客户端:服务客户端按顺序处理请求 —— 每个请求都会阻塞直到响应到达(或在 30 秒后超时)。
- 服务/动作通道的 QoS 统一:
qos配置应用于所有服务/动作子通道(goal、result、cancel、feedback、status)。无法按通道单独配置 QoS。 - 需要
AMENT_PREFIX_PATH:若找不到 ROS2 消息定义,桥接会在启动时失败。 - 最多 64 个话题:多话题模式下每个桥接节点最多支持 64 个话题。
- 最多 8 个并发动作目标:达到上限时,多余的目标会收到
Aborted状态。 - 最多 64 个待处理服务请求(服务端):队列满时请求会被丢弃。
最佳实践
运行前请 source 你的 ROS2 环境。 确保已设置 AMENT_PREFIX_PATH 并包含所有所需的消息包。若找不到定义,桥接会记录错误。
在数据流之前启动动作服务端。 动作服务端没有等待机制。若服务端未就绪,首次目标发送会在 30 秒后超时。
对相关话题使用多话题模式。 在同一个桥接节点上桥接 /turtle1/pose(订阅)和 /turtle1/cmd_vel(发布),相比两个独立的桥接节点可减少资源占用。
精确匹配 Arrow 字段名。 桥接会校验 Arrow 结构体字段名与 ROS2 消息定义一致。缺失字段使用默认值(数字为零、字符串为空)。多余字段会导致错误。
在多话题模式下使用显式的 output/input。 默认 ID 映射(去掉 /、将 / 替换为 _)对层级较深的话题名可能造成困惑。显式 ID 让数据流 YAML 自带说明。
设置 QoS 以匹配 ROS2 发布者/订阅者。 QoS 不匹配(例如可靠订阅者搭配尽力而为发布者)会导致无声的通信失败。可用 ros2 topic info -v /topic_name 查看现有的 QoS 设置。
在服务响应中透传 request_id。 桥接使用 request_id 元数据参数将响应关联到请求。若处理节点未在响应元数据中包含 request_id,桥接将无法把响应匹配到原始的 ROS2 请求。