xref #15338
Calling `groupby` with an unaligned grouping key on the same frame from multiple threads can produce incorrect results. The following example demonstrates:
```python
import numpy as np
import pandas as pd
from multiprocessing.pool import ThreadPool


def build_frame(N, seed):
    rs = np.random.RandomState(seed)
    amounts = rs.exponential(0.1, N) * 100
    ids = rs.randint(0, 500, N)
    df = pd.DataFrame({'amount': amounts, 'id': ids})
    # XXX: calling groupby once here makes everything pass. Note that the
    # pre-filter by amount is necessary; without it things still fail.
    # df[df.amount < 0].groupby(df.id)
    return df


def f(x):
    return x[x.amount < 20].groupby(x.id).amount.count()


N = 100000
NTHREADS = 8
SEED = 100

df = build_frame(N, SEED)
pool = ThreadPool(NTHREADS)
args = [df] * NTHREADS * 2

r1 = pool.map(f, args)
r2 = pool.map(f, args)

# Print out lengths, which don't always match
print([len(i) for i in r1])
print([len(i) for i in r2])

# Check that the results are equivalent
matches = [a.equals(b) for (a, b) in zip(r1, r2)]
assert all(matches)
```
On my machine, running `python test.py` results in:

```
[45, 471, 500, 429, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500]
[500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500]
Traceback (most recent call last):
  File "test.py", line 38, in <module>
    assert all(matches)
AssertionError
```
A few notes:

- Calling `groupby` once beforehand fixes everything (see the comment in `build_frame`). To me this indicates that the `groupby` call sets up some state (that's cached) that may not be threadsafe. Also, the second call to `pool.map` always returns correct results; only the first calls fail.
- Doing this without threading results in the same answer every time.
- Without filtering by `amount`, the result is also always correct.
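The last note suggests a sidestep: if the grouping key is taken from the already-filtered frame, key and frame share an index and the cross-frame alignment path never runs. A minimal sketch of that workaround (the `sub` variable and the smaller frame sizes are mine, not from the report):

```python
import numpy as np
import pandas as pd
from multiprocessing.pool import ThreadPool

rs = np.random.RandomState(100)
df = pd.DataFrame({'amount': rs.exponential(0.1, 10000) * 100,
                   'id': rs.randint(0, 500, 10000)})

def f(x):
    # Filter first, then group by the filtered frame's own column: the key
    # and the frame share an index, so no alignment between frames occurs.
    sub = x[x.amount < 20]
    return sub.groupby(sub.id).amount.count()

pool = ThreadPool(8)
results = pool.map(f, [df] * 16)

# Every thread should see an identical, complete result.
assert all(r.equals(results[0]) for r in results)
```

The values are the same as with the unaligned call (alignment on an identical index is a no-op); only the code path that appears to race is avoided.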
Tested with pandas master, as well as pandas 0.19.0 and up.
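The warm-up observation from the first note can also be sketched as a temporary mitigation, assuming the race lives in state that the first `groupby` call builds lazily. This is a guess at the mechanism, not a confirmed fix:

```python
import numpy as np
import pandas as pd
from multiprocessing.pool import ThreadPool

rs = np.random.RandomState(100)
df = pd.DataFrame({'amount': rs.exponential(0.1, 10000) * 100,
                   'id': rs.randint(0, 500, 10000)})

# Hypothetical mitigation: run one throwaway unaligned groupby serially so
# any lazily-cached state exists before the frame is shared across threads.
df[df.amount < 0].groupby(df.id)

def f(x):
    return x[x.amount < 20].groupby(x.id).amount.count()

pool = ThreadPool(8)
results = pool.map(f, [df] * 16)
print([len(r) for r in results])
```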