blob: ece2008dd0737a029b4e955b389f7c8f2f1ce0cb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.interpreter;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Queryable;
import org.apache.calcite.linq4j.function.Function1;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.runtime.Enumerables;
import org.apache.calcite.schema.FilterableTable;
import org.apache.calcite.schema.ProjectableFilterableTable;
import org.apache.calcite.schema.QueryableTable;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Util;
import org.apache.calcite.util.mapping.Mapping;
import org.apache.calcite.util.mapping.Mappings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.Type;
import java.util.List;
import static org.apache.calcite.util.Static.RESOURCE;
import static java.util.Objects.requireNonNull;
/**
* Interpreter node that implements a
* {@link org.apache.calcite.rel.core.TableScan}.
*/
public class TableScanNode implements Node {
private TableScanNode(Compiler compiler, TableScan rel,
Enumerable<Row> enumerable) {
compiler.enumerable(rel, enumerable);
}
@Override public void run() {
// nothing to do
}
/** Creates a TableScanNode.
*
* <p>Tries various table SPIs, and negotiates with the table which filters
* and projects it can implement. Adds to the Enumerable implementations of
* any filters and projects that cannot be implemented by the table. */
static TableScanNode create(Compiler compiler, TableScan rel,
ImmutableList<RexNode> filters, @Nullable ImmutableIntList projects) {
final RelOptTable relOptTable = rel.getTable();
final ProjectableFilterableTable pfTable =
relOptTable.unwrap(ProjectableFilterableTable.class);
if (pfTable != null) {
return createProjectableFilterable(compiler, rel, filters, projects,
pfTable);
}
final FilterableTable filterableTable =
relOptTable.unwrap(FilterableTable.class);
if (filterableTable != null) {
return createFilterable(compiler, rel, filters, projects,
filterableTable);
}
final ScannableTable scannableTable =
relOptTable.unwrap(ScannableTable.class);
if (scannableTable != null) {
return createScannable(compiler, rel, filters, projects,
scannableTable);
}
//noinspection unchecked
final Enumerable<Row> enumerable = relOptTable.unwrap(Enumerable.class);
if (enumerable != null) {
return createEnumerable(compiler, rel, enumerable, null, filters,
projects);
}
final QueryableTable queryableTable =
relOptTable.unwrap(QueryableTable.class);
if (queryableTable != null) {
return createQueryable(compiler, rel, filters, projects,
queryableTable);
}
throw new AssertionError("cannot convert table " + relOptTable
+ " to enumerable");
}
private static TableScanNode createScannable(Compiler compiler, TableScan rel,
ImmutableList<RexNode> filters, @Nullable ImmutableIntList projects,
ScannableTable scannableTable) {
final Enumerable<Row> rowEnumerable =
Enumerables.toRow(scannableTable.scan(compiler.getDataContext()));
return createEnumerable(compiler, rel, rowEnumerable, null, filters,
projects);
}
private static TableScanNode createQueryable(Compiler compiler,
TableScan rel, ImmutableList<RexNode> filters, @Nullable ImmutableIntList projects,
QueryableTable queryableTable) {
final DataContext root = compiler.getDataContext();
final RelOptTable relOptTable = rel.getTable();
final Type elementType = queryableTable.getElementType();
SchemaPlus schema = root.getRootSchema();
for (String name : Util.skipLast(relOptTable.getQualifiedName())) {
requireNonNull(schema, () ->
"schema is null while resolving " + name + " for table"
+ relOptTable.getQualifiedName());
schema = schema.getSubSchema(name);
}
final Enumerable<Row> rowEnumerable;
if (elementType instanceof Class) {
//noinspection unchecked
final Queryable<Object> queryable = Schemas.queryable(root,
(Class) elementType,
relOptTable.getQualifiedName());
ImmutableList.Builder<Field> fieldBuilder = ImmutableList.builder();
Class type = (Class) elementType;
for (Field field : type.getFields()) {
if (Modifier.isPublic(field.getModifiers())
&& !Modifier.isStatic(field.getModifiers())) {
fieldBuilder.add(field);
}
}
final List<Field> fields = fieldBuilder.build();
rowEnumerable = queryable.select(o -> {
final @Nullable Object[] values = new Object[fields.size()];
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
try {
values[i] = field.get(o);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
return new Row(values);
});
} else {
rowEnumerable =
Schemas.queryable(root, Row.class, relOptTable.getQualifiedName());
}
return createEnumerable(compiler, rel, rowEnumerable, null, filters,
projects);
}
private static TableScanNode createFilterable(Compiler compiler,
TableScan rel, ImmutableList<RexNode> filters, @Nullable ImmutableIntList projects,
FilterableTable filterableTable) {
final DataContext root = compiler.getDataContext();
final List<RexNode> mutableFilters = Lists.newArrayList(filters);
final Enumerable<@Nullable Object[]> enumerable =
filterableTable.scan(root, mutableFilters);
for (RexNode filter : mutableFilters) {
if (!filters.contains(filter)) {
throw RESOURCE.filterableTableInventedFilter(filter.toString()).ex();
}
}
final Enumerable<Row> rowEnumerable = Enumerables.toRow(enumerable);
return createEnumerable(compiler, rel, rowEnumerable, null,
mutableFilters, projects);
}
private static TableScanNode createProjectableFilterable(Compiler compiler,
TableScan rel, ImmutableList<RexNode> filters, @Nullable ImmutableIntList projects,
ProjectableFilterableTable pfTable) {
final DataContext root = compiler.getDataContext();
final ImmutableIntList originalProjects = projects;
for (;;) {
final List<RexNode> mutableFilters = Lists.newArrayList(filters);
final int[] projectInts;
if (projects == null
|| projects.equals(TableScan.identity(rel.getTable()))) {
projectInts = null;
} else {
projectInts = projects.toIntArray();
}
for (RexNode filter : mutableFilters) {
if (!filters.contains(filter)) {
throw RESOURCE.filterableTableInventedFilter(filter.toString())
.ex();
}
}
final ImmutableBitSet usedFields =
RelOptUtil.InputFinder.bits(mutableFilters, null);
if (projects != null) {
int changeCount = 0;
for (int usedField : usedFields) {
if (!projects.contains(usedField)) {
// A field that is not projected is used in a filter that the
// table rejected. We won't be able to apply the filter later.
// Try again without any projects.
projects =
ImmutableIntList.copyOf(
Iterables.concat(projects, ImmutableList.of(usedField)));
++changeCount;
}
}
if (changeCount > 0) {
continue;
}
}
final Enumerable<@Nullable Object[]> enumerable1 =
pfTable.scan(root, mutableFilters, projectInts);
final Enumerable<Row> rowEnumerable = Enumerables.toRow(enumerable1);
final ImmutableIntList rejectedProjects;
if (originalProjects == null || originalProjects.equals(projects)) {
rejectedProjects = null;
} else {
// We projected extra columns because they were needed in filters. Now
// project the leading columns.
rejectedProjects = ImmutableIntList.identity(originalProjects.size());
}
return createEnumerable(compiler, rel, rowEnumerable, projects,
mutableFilters, rejectedProjects);
}
}
private static TableScanNode createEnumerable(Compiler compiler,
TableScan rel, Enumerable<Row> enumerable,
final @Nullable ImmutableIntList acceptedProjects, List<RexNode> rejectedFilters,
final @Nullable ImmutableIntList rejectedProjects) {
if (!rejectedFilters.isEmpty()) {
final RexNode filter =
RexUtil.composeConjunction(rel.getCluster().getRexBuilder(),
rejectedFilters);
// Re-map filter for the projects that have been applied already
final RexNode filter2;
final RelDataType inputRowType;
if (acceptedProjects == null) {
filter2 = filter;
inputRowType = rel.getRowType();
} else {
final Mapping mapping = Mappings.target(acceptedProjects,
rel.getTable().getRowType().getFieldCount());
filter2 = RexUtil.apply(mapping, filter);
final RelDataTypeFactory.Builder builder =
rel.getCluster().getTypeFactory().builder();
final List<RelDataTypeField> fieldList =
rel.getTable().getRowType().getFieldList();
for (int acceptedProject : acceptedProjects) {
builder.add(fieldList.get(acceptedProject));
}
inputRowType = builder.build();
}
final Scalar condition =
compiler.compile(ImmutableList.of(filter2), inputRowType);
final Context context = compiler.createContext();
enumerable = enumerable.where(row -> {
context.values = row.getValues();
Boolean b = (Boolean) condition.execute(context);
return b != null && b;
});
}
if (rejectedProjects != null) {
enumerable = enumerable.select(
new Function1<Row, Row>() {
final @Nullable Object[] values = new Object[rejectedProjects.size()];
@Override public Row apply(Row row) {
final @Nullable Object[] inValues = row.getValues();
for (int i = 0; i < rejectedProjects.size(); i++) {
values[i] = inValues[rejectedProjects.get(i)];
}
return Row.asCopy(values);
}
});
}
return new TableScanNode(compiler, rel, enumerable);
}
}