Skip to content

Commit 8f076df

Browse files
committed
Merge pull request #84 from nieksand/simpler_timeouts
Replace _send_upstream datetime logic with simpler time().
2 parents bec4dd3 + cab017a commit 8f076df

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

kafka/producer.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
from collections import defaultdict
2-
from datetime import datetime, timedelta
32
from itertools import cycle
43
from multiprocessing import Queue, Process
54
from Queue import Empty
65
import logging
76
import sys
7+
import time
88

99
from kafka.common import ProduceRequest
1010
from kafka.common import FailedPayloadsException
@@ -36,7 +36,7 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
3636
while not stop:
3737
timeout = batch_time
3838
count = batch_size
39-
send_at = datetime.now() + timedelta(seconds=timeout)
39+
send_at = time.time() + timeout
4040
msgset = defaultdict(list)
4141

4242
# Keep fetching till we gather enough messages or a
@@ -54,7 +54,7 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
5454

5555
# Adjust the timeout to match the remaining period
5656
count -= 1
57-
timeout = (send_at - datetime.now()).total_seconds()
57+
timeout = send_at - time.time()
5858
msgset[partition].append(msg)
5959

6060
# Send collected requests upstream

0 commit comments

Comments
 (0)