Source code for skein_glm.graph_stability

"""Bootstrap-based edge stability for graphical models.

Two pure-Python wrappers around the M11 graphical estimators
(``GraphicalLasso``/``MCP``/``SCAD`` and ``JointGraphicalLasso``/
``MCP``):

* :class:`GraphicalStabilitySelection` — Meinshausen–Bühlmann (2010)
  stability selection lifted to edges. Subsample-bootstrap over a
  user-supplied λ-grid; record per-(λ, edge) selection probability;
  declare an edge **stable** if its max-over-λ probability crosses a
  threshold. Mirrors :class:`StabilitySelection` for coefficients.

* :class:`GraphicalBootstrap` — non-parametric (resample-with-
  replacement) bootstrap at a *single* λ. Returns the per-edge
  sampling distribution: mean, SD, quantile CIs, selection
  probability. This is what ``bootnet::bootnet(type="nonparametric")``
  in R returns and the headline output network-psychometrics
  practitioners use to draw edge-weight error bars.

Both wrappers work for single-population (single ``X``) and joint
(list of ``X^(k)`` across populations) graphical estimators; they
auto-detect by inspecting the base estimator for a ``lambda_2``
attribute (joint) vs. ``alpha`` (single).

Bootstrap requires *raw* data, not precomputed covariance: we have
no way to resample observations from a `(p, p)` covariance input.
Passing a square symmetric `X` raises ``ValueError``.

Reference
---------
van Borkulo et al. (2017). "Network analysis of multivariate data
in psychological science". *Nature Reviews Methods Primers* 2:60.
Implements the methodology behind R's `bootnet` package.
"""
from __future__ import annotations

from typing import Any

import numpy as np
from numpy.typing import NDArray
from sklearn.base import BaseEstimator, clone
from sklearn.utils import check_random_state


def _is_joint(est: Any) -> bool:
    """Joint estimators expose a ``lambda_2`` init param; single-pop
    expose ``alpha``. The two surfaces don't overlap."""
    return hasattr(est, "lambda_2")


def _looks_like_covariance(x: NDArray[np.float64]) -> bool:
    return x.ndim == 2 and x.shape[0] == x.shape[1] and bool(
        np.allclose(x, x.T, atol=1e-8)
    )


def _validate_raw_X(x: NDArray[np.float64], label: str) -> NDArray[np.float64]:
    xa = np.ascontiguousarray(x, dtype=np.float64)
    if xa.ndim != 2:
        raise ValueError(f"{label} must be 2D, got shape {xa.shape}")
    if _looks_like_covariance(xa):
        raise ValueError(
            f"{label} looks like a precomputed covariance matrix "
            f"(square + symmetric, shape {xa.shape}); bootstrap "
            "requires raw observations. Pass the original (n, p) "
            "data array instead."
        )
    return xa


def _edge_mask(precision: NDArray[np.float64], eps: float) -> NDArray[np.bool_]:
    """Symmetric `(p, p)` boolean — `True` where `|Θ_ij| > eps`, off-
    diagonal only."""
    m = np.abs(precision) > eps
    np.fill_diagonal(m, False)
    return m


def _stable_pairs(max_prob: NDArray[np.float64], threshold: float) -> NDArray[np.int64]:
    """Convert a symmetric `(p, p)` probability matrix into an `(n, 2)`
    array of upper-triangular `(i, j)` pairs above ``threshold``."""
    p = max_prob.shape[0]
    iu_i, iu_j = np.triu_indices(p, k=1)
    sel = max_prob[iu_i, iu_j] >= threshold
    return np.stack([iu_i[sel], iu_j[sel]], axis=1).astype(np.int64)


# --- shared bootstrap workers -----------------------------------------


