Skip to content

Refactor dns lookup in BrokerConnection #1312

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 49 additions & 61 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,67 +251,42 @@ def __init__(self, host, port, afi, **configs):
self._sasl_auth_future = None
self.last_attempt = 0
self._gai = None
self._gai_index = 0
self._sensors = None
if self.config['metrics']:
self._sensors = BrokerConnectionMetrics(self.config['metrics'],
self.config['metric_group_prefix'],
self.node_id)

def _next_afi_host_port(self):
if not self._gai:
self._gai = dns_lookup(self._init_host, self._init_port, self._init_afi)
if not self._gai:
log.error('DNS lookup failed for %s:%i (%s)',
self._init_host, self._init_port, self._init_afi)
return

afi, _, __, ___, sockaddr = self._gai.pop(0)
host, port = sockaddr[:2]
return (afi, host, port)

def connect(self):
"""Attempt to connect and return ConnectionState"""
if self.state is ConnectionStates.DISCONNECTED:
log.debug('%s: creating new socket', self)
# if self.afi is set to AF_UNSPEC, then we need to do a name
# resolution and try all available address families
if self._init_afi == socket.AF_UNSPEC:
if self._gai is None:
# XXX: all DNS functions in Python are blocking. If we really
# want to be non-blocking here, we need to use a 3rd-party
# library like python-adns, or move resolution onto its
# own thread. This will be subject to the default libc
# name resolution timeout (5s on most Linux boxes)
try:
self._gai = socket.getaddrinfo(self._init_host,
self._init_port,
socket.AF_UNSPEC,
socket.SOCK_STREAM)
except socket.gaierror as ex:
log.warning('DNS lookup failed for %s:%d,'
' exception was %s. Is your'
' advertised.listeners (called'
' advertised.host.name before Kafka 9)'
' correct and resolvable?',
self._init_host, self._init_port, ex)
self._gai = []
self._gai_index = 0
else:
# if self._gai already exists, then we should try the next
# name
self._gai_index += 1
while True:
if self._gai_index >= len(self._gai):
error = 'Unable to connect to any of the names for {0}:{1}'.format(
self._init_host, self._init_port)
log.error(error)
self.close(Errors.ConnectionError(error))
return
afi, _, __, ___, sockaddr = self._gai[self._gai_index]
if afi not in (socket.AF_INET, socket.AF_INET6):
self._gai_index += 1
continue
break
self.host, self.port = sockaddr[:2]
self._sock = socket.socket(afi, socket.SOCK_STREAM)
self.last_attempt = time.time()
next_lookup = self._next_afi_host_port()
if not next_lookup:
self.close(Errors.ConnectionError('DNS failure'))
return
else:
self._sock = socket.socket(self._init_afi, socket.SOCK_STREAM)
log.debug('%s: creating new socket', self)
self.afi, self.host, self.port = next_lookup
self._sock = socket.socket(self.afi, socket.SOCK_STREAM)

for option in self.config['socket_options']:
log.debug('%s: setting socket option %s', self, option)
self._sock.setsockopt(*option)

self._sock.setblocking(False)
self.last_attempt = time.time()
self.state = ConnectionStates.CONNECTING
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
self._wrap_ssl()
Expand All @@ -328,11 +303,6 @@ def connect(self):
ret = None
try:
ret = self._sock.connect_ex((self.host, self.port))
# if we got here through a host lookup, we've found a host,port,af tuple
# that works save it so we don't do a GAI lookup again
if self._gai is not None:
self.afi = self._sock.family
self._gai = None
except socket.error as err:
ret = err.errno

Expand Down Expand Up @@ -645,23 +615,15 @@ def close(self, error=None):
will be failed with this exception.
Default: kafka.errors.ConnectionError.
"""
if self.state is ConnectionStates.DISCONNECTED:
if error is not None:
if sys.version_info >= (3, 2):
log.warning('%s: close() called on disconnected connection with error: %s', self, error, stack_info=True)
else:
log.warning('%s: close() called on disconnected connection with error: %s', self, error)
return

log.info('%s: Closing connection. %s', self, error or '')
self.state = ConnectionStates.DISCONNECTING
self.config['state_change_callback'](self)
if self.state is not ConnectionStates.DISCONNECTED:
self.state = ConnectionStates.DISCONNECTING
self.config['state_change_callback'](self)
self._update_reconnect_backoff()
if self._sock:
self._sock.close()
self._sock = None
self.state = ConnectionStates.DISCONNECTED
self.last_attempt = time.time()
self._sasl_auth_future = None
self._protocol = KafkaProtocol(
client_id=self.config['client_id'],
Expand Down Expand Up @@ -1171,3 +1133,29 @@ def collect_hosts(hosts, randomize=True):
shuffle(result)

return result


def is_inet_4_or_6(gai):
"""Given a getaddrinfo struct, return True iff ipv4 or ipv6"""
return gai[0] in (socket.AF_INET, socket.AF_INET6)


def dns_lookup(host, port, afi=socket.AF_UNSPEC):
"""Returns a list of getaddrinfo structs, optionally filtered to an afi (ipv4 / ipv6)"""
# XXX: all DNS functions in Python are blocking. If we really
# want to be non-blocking here, we need to use a 3rd-party
# library like python-adns, or move resolution onto its
# own thread. This will be subject to the default libc
# name resolution timeout (5s on most Linux boxes)
try:
return list(filter(is_inet_4_or_6,
socket.getaddrinfo(host, port, afi,
socket.SOCK_STREAM)))
except socket.gaierror as ex:
log.warning('DNS lookup failed for %s:%d,'
' exception was %s. Is your'
' advertised.listeners (called'
' advertised.host.name before Kafka 9)'
' correct and resolvable?',
host, port, ex)
return []
25 changes: 25 additions & 0 deletions test/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,28 @@ def test_lookup_on_connect():
m.assert_called_once_with(hostname, port, 0, 1)
conn.close()
assert conn.host == ip2


def test_relookup_on_failure():
hostname = 'example.org'
port = 9092
conn = BrokerConnection(hostname, port, socket.AF_UNSPEC)
assert conn.host == conn.hostname == hostname
mock_return1 = []
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
last_attempt = conn.last_attempt
conn.connect()
m.assert_called_once_with(hostname, port, 0, 1)
assert conn.disconnected()
assert conn.last_attempt > last_attempt

ip2 = '127.0.0.2'
mock_return2 = [
(2, 2, 17, '', (ip2, 9092)),
]

with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
conn.connect()
m.assert_called_once_with(hostname, port, 0, 1)
conn.close()
assert conn.host == ip2