KAFKA-16696 Removed the in-memory implementation of RSM and RLMM (#15911)

Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
deleted file mode 100644
index 7cc4552..0000000
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.log.remote.storage;
-
-import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * This class is an implementation of {@link RemoteLogMetadataManager} backed by in-memory store.
- * This class is not completely thread safe.
- */
-public class InmemoryRemoteLogMetadataManager implements RemoteLogMetadataManager {
-    private static final Logger log = LoggerFactory.getLogger(InmemoryRemoteLogMetadataManager.class);
-
-    private Map<TopicIdPartition, RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata =
-            new ConcurrentHashMap<>();
-
-    private Map<TopicIdPartition, RemoteLogMetadataCache> idToRemoteLogMetadataCache = new ConcurrentHashMap<>();
-
-    private static final CompletableFuture<Void> COMPLETED_FUTURE = new CompletableFuture<>();
-    static {
-        COMPLETED_FUTURE.complete(null);
-    }
-
-    @Override
-    public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
-        log.debug("Adding remote log segment : [{}]", remoteLogSegmentMetadata);
-        Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
-
-        RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
-
-        idToRemoteLogMetadataCache
-                .computeIfAbsent(remoteLogSegmentId.topicIdPartition(), id -> new RemoteLogMetadataCache())
-                .addCopyInProgressSegment(remoteLogSegmentMetadata);
-
-        return COMPLETED_FUTURE;
-    }
-
-    @Override
-    public CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metadataUpdate)
-            throws RemoteStorageException {
-        log.debug("Updating remote log segment: [{}]", metadataUpdate);
-        Objects.requireNonNull(metadataUpdate, "metadataUpdate can not be null");
-
-        getRemoteLogMetadataCache(metadataUpdate.remoteLogSegmentId().topicIdPartition())
-                .updateRemoteLogSegmentMetadata(metadataUpdate);
-
-        return COMPLETED_FUTURE;
-    }
-
-    private RemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition topicIdPartition)
-            throws RemoteResourceNotFoundException {
-        RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
-        if (remoteLogMetadataCache == null) {
-            throw new RemoteResourceNotFoundException("No existing metadata found for partition: " + topicIdPartition);
-        }
-
-        return remoteLogMetadataCache;
-    }
-
-    @Override
-    public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
-                                                                       int epochForOffset,
-                                                                       long offset)
-            throws RemoteStorageException {
-        Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
-
-        return getRemoteLogMetadataCache(topicIdPartition).remoteLogSegmentMetadata(epochForOffset, offset);
-    }
-
-    @Override
-    public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition,
-                                                int leaderEpoch) throws RemoteStorageException {
-        Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
-
-        return getRemoteLogMetadataCache(topicIdPartition).highestOffsetForEpoch(leaderEpoch);
-    }
-
-    @Override
-    public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) {
-        log.debug("Adding delete state with: [{}]", remotePartitionDeleteMetadata);
-        Objects.requireNonNull(remotePartitionDeleteMetadata, "remotePartitionDeleteMetadata can not be null");
-
-        TopicIdPartition topicIdPartition = remotePartitionDeleteMetadata.topicIdPartition();
-
-        RemotePartitionDeleteState targetState = remotePartitionDeleteMetadata.state();
-        RemotePartitionDeleteMetadata existingMetadata = idToPartitionDeleteMetadata.get(topicIdPartition);
-        RemotePartitionDeleteState existingState = existingMetadata != null ? existingMetadata.state() : null;
-        if (!RemotePartitionDeleteState.isValidTransition(existingState, targetState)) {
-            throw new IllegalStateException("Current state: " + existingState + ", target state: " + targetState);
-        }
-
-        idToPartitionDeleteMetadata.put(topicIdPartition, remotePartitionDeleteMetadata);
-
-        if (targetState == RemotePartitionDeleteState.DELETE_PARTITION_FINISHED) {
-            // Remove the association for the partition.
-            idToRemoteLogMetadataCache.remove(topicIdPartition);
-            idToPartitionDeleteMetadata.remove(topicIdPartition);
-        }
-
-        return COMPLETED_FUTURE;
-    }
-
-    @Override
-    public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition)
-            throws RemoteStorageException {
-        Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
-
-        return getRemoteLogMetadataCache(topicIdPartition).listAllRemoteLogSegments();
-    }
-
-    @Override
-    public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch)
-            throws RemoteStorageException {
-        Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
-
-        return getRemoteLogMetadataCache(topicIdPartition).listRemoteLogSegments(leaderEpoch);
-    }
-
-    @Override
-    public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
-                                             Set<TopicIdPartition> followerPartitions) {
-        // It is not applicable for this implementation. This will track the segments that are added/updated as part of
-        // this instance. It does not depend upon any leader or follower transitions.
-    }
-
-    @Override
-    public void onStopPartitions(Set<TopicIdPartition> partitions) {
-        // It is not applicable for this implementation. This will track the segments that are added/updated as part of
-        // this instance. It does not depend upon stopped partitions.
-    }
-
-    @Override
-    public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException {
-        long remoteLogSize = 0L;
-        RemoteLogMetadataCache remoteLogMetadataCache = getRemoteLogMetadataCache(topicIdPartition);
-        Iterator<RemoteLogSegmentMetadata> remoteLogSegmentMetadataIterator = remoteLogMetadataCache.listAllRemoteLogSegments();
-        while (remoteLogSegmentMetadataIterator.hasNext()) {
-            RemoteLogSegmentMetadata remoteLogSegmentMetadata = remoteLogSegmentMetadataIterator.next();
-            remoteLogSize += remoteLogSegmentMetadata.segmentSizeInBytes();
-        }
-        return remoteLogSize;
-    }
-
-    @Override
-    public void close() throws IOException {
-        // Clearing the references to the map and assigning empty immutable maps.
-        // Practically, this instance will not be used once it is closed.
-        idToPartitionDeleteMetadata = Collections.emptyMap();
-        idToRemoteLogMetadataCache = Collections.emptyMap();
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs) {
-        // Intentionally left blank here as nothing to be initialized here.
-    }
-}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
deleted file mode 100644
index 8650cea..0000000
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.log.remote.storage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
-
-/**
- * This class is an implementation of {@link RemoteStorageManager} backed by in-memory store.
- */
-public class InmemoryRemoteStorageManager implements RemoteStorageManager {
-    private static final Logger log = LoggerFactory.getLogger(InmemoryRemoteStorageManager.class);
-
-    // Map of key to log data, which can be segment or any of its indexes.
-    private Map<String, byte[]> keyToLogData = new ConcurrentHashMap<>();
-
-    static String generateKeyForSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
-        return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() + ".segment";
-    }
-
-    static String generateKeyForIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
-                                      IndexType indexType) {
-        return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() + "." + indexType.toString();
-    }
-
-    // visible for testing.
-    boolean containsKey(String key) {
-        return keyToLogData.containsKey(key);
-    }
-
-    @Override
-    public Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
-                                                       LogSegmentData logSegmentData)
-            throws RemoteStorageException {
-        log.debug("copying log segment and indexes for : {}", remoteLogSegmentMetadata);
-        Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
-        Objects.requireNonNull(logSegmentData, "logSegmentData can not be null");
-
-        if (keyToLogData.containsKey(generateKeyForSegment(remoteLogSegmentMetadata))) {
-            throw new RemoteStorageException("It already contains the segment for the given id: " +
-                                             remoteLogSegmentMetadata.remoteLogSegmentId());
-        }
-
-        try {
-            keyToLogData.put(generateKeyForSegment(remoteLogSegmentMetadata),
-                    Files.readAllBytes(logSegmentData.logSegment()));
-            if (logSegmentData.transactionIndex().isPresent()) {
-                keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, IndexType.TRANSACTION),
-                    Files.readAllBytes(logSegmentData.transactionIndex().get()));
-            }
-            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, IndexType.LEADER_EPOCH),
-                    logSegmentData.leaderEpochIndex().array());
-            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, IndexType.PRODUCER_SNAPSHOT),
-                    Files.readAllBytes(logSegmentData.producerSnapshotIndex()));
-            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, IndexType.OFFSET),
-                    Files.readAllBytes(logSegmentData.offsetIndex()));
-            keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, IndexType.TIMESTAMP),
-                    Files.readAllBytes(logSegmentData.timeIndex()));
-        } catch (Exception e) {
-            throw new RemoteStorageException(e);
-        }
-        log.debug("copied log segment and indexes for : {} successfully.", remoteLogSegmentMetadata);
-        return Optional.empty();
-    }
-
-    @Override
-    public InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
-                                       int startPosition)
-            throws RemoteStorageException {
-        log.debug("Received fetch segment request at start position: [{}] for [{}]", startPosition, remoteLogSegmentMetadata);
-        Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
-
-        return fetchLogSegment(remoteLogSegmentMetadata, startPosition, Integer.MAX_VALUE);
-    }
-
-    @Override
-    public InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
-                                       int startPosition,
-                                       int endPosition) throws RemoteStorageException {
-        log.debug("Received fetch segment request at start position: [{}] and end position: [{}] for segment [{}]",
-                startPosition, endPosition, remoteLogSegmentMetadata);
-
-        Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
-
-        if (startPosition < 0 || endPosition < 0) {
-            throw new IllegalArgumentException("Given start position or end position must not be negative.");
-        }
-
-        if (endPosition < startPosition) {
-            throw new IllegalArgumentException("end position must be greater than or equal to start position");
-        }
-
-        String key = generateKeyForSegment(remoteLogSegmentMetadata);
-        byte[] segment = keyToLogData.get(key);
-
-        if (segment == null) {
-            throw new RemoteResourceNotFoundException("No remote log segment found with start offset:"
-                                                      + remoteLogSegmentMetadata.startOffset() + " and id: "
-                                                      + remoteLogSegmentMetadata.remoteLogSegmentId());
-        }
-
-        if (startPosition >= segment.length) {
-            throw new IllegalArgumentException("start position: " + startPosition
-                                               + " must be less than the length of the segment: " + segment.length);
-        }
-
-        // If the given (endPosition + 1) is more than the segment length then the segment length is taken into account.
-        // Computed length should never be more than the existing segment size.
-        int length = Math.min(segment.length - 1, endPosition) - startPosition + 1;
-        log.debug("Length of the segment to be sent: [{}], for segment: [{}]", length, remoteLogSegmentMetadata);
-
-        return new ByteArrayInputStream(segment, startPosition, length);
-    }
-
-    @Override
-    public InputStream fetchIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
-                                  IndexType indexType) throws RemoteStorageException {
-        log.debug("Received fetch request for index type: [{}], segment [{}]", indexType, remoteLogSegmentMetadata);
-        Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
-        Objects.requireNonNull(indexType, "indexType can not be null");
-
-        String key = generateKeyForIndex(remoteLogSegmentMetadata, indexType);
-        byte[] index = keyToLogData.get(key);
-        if (index == null) {
-            throw new RemoteResourceNotFoundException("No remote log segment index found with start offset:"
-                                                      + remoteLogSegmentMetadata.startOffset() + " and id: "
-                                                      + remoteLogSegmentMetadata.remoteLogSegmentId());
-        }
-
-        return new ByteArrayInputStream(index);
-    }
-
-    @Override
-    public void deleteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
-        log.info("Deleting log segment for: [{}]", remoteLogSegmentMetadata);
-        Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
-        String segmentKey = generateKeyForSegment(remoteLogSegmentMetadata);
-        keyToLogData.remove(segmentKey);
-        for (IndexType indexType : IndexType.values()) {
-            String key = generateKeyForIndex(remoteLogSegmentMetadata, indexType);
-            keyToLogData.remove(key);
-        }
-        log.info("Deleted log segment successfully for: [{}]", remoteLogSegmentMetadata);
-    }
-
-    @Override
-    public void close() throws IOException {
-        // Clearing the references to the map and assigning empty immutable map.
-        // Practically, this instance will not be used once it is closed.
-        keyToLogData = Collections.emptyMap();
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs) {
-        // Intentionally left blank here as nothing to be initialized here.
-    }
-}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManagerTest.java
deleted file mode 100644
index 44984f7..0000000
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManagerTest.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.log.remote.storage;
-
-import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Random;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class InmemoryRemoteStorageManagerTest {
-    private static final Logger log = LoggerFactory.getLogger(InmemoryRemoteStorageManagerTest.class);
-
-    private static final TopicPartition TP = new TopicPartition("foo", 1);
-    private static final File DIR = TestUtils.tempDirectory("inmem-rsm-");
-    private static final Random RANDOM = new Random();
-
-    @Test
-    public void testCopyLogSegment() throws Exception {
-        InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
-        RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata();
-        LogSegmentData logSegmentData = createLogSegmentData();
-        // Copy all the segment data.
-        rsm.copyLogSegmentData(segmentMetadata, logSegmentData);
-
-        // Check that the segment data exists in in-memory RSM.
-        boolean containsSegment = rsm.containsKey(InmemoryRemoteStorageManager.generateKeyForSegment(segmentMetadata));
-        assertTrue(containsSegment);
-
-        // Check that the indexes exist in in-memory RSM.
-        for (RemoteStorageManager.IndexType indexType : RemoteStorageManager.IndexType.values()) {
-            boolean containsIndex = rsm.containsKey(InmemoryRemoteStorageManager.generateKeyForIndex(segmentMetadata, indexType));
-            assertTrue(containsIndex);
-        }
-    }
-
-    private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
-        TopicIdPartition topicPartition = new TopicIdPartition(Uuid.randomUuid(), TP);
-        RemoteLogSegmentId id = new RemoteLogSegmentId(topicPartition, Uuid.randomUuid());
-        return new RemoteLogSegmentMetadata(id, 100L, 200L, System.currentTimeMillis(), 0,
-                System.currentTimeMillis(), 100, Collections.singletonMap(1, 100L));
-    }
-
-    @Test
-    public void testFetchLogSegmentIndexes() throws Exception {
-        InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
-        RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata();
-        int segSize = 100;
-        LogSegmentData logSegmentData = createLogSegmentData(segSize);
-
-        // Copy the segment
-        rsm.copyLogSegmentData(segmentMetadata, logSegmentData);
-
-        // Check segment data exists for the copied segment.
-        try (InputStream segmentStream = rsm.fetchLogSegment(segmentMetadata, 0)) {
-            checkContentSame(segmentStream, logSegmentData.logSegment());
-        }
-
-        HashMap<RemoteStorageManager.IndexType, Path> expectedIndexToPaths = new HashMap<>();
-        expectedIndexToPaths.put(RemoteStorageManager.IndexType.OFFSET, logSegmentData.offsetIndex());
-        expectedIndexToPaths.put(RemoteStorageManager.IndexType.TIMESTAMP, logSegmentData.timeIndex());
-        expectedIndexToPaths.put(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT, logSegmentData.producerSnapshotIndex());
-
-        logSegmentData.transactionIndex().ifPresent(txnIndex -> expectedIndexToPaths.put(RemoteStorageManager.IndexType.TRANSACTION, txnIndex));
-
-        // Check all segment indexes exist for the copied segment.
-        for (Map.Entry<RemoteStorageManager.IndexType, Path> entry : expectedIndexToPaths.entrySet()) {
-            RemoteStorageManager.IndexType indexType = entry.getKey();
-            Path indexPath = entry.getValue();
-            log.debug("Fetching index type: {}, indexPath: {}", indexType, indexPath);
-
-            try (InputStream offsetIndexStream = rsm.fetchIndex(segmentMetadata, indexType)) {
-                checkContentSame(offsetIndexStream, indexPath);
-            }
-        }
-
-        try (InputStream leaderEpochIndexStream = rsm.fetchIndex(segmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH)) {
-            ByteBuffer leaderEpochIndex = logSegmentData.leaderEpochIndex();
-            assertEquals(leaderEpochIndex,
-                    readAsByteBuffer(leaderEpochIndexStream, leaderEpochIndex.array().length));
-        }
-    }
-
-    @Test
-    public void testFetchSegmentsForRange() throws Exception {
-        InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
-        RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata();
-        int segSize = 100;
-        LogSegmentData logSegmentData = createLogSegmentData(segSize);
-        Path path = logSegmentData.logSegment();
-
-        // Copy the segment
-        rsm.copyLogSegmentData(segmentMetadata, logSegmentData);
-
-        // 1. Fetch segment for startPos at 0
-        doTestFetchForRange(rsm, segmentMetadata, path, 0, 40);
-
-        // 2. Fetch segment for start and end positions as start and end of the segment.
-        doTestFetchForRange(rsm, segmentMetadata, path, 0, segSize);
-
-        // 3. Fetch segment for endPos at the end of segment.
-        doTestFetchForRange(rsm, segmentMetadata, path, 90, segSize - 90);
-
-        // 4. Fetch segment only for the start position.
-        doTestFetchForRange(rsm, segmentMetadata, path, 0, 1);
-
-        // 5. Fetch segment only for the end position.
-        doTestFetchForRange(rsm, segmentMetadata, path, segSize - 1, 1);
-
-        // 6. Fetch for any range other than boundaries.
-        doTestFetchForRange(rsm, segmentMetadata, path, 3, 90);
-    }
-
-    private void doTestFetchForRange(InmemoryRemoteStorageManager rsm, RemoteLogSegmentMetadata rlsm, Path path,
-                                     int startPos, int len) throws Exception {
-        // Read from the segment for the expected range.
-        ByteBuffer expectedSegRangeBytes = ByteBuffer.allocate(len);
-        try (SeekableByteChannel seekableByteChannel = Files.newByteChannel(path)) {
-            seekableByteChannel.position(startPos).read(expectedSegRangeBytes);
-        }
-        expectedSegRangeBytes.rewind();
-
-        // Fetch from in-memory RSM for the same range
-        ByteBuffer fetchedSegRangeBytes = ByteBuffer.allocate(len);
-        try (InputStream segmentRangeStream = rsm.fetchLogSegment(rlsm, startPos, startPos + len - 1)) {
-            Utils.readFully(segmentRangeStream, fetchedSegRangeBytes);
-        }
-        fetchedSegRangeBytes.rewind();
-        assertEquals(expectedSegRangeBytes, fetchedSegRangeBytes);
-    }
-
-    @Test
-    public void testFetchInvalidRange() throws Exception {
-        InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
-        RemoteLogSegmentMetadata remoteLogSegmentMetadata = createRemoteLogSegmentMetadata();
-        int segSize = 100;
-        LogSegmentData logSegmentData = createLogSegmentData(segSize);
-
-        // Copy the segment
-        rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData);
-
-        // Check fetch segments with invalid ranges like startPos < endPos
-        assertThrows(Exception.class, () -> rsm.fetchLogSegment(remoteLogSegmentMetadata, 2, 1));
-
-        // Check fetch segments with invalid ranges like startPos or endPos as negative.
-        assertThrows(Exception.class, () -> rsm.fetchLogSegment(remoteLogSegmentMetadata, -1, 0));
-        assertThrows(Exception.class, () -> rsm.fetchLogSegment(remoteLogSegmentMetadata, -2, -1));
-    }
-
-    @Test
-    public void testDeleteSegment() throws Exception {
-        InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
-        RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata();
-        LogSegmentData logSegmentData = createLogSegmentData();
-
-        // Copy a log segment.
-        rsm.copyLogSegmentData(segmentMetadata, logSegmentData);
-
-        // Check that the copied segment exists in rsm and it is same.
-        try (InputStream segmentStream = rsm.fetchLogSegment(segmentMetadata, 0)) {
-            checkContentSame(segmentStream, logSegmentData.logSegment());
-        }
-
-        // Delete segment and check that it does not exist in RSM.
-        rsm.deleteLogSegmentData(segmentMetadata);
-
-        // Check that the segment data does not exist.
-        assertThrows(RemoteResourceNotFoundException.class, () -> rsm.fetchLogSegment(segmentMetadata, 0));
-
-        // Check that the segment data does not exist for range.
-        assertThrows(RemoteResourceNotFoundException.class, () -> rsm.fetchLogSegment(segmentMetadata, 0, 1));
-
-        // Check that all the indexes are not found.
-        for (RemoteStorageManager.IndexType indexType : RemoteStorageManager.IndexType.values()) {
-            assertThrows(RemoteResourceNotFoundException.class, () -> rsm.fetchIndex(segmentMetadata, indexType));
-        }
-    }
-
-    private void checkContentSame(InputStream segmentStream, Path path) throws IOException {
-        byte[] segmentBytes = Files.readAllBytes(path);
-        ByteBuffer byteBuffer = readAsByteBuffer(segmentStream, segmentBytes.length);
-        assertEquals(ByteBuffer.wrap(segmentBytes), byteBuffer);
-    }
-
-    private ByteBuffer readAsByteBuffer(InputStream segmentStream,
-                                        int len) throws IOException {
-        ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[len]);
-        Utils.readFully(segmentStream, byteBuffer);
-        byteBuffer.rewind();
-        return byteBuffer;
-    }
-
-    private LogSegmentData createLogSegmentData() throws Exception {
-        return createLogSegmentData(100);
-    }
-
-    private LogSegmentData createLogSegmentData(int segSize) throws Exception {
-        int prefix = Math.abs(RANDOM.nextInt());
-        Path segment = new File(DIR, prefix + ".seg").toPath();
-        Files.write(segment, TestUtils.randomBytes(segSize));
-
-        Path offsetIndex = new File(DIR, prefix + ".oi").toPath();
-        Files.write(offsetIndex, TestUtils.randomBytes(10));
-
-        Path timeIndex = new File(DIR, prefix + ".ti").toPath();
-        Files.write(timeIndex, TestUtils.randomBytes(10));
-
-        Path txnIndex = new File(DIR, prefix + ".txni").toPath();
-        Files.write(txnIndex, TestUtils.randomBytes(10));
-
-        Path producerSnapshotIndex = new File(DIR, prefix + ".psi").toPath();
-        Files.write(producerSnapshotIndex, TestUtils.randomBytes(10));
-
-        ByteBuffer leaderEpochIndex = ByteBuffer.wrap(TestUtils.randomBytes(10));
-        return new LogSegmentData(segment, offsetIndex, timeIndex, Optional.of(txnIndex), producerSnapshotIndex, leaderEpochIndex);
-    }
-}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
index 528843a..5a71a6a 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
@@ -24,20 +24,16 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerWrapperWithHarness;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
 /**
- * This class covers basic tests for {@link RemoteLogMetadataManager} implementations like {@link InmemoryRemoteLogMetadataManager},
- * and {@link org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager}.
+ * This class covers basic tests for {@link RemoteLogMetadataManager} implementations like
+ * {@link TopicBasedRemoteLogMetadataManagerWrapperWithHarness}
  */
 public class RemoteLogMetadataManagerTest {
 
@@ -49,9 +45,10 @@
 
     private final Time time = new MockTime(1);
 
-    @ParameterizedTest(name = "remoteLogMetadataManager = {0}")
-    @MethodSource("remoteLogMetadataManagers")
-    public void testFetchSegments(RemoteLogMetadataManager remoteLogMetadataManager) throws Exception {
+    private RemoteLogMetadataManager remoteLogMetadataManager = new TopicBasedRemoteLogMetadataManagerWrapperWithHarness();
+
+    @Test
+    public void testFetchSegments() throws Exception {
         try {
             remoteLogMetadataManager.configure(Collections.emptyMap());
             remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(TP0), Collections.emptySet());
@@ -84,9 +81,8 @@
         }
     }
 
-    @ParameterizedTest(name = "remoteLogMetadataManager = {0}")
-    @MethodSource("remoteLogMetadataManagers")
-    public void testRemotePartitionDeletion(RemoteLogMetadataManager remoteLogMetadataManager) throws Exception {
+    @Test
+    public void testRemotePartitionDeletion() throws Exception {
         try {
             remoteLogMetadataManager.configure(Collections.emptyMap());
             remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(TP0), Collections.emptySet());
@@ -151,8 +147,4 @@
     private RemotePartitionDeleteMetadata createRemotePartitionDeleteMetadata(RemotePartitionDeleteState state) {
         return new RemotePartitionDeleteMetadata(TP0, state, time.milliseconds(), BROKER_ID_0);
     }
-
-    private static Collection<Arguments> remoteLogMetadataManagers() {
-        return Arrays.asList(Arguments.of(new InmemoryRemoteLogMetadataManager()), Arguments.of(new TopicBasedRemoteLogMetadataManagerWrapperWithHarness()));
-    }
 }
\ No newline at end of file