blob: ff33d09d5f95f34eb726dc7eb4276a6ba9e582a3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.stream.jms11;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
/**
* Test for {@link JmsStreamer}. Tests both queues and topics.
*
* @author Raul Kripalani
*/
public class IgniteJmsStreamerTest extends GridCommonAbstractTest {
/** */
private static final int CACHE_ENTRY_COUNT = 100;
/** */
private static final String QUEUE_NAME = "ignite.test.queue";
/** */
private static final String TOPIC_NAME = "ignite.test.topic";
/** */
private static final Map<String, String> TEST_DATA = new HashMap<>();
static {
for (int i = 1; i <= CACHE_ENTRY_COUNT; i++)
TEST_DATA.put(Integer.toString(i), "v" + i);
}
/** */
private BrokerService broker;
/** */
private ConnectionFactory connFactory;
/** Constructor. */
public IgniteJmsStreamerTest() {
super(true);
}
/**
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
@Override public void beforeTest() throws Exception {
grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(false);
broker.setPersistenceAdapter(null);
broker.setPersistenceFactory(null);
PolicyMap plcMap = new PolicyMap();
PolicyEntry plc = new PolicyEntry();
plc.setQueuePrefetch(1);
broker.setDestinationPolicy(plcMap);
broker.getDestinationPolicy().setDefaultEntry(plc);
broker.setSchedulerSupport(false);
broker.start(true);
connFactory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI());
}
/**
* @throws Exception Iff ailed.
*/
@Override public void afterTest() throws Exception {
grid().cache(DEFAULT_CACHE_NAME).clear();
broker.stop();
broker.deleteAllMessages();
}
/**
* @throws Exception If failed.
*/
@Test
public void testQueueFromName() throws Exception {
Destination dest = new ActiveMQQueue(QUEUE_NAME);
// produce messages into the queue
produceObjectMessages(dest, false);
try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) {
JmsStreamer<ObjectMessage, String, String> jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer);
jmsStreamer.setDestinationType(Queue.class);
jmsStreamer.setDestinationName(QUEUE_NAME);
// subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT
CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT);
jmsStreamer.start();
// all cache PUT events received in 10 seconds
latch.await(10, TimeUnit.SECONDS);
assertAllCacheEntriesLoaded();
jmsStreamer.stop();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTopicFromName() throws JMSException, InterruptedException {
Destination dest = new ActiveMQTopic(TOPIC_NAME);
// should not produced messages until subscribed to the topic; otherwise they will be missed because this is not
// a durable subscriber (for which a dedicated test exists)
try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) {
JmsStreamer<ObjectMessage, String, String> jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer);
jmsStreamer.setDestinationType(Topic.class);
jmsStreamer.setDestinationName(TOPIC_NAME);
// subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT
CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT);
jmsStreamer.start();
// produce messages
produceObjectMessages(dest, false);
// all cache PUT events received in 10 seconds
latch.await(10, TimeUnit.SECONDS);
assertAllCacheEntriesLoaded();
jmsStreamer.stop();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testQueueFromExplicitDestination() throws Exception {
Destination dest = new ActiveMQQueue(QUEUE_NAME);
// produce messages into the queue
produceObjectMessages(dest, false);
try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) {
JmsStreamer<ObjectMessage, String, String> jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer);
jmsStreamer.setDestination(dest);
// subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT
CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT);
// start the streamer
jmsStreamer.start();
// all cache PUT events received in 10 seconds
latch.await(10, TimeUnit.SECONDS);
assertAllCacheEntriesLoaded();
jmsStreamer.stop();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTopicFromExplicitDestination() throws JMSException, InterruptedException {
Destination dest = new ActiveMQTopic(TOPIC_NAME);
// should not produced messages until subscribed to the topic; otherwise they will be missed because this is not
// a durable subscriber (for which a dedicated test exists)
try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) {
JmsStreamer<ObjectMessage, String, String> jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer);
jmsStreamer.setDestination(dest);
// subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT
CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT);
jmsStreamer.start();
// produce messages
produceObjectMessages(dest, false);
// all cache PUT events received in 10 seconds
latch.await(10, TimeUnit.SECONDS);
assertAllCacheEntriesLoaded();
jmsStreamer.stop();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testInsertMultipleCacheEntriesFromOneMessage() throws Exception {
Destination dest = new ActiveMQQueue(QUEUE_NAME);
// produce A SINGLE MESSAGE, containing all data, into the queue
produceStringMessages(dest, true);
try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) {
JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer);
jmsStreamer.setDestination(dest);
// subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT
CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT);
jmsStreamer.start();
// all cache PUT events received in 10 seconds
latch.await(10, TimeUnit.SECONDS);
assertAllCacheEntriesLoaded();
jmsStreamer.stop();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testDurableSubscriberStartStopStart() throws Exception {
Destination dest = new ActiveMQTopic(TOPIC_NAME);
try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) {
JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer);
jmsStreamer.setDestination(dest);
jmsStreamer.setDurableSubscription(true);
jmsStreamer.setClientId(Long.toString(System.currentTimeMillis()));
jmsStreamer.setDurableSubscriptionName("ignite-test-durable");
// we start the streamer so that the durable subscriber registers itself
jmsStreamer.start();
// we stop it immediately
jmsStreamer.stop();
// we assert that there are no clients of the broker (to make sure we disconnected properly)
assertEquals(0, broker.getCurrentConnections());
// we send messages while we're still away
produceStringMessages(dest, false);
// subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT
CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT);
jmsStreamer.start();
// all cache PUT events received in 10 seconds
latch.await(10, TimeUnit.SECONDS);
assertAllCacheEntriesLoaded();
jmsStreamer.stop();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testQueueMessagesConsumedInBatchesCompletionSizeBased() throws Exception {
Destination dest = new ActiveMQQueue(QUEUE_NAME);
// produce multiple messages into the queue
produceStringMessages(dest, false);
try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) {
JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer);
jmsStreamer.setDestination(dest);
jmsStreamer.setBatched(true);
jmsStreamer.setBatchClosureSize(99);
// disable time-based session commits
jmsStreamer.setBatchClosureMillis(0);
// subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT
CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT);
jmsStreamer.start();
// all cache PUT events received in 10 seconds
latch.await(10, TimeUnit.SECONDS);
assertAllCacheEntriesLoaded();
// we expect all entries to be loaded, but still one (uncommitted) message should remain in the queue
// as observed by the broker
DestinationStatistics qStats = broker.getBroker().getDestinationMap().get(dest).getDestinationStatistics();
assertEquals(1, qStats.getMessages().getCount());
assertEquals(1, qStats.getInflight().getCount());
jmsStreamer.stop();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testQueueMessagesConsumedInBatchesCompletionTimeBased() throws Exception {
Destination dest = new ActiveMQQueue(QUEUE_NAME);
// produce multiple messages into the queue
produceStringMessages(dest, false);
try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) {
JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer);
jmsStreamer.setDestination(dest);
jmsStreamer.setBatched(true);
jmsStreamer.setBatchClosureMillis(2000);
// disable size-based session commits
jmsStreamer.setBatchClosureSize(0);
// subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT
CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT);
DestinationStatistics qStats = broker.getBroker().getDestinationMap().get(dest).getDestinationStatistics();
jmsStreamer.start();
// all messages are still inflight
assertEquals(CACHE_ENTRY_COUNT, qStats.getMessages().getCount());
assertEquals(0, qStats.getDequeues().getCount());
// wait a little bit
Thread.sleep(100);
// all messages are still inflight
assertEquals(CACHE_ENTRY_COUNT, qStats.getMessages().getCount());
assertEquals(0, qStats.getDequeues().getCount());
// now let the scheduler execute
Thread.sleep(2100);
// all messages are committed
assertEquals(0, qStats.getMessages().getCount());
assertEquals(CACHE_ENTRY_COUNT, qStats.getDequeues().getCount());
latch.await(5, TimeUnit.SECONDS);
assertAllCacheEntriesLoaded();
jmsStreamer.stop();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testGenerateNoEntries() throws Exception {
Destination dest = new ActiveMQQueue(QUEUE_NAME);
// produce multiple messages into the queue
produceStringMessages(dest, false);
try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) {
JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer);
// override the transformer with one that generates no cache entries
jmsStreamer.setTransformer(TestTransformers.generateNoEntries());
jmsStreamer.setDestination(dest);
// subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT
CountDownLatch latch = subscribeToPutEvents(1);
jmsStreamer.start();
// no cache PUT events were received in 3 seconds, i.e. CountDownLatch does not fire
assertFalse(latch.await(3, TimeUnit.SECONDS));
jmsStreamer.stop();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testTransactedSessionNoBatching() throws Exception {
Destination dest = new ActiveMQQueue(QUEUE_NAME);
// produce multiple messages into the queue
produceStringMessages(dest, false);
try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) {
JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer);
jmsStreamer.setTransacted(true);
jmsStreamer.setDestination(dest);
// subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT
CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT);
jmsStreamer.start();
// all cache PUT events received in 10 seconds
latch.await(10, TimeUnit.SECONDS);
assertAllCacheEntriesLoaded();
jmsStreamer.stop();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testQueueMultipleThreads() throws Exception {
Destination dest = new ActiveMQQueue(QUEUE_NAME);
// produce messages into the queue
produceObjectMessages(dest, false);
try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) {
JmsStreamer<ObjectMessage, String, String> jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer);
jmsStreamer.setDestination(dest);
jmsStreamer.setThreads(5);
// subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT
CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT);
// start the streamer
jmsStreamer.start();
DestinationStatistics qStats = broker.getBroker().getDestinationMap().get(dest).getDestinationStatistics();
assertEquals(5, qStats.getConsumers().getCount());
// all cache PUT events received in 10 seconds
latch.await(10, TimeUnit.SECONDS);
// assert that all consumers received messages - given that the prefetch is 1
for (Subscription subscription : broker.getBroker().getDestinationMap().get(dest).getConsumers())
assertTrue(subscription.getDequeueCounter() > 0);
assertAllCacheEntriesLoaded();
jmsStreamer.stop();
}
}
/**
* Test for ExceptionListener functionality.
*
* @throws Exception If fails.
*/
@Test
public void testExceptionListener() throws Exception {
// restart broker with auth plugin
if (broker.isStarted())
broker.stop();
broker.waitUntilStopped();
broker.setPlugins(new BrokerPlugin[]{new SimpleAuthenticationPlugin(new ArrayList())});
broker.start(true);
connFactory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI());
final List<Throwable> lsnrExceptions = new LinkedList<>();
final CountDownLatch latch = new CountDownLatch(1);
Destination dest = new ActiveMQQueue(QUEUE_NAME);
try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) {
JmsStreamer<ObjectMessage, String, String> jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer);
jmsStreamer.setExceptionListener(new ExceptionListener() {
@Override public void onException(JMSException e) {
System.out.println("ERROR");
lsnrExceptions.add(e);
latch.countDown();
}
});
jmsStreamer.setDestination(dest);
GridTestUtils.assertThrowsWithCause(new Callable<Void>() {
@Override public Void call() throws Exception {
jmsStreamer.start();
return null;
}
}, SecurityException.class);
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(lsnrExceptions.size() > 0);
GridTestUtils.assertThrowsWithCause(new Callable<Void>() {
@Override public Void call() throws Exception {
jmsStreamer.stop();
return null;
}
}, IgniteException.class);
}
}
/**
*
*/
private void assertAllCacheEntriesLoaded() {
// Get the cache and check that the entries are present
IgniteCache<String, String> cache = grid().cache(DEFAULT_CACHE_NAME);
for (Map.Entry<String, String> entry : TEST_DATA.entrySet())
assertEquals(entry.getValue(), cache.get(entry.getKey()));
}
@SuppressWarnings("unchecked")
private <T extends Message> JmsStreamer<T, String, String> newJmsStreamer(Class<T> type,
IgniteDataStreamer<String, String> dataStreamer) {
JmsStreamer<T, String, String> jmsStreamer = new JmsStreamer<>();
jmsStreamer.setIgnite(grid());
jmsStreamer.setStreamer(dataStreamer);
jmsStreamer.setConnectionFactory(connFactory);
if (type == ObjectMessage.class)
jmsStreamer.setTransformer((MessageTransformer<T, String, String>) TestTransformers.forObjectMessage());
else
jmsStreamer.setTransformer((MessageTransformer<T, String, String>) TestTransformers.forTextMessage());
dataStreamer.allowOverwrite(true);
dataStreamer.autoFlushFrequency(10);
return jmsStreamer;
}
/**
* @param expect Expected events number.
* @return Event receive latch.
*/
private CountDownLatch subscribeToPutEvents(int expect) {
Ignite ignite = grid();
// Listen to cache PUT events and expect as many as messages as test data items
final CountDownLatch latch = new CountDownLatch(expect);
@SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> cb = new IgniteBiPredicate<UUID, CacheEvent>() {
@Override public boolean apply(UUID uuid, CacheEvent evt) {
latch.countDown();
return true;
}
};
ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).remoteListen(cb, null, EVT_CACHE_OBJECT_PUT);
return latch;
}
private void produceObjectMessages(Destination dest, boolean singleMsg) throws JMSException {
Session ses = connFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer mp = ses.createProducer(dest);
HashSet<TestTransformers.TestObject> set = new HashSet<>();
for (String key : TEST_DATA.keySet()) {
TestTransformers.TestObject to = new TestTransformers.TestObject(key, TEST_DATA.get(key));
set.add(to);
}
int messagesSent;
if (singleMsg) {
mp.send(ses.createObjectMessage(set));
messagesSent = 1;
}
else {
for (TestTransformers.TestObject to : set)
mp.send(ses.createObjectMessage(to));
messagesSent = set.size();
}
if (dest instanceof Queue) {
try {
assertEquals(messagesSent, broker.getBroker().getDestinationMap().get(dest)
.getDestinationStatistics().getMessages().getCount());
}
catch (Exception e) {
fail(e.toString());
}
}
}
private void produceStringMessages(Destination dest, boolean singleMsg) throws JMSException {
Session ses = connFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer mp = ses.createProducer(dest);
HashSet<String> set = new HashSet<>();
for (String key : TEST_DATA.keySet())
set.add(key + "," + TEST_DATA.get(key));
int messagesSent;
if (singleMsg) {
StringBuilder sb = new StringBuilder();
for (String s : set)
sb.append(s).append("|");
sb.deleteCharAt(sb.length() - 1);
mp.send(ses.createTextMessage(sb.toString()));
messagesSent = 1;
}
else {
for (String s : set)
mp.send(ses.createTextMessage(s));
messagesSent = set.size();
}
if (dest instanceof Queue) {
try {
assertEquals(messagesSent, broker.getBroker().getDestinationMap().get(dest)
.getDestinationStatistics().getMessages().getCount());
}
catch (Exception e) {
fail(e.toString());
}
}
}
}