blob: a6850c14c2b197325a94a2d501f7e1f67ed52125 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage;
import io.netty.buffer.ByteBuf;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.ProtobufDatumFactory;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.rawfile.DirectRawFileWriter;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.BitArray;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
public class RawFile {
private static final Log LOG = LogFactory.getLog(RawFile.class);
public static final String READ_BUFFER_SIZE = "tajo.storage.raw.io.read-buffer.bytes";
public static final String WRITE_BUFFER_SIZE = "tajo.storage.raw.io.write-buffer.bytes";
public static final int DEFAULT_BUFFER_SIZE = 128 * StorageUnit.KB;
public static class RawFileScanner extends FileScanner implements SeekableScanner {
private FileChannel channel;
private DataType[] columnTypes;
private ByteBuffer buffer;
private ByteBuf buf;
private Tuple outTuple;
private int headerSize = 0; // Header size of a tuple
private BitArray nullFlags;
private static final int RECORD_SIZE = 4;
private boolean eos = false;
private long startOffset;
private long endOffset;
private FileInputStream fis;
private long recordCount;
private long totalReadBytes;
private long filePosition;
private boolean forceFillBuffer;
/**
 * Creates a scanner over one fragment of a raw-format file.
 *
 * @param conf     system configuration (read via {@code READ_BUFFER_SIZE} in {@code init()})
 * @param schema   schema of the tuples to decode
 * @param meta     table meta information
 * @param fragment the byte range of the file this scanner covers
 * @throws IOException propagated from the superclass constructor
 */
public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException {
  super(conf, schema, meta, fragment);
}
@Override
public void init() throws IOException {
  // Resolve the fragment path to a local java.io.File. A scheme-qualified
  // path (e.g. "file:///...") must go through the URI constructor; a bare
  // path string is used directly.
  File file;
  try {
    if (fragment.getPath().toUri().getScheme() != null) {
      file = new File(fragment.getPath().toUri());
    } else {
      file = new File(fragment.getPath().toString());
    }
  } catch (IllegalArgumentException iae) {
    throw new IOException(iae);
  }
  fis = new FileInputStream(file);
  channel = fis.getChannel();
  filePosition = startOffset = fragment.getStartKey();
  endOffset = fragment.getEndKey();
  if (LOG.isDebugEnabled()) {
    LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", file size :" + channel.size()
        + ", fragment length :" + fragment.getLength());
  }
  // Lazily allocate the pooled direct buffer and its nio view; records are
  // encoded little-endian.
  if(buf == null) {
    buf = BufferPool.directBuffer(conf.getInt(READ_BUFFER_SIZE, DEFAULT_BUFFER_SIZE)).order(ByteOrder.LITTLE_ENDIAN);
    buffer = buf.nioBuffer(0, buf.capacity());
  }
  // Cache column types so next() avoids a schema lookup per field.
  columnTypes = new DataType[schema.size()];
  for (int i = 0; i < schema.size(); i++) {
    columnTypes[i] = schema.getColumn(i).getDataType();
  }
  outTuple = new VTuple(columnTypes.length);
  nullFlags = new BitArray(schema.size());
  headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); // The middle 2 bytes is for NullFlagSize
  // initial set position
  if (fragment.getStartKey() > 0) {
    channel.position(fragment.getStartKey());
  }
  // Mark the buffer as stale so the first next()/seek() refills it.
  forceFillBuffer = true;
  super.init();
}
/**
 * Returns the file offset of the next record to be read: the channel-side
 * position minus whatever is still buffered but unconsumed.
 */
