Merge branch 'goldengate' of github.com:DataTorrent/Malhar into goldengate

Conflicts:
	contrib/src/main/java/com/datatorrent/contrib/goldengate/app/GoldenGateApp.java
diff --git a/contrib/src/main/java/com/datatorrent/contrib/goldengate/DBQueryProcessor.java b/contrib/src/main/java/com/datatorrent/contrib/goldengate/DBQueryProcessor.java
new file mode 100644
index 0000000..ff98278
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/goldengate/DBQueryProcessor.java
@@ -0,0 +1,237 @@
+package com.datatorrent.contrib.goldengate;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.regex.Pattern;
+
+import com.google.common.cache.*;
+
+import org.codehaus.jackson.JsonNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+
+import com.datatorrent.api.Context;
+
+import com.datatorrent.common.util.DTThrowable;
+
+/**
+ * Query processor that answers {@code GET_RECENT_TABLE_ENTRIES} queries with the most recent
+ * rows of a database table, read through a {@link JdbcStore}.
+ * <p>
+ * One prepared statement is cached per table name (at most {@link #getStatementSize()} of them);
+ * statements are closed when evicted from the cache and when the operator is torn down.
+ * <p>
+ * Created by Pramod Immaneni <pramod@datatorrent.com> on 10/21/14.
+ */
+public class DBQueryProcessor extends QueryProcessor implements RemovalListener<String, PreparedStatement>
+{
+  private static final Logger logger = LoggerFactory.getLogger(DBQueryProcessor.class);
+  private static final String GET_RECENT_TABLE_ENTRIES = "GET_RECENT_TABLE_ENTRIES";
+
+  private static final String TABLE_DATA = "TABLE";
+
+  private static final String[] TABLE_HEADERS = {"Employee ID", "Name", "Department"};
+
+  /**
+   * Accepted table names: a plain identifier, optionally qualified by a schema. Table names come
+   * from external queries and are spliced into the SQL text (identifiers cannot be bound as
+   * statement parameters), so anything else is rejected to prevent SQL injection.
+   */
+  private static final Pattern TABLE_NAME_PATTERN =
+      Pattern.compile("[A-Za-z][A-Za-z0-9_$#]*(\\.[A-Za-z][A-Za-z0-9_$#]*)?");
+
+  /** Query template: %s is the (validated) table name, ? is the exclusive rownum bound. */
+  private String getQuery = "select * from (select * from %s order by eid desc) where rownum < ?";
+
+  protected JdbcStore store;
+  private transient LoadingCache<String, PreparedStatement> statements;
+  private int statementSize = 100;
+
+  public DBQueryProcessor()
+  {
+    store = new JdbcStore();
+  }
+
+  /** @return the maximum number of prepared statements kept in the cache */
+  public int getStatementSize()
+  {
+    return statementSize;
+  }
+
+  /** @param statementSize the maximum number of prepared statements kept in the cache */
+  public void setStatementSize(int statementSize)
+  {
+    this.statementSize = statementSize;
+  }
+
+  public JdbcStore getStore()
+  {
+    return store;
+  }
+
+  public void setStore(JdbcStore store)
+  {
+    this.store = store;
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    store.connect();
+    statements = CacheBuilder.newBuilder().maximumSize(statementSize).removalListener(this).build(new CacheLoader<String, PreparedStatement>()
+    {
+      @Override
+      public PreparedStatement load(String s) throws Exception
+      {
+        String getTableQuery = String.format(getQuery, s);
+        logger.info("Get query {}", getTableQuery);
+        return store.getConnection().prepareStatement(getTableQuery);
+      }
+    });
+  }
+
+  /**
+   * Closes every cached statement, then disconnects from the store.
+   */
+  @Override
+  public void teardown()
+  {
+    if (statements != null) {
+      // cleanUp() only runs pending cache maintenance; invalidateAll() is what evicts the entries
+      // so that onRemoval() closes each prepared statement before the connection goes away.
+      statements.invalidateAll();
+      statements.cleanUp();
+    }
+    store.disconnect();
+  }
+
+  /**
+   * Closes a prepared statement evicted from the cache.
+   */
+  @Override
+  public void onRemoval(RemovalNotification<String, PreparedStatement> notification)
+  {
+    PreparedStatement statement = notification.getValue();
+    if (statement == null) {
+      return;
+    }
+    try {
+      statement.close();
+    } catch (SQLException e) {
+      // Guava does not propagate exceptions thrown by a removal listener (it only logs them),
+      // so report the failure here with the table name for context.
+      logger.warn("Error closing statement for table {}", notification.getKey(), e);
+    }
+  }
+
+  @Override
+  protected Class<? extends Query> getQueryClass(String selector, JsonNode json)
+  {
+    logger.info("Selector {} JSON {}", selector, json);
+    Class<? extends Query> queryClass = null;
+    if (GET_RECENT_TABLE_ENTRIES.equals(selector)) {
+      queryClass = GetRecentTableEntriesQuery.class;
+    }
+    return queryClass;
+  }
+
+  @Override
+  protected void executeQuery(Query query, QueryResults results)
+  {
+    if (query instanceof GetRecentTableEntriesQuery) {
+      processGetRecentTableEntries((GetRecentTableEntriesQuery)query, results);
+    }
+  }
+
+  /**
+   * Fetches the most recent {@code query.numberEntries} rows of {@code query.tableName} (ordered by
+   * {@code eid} descending) into {@code results} as {@link TableData}.
+   * <p>
+   * Queries with a missing or malformed table name are rejected without setting any results.
+   *
+   * @param query the table name and number of rows requested
+   * @param results receives the table data and the {@code TABLE} result type
+   */
+  public void processGetRecentTableEntries(GetRecentTableEntriesQuery query, QueryResults results)
+  {
+    logger.info("Get recent entries query info {} {}", query.tableName, query.numberEntries);
+    String tableName = query.tableName;
+    if (tableName == null || !TABLE_NAME_PATTERN.matcher(tableName).matches()) {
+      logger.warn("Rejecting query with invalid table name {}", tableName);
+      return;
+    }
+    int numberEntries = query.numberEntries;
+    ResultSet resultSet = null;
+    try {
+      PreparedStatement getStatement = statements.get(tableName);
+      // The query uses "rownum < ?", so bind n + 1 to get at most n rows.
+      getStatement.setInt(1, numberEntries + 1);
+      logger.info("query {}", getStatement);
+      resultSet = getStatement.executeQuery();
+      List<Object[]> rows = new ArrayList<Object[]>();
+      while (resultSet.next()) {
+        Object[] row = new Object[3];
+        row[0] = resultSet.getInt(1);
+        row[1] = resultSet.getString(2);
+        row[2] = resultSet.getInt(3);
+        rows.add(row);
+      }
+      TableData resultsData = new TableData();
+      resultsData.headers = TABLE_HEADERS;
+      resultsData.rows = rows.toArray(new Object[0][0]);
+      results.setData(resultsData);
+      results.setType(TABLE_DATA);
+      logger.info("result rows {}", resultsData.rows.length);
+    } catch (SQLException e) {
+      DTThrowable.rethrow(e);
+    } catch (ExecutionException e) {
+      // The cache loader could not prepare the statement; log the underlying cause instead of
+      // dumping the wrapper exception to stderr.
+      logger.error("Could not prepare statement for table {}", tableName, e.getCause());
+    } finally {
+      // Release the cursor right away rather than waiting for the statement's next execution.
+      if (resultSet != null) {
+        try {
+          resultSet.close();
+        } catch (SQLException e) {
+          logger.error("Error closing result set", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Query for the most recent entries of a table.
+   */
+  public static class GetRecentTableEntriesQuery extends Query
+  {
+    public String tableName;
+    public int numberEntries;
+
+    @Override
+    public String toString()
+    {
+      return "GetRecentTableEntriesQuery{" +
+              "tableName='" + tableName + '\'' +
+              ", numberEntries=" + numberEntries +
+              '}';
+    }
+  }
+
+  /**
+   * Tabular query result: column headers and one array of column values per row.
+   */
+  public static class TableData implements QueryResults.Data
+  {
+    public String[] headers;
+    public Object[][] rows;
+  }
+
+}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/goldengate/FileQueryProcessor.java b/contrib/src/main/java/com/datatorrent/contrib/goldengate/FileQueryProcessor.java
new file mode 100644
index 0000000..d0833e6
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/goldengate/FileQueryProcessor.java
@@ -0,0 +1,178 @@
+package com.datatorrent.contrib.goldengate;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+
+import com.google.common.collect.EvictingQueue;
+
+import org.codehaus.jackson.JsonNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Context;
+
+import com.datatorrent.common.util.DTThrowable;
+
+/**
+ * Query processor that answers {@code GET_LATEST_FILE_CONTENTS} queries with the last lines of a
+ * file read through the Hadoop {@link FileSystem}.
+ * <p>
+ * Created by Pramod Immaneni <pramod@datatorrent.com> on 10/22/14.
+ */
+public class FileQueryProcessor extends QueryProcessor
+{
+  private static final Logger logger = LoggerFactory.getLogger(FileQueryProcessor.class);
+  private static final String GET_LATEST_FILE_CONTENTS = "GET_LATEST_FILE_CONTENTS";
+
+  private static final String CONTENT_DATA = "CONTENT";
+
+  private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+  /** File read when a query does not name one. */
+  private String filePath;
+
+  private transient FileSystem fs;
+
+  public String getFilePath()
+  {
+    return filePath;
+  }
+
+  public void setFilePath(String filePath)
+  {
+    this.filePath = filePath;
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    try {
+      // newInstance() gives this operator its own FileSystem. FileSystem.get() returns a cached
+      // instance shared by everything in the JVM, and closing that in teardown() would break
+      // other users of the same filesystem in this container.
+      fs = FileSystem.newInstance(new Configuration());
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    if (fs == null) {
+      return;
+    }
+    try {
+      fs.close();
+    } catch (IOException e) {
+      logger.error("Error closing filesystem", e);
+    }
+  }
+
+  @Override
+  protected Class<? extends Query> getQueryClass(String selector, JsonNode json)
+  {
+    logger.info("JSON {}", json);
+    Class<? extends Query> queryClass = null;
+    if (GET_LATEST_FILE_CONTENTS.equals(selector)) {
+      queryClass = GetLatestFileContentsQuery.class;
+    }
+    return queryClass;
+  }
+
+  @Override
+  protected void executeQuery(Query query, QueryResults results)
+  {
+    if (query instanceof GetLatestFileContentsQuery) {
+      processGetLatestFileContents((GetLatestFileContentsQuery)query, results);
+    }
+  }
+
+  /**
+   * Reads the file named by the query (or the configured {@link #getFilePath() default}) and
+   * returns its last {@code query.numberLines} lines as {@link ContentData}.
+   * <p>
+   * Queries with no usable path are rejected without setting any results; a negative line count
+   * is treated as zero.
+   *
+   * @param query the optional file path and number of trailing lines requested
+   * @param results receives the lines and the {@code CONTENT} result type
+   */
+  public void processGetLatestFileContents(GetLatestFileContentsQuery query, QueryResults results)
+  {
+    logger.info("File contents query info {} {}", query.filePath, query.numberLines);
+    // NOTE(review): a query may name any path this operator can read. If the query topic is not
+    // trusted, restrict requests to the configured filePath or a fixed base directory.
+    String filePath = query.filePath;
+    if (filePath == null || filePath.isEmpty()) {
+      filePath = this.filePath;
+    }
+    if (filePath == null || filePath.isEmpty()) {
+      logger.warn("No file path in query and no default file path configured");
+      return;
+    }
+    // EvictingQueue.create() rejects negative sizes.
+    int numberLines = Math.max(query.numberLines, 0);
+    BufferedReader reader = null;
+    try {
+      // Only the last numberLines lines are retained while the file is streamed.
+      EvictingQueue<String> queue = EvictingQueue.create(numberLines);
+      FSDataInputStream inputStream = fs.open(new Path(filePath));
+      reader = new BufferedReader(new InputStreamReader(inputStream, UTF_8));
+      String line;
+      while ((line = reader.readLine()) != null) {
+        queue.add(line);
+      }
+      ContentData contentData = new ContentData();
+      contentData.lines = queue.toArray(new String[0]);
+      results.setData(contentData);
+      results.setType(CONTENT_DATA);
+      logger.info("result lines {}", contentData.lines.length);
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    } finally {
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (IOException e) {
+          logger.error("Error closing reader", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Query for the trailing lines of a file.
+   */
+  public static class GetLatestFileContentsQuery extends Query
+  {
+    public String filePath;
+    public int numberLines;
+
+    @Override
+    public String toString()
+    {
+      return "GetLatestFileContentsQuery{" +
+              "fileName='" + filePath + '\'' +
+              ", numberLines=" + numberLines +
+              '}';
+    }
+  }
+
+  /**
+   * Query result holding lines of text.
+   */
+  public static class ContentData implements QueryResults.Data
+  {
+    public String[] lines;
+  }
+
+}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/goldengate/GoldenGateQueryProcessor.java b/contrib/src/main/java/com/datatorrent/contrib/goldengate/GoldenGateQueryProcessor.java
deleted file mode 100644
index 2088575..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/goldengate/GoldenGateQueryProcessor.java
+++ /dev/null
@@ -1,254 +0,0 @@
-package com.datatorrent.contrib.goldengate;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-import com.google.common.cache.*;
-import com.google.common.collect.EvictingQueue;
-
-import org.codehaus.jackson.JsonNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.datatorrent.lib.db.jdbc.JdbcStore;
-
-import com.datatorrent.api.Context;
-
-import com.datatorrent.common.util.DTThrowable;
-
-/**
- * Created by Pramod Immaneni <pramod@datatorrent.com> on 10/21/14.
- */
-public class GoldenGateQueryProcessor extends QueryProcessor implements RemovalListener<String, PreparedStatement>
-{
-  private static final Logger logger = LoggerFactory.getLogger(GoldenGateQueryProcessor.class);
-  private static final String GET_RECENT_TABLE_ENTRIES = "GET_RECENT_TABLE_ENTRIES";
-  private static final String GET_LATEST_FILE_CONTENTS = "GET_LATEST_FILE_CONTENTS";
-
-  private static final String TABLE_DATA = "TABLE";
-  private static final String CONTENT_DATA = "CONTENT";
-
-  private static final String[] TABLE_HEADERS = {"Employee ID", "Name", "Department"};
-
-  private String getQuery = "select * from (select * from %s order by eid desc) where rownum < ?";
-
-  private String filePath;
-
-  protected JdbcStore store;
-
-  private transient FileSystem fs;
-  private transient LoadingCache<String, PreparedStatement> statements;
-  private int statementSize = 100;
-
-  public GoldenGateQueryProcessor()
-  {
-    store = new JdbcStore();
-  }
-
-  public JdbcStore getStore()
-  {
-    return store;
-  }
-
-  public void setStore(JdbcStore store)
-  {
-    this.store = store;
-  }
-
-  public int getStatementSize()
-  {
-    return statementSize;
-  }
-
-  public void setStatementSize(int statementSize)
-  {
-    this.statementSize = statementSize;
-  }
-
-  @Override
-  public void setup(Context.OperatorContext context)
-  {
-    super.setup(context);
-    store.connect();
-    statements = CacheBuilder.newBuilder().maximumSize(statementSize).removalListener(this).build(new CacheLoader<String, PreparedStatement>()
-    {
-      @Override
-      public PreparedStatement load(String s) throws Exception
-      {
-        String getTableQuery = String.format(getQuery, s);
-        logger.info("Get query {}", getTableQuery);
-        return store.getConnection().prepareStatement(getTableQuery);
-      }
-    });
-    try {
-      fs = FileSystem.get(new Configuration());
-    } catch (IOException e) {
-      DTThrowable.rethrow(e);
-    }
-  }
-
-  @Override
-  public void teardown()
-  {
-    try {
-      fs.close();
-    } catch (IOException e) {
-      logger.error("Error closing filesystem", e);
-    }
-    statements.cleanUp();
-    store.disconnect();
-  }
-
-
-
-  @Override
-  public void onRemoval(RemovalNotification<String, PreparedStatement> notification)
-  {
-    try {
-      notification.getValue().close();
-    } catch (SQLException e) {
-      DTThrowable.rethrow(e);
-    }
-  }
-
-  public String getFilePath()
-  {
-    return filePath;
-  }
-
-  public void setFilePath(String filePath)
-  {
-    this.filePath = filePath;
-  }
-
-  @Override
-  protected Class<? extends Query> getQueryClass(JsonNode json)
-  {
-    logger.info("JSON {}", json);
-    Class<? extends Query> queryClass = null;
-    String selector = json.get("selector").getTextValue();
-    if (selector != null) {
-      if (selector.equals(GET_RECENT_TABLE_ENTRIES)) {
-        queryClass = GetRecentTableEntriesQuery.class;
-      } else if (selector.equals(GET_LATEST_FILE_CONTENTS)) {
-        queryClass = GetLatestFileContentsQuery.class;
-      }
-    }
-    return queryClass;
-  }
-
-  @Override
-  protected void executeQuery(Query query, QueryResults results)
-  {
-    if (query instanceof GetRecentTableEntriesQuery) {
-      processGetRecentTableEntries((GetRecentTableEntriesQuery)query, results);
-    } else if (query instanceof GetLatestFileContentsQuery) {
-      processGetLatestFileContents((GetLatestFileContentsQuery)query, results);
-    }
-  }
-
-  public void processGetRecentTableEntries(GetRecentTableEntriesQuery query, QueryResults results) {
-    logger.info("Get recent entries query info {} {}", query.tableName, query.numberEntries);
-    String tableName = query.tableName;
-    int numberEntries = query.numberEntries;
-    try {
-      PreparedStatement getStatement = statements.get(tableName);
-      getStatement.setInt(1, numberEntries+1);
-      logger.info("query {}", getStatement);
-      ResultSet resultSet = getStatement.executeQuery();
-      List<Object[]> rows = new ArrayList<Object[]>();
-      while (resultSet.next()) {
-        Object[] row = new Object[3];
-        row[0] = resultSet.getInt(1);
-        row[1] = resultSet.getString(2);
-        row[2] = resultSet.getInt(3);
-        rows.add(row);
-      }
-      TableData resultsData = new TableData();
-      resultsData.headers = TABLE_HEADERS;
-      resultsData.rows = rows.toArray(new Object[0][0]);
-      results.setData(resultsData);
-      results.setType(TABLE_DATA);
-    } catch (SQLException e) {
-      DTThrowable.rethrow(e);
-    } catch (ExecutionException e) {
-      e.printStackTrace();
-    }
-  }
-
-  public void processGetLatestFileContents(GetLatestFileContentsQuery query, QueryResults results) {
-    logger.info("File contents query info {} {}", query.filePath, query.numberLines);
-    String filePath = query.filePath;
-    if (filePath == null) filePath = this.filePath;
-    int numberLines = query.numberLines;
-    try {
-      EvictingQueue<String> queue = EvictingQueue.create(numberLines);
-      FSDataInputStream inputStream = fs.open(new Path(filePath));
-      BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
-      String line = null;
-      while ((line = reader.readLine()) != null) {
-        queue.add(line);
-      }
-      ContentData contentData = new ContentData();
-      contentData.lines = queue.toArray(new String[0]);
-      results.setData(contentData);
-      results.setType(CONTENT_DATA);
-      reader.close();
-    } catch (IOException e) {
-      DTThrowable.rethrow(e);
-    }
-  }
-
-  public static class GetRecentTableEntriesQuery extends Query
-  {
-    public String tableName;
-    public int numberEntries;
-
-    @Override
-    public String toString()
-    {
-      return "GetRecentTableEntriesQuery{" +
-              "tableName='" + tableName + '\'' +
-              ", numberEntries=" + numberEntries +
-              '}';
-    }
-  }
-
-  public static class GetLatestFileContentsQuery extends Query
-  {
-    public String filePath;
-    public int numberLines;
-
-    @Override
-    public String toString()
-    {
-      return "GetLatestFileContentsQuery{" +
-              "fileName='" + filePath + '\'' +
-              ", numberLines=" + numberLines +
-              '}';
-    }
-  }
-  public static class TableData implements QueryResults.Data
-  {
-    public String[] headers;
-    public Object[][] rows;
-  }
-
-  public static class ContentData implements QueryResults.Data
-  {
-    public String[] lines;
-  }
-
-}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/goldengate/QueryProcessor.java b/contrib/src/main/java/com/datatorrent/contrib/goldengate/QueryProcessor.java
index 6b19a72..7160f4e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/goldengate/QueryProcessor.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/goldengate/QueryProcessor.java
@@ -77,14 +77,22 @@
   {
     logger.debug("process query {}", queryString);
     try {
-      // Not efficient reading json twice
       JsonNode json = mapper.readTree(queryString);
-      Class<? extends Query> queryClass = getQueryClass(json);
-
-      if (queryClass != null) {
-        Query query = mapper.readValue(queryString, queryClass);
-        setQueryProperties(query, json);
-        processQuery(query);
+      JsonNode idJson = json.get("id");
+      JsonNode keys = json.get("keys");
+      if ((idJson != null) && (keys != null)) {
+        String id = idJson.getTextValue();
+        JsonNode selectorJson = keys.get("selector");
+        if (selectorJson != null) {
+          String selector = selectorJson.getTextValue();
+          Class<? extends Query> queryClass = getQueryClass(selector, keys);
+          if (queryClass != null) {
+            Query query = mapper.readValue(keys, queryClass);
+            query.id = id;
+            setQueryProperties(query, keys);
+            processQuery(query);
+          }
+        }
       }
     }
     catch (Exception ex) {
@@ -112,7 +120,7 @@
   protected void setQueryProperties(Query query, JsonNode json) {
   }
 
-  protected abstract Class<? extends Query> getQueryClass(JsonNode json);
+  protected abstract Class<? extends Query> getQueryClass(String selector, JsonNode json);
 
   protected abstract void executeQuery(Query query, QueryResults results);
 
diff --git a/contrib/src/main/java/com/datatorrent/contrib/goldengate/app/GoldenGateApp.java b/contrib/src/main/java/com/datatorrent/contrib/goldengate/app/GoldenGateApp.java
index 763220d..e7ef8a4 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/goldengate/app/GoldenGateApp.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/goldengate/app/GoldenGateApp.java
@@ -5,17 +5,24 @@
 
 package com.datatorrent.contrib.goldengate.app;
 
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+import com.datatorrent.contrib.goldengate.DBQueryProcessor;
+import com.datatorrent.contrib.goldengate.FileQueryProcessor;
+import com.datatorrent.contrib.goldengate.KafkaJsonEncoder;
+import com.datatorrent.contrib.goldengate.lib.CSVFileOutput;
+import com.datatorrent.contrib.goldengate.lib.KafkaInput;
+import com.datatorrent.contrib.goldengate.lib.OracleDBOutputOperator;
+import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
+import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.goldengate.GoldenGateQueryProcessor;
-import com.datatorrent.contrib.goldengate.KafkaJsonEncoder;
-import com.datatorrent.contrib.goldengate.lib.*;
-import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
-import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import java.util.Properties;
-import org.apache.hadoop.conf.Configuration;
 
 @ApplicationAnnotation(name="GoldenGateDemo")
 public class GoldenGateApp implements StreamingApplication
@@ -51,22 +58,27 @@
 
     ////
 
-    KafkaSinglePortStringInputOperator queryInput = dag.addOperator("QueryInput", KafkaSinglePortStringInputOperator.class);
-
-    //
-
-    GoldenGateQueryProcessor queryProcessor = dag.addOperator("QueryProcessor", GoldenGateQueryProcessor.class);
-
-    //
-
-    KafkaSinglePortOutputOperator<Object, Object> queryOutput = dag.addOperator("QueryResult", new KafkaSinglePortOutputOperator<Object, Object>());
+    KafkaSinglePortStringInputOperator dbQueryInput = dag.addOperator("DBQuery", KafkaSinglePortStringInputOperator.class);
+    DBQueryProcessor dbQueryProcessor = dag.addOperator("DBQueryProcessor", DBQueryProcessor.class);
+    KafkaSinglePortOutputOperator<Object, Object> dbQueryOutput = dag.addOperator("DBQueryResponse", new KafkaSinglePortOutputOperator<Object, Object>());
 
     Properties configProperties = new Properties();
     configProperties.setProperty("serializer.class", KafkaJsonEncoder.class.getName());
     configProperties.setProperty("metadata.broker.list", "node25.morado.com:9092");
-    queryOutput.setConfigProperties(configProperties);
+    dbQueryOutput.setConfigProperties(configProperties);
 
-    dag.addStream("queries", queryInput.outputPort, queryProcessor.queryInput);
-    dag.addStream("results", queryProcessor.queryOutput, queryOutput.inputPort);
+    dag.addStream("dbQueries", dbQueryInput.outputPort, dbQueryProcessor.queryInput);
+    dag.addStream("dbRows", dbQueryProcessor.queryOutput, dbQueryOutput.inputPort);
+
+    ////
+
+    KafkaSinglePortStringInputOperator fileQueryInput = dag.addOperator("FileQuery", KafkaSinglePortStringInputOperator.class);
+    FileQueryProcessor fileQueryProcessor = dag.addOperator("FileQueryProcessor", FileQueryProcessor.class);
+    KafkaSinglePortOutputOperator<Object, Object> fileQueryOutput = dag.addOperator("FileQueryResponse", new KafkaSinglePortOutputOperator<Object, Object>());
+
+    fileQueryOutput.setConfigProperties(configProperties);
+
+    dag.addStream("fileQueries", fileQueryInput.outputPort, fileQueryProcessor.queryInput);
+    dag.addStream("fileData", fileQueryProcessor.queryOutput, fileQueryOutput.inputPort);
   }
 }