Skip to content

Commit 0a74924

Browse files
authored
Improve KafkaConsumer cleanup (#1339)
1 parent 794b695 commit 0a74924

File tree

5 files changed

+62
-22
lines changed

5 files changed

+62
-22
lines changed

kafka/client_async.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import logging
77
import random
88
import threading
9+
import weakref
910

1011
# selectors in stdlib as of py3.4
1112
try:
@@ -27,6 +28,7 @@
2728
from .metrics.stats import Avg, Count, Rate
2829
from .metrics.stats.rate import TimeUnit
2930
from .protocol.metadata import MetadataRequest
31+
from .util import Dict, WeakMethod
3032
# Although this looks unused, it actually monkey-patches socket.socketpair()
3133
# and should be left in as long as we're using socket.socketpair() in this file
3234
from .vendor import socketpair
@@ -197,7 +199,7 @@ def __init__(self, **configs):
197199
self._topics = set() # empty set will fetch all topic metadata
198200
self._metadata_refresh_in_progress = False
199201
self._selector = self.config['selector']()
200-
self._conns = {}
202+
self._conns = Dict() # object to support weakrefs
201203
self._connecting = set()
202204
self._refresh_on_disconnects = True
203205
self._last_bootstrap = 0
@@ -220,7 +222,7 @@ def __init__(self, **configs):
220222
if self.config['metrics']:
221223
self._sensors = KafkaClientMetrics(self.config['metrics'],
222224
self.config['metric_group_prefix'],
223-
self._conns)
225+
weakref.proxy(self._conns))
224226

225227
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
226228

@@ -248,7 +250,7 @@ def _bootstrap(self, hosts):
248250

