KAFKA-16696 Removed the in-memory implementation of RSM and RLMM (#15911)
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
deleted file mode 100644
index 7cc4552..0000000
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteLogMetadataManager.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.log.remote.storage;
-
-import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * This class is an implementation of {@link RemoteLogMetadataManager} backed by in-memory store.
- * This class is not completely thread safe.
- */
-public class InmemoryRemoteLogMetadataManager implements RemoteLogMetadataManager {
- private static final Logger log = LoggerFactory.getLogger(InmemoryRemoteLogMetadataManager.class);
-
- private Map<TopicIdPartition, RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata =
- new ConcurrentHashMap<>();
-
- private Map<TopicIdPartition, RemoteLogMetadataCache> idToRemoteLogMetadataCache = new ConcurrentHashMap<>();
-
- private static final CompletableFuture<Void> COMPLETED_FUTURE = new CompletableFuture<>();
- static {
- COMPLETED_FUTURE.complete(null);
- }
-
- @Override
- public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
- log.debug("Adding remote log segment : [{}]", remoteLogSegmentMetadata);
- Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
-
- RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
-
- idToRemoteLogMetadataCache
- .computeIfAbsent(remoteLogSegmentId.topicIdPartition(), id -> new RemoteLogMetadataCache())
- .addCopyInProgressSegment(remoteLogSegmentMetadata);
-
- return COMPLETED_FUTURE;
- }
-
- @Override
- public CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metadataUpdate)
- throws RemoteStorageException {
- log.debug("Updating remote log segment: [{}]", metadataUpdate);
- Objects.requireNonNull(metadataUpdate, "metadataUpdate can not be null");
-
- getRemoteLogMetadataCache(metadataUpdate.remoteLogSegmentId().topicIdPartition())
- .updateRemoteLogSegmentMetadata(metadataUpdate);
-
- return COMPLETED_FUTURE;
- }
-
- private RemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition topicIdPartition)
- throws RemoteResourceNotFoundException {
- RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
- if (remoteLogMetadataCache == null) {
- throw new RemoteResourceNotFoundException("No existing metadata found for partition: " + topicIdPartition);
- }
-
- return remoteLogMetadataCache;
- }
-
- @Override
- public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
- int epochForOffset,
- long offset)
- throws RemoteStorageException {
- Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
-
- return getRemoteLogMetadataCache(topicIdPartition).remoteLogSegmentMetadata(epochForOffset, offset);
- }
-
- @Override
- public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition,
- int leaderEpoch) throws RemoteStorageException {
- Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
-
- return getRemoteLogMetadataCache(topicIdPartition).highestOffsetForEpoch(leaderEpoch);
- }
-
- @Override
- public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) {
- log.debug("Adding delete state with: [{}]", remotePartitionDeleteMetadata);
- Objects.requireNonNull(remotePartitionDeleteMetadata, "remotePartitionDeleteMetadata can not be null");
-
- TopicIdPartition topicIdPartition = remotePartitionDeleteMetadata.topicIdPartition();
-
- RemotePartitionDeleteState targetState = remotePartitionDeleteMetadata.state();
- RemotePartitionDeleteMetadata existingMetadata = idToPartitionDeleteMetadata.get(topicIdPartition);
- RemotePartitionDeleteState existingState = existingMetadata != null ? existingMetadata.state() : null;
- if (!RemotePartitionDeleteState.isValidTransition(existingState, targetState)) {
- throw new IllegalStateException("Current state: " + existingState + ", target state: " + targetState);
- }
-
- idToPartitionDeleteMetadata.put(topicIdPartition, remotePartitionDeleteMetadata);
-
- if (targetState == RemotePartitionDeleteState.DELETE_PARTITION_FINISHED) {
- // Remove the association for the partition.
- idToRemoteLogMetadataCache.remove(topicIdPartition);
- idToPartitionDeleteMetadata.remove(topicIdPartition);
- }
-
- return COMPLETED_FUTURE;
- }
-
- @Override
- public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition)
- throws RemoteStorageException {
- Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
-
- return getRemoteLogMetadataCache(topicIdPartition).listAllRemoteLogSegments();
- }
-
- @Override
- public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch)
- throws RemoteStorageException {
- Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
-
- return getRemoteLogMetadataCache(topicIdPartition).listRemoteLogSegments(leaderEpoch);
- }
-
- @Override
- public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
- Set<TopicIdPartition> followerPartitions) {
- // It is not applicable for this implementation. This will track the segments that are added/updated as part of
- // this instance. It does not depend upon any leader or follower transitions.
- }
-
- @Override
- public void onStopPartitions(Set<TopicIdPartition> partitions) {
- // It is not applicable for this implementation. This will track the segments that are added/updated as part of
- // this instance. It does not depend upon stopped partitions.
- }
-
- @Override
- public long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException {
- long remoteLogSize = 0L;
- RemoteLogMetadataCache remoteLogMetadataCache = getRemoteLogMetadataCache(topicIdPartition);
- Iterator<RemoteLogSegmentMetadata> remoteLogSegmentMetadataIterator = remoteLogMetadataCache.listAllRemoteLogSegments();
- while (remoteLogSegmentMetadataIterator.hasNext()) {
- RemoteLogSegmentMetadata remoteLogSegmentMetadata = remoteLogSegmentMetadataIterator.next();
- remoteLogSize += remoteLogSegmentMetadata.segmentSizeInBytes();
- }
- return remoteLogSize;
- }
-
- @Override
- public void close() throws IOException {
- // Clearing the references to the map and assigning empty immutable maps.
- // Practically, this instance will not be used once it is closed.
- idToPartitionDeleteMetadata = Collections.emptyMap();
- idToRemoteLogMetadataCache = Collections.emptyMap();
- }
-
- @Override
- public void configure(Map<String, ?> configs) {
- // Intentionally left blank here as nothing to be initialized here.
- }
-}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
deleted file mode 100644
index 8650cea..0000000
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.log.remote.storage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
-
-/**
- * This class is an implementation of {@link RemoteStorageManager} backed by in-memory store.
- */
-public class InmemoryRemoteStorageManager implements RemoteStorageManager {
- private static final Logger log = LoggerFactory.getLogger(InmemoryRemoteStorageManager.class);
-
- // Map of key to log data, which can be segment or any of its indexes.
- private Map<String, byte[]> keyToLogData = new ConcurrentHashMap<>();
-
- static String generateKeyForSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
- return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() + ".segment";
- }
-
- static String generateKeyForIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
- IndexType indexType) {
- return remoteLogSegmentMetadata.remoteLogSegmentId().id().toString() + "." + indexType.toString();
- }
-
- // visible for testing.
- boolean containsKey(String key) {
- return keyToLogData.containsKey(key);
- }
-
- @Override
- public Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
- LogSegmentData logSegmentData)
- throws RemoteStorageException {
- log.debug("copying log segment and indexes for : {}", remoteLogSegmentMetadata);
- Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
- Objects.requireNonNull(logSegmentData, "logSegmentData can not be null");
-
- if (keyToLogData.containsKey(generateKeyForSegment(remoteLogSegmentMetadata))) {
- throw new RemoteStorageException("It already contains the segment for the given id: " +
- remoteLogSegmentMetadata.remoteLogSegmentId());
- }
-
- try {
- keyToLogData.put(generateKeyForSegment(remoteLogSegmentMetadata),
- Files.readAllBytes(logSegmentData.logSegment()));
- if (logSegmentData.transactionIndex().isPresent()) {
- keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, IndexType.TRANSACTION),
- Files.readAllBytes(logSegmentData.transactionIndex().get()));
- }
- keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, IndexType.LEADER_EPOCH),
- logSegmentData.leaderEpochIndex().array());
- keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, IndexType.PRODUCER_SNAPSHOT),
- Files.readAllBytes(logSegmentData.producerSnapshotIndex()));
- keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, IndexType.OFFSET),
- Files.readAllBytes(logSegmentData.offsetIndex()));
- keyToLogData.put(generateKeyForIndex(remoteLogSegmentMetadata, IndexType.TIMESTAMP),
- Files.readAllBytes(logSegmentData.timeIndex()));
- } catch (Exception e) {
- throw new RemoteStorageException(e);
- }
- log.debug("copied log segment and indexes for : {} successfully.", remoteLogSegmentMetadata);
- return Optional.empty();
- }
-
- @Override
- public InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
- int startPosition)
- throws RemoteStorageException {
- log.debug("Received fetch segment request at start position: [{}] for [{}]", startPosition, remoteLogSegmentMetadata);
- Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
-
- return fetchLogSegment(remoteLogSegmentMetadata, startPosition, Integer.MAX_VALUE);
- }
-
- @Override
- public InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
- int startPosition,
- int endPosition) throws RemoteStorageException {
- log.debug("Received fetch segment request at start position: [{}] and end position: [{}] for segment [{}]",
- startPosition, endPosition, remoteLogSegmentMetadata);
-
- Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
-
- if (startPosition < 0 || endPosition < 0) {
- throw new IllegalArgumentException("Given start position or end position must not be negative.");
- }
-
- if (endPosition < startPosition) {
- throw new IllegalArgumentException("end position must be greater than or equal to start position");
- }
-
- String key = generateKeyForSegment(remoteLogSegmentMetadata);
- byte[] segment = keyToLogData.get(key);
-
- if (segment == null) {
- throw new RemoteResourceNotFoundException("No remote log segment found with start offset:"
- + remoteLogSegmentMetadata.startOffset() + " and id: "
- + remoteLogSegmentMetadata.remoteLogSegmentId());
- }
-
- if (startPosition >= segment.length) {
- throw new IllegalArgumentException("start position: " + startPosition
- + " must be less than the length of the segment: " + segment.length);
- }
-
- // If the given (endPosition + 1) is more than the segment length then the segment length is taken into account.
- // Computed length should never be more than the existing segment size.
- int length = Math.min(segment.length - 1, endPosition) - startPosition + 1;
- log.debug("Length of the segment to be sent: [{}], for segment: [{}]", length, remoteLogSegmentMetadata);
-
- return new ByteArrayInputStream(segment, startPosition, length);
- }
-
- @Override
- public InputStream fetchIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
- IndexType indexType) throws RemoteStorageException {
- log.debug("Received fetch request for index type: [{}], segment [{}]", indexType, remoteLogSegmentMetadata);
- Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
- Objects.requireNonNull(indexType, "indexType can not be null");
-
- String key = generateKeyForIndex(remoteLogSegmentMetadata, indexType);
- byte[] index = keyToLogData.get(key);
- if (index == null) {
- throw new RemoteResourceNotFoundException("No remote log segment index found with start offset:"
- + remoteLogSegmentMetadata.startOffset() + " and id: "
- + remoteLogSegmentMetadata.remoteLogSegmentId());
- }
-
- return new ByteArrayInputStream(index);
- }
-
- @Override
- public void deleteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
- log.info("Deleting log segment for: [{}]", remoteLogSegmentMetadata);
- Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
- String segmentKey = generateKeyForSegment(remoteLogSegmentMetadata);
- keyToLogData.remove(segmentKey);
- for (IndexType indexType : IndexType.values()) {
- String key = generateKeyForIndex(remoteLogSegmentMetadata, indexType);
- keyToLogData.remove(key);
- }
- log.info("Deleted log segment successfully for: [{}]", remoteLogSegmentMetadata);
- }
-
- @Override
- public void close() throws IOException {
- // Clearing the references to the map and assigning empty immutable map.
- // Practically, this instance will not be used once it is closed.
- keyToLogData = Collections.emptyMap();
- }
-
- @Override
- public void configure(Map<String, ?> configs) {
- // Intentionally left blank here as nothing to be initialized here.
- }
-}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManagerTest.java
deleted file mode 100644
index 44984f7..0000000
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManagerTest.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.log.remote.storage;
-
-import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Random;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class InmemoryRemoteStorageManagerTest {
- private static final Logger log = LoggerFactory.getLogger(InmemoryRemoteStorageManagerTest.class);
-
- private static final TopicPartition TP = new TopicPartition("foo", 1);
- private static final File DIR = TestUtils.tempDirectory("inmem-rsm-");
- private static final Random RANDOM = new Random();
-
- @Test
- public void testCopyLogSegment() throws Exception {
- InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
- RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata();
- LogSegmentData logSegmentData = createLogSegmentData();
- // Copy all the segment data.
- rsm.copyLogSegmentData(segmentMetadata, logSegmentData);
-
- // Check that the segment data exists in in-memory RSM.
- boolean containsSegment = rsm.containsKey(InmemoryRemoteStorageManager.generateKeyForSegment(segmentMetadata));
- assertTrue(containsSegment);
-
- // Check that the indexes exist in in-memory RSM.
- for (RemoteStorageManager.IndexType indexType : RemoteStorageManager.IndexType.values()) {
- boolean containsIndex = rsm.containsKey(InmemoryRemoteStorageManager.generateKeyForIndex(segmentMetadata, indexType));
- assertTrue(containsIndex);
- }
- }
-
- private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
- TopicIdPartition topicPartition = new TopicIdPartition(Uuid.randomUuid(), TP);
- RemoteLogSegmentId id = new RemoteLogSegmentId(topicPartition, Uuid.randomUuid());
- return new RemoteLogSegmentMetadata(id, 100L, 200L, System.currentTimeMillis(), 0,
- System.currentTimeMillis(), 100, Collections.singletonMap(1, 100L));
- }
-
- @Test
- public void testFetchLogSegmentIndexes() throws Exception {
- InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
- RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata();
- int segSize = 100;
- LogSegmentData logSegmentData = createLogSegmentData(segSize);
-
- // Copy the segment
- rsm.copyLogSegmentData(segmentMetadata, logSegmentData);
-
- // Check segment data exists for the copied segment.
- try (InputStream segmentStream = rsm.fetchLogSegment(segmentMetadata, 0)) {
- checkContentSame(segmentStream, logSegmentData.logSegment());
- }
-
- HashMap<RemoteStorageManager.IndexType, Path> expectedIndexToPaths = new HashMap<>();
- expectedIndexToPaths.put(RemoteStorageManager.IndexType.OFFSET, logSegmentData.offsetIndex());
- expectedIndexToPaths.put(RemoteStorageManager.IndexType.TIMESTAMP, logSegmentData.timeIndex());
- expectedIndexToPaths.put(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT, logSegmentData.producerSnapshotIndex());
-
- logSegmentData.transactionIndex().ifPresent(txnIndex -> expectedIndexToPaths.put(RemoteStorageManager.IndexType.TRANSACTION, txnIndex));
-
- // Check all segment indexes exist for the copied segment.
- for (Map.Entry<RemoteStorageManager.IndexType, Path> entry : expectedIndexToPaths.entrySet()) {
- RemoteStorageManager.IndexType indexType = entry.getKey();
- Path indexPath = entry.getValue();
- log.debug("Fetching index type: {}, indexPath: {}", indexType, indexPath);
-
- try (InputStream offsetIndexStream = rsm.fetchIndex(segmentMetadata, indexType)) {
- checkContentSame(offsetIndexStream, indexPath);
- }
- }
-
- try (InputStream leaderEpochIndexStream = rsm.fetchIndex(segmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH)) {
- ByteBuffer leaderEpochIndex = logSegmentData.leaderEpochIndex();
- assertEquals(leaderEpochIndex,
- readAsByteBuffer(leaderEpochIndexStream, leaderEpochIndex.array().length));
- }
- }
-
- @Test
- public void testFetchSegmentsForRange() throws Exception {
- InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
- RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata();
- int segSize = 100;
- LogSegmentData logSegmentData = createLogSegmentData(segSize);
- Path path = logSegmentData.logSegment();
-
- // Copy the segment
- rsm.copyLogSegmentData(segmentMetadata, logSegmentData);
-
- // 1. Fetch segment for startPos at 0
- doTestFetchForRange(rsm, segmentMetadata, path, 0, 40);
-
- // 2. Fetch segment for start and end positions as start and end of the segment.
- doTestFetchForRange(rsm, segmentMetadata, path, 0, segSize);
-
- // 3. Fetch segment for endPos at the end of segment.
- doTestFetchForRange(rsm, segmentMetadata, path, 90, segSize - 90);
-
- // 4. Fetch segment only for the start position.
- doTestFetchForRange(rsm, segmentMetadata, path, 0, 1);
-
- // 5. Fetch segment only for the end position.
- doTestFetchForRange(rsm, segmentMetadata, path, segSize - 1, 1);
-
- // 6. Fetch for any range other than boundaries.
- doTestFetchForRange(rsm, segmentMetadata, path, 3, 90);
- }
-
- private void doTestFetchForRange(InmemoryRemoteStorageManager rsm, RemoteLogSegmentMetadata rlsm, Path path,
- int startPos, int len) throws Exception {
- // Read from the segment for the expected range.
- ByteBuffer expectedSegRangeBytes = ByteBuffer.allocate(len);
- try (SeekableByteChannel seekableByteChannel = Files.newByteChannel(path)) {
- seekableByteChannel.position(startPos).read(expectedSegRangeBytes);
- }
- expectedSegRangeBytes.rewind();
-
- // Fetch from in-memory RSM for the same range
- ByteBuffer fetchedSegRangeBytes = ByteBuffer.allocate(len);
- try (InputStream segmentRangeStream = rsm.fetchLogSegment(rlsm, startPos, startPos + len - 1)) {
- Utils.readFully(segmentRangeStream, fetchedSegRangeBytes);
- }
- fetchedSegRangeBytes.rewind();
- assertEquals(expectedSegRangeBytes, fetchedSegRangeBytes);
- }
-
- @Test
- public void testFetchInvalidRange() throws Exception {
- InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
- RemoteLogSegmentMetadata remoteLogSegmentMetadata = createRemoteLogSegmentMetadata();
- int segSize = 100;
- LogSegmentData logSegmentData = createLogSegmentData(segSize);
-
- // Copy the segment
- rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData);
-
- // Check fetch segments with invalid ranges like startPos < endPos
- assertThrows(Exception.class, () -> rsm.fetchLogSegment(remoteLogSegmentMetadata, 2, 1));
-
- // Check fetch segments with invalid ranges like startPos or endPos as negative.
- assertThrows(Exception.class, () -> rsm.fetchLogSegment(remoteLogSegmentMetadata, -1, 0));
- assertThrows(Exception.class, () -> rsm.fetchLogSegment(remoteLogSegmentMetadata, -2, -1));
- }
-
- @Test
- public void testDeleteSegment() throws Exception {
- InmemoryRemoteStorageManager rsm = new InmemoryRemoteStorageManager();
- RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata();
- LogSegmentData logSegmentData = createLogSegmentData();
-
- // Copy a log segment.
- rsm.copyLogSegmentData(segmentMetadata, logSegmentData);
-
- // Check that the copied segment exists in rsm and it is same.
- try (InputStream segmentStream = rsm.fetchLogSegment(segmentMetadata, 0)) {
- checkContentSame(segmentStream, logSegmentData.logSegment());
- }
-
- // Delete segment and check that it does not exist in RSM.
- rsm.deleteLogSegmentData(segmentMetadata);
-
- // Check that the segment data does not exist.
- assertThrows(RemoteResourceNotFoundException.class, () -> rsm.fetchLogSegment(segmentMetadata, 0));
-
- // Check that the segment data does not exist for range.
- assertThrows(RemoteResourceNotFoundException.class, () -> rsm.fetchLogSegment(segmentMetadata, 0, 1));
-
- // Check that all the indexes are not found.
- for (RemoteStorageManager.IndexType indexType : RemoteStorageManager.IndexType.values()) {
- assertThrows(RemoteResourceNotFoundException.class, () -> rsm.fetchIndex(segmentMetadata, indexType));
- }
- }
-
- private void checkContentSame(InputStream segmentStream, Path path) throws IOException {
- byte[] segmentBytes = Files.readAllBytes(path);
- ByteBuffer byteBuffer = readAsByteBuffer(segmentStream, segmentBytes.length);
- assertEquals(ByteBuffer.wrap(segmentBytes), byteBuffer);
- }
-
- private ByteBuffer readAsByteBuffer(InputStream segmentStream,
- int len) throws IOException {
- ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[len]);
- Utils.readFully(segmentStream, byteBuffer);
- byteBuffer.rewind();
- return byteBuffer;
- }
-
- private LogSegmentData createLogSegmentData() throws Exception {
- return createLogSegmentData(100);
- }
-
- private LogSegmentData createLogSegmentData(int segSize) throws Exception {
- int prefix = Math.abs(RANDOM.nextInt());
- Path segment = new File(DIR, prefix + ".seg").toPath();
- Files.write(segment, TestUtils.randomBytes(segSize));
-
- Path offsetIndex = new File(DIR, prefix + ".oi").toPath();
- Files.write(offsetIndex, TestUtils.randomBytes(10));
-
- Path timeIndex = new File(DIR, prefix + ".ti").toPath();
- Files.write(timeIndex, TestUtils.randomBytes(10));
-
- Path txnIndex = new File(DIR, prefix + ".txni").toPath();
- Files.write(txnIndex, TestUtils.randomBytes(10));
-
- Path producerSnapshotIndex = new File(DIR, prefix + ".psi").toPath();
- Files.write(producerSnapshotIndex, TestUtils.randomBytes(10));
-
- ByteBuffer leaderEpochIndex = ByteBuffer.wrap(TestUtils.randomBytes(10));
- return new LogSegmentData(segment, offsetIndex, timeIndex, Optional.of(txnIndex), producerSnapshotIndex, leaderEpochIndex);
- }
-}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
index 528843a..5a71a6a 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
@@ -24,20 +24,16 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerWrapperWithHarness;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.api.Test;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/**
- * This class covers basic tests for {@link RemoteLogMetadataManager} implementations like {@link InmemoryRemoteLogMetadataManager},
- * and {@link org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager}.
+ * This class covers basic tests for {@link RemoteLogMetadataManager} implementations like
+ * {@link TopicBasedRemoteLogMetadataManagerWrapperWithHarness}
*/
public class RemoteLogMetadataManagerTest {
@@ -49,9 +45,10 @@
private final Time time = new MockTime(1);
- @ParameterizedTest(name = "remoteLogMetadataManager = {0}")
- @MethodSource("remoteLogMetadataManagers")
- public void testFetchSegments(RemoteLogMetadataManager remoteLogMetadataManager) throws Exception {
+ private RemoteLogMetadataManager remoteLogMetadataManager = new TopicBasedRemoteLogMetadataManagerWrapperWithHarness();
+
+ @Test
+ public void testFetchSegments() throws Exception {
try {
remoteLogMetadataManager.configure(Collections.emptyMap());
remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(TP0), Collections.emptySet());
@@ -84,9 +81,8 @@
}
}
- @ParameterizedTest(name = "remoteLogMetadataManager = {0}")
- @MethodSource("remoteLogMetadataManagers")
- public void testRemotePartitionDeletion(RemoteLogMetadataManager remoteLogMetadataManager) throws Exception {
+ @Test
+ public void testRemotePartitionDeletion() throws Exception {
try {
remoteLogMetadataManager.configure(Collections.emptyMap());
remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(TP0), Collections.emptySet());
@@ -151,8 +147,4 @@
private RemotePartitionDeleteMetadata createRemotePartitionDeleteMetadata(RemotePartitionDeleteState state) {
return new RemotePartitionDeleteMetadata(TP0, state, time.milliseconds(), BROKER_ID_0);
}
-
- private static Collection<Arguments> remoteLogMetadataManagers() {
- return Arrays.asList(Arguments.of(new InmemoryRemoteLogMetadataManager()), Arguments.of(new TopicBasedRemoteLogMetadataManagerWrapperWithHarness()));
- }
}
\ No newline at end of file