/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.lib.io.fs;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
import com.datatorrent.lib.io.fs.AbstractFileSplitter.FileMetadata;
import com.datatorrent.lib.io.fs.FileStitcher.BlockNotFoundException;

/**
 * Synchronizer waits for all data blocks of a file to be written to disk. It
 * sends a trigger to merge the file only after all of its blocks have been
 * written to HDFS.
 *
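 * <p>
 * A minimal DAG wiring sketch: the splitter, block writer, and stitcher
 * operators and their port names below are illustrative assumptions; only
 * {@code filesMetadataInput}, {@code blocksMetadataInput}, and {@code trigger}
 * are defined by this operator.
 *
 * <pre>{@code
 * Synchronizer synchronizer = dag.addOperator("synchronizer", new Synchronizer());
 * dag.addStream("filesMetadata", fileSplitter.filesMetadataOutput, synchronizer.filesMetadataInput);
 * dag.addStream("blocksMetadata", blockWriter.blockMetadataOutput, synchronizer.blocksMetadataInput);
 * dag.addStream("mergeTrigger", synchronizer.trigger, fileStitcher.input);
 * }</pre>
 *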
 * @since 3.4.0
 */
public class Synchronizer extends BaseOperator
{
  /**
   * Map from file path to the {@link FileMetadata} for that file
   */
  private Map<String, FileMetadata> fileMetadataMap = Maps.newHashMap();

  /**
   * Map from file path to the blockId to {@link BlockMetadata.FileBlockMetadata}
   * mapping of the blocks received so far for that file
   */
  private Map<String, Map<Long, BlockMetadata.FileBlockMetadata>> fileToReceivedBlocksMetadataMap = Maps.newHashMap();

  @Override
  public void setup(Context.OperatorContext context)
  {
    super.setup(context);
  }

  @Override
  public void beginWindow(long windowId)
  {
    super.beginWindow(windowId);
  }

  @Override
  public void endWindow()
  {
  }

  /**
   * Input port receiving {@link FileMetadata} for each discovered file
   */
  public final transient DefaultInputPort<FileMetadata> filesMetadataInput = new DefaultInputPort<FileMetadata>()
  {
    @Override
    public void process(FileMetadata fileMetadata)
    {
      String filePath = fileMetadata.getFilePath();
      Map<Long, BlockMetadata.FileBlockMetadata> receivedBlocksMetadata = getReceivedBlocksMetadata(filePath);
      fileMetadataMap.put(filePath, fileMetadata);
      emitTriggerIfAllBlocksReceived(fileMetadata, receivedBlocksMetadata);
    }
  };

  /**
   * Input port receiving {@link BlockMetadata.FileBlockMetadata} for each
   * block written to disk
   */
  public final transient DefaultInputPort<BlockMetadata.FileBlockMetadata> blocksMetadataInput = new DefaultInputPort<BlockMetadata.FileBlockMetadata>()
  {
    @Override
    public void process(BlockMetadata.FileBlockMetadata blockMetadata)
    {
      String filePath = blockMetadata.getFilePath();
      LOG.debug("received blockId {} for file {}", blockMetadata.getBlockId(), filePath);
      Map<Long, BlockMetadata.FileBlockMetadata> receivedBlocksMetadata = getReceivedBlocksMetadata(filePath);
      receivedBlocksMetadata.put(blockMetadata.getBlockId(), blockMetadata);
      FileMetadata fileMetadata = fileMetadataMap.get(filePath);
      if (fileMetadata != null) {
        emitTriggerIfAllBlocksReceived(fileMetadata, receivedBlocksMetadata);
      }
    }
  };

  /**
   * Checks whether all blocks for the given file have been received and sends
   * the merge trigger once they have. For example, a file split into blocks
   * with ids 1, 2, and 3 is emitted only after the metadata for all three
   * blocks has arrived.
   *
   * @param fileMetadata metadata of the file being synchronized
   * @param receivedBlocksMetadata metadata of the blocks received so far for this file
   */
  private void emitTriggerIfAllBlocksReceived(FileMetadata fileMetadata,
      Map<Long, BlockMetadata.FileBlockMetadata> receivedBlocksMetadata)
  {
    String filePath = fileMetadata.getFilePath();
    if (receivedBlocksMetadata.size() != fileMetadata.getNumberOfBlocks()) {
      //Some blocks are yet to be received
      fileMetadataMap.put(filePath, fileMetadata);
    } else {
      //Number of received blocks matches the number of file blocks
      Set<Long> receivedBlocks = receivedBlocksMetadata.keySet();
      boolean blockMissing = false;
      if (!fileMetadata.isDirectory()) {
        for (long blockId : fileMetadata.getBlockIds()) {
          if (!receivedBlocks.contains(blockId)) {
            blockMissing = true;
            break;
          }
        }
      }
      if (!blockMissing) {
        //All blocks received; emit the file metadata
        long fileProcessingTime = System.currentTimeMillis() - fileMetadata.getDiscoverTime();
        List<StitchBlock> outputBlocks = constructOutputBlockMetadataList(fileMetadata);
        OutputFileMetadata outputFileMetadata = new OutputFileMetadata(fileMetadata, outputBlocks);
        trigger.emit(outputFileMetadata);
        LOG.debug("Total time taken to process the file {} is {} ms", fileMetadata.getFilePath(), fileProcessingTime);
        //Clean up per-file state so the maps do not grow without bound
        fileMetadataMap.remove(filePath);
        fileToReceivedBlocksMetadataMap.remove(filePath);
      }
    }
  }

  /**
   * Builds the list of {@link StitchBlock}s in the file's original block
   * sequence; the list is empty for directories.
   */
  private List<StitchBlock> constructOutputBlockMetadataList(FileMetadata fileMetadata)
  {
    String filePath = fileMetadata.getFilePath();
    Map<Long, BlockMetadata.FileBlockMetadata> receivedBlocksMetadata = fileToReceivedBlocksMetadataMap.get(filePath);
    List<StitchBlock> outputBlocks = Lists.newArrayList();
    if (fileMetadata.isDirectory()) {
      return outputBlocks;
    }
    long[] blockIDs = fileMetadata.getBlockIds();
    for (int i = 0; i < blockIDs.length; i++) {
      long blockId = blockIDs[i];
      StitchBlockMetaData outputFileBlockMetaData = new StitchBlockMetaData(receivedBlocksMetadata.get(blockId),
          fileMetadata.getRelativePath(), (i == blockIDs.length - 1));
      outputBlocks.add(outputFileBlockMetaData);
    }
    return outputBlocks;
  }

  private Map<Long, BlockMetadata.FileBlockMetadata> getReceivedBlocksMetadata(String filePath)
  {
    Map<Long, BlockMetadata.FileBlockMetadata> receivedBlocksMetadata = fileToReceivedBlocksMetadataMap.get(filePath);
    if (receivedBlocksMetadata == null) {
      //No blocks received so far for this file
      receivedBlocksMetadata = new HashMap<Long, BlockMetadata.FileBlockMetadata>();
      fileToReceivedBlocksMetadataMap.put(filePath, receivedBlocksMetadata);
    }
    return receivedBlocksMetadata;
  }

  private static final Logger LOG = LoggerFactory.getLogger(Synchronizer.class);

  /**
   * Output port emitting {@link OutputFileMetadata} once all blocks of a file
   * have been received
   */
  public final transient DefaultOutputPort<OutputFileMetadata> trigger = new DefaultOutputPort<OutputFileMetadata>();

  /**
   * FileStitcher needs information about the original sequence of blocks.
   * {@link OutputFileMetadata} maintains a list of {@link StitchBlock}s in
   * addition to {@link FileMetadata}.
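   * <p>
   * A consumer-side sketch; the {@code process} call is a hypothetical
   * placeholder for whatever the downstream operator does with each block:
   *
   * <pre>{@code
   * String relativePath = outputFileMetadata.getStitchedFileRelativePath();
   * for (StitchBlock block : outputFileMetadata.getStitchBlocksList()) {
   *   process(block.getBlockId()); //blocks are in the file's original order
   * }
   * }</pre>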
   */
  public static class OutputFileMetadata extends FileMetadata implements StitchedFileMetaData
  {
    private List<StitchBlock> stitchBlocksList;

    protected OutputFileMetadata()
    {
      super();
      stitchBlocksList = Lists.newArrayList();
    }

    protected OutputFileMetadata(FileMetadata fileMetaData, List<StitchBlock> stitchBlocksList)
    {
      super(fileMetaData);
      this.stitchBlocksList = stitchBlocksList;
    }

    public OutputFileMetadata(@NotNull String filePath)
    {
      super(filePath);
      //initialize here as well so getStitchBlocksList() never returns null
      stitchBlocksList = Lists.newArrayList();
    }

    @Override
    public String getStitchedFileRelativePath()
    {
      return getRelativePath();
    }

    @Override
    public List<StitchBlock> getStitchBlocksList()
    {
      return stitchBlocksList;
    }

    /**
     * @param outputBlockMetaDataList
     *          the outputBlockMetaDataList to set
     */
    public void setOutputBlockMetaDataList(List<StitchBlock> outputBlockMetaDataList)
    {
      this.stitchBlocksList = outputBlockMetaDataList;
    }
  }

  /**
   * StitchedFileMetaData is used by FileStitcher to define the constituents of
   * the output file.
   */
  public interface StitchedFileMetaData
  {
    /**
     * @return the relative path of the stitched output file
     */
    String getStitchedFileRelativePath();

    List<StitchBlock> getStitchBlocksList();
  }

  /**
   * A chunk of data that can be written to the stitched output file
   */
  public interface StitchBlock
  {
    void writeTo(FileSystem blocksFS, String blocksDir, OutputStream outputStream)
        throws IOException, BlockNotFoundException;

    long getBlockId();
  }

  /**
   * {@link StitchBlock} implementation representing a chunk of data from a
   * block file
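   * <p>
   * A minimal write sketch, assuming block files are stored under a blocks
   * directory named by block id; the file system handles and paths below are
   * illustrative:
   *
   * <pre>{@code
   * try (FSDataOutputStream out = outputFS.create(new Path("/output/merged"))) {
   *   for (StitchBlock block : outputFileMetadata.getStitchBlocksList()) {
   *     block.writeTo(appFS, "/app/blocks", out);
   *   }
   * }
   * }</pre>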
   */
  public static class StitchBlockMetaData extends FileBlockMetadata implements StitchBlock
  {
    /**
     * Buffer size to be used for reading data from block files
     */
    public static final int BUFFER_SIZE = 64 * 1024;

    /**
     * Relative path of the source file (w.r.t. the input directory)
     */
    String sourceRelativePath;

    /**
     * Whether this is the last block of the source file
     */
    boolean isLastBlockSource;

    /**
     * Default constructor for serialization
     */
    public StitchBlockMetaData()
    {
      super();
    }

    /**
     * @param fmd
     *          block metadata of the underlying block file
     * @param sourceRelativePath
     *          relative path of the source file
     * @param isLastBlockSource
     *          whether this is the last block of the source file
     */
    public StitchBlockMetaData(FileBlockMetadata fmd, String sourceRelativePath, boolean isLastBlockSource)
    {
      super(fmd.getFilePath(), fmd.getBlockId(), fmd.getOffset(), fmd.getLength(), fmd.isLastBlock(),
          fmd.getPreviousBlockId());
      this.sourceRelativePath = sourceRelativePath;
      this.isLastBlockSource = isLastBlockSource;
    }

    /**
     * @return the sourceRelativePath
     */
    public String getSourceRelativePath()
    {
      return sourceRelativePath;
    }

    /**
     * @param sourceRelativePath
     *          the sourceRelativePath to set
     */
    public void setSourceRelativePath(String sourceRelativePath)
    {
      this.sourceRelativePath = sourceRelativePath;
    }

    /**
     * @return the isLastBlockSource
     */
    public boolean isLastBlockSource()
    {
      return isLastBlockSource;
    }

    /**
     * @param isLastBlockSource
     *          the isLastBlockSource to set
     */
    public void setLastBlockSource(boolean isLastBlockSource)
    {
      this.isLastBlockSource = isLastBlockSource;
    }

    @Override
    public void writeTo(FileSystem appFS, String blocksDir, OutputStream outputStream)
        throws IOException, BlockNotFoundException
    {
      Path blockPath = new Path(blocksDir, Long.toString(getBlockId()));
      if (!appFS.exists(blockPath)) {
        throw new BlockNotFoundException(blockPath);
      }
      writeTo(appFS, blocksDir, outputStream, 0, appFS.getFileStatus(blockPath).getLen());
    }

    /**
     * Writes {@code length} bytes of this block, starting at {@code offset},
     * to the given output stream.
     */
    public void writeTo(FileSystem appFS, String blocksDir, OutputStream outputStream, long offset, long length)
        throws IOException, BlockNotFoundException
    {
      Path blockPath = new Path(blocksDir, Long.toString(getBlockId()));
      if (!appFS.exists(blockPath)) {
        throw new BlockNotFoundException(blockPath);
      }
      byte[] buffer = new byte[BUFFER_SIZE];
      InputStream inStream = appFS.open(blockPath);
      try {
        //skip() may skip fewer bytes than requested, so loop until the offset is reached
        long bytesToSkip = offset;
        while (bytesToSkip > 0) {
          long skipped = inStream.skip(bytesToSkip);
          if (skipped <= 0) {
            throw new IOException("Could not skip to offset " + offset + " in block " + blockPath);
          }
          bytesToSkip -= skipped;
        }
        //compute the chunk size in long arithmetic before casting so that
        //lengths larger than Integer.MAX_VALUE do not overflow
        long bytesRemainingToRead = length;
        int bytesToRead = (int)Math.min(BUFFER_SIZE, bytesRemainingToRead);
        int inputBytesRead;
        while (bytesRemainingToRead > 0 && (inputBytesRead = inStream.read(buffer, 0, bytesToRead)) != -1) {
          outputStream.write(buffer, 0, inputBytesRead);
          bytesRemainingToRead -= inputBytesRead;
          bytesToRead = (int)Math.min(BUFFER_SIZE, bytesRemainingToRead);
        }
      } finally {
        inStream.close();
      }
    }
  }
}