/*
* Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.lib.io.fs;
import com.datatorrent.api.Context.CountersAggregator;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
import com.esotericsoftware.kryo.Kryo;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.validation.constraints.NotNull;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Input operator that reads files from a directory.
* <p/>
* Derived classes define how to read entries from the input stream and emit them to the output port.
* <p/>
* The directory scanning logic is pluggable to support custom directory layouts and naming schemes. The default
* implementation scans a single directory.
* <p/>
* Fault tolerance is achieved by tracking previously read files and the current offset as part of the checkpoint
* state. After a failure the operator skips files that were already processed and fast-forwards to the offset of
* the current file.
* <p/>
* Supports partitioning and dynamic changes to the number of partitions through the property {@link #partitionCount}.
* The directory scanner is responsible for accepting only the files that belong to its partition.
* <p/>
* Failed files are retried by placing them on a failed list and reprocessing them after the pending files have been
* processed. Retrying is disabled when maxRetryCount is set to zero.
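* <p/>
* A minimal concrete subclass reads lines and emits them as strings. The sketch below is illustrative only; the
* port name and the use of {@code DefaultOutputPort} are assumptions, not part of this class:
* <pre>{@code
* public class LineInputOperator extends AbstractFSDirectoryInputOperator<String>
* {
*   public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
*   private transient BufferedReader reader;
*
*   protected InputStream openFile(Path path) throws IOException
*   {
*     InputStream is = super.openFile(path);
*     reader = new BufferedReader(new InputStreamReader(is));
*     return is;
*   }
*
*   protected void closeFile(InputStream is) throws IOException
*   {
*     super.closeFile(is);
*     reader = null;
*   }
*
*   protected String readEntity() throws IOException
*   {
*     return reader.readLine(); // null at end of file: the file is considered fully consumed
*   }
*
*   protected void emit(String tuple)
*   {
*     output.emit(tuple);
*   }
* }
* }</pre>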
*
* @since 1.0.2
*/
public abstract class AbstractFSDirectoryInputOperator<T> implements InputOperator, Partitioner<AbstractFSDirectoryInputOperator<T>>, StatsListener
{
private static final Logger LOG = LoggerFactory.getLogger(AbstractFSDirectoryInputOperator.class);
@NotNull
protected String directory;
@NotNull
protected DirectoryScanner scanner = new DirectoryScanner();
protected int scanIntervalMillis = 5000;
protected int offset;
protected String currentFile;
protected Set<String> processedFiles = new HashSet<String>();
protected int emitBatchSize = 1000;
protected int currentPartitions = 1;
protected int partitionCount = 1;
private int retryCount = 0;
private int maxRetryCount = 5;
protected transient int skipCount = 0;
private transient OperatorContext context;
protected long globalNumberOfFailures = 0;
protected long localNumberOfFailures = 0;
protected long globalNumberOfRetries = 0;
protected long localNumberOfRetries = 0;
private transient int globalProcessedFileCount = 0;
private transient int localProcessedFileCount = 0;
/**
* Class representing a failed file. When a read fails in the middle of a file, the file is
* added to the failed list along with the last read offset.
* Files from the failed list are processed after all pending files have been processed, but
* before checking for new files.
* A failed file is retried up to maxRetryCount times; after that the file is
* ignored.
*/
protected static class FailedFile {
String path;
int offset;
int retryCount;
long lastFailedTime;
/* For kryo serialization */
protected FailedFile() {}
protected FailedFile(String path, int offset) {
this.path = path;
this.offset = offset;
this.retryCount = 0;
}
protected FailedFile(String path, int offset, int retryCount) {
this.path = path;
this.offset = offset;
this.retryCount = retryCount;
}
@Override
public String toString()
{
return "FailedFile[" +
"path='" + path + '\'' +
", offset=" + offset +
", retryCount=" + retryCount +
", lastFailedTime=" + lastFailedTime +
']';
}
}
public static final class FileCounters implements Serializable
{
public int globalProcessedFiles;
public int localProcessedFiles;
public long globalNumberOfFailures;
public long localNumberOfFailures;
public long globalNumberOfRetries;
public long localNumberOfRetries;
public long pendingFiles;
public FileCounters()
{
this.globalProcessedFiles = 0;
this.localProcessedFiles = 0;
this.globalNumberOfFailures = 0;
this.localNumberOfFailures = 0;
this.globalNumberOfRetries = 0;
this.localNumberOfRetries = 0;
this.pendingFiles = 0;
}
public FileCounters(int globalProcessedFiles,
int localProcessedFiles,
long globalNumberOfFailures,
long localNumberOfFailures,
long globalNumberOfRetries,
long localNumberOfRetries,
long pendingFiles)
{
this.globalProcessedFiles = globalProcessedFiles;
this.localProcessedFiles = localProcessedFiles;
this.globalNumberOfFailures = globalNumberOfFailures;
this.localNumberOfFailures = localNumberOfFailures;
this.globalNumberOfRetries = globalNumberOfRetries;
this.localNumberOfRetries = localNumberOfRetries;
this.pendingFiles = pendingFiles;
}
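/**
 * Accumulates only the per-partition (local) counters and the pending file count;
 * the global fields are intentionally left untouched.
 */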
public void addL(FileCounters fileCounters)
{
this.localProcessedFiles += fileCounters.localProcessedFiles;
this.localNumberOfFailures += fileCounters.localNumberOfFailures;
this.localNumberOfRetries += fileCounters.localNumberOfRetries;
this.pendingFiles += fileCounters.pendingFiles;
}
}
public static class FileCountersAggregator implements CountersAggregator,
Serializable
{
@Override
public Object aggregate(Collection<?> countersList)
{
FileCounters totalFileCounters = new FileCounters();
for(Object fileCounters: countersList) {
totalFileCounters.addL((FileCounters) fileCounters);
}
return totalFileCounters;
}
}
protected long lastRepartition = 0;
private transient boolean emit = true;
protected boolean idempotentEmit = false;
/* List of unfinished files */
protected Queue<FailedFile> unfinishedFiles = new LinkedList<FailedFile>();
/* List of failed files */
protected Queue<FailedFile> failedFiles = new LinkedList<FailedFile>();
protected transient FileSystem fs;
protected transient Configuration configuration;
protected transient long lastScanMillis;
protected transient Path filePath;
protected transient InputStream inputStream;
protected Set<String> pendingFiles = new LinkedHashSet<String>();
public String getDirectory()
{
return directory;
}
public void setDirectory(String directory)
{
this.directory = directory;
}
public DirectoryScanner getScanner()
{
return scanner;
}
public void setScanner(DirectoryScanner scanner)
{
this.scanner = scanner;
}
public int getScanIntervalMillis()
{
return scanIntervalMillis;
}
public void setScanIntervalMillis(int scanIntervalMillis)
{
this.scanIntervalMillis = scanIntervalMillis;
}
public int getEmitBatchSize()
{
return emitBatchSize;
}
public void setEmitBatchSize(int emitBatchSize)
{
this.emitBatchSize = emitBatchSize;
}
public void setIdempotentEmit(boolean idempotentEmit)
{
this.idempotentEmit = idempotentEmit;
}
public boolean isIdempotentEmit()
{
return idempotentEmit;
}
public int getPartitionCount()
{
return partitionCount;
}
public void setPartitionCount(int requiredPartitions)
{
this.partitionCount = requiredPartitions;
}
public int getCurrentPartitions()
{
return currentPartitions;
}
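/**
 * Restores state after a (re)start: reopens the unfinished or failed file that was being read at
 * checkpoint time and fast-forwards past the entries that were emitted before the checkpoint.
 */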
@Override
public void setup(OperatorContext context)
{
globalProcessedFileCount = processedFiles.size();
this.context = context;
try {
filePath = new Path(directory);
configuration = new Configuration();
fs = FileSystem.newInstance(filePath.toUri(), configuration);
if(!unfinishedFiles.isEmpty()) {
retryFailedFile(unfinishedFiles.poll());
skipCount = 0;
} else if(!failedFiles.isEmpty()) {
retryFailedFile(failedFiles.poll());
skipCount = 0;
}
long startTime = System.currentTimeMillis();
LOG.info("Continue reading {} from index {} time={}", currentFile, offset, startTime);
// fast forward to previous offset
if(inputStream != null) {
for(int index = 0; index < offset; index++) {
readEntity();
}
}
LOG.info("Read offset={} records in setup time={}", offset, System.currentTimeMillis() - startTime);
}
catch (IOException ex) {
failureHandling(ex);
}
}
@Override
public void teardown()
{
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(fs);
}
@Override
public void beginWindow(long windowId)
{
emit = true;
}
@Override
public void endWindow()
{
if(context != null) {
long pendingFileCount = ((long) pendingFiles.size()) +
((long) failedFiles.size()) +
((long) unfinishedFiles.size());
if(currentFile != null) {
pendingFileCount++;
}
// Report cumulative (global + local) totals alongside this run's local counts.
context.setCounters(new FileCounters(globalProcessedFileCount + localProcessedFileCount,
                                     localProcessedFileCount,
                                     globalNumberOfFailures + localNumberOfFailures,
                                     localNumberOfFailures,
                                     globalNumberOfRetries + localNumberOfRetries,
                                     localNumberOfRetries,
                                     pendingFileCount));
}
}
@Override
public void emitTuples()
{
//emit is always true if the operator is not idempotent. If the operator is
//idempotent, emit is true the first time emitTuples is called within a
//window and false for subsequent calls within the same window.
if(emit)
{
if (inputStream == null) {
try {
if(!unfinishedFiles.isEmpty()) {
retryFailedFile(unfinishedFiles.poll());
}
else if (!pendingFiles.isEmpty()) {
String newPathString = pendingFiles.iterator().next();
pendingFiles.remove(newPathString);
this.inputStream = openFile(new Path(newPathString));
}
else if (!failedFiles.isEmpty()) {
retryFailedFile(failedFiles.poll());
}
else {
scanDirectory();
}
} catch (IOException ex) {
failureHandling(ex);
}
}
if (inputStream != null) {
try {
int counterForTuple = 0;
while (counterForTuple++ < emitBatchSize) {
T line = readEntity();
if (line == null) {
LOG.info("done reading file ({} entries).", offset);
closeFile(inputStream);
break;
}
// If skipCount is non-zero, failed-file recovery is in progress. skipCount prevents
// records that were already emitted from being emitted again during recovery. When a
// failed file is reopened, skipCount is set to the last read offset for that file.
if (skipCount == 0) {
offset++;
emit(line);
}
else
skipCount--;
}
} catch (IOException e) {
failureHandling(e);
}
}
//If the operator is idempotent, do nothing on subsequent calls to emitTuples
//within the same window
if(idempotentEmit)
{
emit = false;
}
}
}
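/**
 * Scans the directory for new files at most once per scan interval. Newly discovered paths are
 * queued as pending and immediately recorded as processed so later scans do not pick them up again.
 */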
protected void scanDirectory()
{
if(System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis) {
Set<Path> newPaths = scanner.scan(fs, filePath, processedFiles);
for(Path newPath : newPaths) {
String newPathString = newPath.toString();
pendingFiles.add(newPathString);
processedFiles.add(newPathString);
localProcessedFileCount++;
}
lastScanMillis = System.currentTimeMillis();
}
}
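/**
 * Counts the failure and either rethrows (when retries are disabled) or moves the current file
 * to the failed list for a later retry.
 */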
private void failureHandling(Exception e)
{
localNumberOfFailures++;
if(maxRetryCount <= 0) {
throw new RuntimeException(e);
}
LOG.error("FS reader error", e);
addToFailedList();
}
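/**
 * Closes the current file, clears the current-file state and, unless the retry budget is
 * exhausted, queues the file with its last read offset for another attempt.
 */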
protected void addToFailedList() {
FailedFile ff = new FailedFile(currentFile, offset, retryCount);
try {
// try to close file
if (this.inputStream != null)
this.inputStream.close();
} catch(IOException e) {
localNumberOfFailures++;
LOG.error("Could not close input stream on: " + currentFile);
}
ff.retryCount++;
ff.lastFailedTime = System.currentTimeMillis();
ff.offset = this.offset;
// Clear current file state.
this.currentFile = null;
this.inputStream = null;
this.offset = 0;
if (ff.retryCount > maxRetryCount)
return;
localNumberOfRetries++;
LOG.info("adding to failed list path {} offset {} retry {}", ff.path, ff.offset, ff.retryCount);
failedFiles.add(ff);
}
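/**
 * Reopens a previously failed (or unfinished) file and primes offset/skipCount so that records
 * emitted before the failure are skipped rather than emitted twice.
 */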
protected InputStream retryFailedFile(FailedFile ff) throws IOException
{
LOG.info("retrying failed file {} offset {} retry {}", ff.path, ff.offset, ff.retryCount);
String path = ff.path;
this.inputStream = openFile(new Path(path));
this.offset = ff.offset;
this.retryCount = ff.retryCount;
this.skipCount = ff.offset;
return this.inputStream;
}
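/**
 * Opens the given path and resets the per-file bookkeeping (offset, retry and skip counts).
 */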
protected InputStream openFile(Path path) throws IOException
{
LOG.info("opening file {}", path);
InputStream input = fs.open(path);
currentFile = path.toString();
offset = 0;
retryCount = 0;
skipCount = 0;
return input;
}
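/**
 * Closes the given stream and clears the current-file state.
 */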
protected void closeFile(InputStream is) throws IOException
{
LOG.info("closing file {} offset {}", currentFile, offset);
if (is != null)
is.close();
currentFile = null;
inputStream = null;
}
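/**
 * Builds the new set of partitions: collective state (processed, failed, unfinished and pending
 * files) is gathered from all current partitions, the scanner is repartitioned, and each file is
 * reassigned to exactly one new partition via the scanner's accept filter.
 */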
@Override
public Collection<Partition<AbstractFSDirectoryInputOperator<T>>> definePartitions(Collection<Partition<AbstractFSDirectoryInputOperator<T>>> partitions, int incrementalCapacity)
{
lastRepartition = System.currentTimeMillis();
int totalCount = computedNewPartitionCount(partitions, incrementalCapacity);
LOG.debug("Computed new partitions: {}", totalCount);
if (totalCount == partitions.size()) {
return partitions;
}
AbstractFSDirectoryInputOperator<T> oper = partitions.iterator().next().getPartitionedInstance();
long globalNumberOfRetries = oper.globalNumberOfRetries;
long globalNumberOfFailures = ;
long totalRetryCount = 0;
long totalFailureCount = 0;
/*
* Build collective state from all instances of the operator.
*/
Set<String> totalProcessedFiles = new HashSet<String>();
Set<FailedFile> currentFiles = new HashSet<FailedFile>();
List<DirectoryScanner> oldscanners = new LinkedList<DirectoryScanner>();
List<FailedFile> totalFailedFiles = new LinkedList<FailedFile>();
List<String> totalPendingFiles = new LinkedList<String>();
for(Partition<AbstractFSDirectoryInputOperator<T>> partition : partitions) {
AbstractFSDirectoryInputOperator<T> oper = partition.getPartitionedInstance();
totalProcessedFiles.addAll(oper.processedFiles);
totalFailedFiles.addAll(oper.failedFiles);
totalPendingFiles.addAll(oper.pendingFiles);
currentFiles.addAll(oper.unfinishedFiles);
totalRetryCount += oper.localNumberOfRetries;
totalFailureCount += oper.localNumberOfFailures;
if (oper.currentFile != null)
currentFiles.add(new FailedFile(oper.currentFile, oper.offset));
oldscanners.add(oper.getScanner());
}
/*
* Create partitions of scanners, scanner's partition method will do state
* transfer for DirectoryScanner objects.
*/
List<DirectoryScanner> scanners = scanner.partition(totalCount, oldscanners);
Kryo kryo = new Kryo();
Collection<Partition<AbstractFSDirectoryInputOperator<T>>> newPartitions = Lists.newArrayListWithExpectedSize(totalCount);
for (int i=0; i<scanners.size(); i++) {
AbstractFSDirectoryInputOperator<T> oper = kryo.copy(this);
DirectoryScanner scn = scanners.get(i);
oper.setScanner(scn);
// Do state transfer for processed files and fold the accumulated failure/retry
// counts into the new partition's global counters.
oper.processedFiles.addAll(totalProcessedFiles);
oper.globalNumberOfFailures = globalNumberOfFailures + totalFailureCount;
oper.localNumberOfFailures = 0;
oper.globalNumberOfRetries = globalNumberOfRetries + totalRetryCount;
oper.localNumberOfRetries = 0;
/* redistribute unfinished files properly */
oper.unfinishedFiles.clear();
oper.currentFile = null;
oper.offset = 0;
Iterator<FailedFile> unfinishedIter = currentFiles.iterator();
while(unfinishedIter.hasNext()) {
FailedFile unfinishedFile = unfinishedIter.next();
if (scn.acceptFile(unfinishedFile.path)) {
oper.unfinishedFiles.add(unfinishedFile);
unfinishedIter.remove();
}
}
/* transfer failed files */
oper.failedFiles.clear();
Iterator<FailedFile> iter = totalFailedFiles.iterator();
while (iter.hasNext()) {
FailedFile ff = iter.next();
if (scn.acceptFile(ff.path)) {
oper.failedFiles.add(ff);
iter.remove();
}
}
/* redistribute pending files properly */
oper.pendingFiles.clear();
Iterator<String> pendingFilesIterator = totalPendingFiles.iterator();
while(pendingFilesIterator.hasNext()) {
String pathString = pendingFilesIterator.next();
if(scn.acceptFile(pathString)) {
oper.pendingFiles.add(pathString);
pendingFilesIterator.remove();
}
}
newPartitions.add(new DefaultPartition<AbstractFSDirectoryInputOperator<T>>(oper));
}
LOG.info("definePartitions called returning {} partitions", newPartitions.size());
return newPartitions;
}
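/**
 * Computes the target partition count: on the initial partitioning the engine's incremental
 * capacity is honored; afterwards the count is driven by the partitionCount property.
 */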
protected int computedNewPartitionCount(Collection<Partition<AbstractFSDirectoryInputOperator<T>>> partitions, int incrementalCapacity)
{
boolean isInitialPartition = partitions.iterator().next().getStats() == null;
if (isInitialPartition && partitionCount == 1) {
partitionCount = currentPartitions = partitions.size() + incrementalCapacity;
} else {
incrementalCapacity = partitionCount - currentPartitions;
}
int totalCount = partitions.size() + incrementalCapacity;
LOG.info("definePartitions trying to create {} partitions, current {} required {}", totalCount, partitionCount, currentPartitions);
return totalCount;
}
@Override
public void partitioned(Map<Integer, Partition<AbstractFSDirectoryInputOperator<T>>> partitions)
{
currentPartitions = partitions.size();
}
/**
* Read the next item from the stream. Depending on the type of stream, this could be a byte array, a line, or an
* object. A return of null signals that the stream has been fully consumed.
*/
protected abstract T readEntity() throws IOException;
/**
* Emit the tuple on the output port.
* @param tuple the tuple to emit
*/
protected abstract void emit(T tuple);
/**
* Repartitioning is required when the current number of partitions does not equal the
* required number of partitions.
*/
@Override
public Response processStats(BatchedOperatorStats batchedOperatorStats)
{
Response res = new Response();
res.repartitionRequired = false;
if (currentPartitions != partitionCount) {
LOG.info("processStats: trying repartition of input operator current {} required {}", currentPartitions, partitionCount);
res.repartitionRequired = true;
}
return res;
}
public int getMaxRetryCount()
{
return maxRetryCount;
}
public void setMaxRetryCount(int maxRetryCount)
{
this.maxRetryCount = maxRetryCount;
}
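/**
 * Default scanner implementation that lists a single directory. Files can be filtered by an
 * optional regular expression and, when partitioned, by a hash of the file path so that each
 * file is accepted by exactly one partition.
 */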
public static class DirectoryScanner implements Serializable
{
private static final long serialVersionUID = 4535844463258899929L;
private String filePatternRegexp;
private transient Pattern regex = null;
private int partitionIndex;
private int partitionCount;
private final transient HashSet<String> ignoredFiles = new HashSet<String>();
public String getFilePatternRegexp()
{
return filePatternRegexp;
}
public void setFilePatternRegexp(String filePatternRegexp)
{
this.filePatternRegexp = filePatternRegexp;
this.regex = null;
}
public Pattern getRegex() {
if (this.regex == null && this.filePatternRegexp != null)
this.regex = Pattern.compile(this.filePatternRegexp);
return this.regex;
}
public int getPartitionCount() {
return partitionCount;
}
public int getPartitionIndex() {
return partitionIndex;
}
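/**
 * Lists the given directory and returns the files that pass the accept filter and have not been
 * consumed or ignored before. Non-matching files are remembered so they are not re-examined.
 */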
public LinkedHashSet<Path> scan(FileSystem fs, Path filePath, Set<String> consumedFiles)
{
if (filePatternRegexp != null && this.regex == null) {
this.regex = Pattern.compile(this.filePatternRegexp);
}
LinkedHashSet<Path> pathSet = Sets.newLinkedHashSet();
try {
LOG.debug("Scanning {} with pattern {}", filePath, this.filePatternRegexp);
FileStatus[] files = fs.listStatus(filePath);
for (FileStatus status : files)
{
Path path = status.getPath();
String filePathStr = path.toString();
if (consumedFiles.contains(filePathStr)) {
continue;
}
if (ignoredFiles.contains(filePathStr)) {
continue;
}
if (acceptFile(filePathStr)) {
LOG.debug("Found {}", filePathStr);
pathSet.add(path);
} else {
// don't look at it again
ignoredFiles.add(filePathStr);
}
}
} catch (FileNotFoundException e) {
LOG.warn("Failed to list directory {}", filePath, e);
} catch (Exception e) {
throw new RuntimeException(e);
}
return pathSet;
}
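/**
 * A file belongs to this partition when {@code path.hashCode() % partitionCount} equals the
 * partition index (normalized to be non-negative) and the path matches the optional regex.
 */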
protected boolean acceptFile(String filePathStr)
{
if (partitionCount > 1) {
int i = filePathStr.hashCode();
int mod = i % partitionCount;
if (mod < 0) {
mod += partitionCount;
}
LOG.debug("partition {} {} {} {}", partitionIndex, filePathStr, i, mod);
if (mod != partitionIndex) {
return false;
}
}
if (filePatternRegexp != null && this.regex == null) {
regex = Pattern.compile(this.filePatternRegexp);
}
if (regex != null)
{
Matcher matcher = regex.matcher(filePathStr);
if (!matcher.matches()) {
return false;
}
}
return true;
}
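/**
 * Creates {@code count} scanners, one per partition, each accepting a disjoint subset of files.
 */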
public List<DirectoryScanner> partition(int count)
{
ArrayList<DirectoryScanner> partitions = Lists.newArrayListWithExpectedSize(count);
for (int i=0; i<count; i++) {
partitions.add(this.createPartition(i, count));
}
return partitions;
}
public List<DirectoryScanner> partition(int count , Collection<DirectoryScanner> scanners) {
return partition(count);
}
protected DirectoryScanner createPartition(int partitionIndex, int partitionCount)
{
DirectoryScanner that = new DirectoryScanner();
that.filePatternRegexp = this.filePatternRegexp;
that.regex = this.regex;
that.partitionIndex = partitionIndex;
that.partitionCount = partitionCount;
return that;
}
@Override
public String toString()
{
return "DirectoryScanner [filePatternRegexp=" + filePatternRegexp + " partitionIndex=" +
partitionIndex + " partitionCount=" + partitionCount + "]";
}
}
}