Pythonで拡張カルマンフィルタ

以下の書籍の例をPythonで試しました.
また,拡張カルマンフィルタを使うと,(当然)線形カルマンフィルタの例も計算できることを確かめます.

【関連記事】
PythonでUnscented Kalman Filter (UKF) - Notes_JP

拡張カルマンフィルタ

非線形関数$\bm{f}, h$を使ったモデルによって記述される,以下のスカラ時系列$\{y(k)\}$を考える.
\begin{aligned}
\begin{cases}
\, \bm{x}(k + 1) = \bm{f} \bigl(\bm{x}(k)\bigr) + \bm{b} v(k)
& (\text{状態方程式}) \\
\, y(k) = h \bigl(\bm{x}(k)\bigr) + w(k)
& (\text{観測方程式})
\end{cases}
\end{aligned}

拡張カルマンフィルタ(Extended Kalman filter - Wikipedia,EKF)では,非線形関数を線形近似して,線形カルマンフィルタに帰着させる.

以下も参考になるかも:
非線形カルマンフィルタの基礎

numpyによる実装

事後状態推定値$\hat{x}$,事後誤差共分散行列$P$,カルマンゲイン$\bm{g}$を返す関数:

def ekf(f, h, dfdx, B, dhdx, Q, R, y, xhat, P):
    """
    x(k+1) = f(x(k)) + Bv(k)
    y(k) = h(x(k)) + w(k)
    """
    xhatm = f(xhat)
    A = dfdx(xhat)
    Pm = (A @ P @ A.T) + Q * (B @ B.T)
    CT = dhdx(xhatm)
    C = CT.T
    G = (Pm @ C) / ((CT @ Pm @ C) + R)
    xhat_new = xhatm + G * (y - h(xhatm))
    P_new = (np.eye(len(A)) - (G @ CT)) @ Pm
    return xhat_new, P_new, G

上の関数を繰り返し適用して,$\hat{x}, \bm{g}$をすべての時系列で計算する関数:

def calc_timeseries_ekf(N, f, h, dfdx, B, dhdx, Q, R, y, x0, P_ekf):
    xhat_ekf = [x0]
    G0 = np.empty(x0.shape)
    G0[:] = np.nan
    G_ekf = [G0]
    for k in range(1, N):
        xhat_new, P_new, G = ekf(f, h, dfdx, B, dhdx, Q, R, y[k], xhat_ekf[-1], P_ekf)
        xhat_ekf.append(xhat_new)
        G_ekf.append(G)
        P_ekf = P_new
    xhat_ekf = np.array(xhat_ekf)
    G_ekf= np.array(G_ekf)
    return xhat_ekf, G_ekf

物体の落下運動

書籍7.4の例を,上の関数でシミュレーションする.
背景については,次が参考になるかも.
Unscented Kalman Filterを用いた力学系の状態およびパラメータ推定
http://lab.cntl.kyutech.ac.jp/~nishida/lecture/psc/no9.pdf

次の記事で,Unscented Kalman Filter (UKF)との比較をしています.
PythonでUnscented Kalman Filter (UKF) - Notes_JP

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ptick

# --- パラメータ ---
rho = 1.23
g = 9.81
eta = 6e3
M = 3e4
a = 3e4

T = 0.5
EndTime = 30
N = int(EndTime / T + 1)
time = T * np.arange(0, N)
n = 3
R = 4e3
Q = 0
B = np.array([
    [0],
    [0],
    [0]
])

# --- 関数 ---
def f(x):
    x1, x2, x3 = x
    x1, x2, x3 = x1.item(0), x2.item(0), x3.item(0)
    res = np.array([
        [x1 + T * x2],
        [x2 + T * (0.5 * rho * np.exp(- x1 / eta) * (x2 ** 2) * x3 - g)],
        [x3]
    ])
    return res

def h(x):
    x1, x2, x3 = x
    x1, x2, x3 = x1.item(0), x2.item(0), x3.item(0)
    return np.sqrt((M ** 2) + ((x1 - a) ** 2))

def dfdx(x):
    x1, x2, x3 = x
    x1, x2, x3 = x1.item(0), x2.item(0), x3.item(0)
    res = np.array([
        [1, T, 0],
        [- T * 0.5 * (rho / eta) * np.exp(- x1 / eta) * (x2 ** 2) * x3,
         1 + T * rho * np.exp(- x1 / eta) * x2 * x3,
         T * 0.5 * rho * np.exp(- x1 / eta) * (x2 ** 2)
        ],
        [0, 0, 1]
    ])
    return res

def dhdx(x):
    x1, x2, x3 = x
    x1, x2, x3 = x1.item(0), x2.item(0), x3.item(0)
    res = np.array([
        [(x1 - a) / h(x), 0, 0]
    ])
    return res

