blob: a8fe3b2e87e8b39d6d69cc1b88a07116b47e18ea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.transaction;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.commons.collections4.map.LRUMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.OmidFilterBase;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TransactionVisibilityFilterBase extends OmidFilterBase {
// optional sub-filter to apply to visible cells
private final Filter userFilter;
private final SnapshotFilterImpl snapshotFilter;
private final LRUMap<Long ,Long> commitCache;
private final HBaseTransaction hbaseTransaction;
// This cache is cleared when moving to the next row
// So no need to keep row name
private final Map<ImmutableBytesWritable, Long> familyDeletionCache;
public TransactionVisibilityFilterBase(Filter cellFilter,
SnapshotFilterImpl snapshotFilter,
HBaseTransaction hbaseTransaction) {
this.userFilter = cellFilter;
this.snapshotFilter = snapshotFilter;
commitCache = new LRUMap<>(1000);
this.hbaseTransaction = hbaseTransaction;
familyDeletionCache = new HashMap<>();
}
@Override
public ReturnCode filterKeyValue(Cell v) throws IOException {
if (CellUtils.isShadowCell(v)) {
Long commitTs = Bytes.toLong(CellUtil.cloneValue(v));
commitCache.put(v.getTimestamp(), commitTs);
// Continue getting shadow cells until one of them fits this transaction
if (hbaseTransaction.getStartTimestamp() >= commitTs) {
return ReturnCode.NEXT_COL;
} else {
return ReturnCode.SKIP;
}
}
Optional<Long> commitTS = getCommitIfInSnapshot(v, CellUtils.isFamilyDeleteCell(v));
if (commitTS.isPresent()) {
if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL &&
snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
return runUserFilter(v, ReturnCode.INCLUDE);
}
if (CellUtils.isFamilyDeleteCell(v)) {
familyDeletionCache.put(createImmutableBytesWritable(v), commitTS.get());
if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
} else {
return ReturnCode.NEXT_COL;
}
}
Long deleteCommit = familyDeletionCache.get(createImmutableBytesWritable(v));
if (deleteCommit != null && deleteCommit >= v.getTimestamp()) {
if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
} else {
return ReturnCode.NEXT_COL;
}
}
if (CellUtils.isTombstone(v)) {
if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
} else {
return ReturnCode.NEXT_COL;
}
}
return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
}
return ReturnCode.SKIP;
}
private ImmutableBytesWritable createImmutableBytesWritable(Cell v) {
return new ImmutableBytesWritable(v.getFamilyArray(),
v.getFamilyOffset(),v.getFamilyLength());
}
private ReturnCode runUserFilter(Cell v, ReturnCode snapshotReturn)
throws IOException {
assert(snapshotReturn == ReturnCode.INCLUDE_AND_NEXT_COL || snapshotReturn == ReturnCode.INCLUDE);
if (userFilter == null) {
return snapshotReturn;
}
ReturnCode userRes = userFilter.filterKeyValue(v);
switch (userRes) {
case INCLUDE:
return snapshotReturn;
case SKIP:
return (snapshotReturn == ReturnCode.INCLUDE) ? ReturnCode.SKIP: ReturnCode.NEXT_COL;
default:
return userRes;
}
}
// For family delete cells, the sc hasn't arrived yet so get sc from region before going to ct
private Optional<Long> getCommitIfInSnapshot(Cell v, boolean getShadowCellBeforeCT) throws IOException {
Long cachedCommitTS = commitCache.get(v.getTimestamp());
if (cachedCommitTS != null) {
if (hbaseTransaction.getStartTimestamp() >= cachedCommitTS) {
return Optional.of(cachedCommitTS);
} else {
return Optional.absent();
}
}
if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
return Optional.of(v.getTimestamp());
}
if (getShadowCellBeforeCT) {
// Try to get shadow cell from region
final Get get = new Get(CellUtil.cloneRow(v));
get.setTimeStamp(v.getTimestamp()).setMaxVersions(1);
get.addColumn(CellUtil.cloneFamily(v), CellUtils.addShadowCellSuffixPrefix(CellUtils.FAMILY_DELETE_QUALIFIER));
Result shadowCell = snapshotFilter.getTableAccessWrapper().get(get);
if (!shadowCell.isEmpty()) {
long commitTS = Bytes.toLong(CellUtil.cloneValue(shadowCell.rawCells()[0]));
commitCache.put(v.getTimestamp(), commitTS);
if (commitTS <= hbaseTransaction.getStartTimestamp()) {
return Optional.of(commitTS);
}
}
}
Optional<Long> commitTS = snapshotFilter.getTSIfInSnapshot(v, hbaseTransaction, commitCache);
if (commitTS.isPresent()) {
commitCache.put(v.getTimestamp(), commitTS.get());
} else {
commitCache.put(v.getTimestamp(), Long.MAX_VALUE);
}
return commitTS;
}
@Override
public void reset() throws IOException {
familyDeletionCache.clear();
if (userFilter != null) {
userFilter.reset();
}
}
@Override
public boolean filterRow() throws IOException {
if (userFilter != null) {
return userFilter.filterRow();
}
return super.filterRow();
}
@Override
public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
if (userFilter != null) {
return userFilter.filterRowKey(buffer, offset, length);
}
return super.filterRowKey(buffer, offset, length);
}
@Override
public boolean filterAllRemaining() throws IOException {
if (userFilter != null) {
return userFilter.filterAllRemaining();
}
return super.filterAllRemaining();
}
@Override
public void filterRowCells(List<Cell> kvs) throws IOException {
if (userFilter != null) {
userFilter.filterRowCells(kvs);
} else {
super.filterRowCells(kvs);
}
}
@Override
public boolean hasFilterRow() {
if (userFilter != null) {
return userFilter.hasFilterRow();
}
return super.hasFilterRow();
}
@Override
public Cell getNextCellHint(Cell currentKV) throws IOException {
if (userFilter != null) {
return userFilter.getNextCellHint(currentKV);
}
return super.getNextCellHint(currentKV);
}
@Override
public boolean isFamilyEssential(byte[] name) throws IOException {
if (userFilter != null) {
return userFilter.isFamilyEssential(name);
}
return super.isFamilyEssential(name);
}
@Override
public byte[] toByteArray() throws IOException {
return super.toByteArray();
}
public Filter getInnerFilter() {
return userFilter;
}
}