[WIP] KAFKA-19080: The constraint on segment.ms is not enforced at topic level #19371

Open · m1a2st wants to merge 47 commits into trunk from KAFKA-19080

Commits
ef72eee  fix the topic level segment.bytes error (m1a2st, Apr 4, 2025)
36e59c8  use spotlessApply (m1a2st, Apr 4, 2025)
7a29f51  update test default value (m1a2st, Apr 4, 2025)
1d66fb0  wip (m1a2st, Apr 4, 2025)
60c01e9  Merge branch 'trunk' into KAFKA-19080 (m1a2st, Apr 9, 2025)
9398bae  revert test change (m1a2st, Apr 9, 2025)
0f766c5  add new constructor for test (m1a2st, Apr 9, 2025)
a0342ed  add escape annotation (m1a2st, Apr 9, 2025)
f707465  fix some test (m1a2st, Apr 9, 2025)
768c8ed  Revert "fix some test" (m1a2st, Apr 10, 2025)
640ca6b  Revert "add escape annotation" (m1a2st, Apr 10, 2025)
2bfd478  Revert "add new constructor for test" (m1a2st, Apr 10, 2025)
51da598  fix all test (m1a2st, Apr 10, 2025)
8c39276  fix some test (m1a2st, Apr 10, 2025)
eefa646  fix MetadataLog Test (m1a2st, Apr 10, 2025)
e2adfbe  fix MetadataLog Test (m1a2st, Apr 10, 2025)
3d8e122  fix some test (m1a2st, Apr 10, 2025)
303ef39  fix fail tests (m1a2st, Apr 11, 2025)
3f1b757  Merge remote-tracking branch 'origin/KAFKA-19080' into KAFKA-19080 (m1a2st, Apr 11, 2025)
71dbb89  fix config def error (m1a2st, Apr 11, 2025)
4c62f11  fix fail test (m1a2st, Apr 11, 2025)
c847bbc  Merge branch 'trunk' into KAFKA-19080 (m1a2st, Apr 11, 2025)
ad36e75  Merge branch 'trunk' into KAFKA-19080 (m1a2st, Apr 12, 2025)
5e4fe46  Update LogCleaner file (m1a2st, Apr 12, 2025)
9b9f469  change segmentSize modifier (m1a2st, Apr 12, 2025)
4e7a080  Merge branch 'trunk' into KAFKA-19080 (m1a2st, Apr 14, 2025)
8b7d366  addressed by comments (m1a2st, Apr 14, 2025)
5bbdf1f  addressed by comments (m1a2st, Apr 14, 2025)
2a3db85  update the test (m1a2st, Apr 14, 2025)
919365a  Merge branch 'trunk' into KAFKA-19080 (m1a2st, Apr 15, 2025)
a332072  remove unused import (m1a2st, Apr 15, 2025)
b774d92  Merge branch 'trunk' into KAFKA-19080 (m1a2st, Apr 20, 2025)
6876739  move other internal config (m1a2st, Apr 20, 2025)
34a794f  update KafkaMetadataLog apply flow (m1a2st, Apr 20, 2025)
c10c9cb  fix compile error (m1a2st, Apr 20, 2025)
ea981e3  fix fail test (m1a2st, Apr 21, 2025)
934ac37  fix fail test (m1a2st, Apr 21, 2025)
77898ed  revert unused change (m1a2st, Apr 21, 2025)
1756fee  revert unused change (m1a2st, Apr 21, 2025)
0bfec85  Merge branch 'trunk' into KAFKA-19080 (m1a2st, Apr 22, 2025)
1a3f737  temp (m1a2st, Apr 22, 2025)
5761c62  Merge branch 'trunk' into KAFKA-19080 (m1a2st, Apr 24, 2025)
06131bf  Merge branch 'trunk' into KAFKA-19080 (m1a2st, Apr 24, 2025)
7483a5d  completed the feature (m1a2st, Apr 25, 2025)
048e052  fix fail test (m1a2st, Apr 25, 2025)
76bd287  fix fail test (m1a2st, Apr 25, 2025)
ce644ae  addressed by comments (m1a2st, Apr 27, 2025)
22 changes: 7 additions & 15 deletions core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
- import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, MetadataLogConfig, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
+ import org.apache.kafka.raft.{Isolation, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, MetadataLogConfig, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.Scheduler
@@ -587,7 +587,10 @@ object KafkaMetadataLog extends Logging {
): KafkaMetadataLog = {
val props = new Properties()
props.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, config.maxBatchSizeInBytes.toString)
- props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
+ if (config.internalLogSegmentBytes != null)
+   props.setProperty(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
+ else
+   props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, config.logSegmentBytes.toString)
props.setProperty(TopicConfig.SEGMENT_MS_CONFIG, config.logSegmentMillis.toString)
props.setProperty(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT.toString)

@@ -596,12 +599,8 @@
props.setProperty(TopicConfig.RETENTION_BYTES_CONFIG, "-1")
LogConfig.validate(props)
val defaultLogConfig = new LogConfig(props)

- if (config.logSegmentBytes < config.logSegmentMinBytes) {
-   throw new InvalidConfigurationException(
-     s"Cannot set ${MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG} below ${config.logSegmentMinBytes}: ${config.logSegmentBytes}"
-   )
- } else if (defaultLogConfig.retentionMs >= 0) {
+ if (defaultLogConfig.retentionMs >= 0) {
throw new InvalidConfigurationException(
s"Cannot set ${TopicConfig.RETENTION_MS_CONFIG} above -1: ${defaultLogConfig.retentionMs}."
)
@@ -636,13 +635,6 @@
config,
nodeId
)

- // Print a warning if users have overridden the internal config
- if (config.logSegmentMinBytes != KafkaRaftClient.MAX_BATCH_SIZE_BYTES) {
-   metadataLog.error(s"Overriding ${MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG} is only supported for testing. Setting " +
-     s"this value too low may lead to an inability to write batches of metadata records.")
- }

// When recovering, truncate fully if the latest snapshot is after the log end offset. This can happen to a follower
// when the follower crashes after downloading a snapshot from the leader but before it could truncate the log fully.
metadataLog.truncateToLatestSnapshot()
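With this change the metadata log no longer enforces its own segment-size floor; validation funnels through LogConfig.validate(props), the shared path that also sees topic-level settings, which is the KAFKA-19080 gap. A minimal, self-contained sketch of that style of check; the floor values and the SegmentValidationSketch object are illustrative assumptions, not Kafka's actual constants:

```scala
import java.util.Properties

// Illustrative sketch only: enforce floors for the segment configs inside a
// shared validate(props) step, the way LogConfig.validate(props) is invoked
// above. The floor values are assumptions, not Kafka's real defaults.
object SegmentValidationSketch {
  val MinSegmentBytes: Int = 1024 * 1024 // assumed floor for segment.bytes
  val MinSegmentMs: Long   = 60 * 1000L  // assumed floor for segment.ms

  def validate(props: Properties): Unit = {
    Option(props.getProperty("segment.bytes")).map(_.trim.toInt).foreach { v =>
      require(v >= MinSegmentBytes, s"segment.bytes must be at least $MinSegmentBytes, got $v")
    }
    Option(props.getProperty("segment.ms")).map(_.trim.toLong).foreach { v =>
      require(v >= MinSegmentMs, s"segment.ms must be at least $MinSegmentMs, got $v")
    }
  }
}
```

Because broker defaults and per-topic overrides pass through the same validate path, a too-small segment.ms is rejected regardless of where it was set.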
@@ -1568,7 +1568,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testDeleteRecordsAfterCorruptRecords(groupProtocol: String): Unit = {
val config = new Properties()
- config.put(TopicConfig.SEGMENT_BYTES_CONFIG, "200")
+ config.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, "200")
createTopic(topic, numPartitions = 1, replicationFactor = 1, config)

client = createAdminClient
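Tests that genuinely need tiny segments, like the 200-byte segments above, switch to the internal key, which the new validation is expected to exempt. A hedged sketch of the pattern; the literal string is an assumption about what LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG resolves to:

```scala
import java.util.Properties

// Sketch: a test-only override that bypasses the public segment.bytes floor.
// Real code should reference LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG rather
// than the assumed literal used here.
val topicConfig = new Properties()
topicConfig.put("internal.segment.bytes", "200")
```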
@@ -34,6 +34,7 @@ import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST, WILDCARD_PR
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
+ import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
@@ -580,7 +581,7 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
assertEquals(3, result.replicationFactor(topic1).get())
val topicConfigs = result.config(topic1).get().entries.asScala
assertTrue(topicConfigs.nonEmpty)
- val segmentBytesConfig = topicConfigs.find(_.name == TopicConfig.SEGMENT_BYTES_CONFIG).get
+ val segmentBytesConfig = topicConfigs.find(_.name == LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG).get
assertEquals(100000, segmentBytesConfig.value.toLong)
assertEquals(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG, segmentBytesConfig.source)
val compressionConfig = topicConfigs.find(_.name == TopicConfig.COMPRESSION_TYPE_CONFIG).get
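The assertion change implies describeConfigs now reports the override under the internal name while its source stays DYNAMIC_TOPIC_CONFIG. A stand-in sketch of the lookup shape; Entry is a hypothetical substitute for ConfigEntry and the key literal is assumed:

```scala
// Hypothetical stand-in for ConfigEntry, only to show the lookup shape.
final case class Entry(name: String, value: String)

val topicConfigs = Seq(
  Entry("internal.segment.bytes", "100000"), // assumed literal key
  Entry("compression.type", "producer")
)
val segmentBytesConfig = topicConfigs.find(_.name == "internal.segment.bytes").get
assert(segmentBytesConfig.value.toLong == 100000L)
```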
@@ -653,7 +653,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
"Config not updated in LogManager")

val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found"))
- TestUtils.waitUntilTrue(() => log.config.segmentSize == 1048576, "Existing topic config using defaults not updated")
+ TestUtils.waitUntilTrue(() => {log.config.segmentSize() == 1048576}, "Existing topic config using defaults not updated")
val KafkaConfigToLogConfigName: Map[String, String] =
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) }
props.asScala.foreach { case (k, v) =>
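The call site now reads log.config.segmentSize() with parentheses (see the "change segmentSize modifier" commit), which suggests the value became a method so it can be computed at read time. A hedged sketch of such an accessor, with assumed field names:

```scala
// Sketch: compute the effective segment size on each read, preferring the
// internal (test-only) override when present. Names are illustrative.
class SegmentSizeSketch(publicSegmentBytes: Int, internalSegmentBytes: Option[Int]) {
  def segmentSize(): Int = internalSegmentBytes.getOrElse(publicSegmentBytes)
}

val cfg = new SegmentSizeSketch(publicSegmentBytes = 1048576, internalSegmentBytes = None)
assert(cfg.segmentSize() == 1048576)
```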
19 changes: 3 additions & 16 deletions core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -20,7 +20,7 @@ import kafka.server.{KafkaConfig, KafkaRaftServer}
import kafka.utils.TestUtils
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.errors.CorruptRecordException
- import org.apache.kafka.common.errors.{InvalidConfigurationException, RecordTooLargeException}
+ import org.apache.kafka.common.errors.RecordTooLargeException
import org.apache.kafka.common.protocol
import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable}
import org.apache.kafka.common.record.ArbitraryMemoryRecords
@@ -76,15 +76,8 @@ final class KafkaMetadataLogTest {
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093")
props.put(KRaftConfigs.NODE_ID_CONFIG, Int.box(2))
props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
- props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_BYTES_CONFIG, Int.box(10240))
props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MILLIS_CONFIG, Int.box(10 * 1024))
- assertThrows(classOf[InvalidConfigurationException], () => {
-   val kafkaConfig = KafkaConfig.fromProps(props)
-   val metadataConfig = new MetadataLogConfig(kafkaConfig)
-   buildMetadataLog(tempDir, mockTime, metadataConfig)
- })
-
- props.put(MetadataLogConfig.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG, Int.box(10240))
+ props.put(MetadataLogConfig.INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG, Int.box(10240))
val kafkaConfig = KafkaConfig.fromProps(props)
val metadataConfig = new MetadataLogConfig(kafkaConfig)
buildMetadataLog(tempDir, mockTime, metadataConfig)
@@ -689,7 +682,6 @@
val recordSize = 64
val config = new MetadataLogConfig(
DefaultMetadataLogConfig.logSegmentBytes,
- DefaultMetadataLogConfig.logSegmentMinBytes,
DefaultMetadataLogConfig.logSegmentMillis,
DefaultMetadataLogConfig.retentionMaxBytes,
DefaultMetadataLogConfig.retentionMillis,
@@ -908,7 +900,6 @@
@Test
def testAdvanceLogStartOffsetAfterCleaning(): Unit = {
val config = new MetadataLogConfig(
- 512,
512,
10 * 1000,
256,
@@ -945,7 +936,6 @@
def testDeleteSnapshots(): Unit = {
// Generate some logs and a few snapshots, set retention low and verify that cleaning occurs
val config = new MetadataLogConfig(
- 1024,
1024,
10 * 1000,
1024,
@@ -979,7 +969,6 @@
def testSoftRetentionLimit(): Unit = {
// Set retention equal to the segment size and generate slightly more than one segment of logs
val config = new MetadataLogConfig(
- 10240,
10240,
10 * 1000,
10240,
@@ -1023,7 +1012,6 @@
@Test
def testSegmentsLessThanLatestSnapshot(): Unit = {
val config = new MetadataLogConfig(
- 10240,
10240,
10 * 1000,
10240,
@@ -1082,7 +1070,6 @@ object KafkaMetadataLogTest {
}

val DefaultMetadataLogConfig = new MetadataLogConfig(
- 100 * 1024,
100 * 1024,
10 * 1000,
100 * 1024,
@@ -1103,7 +1090,7 @@
UnifiedLog.logDirName(KafkaRaftServer.MetadataPartition)
)

- val metadataLog = KafkaMetadataLog(
+ val metadataLog = KafkaMetadataLog.apply(
KafkaRaftServer.MetadataPartition,
KafkaRaftServer.MetadataTopicId,
logDir,
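Each MetadataLogConfig construction site in this file drops one positional argument because logSegmentMinBytes no longer exists. A sketch of the reduced shape as inferred from the argument lists above; the exact signature and the trailing parameters are assumptions:

```scala
// Inferred shape only; the real class lives in org.apache.kafka.raft and
// carries more parameters than shown here.
final case class MetadataLogConfigSketch(
  logSegmentBytes: Int,
  logSegmentMillis: Long,
  retentionMaxBytes: Long,
  retentionMillis: Long
  // remaining parameters unchanged and elided
)

val config = MetadataLogConfigSketch(10240, 10 * 1000L, 10240L, 60 * 1000L)
```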
@@ -100,7 +100,7 @@ class AbstractPartitionTest {

def createLogProperties(overrides: Map[String, String]): Properties = {
val logProps = new Properties()
- logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 512: java.lang.Integer)
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 512: java.lang.Integer)
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000: java.lang.Integer)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, 999: java.lang.Integer)
overrides.foreach { case (k, v) => logProps.put(k, v) }
@@ -362,7 +362,7 @@ class PartitionLockTest extends Logging {

private def createLogProperties(overrides: Map[String, String]): Properties = {
val logProps = new Properties()
- logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 512: java.lang.Integer)
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 512: java.lang.Integer)
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000: java.lang.Integer)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, 999: java.lang.Integer)
overrides.foreach { case (k, v) => logProps.put(k, v) }
@@ -71,7 +71,7 @@ abstract class AbstractLogCleanerIntegrationTest {
maxCompactionLagMs: Long = defaultMaxCompactionLagMs): Properties = {
val props = new Properties()
props.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageSize: java.lang.Integer)
- props.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentSize: java.lang.Integer)
+ props.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentSize: java.lang.Integer)
props.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 100*1024: java.lang.Integer)
props.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, deleteDelay: java.lang.Integer)
props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
18 changes: 9 additions & 9 deletions core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -53,7 +53,7 @@ class LogCleanerManagerTest extends Logging {
val topicPartition = new TopicPartition("log", 0)
val topicPartition2 = new TopicPartition("log2", 0)
val logProps = new Properties()
- logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
val logConfig: LogConfig = new LogConfig(logProps)
@@ -370,7 +370,7 @@

// change cleanup policy from delete to compact
val logProps = new Properties()
- logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, log.config.segmentSize: Integer)
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, log.config.segmentSize(): Integer)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, log.config.retentionMs: java.lang.Long)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0: Integer)
@@ -548,7 +548,7 @@
@Test
def testCleanableOffsetsForNone(): Unit = {
val logProps = new Properties()
- logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)

val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))

@@ -570,7 +570,7 @@
@Test
def testCleanableOffsetsActiveSegment(): Unit = {
val logProps = new Properties()
- logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)

val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))

@@ -592,7 +592,7 @@
def testCleanableOffsetsForTime(): Unit = {
val compactionLag = 60 * 60 * 1000
val logProps = new Properties()
- logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag: java.lang.Integer)

val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@@ -625,7 +625,7 @@
def testCleanableOffsetsForShortTime(): Unit = {
val compactionLag = 60 * 60 * 1000
val logProps = new Properties()
- logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag: java.lang.Integer)

val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@@ -667,7 +667,7 @@
def testUndecidedTransactionalDataNotCleanable(): Unit = {
val compactionLag = 60 * 60 * 1000
val logProps = new Properties()
- logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag: java.lang.Integer)

val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
@@ -711,7 +711,7 @@
@Test
def testDoneCleaning(): Unit = {
val logProps = new Properties()
- logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
while (log.numberOfSegments < 8)
log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), 0)
@@ -830,7 +830,7 @@

private def createLowRetentionLogConfig(segmentSize: Int, cleanupPolicy: String): LogConfig = {
val logProps = new Properties()
- logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentSize: Integer)
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentSize: Integer)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, 1: Integer)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicy)
logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0.05: java.lang.Double) // small for easier and clearer tests
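All nine substitutions in this file are identical, so a small helper could keep the test-only key in one place. A hedged sketch; the literal key is an assumption and real code should use the LogConfig constant:

```scala
import java.util.Properties

// Sketch: centralize the test-only segment size override for cleaner tests.
def tinySegmentProps(segmentSize: Int): Properties = {
  val props = new Properties()
  props.put("internal.segment.bytes", segmentSize.toString) // assumed literal
  props
}

val props = tinySegmentProps(1024)
```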