blob: 0949abaf31dd006738165eeea439894b34a7f273 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.io.fs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
import com.datatorrent.netlet.util.DTThrowable;
/**
* Input operator that scans a directory for files and splits a file into blocks.<br/>
* The operator emits block metadata and file metadata.<br/>
*
* The file system/directory space should be different for different partitions of file splitter.
*
* @deprecated use {@link FileSplitterInput}. This splitter has issues with recovery and fixing that breaks backward compatibility.
* @displayName File Splitter
* @category Input
* @tags file
* @since 2.0.0
*/
@OperatorAnnotation(checkpointableWithinAppWindow = false)
@Deprecated
public class FileSplitter implements InputOperator, Operator.CheckpointListener, Operator.CheckpointNotificationListener
{
/**
* Block size used to split files. When it is still null in {@link #setup}, the default block size
* reported by the file system is used instead.
*/
protected Long blockSize;
/**
* Running counter that supplies the lower 32 bits of every generated block id.
*/
private int sequenceNo;
/**
* This is a threshold on the no. of blocks emitted per window. A lot of blocks emitted
* per window can overwhelm the downstream operators. This setting helps to control that.
*/
@Min(1)
protected int blocksThreshold;
/**
* Number of blocks counted against {@link #blocksThreshold} in the current window; reset in beginWindow.
*/
protected transient long blockCount;
/**
* Iterator over the blocks of the file that is currently being split; set back to null once all of its
* blocks have been emitted.
*/
protected Iterator<FileBlockMetadata> blockMetadataIterator;
/**
* Scanner that discovers the files to split.
*/
@NotNull
protected TimeBasedDirectoryScanner scanner;
/**
* Stores, per window, the files that were discovered in it so that the window can be replayed.
*/
@NotNull
protected WindowDataManager windowDataManager;
/**
* Files polled from the scanner during the current window; saved and cleared in endWindow.
*/
@NotNull
protected final transient LinkedList<FileInfo> currentWindowRecoveryState;
/**
* File system obtained from the scanner in setup and closed in teardown.
*/
protected transient FileSystem fs;
/**
* Id of this operator as reported by the operator context; forms the upper 32 bits of block ids.
*/
protected transient int operatorId;
/**
* Operator context received in setup.
*/
protected transient Context.OperatorContext context;
/**
* Id of the window that is currently being processed.
*/
protected transient long currentWindowId;
/**
* Counters of this operator; handed to the operator context at the end of every window.
*/
protected final BasicCounters<MutableLong> fileCounters;
/**
* Port on which the metadata of every discovered file or directory is emitted.
*/
public final transient DefaultOutputPort<FileMetadata> filesMetadataOutput = new DefaultOutputPort<FileMetadata>();
/**
* Port on which the metadata of every block of a file is emitted.
*/
public final transient DefaultOutputPort<FileBlockMetadata> blocksMetadataOutput = new DefaultOutputPort<FileBlockMetadata>();
/**
 * Creates a splitter with a default {@link TimeBasedDirectoryScanner}, a no-op window data manager,
 * empty counters and a blocks threshold of {@link Integer#MAX_VALUE}.
 */
public FileSplitter()
{
  this.blocksThreshold = Integer.MAX_VALUE;
  this.scanner = new TimeBasedDirectoryScanner();
  this.windowDataManager = new WindowDataManager.NoopWindowDataManager();
  this.fileCounters = new BasicCounters<MutableLong>(MutableLong.class);
  this.currentWindowRecoveryState = Lists.newLinkedList();
}
/**
 * Validates the configuration, opens the file system, resolves the block size and, unless the operator
 * is recovering, starts the scanner.
 *
 * @param context operator context
 */
@Override
public void setup(Context.OperatorContext context)
{
  Preconditions.checkArgument(!scanner.files.isEmpty(), "empty files");
  Preconditions.checkArgument(blockSize == null || blockSize > 0, "invalid block size");

  this.context = context;
  this.operatorId = context.getId();
  fileCounters.setCounter(Counters.PROCESSED_FILES, new MutableLong());
  windowDataManager.setup(context);

  try {
    fs = scanner.getFSInstance();
  } catch (IOException e) {
    throw new RuntimeException("creating fs", e);
  }

  if (blockSize == null) {
    // no explicit block size: fall back to the file system default for the first configured path
    String firstFile = scanner.files.iterator().next();
    blockSize = fs.getDefaultBlockSize(new Path(firstFile));
  }

  boolean recovering =
      context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestCompletedWindow();
  if (recovering) {
    // the scanner is started from replay() once the last completed window has been replayed
    blockMetadataIterator = null;
    return;
  }
  scanner.setup(context);
}
/**
 * Stops the scanner and closes the file system opened in {@link #setup}.
 * The file system is closed even when tearing down the scanner fails.
 */
@SuppressWarnings("ThrowFromFinallyBlock")
@Override
public void teardown()
{
  try {
    scanner.teardown();
  } catch (Throwable t) {
    DTThrowable.rethrow(t);
  } finally {
    // fs is only assigned in setup(); without this guard a teardown that follows a failed setup would
    // throw a NullPointerException from here and hide the actual failure.
    if (fs != null) {
      try {
        fs.close();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }
}
/**
 * Resets the per-window block count and replays the window when it has already been completed before.
 *
 * @param windowId id of the window that begins
 */
@Override
public void beginWindow(long windowId)
{
  currentWindowId = windowId;
  blockCount = 0;

  boolean alreadyCompleted = windowId <= windowDataManager.getLargestCompletedWindow();
  if (alreadyCompleted) {
    replay(windowId);
  }
}
/**
 * Re-emits the file and block metadata of a window that was completed before a failure, using the
 * files stored by the window data manager for that window. The scanner's modification-time cache is
 * updated with the recovered entries, and the scanner is started once the largest completed window
 * has been replayed.
 *
 * @param windowId id of the window to replay
 * @throws RuntimeException if the recovery state cannot be read or file metadata cannot be built
 */
protected void replay(long windowId)
{
  try {
    @SuppressWarnings("unchecked")
    LinkedList<FileInfo> recoveredData = (LinkedList<FileInfo>)windowDataManager.retrieve(windowId);
    if (recoveredData == null) {
      //This could happen when there are multiple physical instances and one of them is ahead in processing windows.
      return;
    }
    if (blockMetadataIterator != null) {
      // finish the file that was only partially emitted
      emitBlockMetadata();
    }
    for (FileInfo info : recoveredData) {
      if (info.directoryPath != null) {
        scanner.lastModifiedTimes.put(info.directoryPath, info.modifiedTime);
      } else { //no directory
        scanner.lastModifiedTimes.put(info.relativeFilePath, info.modifiedTime);
      }
      FileMetadata fileMetadata = buildFileMetadata(info);
      fileCounters.getCounter(Counters.PROCESSED_FILES).increment();
      filesMetadataOutput.emit(fileMetadata);
      if (fileMetadata.isDirectory()) {
        // Same rule as emitTuples(): directories have no blocks. buildFileMetadata() does not populate
        // block ids for them, so iterating would fail whenever the file system reports a non-zero
        // length for a directory.
        continue;
      }
      blockMetadataIterator = new BlockMetadataIterator(this, fileMetadata, blockSize);
      if (!emitBlockMetadata()) {
        //block threshold reached
        break;
      }
    }
    if (windowId == windowDataManager.getLargestCompletedWindow()) {
      // recovery is complete, new files may be discovered from now on
      scanner.setup(context);
    }
  } catch (IOException e) {
    throw new RuntimeException("replay", e);
  }
}
/**
 * Emits metadata for newly discovered files and their blocks until the blocks threshold of the window
 * is reached, the scanner has no more files, or the last file of a scan was processed.
 * Nothing is emitted here for windows that are being replayed.
 */
@Override
public void emitTuples()
{
  if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
    return;
  }

  // surface any failure of the background scanner
  Throwable scanFailure = scanner.atomicThrowable.get();
  if (scanFailure != null) {
    DTThrowable.rethrow(scanFailure);
  }

  // continue with the file that was left unfinished earlier
  if (blockMetadataIterator != null && blockCount < blocksThreshold) {
    emitBlockMetadata();
  }

  while (blockCount < blocksThreshold) {
    FileInfo fileInfo = scanner.pollFile();
    if (fileInfo == null) {
      break;
    }
    currentWindowRecoveryState.add(fileInfo);

    FileMetadata fileMetadata;
    try {
      fileMetadata = buildFileMetadata(fileInfo);
    } catch (IOException e) {
      throw new RuntimeException("creating metadata", e);
    }
    filesMetadataOutput.emit(fileMetadata);
    fileCounters.getCounter(Counters.PROCESSED_FILES).increment();

    if (!fileMetadata.isDirectory()) {
      blockMetadataIterator = new BlockMetadataIterator(this, fileMetadata, blockSize);
      boolean allBlocksEmitted = emitBlockMetadata();
      if (!allBlocksEmitted) {
        //block threshold reached
        break;
      }
    }
    if (fileInfo.lastFileOfScan) {
      break;
    }
  }
}
/**
 * Saves the files discovered in this window for recovery (only for windows that are not replayed),
 * clears the per-window state and publishes the counters.
 */
@Override
public void endWindow()
{
  boolean notYetCompleted = currentWindowId > windowDataManager.getLargestCompletedWindow();
  if (notYetCompleted) {
    try {
      windowDataManager.save(currentWindowRecoveryState, currentWindowId);
    } catch (IOException e) {
      throw new RuntimeException("saving recovery", e);
    }
  }
  currentWindowRecoveryState.clear();
  context.setCounters(fileCounters);
}
/**
 * Emits the remaining blocks of the current file for as long as the blocks threshold permits.
 * The iterator is cleared once it is exhausted.
 *
 * @return true if all the blocks were emitted; false otherwise
 */
protected boolean emitBlockMetadata()
{
  while (blockMetadataIterator.hasNext()) {
    boolean belowThreshold = blockCount < blocksThreshold;
    // the count advances on every attempt, including the one that hits the threshold
    blockCount++;
    if (!belowThreshold) {
      return false;
    }
    this.blocksMetadataOutput.emit(blockMetadataIterator.next());
  }
  blockMetadataIterator = null;
  return true;
}
/**
 * Creates the metadata of one block. Can be overridden for creating block metadata of a type that
 * extends {@link FileBlockMetadata}.
 *
 * @param pos                 offset at which the block starts
 * @param lengthOfFileInBlock offset at which the block ends
 * @param blockNumber         1-based number of the block within the file
 * @param fileMetadata        metadata of the file the block belongs to
 * @param isLast              whether this is the last block of the file
 * @return block metadata; the previous block id is -1 for the first block
 */
protected FileBlockMetadata createBlockMetadata(long pos, long lengthOfFileInBlock, int blockNumber,
    FileMetadata fileMetadata, boolean isLast)
{
  String filePath = fileMetadata.getFilePath();
  long blockId = fileMetadata.getBlockIds()[blockNumber - 1];
  long previousBlockId = (blockNumber == 1) ? -1 : fileMetadata.getBlockIds()[blockNumber - 2];
  return new FileBlockMetadata(filePath, blockId, pos, lengthOfFileInBlock, isLast, previousBlockId,
      fileMetadata.getFileLength());
}
/**
 * Builds the metadata of a discovered file from its file status. For regular files the number of
 * blocks is computed from the file length and the block size, and block ids are assigned.
 *
 * @param fileInfo file information
 * @return file-metadata
 * @throws IOException if the file status cannot be read
 */
protected FileMetadata buildFileMetadata(FileInfo fileInfo) throws IOException
{
  final String filePathStr = fileInfo.getFilePath();
  LOG.debug("file {}", filePathStr);

  final FileMetadata fileMetadata = new FileMetadata(filePathStr);
  final Path path = new Path(filePathStr);
  fileMetadata.setFileName(path.getName());

  final FileStatus status = fs.getFileStatus(path);
  fileMetadata.setDirectory(status.isDirectory());
  fileMetadata.setFileLength(status.getLen());
  if (status.isDirectory()) {
    return fileMetadata;
  }

  // ceiling of length / blockSize
  long fullBlocks = status.getLen() / blockSize;
  int partialBlock = ((status.getLen() % blockSize) == 0) ? 0 : 1;
  int noOfBlocks = (int)(fullBlocks + partialBlock);
  if (fileMetadata.getDataOffset() >= status.getLen()) {
    // nothing to read beyond the data offset
    noOfBlocks = 0;
  }
  fileMetadata.setNumberOfBlocks(noOfBlocks);
  populateBlockIds(fileMetadata);
  return fileMetadata;
}
/**
 * Assigns an id to each block of the file. An id is made of the operator id in the upper 32 bits and
 * the next sequence number in the lower 32 bits.
 *
 * @param fileMetadata file metadata whose number of blocks is already set
 */
protected void populateBlockIds(FileMetadata fileMetadata)
{
  final int numberOfBlocks = fileMetadata.getNumberOfBlocks();
  final long operatorBits = ((long)operatorId) << 32;
  final long[] ids = new long[numberOfBlocks];
  for (int index = 0; index < numberOfBlocks; index++) {
    long sequenceBits = sequenceNo & 0xFFFFFFFFL;
    sequenceNo++;
    ids[index] = operatorBits | sequenceBits;
  }
  fileMetadata.setBlockIds(ids);
}
/**
 * Sets the block size used to split files.
 *
 * @param blockSize block size; may be null
 */
public void setBlockSize(Long blockSize)
{
  this.blockSize = blockSize;
}

/**
 * @return the block size used to split files
 */
public Long getBlockSize()
{
  return this.blockSize;
}

/**
 * Sets the threshold on the number of blocks emitted per window.
 *
 * @param threshold blocks threshold
 */
public void setBlocksThreshold(int threshold)
{
  blocksThreshold = threshold;
}

/**
 * @return the threshold on the number of blocks emitted per window
 */
public int getBlocksThreshold()
{
  return this.blocksThreshold;
}

/**
 * Sets the scanner that discovers files.
 *
 * @param scanner directory scanner
 */
public void setScanner(TimeBasedDirectoryScanner scanner)
{
  this.scanner = scanner;
}

/**
 * @return the scanner that discovers files
 */
public TimeBasedDirectoryScanner getScanner()
{
  return scanner;
}

/**
 * Sets the manager that stores the per-window recovery state.
 *
 * @param windowDataManager window data manager
 */
public void setWindowDataManager(WindowDataManager windowDataManager)
{
  this.windowDataManager = windowDataManager;
}

/**
 * @return the manager that stores the per-window recovery state
 */
public WindowDataManager getWindowDataManager()
{
  return windowDataManager;
}
/**
 * Nothing is done before a checkpoint.
 */
@Override
public void beforeCheckpoint(long l)
{
  // intentionally empty
}

/**
 * Nothing is done when a checkpoint is taken.
 */
@Override
public void checkpointed(long l)
{
  // intentionally empty
}

/**
 * Forwards the committed window to the window data manager.
 *
 * @param l id of the committed window
 * @throws RuntimeException if the window data manager fails
 */
@Override
public void committed(long l)
{
  try {
    this.windowDataManager.committed(l);
  } catch (IOException ioe) {
    throw new RuntimeException(ioe);
  }
}
/**
 * An {@link Iterator} for Block-Metadatas of a file. Iteration starts at the data offset of the file
 * and produces one block per call until the end of the file is reached. The blocks themselves are
 * created through {@link FileSplitter#createBlockMetadata}.
 */
public static class BlockMetadataIterator implements Iterator<FileBlockMetadata>
{
  private final FileMetadata fileMetadata;
  private final long blockSize;
  /** Offset at which the next block starts. */
  private long pos;
  /** 1-based number of the block that was returned last; 0 before the first call to next(). */
  private int blockNumber;
  private final FileSplitter splitter;

  protected BlockMetadataIterator()
  {
    //for kryo
    fileMetadata = null;
    blockSize = -1;
    splitter = null;
  }

  /**
   * @param splitter     splitter used to create the block metadata
   * @param fileMetadata metadata of the file to iterate over
   * @param blockSize    size of a block
   */
  public BlockMetadataIterator(FileSplitter splitter, FileMetadata fileMetadata, long blockSize)
  {
    this.splitter = splitter;
    this.fileMetadata = fileMetadata;
    this.blockSize = blockSize;
    this.pos = fileMetadata.getDataOffset();
    this.blockNumber = 0;
  }

  /**
   * @return true while the current position is before the end of the file
   */
  @Override
  public boolean hasNext()
  {
    return pos < fileMetadata.getFileLength();
  }

  /**
   * Returns the metadata of the next block and moves the position to the end of that block.
   *
   * @return metadata of the next block
   * @throws java.util.NoSuchElementException if all blocks of the file were already returned
   */
  @SuppressWarnings("StatementWithEmptyBody")
  @Override
  public FileBlockMetadata next()
  {
    if (!hasNext()) {
      // honour the Iterator contract instead of failing later with an index or null-pointer error
      throw new java.util.NoSuchElementException("no more blocks");
    }
    long length;
    // advance to the first block whose end lies beyond the current position
    while ((length = blockSize * ++blockNumber) <= pos) {
    }
    boolean isLast = length >= fileMetadata.getFileLength();
    long lengthOfFileInBlock = isLast ? fileMetadata.getFileLength() : length;
    FileBlockMetadata fileBlock = splitter.createBlockMetadata(pos, lengthOfFileInBlock, blockNumber, fileMetadata, isLast);
    pos = lengthOfFileInBlock;
    return fileBlock;
  }

  /**
   * Not supported.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void remove()
  {
    throw new UnsupportedOperationException("remove not supported");
  }
}
/**
 * Metadata of a discovered file or directory: its path and name, length, data offset, discover time,
 * and the number and ids of its blocks.
 */
@Deprecated
public static class FileMetadata
{
  @NotNull
  private String filePath;
  private String fileName;
  private int numberOfBlocks;
  private long dataOffset;
  private long fileLength;
  private long discoverTime;
  private long[] blockIds;
  private boolean isDirectory;

  @SuppressWarnings("unused")
  protected FileMetadata()
  {
    //for kryo
    this(null);
  }

  /**
   * Creates metadata for the given path and stamps it with the current time as discover time.
   *
   * @param filePath file path
   */
  public FileMetadata(@NotNull String filePath)
  {
    this.discoverTime = System.currentTimeMillis();
    this.filePath = filePath;
  }

  /**
   * @return the total number of blocks
   */
  public int getNumberOfBlocks()
  {
    return this.numberOfBlocks;
  }

  /**
   * @param numberOfBlocks the total number of blocks
   */
  public void setNumberOfBlocks(int numberOfBlocks)
  {
    this.numberOfBlocks = numberOfBlocks;
  }

  /**
   * @return the file name
   */
  public String getFileName()
  {
    return this.fileName;
  }

  /**
   * @param fileName the file name
   */
  public void setFileName(String fileName)
  {
    this.fileName = fileName;
  }

  /**
   * @param filePath the file path
   */
  public void setFilePath(String filePath)
  {
    this.filePath = filePath;
  }

  /**
   * @return the file path
   */
  public String getFilePath()
  {
    return this.filePath;
  }

  /**
   * @return the data offset
   */
  public long getDataOffset()
  {
    return this.dataOffset;
  }

  /**
   * @param offset the data offset
   */
  public void setDataOffset(long offset)
  {
    this.dataOffset = offset;
  }

  /**
   * @return the file length
   */
  public long getFileLength()
  {
    return this.fileLength;
  }

  /**
   * @param fileLength the file length
   */
  public void setFileLength(long fileLength)
  {
    this.fileLength = fileLength;
  }

  /**
   * @return the time at which the file was discovered
   */
  public long getDiscoverTime()
  {
    return this.discoverTime;
  }

  /**
   * @param discoverTime the time at which the file was discovered
   */
  public void setDiscoverTime(long discoverTime)
  {
    this.discoverTime = discoverTime;
  }

  /**
   * @return the ids of the blocks of the file
   */
  public long[] getBlockIds()
  {
    return this.blockIds;
  }

  /**
   * @param blockIds the ids of the blocks of the file
   */
  public void setBlockIds(long[] blockIds)
  {
    this.blockIds = blockIds;
  }

  /**
   * @param isDirectory whether this metadata describes a directory
   */
  public void setDirectory(boolean isDirectory)
  {
    this.isDirectory = isDirectory;
  }

  /**
   * @return true if it is a directory; false otherwise.
   */
  public boolean isDirectory()
  {
    return this.isDirectory;
  }
}
@Deprecated
public static class TimeBasedDirectoryScanner implements Component<Context.OperatorContext>, Runnable
{
/** Default interval between two scans; it is a constant and therefore final. */
private static final long DEF_SCAN_INTERVAL_MILLIS = 5000;
/** Whether sub-directories are scanned as well. */
protected boolean recursive;
/** When set, a scan is started without waiting for the scan interval; cleared when that scan starts. */
protected transient volatile boolean trigger;
/** Last seen modification time of every scanned path, keyed by path. */
@NotNull
protected final Map<String, Long> lastModifiedTimes;
/** Paths that are scanned. */
@NotNull
protected final Set<String> files;
/** Minimum time between the end of one scan and the start of the next one. */
@Min(0)
protected long scanIntervalMillis;
/** Regular expression a path has to match to be accepted; null accepts every path. */
private String filePatternRegularExp;
/** Time at which the last scan completed. */
protected transient long lastScanMillis;
/** File system that is scanned; opened in setup and closed in teardown. */
protected transient FileSystem fs;
/** Files found by the scan thread that were not polled yet. */
protected final transient LinkedBlockingDeque<FileInfo> discoveredFiles;
/** Executor on which the scan loop runs. */
protected final transient ExecutorService scanService;
/** Failure that terminated the scan loop, if any. */
protected final transient AtomicReference<Throwable> atomicThrowable;
/** Whether the scan loop should keep running. */
private transient volatile boolean running;
/** Paths that did not pass {@code acceptFile} and are not looked at again. */
protected final transient HashSet<String> ignoredFiles;
/** Compiled form of the file pattern; null when no pattern is configured. */
protected transient Pattern regex;
/** Time the scan loop sleeps when no scan is due. */
protected transient long sleepMillis;
/**
 * Creates a recursive scanner with the default scan interval, no configured files and a
 * single-threaded executor for the scan loop.
 */
public TimeBasedDirectoryScanner()
{
  this.recursive = true;
  this.scanIntervalMillis = DEF_SCAN_INTERVAL_MILLIS;

  this.files = Sets.newLinkedHashSet();
  this.lastModifiedTimes = Maps.newHashMap();
  this.ignoredFiles = Sets.newHashSet();

  this.discoveredFiles = new LinkedBlockingDeque<FileInfo>();
  this.atomicThrowable = new AtomicReference<Throwable>();
  this.scanService = Executors.newSingleThreadExecutor();
}
/**
 * Reads the sleep time from the context, compiles the file pattern if one is configured, opens the
 * file system and submits the scan loop to the executor.
 *
 * @param context operator context
 */
@Override
public void setup(Context.OperatorContext context)
{
  this.sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);

  final String pattern = this.filePatternRegularExp;
  if (pattern != null) {
    this.regex = Pattern.compile(pattern);
  }

  try {
    this.fs = getFSInstance();
  } catch (IOException ioe) {
    throw new RuntimeException("opening fs", ioe);
  }

  scanService.submit(this);
}
/**
 * Stops the scan loop, shuts the executor down and closes the file system if one was opened.
 *
 * @throws RuntimeException if closing the file system fails
 */
@Override
public void teardown()
{
  running = false;
  scanService.shutdownNow();
  // fs is only opened in setup(), and the owning operator skips the scanner's setup while it is
  // recovering, so teardown can be reached without a file system to close.
  if (fs == null) {
    return;
  }
  try {
    fs.close();
  } catch (IOException e) {
    throw new RuntimeException("closing fs", e);
  }
}
/**
 * Creates a new file system instance for the URI of the first configured path.
 *
 * @return a new file system instance
 * @throws IOException if the file system cannot be created
 */
protected FileSystem getFSInstance() throws IOException
{
  String firstFile = files.iterator().next();
  URI fileSystemUri = new Path(firstFile).toUri();
  return FileSystem.newInstance(fileSystemUri, new Configuration());
}
/**
 * Scan loop. While the scanner is running, all configured paths are scanned whenever a scan was
 * triggered or the scan interval has elapsed since the last scan; otherwise the thread sleeps.
 * A failure stops the loop and is recorded in {@code atomicThrowable} before it is rethrown.
 * An interrupt that arrives after {@link #teardown()} cleared the running flag is a normal shutdown
 * and is not reported as a failure.
 */
@Override
public void run()
{
  running = true;
  try {
    while (running) {
      if (trigger || (System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis)) {
        trigger = false;
        for (String afile : files) {
          scan(new Path(afile), null);
        }
        scanComplete();
      } else {
        Thread.sleep(sleepMillis);
      }
    }
  } catch (Throwable throwable) {
    if (throwable instanceof InterruptedException) {
      // restore the interrupt status that was cleared when the exception was thrown
      Thread.currentThread().interrupt();
      if (!running) {
        // teardown() cleared the flag before interrupting the executor: regular shutdown
        LOG.debug("scanner interrupted during teardown");
        return;
      }
    }
    LOG.error("service", throwable);
    running = false;
    atomicThrowable.set(throwable);
    DTThrowable.rethrow(throwable);
  }
}
/**
 * Called after all configured paths were scanned: flags the most recently queued file, if any, as the
 * last file of the scan and records the completion time.
 */
protected void scanComplete()
{
  LOG.debug("scan complete {}", lastScanMillis);
  final FileInfo newestDiscovered = discoveredFiles.peekLast();
  if (newestDiscovered != null) {
    newestDiscovered.lastFileOfScan = true;
  }
  lastScanMillis = System.currentTimeMillis();
}
/**
* Scans a path and queues its accepted children in {@code discoveredFiles}.<br/>
* The modification time of the path is recorded, and the path is skipped when {@link #skipFile} reports
* it as unchanged. Otherwise every child that is not skipped, not ignored and accepted by
* {@link #acceptFile} is queued; children that are not accepted are added to the ignored files.
* Child directories are scanned recursively when {@code recursive} is set.
*
* @param filePath path to scan
* @param rootPath null when scanning a configured path; for recursive calls the path below which the
* scan started, relative to which the paths of discovered children are expressed
* @throws RuntimeException if the file system fails with an error other than a missing file
*/
protected void scan(@NotNull Path filePath, Path rootPath)
{
try {
FileStatus parentStatus = fs.getFileStatus(filePath);
String parentPathStr = filePath.toUri().getPath();
LOG.debug("scan {}", parentPathStr);
// remember the previously seen modification time before it is overwritten with the current one
Long oldModificationTime = lastModifiedTimes.get(parentPathStr);
lastModifiedTimes.put(parentPathStr, parentStatus.getModificationTime());
if (skipFile(filePath, parentStatus.getModificationTime(), oldModificationTime)) {
return;
}
LOG.debug("scan {}", filePath.toUri().getPath());
FileStatus[] childStatuses = fs.listStatus(filePath);
for (FileStatus status : childStatuses) {
Path childPath = status.getPath();
String childPathStr = childPath.toUri().getPath();
// children are compared against the modification time previously recorded for the scanned path
if (skipFile(childPath, status.getModificationTime(), oldModificationTime)) {
continue;
}
if (status.isDirectory()) {
if (recursive) {
// the root of a recursive scan is the path at which the top-level scan started
scan(childPath, rootPath == null ? parentStatus.getPath() : rootPath);
}
//a directory is treated like any other discovered file.
}
if (ignoredFiles.contains(childPathStr)) {
continue;
}
if (acceptFile(childPathStr)) {
LOG.debug("found {}", childPathStr);
FileInfo info;
// in every case the recorded time is the modification time of the scanned path, not of the child
if (rootPath == null) {
// top-level scan: the scanned path is either a directory containing the child or a single file
info = parentStatus.isDirectory() ?
new FileInfo(parentPathStr, childPath.getName(), parentStatus.getModificationTime()) :
new FileInfo(null, childPathStr, parentStatus.getModificationTime());
} else {
// recursive scan: express the child relative to the root of the scan
URI relativeChildURI = rootPath.toUri().relativize(childPath.toUri());
info = new FileInfo(rootPath.toUri().getPath(), relativeChildURI.getPath(),
parentStatus.getModificationTime());
}
discoveredFiles.add(info);
} else {
// don't look at it again
ignoredFiles.add(childPathStr);
}
}
} catch (FileNotFoundException fnf) {
// a missing path is only logged; the scan of the remaining paths goes on
LOG.warn("Failed to list directory {}", filePath, fnf);
} catch (IOException e) {
throw new RuntimeException("listing files", e);
}
}
/**
 * Decides from the modification times whether a file/directory is skipped.<br/>
 * A path is skipped when a previous modification time is known and the current one is not newer.
 *
 * @param path                 file path
 * @param modificationTime     modification time
 * @param lastModificationTime last cached directory modification time
 * @return true to skip; false otherwise.
 * @throws IOException
 */
protected boolean skipFile(@SuppressWarnings("unused") @NotNull Path path, @NotNull Long modificationTime,
    Long lastModificationTime) throws IOException
{
  return lastModificationTime != null && modificationTime <= lastModificationTime;
}
/**
 * Accepts a path when no pattern is configured or when the whole path matches the pattern.
 *
 * @param filePathStr file path
 * @return true if the path matches the pattern; false otherwise;
 */
protected boolean acceptFile(String filePathStr)
{
  if (regex == null) {
    return true;
  }
  return regex.matcher(filePathStr).matches();
}
/**
 * Removes and returns the oldest discovered file that was not polled yet.
 *
 * @return the next discovered file, or null if there is none
 */
public FileInfo pollFile()
{
  return discoveredFiles.pollFirst();
}
/**
 * Returns the regular expression that paths have to match to be split.
 *
 * @return regular expression
 */
public String getFilePatternRegularExp()
{
  return this.filePatternRegularExp;
}

/**
 * Only files whose path matches the given java regular expression are split.
 *
 * @param filePatternRegexp regular expression
 */
public void setFilePatternRegularExp(String filePatternRegexp)
{
  filePatternRegularExp = filePatternRegexp;
}

/**
 * A comma separated list of directories to scan. If the path is not fully qualified the default
 * file system is used. A fully qualified path can be provided to scan directories in other filesystems.
 * Empty entries are dropped; the remaining entries are added to the paths configured so far.
 *
 * @param files files
 */
public void setFiles(String files)
{
  Iterable<String> entries = Splitter.on(",").omitEmptyStrings().split(files);
  Iterables.addAll(this.files, entries);
}

/**
 * Returns the paths to be scanned as a comma separated list.
 *
 * @return files to be scanned.
 */
public String getFiles()
{
  return Joiner.on(",").join(files);
}

/**
 * Sets whether the scan will be recursive.
 *
 * @param recursive true if recursive; false otherwise.
 */
public void setRecursive(boolean recursive)
{
  this.recursive = recursive;
}

/**
 * Tells whether the scan is recursive.
 *
 * @return true if recursive; false otherwise.
 */
public boolean isRecursive()
{
  return recursive;
}

/**
 * Sets the trigger which will initiate a scan.
 *
 * @param trigger true to start a scan without waiting for the scan interval
 */
public void setTrigger(boolean trigger)
{
  this.trigger = trigger;
}

/**
 * The trigger which will initiate a scan.
 *
 * @return trigger
 */
public boolean isTrigger()
{
  return trigger;
}

/**
 * Returns the frequency with which new files are scanned for in milliseconds.
 *
 * @return The scan interval in milliseconds.
 */
public long getScanIntervalMillis()
{
  return this.scanIntervalMillis;
}

/**
 * Sets the frequency with which new files are scanned for in milliseconds.
 *
 * @param scanIntervalMillis The scan interval in milliseconds.
 */
public void setScanIntervalMillis(long scanIntervalMillis)
{
  this.scanIntervalMillis = scanIntervalMillis;
}
}
/**
 * A file discovered by the time-based scanner: the directory it was found under (if any), its path
 * relative to that directory and the modification time recorded for it.
 */
@Deprecated
protected static class FileInfo
{
  protected final String directoryPath;
  protected final String relativeFilePath;
  protected final long modifiedTime;
  protected transient boolean lastFileOfScan;

  // no-argument constructor; presumably needed for serialization like the other nested types
  private FileInfo()
  {
    this(null, null, -1);
  }

  /**
   * @param directoryPath    directory the file was found under; may be null
   * @param relativeFilePath path of the file relative to the directory, or the full path when there is
   *                         no directory
   * @param modifiedTime     modification time recorded for the file
   */
  protected FileInfo(@Nullable String directoryPath, @NotNull String relativeFilePath, long modifiedTime)
  {
    this.modifiedTime = modifiedTime;
    this.relativeFilePath = relativeFilePath;
    this.directoryPath = directoryPath;
  }

  /**
   * @return directory path
   */
  public String getDirectoryPath()
  {
    return this.directoryPath;
  }

  /**
   * @return path relative to directory
   */
  public String getRelativeFilePath()
  {
    return this.relativeFilePath;
  }

  /**
   * @return full path of the file
   */
  public String getFilePath()
  {
    if (directoryPath != null) {
      Path fullPath = new Path(directoryPath, relativeFilePath);
      return fullPath.toUri().getPath();
    }
    return relativeFilePath;
  }

  /**
   * @return true if this file was the last one queued by a scan
   */
  public boolean isLastFileOfScan()
  {
    return this.lastFileOfScan;
  }
}
/**
 * Keys of the counters maintained by the splitter.
 */
public enum Counters
{
  /** Number of files for which metadata was emitted. */
  PROCESSED_FILES
}
private static final Logger LOG = LoggerFactory.getLogger(FileSplitter.class);
}