10
10
11
11
from kafka .admin .acl_resource import ACLOperation , ACLPermissionType , ACLFilter , ACL , ResourcePattern , ResourceType , \
12
12
ACLResourcePatternType
13
+ from kafka .admin .leader_election_resources import ElectionType
13
14
from kafka .client_async import KafkaClient , selectors
14
15
from kafka .coordinator .protocol import ConsumerProtocolMemberMetadata , ConsumerProtocolMemberAssignment , ConsumerProtocol
15
16
import kafka .errors as Errors
20
21
from kafka .protocol .admin import (
21
22
CreateTopicsRequest , DeleteTopicsRequest , DescribeConfigsRequest , AlterConfigsRequest , CreatePartitionsRequest ,
22
23
ListGroupsRequest , DescribeGroupsRequest , DescribeAclsRequest , CreateAclsRequest , DeleteAclsRequest ,
23
- DeleteGroupsRequest
24
+ DeleteGroupsRequest , ElectLeadersRequest
24
25
)
25
26
from kafka .protocol .commit import GroupCoordinatorRequest , OffsetFetchRequest
26
27
from kafka .protocol .metadata import MetadataRequest
@@ -393,27 +394,55 @@ def _send_request_to_controller(self, request):
393
394
# So this is a little brittle in that it assumes all responses have
394
395
# one of these attributes and that they always unpack into
395
396
# (topic, error_code) tuples.
396
- topic_error_tuples = (response .topic_errors if hasattr (response , 'topic_errors' )
397
- else response .topic_error_codes )
398
- # Also small py2/py3 compatibility -- py3 can ignore extra values
399
- # during unpack via: for x, y, *rest in list_of_values. py2 cannot.
400
- # So for now we have to map across the list and explicitly drop any
401
- # extra values (usually the error_message)
402
- for topic , error_code in map (lambda e : e [:2 ], topic_error_tuples ):
397
+ topic_error_tuples = getattr (response , 'topic_errors' , getattr (response , 'topic_error_codes' , None ))
398
+ if topic_error_tuples is not None :
399
+ success = self ._parse_topic_request_response (topic_error_tuples , request , response , tries )
400
+ else :
401
+ # Leader Election request has a two layer error response (topic and partition)
402
+ success = self ._parse_topic_partition_request_response (request , response , tries )
403
+
404
+ if success :
405
+ return response
406
+ raise RuntimeError ("This should never happen, please file a bug with full stacktrace if encountered" )
407
+
408
+ def _parse_topic_request_response (self , topic_error_tuples , request , response , tries ):
409
+ # Also small py2/py3 compatibility -- py3 can ignore extra values
410
+ # during unpack via: for x, y, *rest in list_of_values. py2 cannot.
411
+ # So for now we have to map across the list and explicitly drop any
412
+ # extra values (usually the error_message)
413
+ for topic , error_code in map (lambda e : e [:2 ], topic_error_tuples ):
414
+ error_type = Errors .for_code (error_code )
415
+ if tries and error_type is NotControllerError :
416
+ # No need to inspect the rest of the errors for
417
+ # non-retriable errors because NotControllerError should
418
+ # either be thrown for all errors or no errors.
419
+ self ._refresh_controller_id ()
420
+ return False
421
+ elif error_type is not Errors .NoError :
422
+ raise error_type (
423
+ "Request '{}' failed with response '{}'."
424
+ .format (request , response ))
425
+ return True
426
+
427
+ def _parse_topic_partition_request_response (self , request , response , tries ):
428
+ # Also small py2/py3 compatibility -- py3 can ignore extra values
429
+ # during unpack via: for x, y, *rest in list_of_values. py2 cannot.
430
+ # So for now we have to map across the list and explicitly drop any
431
+ # extra values (usually the error_message)
432
+ for topic , partition_results in response .replication_election_results :
433
+ for partition_id , error_code in map (lambda e : e [:2 ], partition_results ):
403
434
error_type = Errors .for_code (error_code )
404
435
if tries and error_type is NotControllerError :
405
436
# No need to inspect the rest of the errors for
406
437
# non-retriable errors because NotControllerError should
407
438
# either be thrown for all errors or no errors.
408
439
self ._refresh_controller_id ()
409
- break
410
- elif error_type is not Errors .NoError :
440
+ return False
441
+ elif error_type not in [ Errors .NoError , Errors . ElectionNotNeeded ] :
411
442
raise error_type (
412
443
"Request '{}' failed with response '{}'."
413
444
.format (request , response ))
414
- else :
415
- return response
416
- raise RuntimeError ("This should never happen, please file a bug with full stacktrace if encountered" )
445
+ return True
417
446
418
447
@staticmethod
419
448
def _convert_new_topic_request (new_topic ):
@@ -1337,10 +1366,60 @@ def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id):
1337
1366
.format (version ))
1338
1367
return self ._send_request_to_node (group_coordinator_id , request )
1339
1368
1369
+ @staticmethod
1370
+ def _convert_topic_partitions (topic_partitions ):
1371
+ return [
1372
+ (
1373
+ topic ,
1374
+ partition_ids
1375
+ )
1376
+ for topic , partition_ids in topic_partitions .items ()
1377
+ ]
1378
+
1379
+ def _get_all_topic_partitions (self ):
1380
+ return [
1381
+ (
1382
+ topic ,
1383
+ [partition_info .partition for partition_info in self ._client .cluster ._partitions [topic ].values ()]
1384
+ )
1385
+ for topic in self ._client .cluster .topics ()
1386
+ ]
1387
+
1388
+ def _get_topic_partitions (self , topic_partitions ):
1389
+ if topic_partitions is None :
1390
+ return self ._get_all_topic_partitions ()
1391
+ return self ._convert_topic_partitions (topic_partitions )
1392
+
1393
+ def perform_leader_election (self , election_type , topic_partitions = None , timeout_ms = None ):
1394
+ """Perform leader election on the topic partitions.
1395
+
1396
+ :param election_type: Type of election to attempt. 0 for Perferred, 1 for Unclean
1397
+ :param topic_partitions: A map of topic name strings to partition ids list.
1398
+ By default, will run on all topic partitions
1399
+ :param timeout_ms: Milliseconds to wait for the leader election process to complete
1400
+ before the broker returns.
1401
+
1402
+ :return: Appropriate version of ElectLeadersResponse class.
1403
+ """
1404
+ version = self ._matching_api_version (ElectLeadersRequest )
1405
+ timeout_ms = self ._validate_timeout (timeout_ms )
1406
+ if 0 < version <= 1 :
1407
+ request = ElectLeadersRequest [version ](
1408
+ election_type = ElectionType (election_type ),
1409
+ topic_partitions = self ._get_topic_partitions (topic_partitions ),
1410
+ timeout = timeout_ms ,
1411
+ )
1412
+ else :
1413
+ raise NotImplementedError (
1414
+ "Support for CreateTopics v{} has not yet been added to KafkaAdminClient."
1415
+ .format (version ))
1416
+ # TODO convert structs to a more pythonic interface
1417
+ return self ._send_request_to_controller (request )
1418
+
1340
1419
def _wait_for_futures (self , futures ):
1341
1420
while not all (future .succeeded () for future in futures ):
1342
1421
for future in futures :
1343
1422
self ._client .poll (future = future )
1344
1423
1345
1424
if future .failed ():
1346
- raise future .exception # pylint: disable-msg=raising-bad-type
1425
+ raise future .exception # pylint: disable-msg=raising-bad-type
0 commit comments