Skip to content

Make seek(); commit(); work without commit discarding the seek change #148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 3, 2014

Conversation

wizzat
Copy link
Collaborator

@wizzat wizzat commented Mar 25, 2014

Once upon a time I was moving between Kafka 0.8.0 and Kafka 0.8.1 rather freely. As such, I forked kafka-python. In particular, I've enabled offset management via the Kafka API, added a bunch of performance tweaks, and in general made it work. The results are visible here: https://github.com/wizzat/kafka-python/tree/easykafka.

However, during a load test we ran Kafka out of disk space. In response to this, we set the TTL on the cluster to about an hour, which deleted all the data out of the cluster. The offsets the application had stored in Zookeeper (via the Kafka server) were now wrong - and worse, didn't exist. This caused the application to raise lots of OFFSET_OUT_OF_RANGE errors.

In response to these OFFSET_OUT_OF_RANGE errors, I devised a brilliant work around. At the start of the application, I would advance the topic offsets something like this:

def seek_tail(self):
    logging.info('Seeking to TAIL of topic %s', self.topic)
    logging.info('Previous offsets:')
    self.log_offsets()

    self.consumer.seek(0, 2)
    self.consumer.commit()

    logging.info('New offsets')
    self.log_offsets()

However, as it turns out, however, no data actually went through the application - and thus the commit was lost due to the following line:

    # short circuit if nothing happened. This check is kept outside
    # to prevent un-necessarily acquiring a lock for checking the state
    if self.count_since_commit == 0:
        return

I thought about extracting commit() into two methods: commit() which delegated to force_commit(), however it seemed a smaller change to make seek() count as a count_since_commit. Thus, I propose a patch.

Let me know if you'd prefer to go with commit() and force_commit(), or perhaps commit(partitions=None, force=False).

@dpkp
Copy link
Owner

dpkp commented Mar 25, 2014

only side-effect i can think of is that it could change the auto_commit counting. but I think that's fine. i say merge. will let others comment.

@rdiomar
Copy link
Collaborator

rdiomar commented Mar 26, 2014

count_since_commit could be considered the number of times the consumer's offset changed since the last commit rather than the number of messages consumed, so I'm okay with this, but I feel like if auto_commit was True, some might expect seeking to do an immediate commit. Does anyone have a strong opinion about this?

@wizzat
Copy link
Collaborator Author

wizzat commented Mar 26, 2014

That makes a lot of sense and I'm tentatively +1 to it. I will implement this sometime today unless someone objects.

@wizzat
Copy link
Collaborator Author

wizzat commented Mar 27, 2014

Ok. I've updated the PR to commit when auto_commit.

rdiomar added a commit that referenced this pull request Apr 3, 2014
Make seek(); commit(); work without commit discarding the seek change
@rdiomar rdiomar merged commit 13d0d44 into dpkp:master Apr 3, 2014
@wizzat wizzat deleted the seek_commit branch April 4, 2014 23:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants