| /* |
| * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.datatorrent.lib.io.fs; |
| |
| import java.io.*; |
| import java.util.*; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| import javax.validation.constraints.NotNull; |
| |
| import com.esotericsoftware.kryo.Kryo; |
| import com.esotericsoftware.kryo.io.Input; |
| import com.esotericsoftware.kryo.io.Output; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.commons.lang.mutable.MutableLong; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| |
| import com.datatorrent.lib.counters.BasicCounters; |
| |
| import com.datatorrent.api.Context.CountersAggregator; |
| import com.datatorrent.api.Context.OperatorContext; |
| import com.datatorrent.api.DefaultPartition; |
| import com.datatorrent.api.InputOperator; |
| import com.datatorrent.api.Partitioner; |
| import com.datatorrent.api.StatsListener; |
| |
| /** |
| * This is the base implementation of a directory input operator, which scans a directory for files. |
| * Files are then read and split into tuples, which are emitted. |
| * Subclasses should implement the methods required to read and emit tuples from files. |
| * <p> |
| * Derived class defines how to read entries from the input stream and emit to the port. |
| * </p> |
| * <p> |
| * The directory scanning logic is pluggable to support custom directory layouts and naming schemes. The default |
| * implementation scans a single directory. |
| * </p> |
| * <p> |
| * Fault tolerant by tracking previously read files and current offset as part of checkpoint state. In case of failure |
| * the operator will skip files that were already processed and fast forward to the offset of the current file. |
| * </p> |
| * <p> |
| * Supports partitioning and dynamic changes to number of partitions through property {@link #partitionCount}. The |
| * directory scanner is responsible to only accept the files that belong to a partition. |
| * </p> |
| * <p> |
| * This class supports retrying of failed files by putting them into failed list, and retrying them after pending |
| * files are processed. Retrying is disabled when maxRetryCount is set to zero. |
| * </p> |
| * @displayName FS Directory Scan Input |
| * @category Input |
| * @tags fs, file, input operator |
| * |
| * @param <T> The type of the object that this input operator reads. |
| * @since 1.0.2 |
| */ |
| public abstract class AbstractFSDirectoryInputOperator<T> implements InputOperator, Partitioner<AbstractFSDirectoryInputOperator<T>>, StatsListener |
| { |
| private static final Logger LOG = LoggerFactory.getLogger(AbstractFSDirectoryInputOperator.class); |
| |
| @NotNull |
| protected String directory; |
| @NotNull |
| protected DirectoryScanner scanner = new DirectoryScanner(); |
| protected int scanIntervalMillis = 5000; |
| protected int offset; |
| protected String currentFile; |
| protected Set<String> processedFiles = new HashSet<String>(); |
| protected int emitBatchSize = 1000; |
| protected int currentPartitions = 1 ; |
| protected int partitionCount = 1; |
| private int retryCount = 0; |
| private int maxRetryCount = 5; |
| transient protected int skipCount = 0; |
| private transient OperatorContext context; |
| |
| private final BasicCounters<MutableLong> fileCounters = new BasicCounters<MutableLong>(MutableLong.class); |
| protected MutableLong globalNumberOfFailures = new MutableLong(); |
| protected MutableLong localNumberOfFailures = new MutableLong(); |
| protected MutableLong globalNumberOfRetries = new MutableLong(); |
| protected MutableLong localNumberOfRetries = new MutableLong(); |
| private transient MutableLong globalProcessedFileCount = new MutableLong(); |
| private transient MutableLong localProcessedFileCount = new MutableLong(); |
| private transient MutableLong pendingFileCount = new MutableLong(); |
| |
| /** |
| * Class representing failed file, When read fails on a file in middle, then the file is |
| * added to failedList along with last read offset. |
| * The files from failedList will be processed after all pendingFiles are processed, but |
| * before checking for new files. |
| * failed file is retried for maxRetryCount number of times, after that the file is |
| * ignored. |
| */ |
| protected static class FailedFile { |
| String path; |
| int offset; |
| int retryCount; |
| long lastFailedTime; |
| |
| /* For kryo serialization */ |
| protected FailedFile() {} |
| |
| protected FailedFile(String path, int offset) { |
| this.path = path; |
| this.offset = offset; |
| this.retryCount = 0; |
| } |
| |
| protected FailedFile(String path, int offset, int retryCount) { |
| this.path = path; |
| this.offset = offset; |
| this.retryCount = retryCount; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "FailedFile[" + |
| "path='" + path + '\'' + |
| ", offset=" + offset + |
| ", retryCount=" + retryCount + |
| ", lastFailedTime=" + lastFailedTime + |
| ']'; |
| } |
| } |
| |
| /** |
| * Enums for aggregated counters about file processing. |
| * <p/> |
| * Contains the enums representing number of files processed, number of |
| * pending files, number of file errors, and number of retries. |
| * <p/> |
| * @since 1.0.4 |
| */ |
| public static enum AggregatedFileCounters |
| { |
| /** |
| * The number of files processed by the logical operator up until this. |
| * point in time |
| */ |
| PROCESSED_FILES, |
| /** |
| * The number of files waiting to be processed by the logical operator. |
| */ |
| PENDING_FILES, |
| /** |
| * The number of IO errors encountered by the logical operator. |
| */ |
| NUMBER_OF_ERRORS, |
| /** |
| * The number of times the logical operator tried to resume reading a file |
| * on which it encountered an error. |
| */ |
| NUMBER_OF_RETRIES |
| } |
| |
| /** |
| * The enums used to track statistics about the |
| * AbstractFSDirectoryInputOperator. |
| */ |
| protected static enum FileCounters |
| { |
| /** |
| * The number of files that were in the processed list up to the last |
| * repartition of the operator. |
| */ |
| GLOBAL_PROCESSED_FILES, |
| /** |
| * The number of files added to the processed list by the physical operator |
| * since the last repartition. |
| */ |
| LOCAL_PROCESSED_FILES, |
| /** |
| * The number of io errors encountered up to the last repartition of the |
| * operator. |
| */ |
| GLOBAL_NUMBER_OF_FAILURES, |
| /** |
| * The number of failures encountered by the physical operator since the |
| * last repartition. |
| */ |
| LOCAL_NUMBER_OF_FAILURES, |
| /** |
| * The number of retries encountered by the physical operator up to the last |
| * repartition. |
| */ |
| GLOBAL_NUMBER_OF_RETRIES, |
| /** |
| * The number of retries encountered by the physical operator since the last |
| * repartition. |
| */ |
| LOCAL_NUMBER_OF_RETRIES, |
| /** |
| * The number of files pending on the physical operator. |
| */ |
| PENDING_FILES |
| } |
| |
| /** |
| * A counter aggregator for AbstractFSDirectoryInputOperator. |
| * <p/> |
| * In order for this CountersAggregator to be used on your operator, you must |
| * set it within your application like this. |
| * <p/> |
| * <code> |
| * dag.getOperatorMeta("fsinputoperator").getAttributes().put(OperatorContext.COUNTERS_AGGREGATOR, |
| * new AbstractFSDirectoryInputOperator.FileCountersAggregator()); |
| * </code> |
| * <p/> |
| * The value of the aggregated counter can be retrieved by issuing a get |
| * request to the host running your gateway like this. |
| * <p/> |
| * <code> |
| * http://<your host>:9090/ws/v1/applications/<your app id>/logicalPlan/operators/<operatorname>/aggregation |
| * </code> |
| * <p/> |
| * @since 1.0.4 |
| */ |
| public final static class FileCountersAggregator implements CountersAggregator, Serializable |
| { |
| private static final long serialVersionUID = 201409041428L; |
| MutableLong totalLocalProcessedFiles = new MutableLong(); |
| MutableLong pendingFiles = new MutableLong(); |
| MutableLong totalLocalNumberOfFailures = new MutableLong(); |
| MutableLong totalLocalNumberOfRetries = new MutableLong(); |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public Object aggregate(Collection<?> countersList) |
| { |
| if(countersList.isEmpty()) { |
| return null; |
| } |
| |
| BasicCounters<MutableLong> tempFileCounters = (BasicCounters<MutableLong>) countersList.iterator().next(); |
| MutableLong globalProcessedFiles = tempFileCounters.getCounter(FileCounters.GLOBAL_PROCESSED_FILES); |
| MutableLong globalNumberOfFailures = tempFileCounters.getCounter(FileCounters.GLOBAL_NUMBER_OF_FAILURES); |
| MutableLong globalNumberOfRetries = tempFileCounters.getCounter(FileCounters.GLOBAL_NUMBER_OF_RETRIES); |
| totalLocalProcessedFiles.setValue(0); |
| pendingFiles.setValue(0); |
| totalLocalNumberOfFailures.setValue(0); |
| totalLocalNumberOfRetries.setValue(0); |
| |
| for(Object fileCounters: countersList) { |
| BasicCounters<MutableLong> basicFileCounters = (BasicCounters<MutableLong>) fileCounters; |
| totalLocalProcessedFiles.add(basicFileCounters.getCounter(FileCounters.LOCAL_PROCESSED_FILES)); |
| pendingFiles.add(basicFileCounters.getCounter(FileCounters.PENDING_FILES)); |
| totalLocalNumberOfFailures.add(basicFileCounters.getCounter(FileCounters.LOCAL_NUMBER_OF_FAILURES)); |
| totalLocalNumberOfRetries.add(basicFileCounters.getCounter(FileCounters.LOCAL_NUMBER_OF_RETRIES)); |
| } |
| |
| globalProcessedFiles.add(totalLocalProcessedFiles); |
| globalProcessedFiles.subtract(pendingFiles); |
| globalNumberOfFailures.add(totalLocalNumberOfFailures); |
| globalNumberOfRetries.add(totalLocalNumberOfRetries); |
| |
| BasicCounters<MutableLong> aggregatedCounters = new BasicCounters<MutableLong>(MutableLong.class); |
| aggregatedCounters.setCounter(AggregatedFileCounters.PROCESSED_FILES, globalProcessedFiles); |
| aggregatedCounters.setCounter(AggregatedFileCounters.PENDING_FILES, pendingFiles); |
| aggregatedCounters.setCounter(AggregatedFileCounters.NUMBER_OF_ERRORS, totalLocalNumberOfFailures); |
| aggregatedCounters.setCounter(AggregatedFileCounters.NUMBER_OF_RETRIES, totalLocalNumberOfRetries); |
| |
| return aggregatedCounters; |
| } |
| } |
| |
| protected long lastRepartition = 0; |
| /* List of unfinished files */ |
| protected Queue<FailedFile> unfinishedFiles = new LinkedList<FailedFile>(); |
| /* List of failed file */ |
| protected Queue<FailedFile> failedFiles = new LinkedList<FailedFile>(); |
| |
| protected transient FileSystem fs; |
| protected transient Configuration configuration; |
| protected transient long lastScanMillis; |
| protected transient Path filePath; |
| protected transient InputStream inputStream; |
| protected Set<String> pendingFiles = new LinkedHashSet<String>(); |
| |
| public String getDirectory() |
| { |
| return directory; |
| } |
| |
| public void setDirectory(String directory) |
| { |
| this.directory = directory; |
| } |
| |
| public DirectoryScanner getScanner() |
| { |
| return scanner; |
| } |
| |
| public void setScanner(DirectoryScanner scanner) |
| { |
| this.scanner = scanner; |
| } |
| |
| /** |
| * Returns the frequency with which new files are scanned for in milliseconds. |
| * @return The scan interval in milliseconds. |
| */ |
| public int getScanIntervalMillis() |
| { |
| return scanIntervalMillis; |
| } |
| |
| /** |
| * Sets the frequency with which new files are scanned for in milliseconds. |
| * @param scanIntervalMillis The scan interval in milliseconds. |
| */ |
| public void setScanIntervalMillis(int scanIntervalMillis) |
| { |
| this.scanIntervalMillis = scanIntervalMillis; |
| } |
| |
| /** |
| * Returns the number of tuples emitted in a batch. If the operator is |
| * idempotent then this is the number of tuples emitted in a window. |
| * @return The number of tuples emitted in a batch. |
| */ |
| public int getEmitBatchSize() |
| { |
| return emitBatchSize; |
| } |
| |
| /** |
| * Sets the number of tuples to emit in a batch. If the operator is |
| * idempotent then this is the number of tuples emitted in a window. |
| * @param emitBatchSize The number of tuples to emit in a batch. |
| */ |
| public void setEmitBatchSize(int emitBatchSize) |
| { |
| this.emitBatchSize = emitBatchSize; |
| } |
| |
| /** |
| * Returns the desired number of partitions. |
| * @return the desired number of partitions. |
| */ |
| public int getPartitionCount() |
| { |
| return partitionCount; |
| } |
| |
| /** |
| * Sets the desired number of partitions. |
| * @param requiredPartitions The desired number of partitions. |
| */ |
| public void setPartitionCount(int requiredPartitions) |
| { |
| this.partitionCount = requiredPartitions; |
| } |
| |
| /** |
| * Returns the current number of partitions for the operator. |
| * @return The current number of partitions for the operator. |
| */ |
| public int getCurrentPartitions() |
| { |
| return currentPartitions; |
| } |
| |
| @Override |
| public void setup(OperatorContext context) |
| { |
| globalProcessedFileCount.setValue(processedFiles.size()); |
| LOG.debug("Setup processed file count: {}", globalProcessedFileCount); |
| this.context = context; |
| |
| try { |
| filePath = new Path(directory); |
| configuration = new Configuration(); |
| fs = getFSInstance(); |
| if(!unfinishedFiles.isEmpty()) { |
| retryFailedFile(unfinishedFiles.poll()); |
| skipCount = 0; |
| } else if(!failedFiles.isEmpty()) { |
| retryFailedFile(failedFiles.poll()); |
| skipCount = 0; |
| } |
| long startTime = System.currentTimeMillis(); |
| LOG.info("Continue reading {} from index {} time={}", currentFile, offset, startTime); |
| // fast forward to previous offset |
| if(inputStream != null) { |
| for(int index = 0; index < offset; index++) { |
| readEntity(); |
| } |
| } |
| LOG.info("Read offset={} records in setup time={}", offset, System.currentTimeMillis() - startTime); |
| } |
| catch (IOException ex) { |
| failureHandling(ex); |
| } |
| |
| fileCounters.setCounter(FileCounters.GLOBAL_PROCESSED_FILES, |
| globalProcessedFileCount); |
| fileCounters.setCounter(FileCounters.LOCAL_PROCESSED_FILES, |
| localProcessedFileCount); |
| fileCounters.setCounter(FileCounters.GLOBAL_NUMBER_OF_FAILURES, |
| globalNumberOfFailures); |
| fileCounters.setCounter(FileCounters.LOCAL_NUMBER_OF_FAILURES, |
| localNumberOfFailures); |
| fileCounters.setCounter(FileCounters.GLOBAL_NUMBER_OF_RETRIES, |
| globalNumberOfRetries); |
| fileCounters.setCounter(FileCounters.LOCAL_NUMBER_OF_RETRIES, |
| localNumberOfRetries); |
| fileCounters.setCounter(FileCounters.PENDING_FILES, |
| pendingFileCount); |
| |
| } |
| |
| /** |
| * Override this method to change the FileSystem instance that is used by the operator. |
| * |
| * @return A FileSystem object. |
| * @throws IOException |
| */ |
| protected FileSystem getFSInstance() throws IOException |
| { |
| return FileSystem.newInstance(filePath.toUri(), configuration); |
| } |
| |
| @Override |
| public void teardown() |
| { |
| IOException savedException = null; |
| boolean fileFailed = false; |
| |
| try { |
| if(inputStream != null) { |
| inputStream.close(); |
| } |
| } |
| catch (IOException ex) { |
| savedException = ex; |
| fileFailed = true; |
| } |
| |
| boolean fsFailed = false; |
| |
| try { |
| fs.close(); |
| } |
| catch (IOException ex) { |
| savedException = ex; |
| fsFailed = true; |
| } |
| |
| if(savedException != null) { |
| String errorMessage = ""; |
| |
| if(fileFailed) { |
| errorMessage += "Failed to close " + currentFile + ". "; |
| } |
| |
| if(fsFailed) { |
| errorMessage += "Failed to close filesystem."; |
| } |
| |
| throw new RuntimeException(errorMessage, savedException); |
| } |
| } |
| |
| @Override |
| public void beginWindow(long windowId) |
| { |
| } |
| |
| @Override |
| public void endWindow() |
| { |
| if(context != null) { |
| pendingFileCount.setValue(pendingFiles.size() + |
| failedFiles.size() + |
| unfinishedFiles.size()); |
| |
| if(currentFile != null) { |
| pendingFileCount.increment(); |
| } |
| |
| context.setCounters(fileCounters); |
| } |
| } |
| |
| @Override |
| public void emitTuples() |
| { |
| if (inputStream == null) { |
| try { |
| if (!unfinishedFiles.isEmpty()) { |
| retryFailedFile(unfinishedFiles.poll()); |
| } |
| else if (!pendingFiles.isEmpty()) { |
| String newPathString = pendingFiles.iterator().next(); |
| pendingFiles.remove(newPathString); |
| this.inputStream = openFile(new Path(newPathString)); |
| } |
| else if (!failedFiles.isEmpty()) { |
| retryFailedFile(failedFiles.poll()); |
| } |
| else { |
| scanDirectory(); |
| } |
| } |
| catch (IOException ex) { |
| failureHandling(ex); |
| } |
| } |
| |
| if (inputStream != null) { |
| try { |
| int counterForTuple = 0; |
| while (counterForTuple++ < emitBatchSize) { |
| T line = readEntity(); |
| if (line == null) { |
| LOG.info("done reading file ({} entries).", offset); |
| closeFile(inputStream); |
| break; |
| } |
| |
| // If skipCount is non zero, then failed file recovery is going on, skipCount is |
| // used to prevent already emitted records from being emitted again during recovery. |
| // When failed file is open, skipCount is set to the last read offset for that file. |
| // |
| if (skipCount == 0) { |
| offset++; |
| emit(line); |
| } |
| else { |
| skipCount--; |
| } |
| } |
| } |
| catch (IOException e) { |
| failureHandling(e); |
| } |
| } |
| } |
| |
| /** |
| * Scans the directory for new files. |
| */ |
| protected void scanDirectory() |
| { |
| if(System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis) { |
| Set<Path> newPaths = scanner.scan(fs, filePath, processedFiles); |
| |
| for(Path newPath : newPaths) { |
| String newPathString = newPath.toString(); |
| pendingFiles.add(newPathString); |
| processedFiles.add(newPathString); |
| localProcessedFileCount.increment(); |
| } |
| |
| lastScanMillis = System.currentTimeMillis(); |
| } |
| } |
| |
| /** |
| * Helper method for handling IOExceptions. |
| * @param e The caught IOException. |
| */ |
| private void failureHandling(Exception e) |
| { |
| localNumberOfFailures.increment(); |
| if(maxRetryCount <= 0) { |
| throw new RuntimeException(e); |
| } |
| LOG.error("FS reader error", e); |
| addToFailedList(); |
| } |
| |
| protected void addToFailedList() { |
| |
| FailedFile ff = new FailedFile(currentFile, offset, retryCount); |
| |
| try { |
| // try to close file |
| if (this.inputStream != null) |
| this.inputStream.close(); |
| } catch(IOException e) { |
| localNumberOfFailures.increment(); |
| LOG.error("Could not close input stream on: " + currentFile); |
| } |
| |
| ff.retryCount ++; |
| ff.lastFailedTime = System.currentTimeMillis(); |
| ff.offset = this.offset; |
| |
| // Clear current file state. |
| this.currentFile = null; |
| this.inputStream = null; |
| this.offset = 0; |
| |
| if (ff.retryCount > maxRetryCount) |
| return; |
| |
| localNumberOfRetries.increment(); |
| LOG.info("adding to failed list path {} offset {} retry {}", ff.path, ff.offset, ff.retryCount); |
| failedFiles.add(ff); |
| } |
| |
| protected InputStream retryFailedFile(FailedFile ff) throws IOException |
| { |
| LOG.info("retrying failed file {} offset {} retry {}", ff.path, ff.offset, ff.retryCount); |
| String path = ff.path; |
| this.inputStream = openFile(new Path(path)); |
| this.offset = ff.offset; |
| this.retryCount = ff.retryCount; |
| this.skipCount = ff.offset; |
| return this.inputStream; |
| } |
| |
| protected InputStream openFile(Path path) throws IOException |
| { |
| LOG.info("opening file {}", path); |
| InputStream input = fs.open(path); |
| currentFile = path.toString(); |
| offset = 0; |
| retryCount = 0; |
| skipCount = 0; |
| return input; |
| } |
| |
| protected void closeFile(InputStream is) throws IOException |
| { |
| LOG.info("closing file {} offset {}", currentFile, offset); |
| |
| if (is != null) |
| is.close(); |
| |
| currentFile = null; |
| inputStream = null; |
| } |
| |
| @Override |
| public Collection<Partition<AbstractFSDirectoryInputOperator<T>>> definePartitions(Collection<Partition<AbstractFSDirectoryInputOperator<T>>> partitions, int incrementalCapacity) |
| { |
| lastRepartition = System.currentTimeMillis(); |
| |
| int totalCount = computedNewPartitionCount(partitions, incrementalCapacity); |
| |
| LOG.debug("Computed new partitions: {}", totalCount); |
| |
| if (totalCount == partitions.size()) { |
| return partitions; |
| } |
| |
| AbstractFSDirectoryInputOperator<T> tempOperator = partitions.iterator().next().getPartitionedInstance(); |
| |
| MutableLong tempGlobalNumberOfRetries = tempOperator.globalNumberOfRetries; |
| MutableLong tempGlobalNumberOfFailures = tempOperator.globalNumberOfRetries; |
| |
| /* |
| * Build collective state from all instances of the operator. |
| */ |
| Set<String> totalProcessedFiles = Sets.newHashSet(); |
| Set<FailedFile> currentFiles = Sets.newHashSet(); |
| List<DirectoryScanner> oldscanners = Lists.newLinkedList(); |
| List<FailedFile> totalFailedFiles = Lists.newLinkedList(); |
| List<String> totalPendingFiles = Lists.newLinkedList(); |
| for(Partition<AbstractFSDirectoryInputOperator<T>> partition : partitions) { |
| AbstractFSDirectoryInputOperator<T> oper = partition.getPartitionedInstance(); |
| totalProcessedFiles.addAll(oper.processedFiles); |
| totalFailedFiles.addAll(oper.failedFiles); |
| totalPendingFiles.addAll(oper.pendingFiles); |
| currentFiles.addAll(unfinishedFiles); |
| tempGlobalNumberOfRetries.add(oper.localNumberOfRetries); |
| tempGlobalNumberOfFailures.add(oper.localNumberOfFailures); |
| if (oper.currentFile != null) |
| currentFiles.add(new FailedFile(oper.currentFile, oper.offset)); |
| oldscanners.add(oper.getScanner()); |
| } |
| |
| /* |
| * Create partitions of scanners, scanner's partition method will do state |
| * transfer for DirectoryScanner objects. |
| */ |
| List<DirectoryScanner> scanners = scanner.partition(totalCount, oldscanners); |
| |
| Kryo kryo = new Kryo(); |
| Collection<Partition<AbstractFSDirectoryInputOperator<T>>> newPartitions = Lists.newArrayListWithExpectedSize(totalCount); |
| for (int i=0; i<scanners.size(); i++) { |
| |
| // Kryo.copy fails as it attempts to clone transient fields |
| ByteArrayOutputStream bos = new ByteArrayOutputStream(); |
| Output loutput = new Output(bos); |
| kryo.writeObject(loutput, this); |
| loutput.close(); |
| Input lInput = new Input(bos.toByteArray()); |
| @SuppressWarnings("unchecked") |
| AbstractFSDirectoryInputOperator<T> oper = kryo.readObject(lInput, this.getClass()); |
| lInput.close(); |
| |
| DirectoryScanner scn = scanners.get(i); |
| oper.setScanner(scn); |
| |
| // Do state transfer for processed files. |
| oper.processedFiles.addAll(totalProcessedFiles); |
| oper.globalNumberOfFailures = tempGlobalNumberOfRetries; |
| oper.localNumberOfFailures.setValue(0); |
| oper.globalNumberOfRetries = tempGlobalNumberOfFailures; |
| oper.localNumberOfRetries.setValue(0); |
| |
| /* redistribute unfinished files properly */ |
| oper.unfinishedFiles.clear(); |
| oper.currentFile = null; |
| oper.offset = 0; |
| Iterator<FailedFile> unfinishedIter = currentFiles.iterator(); |
| while(unfinishedIter.hasNext()) { |
| FailedFile unfinishedFile = unfinishedIter.next(); |
| if (scn.acceptFile(unfinishedFile.path)) { |
| oper.unfinishedFiles.add(unfinishedFile); |
| unfinishedIter.remove(); |
| } |
| } |
| |
| /* transfer failed files */ |
| oper.failedFiles.clear(); |
| Iterator<FailedFile> iter = totalFailedFiles.iterator(); |
| while (iter.hasNext()) { |
| FailedFile ff = iter.next(); |
| if (scn.acceptFile(ff.path)) { |
| oper.failedFiles.add(ff); |
| iter.remove(); |
| } |
| } |
| |
| /* redistribute pending files properly */ |
| oper.pendingFiles.clear(); |
| Iterator<String> pendingFilesIterator = totalPendingFiles.iterator(); |
| while(pendingFilesIterator.hasNext()) { |
| String pathString = pendingFilesIterator.next(); |
| if(scn.acceptFile(pathString)) { |
| oper.pendingFiles.add(pathString); |
| pendingFilesIterator.remove(); |
| } |
| } |
| newPartitions.add(new DefaultPartition<AbstractFSDirectoryInputOperator<T>>(oper)); |
| } |
| |
| LOG.info("definePartitions called returning {} partitions", newPartitions.size()); |
| return newPartitions; |
| } |
| |
| protected int computedNewPartitionCount(Collection<Partition<AbstractFSDirectoryInputOperator<T>>> partitions, int incrementalCapacity) |
| { |
| boolean isInitialParitition = partitions.iterator().next().getStats() == null; |
| |
| if (isInitialParitition && partitionCount == 1) { |
| partitionCount = currentPartitions = partitions.size() + incrementalCapacity; |
| } else { |
| incrementalCapacity = partitionCount - currentPartitions; |
| } |
| |
| int totalCount = partitions.size() + incrementalCapacity; |
| LOG.info("definePartitions trying to create {} partitions, current {} required {}", totalCount, partitionCount, currentPartitions); |
| return totalCount; |
| } |
| |
| @Override |
| public void partitioned(Map<Integer, Partition<AbstractFSDirectoryInputOperator<T>>> partitions) |
| { |
| currentPartitions = partitions.size(); |
| } |
| |
| /** |
| * Read the next item from the stream. Depending on the type of stream, this could be a byte array, line or object. |
| * Upon return of null, the stream will be considered fully consumed. |
| * @throws IOException |
| * @return Depending on the type of stream an object is returned. When null is returned the stream is consumed. |
| */ |
| abstract protected T readEntity() throws IOException; |
| |
| /** |
| * Emit the tuple on the port |
| * @param tuple |
| */ |
| abstract protected void emit(T tuple); |
| |
| |
| /** |
| * Repartition is required when number of partitions are not equal to required |
| * partitions. |
| * @param batchedOperatorStats the stats to use when repartitioning. |
| * @return Returns the stats listener response. |
| */ |
| @Override |
| public Response processStats(BatchedOperatorStats batchedOperatorStats) |
| { |
| Response res = new Response(); |
| res.repartitionRequired = false; |
| if (currentPartitions != partitionCount) { |
| LOG.info("processStats: trying repartition of input operator current {} required {}", currentPartitions, partitionCount); |
| res.repartitionRequired = true; |
| } |
| return res; |
| } |
| |
| /** |
| * Returns the maximum number of times the operator will attempt to process |
| * a file on which it encounters an error. |
| * @return The maximum number of times the operator will attempt to process a |
| * file on which it encounters an error. |
| */ |
| public int getMaxRetryCount() |
| { |
| return maxRetryCount; |
| } |
| |
| /** |
| * Sets the maximum number of times the operator will attempt to process |
| * a file on which it encounters an error. |
| * @param maxRetryCount The maximum number of times the operator will attempt |
| * to process a file on which it encounters an error. |
| */ |
| public void setMaxRetryCount(int maxRetryCount) |
| { |
| this.maxRetryCount = maxRetryCount; |
| } |
| |
| /** |
| * The class that is used to scan for new files in the directory for the |
| * AbstractFSDirectoryInputOperator. |
| */ |
| public static class DirectoryScanner implements Serializable |
| { |
| private static final long serialVersionUID = 4535844463258899929L; |
| private String filePatternRegexp; |
| private transient Pattern regex = null; |
| private int partitionIndex; |
| private int partitionCount; |
| private final transient HashSet<String> ignoredFiles = new HashSet<String>(); |
| |
| public String getFilePatternRegexp() |
| { |
| return filePatternRegexp; |
| } |
| |
| public void setFilePatternRegexp(String filePatternRegexp) |
| { |
| this.filePatternRegexp = filePatternRegexp; |
| this.regex = null; |
| } |
| |
| public int getPartitionCount() { |
| return partitionCount; |
| } |
| |
| public int getPartitionIndex() { |
| return partitionIndex; |
| } |
| |
| protected Pattern getRegex() { |
| if (this.regex == null && this.filePatternRegexp != null) |
| this.regex = Pattern.compile(this.filePatternRegexp); |
| return this.regex; |
| } |
| |
| public LinkedHashSet<Path> scan(FileSystem fs, Path filePath, Set<String> consumedFiles) |
| { |
| LinkedHashSet<Path> pathSet = Sets.newLinkedHashSet(); |
| try { |
| LOG.debug("Scanning {} with pattern {}", filePath, this.filePatternRegexp); |
| FileStatus[] files = fs.listStatus(filePath); |
| for (FileStatus status : files) |
| { |
| Path path = status.getPath(); |
| String filePathStr = path.toString(); |
| |
| if (consumedFiles.contains(filePathStr)) { |
| continue; |
| } |
| |
| if (ignoredFiles.contains(filePathStr)) { |
| continue; |
| } |
| |
| if (acceptFile(filePathStr)) { |
| LOG.debug("Found {}", filePathStr); |
| pathSet.add(path); |
| } else { |
| // don't look at it again |
| ignoredFiles.add(filePathStr); |
| } |
| } |
| } catch (FileNotFoundException e) { |
| LOG.warn("Failed to list directory {}", filePath, e); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| return pathSet; |
| } |
| |
| protected boolean acceptFile(String filePathStr) |
| { |
| if (partitionCount > 1) { |
| int i = filePathStr.hashCode(); |
| int mod = i % partitionCount; |
| if (mod < 0) { |
| mod += partitionCount; |
| } |
| LOG.debug("partition {} {} {} {}", partitionIndex, filePathStr, i, mod); |
| |
| if (mod != partitionIndex) { |
| return false; |
| } |
| } |
| Pattern regex = this.getRegex(); |
| if (regex != null) |
| { |
| Matcher matcher = regex.matcher(filePathStr); |
| if (!matcher.matches()) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| public List<DirectoryScanner> partition(int count) |
| { |
| ArrayList<DirectoryScanner> partitions = Lists.newArrayListWithExpectedSize(count); |
| for (int i=0; i<count; i++) { |
| partitions.add(this.createPartition(i, count)); |
| } |
| return partitions; |
| } |
| |
| public List<DirectoryScanner> partition(int count , Collection<DirectoryScanner> scanners) { |
| return partition(count); |
| } |
| |
| protected DirectoryScanner createPartition(int partitionIndex, int partitionCount) |
| { |
| DirectoryScanner that = new DirectoryScanner(); |
| that.filePatternRegexp = this.filePatternRegexp; |
| that.regex = this.regex; |
| that.partitionIndex = partitionIndex; |
| that.partitionCount = partitionCount; |
| return that; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "DirectoryScanner [filePatternRegexp=" + filePatternRegexp + " partitionIndex=" + |
| partitionIndex + " partitionCount=" + partitionCount + "]"; |
| } |
| } |
| } |