KAFKA-15107: Support custom metadata for remote log segment (#13984)

* KAFKA-15107: Support custom metadata for remote log segment

This commit implements the changes discussed in KIP-917. Mainly, it changes the `RemoteStorageManager` interface to return `CustomMetadata` and then ensures this custom metadata is stored, propagated, and (de-)serialized correctly along with the standard metadata throughout the whole lifecycle. It introduces the `remote.log.metadata.custom.metadata.max.bytes` setting to limit the custom metadata size the broker accepts and to stop uploading when a piece of metadata exceeds this limit.

On testing:
1. `RemoteLogManagerTest` checks the case when a piece of custom metadata is larger than the configured limit.
2. `RemoteLogSegmentMetadataTest` checks if `createWithUpdates` works correctly, including custom metadata.
3. `RemoteLogSegmentMetadataTransformTest`, `RemoteLogSegmentMetadataSnapshotTransformTest`, and `RemoteLogSegmentMetadataUpdateTransformTest` were added to test (de-)serialization in the corresponding classes, including custom metadata.
4. `FileBasedRemoteLogMetadataCacheTest` checks that custom metadata are correctly saved to and loaded from a file (indirectly, via `equals`).
5. `RemoteLogManagerConfigTest` checks if the configuration setting is handled correctly.

Reviewers: Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>, Divij Vaidya <diviv@amazon.com>
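
For context, `RemoteStorageManager#copyLogSegmentData` now returns `Optional<CustomMetadata>`, letting a plugin hand opaque bytes back to the broker. Below is a minimal plugin-side sketch; the class name, the path scheme, and extending the test-only `NoOpRemoteStorageManager` for brevity are illustrative assumptions, not part of this change:

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;

// Hypothetical RSM that reports the remote path it chose as custom metadata.
public class PathReportingRemoteStorageManager extends NoOpRemoteStorageManager {
    @Override
    public Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                                       LogSegmentData logSegmentData) {
        // A real plugin would upload the segment here. The returned bytes are
        // opaque to the broker: it stores them verbatim alongside the standard
        // segment metadata and hands them back on later delete calls.
        String remotePath = "segments/v2/" + remoteLogSegmentMetadata.remoteLogSegmentId().id();
        return Optional.of(new CustomMetadata(remotePath.getBytes(StandardCharsets.UTF_8)));
    }
}
```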
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6982920..e01e709 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -40,6 +40,7 @@
               files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
     <suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
     <suppress checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
+    <suppress checks="ClassFanOutComplexity" files="RemoteLogManagerTest.java"/>
     <suppress checks="MethodLength"
               files="(KafkaClusterTestKit).java"/>
 
diff --git a/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java b/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
new file mode 100644
index 0000000..c893f34
--- /dev/null
+++ b/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+class CustomMetadataSizeLimitExceededException extends Exception {
+}
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 6b723aa..33bde33 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -47,6 +47,7 @@
 import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
@@ -480,12 +481,14 @@
     class RLMTask extends CancellableRunnable {
 
         private final TopicIdPartition topicIdPartition;
+        private final int customMetadataSizeLimit;
         private final Logger logger;
 
         private volatile int leaderEpoch = -1;
 
-        public RLMTask(TopicIdPartition topicIdPartition) {
+        public RLMTask(TopicIdPartition topicIdPartition, int customMetadataSizeLimit) {
             this.topicIdPartition = topicIdPartition;
+            this.customMetadataSizeLimit = customMetadataSizeLimit;
             LogContext logContext = new LogContext("[RemoteLogManager=" + brokerId + " partition=" + topicIdPartition + "] ");
             logger = logContext.logger(RLMTask.class);
         }
@@ -586,6 +589,11 @@
                 } else {
                     logger.debug("Skipping copying segments, current read-offset:{}, and LSO:{}", copiedOffset, lso);
                 }
+            } catch (CustomMetadataSizeLimitExceededException e) {
+                // Only stop this task. Logging is done where the exception is thrown.
+                brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark();
+                brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
+                this.cancel();
             } catch (InterruptedException ex) {
                 throw ex;
             } catch (Exception ex) {
@@ -597,7 +605,8 @@
             }
         }
 
-        private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException {
+        private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException,
+                CustomMetadataSizeLimitExceededException {
             File logFile = segment.log().file();
             String logFileName = logFile.getName();
 
@@ -623,10 +632,30 @@
                     producerStateSnapshotFile.toPath(), leaderEpochsIndex);
             brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
             brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
-            remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
+            Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
 
             RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-                    RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+                    customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+            if (customMetadata.isPresent()) {
+                long customMetadataSize = customMetadata.get().value().length;
+                if (customMetadataSize > this.customMetadataSizeLimit) {
+                    CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
+                    logger.error("Custom metadata size {} exceeds configured limit {}." +
+                                    " Copying will be stopped and an attempt will be made to clean up the copied segment." +
+                                    " Original metadata: {}",
+                            customMetadataSize, this.customMetadataSizeLimit, copySegmentStartedRlsm, e);
+                    try {
+                        // For deletion, we pass the custom metadata back by creating a new metadata object from the update.
+                        // However, the update itself will not be stored in this case.
+                        remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
+                        logger.info("Successfully cleaned segment after custom metadata size exceeded the limit");
+                    } catch (RemoteStorageException e1) {
+                        logger.error("Error while cleaning segment after custom metadata size exceeded the limit, consider cleaning it manually", e1);
+                    }
+                    throw e;
+                }
+            }
 
             remoteLogMetadataManager.updateRemoteLogSegmentMetadata(copySegmentFinishedRlsm).get();
             brokerTopicStats.topicStats(log.topicPartition().topic())
@@ -883,7 +912,7 @@
                                             Consumer<RLMTask> convertToLeaderOrFollower) {
         RLMTaskWithFuture rlmTaskWithFuture = leaderOrFollowerTasks.computeIfAbsent(topicPartition,
                 topicIdPartition -> {
-                    RLMTask task = new RLMTask(topicIdPartition);
+                    RLMTask task = new RLMTask(topicIdPartition, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
                     // set this upfront when it is getting initialized instead of doing it after scheduling.
                     convertToLeaderOrFollower.accept(task);
                     LOGGER.info("Created a new task: {} and getting scheduled", task);
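
The limit wired into `RLMTask` above comes from the new internal broker setting, which defaults to 128 bytes. As a hypothetical `server.properties` fragment for an operator whose plugin returns larger metadata:

```properties
# Internal setting introduced by this PR (default: 128 bytes). If a plugin
# returns custom metadata larger than this, the broker deletes the uploaded
# segment and stops the copy task for that topic-partition.
remote.log.metadata.custom.metadata.max.bytes=256
```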
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 4d5bc8d..941f4dc 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -44,6 +44,7 @@
 import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
@@ -106,7 +107,6 @@
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
@@ -342,7 +342,8 @@
         dummyFuture.complete(null);
         when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
         when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
-        doNothing().when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+        when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
+                .thenReturn(Optional.empty());
 
         // Verify the metrics for remote writes and for failures is zero before attempt to copy log segment
         assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
@@ -353,7 +354,7 @@
         assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
         assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
         task.convertToLeader(2);
         task.copyLogSegmentsToRemote(mockLog);
 
@@ -397,6 +398,100 @@
         assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
     }
 
+    // Verifies that if a piece of custom metadata is larger than the configured limit,
+    // the copy task is cancelled and an attempt is made to delete the just-copied segment.
+    @Test
+    void testCustomMetadataSizeExceedsLimit() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+        long lastStableOffset = 150L;
+        long logEndOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(-1L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+        verify(oldSegment, times(0)).readNextOffset();
+        verify(activeSegment, times(0)).readNextOffset();
+
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+        when(mockLog.lastStableOffset()).thenReturn(lastStableOffset);
+        when(mockLog.logEndOffset()).thenReturn(logEndOffset);
+
+        LazyIndex idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000);
+        LazyIndex timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500);
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.lazyTimeIndex()).thenReturn(timeIdx);
+        when(oldSegment.lazyOffsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        int customMetadataSizeLimit = 128;
+        CustomMetadata customMetadata = new CustomMetadata(new byte[customMetadataSizeLimit * 2]);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class)))
+                .thenReturn(Optional.of(customMetadata));
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, customMetadataSizeLimit);
+        task.convertToLeader(2);
+        task.copyLogSegmentsToRemote(mockLog);
+
+        ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
+        verify(remoteLogMetadataManager).addRemoteLogSegmentMetadata(remoteLogSegmentMetadataArg.capture());
+
+        // Check we attempt to delete the segment data providing the custom metadata back.
+        RemoteLogSegmentMetadataUpdate expectedMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
+                remoteLogSegmentMetadataArg.getValue().remoteLogSegmentId(), time.milliseconds(),
+                Optional.of(customMetadata), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+        RemoteLogSegmentMetadata expectedDeleteMetadata = remoteLogSegmentMetadataArg.getValue().createWithUpdates(expectedMetadataUpdate);
+        verify(remoteStorageManager, times(1)).deleteLogSegmentData(eq(expectedDeleteMetadata));
+
+        // Check the task is cancelled in the end.
+        assertTrue(task.isCancelled());
+
+        // The metadata update should not be posted.
+        verify(remoteLogMetadataManager, never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
+
+        // Verify the copy request is counted but no copied bytes are recorded.
+        assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyBytesRate().count());
+        // Verify a remote copy failure is counted.
+        assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
+        // Verify aggregate metrics
+        assertEquals(1, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
+        assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
+        assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
+    }
+
     @Test
     void testRemoteLogManagerTasksAvgIdlePercentMetrics() throws Exception {
         long oldSegmentStartOffset = 0L;
@@ -532,7 +627,7 @@
         // Verify aggregate metrics
         assertEquals(0, brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
         assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
-        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
         task.convertToLeader(2);
         task.copyLogSegmentsToRemote(mockLog);
 
@@ -572,7 +667,7 @@
         when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
         when(mockLog.lastStableOffset()).thenReturn(250L);
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
         task.convertToFollower();
         task.copyLogSegmentsToRemote(mockLog);
 
@@ -714,7 +809,7 @@
 
     @Test
     void testRLMTaskShouldSetLeaderEpochCorrectly() {
-        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
         assertFalse(task.isLeader());
         task.convertToLeader(1);
         assertTrue(task.isLeader());
@@ -862,7 +957,7 @@
         when(log.logSegments(5L, Long.MAX_VALUE))
                 .thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1, segment2, activeSegment)));
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
         List<RemoteLogManager.EnrichedLogSegment> expected =
                 Arrays.asList(
                         new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
@@ -888,7 +983,7 @@
         when(log.logSegments(5L, Long.MAX_VALUE))
                 .thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1, segment2, segment3, activeSegment)));
 
-        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
         List<RemoteLogManager.EnrichedLogSegment> expected =
                 Arrays.asList(
                         new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
index 47ae7bf..9b58932 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
@@ -19,10 +19,12 @@
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.TreeMap;
 
 /**
@@ -67,6 +69,11 @@
     private final int segmentSizeInBytes;
 
     /**
+     * Custom metadata.
+     */
+    private final Optional<CustomMetadata> customMetadata;
+
+    /**
      * It indicates the state in which the action is executed on this segment.
      */
     private final RemoteLogSegmentState state;
@@ -84,6 +91,7 @@
      * @param brokerId            Broker id from which this event is generated.
      * @param eventTimestampMs    Epoch time in milli seconds at which the remote log segment is copied to the remote tier storage.
      * @param segmentSizeInBytes  Size of this segment in bytes.
+     * @param customMetadata      Custom metadata.
      * @param state               State of the respective segment of remoteLogSegmentId.
      * @param segmentLeaderEpochs leader epochs occurred within this segment.
      */
@@ -94,6 +102,7 @@
                                     int brokerId,
                                     long eventTimestampMs,
                                     int segmentSizeInBytes,
+                                    Optional<CustomMetadata> customMetadata,
                                     RemoteLogSegmentState state,
                                     Map<Integer, Long> segmentLeaderEpochs) {
         super(brokerId, eventTimestampMs);
@@ -112,6 +121,7 @@
         this.endOffset = endOffset;
         this.maxTimestampMs = maxTimestampMs;
         this.segmentSizeInBytes = segmentSizeInBytes;
+        this.customMetadata = Objects.requireNonNull(customMetadata, "customMetadata can not be null");
 
         if (segmentLeaderEpochs == null || segmentLeaderEpochs.isEmpty()) {
             throw new IllegalArgumentException("segmentLeaderEpochs can not be null or empty");
@@ -149,6 +159,7 @@
                 maxTimestampMs,
                 brokerId,
                 eventTimestampMs, segmentSizeInBytes,
+                Optional.empty(),
                 RemoteLogSegmentState.COPY_SEGMENT_STARTED,
                 segmentLeaderEpochs);
     }
@@ -197,6 +208,13 @@
     }
 
     /**
+     * @return Custom metadata.
+     */
+    public Optional<CustomMetadata> customMetadata() {
+        return customMetadata;
+    }
+
+    /**
      * Returns the current state of this remote log segment. It can be any of the below
      * <ul>
      *     {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}
@@ -223,7 +241,7 @@
 
         return new RemoteLogSegmentMetadata(remoteLogSegmentId, startOffset,
                 endOffset, maxTimestampMs, rlsmUpdate.brokerId(), rlsmUpdate.eventTimestampMs(),
-                segmentSizeInBytes, rlsmUpdate.state(), segmentLeaderEpochs);
+                segmentSizeInBytes, rlsmUpdate.customMetadata(), rlsmUpdate.state(), segmentLeaderEpochs);
     }
 
     @Override
@@ -244,7 +262,9 @@
                 && maxTimestampMs == that.maxTimestampMs
                 && segmentSizeInBytes == that.segmentSizeInBytes
                 && Objects.equals(remoteLogSegmentId, that.remoteLogSegmentId)
-                && Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs) && state == that.state
+                && Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs)
+                && Objects.equals(customMetadata, that.customMetadata)
+                && state == that.state
                 && eventTimestampMs() == that.eventTimestampMs()
                 && brokerId() == that.brokerId();
     }
@@ -252,7 +272,7 @@
     @Override
     public int hashCode() {
         return Objects.hash(remoteLogSegmentId, startOffset, endOffset, brokerId(), maxTimestampMs,
-                eventTimestampMs(), segmentLeaderEpochs, segmentSizeInBytes, state);
+                eventTimestampMs(), segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state);
     }
 
     @Override
@@ -266,8 +286,57 @@
                ", eventTimestampMs=" + eventTimestampMs() +
                ", segmentLeaderEpochs=" + segmentLeaderEpochs +
                ", segmentSizeInBytes=" + segmentSizeInBytes +
+               ", customMetadata=" + customMetadata +
                ", state=" + state +
                '}';
     }
 
+    /**
+     * Custom metadata from a {@link RemoteStorageManager} plugin.
+     *
+     * <p>The content of this metadata is RSM-dependent and is opaque to the broker, i.e.
+     * it's not interpreted, only stored along with the rest of the remote log segment metadata.
+     *
+     * <p>Examples of such metadata are:
+     * <ol>
+     *     <li>The storage path on the remote storage in case it's nondeterministic or version-dependent.</li>
+     *     <li>The actual size of all the files related to the segment on the remote storage.</li>
+     * </ol>
+     *
+     * <p>The maximum size the broker accepts and stores is controlled by
+     * the {@code remote.log.metadata.custom.metadata.max.bytes} setting.
+     */
+    public static class CustomMetadata {
+        private final byte[] value;
+
+        public CustomMetadata(byte[] value) {
+            this.value = value;
+        }
+
+        public byte[] value() {
+            return value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            CustomMetadata that = (CustomMetadata) o;
+            return Arrays.equals(value, that.value);
+        }
+
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(value);
+        }
+
+        @Override
+        public String toString() {
+            return "CustomMetadata{" + value.length + " bytes}";
+        }
+    }
 }
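
Since `CustomMetadata` defines `equals`/`hashCode` over the wrapped array via `Arrays.equals`/`Arrays.hashCode`, two instances wrapping equal bytes compare equal, which is what lets the metadata classes include the field in their own `equals` contracts (and what `FileBasedRemoteLogMetadataCacheTest` relies on). A quick illustrative snippet:

```java
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;

CustomMetadata a = new CustomMetadata(new byte[]{1, 2, 3});
CustomMetadata b = new CustomMetadata(new byte[]{1, 2, 3});
// Content equality, not reference identity:
assert a.equals(b) && a.hashCode() == b.hashCode();
```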
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java
index a01df96..210615e 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java
@@ -18,8 +18,10 @@
 
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * It describes the metadata update about the log segment in the remote storage. This is currently used to update the
@@ -35,6 +37,11 @@
     private final RemoteLogSegmentId remoteLogSegmentId;
 
     /**
+     * Custom metadata.
+     */
+    private final Optional<CustomMetadata> customMetadata;
+
+    /**
      * It indicates the state in which the action is executed on this segment.
      */
     private final RemoteLogSegmentState state;
@@ -42,13 +49,17 @@
     /**
      * @param remoteLogSegmentId Universally unique remote log segment id.
      * @param eventTimestampMs   Epoch time in milli seconds at which the remote log segment is copied to the remote tier storage.
+     * @param customMetadata     Custom metadata.
      * @param state              State of the remote log segment.
      * @param brokerId           Broker id from which this event is generated.
      */
     public RemoteLogSegmentMetadataUpdate(RemoteLogSegmentId remoteLogSegmentId, long eventTimestampMs,
-                                          RemoteLogSegmentState state, int brokerId) {
+                                          Optional<CustomMetadata> customMetadata,
+                                          RemoteLogSegmentState state,
+                                          int brokerId) {
         super(brokerId, eventTimestampMs);
         this.remoteLogSegmentId = Objects.requireNonNull(remoteLogSegmentId, "remoteLogSegmentId can not be null");
+        this.customMetadata = Objects.requireNonNull(customMetadata, "customMetadata can not be null");
         this.state = Objects.requireNonNull(state, "state can not be null");
     }
 
@@ -60,6 +71,13 @@
     }
 
     /**
+     * @return Custom metadata.
+     */
+    public Optional<CustomMetadata> customMetadata() {
+        return customMetadata;
+    }
+
+    /**
      * It represents the state of the remote log segment. It can be one of the values of {@link RemoteLogSegmentState}.
      */
     public RemoteLogSegmentState state() {
@@ -81,6 +99,7 @@
         }
         RemoteLogSegmentMetadataUpdate that = (RemoteLogSegmentMetadataUpdate) o;
         return Objects.equals(remoteLogSegmentId, that.remoteLogSegmentId) &&
+               Objects.equals(customMetadata, that.customMetadata) &&
                state == that.state &&
                eventTimestampMs() == that.eventTimestampMs() &&
                brokerId() == that.brokerId();
@@ -88,14 +107,15 @@
 
     @Override
     public int hashCode() {
-        return Objects.hash(remoteLogSegmentId, state, eventTimestampMs(), brokerId());
+        return Objects.hash(remoteLogSegmentId, customMetadata, state, eventTimestampMs(), brokerId());
     }
 
     @Override
     public String toString() {
         return "RemoteLogSegmentMetadataUpdate{" +
                "remoteLogSegmentId=" + remoteLogSegmentId +
-                ", state=" + state +
+               ", customMetadata=" + customMetadata +
+               ", state=" + state +
                ", eventTimestampMs=" + eventTimestampMs() +
                ", brokerId=" + brokerId() +
                '}';
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
index cc26109..fa81997 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
@@ -18,9 +18,11 @@
 
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 
 import java.io.Closeable;
 import java.io.InputStream;
+import java.util.Optional;
 
 /**
  * This interface provides the lifecycle of remote log segments that includes copy, fetch, and delete from remote
@@ -81,10 +83,11 @@
      *
      * @param remoteLogSegmentMetadata metadata about the remote log segment.
      * @param logSegmentData           data to be copied to tiered storage.
+     * @return custom metadata to be added to the segment metadata after copying.
      * @throws RemoteStorageException if there are any errors in storing the data of the segment.
      */
-    void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
-                            LogSegmentData logSegmentData)
+    Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                                LogSegmentData logSegmentData)
             throws RemoteStorageException;
 
     /**
diff --git a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
index 8a83033..dfd905c 100644
--- a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
+++ b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
@@ -16,14 +16,18 @@
  */
 package org.apache.kafka.server.log.remote.storage;
 
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.util.Map;
+import java.util.Optional;
 
 public class NoOpRemoteStorageManager implements RemoteStorageManager {
     @Override
-    public void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
-                                   LogSegmentData logSegmentData) {
+    public Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                                       LogSegmentData logSegmentData) {
+        return Optional.empty();
     }
 
     @Override
diff --git a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
new file mode 100644
index 0000000..4cd2b35
--- /dev/null
+++ b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Test;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RemoteLogSegmentMetadataTest {
+    private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(),
+            new TopicPartition("foo", 0));
+
+    @Test
+    void createWithUpdates() {
+        int brokerId = 0;
+        int eventTimestamp = 0;
+        int brokerIdFinished = 1;
+        int timestampFinished = 1;
+        long startOffset = 0L;
+        long endOffset = 100L;
+        int segmentSize = 123;
+        long maxTimestamp = -1L;
+
+        Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+        segmentLeaderEpochs.put(0, 0L);
+        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
+        RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
+                maxTimestamp, brokerId, eventTimestamp, segmentSize,
+                segmentLeaderEpochs);
+
+        CustomMetadata customMetadata = new CustomMetadata(new byte[]{0, 1, 2, 3});
+        RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
+                segmentId, timestampFinished, Optional.of(customMetadata), RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+                brokerIdFinished);
+        RemoteLogSegmentMetadata updatedMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate);
+
+        RemoteLogSegmentMetadata expectedUpdatedMetadata = new RemoteLogSegmentMetadata(
+                segmentId, startOffset, endOffset,
+                maxTimestamp, brokerIdFinished, timestampFinished, segmentSize, Optional.of(customMetadata),
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+                segmentLeaderEpochs
+        );
+        assertEquals(expectedUpdatedMetadata, updatedMetadata);
+
+        // Check that the original metadata has not changed.
+        assertEquals(segmentId, segmentMetadata.remoteLogSegmentId());
+        assertEquals(startOffset, segmentMetadata.startOffset());
+        assertEquals(endOffset, segmentMetadata.endOffset());
+        assertEquals(maxTimestamp, segmentMetadata.maxTimestampMs());
+        assertEquals(brokerId, segmentMetadata.brokerId());
+        assertEquals(eventTimestamp, segmentMetadata.eventTimestampMs());
+        assertEquals(segmentSize, segmentMetadata.segmentSizeInBytes());
+        assertEquals(segmentLeaderEpochs, segmentMetadata.segmentLeaderEpochs());
+    }
+}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
index 15e4562..0b0b281 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
@@ -77,7 +77,8 @@
     private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(RemoteLogSegmentMetadataSnapshot snapshot) {
         return new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, snapshot.segmentId()), snapshot.startOffset(),
                                             snapshot.endOffset(), snapshot.maxTimestampMs(), snapshot.brokerId(), snapshot.eventTimestampMs(),
-                                            snapshot.segmentSizeInBytes(), snapshot.state(), snapshot.segmentLeaderEpochs());
+                                            snapshot.segmentSizeInBytes(), snapshot.customMetadata(), snapshot.state(), snapshot.segmentLeaderEpochs()
+        );
     }
 
     /**
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
index c936d50..ec1ed6a 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 
 import java.util.Collections;
@@ -27,6 +28,7 @@
 import java.util.NavigableMap;
 import java.util.Objects;
 import java.util.TreeMap;
+import java.util.Optional;
 
 /**
  * This class represents the entry containing the metadata about a remote log segment. This is similar to
@@ -69,6 +71,11 @@
     private final int segmentSizeInBytes;
 
     /**
+     * Custom metadata.
+     */
+    private final Optional<CustomMetadata> customMetadata;
+
+    /**
      * It indicates the state in which the action is executed on this segment.
      */
     private final RemoteLogSegmentState state;
@@ -79,13 +86,14 @@
      * {@code segmentLeaderEpochs} can not be empty. If all the records in this segment belong to the same leader epoch
      * then it should have an entry with epoch mapping to start-offset of this segment.
      *
-     * @param segmentId                  Universally unique remote log segment id.
+     * @param segmentId           Universally unique remote log segment id.
      * @param startOffset         Start offset of this segment (inclusive).
      * @param endOffset           End offset of this segment (inclusive).
      * @param maxTimestampMs      Maximum timestamp in milliseconds in this segment.
      * @param brokerId            Broker id from which this event is generated.
      * @param eventTimestampMs    Epoch time in milliseconds at which the remote log segment is copied to the remote tier storage.
      * @param segmentSizeInBytes  Size of this segment in bytes.
+     * @param customMetadata      Custom metadata.
      * @param state               State of the respective segment of remoteLogSegmentId.
      * @param segmentLeaderEpochs leader epochs occurred within this segment.
      */
@@ -96,6 +104,7 @@
                                             int brokerId,
                                             long eventTimestampMs,
                                             int segmentSizeInBytes,
+                                            Optional<CustomMetadata> customMetadata,
                                             RemoteLogSegmentState state,
                                             Map<Integer, Long> segmentLeaderEpochs) {
         super(brokerId, eventTimestampMs);
@@ -106,6 +115,7 @@
         this.endOffset = endOffset;
         this.maxTimestampMs = maxTimestampMs;
         this.segmentSizeInBytes = segmentSizeInBytes;
+        this.customMetadata = Objects.requireNonNull(customMetadata, "customMetadata can not be null");
 
         if (segmentLeaderEpochs == null || segmentLeaderEpochs.isEmpty()) {
             throw new IllegalArgumentException("segmentLeaderEpochs can not be null or empty");
@@ -117,7 +127,8 @@
     public static RemoteLogSegmentMetadataSnapshot create(RemoteLogSegmentMetadata metadata) {
         return new RemoteLogSegmentMetadataSnapshot(metadata.remoteLogSegmentId().id(), metadata.startOffset(), metadata.endOffset(),
                                                     metadata.maxTimestampMs(), metadata.brokerId(), metadata.eventTimestampMs(),
-                                                    metadata.segmentSizeInBytes(), metadata.state(), metadata.segmentLeaderEpochs());
+                                                    metadata.segmentSizeInBytes(), metadata.customMetadata(), metadata.state(), metadata.segmentLeaderEpochs()
+        );
     }
 
     /**
@@ -163,6 +174,13 @@
     }
 
     /**
+     * @return Custom metadata.
+     */
+    public Optional<CustomMetadata> customMetadata() {
+        return customMetadata;
+    }
+
+    /**
      * Returns the current state of this remote log segment. It can be any of the below
      * <ul>
      *     {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}
@@ -185,13 +203,19 @@
         if (this == o) return true;
         if (!(o instanceof RemoteLogSegmentMetadataSnapshot)) return false;
         RemoteLogSegmentMetadataSnapshot that = (RemoteLogSegmentMetadataSnapshot) o;
-        return startOffset == that.startOffset && endOffset == that.endOffset && maxTimestampMs == that.maxTimestampMs && segmentSizeInBytes == that.segmentSizeInBytes && Objects.equals(
-                segmentId, that.segmentId) && Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs) && state == that.state;
+        return startOffset == that.startOffset
+                && endOffset == that.endOffset
+                && maxTimestampMs == that.maxTimestampMs
+                && segmentSizeInBytes == that.segmentSizeInBytes
+                && Objects.equals(customMetadata, that.customMetadata)
+                && Objects.equals(segmentId, that.segmentId)
+                && Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs)
+                && state == that.state;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs, segmentLeaderEpochs, segmentSizeInBytes, state);
+        return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs, segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state);
     }
 
     @Override
@@ -203,6 +227,7 @@
                 ", maxTimestampMs=" + maxTimestampMs +
                 ", segmentLeaderEpochs=" + segmentLeaderEpochs +
                 ", segmentSizeInBytes=" + segmentSizeInBytes +
+                ", customMetadata=" + customMetadata +
                 ", state=" + state +
                 '}';
     }
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
index bd613f8..ad47ee0 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
@@ -19,11 +19,13 @@
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentMetadataSnapshot;
 import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataSnapshotRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class RemoteLogSegmentMetadataSnapshotTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadataSnapshot> {
@@ -39,6 +41,7 @@
                 .setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
                 .setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata.segmentLeaderEpochs()))
                 .setRemoteLogSegmentState(segmentMetadata.state().id());
+        segmentMetadata.customMetadata().ifPresent(md -> record.setCustomMetadata(md.value()));
 
         return new ApiMessageAndVersion(record, record.highestSupportedVersion());
     }
@@ -59,6 +62,7 @@
             segmentLeaderEpochs.put(segmentLeaderEpoch.leaderEpoch(), segmentLeaderEpoch.offset());
         }
 
+        Optional<CustomMetadata> customMetadata = Optional.ofNullable(record.customMetadata()).map(CustomMetadata::new);
         return new RemoteLogSegmentMetadataSnapshot(record.segmentId(),
                                                     record.startOffset(),
                                                     record.endOffset(),
@@ -66,6 +70,7 @@
                                                     record.brokerId(),
                                                     record.eventTimestampMs(),
                                                     record.segmentSizeInBytes(),
+                                                    customMetadata,
                                                     RemoteLogSegmentState.forId(record.remoteLogSegmentState()),
                                                     segmentLeaderEpochs);
     }
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
index 4282b9e..9e893d2 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
@@ -22,12 +22,14 @@
 import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadata> {
@@ -43,6 +45,7 @@
                 .setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
                 .setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata))
                 .setRemoteLogSegmentState(segmentMetadata.state().id());
+        segmentMetadata.customMetadata().ifPresent(md -> record.setCustomMetadata(md.value()));
 
         return new ApiMessageAndVersion(record, record.highestSupportedVersion());
     }
@@ -75,6 +78,7 @@
             segmentLeaderEpochs.put(segmentLeaderEpoch.leaderEpoch(), segmentLeaderEpoch.offset());
         }
 
+        Optional<CustomMetadata> customMetadata = Optional.ofNullable(record.customMetadata()).map(CustomMetadata::new);
         RemoteLogSegmentMetadata remoteLogSegmentMetadata =
                 new RemoteLogSegmentMetadata(remoteLogSegmentId, record.startOffset(), record.endOffset(),
                                              record.maxTimestampMs(), record.brokerId(),
@@ -82,6 +86,7 @@
                                              segmentLeaderEpochs);
         RemoteLogSegmentMetadataUpdate rlsmUpdate
                 = new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, record.eventTimestampMs(),
+                                                     customMetadata,
                                                      RemoteLogSegmentState.forId(record.remoteLogSegmentState()),
                                                      record.brokerId());
 
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
index 3db7765..e2d2bf8 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
@@ -21,9 +21,12 @@
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 
+import java.util.Optional;
+
 public class RemoteLogSegmentMetadataUpdateTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadataUpdate> {
 
     public ApiMessageAndVersion toApiMessageAndVersion(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate) {
@@ -32,6 +35,7 @@
                 .setBrokerId(segmentMetadataUpdate.brokerId())
                 .setEventTimestampMs(segmentMetadataUpdate.eventTimestampMs())
                 .setRemoteLogSegmentState(segmentMetadataUpdate.state().id());
+        segmentMetadataUpdate.customMetadata().ifPresent(md -> record.setCustomMetadata(md.value()));
 
         return new ApiMessageAndVersion(record, record.highestSupportedVersion());
     }
@@ -42,8 +46,9 @@
         TopicIdPartition topicIdPartition = new TopicIdPartition(entry.topicIdPartition().id(),
                 new TopicPartition(entry.topicIdPartition().name(), entry.topicIdPartition().partition()));
 
+        Optional<CustomMetadata> customMetadata = Optional.ofNullable(record.customMetadata()).map(CustomMetadata::new);
         return new RemoteLogSegmentMetadataUpdate(new RemoteLogSegmentId(topicIdPartition, entry.id()),
-                record.eventTimestampMs(), RemoteLogSegmentState.forId(record.remoteLogSegmentState()), record.brokerId());
+                record.eventTimestampMs(), customMetadata, RemoteLogSegmentState.forId(record.remoteLogSegmentState()), record.brokerId());
     }
 
     private RemoteLogSegmentMetadataUpdateRecord.RemoteLogSegmentIdEntry createRemoteLogSegmentIdEntry(RemoteLogSegmentMetadataUpdate data) {
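
All three transforms handle the new field the same way: present custom metadata is written as raw bytes, and a null field on the wire maps back to `Optional.empty()`. A stand-alone sketch of that round-trip, assuming (as the transforms imply) the generated records return `null` when the field is unset:

```java
import java.util.Optional;

import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;

final class CustomMetadataCodecSketch {
    // Serialize: unwrap to the raw bytes, or null when absent.
    static byte[] toWire(Optional<CustomMetadata> customMetadata) {
        return customMetadata.map(CustomMetadata::value).orElse(null);
    }

    // Deserialize: null maps back to Optional.empty(), bytes to a new CustomMetadata.
    static Optional<CustomMetadata> fromWire(byte[] onWire) {
        return Optional.ofNullable(onWire).map(CustomMetadata::new);
    }
}
```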
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java
index 6284e4c..0260264 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java
@@ -18,9 +18,12 @@
 
 import org.apache.kafka.storage.internals.log.StorageAction;
 
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * A wrapper class of {@link RemoteStorageManager} that sets the context class loader when calling the respective
@@ -66,12 +69,9 @@
         }
     }
 
-    public void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+    public Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                    LogSegmentData logSegmentData) throws RemoteStorageException {
-        withClassLoader(() -> {
-            delegate.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData);
-            return null;
-        });
+        return withClassLoader(() -> delegate.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData));
     }
 
     @Override
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 17a2746..1167ee7 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -81,6 +81,13 @@
     public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC = "Listener name of the local broker to which it should get connected if " +
             "needed by RemoteLogMetadataManager implementation.";
 
+    public static final String REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP = "remote.log.metadata.custom.metadata.max.bytes";
+    public static final String REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC = "The maximum size of custom metadata in bytes that the broker " +
+            "should accept from a remote storage plugin. If custom metadata exceeds this limit, the updated segment metadata " +
+            "will not be stored, an attempt will be made to delete the copied data, " +
+            "and the remote copying task for this topic-partition will stop with an error.";
+    public static final int DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES = 128;
+
     public static final String REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP = "remote.log.index.file.cache.total.size.bytes";
     public static final String REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC = "The total size of the space allocated to store index files fetched " +
             "from remote storage in the local storage.";
@@ -181,6 +188,12 @@
                                   new ConfigDef.NonEmptyString(),
                                   MEDIUM,
                                   REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC)
+                  .defineInternal(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,
+                                  INT,
+                                  DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES,
+                                  atLeast(0),
+                                  LOW,
+                                  REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
                   .defineInternal(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
                                   LONG,
                                   DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
@@ -260,6 +273,7 @@
     private final String remoteLogMetadataManagerPrefix;
     private final HashMap<String, Object> remoteLogMetadataManagerProps;
     private final String remoteLogMetadataManagerListenerName;
+    private final int remoteLogMetadataCustomMetadataMaxBytes;
 
     public RemoteLogManagerConfig(AbstractConfig config) {
         this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP),
@@ -276,6 +290,7 @@
              config.getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP),
              config.getInt(REMOTE_LOG_READER_THREADS_PROP),
              config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP),
+             config.getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP),
              config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP),
              config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) != null
                  ? config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP))
@@ -301,6 +316,7 @@
                                   double remoteLogManagerTaskRetryJitter,
                                   int remoteLogReaderThreads,
                                   int remoteLogReaderMaxPendingTasks,
+                                  int remoteLogMetadataCustomMetadataMaxBytes,
                                   String remoteStorageManagerPrefix,
                                   Map<String, Object> remoteStorageManagerProps, /* properties having keys stripped out with remoteStorageManagerPrefix */
                                   String remoteLogMetadataManagerPrefix,
@@ -324,6 +340,7 @@
         this.remoteLogMetadataManagerPrefix = remoteLogMetadataManagerPrefix;
         this.remoteLogMetadataManagerProps = new HashMap<>(remoteLogMetadataManagerProps);
         this.remoteLogMetadataManagerListenerName = remoteLogMetadataManagerListenerName;
+        this.remoteLogMetadataCustomMetadataMaxBytes = remoteLogMetadataCustomMetadataMaxBytes;
     }
 
     public boolean enableRemoteStorageSystem() {
@@ -382,6 +399,10 @@
         return remoteLogMetadataManagerListenerName;
     }
 
+    public int remoteLogMetadataCustomMetadataMaxBytes() {
+        return remoteLogMetadataCustomMetadataMaxBytes;
+    }
+
     public String remoteStorageManagerPrefix() {
         return remoteStorageManagerPrefix;
     }
@@ -412,6 +433,7 @@
                 && remoteLogManagerTaskRetryJitter == that.remoteLogManagerTaskRetryJitter
                 && remoteLogReaderThreads == that.remoteLogReaderThreads
                 && remoteLogReaderMaxPendingTasks == that.remoteLogReaderMaxPendingTasks
+                && remoteLogMetadataCustomMetadataMaxBytes == that.remoteLogMetadataCustomMetadataMaxBytes
                 && Objects.equals(remoteStorageManagerClassName, that.remoteStorageManagerClassName)
                 && Objects.equals(remoteStorageManagerClassPath, that.remoteStorageManagerClassPath)
                 && Objects.equals(remoteLogMetadataManagerClassName, that.remoteLogMetadataManagerClassName)
@@ -427,7 +449,7 @@
     public int hashCode() {
         return Objects.hash(enableRemoteStorageSystem, remoteStorageManagerClassName, remoteStorageManagerClassPath,
                             remoteLogMetadataManagerClassName, remoteLogMetadataManagerClassPath, remoteLogMetadataManagerListenerName,
-                            remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize, remoteLogManagerTaskIntervalMs,
+                            remoteLogMetadataCustomMetadataMaxBytes, remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize, remoteLogManagerTaskIntervalMs,
                             remoteLogManagerTaskRetryBackoffMs, remoteLogManagerTaskRetryBackoffMaxMs, remoteLogManagerTaskRetryJitter,
                             remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps,
                             remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix);
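
Because the property is registered via `defineInternal`, it is absent from the generated public configuration docs but is read like any other broker setting. A hypothetical `server.properties` fragment (the value is illustrative; the default is 128 and the validator only requires a non-negative integer):

    # Accept up to 256 bytes of plugin-provided custom metadata per segment.
    remote.log.metadata.custom.metadata.max.bytes=256

As the doc string above states, exceeding the limit means the updated segment metadata is not stored, the broker attempts to delete the copied data, and the copy task for that topic-partition stops with an error.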
diff --git a/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json b/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
index d18144e..c737135 100644
--- a/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
+++ b/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
@@ -117,6 +117,14 @@
       "about": "Segment size in bytes."
     },
     {
+      "name": "CustomMetadata",
+      "type": "bytes",
+      "default": "null",
+      "versions": "0+",
+      "nullableVersions": "0+",
+      "about": "Custom metadata."
+    },
+    {
       "name": "RemoteLogSegmentState",
       "type": "int8",
       "versions": "0+",
diff --git a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
index dbb2913..20fb173 100644
--- a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
+++ b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
@@ -83,6 +83,14 @@
       "about": "Segment size in bytes"
     },
     {
+      "name": "CustomMetadata",
+      "type": "bytes",
+      "default": "null",
+      "versions": "0+",
+      "nullableVersions": "0+",
+      "about": "Custom metadata."
+    },
+    {
       "name": "RemoteLogSegmentState",
       "type": "int8",
       "versions": "0+",
diff --git a/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json b/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json
index 24003dc..48aa34d 100644
--- a/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json
+++ b/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json
@@ -73,6 +73,14 @@
       "about": "Epoch time in milli seconds at which this event is generated."
     },
     {
+      "name": "CustomMetadata",
+      "type": "bytes",
+      "default": "null",
+      "versions": "0+",
+      "nullableVersions": "0+",
+      "about": "Custom metadata."
+    },
+    {
       "name": "RemoteLogSegmentState",
       "type": "int8",
       "versions": "0+",
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
index 5f77417..d5341e0 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.apache.kafka.test.TestUtils;
@@ -50,8 +51,10 @@
                                                                           0, 100, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
                                                                           1024 * 1024, Collections.singletonMap(0, 0L));
         cache.addCopyInProgressSegment(metadata1);
-        RemoteLogSegmentMetadataUpdate metadataUpdate1 = new RemoteLogSegmentMetadataUpdate(segmentId1, System.currentTimeMillis(),
-                                                                                           RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+        RemoteLogSegmentMetadataUpdate metadataUpdate1 = new RemoteLogSegmentMetadataUpdate(
+                segmentId1, System.currentTimeMillis(),
+                Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
         cache.updateRemoteLogSegmentMetadata(metadataUpdate1);
         Optional<RemoteLogSegmentMetadata> receivedMetadata = cache.remoteLogSegmentMetadata(0, 0L);
         assertTrue(receivedMetadata.isPresent());
@@ -63,8 +66,10 @@
                                                                           0, 900, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
                                                                           1024 * 1024, Collections.singletonMap(0, 0L));
         cache.addCopyInProgressSegment(metadata2);
-        RemoteLogSegmentMetadataUpdate metadataUpdate2 = new RemoteLogSegmentMetadataUpdate(segmentId2, System.currentTimeMillis(),
-                                                                                           RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+        RemoteLogSegmentMetadataUpdate metadataUpdate2 = new RemoteLogSegmentMetadataUpdate(
+                segmentId2, System.currentTimeMillis(),
+                Optional.of(new CustomMetadata(new byte[]{4, 5, 6, 7})),
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
         cache.updateRemoteLogSegmentMetadata(metadataUpdate2);
 
         // Fetch segment for leader epoch:0 and start offset:0, it should be the newly added segment.
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
index 789997f..6fe0846 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
@@ -31,6 +31,7 @@
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 
 public class RemoteLogMetadataCacheTest {
 
@@ -57,7 +58,7 @@
                         -1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
                 RemoteLogSegmentMetadata updatedMetadata = segmentMetadata
                         .createWithUpdates(new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
-                                time.milliseconds(), state, BROKER_ID_1));
+                                time.milliseconds(), Optional.empty(), state, BROKER_ID_1));
                 Assertions.assertThrows(IllegalArgumentException.class, () ->
                         cache.addCopyInProgressSegment(updatedMetadata));
             }
@@ -67,7 +68,9 @@
         Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> {
             RemoteLogSegmentId nonExistingId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
             cache.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(nonExistingId,
-                    time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID_1));
+                    time.milliseconds(),
+                    Optional.empty(),
+                    RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID_1));
         });
 
         // Check for invalid state transition.
@@ -75,7 +78,9 @@
             RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 0L), 0,
                     100, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
             cache.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
-                    time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID_1));
+                    time.milliseconds(),
+                    Optional.empty(),
+                    RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID_1));
         });
     }
 
@@ -90,8 +95,11 @@
                                                                                 BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
         cache.addCopyInProgressSegment(segmentMetadata);
 
-        RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId,
-                                                                                              time.milliseconds(), state, BROKER_ID_1);
+        RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
+                segmentId,
+                time.milliseconds(),
+                Optional.empty(),
+                state, BROKER_ID_1);
         cache.updateRemoteLogSegmentMetadata(segMetadataUpdate);
 
         return segmentMetadata.createWithUpdates(segMetadataUpdate);
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
index 3f9db8c..e3d1a2a 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
@@ -23,6 +23,8 @@
 import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
@@ -30,6 +32,7 @@
 import java.io.PrintStream;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -46,15 +49,22 @@
         segLeaderEpochs.put(1, 20L);
         segLeaderEpochs.put(2, 80L);
         RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, SEGMENT_ID);
+        Optional<CustomMetadata> customMetadata = Optional.of(new CustomMetadata(new byte[10]));
         RemoteLogSegmentMetadata remoteLogMetadata = new RemoteLogSegmentMetadata(
                 remoteLogSegmentId, 0L, 100L, -1L, 1,
-                123L, 1024, segLeaderEpochs);
+                123L, 1024, customMetadata,
+                RemoteLogSegmentState.COPY_SEGMENT_STARTED, segLeaderEpochs);
 
         byte[] metadataBytes = new RemoteLogMetadataSerde().serialize(remoteLogMetadata);
         ConsumerRecord<byte[], byte[]> metadataRecord = new ConsumerRecord<>("__remote_log_metadata", 0, 0, null, metadataBytes);
 
         String expected = String.format(
-                "partition: 0, offset: 0, value: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=%s:foo-0, id=%s}, startOffset=0, endOffset=100, brokerId=1, maxTimestampMs=-1, eventTimestampMs=123, segmentLeaderEpochs={0=0, 1=20, 2=80}, segmentSizeInBytes=1024, state=COPY_SEGMENT_STARTED}\n",
+                "partition: 0, offset: 0, value: " +
+                        "RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=%s:foo-0, id=%s}, " +
+                        "startOffset=0, endOffset=100, brokerId=1, maxTimestampMs=-1, " +
+                        "eventTimestampMs=123, segmentLeaderEpochs={0=0, 1=20, 2=80}, segmentSizeInBytes=1024, " +
+                        "customMetadata=Optional[CustomMetadata{10 bytes}], " +
+                        "state=COPY_SEGMENT_STARTED}\n",
                 TOPIC_ID, SEGMENT_ID);
         try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
              PrintStream ps = new PrintStream(baos)) {
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
index 402d1a2..5b48790 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
@@ -24,6 +24,7 @@
 import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
@@ -34,6 +35,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 public class RemoteLogMetadataSerdeTest {
 
@@ -69,12 +71,17 @@
         segLeaderEpochs.put(2, 80L);
         RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
         return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1,
-                                            time.milliseconds(), 1024, segLeaderEpochs);
+                                            time.milliseconds(), 1024,
+                                            Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})),
+                                            RemoteLogSegmentState.COPY_SEGMENT_STARTED,
+                                            segLeaderEpochs
+        );
     }
 
     private RemoteLogSegmentMetadataUpdate createRemoteLogSegmentMetadataUpdate() {
         RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
         return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, time.milliseconds(),
+                                                  Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})),
                                                   RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 2);
     }
 
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
index 1b46028..dbfbbf3 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.server.log.remote.metadata.storage;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Assertions;
@@ -61,10 +62,13 @@
         long startOffset = 0;
         for (int i = 0; i < 100; i++) {
             long endOffset = startOffset + 100L;
+            CustomMetadata customMetadata = new CustomMetadata(new byte[]{(byte) i});
             remoteLogSegmentMetadatas.add(
                     new RemoteLogSegmentMetadataSnapshot(Uuid.randomUuid(), startOffset, endOffset,
                                                          System.currentTimeMillis(), 1, 100, 1024,
-                                                         RemoteLogSegmentState.COPY_SEGMENT_FINISHED, Collections.singletonMap(i, startOffset)));
+                                                         Optional.of(customMetadata),
+                                                         RemoteLogSegmentState.COPY_SEGMENT_FINISHED, Collections.singletonMap(i, startOffset)
+                    ));
             startOffset = endOffset + 1;
         }
 
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
index 87e7683..504f47e 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemotePartitionDeleteMetadataTransform;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
@@ -35,6 +36,7 @@
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
+import java.util.Optional;
 
 public class RemoteLogMetadataTransformTest {
     private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
@@ -58,6 +60,7 @@
 
         RemoteLogSegmentMetadataUpdate metadataUpdate =
                 new RemoteLogSegmentMetadataUpdate(new RemoteLogSegmentId(TP0, Uuid.randomUuid()), time.milliseconds(),
+                                                   Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
                                                    RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 1);
         ApiMessageAndVersion apiMessageAndVersion = metadataUpdateTransform.toApiMessageAndVersion(metadataUpdate);
         RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord = metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion);
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
index b847e7cb..6928c6e 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
@@ -92,7 +92,8 @@
             });
 
             RemoteLogSegmentMetadataUpdate segment0Update = new RemoteLogSegmentMetadataUpdate(
-                    segment0Id, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
+                    segment0Id, time.milliseconds(), Optional.empty(),
+                    RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
             remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segment0Update);
             RemoteLogSegmentMetadata expectedSegment0Metadata = segment0Metadata.createWithUpdates(segment0Update);
 
@@ -167,6 +168,7 @@
             remoteLogSegmentLifecycleManager
                     .updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
                                                                                        time.milliseconds(),
+                                                                                       Optional.empty(),
                                                                                        RemoteLogSegmentState.DELETE_SEGMENT_STARTED,
                                                                                        BROKER_ID_1));
             Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 10).isPresent());
@@ -176,6 +178,7 @@
             remoteLogSegmentLifecycleManager
                     .updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
                                                                                        time.milliseconds(),
+                                                                                       Optional.empty(),
                                                                                        RemoteLogSegmentState.DELETE_SEGMENT_FINISHED,
                                                                                        BROKER_ID_1));
             Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 10).isPresent());
@@ -218,7 +221,9 @@
                                                                                 time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
         remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata);
 
-        RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(), state, BROKER_ID_1);
+        RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
+                Optional.empty(),
+                state, BROKER_ID_1);
         remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segMetadataUpdate);
 
         return segmentMetadata.createWithUpdates(segMetadataUpdate);
@@ -367,6 +372,7 @@
 
             RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
                                                                                                       time.milliseconds(),
+                                                                                                      Optional.empty(),
                                                                                                       RemoteLogSegmentState.DELETE_SEGMENT_FINISHED,
                                                                                                       BROKER_ID_1);
             remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
new file mode 100644
index 0000000..17d4242
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentMetadataSnapshot;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RemoteLogSegmentMetadataSnapshotTransformTest {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testToAndFromMessage(Optional<CustomMetadata> customMetadata) {
+        Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+        segmentLeaderEpochs.put(0, 0L);
+        RemoteLogSegmentMetadataSnapshot snapshot = new RemoteLogSegmentMetadataSnapshot(
+                Uuid.randomUuid(),
+                0L, 100L, -1L, 0, 0, 1234,
+                customMetadata,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+                segmentLeaderEpochs
+        );
+
+        RemoteLogSegmentMetadataSnapshotTransform transform = new RemoteLogSegmentMetadataSnapshotTransform();
+        ApiMessageAndVersion message = transform.toApiMessageAndVersion(snapshot);
+        assertEquals(snapshot, transform.fromApiMessageAndVersion(message));
+    }
+
+    private static Stream<Object> parameters() {
+        return Stream.of(
+                Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
+                Optional.of(new CustomMetadata(new byte[0])),
+                Optional.empty()
+        );
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
new file mode 100644
index 0000000..84324e7
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransformTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RemoteLogSegmentMetadataTransformTest {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testToAndFromMessage(Optional<CustomMetadata> customMetadata) {
+        Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+        segmentLeaderEpochs.put(0, 0L);
+        RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(
+                new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(), 0, "topic"), Uuid.randomUuid()),
+                0L, 100L, -1L, 0, 0, 1234,
+                customMetadata,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+                segmentLeaderEpochs
+        );
+
+        RemoteLogSegmentMetadataTransform transform = new RemoteLogSegmentMetadataTransform();
+        ApiMessageAndVersion message = transform.toApiMessageAndVersion(metadata);
+        assertEquals(metadata, transform.fromApiMessageAndVersion(message));
+    }
+
+    private static Stream<Object> parameters() {
+        return Stream.of(
+                Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
+                Optional.of(new CustomMetadata(new byte[0])),
+                Optional.empty()
+        );
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransformTest.java
new file mode 100644
index 0000000..09f1b76
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransformTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RemoteLogSegmentMetadataUpdateTransformTest {
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testToAndFromMessage(Optional<CustomMetadata> customMetadata) {
+        Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+        segmentLeaderEpochs.put(0, 0L);
+        RemoteLogSegmentMetadataUpdate metadataUpdate = new RemoteLogSegmentMetadataUpdate(
+                new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(), 0, "topic"), Uuid.randomUuid()),
+                123L,
+                customMetadata,
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+                1
+        );
+
+        RemoteLogSegmentMetadataUpdateTransform transform = new RemoteLogSegmentMetadataUpdateTransform();
+        ApiMessageAndVersion message = transform.toApiMessageAndVersion(metadataUpdate);
+        assertEquals(metadataUpdate, transform.fromApiMessageAndVersion(message));
+    }
+
+    private static Stream<Object> parameters() {
+        return Stream.of(
+                Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
+                Optional.of(new CustomMetadata(new byte[0])),
+                Optional.empty()
+        );
+    }
+}
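
The three new transform tests cover present, empty, and absent custom metadata via the same `parameters()` source. To run one of them in isolation, something like the following should work with Kafka's standard Gradle setup (the exact invocation is illustrative):

    ./gradlew :storage:test --tests "org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogSegmentMetadataUpdateTransformTest"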
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
index c9541f6..8650cea 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/InmemoryRemoteStorageManager.java
@@ -26,8 +26,11 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
 /**
  * This class is an implementation of {@link RemoteStorageManager} backed by in-memory store.
  */
@@ -52,8 +55,8 @@
     }
 
     @Override
-    public void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
-                                   LogSegmentData logSegmentData)
+    public Optional<CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                                       LogSegmentData logSegmentData)
             throws RemoteStorageException {
         log.debug("copying log segment and indexes for : {}", remoteLogSegmentMetadata);
         Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
@@ -83,6 +86,7 @@
             throw new RemoteStorageException(e);
         }
         log.debug("copied log segment and indexes for : {} successfully.", remoteLogSegmentMetadata);
+        return Optional.empty();
     }
 
     @Override
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
index 058d8a5..a8847e5 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
@@ -302,9 +303,9 @@
     }
 
     @Override
-    public void copyLogSegmentData(final RemoteLogSegmentMetadata metadata, final LogSegmentData data)
+    public Optional<CustomMetadata> copyLogSegmentData(final RemoteLogSegmentMetadata metadata, final LogSegmentData data)
             throws RemoteStorageException {
-        Callable<Void> callable = () -> {
+        Callable<Optional<CustomMetadata>> callable = () -> {
             final RemoteLogSegmentId id = metadata.remoteLogSegmentId();
             final LocalTieredStorageEvent.Builder eventBuilder = newEventBuilder(COPY_SEGMENT, id);
             RemoteLogSegmentFileset fileset = null;
@@ -331,10 +332,10 @@
                 throw e;
             }
 
-            return null;
+            return Optional.empty();
         };
 
-        wrap(callable);
+        return wrap(callable);
     }
 
     @Override
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
index bb3c2ff..c8b428c 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
@@ -41,7 +41,7 @@
         RemoteLogManagerConfig expectedRemoteLogManagerConfig
                 = new RemoteLogManagerConfig(true, "dummy.remote.storage.class", "dummy.remote.storage.class.path",
                                              "dummy.remote.log.metadata.class", "dummy.remote.log.metadata.class.path",
-                                             "listener.name", 1024 * 1024L, 1, 60000L, 100L, 60000L, 0.3, 10, 100,
+                                             "listener.name", 1024 * 1024L, 1, 60000L, 100L, 60000L, 0.3, 10, 100, 100,
                                              rsmPrefix, rsmProps, rlmmPrefix, rlmmProps);
 
         Map<String, Object> props = extractProps(expectedRemoteLogManagerConfig);
@@ -82,6 +82,8 @@
                   remoteLogManagerConfig.remoteLogReaderThreads());
         props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP,
                   remoteLogManagerConfig.remoteLogReaderMaxPendingTasks());
+        props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,
+                  remoteLogManagerConfig.remoteLogMetadataCustomMetadataMaxBytes());
         props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP,
                   remoteLogManagerConfig.remoteStorageManagerPrefix());
         props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
index 95521c4..528843a 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
@@ -69,7 +69,8 @@
 
             // 2.Move that segment to COPY_SEGMENT_FINISHED state and this segment should be available.
             RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
-                                                                                                      RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+                    Optional.empty(),
+                    RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
                                                                                                       BROKER_ID_1);
             // Wait until the segment is updated successfully.
             remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();
@@ -108,7 +109,7 @@
             remoteLogMetadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
 
             RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
-                    segmentId, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
+                    segmentId, time.milliseconds(), Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
             // Wait until the segment is updated successfully.
             remoteLogMetadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();