blob: 20c2dbf976be3b208de826a4a2d4797c86fcb7e1 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.queryrecord;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.linq4j.Queryable;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.QueryableTable;
import org.apache.calcite.schema.Schema.TableType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.util.Pair;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;

import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class FlowFileTable extends AbstractTable implements QueryableTable, TranslatableTable {
private final RecordReaderFactory recordReaderFactory;
private final ComponentLog logger;
private RecordSchema recordSchema;
private RelDataType relDataType = null;
private volatile ProcessSession session;
private volatile FlowFile flowFile;
private volatile int maxRecordsRead;
private final Set<FlowFileEnumerator> enumerators = new HashSet<>();
* Creates a FlowFile table.
public FlowFileTable(final ProcessSession session, final FlowFile flowFile, final RecordSchema schema, final RecordReaderFactory recordReaderFactory, final ComponentLog logger) {
this.session = session;
this.flowFile = flowFile;
this.recordSchema = schema;
this.recordReaderFactory = recordReaderFactory;
this.logger = logger;
public void setFlowFile(final ProcessSession session, final FlowFile flowFile) {
this.session = session;
this.flowFile = flowFile;
this.maxRecordsRead = 0;
public String toString() {
return "FlowFileTable";
public void close() {
synchronized (enumerators) {
for (final FlowFileEnumerator enumerator : enumerators) {
* Returns an enumerable over a given projection of the fields.
* <p>
* Called from generated code.
public Enumerable<Object> project(final int[] fields) {
return new AbstractEnumerable<Object>() {
@SuppressWarnings({"unchecked", "rawtypes"})
public Enumerator<Object> enumerator() {
final FlowFileEnumerator flowFileEnumerator = new FlowFileEnumerator(session, flowFile, logger, recordReaderFactory, fields) {
protected void onFinish() {
final int recordCount = getRecordsRead();
if (recordCount > maxRecordsRead) {
maxRecordsRead = recordCount;
public void close() {
synchronized (enumerators) {
synchronized (enumerators) {
return flowFileEnumerator;
public int getRecordsRead() {
return maxRecordsRead;
public Expression getExpression(final SchemaPlus schema, final String tableName, final Class clazz) {
return Schemas.tableExpression(schema, getElementType(), tableName, clazz);
public Type getElementType() {
return Object[].class;
public <T> Queryable<T> asQueryable(final QueryProvider queryProvider, final SchemaPlus schema, final String tableName) {
throw new UnsupportedOperationException();
public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable relOptTable) {
// Request all fields.
final int fieldCount = relOptTable.getRowType().getFieldCount();
final int[] fields = new int[fieldCount];
for (int i = 0; i < fieldCount; i++) {
fields[i] = i;
return new FlowFileTableScan(context.getCluster(), relOptTable, this, fields);
public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
if (relDataType != null) {
return relDataType;
final List<String> names = new ArrayList<>();
final List<RelDataType> types = new ArrayList<>();
final JavaTypeFactory javaTypeFactory = (JavaTypeFactory) typeFactory;
for (final RecordField field : recordSchema.getFields()) {
final RelDataType relDataType = getRelDataType(field.getDataType(), javaTypeFactory);
types.add(javaTypeFactory.createTypeWithNullability(relDataType, field.isNullable()));
relDataType = typeFactory.createStructType(, types));
return relDataType;
private RelDataType getRelDataType(final DataType fieldType, final JavaTypeFactory typeFactory) {
switch (fieldType.getFieldType()) {
return typeFactory.createJavaType(boolean.class);
case BYTE:
return typeFactory.createJavaType(byte.class);
case CHAR:
return typeFactory.createJavaType(char.class);
case DATE:
return typeFactory.createJavaType(java.sql.Date.class);
case DOUBLE:
return typeFactory.createJavaType(double.class);
case FLOAT:
return typeFactory.createJavaType(float.class);
case INT:
return typeFactory.createJavaType(int.class);
case SHORT:
return typeFactory.createJavaType(short.class);
case TIME:
return typeFactory.createJavaType(java.sql.Time.class);
return typeFactory.createJavaType(java.sql.Timestamp.class);
case LONG:
return typeFactory.createJavaType(long.class);
case STRING:
return typeFactory.createJavaType(String.class);
case ARRAY:
ArrayDataType array = (ArrayDataType) fieldType;
return typeFactory.createArrayType(getRelDataType(array.getElementType(), typeFactory), -1);
case RECORD:
return typeFactory.createJavaType(Record.class);
case MAP:
return typeFactory.createJavaType(HashMap.class);
case BIGINT:
return typeFactory.createJavaType(BigInteger.class);
return typeFactory.createJavaType(BigDecimal.class);
case CHOICE:
final ChoiceDataType choiceDataType = (ChoiceDataType) fieldType;
DataType widestDataType = choiceDataType.getPossibleSubTypes().get(0);
for (final DataType possibleType : choiceDataType.getPossibleSubTypes()) {
if (possibleType == widestDataType) {
if (possibleType.getFieldType().isWiderThan(widestDataType.getFieldType())) {
widestDataType = possibleType;
if (widestDataType.getFieldType().isWiderThan(possibleType.getFieldType())) {
// Neither is wider than the other.
widestDataType = null;
// If one of the CHOICE data types is the widest, use it.
if (widestDataType != null) {
return getRelDataType(widestDataType, typeFactory);
// None of the data types is strictly the widest. Check if all data types are numeric.
// This would happen, for instance, if the data type is a choice between float and integer.
// If that is the case, we can use a String type for the table schema because all values will fit
// into a String. This will still allow for casting, etc. if the query requires it.
boolean allNumeric = true;
for (final DataType possibleType : choiceDataType.getPossibleSubTypes()) {
if (!isNumeric(possibleType)) {
allNumeric = false;
if (allNumeric) {
return typeFactory.createJavaType(String.class);
// There is no specific type that we can use for the schema. This would happen, for instance, if our
// CHOICE is between an integer and a Record.
return typeFactory.createJavaType(Object.class);
throw new IllegalArgumentException("Unknown Record Field Type: " + fieldType);
private boolean isNumeric(final DataType dataType) {
switch (dataType.getFieldType()) {
case BIGINT:
case BYTE:
case DOUBLE:
case FLOAT:
case INT:
case LONG:
case SHORT:
return true;
return false;
public TableType getJdbcTableType() {
return TableType.TEMPORARY_TABLE;