/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
import org.apache.drill.exec.record.metadata.schema.SchemaProviderFactory;
import org.apache.drill.exec.store.table.function.TableParamDef;
import org.apache.drill.exec.store.table.function.TableSignature;
import org.apache.drill.exec.store.table.function.WithOptionsTableMacro;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
import org.apache.calcite.linq4j.tree.DefaultExpression;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Table;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.dotdrill.View;
import org.apache.drill.exec.planner.logical.CreateTableEntry;
import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer, AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSchema.class);
private static final Expression EXPRESSION = new DefaultExpression(Object.class);
private static final String SCHEMA_PARAMETER_NAME = "schema";
/**
* Schema parameter for table functions; creates a schema provider based on the given parameter value.
*/
private static final TableParamDef SCHEMA_PARAMETER = TableParamDef.optional(
SCHEMA_PARAMETER_NAME, String.class, (drillTable, value) -> {
if (value == null) {
return;
}
SchemaProvider schemaProvider;
try {
schemaProvider = SchemaProviderFactory.create(String.valueOf(value));
// since we pass the schema here as a table parameter,
// read it to ensure that the schema is valid before query execution
schemaProvider.read();
} catch (IOException | IllegalArgumentException e) {
throw UserException.validationError(e)
.message(e.getMessage())
.addContext("Schema parameter value [%s]", value)
.build(logger);
}
drillTable.getMetadataProviderManager().setSchemaProvider(schemaProvider);
});
public static final String SCHEMA_SEPARATOR = ".";
protected final List<String> schemaPath;
protected final String name;
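/**
* Creates a schema with the given parent schema path and name. The name is
* lower-cased before it is stored and appended to the schema path.
*
* @param parentSchemaPath path of the parent schema
* @param name schema name, may be null
*/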
public AbstractSchema(List<String> parentSchemaPath, String name) {
name = name == null ? null : name.toLowerCase();
schemaPath = new ArrayList<>();
schemaPath.addAll(parentSchemaPath);
schemaPath.add(name);
this.name = name;
}
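/**
* Returns sub-partition information for the given table. The base implementation
* does not support sub-partition exploration and always throws
* {@link UnsupportedOperationException}.
*/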
@Override
public Iterable<String> getSubPartitions(String table,
List<String> partitionColumns,
List<String> partitionValues
) throws PartitionNotFoundException {
throw new UnsupportedOperationException(
String.format("Schema of type: %s " +
"does not support retrieving sub-partition information.",
this.getClass().getSimpleName()));
}
public String getName() {
return name;
}
public List<String> getSchemaPath() {
return schemaPath;
}
public String getFullSchemaName() {
return Joiner.on(SCHEMA_SEPARATOR).join(schemaPath);
}
@Override
public final String toString() {
return getFullSchemaName();
}
/**
* Returns a string describing the schema type, which shows where the schema came from.
* Good practice here is to return the JSON type name of the storage plugin's config.
*
* @return schema type name
*/
public abstract String getTypeName();
/**
* The schema can be a top-level schema which doesn't have its own tables, but refers
* to one of its default sub-schemas for table lookup.
*
* Default implementation returns itself.
*
* E.g. the "dfs" schema refers to the tables in the "default" workspace when querying
* for tables in the "dfs" schema.
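*
* <p>An override sketch for such a schema (the "default" sub-schema name is illustrative):</p>
* <pre>{@code
* public Schema getDefaultSchema() {
*   return getSubSchema("default");
* }
* }</pre>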
*
* @return the default schema where tables are created or retrieved from
*/
public Schema getDefaultSchema() {
return this;
}
/**
* Creates a new view given its definition.
* @param view view info including name, definition, etc.
* @return true if an existing view was replaced with the given view, false otherwise
* @throws IOException in case of error creating a view
*/
public boolean createView(View view) throws IOException {
throw UserException.unsupportedError()
.message("Creating new views is not supported in schema [%s]", getSchemaPath())
.build(logger);
}
/**
* Drops the view with the given name.
*
* @param viewName view name
* @throws IOException in case of error dropping the view
*/
public void dropView(String viewName) throws IOException {
throw UserException.unsupportedError()
.message("Dropping a view is not supported in schema [%s]", getSchemaPath())
.build(logger);
}
/**
* Creates a table entry using the table name, list of partition columns
* and storage strategy used to create the table folder and files.
*
* @param tableName new table name
* @param partitionColumns list of partition columns; empty list if there are no partition columns
* @param storageStrategy storage strategy used to create the table folder and files
* @return create table entry
*/
public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) {
throw UserException.unsupportedError()
.message("Creating new tables is not supported in schema [%s]", getSchemaPath())
.build(logger);
}
/**
* Creates a table entry using the table name and list of partition columns, if any.
* The table folder and files will be created using the persistent storage strategy.
*
* @param tableName new table name
* @param partitionColumns list of partition columns; empty list if there are no partition columns
* @return create table entry
*/
public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) {
return createNewTable(tableName, partitionColumns, StorageStrategy.DEFAULT);
}
/**
* Creates a statistics table entry for the given <i>tableName</i>.
* @param tableName table name
* @return instance of create table entry
*/
public CreateTableEntry createStatsTable(String tableName) {
throw UserException.unsupportedError()
.message("Statistics tables are not supported in schema [%s]", getSchemaPath())
.build(logger);
}
/**
* Creates an append statistics table entry for the given <i>tableName</i>. If there is no
* existing statistics table, a new one is created.
* @param tableName table name
* @return instance of create table entry
*/
public CreateTableEntry appendToStatsTable(String tableName) {
throw UserException.unsupportedError()
.message("Statistics tables are not supported in schema [%s]", getSchemaPath())
.build(logger);
}
/**
* Gets the statistics table for the given <i>tableName</i>.
* @param tableName table name
* @return instance of statistics table
*/
public Table getStatsTable(String tableName) {
throw UserException.unsupportedError()
.message("Statistics tables are not supported in schema [%s]", getSchemaPath())
.build(logger);
}
/**
* Reports whether to show items from this schema in INFORMATION_SCHEMA
* tables.
* (Controls ... TODO: Doc.: Mention what this typically controls or
* affects.)
* <p>
* This base implementation returns {@code true}.
* </p>
*/
public boolean showInInformationSchema() {
return true;
}
/**
* For the given table name returns a list of acceptable table functions
* which are common for all Drill schemas. When overriding this method,
* parent functions must be included first so that they are evaluated first.
* If not included, parent functions won't be taken into account when creating the table instance.
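*
* <p>An override sketch that preserves that order ({@code createCustomFunctions} is a
* hypothetical helper supplying schema-specific functions):</p>
* <pre>{@code
* public Collection<Function> getFunctions(String name) {
*   List<Function> functions = new ArrayList<>(super.getFunctions(name)); // parent functions first
*   functions.addAll(createCustomFunctions(name));                        // then schema-specific ones
*   return functions;
* }
* }</pre>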
*
* @param name table name
* @return list of table functions
*/
@Override
public Collection<Function> getFunctions(String name) {
List<TableParamDef> parameters = getFunctionParameters();
TableSignature signature = TableSignature.of(name, parameters);
WithOptionsTableMacro function = new WithOptionsTableMacro(signature, arguments -> {
Table table = getTable(name);
if (table instanceof DrillTable) {
return applyFunctionParameters((DrillTable) table, parameters, arguments);
}
throw new DrillRuntimeException(String.format("Table [%s] is not a Drill table instance. " +
"Given instance is of type [%s].", name, table.getClass().getName()));
});
return Collections.singletonList(function);
}
/**
* Returns a list of common table function parameters that can be used by all Drill schema implementations.
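*
* <p>A subclass adding its own parameter would typically keep the common parameters
* first; a sketch (the "format" parameter and {@code applyFormat} helper are hypothetical):</p>
* <pre>{@code
* public List<TableParamDef> getFunctionParameters() {
*   List<TableParamDef> params = new ArrayList<>(super.getFunctionParameters());
*   params.add(TableParamDef.optional("format", String.class,
*       (table, value) -> applyFormat(table, value)));
*   return params;
* }
* }</pre>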
*
* @return list of table function parameters
*/
public List<TableParamDef> getFunctionParameters() {
return Collections.singletonList(SCHEMA_PARAMETER);
}
/**
* For the given list of parameter definitions, executes the corresponding action for each value.
*
* @param drillTable Drill table instance
* @param paramDefs parameter definitions
* @param values parameter values
* @return updated Drill table instance
*/
public DrillTable applyFunctionParameters(DrillTable drillTable, List<TableParamDef> paramDefs, List<Object> values) {
IntStream.range(0, paramDefs.size())
.forEach(i -> paramDefs.get(i).apply(drillTable, values.get(i)));
return drillTable;
}
/**
* Returns a map of types in this schema by name.
*
* <p>The implementations of {@link #getTypeNames()}
* and {@link #getType(String)} depend on this map.
* The default implementation of this method returns the empty map.
* Override this method to change their behavior.</p>
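*
* <p>A minimal override sketch (the type name and the use of Calcite's
* {@code SqlTypeName} are illustrative):</p>
* <pre>{@code
* protected Map<String, RelProtoDataType> getTypeMap() {
*   return ImmutableMap.<String, RelProtoDataType>of("MY_TYPE",
*       typeFactory -> typeFactory.createSqlType(SqlTypeName.VARCHAR));
* }
* }</pre>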
*
* @return Map of types in this schema by name
*/
protected Map<String, RelProtoDataType> getTypeMap() {
return ImmutableMap.of();
}
@Override
public Set<String> getTypeNames() {
return getTypeMap().keySet();
}
@Override
public RelProtoDataType getType(String name) {
return getTypeMap().get(name);
}
@Override
public Set<String> getFunctionNames() {
return Collections.emptySet();
}
@Override
public Schema getSubSchema(String name) {
return null;
}
@Override
public Set<String> getSubSchemaNames() {
return Collections.emptySet();
}
@Override
public boolean isMutable() {
return false;
}
@Override
public Table getTable(String name) {
return null;
}
@Override
public Set<String> getTableNames() {
return Collections.emptySet();
}
@Override
public Expression getExpression(SchemaPlus parentSchema, String name) {
return EXPRESSION;
}
@Override
public void close() throws Exception {
// no-op: the default is sufficient for most schema implementations
}
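// The default snapshot is the schema itself, regardless of the requested version.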
@Override
public Schema snapshot(SchemaVersion version) {
return this;
}
public void dropTable(String tableName) {
throw UserException.unsupportedError()
.message("Dropping tables is not supported in schema [%s]", getSchemaPath())
.build(logger);
}
/**
* Gets the collection of {@link Table} instances specified by the given table names.
*
* @param tableNames the requested tables, specified by the table names
* @return the collection of requested tables
*/
public List<Pair<String, ? extends Table>> getTablesByNames(final Set<String> tableNames) {
return tableNames.stream()
.map(tableName -> Pair.of(tableName, getTable(tableName)))
.filter(pair -> Objects.nonNull(pair.getValue())) // Schema may return null for a table if the user doesn't have permission to load it.
.collect(Collectors.toList());
}
/**
* Used by {@link org.apache.drill.exec.store.ischema.InfoSchemaRecordGenerator.Tables}
* to get all table objects along with their types for every requested schema. This method
* should be fast because it impacts the performance of the SHOW TABLES query.
*
* @return collection of table names and types
*/
public Collection<Map.Entry<String, TableType>> getTableNamesAndTypes() {
return getTablesByNames(getTableNames()).stream()
.map(nameAndTable -> Pair.of(nameAndTable.getKey(), nameAndTable.getValue().getJdbcTableType()))
.collect(Collectors.toList());
}
/**
* Indicates whether table names in the schema are case sensitive. By default they are.
* If a schema implementation claims its table names are case insensitive,
* it is responsible for performing case-insensitive lookups by table name.
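*
* <p>A sketch of a case-insensitive schema ({@code tables} is a hypothetical
* name-keyed map maintained by the implementation):</p>
* <pre>{@code
* public boolean areTableNamesCaseSensitive() {
*   return false;
* }
*
* public Table getTable(String name) {
*   return tables.get(name.toLowerCase()); // normalize before lookup
* }
* }</pre>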
*
* @return true if table names are case sensitive
*/
public boolean areTableNamesCaseSensitive() {
return true;
}
}