| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.store.dfs; |
| |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| |
| import org.apache.calcite.schema.SchemaPlus; |
| import org.apache.drill.common.JSONOptions; |
| import org.apache.drill.common.config.LogicalPlanPersistence; |
| import org.apache.drill.common.exceptions.ExecutionSetupException; |
| import org.apache.drill.common.expression.SchemaPath; |
| import org.apache.drill.common.logical.FormatPluginConfig; |
| import org.apache.drill.common.logical.StoragePluginConfig; |
| import org.apache.drill.exec.metastore.MetadataProviderManager; |
| import org.apache.drill.exec.ops.OptimizerRulesContext; |
| import org.apache.drill.exec.physical.base.AbstractGroupScan; |
| import org.apache.drill.exec.server.DrillbitContext; |
| import org.apache.drill.exec.server.options.SessionOptionManager; |
| import org.apache.drill.exec.store.AbstractStoragePlugin; |
| import org.apache.drill.exec.store.ClassPathFileSystem; |
| import org.apache.drill.exec.store.LocalSyncableFileSystem; |
| import org.apache.drill.exec.store.SchemaConfig; |
| import org.apache.drill.exec.store.StoragePluginOptimizerRule; |
| import org.apache.drill.shaded.guava.com.google.common.base.Strings; |
| import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; |
| import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet.Builder; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
/**
 * A storage engine associated with a Hadoop FileSystem implementation. Examples include HDFS, MapRFS, QuantcastFileSystem,
 * LocalFileSystem, as well as the Apache Drill-specific CachedFileSystem, ClassPathFileSystem and LocalSyncableFileSystem.
 * Tables are file names, directories and path patterns. This storage engine delegates to FSFormatEngines but shares
 * references to the FileSystem configuration and path management.
 */
