blob: 1b8dae0dfef1772f0cd18fb27bfc139e17520499 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.io.jms;
import java.io.File;
import javax.jms.JMSException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.apache.commons.io.FileUtils;
import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.jms.JMSOutputOperatorTest.JMSStringSinglePortOutputOperator;
import com.datatorrent.lib.util.ActiveMQMultiTypeMessageListener;
import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
/**
* Base testing class for testing transactionable store implementations.
*/
public class JMSTransactionableStoreTestBase extends JMSTestBase
{
public static final String SUBJECT = "TEST.FOO";
public static final String CLIENT_ID = "Client1";
public static final String APP_ID = "appId";
public static final int OPERATOR_ID = 1;
public static final int OPERATOR_2_ID = 2;
public static Class<? extends JMSBaseTransactionableStore> storeClass;
public static OperatorContext testOperatorContext;
public static OperatorContext testOperator2Context;
public static class TestMeta extends TestWatcher
{
@Override
protected void starting(org.junit.runner.Description description)
{
//Create fresh operator context
DefaultAttributeMap attributes = new DefaultAttributeMap();
attributes.put(DAG.APPLICATION_ID, APP_ID);
testOperatorContext = mockOperatorContext(OPERATOR_ID, attributes);
testOperator2Context = mockOperatorContext(OPERATOR_2_ID, attributes);
FileUtils.deleteQuietly(new File(FSPsuedoTransactionableStore.DEFAULT_RECOVERY_DIRECTORY));
}
@Override
protected void finished(org.junit.runner.Description description)
{
FileUtils.deleteQuietly(new File(FSPsuedoTransactionableStore.DEFAULT_RECOVERY_DIRECTORY));
}
}
@Rule
public TestMeta testMeta = new TestMeta();
public JMSTransactionableStoreTestBase(Class<? extends JMSBaseTransactionableStore> storeClass)
{
JMSTransactionableStoreTestBase.storeClass = storeClass;
}
/**
* This is a helper method to create the output operator. Note this cannot be
* put in test watcher because because JMS connection issues occur when this code
* is run from a test watcher.
*
* @param metaQueueName metaQueueName to set in JMSTransactionableStore
*/
private JMSStringSinglePortOutputOperator createOperator(OperatorContext context, String metaQueueName)
{
JMSStringSinglePortOutputOperator outputOperator = new JMSStringSinglePortOutputOperator();
JMSBaseTransactionableStore store;
try {
store = storeClass.newInstance();
} catch (InstantiationException | IllegalAccessException ex) {
throw new RuntimeException(ex);
}
if (JMSTransactionableStore.class.equals(storeClass) && metaQueueName != null) {
((JMSTransactionableStore)store).setMetaQueueName(metaQueueName);
}
outputOperator.getConnectionFactoryProperties().put("userName", "");
outputOperator.getConnectionFactoryProperties().put("password", "");
outputOperator.getConnectionFactoryProperties().put("brokerURL", "tcp://localhost:61617");
outputOperator.setAckMode("CLIENT_ACKNOWLEDGE");
outputOperator.setClientId(CLIENT_ID);
outputOperator.setSubject("TEST.FOO");
outputOperator.setMessageSize(255);
outputOperator.setBatch(1);
outputOperator.setTopic(false);
outputOperator.setDurable(false);
outputOperator.setStore(store);
outputOperator.setVerbose(true);
outputOperator.setup(context);
return outputOperator;
}
@Test
public void connectedTest()
{
JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
JMSBaseTransactionableStore store = jmsOutputOperator.getStore();
Assert.assertTrue("Should be connected.", store.isConnected());
jmsOutputOperator.teardown();
Assert.assertFalse("Should not be connected.", store.isConnected());
}
@Test
public void transactionTest()
{
JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
JMSBaseTransactionableStore store = jmsOutputOperator.getStore();
Assert.assertFalse("Should not be in transaction.", store.isInTransaction());
store.beginTransaction();
Assert.assertTrue("Should be in transaction.", store.isInTransaction());
store.commitTransaction();
Assert.assertFalse("Should not be in transaction.", store.isInTransaction());
jmsOutputOperator.teardown();
}
@Test
public void storeRetreiveTransactionTest()
{
JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
JMSBaseTransactionableStore store = jmsOutputOperator.getStore();
jmsOutputOperator.beginWindow(0L);
jmsOutputOperator.endWindow();
long windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
Assert.assertEquals(0L, windowId);
jmsOutputOperator.teardown();
}
/**
* Creates two operators with different operatorId and same appId to test correct functionality of storing and
* retrieving windowId with message selector
*/
@Test
public void twoOperatorsStoreRetrieveWithMessageSelectorTransactionTest()
{
JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
jmsOutputOperator.beginWindow(0L);
jmsOutputOperator.endWindow();
//Create fresh operator context
JMSStringSinglePortOutputOperator jmsOutputOperator2 = createOperator(testOperator2Context, null);
jmsOutputOperator2.beginWindow(1L);
jmsOutputOperator2.endWindow();
long windowIdOp = jmsOutputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID);
Assert.assertEquals(0L, windowIdOp);
long windowIdOp2 = jmsOutputOperator2.getStore().getCommittedWindowId(APP_ID, OPERATOR_2_ID);
Assert.assertEquals(1L, windowIdOp2);
jmsOutputOperator.teardown();
jmsOutputOperator2.teardown();
}
/**
* Similar to the test above with using a custom metaQueueName
*/
@Test
public void twoOperatorsStoreRetrieveWithMessageSelectorTransactionTestWithCustomMetaQueueName()
{
JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, "metaQ1");
jmsOutputOperator.beginWindow(0L);
jmsOutputOperator.endWindow();
//Create fresh operator context
JMSStringSinglePortOutputOperator jmsOutputOperator2 = createOperator(testOperator2Context, "metaQ2");
jmsOutputOperator2.beginWindow(1L);
jmsOutputOperator2.endWindow();
long windowIdOp = jmsOutputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID);
Assert.assertEquals(0L, windowIdOp);
long windowIdOp2 = jmsOutputOperator2.getStore().getCommittedWindowId(APP_ID, OPERATOR_2_ID);
Assert.assertEquals(1L, windowIdOp2);
jmsOutputOperator.teardown();
jmsOutputOperator2.teardown();
}
@Test
public void multiWindowTransactionTest()
{
JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
JMSBaseTransactionableStore store = jmsOutputOperator.getStore();
long windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
Assert.assertEquals(-1L, windowId);
jmsOutputOperator.beginWindow(0L);
jmsOutputOperator.endWindow();
windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
Assert.assertEquals(0L, windowId);
jmsOutputOperator.beginWindow(1L);
jmsOutputOperator.endWindow();
windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
Assert.assertEquals(1L, windowId);
jmsOutputOperator.beginWindow(2L);
jmsOutputOperator.endWindow();
windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
Assert.assertEquals(2L, windowId);
jmsOutputOperator.beginWindow(3L);
jmsOutputOperator.endWindow();
windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
Assert.assertEquals(3L, windowId);
jmsOutputOperator.beginWindow(4L);
jmsOutputOperator.endWindow();
jmsOutputOperator.teardown();
}
@Test
public void commitTest() throws JMSException, InterruptedException
{
final ActiveMQMultiTypeMessageListener listener = new ActiveMQMultiTypeMessageListener();
listener.setSubject(SUBJECT);
listener.setupConnection();
listener.run();
JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
JMSBaseTransactionableStore store = jmsOutputOperator.getStore();
store.beginTransaction();
jmsOutputOperator.inputPort.put("a");
Thread.sleep(500);
Assert.assertEquals(0, listener.receivedData.size());
store.commitTransaction();
Thread.sleep(500);
Assert.assertEquals(1, listener.receivedData.size());
jmsOutputOperator.teardown();
listener.closeConnection();
}
}