Merge remote-tracking branch 'origin' into testRemoteLogManagerRemoteMetrics
diff --git a/core/src/main/java/kafka/server/ReplicaAlterLogDirsTierStateMachine.java b/core/src/main/java/kafka/server/ReplicaAlterLogDirsTierStateMachine.java
deleted file mode 100644
index 8561fae..0000000
--- a/core/src/main/java/kafka/server/ReplicaAlterLogDirsTierStateMachine.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.message.FetchResponseData.PartitionData;
-
-import java.util.Optional;
-
-/**
- The replica alter log dirs tier state machine is unsupported but is provided to the ReplicaAlterLogDirsThread.
- */
-public class ReplicaAlterLogDirsTierStateMachine implements TierStateMachine {
-
-    public PartitionFetchState start(TopicPartition topicPartition,
-                                     PartitionFetchState currentFetchState,
-                                     PartitionData fetchPartitionData) throws Exception {
-        // JBOD is not supported with tiered storage.
-        throw new UnsupportedOperationException("Building remote log aux state is not supported in ReplicaAlterLogDirsThread.");
-    }
-
-    public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
-                                                           PartitionFetchState currentFetchState) {
-        return Optional.empty();
-    }
-}
diff --git a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java b/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
deleted file mode 100644
index 0462e12..0000000
--- a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.StandardCopyOption;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import kafka.cluster.Partition;
-import kafka.log.UnifiedLog;
-import kafka.log.remote.RemoteLogManager;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.message.FetchResponseData.PartitionData;
-import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
-import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.server.common.CheckpointFile;
-import org.apache.kafka.server.common.OffsetAndEpoch;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
-import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
-import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
-import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
-import org.apache.kafka.storage.internals.log.EpochEntry;
-import org.apache.kafka.storage.internals.log.LogFileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.Option;
-import scala.collection.JavaConverters;
-
-import static org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason.LeaderOffsetIncremented;
-
-/**
- The replica fetcher tier state machine follows a state machine progression.
-
- Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.
- There is no need to advance the state.
-
- When started, the tier state machine will fetch the local log start offset of the
- leader and then build the follower's remote log aux state until the leader's
- local log start offset.
- */
-public class ReplicaFetcherTierStateMachine implements TierStateMachine {
-    private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
-
-    private final LeaderEndPoint leader;
-    private final ReplicaManager replicaMgr;
-
-    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
-                                          ReplicaManager replicaMgr) {
-        this.leader = leader;
-        this.replicaMgr = replicaMgr;
-    }
-
-
-    /**
-     * Start the tier state machine for the provided topic partition. Currently, this start method will build the
-     * entire remote aux log state synchronously.
-     *
-     * @param topicPartition the topic partition
-     * @param currentFetchState the current PartitionFetchState which will
-     *                          be used to derive the return value
-     * @param fetchPartitionData the data from the fetch response that returned the offset moved to tiered storage error
-     *
-     * @return the new PartitionFetchState after the successful start of the
-     *         tier state machine
-     */
-    public PartitionFetchState start(TopicPartition topicPartition,
-                                     PartitionFetchState currentFetchState,
-                                     PartitionData fetchPartitionData) throws Exception {
-
-        OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
-        int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
-        long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
-
-        long offsetToFetch = 0;
-        replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
-        replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
-
-        try {
-            offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset());
-        } catch (RemoteStorageException e) {
-            replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
-            replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
-            throw e;
-        }
-
-        OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
-        long leaderEndOffset = fetchLatestOffsetResult.offset();
-
-        long initialLag = leaderEndOffset - offsetToFetch;
-
-        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
-                Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch());
-    }
-
-    /**
-     * This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
-     *
-     * @param topicPartition the topic partition
-     * @param currentFetchState the current PartitionFetchState which will
-     *                          be used to derive the return value
-     *
-     * @return the original PartitionFetchState
-     */
-    public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
-                                                           PartitionFetchState currentFetchState) {
-        // No-op for now
-        return Optional.of(currentFetchState);
-    }
-
-    private EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
-                                                      TopicPartition partition,
-                                                      Integer currentLeaderEpoch) {
-        int previousEpoch = epoch - 1;
-
-        // Find the end-offset for the epoch earlier to the given epoch from the leader
-        Map<TopicPartition, OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
-        partitionsWithEpochs.put(partition, new OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
-        Option<EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.mapAsScalaMap(partitionsWithEpochs)).get(partition);
-        if (maybeEpochEndOffset.isEmpty()) {
-            throw new KafkaException("No response received for partition: " + partition);
-        }
-
-        EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
-        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
-            throw Errors.forCode(epochEndOffset.errorCode()).exception();
-        }
-
-        return epochEndOffset;
-    }
-
-    private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
-                                                       RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
-        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
-        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
-            CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
-            return readBuffer.read();
-        }
-    }
-
-    private void buildProducerSnapshotFile(File snapshotFile,
-                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata,
-                                           RemoteLogManager rlm) throws IOException, RemoteStorageException {
-        File tmpSnapshotFile = new File(snapshotFile.getAbsolutePath() + ".tmp");
-        // Copy it to snapshot file in atomic manner.
-        Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
-                tmpSnapshotFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
-        Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath(), snapshotFile.toPath(), false);
-    }
-
-    /**
-     * It tries to build the required state for this partition from leader and remote storage so that it can start
-     * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
-     * next offset following the end offset of the remote log portion.
-     */
-    private Long buildRemoteLogAuxState(TopicPartition topicPartition,
-                                        Integer currentLeaderEpoch,
-                                        Long leaderLocalLogStartOffset,
-                                        Integer epochForLeaderLocalLogStartOffset,
-                                        Long leaderLogStartOffset) throws IOException, RemoteStorageException {
-
-        UnifiedLog unifiedLog = replicaMgr.localLogOrException(topicPartition);
-
-        long nextOffset;
-
-        if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) {
-            if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
-
-            RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
-
-            // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
-            // until that offset
-            long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
-            int targetEpoch;
-            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
-            // will have the same epoch.
-            if (epochForLeaderLocalLogStartOffset == 0) {
-                targetEpoch = epochForLeaderLocalLogStartOffset;
-            } else {
-                // Fetch the earlier epoch/end-offset(exclusive) from the leader.
-                EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
-                // Check if the target offset lies within the range of earlier epoch. Here, epoch's end-offset is exclusive.
-                if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
-                    // Always use the leader epoch from returned earlierEpochEndOffset.
-                    // This gives the respective leader epoch, that will handle any gaps in epochs.
-                    // For ex, leader epoch cache contains:
-                    // leader-epoch   start-offset
-                    //  0               20
-                    //  1               85
-                    //  <2> - gap no messages were appended in this leader epoch.
-                    //  3               90
-                    //  4               98
-                    // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
-                    // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
-                    // So, for offset 89, we should return leader epoch as 1 like below.
-                    targetEpoch = earlierEpochEndOffset.leaderEpoch();
-                } else {
-                    targetEpoch = epochForLeaderLocalLogStartOffset;
-                }
-            }
-
-            Optional<RemoteLogSegmentMetadata> maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset);
-
-            if (maybeRlsm.isPresent()) {
-                RemoteLogSegmentMetadata remoteLogSegmentMetadata = maybeRlsm.get();
-                // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start
-                // segments from (remoteLogSegmentMetadata.endOffset() + 1)
-                // Assign nextOffset with the offset from which next fetch should happen.
-                nextOffset = remoteLogSegmentMetadata.endOffset() + 1;
-
-                // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
-                Partition partition = replicaMgr.getPartitionOrException(topicPartition);
-                partition.truncateFullyAndStartAt(nextOffset, false, Option.apply(leaderLogStartOffset));
-
-                // Increment start offsets
-                unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented);
-                unifiedLog.maybeIncrementLocalLogStartOffset(nextOffset, LeaderOffsetIncremented);
-
-                // Build leader epoch cache.
-                List<EpochEntry> epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata);
-                if (unifiedLog.leaderEpochCache().isDefined()) {
-                    unifiedLog.leaderEpochCache().get().assign(epochs);
-                }
-
-                log.debug("Updated the epoch cache from remote tier till offset: {} with size: {} for {}", leaderLocalLogStartOffset, epochs.size(), partition);
-
-                // Restore producer snapshot
-                File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
-                buildProducerSnapshotFile(snapshotFile, remoteLogSegmentMetadata, rlm);
-
-                // Reload producer snapshots.
-                unifiedLog.producerStateManager().truncateFullyAndReloadSnapshots();
-                unifiedLog.loadProducerState(nextOffset);
-                log.debug("Built the leader epoch cache and producer snapshots from remote tier for {}, " +
-                                "with active producers size: {}, leaderLogStartOffset: {}, and logEndOffset: {}",
-                        partition, unifiedLog.producerStateManager().activeProducers().size(), leaderLogStartOffset, nextOffset);
-            } else {
-                throw new RemoteStorageException("Couldn't build the state from remote store for partition: " + topicPartition +
-                        ", currentLeaderEpoch: " + currentLeaderEpoch +
-                        ", leaderLocalLogStartOffset: " + leaderLocalLogStartOffset +
-                        ", leaderLogStartOffset: " + leaderLogStartOffset +
-                        ", epoch: " + targetEpoch +
-                        "as the previous remote log segment metadata was not found");
-            }
-        } else {
-            // If the tiered storage is not enabled throw an exception back so that it will retry until the tiered storage
-            // is set as expected.
-            throw new RemoteStorageException("Couldn't build the state from remote store for partition " + topicPartition + ", as remote log storage is not yet enabled");
-        }
-
-        return nextOffset;
-    }
-}
diff --git a/core/src/main/java/kafka/server/TierStateMachine.java b/core/src/main/java/kafka/server/TierStateMachine.java
index 58a44cc..085e6c0 100644
--- a/core/src/main/java/kafka/server/TierStateMachine.java
+++ b/core/src/main/java/kafka/server/TierStateMachine.java
@@ -17,15 +17,65 @@
 
 package kafka.server;
 
