/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.queryrecord;

import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.linq4j.Queryable;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.QueryableTable;
import org.apache.calcite.schema.Schema.TableType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.util.Pair;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FlowFileTable extends AbstractTable implements QueryableTable, TranslatableTable {

    private final RecordReaderFactory recordReaderFactory;
    private final ComponentLog logger;

    private RecordSchema recordSchema;
    private RelDataType relDataType = null;

    private volatile ProcessSession session;
    private volatile FlowFile flowFile;
    private volatile int maxRecordsRead;

    private final Set<FlowFileEnumerator> enumerators = new HashSet<>();

    /**
     * Creates a FlowFile table.
     */
    public FlowFileTable(final ProcessSession session, final FlowFile flowFile, final RecordSchema schema, final RecordReaderFactory recordReaderFactory, final ComponentLog logger) {
        this.session = session;
        this.flowFile = flowFile;
        this.recordSchema = schema;
        this.recordReaderFactory = recordReaderFactory;
        this.logger = logger;
    }
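
    /**
     * Updates the ProcessSession and FlowFile that subsequent reads will use, and resets the
     * maximum record count that has been observed for this table.
     */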
    public void setFlowFile(final ProcessSession session, final FlowFile flowFile) {
        this.session = session;
        this.flowFile = flowFile;
        this.maxRecordsRead = 0;
    }

    @Override
    public String toString() {
        return "FlowFileTable";
    }
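
    /**
     * Closes any Enumerators that are still registered against this table.
     */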
    public void close() {
        synchronized (enumerators) {
            // Iterate over a copy of the set because each Enumerator removes itself from the set
            // when it is closed, which would otherwise cause a ConcurrentModificationException.
            for (final FlowFileEnumerator enumerator : new ArrayList<>(enumerators)) {
                enumerator.close();
            }
        }
    }

    /**
     * Returns an enumerable over a given projection of the fields.
     *
     * <p>
     * Called from generated code.
     */
    public Enumerable<Object> project(final int[] fields) {
        return new AbstractEnumerable<Object>() {
            @Override
            @SuppressWarnings({"unchecked", "rawtypes"})
            public Enumerator<Object> enumerator() {
                final FlowFileEnumerator flowFileEnumerator = new FlowFileEnumerator(session, flowFile, logger, recordReaderFactory, fields) {
                    @Override
                    protected void onFinish() {
                        final int recordCount = getRecordsRead();
                        if (recordCount > maxRecordsRead) {
                            maxRecordsRead = recordCount;
                        }
                    }

                    @Override
                    public void close() {
                        synchronized (enumerators) {
                            enumerators.remove(this);
                        }
                        super.close();
                    }
                };

                synchronized (enumerators) {
                    enumerators.add(flowFileEnumerator);
                }

                return flowFileEnumerator;
            }
        };
    }
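
    /**
     * Returns the greatest number of records that any single Enumerator has read from the current FlowFile.
     */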
    public int getRecordsRead() {
        return maxRecordsRead;
    }

    @Override
    @SuppressWarnings("rawtypes")
    public Expression getExpression(final SchemaPlus schema, final String tableName, final Class clazz) {
        return Schemas.tableExpression(schema, getElementType(), tableName, clazz);
    }

    @Override
    public Type getElementType() {
        return Object[].class;
    }

    @Override
    public <T> Queryable<T> asQueryable(final QueryProvider queryProvider, final SchemaPlus schema, final String tableName) {
        throw new UnsupportedOperationException();
    }

    @Override
    public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable relOptTable) {
        // Request all fields.
        final int fieldCount = relOptTable.getRowType().getFieldCount();
        final int[] fields = new int[fieldCount];
        for (int i = 0; i < fieldCount; i++) {
            fields[i] = i;
        }

        return new FlowFileTableScan(context.getCluster(), relOptTable, this, fields);
    }
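
    /**
     * Builds the Calcite row type for this table from the NiFi RecordSchema, mapping each RecordField to a
     * nullable-aware RelDataType. The result is cached so that it is computed only once.
     */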
    @Override
    public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
        if (relDataType != null) {
            return relDataType;
        }

        final List<String> names = new ArrayList<>();
        final List<RelDataType> types = new ArrayList<>();

        final JavaTypeFactory javaTypeFactory = (JavaTypeFactory) typeFactory;
        for (final RecordField field : recordSchema.getFields()) {
            names.add(field.getFieldName());
            final RelDataType relDataType = getRelDataType(field.getDataType(), javaTypeFactory);
            types.add(javaTypeFactory.createTypeWithNullability(relDataType, field.isNullable()));
        }

        relDataType = typeFactory.createStructType(Pair.zip(names, types));
        return relDataType;
    }
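
    /**
     * Maps a NiFi DataType to the corresponding Calcite RelDataType. For CHOICE fields, the widest of the
     * possible sub-types is used when one exists; otherwise an all-numeric choice maps to String and any
     * other choice maps to Object.
     */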
    private RelDataType getRelDataType(final DataType fieldType, final JavaTypeFactory typeFactory) {
        switch (fieldType.getFieldType()) {
            case BOOLEAN:
                return typeFactory.createJavaType(boolean.class);
            case BYTE:
                return typeFactory.createJavaType(byte.class);
            case CHAR:
                return typeFactory.createJavaType(char.class);
            case DATE:
                return typeFactory.createJavaType(java.sql.Date.class);
            case DOUBLE:
                return typeFactory.createJavaType(double.class);
            case FLOAT:
                return typeFactory.createJavaType(float.class);
            case INT:
                return typeFactory.createJavaType(int.class);
            case SHORT:
                return typeFactory.createJavaType(short.class);
            case TIME:
                return typeFactory.createJavaType(java.sql.Time.class);
            case TIMESTAMP:
                return typeFactory.createJavaType(java.sql.Timestamp.class);
            case LONG:
                return typeFactory.createJavaType(long.class);
            case STRING:
                return typeFactory.createJavaType(String.class);
            case ARRAY:
                ArrayDataType array = (ArrayDataType) fieldType;
                return typeFactory.createArrayType(getRelDataType(array.getElementType(), typeFactory), -1);
            case RECORD:
                return typeFactory.createJavaType(Record.class);
            case MAP:
                return typeFactory.createJavaType(HashMap.class);
            case BIGINT:
                return typeFactory.createJavaType(BigInteger.class);
            case DECIMAL:
                return typeFactory.createJavaType(BigDecimal.class);
            case CHOICE:
                final ChoiceDataType choiceDataType = (ChoiceDataType) fieldType;
                DataType widestDataType = choiceDataType.getPossibleSubTypes().get(0);
                for (final DataType possibleType : choiceDataType.getPossibleSubTypes()) {
                    if (possibleType == widestDataType) {
                        continue;
                    }
                    if (possibleType.getFieldType().isWiderThan(widestDataType.getFieldType())) {
                        widestDataType = possibleType;
                        continue;
                    }
                    if (widestDataType.getFieldType().isWiderThan(possibleType.getFieldType())) {
                        continue;
                    }

                    // Neither is wider than the other.
                    widestDataType = null;
                    break;
                }

                // If one of the CHOICE data types is the widest, use it.
                if (widestDataType != null) {
                    return getRelDataType(widestDataType, typeFactory);
                }

                // None of the data types is strictly the widest. Check if all data types are numeric.
                // This would happen, for instance, if the data type is a choice between float and integer.
                // If that is the case, we can use a String type for the table schema because all values will fit
                // into a String. This will still allow for casting, etc. if the query requires it.
                boolean allNumeric = true;
                for (final DataType possibleType : choiceDataType.getPossibleSubTypes()) {
                    if (!isNumeric(possibleType)) {
                        allNumeric = false;
                        break;
                    }
                }

                if (allNumeric) {
                    return typeFactory.createJavaType(String.class);
                }

                // There is no specific type that we can use for the schema. This would happen, for instance, if our
                // CHOICE is between an integer and a Record.
                return typeFactory.createJavaType(Object.class);
        }

        throw new IllegalArgumentException("Unknown Record Field Type: " + fieldType);
    }
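
    /**
     * Returns true if the given DataType's field type is one of the numeric types.
     */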
    private boolean isNumeric(final DataType dataType) {
        switch (dataType.getFieldType()) {
            case BIGINT:
            case BYTE:
            case DECIMAL:
            case DOUBLE:
            case FLOAT:
            case INT:
            case LONG:
            case SHORT:
                return true;
            default:
                return false;
        }
    }

    @Override
    public TableType getJdbcTableType() {
        return TableType.TEMPORARY_TABLE;
    }
}