blob: cd2bee3d8c26db46231917e54d108bcb8b6eff9c [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
import java.io.FileNotFoundException;
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.datatorrent.api.Context.OperatorContext;
/**
 * HDFS file merger extends file merger to optimize for the HDFS file copy use case.
 * This uses fast merge from HDFS if destination filesystem is same as
 * application filesystem.
 *
 * @since 3.4.0
 */
public class HDFSFileMerger extends FileMerger
/**
 * Fast merge is possible if append is allowed for output file system.
 * Computed in setup, where it additionally requires the application file
 * system and the output file system to report the same URI.
 */
private transient boolean fastMergeActive;
/**
 * Default block size for output file system, read in setup through
 * outputFS.getDefaultBlockSize for the configured filePath.
 */
private transient long defaultBlockSize;
/**
 * Decision maker to enable fast merge based on blocks directory, application
 * directory, block size. Created in setup and consulted by mergeBlocks.
 */
private transient FastMergerDecisionMaker fastMergerDecisionMaker;
* Initializations based on output file system configuration
public void setup(OperatorContext context)
fastMergeActive = outputFS.getConf().getBoolean("", true)
&& appFS.getUri().equals(outputFS.getUri());
LOG.debug("appFS.getUri():{}", appFS.getUri());
LOG.debug("outputFS.getUri():{}", outputFS.getUri());
defaultBlockSize = outputFS.getDefaultBlockSize(new Path(filePath));
fastMergerDecisionMaker = new FastMergerDecisionMaker(blocksDirectoryPath, appFS, defaultBlockSize);
* Uses fast merge if possible. Else, fall back to serial merge.
protected void mergeBlocks(OutputFileMetadata outputFileMetadata) throws IOException
try {
LOG.debug("fastMergeActive: {}", fastMergeActive);
if (fastMergeActive && fastMergerDecisionMaker.isFastMergePossible(outputFileMetadata)
&& outputFileMetadata.getNumberOfBlocks() > 0) {
LOG.debug("Using fast merge on HDFS.");
LOG.debug("Falling back to slow merge on HDFS.");
} catch (BlockNotFoundException e) {
if (recover(outputFileMetadata)) {
LOG.debug("Recovery attempt successful.");
} else {
* Fast merge using HDFS block concat
* @param outputFileMetadata
* @throws IOException
private void concatBlocks(OutputFileMetadata outputFileMetadata) throws IOException
Path outputFilePath = new Path(filePath, outputFileMetadata.getRelativePath());
int numBlocks = outputFileMetadata.getNumberOfBlocks();
long[] blocksArray = outputFileMetadata.getBlockIds();
Path firstBlock = new Path(blocksDirectoryPath, Long.toString(blocksArray[0]));
if (numBlocks > 1) {
Path[] blockFiles = new Path[numBlocks - 1]; // Leave the first block
for (int index = 1; index < numBlocks; index++) {
blockFiles[index - 1] = new Path(blocksDirectoryPath, Long.toString(blocksArray[index]));
outputFS.concat(firstBlock, blockFiles);
moveToFinalFile(firstBlock, outputFilePath);
* Attempt for recovery if block concat is successful but temp file is not
* moved to final file
* @param outputFileMetadata
* @throws IOException
protected boolean recover(OutputFileMetadata outputFileMetadata) throws IOException
Path firstBlockPath = new Path(blocksDirectoryPath + Path.SEPARATOR + outputFileMetadata.getBlockIds()[0]);
Path outputFilePath = new Path(filePath, outputFileMetadata.getRelativePath());
if (appFS.exists(firstBlockPath)) {
FileStatus status = appFS.getFileStatus(firstBlockPath);
if (status.getLen() == outputFileMetadata.getFileLength()) {
moveToFinalFile(firstBlockPath, outputFilePath);
return true;
LOG.error("Unable to recover in FileMerger for file: {}", outputFilePath);
return false;
if (outputFS.exists(outputFilePath)) {
LOG.debug("Output file already present at the destination, nothing to recover.");
return true;
LOG.error("Unable to recover in FileMerger for file: {}", outputFilePath);
return false;
/** SLF4J logger for this class; also used by the nested FastMergerDecisionMaker. */
private static final Logger LOG = LoggerFactory.getLogger(HDFSFileMerger.class);
* Utility class to decide fast merge possibility
public static class FastMergerDecisionMaker
private String blocksDir;
private FileSystem appFS;
private long defaultBlockSize;
public FastMergerDecisionMaker(String blocksDir, FileSystem appFS, long defaultBlockSize)
this.blocksDir = blocksDir;
this.appFS = appFS;
this.defaultBlockSize = defaultBlockSize;
* Checks if fast merge is possible for given settings for blocks directory,
* application file system, block size
* @param outputFileMetadata
* @throws IOException
* @throws BlockNotFoundException
public boolean isFastMergePossible(OutputFileMetadata outputFileMetadata) throws IOException, BlockNotFoundException
short replicationFactor = 0;
boolean sameReplicationFactor = true;
boolean multipleOfBlockSize = true;
int numBlocks = outputFileMetadata.getNumberOfBlocks();
LOG.debug("fileMetadata.getNumberOfBlocks(): {}", outputFileMetadata.getNumberOfBlocks());
long[] blocksArray = outputFileMetadata.getBlockIds();
LOG.debug("fileMetadata.getBlockIds().len: {}", outputFileMetadata.getBlockIds().length);
for (int index = 0; index < numBlocks && (sameReplicationFactor && multipleOfBlockSize); index++) {
Path blockFilePath = new Path(blocksDir + Path.SEPARATOR + blocksArray[index]);
if (!appFS.exists(blockFilePath)) {
throw new BlockNotFoundException(blockFilePath);
FileStatus status = appFS.getFileStatus(new Path(blocksDir + Path.SEPARATOR + blocksArray[index]));
if (index == 0) {
replicationFactor = status.getReplication();
LOG.debug("replicationFactor: {}", replicationFactor);
} else {
sameReplicationFactor = (replicationFactor == status.getReplication());
LOG.debug("sameReplicationFactor: {}", sameReplicationFactor);
if (index != numBlocks - 1) {
multipleOfBlockSize = (status.getLen() % defaultBlockSize == 0);
LOG.debug("multipleOfBlockSize: {}", multipleOfBlockSize);
return sameReplicationFactor && multipleOfBlockSize;