blob: afb3e6d663f188bd397dedbca2f613b5fb36a9a2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.encoding;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Compress using:
* - store size of common prefix
* - save column family once, it is same within HFile
* - use integer compression for key, value and prefix (7-bit encoding)
* - use flag bits to avoid repeating the key length, value length
* and type when they are the same as in the previous cell
* - store in 3 bits length of timestamp field
* - allow diff in timestamp instead of actual value
*
* Format:
* - 1 byte: flag
* - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag)
* - 1-5 bytes: value length (only if FLAG_SAME_VALUE_LENGTH is not set in flag)
* - 1-5 bytes: prefix length
* - ... bytes: rest of the row (if prefix length is small enough)
* - ... bytes: qualifier (or suffix depending on prefix length)
* - 1-8 bytes: timestamp or diff
* - 1 byte: type (only if FLAG_SAME_TYPE is not set in the flag)
* - ... bytes: value
*/
@InterfaceAudience.Private
public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder {
// Layout of the one-byte flag written in front of every encoded cell.
/** Bit 0: the key length is not written; the decoder reuses the previous cell's key length. */
static final int FLAG_SAME_KEY_LENGTH = 1;
/** Bit 1: the value length is not written; the decoder reuses the previous cell's value length. */
static final int FLAG_SAME_VALUE_LENGTH = 1 << 1;
/** Bit 2: the type byte is not written; the decoder reuses the previous cell's type. */
static final int FLAG_SAME_TYPE = 1 << 2;
/** Bit 3: the stored number is (previous timestamp - this timestamp), not the timestamp itself. */
static final int FLAG_TIMESTAMP_IS_DIFF = 1 << 3;
/** Bits 4-6: number of bytes used for the timestamp (or diff), stored as that count minus one. */
static final int MASK_TIMESTAMP_LENGTH = (1 << 4) | (1 << 5) | (1 << 6);
/** Right shift that moves the {@link #MASK_TIMESTAMP_LENGTH} bits down to bit 0. */
static final int SHIFT_TIMESTAMP_LENGTH = 4;
/** Bit 7: the stored timestamp (or diff) is a magnitude that must be negated after reading. */
static final int FLAG_TIMESTAMP_SIGN = 1 << 7;
/**
 * Decoder-side state carried from one decoded cell to the next while a block is expanded back
 * into flat KeyValues.
 */
protected static class DiffCompressionState extends CompressionState {
  /** Timestamp of the most recently decoded cell; the base against which a diff is applied. */
  long timestamp;

  /**
   * The column family preceded by its one-byte length. It is read from the stream only while
   * {@code isFirst()} is true and is re-emitted for later cells whose row differs.
   */
  byte[] familyNameWithSize;

  /** Reads an 8-byte timestamp from {@code in} into {@link #timestamp}. */
  @Override
  protected void readTimestamp(ByteBuffer in) {
    timestamp = in.getLong();
  }

  /**
   * Copies the state of {@code state} into this object.
   *
   * @param state the state to copy; must be a {@code DiffCompressionState}
   * @throws ClassCastException if {@code state} is of a different type
   */
  @Override
  void copyFrom(CompressionState state) {
    super.copyFrom(state);
    DiffCompressionState other = (DiffCompressionState) state;
    timestamp = other.timestamp;
    // The family is never re-read once the first cell has been decoded, so a copy has to carry
    // it along. The array is not modified after it has been filled, so sharing the reference
    // is sufficient.
    familyNameWithSize = other.familyNameWithSize;
  }
}
/**
 * Decodes one cell from {@code source} and appends it to {@code buffer} in flat form: a 4-byte
 * key length, a 4-byte value length, the key (row length, row, family length, family, qualifier,
 * timestamp, type) and finally the value.
 *
 * @param source stream positioned at the next encoded cell, or at the column family that
 *          precedes the very first cell
 * @param buffer destination; the cell previously written at {@code state.prevOffset} is read
 *          back from it to supply the shared key prefix
 * @param state values remembered from the previous cell; updated to describe the cell just
 *          decoded
 * @throws IOException if the stream fails or ends before the cell is complete
 * @throws EncoderBufferTooSmallException presumably raised by {@code ensureSpace} when
 *           {@code buffer} cannot hold the cell (confirm against the superclass)
 */
private void uncompressSingleKeyValue(DataInputStream source,
    ByteBuffer buffer,
    DiffCompressionState state)
    throws IOException, EncoderBufferTooSmallException {
  // read the column family at the beginning
  if (state.isFirst()) {
    state.familyLength = source.readByte();
    // Interpret the length byte as unsigned so that the array size and the number of bytes
    // read always agree.
    int familyLength = state.familyLength & 0xff;
    state.familyNameWithSize = new byte[familyLength + KeyValue.FAMILY_LENGTH_SIZE];
    state.familyNameWithSize[0] = state.familyLength;
    // A single read() may return fewer bytes than requested; readFully keeps reading until
    // the family is complete and throws EOFException if the stream ends first.
    source.readFully(state.familyNameWithSize, KeyValue.FAMILY_LENGTH_SIZE, familyLength);
  }

  // read flag
  byte flag = source.readByte();

  // read key/value/common lengths; a length is omitted when it equals the previous cell's
  int keyLength;
  int valueLength;
  if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
    keyLength = state.keyLength;
  } else {
    keyLength = ByteBufferUtils.readCompressedInt(source);
  }
  if ((flag & FLAG_SAME_VALUE_LENGTH) != 0) {
    valueLength = state.valueLength;
  } else {
    valueLength = ByteBufferUtils.readCompressedInt(source);
  }
  int commonPrefix = ByteBufferUtils.readCompressedInt(source);

  // create KeyValue buffer and fill it with the prefix
  int keyOffset = buffer.position();
  ensureSpace(buffer, keyLength + valueLength + KeyValue.ROW_OFFSET);
  buffer.putInt(keyLength);
  buffer.putInt(valueLength);

  // copy the common prefix from the previous key, which already sits in the output buffer
  if (commonPrefix > 0) {
    ByteBufferUtils.copyFromBufferToBuffer(buffer, buffer, state.prevOffset
        + KeyValue.ROW_OFFSET, commonPrefix);
  }

  // copy the rest of the key from the stream
  int keyRestLength;
  if (state.isFirst() || commonPrefix <
      state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
    // The row differs, so the stream holds the row remainder followed by the qualifier.
    // The family is omitted from the stream; it is always the same.
    short rowLength;
    int rowRestLength;

    // check length of row
    if (commonPrefix < KeyValue.ROW_LENGTH_SIZE) {
      // the row length itself is not fully copied yet, do it now
      ByteBufferUtils.copyFromStreamToBuffer(buffer, source,
          KeyValue.ROW_LENGTH_SIZE - commonPrefix);
      // step back over the row length just completed and read it
      ByteBufferUtils.skip(buffer, -KeyValue.ROW_LENGTH_SIZE);
      rowLength = buffer.getShort();
      rowRestLength = rowLength;
    } else {
      // already in buffer, just read it
      rowLength = buffer.getShort(keyOffset + KeyValue.ROW_OFFSET);
      rowRestLength = rowLength + KeyValue.ROW_LENGTH_SIZE - commonPrefix;
    }

    // copy the rest of row
    ByteBufferUtils.copyFromStreamToBuffer(buffer, source, rowRestLength);
    state.rowLength = rowLength;

    // copy the column family
    buffer.put(state.familyNameWithSize);

    keyRestLength = keyLength - rowLength -
        state.familyNameWithSize.length -
        (KeyValue.ROW_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
  } else {
    // The whole row is shared with the previous cell; state.rowLength stays as it is and
    // only the tail of the key before the timestamp has to be read.
    keyRestLength = keyLength - commonPrefix - KeyValue.TIMESTAMP_TYPE_SIZE;
  }
  // copy the rest of the key, after column family -> column qualifier
  ByteBufferUtils.copyFromStreamToBuffer(buffer, source, keyRestLength);

  // handle timestamp: the flag carries (byte count - 1), the sign, and whether it is a diff
  int timestampFitsInBytes =
      ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1;
  long timestamp = ByteBufferUtils.readLong(source, timestampFitsInBytes);
  if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
    timestamp = -timestamp;
  }
  if ((flag & FLAG_TIMESTAMP_IS_DIFF) != 0) {
    timestamp = state.timestamp - timestamp;
  }
  buffer.putLong(timestamp);

  // copy the type field
  byte type;
  if ((flag & FLAG_SAME_TYPE) != 0) {
    type = state.type;
  } else {
    type = source.readByte();
  }
  buffer.put(type);

  // copy value part
  ByteBufferUtils.copyFromStreamToBuffer(buffer, source, valueLength);

  // remember this cell for decoding the next one
  state.keyLength = keyLength;
  state.valueLength = valueLength;
  state.prevOffset = keyOffset;
  state.timestamp = timestamp;
  state.type = type;
  // state.qualifier is unused
}
/**
 * Encodes {@code cell} relative to the previously encoded cell held in the encoding state and
 * then records {@code cell} as the new previous cell.
 *
 * @param cell the cell to encode
 * @param encodingContext supplies the encoding state that holds the previous cell
 * @param out destination of the encoded bytes
 * @return the size reported by {@code compressSingleKeyValue} plus the size reported by
 *         {@code afterEncodingKeyValue}
 * @throws IOException if writing to {@code out} fails
 */
