blob: befd401ffe71aafb7ca28f7e85b176db5659b446 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tieredstore.util;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.tieredstore.container.TieredCommitLog;
import org.apache.rocketmq.tieredstore.container.TieredConsumeQueue;
import org.junit.Assert;
import org.junit.Test;
/**
 * Unit tests for {@link MessageBufferUtil}.
 *
 * <p>The class also exposes two static factories that build a mocked message buffer and a
 * mocked consume queue item, so the same fixtures can be obtained from a single place.
 */
public class MessageBufferUtilTest {

    /**
     * Total byte length of the buffer returned by {@link #buildMockedMessageBuffer()}.
     * The sum follows the field order written by that method; the body and the topic are
     * empty, and the encoded properties are accounted for as 30 bytes.
     */
    public static final int MSG_LEN =
        4           // TOTALSIZE
        + 4         // MAGICCODE
        + 4         // BODYCRC
        + 4         // QUEUEID
        + 4         // FLAG
        + 8         // QUEUEOFFSET
        + 8         // PHYSICALOFFSET
        + 4         // SYSFLAG
        + 8         // BORNTIMESTAMP
        + 8         // BORNHOST
        + 8         // STORETIMESTAMP
        + 8         // STOREHOSTADDRESS
        + 4         // RECONSUMETIMES
        + 8         // Prepared Transaction Offset
        + 4 + 0     // BODY: length prefix + empty body
        + 2 + 0     // TOPIC: length prefix + empty topic
        + 2 + 30    // properties: length prefix + encoded properties
        + 0;

    /**
     * Builds a message buffer whose fixed-width fields mostly carry their own ordinal as
     * value (body CRC = 3, queue id = 4, ...), followed by an empty body, an empty topic and
     * two properties.
     *
     * @return a flipped buffer holding exactly {@link #MSG_LEN} readable bytes
     */
    public static ByteBuffer buildMockedMessageBuffer() {
        ByteBuffer message = ByteBuffer.allocate(MSG_LEN);
        message.putInt(MSG_LEN)                                 // 1 TOTALSIZE
            .putInt(MessageDecoder.MESSAGE_MAGIC_CODE_V2)       // 2 MAGICCODE
            .putInt(3)                                          // 3 BODYCRC
            .putInt(4)                                          // 4 QUEUEID
            .putInt(5)                                          // 5 FLAG
            .putLong(6)                                         // 6 QUEUEOFFSET
            .putLong(7)                                         // 7 PHYSICALOFFSET
            .putInt(8)                                          // 8 SYSFLAG
            .putLong(9)                                         // 9 BORNTIMESTAMP
            .putLong(10)                                        // 10 BORNHOST
            .putLong(11)                                        // 11 STORETIMESTAMP
            .putLong(10)                                        // 12 STOREHOSTADDRESS
            .putInt(13)                                         // 13 RECONSUMETIMES
            .putLong(14)                                        // 14 Prepared Transaction Offset
            .putInt(0)                                          // 15 BODY (length only)
            .putShort((short) 0);                               // 16 TOPIC (length only)

        // 17 PROPERTIES: a short length prefix followed by the encoded key/value pairs.
        Map<String, String> propertyMap = new HashMap<>();
        propertyMap.put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "uk");
        propertyMap.put("userkey", "uservalue0");
        byte[] encodedProperties =
            MessageDecoder.messageProperties2String(propertyMap).getBytes(StandardCharsets.UTF_8);
        message.putShort((short) encodedProperties.length).put(encodedProperties);

