KAFKA-15107: Support custom metadata for remote log segment (#13984)
* KAFKA-15107: Support custom metadata for remote log segment
This commit does the changes discussed in the KIP-917. Mainly, changes the `RemoteStorageManager` interface in order to return `CustomMetadata` and then ensures these custom metadata are stored, propagated, (de-)serialized correctly along with the standard metadata throughout the whole lifecycle. It introduces the `remote.log.metadata.custom.metadata.max.size` to limit the custom metadata size acceptable by the broker and stop uploading in case a piece of metadata exceeds this limit.
On testing:
1. `RemoteLogManagerTest` checks the case when a piece of custom metadata is larger than the configured limit.
2. `RemoteLogSegmentMetadataTest` checks if `createWithUpdates` works correctly, including custom metadata.
3. `RemoteLogSegmentMetadataTransformTest`, `RemoteLogSegmentMetadataSnapshotTransformTest`, and `RemoteLogSegmentMetadataUpdateTransformTest` were added to test the corresponding class (de-)serialization, including custom metadata.
4. `FileBasedRemoteLogMetadataCacheTest` checks if custom metadata are being correctly saved and loaded to a file (indirectly, via `equals`).
5. `RemoteLogManagerConfigTest` checks if the configuration setting is handled correctly.
Reviewers: Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>, Divij Vaidya <diviv@amazon.com>
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6982920..e01e709 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -40,6 +40,7 @@
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
<suppress checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
+ <suppress checks="ClassFanOutComplexity" files="RemoteLogManagerTest.java"/>
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>
diff --git a/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java b/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
new file mode 100644
index 0000000..c893f34
--- /dev/null
+++ b/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+class CustomMetadataSizeLimitExceededException extends Exception {
+}
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 6b723aa..33bde33 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -47,6 +47,7 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
@@ -480,12 +481,14 @@
class RLMTask extends CancellableRunnable {
private final TopicIdPartition topicIdPartition;
+ private final int customMetadataSizeLimit;
private final Logger logger;
private volatile int leaderEpoch = -1;
- public RLMTask(TopicIdPartition topicIdPartition) {
+ public RLMTask(TopicIdPartition topicIdPartition, int customMetadataSizeLimit) {
this.topicIdPartition = topicIdPartition;
+ this.customMetadataSizeLimit = customMetadataSizeLimit;
LogContext logContext = new LogContext("[RemoteLogManager=" + brokerId + " partition=" + topicIdPartition + "] ");
logger = logContext.logger(RLMTask.class);
}
@@ -586,6 +589,11 @@
} else {
logger.debug("Skipping copying segments, current read-offset:{}, and LSO:{}", copiedOffset, lso);
}
+ } catch (CustomMetadataSizeLimitExceededException e) {
+ // Only stop this task. Logging is done where the exception is thrown.
+ brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark();
+ brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
+ this.cancel();
} catch (InterruptedException ex) {
throw ex;
} catch (Exception ex) {
@@ -597,7 +605,8 @@
}
}
- private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException {
+ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException,
+ CustomMetadataSizeLimitExceededException {
File logFile = segment.log().file();
String logFileName = logFile.getName();
@@ -623,10 +632,30 @@
producerStateSnapshotFile.toPath(), leaderEpochsIndex);
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
- remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+ Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
- RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+ customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+ if (customMetadata.isPresent()) {
+ long customMetadataSize = customMetadata.get().value().length;
+ if (customMetadataSize > this.customMetadataSizeLimit) {
+ CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
+ logger.error("Custom metadata size {} exceeds configured limit {}." +
+ " Copying will be stopped and copied segment will be attempted to clean." +
+ " Original metadata: {}",
+ customMetadataSize, this.customMetadataSizeLimit, copySegmentStartedRlsm, e);
+ try {
+ // For deletion, we provide back the custom metadata by creating a new metadata object from the update.
+ // However, the update itself will not be stored in this case.
+ remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
+ logger.info("Successfully cleaned segment after custom metadata size exceeded");
+ } catch (RemoteStorageException e1) {
+ logger.error("Error while cleaning segment after custom metadata size exceeded, consider cleaning manually", e1);
+ }
+ throw e;
+ }
+ }
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(copySegmentFinishedRlsm).get();
brokerTopicStats.topicStats(log.topicPartition().topic())
@@ -883,7 +912,7 @@
Consumer<RLMTask> convertToLeaderOrFollower) {
RLMTaskWithFuture rlmTaskWithFuture = leaderOrFollowerTasks.computeIfAbsent(topicPartition,
topicIdPartition -> {
- RLMTask task = new RLMTask(topicIdPartition);
+ RLMTask task = new RLMTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
// set this upfront when it is getting initialized instead of doing it after scheduling.
convertToLeaderOrFollower.accept(task);
LOGGER.info("Created a new task: {} and getting scheduled", task);
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 4d5bc8d..941f4dc 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -44,6 +44,7 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
@@ -106,7 +107,6 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@@ -342,7 +342,8 @@
dummyFuture.complete(null);
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
- doNothing().when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+ when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
+ .thenReturn(Optional.empty());
// Verify the metrics for remote writes and for failures is zero before attempt to copy log segment
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
@@ -353,7 +354,7 @@
assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
- RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
+ RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
task.convertToLeader(2);
task.copyLogSegmentsToRemote(mockLog);
@@ -397,6 +398,100 @@
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
}
+ // We are verifying that if the size of a piece of custom metadata is bigger than the configured limit,
+ // the copy task should be cancelled and there should be an attempt to delete the just copied segment.
+ @Test
+ void testCustomMetadataSizeExceedsLimit() throws Exception {
+ long oldSegmentStartOffset = 0L;
+ long nextSegmentStartOffset = 150L;
+ long lastStableOffset = 150L;
+ long logEndOffset = 150L;
+
+ when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+ when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(-1L));
+
+ File tempFile = TestUtils.tempFile();
+ File mockProducerSnapshotIndex = TestUtils.tempFile();
+ File tempDir = TestUtils.tempDirectory();
+ // create 2 log segments, with 0 and 150 as log start offset
+ LogSegment oldSegment = mock(LogSegment.class);
+ LogSegment activeSegment = mock(LogSegment.class);
+
+ when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+ when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+ verify(oldSegment, times(0)).readNextOffset();
+ verify(activeSegment, times(0)).readNextOffset();
+
+ FileRecords fileRecords = mock(FileRecords.class);
+ when(oldSegment.log()).thenReturn(fileRecords);
+ when(fileRecords.file()).thenReturn(tempFile);
+ when(fileRecords.sizeInBytes()).thenReturn(10);
+ when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+ when(mockLog.activeSegment()).thenReturn(activeSegment);
+ when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+ when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+ ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+ when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+ when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+ when(mockLog.lastStableOffset()).thenReturn(lastStableOffset);
+ when(mockLog.logEndOffset()).thenReturn(logEndOffset);
+
+ LazyIndex idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000);
+ LazyIndex timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500);
+ File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+ txnFile.createNewFile();
+ TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+ when(oldSegment.lazyTimeIndex()).thenReturn(timeIdx);
+ when(oldSegment.lazyOffsetIndex()).thenReturn(idx);
+ when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+ int customMetadataSizeLimit = 128;
+ CustomMetadata customMetadata = new CustomMetadata(new byte[customMetadataSizeLimit * 2]);
+
+ CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+ dummyFuture.complete(null);
+ when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+ when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
+ .thenReturn(Optional.of(customMetadata));
+
+ RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, customMetadataSizeLimit);
+ task.convertToLeader(2);
+ task.copyLogSegmentsToRemote(mockLog);
+
+ ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
+ verify(remoteLogMetadataManager).addRemoteLogSegmentMetadata(remoteLogSegmentMetadataArg.capture());
+
+ // Check we attempt to delete the segment data providing the custom metadata back.
+ RemoteLogSegmentMetadataUpdate expectedMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
+ remoteLogSegmentMetadataArg.getValue().remoteLogSegmentId(), time.milliseconds(),
+ Optional.of(customMetadata), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+ RemoteLogSegmentMetadata expectedDeleteMetadata = remoteLogSegmentMetadataArg.getValue().createWithUpdates(expectedMetadataUpdate);
+ verify(remoteStorageManager, times(1)).deleteLogSegmentData(eq(expectedDeleteMetadata));
+
+ // Check the task is cancelled in the end.
+ assertTrue(task.isCancelled());
+
+ // The metadata update should not be posted.
+ verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+
+ // Verify the metric for remote writes are not updated.
+ assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
+ assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
+ // Verify we did not report any failure for remote writes
+ assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
+ // Verify aggregate metrics
+ assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
+ assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
+ assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
+ }
+
@Test
void testRemoteLogManagerTasksAvgIdlePercentMetrics() throws Exception {
long oldSegmentStartOffset = 0L;
@@ -532,7 +627,7 @@
// Verify aggregate metrics
assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
- RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
+ RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
task.convertToLeader(2);
task.copyLogSegmentsToRemote(mockLog);
@@ -572,7 +667,7 @@
when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
when(mockLog.lastStableOffset()).thenReturn(250L);
- RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
+ RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
task.convertToFollower();
task.copyLogSegmentsToRemote(mockLog);
@@ -714,7 +809,7 @@
@Test
void testRLMTaskShouldSetLeaderEpochCorrectly() {
- RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
+ RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
assertFalse(task.isLeader());
task.convertToLeader(1);
assertTrue(task.isLeader());
@@ -862,7 +957,7 @@
when(log.logSegments(5L, Long.MAX_VALUE))
.thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1, segment2, activeSegment)));
- RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
+ RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
List<RemoteLogManager.EnrichedLogSegment> expected =
Arrays.asList(
new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
@@ -888,7 +983,7 @@
when(log.logSegments(5L, Long.MAX_VALUE))
.thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1, segment2, segment3, activeSegment)));
- RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
+ RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
List<RemoteLogManager.EnrichedLogSegment> expected =
Arrays.asList(
new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
index 47ae7bf..9b58932 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
@@ -19,10 +19,12 @@
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
+import java.util.Optional;
import java.util.TreeMap;
/**
@@ -67,6 +69,11 @@
private final int segmentSizeInBytes;
/**
+ * Custom metadata.
+ */
+ private final Optional<CustomMetadata> customMetadata;
+
+ /**
* It indicates the state in which the action is executed on this segment.
*/
private final RemoteLogSegmentState state;
@@ -84,6 +91,7 @@
* @param brokerId Broker id from which this event is generated.
* @param eventTimestampMs Epoch time in milli seconds at which the remote log segment is copied to the remote tier storage.
* @param segmentSizeInBytes Size of this segment in bytes.
+ * @param customMetadata Custom metadata.
* @param state State of the respective segment of remoteLogSegmentId.
* @param segmentLeaderEpochs leader epochs occurred within this segment.
*/
@@ -94,6 +102,7 @@
int brokerId,
long eventTimestampMs,
int segmentSizeInBytes,
+ Optional<CustomMetadata> customMetadata,
RemoteLogSegmentState state,
Map<Integer, Long> segmentLeaderEpochs) {
super(brokerId, eventTimestampMs);
@@ -112,6 +121,7 @@
this.endOffset = endOffset;
this.maxTimestampMs = maxTimestampMs;
this.segmentSizeInBytes = segmentSizeInBytes;
+ this.customMetadata = Objects.requireNonNull(customMetadata, "customMetadata can not be null");
if (segmentLeaderEpochs == null || segmentLeaderEpochs.isEmpty()) {
throw new IllegalArgumentException("segmentLeaderEpochs can not be null or empty");
@@ -149,6 +159,7 @@
maxTimestampMs,
brokerId,
eventTimestampMs, segmentSizeInBytes,
+ Optional.empty(),
RemoteLogSegmentState.COPY_SEGMENT_STARTED,
segmentLeaderEpochs);
}
@@ -197,6 +208,13 @@
}
/**
+ * @return Custom metadata.
+ */
+ public Optional<CustomMetadata> customMetadata() {
+ return customMetadata;
+ }
+
+ /**
* Returns the current state of this remote log segment. It can be any of the below
* <ul>
* {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}
@@ -223,7 +241,7 @@
return new RemoteLogSegmentMetadata(remoteLogSegmentId, startOffset,
endOffset, maxTimestampMs, rlsmUpdate.brokerId(), rlsmUpdate.eventTimestampMs(),
- segmentSizeInBytes, rlsmUpdate.state(), segmentLeaderEpochs);
+ segmentSizeInBytes, rlsmUpdate.customMetadata(), rlsmUpdate.state(), segmentLeaderEpochs);
}
@Override
@@ -244,7 +262,9 @@
&& maxTimestampMs == that.maxTimestampMs
&& segmentSizeInBytes == that.segmentSizeInBytes
&& Objects.equals(remoteLogSegmentId, that.remoteLogSegmentId)
- && Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs) && state == that.state
+ && Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs)
+ && Objects.equals(customMetadata, that.customMetadata)
+ && state == that.state
&& eventTimestampMs() == that.eventTimestampMs()
&& brokerId() == that.brokerId();
}
@@ -252,7 +272,7 @@
@Override
public int hashCode() {
return Objects.hash(remoteLogSegmentId, startOffset, endOffset, brokerId(), maxTimestampMs,
- eventTimestampMs(), segmentLeaderEpochs, segmentSizeInBytes, state);
+ eventTimestampMs(), segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state);
}
@Override
@@ -266,8 +286,57 @@
", eventTimestampMs=" + eventTimestampMs() +
", segmentLeaderEpochs=" + segmentLeaderEpochs +
", segmentSizeInBytes=" + segmentSizeInBytes +
+ ", customMetadata=" + customMetadata +
", state=" + state +
'}';
}
+ /**
+ * Custom metadata from a {@link RemoteStorageManager} plugin.
+ *
+ * <p>The content of these metadata is RSM-dependent and is opaque to the broker, i.e.
+ * it's not interpreted, only stored along with the rest of the remote log segment metadata.
+ *
+ * <p>Examples of such metadata are:
+ * <ol>
+ * <li>The storage path on the remote storage in case it's nondeterministic or version-dependent.</li>
+ * <li>The actual size of the all files related to the segment on the remote storage.</li>
+ * </ol>
+ *
+ * <p>The maximum size the broker accepts and stores is controlled by
+ * the {@code remote.log.metadata.custom.metadata.max.bytes} setting.
+ */
+ public static class CustomMetadata {
+ private final byte[] value;
+
+ public CustomMetadata(byte[] value) {
+ this.value = value;
+ }
+
+ public byte[] value() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CustomMetadata that = (CustomMetadata) o;
+ return Arrays.equals(value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(value);
+ }
+
+ @Override
+ public String toString() {
+ return "CustomMetadata{" + value.length + " bytes}";
+ }
+ }
}
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java
index a01df96..210615e 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java
@@ -18,8 +18,10 @@
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import java.util.Objects;
+import java.util.Optional;
/**
* It describes the metadata update about the log segment in the remote storage. This is currently used to update the
@@ -35,6 +37,11 @@
private final RemoteLogSegmentId remoteLogSegmentId;
/**
+ * Custom metadata.
+ */
+ private final Optional<CustomMetadata> customMetadata;
+
+ /**
* It indicates the state in which the action is executed on this segment.
*/
private final RemoteLogSegmentState state;
@@ -42,13 +49,17 @@
/**
* @param remoteLogSegmentId Universally unique remote log segment id.
* @param eventTimestampMs Epoch time in milli seconds at which the remote log segment is copied to the remote tier storage.
+ * @param customMetadata Custom metadata.
* @param state State of the remote log segment.
* @param brokerId Broker id from which this event is generated.
*/
public RemoteLogSegmentMetadataUpdate(RemoteLogSegmentId remoteLogSegmentId, long eventTimestampMs,
- RemoteLogSegmentState state, int brokerId) {
+ Optional<CustomMetadata> customMetadata,
+ RemoteLogSegmentState state,
+ int brokerId) {
super(brokerId, eventTimestampMs);
this.remoteLogSegmentId = Objects.requireNonNull(remoteLogSegmentId, "remoteLogSegmentId can not be null");
+ this.customMetadata = Objects.requireNonNull(customMetadata, "customMetadata can not be null");
this.state = Objects.requireNonNull(state, "state can not be null");
}
@@ -60,6 +71,13 @@
}
/**
+ * @return Custom metadata.
+ */
+ public Optional<CustomMetadata> customMetadata() {
+ return customMetadata;
+ }
+
+ /**
* It represents the state of the remote log segment. It can be one of the values of {@link RemoteLogSegmentState}.
*/
public RemoteLogSegmentState state() {
@@ -81,6 +99,7 @@
}
RemoteLogSegmentMetadataUpdate that = (RemoteLogSegmentMetadataUpdate) o;
return Objects.equals(remoteLogSegmentId, that.remoteLogSegmentId) &&
+ Objects.equals(customMetadata, that.customMetadata) &&
state == that.state &&
eventTimestampMs() == that.eventTimestampMs() &&
brokerId() == that.brokerId();
@@ -88,14 +107,15 @@
@Override
public int hashCode() {
- return Objects.hash(remoteLogSegmentId, state, eventTimestampMs(), brokerId());
+ return Objects.hash(remoteLogSegmentId, customMetadata, state, eventTimestampMs(), brokerId());
}
@Override
public String toString() {
return "RemoteLogSegmentMetadataUpdate{" +
"remoteLogSegmentId=" + remoteLogSegmentId +
- ", state=" + state +
+ ", customMetadata=" + customMetadata +
+ ", state=" + state +
", eventTimestampMs=" + eventTimestampMs() +
", brokerId=" + brokerId() +
'}';
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
index cc26109..fa81997 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
@@ -18,9 +18,11 @@
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import java.io.Closeable;
import java.io.InputStream;
+import java.util.Optional;
/**
* This interface provides the lifecycle of remote log segments that includes copy, fetch, and delete from remote
@@ -81,10 +83,11 @@
*
* @param remoteLogSegmentMetadata metadata about the remote log segment.
* @param logSegmentData data to be copied to tiered storage.
+ * @return custom metadata to be added to the segment metadata after copying.
* @throws RemoteStorageException if there are any errors in storing the data of the segment.
*/
- void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
- LogSegmentData logSegmentData)
+ Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+ LogSegmentData logSegmentData)
throws RemoteStorageException;
/**
diff --git a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
index 8a83033..dfd905c 100644
--- a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
+++ b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
@@ -16,14 +16,18 @@
*/
package org.apache.kafka.server.log.remote.storage;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Map;
+import java.util.Optional;
public class NoOpRemoteStorageManager implements RemoteStorageManager {
@Override
- public void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
- LogSegmentData logSegmentData) {
+ public Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+ LogSegmentData logSegmentData) {
+ return Optional.empty();
}
@Override
diff --git a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
new file mode 100644
index 0000000..4cd2b35
--- /dev/null
+++ b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Test;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RemoteLogSegmentMetadataTest {
+ private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(),
+ new TopicPartition("foo", 0));
+
+ @Test
+ void createWithUpdates() {
+ int brokerId = 0;
+ int eventTimestamp = 0;
+ int brokerIdFinished = 1;
+ int timestampFinished = 1;
+ long startOffset = 0L;
+ long endOffset = 100L;
+ int segmentSize = 123;
+ long maxTimestamp = -1L;
+
+ Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+ segmentLeaderEpochs.put(0, 0L);
+ RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
+ RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
+ maxTimestamp, brokerId, eventTimestamp, segmentSize,
+ segmentLeaderEpochs);
+
+ CustomMetadata customMetadata = new CustomMetadata(new byte[]{0, 1, 2, 3});
+ RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
+ segmentId, timestampFinished, Optional.of(customMetadata), RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+ brokerIdFinished);
+ RemoteLogSegmentMetadata updatedMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate);
+
+ RemoteLogSegmentMetadata expectedUpdatedMetadata = new RemoteLogSegmentMetadata(
+ segmentId, startOffset, endOffset,
+ maxTimestamp, brokerIdFinished, timestampFinished, segmentSize, Optional.of(customMetadata),
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+ segmentLeaderEpochs
+ );
+ assertEquals(expectedUpdatedMetadata, updatedMetadata);
+
+ // Check that the original metadata have not changed.
+ assertEquals(segmentId, segmentMetadata.remoteLogSegmentId());
+ assertEquals(startOffset, segmentMetadata.startOffset());
+ assertEquals(endOffset, segmentMetadata.endOffset());
+ assertEquals(maxTimestamp, segmentMetadata.maxTimestampMs());
+ assertEquals(brokerId, segmentMetadata.brokerId());
+ assertEquals(eventTimestamp, segmentMetadata.eventTimestampMs());
+ assertEquals(segmentSize, segmentMetadata.segmentSizeInBytes());
+ assertEquals(segmentLeaderEpochs, segmentMetadata.segmentLeaderEpochs());
+ }
+}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
index 15e4562..0b0b281 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
@@ -77,7 +77,8 @@
private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(RemoteLogSegmentMetadataSnapshot snapshot) {
return new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, snapshot.segmentId()), snapshot.startOffset(),
snapshot.endOffset(), snapshot.maxTimestampMs(), snapshot.brokerId(), snapshot.eventTimestampMs(),
- snapshot.segmentSizeInBytes(), snapshot.state(), snapshot.segmentLeaderEpochs());
+ snapshot.segmentSizeInBytes(), snapshot.customMetadata(), snapshot.state(), snapshot.segmentLeaderEpochs()
+ );
}
/**
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
index c936d50..ec1ed6a 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
@@ -20,6 +20,7 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import java.util.Collections;
@@ -27,6 +28,7 @@
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
+import java.util.Optional;
/**
* This class represents the entry containing the metadata about a remote log segment. This is similar to
@@ -69,6 +71,11 @@
private final int segmentSizeInBytes;
/**
+ * Custom metadata.
+ */
+ private final Optional<CustomMetadata> customMetadata;
+
+ /**
* It indicates the state in which the action is executed on this segment.
*/
private final RemoteLogSegmentState state;
@@ -79,13 +86,14 @@
* {@code segmentLeaderEpochs} can not be empty. If all the records in this segment belong to the same leader epoch
* then it should have an entry with epoch mapping to start-offset of this segment.
*
- * @param segmentId Universally unique remote log segment id.
+ * @param segmentId Universally unique remote log segment id.
* @param startOffset Start offset of this segment (inclusive).
* @param endOffset End offset of this segment (inclusive).
* @param maxTimestampMs Maximum timestamp in milliseconds in this segment.
* @param brokerId Broker id from which this event is generated.
* @param eventTimestampMs Epoch time in milliseconds at which the remote log segment is copied to the remote tier storage.
* @param segmentSizeInBytes Size of this segment in bytes.
+ * @param customMetadata Custom metadata.
* @param state State of the respective segment of remoteLogSegmentId.
* @param segmentLeaderEpochs leader epochs occurred within this segment.
*/
@@ -96,6 +104,7 @@
int brokerId,
long eventTimestampMs,
int segmentSizeInBytes,
+ Optional<CustomMetadata> customMetadata,
RemoteLogSegmentState state,
Map<Integer, Long> segmentLeaderEpochs) {
super(brokerId, eventTimestampMs);
@@ -106,6 +115,7 @@
this.endOffset = endOffset;
this.maxTimestampMs = maxTimestampMs;
this.segmentSizeInBytes = segmentSizeInBytes;
+ this.customMetadata = Objects.requireNonNull(customMetadata, "customMetadata can not be null");
if (segmentLeaderEpochs == null || segmentLeaderEpochs.isEmpty()) {
throw new IllegalArgumentException("segmentLeaderEpochs can not be null or empty");
@@ -117,7 +127,8 @@
public static RemoteLogSegmentMetadataSnapshot create(RemoteLogSegmentMetadata metadata) {
return new RemoteLogSegmentMetadataSnapshot(metadata.remoteLogSegmentId().id(), metadata.startOffset(), metadata.endOffset(),
metadata.maxTimestampMs(), metadata.brokerId(), metadata.eventTimestampMs(),
- metadata.segmentSizeInBytes(), metadata.state(), metadata.segmentLeaderEpochs());
+ metadata.segmentSizeInBytes(), metadata.customMetadata(), metadata.state(), metadata.segmentLeaderEpochs()
+ );
}
/**
@@ -163,6 +174,13 @@
}
/**
+ * @return Custom metadata.
+ */
+ public Optional<CustomMetadata> customMetadata() {
+ return customMetadata;
+ }
+
+ /**
* Returns the current state of this remote log segment. It can be any of the below
* <ul>
* {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}
@@ -185,13 +203,19 @@
if (this == o) return true;
if (!(o instanceof RemoteLogSegmentMetadataSnapshot)) return false;
RemoteLogSegmentMetadataSnapshot that = (RemoteLogSegmentMetadataSnapshot) o;
- return startOffset == that.startOffset && endOffset == that.endOffset && maxTimestampMs == that.maxTimestampMs && segmentSizeInBytes == that.segmentSizeInBytes && Objects.equals(
- segmentId, that.segmentId) && Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs) && state == that.state;
+ return startOffset == that.startOffset
+ && endOffset == that.endOffset
+ && maxTimestampMs == that.maxTimestampMs
+ && segmentSizeInBytes == that.segmentSizeInBytes
+ && Objects.equals(customMetadata, that.customMetadata)
+ && Objects.equals(segmentId, that.segmentId)
+ && Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs)
+ && state == that.state;
}
@Override
public int hashCode() {
- return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs, segmentLeaderEpochs, segmentSizeInBytes, state);
+ return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs, segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state);
}
@Override
@@ -203,6 +227,7 @@
", maxTimestampMs=" + maxTimestampMs +
", segmentLeaderEpochs=" + segmentLeaderEpochs +
", segmentSizeInBytes=" + segmentSizeInBytes +
+ ", customMetadata=" + customMetadata +
", state=" + state +
'}';
}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
index bd613f8..ad47ee0 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
@@ -19,11 +19,13 @@
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentMetadataSnapshot;
import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataSnapshotRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
public class RemoteLogSegmentMetadataSnapshotTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadataSnapshot> {
@@ -39,6 +41,7 @@
.setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
.setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata.segmentLeaderEpochs()))
.setRemoteLogSegmentState(segmentMetadata.state().id());
+ segmentMetadata.customMetadata().ifPresent(md -> record.setCustomMetadata(md.value()));
return new ApiMessageAndVersion(record, record.highestSupportedVersion());
}
@@ -59,6 +62,7 @@
segmentLeaderEpochs.put(segmentLeaderEpoch.leaderEpoch(), segmentLeaderEpoch.offset());
}
+ Optional<CustomMetadata> customMetadata = Optional.ofNullable(record.customMetadata()).map(CustomMetadata::new);
return new RemoteLogSegmentMetadataSnapshot(record.segmentId(),
record.startOffset(),
record.endOffset(),
@@ -66,6 +70,7 @@
record.brokerId(),
record.eventTimestampMs(),
record.segmentSizeInBytes(),
+ customMetadata,
RemoteLogSegmentState.forId(record.remoteLogSegmentState()),
segmentLeaderEpochs);
}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
index 4282b9e..9e893d2 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
@@ -22,12 +22,14 @@
import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadata> {
@@ -43,6 +45,7 @@
.setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
.setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata))
.setRemoteLogSegmentState(segmentMetadata.state().id());
+ segmentMetadata.customMetadata().ifPresent(md -> record.setCustomMetadata(md.value()));
return new ApiMessageAndVersion(record, record.highestSupportedVersion());
}
@@ -75,6 +78,7 @@
segmentLeaderEpochs.put(segmentLeaderEpoch.leaderEpoch(), segmentLeaderEpoch.offset());
}
+ Optional<CustomMetadata> customMetadata = Optional.ofNullable(record.customMetadata()).map(CustomMetadata::new);
RemoteLogSegmentMetadata remoteLogSegmentMetadata =
new RemoteLogSegmentMetadata(remoteLogSegmentId, record.startOffset(), record.endOffset(),
record.maxTimestampMs(), record.brokerId(),
@@ -82,6 +86,7 @@
segmentLeaderEpochs);
RemoteLogSegmentMetadataUpdate rlsmUpdate
= new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, record.eventTimestampMs(),
+ customMetadata,
RemoteLogSegmentState.forId(record.remoteLogSegmentState()),
record.brokerId());
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
index 3db7765..e2d2bf8 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
@@ -21,9 +21,12 @@
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import java.util.Optional;
+
public class RemoteLogSegmentMetadataUpdateTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadataUpdate> {
public ApiMessageAndVersion toApiMessageAndVersion(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate) {
@@ -32,6 +35,7 @@
.setBrokerId(segmentMetadataUpdate.brokerId())
.setEventTimestampMs(segmentMetadataUpdate.eventTimestampMs())
.setRemoteLogSegmentState(segmentMetadataUpdate.state().id());
+ segmentMetadataUpdate.customMetadata().ifPresent(md -> record.setCustomMetadata(md.value()));
return new ApiMessageAndVersion(record, record.highestSupportedVersion());
}
@@ -42,8 +46,9 @@
TopicIdPartition topicIdPartition = new TopicIdPartition(entry.topicIdPartition().id(),
new TopicPartition(entry.topicIdPartition().name(), entry.topicIdPartition().partition()));
+ Optional<CustomMetadata> customMetadata = Optional.ofNullable(record.customMetadata()).map(CustomMetadata::new);
return new RemoteLogSegmentMetadataUpdate(new RemoteLogSegmentId(topicIdPartition, entry.id()),
- record.eventTimestampMs(), RemoteLogSegmentState.forId(record.remoteLogSegmentState()), record.brokerId());
+ record.eventTimestampMs(), customMetadata, RemoteLogSegmentState.forId(record.remoteLogSegmentState()), record.brokerId());
}
private RemoteLogSegmentMetadataUpdateRecord.RemoteLogSegmentIdEntry createRemoteLogSegmentIdEntry(RemoteLogSegmentMetadataUpdate data) {
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java
index 6284e4c..0260264 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java
@@ -18,9 +18,12 @@
import org.apache.kafka.storage.internals.log.StorageAction;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
+import java.util.Optional;
/**
* A wrapper class of {@link RemoteStorageManager} that sets the context class loader when calling the respective
@@ -66,12 +69,9 @@
}
}
- public void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+ public Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
LogSegmentData logSegmentData) throws RemoteStorageException {
- withClassLoader(() -> {
- delegate.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData);
- return null;
- });
+ return withClassLoader(() -> delegate.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData));
}
@Override
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 17a2746..1167ee7 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -81,6 +81,13 @@
public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC = "Listener name of the local broker to which it should get connected if " +
"needed by RemoteLogMetadataManager implementation.";
+ public static final String REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP = "remote.log.metadata.custom.metadata.max.bytes";
+ public static final String REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC = "The maximum size of custom metadata in bytes that the broker " +
+ "should accept from a remote storage plugin. If custom metadata exceeds this limit, the updated segment metadata " +
+ "will not be stored, the copied data will be attempted to delete, " +
+ "and the remote copying task for this topic-partition will stop with an error.";
+ public static final int DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES = 128;
+
public static final String REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP = "remote.log.index.file.cache.total.size.bytes";
public static final String REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC = "The total size of the space allocated to store index files fetched " +
"from remote storage in the local storage.";
@@ -181,6 +188,12 @@
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC)
+ .defineInternal(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,
+ INT,
+ DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES,
+ atLeast(0),
+ LOW,
+ REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
.defineInternal(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
LONG,
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
@@ -260,6 +273,7 @@
private final String remoteLogMetadataManagerPrefix;
private final HashMap<String, Object> remoteLogMetadataManagerProps;
private final String remoteLogMetadataManagerListenerName;
+ private final int remoteLogMetadataCustomMetadataMaxBytes;
public RemoteLogManagerConfig(AbstractConfig config) {
this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP),
@@ -276,6 +290,7 @@
config.getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP),
config.getInt(REMOTE_LOG_READER_THREADS_PROP),
config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP),
+ config.getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP),
config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP),
config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) != null
? config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP))
@@ -301,6 +316,7 @@
double remoteLogManagerTaskRetryJitter,
int remoteLogReaderThreads,
int remoteLogReaderMaxPendingTasks,
+ int remoteLogMetadataCustomMetadataMaxBytes,
String remoteStorageManagerPrefix,
Map<String, Object> remoteStorageManagerProps, /* properties having keys stripped out with remoteStorageManagerPrefix */
String remoteLogMetadataManagerPrefix,
@@ -324,6 +340,7 @@
this.remoteLogMetadataManagerPrefix = remoteLogMetadataManagerPrefix;
this.remoteLogMetadataManagerProps = new HashMap<>(remoteLogMetadataManagerProps);
this.remoteLogMetadataManagerListenerName = remoteLogMetadataManagerListenerName;
+ this.remoteLogMetadataCustomMetadataMaxBytes = remoteLogMetadataCustomMetadataMaxBytes;
}
public boolean enableRemoteStorageSystem() {
@@ -382,6 +399,10 @@
return remoteLogMetadataManagerListenerName;
}
+ public int remoteLogMetadataCustomMetadataMaxBytes() {
+ return remoteLogMetadataCustomMetadataMaxBytes;
+ }
+
public String remoteStorageManagerPrefix() {
return remoteStorageManagerPrefix;
}
@@ -412,6 +433,7 @@
&& remoteLogManagerTaskRetryJitter == that.remoteLogManagerTaskRetryJitter
&& remoteLogReaderThreads == that.remoteLogReaderThreads
&& remoteLogReaderMaxPendingTasks == that.remoteLogReaderMaxPendingTasks
+ && remoteLogMetadataCustomMetadataMaxBytes == that.remoteLogMetadataCustomMetadataMaxBytes
&& Objects.equals(remoteStorageManagerClassName, that.remoteStorageManagerClassName)
&& Objects.equals(remoteStorageManagerClassPath, that.remoteStorageManagerClassPath)
&& Objects.equals(remoteLogMetadataManagerClassName, that.remoteLogMetadataManagerClassName)
@@ -427,7 +449,7 @@
public int hashCode() {
return Objects.hash(enableRemoteStorageSystem, remoteStorageManagerClassName, remoteStorageManagerClassPath,
remoteLogMetadataManagerClassName, remoteLogMetadataManagerClassPath, remoteLogMetadataManagerListenerName,
- remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize, remoteLogManagerTaskIntervalMs,
+ remoteLogMetadataCustomMetadataMaxBytes, remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize, remoteLogManagerTaskIntervalMs,
remoteLogManagerTaskRetryBackoffMs, remoteLogManagerTaskRetryBackoffMaxMs, remoteLogManagerTaskRetryJitter,
remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps,
remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix);
diff --git a/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json b/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
index d18144e..c737135 100644
--- a/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
+++ b/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
@@ -117,6 +117,14 @@
"about": "Segment size in bytes."
},
{
+ "name": "CustomMetadata",
+ "type": "bytes",
+ "default": "null",
+ "versions": "0+",
+ "nullableVersions": "0+",
+ "about": "Custom metadata."
+ },
+ {
"name": "RemoteLogSegmentState",
"type": "int8",
"versions": "0+",
diff --git a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
index dbb2913..20fb173 100644
--- a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
+++ b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
@@ -83,6 +83,14 @@
"about": "Segment size in bytes"
},
{
+ "name": "CustomMetadata",
+ "type": "bytes",
+ "default": "null",
+ "versions": "0+",
+ "nullableVersions": "0+",
+ "about": "Custom metadata."
+ },
+ {
"name": "RemoteLogSegmentState",
"type": "int8",
"versions": "0+",
diff --git a/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json b/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json
index 24003dc..48aa34d 100644
--- a/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json
+++ b/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json
@@ -73,6 +73,14 @@
"about": "Epoch time in milli seconds at which this event is generated."
},
{
+ "name": "CustomMetadata",
+ "type": "bytes",
+ "default": "null",
+ "versions": "0+",
+ "nullableVersions": "0+",
+ "about": "Custom metadata."
+ },
+ {
"name": "RemoteLogSegmentState",
"type": "int8",
"versions": "0+",
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
index 5f77417..d5341e0 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
@@ -21,6 +21,7 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.test.TestUtils;
@@ -50,8 +51,10 @@
0, 100, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
1024 * 1024, Collections.singletonMap(0, 0L));
cache.addCopyInProgressSegment(metadata1);
- RemoteLogSegmentMetadataUpdate metadataUpdate1 = new RemoteLogSegmentMetadataUpdate(segmentId1, System.currentTimeMillis(),
- RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+ RemoteLogSegmentMetadataUpdate metadataUpdate1 = new RemoteLogSegmentMetadataUpdate(
+ segmentId1, System.currentTimeMillis(),
+ Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
cache.updateRemoteLogSegmentMetadata(metadataUpdate1);
Optional<RemoteLogSegmentMetadata> receivedMetadata = cache.remoteLogSegmentMetadata(0, 0L);
assertTrue(receivedMetadata.isPresent());
@@ -63,8 +66,10 @@
0, 900, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
1024 * 1024, Collections.singletonMap(0, 0L));
cache.addCopyInProgressSegment(metadata2);
- RemoteLogSegmentMetadataUpdate metadataUpdate2 = new RemoteLogSegmentMetadataUpdate(segmentId2, System.currentTimeMillis(),
- RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+ RemoteLogSegmentMetadataUpdate metadataUpdate2 = new RemoteLogSegmentMetadataUpdate(
+ segmentId2, System.currentTimeMillis(),
+ Optional.of(new CustomMetadata(new byte[]{4, 5, 6, 7})),
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
cache.updateRemoteLogSegmentMetadata(metadataUpdate2);
// Fetch segment for leader epoch:0 and start offset:0, it should be the newly added segment.
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
index 789997f..6fe0846 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
@@ -31,6 +31,7 @@
import java.util.Collections;
import java.util.Map;
+import java.util.Optional;
public class RemoteLogMetadataCacheTest {
@@ -57,7 +58,7 @@
-1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
RemoteLogSegmentMetadata updatedMetadata = segmentMetadata
.createWithUpdates(new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
- time.milliseconds(), state, BROKER_ID_1));
+ time.milliseconds(), Optional.empty(), state, BROKER_ID_1));
Assertions.assertThrows(IllegalArgumentException.class, () ->
cache.addCopyInProgressSegment(updatedMetadata));
}
@@ -67,7 +68,9 @@
Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> {
RemoteLogSegmentId nonExistingId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
cache.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(nonExistingId,
- time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID_1));
+ time.milliseconds(),
+ Optional.empty(),
+ RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID_1));
});
// Check for invalid state transition.
@@ -75,7 +78,9 @@
RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 0L), 0,
100, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
cache.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
- time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID_1));
+ time.milliseconds(),
+ Optional.empty(),
+ RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID_1));
});
}
@@ -90,8 +95,11 @@
BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
cache.addCopyInProgressSegment(segmentMetadata);
- RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId,
- time.milliseconds(), state, BROKER_ID_1);
+ RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
+ segmentId,
+ time.milliseconds(),
+ Optional.empty(),
+ state, BROKER_ID_1);
cache.updateRemoteLogSegmentMetadata(segMetadataUpdate);
return segmentMetadata.createWithUpdates(segMetadataUpdate);
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
index 3f9db8c..e3d1a2a 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
@@ -23,6 +23,8 @@
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
@@ -30,6 +32,7 @@
import java.io.PrintStream;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -46,15 +49,22 @@
segLeaderEpochs.put(1, 20L);
segLeaderEpochs.put(2, 80L);
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, SEGMENT_ID);
+ Optional<CustomMetadata> customMetadata = Optional.of(new CustomMetadata(new byte[10]));
RemoteLogSegmentMetadata remoteLogMetadata = new RemoteLogSegmentMetadata(
remoteLogSegmentId, 0L, 100L, -1L, 1,
- 123L, 1024, segLeaderEpochs);
+ 123L, 1024, customMetadata,
+ RemoteLogSegmentState.COPY_SEGMENT_STARTED, segLeaderEpochs);
byte[] metadataBytes = new RemoteLogMetadataSerde().serialize(remoteLogMetadata);
ConsumerRecord<byte[], byte[]> metadataRecord = new ConsumerRecord<>("__remote_log_metadata", 0, 0, null, metadataBytes);
String expected = String.format(
- "partition: 0, offset: 0, value: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=%s:foo-0, id=%s}, startOffset=0, endOffset=100, brokerId=1, maxTimestampMs=-1, eventTimestampMs=123, segmentLeaderEpochs={0=0, 1=20, 2=80}, segmentSizeInBytes=1024, state=COPY_SEGMENT_STARTED}\n",
+ "partition: 0, offset: 0, value: " +
+ "RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=%s:foo-0, id=%s}, " +
+ "startOffset=0, endOffset=100, brokerId=1, maxTimestampMs=-1, " +
+ "eventTimestampMs=123, segmentLeaderEpochs={0=0, 1=20, 2=80}, segmentSizeInBytes=1024, " +
+ "customMetadata=Optional[CustomMetadata{10 bytes}], " +
+ "state=COPY_SEGMENT_STARTED}\n",
TOPIC_ID, SEGMENT_ID);
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos)) {
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
index 402d1a2..5b48790 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
@@ -24,6 +24,7 @@
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
@@ -34,6 +35,7 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
public class RemoteLogMetadataSerdeTest {
@@ -69,12 +71,17 @@
segLeaderEpochs.put(2, 80L);
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1,
- time.milliseconds(), 1024, segLeaderEpochs);
+ time.milliseconds(), 1024,
+ Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})),
+ RemoteLogSegmentState.COPY_SEGMENT_STARTED,
+ segLeaderEpochs
+ );
}
private RemoteLogSegmentMetadataUpdate createRemoteLogSegmentMetadataUpdate() {
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, time.milliseconds(),
+ Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 2);
}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
index 1b46028..dbfbbf3 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
@@ -61,10 +62,13 @@
long startOffset = 0;
for (int i = 0; i < 100; i++) {
long endOffset = startOffset + 100L;
+ CustomMetadata customMetadata = new CustomMetadata(new byte[]{(byte) i});
remoteLogSegmentMetadatas.add(
new RemoteLogSegmentMetadataSnapshot(Uuid.randomUuid(), startOffset, endOffset,
System.currentTimeMillis(), 1, 100, 1024,
- RemoteLogSegmentState.COPY_SEGMENT_FINISHED, Collections.singletonMap(i, startOffset)));
+ Optional.of(customMetadata),
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED, Collections.singletonMap(i, startOffset)
+ ));
startOffset = endOffset + 1;
}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
index 87e7683..504f47e 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
@@ -27,6 +27,7 @@
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemotePartitionDeleteMetadataTransform;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
@@ -35,6 +36,7 @@
import org.junit.jupiter.api.Test;
import java.util.Collections;
+import java.util.Optional;
public class RemoteLogMetadataTransformTest {
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
@@ -58,6 +60,7 @@
RemoteLogSegmentMetadataUpdate metadataUpdate =
new RemoteLogSegmentMetadataUpdate(new RemoteLogSegmentId(TP0, Uuid.randomUuid()), time.milliseconds(),
+ Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 1);
ApiMessageAndVersion apiMessageAndVersion = metadataUpdateTransform.toApiMessageAndVersion(metadataUpdate);
RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord = metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion);
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
index b847e7cb..6928c6e 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
@@ -92,7 +92,8 @@
});
RemoteLogSegmentMetadataUpdate segment0Update = new RemoteLogSegmentMetadataUpdate(
- segment0Id, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
+ segment0Id, time.milliseconds(), Optional.empty(),
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segment0Update);
RemoteLogSegmentMetadata expectedSegment0Metadata = segment0Metadata.createWithUpdates(segment0Update);
@@ -167,6 +168,7 @@
remoteLogSegmentLifecycleManager
.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
time.milliseconds(),
+ Optional.empty(),
RemoteLogSegmentState.DELETE_SEGMENT_STARTED,
BROKER_ID_1));
Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 10).isPresent());
@@ -176,6 +178,7 @@
remoteLogSegmentLifecycleManager
.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
time.milliseconds(),
+ Optional.empty(),
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED,
BROKER_ID_1));
Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 10).isPresent());
@@ -218,7 +221,9 @@
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata);
- RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(), state, BROKER_ID_1);
+ RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
+ Optional.empty(),
+ state, BROKER_ID_1);
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segMetadataUpdate);
return segmentMetadata.createWithUpdates(segMetadataUpdate);
@@ -367,6 +372,7 @@
RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
time.milliseconds(),
+ Optional.empty(),
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED,
BROKER_ID_1);
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
new file mode 100644
index 0000000..17d4242
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentMetadataSnapshot;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RemoteLogSegmentMetadataSnapshotTransformTest {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testToAndFromMessage(Optional<CustomMetadata> customMetadata) {
+ Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+ segmentLeaderEpochs.put(0, 0L);
+ RemoteLogSegmentMetadataSnapshot snapshot = new RemoteLogSegmentMetadataSnapshot(
+ Uuid.randomUuid(),
+ 0L, 100L, -1L, 0, 0, 1234,
+ customMetadata,
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+ segmentLeaderEpochs
+ );
+
+ RemoteLogSegmentMetadataSnapshotTransform transform = new RemoteLogSegmentMetadataSnapshotTransform();
+ ApiMessageAndVersion message = transform.toApiMessageAndVersion(snapshot);
+ assertEquals(snapshot, transform.fromApiMessageAndVersion(message));
+ }
+
+ private static Stream<Object> parameters() {
+ return Stream.of(
+ Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
+ Optional.of(new CustomMetadata(new byte[0])),
+ Optional.empty()
+ );
+ }
+}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
new file mode 100644
index 0000000..84324e7
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RemoteLogSegmentMetadataTransformTest {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testToAndFromMessage(Optional<CustomMetadata> customMetadata) {
+ Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+ segmentLeaderEpochs.put(0, 0L);
+ RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(
+ new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(), 0, "topic"), Uuid.randomUuid()),
+ 0L, 100L, -1L, 0, 0, 1234,
+ customMetadata,
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+ segmentLeaderEpochs
+ );
+
+ RemoteLogSegmentMetadataTransform transform = new RemoteLogSegmentMetadataTransform();
+ ApiMessageAndVersion message = transform.toApiMessageAndVersion(metadata);
+ assertEquals(metadata, transform.fromApiMessageAndVersion(message));
+ }
+
+ private static Stream<Object> parameters() {
+ return Stream.of(
+ Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
+ Optional.of(new CustomMetadata(new byte[0])),
+ Optional.empty()
+ );
+ }
+}
\ No newline at end of file
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransformTest.java
new file mode 100644
index 0000000..09f1b76
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransformTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RemoteLogSegmentMetadataUpdateTransformTest {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testToAndFromMessage(Optional<CustomMetadata> customMetadata) {
+ Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+ segmentLeaderEpochs.put(0, 0L);
+ RemoteLogSegmentMetadataUpdate metadataUpdate = new RemoteLogSegmentMetadataUpdate(
+ new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(), 0, "topic"), Uuid.randomUuid()),
+ 123L,
+ customMetadata,
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+ 1
+ );
+
+ RemoteLogSegmentMetadataUpdateTransform transform = new RemoteLogSegmentMetadataUpdateTransform();
+ ApiMessageAndVersion message = transform.toApiMessageAndVersion(metadataUpdate);
+ assertEquals(metadataUpdate, transform.fromApiMessageAndVersion(message));
+ }
+
+ private static Stream<Object> parameters() {
+ return Stream.of(
+ Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
+ Optional.of(new CustomMetadata(new byte[0])),
+ Optional.empty()
+ );
+ }
+}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
index c9541f6..8650cea 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
@@ -26,8 +26,11 @@
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
/**
* This class is an implementation of {@link RemoteStorageManager} backed by in-memory store.
*/
@@ -52,8 +55,8 @@
}
@Override
- public void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
- LogSegmentData logSegmentData)
+ public Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+ LogSegmentData logSegmentData)
throws RemoteStorageException {
log.debug("copying log segment and indexes for : {}", remoteLogSegmentMetadata);
Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
@@ -83,6 +86,7 @@
throw new RemoteStorageException(e);
}
log.debug("copied log segment and indexes for : {} successfully.", remoteLogSegmentMetadata);
+ return Optional.empty();
}
@Override
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
index 058d8a5..a8847e5 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
@@ -20,6 +20,7 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
@@ -302,9 +303,9 @@
}
@Override
- public void copyLogSegmentData(final RemoteLogSegmentMetadata metadata, final LogSegmentData data)
+ public Optional<CustomMetadata> copyLogSegmentData(final RemoteLogSegmentMetadata metadata, final LogSegmentData data)
throws RemoteStorageException {
- Callable<Void> callable = () -> {
+ Callable<Optional<CustomMetadata>> callable = () -> {
final RemoteLogSegmentId id = metadata.remoteLogSegmentId();
final LocalTieredStorageEvent.Builder eventBuilder = newEventBuilder(COPY_SEGMENT, id);
RemoteLogSegmentFileset fileset = null;
@@ -331,10 +332,10 @@
throw e;
}
- return null;
+ return Optional.empty();
};
- wrap(callable);
+ return wrap(callable);
}
@Override
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
index bb3c2ff..c8b428c 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
@@ -41,7 +41,7 @@
RemoteLogManagerConfig expectedRemoteLogManagerConfig
= new RemoteLogManagerConfig(true, "dummy.remote.storage.class", "dummy.remote.storage.class.path",
"dummy.remote.log.metadata.class", "dummy.remote.log.metadata.class.path",
- "listener.name", 1024 * 1024L, 1, 60000L, 100L, 60000L, 0.3, 10, 100,
+ "listener.name", 1024 * 1024L, 1, 60000L, 100L, 60000L, 0.3, 10, 100, 100,
rsmPrefix, rsmProps, rlmmPrefix, rlmmProps);
Map<String, Object> props = extractProps(expectedRemoteLogManagerConfig);
@@ -82,6 +82,8 @@
remoteLogManagerConfig.remoteLogReaderThreads());
props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP,
remoteLogManagerConfig.remoteLogReaderMaxPendingTasks());
+ props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,
+ remoteLogManagerConfig.remoteLogMetadataCustomMetadataMaxBytes());
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP,
remoteLogManagerConfig.remoteStorageManagerPrefix());
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
index 95521c4..528843a 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
@@ -69,7 +69,8 @@
// 2.Move that segment to COPY_SEGMENT_FINISHED state and this segment should be available.
RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
- RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+ Optional.empty(),
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
BROKER_ID_1);
// Wait until the segment is updated successfully.
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();
@@ -108,7 +109,7 @@
remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
- segmentId, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
+ segmentId, time.milliseconds(), Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
// Wait until the segment is updated successfully.
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();