1
1
import copy
2
+ import logging
3
+
2
4
from collections import defaultdict
3
5
from functools import partial
4
6
from itertools import count
5
- import logging
6
- import time
7
7
8
- from kafka .common import (
9
- ErrorMapping , TopicAndPartition , ConnectionError ,
10
- FailedPayloadsException
11
- )
8
+ from kafka .common import (ErrorMapping , TopicAndPartition ,
9
+ ConnectionError , FailedPayloadsError ,
10
+ BrokerResponseError , PartitionUnavailableError ,
11
+ KafkaUnavailableError , KafkaRequestError )
12
+
12
13
from kafka .conn import KafkaConnection
13
14
from kafka .protocol import KafkaProtocol
14
15
@@ -29,8 +30,8 @@ def __init__(self, host, port, client_id=CLIENT_ID, timeout=10):
29
30
}
30
31
self .brokers = {} # broker_id -> BrokerMetadata
31
32
self .topics_to_brokers = {} # topic_id -> broker_id
32
- self .topic_partitions = defaultdict ( list ) # topic_id -> [0, 1, 2, ...]
33
- self ._load_metadata_for_topics ()
33
+ self .topic_partitions = {} # topic_id -> [0, 1, 2, ...]
34
+ self .load_metadata_for_topics () # bootstrap with all metadata
34
35
35
36
##################
36
37
# Private API #
@@ -49,55 +50,13 @@ def _get_conn_for_broker(self, broker):
49
50
def _get_leader_for_partition(self, topic, partition):
    """Return the cached broker leading (topic, partition).

    Metadata is fetched lazily: on a cache miss the topic's metadata is
    refreshed once before giving up.

    Raises:
        KafkaRequestError: if the partition is still unknown after the
            metadata refresh.
    """
    key = TopicAndPartition(topic, partition)

    # Cache miss -> refresh metadata for just this topic and retry.
    if key not in self.topics_to_brokers:
        self.load_metadata_for_topics(topic)

    try:
        return self.topics_to_brokers[key]
    except KeyError:
        raise KafkaRequestError("Partition does not exist: %s" % str(key))
58
59
59
- def _load_metadata_for_topics (self , * topics ):
60
- """
61
- Discover brokers and metadata for a set of topics. This method will
62
- recurse in the event of a retry.
63
- """
64
- request_id = self ._next_id ()
65
- request = KafkaProtocol .encode_metadata_request (self .client_id ,
66
- request_id , topics )
67
-
68
- response = self ._send_broker_unaware_request (request_id , request )
69
- if response is None :
70
- raise Exception ("All servers failed to process request" )
71
-
72
- (brokers , topics ) = KafkaProtocol .decode_metadata_response (response )
73
-
74
- log .debug ("Broker metadata: %s" , brokers )
75
- log .debug ("Topic metadata: %s" , topics )
76
-
77
- self .brokers = brokers
78
- self .topics_to_brokers = {}
79
-
80
- for topic , partitions in topics .items ():
81
- # Clear the list once before we add it. This removes stale entries
82
- # and avoids duplicates
83
- self .topic_partitions .pop (topic , None )
84
-
85
- if not partitions :
86
- log .info ("Partition is unassigned, delay for 1s and retry" )
87
- time .sleep (1 )
88
- self ._load_metadata_for_topics (topic )
89
- break
90
-
91
- for partition , meta in partitions .items ():
92
- if meta .leader == - 1 :
93
- log .info ("Partition is unassigned, delay for 1s and retry" )
94
- time .sleep (1 )
95
- self ._load_metadata_for_topics (topic )
96
- else :
97
- topic_part = TopicAndPartition (topic , partition )
98
- self .topics_to_brokers [topic_part ] = brokers [meta .leader ]
99
- self .topic_partitions [topic ].append (partition )
100
-
101
60
def _next_id (self ):
102
61
"""
103
62
Generate a new correlation id
@@ -119,7 +78,7 @@ def _send_broker_unaware_request(self, requestId, request):
119
78
"trying next server: %s" % (request , conn , e ))
120
79
continue
121
80
122
- return None
81
+ raise KafkaUnavailableError ( "All servers failed to process request" )
123
82
124
83
def _send_broker_aware_request (self , payloads , encoder_fn , decoder_fn ):
125
84
"""
@@ -150,6 +109,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
150
109
for payload in payloads:
    leader = self._get_leader_for_partition(payload.topic,
                                            payload.partition)
    # A leader of -1 means no broker is currently elected leader for
    # this partition, so the request cannot be routed.
    if leader == -1:
        # BUG FIX: the format arguments must be wrapped in a tuple.
        # Without the parentheses, "%s-%s" received only payload.topic
        # (and payload.partition became a second expression), so the
        # raise itself crashed with a TypeError instead of raising
        # PartitionUnavailableError.
        raise PartitionUnavailableError(
            "Leader is unassigned for %s-%s" % (payload.topic,
                                                payload.partition))
    payloads_by_broker[leader].append(payload)
    original_keys.append((payload.topic, payload.partition))