def _fit_stability_one(
    base_estimator: Any,
    X_or_Xs: Any,
    indices: Any,
    lambdas: NDArray[np.float64],
    alpha_name: str,
    is_joint: bool,
    active_eps: float,
) -> NDArray[np.bool_]:
    """One stability-selection bootstrap pass. Returns the boolean edge
    mask stack across λ:

    - single: `(n_lambdas, p, p)`
    - joint: `(n_lambdas, K, p, p)`
    """
    X_sub: Any
    if is_joint:
        X_sub = [
            np.ascontiguousarray(np.asarray(x)[idx], dtype=np.float64)
            for x, idx in zip(X_or_Xs, indices)
        ]
    else:
        X_sub = np.ascontiguousarray(np.asarray(X_or_Xs)[indices], dtype=np.float64)

    masks = []
    for lam in lambdas:
        est = clone(base_estimator).set_params(**{alpha_name: float(lam)})
        est.fit(X_sub)
        if is_joint:
            masks.append(
                np.stack([_edge_mask(t, active_eps) for t in est.precisions_], axis=0)
            )
        else:
            masks.append(_edge_mask(est.precision_, active_eps))
    return np.stack(masks, axis=0)


def _fit_bootstrap_one(
    base_estimator: Any,
    X_or_Xs: Any,
    indices: Any,
    is_joint: bool,
) -> NDArray[np.float64]:
    """One non-parametric bootstrap pass. Fits the estimator on the
    resampled data and returns the precision matrix (single) or a stack
    of precisions (joint)."""
    X_sub: Any
    if is_joint:
        X_sub = [
            np.ascontiguousarray(np.asarray(x)[idx], dtype=np.float64)
            for x, idx in zip(X_or_Xs, indices)
        ]
    else:
        X_sub = np.ascontiguousarray(np.asarray(X_or_Xs)[indices], dtype=np.float64)
    est = clone(base_estimator).fit(X_sub)
    if is_joint:
        return np.stack(est.precisions_, axis=0)
    return np.asarray(est.precision_, dtype=np.float64)


# --- stability selection ---------------------------------------------


