Source code for geomstats.learning.geodesic_regression

r"""Geodesic Regression.

Lead author: Nicolas Guigui.

The generative model of the data is:
:math:`Z = Exp_{\beta_0}(\beta_1.X)` and :math:`Y = Exp_Z(\epsilon)`
where:

- :math:`Exp` denotes the Riemannian exponential,
- :math:`\beta_0` is called the intercept,
  and is a point on the manifold,
- :math:`\beta_1` is called the coefficient,
  and is a tangent vector to the manifold at :math:`\beta_0`,
- :math:`\epsilon \sim N(0, 1)` is a standard Gaussian noise,
- :math:`X` is the input, :math:`Y` is the target.

The geodesic regression method:

- estimates :math:`\beta_0, \beta_1`,
- predicts :math:`\hat{y}` from input :math:`X`.
"""

import logging
import math

from scipy.optimize import OptimizeResult
from sklearn.base import BaseEstimator

import geomstats.backend as gs
import geomstats.errors as error
from geomstats.learning.frechet_mean import FrechetMean
from geomstats.numerics.optimization import ScipyMinimize


class RiemannianGradientDescent:
    """Riemannian gradient descent."""

    def __init__(self, max_iter=100, init_step_size=0.1, tol=1e-5, verbose=False):
        self.max_iter = max_iter
        self.init_step_size = init_step_size
        self.verbose = verbose
        self.tol = tol
        self.jac = "autodiff"

    def _handle_jac(self, fun, point_ndim):
        if self.jac == "autodiff":

            def fun_(x):
                value, grad = gs.autodiff.value_and_grad(fun, point_ndims=point_ndim)(x)
                return value, grad

        else:
            raise NotImplementedError("For now only working with autodiff.")

        return fun_

    def _get_vector_transport(self, space):
        if hasattr(space.metric, "parallel_transport"):

            def vector_transport(tan_a, tan_b, base_point, _):
                return space.metric.parallel_transport(tan_a, base_point, tan_b)

        else:

            def vector_transport(tan_a, _, __, point):
                return space.to_tangent(tan_a, point)

        return vector_transport

    def minimize(self, space, fun, x0):
        """Perform gradient descent."""
        fun = self._handle_jac(fun, point_ndim=space.point_ndim)
        vector_transport = self._get_vector_transport(space)

        lr = self.init_step_size

        intercept_init, coef_init = gs.split(x0, 2)
        intercept_init = gs.reshape(intercept_init, space.shape)
        coef_init = gs.reshape(coef_init, space.shape)

        intercept_hat = intercept_hat_new = space.projection(intercept_init)
        coef_hat = coef_hat_new = space.to_tangent(coef_init, intercept_hat)
        param = gs.vstack([gs.flatten(intercept_hat), gs.flatten(coef_hat)])

        current_loss = math.inf
        current_grad = gs.zeros_like(param)
        current_iter = i = 0
        for i in range(self.max_iter):
            loss, grad = fun(param)
            if gs.any(gs.isnan(grad)):
                logging.warning(f"NaN encountered in gradient at iter {current_iter}")
                lr /= 2
                grad = current_grad
            elif loss >= current_loss and i > 0:
                lr /= 2
            else:
                if not current_iter % 5:
                    lr *= 2
                coef_hat = coef_hat_new
                intercept_hat = intercept_hat_new
                current_iter += 1
            if abs(loss - current_loss) < self.tol:
                if self.verbose:
                    logging.info(f"Tolerance threshold reached at iter {current_iter}")
                break

            grad_intercept, grad_coef = gs.split(grad, 2)
            riem_grad_intercept = space.to_tangent(
                gs.reshape(grad_intercept, space.shape), intercept_hat
            )
            riem_grad_coef = space.to_tangent(
                gs.reshape(grad_coef, space.shape), intercept_hat
            )

            intercept_hat_new = space.metric.exp(
                -lr * riem_grad_intercept, intercept_hat
            )
            coef_hat_new = vector_transport(
                coef_hat - lr * riem_grad_coef,
                -lr * riem_grad_intercept,
                intercept_hat,
                intercept_hat_new,
            )
            param = gs.vstack([gs.flatten(intercept_hat_new), gs.flatten(coef_hat_new)])

            current_loss = loss
            current_grad = grad

        if self.verbose:
            logging.info(
                f"Number of gradient evaluations: {i}, "
                f"Number of gradient iterations: {current_iter}"
                f" loss at termination: {current_loss}"
            )

        return OptimizeResult(fun=loss, x=param, nit=current_iter)
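

# A minimal standalone sketch of how this optimizer is driven (this example is
# not part of the library; `Hypersphere` and the toy loss below are
# assumptions). `x0` stacks the flattened intercept and coefficient, exactly
# as `GeodesicRegression._fit_riemannian` does, and `fun` must return a scalar
# loss of that stacked parameter; the gradient is obtained by autodiff, so an
# autodiff-capable backend (e.g. autograd or pytorch) is required:
#
#     from geomstats.geometry.hypersphere import Hypersphere
#
#     space = Hypersphere(dim=2)
#     target = space.random_point()
#
#     def fun(param):
#         intercept, coef = gs.split(param, 2)
#         intercept = gs.reshape(intercept, space.shape)
#         coef = gs.reshape(coef, space.shape)
#         point = space.metric.exp(space.to_tangent(coef, intercept), intercept)
#         return space.metric.squared_dist(point, target)
#
#     x0 = gs.vstack(
#         [gs.flatten(space.random_point()), gs.flatten(gs.zeros(space.shape))]
#     )
#     res = RiemannianGradientDescent(max_iter=50).minimize(space, fun, x0)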


class GeodesicRegression(BaseEstimator):
    r"""Geodesic Regression.

    The generative model of the data is:
    :math:`Z = Exp_{\beta_0}(\beta_1.X)` and :math:`Y = Exp_Z(\epsilon)`
    where:

    - :math:`Exp` denotes the Riemannian exponential,
    - :math:`\beta_0` is called the intercept,
      and is a point on the manifold,
    - :math:`\beta_1` is called the coefficient,
      and is a tangent vector to the manifold at :math:`\beta_0`,
    - :math:`\epsilon \sim N(0, 1)` is a standard Gaussian noise,
    - :math:`X` is the input, :math:`Y` is the target.

    The geodesic regression method:

    - estimates :math:`\beta_0, \beta_1`,
    - predicts :math:`\hat{y}` from input :math:`X`.

    Parameters
    ----------
    space : Manifold
        Equipped manifold.
    center_X : bool
        Whether to subtract the mean from X as preprocessing.
    method : str, {'extrinsic', 'riemannian'}
        Gradient descent method.
        Optional, default: 'extrinsic'.
    initialization : str or array-like, {'random', 'data', 'frechet', 'warm_start'}
        Initial values of the parameters for the optimization,
        or initialization method.
        Optional, default: 'random'.
    regularization : float
        Weight on the constraint for the intercept to lie on the manifold
        in the extrinsic optimization scheme. An L^2 constraint is applied.
        Optional, default: 1.
    compute_training_score : bool
        Whether to compute R^2.
        Optional, default: False.

    Notes
    -----
    * Required metric methods:
        * all: `exp`, `squared_dist`
        * if `riemannian`: `parallel_transport` or `to_tangent`
    """

    def __init__(
        self,
        space,
        center_X=True,
        method="extrinsic",
        initialization="random",
        regularization=1.0,
        compute_training_score=False,
    ):
        self.space = space
        self.center_X = center_X

        self._method = None
        self.method = method

        self.initialization = initialization
        self.regularization = regularization
        self.compute_training_score = compute_training_score

        self.intercept_ = None
        self.coef_ = None
        self.mean_ = None
        self.training_score_ = None

        self.mean_estimator = FrechetMean(self.space)

    def set(self, **kwargs):
        """Set optimizer parameters.

        Especially useful for one-line instantiations.
        """
        for param_name, value in kwargs.items():
            if not hasattr(self.optimizer, param_name):
                raise ValueError(f"Unknown parameter {param_name}.")
            setattr(self.optimizer, param_name, value)
        return self
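
    # For example (a sketch; `Hypersphere` is an assumption, not imported by
    # this module):
    #
    #     model = GeodesicRegression(Hypersphere(dim=2)).set(tol=1e-8)
    #
    # tightens the underlying optimizer's tolerance in a single line; both the
    # extrinsic and the Riemannian optimizers expose a `tol` attribute.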

    @property
    def method(self):
        """Gradient descent method."""
        return self._method

    @method.setter
    def method(self, value):
        """Gradient descent method."""
        error.check_parameter_accepted_values(
            value, "method", ["extrinsic", "riemannian"]
        )
        if value == self._method:
            return

        self._method = value

        tol = 1e-5
        max_iter = 100
        if value == "extrinsic":
            optimizer = ScipyMinimize(
                method="CG",
                options={"disp": False, "maxiter": max_iter},
                tol=tol,
            )
        else:
            optimizer = RiemannianGradientDescent(
                max_iter=max_iter,
                init_step_size=0.1,
                tol=tol,
                verbose=False,
            )
        self.optimizer = optimizer

    def _model(self, X, coef, intercept):
        """Compute the generative model of the geodesic regression.

        Parameters
        ----------
        X : array-like, shape=[n_samples,]
            Training input samples.
        coef : array-like, shape=[{dim, [n,n]}]
            Coefficient of the geodesic regression.
        intercept : array-like, shape=[{dim, [n,n]}]
            Intercept of the geodesic regression.

        Returns
        -------
        _ : array-like, shape=[..., {dim, [n,n]}]
            Value on the manifold output by the generative model.
        """
        return self.space.metric.exp(gs.einsum("n,...->n...", X, coef), intercept)

    def _loss(self, X, y, param, weights=None):
        """Compute the loss associated to the geodesic regression.

        Parameters
        ----------
        X : array-like, shape=[n_samples,]
            Training input samples.
        y : array-like, shape=[..., {dim, [n,n]}]
            Training target values.
        param : array-like, shape=[2, {dim, [n,n]}]
            Parameters intercept and coef of the geodesic regression,
            vertically stacked.
        weights : array-like, shape=[...,]
            Weights associated to the points.
            Optional, default: None.

        Returns
        -------
        _ : float
            Loss.
        """
        intercept, coef = gs.split(param, 2)
        intercept = gs.reshape(intercept, self.space.shape)
        coef = gs.reshape(coef, self.space.shape)
        if self.method == "extrinsic":
            base_point = self.space.projection(intercept)
            penalty = self.regularization * gs.sum((base_point - intercept) ** 2)
        else:
            base_point = intercept
            penalty = 0.0

        tangent_vec = self.space.to_tangent(coef, base_point)
        distances = self.space.metric.squared_dist(
            self._model(X, tangent_vec, base_point), y
        )
        if weights is None:
            weights = 1.0
        return 1.0 / 2.0 * gs.sum(weights * distances) + penalty

    def _initialize_parameters(self, y):
        """Set initial values for the parameters of the model.

        Set initial parameters for the optimization, depending on the value
        of the attribute `initialization`. The options are:

        - `random` : pick random numbers from a normal distribution,
          then project them to the manifold and the tangent space.
        - `frechet` : compute the Frechet mean of the target points.
        - `data` : pick a random sample from the target points and a
          tangent vector with random coefficients.
        - `warm_start` : pick previous values of the parameters if the
          model was fitted before, otherwise behaves as `random`.

        Parameters
        ----------
        y : array-like, shape=[n_samples, {dim, [n,n]}]
            The target data, used for the options `data` and `frechet`.

        Returns
        -------
        intercept : array-like, shape=[{dim, [n,n]}]
            Initial value for the intercept.
        coef : array-like, shape=[{dim, [n,n]}]
            Initial value for the coefficient.
        """
        init = self.initialization
        shape = self.space.shape

        if isinstance(init, str):
            if init == "random":
                return gs.random.normal(size=(2,) + shape)
            if init == "frechet":
                mean = self.mean_estimator.fit(y).estimate_
                return mean, gs.zeros(shape)
            if init == "data":
                return gs.random.choice(y, 1)[0], gs.random.normal(size=shape)
            if init == "warm_start":
                if self.intercept_ is not None:
                    return self.intercept_, self.coef_
                return gs.random.normal(size=(2,) + shape)
            raise ValueError(
                "The initialization string must be one of "
                "random, frechet, data or warm_start"
            )
        return init

    def fit(self, X, y, weights=None):
        """Estimate the parameters of the geodesic regression.

        Estimate the intercept and the coefficient defining the
        geodesic regression model.

        Parameters
        ----------
        X : array-like, shape=[n_samples,]
            Training input samples.
        y : array-like, shape=[n_samples, {dim, [n,n]}]
            Training target values.
        weights : array-like, shape=[n_samples,]
            Weights associated to the points.
            Optional, default: None.

        Returns
        -------
        self : object
            Returns self.
        """
        times = gs.copy(X)
        if self.center_X:
            self.mean_ = gs.mean(X)
            times -= self.mean_

        if self.method == "extrinsic":
            res = self._fit_extrinsic(times, y, weights)
        else:
            res = self._fit_riemannian(times, y, weights)

        intercept_hat, coef_hat = gs.split(res.x, 2)
        intercept_hat = gs.reshape(intercept_hat, self.space.shape)
        coef_hat = gs.reshape(coef_hat, self.space.shape)

        self.intercept_ = self.space.projection(intercept_hat)
        self.coef_ = self.space.to_tangent(coef_hat, self.intercept_)

        if self.compute_training_score:
            variance = gs.sum(self.space.metric.squared_dist(y, self.intercept_))
            self.training_score_ = 1 - 2 * res.fun / variance

        return self

    def _fit_extrinsic(self, X, y, weights=None):
        """Estimate the parameters using the extrinsic gradient descent.

        Estimate the intercept and the coefficient defining the
        geodesic regression model, using the extrinsic gradient.

        Parameters
        ----------
        X : array-like, shape=[n_samples,]
            Training input samples.
        y : array-like, shape=[n_samples, {dim, [n,n]}]
            Training target values.
        weights : array-like, shape=[n_samples,]
            Weights associated to the points.
            Optional, default: None.

        Returns
        -------
        res : OptimizeResult
            Scipy's optimize result.
        """
        intercept_init, coef_init = self._initialize_parameters(y)
        intercept_hat = self.space.projection(intercept_init)
        coef_hat = self.space.to_tangent(coef_init, intercept_hat)
        initial_guess = gs.hstack([gs.flatten(intercept_hat), gs.flatten(coef_hat)])

        objective_with_grad = lambda param: self._loss(X, y, param, weights)

        return self.optimizer.minimize(
            objective_with_grad,
            initial_guess,
        )

    def _fit_riemannian(self, X, y, weights=None):
        """Estimate the parameters using a Riemannian gradient descent.

        Estimate the intercept and the coefficient defining the
        geodesic regression model, using the Riemannian gradient.

        Parameters
        ----------
        X : array-like, shape=[n_samples,]
            Training input samples.
        y : array-like, shape=[n_samples, {dim, [n,n]}]
            Training target values.
        weights : array-like, shape=[n_samples,]
            Weights associated to the points.
            Optional, default: None.

        Returns
        -------
        res : OptimizeResult
            Scipy's optimize result.
        """
        objective_with_grad = lambda params: self._loss(X, y, params, weights)

        intercept_init, coef_init = self._initialize_parameters(y)
        x0 = gs.vstack([gs.flatten(intercept_init), gs.flatten(coef_init)])
        return self.optimizer.minimize(self.space, objective_with_grad, x0)

    def predict(self, X):
        """Predict the manifold value for each input.

        Parameters
        ----------
        X : array-like, shape=[n_samples,]
            Input data.

        Returns
        -------
        y : array-like, shape=[n_samples, {dim, [n,n]}]
            Predicted target values.
        """
        if self.coef_ is None:
            raise RuntimeError("Fit method must be called before predict.")

        times = gs.copy(X)
        if self.center_X:
            times = times - self.mean_

        return self._model(times, self.coef_, self.intercept_)

    def score(self, X, y, weights=None):
        """Compute training score.

        Compute the training score defined as R^2.

        Parameters
        ----------
        X : array-like, shape=[n_samples,]
            Training input samples.
        y : array-like, shape=[n_samples, {dim, [n,n]}]
            Training target values.
        weights : array-like, shape=[n_samples,]
            Weights associated to the points.
            Optional, default: None.

        Returns
        -------
        score : float
            Training score.
        """
        y_pred = self.predict(X)
        if weights is None:
            weights = 1.0

        mean = self.mean_estimator.fit(y).estimate_
        numerator = gs.sum(weights * self.space.metric.squared_dist(y, y_pred))
        denominator = gs.sum(weights * self.space.metric.squared_dist(y, mean))

        if denominator == 0:
            return 0.0
        return 1 - numerator / denominator
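

if __name__ == "__main__":
    # A minimal end-to-end sketch (not part of the library source): fit the
    # default extrinsic method on noiseless data lying along a geodesic of
    # the 2-sphere. `Hypersphere` is assumed importable from
    # geomstats.geometry.hypersphere.
    from geomstats.geometry.hypersphere import Hypersphere

    space = Hypersphere(dim=2)
    intercept_true = space.random_point()
    coef_true = space.to_tangent(gs.random.normal(size=space.shape), intercept_true)

    X = gs.linspace(0.0, 1.0, 20)
    y = space.metric.exp(gs.einsum("n,...->n...", X, coef_true), intercept_true)

    model = GeodesicRegression(space, initialization="frechet").fit(X, y)
    y_pred = model.predict(X)
    # R^2 should be close to 1 since the data is noiseless.
    print(model.score(X, y))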