FOT( Frenet Optimal Planner, Frenet 最优规划师) 算子
FOT( Frenet Optimal Planner, Frenet 最优规划师) 算子,基于 https://github.com/erdos-project/frenet_optimal_trajectory_planner/ 并包装不同的元素障碍物
、位置
、速度
...转换为 Frenet 消费格式。
FOT 输入为:
initial_conditions = {
"ps": 0,
"target_speed": # 目标速度
"pos": # x, y 当前位置
"vel": # vx, vy 当前速度
"wp": # [[x, y], ... n_waypoints ] 所需航点
"obs": # [[min_x, min_y, max_x, max_y], ... ] 路上障碍
}
下面还有一组 超参数。
由于我们的障碍物被定义为 3D 点,我们需要将这些点转换为 [min_x, min_y, max_x, max_y]
格式。 我们在 get_obstacle_list
函数中执行此操作。 这种近似值非常基本,可能需要重新审视。
输出要么是成功的轨迹,我们可以将其输入到 PID 中。 或者它是失败的,在这种情况下,我们将当前位置作为航点发送。
输入
- 图像: 高 x 宽 x BGR array。
输出
- 航点:x_points, y_points, 速度
示例图像
图描述
- id: fot_op
operator:
python: ../../operators/fot_op.py
outputs:
- waypoints
inputs:
position: oasis_agent/position
speed: oasis_agent/speed
obstacles: obstacle_location_op/obstacles
gps_waypoints: carla_gps_op/gps_waypoints
图可视化
方法
__init__()
源码
def __init__(self):
self.obstacles = np.array([])
self.lanes = np.array([])
self.position = []
self.speed = []
self.last_position = []
self.waypoints = []
self.gps_waypoints = np.array([])
self.last_obstacles = np.array([])
self.obstacle_metadata = {}
self.gps_metadata = {}
self.metadata = {}
self.orientation = None
self.outputs = []
self.hyperparameters = {
"max_speed": 25.0,
"max_accel": 45.0,
"max_curvature": 55.0,
"max_road_width_l": 0.1,
"max_road_width_r": 0.1,
"d_road_w": 0.5,
"dt": 0.5,
"maxt": 5.0,
"mint": 2.0,
"d_t_s": 5,
"n_s_sample": 2.0,
"obstacle_clearance": 0.1,
"kd": 1.0,
"kv": 0.1,
"ka": 0.1,
"kj": 0.1,
"kt": 0.1,
"ko": 0.1,
"klat": 1.0,
"klon": 1.0,
"num_threads": 0, # 设置 0 以避免使用线程算法
}
self.conds = {
"s0": 0,
"target_speed": TARGET_SPEED,
} # 粘贴调试日志的输出
.on_event(...)
源码
def on_event(
self,
dora_event: dict,
send_output: Callable[[str, bytes], None],
) -> DoraStatus:
if dora_event["type"] == "INPUT":
return self.on_input(dora_event, send_output)
return DoraStatus.CONTINUE
.on_input(...)
源码
def on_input(
self,
dora_input: dict,
send_output: Callable[[str, bytes], None],
):
if dora_input["id"] == "position":
self.last_position = self.position
self.position = np.array(dora_input["value"])
if len(self.last_position) == 0:
self.last_position = self.position
return DoraStatus.CONTINUE
elif dora_input["id"] == "speed":
self.speed = np.array(dora_input["value"])
return DoraStatus.CONTINUE
elif dora_input["id"] == "obstacles":
obstacles = np.array(dora_input["value"]).reshape((-1, 5))
if len(self.last_obstacles) > 0:
self.obstacles = np.concatenate([self.last_obstacles, obstacles])
else:
self.obstacles = obstacles
elif dora_input["id"] == "global_lanes":
lanes = np.array(dora_input["value"]).reshape((-1, 60, 3))
self.lanes = lanes
return DoraStatus.CONTINUE
elif "gps_waypoints" == dora_input["id"]:
waypoints = np.array(dora_input["value"])
waypoints = waypoints.reshape((-1, 3))[:, :2]
self.gps_waypoints = waypoints
return DoraStatus.CONTINUE
if len(self.gps_waypoints) == 0:
print("No waypoints")
send_output(
"waypoints",
self.gps_waypoints.tobytes(),
dora_input["metadata"],
)
return DoraStatus.CONTINUE
elif len(self.position) == 0 or len(self.speed) == 0:
return DoraStatus.CONTINUE
[x, y, z, rx, ry, rz, rw] = self.position
[_, _, yaw] = R.from_quat([rx, ry, rz, rw]).as_euler("xyz", degrees=False)
gps_obstacles = get_obstacle_list(
self.position, self.obstacles, self.gps_waypoints
)
if len(self.lanes) > 0:
lanes = get_lane_list(self.position, self.lanes, self.gps_waypoints)
obstacles = np.concatenate([gps_obstacles, lanes])
else:
obstacles = gps_obstacles
initial_conditions = {
"ps": 0,
"target_speed": self.conds["target_speed"],
"pos": self.position[:2],
"vel": (np.clip(LA.norm(self.speed), 0.5, 40))
* np.array([np.cos(yaw), np.sin(yaw)]),
"wp": self.gps_waypoints,
"obs": obstacles,
}
(
result_x,
result_y,
speeds,
ix,
iy,
iyaw,
d,
s,
speeds_x,
speeds_y,
misc,
costs,
success,
) = fot_wrapper.run_fot(initial_conditions, self.hyperparameters)
if not success:
initial_conditions["wp"] = initial_conditions["wp"][:5]
print(f"fot failed. stopping with {initial_conditions}.")
target_distance = LA.norm(self.gps_waypoints[-1] - self.position[:2])
print(f"Distance to target: {target_distance}")
for obstacle in self.obstacles:
print(f"obstacles:{obstacle}, label: {LABELS[int(obstacle[-1])]}")
send_output(
"waypoints",
pa.array(np.array([x, y, 0.0], np.float32)),
dora_input["metadata"],
)
return DoraStatus.CONTINUE
self.waypoints = np.concatenate([result_x, result_y]).reshape((2, -1)).T
self.outputs = np.ascontiguousarray(
np.concatenate([result_x, result_y, speeds])
.reshape((3, -1))
.T.astype(np.float32)
)
send_output(
"waypoints",
pa.array(self.outputs.ravel()),
dora_input["metadata"],
)
return DoraStatus.CONTINUE