[docs] class GraphicalStabilitySelection(BaseEstimator): """Meinshausen–Bühlmann stability selection for graphical models. Wraps a graphical estimator (single or joint) and runs subsample-bootstrap over a user-supplied λ-grid. Each bootstrap iteration draws a fraction of rows without replacement, refits at every λ in the grid, and records which off-diagonal entries of `Θ̂` are nonzero. The fraction of bootstraps in which edge ``(i, j)`` was selected at λ_k is its **selection probability**; an edge is **stable** if its max-over-λ probability crosses ``threshold``. The MB (2010) error-control bound on expected false positives applies to ``threshold > 0.5``. Parameters ---------- base_estimator Any single-population (``GraphicalLasso``/``MCP``/``SCAD``) or joint (``JointGraphicalLasso``/``MCP``) graphical estimator. The init params (``gamma``/``a``/``edge_weights``/...) are passed through to every refit; only the regularisation parameter (``alpha`` or ``lambda_2``) is overridden per-λ. lambdas : array-like (n_lambdas,) Positive λ values to sweep. The order is preserved in the output but does not affect the result (each (bootstrap, λ) fit is independent). n_bootstraps : int, default 100 sample_fraction : float, default 0.5 Subsample fraction (without replacement). MB recommend 0.5. threshold : float, default 0.6 Stable-edge probability cutoff; must be > 0.5 for the MB error-control bound. random_state : int or None n_jobs : int or None Forwarded to ``joblib.Parallel``. ``-1`` uses all cores. active_eps : float, default 1e-6 Threshold for declaring `|Θ_ij|` nonzero. Looser than the sklearn default to absorb mild solver drift across subsample fits — graphical block-CD/ADMM converge to lower tolerance than coefficient solvers in practice. Attributes ---------- selection_probabilities_ : ndarray Single-pop: shape `(n_lambdas, p, p)`. Joint: shape `(n_lambdas, K, p, p)`. Symmetric in the last two axes, zero on the diagonal. max_probabilities_ : ndarray Single-pop: shape `(p, p)`. Joint: shape `(K, p, p)`. Max over λ of the per-edge selection probability. stable_edges_ : ndarray of int64 Single-pop: shape `(n_stable, 2)` of `(i, j)` pairs (upper triangular). Joint: list of `K` such arrays, one per population. lambdas_ : ndarray n_features_in_ : int n_populations_ : int (joint only) Notes ----- Bootstrap requires raw observation data. Passing a precomputed covariance matrix is rejected with a clear error message. Examples -------- >>> from skein_glm import ( ... GraphicalMCP, GraphicalStabilitySelection, ... ) >>> import numpy as np >>> ss = GraphicalStabilitySelection( ... base_estimator=GraphicalMCP(gamma=3.0), ... lambdas=np.geomspace(0.5, 0.05, 8), ... n_bootstraps=100, threshold=0.6, n_jobs=-1, random_state=0, ... ) >>> ss.fit(X) >>> ss.stable_edges_ # shape (n_stable, 2) """ selection_probabilities_: NDArray[np.float64] max_probabilities_: NDArray[np.float64] stable_edges_: Any # ndarray for single, list[ndarray] for joint lambdas_: NDArray[np.float64] n_features_in_: int def __init__( self, base_estimator: Any, lambdas: NDArray[np.float64], *, n_bootstraps: int = 100, sample_fraction: float = 0.5, threshold: float = 0.6, random_state: int | None = None, n_jobs: int | None = None, active_eps: float = 1e-6, ) -> None: self.base_estimator = base_estimator self.lambdas = lambdas self.n_bootstraps = n_bootstraps self.sample_fraction = sample_fraction self.threshold = threshold self.random_state = random_state self.n_jobs = n_jobs self.active_eps = active_eps def _validate_params(self) -> None: if not 0.5 < self.threshold <= 1.0: raise ValueError( f"threshold must be in (0.5, 1]; got {self.threshold} " "(MB error-control requires > 0.5)" ) if not 0.0 < self.sample_fraction < 1.0: raise ValueError( f"sample_fraction must be in (0, 1); got {self.sample_fraction}" ) if self.n_bootstraps < 1: raise ValueError(f"n_bootstraps must be ≥ 1; got {self.n_bootstraps}") lam = np.asarray(self.lambdas, dtype=np.float64) if lam.ndim != 1 or lam.size == 0: raise ValueError("lambdas must be a non-empty 1D array") if np.any(lam <= 0): raise ValueError("lambdas must be positive") def fit(self, X) -> "GraphicalStabilitySelection": from joblib import Parallel, delayed self._validate_params() is_joint = _is_joint(self.base_estimator) alpha_name = "lambda_2" if is_joint else "alpha" lambdas = np.ascontiguousarray(self.lambdas, dtype=np.float64) rng = check_random_state(self.random_state) if is_joint: if not isinstance(X, (list, tuple)) or len(X) == 0: raise ValueError( "joint estimator: X must be a non-empty list/tuple " "of per-population arrays" ) Xs = [_validate_raw_X(x, f"X[{k}]") for k, x in enumerate(X)] ns = [x.shape[0] for x in Xs] p = Xs[0].shape[1] for k, x in enumerate(Xs): if x.shape[1] != p: raise ValueError( f"all populations must share n_features; " f"X[0] has {p}, X[{k}] has {x.shape[1]}" ) sub_sizes = [max(1, int(round(self.sample_fraction * n))) for n in ns] boot_indices: list[Any] = [ tuple( np.sort( rng.choice(ns[k], size=sub_sizes[k], replace=False) ).astype(np.int64) for k in range(len(Xs)) ) for _ in range(self.n_bootstraps) ] X_or_Xs: Any = Xs self.n_populations_ = len(Xs) else: X_arr = _validate_raw_X(X, "X") n, p = X_arr.shape sub_size = max(1, int(round(self.sample_fraction * n))) boot_indices = [ np.sort( rng.choice(n, size=sub_size, replace=False) ).astype(np.int64) for _ in range(self.n_bootstraps) ] X_or_Xs = X_arr self.n_features_in_ = p masks = Parallel(n_jobs=self.n_jobs)( delayed(_fit_stability_one)( self.base_estimator, X_or_Xs, idx, lambdas, alpha_name, is_joint, self.active_eps, ) for idx in boot_indices ) stacked = np.stack(masks, axis=0) # (B, n_lambdas, [K,] p, p) sel_prob = stacked.mean(axis=0) self.selection_probabilities_ = sel_prob self.lambdas_ = lambdas if is_joint: # sel_prob shape: (n_lambdas, K, p, p) self.max_probabilities_ = sel_prob.max(axis=0) # (K, p, p) self.stable_edges_ = [ _stable_pairs(self.max_probabilities_[k], self.threshold) for k in range(self.n_populations_) ] else: self.max_probabilities_ = sel_prob.max(axis=0) # (p, p) self.stable_edges_ = _stable_pairs( self.max_probabilities_, self.threshold ) return self
[docs] def mb_threshold(self, expected_false_positives: float = 1.0) -> float: """Required ``π_thr`` to control ``E[V] ≤ expected_false_positives``. Applies the Meinshausen–Bühlmann (2010) closed-form bound to this fitted stability-selection result. Uses ``p · (p − 1)/2`` as the family size (number of unique edges) and the average number of selected edges per (λ, bootstrap) cell as ``q_Λ``. Returns a stability-probability threshold in ``(0.5, 1]``. If the requested error bound is infeasible at the observed ``q_Λ``, raises :class:`ValueError` with a diagnostic message. See :func:`skein_glm.mb_stability_threshold` for the formula and reference. """ from skein_glm.graph_inference import mb_stability_threshold sel_prob = self.selection_probabilities_ p = self.n_features_in_ p_total = p * (p - 1) // 2 # Average number of edges selected across (λ, [k]) cells. # Each cell is an upper-tri count of edges with prob > 0 — but # since `selection_probabilities_` is *probabilities* (already # averaged across bootstraps), the natural MB `q_Λ` is the sum # of per-edge selection probabilities, averaged across λ (and # populations, for joint). if sel_prob.ndim == 4: # (n_lambdas, K, p, p) iu = np.triu_indices(p, k=1) per_lambda_per_pop = sel_prob[:, :, iu[0], iu[1]].sum(axis=-1) q_lambda = float(per_lambda_per_pop.mean()) else: # (n_lambdas, p, p) iu = np.triu_indices(p, k=1) per_lambda = sel_prob[:, iu[0], iu[1]].sum(axis=-1) q_lambda = float(per_lambda.mean()) return mb_stability_threshold( p_total=p_total, q_lambda=q_lambda, expected_false_positives=expected_false_positives, )
# --- non-parametric bootstrap ----------------------------------------
[docs] class GraphicalBootstrap(BaseEstimator): """Non-parametric bootstrap for graphical edge weights. Resamples observations with replacement (same `n`), refits the graphical estimator at the given regularisation parameter, and records the bootstrap sampling distribution of every edge in `Θ̂`. Output statistics: * **`mean_`, `std_`** — per-edge bootstrap mean and standard deviation. The SD doubles as a Wald-style standard error. * **`ci_lower_`, `ci_upper_`** — per-edge quantile confidence bands at `alpha/2` and `1 − alpha/2`. The default ``alpha=0.05`` gives the 2.5% / 97.5% percentile envelope typically plotted on bootnet stability diagrams. * **`edge_selection_probabilities_`** — fraction of bootstrap replicates in which the edge was active. This is what ``bootnet`` calls the "edge stability" output (distinct from the MB-style :class:`GraphicalStabilitySelection`, which sweeps λ instead of resampling at a fixed λ). Parameters ---------- base_estimator A fully-configured graphical estimator. Its regularisation parameter (``alpha`` for single, ``lambda_2`` for joint) is used as-is at every bootstrap replicate; this class does not sweep λ. n_bootstraps : int, default 200 Number of bootstrap replicates. alpha : float, default 0.05 CI level — bands are `[alpha/2, 1 - alpha/2]` quantiles. random_state : int or None n_jobs : int or None active_eps : float, default 1e-6 Attributes ---------- precisions_ : ndarray Single-pop: shape `(B, p, p)`. Joint: shape `(B, K, p, p)`. Full bootstrap stack of fitted precisions. mean_, std_, ci_lower_, ci_upper_ : ndarray Single-pop: shape `(p, p)`. Joint: shape `(K, p, p)`. edge_selection_probabilities_ : ndarray Same shape as ``mean_``. Off-diagonal selection probability; diagonal is zero. n_features_in_ : int n_populations_ : int (joint only) """ precisions_: NDArray[np.float64] mean_: NDArray[np.float64] std_: NDArray[np.float64] ci_lower_: NDArray[np.float64] ci_upper_: NDArray[np.float64] edge_selection_probabilities_: NDArray[np.float64] n_features_in_: int def __init__( self, base_estimator: Any, *, n_bootstraps: int = 200, alpha: float = 0.05, random_state: int | None = None, n_jobs: int | None = None, active_eps: float = 1e-6, ) -> None: self.base_estimator = base_estimator self.n_bootstraps = n_bootstraps self.alpha = alpha self.random_state = random_state self.n_jobs = n_jobs self.active_eps = active_eps def _validate_params(self) -> None: if not 0.0 < self.alpha < 1.0: raise ValueError(f"alpha must be in (0, 1); got {self.alpha}") if self.n_bootstraps < 2: raise ValueError( f"n_bootstraps must be ≥ 2 for a non-degenerate CI; " f"got {self.n_bootstraps}" ) def fit(self, X) -> "GraphicalBootstrap": from joblib import Parallel, delayed self._validate_params() is_joint = _is_joint(self.base_estimator) rng = check_random_state(self.random_state) if is_joint: if not isinstance(X, (list, tuple)) or len(X) == 0: raise ValueError( "joint estimator: X must be a non-empty list/tuple " "of per-population arrays" ) Xs = [_validate_raw_X(x, f"X[{k}]") for k, x in enumerate(X)] ns = [x.shape[0] for x in Xs] p = Xs[0].shape[1] for k, x in enumerate(Xs): if x.shape[1] != p: raise ValueError( f"all populations must share n_features; " f"X[0] has {p}, X[{k}] has {x.shape[1]}" ) boot_indices: list[Any] = [ tuple( rng.choice(ns[k], size=ns[k], replace=True).astype(np.int64) for k in range(len(Xs)) ) for _ in range(self.n_bootstraps) ] X_or_Xs: Any = Xs self.n_populations_ = len(Xs) else: X_arr = _validate_raw_X(X, "X") n, p = X_arr.shape boot_indices = [ rng.choice(n, size=n, replace=True).astype(np.int64) for _ in range(self.n_bootstraps) ] X_or_Xs = X_arr self.n_features_in_ = p precs = Parallel(n_jobs=self.n_jobs)( delayed(_fit_bootstrap_one)( self.base_estimator, X_or_Xs, idx, is_joint ) for idx in boot_indices ) stacked = np.stack(precs, axis=0) # (B, [K,] p, p) self.precisions_ = stacked self.mean_ = stacked.mean(axis=0) self.std_ = stacked.std(axis=0, ddof=1) q_lo = self.alpha / 2.0 q_hi = 1.0 - q_lo self.ci_lower_ = np.quantile(stacked, q_lo, axis=0) self.ci_upper_ = np.quantile(stacked, q_hi, axis=0) # Edge selection probability — off-diagonal only, diagonal zeroed. active = np.abs(stacked) > self.active_eps sel = active.mean(axis=0) if is_joint: for k in range(sel.shape[0]): np.fill_diagonal(sel[k], 0.0) else: np.fill_diagonal(sel, 0.0) self.edge_selection_probabilities_ = sel return self
[docs] def fdr_threshold(self, fdr: float = 0.1) -> dict: """Benjamini–Hochberg FDR on edges from this bootstrap fit. Convenience wrapper around :func:`skein_glm.edge_fdr_threshold`. See that function for the returned dict shape. Parameters ---------- fdr : float, default 0.1 Target false discovery rate in ``(0, 1)``. """ from skein_glm.graph_inference import edge_fdr_threshold return edge_fdr_threshold(self, fdr=fdr)
[docs] def fwer_threshold( self, fwer: float = 0.05, method: str = "holm" ) -> dict: """Family-wise error control on edges from this bootstrap fit. Convenience wrapper around :func:`skein_glm.edge_fwer_threshold`. Parameters ---------- fwer : float, default 0.05 Target family-wise error rate in ``(0, 1)``. method : {"bonferroni", "holm"}, default "holm" Multiple-testing correction method. """ from skein_glm.graph_inference import edge_fwer_threshold return edge_fwer_threshold(self, fwer=fwer, method=method) # type: ignore[arg-type]
__all__ = ["GraphicalStabilitySelection", "GraphicalBootstrap"]