blob: 59bca13f6340374c1470c1e897bf023c78eb1793 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.jms;
import org.apache.sling.amq.ActiveMQConnectionFactoryService;
import org.apache.sling.amq.ActiveMQConnectionFactoryServiceTest;
import org.apache.sling.jms.impl.JMSTopicManager;
import org.apache.sling.mom.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.*;
/**
*/
public class JMSTopicManagerTest {
private static final long MESSAGE_LATENCY = 1000;
private static final Logger LOGGER = LoggerFactory.getLogger(JMSTopicManagerTest.class);
private JMSTopicManager jsmTopicManager;
private ActiveMQConnectionFactoryService amqConnectionFactoryService;
private Map<String, Object> testMap;
private boolean passed;
private long lastSent;
@Mock
private ServiceReference<Subscriber> serviceReference;
@Mock
private Bundle bundle;
@Mock
private BundleContext bundleContext;
private Map<String, Object> serviceProperties = new HashMap<String, Object>();
public JMSTopicManagerTest() {
MockitoAnnotations.initMocks(this);
}
@Before
public void before() throws NoSuchFieldException, IllegalAccessException, JMSException {
Mockito.when(serviceReference.getBundle()).thenReturn(bundle);
Mockito.when(bundle.getBundleContext()).thenReturn(bundleContext);
Mockito.when(serviceReference.getPropertyKeys()).thenAnswer(new Answer<String[]>() {
@Override
public String[] answer(InvocationOnMock invocationOnMock) throws Throwable {
return (String[]) serviceProperties.keySet().toArray(new String[serviceProperties.size()]);
}
});
Mockito.when(serviceReference.getProperty(Mockito.anyString())).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
return serviceProperties.get(invocationOnMock.getArguments()[0]);
}
});
amqConnectionFactoryService = ActiveMQConnectionFactoryServiceTest.activate(null);
jsmTopicManager = JMSTopicManagerTest.activate(amqConnectionFactoryService);
testMap = JsonTest.createTestMap();
passed = false;
}
public static JMSTopicManager activate(ActiveMQConnectionFactoryService amqConnectionFactoryService) throws NoSuchFieldException, IllegalAccessException, JMSException {
JMSTopicManager jsmTopicManager = new JMSTopicManager();
setPrivate(jsmTopicManager, "connectionFactoryService", amqConnectionFactoryService);
jsmTopicManager.activate(new HashMap<String, Object>());
return jsmTopicManager;
}
private static void setPrivate(Object object, String name, Object value) throws NoSuchFieldException, IllegalAccessException {
Field field = object.getClass().getDeclaredField(name);
if ( !field.isAccessible()) {
field.setAccessible(true);
}
field.set(object, value);
}
@After
public void after() throws JMSException {
JMSTopicManagerTest.deactivate(jsmTopicManager);
ActiveMQConnectionFactoryServiceTest.deactivate(amqConnectionFactoryService);
}
public static void deactivate(JMSTopicManager jsmTopicManager) throws JMSException {
jsmTopicManager.deactivate(new HashMap<String, Object>());
}
/**
* Test a working publish operation, read the message and check all ok. Will try and read the message for 1s. Normally messages
* arrive within 15ms.
* @throws Exception
*/
@Test
public void testPublish() throws Exception {
// make the test map unique.
testMap.put("testing", "testPublish" + System.currentTimeMillis());
addSubscriber(new String[]{"testtopic"}, true);
jsmTopicManager.publish(Types.topicName("testtopic"), Types.commandName("testcommand"), testMap);
lastSent = System.currentTimeMillis();
assertTrue(waitForPassed(MESSAGE_LATENCY));
removeSubscriber();
}
private void addSubscriber(String[] topics, boolean match) {
Subscriber subscriber = new TestingSubscriber(this, match, topics);
serviceProperties.clear();
serviceProperties.put(Subscriber.TOPIC_NAMES_PROP, topics);
Mockito.when(bundleContext.getService(Mockito.eq(serviceReference))).thenReturn(subscriber);
jsmTopicManager.addSubscriber(serviceReference);
}
private void removeSubscriber() {
jsmTopicManager.removeSubscriber(serviceReference);
}
/**
* Test that a message sent with the wrong topic doesn't arrive, filtered by the topic inside the jmsTopicManager.
* @throws Exception
*/
@Test
public void testFilterdByTopic() throws Exception {
// make the test map unique.
testMap.put("testing", "testFilterdByTopic" + System.currentTimeMillis());
addSubscriber(new String[]{"testtopic"}, false);
lastSent = System.currentTimeMillis();
assertFalse(waitForPassed(MESSAGE_LATENCY)); // not expecting a message at all
removeSubscriber();
}
/**
* Check that a message sent to the correct topic is filtered by the MessageFilter.
* The test waits 1s for the message to arrive. If testPublish does not fail, message
* latency is < 1s.
* @throws Exception
*/
@Test
public void testFilterdByFilter() throws Exception {
// make the test map unique.
testMap.put("testing", "testFilterdByFilter" + System.currentTimeMillis());
addSubscriber(new String[]{"testtopic"}, false);
jsmTopicManager.publish(Types.topicName("testtopic"), Types.commandName("testcommand"), testMap);
lastSent = System.currentTimeMillis();
assertFalse(waitForPassed(MESSAGE_LATENCY)); // not expecting a message at all
removeSubscriber();
}
private boolean waitForPassed(long t) {
long end = System.currentTimeMillis() + t;
while(System.currentTimeMillis() < end) {
if (passed) {
return true;
} else {
Thread.yield();
}
}
LOGGER.info("Message not recieved after "+t+" ms");
return false;
}
private static class TestingSubscriber implements Subscriber, MessageFilter {
private JMSTopicManagerTest test;
private final boolean accept;
private final Set<Types.Name> topicnames;
public TestingSubscriber(JMSTopicManagerTest test, boolean accept, String[] topicname) {
this.test = test;
this.accept = accept;
this.topicnames = new HashSet<Types.Name>();
for(String t : topicname) {
topicnames.add(Types.topicName(t));
}
}
@Override
public void onMessage(Types.TopicName topic, Map<String, Object> message) {
LOGGER.info("Got message in "+(System.currentTimeMillis()-test.lastSent)+" ms");
JsonTest.checkEquals(test.testMap, message);
test.passed = true;
}
@Override
public boolean accept(Types.Name name, Map<String, Object> mapMessage) {
return topicnames.contains(name) == accept;
}
}
}