-import java.util.Optional;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import kafka.cluster.Partition;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.FetchResponseData.PartitionData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.common.OffsetAndEpoch;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+import org.apache.kafka.storage.internals.log.LogFileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import static org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason.LeaderOffsetIncremented;
 
 /**
- * This interface defines the APIs needed to handle any state transitions related to tiering
+ *  This class defines the APIs and implementation needed to handle any state transitions related to tiering
+ *
+ *  When started, the tier state machine will fetch the local log start offset of the
+ *  leader and then build the follower's remote log aux state until the leader's
+ *  local log start offset.
  */
-public interface TierStateMachine {
+public class TierStateMachine {
+    private static final Logger log = LoggerFactory.getLogger(TierStateMachine.class);
+
+    private final LeaderEndPoint leader;
+    private final ReplicaManager replicaMgr;
+    private final boolean useFutureLog;
+    public TierStateMachine(LeaderEndPoint leader,
+                            ReplicaManager replicaMgr,
+                            boolean useFutureLog) {
+        this.leader = leader;
+        this.replicaMgr = replicaMgr;
+        this.useFutureLog = useFutureLog;
+    }
 
     /**
      * Start the tier state machine for the provided topic partition.
@@ -40,19 +90,176 @@
      */
     PartitionFetchState start(TopicPartition topicPartition,
                               PartitionFetchState currentFetchState,
-                              PartitionData fetchPartitionData) throws Exception;
+                              PartitionData fetchPartitionData) throws Exception {
+        OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
+        long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
+
+        long offsetToFetch;
+        replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+        replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+
+        UnifiedLog unifiedLog;
+        if (useFutureLog) {
+            unifiedLog = replicaMgr.futureLogOrException(topicPartition);
+        } else {
+            unifiedLog = replicaMgr.localLogOrException(topicPartition);
+        }
+
+        try {
+            offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset(), unifiedLog);
+        } catch (RemoteStorageException e) {
+            replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
+            replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
+            throw e;
+        }
+
+        OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = fetchLatestOffsetResult.offset();
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, unifiedLog.latestEpoch());
+
+    }
+
+    private OffsetForLeaderEpochResponseData.EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+                                                                                       TopicPartition partition,
+                                                                                       Integer currentLeaderEpoch) {
+        int previousEpoch = epoch - 1;
+
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
+        partitionsWithEpochs.put(partition, new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
+        Option<OffsetForLeaderEpochResponseData.EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.mapAsScalaMap(partitionsWithEpochs)).get(partition);
+        if (maybeEpochEndOffset.isEmpty()) {
+            throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+            throw Errors.forCode(epochEndOffset.errorCode()).exception();
+        }
+
+        return epochEndOffset;
+    }
+
+    private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
+                                                       RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
+        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
+            return readBuffer.read();
+        }
+    }
+
+    private void buildProducerSnapshotFile(UnifiedLog unifiedLog,
+                                           long nextOffset,
+                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                           RemoteLogManager rlm) throws IOException, RemoteStorageException {
+        // Restore producer snapshot
+        File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
+        Path tmpSnapshotFile = Paths.get(snapshotFile.getAbsolutePath() + ".tmp");
+        // Copy it to snapshot file in atomic manner.
+        Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+                tmpSnapshotFile, StandardCopyOption.REPLACE_EXISTING);
+        Utils.atomicMoveWithFallback(tmpSnapshotFile, snapshotFile.toPath(), false);
+
+        // Reload producer snapshots.
+        unifiedLog.producerStateManager().truncateFullyAndReloadSnapshots();
+        unifiedLog.loadProducerState(nextOffset);
+    }
 
     /**
-     * Optionally advance the state of the tier state machine, based on the
-     * current PartitionFetchState. The decision to advance the tier
-     * state machine is implementation specific.
-     *
-     * @param topicPartition the topic partition
-     * @param currentFetchState the current PartitionFetchState which will
-     *                          be used to derive the return value
-     *
-     * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
+     * next offset following the end offset of the remote log portion.
      */
