# Source code for leaspy.models.mixture

import math
import warnings
from abc import abstractmethod
from typing import Iterable, Optional, Dict

import numpy as np
import pandas as pd
import torch

from leaspy.exceptions import LeaspyInputError, LeaspyModelInputError, LeaspyIndividualParamsInputError
from leaspy.io.data.dataset import Dataset
from leaspy.models.base import InitializationMethod

from leaspy.models.obs_models import (
    FullGaussianObservationModel,
    observation_model_factory,
)
from leaspy.utils.docs import doc_with_super
from leaspy.utils.functional import Exp, MatMul, OrthoBasis, Sqr
from leaspy.utils.typing import DictParams, KwargsType

from leaspy.utils.weighted_tensor import (
    TensorOrWeightedTensor,
    WeightedTensor,
    unsqueeze_right,
)
from leaspy.variables.distributions import MixtureNormal, Normal
from leaspy.variables.specs import (
    Hyperparameter,
    IndividualLatentVariable,
    LinkedVariable,
    ModelParameter,
    NamedVariables,
    PopulationLatentVariable,
    SuffStatsRW,
    VariablesLazyValuesRO,
)

from leaspy.variables.specs import (
    LVL_FT,
)
from leaspy.variables.state import State
from .mcmc_saem_compatible import McmcSaemCompatibleModel


@doc_with_super()
class TimeReparametrizedMixtureModel(McmcSaemCompatibleModel):
    """
    A time-reparametrized model tailored to handle mixture models with multiple clusters.

    This class extends `TimeReparametrizedModel` to incorporate mixture-specific
    behaviors, including support for multiple clusters (`n_clusters`) and
    corresponding vectorized parameters.

    Parameters
    ----------
    name : :obj:`str`
        Name of the model.
    source_dimension : :obj:`int`, optional
        Number of sources. Dimension of spatial components (default is None).
    **kwargs : :obj:`dict`
        Additional hyperparameters for the model. Must include:
        - 'n_clusters': int
            Number of mixture components (must be >= 2).
        - 'dimension' or 'features': int or list
            Dimensionality of the input data.
        - 'obs_models': str, list, or dict (optional)
            Specification of the observation model(s). Defaults to "gaussian-diagonal".

    Raises
    ------
    :exc:`.LeaspyModelInputError`
        If inconsistent hyperparameters.
    """

    _xi_mean = 0
    _xi_std = 0.5
    _tau_std = 5.0
    _noise_std = 0.1
    _sources_mean = 0
    # Override: in mixture models, tau/xi/sources means define cluster centers (population-level)
    _individual_prior_params = ("tau_std", "xi_std", "sources_std", "tau_mean", "xi_mean")
    _noise_params = ("noise_std",)
    # Extend base _param_axes with mixture-specific cluster-indexed parameters
    _param_axes = {
        **McmcSaemCompatibleModel._param_axes,
        "tau_mean": ("cluster",),
        "tau_std": ("cluster",),
        "xi_mean": ("cluster",),
        "xi_std": ("cluster",),
        "sources_mean": ("source", "cluster"),
        "sources_std": ("source", "cluster"),
        "probs": ("cluster",),
    }
    _sources_std = 1.0

    @property
    def xi_mean(self) -> torch.Tensor:
        """Return the mean of xi as a tensor (alternating +2 / -2 across clusters)."""
        return torch.tensor([2 if i % 2 == 0 else -2 for i in range(self.n_clusters)])

    @property
    def xi_std(self) -> torch.Tensor:
        """Return the standard deviation of xi as a tensor."""
        return torch.tensor([self._xi_std] * self.n_clusters)

    @property
    def tau_std(self) -> torch.Tensor:
        """Return the standard deviation of tau as a tensor."""
        return torch.tensor([self._tau_std] * self.n_clusters)

    @property
    def noise_std(self) -> torch.Tensor:
        """Return the standard deviation of the model as a tensor."""
        return torch.tensor(self._noise_std)

    @property
    def sources_mean(self) -> torch.Tensor:
        """Return the mean of the sources as a tensor (checkerboard of +1 / -1)."""
        return torch.tensor(
            [
                [1 if (i + j) % 2 == 0 else -1 for j in range(self.n_clusters)]
                for i in range(self.source_dimension)
            ]
        )

    @property
    def sources_std(self) -> torch.Tensor:
        """Return the standard deviation of the sources as a tensor."""
        return torch.ones(self.source_dimension, self.n_clusters)

    def __init__(self, name: str, **kwargs):
        self.source_dimension: Optional[int] = None
        dimension = kwargs.get("dimension", None)
        n_clusters = kwargs.get("n_clusters", None)
        if "features" in kwargs:
            dimension = len(kwargs["features"])
        observation_models = kwargs.get("obs_models", None)
        if observation_models is None:
            observation_models = "gaussian-diagonal"
        if observation_models == "gaussian-diagonal":
            # FIX: guard against a missing `n_clusters` — previously `None < 2`
            # raised an opaque TypeError instead of a meaningful error.
            if n_clusters is None or n_clusters < 2:
                raise LeaspyInputError(
                    "Number of clusters should be at least 2 to fit a mixture model"
                )
            if dimension == 1:
                raise LeaspyInputError(
                    "You cannot use a multivariate model with 1 feature"
                )
        if isinstance(observation_models, (list, tuple)):
            kwargs["obs_models"] = tuple(
                observation_model_factory(obs_model, **kwargs)
                for obs_model in observation_models
            )
        elif isinstance(observation_models, dict):
            # Not really satisfied... Used for api load
            kwargs["obs_models"] = (
                observation_model_factory(
                    observation_models["y"],
                    dimension=dimension,
                    n_clusters=n_clusters,
                ),
            )
        else:
            kwargs["obs_models"] = (
                observation_model_factory(
                    observation_models, dimension=dimension, n_clusters=n_clusters
                ),
            )
        super().__init__(name, **kwargs)
