Source code for geomstats.learning.kalman_filter

"""Kalman filter on Lie groups, with two local test system models.

Lead author: Paul Chauchat.
"""

import geomstats.backend as gs
from geomstats.geometry.euclidean import Euclidean
from geomstats.geometry.matrices import Matrices
from geomstats.geometry.special_euclidean import SpecialEuclidean


[docs] class LocalizationLinear: """Class for modeling a linear 1D localization problem. The state is made of a scalar position and scalar speed, thus a 2D vector. A sensor provides acceleration inputs, while another one provides sparse measurements of the position. """ def __init__(self): self.group = Euclidean(2, equip=False) self.dim_noise = 1 self.dim_obs = 1
[docs] @staticmethod def propagate(state, sensor_input): r"""Propagate with piece-wise constant acceleration and velocity. Takes a given (position, speed) pair :math:`(x, v)` and creates a new one :math:`(x + dt * v, v + dt * acc)`, where the time step and the acceleration are given by an accelerometer. Parameters ---------- state : array-like, shape=[dim] Vector representing a state (position, speed). sensor_input : array-like, shape=[2] Vector representing the information from the accelerometer. Returns ------- new_state : array-like, shape=[dim] Vector representing the propagated state. """ dt, acc = sensor_input pos, speed = state pos = pos + dt * speed speed = speed + dt * acc return gs.array([pos, speed])
[docs] def propagation_jacobian(self, state, sensor_input): r"""Compute the Jacobian associated to the affine propagation.. The Jacobian is given by :math:`\begin{bmatrix} 1 & dt \\ & 1 \end{bmatrix}`. Parameters ---------- state : unused sensor_input : array-like, shape=[2] Vector representing the information from the accelerometer. Returns ------- jacobian : array-like, shape=[dim, dim] Jacobian of the propagation. """ dt, _ = sensor_input dim = self.group.dim position_line = gs.hstack((gs.eye(dim // 2), dt * gs.eye(dim // 2))) speed_line = gs.hstack((gs.zeros((dim // 2, dim // 2)), gs.eye(dim // 2))) jac = gs.vstack((position_line, speed_line)) return jac
[docs] def noise_jacobian(self, state, sensor_input): r"""Compute the matrix associated to the propagation noise. The noise is supposed additive and only applies to the speed part. The Jacobian is given by :math:`\begin{bmatrix} 0 & \sqrt{dt} \end{bmatrix}`. Parameters ---------- state : unused sensor_input : array-like, shape=[2] Vector representing the information from the accelerometer. Returns ------- jacobian : array-like, shape=[dim_noise, dim] Jacobian of the propagation w.r.t. the noise. """ dt, _ = sensor_input dim = self.group.dim position_wrt_noise = gs.zeros((dim // 2, dim // 2)) speed_wrt_noise = gs.sqrt(dt) * gs.eye(dim // 2) jac = gs.vstack((position_wrt_noise, speed_wrt_noise)) return jac
[docs] def observation_jacobian(self, state, observation): r"""Compute the matrix associated to the observation model. The Jacobian is given by :math:`\begin{bmatrix} 1 & 0 \end{bmatrix}`. Parameters ---------- state : unused observation : unused Returns ------- jacobian : array-like, shape=[dim_obs, dim] Jacobian of the observation. """ return gs.eye(self.dim_obs, self.group.dim)
[docs] @staticmethod def get_measurement_noise_cov(state, observation_cov): r"""Get the observation covariance. Parameters ---------- state : unused observation_cov : array-like, shape=[dim_obs, dim_obs] Covariance matrix associated to the sensor. Returns ------- covariance : array-like, shape=[dim_obs, dim_obs] Covariance of the observation. """ return gs.copy(observation_cov)
[docs] @staticmethod def observation_model(state): """Model used to create the measurements. This model simply outputs the position part of the state, i.e. its first element. Parameters ---------- state : array-like, shape=[dim] Vector representing the state. Returns ------- observation : array-like, shape=[dim_obs] Expected observation of the state. """ return state[:1]
[docs] def innovation(self, state, observation): """Discrepancy between the measurement and its expected value. Parameters ---------- state : array-like, shape=[dim] Vector representing the state. observation : array-like, shape=[dim_obs] Obtained measurement. Returns ------- innovation : array-like, shape=[dim_obs] Error between the measurement and the expected value. """ return observation - self.observation_model(state)
[docs] class Localization: """Class for modeling a non-linear 2D localization problem. The state is composed of a planar orientation and position, and is thus a member of SE(2). A sensor provides the linear and angular speed, while another one provides sparse position observations. """ def __init__(self): self.group = SpecialEuclidean(2, point_type="vector", equip=False) self.dim_noise = 3 self.dim_obs = 2
[docs] def preprocess_input(self, sensor_input): """Separate the input into its main parts. Each input is the concatenation of four parts: the time step, the 2D linear velocity and the angular velocity. Parameters ---------- sensor_input : array-like, shape=[4] Vector representing the sensor input. Returns ------- dt : float Time step between two consecutive inputs. linear_vel : array-like, shape=[2] 2D linear velocity. angular_vel : array-like, shape=[dim_rot] Angular velocity. """ return ( sensor_input[0], sensor_input[1 : self.group.n + 1], sensor_input[self.group.n + 1 :], )
[docs] def rotation_matrix(self, theta): """Construct the rotation matrix associated to the angle theta. Parameters ---------- theta : float Rotation angle. Returns ------- rot : array-like, shape=[2, 2] 2D rotation matrix of angle theta. """ if gs.ndim(gs.array(theta)) <= 1: theta = gs.array([theta]) return self.group.rotations.matrix_from_rotation_vector(theta)
[docs] def regularize_angle(self, theta): """Bring back angle theta in ]-pi, pi].""" if gs.ndim(gs.array(theta)) < 1: theta = gs.array([theta]) return self.group.rotations.log_from_identity(theta)
[docs] def adjoint_map(self, state): r"""Construct the matrix associated to the adjoint representation. The inner automorphism is given by :math:`Ad_X : g |-> XgX^-1`. For a state :math:`X = (\theta, x, y)`, the matrix associated to its tangent map, the adjoint representation, is :math:`\begin{bmatrix} 1 & \\ -J [x, y] & R(\theta) \end{bmatrix}`, where :math:`R(\theta)` is the rotation matrix of angle theta, and :math:`J = \begin{bmatrix} 0 & -1 \\ 1 & 0 \end{bmatrix}` Parameters ---------- state : array-like, shape=[dim] Vector representing a state. Returns ------- adjoint : array-like, shape=[dim, dim] Adjoint representation of the state. """ theta, _, _ = state tangent_base = gs.array([[0.0, -1.0], [1.0, 0.0]]) orientation_part = gs.eye(self.group.rotations.dim, self.group.dim) pos_column = gs.reshape(state[1:], (self.group.n, 1)) position_wrt_orientation = Matrices.mul(-tangent_base, pos_column) position_wrt_position = self.rotation_matrix(theta) last_lines = gs.hstack((position_wrt_orientation, position_wrt_position)) ad = gs.vstack((orientation_part, last_lines)) return ad
[docs] def propagate(self, state, sensor_input): r"""Propagate state with constant velocity motion model on SE(2). From a given state (orientation, position) pair :math:`(\theta, x)`, a new one is obtained as :math:`(\theta + dt * \omega, x + dt * R(\theta) u)`, where the time step, the linear and angular velocities u and :math:\omega are given some sensor (e.g., odometers). Parameters ---------- state : array-like, shape=[dim] Vector representing a state (orientation, position). sensor_input : array-like, shape=[4] Vector representing the information from the sensor. Returns ------- new_state : array-like, shape=[dim] Vector representing the propagated state. """ dt, linear_vel, angular_vel = self.preprocess_input(sensor_input) theta, _, _ = state local_vel = gs.matvec(self.rotation_matrix(theta), linear_vel) new_pos = state[1:] + dt * local_vel theta = theta + dt * angular_vel theta = self.regularize_angle(theta) return gs.concatenate((theta, new_pos), axis=0)
[docs] def propagation_jacobian(self, state, sensor_input): r"""Compute the Jacobian associated to the input. Since the propagation writes f(x) = x*u, and the error is modeled on the Lie algebra, the Jacobian is Ad_{u^{-1}} [BB2017]. Parameters ---------- state : unused sensor_input : array-like, shape=[4] Vector representing the information from the sensor. Returns ------- jacobian : array-like, shape=[dim, dim] Jacobian of the propagation. """ dt, linear_vel, angular_vel = self.preprocess_input(sensor_input) input_vector_form = dt * gs.concatenate((angular_vel, linear_vel), axis=0) input_inv = self.group.inverse(input_vector_form) return self.adjoint_map(input_inv)
[docs] def noise_jacobian(self, state, sensor_input): r"""Compute the matrix associated to the propagation noise. The noise being considered multiplicative, it is simply the identity scaled by the time step. Parameters ---------- state : unused sensor_input : array-like, shape=[4] Vector representing the information from the sensor. Returns ------- jacobian : array-like, shape=[dim_noise, dim] Jacobian of the propagation w.r.t. the noise. """ dt, _, _ = self.preprocess_input(sensor_input) return gs.sqrt(dt) * gs.eye(self.dim_noise)
[docs] def observation_jacobian(self, state, observation): r"""Compute the matrix associated to the observation model. The Jacobian is given by :math:`\begin{bmatrix} 0 & I_2 \end{bmatrix}`. Parameters ---------- state : unused observation : unused Returns ------- jacobian : array-like, shape=[dim_obs, dim] Jacobian of the observation. """ orientation_part = gs.zeros((self.dim_obs, self.group.rotations.dim)) position_part = gs.eye(self.dim_obs, self.group.n) return gs.hstack((orientation_part, position_part))
[docs] def get_measurement_noise_cov(self, state, observation_cov): r"""Get the observation covariance. For an observation y and an orientation theta, the modified observation considered for the innovation is :math:`R(\theta)^T y` [BB2017], so the covariance N is rotated accordingly as :math:`R(\theta)^T N R(\theta)`. Parameters ---------- state : array-like, shape=[dim] Vector representing a state. observation_cov : array-like, shape=[dim_obs, dim_obs] Covariance matrix associated to the sensor. Returns ------- covariance : array-like, shape=[dim_obs, dim_obs] Covariance of the observation. """ theta, _, _ = state rot = self.rotation_matrix(theta) return Matrices.mul(Matrices.transpose(rot), observation_cov, rot)
[docs] def observation_model(self, state): """Model used to create the measurements. This model simply outputs the position part of the state, i.e. its two last elements. Parameters ---------- state : array-like, shape=[dim] Vector representing the state. Returns ------- observation : array-like, shape=[dim_obs] Expected observation of the state. """ return state[self.group.rotations.dim :]
[docs] def innovation(self, state, observation): """Discrepancy between the measurement and its expected value. The linear error (observation - expected) is cast into the state's frame by rotation, following [BB2017] Parameters ---------- state : array-like, shape=[dim] Vector representing the state. observation : array-like, shape=[dim_obs] Obtained measurement. Returns ------- innovation : array-like, shape=[dim_obs] Error between the measurement and the expected value. """ theta, _, _ = state rot = self.rotation_matrix(theta) expected = self.observation_model(state) return gs.matvec(Matrices.transpose(rot), observation - expected)
[docs] class KalmanFilter: """Class for a general Kalman filter working on Lie groups. Given an adapted model, it provides the tools to carry out non-linear state estimation with an error modeled on the Lie algebra. The model must provide the functions to propagate and update a state, the observation model, and the computation of the Jacobians. Parameter --------- model : {class, instance} Object representing an observed dynamical system. """ def __init__(self, model): self.model = model self.state = model.group.identity self.covariance = gs.zeros((self.model.group.dim, self.model.group.dim)) self.process_noise = gs.zeros((self.model.dim_noise, self.model.dim_noise)) self.measurement_noise = gs.zeros((self.model.dim_obs, self.model.dim_obs))
[docs] def initialize_covariances(self, prior_values, process_values, obs_values): """Set the values of the covariances.""" cov_dict = { "covariance": prior_values, "process_noise": process_values, "measurement_noise": obs_values, } for attribute, value in cov_dict.items(): setattr(self, attribute, value)
[docs] def propagate(self, sensor_input): """Propagate the estimate and its covariance. Given the propagation Jacobian F and the noise Jacobian G, the covariance P becomes F P F^T + G Q G^T. Parameters ---------- sensor_input : array-like Vector representing the propagation sensor input. """ prop_noise = self.process_noise prop_jac = self.model.propagation_jacobian(self.state, sensor_input) noise_jac = self.model.noise_jacobian(self.state, sensor_input) prop_cov = Matrices.mul(prop_jac, self.covariance, Matrices.transpose(prop_jac)) noise_cov = Matrices.mul(noise_jac, prop_noise, Matrices.transpose(noise_jac)) self.covariance = prop_cov + noise_cov self.state = self.model.propagate(self.state, sensor_input)
[docs] def compute_gain(self, observation): """Compute the Kalman gain given the observation model. Given the observation Jacobian H and covariance N (not necessarily equal to that of the sensor), and the current covariance P, the Kalman gain is K = P H^T(H P H^T + N)^{-1}. Parameters ---------- observation : array-like, shape=[dim_obs] Obtained measurement. Returns ------- gain : array-like, shape=[model.dim, model.dim_obs] Kalman gain. """ obs_cov = self.model.get_measurement_noise_cov( self.state, self.measurement_noise ) obs_jac = self.model.observation_jacobian(self.state, observation) expected_cov = Matrices.mul( obs_jac, self.covariance, Matrices.transpose(obs_jac) ) innovation_cov = expected_cov + obs_cov return Matrices.mul( self.covariance, Matrices.transpose(obs_jac), gs.linalg.inv(innovation_cov) )
[docs] def update(self, observation): r"""Update the current estimate given an observation. The state is updated by the matrix-vector product of the Kalman gain K and the innovation. The possibly non-linear update function is provided by the model. Given the observation Jacobian H and covariance N, the current covariance P is updated as (I - KH)P. Parameters ---------- observation : array-like, shape=[dim_obs] Obtained measurement. """ innovation = self.model.innovation(self.state, observation) gain = self.compute_gain(observation) obs_jac = self.model.observation_jacobian(self.state, observation) cov_factor = gs.eye(self.model.group.dim) - Matrices.mul(gain, obs_jac) self.covariance = Matrices.mul(cov_factor, self.covariance) state_upd = gs.matvec(gain, innovation) self.state = self.model.group.exp(state_upd, self.state)