blob: 2bf6bc5b228d5d81576c692c8164a96ac67e82ee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store;
import static org.apache.drill.shaded.guava.com.google.common.base.Preconditions.checkNotNull;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.config.LogicalPlanPersistence;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.common.scanner.ClassPathScanner;
import org.apache.drill.common.scanner.persistence.AnnotatedClassDescriptor;
import org.apache.drill.common.scanner.persistence.ScanResult;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.StoreException;
import org.apache.drill.exec.planner.logical.StoragePlugins;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.sys.CaseInsensitivePersistentStore;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
import org.apache.drill.shaded.guava.com.google.common.cache.RemovalListener;
import org.apache.drill.shaded.guava.com.google.common.io.Resources;
public class StoragePluginRegistryImpl implements StoragePluginRegistry {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistryImpl.class);
private final StoragePluginMap enabledPlugins;
private final DrillSchemaFactory schemaFactory;
private final DrillbitContext context;
private final LogicalPlanPersistence lpPersistence;
private final ScanResult classpathScan;
private final PersistentStore<StoragePluginConfig> pluginSystemTable;
private final LoadingCache<StoragePluginConfig, StoragePlugin> ephemeralPlugins;
private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = Collections.emptyMap();
private Map<String, StoragePlugin> systemPlugins = Collections.emptyMap();
public StoragePluginRegistryImpl(DrillbitContext context) {
this.enabledPlugins = new StoragePluginMap();
this.schemaFactory = new DrillSchemaFactory(null);
this.context = checkNotNull(context);
this.lpPersistence = checkNotNull(context.getLpPersistence());
this.classpathScan = checkNotNull(context.getClasspathScan());
this.pluginSystemTable = initPluginsSystemTable(context, lpPersistence);
this.ephemeralPlugins = CacheBuilder.newBuilder()
.expireAfterAccess(24, TimeUnit.HOURS)
.maximumSize(250)
.removalListener(
(RemovalListener<StoragePluginConfig, StoragePlugin>) notification -> closePlugin(notification.getValue()))
.build(new CacheLoader<StoragePluginConfig, StoragePlugin>() {
@Override
public StoragePlugin load(StoragePluginConfig config) throws Exception {
return create(null, config);
}
});
}
@Override
public void init() {
availablePlugins = findAvailablePlugins(classpathScan);
systemPlugins = initSystemPlugins(classpathScan, context);
try {
StoragePlugins bootstrapPlugins = pluginSystemTable.getAll().hasNext() ? null : loadBootstrapPlugins(lpPersistence);
StoragePluginsHandler storagePluginsHandler = new StoragePluginsHandlerService(context);
storagePluginsHandler.loadPlugins(pluginSystemTable, bootstrapPlugins);
defineEnabledPlugins();
} catch (IOException e) {
logger.error("Failure setting up storage enabledPlugins. Drillbit exiting.", e);
throw new IllegalStateException(e);
}
}
@Override
public void deletePlugin(String name) {
StoragePlugin plugin = enabledPlugins.remove(name);
closePlugin(plugin);
pluginSystemTable.delete(name);
}
@Override
public StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boolean persist) throws ExecutionSetupException {
for (;;) {
StoragePlugin oldPlugin = enabledPlugins.get(name);
StoragePlugin newPlugin = create(name, config);
boolean done = false;
try {
if (oldPlugin != null) {
done = newPlugin == null
? enabledPlugins.remove(name, oldPlugin)
: enabledPlugins.replace(name, oldPlugin, newPlugin);
} else if (newPlugin != null) {
done = (null == enabledPlugins.putIfAbsent(name, newPlugin));
} else {
done = true;
}
} finally {
StoragePlugin pluginToClose = done ? oldPlugin : newPlugin;
closePlugin(pluginToClose);
}
if (done) {
if (persist) {
pluginSystemTable.put(name, config);
}
return newPlugin;
}
}
}
@Override
public StoragePlugin getPlugin(String name) throws ExecutionSetupException {
StoragePlugin plugin = enabledPlugins.get(name);
if (systemPlugins.get(name) != null) {
return plugin;
}
// since we lazily manage the list of plugins per server, we need to update this once we know that it is time.
StoragePluginConfig config = pluginSystemTable.get(name);
if (config == null) {
if (plugin != null) {
enabledPlugins.remove(name);
}
return null;
} else {
if (plugin == null
|| !plugin.getConfig().equals(config)
|| plugin.getConfig().isEnabled() != config.isEnabled()) {
plugin = createOrUpdate(name, config, false);
}
return plugin;
}
}
@Override
public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
if (config instanceof NamedStoragePluginConfig) {
return getPlugin(((NamedStoragePluginConfig) config).getName());
} else {
// try to lookup plugin by configuration
StoragePlugin plugin = enabledPlugins.get(config);
if (plugin != null) {
return plugin;
}
// no named plugin matches the desired configuration, let's create an
// ephemeral storage plugin (or get one from the cache)
try {
return ephemeralPlugins.get(config);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof ExecutionSetupException) {
throw (ExecutionSetupException) cause;
} else {
// this shouldn't happen. here for completeness.
throw new ExecutionSetupException("Failure while trying to create ephemeral plugin.", cause);
}
}
}
}
@Override
public void addEnabledPlugin(String name, StoragePlugin plugin) {
enabledPlugins.put(name, plugin);
}
@Override
public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException {
StoragePlugin storagePlugin = getPlugin(storageConfig);
return storagePlugin.getFormatPlugin(formatConfig);
}
@Override
public PersistentStore<StoragePluginConfig> getStore() {
return pluginSystemTable;
}
@Override
public SchemaFactory getSchemaFactory() {
return schemaFactory;
}
@Override
public Iterator<Entry<String, StoragePlugin>> iterator() {
return enabledPlugins.iterator();
}
@Override
public synchronized void close() throws Exception {
ephemeralPlugins.invalidateAll();
enabledPlugins.close();
pluginSystemTable.close();
}
/**
* Add a plugin and configuration. Assumes neither exists. Primarily for testing.
*
* @param config plugin config
* @param plugin plugin implementation
*/
@VisibleForTesting
public void addPluginToPersistentStoreIfAbsent(String name, StoragePluginConfig config, StoragePlugin plugin) {
addEnabledPlugin(name, plugin);
pluginSystemTable.putIfAbsent(name, config);
}
/**
* <ol>
* <li>Initializes persistent store for storage plugins.</li>
* <li>Since storage plugins names are case-insensitive in Drill, to ensure backward compatibility,
* re-writes those not stored in lower case with lower case names, for duplicates issues warning. </li>
* <li>Wraps plugin system table into case insensitive wrapper.</li>
* </ol>
*
* @param context drillbit context
* @param lpPersistence deserialization mapper provider
* @return persistent store for storage plugins
*/
private PersistentStore<StoragePluginConfig> initPluginsSystemTable(DrillbitContext context, LogicalPlanPersistence lpPersistence) {
try {
PersistentStore<StoragePluginConfig> pluginSystemTable = context
.getStoreProvider()
.getOrCreateStore(PersistentStoreConfig
.newJacksonBuilder(lpPersistence.getMapper(), StoragePluginConfig.class)
.name(PSTORE_NAME)
.build());
Iterator<Entry<String, StoragePluginConfig>> storedPlugins = pluginSystemTable.getAll();
while (storedPlugins.hasNext()) {
Entry<String, StoragePluginConfig> entry = storedPlugins.next();
String pluginName = entry.getKey();
if (!pluginName.equals(pluginName.toLowerCase())) {
logger.debug("Replacing plugin name {} with its lower case equivalent.", pluginName);
pluginSystemTable.delete(pluginName);
if (!pluginSystemTable.putIfAbsent(pluginName.toLowerCase(), entry.getValue())) {
logger.warn("Duplicated storage plugin name [{}] is found. Duplicate is deleted from persistent storage.", pluginName);
}
}
}
return new CaseInsensitivePersistentStore<>(pluginSystemTable);
} catch (StoreException e) {
logger.error("Failure while loading storage plugin registry.", e);
throw new DrillRuntimeException("Failure while reading and loading storage plugin configuration.", e);
}
}
/**
* Read bootstrap storage plugins {@link ExecConstants#BOOTSTRAP_STORAGE_PLUGINS_FILE}
* and format plugins {@link ExecConstants#BOOTSTRAP_FORMAT_PLUGINS_FILE} files for the first fresh
* instantiating of Drill
*
* @param lpPersistence deserialization mapper provider
* @return bootstrap storage plugins
* @throws IOException if a read error occurs
*/
private StoragePlugins loadBootstrapPlugins(LogicalPlanPersistence lpPersistence) throws IOException {
// bootstrap load the config since no plugins are stored.
logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration.");
Set<URL> storageUrls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
Set<URL> formatUrls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_FORMAT_PLUGINS_FILE, false);
if (storageUrls != null && !storageUrls.isEmpty()) {
logger.info("Loading the storage plugin configs from URLs {}.", storageUrls);
StoragePlugins bootstrapPlugins = new StoragePlugins(new HashMap<>());
Map<String, URL> pluginURLMap = new HashMap<>();
for (URL url : storageUrls) {
loadStoragePlugins(url, bootstrapPlugins, pluginURLMap, lpPersistence);
}
if (formatUrls != null && !formatUrls.isEmpty()) {
logger.info("Loading the format plugin configs from URLs {}.", formatUrls);
for (URL url : formatUrls) {
loadFormatPlugins(url, bootstrapPlugins, pluginURLMap, lpPersistence);
}
}
return bootstrapPlugins;
} else {
throw new IOException("Failure finding " + ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE);
}
}
/**
* Loads storage plugins from the given URL
*
* @param url URL to the storage plugins bootstrap file
* @param bootstrapPlugins a collection where the plugins should be loaded to
* @param pluginURLMap a map to store correspondence between storage plugins and bootstrap files in which they are defined. Used for logging
* @param lpPersistence need to get an object mapper for the bootstrap files
* @throws IOException if failed to retrieve a plugin from a bootstrap file
*/
private void loadStoragePlugins(URL url, StoragePlugins bootstrapPlugins, Map<String, URL> pluginURLMap, LogicalPlanPersistence lpPersistence) throws IOException {
StoragePlugins plugins = getPluginsFromResource(url, lpPersistence);
plugins.forEach(plugin -> {
StoragePluginConfig oldPluginConfig = bootstrapPlugins.putIfAbsent(plugin.getKey(), plugin.getValue());
if (oldPluginConfig != null) {
logger.warn("Duplicate plugin instance '[{}]' defined in [{}, {}], ignoring the later one.",
plugin.getKey(), pluginURLMap.get(plugin.getKey()), url);
} else {
pluginURLMap.put(plugin.getKey(), url);
}
});
}
/**
* Loads format plugins from the given URL and adds the formats to the specified storage plugins
*
* @param url URL to the format plugins bootstrap file
* @param bootstrapPlugins a collection with loaded storage plugins. New formats will be added to them
* @param pluginURLMap a map to store correspondence between storage plugins and bootstrap files in which they are defined. Used for logging
* @param lpPersistence need to get an object mapper for the bootstrap files
* @throws IOException if failed to retrieve a plugin from a bootstrap file
*/
private void loadFormatPlugins(URL url, StoragePlugins bootstrapPlugins, Map<String, URL> pluginURLMap, LogicalPlanPersistence lpPersistence) throws IOException {
StoragePlugins plugins = getPluginsFromResource(url, lpPersistence);
plugins.forEach(formatPlugin -> {
String targetStoragePluginName = formatPlugin.getKey();
StoragePluginConfig storagePlugin = bootstrapPlugins.getConfig(targetStoragePluginName);
StoragePluginConfig formatPluginValue = formatPlugin.getValue();
if (storagePlugin == null) {
logger.warn("No storage plugins with the given name are registered: '[{}]'", targetStoragePluginName);
} else if (storagePlugin instanceof FileSystemConfig && formatPluginValue instanceof FileSystemConfig) {
FileSystemConfig targetPlugin = (FileSystemConfig) storagePlugin;
((FileSystemConfig) formatPluginValue).getFormats().forEach((formatName, formatValue) -> {
FormatPluginConfig oldPluginConfig = targetPlugin.getFormats().putIfAbsent(formatName, formatValue);
if (oldPluginConfig != null) {
logger.warn("Duplicate format instance '[{}]' defined in [{}, {}], ignoring the later one.",
formatName, pluginURLMap.get(targetStoragePluginName), url);
}
});
} else {
logger.warn("Formats are only supported by File System plugin type: '[{}]'", targetStoragePluginName);
}
});
}
private StoragePlugins getPluginsFromResource(URL resource, LogicalPlanPersistence lpPersistence) throws IOException {
String pluginsData = Resources.toString(resource, Charsets.UTF_8);
return lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class);
}
/**
* Dynamically loads system plugins annotated with {@link SystemPlugin}.
* Will skip plugin initialization if no matching constructor, incorrect class implementation, name absence are detected.
*
* @param classpathScan classpath scan result
* @param context drillbit context
* @return map with system plugins stored by name
*/
private Map<String, StoragePlugin> initSystemPlugins(ScanResult classpathScan, DrillbitContext context) {
Map<String, StoragePlugin> plugins = CaseInsensitiveMap.newHashMap();
List<AnnotatedClassDescriptor> annotatedClasses = classpathScan.getAnnotatedClasses(SystemPlugin.class.getName());
logger.trace("Found {} annotated classes with SystemPlugin annotation: {}.", annotatedClasses.size(), annotatedClasses);
for (AnnotatedClassDescriptor annotatedClass : annotatedClasses) {
try {
Class<?> aClass = Class.forName(annotatedClass.getClassName());
boolean isPluginInitialized = false;
for (Constructor<?> constructor : aClass.getConstructors()) {
Class<?>[] parameterTypes = constructor.getParameterTypes();
if (parameterTypes.length != 1 || parameterTypes[0] != DrillbitContext.class) {
logger.trace("Not matching constructor for {}. Expecting constructor with one parameter for DrillbitContext class.",
annotatedClass.getClassName());
continue;
}
Object instance = constructor.newInstance(context);
if (!(instance instanceof StoragePlugin)) {
logger.debug("Created instance of {} does not implement StoragePlugin interface.", annotatedClass.getClassName());
continue;
}
StoragePlugin storagePlugin = (StoragePlugin) instance;
String name = storagePlugin.getName();
if (name == null) {
logger.debug("Storage plugin name {} is not defined. Skipping plugin initialization.", annotatedClass.getClassName());
continue;
}
storagePlugin.getConfig().setEnabled(true);
plugins.put(name, storagePlugin);
isPluginInitialized = true;
}
if (!isPluginInitialized) {
logger.debug("Skipping plugin registration, did not find matching constructor or initialized object of wrong type.");
}
} catch (ReflectiveOperationException e) {
logger.warn("Error during system plugin {} initialization. Plugin initialization will be skipped.", annotatedClass.getClassName(), e);
}
}
logger.trace("The following system plugins have been initialized: {}.", plugins.keySet());
return plugins;
}
/**
* Get a list of all available storage plugin class constructors.
* @param classpathScan A classpath scan to use.
* @return A Map of StoragePluginConfig => StoragePlugin.<init>() constructors.
*/
@SuppressWarnings("unchecked")
private Map<Object, Constructor<? extends StoragePlugin>> findAvailablePlugins(final ScanResult classpathScan) {
Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<>();
final Collection<Class<? extends StoragePlugin>> pluginClasses =
classpathScan.getImplementations(StoragePlugin.class);
final String lineBrokenList =
pluginClasses.size() == 0
? "" : "\n\t- " + Joiner.on("\n\t- ").join(pluginClasses);
logger.debug("Found {} storage plugin configuration classes: {}.",
pluginClasses.size(), lineBrokenList);
for (Class<? extends StoragePlugin> plugin : pluginClasses) {
int i = 0;
for (Constructor<?> c : plugin.getConstructors()) {
Class<?>[] params = c.getParameterTypes();
if (params.length != 3
|| params[1] != DrillbitContext.class
|| !StoragePluginConfig.class.isAssignableFrom(params[0])
|| params[2] != String.class) {
logger.debug("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a "
+ "[constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
continue;
}
availablePlugins.put(params[0], (Constructor<? extends StoragePlugin>) c);
i++;
}
if (i == 0) {
logger.debug("Skipping registration of StoragePlugin {} as it doesn't have a constructor with the parameters "
+ "of (StoragePluginConfig, Config)", plugin.getCanonicalName());
}
}
return availablePlugins;
}
/**
* It initializes {@link #enabledPlugins} with currently enabled plugins
*/
private void defineEnabledPlugins() {
Map<String, StoragePlugin> activePlugins = new HashMap<>();
Iterator<Entry<String, StoragePluginConfig>> allPlugins = pluginSystemTable.getAll();
while (allPlugins.hasNext()) {
Entry<String, StoragePluginConfig> plugin = allPlugins.next();
String name = plugin.getKey();
StoragePluginConfig config = plugin.getValue();
if (config.isEnabled()) {
try {
StoragePlugin storagePlugin = create(name, config);
activePlugins.put(name, storagePlugin);
} catch (ExecutionSetupException e) {
logger.error("Failure while setting up StoragePlugin with name: '{}', disabling.", name, e);
config.setEnabled(false);
pluginSystemTable.put(name, config);
}
}
}
activePlugins.putAll(systemPlugins);
enabledPlugins.putAll(activePlugins);
}
/**
* Creates plugin instance with the given {@code name} and configuration {@code pluginConfig}.
* The plugin need to be present in a list of available plugins and be enabled in the configuration
*
* @param name name of the plugin
* @param pluginConfig plugin configuration
* @return plugin client or {@code null} if plugin is disabled
*/
private StoragePlugin create(String name, StoragePluginConfig pluginConfig) throws ExecutionSetupException {
if (!pluginConfig.isEnabled()) {
return null;
}
StoragePlugin plugin;
Constructor<? extends StoragePlugin> constructor = availablePlugins.get(pluginConfig.getClass());
if (constructor == null) {
throw new ExecutionSetupException(String.format("Failure finding StoragePlugin constructor for config %s",
pluginConfig));
}
try {
plugin = constructor.newInstance(pluginConfig, context, name);
plugin.start();
return plugin;
} catch (ReflectiveOperationException | IOException e) {
Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException) e).getTargetException() : e;
if (t instanceof ExecutionSetupException) {
throw ((ExecutionSetupException) t);
}
throw new ExecutionSetupException(String.format("Failure setting up new storage plugin configuration for config %s", pluginConfig), t);
}
}
private void closePlugin(StoragePlugin plugin) {
if (plugin == null) {
return;
}
try {
plugin.close();
} catch (Exception e) {
logger.warn("Exception while shutting down storage plugin.");
}
}
public class DrillSchemaFactory extends AbstractSchemaFactory {
public DrillSchemaFactory(String name) {
super(name);
}
@Override
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
Stopwatch watch = Stopwatch.createStarted();
try {
Set<String> currentPluginNames = new HashSet<>(enabledPlugins.getNames());
// iterate through the plugin instances in the persistent store adding
// any new ones and refreshing those whose configuration has changed
Iterator<Entry<String, StoragePluginConfig>> allPlugins = pluginSystemTable.getAll();
while (allPlugins.hasNext()) {
Entry<String, StoragePluginConfig> plugin = allPlugins.next();
if (plugin.getValue().isEnabled()) {
getPlugin(plugin.getKey());
currentPluginNames.remove(plugin.getKey());
}
}
// remove those which are no longer in the registry
for (String pluginName : currentPluginNames) {
if (systemPlugins.get(pluginName) != null) {
continue;
}
enabledPlugins.remove(pluginName);
}
// finally register schemas with the refreshed plugins
for (StoragePlugin plugin : enabledPlugins.plugins()) {
try {
plugin.registerSchemas(schemaConfig, parent);
} catch (Exception e) {
logger.warn("Error during `{}` schema initialization: {}", plugin.getName(), e.getMessage(), e.getCause());
}
}
} catch (ExecutionSetupException e) {
throw new DrillRuntimeException("Failure while updating storage plugins", e);
}
// Add second level schema as top level schema with name qualified with parent schema name
// Ex: "dfs" schema has "default" and "tmp" as sub schemas. Add following extra schemas "dfs.default" and
// "dfs.tmp" under root schema.
//
// Before change, schema tree looks like below:
// "root"
// -- "dfs"
// -- "default"
// -- "tmp"
// -- "hive"
// -- "default"
// -- "hivedb1"
//
// After the change, the schema tree looks like below:
// "root"
// -- "dfs"
// -- "default"
// -- "tmp"
// -- "dfs.default"
// -- "dfs.tmp"
// -- "hive"
// -- "default"
// -- "hivedb1"
// -- "hive.default"
// -- "hive.hivedb1"
List<SchemaPlus> secondLevelSchemas = new ArrayList<>();
for (String firstLevelSchemaName : parent.getSubSchemaNames()) {
SchemaPlus firstLevelSchema = parent.getSubSchema(firstLevelSchemaName);
for (String secondLevelSchemaName : firstLevelSchema.getSubSchemaNames()) {
secondLevelSchemas.add(firstLevelSchema.getSubSchema(secondLevelSchemaName));
}
}
for (SchemaPlus schema : secondLevelSchemas) {
AbstractSchema drillSchema;
try {
drillSchema = schema.unwrap(AbstractSchema.class);
} catch (ClassCastException e) {
throw new RuntimeException(String.format("Schema '%s' is not expected under root schema", schema.getName()));
}
SubSchemaWrapper wrapper = new SubSchemaWrapper(drillSchema);
parent.add(wrapper.getName(), wrapper);
}
logger.debug("Took {} ms to register schemas.", watch.elapsed(TimeUnit.MILLISECONDS));
}
}
}