Merge remote-tracking branch 'origin' into testRemoteLogManagerRemoteMetrics
diff --git a/core/src/main/java/kafka/server/ReplicaAlterLogDirsTierStateMachine.java b/core/src/main/java/kafka/server/ReplicaAlterLogDirsTierStateMachine.java
deleted file mode 100644
index 8561fae..0000000
--- a/core/src/main/java/kafka/server/ReplicaAlterLogDirsTierStateMachine.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.message.FetchResponseData.PartitionData;
-
-import java.util.Optional;
-
-/**
- The replica alter log dirs tier state machine is unsupported but is provided to the ReplicaAlterLogDirsThread.
- */
-public class ReplicaAlterLogDirsTierStateMachine implements TierStateMachine {
-
- public PartitionFetchState start(TopicPartition topicPartition,
- PartitionFetchState currentFetchState,
- PartitionData fetchPartitionData) throws Exception {
- // JBOD is not supported with tiered storage.
- throw new UnsupportedOperationException("Building remote log aux state is not supported in ReplicaAlterLogDirsThread.");
- }
-
- public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
- PartitionFetchState currentFetchState) {
- return Optional.empty();
- }
-}
diff --git a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java b/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
deleted file mode 100644
index 0462e12..0000000
--- a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.StandardCopyOption;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import kafka.cluster.Partition;
-import kafka.log.UnifiedLog;
-import kafka.log.remote.RemoteLogManager;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.message.FetchResponseData.PartitionData;
-import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
-import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.server.common.CheckpointFile;
-import org.apache.kafka.server.common.OffsetAndEpoch;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
-import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
-import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
-import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
-import org.apache.kafka.storage.internals.log.EpochEntry;
-import org.apache.kafka.storage.internals.log.LogFileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.Option;
-import scala.collection.JavaConverters;
-
-import static org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason.LeaderOffsetIncremented;
-
-/**
- The replica fetcher tier state machine follows a state machine progression.
-
- Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.
- There is no need to advance the state.
-
- When started, the tier state machine will fetch the local log start offset of the
- leader and then build the follower's remote log aux state until the leader's
- local log start offset.
- */
-public class ReplicaFetcherTierStateMachine implements TierStateMachine {
- private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
-
- private final LeaderEndPoint leader;
- private final ReplicaManager replicaMgr;
-
- public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
- ReplicaManager replicaMgr) {
- this.leader = leader;
- this.replicaMgr = replicaMgr;
- }
-
-
- /**
- * Start the tier state machine for the provided topic partition. Currently, this start method will build the
- * entire remote aux log state synchronously.
- *
- * @param topicPartition the topic partition
- * @param currentFetchState the current PartitionFetchState which will
- * be used to derive the return value
- * @param fetchPartitionData the data from the fetch response that returned the offset moved to tiered storage error
- *
- * @return the new PartitionFetchState after the successful start of the
- * tier state machine
- */
- public PartitionFetchState start(TopicPartition topicPartition,
- PartitionFetchState currentFetchState,
- PartitionData fetchPartitionData) throws Exception {
-
- OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
- int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
- long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
-
- long offsetToFetch = 0;
- replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
- replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
-
- try {
- offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset());
- } catch (RemoteStorageException e) {
- replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
- replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
- throw e;
- }
-
- OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
- long leaderEndOffset = fetchLatestOffsetResult.offset();
-
- long initialLag = leaderEndOffset - offsetToFetch;
-
- return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
- Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch());
- }
-
- /**
- * This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
- *
- * @param topicPartition the topic partition
- * @param currentFetchState the current PartitionFetchState which will
- * be used to derive the return value
- *
- * @return the original PartitionFetchState
- */
- public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
- PartitionFetchState currentFetchState) {
- // No-op for now
- return Optional.of(currentFetchState);
- }
-
- private EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
- TopicPartition partition,
- Integer currentLeaderEpoch) {
- int previousEpoch = epoch - 1;
-
- // Find the end-offset for the epoch earlier to the given epoch from the leader
- Map<TopicPartition, OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
- partitionsWithEpochs.put(partition, new OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
- Option<EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.mapAsScalaMap(partitionsWithEpochs)).get(partition);
- if (maybeEpochEndOffset.isEmpty()) {
- throw new KafkaException("No response received for partition: " + partition);
- }
-
- EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
- if (epochEndOffset.errorCode() != Errors.NONE.code()) {
- throw Errors.forCode(epochEndOffset.errorCode()).exception();
- }
-
- return epochEndOffset;
- }
-
- private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
- RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
- InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
- try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
- CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
- return readBuffer.read();
- }
- }
-
- private void buildProducerSnapshotFile(File snapshotFile,
- RemoteLogSegmentMetadata remoteLogSegmentMetadata,
- RemoteLogManager rlm) throws IOException, RemoteStorageException {
- File tmpSnapshotFile = new File(snapshotFile.getAbsolutePath() + ".tmp");
- // Copy it to snapshot file in atomic manner.
- Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
- tmpSnapshotFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
- Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath(), snapshotFile.toPath(), false);
- }
-
- /**
- * It tries to build the required state for this partition from leader and remote storage so that it can start
- * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
- * next offset following the end offset of the remote log portion.
- */
- private Long buildRemoteLogAuxState(TopicPartition topicPartition,
- Integer currentLeaderEpoch,
- Long leaderLocalLogStartOffset,
- Integer epochForLeaderLocalLogStartOffset,
- Long leaderLogStartOffset) throws IOException, RemoteStorageException {
-
- UnifiedLog unifiedLog = replicaMgr.localLogOrException(topicPartition);
-
- long nextOffset;
-
- if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) {
- if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
-
- RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
-
- // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
- // until that offset
- long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
- int targetEpoch;
- // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
- // will have the same epoch.
- if (epochForLeaderLocalLogStartOffset == 0) {
- targetEpoch = epochForLeaderLocalLogStartOffset;
- } else {
- // Fetch the earlier epoch/end-offset(exclusive) from the leader.
- EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
- // Check if the target offset lies within the range of earlier epoch. Here, epoch's end-offset is exclusive.
- if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
- // Always use the leader epoch from returned earlierEpochEndOffset.
- // This gives the respective leader epoch, that will handle any gaps in epochs.
- // For ex, leader epoch cache contains:
- // leader-epoch start-offset
- // 0 20
- // 1 85
- // <2> - gap no messages were appended in this leader epoch.
- // 3 90
- // 4 98
- // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
- // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
- // So, for offset 89, we should return leader epoch as 1 like below.
- targetEpoch = earlierEpochEndOffset.leaderEpoch();
- } else {
- targetEpoch = epochForLeaderLocalLogStartOffset;
- }
- }
-
- Optional<RemoteLogSegmentMetadata> maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset);
-
- if (maybeRlsm.isPresent()) {
- RemoteLogSegmentMetadata remoteLogSegmentMetadata = maybeRlsm.get();
- // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start
- // segments from (remoteLogSegmentMetadata.endOffset() + 1)
- // Assign nextOffset with the offset from which next fetch should happen.
- nextOffset = remoteLogSegmentMetadata.endOffset() + 1;
-
- // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
- Partition partition = replicaMgr.getPartitionOrException(topicPartition);
- partition.truncateFullyAndStartAt(nextOffset, false, Option.apply(leaderLogStartOffset));
-
- // Increment start offsets
- unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented);
- unifiedLog.maybeIncrementLocalLogStartOffset(nextOffset, LeaderOffsetIncremented);
-
- // Build leader epoch cache.
- List<EpochEntry> epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata);
- if (unifiedLog.leaderEpochCache().isDefined()) {
- unifiedLog.leaderEpochCache().get().assign(epochs);
- }
-
- log.debug("Updated the epoch cache from remote tier till offset: {} with size: {} for {}", leaderLocalLogStartOffset, epochs.size(), partition);
-
- // Restore producer snapshot
- File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
- buildProducerSnapshotFile(snapshotFile, remoteLogSegmentMetadata, rlm);
-
- // Reload producer snapshots.
- unifiedLog.producerStateManager().truncateFullyAndReloadSnapshots();
- unifiedLog.loadProducerState(nextOffset);
- log.debug("Built the leader epoch cache and producer snapshots from remote tier for {}, " +
- "with active producers size: {}, leaderLogStartOffset: {}, and logEndOffset: {}",
- partition, unifiedLog.producerStateManager().activeProducers().size(), leaderLogStartOffset, nextOffset);
- } else {
- throw new RemoteStorageException("Couldn't build the state from remote store for partition: " + topicPartition +
- ", currentLeaderEpoch: " + currentLeaderEpoch +
- ", leaderLocalLogStartOffset: " + leaderLocalLogStartOffset +
- ", leaderLogStartOffset: " + leaderLogStartOffset +
- ", epoch: " + targetEpoch +
- "as the previous remote log segment metadata was not found");
- }
- } else {
- // If the tiered storage is not enabled throw an exception back so that it will retry until the tiered storage
- // is set as expected.
- throw new RemoteStorageException("Couldn't build the state from remote store for partition " + topicPartition + ", as remote log storage is not yet enabled");
- }
-
- return nextOffset;
- }
-}
diff --git a/core/src/main/java/kafka/server/TierStateMachine.java b/core/src/main/java/kafka/server/TierStateMachine.java
index 58a44cc..085e6c0 100644
--- a/core/src/main/java/kafka/server/TierStateMachine.java
+++ b/core/src/main/java/kafka/server/TierStateMachine.java
@@ -17,15 +17,65 @@
package kafka.server;
-import java.util.Optional;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import kafka.cluster.Partition;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FetchResponseData.PartitionData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.common.OffsetAndEpoch;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+import org.apache.kafka.storage.internals.log.LogFileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import static org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason.LeaderOffsetIncremented;
/**
- * This interface defines the APIs needed to handle any state transitions related to tiering
+ * This class defines the APIs and implementation needed to handle any state transitions related to tiering
+ *
+ * When started, the tier state machine will fetch the local log start offset of the
+ * leader and then build the follower's remote log aux state until the leader's
+ * local log start offset.
*/
-public interface TierStateMachine {
+public class TierStateMachine {
+ private static final Logger log = LoggerFactory.getLogger(TierStateMachine.class);
+
+ private final LeaderEndPoint leader;
+ private final ReplicaManager replicaMgr;
+ private final boolean useFutureLog;
+ public TierStateMachine(LeaderEndPoint leader,
+ ReplicaManager replicaMgr,
+ boolean useFutureLog) {
+ this.leader = leader;
+ this.replicaMgr = replicaMgr;
+ this.useFutureLog = useFutureLog;
+ }
/**
* Start the tier state machine for the provided topic partition.
@@ -40,19 +90,176 @@
*/
PartitionFetchState start(TopicPartition topicPartition,
PartitionFetchState currentFetchState,
- PartitionData fetchPartitionData) throws Exception;
+ PartitionData fetchPartitionData) throws Exception {
+ OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+ int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
+ long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
+
+ long offsetToFetch;
+ replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+ replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+
+ UnifiedLog unifiedLog;
+ if (useFutureLog) {
+ unifiedLog = replicaMgr.futureLogOrException(topicPartition);
+ } else {
+ unifiedLog = replicaMgr.localLogOrException(topicPartition);
+ }
+
+ try {
+ offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset(), unifiedLog);
+ } catch (RemoteStorageException e) {
+ replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
+ replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
+ throw e;
+ }
+
+ OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+ long leaderEndOffset = fetchLatestOffsetResult.offset();
+
+ long initialLag = leaderEndOffset - offsetToFetch;
+
+ return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+ Fetching$.MODULE$, unifiedLog.latestEpoch());
+
+ }
+
+ private OffsetForLeaderEpochResponseData.EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+ TopicPartition partition,
+ Integer currentLeaderEpoch) {
+ int previousEpoch = epoch - 1;
+
+ // Find the end-offset for the epoch earlier to the given epoch from the leader
+ Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
+ partitionsWithEpochs.put(partition, new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
+ Option<OffsetForLeaderEpochResponseData.EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.mapAsScalaMap(partitionsWithEpochs)).get(partition);
+ if (maybeEpochEndOffset.isEmpty()) {
+ throw new KafkaException("No response received for partition: " + partition);
+ }
+
+ OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
+ if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+ throw Errors.forCode(epochEndOffset.errorCode()).exception();
+ }
+
+ return epochEndOffset;
+ }
+
+ private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
+ RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
+ InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+ try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+ CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
+ return readBuffer.read();
+ }
+ }
+
+ private void buildProducerSnapshotFile(UnifiedLog unifiedLog,
+ long nextOffset,
+ RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+ RemoteLogManager rlm) throws IOException, RemoteStorageException {
+ // Restore producer snapshot
+ File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
+ Path tmpSnapshotFile = Paths.get(snapshotFile.getAbsolutePath() + ".tmp");
+ // Copy it to snapshot file in atomic manner.
+ Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+ tmpSnapshotFile, StandardCopyOption.REPLACE_EXISTING);
+ Utils.atomicMoveWithFallback(tmpSnapshotFile, snapshotFile.toPath(), false);
+
+ // Reload producer snapshots.
+ unifiedLog.producerStateManager().truncateFullyAndReloadSnapshots();
+ unifiedLog.loadProducerState(nextOffset);
+ }
/**
- * Optionally advance the state of the tier state machine, based on the
- * current PartitionFetchState. The decision to advance the tier
- * state machine is implementation specific.
- *
- * @param topicPartition the topic partition
- * @param currentFetchState the current PartitionFetchState which will
- * be used to derive the return value
- *
- * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState
+ * It tries to build the required state for this partition from leader and remote storage so that it can start
+ * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
+ * next offset following the end offset of the remote log portion.
*/
- Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
- PartitionFetchState currentFetchState);
+ private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+ Integer currentLeaderEpoch,
+ Long leaderLocalLogStartOffset,
+ Integer epochForLeaderLocalLogStartOffset,
+ Long leaderLogStartOffset,
+ UnifiedLog unifiedLog) throws IOException, RemoteStorageException {
+
+ if (!unifiedLog.remoteStorageSystemEnable() || !unifiedLog.config().remoteStorageEnable()) {
+ // If the tiered storage is not enabled throw an exception back so that it will retry until the tiered storage
+ // is set as expected.
+ throw new RemoteStorageException("Couldn't build the state from remote store for partition " + topicPartition + ", as remote log storage is not yet enabled");
+ }
+
+ if (replicaMgr.remoteLogManager().isEmpty())
+ throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+ RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+ // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+ // until that offset
+ long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+ int targetEpoch;
+ // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+ // will have the same epoch.
+ if (epochForLeaderLocalLogStartOffset == 0) {
+ targetEpoch = epochForLeaderLocalLogStartOffset;
+ } else {
+ // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+ OffsetForLeaderEpochResponseData.EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
+ // Check if the target offset lies within the range of earlier epoch. Here, epoch's end-offset is exclusive.
+ if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
+ // Always use the leader epoch from returned earlierEpochEndOffset.
+ // This gives the respective leader epoch, that will handle any gaps in epochs.
+ // For ex, leader epoch cache contains:
+ // leader-epoch start-offset
+ // 0 20
+ // 1 85
+ // <2> - gap no messages were appended in this leader epoch.
+ // 3 90
+ // 4 98
+ // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+ // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+ // So, for offset 89, we should return leader epoch as 1 like below.
+ targetEpoch = earlierEpochEndOffset.leaderEpoch();
+ } else {
+ targetEpoch = epochForLeaderLocalLogStartOffset;
+ }
+ }
+
+ RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset)
+ .orElseThrow(() -> new RemoteStorageException("Couldn't build the state from remote store for partition: " + topicPartition +
+ ", currentLeaderEpoch: " + currentLeaderEpoch +
+ ", leaderLocalLogStartOffset: " + leaderLocalLogStartOffset +
+ ", leaderLogStartOffset: " + leaderLogStartOffset +
+ ", epoch: " + targetEpoch +
+ "as the previous remote log segment metadata was not found"));
+
+
+ // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start
+ // segments from (remoteLogSegmentMetadata.endOffset() + 1)
+ // Assign nextOffset with the offset from which next fetch should happen.
+ long nextOffset = remoteLogSegmentMetadata.endOffset() + 1;
+
+ // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+ Partition partition = replicaMgr.getPartitionOrException(topicPartition);
+ partition.truncateFullyAndStartAt(nextOffset, useFutureLog, Option.apply(leaderLogStartOffset));
+ // Increment start offsets
+ unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented);
+ unifiedLog.maybeIncrementLocalLogStartOffset(nextOffset, LeaderOffsetIncremented);
+
+ // Build leader epoch cache.
+ List<EpochEntry> epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata);
+ if (unifiedLog.leaderEpochCache().isDefined()) {
+ unifiedLog.leaderEpochCache().get().assign(epochs);
+ }
+
+ log.info("Updated the epoch cache from remote tier till offset: {} with size: {} for {}", leaderLocalLogStartOffset, epochs.size(), partition);
+
+ buildProducerSnapshotFile(unifiedLog, nextOffset, remoteLogSegmentMetadata, rlm);
+
+ log.debug("Built the leader epoch cache and producer snapshots from remote tier for {}, " +
+ "with active producers size: {}, leaderLogStartOffset: {}, and logEndOffset: {}",
+ partition, unifiedLog.producerStateManager().activeProducers().size(), leaderLogStartOffset, nextOffset);
+
+ return nextOffset;
+ }
}
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index b2121f5..34ab1b0 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -907,9 +907,16 @@
scheduler: Scheduler,
logDirFailureChannel: LogDirFailureChannel,
logPrefix: String): Unit = {
+ System.err.print(s"del")
+ System.err.flush()
segmentsToDelete.foreach { segment =>
+ System.err.print(s"ren:${segment.baseOffset()} $dir")
+ System.err.flush()
if (!segment.hasSuffix(LogFileUtils.DELETED_FILE_SUFFIX))
segment.changeFileSuffixes("", LogFileUtils.DELETED_FILE_SUFFIX)
+
+ System.err.print(s"ren done")
+ System.err.flush()
}
def deleteSegments(): Unit = {
@@ -926,6 +933,9 @@
scheduler.scheduleOnce("delete-file", () => deleteSegments(), config.fileDeleteDelayMs)
else
deleteSegments()
+
+ System.err.print(s"del end")
+ System.err.flush()
}
private[log] def emptyFetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata,
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index f69ec42..fa6f275 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -308,7 +308,11 @@
* Each call of this function will undo one pause.
*/
def resumeCleaning(topicPartitions: Iterable[TopicPartition]): Unit = {
+ System.err.print("lock")
+ System.err.flush()
inLock(lock) {
+ System.err.print("lock get")
+ System.err.flush()
topicPartitions.foreach {
topicPartition =>
inProgress.get(topicPartition) match {
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 3bc6533..4011f6c 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1247,6 +1247,7 @@
// Update the cached map and log cleaner as appropriate.
futureLogs.remove(topicPartition)
currentLogs.put(topicPartition, destLog)
+ System.err.print(s"c:${currentLogs.get(topicPartition)}")
if (cleaner != null) {
sourceLog.foreach { srcLog =>
cleaner.alterCheckpointDir(topicPartition, srcLog.parentDirFile, destLog.parentDirFile)
@@ -1394,6 +1395,8 @@
*/
private def cleanupLogs(): Unit = {
debug("Beginning log cleanup...")
+ System.err.print(s"start clean ")
+ System.err.flush()
var total = 0
val startMs = time.milliseconds
@@ -1409,27 +1412,42 @@
}
}
+ System.err.print(s"c1:${currentLogs.get(new TopicPartition("topicB", 0))}")
+ System.err.flush()
+
try {
deletableLogs.foreach {
case (topicPartition, log) =>
+ if (topicPartition.topic().contains("topicB")) {
+ System.err.print(s"d:${log.dir}")
+ System.err.flush()
+ }
debug(s"Garbage collecting '${log.name}'")
total += log.deleteOldSegments()
val futureLog = futureLogs.get(topicPartition)
if (futureLog != null) {
// clean future logs
+ System.err.print(s"fut:${futureLog.name} ")
+ System.err.flush()
debug(s"Garbage collecting future log '${futureLog.name}'")
total += futureLog.deleteOldSegments()
}
}
} finally {
if (cleaner != null) {
+ System.err.print(s"cleaner ")
+ System.err.flush()
cleaner.resumeCleaning(deletableLogs.map(_._1))
+ System.err.print(s"cleaner end")
+ System.err.flush()
}
}
debug(s"Log cleanup completed. $total files deleted in " +
(time.milliseconds - startMs) / 1000 + " seconds")
+ System.err.print(s"end clean ")
+ System.err.flush()
}
/**
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index eb6fc51..bff2801 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1447,6 +1447,8 @@
reason: SegmentDeletionReason): Int = {
lock synchronized {
val deletable = deletableSegments(predicate)
+ System.err.print(s"toD:${deletable.size}")
+ System.err.flush()
if (deletable.nonEmpty)
deleteSegments(deletable, reason)
else
@@ -1574,6 +1576,10 @@
private def deleteRetentionSizeBreachedSegments(): Int = {
val retentionSize: Long = localRetentionSize(config, remoteLogEnabled())
+ if (name.contains("topicB")) {
+ System.err.print(s"$name $parentDir $size ")
+ System.err.flush()
+ }
if (retentionSize < 0 || size < retentionSize) return 0
var diff = size - retentionSize
def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
@@ -2283,6 +2289,8 @@
logDirFailureChannel: LogDirFailureChannel,
parentDir: String,
topicPartition: TopicPartition): Unit = {
+ System.err.print(s"dels:${segments.size}")
+ System.err.flush()
val snapshotsToDelete = segments.flatMap { segment =>
producerStateManager.removeAndMarkSnapshotForDeletion(segment.baseOffset).asScala
}
@@ -2301,6 +2309,9 @@
scheduler.scheduleOnce("delete-producer-snapshot", () => deleteProducerSnapshots(), config.fileDeleteDelayMs)
else
deleteProducerSnapshots()
+
+ System.err.print(s"dels done")
+ System.err.flush()
}
private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
@@ -2353,17 +2364,21 @@
val retentionMs = UnifiedLog.localRetentionMs(log.config, remoteLogEnabled)
toDelete.foreach { segment =>
if (segment.largestRecordTimestamp.isPresent)
- if (remoteLogEnabled)
+ if (remoteLogEnabled) {
log.info(s"Deleting segment $segment due to local log retention time ${retentionMs}ms breach based on the largest " +
s"record timestamp in the segment")
- else
+ System.err.println(s"Deleting segment $segment due to local log retention time ${retentionMs}ms breach based on the largest " +
+ s"record timestamp in the segment")
+ } else
log.info(s"Deleting segment $segment due to log retention time ${retentionMs}ms breach based on the largest " +
s"record timestamp in the segment")
else {
- if (remoteLogEnabled)
+ if (remoteLogEnabled) {
log.info(s"Deleting segment $segment due to local log retention time ${retentionMs}ms breach based on the " +
s"last modified time of the segment")
- else
+ System.err.println(s"Deleting segment $segment due to local log retention time ${retentionMs}ms breach based on the " +
+ s"last modified time of the segment")
+ } else
log.info(s"Deleting segment $segment due to log retention time ${retentionMs}ms breach based on the " +
s"last modified time of the segment")
}
@@ -2376,8 +2391,13 @@
var size = log.size
toDelete.foreach { segment =>
size -= segment.size
- if (remoteLogEnabled) log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config, remoteLogEnabled)} breach. " +
- s"Local log size after deletion will be $size.")
+ if (remoteLogEnabled) {
+ log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config, remoteLogEnabled)} breach. " +
+ s"Local log size after deletion will be $size.")
+ System.err.println(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config, remoteLogEnabled)} breach. " +
+ s"Local log size after deletion will be $size.")
+ System.err.flush()
+ }
else log.info(s"Deleting segment $segment due to log retention size ${log.config.retentionSize} breach. Log size " +
s"after deletion will be $size.")
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index b2650cc..150ab84 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -34,7 +34,7 @@
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.common.{ClusterResource, KafkaException, TopicPartition, Uuid}
+import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, RecordSerde}
@@ -610,10 +610,6 @@
protected def createRemoteLogManager(): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
- if (config.logDirs.size > 1) {
- throw new KafkaException("Tiered storage is not supported with multiple log dirs.")
- }
-
Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
(tp: TopicPartition) => logManager.getLog(tp).asJava,
(tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index e433741..5cb6535 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1333,9 +1333,6 @@
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, greater than or equal to 1")
require(logDirs.nonEmpty, "At least one log directory must be defined via log.dirs or log.dir.")
- if (isRemoteLogStorageSystemEnabled && logDirs.size > 1) {
- throw new ConfigException(s"Multiple log directories `${logDirs.mkString(",")}` are not supported when remote log storage is enabled")
- }
require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
" to prevent unnecessary socket timeouts")
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 4a490d7..72457d5 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -44,7 +44,7 @@
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
-import org.apache.kafka.common.{Endpoint, KafkaException, Node, TopicPartition}
+import org.apache.kafka.common.{Endpoint, Node, TopicPartition}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag
@@ -683,10 +683,6 @@
protected def createRemoteLogManager(): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
- if (config.logDirs.size > 1) {
- throw new KafkaException("Tiered storage is not supported with multiple log dirs.")
- }
-
Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
(tp: TopicPartition) => logManager.getLog(tp).asJava,
(tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 2ea0939..95c7a5a 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -40,7 +40,7 @@
clientId = name,
leader = leader,
failedPartitions,
- fetchTierStateMachine = new ReplicaAlterLogDirsTierStateMachine(),
+ fetchTierStateMachine = new TierStateMachine(leader, replicaMgr, true),
fetchBackOffMs = fetchBackOffMs,
isInterruptible = false,
brokerTopicStats) {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index c45a5d6..bb07368 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -37,7 +37,7 @@
clientId = name,
leader = leader,
failedPartitions,
- fetchTierStateMachine = new ReplicaFetcherTierStateMachine(leader, replicaMgr),
+ fetchTierStateMachine = new TierStateMachine(leader, replicaMgr, false),
fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
isInterruptible = false,
replicaMgr.brokerTopicStats) {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3549943..37f2df4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -573,6 +573,7 @@
* If no errors occurred, the map will be empty.
*/
private def stopPartitions(partitionsToStop: Set[StopPartition]): Map[TopicPartition, Throwable] = {
+ System.err.print(s"stop:$partitionsToStop")
// First stop fetchers for all partitions.
val partitions = partitionsToStop.map(_.topicPartition)
replicaFetcherManager.removeFetcherForPartitions(partitions)
@@ -703,6 +704,10 @@
getPartitionOrException(topicPartition).futureLog.isDefined
}
+ def futureLogOrException(topicPartition: TopicPartition): UnifiedLog = {
+ getPartitionOrException(topicPartition).futureLocalLogOrException
+ }
+
def localLog(topicPartition: TopicPartition): Option[UnifiedLog] = {
onlinePartition(topicPartition).flatMap(_.log)
}
@@ -1744,8 +1749,8 @@
val leaderLogStartOffset = log.logStartOffset
val leaderLogEndOffset = log.logEndOffset
- if (params.isFromFollower) {
- // If it is from a follower then send the offset metadata only as the data is already available in remote
+ if (params.isFromFollower || params.isFromFuture) {
+ // If it is from a follower or from a future replica, then send the offset metadata only as the data is already available in remote
// storage and throw an error saying that this offset is moved to tiered storage.
createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset,
new OffsetMovedToTieredStorageException("Given offset" + offset + " is moved to tiered storage"))
diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
index 212e60d..d78be03 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
@@ -36,6 +36,7 @@
import org.apache.kafka.security.PasswordEncoderConfigs;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ZooKeeperInternals;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.collection.JavaConverters;
@@ -63,6 +64,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+@Disabled
@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
@ExtendWith(value = ClusterTestExtensions.class)
@ClusterTestDefaults
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 9d03206..b74a5f5 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -33,6 +33,7 @@
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.extension.ExtendWith;
import java.util.Collections;
@@ -47,6 +48,8 @@
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+
+@Disabled
@ClusterTestDefaults(clusterType = Type.ZK, serverProperties = {
@ClusterConfigProperty(key = "default.key", value = "default.value"),
@ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "100"),
diff --git a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index dc3ff0f..907a3ec 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -28,7 +28,7 @@
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentState}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
-import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, Disabled, Tag, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -39,6 +39,7 @@
import scala.concurrent.ExecutionException
import scala.util.Random
+@Disabled
@Tag("integration")
class RemoteTopicCrudTest extends IntegrationTestHarness {
diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index bcfa7f2..e534191 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -33,7 +33,7 @@
import org.apache.kafka.server.config.KafkaSecurityConfigs
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo, Timeout}
import scala.jdk.CollectionConverters._
import scala.collection.Seq
@@ -46,6 +46,7 @@
* time to the build. However, if an admin API involves differing interactions with
* authentication/authorization layers, we may add the test case here.
*/
+@Disabled
@Timeout(120)
abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logging {
def brokerCount = 3
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 9818b6d..06e53ef 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -34,11 +34,12 @@
import org.apache.kafka.server.config.{KafkaSecurityConfigs, QuotaConfigs}
import org.apache.kafka.server.quota._
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
+@Disabled
class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
override protected def securityProtocol = SecurityProtocol.SASL_SSL
diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index 0be5e30..ee57746 100644
--- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -14,7 +14,6 @@
import java.util
import java.util.Properties
-
import kafka.security.authorizer.AclAuthorizer
import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
@@ -29,7 +28,7 @@
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
import scala.jdk.CollectionConverters._
@@ -73,6 +72,7 @@
}
}
+@Disabled
class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslSetup {
import DescribeAuthorizedOperationsTest._
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 60dee25..1d9e13c 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -43,7 +43,7 @@
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
@@ -68,6 +68,7 @@
* would end up with QuorumTestHarness twice.
*/
@Timeout(60)
+@Disabled
abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
override val brokerCount = 3
diff --git a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
index 17f12ab..329279e 100644
--- a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
@@ -23,7 +23,7 @@
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.config.ServerLogConfigs
-import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, Disabled, TestInfo}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -32,6 +32,7 @@
* Tests where the broker is configured to use LogAppendTime. For tests where LogAppendTime is configured via topic
* level configs, see the *ProducerSendTest classes.
*/
+@Disabled
class LogAppendTimeTest extends IntegrationTestHarness {
val producerCount: Int = 1
val consumerCount: Int = 1
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 149d3e9..67a2320 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -28,7 +28,7 @@
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -36,6 +36,7 @@
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
+@Disabled
class MetricsTest extends IntegrationTestHarness with SaslSetup {
override val brokerCount = 1
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 2133019..6fc9e1c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -69,6 +69,7 @@
*
* Also see [[org.apache.kafka.clients.admin.KafkaAdminClientTest]] for unit tests of the admin client.
*/
+@Disabled
class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
import PlaintextAdminIntegrationTest._
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
index 365a872..865d2b5 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
@@ -22,6 +22,7 @@
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Disabled
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -30,7 +31,7 @@
import java.util.Optional
import scala.jdk.CollectionConverters._
-
+@Disabled
class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
val producerCount: Int = 1
val brokerCount: Int = 2
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index 47e8de0..7fe5dcf 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -33,7 +33,7 @@
import org.apache.kafka.server.config.ZkConfigs
import org.apache.kafka.storage.internals.log.LogConfig
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
import java.util.Collections
import scala.jdk.CollectionConverters._
@@ -42,6 +42,7 @@
import scala.concurrent.ExecutionException
import scala.util.{Failure, Success, Try}
+@Disabled
class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup {
val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index fa13715..406dcfc 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -29,6 +29,7 @@
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Disabled
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -36,6 +37,7 @@
import scala.jdk.CollectionConverters._
import scala.collection.mutable
+@Disabled
class TransactionsBounceTest extends IntegrationTestHarness {
private val consumeRecordTimeout = 30000
private val producerBufferSize = 65536
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 993e31d..4c4b58b 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -28,7 +28,7 @@
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -44,6 +44,7 @@
import scala.concurrent.ExecutionException
import scala.jdk.CollectionConverters._
+@Disabled
class TransactionsTest extends IntegrationTestHarness {
override def brokerCount = 3
diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
index b66d536..0ed7002 100644
--- a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
@@ -54,6 +54,7 @@
}
}
+@Disabled
@ClusterTestDefaults(serverProperties = Array(
new ClusterConfigProperty(key = "transaction.state.log.num.partitions", value = "1")
))
diff --git a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
index c9f3c46..8013dbc 100644
--- a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
@@ -36,6 +36,7 @@
import java.util.concurrent.{Executors, TimeUnit}
import scala.jdk.CollectionConverters._
+@Disabled
class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
val numNodes = 2
val numParts = 1
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index dde38e6..284d57a 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -38,10 +38,11 @@
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.config.KafkaSecurityConfigs
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
import scala.jdk.CollectionConverters._
+@Disabled
class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
override val brokerCount = 1
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
diff --git a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
index 8ed6064..31049c2 100644
--- a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
@@ -29,7 +29,7 @@
import org.apache.kafka.server.config.{KRaftConfigs, ZkConfigs}
import org.junit.jupiter.api.Assertions.{assertThrows, fail}
import org.junit.jupiter.api.extension.ExtendWith
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
import java.util.Optional
import java.util.concurrent.{TimeUnit, TimeoutException}
@@ -42,6 +42,7 @@
* failure paths is to use timeouts. See {@link unit.kafka.server.BrokerRegistrationRequestTest} for integration test
* of just the broker registration path.
*/
+@Disabled
@Timeout(120)
@Tag("integration")
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
diff --git a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
index 5640f43..5b4156f 100644
--- a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
@@ -25,10 +25,12 @@
import org.apache.kafka.clients.admin.{FeatureUpdate, UpdateFeaturesOptions}
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
+@Disabled
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class MetadataVersionIntegrationTest {
@ClusterTests(value = Array(
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 6e4cd7d..f803669 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -51,7 +51,7 @@
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ServerLogConfigs, ZkConfigs}
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue, fail}
-import org.junit.jupiter.api.{Assumptions, Timeout}
+import org.junit.jupiter.api.{Assumptions, Disabled, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.function.Executable
import org.slf4j.{Logger, LoggerFactory}
@@ -59,7 +59,6 @@
import java.util
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
import java.util.{Collections, Optional, UUID}
-import scala.collection.Seq
import scala.jdk.CollectionConverters._
object ZkMigrationIntegrationTest {
@@ -90,6 +89,7 @@
}
}
+@Disabled
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@Timeout(300)
class ZkMigrationIntegrationTest {
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index 11b7ceb..f975061 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -33,7 +33,6 @@
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.Mockito.{mock, verify, when}
-import java.util.Optional
import scala.collection.{Map, Set, mutable}
import scala.jdk.CollectionConverters._
@@ -313,12 +312,10 @@
override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0)
}
- private class MockResizeFetcherTierStateMachine extends TierStateMachine {
+ private class MockResizeFetcherTierStateMachine extends TierStateMachine(null, null, false) {
override def start(topicPartition: TopicPartition, currentFetchState: PartitionFetchState, fetchPartitionData: PartitionData): PartitionFetchState = {
throw new UnsupportedOperationException("Materializing tier state is not supported in this test.")
}
-
- override def maybeAdvanceState(tp: TopicPartition, currentFetchState: PartitionFetchState): Optional[PartitionFetchState] = Optional.empty[PartitionFetchState]
}
private class TestResizeFetcherThread(sourceBroker: BrokerEndPoint, failedPartitions: FailedPartitions, fetchTierStateMachine: TierStateMachine)
diff --git a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
index 5cb5957..2c7a17a 100644
--- a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
@@ -28,8 +28,9 @@
import org.apache.kafka.server.common.ProducerIdsBlock
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.extension.ExtendWith
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
+@Disabled
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT)
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index de5720f..b22afe4 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -25,10 +25,12 @@
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.extension.ExtendWith
// TODO: Introduce template in ClusterTests https://issues.apache.org/jira/browse/KAFKA-16595
// currently we can't apply template in ClusterTests hence we see bunch of duplicate settings in ClusterTests
+@Disabled
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(brokers = 1)
class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
diff --git a/core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala
index f17b27e..5f5b963 100644
--- a/core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala
@@ -31,6 +31,7 @@
import scala.collection.Seq
import scala.jdk.CollectionConverters._
+
class BaseFetchRequestTest extends BaseRequestTest {
protected var producer: KafkaProducer[String, String] = _
diff --git a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
index 1e3adc3..ce1f239 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
@@ -22,12 +22,13 @@
import kafka.test.junit.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.{AfterEach, Disabled}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
+@Disabled
@ClusterTestDefaults(clusterType = Type.ALL)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class BrokerMetricNamesTest(cluster: ClusterInstance) {
diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index 6c6199f..33318e0 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -35,13 +35,14 @@
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.extension.ExtendWith
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
/**
* This test simulates a broker registering with the KRaft quorum under different configurations.
*/
+@Disabled
@Timeout(120)
@Tag("integration")
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index ed80f80..b5a8e6a 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -31,11 +31,12 @@
import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
import org.apache.kafka.server.config.{QuotaConfigs, ZooKeeperInternals}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Disabled, Tag}
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
+@Disabled
@ClusterTestDefaults(clusterType = Type.ALL)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@Tag("integration")
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 250cc83..cb87b0a 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -25,13 +25,13 @@
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNotNull}
-import org.junit.jupiter.api.Tag
-import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
import java.util.stream.Collectors
import scala.jdk.CollectionConverters._
+@Disabled
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
index 03eddbe..aca930e 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
@@ -25,10 +25,10 @@
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Tag
-import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
+@Disabled
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
index df8864b..a0531fa 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
@@ -27,12 +27,13 @@
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
import java.util
import scala.concurrent.ExecutionException
import scala.jdk.CollectionConverters._
+@Disabled
class DelegationTokenRequestsTest extends IntegrationTestHarness with SaslSetup {
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
private val kafkaClientSaslMechanism = "PLAIN"
diff --git a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
index 92cea4c..230d8e8 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
@@ -23,9 +23,10 @@
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
+@Disabled
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
index 45f967f..e4d64ec 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
@@ -23,11 +23,12 @@
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
+@Disabled
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
index 2a94655..6f9bec9 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
@@ -25,12 +25,13 @@
import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
+@Disabled
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT)
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 2411e61..63db3ee 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -26,6 +26,7 @@
import org.apache.kafka.common.{IsolationLevel, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.record.BrokerCompressionType
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Disabled
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -40,6 +41,7 @@
* Subclasses of `BaseConsumerTest` exercise the consumer and fetch request/response. This class
* complements those classes with tests that require lower-level access to the protocol.
*/
+@Disabled
class FetchRequestTest extends BaseFetchRequestTest {
@ParameterizedTest
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestWithLegacyMessageFormatTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestWithLegacyMessageFormatTest.scala
index b907384..73b571c 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestWithLegacyMessageFormatTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestWithLegacyMessageFormatTest.scala
@@ -22,7 +22,7 @@
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
import org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.{Disabled, Test}
import java.util.Properties
import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_2_IV0
@@ -31,6 +31,7 @@
import scala.collection.Seq
import scala.jdk.CollectionConverters._
+@Disabled
class FetchRequestWithLegacyMessageFormatTest extends BaseFetchRequestTest {
override def brokerPropertyOverrides(properties: Properties): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
index 80a308e..b6794c6 100644
--- a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
@@ -25,13 +25,14 @@
import org.apache.kafka.common.message.SyncGroupRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
import java.util.Collections
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
+@Disabled
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
index 21c0de1..7ffad0d 100644
--- a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
@@ -27,7 +27,7 @@
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
import java.util.Collections
@@ -36,6 +36,7 @@
import scala.concurrent.{Await, Future}
import scala.jdk.CollectionConverters._
+@Disabled
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e67bcbb..a59d07b 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1859,20 +1859,13 @@
}
@Test
- def testMultipleLogDirectoriesNotSupportedWithRemoteLogStorage(): Unit = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, String.valueOf(true))
- props.put(ServerLogConfigs.LOG_DIRS_CONFIG, "/tmp/a,/tmp/b")
-
- val caught = assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
- assertTrue(caught.getMessage.contains("Multiple log directories `/tmp/a,/tmp/b` are not supported when remote log storage is enabled"))
- }
-
- @Test
def testSingleLogDirectoryWithRemoteLogStorage(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, String.valueOf(true))
props.put(ServerLogConfigs.LOG_DIRS_CONFIG, "/tmp/a")
assertDoesNotThrow(() => KafkaConfig.fromProps(props))
+
+ props.put(ServerLogConfigs.LOG_DIRS_CONFIG, "/tmp/a,/tmp/b")
+ assertDoesNotThrow(() => KafkaConfig.fromProps(props))
}
}
diff --git a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
index 3eaff93..d9144ec 100644
--- a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
@@ -23,9 +23,10 @@
import org.apache.kafka.common.requests.JoinGroupRequest
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
+@Disabled
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
index c03dffc..c80de1b 100644
--- a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
@@ -25,9 +25,10 @@
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import org.apache.kafka.coordinator.group.Group
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
+@Disabled
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala b/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala
index 9e22560..86df92d 100644
--- a/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala
+++ b/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala
@@ -20,9 +20,7 @@
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.FetchResponseData
-import java.util.Optional
-
-class MockTierStateMachine(leader: LeaderEndPoint) extends ReplicaFetcherTierStateMachine(leader, null) {
+class MockTierStateMachine(leader: LeaderEndPoint) extends TierStateMachine(leader, null, false) {
var fetcher: MockFetcherThread = _
@@ -37,11 +35,6 @@
Fetching, Some(currentFetchState.currentLeaderEpoch))
}
- override def maybeAdvanceState(topicPartition: TopicPartition,
- currentFetchState: PartitionFetchState): Optional[PartitionFetchState] = {
- Optional.of(currentFetchState)
- }
-
def setFetcher(mockFetcherThread: MockFetcherThread): Unit = {
fetcher = mockFetcherThread
}
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
index 346b170..8f78989 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
@@ -21,9 +21,10 @@
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.junit.jupiter.api.Assertions.fail
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
+@Disabled
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
index 8b4561d..ecfcd8e 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
@@ -21,9 +21,10 @@
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.junit.jupiter.api.Assertions.fail
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
+@Disabled
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 79ec402..80a5acb 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -20,18 +20,16 @@
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
-
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.OffsetFetchResponseData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
-
import scala.jdk.CollectionConverters._
+@Disabled
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 34a165c..809e00a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -17,7 +17,7 @@
package kafka.server
-import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.{AfterEach, Disabled}
import kafka.utils.TestUtils
import TestUtils._
import kafka.api.IntegrationTestHarness
@@ -27,6 +27,7 @@
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
+@Disabled
class ReplicaFetchTest extends IntegrationTestHarness {
val topic1 = "foo"
val topic2 = "bar"
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index ab25864..e443999 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3285,12 +3285,8 @@
val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath
- if (enableRemoteStorage) {
- props.put("log.dirs", path1)
- props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, enableRemoteStorage.toString)
- } else {
- props.put("log.dirs", path1 + "," + path2)
- }
+ props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, enableRemoteStorage.toString)
+ props.put("log.dirs", path1 + "," + path2)
propsModifier.apply(props)
val config = KafkaConfig.fromProps(props)
val logProps = new Properties()
@@ -4981,9 +4977,8 @@
assertEquals(followerPartitions, actualFollowerPartitions)
}
- // KAFKA-16031: Enabling remote storage after JBOD is supported in tiered storage
@ParameterizedTest
- @ValueSource(booleans = Array(false))
+ @ValueSource(booleans = Array(true, false))
def testApplyDeltaShouldHandleReplicaAssignedToOnlineDirectory(enableRemoteStorage: Boolean): Unit = {
val localId = 1
val topicPartition0 = new TopicPartition("foo", 0)
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 4b7e129..85fa37c 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -34,7 +34,7 @@
import org.apache.kafka.server.config.KafkaSecurityConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled}
import scala.jdk.CollectionConverters._
@@ -69,6 +69,7 @@
}
}
+@Disabled
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
private var sasl: SaslSetup = _
diff --git a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
index ac0d068..be7a4a9 100644
--- a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
@@ -25,7 +25,7 @@
import org.apache.kafka.common.message.SyncGroupRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
import java.util.Collections
@@ -33,6 +33,7 @@
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
+@Disabled
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala b/core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala
similarity index 93%
rename from core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala
rename to core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala
index af7d1ce..139aeb0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala
@@ -23,20 +23,21 @@
import org.apache.kafka.common.record._
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
import kafka.server.FetcherThreadTestUtils.{initialFetchState, mkBatch}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.collection.Map
-class ReplicaFetcherTierStateMachineTest {
+class TierStateMachineTest {
- val truncateOnFetch = true
val topicIds = Map("topic1" -> Uuid.randomUuid(), "topic2" -> Uuid.randomUuid())
val version = ApiKeys.FETCH.latestVersion()
private val failedPartitions = new FailedPartitions
- @Test
- def testFollowerFetchMovedToTieredStore(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testFollowerFetchMovedToTieredStore(truncateOnFetch: Boolean): Unit = {
val partition = new TopicPartition("topic", 0)
val replicaLog = Seq(
@@ -94,8 +95,9 @@
* tiered storage as well. Hence, `X < globalLogStartOffset`.
* 4. Follower comes online and tries to fetch X from leader.
*/
- @Test
- def testFollowerFetchOffsetOutOfRangeWithTieredStore(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testFollowerFetchOffsetOutOfRangeWithTieredStore(truncateOnFetch: Boolean): Unit = {
val partition = new TopicPartition("topic", 0)
val replicaLog = Seq(
@@ -105,7 +107,7 @@
val replicaState = PartitionState(replicaLog, leaderEpoch = 7, highWatermark = 0L, rlmEnabled = true)
- val mockLeaderEndpoint = new MockLeaderEndPoint
+ val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine)
@@ -153,8 +155,9 @@
assertEquals(11L, replicaState.logEndOffset)
}
- @Test
- def testFencedOffsetResetAfterMovedToRemoteTier(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testFencedOffsetResetAfterMovedToRemoteTier(truncateOnFetch: Boolean): Unit = {
val partition = new TopicPartition("topic", 0)
var isErrorHandled = false
val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
@@ -189,4 +192,5 @@
assertTrue(fetcher.fetchState(partition).isEmpty)
assertTrue(failedPartitions.contains(partition))
}
-}
+
+}
\ No newline at end of file
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
index ed310f4..2b41c03 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
@@ -107,7 +107,7 @@
long offset = recordMetadata.offset();
long startTimeMs = time.milliseconds();
long consumeCheckIntervalMs = Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs);
- log.info("Wait until the consumer is caught up with the target partition {} up-to offset {}", partition, offset);
+// log.info("Wait until the consumer is caught up with the target partition {} up-to offset {}", partition, offset);
while (true) {
long readOffset = consumerTask.readOffsetForMetadataPartition(partition).orElse(-1L);
if (readOffset >= offset) {
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java
index 700a8e4..4af8cd1 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java
@@ -22,6 +22,8 @@
import java.util.Objects;
import java.util.Optional;
+import static org.apache.kafka.common.requests.FetchRequest.FUTURE_LOCAL_REPLICA_ID;
+
public class FetchParams {
public final short requestVersion;
public final int replicaId;
@@ -56,6 +58,10 @@
return FetchRequest.isValidBrokerId(replicaId);
}
+ public boolean isFromFuture() {
+ return replicaId == FUTURE_LOCAL_REPLICA_ID;
+ }
+
public boolean isFromConsumer() {
return FetchRequest.isConsumer(replicaId);
}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index 104eb9a..fb9c969 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -804,6 +804,13 @@
// Helper method for `deleteIfExists()`
private Void deleteTypeIfExists(StorageAction<Boolean, IOException> delete, String fileType, File file, boolean logIfMissing) throws IOException {
try {
+// try {
+// Random r = new Random();
+// int s = r.nextInt( 2000);
+// Thread.sleep(s);
+// } catch (InterruptedException e) {
+// throw new RuntimeException(e);
+// }
if (delete.execute())
LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
else {
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
index abad6ea..438bb98 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
@@ -23,6 +23,7 @@
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Disabled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +46,7 @@
/**
* A test harness class that brings up 3 brokers and registers {@link TopicBasedRemoteLogMetadataManager} on broker with id as 0.
*/
+@Disabled
public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHarness {
private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerHarness.class);
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
index 6272d0e..7f27598 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.tiered.storage;
+import org.apache.kafka.tiered.storage.actions.AlterLogDirAction;
import org.apache.kafka.tiered.storage.actions.BounceBrokerAction;
import org.apache.kafka.tiered.storage.actions.ConsumeAction;
import org.apache.kafka.tiered.storage.actions.CreatePartitionsAction;
@@ -313,6 +314,14 @@
return this;
}
+ public TieredStorageTestBuilder alterLogDir(String topic,
+ Integer partition,
+ int replicaIds) {
+ TopicPartition topicPartition = new TopicPartition(topic, partition);
+ actions.add(new AlterLogDirAction(topicPartition, replicaIds));
+ return this;
+ }
+
public TieredStorageTestBuilder expectUserTopicMappedToMetadataPartitions(String topic,
List<Integer> metadataPartitions) {
actions.add(new ExpectUserTopicMappedToMetadataPartitionsAction(topic, metadataPartitions));
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
index 274bbf8..5d0172c 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
@@ -102,7 +102,7 @@
// NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName() in the ParameterizedTest name.
@ParameterizedTest(name = "{displayName}.quorum={0}")
- @ValueSource(strings = {"zk", "kraft"})
+ @ValueSource(strings = {"kraft"})
public void executeTieredStorageTest(String quorum) {
TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
writeTestSpecifications(builder);
@@ -154,7 +154,7 @@
@SuppressWarnings("deprecation")
public static List<BrokerLocalStorage> localStorages(Seq<KafkaBroker> brokers) {
return JavaConverters.seqAsJavaList(brokers).stream()
- .map(b -> new BrokerLocalStorage(b.config().brokerId(), b.config().logDirs().head(),
+ .map(b -> new BrokerLocalStorage(b.config().brokerId(), JavaConverters.setAsJavaSet(b.config().logDirs().toSet()),
STORAGE_WAIT_TIMEOUT_SEC))
.collect(Collectors.toList());
}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java
new file mode 100644
index 0000000..34f4c7d
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.actions;
+
+import org.apache.kafka.clients.admin.AlterReplicaLogDirsResult;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public final class AlterLogDirAction implements TieredStorageTestAction {
+
+ private final TopicPartition topicPartition;
+ private final int brokerId;
+
+ public AlterLogDirAction(TopicPartition topicPartition,
+ int brokerId) {
+ this.topicPartition = topicPartition;
+ this.brokerId = brokerId;
+ }
+
+ @Override
+ public void doExecute(TieredStorageTestContext context) throws InterruptedException, ExecutionException, TimeoutException {
+ Optional<BrokerLocalStorage> localStorage = context.localStorages().stream().filter(storage -> storage.getBrokerId() == brokerId).findFirst();
+ if (!localStorage.isPresent()) {
+ throw new IllegalArgumentException("cannot find local storage for this topic partition:" + topicPartition + " in this broker id:" + brokerId);
+ }
+
+ Optional<File> sourceDir = localStorage.get().getBrokerStorageDirectories().stream().filter(dir -> localStorage.get().dirContainsTopicPartition(topicPartition, dir)).findFirst();
+ if (!sourceDir.isPresent()) {
+ throw new IllegalArgumentException("No log dir with topic partition:" + topicPartition + " in this broker id:" + brokerId);
+ }
+ Optional<File> targetDir = localStorage.get().getBrokerStorageDirectories().stream().filter(dir -> !localStorage.get().dirContainsTopicPartition(topicPartition, dir)).findFirst();
+ if (!targetDir.isPresent()) {
+ throw new IllegalArgumentException("No log dir without topic partition:" + topicPartition + " in this broker id:" + brokerId);
+ }
+
+ // build alterReplicaLogDirs request content to move from sourceDir to targetDir
+ TopicPartitionReplica topicPartitionReplica = new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), brokerId);
+ Map<TopicPartitionReplica, String> logDirs = Collections.singletonMap(topicPartitionReplica, targetDir.get().getAbsolutePath());
+
+ AlterReplicaLogDirsResult results = context.admin().alterReplicaLogDirs(logDirs);
+ results.values().get(topicPartitionReplica).get(30, TimeUnit.SECONDS);
+
+ // wait until the topic partition folder disappears from source dir and appears in the target dir
+ TestUtils.waitForCondition(() -> localStorage.get().dirContainsTopicPartition(topicPartition, targetDir.get()) &&
+ !localStorage.get().dirContainsTopicPartition(topicPartition, sourceDir.get()),
+ "Failed to alter dir:" + logDirs);
+ }
+
+ @Override
+ public void describe(PrintStream output) {
+ output.print("alter dir for topic partition:" + topicPartition + " in this broker id:" + brokerId);
+ }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java
new file mode 100644
index 0000000..5a7e516
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest extends TieredStorageTestHarness {
+
+ @Override
+ public int brokerCount() {
+ return 1;
+ }
+
+ @Override
+ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ final String topicB = "topicB";
+ final int p0 = 0;
+ final int partitionCount = 1;
+ final int replicationFactor = 1;
+ final int maxBatchCountPerSegment = 1;
+ final boolean enableRemoteLogStorage = true;
+ final int broker0 = 0;
+
+ builder
+ // create topicB with 1 partition and 1 RF
+ .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+ mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+ // send records to partition 0
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+ .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // alter dir within the replica, we only expect one replicaId
+ .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+ .expectLeader(topicB, p0, broker0, false)
+ // produce some more events and verify the earliest local offset
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+ .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+ // consume from the beginning of the topic to read data from local and remote storage
+ .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+ .consume(topicB, p0, 0L, 4, 3);
+ }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest10.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest10.java
new file mode 100644
index 0000000..f27b3a7
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest10.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest10 extends TieredStorageTestHarness {
+
+ @Override
+ public int brokerCount() {
+ return 1;
+ }
+
+ @Override
+ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ final String topicB = "topicB";
+ final int p0 = 0;
+ final int partitionCount = 1;
+ final int replicationFactor = 1;
+ final int maxBatchCountPerSegment = 1;
+ final boolean enableRemoteLogStorage = true;
+ final int broker0 = 0;
+
+ builder
+ // create topicB with 1 partition and 1 RF
+ .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+ mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+ // send records to partition 0
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+ .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // alter dir within the replica, we only expect one replicaId
+ .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+ .expectLeader(topicB, p0, broker0, false)
+ // produce some more events and verify the earliest local offset
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+ .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+ // consume from the beginning of the topic to read data from local and remote storage
+ .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+ .consume(topicB, p0, 0L, 4, 3);
+ }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest2.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest2.java
new file mode 100644
index 0000000..f563d8d
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest2.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest2 extends TieredStorageTestHarness {
+
+ @Override
+ public int brokerCount() {
+ return 1;
+ }
+
+ @Override
+ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ final String topicB = "topicB";
+ final int p0 = 0;
+ final int partitionCount = 1;
+ final int replicationFactor = 1;
+ final int maxBatchCountPerSegment = 1;
+ final boolean enableRemoteLogStorage = true;
+ final int broker0 = 0;
+
+ builder
+ // create topicB with 1 partition and 1 RF
+ .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+ mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+ // send records to partition 0
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+ .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // alter dir within the replica, we only expect one replicaId
+ .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+ .expectLeader(topicB, p0, broker0, false)
+ // produce some more events and verify the earliest local offset
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+ .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+ // consume from the beginning of the topic to read data from local and remote storage
+ .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+ .consume(topicB, p0, 0L, 4, 3);
+ }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest3.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest3.java
new file mode 100644
index 0000000..b431c4c
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest3.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest3 extends TieredStorageTestHarness {
+
+ @Override
+ public int brokerCount() {
+ return 1;
+ }
+
+ @Override
+ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ final String topicB = "topicB";
+ final int p0 = 0;
+ final int partitionCount = 1;
+ final int replicationFactor = 1;
+ final int maxBatchCountPerSegment = 1;
+ final boolean enableRemoteLogStorage = true;
+ final int broker0 = 0;
+
+ builder
+ // create topicB with 1 partition and 1 RF
+ .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+ mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+ // send records to partition 0
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+ .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // alter dir within the replica, we only expect one replicaId
+ .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+ .expectLeader(topicB, p0, broker0, false)
+ // produce some more events and verify the earliest local offset
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+ .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+ // consume from the beginning of the topic to read data from local and remote storage
+ .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+ .consume(topicB, p0, 0L, 4, 3);
+ }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest4.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest4.java
new file mode 100644
index 0000000..a569c09
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest4.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest4 extends TieredStorageTestHarness {
+
+ @Override
+ public int brokerCount() {
+ return 1;
+ }
+
+ @Override
+ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ final String topicB = "topicB";
+ final int p0 = 0;
+ final int partitionCount = 1;
+ final int replicationFactor = 1;
+ final int maxBatchCountPerSegment = 1;
+ final boolean enableRemoteLogStorage = true;
+ final int broker0 = 0;
+
+ builder
+ // create topicB with 1 partition and 1 RF
+ .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+ mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+ // send records to partition 0
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+ .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // alter dir within the replica, we only expect one replicaId
+ .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+ .expectLeader(topicB, p0, broker0, false)
+ // produce some more events and verify the earliest local offset
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+ .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+ // consume from the beginning of the topic to read data from local and remote storage
+ .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+ .consume(topicB, p0, 0L, 4, 3);
+ }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest5.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest5.java
new file mode 100644
index 0000000..10ef503
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest5.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest5 extends TieredStorageTestHarness {
+
+ @Override
+ public int brokerCount() {
+ return 1;
+ }
+
+ @Override
+ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ final String topicB = "topicB";
+ final int p0 = 0;
+ final int partitionCount = 1;
+ final int replicationFactor = 1;
+ final int maxBatchCountPerSegment = 1;
+ final boolean enableRemoteLogStorage = true;
+ final int broker0 = 0;
+
+ builder
+ // create topicB with 1 partition and 1 RF
+ .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+ mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+ // send records to partition 0
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+ .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // alter dir within the replica, we only expect one replicaId
+ .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+ .expectLeader(topicB, p0, broker0, false)
+ // produce some more events and verify the earliest local offset
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+ .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+ // consume from the beginning of the topic to read data from local and remote storage
+ .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+ .consume(topicB, p0, 0L, 4, 3);
+ }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest6.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest6.java
new file mode 100644
index 0000000..f46ed82
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest6.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest6 extends TieredStorageTestHarness {
+
+ @Override
+ public int brokerCount() {
+ return 1;
+ }
+
+ @Override
+ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ final String topicB = "topicB";
+ final int p0 = 0;
+ final int partitionCount = 1;
+ final int replicationFactor = 1;
+ final int maxBatchCountPerSegment = 1;
+ final boolean enableRemoteLogStorage = true;
+ final int broker0 = 0;
+
+ builder
+ // create topicB with 1 partition and 1 RF
+ .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+ mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+ // send records to partition 0
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+ .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // alter dir within the replica, we only expect one replicaId
+ .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+ .expectLeader(topicB, p0, broker0, false)
+ // produce some more events and verify the earliest local offset
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+ .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+ // consume from the beginning of the topic to read data from local and remote storage
+ .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+ .consume(topicB, p0, 0L, 4, 3);
+ }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest7.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest7.java
new file mode 100644
index 0000000..74a2ab2
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest7.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest7 extends TieredStorageTestHarness {
+
+ @Override
+ public int brokerCount() {
+ return 1;
+ }
+
+ @Override
+ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ final String topicB = "topicB";
+ final int p0 = 0;
+ final int partitionCount = 1;
+ final int replicationFactor = 1;
+ final int maxBatchCountPerSegment = 1;
+ final boolean enableRemoteLogStorage = true;
+ final int broker0 = 0;
+
+ builder
+ // create topicB with 1 partition and 1 RF
+ .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+ mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+ // send records to partition 0
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+ .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // alter dir within the replica, we only expect one replicaId
+ .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+ .expectLeader(topicB, p0, broker0, false)
+ // produce some more events and verify the earliest local offset
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+ .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+ // consume from the beginning of the topic to read data from local and remote storage
+ .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+ .consume(topicB, p0, 0L, 4, 3);
+ }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest8.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest8.java
new file mode 100644
index 0000000..b693d08
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest8.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest8 extends TieredStorageTestHarness {
+
+ @Override
+ public int brokerCount() {
+ return 1;
+ }
+
+ @Override
+ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ final String topicB = "topicB";
+ final int p0 = 0;
+ final int partitionCount = 1;
+ final int replicationFactor = 1;
+ final int maxBatchCountPerSegment = 1;
+ final boolean enableRemoteLogStorage = true;
+ final int broker0 = 0;
+
+ builder
+ // create topicB with 1 partition and 1 RF
+ .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+ mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+ // send records to partition 0
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+ .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // alter dir within the replica, we only expect one replicaId
+ .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+ .expectLeader(topicB, p0, broker0, false)
+ // produce some more events and verify the earliest local offset
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+ .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+ // consume from the beginning of the topic to read data from local and remote storage
+ .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+ .consume(topicB, p0, 0L, 4, 3);
+ }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest9.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest9.java
new file mode 100644
index 0000000..871b5c3
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest9.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest9 extends TieredStorageTestHarness {
+
+ @Override
+ public int brokerCount() {
+ return 1;
+ }
+
+ @Override
+ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ final String topicB = "topicB";
+ final int p0 = 0;
+ final int partitionCount = 1;
+ final int replicationFactor = 1;
+ final int maxBatchCountPerSegment = 1;
+ final boolean enableRemoteLogStorage = true;
+ final int broker0 = 0;
+
+ builder
+ // create topicB with 1 partition and 1 RF
+ .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+ mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+ // send records to partition 0
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+ .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // alter dir within the replica, we only expect one replicaId
+ .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+ .expectLeader(topicB, p0, broker0, false)
+ // produce some more events and verify the earliest local offset
+ .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+ .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+ // consume from the beginning of the topic to read data from local and remote storage
+ .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+ .consume(topicB, p0, 0L, 4, 3);
+ }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsDueToLogStartOffsetBreachTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsDueToLogStartOffsetBreachTest.java
index 5f10df5..8bfc0f5 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsDueToLogStartOffsetBreachTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsDueToLogStartOffsetBreachTest.java
@@ -72,6 +72,7 @@
.expectFetchFromTieredStorage(broker0, topicA, p0, 1)
.consume(topicA, p0, 3L, 2, 1)
+
// switch leader to change the leader-epoch from 0 to 1
.expectLeader(topicA, p0, broker1, true)
// produce some more messages and move the log-start-offset such that earliest-epoch changes from 0 to 1
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
index 1e97360..207f9bd 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
@@ -74,7 +74,7 @@
public void maybeWaitForAtLeastOneSegmentUpload(scala.collection.Seq<TopicPartition> topicPartitions) {
JavaConverters.seqAsJavaList(topicPartitions).forEach(topicPartition -> {
List<BrokerLocalStorage> localStorages = JavaConverters.bufferAsJavaList(brokers()).stream()
- .map(b -> new BrokerLocalStorage(b.config().brokerId(), b.config().logDirs().head(), STORAGE_WAIT_TIMEOUT_SEC))
+ .map(b -> new BrokerLocalStorage(b.config().brokerId(), JavaConverters.setAsJavaSet(b.config().logDirs().toSet()), STORAGE_WAIT_TIMEOUT_SEC))
.collect(Collectors.toList());
localStorages
.stream()
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
index 2d26e43..e641d06 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
@@ -31,6 +31,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -38,17 +39,17 @@
public final class BrokerLocalStorage {
private final Integer brokerId;
- private final File brokerStorageDirectory;
+ private final Set<File> brokerStorageDirectories;
private final Integer storageWaitTimeoutSec;
private final int storagePollPeriodSec = 1;
private final Time time = Time.SYSTEM;
public BrokerLocalStorage(Integer brokerId,
- String storageDirname,
+ Set<String> storageDirnames,
Integer storageWaitTimeoutSec) {
this.brokerId = brokerId;
- this.brokerStorageDirectory = new File(storageDirname);
+ this.brokerStorageDirectories = storageDirnames.stream().map(File::new).collect(Collectors.toSet());
this.storageWaitTimeoutSec = storageWaitTimeoutSec;
}
@@ -56,6 +57,10 @@
return brokerId;
}
+ public Set<File> getBrokerStorageDirectories() {
+ return brokerStorageDirectories;
+ }
+
/**
* Wait until the first segment offset in Apache Kafka storage for the given topic-partition is
* equal to the provided offset.
@@ -141,7 +146,12 @@
if (offsetToSearch.equals(firstLogFileBaseOffset)) {
return true;
}
- File partitionDir = new File(brokerStorageDirectory.getAbsolutePath(), topicPartition.toString());
+ File logDir = brokerStorageDirectories.stream()
+ .filter(dir -> dirContainsTopicPartition(topicPartition, dir))
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException(String.format("[BrokerId=%d] Directory for the topic-partition %s " +
+ "was not found", brokerId, topicPartition)));
+ File partitionDir = new File(logDir.getAbsolutePath(), topicPartition.toString());
File firstSegmentFile = new File(partitionDir.getAbsolutePath(),
LogFileUtils.filenamePrefixFromOffset(firstLogFileBaseOffset) + LogFileUtils.LOG_FILE_SUFFIX);
try (FileRecords fileRecords = FileRecords.open(firstSegmentFile, false)) {
@@ -157,13 +167,15 @@
}
public void eraseStorage() throws IOException {
- for (File file : Objects.requireNonNull(brokerStorageDirectory.listFiles())) {
- Utils.delete(file);
+ for (File brokerDir : brokerStorageDirectories) {
+ for (File file : Objects.requireNonNull(brokerDir.listFiles())) {
+ Utils.delete(file);
+ }
}
}
private OffsetHolder getEarliestLocalOffset(TopicPartition topicPartition) {
- List<String> partitionFiles = getTopicPartitionFiles(topicPartition);
+ List<String> partitionFiles = getTopicPartitionFileNames(topicPartition);
Optional<String> firstLogFile = partitionFiles.stream()
.filter(filename -> filename.endsWith(LogFileUtils.LOG_FILE_SUFFIX))
.sorted()
@@ -175,8 +187,29 @@
return new OffsetHolder(LogFileUtils.offsetFromFileName(firstLogFile.get()), partitionFiles);
}
- private List<String> getTopicPartitionFiles(TopicPartition topicPartition) {
- File[] files = brokerStorageDirectory.listFiles((dir, name) -> name.equals(topicPartition.toString()));
+ public boolean dirContainsTopicPartition(TopicPartition topicPartition, File logDir) {
+ File[] files = getTopicPartitionFiles(topicPartition, Collections.singleton(logDir));
+ return files != null && files.length > 0;
+ }
+
+ private File[] getTopicPartitionFiles(TopicPartition topicPartition) {
+ return getTopicPartitionFiles(topicPartition, brokerStorageDirectories);
+ }
+
+ private File[] getTopicPartitionFiles(TopicPartition topicPartition, Set<File> logDirs) {
+ File[] files = null;
+ for (File brokerDir : logDirs) {
+ files = brokerDir.listFiles((dir, name) -> name.equals(topicPartition.toString()));
+ // currently, we only expect one topic partition in one log dir
+ if (files != null && files.length != 0) {
+ break;
+ }
+ }
+ return files;
+ }
+
+ private List<String> getTopicPartitionFileNames(TopicPartition topicPartition) {
+ File[] files = getTopicPartitionFiles(topicPartition);
if (files == null || files.length == 0) {
throw new IllegalArgumentException(String.format("[BrokerId=%d] Directory for the topic-partition %s " +
"was not found", brokerId, topicPartition));
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
index 1c2aef8..13cfca2 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
@@ -16,10 +16,12 @@
*/
package org.apache.kafka.tiered.storage.utils;
+import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.record.Record;
+import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
@@ -55,7 +57,7 @@
// Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before
// segments eligible for deletion gets physically removed.
- public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5;
+ public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 20;
// The default value of log cleanup interval is 30 secs, and it increases the test execution time.
private static final Integer LOG_CLEANUP_INTERVAL_MS = 500;
private static final Integer RLM_TASK_INTERVAL_MS = 500;
@@ -149,6 +151,9 @@
overridingProps.setProperty(storageConfigPrefix(testClassName, DELETE_ON_CLOSE_CONFIG), "false");
// Set a small number of retry interval for retrying RemoteLogMetadataManager resources initialization to speed up the test
overridingProps.setProperty(metadataConfigPrefix(testClassName, REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP), RLMM_INIT_RETRY_INTERVAL_MS.toString());
+ // Set 2 log dirs to make sure JBOD feature is working correctly
+ overridingProps.setProperty(ServerLogConfigs.LOG_DIRS_CONFIG, TestUtils.tempDir().getAbsolutePath() + "," + TestUtils.tempDir().getAbsolutePath());
+
return overridingProps;
}
diff --git a/storage/src/test/resources/log4j.properties b/storage/src/test/resources/log4j.properties
index 7ee388a..a9c7bb7 100644
--- a/storage/src/test/resources/log4j.properties
+++ b/storage/src/test/resources/log4j.properties
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-log4j.rootLogger=OFF, stdout
+log4j.rootLogger=WARN, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
@@ -25,4 +25,5 @@
log4j.logger.org.apache.kafka.server.log.remote.storage=INFO
log4j.logger.org.apache.kafka.server.log.remote.metadata.storage=INFO
+log4j.logger.org.apache.kafka.storage.internals.log=INFO
log4j.logger.kafka.log.remote=INFO
diff --git a/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
index 104f21e..b654741 100644
--- a/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
@@ -32,6 +32,7 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.server.common.AdminCommandFailedException;
import org.apache.kafka.server.common.AdminOperationException;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -48,6 +49,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+@Disabled
@ExtendWith(value = ClusterTestExtensions.class)
@ClusterTestDefaults(clusterType = Type.ALL)
@Tag("integration")
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 867afc3..49461e0 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -28,6 +28,7 @@
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.server.common.MetadataVersion;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -43,6 +44,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+@Disabled
@ExtendWith(value = ClusterTestExtensions.class)
@ClusterTestDefaults(clusterType = Type.KRAFT)
@Tag("integration")
diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
index 231222e..d3f4e86 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
@@ -35,6 +35,7 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Exit;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -52,6 +53,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+@Disabled
@ExtendWith(value = ClusterTestExtensions.class)
@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
@ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
diff --git a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
index e2267e5..24b8fb1 100644
--- a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
@@ -30,6 +30,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.collection.JavaConverters;
@@ -61,6 +62,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
+@Disabled
@SuppressWarnings("deprecation")
@ExtendWith(value = ClusterTestExtensions.class)
@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3, serverProperties = {
diff --git a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
index aa858f4..de07062 100644
--- a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
@@ -25,6 +25,7 @@
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -43,6 +44,7 @@
import static java.util.Arrays.stream;
+@Disabled
@ExtendWith(value = ClusterTestExtensions.class)
@ClusterTestDefaults(clusterType = Type.KRAFT)
@Tag("integration")
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java
index c15a467..e9eb3cf 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java
@@ -38,6 +38,7 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -53,6 +54,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
+@Disabled
@Tag("integration")
@ExtendWith(ClusterTestExtensions.class)
public class DeleteOffsetsConsumerGroupCommandIntegrationTest {