[ISSUE #6624]Support mark() & reset() for TieredFileSegmentInputStream (#6625)

* test(tieredstorage): add UT to verify TieredFileSegmentInputStream

1. add UT to verify TieredFileSegmentInputStream

* refactor(tieredstorage): refactor TieredFileSegmentInputStream

1. refactor TieredFileSegmentInputStream

* feat(tieredstorage): support mark&reset TieredFileSegmentInputStream

1. support mark&reset TieredFileSegmentInputStream

* style(tieredstorage): remove commended code in TieredFileSegmentInputStreamTest

1. remove commended code in TieredFileSegmentInputStreamTest

* refactor(tieredstorage): better code placement

1. better code placement

* refactor(tieredstorage): refactor TieredFileSegmentInputStream for better understandability

1. refactor TieredFileSegmentInputStream for better understandability

* refactor(tieredstorage): refactor some code in TieredFileSegmentInputStream

1. refactor some code in TieredFileSegmentInputStream

* refactor(tieredstorage): refactor TieredFileSegmentInputStream

1. refactor TieredFileSegmentInputStream
2. add a
TieredFileSegmentInputStream.Factory to build instance

* refactor(tieredstorage): refactor TieredFileSegmentInputStream related directory structure

1. refactor TieredFileSegmentInputStream related directory structure

* refactor(tieredstorage): delete `commitLogOffsetBuffer` in TieredCommitLogInputStream

1. delete `commitLogOffsetBuffer` in TieredCommitLogInputStream

* perf(tieredstorage): benchmark TieredFileSegmentInputStream pef

1. benchmark TieredFileSegmentInputStream pef

Closes https://github.com/apache/rocketmq/issues/6624

* feat(tieredstorage): optimized `read(byte[], int, int)` for TieredFIleSegmentInputStream

1. optimized `read(byte[], int, int)` for TieredFIleSegmentInputStream

Closes https://github.com/apache/rocketmq/issues/6624

* fix(tieredstorage): fix a dead cycle in TieredFileSegmentInputStream

1. fix a dead cycle in TieredFileSegmentInputStream.java
2. remove unused JMH
related dependency

Closes https://github.com/apache/rocketmq/issues/6624
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
index 2712e84..274f03e 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
@@ -17,7 +17,7 @@
 package org.apache.rocketmq.tieredstore.provider;
 
 import com.google.common.base.Stopwatch;
-import java.io.InputStream;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -35,6 +35,8 @@
 import org.apache.rocketmq.tieredstore.container.TieredIndexFile;
 import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
 import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
 
@@ -337,7 +339,7 @@
         if (bufferSize == 0) {
             return CompletableFuture.completedFuture(true);
         }
-        TieredFileSegmentInputStream inputStream = new TieredFileSegmentInputStream(fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize);
+        TieredFileSegmentInputStream inputStream = TieredFileSegmentInputStreamFactory.build(fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize);
         int finalBufferSize = bufferSize;
         try {
             inflightCommitRequest = commit0(inputStream, commitPosition, bufferSize, fileType != FileSegmentType.INDEX)
@@ -425,107 +427,4 @@
         }
     }
 
-    public static class TieredFileSegmentInputStream extends InputStream {
-
-        private final FileSegmentType fileType;
-        private final List<ByteBuffer> uploadBufferList;
-        private int bufferReadIndex = 0;
-        private int readOffset = 0;
-        // only used in commitLog
-        private long commitLogOffset;
-        private final ByteBuffer commitLogOffsetBuffer = ByteBuffer.allocate(8);
-        private final ByteBuffer codaBuffer;
-        private ByteBuffer curBuffer;
-        private final int contentLength;
-        private int readBytes = 0;
-
-        public TieredFileSegmentInputStream(FileSegmentType fileType, long startOffset,
-            List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int contentLength) {
-            this.fileType = fileType;
-            this.commitLogOffset = startOffset;
-            this.commitLogOffsetBuffer.putLong(0, startOffset);
-            this.uploadBufferList = uploadBufferList;
-            this.codaBuffer = codaBuffer;
-            this.contentLength = contentLength;
-            if (uploadBufferList.size() > 0) {
-                this.curBuffer = uploadBufferList.get(0);
-            }
-            if (fileType == FileSegmentType.INDEX && uploadBufferList.size() != 1) {
-                logger.error("[Bug]TieredFileSegmentInputStream: index file must have only one buffer");
-            }
-        }
-
-        public List<ByteBuffer> getUploadBufferList() {
-            return uploadBufferList;
-        }
-
-        public ByteBuffer getCodaBuffer() {
-            return codaBuffer;
-        }
-
-        @Override
-        public int available() {
-            return contentLength - readBytes;
-        }
-
-        @Override
-        public int read() {
-            if (bufferReadIndex >= uploadBufferList.size()) {
-                return readCoda();
-            }
-
-            int res;
-            switch (fileType) {
-                case COMMIT_LOG:
-                    if (readOffset >= curBuffer.remaining()) {
-                        bufferReadIndex++;
-                        if (bufferReadIndex >= uploadBufferList.size()) {
-                            return readCoda();
-                        }
-                        curBuffer = uploadBufferList.get(bufferReadIndex);
-                        commitLogOffset += readOffset;
-                        commitLogOffsetBuffer.putLong(0, commitLogOffset);
-                        readOffset = 0;
-                    }
-                    if (readOffset >= MessageBufferUtil.PHYSICAL_OFFSET_POSITION && readOffset < MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
-                        res = commitLogOffsetBuffer.get(readOffset - MessageBufferUtil.PHYSICAL_OFFSET_POSITION) & 0xff;
-                        readOffset++;
-                    } else {
-                        res = curBuffer.get(readOffset++) & 0xff;
-                    }
-                    break;
-                case CONSUME_QUEUE:
-                    if (!curBuffer.hasRemaining()) {
-                        bufferReadIndex++;
-                        if (bufferReadIndex >= uploadBufferList.size()) {
-                            return -1;
-                        }
-                        curBuffer = uploadBufferList.get(bufferReadIndex);
-                    }
-                    res = curBuffer.get() & 0xff;
-                    break;
-                case INDEX:
-                    if (!curBuffer.hasRemaining()) {
-                        return -1;
-                    }
-                    res = curBuffer.get() & 0xff;
-                    break;
-                default:
-                    throw new IllegalStateException("unknown file type");
-            }
-            readBytes++;
-            return res;
-        }
-
-        private int readCoda() {
-            if (fileType != FileSegmentType.COMMIT_LOG || codaBuffer == null) {
-                return -1;
-            }
-            if (!codaBuffer.hasRemaining()) {
-                return -1;
-            }
-            readBytes++;
-            return codaBuffer.get() & 0xff;
-        }
-    }
 }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
