| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.stream.rocketmq; |
| |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteDataStreamer; |
| import org.apache.ignite.cache.CachePeekMode; |
| import org.apache.ignite.events.CacheEvent; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiPredicate; |
| import org.apache.ignite.stream.StreamMultipleTupleExtractor; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.apache.rocketmq.client.producer.DefaultMQProducer; |
| import org.apache.rocketmq.common.TopicConfig; |
| import org.apache.rocketmq.common.message.Message; |
| import org.apache.rocketmq.common.message.MessageExt; |
| import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; |
| import static org.apache.ignite.stream.rocketmq.TestRocketMQServer.NAME_SERVER_PORT; |
| import static org.apache.ignite.stream.rocketmq.TestRocketMQServer.TEST_IP; |
| |
| /** |
| * Test for {@link RocketMQStreamer}. |
| */ |
| public class RocketMQStreamerTest extends GridCommonAbstractTest { |
| /** Test topic. */ |
| private static final String TOPIC_NAME = "testTopic"; |
| |
| /** Test consumer group. */ |
| private static final String CONSUMER_GRP = "testConsumerGrp"; |
| |
| /** Test server. */ |
| private static TestRocketMQServer testRocketMQServer; |
| |
| /** Number of events to handle. */ |
| private static final int EVT_NUM = 1000; |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override protected void beforeTest() throws Exception { |
| grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| grid().cache(DEFAULT_CACHE_NAME).clear(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTestsStarted() throws Exception { |
| testRocketMQServer = new TestRocketMQServer(log); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTestsStopped() throws Exception { |
| if (testRocketMQServer != null) |
| testRocketMQServer.shutdown(); |
| } |
| |
| /** Constructor. */ |
| public RocketMQStreamerTest() { |
| super(true); |
| } |
| |
| /** |
| * Tests data is properly injected into the grid. |
| * |
| * @throws Exception If fails. |
| */ |
| @Test |
| public void testStreamer() throws Exception { |
| RocketMQStreamer<String, byte[]> streamer = null; |
| |
| Ignite ignite = grid(); |
| |
| try (IgniteDataStreamer<String, byte[]> dataStreamer = ignite.dataStreamer(DEFAULT_CACHE_NAME)) { |
| dataStreamer.allowOverwrite(true); |
| dataStreamer.autoFlushFrequency(10); |
| |
| streamer = new RocketMQStreamer<>(); |
| |
| //configure. |
| streamer.setIgnite(ignite); |
| streamer.setStreamer(dataStreamer); |
| streamer.setNameSrvAddr(TEST_IP + ":" + NAME_SERVER_PORT); |
| streamer.setConsumerGrp(CONSUMER_GRP); |
| streamer.setTopic(TOPIC_NAME); |
| streamer.setMultipleTupleExtractor(new TestTupleExtractor()); |
| |
| streamer.start(); |
| |
| IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME); |
| |
| assertEquals(0, cache.size(CachePeekMode.PRIMARY)); |
| |
| final CountDownLatch latch = new CountDownLatch(EVT_NUM); |
| |
| IgniteBiPredicate<UUID, CacheEvent> putLsnr = new IgniteBiPredicate<UUID, CacheEvent>() { |
| @Override public boolean apply(UUID uuid, CacheEvent evt) { |
| assert evt != null; |
| |
| latch.countDown(); |
| |
| return true; |
| } |
| }; |
| |
| ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).remoteListen(putLsnr, null, EVT_CACHE_OBJECT_PUT); |
| |
| produceData(); |
| |
| latch.await(30, TimeUnit.SECONDS); |
| |
| assertEquals(EVT_NUM, cache.size(CachePeekMode.PRIMARY)); |
| } |
| finally { |
| if (streamer != null) |
| streamer.stop(); |
| } |
| } |
| |
| /** |
| * Test tuple extractor. |
| */ |
| public static class TestTupleExtractor implements StreamMultipleTupleExtractor<List<MessageExt>, String, byte[]> { |
| |
| /** {@inheritDoc} */ |
| @Override public Map<String, byte[]> extract(List<MessageExt> msgs) { |
| final Map<String, byte[]> map = new HashMap<>(); |
| |
| for (MessageExt msg : msgs) |
| map.put(msg.getMsgId(), msg.getBody()); |
| |
| return map; |
| } |
| } |
| |
| /** |
| * Adds data to RocketMQ. |
| * |
| * @throws Exception If fails. |
| */ |
| private void produceData() throws Exception { |
| initTopic(TOPIC_NAME, TEST_IP + ":" + NAME_SERVER_PORT); |
| |
| DefaultMQProducer producer = new DefaultMQProducer("testProducerGrp"); |
| |
| producer.setNamesrvAddr(TEST_IP + ":" + NAME_SERVER_PORT); |
| |
| try { |
| producer.start(); |
| |
| for (int i = 0; i < EVT_NUM; i++) |
| producer.send(new Message(TOPIC_NAME, "", String.valueOf(i).getBytes("UTF-8"))); |
| } |
| catch (Exception e) { |
| throw new Exception(e); |
| } |
| finally { |
| producer.shutdown(); |
| } |
| } |
| |
| /** |
| * Initializes RocketMQ topic. |
| * |
| * @param topic Topic. |
| * @param nsAddr Nameserver address. |
| * @throws IgniteInterruptedCheckedException If fails. |
| */ |
| private void initTopic(String topic, String nsAddr) throws Exception { |
| DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(); |
| defaultMQAdminExt.setNamesrvAddr(nsAddr); |
| try { |
| defaultMQAdminExt.start(); |
| |
| TopicConfig topicConfig = new TopicConfig(); |
| topicConfig.setTopicName(topic); |
| topicConfig.setReadQueueNums(4); |
| topicConfig.setWriteQueueNums(4); |
| |
| defaultMQAdminExt.createAndUpdateTopicConfig(testRocketMQServer.getBrokerAddr(), topicConfig); |
| |
| U.sleep(100); |
| } |
| finally { |
| defaultMQAdminExt.shutdown(); |
| } |
| } |
| } |