// blob: 2f0c1e617f90393cf9e1b11482e54d2302a69049 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.zetasql.translation;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATETIME;
import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_NUMERIC;
import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkNotNull;
import com.google.zetasql.ZetaSQLType.TypeKind;
import com.google.zetasql.resolvedast.ResolvedColumn;
import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedTableScan;
import java.util.List;
import java.util.Properties;
import org.apache.beam.sdk.extensions.sql.zetasql.TableResolution;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.jdbc.CalciteSchema;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptTable;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.RelOptTableImpl;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelRoot;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.Table;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.TranslatableTable;
/**
 * Converts a ZetaSQL {@link ResolvedTableScan} into a Calcite {@link RelNode}.
 *
 * <p>The scanned table is looked up in the default schema returned by {@code getConfig()}, wrapped
 * in a {@link RelOptTableImpl}, and then asked to produce its own relational expression. Only
 * tables implementing {@link TranslatableTable} can be converted.
 */
class TableScanConverter extends RelConverter<ResolvedTableScan> {

  /** Column type kinds that are rejected when they appear in a scanned table. */
  private static final ImmutableSet<TypeKind> UNSUPPORTED_DATA_TYPES =
      ImmutableSet.of(TYPE_DATETIME, TYPE_NUMERIC);

  TableScanConverter(ConversionContext context) {
    super(context);
  }

  /**
   * Translates the given table scan into the relational expression produced by the underlying
   * Calcite table.
   *
   * @param zetaNode the resolved ZetaSQL table scan to translate
   * @param inputs not used by this converter
   * @return the node returned by {@link TranslatableTable#toRel} for the scanned table
   * @throws IllegalArgumentException if a scanned column has one of the unsupported type kinds
   * @throws NullPointerException if the table path cannot be resolved in the default schema
   * @throws RuntimeException if the table is unknown to the conversion trait, or if the resolved
   *     Calcite table is not a {@link TranslatableTable}
   */
  @Override
  public RelNode convert(ResolvedTableScan zetaNode, List<RelNode> inputs) {
    checkTableScanSchema(zetaNode.getColumnList());

    List<String> tablePath = getTablePath(zetaNode.getTable());
    SchemaPlus rootSchema = getConfig().getDefaultSchema();
    // TODO: reject incorrect top-level schema
    Table resolvedTable =
        TableResolution.resolveCalciteTable(getConfig().getContext(), rootSchema, tablePath);
    String rootSchemaName = rootSchema.getName();

    // The table was already resolved before the query was handed to the analyzer, so failing to
    // find it here is unexpected.
    checkNotNull(
        resolvedTable,
        "Unable to resolve the table path %s in schema %s",
        tablePath,
        rootSchemaName);

    final CalciteCatalogReader catalogReader =
        new CalciteCatalogReader(
            CalciteSchema.from(rootSchema),
            ImmutableList.of(rootSchemaName),
            getCluster().getTypeFactory(),
            new CalciteConnectionConfigImpl(new Properties()));

    RelDataType rowType = resolvedTable.getRowType(getCluster().getTypeFactory());
    // Fully qualified name: the default schema name followed by the table path.
    ImmutableList<String> qualifiedName =
        ImmutableList.<String>builder().add(rootSchemaName).addAll(tablePath).build();
    RelOptTableImpl relOptTable =
        RelOptTableImpl.create(catalogReader, rowType, resolvedTable, qualifiedName);

    if (!(resolvedTable instanceof TranslatableTable)) {
      throw new RuntimeException("Does not support non TranslatableTable type table!");
    }
    TranslatableTable translatable = (TranslatableTable) resolvedTable;
    return translatable.toRel(createToRelContext(), relOptTable);
  }

  /**
   * Returns the path recorded for {@code table} by the conversion trait.
   *
   * @throws RuntimeException if the trait reports that the table has not been resolved
   */
  private List<String> getTablePath(com.google.zetasql.Table table) {
    if (getTrait().isTableResolved(table)) {
      return getTrait().getTablePath(table);
    }
    throw new RuntimeException(
        "Unexpected table found when converting to Calcite rel node: " + table);
  }

  /**
   * Builds a minimal {@link RelOptTable.ToRelContext} that exposes this converter's cluster and
   * rejects view expansion.
   */
  private RelOptTable.ToRelContext createToRelContext() {
    return new RelOptTable.ToRelContext() {
      @Override
      public RelOptCluster getCluster() {
        return TableScanConverter.this.getCluster();
      }

      @Override
      public RelRoot expandView(
          RelDataType rowType, String queryString, List<String> schemaPath, List<String> viewPath) {
        throw new UnsupportedOperationException("This RelContext does not support expandView");
      }
    };
  }

  /**
   * Verifies that none of the scanned columns uses an unsupported type kind. A {@code null} column
   * list is accepted without any check.
   *
   * @throws IllegalArgumentException if any column's type kind is in {@link
   *     #UNSUPPORTED_DATA_TYPES}
   */
  private void checkTableScanSchema(List<ResolvedColumn> columnList) {
    if (columnList == null) {
      return;
    }

    boolean hasUnsupportedColumn =
        columnList.stream()
            .map(column -> column.getType().getKind())
            .anyMatch(UNSUPPORTED_DATA_TYPES::contains);
    if (hasUnsupportedColumn) {
      throw new IllegalArgumentException(
          "Does not support " + UNSUPPORTED_DATA_TYPES + " types in source tables");
    }
  }
}