
Redpanda ListOffsets can return incorrect results when producing records using franz-go #25769

Open
@mihaitodor

Version & Environment

Redpanda version: (use rpk version):

rpk version: v25.1.1
Git ref: c3547f4
Build date: 2025-04-04T23:45:58Z
OS/Arch: linux/arm64
Go version: go1.23.1

Redpanda Cluster
node-0 v25.1.1 - c3547f4

Operating System: macOS Sequoia 15.3.2 (24D81)

github.com/twmb/[email protected]
github.com/twmb/franz-go/pkg/[email protected]
github.com/twmb/franz-go/pkg/[email protected]

colima version 0.8.1

Docker info:
Client: Docker Engine - Community
 Version:    27.5.1
 Context:    default
 Debug Mode: false
 Plugins:
  buildx: Docker Buildx (Docker Inc.)
    Version:  v0.20.1
    Path:     /Users/mihaitodor/.docker/cli-plugins/docker-buildx

Server:
 Containers: 1
  Running: 1
  Paused: 0
  Stopped: 0
 Images: 30
 Server Version: 27.3.1
 Storage Driver: overlay2
  Backing Filesystem: extfs
  Supports d_type: true
  Using metacopy: false
  Native Overlay Diff: true
  userxattr: false
 Logging Driver: json-file
 Cgroup Driver: cgroupfs
 Cgroup Version: 2
 Plugins:
  Volume: local
  Network: bridge host ipvlan macvlan null overlay
  Log: awslogs fluentd gcplogs gelf journald json-file local splunk syslog
 Swarm: inactive
 Runtimes: io.containerd.runc.v2 runc
 Default Runtime: runc
 Init Binary: docker-init
 containerd version: 7f7fdf5fed64eb6a7caf99b3e12efcf9d60e311c
 runc version: v1.1.14-0-g2c9f560
 init version: de40ad0
 Security Options:
  apparmor
  seccomp
   Profile: builtin
  cgroupns
 Kernel Version: 6.8.0-47-generic
 Operating System: Ubuntu 24.04.1 LTS
 OSType: linux
 Architecture: aarch64
 CPUs: 8
 Total Memory: 31.28GiB
 Name: colima
 ID: 466cd49b-8c3a-431c-ba8e-6e370a916158
 Docker Root Dir: /var/lib/docker
 Debug Mode: false
 Experimental: false
 Insecure Registries:
  127.0.0.0/8
 Live Restore Enabled: false

WARNING: bridge-nf-call-iptables is disabled
WARNING: bridge-nf-call-ip6tables is disabled

What went wrong?

Please start a Redpanda container like so:

$ docker run --rm -it --name=source -p 8081:8081 -p 9092:9092 -p 9644:9644 redpandadata/redpanda redpanda start --node-id 0 --mode dev-container --set "rpk.additional_start_flags=[--reactor-backend=epoll]" --set redpanda.auto_create_topics_enabled=true --kafka-addr 0.0.0.0:9092 --advertise-kafka-addr host.docker.internal:9092 --schema-registry-addr 0.0.0.0:8081

After that, please run the following code:

package main

import (
	"context"
	"fmt"
	"log"
	"strconv"
	"sync"
	"time"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	brokerAddr := "localhost:9092"
	client, err := kgo.NewClient(
		kgo.SeedBrokers(brokerAddr),
		kgo.ProducerBatchCompression(kgo.SnappyCompression()),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	adm := kadm.NewClient(client)

	topic := "foobar"

	_, err = adm.CreateTopic(context.Background(), 1, -1, nil, topic)
	if err != nil {
		panic(err)
	}

	batches := 5
	records := 10
	for i := range batches {
		var wg sync.WaitGroup

		data := make([]*kgo.Record, 0, records)
		for j := range records {
			data = append(data, &kgo.Record{
				Topic:     topic,
				Value:     []byte(`test`),
				Key:       []byte(strconv.Itoa(i*records + j + 1)),
				Timestamp: time.UnixMilli(int64(i*records + j + 1)),
			})
		}

		results := make(kgo.ProduceResults, 0, records)
		wg.Add(records)
		for j := range records {
			promise := func(r *kgo.Record, err error) {
				defer wg.Done()
				results = append(results, kgo.ProduceResult{Record: r, Err: err})
			}
			client.Produce(context.Background(), data[j], promise)

			// Sleep between records...
			// time.Sleep(250 * time.Millisecond)
		}
		wg.Wait()

		err := results.FirstErr()
		if err != nil {
			log.Fatalf("failed batch %d: %s", i, err)
		}
	}

	for ts := range records * batches {
		offsets, err := adm.ListOffsetsAfterMilli(context.Background(), int64(ts+1), topic)
		if err != nil {
			log.Fatalf("failed to list offsets: %s", err)
		}

		if err := offsets.Error(); err != nil {
			log.Fatalf("listed offsets error: %s", err)
		}

		offset, ok := offsets.Lookup(topic, 0)
		if !ok {
			log.Fatal("failed to find offset")
		}

		fmt.Println("Offset: ", offset.Offset)
	}
}

You should see the following output:

Offset:  0
Offset:  0
Offset:  0
Offset:  0
Offset:  0
Offset:  0
Offset:  0
Offset:  0
Offset:  0
Offset:  0
Offset:  10
Offset:  10
Offset:  10
Offset:  10
Offset:  10
Offset:  10
Offset:  10
Offset:  10
Offset:  10
Offset:  10
Offset:  20
Offset:  20
Offset:  20
Offset:  20
Offset:  20
Offset:  20
Offset:  20
Offset:  20
Offset:  20
Offset:  20
Offset:  30
Offset:  30
Offset:  30
Offset:  30
Offset:  30
Offset:  30
Offset:  30
Offset:  30
Offset:  30
Offset:  30
Offset:  40
Offset:  40
Offset:  40
Offset:  40
Offset:  40
Offset:  40
Offset:  40
Offset:  40
Offset:  40
Offset:  40
`rpk topic consume foobar -o ':end' -X brokers=localhost:9092` output:
{
  "topic": "foobar",
  "key": "1",
  "value": "test",
  "timestamp": 1,
  "partition": 0,
  "offset": 0
}
{
  "topic": "foobar",
  "key": "2",
  "value": "test",
  "timestamp": 2,
  "partition": 0,
  "offset": 1
}
{
  "topic": "foobar",
  "key": "3",
  "value": "test",
  "timestamp": 3,
  "partition": 0,
  "offset": 2
}
{
  "topic": "foobar",
  "key": "4",
  "value": "test",
  "timestamp": 4,
  "partition": 0,
  "offset": 3
}
{
  "topic": "foobar",
  "key": "5",
  "value": "test",
  "timestamp": 5,
  "partition": 0,
  "offset": 4
}
{
  "topic": "foobar",
  "key": "6",
  "value": "test",
  "timestamp": 6,
  "partition": 0,
  "offset": 5
}
{
  "topic": "foobar",
  "key": "7",
  "value": "test",
  "timestamp": 7,
  "partition": 0,
  "offset": 6
}
{
  "topic": "foobar",
  "key": "8",
  "value": "test",
  "timestamp": 8,
  "partition": 0,
  "offset": 7
}
{
  "topic": "foobar",
  "key": "9",
  "value": "test",
  "timestamp": 9,
  "partition": 0,
  "offset": 8
}
{
  "topic": "foobar",
  "key": "10",
  "value": "test",
  "timestamp": 10,
  "partition": 0,
  "offset": 9
}
{
  "topic": "foobar",
  "key": "11",
  "value": "test",
  "timestamp": 11,
  "partition": 0,
  "offset": 10
}
{
  "topic": "foobar",
  "key": "12",
  "value": "test",
  "timestamp": 12,
  "partition": 0,
  "offset": 11
}
{
  "topic": "foobar",
  "key": "13",
  "value": "test",
  "timestamp": 13,
  "partition": 0,
  "offset": 12
}
{
  "topic": "foobar",
  "key": "14",
  "value": "test",
  "timestamp": 14,
  "partition": 0,
  "offset": 13
}
{
  "topic": "foobar",
  "key": "15",
  "value": "test",
  "timestamp": 15,
  "partition": 0,
  "offset": 14
}
{
  "topic": "foobar",
  "key": "16",
  "value": "test",
  "timestamp": 16,
  "partition": 0,
  "offset": 15
}
{
  "topic": "foobar",
  "key": "17",
  "value": "test",
  "timestamp": 17,
  "partition": 0,
  "offset": 16
}
{
  "topic": "foobar",
  "key": "18",
  "value": "test",
  "timestamp": 18,
  "partition": 0,
  "offset": 17
}
{
  "topic": "foobar",
  "key": "19",
  "value": "test",
  "timestamp": 19,
  "partition": 0,
  "offset": 18
}
{
  "topic": "foobar",
  "key": "20",
  "value": "test",
  "timestamp": 20,
  "partition": 0,
  "offset": 19
}
{
  "topic": "foobar",
  "key": "21",
  "value": "test",
  "timestamp": 21,
  "partition": 0,
  "offset": 20
}
{
  "topic": "foobar",
  "key": "22",
  "value": "test",
  "timestamp": 22,
  "partition": 0,
  "offset": 21
}
{
  "topic": "foobar",
  "key": "23",
  "value": "test",
  "timestamp": 23,
  "partition": 0,
  "offset": 22
}
{
  "topic": "foobar",
  "key": "24",
  "value": "test",
  "timestamp": 24,
  "partition": 0,
  "offset": 23
}
{
  "topic": "foobar",
  "key": "25",
  "value": "test",
  "timestamp": 25,
  "partition": 0,
  "offset": 24
}
{
  "topic": "foobar",
  "key": "26",
  "value": "test",
  "timestamp": 26,
  "partition": 0,
  "offset": 25
}
{
  "topic": "foobar",
  "key": "27",
  "value": "test",
  "timestamp": 27,
  "partition": 0,
  "offset": 26
}
{
  "topic": "foobar",
  "key": "28",
  "value": "test",
  "timestamp": 28,
  "partition": 0,
  "offset": 27
}
{
  "topic": "foobar",
  "key": "29",
  "value": "test",
  "timestamp": 29,
  "partition": 0,
  "offset": 28
}
{
  "topic": "foobar",
  "key": "30",
  "value": "test",
  "timestamp": 30,
  "partition": 0,
  "offset": 29
}
{
  "topic": "foobar",
  "key": "31",
  "value": "test",
  "timestamp": 31,
  "partition": 0,
  "offset": 30
}
{
  "topic": "foobar",
  "key": "32",
  "value": "test",
  "timestamp": 32,
  "partition": 0,
  "offset": 31
}
{
  "topic": "foobar",
  "key": "33",
  "value": "test",
  "timestamp": 33,
  "partition": 0,
  "offset": 32
}
{
  "topic": "foobar",
  "key": "34",
  "value": "test",
  "timestamp": 34,
  "partition": 0,
  "offset": 33
}
{
  "topic": "foobar",
  "key": "35",
  "value": "test",
  "timestamp": 35,
  "partition": 0,
  "offset": 34
}
{
  "topic": "foobar",
  "key": "36",
  "value": "test",
  "timestamp": 36,
  "partition": 0,
  "offset": 35
}
{
  "topic": "foobar",
  "key": "37",
  "value": "test",
  "timestamp": 37,
  "partition": 0,
  "offset": 36
}
{
  "topic": "foobar",
  "key": "38",
  "value": "test",
  "timestamp": 38,
  "partition": 0,
  "offset": 37
}
{
  "topic": "foobar",
  "key": "39",
  "value": "test",
  "timestamp": 39,
  "partition": 0,
  "offset": 38
}
{
  "topic": "foobar",
  "key": "40",
  "value": "test",
  "timestamp": 40,
  "partition": 0,
  "offset": 39
}
{
  "topic": "foobar",
  "key": "41",
  "value": "test",
  "timestamp": 41,
  "partition": 0,
  "offset": 40
}
{
  "topic": "foobar",
  "key": "42",
  "value": "test",
  "timestamp": 42,
  "partition": 0,
  "offset": 41
}
{
  "topic": "foobar",
  "key": "43",
  "value": "test",
  "timestamp": 43,
  "partition": 0,
  "offset": 42
}
{
  "topic": "foobar",
  "key": "44",
  "value": "test",
  "timestamp": 44,
  "partition": 0,
  "offset": 43
}
{
  "topic": "foobar",
  "key": "45",
  "value": "test",
  "timestamp": 45,
  "partition": 0,
  "offset": 44
}
{
  "topic": "foobar",
  "key": "46",
  "value": "test",
  "timestamp": 46,
  "partition": 0,
  "offset": 45
}
{
  "topic": "foobar",
  "key": "47",
  "value": "test",
  "timestamp": 47,
  "partition": 0,
  "offset": 46
}
{
  "topic": "foobar",
  "key": "48",
  "value": "test",
  "timestamp": 48,
  "partition": 0,
  "offset": 47
}
{
  "topic": "foobar",
  "key": "49",
  "value": "test",
  "timestamp": 49,
  "partition": 0,
  "offset": 48
}
{
  "topic": "foobar",
  "key": "50",
  "value": "test",
  "timestamp": 50,
  "partition": 0,
  "offset": 49
}

Note how the record timestamps are in ascending order.

What should have happened instead?

Try uncommenting the time.Sleep(250 * time.Millisecond) call in the produce loop so there is a pause between each emitted record. You should now get the following output (please recreate the container first):

Offset:  0
Offset:  1
Offset:  2
Offset:  3
Offset:  4
Offset:  5
Offset:  6
Offset:  7
Offset:  8
Offset:  9
Offset:  10
Offset:  11
Offset:  12
Offset:  13
Offset:  14
Offset:  15
Offset:  16
Offset:  17
Offset:  18
Offset:  19
Offset:  20
Offset:  21
Offset:  22
Offset:  23
Offset:  24
Offset:  25
Offset:  26
Offset:  27
Offset:  28
Offset:  29
Offset:  30
Offset:  31
Offset:  32
Offset:  33
Offset:  34
Offset:  35
Offset:  36
Offset:  37
Offset:  38
Offset:  39
Offset:  40
Offset:  41
Offset:  42
Offset:  43
Offset:  44
Offset:  45
Offset:  46
Offset:  47
Offset:  48
Offset:  49

From the kgo.Client.Produce docs:

Records are produced in order per partition if the record is produced successfully.

This guarantee seems to break when records are produced in quick succession, since ListOffsetsAfterMilli returns offsets that point to records whose timestamps are older than the timestamp supplied as input.

When running against an Apache Kafka container (see the Docker Compose file below), ListOffsetsAfterMilli returns the expected offsets even without the sleep.
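
To make the violated guarantee concrete, here is a minimal sketch (not part of the original repro; the helper name checkOffsetTimestamp is made up, and it reuses the imports from the code above) that consumes the record at the offset returned by ListOffsetsAfterMilli and checks that its timestamp is not older than the queried timestamp. With the repro above against Redpanda, this check should fail for most of the queried timestamps:

// checkOffsetTimestamp is a hypothetical helper: it consumes the record at
// the offset returned by ListOffsetsAfterMilli for partition 0 and verifies
// that the record's timestamp is >= the queried timestamp.
func checkOffsetTimestamp(ctx context.Context, brokerAddr, topic string, queriedMilli, offset int64) error {
	consumer, err := kgo.NewClient(
		kgo.SeedBrokers(brokerAddr),
		kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{
			topic: {0: kgo.NewOffset().At(offset)},
		}),
	)
	if err != nil {
		return err
	}
	defer consumer.Close()

	fetches := consumer.PollFetches(ctx)
	if errs := fetches.Errors(); len(errs) > 0 {
		return errs[0].Err
	}
	records := fetches.Records()
	if len(records) == 0 {
		return fmt.Errorf("no record found at offset %d", offset)
	}
	if got := records[0].Timestamp.UnixMilli(); got < queriedMilli {
		return fmt.Errorf("offset %d has timestamp %d ms, older than queried %d ms", offset, got, queriedMilli)
	}
	return nil
}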

Docker Compose for running Kafka:
services:
  source:
    image: bitnami/kafka
    environment:
      KAFKA_CFG_NODE_ID: 0
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_DOCKER:PLAINTEXT
      KAFKA_CFG_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_DOCKER://0.0.0.0:19092,CONTROLLER://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_DOCKER://source:19092
      KAFKA_CFG_AUTHORIZER_CLASS_NAME: "org.apache.kafka.metadata.authorizer.StandardAuthorizer"
      KAFKA_CFG_SUPER_USERS: User:redpanda;User:ANONYMOUS
    ports:
      - 9092:9092
      - 19092:19092
    healthcheck:
      test: [ "CMD", "kafka-topics.sh", "--bootstrap-server=localhost:9092", "--list" ]
      start_period: 5s
      interval: 3s

How to reproduce the issue?

See above.

Additional information

I tested that the records are compressed:

$ rpk topic consume foobar --format '%d - %a{compression}\n' -o ':end' -X brokers=localhost:9092
1 - snappy
2 - snappy
3 - snappy
4 - snappy
5 - snappy
...

If I change kgo.ProducerBatchCompression(kgo.SnappyCompression()) to kgo.ProducerBatchCompression(kgo.NoCompression()), the issue goes away.
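
For reference, the workaround amounts to swapping the compression option in the kgo.NewClient call from the repro above (sketch):

	// Same client as in the repro, but with batch compression disabled;
	// with this setting ListOffsetsAfterMilli returns the expected offsets.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ProducerBatchCompression(kgo.NoCompression()),
	)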

JIRA Link: CORE-9778

Labels: kind/bug