Source code for vangja.components.linear_trend

from typing import Literal

import arviz as az
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pymc as pm
import pytensor.tensor as pt

from vangja.time_series import TimeSeriesModel
from vangja.types import PoolType, TuneMethod
from vangja.utils import get_group_definition


[docs] class LinearTrend(TimeSeriesModel): """A piecewise linear trend component with optional changepoints. This component models the trend of a time series as a piecewise linear function, following the Prophet approach. The trend can have multiple changepoints where the slope is allowed to change. The trend is defined as:: trend(t) = (k + a(t)^T * delta) * t + (m + a(t)^T * gamma) where: - ``k`` is the base slope - ``m`` is the intercept - ``delta`` is a vector of slope changes at changepoints - ``a(t)`` is an indicator vector for changepoints before time ``t`` - ``gamma`` is computed to make the trend continuous Parameters ---------- n_changepoints : int, default=25 The number of potential changepoints. Changepoints are placed uniformly in the first ``changepoint_range`` fraction of data. changepoint_range : float, default=0.8 The proportion of the time range where changepoints are allowed. For example, 0.8 means changepoints only in the first 80% of data. slope_mean : float, default=0 The mean of the Normal prior for the slope parameter. slope_sd : float, default=5 The standard deviation of the Normal prior for the slope parameter. intercept_mean : float, default=0 The mean of the Normal prior for the intercept parameter. intercept_sd : float, default=5 The standard deviation of the Normal prior for the intercept parameter. delta_mean : float, default=0 The mean of the Laplace prior for the slope changes at changepoints. delta_sd : float | None, default=0.05 The scale of the Laplace prior for slope changes. If None, the scale is learned as a random variable with an Exponential(1.5) prior. delta_side : {"left", "right"}, default="left" If "left", the slope parameter controls the slope at the earliest time point. If "right", it controls the slope at the latest time. pool_type : PoolType, default="complete" Type of pooling for multi-series data. One of: - "complete": All series share the same trend parameters - "partial": Hierarchical pooling with shared hyperpriors - "individual": Each series has independent parameters delta_pool_type : PoolType, default="complete" Pooling type specifically for changepoint deltas. Only used when ``pool_type="partial"``. tune_method : TuneMethod | None, default=None Transfer learning method. One of: - "parametric": Use posterior mean/std as new priors - "prior_from_idata": Use posterior samples directly - None: No transfer learning delta_tune_method : TuneMethod | None, default=None Transfer learning method for changepoint deltas. override_slope_mean_for_tune : np.ndarray | None, default=None Override the slope mean during transfer learning. override_slope_sd_for_tune : np.ndarray | None, default=None Override the slope standard deviation during transfer learning. override_delta_loc_for_tune : np.ndarray | None, default=None Override the delta location during transfer learning. override_delta_scale_for_tune : np.ndarray | None, default=None Override the delta scale during transfer learning. shrinkage_strength : float, default=100 Controls hierarchical shrinkage. Higher values pull individual series parameters more strongly toward the shared mean. loss_factor_for_tune : float, default=0 Regularization factor for transfer learning. Adds a penalty to keep transferred parameters close to original values. Attributes ---------- model_idx : int Index of this component in the model (set during fitting). s : np.ndarray Normalized time locations of changepoints. group : np.ndarray Array of group codes for each data point. n_groups : int Number of unique groups/series. groups_ : dict[int, str] Mapping from group codes to series names. Examples -------- >>> from vangja import LinearTrend, FourierSeasonality >>> from vangja.datasets import load_peyton_manning >>> >>> # Basic usage >>> model = LinearTrend() + FourierSeasonality(period=365.25, series_order=10) >>> model.fit(data, method="mapx") >>> predictions = model.predict(horizon=365) >>> # With hierarchical pooling for multiple series >>> model = LinearTrend( ... pool_type="partial", ... shrinkage_strength=50, ... n_changepoints=10 ... ) >>> # Transfer learning from a pre-trained model >>> target_model = LinearTrend(tune_method="parametric") >>> target_model.fit(short_series, idata=source_trace) See Also -------- FourierSeasonality : Seasonal component using Fourier series. Notes ----- The changepoint formulation follows the Facebook Prophet paper [1]_. The ``delta_side="right"`` option is an extension that allows the slope parameter to represent the end slope rather than the start slope. References ---------- .. [1] Taylor, S.J. and Letham, B., 2018. Forecasting at scale. The American Statistician, 72(1), pp.37-45. """ model_idx: int | None = None
[docs] def __init__( self, n_changepoints: int = 25, changepoint_range: float = 0.8, slope_mean: float = 0, slope_sd: float = 5, intercept_mean: float = 0, intercept_sd: float = 5, delta_mean: float = 0, delta_sd: float = 0.05, delta_side: Literal["left", "right"] = "left", pool_type: PoolType = "complete", delta_pool_type: PoolType = "complete", tune_method: TuneMethod | None = None, delta_tune_method: TuneMethod | None = None, override_slope_mean_for_tune: np.ndarray | None = None, override_slope_sd_for_tune: np.ndarray | None = None, override_delta_loc_for_tune: np.ndarray | None = None, override_delta_scale_for_tune: np.ndarray | None = None, shrinkage_strength: float = 100, loss_factor_for_tune: float = 0, ): """Creeate a Linear Trend model component. See the class docstring for full parameter descriptions. """ self.n_changepoints = n_changepoints self.changepoint_range = changepoint_range self.slope_mean = slope_mean self.slope_sd = slope_sd self.intercept_mean = intercept_mean self.intercept_sd = intercept_sd self.delta_mean = delta_mean self.delta_sd = delta_sd self.delta_side = delta_side self.pool_type = pool_type self.delta_pool_type = delta_pool_type self.tune_method = tune_method self.delta_tune_method = delta_tune_method self.override_slope_mean_for_tune = override_slope_mean_for_tune self.override_slope_sd_for_tune = override_slope_sd_for_tune self.override_delta_loc_for_tune = override_delta_loc_for_tune self.override_delta_scale_for_tune = override_delta_scale_for_tune self.shrinkage_strength = shrinkage_strength self.loss_factor_for_tune = loss_factor_for_tune
def _get_slope_params_from_idata(self, idata: az.InferenceData): """ Calculate the mean and the standard deviation of the Normal prior for the slope parameter from a provided posterior sample. Parameters ---------- idata: az.InferenceData Sample from a posterior. """ slope_key = f"lt_{self.model_idx} - slope" delta_key = f"lt_{self.model_idx} - delta" delta = ( (idata["posterior"][delta_key].to_numpy().sum(axis=2)) # self.delta_side == "right" check because of how the model was pre-trained # with the old implementation # TODO change this on release if delta_key in idata["posterior"] and self.delta_side == "right" else 0 ) if self.override_slope_mean_for_tune is not None: slope_mean = self.override_slope_mean_for_tune else: slope_mean = (idata["posterior"][slope_key].to_numpy() + delta).mean() if self.override_slope_sd_for_tune is not None: slope_sd = self.override_slope_sd_for_tune else: slope_sd = (idata["posterior"][slope_key].to_numpy() + delta).std() return slope_mean, slope_sd def _get_delta_params_from_idata(self, idata: az.InferenceData): """ Calculate the mean and the standard deviation of the Laplace prior for the change points parameter from a provided posterior sample. Parameters ---------- idata: az.InferenceData Sample from a posterior. """ delta_key = f"lt_{self.model_idx} - delta" delta = idata["posterior"][delta_key].to_numpy() if self.override_delta_loc_for_tune is not None: delta_loc = self.override_delta_loc_for_tune else: delta_loc = delta.mean(axis=(0, 1)) if self.override_delta_scale_for_tune is not None: delta_scale = self.override_delta_scale_for_tune else: delta_scale = delta.std(axis=(0, 1)) / (2**0.5) return delta_loc, delta_scale def _get_skipped_deltas(self, data: pd.DataFrame, idata: az.InferenceData): """ Calculate the sum of the skipped change point deltas for each time series. Parameters ---------- data : pd.DataFrame A pandas dataframe that must at least have columns ds (predictor), y (target) and series (name of time series). idata: az.InferenceData Sample from a posterior. """ delta_key = f"lt_{self.model_idx} - delta" skipped_deltas = [] for _, group_name in sorted(self.groups_.items()): series = data[data["series"] == group_name] delta = 0 if delta_key in idata["posterior"]: # cp_before_min_t = (series["t"].min() > self.s).sum() delta = ( idata["posterior"][delta_key] .to_numpy() # .to_numpy()[:, :, :cp_before_min_t] .sum(axis=2) .mean() ) skipped_deltas.append(delta) return np.array(skipped_deltas) def _complete_definition( self, model: pm.Model, data: pd.DataFrame, priors: dict[str, pt.TensorVariable] | None, idata: az.InferenceData | None, ): """ Add the LinearTrend parameters to the model when pool_type is complete. Parameters ---------- model: TimeSeriesModel The model to which the parameters are added. data : pd.DataFrame A pandas dataframe that must at least have columns ds (predictor), y (target) and series (name of time series). priors: dict[str, pt.TensorVariable] | None A dictionary of multivariate normal random variables approximating the posterior sample in idata. idata: az.InferenceData | None Sample from a posterior. If it is not None, Vangja will use this to set the parameters' priors in the model. If idata is not None, each component from the model should specify how idata should be used to set its parameters' priors. """ with model: t = np.array(data["t"]) slope_key = f"lt_{self.model_idx} - slope" if idata is not None and self.tune_method == "parametric": slope_mean, slope_sd = self._get_slope_params_from_idata(idata) slope = pm.Normal(slope_key, slope_mean, slope_sd) elif priors is not None and self.tune_method == "prior_from_idata": slope_mean, slope_sd = self._get_slope_params_from_idata(idata) slope = pm.Deterministic(slope_key, priors[f"prior_{slope_key}"]) else: slope = pm.Normal(slope_key, self.slope_mean, self.slope_sd) intercept = pm.Normal( f"lt_{self.model_idx} - intercept", self.intercept_mean, self.intercept_sd, ) if self.n_changepoints > 0: delta_key = f"lt_{self.model_idx} - delta" if idata is not None and self.delta_tune_method == "parametric": delta_loc, delta_scale = self._get_delta_params_from_idata(idata) delta = pm.Laplace( delta_key, delta_loc, delta_scale, shape=self.n_changepoints ) elif priors is not None and self.tune_method == "prior_from_idata": delta_loc, delta_scale = self._get_delta_params_from_idata(idata) delta = pm.Deterministic(delta_key, priors[f"prior_{delta_key}"]) else: delta_sd = self.delta_sd if self.delta_sd is None: delta_sd = pm.Exponential(f"lt_{self.model_idx} - tau", 1.5) delta = pm.Laplace( f"lt_{self.model_idx} - delta", self.delta_mean, delta_sd, shape=self.n_changepoints, ) hist_size = int(np.floor(data.shape[0] * self.changepoint_range)) cp_indexes = ( np.linspace(0, hist_size - 1, self.n_changepoints + 1) .round() .astype(int) ) self.s = np.array(data.iloc[cp_indexes]["t"].tail(-1)) if self.delta_side == "left": A = (t[:, None] > self.s) * 1 else: A = (t[:, None] <= self.s) * 1 gamma = -self.s * delta trend = (slope + pm.math.sum(A * delta, axis=1)) * t + ( intercept + pm.math.sum(A * gamma, axis=1) ) else: trend = slope * t + intercept if idata is not None and self.tune_method is not None: pm.Potential( f"{slope_key} - loss", self.loss_factor_for_tune * pm.math.abs(slope - slope_mean), ) return trend def _partial_definition( self, model: pm.Model, data: pd.DataFrame, priors: dict[str, pt.TensorVariable] | None, idata: az.InferenceData | None, ): """ Add the LinearTrend parameters to the model when pool_type is partial. Parameters ---------- model: pm.Model The model to which the parameters are added. data : pd.DataFrame A pandas dataframe that must at least have columns ds (predictor), y (target) and series (name of time series). priors: dict[str, pt.TensorVariable] | None A dictionary of multivariate normal random variables approximating the posterior sample in idata. idata: az.InferenceData | None Sample from a posterior. If it is not None, Vangja will use this to set the parameters' priors in the model. If idata is not None, each component from the model should specify how idata should be used to set its parameters' priors. """ with model: slope_key = f"lt_{self.model_idx} - slope" delta_key = f"lt_{self.model_idx} - delta" t = np.array(data["t"]) # calculate change points on first time series large_series = data[data["series"] == data["series"].iloc[0]] hist_size = int(np.floor(large_series.shape[0] * self.changepoint_range)) cp_indexes = ( np.linspace(0, hist_size - 1, self.n_changepoints + 1) .round() .astype(int) ) self.s = np.array(large_series.iloc[cp_indexes]["t"].tail(-1)) if self.delta_side == "left": A = (t[:, None] > self.s) * 1 else: A = (t[:, None] <= self.s) * 1 slope_shared = 0 if idata is not None and self.tune_method == "parametric": slope_mu, slope_sd = self._get_slope_params_from_idata(idata) slope_shared = pm.Normal( f"lt_{self.model_idx} - slope_shared", slope_mu, slope_sd ) slope_sigma = pm.HalfCauchy( f"lt_{self.model_idx} - slope_sigma", beta=slope_sd / self.shrinkage_strength, ) elif priors is not None and self.tune_method == "prior_from_idata": # TODO use delta somehow for slope shared? slope_mu, slope_sd = self._get_slope_params_from_idata(idata) slope_shared = pm.Deterministic( f"lt_{self.model_idx} - slope_shared", priors[f"prior_{slope_key}"] ) slope_sigma = pm.HalfCauchy( f"lt_{self.model_idx} - slope_sigma", beta=slope_sd / self.shrinkage_strength, ) else: slope_sigma = pm.HalfCauchy( f"lt_{self.model_idx} - slope_sigma", beta=self.slope_sd / self.shrinkage_strength, ) slope_z_offset = pm.Normal( f"lt_{self.model_idx} - slope_z_offset", mu=0, sigma=1, shape=self.n_groups, ) slope = pm.Deterministic( f"lt_{self.model_idx} - slope", slope_shared + slope_z_offset * slope_sigma, ) delta_sd = self.delta_sd if self.delta_sd is None: delta_sd = pm.Exponential(f"lt_{self.model_idx} - tau", 1.5) if self.delta_pool_type == "partial": delta_shared = 0 if idata is not None and self.tune_method == "parametric": delta_loc, delta_scale = self._get_delta_params_from_idata(idata) delta_shared = pm.Normal( f"lt_{self.model_idx} - delta_shared", delta_loc, delta_scale ) delta_sigma = pm.HalfCauchy( f"lt_{self.model_idx} - delta_sigma", beta=delta_scale / self.shrinkage_strength, ) elif priors is not None and self.tune_method == "prior_from_idata": delta_loc, delta_scale = self._get_delta_params_from_idata(idata) delta_shared = pm.Deterministic( f"lt_{self.model_idx} - delta_shared", priors[f"prior_{delta_key}"], ) delta_sigma = pm.HalfCauchy( f"lt_{self.model_idx} - delta_sigma", beta=delta_scale / self.shrinkage_strength, ) else: delta_sigma = pm.HalfCauchy( f"lt_{self.model_idx} - delta_sigma", beta=delta_sd ) delta_z_offset = pm.Laplace( f"lt_{self.model_idx} - delta_z_offset", 0, 1, shape=(self.n_groups, self.n_changepoints), ) delta = pm.Deterministic( delta_key, delta_shared + delta_z_offset * delta_sigma ) elif self.delta_pool_type == "individual": if idata is not None and self.tune_method == "parametric": delta_loc, delta_scale = self._get_delta_params_from_idata(idata) delta = pm.Laplace( delta_key, delta_loc, delta_scale, shape=(self.n_groups, self.n_changepoints), ) elif priors is not None and self.tune_method == "prior_from_idata": delta_loc, delta_scale = self._get_delta_params_from_idata(idata) delta = pm.Deterministic(delta_key, priors[f"prior_{delta_key}"]) else: delta = pm.Laplace( delta_key, self.delta_mean, delta_sd, shape=(self.n_groups, self.n_changepoints), ) else: if idata is not None and self.tune_method == "parametric": delta_loc, delta_scale = self._get_delta_params_from_idata(idata) delta = pm.Laplace( delta_key, delta_loc, delta_scale, shape=self.n_changepoints ) elif priors is not None and self.tune_method == "prior_from_idata": delta_loc, delta_scale = self._get_delta_params_from_idata(idata) delta = pm.Deterministic(delta_key, priors[f"prior_{delta_key}"]) else: delta = pm.Laplace( delta_key, self.delta_mean, delta_sd, shape=self.n_changepoints ) intercept = pm.Normal( f"lt_{self.model_idx} - intercept", self.intercept_mean, self.intercept_sd, shape=self.n_groups, ) if idata is not None and self.tune_method is not None: pm.Potential( f"{slope_key} - loss", -self.loss_factor_for_tune * pm.math.sum(pm.math.sqr(slope - slope_mu)), ) if self.delta_pool_type in ["partial", "individual"]: delta = delta[self.group] gamma = -self.s * delta return (slope[self.group] + pm.math.sum(A * delta, axis=1)) * t + ( intercept[self.group] + pm.math.sum(A * gamma, axis=1) ) def _individual_definition( self, model: pm.Model, data: pd.DataFrame, priors: dict[str, pt.TensorVariable] | None, idata: az.InferenceData | None, ): """ Add the LinearTrend parameters to the model when pool_type is individual. Parameters ---------- model: pm.Model The model to which the parameters are added. data : pd.DataFrame A pandas dataframe that must at least have columns ds (predictor), y (target) and series (name of time series). priors: dict[str, pt.TensorVariable] | None A dictionary of multivariate normal random variables approximating the posterior sample in idata. idata: az.InferenceData | None Sample from a posterior. If it is not None, Vangja will use this to set the parameters' priors in the model. If idata is not None, each component from the model should specify how idata should be used to set its parameters' priors. """ with model: slope_key = f"lt_{self.model_idx} - slope" t = np.array(data["t"]) # calculate change points on first time series large_series = data[data["series"] == data["series"].iloc[0]] hist_size = int(np.floor(large_series.shape[0] * self.changepoint_range)) cp_indexes = ( np.linspace(0, hist_size - 1, self.n_changepoints + 1) .round() .astype(int) ) self.s = np.array(large_series.iloc[cp_indexes]["t"].tail(-1)) if self.delta_side == "left": A = (t[:, None] > self.s) * 1 else: A = (t[:, None] <= self.s) * 1 if idata is not None and self.tune_method == "parametric": slope_mu, slope_sd = self._get_slope_params_from_idata(idata) slope = pm.Normal(slope_key, slope_mu, slope_sd, shape=self.n_groups) elif priors is not None and self.tune_method == "prior_from_idata": slope_mu, slope_sd = self._get_slope_params_from_idata(idata) slope = pm.Normal( slope_key, mu=priors[f"prior_{slope_key}"], sigma=slope_sd, shape=self.n_groups, ) else: slope = pm.Normal( slope_key, self.slope_mean, self.slope_sd, shape=self.n_groups ) intercept = pm.Normal( f"lt_{self.model_idx} - intercept", self.intercept_mean, self.intercept_sd, shape=self.n_groups, ) if idata is not None and self.tune_method is not None: pm.Potential( f"{slope_key} - loss", self.loss_factor_for_tune * pm.math.sum(pm.math.sqr(slope - slope_mu)), ) if self.n_changepoints: delta_key = f"lt_{self.model_idx} - delta" if idata is not None and self.delta_tune_method == "parametric": delta_loc, delta_scale = self._get_delta_params_from_idata(idata) delta = pm.Laplace( delta_key, delta_loc, delta_scale, shape=(self.n_groups, self.n_changepoints), ) elif priors is not None and self.tune_method == "prior_from_idata": delta_loc, delta_scale = self._get_delta_params_from_idata(idata) delta = pm.Laplace( delta_key, mu=priors[f"prior_{delta_key}"], b=delta_scale, shape=(self.n_groups, self.n_changepoints), ) else: delta_sd = self.delta_sd if self.delta_sd is None: delta_sd = pm.Exponential(f"lt_{self.model_idx} - tau", 1.5) delta = pm.Laplace( delta_key, self.delta_mean, delta_sd, shape=(self.n_groups, self.n_changepoints), ) gamma = -self.s * delta[self.group] return ( slope[self.group] + pm.math.sum(A * delta[self.group], axis=1) ) * t + (intercept[self.group] + pm.math.sum(A * gamma, axis=1)) else: return slope[self.group] * t + intercept[self.group]
[docs] def definition( self, model: TimeSeriesModel, data: pd.DataFrame, model_idxs: dict[str, int], priors: dict[str, pt.TensorVariable] | None, idata: az.InferenceData | None, ): """ Add the LinearTrend parameters to the model. Parameters ---------- model: TimeSeriesModel The model to which the parameters are added. data : pd.DataFrame A pandas dataframe that must at least have columns ds (predictor), y (target) and series (name of time series). model_idxs: dict[str, int] Count of the number of components from each type. priors: dict[str, pt.TensorVariable] | None A dictionary of multivariate normal random variables approximating the posterior sample in idata. idata: az.InferenceData | None Sample from a posterior. If it is not None, Vangja will use this to set the parameters' priors in the model. If idata is not None, each component from the model should specify how idata should be used to set its parameters' priors. """ model_idxs["lt"] = model_idxs.get("lt", 0) self.model_idx = model_idxs["lt"] model_idxs["lt"] += 1 self.group, self.n_groups, self.groups_ = get_group_definition( data, self.pool_type ) with model: if self.pool_type == "complete": return self._complete_definition(model, data, priors, idata) elif self.pool_type == "partial": return self._partial_definition(model, data, priors, idata) elif self.pool_type == "individual": return self._individual_definition(model, data, priors, idata)
def _assign_model_idx(self, model_idxs: dict[str, int]) -> None: model_idxs["lt"] = model_idxs.get("lt", 0) self.model_idx = model_idxs["lt"] model_idxs["lt"] += 1 def _get_prior_var_names(self) -> list[str]: if self.tune_method != "prior_from_idata": return [] names = [f"lt_{self.model_idx} - slope"] if self.n_changepoints > 0: names.append(f"lt_{self.model_idx} - delta") return names def _get_initval(self, initvals: dict[str, float], model: pm.Model) -> dict: """Get the initval of the slope and the intercept of the linear trend. Parameters ---------- initvals: dict[str, float] Calculated initvals based on data. model: pm.Model The model for which the initvals will be set. """ slopes = [] intercepts = [] for key in sorted(self.groups_.keys()): slopes.append(initvals.get(f"slope_{key}", None)) intercepts.append(initvals.get(f"intercept_{key}", None)) result = {} slope_var = model.named_vars[f"lt_{self.model_idx} - slope"] intercept_var = model.named_vars[f"lt_{self.model_idx} - intercept"] # Only set initvals for free RVs (skip Deterministic from prior_from_idata) if slope_var in model.free_RVs: if self.pool_type == "complete" or self.n_groups == 1: result[slope_var] = slopes[0] else: result[slope_var] = np.array(slopes) if intercept_var in model.free_RVs: if self.pool_type == "complete" or self.n_groups == 1: result[intercept_var] = intercepts[0] else: result[intercept_var] = np.array(intercepts) return result def _predict_map(self, future, map_approx): forecasts = [] self._predict_columns = {} slope_key = f"lt_{self.model_idx} - slope" intercept_key = f"lt_{self.model_idx} - intercept" delta_key = f"lt_{self.model_idx} - delta" for group_code in self.groups_.keys(): slope_correction = 0 intercept_correction = 0 if self.n_changepoints > 0: if self.delta_side == "left": new_A = (np.array(future["t"])[:, None] > self.s) * 1 else: new_A = (np.array(future["t"])[:, None] <= self.s) * 1 if delta_key in map_approx: delta = map_approx[delta_key] else: delta = map_approx[f"prior_{delta_key}"] if ( self.pool_type == "individual" or ( self.pool_type == "partial" and self.delta_pool_type in ["partial", "individual"] ) ) and self.n_groups > 1: delta = delta[group_code] slope_correction = new_A @ delta intercept_correction = new_A @ (-self.s * delta) if slope_key in map_approx: slope = map_approx[slope_key] else: slope = map_approx[f"prior_{slope_key}"] if intercept_key in map_approx: intercept = map_approx[intercept_key] else: intercept = map_approx[f"prior_{intercept_key}"] if self.pool_type != "complete" and self.n_groups > 1: slope = slope[group_code] intercept = intercept[group_code] slope = slope + slope_correction intercept = intercept + intercept_correction forecasts.append(np.array(slope * future["t"] + intercept)) self._predict_columns[f"lt_{self.model_idx}_{group_code}"] = forecasts[-1] return np.vstack(forecasts) def _predict_mcmc(self, future, trace): slope_key = f"lt_{self.model_idx} - slope" intercept_key = f"lt_{self.model_idx} - intercept" delta_key = f"lt_{self.model_idx} - delta" forecasts = [] self._predict_columns = {} for group_code in self.groups_.keys(): # Get slope and intercept, averaging over chains and draws if slope_key in trace["posterior"]: slope = trace["posterior"][slope_key].to_numpy().mean(axis=(0, 1)) else: slope = ( trace["posterior"][f"prior_{slope_key}"] .to_numpy() .mean(axis=(0, 1)) ) if intercept_key in trace["posterior"]: intercept = ( trace["posterior"][intercept_key].to_numpy().mean(axis=(0, 1)) ) else: intercept = ( trace["posterior"][f"prior_{intercept_key}"] .to_numpy() .mean(axis=(0, 1)) ) # Handle per-group parameters if self.pool_type != "complete" and self.n_groups > 1: slope = slope[group_code] if slope.ndim > 0 else slope intercept = intercept[group_code] if intercept.ndim > 0 else intercept slope_correction = 0 intercept_correction = 0 if f"prior_{delta_key}" in trace["posterior"]: delta_key = f"prior_{delta_key}" if delta_key in trace["posterior"]: if self.delta_side == "left": new_A = (np.array(future["t"])[:, None] > self.s) * 1 else: new_A = (np.array(future["t"])[:, None] <= self.s) * 1 delta = trace["posterior"][delta_key].to_numpy().mean(axis=(0, 1)) # Handle per-group delta parameters if ( self.pool_type == "individual" or ( self.pool_type == "partial" and self.delta_pool_type in ["partial", "individual"] ) ) and self.n_groups > 1: delta = delta[group_code] slope_correction = new_A @ delta intercept_correction = new_A @ (-self.s * delta) total_slope = slope + slope_correction total_intercept = intercept + intercept_correction forecast = total_slope * future["t"].to_numpy() + total_intercept forecasts.append(forecast) self._predict_columns[f"lt_{self.model_idx}_{group_code}"] = forecast return np.vstack(forecasts) def _plot(self, plot_params, future, data, scale_params, y_true=None, series=""): plot_params["idx"] += 1 plt.subplot(100, 1, plot_params["idx"]) plt.title(f"LinearTrend({self.model_idx})") plt.grid() # Handle series parameter - use _0 as default for complete pooling if series == "": series_suffix = "_0" else: series_suffix = f"_{series}" plt.plot( future["ds"], future[f"lt_{self.model_idx}{series_suffix}"], lw=1, label=f"lt_{self.model_idx}", ) plt.legend()
[docs] def needs_priors(self, *args, **kwargs): return self.tune_method == "prior_from_idata"
[docs] def is_individual(self, *args, **kwargs): return self.pool_type == "individual"
def __str__(self): return f"LT(n={self.n_changepoints},r={self.changepoint_range},tm={self.tune_method})"