@Override
public long getNextOffset() throws IOException {
  // Right after a forced refill is scheduled the buffer contents are stale,
  // so the next record starts exactly at filePosition.
  if (forceFillBuffer) {
    return filePosition;
  }
  return filePosition - buffer.remaining();
}
@Override
public void seek(long offset) throws IOException {
  eos = false;
  filePosition = channel.position();
  // do not fill the buffer if the offset is already included in the buffer.
  // filePosition is the channel position, i.e. the offset just past the
  // buffered bytes, so the buffered window is [filePosition - limit, filePosition).
  if(!forceFillBuffer && filePosition > offset && offset > filePosition - buffer.limit()){
    buffer.position((int)(offset - (filePosition - buffer.limit())));
  } else {
    // Target lies outside the buffered window: validate it against the
    // fragment range, reposition the channel, and force a fresh fill.
    if(offset < startOffset || offset > startOffset + fragment.getLength()){
      throw new IndexOutOfBoundsException(String.format("range(%d, %d), offset: %d",
          startOffset, startOffset + fragment.getLength(), offset));
    }
    channel.position(offset);
    filePosition = offset;
    buffer.clear();
    forceFillBuffer = true;
    fillBuffer();
  }
}
/**
 * Reads more bytes from the channel into the buffer, preserving any
 * still-unread bytes. Returns false (and sets eos) on end of file.
 */
private boolean fillBuffer() throws IOException {
  // Unless a forced refill was requested, keep the unread tail: compact()
  // moves the remaining bytes to the front and switches to write mode so
  // the channel read appends after them.
  if(!forceFillBuffer) buffer.compact();
  int bytesRead = channel.read(buffer);
  forceFillBuffer = false;
  if (bytesRead == -1) {
    eos = true;
    return false;
  } else {
    buffer.flip(); // back to read mode: limit = bytes available, position = 0
    filePosition += bytesRead;
    totalReadBytes += bytesRead;
    return true;
  }
}
/**
 * Decode a ZigZag-encoded 32-bit value. ZigZag maps signed integers onto
 * unsigned ones so that small magnitudes (positive or negative) encode into
 * few varint bytes; without it, negative values would always occupy 10 bytes
 * on the wire.
 *
 * @param n An unsigned 32-bit integer, stored in a signed int because
 * Java has no explicit unsigned support.
 * @return A signed 32-bit integer.
 */
public static int decodeZigZag32(final int n) {
  final int magnitude = n >>> 1;
  // All-ones mask when the low (sign) bit is set, zero otherwise.
  final int signMask = -(n & 1);
  return magnitude ^ signMask;
}
/**
 * Decode a ZigZag-encoded 64-bit value. ZigZag maps signed integers onto
 * unsigned ones so that small magnitudes (positive or negative) encode into
 * few varint bytes; without it, negative values would always occupy 10 bytes
 * on the wire.
 *
 * @param n An unsigned 64-bit integer, stored in a signed long because
 * Java has no explicit unsigned support.
 * @return A signed 64-bit integer.
 */
public static long decodeZigZag64(final long n) {
  // Low bit carries the sign; the remaining bits carry the magnitude.
  final long signMask = -(n & 1);
  return (n >>> 1) ^ signMask;
}
/**
 * Read a raw Varint from the stream. If larger than 32 bits, discard the
 * upper bits.
 */
public int readRawVarint32() throws IOException {
  // Unrolled protobuf-style varint decode: each byte contributes 7 low bits;
  // a set high bit (byte reads as negative) means another byte follows.
  byte tmp = buffer.get();
  if (tmp >= 0) {
    // Fast path: single-byte varint.
    return tmp;
  }
  int result = tmp & 0x7f;
  if ((tmp = buffer.get()) >= 0) {
    result |= tmp << 7;
  } else {
    result |= (tmp & 0x7f) << 7;
    if ((tmp = buffer.get()) >= 0) {
      result |= tmp << 14;
    } else {
      result |= (tmp & 0x7f) << 14;
      if ((tmp = buffer.get()) >= 0) {
        result |= tmp << 21;
      } else {
        result |= (tmp & 0x7f) << 21;
        result |= (tmp = buffer.get()) << 28;
        if (tmp < 0) {
          // Discard upper 32 bits.
          // A well-formed varint is at most 10 bytes; anything longer is corrupt.
          for (int i = 0; i < 5; i++) {
            if (buffer.get() >= 0) {
              return result;
            }
          }
          throw new IOException("Invalid Variable int32");
        }
      }
    }
  }
  return result;
}
/**
 * Reads a varint-encoded 64-bit value from the read buffer. Each byte
 * contributes 7 low-order bits; a set high bit means another byte follows.
 */