249251
for host, port, afi in hosts:
250252
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
251-
cb = functools.partial(self._conn_state_change, 'bootstrap')
253+
cb = functools.partial(WeakMethod(self._conn_state_change), 'bootstrap')
252254
bootstrap = BrokerConnection(host, port, afi,
253255
state_change_callback=cb,
254256
node_id='bootstrap',
@@ -357,7 +359,7 @@ def _maybe_connect(self, node_id):
357359
log.debug("Initiating connection to node %s at %s:%s",
358360
node_id, broker.host, broker.port)
359361
host, port, afi = get_ip_port_afi(broker.host)
360-
cb = functools.partial(self._conn_state_change, node_id)
362+
cb = functools.partial(WeakMethod(self._conn_state_change), node_id)
361363
conn = BrokerConnection(host, broker.port, afi,
362364
state_change_callback=cb,
363365
node_id=node_id,
@@ -404,6 +406,13 @@ def connected(self, node_id):
404406
return False
405407
return self._conns[node_id].connected()
406408

409+
def _close(self):
410+
if not self._closed:
411+
self._closed = True
412+
self._wake_r.close()
413+
self._wake_w.close()
414+
self._selector.close()
415+
407416
def close(self, node_id=None):
408417
"""Close one or all broker connections.
409418
@@ -412,18 +421,18 @@ def close(self, node_id=None):
412421
"""
413422
with self._lock:
414423
if node_id is None:
415-
self._closed = True
424+
self._close()
416425
for conn in self._conns.values():
417426
conn.close()
418-
self._wake_r.close()
419-
self._wake_w.close()
420-
self._selector.close()
421427
elif node_id in self._conns:
422428
self._conns[node_id].close()
423429
else:
424430
log.warning("Node %s not found in current connection list; skipping", node_id)
425431
return
426432

433+
def __del__(self):
434+
self._close()
435+
427436
def is_disconnected(self, node_id):
428437
"""Check whether the node connection has been disconnected or failed.
429438

kafka/conn.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,14 @@ def _update_reconnect_backoff(self):
628628
self._reconnect_backoff /= 1000.0
629629
log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)
630630

631+
def _close_socket(self):
632+
if self._sock:
633+
self._sock.close()
634+
self._sock = None
635+
636+
def __del__(self):
637+
self._close_socket()
638+
631639
def close(self, error=None):
632640
"""Close socket and fail all in-flight-requests.
633641
@@ -641,9 +649,7 @@ def close(self, error=None):
641649
self.state = ConnectionStates.DISCONNECTING
642650
self.config['state_change_callback'](self)
643651
self._update_reconnect_backoff()
644-
if self._sock:
645-
self._sock.close()
646-
self._sock = None
652+
self._close_socket()
647653
self.state = ConnectionStates.DISCONNECTED
648654
self._sasl_auth_future = None
649655
self._protocol = KafkaProtocol(

kafka/coordinator/base.py

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -356,10 +356,7 @@ def ensure_active_group(self):
356356
self.rejoining = True
357357

358358
if self._heartbeat_thread is None:
359-
log.debug('Starting new heartbeat thread')
360-
self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
361-
self._heartbeat_thread.daemon = True
362-
self._heartbeat_thread.start()
359+
self._start_heartbeat_thread()
363360

364361
while self.need_rejoin():
365362
self.ensure_coordinator_ready()
@@ -712,13 +709,30 @@ def reset_generation(self):
712709
def request_rejoin(self):
713710
self.rejoin_needed = True
714711

712+
def _start_heartbeat_thread(self):
713+
if self._heartbeat_thread is None:
714+
log.info('Starting new heartbeat thread')
715+
self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
716+
self._heartbeat_thread.daemon = True
717+
self._heartbeat_thread.start()
718+
719+
def _close_heartbeat_thread(self):
720+
if self._heartbeat_thread is not None:
721+
log.info('Stopping heartbeat thread')
722+
try:
723+
self._heartbeat_thread.close()
724+
except ReferenceError:
725+
pass
726+
self._heartbeat_thread = None
727+
728+
def __del__(self):
729+
self._close_heartbeat_thread()
730+
715731
def close(self):
716732
"""Close the coordinator, leave the current group,
717733
and reset local generation / member_id"""
718734
with self._lock:
719-
if self._heartbeat_thread is not None:
720-
self._heartbeat_thread.close()
721-
self._heartbeat_thread = None
735+
self._close_heartbeat_thread()
722736
self.maybe_leave_group()
723737

724738
def maybe_leave_group(self):
@@ -877,20 +891,22 @@ def enable(self):
877891
self.coordinator._lock.notify()
878892

879893
def disable(self):
880-
with self.coordinator._lock:
881-
self.enabled = False
894+
self.enabled = False
882895

883896
def close(self):
897+
self.closed = True
884898
with self.coordinator._lock:
885-
self.closed = True
886899
self.coordinator._lock.notify()
887900

888901
def run(self):
889902
try:
890903
while not self.closed:
891904
self._run_once()
892905

893-
log.debug('Heartbeat closed!')
906+
log.debug('Heartbeat thread closed')
907+
908+
except ReferenceError:
909+
log.debug('Heartbeat thread closed due to coordinator gc')
894910

895911
except RuntimeError as e:
896912
log.error("Heartbeat thread for group %s failed due to unexpected error: %s",

kafka/coordinator/consumer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ def __init__(self, client, subscription, metrics, **configs):
125125
def __del__(self):
126126
if hasattr(self, '_cluster') and self._cluster:
127127
self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
128+
super(ConsumerCoordinator, self).__del__()
128129

129130
def protocol_type(self):
130131
return ConsumerProtocol.PROTOCOL_TYPE

kafka/util.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,14 @@ def __eq__(self, other):
167167
return self._target_id == other._target_id and self._method_id == other._method_id
168168

169169

170+
class Dict(dict):
171+
"""Utility dict subclass whose instances can be weakly referenced (plain dict instances cannot)
172+
173+
See: https://docs.python.org/2/library/weakref.html
174+
"""
175+
pass
176+
177+
170178
def try_method_on_system_exit(obj, method, *args, **kwargs):
171179
def wrapper(_obj, _meth, *args, **kwargs):
172180
try:

0 commit comments

Comments
 (0)