PHOENIX-7313 All cell versions should not be retained during flushes … (#1888)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index fa45062..d27a187 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -357,20 +357,33 @@
                 dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
     }
 
-    public void setScanOptionsForFlushesAndCompactions(ScanOptions options) {
+    public void setScanOptionsForFlushesAndCompactions(Store store, ScanOptions options,
+            boolean retainAllVersions) {
         // We want the store to give us all the deleted cells to StoreCompactionScanner
         options.setKeepDeletedCells(KeepDeletedCells.TTL);
         options.setTTL(HConstants.FOREVER);
-        options.setMaxVersions(Integer.MAX_VALUE);
-        options.setMinVersions(Integer.MAX_VALUE);
+        if (retainAllVersions) {
+            options.setMaxVersions(Integer.MAX_VALUE);
+            options.setMinVersions(Integer.MAX_VALUE);
+        } else {
+            options.setMinVersions(Math.max(Math.max(options.getMaxVersions(),
+                    store.getColumnFamilyDescriptor().getMaxVersions()), 1));
+            options.setMinVersions(Math.max(Math.max(options.getMinVersions(),
+                    store.getColumnFamilyDescriptor().getMaxVersions()), 1));
+        }
+
     }
+
     @Override
     public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
             ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
             CompactionRequest request) throws IOException {
         Configuration conf = c.getEnvironment().getConfiguration();
         if (isPhoenixTableTTLEnabled(conf)) {
-            setScanOptionsForFlushesAndCompactions(options);
+            boolean retainAllVersions =  isMaxLookbackTimeEnabled(
+                    BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf))
+                    || request.isMajor();
+            setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions);
             return;
         }
         long maxLookbackAge = getMaxLookbackAge(c);
@@ -384,10 +397,14 @@
     public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
             ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
         Configuration conf = c.getEnvironment().getConfiguration();
+
         if (isPhoenixTableTTLEnabled(conf)) {
-            setScanOptionsForFlushesAndCompactions(options);
+            boolean retainAllVersions =  isMaxLookbackTimeEnabled(
+                    BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf));
+            setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions);
             return;
         }
+
         long maxLookbackAge = getMaxLookbackAge(c);
         if (isMaxLookbackTimeEnabled(maxLookbackAge)) {
             setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, store,
@@ -401,7 +418,9 @@
             throws IOException {
         Configuration conf = c.getEnvironment().getConfiguration();
         if (isPhoenixTableTTLEnabled(conf)) {
-            setScanOptionsForFlushesAndCompactions(options);
+            boolean retainAllVersions =  isMaxLookbackTimeEnabled(
+                    BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf));
+            setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions);
             return;
         }
         long maxLookbackAge = getMaxLookbackAge(c);
