Source code for vangja.components.beta_constant

"""Beta Constant component for vangja time series models.

This module provides a constant term with a Beta prior distribution
for use in time series forecasting models, scaled to a specified range.
"""

import arviz as az
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pymc as pm
import pytensor.tensor as pt

from vangja.time_series import TimeSeriesModel
from vangja.types import PoolType, TuneMethod
from vangja.utils import get_group_definition


[docs] class BetaConstant(TimeSeriesModel): """A constant component with a Beta prior distribution scaled to a range. This component adds a constant term to the model that is sampled from a Beta distribution and then scaled to lie within [lower, upper]. It's useful for modeling parameters that should be bounded and have flexible shapes controlled by the alpha and beta parameters. Parameters ---------- lower : float The lower bound for the constant parameter after scaling. upper : float The upper bound for the constant parameter after scaling. alpha : float, default=0.5 The alpha parameter of the Beta distribution. Controls the shape. beta : float, default=0.5 The beta parameter of the Beta distribution. Controls the shape. pool_type : PoolType, default="complete" Type of pooling performed when sampling. Options are: - "complete": All series share the same constant value. - "partial": Series have individual constants with shared hyperpriors. - "individual": Each series has a completely independent constant. tune_method : TuneMethod | None, default=None How the transfer learning is to be performed. Options are: - "parametric": Use posterior samples to derive new Beta parameters. - "prior_from_idata": Use the posterior samples directly as priors. - None: This component will not be tuned even if idata is provided. shrinkage_strength : float, default=1 Shrinkage between groups for the hierarchical modeling. Higher values result in stronger shrinkage toward the shared mean. Attributes ---------- model_idx : int | None Index of this component in the model, set during definition. group : np.ndarray Array of group codes for each data point. n_groups : int Number of unique groups/series. groups_ : dict[int, str] Mapping from group codes to group names. Notes ----- The transformation from Beta to the scaled constant is: c = beta_value * (upper - lower) + lower Common choices for alpha and beta: - alpha=beta=0.5: Jeffrey's prior (U-shaped, more mass at extremes) - alpha=beta=1: Uniform distribution - alpha=beta=2: Symmetric bell-shaped Examples -------- >>> from vangja import LinearTrend, BetaConstant >>> # Add a beta-distributed scaling factor between 0.8 and 1.2 >>> model = LinearTrend() * BetaConstant(lower=0.8, upper=1.2, alpha=2, beta=2) >>> model.fit(data) >>> predictions = model.predict(horizon=30) >>> # Use partial pooling for multi-series data >>> model = LinearTrend() * BetaConstant(lower=0.5, upper=1.5, ... pool_type="partial") """ model_idx: int | None = None
[docs] def __init__( self, lower: float, upper: float, alpha: float = 0.5, beta: float = 0.5, pool_type: PoolType = "complete", tune_method: TuneMethod | None = None, shrinkage_strength: float = 1, ): """Initialize the BetaConstant component. Parameters ---------- lower : float The lower bound for the constant parameter after scaling. upper : float The upper bound for the constant parameter after scaling. alpha : float, default=0.5 The alpha parameter of the Beta distribution. beta : float, default=0.5 The beta parameter of the Beta distribution. pool_type : PoolType, default="complete" Type of pooling performed when sampling. tune_method : TuneMethod | None, default=None How the transfer learning is to be performed. shrinkage_strength : float, default=1 Shrinkage between groups for hierarchical modeling. """ self.lower = lower self.upper = upper self.alpha = alpha self.beta = beta self.pool_type = pool_type self.tune_method = tune_method self.shrinkage_strength = shrinkage_strength
def _get_params_from_idata(self, idata: az.InferenceData) -> tuple[float, float]: """Extract Beta parameters from posterior samples using method of moments. Parameters ---------- idata : az.InferenceData Sample from a posterior. Returns ------- tuple[float, float] The alpha and beta parameters derived from the posterior. """ c_key = f"bc_{self.model_idx} - c(l={self.lower},u={self.upper})" # Get scaled values and convert back to [0, 1] c_samples = idata["posterior"][c_key].to_numpy() beta_samples = (c_samples - self.lower) / (self.upper - self.lower) # Method of moments estimation for Beta distribution mean = float(np.clip(beta_samples.mean(), 0.01, 0.99)) var = float(np.clip(beta_samples.var(), 1e-6, mean * (1 - mean) - 1e-6)) # Solve for alpha and beta common = mean * (1 - mean) / var - 1 alpha = max(0.1, mean * common) beta = max(0.1, (1 - mean) * common) return alpha, beta def _complete_definition( self, model: pm.Model, data: pd.DataFrame, priors: dict[str, pt.TensorVariable] | None, idata: az.InferenceData | None, ) -> pt.TensorVariable: """Add BetaConstant parameters for complete pooling. Parameters ---------- model : pm.Model The model to which the parameters are added. data : pd.DataFrame A pandas dataframe that must at least have columns ds (predictor), y (target) and series (name of time series). priors : dict[str, pt.TensorVariable] | None A dictionary of multivariate normal random variables approximating the posterior sample in idata. idata : az.InferenceData | None Sample from a posterior for transfer learning. Returns ------- pt.TensorVariable The constant term to add to the model. """ with model: c_key = f"bc_{self.model_idx} - c(l={self.lower},u={self.upper})" beta_key = f"bc_{self.model_idx} - beta(a={self.alpha},b={self.beta})" if idata is not None and self.tune_method == "parametric": alpha, beta = self._get_params_from_idata(idata) beta_rv = pm.Beta(beta_key, alpha=alpha, beta=beta) elif priors is not None and self.tune_method == "prior_from_idata": # Convert from scaled back to beta space beta_rv = pm.Deterministic( beta_key, (priors[f"prior_{c_key}"] - self.lower) / (self.upper - self.lower), ) else: beta_rv = pm.Beta(beta_key, alpha=self.alpha, beta=self.beta) c = pm.Deterministic( c_key, beta_rv * (self.upper - self.lower) + self.lower ) return c def _partial_definition( self, model: pm.Model, data: pd.DataFrame, priors: dict[str, pt.TensorVariable] | None, idata: az.InferenceData | None, ) -> pt.TensorVariable: """Add BetaConstant parameters for partial pooling. Parameters ---------- model : pm.Model The model to which the parameters are added. data : pd.DataFrame A pandas dataframe that must at least have columns ds (predictor), y (target) and series (name of time series). priors : dict[str, pt.TensorVariable] | None A dictionary of multivariate normal random variables approximating the posterior sample in idata. idata : az.InferenceData | None Sample from a posterior for transfer learning. Returns ------- pt.TensorVariable The constant terms indexed by group. """ with model: c_key = f"bc_{self.model_idx} - c(l={self.lower},u={self.upper})" beta_key = f"bc_{self.model_idx} - beta(a={self.alpha},b={self.beta})" if idata is not None and self.tune_method == "parametric": alpha, beta = self._get_params_from_idata(idata) mu_beta = pm.Beta( f"bc_{self.model_idx} - mu_beta", alpha=alpha, beta=beta ) elif priors is not None and self.tune_method == "prior_from_idata": mu_beta = pm.Deterministic( f"bc_{self.model_idx} - mu_beta", (priors[f"prior_{c_key}"] - self.lower) / (self.upper - self.lower), ) else: mu_beta = pm.Beta( f"bc_{self.model_idx} - mu_beta", alpha=self.alpha, beta=self.beta ) # Hierarchical structure with offset beta_sigma = pm.HalfNormal( f"bc_{self.model_idx} - beta_sigma", sigma=0.1 / self.shrinkage_strength, ) beta_offset = pm.Normal( f"bc_{self.model_idx} - beta_offset", mu=0, sigma=1, shape=self.n_groups, ) # Clip to valid beta range [0, 1] beta_raw = mu_beta + beta_offset * beta_sigma beta_rv = pm.Deterministic(beta_key, pm.math.clip(beta_raw, 0.001, 0.999)) c = pm.Deterministic( c_key, beta_rv * (self.upper - self.lower) + self.lower ) return c[self.group] def _individual_definition( self, model: pm.Model, data: pd.DataFrame, priors: dict[str, pt.TensorVariable] | None, idata: az.InferenceData | None, ) -> pt.TensorVariable: """Add BetaConstant parameters for individual pooling. Parameters ---------- model : pm.Model The model to which the parameters are added. data : pd.DataFrame A pandas dataframe that must at least have columns ds (predictor), y (target) and series (name of time series). priors : dict[str, pt.TensorVariable] | None A dictionary of multivariate normal random variables approximating the posterior sample in idata. idata : az.InferenceData | None Sample from a posterior for transfer learning. Returns ------- pt.TensorVariable The constant terms indexed by group. """ with model: c_key = f"bc_{self.model_idx} - c(l={self.lower},u={self.upper})" beta_key = f"bc_{self.model_idx} - beta(a={self.alpha},b={self.beta})" if idata is not None and self.tune_method == "parametric": alpha, beta = self._get_params_from_idata(idata) beta_rv = pm.Beta(beta_key, alpha=alpha, beta=beta, shape=self.n_groups) elif priors is not None and self.tune_method == "prior_from_idata": alpha, beta = self._get_params_from_idata(idata) beta_rv = pm.Beta(beta_key, alpha=alpha, beta=beta, shape=self.n_groups) else: beta_rv = pm.Beta( beta_key, alpha=self.alpha, beta=self.beta, shape=self.n_groups ) c = pm.Deterministic( c_key, beta_rv * (self.upper - self.lower) + self.lower ) return c[self.group]
[docs] def definition( self, model: pm.Model, data: pd.DataFrame, model_idxs: dict[str, int], priors: dict[str, pt.TensorVariable] | None, idata: az.InferenceData | None, ) -> pt.TensorVariable: """Add the BetaConstant parameters to the model. Parameters ---------- model : pm.Model The model to which the parameters are added. data : pd.DataFrame A pandas dataframe that must at least have columns ds (predictor), y (target) and series (name of time series). model_idxs : dict[str, int] Count of the number of components from each type. priors : dict[str, pt.TensorVariable] | None A dictionary of multivariate normal random variables approximating the posterior sample in idata. idata : az.InferenceData | None Sample from a posterior. If it is not None, Vangja will use this to set the parameters' priors in the model. Returns ------- pt.TensorVariable The constant term(s) to add to the model. """ model_idxs["bc"] = model_idxs.get("bc", 0) self.model_idx = model_idxs["bc"] model_idxs["bc"] += 1 self.group, self.n_groups, self.groups_ = get_group_definition( data, self.pool_type ) with model: if self.pool_type == "complete": return self._complete_definition(model, data, priors, idata) elif self.pool_type == "partial": return self._partial_definition(model, data, priors, idata) elif self.pool_type == "individual": return self._individual_definition(model, data, priors, idata)
def _get_initval(self, initvals: dict[str, float], model: pm.Model) -> dict: """Get the initval of the constant parameter. Parameters ---------- initvals : dict[str, float] Calculated initvals based on data. model : pm.Model The model for which the initvals will be set. Returns ------- dict Empty dictionary as no special initialization is needed. """ return {} def _predict_map( self, future: pd.DataFrame, map_approx: dict[str, np.ndarray] ) -> np.ndarray: """Perform MAP prediction for the constant component. Parameters ---------- future : pd.DataFrame Pandas dataframe containing the timestamps for which inference should be performed. map_approx : dict[str, np.ndarray] The MAP posterior parameter estimate. Returns ------- np.ndarray Array of shape (n_groups, n_timestamps) with constant values. """ forecasts = [] self._predict_columns = {} c_key = f"bc_{self.model_idx} - c(l={self.lower},u={self.upper})" for group_code in self.groups_.keys(): c_value = map_approx[c_key] if self.pool_type != "complete": c_value = c_value[group_code] forecast = np.ones(len(future)) * c_value forecasts.append(forecast) self._predict_columns[f"bc_{self.model_idx}_{group_code}"] = forecast return np.vstack(forecasts) def _predict_mcmc( self, future: pd.DataFrame, trace: az.InferenceData ) -> np.ndarray: """Perform MCMC prediction for the constant component. Parameters ---------- future : pd.DataFrame Pandas dataframe containing the timestamps for which inference should be performed. trace : az.InferenceData Samples from the posterior. Returns ------- np.ndarray Array of shape (n_groups, n_timestamps) with constant values. """ forecasts = [] self._predict_columns = {} c_key = f"bc_{self.model_idx} - c(l={self.lower},u={self.upper})" for group_code in self.groups_.keys(): c_samples = trace["posterior"][c_key].to_numpy() if self.pool_type != "complete": c_value = c_samples[:, :, group_code].mean() else: c_value = c_samples.mean() forecast = np.ones(len(future)) * c_value forecasts.append(forecast) self._predict_columns[f"bc_{self.model_idx}_{group_code}"] = forecast return np.vstack(forecasts) def _plot( self, plot_params: dict, future: pd.DataFrame, data: pd.DataFrame, scale_params: dict, y_true: pd.DataFrame | None = None, series: int | str = "", ) -> None: """Plot the constant component. Parameters ---------- plot_params : dict Dictionary containing plotting parameters, including 'idx' for subplot indexing. future : pd.DataFrame Pandas dataframe containing the predictions. data : pd.DataFrame The training data. scale_params : dict Scaling parameters used for the data. y_true : pd.DataFrame | None, default=None A pandas dataframe containing the true values for comparison. series : int | str, default="" The series identifier for multi-series plots. """ plot_params["idx"] += 1 plt.subplot(100, 1, plot_params["idx"]) plt.title( f"BetaConstant({self.model_idx}, l={self.lower}, u={self.upper}, " f"a={self.alpha}, b={self.beta})" ) # Handle series parameter series_suffix = f"_{series}" if series != "" else "_0" col_name = f"bc_{self.model_idx}{series_suffix}" if col_name in future.columns: plt.bar([0], [future[col_name].iloc[0]]) else: col_name = f"bc_{self.model_idx}_0" if col_name in future.columns: plt.bar([0], [future[col_name].iloc[0]]) plt.axhline(0, c="k", linewidth=1) plt.axhline(self.lower, c="r", linewidth=1, linestyle="--", alpha=0.5) plt.axhline(self.upper, c="r", linewidth=1, linestyle="--", alpha=0.5) plt.grid(True, alpha=0.3) def _assign_model_idx(self, model_idxs: dict[str, int]) -> None: model_idxs["bc"] = model_idxs.get("bc", 0) self.model_idx = model_idxs["bc"] model_idxs["bc"] += 1 def _get_prior_var_names(self) -> list[str]: if self.tune_method != "prior_from_idata": return [] return [f"bc_{self.model_idx} - c(l={self.lower},u={self.upper})"]
[docs] def needs_priors(self, *args, **kwargs) -> bool: """Check if this component needs priors from idata. Returns ------- bool True if tune_method is "prior_from_idata", False otherwise. """ return self.tune_method == "prior_from_idata"
[docs] def __str__(self) -> str: """Return string representation of the component. Returns ------- str String representation. """ return ( f"BC(a={self.alpha},b={self.beta},l={self.lower},u={self.upper}," f"pt={self.pool_type},tm={self.tune_method})" )