@@ -46,7 +46,10 @@ class KafkaConsumer(six.Iterator):
46
46
It just needs to have at least one broker that will respond to a
47
47
Metadata API Request. Default port is 9092. If no servers are
48
48
specified, will default to localhost:9092.
49
- client_id (str): A name for this client. This string is passed in
49
+ client (kafka.client_async.KafkaClient): a kafka client to
50
+ use, or if unprovided, one is constructed from the provided
51
+ configuration.
52
+ client_id (str): a name for this client. This string is passed in
50
53
each request to servers and can be used to identify specific
51
54
server-side log entries that correspond to this client. Also
52
55
submitted to GroupCoordinator for logging with respect to
@@ -228,6 +231,7 @@ class KafkaConsumer(six.Iterator):
228
231
"""
229
232
DEFAULT_CONFIG = {
230
233
'bootstrap_servers' : 'localhost' ,
234
+ 'client' : None ,
231
235
'client_id' : 'kafka-python-' + __version__ ,
232
236
'group_id' : None ,
233
237
'key_deserializer' : None ,
@@ -324,7 +328,11 @@ def __init__(self, *topics, **configs):
324
328
log .warning ('use api_version=%s [tuple] -- "%s" as str is deprecated' ,
325
329
str (self .config ['api_version' ]), str_version )
326
330
327
- self ._client = KafkaClient (metrics = self ._metrics , ** self .config )
331
+ client = self .config .pop ('client' , None ) or KafkaClient (
332
+ metrics = self ._metrics ,
333
+ ** self .config
334
+ )
335
+ self ._client = client
328
336
329
337
# Get auto-discovered version from client if necessary
330
338
if self .config ['api_version' ] is None :
0 commit comments