blob: 3bcc2cefa818823ea626dbf1c2b9a804c9effd6d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX;
/**
* The store scanner that implements Phoenix TTL and Max Lookback. Phoenix overrides the
* HBase implementation of data retention policies which is built at the cell level, and implements
* its row level data retention within this store scanner.
*/
public class CompactionScanner implements InternalScanner {
private static final Logger LOGGER = LoggerFactory.getLogger(CompactionScanner.class);
public static final String SEPARATOR = ":";
private final InternalScanner storeScanner;
private final Region region;
private final Store store;
private final Configuration config;
private final RegionCoprocessorEnvironment env;
private long maxLookbackWindowStart;
private long ttlWindowStart;
private long ttl;
private final long maxLookbackInMillis;
private int minVersion;
private int maxVersion;
private final boolean emptyCFStore;
private final boolean localIndex;
private final int familyCount;
private KeepDeletedCells keepDeletedCells;
private long compactionTime;
private final byte[] emptyCF;
private final byte[] emptyCQ;
private final byte[] storeColumnFamily;
private static Map<String, Long> maxLookbackMap = new ConcurrentHashMap<>();
private PhoenixLevelRowCompactor phoenixLevelRowCompactor;
private HBaseLevelRowCompactor hBaseLevelRowCompactor;
public CompactionScanner(RegionCoprocessorEnvironment env,
Store store,
InternalScanner storeScanner,
long maxLookbackInMillis,
byte[] emptyCF,
byte[] emptyCQ) {
this.storeScanner = storeScanner;
this.region = env.getRegion();
this.store = store;
this.env = env;
this.emptyCF = emptyCF;
this.emptyCQ = emptyCQ;
this.config = env.getConfiguration();
compactionTime = EnvironmentEdgeManager.currentTimeMillis();
this.maxLookbackInMillis = maxLookbackInMillis;
String columnFamilyName = store.getColumnFamilyName();
storeColumnFamily = columnFamilyName.getBytes();
String tableName = region.getRegionInfo().getTable().getNameAsString();
Long overriddenMaxLookback =
maxLookbackMap.remove(tableName + SEPARATOR + columnFamilyName);
maxLookbackInMillis = overriddenMaxLookback == null ?
maxLookbackInMillis : Math.max(maxLookbackInMillis, overriddenMaxLookback);
// The oldest scn is current time - maxLookbackInMillis. Phoenix sets the scan time range
// for scn queries [0, scn). This means that the maxlookback size should be
// maxLookbackInMillis + 1 so that the oldest scn does not return empty row
this.maxLookbackWindowStart = maxLookbackInMillis == 0 ?
compactionTime : compactionTime - (maxLookbackInMillis + 1);
ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
ttl = cfd.getTimeToLive();
this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - ttl * 1000;
ttl *= 1000;
this.maxLookbackWindowStart = Math.max(ttlWindowStart, maxLookbackWindowStart);
this.minVersion = cfd.getMinVersions();
this.maxVersion = cfd.getMaxVersions();
this.keepDeletedCells = cfd.getKeepDeletedCells();
familyCount = region.getTableDescriptor().getColumnFamilies().length;
localIndex = columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF))
|| localIndex;
phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
}
/**
* Any coprocessors within a JVM can extend the max lookback window for a column family
* by calling this static method.
*/
public static void overrideMaxLookback(String tableName, String columnFamilyName,
long maxLookbackInMillis) {
if (tableName == null || columnFamilyName == null) {
return;
}
Long old = maxLookbackMap.putIfAbsent(tableName + SEPARATOR + columnFamilyName,
maxLookbackInMillis);
if (old != null && old < maxLookbackInMillis) {
maxLookbackMap.put(tableName + SEPARATOR + columnFamilyName, maxLookbackInMillis);
}
}
@Override
public boolean next(List<Cell> result) throws IOException {
boolean hasMore = storeScanner.next(result);
if (!result.isEmpty()) {
phoenixLevelRowCompactor.compact(result, false);
}
return hasMore;
}
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@Override
public void close() throws IOException {
storeScanner.close();
}
/**
* The context for a given row during compaction. A row may have multiple compaction row
* versions. CompactionScanner uses the same row context for these versions.
*/
static class RowContext {
Cell familyDeleteMarker = null;
Cell familyVersionDeleteMarker = null;
List<Cell> columnDeleteMarkers = null;
int version = 0;
long maxTimestamp;
long minTimestamp;
private void addColumnDeleteMarker(Cell deleteMarker) {
if (columnDeleteMarkers == null) {
columnDeleteMarkers = new ArrayList<>();
columnDeleteMarkers.add(deleteMarker);
return;
}
int i = 0;
// Replace the existing delete marker for the same column
for (Cell cell : columnDeleteMarkers) {
if (cell.getType() == deleteMarker.getType() &&
CellUtil.matchingColumn(cell, deleteMarker)) {
columnDeleteMarkers.remove(i);
break;
}
i++;
}
columnDeleteMarkers.add(deleteMarker);
}
private void retainFamilyDeleteMarker(List<Cell> retainedCells) {
if (familyVersionDeleteMarker != null) {
retainedCells.add(familyVersionDeleteMarker);
// Set it to null so it will be used once
familyVersionDeleteMarker = null;
} else {
retainedCells.add(familyDeleteMarker);
}
}
/**
* Based on the column delete markers decide if the cells should be retained. If a
* deleted cell is retained, the delete marker is also retained.
*/
private void retainCell(Cell cell, List<Cell> retainedCells,
KeepDeletedCells keepDeletedCells, long ttlWindowStart) {
int i = 0;
for (Cell dm : columnDeleteMarkers) {
if (cell.getTimestamp() > dm.getTimestamp()) {
continue;
}
if ((CellUtil.matchingFamily(cell, dm)) &&
CellUtil.matchingQualifier(cell, dm)) {
if (dm.getType() == Cell.Type.Delete) {
// Delete is for deleting a specific cell version. Thus, it can be used
// to delete only one cell.
columnDeleteMarkers.remove(i);
}
if (maxTimestamp >= ttlWindowStart) {
// Inside the TTL window
if (keepDeletedCells != KeepDeletedCells.FALSE ) {
retainedCells.add(cell);
retainedCells.add(dm);
}
} else if (keepDeletedCells == KeepDeletedCells.TTL &&
dm.getTimestamp() >= ttlWindowStart) {
retainedCells.add(cell);
retainedCells.add(dm);
}
return;
}
i++;
}
// No delete marker for this cell
retainedCells.add(cell);
}
/**
* This method finds out the maximum and minimum timestamp of the cells of the next row
* version. Cells are organized into columns based on the pair of family name and column
* qualifier. This means that the delete family markers for a column family will have their
* own column. However, the delete column markers will be packed with the put cells. The cells
* within a column are ordered in descending timestamps.
*/
private void getNextRowVersionTimestamps(LinkedList<LinkedList<Cell>> columns,
byte[] columnFamily) {
maxTimestamp = 0;
minTimestamp = Long.MAX_VALUE;
Cell firstCell;
LinkedList<Cell> deleteColumn = null;
long ts;
for (LinkedList<Cell> column : columns) {
firstCell = column.getFirst();
ts = firstCell.getTimestamp();
if ((firstCell.getType() == Cell.Type.DeleteFamily ||
firstCell.getType() == Cell.Type.DeleteFamilyVersion) &&
CellUtil.matchingFamily(firstCell, columnFamily)) {
deleteColumn = column;
}
if (maxTimestamp < ts) {
maxTimestamp = ts;
}
if (minTimestamp > ts) {
minTimestamp = ts;
}
}
if (deleteColumn != null) {
// A row version do not cross a family delete marker. This means
// min timestamp cannot be lower than the delete markers timestamp
for (Cell cell : deleteColumn) {
ts = cell.getTimestamp();
if (ts < maxTimestamp) {
minTimestamp = ts + 1;
break;
}
}
}
}
}
/**
* HBaseLevelRowCompactor ensures that the cells of a given row are retained according to the
* HBase data retention rules.
*
*/
class HBaseLevelRowCompactor {
/**
* A compaction row version includes the latest put cell versions from each column such that
* the cell versions do not cross delete family markers. In other words, the compaction row
* versions are built from cell versions that are all either before or after the next delete
* family or delete family version maker if family delete markers exist. Also, when the cell
* timestamps are ordered for a given row version, the difference between two subsequent
* timestamps has to be less than the ttl value. This is taken care before calling
* HBaseLevelRowCompactor#compact().
*
* Compaction row versions are disjoint sets. A compaction row version does not share a cell
* version with the next compaction row version. A compaction row version includes at most
* one cell version from a column.
*
* After creating the first compaction row version, we form the next compaction row version
* from the remaining cell versions.
*
* Compaction row versions are used for compaction purposes to efficiently determine which
* cell versions to retain based on the HBase data retention parameters.
*/
class CompactionRowVersion {
// Cells included in the row version
List<Cell> cells = new ArrayList<>();
// The timestamp of the row version
long ts = 0;
// The version of a row version. It is the minimum of the versions of the cells included
// in the row version
int version = 0;
@Override
public String toString() {
StringBuilder output = new StringBuilder();
output.append("Cell count: " + cells.size() + "\n");
for (Cell cell : cells) {
output.append(cell + "\n");
}
output.append("ts:" + ts + " v:" + version);
return output.toString();
}
}
/**
* Decide if compaction row versions inside the TTL window should be retained. The
* versions are retained if one of the following conditions holds
* 1. The compaction row version is alive and its version is less than VERSIONS
* 2. The compaction row version is deleted and KeepDeletedCells is TTL
* 3. The compaction row version is deleted, its version is less than MIN_VERSIONS and
* KeepDeletedCells is TRUE
*
*/
private void retainInsideTTLWindow(CompactionRowVersion rowVersion, RowContext rowContext,
List<Cell> retainedCells) {
if (rowContext.familyDeleteMarker == null && rowContext.familyVersionDeleteMarker == null) {
// The compaction row version is alive
if (rowVersion.version < maxVersion) {
// Rule 1
retainCells(rowVersion, rowContext, retainedCells);
}
} else {
// Deleted
if ((rowVersion.version < maxVersion && keepDeletedCells == KeepDeletedCells.TRUE) ||
keepDeletedCells == KeepDeletedCells.TTL) {
// Retain based on rule 2 or 3
retainCells(rowVersion, rowContext, retainedCells);
rowContext.retainFamilyDeleteMarker(retainedCells);
}
}
}
/**
* Decide if compaction row versions outside the TTL window should be retained. The
* versions are retained if one of the following conditions holds
*
* 1. Live row versions less than MIN_VERSIONS are retained
* 2. Delete row versions whose delete markers are inside the TTL window and
* KeepDeletedCells is TTL are retained
*/
private void retainOutsideTTLWindow(CompactionRowVersion rowVersion, RowContext rowContext,
List<Cell> retainedCells) {
if (rowContext.familyDeleteMarker == null
&& rowContext.familyVersionDeleteMarker == null) {
// Live compaction row version
if (rowVersion.version < minVersion) {
// Rule 1
retainCells(rowVersion, rowContext, retainedCells);
}
} else {
// Deleted compaction row version
if (keepDeletedCells == KeepDeletedCells.TTL && (
(rowContext.familyVersionDeleteMarker != null &&
rowContext.familyVersionDeleteMarker.getTimestamp() > ttlWindowStart) ||
(rowContext.familyDeleteMarker != null &&
rowContext.familyDeleteMarker.getTimestamp() > ttlWindowStart)
)) {
// Rule 2
retainCells(rowVersion, rowContext, retainedCells);
rowContext.retainFamilyDeleteMarker(retainedCells);
}
}
}
private void retainCells(CompactionRowVersion rowVersion, RowContext rowContext,
List<Cell> retainedCells) {
if (rowContext.columnDeleteMarkers == null) {
retainedCells.addAll(rowVersion.cells);
return;
}
for (Cell cell : rowVersion.cells) {
rowContext.retainCell(cell, retainedCells, keepDeletedCells, ttlWindowStart);
}
}
/**
* Form the next compaction row version by picking (removing) the first cell from each
* column. Put cells are used to form the next compaction row version. Delete markers
* are added to the row context which are processed to decide which row versions
* or cell version to delete.
*/
private void formNextCompactionRowVersion(LinkedList<LinkedList<Cell>> columns,
RowContext rowContext, List<Cell> retainedCells) {
CompactionRowVersion rowVersion = new CompactionRowVersion();
rowContext.getNextRowVersionTimestamps(columns, storeColumnFamily);
rowVersion.ts = rowContext.maxTimestamp;
rowVersion.version = rowContext.version++;
for (LinkedList<Cell> column : columns) {
Cell cell = column.getFirst();
if (column.getFirst().getTimestamp() < rowContext.minTimestamp) {
continue;
}
if (cell.getType() == Cell.Type.DeleteFamily) {
if (cell.getTimestamp() >= rowContext.maxTimestamp) {
rowContext.familyDeleteMarker = cell;
column.removeFirst();
}
continue;
}
else if (cell.getType() == Cell.Type.DeleteFamilyVersion) {
if (cell.getTimestamp() >= rowContext.maxTimestamp) {
rowContext.familyVersionDeleteMarker = cell;
column.removeFirst();
}
continue;
}
column.removeFirst();
if (cell.getType() == Cell.Type.DeleteColumn ||
cell.getType() == Cell.Type.Delete) {
rowContext.addColumnDeleteMarker(cell);
continue;
}
rowVersion.cells.add(cell);
}
if (rowVersion.cells.isEmpty()) {
return;
}
if (rowVersion.ts >= ttlWindowStart) {
retainInsideTTLWindow(rowVersion, rowContext, retainedCells);
} else {
retainOutsideTTLWindow(rowVersion, rowContext, retainedCells);
}
}
private void formCompactionRowVersions(LinkedList<LinkedList<Cell>> columns,
List<Cell> result) {
RowContext rowContext = new RowContext();
while (!columns.isEmpty()) {
formNextCompactionRowVersion(columns, rowContext, result);
// Remove the columns that are empty
Iterator<LinkedList<Cell>> iterator = columns.iterator();
while (iterator.hasNext()) {
LinkedList<Cell> column = iterator.next();
if (column.isEmpty()) {
iterator.remove();
}
}
}
}
/**
* Group the cells that are ordered lexicographically into columns based on
* the pair of family name and column qualifier. While doing that also add the delete
* markers to a separate list.
*/
private void formColumns(List<Cell> result, LinkedList<LinkedList<Cell>> columns,
List<Cell> deleteMarkers) {
Cell currentColumnCell = null;
LinkedList<Cell> currentColumn = null;
for (Cell cell : result) {
if (cell.getType() != Cell.Type.Put) {
deleteMarkers.add(cell);
}
if (currentColumnCell == null) {
currentColumn = new LinkedList<>();
currentColumnCell = cell;
currentColumn.add(cell);
} else if (!CellUtil.matchingColumn(cell, currentColumnCell)) {
columns.add(currentColumn);
currentColumn = new LinkedList<>();
currentColumnCell = cell;
currentColumn.add(cell);
} else {
currentColumn.add(cell);
}
}
if (currentColumn != null) {
columns.add(currentColumn);
}
}
/**
* Compacts a single row at the HBase level. The result parameter is the input row and
* modified to be the output of the compaction.
*/
private void compact(List<Cell> result) {
if (result.isEmpty()) {
return;
}
LinkedList<LinkedList<Cell>> columns = new LinkedList<>();
List<Cell> deleteMarkers = new ArrayList<>();
formColumns(result, columns, deleteMarkers);
result.clear();
formCompactionRowVersions(columns, result);
}
}
/**
* PhoenixLevelRowCompactor ensures that the cells of the latest row version and the
* row versions that are visible through the max lookback window are retained including delete
* markers placed after these cells. This is the complete set of cells that Phoenix
* needs for its queries. Beyond these cells, HBase retention rules may require more
* cells to be retained. These cells are identified by the HBase level compaction implemented
* by HBaseLevelRowCompactor.
*
*/
class PhoenixLevelRowCompactor {
/**
* The cells of the row (i.e., result) read from HBase store are lexicographically ordered
* for tables using the key part of the cells which includes row, family, qualifier,
* timestamp and type. The cells belong of a column are ordered from the latest to
* the oldest. The method leverages this ordering and groups the cells into their columns
* based on the pair of family name and column qualifier.
*
* The cells within the max lookback window except the once at the lower edge of the
* max lookback window are retained immediately and not included in the constructed columns.
*/
private void getColumns(List<Cell> result, LinkedList<LinkedList<Cell>> columns,
List<Cell> retainedCells) {
Cell currentColumnCell = null;
LinkedList<Cell> currentColumn = null;
for (Cell cell : result) {
if (cell.getTimestamp() > maxLookbackWindowStart) {
retainedCells.add(cell);
continue;
}
if (currentColumnCell == null) {
currentColumn = new LinkedList<>();
currentColumnCell = cell;
currentColumn.add(cell);
} else if (!CellUtil.matchingColumn(cell, currentColumnCell)) {
columns.add(currentColumn);
currentColumn = new LinkedList<>();
currentColumnCell = cell;
currentColumn.add(cell);
} else {
currentColumn.add(cell);
}
}
if (currentColumn != null) {
columns.add(currentColumn);
}
}
/**
* Close the gap between the two timestamps, max and min, with the minimum number of cells
* from the input list such that the timestamp difference between two cells should
* not more than ttl. The cells that are used to close the gap are added to the output
* list.
*/
private void closeGap(long max, long min, List<Cell> input, List<Cell> output) {
int previous = -1;
long ts;
for (Cell cell : input) {
ts = cell.getTimestamp();
if (ts >= max) {
previous++;
continue;
}
if (previous == -1) {
break;
}
if (max - ts > ttl) {
max = input.get(previous).getTimestamp();
output.add(input.remove(previous));
if (max - min > ttl) {
closeGap(max, min, input, output);
}
return;
}
previous++;
}
}
/**
* Retain the last row version visible through the max lookback window
*/
private void retainCellsOfLastRowVersion(LinkedList<LinkedList<Cell>> columns,
List<Cell> retainedCells) {
if (columns.isEmpty()) {
return;
}
RowContext rowContext = new RowContext();
rowContext.getNextRowVersionTimestamps(columns, storeColumnFamily);
List<Cell> retainedPutCells = new ArrayList<>();
for (LinkedList<Cell> column : columns) {
Cell cell = column.getFirst();
if (cell.getTimestamp() < rowContext.minTimestamp) {
continue;
}
if (cell.getType() == Cell.Type.Put) {
retainedPutCells.add(cell);
} else if (cell.getType() == Cell.Type.DeleteFamily ||
cell.getType() == Cell.Type.DeleteFamilyVersion) {
if (cell.getTimestamp() >= rowContext.maxTimestamp) {
// This means that the row version outside the max lookback window is
// deleted and thus should not be visible to the scn queries
if (cell.getTimestamp() == maxLookbackWindowStart) {
// Include delete markers at maxLookbackWindowStart
retainedCells.add(cell);
}
return;
}
} else if (cell.getTimestamp() == maxLookbackWindowStart) {
// Include delete markers at maxLookbackWindowStart
retainedCells.add(cell);
}
}
if (compactionTime - rowContext.maxTimestamp > maxLookbackInMillis + ttl) {
// The row version should not be visible via the max lookback window. Nothing to do
return;
}
retainedCells.addAll(retainedPutCells);
// If the gap between two back to back mutations is more than ttl then the older
// mutation will be considered expired and masked. If the length of the time range of
// a row version is not more than ttl, then we know the cells covered by the row
// version are not apart from each other more than ttl and will not be masked.
if (rowContext.maxTimestamp - rowContext.minTimestamp <= ttl) {
return;
}
// The quick time range check did not pass. We need get at least one empty cell to cover
// the gap so that the row version will not be masked by PhoenixTTLRegionScanner.
List<Cell> emptyCellColumn = null;
for (LinkedList<Cell> column : columns) {
if (ScanUtil.isEmptyColumn(column.getFirst(), emptyCF, emptyCQ)) {
emptyCellColumn = column;
break;
}
}
if (emptyCellColumn == null) {
return;
}
int size = retainedPutCells.size();
long tsArray[] = new long[size];
int i = 0;
for (Cell cell : retainedPutCells) {
tsArray[i++] = cell.getTimestamp();
}
Arrays.sort(tsArray);
for (i = size - 1; i > 0; i--) {
if (tsArray[i] - tsArray[i - 1] > ttl) {
closeGap(tsArray[i], tsArray[i - 1], emptyCellColumn, retainedCells);
}
}
}
private boolean retainCellsForMaxLookback(List<Cell> result, boolean regionLevel,
List<Cell> retainedCells) {
LinkedList<LinkedList<Cell>> columns = new LinkedList<>();
getColumns(result, columns, retainedCells);
long maxTimestamp = 0;
long minTimestamp = Long.MAX_VALUE;
long ts;
for (LinkedList<Cell> column : columns) {
ts = column.getFirst().getTimestamp();
if (ts > maxTimestamp) {
maxTimestamp = ts;
}
ts = column.getLast().getTimestamp();
if (ts < minTimestamp) {
minTimestamp = ts;
}
}
if (compactionTime - maxTimestamp > maxLookbackInMillis + ttl) {
if (!emptyCFStore && !regionLevel) {
// The row version is more than maxLookbackInMillis + ttl old. We cannot decide
// if we should retain it with the store level compaction when the current
// store is not the empty column family store.
return false;
}
return true;
}
// If the time gap between two back to back mutations is more than ttl then we know
// that the row is expired within the time gap.
if (maxTimestamp - minTimestamp > ttl) {
if ((familyCount > 1 && !regionLevel && !localIndex)) {
// When there are more than one column family for a given table and a row
// version constructed at the store level covers a time span larger than ttl,
// we need region level compaction to see if the other stores have more cells
// for any of these large time gaps. A store level compaction may incorrectly
// remove some cells due to a large time gap which may not there at the region
// level.
return false;
}
// We either have one column family or are doing region level compaction. In both
// case, we can safely trim the cells beyond the first time gap larger ttl
int size = result.size();
long tsArray[] = new long[size];
int i = 0;
for (Cell cell : result) {
tsArray[i++] = cell.getTimestamp();
}
Arrays.sort(tsArray);
boolean gapFound = false;
// Since timestamps are sorted in ascending order, traverse them in reverse order
for (i = size - 1; i > 0; i--) {
if (tsArray[i] - tsArray[i - 1] > ttl) {
minTimestamp = tsArray[i];
gapFound = true;
break;
}
}
if (gapFound) {
List<Cell> trimmedResult = new ArrayList<>(size - i);
for (Cell cell : result) {
if (cell.getTimestamp() >= minTimestamp) {
trimmedResult.add(cell);
}
}
columns.clear();
retainedCells.clear();
getColumns(trimmedResult, columns, retainedCells);
}
}
retainCellsOfLastRowVersion(columns, retainedCells);
return true;
}
/**
* Compacts a single row at the Phoenix level. The result parameter is the input row and
* modified to be the output of the compaction process.
*/
private void compact(List<Cell> result, boolean regionLevel) throws IOException {
if (result.isEmpty()) {
return;
}
List<Cell> phoenixResult = new ArrayList<>(result.size());
if (!retainCellsForMaxLookback(result, regionLevel, phoenixResult)) {
if (familyCount == 1 || regionLevel) {
throw new RuntimeException("UNEXPECTED");
}
phoenixResult.clear();
compactRegionLevel(result, phoenixResult);
}
if (maxVersion == 1 && minVersion == 0 && keepDeletedCells == KeepDeletedCells.FALSE) {
// We need to Phoenix level compaction only
Collections.sort(phoenixResult, CellComparator.getInstance());
result.clear();
result.addAll(phoenixResult);
return;
}
// We may need to do retain more cells, and so we need to run HBase level compaction
// too. The result of two compactions will be merged and duplicate cells are removed.
int phoenixResultSize = phoenixResult.size();
List<Cell> hbaseResult = new ArrayList<>(result);
hBaseLevelRowCompactor.compact(hbaseResult);
phoenixResult.addAll(hbaseResult);
Collections.sort(phoenixResult, CellComparator.getInstance());
result.clear();
Cell previousCell = null;
// Eliminate duplicates
for (Cell cell : phoenixResult) {
if (previousCell == null ||
cell.getTimestamp() != previousCell.getTimestamp() ||
cell.getType() != previousCell.getType() ||
!CellUtil.matchingColumn(cell, previousCell)) {
result.add(cell);
}
previousCell = cell;
}
if (result.size() > phoenixResultSize) {
LOGGER.debug("HBase level compaction retained " +
(result.size() - phoenixResultSize) + " more cells");
}
}
private int compareTypes(Cell a, Cell b) {
Cell.Type aType = a.getType();
Cell.Type bType = b.getType();
if (aType == bType) {
return 0;
}
if (aType == Cell.Type.DeleteFamily) {
return -1;
}
if (bType == Cell.Type.DeleteFamily) {
return 1;
}
if (aType == Cell.Type.DeleteFamilyVersion) {
return -1;
}
if (bType == Cell.Type.DeleteFamilyVersion) {
return 1;
}
if (aType == Cell.Type.DeleteColumn) {
return -1;
}
return 1;
}
private int compare(Cell a, Cell b) {
int result;
result = Bytes.compareTo(a.getFamilyArray(), a.getFamilyOffset(),
a.getFamilyLength(),
b.getFamilyArray(), b.getFamilyOffset(), b.getFamilyLength());
if (result != 0) {
return result;
}
result = Bytes.compareTo(a.getQualifierArray(), a.getQualifierOffset(),
a.getQualifierLength(),
b.getQualifierArray(), b.getQualifierOffset(), b.getQualifierLength());
if (result != 0) {
return result;
}
if (a.getTimestamp() > b.getTimestamp()) {
return -1;
}
if (a.getTimestamp() < b.getTimestamp()) {
return 1;
}
return compareTypes(a, b);
}
/**
* The generates the intersection of regionResult and input. The result is the resulting
* intersection.
*/
private void trimRegionResult(List<Cell> regionResult, List<Cell> input,
List<Cell> result) {
if (regionResult.isEmpty()) {
return;
}
int index = 0;
int size = regionResult.size();
int compare;
for (Cell originalCell : input) {
Cell regionCell = regionResult.get(index);
compare = compare(originalCell, regionCell);
while (compare > 0) {
index++;
if (index == size) {
break;
}
regionCell = regionResult.get(index);
compare = compare(originalCell, regionCell);
}
if (compare == 0) {
result.add(originalCell);
index++;
}
if (index == size) {
break;
}
}
}
/**
* This is used only when the Phoenix level compaction cannot be done at the store level.
*/
private void compactRegionLevel(List<Cell> input, List<Cell> result) throws IOException {
byte[] rowKey = CellUtil.cloneRow(input.get(0));
Scan scan = new Scan();
scan.setRaw(true);
scan.readAllVersions();
// compaction + 1 because the upper limit of the time range is not inclusive
scan.setTimeRange(0, compactionTime + 1);
scan.withStartRow(rowKey, true);
scan.withStopRow(rowKey, true);
RegionScanner scanner = region.getScanner(scan);
List<Cell> regionResult = new ArrayList<>(result.size());
scanner.next(regionResult);
scanner.close();
Collections.sort(regionResult, CellComparator.getInstance());
compact(regionResult, true);
result.clear();
trimRegionResult(regionResult, input, result);
}
}
}