blob: bd6962c0f97a5c89b415a8f0be4272e8c6483134 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.sql.SqlWriter;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.db.JdbcCommon;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public abstract class AbstractExecuteSQL extends AbstractProcessor {
public static final String RESULT_ROW_COUNT = "executesql.row.count";
public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime";
public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
public static final String RESULTSET_INDEX = "executesql.resultset.index";
public static final String RESULT_ERROR_MESSAGE = "executesql.error.message";
public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";
public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Successfully created FlowFile from SQL query result set.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship")
.build();
protected Set<Relationship> relationships;
public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
.name("Database Connection Pooling Service")
.description("The Controller Service that is used to obtain connection to database")
.required(true)
.identifiesControllerService(DBCPService.class)
.build();
public static final PropertyDescriptor SQL_PRE_QUERY = new PropertyDescriptor.Builder()
.name("sql-pre-query")
.displayName("SQL Pre-Query")
.description("A semicolon-delimited list of queries executed before the main SQL query is executed. " +
"For example, set session properties before main query. " +
"It's possible to include semicolons in the statements themselves by escaping them with a backslash ('\\;'). " +
"Results/outputs from these queries will be suppressed if there are no errors.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder()
.name("SQL select query")
.description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes "
+ "using Expression Language. If this property is specified, it will be used regardless of the content of "
+ "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected "
+ "to contain a valid SQL select query, to be issued by the processor to the database. Note that Expression "
+ "Language is not evaluated for flow file contents.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor SQL_POST_QUERY = new PropertyDescriptor.Builder()
.name("sql-post-query")
.displayName("SQL Post-Query")
.description("A semicolon-delimited list of queries executed after the main SQL query is executed. " +
"Example like setting session properties after main query. " +
"It's possible to include semicolons in the statements themselves by escaping them with a backslash ('\\;'). " +
"Results/outputs from these queries will be suppressed if there are no errors.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
.name("Max Wait Time")
.description("The maximum amount of time allowed for a running SQL select query "
+ " , zero means there is no limit. Max time less than 1 second will be equal to zero.")
.defaultValue("0 seconds")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.sensitive(false)
.build();
public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
.name("esql-max-rows")
.displayName("Max Rows Per Flow File")
.description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
+ "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
.defaultValue("0")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("esql-output-batch-size")
.displayName("Output Batch Size")
.description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
+ "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
+ "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
+ "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The fragment.count attribute will not be set on FlowFiles when this "
+ "property is set.")
.defaultValue("0")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
.name("esql-fetch-size")
.displayName("Fetch Size")
.description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
+ "honored and/or exact. If the value specified is zero, then the hint is ignored.")
.defaultValue("0")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
protected List<PropertyDescriptor> propDescriptors;
protected DBCPService dbcpService;
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propDescriptors;
}
@OnScheduled
public void setup(ProcessContext context) {
// If the query is not set, then an incoming flow file is needed. Otherwise fail the initialization
if (!context.getProperty(SQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
final String errorString = "Either the Select Query must be specified or there must be an incoming connection "
+ "providing flowfile(s) containing a SQL select query";
getLogger().error(errorString);
throw new ProcessException(errorString);
}
dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile fileToProcess = null;
if (context.hasIncomingConnection()) {
fileToProcess = session.get();
// If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
// However, if we have no FlowFile and we have connections coming from other Processors, then
// we know that we should run only if we have a FlowFile.
if (fileToProcess == null && context.hasNonLoopConnection()) {
return;
}
}
final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
final ComponentLog logger = getLogger();
final int queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions(fileToProcess).asInteger();
final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
List<String> preQueries = getQueries(context.getProperty(SQL_PRE_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
List<String> postQueries = getQueries(context.getProperty(SQL_POST_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
SqlWriter sqlWriter = configureSqlWriter(session, context, fileToProcess);
String selectQuery;
if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
} else {
// If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
// If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
final StringBuilder queryContents = new StringBuilder();
session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, Charset.defaultCharset())));
selectQuery = queryContents.toString();
}
int resultCount = 0;
try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
final PreparedStatement st = con.prepareStatement(selectQuery)) {
if (fetchSize != null && fetchSize > 0) {
try {
st.setFetchSize(fetchSize);
} catch (SQLException se) {
// Not all drivers support this, just log the error (at debug level) and move on
logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
}
}
st.setQueryTimeout(queryTimeout); // timeout in seconds
// Execute pre-query, throw exception and cleanup Flow Files if fail
Pair<String,SQLException> failure = executeConfigStatements(con, preQueries);
if (failure != null) {
// In case of failure, assigning config query to "selectQuery" to follow current error handling
selectQuery = failure.getLeft();
throw failure.getRight();
}
if (fileToProcess != null) {
JdbcCommon.setParameters(st, fileToProcess.getAttributes());
}
logger.debug("Executing query {}", new Object[]{selectQuery});
int fragmentIndex = 0;
final String fragmentId = UUID.randomUUID().toString();
final StopWatch executionTime = new StopWatch(true);
boolean hasResults = st.execute();
long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
boolean hasUpdateCount = st.getUpdateCount() != -1;
Map<String, String> inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes();
String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key());
while (hasResults || hasUpdateCount) {
//getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
if (hasResults) {
final AtomicLong nrOfRows = new AtomicLong(0L);
try {
final ResultSet resultSet = st.getResultSet();
do {
final StopWatch fetchTime = new StopWatch(true);
FlowFile resultSetFF;
if (fileToProcess == null) {
resultSetFF = session.create();
} else {
resultSetFF = session.create(fileToProcess);
}
if (inputFileAttrMap != null) {
resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap);
}
try {
resultSetFF = session.write(resultSetFF, out -> {
try {
nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), null));
} catch (Exception e) {
throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
}
});
long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
// set attributes
final Map<String, String> attributesToAdd = new HashMap<>();
attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
attributesToAdd.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount));
if (inputFileUUID != null) {
attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID);
}
attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
sqlWriter.updateCounters(session);
// if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
if (maxRowsPerFlowFile > 0) {
// if row count is zero and this is not the first fragment, drop it instead of committing it.
if (nrOfRows.get() == 0 && fragmentIndex > 0) {
session.remove(resultSetFF);
break;
}
resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
}
logger.info("{} contains {} records; transferring to 'success'", new Object[]{resultSetFF, nrOfRows.get()});
// Report a FETCH event if there was an incoming flow file, or a RECEIVE event otherwise
if(context.hasIncomingConnection()) {
session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
} else {
session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
}
resultSetFlowFiles.add(resultSetFF);
// If we've reached the batch size, send out the flow files
if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
session.transfer(resultSetFlowFiles, REL_SUCCESS);
// Need to remove the original input file if it exists
if (fileToProcess != null) {
session.remove(fileToProcess);
fileToProcess = null;
}
session.commitAsync();
resultSetFlowFiles.clear();
}
fragmentIndex++;
} catch (Exception e) {
// Remove any result set flow file(s) and propagate the exception
session.remove(resultSetFF);
session.remove(resultSetFlowFiles);
if (e instanceof ProcessException) {
throw (ProcessException) e;
} else {
throw new ProcessException(e);
}
}
} while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
// If we are splitting results but not outputting batches, set count on all FlowFiles
if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
resultSetFlowFiles.set(i,
session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
}
}
} catch (final SQLException e) {
throw new ProcessException(e);
}
resultCount++;
}
// are there anymore result sets?
try {
hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
hasUpdateCount = st.getUpdateCount() != -1;
} catch (SQLException ex) {
hasResults = false;
hasUpdateCount = false;
}
}
// Execute post-query, throw exception and cleanup Flow Files if fail
failure = executeConfigStatements(con, postQueries);
if (failure != null) {
selectQuery = failure.getLeft();
resultSetFlowFiles.forEach(ff -> session.remove(ff));
throw failure.getRight();
}
// Transfer any remaining files to SUCCESS
session.transfer(resultSetFlowFiles, REL_SUCCESS);
resultSetFlowFiles.clear();
//If we had at least one result then it's OK to drop the original file, but if we had no results then
// pass the original flow file down the line to trigger downstream processors
if (fileToProcess != null) {
if (resultCount > 0) {
session.remove(fileToProcess);
} else {
fileToProcess = session.write(fileToProcess, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
session.transfer(fileToProcess, REL_SUCCESS);
}
} else if (resultCount == 0) {
//If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
// Then generate an empty Output FlowFile
FlowFile resultSetFF = session.create();
resultSetFF = session.write(resultSetFF, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
session.transfer(resultSetFF, REL_SUCCESS);
}
} catch (final ProcessException | SQLException e) {
//If we had at least one result then it's OK to drop the original file, but if we had no results then
// pass the original flow file down the line to trigger downstream processors
if (fileToProcess == null) {
// This can happen if any exceptions occur while setting up the connection, statement, etc.
logger.error("Unable to execute SQL select query {} due to {}. No FlowFile to route to failure",
new Object[]{selectQuery, e});
context.yield();
} else {
if (context.hasIncomingConnection()) {
logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure",
new Object[]{selectQuery, fileToProcess, e});
fileToProcess = session.penalize(fileToProcess);
} else {
logger.error("Unable to execute SQL select query {} due to {}; routing to failure",
new Object[]{selectQuery, e});
context.yield();
}
session.putAttribute(fileToProcess,RESULT_ERROR_MESSAGE,e.getMessage());
session.transfer(fileToProcess, REL_FAILURE);
}
}
}
/*
* Executes given queries using pre-defined connection.
* Returns null on success, or a query string if failed.
*/
protected Pair<String,SQLException> executeConfigStatements(final Connection con, final List<String> configQueries){
if (configQueries == null || configQueries.isEmpty()) {
return null;
}
for (String confSQL : configQueries) {
try(final Statement st = con.createStatement()){
st.execute(confSQL);
} catch (SQLException e) {
return Pair.of(confSQL, e);
}
}
return null;
}
/*
* Extract list of queries from config property
*/
protected List<String> getQueries(final String value) {
if (value == null || value.length() == 0 || value.trim().length() == 0) {
return null;
}
final List<String> queries = new LinkedList<>();
for (String query : value.split("(?<!\\\\);")) {
query = query.replaceAll("\\\\;", ";");
if (query.trim().length() > 0) {
queries.add(query.trim());
}
}
return queries;
}
protected abstract SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess);
}