| public class FileSystemPlugin extends AbstractStoragePlugin { |
| |
| private static final Logger logger = LoggerFactory.getLogger(FileSystemPlugin.class); |
| |
| /** |
| * org.apache.hadoop.io.compress library supports such codecs as Gzip and Bzip2 out of box. |
| * This list stores only codecs that are missing in Hadoop library. |
| */ |
| private static final List<String> ADDITIONAL_CODECS = Collections.singletonList( |
| ZipCodec.class.getCanonicalName()); |
| |
| private final FileSystemSchemaFactory schemaFactory; |
| private final FormatCreator formatCreator; |
| private final Map<FormatPluginConfig, FormatPlugin> formatPluginsByConfig; |
| private final FileSystemConfig config; |
| private final Configuration fsConf; |
| private final LogicalPlanPersistence lpPersistance; |
| |
| public FileSystemPlugin(FileSystemConfig config, DrillbitContext context, String name) throws ExecutionSetupException { |
| super(context, name); |
| this.config = config; |
| this.lpPersistance = context.getLpPersistence(); |
| |
| try { |
| fsConf = new Configuration(); |
| Optional.ofNullable(config.getConfig()) |
| .ifPresent(c -> c.forEach(fsConf::set)); |
| |
| fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.getConnection()); |
| fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName()); |
| fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName()); |
| |
| addCodecs(fsConf); |
| |
| if (isS3Connection(fsConf)) { |
| handleS3Credentials(fsConf); |
| } |
| |
| formatCreator = newFormatCreator(config, context, fsConf); |
| List<FormatMatcher> matchers = new ArrayList<>(); |
| formatPluginsByConfig = new HashMap<>(); |
| for (FormatPlugin p : formatCreator.getConfiguredFormatPlugins()) { |
| matchers.add(p.getMatcher()); |
| formatPluginsByConfig.put(p.getConfig(), p); |
| } |
| |
| boolean noWorkspace = config.getWorkspaces() == null || config.getWorkspaces().isEmpty(); |
| List<WorkspaceSchemaFactory> factories = new ArrayList<>(); |
| if (!noWorkspace) { |
| for (Map.Entry<String, WorkspaceConfig> space : config.getWorkspaces().entrySet()) { |
| factories.add(new WorkspaceSchemaFactory(this, space.getKey(), name, space.getValue(), matchers, context.getLpPersistence(), context.getClasspathScan())); |
| } |
| } |
| |
| // if the "default" workspace is not given add one. |
| if (noWorkspace || !config.getWorkspaces().containsKey(DEFAULT_WS_NAME)) { |
| factories.add(new WorkspaceSchemaFactory(this, DEFAULT_WS_NAME, name, WorkspaceConfig.DEFAULT, matchers, context.getLpPersistence(), context.getClasspathScan())); |
| } |
| |
| this.schemaFactory = new FileSystemSchemaFactory(name, factories); |
| } catch (IOException e) { |
| throw new ExecutionSetupException("Failure setting up file system plugin.", e); |
| } |
| } |
| |
| /** |
| * Merges codecs from configuration with the {@link #ADDITIONAL_CODECS} |
| * and updates configuration property. |
| * Drill built-in codecs are added at the beginning of the codecs string |
| * so config codecs can override Drill ones. |
| * |
| * @param conf Hadoop configuration |
| */ |
| private void addCodecs(Configuration conf) { |
| String confCodecs = conf.get(CommonConfigurationKeys.IO_COMPRESSION_CODECS_KEY); |
| String builtInCodecs = String.join(",", ADDITIONAL_CODECS); |
| String newCodecs = Strings.isNullOrEmpty(confCodecs) |
| ? builtInCodecs |
| : builtInCodecs + ", " + confCodecs; |
| logger.trace("Codecs: {}", newCodecs); |
| conf.set(CommonConfigurationKeys.IO_COMPRESSION_CODECS_KEY, newCodecs); |
| } |
| |
| private boolean isS3Connection(Configuration conf) { |
| URI uri = FileSystem.getDefaultUri(conf); |
| return uri.getScheme().equals("s3a"); |
| } |
| |
| /** |
| * Retrieve secret and access keys from configured (with |
| * {@link org.apache.hadoop.security.alias.CredentialProviderFactory#CREDENTIAL_PROVIDER_PATH} property) |
| * credential providers and set it into {@code conf}. If provider path is not configured or credential |
| * is absent in providers, it will conditionally fallback to configuration setting. The fallback will occur unless |
| * {@link org.apache.hadoop.security.alias.CredentialProvider#CLEAR_TEXT_FALLBACK} is set to {@code false}. |
| * |
| * @param conf {@code Configuration} which will be updated with credentials from provider |
| * @throws IOException thrown if a credential cannot be retrieved from provider |
| */ |
| private void handleS3Credentials(Configuration conf) throws IOException { |
| String[] credentialKeys = {"fs.s3a.secret.key", "fs.s3a.access.key"}; |
| for (String key : credentialKeys) { |
| char[] credentialChars = conf.getPassword(key); |
| if (credentialChars == null) { |
| logger.warn("Property '{}' is absent.", key); |
| } else { |
| conf.set(key, String.valueOf(credentialChars)); |
| } |
| } |
| } |
| |
| /** |
| * Creates a new FormatCreator instance. |
| * |
| * To be used by subclasses to return custom formats if required. |
| * Note that this method is called by the constructor, which fields may not be initialized yet. |
| * |
| * @param config the plugin configuration |
| * @param context the drillbit context |
| * @return a new FormatCreator instance |
| */ |
| protected FormatCreator newFormatCreator(FileSystemConfig config, DrillbitContext context, Configuration fsConf) { |
| return new FormatCreator(context, fsConf, config, context.getClasspathScan()); |
| } |
| |
| @Override |
| public boolean supportsRead() { |
| return true; |
| } |
| |
| @Override |
| public StoragePluginConfig getConfig() { |
| return config; |
| } |
| |
| @Override |
| public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException { |
| return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, options, null); |
| } |
| |
| @Override |
| public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options, MetadataProviderManager metadataProviderManager) throws IOException { |
| return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, options, metadataProviderManager); |
| } |
| |
| @Override |
| public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException { |
| return getPhysicalScan(userName, selection, columns, null, null); |
| } |
| |
| @Override |
| public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options, MetadataProviderManager metadataProviderManager) throws IOException { |
| FormatSelection formatSelection = selection.getWith(lpPersistance, FormatSelection.class); |
| FormatPlugin plugin = getFormatPlugin(formatSelection.getFormat()); |
| return plugin.getGroupScan(userName, formatSelection.getSelection(), columns, options, metadataProviderManager); |
| } |
| |
| @Override |
| public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException { |
| schemaFactory.registerSchemas(schemaConfig, parent); |
| } |
| |
| public FormatPlugin getFormatPlugin(String name) { |
| return formatCreator.getFormatPluginByName(name); |
| } |
| |
| /** |
| * If format plugin configuration is for named format plugin, will return format plugin from pre-loaded list by name. |
| * For other cases will try to find format plugin by its configuration, if not present will attempt to create one. |
| * |
| * @param config format plugin configuration |
| * @return format plugin for given configuration if found, null otherwise |
| */ |
| @Override |
| public FormatPlugin getFormatPlugin(FormatPluginConfig config) { |
| if (config instanceof NamedFormatPluginConfig) { |
| return formatCreator.getFormatPluginByName(((NamedFormatPluginConfig) config).name); |
| } |
| |
| FormatPlugin plugin = formatPluginsByConfig.get(config); |
| if (plugin == null) { |
| plugin = formatCreator.newFormatPlugin(config); |
| } |
| return plugin; |
| } |
| |
| @Override |
| public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) { |
| Builder<StoragePluginOptimizerRule> setBuilder = ImmutableSet.builder(); |
| for (FormatPlugin plugin : formatCreator.getConfiguredFormatPlugins()) { |
| Set<StoragePluginOptimizerRule> rules = plugin.getOptimizerRules(); |
| if (rules != null && rules.size() > 0) { |
| setBuilder.addAll(rules); |
| } |
| } |
| return setBuilder.build(); |
| } |
| |
| public Configuration getFsConf() { |
| return new Configuration(fsConf); |
| } |
| } |