| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package kafka.api |
| |
| import org.apache.kafka.clients.producer.ProducerConfig |
| import org.apache.kafka.common.TopicPartition |
| import org.apache.kafka.common.config.TopicConfig |
| import org.apache.kafka.server.config.ReplicationConfigs |
| import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows} |
| import org.junit.jupiter.params.ParameterizedTest |
| import org.junit.jupiter.params.provider.ValueSource |
| |
| import java.util |
| import java.util.{Collections, Optional, Properties} |
| import scala.annotation.nowarn |
| import scala.jdk.CollectionConverters._ |
| |
| class ConsumerWithLegacyMessageFormatIntegrationTest extends AbstractConsumerTest { |
| |
| override protected def brokerPropertyOverrides(properties: Properties): Unit = { |
| // legacy message formats are only supported with IBP < 3.0 |
| // KRaft mode is not supported for inter.broker.protocol.version = 2.8, The minimum version required is 3.0-IV1" |
| if (!isKRaftTest()) |
| properties.put(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "2.8") |
| } |
| |
| @nowarn("cat=deprecation") |
| @ParameterizedTest |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testOffsetsForTimes(quorum: String): Unit = { |
| val numParts = 2 |
| val topic1 = "part-test-topic-1" |
| val topic2 = "part-test-topic-2" |
| val topic3 = "part-test-topic-3" |
| val props = new Properties() |
| props.setProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.9.0") |
| createTopic(topic1, numParts) |
| // Topic2 is in old message format. |
| createTopic(topic2, numParts, 1, props) |
| createTopic(topic3, numParts) |
| |
| val consumer = createConsumer() |
| |
| // Test negative target time |
| assertThrows(classOf[IllegalArgumentException], |
| () => consumer.offsetsForTimes(Collections.singletonMap(new TopicPartition(topic1, 0), -1))) |
| |
| val producer = createProducer() |
| val timestampsToSearch = new util.HashMap[TopicPartition, java.lang.Long]() |
| var i = 0 |
| for (topic <- List(topic1, topic2, topic3)) { |
| for (part <- 0 until numParts) { |
| val tp = new TopicPartition(topic, part) |
| // In sendRecords(), each message will have key, value and timestamp equal to the sequence number. |
| sendRecords(producer, numRecords = 100, tp, startingTimestamp = 0) |
| timestampsToSearch.put(tp, (i * 20).toLong) |
| i += 1 |
| } |
| } |
| // The timestampToSearch map should contain: |
| // (topic1Partition0 -> 0, |
| // topic1Partition1 -> 20, |
| // topic2Partition0 -> 40, |
| // topic2Partition1 -> 60, |
| // topic3Partition0 -> 80, |
| // topic3Partition1 -> 100) |
| val timestampOffsets = consumer.offsetsForTimes(timestampsToSearch) |
| |
| val timestampTopic1P0 = timestampOffsets.get(new TopicPartition(topic1, 0)) |
| assertEquals(0, timestampTopic1P0.offset) |
| assertEquals(0, timestampTopic1P0.timestamp) |
| assertEquals(Optional.of(0), timestampTopic1P0.leaderEpoch) |
| |
| val timestampTopic1P1 = timestampOffsets.get(new TopicPartition(topic1, 1)) |
| assertEquals(20, timestampTopic1P1.offset) |
| assertEquals(20, timestampTopic1P1.timestamp) |
| assertEquals(Optional.of(0), timestampTopic1P1.leaderEpoch) |
| |
| if (!isKRaftTest()) { |
| assertNull(timestampOffsets.get(new TopicPartition(topic2, 0)), "null should be returned when message format is 0.9.0") |
| assertNull(timestampOffsets.get(new TopicPartition(topic2, 1)), "null should be returned when message format is 0.9.0") |
| } |
| else { |
| // legacy message formats are supported for IBP version < 3.0 and KRaft runs on minimum version 3.0-IV1 |
| val timestampTopic2P0 = timestampOffsets.get(new TopicPartition(topic2, 0)) |
| assertEquals(40, timestampTopic2P0.offset) |
| assertEquals(40, timestampTopic2P0.timestamp) |
| assertEquals(Optional.of(0), timestampTopic2P0.leaderEpoch) |
| |
| val timestampTopic2P1 = timestampOffsets.get(new TopicPartition(topic2, 1)) |
| assertEquals(60, timestampTopic2P1.offset) |
| assertEquals(60, timestampTopic2P1.timestamp) |
| assertEquals(Optional.of(0), timestampTopic2P1.leaderEpoch) |
| } |
| |
| val timestampTopic3P0 = timestampOffsets.get(new TopicPartition(topic3, 0)) |
| assertEquals(80, timestampTopic3P0.offset) |
| assertEquals(80, timestampTopic3P0.timestamp) |
| assertEquals(Optional.of(0), timestampTopic3P0.leaderEpoch) |
| |
| assertNull(timestampOffsets.get(new TopicPartition(topic3, 1))) |
| } |
| |
| @nowarn("cat=deprecation") |
| @ParameterizedTest |
| @ValueSource(strings = Array("zk", "kraft")) |
| def testEarliestOrLatestOffsets(quorum: String): Unit = { |
| val topic0 = "topicWithNewMessageFormat" |
| val topic1 = "topicWithOldMessageFormat" |
| val prop = new Properties() |
| // idempotence producer doesn't support old version of messages |
| prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false") |
| val producer = createProducer(configOverrides = prop) |
| createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 2, recordsPerPartition = 100) |
| val props = new Properties() |
| props.setProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.9.0") |
| createTopic(topic1, numPartitions = 1, replicationFactor = 1, props) |
| sendRecords(producer, numRecords = 100, new TopicPartition(topic1, 0)) |
| |
| val t0p0 = new TopicPartition(topic0, 0) |
| val t0p1 = new TopicPartition(topic0, 1) |
| val t1p0 = new TopicPartition(topic1, 0) |
| val partitions = Set(t0p0, t0p1, t1p0).asJava |
| val consumer = createConsumer() |
| |
| val earliests = consumer.beginningOffsets(partitions) |
| assertEquals(0L, earliests.get(t0p0)) |
| assertEquals(0L, earliests.get(t0p1)) |
| assertEquals(0L, earliests.get(t1p0)) |
| |
| val latests = consumer.endOffsets(partitions) |
| assertEquals(100L, latests.get(t0p0)) |
| assertEquals(100L, latests.get(t0p1)) |
| assertEquals(100L, latests.get(t1p0)) |
| } |
| } |