[docs] def get_variables_specs(self) -> NamedVariables: """ Return the specifications of the variables (latent variables, derived variables, model 'parameters') that are part of the model. Returns ------- :class:`~leaspy.variables.specs.NamedVariables` : A dictionary-like object containing specifications for the variables """ d = super().get_variables_specs() d.update( rt=LinkedVariable(self.time_reparametrization), # PRIORS tau_mean=ModelParameter.for_ind_mean_mixture("tau", shape=(self.n_clusters,)), tau_std=ModelParameter.for_ind_std_mixture("tau", shape=(self.n_clusters,)), xi_mean=ModelParameter.for_ind_mean_mixture("xi", shape=(self.n_clusters,)), xi_std=ModelParameter.for_ind_std_mixture("xi", shape=(self.n_clusters,)), probs = ModelParameter.for_probs(shape=self.n_clusters), # LATENT VARS xi=IndividualLatentVariable(MixtureNormal("xi_mean", "xi_std", "probs"), sampling_kws={"scale": 10},), tau=IndividualLatentVariable(MixtureNormal("tau_mean", "tau_std", "probs"), sampling_kws={"scale": 10},), # DERIVED VARS alpha=LinkedVariable(Exp("xi")), ) if self.source_dimension >= 1: d.update( # PRIORS betas_mean=ModelParameter.for_pop_mean( "betas", shape=(self.dimension - 1, self.source_dimension), ), betas_std=Hyperparameter(0.01), sources_mean=ModelParameter.for_ind_mean_mixture( "sources", shape=(self.source_dimension, self.n_clusters,), ), sources_std=Hyperparameter(1.0), # LATENT VARS betas=PopulationLatentVariable( Normal("betas_mean", "betas_std"), sampling_kws={"scale": 0.5}, ), sources=IndividualLatentVariable(MixtureNormal("sources_mean", "sources_std", "probs"), sampling_kws={"scale": 10}), # DERIVED VARS mixing_matrix=LinkedVariable( MatMul("orthonormal_basis", "betas").then(torch.t) ), # shape: (Ns, Nfts) space_shifts=LinkedVariable( MatMul("sources", "mixing_matrix") ), # shape: (Ni, Nfts) ) return d
@property def has_sources(self) -> bool: """ Indicates whether the model includes sources. Returns ------- :obj:`bool` True if `source_dimension` is a positive integer. False otherwise. """ return ( hasattr(self, "source_dimension") and isinstance(self.source_dimension, int) and self.source_dimension > 0 )
[docs] @staticmethod def time_reparametrization( *, t: TensorOrWeightedTensor[float], alpha: torch.Tensor, tau: torch.Tensor, ) -> TensorOrWeightedTensor[float]: """ Tensorized time reparametrization formula. .. warning:: Shapes of tensors must be compatible between them. Parameters ---------- t : :class:`torch.Tensor` Timepoints to reparametrize alpha : :class:`torch.Tensor` Acceleration factors of individual(s) tau : :class:`torch.Tensor` Time-shift(s) of individual(s) Returns ------- :class:`torch.Tensor` Reparametrized time of same shape as `timepoints` """ return alpha * (t - tau)
def _validate_compatibility_of_dataset( self, dataset: Optional[Dataset] = None ) -> None: """ Validate the compatibility of the provided dataset with the model's configuration. Parameters ---------- dataset : :class:`~leaspy.io.data.dataset.Dataset`, optional The dataset to validate against, by default None. Raises ------ :exc: `.LeaspyModelInputError` If `source_dimension` is provided but not an integer in the valid range [0, dataset.dimension - 1), or if `n_clusters` is provided but is not an integer ≥ 2. """ super()._validate_compatibility_of_dataset(dataset) if not dataset: return if self.source_dimension is None: self.source_dimension = int(dataset.dimension**0.5) warnings.warn( "You did not provide `source_dimension` hyperparameter for multivariate model, " f"setting it to ⌊√dimension⌋ = {self.source_dimension}." ) elif not ( isinstance(self.source_dimension, int) and 0 <= self.source_dimension < dataset.dimension ): raise LeaspyModelInputError( f"Sources dimension should be an integer in [0, dimension - 1[ " f"but you provided `source_dimension` = {self.source_dimension} " f"whereas `dimension` = {dataset.dimension}." ) # add n_clusters if self.n_clusters is None: warnings.warn( "You did not provide `n_clusters` hyperparameter for mixture model" ) elif not (isinstance(self.n_clusters, int) and self.n_clusters >= 2): raise LeaspyModelInputError( f"Number of clusters should be an integer greater than 2 " f"but you provided `n_clusters` = {self.n_clusters} " ) def _audit_individual_parameters( self, individual_parameters: DictParams ) -> KwargsType: """ Validate and process individual parameter inputs for model compatibility. Parameters ---------- individual_parameters : :class:`~leaspy.utils.typing.DictParams` A dictionary mapping parameter names (strings) to their values, which can be scalars or array-like structures. 
Returns ------- KwargsType: :class:`~leaspy.utils.typing.KwargsType` A dictionary with the following keys: - "nb_inds": Number of individuals - "tensorized_ips": Dictionary of parameters converted to 2D tensors. - "tensorized_ips_gen": Generator yielding tensors for each individual, each with an added batch dimension. Raises ------ :exc: `LeaspyIndividualParamsInputError` If the provided dictionary keys do not match the expected parameter names, or if the sizes of individual parameters are inconsistent, or if `sources` parameter does not meet array-like requirements. """ from .utilities import is_array_like, tensorize_2D expected_parameters = set(["xi", "tau"] + int(self.has_sources) * ["sources"]) given_parameters = set(individual_parameters.keys()) symmetric_diff = expected_parameters.symmetric_difference(given_parameters) if len(symmetric_diff) > 0: raise LeaspyIndividualParamsInputError( f"Individual parameters dict provided {given_parameters} " f"is not compatible for {self.name} model. " f"The expected individual parameters are {expected_parameters}." ) ips_is_array_like = { k: is_array_like(v) for k, v in individual_parameters.items() } ips_size = { k: len(v) if ips_is_array_like[k] else 1 for k, v in individual_parameters.items() } if self.has_sources: if not ips_is_array_like["sources"]: raise LeaspyIndividualParamsInputError( f"Sources must be an array_like but {individual_parameters['sources']} was provided." ) tau_xi_scalars = all(ips_size[k] == 1 for k in ["tau", "xi"]) if tau_xi_scalars and (ips_size["sources"] > 1): # is 'sources' not a nested array? (allowed iff tau & xi are scalars) if not is_array_like(individual_parameters["sources"][0]): # then update sources size (1D vector representing only 1 individual) ips_size["sources"] = 1 # TODO? check source dimension compatibility? uniq_sizes = set(ips_size.values()) if len(uniq_sizes) != 1: raise LeaspyIndividualParamsInputError( f"Individual parameters sizes are not compatible together. 
Sizes are {ips_size}." ) # number of individuals present n_individual_parameters = uniq_sizes.pop() # properly choose unsqueezing dimension when tensorizing array_like (useful for sources) # [1,2] => [[1],[2]] (expected for 2 individuals / 1D sources) # [1,2] => [[1,2]] (expected for 1 individual / 2D sources) unsqueeze_dim = 0 if n_individual_parameters == 1 else -1 # tensorized (2D) version of ips tensorized_individual_parameters = { name: tensorize_2D(value, unsqueeze_dim=unsqueeze_dim) for name, value in individual_parameters.items() } return { "nb_inds": n_individual_parameters, "tensorized_ips": tensorized_individual_parameters, "tensorized_ips_gen": ( { name: value[individual, :].unsqueeze(0) for name, value in tensorized_individual_parameters.items() } for individual in range(n_individual_parameters) ), }
[docs] def put_individual_parameters(self, state: State, dataset: Dataset): """ Initialize individual latent parameters in the given state if not already set. Parameters ---------- state : :class:`~leaspy.variables.state.State` The current state object that holds all the variables dataset : :class:`~leaspy.io.data.Data.Dataset` Dataset used to initialize latent variables accordingly. """ df = dataset.to_pandas().reset_index("TIME").groupby("ID").min() # Initialise individual parameters if they are not already initialised if not state.are_variables_set(("xi", "tau")): df_ind = df["TIME"].to_frame(name="tau") df_ind["xi"] = 0.0 else: df_ind = pd.DataFrame( torch.concat([state["xi"], state["tau"]], axis=1).detach().numpy(), columns=["xi", "tau"], index=np.arange(state["xi"].shape[0]), # use correct number of rows ) if self.source_dimension > 0: for i in range(self.source_dimension): df_ind[f"sources_{i}"] = 0.0 with state.auto_fork(None): state.put_individual_latent_variables(df=df_ind)
def _load_hyperparameters(self, hyperparameters: KwargsType) -> None: """ Updates all model hyperparameters from the provided hyperparameters. Parameters ---------- hyperparameters : :class:`~leaspy.utils.typing.KwargsType` Dictionary containing the hyperparameters to be loaded. Expected keys include: - "features": List or sequence of feature names - "dimension": Integer specifying the number of features - "source_dimension": Integer specifying the number of sources; must be in [0, dimension - 1]. - "n_clusters": Integer, must be ≥ 2 Raises ------ :exc: `LeaspyModelInputError` - `dimension` does not match the number of `features` - `source_dimension` is invalid or out of range - `n_clusters` is missing or less than 2 """ expected_hyperparameters = ( "features", "dimension", "source_dimension", "n_clusters", ) if "features" in hyperparameters: self.features = hyperparameters["features"] if "dimension" in hyperparameters: if self.features and hyperparameters["dimension"] != len(self.features): raise LeaspyModelInputError( f"Dimension provided ({hyperparameters['dimension']}) does not match " f"features ({len(self.features)})" ) self.dimension = hyperparameters["dimension"] if "source_dimension" in hyperparameters: if not ( isinstance(hyperparameters["source_dimension"], int) and (hyperparameters["source_dimension"] >= 0) and ( self.dimension is None or hyperparameters["source_dimension"] <= self.dimension - 1 ) ): raise LeaspyModelInputError( f"Source dimension should be an integer in [0, dimension - 1], " f"not {hyperparameters['source_dimension']}" ) self.source_dimension = hyperparameters["source_dimension"] if "n_clusters" in hyperparameters: if not ( isinstance(hyperparameters["n_clusters"], int) and (hyperparameters["n_clusters"] >= 2) ): raise LeaspyModelInputError( f"Number of clusters should be an integer greater than 2, " f"not {hyperparameters['n_clusters']} " ) self.n_clusters = hyperparameters["n_clusters"] self._raise_if_unknown_hyperparameters( 
expected_hyperparameters, hyperparameters )
[docs] def to_dict(self, *, with_mixing_matrix: bool = True) -> KwargsType: """ Export model object as dictionary ready for :term:`JSON` saving. Parameters ---------- with_mixing_matrix : :obj:`bool` (default ``True``) Save the :term:`mixing matrix` in the exported file in its 'parameters' section. .. warning:: It is not a real parameter and its value will be overwritten at model loading (orthonormal basis is recomputed from other "true" parameters and mixing matrix is then deduced from this orthonormal basis and the betas)! It was integrated historically because it is used for convenience in browser webtool and only there... Returns ------- :class:`~leaspy.utils.typing.KwargsType` : The object as a dictionary. """ # add n_clusters model_settings = super().to_dict() model_settings["n_clusters"] = self.n_clusters model_settings["source_dimension"] = self.source_dimension if with_mixing_matrix and self.source_dimension >= 1: # transposed compared to previous version model_settings["parameters"]["mixing_matrix"] = self.state[ "mixing_matrix" ].tolist() return model_settings
@doc_with_super()
class RiemanianManifoldMixtureModel(TimeReparametrizedMixtureModel):
    """
    A riemannian manifold model tailored to handle mixture models with multiple clusters.

    This class extends `RiemanianManifoldModel` to incorporate mixture-specific
    behaviors, mainly the handling of sources for multiple clusters.

    Parameters
    ----------
    name : :obj:`str`
        The name of the model.
    **kwargs
        Hyperparameters of the model (including `noise_model`).

    Raises
    ------
    :exc:`.LeaspyModelInputError`
        * If hyperparameters are inconsistent
    """

    def __init__(
        self, name: str, variables_to_track: Optional[Iterable[str]] = None, **kwargs
    ):
        super().__init__(name, **kwargs)
        default_variables_to_track = [
            "g",
            "v0",
            "noise_std",
            "tau_mean",
            "tau_std",
            "xi_mean",
            "xi_std",
            "nll_attach",
            "nll_regul_log_g",
            "nll_regul_log_v0",
            "xi",
            "tau",
            "nll_regul_pop_sum",
            "nll_regul_all_sum",
            "nll_tot",
        ]
        if self.source_dimension:
            # Source-related variables are only tracked when the model has sources
            default_variables_to_track += [
                "sources",
                "betas",
                "mixing_matrix",
                "space_shifts",
                "sources_mean",
            ]
        variables_to_track = variables_to_track or default_variables_to_track
        self.tracked_variables = self.tracked_variables.union(set(variables_to_track))
        self.tracked_variables_ordered = variables_to_track

    @classmethod
    def _center_xi_realizations(cls, state: State) -> None:
        """
        Center the ``xi`` realizations in place.

        Parameters
        ----------
        state : :class:`~leaspy.variables.state.State`
            The dictionary-like object representing current model state, which
            contains keys such as ``"xi"`` and ``"log_v0"``.

        Notes
        -----
        This transformation preserves the orthonormal basis since the new ``v0``
        remains collinear to the previous one. It is a purely internal operation
        meant to reduce redundancy in the parameter space (i.e., improve
        identifiability and stabilize inference).
        """
        mean_xi = torch.mean(state["xi"])
        state["xi"] = state["xi"] - mean_xi
        # Compensate on log_v0 so the product alpha * v0 is globally unchanged
        state["log_v0"] = state["log_v0"] + mean_xi

    @classmethod
    def _center_sources_realizations(cls, state: State) -> None:
        """
        Center the ``sources`` realizations in place.

        Parameters
        ----------
        state : :class:`~leaspy.variables.state.State`
            The dictionary-like object representing current model state, which
            contains keys such as ``"sources"``.
        """
        mean_sources = torch.mean(state["sources"])
        state["sources"] = state["sources"] - mean_sources
[docs] @classmethod def compute_sufficient_statistics(cls, state: State) -> SuffStatsRW: """ Compute the model's :term:`sufficient statistics`. Parameters ---------- state : :class:`~leaspy.variables.state.State` The state to pick values from. Returns ------- SuffStatsRW : The computed sufficient statistics. """ cls._center_xi_realizations(state) cls._center_sources_realizations(state) return super().compute_sufficient_statistics(state)
[docs] def get_variables_specs(self) -> NamedVariables: """ Return the specifications of the variables (latent variables, derived variables, model 'parameters') that are part of the model. Returns ------- :class:`~leaspy.variables.specs.NamedVariables` A dictionary-like object mapping variable names to their specifications. These include `ModelParameter`, `Hyperparameter`, `PopulationLatentVariable`, and `LinkedVariable` instances. """ d = super().get_variables_specs() d.update( # PRIORS log_v0_mean=ModelParameter.for_pop_mean( "log_v0", shape=(self.dimension,), ), log_v0_std=Hyperparameter(0.01), # no xi_mean as hyperaparameter # LATENT VARS log_v0=PopulationLatentVariable(Normal("log_v0_mean", "log_v0_std")), # DERIVED VARS v0=LinkedVariable( Exp("log_v0"), ), metric=LinkedVariable( self.metric ), # for linear model: metric & metric_sqr are fixed = 1. ) if self.source_dimension >= 1: d.update( model=LinkedVariable(self.model_with_sources), metric_sqr=LinkedVariable(Sqr("metric")), orthonormal_basis=LinkedVariable(OrthoBasis("v0", "metric_sqr")), ) else: d["model"] = LinkedVariable(self.model_no_sources) return d
[docs] @staticmethod @abstractmethod def metric(*, g: torch.Tensor) -> torch.Tensor: pass
[docs] @classmethod def model_no_sources(cls, *, rt: torch.Tensor, metric, v0, g) -> torch.Tensor: """ Return the model output when sources(spatial components) are not present. Parameters ---------- rt : :class:`torch.Tensor` The reparametrized time. metric : Any The metric tensor used for computing the spatial/temporal influence. v0 : Any The values of the population parameter `v0` for each feature. g : Any The values of the population parameter `g` for each feature. Returns ------- :class:`torch.Tensor` The model output without contribution from source shifts. Notes ----- This implementation delegates to `model_with_sources` with `space_shifts` set to a zero tensor of shape (1, 1), effectively removing source effects. """ return cls.model_with_sources( rt=rt, metric=metric, v0=v0, g=g, space_shifts=torch.zeros((1, 1)), )
[docs] @classmethod @abstractmethod def model_with_sources( cls, *, rt: torch.Tensor, space_shifts: torch.Tensor, metric, v0, g, ) -> torch.Tensor: pass
class LogisticMixtureInitializationMixin:
    """Initialization helpers for logistic mixture models."""

    def _compute_initial_values_for_model_parameters(
        self,
        dataset: Dataset,
    ) -> VariablesLazyValuesRO:
        """
        Compute initial values for model parameters.

        Parameters
        ----------
        dataset : :class:`~leaspy.io.data.Data.Dataset`
            The dataset from which to extract observations and masks.

        Returns
        -------
        :class:`~leaspy.variables.specs.VariablesLazyValuesRO`
            A dictionary mapping parameter names (as strings) to their initialized
            torch.Tensor values.

        Notes
        -----
        - If the initialization method is `DEFAULT`, patient means are used.
        - If `RANDOM`, parameters are sampled from normal distributions centered at
          patient means with estimated standard deviations.
        - `values` are clamped between 0.01 and 0.99 to avoid boundary issues.
        - If the model includes sources (source_dimension >= 1), regression
          coefficients `betas_mean` are initialized accordingly.
        - If the observation model is a `FullGaussianObservationModel`, the noise
          standard deviation parameter is expanded to the correct shape.
        """
        from leaspy.models.utilities import (
            compute_patient_slopes_distribution,
            compute_patient_time_distribution,
            compute_patient_values_distribution,
            get_log_velocities,
            torch_round,
        )

        df = dataset.to_pandas(apply_headers=True)
        n_clusters = self.n_clusters
        # Uniform initial mixture weights
        probs = torch.ones(n_clusters) / n_clusters

        slopes_mu, slopes_sigma = compute_patient_slopes_distribution(df)
        values_mu, values_sigma = compute_patient_values_distribution(df)
        if self.initialization_method == InitializationMethod.DEFAULT:
            slopes = slopes_mu
            values = values_mu
            betas = torch.zeros((self.dimension - 1, self.source_dimension))
        if self.initialization_method == InitializationMethod.RANDOM:
            slopes = torch.normal(slopes_mu, slopes_sigma)
            values = torch.normal(values_mu, values_sigma)
            betas = torch.distributions.normal.Normal(loc=0.0, scale=1.0).sample(
                sample_shape=(self.dimension - 1, self.source_dimension)
            )

        # Split individuals into `n_clusters` contiguous chunks and estimate a
        # per-cluster reference time t0 from each chunk.
        ids = df.index.get_level_values("ID").unique()
        n_inds = len(ids)
        step = math.ceil(n_inds / n_clusters)
        t0_per_cluster = []
        for c in range(n_clusters):
            # FIX: use half-open positional slicing; the previous label-based
            # `.loc[start : step * (c + 1)]` slice was end-inclusive and gave the
            # first cluster one extra individual.
            ids_cluster = ids[c * step : (c + 1) * step]
            df_cluster = df.loc[ids_cluster]  # all rows of the cluster's individuals
            time_mu, time_sigma = compute_patient_time_distribution(df_cluster)
            if self.initialization_method == InitializationMethod.DEFAULT:
                t0_c = time_mu
            if self.initialization_method == InitializationMethod.RANDOM:
                t0_c = torch.normal(time_mu, time_sigma)
            # t0_c is a scalar tensor (the original code called `.item()` on it)
            t0_per_cluster.append(float(t0_c))
        # 1-D tensor of one reference time per cluster (same final shape/dtype as
        # before after the float32 rounding below)
        t0 = torch.tensor(t0_per_cluster)

        # Enforce values are between 0 and 1
        values = values.clamp(min=1e-2, max=1 - 1e-2)  # always "works" for ordinal (values >= 1)
        parameters = {
            "log_g_mean": torch.log(1.0 / values - 1.0),
            "log_v0_mean": get_log_velocities(slopes, self.features),
            "tau_mean": t0,
            "tau_std": self.tau_std,
            "xi_mean": self.xi_mean,
            "xi_std": self.xi_std,
            "probs": probs,
        }
        if self.source_dimension >= 1:
            parameters["betas_mean"] = betas
            parameters["sources_mean"] = self.sources_mean
        rounded_parameters = {
            str(p): torch_round(v.to(torch.float32)) for p, v in parameters.items()
        }
        obs_model = next(iter(self.obs_models))  # WIP: multiple obs models...
        if isinstance(obs_model, FullGaussianObservationModel):
            rounded_parameters["noise_std"] = self.noise_std.expand(
                obs_model.extra_vars["noise_std"].shape
            )
        return rounded_parameters
@doc_with_super()
class LogisticMultivariateMixtureModel(
    LogisticMixtureInitializationMixin, RiemanianManifoldMixtureModel
):
    """Mixture Manifold model for multiple variables of interest (logistic formulation)."""

    def __init__(self, name: str, **kwargs):
        super().__init__(name, **kwargs)
[docs] def get_variables_specs(self) -> NamedVariables: """ Return the specifications of the variables (latent variables, derived variables, model 'parameters') that are part of the model. Returns ------- :class:`~leaspy.variables.specs.NamedVariables` A dictionary-like object mapping variable names to their specifications. These include `ModelParameter`, `Hyperparameter`, `PopulationLatentVariable`, and `LinkedVariable` instances. """ d = super().get_variables_specs() d.update( log_g_mean=ModelParameter.for_pop_mean("log_g", shape=(self.dimension,)), log_g_std=Hyperparameter(0.01), log_g=PopulationLatentVariable(Normal("log_g_mean", "log_g_std")), g=LinkedVariable(Exp("log_g")), ) return d
[docs] @staticmethod def metric(*, g: torch.Tensor) -> torch.Tensor: """ Compute the metric tensor from input tensor `g`. This function calculates the metric as \((g + 1)^2 / g\) element-wise. Parameters ---------- g : :class:`torch.Tensor` Input tensor with values of the population parameter `g` for each feature. Returns ------- :class:`torch.Tensor` The computed metric tensor, same shape as g(number of features) """ return (g + 1) ** 2 / g
[docs] @classmethod def model_with_sources( cls, *, rt: TensorOrWeightedTensor[float], space_shifts: TensorOrWeightedTensor[float], metric: TensorOrWeightedTensor[float], v0: TensorOrWeightedTensor[float], g: TensorOrWeightedTensor[float], ) -> torch.Tensor: """ Return the model output when sources(spatial components) are present. Parameters ---------- rt : :class:`~leaspy.utils.weighted_tensor.TensorOrWeightedTensor` [:obj:`float`] Tensor containing the reparametrized time. space_shifts : :class:`~leaspy.utils.weighted_tensor.TensorOrWeightedTensor` [:obj:`float`] Tensor containing the values of the space-shifts metric : :class:`~leaspy.utils.weighted_tensor.TensorOrWeightedTensor` [:obj:`float`] Tensor containing the metric tensor used for computing the spatial/temporal influence. v0 : :class:`~leaspy.utils.weighted_tensor.TensorOrWeightedTensor` [:obj:`float`] Tensor containing the values of the population parameter `v0` for each feature. g : :class:`~leaspy.utils.weighted_tensor.TensorOrWeightedTensor` [:obj:`float`] Tensor containing the values of the population parameter `g` for each feature. Returns ------- :class:`torch.Tensor` Weighted value tensor after applying sigmoid transformation, representing the model output with sources. """ # Shape: (Ni, Nt, Nfts) pop_s = (None, None, ...) rt = unsqueeze_right(rt, ndim=1) # .filled(float('nan')) w_model_logit = metric[pop_s] * ( v0[pop_s] * rt + space_shifts[:, None, ...] ) - torch.log(g[pop_s]) model_logit, weights = WeightedTensor.get_filled_value_and_weight( w_model_logit, fill_value=0.0 ) return WeightedTensor(torch.sigmoid(model_logit), weights).weighted_value