PHOENIX-7313 All cell versions should not be retained during flushes and minor compactions when max lookback is disabled (#1888)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index fa45062..d27a187 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -357,20 +357,33 @@
dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
}
- public void setScanOptionsForFlushesAndCompactions(ScanOptions options) {
+ public void setScanOptionsForFlushesAndCompactions(Store store, ScanOptions options,
+ boolean retainAllVersions) {
// We want the store to give us all the deleted cells to StoreCompactionScanner
options.setKeepDeletedCells(KeepDeletedCells.TTL);
options.setTTL(HConstants.FOREVER);
- options.setMaxVersions(Integer.MAX_VALUE);
- options.setMinVersions(Integer.MAX_VALUE);
+ if (retainAllVersions) {
+ options.setMaxVersions(Integer.MAX_VALUE);
+ options.setMinVersions(Integer.MAX_VALUE);
+ } else {
+ options.setMaxVersions(Math.max(Math.max(options.getMaxVersions(),
+ store.getColumnFamilyDescriptor().getMaxVersions()), 1));
+ options.setMinVersions(Math.max(Math.max(options.getMinVersions(),
+ store.getColumnFamilyDescriptor().getMaxVersions()), 1));
+ }
+
}
+
@Override
public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
Configuration conf = c.getEnvironment().getConfiguration();
if (isPhoenixTableTTLEnabled(conf)) {
- setScanOptionsForFlushesAndCompactions(options);
+ boolean retainAllVersions = isMaxLookbackTimeEnabled(
+ BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf))
+ || request.isMajor();
+ setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions);
return;
}
long maxLookbackAge = getMaxLookbackAge(c);
@@ -384,10 +397,14 @@
public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
Configuration conf = c.getEnvironment().getConfiguration();
+
if (isPhoenixTableTTLEnabled(conf)) {
- setScanOptionsForFlushesAndCompactions(options);
+ boolean retainAllVersions = isMaxLookbackTimeEnabled(
+ BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf));
+ setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions);
return;
}
+
long maxLookbackAge = getMaxLookbackAge(c);
if (isMaxLookbackTimeEnabled(maxLookbackAge)) {
setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, store,
@@ -401,7 +418,9 @@
throws IOException {
Configuration conf = c.getEnvironment().getConfiguration();
if (isPhoenixTableTTLEnabled(conf)) {
- setScanOptionsForFlushesAndCompactions(options);
+ boolean retainAllVersions = isMaxLookbackTimeEnabled(
+ BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf));
+ setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions);
return;
}
long maxLookbackAge = getMaxLookbackAge(c);
@@ -428,7 +447,7 @@
Configuration conf = c.getEnvironment().getConfiguration();
if (isPhoenixTableTTLEnabled(conf)) {
- setScanOptionsForFlushesAndCompactions(options);
+ setScanOptionsForFlushesAndCompactions(store, options, true);
return;
}
if (!storeFileScanDoesntNeedAlteration(options)) {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index 3bcc2ce..ebe92b8 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -76,6 +76,8 @@
private final byte[] emptyCF;
private final byte[] emptyCQ;
private final byte[] storeColumnFamily;
+ private final String tableName;
+ private final String columnFamilyName;
private static Map<String, Long> maxLookbackMap = new ConcurrentHashMap<>();
private PhoenixLevelRowCompactor phoenixLevelRowCompactor;
private HBaseLevelRowCompactor hBaseLevelRowCompactor;
@@ -94,19 +96,18 @@
this.emptyCQ = emptyCQ;
this.config = env.getConfiguration();
compactionTime = EnvironmentEdgeManager.currentTimeMillis();
- this.maxLookbackInMillis = maxLookbackInMillis;
- String columnFamilyName = store.getColumnFamilyName();
+ columnFamilyName = store.getColumnFamilyName();
storeColumnFamily = columnFamilyName.getBytes();
- String tableName = region.getRegionInfo().getTable().getNameAsString();
+ tableName = region.getRegionInfo().getTable().getNameAsString();
Long overriddenMaxLookback =
maxLookbackMap.remove(tableName + SEPARATOR + columnFamilyName);
- maxLookbackInMillis = overriddenMaxLookback == null ?
+ this.maxLookbackInMillis = overriddenMaxLookback == null ?
maxLookbackInMillis : Math.max(maxLookbackInMillis, overriddenMaxLookback);
// The oldest scn is current time - maxLookbackInMillis. Phoenix sets the scan time range
// for scn queries [0, scn). This means that the maxlookback size should be
// maxLookbackInMillis + 1 so that the oldest scn does not return empty row
- this.maxLookbackWindowStart = maxLookbackInMillis == 0 ?
- compactionTime : compactionTime - (maxLookbackInMillis + 1);
+ this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ?
+ compactionTime : compactionTime - (this.maxLookbackInMillis + 1);
ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
ttl = cfd.getTimeToLive();
this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - ttl * 1000;
@@ -121,6 +122,9 @@
|| localIndex;
phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
+ LOGGER.info("Starting Phoenix CompactionScanner for table " + tableName + " store "
+ + columnFamilyName + " ttl " + ttl + "ms " + "max lookback "
+ + maxLookbackInMillis + "ms");
}
/**
@@ -155,6 +159,8 @@
@Override
public void close() throws IOException {
+ LOGGER.info("Closing Phoenix CompactionScanner for table " + tableName + " store "
+ + columnFamilyName);
storeScanner.close();
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
index 261ef94..aa11961 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
@@ -70,14 +70,14 @@
long currentTime = scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP ?
EnvironmentEdgeManager.currentTimeMillis() : scan.getTimeRange().getMax();
ttl = env.getRegion().getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
- ttlWindowStart = ttl == HConstants.FOREVER ? 1 : currentTime - ttl * 1000;
- ttl *= 1000;
// Regardless if the Phoenix Table TTL feature is disabled cluster wide or the client is
// an older client and does not supply the empty column parameters, the masking should not
- // be done here.
- isMaskingEnabled = emptyCF != null && emptyCQ != null &&
- env.getConfiguration().getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
+ // be done here. We also disable masking when TTL is HConstants.FOREVER.
+ isMaskingEnabled = emptyCF != null && emptyCQ != null && ttl != HConstants.FOREVER
+ && env.getConfiguration().getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED);
+ ttlWindowStart = ttl == HConstants.FOREVER ? 1 : currentTime - ttl * 1000;
+ ttl *= 1000;
}
private void init() throws IOException {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index c07e1e2..4ae157a 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -597,7 +597,14 @@
InternalScanner internalScanner = scanner;
if (request.isMajor()) {
boolean isDisabled = false;
- final String fullTableName = tableName.getNameAsString();
+ boolean isMultiTenantIndexTable = false;
+ if (tableName.getNameAsString().startsWith(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)) {
+ isMultiTenantIndexTable = true;
+ }
+ final String fullTableName = isMultiTenantIndexTable ?
+ SchemaUtil.getParentTableNameFromIndexTable(tableName.getNameAsString(),
+ MetaDataUtil.VIEW_INDEX_TABLE_PREFIX) :
+ tableName.getNameAsString();
PTable table = null;
try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(
compactionConfig).unwrap(PhoenixConnection.class)) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
index f959181..f9900fd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
@@ -22,17 +22,22 @@
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.coprocessor.CompactionScanner;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.Assert;
@@ -45,7 +50,9 @@
import java.io.IOException;
import java.sql.Connection;
+import java.sql.Date;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -60,6 +67,9 @@
import static org.apache.phoenix.util.TestUtil.assertRowHasExpectedValueAtSCN;
import static org.apache.phoenix.util.TestUtil.assertTableHasTtl;
import static org.apache.phoenix.util.TestUtil.assertTableHasVersions;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
@@ -317,6 +327,65 @@
}
@Test(timeout=60000L)
+ public void testViewIndexIsCompacted() throws Exception {
+ if(hasTableLevelMaxLookback) {
+ return;
+ }
+ String baseTable = SchemaUtil.getTableName("SCHEMA1", generateUniqueName());
+ String globalViewName = generateUniqueName();
+ String fullGlobalViewName = SchemaUtil.getTableName("SCHEMA2", globalViewName);
+ String globalViewIdx = generateUniqueName();
+ TableName dataTable = TableName.valueOf(baseTable);
+ TableName indexTable = TableName.valueOf("_IDX_" + baseTable);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute("CREATE TABLE " + baseTable
+ + " (TENANT_ID CHAR(15) NOT NULL, PK2 INTEGER NOT NULL, PK3 INTEGER NOT NULL, "
+ + "COL1 VARCHAR, COL2 VARCHAR, COL3 CHAR(15) CONSTRAINT PK PRIMARY KEY"
+ + "(TENANT_ID, PK2, PK3)) MULTI_TENANT=true");
+ conn.createStatement().execute("CREATE VIEW " + fullGlobalViewName
+ + " AS SELECT * FROM " + baseTable);
+ conn.createStatement().execute("CREATE INDEX " + globalViewIdx + " ON "
+ + fullGlobalViewName + " (COL1) INCLUDE (COL2)");
+
+ conn.createStatement().executeUpdate("UPSERT INTO " + fullGlobalViewName
+ + " (TENANT_ID, PK2, PK3, COL1, COL2) VALUES ('TenantId1',1, 2, 'a', 'b')");
+ conn.commit();
+
+ String query = "SELECT COL2 FROM " + fullGlobalViewName + " WHERE COL1 = 'a'";
+ // Verify that query uses the global view index
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ PTable table = ((PhoenixResultSet)rs).getContext().getCurrentTable().getTable();
+ assertTrue(table.getSchemaName().getString().equals("SCHEMA2") &&
+ table.getTableName().getString().equals(globalViewIdx));
+ assertTrue(rs.next());
+ assertEquals("b", rs.getString(1));
+ assertFalse(rs.next());
+ // Force a flush
+ flush(dataTable);
+ flush(indexTable);
+ assertRawRowCount(conn, dataTable, 1);
+ assertRawRowCount(conn, indexTable, 1);
+ // Delete the row from both tables
+ conn.createStatement().execute("DELETE FROM " + fullGlobalViewName
+ + " WHERE TENANT_ID = 'TenantId1'");
+ conn.commit();
+ // Force a flush
+ flush(dataTable);
+ flush(indexTable);
+ assertRawRowCount(conn, dataTable, 1);
+ assertRawRowCount(conn, indexTable, 1);
+ // Move change beyond the max lookback window
+ injectEdge.setValue(System.currentTimeMillis() + MAX_LOOKBACK_AGE * 1000 + 1);
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ // Major compact both tables
+ majorCompact(dataTable);
+ majorCompact(indexTable);
+ // Everything should have been purged by major compaction
+ assertRawRowCount(conn, dataTable, 0);
+ assertRawRowCount(conn, indexTable, 0);
+ }
+ }
+ @Test(timeout=60000L)
public void testTTLAndMaxLookbackAge() throws Exception {
if(hasTableLevelMaxLookback) {
return;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index 852cf8f..3b6d127 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -131,8 +132,8 @@
{ true, false, KeepDeletedCells.FALSE, 5, 50, null},
{ true, false, KeepDeletedCells.TRUE, 1, 25, null},
{ true, false, KeepDeletedCells.TTL, 5, 100, null},
- { false, false, KeepDeletedCells.FALSE, 1, 100, 15},
- { false, false, KeepDeletedCells.TRUE, 5, 50, 15},
+ { false, false, KeepDeletedCells.FALSE, 1, 100, 0},
+ { false, false, KeepDeletedCells.TRUE, 5, 50, 0},
{ false, false, KeepDeletedCells.TTL, 1, 25, 15}});
}
@@ -155,7 +156,7 @@
@Test
public void testMaskingAndCompaction() throws Exception {
final int maxLookbackAge = tableLevelMaxLooback != null ? tableLevelMaxLooback : MAX_LOOKBACK_AGE;
- final int maxDeleteCounter = maxLookbackAge;
+ final int maxDeleteCounter = maxLookbackAge == 0 ? 1 : maxLookbackAge;
final int maxCompactionCounter = ttl / 2;
final int maxMaskingCounter = 2 * ttl;
final byte[] rowKey = Bytes.toBytes("a");
@@ -232,13 +233,54 @@
}
@Test
- public void testRowSpansMultipleTTLWindows() throws Exception {
- if (tableLevelMaxLooback != null) {
+ public void testFlushesAndMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled()
+ throws Exception {
+ final int maxLookbackAge = tableLevelMaxLooback != null
+ ? tableLevelMaxLooback : MAX_LOOKBACK_AGE;
+ if (maxLookbackAge > 0) {
return;
}
try (Connection conn = DriverManager.getConnection(getUrl())) {
String tableName = generateUniqueName();
createTable(tableName);
+ conn.createStatement().execute("Alter Table " + tableName + " set \"phoenix.max.lookback.age.seconds\" = 0");
+ conn.commit();
+ final int flushCount = 10;
+ byte[] row = Bytes.toBytes("a");
+ for (int i = 0; i < flushCount; i++) {
+ // Generate more row versions than the maximum cell versions for the table
+ int updateCount = RAND.nextInt(10) + versions;
+ for (int j = 0; j < updateCount; j++) {
+ updateRow(conn, tableName, "a");
+ }
+ flush(TableName.valueOf(tableName));
+ // At every flush, extra cell versions should be removed.
+ // MAX_COLUMN_INDEX table columns and one empty column will be retained for
+ // each row version.
+ TestUtil.assertRawCellCount(conn, TableName.valueOf(tableName), row,
+ (i + 1) * (MAX_COLUMN_INDEX + 1) * versions);
+ }
+ // Run one minor compaction (in case no minor compaction has happened yet)
+ Admin admin = utility.getAdmin();
+ admin.compact(TableName.valueOf(tableName));
+ int waitCount = 0;
+ while (TestUtil.getRawCellCount(conn, TableName.valueOf(tableName),
+ Bytes.toBytes("a")) < flushCount * (MAX_COLUMN_INDEX + 1) * versions) {
+ // Wait for major compactions to happen
+ Thread.sleep(1000);
+ waitCount++;
+ if (waitCount > 30) {
+ Assert.fail();
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testRowSpansMultipleTTLWindows() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String tableName = generateUniqueName();
+ createTable(tableName);
String noCompactTableName = generateUniqueName();
createTable(noCompactTableName);
long startTime = System.currentTimeMillis() + 1000;
@@ -303,6 +345,16 @@
conn.commit();
}
+ private void updateRow(Connection conn, String tableName, String id)
+ throws SQLException {
+
+ for (int i = 1; i <= MAX_COLUMN_INDEX; i++) {
+ String value = Integer.toString(RAND.nextInt(1000));
+ updateColumn(conn, tableName, id, i, value);
+ }
+ conn.commit();
+ }
+
private void compareRow(Connection conn, String tableName1, String tableName2, String id,
int maxColumnIndex) throws SQLException, IOException {
StringBuilder queryBuilder = new StringBuilder("SELECT ");
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 79b0168..c15bd40 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -1364,13 +1364,17 @@
assertEquals(expectedRowCount, count);
}
- public static void assertRawCellCount(Connection conn, TableName tableName,
- byte[] row, int expectedCellCount)
- throws SQLException, IOException {
+ public static int getRawCellCount(Connection conn, TableName tableName, byte[] row)
+ throws SQLException, IOException {
ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
Table table = cqs.getTable(tableName.getName());
CellCount cellCount = getCellCount(table, true);
- int count = cellCount.getCellCount(Bytes.toString(row));
+ return cellCount.getCellCount(Bytes.toString(row));
+ }
+ public static void assertRawCellCount(Connection conn, TableName tableName,
+ byte[] row, int expectedCellCount)
+ throws SQLException, IOException {
+ int count = getRawCellCount(conn, tableName, row);
assertEquals(expectedCellCount, count);
}