public long readRawVarint64() throws IOException {
  long value = 0;
  for (int shift = 0; shift < 64; shift += 7) {
    final byte chunk = buffer.get();
    value |= (long) (chunk & 0x7F) << shift;
    if ((chunk & 0x80) == 0) {
      return value;
    }
  }
  // More than 10 continuation bytes: the stream is corrupt.
  throw new IOException("Invalid Variable int64");
}
/**
 * Decodes and returns the next tuple, or {@code null} at end of data.
 * The returned tuple is a shared instance overwritten by the next call.
 */
@Override
public Tuple next() throws IOException {
  if(eos) return null;
  // Make sure at least a full record header is buffered before decoding.
  if (forceFillBuffer || buffer.remaining() < headerSize) {
    if (!fillBuffer()) {
      return null;
    }
  }
  // backup the buffer state
  int bufferLimit = buffer.limit();
  int recordSize = buffer.getInt();
  int nullFlagSize = buffer.getShort();
  // Temporarily narrow the limit so BitArray consumes exactly the null flags.
  buffer.limit(buffer.position() + nullFlagSize);
  nullFlags.fromByteBuffer(buffer);
  // restore the start of record contents
  buffer.limit(bufferLimit);
  // Ensure the full record body is buffered; grow the buffer first when the
  // record is larger than the current capacity allows.
  if (buffer.remaining() < (recordSize - headerSize)) {
    //if the buffer reaches the writable size, the buffer increase the record size
    reSizeBuffer(recordSize);
    if (!fillBuffer()) {
      return null;
    }
  }
  for (int i = 0; i < columnTypes.length; i++) {
    // check if the i'th column is null
    if (nullFlags.get(i)) {
      outTuple.put(i, DatumFactory.createNullDatum());
      continue;
    }
    switch (columnTypes[i].getType()) {
    case BOOLEAN:
      outTuple.put(i, DatumFactory.createBool(buffer.get()));
      break;
    case BIT:
      outTuple.put(i, DatumFactory.createBit(buffer.get()));
      break;
    case CHAR:
      // Length-prefixed (varint) byte sequence.
      int realLen = readRawVarint32();
      byte[] buf = new byte[realLen];
      buffer.get(buf);
      outTuple.put(i, DatumFactory.createChar(buf));
      break;
    case INT2:
      outTuple.put(i, DatumFactory.createInt2(buffer.getShort()));
      break;
    case INT4:
      // INT4/INT8 are stored as ZigZag-encoded varints.
      outTuple.put(i, DatumFactory.createInt4(decodeZigZag32(readRawVarint32())));
      break;
    case INT8:
      outTuple.put(i, DatumFactory.createInt8(decodeZigZag64(readRawVarint64())));
      break;
    case FLOAT4:
      outTuple.put(i, DatumFactory.createFloat4(buffer.getFloat()));
      break;
    case FLOAT8:
      outTuple.put(i, DatumFactory.createFloat8(buffer.getDouble()));
      break;
    case TEXT: {
      int len = readRawVarint32();
      byte[] strBytes = new byte[len];
      buffer.get(strBytes);
      outTuple.put(i, DatumFactory.createText(strBytes));
      break;
    }
    case BLOB: {
      int len = readRawVarint32();
      byte[] rawBytes = new byte[len];
      buffer.get(rawBytes);
      outTuple.put(i, DatumFactory.createBlob(rawBytes));
      break;
    }
    case PROTOBUF: {
      int len = readRawVarint32();
      byte[] rawBytes = new byte[len];
      buffer.get(rawBytes);
      outTuple.put(i, ProtobufDatumFactory.createDatum(columnTypes[i], rawBytes));
      break;
    }
    case DATE: {
      // val == Integer.MIN_VALUE (the only value < MIN_VALUE + 1) marks null.
      int val = buffer.getInt();
      if (val < Integer.MIN_VALUE + 1) {
        outTuple.put(i, DatumFactory.createNullDatum());
      } else {
        outTuple.put(i, DatumFactory.createFromInt4(columnTypes[i], val));
      }
      break;
    }
    case TIME:
    case TIMESTAMP: {
      // Long.MIN_VALUE is the on-disk null marker for TIME/TIMESTAMP.
      long val = buffer.getLong();
      if (val < Long.MIN_VALUE + 1) {
        outTuple.put(i, DatumFactory.createNullDatum());
      } else {
        outTuple.put(i, DatumFactory.createFromInt8(columnTypes[i], val));
      }
      break;
    }
    case NULL_TYPE:
      outTuple.put(i, NullDatum.get());
      break;
    default:
    }
  }
  recordCount++;
  // The scan ends once the consumed file offset reaches the fragment end.
  if(filePosition - buffer.remaining() >= endOffset){
    eos = true;
  }
  return outTuple;
}
/**
 * Grows the read buffer so a record of {@code writableBytes} can fit.
 * No-op when discarding already-consumed bytes would free enough room.
 */
