7
7
ProduceRequest , FetchRequest , Message , ChecksumError ,
8
8
ConsumerFetchSizeTooSmall , ProduceResponse , FetchResponse ,
9
9
OffsetAndMessage , BrokerMetadata , PartitionMetadata ,
10
- TopicAndPartition
10
+ TopicAndPartition , PartitionUnavailableError
11
11
)
12
12
from kafka .codec import (
13
13
has_gzip , has_snappy , gzip_encode , gzip_decode ,
@@ -55,7 +55,6 @@ def test_submodule_namespace(self):
55
55
from kafka import KafkaClient as KafkaClient2
56
56
self .assertEquals (KafkaClient2 .__name__ , "KafkaClient" )
57
57
58
- from kafka .codec import snappy_encode
59
58
self .assertEquals (snappy_encode .__name__ , "snappy_encode" )
60
59
61
60
@@ -391,7 +390,8 @@ class TestClient(unittest.TestCase):
391
390
392
391
@patch ('kafka.client.KafkaConnection' )
393
392
@patch ('kafka.client.KafkaProtocol' )
394
- def test_client_load_metadata (self , protocol , conn ):
393
+ def test_load_metadata (self , protocol , conn ):
394
+ "Load metadata for all topics"
395
395
396
396
conn .recv .return_value = 'response' # anything but None
397
397
@@ -403,102 +403,119 @@ def test_client_load_metadata(self, protocol, conn):
403
403
topics ['topic_1' ] = {
404
404
0 : PartitionMetadata ('topic_1' , 0 , 1 , [1 , 2 ], [1 , 2 ])
405
405
}
406
- topics ['topic_2' ] = {
407
- 0 : PartitionMetadata ('topic_2' , 0 , 0 , [0 , 1 ], [0 , 1 ]),
408
- 1 : PartitionMetadata ('topic_2' , 1 , 1 , [1 , 0 ], [1 , 0 ])
406
+ topics ['topic_noleader' ] = {
407
+ 0 : PartitionMetadata ('topic_noleader' , 0 , - 1 , [], []),
408
+ 1 : PartitionMetadata ('topic_noleader' , 1 , - 1 , [], [])
409
+ }
410
+ topics ['topic_no_partitions' ] = {}
411
+ topics ['topic_3' ] = {
412
+ 0 : PartitionMetadata ('topic_3' , 0 , 0 , [0 , 1 ], [0 , 1 ]),
413
+ 1 : PartitionMetadata ('topic_3' , 1 , 1 , [1 , 0 ], [1 , 0 ]),
414
+ 2 : PartitionMetadata ('topic_3' , 2 , 0 , [0 , 1 ], [0 , 1 ])
409
415
}
410
416
protocol .decode_metadata_response .return_value = (brokers , topics )
411
417
418
+ # client loads metadata at init
412
419
client = KafkaClient (host = 'broker_1' , port = 4567 )
413
420
self .assertItemsEqual ({
414
421
TopicAndPartition ('topic_1' , 0 ): brokers [0 ],
415
- TopicAndPartition ('topic_2' , 0 ): brokers [0 ],
416
- TopicAndPartition ('topic_2' , 1 ): brokers [1 ]},
422
+ TopicAndPartition ('topic_noleader' , 0 ): None ,
423
+ TopicAndPartition ('topic_noleader' , 1 ): None ,
424
+ TopicAndPartition ('topic_3' , 0 ): brokers [0 ],
425
+ TopicAndPartition ('topic_3' , 1 ): brokers [1 ],
426
+ TopicAndPartition ('topic_3' , 2 ): brokers [0 ]},
417
427
client .topics_to_brokers )
418
428
419
429
@patch ('kafka.client.KafkaConnection' )
420
430
@patch ('kafka.client.KafkaProtocol' )
421
- def test_client_load_metadata_unassigned_partitions (self , protocol , conn ):
431
+ def test_get_leader_for_partitions_reloads_metadata (self , protocol , conn ):
432
+ "Get leader for partitions reload metadata if it is not available"
422
433
423
434
conn .recv .return_value = 'response' # anything but None
424
435
425
436
brokers = {}
426
437
brokers [0 ] = BrokerMetadata (0 , 'broker_1' , 4567 )
427
438
brokers [1 ] = BrokerMetadata (1 , 'broker_2' , 5678 )
428
439
429
- topics = {}
430
- topics ['topic_1' ] = {
431
- 0 : PartitionMetadata ('topic_1' , 0 , - 1 , [], [])
432
- }
440
+ topics = {'topic_no_partitions' : {}}
433
441
protocol .decode_metadata_response .return_value = (brokers , topics )
434
442
435
443
client = KafkaClient (host = 'broker_1' , port = 4567 )
436
444
445
+ # topic metadata is loaded but empty
437
446
self .assertItemsEqual ({}, client .topics_to_brokers )
438
- self .assertRaises (
439
- Exception ,
440
- client ._get_leader_for_partition ,
441
- 'topic_1' , 0 )
447
+
448
+ topics ['topic_no_partitions' ] = {
449
+ 0 : PartitionMetadata ('topic_no_partitions' , 0 , 0 , [0 , 1 ], [0 , 1 ])
450
+ }
451
+ protocol .decode_metadata_response .return_value = (brokers , topics )
442
452
443
453
# calling _get_leader_for_partition (from any broker aware request)
444
454
# will try loading metadata again for the same topic
445
- topics ['topic_1' ] = {
446
- 0 : PartitionMetadata ('topic_1' , 0 , 0 , [0 , 1 ], [0 , 1 ])
447
- }
448
- leader = client ._get_leader_for_partition ('topic_1' , 0 )
455
+ leader = client ._get_leader_for_partition ('topic_no_partitions' , 0 )
449
456
450
457
self .assertEqual (brokers [0 ], leader )
451
458
self .assertItemsEqual ({
452
- TopicAndPartition ('topic_1 ' , 0 ): brokers [0 ]},
459
+ TopicAndPartition ('topic_no_partitions ' , 0 ): brokers [0 ]},
453
460
client .topics_to_brokers )
454
461
455
462
@patch ('kafka.client.KafkaConnection' )
456
463
@patch ('kafka.client.KafkaProtocol' )
457
- def test_client_load_metadata_noleader_partitions (self , protocol , conn ):
464
+ def test_get_leader_for_unassigned_partitions (self , protocol , conn ):
465
+ "Get leader raises if no partitions is defined for a topic"
458
466
459
467
conn .recv .return_value = 'response' # anything but None
460
468
461
469
brokers = {}
462
470
brokers [0 ] = BrokerMetadata (0 , 'broker_1' , 4567 )
463
471
brokers [1 ] = BrokerMetadata (1 , 'broker_2' , 5678 )
464
472
465
- topics = {}
466
- topics ['topic_1' ] = {
467
- 0 : PartitionMetadata ('topic_1' , 0 , - 1 , [], [])
468
- }
469
- topics ['topic_2' ] = {
470
- 0 : PartitionMetadata ('topic_2' , 0 , 0 , [0 , 1 ], []),
471
- 1 : PartitionMetadata ('topic_2' , 1 , 1 , [1 , 0 ], [1 , 0 ])
472
- }
473
+ topics = {'topic_no_partitions' : {}}
473
474
protocol .decode_metadata_response .return_value = (brokers , topics )
474
475
475
476
client = KafkaClient (host = 'broker_1' , port = 4567 )
476
- self .assertItemsEqual (
477
- {
478
- TopicAndPartition ('topic_2' , 0 ): brokers [0 ],
479
- TopicAndPartition ('topic_2' , 1 ): brokers [1 ]
480
- },
481
- client .topics_to_brokers )
477
+
478
+ self .assertItemsEqual ({}, client .topics_to_brokers )
482
479
self .assertRaises (
483
- Exception ,
480
+ PartitionUnavailableError ,
484
481
client ._get_leader_for_partition ,
485
- 'topic_1 ' , 0 )
482
+ 'topic_no_partitions ' , 0 )
486
483
487
- # calling _get_leader_for_partition (from any broker aware request)
488
- # will try loading metadata again for the same topic
489
- topics ['topic_1' ] = {
490
- 0 : PartitionMetadata ('topic_1' , 0 , 0 , [0 , 1 ], [0 , 1 ])
484
+ @patch ('kafka.client.KafkaConnection' )
485
+ @patch ('kafka.client.KafkaProtocol' )
486
+ def test_get_leader_returns_none_when_noleader (self , protocol , conn ):
487
+ "Getting leader for partitions returns None when the partiion has no leader"
488
+
489
+ conn .recv .return_value = 'response' # anything but None
490
+
491
+ brokers = {}
492
+ brokers [0 ] = BrokerMetadata (0 , 'broker_1' , 4567 )
493
+ brokers [1 ] = BrokerMetadata (1 , 'broker_2' , 5678 )
494
+
495
+ topics = {}
496
+ topics ['topic_noleader' ] = {
497
+ 0 : PartitionMetadata ('topic_noleader' , 0 , - 1 , [], []),
498
+ 1 : PartitionMetadata ('topic_noleader' , 1 , - 1 , [], [])
491
499
}
492
- leader = client . _get_leader_for_partition ( 'topic_1' , 0 )
500
+ protocol . decode_metadata_response . return_value = ( brokers , topics )
493
501
494
- self . assertEqual ( brokers [ 0 ], leader )
502
+ client = KafkaClient ( host = 'broker_1' , port = 4567 )
495
503
self .assertItemsEqual (
496
504
{
497
- TopicAndPartition ('topic_1' , 0 ): brokers [0 ],
498
- TopicAndPartition ('topic_2' , 0 ): brokers [0 ],
499
- TopicAndPartition ('topic_2' , 1 ): brokers [1 ]
505
+ TopicAndPartition ('topic_noleader' , 0 ): None ,
506
+ TopicAndPartition ('topic_noleader' , 1 ): None
500
507
},
501
508
client .topics_to_brokers )
509
+ self .assertIsNone (client ._get_leader_for_partition ('topic_noleader' , 0 ))
510
+ self .assertIsNone (client ._get_leader_for_partition ('topic_noleader' , 1 ))
511
+
512
+ topics ['topic_noleader' ] = {
513
+ 0 : PartitionMetadata ('topic_noleader' , 0 , 0 , [0 , 1 ], [0 , 1 ]),
514
+ 1 : PartitionMetadata ('topic_noleader' , 1 , 1 , [1 , 0 ], [1 , 0 ])
515
+ }
516
+ protocol .decode_metadata_response .return_value = (brokers , topics )
517
+ self .assertEqual (brokers [0 ], client ._get_leader_for_partition ('topic_noleader' , 0 ))
518
+ self .assertEqual (brokers [1 ], client ._get_leader_for_partition ('topic_noleader' , 1 ))
502
519
503
520
if __name__ == '__main__' :
504
521
unittest .main ()
0 commit comments