/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.coprocessor;
import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.REBUILT_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.RESULT_TABLE_COLUMN_FAMILY;
import static org.apache.phoenix.mapreduce.index.IndexTool.SCANNED_DATA_ROW_COUNT_BYTES;
import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
import org.apache.phoenix.hbase.index.parallel.Task;
import org.apache.phoenix.hbase.index.parallel.TaskBatch;
import org.apache.phoenix.hbase.index.parallel.TaskRunner;
import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.ServerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IndexRebuildRegionScanner extends BaseRegionScanner {
private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max";
private static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17;
    public static final String INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY = "index.verify.row.count.per.task";
private static final int DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK = 2048;
public static final String NO_EXPECTED_MUTATION = "No expected mutation";
    public static final String ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty";
public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|");
private long pageSizeInRows = Long.MAX_VALUE;
private int rowCountPerTask;
private boolean hasMore;
private final int maxBatchSize;
private UngroupedAggregateRegionObserver.MutationList mutations;
private final long maxBatchSizeBytes;
private final long blockingMemstoreSize;
private final byte[] clientVersionBytes;
private byte[] indexMetaData;
private boolean useProto = true;
private Scan scan;
private RegionScanner innerScanner;
private Region region;
private IndexMaintainer indexMaintainer;
private byte[] indexRowKey = null;
private Table indexHTable = null;
private Table outputHTable = null;
private Table resultHTable = null;
private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
private boolean verify = false;
private Map<byte[], List<Mutation>> indexKeyToMutationMap;
private Map<byte[], Pair<Put, Delete>> dataKeyToMutationMap;
private TaskRunner pool;
private TaskBatch<Boolean> tasks;
private String exceptionMessage;
private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
private RegionCoprocessorEnvironment env;
private HTableFactory hTableFactory;
private int indexTableTTL = 0;
private IndexToolVerificationResult verificationResult;
private boolean isBeforeRebuilt = true;
private boolean partialRebuild = false;
private int singleRowRebuildReturnCode;
private Map<byte[], NavigableSet<byte[]>> familyMap;
private byte[][] viewConstants;
@VisibleForTesting
public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
final RegionCoprocessorEnvironment env,
UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
super(innerScanner);
final Configuration config = env.getConfiguration();
if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
} else {
partialRebuild = true;
}
maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
mutations = new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
blockingMemstoreSize = UngroupedAggregateRegionObserver.getBlockingMemstoreSize(region, config);
clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
if (indexMetaData == null) {
useProto = false;
indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
}
List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
indexMaintainer = maintainers.get(0);
this.scan = scan;
familyMap = scan.getFamilyMap();
if (familyMap.isEmpty()) {
familyMap = null;
}
this.innerScanner = innerScanner;
this.region = region;
this.env = env;
this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
if (indexRowKey != null) {
setReturnCodeForSingleRowRebuild();
pageSizeInRows = 1;
}
byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
if (valueBytes != null) {
verificationResult = new IndexToolVerificationResult();
verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
if (verifyType != IndexTool.IndexVerifyType.NONE) {
verify = true;
viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
// Create the following objects only for rebuilds by IndexTool
hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES));
resultHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.RESULT_TABLE_NAME_BYTES));
indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
dataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
new ThreadPoolBuilder("IndexVerify",
env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout(
INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
rowCountPerTask = config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY,
DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
}
}
}
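    /**
     * Determines the return code for a single-row rebuild. Scans the data table for the targeted
     * row and records NO_DATA_ROW if the data row does not exist, INDEX_ROW_EXISTS if the index row
     * key built from the data row matches the requested index row key, and NO_INDEX_ROW otherwise.
     */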
private void setReturnCodeForSingleRowRebuild() throws IOException {
try (RegionScanner scanner = region.getScanner(scan)) {
List<Cell> row = new ArrayList<>();
scanner.next(row);
            // Check if the data table row we have just scanned matches the index row key.
            // If not, there is no need to build an index row from this data table row,
            // and we just return a zero row count.
if (row.isEmpty()) {
singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.NO_DATA_ROW.getValue();
} else {
Put put = new Put(CellUtil.cloneRow(row.get(0)));
for (Cell cell : row) {
put.add(cell);
}
if (checkIndexRow(indexRowKey, put)) {
singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
} else {
singleRowRebuildReturnCode = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
}
}
}
}
@Override
public HRegionInfo getRegionInfo() {
return region.getRegionInfo();
}
@Override
public boolean isFilterDone() {
return false;
}
private static byte[] generateResultTableRowKey(long ts, byte[] indexTableName, byte [] regionName,
byte[] startRow, byte[] stopRow) {
byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
int targetOffset = 0;
        // The row key for the result table : timestamp | index table name | data table region name |
// scan start row | scan stop row
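        // For example (hypothetical values): "1587000000000|MY_INDEX|REGION_NAME|startRow|stopRow"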
byte[] rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length +
ROW_KEY_SEPARATOR_BYTE.length + regionName.length + ROW_KEY_SEPARATOR_BYTE.length +
startRow.length + ROW_KEY_SEPARATOR_BYTE.length + stopRow.length];
Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length);
targetOffset += keyPrefix.length;
Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length);
targetOffset += indexTableName.length;
Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
Bytes.putBytes(rowKey, targetOffset, regionName, 0, regionName.length);
targetOffset += regionName.length;
Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
Bytes.putBytes(rowKey, targetOffset, startRow, 0, startRow.length);
targetOffset += startRow.length;
Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
Bytes.putBytes(rowKey, targetOffset, stopRow, 0, stopRow.length);
return rowKey;
}
private void logToIndexToolResultTable() throws IOException {
long scanMaxTs = scan.getTimeRange().getMax();
byte[] rowKey = generateResultTableRowKey(scanMaxTs, indexHTable.getName().toBytes(),
Bytes.toBytes(region.getRegionInfo().getRegionNameAsString()), scan.getStartRow(), scan.getStopRow());
Put put = new Put(rowKey);
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.scannedDataRowCount)));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.rebuiltIndexRowCount)));
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
verifyType == IndexTool.IndexVerifyType.ONLY) {
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.validIndexRowCount)));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.expiredIndexRowCount)));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.missingIndexRowCount)));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.invalidIndexRowCount)));
}
if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.validIndexRowCount)));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.expiredIndexRowCount)));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.missingIndexRowCount)));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.invalidIndexRowCount)));
}
resultHTable.put(put);
}
@Override
public void close() throws IOException {
innerScanner.close();
if (verify) {
try {
logToIndexToolResultTable();
} finally {
this.pool.stop("IndexRebuildRegionScanner is closing");
hTableFactory.shutdown();
indexHTable.close();
outputHTable.close();
resultHTable.close();
}
}
}
private void setMutationAttributes(Mutation m, byte[] uuidValue) {
m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
// Since we're replaying existing mutations, it makes no sense to write them to the wal
m.setDurability(Durability.SKIP_WAL);
}
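    /**
     * Commits the accumulated mutations once the batch reaches the configured row-count or
     * byte-size threshold. After a commit, a fresh server cache id is generated and returned so
     * that subsequent mutations are tagged with a new uuid; otherwise the current uuid is returned
     * unchanged.
     */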
private byte[] commitIfReady(byte[] uuidValue, UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
ungroupedAggregateRegionObserver.checkForRegionClosing();
ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutationList, blockingMemstoreSize);
uuidValue = ServerCacheClient.generateId();
mutationList.clear();
}
return uuidValue;
}
@VisibleForTesting
public int setIndexTableTTL(int ttl) {
indexTableTTL = ttl;
return 0;
}
@VisibleForTesting
public int setIndexMaintainer(IndexMaintainer indexMaintainer) {
this.indexMaintainer = indexMaintainer;
return 0;
}
@VisibleForTesting
public int setIndexKeyToMutationMap(Map<byte[], List<Mutation>> newTreeMap) {
this.indexKeyToMutationMap = newTreeMap;
return 0;
}
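    /**
     * A ValueGetter backed by a single Put. Returns the first (latest) cell of the referenced
     * column from the put, ignoring the requested timestamp. Used to build index row keys and
     * index updates from an in-memory data row state.
     */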
public static class SimpleValueGetter implements ValueGetter {
final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
final Put put;
public SimpleValueGetter(final Put put) {
this.put = put;
}
@Override
public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
if (cellList == null || cellList.isEmpty()) {
return null;
}
Cell cell = cellList.get(0);
valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
return valuePtr;
}
@Override
public byte[] getRowKey() {
return put.getRow();
}
}
public byte[] getIndexRowKey(final Put dataRow) throws IOException {
ValueGetter valueGetter = new SimpleValueGetter(dataRow);
byte[] builtIndexRowKey = indexMaintainer.buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()),
null, null, HConstants.LATEST_TIMESTAMP);
return builtIndexRowKey;
}
private boolean checkIndexRow(final byte[] indexRowKey, final Put put) throws IOException {
byte[] builtIndexRowKey = getIndexRowKey(put);
        return Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
                indexRowKey, 0, indexRowKey.length) == 0;
}
@VisibleForTesting
public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
String errorMsg) throws IOException {
logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
errorMsg, null, null);
}
private static byte[] generateOutputTableRowKey(long ts, byte[] indexTableName, byte[] dataRowKey ) {
byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
byte[] rowKey;
int targetOffset = 0;
// The row key for the output table : timestamp | index table name | data row key
rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length +
ROW_KEY_SEPARATOR_BYTE.length + dataRowKey.length];
Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length);
targetOffset += keyPrefix.length;
Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length);
targetOffset += indexTableName.length;
Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
Bytes.putBytes(rowKey, targetOffset, dataRowKey, 0, dataRowKey.length);
return rowKey;
}
@VisibleForTesting
public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
String errorMsg, byte[] expectedValue, byte[] actualValue) throws IOException {
final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
final int PREFIX_LENGTH = 3;
final int TOTAL_PREFIX_LENGTH = 6;
final byte[] PHASE_BEFORE_VALUE = Bytes.toBytes("BEFORE");
final byte[] PHASE_AFTER_VALUE = Bytes.toBytes("AFTER");
long scanMaxTs = scan.getTimeRange().getMax();
byte[] rowKey = generateOutputTableRowKey(scanMaxTs, indexHTable.getName().toBytes(), dataRowKey);
Put put = new Put(rowKey);
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_NAME_BYTES,
scanMaxTs, region.getRegionInfo().getTable().getName());
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_NAME_BYTES,
scanMaxTs, indexMaintainer.getIndexTableName());
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_TS_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(dataRowTs)));
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_ROW_KEY_BYTES,
scanMaxTs, indexRowKey);
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(indexRowTs)));
byte[] errorMessageBytes;
if (expectedValue != null) {
errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length +
TOTAL_PREFIX_LENGTH];
Bytes.putBytes(errorMessageBytes, 0, Bytes.toBytes(errorMsg), 0, errorMsg.length());
int length = errorMsg.length();
Bytes.putBytes(errorMessageBytes, length, E_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
length += PREFIX_LENGTH;
Bytes.putBytes(errorMessageBytes, length, expectedValue, 0, expectedValue.length);
length += expectedValue.length;
Bytes.putBytes(errorMessageBytes, length, A_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
length += PREFIX_LENGTH;
Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
} else {
errorMessageBytes = Bytes.toBytes(errorMsg);
}
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
if (isBeforeRebuilt) {
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_BEFORE_VALUE);
} else {
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_AFTER_VALUE);
}
outputHTable.put(put);
}
private static long getMaxTimestamp(Mutation m) {
long ts = 0;
for (List<Cell> cells : m.getFamilyCellMap().values()) {
if (cells == null) {
continue;
}
for (Cell cell : cells) {
if (ts < cell.getTimestamp()) {
ts = cell.getTimestamp();
}
}
}
return ts;
}
private static Cell getCell(Mutation m, byte[] family, byte[] qualifier) {
List<Cell> cellList = m.getFamilyCellMap().get(family);
if (cellList == null) {
return null;
}
for (Cell cell : cellList) {
if (CellUtil.matchingQualifier(cell, qualifier)) {
return cell;
}
}
return null;
}
private boolean isMatchingMutation(Mutation expected, Mutation actual, int iteration) throws IOException {
if (getTimestamp(expected) != getTimestamp(actual)) {
String errorMsg = "Not matching timestamp";
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
errorMsg, null, null);
return false;
}
int expectedCellCount = 0;
for (List<Cell> cells : expected.getFamilyCellMap().values()) {
if (cells == null) {
continue;
}
for (Cell expectedCell : cells) {
expectedCellCount++;
byte[] family = CellUtil.cloneFamily(expectedCell);
byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
Cell actualCell = getCell(actual, family, qualifier);
if (actualCell == null ||
!CellUtil.matchingType(expectedCell, actualCell)) {
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
String errorMsg = "Missing cell (in iteration " + iteration + ") " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg);
return false;
}
if (!CellUtil.matchingValue(actualCell, expectedCell)) {
String errorMsg = "Not matching value (in iteration " + iteration + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell));
return false;
}
}
}
int actualCellCount = 0;
for (List<Cell> cells : actual.getFamilyCellMap().values()) {
if (cells == null) {
continue;
}
actualCellCount += cells.size();
}
if (expectedCellCount != actualCellCount) {
String errorMsg = "Index has extra cells (in iteration " + iteration + ")";
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
errorMsg);
return false;
}
return true;
}
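    /**
     * Returns true if the empty column cell of the given index put carries the VERIFIED value.
     */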
private boolean isVerified(Put mutation) throws IOException {
List<Cell> cellList = mutation.get(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
indexMaintainer.getEmptyKeyValueQualifier());
Cell cell = (cellList != null && !cellList.isEmpty()) ? cellList.get(0) : null;
if (cell == null) {
return false;
}
        return Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
                VERIFIED_BYTES, 0, VERIFIED_BYTES.length) == 0;
}
/**
     * Reorders mutations in descending order of timestamp; at the same timestamp, a delete comes
     * before a put.
*/
public static final Comparator<Mutation> MUTATION_TS_DESC_COMPARATOR = new Comparator<Mutation>() {
@Override
public int compare(Mutation o1, Mutation o2) {
long ts1 = getTimestamp(o1);
long ts2 = getTimestamp(o2);
if (ts1 > ts2) {
return -1;
}
if (ts1 < ts2) {
return 1;
}
if (o1 instanceof Delete && o2 instanceof Put) {
return -1;
}
if (o1 instanceof Put && o2 instanceof Delete) {
return 1;
}
return 0;
}
};
private boolean isDeleteFamily(Mutation mutation) {
for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
for (Cell cell : cells) {
if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.DeleteFamily) {
return true;
}
}
}
return false;
}
private boolean isDeleteFamilyVersion(Mutation mutation) {
for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
for (Cell cell : cells) {
if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.DeleteFamilyVersion) {
return true;
}
}
}
return false;
}
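    /**
     * Converts the raw cells of an index table row into mutations: put cells are collected into a
     * single put and delete markers into a single delete, which are then split into per-timestamp
     * mutations by getMutationsWithSameTS().
     */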
@VisibleForTesting
public List<Mutation> prepareActualIndexMutations(Result indexRow) throws IOException {
Put put = null;
Delete del = null;
for (Cell cell : indexRow.rawCells()) {
if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
if (put == null) {
put = new Put(CellUtil.cloneRow(cell));
}
put.add(cell);
} else {
if (del == null) {
del = new Delete(CellUtil.cloneRow(cell));
}
del.addDeleteMarker(cell);
}
}
return getMutationsWithSameTS(put, del);
}
/**
* In this method, the actual list is repaired in memory using the expected list which is actually the output of
* rebuilding the index table row. The result of this repair is used only for verification.
*/
private void repairActualMutationList(List<Mutation> actualMutationList, List<Mutation> expectedMutationList)
throws IOException {
// Find the first (latest) actual unverified put mutation
List<Mutation> repairedMutationList = new ArrayList<>(expectedMutationList.size());
for (Mutation actual : actualMutationList) {
if (actual instanceof Put && !isVerified((Put) actual)) {
long ts = getTimestamp(actual);
int expectedIndex;
int expectedListSize = expectedMutationList.size();
for (expectedIndex = 0; expectedIndex < expectedListSize; expectedIndex++) {
if (getTimestamp(expectedMutationList.get(expectedIndex)) <= ts) {
if (expectedIndex > 0) {
expectedIndex--;
}
break;
}
}
if (expectedIndex == expectedListSize) {
continue;
}
for (; expectedIndex < expectedListSize; expectedIndex++) {
Mutation mutation = expectedMutationList.get(expectedIndex);
if (mutation instanceof Put) {
mutation = new Put((Put) mutation);
} else {
mutation = new Delete((Delete) mutation);
}
repairedMutationList.add(mutation);
}
                // Since we repair the entire history, there is no need to do this more than once
break;
}
}
if (repairedMutationList.isEmpty()) {
return;
}
actualMutationList.addAll(repairedMutationList);
Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR);
}
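    /**
     * Removes unverified puts and family-version delete markers from the actual mutation list
     * (both are artifacts of concurrent updates, write failures, or read repair), and collapses
     * consecutive mutations of the same type that share a timestamp into one.
     */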
private void cleanUpActualMutationList(List<Mutation> actualMutationList)
throws IOException {
Iterator<Mutation> iterator = actualMutationList.iterator();
Mutation previous = null;
while (iterator.hasNext()) {
Mutation mutation = iterator.next();
if ((mutation instanceof Put && !isVerified((Put) mutation)) ||
(mutation instanceof Delete && isDeleteFamilyVersion(mutation))) {
iterator.remove();
} else {
if (previous != null && getTimestamp(previous) == getTimestamp(mutation) &&
((previous instanceof Put && mutation instanceof Put) ||
previous instanceof Delete && mutation instanceof Delete)) {
iterator.remove();
} else {
previous = mutation;
}
}
}
}
/**
* There are two types of verification: without repair and with repair. Without-repair verification is done before
* or after index rebuild. It is done before index rebuild to identify the rows to be rebuilt. It is done after
* index rebuild to verify the rows that have been rebuilt. With-repair verification can be done anytime using
     * the "-v ONLY" option to check the consistency of the index table. Note that with-repair verification simulates
* read repair in-memory for the purpose of verification, but does not actually repair the data in the index.
*
* Unverified Rows
*
     * For each mutation on a mutable data table during regular data table updates, two operations are done on the
     * data table. One is to read the existing row state, and the second is to update the data table for this row.
     * The processing of concurrent data mutations is serialized once for reading the existing row states, and then
     * serialized again for updating the data table. In other words, they go through locking twice, i.e.,
     * [lock, read, unlock] and [lock, write, unlock]. Because of this two-phase locking, for a pair of concurrent
     * mutations (for the same row), the same row state can be read from the data table. This means the same existing
     * index row can be made unverified twice with different timestamps, one for each concurrent mutation. These
     * unverified mutations can be repaired from the data table later during HBase scans using the index read repair
     * process. This is one of the reasons for having extra unverified rows in the index table. The other reason is
     * data table write failures. When a data table write fails, it leaves an unverified index row behind. These rows
     * are never returned to clients; instead, they are repaired, which means either they are rebuilt from their data
     * table rows or deleted if their data table rows do not exist.
*
* Delete Family Version Markers
*
* The family version delete markers are generated by the read repair to remove extra unverified rows. They only
* show up in the actual mutation list since they are not generated for regular table updates or index rebuilds.
* For the verification purpose, these delete markers can be treated as extra unverified rows and can be safely
* skipped.
*
* Delete Family Markers
* Delete family markers are generated during read repair, regular table updates and index rebuilds to delete index
* table rows. The read repair generates them to delete extra unverified rows. During regular table updates or
* index rebuilds, the delete family markers are used to delete index rows due to data table row deletes or
* data table row overwrites.
*
* Verification Algorithm
*
* IndexTool verification generates an expected list of index mutations from the data table rows and uses this list
* to check if index table rows are consistent with the data table.
*
     * The expected list is generated using the index rebuild algorithm. This means that, for a given row, the list
     * can include a number of put and delete mutations such that the following hold:
*
* Every mutation will include a set of cells with the same timestamp
* Every mutation has a different timestamp
* A delete mutation will include only delete family cells and it is for deleting the entire row and its versions
* Every put mutation is verified
*
     * For both verification types, after the expected list of index mutations is constructed for a given data table
     * row, another list, called the actual list of index mutations, is constructed by reading the index table row
     * using an HBase raw scan in which all versions of the cells of the row are retrieved.
     *
     * As in the construction of the expected list, the cells are grouped into a put set and a delete set. The put
     * and delete sets for a given row are further grouped based on their timestamps into put and delete mutations
     * such that all the cells in a mutation have the same timestamp. The put and delete mutations are then sorted
     * within a single list. Mutations in this list are sorted in descending order of their timestamp. This list is
     * the actual list.
*
* For the without-repair verification, unverified mutations and family version delete markers are removed from
* the actual list and then the list is compared with the expected list.
*
* In case of the with-repair verification, the actual list is first repaired, then unverified mutations and family
* version delete markers are removed from the actual list and finally the list is compared with the expected list.
*
     * The actual list is repaired as follows: every unverified mutation is repaired using the method that read
     * repair uses. However, instead of going through the actual repair implementation, the expected mutations are
     * used for repair.
*/
@VisibleForTesting
public boolean verifySingleIndexRow(Result indexRow, IndexToolVerificationResult.PhaseResult verificationPhaseResult)
throws IOException {
List<Mutation> expectedMutationList = indexKeyToMutationMap.get(indexRow.getRow());
if (expectedMutationList == null) {
throw new DoNotRetryIOException(NO_EXPECTED_MUTATION);
}
List<Mutation> actualMutationList = prepareActualIndexMutations(indexRow);
if (actualMutationList == null || actualMutationList.isEmpty()) {
throw new DoNotRetryIOException(ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
}
Collections.sort(expectedMutationList, MUTATION_TS_DESC_COMPARATOR);
Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR);
if (verifyType == IndexTool.IndexVerifyType.ONLY) {
repairActualMutationList(actualMutationList, expectedMutationList);
}
cleanUpActualMutationList(actualMutationList);
long currentTime = EnvironmentEdgeManager.currentTime();
int actualIndex = 0;
int expectedIndex = 0;
int matchingCount = 0;
int expectedSize = expectedMutationList.size();
int actualSize = actualMutationList.size();
Mutation expected = null;
Mutation previousExpected;
Mutation actual;
        while (expectedIndex < expectedSize && actualIndex < actualSize) {
previousExpected = expected;
expected = expectedMutationList.get(expectedIndex);
// Check if cell expired as per the current server's time and data table ttl
// Index table should have the same ttl as the data table, hence we might not
// get a value back from index if it has already expired between our rebuild and
// verify
// TODO: have a metric to update for these cases
if (isTimestampBeforeTTL(currentTime, getTimestamp(expected))) {
verificationPhaseResult.expiredIndexRowCount++;
return true;
}
actual = actualMutationList.get(actualIndex);
if (expected instanceof Put) {
if (previousExpected instanceof Delete) {
// Between an expected delete and put, there can be one or more deletes due to
// concurrent mutations or data table write failures. Skip all of them if any
while (getTimestamp(actual) > getTimestamp(expected) && (actual instanceof Delete)) {
actualIndex++;
if (actualIndex == actualSize) {
break;
}
actual = actualMutationList.get(actualIndex);
}
if (actualIndex == actualSize) {
break;
}
}
if (actual instanceof Delete) {
break;
}
if (isMatchingMutation(expected, actual, expectedIndex)) {
expectedIndex++;
actualIndex++;
matchingCount++;
continue;
}
} else { // expected instanceof Delete
// Between put and delete, delete and delete, or before the first delete, there can be other deletes.
// Skip all of them if any
while (getTimestamp(actual) > getTimestamp(expected) && actual instanceof Delete) {
actualIndex++;
if (actualIndex == actualSize) {
break;
}
actual = actualMutationList.get(actualIndex);
}
if (actualIndex == actualSize) {
break;
}
if (getTimestamp(actual) == getTimestamp(expected) &&
(actual instanceof Delete && isDeleteFamily(actual))) {
expectedIndex++;
actualIndex++;
matchingCount++;
continue;
}
}
if (matchingCount > 0) {
break;
}
verificationPhaseResult.invalidIndexRowCount++;
return false;
}
        if (expectedIndex != expectedSize || actualIndex != actualSize) {
if (matchingCount > 0) {
if (verifyType != IndexTool.IndexVerifyType.ONLY) {
// We do not consider this as a verification issue but log it for further information.
// This may happen due to compaction
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
String errorMsg = "Expected to find " + expectedMutationList.size() + " mutations but got "
+ actualMutationList.size();
logToIndexToolOutputTable(dataKey, indexRow.getRow(),
getTimestamp(expectedMutationList.get(0)),
getTimestamp(actualMutationList.get(0)), errorMsg);
}
} else {
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
String errorMsg = "Not matching index row";
logToIndexToolOutputTable(dataKey, indexRow.getRow(),
getTimestamp(expectedMutationList.get(0)), 0L, errorMsg);
verificationPhaseResult.invalidIndexRowCount++;
return false;
}
}
verificationPhaseResult.validIndexRowCount++;
return true;
}
private static long getMaxTimestamp(Pair<Put, Delete> pair) {
Put put = pair.getFirst();
long ts1 = 0;
if (put != null) {
ts1 = getMaxTimestamp(put);
}
Delete del = pair.getSecond();
long ts2 = 0;
if (del != null) {
            ts2 = getMaxTimestamp(del);
}
return (ts1 > ts2) ? ts1 : ts2;
}
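    /**
     * Verifies a batch of index rows with a single skip-scan over the index table. The scan is raw
     * and retrieves all cell versions within the rebuild scan's time range. Rows that fail
     * verification are collected so the caller can rebuild them; expected rows that are not
     * returned by the scan are either counted as expired (TTL) or logged and counted as missing.
     */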
private void verifyIndexRows(List<KeyRange> keys,
IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
List<KeyRange> invalidKeys = new ArrayList<>();
ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
Scan indexScan = new Scan();
indexScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
scanRanges.initializeScan(indexScan);
SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
indexScan.setFilter(new SkipScanFilter(skipScanFilter, true));
indexScan.setRaw(true);
indexScan.setMaxVersions();
try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) {
for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
KeyRange keyRange = PVarbinary.INSTANCE.getKeyRange(result.getRow());
if (!keys.contains(keyRange)) {
continue;
}
if (!verifySingleIndexRow(result, verificationPhaseResult)) {
invalidKeys.add(keyRange);
}
keys.remove(keyRange);
}
} catch (Throwable t) {
ServerUtil.throwIOException(indexHTable.getName().toString(), t);
}
        // Check if any expected rows from the index (which we didn't get) have already expired due to TTL
        // TODO: metrics for expired rows
        if (!keys.isEmpty()) {
            Iterator<KeyRange> itr = keys.iterator();
            long currentTime = EnvironmentEdgeManager.currentTime();
            while (itr.hasNext()) {
KeyRange keyRange = itr.next();
byte[] key = keyRange.getLowerRange();
List<Mutation> mutationList = indexKeyToMutationMap.get(key);
if (isTimestampBeforeTTL(currentTime, getTimestamp(mutationList.get(mutationList.size() - 1)))) {
itr.remove();
verificationPhaseResult.expiredIndexRowCount++;
}
}
}
        if (!keys.isEmpty()) {
for (KeyRange keyRange : keys) {
String errorMsg = "Missing index row";
byte[] key = keyRange.getLowerRange();
List<Mutation> mutationList = indexKeyToMutationMap.get(key);
if (mutationList.get(mutationList.size() - 1) instanceof Delete) {
continue;
}
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants);
logToIndexToolOutputTable(dataKey,
keyRange.getLowerRange(),
getMaxTimestamp(dataKeyToMutationMap.get(dataKey)),
getTimestamp(mutationList.get(mutationList.size() - 1)), errorMsg);
verificationPhaseResult.missingIndexRowCount++;
}
}
keys.addAll(invalidKeys);
}
private boolean isTimestampBeforeTTL(long currentTime, long tsToCheck) {
if (indexTableTTL == HConstants.FOREVER) {
return false;
}
return tsToCheck < (currentTime - (long) indexTableTTL * 1000);
}
private void addVerifyTask(final List<KeyRange> keys,
final IndexToolVerificationResult.PhaseResult verificationPhaseResult) {
tasks.add(new Task<Boolean>() {
@Override
public Boolean call() throws Exception {
                if (Thread.currentThread().isInterrupted()) {
                    exceptionMessage = "Pool closed, not attempting to verify index rows! " + indexHTable.getName();
                    throw new IOException(exceptionMessage);
                }
                verifyIndexRows(keys, verificationPhaseResult);
return Boolean.TRUE;
}
});
}
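    /**
     * Partitions the index row keys into batches of rowCountPerTask and verifies the batches in
     * parallel on the "IndexVerify" thread pool, merging the per-task phase results into the given
     * result. For the BEFORE and BOTH verify types, dataKeyToMutationMap is then shrunk to only
     * the rows that failed verification, since those are the rows to be rebuilt.
     */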
private void parallelizeIndexVerify(IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
int taskCount = (indexKeyToMutationMap.size() + rowCountPerTask - 1) / rowCountPerTask;
tasks = new TaskBatch<>(taskCount);
List<List<KeyRange>> listOfKeyRangeList = new ArrayList<>(taskCount);
List<IndexToolVerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount);
List<KeyRange> keys = new ArrayList<>(rowCountPerTask);
listOfKeyRangeList.add(keys);
IndexToolVerificationResult.PhaseResult perTaskVerificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
verificationPhaseResultList.add(perTaskVerificationPhaseResult);
for (byte[] indexKey: indexKeyToMutationMap.keySet()) {
keys.add(PVarbinary.INSTANCE.getKeyRange(indexKey));
if (keys.size() == rowCountPerTask) {
addVerifyTask(keys, perTaskVerificationPhaseResult);
keys = new ArrayList<>(rowCountPerTask);
listOfKeyRangeList.add(keys);
perTaskVerificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
verificationPhaseResultList.add(perTaskVerificationPhaseResult);
}
}
if (keys.size() > 0) {
addVerifyTask(keys, perTaskVerificationPhaseResult);
}
List<Boolean> taskResultList = null;
try {
LOGGER.debug("Waiting on index verify tasks to complete...");
taskResultList = this.pool.submitUninterruptible(tasks);
} catch (ExecutionException e) {
throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
} catch (EarlyExitFailure e) {
throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
}
for (Boolean result : taskResultList) {
if (result == null) {
// there was a failure
throw new IOException(exceptionMessage);
}
}
for (IndexToolVerificationResult.PhaseResult result : verificationPhaseResultList) {
verificationPhaseResult.add(result);
}
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
Map<byte[], Pair<Put, Delete>> newDataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
for (List<KeyRange> keyRangeList : listOfKeyRangeList) {
for (KeyRange keyRange : keyRangeList) {
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants);
newDataKeyToMutationMap.put(dataKey, dataKeyToMutationMap.get(dataKey));
}
}
dataKeyToMutationMap.clear();
dataKeyToMutationMap = newDataKeyToMutationMap;
}
}
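    /**
     * Replays the given data table mutations with index rebuild attributes so that the index rows
     * are regenerated on the write path. Mutations are committed in batches; a commit is attempted
     * between back-to-back puts and after each delete, subject to the batch size thresholds.
     */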
private void rebuildIndexRows(UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
byte[] uuidValue = ServerCacheClient.generateId();
UngroupedAggregateRegionObserver.MutationList currentMutationList =
new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
Put put = null;
for (Mutation mutation : mutationList) {
if (mutation instanceof Put) {
if (put != null) {
                // Back-to-back put, i.e., no delete in between; we can commit the previous put
uuidValue = commitIfReady(uuidValue, currentMutationList);
}
currentMutationList.add(mutation);
setMutationAttributes(mutation, uuidValue);
put = (Put)mutation;
} else {
currentMutationList.add(mutation);
setMutationAttributes(mutation, uuidValue);
uuidValue = commitIfReady(uuidValue, currentMutationList);
put = null;
}
}
if (!currentMutationList.isEmpty()) {
ungroupedAggregateRegionObserver.checkForRegionClosing();
ungroupedAggregateRegionObserver.commitBatchWithRetries(region, currentMutationList, blockingMemstoreSize);
}
}
private void verifyAndOrRebuildIndex() throws IOException {
IndexToolVerificationResult nextVerificationResult = new IndexToolVerificationResult();
nextVerificationResult.scannedDataRowCount = dataKeyToMutationMap.size();
if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) {
// For these options we start with rebuilding index rows
rebuildIndexRows(mutations);
nextVerificationResult.rebuiltIndexRowCount = dataKeyToMutationMap.size();
isBeforeRebuilt = false;
}
if (verifyType == IndexTool.IndexVerifyType.NONE) {
return;
}
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
verifyType == IndexTool.IndexVerifyType.ONLY) {
IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
// For these options we start with verifying index rows
parallelizeIndexVerify(verificationPhaseResult);
nextVerificationResult.before.add(verificationPhaseResult);
}
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
// For these options, we have identified the rows to be rebuilt and now need to rebuild them
            // At this point, dataKeyToMutationMap includes mapping only for the rows to be rebuilt
mutations.clear();
for (Map.Entry<byte[], Pair<Put, Delete>> entry: dataKeyToMutationMap.entrySet()) {
if (entry.getValue().getFirst() != null) {
mutations.add(entry.getValue().getFirst());
}
if (entry.getValue().getSecond() != null) {
mutations.add(entry.getValue().getSecond());
}
}
rebuildIndexRows(mutations);
nextVerificationResult.rebuiltIndexRowCount += dataKeyToMutationMap.size();
isBeforeRebuilt = false;
}
if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
            // We have rebuilt index rows and now we need to verify them
IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
indexKeyToMutationMap.clear();
for (Map.Entry<byte[], Pair<Put, Delete>> entry: dataKeyToMutationMap.entrySet()) {
prepareIndexMutations(entry.getValue().getFirst(), entry.getValue().getSecond());
}
parallelizeIndexVerify(verificationPhaseResult);
nextVerificationResult.after.add(verificationPhaseResult);
}
verificationResult.add(nextVerificationResult);
}
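    /**
     * Returns true if the cell's column is selected by the scan's family map, that is, either the
     * whole family is requested (null or empty qualifier set) or the qualifier is explicitly
     * listed.
     */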
private boolean isColumnIncluded(Cell cell) {
byte[] family = CellUtil.cloneFamily(cell);
if (!familyMap.containsKey(family)) {
return false;
}
NavigableSet<byte[]> set = familyMap.get(family);
if (set == null || set.isEmpty()) {
return true;
}
byte[] qualifier = CellUtil.cloneQualifier(cell);
return set.contains(qualifier);
}
public static long getTimestamp(Mutation m) {
for (List<Cell> cells : m.getFamilyCellMap().values()) {
for (Cell cell : cells) {
return cell.getTimestamp();
}
}
throw new IllegalStateException("No cell found");
}
/**
     * Reorders mutations in ascending order of timestamp; at the same timestamp, a put comes
     * before a delete.
*/
public static final Comparator<Mutation> MUTATION_TS_COMPARATOR = new Comparator<Mutation>() {
@Override
public int compare(Mutation o1, Mutation o2) {
long ts1 = getTimestamp(o1);
long ts2 = getTimestamp(o2);
if (ts1 < ts2) {
return -1;
}
if (ts1 > ts2) {
return 1;
}
if (o1 instanceof Put && o2 instanceof Delete) {
return -1;
}
if (o1 instanceof Delete && o2 instanceof Put) {
return 1;
}
return 0;
}
};
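    /**
     * Flattens a data row's put and delete into a list of mutations in which each mutation
     * contains only cells with a single timestamp, ordered by ascending timestamp with a put
     * before a delete at the same timestamp.
     */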
public static List<Mutation> getMutationsWithSameTS(Put put, Delete del) {
List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(2);
if (put != null) {
mutationList.add(put);
}
if (del != null) {
mutationList.add(del);
}
// Group the cells within a mutation based on their timestamps and create a separate mutation for each group
mutationList = (List<Mutation>) IndexManagementUtil.flattenMutationsByTimestamp(mutationList);
        // Reorder the mutations on the same row so that put comes before delete when they have the same timestamp
Collections.sort(mutationList, MUTATION_TS_COMPARATOR);
return mutationList;
}
    private static Put prepareIndexPutForRebuild(IndexMaintainer indexMaintainer, ImmutableBytesPtr rowKeyPtr,
ValueGetter mergedRowVG, long ts)
throws IOException {
Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
mergedRowVG, rowKeyPtr, ts, null, null);
if (indexPut == null) {
// No covered column. Just prepare an index row with the empty column
byte[] indexRowKey = indexMaintainer.buildRowKey(mergedRowVG, rowKeyPtr,
null, null, HConstants.LATEST_TIMESTAMP);
indexPut = new Put(indexRowKey);
} else {
removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
indexMaintainer.getEmptyKeyValueQualifier());
}
indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
indexMaintainer.getEmptyKeyValueQualifier(), ts, VERIFIED_BYTES);
return indexPut;
}
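    /**
     * Removes from the put the cell matching the delete marker's family and qualifier, dropping
     * the family entry entirely if it becomes empty.
     */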
public static void removeColumn(Put put, Cell deleteCell) {
byte[] family = CellUtil.cloneFamily(deleteCell);
List<Cell> cellList = put.getFamilyCellMap().get(family);
if (cellList == null) {
return;
}
Iterator<Cell> cellIterator = cellList.iterator();
while (cellIterator.hasNext()) {
Cell cell = cellIterator.next();
if (CellUtil.matchingQualifier(cell, deleteCell)) {
cellIterator.remove();
if (cellList.isEmpty()) {
put.getFamilyCellMap().remove(family);
}
return;
}
}
}
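    /**
     * Merges the source put into the destination put, copying only the cells of columns that the
     * destination does not already have. This lets a newer mutation take precedence while the
     * older row state fills in the remaining columns.
     */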
public static void apply(Put destination, Put source) throws IOException {
for (List<Cell> cells : source.getFamilyCellMap().values()) {
for (Cell cell : cells) {
if (!destination.has(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell))) {
destination.add(cell);
}
}
}
}
public static Put applyNew(Put destination, Put source) throws IOException {
Put next = new Put(destination);
apply(next, source);
return next;
}
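    /**
     * Applies a data table delete to an in-memory put: a family delete marker removes the whole
     * family and a column delete marker removes the matching cell. Version deletes are not
     * expected in data table mutations and cause a DoNotRetryIOException.
     */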
private static void applyDeleteOnPut(Delete del, Put put) throws IOException {
for (List<Cell> cells : del.getFamilyCellMap().values()) {
for (Cell cell : cells) {
switch ((KeyValue.Type.codeToType(cell.getTypeByte()))) {
case DeleteFamily:
put.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
break;
case DeleteColumn:
removeColumn(put, cell);
break;
default:
// We do not expect this can happen
throw new DoNotRetryIOException("Single version delete marker in data mutation " +
del);
}
}
}
}
/**
     * Generates the index updates for a data row from the mutations obtained by merging the previous data row
     * state with the pending row mutations for index rebuild. This method is called only for global indexes.
     * The data mutations form a sorted list that is used to replay index table mutations. This list is sorted in
     * ascending order by the tuple of timestamp and mutation type, where a delete comes after a put at the same
     * timestamp.
*/
public static List<Mutation> prepareIndexMutationsForRebuild(IndexMaintainer indexMaintainer,
Put dataPut, Delete dataDel) throws IOException {
List<Mutation> dataMutations = getMutationsWithSameTS(dataPut, dataDel);
List<Mutation> indexMutations = Lists.newArrayListWithExpectedSize(dataMutations.size());
// The row key ptr of the data table row for which we will build index rows here
ImmutableBytesPtr rowKeyPtr = (dataPut != null) ? new ImmutableBytesPtr(dataPut.getRow()) :
new ImmutableBytesPtr(dataDel.getRow());
        // Start with an empty data table row
Put currentDataRowState = null;
// The index row key corresponding to the current data row
byte[] indexRowKeyForCurrentDataRow = null;
int dataMutationListSize = dataMutations.size();
for (int i = 0; i < dataMutationListSize; i++) {
Mutation mutation = dataMutations.get(i);
long ts = getTimestamp(mutation);
if (mutation instanceof Put) {
if (i < dataMutationListSize - 1) {
// If there is a pair of a put and delete mutation with the same timestamp then apply the delete
// mutation on the put. If the delete mutation deletes all the cells in the put mutation, the family
// cell map of the put mutation becomes empty and the mutation is ignored later
Mutation nextMutation = dataMutations.get(i + 1);
if (getTimestamp(nextMutation) == ts && nextMutation instanceof Delete) {
applyDeleteOnPut((Delete) nextMutation, (Put) mutation);
// Apply the delete mutation on the current data row state too
if (currentDataRowState != null) {
applyDeleteOnPut((Delete) nextMutation, currentDataRowState);
if (currentDataRowState.getFamilyCellMap().size() == 0) {
currentDataRowState = null;
indexRowKeyForCurrentDataRow = null;
}
}
// This increment is to skip the next (delete) mutation as we have already processed it
i++;
}
}
if (mutation.getFamilyCellMap().size() != 0) {
// Add this put on top of the current data row state to get the next data row state
Put nextDataRow = (currentDataRowState == null) ? new Put((Put)mutation) : applyNew((Put)mutation, currentDataRowState);
ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRow);
                Put indexPut = prepareIndexPutForRebuild(indexMaintainer, rowKeyPtr, nextDataRowVG, ts);
indexMutations.add(indexPut);
// Delete the current index row if the new index key is different than the current one
if (currentDataRowState != null) {
if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
indexMutations.add(del);
}
}
// For the next iteration of the for loop
currentDataRowState = nextDataRow;
indexRowKeyForCurrentDataRow = indexPut.getRow();
} else {
if (currentDataRowState != null) {
Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
indexMutations.add(del);
// For the next iteration of the for loop
currentDataRowState = null;
indexRowKeyForCurrentDataRow = null;
}
}
} else { // mutation instanceof Delete
if (currentDataRowState != null) {
// We apply delete mutations only on the current data row state to obtain the next data row state.
                // For the index table, we are only interested in whether the index row should be deleted or not.
                // There is no need to apply column deletes to index rows since index rows are always full rows
                // and all the cells in an index row have the same timestamp value. Because of this, index row
                // versions do not share cells.
applyDeleteOnPut((Delete) mutation, currentDataRowState);
Put nextDataRowState = currentDataRowState;
if (nextDataRowState.getFamilyCellMap().size() == 0) {
Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
indexMutations.add(del);
currentDataRowState = null;
indexRowKeyForCurrentDataRow = null;
} else {
ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRowState);
                    Put indexPut = prepareIndexPutForRebuild(indexMaintainer, rowKeyPtr, nextDataRowVG, ts);
indexMutations.add(indexPut);
// Delete the current index row if the new index key is different than the current one
if (indexRowKeyForCurrentDataRow != null) {
if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
indexMutations.add(del);
}
}
indexRowKeyForCurrentDataRow = indexPut.getRow();
}
}
}
}
return indexMutations;
}
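    /**
     * Builds the expected index mutations for a data row and groups them by index row key in
     * indexKeyToMutationMap.
     */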
@VisibleForTesting
public int prepareIndexMutations(Put put, Delete del) throws IOException {
List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del);
for (Mutation mutation : indexMutations) {
byte[] indexRowKey = mutation.getRow();
List<Mutation> mutationList = indexKeyToMutationMap.get(indexRowKey);
if (mutationList == null) {
mutationList = new ArrayList<>();
mutationList.add(mutation);
indexKeyToMutationMap.put(indexRowKey, mutationList);
} else {
mutationList.add(mutation);
}
}
return 0;
}
@Override
public boolean next(List<Cell> results) throws IOException {
if (indexRowKey != null &&
singleRowRebuildReturnCode == GlobalIndexChecker.RebuildReturnCode.NO_DATA_ROW.getValue()) {
byte[] rowCountBytes =
PLong.INSTANCE.toBytes(Long.valueOf(singleRowRebuildReturnCode));
final Cell aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
results.add(aggKeyValue);
return false;
}
Cell lastCell = null;
int rowCount = 0;
region.startRegionOperation();
try {
byte[] uuidValue = ServerCacheClient.generateId();
synchronized (innerScanner) {
do {
List<Cell> row = new ArrayList<Cell>();
hasMore = innerScanner.nextRaw(row);
if (!row.isEmpty()) {
lastCell = row.get(0); // lastCell is any cell from the last visited row
Put put = null;
Delete del = null;
for (Cell cell : row) {
if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
if (!partialRebuild && familyMap != null && !isColumnIncluded(cell)) {
continue;
}
if (put == null) {
put = new Put(CellUtil.cloneRow(cell));
}
put.add(cell);
} else {
if (del == null) {
del = new Delete(CellUtil.cloneRow(cell));
}
del.addDeleteMarker(cell);
}
}
if (put == null && del == null) {
continue;
}
// Always add the put first and then delete for a given row. This simplifies the logic in
// IndexRegionObserver
if (put != null) {
mutations.add(put);
}
if (del != null) {
mutations.add(del);
}
if (!verify) {
if (put != null) {
setMutationAttributes(put, uuidValue);
}
if (del != null) {
setMutationAttributes(del, uuidValue);
}
uuidValue = commitIfReady(uuidValue, mutations);
} else {
byte[] dataKey = (put != null) ? put.getRow() : del.getRow();
prepareIndexMutations(put, del);
dataKeyToMutationMap.put(dataKey, new Pair<Put, Delete>(put, del));
}
rowCount++;
}
} while (hasMore && rowCount < pageSizeInRows);
if (!mutations.isEmpty()) {
if (verify) {
verifyAndOrRebuildIndex();
} else {
ungroupedAggregateRegionObserver.checkForRegionClosing();
ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
}
}
}
} catch (IOException e) {
LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
throw e;
} finally {
region.closeRegionOperation();
mutations.clear();
if (verify) {
dataKeyToMutationMap.clear();
indexKeyToMutationMap.clear();
}
}
if (indexRowKey != null) {
rowCount = singleRowRebuildReturnCode;
}
byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
final Cell aggKeyValue;
if (lastCell == null) {
aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
} else {
aggKeyValue = KeyValueUtil.newKeyValue(CellUtil.cloneRow(lastCell), SINGLE_COLUMN_FAMILY,
SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
}
results.add(aggKeyValue);
return hasMore;
}
@Override
public long getMaxResultSize() {
return scan.getMaxResultSize();
}
}