blob: 1ce69b4d6dddefc239be15749e36defa57267849 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.statictopic;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.MQAdminTestUtils;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.test.util.TestUtils;
import org.apache.rocketmq.test.util.VerifyUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static com.google.common.truth.Truth.assertThat;
import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig;
@FixMethodOrder
public class StaticTopicIT extends BaseConf {
private static Logger logger = Logger.getLogger(StaticTopicIT.class);
private DefaultMQAdminExt defaultMQAdminExt;
/**
 * Prepares the fixture for each test: sets the client rebalance wait interval property to
 * 500, obtains an admin client for the name server, waits until the expected number of brokers
 * has registered, and finally starts the admin client.
 *
 * @throws Exception if any of the fixture steps fails
 */
@Before
public void setUp() throws Exception {
    final String rebalanceWaitKey = "rocketmq.client.rebalance.waitInterval";
    System.setProperty(rebalanceWaitKey, "500");

    this.defaultMQAdminExt = getAdmin(nsAddr);
    // Brokers must be registered before the admin client is started and used by the tests.
    waitBrokerRegistered(nsAddr, clusterName, brokerNum);
    this.defaultMQAdminExt.start();
}
/**
 * Creates a static topic through the admin command using the cluster name, verifies
 * production and consumption, then remaps it through the command (again by cluster) and
 * verifies that a second batch can be produced.
 *
 * @throws Exception if any admin, produce or consume step fails
 */
@Test
public void testCommandsWithCluster() throws Exception {
    // This case is used to mock the env to test the command manually.
    final String topic = "static" + MQRandomUtils.getRandomTopic();
    final RMQNormalProducer producer = getProducer(nsAddr, topic);
    final RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
    final int queueCount = 10;
    final int perQueue = 100;

    // Phase 1: create by cluster, then send, consume and check.
    MQAdminTestUtils.createStaticTopicWithCommand(topic, queueCount, null, clusterName, nsAddr);
    sendMessagesAndCheck(producer, getBrokers(), topic, queueCount, perQueue, 0);
    consumeMessagesAndCheck(producer, consumer, topic, queueCount, perQueue, 0, 1);

    // Phase 2: remap by cluster; the second batch is expected to continue right after the
    // first one (base offset equals the number of messages already sent per queue).
    MQAdminTestUtils.remappingStaticTopicWithCommand(topic, null, clusterName, nsAddr);
    awaitRefreshStaticTopicMetadata(
        3000, topic, producer.getProducer(), consumer.getConsumer(), defaultMQAdminExt);
    sendMessagesAndCheck(producer, getBrokers(), topic, queueCount, perQueue, perQueue);
}
/**
 * Creates a static topic on broker1 through the admin command, verifies production and
 * consumption, then remaps it to broker2 through the command and verifies both generations
 * of messages.
 *
 * @throws Exception if any admin, produce or consume step fails
 */
@Test
public void testCommandsWithBrokers() throws Exception {
    // This case is used to mock the env to test the command manually.
    final String topic = "static" + MQRandomUtils.getRandomTopic();
    final RMQNormalProducer producer = getProducer(nsAddr, topic);
    final RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
    final int queueCount = 10;
    final int perQueue = 100;

    // Phase 1: create on broker1, then send, consume and check.
    final Set<String> createTargets = ImmutableSet.of(broker1Name);
    MQAdminTestUtils.createStaticTopicWithCommand(topic, queueCount, createTargets, null, nsAddr);
    sendMessagesAndCheck(producer, createTargets, topic, queueCount, perQueue, 0);
    consumeMessagesAndCheck(producer, consumer, topic, queueCount, perQueue, 0, 1);

    // Phase 2: remap to broker2; the new batch is expected to start one block size further on.
    final Set<String> remapTargets = ImmutableSet.of(broker2Name);
    MQAdminTestUtils.remappingStaticTopicWithCommand(topic, remapTargets, null, nsAddr);
    awaitRefreshStaticTopicMetadata(
        3000, topic, producer.getProducer(), consumer.getConsumer(), defaultMQAdminExt);
    sendMessagesAndCheck(
        producer, remapTargets, topic, queueCount, perQueue,
        TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
    consumeMessagesAndCheck(producer, consumer, topic, queueCount, perQueue, 0, 2);
}
/**
 * Verifies the per-broker topic configuration when a static topic targets only a subset of
 * the brokers: after creation on broker1 every broker still reports a config, and broker2
 * reports zero read/write queues and no hosted queues. After remapping to broker2 the config
 * is still reported by every broker and remains consistent.
 *
 * @throws Exception if any admin call or consistency check fails
 */
@Test
public void testNoTargetBrokers() throws Exception {
    final String topic = "static" + MQRandomUtils.getRandomTopic();
    final int queueCount = 10;

    // Phase 1: create on broker1 only.
    final Set<String> createTargets = new HashSet<>();
    createTargets.add(broker1Name);
    MQAdminTestUtils.createStaticTopic(topic, queueCount, createTargets, defaultMQAdminExt);

    final Map<String, TopicConfigAndQueueMapping> configsAfterCreate =
        MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
    Assert.assertEquals(brokerNum, configsAfterCreate.size());
    TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, configsAfterCreate);
    final Map<Integer, TopicQueueMappingOne> mappingAfterCreate =
        TopicQueueMappingUtils.checkAndBuildMappingItems(
            new ArrayList<>(getMappingDetailFromConfig(configsAfterCreate.values())), false, true);
    Assert.assertEquals(queueCount, mappingAfterCreate.size());

    // The non-target broker is expected to carry an empty config for the topic.
    final TopicConfigAndQueueMapping broker2Config = configsAfterCreate.get(broker2Name);
    Assert.assertEquals(0, broker2Config.getWriteQueueNums());
    Assert.assertEquals(0, broker2Config.getReadQueueNums());
    Assert.assertEquals(0, broker2Config.getMappingDetail().getHostedQueues().size());

    // Phase 2: remap to broker2 only.
    final Set<String> remapTargets = new HashSet<>();
    remapTargets.add(broker2Name);
    MQAdminTestUtils.remappingStaticTopic(topic, remapTargets, defaultMQAdminExt);

    final Map<String, TopicConfigAndQueueMapping> configsAfterRemap =
        MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
    Assert.assertEquals(brokerNum, configsAfterRemap.size());
    TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, configsAfterRemap);
    final Map<Integer, TopicQueueMappingOne> mappingAfterRemap =
        TopicQueueMappingUtils.checkAndBuildMappingItems(
            new ArrayList<>(getMappingDetailFromConfig(configsAfterRemap.values())), false, true);
    Assert.assertEquals(queueCount, mappingAfterRemap.size());
}
/**
 * Sends {@code msgEachQueue} messages to every queue the producer sees for {@code topic} and
 * verifies queue layout and offsets.
 *
 * <p>Checks performed, in order: the producer sees exactly {@code queueNum} queues, numbered
 * from 0, all carrying the global mock broker name and resolving to a broker contained in
 * {@code targetBrokers}; sending produces no errors; each queue's min offset is 0 and its max
 * offset is {@code msgEachQueue + baseOffset}, both via direct offset queries and via the
 * topic stats table.
 *
 * @param producer      producer used to list the queues and send the messages
 * @param targetBrokers broker names the queues are allowed to resolve to
 * @param topic         topic under test
 * @param queueNum      expected number of queues
 * @param msgEachQueue  number of messages to send to each queue
 * @param baseOffset    offset added to {@code msgEachQueue} to get the expected max offset
 * @throws Exception if any admin or send operation fails
 */