private void reSizeBuffer(int writableBytes){
  if (buffer.capacity() - buffer.remaining() < writableBytes) {
    // Mirror the nio view's read window into the netty indices, drop the
    // consumed prefix, and ensure capacity for the requested record.
    buf.setIndex(buffer.position(), buffer.limit());
    buf.markReaderIndex();
    buf.discardReadBytes();
    buf.ensureWritable(writableBytes);
    // Re-derive the nio view over the (possibly reallocated) memory and
    // restore the read window over the retained unread bytes.
    buffer = buf.nioBuffer(0, buf.capacity());
    buffer.limit(buf.writerIndex());
  }
}
/**
 * Rewinds the scanner to the fragment start and invalidates all buffered
 * state, so the next call to next() re-reads from the beginning.
 */
@Override
public void reset() throws IOException {
  eos = false;
  recordCount = 0;
  filePosition = fragment.getStartKey();
  channel.position(filePosition);
  // Drop buffered bytes and force a refill on the next read.
  buffer.clear();
  forceFillBuffer = true;
}
/**
 * Releases the pooled direct buffer and closes the channel and stream.
 * Safe to call when the buffer was never allocated.
 */
@Override
public void close() throws IOException {
  ByteBuf pooled = buf;
  if (pooled != null) {
    buf = null;
    buffer.clear();
    buffer = null;
    // Return the direct buffer to the pool.
    pooled.release();
  }
  IOUtils.cleanup(LOG, channel, fis);
}
@Override
public boolean isProjectable() {
  // Column projection is not supported; full tuples are always decoded.
  return false;
}
@Override
public boolean isSelectable() {
  // Scanner-side predicate evaluation is not supported.
  return false;
}
@Override
public void setFilter(EvalNode filter) {
  // Consistent with isSelectable() == false: filters cannot be pushed down.
  throw new TajoRuntimeException(new UnsupportedException());
}
@Override
public boolean isSplittable(){
  // Records are variable-length with no sync markers, so the file cannot be
  // split into independently scannable fragments.
  return false;
}
/**
 * Returns the table statistics updated with this scanner's progress, or
 * null when no statistics object is attached.
 */
@Override
public TableStats getInputStats() {
  if (tableStats == null) {
    return null;
  }
  tableStats.setNumRows(recordCount);
  // Actual bytes read so far, counting re-reads across rescans.
  tableStats.setReadBytes(totalReadBytes);
  tableStats.setNumBytes(fragment.getLength());
  return tableStats;
}
/**
 * Returns scan progress in [0, 1]: 1 at end of stream, otherwise the
 * fraction of the fragment consumed so far (capped at 1).
 */
@Override
public float getProgress() {
  if (eos) {
    return 1.0f;
  }
  final long consumed = filePosition - startOffset;
  if (consumed == 0) {
    return 0.0f;
  }
  return Math.min(1.0f, (float) consumed / fragment.getLength());
}
}
/**
 * Appender for the RAW file format. All write logic lives in
 * {@link DirectRawFileWriter}; this subclass only adapts the constructor
 * signature (passing a null path argument to the superclass) — presumably
 * the shape expected by the storage handler's appender factory; verify
 * against the caller.
 */
public static class RawFileAppender extends DirectRawFileWriter {
  public RawFileAppender(Configuration conf, TaskAttemptId taskAttemptId, Schema schema,
      TableMeta meta, Path workDir) throws IOException {
    super(conf, taskAttemptId, schema, meta, workDir, null);
  }
}
}