| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.end2end; |
| |
| import org.apache.hadoop.hbase.KeepDeletedCells; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.Admin; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; |
| import org.apache.phoenix.query.BaseTest; |
| import org.apache.phoenix.query.QueryServices; |
| import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; |
| import org.apache.phoenix.util.EnvironmentEdgeManager; |
| import org.apache.phoenix.util.ManualEnvironmentEdge; |
| import org.apache.phoenix.util.PhoenixRuntime; |
| import org.apache.phoenix.util.ReadOnlyProps; |
| import org.apache.phoenix.util.TestUtil; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Random; |
| |
| @Category(NeedsOwnMiniClusterTest.class) |
| @RunWith(Parameterized.class) |
| public class TableTTLIT extends BaseTest { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TableTTLIT.class); |
| private static final Random RAND = new Random(11); |
| private static final int MAX_COLUMN_INDEX = 7; |
| private static final int MAX_LOOKBACK_AGE = 10; |
| private final int ttl; |
| private String tableDDLOptions; |
| private StringBuilder optionBuilder; |
| ManualEnvironmentEdge injectEdge; |
| private int versions; |
| private final boolean multiCF; |
| private final boolean columnEncoded; |
| private final KeepDeletedCells keepDeletedCells; |
| private final Integer tableLevelMaxLooback; |
| |
| public TableTTLIT(boolean multiCF, boolean columnEncoded, |
| KeepDeletedCells keepDeletedCells, int versions, int ttl, Integer tableLevelMaxLooback) { |
| this.multiCF = multiCF; |
| this.columnEncoded = columnEncoded; |
| this.keepDeletedCells = keepDeletedCells; |
| this.versions = versions; |
| this.ttl = ttl; |
| this.tableLevelMaxLooback = tableLevelMaxLooback; |
| } |
| @BeforeClass |
| public static synchronized void doSetup() throws Exception { |
| Map<String, String> props = Maps.newHashMapWithExpectedSize(1); |
| props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); |
| props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); |
| setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); |
| } |
| |
| @Before |
| public void beforeTest(){ |
| EnvironmentEdgeManager.reset(); |
| optionBuilder = new StringBuilder(); |
| optionBuilder.append(" TTL=" + ttl); |
| optionBuilder.append(", VERSIONS=" + versions); |
| if (keepDeletedCells == KeepDeletedCells.FALSE) { |
| optionBuilder.append(", KEEP_DELETED_CELLS=FALSE"); |
| } else if (keepDeletedCells == KeepDeletedCells.TRUE) { |
| optionBuilder.append(", KEEP_DELETED_CELLS=TRUE"); |
| } else { |
| optionBuilder.append(", KEEP_DELETED_CELLS=TTL"); |
| } |
| if (columnEncoded) { |
| optionBuilder.append(", COLUMN_ENCODED_BYTES=2"); |
| } else { |
| optionBuilder.append(", COLUMN_ENCODED_BYTES=0"); |
| } |
| if (tableLevelMaxLooback != null) { |
| optionBuilder.append(", MAX_LOOKBACK_AGE=").append(tableLevelMaxLooback * 1000); |
| } |
| this.tableDDLOptions = optionBuilder.toString(); |
| injectEdge = new ManualEnvironmentEdge(); |
| injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis()); |
| } |
| |
| @After |
| public synchronized void afterTest() throws Exception { |
| boolean refCountLeaked = isAnyStoreRefCountLeaked(); |
| |
| EnvironmentEdgeManager.reset(); |
| Assert.assertFalse("refCount leaked", refCountLeaked); |
| } |
| |
| @Parameterized.Parameters(name = "TableTTLIT_multiCF={0}, columnEncoded={1}, " |
| + "keepDeletedCells={2}, versions={3}, ttl={4}, tableLevelMaxLookback={5}") |
| public static synchronized Collection<Object[]> data() { |
| return Arrays.asList(new Object[][] { |
| { false, false, KeepDeletedCells.FALSE, 1, 100, null}, |
| { false, false, KeepDeletedCells.TRUE, 5, 50, null}, |
| { false, false, KeepDeletedCells.TTL, 1, 25, null}, |
| { true, false, KeepDeletedCells.FALSE, 5, 50, null}, |
| { true, false, KeepDeletedCells.TRUE, 1, 25, null}, |
| { true, false, KeepDeletedCells.TTL, 5, 100, null}, |
| { false, false, KeepDeletedCells.FALSE, 1, 100, 15}, |
| { false, false, KeepDeletedCells.TRUE, 5, 50, 15}, |
| { false, false, KeepDeletedCells.TTL, 1, 25, 15}}); |
| } |
| |
| /** |
| * This test creates two tables with the same schema. The same row is updated in a loop on |
| * both tables with the same content. Each update changes one or more columns chosen |
| * randomly with randomly generated values. |
| * |
| * After every upsert, all versions of the rows are retrieved from each table and compared. |
| * The test also occasionally deletes the row from both tables and but compacts only the |
| * first table during this test. |
| * |
| * Both tables are subject to masking during queries. |
| * |
| * This test expects that both tables return the same row content for the same row version. |
| * |
| * @throws Exception |
| */ |
| |
| @Test |
| public void testMaskingAndCompaction() throws Exception { |
| final int maxLookbackAge = tableLevelMaxLooback != null ? tableLevelMaxLooback : MAX_LOOKBACK_AGE; |
| final int maxDeleteCounter = maxLookbackAge; |
| final int maxCompactionCounter = ttl / 2; |
| final int maxMaskingCounter = 2 * ttl; |
| final byte[] rowKey = Bytes.toBytes("a"); |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| String tableName = generateUniqueName(); |
| createTable(tableName); |
| String noCompactTableName = generateUniqueName(); |
| createTable(noCompactTableName); |
| long startTime = System.currentTimeMillis() + 1000; |
| startTime = (startTime / 1000) * 1000; |
| EnvironmentEdgeManager.injectEdge(injectEdge); |
| injectEdge.setValue(startTime); |
| int deleteCounter = RAND.nextInt(maxDeleteCounter) + 1; |
| int compactionCounter = RAND.nextInt(maxCompactionCounter) + 1; |
| int maskingCounter = RAND.nextInt(maxMaskingCounter) + 1; |
| boolean afterCompaction = false; |
| for (int i = 0; i < 500; i++) { |
| if (compactionCounter-- == 0) { |
| injectEdge.incrementValue(1000); |
| LOG.debug("Compaction " + i + " current time: " + injectEdge.currentTime()); |
| flush(TableName.valueOf(tableName)); |
| majorCompact(TableName.valueOf(tableName)); |
| compactionCounter = RAND.nextInt(maxCompactionCounter) + 1; |
| afterCompaction = true; |
| } |
| if (maskingCounter-- == 0) { |
| updateRow(conn, tableName, noCompactTableName, "a"); |
| injectEdge.incrementValue((ttl + maxLookbackAge + 1) * 1000); |
| LOG.debug("Masking " + i + " current time: " + injectEdge.currentTime()); |
| ResultSet rs = conn.createStatement().executeQuery( |
| "SELECT count(*) FROM " + tableName); |
| Assert.assertTrue(rs.next()); |
| Assert.assertEquals(rs.getLong(1), 0); |
| rs = conn.createStatement().executeQuery( |
| "SELECT count(*) FROM " + noCompactTableName); |
| Assert.assertTrue(rs.next()); |
| Assert.assertEquals(rs.getLong(1), 0); |
| flush(TableName.valueOf(tableName)); |
| majorCompact(TableName.valueOf(tableName)); |
| TestUtil.assertRawCellCount(conn, TableName.valueOf(tableName), rowKey, 0); |
| maskingCounter = RAND.nextInt(maxMaskingCounter) + 1; |
| } |
| if (deleteCounter-- == 0) { |
| LOG.debug("Delete " + i + " current time: " + injectEdge.currentTime()); |
| deleteRow(conn, tableName, "a"); |
| deleteRow(conn, noCompactTableName, "a"); |
| deleteCounter = RAND.nextInt(maxDeleteCounter) + 1; |
| injectEdge.incrementValue(1000); |
| } |
| updateRow(conn, tableName, noCompactTableName, "a"); |
| if (!afterCompaction) { |
| injectEdge.incrementValue(1000); |
| // We are interested in the correctness of compaction and masking. Thus, we |
| // only need to do the latest version and scn queries to after compaction. |
| continue; |
| } |
| afterCompaction = false; |
| compareRow(conn, tableName, noCompactTableName, "a", MAX_COLUMN_INDEX); |
| long scn = injectEdge.currentTime() - maxLookbackAge * 1000; |
| long scnEnd = injectEdge.currentTime(); |
| long scnStart = Math.max(scn, startTime); |
| for (scn = scnEnd; scn >= scnStart; scn -= 1000) { |
| // Compare all row versions using scn queries |
| Properties props = new Properties(); |
| props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn)); |
| try (Connection scnConn = DriverManager.getConnection(url, props)) { |
| compareRow(scnConn, tableName, noCompactTableName, "a", |
| MAX_COLUMN_INDEX); |
| } |
| } |
| injectEdge.incrementValue(1000); |
| } |
| } |
| } |
| |
| @Test |
| public void testRowSpansMultipleTTLWindows() throws Exception { |
| if (tableLevelMaxLooback != null) { |
| return; |
| } |
| try (Connection conn = DriverManager.getConnection(getUrl())) { |
| String tableName = generateUniqueName(); |
| createTable(tableName); |
| String noCompactTableName = generateUniqueName(); |
| createTable(noCompactTableName); |
| long startTime = System.currentTimeMillis() + 1000; |
| startTime = (startTime / 1000) * 1000; |
| EnvironmentEdgeManager.injectEdge(injectEdge); |
| injectEdge.setValue(startTime); |
| for (int columnIndex = 1; columnIndex <= MAX_COLUMN_INDEX; columnIndex++) { |
| String value = Integer.toString(RAND.nextInt(1000)); |
| updateColumn(conn, tableName, "a", columnIndex, value); |
| updateColumn(conn, noCompactTableName, "a", columnIndex, value); |
| conn.commit(); |
| injectEdge.incrementValue(ttl * 1000 - 1000); |
| } |
| flush(TableName.valueOf(tableName)); |
| majorCompact(TableName.valueOf(tableName)); |
| compareRow(conn, tableName, noCompactTableName, "a", MAX_COLUMN_INDEX); |
| injectEdge.incrementValue(1000); |
| } |
| } |
| private void flush(TableName table) throws IOException { |
| Admin admin = getUtility().getAdmin(); |
| admin.flush(table); |
| } |
| |
| private void majorCompact(TableName table) throws Exception { |
| TestUtil.majorCompact(getUtility(), table); |
| } |
| |
| private void deleteRow(Connection conn, String tableName, String id) throws SQLException{ |
| String dml = "DELETE from " + tableName + " WHERE id = '" + id + "'"; |
| conn.createStatement().executeUpdate(dml); |
| conn.commit(); |
| } |
| private void updateColumn(Connection conn, String dataTableName, String id, |
| int columnIndex, String value) |
| throws SQLException { |
| String upsertSql; |
| if (value == null) { |
| upsertSql = String.format("UPSERT INTO %s (id, %s) VALUES ('%s', null)", |
| dataTableName, "val" + columnIndex, id); |
| } else { |
| upsertSql = String.format("UPSERT INTO %s (id, %s) VALUES ('%s', '%s')", |
| dataTableName, "val" + columnIndex, id, value); |
| } |
| conn.createStatement().execute(upsertSql); |
| } |
| |
| private void updateRow(Connection conn, String tableName1, String tableName2, String id) |
| throws SQLException { |
| |
| int columnCount = RAND.nextInt(MAX_COLUMN_INDEX) + 1; |
| for (int i = 0; i < columnCount; i++) { |
| int columnIndex = RAND.nextInt(MAX_COLUMN_INDEX) + 1; |
| String value = null; |
| // Leave the value null once in a while |
| if (RAND.nextInt(MAX_COLUMN_INDEX) > 0) { |
| value = Integer.toString(RAND.nextInt(1000)); |
| } |
| updateColumn(conn, tableName1, id, columnIndex, value); |
| updateColumn(conn, tableName2, id, columnIndex, value); |
| } |
| conn.commit(); |
| } |
| |
| private void compareRow(Connection conn, String tableName1, String tableName2, String id, |
| int maxColumnIndex) throws SQLException, IOException { |
| StringBuilder queryBuilder = new StringBuilder("SELECT "); |
| for (int i = 1; i < maxColumnIndex; i++) { |
| queryBuilder.append("val" + i + ", "); |
| } |
| queryBuilder.append("val" + maxColumnIndex + " FROM %s "); |
| queryBuilder.append("where id='" + id + "'"); |
| ResultSet rs1 = conn.createStatement().executeQuery( |
| String.format(queryBuilder.toString(), tableName1)); |
| ResultSet rs2 = conn.createStatement().executeQuery( |
| String.format(queryBuilder.toString(), tableName2)); |
| |
| boolean hasRow1 = rs1.next(); |
| boolean hasRow2 = rs2.next(); |
| Assert.assertEquals(hasRow1, hasRow2); |
| if (hasRow1) { |
| int i; |
| for (i = 1; i <= maxColumnIndex; i++) { |
| if (rs1.getString(i) != null) { |
| if (!rs1.getString(i).equals(rs2.getString(i))) { |
| LOG.debug("VAL" + i + " " + rs2.getString(i) + " : " + rs1.getString(i)); |
| } |
| } else if (rs2.getString(i) != null) { |
| LOG.debug("VAL" + i + " " + rs2.getString(i) + " : " + rs1.getString(i)); |
| } |
| Assert.assertEquals("VAL" + i, rs2.getString(i), rs1.getString(i)); |
| } |
| } |
| } |
| |
| private void createTable(String tableName) throws SQLException { |
| try(Connection conn = DriverManager.getConnection(getUrl())) { |
| String createSql; |
| if (multiCF) { |
| createSql = "create table " + tableName + |
| " (id varchar not null primary key, val1 varchar, " + |
| "a.val2 varchar, a.val3 varchar, a.val4 varchar, " + |
| "b.val5 varchar, a.val6 varchar, b.val7 varchar) " + |
| tableDDLOptions; |
| } |
| else { |
| createSql = "create table " + tableName + |
| " (id varchar not null primary key, val1 varchar, " + |
| "val2 varchar, val3 varchar, val4 varchar, " + |
| "val5 varchar, val6 varchar, val7 varchar) " + |
| tableDDLOptions; |
| } |
| conn.createStatement().execute(createSql); |
| conn.commit(); |
| } |
| } |
| } |