/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.db.jdbc;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.conf.ParamType;
import org.jooq.impl.DSL;
import org.jooq.tools.jdbc.JDBCUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import com.google.common.annotations.VisibleForTesting;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.db.AbstractStoreInputOperator;
import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.lib.util.KryoCloneUtils;
import static java.sql.ResultSet.CONCUR_READ_ONLY;
import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
import static org.jooq.impl.DSL.field;
/**
* Abstract operator for consuming data using the JDBC interface<br>
* The user needs to provide tableName, dbConnection, columnsExpression and a look-up key<br>
* Optionally batchSize, pollInterval and a where clause can be given<br>
* This operator uses static partitioning to arrive at range queries for
* idempotent reads<br>
* This operator will create a configured number of non-polling static
* partitions for fetching the existing data in the table, and an additional
* single partition for polling newly added data. The assumption is that there is an
* ordered unique column on which range queries can be formed<br>
*
* Only newly added data will be fetched by the polling JDBC partition; it is
* also assumed that rows won't be added or deleted in the middle of a scan.
*
* The operator uses jOOQ to build the SQL queries based on the discovered {@link org.jooq.SQLDialect}.
* Note that some of the dialects (including Oracle) are only available in commercial
* jOOQ distributions. If the dialect is not available, a generic translation is applied;
* you can post-process the generated SQL by overriding {@link #buildRangeQuery(Object, int, int)}.
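* <p>
* A minimal configuration sketch (the concrete subclass, driver and property
* values below are illustrative, not mandated by this class):
* <pre>{@code
* JdbcPollInputOperator poller = dag.addOperator("poller", new JdbcPollInputOperator());
* JdbcStore store = new JdbcStore();
* store.setDatabaseDriver("org.hsqldb.jdbcDriver");
* store.setDatabaseUrl("jdbc:hsqldb:mem:testdb");
* poller.setStore(store);
* poller.setTableName("test_table");
* poller.setKey("id");                     // ordered unique column
* poller.setColumnsExpression("id,name");  // optional
* poller.setPartitionCount(2);             // static range partitions
* poller.setPollInterval(1000);            // milliseconds, poller partition only
* }</pre>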
*
* @displayName Jdbc Polling Input Operator
* @category Input
* @tags database, sql, jdbc, partitionable, idempotent, pollable
*
* @since 3.5.0
*/
@Evolving
@OperatorAnnotation(checkpointableWithinAppWindow = false)
public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore> implements
ActivationListener<OperatorContext>, Partitioner<AbstractJdbcPollInputOperator<T>>
{
private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class);
private static final int DEFAULT_QUEUE_CAPACITY = 4 * 1024;
private static final int DEFAULT_POLL_INTERVAL = 10 * 1000;
private static final int DEFAULT_FETCH_SIZE = 20000;
private static final int DEFAULT_BATCH_SIZE = 2000;
private static final int DEFAULT_SLEEP_TIME = 100;
private static final int DEFAULT_RESULT_LIMIT = 100000;
private int pollInterval = DEFAULT_POLL_INTERVAL; //in milliseconds
private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
private int fetchSize = DEFAULT_FETCH_SIZE;
/**
* Parameter to limit the number of results to fetch in one query by the Poller partition.
*/
private int resultLimit = DEFAULT_RESULT_LIMIT;
@Min(0)
private int partitionCount = 0;
private int batchSize = DEFAULT_BATCH_SIZE;
@NotNull
private String tableName;
private String columnsExpression;
@NotNull
private String key;
private String whereCondition = null;
private long currentWindowId;
private WindowDataManager windowManager;
protected WindowData currentWindowRecoveryState;
private boolean rebaseOffset;
protected KeyValPair<Integer, Integer> rangeQueryPair;
protected Integer lastEmittedRow;
protected transient DSLContext dslContext;
private transient volatile boolean execute;
private transient ScheduledExecutorService scanService;
private transient ScheduledFuture<?> pollFuture;
protected transient LinkedBlockingQueue<T> emitQueue;
protected transient PreparedStatement ps;
protected boolean isPollerPartition;
private transient int lastOffset;
private transient Object prevKey;
private transient Object lastKey;
/**
* The candidate key/offset pair identified by the poller thread
* that, once emitted, can be used to rebase the lower bound for subsequent queries.
*/
private transient AtomicReference<MutablePair<Object, Integer>> fetchedKeyAndOffset = new AtomicReference<>();
/**
* Signal to the fetch thread to rebase the query.
*/
private transient AtomicBoolean adjustKeyAndOffset = new AtomicBoolean();
public AbstractJdbcPollInputOperator()
{
currentWindowRecoveryState = new WindowData();
windowManager = new FSWindowDataManager();
}
@Override
public void setup(OperatorContext context)
{
super.setup(context);
dslContext = createDSLContext();
if (scanService == null) {
scanService = Executors.newScheduledThreadPool(1);
}
execute = true;
emitQueue = new LinkedBlockingQueue<>(queueCapacity);
windowManager.setup(context);
}
protected DSLContext createDSLContext()
{
return DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl()));
}
@Override
public void activate(OperatorContext context)
{
long largestRecoveryWindow = windowManager.getLargestCompletedWindow();
if (largestRecoveryWindow == Stateless.WINDOW_ID
|| context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) {
initializePreparedStatement();
schedulePollTask();
}
}
private class DBPoller implements Runnable
{
@Override
public void run()
{
pollRecords();
}
}
private void schedulePollTask()
{
if (isPollerPartition) {
pollFuture = scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
} else {
LOG.debug("Scheduling for one time execution.");
pollFuture = scanService.schedule(new DBPoller(), 0, TimeUnit.MILLISECONDS);
}
}
protected void initializePreparedStatement()
{
try {
if (currentWindowRecoveryState.lowerBound == 0 && currentWindowRecoveryState.key == null) {
lastOffset = rangeQueryPair.getKey();
} else {
lastOffset = currentWindowRecoveryState.lowerBound;
lastKey = currentWindowRecoveryState.key;
}
if (!isPollerPartition) {
ps = store.getConnection().prepareStatement(
buildRangeQuery(null, lastOffset, (rangeQueryPair.getValue() - lastOffset)),
TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public void beginWindow(long windowId)
{
currentWindowId = windowId;
if (currentWindowId <= windowManager.getLargestCompletedWindow()) {
try {
replay(currentWindowId);
return;
} catch (SQLException e) {
throw new RuntimeException("Replay failed", e);
}
}
currentWindowRecoveryState = WindowData.of(currentWindowRecoveryState.key, lastEmittedRow, 0);
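// for the poller partition, check whether the poll thread has identified a key
// change (fetchedKeyAndOffset); once the rows up to that point have been emitted,
// make the offsets relative to that key and signal the poll thread to rebase its query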
if (isPollerPartition) {
MutablePair<Object, Integer> keyOffset = fetchedKeyAndOffset.get();
if (keyOffset != null && keyOffset.getRight() < lastEmittedRow) {
if (!adjustKeyAndOffset.get()) {
// rebase offset
lastEmittedRow -= keyOffset.getRight();
currentWindowRecoveryState.lowerBound = lastEmittedRow;
currentWindowRecoveryState.key = keyOffset.getLeft();
adjustKeyAndOffset.set(true);
}
}
}
}
@Override
public void emitTuples()
{
if (currentWindowId <= windowManager.getLargestCompletedWindow()) {
return;
}
int pollSize = Math.min(emitQueue.size(), batchSize);
while (pollSize-- > 0) {
T obj = emitQueue.poll();
if (obj != null) {
emitTuple(obj);
}
lastEmittedRow++;
}
}
protected abstract void emitTuple(T tuple);
/**
* Visible to subclasses to allow for custom offset saving and initialization.
*/
protected static class WindowData
{
// members visible for access in subclasses of poll operator
public Object key;
public int lowerBound;
public int upperBound;
public static WindowData of(Object key, int lowerBound, int upperBound)
{
WindowData wd = new WindowData();
wd.key = key;
wd.lowerBound = lowerBound;
wd.upperBound = upperBound;
return wd;
}
}
@Override
public void endWindow()
{
if (pollFuture != null && (pollFuture.isCancelled() || pollFuture.isDone())) {
try {
pollFuture.get();
} catch (Exception e) {
throw new RuntimeException("JDBC thread failed", e);
}
if (isPollerPartition) {
throw new IllegalStateException("poller task terminated");
} else {
// exit static query partition
BaseOperator.shutdown();
}
}
try {
if (currentWindowId > windowManager.getLargestCompletedWindow()) {
currentWindowRecoveryState.upperBound = lastEmittedRow;
windowManager.save(currentWindowRecoveryState, currentWindowId);
}
} catch (IOException e) {
throw new RuntimeException("saving recovery", e);
}
}
@Override
public void deactivate()
{
execute = false;
scanService.shutdownNow();
store.disconnect();
}
protected Object extractKey(ResultSet rs) throws SQLException
{
return rs.getObject(this.key);
}
/**
* Execute the query and transfer results to the emit queue.
*
* @param preparedStatement PreparedStatement to execute the query and fetch results
* @return the number of records transferred to the emit queue
*/
protected int insertDbDataInQueue(PreparedStatement preparedStatement) throws SQLException, InterruptedException
{
int resultCount = 0;
preparedStatement.setFetchSize(getFetchSize());
// try-with-resources closes the statement and result set even if the loop exits exceptionally
try (PreparedStatement stmt = preparedStatement; ResultSet result = stmt.executeQuery()) {
while (execute && result.next()) {
T obj = getTuple(result);
if (obj == null) {
continue;
}
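// back-pressure: wait until the bounded emit queue has space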
while (execute && !emitQueue.offer(obj)) {
Thread.sleep(DEFAULT_SLEEP_TIME);
}
if (isPollerPartition && rebaseOffset) {
if (prevKey == null) {
prevKey = extractKey(result);
} else if (this.fetchedKeyAndOffset.get() == null) {
// track key change
Object nextKey = extractKey(result);
if (!nextKey.equals(prevKey)) {
// new key, ready for rebase (WHERE key > ?)
fetchedKeyAndOffset.set(new MutablePair<>(prevKey, lastOffset + resultCount));
}
}
}
resultCount++;
}
}
return resultCount;
}
/**
* Fetch results from JDBC and transfer to queue.
*/
protected void pollRecords()
{
try {
if (isPollerPartition) {
if (adjustKeyAndOffset.get()) {
LOG.debug("lastOffset {} lastKey {} rebase {}", lastOffset, lastKey, fetchedKeyAndOffset.get());
lastOffset -= fetchedKeyAndOffset.get().getRight();
lastKey = fetchedKeyAndOffset.get().getLeft();
prevKey = null;
fetchedKeyAndOffset.set(null);
adjustKeyAndOffset.set(false);
}
int count = getRecordsCount(lastKey);
LOG.debug("Poll count {}", count);
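// page through the yet-unread rows in chunks of at most resultLimit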
while (lastOffset < count) {
PreparedStatement preparedStatement = store.getConnection().prepareStatement(buildRangeQuery(lastKey, lastOffset, resultLimit),
TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
lastOffset += insertDbDataInQueue(preparedStatement);
}
} else {
insertDbDataInQueue(ps);
}
} catch (SQLException | InterruptedException ex) {
throw new RuntimeException(ex);
} finally {
if (!isPollerPartition) {
LOG.debug("fetched all records, marking complete.");
execute = false;
}
}
}
public abstract T getTuple(ResultSet result);
protected void replay(long windowId) throws SQLException
{
try {
WindowData wd = (WindowData)windowManager.retrieve(windowId);
if (wd != null && wd.upperBound - wd.lowerBound > 0) {
LOG.debug("[Recovering Window ID - {} for key: {} record range: {}, {}]", windowId, wd.key,
wd.lowerBound, wd.upperBound);
ps = store.getConnection().prepareStatement(
buildRangeQuery(wd.key, wd.lowerBound, (wd.upperBound - wd.lowerBound)), TYPE_FORWARD_ONLY,
CONCUR_READ_ONLY);
LOG.info("Query formed to recover data - {}", ps.toString());
emitReplayedTuples(ps);
}
if (currentWindowId == windowManager.getLargestCompletedWindow()) {
if (wd != null) {
// guard against missing recovery state to avoid a NullPointerException
currentWindowRecoveryState = WindowData.of(wd.key, wd.upperBound, wd.upperBound);
}
initializePreparedStatement();
schedulePollTask();
}
} catch (IOException e) {
throw new RuntimeException("Exception during replay of records.", e);
}
}
/**
* Replays the tuples synchronously for windows that are being recovered
*
* @param ps prepared statement for the recovery range query
*/
public void emitReplayedTuples(PreparedStatement ps)
{
try (PreparedStatement pStat = ps) {
pStat.setFetchSize(getFetchSize());
ResultSet rs = pStat.executeQuery();
if (rs == null || rs.isClosed()) {
return;
}
while (rs.next()) {
emitTuple(getTuple(rs));
lastEmittedRow++;
}
} catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
/**
* Uses a static partitioning scheme to initialize operator partitions with
* non-overlapping key ranges to read. In addition to the 'n' static
* partitions, an (n+1)th partition is a polling partition which reads the
* records beyond the given ranges
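* <p>
* For example (numbers illustrative): with a row count of 100 and a
* partitionCount of 2, the static partitions receive the ranges (0, 50) and
* (50, 100), and the poller partition starts at offset 100 with an open
* upper bound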
*/
@Override
public Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> definePartitions(
Collection<Partition<AbstractJdbcPollInputOperator<T>>> partitions, PartitioningContext context)
{
List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<>(
getPartitionCount());
final List<KeyValPair<Integer, Integer>> partitionRanges;
try {
store.connect();
dslContext = createDSLContext();
partitionRanges = getPartitionedQueryRanges(getPartitionCount());
} catch (SQLException e) {
LOG.error("Exception in initializing the partition range", e);
throw new RuntimeException(e);
} finally {
store.disconnect();
}
KryoCloneUtils<AbstractJdbcPollInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
int pollOffset = 0;
// The n given partitions are for range queries and the (n+1)th partition is for the polling query
for (KeyValPair<Integer, Integer> range : partitionRanges) {
AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone();
jdbcPoller.rangeQueryPair = range;
jdbcPoller.lastEmittedRow = range.getKey();
jdbcPoller.isPollerPartition = false;
newPartitions.add(new DefaultPartition<>(jdbcPoller));
pollOffset = range.getValue();
}
// The upper bound for the (n+1)th partition is set to null since it's a pollable partition
AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone();
jdbcPoller.rangeQueryPair = new KeyValPair<>(pollOffset, null);
jdbcPoller.lastEmittedRow = pollOffset;
jdbcPoller.isPollerPartition = true;
newPartitions.add(new DefaultPartition<>(jdbcPoller));
return newPartitions;
}
@Override
public void partitioned(
Map<Integer, com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> partitions)
{
// Nothing to implement here
}
private List<KeyValPair<Integer, Integer>> getPartitionedQueryRanges(int partitions)
throws SQLException
{
if (partitions == 0) {
return new ArrayList<>(0);
}
int rowCount = 0;
try {
rowCount = getRecordsCount(null);
} catch (SQLException e) {
LOG.error("Exception in getting the record range", e);
}
List<KeyValPair<Integer, Integer>> partitionToQueryList = new ArrayList<>();
int events = (rowCount / partitions);
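// e.g. rowCount = 100, partitions = 3 -> events = 33, ranges (0,33), (33,66), (66,100)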
for (int i = 0, lowerOffset = 0, upperOffset = events; i < partitions - 1; i++, lowerOffset += events, upperOffset += events) {
partitionToQueryList.add(new KeyValPair<>(lowerOffset, upperOffset));
}
partitionToQueryList.add(new KeyValPair<>(events * (partitions - 1), rowCount));
LOG.info("Partition ranges - {}", partitionToQueryList);
return partitionToQueryList;
}
protected Condition andLowerBoundKeyCondition(Condition c, Object lowerBoundKey)
{
return c.and(this.key + " > ?", lowerBoundKey);
}
/**
* Finds the number of rows in the table that match the configured where
* condition and, for the poller partition, lie beyond the given lower bound key
*
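* The generated count query is roughly of the following shape (illustrative,
* dialect dependent):
* <pre>{@code select count(*) from <table> where (<whereCondition> and <key> > ?)}</pre>
*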
* @param lowerBoundKey exclusive lower bound for the key column (poller partition only, may be null)
* @return number of matching records
*/
private int getRecordsCount(Object lowerBoundKey) throws SQLException
{
Condition condition = DSL.trueCondition();
if (getWhereCondition() != null) {
condition = condition.and(getWhereCondition());
}
if (isPollerPartition && lowerBoundKey != null) {
condition = andLowerBoundKeyCondition(condition, lowerBoundKey);
}
int recordsCount = dslContext.select(DSL.count()).from(getTableName()).where(condition).fetchOne(0, int.class);
return recordsCount;
}
/**
* Helper function that returns a range query based on the bounds passed
*
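* The generated SQL is roughly of the following shape (illustrative; the exact
* syntax depends on the discovered dialect):
* <pre>{@code select <columns> from <table> where (<whereCondition> and <key> > <lastKey>)
* order by <key> limit <limit> offset <offset>}</pre>
*
* @param lowerBoundKey exclusive lower bound for the key column (poller partition only, may be null)
* @param offset row offset at which the scan starts
* @param limit maximum number of rows to fetch
* @return SQL query string with inlined parameters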
*/
protected String buildRangeQuery(Object lowerBoundKey, int offset, int limit)
{
Condition condition = DSL.trueCondition();
if (getWhereCondition() != null) {
condition = condition.and(getWhereCondition());
}
if (isPollerPartition && lowerBoundKey != null) {
condition = andLowerBoundKeyCondition(condition, lowerBoundKey);
}
String sqlQuery;
if (getColumnsExpression() != null) {
Collection<Field<?>> columns = new ArrayList<>();
for (String column : getColumnsExpression().split(",")) {
columns.add(field(column));
}
sqlQuery = dslContext.select(columns).from(getTableName()).where(condition)
.orderBy(field(getKey())).limit(limit).offset(offset).getSQL(ParamType.INLINED);
} else {
sqlQuery = dslContext.select().from(getTableName()).where(condition).orderBy(field(getKey())).limit(limit)
.offset(offset).getSQL(ParamType.INLINED);
}
LOG.info("DSL Query: {}", sqlQuery);
return sqlQuery;
}
@VisibleForTesting
protected void setScheduledExecutorService(ScheduledExecutorService service)
{
scanService = service;
}
/**
* Gets {@link WindowDataManager}
*
* @return windowManager
*/
public WindowDataManager getWindowManager()
{
return windowManager;
}
/**
* Sets {@link WindowDataManager}
*
* @param windowDataManager
*/
public void setWindowManager(WindowDataManager windowDataManager)
{
this.windowManager = windowDataManager;
}
/**
* Gets non-polling static partitions count
*
* @return partitionCount
*/
public int getPartitionCount()
{
return partitionCount;
}
/**
* Sets non-polling static partitions count.<p>
* When set to 0, the operator will run in poll mode only.
*
* @param partitionCount
*/
public void setPartitionCount(int partitionCount)
{
this.partitionCount = partitionCount;
}
/**
* Returns the where clause
*
* @return whereCondition
*/
public String getWhereCondition()
{
return whereCondition;
}
/**
* Sets the where clause
*
* @param whereCondition
*/
public void setWhereCondition(String whereCondition)
{
this.whereCondition = whereCondition;
}
/**
* Returns the list of columns to select from the table
*
* @return columnsExpression
*/
public String getColumnsExpression()
{
return columnsExpression;
}
/**
* Comma separated list of columns to select from the given table
*
* @param columnsExpression
*/
public void setColumnsExpression(String columnsExpression)
{
this.columnsExpression = columnsExpression;
}
/**
* Returns the fetch size used for fetching the results
*
* @return fetchSize
*/
public int getFetchSize()
{
return fetchSize;
}
/**
* Sets the fetch size used for fetching the results
*
* @param fetchSize
*/
public void setFetchSize(int fetchSize)
{
this.fetchSize = fetchSize;
}
/**
* Returns the interval for polling the DB
*
* @return pollInterval
*/
public int getPollInterval()
{
return pollInterval;
}
/**
* Sets the interval for polling the DB
*
* @param pollInterval
*/
public void setPollInterval(int pollInterval)
{
this.pollInterval = pollInterval;
}
/**
* Returns the capacity of the emit queue
*
* @return queueCapacity
*/
public int getQueueCapacity()
{
return queueCapacity;
}
/**
* Sets the capacity of the emit queue
*
* @param queueCapacity
*/
public void setQueueCapacity(int queueCapacity)
{
this.queueCapacity = queueCapacity;
}
/**
* Returns the tableName which would be queried
*
* @return tableName
*/
public String getTableName()
{
return tableName;
}
/**
* Sets the tableName to query
*
* @param tableName
*/
public void setTableName(String tableName)
{
this.tableName = tableName;
}
/**
* Returns batchSize indicating the number of elements to emit in a batch
*
* @return batchSize
*/
public int getBatchSize()
{
return batchSize;
}
/**
* Sets batchSize, the number of elements to emit in a batch
*
* @param batchSize
*/
public void setBatchSize(int batchSize)
{
this.batchSize = batchSize;
}
/**
* Gets the primary key column name
*
* @return key
*/
public String getKey()
{
return key;
}
/**
* Sets the primary key column name
*
* @param key
*/
public void setKey(String key)
{
this.key = key;
}
/**
* Gets the result limit, the parameter that limits the number of results
* fetched in one query by the poller partition.
*
* @return resultLimit
*/
public int getResultLimit()
{
return resultLimit;
}
/**
* Sets the result limit.
*
* @param resultLimit parameter to limit the number of results fetched in one query by the poller partition
*/
public void setResultLimit(int resultLimit)
{
this.resultLimit = resultLimit;
}
public boolean isRebaseOffset()
{
return rebaseOffset;
}
/**
* Whether the query should automatically be augmented with a WHERE
* condition on a trailing lower bound key value.
* <p>
* Rebase allows the operator to poll from tables where old data is
* periodically purged. Without it, the default zero based row offset would
* lead to missed data. The trailing floor is also more efficient when working
* with key partitioned sources as the query planner can skip those partitions
* below the base key.
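* <p>
* For example (illustrative): once all rows with key values up to 1000 have
* been emitted, subsequent poll queries take the form
* {@code ... where <key> > 1000 order by <key> limit <limit> offset <offset>},
* with the offset counted from that key rather than from the start of the table.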
*/
public void setRebaseOffset(boolean rebaseOffset)
{
this.rebaseOffset = rebaseOffset;
}
}