Source code for vangja.components.normal_constant

"""Normal Constant component for vangja time series models.

This module provides a constant term with a Normal prior distribution
for use in time series forecasting models.
"""

import arviz as az
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pymc as pm
import pytensor.tensor as pt

from vangja.time_series import TimeSeriesModel
from vangja.types import PoolType, TuneMethod
from vangja.utils import get_group_definition


[docs] class NormalConstant(TimeSeriesModel): """A constant component with a Normal (Gaussian) prior distribution. This component adds a constant term to the model that is sampled from a Normal distribution. It's useful for modeling baseline offsets or intercept terms that may vary across different time series. Parameters ---------- mu : float, default=0 The mean of the Normal prior for the constant parameter. sd : float, default=1 The standard deviation of the Normal prior for the constant parameter. pool_type : PoolType, default="complete" Type of pooling performed when sampling. Options are: - "complete": All series share the same constant value. - "partial": Series have individual constants with shared hyperpriors. - "individual": Each series has a completely independent constant. tune_method : TuneMethod | None, default=None How the transfer learning is to be performed. Options are: - "parametric": Use posterior mean and std from idata as new priors. - "prior_from_idata": Use the posterior samples directly as priors. - None: This component will not be tuned even if idata is provided. override_mu_for_tune : float | None, default=None Override the mean of the Normal prior for the constant parameter with this value during transfer learning. override_sd_for_tune : float | None, default=None Override the standard deviation of the Normal prior for the constant parameter with this value during transfer learning. shrinkage_strength : float, default=1 Shrinkage between groups for the hierarchical modeling. Higher values result in stronger shrinkage toward the shared mean. Attributes ---------- model_idx : int | None Index of this component in the model, set during definition. group : np.ndarray Array of group codes for each data point. n_groups : int Number of unique groups/series. groups_ : dict[int, str] Mapping from group codes to group names. Examples -------- >>> from vangja import LinearTrend, NormalConstant >>> # Add a normal constant offset to a linear trend >>> model = LinearTrend() + NormalConstant(mu=0, sd=10) >>> model.fit(data) >>> predictions = model.predict(horizon=30) >>> # Use partial pooling for multi-series data >>> model = LinearTrend() + NormalConstant(mu=0, sd=10, pool_type="partial") """ model_idx: int | None = None
[docs] def __init__( self, mu: float = 0, sd: float = 1, pool_type: PoolType = "complete", tune_method: TuneMethod | None = None, override_mu_for_tune: float | None = None, override_sd_for_tune: float | None = None, shrinkage_strength: float = 1, ): """Initialize the NormalConstant component. Parameters ---------- mu : float, default=0 The mean of the Normal prior for the constant parameter. sd : float, default=1 The standard deviation of the Normal prior for the constant parameter. pool_type : PoolType, default="complete" Type of pooling performed when sampling. tune_method : TuneMethod | None, default=None How the transfer learning is to be performed. override_mu_for_tune : float | None, default=None Override the mean of the Normal prior during transfer learning. override_sd_for_tune : float | None, default=None Override the standard deviation of the Normal prior during transfer learning. shrinkage_strength : float, default=1 Shrinkage between groups for hierarchical modeling. """ self.mu = mu self.sd = sd self.pool_type = pool_type self.tune_method = tune_method self.override_mu_for_tune = override_mu_for_tune self.override_sd_for_tune = override_sd_for_tune self.shrinkage_strength = shrinkage_strength
def _get_params_from_idata(self, idata: az.InferenceData) -> tuple[float, float]: """Extract Normal prior parameters from posterior samples. Parameters ---------- idata : az.InferenceData Sample from a posterior. Returns ------- tuple[float, float] The mean and standard deviation derived from the posterior. """ c_key = f"nc_{self.model_idx} - c(mu={self.mu},sd={self.sd})" if self.override_mu_for_tune is not None: mu = self.override_mu_for_tune else: mu = float(idata["posterior"][c_key].to_numpy().mean()) if self.override_sd_for_tune is not None: sd = self.override_sd_for_tune else: sd = float(idata["posterior"][c_key].to_numpy().std()) return mu, sd def _complete_definition( self, model: pm.Model, data: pd.DataFrame, priors: dict[str, pt.TensorVariable] | None, idata: az.InferenceData | None, ) -> pt.TensorVariable: """Add NormalConstant parameters for complete pooling. Parameters ---------- model : pm.Model The model to which the parameters are added. data : pd.DataFrame A pandas dataframe that must at least have columns ds (predictor), y (target) and series (name of time series). priors : dict[str, pt.TensorVariable] | None A dictionary of multivariate normal random variables approximating the posterior sample in idata. idata : az.InferenceData | None Sample from a posterior for transfer learning. Returns ------- pt.TensorVariable The constant term to add to the model. """ with model: c_key = f"nc_{self.model_idx} - c(mu={self.mu},sd={self.sd})" if idata is not None and self.tune_method == "parametric": mu, sd = self._get_params_from_idata(idata) c = pm.Normal(c_key, mu=mu, sigma=sd) elif priors is not None and self.tune_method == "prior_from_idata": c = pm.Deterministic(c_key, priors[f"prior_{c_key}"]) else: c = pm.Normal(c_key, mu=self.mu, sigma=self.sd) return c def _partial_definition( self, model: pm.Model, data: pd.DataFrame, priors: dict[str, pt.TensorVariable] | None, idata: az.InferenceData | None, ) -> pt.TensorVariable: """Add NormalConstant parameters for partial pooling. Parameters ---------- model : pm.Model The model to which the parameters are added. data : pd.DataFrame A pandas dataframe that must at least have columns ds (predictor), y (target) and series (name of time series). priors : dict[str, pt.TensorVariable] | None A dictionary of multivariate normal random variables approximating the posterior sample in idata. idata : az.InferenceData | None Sample from a posterior for transfer learning. Returns ------- pt.TensorVariable The constant terms indexed by group. """ with model: c_key = f"nc_{self.model_idx} - c(mu={self.mu},sd={self.sd})" sd = self.sd if idata is not None and self.tune_method == "parametric": mu, sd = self._get_params_from_idata(idata) c_shared = pm.Normal(f"nc_{self.model_idx} - c_shared", mu=mu, sigma=sd) elif priors is not None and self.tune_method == "prior_from_idata": mu, sd = self._get_params_from_idata(idata) c_shared = pm.Deterministic( f"nc_{self.model_idx} - c_shared", priors[f"prior_{c_key}"] ) else: c_shared = pm.Normal( f"nc_{self.model_idx} - c_shared", mu=self.mu, sigma=self.sd ) c_sigma = pm.HalfNormal( f"nc_{self.model_idx} - c_sigma", sigma=sd / self.shrinkage_strength, ) c_offset = pm.Normal( f"nc_{self.model_idx} - c_offset", mu=0, sigma=1, shape=self.n_groups, ) c = pm.Deterministic(c_key, c_shared + c_offset * c_sigma) return c[self.group] def _individual_definition( self, model: pm.Model, data: pd.DataFrame, priors: dict[str, pt.TensorVariable] | None, idata: az.InferenceData | None, ) -> pt.TensorVariable: """Add NormalConstant parameters for individual pooling. Parameters ---------- model : pm.Model The model to which the parameters are added. data : pd.DataFrame A pandas dataframe that must at least have columns ds (predictor), y (target) and series (name of time series). priors : dict[str, pt.TensorVariable] | None A dictionary of multivariate normal random variables approximating the posterior sample in idata. idata : az.InferenceData | None Sample from a posterior for transfer learning. Returns ------- pt.TensorVariable The constant terms indexed by group. """ with model: c_key = f"nc_{self.model_idx} - c(mu={self.mu},sd={self.sd})" if idata is not None and self.tune_method == "parametric": mu, sd = self._get_params_from_idata(idata) c = pm.Normal(c_key, mu=mu, sigma=sd, shape=self.n_groups) elif priors is not None and self.tune_method == "prior_from_idata": mu, sd = self._get_params_from_idata(idata) c = pm.Normal( c_key, mu=priors[f"prior_{c_key}"], sigma=sd, shape=self.n_groups, ) else: c = pm.Normal(c_key, mu=self.mu, sigma=self.sd, shape=self.n_groups) return c[self.group]
[docs] def definition( self, model: pm.Model, data: pd.DataFrame, model_idxs: dict[str, int], priors: dict[str, pt.TensorVariable] | None, idata: az.InferenceData | None, ) -> pt.TensorVariable: """Add the NormalConstant parameters to the model. Parameters ---------- model : pm.Model The model to which the parameters are added. data : pd.DataFrame A pandas dataframe that must at least have columns ds (predictor), y (target) and series (name of time series). model_idxs : dict[str, int] Count of the number of components from each type. priors : dict[str, pt.TensorVariable] | None A dictionary of multivariate normal random variables approximating the posterior sample in idata. idata : az.InferenceData | None Sample from a posterior. If it is not None, Vangja will use this to set the parameters' priors in the model. Returns ------- pt.TensorVariable The constant term(s) to add to the model. """ model_idxs["nc"] = model_idxs.get("nc", 0) self.model_idx = model_idxs["nc"] model_idxs["nc"] += 1 self.group, self.n_groups, self.groups_ = get_group_definition( data, self.pool_type ) with model: if self.pool_type == "complete": return self._complete_definition(model, data, priors, idata) elif self.pool_type == "partial": return self._partial_definition(model, data, priors, idata) elif self.pool_type == "individual": return self._individual_definition(model, data, priors, idata)
def _get_initval(self, initvals: dict[str, float], model: pm.Model) -> dict: """Get the initval of the constant parameter. Parameters ---------- initvals : dict[str, float] Calculated initvals based on data. model : pm.Model The model for which the initvals will be set. Returns ------- dict Empty dictionary as no special initialization is needed. """ return {} def _predict_map( self, future: pd.DataFrame, map_approx: dict[str, np.ndarray] ) -> np.ndarray: """Perform MAP prediction for the constant component. Parameters ---------- future : pd.DataFrame Pandas dataframe containing the timestamps for which inference should be performed. map_approx : dict[str, np.ndarray] The MAP posterior parameter estimate. Returns ------- np.ndarray Array of shape (n_groups, n_timestamps) with constant values. """ forecasts = [] self._predict_columns = {} c_key = f"nc_{self.model_idx} - c(mu={self.mu},sd={self.sd})" for group_code in self.groups_.keys(): c_value = map_approx[c_key] if self.pool_type != "complete": c_value = c_value[group_code] forecast = np.ones(len(future)) * c_value forecasts.append(forecast) self._predict_columns[f"nc_{self.model_idx}_{group_code}"] = forecast return np.vstack(forecasts) def _predict_mcmc( self, future: pd.DataFrame, trace: az.InferenceData ) -> np.ndarray: """Perform MCMC prediction for the constant component. Parameters ---------- future : pd.DataFrame Pandas dataframe containing the timestamps for which inference should be performed. trace : az.InferenceData Samples from the posterior. Returns ------- np.ndarray Array of shape (n_groups, n_timestamps) with constant values. """ forecasts = [] self._predict_columns = {} c_key = f"nc_{self.model_idx} - c(mu={self.mu},sd={self.sd})" for group_code in self.groups_.keys(): c_samples = trace["posterior"][c_key].to_numpy() if self.pool_type != "complete": c_value = c_samples[:, :, group_code].mean() else: c_value = c_samples.mean() forecast = np.ones(len(future)) * c_value forecasts.append(forecast) self._predict_columns[f"nc_{self.model_idx}_{group_code}"] = forecast return np.vstack(forecasts) def _plot( self, plot_params: dict, future: pd.DataFrame, data: pd.DataFrame, scale_params: dict, y_true: pd.DataFrame | None = None, series: int | str = "", ) -> None: """Plot the constant component. Parameters ---------- plot_params : dict Dictionary containing plotting parameters, including 'idx' for subplot indexing. future : pd.DataFrame Pandas dataframe containing the predictions. data : pd.DataFrame The training data. scale_params : dict Scaling parameters used for the data. y_true : pd.DataFrame | None, default=None A pandas dataframe containing the true values for comparison. series : int | str, default="" The series identifier for multi-series plots. """ plot_params["idx"] += 1 plt.subplot(100, 1, plot_params["idx"]) plt.title(f"NormalConstant({self.model_idx}, mu={self.mu}, sd={self.sd})") # Handle series parameter - convert group_code int to key format series_suffix = f"_{series}" if series != "" else "_0" col_name = f"nc_{self.model_idx}{series_suffix}" if col_name in future.columns: plt.bar([0], [future[col_name].iloc[0]]) else: # Fallback for complete pooling col_name = f"nc_{self.model_idx}_0" if col_name in future.columns: plt.bar([0], [future[col_name].iloc[0]]) plt.axhline(0, c="k", linewidth=1) plt.grid(True, alpha=0.3) def _assign_model_idx(self, model_idxs: dict[str, int]) -> None: model_idxs["nc"] = model_idxs.get("nc", 0) self.model_idx = model_idxs["nc"] model_idxs["nc"] += 1 def _get_prior_var_names(self) -> list[str]: if self.tune_method != "prior_from_idata": return [] return [f"nc_{self.model_idx} - c(mu={self.mu},sd={self.sd})"]
[docs] def needs_priors(self, *args, **kwargs) -> bool: """Check if this component needs priors from idata. Returns ------- bool True if tune_method is "prior_from_idata", False otherwise. """ return self.tune_method == "prior_from_idata"
[docs] def __str__(self) -> str: """Return string representation of the component. Returns ------- str String representation. """ return ( f"NC(mu={self.mu},sd={self.sd},pt={self.pool_type},tm={self.tune_method})" )