// blob: daff612680466a93092c41fa6c93c0015bd9e1e0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.io.jms;
import java.io.File;
import javax.jms.ConnectionFactory;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.AssumptionViolatedException;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.testbench.CollectorTestSink;
import org.apache.commons.io.FileUtils;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.auth.PropertiesFileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import static org.apache.apex.malhar.lib.helper.OperatorContextTestHelper.mockOperatorContext;
/**
* Tests for {@link JMSStringInputOperator} for AMZ SQS.
* Note: for SQS we should use AckMode as "AUTO_ACKNOWLEDGE" and
* no transacted mode (transacted = false).
*
* Note: check the comment for com.amazon.sqs.javamessaging.SQSMessageConsumer.close()
* specifically: "Since consumer prefetch threads use SQS long-poll feature with 20 seconds
* timeout, closing each consumer prefetch thread can take up to 20 seconds,
* which in-turn will impact the time on consumer close."
*
* Because of the above this test takes a long time due to consumer.close() in
* org.apache.apex.malhar.lib.io.jms.AbstractJMSInputOperator.cleanup()
*
* NOTE: tests are automatically skipped if the secret key in sqstestCreds.properties
* is missing or blank.
*
* NOTE: each test creates its own uniquely named queue in SQS and then deletes it afterwards.
* Also we try to scrub any leftover queues from the previous runs just in case tests were
* aborted (check org.apache.apex.malhar.lib.io.jms.SQSTestBase.generateCurrentQueueName(String))
*
*/
public class SQSStringInputOperatorTest
{
  /**
   * JUnit rule that prepares a {@link JMSStringInputOperator} wired to a per-test SQS queue
   * before each test and releases the operator, the local application directory and the
   * test base's resources afterwards.
   *
   * When {@code SQSTestBase.validateTestCreds()} reports {@code false} nothing beyond
   * {@link #testBase} is initialised; each test calls {@code validateAssumption()} first,
   * which (per the class-level notes) is expected to skip the test in that situation.
   */
  public static class TestMeta extends TestWatcher
  {
    String baseDir;
    JMSStringInputOperator operator;
    CollectorTestSink<Object> sink;
    Context.OperatorContext context;
    SQSTestBase testBase;

    /**
     * Creates the test base, lets it prepare the queue for this test method, then builds,
     * sets up and activates the operator with a collecting sink attached to its output.
     *
     * @param description JUnit description of the test about to run; its class and method
     *                    names are used for the queue name and the application path
     */
    @Override
    protected void starting(Description description)
    {
      final String methodName = description.getMethodName();
      final String className = description.getClassName();

      testBase = new SQSTestBase();
      if (!testBase.validateTestCreds()) {
        // No usable credentials: leave operator/sink/context unset, finished() checks for that.
        return;
      }
      testBase.generateCurrentQueueName(methodName);
      try {
        testBase.beforTest();
      } catch (AssumptionViolatedException ave) {
        // Must reach JUnit unwrapped so the test is reported as skipped rather than failed.
        throw ave;
      } catch (Exception e) {
        throw new RuntimeException(e);
      }

      baseDir = "target/" + className + "/" + methodName;
      Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
      attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
      attributeMap.put(Context.DAGContext.APPLICATION_PATH, baseDir);
      context = mockOperatorContext(1, attributeMap);

      operator = new JMSStringInputOperator();
      operator.setConnectionFactoryBuilder(new JMSBase.ConnectionFactoryBuilder()
      {
        @Override
        public ConnectionFactory buildConnectionFactory()
        {
          // Create the connection factory using credentials read from the properties file
          // supplied by the test base.
          // Connections this factory creates can talk to the queues in us-east-1 region.
          return SQSConnectionFactory.builder()
              .withRegion(Region.getRegion(Regions.US_EAST_1))
              .withAWSCredentialsProvider(new PropertiesFileCredentialsProvider(testBase.getDevCredsFilePath()))
              .build();
        }

        @Override
        public String toString()
        {
          return className + "/" + methodName + "/ConnectionFactoryBuilder";
        }
      });
      operator.setSubject(testBase.getCurrentQueueName());
      // for SQS ack mode should be "AUTO_ACKNOWLEDGE" and transacted = false
      operator.setAckMode("AUTO_ACKNOWLEDGE");
      operator.setTransacted(false);

      sink = new CollectorTestSink<>();
      operator.output.setSink(sink);
      operator.setup(context);
      operator.activate(context);
    }

    /**
     * Shuts the operator down and releases everything {@link #starting(Description)} created.
     * Cleanup of the local directory and of the test base is attempted even when shutting
     * the operator down throws, so a failing test does not leave resources behind.
     *
     * @param description JUnit description of the test that just ran
     */
    @Override
    protected void finished(Description description)
    {
      if (operator == null) {
        // starting() only returns without creating the operator when credentials are unusable.
        Assert.assertFalse("operator was not created although SQS test credentials are valid",
            testBase.validateTestCreds());
        return;
      }
      try {
        operator.deactivate();
        operator.teardown();
      } finally {
        cleanupTestResources(description);
      }
    }

