Closing kafka Writer during WriteMessages causes a potential hang #1307

Open
@ionutboangiu

Description

Describe the bug

Closing the kafka.Writer while a WriteMessages call is in progress can cause Close to hang indefinitely. Calling Close a second time releases the hang.

Kafka Version

  • Kafka Version: 3.7.0
  • kafka-go Version: current main branch

To Reproduce

package main

import (
	"context"
	"log"
	"net"
	"strconv"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	brokerURL := "localhost:9092"
	topic := "test"
	createTopic(topic, brokerURL)
	writer := &kafka.Writer{
		Addr:   kafka.TCP(brokerURL),
		Topic:  topic,
		Logger: log.Default(),

		Transport: &kafka.Transport{
			Dial: (&net.Dialer{
				Timeout: 3 * time.Second,
			}).DialContext,
		},
	}
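	done := make(chan struct{})
	// Close the writer concurrently, about 1ms after WriteMessages starts,
	// so that Close runs while the write is still in progress.
	go func() {
		time.Sleep(1 * time.Millisecond)
		writer.Close()
		close(done)
	}()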
	if err := writer.WriteMessages(context.Background(), kafka.Message{
		Value: []byte("payload"),
	}); err != nil {
		log.Fatal(err)
	}

	// Wait for Close to return; when the bug triggers, this blocks forever.
	<-done
	log.Print("does not reach this point")
}

func createTopic(name, brokerURL string) {
	conn, err := kafka.Dial("tcp", brokerURL)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		panic(err)
	}
	var controllerConn *kafka.Conn
	controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		panic(err)
	}
	defer controllerConn.Close()

	err = controllerConn.CreateTopics(kafka.TopicConfig{
		Topic:             name,
		NumPartitions:     1,
		ReplicationFactor: 1,
	})
	if err != nil {
		panic(err)
	}
}

Expected Behavior

kafka.Writer.Close() should return rather than hang, even when it is called while WriteMessages is still in progress.

Observed Behavior

Close() hangs at w.group.Wait().

A goroutine blocked in *batchQueue.cond.Wait() is waiting for a broadcast signal that never arrives, so it never decrements the wait group counter. That signal would normally come from closing the writer here, but at the moment Close iterates over the w.writers map, the map is still empty, so there is nothing to signal.

Sequence of function calls leading to it:

Closing the writer again resolves the issue.

Additional Context

I ran into this in an application that creates writers with different transports and closes them after a configurable period of inactivity. Because the transports are not reused, the related goroutines, (*connPool).discover and (*conn).run, also hang indefinitely.

In practice this is rarely a problem unless the inactivity period is set very low. I can also work around it by closing the writer again if the first Close does not return in time. Still, I thought you might want to know about it.

Thanks!
