blob: df22e6f4a4189603fe4cd7bfdf568573f85da74e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.db.jdbc;
import java.sql.BatchUpdateException;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import javax.validation.constraints.Min;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;
/**
* This is the base class implementation of a transactionable JDBC output operator. 
* Subclasses should implement the method which provides the insertion command.
* <p>
* This operator creates a transaction at the start of window, executes batches of sql updates,
* and closes the transaction at the end of the window. Each tuple corresponds to an SQL update statement.
* The operator groups the updates in a batch and submits them with one call to the database. Batch processing
* improves performance considerably.<br/>
* The size of a batch is configured by batchSize property.
* </p>
* <p>
* The tuples in a window are stored in check-pointed collection which is cleared in the endWindow().
* This is needed for the recovery. The operator writes a tuple exactly once in the database, which is why
* only when all the updates are executed, the transaction is committed in the end window call.
* </p>
* @displayName Abstract JDBC Transactionable Output
* @category Output
* @tags transactional
*
* @param <T> type of tuple
* @since 0.9.4
*/
public abstract class AbstractJdbcTransactionableOutputOperator<T>
extends AbstractPassThruTransactionableStoreOutputOperator<T, JdbcTransactionalStore>
implements Operator.ActivationListener<Context.OperatorContext>
{
protected static int DEFAULT_BATCH_SIZE = 1000;
@Min(1)
private int batchSize;
protected final List<T> tuples;
protected transient int batchStartIdx;
private transient PreparedStatement updateCommand;
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<T> error = new DefaultOutputPort<>();
@AutoMetric
private int tuplesWrittenSuccessfully;
@AutoMetric
private int errorTuples;
public AbstractJdbcTransactionableOutputOperator()
{
tuples = Lists.newArrayList();
batchSize = DEFAULT_BATCH_SIZE;
batchStartIdx = 0;
store = new JdbcTransactionalStore();
}
@Override
public void setup(Context.OperatorContext context)
{
super.setup(context);
}
@Override
public void activate(OperatorContext context)
{
try {
updateCommand = store.connection.prepareStatement(getUpdateCommand());
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public void beginWindow(long windowId)
{
super.beginWindow(windowId);
tuplesWrittenSuccessfully = 0;
errorTuples = 0;
}
@Override
public void endWindow()
{
if (tuples.size() - batchStartIdx > 0) {
processBatch();
}
super.endWindow();
tuples.clear();
batchStartIdx = 0;
}
@Override
public void deactivate()
{
}
@Override
public void processTuple(T tuple)
{
tuples.add(tuple);
if ((tuples.size() - batchStartIdx) >= batchSize) {
processBatch();
}
}
protected void processBatch()
{
logger.debug("start {} end {}", batchStartIdx, tuples.size());
try {
for (int i = batchStartIdx; i < tuples.size(); i++) {
setStatementParameters(updateCommand, tuples.get(i));
updateCommand.addBatch();
}
updateCommand.executeBatch();
updateCommand.clearBatch();
batchStartIdx += tuples.size() - batchStartIdx;
} catch (BatchUpdateException bue) {
logger.error(bue.getMessage());
processUpdateCounts(bue.getUpdateCounts(), tuples.size() - batchStartIdx);
} catch (SQLException e) {
throw new RuntimeException("processing batch", e);
}
}
/**
* Identify which commands in the batch failed and redirect these on the error port.
* See https://docs.oracle.com/javase/7/docs/api/java/sql/BatchUpdateException.html for more details
*
* @param updateCounts
* @param commandsInBatch
*/
protected void processUpdateCounts(int[] updateCounts, int commandsInBatch)
{
if (updateCounts.length < commandsInBatch) {
// Driver chose not to continue processing after failure.
error.emit(tuples.get(updateCounts.length + batchStartIdx));
errorTuples++;
// In this case, updateCounts is the number of successful queries
tuplesWrittenSuccessfully += updateCounts.length;
// Skip the error record
batchStartIdx += updateCounts.length + 1;
// And process the remaining if any
if ((tuples.size() - batchStartIdx) > 0) {
processBatch();
}
} else {
// Driver processed all batch statements in spite of failures.
// Pick out the failures and send on error port.
tuplesWrittenSuccessfully = commandsInBatch;
for (int i = 0; i < commandsInBatch; i++) {
if (updateCounts[i] == Statement.EXECUTE_FAILED) {
error.emit(tuples.get(i + batchStartIdx));
errorTuples++;
tuplesWrittenSuccessfully--;
}
}
}
}
/**
* Sets the size of a batch operation.<br/>
* <b>Default:</b> {@value #DEFAULT_BATCH_SIZE}
*
* @param batchSize size of a batch
*/
public void setBatchSize(int batchSize)
{
this.batchSize = batchSize;
}
/**
* Gets the statement which insert/update the table in the database.
*
* @return the sql statement to update a tuple in the database.
*/
protected abstract String getUpdateCommand();
/**
* Sets the parameter of the insert/update statement with values from the tuple.
*
* @param statement update statement which was returned by {@link #getUpdateCommand()}
* @param tuple tuple
* @throws SQLException
*/
protected abstract void setStatementParameters(PreparedStatement statement, T tuple) throws SQLException;
public int getTuplesWrittenSuccessfully()
{
return tuplesWrittenSuccessfully;
}
/**
* Setter for metric tuplesWrittenSuccessfully
* @param tuplesWrittenSuccessfully
*/
public void setTuplesWrittenSuccessfully(int tuplesWrittenSuccessfully)
{
this.tuplesWrittenSuccessfully = tuplesWrittenSuccessfully;
}
private static final Logger logger = LoggerFactory.getLogger(AbstractJdbcTransactionableOutputOperator.class);
}