blob: cd2bee3d8c26db46231917e54d108bcb8b6eff9c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.io.fs;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.google.common.annotations.VisibleForTesting;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.lib.io.fs.Synchronizer.OutputFileMetadata;
/**
 * File merger specialised for the HDFS file copy use case.
 * <p>
 * When the output file system reports append support and has the same URI as
 * the application file system, block files are stitched together with
 * {@link FileSystem#concat(Path, Path[])} and the result is moved to its final
 * location. In every other case the merge is delegated to {@link FileMerger}.
 *
 * @since 3.4.0
 */
@org.apache.hadoop.classification.InterfaceStability.Evolving
public class HDFSFileMerger extends FileMerger
{
  private static final Logger LOG = LoggerFactory.getLogger(HDFSFileMerger.class);

  /**
   * True when "dfs.support.append" is enabled on the output file system and
   * the output file system URI equals the application file system URI.
   */
  private transient boolean fastMergeActive;

  /**
   * Default block size reported by the output file system for {@code filePath}.
   */
  private transient long defaultBlockSize;

  /**
   * Per-file decision maker for fast merge, based on blocks directory,
   * application file system and block size.
   */
  private transient FastMergerDecisionMaker fastMergerDecisionMaker;

  /**
   * Builds the path of a block file inside the blocks directory. Single place
   * for this so that every method of this class resolves block files the same
   * way.
   *
   * @param blocksDir directory holding the block files
   * @param blockId id of the block; its decimal form is used as the file name
   * @return path of the block file
   */
  private static Path blockPath(String blocksDir, long blockId)
  {
    return new Path(blocksDir, Long.toString(blockId));
  }

  /**
   * Initializations based on output file system configuration.
   *
   * @param context operator context, forwarded to the parent setup
   */
  @Override
  public void setup(OperatorContext context)
  {
    super.setup(context);
    fastMergeActive = outputFS.getConf().getBoolean("dfs.support.append", true)
        && appFS.getUri().equals(outputFS.getUri());
    LOG.debug("appFS.getUri():{}", appFS.getUri());
    LOG.debug("outputFS.getUri():{}", outputFS.getUri());
    defaultBlockSize = outputFS.getDefaultBlockSize(new Path(filePath));
    fastMergerDecisionMaker = new FastMergerDecisionMaker(blocksDirectoryPath, appFS, defaultBlockSize);
  }

  /**
   * Uses fast merge if possible. Else, falls back to the parent's merge.
   * <p>
   * If a block file is missing, {@link #recover(OutputFileMetadata)} is tried
   * and the file is recorded in {@code successfulFiles} or {@code failedFiles}
   * according to the outcome.
   * <p>
   * NOTE(review): whether the caller also records the file after this method
   * returns is not visible from this class - confirm against the parent to
   * rule out double bookkeeping.
   *
   * @param outputFileMetadata metadata of the file to be merged
   * @throws IOException on file system errors
   */
  @Override
  protected void mergeBlocks(OutputFileMetadata outputFileMetadata) throws IOException
  {
    try {
      LOG.debug("fastMergeActive: {}", fastMergeActive);
      // Cheap checks first: the decision maker is consulted only for files
      // that actually have blocks.
      if (fastMergeActive && outputFileMetadata.getNumberOfBlocks() > 0
          && fastMergerDecisionMaker.isFastMergePossible(outputFileMetadata)) {
        LOG.debug("Using fast merge on HDFS.");
        concatBlocks(outputFileMetadata);
        return;
      }
      LOG.debug("Falling back to slow merge on HDFS.");
      super.mergeBlocks(outputFileMetadata);
    } catch (BlockNotFoundException e) {
      // Keep a trace of which block was missing before attempting recovery.
      LOG.warn("Block not found while merging file: {}. Attempting recovery.",
          outputFileMetadata.getRelativePath(), e);
      if (recover(outputFileMetadata)) {
        LOG.debug("Recovery attempt successful.");
        successfulFiles.add(outputFileMetadata);
      } else {
        failedFiles.add(outputFileMetadata);
      }
    }
  }

  /**
   * Fast merge using HDFS block concat: every block after the first one is
   * concatenated onto the first block file, which is then moved to the final
   * output path. The caller must make sure there is at least one block.
   *
   * @param outputFileMetadata metadata of the file to be merged
   * @throws IOException on file system errors
   */
  private void concatBlocks(OutputFileMetadata outputFileMetadata) throws IOException
  {
    Path outputFilePath = new Path(filePath, outputFileMetadata.getRelativePath());
    int numBlocks = outputFileMetadata.getNumberOfBlocks();
    long[] blocksArray = outputFileMetadata.getBlockIds();
    Path firstBlock = blockPath(blocksDirectoryPath, blocksArray[0]);

    if (numBlocks > 1) {
      Path[] blockFiles = new Path[numBlocks - 1]; // Leave the first block
      for (int index = 1; index < numBlocks; index++) {
        blockFiles[index - 1] = blockPath(blocksDirectoryPath, blocksArray[index]);
      }
      outputFS.concat(firstBlock, blockFiles);
    }

    moveToFinalFile(firstBlock, outputFilePath);
  }