    /**
     * Deletes the per-class directory under {@code target/} and invokes
     * {@code SQSTestBase.afterTest()}; the latter runs even if deleting the directory fails.
     */
    private void cleanupTestResources(Description description)
    {
      try {
        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
      } catch (Exception e) {
        throw new RuntimeException(e);
      } finally {
        try {
          testBase.afterTest();
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    }
  }

  @Rule
  public TestMeta testMeta = new TestMeta();

  /**
   * Basic string input test: produces 10 messages and expects the operator to emit
   * all of them from a single {@code emitTuples()} call.
   *
   * @throws Exception if producing messages fails or the wait is interrupted
   */
  @Test
  public void testStringMsgInput() throws Exception
  {
    testMeta.testBase.validateAssumption();
    testMeta.testBase.produceMsg("testStringMsgInput", 10, false);
    // give the asynchronous consumer time to receive the messages
    Thread.sleep(1000);
    testMeta.operator.emitTuples();
    Assert.assertEquals("num of messages", 10, testMeta.sink.collectedTuples.size());
  }

  /**
   * Emits 25 unique messages in window 1, then sets up and activates the operator again
   * (simulating a failure and re-deployment) and verifies that replaying window 1 emits
   * the same 25 tuples, spot-checking positions 4 and 17.
   *
   * @throws Exception if producing messages fails or the wait is interrupted
   */
  @Test
  public void testRecoveryAndIdempotency() throws Exception
  {
    testMeta.testBase.validateAssumption();
    testMeta.testBase.produceUniqueMsgs("testRecoveryAndIdempotency", 25, false);
    Thread.sleep(3000);
    testMeta.operator.beginWindow(1);
    testMeta.operator.emitTuples();
    testMeta.operator.endWindow();
    Assert.assertEquals("num of messages in window 1 pre-failure", 25, testMeta.sink.collectedTuples.size());

    // for some reason AMZ SQS doesn't preserve the order producer->consumer or we might have a race
    // condition on our producer side, so we can't be sure that get(4) will return "4:..."
    // In any case we will only check message matching between pre-failure
    // and post-failure cases for 4th and 17th message
    final String message4 = (String)testMeta.sink.collectedTuples.get(4);
    final String message17 = (String)testMeta.sink.collectedTuples.get(17);

    //failure and then re-deployment of operator
    testMeta.sink.collectedTuples.clear();
    testMeta.operator.setup(testMeta.context);
    testMeta.operator.activate(testMeta.context);
    Assert.assertEquals("largest recovery window", 1,
        testMeta.operator.getWindowDataManager().getLargestCompletedWindow());

    // replay of window 1: no emitTuples() call, the tuples come from the recovered state
    testMeta.operator.beginWindow(1);
    testMeta.operator.endWindow();
    Assert.assertEquals("num of messages in window 1", 25, testMeta.sink.collectedTuples.size());
    Assert.assertEquals(message4, testMeta.sink.collectedTuples.get(4));
    Assert.assertEquals(message17, testMeta.sink.collectedTuples.get(17));
    testMeta.sink.collectedTuples.clear();
  }

  /**
   * This test is different from the one in JMSStringInputOperatorTest because of the differences
   * in acknowledge mode. There is no Ack failure but rather failure in emit Tuple. But endWindow
   * eventually drains the holdingBuffer and emits all the outstanding tuples so we see 9 tuples
   * eventually. Because of the async nature of the operator (messages are delivered to the operator
   * as an async listener) we can't be sure how many messages are present in testMeta.sink.collectedTuples
   * before endWindow so our assertion is just testMeta.sink.collectedTuples.size() < 9
   *
   * @throws Exception if producing messages fails or the wait is interrupted
   */
  @Test
  public void testFailureAfterPersistenceAndBeforeRecovery() throws Exception
  {
    testMeta.testBase.validateAssumption();
    // Sink that rejects the tuple whose payload starts with "4:" and collects everything else.
    testMeta.sink = new CollectorTestSink<Object>()
    {
      @Override
      public void put(Object payload)
      {
        if (payload instanceof String && ((String)payload).startsWith("4:")) {
          throw new RuntimeException("fail 4th message");
        }
        synchronized (collectedTuples) {
          collectedTuples.add(payload);
          collectedTuples.notifyAll();
        }
      }
    };
    testMeta.operator.output.setSink(testMeta.sink);

    testMeta.testBase.produceUniqueMsgs("testFailureAfterPersistenceAndBeforeRecovery", 10, false);
    Thread.sleep(1000);
    testMeta.operator.beginWindow(1);
    try {
      testMeta.operator.emitTuples();
    } catch (Throwable t) {
      // Expected: the sink above throws for the "4:" message. Keep the cause in the log.
      LOG.debug("emit exception", t);
    }
    Assert.assertTrue("num of messages before endWindow 1", testMeta.sink.collectedTuples.size() < 9);
    testMeta.operator.endWindow();
    Assert.assertEquals("num of messages after endWindow 1", 9, testMeta.sink.collectedTuples.size());

    testMeta.operator.setup(testMeta.context);
    testMeta.operator.activate(testMeta.context);
    Assert.assertEquals("window 1 should exist", 1,
        testMeta.operator.getWindowDataManager().getLargestCompletedWindow());
  }

  private static final Logger LOG = LoggerFactory.getLogger(SQSStringInputOperatorTest.class);
}