| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.apex.malhar.lib.fs; |
| |
| import javax.validation.constraints.Min; |
| import javax.validation.constraints.NotNull; |
| import javax.validation.constraints.Pattern; |
| import javax.validation.constraints.Size; |
| |
| import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE; |
| import org.apache.apex.malhar.lib.io.fs.FSInputModule.SequentialFileBlockMetadataCodec; |
| import org.apache.apex.malhar.lib.io.fs.FileSplitterInput; |
| import org.apache.hadoop.conf.Configuration; |
| |
| import com.datatorrent.api.Context; |
| import com.datatorrent.api.DAG; |
| import com.datatorrent.api.Module; |
| |
| /** |
| * This module is used for reading records/tuples from FileSystem. Records can |
| * be read in parallel using multiple partitions of record reader operator. |
| * (Ordering is not guaranteed when records are read in parallel) |
| * |
| * Input directory is scanned at specified interval to poll for new data. |
| * |
| * The module reads data in parallel, following parameters can be configured |
| * <br/> |
| * 1. files: list of file(s)/directories to read<br/> |
| * 2. filePatternRegularExp: Files with names matching given regex will be read |
| * <br/> |
| * 3. scanIntervalMillis: interval between two scans to discover new files in |
| * input directory<br/> |
| * 4. recursive: if true, scan input directories recursively<br/> |
| * 5. blockSize: block size used to read input blocks of file<br/> |
| * 6. sequentialFileRead: if true, then each reader partition will read |
| * different file. <br/> |
| * instead of reading different offsets of the same file. <br/> |
| * (File level parallelism instead of block level parallelism)<br/> |
| * 7. blocksThreshold: number of blocks emitted per window<br/> |
| * 8. minReaders: Minimum number of block readers for dynamic partitioning<br/> |
| * 9. maxReaders: Maximum number of block readers for dynamic partitioning<br/> |
| * 10. repartitionCheckInterval: Interval for re-evaluating dynamic partitioning<br/> |
| * @since 3.5.0 |
| */ |
| @org.apache.hadoop.classification.InterfaceStability.Evolving |
| public class FSRecordReaderModule implements Module |
| { |
| @NotNull |
| @Size(min = 1) |
| private String files; |
| private String filePatternRegularExp; |
| @Min(1) |
| private long scanIntervalMillis = 5000; |
| private boolean recursive = true; |
| private boolean sequentialFileRead = false; |
| @Min(0) |
| private long blockSize; |
| @Min(1) |
| protected int blocksThreshold = 1; |
| protected int minReaders; |
| protected int maxReaders; |
| protected long repartitionCheckInterval; |
| |
| public final transient ProxyOutputPort<byte[]> records = new ProxyOutputPort<byte[]>(); |
| |
| /** |
| * Criteria for record split |
| */ |
| private String mode = RECORD_READER_MODE.DELIMITED_RECORD.toString(); |
| |
| /** |
| * Length for fixed width record |
| */ |
| private int recordLength; |
| |
| /** |
| * Creates an instance of FileSplitter |
| * |
| * @return |
| */ |
| public FileSplitterInput createFileSplitter() |
| { |
| return new FileSplitterInput(); |
| } |
| |
| /** |
| * Creates an instance of Record Reader |
| * |
| * @return FSRecordReader instance |
| */ |
| public FSRecordReader createRecordReader() |
| { |
| FSRecordReader recordReader = new FSRecordReader(); |
| recordReader.setMode(mode); |
| recordReader.setRecordLength(recordLength); |
| |
| return recordReader; |
| } |
| |
| @Override |
| public void populateDAG(DAG dag, Configuration configuration) |
| { |
| FileSplitterInput fileSplitter = dag.addOperator("FileSplitter", createFileSplitter()); |
| FSRecordReader recordReader = dag.addOperator("BlockReader", createRecordReader()); |
| |
| dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, recordReader.blocksMetadataInput); |
| |
| if (sequentialFileRead) { |
| dag.setInputPortAttribute(recordReader.blocksMetadataInput, Context.PortContext.STREAM_CODEC, |
| new SequentialFileBlockMetadataCodec()); |
| } |
| |
| FileSplitterInput.TimeBasedDirectoryScanner fileScanner = fileSplitter.getScanner(); |
| fileScanner.setFiles(files); |
| if (scanIntervalMillis != 0) { |
| fileScanner.setScanIntervalMillis(scanIntervalMillis); |
| } |
| fileScanner.setRecursive(recursive); |
| if (filePatternRegularExp != null) { |
| fileSplitter.getScanner().setFilePatternRegularExp(filePatternRegularExp); |
| } |
| recordReader.setBasePath(files); |
| fileSplitter.setBlocksThreshold(blocksThreshold); |
| |
| if (minReaders != 0) { |
| recordReader.setMinReaders(minReaders); |
| } |
| if (maxReaders != 0) { |
| recordReader.setMaxReaders(maxReaders); |
| } |
| if (repartitionCheckInterval != 0) { |
| recordReader.setIntervalMillis(repartitionCheckInterval); |
| } |
| |
| /** |
| * Override the split size or input blocks of a file. If not specified, |
| * it would use default blockSize of the filesystem. |
| */ |
| if (blockSize != 0) { |
| fileSplitter.setBlockSize(blockSize); |
| } |
| records.set(recordReader.records); |
| } |
| |
| /** |
| * A comma separated list of directories to scan. If the path is not fully |
| * qualified the default file system is used. A fully qualified path can be |
| * provided to scan directories in other filesystems. |
| * |
| * @param files |
| * files |
| */ |
| public void setFiles(String files) |
| { |
| this.files = files; |
| } |
| |
| /** |
| * Gets the files to be scanned. |
| * |
| * @return files to be scanned. |
| */ |
| public String getFiles() |
| { |
| return files; |
| } |
| |
| /** |
| * Gets the regular expression for file names to split |
| * |
| * @return regular expression |
| */ |
| public String getFilePatternRegularExp() |
| { |
| return filePatternRegularExp; |
| } |
| |
| /** |
| * Only files width names matching the given java regular expression are split |
| * |
| * @param filePatternRegexp |
| * regular expression |
| */ |
| public void setFilePatternRegularExp(String filePatternRegexp) |
| { |
| this.filePatternRegularExp = filePatternRegexp; |
| } |
| |
| /** |
| * Gets scan interval in milliseconds, interval between two scans to discover |
| * new files in input directory |
| * |
| * @return scanInterval milliseconds |
| */ |
| public long getScanIntervalMillis() |
| { |
| return scanIntervalMillis; |
| } |
| |
| /** |
| * Sets scan interval in milliseconds, interval between two scans to discover |
| * new files in input directory |
| * |
| * @param scanIntervalMillis |
| */ |
| public void setScanIntervalMillis(long scanIntervalMillis) |
| { |
| this.scanIntervalMillis = scanIntervalMillis; |
| } |
| |
| /** |
| * Get is scan recursive |
| * |
| * @return isRecursive |
| */ |
| public boolean isRecursive() |
| { |
| return recursive; |
| } |
| |
| /** |
| * set is scan recursive |
| * |
| * @param recursive |
| */ |
| public void setRecursive(boolean recursive) |
| { |
| this.recursive = recursive; |
| } |
| |
| /** |
| * Gets is sequential file read |
| * |
| * @return sequentialFileRead |
| */ |
| public boolean isSequentialFileRead() |
| { |
| return sequentialFileRead; |
| } |
| |
| /** |
| * Sets is sequential file read |
| * |
| * @param sequentialFileRead |
| */ |
| |
| public void setSequentialFileRead(boolean sequentialFileRead) |
| { |
| this.sequentialFileRead = sequentialFileRead; |
| } |
| |
| /** |
| * Sets number of blocks to be emitted per window.<br/> |
| * A lot of blocks emitted per window can overwhelm the downstream operators. |
| * Set this value considering blockSize, minReaders and maxReaders. |
| * |
| * @param threshold |
| */ |
| public void setBlocksThreshold(int threshold) |
| { |
| this.blocksThreshold = threshold; |
| } |
| |
| /** |
| * Gets number of blocks to be emitted per window.<br/> |
| * A lot of blocks emitted per window can overwhelm the downstream operators. |
| * Set this value considering blockSize, minReaders and maxReaders. |
| * |
| * @return blocksThreshold |
| */ |
| public int getBlocksThreshold() |
| { |
| return blocksThreshold; |
| } |
| |
| /** |
| * Gets block size used to read input blocks of file |
| * |
| * @return blockSize |
| */ |
| public long getBlockSize() |
| { |
| return blockSize; |
| } |
| |
| /** |
| * Sets block size used to read input blocks of file |
| * |
| * @param blockSize |
| */ |
| public void setBlockSize(long blockSize) |
| { |
| this.blockSize = blockSize; |
| } |
| |
| /** |
| * Gets minimum number of block readers for dynamic partitioning. |
| * |
| * @return minimum instances of block reader. |
| */ |
| public int getMinReaders() |
| { |
| return minReaders; |
| } |
| |
| /** |
| * Sets minimum number of block readers for dynamic partitioning. |
| * |
| * @param minReaders minimum number of readers. |
| */ |
| public void setMinReaders(int minReaders) |
| { |
| this.minReaders = minReaders; |
| } |
| |
| /** |
| * Gets maximum number of block readers for dynamic partitioning. |
| * |
| * @return maximum instances of block reader. |
| */ |
| public int getMaxReaders() |
| { |
| return maxReaders; |
| } |
| |
| /** |
| * Sets maximum number of block readers for dynamic partitioning. |
| * |
| * @param maxReaders maximum number of readers. |
| */ |
| public void setMaxReaders(int maxReaders) |
| { |
| this.maxReaders = maxReaders; |
| } |
| |
| /** |
| * Gets Interval for re-evaluating dynamic partitioning. |
| * |
| * @return interval for re-evaluating dynamic partitioning |
| */ |
| public long getRepartitionCheckInterval() |
| { |
| return repartitionCheckInterval; |
| } |
| |
| /** |
| * Sets Interval for re-evaluating dynamic partitioning. |
| * |
| * @param repartitionCheckInterval interval for re-evaluating dynamic partitioning |
| */ |
| public void setRepartitionCheckInterval(long repartitionCheckInterval) |
| { |
| this.repartitionCheckInterval = repartitionCheckInterval; |
| } |
| |
| /** |
| * Criteria for record split : FIXED_WIDTH_RECORD or DELIMITED_RECORD |
| * |
| * @param mode |
| * Mode |
| */ |
| public void setMode( |
| @Pattern(regexp = "FIXED_WIDTH_RECORD|DELIMITED_RECORD", flags = Pattern.Flag.CASE_INSENSITIVE) String mode) |
| { |
| this.mode = mode; |
| } |
| |
| /** |
| * Criteria for record split |
| * |
| * @return mode |
| */ |
| public String getMode() |
| { |
| return mode; |
| } |
| |
| /** |
| * Length for fixed width record |
| * |
| * @return record length |
| */ |
| public int getRecordLength() |
| { |
| return recordLength; |
| } |
| |
| /** |
| * Length for fixed width record |
| * |
| * @param recordLength |
| */ |
| public void setRecordLength(int recordLength) |
| { |
| this.recordLength = recordLength; |
| } |
| } |