// blob: eb05c2b639f6d7b13cfc13fa8ba2b73cd356ee7c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flume.ng.souce;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
import org.apache.commons.lang.time.DateFormatUtils;
import org.apache.flume.Channel;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.Transaction;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.ReplicatingChannelSelector;
import org.apache.flume.conf.Configurables;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.NAME_SERVER_CONFIG;
import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.TAG_CONFIG;
import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.TAG_DEFAULT;
import static org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.TOPIC_DEFAULT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/**
 * Integration test for {@link RocketMQSource}.
 *
 * <p>Before any test runs, an in-process RocketMQ name server and broker are started. The test
 * then publishes one message, lets the source process it into a {@link MemoryChannel}, and checks
 * that the event taken from the channel carries the same payload.
 */
public class RocketMQSourceTest {

    private static final Logger log = LoggerFactory.getLogger(RocketMQSourceTest.class);

    /** Port the embedded name server listens on. */
    private static final int NAMESRV_PORT = 9876;

    /** Port the embedded broker listens on. */
    private static final int BROKER_PORT = 10911;

    /** Name server address handed to the broker, the producer and the source. */
    private static final String NAME_SERVER = "localhost:" + NAMESRV_PORT;

    private static final String PRODUCER_GROUP = "PRODUCER_GROUP_SOURCE_TEST";

    private static final String CHARSET = "UTF-8";

    private static NamesrvController namesrvController;
    private static BrokerController brokerController;

    /** Tag used for both the published message and the source's tag configuration. */
    private final String tag = TAG_DEFAULT + "_SOURCE_TEST_" + new Random().nextInt(99);

    /**
     * Starts the name server, then the broker, then waits two seconds.
     *
     * @throws Exception if either component fails to initialize or start
     */
    @BeforeClass
    public static void startMQ() throws Exception {
        startNamesrv();
        startBroker();
        // NOTE(review): presumably gives the broker time to register with the name server
        // before the test publishes - confirm.
        Thread.sleep(2000);
    }

    /**
     * Creates, initializes and starts the name server on {@link #NAMESRV_PORT}.
     *
     * @throws Exception if initialization reports failure or startup throws
     */
    private static void startNamesrv() throws Exception {
        NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(NAMESRV_PORT);

        namesrvController = new NamesrvController(new NamesrvConfig(), nettyServerConfig);
        if (!namesrvController.initialize()) {
            namesrvController.shutdown();
            // Clear the reference so stop() does not shut the same controller down twice.
            namesrvController = null;
            throw new IllegalStateException("Name server failed to initialize on port " + NAMESRV_PORT);
        }
        namesrvController.start();
    }

    /**
     * Creates, initializes and starts a master broker on {@link #BROKER_PORT} that points at
     * {@link #NAME_SERVER}.
     *
     * @throws Exception if initialization reports failure or startup throws
     */
    private static void startBroker() throws Exception {
        // Set the remoting version system property to the current MQ version before the broker is created.
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

        BrokerConfig brokerConfig = new BrokerConfig();
        brokerConfig.setNamesrvAddr(NAME_SERVER);
        brokerConfig.setBrokerId(MixAll.MASTER_ID);

        NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(BROKER_PORT);

        brokerController = new BrokerController(
            brokerConfig, nettyServerConfig, new NettyClientConfig(), new MessageStoreConfig());
        if (!brokerController.initialize()) {
            brokerController.shutdown();
            // Clear the reference so stop() does not shut the same controller down twice.
            brokerController = null;
            throw new IllegalStateException("Broker failed to initialize on port " + BROKER_PORT);
        }
        brokerController.start();
    }

    /**
     * Publishes one message and verifies that the source delivers the same body to the channel.
     */
    @Test
    public void testEvent() throws EventDeliveryException, MQBrokerException, MQClientException, InterruptedException, UnsupportedEncodingException {
        // "dd" = day of month, "HH" = 24-hour clock ("DD"/"hh" would be day-of-year / 12-hour).
        String sendMsg = "\"Hello Flume\"" + "," + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");
        publish(sendMsg);

        // The same context configures both the channel and the source.
        Context context = new Context();
        context.put(NAME_SERVER_CONFIG, NAME_SERVER);
        context.put(TAG_CONFIG, tag);

        Channel channel = new MemoryChannel();
        Configurables.configure(channel, context);

        List<Channel> channels = new ArrayList<>();
        channels.add(channel);
        ChannelSelector channelSelector = new ReplicatingChannelSelector();
        channelSelector.setChannels(channels);

        RocketMQSource source = new RocketMQSource();
        source.setChannelProcessor(new ChannelProcessor(channelSelector));
        Configurables.configure(source, context);

        source.start();
        try {
            PollableSource.Status status = source.process();
            if (status == PollableSource.Status.BACKOFF) {
                fail("RocketMQSource.process() returned BACKOFF");
            }
            /*
            wait for processQueueTable init
            */
            Thread.sleep(1000);
        } finally {
            // Stop the source even when the assertion above fails.
            source.stop();
        }

        /*
        mock flume sink
        */
        Event event = takeEvent(channel);
        if (event == null) {
            fail("No event was available in the channel after RocketMQSource.process()");
        }
        String receiveMsg = new String(event.getBody(), CHARSET);
        log.info("receive message : {}", receiveMsg);
        assertEquals(sendMsg, receiveMsg);
    }

    /**
     * Sends {@code body} to {@code TOPIC_DEFAULT} with this test's tag using a short-lived producer.
     *
     * @param body message payload, encoded as UTF-8
     * @throws MQClientException wrapping any failure while starting the producer or sending
     */
    private void publish(String body) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        producer.setNamesrvAddr(NAME_SERVER);
        try {
            producer.start();
            Message msg = new Message(TOPIC_DEFAULT, tag, body.getBytes(CHARSET));
            SendResult sendResult = producer.send(msg);
            log.info("publish message : {}, sendResult:{}", body, sendResult);
        } catch (Exception e) {
            throw new MQClientException("Failed to publish messages", e);
        } finally {
            producer.shutdown();
        }
    }

    /**
     * Takes one event from {@code channel} inside its own transaction.
     *
     * <p>The transaction is committed after the take, rolled back if the take or commit throws
     * a runtime exception, and always closed.
     *
     * @param channel channel to take from
     * @return the event returned by {@link Channel#take()}, which may be {@code null}
     */
    private static Event takeEvent(Channel channel) {
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            Event event = channel.take();
            transaction.commit();
            return event;
        } catch (RuntimeException e) {
            transaction.rollback();
            throw e;
        } finally {
            transaction.close();
        }
    }

    /**
     * Shuts down the broker and then the name server, if they were created.
     */
    @AfterClass
    public static void stop() {
        try {
            if (brokerController != null) {
                brokerController.shutdown();
            }
        } finally {
            // Runs even if the broker shutdown throws.
            if (namesrvController != null) {
                namesrvController.shutdown();
            }
        }
    }
}