blob: 93e6daa0649d86d4b61018d734a846e9ebfc59e0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.jdbc;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.UserExceptionUtils;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.alias.AliasRegistryProvider;
import org.apache.drill.exec.planner.sql.SchemaUtilities;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePlugin;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.drill.exec.store.SubSchemaWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
* Loads schemas from storage plugins later when {@link #getSubSchema(String, boolean)}
* is called.
*/
public class DynamicRootSchema extends DynamicSchema {
private static final Logger logger = LoggerFactory.getLogger(DynamicRootSchema.class);
private static final String ROOT_SCHEMA_NAME = "";
private final SchemaConfig schemaConfig;
private final StoragePluginRegistry storages;
private final AliasRegistryProvider aliasRegistryProvider;
/** Creates a root schema. */
DynamicRootSchema(StoragePluginRegistry storages, SchemaConfig schemaConfig, AliasRegistryProvider aliasRegistryProvider) {
super(null, new RootSchema(storages), ROOT_SCHEMA_NAME);
this.schemaConfig = schemaConfig;
this.storages = storages;
this.aliasRegistryProvider = aliasRegistryProvider;
}
@Override
protected CalciteSchema getImplicitSubSchema(String schemaName,
boolean caseSensitive) {
String actualSchemaName = aliasRegistryProvider.getStorageAliasesRegistry()
.getUserAliases(schemaConfig.getUserName()).get(SchemaPath.getSimplePath(schemaName).toExpr());
return getSchema(actualSchemaName != null
? SchemaPath.parseFromString(actualSchemaName).getRootSegmentPath()
: schemaName,
caseSensitive);
}
private CalciteSchema getSchema(String schemaName, boolean caseSensitive) {
// Drill registers schemas in lower case, see AbstractSchema constructor
schemaName = schemaName == null ? null : schemaName.toLowerCase();
CalciteSchema retSchema = subSchemaMap.map().get(schemaName);
if (retSchema != null) {
return retSchema;
}
loadSchemaFactory(schemaName, caseSensitive);
retSchema = subSchemaMap.map().get(schemaName);
return retSchema;
}
private SchemaPath resolveTableAlias(String alias) {
return Optional.ofNullable(aliasRegistryProvider.getTableAliasesRegistry()
.getUserAliases(schemaConfig.getUserName()).get(alias))
.map(SchemaPath::parseFromString)
.orElse(null);
}
private void registerSchemasWithRetry(StoragePlugin plugin) throws Exception {
long maxAttempts = 1 + schemaConfig
.getOption(ExecConstants.STORAGE_PLUGIN_RETRY_ATTEMPTS)
.num_val;
long retryDelayMs = schemaConfig
.getOption(ExecConstants.STORAGE_PLUGIN_RETRY_DELAY)
.num_val;
int attempt=0;
Exception lastAttemptEx = null;
while (attempt++ < maxAttempts) {
try {
plugin.registerSchemas(schemaConfig, plus());
return;
} catch (Exception ex) {
lastAttemptEx = ex;
logger.warn(
"Attempt {} of {} to register schemas for plugin {} failed.",
attempt, maxAttempts, plugin,
ex
);
if (attempt < maxAttempts) {
logger.info(
"Next attempt to register schemas for plugin {} will be made in {}ms.",
plugin,
retryDelayMs
);
try {
Thread.sleep(retryDelayMs);
} catch (InterruptedException intEx) {
logger.warn(
"Interrupted while waiting to make another attempt to register " +
"schemas for plugin {}.",
plugin,
intEx
);
}
}
}
}
throw lastAttemptEx;
}
/**
* Loads schema factory(storage plugin) for specified {@code schemaName}
* @param schemaName the name of the schema
* @param caseSensitive whether matching for the schema name is case sensitive
*/
private void loadSchemaFactory(String schemaName, boolean caseSensitive) {
StoragePlugin plugin = null;
try {
SchemaPlus schemaPlus = this.plus();
plugin = storages.getPlugin(schemaName);
if (plugin != null) {
registerSchemasWithRetry(plugin);
return;
}
// Could not find the plugin of schemaName. The schemaName could be `dfs.tmp`, a 2nd level schema under 'dfs'
List<String> paths = SchemaUtilities.getSchemaPathAsList(schemaName);
if (paths.size() == 2) {
plugin = storages.getPlugin(paths.get(0));
if (plugin == null) {
return;
}
// Looking for the SchemaPlus for the top level (e.g. 'dfs') of schemaName (e.g. 'dfs.tmp')
SchemaPlus firstLevelSchema = schemaPlus.getSubSchema(paths.get(0));
if (firstLevelSchema == null) {
// register schema for this storage plugin to 'this'.
registerSchemasWithRetry(plugin);
firstLevelSchema = schemaPlus.getSubSchema(paths.get(0));
}
// Load second level schemas for this storage plugin
List<SchemaPlus> secondLevelSchemas = new ArrayList<>();
for (String secondLevelSchemaName : firstLevelSchema.getSubSchemaNames()) {
secondLevelSchemas.add(firstLevelSchema.getSubSchema(secondLevelSchemaName));
}
for (SchemaPlus schema : secondLevelSchemas) {
org.apache.drill.exec.store.AbstractSchema drillSchema;
try {
drillSchema = schema.unwrap(AbstractSchema.class);
} catch (ClassCastException e) {
throw new RuntimeException(String.format("Schema '%s' is not expected under root schema", schema.getName()));
}
SubSchemaWrapper wrapper = new SubSchemaWrapper(drillSchema);
schemaPlus.add(wrapper.getName(), wrapper);
}
}
} catch (Exception ex) {
logger.error("Failed to load schema for {}", schemaName, ex);
// We can't proceed further without a schema, throw a runtime exception.
// The UserException thrown from here must have an error type of PLUGIN
// because that will be used to decide whether planning should be retried
// by {@link DrillSqlWorker}.
//
// If the exception we've caught is already a UserException we have to
// jump through a hoop to ensure that the one that we throw from
// here has an error type of PLUGIN instead of inheriting that of the
// caught UserException. See the logic in the UserException builder
// methods.
UserException.Builder exceptBuilder = ex instanceof UserException
? ((UserException) ex).rebuild().errorType(DrillPBError.ErrorType.PLUGIN)
: UserException
.pluginError(ex)
.message("Failed to load schema for schema %s", schemaName)
.addContext("%s: %s", ex.getClass().getName(), ex.getMessage())
.addContext(UserExceptionUtils.getUserHint(ex)); //Provide hint if it exists
if (schemaConfig.getOption(ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE).bool_val) {
String msg = String.format(
"The plugin %s will now be disabled (see SYSTEM option %s)",
plugin.getName(),
ExecConstants.STORAGE_PLUGIN_AUTO_DISABLE
);
exceptBuilder.addContext(msg);
logger.warn(msg);
try {
storages.setEnabled(plugin.getName(), false);
} catch (PluginException disableEx) {
logger.error("Could not disable {}", plugin.getName(), disableEx);
}
}
throw exceptBuilder.build(logger);
}
}
@Override
protected TableEntry getImplicitTable(String tableName, boolean caseSensitive) {
return Optional.ofNullable(getTemporaryTable(tableName, caseSensitive))
.<TableEntry>map(table -> new TableEntryImpl(this, tableName, table.getTable(), table.sqls))
.orElse(super.getImplicitTable(tableName, true));
}
private TableEntry getTemporaryTable(String tableName, boolean caseSensitive) {
CalciteSchema currentSchema = this;
PathSegment.NameSegment pathSegment =
Optional.ofNullable(resolveTableAlias(SchemaPath.getCompoundPath(tableName).toExpr()))
.map(SchemaPath::getRootSegment)
.orElse(null);
if (pathSegment == null) {
return null;
}
while (!pathSegment.isLastPath()) {
currentSchema = currentSchema.getImplicitSubSchema(pathSegment.getPath(), caseSensitive);
pathSegment = pathSegment.getChild().getNameSegment();
}
if (currentSchema != null) {
return currentSchema.getTable(pathSegment.getNameSegment().getPath(), caseSensitive);
}
return null;
}
public static class RootSchema extends AbstractSchema {
private final StoragePluginRegistry storages;
public RootSchema(StoragePluginRegistry storages) {
super(Collections.emptyList(), ROOT_SCHEMA_NAME);
this.storages = storages;
}
@Override
public Set<String> getSubSchemaNames() {
return storages.availablePlugins();
}
@Override
public String getTypeName() {
return ROOT_SCHEMA_NAME;
}
@Override public Expression getExpression(SchemaPlus parentSchema,
String name) {
return Expressions.call(
DataContext.ROOT,
BuiltInMethod.DATA_CONTEXT_GET_ROOT_SCHEMA.method);
}
@Override
public boolean showInInformationSchema() {
return false;
}
}
}