blob: 958e1c60d90131d981b27ba35c8e3c9f710c77b7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.BitArray;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
public class RowFile {
public static final Log LOG = LogFactory.getLog(RowFile.class);
private static final int SYNC_ESCAPE = -1;
private static final int SYNC_HASH_SIZE = 16;
private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE;
private final static int DEFAULT_BUFFER_SIZE = 65535;
public static int SYNC_INTERVAL;
/**
 * Scanner for the RowFile on-disk format.
 *
 * <p>File layout (as produced by {@code RowFileAppender}): a 4-byte sync
 * interval followed by a 16-byte sync marker, then a stream of records.
 * Each record is: a 2-byte null-flag length, the null-flag bytes, a 2-byte
 * data length, and the serialized non-null column values in schema order.
 * A (4-byte SYNC_ESCAPE + 16-byte sync marker) pair is interleaved roughly
 * every SYNC_INTERVAL bytes so a scanner starting mid-file can re-align
 * itself to a record boundary.
 */
public static class RowFileScanner extends FileScanner {
  private FileSystem fs;
  private FSDataInputStream in;
  // Reused across next() calls — callers must not retain the returned tuple.
  private Tuple tuple;
  // Sync marker read from the file header.
  private byte[] sync = new byte[SYNC_HASH_SIZE];
  // Scratch space for comparing candidate bytes against 'sync'.
  private byte[] checkSync = new byte[SYNC_HASH_SIZE];
  // Byte range [start, start + fragment length) this scanner is responsible for.
  private long start, end;
  private ByteBuffer buffer;
  // Fixed part of a record header: null-flag bytes plus two shorts
  // (null-flag length and data length).
  private final int tupleHeaderSize;
  private BitArray nullFlags;
  // Absolute file offset of the byte currently at buffer position 0.
  private long bufferStartPos;

  public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment)
      throws IOException {
    super(conf, schema, meta, fragment);
    // NOTE(review): this value is overwritten by readHeader() with the
    // interval stored in the file header; also, the multiplication by
    // SYNC_SIZE here differs from RowFileAppender.init(), which does not
    // multiply — confirm which form is intended.
    SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname,
        ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal) * SYNC_SIZE;

