Skip to content

Commit 8b05ee8

Browse files
Add DefaultRecordBatch implementation aka V2 message format parser/builder. (#1185)
Added bytecode optimization for varint and append/read_msg functions. Mostly based on avoiding LOAD_GLOBAL calls.
1 parent 4213d53 commit 8b05ee8

18 files changed

+1696
-31
lines changed

benchmarks/record_batch_compose.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ def func(loops, magic):
5858
magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0)
5959
for _ in range(MESSAGES_PER_BATCH):
6060
key, value, timestamp = next(precomputed_samples)
61-
size = batch.append(timestamp=timestamp, key=key, value=value)
61+
size = batch.append(
62+
timestamp=timestamp, key=key, value=value)
6263
assert size
6364
batch.close()
6465
results.append(batch.buffer())
@@ -73,3 +74,4 @@ def func(loops, magic):
7374
runner = perf.Runner()
7475
runner.bench_time_func('batch_append_v0', func, 0)
7576
runner.bench_time_func('batch_append_v1', func, 1)
77+
runner.bench_time_func('batch_append_v2', func, 2)

benchmarks/record_batch_read.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ def prepare(magic):
3535
size = batch.append(
3636
random.randint(*TIMESTAMP_RANGE),
3737
random_bytes(KEY_SIZE),
38-
random_bytes(VALUE_SIZE))
38+
random_bytes(VALUE_SIZE),
39+
headers=[])
3940
assert size
4041
batch.close()
4142
samples.append(bytes(batch.buffer()))
@@ -78,3 +79,4 @@ def func(loops, magic):
7879
runner = perf.Runner()
7980
runner.bench_time_func('batch_read_v0', func, 0)
8081
runner.bench_time_func('batch_read_v1', func, 1)
82+
runner.bench_time_func('batch_read_v2', func, 2)

0 commit comments

Comments
 (0)