/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.io;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.io.compression.Bzip2InputStreamFactory;
import org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory;
import org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory;
import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
import org.apache.flink.api.common.io.compression.XZInputStreamFactory;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.LocatedFileStatus;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* The base class for {@link RichInputFormat}s that read from files. For specific input types the
* {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be implemented.
* Additionally, one may override {@link #open(FileInputSplit)} and {@link #close()} to
* change the life cycle behavior.
*
* <p>After the {@link #open(FileInputSplit)} method has completed, the file input data is available
* from the {@link #stream} field.</p>
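*
* <p>A minimal sketch of a subclass that emits one record per byte. It is illustrative only
* and, for brevity, reads to the end of the file rather than stopping at the split boundary:
* <pre>{@code
* public class ByteInputFormat extends FileInputFormat<Byte> {
*     private int next;
*
*     public void open(FileInputSplit split) throws IOException {
*         super.open(split);    // positions 'stream' at the start of the split
*         next = stream.read(); // one byte of read-ahead
*     }
*
*     public boolean reachedEnd() {
*         return next < 0;
*     }
*
*     public Byte nextRecord(Byte reuse) throws IOException {
*         byte current = (byte) next;
*         next = stream.read();
*         return current;
*     }
* }
* }</pre>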
*/
@Public
public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputSplit> {
// -------------------------------------- Constants -------------------------------------------
private static final Logger LOG = LoggerFactory.getLogger(FileInputFormat.class);
private static final long serialVersionUID = 1L;
/**
* The fraction by which the last split may be larger than the others.
*/
private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
/**
* The timeout (in milliseconds) to wait for a filesystem stream to respond.
*/
private static long DEFAULT_OPENING_TIMEOUT;
/**
* A mapping of file extensions to DEFLATE-based decompression algorithms. Files with these
* extensions are decompressed transparently and are therefore not splittable.
*/
protected static final Map<String, InflaterInputStreamFactory<?>> INFLATER_INPUT_STREAM_FACTORIES =
new HashMap<>();
/**
* The splitLength is set to -1L for reading the whole split.
*/
protected static final long READ_WHOLE_SPLIT_FLAG = -1L;
static {
initDefaultsFromConfiguration(GlobalConfiguration.loadConfiguration());
initDefaultInflaterInputStreamFactories();
}
/**
* Initialize defaults for input format. Needs to be a static method because it is configured for local
* cluster execution, see LocalFlinkMiniCluster.
* @param configuration The configuration to load defaults from
*/
private static void initDefaultsFromConfiguration(Configuration configuration) {
final long to = configuration.getLong(ConfigConstants.FS_STREAM_OPENING_TIMEOUT_KEY,
ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT);
if (to < 0) {
LOG.error("Invalid timeout value for filesystem stream opening: " + to + ". Using default value of " +
ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT);
DEFAULT_OPENING_TIMEOUT = ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT;
} else if (to == 0) {
DEFAULT_OPENING_TIMEOUT = 300000; // 5 minutes
} else {
DEFAULT_OPENING_TIMEOUT = to;
}
}
private static void initDefaultInflaterInputStreamFactories() {
InflaterInputStreamFactory<?>[] defaultFactories = {
DeflateInflaterInputStreamFactory.getInstance(),
GzipInflaterInputStreamFactory.getInstance(),
Bzip2InputStreamFactory.getInstance(),
XZInputStreamFactory.getInstance(),
};
for (InflaterInputStreamFactory<?> inputStreamFactory : defaultFactories) {
for (String fileExtension : inputStreamFactory.getCommonFileExtensions()) {
registerInflaterInputStreamFactory(fileExtension, inputStreamFactory);
}
}
}
/**
* Registers a decompression algorithm through a {@link org.apache.flink.api.common.io.compression.InflaterInputStreamFactory}
* with a file extension for transparent decompression.
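*
* <p>A minimal sketch of registering a custom format ({@code MyLzoInputStreamFactory} is a
* hypothetical implementation of {@link InflaterInputStreamFactory}, shown for illustration only):
* <pre>{@code
* FileInputFormat.registerInflaterInputStreamFactory("lzo", MyLzoInputStreamFactory.getInstance());
* }</pre>
*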
* @param fileExtension of the compressed files
* @param factory to create an {@link java.util.zip.InflaterInputStream} that handles the decompression format
*/
public static void registerInflaterInputStreamFactory(String fileExtension, InflaterInputStreamFactory<?> factory) {
synchronized (INFLATER_INPUT_STREAM_FACTORIES) {
if (INFLATER_INPUT_STREAM_FACTORIES.put(fileExtension, factory) != null) {
LOG.warn("Overwriting an existing decompression algorithm for \"{}\" files.", fileExtension);
}
}
}
protected static InflaterInputStreamFactory<?> getInflaterInputStreamFactory(String fileExtension) {
synchronized (INFLATER_INPUT_STREAM_FACTORIES) {
return INFLATER_INPUT_STREAM_FACTORIES.get(fileExtension);
}
}
/**
* Returns the extension of a file name (not a path). For example, {@code "archive.tar.gz"}
* yields {@code "gz"}.
*
* @param fileName The file name to extract the extension from.
* @return The extension of the file name, or {@code null} if there is no extension.
*/
protected static String extractFileExtension(String fileName) {
checkNotNull(fileName);
int lastPeriodIndex = fileName.lastIndexOf('.');
if (lastPeriodIndex < 0){
return null;
} else {
return fileName.substring(lastPeriodIndex + 1);
}
}
// --------------------------------------------------------------------------------------------
// Variables for internal operation.
// They are all transient, because we do not want them to be serialized
// --------------------------------------------------------------------------------------------
/**
* The input stream reading from the input file.
*/
protected transient FSDataInputStream stream;
/**
* The start of the split that this parallel instance must consume.
*/
protected transient long splitStart;
/**
* The length of the split that this parallel instance must consume.
*/
protected transient long splitLength;
/**
* The current split that this parallel instance must consume.
*/
protected transient FileInputSplit currentSplit;
// --------------------------------------------------------------------------------------------
// The configuration parameters. Configured on the instance and serialized to be shipped.
// --------------------------------------------------------------------------------------------
/**
* The path to the file that contains the input.
*
* @deprecated Please override {@link FileInputFormat#supportsMultiPaths()} and
* use {@link FileInputFormat#getFilePaths()} and {@link FileInputFormat#setFilePaths(Path...)}.
*/
@Deprecated
protected Path filePath;
/**
* The list of paths to files and directories that contain the input.
*/
private Path[] filePaths;
/**
* The minimal split size, set by the configure() method.
*/
protected long minSplitSize = 0;
/**
* The desired number of splits, as set by the configure() method.
*/
protected int numSplits = -1;
/**
* Stream opening timeout.
*/
protected long openTimeout = DEFAULT_OPENING_TIMEOUT;
/**
* Some file input formats are not splittable on a block level (e.g., Avro, Deflate).
* For those formats, the FileInputFormat can only read whole files.
*/
protected volatile boolean unsplittable = false;
/**
* The flag to specify whether recursive traversal of the input directory
* structure is enabled.
*/
protected boolean enumerateNestedFiles = false;
/**
* Files filter for determining what files/directories should be included.
*/
private FilePathFilter filesFilter = new GlobFilePathFilter();
/**
* The size of the thread pool used by the split provider.
*/
private int splitProviderThreadPoolSize = Runtime.getRuntime().availableProcessors();
// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
public FileInputFormat() {}
protected FileInputFormat(Path filePath) {
if (filePath != null) {
setFilePath(filePath);
}
}
// --------------------------------------------------------------------------------------------
// Getters/setters for the configurable parameters
// --------------------------------------------------------------------------------------------
/**
*
* @return The size of the thread pool used to generate splits.
*/
public int getSplitProviderThreadPoolSize() {
return splitProviderThreadPoolSize;
}
/**
* Sets the size of the thread pool used to generate splits.
*
* @param splitProviderThreadPoolSize The size of the thread pool used to generate splits.
*/
public void setSplitProviderThreadPoolSize(int splitProviderThreadPoolSize) {
this.splitProviderThreadPoolSize = splitProviderThreadPoolSize;
}
/**
*
* @return The path of the file to read.
*
* @deprecated Please use getFilePaths() instead.
*/
@Deprecated
public Path getFilePath() {
if (supportsMultiPaths()) {
if (this.filePaths == null || this.filePaths.length == 0) {
return null;
} else if (this.filePaths.length == 1) {
return this.filePaths[0];
} else {
throw new UnsupportedOperationException(
"FileInputFormat is configured with multiple paths. Use getFilePaths() instead.");
}
} else {
return filePath;
}
}
/**
* Returns the paths of all files to be read by the FileInputFormat.
*
* @return The list of all paths to read.
*/
public Path[] getFilePaths() {
if (supportsMultiPaths()) {
if (this.filePaths == null) {
return new Path[0];
}
return this.filePaths;
} else {
if (this.filePath == null) {
return new Path[0];
}
return new Path[] {filePath};
}
}
public void setFilePath(String filePath) {
if (filePath == null) {
throw new IllegalArgumentException("File path cannot be null.");
}
// TODO The job-submission web interface passes empty args (and thus empty
// paths) to compute the preview graph. The following is a workaround for
// this situation and we should fix this.
// comment (Stephan Ewen) this should be no longer relevant with the current Java/Scala APIs.
if (filePath.isEmpty()) {
setFilePath(new Path());
return;
}
try {
this.setFilePath(new Path(filePath));
} catch (RuntimeException rex) {
throw new RuntimeException("Could not create a valid URI from the given file path name: " + rex.getMessage());
}
}
/**
* Sets a single path of a file to be read.
*
* @param filePath The path of the file to read.
*/
public void setFilePath(Path filePath) {
if (filePath == null) {
throw new IllegalArgumentException("File path must not be null.");
}
setFilePaths(filePath);
}
/**
* Sets multiple paths of files to be read.
*
* @param filePaths The paths of the files to read.
*/
public void setFilePaths(String... filePaths) {
Path[] paths = new Path[filePaths.length];
for (int i = 0; i < paths.length; i++) {
paths[i] = new Path(filePaths[i]);
}
setFilePaths(paths);
}
/**
* Sets multiple paths of files to be read.
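*
* <p>Note that the default implementation rejects more than one path unless
* {@link #supportsMultiPaths()} is overridden to return {@code true}. A minimal sketch,
* assuming such a subclass and that both paths exist:
* <pre>{@code
* format.setFilePaths(new Path("hdfs:///data/part-1"), new Path("hdfs:///data/part-2"));
* }</pre>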
*
* @param filePaths The paths of the files to read.
*/
public void setFilePaths(Path... filePaths) {
if (!supportsMultiPaths() && filePaths.length > 1) {
throw new UnsupportedOperationException(
"Multiple paths are not supported by this FileInputFormat.");
}
if (filePaths.length < 1) {
throw new IllegalArgumentException("At least one file path must be specified.");
}
if (filePaths.length == 1) {
// set for backwards compatibility
this.filePath = filePaths[0];
} else {
// clear file path in case it had been set before
this.filePath = null;
}
this.filePaths = filePaths;
}
public long getMinSplitSize() {
return minSplitSize;
}
public void setMinSplitSize(long minSplitSize) {
if (minSplitSize < 0) {
throw new IllegalArgumentException("The minimum split size cannot be negative.");
}
this.minSplitSize = minSplitSize;
}
public int getNumSplits() {
return numSplits;
}
public void setNumSplits(int numSplits) {
if (numSplits < -1 || numSplits == 0) {
throw new IllegalArgumentException("The desired number of splits must be positive or -1 (= don't care).");
}
this.numSplits = numSplits;
}
public long getOpenTimeout() {
return openTimeout;
}
public void setOpenTimeout(long openTimeout) {
if (openTimeout < 0) {
throw new IllegalArgumentException("The timeout for opening the input splits must be positive or zero (= infinite).");
}
this.openTimeout = openTimeout;
}
public void setNestedFileEnumeration(boolean enable) {
this.enumerateNestedFiles = enable;
}
public boolean getNestedFileEnumeration() {
return this.enumerateNestedFiles;
}
// --------------------------------------------------------------------------------------------
// Getting information about the split that is currently open
// --------------------------------------------------------------------------------------------
/**
* Gets the start of the current split.
*
* @return The start of the split.
*/
public long getSplitStart() {
return splitStart;
}
/**
* Gets the length or remaining length of the current split.
*
* @return The length or remaining length of the current split.
*/
public long getSplitLength() {
return splitLength;
}
public void setFilesFilter(FilePathFilter filesFilter) {
this.filesFilter = Preconditions.checkNotNull(filesFilter, "Files filter should not be null");
}
// --------------------------------------------------------------------------------------------
// Pre-flight: Configuration, Splits, Sampling
// --------------------------------------------------------------------------------------------
/**
* Configures the file input format by reading the file path from the configuration.
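*
* <p>A minimal sketch of supplying the path through the configuration (the key is this
* class's {@code input.file.path} parameter):
* <pre>{@code
* Configuration parameters = new Configuration();
* parameters.setString("input.file.path", "hdfs:///data/input");
* format.configure(parameters);
* }</pre>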
*
* @see org.apache.flink.api.common.io.InputFormat#configure(org.apache.flink.configuration.Configuration)
*/
@Override
public void configure(Configuration parameters) {
if (getFilePaths().length == 0) {
// file path was not specified yet. Try to set it from the parameters.
String filePath = parameters.getString(FILE_PARAMETER_KEY, null);
if (filePath == null) {
throw new IllegalArgumentException("File path was not specified in input format or configuration.");
} else {
setFilePath(filePath);
}
}
if (!this.enumerateNestedFiles) {
this.enumerateNestedFiles = parameters.getBoolean(ENUMERATE_NESTED_FILES_FLAG, false);
}
}
/**
* Obtains basic file statistics containing only file size. If the input is a directory, then the size is the sum of all contained files.
*
* @see org.apache.flink.api.common.io.InputFormat#getStatistics(org.apache.flink.api.common.io.statistics.BaseStatistics)
*/
@Override
public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ?
(FileBaseStatistics) cachedStats : null;
try {
return getFileStats(cachedFileStats, getFilePaths(), new ArrayList<>(getFilePaths().length));
} catch (IOException ioex) {
if (LOG.isWarnEnabled()) {
LOG.warn("Could not determine statistics for paths '" + Arrays.toString(getFilePaths()) + "' due to an io error: "
+ ioex.getMessage());
}
}
catch (Throwable t) {
if (LOG.isErrorEnabled()) {
LOG.error("Unexpected problem while getting the file statistics for paths '" + Arrays.toString(getFilePaths()) + "': "
+ t.getMessage(), t);
}
}
// no statistics available
return null;
}
protected FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, Path[] filePaths, ArrayList<FileStatus> files) throws IOException {
long totalLength = 0;
long latestModTime = 0;
for (Path path : filePaths) {
final FileSystem fs = FileSystem.get(path.toUri());
final FileBaseStatistics stats = getFileStats(cachedStats, path, fs, files);
if (stats.getTotalInputSize() == BaseStatistics.SIZE_UNKNOWN) {
totalLength = BaseStatistics.SIZE_UNKNOWN;
} else if (totalLength != BaseStatistics.SIZE_UNKNOWN) {
totalLength += stats.getTotalInputSize();
}
latestModTime = Math.max(latestModTime, stats.getLastModificationTime());
}
// check whether the cached statistics are still valid, if we have any
if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
return cachedStats;
}
return new FileBaseStatistics(latestModTime, totalLength, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
}
protected FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, Path filePath, FileSystem fs, ArrayList<FileStatus> files) throws IOException {
// get the file info and check whether the cached statistics are still valid.
final FileStatus file = fs.getFileStatus(filePath);
long totalLength = 0;
// enumerate all files
if (file.isDir()) {
totalLength += addFilesInDir(file.getPath(), files, false);
} else {
files.add(file);
testForUnsplittable(file);
totalLength += file.getLen();
}
// check the modification time stamp
long latestModTime = 0;
for (FileStatus f : files) {
latestModTime = Math.max(f.getModificationTime(), latestModTime);
}
// check whether the cached statistics are still valid, if we have any
if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
return cachedStats;
}
// sanity check
if (totalLength <= 0) {
totalLength = BaseStatistics.SIZE_UNKNOWN;
}
return new FileBaseStatistics(latestModTime, totalLength, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
}
@Override
public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits) {
return new LocatableInputSplitAssigner(splits);
}
/**
* Computes the input splits for the file. By default, one file block is one split. If more splits
* are requested than blocks are available, then a split may be a fraction of a block and splits may cross
* block boundaries.
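*
* <p>A worked example with assumed numbers: a single 1000-byte file with a 300-byte block size
* and {@code minNumSplits = 4} gives a maximum split size of 250 bytes (1000 / 4), so four
* splits of 250 bytes each are generated, the later ones starting inside a block.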
*
* @param minNumSplits The minimum desired number of file splits.
* @return The computed file splits.
*
* @see org.apache.flink.api.common.io.InputFormat#createInputSplits(int)
*/
@Override
public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
if (minNumSplits < 1) {
throw new IllegalArgumentException("Number of input splits has to be at least 1.");
}
final ExecutorService executorService = Executors.newFixedThreadPool(splitProviderThreadPoolSize, new ThreadFactory() {
private int count = 0;
@Override
public Thread newThread(@Nonnull Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("SplitProvider-" + (count++));
return thread;
}
});
try {
LOG.info("Start to get all files");
final Path[] paths = getFilePaths();
// get all the files that are involved in the splits
final BlockingQueue<LocatedFileStatus> files = new LinkedBlockingQueue<>();
getFileStatus(paths, files, executorService);
LOG.info("Start to create all file splits");
final FileInputSplit[] inputSplits = createFileInputSplits(minNumSplits, files, executorService);
LOG.info("Finish create input splits");
return inputSplits;
} catch (Exception e) {
throw new IOException(e);
} finally {
executorService.shutdownNow();
}
}
protected void getFileStatus(
Path[] paths,
BlockingQueue<LocatedFileStatus> files,
ExecutorService executorService
) throws InterruptedException, IOException, ExecutionException {
final BlockingQueue<Future<?>> tasks = new LinkedBlockingQueue<>();
// trigger concurrent get
for (Path path : paths) {
final FileSystem fs = path.getFileSystem();
getFileStatusInternal(fs, path, files, tasks, executorService, true);
}
// wait for all tasks to finish
while (!tasks.isEmpty()) {
tasks.poll().get();
}
}
private void getFileStatusInternal(
FileSystem fs,
Path path,
BlockingQueue<LocatedFileStatus> files,
BlockingQueue<Future<?>> tasks,
ExecutorService executorService,
boolean logExcludedFiles
) throws IOException {
LocatedFileStatus[] fileStatuses = fs.listLocatedStatus(path);
if (fileStatuses == null) {
throw new IOException(path + " does not exist");
}
for (LocatedFileStatus fileStatus: fileStatuses) {
if (fileStatus.isDir()) {
if (enumerateNestedFiles && acceptFile(fileStatus)) {
tasks.add(executorService.submit(() -> {
try {
getFileStatusInternal(fs, fileStatus.getPath(), files, tasks, executorService, logExcludedFiles);
} catch (IOException e) {
LOG.error("Fail to get file status.", e);
throw new RuntimeException(e);
}
}));
} else {
if (logExcludedFiles && LOG.isDebugEnabled()) {
LOG.debug("Directory " + fileStatus.getPath().toString() + " did not pass the file-filter and is excluded.");
}
}
} else { // !fileStatus.isDir()
if (acceptFile(fileStatus)) {
files.add(fileStatus);
testForUnsplittable(fileStatus);
} else {
if (logExcludedFiles && LOG.isDebugEnabled()) {
LOG.debug("Directory " + fileStatus.getPath().toString() + " did not pass the file-filter and is excluded.");
}
}
}
}
}
private FileInputSplit[] createFileInputSplits(
int minNumSplits,
BlockingQueue<LocatedFileStatus> files,
ExecutorService executorService
) throws ExecutionException, InterruptedException {
BlockingQueue<Future<?>> tasks = new LinkedBlockingQueue<>();
AtomicInteger splitNumber = new AtomicInteger();
BlockingQueue<FileInputSplit> inputSplits = new LinkedBlockingQueue<>();
if (unsplittable) {
for (LocatedFileStatus file : files) {
tasks.add(executorService.submit(() -> {
try {
createSplitForUnSplittableFile(file, inputSplits, splitNumber);
} catch (IOException e) {
LOG.error("Fail to create split", e);
throw new RuntimeException(e);
}
}));
}
} else { // splittable
// take the desired number of splits into account
minNumSplits = Math.max(minNumSplits, this.numSplits);
long totalLength = 0;
for (FileStatus file: files) {
totalLength += file.getLen();
}
final long maxSplitSize = (minNumSplits < 1) ? Long.MAX_VALUE : (totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1));
// now that we have the files, generate the splits
for (LocatedFileStatus file : files) {
tasks.add(executorService.submit(() -> {
try {
createSplitForSplittableFile(file, inputSplits, maxSplitSize, splitNumber);
} catch (IOException e) {
LOG.error("Fail to create split", e);
throw new RuntimeException(e);
}
}));
}
}
// wait for all tasks to finish
while (!tasks.isEmpty()) {
tasks.poll().get();
}
return inputSplits.toArray(new FileInputSplit[0]);
}
private void createSplitForUnSplittableFile(
LocatedFileStatus file,
BlockingQueue<FileInputSplit> inputSplits,
AtomicInteger splitNumber) throws IOException {
final FileSystem fs = file.getPath().getFileSystem();
final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
final Set<String> hosts = new HashSet<>();
for (BlockLocation block : blocks) {
hosts.addAll(Arrays.asList(block.getHosts()));
}
long len = file.getLen();
if (testForUnsplittable(file)) {
len = READ_WHOLE_SPLIT_FLAG;
}
FileInputSplit fis = new FileInputSplit(splitNumber.incrementAndGet(), file.getPath(), 0, len,
hosts.toArray(new String[0]));
inputSplits.add(fis);
}
private void createSplitForSplittableFile(
LocatedFileStatus file,
BlockingQueue<FileInputSplit> inputSplits,
long maxSplitSize,
AtomicInteger splitNumber) throws IOException {
final long len = file.getLen();
final long blockSize = file.getBlockSize();
final long minSplitSize;
if (this.minSplitSize <= blockSize) {
minSplitSize = this.minSplitSize;
}
else {
if (LOG.isWarnEnabled()) {
LOG.warn("Minimal split size of " + this.minSplitSize + " is larger than the block size of " +
blockSize + ". Decreasing minimal split size to block size.");
}
minSplitSize = blockSize;
}
final long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
final long halfSplit = splitSize >>> 1;
final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);
if (len > 0) {
// get the block locations and make sure they are in order with respect to their offset
final BlockLocation[] blocks = file.getBlockLocation();
Arrays.sort(blocks);
int blockIndex = 0;
long position = 0;
long bytesUnassigned = len;
while (bytesUnassigned > maxBytesForLastSplit) {
// get the block containing the majority of the data
blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
// create a new split
FileInputSplit fis = new FileInputSplit(splitNumber.incrementAndGet(), file.getPath(), position, splitSize,
blocks[blockIndex].getHosts());
inputSplits.add(fis);
// adjust the positions
position += splitSize;
bytesUnassigned -= splitSize;
}
// assign the last split
if (bytesUnassigned > 0) {
blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
final FileInputSplit fis = new FileInputSplit(splitNumber.incrementAndGet(), file.getPath(), position,
bytesUnassigned, blocks[blockIndex].getHosts());
inputSplits.add(fis);
}
} else {
// special case with a file of zero bytes size
final FileSystem fs = file.getPath().getFileSystem();
final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 0);
final String[] hosts = (blocks.length > 0) ? blocks[0].getHosts() : new String[0];
final FileInputSplit fis = new FileInputSplit(splitNumber.incrementAndGet(), file.getPath(), 0, 0, hosts);
inputSplits.add(fis);
}
}
/**
* Enumerates all files in the given directory, descending recursively if enumerateNestedFiles is true.
*
* @return The total length of all accepted files.
*/
private long addFilesInDir(Path path, List<FileStatus> files, boolean logExcludedFiles)
throws IOException {
final FileSystem fs = path.getFileSystem();
long length = 0;
for (FileStatus dir : fs.listStatus(path)) {
if (dir.isDir()) {
if (acceptFile(dir) && enumerateNestedFiles) {
length += addFilesInDir(dir.getPath(), files, logExcludedFiles);
} else {
if (logExcludedFiles && LOG.isDebugEnabled()) {
LOG.debug("Directory "+dir.getPath().toString()+" did not pass the file-filter and is excluded.");
}
}
}
else {
if (acceptFile(dir)) {
files.add(dir);
length += dir.getLen();
testForUnsplittable(dir);
} else {
if (logExcludedFiles && LOG.isDebugEnabled()) {
LOG.debug("Directory "+dir.getPath().toString()+" did not pass the file-filter and is excluded.");
}
}
}
}
return length;
}
protected boolean testForUnsplittable(FileStatus pathFile) {
if (getInflaterInputStreamFactory(pathFile.getPath()) != null) {
unsplittable = true;
return true;
}
return false;
}
private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(Path path) {
String fileExtension = extractFileExtension(path.getName());
if (fileExtension != null) {
return getInflaterInputStreamFactory(fileExtension);
} else {
return null;
}
}
/**
* A simple hook to filter files and directories from the input.
* The method may be overridden. Hadoop's FileInputFormat has a similar mechanism and applies the
* same filters by default.
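*
* <p>A minimal sketch of an override that additionally skips files with a {@code .tmp} suffix
* (the suffix is an assumption for illustration):
* <pre>{@code
* public boolean acceptFile(FileStatus fileStatus) {
*     return super.acceptFile(fileStatus)
*         && !fileStatus.getPath().getName().endsWith(".tmp");
* }
* }</pre>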
*
* @param fileStatus The file status to check.
* @return true, if the given file or directory is accepted
*/
public boolean acceptFile(FileStatus fileStatus) {
final String name = fileStatus.getPath().getName();
return !name.startsWith("_")
&& !name.startsWith(".")
&& !filesFilter.filterPath(fileStatus.getPath());
}
/**
* Retrieves the index of the <tt>BlockLocation</tt> that contains the part of the file described by the given
* offset.
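*
* <p>A worked example with assumed values: for blocks covering bytes [0, 500) and [500, 1000),
* an offset of 400, and a halfSplitSize of 250, only 100 bytes of the first block remain after
* the offset, so the index of the second block is returned.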
*
* @param blocks The different blocks of the file. Must be ordered by their offset.
* @param offset The offset of the position in the file.
* @param halfSplitSize Half the split size; used to decide whether the majority of the split's data lies in this block or the next.
* @param startIndex The earliest index to look at.
* @return The index of the block containing the given position.
*/
private int getBlockIndexForPosition(BlockLocation[] blocks, long offset, long halfSplitSize, int startIndex) {
// go over all indexes after the startIndex
for (int i = startIndex; i < blocks.length; i++) {
long blockStart = blocks[i].getOffset();
long blockEnd = blockStart + blocks[i].getLength();
if (offset >= blockStart && offset < blockEnd) {
// got the block where the split starts
// check if the next block contains more than this one does
if (i < blocks.length - 1 && blockEnd - offset < halfSplitSize) {
return i + 1;
} else {
return i;
}
}
}
throw new IllegalArgumentException("The given offset is not contained in the any block.");
}
// --------------------------------------------------------------------------------------------
/**
* Opens an input stream to the file defined in the input format.
* The stream is positioned at the beginning of the given split.
* <p>
* The stream is actually opened in an asynchronous thread to make sure any interruptions to the thread
* working on the input format do not reach the file system.
*/
@Override
public void open(FileInputSplit fileSplit) throws IOException {
this.currentSplit = fileSplit;
this.splitStart = fileSplit.getStart();
this.splitLength = fileSplit.getLength();
if (LOG.isDebugEnabled()) {
LOG.debug("Opening input split " + fileSplit.getPath() + " [" + this.splitStart + "," + this.splitLength + "]");
}
// open the split in an asynchronous thread
final InputSplitOpenThread isot = new InputSplitOpenThread(fileSplit, this.openTimeout);
isot.start();
try {
this.stream = isot.waitForCompletion();
this.stream = decorateInputStream(this.stream, fileSplit);
}
catch (Throwable t) {
throw new IOException("Error opening the Input Split " + fileSplit.getPath() +
" [" + splitStart + "," + splitLength + "]: " + t.getMessage(), t);
}
// seek to the start of the split, if the split does not begin at position 0
if (this.splitStart != 0) {
this.stream.seek(this.splitStart);
}
}
/**
* This method allows wrapping/decorating the raw {@link FSDataInputStream} for a certain file split, e.g., for decoding.
* When overriding this method, also consider adapting {@link FileInputFormat#testForUnsplittable} if your
* stream decoration renders the input file unsplittable. Also consider calling existing superclass implementations.
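*
* <p>A sketch of an override that layers a custom decoding stream on top of the default
* decoration ({@code MyDecodingStream} is a hypothetical {@code InputStream} subclass, shown
* for illustration only):
* <pre>{@code
* protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream, FileInputSplit fileSplit)
*         throws Throwable {
*     FSDataInputStream decorated = super.decorateInputStream(inputStream, fileSplit);
*     return new InputStreamFSInputWrapper(new MyDecodingStream(decorated));
* }
* }</pre>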
*
* @param inputStream is the input stream to be decorated
* @param fileSplit is the file split for which the input stream shall be decorated
* @return the decorated input stream
* @throws Throwable if the decoration fails
* @see org.apache.flink.api.common.io.InputStreamFSInputWrapper
*/
protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream, FileInputSplit fileSplit) throws Throwable {
// Wrap the stream in an extracting (decompressing) stream if the file ends with a known compression file extension.
InflaterInputStreamFactory<?> inflaterInputStreamFactory = getInflaterInputStreamFactory(fileSplit.getPath());
if (inflaterInputStreamFactory != null) {
return new InputStreamFSInputWrapper(inflaterInputStreamFactory.create(inputStream));
}
return inputStream;
}
/**
* Closes the file input stream of the input format.
*/
@Override
public void close() throws IOException {
if (this.stream != null) {
// close input stream
this.stream.close();
stream = null;
}
}
/**
* Override this method to support multiple paths.
* Once this method is removed, all FileInputFormats will have to support multiple paths.
*
* @return True if the FileInputFormat supports multiple paths, false otherwise.
*
* @deprecated Will be removed for Flink 2.0.
*/
@Deprecated
public boolean supportsMultiPaths() {
return false;
}
@Override
public String toString() {
return getFilePaths() == null || getFilePaths().length == 0 ?
"File Input (unknown file)" :
"File Input (" + Arrays.toString(this.getFilePaths()) + ')';
}
// ============================================================================================
/**
* Encapsulation of the basic statistics the optimizer obtains about a file. Contained are the size of the file
* and the average number of bytes in a single record. The statistics also carry a timestamp, recording the modification
* time of the file and indicating for which point in time the statistics were valid.
*/
public static class FileBaseStatistics implements BaseStatistics {
protected final long fileModTime; // timestamp of the last modification
protected final long fileSize; // size of the file(s) in bytes
protected final float avgBytesPerRecord; // the average number of bytes for a record
/**
* Creates a new statistics object.
*
* @param fileModTime
* The timestamp of the latest modification of any of the involved files.
* @param fileSize
* The size of the file, in bytes. <code>-1</code>, if unknown.
* @param avgBytesPerRecord
* The average number of bytes in a record, or <code>-1.0f</code>, if unknown.
*/
public FileBaseStatistics(long fileModTime, long fileSize, float avgBytesPerRecord) {
this.fileModTime = fileModTime;
this.fileSize = fileSize;
this.avgBytesPerRecord = avgBytesPerRecord;
}
/**
* Gets the timestamp of the last modification.
*
* @return The timestamp of the last modification.
*/
public long getLastModificationTime() {
return fileModTime;
}
/**
* Gets the file size.
*
* @return The fileSize.
* @see org.apache.flink.api.common.io.statistics.BaseStatistics#getTotalInputSize()
*/
@Override
public long getTotalInputSize() {
return this.fileSize;
}
/**
* Gets the estimated number of records in the file, computed as the file size divided by the
* average record width, rounded up. For example, a 1000-byte file with an average record
* width of 8.0 bytes yields an estimate of 125 records.
*
* @return The estimated number of records in the file.
* @see org.apache.flink.api.common.io.statistics.BaseStatistics#getNumberOfRecords()
*/
@Override
public long getNumberOfRecords() {
return (this.fileSize == SIZE_UNKNOWN || this.avgBytesPerRecord == AVG_RECORD_BYTES_UNKNOWN) ?
NUM_RECORDS_UNKNOWN : (long) Math.ceil(this.fileSize / this.avgBytesPerRecord);
}
/**
* Gets the estimated average number of bytes per record.
*
* @return The average number of bytes per record.
* @see org.apache.flink.api.common.io.statistics.BaseStatistics#getAverageRecordWidth()
*/
@Override
public float getAverageRecordWidth() {
return this.avgBytesPerRecord;
}
@Override
public String toString() {
return "size=" + this.fileSize + ", recWidth=" + this.avgBytesPerRecord + ", modAt=" + this.fileModTime;
}
}
// ============================================================================================
/**
* Obtains a DataInputStream in a thread that is not interrupted.
* This is a necessary hack around the problem that the HDFS client is very sensitive to InterruptedExceptions.
*/
public static class InputSplitOpenThread extends Thread {
private final FileInputSplit split;
private final long timeout;
private volatile FSDataInputStream fdis;
private volatile Throwable error;
private volatile boolean aborted;
public InputSplitOpenThread(FileInputSplit split, long timeout) {
super("Transient InputSplit Opener");
setDaemon(true);
this.split = split;
this.timeout = timeout;
}
@Override
public void run() {
try {
final FileSystem fs = FileSystem.get(this.split.getPath().toUri());
this.fdis = fs.open(this.split.getPath());
// check for canceling and close the stream in that case, because no one will obtain it
if (this.aborted) {
final FSDataInputStream f = this.fdis;
this.fdis = null;
f.close();
}
}
catch (Throwable t) {
this.error = t;
}
}
public FSDataInputStream waitForCompletion() throws Throwable {
final long start = System.currentTimeMillis();
long remaining = this.timeout;
do {
try {
// wait for the task completion
this.join(remaining);
}
catch (InterruptedException iex) {
// we were canceled, so abort the procedure
abortWait();
throw iex;
}
}
while (this.error == null && this.fdis == null &&
(remaining = this.timeout + start - System.currentTimeMillis()) > 0);
if (this.error != null) {
throw this.error;
}
if (this.fdis != null) {
return this.fdis;
} else {
// double-check that the stream has not been set by now. we don't know here whether
// a) the opener thread recognized the canceling and closed the stream
// b) the flag was set such that the stream did not see it and we have a valid stream
// In any case, close the stream and throw an exception.
abortWait();
final boolean stillAlive = this.isAlive();
final StringBuilder bld = new StringBuilder(256);
for (StackTraceElement e : this.getStackTrace()) {
bld.append("\tat ").append(e.toString()).append('\n');
}
throw new IOException("Input opening request timed out. Opener was " + (stillAlive ? "" : "NOT ") +
" alive. Stack of split open thread:\n" + bld.toString());
}
}
/**
* Double checked procedure setting the abort flag and closing the stream.
*/
private void abortWait() {
this.aborted = true;
final FSDataInputStream inStream = this.fdis;
this.fdis = null;
if (inStream != null) {
try {
inStream.close();
} catch (Throwable t) {
// ignore errors while closing the abandoned stream
}
}
}
}
// ============================================================================================
// Parameterization via configuration
// ============================================================================================
// ------------------------------------- Config Keys ------------------------------------------
/**
* The config parameter which defines the input file path.
*/
private static final String FILE_PARAMETER_KEY = "input.file.path";
/**
* The config parameter which defines whether input directories are recursively traversed.
*/
public static final String ENUMERATE_NESTED_FILES_FLAG = "recursive.file.enumeration";
}