Skip to content

test: Add dataframe unit test suite #1751

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Jun 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
422 changes: 332 additions & 90 deletions bigframes/core/compile/polars/compiler.py

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions bigframes/core/global_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,23 @@ def get_global_session():

def with_default_session(func: Callable[..., _T], *args, **kwargs) -> _T:
return func(get_global_session(), *args, **kwargs)


class _GlobalSessionContext:
"""
Context manager for testing that sets global session.
"""

def __init__(self, session: bigframes.session.Session):
self._session = session

def __enter__(self):
global _global_session, _global_session_lock
with _global_session_lock:
self._previous_session = _global_session
_global_session = self._session

def __exit__(self, *exc_details):
global _global_session, _global_session_lock
with _global_session_lock:
_global_session = self._previous_session
3 changes: 2 additions & 1 deletion bigframes/core/rewrite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
)
from bigframes.core.rewrite.slices import pull_out_limit, pull_up_limits, rewrite_slice
from bigframes.core.rewrite.timedeltas import rewrite_timedelta_expressions
from bigframes.core.rewrite.windows import rewrite_range_rolling
from bigframes.core.rewrite.windows import pull_out_window_order, rewrite_range_rolling

__all__ = [
"legacy_join_as_projection",
Expand All @@ -41,4 +41,5 @@
"bake_order",
"try_reduce_to_local_scan",
"fold_row_counts",
"pull_out_window_order",
]
33 changes: 32 additions & 1 deletion bigframes/core/rewrite/windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import dataclasses

from bigframes import operations as ops
from bigframes.core import nodes
from bigframes.core import guid, identifiers, nodes, ordering


def rewrite_range_rolling(node: nodes.BigFrameNode) -> nodes.BigFrameNode:
Expand All @@ -43,3 +43,34 @@ def rewrite_range_rolling(node: nodes.BigFrameNode) -> nodes.BigFrameNode:
node,
window_spec=dataclasses.replace(node.window_spec, ordering=(new_ordering,)),
)


def pull_out_window_order(root: nodes.BigFrameNode) -> nodes.BigFrameNode:
return root.bottom_up(rewrite_window_node)


def rewrite_window_node(node: nodes.BigFrameNode) -> nodes.BigFrameNode:
if not isinstance(node, nodes.WindowOpNode):
return node
if len(node.window_spec.ordering) == 0:
return node
else:
offsets_id = guid.generate_guid()
w_offsets = nodes.PromoteOffsetsNode(
node.child, identifiers.ColumnId(offsets_id)
)
sorted_child = nodes.OrderByNode(w_offsets, node.window_spec.ordering)
new_window_node = dataclasses.replace(
node,
child=sorted_child,
window_spec=node.window_spec.without_order(force=True),
)
w_resetted_order = nodes.OrderByNode(
new_window_node,
by=(ordering.ascending_over(identifiers.ColumnId(offsets_id)),),
is_total_order=True,
)
w_offsets_dropped = nodes.SelectionNode(
w_resetted_order, tuple(nodes.AliasedRef.identity(id) for id in node.ids)
)
return w_offsets_dropped
12 changes: 8 additions & 4 deletions bigframes/core/window_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,9 @@ def is_row_bounded(self):
This is relevant for determining whether the window requires a total order
to calculate deterministically.
"""
return isinstance(self.bounds, RowsWindowBounds)
return isinstance(self.bounds, RowsWindowBounds) and (
(self.bounds.start is not None) or (self.bounds.end is not None)
)

@property
def is_range_bounded(self):
Expand All @@ -254,7 +256,9 @@ def is_unbounded(self):
This is relevant for determining whether the window requires a total order
to calculate deterministically.
"""
return self.bounds is None
return self.bounds is None or (
self.bounds.start is None and self.bounds.end is None
)

@property
def all_referenced_columns(self) -> Set[ids.ColumnId]:
Expand All @@ -266,9 +270,9 @@ def all_referenced_columns(self) -> Set[ids.ColumnId]:
)
return set(itertools.chain((i.id for i in self.grouping_keys), ordering_vars))

def without_order(self) -> WindowSpec:
def without_order(self, force: bool = False) -> WindowSpec:
"""Removes ordering clause if ordering isn't required to define bounds."""
if self.is_row_bounded:
if self.is_row_bounded and not force:
raise ValueError("Cannot remove order from row-bounded window")
return replace(self, ordering=())

