# Source code for cycombinepy.io
"""FCS I/O utilities.
Port of ``compile_fcs`` / ``convert_flowset`` / ``prepare_data`` from
``R/01_prepare_data.R``. Uses ``pytometry`` (``readfcs`` under the hood) to parse
FCS files into AnnData.
"""
from __future__ import annotations
import glob
import os
from pathlib import Path
from typing import Iterable, Literal
import anndata as ad
import numpy as np
import pandas as pd
from anndata import AnnData
from cycombinepy.preprocessing import transform_asinh
def _read_fcs_one(path: str | os.PathLike) -> AnnData:
    """Parse a single FCS file into an :class:`AnnData` object.

    The FCS reader is imported lazily so that a plain ``import
    cycombinepy.io`` succeeds even when neither ``pytometry`` nor
    ``readfcs`` is installed.
    """
    path_str = str(path)
    # Prefer pytometry's reader submodule; importing the top-level pytometry
    # package would pull in heavy optional dependencies we don't need just
    # for reading FCS.
    try:
        from pytometry.io._readfcs import read_fcs as _pt_read_fcs  # type: ignore
    except ImportError:
        pass
    else:
        return _pt_read_fcs(path_str)
    # Fall back to the standalone ``readfcs`` package.
    try:
        import readfcs  # type: ignore
    except ImportError as exc:
        raise ImportError(
            "read_fcs_dir requires pytometry or readfcs. Install with "
            "`pip install pytometry` or `pip install readfcs`."
        ) from exc
    return readfcs.read(path_str)
def _load_metadata_table(
    metadata: pd.DataFrame | str | os.PathLike, filename_col: str
) -> pd.DataFrame:
    """Load the sample metadata table and index it by FCS filename.

    Accepts an in-memory DataFrame (copied, so the caller's object is never
    mutated) or a path to a CSV / TSV / XLS(X) file.

    Raises
    ------
    KeyError
        If ``filename_col`` is not a column of the table.
    ValueError
        If the filename column contains duplicates (the per-cell join in
        :func:`read_fcs_dir` would be ambiguous).
    """
    if isinstance(metadata, (str, os.PathLike)):
        p = Path(metadata)
        suffix = p.suffix.lower()
        if suffix in (".xls", ".xlsx"):
            meta_df = pd.read_excel(p)
        elif suffix == ".tsv":
            meta_df = pd.read_csv(p, sep="\t")
        else:
            # Default to comma-separated for any other extension.
            meta_df = pd.read_csv(p)
    else:
        meta_df = metadata.copy()
    if filename_col not in meta_df.columns:
        raise KeyError(
            f"metadata is missing the filename column {filename_col!r}"
        )
    meta_df = meta_df.set_index(filename_col)
    # Duplicate filenames would make the row lookup ambiguous and fail later
    # with an opaque pandas shape error — reject them up front instead.
    if meta_df.index.has_duplicates:
        dupes = meta_df.index[meta_df.index.duplicated()].unique().tolist()
        raise ValueError(f"metadata contains duplicated filenames: {dupes}")
    return meta_df


def _downsample_adata(
    adata: AnnData,
    downsample: int,
    sampling_type: str,
    rng: np.random.Generator,
) -> AnnData:
    """Downsample ``adata`` to at most ``downsample`` cells per unit.

    ``sampling_type`` selects the unit: the whole dataset (``"random"``),
    each batch (``"per_batch"``) or each sample (``"per_sample"``). Units
    already at or below the target size are kept untouched.
    """
    if sampling_type == "random":
        if adata.n_obs > downsample:
            idx = rng.choice(adata.n_obs, downsample, replace=False)
            adata = adata[idx].copy()
        return adata
    if sampling_type in ("per_batch", "per_sample"):
        key = "batch" if sampling_type == "per_batch" else "sample"
        if key not in adata.obs.columns:
            raise KeyError(f"sampling_type={sampling_type!r} requires obs[{key!r}]")
        parts = []
        for value in adata.obs[key].unique():
            mask = (adata.obs[key] == value).to_numpy()
            subset = adata[mask]
            if subset.n_obs > downsample:
                take = rng.choice(subset.n_obs, downsample, replace=False)
                subset = subset[take].copy()
            parts.append(subset)
        # Indices are already unique from the initial concat, and the masks
        # are disjoint, so no index_unique suffix is needed here.
        return ad.concat(parts, join="outer")
    raise ValueError(f"Unknown sampling_type: {sampling_type!r}")