private void sendMessagesAndCheck(RMQNormalProducer producer, Set<String> targetBrokers, String topic, int queueNum, int msgEachQueue, long baseOffset) throws Exception {
    final ClientMetadata metadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
    final List<MessageQueue> queues = producer.getMessageQueue();
    Assert.assertEquals(queueNum, queues.size());

    // Queue layout: ids are 0..queueNum-1 in list order, each resolving to a target broker.
    final String mockBrokerName = TopicQueueMappingUtils.getMockBrokerName(MixAll.METADATA_SCOPE_GLOBAL);
    int expectedQueueId = 0;
    for (MessageQueue queue : queues) {
        Assert.assertEquals(topic, queue.getTopic());
        Assert.assertEquals(mockBrokerName, queue.getBrokerName());
        Assert.assertEquals(expectedQueueId, queue.getQueueId());
        Assert.assertTrue(targetBrokers.contains(metadata.getBrokerNameFromMessageQueue(queue)));
        expectedQueueId++;
    }

    for (MessageQueue queue : queues) {
        producer.send(msgEachQueue, queue);
    }
    Assert.assertEquals(0, producer.getSendErrorMsg().size());

    // Leave the time to build the consume queue.
    Assert.assertTrue(awaitDispatchMs(500));

    final long expectedMaxOffset = msgEachQueue + baseOffset;
    for (MessageQueue queue : queues) {
        Assert.assertEquals(0, defaultMQAdminExt.minOffset(queue));
        Assert.assertEquals(expectedMaxOffset, defaultMQAdminExt.maxOffset(queue));
    }

    final TopicStatsTable stats = defaultMQAdminExt.examineTopicStats(topic);
    for (MessageQueue queue : queues) {
        Assert.assertEquals(0, stats.getOffsetTable().get(queue).getMinOffset());
        Assert.assertEquals(expectedMaxOffset, stats.getOffsetTable().get(queue).getMaxOffset());
    }
}
/**
 * Groups the given messages by queue id and sorts each group by ascending queue offset.
 *
 * @param msgs messages to group; every element is cast to {@link MessageExt}
 * @return a map from queue id to that queue's messages, ordered by queue offset
 * @throws ClassCastException if an element of {@code msgs} is not a {@link MessageExt}
 */
