跳过主要内容

Obstacle location (障碍物位置) 算子

Obstacle location (障碍物位置) 算子将 2D 边界框与深度信息(激光雷达点云)进行匹配,用于查找障碍物的近似位置。

其中有两个逻辑:

  • 一个是用于车道检测的接地点。
  • 一个是边界框障碍物定位。

这两种逻辑都是基于对激光雷达 3D 点在 2D 空间中的投影进行计算,然后重用索引来获取 3D 位置。

  • 在接地点检测的情况下,近似值基于 K 近邻 (K-nearest neighbors) 回归,因为我们可能没有足够的地面数据。
  • 在边界框的情况下,我们使用边界框内的第一个分位数最近点来估计距离。 我们使用第一个分位数最近点来消除噪声。

将激光雷达点云投影到 2D 的机制也用于 plot.py 算子中。 您可以将 lidar_pc 作为该算子的输入,以帮助您进行调试。

输入

  • 2D 障碍物边界框。

输出

  • 障碍物的 3D 位置作为点。

示例绘制 (边界框中间的绿点)

Imgur

图描述

  - id: obstacle_location_op
    operator:
      outputs:
        - obstacles
      inputs:
        lidar_pc: oasis_agent/lidar_pc
        obstacles_bbox: yolov5/bbox
        position: oasis_agent/position
      python: ../../operators/obstacle_location_op.py

图可视化

方法

__init__()

源码
    def __init__(self):
        """Initialize empty caches for point clouds, detections and pose."""
        # Lidar points (camera axes) and their 2D pixel projections.
        self.point_cloud = []
        self.camera_point_cloud = []
        # Ground points (used for lane location) and their 2D projections.
        self.ground_point_cloud = []
        self.camera_ground_point_cloud = []
        # Previous frame's clouds.
        self.last_point_cloud = []
        self.last_camera_point_cloud = []
        # Latest detections and ego position.
        self.obstacles = []
        self.obstacles_bbox = []
        self.position = []
        self.lanes = []


.on_event(...)

源码

def on_event(
    self,
    dora_event: dict,
    send_output: Callable[[str, bytes], None],
) -> DoraStatus:
    """Dispatch a dora event: INPUT events go to on_input, others are ignored."""
    if dora_event["type"] != "INPUT":
        # Non-input events (e.g. STOP) require no work; keep the operator alive.
        return DoraStatus.CONTINUE
    return self.on_input(dora_event, send_output)


.on_input(...)

源码

