blob: dacec744134e0289f512c6d01e5fb2b99e18113d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.filter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.KeyRange.Bound;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
/**
 *
 * Filter that seeks based on CNF containing anded and ored key ranges.
 * <p>
 * The ranges are held as a list of "slots": the outer list is the conjunction (one entry per
 * row key slot) and each inner list holds the or-ed {@link KeyRange}s for that slot. For every
 * key value handed to {@link #filterKeyValue(KeyValue)} the filter decides whether the row key is
 * inside the ranges at the current position (include it), before the next candidate key (seek
 * using {@link #getNextKeyHint(KeyValue)}), or past every range (the filter is done).
 * <p>
 * Instances carry mutable navigation state ({@code position}, {@code startKey}, {@code endKey},
 * {@code isDone}); {@link #hasIntersect(byte[], byte[])} and {@link #intersect(byte[], byte[])}
 * reset and then modify that state.
 *
 * TODO: figure out when to reset/not reset position array
 *
 *
 * @since 0.1
 */
public class SkipScanFilter extends FilterBase {
    /**
     * How {@code navigate} behaves when the current key is beyond the last range of a slot:
     * {@code AT} returns immediately, {@code AFTER} backtracks to find the next key to seek to.
     */
    private enum Terminate {AT, AFTER};

    // Conjunctive normal form of or-ed ranges or point lookups
    private List<List<KeyRange>> slots;
    // schema of the row key
    private RowKeySchema schema;
    // current position for each slot
    private int[] position;
    // initial size of the startKey/endKey buffers, as computed by SchemaUtil.getMaxKeyLength
    private int maxKeyLength;
    // buffer used for skip hint (the next key to seek to) and the number of valid bytes in it
    private byte[] startKey;
    private int startKeyLength;
    // buffer holding the key up to which rows may be included without re-navigating, and the
    // number of valid bytes in it (zero means "no end key is currently set")
    private byte[] endKey;
    private int endKeyLength;
    // set once navigation has established that no further row can match
    private boolean isDone;

    // scratch pointer reused while iterating over the slots of a row key
    private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();

    /**
     * Creates an uninitialized filter. The slots and schema are populated by
     * {@link #readFields(DataInput)}; no other method may be used before that.
     */
    public SkipScanFilter() {
    }

    /**
     * Creates a filter over the given slots.
     *
     * @param slots  one list of or-ed key ranges per row key slot; no inner list may be empty
     * @param schema schema of the row key
     * @throws IllegalStateException if any slot has no key ranges
     */
    public SkipScanFilter(List<List<KeyRange>> slots, RowKeySchema schema) {
        init(slots, schema);
    }

    /**
     * Validates the slots and (re)allocates the navigation state: the position array and the
     * start/end key buffers sized to the maximum key length reported by {@link SchemaUtil}.
     *
     * @throws IllegalStateException if any slot has no key ranges
     */
    private void init(List<List<KeyRange>> slots, RowKeySchema schema) {
        for (List<KeyRange> ranges : slots) {
            if (ranges.isEmpty()) {
                throw new IllegalStateException("Each slot must contain at least one key range");
            }
        }
        this.slots = slots;
        this.schema = schema;
        this.maxKeyLength = SchemaUtil.getMaxKeyLength(schema, slots);
        this.position = new int[slots.size()];
        startKey = new byte[maxKeyLength];
        endKey = new byte[maxKeyLength];
        endKeyLength = 0;
    }

    // Exposed for testing.
    List<List<KeyRange>> getSlots() {
        return slots;
    }

    /**
     * @return true once navigation has determined that no remaining row can match
     */
    @Override
    public boolean filterAllRemaining() {
        return isDone;
    }

    /**
     * Navigates using the row key of the given key value and reports whether to include it,
     * skip to the next row, or seek using the hint from {@link #getNextKeyHint(KeyValue)}.
     */
    @Override
    public ReturnCode filterKeyValue(KeyValue kv) {
        return navigate(kv.getBuffer(), kv.getRowOffset(),kv.getRowLength(),Terminate.AFTER);
    }

