5
5
6
6
from mock import MagicMock , patch
7
7
8
-
9
8
from kafka import KafkaClient
10
9
from kafka .common import (
11
10
ProduceRequest , FetchRequest , Message , ChecksumError ,
12
11
ConsumerFetchSizeTooSmall , ProduceResponse , FetchResponse ,
13
- OffsetAndMessage , BrokerMetadata , PartitionMetadata
12
+ OffsetAndMessage , BrokerMetadata , PartitionMetadata ,
13
+ TopicAndPartition , KafkaUnavailableError ,
14
+ LeaderUnavailableError , PartitionUnavailableError
14
15
)
15
- from kafka .common import KafkaUnavailableError
16
16
from kafka .codec import (
17
17
has_gzip , has_snappy , gzip_encode , gzip_decode ,
18
18
snappy_encode , snappy_decode
@@ -410,6 +410,7 @@ def test_encode_offset_request(self):
410
410
def test_decode_offset_response (self ):
411
411
pass
412
412
413
+
413
414
@unittest .skip ("Not Implemented" )
414
415
def test_encode_offset_commit_request (self ):
415
416
pass
@@ -474,18 +475,17 @@ def mock_get_conn(host, port):
474
475
return mocked_conns [(host , port )]
475
476
476
477
# patch to avoid making requests before we want it
477
- with patch .object (KafkaClient , 'load_metadata_for_topics' ), \
478
- patch .object (KafkaClient , '_get_conn' , side_effect = mock_get_conn ):
479
-
480
- client = KafkaClient (hosts = ['kafka01:9092' , 'kafka02:9092' ])
478
+ with patch .object (KafkaClient , 'load_metadata_for_topics' ):
479
+ with patch .object (KafkaClient , '_get_conn' , side_effect = mock_get_conn ):
480
+ client = KafkaClient (hosts = ['kafka01:9092' , 'kafka02:9092' ])
481
481
482
- self .assertRaises (
483
- KafkaUnavailableError ,
484
- client ._send_broker_unaware_request ,
485
- 1 , 'fake request' )
482
+ self .assertRaises (
483
+ KafkaUnavailableError ,
484
+ client ._send_broker_unaware_request ,
485
+ 1 , 'fake request' )
486
486
487
- for key , conn in mocked_conns .iteritems ():
488
- conn .send .assert_called_with (1 , 'fake request' )
487
+ for key , conn in mocked_conns .iteritems ():
488
+ conn .send .assert_called_with (1 , 'fake request' )
489
489
490
490
def test_send_broker_unaware_request (self ):
491
491
'Tests that call works when at least one of the host is available'
@@ -504,16 +504,171 @@ def mock_get_conn(host, port):
504
504
return mocked_conns [(host , port )]
505
505
506
506
# patch to avoid making requests before we want it
507
- with patch .object (KafkaClient , 'load_metadata_for_topics' ), \
508
- patch .object (KafkaClient , '_get_conn' , side_effect = mock_get_conn ):
507
+ with patch .object (KafkaClient , 'load_metadata_for_topics' ):
508
+ with patch .object (KafkaClient , '_get_conn' , side_effect = mock_get_conn ):
509
+ client = KafkaClient (hosts = 'kafka01:9092,kafka02:9092' )
510
+
511
+ resp = client ._send_broker_unaware_request (1 , 'fake request' )
512
+
513
+ self .assertEqual ('valid response' , resp )
514
+ mocked_conns [('kafka02' , 9092 )].recv .assert_called_with (1 )
515
+
516
+ @patch ('kafka.client.KafkaConnection' )
517
+ @patch ('kafka.client.KafkaProtocol' )
518
+ def test_load_metadata (self , protocol , conn ):
519
+ "Load metadata for all topics"
520
+
521
+ conn .recv .return_value = 'response' # anything but None
522
+
523
+ brokers = {}
524
+ brokers [0 ] = BrokerMetadata (1 , 'broker_1' , 4567 )
525
+ brokers [1 ] = BrokerMetadata (2 , 'broker_2' , 5678 )
526
+
527
+ topics = {}
528
+ topics ['topic_1' ] = {
529
+ 0 : PartitionMetadata ('topic_1' , 0 , 1 , [1 , 2 ], [1 , 2 ])
530
+ }
531
+ topics ['topic_noleader' ] = {
532
+ 0 : PartitionMetadata ('topic_noleader' , 0 , - 1 , [], []),
533
+ 1 : PartitionMetadata ('topic_noleader' , 1 , - 1 , [], [])
534
+ }
535
+ topics ['topic_no_partitions' ] = {}
536
+ topics ['topic_3' ] = {
537
+ 0 : PartitionMetadata ('topic_3' , 0 , 0 , [0 , 1 ], [0 , 1 ]),
538
+ 1 : PartitionMetadata ('topic_3' , 1 , 1 , [1 , 0 ], [1 , 0 ]),
539
+ 2 : PartitionMetadata ('topic_3' , 2 , 0 , [0 , 1 ], [0 , 1 ])
540
+ }
541
+ protocol .decode_metadata_response .return_value = (brokers , topics )
542
+
543
+ # client loads metadata at init
544
+ client = KafkaClient (hosts = ['broker_1:4567' ])
545
+ self .assertDictEqual ({
546
+ TopicAndPartition ('topic_1' , 0 ): brokers [1 ],
547
+ TopicAndPartition ('topic_noleader' , 0 ): None ,
548
+ TopicAndPartition ('topic_noleader' , 1 ): None ,
549
+ TopicAndPartition ('topic_3' , 0 ): brokers [0 ],
550
+ TopicAndPartition ('topic_3' , 1 ): brokers [1 ],
551
+ TopicAndPartition ('topic_3' , 2 ): brokers [0 ]},
552
+ client .topics_to_brokers )
553
+
554
+ @patch ('kafka.client.KafkaConnection' )
555
+ @patch ('kafka.client.KafkaProtocol' )
556
+ def test_get_leader_for_partitions_reloads_metadata (self , protocol , conn ):
557
+ "Get leader for partitions reload metadata if it is not available"
558
+
559
+ conn .recv .return_value = 'response' # anything but None
560
+
561
+ brokers = {}
562
+ brokers [0 ] = BrokerMetadata (0 , 'broker_1' , 4567 )
563
+ brokers [1 ] = BrokerMetadata (1 , 'broker_2' , 5678 )
564
+
565
+ topics = {'topic_no_partitions' : {}}
566
+ protocol .decode_metadata_response .return_value = (brokers , topics )
567
+
568
+ client = KafkaClient (hosts = ['broker_1:4567' ])
569
+
570
+ # topic metadata is loaded but empty
571
+ self .assertDictEqual ({}, client .topics_to_brokers )
572
+
573
+ topics ['topic_no_partitions' ] = {
574
+ 0 : PartitionMetadata ('topic_no_partitions' , 0 , 0 , [0 , 1 ], [0 , 1 ])
575
+ }
576
+ protocol .decode_metadata_response .return_value = (brokers , topics )
577
+
578
+ # calling _get_leader_for_partition (from any broker aware request)
579
+ # will try loading metadata again for the same topic
580
+ leader = client ._get_leader_for_partition ('topic_no_partitions' , 0 )
581
+
582
+ self .assertEqual (brokers [0 ], leader )
583
+ self .assertDictEqual ({
584
+ TopicAndPartition ('topic_no_partitions' , 0 ): brokers [0 ]},
585
+ client .topics_to_brokers )
586
+
587
+ @patch ('kafka.client.KafkaConnection' )
588
+ @patch ('kafka.client.KafkaProtocol' )
589
+ def test_get_leader_for_unassigned_partitions (self , protocol , conn ):
590
+ "Get leader raises if no partitions is defined for a topic"
591
+
592
+ conn .recv .return_value = 'response' # anything but None
593
+
594
+ brokers = {}
595
+ brokers [0 ] = BrokerMetadata (0 , 'broker_1' , 4567 )
596
+ brokers [1 ] = BrokerMetadata (1 , 'broker_2' , 5678 )
597
+
598
+ topics = {'topic_no_partitions' : {}}
599
+ protocol .decode_metadata_response .return_value = (brokers , topics )
600
+
601
+ client = KafkaClient (hosts = ['broker_1:4567' ])
602
+
603
+ self .assertDictEqual ({}, client .topics_to_brokers )
604
+ self .assertRaises (
605
+ PartitionUnavailableError ,
606
+ client ._get_leader_for_partition ,
607
+ 'topic_no_partitions' , 0 )
608
+
609
+ @patch ('kafka.client.KafkaConnection' )
610
+ @patch ('kafka.client.KafkaProtocol' )
611
+ def test_get_leader_returns_none_when_noleader (self , protocol , conn ):
612
+ "Getting leader for partitions returns None when the partiion has no leader"
613
+
614
+ conn .recv .return_value = 'response' # anything but None
509
615
510
- client = KafkaClient (hosts = 'kafka01:9092,kafka02:9092' )
616
+ brokers = {}
617
+ brokers [0 ] = BrokerMetadata (0 , 'broker_1' , 4567 )
618
+ brokers [1 ] = BrokerMetadata (1 , 'broker_2' , 5678 )
619
+
620
+ topics = {}
621
+ topics ['topic_noleader' ] = {
622
+ 0 : PartitionMetadata ('topic_noleader' , 0 , - 1 , [], []),
623
+ 1 : PartitionMetadata ('topic_noleader' , 1 , - 1 , [], [])
624
+ }
625
+ protocol .decode_metadata_response .return_value = (brokers , topics )
626
+
627
+ client = KafkaClient (hosts = ['broker_1:4567' ])
628
+ self .assertDictEqual (
629
+ {
630
+ TopicAndPartition ('topic_noleader' , 0 ): None ,
631
+ TopicAndPartition ('topic_noleader' , 1 ): None
632
+ },
633
+ client .topics_to_brokers )
634
+ self .assertIsNone (client ._get_leader_for_partition ('topic_noleader' , 0 ))
635
+ self .assertIsNone (client ._get_leader_for_partition ('topic_noleader' , 1 ))
636
+
637
+ topics ['topic_noleader' ] = {
638
+ 0 : PartitionMetadata ('topic_noleader' , 0 , 0 , [0 , 1 ], [0 , 1 ]),
639
+ 1 : PartitionMetadata ('topic_noleader' , 1 , 1 , [1 , 0 ], [1 , 0 ])
640
+ }
641
+ protocol .decode_metadata_response .return_value = (brokers , topics )
642
+ self .assertEqual (brokers [0 ], client ._get_leader_for_partition ('topic_noleader' , 0 ))
643
+ self .assertEqual (brokers [1 ], client ._get_leader_for_partition ('topic_noleader' , 1 ))
644
+
645
+ @patch ('kafka.client.KafkaConnection' )
646
+ @patch ('kafka.client.KafkaProtocol' )
647
+ def test_send_produce_request_raises_when_noleader (self , protocol , conn ):
648
+ "Send producer request raises LeaderUnavailableError if leader is not available"
649
+
650
+ conn .recv .return_value = 'response' # anything but None
651
+
652
+ brokers = {}
653
+ brokers [0 ] = BrokerMetadata (0 , 'broker_1' , 4567 )
654
+ brokers [1 ] = BrokerMetadata (1 , 'broker_2' , 5678 )
655
+
656
+ topics = {}
657
+ topics ['topic_noleader' ] = {
658
+ 0 : PartitionMetadata ('topic_noleader' , 0 , - 1 , [], []),
659
+ 1 : PartitionMetadata ('topic_noleader' , 1 , - 1 , [], [])
660
+ }
661
+ protocol .decode_metadata_response .return_value = (brokers , topics )
511
662
512
- resp = client . _send_broker_unaware_request ( 1 , 'fake request' )
663
+ client = KafkaClient ( hosts = [ 'broker_1:4567' ] )
513
664
514
- self .assertEqual ('valid response' , resp )
515
- mocked_conns [('kafka02' , 9092 )].recv .assert_called_with (1 )
665
+ requests = [ProduceRequest (
666
+ "topic_noleader" , 0 ,
667
+ [create_message ("a" ), create_message ("b" )])]
516
668
669
+ self .assertRaises (
670
+ LeaderUnavailableError ,
671
+ client .send_produce_request , requests )
517
672
518
673
if __name__ == '__main__' :
519
674
unittest .main ()
0 commit comments