blob: 70c1d277e559c68ab21b9517a592114d33634824 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.commons.collections4.map.LRUMap;
import org.apache.omid.HBaseShims;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.Client;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
import org.apache.omid.transaction.CellUtils;
import org.apache.omid.transaction.CellInfo;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ExecutionException;
import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.SHADOW_CELL;
/**
 * {@link InternalScanner} wrapper used during compactions that decides, row by row, which
 * cells returned by the wrapped scanner are handed on to the caller.
 *
 * <p>For every row read from the wrapped scanner:
 * <ul>
 *   <li>cells whose timestamp is above the low watermark read from the commit table at
 *       construction time are always retained, together with their shadow cell if present;</li>
 *   <li>HBase delete markers are retained when the scanner was configured to preserve
 *       non-transactionally deleted cells;</li>
 *   <li>on major compactions, tombstones that have a shadow cell or a valid commit timestamp
 *       are dropped together with the remaining cells of the column (or of the family for
 *       family deletes);</li>
 *   <li>for the remaining cells only the one with the highest timestamp per family/qualifier
 *       is kept, and only if a shadow cell is present or a valid commit timestamp can be found
 *       (in which case the missing shadow cell is rebuilt).</li>
 * </ul>
 *
 * <p>The filtered row is buffered internally so that it can be returned in chunks when the
 * caller passes a positive limit.
 */
public class CompactorScanner implements InternalScanner {

    private static final Logger LOG = LoggerFactory.getLogger(CompactorScanner.class);

    /** Maximum number of entries held by the commit timestamp LRU cache. */
    private static final int COMMIT_CACHE_SIZE = 1000;

    /** Limit value used internally to request a whole row in a single call. */
    private static final int NO_LIMIT = -1;

    private final InternalScanner internalScanner;
    private final CommitTable.Client commitTableClient;
    private final boolean isMajorCompaction;
    private final boolean retainNonTransactionallyDeletedCells;
    private final long lowWatermark;
    private final Region hRegion;

    /** Last value returned by the wrapped scanner's {@code next} call. */
    private boolean hasMoreRows = false;

    /** Cells of the current row that survived filtering and were not yet returned to the caller. */
    private final List<Cell> currentRowWorthValues = new ArrayList<Cell>();

    /** Commit timestamp lookups (including "not found" results) keyed by cell timestamp. */
    private final LRUMap<Long, Optional<CommitTimestamp>> commitCache;

    /**
     * Creates the scanner and reads the low watermark from the commit table.
     *
     * @param e observer context used to obtain the region the scanner operates on
     * @param internalScanner scanner providing the rows to filter
     * @param commitTableClient client used to read the low watermark and commit timestamps
     * @param isMajorCompaction whether tombstones may be removed (major compactions only)
     * @param preserveNonTransactionallyDeletedCells whether HBase delete markers are always retained
     * @throws IOException if the low watermark cannot be read from the commit table
     */
    public CompactorScanner(ObserverContext<RegionCoprocessorEnvironment> e,
                            InternalScanner internalScanner,
                            Client commitTableClient,
                            boolean isMajorCompaction,
                            boolean preserveNonTransactionallyDeletedCells) throws IOException {
        this.internalScanner = internalScanner;
        this.commitTableClient = commitTableClient;
        this.isMajorCompaction = isMajorCompaction;
        this.retainNonTransactionallyDeletedCells = preserveNonTransactionallyDeletedCells;
        this.lowWatermark = getLowWatermarkFromCommitTable();
        // Obtain the region in which the scanner is going to operate
        this.hRegion = HBaseShims.getRegionCoprocessorRegion(e.getEnvironment());
        this.commitCache = new LRUMap<>(COMMIT_CACHE_SIZE);
        LOG.info("Scanner cleaning up uncommitted txs older than LW [{}] in region [{}]",
                lowWatermark, hRegion.getRegionInfo());
    }

