| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.rocketmq.logappender; |
| |
| import org.apache.rocketmq.broker.BrokerController; |
| import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
| import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; |
| import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; |
| import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; |
| import org.apache.rocketmq.client.exception.MQClientException; |
| import org.apache.rocketmq.common.BrokerConfig; |
| import org.apache.rocketmq.common.MQVersion; |
| import org.apache.rocketmq.common.MixAll; |
| import org.apache.rocketmq.common.consumer.ConsumeFromWhere; |
| import org.apache.rocketmq.common.message.MessageExt; |
| import org.apache.rocketmq.common.namesrv.NamesrvConfig; |
| import org.apache.rocketmq.logappender.common.ProducerInstance; |
| import org.apache.rocketmq.namesrv.NamesrvController; |
| import org.apache.rocketmq.remoting.netty.NettyClientConfig; |
| import org.apache.rocketmq.remoting.netty.NettyServerConfig; |
| import org.apache.rocketmq.remoting.protocol.RemotingCommand; |
| import org.apache.rocketmq.store.config.MessageStoreConfig; |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| |
| import java.util.List; |
| import java.util.Random; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| /** |
| * Basic test rocketmq broker and name server init |
| */ |
| public class AbstractTestCase { |
| |
| private static String nameServer = "localhost:9876"; |
| |
| private static NamesrvController namesrvController; |
| |
| private static BrokerController brokerController; |
| |
| private static String topic = "TopicTest"; |
| |
| @BeforeClass |
| public static void startRocketmqService() throws Exception { |
| |
| startNamesrv(); |
| |
| startBroker(); |
| } |
| |
| /** |
| * Start rocketmq name server |
| * @throws Exception |
| */ |
| private static void startNamesrv() throws Exception { |
| |
| NamesrvConfig namesrvConfig = new NamesrvConfig(); |
| NettyServerConfig nettyServerConfig = new NettyServerConfig(); |
| nettyServerConfig.setListenPort(9876); |
| |
| namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig); |
| boolean initResult = namesrvController.initialize(); |
| if (!initResult) { |
| namesrvController.shutdown(); |
| throw new Exception(); |
| } |
| namesrvController.start(); |
| } |
| |
| /** |
| * Start rocketmq broker service |
| * @throws Exception |
| */ |
| private static void startBroker() throws Exception { |
| |
| System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); |
| |
| BrokerConfig brokerConfig = new BrokerConfig(); |
| brokerConfig.setNamesrvAddr(nameServer); |
| brokerConfig.setBrokerId(MixAll.MASTER_ID); |
| NettyServerConfig nettyServerConfig = new NettyServerConfig(); |
| nettyServerConfig.setListenPort(10911); |
| NettyClientConfig nettyClientConfig = new NettyClientConfig(); |
| MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); |
| |
| brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig); |
| boolean initResult = brokerController.initialize(); |
| if (!initResult) { |
| brokerController.shutdown(); |
| throw new Exception(); |
| } |
| brokerController.start(); |
| } |
| |
| @AfterClass |
| public static void stop() { |
| ProducerInstance.closeAll(); |
| if (brokerController != null) { |
| brokerController.shutdown(); |
| } |
| |
| if (namesrvController != null) { |
| namesrvController.shutdown(); |
| } |
| } |
| |
| protected int consumeMessages(int count,final String key,int timeout) throws MQClientException, InterruptedException { |
| |
| final AtomicInteger cc = new AtomicInteger(0); |
| final CountDownLatch countDownLatch = new CountDownLatch(count); |
| |
| DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("hello"); |
| consumer.setNamesrvAddr(nameServer); |
| consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); |
| consumer.subscribe(topic, "*"); |
| |
| consumer.registerMessageListener(new MessageListenerConcurrently() { |
| |
| @Override |
| public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, |
| ConsumeConcurrentlyContext context) { |
| for (MessageExt msg : msgs) { |
| String body = new String(msg.getBody()); |
| if(key==null||body.contains(key)){ |
| countDownLatch.countDown(); |
| cc.incrementAndGet(); |
| continue; |
| } |
| } |
| return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
| } |
| }); |
| consumer.start(); |
| countDownLatch.await(timeout, TimeUnit.SECONDS); |
| consumer.shutdown(); |
| return cc.get(); |
| } |
| } |