blob: 4e97d72c0d25632fbb72999b0abbb88428282d88 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.apex.malhar.kafka;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
* A set of tests verifying that the input operator is automatically partitioned per Kafka partition. This test launches
* its own Kafka cluster.
public class KafkaInputOperatorTest extends KafkaOperatorTestBase
// Number of Kafka brokers this scenario expects. The constructor computes it as
// (1 or 2 brokers per cluster) * (1 or 2 clusters); it also sizes the CountDownLatch in testInputOperator.
private int totalBrokers = 0;
// Partition strategy name for this run; the parameter list supplies "one_to_one" or "one_to_many".
private String partition = null;
// Per-test topic name; before() sets it to TEST_TOPIC followed by a running counter.
private String testName = "";
// Path built from baseDir and the StramLocalCluster class name. In this view it is referenced only by the
// commented-out storage-agent line in setupHasFailureTest.
// NOTE(review): File is used here but no java.io.File import is visible in this view — confirm it exists.
public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator;
// TestWatcher that remembers the Description of the test being started so that a per-test directory name
// can be derived from it. It is a non-static inner class because getDir() reads the enclosing test's testName.
public class KafkaTestInfo extends TestWatcher
// Description captured in starting(); it has no initializer, so it is null until starting() has run.
public org.junit.runner.Description desc;
// Returns "target/<class name>/<method name>/<testName>" using the captured description.
public String getDir()
String methodName = desc.getMethodName();
String className = desc.getClassName();
return "target/" + className + "/" + methodName + "/" + testName;
// TestWatcher callback; stores the description so getDir() can use it later.
protected void starting(org.junit.runner.Description description)
this.desc = description;
// Watcher instance for this test class.
// NOTE(review): presumably registered as a JUnit @Rule (org.junit.Rule is imported), but the annotation is not
// visible in this view — confirm.
public final KafkaTestInfo testInfo = new KafkaTestInfo();
// Supplies the parameter tuples for the parameterized runner. Each row is
// {hasMultiCluster, hasMultiPartition, partition}, matching the constructor's argument order, and the eight rows
// cover every combination of the two flags with the "one_to_one" and "one_to_many" partition strategies.
@Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
public static Collection<Object[]> testScenario()
return Arrays.asList(new Object[][]{
{true, false, "one_to_one"},// multi cluster with single partition
{true, false, "one_to_many"},
{true, true, "one_to_one"},// multi cluster with multi partitions
{true, true, "one_to_many"},
{false, true, "one_to_one"}, // single cluster with multi partitions
{false, true, "one_to_many"},
{false, false, "one_to_one"}, // single cluster with single partitions
{false, false, "one_to_many"}
// Per-test preparation: picks a fresh topic name, resets the static tuple counter k, and creates the topic on
// cluster 0 (and on cluster 1 as well when the scenario uses two clusters).
// NOTE(review): presumably annotated with @Before (org.junit.Before is imported); the annotation is not visible here.
public void before()
// NOTE(review): the text after the semicolon on the next line looks like the argument list of a logging call whose
// beginning is missing from this view — confirm against the original file.
testName = TEST_TOPIC + testCounter++;"before() test case: {}", testName);
//reset count for next new test case
k = 0;
createTopic(0, testName);
if (hasMultiCluster) {
createTopic(1, testName);
// Creates one parameterized test instance.
// hasMultiCluster   - true when the scenario uses two Kafka clusters instead of one.
// hasMultiPartition - true when each cluster has two brokers instead of one.
// partition         - partition strategy name ("one_to_one" or "one_to_many" in testScenario()).
public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition)
// This class wants to initialize several Kafka brokers for multiple partitions
this.hasMultiCluster = hasMultiCluster;
this.hasMultiPartition = hasMultiPartition;
// 1 cluster, or 2 when multi-cluster is enabled
int cluster = 1 + (hasMultiCluster ? 1 : 0);
// brokers per cluster (1, or 2 when multi-partition) times the number of clusters: 1, 2 or 4 in total
totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
this.partition = partition;
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
// Static list whose size testInputOperator compares against the expected tuple count.
// NOTE(review): the code that adds to or clears this list is not visible in this view.
private static List<String> tupleCollection = new LinkedList<>();
* whether countDown latch count all tuples or just END_TUPLE
private static final boolean countDownAll = false;
// Multiplier applied to the counts and the timeout below.
private static final int scale = 2;
// 20 with scale = 2; added to totalBrokers to form the expected receive count in testInputOperator.
private static final int totalCount = 10 * scale;
// 6 with scale = 2; value of the tuple counter k at which processTuple throws when failure injection is on.
private static final int failureTrigger = 3 * scale;
// 10 with scale = 2. NOTE(review): not referenced anywhere in this view.
private static final int tuplesPerWindow = 5 * scale;
// Latch timeout in milliseconds: 60600 with scale = 2.
private static final int waitTime = 60000 + 300 * scale;
//This latch was used to count the END_TUPLE, but the order of tuple can't be guaranteed,
//so, count valid tuple instead.
private static CountDownLatch latch;
// Failure-injection switch read and cleared by CollectorModule.processTuple; set by the test methods.
private static boolean hasFailure = false;
// Tuple counter compared against failureTrigger in processTuple; reset to 0 in before().
private static int k = 0;
// Thread wrapping the local cluster controller; assigned in testInputOperator.
private static Thread monitorThread;
* Test operator that collects the tuples emitted by KafkaSinglePortInputOperator.
* @param
// Downstream test operator: receives byte[] tuples on inputPort, optionally injects one failure, optionally checks
// that a replayed window carries the same tuples as before, and counts the shared latch down at window end.
// NOTE(review): several statements of this class (the bodies of process(), of the END_TUPLE branch, and of the
// count-down loop) are not visible in this view; the comments below describe only what is shown.
public static class CollectorModule extends BaseOperator
// Input port for raw Kafka messages.
// NOTE(review): process() presumably forwards to processTuple(); its body is not visible here — confirm.
public final transient DefaultInputPort<byte[]> inputPort = new DefaultInputPort<byte[]>()
public void process(byte[] bt)
// Window id recorded in beginWindow().
long currentWindowId;
// Operator id recorded in setup().
long operatorId;
// Enables the replay comparison in endWindow(); set by testInputOperator.
boolean isIdempotentTest = false;
// Tuples attributed to the current window; its size is read in endWindow().
// NOTE(review): where it is filled and cleared is not visible in this view.
transient List<String> windowTupleCollector = Lists.newArrayList();
// Tuples previously recorded per "operatorId,windowId" key, used for the replay comparison.
private transient Map<String, List<String>> tupleCollectedInWindow = new HashMap<>();
// END_TUPLE counter for the current window; reset in beginWindow() and read in endWindow().
private int endTuples = 0;
// Remembers this operator's id for building the per-window key.
public void setup(Context.OperatorContext context)
operatorId = context.getId();
// Records the window id and restarts the END_TUPLE count for the new window.
public void beginWindow(long windowId)
currentWindowId = windowId;
endTuples = 0;
// Handles one message.
public void processTuple(byte[] bt)
// NOTE(review): new String(byte[]) decodes with the platform default charset; consider an explicit charset.
String tuple = new String(bt);
// Failure injection: k is post-incremented on every call while hasFailure is set, and the exception fires on
// the call where k equalled failureTrigger before the increment.
if (hasFailure && k++ == failureTrigger) {
//you can only kill yourself once
hasFailure = false;
throw new RuntimeException();
// NOTE(review): the body of this END_TUPLE branch is not visible in this view.
if (tuple.startsWith(KafkaOperatorTestBase.END_TUPLE)) {
// Window-end processing: optional replay check, then latch count-down.
public void endWindow()
if (isIdempotentTest) {
String key = operatorId + "," + currentWindowId;
List<String> msgsInWin = tupleCollectedInWindow.get(key);
if (msgsInWin != null) {
// This operator/window pair was seen before, so this is a replay and must match exactly.
Assert.assertEquals("replay messages should be exactly same as previous window", msgsInWin, windowTupleCollector);
} else {
// First time this operator/window pair is seen: register a list for it.
// NOTE(review): the code that populates newList is not visible in this view.
List<String> newList = Lists.newArrayList();
tupleCollectedInWindow.put(key, newList);
//discard the tuples of this window if an exception happened
int tupleSize = windowTupleCollector.size();
// With countDownAll false, only the END_TUPLEs seen in this window are counted down.
int countDownTupleSize = countDownAll ? tupleSize : endTuples;
if (latch != null) {
Assert.assertTrue("received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize);
// NOTE(review): the statements that count the latch down and decrement countDownTupleSize are not visible here.
while (countDownTupleSize > 0) {
if (latch.getCount() == 0) {
* The time before countDown() and the shutdown() of the application
* will cause fatal error:
* "Catastrophic Error: Invalid State - the operator blocked forever!"
* as the activeQueues could be cleared but alive haven't changed yet.
* throw the ShutdownException to let the engine shutdown;
try {
throw new ShutdownException();
} finally {
* interrupt the engine thread, let it wake from sleep and handle
* the shutdown at this time, all payload should be handled. so it
* should be ok to interrupt
* Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka consumer). This module receives
* data from an outside test generator through the Kafka message bus and feeds that data into the Malhar streaming
* platform. [Generate messages and send them to the Kafka message bus] ==> [Receive those messages through the Kafka
* input adapter (i.e. consumer) and send them using the emitTuples() interface on the output port]
* @throws Exception
// Plain run: clears the static failure-injection flag and runs the shared driver with neither failure setup nor
// idempotency enabled.
public void testInputOperator() throws Exception
hasFailure = false;
testInputOperator(false, false);
// Failure run: arms the static failure-injection flag (the collector then throws once) and runs the shared driver
// with failure setup enabled and idempotency disabled.
public void testInputOperatorWithFailure() throws Exception
hasFailure = true;
testInputOperator(true, false);
// Failure run with idempotency: arms the static failure-injection flag and runs the shared driver with both
// failure setup and the idempotent (window data manager) configuration enabled.
public void testIdempotentInputOperatorWithFailure() throws Exception
hasFailure = true;
testInputOperator(true, true);
// Shared driver for the three test methods: builds a DAG in which one KafkaSinglePortInputOperator feeds a
// CollectorModule, waits on the latch, then checks how many tuples were collected.
// hasFailure - when true, setupHasFailureTest() is applied to the DAG. This parameter shadows the static
//              hasFailure field, which the callers set separately before calling.
// idempotent - when true, the input operator gets an FSWindowDataManager and the collector compares replayed
//              windows against what it recorded earlier.
// NOTE(review): several statements of this method (logging call prefixes, thread start/join calls, operator
// configuration, the catch body) are not visible in this view; the comments describe only what is shown.
public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exception
// each broker should get a END_TUPLE message
// NOTE(review): the text after the semicolon on the next line looks like the argument list of a logging call whose
// beginning is missing from this view — confirm against the original file.
latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers);"Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {}; hasMultiPartition: {}, partition: {}",
testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition);
// Start producer
KafkaTestProducer p = new KafkaTestProducer(testName, hasMultiPartition, hasMultiCluster);
// NOTE(review): no start() call on this thread is visible in this view.
Thread t = new Thread(p);
// totalCount tuples (presumably what the producer sends — confirm) plus one END_TUPLE per broker.
int expectedReceiveCount = totalCount + totalBrokers;
// Create DAG for testing.
LocalMode lma = LocalMode.newInstance();
DAG dag = lma.getDAG();
// Create KafkaSinglePortStringInputOperator
KafkaSinglePortInputOperator node = dag.addOperator("Kafka input" + testName, KafkaSinglePortInputOperator.class);
// set topic
// NOTE(review): the statements that configure the topic (and any other operator settings) are not visible here.
if (idempotent) {
node.setWindowDataManager(new FSWindowDataManager());
// Create Test tuple collector
CollectorModule collector = dag.addOperator("TestMessageCollector", CollectorModule.class);
collector.isIdempotentTest = idempotent;
// Connect ports
dag.addStream("Kafka message" + testName, node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
if (hasFailure) {
setupHasFailureTest(node, dag);
// Create local cluster
LocalMode.Controller lc = lma.getController();
//let the Controller to run the inside another thread. It is almost same as call Controller.runAsync(),
//but Controller.runAsync() don't expose the thread which run it, so we don't know when the thread will be terminated.
//create this thread and then call join() to make sure the Controller shutdown completely.
// The controller is cast to StramLocalCluster so it can be handed to Thread as its run target.
monitorThread = new Thread((StramLocalCluster)lc, "master");
boolean notTimeout = true;
try {
// Wait up to waitTime milliseconds (60000 + 300 * scale) for the consumer to finish consuming all the messages
notTimeout = latch.await(waitTime, TimeUnit.MILLISECONDS);
//wait until control thread finished.
} catch (Exception e) {
// NOTE(review): the text after the opening brace on the next line looks like the argument list of a logging call
// whose beginning is missing from this view.
if (!notTimeout || expectedReceiveCount != tupleCollection.size()) {"Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", tupleCollection.size(),
expectedReceiveCount, testName, tupleCollection);
// Fails when latch.await() returned false, i.e. the latch did not reach zero within waitTime.
Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: " + tupleCollection, notTimeout);
// Check results
Assert.assertTrue( "testName: " + testName + "; Collected tuple size: " + tupleCollection.size() + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection,
expectedReceiveCount == tupleCollection.size());"End of test case: {}", testName);
// Adjusts the DAG for the failure scenarios by setting CHECKPOINT_WINDOW_COUNT to 1.
// operator - the Kafka input operator under test; not used by the statements visible in this view.
// dag      - the DAG whose attribute is changed.
private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag)
dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
// Disabled alternative that would configure a storage agent under APPLICATION_PATH:
//dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(APPLICATION_PATH + "failureck", new Configuration()));
// Builds the broker list string for the current scenario from TEST_KAFKA_BROKER_PORT[cluster][broker].
// Brokers of the same cluster are joined with "," and the second cluster is appended after ";", e.g.
// "localhost:<p00>,localhost:<p01>;localhost:<p10>,localhost:<p11>" when both flags are set.
// NOTE(review): no caller of this method is visible in this view.
private String getClusterConfig()
String l = "localhost:";
// first broker of cluster 0 is always present
return l + TEST_KAFKA_BROKER_PORT[0][0] +
// second broker of cluster 0
(hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") +
// first broker of cluster 1
(hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") +
// second broker of cluster 1
(hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : "");