| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.filter; |
| |
| import java.io.DataInput; |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.TreeSet; |
| |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.phoenix.expression.Expression; |
| import org.apache.phoenix.expression.KeyValueColumnExpression; |
| import org.apache.phoenix.expression.visitor.ExpressionVisitor; |
| import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor; |
| import org.apache.phoenix.schema.tuple.BaseTuple; |
| |
| |
| /** |
| * |
| * Modeled after {@link org.apache.hadoop.hbase.filter.SingleColumnValueFilter}, |
| * but for general expression evaluation in the case where multiple KeyValue |
| * columns are referenced in the expression. |
| * |
| */ |
| public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFilter { |
| private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0]; |
| |
| private Boolean matchedColumn; |
| protected final IncrementalResultTuple inputTuple = new IncrementalResultTuple(); |
| protected TreeSet<byte[]> cfSet; |
| |
| public MultiKeyValueComparisonFilter() { |
| } |
| |
| public MultiKeyValueComparisonFilter(Expression expression) { |
| super(expression); |
| init(); |
| } |
| |
| private static final class CellRef { |
| public Cell cell; |
| |
| @Override |
| public String toString() { |
| if(cell != null) { |
| return cell.toString() + " value = " + Bytes.toStringBinary( |
| cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); |
| } else { |
| return super.toString(); |
| } |
| } |
| } |
| |
| protected abstract Object setColumnKey(byte[] cf, int cfOffset, int cfLength, byte[] cq, int cqOffset, int cqLength); |
| protected abstract Object newColumnKey(byte[] cf, int cfOffset, int cfLength, byte[] cq, int cqOffset, int cqLength); |
| |
| private final class IncrementalResultTuple extends BaseTuple { |
| private int refCount; |
| private final ImmutableBytesWritable keyPtr = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER); |
| private final Map<Object,CellRef> foundColumns = new HashMap<Object,CellRef>(5); |
| |
| public void reset() { |
| refCount = 0; |
| keyPtr.set(UNITIALIZED_KEY_BUFFER); |
| for (CellRef ref : foundColumns.values()) { |
| ref.cell = null; |
| } |
| } |
| |
| @Override |
| public boolean isImmutable() { |
| return refCount == foundColumns.size(); |
| } |
| |
| public void setImmutable() { |
| refCount = foundColumns.size(); |
| } |
| |
| private ReturnCode resolveColumn(Cell value) { |
| // Always set key, in case we never find a key value column of interest, |
| // and our expression uses row key columns. |
| setKey(value); |
| Object ptr = setColumnKey(value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(), |
| value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength()); |
| CellRef ref = foundColumns.get(ptr); |
| if (ref == null) { |
| // Return INCLUDE_AND_NEXT_COL here. Although this filter doesn't need this KV |
| // it should still be projected into the Result |
| return ReturnCode.INCLUDE_AND_NEXT_COL; |
| } |
| // Since we only look at the latest key value for a given column, |
| // we are not interested in older versions |
| // TODO: test with older versions to confirm this doesn't get tripped |
| // This shouldn't be necessary, because a scan only looks at the latest |
| // version |
| if (ref.cell != null) { |
| // Can't do NEXT_ROW, because then we don't match the other columns |
| // SKIP, INCLUDE, and NEXT_COL seem to all act the same |
| return ReturnCode.NEXT_COL; |
| } |
| ref.cell = value; |
| refCount++; |
| return null; |
| } |
| |
| public void addColumn(byte[] cf, byte[] cq) { |
| Object ptr = MultiKeyValueComparisonFilter.this.newColumnKey(cf, 0, cf.length, cq, 0, cq.length); |
| foundColumns.put(ptr, new CellRef()); |
| } |
| |
| public void setKey(Cell value) { |
| keyPtr.set(value.getRowArray(), value.getRowOffset(), value.getRowLength()); |
| } |
| |
| @Override |
| public void getKey(ImmutableBytesWritable ptr) { |
| ptr.set(keyPtr.get(),keyPtr.getOffset(),keyPtr.getLength()); |
| } |
| |
| @Override |
| public Cell getValue(byte[] cf, byte[] cq) { |
| Object ptr = setColumnKey(cf, 0, cf.length, cq, 0, cq.length); |
| CellRef ref = foundColumns.get(ptr); |
| return ref == null ? null : ref.cell; |
| } |
| |
| @Override |
| public String toString() { |
| return foundColumns.toString(); |
| } |
| |
| @Override |
| public int size() { |
| return refCount; |
| } |
| |
| @Override |
| public Cell getValue(int index) { |
| // This won't perform very well, but it's not |
| // currently used anyway |
| for (CellRef ref : foundColumns.values()) { |
| if (ref.cell == null) { |
| continue; |
| } |
| if (index == 0) { |
| return ref.cell; |
| } |
| index--; |
| } |
| throw new IndexOutOfBoundsException(Integer.toString(index)); |
| } |
| |
| @Override |
| public boolean getValue(byte[] family, byte[] qualifier, |
| ImmutableBytesWritable ptr) { |
| Cell cell = getValue(family, qualifier); |
| if (cell == null) |
| return false; |
| ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); |
| return true; |
| } |
| } |
| |
| protected void init() { |
| cfSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); |
| ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() { |
| @Override |
| public Void visit(KeyValueColumnExpression expression) { |
| inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnQualifier()); |
| return null; |
| } |
| }; |
| expression.accept(visitor); |
| } |
| |
| @Override |
| public ReturnCode filterKeyValue(Cell cell) { |
| if (Boolean.TRUE.equals(this.matchedColumn)) { |
| // We already found and matched the single column, all keys now pass |
| return ReturnCode.INCLUDE_AND_NEXT_COL; |
| } |
| if (Boolean.FALSE.equals(this.matchedColumn)) { |
| // We found all the columns, but did not match the expression, so skip to next row |
| return ReturnCode.NEXT_ROW; |
| } |
| // This is a key value we're not interested in (TODO: why INCLUDE here instead of NEXT_COL?) |
| ReturnCode code = inputTuple.resolveColumn(cell); |
| if (code != null) { |
| return code; |
| } |
| |
| // We found a new column, so we can re-evaluate |
| // TODO: if we have row key columns in our expression, should |
| // we always evaluate or just wait until the end? |
| this.matchedColumn = this.evaluate(inputTuple); |
| if (this.matchedColumn == null) { |
| if (inputTuple.isImmutable()) { |
| this.matchedColumn = Boolean.FALSE; |
| } else { |
| return ReturnCode.INCLUDE_AND_NEXT_COL; |
| } |
| } |
| return this.matchedColumn ? ReturnCode.INCLUDE_AND_NEXT_COL : ReturnCode.NEXT_ROW; |
| } |
| |
| @Override |
| public boolean filterRow() { |
| if (this.matchedColumn == null && !inputTuple.isImmutable() && expression.requiresFinalEvaluation()) { |
| inputTuple.setImmutable(); |
| this.matchedColumn = this.evaluate(inputTuple); |
| } |
| |
| return ! (Boolean.TRUE.equals(this.matchedColumn)); |
| } |
| |
| @Override |
| public void reset() { |
| matchedColumn = null; |
| inputTuple.reset(); |
| super.reset(); |
| } |
| |
| @Override |
| public boolean isFamilyEssential(byte[] name) { |
| // Only the column families involved in the expression are essential. |
| // The others are for columns projected in the select expression. |
| return cfSet.contains(name); |
| } |
| |
| @Override |
| public void readFields(DataInput input) throws IOException { |
| super.readFields(input); |
| init(); |
| } |
| } |