# Source code for cycombinepy.normalize

"""Batch-wise normalization of expression data prior to clustering.

Port of ``normalize`` / ``quantile_norm`` / ``clr_norm*`` from
``R/02_batch_correct.R``.
"""

from __future__ import annotations

from typing import Iterable, Literal

import numpy as np
import pandas as pd
from anndata import AnnData
from scipy.interpolate import PchipInterpolator
from scipy.stats import rankdata

from cycombinepy._utils import (
    check_obs_key,
    marker_matrix,
    resolve_markers,
    set_marker_matrix,
)

# Per-batch normalization strategies accepted by :func:`normalize`
# (mirrors the `norm_method` argument of the R original).
NormMethod = Literal["scale", "rank", "CLR", "CLR_seu", "CLR_med", "qnorm", "none"]
# Tie-breaking rules forwarded to :func:`scipy.stats.rankdata`.
TiesMethod = Literal["average", "min", "max", "dense", "ordinal"]


def _clr(x: np.ndarray) -> np.ndarray:
    """Seurat-flavor CLR: geom_mean = expm1(sum(log1p(x[x>0])) / n)."""
    pos = x[x > 0]
    if pos.size == 0:
        return x
    geom_mean = np.expm1(np.sum(np.log1p(pos)) / x.size)
    if geom_mean == 0:
        return x
    return np.log1p(x / geom_mean)


def _clr_mean(x: np.ndarray) -> np.ndarray:
    """CLR with geom_mean = expm1(mean(log1p(x[x>=0])))."""
    nn = x[x >= 0]
    if nn.size == 0:
        return x
    geom_mean = np.expm1(np.mean(np.log1p(nn)))
    if geom_mean == 0:
        return x
    return np.log((x + 1.0) / geom_mean)


def _clr_med(x: np.ndarray) -> np.ndarray:
    """CLR using the median as the center."""
    m = np.nanmedian(x)
    if m == 0:
        m = 1.0
    return np.log((x + 1.0) / m)


def _rank_norm(x: np.ndarray, ties_method: TiesMethod = "average") -> np.ndarray:
    """Percentile rank within a 1-D vector."""
    return rankdata(x, method=ties_method) / x.size


def _scale(x: np.ndarray) -> np.ndarray:
    """Z-score with an ``ddof=1`` std to match R's ``scale``."""
    mu = np.mean(x)
    sd = np.std(x, ddof=1)
    if sd == 0 or not np.isfinite(sd):
        return x - mu
    return (x - mu) / sd


def _apply_column_wise(block: np.ndarray, fn) -> np.ndarray:
    out = np.empty_like(block, dtype=float)
    for j in range(block.shape[1]):
        col = block[:, j]
        if np.sum(col) == 0:
            out[:, j] = col
        else:
            out[:, j] = fn(col)
    return out


def _quantile_norm(
    X: np.ndarray,
    batches: np.ndarray,
    n_quantiles: int = 5,
) -> np.ndarray:
    """Monotone-spline quantile normalization per marker across batches.

    Mirrors ``quantile_norm`` in ``R/02_batch_correct.R:126``. For each marker we
    compute a reference set of quantiles across all cells, then map each batch's
    quantiles onto the reference using a PCHIP (monotone cubic) spline.
    """
    out = X.astype(float, copy=True)
    q_levels = np.linspace(0.0, 1.0, n_quantiles)
    for j in range(X.shape[1]):
        col = X[:, j]
        ref_q = np.quantile(col, q_levels)
        for b in np.unique(batches):
            mask = batches == b
            batch_q = np.quantile(col[mask], q_levels)
            # PCHIP requires strictly increasing x. Fall back to identity if degenerate.
            if np.any(np.diff(batch_q) <= 0):
                continue
            spline = PchipInterpolator(batch_q, ref_q, extrapolate=True)
            out[mask, j] = spline(col[mask])
    return out


def normalize(
    adata: AnnData,
    markers: Iterable[str] | None = None,
    method: NormMethod = "scale",
    batch_key: str = "batch",
    ties_method: TiesMethod = "average",
    layer: str | None = None,
    copy: bool = False,
) -> AnnData | None:
    """Batch-wise normalize marker columns of ``adata``.

    Port of ``normalize`` in ``R/02_batch_correct.R:27-111``. Each batch is
    processed independently, so that downstream clustering is less influenced
    by between-batch shifts.

    Parameters
    ----------
    adata
        AnnData containing expression in ``adata.X`` (or a layer).
    markers
        Var names to normalize. If ``None``, :func:`cycombinepy.get_markers`
        is used.
    method
        One of ``"scale"``, ``"rank"``, ``"CLR"``, ``"CLR_seu"``,
        ``"CLR_med"``, ``"qnorm"``, ``"none"``.
    batch_key
        Column in ``adata.obs`` identifying the batch.
    ties_method
        Tie-breaking rule for ``method="rank"``.
    layer
        If given, read / write that layer instead of ``adata.X``.
    copy
        If True, return a copy; otherwise modify in place and return None.
    """
    # "none" is a pass-through: no validation, no data access.
    if method == "none":
        return adata.copy() if copy else None

    check_obs_key(adata, batch_key)
    markers = resolve_markers(adata, markers)
    if copy:
        adata = adata.copy()

    X = marker_matrix(adata, markers, layer=layer)
    batches = np.asarray(adata.obs[batch_key].values)

    if method == "qnorm":
        # Quantile normalization handles batches internally.
        new_X = _quantile_norm(X, batches)
    else:
        if method == "scale":
            fn = _scale
        elif method == "rank":
            # Named def (not a lambda assignment) per PEP 8; closes over
            # ties_method.
            def fn(col: np.ndarray) -> np.ndarray:
                return _rank_norm(col, ties_method)
        elif method == "CLR":
            fn = _clr_mean
        elif method == "CLR_seu":
            fn = _clr
        elif method == "CLR_med":
            fn = _clr_med
        else:
            raise ValueError(f"Unknown normalization method: {method!r}")
        new_X = np.empty_like(X, dtype=float)
        # Preserve row order: apply per-batch, scatter results back.
        for b in pd.unique(batches):
            mask = batches == b
            new_X[mask] = _apply_column_wise(X[mask], fn)

    set_marker_matrix(adata, markers, new_X, layer=layer)
    return adata if copy else None