private Map<Integer, List<MessageExt>> computeMessageByQueue(Collection<Object> msgs) {
    Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>();
    for (Object object : msgs) {
        MessageExt messageExt = (MessageExt) object;
        List<MessageExt> bucket = messagesByQueue.get(messageExt.getQueueId());
        if (bucket == null) {
            bucket = new ArrayList<>();
            messagesByQueue.put(messageExt.getQueueId(), bucket);
        }
        bucket.add(messageExt);
    }

    // Compare offsets with Long.compare: narrowing the difference of two offsets to int can
    // overflow and report the wrong sign (or 0), which violates the Comparator contract.
    Comparator<MessageExt> byQueueOffset = new Comparator<MessageExt>() {
        @Override
        public int compare(MessageExt o1, MessageExt o2) {
            return Long.compare(o1.getQueueOffset(), o2.getQueueOffset());
        }
    };
    for (List<MessageExt> msgEachQueue : messagesByQueue.values()) {
        Collections.sort(msgEachQueue, byQueueOffset);
    }
    return messagesByQueue;
}
/**
 * Waits for the consumer to receive everything the producer sent and verifies the result.
 *
 * <p>Checks performed: produced and consumed body counts match and contain the same elements;
 * the consumed messages cover exactly {@code queueNum} queues; each queue holds
 * {@code msgEachQueue * genNum} messages; and, in offset order, the message at position
 * {@code j} carries the topic, the global mock broker name, the queue id, and the queue offset
 * {@code (j % msgEachQueue) + (startGen + j / msgEachQueue) * DEFAULT_BLOCK_SEQ_SIZE}.
 *
 * @param producer     producer whose sent bodies are the expectation
 * @param consumer     consumer whose listener collected the messages
 * @param topic        topic under test
 * @param queueNum     expected number of queues with consumed messages
 * @param msgEachQueue number of messages per queue in each generation
 * @param startGen     generation index of the first consumed batch
 * @param genNum       number of generations expected per queue
 */
