
PERF: Sparse ops speedup #13082


Status: Closed (wants to merge 1 commit)
50 changes: 49 additions & 1 deletion asv_bench/benchmarks/sparse.py
@@ -55,7 +55,7 @@ def time_sparse_series_to_coo(self):
self.ss.to_coo(row_levels=[0, 1], column_levels=[2, 3], sort_labels=True)


class sparse_arithmetic(object):
class sparse_arithmetic_int(object):
goal_time = 0.2

def setup(self):
@@ -75,6 +75,12 @@ def make_sparse_array(self, length, dense_size, fill_value):
arr[indexer] = np.random.randint(0, 100, len(indexer))
return pd.SparseArray(arr, fill_value=fill_value)

def time_sparse_make_union(self):
self.a_10percent.sp_index.make_union(self.b_10percent.sp_index)

def time_sparse_intersect(self):
self.a_10percent.sp_index.intersect(self.b_10percent.sp_index)

def time_sparse_addition_10percent(self):
self.a_10percent + self.b_10percent

@@ -92,3 +98,45 @@ def time_sparse_division_10percent_zero(self):

def time_sparse_division_1percent(self):
self.a_1percent / self.b_1percent



class sparse_arithmetic_block(object):
goal_time = 0.2

def setup(self):
np.random.seed(1)
self.a = self.make_sparse_array(length=1000000, num_blocks=1000,
block_size=10, fill_value=np.nan)
self.b = self.make_sparse_array(length=1000000, num_blocks=1000,
block_size=10, fill_value=np.nan)

self.a_zero = self.make_sparse_array(length=1000000, num_blocks=1000,
block_size=10, fill_value=0)
self.b_zero = self.make_sparse_array(length=1000000, num_blocks=1000,
block_size=10, fill_value=np.nan)

def make_sparse_array(self, length, num_blocks, block_size, fill_value):
a = np.array([fill_value] * length)
for block in range(num_blocks):
i = np.random.randint(0, length)
a[i:i + block_size] = np.random.randint(0, 100, len(a[i:i + block_size]))
return pd.SparseArray(a, fill_value=fill_value)

def time_sparse_make_union(self):
self.a.sp_index.make_union(self.b.sp_index)

def time_sparse_intersect(self):
self.a.sp_index.intersect(self.b.sp_index)

def time_sparse_addition(self):
self.a + self.b

def time_sparse_addition_zero(self):
self.a_zero + self.b_zero

def time_sparse_division(self):
self.a / self.b

def time_sparse_division_zero(self):
self.a_zero / self.b_zero
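The two benchmark classes above time sparse-index set operations and sparse arithmetic on large arrays. The following is a minimal standalone sketch of what they exercise, written against the 0.18-era `pandas.SparseArray` API used in this diff; the helper and sizes are illustrative, not part of the patch:

```python
# Illustrative stand-in for the benchmark setup above (0.18-era API).
import numpy as np
import pandas as pd

def make_sparse_array(length, dense_size, fill_value):
    # Mostly fill_value, with `dense_size` randomly chosen dense positions.
    arr = np.array([fill_value] * length)
    indexer = np.unique(np.random.randint(0, length, dense_size))
    arr[indexer] = np.random.randint(0, 100, len(indexer))
    return pd.SparseArray(arr, fill_value=fill_value)

np.random.seed(1)
a = make_sparse_array(1000000, 100000, np.nan)   # roughly 10% dense
b = make_sparse_array(1000000, 100000, np.nan)

# The operations the new benchmarks time:
union = a.sp_index.make_union(b.sp_index)
intersection = a.sp_index.intersect(b.sp_index)
total = a + b
```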
3 changes: 2 additions & 1 deletion doc/source/whatsnew/v0.18.2.txt
@@ -66,7 +66,8 @@ Deprecations
Performance Improvements
~~~~~~~~~~~~~~~~~~~~~~~~


- Improved performance of sparse ``IntIndex.intersect`` (:issue:`13082`)
- Improved performance of sparse arithmetic with ``BlockIndex`` when the number of blocks is large, though ``IntIndex`` is recommended in such cases (:issue:`13082`)
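To make the second note concrete, here is a hedged sketch (not part of the patch) of how the two index kinds relate; `to_int_index()` / `to_block_index()` are the conversions exercised by the tests below, and both kinds support the `intersect` / `make_union` operations optimized here:

```python
import numpy as np
import pandas as pd

arr = np.full(100000, np.nan)
arr[np.random.randint(0, 100000, 10000)] = 1.0
sp = pd.SparseArray(arr, fill_value=np.nan)

# Convert the backing sparse index to either representation.
block_idx = sp.sp_index.to_block_index()
int_idx = sp.sp_index.to_int_index()

# Both kinds expose the optimized set operations; intersecting an index
# with itself yields an equal index (see test_intersect_identical below).
assert int_idx.intersect(int_idx).equals(int_idx)
assert block_idx.intersect(block_idx).equals(block_idx)
```

When an array has many small blocks, converting to an `IntIndex` before heavy arithmetic is the cheaper route, which is what the note recommends.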



88 changes: 62 additions & 26 deletions pandas/sparse/tests/test_libsparse.py
@@ -186,6 +186,57 @@ def test_intindex_make_union(self):
a.make_union(b)


class TestSparseIndexIntersect(tm.TestCase):

def test_intersect(self):
def _check_correct(a, b, expected):
result = a.intersect(b)
assert (result.equals(expected))

def _check_length_exc(a, longer):
nose.tools.assert_raises(Exception, a.intersect, longer)

def _check_case(xloc, xlen, yloc, ylen, eloc, elen):
xindex = BlockIndex(TEST_LENGTH, xloc, xlen)
yindex = BlockIndex(TEST_LENGTH, yloc, ylen)
expected = BlockIndex(TEST_LENGTH, eloc, elen)
longer_index = BlockIndex(TEST_LENGTH + 1, yloc, ylen)

_check_correct(xindex, yindex, expected)
_check_correct(xindex.to_int_index(), yindex.to_int_index(),
expected.to_int_index())

_check_length_exc(xindex, longer_index)
_check_length_exc(xindex.to_int_index(),
longer_index.to_int_index())

if compat.is_platform_windows():
raise nose.SkipTest("segfaults on win-64 when all tests are run")
check_cases(_check_case)

def test_intersect_empty(self):
xindex = IntIndex(4, np.array([], dtype=np.int32))
yindex = IntIndex(4, np.array([2, 3], dtype=np.int32))
self.assertTrue(xindex.intersect(yindex).equals(xindex))
self.assertTrue(yindex.intersect(xindex).equals(xindex))

xindex = xindex.to_block_index()
yindex = yindex.to_block_index()
self.assertTrue(xindex.intersect(yindex).equals(xindex))
self.assertTrue(yindex.intersect(xindex).equals(xindex))

def test_intersect_identical(self):
cases = [IntIndex(5, np.array([1, 2], dtype=np.int32)),
IntIndex(5, np.array([0, 2, 4], dtype=np.int32)),
IntIndex(0, np.array([], dtype=np.int32)),
IntIndex(5, np.array([], dtype=np.int32))]

for case in cases:
self.assertTrue(case.intersect(case).equals(case))
case = case.to_block_index()
self.assertTrue(case.intersect(case).equals(case))


class TestSparseIndexCommon(tm.TestCase):

_multiprocess_can_split_ = True
@@ -281,32 +332,6 @@ def _check(index):
# corner cases


def test_intersect():
def _check_correct(a, b, expected):
result = a.intersect(b)
assert (result.equals(expected))

def _check_length_exc(a, longer):
nose.tools.assert_raises(Exception, a.intersect, longer)

def _check_case(xloc, xlen, yloc, ylen, eloc, elen):
xindex = BlockIndex(TEST_LENGTH, xloc, xlen)
yindex = BlockIndex(TEST_LENGTH, yloc, ylen)
expected = BlockIndex(TEST_LENGTH, eloc, elen)
longer_index = BlockIndex(TEST_LENGTH + 1, yloc, ylen)

_check_correct(xindex, yindex, expected)
_check_correct(xindex.to_int_index(), yindex.to_int_index(),
expected.to_int_index())

_check_length_exc(xindex, longer_index)
_check_length_exc(xindex.to_int_index(), longer_index.to_int_index())

if compat.is_platform_windows():
raise nose.SkipTest("segfaults on win-64 when all tests are run")
check_cases(_check_case)


class TestBlockIndex(tm.TestCase):

_multiprocess_can_split_ = True
@@ -345,6 +370,16 @@ def test_block_internal(self):
tm.assert_numpy_array_equal(idx.blengths,
np.array([1, 2], dtype=np.int32))

def test_make_block_boundary(self):
for i in [5, 10, 100, 101]:
idx = _make_index(i, np.arange(0, i, 2, dtype=np.int32),
kind='block')

exp = np.arange(0, i, 2, dtype=np.int32)
tm.assert_numpy_array_equal(idx.blocs, exp)
tm.assert_numpy_array_equal(idx.blengths,
np.ones(len(exp), dtype=np.int32))

def test_equals(self):
index = BlockIndex(10, [0, 4], [2, 5])

@@ -413,6 +448,7 @@ def test_equals(self):
self.assertFalse(index.equals(IntIndex(10, [0, 1, 2, 3])))

def test_to_block_index(self):

def _check_case(xloc, xlen, yloc, ylen, eloc, elen):
xindex = BlockIndex(TEST_LENGTH, xloc, xlen)
yindex = BlockIndex(TEST_LENGTH, yloc, ylen)
75 changes: 49 additions & 26 deletions pandas/src/sparse.pyx
@@ -98,10 +98,9 @@ cdef class IntIndex(SparseIndex):

cpdef IntIndex intersect(self, SparseIndex y_):
cdef:
Py_ssize_t out_length, xi, yi = 0
Py_ssize_t out_length, xi, yi = 0, result_indexer = 0
int32_t xind
ndarray[int32_t, ndim=1] xindices, yindices
list new_list = []
ndarray[int32_t, ndim=1] xindices, yindices, new_indices
IntIndex y

# if is one already, returns self
@@ -112,6 +111,7 @@

xindices = self.indices
yindices = y.indices
new_indices = np.empty(min(len(xindices), len(yindices)), dtype=np.int32)

for xi from 0 <= xi < self.npoints:
xind = xindices[xi]
@@ -124,9 +124,11 @@

# TODO: would a two-pass algorithm be faster?
if yindices[yi] == xind:
new_list.append(xind)
new_indices[result_indexer] = xind
[Inline review comment, Contributor]: add a test where x is y (the input is the same)

result_indexer += 1

return IntIndex(self.length, new_list)
new_indices = new_indices[:result_indexer]
return IntIndex(self.length, new_indices)

cpdef IntIndex make_union(self, SparseIndex y_):

Expand Down Expand Up @@ -238,34 +240,42 @@ cdef class IntIndex(SparseIndex):

cpdef get_blocks(ndarray[int32_t, ndim=1] indices):
cdef:
Py_ssize_t i, npoints
Py_ssize_t init_len, i, npoints, result_indexer = 0
int32_t block, length = 1, cur, prev
list locs = [], lens = []
ndarray[int32_t, ndim=1] locs, lens

npoints = len(indices)

# just handle the special empty case separately
if npoints == 0:
return [], []
return np.array([], dtype=np.int32), np.array([], dtype=np.int32)

# block size can't be longer than npoints
locs = np.empty(npoints, dtype=np.int32)
lens = np.empty(npoints, dtype=np.int32)

# TODO: two-pass algorithm faster?
prev = block = indices[0]
for i from 1 <= i < npoints:
cur = indices[i]
if cur - prev > 1:
# new block
locs.append(block)
lens.append(length)
locs[result_indexer] = block
lens[result_indexer] = length
block = cur
length = 1
result_indexer += 1
else:
# same block, increment length
length += 1

prev = cur

locs.append(block)
lens.append(length)
locs[result_indexer] = block
lens[result_indexer] = length
result_indexer += 1
locs = locs[:result_indexer]
lens = lens[:result_indexer]
return locs, lens

#-------------------------------------------------------------------------------
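The rewritten `get_blocks` above shows the core pattern of this patch: replace `list.append` plus a final conversion with a preallocated `int32` buffer sized at a known upper bound, then truncate to the number of entries actually written. A pure-Python sketch of that pattern follows; the helper name is hypothetical, not from the patch:

```python
import numpy as np

def runs_to_blocks(indices):
    # Collapse a sorted int32 index array into (block starts, block lengths).
    n = len(indices)
    if n == 0:
        return np.array([], dtype=np.int32), np.array([], dtype=np.int32)

    # Preallocate at the upper bound: there can be at most n blocks.
    locs = np.empty(n, dtype=np.int32)
    lens = np.empty(n, dtype=np.int32)
    k = 0

    prev = block = indices[0]
    length = 1
    for cur in indices[1:]:
        if cur - prev > 1:            # gap: close the current block
            locs[k], lens[k] = block, length
            k += 1
            block, length = cur, 1
        else:                         # contiguous: extend the current block
            length += 1
        prev = cur

    locs[k], lens[k] = block, length  # close the final block
    return locs[:k + 1], lens[:k + 1]

# runs_to_blocks(np.array([0, 1, 2, 7, 8, 15], dtype=np.int32))
# -> (array([ 0,  7, 15], dtype=int32), array([3, 2, 1], dtype=int32))
```

The `BlockIndex` paths below apply the same idea with a tighter bound of `min(length) / 2 + 1`, since each block after the first must be preceded by at least one fill element.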
@@ -398,12 +408,8 @@
"""
cdef:
BlockIndex y
ndarray[int32_t, ndim=1] xloc, xlen, yloc, ylen

list out_blocs = []
list out_blengths = []

Py_ssize_t xi = 0, yi = 0
ndarray[int32_t, ndim=1] xloc, xlen, yloc, ylen, out_bloc, out_blen
Py_ssize_t xi = 0, yi = 0, max_len, result_indexer = 0
int32_t cur_loc, cur_length, diff

y = other.to_block_index()
@@ -416,6 +422,11 @@
yloc = y.blocs
ylen = y.blengths

# block may be split, but can't exceed original len / 2 + 1
max_len = int(min(self.length, y.length) / 2) + 1
out_bloc = np.empty(max_len, dtype=np.int32)
out_blen = np.empty(max_len, dtype=np.int32)

while True:
# we are done (or possibly never began)
if xi >= self.nblocks or yi >= y.nblocks:
@@ -458,10 +469,14 @@
cur_length = ylen[yi]
yi += 1

out_blocs.append(cur_loc)
out_blengths.append(cur_length)
out_bloc[result_indexer] = cur_loc
out_blen[result_indexer] = cur_length
result_indexer += 1

return BlockIndex(self.length, out_blocs, out_blengths)
out_bloc = out_bloc[:result_indexer]
out_blen = out_blen[:result_indexer]

return BlockIndex(self.length, out_bloc, out_blen)

cpdef BlockIndex make_union(self, SparseIndex y):
"""
@@ -626,15 +641,19 @@

cdef _make_merged_blocks(self):
cdef:
ndarray[int32_t, ndim=1] xstart, xend, ystart, yend
ndarray[int32_t, ndim=1] xstart, xend, ystart, yend, out_bloc, out_blen
int32_t nstart, nend, diff
list out_blocs = [], out_blengths = []
Py_ssize_t max_len, result_indexer = 0

xstart = self.xstart
xend = self.xend
ystart = self.ystart
yend = self.yend

max_len = int(min(self.x.length, self.y.length) / 2) + 1
out_bloc = np.empty(max_len, dtype=np.int32)
out_blen = np.empty(max_len, dtype=np.int32)

while True:
# we are done (or possibly never began)
if self.xi >= self.x.nblocks and self.yi >= self.y.nblocks:
@@ -658,10 +677,14 @@
nstart = ystart[self.yi]
nend = self._find_next_block_end(1)

out_blocs.append(nstart)
out_blengths.append(nend - nstart)
out_bloc[result_indexer] = nstart
out_blen[result_indexer] = nend - nstart
result_indexer += 1

out_bloc = out_bloc[:result_indexer]
out_blen = out_blen[:result_indexer]

return BlockIndex(self.x.length, out_blocs, out_blengths)
return BlockIndex(self.x.length, out_bloc, out_blen)

cdef int32_t _find_next_block_end(self, bint mode) except -1:
"""