-
-
Notifications
You must be signed in to change notification settings - Fork 18.5k
ENH: Rolling rank #43338
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ENH: Rolling rank #43338
Changes from 10 commits
3ebf8c0
ce754f7
f13a720
874c980
4d06ba3
1308208
4caa51b
f2ee5b2
b135f1e
fda85b4
e692ce3
6b23fc0
5f7d319
63d37c5
ba468c6
e078119
bb7005f
1470c7b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -50,6 +50,8 @@ cdef extern from "../src/skiplist.h": | |||||
double skiplist_get(skiplist_t*, int, int*) nogil | ||||||
int skiplist_insert(skiplist_t*, double) nogil | ||||||
int skiplist_remove(skiplist_t*, double) nogil | ||||||
int skiplist_rank(skiplist_t*, double) nogil | ||||||
int skiplist_min_rank(skiplist_t*, double) nogil | ||||||
|
||||||
cdef: | ||||||
float32_t MINfloat32 = np.NINF | ||||||
|
@@ -795,7 +797,7 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start, | |||||
val = values[j] | ||||||
if notnan(val): | ||||||
nobs += 1 | ||||||
err = skiplist_insert(sl, val) != 1 | ||||||
err = skiplist_insert(sl, val) == -1 | ||||||
if err: | ||||||
break | ||||||
|
||||||
|
@@ -806,7 +808,7 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start, | |||||
val = values[j] | ||||||
if notnan(val): | ||||||
nobs += 1 | ||||||
err = skiplist_insert(sl, val) != 1 | ||||||
err = skiplist_insert(sl, val) == -1 | ||||||
if err: | ||||||
break | ||||||
|
||||||
|
@@ -1139,6 +1141,111 @@ def roll_quantile(const float64_t[:] values, ndarray[int64_t] start, | |||||
return output | ||||||
|
||||||
|
||||||
# Tie-breaking strategy used by roll_rank when the ranked value has
# duplicates inside the window.
# NOTE(review): a reviewer suggested reusing the tiebreak enums already
# defined in pandas/_libs/algos.pyx (e.g. TIEBREAK_AVERAGE) instead of this
# enum; that may need the definitions moved to a .pxd file so they can be
# cimported -- TODO confirm.
cdef enum RankType:
    # mean of the lowest and highest rank among equal values
    AVERAGE,
    # lowest rank among equal values (from skiplist_min_rank)
    MIN,
    # the rank returned by skiplist_insert, used unchanged
    MAX,


# Maps the user-facing ``method`` string accepted by roll_rank to the
# corresponding RankType member; roll_rank turns a missing key into a
# ValueError.
rank_types = {
    'average': AVERAGE,
    'min': MIN,
    'max': MAX,
}
|
||||||
|
||||||
def roll_rank(const float64_t[:] values, ndarray[int64_t] start,
              ndarray[int64_t] end, int64_t minp, bint percentile,
              str method, bint ascending) -> np.ndarray:
    """
    Rolling rank of the newest observation in each window.

    O(N log(window)) implementation using skip list

    derived from roll_quantile

    Parameters
    ----------
    values : float64 memoryview
        Data to rank.
    start, end : ndarray[int64]
        Window bounds; window ``i`` covers ``values[start[i]:end[i]]``.
    minp : int64
        Minimum number of non-NaN observations a window needs to produce a
        non-NaN result.
    percentile : bint
        If True, the rank is divided by the number of non-NaN observations
        in the window.
    method : str
        Key of ``rank_types``: 'average', 'min' or 'max'.
    ascending : bint
        If False, values are negated before being inserted, which reverses
        the ordering.

    Returns
    -------
    np.ndarray
        float64 array of length ``len(values)``.

    Raises
    ------
    ValueError
        If ``method`` is not a key of ``rank_types``.
    MemoryError
        If the skip list cannot be allocated or an insert fails.
    """
    cdef:
        Py_ssize_t i, j, s, e, add_from, N = len(values)
        float64_t rank_min = 0, rank = 0
        int64_t nobs = 0, win
        float64_t val
        skiplist_t *skiplist
        float64_t[::1] output
        RankType rank_type

    try:
        rank_type = rank_types[method]
    except KeyError:
        raise ValueError(f"Method '{method}' is not supported")

    is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
        start, end
    )
    # we use the Fixed/Variable Indexer here as the
    # actual skiplist ops outweigh any window computation costs
    output = np.empty(N, dtype=np.float64)

    win = (end - start).max()
    if win == 0:
        output[:] = NaN
        return np.asarray(output)
    skiplist = skiplist_init(<int>win)
    if skiplist == NULL:
        raise MemoryError("skiplist_init failed")

    # try/finally so the skip list is released even when a MemoryError is
    # raised part-way through the loop.
    try:
        with nogil:
            for i in range(N):
                s = start[i]
                e = end[i]

                if i == 0 or not is_monotonic_increasing_bounds:
                    if not is_monotonic_increasing_bounds:
                        # bounds may move backwards: rebuild from scratch
                        nobs = 0
                        skiplist_destroy(skiplist)
                        skiplist = skiplist_init(<int>win)
                        if skiplist == NULL:
                            raise MemoryError("skiplist_init failed")

                    # setup: the whole window has to be inserted
                    add_from = s
                else:
                    # calculate deletes
                    for j in range(start[i - 1], s):
                        val = values[j] if ascending else -values[j]
                        if notnan(val):
                            skiplist_remove(skiplist, val)
                            nobs -= 1

                    # only the values past the previous window's end are new
                    add_from = end[i - 1]

                # NOTE(review): if no value is added for this window, `rank`
                # carries over from the previous window -- confirm intended.

                # calculate adds
                for j in range(add_from, e):
                    val = values[j] if ascending else -values[j]
                    if notnan(val):
                        nobs += 1
                        rank = skiplist_insert(skiplist, val)
                        if rank == -1:
                            raise MemoryError("skiplist_insert failed")
                        if rank_type == AVERAGE:
                            # Equal values occupy the consecutive ranks
                            # rank_min..rank.  Their sum is
                            # (1 + ... + rank) - (1 + ... + (rank_min - 1)),
                            # using 1 + ... + n == n * (n + 1) / 2; dividing
                            # by how many there are gives the average rank.
                            rank_min = skiplist_min_rank(skiplist, val)
                            rank = (
                                ((rank * (rank + 1) / 2)
                                 - ((rank_min - 1) * rank_min / 2))
                                / (rank - rank_min + 1)
                            )
                        elif rank_type == MIN:
                            rank = skiplist_min_rank(skiplist, val)
                    else:
                        # a NaN observation has no rank; do not report the
                        # stale rank of an earlier value for it
                        rank = NaN

                if nobs >= minp:
                    # rank is already float64, so no cast is needed
                    output[i] = rank / nobs if percentile else rank
                else:
                    output[i] = NaN
    finally:
        if skiplist != NULL:
            skiplist_destroy(skiplist)

    return np.asarray(output)
