PHOENIX-5558 Eliminate the second single data row scan during read repairs
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 4a13c74..1795c37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -106,6 +106,7 @@
public static final String PHYSICAL_DATA_TABLE_NAME = "_PhysicalDataTableName";
public static final String EMPTY_COLUMN_FAMILY_NAME = "_EmptyCFName";
public static final String EMPTY_COLUMN_QUALIFIER_NAME = "_EmptyCQName";
+ public static final String INDEX_ROW_KEY = "_IndexRowKey";
public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index d358297..c3d8dd9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -38,9 +38,7 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -1102,8 +1100,9 @@
private boolean useProto = true;
private Scan scan;
private RegionScanner innerScanner;
- final Region region;
- IndexMaintainer indexMaintainer;
+ private Region region;
+ private IndexMaintainer indexMaintainer;
+ private byte[] indexRowKey = null;
IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
final Configuration config) {
@@ -1131,6 +1130,7 @@
this.scan = scan;
this.innerScanner = innerScanner;
this.region = region;
+ indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
}
@Override
@@ -1194,6 +1194,35 @@
return uuidValue;
}
+ private boolean checkIndexRow(final byte[] indexRowKey, final Put put) throws IOException {
+ ValueGetter getter = new ValueGetter() {
+ final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
+
+ @Override
+ public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
+ List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
+ if (cellList == null || cellList.isEmpty()) {
+ return null;
+ }
+ Cell cell = cellList.get(0);
+ valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ return valuePtr;
+ }
+
+ @Override
+ public byte[] getRowKey() {
+ return put.getRow();
+ }
+ };
+ byte[] builtIndexRowKey = indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(put.getRow()),
+ null, null, HConstants.LATEST_TIMESTAMP);
+ if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
+ indexRowKey, 0, indexRowKey.length) != 0) {
+ return false;
+ }
+ return true;
+ }
+
@Override
public boolean next(List<Cell> results) throws IOException {
int rowCount = 0;
@@ -1224,6 +1253,15 @@
del.addDeleteMarker(cell);
}
}
+ if (indexRowKey != null) {
+ // GlobalIndexChecker passed the index row key. This is to build a single index row.
+ // Check if the data table row we have just scanned matches with the index row key.
+ // If not, there is no need to build the index row from this data table row,
+ // and just return zero row count.
+ if (!checkIndexRow(indexRowKey, put)) {
+ break;
+ }
+ }
uuidValue = commitIfReady(uuidValue);
if (!scan.isRaw()) {
Delete deleteMarkers = generateDeleteMarkers(row);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index d8e1660..5a22a4b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -55,12 +55,13 @@
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSource;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ServerUtil;
@@ -85,7 +86,6 @@
private Scan scan;
private Scan indexScan;
private Scan singleRowIndexScan;
- private Scan singleRowDataScan;
private Scan buildIndexScan = null;
private Table dataHTable = null;
private byte[] emptyCF;
@@ -223,53 +223,10 @@
}
}
- private boolean doesDataRowExist(byte[] indexRowKey, byte[] dataRowKey) throws IOException {
- singleRowDataScan.withStartRow(dataRowKey, true);
- singleRowDataScan.withStopRow(dataRowKey, true);
- singleRowDataScan.setTimeRange(0, maxTimestamp);
- try (ResultScanner resultScanner = dataHTable.getScanner(singleRowDataScan)) {
- final Result result = resultScanner.next();
- if (result == null) {
- // There is no data table row for this index unverified index row. We need to skip it.
- return false;
- }
- else {
- ValueGetter getter = new ValueGetter() {
- final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
-
- @Override
- public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
- Cell cell = result.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
- if (cell == null) {
- return null;
- }
- valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
- return valuePtr;
- }
-
- @Override
- public byte[] getRowKey() {
- return result.getRow();
- }
- };
- byte[] builtIndexRowKey = indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(dataRowKey), null, null, maxTimestamp);
- if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
- indexRowKey, 0, indexRowKey.length) != 0) {
- // The row key of the index row that has been built is different than the row key of the unverified
- // index row
- return false;
- }
- }
- } catch (Throwable t) {
- ServerUtil.throwIOException(dataHTable.getName().toString(), t);
- }
- return true;
- }
private void repairIndexRows(byte[] indexRowKey, long ts, List<Cell> row) throws IOException {
// Build the data table row key from the index table row key
if (buildIndexScan == null) {
buildIndexScan = new Scan();
- singleRowDataScan = new Scan();
indexScan = new Scan(scan);
singleRowIndexScan = new Scan(scan);
byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
@@ -295,6 +252,11 @@
buildIndexScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD));
buildIndexScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
buildIndexScan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
+ // Scan only columns included in the index table plus the empty column
+ for (ColumnReference column : indexMaintainer.getAllColumns()) {
+ buildIndexScan.addColumn(column.getFamily(), column.getQualifier());
+ }
+ buildIndexScan.addColumn(indexMaintainer.getDataEmptyKeyValueCF(), indexMaintainer.getEmptyKeyValueQualifier());
}
// Rebuild the index row from the corresponding the row in the the data table
// Get the data row key from the index row key
@@ -302,12 +264,20 @@
buildIndexScan.withStartRow(dataRowKey, true);
buildIndexScan.withStopRow(dataRowKey, true);
buildIndexScan.setTimeRange(0, maxTimestamp);
+ // Pass the index row key to the partial index builder which will build the index row only when the data
+ // table row for dataRowKey matches with this unverified index row.
+ buildIndexScan.setAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY, indexRowKey);
+ Result result = null;
try (ResultScanner resultScanner = dataHTable.getScanner(buildIndexScan)){
- resultScanner.next();
+ result = resultScanner.next();
} catch (Throwable t) {
ServerUtil.throwIOException(dataHTable.getName().toString(), t);
}
- if (!doesDataRowExist(indexRowKey, dataRowKey)) {
+ // A single cell will be returned. We decode that here
+ byte[] value = result.value();
+ long rowCount = PLong.INSTANCE.getCodec().decodeLong(new ImmutableBytesWritable(value), SortOrder.getDefault());
+ if (rowCount == 0) {
+ // This means there does not exist a data table row for this unverified index row
// Delete the unverified row from index if it is old enough
deleteRowIfAgedEnough(indexRowKey, row, ts, false);
// Skip this unverified row (i.e., do not return it to the client). Just retuning empty row is