open_mfdataset fails with cftime index when using parallel and dask delayed client #6226

Closed
@aidanheerdegen

Description

What happened?

A call to open_mfdataset with parallel=True fails when using a dask.distributed client with newer versions of cftime and xarray. This happens with cftime==1.5.2 and xarray==0.20.2, but not with cftime==1.5.1 and xarray==0.20.2.

What did you expect to happen?

I expected the call to open_mfdataset to work without error with parallel=True, as it does with parallel=False or with the previous version of cftime.

Minimal Complete Verifiable Example

import xarray as xr
import numpy as np
from dask.distributed import Client

# Need a main routine for dask.distributed if run as script
if __name__ == "__main__":

    client = Client(n_workers=1)

    t = xr.cftime_range('20010101', '20010501', closed='left', calendar='noleap')
    x = np.arange(100)
    v = np.random.random((t.size, x.size))

    da = xr.DataArray(v, coords=[('time', t), ('x', x)])
    da.to_netcdf('sample.nc')

    # Works
    xr.open_mfdataset('sample.nc', parallel=False)

    # Throws TypeError exception
    xr.open_mfdataset('sample.nc', parallel=True)

Relevant log output

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):                                                                                                                                                                        
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/core.py", line 111, in loads
    return msgpack.loads(                                                                                                                                                                                 
  File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/core.py", line 103, in _decode_default                                                  
    return merge_and_deserialize(
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 488, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 417, in deserialize
    return loads(header, frames)                    
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 96, in pickle_loads
    return pickle.loads(x, buffers=new)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/pandas/core/indexes/base.py", line 255, in _new_Index
    return cls.__new__(cls, **d)
TypeError: __new__() got an unexpected keyword argument 'dtype'
Traceback (most recent call last):
  File "/g/data/v45/aph502/notebooks/test_pickle.py", line 21, in <module>
    xr.open_mfdataset('sample.nc', parallel=True)                               
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/xarray/backends/api.py", line 916, in open_mfdataset
    datasets, closers = dask.compute(datasets, closers)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/dask/base.py", line 571, in compute
    results = schedule(dsk, keys, **kwargs)           
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/client.py", line 2746, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/client.py", line 1946, in gather
    return self.sync(                             
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/utils.py", line 310, in sync
    return sync(                
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/utils.py", line 364, in sync
    raise exc.with_traceback(tb)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/utils.py", line 349, in f
    result[0] = yield future
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/client.py", line 1840, in _gather
    response = await future
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/client.py", line 1891, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/utils_comm.py", line 385, in retry_operation
    return await retry(
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/core.py", line 900, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/core.py", line 669, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/comm/tcp.py", line 232, in read
    msg = await from_frames(
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/comm/utils.py", line 78, in from_frames
    res = _from_frames()
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    return protocol.loads(
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/core.py", line 111, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/core.py", line 103, in _decode_default
    return merge_and_deserialize(
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 488, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 417, in deserialize
    return loads(header, frames)
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 96, in pickle_loads
    return pickle.loads(x)                                                                                                    
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)                                                                                                    
  File "/g/data3/hh5/public/apps/miniconda3/envs/analysis3-22.01/lib/python3.9/site-packages/pandas/core/indexes/base.py", line 255, in _new_Index
    return cls.__new__(cls, **d)                                                                                                          
TypeError: __new__() got an unexpected keyword argument 'dtype'

Anything else we need to know?

It seems similar to a previous pickling issue, #5686, which was fixed in cftime (Unidata/cftime#252), but the tests from that earlier issue still pass, so it isn't exactly the same problem.

Environment


INSTALLED VERSIONS
------------------
commit: None
python: 3.9.9 | packaged by conda-forge | (main, Dec 20 2021, 02:41:03) 
[GCC 9.4.0]
python-bits: 64
OS: Linux
OS-release: 4.18.0-348.2.1.el8.nci.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: en_AU.utf8
LANG: en_AU.ISO8859-1
LOCALE: ('en_US', 'UTF-8')
libhdf5: 1.10.6
libnetcdf: 4.7.4

xarray: 0.20.2
pandas: 1.4.0
numpy: 1.22.1
scipy: 1.7.3
netCDF4: 1.5.6
pydap: installed
h5netcdf: 0.13.1
h5py: 3.6.0
Nio: None
zarr: 2.10.3
cftime: 1.5.2
nc_time_axis: 1.4.0
PseudoNetCDF: None
rasterio: 1.2.6
cfgrib: 0.9.9.1
iris: 3.1.0
bottleneck: 1.3.2
dask: 2022.01.0
distributed: 2022.01.0
matplotlib: 3.5.1
cartopy: 0.19.0.post1
seaborn: 0.11.2
numbagg: None
fsspec: 2022.01.0
cupy: 10.1.0
pint: 0.18
sparse: 0.13.0
setuptools: 59.8.0
pip: 21.3.1
conda: 4.11.0
pytest: 6.2.5
IPython: 8.0.1
sphinx: 4.4.0