index 081143c..f043e07 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.tieredstore.provider;
 
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 
@@ -69,6 +71,6 @@
      * @param append try to append or create a new file
      * @return put result, <code>true</code> if data successfully write; <code>false</code> otherwise
      */
-    CompletableFuture<Boolean> commit0(TieredFileSegment.TieredFileSegmentInputStream inputStream,
-        long position, int length, boolean append);
+    CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream,
+                                       long position, int length, boolean append);
 }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
new file mode 100644
index 0000000..f5be381
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.inputstream;
+
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class TieredCommitLogInputStream extends TieredFileSegmentInputStream {
+
+    /**
+     * commitLogOffset is the real physical offset of the commitLog buffer which is being read
+     */
+    private long commitLogOffset;
+
+    private final ByteBuffer codaBuffer;
+    
+    private long markCommitLogOffset = -1;
+
+    public TieredCommitLogInputStream(TieredFileSegment.FileSegmentType fileType, long startOffset,
+                                      List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int contentLength) {
+        super(fileType, uploadBufferList, contentLength);
+        this.commitLogOffset = startOffset;
+        this.codaBuffer = codaBuffer;
+    }
+
+    @Override
+    public synchronized void mark(int ignore) {
+        super.mark(ignore);
+        this.markCommitLogOffset = commitLogOffset;
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        super.reset();
+        this.commitLogOffset = markCommitLogOffset;
+    }
+
+    @Override
+    public ByteBuffer getCodaBuffer() {
+        return this.codaBuffer;
+    }
+
+    @Override
+    public int read() {
+        if (available() <= 0) {
+            return -1;
+        }
+        readPosition++;
+        if (curReadBufferIndex >= uploadBufferList.size()) {
+            return readCoda();
+        }
+        int res;
+        if (readPosInCurBuffer >= curBuffer.remaining()) {
+            curReadBufferIndex++;
+            if (curReadBufferIndex >= uploadBufferList.size()) {
+                readPosInCurBuffer = 0;
+                return readCoda();
+            }
+            curBuffer = uploadBufferList.get(curReadBufferIndex);
+            commitLogOffset += readPosInCurBuffer;
+            readPosInCurBuffer = 0;
+        }
+        if (readPosInCurBuffer >= MessageBufferUtil.PHYSICAL_OFFSET_POSITION && readPosInCurBuffer < MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
+            res = (int) ((commitLogOffset >> (8 * (MessageBufferUtil.SYS_FLAG_OFFSET_POSITION - readPosInCurBuffer - 1))) & 0xff);
+            readPosInCurBuffer++;
+        } else {
+            res = curBuffer.get(readPosInCurBuffer++) & 0xff;
+        }
+        return res;
+    }
+
+    private int readCoda() {
+        if (codaBuffer == null || readPosInCurBuffer >= codaBuffer.remaining()) {
+            return -1;
+        }
+        return codaBuffer.get(readPosInCurBuffer++) & 0xff;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) {
+        if (b == null) {
+            throw new NullPointerException();
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException("off < 0 || len < 0 || len > b.length - off");
+        }
+        if (readPosition >= contentLength) {
+            return -1;
+        }
+
+        int available = available();
+        if (len > available) {
+            len = available;
+        }
+        if (len <= 0) {
+            return 0;
+        }
+        int needRead = len;
+        int pos = readPosition;
+        int bufIndex = curReadBufferIndex;
+        int posInCurBuffer = readPosInCurBuffer;
+        long curCommitLogOffset = commitLogOffset;
+        ByteBuffer curBuf = curBuffer;
+        while (needRead > 0 && bufIndex <= uploadBufferList.size()) {
+            int readLen, remaining, realReadLen = 0;
+            if (bufIndex == uploadBufferList.size()) {
+                // read from coda buffer
+                remaining = codaBuffer.remaining() - posInCurBuffer;
+                readLen = remaining < needRead ? remaining : needRead;
+                codaBuffer.position(posInCurBuffer);
+                codaBuffer.get(b, off, readLen);
+                codaBuffer.position(0);
+                // update flags
+                off += readLen;
+                needRead -= readLen;
+                pos += readLen;
+                posInCurBuffer += readLen;
+                continue;
+            }
+            remaining = curBuf.remaining() - posInCurBuffer;
+            readLen = remaining < needRead ? remaining : needRead;
+            curBuf = uploadBufferList.get(bufIndex);
+            if (posInCurBuffer < MessageBufferUtil.PHYSICAL_OFFSET_POSITION) {
+                realReadLen = MessageBufferUtil.PHYSICAL_OFFSET_POSITION - posInCurBuffer < readLen ?
+                        MessageBufferUtil.PHYSICAL_OFFSET_POSITION - posInCurBuffer : readLen;
+                // read from commitLog buffer
+                curBuf.position(posInCurBuffer);
+                curBuf.get(b, off, realReadLen);
+                curBuf.position(0);
+            } else if (posInCurBuffer < MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
+                realReadLen = MessageBufferUtil.SYS_FLAG_OFFSET_POSITION - posInCurBuffer < readLen ?
+                        MessageBufferUtil.SYS_FLAG_OFFSET_POSITION - posInCurBuffer : readLen;
+                // read from converted PHYSICAL_OFFSET_POSITION
+                byte[] physicalOffsetBytes = new byte[realReadLen];
+                for (int i = 0; i < realReadLen; i++) {
+                    physicalOffsetBytes[i] = (byte) ((curCommitLogOffset >> (8 * (MessageBufferUtil.SYS_FLAG_OFFSET_POSITION - posInCurBuffer - i - 1))) & 0xff);
+                }
+                System.arraycopy(physicalOffsetBytes, 0, b, off, realReadLen);
+            } else if (posInCurBuffer >= MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
+                realReadLen = readLen;
+                // read from commitLog buffer
+                curBuf.position(posInCurBuffer);
+                curBuf.get(b, off, readLen);
+                curBuf.position(0);
+            }
+            // update flags
+            off += realReadLen;
+            needRead -= realReadLen;
+            pos += realReadLen;
+            posInCurBuffer += realReadLen;
+            if (posInCurBuffer == curBuffer.remaining()) {
+                // read from next buf
+                bufIndex++;
+                curCommitLogOffset += posInCurBuffer;
+                posInCurBuffer = 0;
+            }
+        }
+        readPosition = pos;
+        curReadBufferIndex = bufIndex;
+        readPosInCurBuffer = posInCurBuffer;
+        commitLogOffset = curCommitLogOffset;
+        curBuffer = curBuf;
+        return len;
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
new file mode 100644
index 0000000..d5118c1
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.inputstream;
+
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class TieredFileSegmentInputStream extends InputStream {
+
+    private final TieredFileSegment.FileSegmentType fileType;
+    protected final List<ByteBuffer> uploadBufferList;
+    protected final int contentLength;
+
+    /**
+     * readPosition is the now position in the stream
+     */
+    protected int readPosition = 0;
+
+    /**
+     * curReadBufferIndex is the index of the buffer in uploadBufferList which is being read
+     */
+    protected int curReadBufferIndex = 0;
+    /**
+     * readPosInCurBuffer is the position in the buffer which is being read
+     */
+    protected int readPosInCurBuffer = 0;
+
+    /**
+     * curBuffer is the buffer which is being read, it is the same as uploadBufferList.get(curReadBufferIndex)
+     */
+    protected ByteBuffer curBuffer;
+
+    private int markReadPosition = -1;
+
+    private int markCurReadBufferIndex = -1;
+
+    private int markReadPosInCurBuffer = -1;
+
+    public TieredFileSegmentInputStream(TieredFileSegment.FileSegmentType fileType, List<ByteBuffer> uploadBufferList,
+        int contentLength) {
+        this.fileType = fileType;
+        this.contentLength = contentLength;
+        this.uploadBufferList = uploadBufferList;
+        if (uploadBufferList.size() > 0) {
+            this.curBuffer = uploadBufferList.get(curReadBufferIndex);
+        }
+    }
+
+    @Override
+    public boolean markSupported() {
+        return true;
+    }
+
+    @Override
+    public synchronized void mark(int ignore) {
+        this.markReadPosition = readPosition;
+        this.markCurReadBufferIndex = curReadBufferIndex;
+        this.markReadPosInCurBuffer = readPosInCurBuffer;
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        if (this.markReadPosition == -1) {
+            throw new IOException("mark not set");
+        }
+        this.readPosition = markReadPosition;
+        this.curReadBufferIndex = markCurReadBufferIndex;
+        this.readPosInCurBuffer = markReadPosInCurBuffer;
+        if (this.curReadBufferIndex < uploadBufferList.size()) {
+            this.curBuffer = uploadBufferList.get(curReadBufferIndex);
+        }
+    }
+
+    @Override
+    public int available() {
+        return contentLength - readPosition;
+    }
+
+    public List<ByteBuffer> getUploadBufferList() {
+        return uploadBufferList;
+    }
+
+    public ByteBuffer getCodaBuffer() {
+        return null;
+    }
+
+    @Override
+    public int read() {
+        if (available() <= 0) {
+            return -1;
+        }
+        readPosition++;
+        if (readPosInCurBuffer >= curBuffer.remaining()) {
+            curReadBufferIndex++;
+            if (curReadBufferIndex >= uploadBufferList.size()) {
+                return -1;
+            }
+            curBuffer = uploadBufferList.get(curReadBufferIndex);
+            readPosInCurBuffer = 0;
+        }
+        return curBuffer.get(readPosInCurBuffer++) & 0xff;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) {
+        if (b == null) {
+            throw new NullPointerException();
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException("off < 0 || len < 0 || len > b.length - off");
+        }
+        if (readPosition >= contentLength) {
+            return -1;
+        }
+
+        int available = available();
+        if (len > available) {
+            len = available;
+        }
+        if (len <= 0) {
+            return 0;
+        }
+        int needRead = len;
+        int pos = readPosition;
+        int bufIndex = curReadBufferIndex;
+        int posInCurBuffer = readPosInCurBuffer;
+        ByteBuffer curBuf = curBuffer;
+        while (needRead > 0 && bufIndex < uploadBufferList.size()) {
+            curBuf = uploadBufferList.get(bufIndex);
+            int remaining = curBuf.remaining() - posInCurBuffer;
+            int readLen = remaining < needRead ? remaining : needRead;
+            // read from curBuf
+            curBuf.position(posInCurBuffer);
+            curBuf.get(b, off, readLen);
+            curBuf.position(0);
+            // update flags
+            off += readLen;
+            needRead -= readLen;
+            pos += readLen;
+            posInCurBuffer += readLen;
+            if (posInCurBuffer == curBuffer.remaining()) {
+                // read from next buf
+                bufIndex++;
+                posInCurBuffer = 0;
+            }
+        }
+        readPosition = pos;
+        curReadBufferIndex = bufIndex;
+        readPosInCurBuffer = posInCurBuffer;
+        curBuffer = curBuf;
+        return len;
+    }
+}
+
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java
new file mode 100644
index 0000000..e6f7749
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.inputstream;
+
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class TieredFileSegmentInputStreamFactory {
+
+    public static TieredFileSegmentInputStream build(TieredFileSegment.FileSegmentType fileType,
+                                                     long startOffset,
+                                                     List<ByteBuffer> uploadBufferList,
+                                                     ByteBuffer codaBuffer,
+                                                     int contentLength) {
+        if (fileType == TieredFileSegment.FileSegmentType.COMMIT_LOG) {
+            return new TieredCommitLogInputStream(fileType, startOffset, uploadBufferList, codaBuffer, contentLength);
+        } else if (fileType == TieredFileSegment.FileSegmentType.CONSUME_QUEUE) {
+            return new TieredFileSegmentInputStream(fileType, uploadBufferList, contentLength);
+        } else if (fileType == TieredFileSegment.FileSegmentType.INDEX) {
+            if (uploadBufferList.size() != 1) {
+                throw new IllegalArgumentException("uploadBufferList size in INDEX type input stream must be 1");
+            }
+            return new TieredFileSegmentInputStream(fileType, uploadBufferList, contentLength);
+        } else {
+            throw new IllegalArgumentException("fileType is not supported");
+        }
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
index 9d9620f..7032799 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
@@ -35,6 +35,7 @@
 import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
 import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
 import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
 
 import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE;
@@ -182,7 +183,7 @@
 
     @Override
     public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length,
-        boolean append) {
+                                              boolean append) {
         Stopwatch stopwatch = Stopwatch.createStarted();
         AttributesBuilder attributesBuilder = newAttributesBuilder()
             .put(LABEL_OPERATION, OPERATION_POSIX_WRITE);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
index 860b172..b5c4e9d 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
@@ -84,7 +84,7 @@
         DefaultMessageStore defaultMessageStore = Mockito.mock(DefaultMessageStore.class);
         TieredDispatcher dispatcher = new TieredDispatcher(defaultMessageStore, storeConfig);
 
-        SelectMappedBufferResult mockResult = new SelectMappedBufferResult(0, MessageBufferUtilTest.buildMessageBuffer(), MessageBufferUtilTest.MSG_LEN, null);
+        SelectMappedBufferResult mockResult = new SelectMappedBufferResult(0, MessageBufferUtilTest.buildMockedMessageBuffer(), MessageBufferUtilTest.MSG_LEN, null);
         Mockito.when(defaultMessageStore.selectOneMessageByOffset(7, MessageBufferUtilTest.MSG_LEN)).thenReturn(mockResult);
         DispatchRequest request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), 6, 7, MessageBufferUtilTest.MSG_LEN, 1);
         dispatcher.dispatch(request);
@@ -98,13 +98,13 @@
         dispatcher.buildCQAndIndexFile();
         Assert.assertEquals(7, container.getConsumeQueueMaxOffset());
 
-        ByteBuffer buffer1 = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer buffer1 = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer1.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 7);
         container.appendCommitLog(buffer1);
-        ByteBuffer buffer2 = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer buffer2 = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer2.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 8);
         container.appendCommitLog(buffer2);
