import polars as pl
import functools as ft
from .utils import (_as_list,
_col_expr,
_col_exprs,
_kwargs_as_exprs,
_mutate_cols,
_uses_by,
_filter_kwargs_for,
_expand_to_full_path_or_url
)
from .funs import map
from .stringr import str_c, str_replace_all
from .stats import *
from .reexports import *
from .type_conversion import *
from .helpers import everything, matches, DescCol, desc, across, where
import numpy as np
import pandas as pd
import polars.selectors as cs
import re, copy, os
from itertools import chain
import warnings
warnings.filterwarnings("ignore", category=pl.exceptions.MapWithoutReturnDtypeWarning)
__all__ = [
"tibble", "TibbleGroupBy",
"from_pandas", "from_polars",
"__get_accepted_output_formats__"
]
class tibble(pl.DataFrame):
"""
A data frame object that provides methods familiar to R tidyverse users.
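Examples
--------
>>> # minimal sketch; assumes the package is imported as `tp`
>>> df = tp.tibble(x = [1, 2], y = ['a', 'b'])
>>> df = tp.tibble({'x': [1, 2], 'y': ['a', 'b']})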
"""
def __init__(self, *args, **kwargs):
# Support R-like keyword argument syntax: tibble(x=[1,2], y=[3,4])
# by converting kwargs to a dict if no positional args are provided
if len(args) == 0 and kwargs:
super().__init__(kwargs)
else:
super().__init__(*args, **kwargs)
@property
def _constructor(self):
# '''
# This method ensures that tibble methods return an instance
# of tibble instead of a polars DataFrame
# '''
return self.__class__
def _repr_html_(self):
# """
# Printing method for jupyter
# Output rows and columns can be modified by setting the following ENVIRONMENT variables:
# * POLARS_FMT_MAX_COLS: set the number of columns
# * POLARS_FMT_MAX_ROWS: set the number of rows
# """
df = self.to_polars()
return df._repr_html_()
def __copy__(self):
# Shallow copy
# See: https://stackoverflow.com/a/51043609/13254470
obj = type(self).__new__(self.__class__)
obj.__dict__.update(self.__dict__)
return obj
def __getattribute__(self, attr):
if attr in _polars_methods:
raise AttributeError
return pl.DataFrame.__getattribute__(self, attr)
def __dir__(self):
_tidypolars_methods = [
'arrange', 'bind_cols', 'bind_rows', 'colnames', 'clone', 'count',
'crossing',
'distinct', 'drop', 'drop_na', 'drop_null', 'head', 'fill', 'filter',
'group_by',
'hoist',
'inner_join', 'left_join', 'mutate', 'names', 'nest',
'nrow', 'ncol',
'full_join', 'pack', 'pivot_longer', 'pivot_wider', 'print',
'pull', 'relocate', 'rename',
'replace',
'replace_null', 'right_join', 'select',
'separate',
'separate_longer_delim', 'separate_longer_position',
'separate_rows',
'separate_wider_delim', 'separate_wider_position',
'separate_wider_regex',
'set_names',
'slice', 'slice_head', 'slice_max', 'slice_min', 'slice_sample',
'slice_tail', 'summarize', 'tail',
'save_data',
'to_pandas', 'to_polars', 'unnest', 'unnest_longer', 'unnest_wider',
'unpack'
]
return _tidypolars_methods
def arrange(self, *args):
"""
Arrange/sort rows
Parameters
----------
*args : str
Columns to sort by
Examples
--------
>>> df = tp.tibble({'x': ['a', 'a', 'b'], 'y': range(3)})
>>> # Arrange in ascending order
>>> df.arrange('x', 'y')
>>> # Arrange some columns descending
>>> df.arrange(tp.desc('x'), 'y')
Returns
-------
tibble
Original tibble ordered by ``args``
"""
exprs = _as_list(args)
descending = [isinstance(expr, DescCol) for expr in exprs]
return super()\
.sort(exprs, descending = descending, nulls_last = True)\
.pipe(from_polars)
def bind_cols(self, *args):
"""
Bind data frames by columns
Parameters
----------
*args : tibble
Data frame to bind
Returns
-------
tibble
The original tibble with added columns
from the other tibble specified in ``args``
Examples
--------
>>> df1 = tp.tibble({'x': ['a', 'a', 'b'], 'y': range(3)})
>>> df2 = tp.tibble({'a': ['c', 'c', 'c'], 'b': range(4, 7)})
>>> df1.bind_cols(df2)
"""
frames = _as_list(args)
out = self.to_polars()
for frame in frames:
out = out.hstack(frame)
return out.pipe(from_polars)
def bind_rows(self, *args):
"""
Bind data frames by row
Parameters
----------
*args : tibble, list
Data frames to bind by row
Returns
-------
tibble
The original tibble with added rows
from the other tibble specified in ``args``
Examples
--------
>>> df1 = tp.tibble({'x': ['a', 'a', 'b'], 'y': range(3)})
>>> df2 = tp.tibble({'x': ['c', 'c', 'c'], 'y': range(4, 7)})
>>> df1.bind_rows(df2)
"""
frames = _as_list(args)
out = pl.concat([self, *frames], how = "diagonal")
return out.pipe(from_polars)
def clone(self):
"""
Very cheap deep clone
"""
return super().clone().pipe(from_polars)
def count(self, *args, sort = False, name = 'n'):
"""
Returns row counts of the dataset.
If bare column names are provided, count() returns counts by group.
Parameters
----------
*args : str, Expr
Columns to group by
sort : bool
Should columns be ordered in descending order by count
name : str
The name of the new column in the output. If omitted, it will default to "n".
Returns
-------
tibble
If no argument is provided, returns the number of rows.
If column names are provided, counts the unique
values across the given columns.
Examples
--------
>>> df = tp.tibble({'a': [1, 1, 2, 3],
...: 'b': ['a', 'a', 'b', 'b']})
>>> df.count()
shape: (1, 1)
┌─────┐
│ n │
│ u32 │
╞═════╡
│ 4 │
└─────┘
>>> df.count('a', 'b')
shape: (3, 3)
┌─────┬─────┬─────┐
│ a   ┆ b   ┆ n   │
│ i64 ┆ str ┆ u32 │
╞═════╪═════╪═════╡
│ 1   ┆ a   ┆ 2   │
│ 2   ┆ b   ┆ 1   │
│ 3   ┆ b   ┆ 1   │
└─────┴─────┴─────┘
"""
args = _as_list(args)
out = self.summarize(pl.len().alias(name), by = args)
if sort:
out = out.arrange(desc(name))
return out
def distinct(self, *args, keep_all = False):
"""
Select distinct/unique rows
Parameters
----------
*args : str, Expr
Columns to find distinct/unique rows
keep_all : bool
If True, keep all columns. Otherwise, return
only the ones used to select the distinct rows.
Returns
-------
tibble
Tibble after removing the repeated rows based on ``args``
Examples
--------
>>> df = tp.tibble({'a': range(3), 'b': ['a', 'a', 'b']})
>>> df.distinct()
>>> df.distinct('b')
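>>> # keep all columns while de-duplicating on 'b'
>>> df.distinct('b', keep_all = True)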
"""
args = _as_list(args)
#
if len(args) == 0:
df = super().unique()
else:
df = super().unique(args)
if not keep_all and len(args) > 0:
df = df.select(args)
return df.pipe(from_polars)
def drop(self, *args):
"""
Drop unwanted columns
Parameters
----------
*args : str
Columns to drop
Returns
-------
tibble
Tibble with columns in ``args`` dropped
Examples
--------
>>> df.drop('x', 'y')
"""
args = _as_list(args)
drop_cols = self.select(args).names
return super().drop(drop_cols).pipe(from_polars)
def drop_null(self, *args):
"""
Drop rows containing missing values
Parameters
----------
*args : str
Columns to drop nulls from (defaults to all)
Returns
-------
tibble
Tibble with rows in ``args`` with missing values dropped
Examples
--------
>>> df = tp.tibble(x = [1, None, 3], y = [None, 'b', 'c'], z = range(3))
>>> df.drop_null()
>>> df.drop_null('x', 'y')
"""
args = _as_list(args)
if len(args) == 0:
out = super().drop_nulls()
else:
out = super().drop_nulls(args)
return out.pipe(from_polars)
def drop_na(self, *args):
"""
Drop rows containing missing values. Alias for :meth:`drop_null`,
matching tidyr's ``drop_na()`` spelling.
Parameters
----------
*args : str
Columns to drop nulls from (defaults to all)
Returns
-------
tibble
Tibble with rows containing nulls in ``args`` removed.
Examples
--------
>>> df = tp.tibble(x = [1, None, 3], y = [None, 'b', 'c'])
>>> df.drop_na()
>>> df.drop_na('x')
"""
return self.drop_null(*args)
def equals(self, other, null_equal = True):
"""
Check if two tibbles are equal
"""
df = self.to_polars()
other = other.to_polars()
return df.equals(other, null_equal = null_equal)
def head(self, n = 5, *, by = None):
"""
Alias for `.slice_head()`
"""
return self.slice_head(n, by = by)
def fill(self, *args, direction = 'down', by = None):
"""
Fill in missing values with previous or next value
Parameters
----------
*args : str
Columns to fill
direction : str
Direction to fill. One of ['down', 'up', 'downup', 'updown']
by : str, list
Columns to group by
Returns
-------
tibble
Tibble with missing values filled
Examples
--------
>>> df = tp.tibble({'a': [1, None, 3, 4, 5],
... 'b': [None, 2, None, None, 5],
... 'groups': ['a', 'a', 'a', 'b', 'b']})
>>> df.fill('a', 'b')
>>> df.fill('a', 'b', by = 'groups')
>>> df.fill('a', 'b', direction = 'downup')
"""
args = _as_list(args)
if len(args) == 0: return self
args = _col_exprs(args)
options = {'down': 'forward', 'up': 'backward'}
if direction in ['down', 'up']:
direction = options[direction]
exprs = [arg.fill_null(strategy = direction) for arg in args]
elif direction == 'downup':
exprs = [
arg.fill_null(strategy = 'forward')
.fill_null(strategy = 'backward')
for arg in args
]
elif direction == 'updown':
exprs = [
arg.fill_null(strategy = 'backward')
.fill_null(strategy = 'forward')
for arg in args
]
else:
raise ValueError("direction must be one of down, up, downup, or updown")
return self.mutate(*exprs, by = by)
def filter(self, *args,
by = None):
"""
Filter rows on one or more conditions
Parameters
----------
*args : Expr
Conditions to filter by
by : str, list
Columns to group by
Returns
-------
tibble
A tibble with the rows that match the conditions.
Examples
--------
>>> df = tp.tibble({'a': range(3), 'b': ['a', 'a', 'b']})
>>> df.filter(col('a') < 2, col('b') == 'a')
>>> df.filter((col('a') < 2) & (col('b') == 'a'))
>>> df.filter(col('a') <= tp.mean(col('a')), by = 'b')
"""
args = _as_list(args)
exprs = ft.reduce(lambda a, b: a & b, args)
if _uses_by(by):
out = super().group_by(by).map_groups(lambda x: x.filter(exprs))
else:
out = super().filter(exprs)
return out.pipe(from_polars)
def inner_join(self, df, left_on = None, right_on = None, on = None, suffix = '_right'):
"""
Perform an inner join
Parameters
----------
df : tibble
DataFrame to join with.
left_on : str, list
Join column(s) of the left DataFrame.
right_on : str, list
Join column(s) of the right DataFrame.
on : str, list
Join column(s) of both DataFrames. If set, `left_on` and `right_on` should be None.
suffix : str
Suffix to append to columns with a duplicate name.
Returns
-------
tibble
A tibble with intersection of cases in the original and
df tibbles.
Examples
--------
>>> df1.inner_join(df2)
>>> df1.inner_join(df2, on = 'x')
>>> df1.inner_join(df2, left_on = 'left_x', right_on = 'x')
"""
if left_on is None and right_on is None and on is None:
on = list(set(self.names) & set(df.names))
return super().join(df, on, 'inner',
left_on = left_on,
right_on= right_on,
suffix= suffix).pipe(from_polars)
def left_join(self, df, left_on = None, right_on = None, on = None, suffix = '_right'):
"""
Perform a left join
Parameters
----------
df : tibble
DataFrame to join with.
left_on : str, list
Join column(s) of the left DataFrame.
right_on : str, list
Join column(s) of the right DataFrame.
on : str, list
Join column(s) of both DataFrames. If set, `left_on` and `right_on` should be None.
suffix : str
Suffix to append to columns with a duplicate name.
Returns
-------
tibble
The original tibble with added columns from tibble df if
they match columns in the original one. Columns to match
on are given in the function parameters.
Examples
--------
>>> df1.left_join(df2)
>>> df1.left_join(df2, on = 'x')
>>> df1.left_join(df2, left_on = 'left_x', right_on = 'x')
"""
if left_on is None and right_on is None and on is None:
on = list(set(self.names) & set(df.names))
return super().join(df, on, 'left', left_on = left_on, right_on= right_on, suffix= suffix).pipe(from_polars)
def right_join(self, df, left_on = None, right_on = None, on = None, suffix = '_right'):
"""
Perform a right join
Parameters
----------
df : tibble
DataFrame to join with.
left_on : str, list
Join column(s) of the left DataFrame.
right_on : str, list
Join column(s) of the right DataFrame.
on : str, list
Join column(s) of both DataFrames. If set, `left_on` and `right_on` should be None.
suffix : str
Suffix to append to columns with a duplicate name.
Returns
-------
tibble
Every row of ``df`` with matching columns from ``self``.
Unmatched rows on the left side receive null values.
Examples
--------
>>> df1.right_join(df2)
>>> df1.right_join(df2, on = 'x')
>>> df1.right_join(df2, left_on = 'left_x', right_on = 'x')
"""
if left_on is None and right_on is None and on is None:
on = list(set(self.names) & set(df.names))
return super().join(df, on, 'right',
left_on = left_on,
right_on = right_on,
suffix = suffix).pipe(from_polars)
def mutate(self, *args, by = None, **kwargs):
"""
Add or modify columns
Parameters
----------
*args : Expr
Column expressions to add or modify
by : str, list
Columns to group by
**kwargs : Expr
Column expressions to add or modify
Returns
-------
tibble
Original tibble with new column created.
Examples
--------
>>> df = tp.tibble({'a': range(3), 'b': range(3), 'c': ['a', 'a', 'b']})
>>> df.mutate(double_a = col('a') * 2,
... a_plus_b = col('a') + col('b'))
>>> df.mutate(row_num = row_number(), by = 'c')
"""
exprs = _as_list(args) + _kwargs_as_exprs(kwargs)
out = self.to_polars()
if _uses_by(by):
out = out.group_by(by).map_groups(lambda x: _mutate_cols(x, exprs))
else:
out = _mutate_cols(out, exprs)
return out.pipe(from_polars)
@property
def names(self):
"""
Get column names
Returns
-------
list
Names of the columns
Examples
--------
>>> df.names
"""
return super().columns
@property
def ncol(self):
"""
Get number of columns
Returns
-------
int
Number of columns
Examples
--------
>>> df.ncol
"""
return super().shape[1]
@property
def nrow(self):
"""
Get number of rows
Returns
-------
int
Number of rows
Examples
--------
>>> df.nrow
"""
return super().shape[0]
def full_join(self, df, left_on = None, right_on = None, on = None, suffix: str = '_right'):
"""
Perform a full join
Parameters
----------
df : tibble
DataFrame to join with.
left_on : str, list
Join column(s) of the left DataFrame.
right_on : str, list
Join column(s) of the right DataFrame.
on : str, list
Join column(s) of both DataFrames. If set, `left_on` and `right_on` should be None.
suffix : str
Suffix to append to columns with a duplicate name.
Returns
-------
tibble
Union between the original and the df tibbles. The
rows that don't match in one of the tibbles will be
completed with missing values.
Examples
--------
>>> df1.full_join(df2)
>>> df1.full_join(df2, on = 'x')
>>> df1.full_join(df2, left_on = 'left_x', right_on = 'x')
"""
if left_on is None and right_on is None and on is None:
on = list(set(self.names) & set(df.names))
return super().join(df, on, 'full',
left_on = left_on,
right_on= right_on,
suffix= suffix,
coalesce=True).pipe(from_polars)
def semi_join(self, df, left_on = None, right_on = None, on = None):
"""
Perform a semi join (keep rows with a match in df, no columns added)
Parameters
----------
df : tibble
DataFrame to join with.
left_on : str, list
Join column(s) of the left DataFrame.
right_on : str, list
Join column(s) of the right DataFrame.
on : str, list
Join column(s) of both DataFrames. If set, `left_on` and `right_on` should be None.
Returns
-------
tibble
Rows from the original tibble that have a match in df.
Examples
--------
>>> df1.semi_join(df2, on = 'x')
"""
if left_on is None and right_on is None and on is None:
on = list(set(self.names) & set(df.names))
return super().join(df, on, 'semi',
left_on = left_on,
right_on= right_on).pipe(from_polars)
def anti_join(self, df, left_on = None, right_on = None, on = None):
"""
Perform an anti join (keep rows without a match in df)
Parameters
----------
df : tibble
DataFrame to join with.
left_on : str, list
Join column(s) of the left DataFrame.
right_on : str, list
Join column(s) of the right DataFrame.
on : str, list
Join column(s) of both DataFrames. If set, `left_on` and `right_on` should be None.
Returns
-------
tibble
Rows from the original tibble that do not have a match in df.
Examples
--------
>>> df1.anti_join(df2, on = 'x')
"""
if left_on is None and right_on is None and on is None:
on = list(set(self.names) & set(df.names))
return super().join(df, on, 'anti',
left_on = left_on,
right_on= right_on).pipe(from_polars)
def cross_join(self, df, suffix = '_right'):
"""
Perform a cross join (Cartesian product)
Parameters
----------
df : tibble
DataFrame to join with.
suffix : str
Suffix to append to columns with a duplicate name.
Returns
-------
tibble
All combinations of rows from both tibbles.
Examples
--------
>>> df1.cross_join(df2)
"""
return super().join(df, how='cross', suffix=suffix).pipe(from_polars)
def pivot_longer(self,
cols = None,
names_to = "name",
values_to = "value"):
"""
Pivot data from wide to long
Parameters
----------
cols : Expr
List of the columns to pivot. Defaults to all columns.
names_to : str
Name of the new "names" column.
values_to : str
Name of the new "values" column.
Returns
-------
tibble
Original tibble, but in long format.
Examples
--------
>>> df = tp.tibble({'id': ['id1', 'id2'], 'a': [1, 2], 'b': [1, 2]})
>>> df.pivot_longer(cols = ['a', 'b'])
>>> df.pivot_longer(cols = ['a', 'b'], names_to = 'stuff', values_to = 'things')
"""
if cols is None:
cols = everything()
if isinstance(cols, dict):
cols = list(cols.keys())
df_cols = pl.Series(self.names)
value_vars = self.select(cols).names
id_vars = df_cols.filter(~df_cols.is_in(value_vars)).to_list()
out = super().unpivot(on=value_vars, index=id_vars, variable_name=names_to, value_name=values_to)
return out.pipe(from_polars)
def pivot_wider(self,
names_from = 'name',
values_from = 'value',
id_cols = None,
values_fn = 'first',
values_fill = None
):
"""
Pivot data from long to wide
Parameters
----------
names_from : str
Column to get the new column names from.
values_from : str
Column to get the new column values from
id_cols : str, list
A set of columns that uniquely identifies each observation.
Defaults to all columns in the data table except for the columns specified in
`names_from` and `values_from`.
values_fn : str
Function for how multiple entries per group should be dealt with.
Any of 'first', 'count', 'sum', 'max', 'min', 'mean', 'median', 'last'
values_fill : str, int, float
Value used to fill missing/null entries created by
the pivot.
Returns
-------
tibble
Original tibble, but in wide format.
Examples
--------
>>> df = tp.tibble({'id': [1, 1], 'variable': ['a', 'b'], 'value': [1, 2]})
>>> df.pivot_wider(names_from = 'variable', values_from = 'value')
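>>> # fill entries that have no matching value (sketch with a numeric fill)
>>> df.pivot_wider(names_from = 'variable', values_from = 'value', values_fill = 0)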
"""
if id_cols is None:
df_cols = pl.Series(self.names)
from_cols = pl.Series(self.select(names_from, values_from).names)
id_cols = df_cols.filter(~df_cols.is_in(from_cols.to_list())).to_list()
no_id = len(id_cols) == 0
if no_id:
id_cols = '___id__'
self = self.mutate(___id__ = pl.lit(1))
out = (
self.to_polars()
.pivot(index=id_cols, on=names_from, values=values_from, aggregate_function=values_fn)
.pipe(from_polars)
)
if values_fill is not None:
new_cols = pl.Series(out.names)
new_cols = new_cols.filter(~new_cols.is_in(id_cols))
fill_exprs = [col(new_col).fill_null(values_fill) for new_col in new_cols]
out = out.mutate(*fill_exprs)
if no_id: out = out.drop('___id__')
return out
def pull(self, var = None):
"""
Extract a column as a series
Parameters
----------
var : str
Name of the column to extract. Defaults to the last column.
Returns
-------
Series
The series will contain the values of the column from `var`.
Examples
--------
>>> df = tp.tibble({'a': range(3), 'b': range(3)})
>>> df.pull('a')
"""
if var is None:
var = self.names[-1]
return super().get_column(var)
def relevel(self, x, ref):
"""
Change the reference level of a string or factor column and convert it to a factor
Parameters
----------
x : str
Variable name
ref : str
Reference level
Returns
-------
tibble
The original tibble with the column specified in `x` as
an ordered factor, with the first category specified in `ref`.
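Examples
--------
>>> # minimal sketch; 'g' is a hypothetical string column
>>> df = tp.tibble(g = ['a', 'b', 'b'])
>>> df.relevel('g', ref = 'b')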
"""
levels = self.pull(x).unique().to_list()
relevels = [ref] + [l for l in levels if l != ref]
self = self.mutate(**{x : as_factor(x, relevels)})
return self
def relocate(self, *args, before = None, after = None):
"""
Move a column or columns to a new position
Parameters
----------
*args : str, Expr
Columns to move
before : str, optional
Column before which the moved columns are placed.
after : str, optional
Column after which the moved columns are placed.
Returns
-------
tibble
Original tibble with columns relocated.
Examples
--------
>>> df = tp.tibble({'a': range(3), 'b': range(3), 'c': ['a', 'a', 'b']})
>>> df.relocate('a', before = 'c')
>>> df.relocate('b', after = 'c')
"""
cols_all = pl.Series(self.names)
locs_all = pl.Series(range(len(cols_all)))
locs_dict = {k:v for k,v in zip(cols_all, locs_all)}
locs_df = pl.DataFrame(locs_dict, orient = "row")
cols_relocate = _as_list(args)
locs_relocate = pl.Series(locs_df.select(cols_relocate).row(0))
if (len(locs_relocate) == 0):
return self
uses_before = before is not None
uses_after = after is not None
if (uses_before & uses_after):
raise ValueError("Cannot provide both before and after")
elif (not uses_before) and (not uses_after):
before = cols_all[0]
uses_before = True
if uses_before:
before = locs_df.select(before).get_column(before)
locs_start = locs_all.filter(locs_all < before)
else:
after = locs_df.select(after).get_column(after)
locs_start = locs_all.filter(locs_all <= after)
locs_start = locs_start.filter(~locs_start.is_in(locs_relocate.to_list()))
final_order = pl.concat([locs_start, locs_relocate, locs_all]).unique(maintain_order = True)
final_order = cols_all[final_order].to_list()
return self.select(final_order)
def rename(self, *args, regex=False, tolower=False, strict=False, **kwargs):
"""
Rename columns
Parameters
----------
*args : str or dict
If a single dict is provided, it is used as {old_name: new_name}.
If strings are provided, they are treated as pairs: new_name, old_name, ...
regex : bool, default False
If True, the dict is interpreted as {pattern: replacement}
and applied to column names via regular-expression substitution.
tolower : bool, default False
If True, convert all to lower case
**kwargs : str
Keyword arguments in the form new_name='old_name'
Returns
-------
tibble
Original tibble with columns renamed.
Examples
--------
>>> df = tp.tibble({'x': range(3), 't': range(3), 'z': ['a', 'a', 'b']})
>>> df.rename({'x': 'new_x'})
>>> df.rename(new_x = 'x')
>>> df.rename('new_x', 'x')
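>>> # regex rename: the dict is {pattern: replacement} (sketch)
>>> df.rename({'^x$': 'new_x'}, regex = True)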
"""
columns = None
if len(args) == 1 and isinstance(args[0], dict):
columns = args[0]
elif len(args) >= 2 and all(isinstance(a, str) for a in args):
# dplyr-style positional: new_name, old_name, new_name, old_name, ...
columns = {args[i+1]: args[i] for i in range(0, len(args), 2)}
elif len(args) == 0:
pass
else:
raise ValueError("'columns' must be a dictionary, paired strings, or keyword arguments.")
# Handle kwargs: new_name='old_name'
if kwargs:
kw_columns = {v: k for k, v in kwargs.items()}
if columns is None:
columns = kw_columns
else:
columns.update(kw_columns)
if columns is not None:
if regex:
self = self.__rename_regexp__(columns)
else:
self = super().rename(columns, strict=strict).pipe(from_polars)
if tolower:
self = self.__rename_tolower__()
return self
def __rename_regexp__(self, mapping):
pattern = next(iter(mapping))
replacement = next(iter(mapping.values()))
old = self.names
new = [re.sub(pattern, replacement, col) for col in self.names]
mapping = {o:n for o, n in zip(old, new)}
return self.rename(mapping, regex=False)
def __rename_tolower__(self):
old = self.names
new = [col.lower() for col in self.names]
mapping = {o:n for o, n in zip(old, new)}
return self.rename(mapping, regex=False)
def replace_null(self, replace = None):
"""
Replace null values
Parameters
----------
replace : dict, str, int, or float
Dictionary of column/replacement pairs, or a scalar value
to replace nulls with. If not a dict, the replacement is
applied to all columns of the matching type: a string fills
string columns, a number fills numeric columns.
Returns
-------
tibble
Original tibble with missing/null values replaced.
Examples
--------
>>> df = tp.tibble({'a': [None, 'abc', 'cde'], 'b':[None, 1, 2], 'c': [None, 1.1, 2.2]})
>>> df.replace_null({'a': 'New value'})
>>> df.replace_null({'a': 1})
>>> df.replace_null({'b': 1})
>>> df.replace_null({'b': 1.1})
>>> df.replace_null({'c': 1})
>>> df.replace_null('a')
>>> df.replace_null(1)
>>> df.replace_null(1.1)
"""
assert replace is not None, "'replace' must be provided."
assert not isinstance(replace, list), "'replace' cannot be a list."
replace_dict = {}
if isinstance(replace, str):
cols = self.select(where("string")).names
if cols:
replace_dict = {col:replace for col in cols}
elif isinstance(replace, (float, int)):
cols = self.select(where("numeric")).names
if cols:
replace_dict = {col:replace for col in cols}
elif isinstance(replace, dict):
replace_dict = replace
if replace_dict:
replace_exprs = [pl.col(key).fill_null(value) for key, value in replace_dict.items()]
res = self.mutate(*replace_exprs)
else:
res = self
return res
def separate(self, sep_col, into, sep = '_', remove = True):
"""
Separate a character column into multiple columns
Parameters
----------
sep_col : str
Column to split into multiple columns
into : list
List of new column names
sep : str
Separator to split on. Default to '_'
remove : bool
If True removes the input column from the output data frame
Returns
-------
tibble
Original tibble with the column split into new columns on `sep`.
Examples
--------
>>> df = tp.tibble(x = ['a_a', 'b_b', 'c_c'])
>>> df.separate('x', into = ['left', 'right'])
"""
into_len = len(into) - 1
sep_df = (
self
.to_polars()
.select(col(sep_col)
.str.split_exact(sep, into_len)
.alias("_seps")
.struct
.rename_fields(into))
.unnest("_seps")
.pipe(from_polars)
)
out = self.bind_cols(sep_df)
if remove:
out = out.drop(sep_col)
return out
def separate_wider_delim(self, sep_col, delim, names, *, remove = True,
too_few = 'error', too_many = 'error'):
"""
Split a string column into several columns using a delimiter.
Parameters
----------
sep_col : str
Column to split.
delim : str
Delimiter to split on.
names : list
Names of the resulting columns.
remove : bool
If True (default) drop the original column.
too_few : str
One of ``'error'`` (default) or ``'align_start'``. When
``'error'``, raises if a row produces fewer fields than
``len(names)``.
too_many : str
One of ``'error'`` (default) or ``'drop'``. When ``'error'``,
raises if a row produces more fields than ``len(names)``.
Examples
--------
>>> df = tp.tibble(x = ['a_1', 'b_2'])
>>> df.separate_wider_delim('x', '_', names = ['letter', 'num'])
"""
if too_few not in ('error', 'align_start'):
raise NotImplementedError(f"too_few={too_few!r} is not supported")
if too_many not in ('error', 'drop'):
raise NotImplementedError(f"too_many={too_many!r} is not supported")
n_parts = len(names)
df = self.to_polars()
split_col = pl.col(sep_col).str.split(delim)
lens = df.select(split_col.list.len().alias('__n__')).get_column('__n__')
if too_few == 'error' and (lens < n_parts).any():
raise ValueError(
f"separate_wider_delim: some rows in {sep_col!r} produced "
f"fewer than {n_parts} fields"
)
if too_many == 'error' and (lens > n_parts).any():
raise ValueError(
f"separate_wider_delim: some rows in {sep_col!r} produced "
f"more than {n_parts} fields"
)
exprs = [
pl.col(sep_col).str.split(delim).list.get(i, null_on_oob = True).alias(name)
for i, name in enumerate(names)
]
out = df.with_columns(exprs)
if remove:
out = out.drop(sep_col)
return out.pipe(from_polars)
def separate_wider_position(self, sep_col, widths, *, remove = True):
"""
Split a string column into several columns by character positions.
Parameters
----------
sep_col : str
Column to split.
widths : dict
Mapping of new column name → width in characters.
remove : bool
If True (default) drop the original column.
Examples
--------
>>> df = tp.tibble(x = ['2024Q1', '2025Q2'])
>>> df.separate_wider_position('x', widths = {'year': 4, 'q': 2})
"""
offset = 0
exprs = []
for name, width in widths.items():
exprs.append(pl.col(sep_col).str.slice(offset, width).alias(name))
offset += width
out = self.to_polars().with_columns(exprs)
if remove:
out = out.drop(sep_col)
return out.pipe(from_polars)
def separate_wider_regex(self, sep_col, patterns, *, remove = True):
"""
Split a string column using a regular expression with named groups.
Parameters
----------
sep_col : str
Column to split.
patterns : str or dict
Either a regex string containing named capturing groups, or a
dict ``{name: sub_pattern}`` which is assembled into a single
regex of named groups in the given order.
remove : bool
If True (default) drop the original column.
Examples
--------
>>> df = tp.tibble(x = ['id-001', 'id-002'])
>>> df.separate_wider_regex('x', {'prefix': '[a-z]+', '_sep': '-', 'num': '\\d+'})
"""
if isinstance(patterns, dict):
regex = ''.join(f'(?P<{k}>{v})' for k, v in patterns.items())
keep = [k for k in patterns.keys() if not k.startswith('_')]
else:
regex = patterns
keep = None
expr = pl.col(sep_col).str.extract_groups(regex)
out = (self.to_polars()
.with_columns(expr.alias('__sep_struct__'))
.unnest('__sep_struct__'))
if keep is not None:
drop_unnamed = [c for c in out.columns
if c in patterns and c.startswith('_')]
if drop_unnamed:
out = out.drop(drop_unnamed)
if remove:
out = out.drop(sep_col)
return out.pipe(from_polars)
def separate_longer_delim(self, sep_col, delim):
"""
Split a string column by ``delim`` into longer rows.
Parameters
----------
sep_col : str
Column to split.
delim : str
Delimiter to split on.
Examples
--------
>>> df = tp.tibble(x = ['a,b', 'c'])
>>> df.separate_longer_delim('x', ',')
"""
out = (self.to_polars()
.with_columns(pl.col(sep_col).str.split(delim))
.explode(sep_col))
return out.pipe(from_polars)
def separate_longer_position(self, sep_col, width):
"""
Split each string into chunks of ``width`` characters and convert
into longer rows.
Parameters
----------
sep_col : str
Column to split.
width : int
Width of each chunk in characters.
Examples
--------
>>> df = tp.tibble(x = ['abcd', 'efgh'])
>>> df.separate_longer_position('x', 2)
"""
def _chunk(s):
if s is None:
return None
return [s[i:i + width] for i in range(0, len(s), width)]
out = (self.to_polars()
.with_columns(pl.col(sep_col)
.map_elements(_chunk, return_dtype = pl.List(pl.Utf8)))
.explode(sep_col))
return out.pipe(from_polars)
def separate_rows(self, *cols, sep = ','):
"""
Split the given columns on ``sep`` and explode them into longer rows.
Superseded by :meth:`separate_longer_delim` but kept for tidyr parity.
Parameters
----------
*cols : str
Columns to split and explode.
sep : str
Delimiter to split on (default: ``','``).
Examples
--------
>>> df = tp.tibble(x = ['a,b', 'c'], y = [1, 2])
>>> df.separate_rows('x', sep = ',')
"""
out = self.to_polars()
for c in cols:
out = out.with_columns(pl.col(c).str.split(sep)).explode(c)
return out.pipe(from_polars)
def set_names(self, nm = None):
"""
Change the column names of the data frame
Parameters
----------
nm : list
A list of new names for the data frame
Examples
--------
>>> df = tp.tibble(x = range(3), y = range(3))
>>> df.set_names(['a', 'b'])
"""
if nm is None: nm = self.names
nm = _as_list(nm)
rename_dict = {k:v for k, v in zip(self.names, nm)}
return self.rename(rename_dict)
def select(self, *args):
"""
Select or drop columns
Parameters
----------
*args : str, list, dict, or combinations of them
Columns to select. It can combine names, list of names,
and a dict. If dict, it will rename the columns based
on the dict.
It also accepts helper functions:
``tp.matches(<regex>)``, ``tp.contains(<str>)``, ``tp.where(<str>)``.
Examples
--------
>>> df = tp.tibble({'a': range(3), 'b': range(3), 'abcba': ['a', 'a', 'b']})
>>> df.select('a', 'b')
>>> df.select(col('a'), col('b'))
>>> df.select({'a': 'new name'}, tp.matches("c"))
>>> df.select(tp.where('numeric'))
"""
# convert to list if dict.keys or dict.values are used
cols_to_select = []
cols_to_rename = {}
if len(args)==1 and cs.is_selector(*args):
cols_to_select = self.to_polars().select(*args).columns
else:
for arg in args:
if isinstance(arg, {}.keys().__class__) or\
isinstance(arg, {}.values().__class__):
cols_to_select += list(arg)
elif isinstance(arg, dict):
cols_to_select += [col for col,_ in arg.items()]
cols_to_rename |= arg
elif isinstance(arg, str):
cols_to_select += [arg]
elif isinstance(arg, list):
cols_to_select += arg
elif isinstance(arg, set):
cols_to_select += list(arg)
# # rename columns if dict is used
# cols_dict = [d for d in args if isinstance(d, dict)]
# if cols_dict:
# cols_dict = cols_dict[0]
# dict_list = list(cols_dict.values())
# self = self.rename(cols_dict)
# else:
# dict_list = []
# # collect str and list elements
# cols_list = [c for c in args if isinstance(c, str) or isinstance(c, list)]
# # flatten list
# cols_list = list(chain.from_iterable((x if isinstance(x, list)
# else [x] for x in cols_list )))
# # collect dict.keys() or dict.values()
# cols_dict_keys = [k for k in args if isinstance( k, type({}.keys()) )]
# cols_dict_values = [k for k in args if isinstance( k, type({}.values()) )]
# # collect set
# cols_set = [s for s in args if isinstance(s, set)]
# if cols_set:
# cols_set = list(cols_set[0])
# cols = cols_list + dict_list + cols_dict_keys +cols_dict_values +cols_set
# remove non-existing columns
cols_to_select = [col for col in cols_to_select
if col in self.names
or (col.startswith("^") and col.endswith("$"))]
# # remove duplicated
# cols_to_select = list(set(cols_to_select))
# cols = [col for col in cols if col in self.names or
# (col.startswith("^") and col.endswith("$"))]
cols = _col_exprs(cols_to_select)
return super().select(cols).pipe(from_polars).rename(cols_to_rename)
def slice(self, *args, by = None):
"""
Grab rows from a data frame
Parameters
----------
*args : int, list
Rows to grab
by : str, list
Columns to group by
Examples
--------
>>> df = tp.tibble({'a': range(3), 'b': range(3), 'c': ['a', 'a', 'b']})
>>> df.slice(0, 1)
>>> df.slice(0, by = 'c')
"""
rows = _as_list(args)
if _uses_by(by):
df = super(tibble, self).group_by(by).map_groups(lambda x: x.select(pl.all().gather(rows)))
else:
df = super(tibble, self).select(pl.all().gather(rows))
return df.pipe(from_polars)
def slice_head(self, n = 5, *, by = None):
"""
Grab top rows from a data frame
Parameters
----------
n : int
Number of rows to grab
by : str, list
Columns to group by
Examples
--------
>>> df = tp.tibble({'a': range(3), 'b': range(3), 'c': ['a', 'a', 'b']})
>>> df.slice_head(2)
>>> df.slice_head(1, by = 'c')
"""
col_order = self.names
if _uses_by(by):
df = super(tibble, self).group_by(by).head(n)
else:
df = super(tibble, self).head(n)
df = df.select(col_order)
return df.pipe(from_polars)
def slice_tail(self, n = 5, *, by = None):
"""
Grab bottom rows from a data frame
Parameters
----------
n : int
Number of rows to grab
by : str, list
Columns to group by
Examples
--------
>>> df = tp.tibble({'a': range(3), 'b': range(3), 'c': ['a', 'a', 'b']})
>>> df.slice_tail(2)
>>> df.slice_tail(1, by = 'c')
"""
col_order = self.names
if _uses_by(by):
df = super(tibble, self).group_by(by).tail(n)
else:
df = super(tibble, self).tail(n)
df = df.select(col_order)
return df.pipe(from_polars)
def slice_min(self, order_by, n = 1, *, with_ties = True, by = None):
"""
Select rows with the smallest values of ``order_by``.
Parameters
----------
order_by : str, list
Column(s) to order by (ascending).
n : int
Number of rows to return per group.
with_ties : bool
If True (default), include tied rows even if that exceeds ``n``.
by : str, list, optional
Columns to group by.
Examples
--------
>>> df = tp.tibble(x = [1, 2, 2, 3], g = ['a', 'a', 'b', 'b'])
>>> df.slice_min('x', n = 1)
>>> df.slice_min('x', n = 1, by = 'g')
"""
order_cols = _as_list(order_by)
if with_ties:
def _take(frame):
return frame.filter(
pl.struct(order_cols).rank(method = 'min') <= n
)
else:
def _take(frame):
return frame.sort(order_cols).head(n)
if _uses_by(by):
out = super(tibble, self).group_by(by).map_groups(_take)
else:
out = _take(super(tibble, self))
return out.pipe(from_polars)
def slice_max(self, order_by, n = 1, *, with_ties = True, by = None):
"""
Select rows with the largest values of ``order_by``.
Parameters
----------
order_by : str, list
Column(s) to order by (descending).
n : int
Number of rows to return per group.
with_ties : bool
If True (default), include tied rows even if that exceeds ``n``.
by : str, list, optional
Columns to group by.
Examples
--------
>>> df = tp.tibble(x = [1, 2, 2, 3], g = ['a', 'a', 'b', 'b'])
>>> df.slice_max('x', n = 1)
>>> df.slice_max('x', n = 1, by = 'g')
"""
order_cols = _as_list(order_by)
if with_ties:
def _take(frame):
return frame.filter(
pl.struct(order_cols).rank(method = 'min', descending = True) <= n
)
else:
def _take(frame):
return frame.sort(order_cols, descending = True).head(n)
if _uses_by(by):
out = super(tibble, self).group_by(by).map_groups(_take)
else:
out = _take(super(tibble, self))
return out.pipe(from_polars)
def slice_sample(self, n = None, *, prop = None, replace = False, seed = None, by = None):
"""
Randomly sample rows. Modern replacement for :meth:`sample_n` and
:meth:`sample_frac`.
Parameters
----------
n : int, optional
Number of rows to sample. Provide exactly one of ``n`` or ``prop``.
prop : float, optional
Fraction of rows to sample (between 0 and 1).
replace : bool
Whether to sample with replacement.
seed : int, optional
Random seed for reproducibility.
by : str, list, optional
Columns to group by; sampling happens within each group.
Examples
--------
>>> df.slice_sample(n = 3, seed = 42)
>>> df.slice_sample(prop = 0.5, by = 'g', seed = 42)
"""
assert (n is None) ^ (prop is None), "Provide exactly one of `n` or `prop`."
def _take(frame):
if n is not None:
return frame.sample(n = n, with_replacement = replace, seed = seed)
return frame.sample(fraction = prop, with_replacement = replace, seed = seed)
if _uses_by(by):
out = super(tibble, self).group_by(by).map_groups(_take)
else:
out = _take(super(tibble, self))
return out.pipe(from_polars)
def summarise(self, *args,
by = None,
**kwargs):
"""Alias for `.summarize()`"""
return self.summarize(*args, by = by, **kwargs)
def summarize(self, *args,
by = None,
**kwargs):
"""
Aggregate data with summary statistics
Parameters
----------
*args : Expr
Column expressions to add or modify
by : str, list
Columns to group by
**kwargs : Expr
Column expressions to add or modify
Returns
-------
tibble
A tibble with the summaries
Examples
--------
>>> df = tp.tibble({'a': range(3), 'b': range(3), 'c': ['a', 'a', 'b']})
>>> df.summarize(avg_a = tp.mean(col('a')))
>>> df.summarize(avg_a = tp.mean(col('a')),
... by = 'c')
>>> df.summarize(avg_a = tp.mean(col('a')),
... max_b = tp.max(col('b')))
"""
exprs = _as_list(args) + _kwargs_as_exprs(kwargs)
if _uses_by(by):
out = super(tibble, self).group_by(by).agg(exprs)
else:
out = super(tibble, self).select(exprs)
return out.pipe(from_polars)
def tail(self, n = 5, *, by = None):
"""Alias for `.slice_tail()`"""
return self.slice_tail(n, by = by)
def unite(self, col = "_united", unite_cols = [], sep = "_", remove = True):
"""
Unite multiple columns by pasting strings together
Parameters
----------
col : str
Name of the new column
unite_cols : list
List of columns to unite
sep : str
Separator to use between values
remove : bool
If True removes input columns from the data frame
Examples
--------
>>> df = tp.tibble(a = ["a", "a", "a"], b = ["b", "b", "b"], c = range(3))
>>> df.unite("united_col", unite_cols = ["a", "b"])
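>>> # custom separator
>>> df.unite("united_col", unite_cols = ["a", "b"], sep = "-")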
"""
# if len(unite_cols) == 0:
# unite_cols = self.names
# else:
# unite_cols = _col_exprs(unite_cols)
# unite_cols = self.to_polars().select(unite_cols).columns
out = self.mutate(**{col : str_c([pl.col(c) for c in unite_cols], sep = sep)})
out = out.relocate(col, before = unite_cols[0])
if remove:
out = out.drop(unite_cols)
return out
def group_by(self, group, *args, **kwargs):
"""
Takes an existing tibble and converts it into a grouped tibble
where operations are performed "by group". ungroup() happens
automatically after the operation is performed.
Parameters
----------
group : str, list
Variable names to group by.
Returns
-------
Grouped tibble
A tibble with values grouped by one or more columns.
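Examples
--------
>>> # grouped aggregation sketch; agg() mirrors polars' GroupBy.agg
>>> df = tp.tibble({'g': ['a', 'a', 'b'], 'x': range(3)})
>>> df.group_by('g').agg(n = pl.len())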
"""
res = TibbleGroupBy(self, group, maintain_order=True)
return res
def nest(self, by, *args, **kwargs):
"""
Creates a nested tibble
Parameters
----------
by : list, str
Columns to nest on
kwargs :
data : list of column names
columns to select to include in the nested data
If not provided, include all columns except the ones
used in 'by'
key : str
name of the resulting nested column.
names_sep : str
If not provided (default), the names in the nested
data will come from the former names. If a string,
the inner names in the nested data frames are built
from the former names combined with names_sep. This
makes names_sep roughly symmetric between nesting
and unnesting.
Returns
-------
tibble
The resulting tibble will have a column that contains
nested tibbles
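Examples
--------
>>> df = tp.tibble({'g': ['a', 'a', 'b'], 'x': range(3)})
>>> df.nest(by = 'g')
>>> # choose the nested columns and the nested column's name
>>> df.nest(by = 'g', data = ['x'], key = 'nested')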
"""
key = kwargs.get("key", 'data')
data = kwargs.get("data", [c for c in self.names if c not in by])
names_sep = kwargs.get("names_sep", None)
out = (self
.group_by(by)
.agg(**{
key : pl.struct(data).map_elements(
# lambda cols: from_polars( pl.DataFrame(cols.to_list()) ) )
lambda cols: from_polars(pl.DataFrame({'data':cols}).unnest('data')) )
# lambda cols: tibble(cols.to_list()) )
})
.pipe(from_polars)
)
# to keep enum order in the nested data
# enum_columns = [col for col in self.select(data).names
# if self.pull(col).dtype == pl.Enum]
# if enum_columns:
# for col in enum_columns:
# cats = self.pull(col).cat.get_categories().to_list()
# print(cats)
# out = out.mutate(**{key : map([key], lambda row:
# row[0].mutate(col = as_factor(col, cats) )
# }
# # to keep factors
# factors = [col for col in self.select(data).names
# if self.pull(col).dtype == pl.Categorical]
# if factors:
# for col in factors:
# out = out.mutate(**{col : as_factor(col)})
if names_sep is not None:
new_names = {c: f"{c}_{names_sep}" for c in data}
out = out.mutate(**{key: col(key).map_elements(lambda row: row.rename(new_names))})
return out
def unnest(self, col):
"""
Unnest a nested tibble
Parameters
----------
col : str
Columns to unnest
Returns
-------
tibble
The nested tibbles will be expanded into unnested
rows of the original tibble.
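Examples
--------
>>> # round trip: nest by 'g', then unnest the default 'data' column
>>> df = tp.tibble({'g': ['a', 'a', 'b'], 'x': range(3)})
>>> df.nest(by = 'g').unnest('data')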
"""
assert isinstance(col, str), "'col' must be a string"
# not run: error if nested df has different columns
# out = (self
# .mutate(**{
# col : pl.col(col).map_elements(lambda d: d.to_struct())
# })
# .to_polars()
# .explode(col)
# .unnest(col)
# )
# return out.pipe(from_polars)
out = tibble()
for row in self.to_polars().iter_rows(named=True):
if isinstance(row[col], tibble):
n = row[col].nrow
elif isinstance(row[col], list):
n = len(row[col])
ids = {c:v for c, v in row.items() if c != col}
cols = list(ids.keys())
df_ids = from_polars(pl.DataFrame(ids)
.with_columns(pl.col(cols) .repeat_by(n))
.explode(cols))
out = out.bind_rows(df_ids.bind_cols(row[col]))
out = self.__unnest_cast__(self, out)
return out
def __unnest_cast__(self, df_source, df_target):
# """
# Align the types of columns in df_target to match categorical and enum columns from df_source,
# preserving the original column order.
# Parameters:
# df_source: DataFrame containing categorical and enum columns.
# df_target: DataFrame whose column types need to be aligned.
# Returns:
# A new DataFrame with types aligned to match df_source for categorical and enum columns,
# preserving column order.
# """
df_source = df_source.to_polars()
df_target = df_target.to_polars()
cat_enum_cols = [
col for col, dtype in zip(df_source.columns, df_source.dtypes)
if dtype in [pl.Categorical, pl.Enum]
]
for col in cat_enum_cols:
if col in df_target.columns:
if df_source.schema[col] == pl.Categorical:
df_target = df_target.with_columns(pl.col(col).cast(pl.Categorical))
elif isinstance(df_source.schema[col], pl.Enum):
enum_dtype = df_source.schema[col]
df_target = df_target.with_columns(pl.col(col).cast(enum_dtype))
return from_polars(df_target.select(df_target.columns))
def unnest_longer(self, col_name, *, values_to = None, indices_to = None):
"""
Turn each element of a list- or struct-column into its own row.
For list columns, this behaves like ``DataFrame.explode``. For struct
columns, each row is expanded into one row per field, with the field
name going into ``indices_to`` and the field value into ``values_to``.
Parameters
----------
col_name : str
Name of the list or struct column to unnest.
values_to : str, optional
Name of the output value column. For list columns this renames
the exploded column. For struct columns this names the value
column; defaults to ``col_name``.
indices_to : str, optional
For struct columns, the name of the field-name column. Defaults
to ``f"{col_name}_id"``.
Examples
--------
>>> df = tp.tibble(id = [1, 2], vals = [[10, 20], [30]])
>>> df.unnest_longer('vals')
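>>> # struct column: field names go to 'pt_id', values to 'pt'
>>> df2 = tp.tibble(id = [1, 2], pt = [{'x': 1, 'y': 2}, {'x': 3, 'y': 4}])
>>> df2.unnest_longer('pt')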
"""
df = self.to_polars()
dtype = df.schema[col_name]
if isinstance(dtype, pl.List):
out = df.explode(col_name)
if values_to is not None:
out = out.rename({col_name: values_to})
return out.pipe(from_polars)
if isinstance(dtype, pl.Struct):
fields = [f.name for f in dtype.fields]
idx = indices_to or f'{col_name}_id'
val = values_to or col_name
other = [c for c in df.columns if c != col_name]
parts = []
for name in fields:
parts.append(
df.select(
*[pl.col(c) for c in other],
pl.lit(name).alias(idx),
pl.col(col_name).struct.field(name).alias(val),
)
)
out = pl.concat(parts, how = 'vertical_relaxed')
ordered = other + [idx, val]
return out.select(ordered).pipe(from_polars)
raise TypeError(
f"unnest_longer requires a List or Struct column, got {dtype}"
)
def unnest_wider(self, col_name, *, names_sep = None):
"""
Turn each element of a struct- or list-column into its own column.
Parameters
----------
col_name : str
Name of the column to unnest.
names_sep : str, optional
If provided, the output column names become
``f"{col_name}{names_sep}{field}"`` to avoid collisions.
Examples
--------
>>> df = tp.tibble(id = [1, 2], pt = [{'x': 1, 'y': 2}, {'x': 3, 'y': 4}])
>>> df.unnest_wider('pt')
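>>> # list column: elements become vals_1, vals_2, ...
>>> df2 = tp.tibble(id = [1, 2], vals = [[10, 20], [30]])
>>> df2.unnest_wider('vals')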
"""
df = self.to_polars()
dtype = df.schema[col_name]
if isinstance(dtype, pl.Struct):
field_names = [f.name for f in dtype.fields]
if names_sep is not None:
renamed = {f: f'{col_name}{names_sep}{f}' for f in field_names}
out = (df.with_columns(pl.col(col_name).struct.rename_fields(
[renamed[f] for f in field_names]))
.unnest(col_name))
else:
out = df.unnest(col_name)
return out.pipe(from_polars)
if isinstance(dtype, pl.List):
width = df.select(pl.col(col_name).list.len().max()).item() or 0
sep = names_sep if names_sep is not None else '_'
exprs = [
pl.col(col_name).list.get(i, null_on_oob = True)
.alias(f'{col_name}{sep}{i + 1}')
for i in range(width)
]
out = df.with_columns(exprs).drop(col_name)
return out.pipe(from_polars)
raise TypeError(
f"unnest_wider requires a List or Struct column, got {dtype}"
)
def hoist(self, col_name, *, remove = False, **fields):
"""
Pull named elements out of a list- or struct-column into top-level columns.
Parameters
----------
col_name : str
Name of the list or struct column to reach into.
remove : bool
If True, drop the original column after hoisting.
**fields : str, int, or list
Each keyword defines a new top-level column. The value is a
path into the list/struct column: a field name, an integer
list index, or a list of such steps for nested access.
Examples
--------
>>> df = tp.tibble(meta = [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}])
>>> df.hoist('meta', a = 'a')
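>>> # nested path: struct field 'tags', then list index 0 (sketch)
>>> df2 = tp.tibble(rec = [{'tags': ['x', 'y']}, {'tags': ['z']}])
>>> df2.hoist('rec', first_tag = ['tags', 0])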
"""
df = self.to_polars()
exprs = []
for new_name, path in fields.items():
expr = pl.col(col_name)
steps = path if isinstance(path, (list, tuple)) else [path]
for step in steps:
if isinstance(step, int):
expr = expr.list.get(step, null_on_oob = True)
else:
expr = expr.struct.field(step)
exprs.append(expr.alias(new_name))
out = df.with_columns(exprs)
if remove:
out = out.drop(col_name)
return out.pipe(from_polars)
def pack(self, **groups):
"""
Pack several columns into one or more struct columns.
Parameters
----------
**groups : list or str
Each keyword defines a new struct column. The value is a
list of existing column names to pack into that struct.
Examples
--------
>>> df = tp.tibble(x = [1, 2], y = [3, 4], z = ['a', 'b'])
>>> df.pack(position = ['x', 'y'])
"""
df = self.to_polars()
to_drop = []
for new_name, cols in groups.items():
cols = _as_list(cols)
df = df.with_columns(pl.struct(cols).alias(new_name))
to_drop.extend(c for c in cols if c not in groups)
df = df.drop(to_drop)
return df.pipe(from_polars)
def unpack(self, *cols):
"""
Unpack one or more struct columns into their component columns.
Parameters
----------
*cols : str
Names of the struct columns to unpack.
Examples
--------
>>> df = tp.tibble(id = [1, 2]).pack(pt = ['id']) # contrived
>>> df.unpack('pt')
"""
df = self.to_polars()
for c in cols:
df = df.unnest(c)
return df.pipe(from_polars)
def crossing(self, *args, **kwargs):
"""
Expands the existing tibble for each value of the
variables used in the `crossing()` argument. See Returns.
Parameters
----------
*args : list
One unnamed list is accepted.
**kwargs : list
Keyword will be the variable name, and the values in the list
will be in the expanded tibble
Returns
-------
tibble
A tibble with variables containing all combinations of the
values in the arguments passed to `crossing()`. The original
tibble is replicated for each unique combination.
Examples
--------
>>> df = tp.tibble({'a': [1, 2], "b": [3, 5]})
>>> df
shape: (2, 2)
┌─────┬─────┐
│ a   ┆ b   │
│ i64 ┆ i64 │
╞═════╪═════╡
│ 1   ┆ 3   │
│ 2   ┆ 5   │
└─────┴─────┘
>>> df.crossing(c = ['a', 'b', 'c'])
shape: (6, 3)
┌─────┬─────┬─────┐
│ a   ┆ b   ┆ c   │
│ i64 ┆ i64 ┆ str │
╞═════╪═════╪═════╡
│ 1   ┆ 3   ┆ a   │
│ 1   ┆ 3   ┆ b   │
│ 1   ┆ 3   ┆ c   │
│ 2   ┆ 5   ┆ a   │
│ 2   ┆ 5   ┆ b   │
│ 2   ┆ 5   ┆ c   │
└─────┴─────┴─────┘
"""
out = self.mutate(*args, **kwargs).to_polars()
for var,_ in kwargs.items():
out = out.explode(var)
return out.pipe(from_polars)
def glimpse(self, regex='.'):
"""
Print compact information about the data
Parameters
----------
regex : str, list, dict
Return information of the variables that match the regular
expression, the list, or the dictionary. If dictionary is
used, the variable names must be the dictionary keys.
Returns
-------
None
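Examples
--------
>>> df = tp.tibble({'a': range(3), 'b': ['x', 'y', 'z']})
>>> df.glimpse()
>>> df.glimpse(regex = '^a')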
"""
assert isinstance(regex, str) or\
isinstance(regex, list) or\
isinstance(regex, dict), "regex must be a list, dict, or regular expression"
# if isinstance(regex, str):
# df = self.select(regex=regex)
# elif isinstance(regex, dict):
# df = self.select(names=list(regex.keys()))
# else:
# df = self.select(names=regex)
print(f"Columns matching pattern '{regex}':")
df = self.select(matches(regex)).to_pandas()
size_col=80
header_var = 'Var'
header_type = 'Type'
header_uniq = 'Uniq'
header_missing = 'Miss'
header_missing_perc = '(%)'
header_head = 'Head'
#
length_col = np.max([len(header_var)] +
[len(col) for col in df.columns])
length_type = np.max([len(header_type)] +
[len(col) for col in
df.dtypes.astype(str).values]) + 2
length_nvalues = np.max([len(header_uniq),
len(str(np.max(df
.apply(pd.unique)
.apply(len))))])
length_missing = np.max([len(header_missing)] +
df.isna().sum().astype(str).apply(len).tolist())
try:
length_missing_perc = np.max([len(header_missing_perc),
len((100*df.isna().sum()/df.shape[0])
.max().astype(int)
.astype(str))+2]
)
except Exception:
length_missing_perc = 3
length_head = size_col - (length_col + length_type + length_nvalues + length_missing )
#
header = (f"{header_var:>{length_col}s} "+
f"{header_type:{length_type}s}"+
f"{header_uniq:>{length_nvalues}s} "+
f"{header_missing:>{length_missing}s} "+
f"{header_missing_perc:>{length_missing_perc}s} "+
f"{header_head:{length_head}s}")
print(header)
# print("-"*size_col)
for col in df.columns:
dtype = str(df[col].dtype)
nvalues = len(df[col].unique())
missings = df[col].isna().sum()
missings_perc = str(int(100*missings/self.nrow))+"%"
#
vals = str(df[col].values)
vals = vals[:length_head] + ('...' if len(vals) > length_head else '')
#
print(f"{col:>{length_col}.{length_col}s} "+
f"{'<'+dtype+'>':{length_type}.{length_type}s}"+
f"{nvalues:>{length_nvalues}d} "+
f"{missings:>{length_missing}d}"+
f"{missings_perc:>{length_missing_perc}s} "
f"{vals:.{length_head+3}s}")
# print(hline)
# print(header)
print('')
print(f"[Rows: {self.nrow}; Columns {self.ncol}]")
return None
def colnames(self, regex='.', type=None, include_factor=True):
"""
Return the names of the columns in `self` that match 'regex'
Parameters
----------
regex : str
Regular expression used to match column names.
type : str
Restrict the result to one type: 'numeric', 'integer',
'string', 'factor', or 'date'.
include_factor : bool
When type='string', whether to also include factor
(categorical/enum) columns.
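Examples
--------
>>> df.colnames(type = 'numeric')
>>> df.colnames(regex = '^a', type = 'string', include_factor = False)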
"""
cols = self.select(matches(regex)).names
if type:
if type=='numeric':
selector = pl.selectors.numeric()
if type=='integer':
selector = pl.selectors.integer()
if type=='string':
if include_factor:
selector = pl.selectors.string() | pl.selectors.categorical() | pl.selectors.enum()
else:
selector = pl.selectors.string()
if type=='factor':
selector = pl.selectors.categorical() | pl.selectors.enum()
if type=='date':
selector = pl.selectors.date()
cols_type = self.to_polars().select(selector).columns
cols = [c for c in cols_type if c in cols]
return cols
def pipe(self, fn, *args, **kwargs):
"""
Apply a function to the entire DataFrame
Parameters
----------
fn : callable
Function to apply. The tibble is passed as the first argument.
*args : any
Additional positional arguments passed to fn.
**kwargs : any
Additional keyword arguments passed to fn.
Returns
-------
any
Result of ``fn(self, *args, **kwargs)``.
Examples
--------
>>> def add_column(df, name, value):
... return df.mutate(**{name: value})
>>> df.pipe(add_column, 'new_col', 1)
"""
return fn(self, *args, **kwargs)
def transmute(self, *args, by = None, **kwargs):
"""
Add or modify columns, keeping only the new columns
Parameters
----------
*args : Expr
Column expressions to add or modify
by : str, list
Columns to group by
**kwargs : Expr
Column expressions to add or modify
Returns
-------
tibble
A tibble with only the newly created columns (and grouping columns if by is used).
Examples
--------
>>> df.transmute(double_a = col('a') * 2)
"""
exprs = _as_list(args) + _kwargs_as_exprs(kwargs)
col_names = []
for expr in exprs:
name = expr.meta.output_name()
col_names.append(name)
out = self.mutate(*args, by = by, **kwargs)
if _uses_by(by):
by_cols = [by] if isinstance(by, str) else list(by)
col_names = by_cols + [c for c in col_names if c not in by_cols]
return out.select(col_names)
def clean_names(self, case = 'snake'):
"""
Standardize column names
Parameters
----------
case : str
Case style for column names. Options: 'snake' (default), 'lower', 'upper'.
Returns
-------
tibble
A tibble with cleaned column names.
Examples
--------
>>> df = tp.tibble(**{"First Name": [1], "Last.Name": [2], "AGE (years)": [30]})
>>> df.clean_names()
"""
def _to_snake(name):
name = re.sub(r'[^\w\s]', '_', name)
name = re.sub(r'\s+', '_', name)
name = re.sub(r'([a-z])([A-Z])', r'\1_\2', name)
name = re.sub(r'_+', '_', name)
name = name.strip('_').lower()
return name
mapping = {}
for col_name in self.names:
if case == 'snake':
new_name = _to_snake(col_name)
elif case == 'lower':
new_name = col_name.lower()
elif case == 'upper':
new_name = col_name.upper()
else:
new_name = _to_snake(col_name)
mapping[col_name] = new_name
return self.rename(mapping)
def sample_n(self, n, seed = None, with_replacement = False):
"""
Randomly sample n rows
Parameters
----------
n : int
Number of rows to sample.
seed : int, optional
Random seed for reproducibility.
with_replacement : bool
Whether to sample with replacement.
Returns
-------
tibble
A tibble with n randomly sampled rows.
Examples
--------
>>> df.sample_n(5, seed = 42)
"""
return from_polars(super().sample(n = n, seed = seed, with_replacement = with_replacement))
def sample_frac(self, fraction, seed = None, with_replacement = False):
"""
Randomly sample a fraction of rows
Parameters
----------
fraction : float
Fraction of rows to sample (between 0 and 1).
seed : int, optional
Random seed for reproducibility.
with_replacement : bool
Whether to sample with replacement.
Returns
-------
tibble
A tibble with a random fraction of rows.
Examples
--------
>>> df.sample_frac(0.5, seed = 42)
"""
return from_polars(super().sample(fraction = fraction, seed = seed, with_replacement = with_replacement))
def complete(self, *cols, fill = None):
"""
Complete a DataFrame with all combinations of specified columns
Parameters
----------
*cols : str
Column names to find all combinations of.
fill : dict, optional
Dictionary of column names to fill values for missing combinations.
Returns
-------
tibble
A tibble with all combinations of the specified columns,
with missing values filled according to fill parameter.
Examples
--------
>>> df = tp.tibble(x = [1, 1, 2], y = ['a', 'b', 'a'], val = [10, 20, 30])
>>> df.complete('x', 'y')
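>>> # fill values for rows created by the completion (sketch)
>>> df.complete('x', 'y', fill = {'val': 0})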
"""
cols = list(cols)
unique_values = [self.pull(c).unique().sort() for c in cols]
# Build all combinations
from itertools import product as iterproduct
combos = list(iterproduct(*[v.to_list() for v in unique_values]))
all_combos = tibble(**{c: [row[i] for row in combos] for i, c in enumerate(cols)})
# Left join to get existing data
result = all_combos.left_join(self, on = cols)
# Fill missing values
if fill is not None:
fill_exprs = []
for col_name, value in fill.items():
if col_name in result.names:
fill_exprs.append(pl.col(col_name).fill_null(value).alias(col_name))
if fill_exprs:
result = result.mutate(*fill_exprs)
return result
[docs]
def describe(self):
"""
Generate summary statistics for all columns
Returns
-------
tibble
A tibble with summary statistics including column name, type,
count of non-null values, null count, unique count, and for
numeric columns: mean, std, min, p25, median, p75, max.
Examples
--------
>>> df.describe()
"""
stats_rows = []
for col_name in self.names:
col_series = self.to_polars().get_column(col_name)
dtype = col_series.dtype
row = {
'column': col_name,
'dtype': str(dtype),
'count': col_series.len(),
'null_count': col_series.null_count(),
'n_unique': col_series.n_unique(),
}
if dtype.is_numeric():
row['mean'] = col_series.mean()
row['std'] = col_series.std()
row['min'] = col_series.min()
row['p25'] = col_series.quantile(0.25)
row['median'] = col_series.quantile(0.5)
row['p75'] = col_series.quantile(0.75)
row['max'] = col_series.max()
else:
row['mean'] = None
row['std'] = None
row['min'] = None
row['p25'] = None
row['median'] = None
row['p75'] = None
row['max'] = None
stats_rows.append(row)
if not stats_rows:
return tibble()
return tibble(**{k: [r[k] for r in stats_rows] for k in stats_rows[0]})
[docs]
def replace_na(self, replace = None):
"""
Replace null values in specified columns
Parameters
----------
replace : dict
Dictionary mapping column names to replacement values.
Returns
-------
tibble
A tibble with nulls replaced.
Examples
--------
>>> df.replace_na({'x': 0, 'y': 'missing'})
"""
if replace is None:
return self
exprs = [pl.col(col_name).fill_null(value).alias(col_name)
for col_name, value in replace.items()
if col_name in self.names]
if not exprs:
return self
return self.mutate(*exprs)
[docs]
def get_dupes(self, *cols):
"""
Find duplicate rows
Parameters
----------
*cols : str
Column names to check for duplicates. If empty, checks all columns.
Returns
-------
tibble
A tibble containing duplicate rows with a 'dupe_count' column.
Examples
--------
>>> df.get_dupes('x', 'y')
"""
if len(cols) == 0:
check_cols = self.names
else:
check_cols = list(cols)
counts = (self.to_polars()
.group_by(check_cols)
.agg(pl.len().alias('dupe_count'))
.filter(pl.col('dupe_count') > 1))
return from_polars(
self.to_polars()
.join(counts, on=check_cols, how='inner')
).arrange('dupe_count', *check_cols)
[docs]
def assert_no_nulls(self, *cols):
"""
Assert that specified columns contain no null values
Parameters
----------
*cols : str
Column names to check. If empty, checks all columns.
Returns
-------
tibble
Returns self if assertion passes.
Raises
------
AssertionError
If any null values are found.
Examples
--------
>>> df.assert_no_nulls('x', 'y')
"""
check_cols = list(cols) if len(cols) > 0 else self.names
for col_name in check_cols:
null_count = self.to_polars().get_column(col_name).null_count()
if null_count > 0:
raise AssertionError(f"Column '{col_name}' has {null_count} null values")
return self
[docs]
def assert_unique(self, *cols):
"""
Assert that specified columns have unique combinations
Parameters
----------
*cols : str
Column names to check. If empty, checks all columns.
Returns
-------
tibble
Returns self if assertion passes.
Raises
------
AssertionError
If duplicate combinations are found.
Examples
--------
>>> df.assert_unique('id')
"""
check_cols = list(cols) if len(cols) > 0 else self.names
n_rows = self.nrow
n_unique = self.select(check_cols).distinct().nrow
if n_unique < n_rows:
n_dupes = n_rows - n_unique
raise AssertionError(f"Found {n_dupes} duplicate rows across columns {check_cols}")
return self
[docs]
def to_markdown(self):
"""
Render the tibble as a Markdown table string
Returns
-------
str
A Markdown-formatted table string.
Examples
--------
>>> print(df.to_markdown())
"""
headers = self.names
header_row = '| ' + ' | '.join(headers) + ' |'
sep_row = '| ' + ' | '.join(['---'] * len(headers)) + ' |'
rows = []
df_pl = self.to_polars()
for row in df_pl.iter_rows():
row_vals = [str(v) for v in row]
rows.append('| ' + ' | '.join(row_vals) + ' |')
return '\n'.join([header_row, sep_row] + rows)
# Not tidyverse verbs, but useful methods from pandas/polars
# -------------------------------------------------
[docs]
def replace(self, rep, regex=False):
"""
Replace values of one or more columns, using either polars' or
pandas' replace machinery.
Parameters
----------
rep : dict
Format to use polars' replace:
{<varname>:{<old value>:<new value>, ...}}
Format to use pandas' replace:
{<old value>:<new value>, ...}
regex : bool
If True, replace using regular expressions. This forces the
pandas engine (pandas' replace()).
Returns
-------
tibble
Original tibble with column values replaced based on
``rep``.
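Examples
--------
>>> # data and column names are illustrative
>>> df = tp.tibble(x = ['a', 'b'], y = [1, 2])
>>> # polars engine: per-column mapping
>>> df.replace({'x': {'a': 'A'}})
>>> # pandas engine: flat mapping applied to all columns
>>> df.replace({'a': 'A'})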
"""
if regex or not all(isinstance(value, dict) for value in rep.values()):
engine = 'pandas'
else:
engine = 'polars'
if engine=='polars':
out = self.to_polars()
for var, var_map in rep.items():
try:
out = out.with_columns(**{var : pl.col(var).replace(var_map)})
except Exception:
out = out.with_columns(**{var : pl.col(var).replace_strict(var_map)})
out = out.pipe(from_polars)
else:
out = self.to_pandas()
out = out.replace(to_replace=rep, regex=regex)
out = out.pipe(from_pandas)
return out
[docs]
def print(self, n=1000, ncols=1000, str_length=1000, digits=2):
"""
Print the DataFrame
Parameters
----------
n : int, default=1000
Number of rows to print
ncols : int, default=1000
Number of columns to print
str_length : int, default=1000
Maximum length of the strings.
digits : int, default=2
Number of decimal digits to use when printing floats.
Returns
-------
None
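Examples
--------
>>> df.print(n = 20, digits = 3)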
"""
with pl.Config(set_tbl_rows=n,
set_tbl_cols=ncols,
float_precision=digits,
fmt_str_lengths=str_length):
print(self)
[docs]
def iterrows(self):
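"""
Iterate over the rows of the tibble
Yields
------
dict
A dict mapping column names to the values of each row.
Examples
--------
>>> for row in df.iterrows():
...     print(row)
"""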
yield from self.to_polars().iter_rows(named=True)
# Statistics
# ----------
[docs]
def descriptive_statistics(self, vars=None, groups=None,
include_categorical=True,
include_type=False):
"""
Compute descriptive statistics for numerical variables and optionally
frequency statistics for categorical variables, with support for grouping.
Parameters
----------
vars : str, list, dict, or None, default None
The variables for which to compute statistics.
- If None, all variables in the dataset (as given by `self.names`) are used.
- If a string, it is interpreted as a single variable name.
- If a list, each element is treated as a variable name.
- If a dict, keys are variable names and values are their labels.
groups : str, list, dict, or None, default None
Variable(s) to group by when computing statistics.
- If None, overall statistics are computed.
- If a string, it is interpreted as a single grouping variable.
- If a list, each element is treated as a grouping variable.
- If a dict, keys are grouping variable names and values are their labels.
include_categorical : bool, default True
Whether to include frequency statistics for categorical variables in the output.
include_type : bool, default False
If True, adds a column indicating the variable type ("Num" for numerical, "Cat" for categorical).
Returns
-------
tibble
A tibble containing the descriptive statistics.
For numerical variables, the statistics include
N (count of non-missing values),
Missing (percentage of missing values),
Mean (average value),
Std.Dev. (standard deviation),
Min (minimum value), and
Max (maximum value).
If grouping is specified, these statistics are computed for each group.
When ``include_categorical`` is True, frequency statistics for
categorical variables are appended to the result.
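Examples
--------
>>> # hypothetical data for illustration
>>> df = tp.tibble(x = [1.0, 2.0, None, 4.0], g = ['a', 'a', 'b', 'b'])
>>> df.descriptive_statistics()
>>> df.descriptive_statistics(vars = 'x', groups = 'g')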
"""
assert isinstance(vars, str) or isinstance(vars, list) or \
isinstance(vars, dict) or vars is None, \
"'vars' must be a string, dict, or list"
assert isinstance(groups, str) or isinstance(groups, list) or \
isinstance(groups, dict) or groups is None, \
"'groups' must be a string, dict, or list"
if vars is None:
vars = {v:v for v in self.names}
elif isinstance(vars, str):
vars = {vars:vars}
elif isinstance(vars, list):
vars = {v:v for v in vars}
if isinstance(groups, str):
groups = {groups:groups}
elif isinstance(groups, list):
groups = {g:g for g in groups}
if isinstance(groups, dict):
for g in groups.keys():
vars.pop(g, None)
# select only numerical
vars_num = {var:label for var, label in vars.items() if
self.to_polars().schema[var].is_numeric()}
# select only categorical
vars_cat = {var:label for var, label in vars.items() if
not self.to_polars().schema[var].is_numeric()}
# compute statistics for numerical variables
if groups is None:
res = self.__descriptive_statistics__(self, vars_num)
else:
res = (self
.select(vars_num | groups)
.nest(list(groups.values()))
.mutate(summary = map(['data'], lambda col:
self.__descriptive_statistics__(col[0],
vars=vars_num)))
.drop('data')
.unnest('summary')
)
n = self.nrow
res = (res
.mutate(null_count = 100*pl.col("null_count")/n,
count = as_integer('count'))
.rename({"count":'N',
'null_count':'Missing (%)',
"mean":"Mean",
'std':'Std.Dev.',
'min':"Min",
'max':'Max'
})
)
if include_type:
res = res.mutate(Type='Num')
# compute statistics for categorical variables
if vars_cat and include_categorical:
res_cat = tibble()
for var_cat, label in vars_cat.items():
res_cat = res_cat.bind_rows(
self
.freq({var_cat:label}, groups=groups)
.drop('low', 'high')
.rename({'Freq':"Mean",
label:'Variable'})
.mutate(Variable = label + " ("+pl.col("Variable")+")")
.replace_null({'Variable': label + " (Missing)"})
)
if include_type:
res_cat = res_cat.mutate(Type='Cat')
res = res.bind_rows(res_cat)
res = res.arrange('Variable')
return res
def __descriptive_statistics__(self, data, vars=None):
res = (data
.select(vars)
.to_polars()
.describe()
.pipe(from_polars)
.pivot_longer(cols=list(vars.values()), names_to='Variable', values_to='value')
.pivot_wider(names_from='statistic', values_from='value')
)
return res
[docs]
def freq(self, vars=None, groups=None, na_rm=False, na_label=None):
"""
Compute frequency table.
Parameters
----------
vars : str, list, or dict
Variables to return value frequencies for.
If a dict is provided, keys should be the variable names
and values the variable labels to use in the output.
groups : str, list, dict, or None, optional
Variable names to condition marginal frequencies on.
If a dict is provided, keys should be the variable names
and values the variable labels to use in the output.
Defaults to None (no grouping).
na_rm : bool, optional
If True, drop rows with missing values before computing the
frequencies. Defaults to False.
na_label : str
Label to use for the NA values
Returns
-------
tibble
A tibble with relative frequencies and counts.
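Examples
--------
>>> # hypothetical data for illustration
>>> df = tp.tibble(g = ['a', 'a', 'b', 'b'], x = [1, 1, 2, 2])
>>> df.freq('x')
>>> df.freq('x', groups = 'g')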
"""
assert vars, "Parameter 'vars' must be provided."
assert isinstance(groups, str) or \
isinstance(groups, list) or\
isinstance(groups, dict) or\
groups is None, "Incorrect 'groups' argument format. See documentation."
vars_all = []
if groups is None:
groups = {}
elif isinstance(groups, str):
groups = {groups:groups}
elif isinstance(groups, list):
groups = {g:g for g in groups}
vars_all += list(groups.keys())
if vars is None:
vars = {v:v for v in self.names}
elif isinstance(vars, str):
vars = {vars:vars}
elif isinstance(vars, list):
vars = {v:v for v in vars}
vars_all += list(vars.keys())
res=self.select(vars_all)
if not na_rm:
if na_label is not None:
res=res.replace_null({var:na_label for var in vars})
else:
res=res.drop_null()
if not groups:
res=(res
.group_by(vars_all)
.summarize(n = n())
.mutate(
p = pl.col("n")/pl.col("n").sum(),
freq = 100*pl.col("p"),
stdev = 100*np.sqrt((pl.col('p')*(1-pl.col('p')))/pl.col('n'))
)
)
else:
res = (res
.group_by(vars_all)
.summarize(n = n())
.group_by(list(groups.keys()))
.mutate(
p = pl.col("n")/pl.col("n").sum(),
freq = 100*pl.col("p"),
stdev = 100*np.sqrt((pl.col('p')*(1-pl.col('p')))/pl.col('n'))
)
)
res = (
res
.drop('p')
.mutate(n = as_integer('n'),
low = pl.col('freq')-1.96*pl.col('stdev'),
high = pl.col('freq')+1.96*pl.col('stdev'))
.rename({'n':'N',
'stdev':'Std.Dev.',
'freq':'Freq'}, tolower=False)
.arrange(list(vars.keys()))
)
res = res.rename(vars | groups)
return res
[docs]
def tab(self, row, col, groups=None,
margins=True, normalize='all',
margins_name='Total', stat='both',
na_rm=True, na_label='NA', digits=2):
"""
Create a two-way contingency table for two categorical variables, with optional grouping,
margins, and normalization.
Parameters
----------
row : str
Name of the variable to be used for the rows of the table.
col : str
Name of the variable to be used for the columns of the table.
groups : str or list of str, optional
Variable name(s) to use as grouping variables. When provided, a separate table
is generated for each group.
margins : bool, default True
If True, include row and column totals (margins) in the table.
normalize : {'all', 'row', 'columns'}, default 'all'
Specifies how to compute the marginal percentages in each cell:
- 'all': percentages computed over the entire table.
- 'row': percentages computed across each row.
- 'columns': percentages computed down each column.
margins_name : str, default 'Total'
Name to assign to the row and column totals.
stat : {'both', 'perc', 'n'}, default 'both'
Determines the statistic to display in each cell:
- 'both': returns both percentages and sample size.
- 'perc': returns percentages only.
- 'n': returns sample size only.
na_rm : bool, default True
If True, remove rows with missing values in the `row` or `col` variables.
na_label : str, default 'NA'
Label to use for missing values when `na_rm` is False.
digits : int, default 2
Number of digits to round the percentages to.
Returns
-------
tibble
A contingency table as a tibble. The table contains counts and/or percentages as specified
by the `stat` parameter, includes margins if requested, and is formatted with group headers
when grouping variables are provided.
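Examples
--------
>>> # hypothetical data for illustration
>>> df = tp.tibble(sex = ['m', 'f', 'f', 'm'], vote = ['y', 'n', 'y', 'y'])
>>> df.tab('sex', 'vote')
>>> df.tab('sex', 'vote', stat = 'perc', normalize = 'row')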
"""
tab = self.select(row, col, groups).mutate(**{row:as_character(row),
col:as_character(col)})
vars_row = row
vars_col = col
if na_rm:
tab = tab.drop_null()
else:
repl = {var:na_label for var in [row, col]}
tab = tab.replace_null(repl)
tab = tab.to_pandas()
if groups:
groups = [groups] if isinstance(groups, str) else groups
ngroups=len(groups)
resn = self.__tab_groups__(tab, vars_row, vars_col, normalize=False,
margins=margins, margins_name=margins_name,
groups=groups)
resp = self.__tab_groups__(tab, vars_row, vars_col, normalize,
margins, margins_name, groups)
else:
ngroups=0
resn = self.__tab__(tab, vars_row, vars_col, normalize=False,
margins=margins, margins_name=margins_name)
resp = self.__tab__(tab, vars_row, vars_col, normalize=normalize,
margins=margins, margins_name=margins_name)
colsn=resn.columns[ngroups+1:]
colsp=resp.columns[ngroups+1:]
res=resp.iloc[:,0:ngroups+1]
if stat=='both':
for coln, colp in zip(colsn, colsp):
cells = [f"{round(100*p, digits)} % ({n})" for p, n
in zip(resp[colp], resn[coln])]
res = res.assign(**{coln:cells})
elif stat=='perc':
for colp in colsp:
res = res.assign(**{str(colp):100*resp[colp]})
else:
# stat=='n': report raw counts, not percentages
for coln in colsn:
res = res.assign(**{str(coln):resn[coln]})
# Group columns using varname as label
ncat = len(tab[vars_col].unique())
ngroups = 0 if not groups else len(groups)
col_groups = ['']*(ngroups+1) + [vars_col]*ncat+['']
col_ix = pd.MultiIndex.from_arrays([col_groups, res.columns])
res.columns = col_ix
res.columns.names = ['', '']
res.columns.name = ''
res.columns = [col[1] for col in res.columns]
res = self.__tab_reorder_na__(res, row, na_label)
return from_pandas(res)
def __tab__(self, tab, row, col, normalize='all', margins=True, margins_name='Total'):
if normalize=='row':
normalize='index'
if normalize=='column' or normalize=='col':
normalize='columns'
res = pd.crosstab(index=[tab[row]],
columns=[tab[col]],
margins=margins, margins_name=margins_name,
normalize=normalize)
res = res.reset_index(drop=False)
return res
def __tab_groups__(self, tab, vars_row, vars_col, normalize,
margins, margins_name, groups):
res = (tab
.groupby(groups)
.apply(self.__tab__,
vars_row, vars_col, normalize, margins, margins_name)
.reset_index(drop=False)
)
cols = [col for cidx, col in enumerate(list(res.columns) ) if
not bool(re.search(pattern='^level_[0-9]$', string=col))]
res=res.filter(cols)
return res
def __tab_reorder_na__(self, tab, row, na_label):
tab = from_pandas(tab).to_polars()
# Check if "Total" column exists and place "AB" before it
if na_label in tab.columns:
if "Total" in tab.columns:
total_index = tab.columns.index("Total")
columns = tab.columns[:total_index] + [na_label] + tab.columns[total_index:]
if na_label in tab.columns:
columns.remove(na_label) # avoid duplicating the NA-label column
tab = tab.select(columns)
# Check if "Total" row exists and move "ABC" before it
if na_label in tab[row]:
na_row = tab.filter(tab[row] == na_label)
non_na_rows = tab.filter(tab[row] != na_label)
if "Total" in tab[row].to_list():
total_row_index = non_na_rows[row].to_list().index("Total")
before_total_rows = non_na_rows[:total_row_index]
after_total_rows = non_na_rows[total_row_index:]
tab = pl.concat([before_total_rows, na_row, after_total_rows], how="vertical")
else:
tab = pl.concat([non_na_rows, na_row], how="vertical")
return tab.to_pandas()
# Exporting table
# ---------------
[docs]
def save_data(self, fn, copies=None, sep=';', kws_latex=None, *args, **kws):
"""
Save data based on the filename.
Parameters
----------
fn : str
Path and filename
copies : list of str
List of file extensions. A copy of the file is saved for each
extension, using the same filename and path as ``fn``.
sep : str, optional
Set the column separator to export to text-like files (.csv,
.tsv, .txt, etc.)
kws_latex : dict
Arguments of to_latex(). See tibble.to_latex()
Notes
-----
Additional positional and keyword arguments are passed to the
underlying method used to save the file, which is based on the
file extension.
* .tex => tidypolars_extra.tibble.to_latex
* .csv => polars.write_csv (uses sep=';' as default)
* .tsv => polars.write_csv (uses sep='\t' as default)
* .dat => polars.write_csv (uses sep=' ' as default)
* .txt => polars.write_csv (uses sep=' ' as default)
* .xls => polars.write_excel
* .xlsx => polars.write_excel
* .dta => pandas.DataFrame.to_stata
* .parquet => polars.write_parquet
Use silently=True to save quietly (Default False).
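Examples
--------
>>> # paths are illustrative
>>> df.save_data('table.csv')
>>> df.save_data('table.tex', copies = ['csv', 'xlsx'])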
"""
assert fn, "Filename (fn) must be provided."
silently = kws.get("silently", False)
fn = _expand_to_full_path_or_url(fn)
fn_base, ext = os.path.splitext(fn)
folder = os.path.dirname(fn)
assert ext, "File extension (.csv, .tex, .xlsx, etc.) must be provided."
copies = copies or kws.get("copies", [])
copies = copies if isinstance(copies, list) else [copies]
ext = ext.replace('.', '')
copies = [c.replace('.', '') for c in copies]
ext_to_save = set([ext] + copies)
kws['fn'] = fn_base
AVAILABLE_FORMATS = __get_accepted_output_formats__(_print=False)
for ext in ext_to_save:
kws['ext'] = ext
if ext in AVAILABLE_FORMATS['csv-like']:
self.to_csv(*args, **kws)
if ext in AVAILABLE_FORMATS['excel-like']:
self.to_excel(*args, **kws)
if ext in AVAILABLE_FORMATS['Stata files']:
self.to_dta(*args, **kws)
if ext in AVAILABLE_FORMATS['parquet']:
self.to_parquet(*args, **kws)
if ext in AVAILABLE_FORMATS['latex']:
kws_latex = kws_latex or {}
fn = f"{fn_base}.{ext}"
print(f"Saving {os.path.basename(fn)}...", end='') if not silently else None
self.to_latex(fn=fn, *args, **kws_latex)
print('done!') if not silently else None
home_dir = os.path.expanduser("~")
path_display = "~" + folder.replace(home_dir, '')
print(f"Save at: {path_display}") if not silently else None
[docs]
def to_excel(self, *args, **kws):
"""
Save tibble to excel.
Details
-------
See polars `write_excel()` for details.
Returns
-------
None
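Examples
--------
>>> # typically called via save_data(); fn (without extension) and ext
>>> # are passed as keywords
>>> df.to_excel(fn = 'table', ext = 'xlsx')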
"""
writer = self.to_polars().write_excel
kws_reader = _filter_kwargs_for(writer, kws)
silently = kws.get("silently", False)
ext = kws['ext']
fn = f"{kws['fn']}.{ext}"
fn_base = os.path.basename(fn)
print(f'Saving {fn_base}...', end='') if not silently else None
kws_reader['workbook'] = fn
writer(*args, **kws_reader)
print('done!') if not silently else None
[docs]
def to_csv(self, *args, **kws):
"""
Save tibble to csv.
Details
-------
See polars `write_csv()` for details.
Returns
-------
None
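Examples
--------
>>> # typically called via save_data(); fn (without extension) and ext
>>> # are passed as keywords
>>> df.to_csv(fn = 'table', ext = 'csv')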
"""
writer = self.to_polars().write_csv
kws_reader = _filter_kwargs_for(writer, kws)
silently = kws.get("silently", False)
ext = kws['ext']
fn = f"{kws['fn']}.{ext}"
fn_base = os.path.basename(fn)
print(f'Saving {fn_base}...', end='') if not silently else None
kws_reader['file'] = fn
writer(*args, **kws_reader)
print('done!') if not silently else None
[docs]
def to_latex(self,
fn = None,
header = None,
digits = 4,
caption = None,
label = None,
align = None,
na_rep = '',
position = '!htb',
group_rows_by = None,
group_title_align = 'l',
footnotes = None,
footnotes_width = '\\linewidth',
index = False,
escape = False,
longtable = False,
longtable_singlespace = True,
rotate = False,
scale = True,
parse_linebreaks=True,
tabular = False,
*args, **kws
):
"""
Convert the object to a LaTeX tabular representation.
Parameters
----------
fn : str
Path with filename
header : list of tuples, optional
The column headers for the LaTeX table. Each tuple corresponds to a column.
Example creating upper level header with grouped columns::
[("", "col 1"),
("Group A", "col 2"),
("Group A", "col 3"),
("Group B", "col 4"),
("Group B", "col 5"),
]
Example creating two upper level headers with grouped columns::
[("Group 1", "" , "col 1"),
("Group 1", "Group A", "col 2"),
("Group 1", "Group A", "col 3"),
("" , "Group B", "col 4"),
("" , "Group B", "col 5"),
]
digits : int, default=4
Number of decimal places to round the numerical values in the table.
caption : str, optional
The caption for the LaTeX table.
label : str, optional
The label for referencing the table in LaTeX.
align : str, optional
Column alignment specifications (e.g., 'lcr').
na_rep : str, default=''
The representation for NaN values in the table.
position : str, default='!htb'
The placement option for the table in the LaTeX document.
footnotes : dict, optional
A dictionary where keys are column alignments ('c', 'r', or 'l')
and values are the respective footnote strings.
footnotes_width : str, None
Width of the footnote. Example: '\\linewidth', '40pt'
If None, impose no restriction to the width
group_rows_by : str, default=None
Name of the variable in the data with values to group
the rows by.
group_title_align : str, default='l'
Alignment of the title of each row group.
index : bool, default=False
Whether to include the index in the LaTeX table.
escape : bool, default=False
Whether to escape LaTeX special characters.
longtable : bool, default=False
If True, table spans multiple pages
longtable_singlespace : bool
Force single space to longtables
rotate : bool
Whether to use landscape table
scale : bool, default=True
If True, scales the table to fit the linewidth when
the table exceeds that size.
Ignored when ``longtable=True`` (LaTeX limitation because
longtable does not use tabular).
parse_linebreaks : bool, default=True
If True, parse \\n and replace it with \\makecell
to produce linebreaks
tabular : bool, default=False
Whether to use a tabular format for the output.
Returns
-------
str
A LaTeX formatted string of the tibble.
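Examples
--------
>>> # returns the LaTeX string when fn is not provided
>>> latex_str = df.to_latex(caption = 'Summary', label = 'tab:summary')
>>> # writes the table to a file (and returns None) when fn is given
>>> df.to_latex(fn = 'table.tex', caption = 'Summary')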
"""
NEW_LINE_MARKER = '__new_lines__'
assert footnotes is None or isinstance(footnotes, dict),\
"'footnote' must be a dictionary"
# temporarily replace \n inside table cells with a marker (restored below)
char_cols = self.to_polars().select(~cs.numeric()).columns
if char_cols:
self = self.mutate(across(char_cols, lambda col: str_replace_all(col, '\n', NEW_LINE_MARKER)))
# this must be the first operation
if group_rows_by is not None:
self = self.arrange(group_rows_by)
tabm = self.to_pandas().drop([group_rows_by], axis=1)
else:
tabm = self.to_pandas()
ncols = tabm.shape[1]
if tabular and not longtable:
position=None
if align is None:
align = 'l'*ncols
if header is not None:
tabm.columns = pd.MultiIndex.from_tuples(header)
tabl = (tabm
# .round(digits)
# .astype(str)
.to_latex(index = index,
escape = escape,
caption = caption,
label = label,
sparsify = True,
multirow = True,
multicolumn = True,
multicolumn_format = 'c',
column_format = align,
bold_rows = True,
na_rep = na_rep,
float_format=f"%.{digits}f",
position = position
))
rows = tabl.splitlines()
# put \n back .... (see above)
rows = [r.replace(NEW_LINE_MARKER, '\n') for r in rows]
if group_rows_by is not None:
rows = self.__to_latex_group_rows__(group_rows_by, group_title_align, ncols, rows)
# add centering
row = [i for i, txt in enumerate(rows) if
bool(re.search(pattern='begin.*tabular', string=txt))][0]
rows.insert(row,"\\centering")
footnotes_formated = ""
if footnotes is not None:
for align_note, footnote in footnotes.items():
footnote = [footnote] if isinstance(footnote, str) else footnote
for fni in footnote:
if footnotes_width is None:
notes = f"\\multicolumn{{{ncols}}}{{{align_note}}}{{{fni}}}\\\\"
else:
# notes = (f"\\multicolumn{{{ncols}}}{{{align_note}}}{{\\begin{{minipage}}[t]"+
# f"{{{footnotes_width}}}{fni}\\end{{minipage}}}}\\\\")
notes = (f"\\multicolumn{{{ncols}}}{{@{{}}p{{\\dimexpr {footnotes_width}\\relax}}@{{}}}}"+
f"{{\\footnotesize {fni}}}\\\\")
footnotes_formated += notes
if not longtable:
row = [idx for idx, s in enumerate(rows) if 'bottomrule' in s ][0]
rows.insert(row + 1, notes)
# rejoin table
tabl = "\n".join(rows)
# add midrules
if header is not None:
tabl = self.__to_latex_add_midrules_to_table__(tabl)
if longtable:
tabl = self.__to_latex_multipage__(tabl, caption, ncols, align,
label, position,
footnotes_formated,
longtable_singlespace)
if rotate:
tabl = re.sub(pattern="^", repl='\\\\begin{landscape}', string=tabl)
tabl = re.sub(pattern="$", repl='\\\\end{landscape}', string=tabl)
if scale and not longtable:
box = '\\resizebox{\\ifdim\\width>\\linewidth\\linewidth\\else\\width\\fi}{!}{'
tabl = tabl.replace('\\begin{tabular}', f"{box}\n\\begin{{tabular}}")
tabl = tabl.replace('\\end{tabular}', "\\end{tabular}}")
# linebreaks:
if parse_linebreaks:
tabl = self.__to_latex_breaklines__(tabl, longtable)
if fn:
with open(fn, 'w') as f:
f.write(tabl)
tabl = None
return tabl
[docs]
def to_dta(self, *args, **kws):
"""
Save tibble to dta.
Details
-------
See pandas `to_stata()` for details.
Returns
-------
None
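Examples
--------
>>> # typically called via save_data(); fn (without extension) and ext
>>> # are passed as keywords
>>> df.to_dta(fn = 'table', ext = 'dta')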
"""
writer = self.to_pandas().to_stata
kws_reader = _filter_kwargs_for(writer, kws)
silently = kws.get("silently", False)
ext = kws['ext']
fn = f"{kws['fn']}.{ext}"
fn_base = os.path.basename(fn)
print(f'Saving {fn_base}...', end='') if not silently else None
kws_reader['path'] = fn
writer(*args, **kws_reader)
print('done!') if not silently else None
[docs]
def to_parquet(self,
file = None,
compression = 'snappy',
use_pyarrow = False,
silently = False,
*args,
**kws):
"""Write the data frame to a parquet file"""
writer = super().write_parquet
kws_reader = _filter_kwargs_for(writer, kws)
# use the explicit file path when given; otherwise build it from
# the fn/ext keywords passed by save_data()
fn = file if file is not None else f"{kws['fn']}.{kws['ext']}"
fn_base = os.path.basename(fn)
print(f'Saving {fn_base}...', end='') if not silently else None
writer(file=fn, compression = compression, use_pyarrow = use_pyarrow, *args, **kws_reader)
print('done!') if not silently else None
# Reporting (to_latex)
# ---------
def __to_latex_process_header_line_for_cmid__(self, line: str) -> str:
# Given a header line (without the trailing newline),
# parse for multicolumn commands and generate a line of cmidrule(s)
# based on the non-empty group labels.
# Example:
# Input line: r"\\multicolumn{3}{c}{Combine} & \\multicolumn{3}{c}{} \\"
# Output: r"\\cmidrule(lr){1-3} \\"
# Remove trailing "\\" if present
line_clean = line.strip()
if line_clean.endswith(r'\\'):
line_clean = line_clean[:-2].strip()
# Split the row into cells (assuming & is the column separator)
cells = [cell.strip() for cell in line_clean.split('&')]
col_counter = 0
midrules = []
# Regex to capture multicolumn: number of columns and content.
# This assumes a simple structure without nested braces.
multicolumn_pattern = re.compile(r'\\multicolumn\{(\d+)\}\{[^}]*\}\{([^}]*)\}')
for cell in cells:
m = multicolumn_pattern.search(cell)
if m:
span = int(m.group(1))
content = m.group(2).strip()
start = col_counter + 1
end = col_counter + span
# Only add a midrule if the cell’s content is not empty
if content:
midrules.append(r'\cmidrule(lr){' + f"{start}-{end}" + '}')
col_counter += span
else:
# A normal cell occupies one column.
col_counter += 1
if midrules:
# Join the midrule commands (separated by a space) and add the trailing \\.
return " ".join(midrules) #+ r' \\'
else:
return ""
def __to_latex_add_midrules_to_table__(self, latex_table: str) -> str:
# Given a LaTeX table (as a string) that uses booktabs commands,
# insert automatically generated cmidrule lines for header rows that
# contain multicolumn cells.
# Assumes that the header is contained between the \\toprule and the first \\midrule.
lines = latex_table.splitlines()
new_lines = []
in_header = False
header_lines = [] # temporarily hold header lines
for line in lines:
# When we hit \toprule, we start the header section.
if r'\toprule' in line:
in_header = True
new_lines.append(line)
# When we hit the first \midrule, process any stored header rows.
elif in_header and r'\midrule' in line:
# Process each header line: output the line and, if applicable, a cmidrule line.
for hline in header_lines:
new_lines.append(hline)
cmid_line = self.__to_latex_process_header_line_for_cmid__(hline)
if cmid_line:
new_lines.append(cmid_line)
# Now add the \midrule line and stop header processing.
new_lines.append(line)
in_header = False
header_lines = []
elif in_header:
# Collect header rows (these are the lines between \toprule and \midrule).
header_lines.append(line)
else:
# Outside the header section, just pass the line along.
new_lines.append(line)
return "\n".join(new_lines)
def __to_latex_multipage__(self, tabl, caption, ncols, align,
label, position, footnote,
longtable_singlespace):
header_old = self.__to_latex_extract_header__(tabl)
header_new = f"""
{header_old}
\\endfirsthead
\\caption[]{{ {caption} }}\\\\
\\multicolumn{{{ncols}}}{{l}}{{\\textit{{(continued)}}}}\\\\
\\toprule
{header_old}
\\midrule
\\endhead
\\bottomrule
{footnote}
\\multicolumn{{{ncols}}}{{r@{{}}}}{{\\textit{{(continued \\ldots)}}}}\\\\
\\endfoot
{footnote}
\\endlastfoot
% ----------------- BODY begins -----------------
"""
longtable_begin = f'\\begin{{longtable}}{{{align}}}\n\n\\caption[]{{{caption}}}\\\\'
longtable_end = '% ----------------- BODY ends -----------------\n\\end{longtable}'
if longtable_singlespace:
longtable_begin = '\\begin{spacing}{1}\n' + longtable_begin
longtable_end = longtable_end + "\n\\end{spacing}"
tabl = (tabl
.replace(f"\\begin{{table}}[{position}]", longtable_begin)
.replace("\\end{table}", longtable_end)
.replace(f"\\label{{{label}}}", f"\\label{{{label}}}\\\\")
.replace("\\centering", '')
.replace(f"\\begin{{tabular}}{{{align}}}", '')
.replace("\\end{tabular}", '')
.replace(header_old, header_new)
)
return tabl
def __to_latex_extract_header__(self, latex_table: str) -> str:
# Extract the header section from a LaTeX table.
# The header is defined as the text between the first occurrence of
# '\\toprule' and '\\midrule'. This function returns that section
# as a single string.
# Parameters:
# latex_table (str): The complete LaTeX table as a string.
# Returns:
# str: The header lines between '\\toprule' and '\\midrule', with
# surrounding whitespace removed.
# Use re.DOTALL so that '.' matches newline characters.
pattern = re.compile(r'\\toprule\s*(.*?)\s*\\midrule', re.DOTALL)
match = pattern.search(latex_table)
if match:
return match.group(1).strip()
else:
return ""
def __to_latex_group_rows__(self, group_rows_by, group_title_align, ncols, rows):
position_first_row = self.__to_latex_group_rows_starting_positions__(rows)
position_last_row = self.__to_latex_group_rows_ending_positions__(rows, position_first_row)
# get groups locations
groups = (self
.pull(group_rows_by)
.to_list())
groups_row_locations = {groups[0]: 0}
for i in range(1, len(groups)):
if groups[i] != groups[i-1]:
groups_row_locations[groups[i]] = i
# insert horizontal space on grouped rows
for i in range(position_first_row, position_last_row):
rows[i] = '\\hspace{1em}' + rows[i]
# insert groups heading rows
# print(f"First row position: {position_first_row}" )
# print(groups_row_locations)
# [print(f"{i+1} {r}") for i, r in enumerate(rows)]
for key, pos in sorted(groups_row_locations.items(),
key=lambda item: item[1], reverse=True):
group_title = f"\\addlinespace[0.3em]\\multicolumn{{{ncols}}}{{{group_title_align}}}{{ \\textbf{{{key}}} }}\\\\"
rows.insert(position_first_row + pos, group_title )
return rows
def __to_latex_group_rows_starting_positions__(self, rows):
# Given a list of LaTeX table rows, return the index of the first body
# row, i.e. the row right after the '\midrule' that follows the last
# '\toprule'. Returns None if either token is not found.
last_top_index = -1
res = None
# Iterate over rows to find the last index containing '\toprule'
for i, row in enumerate(rows):
if r'\toprule' in row:
last_top_index = i
if last_top_index == -1:
res = None
# Search for the first occurrence of '\midrule' after the last '\toprule'
for i in range(last_top_index + 1, len(rows)):
if r'\midrule' in rows[i]:
res = i+1 # the first body row comes right after '\midrule'
break
return res
def __to_latex_group_rows_ending_positions__(self, rows, position_first_row):
last_table_row_index = -1
for i, row in enumerate(rows[position_first_row:]):
if r'\bottomrule' in row:
last_table_row_index = position_first_row + i
break
return last_table_row_index
def __to_latex_breaklines__(self, table_str, longtable=False):
# Given a LaTeX table string containing a tabular environment,
# replace internal newline characters within table cells (i.e. those
# that occur within the cell content, not the row terminators) by
# LaTeX line breaks and wrap the cell text with \makecell{...}.
# Table rules such as \toprule, \midrule, and \bottomrule are left untouched.
# Parameters:
# table_str (str): A string containing a LaTeX table.
# Returns:
# str: The modified LaTeX table string.
def process_tabular(match):
# match.group(1): The \begin{tabular}{...} line
# match.group(2): The content inside the tabular environment
# match.group(3): The \end{tabular} line
begin_tabular = match.group(1)
content = match.group(2)
end_tabular = match.group(3)
# Split the content into parts while preserving the row separator.
# We assume each row ends with a double backslash (\\) followed by optional whitespace and a newline.
parts = re.split(r'(\\\\\s*\n|\\toprule\n|\\midrule\n|\\bottomrule\n)', content)
# Reassemble rows as tuples: (row_text, row_separator)
rows = []
for i in range(0, len(parts), 2):
row_text = parts[i]
separator = parts[i+1] if i+1 < len(parts) else ''
rows.append((row_text, separator))
processed_rows = []
for row_text, row_sep in rows:
# Skip processing for rows that are table rules.
if row_text.strip() in ('\\toprule', '\\midrule', '\\bottomrule') or\
bool(re.search(pattern="cmidrule", string=row_text)):
processed_rows.append(row_text + row_sep)
continue
# Split the row into cells using the ampersand (&) as the delimiter.
cells = row_text.split('&')
new_cells = []
for cell in cells:
# Remove only trailing whitespace from the cell.
cell_clean = cell.rstrip()
# Check if the cell contains an internal newline.
if '\n' in cell_clean:
# Remove any extra whitespace from the beginning and end.
cell_core = cell_clean.strip()
# Split the cell content by newline, strip each line, and join with LaTeX's line-break command.
cell_lines = cell_core.split('\n')
cell_with_breaks = r'\\'.join(line.strip() for line in cell_lines)
# Wrap the content with \makecell{...}
cell_processed = r'\makecell{' + cell_with_breaks + '}'
else:
cell_processed = cell
new_cells.append(cell_processed)
# Reassemble the row from its cells and append the preserved row separator.
new_row = " & ".join(new_cells)
processed_rows.append(new_row + row_sep)
# Reassemble the entire tabular content.
new_content = "".join(processed_rows)
return begin_tabular + new_content + end_tabular
# Process only the tabular environment in the table string.
if not longtable:
regex = r'(\\begin\{tabular\}\{[^}]*\})(.*?)(\\end\{tabular\})'
else:
regex = r'(. ----------------- BODY begins -----------------)(.*?)(. ----------------- BODY ends -----------------)'
new_table_str = re.sub(
regex,
process_tabular,
table_str,
flags=re.DOTALL
)
return new_table_str
# convert
# -------
[docs]
def to_dict(self, *, as_series = True):
"""
Convert the tibble to a dictionary
Parameters
----------
as_series : bool
If True - returns the dict values as Series
If False - returns the dict values as lists
Examples
--------
>>> df.to_dict()
>>> df.to_dict(as_series = False)
"""
return super().to_dict(as_series = as_series)
[docs]
def to_pandas(self):
"""
Convert to a pandas DataFrame
Examples
--------
>>> df.to_pandas()
"""
# keep order of factors (pl.Enum)
enum_columns = [col for col in self.names if self.pull(col).dtype == pl.Enum]
res = self.to_polars().to_pandas()
if enum_columns :
for col in enum_columns:
# Get unique categories in order of appearance
categories_in_order = self.pull(col).cat.get_categories().to_list()
# Convert the column to Categorical
res[col] = pd.Categorical(
res[col],
categories=categories_in_order,
ordered=True
)
return res
[docs]
def to_polars(self):
"""
Convert to a polars DataFrame
Examples
--------
>>> df.to_polars()
"""
self = copy.copy(self)
self.__class__ = pl.DataFrame
return self
[docs]
class TibbleGroupBy(pl.dataframe.group_by.GroupBy):
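"""
A grouped tibble. The verbs ``mutate``, ``filter``, and ``summarize``
are applied within each group defined by ``by``.
"""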
def __init__(self, df, by, *args, **kwargs):
assert isinstance(by, str) or isinstance(by, list), "Use list or string to group by."
super().__init__(df, by, *args, predicates=kwargs.pop('predicates', None), **kwargs)
self.by = by if isinstance(by, list) else [by]
@property
def _constructor(self):
return TibbleGroupBy
[docs]
def mutate(self, *args, **kwargs):
out = self.map_groups(lambda x: from_polars(x).mutate(*args, **kwargs))
return out
[docs]
def filter(self, *args, **kwargs):
out = self.map_groups(lambda x: from_polars(x).filter(*args, **kwargs))
return out
[docs]
def summarize(self, *args, **kwargs):
out = self.map_groups(lambda x: from_polars(x).summarise(by=self.by, *args, **kwargs))
return out
[docs]
def from_polars(df):
"""
Convert from polars DataFrame to tibble
Parameters
----------
df : DataFrame
pl.DataFrame to convert to a tibble
Returns
-------
tibble
Examples
--------
>>> tp.from_polars(df)
"""
# df = copy.copy(df)
# df.__class__ = tibble
df = tibble(df)
return df
[docs]
def from_pandas(df):
"""
Convert from pandas DataFrame to tibble
Parameters
----------
df : DataFrame
pd.DataFrame to convert to a tibble
Returns
-------
tibble
Examples
--------
>>> tp.from_pandas(df)
"""
if isinstance(df, pd.DataFrame):
try:
# Try to convert directly
df = from_polars(pl.from_pandas(df))
except Exception as e:
print(f"Error during conversion: {e}")
print("Identifying problematic columns...")
# Identify problematic columns by attempting individual conversions
problematic_columns = []
for column in df.columns:
try:
pl.from_pandas(df[[column]])
except Exception as col_error:
print(f"Column '{column}' caused an error: {col_error}")
problematic_columns.append(column)
# Convert problematic columns to string type
for column in problematic_columns:
df[column] = df[column].astype(str)
elif isinstance(df, tibble):
pass
elif isinstance(df, pl.DataFrame):
df = from_polars(df)
else:
df = None
return df
_allowed_methods = [
'dtypes', 'frame_equal',
'get_columns', 'lazy', 'pipe',
'iter_rows'
]
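# polars DataFrame methods hidden from tibble via __getattribute__;
# commented-out entries are reimplemented by tibble and must remain visible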
_polars_methods = [
'apply',
'columns',
'downsample',
'drop_duplicates',
'explode',
'fill_nan',
'fill_null',
'find_idx_by_name',
'fold',
'get_column',
'groupby',
'hash_rows',
'height',
'hstack',
'insert_at_idx',
'interpolate',
'is_duplicated',
'is_unique',
'join',
'limit',
'max',
'mean',
'median',
'melt',
'min',
'n_chunks',
'null_count',
'quantile',
'rechunk',
# 'replace',
'replace_at_idx',
'row',
'rows',
'sample',
'select_at_idx',
'shape',
'shift',
'shift_and_fill',
'shrink_to_fit',
'sort',
'std',
'sum',
# 'to_arrow',
# 'to_dict',
'to_dicts',
'to_dummies',
'to_ipc',
'to_json',
'to_numpy',
# 'to_pandas',
# 'to_parquet',
'transpose',
# 'unnest',
'unique',
'var',
'width',
'with_column',
'with_columns',
'with_column_renamed'
]
def __get_accepted_output_formats__(_print=False):
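"""
Return the file formats accepted by ``save_data``.
Parameters
----------
_print : bool, default False
If True, print the accepted extensions grouped by file type
and return None; otherwise return the dict of extensions.
Examples
--------
>>> tp.__get_accepted_output_formats__()
>>> tp.__get_accepted_output_formats__(_print=True)
"""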
ACCEPTED_FILES = {
'csv-like' : ['csv', 'CSV', 'tsv','TSV', 'dat', 'DAT', 'txt', 'TXT'],
'excel-like' : ['xls', 'xlsx', 'xlt', 'XLT', 'xltx', 'XLTX',
'ods', 'ODS', 'XLS', 'XLSX'],
'latex' : ['tex', 'TEX'],
'Stata files' : ['dta', 'DTA'],
'parquet' : ['parquet', 'PARQUET']
}
if _print:
res = None
for file_types, extensions in ACCEPTED_FILES.items():
exts = sorted(set([s.lower().replace(".", '') for s in extensions]))
print(f"- {file_types}: {', '.join(exts)}")
else:
res = ACCEPTED_FILES
return res