155
116
@@ -185,21 +146,51 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
185
146
186
147
if failed :
187
148
failed_payloads += payloads
188
- self .topics_to_brokers = {} # reset metadata
149
+ self .reset_all_metadata ()
189
150
continue
190
151
191
152
for response in decoder_fn (response ):
192
153
acc [(response .topic , response .partition )] = response
193
154
194
155
if failed_payloads :
195
- raise FailedPayloadsException (failed_payloads )
156
+ raise FailedPayloadsError (failed_payloads )
196
157
197
158
# Order the accumulated responses by the original key order
198
159
return (acc [k ] for k in original_keys ) if acc else ()
199
160
161
def _raise_on_response_error(self, resp):
    """Raise BrokerResponseError when *resp* carries a broker error code.

    Error codes that indicate stale local metadata (unknown
    topic/partition, or a leadership change) first drop the cached
    metadata for the topic so the next request refetches it.
    """
    if resp.error == ErrorMapping.NO_ERROR:
        return

    # NOTE: UNKNOWN_TOPIC_OR_PARTITON is genuinely spelled this way in
    # kafka.common; do not "fix" it here.
    stale_metadata_codes = (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON,
                            ErrorMapping.NOT_LEADER_FOR_PARTITION)
    if resp.error in stale_metadata_codes:
        self.reset_topic_metadata(resp.topic)

    raise BrokerResponseError(
        "Request for %s failed with errorcode=%d" %
        (TopicAndPartition(resp.topic, resp.partition), resp.error))
172
+
200
173
#################
201
174
# Public API #
202
175
#################
176
def reset_topic_metadata(self, *topics):
    """Forget cached partition and leader info for each given topic.

    Topics with no cached metadata are silently skipped.
    """
    for topic in topics:
        # Remove the partition list; None signals "nothing cached".
        cached_partitions = self.topic_partitions.pop(topic, None)
        if cached_partitions is None:
            continue

        for partition in cached_partitions:
            self.topics_to_brokers.pop(
                TopicAndPartition(topic, partition), None)
187
+
188
def reset_all_metadata(self):
    """Drop every cached topic/partition -> broker mapping in place."""
    for cache in (self.topics_to_brokers, self.topic_partitions):
        cache.clear()
191
+
192
def has_metadata_for_topic(self, topic):
    """Return True if partition metadata for *topic* is already cached."""
    cached_topics = self.topic_partitions
    return topic in cached_topics