private void consumeMessagesAndCheck(RMQNormalProducer producer, RMQNormalConsumer consumer, String topic, int queueNum, int msgEachQueue, int startGen, int genNum) {
    consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 30000);

    // Body-level check: same count and same elements on both sides.
    Assert.assertEquals(
        producer.getAllMsgBody().size(), consumer.getListener().getAllMsgBody().size());
    assertThat(
        VerifyUtils.getFilterdMessage(
            producer.getAllMsgBody(), consumer.getListener().getAllMsgBody()))
        .containsExactlyElementsIn(producer.getAllMsgBody());

    // Queue-level check: per-queue count and the exact logical offset of every message.
    final Map<Integer, List<MessageExt>> byQueue =
        computeMessageByQueue(consumer.getListener().getAllOriginMsg());
    Assert.assertEquals(queueNum, byQueue.size());

    for (int queueId = 0; queueId < queueNum; queueId++) {
        final List<MessageExt> consumed = byQueue.get(queueId);
        final int expectedCount = msgEachQueue * genNum;
        Assert.assertEquals(expectedCount, consumed.size());

        for (int pos = 0; pos < expectedCount; pos++) {
            final MessageExt message = consumed.get(pos);
            final int generation = startGen + pos / msgEachQueue;
            Assert.assertEquals(topic, message.getTopic());
            Assert.assertEquals(
                TopicQueueMappingUtils.getMockBrokerName(MixAll.METADATA_SCOPE_GLOBAL),
                message.getBrokerName());
            Assert.assertEquals(queueId, message.getQueueId());
            Assert.assertEquals(
                (pos % msgEachQueue) + generation * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE,
                message.getQueueOffset());
        }
    }
}
/**
 * Creates a static topic across all brokers, verifies that the configuration reported by each
 * broker equals the one returned at creation time and is globally consistent, then verifies
 * production and consumption.
 *
 * @throws Exception if any admin, produce or consume step fails
 */
@Test
public void testCreateProduceConsumeStaticTopic() throws Exception {
    final String topic = "static" + MQRandomUtils.getRandomTopic();
    final RMQNormalProducer producer = getProducer(nsAddr, topic);
    final RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
    final int queueCount = 10;
    final int perQueue = 100;

    // Create the static topic and keep the config that creation returned.
    final Map<String, TopicConfigAndQueueMapping> createdConfigs =
        MQAdminTestUtils.createStaticTopic(topic, queueCount, getBrokers(), defaultMQAdminExt);

    // Compare what the brokers report against what creation returned.
    final Map<String, TopicConfigAndQueueMapping> reportedConfigs =
        MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
    Assert.assertEquals(brokerNum, reportedConfigs.size());
    for (Map.Entry<String, TopicConfigAndQueueMapping> reported : reportedConfigs.entrySet()) {
        final TopicConfigAndQueueMapping created = createdConfigs.get(reported.getKey());
        Assert.assertNotNull(created);
        Assert.assertEquals(reported.getValue(), created);
    }
    TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, reportedConfigs);
    final Map<Integer, TopicQueueMappingOne> globalMapping =
        TopicQueueMappingUtils.checkAndBuildMappingItems(
            new ArrayList<>(getMappingDetailFromConfig(reportedConfigs.values())), false, true);
    Assert.assertEquals(queueCount, globalMapping.size());

    // Send and check, then consume and check.
    sendMessagesAndCheck(producer, getBrokers(), topic, queueCount, perQueue, 0);
    consumeMessagesAndCheck(producer, consumer, topic, queueCount, perQueue, 0, 1);
}
/**
 * Creates a single-queue static topic on broker1, produces and consumes one batch, remaps the
 * topic to broker2, verifies the resulting mapping (owner is broker2 and the latest mapping
 * item starts at {@code DEFAULT_BLOCK_SEQ_SIZE}), then produces a second batch and verifies
 * that both generations are consumed.
 *
 * @throws Exception if any admin, produce or consume step fails
 */
