目次
目的
自分でも何故か分かりませんが、毎年一回は必ず
カルマンフィルタの理論を勉強し直したくなる事が
あります。
その度に書籍を読み返したり、サンプルプログラムを
実装したりして、「いろいろ忘れてるな」「実は理解
してなかったな」と感じるのがお決まりです。
なので、きっとこれからも定期的に学び直したくなる
ときが来ると思ったので、ブログにまとめておくことに
しました。
今回は最も基本的な問題である、線形カルマンフィルタで
1次元の自己位置推定を行うPythonプログラムを作ったので
紹介します。
理論
カルマンフィルタの詳細な理論についてはこちらを
参照ください。
ja.wikipedia.org
また、MATLABの公式YouTubeチャンネルにある
こちらの動画もアニメーション付きで分かりやすい
ので、参考になります。
www.youtube.com
問題設定
下図のように、速度vで走行する車両の原点0からの
位置xを推定することを考えます。
このときの車両の位置xは、速度vより計算する方法と、
レーザ距離計によりzとして観測する方法の2つが
あります。
ただし、速度vと観測zにはそれぞれ、分散Q, R(平均は
ともに0)の傾向を示す誤差が含まれるものとします。
以上のような誤差分散を持つ速度vと観測zを利用して、
カルマンフィルタにより車両の位置x、その誤差分散Pを
推定することを本問題の目的とします。
条件
- 速度vは10[m/s]とし、その誤差分散Qは10.0とする
- 観測zの持つ誤差分散Rは5.0とする
Pythonプログラム
それではここからはプログラムの実装について解説していきます。
パラメータの定義
まずは、今回のカルマンフィルタの計算に必要なパラメータを
下記のように定義します。
INTERVAL_SEC = 0.01 TIME_LIMIT_SEC = 10.0 INPUT_VELOCITY_MS = 10.0 INPUT_NOISE_VARIANCE = 10.0 OBSERVATION_NOISE_VARIANCE = 5.0
これらは上から順に、
- シミュレーションの刻み時間[秒]
- シミュレーションの実行時間[秒]
- 入力速度v[m/s]
- 入力速度vの誤差分散
- レーザ距離計の観測誤差分散
となっています。
クラスとコンストラクタの実装
今回の1次元位置推定をカルマンフィルタで行うクラスと、
コンストラクタを下記のように実装します。
class LinearKalmanFilter1D: """ Class to estimate 1 dimensional position by Linear Kalman Filter. """ def __init__(self, interval_sec, input_velocity_ms, input_noise_variance, observation_noise_variance): """ Constructor interval_sec: interval time[s] to update this simulation input_velocity_ms: input velocity[m/s] into state equation input_noise_variance: variance of noise included in input velocity observation_noise_variance: variance of noise inculded in observation """ # set inputs as parameters self.DT = interval_sec self.V = input_velocity_ms self.Q = input_noise_variance self.R = observation_noise_variance # define parameters for state equation self.F = 1.0 self.G = self.DT # define parameter for observation equation self.H = 1.0
先程グローバル定数で定義した各パラメータを入力引数として
与えてセットしています。
また、システムの状態方程式であるx_{t+1} = F * x_t + G * u_tに
含まれるFを1.0、Gを刻み時間DTとします。これは、x_tが位置、
u_tが速度入力である等速直線運動としているためです。
そして、センサの観測方程式であるz{t+1} = H * x{t+1}に
含まれるHも1.0とします。今回の問題では、位置xをレーザ距離計で
観測するものとしているため、理論的にはxと同じものが得られると
考えてH=1.0としています。
状態方程式の定義
次はシステムの状態方程式を下記のように実装します。
先にコンストラクタの中で定義したFとGを使い、
速度vを入力とした等速直線運動モデルで次の時刻の
位置を計算するようになっています。
def state_equation(self, x_prev, u): """ Function of state equation This is used to predict vehicle's position at next time by using velocity input x_{t+1} = F * x_t + G * u_t x_prev: position at previous time, x_t u: velocity input, u_t """ x_next = self.F * x_prev + self.G * u return x_next
カルマンフィルタでは、上記の運動モデルを用いて
システムの状態(今回の問題では車両の位置)を予測する
というステップがあります。
入力される速度にはガウス分布に従う誤差が含まれる
仮定しますが、この状態で予測をし続けると誤差が
積算されて真値とのずれがどんどん大きくなってしまいます。
今回実装するプログラムでは、車両位置の真値と、
運動モデルで予測した誤差を含む位置との違いを
比較するために、中身は同じですが名前が異なる
下記の関数を定義します。
def predict_position(self, x_prev, u): """ Function to predict position at next time Predict by inputing velocity into linear motion model Velocity input is including gaussian noise x_prev: position at previous time, x_t u: velocity input including gaussian noise, u_t """ x_pred = self.state_equation(x_prev, u) return x_pred
先に定義した関数state_equation()で真値を、
predict_position()で予測位置をそれぞれ計算
するようにします。
疑似観測値と観測方程式の定義
続いて、レーザ距離計で観測される車両位置を
シミュレートする関数を下記のように定義します。
def observation(self, x_prev): """ Function to generate observation data Simulate position data observed by LiDAR This data is including gaussian noise x_prev: position at previous time """ z = self.state_equation(x_prev, self.V) + np.random.randn() * self.R return z
先に定義した状態方程式で計算される車両位置に
観測値が含む誤差の分散Rの正規分布に従う誤差
を加えたものを、疑似的な観測値としています。
また、状態方程式によって予測した状態で
センサから得られるであろう観測値を予測する
観測方程式を下記のように定義します。
def observation_equation(self, x_pred): """ Function of observation equation This is used to predict observation data at next time z_{t+1} = H * x_{t+1} x_pred: predicted position by state equation, x_{t+1} """ z_pred = self.H * x_pred return z_pred
この観測方程式に含まれるHは、入力された状態量を
観測量に変換するためのものですが、今回の問題では
車両位置を直接観測するということにしてるので、
1.0としています。
カルマンフィルタの処理の定義
最後に、ここまで定義した状態方程式、疑似観測値、
観測方程式を用いた、カルマンフィルタによる
一連の車両位置推定の処理を実装していきます。
まずは、先に定義したカルマンフィルタクラスの
インスタンスを生成します。
kf = LinearKalmanFilter1D(interval_sec=INTERVAL_SEC, input_velocity_ms=INPUT_VELOCITY_MS, input_noise_variance=INPUT_NOISE_VARIANCE, observation_noise_variance=OBSERVATION_NOISE_VARIANCE)
次に、状態方程式に与える速度入力を下記のように
実装します。事前に定義したV=10.0[m/s]に、分散
10.0の正規分布の誤差が含まれているとします。
この時、誤差によって速度が負の値になる場合は、
0.0に制限するようにします。
input_velocity = INPUT_VELOCITY_MS + np.random.randn() * INPUT_NOISE_VARIANCE if input_velocity < 0.0: input_velocity = 0.0
次に観測値を出力させるようにします。
状態方程式から計算される真の位置をベースに、
観測誤差を含んだ疑似的な観測値が得られる
ようにします。
obsrv_pos = kf.observation(x_prev=true_pos)
また、状態方程式に誤差を含まない速度入力を
与えて計算した真の位置と、誤差を含んだ速度
入力を与えて計算した予測位置も、比較用に
それぞれ下記のように出力するようにします。
true_pos = kf.state_equation(x_prev=true_pos, u=INPUT_VELOCITY_MS) pred_pos = kf.predict_position(x_prev=pred_pos, u=input_velocity)
そして、誤差を含んだ速度と観測値を入力として、
車両位置とその誤差分散を推定する処理を実行する
ようにします。
est_pos, est_var = kf.estimate_position(x_prev=est_pos, p_prev=est_var, u=input_velocity, z=obsrv_pos)
ここで実行しているestimate_position()という関数は
下記のように実装します。
def estimate_position(self, x_prev, p_prev, u, z): """ Function to estimate position and variance by linear kalman filter Step 1: update position and variance with previous data Step 2: update position and variance with current observation x_prev: estimated position at previous time p_prev: estimated variance at previous time u: velocity input including gaussian noise z: observed position at current time including gaussian noise """ # update with previous data x_pred = self.predict_position(x_prev, u) p_pred = self.predict_variance(p_prev) # update with current observation delta_z = self.calculate_innovation(z, x_pred) p_obsrv_pred_err = self.calculate_obsrv_pred_err_var(p_pred) kalman_gain = self.calculate_kalman_gain(p_pred, p_obsrv_pred_err) x_updated = self.update_position(x_pred, delta_z, kalman_gain) p_updated = self.update_variance(p_pred, kalman_gain, p_obsrv_pred_err) return x_updated, p_updated
カルマンフィルタによる推定は、状態方程式による
1時刻前の状態からの予測と、現在時刻に得られた
観測による更新という2つのステップに分かれます。
上記の関数の最初の2行が予測であり、下記の
ようにします。
x_pred = self.predict_position(x_prev, u) p_pred = self.predict_variance(p_prev)
1行目が状態の予測、2行目は誤差分散の予測です。
ここまでに解説してない誤差分散の予測は、
下記のように実装します。
def predict_variance(self, p_prev): """ Function to predict variance p_{t+1} = F^2 * p^_t + G^2 * Q p_prev: estimated variance at previous time, p^_t """ p_pred = self.F**2 * p_prev + self.G**2 * self.Q return p_pred
簡単に言えば速度入力が持つ誤差の分散を、
1時刻前に推定された誤差分散に加えるという
ことをしています。そのため、観測による
更新がされないと、誤差分散はどんどん大きく
なってしまいます。
そして、estimate_position()の3行目以降が
更新のステップであり、下記のように
なっています。
delta_z = self.calculate_innovation(z, x_pred) p_obsrv_pred_err = self.calculate_obsrv_pred_err_var(p_pred) kalman_gain = self.calculate_kalman_gain(p_pred, p_obsrv_pred_err) x_updated = self.update_position(x_pred, delta_z, kalman_gain) p_updated = self.update_variance(p_pred, kalman_gain, p_obsrv_pred_err)
1行目のcalculate_innovation()は下記の
ように実装します。これは、現在時刻に
得られた観測値と、観測方程式により
計算した観測予測値の差分を計算する
ものであり、イノベーションと呼びます。
def calculate_innovation(self, z, x_pred): """ Function to calculate observation prediction error variance dz = z_{t+1} - H * x_{t+1} x_pred: predicted position with previous data, x_{t+1} """ delta_z = z - self.observation_equation(x_pred) return delta_z
2行目のcalculate_obsrv_pred_err_var()は
下記のように実装します。これは、観測
予測値が含む誤差の分散を計算するための
ものであり、予測された状態の誤差分散と
観測値の誤差分散の足し合わせです。
def calculate_obsrv_pred_err_var(self, p_pred): """ Function to calculate observation prediction error variance s_{t+1} = H^2 * p_{t+1} + R p_pred: predicted variance with previous data, p_{t+1} """ p_obsrv_pred_err = self.H**2 * p_pred + self.R return p_obsrv_pred_err
3行目のcalculate_kalman_gain()は下記のように
実装します。カルマンフィルタでは、予測された
状態の誤差分散と観測値の誤差分散の合計に
対する予測された状態の誤差分散の割合であり、
カルマンゲインと呼ばれます。
このカルマンゲインを先程解説したイノベーション
に掛け合わることで、予測された状態に対する
補正量が得られます。
このカルマンゲインを計算することは最小分散
推定問題と言われ、補正された後の状態の
誤差分散が最小になるようにすることです。
def calculate_kalman_gain(self, p_pred, p_obsrv_pred_err): """ Function to calculate kalman gain k_{t+1} = p_{t+1} * H * s_{t+1}^{-1} p_pred: predicted variance with previous data, p_{t+1} p_obsrv_pred_err: observation prediction error variance, s_{t+1} """ kalman_gain = (p_pred * self.H) / p_obsrv_pred_err return kalman_gain
そして、4行目のupdate_position()により、
予測された状態に補正量を足し合わせ、
最終的に推定された状態を計算します。
これにより、今回の問題の目的である
車両位置の推定が完了します。
def update_position(self, x_pred, delta_z, kalman_gain): """ Function to update position x^_{t+1} = x_{t+1} + k_{t+1} * dz x_pred: predicted position with previous data, x^_{t+1} delta_z: difference between observation and predicted observation, dz kalman_gain: kalman gain to update position, k_{t+1} """ x_updated = x_pred + kalman_gain * delta_z return x_updated
これに加えて、5行目にあるupdate_variance()に
より、最終的に得られた状態が持つ誤差の分散を
推定します。この分散により、推定された状態が
どれだけの精度なのかを判断する目安にできます。
def update_variance(self, p_pred, kalman_gain, p_obsrv_pred_err): """ Function to update variane p^_{t+1} = p_{t+1} - k_{t+1}^2 * s_{t+1} p_pred: predicted variance with previous data, p^_{t+1} kalman_gain: kalman gain to update position, k_{t+1} p_obsrv_pred_err: observation prediction error variance, s_{t+1} """ p_updated = p_pred - kalman_gain**2 * p_obsrv_pred_err return p_updated
プログラムの実行結果
ここまでの実装により、今回の問題である
1次元の車両位置をカルマンフィルタで推定
するシミュレーションができるようになります。
そのためのコード全体は下記のようになります。
def main(): """ main function """ print(__file__ + " start!!") # generate instance of kalman filter kf = LinearKalmanFilter1D(interval_sec=INTERVAL_SEC, input_velocity_ms=INPUT_VELOCITY_MS, input_noise_variance=INPUT_NOISE_VARIANCE, observation_noise_variance=OBSERVATION_NOISE_VARIANCE) # initialize data elapsed_time_sec, true_pos, obsrv_pos, pred_pos = 0.0, 0.0, 0.0, 0.0 time_list, true_pos_list, obsrv_pos_list, pred_pos_list = [], [], [], [] est_pos, est_var = 0.0, 1.0 est_pos_list, est_var_list = [], [] # initialize plot fig = plt.figure() # position plot ax_position = fig.add_subplot(1, 2, 1) ax_position.set_xlabel("Time[s]") ax_position.set_ylabel("Position[m]") ax_position.grid(True) # variance plot ax_variance = fig.add_subplot(1, 2, 2) ax_variance.set_xlabel("Time[s]") ax_variance.set_ylabel("Variance") ax_variance.grid(True) # simulate while TIME_LIMIT_SEC >= elapsed_time_sec: # decide input velocity input_velocity = INPUT_VELOCITY_MS + np.random.randn() * INPUT_NOISE_VARIANCE if input_velocity < 0.0: input_velocity = 0.0 # calculate data obsrv_pos = kf.observation(x_prev=true_pos) true_pos = kf.state_equation(x_prev=true_pos, u=INPUT_VELOCITY_MS) pred_pos = kf.predict_position(x_prev=pred_pos, u=input_velocity) est_pos, est_var = kf.estimate_position(x_prev=est_pos, p_prev=est_var, u=input_velocity, z=obsrv_pos) # record data time_list.append(elapsed_time_sec) obsrv_pos_list.append(obsrv_pos) true_pos_list.append(true_pos) pred_pos_list.append(pred_pos) est_pos_list.append(est_pos) est_var_list.append(est_var) # increment time elapsed_time_sec += INTERVAL_SEC # plot data ax_position.plot(time_list, obsrv_pos_list, color='g', lw=2, label="Observed Position") ax_position.plot(time_list, pred_pos_list, color='m', lw=2, label="Predicted Position") ax_position.plot(time_list, true_pos_list, color='b', lw=2, label="True Position") ax_position.plot(time_list, est_pos_list, color='r', lw=2, label="Estimated Position") ax_position.legend() ax_variance.plot(time_list, est_var_list, color='r', lw=2, label="Estimated Variance") ax_variance.legend() fig.tight_layout() # only when show plot flag is true, show output graph # when unit test is executed, this flag become false # and the graph is not shown if show_plot: plt.show() return True
これを実行すると、こちらのような結果が
得られます。
左のグラフが車両位置の推定結果ですが、
緑は観測値であり大きな誤差分散を含みます。
青は真値、紫は予測値ですが、カルマンフィルタ
による補正がないため真値からどんどん
離れてしまっていることがわかります。
そして赤がカルマンフィルタによる推定結果
ですが、予測と更新により真値に近い値を
計算できています。
また、右のグラフは推定された状態の持つ
誤差分散の結果ですが、これも推定により
初期値からどんどん小さくなっており、
それだけ高精度な推定ができていると言えます。
以上、ここまでで、1次元の車両位置を推定する
カルマンフィルタを実装することができました。
GitHub
今回紹介したPythonプログラムは、こちらの
GitHubリポジトリでも公開しています。
github.com