blob: 5b53e772b2806284585a6d85c3cb4ed9ef468416 [file] [log] [blame]
/*
* Copyright © 2014 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package co.cask.tephra.hbase98.coprocessor;
import co.cask.tephra.Transaction;
import co.cask.tephra.TxConstants;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.Map;
import javax.annotation.Nullable;
/**
* Applies filtering of data based on transactional visibility (HBase 0.98+ specific version).
* Note: this is intended for server-side use only, as additional properties need to be set on
* any {@code Scan} or {@code Get} operation performed.
*/
public class TransactionVisibilityFilter extends FilterBase {
  private static final Log LOG = LogFactory.getLog(TransactionVisibilityFilter.class);

  // the transaction whose snapshot determines which cell versions are visible
  private final Transaction tx;
  // oldest visible timestamp by column family, used to apply TTL when reading
  private final Map<byte[], Long> oldestTsByFamily;
  // whether or not we can remove delete markers
  // these can only be safely removed when we are traversing all storefiles
  // (major compaction dropping deletes) or when returning results to a user scan
  private final boolean clearDeletes;
  // optional sub-filter to apply to visible cells
  private final Filter cellFilter;

  // since we traverse KVs in order, cache the current oldest TS to avoid map lookups per KV
  private byte[] currentFamily = new byte[0];
  private long currentOldestTs;

  /**
   * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
   *
   * @param tx the current transaction to apply. Only data visible to this transaction will be returned.
   * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
   * @param scanType the type of scan operation being performed
   */
  public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, ScanType scanType) {
    this(tx, ttlByFamily, scanType, null);
  }

  /**
   * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions.
   *
   * @param tx the current transaction to apply. Only data visible to this transaction will be returned.
   * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name; a TTL {@code <= 0}
   *                    means no TTL is applied for that family
   * @param scanType the type of scan operation being performed; delete markers are dropped only for
   *                 {@link ScanType#COMPACT_DROP_DELETES} and {@link ScanType#USER_SCAN}
   * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
   *                   calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
   *                   {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
   */
  public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, ScanType scanType,
                                     @Nullable Filter cellFilter) {
    this.tx = tx;
    // byte[] has identity equals/hashCode, so a content-based comparator is required for lookups
    this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
      long familyTTL = ttlEntry.getValue();
      // cell timestamps are compared in the same space as the transaction visibility upper bound,
      // so the TTL (ms) is scaled by MAX_TX_PER_MS before subtracting
      oldestTsByFamily.put(ttlEntry.getKey(),
          familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
    }
    this.clearDeletes =
        scanType == ScanType.COMPACT_DROP_DELETES || scanType == ScanType.USER_SCAN;
    this.cellFilter = cellFilter;
  }

  /**
   * Decides whether a cell is returned, based on the family TTL, transaction visibility,
   * delete markers and the optional sub-filter.
   *
   * @param cell the cell being evaluated
   * @return {@link ReturnCode#NEXT_COL} for expired cells and cleared delete markers,
   *         {@link ReturnCode#SKIP} for cells not visible to the transaction,
   *         {@link ReturnCode#INCLUDE_AND_NEXT_COL} for retained delete markers, and otherwise the
   *         sub-filter's result (or {@link ReturnCode#INCLUDE_AND_NEXT_COL} when there is no sub-filter)
   * @throws IOException if the sub-filter throws
   */
  @Override
  public ReturnCode filterKeyValue(Cell cell) throws IOException {
    if (!CellUtil.matchingFamily(cell, currentFamily)) {
      // column family changed: refresh the cached TTL boundary
      currentFamily = CellUtil.cloneFamily(cell);
      Long familyOldestTs = oldestTsByFamily.get(currentFamily);
      currentOldestTs = familyOldestTs != null ? familyOldestTs : 0;
    }

    long kvTimestamp = cell.getTimestamp();
    if (kvTimestamp < currentOldestTs) {
      // passed TTL for this column; remaining versions are older still, so seek to next column
      return ReturnCode.NEXT_COL;
    }
    if (!tx.isVisible(kvTimestamp)) {
      // written by a transaction this one cannot see; an older version may still be visible
      return ReturnCode.SKIP;
    }

    if (isDelete(cell)) {
      if (clearDeletes) {
        // skip "deleted" cell (and any older versions beneath the marker)
        LOG.trace("Clearing delete marker");
        return ReturnCode.NEXT_COL;
      }
      // keep the marker but skip any remaining versions
      LOG.trace("Keeping delete marker");
      return ReturnCode.INCLUDE_AND_NEXT_COL;
    }

    // cell is visible. NOTE(review): only filterKeyValue is delegated to the sub-filter; its
    // reset/row-level hooks are not invoked by this class — confirm that is sufficient for callers.
    if (cellFilter != null) {
      return cellFilter.filterKeyValue(cell);
    }
    // as soon as we find a KV to include we can move to the next column
    return ReturnCode.INCLUDE_AND_NEXT_COL;
  }

  /**
   * Returns whether the cell carries the transactional delete tag
   * ({@link TxConstants.HBase#CELL_TAG_TYPE_DELETE}).
   * This runs once per visible cell, so it deliberately does no logging or allocation
   * beyond the tag lookup itself.
   */
  private boolean isDelete(Cell cell) {
    return cell.getTagsLength() > 0
        && Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
                      TxConstants.HBase.CELL_TAG_TYPE_DELETE) != null;
  }

  /**
   * Delegates to {@link FilterBase#toByteArray()}. As noted in the class documentation, this filter
   * is intended for server-side use only, so no custom serialization of its state is provided.
   */
  @Override
  public byte[] toByteArray() throws IOException {
    return super.toByteArray();
  }
}