@Override
public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingContext,
    DataOutputStream out) throws IOException {
  final EncodingState encodingState = encodingContext.getEncodingState();
  final int keyValueSize = compressSingleKeyValue(out, cell, encodingState.prevCell);
  final int trailerSize = afterEncodingKeyValue(cell, out, encodingContext);
  // Only now does this cell become the reference for the next one.
  encodingState.prevCell = cell;
  return keyValueSize + trailerSize;
}
/**
* Writes one cell to {@code out} in the diff-encoded form: flag byte, optional key length,
* optional value length, common-prefix length, the non-shared part of the key, the timestamp
* (or its diff to the previous timestamp), an optional type byte and the value. For the first
* cell ({@code prevCell == null}) the column family, preceded by its length byte, is written
* ahead of the flag.
*
* @param out destination stream
* @param cell the cell to encode
* @param prevCell the previously encoded cell, or {@code null} if {@code cell} is the first one
* @return key length + value length + {@code KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE}
* @throws IOException if writing to {@code out} fails
*/
private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCell)
throws IOException {
int flag = 0; // Do not use more bits than can fit into a byte
int kLength = KeyValueUtil.keyLength(cell);
int vLength = cell.getValueLength();
long timestamp;
long diffTimestamp = 0;
int diffTimestampFitsInBytes = 0;
int timestampFitsInBytes;
int commonPrefix = 0;
if (prevCell == null) {
// First cell: nothing to diff against, so the timestamp is stored as sign + magnitude.
timestamp = cell.getTimestamp();
if (timestamp < 0) {
flag |= FLAG_TIMESTAMP_SIGN;
timestamp = -timestamp;
}
timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
// the flag stores the byte count minus one
flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
// put column family (length byte first); it is written only once, before the first cell
byte familyLength = cell.getFamilyLength();
out.write(familyLength);
PrivateCellUtil.writeFamily(out, cell, familyLength);
} else {
// Finding common prefix
int preKeyLength = KeyValueUtil.keyLength(prevCell);
// NOTE(review): the two boolean arguments are opaque from here; judging by the comment
// below they presumably exclude timestamp and type from the comparison - confirm.
commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);
// Set a flag bit for each field that repeats the previous cell so that it can be omitted.
if (kLength == preKeyLength) {
flag |= FLAG_SAME_KEY_LENGTH;
}
if (vLength == prevCell.getValueLength()) {
flag |= FLAG_SAME_VALUE_LENGTH;
}
if (cell.getTypeByte() == prevCell.getTypeByte()) {
flag |= FLAG_SAME_TYPE;
}
// don't compress timestamp and type using prefix encode timestamp
timestamp = cell.getTimestamp();
// diff is (previous - current); the decoder computes current = previous - diff
diffTimestamp = prevCell.getTimestamp() - timestamp;
boolean negativeTimestamp = timestamp < 0;
if (negativeTimestamp) {
timestamp = -timestamp;
}
timestampFitsInBytes = ByteBufferUtils.longFitsIn(timestamp);
boolean minusDiffTimestamp = diffTimestamp < 0;
if (minusDiffTimestamp) {
diffTimestamp = -diffTimestamp;
}
diffTimestampFitsInBytes = ByteBufferUtils.longFitsIn(diffTimestamp);
// Store the diff only when it needs strictly fewer bytes than the timestamp itself.
// The sign bit then describes whichever of the two numbers is stored.
if (diffTimestampFitsInBytes < timestampFitsInBytes) {
flag |= (diffTimestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
flag |= FLAG_TIMESTAMP_IS_DIFF;
if (minusDiffTimestamp) {
flag |= FLAG_TIMESTAMP_SIGN;
}
} else {
flag |= (timestampFitsInBytes - 1) << SHIFT_TIMESTAMP_LENGTH;
if (negativeTimestamp) {
flag |= FLAG_TIMESTAMP_SIGN;
}
}
}
// The flag byte comes first; lengths follow only when they differ from the previous cell.
out.write(flag);
if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
ByteBufferUtils.putCompressedInt(out, kLength);
}
if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
ByteBufferUtils.putCompressedInt(out, vLength);
}
ByteBufferUtils.putCompressedInt(out, commonPrefix);
short rLen = cell.getRowLength();
if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
// Previous and current rows are different. Copy the differing part of
// the row, skip the column family, and copy the qualifier.
PrivateCellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
PrivateCellUtil.writeQualifier(out, cell, cell.getQualifierLength());
} else {
// The common part includes the whole row. As the column family is the
// same across the whole file, it will automatically be included in the
// common prefix, so we need not special-case it here.
// What we write here is the non common part of the qualifier
int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
- (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
PrivateCellUtil.writeQualifierSkippingBytes(out, cell, cell.getQualifierLength(),
commonQualPrefix);
}
// Write either the timestamp magnitude or the diff magnitude, using the byte count
// recorded in the flag.
if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) {
ByteBufferUtils.putLong(out, timestamp, timestampFitsInBytes);
} else {
ByteBufferUtils.putLong(out, diffTimestamp, diffTimestampFitsInBytes);
}
if ((flag & FLAG_SAME_TYPE) == 0) {
out.write(cell.getTypeByte());
}
PrivateCellUtil.writeValue(out, cell, vLength);
return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
}
/**
 * Rebuilds the complete key of the first cell of an encoded block without decoding the rest of
 * the block. The position of {@code block} is restored before returning.
 *
 * @param block the encoded block; reading starts {@code Bytes.SIZEOF_INT} bytes in, at the
 *          column family that precedes the first cell
 * @return a key-only cell backed by a newly allocated array
 */
