[ISSUE #6624]Support mark() & reset() for TieredFileSegmentInputStream (#6625)
* test(tieredstorage): add UT to verify TieredFileSegmentInputStream
1. add UT to verify TieredFileSegmentInputStream
* refactor(tieredstorage): refactor TieredFileSegmentInputStream
1. refactor TieredFileSegmentInputStream
* feat(tieredstorage): support mark&reset TieredFileSegmentInputStream
1. support mark&reset TieredFileSegmentInputStream
* style(tieredstorage): remove commended code in TieredFileSegmentInputStreamTest
1. remove commended code in TieredFileSegmentInputStreamTest
* refactor(tieredstorage): better code placement
1. better code placement
* refactor(tieredstorage): refactor TieredFileSegmentInputStream for better understandability
1. refactor TieredFileSegmentInputStream for better understandability
* refactor(tieredstorage): refactor some code in TieredFileSegmentInputStream
1. refactor some code in TieredFileSegmentInputStream
* refactor(tieredstorage): refactor TieredFileSegmentInputStream
1. refactor TieredFileSegmentInputStream
2. add a
TieredFileSegmentInputStream.Factory to build instance
* refactor(tieredstorage): refactor TieredFileSegmentInputStream related directory structure
1. refactor TieredFileSegmentInputStream related directory structure
* refactor(tieredstorage): delete `commitLogOffsetBuffer` in TieredCommitLogInputStream
1. delete `commitLogOffsetBuffer` in TieredCommitLogInputStream
* perf(tieredstorage): benchmark TieredFileSegmentInputStream pef
1. benchmark TieredFileSegmentInputStream pef
Closes https://github.com/apache/rocketmq/issues/6624
* feat(tieredstorage): optimized `read(byte[], int, int)` for TieredFIleSegmentInputStream
1. optimized `read(byte[], int, int)` for TieredFIleSegmentInputStream
Closes https://github.com/apache/rocketmq/issues/6624
* fix(tieredstorage): fix a dead cycle in TieredFileSegmentInputStream
1. fix a dead cycle in TieredFileSegmentInputStream.java
2. remove unused JMH
related dependency
Closes https://github.com/apache/rocketmq/issues/6624
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
index 2712e84..274f03e 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.tieredstore.provider;
import com.google.common.base.Stopwatch;
-import java.io.InputStream;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -35,6 +35,8 @@
import org.apache.rocketmq.tieredstore.container.TieredIndexFile;
import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
@@ -337,7 +339,7 @@
if (bufferSize == 0) {
return CompletableFuture.completedFuture(true);
}
- TieredFileSegmentInputStream inputStream = new TieredFileSegmentInputStream(fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize);
+ TieredFileSegmentInputStream inputStream = TieredFileSegmentInputStreamFactory.build(fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize);
int finalBufferSize = bufferSize;
try {
inflightCommitRequest = commit0(inputStream, commitPosition, bufferSize, fileType != FileSegmentType.INDEX)
@@ -425,107 +427,4 @@
}
}
- public static class TieredFileSegmentInputStream extends InputStream {
-
- private final FileSegmentType fileType;
- private final List<ByteBuffer> uploadBufferList;
- private int bufferReadIndex = 0;
- private int readOffset = 0;
- // only used in commitLog
- private long commitLogOffset;
- private final ByteBuffer commitLogOffsetBuffer = ByteBuffer.allocate(8);
- private final ByteBuffer codaBuffer;
- private ByteBuffer curBuffer;
- private final int contentLength;
- private int readBytes = 0;
-
- public TieredFileSegmentInputStream(FileSegmentType fileType, long startOffset,
- List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int contentLength) {
- this.fileType = fileType;
- this.commitLogOffset = startOffset;
- this.commitLogOffsetBuffer.putLong(0, startOffset);
- this.uploadBufferList = uploadBufferList;
- this.codaBuffer = codaBuffer;
- this.contentLength = contentLength;
- if (uploadBufferList.size() > 0) {
- this.curBuffer = uploadBufferList.get(0);
- }
- if (fileType == FileSegmentType.INDEX && uploadBufferList.size() != 1) {
- logger.error("[Bug]TieredFileSegmentInputStream: index file must have only one buffer");
- }
- }
-
- public List<ByteBuffer> getUploadBufferList() {
- return uploadBufferList;
- }
-
- public ByteBuffer getCodaBuffer() {
- return codaBuffer;
- }
-
- @Override
- public int available() {
- return contentLength - readBytes;
- }
-
- @Override
- public int read() {
- if (bufferReadIndex >= uploadBufferList.size()) {
- return readCoda();
- }
-
- int res;
- switch (fileType) {
- case COMMIT_LOG:
- if (readOffset >= curBuffer.remaining()) {
- bufferReadIndex++;
- if (bufferReadIndex >= uploadBufferList.size()) {
- return readCoda();
- }
- curBuffer = uploadBufferList.get(bufferReadIndex);
- commitLogOffset += readOffset;
- commitLogOffsetBuffer.putLong(0, commitLogOffset);
- readOffset = 0;
- }
- if (readOffset >= MessageBufferUtil.PHYSICAL_OFFSET_POSITION && readOffset < MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
- res = commitLogOffsetBuffer.get(readOffset - MessageBufferUtil.PHYSICAL_OFFSET_POSITION) & 0xff;
- readOffset++;
- } else {
- res = curBuffer.get(readOffset++) & 0xff;
- }
- break;
- case CONSUME_QUEUE:
- if (!curBuffer.hasRemaining()) {
- bufferReadIndex++;
- if (bufferReadIndex >= uploadBufferList.size()) {
- return -1;
- }
- curBuffer = uploadBufferList.get(bufferReadIndex);
- }
- res = curBuffer.get() & 0xff;
- break;
- case INDEX:
- if (!curBuffer.hasRemaining()) {
- return -1;
- }
- res = curBuffer.get() & 0xff;
- break;
- default:
- throw new IllegalStateException("unknown file type");
- }
- readBytes++;
- return res;
- }
-
- private int readCoda() {
- if (fileType != FileSegmentType.COMMIT_LOG || codaBuffer == null) {
- return -1;
- }
- if (!codaBuffer.hasRemaining()) {
- return -1;
- }
- readBytes++;
- return codaBuffer.get() & 0xff;
- }
- }
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
index 081143c..f043e07 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.tieredstore.provider;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
@@ -69,6 +71,6 @@
* @param append try to append or create a new file
* @return put result, <code>true</code> if data successfully write; <code>false</code> otherwise
*/
- CompletableFuture<Boolean> commit0(TieredFileSegment.TieredFileSegmentInputStream inputStream,
- long position, int length, boolean append);
+ CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream,
+ long position, int length, boolean append);
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
new file mode 100644
index 0000000..f5be381
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.inputstream;
+
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class TieredCommitLogInputStream extends TieredFileSegmentInputStream {
+
+ /**
+ * commitLogOffset is the real physical offset of the commitLog buffer which is being read
+ */
+ private long commitLogOffset;
+
+ private final ByteBuffer codaBuffer;
+
+ private long markCommitLogOffset = -1;
+
+ public TieredCommitLogInputStream(TieredFileSegment.FileSegmentType fileType, long startOffset,
+ List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int contentLength) {
+ super(fileType, uploadBufferList, contentLength);
+ this.commitLogOffset = startOffset;
+ this.codaBuffer = codaBuffer;
+ }
+
+ @Override
+ public synchronized void mark(int ignore) {
+ super.mark(ignore);
+ this.markCommitLogOffset = commitLogOffset;
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ super.reset();
+ this.commitLogOffset = markCommitLogOffset;
+ }
+
+ @Override
+ public ByteBuffer getCodaBuffer() {
+ return this.codaBuffer;
+ }
+
+ @Override
+ public int read() {
+ if (available() <= 0) {
+ return -1;
+ }
+ readPosition++;
+ if (curReadBufferIndex >= uploadBufferList.size()) {
+ return readCoda();
+ }
+ int res;
+ if (readPosInCurBuffer >= curBuffer.remaining()) {
+ curReadBufferIndex++;
+ if (curReadBufferIndex >= uploadBufferList.size()) {
+ readPosInCurBuffer = 0;
+ return readCoda();
+ }
+ curBuffer = uploadBufferList.get(curReadBufferIndex);
+ commitLogOffset += readPosInCurBuffer;
+ readPosInCurBuffer = 0;
+ }
+ if (readPosInCurBuffer >= MessageBufferUtil.PHYSICAL_OFFSET_POSITION && readPosInCurBuffer < MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
+ res = (int) ((commitLogOffset >> (8 * (MessageBufferUtil.SYS_FLAG_OFFSET_POSITION - readPosInCurBuffer - 1))) & 0xff);
+ readPosInCurBuffer++;
+ } else {
+ res = curBuffer.get(readPosInCurBuffer++) & 0xff;
+ }
+ return res;
+ }
+
+ private int readCoda() {
+ if (codaBuffer == null || readPosInCurBuffer >= codaBuffer.remaining()) {
+ return -1;
+ }
+ return codaBuffer.get(readPosInCurBuffer++) & 0xff;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException("off < 0 || len < 0 || len > b.length - off");
+ }
+ if (readPosition >= contentLength) {
+ return -1;
+ }
+
+ int available = available();
+ if (len > available) {
+ len = available;
+ }
+ if (len <= 0) {
+ return 0;
+ }
+ int needRead = len;
+ int pos = readPosition;
+ int bufIndex = curReadBufferIndex;
+ int posInCurBuffer = readPosInCurBuffer;
+ long curCommitLogOffset = commitLogOffset;
+ ByteBuffer curBuf = curBuffer;
+ while (needRead > 0 && bufIndex <= uploadBufferList.size()) {
+ int readLen, remaining, realReadLen = 0;
+ if (bufIndex == uploadBufferList.size()) {
+ // read from coda buffer
+ remaining = codaBuffer.remaining() - posInCurBuffer;
+ readLen = remaining < needRead ? remaining : needRead;
+ codaBuffer.position(posInCurBuffer);
+ codaBuffer.get(b, off, readLen);
+ codaBuffer.position(0);
+ // update flags
+ off += readLen;
+ needRead -= readLen;
+ pos += readLen;
+ posInCurBuffer += readLen;
+ continue;
+ }
+ remaining = curBuf.remaining() - posInCurBuffer;
+ readLen = remaining < needRead ? remaining : needRead;
+ curBuf = uploadBufferList.get(bufIndex);
+ if (posInCurBuffer < MessageBufferUtil.PHYSICAL_OFFSET_POSITION) {
+ realReadLen = MessageBufferUtil.PHYSICAL_OFFSET_POSITION - posInCurBuffer < readLen ?
+ MessageBufferUtil.PHYSICAL_OFFSET_POSITION - posInCurBuffer : readLen;
+ // read from commitLog buffer
+ curBuf.position(posInCurBuffer);
+ curBuf.get(b, off, realReadLen);
+ curBuf.position(0);
+ } else if (posInCurBuffer < MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
+ realReadLen = MessageBufferUtil.SYS_FLAG_OFFSET_POSITION - posInCurBuffer < readLen ?
+ MessageBufferUtil.SYS_FLAG_OFFSET_POSITION - posInCurBuffer : readLen;
+ // read from converted PHYSICAL_OFFSET_POSITION
+ byte[] physicalOffsetBytes = new byte[realReadLen];
+ for (int i = 0; i < realReadLen; i++) {
+ physicalOffsetBytes[i] = (byte) ((curCommitLogOffset >> (8 * (MessageBufferUtil.SYS_FLAG_OFFSET_POSITION - posInCurBuffer - i - 1))) & 0xff);
+ }
+ System.arraycopy(physicalOffsetBytes, 0, b, off, realReadLen);
+ } else if (posInCurBuffer >= MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
+ realReadLen = readLen;
+ // read from commitLog buffer
+ curBuf.position(posInCurBuffer);
+ curBuf.get(b, off, readLen);
+ curBuf.position(0);
+ }
+ // update flags
+ off += realReadLen;
+ needRead -= realReadLen;
+ pos += realReadLen;
+ posInCurBuffer += realReadLen;
+ if (posInCurBuffer == curBuffer.remaining()) {
+ // read from next buf
+ bufIndex++;
+ curCommitLogOffset += posInCurBuffer;
+ posInCurBuffer = 0;
+ }
+ }
+ readPosition = pos;
+ curReadBufferIndex = bufIndex;
+ readPosInCurBuffer = posInCurBuffer;
+ commitLogOffset = curCommitLogOffset;
+ curBuffer = curBuf;
+ return len;
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
new file mode 100644
index 0000000..d5118c1
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.inputstream;
+
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class TieredFileSegmentInputStream extends InputStream {
+
+ private final TieredFileSegment.FileSegmentType fileType;
+ protected final List<ByteBuffer> uploadBufferList;
+ protected final int contentLength;
+
+ /**
+ * readPosition is the now position in the stream
+ */
+ protected int readPosition = 0;
+
+ /**
+ * curReadBufferIndex is the index of the buffer in uploadBufferList which is being read
+ */
+ protected int curReadBufferIndex = 0;
+ /**
+ * readPosInCurBuffer is the position in the buffer which is being read
+ */
+ protected int readPosInCurBuffer = 0;
+
+ /**
+ * curBuffer is the buffer which is being read, it is the same as uploadBufferList.get(curReadBufferIndex)
+ */
+ protected ByteBuffer curBuffer;
+
+ private int markReadPosition = -1;
+
+ private int markCurReadBufferIndex = -1;
+
+ private int markReadPosInCurBuffer = -1;
+
+ public TieredFileSegmentInputStream(TieredFileSegment.FileSegmentType fileType, List<ByteBuffer> uploadBufferList,
+ int contentLength) {
+ this.fileType = fileType;
+ this.contentLength = contentLength;
+ this.uploadBufferList = uploadBufferList;
+ if (uploadBufferList.size() > 0) {
+ this.curBuffer = uploadBufferList.get(curReadBufferIndex);
+ }
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ @Override
+ public synchronized void mark(int ignore) {
+ this.markReadPosition = readPosition;
+ this.markCurReadBufferIndex = curReadBufferIndex;
+ this.markReadPosInCurBuffer = readPosInCurBuffer;
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ if (this.markReadPosition == -1) {
+ throw new IOException("mark not set");
+ }
+ this.readPosition = markReadPosition;
+ this.curReadBufferIndex = markCurReadBufferIndex;
+ this.readPosInCurBuffer = markReadPosInCurBuffer;
+ if (this.curReadBufferIndex < uploadBufferList.size()) {
+ this.curBuffer = uploadBufferList.get(curReadBufferIndex);
+ }
+ }
+
+ @Override
+ public int available() {
+ return contentLength - readPosition;
+ }
+
+ public List<ByteBuffer> getUploadBufferList() {
+ return uploadBufferList;
+ }
+
+ public ByteBuffer getCodaBuffer() {
+ return null;
+ }
+
+ @Override
+ public int read() {
+ if (available() <= 0) {
+ return -1;
+ }
+ readPosition++;
+ if (readPosInCurBuffer >= curBuffer.remaining()) {
+ curReadBufferIndex++;
+ if (curReadBufferIndex >= uploadBufferList.size()) {
+ return -1;
+ }
+ curBuffer = uploadBufferList.get(curReadBufferIndex);
+ readPosInCurBuffer = 0;
+ }
+ return curBuffer.get(readPosInCurBuffer++) & 0xff;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException("off < 0 || len < 0 || len > b.length - off");
+ }
+ if (readPosition >= contentLength) {
+ return -1;
+ }
+
+ int available = available();
+ if (len > available) {
+ len = available;
+ }
+ if (len <= 0) {
+ return 0;
+ }
+ int needRead = len;
+ int pos = readPosition;
+ int bufIndex = curReadBufferIndex;
+ int posInCurBuffer = readPosInCurBuffer;
+ ByteBuffer curBuf = curBuffer;
+ while (needRead > 0 && bufIndex < uploadBufferList.size()) {
+ curBuf = uploadBufferList.get(bufIndex);
+ int remaining = curBuf.remaining() - posInCurBuffer;
+ int readLen = remaining < needRead ? remaining : needRead;
+ // read from curBuf
+ curBuf.position(posInCurBuffer);
+ curBuf.get(b, off, readLen);
+ curBuf.position(0);
+ // update flags
+ off += readLen;
+ needRead -= readLen;
+ pos += readLen;
+ posInCurBuffer += readLen;
+ if (posInCurBuffer == curBuffer.remaining()) {
+ // read from next buf
+ bufIndex++;
+ posInCurBuffer = 0;
+ }
+ }
+ readPosition = pos;
+ curReadBufferIndex = bufIndex;
+ readPosInCurBuffer = posInCurBuffer;
+ curBuffer = curBuf;
+ return len;
+ }
+}
+
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java
new file mode 100644
index 0000000..e6f7749
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.inputstream;
+
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class TieredFileSegmentInputStreamFactory {
+
+ public static TieredFileSegmentInputStream build(TieredFileSegment.FileSegmentType fileType,
+ long startOffset,
+ List<ByteBuffer> uploadBufferList,
+ ByteBuffer codaBuffer,
+ int contentLength) {
+ if (fileType == TieredFileSegment.FileSegmentType.COMMIT_LOG) {
+ return new TieredCommitLogInputStream(fileType, startOffset, uploadBufferList, codaBuffer, contentLength);
+ } else if (fileType == TieredFileSegment.FileSegmentType.CONSUME_QUEUE) {
+ return new TieredFileSegmentInputStream(fileType, uploadBufferList, contentLength);
+ } else if (fileType == TieredFileSegment.FileSegmentType.INDEX) {
+ if (uploadBufferList.size() != 1) {
+ throw new IllegalArgumentException("uploadBufferList size in INDEX type input stream must be 1");
+ }
+ return new TieredFileSegmentInputStream(fileType, uploadBufferList, contentLength);
+ } else {
+ throw new IllegalArgumentException("fileType is not supported");
+ }
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
index 9d9620f..7032799 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
@@ -35,6 +35,7 @@
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE;
@@ -182,7 +183,7 @@
@Override
public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length,
- boolean append) {
+ boolean append) {
Stopwatch stopwatch = Stopwatch.createStarted();
AttributesBuilder attributesBuilder = newAttributesBuilder()
.put(LABEL_OPERATION, OPERATION_POSIX_WRITE);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
index 860b172..b5c4e9d 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
@@ -84,7 +84,7 @@
DefaultMessageStore defaultMessageStore = Mockito.mock(DefaultMessageStore.class);
TieredDispatcher dispatcher = new TieredDispatcher(defaultMessageStore, storeConfig);
- SelectMappedBufferResult mockResult = new SelectMappedBufferResult(0, MessageBufferUtilTest.buildMessageBuffer(), MessageBufferUtilTest.MSG_LEN, null);
+ SelectMappedBufferResult mockResult = new SelectMappedBufferResult(0, MessageBufferUtilTest.buildMockedMessageBuffer(), MessageBufferUtilTest.MSG_LEN, null);
Mockito.when(defaultMessageStore.selectOneMessageByOffset(7, MessageBufferUtilTest.MSG_LEN)).thenReturn(mockResult);
DispatchRequest request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), 6, 7, MessageBufferUtilTest.MSG_LEN, 1);
dispatcher.dispatch(request);
@@ -98,13 +98,13 @@
dispatcher.buildCQAndIndexFile();
Assert.assertEquals(7, container.getConsumeQueueMaxOffset());
- ByteBuffer buffer1 = MessageBufferUtilTest.buildMessageBuffer();
+ ByteBuffer buffer1 = MessageBufferUtilTest.buildMockedMessageBuffer();
buffer1.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 7);
container.appendCommitLog(buffer1);
- ByteBuffer buffer2 = MessageBufferUtilTest.buildMessageBuffer();
+ ByteBuffer buffer2 = MessageBufferUtilTest.buildMockedMessageBuffer();
buffer2.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 8);
container.appendCommitLog(buffer2);
- ByteBuffer buffer3 = MessageBufferUtilTest.buildMessageBuffer();
+ ByteBuffer buffer3 = MessageBufferUtilTest.buildMockedMessageBuffer();
buffer3.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 9);
container.appendCommitLog(buffer3);
container.commitCommitLog();
@@ -152,10 +152,10 @@
Mockito.when(((ConsumeQueue) defaultStore.getConsumeQueue(mq.getTopic(), mq.getQueueId())).getIndexBuffer(7)).thenReturn(mockResult);
- mockResult = new SelectMappedBufferResult(0, MessageBufferUtilTest.buildMessageBuffer(), MessageBufferUtilTest.MSG_LEN, null);
+ mockResult = new SelectMappedBufferResult(0, MessageBufferUtilTest.buildMockedMessageBuffer(), MessageBufferUtilTest.MSG_LEN, null);
Mockito.when(defaultStore.selectOneMessageByOffset(7, MessageBufferUtilTest.MSG_LEN)).thenReturn(mockResult);
- ByteBuffer msg = MessageBufferUtilTest.buildMessageBuffer();
+ ByteBuffer msg = MessageBufferUtilTest.buildMockedMessageBuffer();
msg.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 7);
mockResult = new SelectMappedBufferResult(0, msg, MessageBufferUtilTest.MSG_LEN, null);
Mockito.when(defaultStore.selectOneMessageByOffset(8, MessageBufferUtilTest.MSG_LEN)).thenReturn(mockResult);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index d0a3e3f..ddcc9fa 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -89,13 +89,13 @@
getMessageResult = fetcher.getMessageAsync("group", mq.getTopic(), mq.getQueueId(), 0, 32, null).join();
Assert.assertEquals(GetMessageStatus.NO_MESSAGE_IN_QUEUE, getMessageResult.getStatus());
- ByteBuffer msg1 = MessageBufferUtilTest.buildMessageBuffer();
+ ByteBuffer msg1 = MessageBufferUtilTest.buildMockedMessageBuffer();
msg1.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 0);
msg1.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, 0);
AppendResult result = container.appendCommitLog(msg1);
Assert.assertEquals(AppendResult.SUCCESS, result);
- ByteBuffer msg2 = MessageBufferUtilTest.buildMessageBuffer();
+ ByteBuffer msg2 = MessageBufferUtilTest.buildMockedMessageBuffer();
msg2.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 1);
msg2.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtilTest.MSG_LEN);
container.appendCommitLog(msg2);
@@ -199,7 +199,7 @@
TieredMessageQueueContainer container = TieredContainerManager.getInstance(storeConfig).getOrCreateMQContainer(mq);
container.initOffset(0);
- ByteBuffer msg1 = MessageBufferUtilTest.buildMessageBuffer();
+ ByteBuffer msg1 = MessageBufferUtilTest.buildMockedMessageBuffer();
msg1.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 0);
msg1.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, 0);
long currentTimeMillis1 = System.currentTimeMillis();
@@ -207,7 +207,7 @@
AppendResult result = container.appendCommitLog(msg1);
Assert.assertEquals(AppendResult.SUCCESS, result);
- ByteBuffer msg2 = MessageBufferUtilTest.buildMessageBuffer();
+ ByteBuffer msg2 = MessageBufferUtilTest.buildMockedMessageBuffer();
msg2.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 1);
msg2.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtilTest.MSG_LEN);
long currentTimeMillis2 = System.currentTimeMillis();
@@ -244,7 +244,7 @@
long timestamp = System.currentTimeMillis();
- ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+ ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 50);
buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp);
container.initOffset(50);
@@ -266,13 +266,13 @@
Assert.assertEquals(0, fetcher.queryMessageAsync(mq.getTopic(), "key", 32, 0, Long.MAX_VALUE).join().getMessageMapedList().size());
container.initOffset(0);
- ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+ ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 0);
container.appendCommitLog(buffer);
- buffer = MessageBufferUtilTest.buildMessageBuffer();
+ buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 1);
container.appendCommitLog(buffer);
- buffer = MessageBufferUtilTest.buildMessageBuffer();
+ buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 2);
container.appendCommitLog(buffer);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java
index 11afa36..ccfe18b 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java
@@ -68,7 +68,7 @@
@Test
public void testAppendCommitLog() throws ClassNotFoundException, NoSuchMethodException {
TieredMessageQueueContainer container = new TieredMessageQueueContainer(mq, storeConfig);
- ByteBuffer message = MessageBufferUtilTest.buildMessageBuffer();
+ ByteBuffer message = MessageBufferUtilTest.buildMockedMessageBuffer();
AppendResult result = container.appendCommitLog(message);
Assert.assertEquals(AppendResult.OFFSET_INCORRECT, result);
@@ -143,27 +143,27 @@
TieredMessageQueueContainer container = new TieredMessageQueueContainer(mq, storeConfig);
container.initOffset(50);
long timestamp1 = System.currentTimeMillis();
- ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+ ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 50);
buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp1);
container.appendCommitLog(buffer, true);
long timestamp2 = timestamp1 + 100;
- buffer = MessageBufferUtilTest.buildMessageBuffer();
+ buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 51);
buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp2);
container.appendCommitLog(buffer, true);
- buffer = MessageBufferUtilTest.buildMessageBuffer();
+ buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 52);
buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp2);
container.appendCommitLog(buffer, true);
- buffer = MessageBufferUtilTest.buildMessageBuffer();
+ buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 53);
buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp2);
container.appendCommitLog(buffer, true);
long timestamp3 = timestamp2 + 100;
- buffer = MessageBufferUtilTest.buildMessageBuffer();
+ buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 54);
buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp3);
container.appendCommitLog(buffer, true);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
index 254b151..3c47d1c 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
@@ -23,6 +23,7 @@
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
import org.junit.Assert;
public class MemoryFileSegment extends TieredFileSegment {
@@ -81,7 +82,7 @@
@Override
public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length,
- boolean append) {
+ boolean append) {
try {
if (blocker != null && !blocker.get()) {
throw new IllegalStateException();
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java
index f7e5488..741a38c 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java
@@ -20,6 +20,7 @@
import java.util.concurrent.ExecutionException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
import org.junit.Assert;
public class MemoryFileSegmentWithoutCheck extends MemoryFileSegment {
@@ -37,7 +38,7 @@
@Override
public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length,
- boolean append) {
+ boolean append) {
try {
if (blocker != null && !blocker.get()) {
throw new IllegalStateException();
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
new file mode 100644
index 0000000..3d9fdba
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider;
+
+import com.google.common.base.Supplier;
+import org.apache.rocketmq.tieredstore.container.TieredCommitLog;
+import org.apache.rocketmq.tieredstore.container.TieredConsumeQueue;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory;
+import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
+import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+public class TieredFileSegmentInputStreamTest {
+
+ private final static long COMMIT_LOG_START_OFFSET = 13131313;
+
+ private final static int MSG_LEN = MessageBufferUtilTest.MSG_LEN;
+
+ private final static int MSG_NUM = 10;
+
+ private final static int RESET_TIMES = 10;
+
+ private final static Random RANDOM = new Random();
+
+ @Test
+ public void testCommitLogTypeInputStream() {
+ List<ByteBuffer> uploadBufferList = new ArrayList<>();
+ int bufferSize = 0;
+ for (int i = 0; i < MSG_NUM; i++) {
+ ByteBuffer byteBuffer = MessageBufferUtilTest.buildMockedMessageBuffer();
+ uploadBufferList.add(byteBuffer);
+ bufferSize += byteBuffer.remaining();
+ }
+
+ // build expected byte buffer for verifying the TieredFileSegmentInputStream
+ ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
+ for (ByteBuffer byteBuffer : uploadBufferList) {
+ expectedByteBuffer.put(byteBuffer);
+ byteBuffer.rewind();
+ }
+ // set real physical offset
+ for (int i = 0; i < MSG_NUM; i++) {
+ long physicalOffset = COMMIT_LOG_START_OFFSET + i * MSG_LEN;
+ int position = i * MSG_LEN + MessageBufferUtil.PHYSICAL_OFFSET_POSITION;
+ expectedByteBuffer.putLong(position, physicalOffset);
+ }
+
+ int finalBufferSize = bufferSize;
+ int[] batchReadSizeTestSet = {
+ MessageBufferUtil.PHYSICAL_OFFSET_POSITION - 1, MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtil.PHYSICAL_OFFSET_POSITION + 1, MSG_LEN - 1, MSG_LEN, MSG_LEN + 1
+ };
+ verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build(
+ TieredFileSegment.FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET, uploadBufferList, null, finalBufferSize), finalBufferSize, batchReadSizeTestSet);
+
+ }
+
+ @Test
+ public void testCommitLogTypeInputStreamWithCoda() {
+ List<ByteBuffer> uploadBufferList = new ArrayList<>();
+ int bufferSize = 0;
+ for (int i = 0; i < MSG_NUM; i++) {
+ ByteBuffer byteBuffer = MessageBufferUtilTest.buildMockedMessageBuffer();
+ uploadBufferList.add(byteBuffer);
+ bufferSize += byteBuffer.remaining();
+ }
+
+ ByteBuffer codaBuffer = ByteBuffer.allocate(TieredCommitLog.CODA_SIZE);
+ codaBuffer.putInt(TieredCommitLog.CODA_SIZE);
+ codaBuffer.putInt(TieredCommitLog.BLANK_MAGIC_CODE);
+ long timeMillis = System.currentTimeMillis();
+ codaBuffer.putLong(timeMillis);
+ codaBuffer.flip();
+ int codaBufferSize = codaBuffer.remaining();
+ bufferSize += codaBufferSize;
+
+ // build expected byte buffer for verifying the TieredFileSegmentInputStream
+ ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
+ for (ByteBuffer byteBuffer : uploadBufferList) {
+ expectedByteBuffer.put(byteBuffer);
+ byteBuffer.rewind();
+ }
+ expectedByteBuffer.put(codaBuffer);
+ codaBuffer.rewind();
+ // set real physical offset
+ for (int i = 0; i < MSG_NUM; i++) {
+ long physicalOffset = COMMIT_LOG_START_OFFSET + i * MSG_LEN;
+ int position = i * MSG_LEN + MessageBufferUtil.PHYSICAL_OFFSET_POSITION;
+ expectedByteBuffer.putLong(position, physicalOffset);
+ }
+
+ int finalBufferSize = bufferSize;
+ int[] batchReadSizeTestSet = {
+ MessageBufferUtil.PHYSICAL_OFFSET_POSITION - 1, MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtil.PHYSICAL_OFFSET_POSITION + 1,
+ MSG_LEN - 1, MSG_LEN, MSG_LEN + 1,
+ bufferSize - 1, bufferSize, bufferSize + 1
+ };
+ verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build(
+ TieredFileSegment.FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET, uploadBufferList, codaBuffer, finalBufferSize), finalBufferSize, batchReadSizeTestSet);
+
+ }
+
+ @Test
+ public void testConsumeQueueTypeInputStream() {
+ List<ByteBuffer> uploadBufferList = new ArrayList<>();
+ int bufferSize = 0;
+ for (int i = 0; i < MSG_NUM; i++) {
+ ByteBuffer byteBuffer = MessageBufferUtilTest.buildMockedConsumeQueueBuffer();
+ uploadBufferList.add(byteBuffer);
+ bufferSize += byteBuffer.remaining();
+ }
+
+ // build expected byte buffer for verifying the TieredFileSegmentInputStream
+ ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
+ for (ByteBuffer byteBuffer : uploadBufferList) {
+ expectedByteBuffer.put(byteBuffer);
+ byteBuffer.rewind();
+ }
+
+ int finalBufferSize = bufferSize;
+ int[] batchReadSizeTestSet = {TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE - 1, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE + 1};
+ verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build(
+ TieredFileSegment.FileSegmentType.CONSUME_QUEUE, COMMIT_LOG_START_OFFSET, uploadBufferList, null, finalBufferSize), bufferSize, batchReadSizeTestSet);
+
+ }
+
+ @Test
+ public void testIndexTypeInputStream() {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(24);
+ byteBuffer.putLong(1);
+ byteBuffer.putLong(2);
+ byteBuffer.putLong(3);
+ byteBuffer.flip();
+ List<ByteBuffer> uploadBufferList = Arrays.asList(byteBuffer);
+
+ // build expected byte buffer for verifying the TieredFileSegmentInputStream
+ ByteBuffer expectedByteBuffer = byteBuffer.slice();
+
+ verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build(
+ TieredFileSegment.FileSegmentType.INDEX, COMMIT_LOG_START_OFFSET, uploadBufferList, null, byteBuffer.limit()), byteBuffer.limit(), new int[] {23, 24, 25});
+ }
+
+ private void verifyReadAndReset(ByteBuffer expectedByteBuffer, Supplier<TieredFileSegmentInputStream> constructor,
+ int bufferSize, int[] readBatchSizeTestSet) {
+ TieredFileSegmentInputStream inputStream = constructor.get();
+
+ // verify
+ verifyInputStream(inputStream, expectedByteBuffer);
+
+ // verify reset with method InputStream#mark() hasn't been called
+ try {
+ inputStream.reset();
+ Assert.fail("Should throw IOException");
+ } catch (IOException e) {
+ Assert.assertTrue(e instanceof IOException);
+ }
+
+ // verify reset with method InputStream#mark() has been called
+ int resetPosition = RANDOM.nextInt(bufferSize);
+ int expectedResetPosition = 0;
+ inputStream = constructor.get();
+ // verify and mark with resetPosition, use read() to read a byte each time
+ for (int i = 0; i < RESET_TIMES; i++) {
+ verifyInputStream(inputStream, expectedByteBuffer, expectedResetPosition, resetPosition);
+
+ try {
+ inputStream.reset();
+ } catch (IOException e) {
+ Assert.fail("Should not throw IOException");
+ }
+
+ expectedResetPosition = resetPosition;
+ resetPosition += RANDOM.nextInt(bufferSize - resetPosition);
+ }
+ for (int i = 0; i < readBatchSizeTestSet.length; i++) {
+ inputStream = constructor.get();
+ int readBatchSize = readBatchSizeTestSet[i];
+ expectedResetPosition = 0;
+ resetPosition = readBatchSize * RANDOM.nextInt(1 + bufferSize / readBatchSize);
+ // verify and mark with resetPosition, use read(byte[]) to read a byte array each time
+ for (int j = 0; j < RESET_TIMES; j++) {
+ verifyInputStreamViaBatchRead(inputStream, expectedByteBuffer, expectedResetPosition, resetPosition, readBatchSize);
+ try {
+ inputStream.reset();
+ } catch (IOException e) {
+ Assert.fail("Should not throw IOException");
+ }
+
+ expectedResetPosition = resetPosition;
+ resetPosition += readBatchSize * RANDOM.nextInt(1 + (bufferSize - resetPosition) / readBatchSize);
+ }
+ }
+ }
+
+ private void verifyInputStream(InputStream inputStream, ByteBuffer expectedBuffer) {
+ verifyInputStream(inputStream, expectedBuffer, 0, -1);
+ }
+
+ /**
+ * verify the input stream
+ *
+ * @param inputStream the input stream to be verified
+ * @param expectedBuffer the expected byte buffer
+ * @param expectedBufferReadPos the expected start position of the expected byte buffer
+ * @param expectedMarkCalledPos the expected position when the method InputStream#mark() is called. <i>(-1 means ignored)</i>
+ */
+ private void verifyInputStream(InputStream inputStream, ByteBuffer expectedBuffer, int expectedBufferReadPos,
+ int expectedMarkCalledPos) {
+ try {
+ expectedBuffer.position(expectedBufferReadPos);
+ while (true) {
+ if (expectedMarkCalledPos == expectedBuffer.position()) {
+ inputStream.mark(0);
+ }
+ int b = inputStream.read();
+ if (b == -1)
+ break;
+ Assert.assertEquals(expectedBuffer.get(), (byte) b);
+ }
+ Assert.assertFalse(expectedBuffer.hasRemaining());
+ } catch (IOException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ /**
+ * verify the input stream
+ *
+ * @param inputStream the input stream to be verified
+ * @param expectedBuffer the expected byte buffer
+ * @param expectedBufferReadPos the expected start position of the expected byte buffer
+ * @param expectedMarkCalledPos the expected position when the method InputStream#mark() is called. <i>(-1 means ignored)</i>
+ * @param readBatchSize the batch size of each read(byte[]) operation
+ */
+ private void verifyInputStreamViaBatchRead(InputStream inputStream, ByteBuffer expectedBuffer,
+ int expectedBufferReadPos, int expectedMarkCalledPos, int readBatchSize) {
+ try {
+ expectedBuffer.position(expectedBufferReadPos);
+ byte[] buf = new byte[readBatchSize];
+ while (true) {
+ if (expectedMarkCalledPos == expectedBuffer.position()) {
+ inputStream.mark(0);
+ }
+ int len = inputStream.read(buf, 0, readBatchSize);
+ if (len == -1)
+ break;
+ byte[] expected = new byte[len];
+ expectedBuffer.get(expected, 0, len);
+ for (int i = 0; i < len; i++) {
+ Assert.assertEquals(expected[i], buf[i]);
+ }
+ }
+ Assert.assertFalse(expectedBuffer.hasRemaining());
+ } catch (IOException e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
index f55f748..79b1883 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
@@ -41,11 +41,11 @@
TieredFileSegment segment = createFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG);
segment.initPosition(segment.getSize());
long lastSize = segment.getSize();
- segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
- segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
+ segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
+ segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
Assert.assertTrue(segment.needCommit());
- ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+ ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
long msg3StoreTime = System.currentTimeMillis();
buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, msg3StoreTime);
long queueOffset = baseOffset * 1000L;
@@ -117,8 +117,8 @@
long startTime = System.currentTimeMillis();
MemoryFileSegment segment = (MemoryFileSegment) createFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG);
long lastSize = segment.getSize();
- segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
- segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
+ segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
+ segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
segment.blocker = new CompletableFuture<>();
new Thread(() -> {
@@ -127,7 +127,7 @@
} catch (InterruptedException e) {
Assert.fail(e.getMessage());
}
- ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+ ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, startTime);
segment.append(buffer, 0);
segment.blocker.complete(false);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
index 268ea2d..befd401 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
@@ -50,7 +50,7 @@
+ 2 + 30 //properties
+ 0;
- public static ByteBuffer buildMessageBuffer() {
+ public static ByteBuffer buildMockedMessageBuffer() {
// Initialization of storage space
ByteBuffer buffer = ByteBuffer.allocate(MSG_LEN);
// 1 TOTALSIZE
@@ -99,23 +99,36 @@
return buffer;
}
+ public static ByteBuffer buildMockedConsumeQueueBuffer() {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ // 1 COMMIT_LOG_OFFSET
+ byteBuffer.putLong(1);
+ // 2 MESSAGE_SIZE
+ byteBuffer.putInt(2);
+ // 3 TAG_HASH_CODE
+ byteBuffer.putLong(3);
+ byteBuffer.flip();
+ return byteBuffer;
+ }
+
+
@Test
public void testGetTotalSize() {
- ByteBuffer buffer = buildMessageBuffer();
+ ByteBuffer buffer = buildMockedMessageBuffer();
int totalSize = MessageBufferUtil.getTotalSize(buffer);
Assert.assertEquals(MSG_LEN, totalSize);
}
@Test
public void testGetMagicCode() {
- ByteBuffer buffer = buildMessageBuffer();
+ ByteBuffer buffer = buildMockedMessageBuffer();
int magicCode = MessageBufferUtil.getMagicCode(buffer);
Assert.assertEquals(MessageDecoder.MESSAGE_MAGIC_CODE_V2, magicCode);
}
@Test
public void testSplitMessages() {
- ByteBuffer msgBuffer1 = buildMessageBuffer();
+ ByteBuffer msgBuffer1 = buildMockedMessageBuffer();
msgBuffer1.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 10);
ByteBuffer msgBuffer2 = ByteBuffer.allocate(TieredCommitLog.CODA_SIZE);
@@ -124,7 +137,7 @@
msgBuffer2.putLong(System.currentTimeMillis());
msgBuffer2.flip();
- ByteBuffer msgBuffer3 = buildMessageBuffer();
+ ByteBuffer msgBuffer3 = buildMockedMessageBuffer();
msgBuffer3.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 11);
ByteBuffer msgBuffer = ByteBuffer.allocate(msgBuffer1.remaining() + msgBuffer2.remaining() + msgBuffer3.remaining());
@@ -202,21 +215,21 @@
@Test
public void testGetQueueOffset() {
- ByteBuffer buffer = buildMessageBuffer();
+ ByteBuffer buffer = buildMockedMessageBuffer();
long queueOffset = MessageBufferUtil.getQueueOffset(buffer);
Assert.assertEquals(6, queueOffset);
}
@Test
public void testGetStoreTimeStamp() {
- ByteBuffer buffer = buildMessageBuffer();
+ ByteBuffer buffer = buildMockedMessageBuffer();
long storeTimeStamp = MessageBufferUtil.getStoreTimeStamp(buffer);
Assert.assertEquals(11, storeTimeStamp);
}
@Test
public void testGetOffsetId() {
- ByteBuffer buffer = buildMessageBuffer();
+ ByteBuffer buffer = buildMockedMessageBuffer();
InetSocketAddress inetSocketAddress = new InetSocketAddress("255.255.255.255", 65535);
ByteBuffer addr = ByteBuffer.allocate(Long.BYTES);
addr.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
@@ -232,7 +245,7 @@
@Test
public void testGetProperties() {
- ByteBuffer buffer = buildMessageBuffer();
+ ByteBuffer buffer = buildMockedMessageBuffer();
Map<String, String> properties = MessageBufferUtil.getProperties(buffer);
Assert.assertEquals(2, properties.size());
Assert.assertTrue(properties.containsKey(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));