Skip to content

CLN: tests/window/moments #44961

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 40 additions & 31 deletions pandas/tests/window/moments/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,50 @@
)


# create the data only once, since the tests never modify it
def _create_consistency_data():
def create_series():
return [
Series(dtype=np.float64, name="a"),
Series([np.nan] * 5),
Series([1.0] * 5),
Series(range(5, 0, -1)),
Series(range(5)),
Series([np.nan, 1.0, np.nan, 1.0, 1.0]),
Series([np.nan, 1.0, np.nan, 2.0, 3.0]),
Series([np.nan, 1.0, np.nan, 3.0, 2.0]),
]

def create_dataframes():
return [
DataFrame(columns=["a", "a"]),
DataFrame(np.arange(15).reshape((5, 3)), columns=["a", "a", 99]),
] + [DataFrame(s) for s in create_series()]

def is_constant(x):
values = x.values.ravel("K")
return len(set(values[notna(values)])) == 1

def no_nans(x):
return x.notna().all().all()

def create_series():
return [
(x, is_constant(x), no_nans(x))
for x in itertools.chain(create_dataframes(), create_dataframes())
Series(dtype=np.float64, name="a"),
Series([np.nan] * 5),
Series([1.0] * 5),
Series(range(5, 0, -1)),
Series(range(5)),
Series([np.nan, 1.0, np.nan, 1.0, 1.0]),
Series([np.nan, 1.0, np.nan, 2.0, 3.0]),
Series([np.nan, 1.0, np.nan, 3.0, 2.0]),
]


@pytest.fixture(params=_create_consistency_data())
def consistency_data(request):
def create_dataframes():
return [
DataFrame(columns=["a", "a"]),
DataFrame(np.arange(15).reshape((5, 3)), columns=["a", "a", 99]),
] + [DataFrame(s) for s in create_series()]


def is_constant(x):
values = x.values.ravel("K")
return len(set(values[notna(values)])) == 1


@pytest.fixture(
params=(
obj
for obj in itertools.chain(create_series(), create_dataframes())
if is_constant(obj)
),
scope="module",
)
def consistent_data(request):
return request.param


@pytest.fixture(params=create_series())
def series_data(request):
return request.param


