/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.scan.file;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.drill.common.exceptions.ChildErrorContext;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.UserException.Builder;
import org.apache.drill.exec.physical.impl.scan.ScanOperatorEvents;
import org.apache.drill.exec.physical.impl.scan.file.ImplicitColumnManager.ImplicitColumnOptions;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiatorImpl;
import org.apache.drill.exec.physical.impl.scan.framework.ShimBatchReader;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The file scan framework adds to the base scan framework support for
 * reading from DFS splits (a file and a block). Since this framework is
 * file-based, it also adds support for file metadata (AKA implicit
 * columns). The file scan framework brings together a number of
 * components:
* <ul>
* <li>The set of options defined by the base framework.</li>
* <li>The set of files and/or blocks to read.</li>
* <li>The file system configuration to use for working with the files
* or blocks.</li>
* <li>The factory class to create a reader for each of the files or blocks
* defined above. (Readers are created one-by-one as files are read.)</li>
* </ul>
* <p>
* The framework iterates over file descriptions, creating readers at the
* moment they are needed. This allows simpler logic because, at the point of
* reader creation, we have a file system, context and so on.
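 * <p>
 * A minimal sketch of wiring up the framework. The reader class
 * {@code MyReader} is hypothetical; this sketch assumes the base
 * builder's {@code setReaderFactory()} method:
 * <pre>{@code
 * FileScanBuilder builder = new FileScanBuilder();
 * builder.setFileSystemConfig(fsConf);  // Hadoop Configuration for the DFS
 * builder.setFiles(fileWorkList);       // List<? extends FileWork> to scan
 * builder.setReaderFactory(new FileReaderFactory() {
 *   public ManagedReader<? extends FileSchemaNegotiator> newReader() {
 *     return new MyReader();            // hypothetical reader
 *   }
 * });
 * }</pre>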
* <p>
 * See {@link AbstractScanFramework} for details.
*/
public class FileScanFramework extends ManagedScanFramework {
private static final Logger logger = LoggerFactory.getLogger(FileScanFramework.class);
/**
* The file schema negotiator adds no behavior at present, but is
* created as a placeholder anticipating the need for file-specific
* behavior later. Readers are expected to use an instance of this
* class so that their code need not change later if/when we add new
* methods. For example, perhaps we want to specify an assumed block
* size for S3 files, or want to specify behavior if the file no longer
   * exists. Those are out of scope for this first round of changes, which
   * focuses on schema.
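   * <p>
   * A sketch of how a reader might use the negotiator in its
   * {@code open()} call (the reader class is hypothetical; IOException
   * handling is omitted):
   * <pre>{@code
   * public boolean open(FileSchemaNegotiator negotiator) {
   *   DrillFileSystem dfs = negotiator.fileSystem();
   *   FileSplit split = negotiator.split();
   *   // DrillFileSystem is a Hadoop FileSystem, so standard calls apply
   *   FSDataInputStream in = dfs.open(split.getPath());
   *   ...
   * }
   * }</pre>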
*/
public interface FileSchemaNegotiator extends SchemaNegotiator {
/**
* Gives the Drill file system for this operator.
*/
DrillFileSystem fileSystem();
/**
* Describes the file split (path and block offset) for this scan.
*
* @return Hadoop file split object with the file path, block
* offset, and length.
*/
FileSplit split();
}
/**
* Implementation of the file-level schema negotiator. At present, no
* file-specific features exist. This class shows, however, where we would
* add such features.
*/
public static class FileSchemaNegotiatorImpl extends SchemaNegotiatorImpl
implements FileSchemaNegotiator {
private final FileSplit split;
public FileSchemaNegotiatorImpl(FileScanFramework framework) {
super(framework);
this.split = framework.currentSplit;
context = new FileRowSetContext(parentErrorContext(), split);
}
@Override
public DrillFileSystem fileSystem() {
return ((FileScanFramework) framework).dfs;
}
@Override
public FileSplit split() { return split; }
}
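  /**
   * Error context that adds the file path (and the block offset, when
   * non-zero) of the current split to error messages.
   */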
public static class FileRowSetContext extends ChildErrorContext {
private final FileSplit split;
public FileRowSetContext(CustomErrorContext parent, FileSplit split) {
super(parent);
this.split = split;
}
@Override
public void addContext(Builder builder) {
super.addContext(builder);
builder.addContext("File:", Path.getPathWithoutSchemeAndAuthority(split.getPath()).toString());
if (split.getStart() != 0) {
builder.addContext("Offset:", split.getStart());
}
}
}
  /**
   * Gathers the options for a file-based scan: the files or blocks to
   * read, the file system configuration to use, and the implicit (file
   * metadata) column options.
   */
public static class FileScanBuilder extends ScanFrameworkBuilder {
private List<? extends FileWork> files;
private Configuration fsConf;
private final ImplicitColumnOptions metadataOptions = new ImplicitColumnOptions();
public void setFileSystemConfig(Configuration fsConf) {
this.fsConf = fsConf;
}
public void setFiles(List<? extends FileWork> files) {
this.files = files;
}
public ImplicitColumnOptions implicitColumnOptions() { return metadataOptions; }
@Override
public ScanOperatorEvents buildEvents() {
return new FileScanFramework(this);
}
}
/**
* Iterates over the splits for the present scan. For each, creates a
* new reader. The file framework passes the file split (and the Drill
* file system) in via the schema negotiator at open time. This protocol
* makes clear that the constructor for the reader should do nothing;
* work should be done in the open() call.
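   * <p>
   * A hypothetical reader honoring this protocol:
   * <pre>{@code
   * class LogReader implements ManagedReader<FileSchemaNegotiator> {
   *   public LogReader() { }    // constructor deliberately does nothing
   *   public boolean open(FileSchemaNegotiator negotiator) { ... }
   *   public boolean next() { ... }
   *   public void close() { ... }
   * }
   * }</pre>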
*/
public abstract static class FileReaderFactory implements ReaderFactory {
private FileScanFramework fileFramework;
@Override
public void bind(ManagedScanFramework baseFramework) {
this.fileFramework = (FileScanFramework) baseFramework;
}
@Override
public ManagedReader<? extends SchemaNegotiator> next() {
if (fileFramework.nextSplit() == null) {
return null;
}
return newReader();
}
public CustomErrorContext errorContext() {
return fileFramework == null ? null : fileFramework.errorContext();
}
public abstract ManagedReader<? extends FileSchemaNegotiator> newReader();
    /**
     * @return the file framework, or an empty {@link Optional} if this
     * factory has not yet been bound via {@link #bind(ManagedScanFramework)}
     */
protected Optional<FileScanFramework> fileFramework() {
return Optional.ofNullable(fileFramework);
}
}
private ImplicitColumnManager metadataManager;
private DrillFileSystem dfs;
private final List<FileSplit> splits = new ArrayList<>();
private Iterator<FileSplit> splitIter;
private FileSplit currentSplit;
public FileScanFramework(FileScanBuilder builder) {
super(builder);
assert builder.files != null;
assert builder.fsConf != null;
}
public FileScanBuilder options() {
return (FileScanBuilder) builder;
}
@Override
protected void configure() {
super.configure();
FileScanBuilder options = options();
// Create the Drill file system.
try {
dfs = context.newFileSystem(options.fsConf);
} catch (IOException e) {
throw UserException.dataReadError(e)
.addContext("Failed to create FileSystem")
.build(logger);
}
// Prepare the list of files. We need the list of paths up
    // front to compute the maximum partition depth. Then, we need to
// iterate over the splits to create readers on demand.
List<Path> paths = new ArrayList<>();
for (FileWork work : options.files) {
Path path = dfs.makeQualified(work.getPath());
paths.add(path);
FileSplit split = new FileSplit(path, work.getStart(), work.getLength(), new String[]{""});
splits.add(split);
}
splitIter = splits.iterator();
// Create the metadata manager to handle file metadata columns
    // (so-called implicit columns and partition columns).
options.implicitColumnOptions().setFiles(paths);
metadataManager = new ImplicitColumnManager(
context.getFragmentContext().getOptions(),
options.implicitColumnOptions(),
dfs);
builder.withImplicitColumns(metadataManager);
}
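  /**
   * Advances to the next split, or returns null once all splits in this
   * scan have been visited.
   */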
protected FileSplit nextSplit() {
if (! splitIter.hasNext()) {
currentSplit = null;
return null;
}
currentSplit = splitIter.next();
// Tell the metadata manager about the current file so it can
// populate the metadata columns, if requested.
metadataManager.startFile(currentSplit.getPath());
return currentSplit;
}
@Override
protected SchemaNegotiatorImpl newNegotiator() {
return new FileSchemaNegotiatorImpl(this);
}
@Override
public boolean open(ShimBatchReader shimBatchReader) {
try {
return super.open(shimBatchReader);
} catch (UserException e) {
throw e;
} catch (Exception e) {
throw UserException.executionError(e)
.addContext("File", currentSplit.getPath().toString())
.build(logger);
}
}
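  /**
   * @return the Drill file system created for this scan from the file
   * system configuration passed to the builder
   */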
public DrillFileSystem fileSystem() {
return dfs;
}
}