"""Parameterized curves on any given manifold.
Lead author: Alice Le Brigant.
"""
import logging
import math
from scipy.interpolate import CubicSpline, PchipInterpolator
import geomstats.backend as gs
from geomstats.algebra_utils import from_vector_to_diagonal_matrix
from geomstats.geometry.diffeo import AutodiffDiffeo, Diffeo
from geomstats.geometry.euclidean import Euclidean
from geomstats.geometry.fiber_bundle import AlignerAlgorithm, FiberBundle
from geomstats.geometry.hypersphere import Hypersphere
from geomstats.geometry.landmarks import Landmarks
from geomstats.geometry.manifold import register_quotient
from geomstats.geometry.matrices import Matrices
from geomstats.geometry.nfold_manifold import NFoldManifold, NFoldMetric
from geomstats.geometry.pullback_metric import PullbackDiffeoMetric
from geomstats.geometry.quotient_metric import QuotientMetric
from geomstats.numerics.finite_differences import (
centered_difference,
forward_difference,
second_centered_difference,
)
from geomstats.numerics.interpolation import (
LinearInterpolator1D,
UniformUnitIntervalLinearInterpolator,
)
from geomstats.vectorization import check_is_batch, get_batch_shape
[docs]
def insert_zeros(array, axis=-1, end=False):
"""Insert zeros in a given array.
Insert zeros while taking care of
Parameters
----------
array : array-like
axis : int
Axis in which insert the zeros.
Must be given backwards.
end : bool
If True, zeros are introduced at the end.
Returns
-------
array_with_zeros : array-like
Shape in the specified axis increases by one.
"""
array_ndim = len(array.shape[axis:])
batch_shape = get_batch_shape(array_ndim, array)
shape = batch_shape + (1,) + array.shape[len(batch_shape) + 1 :]
zeros = gs.zeros(shape)
first, second = (array, zeros) if end else (zeros, array)
return gs.concatenate((first, second), axis=-array_ndim)
[docs]
class DiscreteCurves:
"""Space of discrete curves."""
def __new__(
self, ambient_dim=2, k_sampling_points=10, starting_at_origin=True, equip=True
):
"""Instantiate discrete curves.
If `starting_at_origin=True`, instantiates discrete curves modulo translations
eventually equipped with an elastic metric.
If `starting_at_origin=False`, instantiates discrete curves
eventually equipped with an L2 metric.
Parameters
----------
ambient_dim : int
Dimension of the ambient Euclidean space in which curves take values.
k_sampling_points : int
Number of sampling points.
starting_at_origin : bool
If rotations are quotiented out.
equip : bool
If True, equip space with default metric.
"""
if starting_at_origin:
return DiscreteCurvesStartingAtOrigin(
ambient_dim=ambient_dim,
k_sampling_points=k_sampling_points,
equip=equip,
)
ambient_manifold = Euclidean(ambient_dim)
space = Landmarks(
ambient_manifold=ambient_manifold,
k_landmarks=k_sampling_points,
equip=False,
)
if equip:
space.equip_with_metric(L2CurvesMetric)
return space
[docs]
class DiscreteCurvesStartingAtOrigin(NFoldManifold):
r"""Space of discrete curves modulo translations.
Each individual curve is represented by a 2d-array of shape `[
k_sampling_points - 1, ambient_dim]`.
This space corresponds to the space of immersions defined below, i.e. the
space of smooth functions from an interval I into the ambient Euclidean
space M, with non-vanishing derivative.
.. math::
Imm(I, M)=\{c \in C^{\infty}(I, M) \|c'(s)\|\neq 0 \forall s \in I \},
where the interval of parameters I is taken to be I = [0, 1]
without loss of generality.
Parameters
----------
ambient_dim : int
Dimension of the ambient Euclidean space in which curves take values.
k_sampling_points : int
Number of sampling points.
equip : bool
If True, equip space with default metric.
"""
def __init__(self, ambient_dim=2, k_sampling_points=10, equip=True):
ambient_manifold = Euclidean(ambient_dim)
super().__init__(ambient_manifold, k_sampling_points - 1, equip=equip)
self._sphere = Hypersphere(dim=ambient_dim - 1)
self._discrete_curves_with_l2 = None
[docs]
def new(self, equip=True):
"""Create manifold with same parameters."""
return DiscreteCurvesStartingAtOrigin(
ambient_dim=self.ambient_manifold.dim,
k_sampling_points=self.k_sampling_points,
equip=equip,
)
@property
def ambient_manifold(self):
"""Manifold in which curves take values."""
return self.base_manifold
@property
def k_sampling_points(self):
"""Number of sampling points for the discrete curves."""
return self.n_copies + 1
@property
def discrete_curves_with_l2(self):
"""Copy of discrete curves with the L^2 metric."""
if self._discrete_curves_with_l2 is None:
self._discrete_curves_with_l2 = self.new(equip=False).equip_with_metric(
L2CurvesMetric
)
return self._discrete_curves_with_l2
[docs]
@staticmethod
def default_metric():
"""Metric to equip the space with if equip is True."""
return SRVMetric
[docs]
def insert_origin(self, point):
"""Insert origin as first element of point."""
return insert_zeros(point, axis=-self.point_ndim)
[docs]
def projection(self, point):
"""Project a point from discrete curves.
Removes translation and origin.
Parameters
----------
point : array-like, shape=[..., k_sampling_points, ambient_dim]
Returns
-------
proj_point : array-like, shape=[..., k_sampling_points - 1, ambient_dim]
"""
if point.shape[-2] == self.k_sampling_points - 1:
return gs.copy(point)
return (point[..., :, :] - gs.expand_dims(point[..., 0, :], axis=-2))[
..., 1:, :
]
[docs]
def random_point(self, n_samples=1, bound=1.0):
"""Sample random curves.
Sampling on the sphere to avoid chaotic curves.
"""
sampling_times = gs.linspace(0.0, 1.0, self.k_sampling_points)
initial_point = self._sphere.random_point(n_samples)
initial_tangent_vec = self._sphere.random_tangent_vec(initial_point)
point = self._sphere.metric.geodesic(
initial_point, initial_tangent_vec=initial_tangent_vec
)(sampling_times)
return self.projection(point)
[docs]
def interpolate(self, point):
"""Interpolate between the sampling points of a discrete curve.
Parameters
----------
point : array-like, shape=[..., k_sampling_points, ambient_dim]
Discrete curve starting at the origin.
Returns
-------
spline : function
Cubic spline that interpolates between the sampling points
of the discrete curve.
"""
k_sampling_points = self.k_sampling_points
t_space = gs.linspace(0.0, 1.0, k_sampling_points)
point_with_origin = insert_zeros(point, axis=-self.point_ndim)
is_batch = check_is_batch(self.point_ndim, point)
def interpolating_curve(t):
if not is_batch:
return gs.from_numpy(
CubicSpline(t_space, point_with_origin, axis=-self.point_ndim)(t)
)
return gs.stack(
[
gs.from_numpy(
CubicSpline(t_space, point_with_origin_, axis=-self.point_ndim)(
t
)
)
for point_with_origin_ in point_with_origin
]
)
return interpolating_curve
[docs]
def length(self, point):
"""Compute the length of a discrete curve.
This is the integral of the absolute value of the velocity.
Parameters
----------
point : array-like, shape=[..., k_sampling_points, ambient_dim]
Discrete curve starting at the origin.
Returns
-------
length : array-like, shape=[..., ]
Length of the discrete curve.
"""
point_with_origin = self.insert_origin(point)
velocity = forward_difference(point_with_origin, axis=-self.point_ndim)
l2_metric = self.discrete_curves_with_l2.metric
return l2_metric.norm(velocity, point_with_origin[..., :-1, :])
[docs]
def normalize(self, point):
"""Rescale discrete curve to have unit length."""
return gs.einsum("...ij,...->...ij", point, 1 / self.length(point))
[docs]
class L2CurvesMetric(NFoldMetric):
"""L2 metric on the space of discrete curves.
L2 metric on the space of regularly sampled discrete curves
defined on the unit interval. The inner product between two tangent vectors
is given by the integral of the ambient space inner product, approximated by
a left Riemann sum.
"""
def __init__(self, space):
super().__init__(
space=space,
signature=(math.inf, 0, 0),
)
[docs]
@staticmethod
def riemann_sum(func):
r"""Compute the left Riemann sum approximation of the integral.
Compute the left Riemann sum approximation of the integral of a
function func defined on the unit interval, given by sample points
at regularly spaced times
.. math::
t_i = i / k, \\
i = 0, ..., k - 1
where :math:`k` is the number of landmarks (last time is missing).
Parameters
----------
func : array-like, shape=[..., k_landmarks]
Sample points of a function at regularly spaced times.
Returns
-------
riemann_sum : array-like, shape=[..., ]
Left Riemann sum.
"""
k_sampling_points_minus_one = func.shape[-1]
dt = 1 / k_sampling_points_minus_one
return dt * gs.sum(func, axis=-1)
[docs]
def inner_product(self, tangent_vec_a, tangent_vec_b, base_point=None):
"""Compute L2 inner product between two tangent vectors.
The inner product is the integral of the ambient space inner product,
approximated by a left Riemann sum.
Parameters
----------
tangent_vec_a : array-like, shape=[..., k_landmarks, ambient_dim]
Tangent vector to a curve, i.e. infinitesimal vector field
along a curve.
tangent_vec_b : array-like, shape=[..., k_landmarks, ambient_dim]
Tangent vector to a curve, i.e. infinitesimal vector field
along a curve.
base_point : array-like, shape=[..., k_landmarks, ambient_dim]
Discrete curve defined on the unit interval [0, 1].
Return
------
inner_prod : array_like, shape=[...]
L2 inner product between tangent_vec_a and tangent_vec_b.
"""
inner_products = self.pointwise_inner_product(
tangent_vec_a, tangent_vec_b, base_point
)
return self.riemann_sum(inner_products)
[docs]
class ElasticMetric(PullbackDiffeoMetric):
r"""Elastic metric on the space of discrete curves.
Family of elastic metric parametrized by bending and stretching parameters
a and b.
For curves in :math:`\mathbb{R}^2`, it pullbacks the L2 metric by
the F-transform (see [NK2018]_).
For curves in :math:`\mathbb{R}^d`, it pullbacks a scaled L2 metric by the SRV
transform (see [BCKKNP2022_). It only works for ratio :math:`a / 2b = 1`.
When a=1, and b=1/2, it is equivalent to the SRV metric.
Parameters
----------
a : float
Bending parameter.
b : float
Stretching parameter.
References
----------
.. [KN2018] S. Kurtek and T. Needham,
"Simplifying transforms for general elastic metrics on the space of
plane curves", arXiv:1803.10894 [math.DG], 29 Mar 2018.
.. [BCKKNP2022] Martin Bauer, Nicolas Charon, Eric Klassen, Sebastian Kurtek,
Tom Needham, and Thomas Pierron.
“Elastic Metrics on Spaces of Euclidean Curves: Theory and Algorithms.”
arXiv, September 20, 2022. https://doi.org/10.48550/arXiv.2209.09862.
"""
def __init__(
self,
space,
a,
b=None,
):
if b is None:
b = a / 2
self.a = a
self.b = b
ambient_manifold = space.ambient_manifold
k_sampling_points = space.k_sampling_points
image_space = Landmarks(
ambient_manifold=space.ambient_manifold,
k_landmarks=k_sampling_points - 1,
equip=False,
)
image_space.equip_with_metric(L2CurvesMetric)
if ambient_manifold.dim == 2:
diffeo = FTransform(space.ambient_manifold, k_sampling_points, a, b)
elif gs.abs(a / (2 * b) - 1.0) < gs.atol:
diffeo = SRVTransform(ambient_manifold, k_sampling_points)
if gs.abs(b - 0.5) > gs.atol:
scale = 4 * b**2
image_space.equip_with_metric(scale * image_space.metric)
else:
raise ValueError(
"Cannot instantiate elastic metric for ambient dim="
f"{ambient_manifold.dim}, a={a} and b={b}. "
"Ratio must be 1 or ambient_dim=2."
)
super().__init__(
space=space,
diffeo=diffeo,
image_space=image_space,
signature=(math.inf, 0, 0),
)
[docs]
class SRVMetric(PullbackDiffeoMetric):
"""Square Root Velocity metric on the space of discrete curves.
The SRV metric is equivalent to the elastic metric chosen with
- bending parameter a = 1,
- stretching parameter b = 1/2.
It can be obtained as the pullback of the L2 metric by the Square Root
Velocity Function.
See [Sea2011]_ for details.
References
----------
.. [Sea2011] A. Srivastava, E. Klassen, S. H. Joshi and I. H. Jermyn,
"Shape Analysis of Elastic Curves in Euclidean Spaces,"
in IEEE Transactions on Pattern Analysis and Machine Intelligence,
vol. 33, no. 7, pp. 1415-1428, July 2011.
"""
def __init__(self, space):
self.a = 1.0
self.b = 1 / 2
self._check_ambient_manifold(space.ambient_manifold)
image_space = self._instantiate_image_space(space)
diffeo = SRVTransform(space.ambient_manifold, space.k_sampling_points)
super().__init__(space=space, diffeo=diffeo, image_space=image_space)
def _check_ambient_manifold(self, ambient_manifold):
if not isinstance(ambient_manifold, Euclidean):
raise AssertionError(
"This metric is only "
"implemented for discrete curves embedded "
"in a Euclidean space."
)
def _instantiate_image_space(self, space):
image_space = Landmarks(
ambient_manifold=space.ambient_manifold,
k_landmarks=space.k_sampling_points - 1,
equip=False,
)
image_space.equip_with_metric(L2CurvesMetric)
return image_space
[docs]
class IterativeHorizontalGeodesicAligner(AlignerAlgorithm):
r"""Align two curves through iterative horizontal geodesic algorithm.
This algorithm computes the horizontal geodesic between two curves in the shape
bundle of curves modulo reparametrizations, and at the same time, aligns the
end curve with respect to the initial curve. This is done through an iterative
procedure where the initial curve stays fixed and the sampling points are moved
on the end curve to obtain its optimal parametrization with respect to the initial
curve. This procedure is based on the decomposition of any path of curves into a
horizontal path of curves composed with a path of reparametrizations:
:math:`c(t, s) = c_h(t, phi(t, s))` where :math:`d/dt c_h(t, .)` is horizontal.
Here t is the time parameter of the path and s the space parameter of the curves.
The algorithm sets current_end_curve to be the end curve and iterates three steps:
1) compute the geodesic between the initial curve and current_end_curve
2) compute the path of reparametrizations such that the path of its inverses
transforms this geodesic into a horizontal path of curves
3) invert this path of reparametrizations to find the horizontal path and update
current_end_curve to be its end point.
The algorithm stops when the new current_end_curve is sufficiently
close to the former current_end_curve.
Parameters
----------
total_space : Manifold
Total space with reparametrizations fiber bundle structure.
n_time_grid : int
Number of times in which compute the geodesic.
threshold : float
When the difference between the new end curve and the current end
curve becomes lower than this threshold, the algorithm stops.
Optional, default: 1e-3.
max_iter : int
Maximum number of iterations.
Optional, default: 20.
tol: float
Minimal spacing between time samples in the unit segment when
reparametrizing the end curve.
Optional, default: 1e-3.
verbose: boolean
Optional, default: False.
save_history : bool
If True, history is saved in a `self.history`.
References
----------
.. [LAB2017] A. Le Brigant, "Optimal matching between curves in a manifold",
in Geometric Science of Information. Springer Lecture Notes in Computer
Science 10589 (2017), 57 - 64. https://hal.science/hal-04374199.
"""
def __init__(
self,
total_space,
n_time_grid=100,
threshold=1e-3,
max_iter=20,
tol=1e-3,
verbose=0,
save_history=False,
):
super().__init__(total_space)
self.n_time_grid = n_time_grid
self.threshold = threshold
self.max_iter = max_iter
self.tol = tol
self.verbose = verbose
self.save_history = save_history
if save_history:
self.history = None
@staticmethod
def _euler_step_forward(current, increment, step, tol):
r"""Perform Euler step while enforcing increasing solution.
Compute the new state phi(t+dt,.) from the current state phi(t,.) using
the Euler step: new = current + step * increment, i.e.
:math:`phi(t+dt,.) = phi(t,.) + dt * d/dt phi(t,.)`
while ensuring that the result is an increasing function on the unit
interval that preserves the end points 0 and 1. This is done by alternatively
computing phi(t+dt,s) and phi(t+dt,1-s), for s increasing from 0 to 0.5,
in a way that ensures that the spacing with the previous and following
sampling points are greater than a given tolerance.
"""
k_sampling_points = current.shape[0]
max_index = k_sampling_points // 2
new = gs.copy(current)
sign = gs.sign(tol)
for index in range(1, max_index):
new_index = current[index] + step * increment[index - 1]
new_index_min = new[index - 1] + tol
new_index_max = (
new[k_sampling_points - index] - (k_sampling_points - 2 * index) * tol
)
new_index = sign * gs.maximum(sign * new_index, sign * new_index_min)
new[index] = sign * gs.minimum(sign * new_index, sign * new_index_max)
symindex = k_sampling_points - 1 - index
new_symindex = current[symindex] + step * increment[symindex - 1]
new_symindex_min = new[index] + (k_sampling_points - 2 * index - 1) * tol
new_symindex_max = new[symindex + 1] - tol
new_symindex = sign * gs.minimum(
sign * new_symindex, sign * new_symindex_max
)
new[symindex] = sign * gs.maximum(
sign * new_symindex, sign * new_symindex_min
)
if k_sampling_points % 2 > 0:
new_midindex = current[index + 1] + step * increment[index]
new_midindex_min = new[index] + tol
new_midindex_max = new[index + 2] - tol
new_midindex = sign * gs.maximum(
sign * new_midindex, sign * new_midindex_min
)
new[index + 1] = sign * gs.minimum(
sign * new_midindex, sign * new_midindex_max
)
return new
def _euler_step(self, current, increment, step):
r"""Perform Euler step while enforcing increasing solution.
Symmetric version of previous function.
"""
return (
self._euler_step_forward(current, increment, step, self.tol)
+ gs.flip(
self._euler_step_forward(
gs.flip(current, axis=0),
gs.flip(increment, axis=0),
step,
-self.tol,
),
axis=0,
)
) / 2
def _construct_reparametrization(self, vertical_norm, space_deriv_norm):
r"""Construct path of reparametrizations.
Construct path of reparametrizations phi(t, s) in the decomposition
of the path of curves
:math:`c(t, s) = c_h(t, phi(t, s))` where :math:`d/dt c_h(t, .)` is horizontal.
This is done by solving a partial differential equation, using an Euler
step that enforces that the solution phi(t,.) is an increasing function of
the unit interval that preserves the end points 0 and 1, for all time t.
Parameters
----------
vertical_norm : array-like, shape=[n_times - 1, k_sampling_points]
Pointwise norm of the vertical part of the time derivative of
the path of curves.
space_deriv_norm : array-like, shape=[n_times - 1, k_sampling_points]
Pointwise norm of the space derivative of the path of curves.
Returns
-------
repar : array-like, shape=[n_times, k_sampling_points]
Path of parametrizations, such that the path of curves composed
with the path of inverse parametrizations is a horizontal path.
"""
n_times = vertical_norm.shape[0] + 1
k_sampling_points = vertical_norm.shape[1]
quotient = vertical_norm / space_deriv_norm
repar = gs.linspace(0.0, 1.0, k_sampling_points)
repars = [repar]
for i in range(n_times - 1):
repar_diff = forward_difference(repar, axis=-1)
repar_space_deriv = gs.where(
vertical_norm[i, 1:-1] > 0,
repar_diff[1:],
repar_diff[:-1],
)
repar_time_deriv = repar_space_deriv * quotient[i, 1:-1]
repar = repar_i = self._euler_step(repar, repar_time_deriv, 1 / n_times)
repars.append(repar_i)
return gs.stack(repars)
def _invert_reparametrize_single(self, t_space, repar, point):
"""Invert path of reparametrizations, non vectorized."""
spline = CubicSpline(t_space, point, axis=0)
repar_inverse = PchipInterpolator(repar, t_space)
return gs.from_numpy(spline(repar_inverse(t_space)))
def _invert_reparametrization(
self,
t_space,
repar,
path_of_curves,
repar_inverse_end,
end_spline,
):
r"""Invert path of reparametrizations.
Given a path of curves c(t, s) and a path of reparametrizations
phi(t, s), compute:
:math:`c(t, phi_inv(t, s))` where `phi_inv(t, .) = phi(t, .)^{-1}`
The computation for the last time t=1 is done differently, using
the spline function associated to the end curve and the composition
of the inverse reparametrizations contained in rep_inverse_end:
:math:`end_spline ° phi_inv(1, .) ° ... ° phi_inv(0, .)`.
Parameters
----------
repar : array-like, shape=[n_times, k_sampling_points]
Path of reparametrizations.
path_of_curves : array-like, shape=[n_times, k_sampling_points, ambient_dim]
Path of curves.
repar_inverse_end: list
List of the inverses of the reparametrizations applied to
the end curve during the optimal matching algorithm.
end_spline : function
Spline interpolation of the end point of the path of curves.
Returns
-------
reparametrized_path : array-like, \
shape=[n_times, k_sampling_points, ambient_dim]
Path of curves composed with the inverse of the path of
reparametrizations.
"""
initial_curve = path_of_curves[0]
reparametrized_path = [initial_curve] + [
self._invert_reparametrize_single(
t_space,
repar_i,
point,
)
for repar_i, point in zip(repar[1:-1], path_of_curves[1:-1])
]
repar_inverse_end.append(PchipInterpolator(repar[-1], t_space))
arg = t_space
for repar_inverse in reversed(repar_inverse_end):
arg = repar_inverse(arg)
end_curve_repar = end_spline(arg)
reparametrized_path.append(end_curve_repar)
return gs.stack(reparametrized_path)
def _iterate(
self,
times,
t_space,
initial_point,
end_point,
repar_inverse_end,
end_spline,
):
"""Perform one step of the alignment algorithm."""
ndim = self._total_space.point_ndim
total_space_geod_fun = self._total_space.metric.geodesic(
initial_point=initial_point, end_point=end_point
)
geod_points = total_space_geod_fun(times)
geod_points_with_origin = insert_zeros(geod_points, axis=-ndim)
time_deriv = forward_difference(geod_points, axis=-(ndim + 1))
_, vertical_norm = self._total_space.fiber_bundle.vertical_projection(
time_deriv, geod_points[:-1], return_norm=True
)
vertical_norm = insert_zeros(vertical_norm, axis=-1)
space_deriv = centered_difference(
geod_points_with_origin, axis=-ndim, endpoints=True
)[:-1]
pointwise_space_deriv_norm = self._total_space.ambient_manifold.metric.norm(
space_deriv, geod_points_with_origin[:-1]
)
repar = self._construct_reparametrization(
vertical_norm,
pointwise_space_deriv_norm,
)
horizontal_path = self._invert_reparametrization(
t_space, repar, geod_points_with_origin, repar_inverse_end, end_spline
)
return horizontal_path
def _discrete_horizontal_geodesic_single(
self, initial_point, end_point, end_spline
):
"""Compute discrete horizontal geodesic, non vectorized.
Parameters
----------
initial_point : array-like, shape=[k_sampling_points, ambient_dim]
Initial discrete curve.
end_point : array-like, shape=[k_sampling_points, ambient_dim]
End discrete curve.
end_spline : function
Spline interpolation of end point.
Returns
-------
horizontal_geod_points : array, shape=[n_time_grid, k - 1, ambient_dim]
Geodesic points.
"""
times = gs.linspace(0.0, 1.0, self.n_time_grid)
k_sampling_points = self._total_space.k_sampling_points
t_space = gs.linspace(0.0, 1.0, k_sampling_points)
current_end_point = end_point
repar_inverse_end = []
for index in range(self.max_iter):
horizontal_path_with_origin = self._iterate(
times,
t_space,
initial_point,
current_end_point,
repar_inverse_end,
end_spline,
)
new_end_point = horizontal_path_with_origin[-1][1:]
l2_metric = self._total_space.discrete_curves_with_l2.metric
gap = l2_metric.dist(new_end_point, current_end_point)
current_end_point = new_end_point
if gap < self.threshold:
if self.verbose > 0:
logging.info(
f"Convergence of alignment reached after {index + 1} "
"iterations."
)
break
else:
logging.warning(
"Maximum number of iterations %d reached. The result may be inaccurate",
self.max_iter,
)
if self.save_history:
self.history = dict(
spline=end_spline,
repar_inverse=repar_inverse_end,
)
return horizontal_path_with_origin[..., 1:, :]
[docs]
def discrete_horizontal_geodesic(self, initial_point, end_point, end_spline):
"""Compute discrete horizontal geodesic.
Parameters
----------
initial_point : array-like, shape=[..., k_sampling_points, ambient_dim]
Initial discrete curve.
end_point : array-like, shape=[..., k_sampling_points, ambient_dim]
End discrete curve.
end_spline : callable or list[callable]
Spline interpolation of end point.
Returns
-------
geod_points : array, shape=[..., n_time_grid, k - 1, ambient_dim]
"""
is_batch = check_is_batch(
self._total_space.point_ndim,
initial_point,
end_point,
)
if not is_batch:
return self._discrete_horizontal_geodesic_single(
initial_point, end_point, end_spline
)
if initial_point.ndim != end_point.ndim:
initial_point, end_point = gs.broadcast_arrays(initial_point, end_point)
return gs.stack(
[
self._discrete_horizontal_geodesic_single(
initial_point_, end_point_, end_spline_
)
for initial_point_, end_point_, end_spline_ in zip(
initial_point, end_point, end_spline
)
]
)
[docs]
def align(self, point, base_point):
"""Align point to base point.
Parameters
----------
point : array-like, shape=[..., k_sampling_points - 1, ambient_dim]
Discrete curve to align.
base_point : array-like, shape=[..., k_sampling_points - 1, ambient_dim]
Reference discrete curve.
Returns
-------
aligned : array-like, shape=[..., k_sampling_points - 1, ambient_dim]
Curve reparametrized in an optimal way with respect to reference curve.
"""
if point.ndim == self._total_space.point_ndim:
spline = self._total_space.interpolate(point)
if base_point.ndim > self._total_space.point_ndim:
spline = [spline] * base_point.shape[0]
else:
spline = [self._total_space.interpolate(point_) for point_ in point]
return self.discrete_horizontal_geodesic(base_point, point, spline)[
..., -1, :, :
]
[docs]
class DynamicProgrammingAligner(AlignerAlgorithm):
r"""Align two curves through dynamic programming.
Find the reparametrization gamma of end_curve that minimizes the distance
between initial_curve and end_curve reparametrized by gamma, and output
the corresponding distance, using a dynamic programming algorithm.
The objective can be expressed in terms of square root velocity (SRV)
representations: it is equivalent to finding the gamma that maximizes
the L2 scalar product between :math:`initial_{srv}` and :math:`end_{srv}@\gamma`,
where :math:`initial_{srv}` is the SRV representation of the initial curve
and :math:`end_{srv}@\gamma` is the SRV representation of the end curve
reparametrized by :math:`\gamma`, i.e
.. math::
end_{srv}@\gamma(t) = end_{srv}(\gamma(t))\cdot|\gamma(t)|^\frac{1}{2}
The dynamic programming algorithm assumes that for every subinterval
:math:`\left[\frac{i}{n},\frac{i+1}{n}\right]` of :math:`\left[0,1\right]`,
gamma is linear.
Parameters
----------
total_space : Manifold
Total space with reparametrizations fiber bundle structure.
n_space_grid : int
Number of sampling points of the unit interval.
max_slope : int
Maximum slope allowed for a reparametrization.
References
----------
[WAJ2007] M. Washington, S. Anuj & H. Joshi,
"On Shape of Plane Elastic Curves", in International Journal of Computer
Vision. 73(3):307-324, 2007.
"""
def __init__(self, total_space, n_space_grid=100, max_slope=6):
k_sampling_points = total_space.k_sampling_points
super().__init__(total_space)
self.n_space_grid = n_space_grid
self.max_slope = max_slope
self._srv_transform = SRVTransform(
self._total_space.ambient_manifold,
k_sampling_points,
)
def _resample_srv_function(self, srv_function):
"""Resample SRV function of a discrete curve.
Parameters
----------
srv_function : array, shape=[..., k_sampling_points - 1, ambient_dim]
Discrete curve.
Returns
-------
srv : array, shape=[..., n_space_grid - 1, ambient_dim]
SRV function of the curve at the right size.
Notes
-----
If `n_space_grid` < `k_sampling_points`, it subsamples the curve.
"""
n_space_grid = self.n_space_grid
index = gs.array(range(n_space_grid - 1))
ratio = (self._total_space.k_sampling_points - 1) / (n_space_grid - 1)
indices = gs.cast(gs.floor(index * ratio), dtype=int)
return srv_function[..., indices, :]
@staticmethod
def _compute_restricted_integral(
srv_1, srv_2, x_min_index, x_max_index, y_min_index, y_max_index
):
r"""Compute the value of an integral over a subinterval.
Compute n * the value of the integral of
.. math::
q_1(t)\cdot q_2(\gamma(t))\cdot|\gamma(t)|^\frac{1}{2}
over :math:`\left[\x_min,x_max\right]` where gamma restricted to
:math:`\left[\x_min,x_max\right]` is a linear.
Parameters
----------
srv_1 : array, shape=[n, ambient_dim]
SRV function of the initial curve.
srv_2 : array, shape=[n, ambient_dim]
SRV function of the end curve.
x_min_index : int
Beginning of the subinterval.
x_max_index : int
End of the subinterval.
y_min_index : int
Value of gamma at x_min.
y_max_index : int
Value of gamma at x_max.
Returns
-------
value : float
Value of the integral described above.
"""
delta_y_index = y_max_index - y_min_index
delta_x_index = x_max_index - x_min_index
gamma_slope = delta_y_index / delta_x_index
x_bound_indices = list(range(x_min_index, x_max_index + 1))
y_bounds_indices = [
(y_index - y_min_index) / gamma_slope + x_min_index
for y_index in range(y_min_index, y_max_index + 1)
]
lower_bound = x_min_index
x_index, y_index = 0, 0
value = 0.0
while x_index < delta_x_index and y_index < delta_y_index:
upper_bound = min(
x_bound_indices[x_index + 1], y_bounds_indices[y_index + 1]
)
# NB: normalization is done outside
length = upper_bound - lower_bound
value += length * gs.dot(
srv_1[x_min_index + x_index], srv_2[y_min_index + y_index]
)
if (
gs.abs(x_bound_indices[x_index + 1] - y_bounds_indices[y_index + 1])
< gs.atol
):
x_index += 1
y_index += 1
elif x_bound_indices[x_index + 1] < y_bounds_indices[y_index + 1]:
x_index += 1
else:
y_index += 1
lower_bound = upper_bound
return gamma_slope**0.5 * value
def _reparametrize(self, point_with_origin, gamma):
"""Reparametrize curve by gamma.
Parameters
----------
point_with_origin : array, shape=[k_sampling_points, ambient_dim]
Discrete curve.
gamma : array, shape=[n_subinterval]
Parametrization of a curve.
Returns
-------
new_point : array , shape=[k_sampling_points - 1, ambient_dim]
Curve reparametrized by gamma.
"""
n_space_grid = self.n_space_grid
k_sampling_points = self._total_space.k_sampling_points
x_uniform = gs.linspace(0.0, 1.0, k_sampling_points)
point_interpolator = UniformUnitIntervalLinearInterpolator(
point_with_origin, point_ndim=1
)
x_space = gs.array([gamma_elem[1] for gamma_elem in gamma]) / (n_space_grid - 1)
y_space = gs.array([gamma_elem[0] for gamma_elem in gamma]) / (n_space_grid - 1)
repar_inverse = LinearInterpolator1D(y_space, x_space, point_ndim=0)
return point_interpolator(repar_inverse(x_uniform[1:]))
def _compute_squared_dist(self, initial_srv, end_srv, grid_last):
"""Compute squared distance using algorithmic information."""
n_space_grid = self.n_space_grid
norm_squared_initial_srv = self._compute_restricted_integral(
initial_srv, initial_srv, 0, n_space_grid - 1, 0, n_space_grid - 1
) / (n_space_grid - 1)
norm_squared_end_srv = self._compute_restricted_integral(
end_srv, end_srv, 0, n_space_grid - 1, 0, n_space_grid - 1
) / (n_space_grid - 1)
maximum_scalar_product = grid_last / (n_space_grid - 1)
return (
norm_squared_initial_srv + norm_squared_end_srv - 2 * maximum_scalar_product
)
def _align_single(self, point, base_point, return_sdist=False):
r"""Align point to base point, non vectorized.
Parameters
----------
point : array-like, shape=[k_sampling_points - 1, ambient_dim]
Discrete curve to align.
base_point : array-like, shape=[k_sampling_points - 1, ambient_dim]
Reference discrete curve.
return_sdist : bool
If True, also returns squared distance.
Returns
-------
aligned : array, shape=[k_sampling_points - 1, ambient_dim]
Curve reparametrized in an optimal way with respect to reference curve.
squared_dist : float
Quotient distance between point and base point.
If return_sdist is True.
"""
max_slope = self.max_slope
n_space_grid = self.n_space_grid
initial_srv = self._srv_transform(base_point)
end_srv = self._srv_transform(point)
resampled_initial_srv = self._resample_srv_function(initial_srv)
resampled_end_srv = self._resample_srv_function(end_srv)
grid = {(0, 0): 0.0}
gamma = {(0, 0): [(0, 0)]}
for x_max_index in range(1, n_space_grid):
max_y_max_index = max(n_space_grid, x_max_index + 1)
for y_max_index in range(1, max_y_max_index):
# NB: solves minimization part
min_x_min_index = max(0, x_max_index - max_slope)
min_y_min_index = max(0, y_max_index - max_slope)
for x_min_index in range(min_x_min_index, x_max_index):
for y_min_index in range(min_y_min_index, y_max_index):
if (x_min_index, y_min_index) not in grid:
# jump borders
continue
elif (
gs.abs(
(y_max_index - y_min_index)
/ (x_max_index - x_min_index)
- 1
)
< gs.atol
and x_max_index - x_min_index != 1
and y_max_index - y_max_index != 1
):
# jump slope == 1 except last
continue
new_value = grid[
x_min_index, y_min_index
] + self._compute_restricted_integral(
resampled_initial_srv,
resampled_end_srv,
x_min_index,
x_max_index,
y_min_index,
y_max_index,
)
if (x_max_index, y_max_index) not in grid or grid[
(x_max_index, y_max_index)
] < new_value:
grid[x_max_index, y_max_index] = new_value
new_gamma = gamma[(x_min_index, y_min_index)].copy()
new_gamma.append((x_max_index, y_max_index))
gamma[(x_max_index, y_max_index)] = new_gamma
last_index = n_space_grid - 1
point_with_origin = insert_zeros(point, axis=-2)
point_reparametrized = self._reparametrize(
point_with_origin, gamma[last_index, last_index]
)
if not return_sdist:
return point_reparametrized
return point_reparametrized, self._compute_squared_dist(
resampled_initial_srv,
resampled_end_srv,
grid[(last_index, last_index)],
)
[docs]
def align(self, point, base_point, return_sdist=False):
"""Align point to base point.
Parameters
----------
point : array-like, shape=[..., k_sampling_points - 1, ambient_dim]
Discrete curve to align.
base_point : array-like, shape=[..., k_sampling_points - 1, ambient_dim]
Reference discrete curve.
return_sdist : bool
If True, also returns squared distance.
Returns
-------
aligned : array-like, shape=[..., k_sampling_points - 1, ambient_dim]
Curve reparametrized in an optimal way with respect to reference curve.
squared_dist : array, shape=[...,]
Quotient distance between point and base point.
If return_sdist is True.
"""
is_batch = check_is_batch(
self._total_space.point_ndim,
point,
base_point,
)
if not is_batch:
return self._align_single(point, base_point, return_sdist=return_sdist)
if point.ndim != base_point.ndim:
point, base_point = gs.broadcast_arrays(point, base_point)
out = [
self._align_single(point_, base_point_, return_sdist=return_sdist)
for point_, base_point_ in zip(point, base_point)
]
if not return_sdist:
return gs.stack(out)
aligned = gs.stack([out_[0] for out_ in out])
sdists = gs.stack([out_[1] for out_ in out])
return aligned, sdists
[docs]
class ReparametrizationBundle(FiberBundle):
"""Principal bundle of curves modulo reparametrizations with an elastic metric.
The space of parameterized curves is the total space of a principal bundle
where the group action is given by reparametrization and the base space is
the shape space of curves modulo reparametrization, i.e.unparametrized
curves. In the discrete case, reparametrization corresponds to resampling.
Each tangent vector to the space of parameterized curves can be split into a
vertical part (tangent to the fibers of the principal bundle) and a
horizontal part (orthogonal to the vertical part with respect to the SRV
metric). The geodesic between the shapes of two curves is computed by
aligning (i.e. reparametrizing) one of the two curves with respect to the
other, and computing the geodesic between the aligned curves. This geodesic
will be horizontal, and will project to a geodesic on the shape space.
Two different aligners are available:
- IterativeHorizontalGeodesicAligner (default)
- DynamicProgrammingAligner.
Parameters
----------
total_space : DiscreteCurvesStartingAtOrigin
Space of discrete curves starting at the origin
"""
def __init__(self, total_space, aligner=None):
if aligner is None:
aligner = IterativeHorizontalGeodesicAligner(total_space)
super().__init__(
total_space=total_space,
aligner=aligner,
)
[docs]
def vertical_projection(self, tangent_vec, base_point, return_norm=False):
"""Compute vertical part of tangent vector at base point.
Parameters
----------
tangent_vec : array-like, shape=[..., k_sampling_points - 1, ambient_dim]
Tangent vector to decompose into horizontal and vertical parts.
base_point : array-like, shape=[..., k_sampling_points - 1, ambient_dim]
Discrete curve, base point of tangent_vec in the manifold of curves.
return_norm : boolean,
If True, the method returns the pointwise norm of the vertical
part of tangent_vec.
Returns
-------
tangent_vec_ver : array-like, shape=[..., k_sampling_points - 1, ambient_dim]
Vertical part of tangent_vec.
vertical_norm: array-like, shape=[..., n_points]
Pointwise norm of the vertical part of tangent_vec.
Only returned when return_norm is True.
"""
ambient_manifold = self._total_space.ambient_manifold
a_param, b_param = self._total_space.metric.a, self._total_space.metric.b
squotient = (a_param / b_param) ** 2
ndim = self._total_space.point_ndim
tangent_vec_with_zeros = insert_zeros(tangent_vec, axis=-ndim)
base_point_with_origin = insert_zeros(base_point, axis=-ndim)
position = base_point_with_origin[..., 1:-1, :]
delta = 1.0
d_pos = centered_difference(base_point_with_origin, delta=delta, axis=-ndim)
d_vec = centered_difference(tangent_vec_with_zeros, delta=delta, axis=-ndim)
d2_pos = second_centered_difference(
base_point_with_origin, delta=delta, axis=-ndim
)
d2_vec = second_centered_difference(
tangent_vec_with_zeros, delta=delta, axis=-ndim
)
pointwise_snorm = ambient_manifold.metric.squared_norm(
d_pos,
position,
)
pointwise_norm = gs.sqrt(pointwise_snorm)
pointwise_snorm2 = ambient_manifold.metric.squared_norm(
d2_pos,
position,
)
pointwise_inner_product_pos = ambient_manifold.metric.inner_product(
d2_pos, d_pos, position
)
vec_a = pointwise_snorm - 1 / 2 * pointwise_inner_product_pos
vec_b = -2 * pointwise_snorm - squotient * (
pointwise_snorm2 - pointwise_inner_product_pos**2 / pointwise_snorm
)
vec_c = pointwise_snorm + 1 / 2 * pointwise_inner_product_pos
vec_d = pointwise_norm * (
ambient_manifold.metric.inner_product(d2_vec, d_pos, position)
- (squotient - 1)
* ambient_manifold.metric.inner_product(d_vec, d2_pos, position)
+ (squotient - 2)
* ambient_manifold.metric.inner_product(d2_pos, d_pos, position)
* ambient_manifold.metric.inner_product(d_vec, d_pos, position)
/ pointwise_snorm
)
linear_system = (
from_vector_to_diagonal_matrix(vec_a[..., :-1], 1)
+ from_vector_to_diagonal_matrix(vec_b, 0)
+ from_vector_to_diagonal_matrix(vec_c[..., 1:], -1)
)
if linear_system.ndim == 2 and tangent_vec.ndim > 2:
linear_system = gs.broadcast_to(
linear_system, vec_d.shape[:-1] + linear_system.shape
)
vertical_norm = gs.linalg.solve(linear_system, vec_d)
unit_speed = gs.einsum(
"...ij,...i->...ij",
d_pos,
1 / pointwise_norm,
)
tangent_vec_ver = gs.einsum("...ij,...i->...ij", unit_speed, vertical_norm)
tangent_vec_ver = insert_zeros(tangent_vec_ver, axis=-ndim, end=True)
if return_norm:
vertical_norm = insert_zeros(vertical_norm, axis=-1, end=True)
return tangent_vec_ver, vertical_norm
return tangent_vec_ver
[docs]
class RotationBundle(FiberBundle):
"""Principal bundle of curves modulo rotations with an elastic metric.
This is the fiber bundle where the total space is the space of parameterized
curves equipped with an elastic metric, the action is given by rotations, and
the base space is the shape space of curves modulo rotations.
Parameters
----------
total_space : DiscreteCurvesStartingAtOrigin
Space of discrete curves starting at the origin
"""
def _rotate(self, point, rotation):
"""Rotate discrete curve starting at origin."""
return Matrices.transpose(gs.matmul(rotation, Matrices.transpose(point)))
[docs]
def align(self, point, base_point, return_rotation=False):
"""Align point to base point.
Find optimal rotation of curve with respect to base curve.
Parameters
----------
point : array-like, shape=[..., k_sampling_points - 1, ambient_dim]
Discrete curve to align.
base_point : array-like, shape=[..., k_sampling_points - 1, ambient_dim]
Reference discrete curve.
return_rotation : boolean
If true, returns the optimal rotation used for the alignment.
Optional, default : False.
Returns
-------
aligned : array-like, shape=[..., k_sampling_points - 1, ambient_dim
Curve optimally rotated with respect to reference curve.
"""
transform = self._total_space.metric.diffeo
initial_srv = transform(base_point)
end_srv = transform(point)
mat = gs.matmul(Matrices.transpose(initial_srv), end_srv)
u_svd, _, vt_svd = gs.linalg.svd(mat)
sign = gs.linalg.det(gs.matmul(u_svd, vt_svd))
vt_svd[..., -1, :] = gs.einsum("...,...j->...j", sign, vt_svd[..., -1, :])
rotation = gs.matmul(u_svd, vt_svd)
point_aligned = self._rotate(point, rotation)
if return_rotation:
return point_aligned, rotation
return point_aligned
register_quotient(
Space=DiscreteCurvesStartingAtOrigin,
Metric=SRVMetric,
GroupAction="rotations",
FiberBundle=RotationBundle,
QuotientMetric=QuotientMetric,
)
register_quotient(
Space=DiscreteCurvesStartingAtOrigin,
Metric=SRVMetric,
GroupAction="reparametrizations",
FiberBundle=ReparametrizationBundle,
QuotientMetric=QuotientMetric,
)