Skip to content

Improve KafkaConsumer cleanup #1339

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import random
import threading
import weakref

# selectors in stdlib as of py3.4
try:
Expand All @@ -27,6 +28,7 @@
from .metrics.stats import Avg, Count, Rate
from .metrics.stats.rate import TimeUnit
from .protocol.metadata import MetadataRequest
from .util import Dict, WeakMethod
# Although this looks unused, it actually monkey-patches socket.socketpair()
# and should be left in as long as we're using socket.socketpair() in this file
from .vendor import socketpair
Expand Down Expand Up @@ -197,7 +199,7 @@ def __init__(self, **configs):
self._topics = set() # empty set will fetch all topic metadata
self._metadata_refresh_in_progress = False
self._selector = self.config['selector']()
self._conns = {}
self._conns = Dict() # object to support weakrefs
self._connecting = set()
self._refresh_on_disconnects = True
self._last_bootstrap = 0
Expand All @@ -220,7 +222,7 @@ def __init__(self, **configs):
if self.config['metrics']:
self._sensors = KafkaClientMetrics(self.config['metrics'],
self.config['metric_group_prefix'],
self._conns)
weakref.proxy(self._conns))

self._bootstrap(collect_hosts(self.config['bootstrap_servers']))

Expand Down Expand Up @@ -248,7 +250,7 @@ def _bootstrap(self, hosts):