|
||||||
|
||||||
def roll_apply(object obj, | ||||||
ndarray[int64_t] start, ndarray[int64_t] end, | ||||||
int64_t minp, | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1500,3 +1500,23 @@ def test_rolling_numeric_dtypes(): | |
dtype="float64", | ||
) | ||
tm.assert_frame_equal(result, expected) | ||
|
||
|
||
@pytest.mark.parametrize("window", [1, 3, 10, 50, 1000])
@pytest.mark.parametrize("method", ["min", "max", "average"])
@pytest.mark.parametrize("pct", [True, False])
@pytest.mark.parametrize("ascending", [True, False])
@pytest.mark.parametrize("test_data", ["default", "duplicates", "nans"])
def test_rank(window, method, pct, ascending, test_data):
    """Rolling.rank must match ranking each window with Series.rank.

    The expected value is built by applying ``Series.rank`` to every window
    and keeping the rank of its last element.
    """
    length = 1000
    # Seeded local generator: the data is identical on every run, so a
    # failing parametrisation can be reproduced, and the global NumPy
    # random state is left untouched.
    rng = np.random.default_rng(43338)

    if test_data == "default":
        ser = Series(data=rng.random(length))
    elif test_data == "duplicates":
        ser = Series(data=rng.choice(3, length))
    elif test_data == "nans":
        ser = Series(data=rng.choice([1.0, 0.25, 0.75, np.nan], length))

    expected = ser.rolling(window).apply(
        lambda x: x.rank(method=method, pct=pct, ascending=ascending).iloc[-1]
    )
    result = ser.rolling(window).rank(method=method, pct=pct, ascending=ascending)

    tm.assert_series_equal(result, expected)
Uh oh!
There was an error while loading. Please reload this page.