blob: 8331b485d947b8dcc500c714fbf8db210f2ea0f0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.console.testbase;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.nio.charset.StandardCharsets;
import java.util.List;
import javax.annotation.Resource;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.console.model.request.ConsumerConfigInfo;
import org.apache.rocketmq.console.model.request.TopicConfigInfo;
import org.apache.rocketmq.console.service.ConsumerService;
import org.apache.rocketmq.console.service.TopicService;
import org.apache.rocketmq.console.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.ComponentScan;
/**
 * Base class for console tests that need a RocketMQ producer, a push consumer
 * and a test topic.
 *
 * <p>Subclasses call {@link #initMQClientEnv()} (or the individual
 * {@code start*} / {@code register*} helpers) to set up clients and
 * {@link #destroyMQClientEnv()} to release them.
 */
@ComponentScan(basePackageClasses = {TestRocketMQServer.class})
public abstract class RocketMQConsoleTestBase {
    private static final Logger log = LoggerFactory.getLogger(RocketMQConsoleTestBase.class);

    /** Maximum number of attempts made by {@link #sendTestTopicMessage()}. */
    private static final int SEND_MAX_ATTEMPTS = 3;
    /** Pause between two send attempts, in milliseconds. */
    private static final long SEND_RETRY_INTERVAL_MILLIS = 500L;

    protected static final int RETRY_QUEUE_NUMS = 2;
    protected static final int WRITE_QUEUE_NUM = 16;
    protected static final int READ_QUEUE_NUM = 16;
    // NOTE(review): presumably the read|write permission bit mask -- confirm against RocketMQ's PermName.
    protected static final int PERM = 6;
    protected static final String TEST_CONSUMER_GROUP = "CONSOLE_TEST_CONSUMER_GROUP";
    protected static final String TEST_PRODUCER_GROUP = "CONSOLE_TEST_PRODUCER_GROUP";
    protected static final String TEST_CREATE_DELETE_CONSUMER_GROUP = "CREATE_DELETE_CONSUMER_GROUP";
    protected static final String TEST_CREATE_DELETE_TOPIC = "CREATE_DELETE_TOPIC";
    protected static final String TEST_TOPIC_MESSAGE_BODY = "hello world";
    protected static final String TEST_TOPIC_MESSAGE_KEY = "TEST_TOPIC_KEY";
    protected static final String TEST_CONSOLE_TOPIC = "TEST_CONSOLE_TOPIC";

    @Resource
    protected ConsumerService consumerService;

    /** Configuration filled in and submitted by {@link #startTestMQConsumer()}. */
    protected ConsumerConfigInfo consumerConfigInfo = new ConsumerConfigInfo();

    /** Started test consumer, or {@code null} when none is running. */
    private DefaultMQPushConsumer consumer;
    /** Started test producer, or {@code null} when none is running. */
    private DefaultMQProducer producer;

    @Resource
    protected TopicService topicService;

    /**
     * Runs {@link #process()} repeatedly until it succeeds or the attempt
     * budget is exhausted.
     *
     * @param <T> type of the value produced by {@link #process()}
     */
    public static abstract class RetryTempLate<T> {
        /**
         * The operation to attempt.
         *
         * @return the operation's result
         * @throws Exception to signal that this attempt failed
         */
        protected abstract T process() throws Exception;

        /**
         * Calls {@link #process()} up to {@code times} times and returns the
         * first successful result.
         *
         * @param times    maximum number of attempts; must be at least 1
         * @param waitTime pause between two attempts in milliseconds; no pause
         *                 when not positive
         * @return the value returned by the first successful attempt
         * @throws IllegalArgumentException if {@code times} is less than 1
         * @throws InterruptedException     if the thread is interrupted while
         *                                  processing or while waiting
         * @throws Exception                the last failure, passed through
         *                                  {@link Throwables#propagate(Throwable)}
         *                                  (checked exceptions arrive wrapped in
         *                                  a {@link RuntimeException})
         */
        public T execute(int times, long waitTime) throws Exception {
            if (times <= 0) {
                // Without this guard there would be no failure to report below.
                throw new IllegalArgumentException("times must be >= 1 but was " + times);
            }
            Exception lastFailure = null;
            for (int attempt = 1; attempt <= times; attempt++) {
                try {
                    return process();
                } catch (InterruptedException e) {
                    // An interrupt is a request to stop, not a transient failure.
                    throw e;
                } catch (Exception e) {
                    lastFailure = e;
                    // Only wait when another attempt will actually follow.
                    if (waitTime > 0 && attempt < times) {
                        Thread.sleep(waitTime);
                    }
                }
            }
            throw Throwables.propagate(lastFailure);
        }
    }

