Skip to content
This repository was archived by the owner on Jul 16, 2024. It is now read-only.

Feat row #195

Open
wants to merge 30 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/source/modules.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ The **GreenplumPython** library contains 5 main modules:
type
group
order
op
pd_df
config
9 changes: 9 additions & 0 deletions doc/source/op.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Operators and Indexing
======================

.. module:: greenplumpython

.. automodule:: op
:members:
:show-inheritance:
:member-order: bysource
1 change: 1 addition & 0 deletions greenplumpython/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
from greenplumpython.func import create_column_function # type: ignore
from greenplumpython.func import create_function # type: ignore
from greenplumpython.func import aggregate_function, function
from greenplumpython.op import operator
from greenplumpython.type import type_
196 changes: 143 additions & 53 deletions greenplumpython/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,26 @@ def __init__(
parents: List["DataFrame"] = [],
db: Optional[Database] = None,
columns: Optional[Iterable[Column]] = None,
qualified_table_name: Optional[str] = None,
) -> None:
# FIXME: Add doc
# noqa
self._query = query
self._parents = parents
self._name = "cte_" + uuid4().hex
self._qualified_table_name = qualified_table_name
self._columns = columns
self._contents: Optional[Iterable[RealDictRow]] = None
if any(parents):
self._db = next(iter(parents))._db
else:
self._db = db

@property
def is_saved(self) -> bool:
"""Check whether the current dataframe is saved in database."""
return self._qualified_table_name is not None

@singledispatchmethod
def _getitem(self, _) -> "DataFrame":
raise NotImplementedError()
Expand Down Expand Up @@ -337,6 +344,7 @@ def apply(
func: Callable[["DataFrame"], "FunctionExpr"],
expand: bool = False,
column_name: Optional[str] = None,
row_id: Optional[str] = None,
) -> "DataFrame":
"""
Apply a dataframe function to the self :class:`~dataframe.DataFrame`.
Expand Down Expand Up @@ -424,7 +432,11 @@ def apply(
#
# To fix this, we need to pass the dataframe to the resulting FunctionExpr
# explicitly.
return func(self).bind(dataframe=self).apply(expand=expand, column_name=column_name)
return (
func(self)
.bind(dataframe=self)
.apply(expand=expand, column_name=column_name, row_id=row_id)
)

def assign(self, **new_columns: Callable[["DataFrame"], Any]) -> "DataFrame":
"""
Expand Down Expand Up @@ -471,6 +483,7 @@ def assign(self, **new_columns: Callable[["DataFrame"], Any]) -> "DataFrame":
for k, f in new_columns.items():
v: Any = f(self)
if isinstance(v, Expr):
v.bind(db=self._db)
assert (
v._dataframe is None or v._dataframe == self
), "Newly included columns must be based on the current dataframe"
Expand Down Expand Up @@ -539,9 +552,10 @@ def join(
other: "DataFrame",
how: Literal["", "left", "right", "outer", "inner", "cross"] = "",
cond: Optional[Callable[["DataFrame", "DataFrame"], Expr]] = None,
on: Optional[Union[str, Iterable[str]]] = None,
on: Iterable[str] = None,
self_columns: Union[Dict[str, Optional[str]], Set[str]] = {"*"},
other_columns: Union[Dict[str, Optional[str]], Set[str]] = {"*"},
on_columns: Union[Dict[str, Optional[str]], Set[str]] = {"*"},
) -> "DataFrame":
"""
Join the current :class:`~dataframe.DataFrame` with another using the given arguments.
Expand All @@ -567,6 +581,7 @@ def join(
the corresponding key to avoid name conflicts. Asterisk :code:`"*"`
can be used as a key to indicate all columns.
other_columns: Same as `self_columns`, but for the **other** :class:`~dataframe.DataFrame`.
on_columns: A :class:`dict` whose keys are the column names of the resulting joined dataframe using `on`

Note:
When using :code:`"*"` as key in `self_columns` or `other_columns`,
Expand All @@ -583,20 +598,21 @@ def join(
... age_rows, column_names=["name", "age"], db=db)
>>> result = student.join(
... student,
... on="age",
... self_columns={"*"},
... other_columns={"name": "name_2"})
... on=["age"],
... self_columns={"name": "name", "age": "age_1"},
... other_columns={"name": "name_2", "age": "age_2"})
>>> result
----------------------
name | age | name_2
-------+-----+--------
alice | 18 | alice
bob | 19 | carol
bob | 19 | bob
carol | 19 | carol
carol | 19 | bob
age | name | name_2
-----+-------+--------
18 | alice | alice
19 | bob | carol
19 | bob | bob
19 | carol | carol
19 | carol | bob
----------------------
(5 rows)

"""
# FIXME : Raise Error if target columns don't exist
assert how.upper() in [
Expand Down Expand Up @@ -629,15 +645,83 @@ def bind(t: DataFrame, columns: Union[Dict[str, Optional[str]], Set[str]]) -> Li
if on is not None
else None
)

