 import time

 from kafka.client_async import KafkaClient
-from kafka.codec import gzip_encode
 from kafka.consumer.fetcher import (
     CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
 )
 from kafka.consumer.subscription_state import SubscriptionState
 from kafka.metrics import Metrics
 from kafka.protocol.fetch import FetchRequest, FetchResponse
-from kafka.protocol.message import Message
 from kafka.protocol.offset import OffsetResponse
-from kafka.protocol.types import Int64, Int32
 from kafka.structs import TopicPartition
 from kafka.future import Future
 from kafka.errors import (
     StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
     UnknownTopicOrPartitionError, OffsetOutOfRangeError
 )
+from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords


 @pytest.fixture
@@ -51,6 +49,16 @@ def fetcher(client, subscription_state, topic):
     return Fetcher(client, subscription_state, Metrics())


+def _build_record_batch(msgs, compression=0):
+    builder = MemoryRecordsBuilder(
+        magic=1, compression_type=0, batch_size=9999999)
+    for msg in msgs:
+        key, value, timestamp = msg
+        builder.append(key=key, value=value, timestamp=timestamp)
+    builder.close()
+    return builder.buffer()
+
+
 def test_send_fetches(fetcher, topic, mocker):
     fetch_requests = [
         FetchRequest[0](
@@ -321,12 +329,12 @@ def test_partition_records_offset():
 def test_fetched_records(fetcher, topic, mocker):
     fetcher.config['check_crcs'] = False
     tp = TopicPartition(topic, 0)
+
     msgs = []
     for i in range(10):
-        msg = Message(b'foo')
-        msgs.append((i, -1, msg))
+        msgs.append((None, b"foo", None))
     completed_fetch = CompletedFetch(
-        tp, 0, 0, [0, 100, msgs],
+        tp, 0, 0, [0, 100, _build_record_batch(msgs)],
         mocker.MagicMock()
     )
     fetcher._completed_fetches.append(completed_fetch)
@@ -401,11 +409,12 @@ def test__unpack_message_set(fetcher):
     fetcher.config['check_crcs'] = False
     tp = TopicPartition('foo', 0)
     messages = [
-        (0, None, Message(b'a')),
-        (1, None, Message(b'b')),
-        (2, None, Message(b'c'))
+        (None, b"a", None),
+        (None, b"b", None),
+        (None, b"c", None),
     ]
-    records = list(fetcher._unpack_message_set(tp, messages))
+    memory_records = MemoryRecords(_build_record_batch(messages))
+    records = list(fetcher._unpack_message_set(tp, memory_records))
     assert len(records) == 3
     assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
     assert records[0].value == b'a'
@@ -416,88 +425,14 @@ def test__unpack_message_set(fetcher):
     assert records[2].offset == 2


-def test__unpack_message_set_compressed_v0(fetcher):
-    fetcher.config['check_crcs'] = False
-    tp = TopicPartition('foo', 0)
-    messages = [
-        (0, None, Message(b'a')),
-        (1, None, Message(b'b')),
-        (2, None, Message(b'c')),
-    ]
-    message_bytes = []
-    for offset, _, m in messages:
-        encoded = m.encode()
-        message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
-    compressed_bytes = gzip_encode(b''.join(message_bytes))
-    compressed_base_offset = 0
-    compressed_msgs = [
-        (compressed_base_offset, None,
-         Message(compressed_bytes,
-                 magic=0,
-                 attributes=Message.CODEC_GZIP))
-    ]
-    records = list(fetcher._unpack_message_set(tp, compressed_msgs))
-    assert len(records) == 3
-    assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
-    assert records[0].value == b'a'
-    assert records[1].value == b'b'
-    assert records[2].value == b'c'
-    assert records[0].offset == 0
-    assert records[1].offset == 1
-    assert records[2].offset == 2
-
-
-def test__unpack_message_set_compressed_v1(fetcher):
-    fetcher.config['check_crcs'] = False
-    tp = TopicPartition('foo', 0)
-    messages = [
-        (0, None, Message(b'a')),
-        (1, None, Message(b'b')),
-        (2, None, Message(b'c')),
-    ]
-    message_bytes = []
-    for offset, _, m in messages:
-        encoded = m.encode()
-        message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
-    compressed_bytes = gzip_encode(b''.join(message_bytes))
-    compressed_base_offset = 10
-    compressed_msgs = [
-        (compressed_base_offset, None,
-         Message(compressed_bytes,
-                 magic=1,
-                 attributes=Message.CODEC_GZIP))
-    ]
-    records = list(fetcher._unpack_message_set(tp, compressed_msgs))
-    assert len(records) == 3
-    assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
-    assert records[0].value == b'a'
-    assert records[1].value == b'b'
-    assert records[2].value == b'c'
-    assert records[0].offset == 8
-    assert records[1].offset == 9
-    assert records[2].offset == 10
-
-
-def test__parse_record(fetcher):
-    tp = TopicPartition('foo', 0)
-    record = fetcher._parse_record(tp, 123, 456, Message(b'abc'))
-    assert record.topic == 'foo'
-    assert record.partition == 0
-    assert record.offset == 123
-    assert record.timestamp == 456
-    assert record.value == b'abc'
-    assert record.key is None
-
-
 def test__message_generator(fetcher, topic, mocker):
     fetcher.config['check_crcs'] = False
     tp = TopicPartition(topic, 0)
     msgs = []
     for i in range(10):
-        msg = Message(b'foo')
-        msgs.append((i, -1, msg))
+        msgs.append((None, b"foo", None))
     completed_fetch = CompletedFetch(
-        tp, 0, 0, [0, 100, msgs],
+        tp, 0, 0, [0, 100, _build_record_batch(msgs)],
         mocker.MagicMock()
     )
     fetcher._completed_fetches.append(completed_fetch)
@@ -513,10 +448,9 @@ def test__parse_fetched_data(fetcher, topic, mocker):
     tp = TopicPartition(topic, 0)
     msgs = []
     for i in range(10):
-        msg = Message(b'foo')
-        msgs.append((i, -1, msg))
+        msgs.append((None, b"foo", None))
     completed_fetch = CompletedFetch(
-        tp, 0, 0, [0, 100, msgs],
+        tp, 0, 0, [0, 100, _build_record_batch(msgs)],
         mocker.MagicMock()
     )
     partition_record = fetcher._parse_fetched_data(completed_fetch)
@@ -529,10 +463,9 @@ def test__parse_fetched_data__paused(fetcher, topic, mocker):
     tp = TopicPartition(topic, 0)
     msgs = []
     for i in range(10):
-        msg = Message(b'foo')
-        msgs.append((i, -1, msg))
+        msgs.append((None, b"foo", None))
     completed_fetch = CompletedFetch(
-        tp, 0, 0, [0, 100, msgs],
+        tp, 0, 0, [0, 100, _build_record_batch(msgs)],
         mocker.MagicMock()
     )
     fetcher._subscriptions.pause(tp)
@@ -545,10 +478,9 @@ def test__parse_fetched_data__stale_offset(fetcher, topic, mocker):
     tp = TopicPartition(topic, 0)
     msgs = []
     for i in range(10):
-        msg = Message(b'foo')
-        msgs.append((i, -1, msg))
+        msgs.append((None, b"foo", None))
     completed_fetch = CompletedFetch(
-        tp, 10, 0, [0, 100, _build_record_batch(msgs)],
+        tp, 10, 0, [0, 100, _build_record_batch(msgs)],
         mocker.MagicMock()
     )
     partition_record = fetcher._parse_fetched_data(completed_fetch)
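
For context, here is a minimal standalone sketch (not part of this diff) of how the output of the new _build_record_batch helper can be consumed, assuming the kafka-python record API that Fetcher._unpack_message_set relies on (MemoryRecords.has_next() / next_batch()); the name sample_batch is illustrative only:

from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder

# Build a magic v1 (legacy) record batch, mirroring the helper added above.
builder = MemoryRecordsBuilder(magic=1, compression_type=0, batch_size=9999999)
for _ in range(3):
    builder.append(key=None, value=b"foo", timestamp=None)
builder.close()
sample_batch = builder.buffer()

# Wrap the raw bytes the same way the updated tests do before handing them
# to the fetcher; offsets are assigned sequentially starting at 0.
records = MemoryRecords(sample_batch)
while records.has_next():
    batch = records.next_batch()
    for record in batch:
        print(record.offset, record.value)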