def on_input(
    self,
    dora_input: dict,
    send_output: Callable[[str, bytes], None],
):
    """Handle one dora input, update cached state, and emit derived outputs.

    Inputs are dispatched by ``dora_input["id"]``:
      - "lidar_pc": cache the lidar cloud (camera axes) and its 2D projection.
      - "position": cache the ego position and derive the extrinsic matrix.
      - "lanes": locate 2D lane points in 3D via KNN over ground points;
        emits "global_lanes".
      - "obstacles_bbox": locate 2D bounding boxes in 3D; emits "obstacles".

    Returns DoraStatus.CONTINUE in all cases.
    """
    if "lidar_pc" == dora_input["id"]:
        point_cloud = np.array(dora_input["value"]).reshape((-1, 3))

        # From Velodyne lidar axes (x forward, y right, z up)
        # to camera axes (x right, y down, z forward).
        point_cloud = np.dot(point_cloud, VELODYNE_MATRIX)

        # Keep only points in front of the camera (forward = z > 0.1).
        point_cloud = point_cloud[point_cloud[:, 2] > 0.1]

        # Split off ground points. Camera y points down, so y >= 1.0 is at or
        # below the lidar and treated as ground.
        # BUG FIX: the original indexed with `where_tuple == False`, which
        # evaluates to the scalar False and produced a degenerate (empty)
        # ground point cloud instead of the complement of the mask.
        above_ground = point_cloud[:, 1] < 1.0
        self.ground_point_cloud = point_cloud[~above_ground]
        point_cloud = point_cloud[above_ground]

        # Project 3D points into the image: rows are (pixel_x, pixel_y, depth).
        self.camera_point_cloud = local_points_to_camera_view(
            point_cloud, INTRINSIC_MATRIX
        ).T
        self.camera_ground_point_cloud = local_points_to_camera_view(
            self.ground_point_cloud, INTRINSIC_MATRIX
        ).T
        self.point_cloud = point_cloud

    elif "position" == dora_input["id"]:
        # Cache the sensor transform and derive the extrinsic matrix from it.
        self.position = dora_input["value"].to_numpy()
        self.extrinsic_matrix = get_extrinsic_matrix(
            get_projection_matrix(self.position)
        )

    elif "lanes" == dora_input["id"]:
        # Same precondition guard as the obstacles_bbox branch: we need a
        # position (for self.extrinsic_matrix) and ground points to fit on.
        if len(self.position) == 0 or len(self.ground_point_cloud) == 0:
            return DoraStatus.CONTINUE

        lanes = np.array(dora_input["value"]).reshape((-1, 60, 2))

        # Regress 3D ground location from 2D pixel coordinates; KNN is used
        # because the ground data may be sparse.
        knnr = KNeighborsRegressor(n_neighbors=4)
        knnr.fit(self.camera_ground_point_cloud[:, :2], self.ground_point_cloud)

        processed_lanes = []
        for lane in lanes:
            lane_location = np.array(knnr.predict(lane))
            # Homogeneous coordinates for the extrinsic transform.
            lane_location = np.hstack(
                (lane_location, np.ones((lane_location.shape[0], 1)))
            )
            lane_location = np.dot(lane_location, self.extrinsic_matrix.T)[:, :3]
            processed_lanes.append(lane_location)
        processed_lanes = pa.array(np.array(processed_lanes, np.float32).ravel())

        send_output("global_lanes", processed_lanes, dora_input["metadata"])

    elif "obstacles_bbox" == dora_input["id"]:
        # Cannot locate obstacles without a position and a point cloud.
        if len(self.position) == 0 or len(self.point_cloud) == 0:
            return DoraStatus.CONTINUE

        # bbox rows: [min_x, max_x, min_y, max_y, confidence, label]
        self.obstacles_bbox = np.array(dora_input["value"]).reshape((-1, 6))

        obstacles_with_location = []
        for obstacle_bb in self.obstacles_bbox:
            [min_x, max_x, min_y, max_y, _confidence, _label] = obstacle_bb
            # 3D points whose 2D projection falls inside the bounding box.
            z_points = self.point_cloud[
                (self.camera_point_cloud[:, 0] > min_x)
                & (self.camera_point_cloud[:, 0] < max_x)
                & (self.camera_point_cloud[:, 1] > min_y)
                & (self.camera_point_cloud[:, 1] < max_y)
            ]
            if len(z_points) > 0:
                # Use the first-quartile closest point (by depth) to reject
                # noise from the very nearest returns.
                closest_point = z_points[
                    z_points[:, 2].argsort()[int(len(z_points) / 4)]
                ]
                obstacles_with_location.append(closest_point)

        if len(obstacles_with_location) > 0:
            # Homogeneous coordinates, then transform into global frame.
            obstacles_with_location = np.hstack(
                (
                    np.array(obstacles_with_location),
                    np.ones((len(obstacles_with_location), 1)),
                )
            )
            obstacles_with_location = np.dot(
                obstacles_with_location, self.extrinsic_matrix.T
            )[:, :3]

            predictions = get_predictions(
                self.obstacles_bbox, obstacles_with_location
            )
            predictions_bytes = pa.array(
                np.array(predictions, np.float32).ravel()
            )
            send_output("obstacles", predictions_bytes, dora_input["metadata"])
        else:
            # No located obstacle: still emit an empty output for downstream.
            send_output(
                "obstacles",
                pa.array(np.array([]).ravel()),
                dora_input["metadata"],
            )
    return DoraStatus.CONTINUE