-    Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
-                                                    PartitionFetchState currentFetchState);
+    private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset,
+                                        UnifiedLog unifiedLog) throws IOException, RemoteStorageException {
+
+        if (!unifiedLog.remoteStorageSystemEnable() || !unifiedLog.config().remoteStorageEnable()) {
+            // If the tiered storage is not enabled throw an exception back so that it will retry until the tiered storage
+            // is set as expected.
+            throw new RemoteStorageException("Couldn't build the state from remote store for partition " + topicPartition + ", as remote log storage is not yet enabled");
+        }
+
+        if (replicaMgr.remoteLogManager().isEmpty())
+            throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+        RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+        // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+        // until that offset
+        long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+        int targetEpoch;
+        // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+        // will have the same epoch.
+        if (epochForLeaderLocalLogStartOffset == 0) {
+            targetEpoch = epochForLeaderLocalLogStartOffset;
+        } else {
+            // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+            OffsetForLeaderEpochResponseData.EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
+            // Check if the target offset lies within the range of earlier epoch. Here, epoch's end-offset is exclusive.
+            if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
+                // Always use the leader epoch from returned earlierEpochEndOffset.
+                // This gives the respective leader epoch, that will handle any gaps in epochs.
+                // For ex, leader epoch cache contains:
+                // leader-epoch   start-offset
+                //  0               20
+                //  1               85
+                //  <2> - gap no messages were appended in this leader epoch.
+                //  3               90
+                //  4               98
+                // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+                // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+                // So, for offset 89, we should return leader epoch as 1 like below.
+                targetEpoch = earlierEpochEndOffset.leaderEpoch();
+            } else {
+                targetEpoch = epochForLeaderLocalLogStartOffset;
+            }
+        }
+
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset)
+                .orElseThrow(() -> new RemoteStorageException("Couldn't build the state from remote store for partition: " + topicPartition +
+                        ", currentLeaderEpoch: " + currentLeaderEpoch +
+                        ", leaderLocalLogStartOffset: " + leaderLocalLogStartOffset +
+                        ", leaderLogStartOffset: " + leaderLogStartOffset +
+                        ", epoch: " + targetEpoch +
+                        "as the previous remote log segment metadata was not found"));
+
+
+        // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start
+        // segments from (remoteLogSegmentMetadata.endOffset() + 1)
+        // Assign nextOffset with the offset from which next fetch should happen.
+        long nextOffset = remoteLogSegmentMetadata.endOffset() + 1;
+
+        // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+        Partition partition = replicaMgr.getPartitionOrException(topicPartition);
+        partition.truncateFullyAndStartAt(nextOffset, useFutureLog, Option.apply(leaderLogStartOffset));
+        // Increment start offsets
+        unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented);
+        unifiedLog.maybeIncrementLocalLogStartOffset(nextOffset, LeaderOffsetIncremented);
+
+        // Build leader epoch cache.
+        List<EpochEntry> epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata);
+        if (unifiedLog.leaderEpochCache().isDefined()) {
+            unifiedLog.leaderEpochCache().get().assign(epochs);
+        }
+
+        log.info("Updated the epoch cache from remote tier till offset: {} with size: {} for {}", leaderLocalLogStartOffset, epochs.size(), partition);
+
+        buildProducerSnapshotFile(unifiedLog, nextOffset, remoteLogSegmentMetadata, rlm);
+
+        log.debug("Built the leader epoch cache and producer snapshots from remote tier for {}, " +
+                        "with active producers size: {}, leaderLogStartOffset: {}, and logEndOffset: {}",
+                partition, unifiedLog.producerStateManager().activeProducers().size(), leaderLogStartOffset, nextOffset);
+
+        return nextOffset;
+    }
 }
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index b2121f5..34ab1b0 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -907,9 +907,16 @@
                                       scheduler: Scheduler,
                                       logDirFailureChannel: LogDirFailureChannel,
                                       logPrefix: String): Unit = {
+    System.err.print(s"del")
+    System.err.flush()
     segmentsToDelete.foreach { segment =>
+      System.err.print(s"ren:${segment.baseOffset()} $dir")
+      System.err.flush()
       if (!segment.hasSuffix(LogFileUtils.DELETED_FILE_SUFFIX))
         segment.changeFileSuffixes("", LogFileUtils.DELETED_FILE_SUFFIX)
+
+      System.err.print(s"ren done")
+      System.err.flush()
     }
 
     def deleteSegments(): Unit = {
@@ -926,6 +933,9 @@
       scheduler.scheduleOnce("delete-file", () => deleteSegments(), config.fileDeleteDelayMs)
     else
       deleteSegments()
+
+    System.err.print(s"del end")
+    System.err.flush()
   }
 
   private[log] def emptyFetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata,
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index f69ec42..fa6f275 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -308,7 +308,11 @@
     *  Each call of this function will undo one pause.
     */
   def resumeCleaning(topicPartitions: Iterable[TopicPartition]): Unit = {
+    System.err.print("lock")
+    System.err.flush()
     inLock(lock) {
+      System.err.print("lock get")
+      System.err.flush()
       topicPartitions.foreach {
         topicPartition =>
           inProgress.get(topicPartition) match {
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 3bc6533..4011f6c 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1247,6 +1247,7 @@
     // Update the cached map and log cleaner as appropriate.
     futureLogs.remove(topicPartition)
     currentLogs.put(topicPartition, destLog)
+    System.err.print(s"c:${currentLogs.get(topicPartition)}")
     if (cleaner != null) {
       sourceLog.foreach { srcLog =>
         cleaner.alterCheckpointDir(topicPartition, srcLog.parentDirFile, destLog.parentDirFile)
@@ -1394,6 +1395,8 @@
    */
   private def cleanupLogs(): Unit = {
     debug("Beginning log cleanup...")
+    System.err.print(s"start clean ")
+    System.err.flush()
     var total = 0
     val startMs = time.milliseconds
 
@@ -1409,27 +1412,42 @@
       }
     }
 
+    System.err.print(s"c1:${currentLogs.get(new TopicPartition("topicB", 0))}")
+    System.err.flush()
+
     try {
       deletableLogs.foreach {
         case (topicPartition, log) =>
+          if (topicPartition.topic().contains("topicB")) {
+            System.err.print(s"d:${log.dir}")
+            System.err.flush()
+          }
           debug(s"Garbage collecting '${log.name}'")
           total += log.deleteOldSegments()
 
           val futureLog = futureLogs.get(topicPartition)
           if (futureLog != null) {
             // clean future logs
+            System.err.print(s"fut:${futureLog.name} ")
+            System.err.flush()
             debug(s"Garbage collecting future log '${futureLog.name}'")
             total += futureLog.deleteOldSegments()
           }
       }
     } finally {
       if (cleaner != null) {
+        System.err.print(s"cleaner ")
+        System.err.flush()
         cleaner.resumeCleaning(deletableLogs.map(_._1))
+        System.err.print(s"cleaner end")
+        System.err.flush()
       }
     }
 
     debug(s"Log cleanup completed. $total files deleted in " +
                   (time.milliseconds - startMs) / 1000 + " seconds")
+    System.err.print(s"end clean ")
+    System.err.flush()
   }
 
   /**
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index eb6fc51..bff2801 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1447,6 +1447,8 @@
                                 reason: SegmentDeletionReason): Int = {
     lock synchronized {
       val deletable = deletableSegments(predicate)
+      System.err.print(s"toD:${deletable.size}")
+      System.err.flush()
       if (deletable.nonEmpty)
         deleteSegments(deletable, reason)
       else
@@ -1574,6 +1576,10 @@
 
   private def deleteRetentionSizeBreachedSegments(): Int = {
     val retentionSize: Long = localRetentionSize(config, remoteLogEnabled())
+    if (name.contains("topicB")) {
+      System.err.print(s"$name $parentDir $size ")
+      System.err.flush()
+    }
     if (retentionSize < 0 || size < retentionSize) return 0
     var diff = size - retentionSize
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
@@ -2283,6 +2289,8 @@
                                            logDirFailureChannel: LogDirFailureChannel,
                                            parentDir: String,
                                            topicPartition: TopicPartition): Unit = {
+    System.err.print(s"dels:${segments.size}")
+    System.err.flush()
     val snapshotsToDelete = segments.flatMap { segment =>
       producerStateManager.removeAndMarkSnapshotForDeletion(segment.baseOffset).asScala
     }
@@ -2301,6 +2309,9 @@
       scheduler.scheduleOnce("delete-producer-snapshot", () => deleteProducerSnapshots(), config.fileDeleteDelayMs)
     else
       deleteProducerSnapshots()
+
+    System.err.print(s"dels done")
+    System.err.flush()
   }
 
   private[log] def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
@@ -2353,17 +2364,21 @@
     val retentionMs = UnifiedLog.localRetentionMs(log.config, remoteLogEnabled)
     toDelete.foreach { segment =>
       if (segment.largestRecordTimestamp.isPresent)
-        if (remoteLogEnabled)
+        if (remoteLogEnabled) {
           log.info(s"Deleting segment $segment due to local log retention time ${retentionMs}ms breach based on the largest " +
             s"record timestamp in the segment")
-        else
+          System.err.println(s"Deleting segment $segment due to local log retention time ${retentionMs}ms breach based on the largest " +
+            s"record timestamp in the segment")
+        } else
           log.info(s"Deleting segment $segment due to log retention time ${retentionMs}ms breach based on the largest " +
             s"record timestamp in the segment")
       else {
-        if (remoteLogEnabled)
+        if (remoteLogEnabled) {
           log.info(s"Deleting segment $segment due to local log retention time ${retentionMs}ms breach based on the " +
             s"last modified time of the segment")
-        else
+          System.err.println(s"Deleting segment $segment due to local log retention time ${retentionMs}ms breach based on the " +
+            s"last modified time of the segment")
+        } else
           log.info(s"Deleting segment $segment due to log retention time ${retentionMs}ms breach based on the " +
             s"last modified time of the segment")
       }
@@ -2376,8 +2391,13 @@
     var size = log.size
     toDelete.foreach { segment =>
       size -= segment.size
-      if (remoteLogEnabled) log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config, remoteLogEnabled)} breach. " +
-        s"Local log size after deletion will be $size.")
+      if (remoteLogEnabled) {
+        log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config, remoteLogEnabled)} breach. " +
+          s"Local log size after deletion will be $size.")
+        System.err.println(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config, remoteLogEnabled)} breach. " +
+          s"Local log size after deletion will be $size.")
+        System.err.flush()
+      }
       else log.info(s"Deleting segment $segment due to log retention size ${log.config.retentionSize} breach. Log size " +
         s"after deletion will be $size.")
     }
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index b2650cc..150ab84 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -34,7 +34,7 @@
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.common.{ClusterResource, KafkaException, TopicPartition, Uuid}
+import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
 import org.apache.kafka.coordinator.group
 import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
 import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, RecordSerde}
@@ -610,10 +610,6 @@
 
   protected def createRemoteLogManager(): Option[RemoteLogManager] = {
     if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
-      if (config.logDirs.size > 1) {
-        throw new KafkaException("Tiered storage is not supported with multiple log dirs.")
-      }
-
       Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
         (tp: TopicPartition) => logManager.getLog(tp).asJava,
         (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index e433741..5cb6535 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1333,9 +1333,6 @@
     require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
     require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, greater than or equal to 1")
     require(logDirs.nonEmpty, "At least one log directory must be defined via log.dirs or log.dir.")
-    if (isRemoteLogStorageSystemEnabled && logDirs.size > 1) {
-      throw new ConfigException(s"Multiple log directories `${logDirs.mkString(",")}` are not supported when remote log storage is enabled")
-    }
     require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
     require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
       " to prevent unnecessary socket timeouts")
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 4a490d7..72457d5 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -44,7 +44,7 @@
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.security.{JaasContext, JaasUtils}
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
-import org.apache.kafka.common.{Endpoint, KafkaException, Node, TopicPartition}
+import org.apache.kafka.common.{Endpoint, Node, TopicPartition}
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics
 import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag
@@ -683,10 +683,6 @@
 
   protected def createRemoteLogManager(): Option[RemoteLogManager] = {
     if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
-      if (config.logDirs.size > 1) {
-        throw new KafkaException("Tiered storage is not supported with multiple log dirs.")
-      }
-
       Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
         (tp: TopicPartition) => logManager.getLog(tp).asJava,
         (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 2ea0939..95c7a5a 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -40,7 +40,7 @@
                                 clientId = name,
                                 leader = leader,
                                 failedPartitions,
-                                fetchTierStateMachine = new ReplicaAlterLogDirsTierStateMachine(),
+                                fetchTierStateMachine = new TierStateMachine(leader, replicaMgr, true),
                                 fetchBackOffMs = fetchBackOffMs,
                                 isInterruptible = false,
                                 brokerTopicStats) {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index c45a5d6..bb07368 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -37,7 +37,7 @@
                                 clientId = name,
                                 leader = leader,
                                 failedPartitions,
-                                fetchTierStateMachine = new ReplicaFetcherTierStateMachine(leader, replicaMgr),
+                                fetchTierStateMachine = new TierStateMachine(leader, replicaMgr, false),
                                 fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
                                 isInterruptible = false,
                                 replicaMgr.brokerTopicStats) {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3549943..37f2df4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -573,6 +573,7 @@
    *                         If no errors occurred, the map will be empty.
    */
   private def stopPartitions(partitionsToStop: Set[StopPartition]): Map[TopicPartition, Throwable] = {
+    System.err.print(s"stop:$partitionsToStop")
     // First stop fetchers for all partitions.
     val partitions = partitionsToStop.map(_.topicPartition)
     replicaFetcherManager.removeFetcherForPartitions(partitions)
@@ -703,6 +704,10 @@
     getPartitionOrException(topicPartition).futureLog.isDefined
   }
 
+  def futureLogOrException(topicPartition: TopicPartition): UnifiedLog = {
+    getPartitionOrException(topicPartition).futureLocalLogOrException
+  }
+
   def localLog(topicPartition: TopicPartition): Option[UnifiedLog] = {
     onlinePartition(topicPartition).flatMap(_.log)
   }
@@ -1744,8 +1749,8 @@
       val leaderLogStartOffset = log.logStartOffset
       val leaderLogEndOffset = log.logEndOffset
 
-      if (params.isFromFollower) {
-        // If it is from a follower then send the offset metadata only as the data is already available in remote
+      if (params.isFromFollower || params.isFromFuture) {
+        // If it is from a follower or from a future replica, then send the offset metadata only as the data is already available in remote
         // storage and throw an error saying that this offset is moved to tiered storage.
         createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset,
           new OffsetMovedToTieredStorageException("Given offset" + offset + " is moved to tiered storage"))
diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
index 212e60d..d78be03 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
@@ -36,6 +36,7 @@
 import org.apache.kafka.security.PasswordEncoderConfigs;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.config.ZooKeeperInternals;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.extension.ExtendWith;
 import scala.collection.JavaConverters;
@@ -63,6 +64,7 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@Disabled
 @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
 @ExtendWith(value = ClusterTestExtensions.class)
 @ClusterTestDefaults
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 9d03206..b74a5f5 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -33,6 +33,7 @@
 import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Collections;
@@ -47,6 +48,8 @@
 import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
 
+
+@Disabled
 @ClusterTestDefaults(clusterType = Type.ZK, serverProperties = {
     @ClusterConfigProperty(key = "default.key", value = "default.value"),
     @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "100"),
diff --git a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index dc3ff0f..907a3ec 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -28,7 +28,7 @@
 import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentState}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.function.Executable
-import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, Disabled, Tag, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
@@ -39,6 +39,7 @@
 import scala.concurrent.ExecutionException
 import scala.util.Random
 
+@Disabled
 @Tag("integration")
 class RemoteTopicCrudTest extends IntegrationTestHarness {
 
diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index bcfa7f2..e534191 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -33,7 +33,7 @@
 import org.apache.kafka.server.config.KafkaSecurityConfigs
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo, Timeout}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
@@ -46,6 +46,7 @@
  * time to the build. However, if an admin API involves differing interactions with
  * authentication/authorization layers, we may add the test case here.
  */
+@Disabled
 @Timeout(120)
 abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logging {
   def brokerCount = 3
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 9818b6d..06e53ef 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -34,11 +34,12 @@
 import org.apache.kafka.server.config.{KafkaSecurityConfigs, QuotaConfigs}
 import org.apache.kafka.server.quota._
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 
+@Disabled
 class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
 
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index 0be5e30..ee57746 100644
--- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -14,7 +14,6 @@
 
 import java.util
 import java.util.Properties
-
 import kafka.security.authorizer.AclAuthorizer
 import kafka.server.KafkaConfig
 import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
@@ -29,7 +28,7 @@
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.config.ZkConfigs
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 
@@ -73,6 +72,7 @@
   }
 }
 
+@Disabled
 class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslSetup {
   import DescribeAuthorizedOperationsTest._
 
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 60dee25..1d9e13c 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -43,7 +43,7 @@
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 
@@ -68,6 +68,7 @@
   * would end up with QuorumTestHarness twice.
   */
 @Timeout(60)
+@Disabled
 abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
 
   override val brokerCount = 3
diff --git a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
index 17f12ab..329279e 100644
--- a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
@@ -23,7 +23,7 @@
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.server.config.ServerLogConfigs
-import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, Disabled, TestInfo}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -32,6 +32,7 @@
   * Tests where the broker is configured to use LogAppendTime. For tests where LogAppendTime is configured via topic
   * level configs, see the *ProducerSendTest classes.
   */
+@Disabled
 class LogAppendTimeTest extends IntegrationTestHarness {
   val producerCount: Int = 1
   val consumerCount: Int = 1
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 149d3e9..67a2320 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -28,7 +28,7 @@
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
 import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -36,6 +36,7 @@
 import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 
+@Disabled
 class MetricsTest extends IntegrationTestHarness with SaslSetup {
 
   override val brokerCount = 1
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 2133019..6fc9e1c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -69,6 +69,7 @@
  *
  * Also see [[org.apache.kafka.clients.admin.KafkaAdminClientTest]] for unit tests of the admin client.
  */
+@Disabled
 class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
   import PlaintextAdminIntegrationTest._
 
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
index 365a872..865d2b5 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
@@ -22,6 +22,7 @@
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
 import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
@@ -30,7 +31,7 @@
 import java.util.Optional
 import scala.jdk.CollectionConverters._
 
-
+@Disabled
 class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
   val producerCount: Int = 1
   val brokerCount: Int = 2
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index 47e8de0..7fe5dcf 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -33,7 +33,7 @@
 import org.apache.kafka.server.config.ZkConfigs
 import org.apache.kafka.storage.internals.log.LogConfig
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
 
 import java.util.Collections
 import scala.jdk.CollectionConverters._
@@ -42,6 +42,7 @@
 import scala.concurrent.ExecutionException
 import scala.util.{Failure, Success, Try}
 
+@Disabled
 class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup {
   val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
 
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index fa13715..406dcfc 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -29,6 +29,7 @@
 import org.apache.kafka.server.util.ShutdownableThread
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
@@ -36,6 +37,7 @@
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
 
+@Disabled
 class TransactionsBounceTest extends IntegrationTestHarness {
   private val consumeRecordTimeout = 30000
   private val producerBufferSize =  65536
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 993e31d..4c4b58b 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -28,7 +28,7 @@
 import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
@@ -44,6 +44,7 @@
 import scala.concurrent.ExecutionException
 import scala.jdk.CollectionConverters._
 
+@Disabled
 class TransactionsTest extends IntegrationTestHarness {
   override def brokerCount = 3
 
diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
index b66d536..0ed7002 100644
--- a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
@@ -54,6 +54,7 @@
   }
 }
 
+@Disabled
 @ClusterTestDefaults(serverProperties = Array(
   new ClusterConfigProperty(key = "transaction.state.log.num.partitions", value = "1")
 ))
diff --git a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
index c9f3c46..8013dbc 100644
--- a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
@@ -36,6 +36,7 @@
 import java.util.concurrent.{Executors, TimeUnit}
 import scala.jdk.CollectionConverters._
 
+@Disabled
 class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
   val numNodes = 2
   val numParts = 1
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index dde38e6..284d57a 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -38,10 +38,11 @@
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.server.config.KafkaSecurityConfigs
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
 
 import scala.jdk.CollectionConverters._
 
+@Disabled
 class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
   override val brokerCount = 1
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
diff --git a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
index 8ed6064..31049c2 100644
--- a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
@@ -29,7 +29,7 @@
 import org.apache.kafka.server.config.{KRaftConfigs, ZkConfigs}
 import org.junit.jupiter.api.Assertions.{assertThrows, fail}
 import org.junit.jupiter.api.extension.ExtendWith
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
 
 import java.util.Optional
 import java.util.concurrent.{TimeUnit, TimeoutException}
@@ -42,6 +42,7 @@
  * failure paths is to use timeouts. See {@link unit.kafka.server.BrokerRegistrationRequestTest} for integration test
  * of just the broker registration path.
  */
+@Disabled
 @Timeout(120)
 @Tag("integration")
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
diff --git a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
index 5640f43..5b4156f 100644
--- a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
@@ -25,10 +25,12 @@
 import org.apache.kafka.clients.admin.{FeatureUpdate, UpdateFeaturesOptions}
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.jdk.CollectionConverters._
 
+@Disabled
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class MetadataVersionIntegrationTest {
   @ClusterTests(value = Array(
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 6e4cd7d..f803669 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -51,7 +51,7 @@
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
 import org.apache.kafka.server.config.{ConfigType, KRaftConfigs, ServerLogConfigs, ZkConfigs}
 import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue, fail}
-import org.junit.jupiter.api.{Assumptions, Timeout}
+import org.junit.jupiter.api.{Assumptions, Disabled, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 import org.junit.jupiter.api.function.Executable
 import org.slf4j.{Logger, LoggerFactory}
@@ -59,7 +59,6 @@
 import java.util
 import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
 import java.util.{Collections, Optional, UUID}
-import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 object ZkMigrationIntegrationTest {
@@ -90,6 +89,7 @@
   }
 }
 
+@Disabled
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @Timeout(300)
 class ZkMigrationIntegrationTest {
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index 11b7ceb..f975061 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -33,7 +33,6 @@
 import org.junit.jupiter.api.{BeforeEach, Test}
 import org.mockito.Mockito.{mock, verify, when}
 
-import java.util.Optional
 import scala.collection.{Map, Set, mutable}
 import scala.jdk.CollectionConverters._
 
@@ -313,12 +312,10 @@
     override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = new OffsetAndEpoch(1L, 0)
   }
 
-  private class MockResizeFetcherTierStateMachine extends TierStateMachine {
+  private class MockResizeFetcherTierStateMachine extends TierStateMachine(null, null, false) {
     override def start(topicPartition: TopicPartition, currentFetchState: PartitionFetchState, fetchPartitionData: PartitionData): PartitionFetchState = {
       throw new UnsupportedOperationException("Materializing tier state is not supported in this test.")
     }
-
-    override def maybeAdvanceState(tp: TopicPartition, currentFetchState: PartitionFetchState): Optional[PartitionFetchState] = Optional.empty[PartitionFetchState]
   }
 
   private class TestResizeFetcherThread(sourceBroker: BrokerEndPoint, failedPartitions: FailedPartitions, fetchTierStateMachine: TierStateMachine)
diff --git a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
index 5cb5957..2c7a17a 100644
--- a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
@@ -28,8 +28,9 @@
 import org.apache.kafka.server.common.ProducerIdsBlock
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.extension.ExtendWith
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
 
+@Disabled
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT)
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index de5720f..b22afe4 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -25,10 +25,12 @@
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.api.extension.ExtendWith
 
 // TODO: Introduce template in ClusterTests https://issues.apache.org/jira/browse/KAFKA-16595
 //  currently we can't apply template in ClusterTests hence we see bunch of duplicate settings in ClusterTests
+@Disabled
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(brokers = 1)
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
diff --git a/core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala
index f17b27e..5f5b963 100644
--- a/core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseFetchRequestTest.scala
@@ -31,6 +31,7 @@
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
+
 class BaseFetchRequestTest extends BaseRequestTest {
 
   protected var producer: KafkaProducer[String, String] = _
diff --git a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
index 1e3adc3..ce1f239 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
@@ -22,12 +22,13 @@
 import kafka.test.junit.ClusterTestExtensions
 import kafka.utils.TestUtils
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.{AfterEach, Disabled}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.jdk.CollectionConverters._
 
+@Disabled
 @ClusterTestDefaults(clusterType = Type.ALL)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class BrokerMetricNamesTest(cluster: ClusterInstance) {
diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index 6c6199f..33318e0 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -35,13 +35,14 @@
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 import org.junit.jupiter.api.extension.ExtendWith
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
 
 import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
 
 /**
  * This test simulates a broker registering with the KRaft quorum under different configurations.
  */
+@Disabled
 @Timeout(120)
 @Tag("integration")
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index ed80f80..b5a8e6a 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -31,11 +31,12 @@
 import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
 import org.apache.kafka.server.config.{QuotaConfigs, ZooKeeperInternals}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.{Disabled, Tag}
 import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.jdk.CollectionConverters._
 
+@Disabled
 @ClusterTestDefaults(clusterType = Type.ALL)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @Tag("integration")
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 250cc83..cb87b0a 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -25,13 +25,13 @@
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertNotNull}
-import org.junit.jupiter.api.Tag
-import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 
 import java.util.stream.Collectors
 import scala.jdk.CollectionConverters._
 
+@Disabled
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
index 03eddbe..aca930e 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
@@ -25,10 +25,10 @@
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState
 import org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Tag
-import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 
+@Disabled
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
index df8864b..a0531fa 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
@@ -27,12 +27,13 @@
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
 
 import java.util
 import scala.concurrent.ExecutionException
 import scala.jdk.CollectionConverters._
 
+@Disabled
 class DelegationTokenRequestsTest extends IntegrationTestHarness with SaslSetup {
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
   private val kafkaClientSaslMechanism = "PLAIN"
diff --git a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
index 92cea4c..230d8e8 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
@@ -23,9 +23,10 @@
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState
 import org.junit.jupiter.api.Assertions.{assertEquals, fail}
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 
+@Disabled
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
index 45f967f..e4d64ec 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
@@ -23,11 +23,12 @@
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.jdk.CollectionConverters._
 
+@Disabled
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
index 2a94655..6f9bec9 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
@@ -25,12 +25,13 @@
 import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
+@Disabled
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT)
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 2411e61..63db3ee 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -26,6 +26,7 @@
 import org.apache.kafka.common.{IsolationLevel, TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
@@ -40,6 +41,7 @@
   * Subclasses of `BaseConsumerTest` exercise the consumer and fetch request/response. This class
   * complements those classes with tests that require lower-level access to the protocol.
   */
+@Disabled
 class FetchRequestTest extends BaseFetchRequestTest {
 
   @ParameterizedTest
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestWithLegacyMessageFormatTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestWithLegacyMessageFormatTest.scala
index b907384..73b571c 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestWithLegacyMessageFormatTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestWithLegacyMessageFormatTest.scala
@@ -22,7 +22,7 @@
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.{Disabled, Test}
 
 import java.util.Properties
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_2_IV0
@@ -31,6 +31,7 @@
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
+@Disabled
 class FetchRequestWithLegacyMessageFormatTest extends BaseFetchRequestTest {
 
   override def brokerPropertyOverrides(properties: Properties): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
index 80a308e..b6794c6 100644
--- a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
@@ -25,13 +25,14 @@
 import org.apache.kafka.common.message.SyncGroupRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 
 import java.util.Collections
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 
+@Disabled
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
index 21c0de1..7ffad0d 100644
--- a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
@@ -27,7 +27,7 @@
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 
 import java.util.Collections
@@ -36,6 +36,7 @@
 import scala.concurrent.{Await, Future}
 import scala.jdk.CollectionConverters._
 
+@Disabled
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e67bcbb..a59d07b 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1859,20 +1859,13 @@
   }
 
   @Test
-  def testMultipleLogDirectoriesNotSupportedWithRemoteLogStorage(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, String.valueOf(true))
-    props.put(ServerLogConfigs.LOG_DIRS_CONFIG, "/tmp/a,/tmp/b")
-
-    val caught = assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
-    assertTrue(caught.getMessage.contains("Multiple log directories `/tmp/a,/tmp/b` are not supported when remote log storage is enabled"))
-  }
-
-  @Test
   def testSingleLogDirectoryWithRemoteLogStorage(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, String.valueOf(true))
     props.put(ServerLogConfigs.LOG_DIRS_CONFIG, "/tmp/a")
     assertDoesNotThrow(() => KafkaConfig.fromProps(props))
+
+    props.put(ServerLogConfigs.LOG_DIRS_CONFIG, "/tmp/a,/tmp/b")
+    assertDoesNotThrow(() => KafkaConfig.fromProps(props))
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
index 3eaff93..d9144ec 100644
--- a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
@@ -23,9 +23,10 @@
 import org.apache.kafka.common.requests.JoinGroupRequest
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 
+@Disabled
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
index c03dffc..c80de1b 100644
--- a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
@@ -25,9 +25,10 @@
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState
 import org.apache.kafka.coordinator.group.Group
 import org.junit.jupiter.api.Assertions.{assertEquals, fail}
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 
+@Disabled
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala b/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala
index 9e22560..86df92d 100644
--- a/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala
+++ b/core/src/test/scala/unit/kafka/server/MockTierStateMachine.scala
@@ -20,9 +20,7 @@
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.message.FetchResponseData
 
-import java.util.Optional
-
-class MockTierStateMachine(leader: LeaderEndPoint) extends ReplicaFetcherTierStateMachine(leader, null) {
+class MockTierStateMachine(leader: LeaderEndPoint) extends TierStateMachine(leader, null, false) {
 
   var fetcher: MockFetcherThread = _
 
@@ -37,11 +35,6 @@
       Fetching, Some(currentFetchState.currentLeaderEpoch))
   }
 
-  override def maybeAdvanceState(topicPartition: TopicPartition,
-                                 currentFetchState: PartitionFetchState): Optional[PartitionFetchState] = {
-    Optional.of(currentFetchState)
-  }
-
   def setFetcher(mockFetcherThread: MockFetcherThread): Unit = {
     fetcher = mockFetcherThread
   }
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
index 346b170..8f78989 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
@@ -21,9 +21,10 @@
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.junit.jupiter.api.Assertions.fail
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 
+@Disabled
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
index 8b4561d..ecfcd8e 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
@@ -21,9 +21,10 @@
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.junit.jupiter.api.Assertions.fail
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 
+@Disabled
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 79ec402..80a5acb 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -20,18 +20,16 @@
 import kafka.test.ClusterInstance
 import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
 import kafka.test.junit.ClusterTestExtensions
-
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.message.OffsetFetchResponseData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-
 import org.junit.jupiter.api.Assertions.{assertEquals, fail}
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 
-
 import scala.jdk.CollectionConverters._
 
+@Disabled
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 34a165c..809e00a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.{AfterEach, Disabled}
 import kafka.utils.TestUtils
 import TestUtils._
 import kafka.api.IntegrationTestHarness
@@ -27,6 +27,7 @@
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+@Disabled
 class ReplicaFetchTest extends IntegrationTestHarness {
   val topic1 = "foo"
   val topic2 = "bar"
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index ab25864..e443999 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3285,12 +3285,8 @@
     val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
     val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
     val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath
-    if (enableRemoteStorage) {
-      props.put("log.dirs", path1)
-      props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, enableRemoteStorage.toString)
-    } else {
-      props.put("log.dirs", path1 + "," + path2)
-    }
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, enableRemoteStorage.toString)
+    props.put("log.dirs", path1 + "," + path2)
     propsModifier.apply(props)
     val config = KafkaConfig.fromProps(props)
     val logProps = new Properties()
@@ -4981,9 +4977,8 @@
     assertEquals(followerPartitions, actualFollowerPartitions)
   }
 
-  // KAFKA-16031: Enabling remote storage after JBOD is supported in tiered storage
   @ParameterizedTest
-  @ValueSource(booleans = Array(false))
+  @ValueSource(booleans = Array(true, false))
   def testApplyDeltaShouldHandleReplicaAssignedToOnlineDirectory(enableRemoteStorage: Boolean): Unit = {
     val localId = 1
     val topicPartition0 = new TopicPartition("foo", 0)
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 4b7e129..85fa37c 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -34,7 +34,7 @@
 import org.apache.kafka.server.config.KafkaSecurityConfigs
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.extension.ExtendWith
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled}
 
 import scala.jdk.CollectionConverters._
 
@@ -69,6 +69,7 @@
   }
 }
 
+@Disabled
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
   private var sasl: SaslSetup = _
diff --git a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
index ac0d068..be7a4a9 100644
--- a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
@@ -25,7 +25,7 @@
 import org.apache.kafka.common.message.SyncGroupRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState
-import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.{Disabled, Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 
 import java.util.Collections
@@ -33,6 +33,7 @@
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
 
+@Disabled
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala b/core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala
similarity index 93%
rename from core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala
rename to core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala
index af7d1ce..139aeb0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherTierStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala
@@ -23,20 +23,21 @@
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
 import kafka.server.FetcherThreadTestUtils.{initialFetchState, mkBatch}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.collection.Map
 
-class ReplicaFetcherTierStateMachineTest {
+class TierStateMachineTest {
 
-  val truncateOnFetch = true
   val topicIds = Map("topic1" -> Uuid.randomUuid(), "topic2" -> Uuid.randomUuid())
   val version = ApiKeys.FETCH.latestVersion()
   private val failedPartitions = new FailedPartitions
 
-  @Test
-  def testFollowerFetchMovedToTieredStore(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testFollowerFetchMovedToTieredStore(truncateOnFetch: Boolean): Unit = {
     val partition = new TopicPartition("topic", 0)
 
     val replicaLog = Seq(
@@ -94,8 +95,9 @@
    *    tiered storage as well. Hence, `X < globalLogStartOffset`.
    * 4. Follower comes online and tries to fetch X from leader.
    */
-  @Test
-  def testFollowerFetchOffsetOutOfRangeWithTieredStore(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testFollowerFetchOffsetOutOfRangeWithTieredStore(truncateOnFetch: Boolean): Unit = {
     val partition = new TopicPartition("topic", 0)
 
     val replicaLog = Seq(
@@ -105,7 +107,7 @@
 
     val replicaState = PartitionState(replicaLog, leaderEpoch = 7, highWatermark = 0L, rlmEnabled = true)
 
-    val mockLeaderEndpoint = new MockLeaderEndPoint
+    val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
     val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
     val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine)
 
@@ -153,8 +155,9 @@
     assertEquals(11L, replicaState.logEndOffset)
   }
 
-  @Test
-  def testFencedOffsetResetAfterMovedToRemoteTier(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testFencedOffsetResetAfterMovedToRemoteTier(truncateOnFetch: Boolean): Unit = {
     val partition = new TopicPartition("topic", 0)
     var isErrorHandled = false
     val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = truncateOnFetch, version = version)
@@ -189,4 +192,5 @@
     assertTrue(fetcher.fetchState(partition).isEmpty)
     assertTrue(failedPartitions.contains(partition))
   }
-}
+
+}
\ No newline at end of file
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
index ed310f4..2b41c03 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
@@ -107,7 +107,7 @@
         long offset = recordMetadata.offset();
         long startTimeMs = time.milliseconds();
         long consumeCheckIntervalMs = Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs);
-        log.info("Wait until the consumer is caught up with the target partition {} up-to offset {}", partition, offset);
+//        log.info("Wait until the consumer is caught up with the target partition {} up-to offset {}", partition, offset);
         while (true) {
             long readOffset = consumerTask.readOffsetForMetadataPartition(partition).orElse(-1L);
             if (readOffset >= offset) {
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java
index 700a8e4..4af8cd1 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchParams.java
@@ -22,6 +22,8 @@
 import java.util.Objects;
 import java.util.Optional;
 
+import static org.apache.kafka.common.requests.FetchRequest.FUTURE_LOCAL_REPLICA_ID;
+
 public class FetchParams {
     public final short requestVersion;
     public final int replicaId;
@@ -56,6 +58,10 @@
         return FetchRequest.isValidBrokerId(replicaId);
     }
 
+    public boolean isFromFuture() {
+        return replicaId == FUTURE_LOCAL_REPLICA_ID;
+    }
+
     public boolean isFromConsumer() {
         return FetchRequest.isConsumer(replicaId);
     }
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index 104eb9a..fb9c969 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -804,6 +804,13 @@
     // Helper method for `deleteIfExists()`
     private Void deleteTypeIfExists(StorageAction<Boolean, IOException> delete, String fileType, File file, boolean logIfMissing) throws IOException {
         try {
+//            try {
+//                Random r = new Random();
+//                int s = r.nextInt( 2000);
+//                Thread.sleep(s);
+//            } catch (InterruptedException e) {
+//                throw new RuntimeException(e);
+//            }
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
             else {
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
index abad6ea..438bb98 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Disabled;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +46,7 @@
 /**
  * A test harness class that brings up 3 brokers and registers {@link TopicBasedRemoteLogMetadataManager} on broker with id as 0.
  */
+@Disabled
 public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHarness {
     private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerHarness.class);
 
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
index 6272d0e..7f27598 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.tiered.storage;
 
+import org.apache.kafka.tiered.storage.actions.AlterLogDirAction;
 import org.apache.kafka.tiered.storage.actions.BounceBrokerAction;
 import org.apache.kafka.tiered.storage.actions.ConsumeAction;
 import org.apache.kafka.tiered.storage.actions.CreatePartitionsAction;
@@ -313,6 +314,14 @@
         return this;
     }
 
+    public TieredStorageTestBuilder alterLogDir(String topic,
+                                                Integer partition,
+                                                int replicaIds) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        actions.add(new AlterLogDirAction(topicPartition, replicaIds));
+        return this;
+    }
+
     public TieredStorageTestBuilder expectUserTopicMappedToMetadataPartitions(String topic,
                                                                               List<Integer> metadataPartitions) {
         actions.add(new ExpectUserTopicMappedToMetadataPartitionsAction(topic, metadataPartitions));
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
index 274bbf8..5d0172c 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
@@ -102,7 +102,7 @@
 
     // NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName() in the ParameterizedTest name.
     @ParameterizedTest(name = "{displayName}.quorum={0}")
-    @ValueSource(strings = {"zk", "kraft"})
+    @ValueSource(strings = {"kraft"})
     public void executeTieredStorageTest(String quorum) {
         TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
         writeTestSpecifications(builder);
@@ -154,7 +154,7 @@
     @SuppressWarnings("deprecation")
     public static List<BrokerLocalStorage> localStorages(Seq<KafkaBroker> brokers) {
         return JavaConverters.seqAsJavaList(brokers).stream()
-                .map(b -> new BrokerLocalStorage(b.config().brokerId(), b.config().logDirs().head(),
+                .map(b -> new BrokerLocalStorage(b.config().brokerId(), JavaConverters.setAsJavaSet(b.config().logDirs().toSet()),
                         STORAGE_WAIT_TIMEOUT_SEC))
                 .collect(Collectors.toList());
     }
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java
new file mode 100644
index 0000000..34f4c7d
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.actions;
+
+import org.apache.kafka.clients.admin.AlterReplicaLogDirsResult;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public final class AlterLogDirAction implements TieredStorageTestAction {
+
+    private final TopicPartition topicPartition;
+    private final int brokerId;
+
+    public AlterLogDirAction(TopicPartition topicPartition,
+                             int brokerId) {
+        this.topicPartition = topicPartition;
+        this.brokerId = brokerId;
+    }
+
+    @Override
+    public void doExecute(TieredStorageTestContext context) throws InterruptedException, ExecutionException, TimeoutException {
+        Optional<BrokerLocalStorage> localStorage = context.localStorages().stream().filter(storage -> storage.getBrokerId() == brokerId).findFirst();
+        if (!localStorage.isPresent()) {
+            throw new IllegalArgumentException("cannot find local storage for this topic partition:" + topicPartition + " in this broker id:" + brokerId);
+        }
+
+        Optional<File> sourceDir = localStorage.get().getBrokerStorageDirectories().stream().filter(dir -> localStorage.get().dirContainsTopicPartition(topicPartition, dir)).findFirst();
+        if (!sourceDir.isPresent()) {
+            throw new IllegalArgumentException("No log dir with topic partition:" + topicPartition + " in this broker id:" + brokerId);
+        }
+        Optional<File> targetDir = localStorage.get().getBrokerStorageDirectories().stream().filter(dir -> !localStorage.get().dirContainsTopicPartition(topicPartition, dir)).findFirst();
+        if (!targetDir.isPresent()) {
+            throw new IllegalArgumentException("No log dir without topic partition:" + topicPartition + " in this broker id:" + brokerId);
+        }
+
+        // build alterReplicaLogDirs request content to move from sourceDir to targetDir
+        TopicPartitionReplica topicPartitionReplica = new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), brokerId);
+        Map<TopicPartitionReplica, String> logDirs = Collections.singletonMap(topicPartitionReplica, targetDir.get().getAbsolutePath());
+
+        AlterReplicaLogDirsResult results = context.admin().alterReplicaLogDirs(logDirs);
+        results.values().get(topicPartitionReplica).get(30, TimeUnit.SECONDS);
+
+        // wait until the topic partition folder disappears from source dir and appears in the target dir
+        TestUtils.waitForCondition(() -> localStorage.get().dirContainsTopicPartition(topicPartition, targetDir.get()) &&
+                    !localStorage.get().dirContainsTopicPartition(topicPartition, sourceDir.get()),
+                "Failed to alter dir:" + logDirs);
+    }
+
+    @Override
+    public void describe(PrintStream output) {
+        output.print("alter dir for topic partition:" + topicPartition + " in this broker id:" + brokerId);
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java
new file mode 100644
index 0000000..5a7e516
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 1;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final String topicB = "topicB";
+        final int p0 = 0;
+        final int partitionCount = 1;
+        final int replicationFactor = 1;
+        final int maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = true;
+        final int broker0 = 0;
+
+        builder
+                // create topicB with 1 partition and 1 RF
+                .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+                        mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+                // send records to partition 0
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+                .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // alter dir within the replica, we only expect one replicaId
+                .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+                .expectLeader(topicB, p0, broker0, false)
+                // produce some more events and verify the earliest local offset
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+                .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+                // consume from the beginning of the topic to read data from local and remote storage
+                .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+                .consume(topicB, p0, 0L, 4, 3);
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest10.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest10.java
new file mode 100644
index 0000000..f27b3a7
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest10.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest10 extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 1;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final String topicB = "topicB";
+        final int p0 = 0;
+        final int partitionCount = 1;
+        final int replicationFactor = 1;
+        final int maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = true;
+        final int broker0 = 0;
+
+        builder
+                // create topicB with 1 partition and 1 RF
+                .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+                        mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+                // send records to partition 0
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+                .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // alter dir within the replica, we only expect one replicaId
+                .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+                .expectLeader(topicB, p0, broker0, false)
+                // produce some more events and verify the earliest local offset
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+                .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+                // consume from the beginning of the topic to read data from local and remote storage
+                .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+                .consume(topicB, p0, 0L, 4, 3);
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest2.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest2.java
new file mode 100644
index 0000000..f563d8d
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest2.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest2 extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 1;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final String topicB = "topicB";
+        final int p0 = 0;
+        final int partitionCount = 1;
+        final int replicationFactor = 1;
+        final int maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = true;
+        final int broker0 = 0;
+
+        builder
+                // create topicB with 1 partition and 1 RF
+                .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+                        mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+                // send records to partition 0
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+                .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // alter dir within the replica, we only expect one replicaId
+                .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+                .expectLeader(topicB, p0, broker0, false)
+                // produce some more events and verify the earliest local offset
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+                .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+                // consume from the beginning of the topic to read data from local and remote storage
+                .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+                .consume(topicB, p0, 0L, 4, 3);
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest3.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest3.java
new file mode 100644
index 0000000..b431c4c
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest3.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest3 extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 1;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final String topicB = "topicB";
+        final int p0 = 0;
+        final int partitionCount = 1;
+        final int replicationFactor = 1;
+        final int maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = true;
+        final int broker0 = 0;
+
+        builder
+                // create topicB with 1 partition and 1 RF
+                .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+                        mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+                // send records to partition 0
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+                .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // alter dir within the replica, we only expect one replicaId
+                .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+                .expectLeader(topicB, p0, broker0, false)
+                // produce some more events and verify the earliest local offset
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+                .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+                // consume from the beginning of the topic to read data from local and remote storage
+                .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+                .consume(topicB, p0, 0L, 4, 3);
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest4.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest4.java
new file mode 100644
index 0000000..a569c09
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest4.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest4 extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 1;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final String topicB = "topicB";
+        final int p0 = 0;
+        final int partitionCount = 1;
+        final int replicationFactor = 1;
+        final int maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = true;
+        final int broker0 = 0;
+
+        builder
+                // create topicB with 1 partition and 1 RF
+                .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+                        mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+                // send records to partition 0
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+                .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // alter dir within the replica, we only expect one replicaId
+                .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+                .expectLeader(topicB, p0, broker0, false)
+                // produce some more events and verify the earliest local offset
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+                .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+                // consume from the beginning of the topic to read data from local and remote storage
+                .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+                .consume(topicB, p0, 0L, 4, 3);
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest5.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest5.java
new file mode 100644
index 0000000..10ef503
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest5.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest5 extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 1;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final String topicB = "topicB";
+        final int p0 = 0;
+        final int partitionCount = 1;
+        final int replicationFactor = 1;
+        final int maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = true;
+        final int broker0 = 0;
+
+        builder
+                // create topicB with 1 partition and 1 RF
+                .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+                        mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+                // send records to partition 0
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+                .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // alter dir within the replica, we only expect one replicaId
+                .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+                .expectLeader(topicB, p0, broker0, false)
+                // produce some more events and verify the earliest local offset
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+                .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+                // consume from the beginning of the topic to read data from local and remote storage
+                .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+                .consume(topicB, p0, 0L, 4, 3);
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest6.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest6.java
new file mode 100644
index 0000000..f46ed82
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest6.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest6 extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 1;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final String topicB = "topicB";
+        final int p0 = 0;
+        final int partitionCount = 1;
+        final int replicationFactor = 1;
+        final int maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = true;
+        final int broker0 = 0;
+
+        builder
+                // create topicB with 1 partition and 1 RF
+                .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+                        mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+                // send records to partition 0
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+                .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // alter dir within the replica, we only expect one replicaId
+                .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+                .expectLeader(topicB, p0, broker0, false)
+                // produce some more events and verify the earliest local offset
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+                .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+                // consume from the beginning of the topic to read data from local and remote storage
+                .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+                .consume(topicB, p0, 0L, 4, 3);
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest7.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest7.java
new file mode 100644
index 0000000..74a2ab2
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest7.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest7 extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 1;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final String topicB = "topicB";
+        final int p0 = 0;
+        final int partitionCount = 1;
+        final int replicationFactor = 1;
+        final int maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = true;
+        final int broker0 = 0;
+
+        builder
+                // create topicB with 1 partition and 1 RF
+                .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+                        mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+                // send records to partition 0
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+                .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // alter dir within the replica, we only expect one replicaId
+                .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+                .expectLeader(topicB, p0, broker0, false)
+                // produce some more events and verify the earliest local offset
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+                .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+                // consume from the beginning of the topic to read data from local and remote storage
+                .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+                .consume(topicB, p0, 0L, 4, 3);
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest8.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest8.java
new file mode 100644
index 0000000..b693d08
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest8.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest8 extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 1;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final String topicB = "topicB";
+        final int p0 = 0;
+        final int partitionCount = 1;
+        final int replicationFactor = 1;
+        final int maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = true;
+        final int broker0 = 0;
+
+        builder
+                // create topicB with 1 partition and 1 RF
+                .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+                        mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+                // send records to partition 0
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+                .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // alter dir within the replica, we only expect one replicaId
+                .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+                .expectLeader(topicB, p0, broker0, false)
+                // produce some more events and verify the earliest local offset
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+                .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+                // consume from the beginning of the topic to read data from local and remote storage
+                .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+                .consume(topicB, p0, 0L, 4, 3);
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest9.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest9.java
new file mode 100644
index 0000000..871b5c3
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest9.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest9 extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 1;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final String topicB = "topicB";
+        final int p0 = 0;
+        final int partitionCount = 1;
+        final int replicationFactor = 1;
+        final int maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = true;
+        final int broker0 = 0;
+
+        builder
+                // create topicB with 1 partition and 1 RF
+                .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
+                        mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
+                // send records to partition 0
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
+                .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // alter dir within the replica, we only expect one replicaId
+                .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
+                .expectLeader(topicB, p0, broker0, false)
+                // produce some more events and verify the earliest local offset
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
+                .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
+                // consume from the beginning of the topic to read data from local and remote storage
+                .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
+                .consume(topicB, p0, 0L, 4, 3);
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsDueToLogStartOffsetBreachTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsDueToLogStartOffsetBreachTest.java
index 5f10df5..8bfc0f5 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsDueToLogStartOffsetBreachTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsDueToLogStartOffsetBreachTest.java
@@ -72,6 +72,7 @@
                 .expectFetchFromTieredStorage(broker0, topicA, p0, 1)
                 .consume(topicA, p0, 3L, 2, 1)
 
+
                 // switch leader to change the leader-epoch from 0 to 1
                 .expectLeader(topicA, p0, broker1, true)
                 // produce some more messages and move the log-start-offset such that earliest-epoch changes from 0 to 1
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
index 1e97360..207f9bd 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
@@ -74,7 +74,7 @@
     public void maybeWaitForAtLeastOneSegmentUpload(scala.collection.Seq<TopicPartition> topicPartitions) {
         JavaConverters.seqAsJavaList(topicPartitions).forEach(topicPartition -> {
             List<BrokerLocalStorage> localStorages = JavaConverters.bufferAsJavaList(brokers()).stream()
-                    .map(b -> new BrokerLocalStorage(b.config().brokerId(), b.config().logDirs().head(), STORAGE_WAIT_TIMEOUT_SEC))
+                    .map(b -> new BrokerLocalStorage(b.config().brokerId(), JavaConverters.setAsJavaSet(b.config().logDirs().toSet()), STORAGE_WAIT_TIMEOUT_SEC))
                     .collect(Collectors.toList());
             localStorages
                     .stream()
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
index 2d26e43..e641d06 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
@@ -31,6 +31,7 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -38,17 +39,17 @@
 public final class BrokerLocalStorage {
 
     private final Integer brokerId;
-    private final File brokerStorageDirectory;
+    private final Set<File> brokerStorageDirectories;
     private final Integer storageWaitTimeoutSec;
 
     private final int storagePollPeriodSec = 1;
     private final Time time = Time.SYSTEM;
 
     public BrokerLocalStorage(Integer brokerId,
-                              String storageDirname,
+                              Set<String> storageDirnames,
                               Integer storageWaitTimeoutSec) {
         this.brokerId = brokerId;
-        this.brokerStorageDirectory = new File(storageDirname);
+        this.brokerStorageDirectories = storageDirnames.stream().map(File::new).collect(Collectors.toSet());
         this.storageWaitTimeoutSec = storageWaitTimeoutSec;
     }
 
@@ -56,6 +57,10 @@
         return brokerId;
     }
 
+    public Set<File> getBrokerStorageDirectories() {
+        return brokerStorageDirectories;
+    }
+
     /**
      * Wait until the first segment offset in Apache Kafka storage for the given topic-partition is
      * equal to the provided offset.
@@ -141,7 +146,12 @@
         if (offsetToSearch.equals(firstLogFileBaseOffset)) {
             return true;
         }
-        File partitionDir = new File(brokerStorageDirectory.getAbsolutePath(), topicPartition.toString());
+        File logDir = brokerStorageDirectories.stream()
+                .filter(dir -> dirContainsTopicPartition(topicPartition, dir))
+                .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException(String.format("[BrokerId=%d] Directory for the topic-partition %s " +
+                "was not found", brokerId, topicPartition)));
+        File partitionDir = new File(logDir.getAbsolutePath(), topicPartition.toString());
         File firstSegmentFile = new File(partitionDir.getAbsolutePath(),
                 LogFileUtils.filenamePrefixFromOffset(firstLogFileBaseOffset) + LogFileUtils.LOG_FILE_SUFFIX);
         try (FileRecords fileRecords = FileRecords.open(firstSegmentFile, false)) {
@@ -157,13 +167,15 @@
     }
 
     public void eraseStorage() throws IOException {
-        for (File file : Objects.requireNonNull(brokerStorageDirectory.listFiles())) {
-            Utils.delete(file);
+        for (File brokerDir : brokerStorageDirectories) {
+            for (File file : Objects.requireNonNull(brokerDir.listFiles())) {
+                Utils.delete(file);
+            }
         }
     }
 
     private OffsetHolder getEarliestLocalOffset(TopicPartition topicPartition) {
-        List<String> partitionFiles = getTopicPartitionFiles(topicPartition);
+        List<String> partitionFiles = getTopicPartitionFileNames(topicPartition);
         Optional<String> firstLogFile = partitionFiles.stream()
                 .filter(filename -> filename.endsWith(LogFileUtils.LOG_FILE_SUFFIX))
                 .sorted()
@@ -175,8 +187,29 @@
         return new OffsetHolder(LogFileUtils.offsetFromFileName(firstLogFile.get()), partitionFiles);
     }
 
-    private List<String> getTopicPartitionFiles(TopicPartition topicPartition) {
-        File[] files = brokerStorageDirectory.listFiles((dir, name) -> name.equals(topicPartition.toString()));
+    public boolean dirContainsTopicPartition(TopicPartition topicPartition, File logDir) {
+        File[] files = getTopicPartitionFiles(topicPartition, Collections.singleton(logDir));
+        return files != null && files.length > 0;
+    }
+
+    private File[] getTopicPartitionFiles(TopicPartition topicPartition) {
+        return getTopicPartitionFiles(topicPartition, brokerStorageDirectories);
+    }
+
+    private File[] getTopicPartitionFiles(TopicPartition topicPartition, Set<File> logDirs) {
+        File[] files = null;
+        for (File brokerDir : logDirs) {
+            files = brokerDir.listFiles((dir, name) -> name.equals(topicPartition.toString()));
+            // currently, we only expect one topic partition in one log dir
+            if (files != null && files.length != 0) {
+                break;
+            }
+        }
+        return files;
+    }
+
+    private List<String> getTopicPartitionFileNames(TopicPartition topicPartition) {
+        File[] files = getTopicPartitionFiles(topicPartition);
         if (files == null || files.length == 0) {
             throw new IllegalArgumentException(String.format("[BrokerId=%d] Directory for the topic-partition %s " +
                     "was not found", brokerId, topicPartition));
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
index 1c2aef8..13cfca2 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
@@ -16,10 +16,12 @@
  */
 package org.apache.kafka.tiered.storage.utils;
 
+import kafka.utils.TestUtils;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.record.Record;
+import org.apache.kafka.server.config.ServerLogConfigs;
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
 import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
@@ -55,7 +57,7 @@
 
     // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before
     // segments eligible for deletion gets physically removed.
-    public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5;
+    public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 20;
     // The default value of log cleanup interval is 30 secs, and it increases the test execution time.
     private static final Integer LOG_CLEANUP_INTERVAL_MS = 500;
     private static final Integer RLM_TASK_INTERVAL_MS = 500;
@@ -149,6 +151,9 @@
         overridingProps.setProperty(storageConfigPrefix(testClassName, DELETE_ON_CLOSE_CONFIG), "false");
         // Set a small number of retry interval for retrying RemoteLogMetadataManager resources initialization to speed up the test
         overridingProps.setProperty(metadataConfigPrefix(testClassName, REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP), RLMM_INIT_RETRY_INTERVAL_MS.toString());
+        // Set 2 log dirs to make sure JBOD feature is working correctly
+        overridingProps.setProperty(ServerLogConfigs.LOG_DIRS_CONFIG, TestUtils.tempDir().getAbsolutePath() + "," + TestUtils.tempDir().getAbsolutePath());
+
         return overridingProps;
     }
 
diff --git a/storage/src/test/resources/log4j.properties b/storage/src/test/resources/log4j.properties
index 7ee388a..a9c7bb7 100644
--- a/storage/src/test/resources/log4j.properties
+++ b/storage/src/test/resources/log4j.properties
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=OFF, stdout
+log4j.rootLogger=WARN, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
@@ -25,4 +25,5 @@
 
 log4j.logger.org.apache.kafka.server.log.remote.storage=INFO
 log4j.logger.org.apache.kafka.server.log.remote.metadata.storage=INFO
+log4j.logger.org.apache.kafka.storage.internals.log=INFO
 log4j.logger.kafka.log.remote=INFO
diff --git a/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
index 104f21e..b654741 100644
--- a/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
@@ -32,6 +32,7 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.server.common.AdminCommandFailedException;
 import org.apache.kafka.server.common.AdminOperationException;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -48,6 +49,7 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@Disabled
 @ExtendWith(value = ClusterTestExtensions.class)
 @ClusterTestDefaults(clusterType = Type.ALL)
 @Tag("integration")
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 867afc3..49461e0 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -28,6 +28,7 @@
 import net.sourceforge.argparse4j.inf.Namespace;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -43,6 +44,7 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@Disabled
 @ExtendWith(value = ClusterTestExtensions.class)
 @ClusterTestDefaults(clusterType = Type.KRAFT)
 @Tag("integration")
diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
index 231222e..d3f4e86 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
@@ -35,6 +35,7 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Exit;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -52,6 +53,7 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 
+@Disabled
 @ExtendWith(value = ClusterTestExtensions.class)
 @ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
     @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
diff --git a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
index e2267e5..24b8fb1 100644
--- a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
@@ -30,6 +30,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.extension.ExtendWith;
 import scala.collection.JavaConverters;
@@ -61,6 +62,7 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
+@Disabled
 @SuppressWarnings("deprecation")
 @ExtendWith(value = ClusterTestExtensions.class)
 @ClusterTestDefaults(clusterType = Type.ALL, brokers = 3, serverProperties = {
diff --git a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
index aa858f4..de07062 100644
--- a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -43,6 +44,7 @@
 
 import static java.util.Arrays.stream;
 
+@Disabled
 @ExtendWith(value = ClusterTestExtensions.class)
 @ClusterTestDefaults(clusterType = Type.KRAFT)
 @Tag("integration")
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java
index c15a467..e9eb3cf 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java
@@ -38,6 +38,7 @@
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -53,6 +54,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
+@Disabled
 @Tag("integration")
 @ExtendWith(ClusterTestExtensions.class)
 public class DeleteOffsetsConsumerGroupCommandIntegrationTest {