Closed
Description
Kafka 0.10.0.1
Kafka-Python 1.3.3
I create a consumer like:
consumer = KafkaConsumer(
group_id="the-group",
bootstrap_servers=[...],
consumer_timeout_ms=30000,
)
partitions = [
TopicPartition("the-topic", partition_num)
for partition_num in consumer.partitions_for_topic(settings.kafka.topic)
if partition_num % 2 == 0
]
consumer.assign(partitions)
And then consume from it using for message in consumer:
in the usual way.
Some time later (a few seconds after beginning consuming in the for loop), I get:
Traceback (most recent call last):
File "/usr/local/opt/pypy/site-packages/kafka/future.py", line 79, in _call_backs
f(value)
File "/usr/local/opt/pypy/lib_pypy/_functools.py", line 45, in __call__
return self._func(*(self._args + fargs), **fkeywords)
File "/usr/local/opt/pypy/site-packages/kafka/coordinator/consumer.py", line 547, in _handle_offset_commit_response
self._subscription.mark_for_reassignment()
File "/usr/local/opt/pypy/site-packages/kafka/consumer/subscription_state.py", line 172, in mark_for_reassignment
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
IllegalStateError: IllegalStateError: You must choose only one way to configure your consumer: (1) subscribe to specific topics by name, (2) subscribe to topics matching a regex pattern, (3) assign itself specific topic-partitions.
I don't call subscribe()
anywhere, nor assign()
anywhere else or again. This looks like it's coming from _handle_offset_commit_response
asynchronously, but I don't know why it would be.
Mostly I want to understand if I am supposed to do anything with this error, and, given that it's happening asynchronously, if I even can do anything with it (will it bubble up in a try:
block anywhere? I think not, if it's on another thread, for instance)
Metadata
Metadata
Assignees
Labels
No labels