@Override
public Cell getFirstKeyCellInBlock(ByteBuff block) {
  block.mark();
  block.position(Bytes.SIZEOF_INT);
  final byte famLen = block.get();
  block.skip(famLen);
  final byte flags = block.get();
  final int keyLen = ByteBuff.readCompressedInt(block);
  // TODO : See if we can avoid these reads as the read values are not getting used
  ByteBuff.readCompressedInt(block); // valueLength
  ByteBuff.readCompressedInt(block); // commonLength

  final ByteBuffer key = ByteBuffer.allocate(keyLen);
  assert !(key.isDirect());
  final int base = key.arrayOffset();
  int cursor = base;

  // row: the two-byte length first, then that many row bytes
  block.get(key.array(), cursor, Bytes.SIZEOF_SHORT);
  cursor += Bytes.SIZEOF_SHORT;
  final short rowLen = key.getShort();
  block.get(key.array(), cursor, rowLen);
  cursor += rowLen;

  // family: jump back to where it is stored ahead of the first cell and copy the length byte
  // together with the name
  final int afterRow = block.position();
  final int famWithSize = famLen + Bytes.SIZEOF_BYTE;
  block.position(Bytes.SIZEOF_INT);
  block.get(key.array(), cursor, famWithSize);
  cursor += famWithSize;

  // qualifier: whatever remains of the key in front of the timestamp and type
  block.position(afterRow);
  final int qualLen = keyLen - (cursor - base) - KeyValue.TIMESTAMP_TYPE_SIZE;
  block.get(key.array(), cursor, qualLen);
  cursor += qualLen;

  // timestamp: the flag stores the byte count minus one, plus the sign
  final int tsBytes = 1 + ((flags & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
  final long magnitude = ByteBuff.readLong(block, tsBytes);
  final long ts = (flags & FLAG_TIMESTAMP_SIGN) != 0 ? -magnitude : magnitude;
  key.putLong(cursor, ts);
  cursor += Bytes.SIZEOF_LONG;

  // type
  block.get(key.array(), cursor, Bytes.SIZEOF_BYTE);

  block.reset();
  // The result is already a BB. So always we will create a KeyOnlyKv.
  return new KeyValue.KeyOnlyKeyValue(key.array(), 0, keyLen);
}
/**
 * @return the simple class name of {@code DiffKeyDeltaEncoder}
 */
@Override
public String toString() {
  final String encoderName = DiffKeyDeltaEncoder.class.getSimpleName();
  return encoderName;
}
/**
 * Seeker state extended with the two values the diff decoder needs from the previous cell: the
 * length of the row including its two-byte length prefix, and the timestamp.
 */
protected static class DiffSeekerState extends SeekerState {
  /** Row length plus the size of the row-length field. */
  private int rowLengthWithSize;

  /** Timestamp of the current cell; the base for the next cell's timestamp diff. */
  private long timestamp;

  public DiffSeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
    super(tmpPair, includeTags);
  }

  /**
   * Copies the base state and then the diff-specific fields from {@code that}.
   *
   * @param that the state to copy; must be a {@code DiffSeekerState}
   */
  @Override
  protected void copyFromNext(SeekerState that) {
    super.copyFromNext(that);
    final DiffSeekerState source = (DiffSeekerState) that;
    this.timestamp = source.timestamp;
    this.rowLengthWithSize = source.rowLengthWithSize;
  }
}
/**
* Creates a seeker that decodes one cell at a time from the encoded block into the key buffer
* of its {@link DiffSeekerState}. Only the bytes that differ from the previous key are
* overwritten; the value is not copied, only its offset in the block is recorded.
*
* @param decodingCtx decoding context handed to the seeker
* @return a new seeker for diff-encoded blocks
*/
@Override
public EncodedSeeker createSeeker(HFileBlockDecodingContext decodingCtx) {
return new BufferedEncodedSeeker<DiffSeekerState>(decodingCtx) {
// column family preceded by its length byte; read once in decodeFirst()
private byte[] familyNameWithSize;
// size of the timestamp + type tail that ends every key
private static final int TIMESTAMP_WITH_TYPE_LENGTH =
Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
// Decodes the cell at the current position of currentBuffer into 'current'.
// isFirst is true for the first cell of the block, when there is no previous key.
private void decode(boolean isFirst) {
byte flag = currentBuffer.get();
byte type = 0;
if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
if (!isFirst) {
// The type is the last byte of the key. When the key length changes, that byte moves,
// so remember the previous type before keyLength is overwritten.
type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
}
current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
}
if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
}
current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
current.ensureSpaceForKey();
if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
// length of row is different, copy everything except family
// copy the row size
currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) +
Bytes.SIZEOF_SHORT;
// copy the rest of row
currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
// copy the column family
System.arraycopy(familyNameWithSize, 0, current.keyBuffer,
current.rowLengthWithSize, familyNameWithSize.length);
// copy the qualifier
currentBuffer.get(current.keyBuffer,
current.rowLengthWithSize + familyNameWithSize.length,
current.keyLength - current.rowLengthWithSize -
familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
} else if (current.lastCommonPrefix < current.rowLengthWithSize) {
// we have to copy part of row and qualifier,
// but column family is in right place
// before column family (rest of row)
currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
current.rowLengthWithSize - current.lastCommonPrefix);
// after column family (qualifier)
currentBuffer.get(current.keyBuffer,
current.rowLengthWithSize + familyNameWithSize.length,
current.keyLength - current.rowLengthWithSize -
familyNameWithSize.length - TIMESTAMP_WITH_TYPE_LENGTH);
} else {
// the whole row is shared with the previous key: copy just the ending
currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH -
current.lastCommonPrefix);
}
// timestamp: occupies the 8 bytes in front of the type byte at the end of the key
int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
// the flag stores the byte count minus one
int timestampFitInBytes = 1 +
((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
long timestampOrDiff = ByteBuff.readLong(currentBuffer, timestampFitInBytes);
if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
timestampOrDiff = -timestampOrDiff;
}
if ((flag & FLAG_TIMESTAMP_IS_DIFF) == 0) { // it is timestamp
current.timestamp = timestampOrDiff;
} else { // it is diff
// current.timestamp still holds the previous cell's timestamp at this point
current.timestamp = current.timestamp - timestampOrDiff;
}
Bytes.putLong(current.keyBuffer, pos, current.timestamp);
pos += Bytes.SIZEOF_LONG;
// type
if ((flag & FLAG_SAME_TYPE) == 0) {
currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
} else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
// same type but the key length changed: write the saved type at its new position.
// (With an unchanged key length the byte is already in place.)
current.keyBuffer[pos] = type;
}
// The value stays in the block; only remember where it starts and step over it.
current.valueOffset = currentBuffer.position();
currentBuffer.skip(current.valueLength);
if (includesTags()) {
decodeTags();
}
if (includesMvcc()) {
current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
} else {
current.memstoreTS = 0;
}
current.nextKvOffset = currentBuffer.position();
}
// Decodes the first cell of the block: steps over the leading int, reads the column
// family that is stored once in front of the first cell, then decodes the cell.
@Override
protected void decodeFirst() {
// presumably the int that internalDecodeKeyValues reads as the decompressed size - confirm
currentBuffer.skip(Bytes.SIZEOF_INT);
// read column family
byte familyNameLength = currentBuffer.get();
familyNameWithSize = new byte[familyNameLength + Bytes.SIZEOF_BYTE];
familyNameWithSize[0] = familyNameLength;
currentBuffer.get(familyNameWithSize, Bytes.SIZEOF_BYTE,
familyNameLength);
decode(true);
}
// Decodes any cell after the first, relative to the key currently held in 'current'.
@Override
protected void decodeNext() {
decode(false);
}
// Supplies the state type that carries rowLengthWithSize and timestamp between cells.
@Override
protected DiffSeekerState createSeekerState() {
return new DiffSeekerState(this.tmpPair, this.includesTags());
}
};
}
/**
 * Expands a whole diff-encoded block into a buffer of flat KeyValues.
 *
 * @param source stream that starts with the int decompressed size, followed by the encoded
 *          cells
 * @param allocateHeaderLength number of bytes to leave unused at the start of the result
 * @param skipLastBytes number of trailing bytes of {@code source} that must be left unread
 * @param decodingCtx decoding context passed on to {@code afterDecodingKeyValue}
 * @return the buffer holding the decoded cells after the reserved header space
 * @throws IOException if reading from {@code source} fails
 * @throws IllegalStateException if decoding consumed some of the last {@code skipLastBytes}
 *           bytes
 */
@Override
protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
    int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
  final int decompressedSize = source.readInt();
  final ByteBuffer decoded = ByteBuffer.allocate(decompressedSize + allocateHeaderLength);
  decoded.position(allocateHeaderLength);

  final DiffCompressionState decodeState = new DiffCompressionState();
  // Decode cell after cell until only the bytes to be skipped remain in the stream.
  while (source.available() > skipLastBytes) {
    uncompressSingleKeyValue(source, decoded, decodeState);
    afterDecodingKeyValue(source, decoded, decodingCtx);
  }

  if (source.available() == skipLastBytes) {
    return decoded;
  }
  throw new IllegalStateException("Read too much bytes.");
}
}