dora.dora
ROS2 context holding all message definitions for receiving messages from and sending messages to ROS2.
By default, Ros2Context uses the AMENT_PREFIX_PATH environment variable
to search for message definitions.
The AMENT_PREFIX_PATH folder structure should be the following:
- For messages: /msg/ .msg
- For services: /srv/ .srv
You can also use ros_paths
if you don't want to use the environment variable.
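To make the search concrete, here is a minimal sketch of how message and service definitions could be located under a list of prefix directories. It is not dora's actual lookup code: the helper name `find_interface_files` is hypothetical, and it simply globs for any `msg/*.msg` and `srv/*.srv` files below each prefix, whether the prefixes come from AMENT_PREFIX_PATH or from an explicit list in the spirit of ros_paths.

```python
import os
from pathlib import Path


def find_interface_files(ros_paths=None):
    """Collect .msg and .srv files below each prefix directory.

    Hypothetical helper for illustration; dora's own lookup may differ.
    """
    if ros_paths is None:
        # Fall back to the environment variable, split on the OS path separator.
        raw = os.environ.get("AMENT_PREFIX_PATH", "")
        ros_paths = [p for p in raw.split(os.pathsep) if p]

    found = {"msg": [], "srv": []}
    for prefix in ros_paths:
        root = Path(prefix)
        if not root.is_dir():
            continue  # skip prefixes that do not exist
        for kind in found:
            # Match any file ending in .msg (or .srv) inside a msg (or srv)
            # directory, at any depth below the prefix.
            found[kind].extend(sorted(root.glob(f"**/{kind}/*.{kind}")))
    return found
```

Calling `find_interface_files()` with no argument reads the environment variable; passing a list of directories bypasses it.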
warning:: dora Ros2 bridge functionality is considered unstable. It may be changed at any point without it being considered a breaking change.
ros2_context = Ros2Context()
Create a new ROS2 node
ros2_node = ros2_context.new_node(
    "turtle_teleop",
    "/ros2_demo",
    Ros2NodeOptions(rosout=True),
)
warning:: dora Ros2 bridge functionality is considered unstable. It may be changed at any point without it being considered a breaking change.
ROS2 Node
warnings::
- dora Ros2 bridge functionality is considered unstable. It may be changed at any point without it being considered a breaking change.
- There's a known issue with ROS2 nodes not being discoverable by ROS2. See: https://github.com/jhelovuo/ros2-client/issues/4
Create a ROS2 topic to connect to.
turtle_twist_topic = ros2_node.create_topic(
    "/turtle1/cmd_vel", "geometry_msgs/Twist", topic_qos
)
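The node and topic snippets can be read as one chain: context, then node, then topic. The sketch below wires them together in a single function. It takes the context, node options, and QoS object as parameters, so it makes no assumption about how they are constructed. The function name `create_turtle_topic` is hypothetical; only `new_node` and `create_topic`, with the arguments shown in this document, are taken from the source.

```python
def create_turtle_topic(ros2_context, node_options, topic_qos):
    """Create the demo node and its cmd_vel topic (illustrative sketch).

    `ros2_context` is expected to behave like Ros2Context, `node_options`
    like Ros2NodeOptions(rosout=True), and `topic_qos` like a QoS policy
    object; none of them are constructed here.
    """
    # Node name and namespace are the ones used in the node example.
    ros2_node = ros2_context.new_node("turtle_teleop", "/ros2_demo", node_options)
    # The topic is created from the node, not from the context.
    turtle_twist_topic = ros2_node.create_topic(
        "/turtle1/cmd_vel", "geometry_msgs/Twist", topic_qos
    )
    return ros2_node, turtle_twist_topic
```

Returning both objects keeps the node available for later calls while handing the topic to whatever needs it.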
ROS2 Node Options
ROS2 Topic
warnings:
- dora Ros2 bridge functionality is considered unstable. It may be changed at any point without it being considered a breaking change.
ROS2 Publisher
warnings:
- dora Ros2 bridge functionality is considered unstable. It may be changed at any point without it being considered a breaking change.
ROS2 Subscription
warnings:
- dora Ros2 bridge functionality is considered unstable. It may be changed at any point without it being considered a breaking change.
ROS2 QoS Policy
DDS 2.2.3.4 DURABILITY
DDS 2.2.3.11 LIVELINESS
Start a runtime for Operators
The custom node API lets you integrate dora
into your application.
It allows you to retrieve input and send output in any fashion you want.
Use with:
from dora import Node
node = Node()
.next()
gives you the next input that the node has received.
It blocks until the next event becomes available.
You can pass a timeout in seconds to return early if no input is available.
It returns None
when all senders have been dropped.
event = node.next()
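As a sketch of how the blocking call, the timeout, and the None return fit together, the loop below keeps calling next() until it returns None. It assumes the timeout is passed as a keyword argument named `timeout`. This document does not say what next() returns when the timeout expires, so the sketch simply stops on any None. `FakeNode` is a hypothetical stand-in so the example runs without a dataflow, and `collect_events` is not part of dora.

```python
class FakeNode:
    """Hypothetical stand-in for dora.Node that replays a fixed event list."""

    def __init__(self, events):
        self._events = list(events)

    def next(self, timeout=None):
        # A real node would block here for up to `timeout` seconds.
        return self._events.pop(0) if self._events else None


def collect_events(node, timeout=1.0):
    """Call node.next() until it returns None and return the events seen."""
    events = []
    while True:
        event = node.next(timeout=timeout)
        if event is None:
            break  # no more events: stop polling
        events.append(event)
    return events
```

With a real node, `collect_events(Node())` would follow the same pattern, processing each event inside the loop instead of collecting it.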
You can also iterate over the event stream with a loop:
for event in node:
    match event["type"]:
        case "INPUT":
            match event["id"]:
                case "image":
                    pass  # handle the "image" input here
send_output
sends data from the node.
Args:
    output_id: str,
    data: pyarrow.Array,
    metadata: Option[Dict],
Example:
node.send_output("string", b"string", {"open_telemetry_context": "7632e76"})
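Because metadata is optional, a thin wrapper can decide whether to attach it. The sketch below follows the argument order listed under Args (output_id, data, metadata) and reuses the "open_telemetry_context" key from the example. `send_traced` and `RecordingNode` are hypothetical names introduced here so the code runs without a dataflow; they are not part of dora.

```python
class RecordingNode:
    """Hypothetical stand-in that records send_output calls instead of sending."""

    def __init__(self):
        self.sent = []

    def send_output(self, output_id, data, metadata=None):
        self.sent.append((output_id, data, metadata))


def send_traced(node, output_id, data, trace_context=None):
    """Send `data` on `output_id`, attaching tracing metadata when given."""
    # metadata is optional, so pass None when there is nothing to attach.
    metadata = None
    if trace_context is not None:
        metadata = {"open_telemetry_context": trace_context}
    node.send_output(output_id, data, metadata)
```

Calling `send_traced(node, "string", b"string", "7632e76")` produces the same call as the example, while omitting the last argument sends the data without metadata.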