    /**
     * Creates and starts a producer for {@link #TEST_PRODUCER_GROUP}, using the
     * current time in milliseconds as its instance name.
     *
     * <p>A producer started earlier by this instance is shut down first. The
     * field is only assigned once {@code start()} has succeeded.
     *
     * <p>NOTE(review): unlike the consumer, no name server address is set here;
     * presumably it is supplied by the environment -- confirm.
     */
    protected void startTestMQProducer() {
        shutdownTestMQProducer();
        DefaultMQProducer newProducer = new DefaultMQProducer(TEST_PRODUCER_GROUP);
        newProducer.setInstanceName(String.valueOf(System.currentTimeMillis()));
        try {
            newProducer.start();
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
        producer = newProducer;
    }

    /**
     * Sends one message with body {@link #TEST_TOPIC_MESSAGE_BODY} and key
     * {@link #TEST_TOPIC_MESSAGE_KEY} to {@link #TEST_CONSOLE_TOPIC}, starting
     * the producer first if necessary.
     *
     * <p>The send is attempted up to three times with a 500 ms pause between
     * attempts.
     *
     * @return the result of the first successful send
     * @throws InterruptedException if the thread is interrupted while sending
     *                              or while waiting between attempts
     * @throws RuntimeException     if every attempt failed; the cause is the
     *                              last failure
     */
    protected SendResult sendTestTopicMessage() throws InterruptedException {
        if (producer == null) {
            startTestMQProducer();
        }
        Message message = new Message(TEST_CONSOLE_TOPIC,
            TEST_TOPIC_MESSAGE_BODY.getBytes(StandardCharsets.UTF_8));
        message.setKeys(Lists.<String>newArrayList(TEST_TOPIC_MESSAGE_KEY));

        Exception lastFailure = null;
        for (int attempt = 1; attempt <= SEND_MAX_ATTEMPTS; attempt++) {
            try {
                return producer.send(message);
            } catch (InterruptedException e) {
                // Do not retry after an interrupt; let the caller see it.
                throw e;
            } catch (Exception e) {
                lastFailure = e;
                log.warn("op=sendTestTopicMessage attempt={} failed", attempt, e);
                if (attempt < SEND_MAX_ATTEMPTS) {
                    Thread.sleep(SEND_RETRY_INTERVAL_MILLIS);
                }
            }
        }
        throw new RuntimeException("send message error", lastFailure);
    }

    /**
     * Registers the subscription group {@link #TEST_CONSUMER_GROUP} through
     * {@link #consumerService} and starts a push consumer subscribed to every
     * tag of {@link #TEST_CONSOLE_TOPIC}. Received messages are logged and
     * acknowledged as consumed.
     *
     * <p>A consumer started earlier by this instance is shut down first. The
     * field is only assigned once {@code start()} has succeeded.
     */
    protected void startTestMQConsumer() {
        shutdownTestMQConsumer();
        // test online consumer
        DefaultMQPushConsumer newConsumer = new DefaultMQPushConsumer(TEST_CONSUMER_GROUP);

        consumerConfigInfo.setBrokerNameList(Lists.newArrayList(TestConstant.TEST_BROKER_NAME));
        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
        subscriptionGroupConfig.setGroupName(TEST_CONSUMER_GROUP);
        subscriptionGroupConfig.setRetryQueueNums(RETRY_QUEUE_NUMS);
        consumerConfigInfo.setSubscriptionGroupConfig(subscriptionGroupConfig);
        consumerService.createAndUpdateSubscriptionGroupConfig(consumerConfigInfo);

        newConsumer.setNamesrvAddr(TestConstant.NAME_SERVER_ADDRESS);
        newConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        try {
            newConsumer.subscribe(TEST_CONSOLE_TOPIC, "*");
            newConsumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                    log.info("op=consumeMessage message={}", JsonUtil.obj2String(msgs));
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            newConsumer.start();
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
        consumer = newConsumer;
    }

    /**
     * Creates or updates {@link #TEST_CONSOLE_TOPIC} on the test broker through
     * {@link #topicService}, using the queue counts and permission defined by
     * this class's constants.
     */
    protected void registerTestMQTopic() {
        TopicConfigInfo topicConfigInfo = new TopicConfigInfo();
        topicConfigInfo.setBrokerNameList(Lists.newArrayList(TestConstant.TEST_BROKER_NAME));
        topicConfigInfo.setTopicName(TEST_CONSOLE_TOPIC);
        topicConfigInfo.setWriteQueueNums(WRITE_QUEUE_NUM);
        topicConfigInfo.setReadQueueNums(READ_QUEUE_NUM);
        topicConfigInfo.setPerm(PERM);
        topicService.createOrUpdate(topicConfigInfo);
    }

    /** Starts the test producer and then the test consumer. */
    protected void initMQClientEnv() {
        startTestMQProducer();
        startTestMQConsumer();
    }

    /**
     * Shuts down the test consumer and producer if they are running and clears
     * the references, so a later {@link #sendTestTopicMessage()} starts a fresh
     * producer. The producer is shut down even if the consumer shutdown throws.
     */
    protected void destroyMQClientEnv() {
        try {
            shutdownTestMQConsumer();
        } finally {
            shutdownTestMQProducer();
        }
    }

    /** Shuts down the current consumer, if any, and clears the field. */
    private void shutdownTestMQConsumer() {
        DefaultMQPushConsumer current = consumer;
        consumer = null;
        if (current != null) {
            current.shutdown();
        }
    }

    /** Shuts down the current producer, if any, and clears the field. */
    private void shutdownTestMQProducer() {
        DefaultMQProducer current = producer;
        producer = null;
        if (current != null) {
            current.shutdown();
        }
    }
}