blob: cb921e28b9125599220a537c9ab545dcfaff9fdc [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
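* the License. You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.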
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.standard;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.impl.ScalarFunctionImpl;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParser.Config;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.queryrecord.FlowFileTable;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_PRECISION;
import static org.apache.nifi.util.db.JdbcProperties.DEFAULT_SCALE;
@Tags({"sql", "query", "calcite", "route", "record", "transform", "select", "update", "modify", "etl", "filter", "record", "csv", "json", "logs", "text", "avro", "aggregate"})
@CapabilityDescription("Evaluates one or more SQL queries against the contents of a FlowFile. The result of the "
+ "SQL query then becomes the content of the output FlowFile. This can be used, for example, "
+ "for field-specific filtering, transformation, and row-level filtering. "
+ "Columns can be renamed, simple calculations and aggregations performed, etc. "
+ "The Processor is configured with a Record Reader Controller Service and a Record Writer service so as to allow flexibility in incoming and outgoing data formats. "
+ "The Processor must be configured with at least one user-defined property. The name of the Property "
+ "is the Relationship to route data to, and the value of the Property is a SQL SELECT statement that is used to specify how input data should be transformed/filtered. "
+ "The SQL statement must be valid ANSI SQL and is powered by Apache Calcite. "
+ "If the transformation fails, the original FlowFile is routed to the 'failure' relationship. Otherwise, the data selected will be routed to the associated "
+ "relationship. If the Record Writer chooses to inherit the schema from the Record, it is important to note that the schema that is inherited will be from the "
+ "ResultSet, rather than the input Record. This allows a single instance of the QueryRecord processor to have multiple queries, each of which returns a different "
+ "set of columns and aggregations. As a result, though, the schema that is derived will have no schema name, so it is important that the configured Record Writer not attempt "
+ "to write the Schema Name as an attribute if inheriting the Schema from the Record. See the Processor Usage documentation for more information.")
@DynamicRelationship(name="<Property Name>", description="Each user-defined property defines a new Relationship for this Processor.")
@DynamicProperty(name = "The name of the relationship to route data to",
value="A SQL SELECT statement that is used to determine what data should be routed to this relationship.",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description="Each user-defined property specifies a SQL SELECT statement to run over the data, with the data "
+ "that is selected being routed to the relationship whose name is the property name")
@WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
@WritesAttribute(attribute = "record.count", description = "The number of records selected by the query")
public class QueryRecord extends AbstractProcessor {
// Statically declared (non-dynamic) processor properties.
// NOTE(review): the builder chains in this section look truncated in this view (no build() call is
// visible on any of them) — confirm against the full file before relying on these comments.

// Property read in onTrigger and resolved as a RecordReaderFactory controller service.
static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
.displayName("Record Reader")
.description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
// Property read in onTrigger and resolved as a RecordSetWriterFactory controller service.
static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
.displayName("Record Writer")
.description("Specifies the Controller Service to use for writing results to a FlowFile")
// Boolean-valued property; onTrigger consults it when a query produces zero records.
static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder()
.displayName("Include Zero Record FlowFiles")
.description("When running the SQL statement against an incoming FlowFile, if the result has no data, "
+ "this property specifies whether or not a FlowFile will be sent to the corresponding relationship")
.allowableValues("true", "false")
// Retained only for backward compatibility according to its own description text; nothing in the
// visible code reads its value.
static final PropertyDescriptor CACHE_SCHEMA = new PropertyDescriptor.Builder()
.displayName("Cache Schema")
.description("This property is no longer used. It remains solely for backward compatibility in order to avoid making existing Processors invalid upon upgrade. This property will be" +
" removed in future versions. Now, instead of forcing the user to understand the semantics of schema caching, the Processor caches up to 25 schemas and automatically rolls off the" +
" old schemas. This provides the same performance when caching was enabled previously and in some cases very significant performance improvements if caching was previously disabled.")
.allowableValues("true", "false")
// Relationship that onTrigger transfers the incoming FlowFile to once all queries have been routed.
// NOTE(review): the name(...) and build() calls of both builders are not visible in this view.
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.description("The original FlowFile is routed to this relationship")
// Relationship that onTrigger transfers the incoming FlowFile to when schema lookup or a query fails.
public static final Relationship REL_FAILURE = new Relationship.Builder()
.description("If a FlowFile fails processing for any reason (for example, the SQL "
+ "statement contains columns not present in input data), the original FlowFile it will "
+ "be routed to this relationship")
// Supported property descriptors, returned as-is by getSupportedPropertyDescriptors().
private List<PropertyDescriptor> properties;
// Relationship set returned by getRelationships(); wrapped in a synchronized set.
private final Set<Relationship> relationships = Collections.synchronizedSet(new HashSet<>());
// Pool of prepared statements: each (SQL text, record schema) pair maps to a queue of reusable CachedStatements.
// NOTE(review): the rest of the Caffeine builder chain (size limit, removal listener, build()) is not
// visible in this view — confirm the configuration against the full file.
private final Cache<Tuple<String, RecordSchema>, BlockingQueue<CachedStatement>> statementQueues = Caffeine.newBuilder()
/**
 * Registers the Calcite JDBC driver with {@link DriverManager} and assembles the list of supported
 * property descriptors.
 *
 * @param context initialization context supplied by the framework (not referenced in the visible lines)
 * @throws ProcessException if registering the Calcite JDBC driver raises a SQLException
 */
protected void init(final ProcessorInitializationContext context) {
try {
DriverManager.registerDriver(new org.apache.calcite.jdbc.Driver());
} catch (final SQLException e) {
throw new ProcessException("Failed to load Calcite JDBC Driver", e);
// Local, mutable list that is wrapped with Collections.unmodifiableList below.
final List<PropertyDescriptor> properties = new ArrayList<>();
// NOTE(review): the other properties.add(...) calls and the left-hand side of the assignment on the
// next line are not visible in this view — presumably the unmodifiable list is stored in the
// 'properties' field; confirm against the full file.
properties.add(DEFAULT_SCALE); = Collections.unmodifiableList(properties);
/**
 * Exposes the processor's relationships.
 *
 * @return the synchronized set held in the {@code relationships} field (the live set, not a copy)
 */
public Set<Relationship> getRelationships() {
return relationships;
/**
 * Exposes the processor's statically supported properties.
 *
 * @return the list held in the {@code properties} field
 */
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
/**
 * Reacts to a property change by building the user-defined Relationship that corresponds to the property.
 *
 * @param descriptor the property that changed; non-dynamic descriptors are handled by a separate branch
 * @param oldValue the previous value (not referenced in the visible lines)
 * @param newValue the new value; {@code null} is handled separately from non-null values
 */
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
// NOTE(review): the body of this branch is not visible in this view — presumably an early return,
// since only dynamic properties define relationships.
if (!descriptor.isDynamic()) {
final Relationship relationship = new Relationship.Builder()
.description("User-defined relationship that specifies where data that matches the specified SQL query should be routed")
// NOTE(review): both branch bodies are missing from this view; presumably a null value removes the
// relationship from the set and any other value adds it — confirm against the full file.
if (newValue == null) {
} else {
/**
 * Builds the descriptor used for a user-defined (dynamic) property; its value is checked by a
 * {@code SqlValidator}.
 *
 * @param propertyDescriptorName name of the dynamic property (not referenced in the visible lines)
 * @return a descriptor carrying the description and validator shown below
 */
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
// NOTE(review): the remaining builder calls (name, dynamic flag, build(), ...) are not visible in this view.
return new PropertyDescriptor.Builder()
.description("SQL select statement specifies how data should be filtered/transformed. "
+ "SQL SELECT should select from the FLOWFILE table")
.addValidator(new SqlValidator())
/**
 * Walks every statement queue currently held in the statement cache.
 * Declared {@code synchronized}, like {@code getStatement}.
 */
public synchronized void cleanup() {
// NOTE(review): the loop body is not visible in this view; presumably each queue is drained through
// clearQueue(...) so pooled statements and connections get closed — confirm against the full file.
for (final BlockingQueue<CachedStatement> statementQueue : statementQueues.asMap().values()) {
/**
 * Callback shaped like a cache removal notification: it receives the evicted key, the statement queue
 * stored under it, and the cause of removal.
 * NOTE(review): the body is not visible in this view; presumably it closes the statements left in the
 * evicted queue — confirm against the full file.
 */
private void onCacheEviction(final Tuple<String, RecordSchema> key, final BlockingQueue<CachedStatement> queue, final RemovalCause cause) {
private void clearQueue(final BlockingQueue<CachedStatement> statementQueue) {
CachedStatement stmt;
while ((stmt = statementQueue.poll()) != null) {
closeQuietly(stmt.getStatement(), stmt.getConnection());
/**
 * Takes one FlowFile from the session, runs the SQL of every dynamic property against it, and routes
 * each query's output to the relationship named after that property.
 * <p>
 * When everything succeeds the incoming FlowFile is transferred to {@code REL_ORIGINAL}; when the schema
 * cannot be determined or a query fails it is transferred to {@code REL_FAILURE}.
 * NOTE(review): several lines of this method (closing braces, early returns, the writer calls inside the
 * write callback, FlowFile removal) are not visible in this view; comments describe only what is shown.
 *
 * @param context provides the reader/writer services, default precision/scale and the SQL properties
 * @param session used to get, create, write and transfer FlowFiles and to adjust counters
 */
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final FlowFile original = session.get();
// NOTE(review): the body of this check is not visible — presumably an early return when nothing is queued.
if (original == null) {
// Used further down to report the elapsed milliseconds in provenance and in the log.
final StopWatch stopWatch = new StopWatch(true);
final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
// Both values are evaluated against the incoming FlowFile and passed to ResultSetRecordSet below.
final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(original).asInteger();
final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(original).asInteger();
// Child FlowFiles that carry query output, each mapped to the relationship it will be transferred to.
final Map<FlowFile, Relationship> transformedFlowFiles = new HashMap<>();
final Set<FlowFile> createdFlowFiles = new HashSet<>();
// Determine the Record Reader's schema
final RecordSchema writerSchema;
final RecordSchema readerSchema;
// NOTE(review): the expression that opens rawIn is missing from this view.
try (final InputStream rawIn = {
final Map<String, String> originalAttributes = original.getAttributes();
final RecordReader reader = recordReaderFactory.createRecordReader(originalAttributes, rawIn, original.getSize(), getLogger());
readerSchema = reader.getSchema();
writerSchema = recordSetWriterFactory.getSchema(originalAttributes, readerSchema);
} catch (final Exception e) {
// Any failure while reading the schema sends the untouched original to failure.
getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[] {original, e});
session.transfer(original, REL_FAILURE);
// Determine the schema for writing the data
final Map<String, String> originalAttributes = original.getAttributes();
// Highest records-read figure reported by any query; published as the "Records Read" counter at the end.
int recordsRead = 0;
try {
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
// NOTE(review): the branch body is not visible — presumably non-dynamic properties are skipped.
if (!descriptor.isDynamic()) {
// The output relationship carries the same name as the dynamic property.
final Relationship relationship = new Relationship.Builder().name(descriptor.getName()).build();
// We have to fork a child because we may need to read the input FlowFile more than once,
// and we cannot read the original FlowFile while we are within a write
// callback for the original FlowFile.
FlowFile transformed = session.create(original);
boolean flowFileRemoved = false;
try {
// The SQL text may contain Expression Language, evaluated against the original FlowFile.
final String sql = context.getProperty(descriptor).evaluateAttributeExpressions(original).getValue();
final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
final QueryResult queryResult = query(session, original, readerSchema, sql, recordReaderFactory);
final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
try {
final ResultSet rs = queryResult.getResultSet();
transformed = session.write(transformed, new OutputStreamCallback() {
public void process(final OutputStream out) throws IOException {
final ResultSetRecordSet recordSet;
final RecordSchema writeSchema;
try {
// The writer's schema is derived from the result set's schema, not from the input record's schema.
recordSet = new ResultSetRecordSet(rs, writerSchema, defaultPrecision, defaultScale);
final RecordSchema resultSetSchema = recordSet.getSchema();
writeSchema = recordSetWriterFactory.getSchema(originalAttributes, resultSetSchema);
} catch (final SQLException | SchemaNotFoundException e) {
throw new ProcessException(e);
// NOTE(review): the statements that write the record set and populate writeResultRef/mimeTypeRef
// are not visible in this view.
try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), writeSchema, out, original)) {
} catch (final Exception e) {
throw new IOException(e);
} finally {
recordsRead = Math.max(recordsRead, queryResult.getRecordsRead());
final WriteResult result = writeResultRef.get();
// Empty results are dropped unless the user asked to keep zero-record FlowFiles.
if (result.getRecordCount() == 0 && !context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean()) {
flowFileRemoved = true;
getLogger().info("Transformed {} but the result contained no data so will not pass on a FlowFile", new Object[] {original});
} else {
final Map<String, String> attributesToAdd = new HashMap<>();
if (result.getAttributes() != null) {
attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
attributesToAdd.put("record.count", String.valueOf(result.getRecordCount()));
transformed = session.putAllAttributes(transformed, attributesToAdd);
transformedFlowFiles.put(transformed, relationship);
session.adjustCounter("Records Written", result.getRecordCount(), false);
} finally {
// Ensure that we have the FlowFile in the set in case we throw any Exception
if (!flowFileRemoved) {
final long elapsedMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
// A single provenance FORK event covers all children, followed by one ROUTE event per child.
if (transformedFlowFiles.size() > 0) {
session.getProvenanceReporter().fork(original, transformedFlowFiles.keySet(), elapsedMillis);
for (final Map.Entry<FlowFile, Relationship> entry : transformedFlowFiles.entrySet()) {
final FlowFile transformed = entry.getKey();
final Relationship relationship = entry.getValue();
session.getProvenanceReporter().route(transformed, relationship);
session.transfer(transformed, relationship);
getLogger().info("Successfully queried {} in {} millis", new Object[] {original, elapsedMillis});
session.transfer(original, REL_ORIGINAL);
} catch (final SQLException e) {
// Log the underlying cause when the SQLException has one, otherwise the exception itself.
getLogger().error("Unable to query {} due to {}", new Object[] {original, e.getCause() == null ? e : e.getCause()});
session.transfer(original, REL_FAILURE);
} catch (final Exception e) {
getLogger().error("Unable to query {} due to {}", new Object[] {original, e});
session.transfer(original, REL_FAILURE);
session.adjustCounter("Records Read", recordsRead, false);
private synchronized CachedStatement getStatement(final String sql, final RecordSchema schema, final Supplier<CachedStatement> statementBuilder) {
final Tuple<String, RecordSchema> tuple = new Tuple<>(sql, schema);
final BlockingQueue<CachedStatement> statementQueue = statementQueues.get(tuple, key -> new LinkedBlockingQueue<>());
final CachedStatement cachedStmt = statementQueue.poll();
if (cachedStmt != null) {
return cachedStmt;
return statementBuilder.get();
private CachedStatement buildCachedStatement(final String sql, final ProcessSession session, final FlowFile flowFile, final RecordSchema schema,
final RecordReaderFactory recordReaderFactory) {
final CalciteConnection connection = createConnection();
final SchemaPlus rootSchema = createRootSchema(connection);
final FlowFileTable flowFileTable = new FlowFileTable(session, flowFile, schema, recordReaderFactory, getLogger());
rootSchema.add("FLOWFILE", flowFileTable);
try {
final PreparedStatement stmt = connection.prepareStatement(sql);
return new CachedStatement(stmt, flowFileTable, connection);
} catch (final SQLException e) {
throw new ProcessException(e);
/**
 * Opens a new connection through {@link DriverManager} for the {@code jdbc:calcite:} URL and unwraps it
 * to a {@code CalciteConnection}.
 *
 * @return the unwrapped Calcite connection
 * @throws ProcessException wrapping any exception raised while connecting or unwrapping
 */
private CalciteConnection createConnection() {
// NOTE(review): no entries are put into 'properties' in the visible lines; given the file's
// CalciteConnectionProperty/Lex imports, lexical settings are presumably configured here — confirm.
final Properties properties = new Properties();
try {
final Connection connection = DriverManager.getConnection("jdbc:calcite:", properties);
final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
return calciteConnection;
} catch (final Exception e) {
throw new ProcessException(e);
/**
 * Executes the given SQL against the FlowFile and returns a handle to the open result set.
 * <p>
 * A statement is obtained through {@code getStatement} (pooled or newly built), its table is pointed
 * at the given session/FlowFile, and the statement is executed. Closing the returned
 * {@code QueryResult} tries to put the statement back into its pool.
 *
 * @param session the session the FlowFile belongs to
 * @param flowFile the FlowFile to query
 * @param schema the record schema of the FlowFile's content; part of the statement cache key
 * @param sql the SQL to run; part of the statement cache key
 * @param recordReaderFactory reader factory used if a new statement has to be built
 * @return a result exposing the ResultSet and the number of records the table has read
 * @throws SQLException if executing the statement fails
 */
protected QueryResult query(final ProcessSession session, final FlowFile flowFile, final RecordSchema schema, final String sql, final RecordReaderFactory recordReaderFactory)
throws SQLException {
final Supplier<CachedStatement> statementBuilder = () -> buildCachedStatement(sql, session, flowFile, schema, recordReaderFactory);
final CachedStatement cachedStatement = getStatement(sql, schema, statementBuilder);
final PreparedStatement stmt = cachedStatement.getStatement();
final FlowFileTable table = cachedStatement.getTable();
// A reused statement still references the FlowFile of its previous run, so re-point the table first.
table.setFlowFile(session, flowFile);
final ResultSet rs;
try {
rs = stmt.executeQuery();
} catch (final Throwable t) {
// NOTE(review): only the rethrow is visible; any cleanup performed before it is missing from this view.
throw t;
return new QueryResult() {
public void close() throws IOException {
// Return the statement to its pool if the pool still exists and accepts it.
final BlockingQueue<CachedStatement> statementQueue = statementQueues.getIfPresent(new Tuple<>(sql, schema));
if (statementQueue == null || !statementQueue.offer(cachedStatement)) {
// NOTE(review): the statements inside this try are not visible; judging by the error message
// below, the statement is presumably closed here when it cannot be pooled — confirm.
try {
} catch (SQLException e) {
throw new IOException("Failed to close statement", e);
public ResultSet getResultSet() {
return rs;
public int getRecordsRead() {
return table.getRecordsRead();
private SchemaPlus createRootSchema(final CalciteConnection calciteConnection) {
final SchemaPlus rootSchema = calciteConnection.getRootSchema();
rootSchema.add("RPATH", ScalarFunctionImpl.create(ObjectRecordPath.class, "eval"));
rootSchema.add("RPATH_STRING", ScalarFunctionImpl.create(StringRecordPath.class, "eval"));
rootSchema.add("RPATH_INT", ScalarFunctionImpl.create(IntegerRecordPath.class, "eval"));
rootSchema.add("RPATH_LONG", ScalarFunctionImpl.create(LongRecordPath.class, "eval"));
rootSchema.add("RPATH_DATE", ScalarFunctionImpl.create(DateRecordPath.class, "eval"));
rootSchema.add("RPATH_DOUBLE", ScalarFunctionImpl.create(DoubleRecordPath.class, "eval"));
rootSchema.add("RPATH_FLOAT", ScalarFunctionImpl.create(FloatRecordPath.class, "eval"));
return rootSchema;
/**
 * Best-effort handling of a set of closeable resources: a failure is logged as a warning instead of
 * being propagated.
 * NOTE(review): the statements inside the null checks and the try block (presumably return, continue
 * and closeable.close()) are not visible in this view — confirm against the full file.
 *
 * @param closeables resources to process; the array itself and individual entries may be {@code null}
 */
private void closeQuietly(final AutoCloseable... closeables) {
if (closeables == null) {
for (final AutoCloseable closeable : closeables) {
if (closeable == null) {
try {
} catch (final Exception e) {
getLogger().warn("Failed to close SQL resource", e);
/**
 * Validator for the SQL held in a dynamic property. Values containing Expression Language get their own
 * result; other values are handed to a Calcite {@code SqlParser}.
 * NOTE(review): the ValidationResult builder chains, the parser configuration and the parse call itself
 * are truncated in this view, so the valid/invalid flags set on each result cannot be seen here.
 */
private static class SqlValidator implements Validator {
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
// Expression Language cannot be resolved without a FlowFile, so such input gets a dedicated result.
if (context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder()
.explanation("Expression Language Present")
final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
final Config config = SqlParser.configBuilder()
final SqlParser parser = SqlParser.create(substituted, config);
try {
return new ValidationResult.Builder()
} catch (final Exception e) {
// The failure's message is surfaced to the user as the explanation.
return new ValidationResult.Builder()
.explanation("Not a valid SQL Statement: " + e.getMessage())
/**
 * Closeable handle on the outcome of one query execution; the implementation is the anonymous class
 * returned by {@code query}.
 */
private interface QueryResult extends Closeable {
/** @return the JDBC result set produced by executing the statement */
ResultSet getResultSet();
/** @return the number of records read, as reported by the implementation */
int getRecordsRead();
/**
 * Immutable holder tying a prepared statement to the FlowFile table it queries and the connection it
 * was prepared on, so that all three can be pooled and later closed together.
 */
private static class CachedStatement {
private final FlowFileTable table;
private final PreparedStatement statement;
private final Connection connection;
/**
 * @param statement the prepared statement
 * @param table the FlowFile-backed table the statement reads from
 * @param connection the connection the statement was prepared on
 */
public CachedStatement(final PreparedStatement statement, final FlowFileTable table, final Connection connection) {
this.statement = statement;
this.table = table;
this.connection = connection;
/** @return the FlowFile-backed table given at construction */
public FlowFileTable getTable() {
return table;
/** @return the prepared statement given at construction */
public PreparedStatement getStatement() {
return statement;
/** @return the connection given at construction */
public Connection getConnection() {
return connection;
// ------------------------------------------------------------
// User-Defined Functions for Calcite
// ------------------------------------------------------------
/**
 * Calcite function registered as {@code RPATH}: evaluates a RecordPath against a Record, an array or
 * Iterable of Records, or a Map, and returns the selected value without converting its type.
 * NOTE(review): some lines of this class (loop bodies that collect fields, the final return of
 * evalResults, closing braces) are not visible in this view.
 */
public static class ObjectRecordPath extends RecordPathFunction {
// Synthetic one-field schema ("root" holding a map) used to wrap a bare Map in a Record for evaluation.
private static final RecordField ROOT_RECORD_FIELD = new RecordField("root", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
private static final RecordSchema ROOT_RECORD_SCHEMA = new SimpleRecordSchema(Collections.singletonList(ROOT_RECORD_FIELD));
private static final RecordField PARENT_RECORD_FIELD = new RecordField("root", RecordFieldType.RECORD.getRecordDataType(ROOT_RECORD_SCHEMA));
/**
 * Entry point invoked by Calcite; dispatches on the runtime type of the argument.
 *
 * @param record the column value to evaluate against; {@code null} yields {@code null}
 * @param recordPath the RecordPath expression
 * @return the evaluation result as produced by evalResults
 * @throws RuntimeException if the argument is none of the supported types
 */
public Object eval(Object record, String recordPath) {
if (record == null) {
return null;
if (record instanceof Record) {
return eval((Record) record, recordPath);
if (record instanceof Record[]) {
return eval((Record[]) record, recordPath);
if (record instanceof Iterable) {
// Unchecked cast: element types are only discovered while iterating.
return eval((Iterable<Record>) record, recordPath);
if (record instanceof Map) {
return eval((Map<?, ?>) record, recordPath);
throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " against given argument because the argument is of type " + record.getClass() + " instead of Record");
// Map variant: wraps the map in a synthetic Record and evaluates with the map's field as context node.
private Object eval(final Map<?, ?> map, final String recordPath) {
final RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath);
final Record record = new MapRecord(ROOT_RECORD_SCHEMA, Collections.singletonMap("root", map));
final FieldValue parentFieldValue = new StandardFieldValue(record, PARENT_RECORD_FIELD, null);
final FieldValue fieldValue = new StandardFieldValue(map, ROOT_RECORD_FIELD, parentFieldValue);
final RecordPathResult result = compiled.evaluate(record, fieldValue);
final List<FieldValue> selectedFields = result.getSelectedFields().collect(Collectors.toList());
return evalResults(selectedFields);
// Single-record variant: evaluates the compiled path directly against the record.
private Object eval(final Record record, final String recordPath) {
final RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath);
final RecordPathResult result = compiled.evaluate(record);
final List<FieldValue> selectedFields = result.getSelectedFields().collect(Collectors.toList());
return evalResults(selectedFields);
// Iterable variant: evaluates the path against every record.
// NOTE(review): the statement that adds each result's fields to selectedFields is not visible here.
private Object eval(final Iterable<Record> records, final String recordPath) {
final RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath);
final List<FieldValue> selectedFields = new ArrayList<>();
for (final Record record : records) {
final RecordPathResult result = compiled.evaluate(record);
return evalResults(selectedFields);
// Array variant: same as the Iterable variant, for Record[].
// NOTE(review): the statement that adds each result's fields to selectedFields is not visible here.
private Object eval(final Record[] records, final String recordPath) {
final RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath);
final List<FieldValue> selectedFields = new ArrayList<>();
for (final Record record : records) {
final RecordPathResult result = compiled.evaluate(record);
return evalResults(selectedFields);
// Collapses the selected fields: nothing selected gives null, exactly one gives that field's value.
// NOTE(review): the return for the multi-value case is not visible in this view.
private Object evalResults(final List<FieldValue> selectedFields) {
if (selectedFields.isEmpty()) {
return null;
if (selectedFields.size() == 1) {
return selectedFields.get(0).getValue();
/**
 * Calcite function registered as {@code RPATH_STRING}: evaluates a RecordPath and converts the selected
 * value with {@code Object.toString()}.
 */
public static class StringRecordPath extends RecordPathFunction {
/**
 * @param record the value to evaluate against
 * @param recordPath the RecordPath expression
 * @return the result of the inherited eval with {@code Object::toString} as the conversion
 */
public String eval(Object record, String recordPath) {
return eval(record, recordPath, Object::toString);
/**
 * Calcite function registered as {@code RPATH_INT}: evaluates a RecordPath and converts the selected
 * value to an Integer.
 */
public static class IntegerRecordPath extends RecordPathFunction {
/**
 * @param record the value to evaluate against
 * @param recordPath the RecordPath expression
 * @return the selected value as an Integer
 * @throws RuntimeException if the selected value is not a Number, String or Date
 */
public Integer eval(Object record, String recordPath) {
return eval(record, recordPath, val -> {
if (val instanceof Number) {
return ((Number) val).intValue();
if (val instanceof String) {
// Integer.parseInt throws NumberFormatException for non-numeric text; it is not caught here.
return Integer.parseInt((String) val);
if (val instanceof Date) {
// NOTE(review): getTime() is a long number of epoch milliseconds; the (int) cast keeps only the low
// 32 bits, so present-day dates do not survive this conversion — confirm this is intended.
return (int) ((Date) val).getTime();
throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " as Integer against " + record
+ " because the value returned is of type " + val.getClass());
/**
 * Calcite function registered as {@code RPATH_LONG}: evaluates a RecordPath and converts the selected
 * value to a Long.
 */
public static class LongRecordPath extends RecordPathFunction {
/**
 * @param record the value to evaluate against
 * @param recordPath the RecordPath expression
 * @return the selected value as a Long; a Date is converted via {@code getTime()}
 * @throws RuntimeException if the selected value is not a Number, String or Date
 */
public Long eval(Object record, String recordPath) {
return eval(record, recordPath, val -> {
if (val instanceof Number) {
return ((Number) val).longValue();
if (val instanceof String) {
// Long.parseLong throws NumberFormatException for non-numeric text; it is not caught here.
return Long.parseLong((String) val);
if (val instanceof Date) {
return ((Date) val).getTime();
throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " as Long against " + record
+ " because the value returned is of type " + val.getClass());
/**
 * Calcite function registered as {@code RPATH_FLOAT}: evaluates a RecordPath and converts the selected
 * value to a Float.
 */
public static class FloatRecordPath extends RecordPathFunction {
/**
 * @param record the value to evaluate against
 * @param recordPath the RecordPath expression
 * @return the selected value as a Float
 * @throws RuntimeException if the selected value is neither a Number nor a String
 */
public Float eval(Object record, String recordPath) {
return eval(record, recordPath, val -> {
if (val instanceof Number) {
return ((Number) val).floatValue();
if (val instanceof String) {
// Float.parseFloat throws NumberFormatException for non-numeric text; it is not caught here.
return Float.parseFloat((String) val);
throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " as Float against " + record
+ " because the value returned is of type " + val.getClass());
/**
 * Calcite function registered as {@code RPATH_DOUBLE}: evaluates a RecordPath and converts the selected
 * value to a Double.
 */
public static class DoubleRecordPath extends RecordPathFunction {
/**
 * @param record the value to evaluate against
 * @param recordPath the RecordPath expression
 * @return the selected value as a Double
 * @throws RuntimeException if the selected value is neither a Number nor a String
 */
public Double eval(Object record, String recordPath) {
return eval(record, recordPath, val -> {
if (val instanceof Number) {
return ((Number) val).doubleValue();
if (val instanceof String) {
// Double.parseDouble throws NumberFormatException for non-numeric text; it is not caught here.
return Double.parseDouble((String) val);
throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " as Double against " + record
+ " because the value returned is of type " + val.getClass());
/**
 * Calcite function registered as {@code RPATH_DATE}: evaluates a RecordPath and returns the selected
 * date as a Long. String values are rejected with guidance to use the RecordPath toDate function.
 */
public static class DateRecordPath extends RecordPathFunction {
// Interestingly, Calcite throws an Exception if the schema indicates a DATE type and we return a java.util.Date. Calcite requires that a Long be returned instead.
/**
 * @param record the value to evaluate against
 * @param recordPath the RecordPath expression
 * @return a Number's long value, or a Date's {@code getTime()} value
 * @throws RuntimeException if the selected value is a String or any other unsupported type
 */
public Long eval(Object record, String recordPath) {
return eval(record, recordPath, val -> {
if (val instanceof Number) {
return ((Number) val).longValue();
if (val instanceof String) {
// Strings are deliberately not parsed here; the message tells the user how to convert them explicitly.
throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " as Date against " + record
+ " because the value returned is of type String. To parse a String value as a Date, please use the toDate function. For example, " +
"SELECT RPATH_DATE( record, 'toDate( /event/timestamp, \"yyyy-MM-dd\" )' ) AS eventDate FROM FLOWFILE");
if (val instanceof Date) {
return ((Date) val).getTime();
throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " as Date against " + record
+ " because the value returned is of type " + val.getClass());
/**
 * Base class of the RecordPath SQL functions: provides the shared compiled-path cache and the typed
 * {@code eval} helpers that the concrete function classes call with a conversion function.
 */
public static class RecordPathFunction {
// Synthetic one-field schema ("root" holding a map) used to wrap a bare Map in a Record for evaluation.
private static final RecordField ROOT_RECORD_FIELD = new RecordField("root", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
private static final RecordSchema ROOT_RECORD_SCHEMA = new SimpleRecordSchema(Collections.singletonList(ROOT_RECORD_FIELD));
private static final RecordField PARENT_RECORD_FIELD = new RecordField("root", RecordFieldType.RECORD.getRecordDataType(ROOT_RECORD_SCHEMA));
// Shared by every function class, so each distinct RecordPath string is looked up through one cache
// (constructed with an argument of 100).
protected static final RecordPathCache RECORD_PATH_CACHE = new RecordPathCache(100);
protected <T> T eval(final Object record, final String recordPath, final Function<Object, T> transform) {
if (record == null) {
return null;
try {
if (record instanceof Record) {
return eval((Record) record, recordPath, transform);
} else if (record instanceof Record[]) {
return eval((Record[]) record, recordPath, transform);
} else if (record instanceof Iterable) {
return eval((Iterable<Record>) record, recordPath, transform);
} else if (record instanceof Map) {
return eval((Map<?, ?>) record, recordPath, transform);
} catch (IllegalArgumentException e) {
throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " against " + record, e);
throw new RuntimeException("Cannot evaluate RecordPath " + recordPath + " against given argument because the argument is of type " + record.getClass() + " instead of Record");
/**
 * Map variant: wraps the map in a synthetic single-field Record ("root") and evaluates the path with
 * the map's field as the context node.
 *
 * @param map the map to evaluate against
 * @param recordPath the RecordPath expression
 * @param transform conversion applied to the selected value
 * @return whatever evalResults produces for the selected fields
 */
private <T> T eval(final Map<?, ?> map, final String recordPath, final Function<Object, T> transform) {
final RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath);
final Record record = new MapRecord(ROOT_RECORD_SCHEMA, Collections.singletonMap("root", map));
final FieldValue parentFieldValue = new StandardFieldValue(record, PARENT_RECORD_FIELD, null);
final FieldValue fieldValue = new StandardFieldValue(map, ROOT_RECORD_FIELD, parentFieldValue);
final RecordPathResult result = compiled.evaluate(record, fieldValue);
// The supplier defers building the error text until it is actually requested.
return evalResults(result.getSelectedFields(), transform, () -> "RecordPath " + recordPath + " resulted in more than one return value. The RecordPath must be further constrained.");
/**
 * Single-record variant: evaluates the compiled path directly against the record.
 *
 * @param record the record to evaluate against
 * @param recordPath the RecordPath expression
 * @param transform conversion applied to the selected value
 * @return whatever evalResults produces for the selected fields
 */
private <T> T eval(final Record record, final String recordPath, final Function<Object, T> transform) {
final RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath);
// The cast is redundant: the parameter is already declared as Record.
final RecordPathResult result = compiled.evaluate((Record) record);
return evalResults(result.getSelectedFields(), transform,
() -> "RecordPath " + recordPath + " evaluated against " + record + " resulted in more than one return value. The RecordPath must be further constrained.");
/**
 * Array variant: evaluates the compiled path against every record in the array and returns
 * {@code null} when no field was collected.
 * NOTE(review): the statement that adds each result's fields to selectedFields and the first argument
 * of the evalResults call are missing from this view — confirm against the full file.
 *
 * @param records the records to evaluate against
 * @param recordPath the RecordPath expression
 * @param transform conversion applied to the selected value
 * @return {@code null} if nothing was selected, otherwise whatever evalResults produces
 */
private <T> T eval(final Record[] records, final String recordPath, final Function<Object, T> transform) {
final RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath);
final List<FieldValue> selectedFields = new ArrayList<>();
for (final Record record : records) {
final RecordPathResult result = compiled.evaluate(record);
if (selectedFields.isEmpty()) {
return null;
return evalResults(, transform, () -> "RecordPath " + recordPath + " resulted in more than one return value. The RecordPath must be further constrained.");
/**
 * Iterable variant: evaluates the compiled path against every record produced by the Iterable and
 * returns {@code null} when no field was collected.
 * NOTE(review): the statement that adds each result's fields to selectedFields and the first argument
 * of the evalResults call are missing from this view — confirm against the full file.
 *
 * @param records the records to evaluate against
 * @param recordPath the RecordPath expression
 * @param transform conversion applied to the selected value
 * @return {@code null} if nothing was selected, otherwise whatever evalResults produces
 */
private <T> T eval(final Iterable<Record> records, final String recordPath, final Function<Object, T> transform) {
final RecordPath compiled = RECORD_PATH_CACHE.getCompiled(recordPath);
final List<FieldValue> selectedFields = new ArrayList<>();
for (final Record record : records) {
final RecordPathResult result = compiled.evaluate(record);
if (selectedFields.isEmpty()) {
return null;
return evalResults(, transform, () -> "RecordPath " + recordPath + " resulted in more than one return value. The RecordPath must be further constrained.");
private <T> T evalResults(final Stream<FieldValue> fields, final Function<Object, T> transform, final Supplier<String> multipleReturnValueErrorSupplier) {
.reduce((a, b) -> {
// Only allow a single value
throw new RuntimeException(multipleReturnValueErrorSupplier.get());