blob: 346b17028fb926da0150beb9e563c191cf681d3c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.junit.jupiter.api.Assertions.fail
import org.junit.jupiter.api.{Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
@Tag("integration")
class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
def testOffsetCommitWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
testOffsetCommit(true)
}
@ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
def testOffsetCommitWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = {
testOffsetCommit(false)
}
@ClusterTest(clusterType = Type.ALL, serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
))
def testOffsetCommitWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = {
testOffsetCommit(false)
}
private def testOffsetCommit(useNewProtocol: Boolean): Unit = {
if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
fail("Cannot use the new protocol with the old group coordinator.")
}
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
// Create the topic.
createTopic(
topic = "foo",
numPartitions = 3
)
// Join the consumer group. Note that we don't heartbeat here so we must use
// a session long enough for the duration of the test.
val (memberId, memberEpoch) = joinConsumerGroup("grp", useNewProtocol)
// Start from version 1 because version 0 goes to ZK.
for (version <- 1 to ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
// Commit offset.
commitOffset(
groupId = "grp",
memberId = memberId,
memberEpoch = memberEpoch,
topic = "foo",
partition = 0,
offset = 100L,
expectedError = Errors.NONE,
version = version.toShort
)
// Commit offset with unknown group should fail.
commitOffset(
groupId = "unknown",
memberId = memberId,
memberEpoch = memberEpoch,
topic = "foo",
partition = 0,
offset = 100L,
expectedError =
if (isNewGroupCoordinatorEnabled && version >= 9) Errors.GROUP_ID_NOT_FOUND
else Errors.ILLEGAL_GENERATION,
version = version.toShort
)
// Commit offset with empty group id should fail.
commitOffset(
groupId = "",
memberId = memberId,
memberEpoch = memberEpoch,
topic = "foo",
partition = 0,
offset = 100L,
expectedError =
if (isNewGroupCoordinatorEnabled && version >= 9) Errors.GROUP_ID_NOT_FOUND
else Errors.ILLEGAL_GENERATION,
version = version.toShort
)
// Commit offset with unknown member id should fail.
commitOffset(
groupId = "grp",
memberId = "",
memberEpoch = memberEpoch,
topic = "foo",
partition = 0,
offset = 100L,
expectedError = Errors.UNKNOWN_MEMBER_ID,
version = version.toShort
)
// Commit offset with stale member epoch should fail.
commitOffset(
groupId = "grp",
memberId = memberId,
memberEpoch = memberEpoch + 1,
topic = "foo",
partition = 0,
offset = 100L,
expectedError =
if (useNewProtocol && version >= 9) Errors.STALE_MEMBER_EPOCH
else if (useNewProtocol) Errors.UNSUPPORTED_VERSION
else Errors.ILLEGAL_GENERATION,
version = version.toShort
)
// Commit offset to a group without member id/epoch should succeed.
// This simulate a call from the admin client.
commitOffset(
groupId = "other-grp",
memberId = "",
memberEpoch = -1,
topic = "foo",
partition = 0,
offset = 100L,
expectedError = Errors.NONE,
version = version.toShort
)
}
}
}