blob: 066e7797d026216774f07ed71abf66f379fb47e1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.meta.provider;
import static java.util.stream.Collectors.toList;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.impl.TableName;
import org.apache.beam.sdk.extensions.sql.meta.CustomTableResolver;
import org.apache.beam.sdk.extensions.sql.meta.Table;
/**
 * Base class for table providers that look up table metadata using full table names, instead of
 * querying it by parts of the name separately.
 *
 * <p>Table names parsed out of a query are registered through {@link
 * #registerKnownTableNames(List)}. When Calcite later walks a compound identifier component by
 * component via {@link #getSubProvider(String)}, this class hands out tracking sub-providers that
 * follow the walk. Once the leaf table name is reached, they resolve the table with a single call
 * to {@link #getTableByFullName(TableName)}.
 */
@Experimental
public abstract class FullNameTableProvider implements TableProvider, CustomTableResolver {

  /** Table names parsed from queries; accumulated across registerKnownTableNames() calls. */
  private final List<TableName> knownTables;

  protected FullNameTableProvider() {
    knownTables = new ArrayList<>();
  }

  /**
   * Looks up table metadata using the table's complete name (path plus table name).
   *
   * @param fullTableName the full path and name of the table to look up
   * @return the metadata of the requested table
   */
  public abstract Table getTableByFullName(TableName fullTableName);

  /**
   * Records table names parsed from a query so that later compound-name lookups can be matched
   * against them. Names accumulate across calls and are never removed.
   *
   * @param tableNames parsed table names to remember
   */
  @Override
  public void registerKnownTableNames(List<TableName> tableNames) {
    knownTables.addAll(tableNames);
  }

  /**
   * Returns a tracking sub-provider when {@code name} is the first path component of at least one
   * known compound table name, or {@code null} otherwise.
   *
   * @param name the sub-schema name Calcite is asking for
   * @return a {@link TableNameTrackingProvider} at schema level 1, or {@code null} if no known
   *     compound table starts with {@code name}
   */
  @Override
  public TableProvider getSubProvider(String name) {
    // TODO: implement with trie
    //
    // Calcite calls getSubProvider() when it expects a sub-schema/sub-provider, not a table.
    // So only known compound table identifiers need to be checked here. If 'name' actually
    // represents a simple identifier, Calcite checks it in a later getTable() call.
    //
    // Returning null means 'name' is either not part of this schema or is a table rather than a
    // sub-schema. If a sub-provider and a table share the same name, the sub-schema wins and
    // 'name' is not treated as a table; Calcite behaves the same way.
    //
    // Here we find the parsed table names that start with 'name'. We then return a tracking
    // provider that follows the subsequent getSubProvider()/getTable() calls in a trie-like
    // manner, matching them against those known names.
    List<TableName> tablesToLookFor =
        knownTables.stream()
            .filter(TableName::isCompound) // getPrefix() is only valid for compound names.
            .filter(tableName -> tableName.getPrefix().equals(name))
            .collect(toList());
    return tablesToLookFor.isEmpty() ? null : new TableNameTrackingProvider(1, tablesToLookFor);
  }

  /**
   * Calcite calls getSubProvider()/getTable() on this class when resolving a table name. This class
   * keeps track of these calls and checks them against known table names (extracted from a query).
   * When a full table name has been parsed out, it calls the actual table provider to get a table
   * based on the full name, instead of calling it component by component.
   *
   * <p>This class enables table providers to query their metadata source using full table names.
   */
  class TableNameTrackingProvider extends InMemoryMetaTableProvider {
    /**
     * Number of path components already consumed. Every entry in {@link #tableNames} matches the
     * walked path at indices {@code 0..schemaLevel-1}.
     */
    final int schemaLevel;

    /** Known table names still consistent with the path walked so far. */
    final List<TableName> tableNames;

    TableNameTrackingProvider(int schemaLevel, List<TableName> tableNames) {
      this.schemaLevel = schemaLevel;
      this.tableNames = tableNames;
    }

    /**
     * Narrows the tracked names to those whose path component at the current level equals {@code
     * name}.
     *
     * @param name the next path component Calcite is asking for
     * @return a tracking provider one level deeper, or {@code null} if no tracked name continues
     *     with {@code name}
     */
    @Override
    public TableProvider getSubProvider(String name) {
      // If no tracked table has 'name' at this path index, returning null makes Calcite try other
      // schemas. Since all parsed table names were registered up front, this should normally only
      // happen for the leaf element (the actual table name), which getTable() then handles.
      //
      // A path longer than schemaLevel implies the name is compound, so no separate
      // isCompound() check is needed.
      List<TableName> matchingTables =
          tableNames.stream()
              .filter(tableName -> tableName.getPath().size() > schemaLevel)
              .filter(tableName -> tableName.getPath().get(schemaLevel).equals(name))
              .collect(toList());
      return matchingTables.isEmpty()
          ? null
          : new TableNameTrackingProvider(schemaLevel + 1, matchingTables);
    }

    @Override
    public String getTableType() {
      return "google.cloud.datacatalog.subprovider";
    }

    /**
     * Resolves {@code name} as a leaf table at the current path level and delegates the lookup to
     * {@link FullNameTableProvider#getTableByFullName(TableName)}.
     *
     * @param name the leaf table name Calcite is asking for
     * @return the table returned by the enclosing provider for the matched full name
     * @throws IllegalStateException if no tracked table at this level has the given name
     */
    @Nullable
    @Override
    public Table getTable(String name) {
      // This is called only after getSubProvider() returned null. Since we track the actual
      // parsed table names, a parsed table matching 'name' should exist at this exact level.
      //
      // The path length must equal schemaLevel. Otherwise, a deeper table with the same leaf
      // name (e.g. 'a.b.t' while resolving 'a.t') could be returned by mistake.
      Optional<TableName> matchingTable =
          tableNames.stream()
              .filter(tableName -> tableName.getPath().size() == schemaLevel)
              .filter(tableName -> tableName.getTableName().equals(name))
              .findFirst();

      TableName fullTableName =
          matchingTable.orElseThrow(
              () ->
                  new IllegalStateException(
                      "Unexpected table '"
                          + name
                          + "' requested. Current schema level is "
                          + schemaLevel
                          + ". Current known table names: "
                          + tableNames.toString()));
      return FullNameTableProvider.this.getTableByFullName(fullTableName);
    }

    /** Delegates table construction to the enclosing {@link FullNameTableProvider}. */
    @Override
    public synchronized BeamSqlTable buildBeamSqlTable(Table table) {
      return FullNameTableProvider.this.buildBeamSqlTable(table);
    }
  }
}