Skip to content

Gevent and KafkaClient.copy patch? #271

Closed
@jshaw86

Description

@jshaw86

This is referencing #145 I'm seeing copy failing still when monkey patching and then constructing a multiprocess. If this was punted on that's cool I can look at adding my own multiprocessing or threading pool to scale my upstream socket reads but gevent is nicer for this pattern.

versions:
gevent==1.0.1
kafka-python==0.9.2

Works:

    >>> from kafka.client import KafkaClient
    >>> from kafka.producer import SimpleProducer
    >>> kclient = KafkaClient("broker1,broker2,broker3")
    No handlers could be found for logger "kafka"
    >>> kproducer = SimpleProducer(kclient,batch_send=True, batch_send_every_n=100,  batch_send_every_t=10)

Fails:

    >>> import gevent.monkey
    >>> gevent.monkey.patch_all() 
    >>> kclient = KafkaClient("broker1,broker2,broker3")
    >>> kproducer = SimpleProducer(kclient,batch_send=True, batch_send_every_n=100, batch_send_every_t=10)
  Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/dist-packages/kafka_python-0.9.2-py2.7.egg/kafka/producer.py", line 240, in __init__
    batch_send_every_t)
  File "/usr/local/lib/python2.7/dist-packages/kafka_python-0.9.2-py2.7.egg/kafka/producer.py", line 143, in __init__
    self.client.copy(),
  File "/usr/local/lib/python2.7/dist-packages/kafka_python-0.9.2-py2.7.egg/kafka/client.py", line 235, in copy
    c = copy.deepcopy(self)
  File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
    y = copier(x, memo)
  File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
    y = copier(x, memo)
  File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
    y = copier(x, memo)
  File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib/python2.7/copy.py", line 190, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/usr/lib/python2.7/copy.py", line 334, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/lib/python2.7/copy.py", line 163, in deepcopy
    y = copier(x, memo)
  File "/usr/lib/python2.7/copy.py", line 257, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib/python2.7/copy.py", line 182, in deepcopy
    rv = reductor(2)
TypeError: cannot serialize 'Hub' object

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions