@@ -194,12 +194,23 @@ def test_complete_batch_producer_id_changed_no_retry(sender, accumulator, transa
194
194
195
195
def test_fail_batch (sender , accumulator , transaction_manager , mocker ):
196
196
sender ._transaction_manager = transaction_manager
197
- mocker .patch .object (TransactionManager , 'reset_producer_id' )
198
197
batch = producer_batch ()
199
198
mocker .patch .object (batch , 'done' )
200
199
assert sender ._transaction_manager .producer_id_and_epoch .producer_id == batch .producer_id
201
200
error = Exception ('error' )
202
201
sender ._fail_batch (batch , base_offset = 0 , timestamp_ms = None , exception = error , log_start_offset = None )
202
+ batch .done .assert_called_with (base_offset = 0 , timestamp_ms = None , exception = error , log_start_offset = None )
203
+
204
+
205
+ def test_out_of_order_sequence_number_reset_producer_id (sender , accumulator , transaction_manager , mocker ):
206
+ sender ._transaction_manager = transaction_manager
207
+ assert transaction_manager .transactional_id is None # this test is for idempotent producer only
208
+ mocker .patch .object (TransactionManager , 'reset_producer_id' )
209
+ batch = producer_batch ()
210
+ mocker .patch .object (batch , 'done' )
211
+ assert sender ._transaction_manager .producer_id_and_epoch .producer_id == batch .producer_id
212
+ error = Errors .OutOfOrderSequenceNumberError ()
213
+ sender ._fail_batch (batch , base_offset = 0 , timestamp_ms = None , exception = error , log_start_offset = None )
203
214
sender ._transaction_manager .reset_producer_id .assert_called_once ()
204
215
batch .done .assert_called_with (base_offset = 0 , timestamp_ms = None , exception = error , log_start_offset = None )
205
216
0 commit comments