Commit 2f93f7b

Author: Jason Ng (committed)

Remove old read_gbq, rename from_gbq to read_gbq, and add verbose query job info
Remove locale import
1 parent a763cf0 commit 2f93f7b
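
For orientation, here is a minimal usage sketch of the entry point after this commit. It assumes pandas-gbq from this branch is importable; the project id and query below are placeholders, not part of the change:

    # Minimal sketch of the renamed read_gbq (placeholder project id and query).
    from pandas_gbq import gbq

    df = gbq.read_gbq(
        "SELECT 1 AS x",
        project_id="my-project",   # placeholder
        dialect="standard",
        verbose=True,              # prints the new query job info added below
    )
    print(df)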

File tree

1 file changed (+33, -314 lines)

pandas_gbq/gbq.py

Lines changed: 33 additions & 314 deletions
@@ -304,28 +304,6 @@ def _print(self, msg, end='\n'):
             sys.stdout.write(msg + end)
             sys.stdout.flush()

-    def _start_timer(self):
-        self.start = time.time()
-
-    def get_elapsed_seconds(self):
-        return round(time.time() - self.start, 2)
-
-    def print_elapsed_seconds(self, prefix='Elapsed', postfix='s.',
-                              overlong=7):
-        sec = self.get_elapsed_seconds()
-        if sec > overlong:
-            self._print('{} {} {}'.format(prefix, sec, postfix))
-
-    # http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
-    @staticmethod
-    def sizeof_fmt(num, suffix='B'):
-        fmt = "%3.1f %s%s"
-        for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
-            if abs(num) < 1024.0:
-                return fmt % (num, unit, suffix)
-            num /= 1024.0
-        return fmt % (num, 'Y', suffix)
-
     def get_service(self):
         import httplib2
         try:
@@ -379,132 +357,6 @@ def process_insert_errors(self, insert_errors):

        raise StreamingInsertError

-    def run_query(self, query, **kwargs):
-        try:
-            from googleapiclient.errors import HttpError
-        except:
-            from apiclient.errors import HttpError
-        from oauth2client.client import AccessTokenRefreshError
-
-        _check_google_client_version()
-
-        job_collection = self.service.jobs()
-
-        job_config = {
-            'query': {
-                'query': query,
-                'useLegacySql': self.dialect == 'legacy'
-                # 'allowLargeResults', 'createDisposition',
-                # 'preserveNulls', destinationTable, useQueryCache
-            }
-        }
-        config = kwargs.get('configuration')
-        if config is not None:
-            if len(config) != 1:
-                raise ValueError("Only one job type must be specified, but "
-                                 "given {}".format(','.join(config.keys())))
-            if 'query' in config:
-                if 'query' in config['query'] and query is not None:
-                    raise ValueError("Query statement can't be specified "
-                                     "inside config while it is specified "
-                                     "as parameter")
-
-                job_config['query'].update(config['query'])
-            else:
-                raise ValueError("Only 'query' job type is supported")
-
-        job_data = {
-            'configuration': job_config
-        }
-
-        self._start_timer()
-        try:
-            self._print('Requesting query... ', end="")
-            query_reply = job_collection.insert(
-                projectId=self.project_id, body=job_data).execute()
-            self._print('ok.\nQuery running...')
-        except (AccessTokenRefreshError, ValueError):
-            if self.private_key:
-                raise AccessDenied(
-                    "The service account credentials are not valid")
-            else:
-                raise AccessDenied(
-                    "The credentials have been revoked or expired, "
-                    "please re-run the application to re-authorize")
-        except HttpError as ex:
-            self.process_http_error(ex)
-
-        job_reference = query_reply['jobReference']
-
-        while not query_reply.get('jobComplete', False):
-            self.print_elapsed_seconds(' Elapsed', 's. Waiting...')
-            try:
-                query_reply = job_collection.getQueryResults(
-                    projectId=job_reference['projectId'],
-                    jobId=job_reference['jobId']).execute()
-            except HttpError as ex:
-                self.process_http_error(ex)
-
-        if self.verbose:
-            if query_reply['cacheHit']:
-                self._print('Query done.\nCache hit.\n')
-            else:
-                bytes_processed = int(query_reply.get(
-                    'totalBytesProcessed', '0'))
-                self._print('Query done.\nProcessed: {}\n'.format(
-                    self.sizeof_fmt(bytes_processed)))
-
-            self._print('Retrieving results...')
-
-        total_rows = int(query_reply['totalRows'])
-        result_pages = list()
-        seen_page_tokens = list()
-        current_row = 0
-        # Only read schema on first page
-        schema = query_reply['schema']
-
-        # Loop through each page of data
-        while 'rows' in query_reply and current_row < total_rows:
-            page = query_reply['rows']
-            result_pages.append(page)
-            current_row += len(page)
-
-            self.print_elapsed_seconds(
-                ' Got page: {}; {}% done. Elapsed'.format(
-                    len(result_pages),
-                    round(100.0 * current_row / total_rows)))
-
-            if current_row == total_rows:
-                break
-
-            page_token = query_reply.get('pageToken', None)
-
-            if not page_token and current_row < total_rows:
-                raise InvalidPageToken("Required pageToken was missing. "
-                                       "Received {0} of {1} rows"
-                                       .format(current_row, total_rows))
-
-            elif page_token in seen_page_tokens:
-                raise InvalidPageToken("A duplicate pageToken was returned")
-
-            seen_page_tokens.append(page_token)
-
-            try:
-                query_reply = job_collection.getQueryResults(
-                    projectId=job_reference['projectId'],
-                    jobId=job_reference['jobId'],
-                    pageToken=page_token).execute()
-            except HttpError as ex:
-                self.process_http_error(ex)
-
-        if current_row < total_rows:
-            raise InvalidPageToken()
-
-        # print basic query stats
-        self._print('Got {} rows.\n'.format(total_rows))
-
-        return schema, result_pages
-
     def load_data(self, dataframe, dataset_id, table_id, chunksize):
         try:
             from googleapiclient.errors import HttpError
@@ -606,171 +458,7 @@ def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
        table.create(table_id, table_schema)
        sleep(delay)

-
-def _parse_data(schema, rows):
-    # see:
-    # http://pandas.pydata.org/pandas-docs/dev/missing_data.html
-    # #missing-data-casting-rules-and-indexing
-    dtype_map = {'FLOAT': np.dtype(float),
-                 'TIMESTAMP': 'M8[ns]'}
-
-    fields = schema['fields']
-    col_types = [field['type'] for field in fields]
-    col_names = [str(field['name']) for field in fields]
-    col_dtypes = [dtype_map.get(field['type'], object) for field in fields]
-    page_array = np.zeros((len(rows),), dtype=lzip(col_names, col_dtypes))
-    for row_num, raw_row in enumerate(rows):
-        entries = raw_row.get('f', [])
-        for col_num, field_type in enumerate(col_types):
-            field_value = _parse_entry(entries[col_num].get('v', ''),
-                                       field_type)
-            page_array[row_num][col_num] = field_value
-
-    return DataFrame(page_array, columns=col_names)
-
-
-def _parse_entry(field_value, field_type):
-    if field_value is None or field_value == 'null':
-        return None
-    if field_type == 'INTEGER':
-        return int(field_value)
-    elif field_type == 'FLOAT':
-        return float(field_value)
-    elif field_type == 'TIMESTAMP':
-        timestamp = datetime.utcfromtimestamp(float(field_value))
-        return np.datetime64(timestamp)
-    elif field_type == 'BOOLEAN':
-        return field_value == 'true'
-    return field_value
-
-
-def read_gbq(query, project_id=None, index_col=None, col_order=None,
-             reauth=False, verbose=True, private_key=None, dialect='legacy',
-             **kwargs):
-    r"""Load data from Google BigQuery.
-
-    The main method a user calls to execute a Query in Google BigQuery
-    and read results into a pandas DataFrame.
-
-    Google BigQuery API Client Library v2 for Python is used.
-    Documentation is available `here
-    <https://developers.google.com/api-client-library/python/apis/bigquery/v2>`__
-
-    Authentication to the Google BigQuery service is via OAuth 2.0.
-
-    - If "private_key" is not provided:
-
-      By default "application default credentials" are used.
-
-      If default application credentials are not found or are restrictive,
-      user account credentials are used. In this case, you will be asked to
-      grant permissions for product name 'pandas GBQ'.
-
-    - If "private_key" is provided:
-
-      Service account credentials will be used to authenticate.
-
-    Parameters
-    ----------
-    query : str
-        SQL-Like Query to return data values
-    project_id : str
-        Google BigQuery Account project ID.
-    index_col : str (optional)
-        Name of result column to use for index in results DataFrame
-    col_order : list(str) (optional)
-        List of BigQuery column names in the desired order for results
-        DataFrame
-    reauth : boolean (default False)
-        Force Google BigQuery to reauthenticate the user. This is useful
-        if multiple accounts are used.
-    verbose : boolean (default True)
-        Verbose output
-    private_key : str (optional)
-        Service account private key in JSON format. Can be file path
-        or string contents. This is useful for remote server
-        authentication (eg. jupyter iPython notebook on remote host)
-
-    dialect : {'legacy', 'standard'}, default 'legacy'
-        'legacy' : Use BigQuery's legacy SQL dialect.
-        'standard' : Use BigQuery's standard SQL (beta), which is
-        compliant with the SQL 2011 standard. For more information
-        see `BigQuery SQL Reference
-        <https://cloud.google.com/bigquery/sql-reference/>`__
-
-    **kwargs : Arbitrary keyword arguments
-        configuration (dict): query config parameters for job processing.
-        For example:
-
-            configuration = {'query': {'useQueryCache': False}}
-
-        For more information see `BigQuery SQL Reference
-        <https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query>`__
-
-    Returns
-    -------
-    df: DataFrame
-        DataFrame representing results of query
-
-    """
-
-    if not project_id:
-        raise TypeError("Missing required parameter: project_id")
-
-    if dialect not in ('legacy', 'standard'):
-        raise ValueError("'{0}' is not valid for dialect".format(dialect))
-
-    connector = GbqConnector(project_id, reauth=reauth, verbose=verbose,
-                             private_key=private_key,
-                             dialect=dialect)
-    schema, pages = connector.run_query(query, **kwargs)
-    dataframe_list = []
-    while len(pages) > 0:
-        page = pages.pop()
-        dataframe_list.append(_parse_data(schema, page))
-
-    if len(dataframe_list) > 0:
-        final_df = concat(dataframe_list, ignore_index=True)
-    else:
-        final_df = _parse_data(schema, [])
-
-    # Reindex the DataFrame on the provided column
-    if index_col is not None:
-        if index_col in final_df.columns:
-            final_df.set_index(index_col, inplace=True)
-        else:
-            raise InvalidIndexColumn(
-                'Index column "{0}" does not exist in DataFrame.'
-                .format(index_col)
-            )
-
-    # Change the order of columns in the DataFrame based on provided list
-    if col_order is not None:
-        if sorted(col_order) == sorted(final_df.columns):
-            final_df = final_df[col_order]
-        else:
-            raise InvalidColumnOrder(
-                'Column order does not match this DataFrame.'
-            )
-
-    # cast BOOLEAN and INTEGER columns from object to bool/int
-    # if they dont have any nulls
-    type_map = {'BOOLEAN': bool, 'INTEGER': int}
-    for field in schema['fields']:
-        if field['type'] in type_map and \
-                final_df[field['name']].notnull().all():
-            final_df[field['name']] = \
-                final_df[field['name']].astype(type_map[field['type']])
-
-    connector.print_elapsed_seconds(
-        'Total time taken',
-        datetime.now().strftime('s.\nFinished at %Y-%m-%d %H:%M:%S.'),
-        0
-    )
-
-    return final_df
-
-def from_gbq(query, project_id=None, index_col=None, col_order=None,
+def read_gbq(query, project_id=None, index_col=None, col_order=None, verbose=True,
             private_key=None, dialect='legacy', configuration=None, **kwargs):
    r"""Load data from Google BigQuery using google-cloud-python

@@ -798,6 +486,8 @@ def from_gbq(query, project_id=None, index_col=None, col_order=None,
    col_order : list(str) (optional)
        List of BigQuery column names in the desired order for results
        DataFrame
+    verbose : boolean (default True)
+        Verbose output
    private_key : str (optional)
        Path to service account private key in JSON format. If none is provided,
        will default to the GOOGLE_APPLICATION_CREDENTIALS environment variable
@@ -826,6 +516,15 @@ def from_gbq(query, project_id=None, index_col=None, col_order=None,

    """

+    # http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
+    def sizeof_fmt(num, suffix='B'):
+        fmt = "%3.1f %s%s"
+        for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
+            if abs(num) < 1024.0:
+                return fmt % (num, unit, suffix)
+            num /= 1024.0
+        return fmt % (num, 'Y', suffix)
+
    if private_key:
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = private_key

@@ -849,11 +548,31 @@ def _wait_for_job(job):
        setattr(query_job, setting, value)

    query_job.begin()
-    _wait_for_job(query_job)

+    if verbose:
+        print("Query running...")
+    _wait_for_job(query_job)
+    if verbose:
+        print("Query done.")
+        if query_job._properties["statistics"]["query"].get("cacheHit", False):
+            print("Cache hit.")
+        elif "statistics" in query_job._properties and "query" in query_job._properties["statistics"]:
+            bytes_billed = int(query_job._properties["statistics"]["query"].get("totalBytesBilled", 0))
+            bytes_processed = int(query_job._properties["statistics"]["query"].get("totalBytesProcessed", 0))
+            print("Total bytes billed (processed): %s (%s)" % (sizeof_fmt(bytes_billed), sizeof_fmt(bytes_processed)))
    query_results = query_job.results()

+    if verbose:
+        print("\nRetrieving results...")
+
    rows, total_rows, page_token = query_results.fetch_data()
+
+    if verbose:
+        print("Got %s rows." % total_rows)
+        print("\nTotal time taken %s s" % (datetime.utcnow() - query_job.created.replace(tzinfo=None)).seconds)
+        print("Finished at %s." % datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
+
+
    columns = [field.name for field in query_results.schema]
    data = rows