def read_fcs_dir(
    data_dir: str | os.PathLike,
    pattern: str = "*.fcs",
    metadata: pd.DataFrame | str | os.PathLike | None = None,
    filename_col: str = "filename",
    sample_key: str | None = None,
    batch_key: str | None = None,
    condition_key: str | None = None,
    anchor_key: str | None = None,
    markers: Iterable[str] | None = None,
    transform: bool = True,
    cofactor: float = 5.0,
    derand: bool = True,
    downsample: int | None = None,
    sampling_type: Literal["random", "per_batch", "per_sample"] = "random",
    seed: int | None = None,
) -> AnnData:
    """Read all FCS files in ``data_dir`` into a single AnnData.

    Mirrors ``compile_fcs`` + ``convert_flowset`` + ``prepare_data`` from
    ``R/01_prepare_data.R``. Metadata (a DataFrame or a CSV/TSV/XLSX path) is
    joined on the basename of each FCS file via ``filename_col``. Its columns
    are renamed to ``batch`` / ``sample`` / ``condition`` / ``anchor`` if the
    corresponding ``*_key`` argument points at them.

    Parameters
    ----------
    data_dir
        Directory containing FCS files.
    pattern
        Glob pattern for selecting files.
    metadata
        DataFrame or path to a CSV/TSV/XLSX table. Must contain
        ``filename_col`` matching the FCS basenames.
    filename_col
        Column in ``metadata`` holding the FCS filenames.
    sample_key / batch_key / condition_key / anchor_key
        Columns of ``metadata`` to use for ``sample`` / ``batch`` /
        ``condition`` / ``anchor`` respectively. Resulting ``adata.obs`` will
        use those canonical names.
    markers
        Restrict to these var_names after loading (optional).
    transform
        If True, apply :func:`cycombinepy.transform_asinh` with ``cofactor``.
    cofactor, derand
        Forwarded to ``transform_asinh``.
    downsample
        If given, downsample each unit (defined by ``sampling_type``) to this
        many cells.
    sampling_type
        How to downsample: uniformly at random, or per batch / per sample.
    seed
        RNG seed.

    Raises
    ------
    FileNotFoundError
        If no files in ``data_dir`` match ``pattern``.
    KeyError
        If ``metadata`` lacks ``filename_col``, lacks rows for some loaded
        files, or a per-unit downsample needs an absent obs column.
    ValueError
        For an unknown ``sampling_type`` or duplicated metadata filenames.
    """
    data_dir = Path(data_dir)
    files = sorted(glob.glob(str(data_dir / pattern)))
    if not files:
        raise FileNotFoundError(f"No FCS files matching {pattern} in {data_dir}")
    adatas = []
    for f in files:
        a = _read_fcs_one(f)
        # The basename is the join key used against the metadata table below.
        a.obs["filename"] = os.path.basename(f)
        adatas.append(a)
    adata = ad.concat(adatas, join="outer", index_unique="-")

    # Join metadata if provided
    if metadata is not None:
        meta_df = _load_metadata_table(metadata, filename_col)
        # Fail with a clear message when a loaded file has no metadata row;
        # a bare ``.loc`` lookup would raise an opaque pandas KeyError.
        missing = sorted(set(adata.obs["filename"]) - set(meta_df.index))
        if missing:
            raise KeyError(f"metadata has no rows for FCS files: {missing}")
        for col in meta_df.columns:
            adata.obs[col] = meta_df.loc[adata.obs["filename"].values, col].values

    # Canonicalize key columns
    rename_map = {
        sample_key: "sample",
        batch_key: "batch",
        condition_key: "condition",
        anchor_key: "anchor",
    }
    for src, dst in rename_map.items():
        if src is not None and src != dst and src in adata.obs.columns:
            adata.obs[dst] = adata.obs[src].values

    # Marker subset (markers absent from var_names are silently dropped,
    # matching the original behavior).
    if markers is not None:
        keep = [m for m in markers if m in adata.var_names]
        adata = adata[:, keep].copy()

    # Downsample
    if downsample is not None:
        rng = np.random.default_rng(seed)
        adata = _downsample_adata(adata, downsample, sampling_type, rng)

    # Asinh transform (in place)
    if transform:
        transform_asinh(adata, cofactor=cofactor, derand=derand, seed=seed)
    return adata