# USING clause in SQL uses argument `on`.
sql_using_clause = f"USING ({join_column_names})" if join_column_names is not None else ""
return DataFrame(

if on is None:
return DataFrame(
f"""
SELECT {",".join(target_list)}
FROM {self._name} {how} JOIN {other_clause} {sql_on_clause} {sql_using_clause}
""",
parents=[self, other],
)

def bind_using(
t: DataFrame,
columns: Union[Dict[str, Optional[str]], Set[str]],
on: Iterable[str],
suffix: str,
) -> List[str]:
target_list: List[str] = []
for k in columns:
col: Column = t[k]
v = columns[k] if isinstance(columns, dict) else (k + suffix) if k in on else None
target_list.append(col._serialize() + (f' AS "{v}"' if v is not None else ""))
return target_list

self_target_list = (
bind_using(self, self_columns, on, "_l")
if isinstance(self_columns, set)
else bind(self, self_columns)
)
other_target_list = (
bind_using(other_temp, other_columns, on, "_r")
if isinstance(other_columns, set)
else bind(other_temp, other_columns)
)
target_list = self_target_list + other_target_list

join_dataframe = DataFrame(
f"""
SELECT {",".join(target_list)}
FROM {self._name} {how} JOIN {other_clause} {sql_on_clause} {sql_using_clause}
""",
parents=[self, other],
)
coalesce_target_list = []
if not (self_columns == {} or other_columns == {}):
for k in on:
s_v = self_columns[k] if isinstance(self_columns, dict) else (k + "_l")
o_v = other_columns[k] if isinstance(other_columns, dict) else (k + "_r")
coalesce_target_list.append(f"COALESCE({s_v},{o_v}) AS {k}")

join_df = DataFrame(
f"""
SELECT * {("," + ",".join(coalesce_target_list)) if coalesce_target_list != [] else ""}
FROM {join_dataframe._name}
""",
parents=[join_dataframe],
)

self_columns_set = (
self_columns
if isinstance(self_columns, set)
else set([k if k in on else v for k, v in self_columns.items()])
)
other_columns_set = (
other_columns
if isinstance(other_columns, set)
else set([k if k in on else v for k, v in other_columns.items()])
)
return DataFrame(
f"""
SELECT {",".join(sorted(self_columns_set | other_columns_set))}
FROM {join_df._name}
""",
parents=[join_df],
)

inner_join = partialmethod(join, how="INNER")
"""
Expand Down Expand Up @@ -707,7 +791,7 @@ def _depth_first_search(self, t: "DataFrame", visited: Set[str], lineage: List["
self._depth_first_search(i, visited, lineage)
lineage.append(t)

def _build_full_query(self) -> str:
def _serialize(self) -> str:
# noqa
""":meta private:"""
lineage = self._list_lineage()
Expand Down Expand Up @@ -849,12 +933,12 @@ def _fetch(self, is_all: bool = True) -> Iterable[Tuple[Any]]:
f"SELECT to_json({output_name})::TEXT FROM {self._name} AS {output_name}",
parents=[self],
)
result = self._db._execute(to_json_dataframe._build_full_query())
return result if result is not None else []
result = self._db._execute(to_json_dataframe._serialize())
return result if isinstance(result, Iterable) else []

def save_as(
self,
table_name: str,
table_name: Optional[str] = None,
column_names: List[str] = [],
temp: bool = False,
storage_params: dict[str, Any] = {},
Expand Down Expand Up @@ -915,53 +999,61 @@ def save_as(

# build string from parameter dict, such as from {'a': 1, 'b': 2} to
# 'WITH (a=1, b=2)'
storage_parameters = (
storage_params_clause = (
f"WITH ({','.join([f'{key}={storage_params[key]}' for key in storage_params.keys()])})"
)
df_full_name = f'"{table_name}"' if schema is None else f'"{schema}"."{table_name}"'
if table_name is None:
table_name = self._name if not self.is_saved else "cte_" + uuid4().hex
qualified_table_name = f'"{table_name}"' if schema is None else f'"{schema}"."{table_name}"'
self._db._execute(
f"""
CREATE {'TEMP' if temp else ''} TABLE {df_full_name}
CREATE {'TEMP' if temp else ''} TABLE {qualified_table_name}
({','.join(column_names)})
{storage_parameters if storage_params else ''}
AS {self._build_full_query()}
{storage_params_clause if storage_params else ''}
AS (
{self._serialize()}
)
""",
has_results=False,
)
return DataFrame.from_table(table_name, self._db)

# TODO: Uncomment or remove this.
#
# def create_index(
# self,
# columns: Iterable[Union["Column", str]],
# method: str = "btree",
# name: Optional[str] = None,
# ) -> None:
# if not self._in_catalog():
# raise Exception("Cannot create index on dataframes not in the system catalog.")
# index_name: str = name if name is not None else "idx_" + uuid4().hex
# indexed_cols = ",".join([str(col) for col in columns])
# assert self._db is not None
# self._db._execute(
# f"CREATE INDEX {index_name} ON {self.name} USING {method} ({indexed_cols})",
# has_results=False,
# )

def _explain(self, format: str = "TEXT") -> Iterable[Tuple[str]]:
return DataFrame.from_table(table_name, self._db, schema=schema)

def create_index(
self,
columns: Union[Set[str], Dict[str, str]],
method: str = "btree",
name: Optional[str] = None,
) -> "DataFrame":
"""
Explain the GreenplumPython :class:`~dataframe.DataFrame`'s execution plan.
Create an index for the current dataframe for fast searching.

The current dataframe is required to be saved before creating index.

Args:
format: str: the format of the explain result. It can be one of "TEXT"/"XML"/"JSON"/"YAML".
columns: key columns of the current dataframe to create index on.
method: index access method.
name: name of the index.

Returns:
Iterable[Tuple[str]]: The results of *EXPLAIN* query.
Dataframe with key columns indexed.
"""
assert self.is_saved, "Cannot create index for unsaved dataframe."
assert len(columns) > 0, "Column set to be indexed cannot be empty."

index_name: str = "idx_" + uuid4().hex if name is None else name
keys = (
[f'"{name}" "{op_class}"' for name, op_class in columns.items()]
if isinstance(columns, dict)
else [f'"{name}"' for name in columns]
)
assert self._db is not None
results = self._db._execute(f"EXPLAIN (FORMAT {format}) {self._build_full_query()}")
assert results is not None
return results
self._db._execute(
f'CREATE INDEX "{index_name}" ON {self._qualified_table_name} USING "{method}" ('
f' {",".join(keys)}'
f")",
has_results=False,
)
return self

def group_by(self, *column_names: str) -> DataFrameGroupingSet:
"""
Expand Down Expand Up @@ -1036,10 +1128,8 @@ def from_table(cls, table_name: str, db: Database, schema: Optional[str] = None)
df = gp.DataFrame.from_table("pg_class", db=db)

"""
return DataFrame(
f'TABLE "{schema}"."{table_name}"' if schema is not None else f'TABLE "{table_name}"',
db=db,
)
qualified_name = f'"{schema}"."{table_name}"' if schema is not None else f'"{table_name}"'
return DataFrame(f"TABLE {qualified_name}", db=db, qualified_table_name=qualified_name)

@classmethod
def from_rows(
Expand Down
5 changes: 5 additions & 0 deletions greenplumpython/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,17 @@ def _execute(self, query: str, has_results: bool = True) -> Union[Iterable[Tuple
"""

with self._conn.cursor() as cursor:
cursor.execute("SELECT pg_backend_pid()")
print("BACKEND SESSION PID: ", cursor.fetchall())
print("OBJECT ID: ", id(self))
if config.print_sql:
print(query)
cursor.execute(query)
return cursor.fetchall() if has_results else cursor.rowcount

def close(self) -> None:
"""Close the database connection."""
print("OBJECT ID CLOSED: ", id(self))
self._conn.close()

def create_dataframe(
Expand Down Expand Up @@ -213,6 +217,7 @@ def assign(self, **new_columns: Callable[[], Any]) -> "DataFrame":
for k, f in new_columns.items():
v: Any = f()
if isinstance(v, Expr):
v.bind(db=self)
assert v._dataframe is None, "New column should not depend on any dataframe."
if isinstance(v, FunctionExpr):
v = v.bind(db=self)
Expand Down
Loading