# Source code for geomstats.learning.kalman_filter

"""Kalman filter on Lie groups, with two local test system models.

"""

import geomstats.backend as gs
from geomstats.geometry.euclidean import Euclidean
from geomstats.geometry.matrices import Matrices
from geomstats.geometry.special_euclidean import SpecialEuclidean

[docs]
class LocalizationLinear:
"""Class for modeling a linear 1D localization problem.

The state is made of a scalar position and scalar speed, thus a 2D vector.
A sensor provides acceleration inputs, while another one provides sparse
measurements of the position.
"""

def __init__(self):
self.group = Euclidean(2, equip=False)
self.dim_noise = 1
self.dim_obs = 1

[docs]
@staticmethod
def propagate(state, sensor_input):
r"""Propagate with piece-wise constant acceleration and velocity.

Takes a given (position, speed) pair :math:(x, v) and creates a new
one :math:(x + dt * v, v + dt * acc), where the time step and the
acceleration are given by an accelerometer.

Parameters
----------
state : array-like, shape=[dim]
Vector representing a state (position, speed).
sensor_input : array-like, shape=
Vector representing the information from the accelerometer.

Returns
-------
new_state : array-like, shape=[dim]
Vector representing the propagated state.
"""
dt, acc = sensor_input
pos, speed = state
pos = pos + dt * speed
speed = speed + dt * acc
return gs.array([pos, speed])

