blob: 76eeaae0a39e954a7be2a361a15b203daa06d466 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
import org.apache.nifi.processors.standard.sql.SqlWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.util.db.JdbcCommon;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_PRECISION;
import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_SCALE;
import static org.apache.nifi.util.db.JdbcProperties.USE_AVRO_LOGICAL_TYPES;
@EventDriven
@InputRequirement(Requirement.INPUT_ALLOWED)
@Tags({"sql", "select", "jdbc", "query", "database", "record"})
@CapabilityDescription("Executes provided SQL select query. Query result will be converted to the format specified by a Record Writer. "
+ "Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
+ "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. "
+ "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the "
+ "select query, and the query may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+ "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+ "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format. "
+ "FlowFile attribute 'executesql.row.count' indicates how many rows were selected.")
@ReadsAttributes({
@ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The type of each Parameter is specified as an integer "
+ "that represents the JDBC Type of the parameter."),
@ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parametrized SQL statements. The value of the Parameters are specified as "
+ "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."),
@ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. "
+ "Incoming FlowFiles are expected to be parametrized SQL statements. In some cases "
+ "a format option needs to be specified, currently this is only applicable for binary data types, dates, times and timestamps. Binary Data Types (defaults to 'ascii') - "
+ "ascii: each string character in your attribute value represents a single byte. This is the format provided by Avro Processors. "
+ "base64: the string is a Base64 encoded string that can be decoded to bytes. "
+ "hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. "
+ "Dates/Times/Timestamps - "
+ "Date, Time and Timestamp formats all support both custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') "
+ "as specified according to java.time.format.DateTimeFormatter. "
+ "If not specified, a long value input is expected to be an unix epoch (milli seconds from 1970/1/1), or a string value in "
+ "'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for Time (some database engines e.g. Derby or MySQL do not support milliseconds and will truncate milliseconds), "
+ "'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.")
})
@WritesAttributes({
@WritesAttribute(attribute = "executesql.row.count", description = "Contains the number of rows returned in the select query"),
@WritesAttribute(attribute = "executesql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds"),
@WritesAttribute(attribute = "executesql.query.executiontime", description = "Duration of the query execution time in milliseconds"),
@WritesAttribute(attribute = "executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"),
@WritesAttribute(attribute = "executesql.resultset.index", description = "Assuming multiple result sets are returned, "
+ "the zero based index of this result set."),
@WritesAttribute(attribute = "executesql.error.message", description = "If processing an incoming flow file causes "
+ "an Exception, the Flow File is routed to failure and this attribute is set to the exception message."),
@WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
@WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
+ "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+ "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
+ "attribute will not be populated."),
@WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+ "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+ "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+ "FlowFiles were produced"),
@WritesAttribute(attribute = "input.flowfile.uuid", description = "If the processor has an incoming connection, outgoing FlowFiles will have this attribute "
+ "set to the value of the input FlowFile's UUID. If there is no incoming connection, the attribute will not be added."),
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
@WritesAttribute(attribute = "record.count", description = "The number of records output by the Record Writer.")
})
public class ExecuteSQLRecord extends AbstractExecuteSQL {
public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
.name("esqlrecord-record-writer")
.displayName("Record Writer")
.description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. "
+ "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.")
.identifiesControllerService(RecordSetWriterFactory.class)
.required(true)
.build();
public static final PropertyDescriptor NORMALIZE_NAMES = new PropertyDescriptor.Builder()
.name("esqlrecord-normalize")
.displayName("Normalize Table/Column Names")
.description("Whether to change characters in column names. For example, colons and periods will be changed to underscores.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
public ExecuteSQLRecord() {
final Set<Relationship> r = new HashSet<>();
r.add(REL_SUCCESS);
r.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(r);
final List<PropertyDescriptor> pds = new ArrayList<>();
pds.add(DBCP_SERVICE);
pds.add(SQL_PRE_QUERY);
pds.add(SQL_SELECT_QUERY);
pds.add(SQL_POST_QUERY);
pds.add(QUERY_TIMEOUT);
pds.add(RECORD_WRITER_FACTORY);
pds.add(NORMALIZE_NAMES);
pds.add(USE_AVRO_LOGICAL_TYPES);
pds.add(DEFAULT_PRECISION);
pds.add(DEFAULT_SCALE);
pds.add(MAX_ROWS_PER_FLOW_FILE);
pds.add(OUTPUT_BATCH_SIZE);
pds.add(FETCH_SIZE);
propDescriptors = Collections.unmodifiableList(pds);
}
@Override
protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess) {
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions(fileToProcess).asInteger();
final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES).asBoolean();
final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(fileToProcess).asInteger();
final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(fileToProcess).asInteger();
final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
.convertNames(convertNamesForAvro)
.useLogicalTypes(useAvroLogicalTypes)
.defaultPrecision(defaultPrecision)
.defaultScale(defaultScale)
.build();
final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
return new RecordSqlWriter(recordSetWriterFactory, options, maxRowsPerFlowFile, fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
}
}