Skip to content

Make BrokerConnection .host / .port / .afi immutable #1422

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ class SSLWantWriteError(Exception):
gssapi = None
GSSError = None


AFI_NAMES = {
socket.AF_UNSPEC: "unspecified",
socket.AF_INET: "IPv4",
socket.AF_INET6: "IPv6",
}


class ConnectionStates(object):
DISCONNECTING = '<disconnecting>'
DISCONNECTED = '<disconnected>'
Expand Down Expand Up @@ -204,13 +212,12 @@ class BrokerConnection(object):
SASL_MECHANISMS = ('PLAIN', 'GSSAPI')

def __init__(self, host, port, afi, **configs):
self.hostname = host
self.host = host
self.port = port
self.afi = afi
self._init_host = host
self._init_port = port
self._init_afi = afi
self._sock_ip = host
self._sock_port = port
self._sock_afi = afi
self.in_flight_requests = collections.deque()
self._api_versions = None

Expand Down Expand Up @@ -266,10 +273,10 @@ def __init__(self, host, port, afi, **configs):

def _next_afi_host_port(self):
if not self._gai:
self._gai = dns_lookup(self._init_host, self._init_port, self._init_afi)
self._gai = dns_lookup(self.host, self.port, self.afi)
if not self._gai:
log.error('DNS lookup failed for %s:%i (%s)',
self._init_host, self._init_port, self._init_afi)
self.host, self.port, self.afi)
return

afi, _, __, ___, sockaddr = self._gai.pop(0)
Expand All @@ -286,8 +293,8 @@ def connect(self):
return
else:
log.debug('%s: creating new socket', self)
self.afi, self.host, self.port = next_lookup
self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
self._sock_afi, self._sock_ip, self._sock_port = next_lookup
self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)

for option in self.config['socket_options']:
log.debug('%s: setting socket option %s', self, option)
Expand All @@ -301,15 +308,17 @@ def connect(self):
# so we need to double check that we are still connecting before
if self.connecting():
self.config['state_change_callback'](self)
log.info('%s: connecting to %s:%d', self, self.host, self.port)
log.info('%s: connecting to %s:%d [%s:%d %s]', self, self.host,
self.port, self._sock_ip, self._sock_port,
AFI_NAMES[self._sock_afi])

if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
# to check connection status
request_timeout = self.config['request_timeout_ms'] / 1000.0
ret = None
try:
ret = self._sock.connect_ex((self.host, self.port))
ret = self._sock.connect_ex((self._sock_ip, self._sock_port))
except socket.error as err:
ret = err.errno

Expand Down Expand Up @@ -400,7 +409,7 @@ def _wrap_ssl(self):
try:
self._sock = self._ssl_context.wrap_socket(
self._sock,
server_hostname=self.hostname,
server_hostname=self.host,
do_handshake_on_connect=False)
except ssl.SSLError as e:
log.exception('%s: Failed to wrap socket in SSLContext!', self)
Expand Down Expand Up @@ -524,7 +533,7 @@ def _try_authenticate_plain(self, future):
return future.success(True)

def _try_authenticate_gssapi(self, future):
auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.hostname
auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.host
gssapi_name = gssapi.Name(
auth_id,
name_type=gssapi.NameType.hostbased_service
Expand Down Expand Up @@ -962,9 +971,10 @@ def connect():
self.config[key] = stashed[key]
return version

def __repr__(self):
return "<BrokerConnection node_id=%s host=%s/%s port=%d>" % (
self.node_id, self.hostname, self.host, self.port)
def __str__(self):
return "<BrokerConnection node_id=%s host=%s:%d %s [%s:%d %s]>" % (
self.node_id, self.host, self.port, self.state,
self._sock_ip, self._sock_port, AFI_NAMES[self._sock_afi])


class BrokerConnectionMetrics(object):
Expand Down
29 changes: 20 additions & 9 deletions test/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,35 +255,43 @@ def test_lookup_on_connect():
hostname = 'example.org'
port = 9092
conn = BrokerConnection(hostname, port, socket.AF_UNSPEC)
assert conn.host == conn.hostname == hostname
assert conn.host == hostname
assert conn.port == port
assert conn.afi == socket.AF_UNSPEC
ip1 = '127.0.0.1'
afi1 = socket.AF_INET
mock_return1 = [
(2, 2, 17, '', (ip1, 9092)),
(afi1, socket.SOCK_STREAM, 6, '', (ip1, 9092)),
]
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
conn.connect()
m.assert_called_once_with(hostname, port, 0, 1)
conn.close()
assert conn.host == ip1
assert conn._sock_ip == ip1
assert conn._sock_port == 9092
assert conn._sock_afi == afi1

ip2 = '127.0.0.2'
ip2 = '::1'
afi2 = socket.AF_INET6
mock_return2 = [
(2, 2, 17, '', (ip2, 9092)),
(afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)),
]

with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
conn.last_attempt = 0
conn.connect()
m.assert_called_once_with(hostname, port, 0, 1)
conn.close()
assert conn.host == ip2
assert conn._sock_ip == ip2
assert conn._sock_port == 9092
assert conn._sock_afi == afi2


def test_relookup_on_failure():
hostname = 'example.org'
port = 9092
conn = BrokerConnection(hostname, port, socket.AF_UNSPEC)
assert conn.host == conn.hostname == hostname
assert conn.host == hostname
mock_return1 = []
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
last_attempt = conn.last_attempt
Expand All @@ -293,13 +301,16 @@ def test_relookup_on_failure():
assert conn.last_attempt > last_attempt

ip2 = '127.0.0.2'
afi2 = socket.AF_INET
mock_return2 = [
(2, 2, 17, '', (ip2, 9092)),
(afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)),
]

with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
conn.last_attempt = 0
conn.connect()
m.assert_called_once_with(hostname, port, 0, 1)
conn.close()
assert conn.host == ip2
assert conn._sock_ip == ip2
assert conn._sock_port == 9092
assert conn._sock_afi == afi2