Kalman filter is commonly used for robot or vehicle's state estimation in autonomous navigation. In this entry, I'm introducing a simple Python sample code of 1D position and velocity estimation by Kalman filter.
Theory of Kalman filter
The problem the sample program handles is to estimate a moving vehicle's 1D position and velocity under the following conditions.
- The vehicle is moving straight forward. The target velocity is maximum 10km/h and acceleration is 5m/s2.
- A Laser range finder is located at the origin of the vehicles position. This laser can measure a distance to the vehicle.
- The vehicle's acceleration includes an error variance 0.5.
- The Laser measurement includes an error variance 5.0.
Source code
I'm explaining about the implementation of the source code as follow.
Each parameters for Kalman filter computation are defined as follow.
Motion model
According to the theory, Kalman filter is separated into 2 steps, prediction and updating. For the prediction step, the vehicle's motion model is defined as follow. This model is used for predicting the vehicle's position, velocity and error covariance at the next time. And then, the model is defined as constant acceleration linear motion model.
class MotionModel: """ Class to predict state (position and speed) by Linear Motion Model """ def __init__(self, interval_sec, input_noise_variance): """ Constructor interval_sec: interval time[s] to update this simulation input_noise_variance: variance of noise included in input acceleration """ # set inputs as parameters self.DT = interval_sec self.Q = np.array([[input_noise_variance]]) self.define_matrix_in_state_equation() def define_matrix_in_state_equation(self): self.F = np.array([[1.0, self.DT], [0.0, 1.0]]) self.G = np.array([[(self.DT ** 2)/2], [self.DT]]) def calculate_state(self, x_prev, u): """ Function of state equation This is used to predict vehicle's position and speed at next time by using acceleration input x_{t+1} = F * x_t + G * u_t x_prev: position and speed at previous time, x_t u: acceleration input, u_t """ x_next = self.F @ x_prev + self.G @ u return x_next def calculate_covariance(self, p_prev): """ Function to predict covariance of position and speed p_{t+1} = F * p^_t * F^T + G * Q * G^T p_prev: estimated covariance at previous time, p^_t """ p_next = self.F @ p_prev @ self.F.T + self.G @ self.Q @ self.G.T return p_next
Observation model
Secondly, I defined an observation model to generate dummy observation data by Laser including some random noise and to compute an observation prediction value as follow.
class ObservationModel: """ Class to simulate position info obseved by LiDAR and to predict observation at predicted position, x_{t+1} """ def __init__(self, observation_noise_variance): """ Constructor observation_noise_variance: variance of noise included in observation """ # set input as parameter self.R = np.array([[observation_noise_variance]]) # define matrix in observation equation self.H = np.array([[1.0, 0.0]]) def observe(self, x_true): """ Function to simulate observation by including noise variance parameter, R x_true: true position """ return x_true[0, 0] + np.random.randn() * self.R def calculate_observation(self, x_pred): """ Function to predict observation at predicted position x_pred: predicted position by state equation, x_{t+1} """ z_pred = self.H @ x_pred return z_pred
Kalman filter module
Finally, I defined a Kalman filter module class to estimate the vehicle's position and velocity. This class execute the following multiple steps.
- Predicting the vehicle's position, velocity and error covariance at next time based on motion model.
- Computing innovation between an observation from Laser and predicted observation based on the above predicted position.
- Computing a covariance of observation prediction error.
- Computing kalman gain to correct the position and velocity.
- Updating the vehicle's position, velocity and error covariance at next time based on the kalman gain.
class KalmanFilter1dPosSpd: """ Class to estimate state (position and speed) by Linear Kalman Filter """ def __init__(self, motion_model, observation_model): """ Constructor motion_model: motion model to predict state observation_model: model to predict observation at predicted position """ = motion_model = observation_model def calculate_innovation(self, z, x_pred): """ Function to calculate innovation dz = z_{t+1} - H * x_{t+1} x_pred: predicted position with previous data, x_{t+1} """ delta_z = z - return delta_z def calculate_obsrv_pred_err_cov(self, p_pred): """ Function to calculate observation prediction error variance s_{t+1} = H * p_{t+1} * H^T + R p_pred: predicted variance with previous data, p_{t+1} """ p_obsrv_pred_err = @ p_pred @ + return p_obsrv_pred_err def calculate_kalman_gain(self, p_pred, p_obsrv_pred_err): """ Function to calculate kalman gain k_{t+1} = p_{t+1} * H^T * s_{t+1}^{-1} p_pred: predicted variance with previous data, p_{t+1} p_obsrv_pred_err: observation prediction error variance, s_{t+1} """ kalman_gain = p_pred @ @ np.linalg.inv(p_obsrv_pred_err) return kalman_gain def calculate_state(self, x_pred, delta_z, kalman_gain): """ Function to calculate position and speed x^_{t+1} = x_{t+1} + k_{t+1} * dz x_pred: predicted position and speed with previous data, x^_{t+1} delta_z: difference between observation and predicted observation, dz kalman_gain: kalman gain to update position and velocity, k_{t+1} """ x_updated = x_pred + kalman_gain @ delta_z return x_updated def calculate_covariance(self, p_pred, kalman_gain, p_obsrv_pred_err): """ Function to calculate covariane of position and speed p^_{t+1} = p_{t+1} - k_{t+1} * s_{t+1} * k_{t+1}^T p_pred: predicted covariance with previous data, p^_{t+1} kalman_gain: kalman gain to calculate position and speed, k_{t+1} p_obsrv_pred_err: observation prediction error variance, s_{t+1} """ p_updated = p_pred - kalman_gain @ p_obsrv_pred_err @ kalman_gain.T return p_updated def estimate_state(self, x_prev, p_prev, u, z): """ Function to estimate state and covariance by linear kalman filter Step 1: update state and covariance with previous data Step 2: update state and covariance with current observation x_prev: estimated state at previous time p_prev: estimated covariance at previous time u: acceleration input including gaussian noise z: observed position at current time including gaussian noise """ # update with previous data x_pred =, u) p_pred = # update with current observation delta_z = self.calculate_innovation(z, x_pred) p_obsrv_pred_err = self.calculate_obsrv_pred_err_cov(p_pred) kalman_gain = self.calculate_kalman_gain(p_pred, p_obsrv_pred_err) x_updated = self.calculate_state(x_pred, delta_z, kalman_gain) p_updated = self.calculate_covariance(p_pred, kalman_gain, p_obsrv_pred_err) return x_updated, p_updated
For simulation, I defined the following main function by using the above motion model, observation model and Kalman filter module.
def decide_input_accel(elapsed_time_sec): """ Function to decide acceleration input Increasing speed until time limit passed elapsed_time_sec: current elapsed time[sec] """ if elapsed_time_sec <= TIME_LIMIT_ACCEL_SEC: input_accel_ms2 = INPUT_ACCEL_MS2 else: input_accel_ms2 = 0.0 return np.array([[input_accel_ms2]]) def add_noise_input_accel(input_accel_ms2): """ Function to generate acceleration input including gaussian noise input_accel_ms2: true acceleration input """ input_accel_noise_ms2 = input_accel_ms2[0, 0] + np.random.randn() * INPUT_NOISE_VARIANCE if input_accel_noise_ms2 < 0.0: input_accel_noise_ms2 = 0.0 return np.array([[input_accel_noise_ms2]]) def main(): """ main function """ print(__file__ + " start!!") # generate instances mm = MotionModel(interval_sec=INTERVAL_SEC, input_noise_variance=INPUT_NOISE_VARIANCE) om = ObservationModel(observation_noise_variance=OBSERVATION_NOISE_VARIANCE) kf = KalmanFilter1dPosSpd(motion_model=mm, observation_model=om) # initialize data input_accel_ms2 = 0.0 elapsed_time_sec, time_list = 0.0, [] true_state, true_pos_list, true_vel_list = np.zeros((2, 1)), [], [] pred_state, pred_pos_list, pred_vel_list = np.zeros((2, 1)), [], [] obsrv, obsrv_list = 0.0, [] est_state = np.zeros((2, 1)) est_cov = np.array([[OBSERVATION_NOISE_VARIANCE, 0.0], [0.0, INPUT_NOISE_VARIANCE]]) est_pos_list, est_vel_list, est_pos_cov_list, est_vel_cov_list = [], [], [], [] # initialize plot fig = plt.figure() # position plot ax_position = fig.add_subplot(2, 2, 1) ax_position.set_xlabel("Time[s]") ax_position.set_ylabel("Position[m]") ax_position.grid(True) # position variance plot ax_pos_var = fig.add_subplot(2, 2, 2) ax_pos_var.set_xlabel("Time[s]") ax_pos_var.set_ylabel("Pos Var") ax_pos_var.grid(True) # velocity plot ax_veloicity = fig.add_subplot(2, 2, 3) ax_veloicity.set_xlabel("Time[s]") ax_veloicity.set_ylabel("Speed[m/s]") ax_veloicity.grid(True) # velocity variance plot ax_vel_var = fig.add_subplot(2, 2, 4) ax_vel_var.set_xlabel("Time[s]") ax_vel_var.set_ylabel("Spd Var") ax_vel_var.grid(True) # simulate while TIME_LIMIT_SEC >= elapsed_time_sec: input_accel_ms2 = decide_input_accel(elapsed_time_sec=elapsed_time_sec) input_accel_noise_ms2 = add_noise_input_accel(input_accel_ms2) # calculate data obsrv = om.observe(true_state) true_state = mm.calculate_state(x_prev=true_state, u=input_accel_ms2) pred_state = mm.calculate_state(x_prev=pred_state, u=input_accel_noise_ms2) est_state, est_cov = kf.estimate_state(x_prev=est_state, p_prev=est_cov, u=input_accel_noise_ms2, z=obsrv) # record data time_list.append(elapsed_time_sec) obsrv_list.append(obsrv[0, 0]) true_pos_list.append(true_state[0, 0]), true_vel_list.append(true_state[1, 0]) pred_pos_list.append(pred_state[0, 0]), pred_vel_list.append(pred_state[1, 0]) est_pos_list.append(est_state[0, 0]), est_vel_list.append(est_state[1, 0]) est_pos_cov_list.append(est_cov[0, 0]), est_vel_cov_list.append(est_cov[1, 1]) # increment time elapsed_time_sec += INTERVAL_SEC # plot data ax_position.plot(time_list, obsrv_list, color='g', lw=2, label="Observed Position") ax_position.plot(time_list, pred_pos_list, color='m', lw=2, label="Pred Position") ax_position.plot(time_list, true_pos_list, color='b', lw=2, label="True Position") ax_position.plot(time_list, est_pos_list, color='r', lw=2, label="Estimated Position") ax_position.legend() ax_veloicity.plot(time_list, pred_vel_list, color='m', lw=2, label="Pred Speed") ax_veloicity.plot(time_list, true_vel_list, color='b', lw=2, label="True Speed") ax_veloicity.plot(time_list, est_vel_list, color='r', lw=2, label="Estimated Speed") ax_veloicity.legend() ax_pos_var.plot(time_list, est_pos_cov_list, color='r', lw=2, label="Est Pos Variance") ax_pos_var.legend() ax_vel_var.plot(time_list, est_vel_cov_list, color='r', lw=2, label="Est Spd Variance") ax_vel_var.legend() fig.tight_layout() # only when show plot flag is true, show output graph # when unit test is executed, this flag become false # and the graph is not shown if show_plot: return True # execute main process if __name__ == "__main__": main()
You can get the following simulation result by executing the simulation.
The upper left plot is an estimation result of the vehicle's position. Green is observation by Laser, blue is true position, purple is predicted position and red is estimated position.
The upper right plot is an error covariance of the vehicle's position. You can see that the covariance is getting smaller gradually.
The lower left plot is an estimation result of the vehicle's velocity. Blue is true velocity, purple is predicted velocity and red is estimated velocity.
The lower right plot is an error covariance of the vehicle's velocity. You can see that the covariance is getting smaller gradually.