Description
Version & Environment
Redpanda version (use `rpk version`):
rpk version: v25.1.1
Git ref: c3547f4
Build date: 2025-04-04T23:45:58Z
OS/Arch: linux/arm64
Go version: go1.23.1
Redpanda Cluster
node-0 v25.1.1 - c3547f4
Operating System: macOS Sequoia 15.3.2 (24D81)
github.com/twmb/[email protected]
github.com/twmb/franz-go/pkg/[email protected]
github.com/twmb/franz-go/pkg/[email protected]
colima version 0.8.1
Docker info:
Client: Docker Engine - Community
 Version: 27.5.1
 Context: default
 Debug Mode: false
 Plugins:
  buildx: Docker Buildx (Docker Inc.)
   Version: v0.20.1
   Path: /Users/mihaitodor/.docker/cli-plugins/docker-buildx

Server:
 Containers: 1
  Running: 1
  Paused: 0
  Stopped: 0
 Images: 30
 Server Version: 27.3.1
 Storage Driver: overlay2
  Backing Filesystem: extfs
  Supports d_type: true
  Using metacopy: false
  Native Overlay Diff: true
  userxattr: false
 Logging Driver: json-file
 Cgroup Driver: cgroupfs
 Cgroup Version: 2
 Plugins:
  Volume: local
  Network: bridge host ipvlan macvlan null overlay
  Log: awslogs fluentd gcplogs gelf journald json-file local splunk syslog
 Swarm: inactive
 Runtimes: io.containerd.runc.v2 runc
 Default Runtime: runc
 Init Binary: docker-init
 containerd version: 7f7fdf5fed64eb6a7caf99b3e12efcf9d60e311c
 runc version: v1.1.14-0-g2c9f560
 init version: de40ad0
 Security Options:
  apparmor
  seccomp
   Profile: builtin
  cgroupns
 Kernel Version: 6.8.0-47-generic
 Operating System: Ubuntu 24.04.1 LTS
 OSType: linux
 Architecture: aarch64
 CPUs: 8
 Total Memory: 31.28GiB
 Name: colima
 ID: 466cd49b-8c3a-431c-ba8e-6e370a916158
 Docker Root Dir: /var/lib/docker
 Debug Mode: false
 Experimental: false
 Insecure Registries:
  127.0.0.0/8
 Live Restore Enabled: false

WARNING: bridge-nf-call-iptables is disabled
WARNING: bridge-nf-call-ip6tables is disabled
What went wrong?
Please start a Redpanda container like so:
$ docker run --rm -it --name=source -p 8081:8081 -p 9092:9092 -p 9644:9644 redpandadata/redpanda redpanda start --node-id 0 --mode dev-container --set "rpk.additional_start_flags=[--reactor-backend=epoll]" --set redpanda.auto_create_topics_enabled=true --kafka-addr 0.0.0.0:9092 --advertise-kafka-addr host.docker.internal:9092 --schema-registry-addr 0.0.0.0:8081
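Optionally, before running the repro, the broker can be checked for reachability from the host (assuming `rpk` is installed locally):

$ rpk cluster info -X brokers=localhost:9092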
After that, please run the following code:
package main

import (
	"context"
	"fmt"
	"log"
	"strconv"
	"sync"
	"time"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	brokerAddr := "localhost:9092"
	client, err := kgo.NewClient(
		kgo.SeedBrokers([]string{brokerAddr}...),
		kgo.ProducerBatchCompression(kgo.SnappyCompression()),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	adm := kadm.NewClient(client)

	topic := "foobar"
	_, err = adm.CreateTopic(context.Background(), 1, -1, nil, topic)
	if err != nil {
		panic(err)
	}

	// Produce `batches` batches of `records` records each, with strictly
	// ascending millisecond timestamps (1, 2, 3, ...).
	batches := 5
	records := 10
	for i := range batches {
		var wg sync.WaitGroup
		data := make([]*kgo.Record, 0, records)
		for j := range records {
			data = append(data, &kgo.Record{
				Topic:     topic,
				Value:     []byte(`test`),
				Key:       []byte(strconv.Itoa(i*records + j + 1)),
				Timestamp: time.UnixMilli(int64(i*records + j + 1)),
			})
		}

		results := make(kgo.ProduceResults, 0, records)
		wg.Add(records)
		for j := range records {
			promise := func(r *kgo.Record, err error) {
				defer wg.Done()
				results = append(results, kgo.ProduceResult{Record: r, Err: err})
			}
			client.Produce(context.Background(), data[j], promise)
			// Sleep between batches...
			// time.Sleep(250 * time.Millisecond)
		}
		wg.Wait()

		err := results.FirstErr()
		if err != nil {
			log.Fatalf("failed batch %d: %s", i, err)
		}
	}

	// For each produced timestamp, ask the broker for the first offset whose
	// record timestamp is at or after that millisecond.
	for ts := range records * batches {
		offsets, err := adm.ListOffsetsAfterMilli(context.Background(), int64(ts+1), topic)
		if err != nil {
			log.Fatalf("failed to list offsets: %s", err)
		}
		if err := offsets.Error(); err != nil {
			log.Fatalf("listed offsets error: %s", err)
		}
		offset, ok := offsets.Lookup(topic, 0)
		if !ok {
			log.Fatal("failed to find offset")
		}
		fmt.Println("Offset: ", offset.Offset)
	}
}
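Assuming the snippet is saved as `main.go` in its own Go module (the module name below is arbitrary), it can be run with:

$ go mod init repro
$ go mod tidy
$ go run main.go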
You should see the following output:
Offset: 0
Offset: 0
Offset: 0
Offset: 0
Offset: 0
Offset: 0
Offset: 0
Offset: 0
Offset: 0
Offset: 0
Offset: 10
Offset: 10
Offset: 10
Offset: 10
Offset: 10
Offset: 10
Offset: 10
Offset: 10
Offset: 10
Offset: 10
Offset: 20
Offset: 20
Offset: 20
Offset: 20
Offset: 20
Offset: 20
Offset: 20
Offset: 20
Offset: 20
Offset: 20
Offset: 30
Offset: 30
Offset: 30
Offset: 30
Offset: 30
Offset: 30
Offset: 30
Offset: 30
Offset: 30
Offset: 30
Offset: 40
Offset: 40
Offset: 40
Offset: 40
Offset: 40
Offset: 40
Offset: 40
Offset: 40
Offset: 40
Offset: 40
`rpk topic consume foobar -o ':end' -X brokers=localhost:9092` output:
{
"topic": "foobar",
"key": "1",
"value": "test",
"timestamp": 1,
"partition": 0,
"offset": 0
}
{
"topic": "foobar",
"key": "2",
"value": "test",
"timestamp": 2,
"partition": 0,
"offset": 1
}
{
"topic": "foobar",
"key": "3",
"value": "test",
"timestamp": 3,
"partition": 0,
"offset": 2
}
{
"topic": "foobar",
"key": "4",
"value": "test",
"timestamp": 4,
"partition": 0,
"offset": 3
}
{
"topic": "foobar",
"key": "5",
"value": "test",
"timestamp": 5,
"partition": 0,
"offset": 4
}
{
"topic": "foobar",
"key": "6",
"value": "test",
"timestamp": 6,
"partition": 0,
"offset": 5
}
{
"topic": "foobar",
"key": "7",
"value": "test",
"timestamp": 7,
"partition": 0,
"offset": 6
}
{
"topic": "foobar",
"key": "8",
"value": "test",
"timestamp": 8,
"partition": 0,
"offset": 7
}
{
"topic": "foobar",
"key": "9",
"value": "test",
"timestamp": 9,
"partition": 0,
"offset": 8
}
{
"topic": "foobar",
"key": "10",
"value": "test",
"timestamp": 10,
"partition": 0,
"offset": 9
}
{
"topic": "foobar",
"key": "11",
"value": "test",
"timestamp": 11,
"partition": 0,
"offset": 10
}
{
"topic": "foobar",
"key": "12",
"value": "test",
"timestamp": 12,
"partition": 0,
"offset": 11
}
{
"topic": "foobar",
"key": "13",
"value": "test",
"timestamp": 13,
"partition": 0,
"offset": 12
}
{
"topic": "foobar",
"key": "14",
"value": "test",
"timestamp": 14,
"partition": 0,
"offset": 13
}
{
"topic": "foobar",
"key": "15",
"value": "test",
"timestamp": 15,
"partition": 0,
"offset": 14
}
{
"topic": "foobar",
"key": "16",
"value": "test",
"timestamp": 16,
"partition": 0,
"offset": 15
}
{
"topic": "foobar",
"key": "17",
"value": "test",
"timestamp": 17,
"partition": 0,
"offset": 16
}
{
"topic": "foobar",
"key": "18",
"value": "test",
"timestamp": 18,
"partition": 0,
"offset": 17
}
{
"topic": "foobar",
"key": "19",
"value": "test",
"timestamp": 19,
"partition": 0,
"offset": 18
}
{
"topic": "foobar",
"key": "20",
"value": "test",
"timestamp": 20,
"partition": 0,
"offset": 19
}
{
"topic": "foobar",
"key": "21",
"value": "test",
"timestamp": 21,
"partition": 0,
"offset": 20
}
{
"topic": "foobar",
"key": "22",
"value": "test",
"timestamp": 22,
"partition": 0,
"offset": 21
}
{
"topic": "foobar",
"key": "23",
"value": "test",
"timestamp": 23,
"partition": 0,
"offset": 22
}
{
"topic": "foobar",
"key": "24",
"value": "test",
"timestamp": 24,
"partition": 0,
"offset": 23
}
{
"topic": "foobar",
"key": "25",
"value": "test",
"timestamp": 25,
"partition": 0,
"offset": 24
}
{
"topic": "foobar",
"key": "26",
"value": "test",
"timestamp": 26,
"partition": 0,
"offset": 25
}
{
"topic": "foobar",
"key": "27",
"value": "test",
"timestamp": 27,
"partition": 0,
"offset": 26
}
{
"topic": "foobar",
"key": "28",
"value": "test",
"timestamp": 28,
"partition": 0,
"offset": 27
}
{
"topic": "foobar",
"key": "29",
"value": "test",
"timestamp": 29,
"partition": 0,
"offset": 28
}
{
"topic": "foobar",
"key": "30",
"value": "test",
"timestamp": 30,
"partition": 0,
"offset": 29
}
{
"topic": "foobar",
"key": "31",
"value": "test",
"timestamp": 31,
"partition": 0,
"offset": 30
}
{
"topic": "foobar",
"key": "32",
"value": "test",
"timestamp": 32,
"partition": 0,
"offset": 31
}
{
"topic": "foobar",
"key": "33",
"value": "test",
"timestamp": 33,
"partition": 0,
"offset": 32
}
{
"topic": "foobar",
"key": "34",
"value": "test",
"timestamp": 34,
"partition": 0,
"offset": 33
}
{
"topic": "foobar",
"key": "35",
"value": "test",
"timestamp": 35,
"partition": 0,
"offset": 34
}
{
"topic": "foobar",
"key": "36",
"value": "test",
"timestamp": 36,
"partition": 0,
"offset": 35
}
{
"topic": "foobar",
"key": "37",
"value": "test",
"timestamp": 37,
"partition": 0,
"offset": 36
}
{
"topic": "foobar",
"key": "38",
"value": "test",
"timestamp": 38,
"partition": 0,
"offset": 37
}
{
"topic": "foobar",
"key": "39",
"value": "test",
"timestamp": 39,
"partition": 0,
"offset": 38
}
{
"topic": "foobar",
"key": "40",
"value": "test",
"timestamp": 40,
"partition": 0,
"offset": 39
}
{
"topic": "foobar",
"key": "41",
"value": "test",
"timestamp": 41,
"partition": 0,
"offset": 40
}
{
"topic": "foobar",
"key": "42",
"value": "test",
"timestamp": 42,
"partition": 0,
"offset": 41
}
{
"topic": "foobar",
"key": "43",
"value": "test",
"timestamp": 43,
"partition": 0,
"offset": 42
}
{
"topic": "foobar",
"key": "44",
"value": "test",
"timestamp": 44,
"partition": 0,
"offset": 43
}
{
"topic": "foobar",
"key": "45",
"value": "test",
"timestamp": 45,
"partition": 0,
"offset": 44
}
{
"topic": "foobar",
"key": "46",
"value": "test",
"timestamp": 46,
"partition": 0,
"offset": 45
}
{
"topic": "foobar",
"key": "47",
"value": "test",
"timestamp": 47,
"partition": 0,
"offset": 46
}
{
"topic": "foobar",
"key": "48",
"value": "test",
"timestamp": 48,
"partition": 0,
"offset": 47
}
{
"topic": "foobar",
"key": "49",
"value": "test",
"timestamp": 49,
"partition": 0,
"offset": 48
}
{
"topic": "foobar",
"key": "50",
"value": "test",
"timestamp": 50,
"partition": 0,
"offset": 49
}
Note how the record timestamps are in ascending order.
What should have happened instead?
Try uncommenting the `time.Sleep(250 * time.Millisecond)` call in the produce loop so there is a pause between each emitted message. You should now get the following output (please recreate the container first):
Offset: 0
Offset: 1
Offset: 2
Offset: 3
Offset: 4
Offset: 5
Offset: 6
Offset: 7
Offset: 8
Offset: 9
Offset: 10
Offset: 11
Offset: 12
Offset: 13
Offset: 14
Offset: 15
Offset: 16
Offset: 17
Offset: 18
Offset: 19
Offset: 20
Offset: 21
Offset: 22
Offset: 23
Offset: 24
Offset: 25
Offset: 26
Offset: 27
Offset: 28
Offset: 29
Offset: 30
Offset: 31
Offset: 32
Offset: 33
Offset: 34
Offset: 35
Offset: 36
Offset: 37
Offset: 38
Offset: 39
Offset: 40
Offset: 41
Offset: 42
Offset: 43
Offset: 44
Offset: 45
Offset: 46
Offset: 47
Offset: 48
Offset: 49
From the `kgo.Client.Produce` docs:
Records are produced in order per partition if the record is produced successfully.
This guarantee seems to break when the records are produced in parallel, since `ListOffsetsAfterMilli` returns offsets pointing at records whose timestamps are older than the millisecond supplied as input.
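For comparison, a serialized variant of the inner produce loop is sketched below (not part of the original repro; it reuses the repro's `i`, `records`, `topic`, and `client`). Each `kgo.Client.ProduceSync` call blocks until the record is flushed, so every record presumably ends up in its own batch, much like the sleep-based run:

// Hypothetical serialized variant of the inner loop: one record in flight at a time.
for j := range records {
	rec := &kgo.Record{
		Topic:     topic,
		Value:     []byte(`test`),
		Key:       []byte(strconv.Itoa(i*records + j + 1)),
		Timestamp: time.UnixMilli(int64(i*records + j + 1)),
	}
	// ProduceSync waits for the record to be produced before returning.
	if err := client.ProduceSync(context.Background(), rec).FirstErr(); err != nil {
		log.Fatalf("failed to produce record %d: %s", i*records+j+1, err)
	}
}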
When using a Kafka container, even without the sleep, `ListOffsetsAfterMilli` returns the expected offsets.
Docker Compose for running Kafka:
services:
  source:
    image: bitnami/kafka
    environment:
      KAFKA_CFG_NODE_ID: 0
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_DOCKER:PLAINTEXT
      KAFKA_CFG_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_DOCKER://0.0.0.0:19092,CONTROLLER://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_DOCKER://source:19092
      KAFKA_CFG_AUTHORIZER_CLASS_NAME: "org.apache.kafka.metadata.authorizer.StandardAuthorizer"
      KAFKA_CFG_SUPER_USERS: User:redpanda;User:ANONYMOUS
    ports:
      - 9092:9092
      - 19092:19092
    healthcheck:
      test: [ "CMD", "kafka-topics.sh", "--bootstrap-server=localhost:9092", "--list" ]
      start_period: 5s
      interval: 3s
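Assuming the Compose file above is saved as `docker-compose.yml` and the repro program as `main.go`, the same test can be run against Kafka with:

$ docker compose up -d
$ go run main.go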
How to reproduce the issue?
See above.
Additional information
I tested that the records are compressed:
$ rpk topic consume foobar --format '%d - %a{compression}\n' -o ':end' -X brokers=localhost:9092
1 - snappy
2 - snappy
3 - snappy
4 - snappy
5 - snappy
...
If I change `kgo.ProducerBatchCompression(kgo.SnappyCompression())` to `kgo.ProducerBatchCompression(kgo.NoCompression())`, the issue goes away.
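For clarity, that change amounts to constructing the client as follows, with everything else in the repro unchanged:

client, err := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"),
	// With producer batch compression disabled, ListOffsetsAfterMilli returns the expected offsets.
	kgo.ProducerBatchCompression(kgo.NoCompression()),
)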
JIRA Link: CORE-9778