blob: 534f8d4d87fa6cdad39d592819eeb59415cb3014 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.stream.rocketmq;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.stream.StreamMultipleTupleExtractor;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.junit.Test;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
import static org.apache.ignite.stream.rocketmq.TestRocketMQServer.NAME_SERVER_PORT;
import static org.apache.ignite.stream.rocketmq.TestRocketMQServer.TEST_IP;
/**
* Test for {@link RocketMQStreamer}.
*/
public class RocketMQStreamerTest extends GridCommonAbstractTest {
/** Test topic. */
private static final String TOPIC_NAME = "testTopic";
/** Test consumer group. */
private static final String CONSUMER_GRP = "testConsumerGrp";
/** Test server. */
private static TestRocketMQServer testRocketMQServer;
/** Number of events to handle. */
private static final int EVT_NUM = 1000;
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected void beforeTest() throws Exception {
grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
grid().cache(DEFAULT_CACHE_NAME).clear();
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
testRocketMQServer = new TestRocketMQServer(log);
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
if (testRocketMQServer != null)
testRocketMQServer.shutdown();
}
/** Constructor. */
public RocketMQStreamerTest() {
super(true);
}
/**
* Tests data is properly injected into the grid.
*
* @throws Exception If fails.
*/
@Test
public void testStreamer() throws Exception {
RocketMQStreamer<String, byte[]> streamer = null;
Ignite ignite = grid();
try (IgniteDataStreamer<String, byte[]> dataStreamer = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
dataStreamer.allowOverwrite(true);
dataStreamer.autoFlushFrequency(10);
streamer = new RocketMQStreamer<>();
//configure.
streamer.setIgnite(ignite);
streamer.setStreamer(dataStreamer);
streamer.setNameSrvAddr(TEST_IP + ":" + NAME_SERVER_PORT);
streamer.setConsumerGrp(CONSUMER_GRP);
streamer.setTopic(TOPIC_NAME);
streamer.setMultipleTupleExtractor(new TestTupleExtractor());
streamer.start();
IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME);
assertEquals(0, cache.size(CachePeekMode.PRIMARY));
final CountDownLatch latch = new CountDownLatch(EVT_NUM);
IgniteBiPredicate<UUID, CacheEvent> putLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
@Override public boolean apply(UUID uuid, CacheEvent evt) {
assert evt != null;
latch.countDown();
return true;
}
};
ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).remoteListen(putLsnr, null, EVT_CACHE_OBJECT_PUT);
produceData();
latch.await(30, TimeUnit.SECONDS);
assertEquals(EVT_NUM, cache.size(CachePeekMode.PRIMARY));
}
finally {
if (streamer != null)
streamer.stop();
}
}
/**
* Test tuple extractor.
*/
public static class TestTupleExtractor implements StreamMultipleTupleExtractor<List<MessageExt>, String, byte[]> {
/** {@inheritDoc} */
@Override public Map<String, byte[]> extract(List<MessageExt> msgs) {
final Map<String, byte[]> map = new HashMap<>();
for (MessageExt msg : msgs)
map.put(msg.getMsgId(), msg.getBody());
return map;
}
}
/**
* Adds data to RocketMQ.
*
* @throws Exception If fails.
*/
private void produceData() throws Exception {
initTopic(TOPIC_NAME, TEST_IP + ":" + NAME_SERVER_PORT);
DefaultMQProducer producer = new DefaultMQProducer("testProducerGrp");
producer.setNamesrvAddr(TEST_IP + ":" + NAME_SERVER_PORT);
try {
producer.start();
for (int i = 0; i < EVT_NUM; i++)
producer.send(new Message(TOPIC_NAME, "", String.valueOf(i).getBytes("UTF-8")));
}
catch (Exception e) {
throw new Exception(e);
}
finally {
producer.shutdown();
}
}
/**
* Initializes RocketMQ topic.
*
* @param topic Topic.
* @param nsAddr Nameserver address.
* @throws IgniteInterruptedCheckedException If fails.
*/
private void initTopic(String topic, String nsAddr) throws Exception {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExt.setNamesrvAddr(nsAddr);
try {
defaultMQAdminExt.start();
TopicConfig topicConfig = new TopicConfig();
topicConfig.setTopicName(topic);
topicConfig.setReadQueueNums(4);
topicConfig.setWriteQueueNums(4);
defaultMQAdminExt.createAndUpdateTopicConfig(testRocketMQServer.getBrokerAddr(), topicConfig);
U.sleep(100);
}
finally {
defaultMQAdminExt.shutdown();
}
}
}