203
194
204
195
def close (self ):
205
196
for conn in self .conns .values ():
@@ -219,6 +210,36 @@ def reinit(self):
219
210
for conn in self .conns .values ():
220
211
conn .reinit ()
221
212
213
def load_metadata_for_topics(self, *topics):
    """Discover brokers and metadata for a set of topics.

    Called with no arguments it fetches metadata for all topics (used to
    bootstrap in __init__); called with specific topics it is the lazy
    refresh path used whenever metadata is unavailable.

    Raises:
        KafkaUnavailableError: propagated from
            _send_broker_unaware_request when no broker answers the
            metadata request.
    """
    request_id = self._next_id()
    request = KafkaProtocol.encode_metadata_request(self.client_id,
                                                    request_id, topics)

    response = self._send_broker_unaware_request(request_id, request)

    (brokers, topics) = KafkaProtocol.decode_metadata_response(response)

    log.debug("Broker metadata: %s", brokers)
    log.debug("Topic metadata: %s", topics)

    self.brokers = brokers

    for topic, partitions in topics.items():
        # Drop stale entries for this topic before re-adding, so we never
        # accumulate duplicates or partitions that no longer exist.
        self.reset_topic_metadata(topic)

        if not partitions:
            continue

        self.topic_partitions[topic] = []
        for partition, meta in partitions.items():
            topic_part = TopicAndPartition(topic, partition)
            # BUG FIX: meta.leader == -1 means no leader is currently
            # elected for this partition. `brokers` is keyed by real
            # broker ids, so the original brokers[meta.leader] lookup
            # raised KeyError here. Store -1 instead, so the
            # `leader == -1` check in _send_broker_aware_request can
            # raise PartitionUnavailableError as intended.
            if meta.leader == -1:
                self.topics_to_brokers[topic_part] = -1
            else:
                self.topics_to_brokers[topic_part] = brokers[meta.leader]
            self.topic_partitions[topic].append(partition)
242
+
222
243
def send_produce_request (self , payloads = [], acks = 1 , timeout = 1000 ,
223
244
fail_on_error = True , callback = None ):
224
245
"""
@@ -256,14 +277,9 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000,
256
277
257
278
out = []
258
279
for resp in resps :
259
- # Check for errors
260
- if fail_on_error is True and resp .error != ErrorMapping .NO_ERROR :
261
- raise Exception (
262
- "ProduceRequest for %s failed with errorcode=%d" %
263
- (TopicAndPartition (resp .topic , resp .partition ),
264
- resp .error ))
265
-
266
- # Run the callback
280
+ if fail_on_error is True :
281
+ self ._raise_on_response_error (resp )
282
+
267
283
if callback is not None :
268
284
out .append (callback (resp ))
269
285
else :
@@ -289,14 +305,9 @@ def send_fetch_request(self, payloads=[], fail_on_error=True,
289
305
290
306
out = []
291
307
for resp in resps :
292
- # Check for errors
293
- if fail_on_error is True and resp .error != ErrorMapping .NO_ERROR :
294
- raise Exception (
295
- "FetchRequest for %s failed with errorcode=%d" %
296
- (TopicAndPartition (resp .topic , resp .partition ),
297
- resp .error ))
298
-
299
- # Run the callback
308
+ if fail_on_error is True :
309
+ self ._raise_on_response_error (resp )
310
+
300
311
if callback is not None :
301
312
out .append (callback (resp ))
302
313
else :
@@ -312,9 +323,8 @@ def send_offset_request(self, payloads=[], fail_on_error=True,
312
323
313
324
out = []
314
325
for resp in resps :
315
- if fail_on_error is True and resp .error != ErrorMapping .NO_ERROR :
316
- raise Exception ("OffsetRequest failed with errorcode=%s" ,
317
- resp .error )
326
+ if fail_on_error is True :
327
+ self ._raise_on_response_error (resp )
318
328
if callback is not None :
319
329
out .append (callback (resp ))
320
330
else :
@@ -330,9 +340,8 @@ def send_offset_commit_request(self, group, payloads=[],
330
340
331
341
out = []
332
342
for resp in resps :
333
- if fail_on_error is True and resp .error != ErrorMapping .NO_ERROR :
334
- raise Exception ("OffsetCommitRequest failed with "
335
- "errorcode=%s" , resp .error )
343
+ if fail_on_error is True :
344
+ self ._raise_on_response_error (resp )
336
345
337
346
if callback is not None :
338
347
out .append (callback (resp ))
@@ -350,9 +359,8 @@ def send_offset_fetch_request(self, group, payloads=[],
350
359
351
360
out = []
352
361
for resp in resps :
353
- if fail_on_error is True and resp .error != ErrorMapping .NO_ERROR :
354
- raise Exception ("OffsetCommitRequest failed with errorcode=%s" ,
355
- resp .error )
362
+ if fail_on_error is True :
363
+ self ._raise_on_response_error (resp )
356
364
if callback is not None :
357
365
out .append (callback (resp ))
358
366
else :
0 commit comments