PID Control operator
pid
control operator computes the command that needs to be executed to follow the given waypoints.
It reacts to the car current speed and position in a way that accelerates or brake according to previous inputs.
Inputs
- waypoints coordinates to follow.
Outputs
- throttle, steering (rad) and braking.
Graph Description
- id: pid_control_op
operator:
python: ../../operators/pid_control_op.py
outputs:
- control
inputs:
position: oasis_agent/position
speed: oasis_agent/speed
waypoints: fot_op/waypoints
Graph Viz
Hyperparameters consider changing
See: https://en.wikipedia.org/wiki/PID_controller
pid_p = 0.1
pid_d = 0.0
pid_i = 0.05
dt = 1.0 / 20
Methods
__init__()
Source Code
def __init__(self):
self.waypoints = []
self.target_speeds = []
self.metadata = {}
self.position = []
self.speed = []
self.previous_position = []
self.current_speed = []
self.previous_time = time.time()
.on_event(...)
Source Code
def on_event(
self,
dora_event: dict,
send_output: Callable[[str, bytes], None],
) -> DoraStatus:
if dora_event["type"] == "INPUT":
return self.on_input(dora_event, send_output)
return DoraStatus.CONTINUE
.on_input(...)
Handle input. Args: dora_input["id"] (str): Id of the input declared in the yaml configuration dora_input["value"] (arrow.array(UInt8)): Bytes message of the input send_output (Callable[[str, bytes]]): Function enabling sending output back to dora.
Source Code
def on_input(
self,
dora_input: dict,
send_output: Callable[[str, bytes], None],
):
"""Handle input.
Args:
dora_input["id"] (str): Id of the input declared in the yaml configuration
dora_input["value"] (arrow.array(UInt8)): Bytes message of the input
send_output (Callable[[str, bytes]]): Function enabling sending output back to dora.
"""
if "position" == dora_input["id"]:
self.position = dora_input["value"].to_numpy()
return DoraStatus.CONTINUE
elif dora_input["id"] == "speed":
self.speed = np.array(dora_input["value"])
return DoraStatus.CONTINUE
elif "waypoints" == dora_input["id"]:
waypoints = dora_input["value"].to_numpy()
waypoints = waypoints.reshape((-1, 3))
self.target_speeds = waypoints[:, 2]
self.waypoints = waypoints[:, :2]
self.metadata = dora_input["metadata"]
if len(self.position) == 0 or len(self.speed) == 0:
return DoraStatus.CONTINUE
if len(self.waypoints) == 0:
send_output(
"control",
pa.array(np.array([0, 0, 1], np.float16).ravel()),
self.metadata,
)
return DoraStatus.CONTINUE
[x, y, _, rx, ry, rz, rw] = self.position
[_, _, yaw] = R.from_quat([rx, ry, rz, rw]).as_euler("xyz", degrees=False)
distances = pairwise_distances(self.waypoints, np.array([[x, y]])).T[0]
index = distances > MIN_PID_WAYPOINT_DISTANCE
self.waypoints = self.waypoints[index]
self.target_speeds = self.target_speeds[index]
distances = distances[index]
if len(self.waypoints) == 0:
target_angle = 0
target_speed = 0
else:
argmin_distance = np.argmin(distances)
## Retrieve the closest point to the steer distance
target_location = self.waypoints[argmin_distance]
target_speed = self.target_speeds[argmin_distance]
## Compute the angle of steering
target_vector = target_location - [x, y]
target_angle = get_angle(
math.atan2(target_vector[1], target_vector[0]), yaw
)
throttle, brake = compute_throttle_and_brake(
pid, LA.norm(self.speed), target_speed
)
send_output(
"control",
pa.array(np.array([throttle, target_angle, brake], np.float16)),
self.metadata,
)
return DoraStatus.CONTINUE