-        ByteBuffer buffer3 = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer buffer3 = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer3.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 9);
         container.appendCommitLog(buffer3);
         container.commitCommitLog();
@@ -152,10 +152,10 @@
 
         Mockito.when(((ConsumeQueue) defaultStore.getConsumeQueue(mq.getTopic(), mq.getQueueId())).getIndexBuffer(7)).thenReturn(mockResult);
 
-        mockResult = new SelectMappedBufferResult(0, MessageBufferUtilTest.buildMessageBuffer(), MessageBufferUtilTest.MSG_LEN, null);
+        mockResult = new SelectMappedBufferResult(0, MessageBufferUtilTest.buildMockedMessageBuffer(), MessageBufferUtilTest.MSG_LEN, null);
         Mockito.when(defaultStore.selectOneMessageByOffset(7, MessageBufferUtilTest.MSG_LEN)).thenReturn(mockResult);
 
-        ByteBuffer msg = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer msg = MessageBufferUtilTest.buildMockedMessageBuffer();
         msg.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 7);
         mockResult = new SelectMappedBufferResult(0, msg, MessageBufferUtilTest.MSG_LEN, null);
         Mockito.when(defaultStore.selectOneMessageByOffset(8, MessageBufferUtilTest.MSG_LEN)).thenReturn(mockResult);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index d0a3e3f..ddcc9fa 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -89,13 +89,13 @@
         getMessageResult = fetcher.getMessageAsync("group", mq.getTopic(), mq.getQueueId(), 0, 32, null).join();
         Assert.assertEquals(GetMessageStatus.NO_MESSAGE_IN_QUEUE, getMessageResult.getStatus());
 
