Source code for vangja.components.fourier_seasonality

import math

import arviz as az
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pymc as pm
import pytensor.tensor as pt

from vangja.time_series import TimeSeriesModel
from vangja.types import PoolType, TuneMethod
from vangja.utils import get_group_definition


[docs] class FourierSeasonality(TimeSeriesModel): """A seasonal component using Fourier series representation. This component models periodic patterns in time series using a Fourier series, following the Prophet approach. It allows flexible representation of seasonal effects with controllable complexity via the number of terms. The seasonality is defined as:: seasonality(t) = sum_{n=1}^{N} [a_n * cos(2*pi*n*t/P) + b_n * sin(2*pi*n*t/P)] where: - ``P`` is the period (e.g., 365.25 for yearly, 7 for weekly) - ``N`` is the series order (number of Fourier terms) - ``a_n``, ``b_n`` are the Fourier coefficients (collectively called beta) Parameters ---------- period : float The period of the seasonal pattern in days. Common values: - 365.25: Yearly seasonality - 7: Weekly seasonality - 30.4375: Monthly seasonality (average month length) series_order : int Number of Fourier terms. Higher values capture more complex patterns but may overfit. Guidelines: - Yearly: 10-20 terms - Weekly: 3-5 terms - Monthly: 3-5 terms beta_mean : float, default=0 The mean of the Normal prior for the Fourier coefficients. beta_sd : float, default=10 The standard deviation of the Normal prior for the Fourier coefficients. pool_type : PoolType, default="complete" Type of pooling for multi-series data. One of: - "complete": All series share the same seasonal pattern - "partial": Hierarchical pooling with shared seasonal shape - "individual": Each series has independent seasonal patterns tune_method : TuneMethod | None, default=None Transfer learning method. One of: - "parametric": Use posterior mean/std as new priors - "prior_from_idata": Use posterior samples directly - None: No transfer learning override_beta_mean_for_tune : np.ndarray | None, default=None Override the beta mean during transfer learning. override_beta_sd_for_tune : np.ndarray | None, default=None Override the beta standard deviation during transfer learning. shrinkage_strength : float, default=1 Controls hierarchical shrinkage. Higher values pull individual series parameters more strongly toward the shared pattern. shift_for_tune : bool, default=False If True, learn a time shift parameter during transfer learning to align seasonal patterns across time series. loss_factor_for_tune : float, default=1 Regularization factor for transfer learning. Adds a penalty to preserve the original seasonal amplitude. Attributes ---------- model_idx : int | None Index of this component in the model (set during fitting). group : np.ndarray Array of group codes for each data point. n_groups : int Number of unique groups/series. groups_ : dict[int, str] Mapping from group codes to series names. Examples -------- >>> from vangja import LinearTrend, FourierSeasonality >>> from vangja.datasets import load_air_passengers >>> >>> # Basic additive seasonality >>> model = LinearTrend() + FourierSeasonality(period=365.25, series_order=10) >>> model.fit(data) >>> >>> # Multiplicative seasonality (Prophet-style) >>> model = LinearTrend() ** FourierSeasonality(period=365.25, series_order=10) >>> >>> # Multiple seasonal components >>> model = LinearTrend() ** ( ... FourierSeasonality(period=365.25, series_order=10) + ... FourierSeasonality(period=7, series_order=3) ... ) See Also -------- LinearTrend : Piecewise linear trend component. Notes ----- The Fourier series representation is based on the Prophet paper [2]_. Using more Fourier terms allows fitting more complex seasonal patterns but increases the risk of overfitting. References ---------- .. [2] Taylor, S.J. and Letham, B., 2018. Forecasting at scale. The American Statistician, 72(1), pp.37-45. """ model_idx: int | None = None
[docs] def __init__( self, period: float, series_order: int, beta_mean: float = 0, beta_sd: float = 10, pool_type: PoolType = "complete", tune_method: TuneMethod | None = None, override_beta_mean_for_tune: np.ndarray | None = None, override_beta_sd_for_tune: np.ndarray | None = None, shrinkage_strength: float = 1, shift_for_tune: bool = False, loss_factor_for_tune: float = 0, ): """Create a Fourier Seasonality model component. See the class docstring for full parameter descriptions. """ self.period = period self.series_order = series_order self.beta_mean = beta_mean self.beta_sd = beta_sd self.shrinkage_strength = shrinkage_strength self.pool_type = pool_type self.tune_method = tune_method self.override_beta_mean_for_tune = override_beta_mean_for_tune self.override_beta_sd_for_tune = override_beta_sd_for_tune self.shift_for_tune = shift_for_tune self.loss_factor_for_tune = loss_factor_for_tune
def _fourier_series(self, data: pd.DataFrame, shift=None): # convert to days since epoch t = (data["ds"] - pd.Timestamp("1970-01-01")).dt.days.values.astype(float) if shift is not None: t += shift x_T = t * np.pi * 2 fourier_components = np.empty((data["ds"].shape[0], 2 * self.series_order)) if shift is not None and type(shift) is not np.ndarray: fourier_components = pt.as_tensor_variable(fourier_components) for i in range(self.series_order): c = x_T * (i + 1) / self.period if type(fourier_components) is np.ndarray: fourier_components[:, 2 * i] = np.sin(c) fourier_components[:, (2 * i) + 1] = np.cos(c) else: fourier_components = pt.set_subtensor( fourier_components[:, 2 * i], np.sin(c) ) fourier_components = pt.set_subtensor( fourier_components[:, (2 * i) + 1], np.cos(c) ) return fourier_components def _get_beta_params_from_idata(self, idata: az.InferenceData): """ Calculate the mean and the standard deviation of the Normal prior for the Fourier series coefficients parameter from a provided posterior sample. Parameters ---------- idata: az.InferenceData Sample from a posterior. """ beta_key = f"fs_{self.model_idx} - beta(p={self.period},n={self.series_order})" if self.override_beta_mean_for_tune is not None: beta_mean = self.override_beta_mean_for_tune else: beta_mean = idata["posterior"][beta_key].to_numpy().mean(axis=(1, 0)) if self.override_beta_sd_for_tune is not None: beta_sd = self.override_beta_sd_for_tune else: beta_sd = idata["posterior"][beta_key].to_numpy().std(axis=(1, 0)) return beta_mean, beta_sd def _complete_definition( self, model: pm.Model, data: pd.DataFrame, priors: dict[str, pt.TensorVariable] | None, idata: az.InferenceData | None, ): """ Add the FourierSeasonality parameters to the model when pool_type is complete. Parameters ---------- model: pm.Model The model to which the parameters are added. data : pd.DataFrame A pandas dataframe that must at least have columns ds (predictor), y (target) and series (name of time series). priors: dict[str, pt.TensorVariable] | None A dictionary of multivariate normal random variables approximating the posterior sample in idata. idata: az.InferenceData | None Sample from a posterior. If it is not None, Vangja will use this to set the parameters' priors in the model. If idata is not None, each component from the model should specify how idata should be used to set its parameters' priors. """ with model: x = self._fourier_series(data) beta_key = ( f"fs_{self.model_idx} - beta(p={self.period},n={self.series_order})" ) if idata is not None and self.tune_method == "parametric": beta_mean, beta_sd = self._get_beta_params_from_idata(idata) beta = pm.Normal( beta_key, beta_mean, beta_sd, shape=2 * self.series_order ) elif priors is not None and self.tune_method == "prior_from_idata": beta_mean, beta_sd = self._get_beta_params_from_idata(idata) beta = pm.Deterministic(beta_key, priors[f"prior_{beta_key}"]) else: beta = pm.Normal( beta_key, self.beta_mean, self.beta_sd, shape=2 * self.series_order ) if idata is not None and self.tune_method is not None: reg_ds = pd.DataFrame( { "ds": pd.date_range( "2000-01-01", periods=math.ceil(self.period), freq="D" ) } ) reg_x = self._fourier_series(reg_ds) old = pm.math.sum(reg_x * beta_mean, axis=1) new = pm.math.sum(reg_x * beta, axis=1) lam = ( 2 * self.period / data.shape[0] if self.period > 2 * data.shape[0] else 0 ) pm.Potential( f"{beta_key} - loss", self.loss_factor_for_tune * lam * pm.math.minimum(0, pm.math.dot(old, old) - pm.math.dot(new, new)), ) return pm.math.sum(x * beta, axis=1) def _partial_definition( self, model: pm.Model, data: pd.DataFrame, priors: dict[str, pt.TensorVariable] | None, idata: az.InferenceData | None, ): """ Add the FourierSeasonality parameters to the model when pool_type is partial. Parameters ---------- model: pm.Model The model to which the parameters are added. data : pd.DataFrame A pandas dataframe that must at least have columns ds (predictor), y (target) and series (name of time series). priors: dict[str, pt.TensorVariable] | None A dictionary of multivariate normal random variables approximating the posterior sample in idata. idata: az.InferenceData | None Sample from a posterior. If it is not None, Vangja will use this to set the parameters' priors in the model. If idata is not None, each component from the model should specify how idata should be used to set its parameters' priors. """ with model: x = self._fourier_series(data) beta_key = ( f"fs_{self.model_idx} - beta(p={self.period},n={self.series_order})" ) beta_sd = self.beta_sd if idata is not None and self.tune_method == "parametric": beta_mean, beta_sd = self._get_beta_params_from_idata(idata) beta_shared = pm.Normal( f"fs_{self.model_idx} - beta_shared", beta_mean, beta_sd, shape=2 * self.series_order, ) elif priors is not None and self.tune_method == "prior_from_idata": beta_mean, beta_sd = self._get_beta_params_from_idata(idata) beta_shared = pm.Deterministic( f"fs_{self.model_idx} - beta_shared", priors[f"prior_{beta_key}"] ) else: beta_shared = pm.Normal( f"fs_{self.model_idx} - beta_shared", self.beta_mean, beta_sd, shape=2 * self.series_order, ) beta_sigma = pm.HalfNormal( f"fs_{self.model_idx} - beta_sigma(p={self.period},n={self.series_order})", sigma=beta_sd / self.shrinkage_strength, shape=2 * self.series_order, ) beta_z_offset = pm.Normal( f"fs_{self.model_idx} - beta_z_offset(p={self.period},n={self.series_order})", mu=0, sigma=1, shape=(self.n_groups, 2 * self.series_order), ) beta = pm.Deterministic( beta_key, beta_shared + beta_z_offset * beta_sigma, ) if idata is not None and self.tune_method is not None: reg_ds = pd.DataFrame( { "ds": pd.date_range( "2000-01-01", periods=math.ceil(self.period), freq="D" ) } ) reg_x = self._fourier_series(reg_ds) old = pm.math.sum(reg_x * beta_mean, axis=1) new = pm.math.dot(beta, reg_x.T) lam = np.array( [ ( 2 * data[self.group == group_code].shape[0] if self.period > 2 * data[self.group == group_code].shape[0] else 0 ) for group_code in self.groups_ ] ) pm.Potential( f"{beta_key} - loss", self.loss_factor_for_tune * pm.math.sum( lam * pm.math.minimum( 0, pm.math.dot(old, old) - pm.math.sum(new * new, axis=1) ) ), ) return pm.math.sum(x * beta[self.group], axis=1) def _individual_definition( self, model: pm.Model, data: pd.DataFrame, priors: dict[str, pt.TensorVariable] | None, idata: az.InferenceData | None, ): """ Add the FourierSeasonality parameters to the model when pool_type is individual. Parameters ---------- model: pm.Model The model to which the parameters are added. data : pd.DataFrame A pandas dataframe that must at least have columns ds (predictor), y (target) and series (name of time series). priors: dict[str, pt.TensorVariable] | None A dictionary of multivariate normal random variables approximating the posterior sample in idata. idata: az.InferenceData | None Sample from a posterior. If it is not None, Vangja will use this to set the parameters' priors in the model. If idata is not None, each component from the model should specify how idata should be used to set its parameters' priors. """ with model: x = self._fourier_series(data) beta_key = ( f"fs_{self.model_idx} - beta(p={self.period},n={self.series_order})" ) if idata is not None and self.tune_method == "parametric": beta_mean, beta_sd = self._get_beta_params_from_idata(idata) beta = pm.Normal( beta_key, beta_mean, beta_sd, shape=(self.n_groups, 2 * self.series_order), ) elif priors is not None and self.tune_method == "prior_from_idata": beta_mean, beta_sd = self._get_beta_params_from_idata(idata) beta = pm.Normal( beta_key, mu=priors[f"prior_{beta_key}"], sigma=beta_sd, shape=(self.n_groups, 2 * self.series_order), ) else: beta = pm.Normal( beta_key, self.beta_mean, self.beta_sd, shape=(self.n_groups, 2 * self.series_order), ) if idata is not None and self.tune_method is not None: reg_ds = pd.DataFrame( { "ds": pd.date_range( "2000-01-01", periods=math.ceil(self.period), freq="D" ) } ) reg_x = self._fourier_series(reg_ds) old = pm.math.sum(reg_x * beta_mean, axis=1) new = pm.math.dot(beta, reg_x.T) lam = np.array( [ ( 2 * data[self.group == group_code].shape[0] if self.period > 2 * data[self.group == group_code].shape[0] else 0 ) for group_code in self.groups_ ] ) pm.Potential( f"{beta_key} - loss", self.loss_factor_for_tune * pm.math.sum( lam * pm.math.minimum( 0, pm.math.dot(old, old) - pm.math.sum(new * new, axis=1) ) ), ) return pm.math.sum(x * beta[self.group], axis=1)
[docs] def definition( self, model: TimeSeriesModel, data: pd.DataFrame, model_idxs: dict[str, int], priors: dict[str, pt.TensorVariable] | None, idata: az.InferenceData | None, ): """ Add the FourierSeasonality parameters to the model. Parameters ---------- model: TimeSeriesModel The model to which the parameters are added. data : pd.DataFrame A pandas dataframe that must at least have columns ds (predictor), y (target) and series (name of time series). model_idxs: dict[str, int] Count of the number of components from each type. priors: dict[str, pt.TensorVariable] | None A dictionary of multivariate normal random variables approximating the posterior sample in idata. idata: az.InferenceData | None Sample from a posterior. If it is not None, Vangja will use this to set the parameters' priors in the model. If idata is not None, each component from the model should specify how idata should be used to set its parameters' priors. """ model_idxs["fs"] = model_idxs.get("fs", 0) self.model_idx = model_idxs["fs"] model_idxs["fs"] += 1 self.group, self.n_groups, self.groups_ = get_group_definition( data, self.pool_type ) with model: if self.pool_type == "complete": return self._complete_definition(model, data, priors, idata) elif self.pool_type == "partial": return self._partial_definition(model, data, priors, idata) elif self.pool_type == "individual": return self._individual_definition(model, data, priors, idata)
def _get_initval(self, initvals: dict[str, float], model: pm.Model) -> dict: """Get the initval of the Fourier series coefficients parameters. Parameters ---------- initvals: dict[str, float] Calculated initvals based on data. model: pm.Model The model for which the initvals will be set. """ return {} def _det_seasonality_posterior(self, beta, x): return np.dot(x, beta.T) def _predict_map(self, future, map_approx): forecasts = [] self._predict_columns = {} for group_code in self.groups_.keys(): beta_key = ( f"fs_{self.model_idx} - beta(p={self.period},n={self.series_order})" ) if beta_key in map_approx: beta = map_approx[beta_key] else: beta = map_approx[f"prior_{beta_key}"] if self.pool_type != "complete" and self.n_groups > 1: beta = beta[group_code] forecasts.append( self._det_seasonality_posterior(beta, self._fourier_series(future)) ) self._predict_columns[f"fs_{self.model_idx}_{group_code}"] = forecasts[-1] return np.vstack(forecasts) def _predict_mcmc(self, future, trace): shift = trace["posterior"].get(f"fs_{self.model_idx} - shift", None) if shift is not None: shift = shift.mean() beta_key = f"fs_{self.model_idx} - beta(p={self.period},n={self.series_order})" forecasts = [] self._predict_columns = {} for group_code in self.groups_.keys(): # Get beta, averaging over chains and draws if beta_key in trace["posterior"]: beta = trace["posterior"][beta_key].to_numpy().mean(axis=(0, 1)) else: beta = ( trace["posterior"][f"prior_{beta_key}"].to_numpy().mean(axis=(0, 1)) ) # Handle per-group parameters if self.pool_type != "complete" and self.n_groups > 1: beta = beta[group_code] forecast = self._det_seasonality_posterior( beta, self._fourier_series(future, shift) ) forecasts.append(forecast) self._predict_columns[f"fs_{self.model_idx}_{group_code}"] = forecast return np.vstack(forecasts) def _plot(self, plot_params, future, data, scale_params, y_true=None, series=""): date = future["ds"] if self.period > 7 else future["ds"].dt.day_name() plot_params["idx"] += 1 plt.subplot(100, 1, plot_params["idx"]) plt.title( f"FourierSeasonality({self.model_idx},p={self.period},n={self.series_order})" ) plt.grid() # Handle series parameter - use _0 as default for complete pooling if series == "": series_suffix = "_0" else: series_suffix = f"_{series}" plt.plot( date[-int(self.period) :], future[f"fs_{self.model_idx}{series_suffix}"][-int(self.period) :], lw=1, ) def _assign_model_idx(self, model_idxs: dict[str, int]) -> None: model_idxs["fs"] = model_idxs.get("fs", 0) self.model_idx = model_idxs["fs"] model_idxs["fs"] += 1 def _get_prior_var_names(self) -> list[str]: if self.tune_method != "prior_from_idata": return [] return [f"fs_{self.model_idx} - beta(p={self.period},n={self.series_order})"]
[docs] def needs_priors(self, *args, **kwargs): return self.tune_method == "prior_from_idata"
[docs] def is_individual(self, *args, **kwargs): return self.pool_type == "individual"
def __str__(self): return f"FS(p={self.period},n={self.series_order},pt={self.pool_type},tm={self.tune_method})"