Skip to content

Commit 92376cb

Browse files
authored
Refactor dns lookup in BrokerConnection (#1312)
1 parent 2c8748c commit 92376cb

File tree

2 files changed

+74
-61
lines changed

2 files changed

+74
-61
lines changed

kafka/conn.py

Lines changed: 49 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -251,67 +251,42 @@ def __init__(self, host, port, afi, **configs):
251251
self._sasl_auth_future = None
252252
self.last_attempt = 0
253253
self._gai = None
254-
self._gai_index = 0
255254
self._sensors = None
256255
if self.config['metrics']:
257256
self._sensors = BrokerConnectionMetrics(self.config['metrics'],
258257
self.config['metric_group_prefix'],
259258
self.node_id)
260259

260+
def _next_afi_host_port(self):
261+
if not self._gai:
262+
self._gai = dns_lookup(self._init_host, self._init_port, self._init_afi)
263+
if not self._gai:
264+
log.error('DNS lookup failed for %s:%i (%s)',
265+
self._init_host, self._init_port, self._init_afi)
266+
return
267+
268+
afi, _, __, ___, sockaddr = self._gai.pop(0)
269+
host, port = sockaddr[:2]
270+
return (afi, host, port)
271+
261272
def connect(self):
262273
"""Attempt to connect and return ConnectionState"""
263274
if self.state is ConnectionStates.DISCONNECTED:
264-
log.debug('%s: creating new socket', self)
265-
# if self.afi is set to AF_UNSPEC, then we need to do a name
266-
# resolution and try all available address families
267-
if self._init_afi == socket.AF_UNSPEC:
268-
if self._gai is None:
269-
# XXX: all DNS functions in Python are blocking. If we really
270-
# want to be non-blocking here, we need to use a 3rd-party
271-
# library like python-adns, or move resolution onto its
272-
# own thread. This will be subject to the default libc
273-
# name resolution timeout (5s on most Linux boxes)
274-
try:
275-
self._gai = socket.getaddrinfo(self._init_host,
276-
self._init_port,
277-
socket.AF_UNSPEC,
278-
socket.SOCK_STREAM)
279-
except socket.gaierror as ex:
280-
log.warning('DNS lookup failed for %s:%d,'
281-
' exception was %s. Is your'
282-
' advertised.listeners (called'
283-
' advertised.host.name before Kafka 9)'
284-
' correct and resolvable?',
285-
self._init_host, self._init_port, ex)
286-
self._gai = []
287-
self._gai_index = 0
288-
else:
289-
# if self._gai already exists, then we should try the next
290-
# name
291-
self._gai_index += 1
292-
while True:
293-
if self._gai_index >= len(self._gai):
294-
error = 'Unable to connect to any of the names for {0}:{1}'.format(
295-
self._init_host, self._init_port)
296-
log.error(error)
297-
self.close(Errors.ConnectionError(error))
298-
return
299-
afi, _, __, ___, sockaddr = self._gai[self._gai_index]
300-
if afi not in (socket.AF_INET, socket.AF_INET6):
301-
self._gai_index += 1
302-
continue
303-
break
304-
self.host, self.port = sockaddr[:2]
305-
self._sock = socket.socket(afi, socket.SOCK_STREAM)
275+
self.last_attempt = time.time()
276+
next_lookup = self._next_afi_host_port()
277+
if not next_lookup:
278+
self.close(Errors.ConnectionError('DNS failure'))
279+
return
306280
else:
307-
self._sock = socket.socket(self._init_afi, socket.SOCK_STREAM)
281+
log.debug('%s: creating new socket', self)
282+
self.afi, self.host, self.port = next_lookup
283+
self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
308284

309285
for option in self.config['socket_options']:
310286
log.debug('%s: setting socket option %s', self, option)
311287
self._sock.setsockopt(*option)
312288

313289
self._sock.setblocking(False)
314-
self.last_attempt = time.time()
315290
self.state = ConnectionStates.CONNECTING
316291
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
317292
self._wrap_ssl()
@@ -328,11 +303,6 @@ def connect(self):
328303
ret = None
329304
try:
330305
ret = self._sock.connect_ex((self.host, self.port))
331-
# if we got here through a host lookup, we've found a host,port,af tuple
332-
# that works save it so we don't do a GAI lookup again
333-
if self._gai is not None:
334-
self.afi = self._sock.family
335-
self._gai = None
336306
except socket.error as err:
337307
ret = err.errno
338308

@@ -645,23 +615,15 @@ def close(self, error=None):
645615
will be failed with this exception.
646616
Default: kafka.errors.ConnectionError.
647617
"""
648-
if self.state is ConnectionStates.DISCONNECTED:
649-
if error is not None:
650-
if sys.version_info >= (3, 2):
651-
log.warning('%s: close() called on disconnected connection with error: %s', self, error, stack_info=True)
652-
else:
653-
log.warning('%s: close() called on disconnected connection with error: %s', self, error)
654-
return
655-
656618
log.info('%s: Closing connection. %s', self, error or '')
657-
self.state = ConnectionStates.DISCONNECTING
658-
self.config['state_change_callback'](self)
619+
if self.state is not ConnectionStates.DISCONNECTED:
620+
self.state = ConnectionStates.DISCONNECTING
621+
self.config['state_change_callback'](self)
659622
self._update_reconnect_backoff()
660623
if self._sock:
661624
self._sock.close()
662625
self._sock = None
663626
self.state = ConnectionStates.DISCONNECTED
664-
self.last_attempt = time.time()
665627
self._sasl_auth_future = None
666628
self._protocol = KafkaProtocol(
667629
client_id=self.config['client_id'],
@@ -1170,3 +1132,29 @@ def collect_hosts(hosts, randomize=True):
11701132
shuffle(result)
11711133

11721134
return result
1135+
1136+
1137+
def is_inet_4_or_6(gai):
1138+
"""Given a getaddrinfo struct, return True iff ipv4 or ipv6"""
1139+
return gai[0] in (socket.AF_INET, socket.AF_INET6)
1140+
1141+
1142+
def dns_lookup(host, port, afi=socket.AF_UNSPEC):
1143+
"""Returns a list of getaddrinfo structs, optionally filtered to an afi (ipv4 / ipv6)"""
1144+
# XXX: all DNS functions in Python are blocking. If we really
1145+
# want to be non-blocking here, we need to use a 3rd-party
1146+
# library like python-adns, or move resolution onto its
1147+
# own thread. This will be subject to the default libc
1148+
# name resolution timeout (5s on most Linux boxes)
1149+
try:
1150+
return list(filter(is_inet_4_or_6,
1151+
socket.getaddrinfo(host, port, afi,
1152+
socket.SOCK_STREAM)))
1153+
except socket.gaierror as ex:
1154+
log.warning('DNS lookup failed for %s:%d,'
1155+
' exception was %s. Is your'
1156+
' advertised.listeners (called'
1157+
' advertised.host.name before Kafka 9)'
1158+
' correct and resolvable?',
1159+
host, port, ex)
1160+
return []

test/test_conn.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,3 +267,28 @@ def test_lookup_on_connect():
267267
m.assert_called_once_with(hostname, port, 0, 1)
268268
conn.close()
269269
assert conn.host == ip2
270+
271+
272+
def test_relookup_on_failure():
273+
hostname = 'example.org'
274+
port = 9092
275+
conn = BrokerConnection(hostname, port, socket.AF_UNSPEC)
276+
assert conn.host == conn.hostname == hostname
277+
mock_return1 = []
278+
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
279+
last_attempt = conn.last_attempt
280+
conn.connect()
281+
m.assert_called_once_with(hostname, port, 0, 1)
282+
assert conn.disconnected()
283+
assert conn.last_attempt > last_attempt
284+
285+
ip2 = '127.0.0.2'
286+
mock_return2 = [
287+
(2, 2, 17, '', (ip2, 9092)),
288+
]
289+
290+
with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
291+
conn.connect()
292+
m.assert_called_once_with(hostname, port, 0, 1)
293+
conn.close()
294+
assert conn.host == ip2

0 commit comments

Comments
 (0)