Cannot create producer in Kubernetes without specifying api_version #1294

Closed
@JoshArmi

When creating a KafkaProducer with this library on Kubernetes, the constructor raises a gaierror instead of returning a producer.

Debugging pointed to an attempt to do a DNS lookup on the pods in the cluster, which Kubernetes doesn't support. This appears to happen in the code that tries to determine which API version is available. Specifying the bootstrap server by hostname or by IP, with or without a port, doesn't appear to make a difference.

Specifying an explicit api_version works as expected.
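
For reference, a minimal sketch of the workaround. Per the traceback, `check_version` only runs when `api_version` is None, so passing a value skips that step. The tuple `(0, 10)` is an assumption chosen for illustration, not something taken from this report; it should match the broker actually deployed. The helper names `producer_config` and `make_producer` are hypothetical.

```python
def producer_config(bootstrap_servers, api_version=(0, 10)):
    """Build KafkaProducer keyword arguments with an explicit api_version.

    (0, 10) is an assumed placeholder; set it to match your broker.
    """
    return {
        "bootstrap_servers": bootstrap_servers,
        "api_version": api_version,
    }


def make_producer(bootstrap_servers, api_version=(0, 10)):
    # Imported lazily so producer_config can be used without kafka-python installed.
    from kafka import KafkaProducer

    # With api_version set, the client does not probe the broker for its version.
    return KafkaProducer(**producer_config(bootstrap_servers, api_version))
```

Usage would look like `producer = make_producer('kafka.default.svc.cluster.local')`.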

In [8]: producer = KafkaProducer(bootstrap_servers='kafka.default.svc.cluster.local')
---------------------------------------------------------------------------
gaierror                                  Traceback (most recent call last)
/usr/local/lib/python3.6/site-packages/kafka/conn.py in connect(self)
    235                                                        socket.AF_UNSPEC,
--> 236                                                        socket.SOCK_STREAM)
    237                     except socket.gaierror as ex:

/usr/local/lib/python3.6/socket.py in getaddrinfo(host, port, family, type, proto, flags)
    744     addrlist = []
--> 745     for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
    746         af, socktype, proto, canonname, sa = res

gaierror: [Errno -2] Name or service not known

During handling of the above exception, another exception occurred:

gaierror                                  Traceback (most recent call last)
<ipython-input-8-448608bc6cbd> in <module>()
----> 1 producer = KafkaProducer(bootstrap_servers='kafka.default.svc.cluster.local')

/usr/local/lib/python3.6/site-packages/kafka/producer/kafka.py in __init__(self, **configs)
    333
    334         client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
--> 335                              **self.config)
    336
    337         # Get auto-discovered version from client if necessary

/usr/local/lib/python3.6/site-packages/kafka/client_async.py in __init__(self, **configs)
    208         if self.config['api_version'] is None:
    209             check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
--> 210             self.config['api_version'] = self.check_version(timeout=check_timeout)
    211
    212     def _bootstrap(self, hosts):

/usr/local/lib/python3.6/site-packages/kafka/client_async.py in check_version(self, node_id, timeout, strict)
    807             if try_node is None:
    808                 raise Errors.NoBrokersAvailable()
--> 809             self._maybe_connect(try_node)
    810             conn = self._conns[try_node]
    811

/usr/local/lib/python3.6/site-packages/kafka/client_async.py in _maybe_connect(self, node_id)
    331         if conn.connected():
    332             return True
--> 333         conn.connect()
    334         return conn.connected()
    335

/usr/local/lib/python3.6/site-packages/kafka/conn.py in connect(self)
    239                           'exception was {2}. Is your advertised.listeners (called'
    240                           'advertised.host.name before Kafka 9) correct and resolvable?'.format(
--> 241                              self._init_host, self._init_port, ex
    242                           ))
    243                     self._gai_index = 0

gaierror: getaddrinfo failed for kafka-7785c49648-rwr9d:9092, exception was [Errno -2] Name or service not known. Is your advertised.listeners (calledadvertised.host.name before Kafka 9) correct and resolvable?
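
The final error names the pod hostname (`kafka-7785c49648-rwr9d`) rather than the service name passed to `bootstrap_servers`. A small sketch for checking, from inside the client pod, which of the two names resolves. It uses the same `socket.getaddrinfo` call shown in the traceback; the helper name `can_resolve` and the default port 9092 are assumptions for illustration.

```python
import socket


def can_resolve(host, port=9092):
    """Return True if host:port resolves via getaddrinfo, False on gaierror."""
    try:
        socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
        return True
    except socket.gaierror:
        return False


if __name__ == "__main__":
    # Compare the service name with the advertised pod hostname from the error.
    for name in ("kafka.default.svc.cluster.local", "kafka-7785c49648-rwr9d"):
        print(name, can_resolve(name))
```

If the service name resolves but the pod hostname does not, that would be consistent with the broker advertising a name the client cannot look up, as the error message suggests.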
