blob: 83c479a8299aeb3c47895976f3ad2170db8d11ca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.coprocessor;
import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.REBUILT_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.RESULT_TABLE_COLUMN_FAMILY;
import static org.apache.phoenix.mapreduce.index.IndexTool.SCANNED_DATA_ROW_COUNT_BYTES;
import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
import org.apache.phoenix.hbase.index.parallel.Task;
import org.apache.phoenix.hbase.index.parallel.TaskBatch;
import org.apache.phoenix.hbase.index.parallel.TaskRunner;
import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.ServerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
public class IndexRebuildRegionScanner extends BaseRegionScanner {
public static class VerificationResult {
public static class PhaseResult {
private long validIndexRowCount = 0;
private long expiredIndexRowCount = 0;
private long missingIndexRowCount = 0;
private long invalidIndexRowCount = 0;
public void add(PhaseResult phaseResult) {
validIndexRowCount += phaseResult.validIndexRowCount;
expiredIndexRowCount += phaseResult.expiredIndexRowCount;
missingIndexRowCount += phaseResult.missingIndexRowCount;
invalidIndexRowCount += phaseResult.invalidIndexRowCount;
}
public long getTotalCount() {
return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount;
}
@Override
public String toString() {
return "PhaseResult{" +
"validIndexRowCount=" + validIndexRowCount +
", expiredIndexRowCount=" + expiredIndexRowCount +
", missingIndexRowCount=" + missingIndexRowCount +
", invalidIndexRowCount=" + invalidIndexRowCount +
'}';
}
}
private long scannedDataRowCount = 0;
private long rebuiltIndexRowCount = 0;
private PhaseResult before = new PhaseResult();
private PhaseResult after = new PhaseResult();
@Override
public String toString() {
return "VerificationResult{" +
"scannedDataRowCount=" + scannedDataRowCount +
", rebuiltIndexRowCount=" + rebuiltIndexRowCount +
", before=" + before +
", after=" + after +
'}';
}
public long getScannedDataRowCount() {
return scannedDataRowCount;
}
public long getRebuiltIndexRowCount() {
return rebuiltIndexRowCount;
}
public long getBeforeRebuildValidIndexRowCount() {
return before.validIndexRowCount;
}
public long getBeforeRebuildExpiredIndexRowCount() {
return before.expiredIndexRowCount;
}
public long getBeforeRebuildInvalidIndexRowCount() {
return before.invalidIndexRowCount;
}
public long getBeforeRebuildMissingIndexRowCount() {
return before.missingIndexRowCount;
}
public long getAfterRebuildValidIndexRowCount() {
return after.validIndexRowCount;
}
public long getAfterRebuildExpiredIndexRowCount() {
return after.expiredIndexRowCount;
}
public long getAfterRebuildInvalidIndexRowCount() {
return after.invalidIndexRowCount;
}
public long getAfterRebuildMissingIndexRowCount() {
return after.missingIndexRowCount;
}
private void addScannedDataRowCount(long count) {
this.scannedDataRowCount += count;
}
private void addRebuiltIndexRowCount(long count) {
this.rebuiltIndexRowCount += count;
}
private void addBeforeRebuildValidIndexRowCount(long count) {
before.validIndexRowCount += count;
}
private void addBeforeRebuildExpiredIndexRowCount(long count) {
before.expiredIndexRowCount += count;
}
private void addBeforeRebuildMissingIndexRowCount(long count) {
before.missingIndexRowCount += count;
}
private void addBeforeRebuildInvalidIndexRowCount(long count) {
before.invalidIndexRowCount += count;
}
private void addAfterRebuildValidIndexRowCount(long count) {
after.validIndexRowCount += count;
}
private void addAfterRebuildExpiredIndexRowCount(long count) {
after.expiredIndexRowCount += count;
}
private void addAfterRebuildMissingIndexRowCount(long count) {
after.missingIndexRowCount += count;
}
private void addAfterRebuildInvalidIndexRowCount(long count) {
after.invalidIndexRowCount += count;
}
private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) {
if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, 0,
AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES.length) == 0) {
return true;
}
return false;
}
private long getValue(Cell cell) {
return Long.parseLong(Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength()));
}
private void update(Cell cell) {
if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES)) {
addScannedDataRowCount(getValue(cell));
} else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES)) {
addRebuiltIndexRowCount(getValue(cell));
} else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
addBeforeRebuildValidIndexRowCount(getValue(cell));
} else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
addBeforeRebuildExpiredIndexRowCount(getValue(cell));
} else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
addBeforeRebuildMissingIndexRowCount(getValue(cell));
} else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
addBeforeRebuildInvalidIndexRowCount(getValue(cell));
} else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
addAfterRebuildValidIndexRowCount(getValue(cell));
} else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
addAfterRebuildExpiredIndexRowCount(getValue(cell));
} else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
addAfterRebuildMissingIndexRowCount(getValue(cell));
} else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
addAfterRebuildInvalidIndexRowCount(getValue(cell));
}
}
public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
// Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
// Search for the place where the trailing 0xFFs start
int offset = rowKeyPrefix.length;
while (offset > 0) {
if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
break;
}
offset--;
}
if (offset == 0) {
// We got an 0xFFFF... (only FFs) stopRow value which is
// the last possible prefix before the end of the table.
// So set it to stop at the 'end of the table'
return HConstants.EMPTY_END_ROW;
}
// Copy the right length of the original
byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
// And increment the last one
newStopRow[newStopRow.length - 1]++;
return newStopRow;
}
public static VerificationResult getVerificationResult(Table hTable, long ts)
throws IOException {
VerificationResult verificationResult = new VerificationResult();
byte[] startRowKey = Bytes.toBytes(Long.toString(ts));
byte[] stopRowKey = calculateTheClosestNextRowKeyForPrefix(startRowKey);
Scan scan = new Scan();
scan.setStartRow(startRowKey);
scan.setStopRow(stopRowKey);
ResultScanner scanner = hTable.getScanner(scan);
for (Result result = scanner.next(); result != null; result = scanner.next()) {
for (Cell cell : result.rawCells()) {
verificationResult.update(cell);
}
}
return verificationResult;
}
public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) {
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) {
return false;
}
if (verifyType == IndexTool.IndexVerifyType.ONLY) {
if (before.validIndexRowCount + before.expiredIndexRowCount != scannedDataRowCount) {
return true;
}
}
if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) {
return true;
}
if (before.validIndexRowCount + before.expiredIndexRowCount +
after.expiredIndexRowCount + after.validIndexRowCount != scannedDataRowCount) {
return true;
}
}
return false;
}
public void add(VerificationResult verificationResult) {
scannedDataRowCount += verificationResult.scannedDataRowCount;
rebuiltIndexRowCount += verificationResult.rebuiltIndexRowCount;
before.add(verificationResult.before);
after.add(verificationResult.after);
}
}
private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max";
private static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17;
public static final String INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY = "index.verify.threads.max";
private static final int DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK = 2048;
private long pageSizeInRows = Long.MAX_VALUE;
private int rowCountPerTask;
private boolean hasMore;
private final int maxBatchSize;
private UngroupedAggregateRegionObserver.MutationList mutations;
private final long maxBatchSizeBytes;
private final long blockingMemstoreSize;
private final byte[] clientVersionBytes;
private byte[] indexMetaData;
private boolean useProto = true;
private Scan scan;
private RegionScanner innerScanner;
private Region region;
private IndexMaintainer indexMaintainer;
private byte[] indexRowKey = null;
private Table indexHTable = null;
private Table outputHTable = null;
private Table resultHTable = null;
private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
private boolean verify = false;
private Map<byte[], Put> indexKeyToDataPutMap;
private Map<byte[], Put> dataKeyToDataPutMap;
private TaskRunner pool;
private TaskBatch<Boolean> tasks;
private String exceptionMessage;
private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
private RegionCoprocessorEnvironment env;
private int indexTableTTL;
private VerificationResult verificationResult;
private boolean isBeforeRebuilt = true;
IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
final RegionCoprocessorEnvironment env,
UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
super(innerScanner);
final Configuration config = env.getConfiguration();
if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
}
maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
mutations = new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
blockingMemstoreSize = UngroupedAggregateRegionObserver.getBlockingMemstoreSize(region, config);
clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
if (indexMetaData == null) {
useProto = false;
indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
}
if (!scan.isRaw()) {
// No need to deserialize index maintainers when the scan is raw. Raw scan is used by partial rebuilds
List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
indexMaintainer = maintainers.get(0);
}
this.scan = scan;
this.innerScanner = innerScanner;
this.region = region;
this.env = env;
this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
if (valueBytes != null) {
verificationResult = new VerificationResult();
verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
if (verifyType != IndexTool.IndexVerifyType.NONE) {
verify = true;
// Create the following objects only for rebuilds by IndexTool
indexHTable = ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION,
env).getTable(TableName.valueOf(indexMaintainer.getIndexTableName()));
indexTableTTL = indexHTable.getDescriptor().getColumnFamilies()[0].getTimeToLive();
outputHTable = ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION,
env).getTable(TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES));
resultHTable = ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION,
env).getTable(TableName.valueOf(IndexTool.RESULT_TABLE_NAME_BYTES));
indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
dataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
new ThreadPoolBuilder("IndexVerify",
env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout(
INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
rowCountPerTask = config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY,
DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
}
}
}
@Override
public RegionInfo getRegionInfo() {
return region.getRegionInfo();
}
@Override
public boolean isFilterDone() { return false; }
private void logToIndexToolResultTable() throws IOException {
long scanMaxTs = scan.getTimeRange().getMax();
byte[] keyPrefix = Bytes.toBytes(Long.toString(scanMaxTs));
byte[] regionName = Bytes.toBytes(region.getRegionInfo().getRegionNameAsString());
// The row key for the result table is the max timestamp of the scan + the table region name + scan start row
// + scan stop row
byte[] rowKey = new byte[keyPrefix.length + regionName.length + scan.getStartRow().length +
scan.getStopRow().length];
Bytes.putBytes(rowKey, 0, keyPrefix, 0, keyPrefix.length);
Bytes.putBytes(rowKey, keyPrefix.length, regionName, 0, regionName.length);
Bytes.putBytes(rowKey, keyPrefix.length + regionName.length, scan.getStartRow(), 0,
scan.getStartRow().length);
Bytes.putBytes(rowKey, keyPrefix.length + regionName.length + scan.getStartRow().length,
scan.getStopRow(), 0, scan.getStopRow().length);
Put put = new Put(rowKey);
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.scannedDataRowCount)));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.rebuiltIndexRowCount)));
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
verifyType == IndexTool.IndexVerifyType.ONLY) {
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.validIndexRowCount)));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.expiredIndexRowCount)));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.missingIndexRowCount)));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.invalidIndexRowCount)));
}
if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.validIndexRowCount)));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.expiredIndexRowCount)));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.missingIndexRowCount)));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.invalidIndexRowCount)));
}
resultHTable.put(put);
}
@Override
public void close() throws IOException {
innerScanner.close();
if (verify) {
try {
logToIndexToolResultTable();
} finally {
this.pool.stop("IndexRebuildRegionScanner is closing");
indexHTable.close();
outputHTable.close();
resultHTable.close();
}
}
}
private void setMutationAttributes(Mutation m, byte[] uuidValue) {
m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
// Since we're replaying existing mutations, it makes no sense to write them to the wal
m.setDurability(Durability.SKIP_WAL);
}
private Delete generateDeleteMarkers(Put put) {
Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
int cellCount = put.size();
if (cellCount == allColumns.size() + 1) {
// We have all the columns for the index table. So, no delete marker is needed
return null;
}
Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(cellCount);
long ts = 0;
for (List<Cell> cells : put.getFamilyCellMap().values()) {
if (cells == null) {
break;
}
for (Cell cell : cells) {
includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)));
if (ts < cell.getTimestamp()) {
ts = cell.getTimestamp();
}
}
}
Delete del = null;
for (ColumnReference column : allColumns) {
if (!includedColumns.contains(column)) {
if (del == null) {
del = new Delete(put.getRow());
}
del.addColumns(column.getFamily(), column.getQualifier(), ts);
}
}
return del;
}
private byte[] commitIfReady(byte[] uuidValue, UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutationList, blockingMemstoreSize);
uuidValue = ServerCacheClient.generateId();
mutationList.clear();
}
return uuidValue;
}
private class SimpleValueGetter implements ValueGetter {
final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
final Put put;
SimpleValueGetter (final Put put) {
this.put = put;
}
@Override
public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
if (cellList == null || cellList.isEmpty()) {
return null;
}
Cell cell = cellList.get(0);
valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
return valuePtr;
}
@Override
public byte[] getRowKey() {
return put.getRow();
}
}
private byte[] getIndexRowKey(final Put dataRow) throws IOException {
ValueGetter valueGetter = new SimpleValueGetter(dataRow);
byte[] builtIndexRowKey = indexMaintainer.buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()),
null, null, HConstants.LATEST_TIMESTAMP);
return builtIndexRowKey;
}
private boolean checkIndexRow(final byte[] indexRowKey, final Put put) throws IOException {
byte[] builtIndexRowKey = getIndexRowKey(put);
if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
indexRowKey, 0, indexRowKey.length) != 0) {
return false;
}
return true;
}
private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
String errorMsg) throws IOException {
logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
errorMsg, null, null);
}
private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
String errorMsg, byte[] expectedValue, byte[] actualValue) throws IOException {
final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
final int PREFIX_LENGTH = 3;
final int TOTAL_PREFIX_LENGTH = 6;
final byte[] PHASE_BEFORE_VALUE = Bytes.toBytes("BEFORE");
final byte[] PHASE_AFTER_VALUE = Bytes.toBytes("AFTER");
long scanMaxTs = scan.getTimeRange().getMax();
byte[] keyPrefix = Bytes.toBytes(Long.toString(scanMaxTs));
byte[] rowKey;
// The row key for the output table is the max timestamp of the scan + data row key
if (dataRowKey != null) {
rowKey = new byte[keyPrefix.length + dataRowKey.length];
Bytes.putBytes(rowKey, 0, keyPrefix, 0, keyPrefix.length);
Bytes.putBytes(rowKey, keyPrefix.length, dataRowKey, 0, dataRowKey.length);
} else {
rowKey = new byte[keyPrefix.length];
Bytes.putBytes(rowKey, 0, keyPrefix, 0, keyPrefix.length);
}
Put put = new Put(rowKey);
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_NAME_BYTES,
scanMaxTs, region.getRegionInfo().getTable().getName());
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_NAME_BYTES,
scanMaxTs, indexMaintainer.getIndexTableName());
if (dataRowKey != null) {
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_TS_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(dataRowTs)));
}
if (indexRowKey != null) {
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_ROW_KEY_BYTES,
scanMaxTs, indexRowKey);
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(indexRowTs)));
}
byte[] errorMessageBytes;
if (expectedValue != null) {
errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length +
TOTAL_PREFIX_LENGTH];
Bytes.putBytes(errorMessageBytes, 0, Bytes.toBytes(errorMsg), 0, errorMsg.length());
int length = errorMsg.length();
Bytes.putBytes(errorMessageBytes, length, E_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
length += PREFIX_LENGTH;
Bytes.putBytes(errorMessageBytes, length, expectedValue, 0, expectedValue.length);
length += expectedValue.length;
Bytes.putBytes(errorMessageBytes, length, A_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
length += PREFIX_LENGTH;
Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
}
else {
errorMessageBytes = Bytes.toBytes(errorMsg);
}
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
if (isBeforeRebuilt) {
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_BEFORE_VALUE);
} else {
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_AFTER_VALUE);
}
outputHTable.put(put);
}
private long getMaxTimestamp(Result result) {
long ts = 0;
for (Cell cell : result.rawCells()) {
if (ts < cell.getTimestamp()) {
ts = cell.getTimestamp();
}
}
return ts;
}
private long getMaxTimestamp(Put put) {
long ts = 0;
for (List<Cell> cells : put.getFamilyCellMap().values()) {
if (cells == null) {
break;
}
for (Cell cell : cells) {
if (ts < cell.getTimestamp()) {
ts = cell.getTimestamp();
}
}
}
return ts;
}
private boolean verifySingleIndexRow(Result indexRow, final Put dataRow) throws IOException {
ValueGetter valueGetter = new SimpleValueGetter(dataRow);
long ts = getMaxTimestamp(dataRow);
Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null);
if (indexPut == null) {
// This means the data row does not have any covered column values
indexPut = new Put(indexRow.getRow());
}
else {
// Remove the empty column prepared by Index codec as we need to change its value
removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
indexMaintainer.getEmptyKeyValueQualifier());
}
// Add the empty column
indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
indexMaintainer.getEmptyKeyValueQualifier(), ts, VERIFIED_BYTES);
int cellCount = 0;
long currentTime = EnvironmentEdgeManager.currentTime();
for (List<Cell> cells : indexPut.getFamilyCellMap().values()) {
if (cells == null) {
break;
}
for (Cell expectedCell : cells) {
byte[] family = CellUtil.cloneFamily(expectedCell);
byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
Cell actualCell = indexRow.getColumnLatestCell(family, qualifier);
if (actualCell == null) {
// Check if cell expired as per the current server's time and data table ttl
// Index table should have the same ttl as the data table, hence we might not
// get a value back from index if it has already expired between our rebuild and
// verify
// TODO: have a metric to update for these cases
if (isTimestampBeforeTTL(currentTime, expectedCell.getTimestamp())) {
continue;
}
String errorMsg = " Missing cell " + Bytes.toString(family) + ":" +
Bytes.toString(qualifier);
logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
return false;
}
if (actualCell.getTimestamp() < ts) {
// Skip older cells since a Phoenix index row is composed of cells with the same timestamp
continue;
}
// Check all columns
if (!CellUtil.matchingValue(actualCell, expectedCell)) {
String errorMsg = "Not matching value for " + Bytes.toString(family) + ":" +
Bytes.toString(qualifier);
logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell));
return false;
} else if (actualCell.getTimestamp() != ts) {
String errorMsg = "Not matching timestamp for " + Bytes.toString(family) + ":" +
Bytes.toString(qualifier) + " E: " + ts + " A: " +
actualCell.getTimestamp();
logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
errorMsg, null, null);
return false;
}
cellCount++;
}
}
if (cellCount != indexRow.rawCells().length) {
String errorMsg = "Expected to find " + cellCount + " cells but got "
+ indexRow.rawCells().length + " cells";
logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
return false;
}
return true;
}
private void verifyIndexRows(List<KeyRange> keys, Map<byte[], Put> perTaskDataKeyToDataPutMap,
VerificationResult.PhaseResult verificationPhaseResult) throws IOException {
int expectedRowCount = keys.size();
ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
Scan indexScan = new Scan();
indexScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
scanRanges.initializeScan(indexScan);
SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
indexScan.setFilter(skipScanFilter);
int rowCount = 0;
try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) {
for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
Put dataPut = indexKeyToDataPutMap.get(result.getRow());
if (dataPut == null) {
// This should never happen
String errorMsg = "Missing data row";
logToIndexToolOutputTable(null, result.getRow(), 0, getMaxTimestamp(result), errorMsg);
exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName();
throw new IOException(exceptionMessage);
}
if (verifySingleIndexRow(result, dataPut)) {
verificationPhaseResult.validIndexRowCount++;
perTaskDataKeyToDataPutMap.remove(dataPut.getRow());
} else {
verificationPhaseResult.invalidIndexRowCount++;
}
rowCount++;
}
} catch (Throwable t) {
ServerUtil.throwIOException(indexHTable.getName().toString(), t);
}
// Check if any expected rows from index(which we didn't get) are already expired due to TTL
// TODO: metrics for expired rows
if (!perTaskDataKeyToDataPutMap.isEmpty()) {
Iterator<Entry<byte[], Put>> itr = perTaskDataKeyToDataPutMap.entrySet().iterator();
long currentTime = EnvironmentEdgeManager.currentTime();
while(itr.hasNext()) {
Entry<byte[], Put> entry = itr.next();
long ts = getMaxTimestamp(entry.getValue());
if (isTimestampBeforeTTL(currentTime, ts)) {
itr.remove();
rowCount++;
verificationPhaseResult.expiredIndexRowCount++;
}
}
}
if (rowCount != expectedRowCount) {
for (Map.Entry<byte[], Put> entry : perTaskDataKeyToDataPutMap.entrySet()) {
String errorMsg = "Missing index row";
logToIndexToolOutputTable(entry.getKey(), null, getMaxTimestamp(entry.getValue()),
0, errorMsg);
}
verificationPhaseResult.missingIndexRowCount += expectedRowCount - rowCount;
}
}
private boolean isTimestampBeforeTTL(long currentTime, long tsToCheck) {
if (indexTableTTL == HConstants.FOREVER) {
return false;
}
return tsToCheck < (currentTime - (long) indexTableTTL * 1000);
}
private void addVerifyTask(final List<KeyRange> keys, final Map<byte[], Put> perTaskDataKeyToDataPutMap,
final VerificationResult.PhaseResult verificationPhaseResult) {
tasks.add(new Task<Boolean>() {
@Override
public Boolean call() throws Exception {
try {
if (Thread.currentThread().isInterrupted()) {
exceptionMessage = "Pool closed, not attempting to verify index rows! " + indexHTable.getName();
throw new IOException(exceptionMessage);
}
verifyIndexRows(keys, perTaskDataKeyToDataPutMap, verificationPhaseResult);
} catch (Exception e) {
throw e;
}
return Boolean.TRUE;
}
});
}
private void parallelizeIndexVerify(VerificationResult.PhaseResult verificationPhaseResult) throws IOException {
for (Mutation mutation : mutations) {
indexKeyToDataPutMap.put(getIndexRowKey((Put)mutation), (Put)mutation);
}
int taskCount = (indexKeyToDataPutMap.size() + rowCountPerTask - 1) / rowCountPerTask;
tasks = new TaskBatch<>(taskCount);
List<Map<byte[], Put>> dataPutMapList = new ArrayList<>(taskCount);
List<VerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount);
List<KeyRange> keys = new ArrayList<>(rowCountPerTask);
Map<byte[], Put> perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
dataPutMapList.add(perTaskDataKeyToDataPutMap);
VerificationResult.PhaseResult perTaskVerificationPhaseResult = new VerificationResult.PhaseResult();
verificationPhaseResultList.add(perTaskVerificationPhaseResult);
for (Map.Entry<byte[], Put> entry: indexKeyToDataPutMap.entrySet()) {
keys.add(PVarbinary.INSTANCE.getKeyRange(entry.getKey()));
perTaskDataKeyToDataPutMap.put(entry.getValue().getRow(), entry.getValue());
if (keys.size() == rowCountPerTask) {
addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
keys = new ArrayList<>(rowCountPerTask);
perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
dataPutMapList.add(perTaskDataKeyToDataPutMap);
perTaskVerificationPhaseResult = new VerificationResult.PhaseResult();
verificationPhaseResultList.add(perTaskVerificationPhaseResult);
}
}
if (keys.size() > 0) {
addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
}
List<Boolean> taskResultList = null;
try {
LOGGER.debug("Waiting on index verify tasks to complete...");
taskResultList = this.pool.submitUninterruptible(tasks);
} catch (ExecutionException e) {
throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
} catch (EarlyExitFailure e) {
throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
}
for (Boolean result : taskResultList) {
if (result == null) {
// there was a failure
throw new IOException(exceptionMessage);
}
}
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
for (Map<byte[], Put> dataPutMap : dataPutMapList) {
dataKeyToDataPutMap.putAll(dataPutMap);
}
}
for (VerificationResult.PhaseResult result : verificationPhaseResultList) {
verificationPhaseResult.add(result);
}
}
private void rebuildIndexRows(UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
byte[] uuidValue = ServerCacheClient.generateId();
UngroupedAggregateRegionObserver.MutationList currentMutationList =
new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
for (Mutation mutation : mutationList) {
Put put = (Put) mutation;
currentMutationList.add(mutation);
setMutationAttributes(put, uuidValue);
uuidValue = commitIfReady(uuidValue, currentMutationList);
Delete deleteMarkers = generateDeleteMarkers(put);
if (deleteMarkers != null) {
setMutationAttributes(deleteMarkers, uuidValue);
currentMutationList.add(deleteMarkers);
uuidValue = commitIfReady(uuidValue, currentMutationList);
}
}
if (!currentMutationList.isEmpty()) {
ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
ungroupedAggregateRegionObserver.commitBatchWithRetries(region, currentMutationList, blockingMemstoreSize);
}
}
private void verifyAndOrRebuildIndex() throws IOException {
VerificationResult nextVerificationResult = new VerificationResult();
nextVerificationResult.scannedDataRowCount = mutations.size();
if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) {
// For these options we start with rebuilding index rows
rebuildIndexRows(mutations);
nextVerificationResult.rebuiltIndexRowCount = mutations.size();
isBeforeRebuilt = false;
}
if (verifyType == IndexTool.IndexVerifyType.NONE) {
return;
}
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
verifyType == IndexTool.IndexVerifyType.ONLY) {
VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult();
// For these options we start with verifying index rows
parallelizeIndexVerify(verificationPhaseResult);
nextVerificationResult.before.add(verificationPhaseResult);
if (mutations.size() != verificationPhaseResult.getTotalCount()) {
throw new DoNotRetryIOException(
"mutations.size() != verificationPhaseResult.getTotalCount() at the before phase " +
nextVerificationResult + " dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
}
}
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
// For these options, we have identified the rows to be rebuilt and now need to rebuild them
// At this point, dataKeyToDataPutMap includes mapping only for the rows to be rebuilt
mutations.clear();
for (Map.Entry<byte[], Put> entry: dataKeyToDataPutMap.entrySet()) {
mutations.add(entry.getValue());
}
rebuildIndexRows(mutations);
nextVerificationResult.rebuiltIndexRowCount += mutations.size();
isBeforeRebuilt = false;
}
if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
// We have rebuilt index row and now we need to verify them
indexKeyToDataPutMap.clear();
VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult();
parallelizeIndexVerify(verificationPhaseResult);
nextVerificationResult.after.add(verificationPhaseResult);
if (mutations.size() != verificationPhaseResult.getTotalCount()) {
throw new DoNotRetryIOException(
"mutations.size() != verificationPhaseResult.getTotalCount() at the after phase " +
nextVerificationResult + " dataKeyToDataPutMap.size() = " + dataKeyToDataPutMap.size());
}
}
indexKeyToDataPutMap.clear();
verificationResult.add(nextVerificationResult);
}
@Override
public boolean next(List<Cell> results) throws IOException {
Cell lastCell = null;
int rowCount = 0;
region.startRegionOperation();
try {
// Partial rebuilds by MetadataRegionObserver use raw scan. Inline verification is not supported for them
boolean partialRebuild = scan.isRaw();
byte[] uuidValue = ServerCacheClient.generateId();
synchronized (innerScanner) {
do {
List<Cell> row = new ArrayList<Cell>();
hasMore = innerScanner.nextRaw(row);
if (!row.isEmpty()) {
lastCell = row.get(0);
Put put = null;
Delete del = null;
for (Cell cell : row) {
if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
if (put == null) {
put = new Put(CellUtil.cloneRow(cell));
mutations.add(put);
}
put.add(cell);
} else {
if (del == null) {
del = new Delete(CellUtil.cloneRow(cell));
mutations.add(del);
}
del.addDeleteMarker(cell);
}
}
if (partialRebuild) {
if (put != null) {
setMutationAttributes(put, uuidValue);
}
if (del != null) {
setMutationAttributes(del, uuidValue);
}
uuidValue = commitIfReady(uuidValue, mutations);
}
if (indexRowKey != null) {
if (put != null) {
setMutationAttributes(put, uuidValue);
}
Delete deleteMarkers = generateDeleteMarkers(put);
if (deleteMarkers != null) {
setMutationAttributes(deleteMarkers, uuidValue);
mutations.add(deleteMarkers);
uuidValue = commitIfReady(uuidValue, mutations);
}
// GlobalIndexChecker passed the index row key. This is to build a single index row.
// Check if the data table row we have just scanned matches with the index row key.
// If not, there is no need to build the index row from this data table row,
// and just return zero row count.
if (checkIndexRow(indexRowKey, put)) {
rowCount = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
} else {
rowCount = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
}
break;
}
rowCount++;
}
} while (hasMore && rowCount < pageSizeInRows);
if (!partialRebuild && indexRowKey == null) {
verifyAndOrRebuildIndex();
} else {
if (!mutations.isEmpty()) {
ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
}
}
}
} catch (IOException e) {
LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
throw e;
} finally {
region.closeRegionOperation();
mutations.clear();
if (verify) {
indexKeyToDataPutMap.clear();
dataKeyToDataPutMap.clear();
}
}
byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
final Cell aggKeyValue;
if (lastCell == null) {
aggKeyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
} else {
aggKeyValue = PhoenixKeyValueUtil.newKeyValue(CellUtil.cloneRow(lastCell), SINGLE_COLUMN_FAMILY,
SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
}
results.add(aggKeyValue);
return hasMore;
}
@Override
public long getMaxResultSize() {
return scan.getMaxResultSize();
}
}