Source code for causalpy.checks.placebo_in_time

#   Copyright 2022 - 2026 The PyMC Labs Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""
Placebo-in-time sensitivity check with hierarchical null model.

Builds a hierarchical Bayesian model of the "status quo" (no-effect)
distribution from placebo folds, then compares the actual intervention
effect against that learned null.  Optionally computes Bayesian
assurance (operating characteristics) against a user-supplied
expected-effect prior.

Supports two fold-selection strategies:

* **sequential** (default) — evenly-spaced sliding windows stepping
  backward from the actual treatment time.
* **random** — randomly sampled eligible windows from the
  pre-intervention period, with constraints on minimum training
  fraction, minimum gap between folds, and optional period exclusion.

Supports experiments with a ``treatment_time`` parameter
(InterruptedTimeSeries, SyntheticControl).  Requires a PyMC model
for posterior extraction.
"""

from __future__ import annotations

import logging
import warnings
from dataclasses import dataclass, field
from typing import Any, Literal

import numpy as np
import pandas as pd
import pymc as pm
import xarray as xr

from causalpy.checks.base import CheckResult, clone_model
from causalpy.experiments.base import BaseExperiment
from causalpy.experiments.interrupted_time_series import InterruptedTimeSeries
from causalpy.experiments.synthetic_control import SyntheticControl
from causalpy.pipeline import PipelineContext
from causalpy.pymc_models import PyMCModel

logger = logging.getLogger(__name__)

MIN_FOLD_OBSERVATIONS = 3
MAX_RANDOM_SELECTION_RETRIES = 16

_DEFAULT_SAMPLE_KWARGS: dict[str, Any] = {
    "draws": 1000,
    "chains": 4,
    "target_accept": 0.97,
}


[docs] @dataclass class PlaceboFoldResult: """Result of a single placebo fold. Attributes ---------- fold : int Fold number (1-indexed). pseudo_treatment_time : Any The shifted treatment time for this fold. experiment : BaseExperiment The fitted experiment for this fold. cumulative_impact_samples : xr.DataArray Posterior samples of the cumulative (summed) impact for this fold. fold_mean : float Posterior mean of the cumulative impact. fold_sd : float Posterior standard deviation of the cumulative impact. """ fold: int pseudo_treatment_time: Any experiment: BaseExperiment cumulative_impact_samples: xr.DataArray fold_mean: float fold_sd: float
[docs] @dataclass class AssuranceResult: """Bayesian operating characteristics from design-level simulation. Attributes ---------- true_positive_rate : float P(decide "positive" | alternative true). This *is* the assurance. false_positive_rate : float P(decide "positive" | null true). true_negative_rate : float P(decide "null" | null true). false_negative_rate : float P(decide "null" | alternative true). null_indeterminate_rate : float P(decide "indeterminate" | null true). alt_indeterminate_rate : float P(decide "indeterminate" | alternative true). null_decisions : np.ndarray Raw decision strings under the null scenario. alt_decisions : np.ndarray Raw decision strings under the alternative scenario. """ true_positive_rate: float false_positive_rate: float true_negative_rate: float false_negative_rate: float null_indeterminate_rate: float alt_indeterminate_rate: float null_decisions: np.ndarray = field(repr=False) alt_decisions: np.ndarray = field(repr=False)
[docs] class PlaceboInTime: """Placebo-in-time sensitivity check with hierarchical null model. Shifts the treatment time backward into the pre-intervention period to create ``n_folds`` placebo experiments. Extracts the posterior cumulative impact from each fold, then fits a hierarchical Bayesian model to characterise the "status quo" distribution of effects when no intervention occurred. The actual intervention's cumulative effect is compared against this learned null. When ``expected_effect_prior`` and ``rope_half_width`` are provided, additionally computes Bayesian assurance (operating characteristics) via simulation. Parameters ---------- n_folds : int, default 3 Number of placebo folds to create. Must be >= 1. selection_method : {"sequential", "random"}, default "sequential" How to choose placebo windows. * ``"sequential"`` — evenly-spaced sliding windows stepping backward from the treatment time (original behaviour). * ``"random"`` — randomly sample eligible windows from the pre-intervention period, subject to ``min_training_pct``, ``min_gap``, and ``exclude_periods`` constraints. min_training_pct : float, default 0.30 *(random mode only)* Minimum fraction of total pre-period observations that must precede each candidate placebo window. Note: the eligible pre-period is further shortened because a candidate's pseudo-intervention window must also end before the actual treatment. When ``treatment_end_time`` is not set on the experiment, ``intervention_length`` defaults to ``data.index.max() - treatment_time`` (roughly the post-period length), which can make the effective eligible window much smaller than ``(1 - min_training_pct)`` suggests. min_gap : int, default 1 *(random mode only)* Minimum number of pre-intervention observations between any two selected folds, measured as positions in the sorted pre-period index. The default of ``1`` only forbids picking the same candidate twice; use a larger value to spread folds further apart. When ``allow_overlap`` is ``False`` (the default) non-overlap of pseudo-intervention windows is enforced independently of ``min_gap``. allow_overlap : bool, default False *(random mode only)* If ``False`` (the default), selected pseudo-intervention windows are required to be non-overlapping in index/time units. Two folds at times ``t_a`` and ``t_b`` are considered non-overlapping when ``abs(t_a - t_b) >= intervention_length``. Set to ``True`` to allow overlapping windows, which relaxes the constraint at the cost of violating the exchangeability assumption of the hierarchical status-quo model (each fold mean is treated as an independent draw from a common ``mu_status_quo``). exclude_periods : set[str] | None, default None *(random mode only)* Set of period labels to exclude from candidate selection. For datetime-indexed data, use ``"YYYY-MM"`` strings; for numeric indices, use string representations of the index values. experiment_factory : callable, optional Custom factory ``(data, treatment_time) -> BaseExperiment``. If ``None`` (default), the factory is derived from the pipeline's ``experiment_config``. Required for standalone (non-pipeline) use. sample_kwargs : dict, optional MCMC settings for the hierarchical status-quo model. Defaults to ``{"draws": 1000, "chains": 4, "target_accept": 0.97}``. threshold : float, default 0.95 Probability cutoff. Used both for ``passed`` (P(actual effect outside null) must exceed this) and for the ROPE decision rule when computing assurance. prior_scale : float, default 1.0 Multiplier for auto-computed prior widths on the hierarchical model. The priors are ``mu ~ Normal(center, 5 * prior_scale * data_scale)`` and ``tau ~ HalfNormal(2 * prior_scale * data_scale)``. expected_effect_prior : distribution or array, optional Prior belief about the true total effect under the alternative hypothesis. Accepts any object with an ``.rvs(n)`` method (PreliZ, scipy) or a numpy array of pre-drawn samples. When provided together with ``rope_half_width``, assurance analysis runs automatically. rope_half_width : float, optional Half-width of the ROPE interval ``[-rope, +rope]``. Required when ``expected_effect_prior`` is provided. n_design_replications : int, optional Number of simulation replications for assurance. Defaults to ``min(theta_new.size, expected_effect_samples.size)``. random_seed : int, optional RNG seed for the assurance simulation and random fold selection. Examples -------- >>> import causalpy as cp # doctest: +SKIP >>> check = cp.checks.PlaceboInTime(n_folds=3) # doctest: +SKIP Random selection with constraints: >>> check = cp.checks.PlaceboInTime( # doctest: +SKIP ... n_folds=4, ... selection_method="random", ... min_training_pct=0.30, ... min_gap=2, ... random_seed=42, ... ) """ applicable_methods: set[type[BaseExperiment]] = { InterruptedTimeSeries, SyntheticControl, }
[docs] def __init__( self, n_folds: int = 3, selection_method: Literal["sequential", "random"] = "sequential", min_training_pct: float = 0.30, min_gap: int = 1, allow_overlap: bool = False, exclude_periods: set[str] | None = None, experiment_factory: Any | None = None, sample_kwargs: dict[str, Any] | None = None, threshold: float = 0.95, prior_scale: float = 1.0, expected_effect_prior: Any | None = None, rope_half_width: float | None = None, n_design_replications: int | None = None, random_seed: int | None = None, ) -> None: if n_folds < 1: raise ValueError("n_folds must be >= 1") if selection_method not in ("sequential", "random"): raise ValueError( f"selection_method must be 'sequential' or 'random', " f"got {selection_method!r}" ) if not 0 < min_training_pct < 1: raise ValueError( f"min_training_pct must be in (0, 1), got {min_training_pct}" ) if min_gap < 1: raise ValueError(f"min_gap must be >= 1, got {min_gap}") if expected_effect_prior is not None and rope_half_width is None: raise ValueError( "rope_half_width is required when expected_effect_prior is " "provided. Specify the ROPE half-width that defines " "practical significance." ) self.n_folds = n_folds self.selection_method = selection_method self.min_training_pct = min_training_pct self.min_gap = min_gap self.allow_overlap = allow_overlap self.exclude_periods = exclude_periods self.experiment_factory = experiment_factory self.sample_kwargs = {**_DEFAULT_SAMPLE_KWARGS, **(sample_kwargs or {})} self.threshold = threshold self.prior_scale = prior_scale self.expected_effect_prior = expected_effect_prior self.rope_half_width = rope_half_width self.n_design_replications = n_design_replications self.random_seed = random_seed
[docs] def validate(self, experiment: BaseExperiment) -> None: """Check the experiment is compatible with PlaceboInTime. Parameters ---------- experiment : BaseExperiment Candidate experiment to validate. Raises ------ TypeError If the experiment lacks ``treatment_time`` or uses a non-PyMC model. """ if not hasattr(experiment, "treatment_time"): raise TypeError( f"{type(experiment).__name__} does not have a treatment_time " f"attribute. PlaceboInTime requires experiments with an " f"explicit treatment time." ) if not isinstance(experiment.model, PyMCModel): raise TypeError( f"PlaceboInTime requires a PyMC model for posterior " f"extraction, but got {type(experiment.model).__name__}. " f"Use a PyMC model (e.g. cp.pymc_models.LinearRegression)." )
def _get_factory(self, context: PipelineContext | None) -> Any: """Return a factory ``(data, treatment_time) -> experiment``.""" if self.experiment_factory is not None: return self.experiment_factory if context is None or context.experiment_config is None: raise RuntimeError( "No experiment_config in context and no experiment_factory " "provided. Use EstimateEffect before SensitivityAnalysis, " "or pass an explicit experiment_factory to PlaceboInTime." ) config = context.experiment_config method = config["method"] kwargs = {k: v for k, v in config.items() if k != "method"} def _factory(data: pd.DataFrame, treatment_time: Any) -> BaseExperiment: """Create a fresh experiment with the given treatment time.""" kw = dict(kwargs) kw["treatment_time"] = treatment_time if "model" in kw and kw["model"] is not None: kw["model"] = clone_model(kw["model"]) return method(data, **kw) return _factory def _compute_intervention_length(self, experiment: BaseExperiment) -> Any: """Compute intervention length from the experiment.""" treatment_time = experiment.treatment_time # type: ignore[attr-defined] data = experiment.data # type: ignore[attr-defined] treatment_end = getattr(experiment, "treatment_end_time", None) if treatment_end is not None: return treatment_end - treatment_time if hasattr(data, "index"): return data.index.max() - treatment_time raise ValueError("Cannot determine intervention length from experiment.") def _compute_fold_treatment_times( self, treatment_time: Any, intervention_length: Any ) -> list[Any]: """Compute pseudo-treatment times for each fold (sequential mode).""" return [ treatment_time - (self.n_folds - fold) * intervention_length for fold in range(self.n_folds) ] def _compute_random_fold_treatment_times( self, data: pd.DataFrame, treatment_time: Any, intervention_length: Any, ) -> list[Any]: """Randomly select pseudo-treatment times from the pre-period. The algorithm proceeds in two stages. 1. **Candidate pool.** Walks the sorted pre-intervention index and keeps each position that satisfies *all* of: * its period label is not in :attr:`exclude_periods`; * its position in the sorted pre-period index is at least ``ceil(min_training_pct * n_total)`` so each placebo fold has enough training data ahead of it; * its pseudo-intervention window ``[idx, idx + intervention_length)`` ends before the real ``treatment_time`` (so the placebo and real intervention cannot overlap in time). If the resulting pool has fewer than :attr:`n_folds` candidates the method raises :class:`ValueError` with the knobs to relax (``n_folds`` / ``min_training_pct`` / ``exclude_periods``). 2. **Greedy selection with retry.** Hands the pool to :meth:`_try_greedy_selection`, which picks :attr:`n_folds` indices one at a time subject to :attr:`min_gap` (positional distance in the candidate pool) and :attr:`allow_overlap` (non-overlap of the pseudo windows in time/index units). Greedy without backtracking can paint itself into a corner on tight constraints, so the method runs up to :data:`MAX_RANDOM_SELECTION_RETRIES` passes. When :attr:`random_seed` is set, each retry uses a deterministic sub-seed (``seed + attempt``) so the whole routine remains reproducible across runs. Parameters ---------- data : pd.DataFrame Full dataset (must have a sorted index). treatment_time : Any The actual treatment time. intervention_length : Any Length of the intervention window. Returns ------- list[Any] Sorted list of pseudo-treatment times. Raises ------ ValueError If not enough eligible candidates exist, or if no feasible selection is found after ``MAX_RANDOM_SELECTION_RETRIES`` greedy attempts. """ pre_data = data.loc[data.index < treatment_time] if pre_data.empty: raise ValueError("No observations before treatment_time.") all_indices = pre_data.index.sort_values() n_total = len(all_indices) min_training = int(np.ceil(self.min_training_pct * n_total)) exclude = self.exclude_periods or set() # Each candidate carries its position in ``all_indices`` so # ``min_gap`` can be enforced as an observation-count distance # between selected folds, not a candidate-list distance. candidates: list[tuple[int, Any]] = [] for pos, idx_val in enumerate(all_indices): if hasattr(idx_val, "strftime"): label = idx_val.strftime("%Y-%m") else: label = str(idx_val) if label in exclude: continue if pos < min_training: continue pseudo_end = idx_val + intervention_length if pseudo_end > treatment_time: continue candidates.append((pos, idx_val)) if len(candidates) < self.n_folds: raise ValueError( f"Only {len(candidates)} eligible candidate periods found, " f"but {self.n_folds} folds requested. Reduce n_folds, " f"lower min_training_pct, or relax exclude_periods." ) last_err: ValueError | None = None for attempt in range(MAX_RANDOM_SELECTION_RETRIES): # Deterministic sub-seeds: successive attempts reshuffle # choices in a reproducible way when ``random_seed`` is set # and remain non-deterministic (as expected) when it isn't. sub_seed: int | None if self.random_seed is None: sub_seed = None else: sub_seed = int(self.random_seed) + attempt rng = np.random.default_rng(sub_seed) try: selected = self._try_greedy_selection( candidates, intervention_length, rng ) return sorted(candidates[i][1] for i in selected) except ValueError as err: last_err = err continue raise ValueError( f"Cannot select {self.n_folds} folds with min_gap=" f"{self.min_gap} and allow_overlap={self.allow_overlap} " f"after {MAX_RANDOM_SELECTION_RETRIES} greedy attempts with " f"deterministic sub-seeds. Relax constraints " f"(smaller min_gap, set allow_overlap=True, or reduce " f"n_folds). Last underlying error: {last_err}" ) def _try_greedy_selection( self, candidates: list[tuple[int, Any]], intervention_length: Any, rng: np.random.Generator, ) -> list[int]: """Single greedy pass over candidates; raises on infeasibility. Enforces two constraints between any pair of selected folds: * ``min_gap`` positional distance in the candidate index. * When ``allow_overlap`` is ``False``, non-overlap of the pseudo-intervention windows, expressed in the same units as ``intervention_length``. Two windows ``[t_a, t_a + L)`` and ``[t_b, t_b + L)`` are non-overlapping iff ``abs(t_a - t_b) >= L``. """ pool = list(range(len(candidates))) selected: list[int] = [] for _ in range(self.n_folds): valid: list[int] = [] for i in pool: pos_i, idx_val_i = candidates[i] ok = True for s in selected: pos_s, idx_val_s = candidates[s] if abs(pos_i - pos_s) < self.min_gap: ok = False break if not self.allow_overlap and self._windows_overlap( idx_val_i, idx_val_s, intervention_length ): ok = False break if ok: valid.append(i) if not valid: raise ValueError( "No candidate remaining satisfies min_gap and " "non-overlap constraints; greedy selection stuck." ) pick = int(rng.choice(valid)) selected.append(pick) pool.remove(pick) return selected @staticmethod def _windows_overlap(idx_a: Any, idx_b: Any, intervention_length: Any) -> bool: """Return ``True`` iff two half-open intervention windows share a point. Each window starting at index ``idx`` is treated as the half-open interval ``[idx, idx + intervention_length)`` (start inclusive, end exclusive). Under that convention, the windows ``[a, a + L)`` and ``[b, b + L)`` overlap iff ``abs(a - b) < L`` -- two back-to-back windows at distance exactly ``L`` are considered non-overlapping. The ``idx + intervention_length`` arithmetic, rather than a direct Timedelta computation, lets the same expression handle numeric indices and datetime indices with ``pd.DateOffset`` uniformly. """ earlier, later = (idx_a, idx_b) if idx_a <= idx_b else (idx_b, idx_a) return later < earlier + intervention_length def _get_fold_data( self, data: pd.DataFrame, pseudo_treatment_time: Any, intervention_length: Any, ) -> pd.DataFrame: """Extract data up to the end of the placebo intervention window.""" pseudo_end = pseudo_treatment_time + intervention_length return data.loc[data.index < pseudo_end].copy() @staticmethod def _extract_cumulative_impact(experiment: BaseExperiment) -> xr.DataArray: """Extract posterior cumulative impact from a fitted experiment. Returns an ``xr.DataArray`` with a single ``sample`` dimension obtained by summing over ``obs_ind`` and stacking ``(chain, draw)``. """ post_impact = experiment.post_impact # type: ignore[attr-defined] if "treated_units" in post_impact.dims: post_impact = post_impact.isel(treated_units=0) cumulative = post_impact.sum("obs_ind") return cumulative.stack(sample=("chain", "draw")) def _build_status_quo_model( self, fold_means: np.ndarray, fold_sds: np.ndarray, ) -> tuple[Any, np.ndarray]: """Fit the hierarchical status-quo model and return theta_new. Parameters ---------- fold_means : np.ndarray Per-fold posterior means of cumulative impact. fold_sds : np.ndarray Per-fold posterior SDs of cumulative impact. Returns ------- tuple[InferenceData, np.ndarray] ``(idata, theta_new_samples)`` where ``theta_new_samples`` are draws from the posterior predictive for a new null period. """ n_folds = len(fold_means) fold_sds = np.where(fold_sds < 1e-6, 1e-6, fold_sds) prior_mu_center = float(np.nanmean(fold_means)) prior_mu_scale = float(np.nanstd(fold_means)) if prior_mu_scale <= 0.0: prior_mu_scale = 1.0 scale = self.prior_scale coords = {"fold": np.arange(n_folds)} with pm.Model(coords=coords) as model: observed_fold_means = pm.Data( "observed_fold_means", fold_means, dims="fold" ) observed_fold_sd = pm.Data("observed_fold_sd", fold_sds, dims="fold") mu_status_quo = pm.Normal( "mu_status_quo", mu=prior_mu_center, sigma=5.0 * scale * prior_mu_scale, ) tau_status_quo = pm.HalfNormal( "tau_status_quo", sigma=2.0 * scale * prior_mu_scale, ) fold_z = pm.Normal("fold_z", mu=0.0, sigma=1.0, dims="fold") fold_true_effect = pm.Deterministic( "fold_true_effect", mu_status_quo + tau_status_quo * fold_z, dims="fold", ) pm.Normal( "likelihood_fold_means", mu=fold_true_effect, sigma=observed_fold_sd, observed=observed_fold_means, dims="fold", ) idata = pm.sample(**self.sample_kwargs) with model: model.add_coords({"new_period": np.arange(1)}) pm.Normal( "theta_new", mu=mu_status_quo, sigma=tau_status_quo, dims="new_period", ) pp = pm.sample_posterior_predictive(idata, var_names=["theta_new"]) theta_new_samples = ( pp["posterior_predictive"]["theta_new"] .stack(sample=("chain", "draw")) .values.squeeze() ) return idata, theta_new_samples
[docs] @staticmethod def bayesian_rope_decision( posterior_samples: np.ndarray, rope_half_width: float, threshold: float, ) -> str: """Apply a ROPE-based Bayesian decision rule. Parameters ---------- posterior_samples : np.ndarray Posterior draws of the total effect. rope_half_width : float Half-width of the ROPE interval ``[-rope, +rope]``. threshold : float Minimum posterior probability required to make a decision. Returns ------- str One of ``"positive"``, ``"null"``, or ``"indeterminate"``. """ samples = np.asarray(posterior_samples).ravel() prob_positive = float((samples > rope_half_width).mean()) prob_null = float((np.abs(samples) <= rope_half_width).mean()) if prob_positive >= threshold: return "positive" elif prob_null >= threshold: return "null" else: return "indeterminate"
def _draw_expected_effect_samples(self, n: int) -> np.ndarray: """Draw samples from the expected-effect prior. Parameters ---------- n : int Desired number of samples. Objects with an ``.rvs(n)`` method receive ``n`` directly. Pre-drawn numpy arrays are returned as-is, and :meth:`_compute_assurance` cycles through them via ``i % len(prior)`` when the array is shorter than the number of replications. A warning is emitted in this case because short arrays can introduce spurious structure in the simulated decisions. Returns ------- np.ndarray Samples from the expected-effect prior. """ prior = self.expected_effect_prior if prior is None: raise ValueError("expected_effect_prior is not set.") if isinstance(prior, np.ndarray): if len(prior) < n: warnings.warn( f"expected_effect_prior has {len(prior)} samples, fewer " f"than the {n} replications requested by the assurance " f"simulation; the array will be cycled through via " f"index % len(prior). Pass a longer array or an object " f"with an .rvs(n) method (e.g. a PreliZ/scipy " f"distribution) to avoid cycling.", stacklevel=2, ) return prior if hasattr(prior, "rvs"): return np.asarray(prior.rvs(n)) # type: ignore[union-attr] raise TypeError( f"expected_effect_prior must be a numpy array or have an " f".rvs(n) method, got {type(prior).__name__}." ) def _compute_assurance( self, theta_new_samples: np.ndarray, fold_sds: np.ndarray, n_posterior_samples: int, ) -> AssuranceResult: """Simulate decisions under null and alternative to get assurance. Parameters ---------- theta_new_samples : np.ndarray Draws from the status-quo posterior predictive. fold_sds : np.ndarray Per-fold posterior SDs (used to simulate estimation noise). n_posterior_samples : int Number of posterior draws to simulate per replication. Returns ------- AssuranceResult """ expected_samples = self._draw_expected_effect_samples(len(theta_new_samples)) n_reps = self.n_design_replications if n_reps is None: n_reps = min(len(theta_new_samples), len(expected_samples)) rng = np.random.default_rng(self.random_seed) rope = self.rope_half_width if rope is None: raise ValueError( "rope_half_width must be set for assurance." ) # pragma: no cover null_decisions: list[str] = [] for i in range(n_reps): true_effect = float(theta_new_samples[i % len(theta_new_samples)]) sigma = float(rng.choice(fold_sds)) simulated_posterior = rng.normal( loc=true_effect, scale=sigma, size=n_posterior_samples ) null_decisions.append( self.bayesian_rope_decision(simulated_posterior, rope, self.threshold) ) alt_decisions: list[str] = [] for i in range(n_reps): # Under the alternative, the observed effect is the expected # treatment effect added on top of the null baseline noise, # matching the paper's formulation: theta_new + expected_effect. null_component = float(theta_new_samples[i % len(theta_new_samples)]) treatment_component = float(expected_samples[i % len(expected_samples)]) true_effect = null_component + treatment_component sigma = float(rng.choice(fold_sds)) simulated_posterior = rng.normal( loc=true_effect, scale=sigma, size=n_posterior_samples ) alt_decisions.append( self.bayesian_rope_decision(simulated_posterior, rope, self.threshold) ) null_arr = np.array(null_decisions) alt_arr = np.array(alt_decisions) return AssuranceResult( true_positive_rate=float((alt_arr == "positive").mean()), false_positive_rate=float((null_arr == "positive").mean()), true_negative_rate=float((null_arr == "null").mean()), false_negative_rate=float((alt_arr == "null").mean()), null_indeterminate_rate=float((null_arr == "indeterminate").mean()), alt_indeterminate_rate=float((alt_arr == "indeterminate").mean()), null_decisions=null_arr, alt_decisions=alt_arr, )
[docs] def run( self, experiment: BaseExperiment, context: PipelineContext | None = None, ) -> CheckResult: """Run placebo-in-time analysis with hierarchical null model. Creates ``n_folds`` placebo experiments by shifting the treatment time backward. Extracts posterior cumulative impact from each fold, then fits a hierarchical Bayesian model to characterise the status-quo distribution. Compares the actual intervention effect against this null. When ``expected_effect_prior`` was provided at construction, also runs Bayesian assurance simulation. Can be used standalone (``context=None``) when ``experiment_factory`` was provided, or within a pipeline. Parameters ---------- experiment : BaseExperiment The fitted experiment whose treatment time will be shifted to generate placebo folds. context : PipelineContext or None, default None Pipeline context providing ``experiment_config`` for re-fits. If ``None``, an explicit ``experiment_factory`` must have been supplied at construction time. Returns ------- CheckResult With ``passed`` indicating whether the actual effect is clearly outside the null distribution, and rich metadata including the null samples and optional assurance results. """ self.validate(experiment) factory = self._get_factory(context) treatment_time = experiment.treatment_time # type: ignore[attr-defined] data = experiment.data # type: ignore[attr-defined] intervention_length = self._compute_intervention_length(experiment) actual_cumulative = self._extract_cumulative_impact(experiment) actual_cumulative_mean = float(actual_cumulative.mean().values) if self.selection_method == "random": fold_treatment_times = self._compute_random_fold_treatment_times( data, treatment_time, intervention_length ) else: fold_treatment_times = self._compute_fold_treatment_times( treatment_time, intervention_length ) fold_results: list[PlaceboFoldResult] = [] fold_summaries: list[str] = [] skipped_folds: list[int] = [] for fold_idx, pseudo_tt in enumerate(fold_treatment_times): fold_num = fold_idx + 1 logger.info( "PlaceboInTime fold %d/%d: pseudo_treatment_time=%s", fold_num, self.n_folds, pseudo_tt, ) fold_data = self._get_fold_data(data, pseudo_tt, intervention_length) if len(fold_data) < MIN_FOLD_OBSERVATIONS: logger.warning( "Fold %d has only %d observations (minimum %d), skipping.", fold_num, len(fold_data), MIN_FOLD_OBSERVATIONS, ) skipped_folds.append(fold_num) fold_summaries.append( f"Fold {fold_num}: SKIPPED (only {len(fold_data)} " f"observations, need >= {MIN_FOLD_OBSERVATIONS})" ) continue try: fold_experiment = factory(fold_data, pseudo_tt) cum_samples = self._extract_cumulative_impact(fold_experiment) f_mean = float(cum_samples.mean().values) f_sd = float(cum_samples.std().values) except Exception: logger.warning( "Fold %d failed to fit (pseudo_treatment_time=%s), skipping.", fold_num, pseudo_tt, exc_info=True, ) skipped_folds.append(fold_num) fold_summaries.append( f"Fold {fold_num}: SKIPPED (experiment failed to fit " f"at pseudo treatment time {pseudo_tt})" ) continue fold_result = PlaceboFoldResult( fold=fold_num, pseudo_treatment_time=pseudo_tt, experiment=fold_experiment, cumulative_impact_samples=cum_samples, fold_mean=f_mean, fold_sd=f_sd, ) fold_results.append(fold_result) fold_summaries.append( f"Fold {fold_num}: pseudo treatment at {pseudo_tt} " f"— mean={f_mean:.2f}, sd={f_sd:.2f}" ) n_completed = len(fold_results) n_skipped = len(skipped_folds) if n_completed < 1: parts = [ f"Placebo-in-time analysis: 0 folds completed ({n_skipped} skipped).", "INCONCLUSIVE — no folds completed.", ] parts.extend(fold_summaries) return CheckResult( check_name="PlaceboInTime", passed=None, text="\n".join(parts), metadata={ "fold_results": fold_results, "rope_half_width": self.rope_half_width, "threshold": self.threshold, "expected_effect_prior": self.expected_effect_prior, }, ) fold_means = np.array([fr.fold_mean for fr in fold_results]) fold_sds = np.array([fr.fold_sd for fr in fold_results]) idata, theta_new_samples = self._build_status_quo_model(fold_means, fold_sds) p_outside = float( (np.abs(actual_cumulative_mean) > np.abs(theta_new_samples)).mean() ) passed = p_outside > self.threshold mu_post_mean = float(idata.posterior["mu_status_quo"].mean().values) tau_post_mean = float(idata.posterior["tau_status_quo"].mean().values) parts = [ f"Placebo-in-time analysis: {n_completed} of {self.n_folds} folds completed" ] if n_skipped: parts[0] += f" ({n_skipped} skipped)" parts[0] += "." parts.append( f"Hierarchical status-quo model: " f"mu={mu_post_mean:.2f}, tau={tau_post_mean:.2f}." ) parts.append( f"Actual cumulative impact: {actual_cumulative_mean:.2f}. " f"P(actual outside null) = {p_outside:.3f}." ) if passed: parts.append("SUPPORTED — actual effect is outside the null distribution.") else: parts.append( "NOT SUPPORTED — actual effect is within the null distribution." ) parts.extend(fold_summaries) text = "\n".join(parts) metadata: dict[str, Any] = { "fold_results": fold_results, "fold_sds": fold_sds, "status_quo_idata": idata, "null_samples": theta_new_samples, "actual_cumulative_mean": actual_cumulative_mean, "p_effect_outside_null": p_outside, "rope_half_width": self.rope_half_width, "threshold": self.threshold, "expected_effect_prior": self.expected_effect_prior, } n_posterior_samples = len(actual_cumulative.values) if self.expected_effect_prior is not None: assurance_result = self._compute_assurance( theta_new_samples, fold_sds, n_posterior_samples ) metadata["assurance_result"] = assurance_result metadata["assurance"] = assurance_result.true_positive_rate text += ( f"\n\nBayesian assurance (operating characteristics):\n" f" Under NULL (status quo true):\n" f" False Positive rate : " f"{assurance_result.false_positive_rate:.3f}\n" f" True Negative rate : " f"{assurance_result.true_negative_rate:.3f}\n" f" Indeterminate rate : " f"{assurance_result.null_indeterminate_rate:.3f}\n" f" Under ALTERNATIVE (expected effect true):\n" f" Assurance (TP rate) : " f"{assurance_result.true_positive_rate:.3f}\n" f" False Negative rate : " f"{assurance_result.false_negative_rate:.3f}\n" f" Indeterminate rate : " f"{assurance_result.alt_indeterminate_rate:.3f}" ) return CheckResult( check_name="PlaceboInTime", passed=passed, text=text, metadata=metadata, )
def __repr__(self) -> str: """Return a string representation of the check.""" parts = [f"n_folds={self.n_folds}"] if self.selection_method != "sequential": parts.append(f"selection_method={self.selection_method!r}") if self.allow_overlap: parts.append("allow_overlap=True") if self.expected_effect_prior is not None: parts.append("assurance=True") return f"PlaceboInTime({', '.join(parts)})"