    /**
     * @return a key value whose row is the current start key, or null if the filter is done
     */
    @Override
    public KeyValue getNextKeyHint(KeyValue kv) {
        // TODO: don't allocate new key value every time here if possible
        return isDone ? null : new KeyValue(startKey, 0, startKeyLength,
                null, 0, 0, null, 0, 0, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
    }

    /**
     * Tests whether the ranges of this filter intersect the key span
     * [lowerInclusiveKey, upperExclusiveKey). An empty array means the bound is unbounded.
     * Resets and modifies the navigation state of this filter.
     *
     * @return true if there is an intersection
     */
    public boolean hasIntersect(byte[] lowerInclusiveKey, byte[] upperExclusiveKey) {
        return intersect(lowerInclusiveKey, upperExclusiveKey, null);
    }

    /**
     * Intersect the ranges of this filter with the range formed by lowerInclusiveKey and
     * upperExclusiveKey and filter out the ones that are not included in the region. Return the
     * new intersected SkipScanFilter or null if there is no intersection. An empty array means
     * the corresponding bound is unbounded. Resets and modifies the navigation state of this filter.
     */
    public SkipScanFilter intersect(byte[] lowerInclusiveKey, byte[] upperExclusiveKey) {
        List<List<KeyRange>> newSlots = Lists.newArrayListWithCapacity(slots.size());
        if (intersect(lowerInclusiveKey, upperExclusiveKey, newSlots)) {
            return new SkipScanFilter(newSlots, schema);
        }
        return null;
    }

    /**
     * @return true if the key range at the current position of every slot in
     *         [startPosInclusive, endPosExclusive) is a single key
     */
    private boolean areSlotsSingleKey(int startPosInclusive, int endPosExclusive) {
        for (int i = startPosInclusive; i < endPosExclusive; i++) {
            if (!slots.get(i).get(position[i]).isSingleKey()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Shared implementation of {@link #hasIntersect(byte[], byte[])} and
     * {@link #intersect(byte[], byte[])}.
     *
     * @param lowerInclusiveKey lower bound of the span, or an empty array if unbounded
     * @param upperExclusiveKey upper bound of the span, or an empty array if unbounded
     * @param newSlots          if non-null, receives the (sub)lists of ranges that intersect the
     *                          span; only meaningful when true is returned
     * @return true if the ranges of this filter intersect the span
     */
    private boolean intersect(byte[] lowerInclusiveKey, byte[] upperExclusiveKey, List<List<KeyRange>> newSlots) {
        boolean lowerUnbound = (lowerInclusiveKey.length == 0);
        // Start from a clean navigation state. The cached end key must be cleared as well,
        // otherwise an end key left over from an earlier navigation would make the first
        // navigate() call below short-circuit against positions that were just reset.
        Arrays.fill(position, 0);
        isDone = false;
        endKeyLength = 0;
        int startPos = 0;
        int lastSlot = slots.size()-1;
        if (!lowerUnbound) {
            // Find the position of the first slot of the lower range
            schema.next(ptr, 0, schema.iterator(lowerInclusiveKey,ptr));
            startPos = ScanUtil.searchClosestKeyRangeWithUpperHigherThanPtr(slots.get(0), ptr, 0);
            // Lower range is past last upper range of first slot, so cannot possibly be in range
            if (startPos >= slots.get(0).size()) {
                return false;
            }
        }
        boolean upperUnbound = (upperExclusiveKey.length == 0);
        int endPos = slots.get(0).size()-1;
        if (!upperUnbound) {
            // Find the position of the first slot of the upper range
            schema.next(ptr, 0, schema.iterator(upperExclusiveKey,ptr));
            endPos = ScanUtil.searchClosestKeyRangeWithUpperHigherThanPtr(slots.get(0), ptr, startPos);
            // Upper range lower than first lower range of first slot, so cannot possibly be in range
            if (endPos == 0 && Bytes.compareTo(upperExclusiveKey, slots.get(0).get(0).getLowerRange()) <= 0) {
                return false;
            }
            // Past last position, so we can include everything from the start position
            if (endPos >= slots.get(0).size()) {
                upperUnbound = true;
                endPos = slots.get(0).size()-1;
            }
        }
        if (!lowerUnbound) {
            // Position the filter at or after the lower key; if that exhausts the ranges
            // there is nothing at or above the lower bound.
            position[0] = startPos;
            navigate(lowerInclusiveKey, 0, lowerInclusiveKey.length, Terminate.AFTER);
            if (filterAllRemaining()) {
                return false;
            }
        }
        if (upperUnbound) {
            if (newSlots != null) {
                // Only the first slot is trimmed; the remaining slots are taken as-is
                newSlots.add(slots.get(0).subList(startPos, endPos+1));
                newSlots.addAll(slots.subList(1, slots.size()));
            }
            return true;
        }
        // Remember where the lower key landed before navigating to the upper key
        int[] lowerPosition = Arrays.copyOf(position, position.length);
        // Navigate to the upperExclusiveKey, but not past it
        ReturnCode endCode = navigate(upperExclusiveKey, 0, upperExclusiveKey.length, Terminate.AT);
        if (endCode == ReturnCode.INCLUDE) {
            setStartKey();
            // If the upperExclusiveKey is equal to the start key, we've gone one position too far, since
            // our upper key is exclusive. In that case, go to the previous key
            if (Bytes.compareTo(startKey, 0, startKeyLength, upperExclusiveKey, 0, upperExclusiveKey.length) == 0 &&
                    (previousPosition(lastSlot) < 0 || position[0] < lowerPosition[0])) {
                // If by backing up one position we have an empty range, then return
                return false;
            }
        } else if (endCode == ReturnCode.SEEK_NEXT_USING_HINT) {
            // The upperExclusive key is smaller than the slots stored in the position. Check if it's the same position
            // as the slots for lowerInclusive. If so, there is no intersection.
            if (Arrays.equals(lowerPosition, position) && areSlotsSingleKey(0, position.length-1)) {
                return false;
            }
        }
        // Copy inclusive all positions
        for (int i = 0; i <= lastSlot; i++) {
            List<KeyRange> newRanges = slots.get(i).subList(lowerPosition[i], Math.min(position[i] + 1, slots.get(i).size()));
            if (newRanges.isEmpty()) {
                return false;
            }
            if (newSlots != null) {
                newSlots.add(newRanges);
            }
            // Once a slot spans more than one position, the less significant slots are
            // copied in full and no further trimming is done
            if (position[i] > lowerPosition[i]) {
                if (newSlots != null) {
                    newSlots.addAll(slots.subList(i+1, slots.size()));
                }
                break;
            }
        }
        return true;
    }

    /**
     * Moves the position back by one, starting at slot i. When the position of a slot would
     * drop below zero it wraps to the last range of that slot and the next more significant
     * slot is decremented instead.
     *
     * @return the index of the slot at which the decrement stopped, or -1 if every slot wrapped
     */
    private int previousPosition(int i) {
        while (i >= 0 && --position[i] < 0) {
            position[i] = slots.get(i).size()-1;
            i--;
        }
        return i;
    }

    /**
     * Core navigation: compares the given row key against the ranges at the current position,
     * advancing the position as needed.
     *
     * @param currentKey buffer holding the row key
     * @param offset     offset of the row key in the buffer
     * @param length     length of the row key
     * @param terminate  {@link Terminate#AT} to stop as soon as the key is beyond the last range
     *                   of a slot; {@link Terminate#AFTER} to backtrack and compute the next key
     * @return {@code INCLUDE} if the key is within the ranges at the current position (the end
     *         key is set as a side effect); {@code SEEK_NEXT_USING_HINT} if a seek is needed
     *         (the start key is set on the {@code AFTER} paths; with {@code AT} the method
     *         returns early without updating it); {@code NEXT_ROW} if no further key can match
     *         (isDone is set)
     */
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(
            value="QBA_QUESTIONABLE_BOOLEAN_ASSIGNMENT",
            justification="Assignment designed to work this way.")
    private ReturnCode navigate(final byte[] currentKey, final int offset, final int length, Terminate terminate) {
        int nSlots = slots.size();
        // First check to see if we're in-range until we reach our end key
        if (endKeyLength > 0) {
            if (Bytes.compareTo(currentKey, offset, length, endKey, 0, endKeyLength) < 0) {
                return ReturnCode.INCLUDE;
            }
            // If key range of last slot is a single key, we can increment our position
            // since we know we'll be past the current row after including it.
            if (slots.get(nSlots-1).get(position[nSlots-1]).isSingleKey()) {
                if (nextPosition(nSlots-1) < 0) {
                    // Current row will be included, but we have no more
                    isDone = true;
                    return ReturnCode.NEXT_ROW;
                }
            }
            else {
                // Reset the positions to zero from the next slot after the earliest ranged slot, since the
                // next key could be bigger at this ranged slot, and smaller than the current position of
                // less significant slots.
                int earliestRangeIndex = nSlots-1;
                for (int i = 0; i < nSlots; i++) {
                    if (!slots.get(i).get(position[i]).isSingleKey()) {
                        earliestRangeIndex = i;
                        break;
                    }
                }
                Arrays.fill(position, earliestRangeIndex+1, position.length, 0);
            }
        }
        // The cached end key no longer applies to the key being navigated
        endKeyLength = 0;
        // We could have included the previous
        if (isDone) {
            return ReturnCode.NEXT_ROW;
        }
        int i = 0;
        boolean seek = false;
        int earliestRangeIndex = nSlots-1;
        int minOffset = offset;
        int maxOffset = schema.iterator(currentKey, minOffset, length, ptr);
        schema.next(ptr, i, maxOffset);
        while (true) {
            // Increment to the next range while the upper bound of our current slot is less than our current key
            while (position[i] < slots.get(i).size() && slots.get(i).get(position[i]).compareUpperToLowerBound(ptr) < 0) {
                position[i]++;
            }
            // Less significant slots always restart from their first range
            Arrays.fill(position, i+1, position.length, 0);
            if (position[i] >= slots.get(i).size()) {
                // Our current key is bigger than the last range of the current slot.
                // If navigating after current key, backtrack and increment the key of the previous slot values.
                // If navigating to current key, just return
                if (terminate == Terminate.AT) {
                    return ReturnCode.SEEK_NEXT_USING_HINT;
                }
                if (i == 0) {
                    isDone = true;
                    return ReturnCode.NEXT_ROW;
                }
                // Increment key and backtrack until in range. We know at this point that we'll be
                // issuing a seek next hint.
                seek = true;
                Arrays.fill(position, i, position.length, 0);
                int j = i - 1;
                // If we're positioned at a single key, no need to copy the current key and get the next key .
                // Instead, just increment to the next key and continue.
                boolean incremented = false;
                // The assignments inside the condition are deliberate: 'incremented' records that
                // slot j was advanced, and is cleared again in the body when that advance wrapped
                // around to zero (meaning the carry moves on to slot j-1).
                while (j >= 0 && slots.get(j).get(position[j]).isSingleKey() && (incremented=true) && (position[j] = (position[j] + 1) % slots.get(j).size()) == 0) {
                    j--;
                    incremented = false;
                }
                if (j < 0) {
                    isDone = true;
                    return ReturnCode.NEXT_ROW;
                }
                if (incremented) {
                    // Continue the loop after setting the start key, because our start key maybe smaller than
                    // the current key, so we'll end up incrementing the start key until it's bigger than the
                    // current key.
                    setStartKey();
                    schema.reposition(ptr, i, j, minOffset, maxOffset);
                } else {
                    int currentLength = setStartKey(ptr, minOffset, j+1);
                    // From here on, we use startKey as our buffer (resetting minOffset and maxOffset)
                    // We've copied the part of the current key above that we need into startKey
                    // Reinitialize the iterator to be positioned at previous slot position
                    minOffset = 0;
                    maxOffset = startKeyLength;
                    schema.iterator(startKey, minOffset, maxOffset, ptr, j+1);
                    // Do nextKey after setting the accessor b/c otherwise the null byte may have
                    // been incremented causing us not to find it
                    ByteUtil.nextKey(startKey, currentLength);
                }
                i = j;
            } else if (slots.get(i).get(position[i]).compareLowerToUpperBound(ptr) > 0) {
                // Our current key is less than the lower range of the current position in the current slot.
                // Seek to the lower range, since it's bigger than the current key
                setStartKey(ptr, minOffset, i);
                return ReturnCode.SEEK_NEXT_USING_HINT;
            } else { // We're in range, check the next slot
                if (!slots.get(i).get(position[i]).isSingleKey() && i < earliestRangeIndex) {
                    earliestRangeIndex = i;
                }
                // If we're past the last slot or we know we're seeking to the next (in
                // which case the previously updated slot was verified to be within the
                // range, so we don't need to check the rest of the slots. If we were
                // to check the rest of the slots, we'd get into trouble because we may
                // have a null byte that was incremented which screws up our schema.next call)
                if (i == nSlots-1 || seek) {
                    break;
                }
                i++;
                // If we run out of slots in our key, it means we have a partial key.
                if (schema.next(ptr, i, maxOffset) == null) {
                    // If the rest of the slots are checking for IS NULL, then break because
                    // that's the case (since we don't store trailing nulls).
                    if (allTrailingNulls(i)) {
                        break;
                    }
                    // Otherwise we seek to the next start key because we're before it now
                    setStartKey(ptr, minOffset, i);
                    return ReturnCode.SEEK_NEXT_USING_HINT;
                }
            }
        }

        if (seek) {
            return ReturnCode.SEEK_NEXT_USING_HINT;
        }
        // Else, we're in range for all slots and can include this row plus all rows
        // up to the upper range of our last slot. We do this for ranges and single keys
        // since we potentially have multiple key values for the same row key.
        setEndKey(ptr, minOffset, i);
        return ReturnCode.INCLUDE;
    }

    /**
     * @return true if every slot from index i onward consists of exactly one key range that is
     *         a single key with a zero-length lower range
     */
    private boolean allTrailingNulls(int i) {
        for (; i < slots.size(); i++) {
            List<KeyRange> keyRanges = slots.get(i);
            if (keyRanges.size() != 1) {
                return false;
            }
            KeyRange keyRange = keyRanges.get(0);
            if (!keyRange.isSingleKey()) {
                return false;
            }
            if (keyRange.getLowerRange().length != 0) {
                return false;
            }
        }
        return true;
    }

    /**
     * Advances the position by one, starting at slot i, for as long as the current range of the
     * slot is a single key. When the position of a slot wraps back to zero, the next more
     * significant slot is advanced instead.
     *
     * @return the index of the slot at which advancing stopped (because it did not wrap, or
     *         because its current range is not a single key), or -1 if every slot wrapped
     */
    private int nextPosition(int i) {
        while (i >= 0 && slots.get(i).get(position[i]).isSingleKey() && (position[i] = (position[i] + 1) % slots.get(i).size()) == 0) {
            i--;
        }
        return i;
    }

    /**
     * Rebuilds the whole start key from the lower bounds at the current position of all slots
     * (delegating to {@link ScanUtil#setKey}).
     */
    private void setStartKey() {
        startKeyLength = setKey(Bound.LOWER, startKey, 0, 0);
    }

    /**
     * Builds the start key from a prefix of the current key followed by the lower bounds of the
     * slots from index i onward.
     *
     * @param ptr    pointer into the current key; the bytes from offset up to ptr's offset form
     *               the prefix that is copied
     * @param offset start of the current key in ptr's backing array
     * @param i      index of the first slot whose lower bound is appended
     * @return the length of the copied prefix (not the total start key length)
     */
    private int setStartKey(ImmutableBytesWritable ptr, int offset, int i) {
        int length = ptr.getOffset() - offset;
        startKey = copyKey(startKey, length + this.maxKeyLength, ptr.get(), offset, length);
        startKeyLength = length;
        // Add separator byte if we're at the end of the buffer, since trailing separator bytes are stripped
        if (ptr.getOffset() + ptr.getLength() == offset + length && i-1 > 0 && !schema.getField(i-1).getDataType().isFixedWidth()) {
            startKey[startKeyLength++] = QueryConstants.SEPARATOR_BYTE;
        }
        startKeyLength += setKey(Bound.LOWER, startKey, startKeyLength, i);
        return length;
    }

    /**
     * Builds the end key from a prefix of the current key followed by the upper bounds of the
     * slots from index i onward.
     *
     * @param ptr    pointer into the current key; the bytes from offset up to ptr's offset form
     *               the prefix that is copied
     * @param offset start of the current key in ptr's backing array
     * @param i      index of the first slot whose upper bound is appended
     * @return the length of the copied prefix (not the total end key length)
     */
    private int setEndKey(ImmutableBytesWritable ptr, int offset, int i) {
        int length = ptr.getOffset() - offset;
        endKey = copyKey(endKey, length + this.maxKeyLength, ptr.get(), offset, length);
        endKeyLength = length;
        endKeyLength += setKey(Bound.UPPER, endKey, length, i);
        return length;
    }

    /**
     * Writes the given bound of the ranges at the current position, for slots from
     * slotStartIndex to the last slot, into key starting at keyOffset.
     *
     * @return the value returned by {@link ScanUtil#setKey}, which callers here treat as the
     *         number of bytes written
     */
    private int setKey(Bound bound, byte[] key, int keyOffset, int slotStartIndex) {
        return ScanUtil.setKey(schema, slots, position, bound, key, keyOffset, slotStartIndex, position.length);
    }

    /**
     * Copies length bytes of sourceKey, starting at offset, to the beginning of targetKey,
     * replacing targetKey with a new array of targetLength bytes if it is too small.
     *
     * @return the array that received the bytes (targetKey or its replacement)
     */
    private static byte[] copyKey(byte[] targetKey, int targetLength, byte[] sourceKey, int offset, int length) {
        if (targetLength > targetKey.length) {
            targetKey = new byte[targetLength];
        }
        System.arraycopy(sourceKey, offset, targetKey, 0, length);
        return targetKey;
    }

    /**
     * Reads the schema followed by the slots (slot count, then per slot the range count and the
     * ranges) in the format produced by {@link #write(DataOutput)}, and initializes the filter.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        RowKeySchema schema = new RowKeySchema();
        schema.readFields(in);
        int andLen = in.readInt();
        List<List<KeyRange>> slots = Lists.newArrayListWithExpectedSize(andLen);
        for (int i=0; i<andLen; i++) {
            int orlen = in.readInt();
            List<KeyRange> orclause = Lists.newArrayListWithExpectedSize(orlen);
            slots.add(orclause);
            for (int j=0; j<orlen; j++) {
                KeyRange range = new KeyRange();
                range.readFields(in);
                orclause.add(range);
            }
        }
        this.init(slots, schema);
    }

    /**
     * Writes the schema followed by the slots: the slot count, then for each slot its range
     * count and each range. Navigation state is not written.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        schema.write(out);
        out.writeInt(slots.size());
        for (List<KeyRange> orclause : slots) {
            out.writeInt(orclause.size());
            for (KeyRange range : orclause) {
                range.write(out);
            }
        }
    }

    /**
     * Hashes the slot structure: the number of slots, and for each slot the number of ranges
     * and the lower and upper bound bytes of every range in it.
     */
    @Override
    public int hashCode() {
        HashFunction hf = Hashing.goodFastHash(32);
        Hasher h = hf.newHasher();
        h.putInt(slots.size());
        for (List<KeyRange> orClause : slots) {
            h.putInt(orClause.size());
            // Iterate over the ranges of this slot; bounding this loop by the number of slots
            // would either run off the end of a short slot or skip ranges of a long one.
            for (KeyRange range : orClause) {
                h.putBytes(range.getLowerRange());
                h.putBytes(range.getUpperRange());
            }
        }
        return h.hash().asInt();
    }

    /**
     * Two filters are equal when their slots and schemas are equal; navigation state is ignored.
     */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof SkipScanFilter)) return false;
        SkipScanFilter other = (SkipScanFilter)obj;
        return Objects.equal(slots, other.slots) && Objects.equal(schema, other.schema);
    }

    @Override
    public String toString() {
        return "SkipScanFilter "+ slots.toString() ;
    }
}