跳过主要内容

PID Control (PID:偏差的比例P-积分I-微分D 控制) 算子

PID Control (PID:偏差的比例P-积分I-微分D 控制) 算子计算需要执行的命令以跟随给定的航点。 它以加速的方式对汽车当前速度和位置做出反应,或根据先前的输入进行刹车。

输入

  • 要跟随的航点坐标。

输出

  • 油门、转向 (RAD) 和刹车。

图描述

  - id: pid_control_op
operator:
python: ../../operators/pid_control_op.py
outputs:
- control
inputs:
position: oasis_agent/position
speed: oasis_agent/speed
waypoints: fot_op/waypoints

图可视化

考虑修改超参数

见:https://en.wikipedia.org/wiki/PID_controller

pid_p = 0.1
pid_d = 0.0
pid_i = 0.05
dt = 1.0 / 20

方法

__init__()

源码
    def __init__(self):
self.waypoints = []
self.target_speeds = []
self.metadata = {}
self.position = []
self.speed = []
self.previous_position = []
self.current_speed = []
self.previous_time = time.time()


.on_event(...)

源码

def on_event(
self,
dora_event: dict,
send_output: Callable[[str, bytes], None],
) -> DoraStatus:
if dora_event["type"] == "INPUT":
return self.on_input(dora_event, send_output)
return DoraStatus.CONTINUE


.on_input(...)

输入句柄。 参数: dora_input["id"] (str): yaml 配置中声明的输入的 Id dora_input["value"] (arrow.array(UInt8)): 字节形式的输入消息 send_output (Callable[[str, bytes]]): 函数使输出发送回dora。

源码

def on_input(
self,
dora_input: dict,
send_output: Callable[[str, bytes], None],
):
"""输入句柄。
参数:
dora_input["id"] (str): yaml 配置中声明的输入的 Id
dora_input["value"] (arrow.array(UInt8)): 字节形式的输入消息
send_output (Callable[[str, bytes]]): 函数使输出发送回dora。
"""

if "position" == dora_input["id"]:
self.position = dora_input["value"].to_numpy()
return DoraStatus.CONTINUE

elif dora_input["id"] == "speed":
self.speed = np.array(dora_input["value"])
return DoraStatus.CONTINUE

elif "waypoints" == dora_input["id"]:
waypoints = dora_input["value"].to_numpy()
waypoints = waypoints.reshape((-1, 3))

self.target_speeds = waypoints[:, 2]
self.waypoints = waypoints[:, :2]
self.metadata = dora_input["metadata"]

if len(self.position) == 0 or len(self.speed) == 0:
return DoraStatus.CONTINUE

if len(self.waypoints) == 0:
send_output(
"control",
pa.array(np.array([0, 0, 1], np.float16).ravel()),
self.metadata,
)
return DoraStatus.CONTINUE

[x, y, _, rx, ry, rz, rw] = self.position
[_, _, yaw] = R.from_quat([rx, ry, rz, rw]).as_euler("xyz", degrees=False)
distances = pairwise_distances(self.waypoints, np.array([[x, y]])).T[0]

index = distances > MIN_PID_WAYPOINT_DISTANCE
self.waypoints = self.waypoints[index]
self.target_speeds = self.target_speeds[index]
distances = distances[index]

if len(self.waypoints) == 0:
target_angle = 0
target_speed = 0
else:
argmin_distance = np.argmin(distances)

## 检索离转向距离最近的点
target_location = self.waypoints[argmin_distance]

target_speed = self.target_speeds[argmin_distance]

## 计算转向角度
target_vector = target_location - [x, y]

target_angle = get_angle(
math.atan2(target_vector[1], target_vector[0]), yaw
)

throttle, brake = compute_throttle_and_brake(
pid, LA.norm(self.speed), target_speed
)

send_output(
"control",
pa.array(np.array([throttle, target_angle, brake], np.float16)),
self.metadata,
)
return DoraStatus.CONTINUE