@Test
public void testRemappingProduceConsumeStaticTopic() throws Exception {
    final String topic = "static" + MQRandomUtils.getRandomTopic();
    final RMQNormalProducer producer = getProducer(nsAddr, topic);
    final RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
    final int queueCount = 1;
    final int perQueue = 100;

    // Create, send, consume.
    final Set<String> createTargets = ImmutableSet.of(broker1Name);
    MQAdminTestUtils.createStaticTopic(topic, queueCount, createTargets, defaultMQAdminExt);
    sendMessagesAndCheck(producer, createTargets, topic, queueCount, perQueue, 0);
    consumeMessagesAndCheck(producer, consumer, topic, queueCount, perQueue, 0, 1);

    // Remap the static topic to broker2 and inspect the new mapping.
    final Set<String> remapTargets = ImmutableSet.of(broker2Name);
    MQAdminTestUtils.remappingStaticTopic(topic, remapTargets, defaultMQAdminExt);
    final Map<String, TopicConfigAndQueueMapping> reportedConfigs =
        MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
    TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, reportedConfigs);
    final Map<Integer, TopicQueueMappingOne> globalMapping =
        TopicQueueMappingUtils.checkAndBuildMappingItems(
            new ArrayList<>(getMappingDetailFromConfig(reportedConfigs.values())), false, true);
    Assert.assertEquals(queueCount, globalMapping.size());
    for (TopicQueueMappingOne queueMapping : globalMapping.values()) {
        Assert.assertEquals(broker2Name, queueMapping.getBname());
        final int lastIndex = queueMapping.getItems().size() - 1;
        Assert.assertEquals(
            TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE,
            queueMapping.getItems().get(lastIndex).getLogicOffset());
    }

    // Let the clients pick up the new mapping, then exercise the second generation.
    awaitRefreshStaticTopicMetadata(
        3000, topic, producer.getProducer(), consumer.getConsumer(), defaultMQAdminExt);
    sendMessagesAndCheck(
        producer, remapTargets, topic, queueCount, perQueue,
        TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
    consumeMessagesAndCheck(producer, consumer, topic, queueCount, perQueue, 0, 2);
}
/**
 * Polls until the static-topic metadata held by every supplied client passes
 * {@code MQAdminTestUtils.checkStaticTopic}, or until {@code timeMs} has elapsed.
 *
 * <p>On each poll, every non-null client has its route for {@code topic} refreshed from the
 * name server and is then checked; all supplied clients are refreshed and checked on every
 * poll, even when an earlier one already failed. Polls are 100 ms apart.
 *
 * <p>NOTE(review): {@code adminExt} is null-guarded only for its own client instance, yet it
 * is also handed to the check for the producer and the consumer; whether the check tolerates
 * a null admin is not visible here - confirm before passing null together with a client.
 *
 * @param timeMs   maximum time to keep polling, in milliseconds
 * @param topic    topic whose route is refreshed and checked
 * @param producer producer to refresh and check, or {@code null} to skip
 * @param consumer push consumer to refresh and check, or {@code null} to skip
 * @param adminExt admin client used for the check and itself refreshed when non-null
 * @return {@code true} as soon as one poll passes for all supplied clients, {@code false} if
 *         the time budget runs out first
 * @throws Exception if refreshing or checking fails, or the sleep is interrupted
 */