-        ByteBuffer msg1 = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer msg1 = MessageBufferUtilTest.buildMockedMessageBuffer();
         msg1.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 0);
         msg1.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, 0);
         AppendResult result = container.appendCommitLog(msg1);
         Assert.assertEquals(AppendResult.SUCCESS, result);
 
-        ByteBuffer msg2 = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer msg2 = MessageBufferUtilTest.buildMockedMessageBuffer();
         msg2.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 1);
         msg2.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtilTest.MSG_LEN);
         container.appendCommitLog(msg2);
@@ -199,7 +199,7 @@
         TieredMessageQueueContainer container = TieredContainerManager.getInstance(storeConfig).getOrCreateMQContainer(mq);
         container.initOffset(0);
 
-        ByteBuffer msg1 = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer msg1 = MessageBufferUtilTest.buildMockedMessageBuffer();
         msg1.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 0);
         msg1.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, 0);
         long currentTimeMillis1 = System.currentTimeMillis();
@@ -207,7 +207,7 @@
         AppendResult result = container.appendCommitLog(msg1);
         Assert.assertEquals(AppendResult.SUCCESS, result);
 
-        ByteBuffer msg2 = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer msg2 = MessageBufferUtilTest.buildMockedMessageBuffer();
         msg2.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 1);
         msg2.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtilTest.MSG_LEN);
         long currentTimeMillis2 = System.currentTimeMillis();
