blob: 880420a7421d33af157b4bc697ae1641210cd8b2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.broadcast.output;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
import com.google.common.base.Preconditions;
public class FileBasedKVWriter implements KeyValueWriter {
private static final Log LOG = LogFactory.getLog(FileBasedKVWriter.class);
public static final int INDEX_RECORD_LENGTH = 24;
private final Configuration conf;
private int numRecords = 0;
@SuppressWarnings("rawtypes")
private final Class keyClass;
@SuppressWarnings("rawtypes")
private final Class valClass;
private final CompressionCodec codec;
private final FileSystem rfs;
private final IFile.Writer writer;
private final Path outputPath;
private Path indexPath;
private final TezTaskOutput ouputFileManager;
private boolean closed = false;
// Number of output key-value pairs
private final TezCounter outputRecordsCounter;
// Number of bytes of actual output - uncompressed.
private final TezCounter outputBytesCounter;
// Size of the data with additional meta-data
private final TezCounter outputBytesCounterWithOverhead;
// Actual physical size of the data on disk.
private final TezCounter outputMaterializedBytesCounter;
// TODO NEWTEZ Define Counters
// Number of records
// Time waiting for a write to complete, if that's possible.
// Size of key-value pairs written.
public FileBasedKVWriter(TezOutputContext outputContext, Configuration conf) throws IOException {
this.conf = conf;
this.outputRecordsCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
this.outputBytesCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
this.outputBytesCounterWithOverhead = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
this.outputMaterializedBytesCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
this.rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw();
// Setup serialization
keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
// Setup compression
if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
Class<? extends CompressionCodec> codecClass = ConfigUtils
.getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, this.conf);
} else {
codec = null;
}
this.ouputFileManager = TezRuntimeUtils.instantiateTaskOutputManager(conf,
outputContext);
LOG.info("Created KVWriter -> " + "compressionCodec: " + (codec == null ? "NoCompressionCodec"
: codec.getClass().getName()));
this.outputPath = ouputFileManager.getOutputFileForWrite();
LOG.info("Writing data file: " + outputPath);
// TODO NEWTEZ Consider making the buffer size configurable. Also consider
// setting up an in-memory buffer which is occasionally flushed to disk so
// that the output does not block.
// TODO NEWTEZ maybe use appropriate counter
this.writer = new IFile.Writer(conf, rfs, outputPath, keyClass, valClass,
codec, null, outputBytesCounter);
}
/**
* @return true if any output was generated. false otherwise
* @throws IOException
*/
public boolean close() throws IOException {
this.closed = true;
this.writer.close();
long rawLen = writer.getRawLength();
long compLen = writer.getCompressedLength();
outputBytesCounterWithOverhead.increment(rawLen);
outputMaterializedBytesCounter.increment(compLen);
TezIndexRecord rec = new TezIndexRecord(0, rawLen, compLen);
TezSpillRecord sr = new TezSpillRecord(1);
sr.putIndex(rec, 0);
this.indexPath = ouputFileManager
.getOutputIndexFileForWrite(INDEX_RECORD_LENGTH);
LOG.info("Writing index file: " + indexPath);
sr.writeToFile(indexPath, conf);
return numRecords > 0;
}
@Override
public void write(Object key, Object value) throws IOException {
this.writer.append(key, value);
this.outputRecordsCounter.increment(1);
numRecords++;
}
public long getRawLength() {
Preconditions.checkState(closed, "Only available after the Writer has been closed");
return this.writer.getRawLength();
}
public long getCompressedLength() {
Preconditions.checkState(closed, "Only available after the Writer has been closed");
return this.writer.getCompressedLength();
}
public byte[] getData() throws IOException {
Preconditions.checkState(closed,
"Only available after the Writer has been closed");
FSDataInputStream inStream = null;
byte[] buf = null;
try {
inStream = rfs.open(outputPath);
buf = new byte[(int) getCompressedLength()];
IOUtils.readFully(inStream, buf, 0, (int) getCompressedLength());
} finally {
if (inStream != null) {
inStream.close();
}
}
return buf;
}
}