Merge branch 'goldengate' of github.com:DataTorrent/Malhar into goldengate
Conflicts:
contrib/src/main/java/com/datatorrent/contrib/goldengate/app/GoldenGateApp.java
diff --git a/contrib/src/main/java/com/datatorrent/contrib/goldengate/DBQueryProcessor.java b/contrib/src/main/java/com/datatorrent/contrib/goldengate/DBQueryProcessor.java
new file mode 100644
index 0000000..ff98278
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/goldengate/DBQueryProcessor.java
@@ -0,0 +1,269 @@
+package com.datatorrent.contrib.goldengate;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.regex.Pattern;
+
+import com.google.common.cache.*;
+
+import org.codehaus.jackson.JsonNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+
+import com.datatorrent.api.Context;
+
+import com.datatorrent.common.util.DTThrowable;
+
+/**
+ * Query processor that answers {@code GET_RECENT_TABLE_ENTRIES} queries with the most recent rows of a database
+ * table, read through a {@link JdbcStore}.
+ * <p>
+ * One {@link PreparedStatement} is prepared per table name and kept in a cache bounded by {@code statementSize};
+ * statements are closed when they leave the cache, including when the operator is torn down.
+ *
+ * Created by Pramod Immaneni <pramod@datatorrent.com> on 10/21/14.
+ */
+public class DBQueryProcessor extends QueryProcessor implements RemovalListener<String, PreparedStatement>
+{
+  private static final Logger logger = LoggerFactory.getLogger(DBQueryProcessor.class);
+  private static final String GET_RECENT_TABLE_ENTRIES = "GET_RECENT_TABLE_ENTRIES";
+
+  private static final String TABLE_DATA = "TABLE";
+
+  private static final String[] TABLE_HEADERS = {"Employee ID", "Name", "Department"};
+
+  /**
+   * Table names come from external queries and are substituted into the SQL text (identifiers cannot be bound as
+   * statement parameters), so only plain, optionally schema-qualified, unquoted identifiers are accepted.
+   */
+  private static final Pattern TABLE_NAME_PATTERN =
+      Pattern.compile("[A-Za-z][A-Za-z0-9_$#]*(\\.[A-Za-z][A-Za-z0-9_$#]*)?");
+
+  /**
+   * Query template: {@code %s} is replaced by the table name and the single parameter bounds {@code rownum}.
+   * NOTE(review): relies on Oracle's {@code rownum} pseudo-column and on an {@code eid} column in the table.
+   */
+  private String getQuery = "select * from (select * from %s order by eid desc) where rownum < ?";
+
+  protected JdbcStore store;
+  private transient LoadingCache<String, PreparedStatement> statements;
+  private int statementSize = 100;
+
+  public DBQueryProcessor()
+  {
+    store = new JdbcStore();
+  }
+
+  /**
+   * @return the maximum number of prepared statements (one per table) kept open at a time
+   */
+  public int getStatementSize()
+  {
+    return statementSize;
+  }
+
+  /**
+   * @param statementSize the maximum number of prepared statements (one per table) kept open at a time; must be
+   *                      positive
+   */
+  public void setStatementSize(int statementSize)
+  {
+    this.statementSize = statementSize;
+  }
+
+  public JdbcStore getStore()
+  {
+    return store;
+  }
+
+  public void setStore(JdbcStore store)
+  {
+    this.store = store;
+  }
+
+  /**
+   * Connects to the database and creates the prepared-statement cache.
+   *
+   * @throws IllegalArgumentException if {@code statementSize} is not positive
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    // With a maximum size of 0 every statement is evicted, and hence closed by onRemoval(), right after it is
+    // loaded, so callers would always receive a closed statement.
+    if (statementSize <= 0) {
+      throw new IllegalArgumentException("statementSize must be positive but was " + statementSize);
+    }
+    super.setup(context);
+    store.connect();
+    statements = CacheBuilder.newBuilder().maximumSize(statementSize).removalListener(this).build(new CacheLoader<String, PreparedStatement>()
+    {
+      @Override
+      public PreparedStatement load(String tableName) throws Exception
+      {
+        String getTableQuery = String.format(getQuery, tableName);
+        logger.info("Get query {}", getTableQuery);
+        return store.getConnection().prepareStatement(getTableQuery);
+      }
+    });
+  }
+
+  /**
+   * Closes every cached prepared statement and then disconnects from the database.
+   */
+  @Override
+  public void teardown()
+  {
+    if (statements != null) {
+      // cleanUp() alone only performs pending maintenance; invalidating the entries routes every cached statement
+      // through onRemoval() so that it is closed before the connection goes away.
+      statements.invalidateAll();
+      statements.cleanUp();
+    }
+    store.disconnect();
+  }
+
+  /**
+   * Closes a prepared statement once it leaves the cache, whether evicted or invalidated.
+   * <p>
+   * Failures are logged rather than rethrown: Guava catches and only logs exceptions thrown by removal listeners,
+   * so rethrowing would not reach any caller.
+   */
+  @Override
+  public void onRemoval(RemovalNotification<String, PreparedStatement> notification)
+  {
+    PreparedStatement statement = notification.getValue();
+    if (statement == null) {
+      return;
+    }
+    try {
+      statement.close();
+    } catch (SQLException e) {
+      logger.warn("Error closing statement for table {}", notification.getKey(), e);
+    }
+  }
+
+  /**
+   * Maps the {@code GET_RECENT_TABLE_ENTRIES} selector to {@link GetRecentTableEntriesQuery}.
+   *
+   * @return the query class, or {@code null} when the selector is not supported
+   */
+  @Override
+  protected Class<? extends Query> getQueryClass(String selector, JsonNode json)
+  {
+    logger.info("Selector {} JSON {}", selector, json);
+    // Constant-first comparison: the selector is null when the JSON "selector" value is not a text node.
+    if (GET_RECENT_TABLE_ENTRIES.equals(selector)) {
+      return GetRecentTableEntriesQuery.class;
+    }
+    return null;
+  }
+
+  @Override
+  protected void executeQuery(Query query, QueryResults results)
+  {
+    if (query instanceof GetRecentTableEntriesQuery) {
+      processGetRecentTableEntries((GetRecentTableEntriesQuery)query, results);
+    }
+  }
+
+  /**
+   * Reads the {@code numberEntries} most recent rows of {@code tableName} and stores them in {@code results} as
+   * {@link TableData} of type {@code TABLE}.
+   * <p>
+   * A query with a missing or malformed table name, or whose statement cannot be prepared, is logged and ignored,
+   * leaving {@code results} untouched. Database errors while executing the query are rethrown.
+   *
+   * @param query   names the table and the number of rows wanted
+   * @param results receives the rows on success
+   */
+  public void processGetRecentTableEntries(GetRecentTableEntriesQuery query, QueryResults results)
+  {
+    logger.info("Get recent entries query info {} {}", query.tableName, query.numberEntries);
+    String tableName = query.tableName;
+    int numberEntries = query.numberEntries;
+    if (tableName == null || !TABLE_NAME_PATTERN.matcher(tableName).matches()) {
+      logger.warn("Ignoring query {} with an invalid table name", query);
+      return;
+    }
+    ResultSet resultSet = null;
+    try {
+      PreparedStatement getStatement = statements.get(tableName);
+      // rownum is compared with '<', so bind one more than the number of entries wanted
+      getStatement.setInt(1, numberEntries + 1);
+      logger.info("query {}", getStatement);
+      resultSet = getStatement.executeQuery();
+      List<Object[]> rows = new ArrayList<Object[]>();
+      while (resultSet.next()) {
+        Object[] row = new Object[3];
+        row[0] = resultSet.getInt(1);
+        row[1] = resultSet.getString(2);
+        row[2] = resultSet.getInt(3);
+        rows.add(row);
+      }
+      TableData resultsData = new TableData();
+      resultsData.headers = TABLE_HEADERS;
+      resultsData.rows = rows.toArray(new Object[0][0]);
+      results.setData(resultsData);
+      results.setType(TABLE_DATA);
+      logger.info("result rows {}", resultsData.rows.length);
+    } catch (SQLException e) {
+      DTThrowable.rethrow(e);
+    } catch (ExecutionException e) {
+      // the statement could not be prepared; log the underlying cause instead of printing to stderr
+      logger.error("Error preparing query for table {}", tableName, e.getCause());
+    } finally {
+      // the statement is cached and reused, so close the cursor now rather than leaving it open until the next run
+      closeQuietly(resultSet);
+    }
+  }
+
+  /**
+   * Closes a result set, logging rather than propagating any failure.
+   */
+  private static void closeQuietly(ResultSet resultSet)
+  {
+    if (resultSet == null) {
+      return;
+    }
+    try {
+      resultSet.close();
+    } catch (SQLException e) {
+      logger.warn("Error closing result set", e);
+    }
+  }
+
+  /**
+   * Query for the most recent rows of a table.
+   */
+  public static class GetRecentTableEntriesQuery extends Query
+  {
+    public String tableName;
+    public int numberEntries;
+
+    @Override
+    public String toString()
+    {
+      return "GetRecentTableEntriesQuery{" +
+          "tableName='" + tableName + '\'' +
+          ", numberEntries=" + numberEntries +
+          '}';
+    }
+  }
+
+  /**
+   * Tabular query result: column headers plus one array per row.
+   */
+  public static class TableData implements QueryResults.Data
+  {
+    public String[] headers;
+    public Object[][] rows;
+  }
+
+}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/goldengate/FileQueryProcessor.java b/contrib/src/main/java/com/datatorrent/contrib/goldengate/FileQueryProcessor.java
new file mode 100644
index 0000000..d0833e6
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/goldengate/FileQueryProcessor.java
@@ -0,0 +1,188 @@
+package com.datatorrent.contrib.goldengate;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import com.google.common.collect.EvictingQueue;
+
+import org.codehaus.jackson.JsonNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Context;
+
+import com.datatorrent.common.util.DTThrowable;
+
+/**
+ * Query processor that answers {@code GET_LATEST_FILE_CONTENTS} queries with the last lines of a file read through
+ * a Hadoop {@link FileSystem}.
+ *
+ * Created by Pramod Immaneni <pramod@datatorrent.com> on 10/22/14.
+ */
+public class FileQueryProcessor extends QueryProcessor
+{
+  private static final Logger logger = LoggerFactory.getLogger(FileQueryProcessor.class);
+  private static final String GET_LATEST_FILE_CONTENTS = "GET_LATEST_FILE_CONTENTS";
+
+  private static final String CONTENT_DATA = "CONTENT";
+
+  /**
+   * File read when a query does not name one.
+   */
+  private String filePath;
+
+  private transient FileSystem fs;
+
+  public String getFilePath()
+  {
+    return filePath;
+  }
+
+  public void setFilePath(String filePath)
+  {
+    this.filePath = filePath;
+  }
+
+  /**
+   * Opens a file system instance owned by this operator.
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    try {
+      // FileSystem.get() returns a JVM-wide cached instance that other operators in the same container may share,
+      // so closing it in teardown() would break them; a private instance can be closed safely.
+      fs = FileSystem.newInstance(new Configuration());
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+  }
+
+  /**
+   * Closes this operator's file system instance.
+   */
+  @Override
+  public void teardown()
+  {
+    if (fs == null) {
+      return;
+    }
+    try {
+      fs.close();
+    } catch (IOException e) {
+      logger.error("Error closing filesystem", e);
+    }
+  }
+
+  /**
+   * Maps the {@code GET_LATEST_FILE_CONTENTS} selector to {@link GetLatestFileContentsQuery}.
+   *
+   * @return the query class, or {@code null} when the selector is not supported
+   */
+  @Override
+  protected Class<? extends Query> getQueryClass(String selector, JsonNode json)
+  {
+    logger.info("JSON {}", json);
+    // Constant-first comparison: the selector is null when the JSON "selector" value is not a text node.
+    if (GET_LATEST_FILE_CONTENTS.equals(selector)) {
+      return GetLatestFileContentsQuery.class;
+    }
+    return null;
+  }
+
+  @Override
+  protected void executeQuery(Query query, QueryResults results)
+  {
+    if (query instanceof GetLatestFileContentsQuery) {
+      processGetLatestFileContents((GetLatestFileContentsQuery)query, results);
+    }
+  }
+
+  /**
+   * Reads the file named by the query, or the configured {@code filePath} when the query names none, and stores its
+   * last {@code numberLines} lines in {@code results} as {@link ContentData} of type {@code CONTENT}.
+   * <p>
+   * A query without a usable path or with a negative line count is logged and ignored, leaving {@code results}
+   * untouched. I/O errors are rethrown.
+   * <p>
+   * NOTE(review): the path is taken from the external query unchecked, so any file readable by this operator can be
+   * requested; consider restricting queries to an allowed directory.
+   *
+   * @param query   names the file (optional) and the number of trailing lines wanted
+   * @param results receives the lines on success
+   */
+  public void processGetLatestFileContents(GetLatestFileContentsQuery query, QueryResults results)
+  {
+    logger.info("File contents query info {} {}", query.filePath, query.numberLines);
+    String filePath = (query.filePath != null) ? query.filePath : this.filePath;
+    int numberLines = query.numberLines;
+    if (filePath == null || filePath.isEmpty()) {
+      logger.warn("Ignoring query {} without a file path", query);
+      return;
+    }
+    if (numberLines < 0) {
+      logger.warn("Ignoring query {} with a negative number of lines", query);
+      return;
+    }
+    BufferedReader reader = null;
+    try {
+      // Retains only the last numberLines lines while the whole file is streamed through it.
+      EvictingQueue<String> queue = EvictingQueue.create(numberLines);
+      FSDataInputStream inputStream = fs.open(new Path(filePath));
+      reader = new BufferedReader(new InputStreamReader(inputStream));
+      String line;
+      while ((line = reader.readLine()) != null) {
+        queue.add(line);
+      }
+      ContentData contentData = new ContentData();
+      contentData.lines = queue.toArray(new String[0]);
+      results.setData(contentData);
+      results.setType(CONTENT_DATA);
+      logger.info("result lines {}", contentData.lines.length);
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    } finally {
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (IOException e) {
+          logger.error("Error closing reader", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Query for the last lines of a file.
+   */
+  public static class GetLatestFileContentsQuery extends Query
+  {
+    public String filePath;
+    public int numberLines;
+
+    @Override
+    public String toString()
+    {
+      return "GetLatestFileContentsQuery{" +
+          "fileName='" + filePath + '\'' +
+          ", numberLines=" + numberLines +
+          '}';
+    }
+  }
+
+  /**
+   * File query result: the requested lines in file order.
+   */
+  public static class ContentData implements QueryResults.Data
+  {
+    public String[] lines;
+  }
+
+}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/goldengate/GoldenGateQueryProcessor.java b/contrib/src/main/java/com/datatorrent/contrib/goldengate/GoldenGateQueryProcessor.java
deleted file mode 100644
index 2088575..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/goldengate/GoldenGateQueryProcessor.java
+++ /dev/null
@@ -1,254 +0,0 @@
-package com.datatorrent.contrib.goldengate;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-import com.google.common.cache.*;
-import com.google.common.collect.EvictingQueue;
-
-import org.codehaus.jackson.JsonNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.datatorrent.lib.db.jdbc.JdbcStore;
-
-import com.datatorrent.api.Context;
-
-import com.datatorrent.common.util.DTThrowable;
-
-/**
- * Created by Pramod Immaneni <pramod@datatorrent.com> on 10/21/14.
- */
-public class GoldenGateQueryProcessor extends QueryProcessor implements RemovalListener<String, PreparedStatement>
-{
- private static final Logger logger = LoggerFactory.getLogger(GoldenGateQueryProcessor.class);
- private static final String GET_RECENT_TABLE_ENTRIES = "GET_RECENT_TABLE_ENTRIES";
- private static final String GET_LATEST_FILE_CONTENTS = "GET_LATEST_FILE_CONTENTS";
-
- private static final String TABLE_DATA = "TABLE";
- private static final String CONTENT_DATA = "CONTENT";
-
- private static final String[] TABLE_HEADERS = {"Employee ID", "Name", "Department"};
-
- private String getQuery = "select * from (select * from %s order by eid desc) where rownum < ?";
-
- private String filePath;
-
- protected JdbcStore store;
-
- private transient FileSystem fs;
- private transient LoadingCache<String, PreparedStatement> statements;
- private int statementSize = 100;
-
- public GoldenGateQueryProcessor()
- {
- store = new JdbcStore();
- }
-
- public JdbcStore getStore()
- {
- return store;
- }
-
- public void setStore(JdbcStore store)
- {
- this.store = store;
- }
-
- public int getStatementSize()
- {
- return statementSize;
- }
-
- public void setStatementSize(int statementSize)
- {
- this.statementSize = statementSize;
- }
-
- @Override
- public void setup(Context.OperatorContext context)
- {
- super.setup(context);
- store.connect();
- statements = CacheBuilder.newBuilder().maximumSize(statementSize).removalListener(this).build(new CacheLoader<String, PreparedStatement>()
- {
- @Override
- public PreparedStatement load(String s) throws Exception
- {
- String getTableQuery = String.format(getQuery, s);
- logger.info("Get query {}", getTableQuery);
- return store.getConnection().prepareStatement(getTableQuery);
- }
- });
- try {
- fs = FileSystem.get(new Configuration());
- } catch (IOException e) {
- DTThrowable.rethrow(e);
- }
- }
-
- @Override
- public void teardown()
- {
- try {
- fs.close();
- } catch (IOException e) {
- logger.error("Error closing filesystem", e);
- }
- statements.cleanUp();
- store.disconnect();
- }
-
-
-
- @Override
- public void onRemoval(RemovalNotification<String, PreparedStatement> notification)
- {
- try {
- notification.getValue().close();
- } catch (SQLException e) {
- DTThrowable.rethrow(e);
- }
- }
-
- public String getFilePath()
- {
- return filePath;
- }
-
- public void setFilePath(String filePath)
- {
- this.filePath = filePath;
- }
-
- @Override
- protected Class<? extends Query> getQueryClass(JsonNode json)
- {
- logger.info("JSON {}", json);
- Class<? extends Query> queryClass = null;
- String selector = json.get("selector").getTextValue();
- if (selector != null) {
- if (selector.equals(GET_RECENT_TABLE_ENTRIES)) {
- queryClass = GetRecentTableEntriesQuery.class;
- } else if (selector.equals(GET_LATEST_FILE_CONTENTS)) {
- queryClass = GetLatestFileContentsQuery.class;
- }
- }
- return queryClass;
- }
-
- @Override
- protected void executeQuery(Query query, QueryResults results)
- {
- if (query instanceof GetRecentTableEntriesQuery) {
- processGetRecentTableEntries((GetRecentTableEntriesQuery)query, results);
- } else if (query instanceof GetLatestFileContentsQuery) {
- processGetLatestFileContents((GetLatestFileContentsQuery)query, results);
- }
- }
-
- public void processGetRecentTableEntries(GetRecentTableEntriesQuery query, QueryResults results) {
- logger.info("Get recent entries query info {} {}", query.tableName, query.numberEntries);
- String tableName = query.tableName;
- int numberEntries = query.numberEntries;
- try {
- PreparedStatement getStatement = statements.get(tableName);
- getStatement.setInt(1, numberEntries+1);
- logger.info("query {}", getStatement);
- ResultSet resultSet = getStatement.executeQuery();
- List<Object[]> rows = new ArrayList<Object[]>();
- while (resultSet.next()) {
- Object[] row = new Object[3];
- row[0] = resultSet.getInt(1);
- row[1] = resultSet.getString(2);
- row[2] = resultSet.getInt(3);
- rows.add(row);
- }
- TableData resultsData = new TableData();
- resultsData.headers = TABLE_HEADERS;
- resultsData.rows = rows.toArray(new Object[0][0]);
- results.setData(resultsData);
- results.setType(TABLE_DATA);
- } catch (SQLException e) {
- DTThrowable.rethrow(e);
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- }
-
- public void processGetLatestFileContents(GetLatestFileContentsQuery query, QueryResults results) {
- logger.info("File contents query info {} {}", query.filePath, query.numberLines);
- String filePath = query.filePath;
- if (filePath == null) filePath = this.filePath;
- int numberLines = query.numberLines;
- try {
- EvictingQueue<String> queue = EvictingQueue.create(numberLines);
- FSDataInputStream inputStream = fs.open(new Path(filePath));
- BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
- String line = null;
- while ((line = reader.readLine()) != null) {
- queue.add(line);
- }
- ContentData contentData = new ContentData();
- contentData.lines = queue.toArray(new String[0]);
- results.setData(contentData);
- results.setType(CONTENT_DATA);
- reader.close();
- } catch (IOException e) {
- DTThrowable.rethrow(e);
- }
- }
-
- public static class GetRecentTableEntriesQuery extends Query
- {
- public String tableName;
- public int numberEntries;
-
- @Override
- public String toString()
- {
- return "GetRecentTableEntriesQuery{" +
- "tableName='" + tableName + '\'' +
- ", numberEntries=" + numberEntries +
- '}';
- }
- }
-
- public static class GetLatestFileContentsQuery extends Query
- {
- public String filePath;
- public int numberLines;
-
- @Override
- public String toString()
- {
- return "GetLatestFileContentsQuery{" +
- "fileName='" + filePath + '\'' +
- ", numberLines=" + numberLines +
- '}';
- }
- }
- public static class TableData implements QueryResults.Data
- {
- public String[] headers;
- public Object[][] rows;
- }
-
- public static class ContentData implements QueryResults.Data
- {
- public String[] lines;
- }
-
-}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/goldengate/QueryProcessor.java b/contrib/src/main/java/com/datatorrent/contrib/goldengate/QueryProcessor.java
index 6b19a72..7160f4e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/goldengate/QueryProcessor.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/goldengate/QueryProcessor.java
@@ -77,14 +77,22 @@
{
logger.debug("process query {}", queryString);
try {
- // Not efficient reading json twice
JsonNode json = mapper.readTree(queryString);
- Class<? extends Query> queryClass = getQueryClass(json);
-
- if (queryClass != null) {
- Query query = mapper.readValue(queryString, queryClass);
- setQueryProperties(query, json);
- processQuery(query);
+ JsonNode idJson = json.get("id");
+ JsonNode keys = json.get("keys");
+ if ((idJson != null) && (keys != null)) {
+ String id = idJson.getTextValue();
+ JsonNode selectorJson = keys.get("selector");
+ if (selectorJson != null) {
+ String selector = selectorJson.getTextValue();
+ Class<? extends Query> queryClass = getQueryClass(selector, keys);
+ if (queryClass != null) {
+ Query query = mapper.readValue(keys, queryClass);
+ query.id = id;
+ setQueryProperties(query, keys);
+ processQuery(query);
+ }
+ }
}
}
catch (Exception ex) {
@@ -112,7 +120,7 @@
protected void setQueryProperties(Query query, JsonNode json) {
}
- protected abstract Class<? extends Query> getQueryClass(JsonNode json);
+ protected abstract Class<? extends Query> getQueryClass(String selector, JsonNode json);
protected abstract void executeQuery(Query query, QueryResults results);
diff --git a/contrib/src/main/java/com/datatorrent/contrib/goldengate/app/GoldenGateApp.java b/contrib/src/main/java/com/datatorrent/contrib/goldengate/app/GoldenGateApp.java
index 763220d..e7ef8a4 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/goldengate/app/GoldenGateApp.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/goldengate/app/GoldenGateApp.java
@@ -5,17 +5,24 @@
package com.datatorrent.contrib.goldengate.app;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+import com.datatorrent.contrib.goldengate.DBQueryProcessor;
+import com.datatorrent.contrib.goldengate.FileQueryProcessor;
+import com.datatorrent.contrib.goldengate.KafkaJsonEncoder;
+import com.datatorrent.contrib.goldengate.lib.CSVFileOutput;
+import com.datatorrent.contrib.goldengate.lib.KafkaInput;
+import com.datatorrent.contrib.goldengate.lib.OracleDBOutputOperator;
+import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
+import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.goldengate.GoldenGateQueryProcessor;
-import com.datatorrent.contrib.goldengate.KafkaJsonEncoder;
-import com.datatorrent.contrib.goldengate.lib.*;
-import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
-import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import java.util.Properties;
-import org.apache.hadoop.conf.Configuration;
@ApplicationAnnotation(name="GoldenGateDemo")
public class GoldenGateApp implements StreamingApplication
@@ -51,22 +58,27 @@
////
- KafkaSinglePortStringInputOperator queryInput = dag.addOperator("QueryInput", KafkaSinglePortStringInputOperator.class);
-
- //
-
- GoldenGateQueryProcessor queryProcessor = dag.addOperator("QueryProcessor", GoldenGateQueryProcessor.class);
-
- //
-
- KafkaSinglePortOutputOperator<Object, Object> queryOutput = dag.addOperator("QueryResult", new KafkaSinglePortOutputOperator<Object, Object>());
+ KafkaSinglePortStringInputOperator dbQueryInput = dag.addOperator("DBQuery", KafkaSinglePortStringInputOperator.class);
+ DBQueryProcessor dbQueryProcessor = dag.addOperator("DBQueryProcessor", DBQueryProcessor.class);
+ KafkaSinglePortOutputOperator<Object, Object> dbQueryOutput = dag.addOperator("DBQueryResponse", new KafkaSinglePortOutputOperator<Object, Object>());
Properties configProperties = new Properties();
configProperties.setProperty("serializer.class", KafkaJsonEncoder.class.getName());
configProperties.setProperty("metadata.broker.list", "node25.morado.com:9092");
- queryOutput.setConfigProperties(configProperties);
+ dbQueryOutput.setConfigProperties(configProperties);
- dag.addStream("queries", queryInput.outputPort, queryProcessor.queryInput);
- dag.addStream("results", queryProcessor.queryOutput, queryOutput.inputPort);
+ dag.addStream("dbQueries", dbQueryInput.outputPort, dbQueryProcessor.queryInput);
+ dag.addStream("dbRows", dbQueryProcessor.queryOutput, dbQueryOutput.inputPort);
+
+ ////
+
+ KafkaSinglePortStringInputOperator fileQueryInput = dag.addOperator("FileQuery", KafkaSinglePortStringInputOperator.class);
+ FileQueryProcessor fileQueryProcessor = dag.addOperator("FileQueryProcessor", FileQueryProcessor.class);
+ KafkaSinglePortOutputOperator<Object, Object> fileQueryOutput = dag.addOperator("FileQueryResponse", new KafkaSinglePortOutputOperator<Object, Object>());
+
+ fileQueryOutput.setConfigProperties(configProperties);
+
+ dag.addStream("fileQueries", fileQueryInput.outputPort, fileQueryProcessor.queryInput);
+ dag.addStream("fileData", fileQueryProcessor.queryOutput, fileQueryOutput.inputPort);
}
}