/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.apex.malhar.lib.fs;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;

import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Module;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.io.block.FSSliceReader;
import com.datatorrent.lib.io.fs.FileSplitterInput;
/**
 * This module is used for reading records/tuples from a FileSystem. Records can
 * be read in parallel using multiple partitions of the record reader operator.
 * (Ordering is not guaranteed when records are read in parallel.)
 *
 * The input directory is scanned at the specified interval to poll for new data.
 *
 * The module reads data in parallel; the following parameters can be configured:
 * <br/>
 * 1. files: list of files/directories to read<br/>
 * 2. filePatternRegularExp: only files with names matching the given regex are
 * read<br/>
 * 3. scanIntervalMillis: interval between two scans to discover new files in the
 * input directory<br/>
 * 4. recursive: if true, scan input directories recursively<br/>
 * 5. blockSize: block size used to read input blocks of a file<br/>
 * 6. readersCount: number of reader partitions used to read input files<br/>
 * 7. sequentialFileRead: if true, each reader partition reads a different file
 * instead of reading different offsets of the same file
 * (file-level parallelism instead of block-level parallelism)<br/>
 * 8. blocksThreshold: number of blocks emitted per window
 *
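 * <p>
 * A minimal usage sketch for an application's {@code populateDAG}; the module
 * name, input path and downstream operator {@code downstreamOp} are
 * illustrative placeholders, not part of this API:
 * <pre>{@code
 * FSRecordReaderModule reader = dag.addModule("HDFSInput", new FSRecordReaderModule());
 * reader.setFiles("/user/appuser/input");
 * reader.setMode(RECORD_READER_MODE.DELIMITED_RECORD);
 * reader.setBlocksThreshold(4);
 * dag.addStream("Records", reader.records, downstreamOp.input);
 * }</pre>
 *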
 * @since 3.5.0
 */
@org.apache.hadoop.classification.InterfaceStability.Evolving
public class FSRecordReaderModule implements Module
{
  @NotNull
  @Size(min = 1)
  private String files;
  private String filePatternRegularExp;
  @Min(1)
  private long scanIntervalMillis = 5000;
  private boolean recursive = true;
  private boolean sequentialFileRead = false;
  @Min(1)
  private int readersCount = 1;
  @Min(1)
  protected int blocksThreshold = 1;

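  /**
   * Output port on which records read from the input files are emitted as byte arrays.
   */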
  public final transient ProxyOutputPort<byte[]> records = new ProxyOutputPort<byte[]>();

  /**
   * Criteria for record split
   */
  private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;

  /**
   * Length for fixed width record
   */
  private int recordLength;

  /**
   * Creates an instance of FileSplitter
   *
   * @return FileSplitterInput instance
   */
  public FileSplitterInput createFileSplitter()
  {
    return new FileSplitterInput();
  }

  /**
   * Creates an instance of Record Reader
   *
   * @return FSRecordReader instance
   */
  public FSRecordReader createRecordReader()
  {
    FSRecordReader recordReader = new FSRecordReader();
    recordReader.setMode(mode);
    recordReader.setRecordLength(recordLength);

    return recordReader;
  }

  @Override
  public void populateDAG(DAG dag, Configuration configuration)
  {
    FileSplitterInput fileSplitter = dag.addOperator("FileSplitter", createFileSplitter());
    FSRecordReader recordReader = dag.addOperator("BlockReader", createRecordReader());

    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, recordReader.blocksMetadataInput);

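    // When sequential file read is enabled, use a stream codec that partitions
    // block metadata by its hash code (file-level instead of block-level parallelism).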
    if (sequentialFileRead) {
      dag.setInputPortAttribute(recordReader.blocksMetadataInput, Context.PortContext.STREAM_CODEC,
          new SequentialFileBlockMetadataCodec());
    }

    FileSplitterInput.TimeBasedDirectoryScanner fileScanner = fileSplitter.getScanner();
    fileScanner.setFiles(files);
    if (scanIntervalMillis != 0) {
      fileScanner.setScanIntervalMillis(scanIntervalMillis);
    }
    fileScanner.setRecursive(recursive);
    if (filePatternRegularExp != null) {
      fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp);
    }

    recordReader.setBasePath(files);
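    // Statically partition the record reader into readersCount reader partitions.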
    if (readersCount != 0) {
      dag.setAttribute(recordReader, Context.OperatorContext.PARTITIONER,
          new StatelessPartitioner<FSSliceReader>(readersCount));
    }
    fileSplitter.setBlocksThreshold(blocksThreshold);
    records.set(recordReader.records);
  }

  /**
   * A comma separated list of directories to scan. If the path is not fully
   * qualified the default file system is used. A fully qualified path can be
   * provided to scan directories in other filesystems.
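   * <p>
   * For example (the paths below are illustrative placeholders):
   * {@code setFiles("/user/input/dir1,hdfs://namenode:8020/user/input/dir2")}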
   *
   * @param files
   *          comma separated list of files/directories to scan
   */
  public void setFiles(String files)
  {
    this.files = files;
  }

  /**
   * Gets the files to be scanned.
   *
   * @return files to be scanned.
   */
  public String getFiles()
  {
    return files;
  }

  /**
   * Gets the regular expression for file names to split
   *
   * @return regular expression
   */
  public String getFilePatternRegularExp()
  {
    return filePatternRegularExp;
  }

  /**
   * Only files with names matching the given Java regular expression are split
   *
   * @param filePatternRegexp
   *          regular expression
   */
  public void setFilePatternRegularExp(String filePatternRegexp)
  {
    this.filePatternRegularExp = filePatternRegexp;
  }

  /**
   * Gets the scan interval in milliseconds, the interval between two scans to
   * discover new files in the input directory
   *
   * @return scan interval in milliseconds
   */
  public long getScanIntervalMillis()
  {
    return scanIntervalMillis;
  }

  /**
   * Sets the scan interval in milliseconds, the interval between two scans to
   * discover new files in the input directory
   *
   * @param scanIntervalMillis
   *          scan interval in milliseconds
   */
  public void setScanIntervalMillis(long scanIntervalMillis)
  {
    this.scanIntervalMillis = scanIntervalMillis;
  }

  /**
   * Returns whether the directory scan is recursive
   *
   * @return true if input directories are scanned recursively
   */
  public boolean isRecursive()
  {
    return recursive;
  }

  /**
   * Sets whether input directories are scanned recursively
   *
   * @param recursive
   *          true to scan input directories recursively
   */
  public void setRecursive(boolean recursive)
  {
    this.recursive = recursive;
  }

  /**
   * Gets the count of reader partitions used to read input files
   *
   * @return readersCount
   */
  public int getReadersCount()
  {
    return readersCount;
  }

  /**
   * Sets the static count of reader partitions used to read input files
   *
   * @param readersCount
   *          static count of reader partitions
   */
  public void setReadersCount(int readersCount)
  {
    this.readersCount = readersCount;
  }

  /**
   * Returns whether sequential file read is enabled
   *
   * @return sequentialFileRead
   */
  public boolean isSequentialFileRead()
  {
    return sequentialFileRead;
  }

  /**
   * Sets whether files are read sequentially; if true, each reader partition
   * reads a different file instead of reading different offsets of the same file
   *
   * @param sequentialFileRead
   *          true to enable sequential file read
   */
  public void setSequentialFileRead(boolean sequentialFileRead)
  {
    this.sequentialFileRead = sequentialFileRead;
  }

  /**
   * Sets the number of blocks to be emitted per window.<br/>
   * Emitting too many blocks per window can overwhelm the downstream operators.
   * Set this value considering blockSize and readersCount.
   *
   * @param threshold
   *          number of blocks emitted per window
   */
  public void setBlocksThreshold(int threshold)
  {
    this.blocksThreshold = threshold;
  }

  /**
   * Gets the number of blocks to be emitted per window.<br/>
   * Emitting too many blocks per window can overwhelm the downstream operators.
   * Set this value considering blockSize and readersCount.
   *
   * @return number of blocks emitted per window
   */
  public int getBlocksThreshold()
  {
    return blocksThreshold;
  }

  /**
   * Gets the criteria for record split: fixed width or delimited records
   *
   * @return mode
   */
  public RECORD_READER_MODE getMode()
  {
    return mode;
  }

  /**
   * Sets the criteria for record split: fixed width or delimited records
   *
   * @param mode
   *          record reader mode
   */
  public void setMode(RECORD_READER_MODE mode)
  {
    this.mode = mode;
  }

  /**
   * Gets the length of a fixed width record
   *
   * @return record length
   */
  public int getRecordLength()
  {
    return recordLength;
  }

  /**
   * Sets the length of a fixed width record
   *
   * @param recordLength
   *          length of a fixed width record
   */
  public void setRecordLength(int recordLength)
  {
    this.recordLength = recordLength;
  }

  /**
   * Stream codec that partitions {@link BlockMetadata.FileBlockMetadata} by its
   * hash code; it is applied to the record reader's input port when
   * {@code sequentialFileRead} is enabled, giving file-level rather than
   * block-level parallelism.
   */
  public static class SequentialFileBlockMetadataCodec
      extends KryoSerializableStreamCodec<BlockMetadata.FileBlockMetadata>
  {
    @Override
    public int getPartition(BlockMetadata.FileBlockMetadata fileBlockMetadata)
    {
      return fileBlockMetadata.hashCode();
    }
  }
}