blob: 2aa39e0fe4151a59deef7638dd8ece8f0a366495 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.sql.SqlWriter;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.db.JdbcCommon;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Base class for processors that execute a SELECT against a database table and write the
 * result set into one or more FlowFiles.
 * <p>
 * The query is built by {@code getQuery}, the rows are serialized by the {@link SqlWriter}
 * that subclasses provide through {@code configureSqlWriter}, and the largest values observed
 * for the configured max-value columns are kept in cluster-scoped processor state so that a
 * later run can add a WHERE condition for them.
 */
public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchProcessor {
// Name of the FlowFile attribute that onTrigger sets to the queried table name.
public static final String RESULT_TABLENAME = "tablename";
// Name of the FlowFile attribute that onTrigger sets to the number of rows written to that FlowFile.
public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
// Selectable transaction isolation levels, one AllowableValue per level.
// NOTE(review): the constructor arguments of these five declarations are not visible in this view,
// so the stored value and display name of each entry cannot be confirmed from here.
// NOTE(review): these are named like constants but are not declared final - confirm whether that is intended.
private static AllowableValue TRANSACTION_READ_COMMITTED = new AllowableValue(
private static AllowableValue TRANSACTION_READ_UNCOMMITTED = new AllowableValue(
private static AllowableValue TRANSACTION_REPEATABLE_READ = new AllowableValue(
private static AllowableValue TRANSACTION_NONE = new AllowableValue(
private static AllowableValue TRANSACTION_SERIALIZABLE = new AllowableValue(
/**
 * Property holding the JDBC fetch-size hint. onTrigger reads it as an Integer and only
 * attempts to apply it when the value is non-null and greater than zero.
 */
// NOTE(review): the rest of the builder chain (default value, validators, build call) is not visible in this view.
public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
.name("Fetch Size")
.description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
+ "honored and/or exact. If the value specified is zero, then the hint is ignored.")
/**
 * Property limiting how many result rows go into a single FlowFile. onTrigger treats a value
 * greater than zero as "split the result set into fragments" and adds fragment attributes.
 */
// NOTE(review): only displayName and description are visible here; the name, default value,
// validators and build call of this descriptor are not visible in this view.
public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
.displayName("Max Rows Per Flow File")
.description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
+ "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
/**
 * Property controlling how many finished FlowFiles are accumulated before they are transferred
 * to the success relationship. onTrigger maps an unset (null) value to 0, and only adds the
 * maxvalue.* and fragment count attributes when the effective value is 0.
 */
// NOTE(review): only displayName and description are visible here; the name, default value,
// validators and build call of this descriptor are not visible in this view.
public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
.displayName("Output Batch Size")
.description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
+ "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
+ "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
+ "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The maxvalue.* and fragment.count attributes will not be set on FlowFiles when this "
+ "property is set.")
/**
 * Property capping the number of fragments (FlowFiles) produced by one execution.
 * onTrigger uses 0 when the property is not set.
 */
// NOTE(review): only displayName and description are visible here; the name, default value,
// validators and build call of this descriptor are not visible in this view.
public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder()
.displayName("Maximum Number of Fragments")
.description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " +
"This prevents OutOfMemoryError when this processor ingests huge table. NOTE: Setting this property can result in data loss, as the incoming results are "
+ "not ordered, and fragments may end at arbitrary boundaries where rows are not included in the result set.")
/**
 * Property selecting the transaction isolation level for the database connection.
 * onTrigger reads it as an Integer and uses null when the property is not set.
 */
// NOTE(review): only displayName and description are visible here; the name, allowable values
// and build call of this descriptor are not visible in this view.
public static final PropertyDescriptor TRANS_ISOLATION_LEVEL = new PropertyDescriptor.Builder()
.displayName("Transaction Isolation Level")
.description("This setting will set the transaction isolation level for the database connection for drivers that support this setting")
/**
 * Returns the relationships exposed by this processor.
 *
 * @return the {@code relationships} field as-is (no copy is made here); the field is
 *         declared outside this view
 */
public Set<Relationship> getRelationships() {
return relationships;
/**
 * Returns the property descriptors supported by this processor.
 *
 * @return the {@code propDescriptors} field as-is (no copy is made here); the field is
 *         declared outside this view
 */
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propDescriptors;
/**
 * Builds the descriptor used for a user-defined (dynamic) property.
 * The visible part of the builder attaches an attribute-expression-language validator
 * whose expected result type is STRING.
 *
 * @param propertyDescriptorName name of the dynamic property being described
 * @return a newly built descriptor
 */
// NOTE(review): the remaining builder calls (for example the one that would apply
// propertyDescriptorName, and the final build call) are not visible in this view.
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
/**
 * Caches the configured initial max-value properties in {@code maxValueProperties};
 * onTrigger later uses them to seed state entries that do not exist yet.
 *
 * @param context the process context the properties are read from
 */
// NOTE(review): OnScheduled is imported by this file but no annotation is visible on this
// method in this view - confirm that the lifecycle annotation is present.
public void setup(final ProcessContext context) {
maxValueProperties = getDefaultMaxValueProperties(context, null);
/**
 * Lifecycle hook that, according to the comment in its body, resets cached column type
 * information so that property changes take effect.
 */
// NOTE(review): the statement that performs the reset is not visible in this view, and
// OnStopped is imported but no annotation is visible on this method - confirm both.
public void stop() {
// Reset the column type map in case properties change
/**
 * Runs one fetch against the configured table or custom query.
 * <p>
 * Visible steps: create a session, read the processor properties, load the previously observed
 * maximum values from cluster-scoped state, seed missing entries from the configured initial
 * max values, build the SELECT via {@code getQuery}, execute it, and write the result set into
 * FlowFiles through the configured {@link SqlWriter}. Each FlowFile that received rows gets the
 * row count and table name attributes (plus fragment attributes when a per-FlowFile row limit
 * is set) and a provenance RECEIVE event. When no output batch size is in effect, the maxvalue.*
 * attributes are added to every FlowFile after the loop. The FlowFiles are transferred to
 * REL_SUCCESS and the state map is written back in the finally block.
 * <p>
 * NOTE(review): several statements of this method are not visible in this view (for example the
 * bodies at the guards marked below). The inline notes mark those places; confirm them against
 * the complete file before changing behavior here.
 *
 * @param context        gives access to the processor properties
 * @param sessionFactory used to create the single session this invocation works with
 * @throws ProcessException declared by the signature; a ProcessException or SQLException raised
 *                          while running the query is caught and logged inside this method
 */
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
// Fetch the column/table info once
// NOTE(review): the body of this guard is not visible in this view.
if (!setupComplete.get()) {
ProcessSession session = sessionFactory.createSession();
// FlowFiles produced by this invocation that have not been transferred yet
final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
final ComponentLog logger = getLogger();
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
// NOTE(review): maxRowsPerFlowFile is an Integer that is auto-unboxed in the comparisons below;
// confirm the property always yields a value (e.g. through a default), otherwise this can throw a NullPointerException.
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
// An unset batch size is treated as 0, i.e. no intermediate transfers
final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
: 0;
final Integer transIsolationLevel = context.getProperty(TRANS_ISOLATION_LEVEL).isSet()
? context.getProperty(TRANS_ISOLATION_LEVEL).asInteger()
: null;
SqlWriter sqlWriter = configureSqlWriter(session, context);
final StateMap stateMap;
try {
stateMap = session.getState(Scope.CLUSTER);
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
+ "query until this is accomplished.", ioe);
// NOTE(review): stateMap is final and unassigned on this path, so the handler has to leave the
// method before the code below reads it; that exit is not visible in this view.
// Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
// set as the current state map (after the session has been committed)
final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
//If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map
for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) {
String maxPropKey = maxProp.getKey().toLowerCase();
String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey, dbAdapter);
if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
String newMaxPropValue;
// If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
// the value has been stored under a key that is only the column name. Fall back to check the column name,
// but store the new initial max value under the fully-qualified key.
if (statePropertyMap.containsKey(maxPropKey)) {
newMaxPropValue = statePropertyMap.get(maxPropKey);
} else {
newMaxPropValue = maxProp.getValue();
statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue);
// Split the comma-separated max-value column list, tolerating whitespace around the commas; null when none are configured
List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
? null
: Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
final String selectQuery = getQuery(dbAdapter, tableName, sqlQuery, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
// Started here so the provenance event below reports the time elapsed since the query was prepared
final StopWatch stopWatch = new StopWatch(true);
// One identifier shared by every fragment produced by this invocation
final String fragmentIdentifier = UUID.randomUUID().toString();
// Connection and statement are closed automatically by try-with-resources
try (final Connection con = dbcpService.getConnection(Collections.emptyMap());
final Statement st = con.createStatement()) {
if (fetchSize != null && fetchSize > 0) {
// NOTE(review): the statement that applies fetchSize is not visible in this view.
try {
} catch (SQLException se) {
// Not all drivers support this, just log the error (at debug level) and move on
logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
// NOTE(review): the body of this guard is not visible in this view.
if (transIsolationLevel != null) {
// Fallback transit URI for the provenance event when the driver cannot report its URL
String jdbcURL = "DBCPService";
try {
DatabaseMetaData databaseMetaData = con.getMetaData();
if (databaseMetaData != null) {
jdbcURL = databaseMetaData.getURL();
} catch (SQLException se) {
// Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds
if (logger.isDebugEnabled()) {
logger.debug("Executing query {}", new Object[] { selectQuery });
try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
// Counts the FlowFiles (fragments) produced so far
int fragmentIndex=0;
// Max values will be updated in the state property map by the callback
final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
// Each iteration writes the next portion of the result set into a new FlowFile
while(true) {
// AtomicLong so the lambda passed to session.write can report the row count
final AtomicLong nrOfRows = new AtomicLong(0L);
FlowFile fileToProcess = session.create();
try {
fileToProcess = session.write(fileToProcess, out -> {
try {
nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), maxValCollector));
} catch (Exception e) {
throw new ProcessException("Error during database query or conversion of records.", e);
} catch (ProcessException e) {
// Add flowfile to results before rethrowing so it will be removed from session in outer catch
// NOTE(review): the statement that adds the FlowFile to resultSetFlowFiles is not visible in this view.
throw e;
if (nrOfRows.get() > 0) {
// set attributes
final Map<String, String> attributesToAdd = new HashMap<>();
attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
attributesToAdd.put(RESULT_TABLENAME, tableName);
// Fragment attributes are only added when the result set is being split
if(maxRowsPerFlowFile > 0) {
attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier);
attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(fragmentIndex));
fileToProcess = session.putAllAttributes(fileToProcess, attributesToAdd);
logger.debug("{} contains {} records; transferring to 'success'",
new Object[]{fileToProcess, nrOfRows.get()});
session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
// NOTE(review): the statement that adds fileToProcess to resultSetFlowFiles is not visible in this view.
// If we've reached the batch size, send out the flow files
if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
session.transfer(resultSetFlowFiles, REL_SUCCESS);
// NOTE(review): no commit or clearing of resultSetFlowFiles is visible after this transfer - confirm.
} else {
// If there were no rows returned, don't send the flowfile
// If no rows and this was first FlowFile, yield
if(fragmentIndex == 0){
// NOTE(review): the increment of fragmentIndex and the loop exits guarded by the three
// conditions below are not visible in this view.
if (maxFragments > 0 && fragmentIndex >= maxFragments) {
// If we aren't splitting up the data into flow files or fragments, then the result set has been entirely fetched so don't loop back around
if (maxFragments == 0 && maxRowsPerFlowFile == 0) {
// If we are splitting up the data into flow files, don't loop back around if we've gotten all results
if(maxRowsPerFlowFile > 0 && nrOfRows.get() < maxRowsPerFlowFile) {
// Apply state changes from the Max Value tracker
// NOTE(review): the call that applies the collector's changes is not visible in this view.
// Even though the maximum value and total count are known at this point, to maintain consistent behavior if Output Batch Size is set, do not store the attributes
if (outputBatchSize == 0) {
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
// Add maximum values as attributes
for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
// Get just the column name from the key
String key = entry.getKey();
String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
// putAttribute returns the updated FlowFile, so the list slot is replaced with it
resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue()));
//set count on all FlowFiles
if (maxRowsPerFlowFile > 0) {
// NOTE(review): as written, the next statement has one more ')' than '(' and does not store the
// FlowFile returned by putAttribute, unlike the maxvalue.* update above which stores it via
// resultSetFlowFiles.set(i, ...). Confirm against the complete file.
session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
// NOTE(review): this catch only rethrows; the outer catch below handles SQLException as well.
} catch (final SQLException e) {
throw e;
session.transfer(resultSetFlowFiles, REL_SUCCESS);
} catch (final ProcessException | SQLException e) {
logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
// NOTE(review): the cleanup performed for non-empty resultSetFlowFiles is not visible in this view.
if (!resultSetFlowFiles.isEmpty()) {
} finally {
try {
// Update the state
session.setState(statePropertyMap, Scope.CLUSTER);
} catch (IOException ioe) {
getLogger().error("{} failed to update State Manager, maximum observed values will not be recorded", new Object[]{this, ioe});
/**
 * Convenience overload that builds the SELECT statement without a custom SQL query.
 * Delegates to the seven-argument {@code getQuery} with {@code null} as {@code sqlQuery}.
 *
 * @param dbAdapter         adapter used to generate the statement
 * @param tableName         table to query
 * @param columnNames       columns to select, passed through unchanged
 * @param maxValColumnNames max-value columns, may be {@code null}
 * @param customWhereClause extra WHERE condition, may be {@code null}
 * @param stateMap          previously observed maximum values, may be {@code null}
 * @return the SQL text produced by the delegate
 */
protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
String customWhereClause, Map<String, String> stateMap) {
return getQuery(dbAdapter, tableName, null, columnNames, maxValColumnNames, customWhereClause, stateMap);
protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String sqlQuery, String columnNames, List<String> maxValColumnNames,
String customWhereClause, Map<String, String> stateMap) {
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Table name must be specified");
final StringBuilder query;
if (StringUtils.isEmpty(sqlQuery)) {
query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
} else {
query = getWrappedQuery(dbAdapter, sqlQuery, tableName);
List<String> whereClauses = new ArrayList<>();
// Check state map for last max values
if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) {
IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
String colName = maxValColumnNames.get(index);
String maxValueKey = getStateKey(tableName, colName, dbAdapter);
String maxValue = stateMap.get(maxValueKey);
if (StringUtils.isEmpty(maxValue)) {
// If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
// the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
// maximum value is observed, it will be stored under the fully-qualified key from then on.
maxValue = stateMap.get(colName.toLowerCase());
if (!StringUtils.isEmpty(maxValue)) {
Integer type = columnTypeMap.get(maxValueKey);
if (type == null) {
// This shouldn't happen as we are populating columnTypeMap when the processor is scheduled.
throw new IllegalArgumentException("No column type found for: " + colName);
// Add a condition for the WHERE clause
whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
if (customWhereClause != null) {
whereClauses.add("(" + customWhereClause + ")");
if (!whereClauses.isEmpty()) {
query.append(" WHERE ");
query.append(StringUtils.join(whereClauses, " AND "));
return query.toString();
/**
 * Row callback that tracks the largest value seen for each tracked column while a result set
 * is being written.
 * <p>
 * For every row it walks the columns, skips those that have no entry in {@code columnTypeMap}
 * or whose value is null, and stores in {@code newColMap} whatever non-null value
 * {@code getMaxValueFromRow} returns for the column, keyed by the fully-qualified state key.
 * <p>
 * NOTE(review): this is a non-static nested class; it calls {@code getStateKey},
 * {@code getMaxValueFromRow} and reads {@code columnTypeMap}, none of which are declared in
 * this class. Some statements of this class are not visible in this view (see inline notes).
 */
public class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
DatabaseAdapter dbAdapter;
// Working copy of the max values collected by processRow
final Map<String, String> newColMap;
// The state map handed in by the caller.
// NOTE(review): in the visible code this field is only assigned, never read - confirm where it is used.
final Map<String, String> originalState;
String tableName;
/**
 * @param tableName table the state keys are built for
 * @param stateMap  caller-owned map of previously observed maximum values
 * @param dbAdapter adapter passed on to the state-key and max-value helpers
 */
// NOTE(review): newColMap starts empty in the visible code; whether it is seeded from stateMap
// is not visible in this view.
public MaxValueResultSetRowCollector(String tableName, Map<String, String> stateMap, DatabaseAdapter dbAdapter) {
this.dbAdapter = dbAdapter;
this.originalState = stateMap;
this.newColMap = new HashMap<>();
this.tableName = tableName;
/**
 * Inspects one row and updates the collected maximum for every tracked, non-null column.
 *
 * @param resultSet result set positioned on the row to inspect
 * @throws IOException wrapping any ParseException or SQLException raised while reading the row
 */
public void processRow(ResultSet resultSet) throws IOException {
// NOTE(review): the body of this null guard is not visible in this view.
if (resultSet == null) {
try {
// Iterate over the row, check-and-set max values
final ResultSetMetaData meta = resultSet.getMetaData();
final int nrOfColumns = meta.getColumnCount();
if (nrOfColumns > 0) {
// JDBC column indexes are 1-based
for (int i = 1; i <= nrOfColumns; i++) {
String colName = meta.getColumnName(i).toLowerCase();
String fullyQualifiedMaxValueKey = getStateKey(tableName, colName, dbAdapter);
Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey);
// Skip any columns we're not keeping track of or whose value is null
// NOTE(review): the statement that skips to the next column is not visible in this view.
if (type == null || resultSet.getObject(i) == null) {
String maxValueString = newColMap.get(fullyQualifiedMaxValueKey);
// If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
// the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
// maximum value is observed, it will be stored under the fully-qualified key from then on.
if (StringUtils.isEmpty(maxValueString)) {
maxValueString = newColMap.get(colName);
String newMaxValueString = getMaxValueFromRow(resultSet, i, type, maxValueString, dbAdapter.getName());
// A null result leaves the stored value untouched
if (newMaxValueString != null) {
newColMap.put(fullyQualifiedMaxValueKey, newMaxValueString);
} catch (ParseException | SQLException e) {
throw new IOException(e);
/**
 * Hook for publishing the collected maximum values.
 */
// NOTE(review): the body of this method is not visible in this view; presumably it copies
// newColMap back into originalState - confirm against the complete file.
public void applyStateChanges() {
/**
 * Supplies the writer that serializes the result set into FlowFile content.
 * onTrigger calls this once per invocation, before the query is executed.
 *
 * @param session the session created for the current invocation
 * @param context the process context holding the processor properties
 * @return the writer whose {@code writeResultSet} is used for every FlowFile of the invocation
 */
protected abstract SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context);