APEXMALHAR-2513 JdbcPollInputOperator does not respect poll interval setting. (#649)
diff --git a/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplicationTest.java b/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplicationTest.java
index 95ead8b..6863d6a 100644
--- a/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplicationTest.java
+++ b/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplicationTest.java
@@ -44,7 +44,7 @@
private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
private static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
private static final String TABLE_NAME = "test_event_table";
- private static final String OUTPUT_DIR_NAME = "/tmp/test/output";
+ private static final String OUTPUT_DIR_NAME = "./target/tmp/test/output";
@BeforeClass
public static void setup()
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
index 504f7fa..af8f77f 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
@@ -132,7 +132,6 @@
private transient volatile boolean execute;
private transient ScheduledExecutorService scanService;
private transient ScheduledFuture<?> pollFuture;
- protected transient boolean isPolled;
protected transient LinkedBlockingDeque<T> emitQueue;
protected transient PreparedStatement ps;
protected boolean isPollerPartition;
@@ -176,6 +175,15 @@
}
}
+ private class DBPoller implements Runnable
+ {
+ @Override
+ public void run()
+ {
+ pollRecords();
+ }
+ }
+
private void schedulePollTask()
{
if (isPollerPartition) {
@@ -216,9 +224,6 @@
throw new RuntimeException("Replay failed", e);
}
}
- if (isPollerPartition) {
- isPolled = false;
- }
lowerBound = lastEmittedRow;
}
@@ -287,22 +292,23 @@
ResultSet result = preparedStatement.executeQuery();
if (result.next()) {
do {
- while (!emitQueue.offer(getTuple(result))) {
+ while (execute && !emitQueue.offer(getTuple(result))) {
Thread.sleep(DEFAULT_SLEEP_TIME);
}
- } while (result.next());
+ } while (execute && result.next());
result.close();
}
preparedStatement.close();
}
+ /**
+ * Fetch results from JDBC and transfer to queue.
+ */
protected void pollRecords()
{
- if (isPolled) {
- return;
- }
try {
if (isPollerPartition) {
+ LOG.debug("poll query");
int nextOffset = getRecordsCount();
while (lastOffset < nextOffset) {
PreparedStatement preparedStatement = store.getConnection().prepareStatement(buildRangeQuery(lastOffset, resultLimit),
@@ -314,7 +320,6 @@
} else {
insertDbDataInQueue(ps);
}
- isPolled = true;
} catch (SQLException | InterruptedException ex) {
throw new RuntimeException(ex);
} finally {
@@ -323,7 +328,6 @@
execute = false;
}
}
- isPolled = true;
}
public abstract T getTuple(ResultSet result);
@@ -522,28 +526,6 @@
return sqlQuery;
}
- /**
- * This class polls a store that can be queried with a JDBC interface The
- * preparedStatement is updated as more rows are read
- */
- public class DBPoller implements Runnable
- {
- @Override
- public void run()
- {
- try {
- LOG.debug("Entering poll task");
- while (execute) {
- if ((isPollerPartition && !isPolled) || !isPollerPartition) {
- pollRecords();
- }
- }
- } finally {
- LOG.debug("Exiting poll task");
- }
- }
- }
-
@VisibleForTesting
protected void setScheduledExecutorService(ScheduledExecutorService service)
{
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
index 8442d55..897fe10 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOPollInputOperator.java
@@ -186,10 +186,9 @@
protected void populateColumnDataTypes() throws SQLException
{
columnDataTypes = Lists.newArrayList();
- PreparedStatement stmt = store.getConnection().prepareStatement(buildRangeQuery(1, 1));
- try (ResultSet rs = stmt.executeQuery()) {
+ try (PreparedStatement stmt = store.getConnection().prepareStatement(buildRangeQuery(1, 1))) {
Map<String, Integer> nameToType = Maps.newHashMap();
- ResultSetMetaData rsMetaData = rs.getMetaData();
+ ResultSetMetaData rsMetaData = stmt.getMetaData();
LOG.debug("resultSet MetaData column count {}", rsMetaData.getColumnCount());
for (int i = 1; i <= rsMetaData.getColumnCount(); i++) {
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
index c01804f..1f8d7db 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java
@@ -137,7 +137,13 @@
firstInstance.outputPort.setSink(sink1);
firstInstance.beginWindow(0);
firstInstance.pollRecords();
- firstInstance.pollRecords();
+ try {
+ firstInstance.pollRecords();
+ // non-poller partition
+ Assert.fail("expected closed connection");
+ } catch (Exception e) {
+ // expected
+ }
firstInstance.emitTuples();
firstInstance.endWindow();
@@ -168,6 +174,7 @@
thirdInstance.outputPort.setSink(sink3);
thirdInstance.beginWindow(0);
thirdInstance.pollRecords();
+ thirdInstance.pollRecords();
thirdInstance.emitTuples();
thirdInstance.endWindow();
@@ -280,8 +287,9 @@
CollectorTestSink<Object> sink1 = new CollectorTestSink<>();
firstInstance.outputPort.setSink(sink1);
firstInstance.beginWindow(0);
+ Assert.assertFalse(firstInstance.ps.isClosed());
firstInstance.pollRecords();
- firstInstance.pollRecords();
+ Assert.assertTrue(firstInstance.ps.isClosed());
firstInstance.emitTuples();
firstInstance.endWindow();