Closed
Description
What happened?
A call to `open_mfdataset` with `parallel=True` fails when using a dask delayed client with a newer version of `cftime` and `xarray`.
This happens with `cftime==1.5.2` and `xarray==0.20.2`, but not with `cftime==1.5.1` and `xarray==0.20.2`.
What did you expect to happen?
I expected the call to `open_mfdataset` to work without error with `parallel=True`, as it does with `parallel=False` and with a previous version of `cftime`.
Minimal Complete Verifiable Example
import xarray as xr
import numpy as np
from dask.distributed import Client
# Need a main routine for dask.distributed if run as script
if __name__ == "__main__":
client = Client(n_workers=1)
t = xr.cftime_range('20010101','20010501', closed='left', calendar='noleap')
x = np.arange(100)
v = np.random.random((t.size,x.size))
da = xr.DataArray(v, coords=[('time',t), ('x',x)])
da.to_netcdf('sample.nc')
# Works
xr.open_mfdataset('sample.nc', parallel=False)
# Throws TypeError exception
xr.open_mfdataset('sample.nc', parallel=True)
Relevant log output
distributed.protocol.core - CRITICAL - Failed to deserialize [32/525]
Traceback (most recent call last):
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/core.py", line 111, in loads
return msgpack.loads(
File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/core.py", line 103, in _decode_default
return merge_and_deserialize(
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 488, in merge_and_deserialize
return deserialize(header, merged_frames, deserializers=deserializers)
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 417, in deserialize
return loads(header, frames)
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 96, in pickle_loads
return pickle.loads(x, buffers=new)
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 75, in loads
return pickle.loads(x)
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/pandas/core/indexes/base.py", line 255, in _new_Index
return cls.__new__(cls, **d)
TypeError: __new__() got an unexpected keyword argument 'dtype'
Traceback (most recent call last):
File "/g/data/v45/aph502/notebooks/test_pickle.py", line 21, in <module>
xr.open_mfdataset('sample.nc', parallel=True)
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/xarray/backends/api.py", line 916, in open_mfdataset
datasets, closers = dask.compute(datasets, closers)
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/dask/base.py", line 571, in compute
results = schedule(dsk, keys, **kwargs)
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/client.py", line 2746, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/client.py", line 1946, in gather
return self.sync(
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/utils.py", line 310, in sync
return sync(
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/utils.py", line 364, in sync
raise exc.with_traceback(tb)
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/utils.py", line 349, in f
result[0] = yield future
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/client.py", line 1840, in _gather
response = await future
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/client.py", line 1891, in _gather_remote
response = await retry_operation(self.scheduler.gather, keys=keys)
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/utils_comm.py", line 385, in retry_operation
return await retry(
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/utils_comm.py", line 370, in retry
return await coro()
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/core.py", line 900, in send_recv_from_rpc
return await send_recv(comm=comm, op=key, **kwargs)
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/core.py", line 669, in send_recv
response = await comm.read(deserializers=deserializers)
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/comm/tcp.py", line 232, in read
msg = await from_frames(
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/comm/utils.py", line 78, in from_frames
res = _from_frames()
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/comm/utils.py", line 61, in _from_frames
return protocol.loads(
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/core.py", line 111, in loads
return msgpack.loads(
File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/core.py", line 103, in _decode_default
return merge_and_deserialize(
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 488, in merge_and_deserialize
return deserialize(header, merged_frames, deserializers=deserializers)
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 417, in deserialize
return loads(header, frames)
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 96, in pickle_loads
return pickle.loads(x, buffers=new)
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 75, in loads
return pickle.loads(x)
File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/pandas/core/indexes/base.py", line 255, in _new_Index
return cls.__new__(cls, **d)
TypeError: __new__() got an unexpected keyword argument 'dtype'
Anything else we need to know?
It seems similar to the previous pickling issue #5686, which was fixed in cftime (Unidata/cftime#252),
but the tests from those previous issues still work, so it isn't exactly the same problem.
Environment
INSTALLED VERSIONS
------------------
commit: None
python: 3.9.9 | packaged by conda-forge | (main, Dec 20 2021, 02:41:03)
[GCC 9.4.0]
python-bits: 64
OS: Linux
OS-release: 4.18.0-348.2.1.el8.nci.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: en_AU.utf8
LANG: en_AU.ISO8859-1
LOCALE: ('en_US', 'UTF-8')
libhdf5: 1.10.6
libnetcdf: 4.7.4
xarray: 0.20.2
pandas: 1.4.0
numpy: 1.22.1
scipy: 1.7.3
netCDF4: 1.5.6
pydap: installed
h5netcdf: 0.13.1
h5py: 3.6.0
Nio: None
zarr: 2.10.3
cftime: 1.5.2
nc_time_axis: 1.4.0
PseudoNetCDF: None
rasterio: 1.2.6
cfgrib: 0.9.9.1
iris: 3.1.0
bottleneck: 1.3.2
dask: 2022.01.0
distributed: 2022.01.0
matplotlib: 3.5.1
cartopy: 0.19.0.post1
seaborn: 0.11.2
numbagg: None
fsspec: 2022.01.0
cupy: 10.1.0
pint: 0.18
sparse: 0.13.0
setuptools: 59.8.0
pip: 21.3.1
conda: 4.11.0
pytest: 6.2.5
IPython: 8.0.1
sphinx: 4.4.0