Expand Down
2 changes: 0 additions & 2 deletions bigframes/operations/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,6 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT
return dtypes.INT_DTYPE


# TODO: Convert to NullaryWindowOp
@dataclasses.dataclass(frozen=True)
class RankOp(UnaryWindowOp):
name: ClassVar[str] = "rank"
Expand All @@ -456,7 +455,6 @@ def implicitly_inherits_order(self):
return False


# TODO: Convert to NullaryWindowOp
@dataclasses.dataclass(frozen=True)
class DenseRankOp(UnaryWindowOp):
@property
Expand Down
36 changes: 33 additions & 3 deletions bigframes/testing/polars_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,9 @@
import polars

import bigframes
import bigframes.clients
import bigframes.core.blocks
import bigframes.core.compile.polars
import bigframes.core.ordering
import bigframes.dataframe
import bigframes.session.clients
import bigframes.session.executor
import bigframes.session.metrics

Expand All @@ -35,6 +32,26 @@
class TestExecutor(bigframes.session.executor.Executor):
compiler = bigframes.core.compile.polars.PolarsCompiler()

def peek(
self,
array_value: bigframes.core.ArrayValue,
n_rows: int,
use_explicit_destination: Optional[bool] = False,
):
"""
A 'peek' efficiently accesses a small number of rows in the dataframe.
"""
lazy_frame: polars.LazyFrame = self.compiler.compile(array_value)
pa_table = lazy_frame.collect().limit(n_rows).to_arrow()
# Currently, pyarrow types might not quite be exactly the ones in the bigframes schema.
# Nullability may be different, and might use large versions of list, string datatypes.
return bigframes.session.executor.ExecuteResult(
arrow_batches=pa_table.to_batches(),
schema=array_value.schema,
total_bytes=pa_table.nbytes,
total_rows=pa_table.num_rows,
)

def execute(
self,
array_value: bigframes.core.ArrayValue,
Expand All @@ -58,6 +75,14 @@ def execute(
total_rows=pa_table.num_rows,
)

def cached(
self,
array_value: bigframes.core.ArrayValue,
*,
config,
) -> None:
return


class TestSession(bigframes.session.Session):
def __init__(self):
Expand Down Expand Up @@ -92,3 +117,8 @@ def read_pandas(self, pandas_dataframe, write_engine="default"):
pandas_dataframe = pandas_dataframe.to_frame()
local_block = bigframes.core.blocks.Block.from_local(pandas_dataframe, self)
return bigframes.dataframe.DataFrame(local_block)

@property
def bqclient(self):
# prevents logger from trying to call bq upon any errors
return None
15 changes: 6 additions & 9 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
UNIT_TEST_DEPENDENCIES: List[str] = []
UNIT_TEST_EXTRAS: List[str] = ["tests"]
UNIT_TEST_EXTRAS_BY_PYTHON: Dict[str, List[str]] = {
"3.12": ["polars", "scikit-learn"],
"3.12": ["tests", "polars", "scikit-learn"],
}

# 3.10 is needed for Windows tests as it is the only version installed in the
Expand Down Expand Up @@ -202,14 +202,11 @@ def install_unittest_dependencies(session, install_test_extra, *constraints):
if UNIT_TEST_LOCAL_DEPENDENCIES:
session.install(*UNIT_TEST_LOCAL_DEPENDENCIES, *constraints)

if install_test_extra and UNIT_TEST_EXTRAS_BY_PYTHON:
extras = UNIT_TEST_EXTRAS_BY_PYTHON.get(session.python, [])
if install_test_extra and UNIT_TEST_EXTRAS:
extras = UNIT_TEST_EXTRAS
else:
extras = []

if extras:
if install_test_extra:
if session.python in UNIT_TEST_EXTRAS_BY_PYTHON:
extras = UNIT_TEST_EXTRAS_BY_PYTHON[session.python]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iiuc, this branch should be like
extras = [**UNIT_TEST_EXTRAS_BY_PYTHON[session.python], **UNIT_TEST_EXTRAS].

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eh, UNIT_TEST_EXTRAS_BY_PYTHON should just be full override like for the system verison, so added 'tests' to it in new revision now

else:
extras = UNIT_TEST_EXTRAS
session.install("-e", f".[{','.join(extras)}]", *constraints)
else:
session.install("-e", ".", *constraints)
Expand Down
Loading