import math
import arviz as az
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pymc as pm
import pytensor.tensor as pt
from vangja.time_series import TimeSeriesModel
from vangja.types import PoolType, TuneMethod
from vangja.utils import get_group_definition
[docs]
class FourierSeasonality(TimeSeriesModel):
"""A seasonal component using Fourier series representation.
This component models periodic patterns in time series using a Fourier
series, following the Prophet approach. It allows flexible representation
of seasonal effects with controllable complexity via the number of terms.
The seasonality is defined as::
seasonality(t) = sum_{n=1}^{N} [a_n * cos(2*pi*n*t/P) + b_n * sin(2*pi*n*t/P)]
where:
- ``P`` is the period (e.g., 365.25 for yearly, 7 for weekly)
- ``N`` is the series order (number of Fourier terms)
- ``a_n``, ``b_n`` are the Fourier coefficients (collectively called beta)
Parameters
----------
period : float
The period of the seasonal pattern in days. Common values:
- 365.25: Yearly seasonality
- 7: Weekly seasonality
- 30.4375: Monthly seasonality (average month length)
series_order : int
Number of Fourier terms. Higher values capture more complex patterns
but may overfit. Guidelines:
- Yearly: 10-20 terms
- Weekly: 3-5 terms
- Monthly: 3-5 terms
beta_mean : float, default=0
The mean of the Normal prior for the Fourier coefficients.
beta_sd : float, default=10
The standard deviation of the Normal prior for the Fourier coefficients.
pool_type : PoolType, default="complete"
Type of pooling for multi-series data. One of:
- "complete": All series share the same seasonal pattern
- "partial": Hierarchical pooling with shared seasonal shape
- "individual": Each series has independent seasonal patterns
tune_method : TuneMethod | None, default=None
Transfer learning method. One of:
- "parametric": Use posterior mean/std as new priors
- "prior_from_idata": Use posterior samples directly
- None: No transfer learning
override_beta_mean_for_tune : np.ndarray | None, default=None
Override the beta mean during transfer learning.
override_beta_sd_for_tune : np.ndarray | None, default=None
Override the beta standard deviation during transfer learning.
shrinkage_strength : float, default=1
Controls hierarchical shrinkage. Higher values pull individual
series parameters more strongly toward the shared pattern.
shift_for_tune : bool, default=False
If True, learn a time shift parameter during transfer learning
to align seasonal patterns across time series.
loss_factor_for_tune : float, default=1
Regularization factor for transfer learning. Adds a penalty to
preserve the original seasonal amplitude.
Attributes
----------
model_idx : int | None
Index of this component in the model (set during fitting).
group : np.ndarray
Array of group codes for each data point.
n_groups : int
Number of unique groups/series.
groups_ : dict[int, str]
Mapping from group codes to series names.
Examples
--------
>>> from vangja import LinearTrend, FourierSeasonality
>>> from vangja.datasets import load_air_passengers
>>>
>>> # Basic additive seasonality
>>> model = LinearTrend() + FourierSeasonality(period=365.25, series_order=10)
>>> model.fit(data)
>>>
>>> # Multiplicative seasonality (Prophet-style)
>>> model = LinearTrend() ** FourierSeasonality(period=365.25, series_order=10)
>>>
>>> # Multiple seasonal components
>>> model = LinearTrend() ** (
... FourierSeasonality(period=365.25, series_order=10) +
... FourierSeasonality(period=7, series_order=3)
... )
See Also
--------
LinearTrend : Piecewise linear trend component.
Notes
-----
The Fourier series representation is based on the Prophet paper [2]_.
Using more Fourier terms allows fitting more complex seasonal patterns
but increases the risk of overfitting.
References
----------
.. [2] Taylor, S.J. and Letham, B., 2018. Forecasting at scale.
The American Statistician, 72(1), pp.37-45.
"""
model_idx: int | None = None
[docs]
def __init__(
self,
period: float,
series_order: int,
beta_mean: float = 0,
beta_sd: float = 10,
pool_type: PoolType = "complete",
tune_method: TuneMethod | None = None,
override_beta_mean_for_tune: np.ndarray | None = None,
override_beta_sd_for_tune: np.ndarray | None = None,
shrinkage_strength: float = 1,
shift_for_tune: bool = False,
loss_factor_for_tune: float = 0,
):
"""Create a Fourier Seasonality model component.
See the class docstring for full parameter descriptions.
"""
self.period = period
self.series_order = series_order
self.beta_mean = beta_mean
self.beta_sd = beta_sd
self.shrinkage_strength = shrinkage_strength
self.pool_type = pool_type
self.tune_method = tune_method
self.override_beta_mean_for_tune = override_beta_mean_for_tune
self.override_beta_sd_for_tune = override_beta_sd_for_tune
self.shift_for_tune = shift_for_tune
self.loss_factor_for_tune = loss_factor_for_tune
def _fourier_series(self, data: pd.DataFrame, shift=None):
# convert to days since epoch
t = (data["ds"] - pd.Timestamp("1970-01-01")).dt.days.values.astype(float)
if shift is not None:
t += shift
x_T = t * np.pi * 2
fourier_components = np.empty((data["ds"].shape[0], 2 * self.series_order))
if shift is not None and type(shift) is not np.ndarray:
fourier_components = pt.as_tensor_variable(fourier_components)
for i in range(self.series_order):
c = x_T * (i + 1) / self.period
if type(fourier_components) is np.ndarray:
fourier_components[:, 2 * i] = np.sin(c)
fourier_components[:, (2 * i) + 1] = np.cos(c)
else:
fourier_components = pt.set_subtensor(
fourier_components[:, 2 * i], np.sin(c)
)
fourier_components = pt.set_subtensor(
fourier_components[:, (2 * i) + 1], np.cos(c)
)
return fourier_components
def _get_beta_params_from_idata(self, idata: az.InferenceData):
"""
Calculate the mean and the standard deviation of the Normal prior for the
Fourier series coefficients parameter from a provided posterior sample.
Parameters
----------
idata: az.InferenceData
Sample from a posterior.
"""
beta_key = f"fs_{self.model_idx} - beta(p={self.period},n={self.series_order})"
if self.override_beta_mean_for_tune is not None:
beta_mean = self.override_beta_mean_for_tune
else:
beta_mean = idata["posterior"][beta_key].to_numpy().mean(axis=(1, 0))
if self.override_beta_sd_for_tune is not None:
beta_sd = self.override_beta_sd_for_tune
else:
beta_sd = idata["posterior"][beta_key].to_numpy().std(axis=(1, 0))
return beta_mean, beta_sd
def _complete_definition(
self,
model: pm.Model,
data: pd.DataFrame,
priors: dict[str, pt.TensorVariable] | None,
idata: az.InferenceData | None,
):
"""
Add the FourierSeasonality parameters to the model when pool_type is complete.
Parameters
----------
model: pm.Model
The model to which the parameters are added.
data : pd.DataFrame
A pandas dataframe that must at least have columns ds (predictor), y
(target) and series (name of time series).
priors: dict[str, pt.TensorVariable] | None
A dictionary of multivariate normal random variables approximating the
posterior sample in idata.
idata: az.InferenceData | None
Sample from a posterior. If it is not None, Vangja will use this to set the
parameters' priors in the model. If idata is not None, each component from
the model should specify how idata should be used to set its parameters'
priors.
"""
with model:
x = self._fourier_series(data)
beta_key = (
f"fs_{self.model_idx} - beta(p={self.period},n={self.series_order})"
)
if idata is not None and self.tune_method == "parametric":
beta_mean, beta_sd = self._get_beta_params_from_idata(idata)
beta = pm.Normal(
beta_key, beta_mean, beta_sd, shape=2 * self.series_order
)
elif priors is not None and self.tune_method == "prior_from_idata":
beta_mean, beta_sd = self._get_beta_params_from_idata(idata)
beta = pm.Deterministic(beta_key, priors[f"prior_{beta_key}"])
else:
beta = pm.Normal(
beta_key, self.beta_mean, self.beta_sd, shape=2 * self.series_order
)
if idata is not None and self.tune_method is not None:
reg_ds = pd.DataFrame(
{
"ds": pd.date_range(
"2000-01-01", periods=math.ceil(self.period), freq="D"
)
}
)
reg_x = self._fourier_series(reg_ds)
old = pm.math.sum(reg_x * beta_mean, axis=1)
new = pm.math.sum(reg_x * beta, axis=1)
lam = (
2 * self.period / data.shape[0]
if self.period > 2 * data.shape[0]
else 0
)
pm.Potential(
f"{beta_key} - loss",
self.loss_factor_for_tune
* lam
* pm.math.minimum(0, pm.math.dot(old, old) - pm.math.dot(new, new)),
)
return pm.math.sum(x * beta, axis=1)
def _partial_definition(
self,
model: pm.Model,
data: pd.DataFrame,
priors: dict[str, pt.TensorVariable] | None,
idata: az.InferenceData | None,
):
"""
Add the FourierSeasonality parameters to the model when pool_type is partial.
Parameters
----------
model: pm.Model
The model to which the parameters are added.
data : pd.DataFrame
A pandas dataframe that must at least have columns ds (predictor), y
(target) and series (name of time series).
priors: dict[str, pt.TensorVariable] | None
A dictionary of multivariate normal random variables approximating the
posterior sample in idata.
idata: az.InferenceData | None
Sample from a posterior. If it is not None, Vangja will use this to set the
parameters' priors in the model. If idata is not None, each component from
the model should specify how idata should be used to set its parameters'
priors.
"""
with model:
x = self._fourier_series(data)
beta_key = (
f"fs_{self.model_idx} - beta(p={self.period},n={self.series_order})"
)
beta_sd = self.beta_sd
if idata is not None and self.tune_method == "parametric":
beta_mean, beta_sd = self._get_beta_params_from_idata(idata)
beta_shared = pm.Normal(
f"fs_{self.model_idx} - beta_shared",
beta_mean,
beta_sd,
shape=2 * self.series_order,
)
elif priors is not None and self.tune_method == "prior_from_idata":
beta_mean, beta_sd = self._get_beta_params_from_idata(idata)
beta_shared = pm.Deterministic(
f"fs_{self.model_idx} - beta_shared", priors[f"prior_{beta_key}"]
)
else:
beta_shared = pm.Normal(
f"fs_{self.model_idx} - beta_shared",
self.beta_mean,
beta_sd,
shape=2 * self.series_order,
)
beta_sigma = pm.HalfNormal(
f"fs_{self.model_idx} - beta_sigma(p={self.period},n={self.series_order})",
sigma=beta_sd / self.shrinkage_strength,
shape=2 * self.series_order,
)
beta_z_offset = pm.Normal(
f"fs_{self.model_idx} - beta_z_offset(p={self.period},n={self.series_order})",
mu=0,
sigma=1,
shape=(self.n_groups, 2 * self.series_order),
)
beta = pm.Deterministic(
beta_key,
beta_shared + beta_z_offset * beta_sigma,
)
if idata is not None and self.tune_method is not None:
reg_ds = pd.DataFrame(
{
"ds": pd.date_range(
"2000-01-01", periods=math.ceil(self.period), freq="D"
)
}
)
reg_x = self._fourier_series(reg_ds)
old = pm.math.sum(reg_x * beta_mean, axis=1)
new = pm.math.dot(beta, reg_x.T)
lam = np.array(
[
(
2 * data[self.group == group_code].shape[0]
if self.period > 2 * data[self.group == group_code].shape[0]
else 0
)
for group_code in self.groups_
]
)
pm.Potential(
f"{beta_key} - loss",
self.loss_factor_for_tune
* pm.math.sum(
lam
* pm.math.minimum(
0, pm.math.dot(old, old) - pm.math.sum(new * new, axis=1)
)
),
)
return pm.math.sum(x * beta[self.group], axis=1)
def _individual_definition(
self,
model: pm.Model,
data: pd.DataFrame,
priors: dict[str, pt.TensorVariable] | None,
idata: az.InferenceData | None,
):
"""
Add the FourierSeasonality parameters to the model when pool_type is individual.
Parameters
----------
model: pm.Model
The model to which the parameters are added.
data : pd.DataFrame
A pandas dataframe that must at least have columns ds (predictor), y
(target) and series (name of time series).
priors: dict[str, pt.TensorVariable] | None
A dictionary of multivariate normal random variables approximating the
posterior sample in idata.
idata: az.InferenceData | None
Sample from a posterior. If it is not None, Vangja will use this to set the
parameters' priors in the model. If idata is not None, each component from
the model should specify how idata should be used to set its parameters'
priors.
"""
with model:
x = self._fourier_series(data)
beta_key = (
f"fs_{self.model_idx} - beta(p={self.period},n={self.series_order})"
)
if idata is not None and self.tune_method == "parametric":
beta_mean, beta_sd = self._get_beta_params_from_idata(idata)
beta = pm.Normal(
beta_key,
beta_mean,
beta_sd,
shape=(self.n_groups, 2 * self.series_order),
)
elif priors is not None and self.tune_method == "prior_from_idata":
beta_mean, beta_sd = self._get_beta_params_from_idata(idata)
beta = pm.Normal(
beta_key,
mu=priors[f"prior_{beta_key}"],
sigma=beta_sd,
shape=(self.n_groups, 2 * self.series_order),
)
else:
beta = pm.Normal(
beta_key,
self.beta_mean,
self.beta_sd,
shape=(self.n_groups, 2 * self.series_order),
)
if idata is not None and self.tune_method is not None:
reg_ds = pd.DataFrame(
{
"ds": pd.date_range(
"2000-01-01", periods=math.ceil(self.period), freq="D"
)
}
)
reg_x = self._fourier_series(reg_ds)
old = pm.math.sum(reg_x * beta_mean, axis=1)
new = pm.math.dot(beta, reg_x.T)
lam = np.array(
[
(
2 * data[self.group == group_code].shape[0]
if self.period > 2 * data[self.group == group_code].shape[0]
else 0
)
for group_code in self.groups_
]
)
pm.Potential(
f"{beta_key} - loss",
self.loss_factor_for_tune
* pm.math.sum(
lam
* pm.math.minimum(
0, pm.math.dot(old, old) - pm.math.sum(new * new, axis=1)
)
),
)
return pm.math.sum(x * beta[self.group], axis=1)
[docs]
def definition(
self,
model: TimeSeriesModel,
data: pd.DataFrame,
model_idxs: dict[str, int],
priors: dict[str, pt.TensorVariable] | None,
idata: az.InferenceData | None,
):
"""
Add the FourierSeasonality parameters to the model.
Parameters
----------
model: TimeSeriesModel
The model to which the parameters are added.
data : pd.DataFrame
A pandas dataframe that must at least have columns ds (predictor), y
(target) and series (name of time series).
model_idxs: dict[str, int]
Count of the number of components from each type.
priors: dict[str, pt.TensorVariable] | None
A dictionary of multivariate normal random variables approximating the
posterior sample in idata.
idata: az.InferenceData | None
Sample from a posterior. If it is not None, Vangja will use this to set the
parameters' priors in the model. If idata is not None, each component from
the model should specify how idata should be used to set its parameters'
priors.
"""
model_idxs["fs"] = model_idxs.get("fs", 0)
self.model_idx = model_idxs["fs"]
model_idxs["fs"] += 1
self.group, self.n_groups, self.groups_ = get_group_definition(
data, self.pool_type
)
with model:
if self.pool_type == "complete":
return self._complete_definition(model, data, priors, idata)
elif self.pool_type == "partial":
return self._partial_definition(model, data, priors, idata)
elif self.pool_type == "individual":
return self._individual_definition(model, data, priors, idata)
def _get_initval(self, initvals: dict[str, float], model: pm.Model) -> dict:
"""Get the initval of the Fourier series coefficients parameters.
Parameters
----------
initvals: dict[str, float]
Calculated initvals based on data.
model: pm.Model
The model for which the initvals will be set.
"""
return {}
def _det_seasonality_posterior(self, beta, x):
return np.dot(x, beta.T)
def _predict_map(self, future, map_approx):
forecasts = []
self._predict_columns = {}
for group_code in self.groups_.keys():
beta_key = (
f"fs_{self.model_idx} - beta(p={self.period},n={self.series_order})"
)
if beta_key in map_approx:
beta = map_approx[beta_key]
else:
beta = map_approx[f"prior_{beta_key}"]
if self.pool_type != "complete" and self.n_groups > 1:
beta = beta[group_code]
forecasts.append(
self._det_seasonality_posterior(beta, self._fourier_series(future))
)
self._predict_columns[f"fs_{self.model_idx}_{group_code}"] = forecasts[-1]
return np.vstack(forecasts)
def _predict_mcmc(self, future, trace):
shift = trace["posterior"].get(f"fs_{self.model_idx} - shift", None)
if shift is not None:
shift = shift.mean()
beta_key = f"fs_{self.model_idx} - beta(p={self.period},n={self.series_order})"
forecasts = []
self._predict_columns = {}
for group_code in self.groups_.keys():
# Get beta, averaging over chains and draws
if beta_key in trace["posterior"]:
beta = trace["posterior"][beta_key].to_numpy().mean(axis=(0, 1))
else:
beta = (
trace["posterior"][f"prior_{beta_key}"].to_numpy().mean(axis=(0, 1))
)
# Handle per-group parameters
if self.pool_type != "complete" and self.n_groups > 1:
beta = beta[group_code]
forecast = self._det_seasonality_posterior(
beta, self._fourier_series(future, shift)
)
forecasts.append(forecast)
self._predict_columns[f"fs_{self.model_idx}_{group_code}"] = forecast
return np.vstack(forecasts)
def _plot(self, plot_params, future, data, scale_params, y_true=None, series=""):
date = future["ds"] if self.period > 7 else future["ds"].dt.day_name()
plot_params["idx"] += 1
plt.subplot(100, 1, plot_params["idx"])
plt.title(
f"FourierSeasonality({self.model_idx},p={self.period},n={self.series_order})"
)
plt.grid()
# Handle series parameter - use _0 as default for complete pooling
if series == "":
series_suffix = "_0"
else:
series_suffix = f"_{series}"
plt.plot(
date[-int(self.period) :],
future[f"fs_{self.model_idx}{series_suffix}"][-int(self.period) :],
lw=1,
)
def _assign_model_idx(self, model_idxs: dict[str, int]) -> None:
model_idxs["fs"] = model_idxs.get("fs", 0)
self.model_idx = model_idxs["fs"]
model_idxs["fs"] += 1
def _get_prior_var_names(self) -> list[str]:
if self.tune_method != "prior_from_idata":
return []
return [f"fs_{self.model_idx} - beta(p={self.period},n={self.series_order})"]
[docs]
def needs_priors(self, *args, **kwargs):
return self.tune_method == "prior_from_idata"
[docs]
def is_individual(self, *args, **kwargs):
return self.pool_type == "individual"
def __str__(self):
return f"FS(p={self.period},n={self.series_order},pt={self.pool_type},tm={self.tune_method})"