public boolean awaitRefreshStaticTopicMetadata(long timeMs, String topic, DefaultMQProducer producer, DefaultMQPushConsumer consumer, DefaultMQAdminExt adminExt) throws Exception {
    final long begin = System.currentTimeMillis();
    while (System.currentTimeMillis() - begin <= timeMs) {
        // Evaluate each client in its own statement so none of them is skipped.
        final boolean producerOk = producer == null
            || refreshRouteAndCheck(topic, adminExt, producer.getDefaultMQProducerImpl().getmQClientFactory());
        final boolean consumerOk = consumer == null
            || refreshRouteAndCheck(topic, adminExt, consumer.getDefaultMQPushConsumerImpl().getmQClientFactory());
        final boolean adminOk = adminExt == null
            || refreshRouteAndCheck(topic, adminExt, adminExt.getDefaultMQAdminExtImpl().getMqClientInstance());

        if (producerOk && consumerOk && adminOk) {
            return true;
        }
        Thread.sleep(100);
    }
    return false;
}

/** Refreshes the route of {@code topic} on one client instance and runs the static-topic check on it. */
private boolean refreshRouteAndCheck(String topic, DefaultMQAdminExt adminExt, MQClientInstance instance) throws Exception {
    instance.updateTopicRouteInfoFromNameServer(topic);
    return MQAdminTestUtils.checkStaticTopic(topic, adminExt, instance);
}
/**
 * Verifies consumer-offset reporting across several remappings.
 *
 * <p>Flow: create the topic on broker1, produce and consume one batch, shut both clients
 * down, and check that the consume stats report broker and consumer offsets equal to the
 * batch size. Then remap to broker2, broker3 and back to broker1, producing one batch after
 * each remap. The stats are then expected to show the broker offset advanced by one block per
 * remap while the consumer offset is unchanged. Finally a new consumer in the same group is
 * expected to consume the three newer generations.
 *
 * @throws Exception if any admin, produce or consume step fails
 */
@Test
public void testDoubleReadCheckConsumerOffset() throws Exception {
    final String topic = "static" + MQRandomUtils.getRandomTopic();
    final String group = initConsumerGroup();
    RMQNormalProducer producer = getProducer(nsAddr, topic);
    RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
    final long testStart = System.currentTimeMillis();
    final int queueCount = 10;
    final int perQueue = 100;

    // Create the static topic on broker1 and run the first generation through it.
    final Set<String> createTargets = ImmutableSet.of(broker1Name);
    MQAdminTestUtils.createStaticTopic(topic, queueCount, createTargets, defaultMQAdminExt);
    sendMessagesAndCheck(producer, createTargets, topic, queueCount, perQueue, 0);
    consumeMessagesAndCheck(producer, consumer, topic, queueCount, perQueue, 0, 1);

    producer.shutdown();
    consumer.shutdown();

    // Use a new producer from here on.
    producer = getProducer(nsAddr, topic);

    ConsumeStats stats = defaultMQAdminExt.examineConsumeStats(group);
    for (MessageQueue queue : producer.getMessageQueue()) {
        final OffsetWrapper offsets = stats.getOffsetTable().get(queue);
        Assert.assertNotNull(offsets);
        Assert.assertEquals(perQueue, offsets.getBrokerOffset());
        Assert.assertEquals(perQueue, offsets.getConsumerOffset());
        Assert.assertTrue(offsets.getLastTimestamp() > testStart);
    }

    // Remap through broker2 -> broker3 -> broker1, producing one generation after each remap.
    final List<String> remapOrder = ImmutableList.of(broker2Name, broker3Name, broker1Name);
    int generation = 1;
    for (String remapBroker : remapOrder) {
        final Set<String> remapTargets = ImmutableSet.of(remapBroker);
        MQAdminTestUtils.remappingStaticTopic(topic, remapTargets, defaultMQAdminExt);
        // Make the metadata take effect before sending.
        awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), null, defaultMQAdminExt);
        sendMessagesAndCheck(
            producer, remapTargets, topic, queueCount, perQueue,
            generation * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
        generation++;
    }

    TestUtils.waitForSeconds(1);

    // With no consumer running, only the broker offset is expected to have moved.
    stats = defaultMQAdminExt.examineConsumeStats(group);
    for (MessageQueue queue : producer.getMessageQueue()) {
        final OffsetWrapper offsets = stats.getOffsetTable().get(queue);
        Assert.assertNotNull(offsets);
        Assert.assertEquals(
            perQueue + remapOrder.size() * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE,
            offsets.getBrokerOffset());
        Assert.assertEquals(perQueue, offsets.getConsumerOffset());
        Assert.assertTrue(offsets.getLastTimestamp() > testStart);
    }

    consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
    consumeMessagesAndCheck(producer, consumer, topic, queueCount, perQueue, 1, remapOrder.size());
}
/**
 * Remaps a static topic along broker1 -> broker2 -> broker3 and verifies what each broker
 * still hosts after the mapping clean services have been woken up, first as-is and then again
 * after the topic's data has been cleaned from the stores of broker1 and broker2.
 *
 * @throws Exception if any admin, produce or cleanup step fails
 */
