Skip to content

Commit e8cdb7a

Browse files
author
Dana Powers
committed
Make BrokerConnection .host / .port / .afi immutable, use _sock_* attributes for current lookups
1 parent b6d5ada commit e8cdb7a

File tree

2 files changed

+36
-24
lines changed

2 files changed

+36
-24
lines changed

kafka/conn.py

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -212,13 +212,12 @@ class BrokerConnection(object):
212212
SASL_MECHANISMS = ('PLAIN', 'GSSAPI')
213213

214214
def __init__(self, host, port, afi, **configs):
215-
self.hostname = host
216215
self.host = host
217216
self.port = port
218217
self.afi = afi
219-
self._init_host = host
220-
self._init_port = port
221-
self._init_afi = afi
218+
self._sock_ip = host
219+
self._sock_port = port
220+
self._sock_afi = afi
222221
self.in_flight_requests = collections.deque()
223222
self._api_versions = None
224223

@@ -273,10 +272,10 @@ def __init__(self, host, port, afi, **configs):
273272
self.node_id)
274273

275274
def _dns_lookup(self):
276-
self._gai = dns_lookup(self._init_host, self._init_port, self._init_afi)
275+
self._gai = dns_lookup(self.host, self.port, self.afi)
277276
if not self._gai:
278277
log.error('DNS lookup failed for %s:%i (%s)',
279-
self._init_host, self._init_port, self._init_afi)
278+
self.host, self.port, self.afi)
280279
return False
281280
return True
282281

@@ -334,8 +333,8 @@ def connect(self):
334333
return
335334
else:
336335
log.debug('%s: creating new socket', self)
337-
self.afi, self.host, self.port = next_lookup
338-
self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
336+
self._sock_afi, self._sock_ip, self._sock_port = next_lookup
337+
self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
339338

340339
for option in self.config['socket_options']:
341340
log.debug('%s: setting socket option %s', self, option)
@@ -349,15 +348,17 @@ def connect(self):
349348
# so we need to double check that we are still connecting before
350349
if self.connecting():
351350
self.config['state_change_callback'](self)
352-
log.info('%s: connecting to %s:%d', self, self.host, self.port)
351+
log.info('%s: connecting to %s:%d [%s:%d %s]', self, self.host,
352+
self.port, self._sock_ip, self._sock_port,
353+
AFI_NAMES[self._sock_afi])
353354

354355
if self.state is ConnectionStates.CONNECTING:
355356
# in non-blocking mode, use repeated calls to socket.connect_ex
356357
# to check connection status
357358
request_timeout = self.config['request_timeout_ms'] / 1000.0
358359
ret = None
359360
try:
360-
ret = self._sock.connect_ex((self.host, self.port))
361+
ret = self._sock.connect_ex((self._sock_ip, self._sock_port))
361362
except socket.error as err:
362363
ret = err.errno
363364

@@ -449,7 +450,7 @@ def _wrap_ssl(self):
449450
try:
450451
self._sock = self._ssl_context.wrap_socket(
451452
self._sock,
452-
server_hostname=self.hostname,
453+
server_hostname=self.host,
453454
do_handshake_on_connect=False)
454455
except ssl.SSLError as e:
455456
log.exception('%s: Failed to wrap socket in SSLContext!', self)
@@ -573,7 +574,7 @@ def _try_authenticate_plain(self, future):
573574
return future.success(True)
574575

575576
def _try_authenticate_gssapi(self, future):
576-
auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.hostname
577+
auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.host
577578
gssapi_name = gssapi.Name(
578579
auth_id,
579580
name_type=gssapi.NameType.hostbased_service
@@ -1002,9 +1003,9 @@ def filter(self, record):
10021003
return version
10031004

10041005
def __str__(self):
1005-
return "<BrokerConnection node_id=%s host=%s/%s port=%d afi=%s>" % (
1006-
self.node_id, self.hostname, self.host, self.port,
1007-
AFI_NAMES[self.afi])
1006+
return "<BrokerConnection node_id=%s host=%s:%d %s [%s:%d %s]>" % (
1007+
self.node_id, self.host, self.port, self.state,
1008+
self._sock_ip, self._sock_port, AFI_NAMES[self._sock_afi])
10081009

10091010

10101011
class BrokerConnectionMetrics(object):

test/test_conn.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -246,35 +246,43 @@ def test_lookup_on_connect():
246246
hostname = 'example.org'
247247
port = 9092
248248
conn = BrokerConnection(hostname, port, socket.AF_UNSPEC)
249-
assert conn.host == conn.hostname == hostname
249+
assert conn.host == hostname
250+
assert conn.port == port
251+
assert conn.afi == socket.AF_UNSPEC
250252
ip1 = '127.0.0.1'
253+
afi1 = socket.AF_INET
251254
mock_return1 = [
252-
(2, 2, 17, '', (ip1, 9092)),
255+
(afi1, socket.SOCK_STREAM, 6, '', (ip1, 9092)),
253256
]
254257
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
255258
conn.connect()
256259
m.assert_called_once_with(hostname, port, 0, 1)
257260
conn.close()
258-
assert conn.host == ip1
261+
assert conn._sock_ip == ip1
262+
assert conn._sock_port == 9092
263+
assert conn._sock_afi == afi1
259264

260-
ip2 = '127.0.0.2'
265+
ip2 = '::1'
266+
afi2 = socket.AF_INET6
261267
mock_return2 = [
262-
(2, 2, 17, '', (ip2, 9092)),
268+
(afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)),
263269
]
264270

265271
with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
266272
conn.last_attempt = 0
267273
conn.connect()
268274
m.assert_called_once_with(hostname, port, 0, 1)
269275
conn.close()
270-
assert conn.host == ip2
276+
assert conn._sock_ip == ip2
277+
assert conn._sock_port == 9092
278+
assert conn._sock_afi == afi2
271279

272280

273281
def test_relookup_on_failure():
274282
hostname = 'example.org'
275283
port = 9092
276284
conn = BrokerConnection(hostname, port, socket.AF_UNSPEC)
277-
assert conn.host == conn.hostname == hostname
285+
assert conn.host == hostname
278286
mock_return1 = []
279287
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
280288
last_attempt = conn.last_attempt
@@ -284,13 +292,16 @@ def test_relookup_on_failure():
284292
assert conn.last_attempt > last_attempt
285293

286294
ip2 = '127.0.0.2'
295+
afi2 = socket.AF_INET
287296
mock_return2 = [
288-
(2, 2, 17, '', (ip2, 9092)),
297+
(afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)),
289298
]
290299

291300
with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
292301
conn.last_attempt = 0
293302
conn.connect()
294303
m.assert_called_once_with(hostname, port, 0, 1)
295304
conn.close()
296-
assert conn.host == ip2
305+
assert conn._sock_ip == ip2
306+
assert conn._sock_port == 9092
307+
assert conn._sock_afi == afi2

0 commit comments

Comments
 (0)