    nullFlags = new BitArray(schema.size());
    tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE / 8);
    this.start = this.fragment.getStartKey();
    this.end = this.start + this.fragment.getLength();
  }

  /**
   * Opens the file, reads the header, and — when this fragment does not
   * begin at offset 0 — scans forward byte by byte until the first sync
   * marker so that reading starts on a record boundary.
   */
  public void init() throws IOException {
    // set default page size.
    fs = fragment.getPath().getFileSystem(conf);
    in = fs.open(fragment.getPath());
    buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE * schema.size());
    buffer.flip(); // start in "drained" state so the first fillBuffer() compacts cleanly

    readHeader();

    // find the correct position from the start
    if (this.start > in.getPos()) {
      // Back up by one sync width so a marker straddling 'start' is not missed.
      long realStart = start > SYNC_SIZE ? (start-SYNC_SIZE) : 0;
      in.seek(realStart);
    }
    bufferStartPos = in.getPos();
    fillBuffer();

    if (start != 0) {
      // TODO: improve
      // Slide forward one byte at a time until a sync marker is found.
      // checkSync() consumes SYNC_SIZE bytes, so mark/reset brackets each try.
      boolean syncFound = false;
      while (!syncFound) {
        if (buffer.remaining() < SYNC_SIZE) {
          fillBuffer();
        }
        buffer.mark();
        syncFound = checkSync();
        if (!syncFound) {
          buffer.reset();
          buffer.get(); // proceed one byte
        }
      }
      // Account for the bytes consumed while searching, then drop them.
      bufferStartPos += buffer.position();
      buffer.compact();
      buffer.flip();
    }

    tuple = new VTuple(schema.size());
    super.init();
  }

  /** Reads the 4-byte sync interval and the 16-byte sync marker from the header. */
  private void readHeader() throws IOException {
    SYNC_INTERVAL = in.readInt();
    StorageUtil.readFully(in, this.sync, 0, SYNC_HASH_SIZE);
  }

  /**
   * Find the sync from the front of the buffer
   *
   * <p>Always consumes SYNC_SIZE bytes from the buffer, whether or not a
   * marker is found; callers that need to retry must mark() beforehand and
   * reset() on failure.
   *
   * @return return true if it succeeds to find the sync.
   * @throws java.io.IOException
   */
  private boolean checkSync() throws IOException {
    buffer.getInt();                          // escape
    buffer.get(checkSync, 0, SYNC_HASH_SIZE); // sync
    return Arrays.equals(checkSync, sync);
  }

  /**
   * Compacts the unread bytes to the front of the buffer and refills the
   * remainder from the stream, keeping bufferStartPos consistent with the
   * byte at buffer position 0.
   *
   * @return the number of bytes read, or -1 at end of stream.
   */
  private int fillBuffer() throws IOException {
    bufferStartPos += buffer.position();
    buffer.compact();
    int remain = buffer.remaining();
    int read = in.read(buffer);
    if (read == -1) {
      buffer.flip();
      return read;
    } else {
      int totalRead = read;
      if (remain > totalRead) {
        // A single read may return short; try once more to top the buffer up.
        read = in.read(buffer);
        totalRead += read > 0 ? read : 0;
      }
      buffer.flip();
      return totalRead;
    }
  }

  /**
   * Reads the next record, skipping over a sync marker when one precedes it.
   * The same Tuple instance is reused across calls.
   *
   * @return the next tuple, or null when the fragment (or file) is exhausted.
   */
  @Override
  public Tuple next() throws IOException {
    while (buffer.remaining() < SYNC_SIZE) {
      if (fillBuffer() < 0) {
        return null;
      }
    }

    buffer.mark();
    if (!checkSync()) {
      // Not a sync marker: rewind and parse these bytes as a record header.
      buffer.reset();
    } else {
      // A sync marker past 'end' means the following records belong to the
      // next fragment's scanner.
      if (bufferStartPos + buffer.position() > end) {
        return null;
      }
    }

    while (buffer.remaining() < tupleHeaderSize) {
      if (fillBuffer() < 0) {
        return null;
      }
    }

    int i;

    // Record header: null-flag length, null-flag bytes, then data length.
    int nullFlagSize = buffer.getShort();
    byte[] nullFlagBytes = new byte[nullFlagSize];
    buffer.get(nullFlagBytes, 0, nullFlagSize);
    nullFlags = new BitArray(nullFlagBytes);
    int tupleSize = buffer.getShort();

    while (buffer.remaining() < (tupleSize)) {
      if (fillBuffer() < 0) {
        return null;
      }
    }

    Datum datum;
    Column col;
    // Decode each non-null column with the same encoding RowFileAppender writes.
    for (i = 0; i < schema.size(); i++) {
      if (!nullFlags.get(i)) {
        col = schema.getColumn(i);
        switch (col.getDataType().getType()) {
          case BOOLEAN :
            datum = DatumFactory.createBool(buffer.get());
            tuple.put(i, datum);
            break;

          case BIT:
            datum = DatumFactory.createBit(buffer.get());
            tuple.put(i, datum);
            break;

          case CHAR :
            // Stored as: 4-byte actual length + fixed-width (declared length) bytes;
            // only the first realLen bytes are meaningful.
            int realLen = buffer.getInt();
            byte[] buf = new byte[col.getDataType().getLength()];
            buffer.get(buf);
            byte[] charBuf = Arrays.copyOf(buf, realLen);
            tuple.put(i, DatumFactory.createChar(charBuf));
            break;

          case INT2 :
            datum = DatumFactory.createInt2(buffer.getShort());
            tuple.put(i, datum);
            break;

          case INT4 :
            datum = DatumFactory.createInt4(buffer.getInt());
            tuple.put(i, datum);
            break;

          case INT8 :
            datum = DatumFactory.createInt8(buffer.getLong());
            tuple.put(i, datum);
            break;

          case FLOAT4 :
            datum = DatumFactory.createFloat4(buffer.getFloat());
            tuple.put(i, datum);
            break;

          case FLOAT8 :
            datum = DatumFactory.createFloat8(buffer.getDouble());
            tuple.put(i, datum);
            break;

          case TEXT:
            // 2-byte length prefix + bytes.
            short bytelen = buffer.getShort();
            byte[] strbytes = new byte[bytelen];
            buffer.get(strbytes, 0, bytelen);
            datum = DatumFactory.createText(strbytes);
            tuple.put(i, datum);
            break;

          case BLOB:
            // 2-byte length prefix + bytes.
            short bytesLen = buffer.getShort();
            byte [] bytesBuf = new byte[bytesLen];
            buffer.get(bytesBuf);
            datum = DatumFactory.createBlob(bytesBuf);
            tuple.put(i, datum);
            break;

          default:
            break;
        }
      } else {
        tuple.put(i, DatumFactory.createNullDatum());
      }
    }
    return tuple;
  }

  @Override
  public void reset() throws IOException {
    // NOTE(review): re-runs init() without closing the previously opened
    // input stream, so the old FSDataInputStream appears to be leaked —
    // confirm whether close() should be invoked here first.
    init();
  }

  @Override
  public void close() throws IOException {
    if (in != null) {
      in.close();
    }
  }

  // Column projection is not supported; all columns are always materialized.
  @Override
  public boolean isProjectable() {
    return false;
  }

  // Filter push-down is not supported.
  @Override
  public boolean isSelectable() {
    return false;
  }

  @Override
  public void setFilter(EvalNode filter) {
    throw new TajoRuntimeException(new UnsupportedException());
  }

  // Sync markers make it safe to start a scan at an arbitrary offset.
  @Override
  public boolean isSplittable(){
    return true;
  }
}
/**
 * Appender that writes tuples in the RowFile format read by
 * {@code RowFileScanner}: a header (4-byte sync interval + 16-byte sync
 * marker) followed by length-prefixed records, with the sync marker
 * re-inserted whenever at least SYNC_INTERVAL bytes have been written
 * since the last one.
 */