    /**
     * Returns all the retained cells of the next row.
     *
     * @param results list the retained cells are appended to
     * @return {@code true} if another call may return more cells
     * @throws IOException if the wrapped scanner or a commit timestamp lookup fails
     */
    @Override
    public boolean next(List<Cell> results) throws IOException {
        return next(results, NO_LIMIT);
    }

    /**
     * Same as {@link #next(List, int)} using the batch limit carried by the scanner context.
     * NOTE(review): no {@code @Override} here, presumably because the matching interface method
     * differs between the HBase versions this class is built against - confirm before adding one.
     *
     * @param result list the retained cells are appended to
     * @param scannerContext context providing the batch limit
     * @return {@code true} if another call may return more cells
     * @throws IOException if the wrapped scanner or a commit timestamp lookup fails
     */
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
        int limit = scannerContext.getBatchLimit();
        return next(result, limit);
    }

    /**
     * Appends to {@code result} up to {@code limit} retained cells of the current row, reading
     * and filtering a new row from the wrapped scanner when nothing is buffered.
     *
     * @param result list the retained cells are appended to
     * @param limit maximum number of cells to return in this call; zero or a negative value
     *              means "no limit"
     * @return {@code true} if the wrapped scanner reported more rows or if cells of the current
     *         row are still buffered; {@code false} otherwise
     * @throws IOException if the wrapped scanner or a commit timestamp lookup fails
     */
    public boolean next(List<Cell> result, int limit) throws IOException {
        if (currentRowWorthValues.isEmpty()) {
            loadAndFilterNextRow(limit);
        }

        // Chomp current row worth values up to the limit. A non-positive limit means the whole
        // row is returned at once; this also avoids an empty or invalid subList range.
        if (limit <= 0 || currentRowWorthValues.size() <= limit) {
            result.addAll(currentRowWorthValues);
            currentRowWorthValues.clear();
        } else {
            List<Cell> chunk = currentRowWorthValues.subList(0, limit);
            result.addAll(chunk);
            chunk.clear(); // removes the returned cells from the backing buffer
        }
        LOG.trace("Results to preserve {}", result);

        // Cells still buffered must be fetched by a further call even when the wrapped
        // scanner has no more rows, otherwise the tail of the last row would be lost.
        return hasMoreRows || !currentRowWorthValues.isEmpty();
    }

    @Override
    public void close() throws IOException {
        internalScanner.close();
    }

    // ----------------------------------------------------------------------------------------------------------------
    // Helper methods
    // ----------------------------------------------------------------------------------------------------------------

    /**
     * Tells whether the cell is an HBase delete marker that must be kept because the scanner
     * was configured to preserve non-transactionally deleted cells.
     *
     * @param cell cell to check
     * @return {@code true} if the cell has to be retained for that reason
     */
    @VisibleForTesting
    public boolean shouldRetainNonTransactionallyDeletedCell(Cell cell) {
        return (CellUtil.isDelete(cell) || CellUtil.isDeleteFamily(cell))
                &&
                retainNonTransactionallyDeletedCells;
    }

    /**
     * Reads the next row from the wrapped scanner, updates {@code hasMoreRows} and fills
     * {@code currentRowWorthValues} with the sorted cells that have to be preserved.
     *
     * @param limit limit requested by the caller; only used for trace logging
     */
    private void loadAndFilterNextRow(int limit) throws IOException {
        // 1) Read next row
        List<Cell> scanResult = new ArrayList<Cell>();
        hasMoreRows = internalScanner.next(scanResult);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Row: Result {} limit {} more rows? {}", scanResult, limit, hasMoreRows);
        }

        // 2) Traverse result list separating normal cells from shadow
        // cells and building a map to access easily the shadow cells.
        SortedMap<Cell, Optional<Cell>> cellToSc = CellUtils.mapCellsToShadowCells(scanResult);

        // 3) Traverse the list of row key values isolated before and
        // check which ones should be discarded
        Map<String, CellInfo> lastTimestampedCellsInRow = new HashMap<>();
        PeekingIterator<Map.Entry<Cell, Optional<Cell>>> iter
                = Iterators.peekingIterator(cellToSc.entrySet().iterator());
        while (iter.hasNext()) {
            Map.Entry<Cell, Optional<Cell>> entry = iter.next();
            Cell cell = entry.getKey();
            Optional<Cell> shadowCellOp = entry.getValue();

            if (cell.getTimestamp() > lowWatermark || shouldRetainNonTransactionallyDeletedCell(cell)) {
                retain(currentRowWorthValues, cell, shadowCellOp);
                continue;
            }

            // During a minor compaction the coprocessor may only see a
            // subset of store files and may not have all the versions
            // of a cell available for consideration. Therefore, if it
            // deletes a cell with a tombstone during a minor compaction,
            // an older version of the cell may become visible again. So,
            // we have to remove tombstones only in major compactions.
            // Strong assumption that family delete cells arrive first before any other column
            if (isMajorCompaction && CellUtils.isTombstone(cell)) {
                // Clean the tombstone's column only if the tombstone is known to be committed
                if (shadowCellOp.isPresent() || isValidCommit(queryCommitTimestamp(cell))) {
                    skipToNextColumn(cell, iter);
                }
                continue;
            }

            if (shadowCellOp.isPresent()) {
                saveLastTimestampedCell(lastTimestampedCellsInRow, cell, shadowCellOp.get());
                continue;
            }

            Optional<CommitTimestamp> commitTimestamp = queryCommitTimestamp(cell);
            if (isValidCommit(commitTimestamp)) {
                // Build the missing shadow cell...
                byte[] shadowCellValue = Bytes.toBytes(commitTimestamp.get().getValue());
                Cell shadowCell = CellUtils.buildShadowCellFromCell(cell, shadowCellValue);
                saveLastTimestampedCell(lastTimestampedCellsInRow, cell, shadowCell);
            } else {
                LOG.trace("Discarding cell {}", cell);
            }
        }
        retainLastTimestampedCellsSaved(currentRowWorthValues, lastTimestampedCellsInRow);

        // 4) Sort the list
        Collections.sort(currentRowWorthValues, KeyValue.COMPARATOR);
    }

    /** Tells whether a commit timestamp was found and is flagged as valid. */
    private static boolean isValidCommit(Optional<CommitTimestamp> commitTimestamp) {
        return commitTimestamp.isPresent() && commitTimestamp.get().isValid();
    }

    /**
     * Remembers {@code cell} (with its shadow cell) as the candidate to keep for its
     * family/qualifier unless a cell with an equal or higher timestamp is already remembered.
     */
    private void saveLastTimestampedCell(Map<String, CellInfo> lastCells, Cell cell, Cell shadowCell) {
        // Family and qualifier are arbitrary bytes: use the escaping binary representation
        // instead of a UTF-8 decode so that different columns never end up with the same key.
        String cellKey = Bytes.toStringBinary(
                cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
                + ":"
                + Bytes.toStringBinary(
                cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
        LOG.trace("Cell Key: {}", cellKey);

        CellInfo previous = lastCells.get(cellKey);
        if (previous == null || previous.getTimestamp() < cell.getTimestamp()) {
            lastCells.put(cellKey, new CellInfo(cell, shadowCell));
        } else {
            LOG.trace("Forgetting old cell {}", cell);
        }
    }

    /**
     * Reads the low watermark from the commit table, blocking until the value is available.
     *
     * @throws IOException if the thread is interrupted (the interrupt flag is restored) or the
     *                     read fails
     */
    private long getLowWatermarkFromCommitTable() throws IOException {
        try {
            LOG.trace("About to read log watermark from commit table");
            return commitTableClient.readLowWatermark().get();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted getting low watermark from commit table", ie);
            throw new IOException("Interrupted getting low watermark from commit table", ie);
        } catch (ExecutionException ee) {
            LOG.warn("Problem getting low watermark from commit table", ee);
            throw new IOException("Problem getting low watermark from commit table", ee.getCause());
        }
    }

    /**
     * Looks up the commit timestamp of the transaction that wrote {@code cell}: first in the
     * local cache, then in the commit table and finally in the cell's shadow cell stored in the
     * region. The outcome, including "not found", is cached by cell timestamp.
     *
     * @return the commit timestamp found, or an absent value if there is none
     * @throws IOException if the lookup is interrupted (the interrupt flag is restored) or fails
     */
    private Optional<CommitTimestamp> queryCommitTimestamp(Cell cell) throws IOException {
        long cellTimestamp = cell.getTimestamp();
        Optional<CommitTimestamp> cachedValue = commitCache.get(cellTimestamp);
        if (cachedValue != null) {
            return cachedValue;
        }

        Optional<CommitTimestamp> resolved;
        try {
            Optional<CommitTimestamp> ct = commitTableClient.getCommitTimestamp(cellTimestamp).get();
            resolved = ct.isPresent() ? ct : readCommitTimestampFromShadowCell(cell);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while getting commit timestamp from commit table", e);
        } catch (ExecutionException e) {
            throw new IOException("Error getting commit timestamp from commit table", e);
        }
        commitCache.put(cellTimestamp, resolved);
        return resolved;
    }

    /**
     * Reads from the region the shadow cell of {@code cell} and turns its value into a valid
     * commit timestamp located in the shadow cell.
     *
     * @return the commit timestamp, or an absent value if the region holds no such shadow cell
     */
    private Optional<CommitTimestamp> readCommitTimestampFromShadowCell(Cell cell) throws IOException {
        Get g = new Get(CellUtil.cloneRow(cell));
        byte[] family = CellUtil.cloneFamily(cell);
        byte[] qualifier = CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(),
                cell.getQualifierOffset(),
                cell.getQualifierLength());
        g.addColumn(family, qualifier);
        g.setTimeStamp(cell.getTimestamp());
        Result r = hRegion.get(g);
        if (r.containsColumn(family, qualifier)) {
            return Optional.of(new CommitTimestamp(SHADOW_CELL,
                    Bytes.toLong(r.getValue(family, qualifier)), true));
        }
        return Optional.absent();
    }

    /** Adds the cell, followed by its shadow cell when present, to {@code result}. */
    private void retain(List<Cell> result, Cell cell, Optional<Cell> shadowCell) {
        LOG.trace("Retaining cell {}", cell);
        result.add(cell);
        if (shadowCell.isPresent()) {
            LOG.trace("...with shadow cell {}", shadowCell.get());
            result.add(shadowCell.get());
        } else {
            LOG.trace("...without shadow cell! (TS is above Low Watermark)");
        }
    }

    /** Adds every remembered cell and its shadow cell to {@code result}. */
    private void retainLastTimestampedCellsSaved(List<Cell> result, Map<String, CellInfo> lastTimestampedCellsInRow) {
        for (CellInfo cellInfo : lastTimestampedCellsInRow.values()) {
            LOG.trace("Retaining last cell {} with shadow cell {}", cellInfo.getCell(), cellInfo.getShadowCell());
            result.add(cellInfo.getCell());
            result.add(cellInfo.getShadowCell());
        }
    }

    /**
     * Advances the iterator past the following cells that belong to the same column as
     * {@code cell}, or to the same family when {@code cell} is a family delete cell.
     */
    private void skipToNextColumn(Cell cell, PeekingIterator<Map.Entry<Cell, Optional<Cell>>> iter) {
        boolean isFamilyDelete = CellUtils.isFamilyDeleteCell(cell);
        while (iter.hasNext()
                && CellUtil.matchingFamily(iter.peek().getKey(), cell)
                && (CellUtil.matchingQualifier(iter.peek().getKey(), cell) || isFamilyDelete)) {
            iter.next();
        }
    }
}