@@ -244,7 +244,7 @@
 
 
         long timestamp = System.currentTimeMillis();
-        ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 50);
         buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp);
         container.initOffset(50);
@@ -266,13 +266,13 @@
         Assert.assertEquals(0, fetcher.queryMessageAsync(mq.getTopic(), "key", 32, 0, Long.MAX_VALUE).join().getMessageMapedList().size());
 
         container.initOffset(0);
-        ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 0);
         container.appendCommitLog(buffer);
-        buffer = MessageBufferUtilTest.buildMessageBuffer();
+        buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 1);
         container.appendCommitLog(buffer);
-        buffer = MessageBufferUtilTest.buildMessageBuffer();
+        buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 2);
         container.appendCommitLog(buffer);
 
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java
index 11afa36..ccfe18b 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java
@@ -68,7 +68,7 @@
     @Test
     public void testAppendCommitLog() throws ClassNotFoundException, NoSuchMethodException {
         TieredMessageQueueContainer container = new TieredMessageQueueContainer(mq, storeConfig);
-        ByteBuffer message = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer message = MessageBufferUtilTest.buildMockedMessageBuffer();
         AppendResult result = container.appendCommitLog(message);
         Assert.assertEquals(AppendResult.OFFSET_INCORRECT, result);
 
@@ -143,27 +143,27 @@
         TieredMessageQueueContainer container = new TieredMessageQueueContainer(mq, storeConfig);
         container.initOffset(50);
         long timestamp1 = System.currentTimeMillis();
-        ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 50);
         buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp1);
         container.appendCommitLog(buffer, true);
 
         long timestamp2 = timestamp1 + 100;
-        buffer = MessageBufferUtilTest.buildMessageBuffer();
+        buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 51);
         buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp2);
         container.appendCommitLog(buffer, true);
-        buffer = MessageBufferUtilTest.buildMessageBuffer();
+        buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 52);
         buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp2);
         container.appendCommitLog(buffer, true);
-        buffer = MessageBufferUtilTest.buildMessageBuffer();
+        buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 53);
         buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp2);
         container.appendCommitLog(buffer, true);
 
         long timestamp3 = timestamp2 + 100;
-        buffer = MessageBufferUtilTest.buildMessageBuffer();
+        buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 54);
         buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp3);
         container.appendCommitLog(buffer, true);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