for host, port, afi in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
cb = functools.partial(self._conn_state_change, 'bootstrap')
cb = functools.partial(WeakMethod(self._conn_state_change), 'bootstrap')
bootstrap = BrokerConnection(host, port, afi,
state_change_callback=cb,
node_id='bootstrap',
Expand Down Expand Up @@ -357,7 +359,7 @@ def _maybe_connect(self, node_id):
log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
host, port, afi = get_ip_port_afi(broker.host)
cb = functools.partial(self._conn_state_change, node_id)
cb = functools.partial(WeakMethod(self._conn_state_change), node_id)
conn = BrokerConnection(host, broker.port, afi,
state_change_callback=cb,
node_id=node_id,
Expand Down Expand Up @@ -404,6 +406,13 @@ def connected(self, node_id):
return False
return self._conns[node_id].connected()

def _close(self):
    """Release the wake-up sockets and the selector at most once.

    The first call marks the client as closed and closes ``_wake_r``,
    ``_wake_w`` and ``_selector``; later calls return immediately.

    This helper is also reached from ``__del__``, and Python may run
    ``__del__`` on an object whose ``__init__`` raised part-way through.
    Attributes are therefore looked up defensively so that only the
    resources that were actually created get closed, instead of the
    finalizer failing with ``AttributeError``.
    """
    if getattr(self, '_closed', False):
        return
    self._closed = True
    # Same order as before: read side, write side, then the selector.
    for attr in ('_wake_r', '_wake_w', '_selector'):
        resource = getattr(self, attr, None)
        if resource is not None:
            resource.close()

def close(self, node_id=None):
"""Close one or all broker connections.

Expand All @@ -412,18 +421,18 @@ def close(self, node_id=None):
"""
with self._lock:
if node_id is None:
self._closed = True
self._close()
for conn in self._conns.values():
conn.close()
self._wake_r.close()
self._wake_w.close()
self._selector.close()
elif node_id in self._conns:
self._conns[node_id].close()
else:
log.warning("Node %s not found in current connection list; skipping", node_id)
return

# Finalizer: when the client object is garbage collected, delegate to
# self._close() so its resources are released even if close() was never
# called explicitly.
def __del__(self):
self._close()

def is_disconnected(self, node_id):
"""Check whether the node connection has been disconnected or failed.

Expand Down
12 changes: 9 additions & 3 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,14 @@ def _update_reconnect_backoff(self):
self._reconnect_backoff /= 1000.0
log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)

def _close_socket(self):
    """Close the underlying socket, if there is one, and forget it.

    Idempotent: once the socket has been closed ``_sock`` is ``None`` and
    further calls do nothing.

    This helper is also invoked from ``__del__``, which Python may run on
    an object whose ``__init__`` did not finish, so a missing ``_sock``
    attribute is treated the same as "no socket".
    """
    sock = getattr(self, '_sock', None)
    if sock is not None:
        sock.close()
        self._sock = None

# Finalizer: when the connection object is garbage collected, close any
# socket it still holds by delegating to self._close_socket().
def __del__(self):
self._close_socket()

def close(self, error=None):
"""Close socket and fail all in-flight-requests.

Expand All @@ -641,9 +649,7 @@ def close(self, error=None):
self.state = ConnectionStates.DISCONNECTING
self.config['state_change_callback'](self)
self._update_reconnect_backoff()
if self._sock:
self._sock.close()
self._sock = None
self._close_socket()
self.state = ConnectionStates.DISCONNECTED
self._sasl_auth_future = None
self._protocol = KafkaProtocol(
Expand Down
38 changes: 27 additions & 11 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,7 @@ def ensure_active_group(self):
self.rejoining = True

if self._heartbeat_thread is None:
log.debug('Starting new heartbeat thread')
self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
self._heartbeat_thread.daemon = True
self._heartbeat_thread.start()
self._start_heartbeat_thread()

while self.need_rejoin():
self.ensure_coordinator_ready()
Expand Down Expand Up @@ -712,13 +709,30 @@ def reset_generation(self):
# Flag that a group rejoin is required by setting self.rejoin_needed.
def request_rejoin(self):
self.rejoin_needed = True

def _start_heartbeat_thread(self):
    """Spawn the daemon heartbeat thread unless one already exists."""
    if self._heartbeat_thread is not None:
        return
    log.info('Starting new heartbeat thread')
    # The thread is handed a weak proxy rather than a strong reference to
    # this coordinator.
    heartbeat = HeartbeatThread(weakref.proxy(self))
    self._heartbeat_thread = heartbeat
    heartbeat.daemon = True
    heartbeat.start()

def _close_heartbeat_thread(self):
    """Stop the heartbeat thread, if one exists, and drop the reference.

    Does nothing when no thread is registered. A missing
    ``_heartbeat_thread`` attribute is treated the same way, because this
    helper is also called from ``__del__``, which Python may run on an
    object whose ``__init__`` did not complete.
    """
    thread = getattr(self, '_heartbeat_thread', None)
    if thread is None:
        return
    log.info('Stopping heartbeat thread')
    try:
        thread.close()
    except ReferenceError:
        # The thread reaches this coordinator through a weakref proxy; when
        # we are called during garbage collection that proxy is already
        # dead. There is nothing left to notify, so this is deliberately
        # ignored.
        pass
    self._heartbeat_thread = None

# Finalizer: when the coordinator is garbage collected, stop its heartbeat
# thread by delegating to self._close_heartbeat_thread().
def __del__(self):
self._close_heartbeat_thread()

def close(self):
"""Close the coordinator, leave the current group,
and reset local generation / member_id"""
# NOTE(review): this span comes from a rendered diff with indentation and
# +/- markers stripped. The inline `if self._heartbeat_thread is not None`
# shutdown and the `_close_heartbeat_thread()` call do the same work;
# presumably the former are the removed lines and only the helper call
# remains after the change — confirm against the real file.
with self._lock:
if self._heartbeat_thread is not None:
self._heartbeat_thread.close()
self._heartbeat_thread = None
self._close_heartbeat_thread()
# NOTE(review): with indentation lost it is not visible whether
# maybe_leave_group() runs while self._lock is held — confirm.
self.maybe_leave_group()

def maybe_leave_group(self):
Expand Down Expand Up @@ -877,20 +891,22 @@ def enable(self):
self.coordinator._lock.notify()

# Mark the heartbeat as disabled by clearing self.enabled.
# NOTE(review): this span comes from a rendered diff with +/- markers
# stripped; the locked assignment and the bare assignment are presumably
# the old and new versions of the same statement — confirm which is current.
def disable(self):
with self.coordinator._lock:
self.enabled = False
self.enabled = False

# Ask the heartbeat thread to stop: set self.closed and notify on the
# coordinator's lock.
# NOTE(review): this span comes from a rendered diff with +/- markers
# stripped; `self.closed = True` appears both before and (presumably) inside
# the `with` block, most likely the new and old placement of one statement —
# confirm which is current.
def close(self):
self.closed = True
with self.coordinator._lock:
self.closed = True
self.coordinator._lock.notify()

def run(self):
try:
while not self.closed:
self._run_once()

log.debug('Heartbeat closed!')
log.debug('Heartbeat thread closed')

except ReferenceError:
log.debug('Heartbeat thread closed due to coordinator gc')

except RuntimeError as e:
log.error("Heartbeat thread for group %s failed due to unexpected error: %s",
Expand Down
1 change: 1 addition & 0 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def __init__(self, client, subscription, metrics, **configs):
def __del__(self):
    """Detach the metadata listener, then run the base-class finalizer."""
    # _cluster may be missing if construction never got that far.
    cluster = getattr(self, '_cluster', None)
    if cluster:
        listener = WeakMethod(self._handle_metadata_update)
        cluster.remove_listener(listener)
    super(ConsumerCoordinator, self).__del__()

# Return the protocol type constant defined on ConsumerProtocol.
def protocol_type(self):
return ConsumerProtocol.PROTOCOL_TYPE
Expand Down
8 changes: 8 additions & 0 deletions kafka/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,14 @@ def __eq__(self, other):
return self._target_id == other._target_id and self._method_id == other._method_id


class Dict(dict):
    """A ``dict`` subclass whose instances can be weakly referenced.

    Instances of the built-in ``dict`` type cannot be the target of a
    ``weakref.ref`` or ``weakref.proxy``; instances of a subclass can.
    Apart from that, this behaves exactly like ``dict``.

    See: https://docs.python.org/2/library/weakref.html
    """


def try_method_on_system_exit(obj, method, *args, **kwargs):
def wrapper(_obj, _meth, *args, **kwargs):
try:
Expand Down