blob: 8f4a6ae76aa1508ebe86ac1c0fdff6737779c8f6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage.rawfile;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.BuiltinStorages;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.FileAppender;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.storage.TableStatistics;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.tuple.memory.MemoryRowBlock;
import org.apache.tajo.tuple.memory.OffHeapRowBlockUtils.TupleConverter;
import org.apache.tajo.tuple.memory.RowWriter;
import org.apache.tajo.tuple.memory.UnSafeTuple;
import org.apache.tajo.unit.StorageUnit;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
public class DirectRawFileWriter extends FileAppender {
private static final Log LOG = LogFactory.getLog(DirectRawFileWriter.class);
public static final String WRITE_BUFFER_SIZE = "tajo.storage.raw.io.write-buffer.bytes";
public static final int DEFAULT_BUFFER_SIZE = 128 * StorageUnit.KB;
private static final float OVERFLOW_RATIO = 1.1f;
protected FileChannel channel;
protected RandomAccessFile randomAccessFile;
protected FSDataOutputStream fos;
protected long pos;
protected TableStatistics stats;
protected TupleConverter tupleConverter;
protected MemoryRowBlock rowBlock;
protected boolean analyzeField;
protected boolean hasExternalBuf;
protected boolean isLocal;
public DirectRawFileWriter(Configuration conf, TaskAttemptId taskAttemptId,
final Schema schema, final TableMeta meta, final Path path)
throws IOException {
this(conf, taskAttemptId, schema, meta, path, null);
}
public DirectRawFileWriter(Configuration conf, TaskAttemptId taskAttemptId,
final Schema schema, final TableMeta meta, final Path path,
MemoryRowBlock rowBlock) throws IOException {
super(conf, taskAttemptId, schema, meta, path);
this.rowBlock = rowBlock;
this.hasExternalBuf = rowBlock != null;
}
@Override
public void init() throws IOException {
File file;
FileSystem fs = path.getFileSystem(conf);
if (fs instanceof LocalFileSystem) {
try {
if (path.toUri().getScheme() != null) {
file = new File(path.toUri());
} else {
file = new File(path.toString());
}
} catch (IllegalArgumentException iae) {
throw new IOException(iae);
}
randomAccessFile = new RandomAccessFile(file, "rw");
channel = randomAccessFile.getChannel();
isLocal = true;
} else {
fos = fs.create(path, true);
isLocal = false;
}
if (tableStatsEnabled) {
this.stats = new TableStatistics(this.schema, columnStatsEnabled);
if (ShuffleType.RANGE_SHUFFLE == PlannerUtil.getShuffleType(
meta.getProperty(StorageConstants.SHUFFLE_TYPE,
PlannerUtil.getShuffleType(ShuffleType.NONE_SHUFFLE)))) {
this.analyzeField = true;
}
}
if (rowBlock == null) {
int bufferSize = (int) (conf.getInt(WRITE_BUFFER_SIZE, DEFAULT_BUFFER_SIZE) * OVERFLOW_RATIO);
rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(schema), bufferSize, true, meta.getDataFormat());
}
tupleConverter = initConverter();
pos = 0;
super.init();
}
public TupleConverter initConverter() {
switch (meta.getDataFormat()) {
case BuiltinStorages.DRAW:
return getDrawConverter();
case BuiltinStorages.RAW:
return getRawConverter();
default:
throw new TajoInternalError(new UnsupportedException());
}
}
private TupleConverter getDrawConverter() {
return new TupleConverter() {
@Override
public void convert(Tuple tuple, RowWriter writer) {
if (analyzeField) {
if (tuple instanceof UnSafeTuple) {
for (int i = 0; i < writer.dataTypes().length; i++) {
// it is to calculate min/max values, and it is only used for the intermediate file.
stats.analyzeField(i, tuple);
}
// write direct to memory
writer.addTuple(tuple);
} else {
writer.startRow();
for (int i = 0; i < writer.dataTypes().length; i++) {
// it is to calculate min/max values, and it is only used for the intermediate file.
stats.analyzeField(i, tuple);
writeField(i, tuple, writer);
}
writer.endRow();
}
} else {
// write direct to memory
writer.addTuple(tuple);
}
}
};
}
private TupleConverter getRawConverter() {
return new TupleConverter() {
@Override
public void convert(Tuple tuple, RowWriter writer) {
writer.startRow();
for (int i = 0; i < writer.dataTypes().length; i++) {
// it is to calculate min/max values, and it is only used for the intermediate file.
if (analyzeField) {
stats.analyzeField(i, tuple);
}
writeField(i, tuple, writer);
}
writer.endRow();
}
};
}
@Override
public long getOffset() throws IOException {
return hasExternalBuf ? pos : pos + rowBlock.getMemory().writerPosition();
}
public void writeRowBlock(MemoryRowBlock rowBlock) throws IOException {
if(isLocal) {
pos += rowBlock.getMemory().writeTo(channel);
} else {
pos += rowBlock.getMemory().writeTo(fos);
}
if (tableStatsEnabled) {
stats.incrementRows(rowBlock.rows());
}
}
@Override
public void addTuple(Tuple t) throws IOException {
tupleConverter.convert(t, rowBlock.getWriter());
if(rowBlock.usedMem() > DEFAULT_BUFFER_SIZE) {
writeRowBlock(rowBlock);
rowBlock.clear();
}
}
@Override
public void flush() throws IOException {
if(!hasExternalBuf && rowBlock.getMemory().isReadable()) {
writeRowBlock(rowBlock);
rowBlock.clear();
}
}
@Override
public void close() throws IOException {
flush();
if (tableStatsEnabled) {
stats.setNumBytes(getOffset());
}
if (LOG.isDebugEnabled()) {
LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path);
}
IOUtils.cleanup(LOG, channel, randomAccessFile, fos);
if(!hasExternalBuf && rowBlock != null) {
rowBlock.release();
}
}
@Override
public TableStats getStats() {
if (tableStatsEnabled) {
stats.setNumBytes(pos);
return stats.getTableStat();
} else {
return null;
}
}
}