Skip to content

Commit f08b5b6

Browse files
authored
Merge branch 'main' into local_series_index
2 parents 7a5a559 + e9fe815 commit f08b5b6

File tree

24 files changed

+995
-204
lines changed

24 files changed

+995
-204
lines changed

bigframes/bigquery/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
unix_millis,
2828
unix_seconds,
2929
)
30-
from bigframes.bigquery._operations.geo import st_area
30+
from bigframes.bigquery._operations.geo import st_area, st_difference
3131
from bigframes.bigquery._operations.json import (
3232
json_extract,
3333
json_extract_array,
@@ -48,6 +48,7 @@
4848
"array_to_string",
4949
# geo ops
5050
"st_area",
51+
"st_difference",
5152
# json ops
5253
"json_set",
5354
"json_extract",

bigframes/bigquery/_operations/geo.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from __future__ import annotations
1616

1717
from bigframes import operations as ops
18+
import bigframes.dtypes
1819
import bigframes.geopandas
1920
import bigframes.series
2021

@@ -91,3 +92,122 @@ def st_area(series: bigframes.series.Series) -> bigframes.series.Series:
9192
series = series._apply_unary_op(ops.geo_area_op)
9293
series.name = None
9394
return series
95+
96+
97+
def st_difference(
98+
series: bigframes.series.Series, other: bigframes.series.Series
99+
) -> bigframes.series.Series:
100+
"""
101+
Returns a GEOGRAPHY that represents the point set difference of
102+
`geography_1` and `geography_2`. Therefore, the result consists of the part
103+
of `geography_1` that doesn't intersect with `geography_2`.
104+
105+
If `geography_1` is completely contained in `geography_2`, then ST_DIFFERENCE
106+
returns an empty GEOGRAPHY.
107+
108+
.. note::
109+
BigQuery's Geography functions, like `st_difference`, interpret the geometry
110+
data type as a point set on the Earth's surface. A point set is a set
111+
of points, lines, and polygons on the WGS84 reference spheroid, with
112+
geodesic edges. See: https://cloud.google.com/bigquery/docs/geospatial-data
113+
114+
**Examples:**
115+
116+
>>> import bigframes as bpd
117+
>>> import bigframes.bigquery as bbq
118+
>>> import bigframes.geopandas
119+
>>> from shapely.geometry import Polygon, LineString, Point
120+
>>> bpd.options.display.progress_bar = None
121+
122+
We can check two GeoSeries against each other, row by row.
123+
124+
>>> s1 = bigframes.geopandas.GeoSeries(
125+
... [
126+
... Polygon([(0, 0), (2, 2), (0, 2)]),
127+
... Polygon([(0, 0), (2, 2), (0, 2)]),
128+
... LineString([(0, 0), (2, 2)]),
129+
... LineString([(2, 0), (0, 2)]),
130+
... Point(0, 1),
131+
... ],
132+
... )
133+
>>> s2 = bigframes.geopandas.GeoSeries(
134+
... [
135+
... Polygon([(0, 0), (1, 1), (0, 1)]),
136+
... LineString([(1, 0), (1, 3)]),
137+
... LineString([(2, 0), (0, 2)]),
138+
... Point(1, 1),
139+
... Point(0, 1),
140+
... ],
141+
... index=range(1, 6),
142+
... )
143+
144+
>>> s1
145+
0 POLYGON ((0 0, 2 2, 0 2, 0 0))
146+
1 POLYGON ((0 0, 2 2, 0 2, 0 0))
147+
2 LINESTRING (0 0, 2 2)
148+
3 LINESTRING (2 0, 0 2)
149+
4 POINT (0 1)
150+
dtype: geometry
151+
152+
>>> s2
153+
1 POLYGON ((0 0, 1 1, 0 1, 0 0))
154+
2 LINESTRING (1 0, 1 3)
155+
3 LINESTRING (2 0, 0 2)
156+
4 POINT (1 1)
157+
5 POINT (0 1)
158+
dtype: geometry
159+
160+
>>> bbq.st_difference(s1, s2)
161+
0 None
162+
1 POLYGON ((0.99954 1, 2 2, 0 2, 0 1, 0.99954 1))
163+
2 LINESTRING (0 0, 1 1.00046, 2 2)
164+
3 GEOMETRYCOLLECTION EMPTY
165+
4 POINT (0 1)
166+
5 None
167+
dtype: geometry
168+
169+
We can also check difference of single shapely geometries:
170+
171+
>>> sbq1 = bigframes.geopandas.GeoSeries(
172+
... [
173+
... Polygon([(0, 0), (10, 0), (10, 10), (0, 0)])
174+
... ]
175+
... )
176+
>>> sbq2 = bigframes.geopandas.GeoSeries(
177+
... [
178+
... Polygon([(4, 2), (6, 2), (8, 6), (4, 2)])
179+
... ]
180+
... )
181+
182+
>>> sbq1
183+
0 POLYGON ((0 0, 10 0, 10 10, 0 0))
184+
dtype: geometry
185+
186+
>>> sbq2
187+
0 POLYGON ((4 2, 6 2, 8 6, 4 2))
188+
dtype: geometry
189+
190+
>>> bbq.st_difference(sbq1, sbq2)
191+
0 POLYGON ((0 0, 10 0, 10 10, 0 0), (8 6, 6 2, 4...
192+
dtype: geometry
193+
194+
Additionally, we can check difference of a GeoSeries against a single shapely geometry:
195+
196+
>>> bbq.st_difference(s1, sbq2)
197+
0 POLYGON ((0 0, 2 2, 0 2, 0 0))
198+
1 None
199+
2 None
200+
3 None
201+
4 None
202+
dtype: geometry
203+
204+
Args:
205+
other (bigframes.series.Series or geometric object):
206+
The GeoSeries (elementwise) or geometric object to find the difference to.
207+
208+
Returns:
209+
bigframes.series.Series:
210+
A GeoSeries of the points in each aligned geometry that are not
211+
in other.
212+
"""
213+
return series._apply_binary_op(other, ops.geo_st_difference_op)

bigframes/blob/_functions.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ def _output_bq_type(self):
6868

6969
def _create_udf(self):
7070
"""Create Python UDF in BQ. Return name of the UDF."""
71-
udf_name = str(self._session._loader._storage_manager._random_table())
71+
udf_name = str(
72+
self._session._loader._storage_manager.generate_unique_resource_id()
73+
)
7274

7375
func_body = inspect.getsource(self._func)
7476
func_name = self._func.__name__

bigframes/clients.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,24 @@ def create_bq_connection(
9494
# https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#grant_permission_on_function
9595
self._ensure_iam_binding(project_id, service_account_id, iam_role)
9696

97-
# Introduce retries to accommodate transient errors like etag mismatch,
98-
# which can be caused by concurrent operation on the same resource, and
99-
# manifests with message like:
100-
# google.api_core.exceptions.Aborted: 409 There were concurrent policy
101-
# changes. Please retry the whole read-modify-write with exponential
102-
# backoff. The request's ETag '\007\006\003,\264\304\337\272' did not match
103-
# the current policy's ETag '\007\006\003,\3750&\363'.
97+
# Introduce retries to accommodate transient errors like:
98+
# (1) Etag mismatch,
99+
# which can be caused by concurrent operation on the same resource, and
100+
# manifests with message like:
101+
# google.api_core.exceptions.Aborted: 409 There were concurrent policy
102+
# changes. Please retry the whole read-modify-write with exponential
103+
# backoff. The request's ETag '\007\006\003,\264\304\337\272' did not
104+
# match the current policy's ETag '\007\006\003,\3750&\363'.
105+
# (2) Connection creation,
106+
# for which sometimes it takes a bit for its service account to reflect
107+
# across APIs (e.g. b/397662004, b/386838767), before which, an attempt
108+
# to set an IAM policy for the service account may throw an error like:
109+
# google.api_core.exceptions.InvalidArgument: 400 Service account
110+
# bqcx-*@gcp-sa-bigquery-condel.iam.gserviceaccount.com does not exist.
104111
@google.api_core.retry.Retry(
105112
predicate=google.api_core.retry.if_exception_type(
106-
google.api_core.exceptions.Aborted
113+
google.api_core.exceptions.Aborted,
114+
google.api_core.exceptions.InvalidArgument,
107115
),
108116
initial=10,
109117
maximum=20,

bigframes/core/compile/scalar_op_compiler.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,11 +1001,6 @@ def normalize_op_impl(x: ibis_types.Value):
10011001

10021002

10031003
# Geo Ops
1004-
@scalar_op_compiler.register_unary_op(ops.geo_st_boundary_op, pass_op=False)
1005-
def geo_st_boundary_op_impl(x: ibis_types.Value):
1006-
return st_boundary(x)
1007-
1008-
10091004
@scalar_op_compiler.register_unary_op(ops.geo_area_op)
10101005
def geo_area_op_impl(x: ibis_types.Value):
10111006
return typing.cast(ibis_types.GeoSpatialValue, x).area()
@@ -1016,6 +1011,18 @@ def geo_st_astext_op_impl(x: ibis_types.Value):
10161011
return typing.cast(ibis_types.GeoSpatialValue, x).as_text()
10171012

10181013

1014+
@scalar_op_compiler.register_unary_op(ops.geo_st_boundary_op, pass_op=False)
1015+
def geo_st_boundary_op_impl(x: ibis_types.Value):
1016+
return st_boundary(x)
1017+
1018+
1019+
@scalar_op_compiler.register_binary_op(ops.geo_st_difference_op, pass_op=False)
1020+
def geo_st_difference_op_impl(x: ibis_types.Value, y: ibis_types.Value):
1021+
return typing.cast(ibis_types.GeoSpatialValue, x).difference(
1022+
typing.cast(ibis_types.GeoSpatialValue, y)
1023+
)
1024+
1025+
10191026
@scalar_op_compiler.register_unary_op(ops.geo_st_geogfromtext_op)
10201027
def geo_st_geogfromtext_op_impl(x: ibis_types.Value):
10211028
# Ibis doesn't seem to provide a dedicated method to cast from string to geography,

bigframes/core/utils.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
from typing import Hashable, Iterable, List
1919
import warnings
2020

21+
import bigframes_vendored.constants as constants
2122
import bigframes_vendored.pandas.io.common as vendored_pandas_io_common
2223
import numpy as np
2324
import pandas as pd
2425
import pandas.api.types as pdtypes
26+
import pyarrow as pa
2527
import typing_extensions
2628

2729
import bigframes.dtypes as dtypes
@@ -243,6 +245,22 @@ def replace_timedeltas_with_micros(dataframe: pd.DataFrame) -> List[str]:
243245
return updated_columns
244246

245247

248+
def _search_for_nested_json_type(arrow_type: pa.DataType) -> bool:
249+
"""
250+
Searches recursively for JSON array type within a PyArrow DataType.
251+
"""
252+
if arrow_type == dtypes.JSON_ARROW_TYPE:
253+
return True
254+
if pa.types.is_list(arrow_type):
255+
return _search_for_nested_json_type(arrow_type.value_type)
256+
if pa.types.is_struct(arrow_type):
257+
for i in range(arrow_type.num_fields):
258+
if _search_for_nested_json_type(arrow_type.field(i).type):
259+
return True
260+
return False
261+
return False
262+
263+
246264
def replace_json_with_string(dataframe: pd.DataFrame) -> List[str]:
247265
"""
248266
Due to a BigQuery IO limitation with loading JSON from Parquet files (b/374784249),
@@ -253,12 +271,27 @@ def replace_json_with_string(dataframe: pd.DataFrame) -> List[str]:
253271
updated_columns = []
254272

255273
for col in dataframe.columns:
256-
if dataframe[col].dtype == dtypes.JSON_DTYPE:
274+
column_type = dataframe[col].dtype
275+
if column_type == dtypes.JSON_DTYPE:
257276
dataframe[col] = dataframe[col].astype(dtypes.STRING_DTYPE)
258277
updated_columns.append(col)
278+
elif isinstance(column_type, pd.ArrowDtype) and _search_for_nested_json_type(
279+
column_type.pyarrow_dtype
280+
):
281+
raise NotImplementedError(
282+
f"Nested JSON types, found in column `{col}`: `{column_type}`, "
283+
f"are currently unsupported for upload. {constants.FEEDBACK_LINK}"
284+
)
259285

260286
if dataframe.index.dtype == dtypes.JSON_DTYPE:
261287
dataframe.index = dataframe.index.astype(dtypes.STRING_DTYPE)
262288
updated_columns.append(dataframe.index.name)
289+
elif isinstance(
290+
dataframe.index.dtype, pd.ArrowDtype
291+
) and _search_for_nested_json_type(dataframe.index.dtype.pyarrow_dtype):
292+
raise NotImplementedError(
293+
f"Nested JSON types, found in the index: `{dataframe.index.dtype}`, "
294+
f"are currently unsupported for upload. {constants.FEEDBACK_LINK}"
295+
)
263296

264297
return updated_columns

bigframes/dataframe.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3760,10 +3760,9 @@ def to_gbq(
37603760
)
37613761
if_exists = "replace"
37623762

3763-
temp_table_ref = self._session._temp_storage_manager._random_table(
3764-
# The client code owns this table reference now, so skip_cleanup=True
3765-
# to not clean it up when we close the session.
3766-
skip_cleanup=True,
3763+
# The client code owns this table reference now
3764+
temp_table_ref = (
3765+
self._session._temp_storage_manager.generate_unique_resource_id()
37673766
)
37683767
destination_table = f"{temp_table_ref.project}.{temp_table_ref.dataset_id}.{temp_table_ref.table_id}"
37693768

bigframes/geopandas/geoseries.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def area(self, crs=None) -> bigframes.series.Series: # type: ignore
6262
6363
Raises:
6464
NotImplementedError:
65-
GeoSeries.area is not supported. Use bigframes.bigquery.st_area(series), insetead.
65+
GeoSeries.area is not supported. Use bigframes.bigquery.st_area(series), instead.
6666
"""
6767
raise NotImplementedError(
6868
f"GeoSeries.area is not supported. Use bigframes.bigquery.st_area(series), instead. {constants.FEEDBACK_LINK}"
@@ -93,3 +93,6 @@ def to_wkt(self: GeoSeries) -> bigframes.series.Series:
9393
series = self._apply_unary_op(ops.geo_st_astext_op)
9494
series.name = None
9595
return series
96+
97+
def difference(self: GeoSeries, other: GeoSeries) -> bigframes.series.Series: # type: ignore
98+
return self._apply_binary_op(other, ops.geo_st_difference_op)

bigframes/operations/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
geo_area_op,
9191
geo_st_astext_op,
9292
geo_st_boundary_op,
93+
geo_st_difference_op,
9394
geo_st_geogfromtext_op,
9495
geo_st_geogpoint_op,
9596
geo_x_op,
@@ -366,6 +367,7 @@
366367
# Geo ops
367368
"geo_area_op",
368369
"geo_st_boundary_op",
370+
"geo_st_difference_op",
369371
"geo_st_astext_op",
370372
"geo_st_geogfromtext_op",
371373
"geo_st_geogpoint_op",

bigframes/operations/blob.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -560,9 +560,9 @@ def pdf_extract(
560560
self,
561561
*,
562562
connection: Optional[str] = None,
563-
max_batching_rows: int = 8192,
564-
container_cpu: Union[float, int] = 0.33,
565-
container_memory: str = "512Mi",
563+
max_batching_rows: int = 1,
564+
container_cpu: Union[float, int] = 2,
565+
container_memory: str = "1Gi",
566566
) -> bigframes.series.Series:
567567
"""Extracts text from PDF URLs and saves the text as string.
568568
@@ -574,10 +574,10 @@ def pdf_extract(
574574
connection (str or None, default None): BQ connection used for
575575
function internet transactions, and the output blob if "dst"
576576
is str. If None, uses default connection of the session.
577-
max_batching_rows (int, default 8,192): Max number of rows per batch
577+
max_batching_rows (int, default 1): Max number of rows per batch
578578
sent to Cloud Run to execute the function.
579-
container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers.
580-
container_memory (str, default "512Mi"): container memory size. String of the format <number><unit>. Possible values are from 512Mi to 32Gi.
579+
container_cpu (int or float, default 2): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to integers.
580+
container_memory (str, default "1Gi"): container memory size. String of the format <number><unit>. Possible values are from 512Mi to 32Gi.
581581
582582
Returns:
583583
bigframes.series.Series: contains all text from a pdf file
@@ -604,11 +604,11 @@ def pdf_chunk(
604604
self,
605605
*,
606606
connection: Optional[str] = None,
607-
chunk_size: int = 1000,
607+
chunk_size: int = 2000,
608608
overlap_size: int = 200,
609-
max_batching_rows: int = 8192,
610-
container_cpu: Union[float, int] = 0.33,
611-
container_memory: str = "512Mi",
609+
max_batching_rows: int = 1,
610+
container_cpu: Union[float, int] = 2,
611+
container_memory: str = "1Gi",
612612
) -> bigframes.series.Series:
613613
"""Extracts and chunks text from PDF URLs and saves the text as
614614
arrays of strings.
@@ -620,15 +620,15 @@ def pdf_chunk(
620620
connection (str or None, default None): BQ connection used for
621621
function internet transactions, and the output blob if "dst"
622622
is str. If None, uses default connection of the session.
623-
chunk_size (int, default 1000): the desired size of each text chunk
623+
chunk_size (int, default 2000): the desired size of each text chunk
624624
(number of characters).
625625
overlap_size (int, default 200): the number of overlapping characters
626626
between consective chunks. The helps to ensure context is
627627
preserved across chunk boundaries.
628-
max_batching_rows (int, default 8,192): Max number of rows per batch
628+
max_batching_rows (int, default 1): Max number of rows per batch
629629
sent to Cloud Run to execute the function.
630-
container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers.
631-
container_memory (str, default "512Mi"): container memory size. String of the format <number><unit>. Possible values are from 512Mi to 32Gi.
630+
container_cpu (int or float, default 2): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to integers.
631+
container_memory (str, default "1Gi"): container memory size. String of the format <number><unit>. Possible values are from 512Mi to 32Gi.
632632
633633
Returns:
634634
bigframes.series.Series: Series of array[str], where each string is a

0 commit comments

Comments
 (0)