-
-
Notifications
You must be signed in to change notification settings - Fork 18.5k
BUG: read_parquet, to_parquet for s3 destinations #19135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 19 commits
5c0536a
c4b490e
57bb814
e3e948e
7040ef4
2e43ba6
c95a542
e32f0c9
424eb6a
8ed608d
1cf2184
4011374
452104e
230c814
026ecc7
56081c9
6122373
70adb42
87614c2
55c575d
556c43f
22f1ae5
ce11de6
900a1c4
9dbc77c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -91,14 +91,6 @@ def _is_url(url): | |
return False | ||
|
||
|
||
def _is_s3_url(url): | ||
"""Check for an s3, s3n, or s3a url""" | ||
try: | ||
return parse_url(url).scheme in ['s3', 's3n', 's3a'] | ||
except: | ||
return False | ||
|
||
|
||
def _expand_user(filepath_or_buffer): | ||
"""Return the argument with an initial component of ~ or ~user | ||
replaced by that user's home directory. | ||
|
@@ -168,8 +160,16 @@ def _stringify_path(filepath_or_buffer): | |
return filepath_or_buffer | ||
|
||
|
||
def is_s3_url(url): | ||
"""Check for an s3, s3n, or s3a url""" | ||
try: | ||
return parse_url(url).scheme in ['s3', 's3n', 's3a'] | ||
except: | ||
return False | ||
|
||
|
||
def get_filepath_or_buffer(filepath_or_buffer, encoding=None, | ||
compression=None): | ||
compression=None, mode=None): | ||
""" | ||
If the filepath_or_buffer is a url, translate and return the buffer. | ||
Otherwise passthrough. | ||
|
@@ -179,6 +179,8 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None, | |
filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path), | ||
or buffer | ||
encoding : the encoding to use to decode py3 bytes, default is 'utf-8' | ||
mode : {'rb', 'wb', 'ab'} applies to S3 where a write mandates opening the | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. mode : str, optional, you don't need to mention anything else here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's keep a documentation of available options, and have the comment that this applies to s3 destinations. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no this is just distracting as it only applies to s3 and is simply a pass thru option. pls change. |
||
file in 'wb' mode. | ||
|
||
Returns | ||
------- | ||
|
@@ -195,11 +197,12 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None, | |
reader = BytesIO(req.read()) | ||
return reader, encoding, compression | ||
|
||
if _is_s3_url(filepath_or_buffer): | ||
if is_s3_url(filepath_or_buffer): | ||
from pandas.io import s3 | ||
return s3.get_filepath_or_buffer(filepath_or_buffer, | ||
encoding=encoding, | ||
compression=compression) | ||
compression=compression, | ||
mode=mode) | ||
|
||
if isinstance(filepath_or_buffer, (compat.string_types, | ||
compat.binary_type, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,7 @@ | |
from pandas import DataFrame, RangeIndex, Int64Index, get_option | ||
from pandas.compat import string_types | ||
from pandas.core.common import AbstractMethodError | ||
from pandas.io.common import get_filepath_or_buffer | ||
from pandas.io.common import get_filepath_or_buffer, is_s3_url | ||
|
||
|
||
def get_engine(engine): | ||
|
@@ -107,7 +107,7 @@ def write(self, df, path, compression='snappy', | |
self.validate_dataframe(df) | ||
if self._pyarrow_lt_070: | ||
self._validate_write_lt_070(df) | ||
path, _, _ = get_filepath_or_buffer(path) | ||
path, _, _ = get_filepath_or_buffer(path, mode='wb') | ||
|
||
if self._pyarrow_lt_060: | ||
table = self.api.Table.from_pandas(df, timestamps_to_ms=True) | ||
|
@@ -190,6 +190,10 @@ def __init__(self): | |
self.api = fastparquet | ||
|
||
def write(self, df, path, compression='snappy', **kwargs): | ||
if is_s3_url(path): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this if block, go from path = 's3://{}'.format(path.path)
kwargs['open_with'] = path.s3.open See if that works? |
||
raise NotImplementedError("fastparquet s3 write isn't implemented." | ||
" Consider using pyarrow instead.") | ||
|
||
self.validate_dataframe(df) | ||
# thriftpy/protocol/compact.py:339: | ||
# DeprecationWarning: tostring() is deprecated. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -212,28 +212,42 @@ def check_error_on_write(self, df, engine, exc): | |
with tm.ensure_clean() as path: | ||
to_parquet(df, path, engine, compression=None) | ||
|
||
def check_round_trip(self, df, engine, expected=None, | ||
def check_round_trip(self, df, engine, expected=None, path=None, | ||
write_kwargs=None, read_kwargs=None, | ||
check_names=True): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pls revert this, it is easy enough simply to passing It is VERY confusing to have 2 check functions, and so now future readers have no idea what to use. This is just increasing complexity. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the change in the write_kwargs is ok. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This makes def check_round_trip(self, df, engine, path=None, expected=None,
write_kwargs=None, read_kwargs=None,
check_names=True):
if write_kwargs is None:
write_kwargs = {'compression': None}
if read_kwargs is None:
read_kwargs = {}
if expected is None:
expected = df
if path is None:
with tm.ensure_clean() as path:
df.to_parquet(path, engine, **write_kwargs)
actual = read_parquet(path, engine, **read_kwargs)
tm.assert_frame_equal(expected, actual, check_names=check_names)
# repeat
df.to_parquet(path, engine, **write_kwargs)
actual = read_parquet(path, engine, **read_kwargs)
tm.assert_frame_equal(expected, actual, check_names=check_names)
else:
df.to_parquet(path, engine, **write_kwargs)
actual = read_parquet(path, engine, **read_kwargs)
tm.assert_frame_equal(expected, actual, check_names=check_names)
# repeat
df.to_parquet(path, engine, **write_kwargs)
actual = read_parquet(path, engine, **read_kwargs)
tm.assert_frame_equal(expected, actual, check_names=check_names) Any suggestions? |
||
|
||
if write_kwargs is None: | ||
write_kwargs = {} | ||
write_kwargs = {'compression': None} | ||
|
||
if read_kwargs is None: | ||
read_kwargs = {} | ||
with tm.ensure_clean() as path: | ||
df.to_parquet(path, engine, **write_kwargs) | ||
result = read_parquet(path, engine, **read_kwargs) | ||
|
||
if expected is None: | ||
expected = df | ||
tm.assert_frame_equal(result, expected, check_names=check_names) | ||
if expected is None: | ||
expected = df | ||
|
||
# repeat | ||
to_parquet(df, path, engine, **write_kwargs) | ||
result = pd.read_parquet(path, engine, **read_kwargs) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. well this is repeating quite a bit, just make an function def like this (make it a top-level function in this module, add a mini-doc string as well), and just call it.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think that your suggestion is similar to the idea that was rejected of Maybe a better name other than There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. pls do it my way There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. this needs updating |
||
if path is None: | ||
with tm.ensure_clean() as path: | ||
df.to_parquet(path, engine, **write_kwargs) | ||
actual = read_parquet(path, engine, **read_kwargs) | ||
tm.assert_frame_equal(expected, actual, | ||
check_names=check_names) | ||
|
||
# repeat | ||
df.to_parquet(path, engine, **write_kwargs) | ||
actual = read_parquet(path, engine, **read_kwargs) | ||
tm.assert_frame_equal(expected, actual, | ||
check_names=check_names) | ||
else: | ||
df.to_parquet(path, engine, **write_kwargs) | ||
actual = read_parquet(path, engine, **read_kwargs) | ||
tm.assert_frame_equal(expected, actual, | ||
check_names=check_names) | ||
|
||
if expected is None: | ||
expected = df | ||
tm.assert_frame_equal(result, expected, check_names=check_names) | ||
# repeat | ||
df.to_parquet(path, engine, **write_kwargs) | ||
actual = read_parquet(path, engine, **read_kwargs) | ||
tm.assert_frame_equal(expected, actual, | ||
check_names=check_names) | ||
|
||
|
||
class TestBasic(Base): | ||
|
@@ -251,7 +265,7 @@ def test_columns_dtypes(self, engine): | |
|
||
# unicode | ||
df.columns = [u'foo', u'bar'] | ||
self.check_round_trip(df, engine, write_kwargs={'compression': None}) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. is there a reason you are changing all of this? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Well yes, I've taken the opportunity to do a small refactoring in test_parquet.py. test_parquet.py Now I'm using So now we have As for |
||
self.check_round_trip(df, engine) | ||
|
||
def test_columns_dtypes_invalid(self, engine): | ||
|
||
|
@@ -292,7 +306,6 @@ def test_read_columns(self, engine): | |
|
||
expected = pd.DataFrame({'string': list('abc')}) | ||
self.check_round_trip(df, engine, expected=expected, | ||
write_kwargs={'compression': None}, | ||
read_kwargs={'columns': ['string']}) | ||
|
||
def test_write_index(self, engine): | ||
|
@@ -304,7 +317,7 @@ def test_write_index(self, engine): | |
pytest.skip("pyarrow is < 0.7.0") | ||
|
||
df = pd.DataFrame({'A': [1, 2, 3]}) | ||
self.check_round_trip(df, engine, write_kwargs={'compression': None}) | ||
self.check_round_trip(df, engine) | ||
|
||
indexes = [ | ||
[2, 3, 4], | ||
|
@@ -315,15 +328,12 @@ def test_write_index(self, engine): | |
# non-default index | ||
for index in indexes: | ||
df.index = index | ||
self.check_round_trip( | ||
df, engine, | ||
write_kwargs={'compression': None}, | ||
check_names=check_names) | ||
self.check_round_trip(df, engine, check_names=check_names) | ||
|
||
# index with meta-data | ||
df.index = [0, 1, 2] | ||
df.index.name = 'foo' | ||
self.check_round_trip(df, engine, write_kwargs={'compression': None}) | ||
self.check_round_trip(df, engine) | ||
|
||
def test_write_multiindex(self, pa_ge_070): | ||
# Not supported in fastparquet as of 0.1.3 or older pyarrow versions | ||
|
@@ -332,7 +342,7 @@ def test_write_multiindex(self, pa_ge_070): | |
df = pd.DataFrame({'A': [1, 2, 3]}) | ||
index = pd.MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1)]) | ||
df.index = index | ||
self.check_round_trip(df, engine, write_kwargs={'compression': None}) | ||
self.check_round_trip(df, engine) | ||
|
||
def test_write_column_multiindex(self, engine): | ||
# column multi-index | ||
|
@@ -426,6 +436,11 @@ def test_categorical_unsupported(self, pa_lt_070): | |
df = pd.DataFrame({'a': pd.Categorical(list('abc'))}) | ||
self.check_error_on_write(df, pa, NotImplementedError) | ||
|
||
def test_s3_roundtrip(self, df_compat, s3_resource, pa): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you add the same test to FP, but assert that it raises. |
||
# GH #19134 | ||
self.check_round_trip(df_compat, pa, | ||
path='s3://pandas-test/pyarrow.parquet') | ||
|
||
|
||
class TestParquetFastParquet(Base): | ||
|
||
|
@@ -436,7 +451,7 @@ def test_basic(self, fp, df_full): | |
# additional supported types for fastparquet | ||
df['timedelta'] = pd.timedelta_range('1 day', periods=3) | ||
|
||
self.check_round_trip(df, fp, write_kwargs={'compression': None}) | ||
self.check_round_trip(df, fp) | ||
|
||
@pytest.mark.skip(reason="not supported") | ||
def test_duplicate_columns(self, fp): | ||
|
@@ -449,8 +464,7 @@ def test_duplicate_columns(self, fp): | |
def test_bool_with_none(self, fp): | ||
df = pd.DataFrame({'a': [True, None, False]}) | ||
expected = pd.DataFrame({'a': [1.0, np.nan, 0.0]}, dtype='float16') | ||
self.check_round_trip(df, fp, expected=expected, | ||
write_kwargs={'compression': None}) | ||
self.check_round_trip(df, fp, expected=expected) | ||
|
||
def test_unsupported(self, fp): | ||
|
||
|
@@ -466,7 +480,7 @@ def test_categorical(self, fp): | |
if LooseVersion(fastparquet.__version__) < LooseVersion("0.1.3"): | ||
pytest.skip("CategoricalDtype not supported for older fp") | ||
df = pd.DataFrame({'a': pd.Categorical(list('abc'))}) | ||
self.check_round_trip(df, fp, write_kwargs={'compression': None}) | ||
self.check_round_trip(df, fp) | ||
|
||
def test_datetime_tz(self, fp): | ||
# doesn't preserve tz | ||
|
@@ -475,8 +489,7 @@ def test_datetime_tz(self, fp): | |
|
||
# warns on the coercion | ||
with catch_warnings(record=True): | ||
self.check_round_trip(df, fp, df.astype('datetime64[ns]'), | ||
write_kwargs={'compression': None}) | ||
self.check_round_trip(df, fp, df.astype('datetime64[ns]')) | ||
|
||
def test_filter_row_groups(self, fp): | ||
d = {'a': list(range(0, 3))} | ||
|
@@ -486,3 +499,9 @@ def test_filter_row_groups(self, fp): | |
row_group_offsets=1) | ||
result = read_parquet(path, fp, filters=[('a', '==', 0)]) | ||
assert len(result) == 1 | ||
|
||
def test_s3_roundtrip(self, df_compat, s3_resource, fp): | ||
# GH #19134 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add a comment on why this is NI |
||
with pytest.raises(NotImplementedError): | ||
self.check_round_trip(df_compat, fp, | ||
path='s3://pandas-test/fastparquet.parquet') |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,8 @@ | ||
from pandas.io.common import _is_s3_url | ||
from pandas.io.common import is_s3_url | ||
|
||
|
||
class TestS3URL(object): | ||
|
||
def test_is_s3_url(self): | ||
assert _is_s3_url("s3://pandas/somethingelse.com") | ||
assert not _is_s3_url("s4://pandas/somethingelse.com") | ||
assert is_s3_url("s3://pandas/somethingelse.com") | ||
assert not is_s3_url("s4://pandas/somethingelse.com") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where exception was raised
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are several possible exceptions in the fail chain.
3 different components
pyarrow - write attempt
FileNotFoundException or ValueError (depends on if file exists in S3 or not).
fastparquet - read attempt
Exception in attempting to concat str and S3File
fastparquet - write attempt
Exception in attempting to open path using default_open
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where an exception was raised if the write destination is S3.