[docs]
def propagation_jacobian(self, state, sensor_input):
r"""Compute the Jacobian associated to the affine propagation..

The Jacobian is given by :math:\begin{bmatrix} 1 & dt \\ & 1
\end{bmatrix}.

Parameters
----------
state : unused
sensor_input : array-like, shape=
Vector representing the information from the accelerometer.

Returns
-------
jacobian : array-like, shape=[dim, dim]
Jacobian of the propagation.
"""
dt, _ = sensor_input
dim = self.group.dim
position_line = gs.hstack((gs.eye(dim // 2), dt * gs.eye(dim // 2)))
speed_line = gs.hstack((gs.zeros((dim // 2, dim // 2)), gs.eye(dim // 2)))
jac = gs.vstack((position_line, speed_line))
return jac

[docs]
def noise_jacobian(self, state, sensor_input):
r"""Compute the matrix associated to the propagation noise.

The noise is supposed additive and only applies to the speed part.
The Jacobian is given by :math:\begin{bmatrix} 0 & \sqrt{dt}
\end{bmatrix}.

Parameters
----------
state : unused
sensor_input : array-like, shape=
Vector representing the information from the accelerometer.

Returns
-------
jacobian : array-like, shape=[dim_noise, dim]
Jacobian of the propagation w.r.t. the noise.
"""
dt, _ = sensor_input
dim = self.group.dim
position_wrt_noise = gs.zeros((dim // 2, dim // 2))
speed_wrt_noise = gs.sqrt(dt) * gs.eye(dim // 2)
jac = gs.vstack((position_wrt_noise, speed_wrt_noise))
return jac

[docs]
def observation_jacobian(self, state, observation):
r"""Compute the matrix associated to the observation model.

The Jacobian is given by :math:\begin{bmatrix} 1 & 0 \end{bmatrix}.

Parameters
----------
state : unused
observation : unused

Returns
-------
jacobian : array-like, shape=[dim_obs, dim]
Jacobian of the observation.
"""
return gs.eye(self.dim_obs, self.group.dim)

[docs]
@staticmethod
def get_measurement_noise_cov(state, observation_cov):
r"""Get the observation covariance.

Parameters
----------
state : unused
observation_cov : array-like, shape=[dim_obs, dim_obs]
Covariance matrix associated to the sensor.

Returns
-------
covariance : array-like, shape=[dim_obs, dim_obs]
Covariance of the observation.
"""
return gs.copy(observation_cov)

[docs]
@staticmethod
def observation_model(state):
"""Model used to create the measurements.

This model simply outputs the position part of the state, i.e. its
first element.

Parameters
----------
state : array-like, shape=[dim]
Vector representing the state.

Returns
-------
observation : array-like, shape=[dim_obs]
Expected observation of the state.
"""
return state[:1]

[docs]
def innovation(self, state, observation):
"""Discrepancy between the measurement and its expected value.

Parameters
----------
state : array-like, shape=[dim]
Vector representing the state.
observation : array-like, shape=[dim_obs]
Obtained measurement.

Returns
-------
innovation : array-like, shape=[dim_obs]
Error between the measurement and the expected value.
"""
return observation - self.observation_model(state)

[docs]
class Localization:
"""Class for modeling a non-linear 2D localization problem.

The state is composed of a planar orientation and position, and is thus a
member of SE(2).
A sensor provides the linear and angular speed, while another one provides
sparse position observations.
"""

def __init__(self):
self.group = SpecialEuclidean(2, point_type="vector", equip=False)
self.dim_noise = 3
self.dim_obs = 2

[docs]
def preprocess_input(self, sensor_input):
"""Separate the input into its main parts.

Each input is the concatenation of four parts: the time step, the 2D
linear velocity and the angular velocity.

Parameters
----------
sensor_input : array-like, shape=
Vector representing the sensor input.

Returns
-------
dt : float
Time step between two consecutive inputs.
linear_vel : array-like, shape=
2D linear velocity.
angular_vel : array-like, shape=[dim_rot]
Angular velocity.
"""
return (
sensor_input,
sensor_input[1 : self.group.n + 1],
sensor_input[self.group.n + 1 :],
)

[docs]
def rotation_matrix(self, theta):
"""Construct the rotation matrix associated to the angle theta.

Parameters
----------
theta : float
Rotation angle.

Returns
-------
rot : array-like, shape=[2, 2]
2D rotation matrix of angle theta.
"""
if gs.ndim(gs.array(theta)) <= 1:
theta = gs.array([theta])
return self.group.rotations.matrix_from_rotation_vector(theta)

[docs]
def regularize_angle(self, theta):
"""Bring back angle theta in ]-pi, pi]."""
if gs.ndim(gs.array(theta)) < 1:
theta = gs.array([theta])
return self.group.rotations.log_from_identity(theta)

[docs]
r"""Construct the matrix associated to the adjoint representation.

The inner automorphism is given by :math:Ad_X : g |-> XgX^-1. For a
state :math:X = (\theta, x, y), the matrix associated to its tangent
:math:\begin{bmatrix} 1 & \\ -J [x, y] & R(\theta) \end{bmatrix},
where :math:R(\theta) is the rotation matrix of angle theta, and
:math:J = \begin{bmatrix} 0 & -1 \\ 1 & 0 \end{bmatrix}

Parameters
----------
state : array-like, shape=[dim]
Vector representing a state.

Returns
-------
"""
theta, _, _ = state
tangent_base = gs.array([[0.0, -1.0], [1.0, 0.0]])
orientation_part = gs.eye(self.group.rotations.dim, self.group.dim)
pos_column = gs.reshape(state[1:], (self.group.n, 1))
position_wrt_orientation = Matrices.mul(-tangent_base, pos_column)
position_wrt_position = self.rotation_matrix(theta)
last_lines = gs.hstack((position_wrt_orientation, position_wrt_position))

[docs]
def propagate(self, state, sensor_input):
r"""Propagate state with constant velocity motion model on SE(2).

From a given state (orientation, position) pair :math:(\theta, x),
a new one is obtained as :math:(\theta + dt * \omega,
x + dt * R(\theta) u), where the time step, the linear and angular
velocities u and :math:\omega are given some sensor (e.g., odometers).

Parameters
----------
state : array-like, shape=[dim]
Vector representing a state (orientation, position).
sensor_input : array-like, shape=
Vector representing the information from the sensor.

Returns
-------
new_state : array-like, shape=[dim]
Vector representing the propagated state.
"""
dt, linear_vel, angular_vel = self.preprocess_input(sensor_input)
theta, _, _ = state
local_vel = gs.matvec(self.rotation_matrix(theta), linear_vel)
new_pos = state[1:] + dt * local_vel
theta = theta + dt * angular_vel
theta = self.regularize_angle(theta)
return gs.concatenate((theta, new_pos), axis=0)

[docs]
def propagation_jacobian(self, state, sensor_input):
r"""Compute the Jacobian associated to the input.

Since the propagation writes f(x) = x*u, and the error is modeled on
the Lie algebra, the Jacobian is Ad_{u^{-1}} [BB2017].

Parameters
----------
state : unused
sensor_input : array-like, shape=
Vector representing the information from the sensor.

Returns
-------
jacobian : array-like, shape=[dim, dim]
Jacobian of the propagation.
"""
dt, linear_vel, angular_vel = self.preprocess_input(sensor_input)
input_vector_form = dt * gs.concatenate((angular_vel, linear_vel), axis=0)
input_inv = self.group.inverse(input_vector_form)

[docs]
def noise_jacobian(self, state, sensor_input):
r"""Compute the matrix associated to the propagation noise.

The noise being considered multiplicative, it is simply the identity
scaled by the time step.

Parameters
----------
state : unused
sensor_input : array-like, shape=
Vector representing the information from the sensor.

Returns
-------
jacobian : array-like, shape=[dim_noise, dim]
Jacobian of the propagation w.r.t. the noise.
"""
dt, _, _ = self.preprocess_input(sensor_input)
return gs.sqrt(dt) * gs.eye(self.dim_noise)

[docs]
def observation_jacobian(self, state, observation):
r"""Compute the matrix associated to the observation model.

The Jacobian is given by :math:\begin{bmatrix} 0 & I_2 \end{bmatrix}.

Parameters
----------
state : unused
observation : unused

Returns
-------
jacobian : array-like, shape=[dim_obs, dim]
Jacobian of the observation.
"""
orientation_part = gs.zeros((self.dim_obs, self.group.rotations.dim))
position_part = gs.eye(self.dim_obs, self.group.n)
return gs.hstack((orientation_part, position_part))

[docs]
def get_measurement_noise_cov(self, state, observation_cov):
r"""Get the observation covariance.

For an observation y and an orientation theta, the modified observation
considered for the innovation is :math:R(\theta)^T y [BB2017], so the
covariance N is rotated accordingly as :math:R(\theta)^T N R(\theta).

Parameters
----------
state : array-like, shape=[dim]
Vector representing a state.
observation_cov : array-like, shape=[dim_obs, dim_obs]
Covariance matrix associated to the sensor.

Returns
-------
covariance : array-like, shape=[dim_obs, dim_obs]
Covariance of the observation.
"""
theta, _, _ = state
rot = self.rotation_matrix(theta)
return Matrices.mul(Matrices.transpose(rot), observation_cov, rot)

[docs]
def observation_model(self, state):
"""Model used to create the measurements.

This model simply outputs the position part of the state, i.e. its
two last elements.

Parameters
----------
state : array-like, shape=[dim]
Vector representing the state.

Returns
-------
observation : array-like, shape=[dim_obs]
Expected observation of the state.
"""
return state[self.group.rotations.dim :]

[docs]
def innovation(self, state, observation):
"""Discrepancy between the measurement and its expected value.

The linear error (observation - expected) is cast into the state's
frame by rotation, following [BB2017]

Parameters
----------
state : array-like, shape=[dim]
Vector representing the state.
observation : array-like, shape=[dim_obs]
Obtained measurement.

Returns
-------
innovation : array-like, shape=[dim_obs]
Error between the measurement and the expected value.
"""
theta, _, _ = state
rot = self.rotation_matrix(theta)
expected = self.observation_model(state)
return gs.matvec(Matrices.transpose(rot), observation - expected)

[docs]
class KalmanFilter:
"""Class for a general Kalman filter working on Lie groups.

Given an adapted model, it provides the tools to carry out non-linear state
estimation with an error modeled on the Lie algebra. The model must provide
the functions to propagate and update a state, the observation model, and
the computation of the Jacobians.

Parameter
---------
model : {class, instance}
Object representing an observed dynamical system.
"""

def __init__(self, model):
self.model = model
self.state = model.group.identity
self.covariance = gs.zeros((self.model.group.dim, self.model.group.dim))
self.process_noise = gs.zeros((self.model.dim_noise, self.model.dim_noise))
self.measurement_noise = gs.zeros((self.model.dim_obs, self.model.dim_obs))

[docs]
def initialize_covariances(self, prior_values, process_values, obs_values):
"""Set the values of the covariances."""
cov_dict = {
"covariance": prior_values,
"process_noise": process_values,
"measurement_noise": obs_values,
}
for attribute, value in cov_dict.items():
setattr(self, attribute, value)

[docs]
def propagate(self, sensor_input):
"""Propagate the estimate and its covariance.

Given the propagation Jacobian F and the noise Jacobian G, the
covariance P becomes F P F^T + G Q G^T.

Parameters
----------
sensor_input : array-like
Vector representing the propagation sensor input.
"""
prop_noise = self.process_noise
prop_jac = self.model.propagation_jacobian(self.state, sensor_input)
noise_jac = self.model.noise_jacobian(self.state, sensor_input)

prop_cov = Matrices.mul(prop_jac, self.covariance, Matrices.transpose(prop_jac))
noise_cov = Matrices.mul(noise_jac, prop_noise, Matrices.transpose(noise_jac))
self.covariance = prop_cov + noise_cov
self.state = self.model.propagate(self.state, sensor_input)

[docs]
def compute_gain(self, observation):
"""Compute the Kalman gain given the observation model.

Given the observation Jacobian H and covariance N (not necessarily
equal to that of the sensor), and the current covariance P, the Kalman
gain is K = P H^T(H P H^T + N)^{-1}.

Parameters
----------
observation : array-like, shape=[dim_obs]
Obtained measurement.

Returns
-------
gain : array-like, shape=[model.dim, model.dim_obs]
Kalman gain.
"""
obs_cov = self.model.get_measurement_noise_cov(
self.state, self.measurement_noise
)
obs_jac = self.model.observation_jacobian(self.state, observation)
expected_cov = Matrices.mul(
obs_jac, self.covariance, Matrices.transpose(obs_jac)
)
innovation_cov = expected_cov + obs_cov
return Matrices.mul(
self.covariance, Matrices.transpose(obs_jac), gs.linalg.inv(innovation_cov)
)

[docs]
def update(self, observation):
r"""Update the current estimate given an observation.

The state is updated by the matrix-vector product of the Kalman gain K
and the innovation. The possibly non-linear update function is provided
by the model.
Given the observation Jacobian H and covariance N, the current
covariance P is updated as (I - KH)P.

Parameters
----------
observation : array-like, shape=[dim_obs]
Obtained measurement.
"""
innovation = self.model.innovation(self.state, observation)
gain = self.compute_gain(observation)
obs_jac = self.model.observation_jacobian(self.state, observation)
cov_factor = gs.eye(self.model.group.dim) - Matrices.mul(gain, obs_jac)
self.covariance = Matrices.mul(cov_factor, self.covariance)
state_upd = gs.matvec(gain, innovation)
self.state = self.model.group.exp(state_upd, self.state)