  /**
   * Attempt for recovery if block concat is successful but temp file is not
   * moved to final file.
   * <p>
   * If the first block file exists and its length equals the expected file
   * length, it is moved to the final path. If the first block file exists with
   * any other length, recovery fails. If it does not exist (or the metadata has
   * no blocks), recovery succeeds only when the output file is already present.
   *
   * @param outputFileMetadata metadata of the file to be recovered
   * @return true if the output file is in place after this call
   * @throws IOException on file system errors
   */
  @VisibleForTesting
  protected boolean recover(OutputFileMetadata outputFileMetadata) throws IOException
  {
    Path outputFilePath = new Path(filePath, outputFileMetadata.getRelativePath());
    long[] blockIds = outputFileMetadata.getBlockIds();

    // Metadata without blocks has no first block file to inspect; go straight
    // to the check for an already-present output file.
    if (blockIds != null && blockIds.length > 0) {
      Path firstBlockPath = blockPath(blocksDirectoryPath, blockIds[0]);
      if (appFS.exists(firstBlockPath)) {
        FileStatus status = appFS.getFileStatus(firstBlockPath);
        if (status.getLen() == outputFileMetadata.getFileLength()) {
          moveToFinalFile(firstBlockPath, outputFilePath);
          return true;
        }
        LOG.error("Unable to recover in FileMerger for file: {}", outputFilePath);
        return false;
      }
    }

    if (outputFS.exists(outputFilePath)) {
      LOG.debug("Output file already present at the destination, nothing to recover.");
      return true;
    }
    LOG.error("Unable to recover in FileMerger for file: {}", outputFilePath);
    return false;
  }

  /**
   * Utility class to decide fast merge possibility.
   */
  public static class FastMergerDecisionMaker
  {
    private final String blocksDir;
    private final FileSystem appFS;
    private final long defaultBlockSize;

    /**
     * @param blocksDir directory holding the block files
     * @param appFS file system on which the block files are looked up
     * @param defaultBlockSize block size that every block file except the
     *          last one must be a multiple of
     */
    public FastMergerDecisionMaker(String blocksDir, FileSystem appFS, long defaultBlockSize)
    {
      this.blocksDir = blocksDir;
      this.appFS = appFS;
      this.defaultBlockSize = defaultBlockSize;
    }

    /**
     * Checks if fast merge is possible for given settings for blocks
     * directory, application file system, block size.
     * <p>
     * Fast merge is reported as possible when all block files have the same
     * replication factor as the first one and every block file except the
     * last has a length that is a multiple of the default block size.
     * Scanning stops at the first block that violates either condition.
     *
     * @param outputFileMetadata metadata of the file to be merged
     * @return true if both conditions hold for the scanned blocks
     * @throws IOException on file system errors
     * @throws BlockNotFoundException if a block file does not exist
     */
    public boolean isFastMergePossible(OutputFileMetadata outputFileMetadata) throws IOException, BlockNotFoundException
    {
      short replicationFactor = 0;
      boolean sameReplicationFactor = true;
      boolean multipleOfBlockSize = true;

      int numBlocks = outputFileMetadata.getNumberOfBlocks();
      LOG.debug("fileMetadata.getNumberOfBlocks(): {}", numBlocks);
      long[] blocksArray = outputFileMetadata.getBlockIds();
      LOG.debug("fileMetadata.getBlockIds().len: {}", blocksArray.length);

      for (int index = 0; index < numBlocks && (sameReplicationFactor && multipleOfBlockSize); index++) {
        Path blockFilePath = blockPath(blocksDir, blocksArray[index]);
        if (!appFS.exists(blockFilePath)) {
          throw new BlockNotFoundException(blockFilePath);
        }
        FileStatus status = appFS.getFileStatus(blockFilePath);

        if (index == 0) {
          replicationFactor = status.getReplication();
          LOG.debug("replicationFactor: {}", replicationFactor);
        } else {
          sameReplicationFactor = (replicationFactor == status.getReplication());
          LOG.debug("sameReplicationFactor: {}", sameReplicationFactor);
        }

        // The last block is allowed to be shorter than a full block.
        if (index != numBlocks - 1) {
          // A non-positive block size cannot be a divisor; treat it as
          // "not a multiple" instead of failing with ArithmeticException.
          multipleOfBlockSize = defaultBlockSize > 0 && (status.getLen() % defaultBlockSize == 0);
          LOG.debug("multipleOfBlockSize: {}", multipleOfBlockSize);
        }
      }
      return sameReplicationFactor && multipleOfBlockSize;
    }
  }
}