blob: b89a05306e318386b0ef8e0139547904a73a27b0 [file] [log] [blame]
/*
* Copyright (c) 2012-2014 Malhar, Inc.
* All Rights Reserved.
*/
package com.datatorrent.contrib.goldengate.app;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.contrib.goldengate.DBQueryProcessor;
import com.datatorrent.contrib.goldengate.FileQueryProcessor;
import com.datatorrent.contrib.goldengate.KafkaJsonEncoder;
import com.datatorrent.contrib.goldengate.lib.CSVFileOutput;
import com.datatorrent.contrib.goldengate.lib.KafkaInput;
import com.datatorrent.contrib.goldengate.lib.OracleDBOutputOperator;
import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
@ApplicationAnnotation(name="GoldenGateDemo")
public class GoldenGateApp implements StreamingApplication
{
@Override
public void populateDAG(DAG dag, Configuration conf)
{
KafkaInput kafkaInput = new KafkaInput();
dag.addOperator("GoldenGateInput", KafkaInput.class);
////
OracleDBOutputOperator db = new OracleDBOutputOperator();
dag.addOperator("OracleReplicator", db);
////
ConsoleOutputOperator console = new ConsoleOutputOperator();
dag.addOperator("Console", console);
////
CSVFileOutput csvFileOutput = new CSVFileOutput();
dag.addOperator("CSVReplicator", csvFileOutput);
////
dag.addStream("GoldenGateConsoleStream", kafkaInput.outputPort, console.input);
dag.addStream("OracleReplicatorStream", kafkaInput.employeePort, db.input);
dag.addStream("CSVReplicatorStream", kafkaInput.employeePort1, csvFileOutput.input);
////
KafkaSinglePortStringInputOperator dbQueryInput = dag.addOperator("DBQuery", KafkaSinglePortStringInputOperator.class);
DBQueryProcessor dbQueryProcessor = dag.addOperator("DBQueryProcessor", DBQueryProcessor.class);
KafkaSinglePortOutputOperator<Object, Object> dbQueryOutput = dag.addOperator("DBQueryResponse", new KafkaSinglePortOutputOperator<Object, Object>());
Properties configProperties = new Properties();
configProperties.setProperty("serializer.class", KafkaJsonEncoder.class.getName());
configProperties.setProperty("metadata.broker.list", "node25.morado.com:9092");
dbQueryOutput.setConfigProperties(configProperties);
dag.addStream("dbQueries", dbQueryInput.outputPort, dbQueryProcessor.queryInput);
dag.addStream("dbRows", dbQueryProcessor.queryOutput, dbQueryOutput.inputPort);
////
KafkaSinglePortStringInputOperator fileQueryInput = dag.addOperator("FileQuery", KafkaSinglePortStringInputOperator.class);
FileQueryProcessor fileQueryProcessor = dag.addOperator("FileQueryProcessor", FileQueryProcessor.class);
KafkaSinglePortOutputOperator<Object, Object> fileQueryOutput = dag.addOperator("FileQueryResponse", new KafkaSinglePortOutputOperator<Object, Object>());
fileQueryOutput.setConfigProperties(configProperties);
dag.addStream("fileQueries", fileQueryInput.outputPort, fileQueryProcessor.queryInput);
dag.addStream("fileData", fileQueryProcessor.queryOutput, fileQueryOutput.inputPort);
}
}