APEXMALHAR-2513 JdbcPollInputOperator does not respect poll interval setting. (#649)

diff --git a/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplicationTest.java b/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplicationTest.java
index 95ead8b..6863d6a 100644
--- a/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplicationTest.java
+++ b/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplicationTest.java
@@ -44,7 +44,7 @@
   private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
   private static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
   private static final String TABLE_NAME = "test_event_table";
-  private static final String OUTPUT_DIR_NAME = "/tmp/test/output";
+  private static final String OUTPUT_DIR_NAME = "./target/tmp/test/output";
 
   @BeforeClass
   public static void setup()
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
index 504f7fa..af8f77f 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
@@ -132,7 +132,6 @@
   private transient volatile boolean execute;
   private transient ScheduledExecutorService scanService;
   private transient ScheduledFuture<?> pollFuture;
-  protected transient boolean isPolled;
   protected transient LinkedBlockingDeque<T> emitQueue;
   protected transient PreparedStatement ps;
   protected boolean isPollerPartition;
@@ -176,6 +175,15 @@
     }
   }
 
+  private class DBPoller implements Runnable
+  {
+    @Override
+    public void run()
+    {
+      pollRecords();
+    }
+  }
+
   private void schedulePollTask()
   {
     if (isPollerPartition) {
@@ -216,9 +224,6 @@
         throw new RuntimeException("Replay failed", e);
       }
     }
-    if (isPollerPartition) {
-      isPolled = false;
-    }
     lowerBound = lastEmittedRow;
   }
 
@@ -287,22 +292,23 @@
     ResultSet result = preparedStatement.executeQuery();
     if (result.next()) {
       do {
-        while (!emitQueue.offer(getTuple(result))) {
+        while (execute && !emitQueue.offer(getTuple(result))) {
           Thread.sleep(DEFAULT_SLEEP_TIME);
         }
-      } while (result.next());
+      } while (execute && result.next());
       result.close();
     }
     preparedStatement.close();
   }
 
+  /**
+   * Fetch results from JDBC and transfer to queue.
+   */
   protected void pollRecords()
   {
-    if (isPolled) {
-      return;
-    }
     try {
       if (isPollerPartition) {
+        LOG.debug("poll query");
         int nextOffset = getRecordsCount();
         while (lastOffset < nextOffset) {
           PreparedStatement preparedStatement = store.getConnection().prepareStatement(buildRangeQuery(lastOffset, resultLimit),
@@ -314,7 +320,6 @@
       } else {
         insertDbDataInQueue(ps);
       }
-      isPolled = true;
     } catch (SQLException | InterruptedException ex) {
       throw new RuntimeException(ex);
     } finally {
@@ -323,7 +328,6 @@
         execute = false;
       }
     }
-    isPolled = true;
   }
 
   public abstract T getTuple(ResultSet result);
@@ -522,28 +526,6 @@
     return sqlQuery;
   }
 
-  /**
-   * This class polls a store that can be queried with a JDBC interface The
-   * preparedStatement is updated as more rows are read
-   */
-  public class DBPoller implements Runnable
-  {
-    @Override
-    public void run()
-    {
-      try {
-        LOG.debug("Entering poll task");
-        while (execute) {
-          if ((isPollerPartition && !isPolled) || !isPollerPartition) {
-            pollRecords();
-          }
-        }
-      } finally {
-        LOG.debug("Exiting poll task");
-      }
-    }
-  }
-
   @VisibleForTesting
   protected void setScheduledExecutorService(ScheduledExecutorService service)
   {
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
index 8442d55..897fe10 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
@@ -186,10 +186,9 @@
   protected void populateColumnDataTypes() throws SQLException
   {
     columnDataTypes = Lists.newArrayList();
-    PreparedStatement stmt = store.getConnection().prepareStatement(buildRangeQuery(1, 1));
-    try (ResultSet rs = stmt.executeQuery()) {
+    try (PreparedStatement stmt = store.getConnection().prepareStatement(buildRangeQuery(1, 1))) {
       Map<String, Integer> nameToType = Maps.newHashMap();
-      ResultSetMetaData rsMetaData = rs.getMetaData();
+      ResultSetMetaData rsMetaData = stmt.getMetaData();
       LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount());
 
       for (int i = 1; i <= rsMetaData.getColumnCount(); i++) {
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
index c01804f..1f8d7db 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
@@ -137,7 +137,13 @@
     firstInstance.outputPort.setSink(sink1);
     firstInstance.beginWindow(0);
     firstInstance.pollRecords();
-    firstInstance.pollRecords();
+    try {
+      firstInstance.pollRecords();
+      // non-poller partition
+      Assert.fail("expected closed connection");
+    } catch (Exception e) {
+      // expected
+    }
     firstInstance.emitTuples();
     firstInstance.endWindow();
 
@@ -168,6 +174,7 @@
     thirdInstance.outputPort.setSink(sink3);
     thirdInstance.beginWindow(0);
     thirdInstance.pollRecords();
+    thirdInstance.pollRecords();
     thirdInstance.emitTuples();
     thirdInstance.endWindow();
 
@@ -280,8 +287,9 @@
     CollectorTestSink<Object> sink1 = new CollectorTestSink<>();
     firstInstance.outputPort.setSink(sink1);
     firstInstance.beginWindow(0);
+    Assert.assertFalse(firstInstance.ps.isClosed());
     firstInstance.pollRecords();
-    firstInstance.pollRecords();
+    Assert.assertTrue(firstInstance.ps.isClosed());
     firstInstance.emitTuples();
     firstInstance.endWindow();