index 254b151..3c47d1c 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
@@ -23,6 +23,7 @@
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
 import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
 import org.junit.Assert;
 
 public class MemoryFileSegment extends TieredFileSegment {
@@ -81,7 +82,7 @@
 
     @Override
     public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length,
-        boolean append) {
+                                              boolean append) {
         try {
             if (blocker != null && !blocker.get()) {
                 throw new IllegalStateException();
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java
index f7e5488..741a38c 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java
@@ -20,6 +20,7 @@
 import java.util.concurrent.ExecutionException;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
 import org.junit.Assert;
 
 public class MemoryFileSegmentWithoutCheck extends MemoryFileSegment {
@@ -37,7 +38,7 @@
 
     @Override
     public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length,
-        boolean append) {
+                                              boolean append) {
         try {
             if (blocker != null && !blocker.get()) {
                 throw new IllegalStateException();
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
new file mode 100644
index 0000000..3d9fdba
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider;
+
+import com.google.common.base.Supplier;
+import org.apache.rocketmq.tieredstore.container.TieredCommitLog;
+import org.apache.rocketmq.tieredstore.container.TieredConsumeQueue;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory;
+import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
+import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+public class TieredFileSegmentInputStreamTest {
+
+    private final static long COMMIT_LOG_START_OFFSET = 13131313;
+
+    private final static int MSG_LEN = MessageBufferUtilTest.MSG_LEN;
+
+    private final static int MSG_NUM = 10;
+
+    private final static int RESET_TIMES = 10;
+
+    private final static Random RANDOM = new Random();
+
+    @Test
+    public void testCommitLogTypeInputStream() {
+        List<ByteBuffer> uploadBufferList = new ArrayList<>();
+        int bufferSize = 0;
+        for (int i = 0; i < MSG_NUM; i++) {
+            ByteBuffer byteBuffer = MessageBufferUtilTest.buildMockedMessageBuffer();
+            uploadBufferList.add(byteBuffer);
+            bufferSize += byteBuffer.remaining();
+        }
+
+        // build expected byte buffer for verifying the TieredFileSegmentInputStream
+        ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
+        for (ByteBuffer byteBuffer : uploadBufferList) {
+            expectedByteBuffer.put(byteBuffer);
+            byteBuffer.rewind();
+        }
+        // set real physical offset
+        for (int i = 0; i < MSG_NUM; i++) {
+            long physicalOffset = COMMIT_LOG_START_OFFSET + i * MSG_LEN;
+            int position = i * MSG_LEN + MessageBufferUtil.PHYSICAL_OFFSET_POSITION;
+            expectedByteBuffer.putLong(position, physicalOffset);
+        }
+
+        int finalBufferSize = bufferSize;
+        int[] batchReadSizeTestSet = {
+            MessageBufferUtil.PHYSICAL_OFFSET_POSITION - 1, MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtil.PHYSICAL_OFFSET_POSITION + 1, MSG_LEN - 1, MSG_LEN, MSG_LEN + 1
+        };
+        verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build(
+            TieredFileSegment.FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET, uploadBufferList, null, finalBufferSize), finalBufferSize, batchReadSizeTestSet);
+
+    }
+
+    @Test
+    public void testCommitLogTypeInputStreamWithCoda() {
+        List<ByteBuffer> uploadBufferList = new ArrayList<>();
+        int bufferSize = 0;
+        for (int i = 0; i < MSG_NUM; i++) {
+            ByteBuffer byteBuffer = MessageBufferUtilTest.buildMockedMessageBuffer();
+            uploadBufferList.add(byteBuffer);
+            bufferSize += byteBuffer.remaining();
+        }
+
+        ByteBuffer codaBuffer = ByteBuffer.allocate(TieredCommitLog.CODA_SIZE);
+        codaBuffer.putInt(TieredCommitLog.CODA_SIZE);
+        codaBuffer.putInt(TieredCommitLog.BLANK_MAGIC_CODE);
+        long timeMillis = System.currentTimeMillis();
+        codaBuffer.putLong(timeMillis);
+        codaBuffer.flip();
+        int codaBufferSize = codaBuffer.remaining();
+        bufferSize += codaBufferSize;
+
+        // build expected byte buffer for verifying the TieredFileSegmentInputStream
+        ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
+        for (ByteBuffer byteBuffer : uploadBufferList) {
+            expectedByteBuffer.put(byteBuffer);
+            byteBuffer.rewind();
+        }
+        expectedByteBuffer.put(codaBuffer);
+        codaBuffer.rewind();
+        // set real physical offset
+        for (int i = 0; i < MSG_NUM; i++) {
+            long physicalOffset = COMMIT_LOG_START_OFFSET + i * MSG_LEN;
+            int position = i * MSG_LEN + MessageBufferUtil.PHYSICAL_OFFSET_POSITION;
+            expectedByteBuffer.putLong(position, physicalOffset);
+        }
+
+        int finalBufferSize = bufferSize;
+        int[] batchReadSizeTestSet = {
+            MessageBufferUtil.PHYSICAL_OFFSET_POSITION - 1, MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtil.PHYSICAL_OFFSET_POSITION + 1,
+            MSG_LEN - 1, MSG_LEN, MSG_LEN + 1,
+            bufferSize - 1, bufferSize, bufferSize + 1
+        };
+        verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build(
+            TieredFileSegment.FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET, uploadBufferList, codaBuffer, finalBufferSize), finalBufferSize, batchReadSizeTestSet);
+
+    }
+
+    @Test
+    public void testConsumeQueueTypeInputStream() {
+        List<ByteBuffer> uploadBufferList = new ArrayList<>();
+        int bufferSize = 0;
+        for (int i = 0; i < MSG_NUM; i++) {
+            ByteBuffer byteBuffer = MessageBufferUtilTest.buildMockedConsumeQueueBuffer();
+            uploadBufferList.add(byteBuffer);
+            bufferSize += byteBuffer.remaining();
+        }
+
+        // build expected byte buffer for verifying the TieredFileSegmentInputStream
+        ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
+        for (ByteBuffer byteBuffer : uploadBufferList) {
+            expectedByteBuffer.put(byteBuffer);
+            byteBuffer.rewind();
+        }
+
+        int finalBufferSize = bufferSize;
+        int[] batchReadSizeTestSet = {TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE - 1, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE + 1};
+        verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build(
+            TieredFileSegment.FileSegmentType.CONSUME_QUEUE, COMMIT_LOG_START_OFFSET, uploadBufferList, null, finalBufferSize), bufferSize, batchReadSizeTestSet);
+
+    }
+
+    @Test
+    public void testIndexTypeInputStream() {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(24);
+        byteBuffer.putLong(1);
+        byteBuffer.putLong(2);
+        byteBuffer.putLong(3);
+        byteBuffer.flip();
+        List<ByteBuffer> uploadBufferList = Arrays.asList(byteBuffer);
+
+        // build expected byte buffer for verifying the TieredFileSegmentInputStream
+        ByteBuffer expectedByteBuffer = byteBuffer.slice();
+
+        verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build(
+            TieredFileSegment.FileSegmentType.INDEX, COMMIT_LOG_START_OFFSET, uploadBufferList, null, byteBuffer.limit()), byteBuffer.limit(), new int[] {23, 24, 25});
+    }
+
+    private void verifyReadAndReset(ByteBuffer expectedByteBuffer, Supplier<TieredFileSegmentInputStream> constructor,
+        int bufferSize, int[] readBatchSizeTestSet) {
+        TieredFileSegmentInputStream inputStream = constructor.get();
+
+        // verify
+        verifyInputStream(inputStream, expectedByteBuffer);
+
+        // verify reset with method InputStream#mark() hasn't been called
+        try {
+            inputStream.reset();
+            Assert.fail("Should throw IOException");
+        } catch (IOException e) {
+            Assert.assertTrue(e instanceof IOException);
+        }
+
+        // verify reset with method InputStream#mark() has been called
+        int resetPosition = RANDOM.nextInt(bufferSize);
+        int expectedResetPosition = 0;
+        inputStream = constructor.get();
+        // verify and mark with resetPosition, use read() to read a byte each time
+        for (int i = 0; i < RESET_TIMES; i++) {
+            verifyInputStream(inputStream, expectedByteBuffer, expectedResetPosition, resetPosition);
+
+            try {
+                inputStream.reset();
+            } catch (IOException e) {
+                Assert.fail("Should not throw IOException");
+            }
+
+            expectedResetPosition = resetPosition;
+            resetPosition += RANDOM.nextInt(bufferSize - resetPosition);
+        }
+        for (int i = 0; i < readBatchSizeTestSet.length; i++) {
+            inputStream = constructor.get();
+            int readBatchSize = readBatchSizeTestSet[i];
+            expectedResetPosition = 0;
+            resetPosition = readBatchSize * RANDOM.nextInt(1 + bufferSize / readBatchSize);
+            // verify and mark with resetPosition, use read(byte[]) to read a byte array each time
+            for (int j = 0; j < RESET_TIMES; j++) {
+                verifyInputStreamViaBatchRead(inputStream, expectedByteBuffer, expectedResetPosition, resetPosition, readBatchSize);
+                try {
+                    inputStream.reset();
+                } catch (IOException e) {
+                    Assert.fail("Should not throw IOException");
+                }
+
+                expectedResetPosition = resetPosition;
+                resetPosition += readBatchSize * RANDOM.nextInt(1 + (bufferSize - resetPosition) / readBatchSize);
+            }
+        }
+    }
+
+    private void verifyInputStream(InputStream inputStream, ByteBuffer expectedBuffer) {
+        verifyInputStream(inputStream, expectedBuffer, 0, -1);
+    }
+
+    /**
+     * verify the input stream
+     *
+     * @param inputStream           the input stream to be verified
+     * @param expectedBuffer        the expected byte buffer
+     * @param expectedBufferReadPos the expected start position of the expected byte buffer
+     * @param expectedMarkCalledPos the expected position when the method InputStream#mark() is called. <i>(-1 means ignored)</i>
+     */
+    private void verifyInputStream(InputStream inputStream, ByteBuffer expectedBuffer, int expectedBufferReadPos,
+        int expectedMarkCalledPos) {
+        try {
+            expectedBuffer.position(expectedBufferReadPos);
+            while (true) {
+                if (expectedMarkCalledPos == expectedBuffer.position()) {
+                    inputStream.mark(0);
+                }
+                int b = inputStream.read();
+                if (b == -1)
+                    break;
+                Assert.assertEquals(expectedBuffer.get(), (byte) b);
+            }
+            Assert.assertFalse(expectedBuffer.hasRemaining());
+        } catch (IOException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    /**
+     * verify the input stream
+     *
+     * @param inputStream           the input stream to be verified
+     * @param expectedBuffer        the expected byte buffer
+     * @param expectedBufferReadPos the expected start position of the expected byte buffer
+     * @param expectedMarkCalledPos the expected position when the method InputStream#mark() is called. <i>(-1 means ignored)</i>
+     * @param readBatchSize         the batch size of each read(byte[]) operation
+     */
+    private void verifyInputStreamViaBatchRead(InputStream inputStream, ByteBuffer expectedBuffer,
+        int expectedBufferReadPos, int expectedMarkCalledPos, int readBatchSize) {
+        try {
+            expectedBuffer.position(expectedBufferReadPos);
+            byte[] buf = new byte[readBatchSize];
+            while (true) {
+                if (expectedMarkCalledPos == expectedBuffer.position()) {
+                    inputStream.mark(0);
+                }
+                int len = inputStream.read(buf, 0, readBatchSize);
+                if (len == -1)
+                    break;
+                byte[] expected = new byte[len];
+                expectedBuffer.get(expected, 0, len);
+                for (int i = 0; i < len; i++) {
+                    Assert.assertEquals(expected[i], buf[i]);
+                }
+            }
+            Assert.assertFalse(expectedBuffer.hasRemaining());
+        } catch (IOException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
index f55f748..79b1883 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
@@ -41,11 +41,11 @@
         TieredFileSegment segment = createFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG);
         segment.initPosition(segment.getSize());
         long lastSize = segment.getSize();
-        segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
-        segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
+        segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
+        segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
         Assert.assertTrue(segment.needCommit());
 
-        ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         long msg3StoreTime = System.currentTimeMillis();
         buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, msg3StoreTime);
         long queueOffset = baseOffset * 1000L;
@@ -117,8 +117,8 @@
         long startTime = System.currentTimeMillis();
         MemoryFileSegment segment = (MemoryFileSegment) createFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG);
         long lastSize = segment.getSize();
-        segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
-        segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
+        segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
+        segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
 
         segment.blocker = new CompletableFuture<>();
         new Thread(() -> {
@@ -127,7 +127,7 @@
             } catch (InterruptedException e) {
                 Assert.fail(e.getMessage());
             }
-            ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+            ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
             buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, startTime);
             segment.append(buffer, 0);
             segment.blocker.complete(false);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
index 268ea2d..befd401 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
@@ -50,7 +50,7 @@
         + 2 + 30 //properties
         + 0;
 
-    public static ByteBuffer buildMessageBuffer() {
+    public static ByteBuffer buildMockedMessageBuffer() {
         // Initialization of storage space
         ByteBuffer buffer = ByteBuffer.allocate(MSG_LEN);
         // 1 TOTALSIZE
@@ -99,23 +99,36 @@
         return buffer;
     }
 
+    public static ByteBuffer buildMockedConsumeQueueBuffer() {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+        // 1 COMMIT_LOG_OFFSET
+        byteBuffer.putLong(1);
+        // 2 MESSAGE_SIZE
+        byteBuffer.putInt(2);
+        // 3 TAG_HASH_CODE
+        byteBuffer.putLong(3);
+        byteBuffer.flip();
+        return byteBuffer;
+    }
+
+
     @Test
     public void testGetTotalSize() {
-        ByteBuffer buffer = buildMessageBuffer();
+        ByteBuffer buffer = buildMockedMessageBuffer();
         int totalSize = MessageBufferUtil.getTotalSize(buffer);
         Assert.assertEquals(MSG_LEN, totalSize);
     }
 
     @Test
     public void testGetMagicCode() {
-        ByteBuffer buffer = buildMessageBuffer();
+        ByteBuffer buffer = buildMockedMessageBuffer();
         int magicCode = MessageBufferUtil.getMagicCode(buffer);
         Assert.assertEquals(MessageDecoder.MESSAGE_MAGIC_CODE_V2, magicCode);
     }
 
     @Test
     public void testSplitMessages() {
-        ByteBuffer msgBuffer1 = buildMessageBuffer();
+        ByteBuffer msgBuffer1 = buildMockedMessageBuffer();
         msgBuffer1.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 10);
         ByteBuffer msgBuffer2 = ByteBuffer.allocate(TieredCommitLog.CODA_SIZE);
 
@@ -124,7 +137,7 @@
         msgBuffer2.putLong(System.currentTimeMillis());
         msgBuffer2.flip();
 
-        ByteBuffer msgBuffer3 = buildMessageBuffer();
+        ByteBuffer msgBuffer3 = buildMockedMessageBuffer();
         msgBuffer3.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 11);
 
         ByteBuffer msgBuffer = ByteBuffer.allocate(msgBuffer1.remaining() + msgBuffer2.remaining() + msgBuffer3.remaining());
@@ -202,21 +215,21 @@
 
     @Test
     public void testGetQueueOffset() {
-        ByteBuffer buffer = buildMessageBuffer();
+        ByteBuffer buffer = buildMockedMessageBuffer();
         long queueOffset = MessageBufferUtil.getQueueOffset(buffer);
         Assert.assertEquals(6, queueOffset);
     }
 
     @Test
     public void testGetStoreTimeStamp() {
-        ByteBuffer buffer = buildMessageBuffer();
+        ByteBuffer buffer = buildMockedMessageBuffer();
         long storeTimeStamp = MessageBufferUtil.getStoreTimeStamp(buffer);
         Assert.assertEquals(11, storeTimeStamp);
     }
 
     @Test
     public void testGetOffsetId() {
-        ByteBuffer buffer = buildMessageBuffer();
+        ByteBuffer buffer = buildMockedMessageBuffer();
         InetSocketAddress inetSocketAddress = new InetSocketAddress("255.255.255.255", 65535);
         ByteBuffer addr = ByteBuffer.allocate(Long.BYTES);
         addr.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
@@ -232,7 +245,7 @@
 
     @Test
     public void testGetProperties() {
-        ByteBuffer buffer = buildMessageBuffer();
+        ByteBuffer buffer = buildMockedMessageBuffer();
         Map<String, String> properties = MessageBufferUtil.getProperties(buffer);
         Assert.assertEquals(2, properties.size());
         Assert.assertTrue(properties.containsKey(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));