# --- 真値, 測定値 ---
w = np.random.normal(loc = 0.0, scale = np.sqrt(R), size = N)
x = [np.array([
    [9e4], 
    [-6e3],
    [3e-3]
])]
y = [h(x[-1]) + w[0]]

for k in range(N - 1):
    x.append(f(x[-1]))
    y.append(h(x[-1]) + w[k + 1]) 
x = np.array(x)

# --- EKF ---
P_ekf = np.array([
    [9e3, 0, 0],
    [0, 4e5, 0],
    [0, 0, 0.4]
])

xhat_ekf, G_ekf = calc_timeseries_ekf(N, f, h, dfdx, B, dhdx, Q, R, y, x[0], P_ekf)

# --- プロット ---
fig_num = 3
fig = plt.figure(figsize = (10, 3 * fig_num), tight_layout = True)
axes = [fig.add_subplot(fig_num, 1, i + 1) for i in range(fig_num)]
axes2 = [ax.twinx() for ax in axes]
ylim = [(0, 1e5), (-1e4, 0), (-0.5, 0.5)]
scilimits = [4, 3, 0]
for i in range(fig_num):
    ax, ax2 = axes[i], axes2[i]
    ax.plot(time, x[:, i], label = 'true', ls = 'dotted')
    ax.plot(time, xhat_ekf[:, i], label = 'estimated (EKF)')
    ax.set_ylim(ylim[i])
    ax2.plot(time, G_ekf[:, i], label = 'gain', ls = 'dashed', c = 'tab:gray')
    if scilimits[i] != 0:
        ax.yaxis.set_major_formatter(ptick.ScalarFormatter(useMathText = True))
        ax.ticklabel_format(style="sci", axis="y", scilimits=(scilimits[i], scilimits[i]))
    ax.set_xlabel('time (s)')
    ax.set_ylabel(f'x{i + 1}')
    ax2.set_ylabel(f'g{i + 1}')
    lines, labels = ax.get_legend_handles_labels()
    li2, la2 = ax2.get_legend_handles_labels()
    lines += li2
    labels += la2
    ax2.legend(lines, labels, loc = 'lower left')
plt.show()

結果
結果

線形カルマンフィルタへの適用例

拡張カルマンフィルタは,当然,線形カルマンフィルタの問題に適用できる.
書籍の例を,上の関数でシミュレーションしてみる.

例えば,例題6.3はこんな感じ.

# --- パラメータ ---
A = np.array([
    [0, -0.7],
    [1, -1.5]
])
B = np.array([
    [0.5],
    [1]
])
C = np.array([
    [0],
    [1]
])

N, Q, R = 100, 1, 0.1

# --- 関数 ---
def f(x):
    return A @ x

def h(x):
    return C.T @ x

def dfdx(x):
    return A

def dhdx(x):
    return C.T

# --- 真値, 測定値 ---
v = np.random.normal(loc = 0.0, scale = np.sqrt(Q), size = N)
w = np.random.normal(loc = 0.0, scale = np.sqrt(R), size = N)

x = [np.array([[0], 
               [0]])]
y = [h(x[-1]) + w[0]]
for k in range(N - 1):
    x.append(f(x[-1]) + B * v[k + 1])
    y.append(h(x[-1]) + w[k + 1])
x = np.array(x)
y = np.array(y)

# --- EKF ---
gamma = 1
P_ekf = gamma * np.eye(2)
xhat_ekf, G_ekf = calc_timeseries_ekf(N, f, h, dfdx, B, dhdx, Q, R, y, x[0], P_ekf)

# --- プロット ---
fig_num = 2
fig = plt.figure(figsize = (10, 3 * fig_num), tight_layout = True)
axes = [fig.add_subplot(fig_num, 1, i + 1) for i in range(fig_num)]
for i in range(fig_num):
    ax = axes[i]
    ax2 = ax.twinx()
    ax.plot(y[:, 0, 0], label = 'measured', c = 'tab:blue')
    ax.plot(x[:, i], label = 'true', ls = 'dotted', c = 'red')
    ax.plot(xhat_ekf[:, i], label = 'estimated (EKF)', c = 'tab:orange')
    ax2.plot(G_ekf[:, i], label = 'gain', ls = 'dashed', c = 'tab:gray')
    ax.set_xlabel('k')
    ax.set_ylabel(f'x{i + 1}')
    ax2.set_ylabel(f'g{i + 1}')
    lines, labels = ax.get_legend_handles_labels()
    li2, la2 = ax2.get_legend_handles_labels()
    lines += li2
    labels += la2
    ax2.legend(lines, labels, loc = 'lower right')
plt.show()

結果(例題6.3)
結果(例題6.3)