blob: 755647d6274448438b205dee5861c080955a7b49 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.encoding;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Base class for all data block encoders that use a buffer.
*/
@InterfaceAudience.Private
abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
/**
* TODO: This datablockencoder is dealing in internals of hfileblocks. Purge reference to HFBs
*/
private static int INITIAL_KEY_BUFFER_SIZE = 512;
@Override
public ByteBuffer decodeKeyValues(DataInputStream source,
HFileBlockDecodingContext blkDecodingCtx) throws IOException {
if (blkDecodingCtx.getClass() != HFileBlockDefaultDecodingContext.class) {
throw new IOException(this.getClass().getName() + " only accepts "
+ HFileBlockDefaultDecodingContext.class.getName() + " as the decoding context.");
}
HFileBlockDefaultDecodingContext decodingCtx =
(HFileBlockDefaultDecodingContext) blkDecodingCtx;
if (decodingCtx.getHFileContext().isIncludesTags()
&& decodingCtx.getHFileContext().isCompressTags()) {
if (decodingCtx.getTagCompressionContext() != null) {
// It will be overhead to create the TagCompressionContext again and again for every block
// decoding.
decodingCtx.getTagCompressionContext().clear();
} else {
try {
TagCompressionContext tagCompressionContext = new TagCompressionContext(
LRUDictionary.class, Byte.MAX_VALUE);
decodingCtx.setTagCompressionContext(tagCompressionContext);
} catch (Exception e) {
throw new IOException("Failed to initialize TagCompressionContext", e);
}
}
}
return internalDecodeKeyValues(source, 0, 0, decodingCtx);
}
/********************* common prefixes *************************/
// Having this as static is fine but if META is having DBE then we should
// change this.
public static int compareCommonRowPrefix(Cell left, Cell right, int rowCommonPrefix) {
return Bytes.compareTo(left.getRowArray(), left.getRowOffset() + rowCommonPrefix,
left.getRowLength() - rowCommonPrefix, right.getRowArray(), right.getRowOffset()
+ rowCommonPrefix, right.getRowLength() - rowCommonPrefix);
}
public static int compareCommonFamilyPrefix(Cell left, Cell right, int familyCommonPrefix) {
return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset() + familyCommonPrefix,
left.getFamilyLength() - familyCommonPrefix, right.getFamilyArray(),
right.getFamilyOffset() + familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix);
}
public static int compareCommonQualifierPrefix(Cell left, Cell right, int qualCommonPrefix) {
return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset() + qualCommonPrefix,
left.getQualifierLength() - qualCommonPrefix, right.getQualifierArray(),
right.getQualifierOffset() + qualCommonPrefix, right.getQualifierLength()
- qualCommonPrefix);
}
protected static class SeekerState {
protected ByteBuff currentBuffer;
protected TagCompressionContext tagCompressionContext;
protected int valueOffset = -1;
protected int keyLength;
protected int valueLength;
protected int lastCommonPrefix;
protected int tagsLength = 0;
protected int tagsOffset = -1;
protected int tagsCompressedLength = 0;
protected boolean uncompressTags = true;
/** We need to store a copy of the key. */
protected byte[] keyBuffer = HConstants.EMPTY_BYTE_ARRAY;
protected byte[] tagsBuffer = HConstants.EMPTY_BYTE_ARRAY;
protected long memstoreTS;
protected int nextKvOffset;
protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
// A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
// many object creations.
private final ObjectIntPair<ByteBuffer> tmpPair;
private final boolean includeTags;
public SeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
this.tmpPair = tmpPair;
this.includeTags = includeTags;
}
protected boolean isValid() {
return valueOffset != -1;
}
protected void invalidate() {
valueOffset = -1;
tagsCompressedLength = 0;
currentKey.clear();
uncompressTags = true;
currentBuffer = null;
}
protected void ensureSpaceForKey() {
if (keyLength > keyBuffer.length) {
int newKeyBufferLength = Integer.highestOneBit(Math.max(
INITIAL_KEY_BUFFER_SIZE, keyLength) - 1) << 1;
byte[] newKeyBuffer = new byte[newKeyBufferLength];
System.arraycopy(keyBuffer, 0, newKeyBuffer, 0, keyBuffer.length);
keyBuffer = newKeyBuffer;
}
}
protected void ensureSpaceForTags() {
if (tagsLength > tagsBuffer.length) {
int newTagsBufferLength = Integer.highestOneBit(Math.max(
INITIAL_KEY_BUFFER_SIZE, tagsLength) - 1) << 1;
byte[] newTagsBuffer = new byte[newTagsBufferLength];
System.arraycopy(tagsBuffer, 0, newTagsBuffer, 0, tagsBuffer.length);
tagsBuffer = newTagsBuffer;
}
}
protected void setKey(byte[] keyBuffer, long memTS) {
currentKey.setKey(keyBuffer, 0, keyLength);
memstoreTS = memTS;
}
/**
* Copy the state from the next one into this instance (the previous state
* placeholder). Used to save the previous state when we are advancing the
* seeker to the next key/value.
*/
protected void copyFromNext(SeekerState nextState) {
if (keyBuffer.length != nextState.keyBuffer.length) {
keyBuffer = nextState.keyBuffer.clone();
} else if (!isValid()) {
// Note: we can only call isValid before we override our state, so this
// comes before all the assignments at the end of this method.
System.arraycopy(nextState.keyBuffer, 0, keyBuffer, 0,
nextState.keyLength);
} else {
// don't copy the common prefix between this key and the previous one
System.arraycopy(nextState.keyBuffer, nextState.lastCommonPrefix,
keyBuffer, nextState.lastCommonPrefix, nextState.keyLength
- nextState.lastCommonPrefix);
}
currentKey.set(nextState.currentKey);
valueOffset = nextState.valueOffset;
keyLength = nextState.keyLength;
valueLength = nextState.valueLength;
lastCommonPrefix = nextState.lastCommonPrefix;
nextKvOffset = nextState.nextKvOffset;
memstoreTS = nextState.memstoreTS;
currentBuffer = nextState.currentBuffer;
tagsOffset = nextState.tagsOffset;
tagsLength = nextState.tagsLength;
if (nextState.tagCompressionContext != null) {
tagCompressionContext = nextState.tagCompressionContext;
}
}
public Cell toCell() {
// Buffer backing the value and tags part from the HFileBlock's buffer
// When tag compression in use, this will be only the value bytes area.
ByteBuffer valAndTagsBuffer;
int vOffset;
int valAndTagsLength = this.valueLength;
int tagsLenSerializationSize = 0;
if (this.includeTags && this.tagCompressionContext == null) {
// Include the tags part also. This will be the tags bytes + 2 bytes of for storing tags
// length
tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + this.valueLength);
valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
}
this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, this.tmpPair);
valAndTagsBuffer = this.tmpPair.getFirst();
vOffset = this.tmpPair.getSecond();// This is the offset to value part in the BB
if (valAndTagsBuffer.hasArray()) {
return toOnheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
} else {
return toOffheapCell(valAndTagsBuffer, vOffset, tagsLenSerializationSize);
}
}
private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
int tagsLenSerializationSize) {
byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
int tOffset = 0;
if (this.includeTags) {
if (this.tagCompressionContext == null) {
tagsArray = valAndTagsBuffer.array();
tOffset = valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength
+ tagsLenSerializationSize;
} else {
tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
tOffset = 0;
}
}
return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
currentKey.getRowLength(), currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer.array(),
valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, memstoreTS, tagsArray,
tOffset, this.tagsLength);
}
private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
int tagsLenSerializationSize) {
ByteBuffer tagsBuf = HConstants.EMPTY_BYTE_BUFFER;
int tOffset = 0;
if (this.includeTags) {
if (this.tagCompressionContext == null) {
tagsBuf = valAndTagsBuffer;
tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
} else {
tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, this.tagsLength));
tOffset = 0;
}
}
return new OffheapDecodedExtendedCell(
ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, this.keyLength)), currentKey.getRowLength(),
currentKey.getFamilyOffset(), currentKey.getFamilyLength(),
currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
currentKey.getTimestamp(), currentKey.getTypeByte(), valAndTagsBuffer, vOffset,
this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
}
}
/**
* Copies only the key part of the keybuffer by doing a deep copy and passes the
* seeker state members for taking a clone.
* Note that the value byte[] part is still pointing to the currentBuffer and
* represented by the valueOffset and valueLength
*/
// We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
// there. So this has to be an instance of ExtendedCell.
protected static class OnheapDecodedCell implements ExtendedCell {
private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
+ (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.ARRAY));
private byte[] keyOnlyBuffer;
private short rowLength;
private int familyOffset;
private byte familyLength;
private int qualifierOffset;
private int qualifierLength;
private long timestamp;
private byte typeByte;
private byte[] valueBuffer;
private int valueOffset;
private int valueLength;
private byte[] tagsBuffer;
private int tagsOffset;
private int tagsLength;
private long seqId;
protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int familyOffset,
byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] tagsBuffer,
int tagsOffset, int tagsLength) {
this.keyOnlyBuffer = keyBuffer;
this.rowLength = rowLength;
this.familyOffset = familyOffset;
this.familyLength = familyLength;
this.qualifierOffset = qualOffset;
this.qualifierLength = qualLength;
this.timestamp = timeStamp;
this.typeByte = typeByte;
this.valueBuffer = valueBuffer;
this.valueOffset = valueOffset;
this.valueLength = valueLen;
this.tagsBuffer = tagsBuffer;
this.tagsOffset = tagsOffset;
this.tagsLength = tagsLength;
setSequenceId(seqId);
}
@Override
public byte[] getRowArray() {
return keyOnlyBuffer;
}
@Override
public byte[] getFamilyArray() {
return keyOnlyBuffer;
}
@Override
public byte[] getQualifierArray() {
return keyOnlyBuffer;
}
@Override
public int getRowOffset() {
return Bytes.SIZEOF_SHORT;
}
@Override
public short getRowLength() {
return rowLength;
}
@Override
public int getFamilyOffset() {
return familyOffset;
}
@Override
public byte getFamilyLength() {
return familyLength;
}
@Override
public int getQualifierOffset() {
return qualifierOffset;
}
@Override
public int getQualifierLength() {
return qualifierLength;
}
@Override
public long getTimestamp() {
return timestamp;
}
@Override
public byte getTypeByte() {
return typeByte;
}
@Override
public long getSequenceId() {
return seqId;
}
@Override
public byte[] getValueArray() {
return this.valueBuffer;
}
@Override
public int getValueOffset() {
return valueOffset;
}
@Override
public int getValueLength() {
return valueLength;
}
@Override
public byte[] getTagsArray() {
return this.tagsBuffer;
}
@Override
public int getTagsOffset() {
return this.tagsOffset;
}
@Override
public int getTagsLength() {
return tagsLength;
}
@Override
public String toString() {
return KeyValue.keyToString(this.keyOnlyBuffer, 0, KeyValueUtil.keyLength(this)) + "/vlen="
+ getValueLength() + "/seqid=" + seqId;
}
@Override
public void setSequenceId(long seqId) {
this.seqId = seqId;
}
@Override
public long heapSize() {
return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + valueLength + tagsLength;
}
@Override
public int write(OutputStream out, boolean withTags) throws IOException {
int lenToWrite = getSerializedSize(withTags);
ByteBufferUtils.putInt(out, keyOnlyBuffer.length);
ByteBufferUtils.putInt(out, valueLength);
// Write key
out.write(keyOnlyBuffer);
// Write value
out.write(this.valueBuffer, this.valueOffset, this.valueLength);
if (withTags && this.tagsLength > 0) {
// 2 bytes tags length followed by tags bytes
// tags length is serialized with 2 bytes only(short way) even if the type is int.
// As this is non -ve numbers, we save the sign bit. See HBASE-11437
out.write((byte) (0xff & (this.tagsLength >> 8)));
out.write((byte) (0xff & this.tagsLength));
out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
}
return lenToWrite;
}
@Override
public int getSerializedSize(boolean withTags) {
return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
withTags);
}
@Override
public void write(ByteBuffer buf, int offset) {
// This is not used in actual flow. Throwing UnsupportedOperationException
throw new UnsupportedOperationException();
}
@Override
public void setTimestamp(long ts) throws IOException {
// This is not used in actual flow. Throwing UnsupportedOperationException
throw new UnsupportedOperationException();
}
@Override
public void setTimestamp(byte[] ts) throws IOException {
// This is not used in actual flow. Throwing UnsupportedOperationException
throw new UnsupportedOperationException();
}
@Override
public ExtendedCell deepClone() {
// This is not used in actual flow. Throwing UnsupportedOperationException
throw new UnsupportedOperationException();
}
}
protected static class OffheapDecodedExtendedCell extends ByteBufferExtendedCell {
private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * Bytes.SIZEOF_INT)
+ (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * ClassSize.BYTE_BUFFER));
private ByteBuffer keyBuffer;
private short rowLength;
private int familyOffset;
private byte familyLength;
private int qualifierOffset;
private int qualifierLength;
private long timestamp;
private byte typeByte;
private ByteBuffer valueBuffer;
private int valueOffset;
private int valueLength;
private ByteBuffer tagsBuffer;
private int tagsOffset;
private int tagsLength;
private long seqId;
protected OffheapDecodedExtendedCell(ByteBuffer keyBuffer, short rowLength, int familyOffset,
byte familyLength, int qualOffset, int qualLength, long timeStamp, byte typeByte,
ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, ByteBuffer tagsBuffer,
int tagsOffset, int tagsLength) {
// The keyBuffer is always onheap
assert keyBuffer.hasArray();
assert keyBuffer.arrayOffset() == 0;
this.keyBuffer = keyBuffer;
this.rowLength = rowLength;
this.familyOffset = familyOffset;
this.familyLength = familyLength;
this.qualifierOffset = qualOffset;
this.qualifierLength = qualLength;
this.timestamp = timeStamp;
this.typeByte = typeByte;
this.valueBuffer = valueBuffer;
this.valueOffset = valueOffset;
this.valueLength = valueLen;
this.tagsBuffer = tagsBuffer;
this.tagsOffset = tagsOffset;
this.tagsLength = tagsLength;
setSequenceId(seqId);
}
@Override
public byte[] getRowArray() {
return this.keyBuffer.array();
}
@Override
public int getRowOffset() {
return getRowPosition();
}
@Override
public short getRowLength() {
return this.rowLength;
}
@Override
public byte[] getFamilyArray() {
return this.keyBuffer.array();
}
@Override
public int getFamilyOffset() {
return getFamilyPosition();
}
@Override
public byte getFamilyLength() {
return this.familyLength;
}
@Override
public byte[] getQualifierArray() {
return this.keyBuffer.array();
}
@Override
public int getQualifierOffset() {
return getQualifierPosition();
}
@Override
public int getQualifierLength() {
return this.qualifierLength;
}
@Override
public long getTimestamp() {
return this.timestamp;
}
@Override
public byte getTypeByte() {
return this.typeByte;
}
@Override
public long getSequenceId() {
return this.seqId;
}
@Override
public byte[] getValueArray() {
return CellUtil.cloneValue(this);
}
@Override
public int getValueOffset() {
return 0;
}
@Override
public int getValueLength() {
return this.valueLength;
}
@Override
public byte[] getTagsArray() {
return PrivateCellUtil.cloneTags(this);
}
@Override
public int getTagsOffset() {
return 0;
}
@Override
public int getTagsLength() {
return this.tagsLength;
}
@Override
public ByteBuffer getRowByteBuffer() {
return this.keyBuffer;
}
@Override
public int getRowPosition() {
return Bytes.SIZEOF_SHORT;
}
@Override
public ByteBuffer getFamilyByteBuffer() {
return this.keyBuffer;
}
@Override
public int getFamilyPosition() {
return this.familyOffset;
}
@Override
public ByteBuffer getQualifierByteBuffer() {
return this.keyBuffer;
}
@Override
public int getQualifierPosition() {
return this.qualifierOffset;
}
@Override
public ByteBuffer getValueByteBuffer() {
return this.valueBuffer;
}
@Override
public int getValuePosition() {
return this.valueOffset;
}
@Override
public ByteBuffer getTagsByteBuffer() {
return this.tagsBuffer;
}
@Override
public int getTagsPosition() {
return this.tagsOffset;
}
@Override
public long heapSize() {
return FIXED_OVERHEAD;
}
@Override
public void setSequenceId(long seqId) {
this.seqId = seqId;
}
@Override
public int write(OutputStream out, boolean withTags) throws IOException {
int lenToWrite = getSerializedSize(withTags);
ByteBufferUtils.putInt(out, keyBuffer.capacity());
ByteBufferUtils.putInt(out, valueLength);
// Write key
out.write(keyBuffer.array());
// Write value
ByteBufferUtils.copyBufferToStream(out, this.valueBuffer, this.valueOffset, this.valueLength);
if (withTags && this.tagsLength > 0) {
// 2 bytes tags length followed by tags bytes
// tags length is serialized with 2 bytes only(short way) even if the type is int.
// As this is non -ve numbers, we save the sign bit. See HBASE-11437
out.write((byte) (0xff & (this.tagsLength >> 8)));
out.write((byte) (0xff & this.tagsLength));
ByteBufferUtils.copyBufferToStream(out, this.tagsBuffer, this.tagsOffset, this.tagsLength);
}
return lenToWrite;
}
@Override
public int getSerializedSize(boolean withTags) {
return KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength,
withTags);
}
@Override
public void setTimestamp(long ts) throws IOException {
// This is not used in actual flow. Throwing UnsupportedOperationException
throw new UnsupportedOperationException();
}
@Override
public void setTimestamp(byte[] ts) throws IOException {
// This is not used in actual flow. Throwing UnsupportedOperationException
throw new UnsupportedOperationException();
}
@Override
public void write(ByteBuffer buf, int offset) {
// This is not used in actual flow. Throwing UnsupportedOperationException
throw new UnsupportedOperationException();
}
@Override
public ExtendedCell deepClone() {
// This is not used in actual flow. Throwing UnsupportedOperationException
throw new UnsupportedOperationException();
}
}
protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
extends AbstractEncodedSeeker {
protected ByteBuff currentBuffer;
protected TagCompressionContext tagCompressionContext = null;
protected KeyValue.KeyOnlyKeyValue keyOnlyKV = new KeyValue.KeyOnlyKeyValue();
// A temp pair object which will be reused by ByteBuff#asSubByteBuffer calls. This avoids too
// many object creations.
protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
protected STATE current, previous;
public BufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
super(decodingCtx);
if (decodingCtx.getHFileContext().isCompressTags()) {
try {
tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
} catch (Exception e) {
throw new RuntimeException("Failed to initialize TagCompressionContext", e);
}
}
current = createSeekerState(); // always valid
previous = createSeekerState(); // may not be valid
}
@Override
public int compareKey(CellComparator comparator, Cell key) {
keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
return PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, keyOnlyKV);
}
@Override
public void setCurrentBuffer(ByteBuff buffer) {
if (this.tagCompressionContext != null) {
this.tagCompressionContext.clear();
}
currentBuffer = buffer;
current.currentBuffer = currentBuffer;
if(tagCompressionContext != null) {
current.tagCompressionContext = tagCompressionContext;
}
decodeFirst();
current.setKey(current.keyBuffer, current.memstoreTS);
previous.invalidate();
}
@Override
public Cell getKey() {
byte[] key = new byte[current.keyLength];
System.arraycopy(current.keyBuffer, 0, key, 0, current.keyLength);
return new KeyValue.KeyOnlyKeyValue(key);
}
@Override
public ByteBuffer getValueShallowCopy() {
currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, tmpPair);
ByteBuffer dup = tmpPair.getFirst().duplicate();
dup.position(tmpPair.getSecond());
dup.limit(tmpPair.getSecond() + current.valueLength);
return dup.slice();
}
@Override
public Cell getCell() {
return current.toCell();
}
@Override
public void rewind() {
currentBuffer.rewind();
if (tagCompressionContext != null) {
tagCompressionContext.clear();
}
decodeFirst();
current.setKey(current.keyBuffer, current.memstoreTS);
previous.invalidate();
}
@Override
public boolean next() {
if (!currentBuffer.hasRemaining()) {
return false;
}
decodeNext();
current.setKey(current.keyBuffer, current.memstoreTS);
previous.invalidate();
return true;
}
protected void decodeTags() {
current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
if (tagCompressionContext != null) {
if (current.uncompressTags) {
// Tag compression is been used. uncompress it into tagsBuffer
current.ensureSpaceForTags();
try {
current.tagsCompressedLength = tagCompressionContext.uncompressTags(currentBuffer,
current.tagsBuffer, 0, current.tagsLength);
} catch (IOException e) {
throw new RuntimeException("Exception while uncompressing tags", e);
}
} else {
currentBuffer.skip(current.tagsCompressedLength);
current.uncompressTags = true;// Reset this.
}
current.tagsOffset = -1;
} else {
// When tag compress is not used, let us not do copying of tags bytes into tagsBuffer.
// Just mark the tags Offset so as to create the KV buffer later in getKeyValueBuffer()
current.tagsOffset = currentBuffer.position();
currentBuffer.skip(current.tagsLength);
}
}
@Override
public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
int rowCommonPrefix = 0;
int familyCommonPrefix = 0;
int qualCommonPrefix = 0;
previous.invalidate();
do {
int comp;
keyOnlyKV.setKey(current.keyBuffer, 0, current.keyLength);
if (current.lastCommonPrefix != 0) {
// The KV format has row key length also in the byte array. The
// common prefix
// includes it. So we need to subtract to find out the common prefix
// in the
// row part alone
rowCommonPrefix = Math.min(rowCommonPrefix, current.lastCommonPrefix - 2);
}
if (current.lastCommonPrefix <= 2) {
rowCommonPrefix = 0;
}
rowCommonPrefix += findCommonPrefixInRowPart(seekCell, keyOnlyKV, rowCommonPrefix);
comp = compareCommonRowPrefix(seekCell, keyOnlyKV, rowCommonPrefix);
if (comp == 0) {
comp = compareTypeBytes(seekCell, keyOnlyKV);
if (comp == 0) {
// Subtract the fixed row key length and the family key fixed length
familyCommonPrefix = Math.max(
0,
Math.min(familyCommonPrefix,
current.lastCommonPrefix - (3 + keyOnlyKV.getRowLength())));
familyCommonPrefix += findCommonPrefixInFamilyPart(seekCell, keyOnlyKV,
familyCommonPrefix);
comp = compareCommonFamilyPrefix(seekCell, keyOnlyKV, familyCommonPrefix);
if (comp == 0) {
// subtract the rowkey fixed length and the family key fixed
// length
qualCommonPrefix = Math.max(
0,
Math.min(
qualCommonPrefix,
current.lastCommonPrefix
- (3 + keyOnlyKV.getRowLength() + keyOnlyKV.getFamilyLength())));
qualCommonPrefix += findCommonPrefixInQualifierPart(seekCell, keyOnlyKV,
qualCommonPrefix);
comp = compareCommonQualifierPrefix(seekCell, keyOnlyKV, qualCommonPrefix);
if (comp == 0) {
comp = CellComparator.getInstance().compareTimestamps(seekCell, keyOnlyKV);
if (comp == 0) {
// Compare types. Let the delete types sort ahead of puts;
// i.e. types
// of higher numbers sort before those of lesser numbers.
// Maximum
// (255)
// appears ahead of everything, and minimum (0) appears
// after
// everything.
comp = (0xff & keyOnlyKV.getTypeByte()) - (0xff & seekCell.getTypeByte());
}
}
}
}
}
if (comp == 0) { // exact match
if (seekBefore) {
if (!previous.isValid()) {
// The caller (seekBefore) has to ensure that we are not at the
// first key in the block.
throw new IllegalStateException("Cannot seekBefore if "
+ "positioned at the first key in the block: key="
+ Bytes.toStringBinary(seekCell.getRowArray()));
}
moveToPrevious();
return 1;
}
return 0;
}
if (comp < 0) { // already too large, check previous
if (previous.isValid()) {
moveToPrevious();
} else {
return HConstants.INDEX_KEY_MAGIC; // using optimized index key
}
return 1;
}
// move to next, if more data is available
if (currentBuffer.hasRemaining()) {
previous.copyFromNext(current);
decodeNext();
current.setKey(current.keyBuffer, current.memstoreTS);
} else {
break;
}
} while (true);
// we hit the end of the block, not an exact match
return 1;
}
private int compareTypeBytes(Cell key, Cell right) {
if (key.getFamilyLength() + key.getQualifierLength() == 0
&& key.getTypeByte() == Type.Minimum.getCode()) {
// left is "bigger", i.e. it appears later in the sorted order
return 1;
}
if (right.getFamilyLength() + right.getQualifierLength() == 0
&& right.getTypeByte() == Type.Minimum.getCode()) {
return -1;
}
return 0;
}
private static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
return Bytes.findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength()
- rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset()
+ rowCommonPrefix, right.getRowOffset() + rowCommonPrefix);
}
private static int findCommonPrefixInFamilyPart(Cell left, Cell right, int familyCommonPrefix) {
return Bytes
.findCommonPrefix(left.getFamilyArray(), right.getFamilyArray(), left.getFamilyLength()
- familyCommonPrefix, right.getFamilyLength() - familyCommonPrefix,
left.getFamilyOffset() + familyCommonPrefix, right.getFamilyOffset()
+ familyCommonPrefix);
}
private static int findCommonPrefixInQualifierPart(Cell left, Cell right,
int qualifierCommonPrefix) {
return Bytes.findCommonPrefix(left.getQualifierArray(), right.getQualifierArray(),
left.getQualifierLength() - qualifierCommonPrefix, right.getQualifierLength()
- qualifierCommonPrefix, left.getQualifierOffset() + qualifierCommonPrefix,
right.getQualifierOffset() + qualifierCommonPrefix);
}
private void moveToPrevious() {
if (!previous.isValid()) {
throw new IllegalStateException(
"Can move back only once and not in first key in the block.");
}
STATE tmp = previous;
previous = current;
current = tmp;
// move after last key value
currentBuffer.position(current.nextKvOffset);
// Already decoded the tag bytes. We cache this tags into current state and also the total
// compressed length of the tags bytes. For the next time decodeNext() we don't need to decode
// the tags again. This might pollute the Data Dictionary what we use for the compression.
// When current.uncompressTags is false, we will just reuse the current.tagsBuffer and skip
// 'tagsCompressedLength' bytes of source stream.
// See in decodeTags()
current.tagsBuffer = previous.tagsBuffer;
current.tagsCompressedLength = previous.tagsCompressedLength;
current.uncompressTags = false;
// The current key has to be reset with the previous Cell
current.setKey(current.keyBuffer, current.memstoreTS);
previous.invalidate();
}
@SuppressWarnings("unchecked")
protected STATE createSeekerState() {
// This will fail for non-default seeker state if the subclass does not
// override this method.
return (STATE) new SeekerState(this.tmpPair, this.includesTags());
}
abstract protected void decodeFirst();
abstract protected void decodeNext();
}
/**
* @return unencoded size added
*/
protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
int size = 0;
if (encodingCtx.getHFileContext().isIncludesTags()) {
int tagsLength = cell.getTagsLength();
ByteBufferUtils.putCompressedInt(out, tagsLength);
// There are some tags to be written
if (tagsLength > 0) {
TagCompressionContext tagCompressionContext = encodingCtx.getTagCompressionContext();
// When tag compression is enabled, tagCompressionContext will have a not null value. Write
// the tags using Dictionary compression in such a case
if (tagCompressionContext != null) {
// Not passing tagsLength considering that parsing of the tagsLength is not costly
PrivateCellUtil.compressTags(out, cell, tagCompressionContext);
} else {
PrivateCellUtil.writeTags(out, cell, tagsLength);
}
}
size += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
}
if (encodingCtx.getHFileContext().isIncludesMvcc()) {
// Copy memstore timestamp from the byte buffer to the output stream.
long memstoreTS = cell.getSequenceId();
WritableUtils.writeVLong(out, memstoreTS);
// TODO use a writeVLong which returns the #bytes written so that 2 time parsing can be
// avoided.
size += WritableUtils.getVIntSize(memstoreTS);
}
return size;
}
protected final void afterDecodingKeyValue(DataInputStream source,
ByteBuffer dest, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
if (decodingCtx.getHFileContext().isIncludesTags()) {
int tagsLength = ByteBufferUtils.readCompressedInt(source);
// Put as unsigned short
dest.put((byte) ((tagsLength >> 8) & 0xff));
dest.put((byte) (tagsLength & 0xff));
if (tagsLength > 0) {
TagCompressionContext tagCompressionContext = decodingCtx.getTagCompressionContext();
// When tag compression is been used in this file, tagCompressionContext will have a not
// null value passed.
if (tagCompressionContext != null) {
tagCompressionContext.uncompressTags(source, dest, tagsLength);
} else {
ByteBufferUtils.copyFromStreamToBuffer(dest, source, tagsLength);
}
}
}
if (decodingCtx.getHFileContext().isIncludesMvcc()) {
long memstoreTS = -1;
try {
// Copy memstore timestamp from the data input stream to the byte
// buffer.
memstoreTS = WritableUtils.readVLong(source);
ByteBufferUtils.writeVLong(dest, memstoreTS);
} catch (IOException ex) {
throw new RuntimeException("Unable to copy memstore timestamp " +
memstoreTS + " after decoding a key/value");
}
}
}
protected abstract ByteBuffer internalDecodeKeyValues(DataInputStream source,
int allocateHeaderLength, int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx)
throws IOException;
/**
* Asserts that there is at least the given amount of unfilled space
* remaining in the given buffer.
* @param out typically, the buffer we are writing to
* @param length the required space in the buffer
* @throws EncoderBufferTooSmallException If there are no enough bytes.
*/
protected static void ensureSpace(ByteBuffer out, int length)
throws EncoderBufferTooSmallException {
if (out.position() + length > out.limit()) {
throw new EncoderBufferTooSmallException(
"Buffer position=" + out.position() +
", buffer limit=" + out.limit() +
", length to be written=" + length);
}
}
@Override
public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
throws IOException {
if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
throw new IOException(this.getClass().getName() + " only accepts "
+ HFileBlockDefaultEncodingContext.class.getName() + " as the " +
"encoding context.");
}
HFileBlockDefaultEncodingContext encodingCtx =
(HFileBlockDefaultEncodingContext) blkEncodingCtx;
encodingCtx.prepareEncoding(out);
if (encodingCtx.getHFileContext().isIncludesTags()
&& encodingCtx.getHFileContext().isCompressTags()) {
if (encodingCtx.getTagCompressionContext() != null) {
// It will be overhead to create the TagCompressionContext again and again for every block
// encoding.
encodingCtx.getTagCompressionContext().clear();
} else {
try {
TagCompressionContext tagCompressionContext = new TagCompressionContext(
LRUDictionary.class, Byte.MAX_VALUE);
encodingCtx.setTagCompressionContext(tagCompressionContext);
} catch (Exception e) {
throw new IOException("Failed to initialize TagCompressionContext", e);
}
}
}
StreamUtils.writeInt(out, 0); // DUMMY length. This will be updated in endBlockEncoding()
blkEncodingCtx.setEncodingState(new BufferedDataBlockEncodingState());
}
private static class BufferedDataBlockEncodingState extends EncodingState {
int unencodedDataSizeWritten = 0;
}
@Override
public int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
throws IOException {
BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
.getEncodingState();
int encodedKvSize = internalEncode(cell, (HFileBlockDefaultEncodingContext) encodingCtx, out);
state.unencodedDataSizeWritten += encodedKvSize;
return encodedKvSize;
}
public abstract int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCtx,
DataOutputStream out) throws IOException;
@Override
public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
byte[] uncompressedBytesWithHeader) throws IOException {
BufferedDataBlockEncodingState state = (BufferedDataBlockEncodingState) encodingCtx
.getEncodingState();
// Write the unencodedDataSizeWritten (with header size)
Bytes.putInt(uncompressedBytesWithHeader,
HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
state.unencodedDataSizeWritten);
postEncoding(encodingCtx);
}
}