@@ -428,7 +447,7 @@
 
         Configuration conf = c.getEnvironment().getConfiguration();
         if (isPhoenixTableTTLEnabled(conf)) {
-            setScanOptionsForFlushesAndCompactions(options);
+            setScanOptionsForFlushesAndCompactions(store, options, true);
             return;
         }
         if (!storeFileScanDoesntNeedAlteration(options)) {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index 3bcc2ce..ebe92b8 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -76,6 +76,8 @@
     private final byte[] emptyCF;
     private final byte[] emptyCQ;
     private final byte[] storeColumnFamily;
+    private final String tableName;
+    private final String columnFamilyName;
     private static Map<String, Long> maxLookbackMap = new ConcurrentHashMap<>();
     private PhoenixLevelRowCompactor phoenixLevelRowCompactor;
     private HBaseLevelRowCompactor hBaseLevelRowCompactor;
@@ -94,19 +96,18 @@
         this.emptyCQ = emptyCQ;
         this.config = env.getConfiguration();
         compactionTime = EnvironmentEdgeManager.currentTimeMillis();
-        this.maxLookbackInMillis = maxLookbackInMillis;
-        String columnFamilyName = store.getColumnFamilyName();
+        columnFamilyName = store.getColumnFamilyName();
         storeColumnFamily = columnFamilyName.getBytes();
-        String tableName = region.getRegionInfo().getTable().getNameAsString();
+        tableName = region.getRegionInfo().getTable().getNameAsString();
         Long overriddenMaxLookback =
                 maxLookbackMap.remove(tableName + SEPARATOR + columnFamilyName);
-        maxLookbackInMillis = overriddenMaxLookback == null ?
+        this.maxLookbackInMillis = overriddenMaxLookback == null ?
                 maxLookbackInMillis : Math.max(maxLookbackInMillis, overriddenMaxLookback);
         // The oldest scn is current time - maxLookbackInMillis. Phoenix sets the scan time range
         // for scn queries [0, scn). This means that the maxlookback size should be
         // maxLookbackInMillis + 1 so that the oldest scn does not return empty row
-        this.maxLookbackWindowStart = maxLookbackInMillis == 0 ?
-                compactionTime : compactionTime - (maxLookbackInMillis + 1);
+        this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ?
+                compactionTime : compactionTime - (this.maxLookbackInMillis + 1);
         ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
         ttl = cfd.getTimeToLive();
         this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - ttl * 1000;
@@ -121,6 +122,9 @@
                         || localIndex;
         phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
         hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
+        LOGGER.info("Starting Phoenix CompactionScanner for table " + tableName + " store "
+                + columnFamilyName + " ttl " + ttl + "ms " + "max lookback "
+                + maxLookbackInMillis + "ms");
     }
 
     /**
@@ -155,6 +159,8 @@
 
     @Override
     public void close() throws IOException {
+        LOGGER.info("Closing Phoenix CompactionScanner for table " + tableName + " store "
+                + columnFamilyName);
         storeScanner.close();
     }
 
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
index 261ef94..aa11961 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
@@ -70,14 +70,14 @@
         long currentTime = scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP ?
                 EnvironmentEdgeManager.currentTimeMillis() : scan.getTimeRange().getMax();
         ttl = env.getRegion().getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
-        ttlWindowStart = ttl == HConstants.FOREVER ? 1 : currentTime - ttl * 1000;
-        ttl *= 1000;
         // Regardless if the Phoenix Table TTL feature is disabled cluster wide or the client is
         // an older client and does not supply the empty column parameters, the masking should not
-        // be done here.
-        isMaskingEnabled = emptyCF != null && emptyCQ != null &&
-                env.getConfiguration().getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
+        // be done here. We also disable masking when TTL is HConstants.FOREVER.
+        isMaskingEnabled = emptyCF != null && emptyCQ != null && ttl != HConstants.FOREVER
+                && env.getConfiguration().getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
                         QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED);
+        ttlWindowStart = ttl == HConstants.FOREVER ? 1 : currentTime - ttl * 1000;
+        ttl *= 1000;
     }
 
     private void init() throws IOException {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index c07e1e2..4ae157a 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -597,7 +597,14 @@
                     InternalScanner internalScanner = scanner;
                     if (request.isMajor()) {
                         boolean isDisabled = false;
-                        final String fullTableName = tableName.getNameAsString();
+                        boolean isMultiTenantIndexTable = false;
+                        if (tableName.getNameAsString().startsWith(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)) {
+                            isMultiTenantIndexTable = true;
+                        }
+                        final String fullTableName = isMultiTenantIndexTable ?
+                                SchemaUtil.getParentTableNameFromIndexTable(tableName.getNameAsString(),
+                                        MetaDataUtil.VIEW_INDEX_TABLE_PREFIX) :
+                                tableName.getNameAsString();
                         PTable table = null;
                         try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(
                                 compactionConfig).unwrap(PhoenixConnection.class)) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
index f959181..f9900fd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
@@ -22,17 +22,22 @@
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.coprocessor.CompactionScanner;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ManualEnvironmentEdge;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.After;
 import org.junit.Assert;
@@ -45,7 +50,9 @@
 
 import java.io.IOException;
 import java.sql.Connection;
+import java.sql.Date;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -60,6 +67,9 @@
 import static org.apache.phoenix.util.TestUtil.assertRowHasExpectedValueAtSCN;
 import static org.apache.phoenix.util.TestUtil.assertTableHasTtl;
 import static org.apache.phoenix.util.TestUtil.assertTableHasVersions;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 @Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
@@ -317,6 +327,65 @@
     }
 
     @Test(timeout=60000L)
+    public void testViewIndexIsCompacted() throws Exception {
+        if(hasTableLevelMaxLookback) {
+            return;
+        }
+        String baseTable =  SchemaUtil.getTableName("SCHEMA1", generateUniqueName());
+        String globalViewName = generateUniqueName();
+        String fullGlobalViewName = SchemaUtil.getTableName("SCHEMA2", globalViewName);
+        String globalViewIdx =  generateUniqueName();
+        TableName dataTable = TableName.valueOf(baseTable);
+        TableName indexTable = TableName.valueOf("_IDX_" + baseTable);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + baseTable
+                    + " (TENANT_ID CHAR(15) NOT NULL, PK2 INTEGER NOT NULL, PK3 INTEGER NOT NULL, "
+                    + "COL1 VARCHAR, COL2 VARCHAR, COL3 CHAR(15) CONSTRAINT PK PRIMARY KEY"
+                    + "(TENANT_ID, PK2, PK3)) MULTI_TENANT=true");
+            conn.createStatement().execute("CREATE VIEW " + fullGlobalViewName
+                    + " AS SELECT * FROM " + baseTable);
+            conn.createStatement().execute("CREATE INDEX " + globalViewIdx + " ON "
+                    + fullGlobalViewName + " (COL1) INCLUDE (COL2)");
+
+            conn.createStatement().executeUpdate("UPSERT INTO  " + fullGlobalViewName
+                    + " (TENANT_ID, PK2, PK3, COL1, COL2) VALUES ('TenantId1',1, 2, 'a', 'b')");
+            conn.commit();
+
+            String query = "SELECT COL2 FROM " + fullGlobalViewName + " WHERE  COL1 = 'a'";
+            // Verify that query uses the global view index
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            PTable table = ((PhoenixResultSet)rs).getContext().getCurrentTable().getTable();
+            assertTrue(table.getSchemaName().getString().equals("SCHEMA2") &&
+                    table.getTableName().getString().equals(globalViewIdx));
+            assertTrue(rs.next());
+            assertEquals("b", rs.getString(1));
+            assertFalse(rs.next());
+            // Force a flush
+            flush(dataTable);
+            flush(indexTable);
+            assertRawRowCount(conn, dataTable, 1);
+            assertRawRowCount(conn, indexTable, 1);
+            // Delete the row from both tables
+            conn.createStatement().execute("DELETE FROM " + fullGlobalViewName
+                            + " WHERE TENANT_ID = 'TenantId1'");
+            conn.commit();
+            // Force a flush
+            flush(dataTable);
+            flush(indexTable);
+            assertRawRowCount(conn, dataTable, 1);
+            assertRawRowCount(conn, indexTable, 1);
+            // Move change beyond the max lookback window
+            injectEdge.setValue(System.currentTimeMillis() + MAX_LOOKBACK_AGE * 1000 + 1);
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            // Major compact both tables
+            majorCompact(dataTable);
+            majorCompact(indexTable);
+            // Everything should have been purged by major compaction
+            assertRawRowCount(conn, dataTable, 0);
+            assertRawRowCount(conn, indexTable, 0);
+        }
+    }
+    @Test(timeout=60000L)
     public void testTTLAndMaxLookbackAge() throws Exception {
         if(hasTableLevelMaxLookback) {
             return;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index 852cf8f..3b6d127 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -131,8 +132,8 @@
                     { true, false, KeepDeletedCells.FALSE, 5, 50, null},
                     { true, false, KeepDeletedCells.TRUE, 1, 25, null},
                     { true, false, KeepDeletedCells.TTL, 5, 100, null},
-                    { false, false, KeepDeletedCells.FALSE, 1, 100, 15},
-                    { false, false, KeepDeletedCells.TRUE, 5, 50, 15},
+                    { false, false, KeepDeletedCells.FALSE, 1, 100, 0},
+                    { false, false, KeepDeletedCells.TRUE, 5, 50, 0},
                     { false, false, KeepDeletedCells.TTL, 1, 25, 15}});
     }
 
@@ -155,7 +156,7 @@
     @Test
     public void testMaskingAndCompaction() throws Exception {
         final int maxLookbackAge = tableLevelMaxLooback != null ? tableLevelMaxLooback : MAX_LOOKBACK_AGE;
-        final int maxDeleteCounter = maxLookbackAge;
+        final int maxDeleteCounter = maxLookbackAge == 0 ? 1 : maxLookbackAge;
         final int maxCompactionCounter = ttl / 2;
         final int maxMaskingCounter = 2 * ttl;
         final byte[] rowKey = Bytes.toBytes("a");
@@ -232,13 +233,54 @@
     }
 
     @Test
-    public void testRowSpansMultipleTTLWindows() throws Exception {
-        if (tableLevelMaxLooback != null) {
+    public void testFlushesAndMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled()
+            throws Exception {
+        final int maxLookbackAge = tableLevelMaxLooback != null
+                ? tableLevelMaxLooback : MAX_LOOKBACK_AGE;
+        if (maxLookbackAge > 0) {
             return;
         }
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String tableName = generateUniqueName();
             createTable(tableName);
+            conn.createStatement().execute("Alter Table " + tableName + " set \"phoenix.max.lookback.age.seconds\" = 0");
+            conn.commit();
+            final int flushCount = 10;
+            byte[] row = Bytes.toBytes("a");
+            for (int i = 0; i < flushCount; i++) {
+                // Generate more row versions than the maximum cell versions for the table
+                int updateCount = RAND.nextInt(10) + versions;
+                for (int j = 0; j < updateCount; j++) {
+                    updateRow(conn, tableName, "a");
+                }
+                flush(TableName.valueOf(tableName));
+                // At every flush, extra cell versions should be removed.
+                // MAX_COLUMN_INDEX table columns and one empty column will be retained for
+                // each row version.
+                TestUtil.assertRawCellCount(conn, TableName.valueOf(tableName), row,
+                        (i + 1) * (MAX_COLUMN_INDEX + 1) * versions);
+            }
+            // Run one minor compaction (in case no minor compaction has happened yet)
+            Admin admin = utility.getAdmin();
+            admin.compact(TableName.valueOf(tableName));
+            int waitCount = 0;
+            while (TestUtil.getRawCellCount(conn, TableName.valueOf(tableName),
+                    Bytes.toBytes("a")) < flushCount * (MAX_COLUMN_INDEX + 1) * versions) {
+                // Wait for major compactions to happen
+                Thread.sleep(1000);
+                waitCount++;
+                if (waitCount > 30) {
+                    Assert.fail();
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testRowSpansMultipleTTLWindows() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String tableName = generateUniqueName();
+            createTable(tableName);
             String noCompactTableName = generateUniqueName();
             createTable(noCompactTableName);
             long startTime = System.currentTimeMillis() + 1000;
@@ -303,6 +345,16 @@
         conn.commit();
     }
 
+    private void updateRow(Connection conn, String tableName, String id)
+            throws SQLException {
+
+        for (int i = 1; i <= MAX_COLUMN_INDEX; i++) {
+            String value = Integer.toString(RAND.nextInt(1000));
+            updateColumn(conn, tableName, id, i, value);
+        }
+        conn.commit();
+    }
+
     private void compareRow(Connection conn, String tableName1, String tableName2, String id,
             int maxColumnIndex) throws SQLException, IOException {
         StringBuilder queryBuilder = new StringBuilder("SELECT ");
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 79b0168..c15bd40 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -1364,13 +1364,17 @@
         assertEquals(expectedRowCount, count);
     }
 
-    public static void assertRawCellCount(Connection conn, TableName tableName,
-                                          byte[] row, int expectedCellCount)
-        throws SQLException, IOException {
+    public static int getRawCellCount(Connection conn, TableName tableName, byte[] row)
+            throws SQLException, IOException {
         ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
         Table table = cqs.getTable(tableName.getName());
         CellCount cellCount = getCellCount(table, true);
-        int count = cellCount.getCellCount(Bytes.toString(row));
+        return cellCount.getCellCount(Bytes.toString(row));
+    }
+    public static void assertRawCellCount(Connection conn, TableName tableName,
+                                          byte[] row, int expectedCellCount)
+        throws SQLException, IOException {
+        int count = getRawCellCount(conn, tableName, row);
         assertEquals(expectedCellCount, count);
     }