/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.examples.kafka.exactlyonceoutput;

import org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;

/**
* Kafka exactly-once example (Kafka 0.9 API)
*
* This application verifies exactly-once semantics by writing a defined sequence of input data to two Kafka
* output operators -- one that guarantees those semantics (KafkaSinglePortExactlyOnceOutputOperator) and one
* that does not, each writing to a different topic. It deliberately causes the intermediate pass-through
* operator to fail, causing it to be restarted and some tuples to be reprocessed.
* A KafkaInputOperator then reads tuples from both topics to verify that the former topic has no duplicates
* while the latter does, and writes a single line with the verification results to an HDFS file,
* of the following form:
*
* Duplicates: exactly-once: 0, at-least-once: 5
*
* Operators:
*
* **BatchSequenceGenerator:**
* Generates a sequence of numbers from 1 to maxTuplesTotal, which can be set in the properties.
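*
* A minimal sketch of such a bounded generator, assuming Apex's InputOperator API (the actual
* implementation is BatchSequenceGenerator in this package; the class and field names below are illustrative):
* <pre>{@code
* // assumes com.datatorrent.api.DefaultOutputPort, com.datatorrent.api.InputOperator,
* // and com.datatorrent.common.util.BaseOperator
* public class SequenceSketch extends BaseOperator implements InputOperator
* {
*   public final transient DefaultOutputPort<String> out = new DefaultOutputPort<>();
*   private int maxTuplesTotal;  // configured via properties
*   private int next = 1;
*
*   public void emitTuples()
*   {
*     if (next <= maxTuplesTotal) {
*       out.emit(String.valueOf(next++));  // emit 1..maxTuplesTotal, then go idle
*     }
*   }
*
*   public void setMaxTuplesTotal(int maxTuplesTotal) { this.maxTuplesTotal = maxTuplesTotal; }
*   public int getMaxTuplesTotal() { return maxTuplesTotal; }
* }
* }</pre>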
*
* **PassthroughFailOperator:**
* This operator kills itself after a defined number of processed tuples by intentionally throwing an exception.
* STRAM then redeploys the operator in a new container. The exception only needs to be thrown once, so a file
* is written to HDFS just before throwing the exception; after restart, its presence tells the operator that
* the exception was already thrown.
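*
* A sketch of that fail-once pattern, assuming Hadoop's FileSystem API (the marker path, counter, and
* message below are illustrative, not the operator's actual values):
* <pre>{@code
* // inside process(tuple):
* if (++tuplesProcessed == tuplesUntilFailure && !fs.exists(markerPath)) {
*   fs.create(markerPath).close();  // record that the failure was already triggered
*   throw new RuntimeException("intentional failure to force redeployment");
* }
* output.emit(tuple);  // marker present after restart: pass tuples through normally
* }</pre>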
*
* **KafkaSinglePortExactlyOnceOutputOperator:**
* Topic, bootstrap.servers, serializer and deserializer are set in properties.xml.
* The topic names should not be changed for this application.
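*
* A sketch of what the topic entry in properties.xml might look like, using Apex's
* {@code dt.operator.<name>.prop.*} convention (the topic value here is a placeholder, not the
* value shipped with the example):
* <pre>{@code
* <property>
*   <name>dt.operator.kafkaExactlyOnceOutputOperator.prop.topic</name>
*   <value>exactly-once-topic</value>
* </property>
* }</pre>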
*
* **ValidationToFile:**
* Collects input values into a list per topic. Once maxTuplesTotal values have been received, it counts
* duplicates and writes the validation output to HDFS (the output line is also printed in the container dt.log).
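*
* The duplicate count can be derived by comparing a list's size with its number of distinct values;
* a sketch (values and variable names are illustrative):
* <pre>{@code
* // assumes java.util.Arrays and java.util.List
* List<String> received = Arrays.asList("1", "2", "2", "3");  // tuples collected for one topic
* long distinct = received.stream().distinct().count();       // 3
* long duplicates = received.size() - distinct;               // 1; expected 0 for exactly-once
* }</pre>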
*/
@ApplicationAnnotation(name = "KafkaExactlyOnceOutput")
public class Application implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // sequence source and the pass-through operator that fails once on purpose
    BatchSequenceGenerator sequenceGenerator = dag.addOperator("sequenceGenerator", BatchSequenceGenerator.class);
    PassthroughFailOperator passthroughFailOperator = dag.addOperator("passthrough", PassthroughFailOperator.class);

    // two Kafka writers: one with exactly-once guarantees, one without
    KafkaSinglePortExactlyOnceOutputOperator<String> kafkaExactlyOnceOutputOperator =
        dag.addOperator("kafkaExactlyOnceOutputOperator", KafkaSinglePortExactlyOnceOutputOperator.class);
    KafkaSinglePortOutputOperator kafkaOutputOperator =
        dag.addOperator("kafkaOutputOperator", KafkaSinglePortOutputOperator.class);

    dag.addStream("sequenceToPassthrough", sequenceGenerator.out, passthroughFailOperator.input);
    // fan-out: the same stream feeds both Kafka output operators
    dag.addStream("linesToKafka", passthroughFailOperator.output, kafkaOutputOperator.inputPort,
        kafkaExactlyOnceOutputOperator.inputPort);

    // readers consume both topics from the earliest offset so all written tuples are verified
    KafkaSinglePortInputOperator kafkaInputTopicExactly =
        dag.addOperator("kafkaTopicExactly", KafkaSinglePortInputOperator.class);
    kafkaInputTopicExactly.setInitialOffset(KafkaSinglePortInputOperator.InitialOffset.EARLIEST.name());
    KafkaSinglePortInputOperator kafkaInputTopicAtLeast =
        dag.addOperator("kafkaTopicAtLeast", KafkaSinglePortInputOperator.class);
    kafkaInputTopicAtLeast.setInitialOffset(KafkaSinglePortInputOperator.InitialOffset.EARLIEST.name());

    ValidationToFile validationToFile = dag.addOperator("validationToFile", ValidationToFile.class);
    dag.addStream("messagesFromExactly", kafkaInputTopicExactly.outputPort, validationToFile.topicExactlyInput);
    dag.addStream("messagesFromAtLeast", kafkaInputTopicAtLeast.outputPort, validationToFile.topicAtLeastInput);
  }
}