@Test
public void testRemappingAndClear() throws Exception {
    final String topic = "static" + MQRandomUtils.getRandomTopic();
    final RMQNormalProducer producer = getProducer(nsAddr, topic);
    final int queueCount = 10;
    final int perQueue = 100;

    // Create on broker1; leave the time to refresh the metadata before sending.
    final Set<String> createTargets = ImmutableSet.of(broker1Name);
    MQAdminTestUtils.createStaticTopic(topic, queueCount, createTargets, defaultMQAdminExt);
    awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), null, defaultMQAdminExt);
    sendMessagesAndCheck(producer, createTargets, topic, queueCount, perQueue, 0);

    // Remap to broker2, then to broker3; each remap moves the base offset by one block.
    final String[] remapChain = {broker2Name, broker3Name};
    for (int generation = 1; generation <= remapChain.length; generation++) {
        final Set<String> remapTargets = ImmutableSet.of(remapChain[generation - 1]);
        MQAdminTestUtils.remappingStaticTopic(topic, remapTargets, defaultMQAdminExt);
        // Leave the time to refresh the metadata.
        awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), null, defaultMQAdminExt);
        sendMessagesAndCheck(
            producer, remapTargets, topic, queueCount, perQueue,
            generation * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
    }

    // 1 -> 2 -> 3: at this point broker1 should not have any mappings.
    wakeupMappingCleanServices(10, 100);
    final Map<String, TopicConfigAndQueueMapping> configsBeforeStoreClean =
        MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
    Assert.assertEquals(brokerNum, configsBeforeStoreClean.size());
    Assert.assertEquals(
        0, configsBeforeStoreClean.get(broker1Name).getMappingDetail().getHostedQueues().size());
    Assert.assertEquals(
        queueCount, configsBeforeStoreClean.get(broker2Name).getMappingDetail().getHostedQueues().size());
    Assert.assertEquals(
        queueCount, configsBeforeStoreClean.get(broker3Name).getMappingDetail().getHostedQueues().size());

    // Clean the topic's data from the stores of broker1 and broker2 by passing every other
    // topic name broker1 knows to cleanUnusedTopic, then run the clean services again.
    final Set<String> otherTopics =
        new HashSet<>(brokerController1.getTopicConfigManager().getTopicConfigTable().keySet());
    otherTopics.remove(topic);
    brokerController1.getMessageStore().cleanUnusedTopic(otherTopics);
    brokerController2.getMessageStore().cleanUnusedTopic(otherTopics);
    wakeupMappingCleanServices(10, 100);

    final Map<String, TopicConfigAndQueueMapping> configsAfterStoreClean =
        MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
    Assert.assertEquals(brokerNum, configsAfterStoreClean.size());
    final TopicConfigAndQueueMapping broker1Config = configsAfterStoreClean.get(broker1Name);
    final TopicConfigAndQueueMapping broker2Config = configsAfterStoreClean.get(broker2Name);
    final TopicConfigAndQueueMapping broker3Config = configsAfterStoreClean.get(broker3Name);
    Assert.assertEquals(0, broker1Config.getMappingDetail().getHostedQueues().size());
    Assert.assertEquals(queueCount, broker2Config.getMappingDetail().getHostedQueues().size());
    Assert.assertEquals(queueCount, broker3Config.getMappingDetail().getHostedQueues().size());

    // The first leader will clear it.
    // NOTE(review): broker1's hosted-queue map was just asserted to be empty, so this loop
    // has nothing to iterate over and its assertion never runs - confirm the intended target.
    for (List<LogicQueueMappingItem> items : broker1Config.getMappingDetail().getHostedQueues().values()) {
        Assert.assertEquals(3, items.size());
    }
    // The second leader does nothing.
    for (List<LogicQueueMappingItem> items : broker3Config.getMappingDetail().getHostedQueues().values()) {
        Assert.assertEquals(1, items.size());
    }
}

