Commit b2fe8a1

committed
Added micro benchmarks specific to the record/ folder to measure the exact speed change after updating to the V2 message format
1 parent b6629a0 commit b2fe8a1

File tree

benchmarks/README
benchmarks/record_batch_compose.py
benchmarks/record_batch_read.py

3 files changed: +155 -0 lines changed

benchmarks/README

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
The `record_batch_*` benchmarks in this section are written using the
``perf`` library, created by Victor Stinner. For more information on how to get
reliable results from test runs, please consult
http://perf.readthedocs.io/en/latest/run_benchmark.html.
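
For example, one of these benchmarks could be run and its results inspected
like this (a sketch following the perf documentation linked above; the -o flag
and the `show` command are perf CLI features, not part of this commit):

    python3 record_batch_compose.py -o compose.json
    python3 -m perf show compose.json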

benchmarks/record_batch_compose.py

Lines changed: 73 additions & 0 deletions
@@ -0,0 +1,73 @@
#!/usr/bin/env python3
from __future__ import print_function
import hashlib
import itertools
import os
import random

import perf

from kafka.record.memory_records import MemoryRecordsBuilder


DEFAULT_BATCH_SIZE = 1600 * 1024
KEY_SIZE = 6
VALUE_SIZE = 60
TIMESTAMP_RANGE = [1505824130000, 1505824140000]

# With the values above a v1 record is 100 bytes, so 100 messages take
# 10_000 bytes per batch
MESSAGES_PER_BATCH = 100


def random_bytes(length):
    buffer = bytearray(length)
    for i in range(length):
        buffer[i] = random.randint(0, 255)
    return bytes(buffer)


def prepare():
    # Precompute a pool of random (key, value, timestamp) samples and cycle
    # over it, so sample generation does not dominate the timed loop.
    return iter(itertools.cycle([
        (random_bytes(KEY_SIZE),
         random_bytes(VALUE_SIZE),
         random.randint(*TIMESTAMP_RANGE))
        for _ in range(int(MESSAGES_PER_BATCH * 1.94))
    ]))


def finalize(results):
    # Consume the results so PyPy does execute the main code properly,
    # without optimizing it away.
    hash_val = hashlib.md5()
    for buf in results:
        hash_val.update(buf)
    with open(os.devnull, "w") as devnull:
        print(hash_val.hexdigest(), file=devnull)


def func(loops, magic):
    # The JIT can optimize out the whole function if the result is the same
    # each time, so we need some randomized input data.
    precomputed_samples = prepare()
    results = []

    # Main benchmark code.
    t0 = perf.perf_counter()
    for _ in range(loops):
        batch = MemoryRecordsBuilder(
            magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0)
        for _ in range(MESSAGES_PER_BATCH):
            key, value, timestamp = next(precomputed_samples)
            size = batch.append(timestamp=timestamp, key=key, value=value)
            assert size
        batch.close()
        results.append(batch.buffer())

    res = perf.perf_counter() - t0

    finalize(results)

    return res


runner = perf.Runner()
runner.bench_time_func('batch_append_v0', func, 0)
runner.bench_time_func('batch_append_v1', func, 1)

benchmarks/record_batch_read.py

Lines changed: 78 additions & 0 deletions
@@ -0,0 +1,78 @@
#!/usr/bin/env python
from __future__ import print_function
import hashlib
import itertools
import os
import random

import perf

from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder


DEFAULT_BATCH_SIZE = 1600 * 1024
KEY_SIZE = 6
VALUE_SIZE = 60
TIMESTAMP_RANGE = [1505824130000, 1505824140000]

BATCH_SAMPLES = 5
MESSAGES_PER_BATCH = 100


def random_bytes(length):
    buffer = bytearray(length)
    for i in range(length):
        buffer[i] = random.randint(0, 255)
    return bytes(buffer)


def prepare(magic):
    # Build a pool of encoded batches up front, so the timed loop measures
    # reading only, not batch construction.
    samples = []
    for _ in range(BATCH_SAMPLES):
        batch = MemoryRecordsBuilder(
            magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0)
        for _ in range(MESSAGES_PER_BATCH):
            size = batch.append(
                random.randint(*TIMESTAMP_RANGE),
                random_bytes(KEY_SIZE),
                random_bytes(VALUE_SIZE))
            assert size
        batch.close()
        samples.append(bytes(batch.buffer()))

    return iter(itertools.cycle(samples))


def finalize(results):
    # Consume the results so PyPy does execute the code above properly,
    # without optimizing it away.
    hash_val = hashlib.md5()
    for buf in results:
        hash_val.update(buf)
    with open(os.devnull, "w") as devnull:
        print(hash_val.hexdigest(), file=devnull)


def func(loops, magic):
    # The JIT can optimize out the whole function if the result is the same
    # each time, so we need some randomized input data.
    precomputed_samples = prepare(magic)
    results = []

    # Main benchmark code.
    batch_data = next(precomputed_samples)
    t0 = perf.perf_counter()
    for _ in range(loops):
        records = MemoryRecords(batch_data)
        while records.has_next():
            batch = records.next_batch()
            batch.validate_crc()
            for record in batch:
                results.append(record.value)

    res = perf.perf_counter() - t0
    finalize(results)

    return res


runner = perf.Runner()
runner.bench_time_func('batch_read_v0', func, 0)
runner.bench_time_func('batch_read_v1', func, 1)
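
To measure the exact speed change the commit message refers to, one possible
workflow (an assumption, not something this commit automates) is to run the
same benchmark on the revisions before and after the V2 message format update
and diff the stored results with perf's compare_to command:

    python3 record_batch_read.py -o before.json   # on the old revision
    python3 record_batch_read.py -o after.json    # after the update
    python3 -m perf compare_to before.json after.json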
