/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.db.jdbc;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.SelectField;
import org.jooq.conf.ParamType;
import org.jooq.impl.DSL;
import org.jooq.tools.jdbc.JDBCUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import com.google.common.annotations.VisibleForTesting;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.lib.db.AbstractStoreInputOperator;
import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.datatorrent.netlet.util.DTThrowable;
import static java.sql.ResultSet.CONCUR_READ_ONLY;
import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
import static org.jooq.impl.DSL.field;
/**
 * Abstract operator for consuming data using the JDBC interface<br>
 * The user needs to provide a tableName, dbConnection, columnsExpression and a look-up key<br>
 * Optionally a batchSize, pollInterval and a where clause can be given<br>
 * This operator uses static partitioning to derive range queries that allow
 * idempotent reads<br>
 * It creates a configured number of non-polling static partitions for fetching
 * the existing data in the table, and an additional single partition for
 * polling newly added data. The assumption is that there is an ordered, unique
 * column on which range queries can be formed<br>
 *
 * Only newly added data is fetched by the polling JDBC partition; a further
 * assumption is that rows are not added or deleted in the middle of a scan.
*
*
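 * <p>
 * A minimal concrete subclass only needs to map a result set row to a tuple and
 * emit it on the {@code outputPort} inherited from {@link AbstractStoreInputOperator}.
 * Illustrative sketch; {@code MyPojo} and the column names are hypothetical:
 * <pre>{@code
 * public class MyPojoPollInputOperator extends AbstractJdbcPollInputOperator<MyPojo>
 * {
 *   @Override
 *   public MyPojo getTuple(ResultSet rs)
 *   {
 *     MyPojo pojo = new MyPojo();
 *     try {
 *       pojo.setId(rs.getInt("id"));
 *       pojo.setName(rs.getString("name"));
 *     } catch (SQLException e) {
 *       throw new RuntimeException(e);
 *     }
 *     return pojo;
 *   }
 *
 *   @Override
 *   protected void emitTuple(MyPojo tuple)
 *   {
 *     outputPort.emit(tuple);
 *   }
 * }
 * }</pre>
 *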
* @displayName Jdbc Polling Input Operator
* @category Input
 * @tags database, sql, jdbc, partitionable, idempotent, pollable
*
* @since 3.5.0
*/
@Evolving
@OperatorAnnotation(checkpointableWithinAppWindow = false)
public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore> implements
ActivationListener<OperatorContext>, Partitioner<AbstractJdbcPollInputOperator<T>>
{
  private static final int DEFAULT_QUEUE_CAPACITY = 4 * 1024;
  private static final int DEFAULT_POLL_INTERVAL = 10 * 1000;
  private static final int DEFAULT_FETCH_SIZE = 20000;
  private static final int DEFAULT_BATCH_SIZE = 2000;
  private static final int DEFAULT_SLEEP_TIME = 100;
  private int pollInterval = DEFAULT_POLL_INTERVAL; // in milliseconds
private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
private int fetchSize = DEFAULT_FETCH_SIZE;
@Min(1)
private int partitionCount = 1;
private int batchSize = DEFAULT_BATCH_SIZE;
@NotNull
private String tableName;
@NotNull
private String columnsExpression;
@NotNull
private String key;
private String whereCondition = null;
private long currentWindowId;
private WindowDataManager windowManager;
protected KeyValPair<Integer, Integer> rangeQueryPair;
protected Integer lowerBound;
protected Integer lastEmittedRow;
private transient int operatorId;
private transient DSLContext create;
private transient volatile boolean execute;
private transient ScheduledExecutorService scanService;
private transient AtomicReference<Throwable> threadException;
protected transient boolean isPolled;
protected transient Integer lastPolledRow;
protected transient LinkedBlockingDeque<T> emitQueue;
protected transient PreparedStatement ps;
protected boolean isPollerPartition;
protected transient MutablePair<Integer, Integer> currentWindowRecoveryState;
public AbstractJdbcPollInputOperator()
{
currentWindowRecoveryState = new MutablePair<>();
windowManager = new FSWindowDataManager();
}
@Override
public void setup(OperatorContext context)
{
super.setup(context);
    initializeDSLContext();
if (scanService == null) {
scanService = Executors.newScheduledThreadPool(1);
}
execute = true;
emitQueue = new LinkedBlockingDeque<>(queueCapacity);
operatorId = context.getId();
windowManager.setup(context);
}
  private void initializeDSLContext()
{
create = DSL.using(store.getConnection(), JDBCUtils.dialect(store.getDatabaseUrl()));
}
@Override
public void activate(OperatorContext context)
{
initializePreparedStatement();
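    // Schedule the poll thread immediately only when there is no saved state to
    // replay; otherwise replay() schedules it once the replayed windows catch up.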
long largestRecoveryWindow = windowManager.getLargestCompletedWindow();
if (largestRecoveryWindow == Stateless.WINDOW_ID
|| context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) {
scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
}
}
protected void initializePreparedStatement()
{
try {
      // If it is a range query, pass the upper and lower bounds; if it is a polling query, pass only the lower bound
if (isPollerPartition) {
ps = store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(), Integer.MAX_VALUE),
TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
} else {
ps = store.getConnection().prepareStatement(
buildRangeQuery(rangeQueryPair.getKey(), (rangeQueryPair.getValue() - rangeQueryPair.getKey())),
TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
}
} catch (SQLException e) {
LOG.error("Exception in initializing the range query for a given partition", e);
throw new RuntimeException(e);
}
}
@Override
public void beginWindow(long windowId)
{
currentWindowId = windowId;
if (currentWindowId <= windowManager.getLargestCompletedWindow()) {
try {
replay(currentWindowId);
return;
} catch (SQLException e) {
LOG.error("Exception in replayed windows", e);
throw new RuntimeException(e);
}
}
if (isPollerPartition) {
updatePollQuery();
isPolled = false;
}
lowerBound = lastEmittedRow;
}
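  /**
   * Rebuilds the poller partition's prepared statement so that the next poll
   * starts after the last emitted row; before anything has been emitted it
   * starts from the partition's lower bound.
   */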
private void updatePollQuery()
{
if ((lastPolledRow != lastEmittedRow)) {
if (lastEmittedRow == null) {
lastPolledRow = rangeQueryPair.getKey();
} else {
lastPolledRow = lastEmittedRow;
}
try {
ps = store.getConnection().prepareStatement(buildRangeQuery(lastPolledRow, Integer.MAX_VALUE),
TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
@Override
public void emitTuples()
{
if (currentWindowId <= windowManager.getLargestCompletedWindow()) {
return;
}
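    // Drain at most batchSize tuples buffered by the poller thread.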
    int pollSize = Math.min(emitQueue.size(), batchSize);
while (pollSize-- > 0) {
T obj = emitQueue.poll();
if (obj != null) {
emitTuple(obj);
}
lastEmittedRow++;
}
}
protected abstract void emitTuple(T tuple);
@Override
public void endWindow()
{
try {
if (currentWindowId > windowManager.getLargestCompletedWindow()) {
currentWindowRecoveryState = new MutablePair<>(lowerBound, lastEmittedRow);
windowManager.save(currentWindowRecoveryState, currentWindowId);
}
} catch (IOException e) {
throw new RuntimeException("saving recovery", e);
}
if (threadException != null) {
store.disconnect();
DTThrowable.rethrow(threadException.get());
}
}
@Override
public void deactivate()
{
scanService.shutdownNow();
store.disconnect();
}
protected void pollRecords()
{
if (isPolled) {
return;
}
try {
ps.setFetchSize(getFetchSize());
ResultSet result = ps.executeQuery();
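      // Stream the result set into the bounded emit queue, sleeping briefly
      // whenever the queue is full so the emitting thread can catch up.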
if (result.next()) {
do {
while (!emitQueue.offer(getTuple(result))) {
Thread.sleep(DEFAULT_SLEEP_TIME);
}
} while (result.next());
}
isPolled = true;
} catch (SQLException ex) {
execute = false;
threadException = new AtomicReference<Throwable>(ex);
} catch (InterruptedException e) {
threadException = new AtomicReference<Throwable>(e);
} finally {
if (!isPollerPartition) {
store.disconnect();
}
}
}
public abstract T getTuple(ResultSet result);
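  /**
   * Replays the saved (lowerBound, lastEmittedRow) range for a window that was
   * completed before a failure. Once the replayed windows catch up to the
   * largest completed window, the prepared statement is rebuilt from the last
   * emitted row and the poll thread is scheduled again.
   *
   * @param windowId id of the window to be replayed
   */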
protected void replay(long windowId) throws SQLException
{
try {
@SuppressWarnings("unchecked")
MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer, Integer>)windowManager.retrieve(windowId);
if (recoveredData != null && shouldReplayWindow(recoveredData)) {
LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", windowId, recoveredData.left,
recoveredData.right);
ps = store.getConnection().prepareStatement(
buildRangeQuery(recoveredData.left, (recoveredData.right - recoveredData.left)), TYPE_FORWARD_ONLY,
CONCUR_READ_ONLY);
LOG.info("Query formed to recover data - {}", ps.toString());
emitReplayedTuples(ps);
}
if (currentWindowId == windowManager.getLargestCompletedWindow()) {
try {
if (!isPollerPartition && rangeQueryPair.getValue() != null) {
ps = store.getConnection().prepareStatement(
buildRangeQuery(lastEmittedRow, (rangeQueryPair.getValue() - lastEmittedRow)), TYPE_FORWARD_ONLY,
CONCUR_READ_ONLY);
} else {
Integer bound = null;
if (lastEmittedRow == null) {
bound = rangeQueryPair.getKey();
} else {
bound = lastEmittedRow;
}
ps = store.getConnection().prepareStatement(buildRangeQuery(bound, Integer.MAX_VALUE), TYPE_FORWARD_ONLY,
CONCUR_READ_ONLY);
}
scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
} catch (IOException e) {
throw new RuntimeException("Exception during replay of records.", e);
}
}
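  /**
   * A recovered window is replayed only when both saved bounds are present and
   * its upper bound is neither the partition's upper bound nor the last emitted
   * row, i.e. the range has not already been emitted.
   */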
private boolean shouldReplayWindow(MutablePair<Integer, Integer> recoveredData)
{
if (recoveredData.left == null || recoveredData.right == null) {
return false;
}
if (recoveredData.right.equals(rangeQueryPair.getValue()) || recoveredData.right.equals(lastEmittedRow)) {
return false;
}
return true;
}
/**
   * Synchronously re-emits the tuples for a window that is being replayed
*/
public void emitReplayedTuples(PreparedStatement ps)
{
ResultSet rs = null;
    try (PreparedStatement pStat = ps) {
pStat.setFetchSize(getFetchSize());
rs = pStat.executeQuery();
if (rs == null || rs.isClosed()) {
return;
}
while (rs.next()) {
emitTuple(getTuple(rs));
lastEmittedRow++;
}
} catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
/**
   * Uses a static partitioning scheme to initialize operator partitions with
   * non-overlapping key ranges to read. In addition to the 'n' static
   * partitions, an (n+1)th polling partition reads the records beyond the given
   * range
*/
@Override
public Collection<com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> definePartitions(
Collection<Partition<AbstractJdbcPollInputOperator<T>>> partitions, PartitioningContext context)
{
List<Partition<AbstractJdbcPollInputOperator<T>>> newPartitions = new ArrayList<Partition<AbstractJdbcPollInputOperator<T>>>(
getPartitionCount());
HashMap<Integer, KeyValPair<Integer, Integer>> partitionToRangeMap = null;
try {
store.connect();
      initializeDSLContext();
partitionToRangeMap = getPartitionedQueryRangeMap(getPartitionCount());
} catch (SQLException e) {
LOG.error("Exception in initializing the partition range", e);
throw new RuntimeException(e);
} finally {
store.disconnect();
}
KryoCloneUtils<AbstractJdbcPollInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
    // The first n partitions run range queries; the (n + 1)th partition runs the polling query
for (int i = 0; i <= getPartitionCount(); i++) {
AbstractJdbcPollInputOperator<T> jdbcPoller = cloneUtils.getClone();
if (i < getPartitionCount()) {
jdbcPoller.rangeQueryPair = partitionToRangeMap.get(i);
jdbcPoller.lastEmittedRow = partitionToRangeMap.get(i).getKey();
jdbcPoller.isPollerPartition = false;
} else {
        // The upper bound for the (n + 1)th partition is set to null since it is the pollable partition
int partitionKey = partitionToRangeMap.get(i - 1).getValue();
jdbcPoller.rangeQueryPair = new KeyValPair<Integer, Integer>(partitionKey, null);
jdbcPoller.lastEmittedRow = partitionKey;
jdbcPoller.isPollerPartition = true;
}
newPartitions.add(new DefaultPartition<AbstractJdbcPollInputOperator<T>>(jdbcPoller));
}
return newPartitions;
}
@Override
public void partitioned(
Map<Integer, com.datatorrent.api.Partitioner.Partition<AbstractJdbcPollInputOperator<T>>> partitions)
{
// Nothing to implement here
}
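  /**
   * Splits the current row count evenly across the static partitions. For
   * example, 10 rows split across 3 partitions yields the offset ranges
   * (0, 3), (3, 6) and (6, 10); the last partition absorbs the remainder of
   * the integer division.
   */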
private HashMap<Integer, KeyValPair<Integer, Integer>> getPartitionedQueryRangeMap(int partitions)
throws SQLException
{
int rowCount = 0;
try {
rowCount = getRecordsCount();
} catch (SQLException e) {
LOG.error("Exception in getting the record range", e);
}
HashMap<Integer, KeyValPair<Integer, Integer>> partitionToQueryMap = new HashMap<>();
int events = (rowCount / partitions);
    for (int i = 0, lowerOffset = 0, upperOffset = events; i < partitions - 1;
        i++, lowerOffset += events, upperOffset += events) {
partitionToQueryMap.put(i, new KeyValPair<Integer, Integer>(lowerOffset, upperOffset));
}
    partitionToQueryMap.put(partitions - 1, new KeyValPair<Integer, Integer>(events * (partitions - 1), rowCount));
LOG.info("Partition map - " + partitionToQueryMap.toString());
return partitionToQueryMap;
}
/**
* Finds the total number of rows in the table
*
* @return number of records in table
*/
private int getRecordsCount() throws SQLException
{
Condition condition = DSL.trueCondition();
if (getWhereCondition() != null) {
condition = condition.and(getWhereCondition());
}
int recordsCount = create.select(DSL.count()).from(getTableName()).where(condition).fetchOne(0, int.class);
return recordsCount;
}
/**
   * Helper function that builds a range query from the offset and limit passed<br>
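   * <p>
   * For example, with {@code columnsExpression = "id,name"}, {@code key = "id"},
   * {@code offset = 0}, {@code limit = 1000} and no where clause, the generated
   * SQL has the shape (the exact rendering depends on the jOOQ dialect):
   * <pre>
   * select id, name from tableName order by id limit 1000 offset 0
   * </pre>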
*/
protected String buildRangeQuery(int offset, int limit)
{
Condition condition = DSL.trueCondition();
if (getWhereCondition() != null) {
condition = condition.and(getWhereCondition());
}
String sqlQuery;
if (getColumnsExpression() != null) {
Collection<Field<?>> columns = new ArrayList<>();
for (String column : getColumnsExpression().split(",")) {
columns.add(field(column));
}
sqlQuery = create.select((Collection<? extends SelectField<?>>)columns).from(getTableName()).where(condition)
.orderBy(field(getKey())).limit(limit).offset(offset).getSQL(ParamType.INLINED);
} else {
sqlQuery = create.select().from(getTableName()).where(condition).orderBy(field(getKey())).limit(limit)
.offset(offset).getSQL(ParamType.INLINED);
}
LOG.info("DSL Query: " + sqlQuery);
return sqlQuery;
}
/**
   * This class polls a store that can be queried with a JDBC interface. The
   * prepared statement is updated as more rows are read
*/
public class DBPoller implements Runnable
{
@Override
public void run()
{
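      // Loops for the lifetime of the operator: a static partition scans its
      // range once (pollRecords() is a no-op after isPolled is set), while the
      // poller partition re-polls each time beginWindow() resets isPolled.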
while (execute) {
if ((isPollerPartition && !isPolled) || !isPollerPartition) {
pollRecords();
}
}
}
}
@VisibleForTesting
protected void setScheduledExecutorService(ScheduledExecutorService service)
{
scanService = service;
}
/**
* Gets {@link WindowDataManager}
*
   * @return windowDataManager
*/
public WindowDataManager getWindowManager()
{
return windowManager;
}
/**
* Sets {@link WindowDataManager}
*
* @param windowDataManager
*/
public void setWindowManager(WindowDataManager windowDataManager)
{
this.windowManager = windowDataManager;
}
/**
* Gets non-polling static partitions count
*
* @return partitionCount
*/
public int getPartitionCount()
{
return partitionCount;
}
/**
* Sets non-polling static partitions count
*
* @param partitionCount
*/
public void setPartitionCount(int partitionCount)
{
this.partitionCount = partitionCount;
}
/**
* Returns the where clause
*
* @return whereCondition
*/
public String getWhereCondition()
{
return whereCondition;
}
/**
* Sets the where clause
*
* @param whereCondition
*/
public void setWhereCondition(String whereCondition)
{
this.whereCondition = whereCondition;
}
/**
* Returns the list of columns to select from the table
*
* @return columnsExpression
*/
public String getColumnsExpression()
{
return columnsExpression;
}
/**
   * Sets the comma-separated list of columns to select from the given table
*
* @param columnsExpression
*/
public void setColumnsExpression(String columnsExpression)
{
this.columnsExpression = columnsExpression;
}
/**
   * Returns the fetch size used when retrieving the results
*
* @return fetchSize
*/
public int getFetchSize()
{
return fetchSize;
}
/**
   * Sets the fetch size used when retrieving the results
*
* @param fetchSize
*/
public void setFetchSize(int fetchSize)
{
this.fetchSize = fetchSize;
}
/**
* Returns the interval for polling the DB
*
* @return pollInterval
*/
public int getPollInterval()
{
return pollInterval;
}
/**
* Sets the interval for polling the DB
*
* @param pollInterval
*/
public void setPollInterval(int pollInterval)
{
this.pollInterval = pollInterval;
}
/**
* Returns the capacity of the emit queue
*
* @return queueCapacity
*/
public int getQueueCapacity()
{
return queueCapacity;
}
/**
* Sets the capacity of the emit queue
*
* @param queueCapacity
*/
public void setQueueCapacity(int queueCapacity)
{
this.queueCapacity = queueCapacity;
}
/**
   * Returns the tableName to be queried
*
* @return tableName
*/
public String getTableName()
{
return tableName;
}
/**
* Sets the tableName to query
*
* @param tableName
*/
public void setTableName(String tableName)
{
this.tableName = tableName;
}
/**
   * Returns batchSize, the number of elements to emit in a batch
*
* @return batchSize
*/
public int getBatchSize()
{
return batchSize;
}
/**
   * Sets batchSize, the number of elements to emit in a batch
*
* @param batchSize
*/
public void setBatchSize(int batchSize)
{
this.batchSize = batchSize;
}
/**
   * Gets the primary key column name
*
* @return key
*/
public String getKey()
{
return key;
}
/**
   * Sets the primary key column name
*
* @param key
*/
public void setKey(String key)
{
this.key = key;
}
private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcPollInputOperator.class);
}