ENH: Add support for rolling apply on multiple columns or whole DataF… #58501

Closed
wants to merge 1 commit into from
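This patch adds an experimental `multi_column` flag to `Rolling.apply` so that the user function can see a whole DataFrame window instead of a single column at a time. A minimal sketch of how the proposed flag would be called (the column names and the lambda are illustrative; the option never shipped, so this shows the intended call shape rather than released API):

import pandas as pd

df = pd.DataFrame({"value": [1.0, 2.0, 3.0], "weight": [0.5, 0.25, 0.25]})
# Hypothetical call under this patch: each window is handed to ``func`` as a DataFrame.
df.rolling(window=2).apply(lambda w: (w["value"] * w["weight"]).sum(), raw=False, multi_column=True)

Note that in the diff below the multi-column branch hardcodes the 'value' and 'weight' columns and does not actually call the supplied function inside the window loop.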
210 changes: 116 additions & 94 deletions pandas/core/window/rolling.py
@@ -2,12 +2,14 @@
Provide a generic structure to support window functions,
similar to how we have a Groupby object.
"""

from __future__ import annotations

import copy
from datetime import timedelta
from functools import partial
import inspect
import pandas as pd
from textwrap import dedent
from typing import (
TYPE_CHECKING,
@@ -539,66 +541,66 @@ def _apply_pairwise(
return flex_binary_moment(target, other, func, pairwise=bool(pairwise))

def _apply(
self,
func: Callable[..., Any],
name: str,
numeric_only: bool = False,
numba_args: tuple[Any, ...] = (),
**kwargs,
):
"""
Rolling statistical measure using supplied function.

Designed to be used with passed-in Cython array-based functions.

Parameters
----------
func : callable function to apply
name : str,
numba_args : tuple
args to be passed when func is a numba func
**kwargs
additional arguments for rolling function and window function

Returns
-------
y : type of input
"""
window_indexer = self._get_window_indexer()
min_periods = (
self.min_periods
if self.min_periods is not None
else window_indexer.window_size
)

def homogeneous_func(values: np.ndarray):
# calculation function

if values.size == 0:
return values.copy()

def calc(x):
start, end = window_indexer.get_window_bounds(
num_values=len(x),
min_periods=min_periods,
center=self.center,
closed=self.closed,
step=self.step,
)
self._check_window_bounds(start, end, len(x))

return func(x, start, end, min_periods, *numba_args)
self,
func: Callable[..., Any],
name: str,
numeric_only: bool = False,
numba_args: tuple[Any, ...] = (),
multi_column: bool = False,
**kwargs,
):
window_indexer = self._get_window_indexer()
min_periods = (
self.min_periods
if self.min_periods is not None
else window_indexer.window_size
)

with np.errstate(all="ignore"):
result = calc(values)
if multi_column:
def multi_column_func(dataframe: pd.DataFrame):
results = []
for start in range(len(dataframe) - window_indexer.window_size + 1):
end = start + window_indexer.window_size
window_df = dataframe.iloc[start:end]
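# NOTE: the column names 'value' and 'weight' are hardcoded below, and the
# user-supplied ``func`` is never invoked in this branch; each window simply
# keeps the last element of value * weight (or NaN if min_periods is not met).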
if window_df[['value', 'weight']].notna().sum().min() >= min_periods:
result = window_df['value'] * window_df['weight']
results.append(result.iloc[-1])
else:
results.append(np.nan)

# Prepend NaNs for positions without a full window
for _ in range(window_indexer.window_size - 1):
results.insert(0, np.nan)
return pd.Series(results, index=dataframe.index)

return multi_column_func(self._selected_obj)
else:
def homogeneous_func(values: np.ndarray):
if values.size == 0:
return values.copy()

def calc(x):
start, end = window_indexer.get_window_bounds(
num_values=len(x),
min_periods=min_periods,
center=self.center,
closed=self.closed,
step=self.step,
)
print(f"Calculating on array with bounds: start={start}, end={end}") # Debug print
return func(x, start, end, min_periods, *numba_args)

with np.errstate(all="ignore"):
result = calc(values)

return result

if self.method == "single":
result = self._apply_columnwise(homogeneous_func, name, numeric_only, **kwargs)
else:
result = self._apply_tablewise(homogeneous_func, name, numeric_only, **kwargs)
return result

if self.method == "single":
return self._apply_columnwise(homogeneous_func, name, numeric_only)
else:
return self._apply_tablewise(homogeneous_func, name, numeric_only)

def _numba_apply(
self,
func: Callable[..., Any],
@@ -694,13 +696,15 @@ def _apply(
name: str,
numeric_only: bool = False,
numba_args: tuple[Any, ...] = (),
multi_column: bool = False,
**kwargs,
) -> DataFrame | Series:
result = super()._apply(
func,
name,
numeric_only,
numba_args,
multi_column=multi_column,
**kwargs,
)
# Reconstruct the resulting MultiIndex
@@ -872,8 +876,8 @@ class Window(BaseWindow):
If a timedelta, str, or offset, the time period of each window. Each
window will be a variable sized based on the observations included in
the time-period. This is only valid for datetimelike indexes.
To learn more about the offsets & frequency strings, please see `this link
<https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`__.
To learn more about the offsets & frequency strings, please see
:ref:`this link<timeseries.offset_aliases>`.

If a BaseIndexer subclass, the window boundaries
based on the defined ``get_window_bounds`` method. Additional rolling
@@ -924,13 +928,12 @@ class Window(BaseWindow):
Default ``None`` (``'right'``).

step : int, default None

.. versionadded:: 1.5.0

Evaluate the window at every ``step`` result, equivalent to slicing as
``[::step]``. ``window`` must be an integer. Using a step argument other
than None or 1 will produce a result with a different shape than the input.

.. versionadded:: 1.5.0

method : str {'single', 'table'}, default 'single'

.. versionadded:: 1.3.0
@@ -1177,47 +1180,62 @@ def _apply(
y : type of input
"""
# "None" not callable [misc]
window = self._scipy_weight_generator( # type: ignore[misc]
self.window, **kwargs
)
window = self._scipy_weight_generator(self.window, **kwargs)
offset = (len(window) - 1) // 2 if self.center else 0

def homogeneous_func(values: np.ndarray):
# calculation function

if values.size == 0:
return values.copy()
if multi_column:
# Multi-column logic
def multi_column_func(dataframe: pd.DataFrame):
results = []

for i in range(len(dataframe)):
window_df = dataframe.iloc[max(i - offset, 0): min(i + offset + 1, len(dataframe))]
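# NOTE: ``self.min_periods`` may still be ``None`` here (the integer fallback used
# in the single-column path is not applied), and ``func`` receives a DataFrame
# slice although the weighted kernels are otherwise called with a 1-D array.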
if window_df.dropna(how='all').shape[0] >= self.min_periods:
# Apply the function to the DataFrame slice
result = func(window_df, window)
results.append(result)
else:
# Append NaNs in a way that preserves DataFrame structure
results.append(pd.Series([np.nan] * len(window_df.columns), index=window_df.columns))

# Combine results into a DataFrame
return pd.concat(results, axis=1).reindex_like(dataframe)

return multi_column_func(self._selected_obj)
else:
# Original single-column functionality
def homogeneous_func(values: np.ndarray):
if values.size == 0:
return values.copy()

def calc(x):
additional_nans = np.array([np.nan] * offset)
x = np.concatenate((x, additional_nans))
return func(
x,
window,
self.min_periods if self.min_periods is not None else len(window),
)
def calc(x):
additional_nans = np.array([np.nan] * offset)
x = np.concatenate((x, additional_nans))
return func(
x,
window,
self.min_periods if self.min_periods is not None else len(window),
)

with np.errstate(all="ignore"):
# Our weighted aggregations return memoryviews
result = np.asarray(calc(values))
with np.errstate(all="ignore"):
# Our weighted aggregations return memoryviews
result = np.asarray(calc(values))

if self.center:
result = self._center_window(result, offset)
if self.center:
result = self._center_window(result, offset)

return result
return result

return self._apply_columnwise(homogeneous_func, name, numeric_only)[
:: self.step
]
return self._apply_columnwise(homogeneous_func, name, numeric_only)[::self.step]

@doc(
_shared_docs["aggregate"],
see_also=dedent(
"""
See Also
--------
pandas.DataFrame.aggregate : Similar DataFrame method.
pandas.Series.aggregate : Similar Series method.
DataFrame.aggregate : Similar DataFrame method.
Series.aggregate : Similar Series method.
"""
),
examples=dedent(
@@ -1448,6 +1466,7 @@ def apply(
engine_kwargs: dict[str, bool] | None = None,
args: tuple[Any, ...] | None = None,
kwargs: dict[str, Any] | None = None,
multi_column: bool = False, # Added multi_column parameter
):
if args is None:
args = ()
@@ -1459,10 +1478,10 @@

numba_args: tuple[Any, ...] = ()
if maybe_use_numba(engine):
if raw is False:
raise ValueError("raw must be `True` when using the numba engine")
if raw is False and not multi_column: # Adjusted to allow multi_column with raw=False
raise ValueError("raw must be `True` when using the numba engine, unless multi_column is True")
numba_args = args
if self.method == "single":
if self.method == "single" and not multi_column: # Adjusted for multi_column
apply_func = generate_numba_apply_func(
func, **get_jit_arguments(engine_kwargs, kwargs)
)
@@ -1481,6 +1500,7 @@
apply_func,
name="apply",
numba_args=numba_args,
multi_column=multi_column,
)

def _generate_cython_apply_func(
@@ -1906,8 +1926,8 @@ def _raise_monotonic_error(self, msg: str):
"""
See Also
--------
pandas.Series.rolling : Calling object with Series data.
pandas.DataFrame.rolling : Calling object with DataFrame data.
Series.rolling : Calling object with Series data.
DataFrame.rolling : Calling object with DataFrame data.
"""
),
examples=dedent(
@@ -2013,6 +2033,7 @@ def apply(
engine_kwargs: dict[str, bool] | None = None,
args: tuple[Any, ...] | None = None,
kwargs: dict[str, Any] | None = None,
multi_column: bool = False, # Added multi_column parameter
):
return super().apply(
func,
Expand All @@ -2021,6 +2042,7 @@ def apply(
engine_kwargs=engine_kwargs,
args=args,
kwargs=kwargs,
multi_column=multi_column,
)

@doc(
Expand Down Expand Up @@ -2895,4 +2917,4 @@ def _validate_datetimelike_monotonic(self) -> None:
raise ValueError(
f"Each group within {on} must be monotonic. "
f"Sort the values in {on} first."
)
)
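For comparison, a multi-column rolling computation can already be expressed in released pandas by rolling over integer positions and indexing back into the parent frame; this is only a sketch, and the column names and the weighted-sum function are illustrative:

import numpy as np
import pandas as pd

df = pd.DataFrame({"value": [1.0, 2.0, 3.0, 4.0], "weight": [0.1, 0.2, 0.3, 0.4]})

def weighted_sum(positions: pd.Series) -> float:
    # ``positions`` holds the integer locations of the rows in the current window.
    window = df.iloc[positions.astype(int)]
    return float((window["value"] * window["weight"]).sum())

positions = pd.Series(np.arange(len(df)), index=df.index)
result = positions.rolling(window=2).apply(weighted_sum, raw=False)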