/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.kafka;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.LoggerFactory;

import org.apache.apex.malhar.lib.wal.FSWindowDataManager;

import com.google.common.collect.Lists;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator.ShutdownException;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;

/**
 * A set of tests verifying that the input operator is automatically partitioned
 * per Kafka partition. These tests launch their own Kafka cluster.
 */
@Ignore // see https://issues.apache.org/jira/browse/APEXMALHAR-2162
@RunWith(Parameterized.class)
public class KafkaInputOperatorTest extends KafkaOperatorTestBase
{
private int totalBrokers = 0;
private String partition = null;
private String testName = "";
public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator;
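
  /**
   * TestWatcher that captures the description of the running test so each
   * test case can derive its own working directory under target/.
   */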
public class KafkaTestInfo extends TestWatcher
{
public org.junit.runner.Description desc;
public String getDir()
{
String methodName = desc.getMethodName();
String className = desc.getClassName();
return "target/" + className + "/" + methodName + "/" + testName;
}
@Override
protected void starting(org.junit.runner.Description description)
{
this.desc = description;
}
}
@Rule
public final KafkaTestInfo testInfo = new KafkaTestInfo();
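
  // Test matrix: every combination of single/multi cluster and single/multi
  // partition, each with the one_to_one and one_to_many partition strategies.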
@Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
public static Collection<Object[]> testScenario()
{
return Arrays.asList(new Object[][]{
{true, false, "one_to_one"},// multi cluster with single partition
{true, false, "one_to_many"},
{true, true, "one_to_one"},// multi cluster with multi partitions
{true, true, "one_to_many"},
{false, true, "one_to_one"}, // single cluster with multi partitions
{false, true, "one_to_many"},
{false, false, "one_to_one"}, // single cluster with single partitions
{false, false, "one_to_many"}
});
}
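
  // Per-test setup: clears the collected tuples, resets the failure counter,
  // and creates a fresh topic (on both clusters in the multi-cluster case)
  // so test runs do not interfere with each other.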
@Before
public void before()
{
testName = TEST_TOPIC + testCounter++;
logger.info("before() test case: {}", testName);
tupleCollection.clear();
    // reset the failure-trigger counter for the next test case
k = 0;
createTopic(0, testName);
if (hasMultiCluster) {
createTopic(1, testName);
}
}
public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition)
{
    // This class wants to initialize several Kafka brokers for multiple partitions
this.hasMultiCluster = hasMultiCluster;
this.hasMultiPartition = hasMultiPartition;
int cluster = 1 + (hasMultiCluster ? 1 : 0);
totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
this.partition = partition;
}
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
private static List<String> tupleCollection = new LinkedList<>();
  /**
   * Whether the countdown latch counts all tuples or only END_TUPLE markers.
   */
  private static final boolean countDownAll = false;
  // scale multiplies all message counts and the wait time below
  private static final int scale = 2;
  // number of data tuples the producer sends
  private static final int totalCount = 10 * scale;
  // index of the tuple at which the collector throws to simulate a failure
  private static final int failureTrigger = 3 * scale;
  // cap on tuples per window in the failure test (see setupHasFailureTest)
  private static final int tuplesPerWindow = 5 * scale;
  // milliseconds to wait for the latch before declaring a timeout
  private static final int waitTime = 60000 + 300 * scale;
  // This latch was originally used to count END_TUPLEs, but tuple order
  // can't be guaranteed, so valid tuples are counted instead.
  private static CountDownLatch latch;
  private static boolean hasFailure = false;
  // running tuple count used to trigger the simulated failure
  private static int k = 0;
  private static Thread monitorThread;
  /**
   * Test operator that collects the tuples emitted by the Kafka input operator.
   */
public static class CollectorModule extends BaseOperator
{
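    // Receives the raw Kafka payload bytes emitted by the upstream input operator.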
public final transient DefaultInputPort<byte[]> inputPort = new DefaultInputPort<byte[]>()
{
@Override
public void process(byte[] bt)
{
processTuple(bt);
}
};
long currentWindowId;
long operatorId;
boolean isIdempotentTest = false;
transient List<String> windowTupleCollector = Lists.newArrayList();
private transient Map<String, List<String>> tupleCollectedInWindow = new HashMap<>();
    private int endTuples = 0; // END_TUPLE markers seen in the current window
@Override
public void setup(Context.OperatorContext context)
{
super.setup(context);
operatorId = context.getId();
}
@Override
public void beginWindow(long windowId)
{
super.beginWindow(windowId);
currentWindowId = windowId;
windowTupleCollector.clear();
endTuples = 0;
}
public void processTuple(byte[] bt)
{
String tuple = new String(bt);
if (hasFailure && k++ == failureTrigger) {
//you can only kill yourself once
hasFailure = false;
throw new RuntimeException();
}
if (tuple.startsWith(KafkaOperatorTestBase.END_TUPLE)) {
endTuples++;
}
windowTupleCollector.add(tuple);
}
@Override
public void endWindow()
{
super.endWindow();
if (isIdempotentTest) {
String key = operatorId + "," + currentWindowId;
List<String> msgsInWin = tupleCollectedInWindow.get(key);
if (msgsInWin != null) {
Assert.assertEquals(
"replay messages should be exactly same as previous window", msgsInWin, windowTupleCollector);
} else {
List<String> newList = Lists.newArrayList();
newList.addAll(windowTupleCollector);
tupleCollectedInWindow.put(key, newList);
}
}
      // if an exception happened mid-window, endWindow() is never reached and the window's tuples are discarded
int tupleSize = windowTupleCollector.size();
tupleCollection.addAll(windowTupleCollector);
int countDownTupleSize = countDownAll ? tupleSize : endTuples;
if (latch != null) {
Assert.assertTrue(
"received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize);
while (countDownTupleSize > 0) {
latch.countDown();
--countDownTupleSize;
}
if (latch.getCount() == 0) {
        /*
         * A delay between countDown() and the application's shutdown() can cause
         * the fatal error "Catastrophic Error: Invalid State - the operator
         * blocked forever!", because activeQueues could be cleared while alive
         * hasn't changed yet. Throw a ShutdownException to let the engine shut down.
         */
try {
throw new ShutdownException();
//lc.shutdown();
} finally {
          /*
           * Interrupt the engine thread so it wakes from sleep and handles the
           * shutdown. All payload should have been handled by this point, so it
           * is safe to interrupt.
           */
monitorThread.interrupt();
}
}
}
}
}
  /**
   * Tests AbstractKafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka the consumer). This module
   * receives data from an outside test generator through the Kafka message bus and feeds that data into the Malhar
   * streaming platform.
   *
   * [Generate messages and send them to the Kafka message bus] ==> [Receive the messages through the Kafka input
   * adapter (i.e. consumer) and emit them via the emitTuples() interface on the output port]
   *
   * @throws Exception
   */
@Test
public void testInputOperator() throws Exception
{
hasFailure = false;
testInputOperator(false, false);
}
@Test
public void testInputOperatorWithFailure() throws Exception
{
hasFailure = true;
testInputOperator(true, false);
}
@Test
public void testIdempotentInputOperatorWithFailure() throws Exception
{
hasFailure = true;
testInputOperator(true, true);
}
public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exception
{
    // each broker should get an END_TUPLE message
latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers);
logger.info(
"Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {};" +
" hasMultiPartition: {}, partition: {}",
testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition);
// Start producer
KafkaTestProducer p = new KafkaTestProducer(testName, hasMultiPartition, hasMultiCluster);
p.setSendCount(totalCount);
Thread t = new Thread(p);
t.start();
int expectedReceiveCount = totalCount + totalBrokers;
// Create DAG for testing.
LocalMode lma = LocalMode.newInstance();
DAG dag = lma.getDAG();
// Create KafkaSinglePortStringInputOperator
KafkaSinglePortInputOperator node = dag.addOperator(
"Kafka input" + testName, KafkaSinglePortInputOperator.class);
node.setInitialPartitionCount(1);
// set topic
node.setTopics(testName);
node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
node.setClusters(getClusterConfig());
node.setStrategy(partition);
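    // The FSWindowDataManager writes per-window state to a write-ahead log so
    // that, after a failure, the operator replays the same tuples in the same
    // windows (the idempotency the collector asserts on in endWindow()).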
if (idempotent) {
node.setWindowDataManager(new FSWindowDataManager());
}
// Create Test tuple collector
CollectorModule collector = dag.addOperator("TestMessageCollector", CollectorModule.class);
collector.isIdempotentTest = idempotent;
// Connect ports
dag.addStream("Kafka message" + testName, node.outputPort, collector.inputPort)
.setLocality(Locality.CONTAINER_LOCAL);
if (hasFailure) {
setupHasFailureTest(node, dag);
}
// Create local cluster
LocalMode.Controller lc = lma.getController();
lc.setHeartbeatMonitoringEnabled(false);
    // Run the Controller inside another thread. This is almost the same as calling Controller.runAsync(),
    // but Controller.runAsync() doesn't expose the thread that runs it,
    // so we wouldn't know when that thread terminates.
    // Create the thread explicitly and call join() later to make sure the Controller shuts down completely.
monitorThread = new Thread((StramLocalCluster)lc, "master");
monitorThread.start();
boolean notTimeout = true;
try {
      // Wait up to 60s for the consumer to finish consuming all the messages
notTimeout = latch.await(waitTime, TimeUnit.MILLISECONDS);
lc.shutdown();
      // wait until the control thread has finished
monitorThread.join();
} catch (Exception e) {
logger.warn(e.getMessage());
}
t.join();
if (!notTimeout || expectedReceiveCount != tupleCollection.size()) {
logger.info("Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", tupleCollection.size(),
expectedReceiveCount, testName, tupleCollection);
}
Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: "
+ tupleCollection, notTimeout);
// Check results
Assert.assertTrue("testName: " + testName + "; Collected tuple size: " + tupleCollection.size()
+ "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection,
expectedReceiveCount == tupleCollection.size());
logger.info("End of test case: {}", testName);
}
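
  /**
   * Configures the DAG for the failure test: checkpoint every streaming window
   * and cap the tuples per window so recovery from the simulated failure
   * replays a bounded, verifiable set of tuples.
   */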
private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag)
{
operator.setHoldingBufferSize(5000);
dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
//dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(
// APPLICATION_PATH + "failureck", new Configuration()));
operator.setMaxTuplesPerWindow(tuplesPerWindow);
}
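
  /**
   * Builds the broker connect string: one localhost:port entry per cluster,
   * separated by ';' in the multi-cluster case.
   */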
private String getClusterConfig()
{
String l = "localhost:";
return l + TEST_KAFKA_BROKER_PORT[0] +
(hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1] : "");
}
}