public static class RowFileAppender extends FileAppender {
  private FSDataOutputStream out;
  // File offset immediately after the most recently written sync marker.
  private long lastSyncPos;
  private FileSystem fs;
  // 16-byte marker (MD5 of path + creation time) written to the header and
  // at every sync point.
  private byte[] sync;
  // Staging buffer for one record's column data (fixed DEFAULT_BUFFER_SIZE).
  private ByteBuffer buffer;
  private BitArray nullFlags;
  // statistics
  private TableStatistics stats;
  private ShuffleType shuffleType;

  public RowFileAppender(Configuration conf, final TaskAttemptId taskAttemptId,
                         final Schema schema, final TableMeta meta, final Path workDir)
      throws IOException {
    super(conf, taskAttemptId, schema, meta, workDir);
  }

  /**
   * Creates the output file (parent directory must exist, file must not),
   * derives the sync marker, and writes the file header.
   */
  public void init() throws IOException {
    SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname,
        ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal);

    fs = path.getFileSystem(conf);

    if (!fs.exists(path.getParent())) {
      throw new FileNotFoundException(path.toString());
    }

    if (fs.exists(path)) {
      throw new AlreadyExistsStorageException(path);
    }

    sync = new byte[SYNC_HASH_SIZE];
    lastSyncPos = 0;

    out = fs.create(path);

    MessageDigest md;
    try {
      md = MessageDigest.getInstance("MD5");
      // NOTE(review): platform-default charset via getBytes(); harmless for
      // the marker's purpose but charset-dependent — consider UTF-8.
      md.update((path.toString()+System.currentTimeMillis()).getBytes());
      sync = md.digest();
    } catch (NoSuchAlgorithmException e) {
      // NOTE(review): if MD5 were unavailable the error is only logged and
      // 'sync' stays all zeros — confirm this fallback is acceptable.
      LOG.error(e);
    }

    writeHeader();

    buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);

    nullFlags = new BitArray(schema.size());

    if (tableStatsEnabled) {
      this.stats = new TableStatistics(this.schema, columnStatsEnabled);
      this.shuffleType = PlannerUtil.getShuffleType(
          meta.getProperty(StorageConstants.SHUFFLE_TYPE,
              PlannerUtil.getShuffleType(ShuffleType.NONE_SHUFFLE)));
    }
  }

  /** Writes the sync interval and sync marker; records the post-header offset. */
  private void writeHeader() throws IOException {
    out.writeInt(SYNC_INTERVAL);
    out.write(sync);
    out.flush();
    lastSyncPos = out.getPos();
  }

  /**
   * Serializes one tuple as: 2-byte null-flag length, null-flag bytes,
   * 2-byte data length, then the non-null column values in schema order,
   * using the encoding RowFileScanner.next() decodes.
   *
   * <p>NOTE(review): column data is staged in a fixed-size buffer with no
   * capacity check; a record larger than DEFAULT_BUFFER_SIZE would raise
   * BufferOverflowException — confirm tuples are bounded upstream.
   */
  @Override
  public void addTuple(Tuple t) throws IOException {
    checkAndWriteSync();
    Column col;

    buffer.clear();
    nullFlags.clear();

    for (int i = 0; i < schema.size(); i++) {
      if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
        // it is to calculate min/max values, and it is only used for the intermediate file.
        stats.analyzeField(i, t);
      }

      if (t.isBlankOrNull(i)) {
        nullFlags.set(i);
      } else {
        col = schema.getColumn(i);
        switch (col.getDataType().getType()) {
          case BOOLEAN:
            buffer.put(t.getByte(i));
            break;
          case BIT:
            buffer.put(t.getByte(i));
            break;
          case CHAR:
            // 4-byte actual length + value padded/truncated to the declared width.
            byte[] src = t.getBytes(i);
            byte[] dst = Arrays.copyOf(src, col.getDataType().getLength());
            buffer.putInt(src.length);
            buffer.put(dst);
            break;
          case TEXT:
            // 2-byte length prefix + bytes.
            byte [] strbytes = t.getBytes(i);
            buffer.putShort((short)strbytes.length);
            buffer.put(strbytes, 0, strbytes.length);
            break;
          case INT2:
            buffer.putShort(t.getInt2(i));
            break;
          case INT4:
            buffer.putInt(t.getInt4(i));
            break;
          case INT8:
            buffer.putLong(t.getInt8(i));
            break;
          case FLOAT4:
            buffer.putFloat(t.getFloat4(i));
            break;
          case FLOAT8:
            buffer.putDouble(t.getFloat8(i));
            break;
          case BLOB:
            // 2-byte length prefix + bytes.
            byte [] bytes = t.getBytes(i);
            buffer.putShort((short)bytes.length);
            buffer.put(bytes);
            break;
          case NULL_TYPE:
            nullFlags.set(i);
            break;
          default:
            break;
        }
      }
    }

    byte[] bytes = nullFlags.toArray();
    out.writeShort(bytes.length);
    out.write(bytes);

    bytes = buffer.array();
    int dataLen = buffer.position();
    out.writeShort(dataLen);
    out.write(bytes, 0, dataLen);

    // Statistical section
    if (tableStatsEnabled) {
      stats.incrementRow();
    }
  }

  @Override
  public long getOffset() throws IOException {
    return out.getPos();
  }

  @Override
  public void flush() throws IOException {
    out.flush();
  }

  @Override
  public void close() throws IOException {
    if (out != null) {
      // NOTE(review): the byte count is captured before the trailing sync()
      // is written, so stats exclude the final marker — confirm intended.
      if (tableStatsEnabled) {
        stats.setNumBytes(out.getPos());
      }
      sync();
      out.flush();
      IOUtils.cleanup(LOG, out);
    }
  }

  /** Writes an escape word plus the sync marker, unless one was just written. */
  private void sync() throws IOException {
    if (lastSyncPos != out.getPos()) {
      out.writeInt(SYNC_ESCAPE);
      out.write(sync);
      lastSyncPos = out.getPos();
    }
  }

  /** Emits a sync marker once SYNC_INTERVAL bytes have passed since the last one. */
  private void checkAndWriteSync() throws IOException {
    if (out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
      sync();
    }
  }

  @Override
  public TableStats getStats() {
    if (tableStatsEnabled) {
      return stats.getTableStat();
    } else {
      return null;
    }
  }
}
}