Closed
Description
The code:
from kafka import SimpleClient
from kafka import MultiProcessConsumer
client = SimpleClient('localhost:9092')
consumer = MultiProcessConsumer(client, "my-group", "my-topic", num_procs=2)
for message in consumer:
print(message)
for message in consumer.get_messages(count=5, block=True, timeout=4):
print(message)
client.close()
The Exception:
Process Process-14:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "xxx/.envs/xxx/lib/python3.6/site-packages/kafka/consumer/multiprocess.py", line 71, in _mp_consume
message_queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
File "<string>", line 2, in put
File "/usr/lib/python3.6/multiprocessing/managers.py", line 756, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/lib/python3.6/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: can't pickle weakref objects
python version: 3.6.7
Kafka-python version: 1.4.6
What's wrong with it ?
Metadata
Metadata
Assignees
Labels
No labels