/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.dfs.easy;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.physical.impl.StatisticsWriterRecordBatch;
import org.apache.drill.exec.physical.impl.WriterRecordBatch;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.RecordWriter;
import org.apache.drill.exec.store.StatisticsRecordWriter;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatMatcher;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.schedule.CompleteFileWork;
import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.shaded.guava.com.google.common.base.Functions;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base class for "easy" file-based format plugins.
* <p>
* Provides a bridge between the legacy {@link RecordReader}-style
* readers and the newer {@link ManagedReader}-style readers. Over time,
* this class should be split, or a cleaner way provided to handle the
* differences.
*
* @param <T> the format plugin config for this reader
*/
public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
private static final Logger logger = LoggerFactory.getLogger(EasyFormatPlugin.class);
/**
* Defines the static, programmer-defined options for this plugin. These
* options are attributes of how the plugin works. The plugin config,
* defined in the class definition, provides user-defined options that can
* vary across uses of the plugin.
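* <p>
* For illustration only, a plugin constructor might populate these options
* before invoking the revised {@code EasyFormatPlugin} constructor; the
* "sample" name and extension below are hypothetical:
* <pre>{@code
* EasyFormatConfig config = new EasyFormatConfig();
* config.readable = true;
* config.writable = false;
* config.blockSplittable = false;
* config.compressible = true;
* config.fsConf = fsConf;
* config.extensions = Collections.singletonList("sample");
* config.defaultName = "sample";
* config.useEnhancedScan = true;
* }</pre>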
*/
public static class EasyFormatConfig {
public BasicFormatMatcher matcher;
public boolean readable = true;
public boolean writable;
public boolean blockSplittable;
public boolean compressible;
public Configuration fsConf;
public List<String> extensions;
public String defaultName;
// Config options that, prior to Drill 1.15, required the plugin to
// override methods. New plugins should set these options instead of
// overriding methods; existing plugins should be migrated over time.
public boolean supportsLimitPushdown;
public boolean supportsProjectPushdown;
public boolean supportsFileImplicitColumns = true;
public boolean supportsAutoPartitioning;
public boolean supportsStatistics;
public int readerOperatorType = -1;
public int writerOperatorType = -1;
/**
* Chooses whether to use the "traditional" or "enhanced" reader
* structure. The choice can also be made at runtime by overriding
* {@link EasyFormatPlugin#useEnhancedScan(OptionManager)}.
*/
public boolean useEnhancedScan;
}
/**
* Builds the readers for the row-set based scan operator.
*/
private static class EasyReaderFactory extends FileReaderFactory {
private final EasyFormatPlugin<? extends FormatPluginConfig> plugin;
private final EasySubScan scan;
private final FragmentContext context;
public EasyReaderFactory(EasyFormatPlugin<? extends FormatPluginConfig> plugin,
EasySubScan scan, FragmentContext context) {
this.plugin = plugin;
this.scan = scan;
this.context = context;
}
@Override
public ManagedReader<? extends FileSchemaNegotiator> newReader() {
try {
return plugin.newBatchReader(scan, context.getOptions());
} catch (ExecutionSetupException e) {
throw UserException.validationError(e)
.addContext("Reason", "Failed to create a batch reader")
.addContext(errorContext())
.build(logger);
}
}
}
private final String name;
private final EasyFormatConfig easyConfig;
private final DrillbitContext context;
private final StoragePluginConfig storageConfig;
protected final T formatConfig;
/**
* Legacy constructor.
*/
protected EasyFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable,
boolean blockSplittable,
boolean compressible, List<String> extensions, String defaultName) {
this.name = name == null ? defaultName : name;
easyConfig = new EasyFormatConfig();
easyConfig.matcher = new BasicFormatMatcher(this, fsConf, extensions, compressible);
easyConfig.readable = readable;
easyConfig.writable = writable;
this.context = context;
easyConfig.blockSplittable = blockSplittable;
easyConfig.compressible = compressible;
easyConfig.fsConf = fsConf;
this.storageConfig = storageConfig;
this.formatConfig = formatConfig;
}
/**
* Revised constructor in which settings are gathered into a configuration object.
*
* @param name name of the plugin
* @param config configuration options for this plugin which determine
* developer-defined runtime behavior
* @param context the global server-wide Drillbit context
* @param storageConfig the configuration for the storage plugin that owns this
* format plugin
* @param formatConfig the Jackson-serialized format configuration as created
* by the user in the Drill web console. Holds user-defined options
*/
protected EasyFormatPlugin(String name, EasyFormatConfig config, DrillbitContext context,
StoragePluginConfig storageConfig, T formatConfig) {
this.name = name;
this.easyConfig = config;
this.context = context;
this.storageConfig = storageConfig;
this.formatConfig = formatConfig;
if (easyConfig.matcher == null) {
easyConfig.matcher = new BasicFormatMatcher(this,
easyConfig.fsConf, easyConfig.extensions,
easyConfig.compressible);
}
}
@Override
public Configuration getFsConf() { return easyConfig.fsConf; }
@Override
public DrillbitContext getContext() { return context; }
public EasyFormatConfig easyConfig() { return easyConfig; }
@Override
public String getName() { return name; }
/**
* Does this plugin support pushing the limit down to the batch reader? If so, the
* reader itself should stop reading the file as soon as the limit has been reached.
* This makes the most sense for file formats with a consistent schema that is known
* from the first row, such as CSV: if the user wants only 100 rows, there is no
* point in reading the entire file.
*
* @return {@code true} if the reader honors a pushed-down limit,
* {@code false} otherwise
*/
public boolean supportsLimitPushdown() { return easyConfig.supportsLimitPushdown; }
/**
* Does this plugin support projection push down? That is, can the reader
* itself handle the tasks of projecting table columns, creating null
* columns for missing table columns, and so on?
*
* @return {@code true} if the plugin supports projection push-down,
* {@code false} if Drill should do the task by adding a project operator
*/
public boolean supportsPushDown() { return easyConfig.supportsProjectPushdown; }
/**
* Whether this format plugin supports implicit file columns.
*
* @return {@code true} if the plugin supports implicit file columns,
* {@code false} otherwise
*/
public boolean supportsFileImplicitColumns() {
return easyConfig.supportsFileImplicitColumns;
}
/**
* Whether the format can be split into blocks within file boundaries. If
* not, the simple format engine will split only on file boundaries.
*
* @return {@code true} if the format is block-splittable
*/
public boolean isBlockSplittable() { return easyConfig.blockSplittable; }
/**
* Indicates whether this format may also appear inside a compression
* container (for example, csv.gz versus csv). If the format uses its own
* internal compression scheme, as Parquet does, this should return
* false.
*
* @return {@code true} if it is compressible
*/
public boolean isCompressible() { return easyConfig.compressible; }
/**
* Return a record reader for the specific file format, when using the original
* {@link ScanBatch} scanner.
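* <p>
* A minimal sketch of an override for a legacy plugin; the reader class
* {@code SampleRecordReader} is hypothetical:
* <pre>{@code
* public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs,
*     FileWork fileWork, List<SchemaPath> columns, String userName) {
*   return new SampleRecordReader(context, dfs, fileWork.getPath(), columns);
* }
* }</pre>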
* @param context fragment context
* @param dfs Drill file system
* @param fileWork metadata about the file to be scanned
* @param columns list of projected columns (or may just contain the wildcard)
* @param userName the name of the user running the query
* @return a record reader for this format
* @throws ExecutionSetupException for many reasons
*/
public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
List<SchemaPath> columns, String userName) throws ExecutionSetupException {
throw new ExecutionSetupException("Must implement getRecordReader() if using the legacy scanner.");
}
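/**
* Creates the scan batch for this plugin, choosing between the legacy
* {@link ScanBatch}-based path and the enhanced row-set based scan
* according to {@link #useEnhancedScan(OptionManager)}.
*/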
protected CloseableRecordBatch getReaderBatch(FragmentContext context,
EasySubScan scan) throws ExecutionSetupException {
if (useEnhancedScan(context.getOptions())) {
return buildScan(context, scan);
} else {
return buildScanBatch(context, scan);
}
}
/**
* Choose whether to use the enhanced scan based on the row set and scan
* framework, or the "traditional" ad-hoc structure based on ScanBatch.
* Normally set as a config option. Override this method if you want to
* make the choice based on a system/session option.
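* <p>
* A minimal sketch of such an override; the option name below is
* hypothetical, not a real Drill option:
* <pre>{@code
* protected boolean useEnhancedScan(OptionManager options) {
*   return options.getBoolean("exec.sample.use_enhanced_scan");
* }
* }</pre>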
*
* @param options the system/session options for the active query
* @return true to use the enhanced scan framework, false for the
* traditional scan-batch framework
*/
protected boolean useEnhancedScan(OptionManager options) {
return easyConfig.useEnhancedScan;
}
/**
* Use the original scanner based on the {@link RecordReader} interface.
* Requires that the storage plugin roll its own solutions for null columns.
* Is not able to limit vector or batch sizes. Retained for backward
* compatibility with "classic" format plugins which have not yet been
* upgraded to use the new framework.
*/
private CloseableRecordBatch buildScanBatch(FragmentContext context,
EasySubScan scan) throws ExecutionSetupException {
final ColumnExplorer columnExplorer =
new ColumnExplorer(context.getOptions(), scan.getColumns());
if (! columnExplorer.isStarQuery()) {
final int operatorId = scan.getOperatorId();
scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
columnExplorer.getTableColumns(), scan.getSelectionRoot(),
scan.getPartitionDepth(), scan.getSchema(), scan.getMaxRecords());
// Preserve the operator id assigned to the original sub-scan.
scan.setOperatorId(operatorId);
}
final OperatorContext oContext = context.newOperatorContext(scan);
final DrillFileSystem dfs;
try {
dfs = oContext.newFileSystem(easyConfig().fsConf);
} catch (final IOException e) {
throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
}
final List<RecordReader> readers = new LinkedList<>();
final List<Map<String, String>> implicitColumns = Lists.newArrayList();
Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
final boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null;
for (final FileWork work : scan.getWorkUnits()) {
final RecordReader recordReader = getRecordReader(
context, dfs, work, scan.getColumns(), scan.getUserName());
readers.add(recordReader);
final List<String> partitionValues = ColumnExplorer.listPartitionValues(
work.getPath(), scan.getSelectionRoot(), false);
final Map<String, String> implicitValues = columnExplorer.populateColumns(
work.getPath(), partitionValues, supportsFileImplicitColumns, dfs);
implicitColumns.add(implicitValues);
if (implicitValues.size() > mapWithMaxColumns.size()) {
mapWithMaxColumns = implicitValues;
}
}
// all readers should have the same number of implicit columns, add missing ones with value null
final Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant(null));
for (final Map<String, String> map : implicitColumns) {
map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
}
return new ScanBatch(context, oContext, readers, implicitColumns);
}
/**
* Revised scanner based on the {@link org.apache.drill.exec.physical.resultSet.ResultSetLoader}
* and {@link org.apache.drill.exec.physical.impl.scan.RowBatchReader} classes.
* Handles most projection tasks automatically. Able to limit
* vector and batch sizes. Use this for new format plugins.
*/
private CloseableRecordBatch buildScan(FragmentContext context,
EasySubScan scan) throws ExecutionSetupException {
try {
final FileScanBuilder builder = frameworkBuilder(context.getOptions(), scan);
// Add batch reader, if none specified
if (builder.readerFactory() == null) {
builder.setReaderFactory(new EasyReaderFactory(this, scan, context));
}
return builder.buildScanOperator(context, scan);
} catch (final UserException e) {
// Rethrow user exceptions directly
throw e;
} catch (final Throwable e) {
// Wrap all others
throw new ExecutionSetupException(e);
}
}
/**
* Initialize the scan framework builder with standard options.
* Call this from the plugin-specific
* {@link #frameworkBuilder(OptionManager, EasySubScan)} method.
* The plugin can then customize/revise options as needed.
*
* @param builder the scan framework builder you create in the
* {@link #frameworkBuilder(OptionManager, EasySubScan)} method
* @param scan the physical scan operator definition passed to
* the {@link #frameworkBuilder(OptionManager, EasySubScan)} method
*/
protected void initScanBuilder(FileScanBuilder builder, EasySubScan scan) {
builder.projection(scan.getColumns());
builder.setUserName(scan.getUserName());
// Pass along the output schema, if any
builder.providedSchema(scan.getSchema());
// Pass along file path information
builder.setFileSystemConfig(easyConfig().fsConf);
builder.setFiles(scan.getWorkUnits());
final Path selectionRoot = scan.getSelectionRoot();
if (selectionRoot != null) {
builder.implicitColumnOptions().setSelectionRoot(selectionRoot);
builder.implicitColumnOptions().setPartitionDepth(scan.getPartitionDepth());
}
// Additional error context to identify this plugin
builder.errorContext(
currentBuilder -> currentBuilder
.addContext("Format plugin", easyConfig.defaultName)
.addContext("Format plugin", EasyFormatPlugin.this.getClass().getSimpleName())
.addContext("Plugin config name", getName()));
}
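/**
* Creates a batch reader for a single file when using the enhanced scan
* framework with the default reader factory. Plugins that do not supply
* their own reader factory in
* {@link #frameworkBuilder(OptionManager, EasySubScan)} must override
* this method.
*/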
public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
EasySubScan scan, OptionManager options) throws ExecutionSetupException {
throw new ExecutionSetupException("Must implement newBatchReader() if using the enhanced framework.");
}
/**
* Create the plugin-specific framework that manages the scan. The framework
* creates batch readers one by one for each file or block. It defines semantic
* rules for projection. It handles "early" or "late" schema readers. A typical
* framework builds on standardized frameworks for files in general or text
* files in particular.
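* <p>
* A minimal sketch of an override, assuming the default reader factory
* (and hence {@link #newBatchReader(EasySubScan, OptionManager)}) is used:
* <pre>{@code
* protected FileScanBuilder frameworkBuilder(
*     OptionManager options, EasySubScan scan) throws ExecutionSetupException {
*   FileScanBuilder builder = new FileScanBuilder();
*   initScanBuilder(builder, scan);
*   return builder;
* }
* }</pre>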
*
* @param options the system/session options which the plugin can use to
* configure the framework
* @param scan the physical operation definition for the scan operation. Contains
* one or more files to read. (The Easy format plugin works only for files.)
* @return the scan framework which orchestrates the scan operation across
* potentially many files
* @throws ExecutionSetupException for all setup failures
*/
protected FileScanBuilder frameworkBuilder(
OptionManager options, EasySubScan scan) throws ExecutionSetupException {
throw new ExecutionSetupException("Must implement frameworkBuilder() if using the enhanced framework.");
}
public boolean isStatisticsRecordWriter(FragmentContext context, EasyWriter writer) {
return false;
}
public RecordWriter getRecordWriter(FragmentContext context,
EasyWriter writer) throws IOException {
throw new UnsupportedOperationException("unimplemented");
}
public StatisticsRecordWriter getStatisticsRecordWriter(FragmentContext context,
EasyWriter writer) throws IOException {
return null;
}
public CloseableRecordBatch getWriterBatch(FragmentContext context, RecordBatch incoming, EasyWriter writer)
throws ExecutionSetupException {
try {
if (isStatisticsRecordWriter(context, writer)) {
return new StatisticsWriterRecordBatch(writer, incoming, context, getStatisticsRecordWriter(context, writer));
} else {
return new WriterRecordBatch(writer, incoming, context, getRecordWriter(context, writer));
}
} catch (IOException e) {
throw new ExecutionSetupException(String.format("Failed to create the WriterRecordBatch. %s", e.getMessage()), e);
}
}
protected ScanStats getScanStats(PlannerSettings settings, EasyGroupScan scan) {
long data = 0;
for (CompleteFileWork work : scan.getWorkIterable()) {
data += work.getTotalBytes();
}
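// Estimate the row count assuming an average row width of 1024 bytes.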
final long estRowCount = data / 1024;
return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, estRowCount, 1, data);
}
@Override
public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) {
return new EasyWriter(child, location, partitionColumns, this);
}
@Override
public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
throws IOException {
return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot, null);
}
@Override
public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
List<SchemaPath> columns, MetadataProviderManager metadataProviderManager) throws IOException {
return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot, metadataProviderManager);
}
@Override
public T getConfig() { return formatConfig; }
@Override
public StoragePluginConfig getStorageConfig() { return storageConfig; }
@Override
public boolean supportsRead() { return easyConfig.readable; }
@Override
public boolean supportsWrite() { return easyConfig.writable; }
@Override
public boolean supportsAutoPartitioning() { return easyConfig.supportsAutoPartitioning; }
@Override
public FormatMatcher getMatcher() { return easyConfig.matcher; }
@Override
public Set<StoragePluginOptimizerRule> getOptimizerRules() {
return ImmutableSet.of();
}
public int getReaderOperatorType() { return easyConfig.readerOperatorType; }
public int getWriterOperatorType() { return easyConfig.writerOperatorType; }
@Override
public boolean supportsStatistics() { return easyConfig.supportsStatistics; }
@Override
public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
throw new UnsupportedOperationException("unimplemented");
}
@Override
public void writeStatistics(TableStatistics statistics, FileSystem fs,
Path statsTablePath) throws IOException {
throw new UnsupportedOperationException("unimplemented");
}
}