Description
Describe the bug
Closing a kafka.Writer while a WriteMessages call is in flight can cause Close to hang indefinitely. Calling Close a second time unblocks it.
Kafka Version
- Kafka Version: 3.7.0
- kafka-go Version: current main branch
To Reproduce
package main

import (
	"context"
	"log"
	"net"
	"strconv"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	brokerURL := "localhost:9092"
	topic := "test"
	createTopic(topic, brokerURL)

	writer := &kafka.Writer{
		Addr:   kafka.TCP(brokerURL),
		Topic:  topic,
		Logger: log.Default(),
		Transport: &kafka.Transport{
			Dial: (&net.Dialer{
				Timeout: 3 * time.Second,
			}).DialContext,
		},
	}

	// Close the writer shortly after WriteMessages has started, so that
	// Close races with the write that is still in flight.
	done := make(chan struct{})
	go func() {
		time.Sleep(1 * time.Millisecond)
		writer.Close()
		close(done)
	}()

	if err := writer.WriteMessages(context.Background(), kafka.Message{
		Value: []byte("payload"),
	}); err != nil {
		log.Fatal(err)
	}

	// Blocks forever when Close hangs.
	<-done
	log.Print("does not reach this point")
}

// createTopic creates a single-partition topic via the cluster controller.
func createTopic(name, brokerURL string) {
	conn, err := kafka.Dial("tcp", brokerURL)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		panic(err)
	}

	var controllerConn *kafka.Conn
	controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		panic(err)
	}
	defer controllerConn.Close()

	err = controllerConn.CreateTopics(kafka.TopicConfig{
		Topic:             name,
		NumPartitions:     1,
		ReplicationFactor: 1,
	})
	if err != nil {
		panic(err)
	}
}
Expected Behavior
kafka.Writer.Close() should not hang.
Observed Behavior
Hangs here: w.group.Wait()
*batchQueue.cond.Wait() waits for a broadcast signal that never arrives, so the partition writer goroutine never decrements the waitgroup counter. The signal would normally come from Close closing the partition writers, but at that point the w.writers map is still empty.
Sequence of function calls leading to it:
- *kafka.Writer.batchMessages
- newPartitionWriter
- *kafka.Writer.spawn (adds 1 to the waitgroup counter)
- f()/*partitionWriter.writeBatches()
- *partitionWriter.queue.Get()
- *batchQueue.cond.Wait()
Closing the writer again resolves the issue.
Additional Context
This issue happened in an application that creates writers with different transports, which are closed after a configurable period of inactivity. The transports are not reused, causing the related goroutines (*connPool).discover and (*conn).run to also hang indefinitely.
This is almost a non-issue in real scenarios unless the inactivity period is set to a very low duration. I can also work around it by closing the writer again if the previous close doesn't complete in time. However, I thought you might want to know about it regardless.
Thanks!