@pytest.fixture(params=itertools.chain(create_series(), create_dataframes()))
def all_data(request):
"""
Test:
- Empty Series / DataFrame
Expand Down
207 changes: 100 additions & 107 deletions pandas/tests/window/moments/test_moments_consistency_ewm.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def create_mock_weights(obj, com, adjust, ignore_na):


def create_mock_series_weights(s, com, adjust, ignore_na):
w = Series(np.nan, index=s.index)
w = Series(np.nan, index=s.index, name=s.name)
alpha = 1.0 / (1.0 + com)
if adjust:
count = 0
Expand Down Expand Up @@ -58,63 +58,66 @@ def create_mock_series_weights(s, com, adjust, ignore_na):
return w


def test_ewm_consistency_mean(consistency_data, adjust, ignore_na, min_periods):
x, is_constant, no_nans = consistency_data
def test_ewm_consistency_mean(all_data, adjust, ignore_na, min_periods):
com = 3.0

result = x.ewm(
result = all_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).mean()
weights = create_mock_weights(x, com=com, adjust=adjust, ignore_na=ignore_na)
weights = create_mock_weights(all_data, com=com, adjust=adjust, ignore_na=ignore_na)
expected = (
x.multiply(weights).cumsum().divide(weights.cumsum()).fillna(method="ffill")
all_data.multiply(weights)
.cumsum()
.divide(weights.cumsum())
.fillna(method="ffill")
)
expected[
x.expanding().count() < (max(min_periods, 1) if min_periods else 1)
all_data.expanding().count() < (max(min_periods, 1) if min_periods else 1)
] = np.nan
tm.assert_equal(result, expected.astype("float64"))


def test_ewm_consistency_consistent(consistency_data, adjust, ignore_na, min_periods):
x, is_constant, no_nans = consistency_data
def test_ewm_consistency_consistent(consistent_data, adjust, ignore_na, min_periods):
com = 3.0

if is_constant:
count_x = x.expanding().count()
mean_x = x.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).mean()
# check that correlation of a series with itself is either 1 or NaN
corr_x_x = x.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).corr(x)
exp = x.max() if isinstance(x, Series) else x.max().max()
count_x = consistent_data.expanding().count()
mean_x = consistent_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).mean()
# check that correlation of a series with itself is either 1 or NaN
corr_x_x = consistent_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).corr(consistent_data)
exp = (
consistent_data.max()
if isinstance(consistent_data, Series)
else consistent_data.max().max()
)

# check mean of constant series
expected = x * np.nan
expected[count_x >= max(min_periods, 1)] = exp
tm.assert_equal(mean_x, expected)
# check mean of constant series
expected = consistent_data * np.nan
expected[count_x >= max(min_periods, 1)] = exp
tm.assert_equal(mean_x, expected)

# check correlation of constant series with itself is NaN
expected[:] = np.nan
tm.assert_equal(corr_x_x, expected)
# check correlation of constant series with itself is NaN
expected[:] = np.nan
tm.assert_equal(corr_x_x, expected)


def test_ewm_consistency_var_debiasing_factors(
consistency_data, adjust, ignore_na, min_periods
all_data, adjust, ignore_na, min_periods
):
x, is_constant, no_nans = consistency_data
com = 3.0

# check variance debiasing factors
var_unbiased_x = x.ewm(
var_unbiased_x = all_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).var(bias=False)
var_biased_x = x.ewm(
var_biased_x = all_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).var(bias=True)

weights = create_mock_weights(x, com=com, adjust=adjust, ignore_na=ignore_na)
weights = create_mock_weights(all_data, com=com, adjust=adjust, ignore_na=ignore_na)
cum_sum = weights.cumsum().fillna(method="ffill")
cum_sum_sq = (weights * weights).cumsum().fillna(method="ffill")
numerator = cum_sum * cum_sum
Expand All @@ -126,24 +129,21 @@ def test_ewm_consistency_var_debiasing_factors(


@pytest.mark.parametrize("bias", [True, False])
def test_moments_consistency_var(
consistency_data, adjust, ignore_na, min_periods, bias
):
x, is_constant, no_nans = consistency_data
def test_moments_consistency_var(all_data, adjust, ignore_na, min_periods, bias):
com = 3.0

mean_x = x.ewm(
mean_x = all_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).mean()
var_x = x.ewm(
var_x = all_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).var(bias=bias)
assert not (var_x < 0).any().any()

if bias:
# check that biased var(x) == mean(x^2) - mean(x)^2
mean_x2 = (
(x * x)
(all_data * all_data)
.ewm(com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na)
.mean()
)
Expand All @@ -152,45 +152,42 @@ def test_moments_consistency_var(

@pytest.mark.parametrize("bias", [True, False])
def test_moments_consistency_var_constant(
consistency_data, adjust, ignore_na, min_periods, bias
consistent_data, adjust, ignore_na, min_periods, bias
):
x, is_constant, no_nans = consistency_data
com = 3.0
if is_constant:
count_x = x.expanding(min_periods=min_periods).count()
var_x = x.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).var(bias=bias)
count_x = consistent_data.expanding(min_periods=min_periods).count()
var_x = consistent_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).var(bias=bias)

# check that variance of constant series is identically 0
assert not (var_x > 0).any().any()
expected = x * np.nan
expected[count_x >= max(min_periods, 1)] = 0.0
if not bias:
expected[count_x < 2] = np.nan
tm.assert_equal(var_x, expected)
# check that variance of constant series is identically 0
assert not (var_x > 0).any().any()
expected = consistent_data * np.nan
expected[count_x >= max(min_periods, 1)] = 0.0
if not bias:
expected[count_x < 2] = np.nan
tm.assert_equal(var_x, expected)


@pytest.mark.parametrize("bias", [True, False])
def test_ewm_consistency_std(consistency_data, adjust, ignore_na, min_periods, bias):
x, is_constant, no_nans = consistency_data
def test_ewm_consistency_std(all_data, adjust, ignore_na, min_periods, bias):
com = 3.0
var_x = x.ewm(
var_x = all_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).var(bias=bias)
assert not (var_x < 0).any().any()

std_x = x.ewm(
std_x = all_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).std(bias=bias)
assert not (std_x < 0).any().any()

# check that var(x) == std(x)^2
tm.assert_equal(var_x, std_x * std_x)

cov_x_x = x.ewm(
cov_x_x = all_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).cov(x, bias=bias)
).cov(all_data, bias=bias)
assert not (cov_x_x < 0).any().any()

# check that var(x) == cov(x, x)
Expand All @@ -199,57 +196,53 @@ def test_ewm_consistency_std(consistency_data, adjust, ignore_na, min_periods, b

@pytest.mark.parametrize("bias", [True, False])
def test_ewm_consistency_series_cov_corr(
consistency_data, adjust, ignore_na, min_periods, bias
series_data, adjust, ignore_na, min_periods, bias
):
x, is_constant, no_nans = consistency_data
com = 3.0

if isinstance(x, Series):
var_x_plus_y = (
(x + x)
.ewm(com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na)
.var(bias=bias)
)
var_x = x.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).var(bias=bias)
var_y = x.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).var(bias=bias)
cov_x_y = x.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).cov(x, bias=bias)
# check that cov(x, y) == (var(x+y) - var(x) -
# var(y)) / 2
tm.assert_equal(cov_x_y, 0.5 * (var_x_plus_y - var_x - var_y))

# check that corr(x, y) == cov(x, y) / (std(x) *
# std(y))
corr_x_y = x.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).corr(x, bias=bias)
std_x = x.ewm(
var_x_plus_y = (
(series_data + series_data)
.ewm(com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na)
.var(bias=bias)
)
var_x = series_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).var(bias=bias)
var_y = series_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).var(bias=bias)
cov_x_y = series_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).cov(series_data, bias=bias)
# check that cov(x, y) == (var(x+y) - var(x) - var(y)) / 2
tm.assert_equal(cov_x_y, 0.5 * (var_x_plus_y - var_x - var_y))

# check that corr(x, y) == cov(x, y) / (std(x) * std(y))
corr_x_y = series_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).corr(series_data, bias=bias)
std_x = series_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).std(bias=bias)
std_y = series_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).std(bias=bias)
tm.assert_equal(corr_x_y, cov_x_y / (std_x * std_y))

if bias:
# check that biased cov(x, y) == mean(x*y) - mean(x)*mean(y)
mean_x = series_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).std(bias=bias)
std_y = x.ewm(
).mean()
mean_y = series_data.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).std(bias=bias)
tm.assert_equal(corr_x_y, cov_x_y / (std_x * std_y))

if bias:
# check that biased cov(x, y) == mean(x*y) -
# mean(x)*mean(y)
mean_x = x.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).mean()
mean_y = x.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
).mean()
mean_x_times_y = (
(x * x)
.ewm(
com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
)
.mean()
)
tm.assert_equal(cov_x_y, mean_x_times_y - (mean_x * mean_y))
).mean()
mean_x_times_y = (
(series_data * series_data)
.ewm(com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na)
.mean()
)
tm.assert_equal(cov_x_y, mean_x_times_y - (mean_x * mean_y))
Loading