Skip to content

CLN: Move test_parallel to gil.py #47068

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 56 additions & 26 deletions asv_bench/benchmarks/gil.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from functools import wraps
import threading

import numpy as np

from pandas import (
Expand Down Expand Up @@ -30,21 +33,57 @@
from pandas._libs import algos
except ImportError:
from pandas import algos
try:
from pandas._testing import test_parallel # noqa: PDF014

have_real_test_parallel = True
except ImportError:
have_real_test_parallel = False

def test_parallel(num_threads=1):
def wrapper(fname):
return fname
from .pandas_vb_common import BaseIO # isort:skip

return wrapper

def test_parallel(num_threads=2, kwargs_list=None):
"""
Decorator to run the same function multiple times in parallel.

from .pandas_vb_common import BaseIO # isort:skip
Parameters
----------
num_threads : int, optional
The number of times the function is run in parallel.
kwargs_list : list of dicts, optional
The list of kwargs to update original
function kwargs on different threads.

Notes
-----
This decorator does not pass the return value of the decorated function.

Original from scikit-image:

https://github.com/scikit-image/scikit-image/pull/1519

"""
assert num_threads > 0
has_kwargs_list = kwargs_list is not None
if has_kwargs_list:
assert len(kwargs_list) == num_threads

def wrapper(func):
@wraps(func)
def inner(*args, **kwargs):
if has_kwargs_list:
update_kwargs = lambda i: dict(kwargs, **kwargs_list[i])
else:
update_kwargs = lambda i: kwargs
threads = []
for i in range(num_threads):
updated_kwargs = update_kwargs(i)
thread = threading.Thread(target=func, args=args, kwargs=updated_kwargs)
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
thread.join()

return inner

return wrapper


class ParallelGroupbyMethods:
Expand All @@ -53,8 +92,7 @@ class ParallelGroupbyMethods:
param_names = ["threads", "method"]

def setup(self, threads, method):
if not have_real_test_parallel:
raise NotImplementedError

N = 10**6
ngroups = 10**3
df = DataFrame(
Expand Down Expand Up @@ -86,8 +124,7 @@ class ParallelGroups:
param_names = ["threads"]

def setup(self, threads):
if not have_real_test_parallel:
raise NotImplementedError

size = 2**22
ngroups = 10**3
data = Series(np.random.randint(0, ngroups, size=size))
Expand All @@ -108,8 +145,7 @@ class ParallelTake1D:
param_names = ["dtype"]

def setup(self, dtype):
if not have_real_test_parallel:
raise NotImplementedError

N = 10**6
df = DataFrame({"col": np.arange(N, dtype=dtype)})
indexer = np.arange(100, len(df) - 100)
Expand All @@ -131,8 +167,7 @@ class ParallelKth:
repeat = 5

def setup(self):
if not have_real_test_parallel:
raise NotImplementedError

N = 10**7
k = 5 * 10**5
kwargs_list = [{"arr": np.random.randn(N)}, {"arr": np.random.randn(N)}]
Expand All @@ -149,8 +184,7 @@ def time_kth_smallest(self):

class ParallelDatetimeFields:
def setup(self):
if not have_real_test_parallel:
raise NotImplementedError

N = 10**6
self.dti = date_range("1900-01-01", periods=N, freq="T")
self.period = self.dti.to_period("D")
Expand Down Expand Up @@ -204,8 +238,7 @@ class ParallelRolling:
param_names = ["method"]

def setup(self, method):
if not have_real_test_parallel:
raise NotImplementedError

win = 100
arr = np.random.rand(100000)
if hasattr(DataFrame, "rolling"):
Expand Down Expand Up @@ -248,8 +281,7 @@ class ParallelReadCSV(BaseIO):
param_names = ["dtype"]

def setup(self, dtype):
if not have_real_test_parallel:
raise NotImplementedError

rows = 10000
cols = 50
data = {
Expand Down Expand Up @@ -284,8 +316,6 @@ class ParallelFactorize:
param_names = ["threads"]

def setup(self, threads):
if not have_real_test_parallel:
raise NotImplementedError

strings = tm.makeStringIndex(100000)

Expand Down
50 changes: 0 additions & 50 deletions pandas/_testing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import collections
from datetime import datetime
from decimal import Decimal
from functools import wraps
import operator
import os
import re
Expand Down Expand Up @@ -749,55 +748,6 @@ def makeMissingDataframe(density=0.9, random_state=None):
return df


def test_parallel(num_threads=2, kwargs_list=None):
"""
Decorator to run the same function multiple times in parallel.

Parameters
----------
num_threads : int, optional
The number of times the function is run in parallel.
kwargs_list : list of dicts, optional
The list of kwargs to update original
function kwargs on different threads.

Notes
-----
This decorator does not pass the return value of the decorated function.

Original from scikit-image:

https://github.com/scikit-image/scikit-image/pull/1519

"""
assert num_threads > 0
has_kwargs_list = kwargs_list is not None
if has_kwargs_list:
assert len(kwargs_list) == num_threads
import threading

def wrapper(func):
@wraps(func)
def inner(*args, **kwargs):
if has_kwargs_list:
update_kwargs = lambda i: dict(kwargs, **kwargs_list[i])
else:
update_kwargs = lambda i: kwargs
threads = []
for i in range(num_threads):
updated_kwargs = update_kwargs(i)
thread = threading.Thread(target=func, args=args, kwargs=updated_kwargs)
threads.append(thread)
for thread in threads:
thread.start()
for thread in threads:
thread.join()

return inner

return wrapper


class SubclassedSeries(Series):
_metadata = ["testattr", "name"]

Expand Down