/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.controlprogram.parfor;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.parfor.util.Cell;
import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysds.runtime.controlprogram.parfor.util.StagingFileUtils;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.matrix.data.IJV;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.meta.MetaDataFormat;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.FastStringTokenizer;
import org.apache.sysds.runtime.util.HDFSTool;
import org.apache.sysds.runtime.util.LocalFileUtils;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map.Entry;
/**
 * File-based result merge: staging inputs as local files and consolidating them
 * block-wise keeps the memory footprint small at the cost of additional I/O.
 *
 * TODO potential extension: parallel merge (create individual staging files concurrently)
 *
 * NOTE: file merge is typically used due to memory constraints; a parallel merge
 * would increase memory consumption again.
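 *
 * A minimal usage sketch (variable names are hypothetical):
 * <pre>
 * ResultMerge rm = new ResultMergeLocalFile(outMO, inputMOs, outFname, false);
 * MatrixObject merged = rm.executeSerialMerge();
 * </pre>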
*/
public class ResultMergeLocalFile extends ResultMerge
{
private static final long serialVersionUID = -6905893742840020489L;
//NOTE: if we allow simple copies, this might result in a scattered file and many MR tasks for subsequent jobs
public static final boolean ALLOW_COPY_CELLFILES = false;
//sequence for generating unique ids for local staging files
private IDSequence _seq = null;
public ResultMergeLocalFile( MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum )
{
super( out, in, outputFilename, accum );
_seq = new IDSequence();
}
@Override
public MatrixObject executeSerialMerge() {
MatrixObject moNew = null; //always create new matrix object (required for nested parallelism)
if( LOG.isTraceEnabled() )
LOG.trace("ResultMerge (local, file): Execute serial merge for output "
+_output.hashCode()+" (fname="+_output.getFileName()+")");
try
{
//collect all relevant inputs
ArrayList<MatrixObject> inMO = new ArrayList<>();
for( MatrixObject in : _inputs )
{
//check for empty inputs (no iterations executed)
if( in !=null && in != _output )
{
//ensure that input file resides on disk
in.exportData();
//add to merge list
inMO.add( in );
}
}
if( !inMO.isEmpty() )
{
//ensure that output file (for comparison) resides on disk
_output.exportData();
//actual merge
merge( _outputFName, _output, inMO );
//create new output matrix (e.g., to prevent a potential export<->read file access conflict)
moNew = createNewMatrixObject( _output, inMO );
}
else
{
moNew = _output; //return old matrix, to prevent copy
}
}
catch(Exception ex)
{
throw new DMLRuntimeException(ex);
}
//LOG.trace("ResultMerge (local, file): Executed serial merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+") in "+time.stop()+"ms");
return moNew;
}
@Override
public MatrixObject executeParallelMerge(int par) {
//graceful degradation to serial merge
return executeSerialMerge();
}
private MatrixObject createNewMatrixObject(MatrixObject output, ArrayList<MatrixObject> inMO) {
MetaDataFormat metadata = (MetaDataFormat) _output.getMetaData();
MatrixObject moNew = new MatrixObject( _output.getValueType(), _outputFName );
//create deep copy of metadata obj
DataCharacteristics mcOld = metadata.getDataCharacteristics();
MatrixCharacteristics mc = new MatrixCharacteristics(mcOld);
mc.setNonZeros(_isAccum ? -1 : computeNonZeros(output, inMO));
moNew.setMetaData(new MetaDataFormat(mc, metadata.getFileFormat()));
return moNew;
}
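//dispatches to one of four merge strategies based on the output file format (text cell
//vs binary block) and on whether a compare against the original output is required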
private void merge( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO )
{
FileFormat fmt = ((MetaDataFormat)outMo.getMetaData()).getFileFormat();
boolean withCompare = ( outMo.getNnz() != 0 ); //compare if nnz is non-zero or unknown (-1)
if( fmt == FileFormat.TEXT ) {
if(withCompare)
mergeTextCellWithComp(fnameNew, outMo, inMO);
else
mergeTextCellWithoutComp( fnameNew, outMo, inMO );
}
else if( fmt == FileFormat.BINARY ) {
if(withCompare)
mergeBinaryBlockWithComp( fnameNew, outMo, inMO );
else
mergeBinaryBlockWithoutComp( fnameNew, outMo, inMO );
}
}
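//streams the lines of all input text cell files directly into the output file;
//no staging is needed because no compare against the original output is required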
private static void mergeTextCellWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO )
{
try
{
//delete target file if already exists
HDFSTool.deleteFileIfExistOnHDFS(fnameNew);
if( ALLOW_COPY_CELLFILES )
{
copyAllFiles(fnameNew, inMO);
return; //we're done
}
//actual merge
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path( fnameNew );
FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
String valueStr = null;
try
{
for( MatrixObject in : inMO ) //read/write all inputs
{
if( LOG.isTraceEnabled() )
LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()+" (fname="
+in.getFileName()+") via stream merge");
JobConf tmpJob = new JobConf(ConfigurationManager.getCachedJobConf());
Path tmpPath = new Path(in.getFileName());
FileInputFormat.addInputPath(tmpJob, tmpPath);
TextInputFormat informat = new TextInputFormat();
informat.configure(tmpJob);
InputSplit[] splits = informat.getSplits(tmpJob, 1);
LongWritable key = new LongWritable();
Text value = new Text();
for(InputSplit split: splits)
{
RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, tmpJob, Reporter.NULL);
try
{
while(reader.next(key, value))
{
valueStr = value.toString().trim();
out.write( valueStr+"\n" );
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
}
}
}
finally {
IOUtilFunctions.closeSilently(out);
}
}
catch(Exception ex)
{
throw new DMLRuntimeException("Unable to merge text cell results.", ex);
}
}
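//three-phase merge: (0) stage the original output as compare blocks, (1) stage all
//input cells per block and input, (2) consolidate the staged blocks into the result file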
private void mergeTextCellWithComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO )
{
String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
String fnameStagingCompare = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
try
{
//delete target file if already exists
HDFSTool.deleteFileIfExistOnHDFS(fnameNew);
//Step 0) write compare blocks to staging area (if necessary)
if( LOG.isTraceEnabled() )
LOG.trace("ResultMerge (local, file): Create merge compare matrix for output "
+outMo.hashCode()+" (fname="+outMo.getFileName()+")");
createTextCellStagingFile(fnameStagingCompare, outMo, 0);
//Step 1) read and write blocks to staging area
for( MatrixObject in : inMO )
{
if( LOG.isTraceEnabled() )
LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()+" (fname="+in.getFileName()+")");
long ID = _seq.getNextID();
createTextCellStagingFile( fnameStaging, in, ID );
}
//Step 2) read blocks, consolidate, and write to HDFS
createTextCellResultFile(fnameStaging, fnameStagingCompare, fnameNew, (MetaDataFormat)outMo.getMetaData(), true);
}
catch(Exception ex)
{
throw new DMLRuntimeException("Unable to merge text cell results.", ex);
}
LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
LocalFileUtils.cleanupWorkingDirectory(fnameStagingCompare);
}
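//binary block variant of the staged merge: input blocks are written to local
//per-block staging directories and subsequently consolidated into one sequence file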
private void mergeBinaryBlockWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO )
{
String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
try
{
//delete target file if already exists
HDFSTool.deleteFileIfExistOnHDFS(fnameNew);
//Step 1) read and write blocks to staging area
for( MatrixObject in : inMO )
{
if( LOG.isTraceEnabled() )
LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()+" (fname="+in.getFileName()+")");
createBinaryBlockStagingFile( fnameStaging, in );
}
//Step 2) read blocks, consolidate, and write to HDFS
createBinaryBlockResultFile(fnameStaging, null, fnameNew, (MetaDataFormat)outMo.getMetaData(), false);
}
catch(Exception ex)
{
throw new DMLRuntimeException("Unable to merge binary block results.", ex);
}
LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
}
private void mergeBinaryBlockWithComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO )
{
String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
String fnameStagingCompare = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
try
{
//delete target file if already exists
HDFSTool.deleteFileIfExistOnHDFS(fnameNew);
//Step 0) write compare blocks to staging area (if necessary)
if( LOG.isTraceEnabled() )
LOG.trace("ResultMerge (local, file): Create merge compare matrix for output "
+outMo.hashCode()+" (fname="+outMo.getFileName()+")");
createBinaryBlockStagingFile(fnameStagingCompare, outMo);
//Step 1) read and write blocks to staging area
for( MatrixObject in : inMO )
{
if( LOG.isTraceEnabled() )
LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()+" (fname="+in.getFileName()+")");
createBinaryBlockStagingFile( fnameStaging, in );
}
//Step 2) read blocks, consolidate, and write to HDFS
createBinaryBlockResultFile(fnameStaging, fnameStagingCompare, fnameNew, (MetaDataFormat)outMo.getMetaData(), true);
}
catch(Exception ex)
{
throw new DMLRuntimeException("Unable to merge binary block results.", ex);
}
LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
LocalFileUtils.cleanupWorkingDirectory(fnameStagingCompare);
}
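//reads all blocks of the given matrix from its HDFS sequence file(s) and writes each
//non-empty block to a local staging directory named <rowIndex>_<colIndex>, using a
//unique sequence id as file name to keep blocks of different inputs apart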
@SuppressWarnings("deprecation")
private void createBinaryBlockStagingFile( String fnameStaging, MatrixObject mo )
throws IOException
{
MatrixIndexes key = new MatrixIndexes();
MatrixBlock value = new MatrixBlock();
JobConf tmpJob = new JobConf(ConfigurationManager.getCachedJobConf());
Path tmpPath = new Path(mo.getFileName());
FileSystem fs = IOUtilFunctions.getFileSystem(tmpPath, tmpJob);
for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, tmpPath))
{
SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,tmpJob);
try
{
while(reader.next(key, value)) //for each block
{
String lname = key.getRowIndex()+"_"+key.getColumnIndex();
String dir = fnameStaging+"/"+lname;
if( value.getNonZeros()>0 ) //write only non-empty blocks
{
LocalFileUtils.checkAndCreateStagingDir( dir );
LocalFileUtils.writeMatrixBlockToLocal(dir+"/"+_seq.getNextID(), value);
}
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
}
}
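//parses a text cell file (one "row col value" triple per line) into a cell buffer
//and periodically flushes the buffer to per-block staging files once it exceeds
//StagingFileUtils.CELL_BUFFER_SIZE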
private static void createTextCellStagingFile( String fnameStaging, MatrixObject mo, long ID )
throws IOException, DMLRuntimeException
{
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path(mo.getFileName());
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
LinkedList<Cell> buffer = new LinkedList<>();
LongWritable key = new LongWritable();
Text value = new Text();
DataCharacteristics mc = mo.getDataCharacteristics();
int blen = mc.getBlocksize();
//long row = -1, col = -1; //FIXME needs reconsideration whenever textcell is used actively
//NOTE MB: Originally, we used long row, col but this led reproducibly to JIT compilation
// errors at runtime, observed under Windows, Intel x86-64, IBM JDK 64bit/32bit.
// It works fine with int row, col, but we require long for larger matrices.
// Since textcell is never used for result merge (hybrid/hadoop: binaryblock, singlenode: binarycell),
// we propose to exclude it with -Xjit:exclude={package.method*}(count=0,optLevel=0)
FastStringTokenizer st = new FastStringTokenizer(' ');
for(InputSplit split : splits)
{
RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
try
{
while(reader.next(key, value))
{
st.reset( value.toString() ); //reset tokenizer
long row = st.nextLong();
long col = st.nextLong();
double lvalue = Double.parseDouble( st.nextToken() );
Cell tmp = new Cell( row, col, lvalue );
buffer.addLast( tmp );
if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
{
appendCellBufferToStagingArea(fnameStaging, ID, buffer, blen);
buffer.clear();
}
}
//final flush
if( !buffer.isEmpty() )
{
appendCellBufferToStagingArea(fnameStaging, ID, buffer, blen);
buffer.clear();
}
}
finally {
IOUtilFunctions.closeSilently(reader);
}
}
}
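//groups the buffered cells by their 1-based block indexes, translates global cell
//coordinates into 0-based block-local coordinates, and appends each per-block cell
//list to the staging file <blockdir>/<ID>;
//e.g., for blen=1000, the global cell (1500,3) maps to block (2,1), local (499,2)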
private static void appendCellBufferToStagingArea( String fnameStaging, long ID, LinkedList<Cell> buffer, int blen )
throws IOException
{
HashMap<Long,HashMap<Long,LinkedList<Cell>>> sortedBuffer = new HashMap<>();
long brow, bcol, row_offset, col_offset;
for( Cell c : buffer )
{
brow = (c.getRow()-1)/blen + 1;
bcol = (c.getCol()-1)/blen + 1;
row_offset = (brow-1)*blen + 1;
col_offset = (bcol-1)*blen + 1;
c.setRow( c.getRow() - row_offset);
c.setCol(c.getCol() - col_offset);
if( !sortedBuffer.containsKey(brow) )
sortedBuffer.put(brow, new HashMap<Long,LinkedList<Cell>>());
if( !sortedBuffer.get(brow).containsKey(bcol) )
sortedBuffer.get(brow).put(bcol, new LinkedList<Cell>());
sortedBuffer.get(brow).get(bcol).addLast(c);
}
//write lists of cells to local files
for( Entry<Long,HashMap<Long,LinkedList<Cell>>> e : sortedBuffer.entrySet() )
{
brow = e.getKey();
for( Entry<Long,LinkedList<Cell>> e2 : e.getValue().entrySet() )
{
bcol = e2.getKey();
String lname = brow+"_"+bcol;
String dir = fnameStaging+"/"+lname;
LocalFileUtils.checkAndCreateStagingDir( dir );
StagingFileUtils.writeCellListToLocal(dir+"/"+ID, e2.getValue());
}
}
}
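//consolidates the staged partial blocks into a single binary block sequence file:
//for each block index (row-major), the partial blocks are merged (guarded by the
//compare block if present) and appended to the writer; missing blocks are written
//as empty blocks because subsequent operations currently expect all blocks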
@SuppressWarnings("deprecation")
private void createBinaryBlockResultFile( String fnameStaging, String fnameStagingCompare, String fnameNew, MetaDataFormat metadata, boolean withCompare )
throws IOException, DMLRuntimeException
{
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path( fnameNew );
FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
DataCharacteristics mc = metadata.getDataCharacteristics();
long rlen = mc.getRows();
long clen = mc.getCols();
int blen = mc.getBlocksize();
try(SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class))
{
MatrixIndexes indexes = new MatrixIndexes();
for(long brow = 1; brow <= (long)Math.ceil(rlen/(double)blen); brow++)
for(long bcol = 1; bcol <= (long)Math.ceil(clen/(double)blen); bcol++)
{
File dir = new File(fnameStaging+"/"+brow+"_"+bcol);
File dir2 = new File(fnameStagingCompare+"/"+brow+"_"+bcol);
MatrixBlock mb = null;
if( dir.exists() )
{
if( withCompare && dir2.exists() ) //WITH COMPARE BLOCK
{
//copy only values that are different from the original
String[] lnames2 = dir2.list();
if( lnames2.length != 1 ) //there should be exactly 1 compare block
throw new DMLRuntimeException("Unable to merge results because multiple compare blocks found.");
mb = LocalFileUtils.readMatrixBlockFromLocal( dir2+"/"+lnames2[0] );
boolean appendOnly = mb.isInSparseFormat();
DenseBlock compare = DataConverter.convertToDenseBlock(mb, false);
for( String lname : dir.list() ) {
MatrixBlock tmp = LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname );
mergeWithComp(mb, tmp, compare);
}
//sort sparse due to append-only
if( appendOnly && !_isAccum )
mb.sortSparseRows();
//change sparse/dense representation afterwards if required
mb.examSparsity();
}
else //WITHOUT COMPARE BLOCK
{
//copy all non-zeros from all workers
boolean appendOnly = false;
for( String lname : dir.list() ) {
if( mb == null ) {
mb = LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname );
appendOnly = mb.isInSparseFormat();
}
else {
MatrixBlock tmp = LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname );
mergeWithoutComp(mb, tmp, appendOnly);
}
}
//sort sparse due to append-only
if( appendOnly && !_isAccum )
mb.sortSparseRows();
//change sparse/dense representation afterwards if required
mb.examSparsity();
}
}
else {
//NOTE: once the runtime no longer requires all blocks, this can be removed
int maxRow = (int)(((brow-1)*blen + blen < rlen) ? blen : rlen - (brow-1)*blen);
int maxCol = (int)(((bcol-1)*blen + blen < clen) ? blen : clen - (bcol-1)*blen);
mb = new MatrixBlock(maxRow, maxCol, true);
}
//mb.examSparsity(); //done on write anyway and mb not reused
indexes.setIndexes(brow, bcol);
writer.append(indexes, mb);
}
}
}
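//consolidates the staged partial blocks into a single text cell file, emitting
//non-zero cells as 1-based "row col value" lines; if no cell was written at all,
//a marker line (IOUtilFunctions.EMPTY_TEXT_LINE) keeps the output a valid text cell file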
@SuppressWarnings("resource")
private void createTextCellResultFile( String fnameStaging, String fnameStagingCompare, String fnameNew, MetaDataFormat metadata, boolean withCompare )
throws IOException, DMLRuntimeException
{
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path( fnameNew );
FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
DataCharacteristics mc = metadata.getDataCharacteristics();
long rlen = mc.getRows();
long clen = mc.getCols();
int blen = mc.getBlocksize();
try( BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))) ) {
//reuse string builder to prevent repeated buffer re-allocations
StringBuilder sb = new StringBuilder();
boolean written=false;
for(long brow = 1; brow <= (long)Math.ceil(rlen/(double)blen); brow++)
for(long bcol = 1; bcol <= (long)Math.ceil(clen/(double)blen); bcol++)
{
File dir = new File(fnameStaging+"/"+brow+"_"+bcol);
File dir2 = new File(fnameStagingCompare+"/"+brow+"_"+bcol);
MatrixBlock mb = null;
long row_offset = (brow-1)*blen + 1;
long col_offset = (bcol-1)*blen + 1;
if( dir.exists() )
{
if( withCompare && dir2.exists() ) //WITH COMPARE BLOCK
{
//copy only values that are different from the original
String[] lnames2 = dir2.list();
if( lnames2.length != 1 ) //there should be exactly 1 compare block
throw new DMLRuntimeException("Unable to merge results because multiple compare blocks found.");
mb = StagingFileUtils.readCellList2BlockFromLocal( dir2+"/"+lnames2[0], blen );
boolean appendOnly = mb.isInSparseFormat();
DenseBlock compare = DataConverter.convertToDenseBlock(mb, false);
for( String lname : dir.list() ) {
MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, blen );
mergeWithComp(mb, tmp, compare);
}
//sort sparse due to append-only
if( appendOnly && !_isAccum )
mb.sortSparseRows();
//change sparse/dense representation afterwards if required
mb.examSparsity();
}
else //WITHOUT COMPARE BLOCK
{
//copy all non-zeros from all workers
boolean appendOnly = false;
for( String lname : dir.list() ) {
if( mb == null ) {
mb = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, blen );
appendOnly = mb.isInSparseFormat();
}
else {
MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, blen );
mergeWithoutComp(mb, tmp, appendOnly);
}
}
//sort sparse due to append-only
if( appendOnly && !_isAccum )
mb.sortSparseRows();
//change sparse/dense representation afterwards if required
mb.examSparsity();
}
}
//write the block to text cell
if( mb!=null )
{
if( mb.isInSparseFormat() ) {
Iterator<IJV> iter = mb.getSparseBlockIterator();
while( iter.hasNext() ) {
IJV lcell = iter.next();
sb.append(row_offset+lcell.getI());
sb.append(' ');
sb.append(col_offset+lcell.getJ());
sb.append(' ');
sb.append(lcell.getV());
sb.append('\n');
out.write( sb.toString() );
sb.setLength(0);
written = true;
}
}
else {
for( int i=0; i<blen; i++ )
for( int j=0; j<blen; j++ )
{
double lvalue = mb.getValueDenseUnsafe(i, j);
if( lvalue != 0 ) //for nnz
{
sb.append(row_offset+i);
sb.append(' ');
sb.append(col_offset+j);
sb.append(' ');
sb.append(lvalue);
sb.append('\n');
out.write( sb.toString() );
sb.setLength(0);
written = true;
}
}
}
}
}
if( !written )
out.write(IOUtilFunctions.EMPTY_TEXT_LINE);
}
}
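//fast path for ALLOW_COPY_CELLFILES: instead of merging cell by cell, each input
//file or directory is moved (renamed) into the output directory, with a unique
//sequence id appended to avoid name collisions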
private static void copyAllFiles( String fnameNew, ArrayList<MatrixObject> inMO )
throws IOException
{
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
Path path = new Path( fnameNew );
FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
//create output dir
fs.mkdirs(path);
//merge in all input matrix objects
IDSequence seq = new IDSequence();
for( MatrixObject in : inMO )
{
if( LOG.isTraceEnabled() )
LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()
+" (fname="+in.getFileName()+") via file rename.");
//copy over files (just rename file or entire dir)
Path tmpPath = new Path(in.getFileName());
String lname = tmpPath.getName();
fs.rename(tmpPath, new Path(fnameNew+"/"+lname+seq.getNextID()));
}
}
}