Skip to content

Commit f04ec39

Browse files
Merge remote-tracking branch 'github/main' into geo_window_part
2 parents 9401370 + 578081e commit f04ec39

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

42 files changed

+1697
-457
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,6 @@ repos:
3838
rev: v1.10.0
3939
hooks:
4040
- id: mypy
41-
additional_dependencies: [types-requests, types-tabulate, pandas-stubs]
41+
additional_dependencies: [types-requests, types-tabulate, pandas-stubs<=2.2.3.241126]
4242
exclude: "^third_party"
4343
args: ["--check-untyped-defs", "--explicit-package-bases", "--ignore-missing-imports"]

bigframes/bigquery/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
unix_millis,
2828
unix_seconds,
2929
)
30-
from bigframes.bigquery._operations.geo import st_area
30+
from bigframes.bigquery._operations.geo import st_area, st_difference
3131
from bigframes.bigquery._operations.json import (
3232
json_extract,
3333
json_extract_array,
@@ -48,6 +48,7 @@
4848
"array_to_string",
4949
# geo ops
5050
"st_area",
51+
"st_difference",
5152
# json ops
5253
"json_set",
5354
"json_extract",

bigframes/bigquery/_operations/geo.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from __future__ import annotations
1616

1717
from bigframes import operations as ops
18+
import bigframes.dtypes
1819
import bigframes.geopandas
1920
import bigframes.series
2021

@@ -91,3 +92,122 @@ def st_area(series: bigframes.series.Series) -> bigframes.series.Series:
9192
series = series._apply_unary_op(ops.geo_area_op)
9293
series.name = None
9394
return series
95+
96+
97+
def st_difference(
98+
series: bigframes.series.Series, other: bigframes.series.Series
99+
) -> bigframes.series.Series:
100+
"""
101+
Returns a GEOGRAPHY that represents the point set difference of
102+
`geography_1` and `geography_2`. Therefore, the result consists of the part
103+
of `geography_1` that doesn't intersect with `geography_2`.
104+
105+
If `geography_1` is completely contained in `geography_2`, then ST_DIFFERENCE
106+
returns an empty GEOGRAPHY.
107+
108+
.. note::
109+
BigQuery's geography functions, like `st_difference`, interpret the geography
110+
data type as a point set on the Earth's surface. A point set is a set
111+
of points, lines, and polygons on the WGS84 reference spheroid, with
112+
geodesic edges. See: https://cloud.google.com/bigquery/docs/geospatial-data
113+
114+
**Examples:**
115+
116+
>>> import bigframes as bpd
117+
>>> import bigframes.bigquery as bbq
118+
>>> import bigframes.geopandas
119+
>>> from shapely.geometry import Polygon, LineString, Point
120+
>>> bpd.options.display.progress_bar = None
121+
122+
We can check two GeoSeries against each other, row by row.
123+
124+
>>> s1 = bigframes.geopandas.GeoSeries(
125+
... [
126+
... Polygon([(0, 0), (2, 2), (0, 2)]),
127+
... Polygon([(0, 0), (2, 2), (0, 2)]),
128+
... LineString([(0, 0), (2, 2)]),
129+
... LineString([(2, 0), (0, 2)]),
130+
... Point(0, 1),
131+
... ],
132+
... )
133+
>>> s2 = bigframes.geopandas.GeoSeries(
134+
... [
135+
... Polygon([(0, 0), (1, 1), (0, 1)]),
136+
... LineString([(1, 0), (1, 3)]),
137+
... LineString([(2, 0), (0, 2)]),
138+
... Point(1, 1),
139+
... Point(0, 1),
140+
... ],
141+
... index=range(1, 6),
142+
... )
143+
144+
>>> s1
145+
0 POLYGON ((0 0, 2 2, 0 2, 0 0))
146+
1 POLYGON ((0 0, 2 2, 0 2, 0 0))
147+
2 LINESTRING (0 0, 2 2)
148+
3 LINESTRING (2 0, 0 2)
149+
4 POINT (0 1)
150+
dtype: geometry
151+
152+
>>> s2
153+
1 POLYGON ((0 0, 1 1, 0 1, 0 0))
154+
2 LINESTRING (1 0, 1 3)
155+
3 LINESTRING (2 0, 0 2)
156+
4 POINT (1 1)
157+
5 POINT (0 1)
158+
dtype: geometry
159+
160+
>>> bbq.st_difference(s1, s2)
161+
0 None
162+
1 POLYGON ((0.99954 1, 2 2, 0 2, 0 1, 0.99954 1))
163+
2 LINESTRING (0 0, 1 1.00046, 2 2)
164+
3 GEOMETRYCOLLECTION EMPTY
165+
4 POINT (0 1)
166+
5 None
167+
dtype: geometry
168+
169+
We can also compute the difference of single shapely geometries:
170+
171+
>>> sbq1 = bigframes.geopandas.GeoSeries(
172+
... [
173+
... Polygon([(0, 0), (10, 0), (10, 10), (0, 0)])
174+
... ]
175+
... )
176+
>>> sbq2 = bigframes.geopandas.GeoSeries(
177+
... [
178+
... Polygon([(4, 2), (6, 2), (8, 6), (4, 2)])
179+
... ]
180+
... )
181+
182+
>>> sbq1
183+
0 POLYGON ((0 0, 10 0, 10 10, 0 0))
184+
dtype: geometry
185+
186+
>>> sbq2
187+
0 POLYGON ((4 2, 6 2, 8 6, 4 2))
188+
dtype: geometry
189+
190+
>>> bbq.st_difference(sbq1, sbq2)
191+
0 POLYGON ((0 0, 10 0, 10 10, 0 0), (8 6, 6 2, 4...
192+
dtype: geometry
193+
194+
Additionally, we can compute the difference of a GeoSeries against a single shapely geometry:
195+
196+
>>> bbq.st_difference(s1, sbq2)
197+
0 POLYGON ((0 0, 2 2, 0 2, 0 0))
198+
1 None
199+
2 None
200+
3 None
201+
4 None
202+
dtype: geometry
203+
204+
Args:
205+
other (bigframes.series.Series or geometric object):
206+
The GeoSeries (elementwise) or geometric object to find the difference to.
207+
208+
Returns:
209+
bigframes.series.Series:
210+
A GeoSeries of the points in each aligned geometry that are not
211+
in other.
212+
"""
213+
return series._apply_binary_op(other, ops.geo_st_difference_op)

bigframes/blob/_functions.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ def _output_bq_type(self):
6868

6969
def _create_udf(self):
7070
"""Create Python UDF in BQ. Return name of the UDF."""
71-
udf_name = str(self._session._loader._storage_manager._random_table())
71+
udf_name = str(
72+
self._session._loader._storage_manager.generate_unique_resource_id()
73+
)
7274

7375
func_body = inspect.getsource(self._func)
7476
func_name = self._func.__name__

bigframes/clients.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,24 @@ def create_bq_connection(
9494
# https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#grant_permission_on_function
9595
self._ensure_iam_binding(project_id, service_account_id, iam_role)
9696

97-
# Introduce retries to accommodate transient errors like etag mismatch,
98-
# which can be caused by concurrent operation on the same resource, and
99-
# manifests with message like:
100-
# google.api_core.exceptions.Aborted: 409 There were concurrent policy
101-
# changes. Please retry the whole read-modify-write with exponential
102-
# backoff. The request's ETag '\007\006\003,\264\304\337\272' did not match
103-
# the current policy's ETag '\007\006\003,\3750&\363'.
97+
# Introduce retries to accommodate transient errors like:
98+
# (1) Etag mismatch,
99+
# which can be caused by concurrent operation on the same resource, and
100+
# manifests with message like:
101+
# google.api_core.exceptions.Aborted: 409 There were concurrent policy
102+
# changes. Please retry the whole read-modify-write with exponential
103+
# backoff. The request's ETag '\007\006\003,\264\304\337\272' did not
104+
# match the current policy's ETag '\007\006\003,\3750&\363'.
105+
# (2) Connection creation,
106+
# for which sometimes it takes a bit for its service account to reflect
107+
# across APIs (e.g. b/397662004, b/386838767), before which, an attempt
108+
# to set an IAM policy for the service account may throw an error like:
109+
# google.api_core.exceptions.InvalidArgument: 400 Service account
110+
# bqcx-*@gcp-sa-bigquery-condel.iam.gserviceaccount.com does not exist.
104111
@google.api_core.retry.Retry(
105112
predicate=google.api_core.retry.if_exception_type(
106-
google.api_core.exceptions.Aborted
113+
google.api_core.exceptions.Aborted,
114+
google.api_core.exceptions.InvalidArgument,
107115
),
108116
initial=10,
109117
maximum=20,

bigframes/core/blocks.py

Lines changed: 101 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from __future__ import annotations
2323

2424
import ast
25+
import copy
2526
import dataclasses
2627
import datetime
2728
import functools
@@ -30,6 +31,7 @@
3031
import textwrap
3132
import typing
3233
from typing import (
34+
Any,
3335
Iterable,
3436
List,
3537
Literal,
@@ -49,7 +51,7 @@
4951
import pyarrow as pa
5052

5153
from bigframes import session
52-
import bigframes._config.sampling_options as sampling_options
54+
from bigframes._config import sampling_options
5355
import bigframes.constants
5456
import bigframes.core as core
5557
import bigframes.core.compile.googlesql as googlesql
@@ -535,19 +537,9 @@ def to_pandas(
535537
Returns:
536538
pandas.DataFrame, QueryJob
537539
"""
538-
if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS):
539-
raise NotImplementedError(
540-
f"The downsampling method {sampling_method} is not implemented, "
541-
f"please choose from {','.join(_SAMPLING_METHODS)}."
542-
)
543-
544-
sampling = bigframes.options.sampling.with_max_download_size(max_download_size)
545-
if sampling_method is not None:
546-
sampling = sampling.with_method(sampling_method).with_random_state( # type: ignore
547-
random_state
548-
)
549-
else:
550-
sampling = sampling.with_disabled()
540+
sampling = self._get_sampling_option(
541+
max_download_size, sampling_method, random_state
542+
)
551543

552544
df, query_job = self._materialize_local(
553545
materialize_options=MaterializationOptions(
@@ -559,6 +551,27 @@ def to_pandas(
559551
df.set_axis(self.column_labels, axis=1, copy=False)
560552
return df, query_job
561553

554+
def _get_sampling_option(
555+
self,
556+
max_download_size: Optional[int] = None,
557+
sampling_method: Optional[str] = None,
558+
random_state: Optional[int] = None,
559+
) -> sampling_options.SamplingOptions:
560+
561+
if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS):
562+
raise NotImplementedError(
563+
f"The downsampling method {sampling_method} is not implemented, "
564+
f"please choose from {','.join(_SAMPLING_METHODS)}."
565+
)
566+
567+
sampling = bigframes.options.sampling.with_max_download_size(max_download_size)
568+
if sampling_method is None:
569+
return sampling.with_disabled()
570+
571+
return sampling.with_method(sampling_method).with_random_state( # type: ignore
572+
random_state
573+
)
574+
562575
def try_peek(
563576
self, n: int = 20, force: bool = False, allow_large_results=None
564577
) -> typing.Optional[pd.DataFrame]:
@@ -798,11 +811,73 @@ def split(
798811
return [sliced_block.drop_columns(drop_cols) for sliced_block in sliced_blocks]
799812

800813
def _compute_dry_run(
801-
self, value_keys: Optional[Iterable[str]] = None
802-
) -> bigquery.QueryJob:
814+
self,
815+
value_keys: Optional[Iterable[str]] = None,
816+
*,
817+
ordered: bool = True,
818+
max_download_size: Optional[int] = None,
819+
sampling_method: Optional[str] = None,
820+
random_state: Optional[int] = None,
821+
) -> typing.Tuple[pd.Series, bigquery.QueryJob]:
822+
sampling = self._get_sampling_option(
823+
max_download_size, sampling_method, random_state
824+
)
825+
if sampling.enable_downsampling:
826+
raise NotImplementedError("Dry run with sampling is not supported")
827+
828+
index: List[Any] = []
829+
values: List[Any] = []
830+
831+
index.append("columnCount")
832+
values.append(len(self.value_columns))
833+
index.append("columnDtypes")
834+
values.append(
835+
{
836+
col: self.expr.get_column_type(self.resolve_label_exact_or_error(col))
837+
for col in self.column_labels
838+
}
839+
)
840+
841+
index.append("indexLevel")
842+
values.append(self.index.nlevels)
843+
index.append("indexDtypes")
844+
values.append(self.index.dtypes)
845+
803846
expr = self._apply_value_keys_to_expr(value_keys=value_keys)
804-
query_job = self.session._executor.dry_run(expr)
805-
return query_job
847+
query_job = self.session._executor.dry_run(expr, ordered)
848+
job_api_repr = copy.deepcopy(query_job._properties)
849+
850+
job_ref = job_api_repr["jobReference"]
851+
for key, val in job_ref.items():
852+
index.append(key)
853+
values.append(val)
854+
855+
index.append("jobType")
856+
values.append(job_api_repr["configuration"]["jobType"])
857+
858+
query_config = job_api_repr["configuration"]["query"]
859+
for key in ("destinationTable", "useLegacySql"):
860+
index.append(key)
861+
values.append(query_config.get(key))
862+
863+
query_stats = job_api_repr["statistics"]["query"]
864+
for key in (
865+
"referencedTables",
866+
"totalBytesProcessed",
867+
"cacheHit",
868+
"statementType",
869+
):
870+
index.append(key)
871+
values.append(query_stats.get(key))
872+
873+
index.append("creationTime")
874+
values.append(
875+
pd.Timestamp(
876+
job_api_repr["statistics"]["creationTime"], unit="ms", tz="UTC"
877+
)
878+
)
879+
880+
return pd.Series(values, index=index), query_job
806881

807882
def _apply_value_keys_to_expr(self, value_keys: Optional[Iterable[str]] = None):
808883
expr = self._expr
@@ -2703,11 +2778,18 @@ def to_pandas(
27032778
"Cannot materialize index, as this object does not have an index. Set index column(s) using set_index."
27042779
)
27052780
ordered = ordered if ordered is not None else True
2781+
27062782
df, query_job = self._block.select_columns([]).to_pandas(
2707-
ordered=ordered, allow_large_results=allow_large_results
2783+
ordered=ordered,
2784+
allow_large_results=allow_large_results,
27082785
)
27092786
return df.index, query_job
27102787

2788+
def _compute_dry_run(
2789+
self, *, ordered: bool = True
2790+
) -> Tuple[pd.Series, bigquery.QueryJob]:
2791+
return self._block.select_columns([])._compute_dry_run(ordered=ordered)
2792+
27112793
def resolve_level(self, level: LevelsType) -> typing.Sequence[str]:
27122794
if utils.is_list_like(level):
27132795
levels = list(level)

0 commit comments

Comments
 (0)