/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.kafka;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.ProducerConfig;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;

import static org.apache.apex.malhar.lib.helper.OperatorContextTestHelper.mockOperatorContext;
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
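
/**
 * Functional tests for {@link KafkaSinglePortOutputOperator} and
 * {@link KafkaSinglePortExactlyOnceOutputOperator}: tuples are written to the
 * test Kafka cluster set up by {@link KafkaOperatorTestBase}, optionally across
 * a simulated operator failure, and then read back for verification.
 */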
public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
{
  private String testName;
  private static final List<Person> tupleCollection = new LinkedList<>();
  private static final String VALUE_DESERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper";
  private static final String VALUE_SERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper";
  public static final String APPLICATION_PATH = baseDir + File.separator + "MyKafkaApp" + File.separator;
@Before
public void before()
{
FileUtils.deleteQuietly(new File(APPLICATION_PATH));
testName = TEST_TOPIC + testCounter++;
createTopic(0, testName);
if (hasMultiCluster) {
createTopic(1, testName);
}
}
@After
public void after()
{
FileUtils.deleteQuietly(new File(APPLICATION_PATH));
}
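
  /**
   * With exactly-once semantics, a failure and replay must not produce
   * duplicates: the topic contents must equal the input exactly.
   */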
@Test
public void testExactlyOnceWithFailure() throws Exception
{
    List<Person> toKafka = generateList();
    sendDataToKafka(true, toKafka, true, false);
    List<Person> fromKafka = readFromKafka();
Assert.assertTrue("With Failure", compare(fromKafka, toKafka));
}
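
  /**
   * Baseline exactly-once case without failure: every tuple arrives once.
   */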
@Test
public void testExactlyOnceWithNoFailure() throws Exception
{
    List<Person> toKafka = generateList();
    sendDataToKafka(true, toKafka, false, false);
    List<Person> fromKafka = readFromKafka();
Assert.assertTrue("With No Failure", compare(fromKafka, toKafka));
}
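
  /**
   * Replaying different tuples after recovery violates the exactly-once
   * contract, so the operator is expected to throw.
   */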
@Test
public void testExactlyOnceWithDifferentTuplesAfterRecovery() throws Exception
{
    List<Person> toKafka = generateList();

    try {
      sendDataToKafka(true, toKafka, true, true);
    } catch (RuntimeException ex) {
      Assert.assertTrue("Different tuples after recovery should be rejected",
          ex.getMessage().contains("Violates"));
      return;
    }

    Assert.fail("Wrong tuples during replay should throw an exception");
}
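
  /**
   * Plain output without failure: the topic contents equal the input.
   */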
@Test
public void testKafkaOutput() throws Exception
{
    List<Person> toKafka = generateList();
    sendDataToKafka(false, toKafka, false, false);
    List<Person> fromKafka = readFromKafka();
Assert.assertTrue("No failure", compare(fromKafka, toKafka));
}
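
  /**
   * Plain (at-least-once) output with a failure: replayed windows are sent
   * again, so the topic is expected to contain duplicates.
   */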
@Test
public void testKafkaOutputWithFailure() throws Exception
{
    List<Person> toKafka = generateList();
    sendDataToKafka(false, toKafka, true, true);
    List<Person> fromKafka = readFromKafka();
    Assert.assertTrue("With failure, duplicates are expected", fromKafka.size() > toKafka.size());
}
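
  /**
   * Pushes the tuples through four application windows. When {@code hasFailure}
   * is set, a new operator instance is created mid-window 3 and windows 2 and 3
   * are replayed, optionally with a different tuple sequence.
   */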
private void sendDataToKafka(boolean exactlyOnce, List<Person> toKafka, boolean hasFailure,
boolean differentTuplesAfterRecovery) throws InterruptedException
{
Properties props = new Properties();
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER);
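    // Only the simple output operator needs the key serializer set here; the
    // exactly-once operator presumably supplies its own during setup().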
if (!exactlyOnce) {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaSinglePortExactlyOnceOutputOperator.KEY_SERIALIZER);
}
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
attributeMap.put(Context.DAGContext.APPLICATION_NAME, "MyKafkaApp");
attributeMap.put(DAG.APPLICATION_PATH, APPLICATION_PATH);
OperatorContext operatorContext = mockOperatorContext(2, attributeMap);
cleanUp(operatorContext);
Operator kafkaOutput;
DefaultInputPort<Person> inputPort;
    if (exactlyOnce) {
      KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp =
          resetKafkaOutput(testName, props, operatorContext);
      inputPort = kafkaOutputTemp.inputPort;
      kafkaOutput = kafkaOutputTemp;
    } else {
      KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp =
          resetKafkaSimpleOutput(testName, props, operatorContext);
      inputPort = kafkaOutputTemp.inputPort;
      kafkaOutput = kafkaOutputTemp;
    }
kafkaOutput.beginWindow(1);
inputPort.getSink().put(toKafka.get(0));
inputPort.getSink().put(toKafka.get(1));
inputPort.getSink().put(toKafka.get(2));
kafkaOutput.endWindow();
kafkaOutput.beginWindow(2);
inputPort.getSink().put(toKafka.get(3));
inputPort.getSink().put(toKafka.get(4));
inputPort.getSink().put(toKafka.get(5));
kafkaOutput.endWindow();
kafkaOutput.beginWindow(3);
inputPort.getSink().put(toKafka.get(6));
inputPort.getSink().put(toKafka.get(7));
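
    // Simulate a failure in the middle of window 3: a fresh operator instance
    // takes over and replays windows 2 and 3.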
    if (hasFailure) {
      if (exactlyOnce) {
        KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp =
            resetKafkaOutput(testName, props, operatorContext);
        inputPort = kafkaOutputTemp.inputPort;
        kafkaOutput = kafkaOutputTemp;
      } else {
        KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp =
            resetKafkaSimpleOutput(testName, props, operatorContext);
        inputPort = kafkaOutputTemp.inputPort;
        kafkaOutput = kafkaOutputTemp;
      }
kafkaOutput.beginWindow(2);
inputPort.getSink().put(toKafka.get(3));
inputPort.getSink().put(toKafka.get(4));
inputPort.getSink().put(toKafka.get(5));
kafkaOutput.endWindow();
kafkaOutput.beginWindow(3);
inputPort.getSink().put(toKafka.get(6));
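      // Skipping tuple 7 makes the replayed window differ from the original,
      // which the exactly-once operator must detect.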
if (!differentTuplesAfterRecovery) {
inputPort.getSink().put(toKafka.get(7));
}
}
inputPort.getSink().put(toKafka.get(8));
inputPort.getSink().put(toKafka.get(9));
kafkaOutput.endWindow();
kafkaOutput.beginWindow(4);
inputPort.getSink().put(toKafka.get(10));
inputPort.getSink().put(toKafka.get(11));
kafkaOutput.endWindow();
cleanUp(operatorContext);
}
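
  /**
   * Creates and sets up a fresh exactly-once output operator, standing in for
   * an operator redeployment after failure.
   */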
  private KafkaSinglePortExactlyOnceOutputOperator<Person> resetKafkaOutput(
String testName, Properties props, Context.OperatorContext operatorContext)
{
KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutput = new KafkaSinglePortExactlyOnceOutputOperator<>();
kafkaOutput.setTopic(testName);
kafkaOutput.setProperties(props);
kafkaOutput.setup(operatorContext);
return kafkaOutput;
}
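
  /**
   * Creates and sets up a fresh simple (at-least-once) output operator.
   */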
  private KafkaSinglePortOutputOperator<String, Person> resetKafkaSimpleOutput(
String testName, Properties props, Context.OperatorContext operatorContext)
{
KafkaSinglePortOutputOperator<String, Person> kafkaOutput = new KafkaSinglePortOutputOperator<>();
kafkaOutput.setTopic(testName);
kafkaOutput.setProperties(props);
kafkaOutput.setup(operatorContext);
return kafkaOutput;
}
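
  /**
   * Marks all windows known to the {@link FSWindowDataManager} as committed so
   * that stale recovery state does not leak between tests.
   */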
private void cleanUp(Context.OperatorContext operatorContext)
{
FSWindowDataManager windowDataManager = new FSWindowDataManager();
windowDataManager.setup(operatorContext);
try {
windowDataManager.committed(windowDataManager.getLargestCompletedWindow());
} catch (IOException e) {
e.printStackTrace();
}
}
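
  /**
   * Order-sensitive comparison of the tuples read back from Kafka with the
   * tuples originally sent.
   */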
  private boolean compare(List<Person> fromKafka, List<Person> toKafka)
  {
    return fromKafka.equals(toKafka);
  }
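
  /**
   * Builds the broker connect string for the embedded cluster(s); multiple
   * clusters are separated by ';'.
   */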
  private String getClusterConfig()
  {
    String host = "localhost:";
    return host + TEST_KAFKA_BROKER_PORT[0] +
        (hasMultiCluster ? ";" + host + TEST_KAFKA_BROKER_PORT[1] : "");
  }
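
  /**
   * Generates the twelve {@link Person} tuples used by every test.
   */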
  private List<Person> generateList()
  {
    List<Person> people = new ArrayList<>();
    for (int i = 0; i < 12; ++i) {
      people.add(new Person(Integer.toString(i), i));
    }
    return people;
  }
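
  /**
   * Runs a small local DAG that consumes the test topic from the earliest
   * offset and collects the deserialized tuples.
   */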
  private List<Person> readFromKafka()
  {
    tupleCollection.clear();

    // Consumer properties for reading the test topic back
    Properties props = new Properties();
    props.put(BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
    props.put(KEY_DESERIALIZER_CLASS_CONFIG, KafkaSinglePortExactlyOnceOutputOperator.KEY_DESERIALIZER);
    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
    props.put(GROUP_ID_CONFIG, "KafkaTest");

    LocalMode lma = LocalMode.newInstance();
    DAG dag = lma.getDAG();

    // Create the KafkaSinglePortInputOperator
    KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class);
node.setConsumerProps(props);
node.setInitialPartitionCount(1);
// set topic
node.setTopics(testName);
node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
node.setClusters(getClusterConfig());
node.setStrategy("one_to_one");
    // Create the test tuple collector
CollectorModule collector1 = dag.addOperator("collector", new CollectorModule());
// Connect ports
dag.addStream("Kafka message", node.outputPort, collector1.inputPort);
// Create local cluster
final LocalMode.Controller lc = lma.getController();
lc.setHeartbeatMonitoringEnabled(false);
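    // Run the local cluster long enough to drain the topic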
lc.run(30000);
return tupleCollection;
}
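
  /**
   * Test sink operator; received messages are deserialized back into
   * {@link Person} tuples and stored in the static {@code tupleCollection}.
   */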
public static class CollectorModule extends BaseOperator
{
public final transient CollectorInputPort inputPort = new CollectorInputPort(this);
long currentWindowId;
long operatorId;
@Override
public void setup(Context.OperatorContext context)
{
super.setup(context);
operatorId = context.getId();
}
@Override
public void beginWindow(long windowId)
{
super.beginWindow(windowId);
currentWindowId = windowId;
}
}
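
  /**
   * Input port that deserializes each message with {@link KafkaHelper} and adds
   * it to {@code tupleCollection}.
   */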
public static class CollectorInputPort extends DefaultInputPort<byte[]>
{
CollectorModule ownerNode;
CollectorInputPort(CollectorModule node)
{
this.ownerNode = node;
}
@Override
public void process(byte[] bt)
{
tupleCollection.add(new KafkaHelper().deserialize("r", bt));
}
}
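
  /**
   * Simple value type used as the test payload; field-based equality lets the
   * round trip through Kafka be verified.
   */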
public static class Person
{
public String name;
public Integer age;
public Person(String name, Integer age)
{
this.name = name;
this.age = age;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Person person = (Person)o;
if (name != null ? !name.equals(person.name) : person.name != null) {
return false;
}
return age != null ? age.equals(person.age) : person.age == null;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (age != null ? age.hashCode() : 0);
return result;
}
@Override
public String toString()
{
return name + age.toString();
}
}
}