/** Wakes up the topic-queue-mapping clean service of every broker {@code rounds} times, pausing {@code pauseMs} ms after each round. */
private void wakeupMappingCleanServices(int rounds, long pauseMs) throws Exception {
    for (int round = 0; round < rounds; round++) {
        for (BrokerController controller : brokerControllerList) {
            controller.getTopicQueueMappingCleanService().wakeup();
        }
        Thread.sleep(pauseMs);
    }
}
/**
 * Creates a static topic on broker1, produces one batch, then remaps it to broker2 using the
 * negative-logic-offset variant. Verifies that every queue is owned by broker2 with a latest
 * mapping item whose logic offset is -1, and that a further batch is then accepted with a
 * base offset of 0.
 *
 * @throws Exception if any admin or produce step fails
 */
@Test
public void testRemappingWithNegativeLogicOffset() throws Exception {
    final String topic = "static" + MQRandomUtils.getRandomTopic();
    final RMQNormalProducer producer = getProducer(nsAddr, topic);
    final int queueCount = 10;
    final int perQueue = 100;

    // Create and send.
    final Set<String> createTargets = ImmutableSet.of(broker1Name);
    MQAdminTestUtils.createStaticTopic(topic, queueCount, createTargets, defaultMQAdminExt);
    sendMessagesAndCheck(producer, createTargets, topic, queueCount, perQueue, 0);

    // Remap the static topic with -1 logic offset.
    final Set<String> remapTargets = ImmutableSet.of(broker2Name);
    MQAdminTestUtils.remappingStaticTopicWithNegativeLogicOffset(topic, remapTargets, defaultMQAdminExt);
    final Map<String, TopicConfigAndQueueMapping> reportedConfigs =
        MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
    TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, reportedConfigs);
    final Map<Integer, TopicQueueMappingOne> globalMapping =
        TopicQueueMappingUtils.checkAndBuildMappingItems(
            new ArrayList<>(getMappingDetailFromConfig(reportedConfigs.values())), false, true);
    Assert.assertEquals(queueCount, globalMapping.size());
    for (TopicQueueMappingOne queueMapping : globalMapping.values()) {
        Assert.assertEquals(broker2Name, queueMapping.getBname());
        final int lastIndex = queueMapping.getItems().size() - 1;
        Assert.assertEquals(-1, queueMapping.getItems().get(lastIndex).getLogicOffset());
    }

    // Leave the time to refresh the metadata.
    awaitRefreshStaticTopicMetadata(3000, topic, producer.getProducer(), null, defaultMQAdminExt);
    // Here the generation should be 0.
    sendMessagesAndCheck(producer, remapTargets, topic, queueCount, perQueue, 0);
}
/**
 * Cleans up after each test: sets the client rebalance wait interval property to 20000 and
 * invokes the base class shutdown.
 */
@After
public void tearDown() {
    final String rebalanceWaitKey = "rocketmq.client.rebalance.waitInterval";
    System.setProperty(rebalanceWaitKey, "20000");
    super.shutdown();
}
}