        message.flip();
        // Guards the hard-coded properties length that is baked into MSG_LEN.
        Assert.assertEquals(MSG_LEN, message.remaining());
        return message;
    }

    /**
     * Builds a single consume queue item with commit log offset 1, message size 2 and
     * tag hash code 3.
     *
     * @return a flipped buffer of {@code TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE} capacity
     */
    public static ByteBuffer buildMockedConsumeQueueBuffer() {
        ByteBuffer item = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
        item.putLong(1)     // 1 COMMIT_LOG_OFFSET
            .putInt(2)      // 2 MESSAGE_SIZE
            .putLong(3);    // 3 TAG_HASH_CODE
        item.flip();
        return item;
    }

    /**
     * Creates one flipped consume queue item with the given commit log offset and message
     * size; the tag hash code is always written as 0.
     */
    private static ByteBuffer buildCqItem(long commitLogOffset, int messageSize) {
        ByteBuffer item = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
        item.putLong(commitLogOffset).putInt(messageSize).putLong(0);
        item.flip();
        return item;
    }

    /**
     * Copies the given items, in order, into a new buffer sized for {@code unitCapacity}
     * consume queue units and flips it. Every item is rewound afterwards so that it can be
     * copied again by a later scenario.
     */
    private static ByteBuffer concatCqItems(int unitCapacity, ByteBuffer... items) {
        ByteBuffer merged =
            ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * unitCapacity);
        for (ByteBuffer item : items) {
            merged.put(item);
            item.rewind();
        }
        merged.flip();
        return merged;
    }

    /** The first four bytes of the mocked message must be read back as the total size. */
    @Test
    public void testGetTotalSize() {
        int actualSize = MessageBufferUtil.getTotalSize(buildMockedMessageBuffer());
        Assert.assertEquals(MSG_LEN, actualSize);
    }

    /** The magic code written by the mock must be read back unchanged. */
    @Test
    public void testGetMagicCode() {
        int actualMagicCode = MessageBufferUtil.getMagicCode(buildMockedMessageBuffer());
        Assert.assertEquals(MessageDecoder.MESSAGE_MAGIC_CODE_V2, actualMagicCode);
    }

    /**
     * Exercises {@code splitMessageBuffer} against a message buffer laid out as
     * {@code [message][coda][message]} with four different consume queue inputs.
     */
    @Test
    public void testSplitMessages() {
        // Message buffer: first message, a blank coda record, then a second message.
        ByteBuffer leadingMessage = buildMockedMessageBuffer();
        leadingMessage.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 10);

        ByteBuffer coda = ByteBuffer.allocate(TieredCommitLog.CODA_SIZE);
        coda.putInt(TieredCommitLog.CODA_SIZE)
            .putInt(TieredCommitLog.BLANK_MAGIC_CODE)
            .putLong(System.currentTimeMillis());
        coda.flip();

        ByteBuffer trailingMessage = buildMockedMessageBuffer();
        trailingMessage.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 11);

        ByteBuffer msgBuffer = ByteBuffer.allocate(
            leadingMessage.remaining() + coda.remaining() + trailingMessage.remaining());
        msgBuffer.put(leadingMessage).put(coda).put(trailingMessage);
        msgBuffer.flip();

        // Consume queue items; 1000 is the commit log offset assigned to the first message.
        ByteBuffer leadingItem = buildCqItem(1000, MSG_LEN);
        ByteBuffer trailingItem = buildCqItem(1000 + TieredCommitLog.CODA_SIZE + MSG_LEN, MSG_LEN);
        ByteBuffer codaItem = buildCqItem(1000 + MSG_LEN, MSG_LEN);
        ByteBuffer undersizedItem =
            buildCqItem(1000 + TieredCommitLog.CODA_SIZE + MSG_LEN, MSG_LEN - 10);
        ByteBuffer oversizedItem =
            buildCqItem(1000 + TieredCommitLog.CODA_SIZE + MSG_LEN, MSG_LEN * 10);

        // Expected (offset within msgBuffer, size) pairs for the two real messages.
        Pair<Integer, Integer> leadingSlice = Pair.of(0, MSG_LEN);
        Pair<Integer, Integer> trailingSlice = Pair.of(MSG_LEN + TieredCommitLog.CODA_SIZE, MSG_LEN);

        // Scenario 1: both items point at real messages.
        List<Pair<Integer, Integer>> slices = MessageBufferUtil.splitMessageBuffer(
            concatCqItems(2, leadingItem, trailingItem), msgBuffer);
        Assert.assertEquals(2, slices.size());
        Assert.assertEquals(leadingSlice, slices.get(0));
        Assert.assertEquals(trailingSlice, slices.get(1));

        // Scenario 2: the second item declares a size smaller than the message's total size,
        // so only the first message is expected.
        slices = MessageBufferUtil.splitMessageBuffer(
            concatCqItems(2, leadingItem, undersizedItem), msgBuffer);
        Assert.assertEquals(1, slices.size());
        Assert.assertEquals(leadingSlice, slices.get(0));

        // Scenario 3: the second item points at the coda; the test expects the message located
        // right after the coda to be returned. The buffer has room for three units but only
        // two are written, and flip() limits it to the written part.
        slices = MessageBufferUtil.splitMessageBuffer(
            concatCqItems(3, leadingItem, codaItem), msgBuffer);
        Assert.assertEquals(2, slices.size());
        Assert.assertEquals(leadingSlice, slices.get(0));
        Assert.assertEquals(trailingSlice, slices.get(1));

        // Scenario 4: a single item whose declared size exceeds the message buffer yields nothing.
        slices = MessageBufferUtil.splitMessageBuffer(concatCqItems(1, oversizedItem), msgBuffer);
        Assert.assertEquals(0, slices.size());
    }

    /** The queue offset written by the mock (6) must be read back. */
    @Test
    public void testGetQueueOffset() {
        long actualQueueOffset = MessageBufferUtil.getQueueOffset(buildMockedMessageBuffer());
        Assert.assertEquals(6, actualQueueOffset);
    }

    /** The store timestamp written by the mock (11) must be read back. */
    @Test
    public void testGetStoreTimeStamp() {
        long actualStoreTimestamp = MessageBufferUtil.getStoreTimeStamp(buildMockedMessageBuffer());
        Assert.assertEquals(11, actualStoreTimestamp);
    }

    /**
     * Overwrites the store host field with 255.255.255.255:65535 and checks that the offset
     * id equals what {@code MessageDecoder.createMessageId} produces for that host and the
     * mocked physical offset (7).
     */
    @Test
    public void testGetOffsetId() {
        ByteBuffer message = buildMockedMessageBuffer();

        // Encode the store host as 4 address bytes followed by a 4-byte port.
        InetSocketAddress storeHost = new InetSocketAddress("255.255.255.255", 65535);
        ByteBuffer hostBytes = ByteBuffer.allocate(Long.BYTES);
        hostBytes.put(storeHost.getAddress().getAddress(), 0, 4).putInt(storeHost.getPort());
        hostBytes.flip();

        // Absolute reads/writes only: hostBytes must keep its position for createMessageId below.
        int hostLength = hostBytes.remaining();
        for (int index = 0; index < hostLength; index++) {
            message.put(MessageBufferUtil.STORE_HOST_POSITION + index, hostBytes.get(index));
        }

        String expectedOffsetId = MessageDecoder.createMessageId(
            ByteBuffer.allocate(TieredStoreUtil.MSG_ID_LENGTH), hostBytes, 7);
        String actualOffsetId = MessageBufferUtil.getOffsetId(message);
        Assert.assertEquals(expectedOffsetId, actualOffsetId);
    }

    /** Both properties written by the mock must be decoded with their original values. */
    @Test
    public void testGetProperties() {
        Map<String, String> decoded = MessageBufferUtil.getProperties(buildMockedMessageBuffer());

        Assert.assertEquals(2, decoded.size());

        String uniqueKeyName = MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX;
        Assert.assertTrue(decoded.containsKey(uniqueKeyName));
        Assert.assertEquals("uk", decoded.get(uniqueKeyName));

        Assert.assertTrue(decoded.containsKey("userkey"));
        Assert.assertEquals("uservalue0", decoded.get("userkey"));
    }
}