Source code for tidypolars_extra.io_r

from __future__ import annotations
import warnings
warnings.filterwarnings("ignore", category=UserWarning, module="rpy2.rinterface")

from .io import DATA_LABELS
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

import pandas as pd

try:
    import rpy2.robjects as ro
    from rpy2.robjects import pandas2ri, conversion, default_converter
    from rpy2.rinterface_lib.embedded import RRuntimeError
    from rpy2.rinterface_lib.sexp import NULLType
except ImportError as exc:
    raise ImportError(
        "io_r.py requires 'rpy2'. Install it with:\n\n"
        "    pip install rpy2\n"
    ) from exc

[docs] R = ro.r # handle to the R interpreter
def _detect_file_type(path: Path) -> str: ext = path.suffix.lower() if ext == ".rds": return "rds" if ext in {".rdata", ".rda"}: return "rdata" raise ValueError(f"Unsupported R file extension: {ext!r} (expected .rds or .RData/.rda)") # --- Safe wrapper around readRDS() that does NOT print to console on error --- _SAFE_READRDS = R( """ function(path) { tryCatch( readRDS(path), error = function(e) { structure( list(message = conditionMessage(e)), class = "rds_error" ) } ) } """ ) def _is_null(x: Any) -> bool: # """Return True if x is an R NULL or Python None.""" return x is None or isinstance(x, NULLType) def _load_r_dataframe(path: Path, obj_name: Optional[str] = None) -> ro.vectors.DataFrame: # """ # Load an R data.frame from .rds or .RData. # Parameters # ---------- # path: # Path to .rds or .RData file. # obj_name: # If loading from .RData, optional name of the object to pick. # If None, the first data.frame in the file is used. # Returns # ------- # rpy2.robjects.vectors.DataFrame # """ file_type = _detect_file_type(path) # --- First attempt: treat .rds as a real RDS file (without console error) --- if file_type == "rds": safe_res = _SAFE_READRDS(str(path)) # Is this an "rds_error" object? if bool(R["inherits"](safe_res, "rds_error")[0]): # Unknown input format, etc. → treat as RData fallback file_type = "rdata" else: r_df = safe_res if not bool(R["is.data.frame"](r_df)[0]): raise TypeError(f"Object in {path} is not an R data.frame.") return r_df # --- RData logic (real .RData or “fake RDS” that is actually RData) --- if file_type == "rdata": loaded_names = list(R["load"](str(path))) # character vector of object names if not loaded_names: raise ValueError(f"No objects found in RData file: {path}") if obj_name is not None: if obj_name not in loaded_names: raise ValueError( f"Object {obj_name!r} not found in {path}. " f"Available: {loaded_names}" ) r_obj = ro.globalenv[obj_name] if not bool(R["is.data.frame"](r_obj)[0]): raise TypeError(f"Object {obj_name!r} in {path} is not an R data.frame.") return r_obj # Otherwise, pick the first data.frame in the .RData for name in loaded_names: r_obj = ro.globalenv[name] if bool(R["is.data.frame"](r_obj)[0]): return r_obj raise TypeError( f"No R data.frame found in {path}. " f"Objects present: {loaded_names}" ) # Should not happen because _detect_file_type() would already have errored raise RuntimeError(f"Unexpected file_type for {path}: {file_type}") # Coerce character columns to factors for safe conversion to pandas, # preserving 'label' and 'labels' attributes. _COERCE_CHAR_TO_FACTOR = R( """ function(d) { is_char <- vapply(d, is.character, logical(1L)) if (any(is_char)) { d[is_char] <- lapply(d[is_char], function(x) { lbl <- attr(x, "label", exact = TRUE) vlbl <- attr(x, "labels", exact = TRUE) xf <- as.factor(x) if (!is.null(lbl)) attr(xf, "label") <- lbl if (!is.null(vlbl)) attr(xf, "labels") <- vlbl xf }) } d } """ ) def _r_label_extractor() -> ro.functions.SignatureTranslatedFunction: # """ # Build (once) the small R function that extracts variable- and value-labels. # It returns a list with: # $var_labels : named list of variable labels (or NULL) # $value_labels : named list; each element is either # - NULL if no value labels, or # - named character vector with names = codes, values = labels # (code -> label mapping) # """ return R( """ function(d) { vars <- names(d) # Variable labels (attr(x, "label")) var_labels <- lapply(d, function(x) { lbl <- attr(x, "label", exact = TRUE) if (is.null(lbl)) return(NULL) if (length(lbl) == 0 || is.na(lbl[1])) return(NULL) as.character(lbl[1]) }) names(var_labels) <- vars # Value labels (attr(x, "labels"), e.g. haven::labelled) value_labels <- lapply(d, function(x) { labs <- attr(x, "labels", exact = TRUE) if (is.null(labs)) return(NULL) # Remove NA codes if any labs <- labs[!is.na(labs)] if (length(labs) == 0) return(NULL) # haven::labelled: names(labs) = labels, values(labs) = codes # We want mapping: code -> label codes <- as.character(unname(labs)) labels <- as.character(names(labs)) out <- setNames(labels, codes) # names = codes, values = labels out }) names(value_labels) <- vars list( var_labels = var_labels, value_labels = value_labels ) } """ ) _LABEL_FUN = _r_label_extractor() def _extract_labels(r_df: ro.vectors.DataFrame) -> DATA_LABELS: # """ # Extract variable and value labels from an R data.frame and return DATA_LABELS. # Only variables that actually have labels are included in the dicts. # """ labels_r = _LABEL_FUN(r_df) # R objects: # labels_r$var_labels : named list of labels or NULL # labels_r$value_labels : named list of named vectors or NULL var_labels_r = labels_r.rx2("var_labels") value_labels_r = labels_r.rx2("value_labels") var_names = list(r_df.names) # names() themselves can be NULL, so guard that too if _is_null(var_labels_r): var_label_names = set() else: vnames = var_labels_r.names var_label_names = set([] if _is_null(vnames) else list(vnames)) if _is_null(value_labels_r): value_label_names = set() else: vlnames = value_labels_r.names value_label_names = set([] if _is_null(vlnames) else list(vlnames)) variables: Dict[str, str] = {} values: Dict[str, Dict[Any, str]] = {} for var in var_names: # ---------- VARIABLE LABELS ---------- if var in var_label_names and not _is_null(var_labels_r): vlabel_vec = var_labels_r.rx2(var) if _is_null(vlabel_vec): pass elif len(vlabel_vec) > 0: vlabel = str(vlabel_vec[0]) if vlabel.strip() and vlabel.lower() != "na": variables[var] = vlabel # ---------- VALUE LABELS ---------- if var in value_label_names and not _is_null(value_labels_r): vlabels_vec = value_labels_r.rx2(var) if _is_null(vlabels_vec) or len(vlabels_vec) == 0: continue vnames = vlabels_vec.names if _is_null(vnames): continue # names(vlabels_vec) = codes, values = labels codes = [str(c) for c in list(vnames)] labs = [str(x) for x in list(vlabels_vec)] mapping = { code: lab for code, lab in zip(codes, labs) if lab.strip() } if mapping: values[var] = mapping return DATA_LABELS(original=var_names, variables=variables, values=values)
[docs] def load_r( path: str, obj_name: Optional[str] = None, ) -> Tuple[pd.DataFrame, DATA_LABELS]: # """ # Load an R dataset (.rds or .RData) and extract labels. # Parameters # ---------- # path: # Path to the R file (.rds or .RData/.rda). # obj_name: # If reading from .RData, optional name of the object to load. # If None, the first R data.frame in the file is used. # Returns # ------- # (data, labels) # data : pandas.DataFrame with the R data. # labels : DATA_LABELS instance with: # - labels.variables: only vars with variable labels # - labels.values : only vars with value labels # """ path_obj = Path(path) r_df = _load_r_dataframe(path_obj, obj_name=obj_name) # 1) Extract labels from the ORIGINAL R data.frame labels = _extract_labels(r_df) # 2) Coerce character columns to factors for safe Python conversion r_df_for_py = _COERCE_CHAR_TO_FACTOR(r_df) # 3) Convert the coerced data.frame to pandas with conversion.localconverter(default_converter + pandas2ri.converter): data = conversion.rpy2py(r_df_for_py) if not isinstance(data, pd.DataFrame): raise TypeError( f"Converted object from {path} is not a pandas DataFrame; got {type(data)}" ) return data, labels