blob: 010b4f9e1e8802116d8aef7e48ced5a876e3b73b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.EnvironmentEdge;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.AfterClass;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Properties;
import static org.apache.phoenix.util.TestUtil.assertRawCellCount;
import static org.apache.phoenix.util.TestUtil.assertRawRowCount;
import static org.apache.phoenix.util.TestUtil.assertRowExistsAtSCN;
import static org.apache.phoenix.util.TestUtil.assertRowHasExpectedValueAtSCN;
import static org.apache.phoenix.util.TestUtil.assertTableHasTtl;
import static org.apache.phoenix.util.TestUtil.assertTableHasVersions;
@NeedsOwnMiniClusterTest
public class MaxLookbackIT extends BaseUniqueNamesOwnClusterIT {
private static final Log LOG = LogFactory.getLog(MaxLookbackIT.class);
private static final int MAX_LOOKBACK_AGE = 15;
private static final int ROWS_POPULATED = 2;
public static final int WAIT_AFTER_TABLE_CREATION_MILLIS = 1;
private String tableDDLOptions;
private StringBuilder optionBuilder;
ManualEnvironmentEdge injectEdge;
private int ttl;
@BeforeClass
public static synchronized void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
props.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@Before
public void beforeTest(){
EnvironmentEdgeManager.reset();
optionBuilder = new StringBuilder();
this.tableDDLOptions = optionBuilder.toString();
ttl = 0;
injectEdge = new ManualEnvironmentEdge();
injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
EnvironmentEdgeManager.injectEdge(injectEdge);
}
@AfterClass
public static synchronized void afterClass() {
EnvironmentEdgeManager.reset();
}
@Test
public void testTooLowSCNWithMaxLookbackAge() throws Exception {
String dataTableName = generateUniqueName();
createTable(dataTableName);
//increase long enough to make sure we can find the syscat row for the table
injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
populateTable(dataTableName);
long populateTime = EnvironmentEdgeManager.currentTimeMillis();
injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);
Properties props = new Properties();
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
Long.toString(populateTime));
try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
connscn.createStatement().executeQuery("select * from " + dataTableName);
} catch (SQLException se) {
SQLExceptionCode code =
SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_MAX_LOOKBACK_AGE;
TestUtil.assertSqlExceptionCode(code, se);
return;
}
Assert.fail("We should have thrown an exception for the too-early SCN");
}
@Test(timeout=120000L)
public void testRecentlyDeletedRowsNotCompactedAway() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
String dataTableName = generateUniqueName();
String indexName = generateUniqueName();
createTable(dataTableName);
injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
TableName dataTable = TableName.valueOf(dataTableName);
populateTable(dataTableName);
createIndex(dataTableName, indexName, 1);
TableName indexTable = TableName.valueOf(indexName);
//make sure we're after the inserts have been committed
injectEdge.incrementValue(1);
long beforeDeleteSCN = EnvironmentEdgeManager.currentTimeMillis();
injectEdge.incrementValue(10); //make sure we delete at a different ts
Statement stmt = conn.createStatement();
stmt.execute("DELETE FROM " + dataTableName + " WHERE " + " id = 'a'");
Assert.assertEquals(1, stmt.getUpdateCount());
conn.commit();
//select stmt to get row we deleted
String sql = String.format("SELECT * FROM %s WHERE id = 'a'", dataTableName);
String indexSql = String.format("SELECT * FROM %s WHERE val1 = 'ab'", dataTableName);
int rowsPlusDeleteMarker = ROWS_POPULATED;
assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);
assertExplainPlan(conn, indexSql, dataTableName, indexName);
assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true);
flush(dataTable);
flush(indexTable);
assertRowExistsAtSCN(getUrl(), sql, beforeDeleteSCN, true);
assertRowExistsAtSCN(getUrl(), indexSql, beforeDeleteSCN, true);
long beforeFirstCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
injectEdge.incrementValue(1); //new ts for major compaction
majorCompact(dataTable);
majorCompact(indexTable);
assertRawRowCount(conn, dataTable, rowsPlusDeleteMarker);
assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
//wait for the lookback time. After this compactions should purge the deleted row
injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
long beforeSecondCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
String notDeletedRowSql =
String.format("SELECT * FROM %s WHERE id = 'b'", dataTableName);
String notDeletedIndexRowSql =
String.format("SELECT * FROM %s WHERE val1 = 'bc'", dataTableName);
assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true);
assertRowExistsAtSCN(getUrl(), notDeletedIndexRowSql, beforeSecondCompactSCN, true);
assertRawRowCount(conn, dataTable, ROWS_POPULATED);
assertRawRowCount(conn, indexTable, ROWS_POPULATED);
conn.createStatement().execute("upsert into " + dataTableName +
" values ('c', 'cd', 'cde', 'cdef')");
conn.commit();
majorCompact(dataTable);
majorCompact(indexTable);
//should still be ROWS_POPULATED because we added one and deleted one
assertRawRowCount(conn, dataTable, ROWS_POPULATED);
assertRawRowCount(conn, indexTable, ROWS_POPULATED);
//deleted row should be gone, but not deleted row should still be there.
assertRowExistsAtSCN(getUrl(), sql, beforeSecondCompactSCN, false);
assertRowExistsAtSCN(getUrl(), indexSql, beforeSecondCompactSCN, false);
assertRowExistsAtSCN(getUrl(), notDeletedRowSql, beforeSecondCompactSCN, true);
assertRowExistsAtSCN(getUrl(), notDeletedIndexRowSql, beforeSecondCompactSCN, true);
}
}
@Test(timeout=60000L)
public void testTTLAndMaxLookbackAge() throws Exception {
ttl = 20;
optionBuilder.append("TTL=" + ttl);
tableDDLOptions = optionBuilder.toString();
Configuration conf = getUtility().getConfiguration();
//disable automatic memstore flushes
long oldMemstoreFlushInterval = conf.getLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL,
HRegion.DEFAULT_CACHE_FLUSH_INTERVAL);
conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0L);
try (Connection conn = DriverManager.getConnection(getUrl())) {
String dataTableName = generateUniqueName();
String indexName = generateUniqueName();
createTable(dataTableName);
//increment to make sure we don't "look back" past table creation
injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
populateTable(dataTableName);
createIndex(dataTableName, indexName, 1);
injectEdge.incrementValue(1);
long afterFirstInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
TableName dataTable = TableName.valueOf(dataTableName);
TableName indexTable = TableName.valueOf(indexName);
assertTableHasTtl(conn, dataTable, ttl);
assertTableHasTtl(conn, indexTable, ttl);
//first make sure we inserted correctly
String sql = String.format("SELECT val2 FROM %s WHERE id = 'a'", dataTableName);
String indexSql = String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName);
assertRowExistsAtSCN(getUrl(),sql, afterFirstInsertSCN, true);
assertExplainPlan(conn, indexSql, dataTableName, indexName);
assertRowExistsAtSCN(getUrl(),indexSql, afterFirstInsertSCN, true);
int originalRowCount = 2;
assertRawRowCount(conn, dataTable, originalRowCount);
assertRawRowCount(conn, indexTable, originalRowCount);
//force a flush
flush(dataTable);
flush(indexTable);
//flush shouldn't have changed it
assertRawRowCount(conn, dataTable, originalRowCount);
assertRawRowCount(conn, indexTable, originalRowCount);
assertExplainPlan(conn, indexSql, dataTableName, indexName);
long timeToAdvance = (MAX_LOOKBACK_AGE * 1000) -
(EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN);
if (timeToAdvance > 0) {
injectEdge.incrementValue(timeToAdvance);
}
//make sure it's still on disk
assertRawRowCount(conn, dataTable, originalRowCount);
assertRawRowCount(conn, indexTable, originalRowCount);
injectEdge.incrementValue(1); //get a new timestamp for compaction
majorCompact(dataTable);
majorCompact(indexTable);
//nothing should have been purged by this major compaction
assertRawRowCount(conn, dataTable, originalRowCount);
assertRawRowCount(conn, indexTable, originalRowCount);
//now wait the TTL
timeToAdvance = (ttl * 1000) -
(EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN);
if (timeToAdvance > 0) {
injectEdge.incrementValue(timeToAdvance);
}
//make sure that we can compact away the now-expired rows
majorCompact(dataTable);
majorCompact(indexTable);
//note that before HBase 1.4, we don't have HBASE-17956
// and this will always return 0 whether it's still on-disk or not
assertRawRowCount(conn, dataTable, 0);
assertRawRowCount(conn, indexTable, 0);
} finally{
conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, oldMemstoreFlushInterval);
}
}
@Test(timeout=60000)
public void testRecentMaxVersionsNotCompactedAway() throws Exception {
int versions = 2;
optionBuilder.append("VERSIONS=" + versions);
tableDDLOptions = optionBuilder.toString();
String firstValue = "abc";
String secondValue = "def";
String thirdValue = "ghi";
try (Connection conn = DriverManager.getConnection(getUrl())) {
String dataTableName = generateUniqueName();
String indexName = generateUniqueName();
createTable(dataTableName);
//increment to make sure we don't "look back" past table creation
injectEdge.incrementValue(WAIT_AFTER_TABLE_CREATION_MILLIS);
populateTable(dataTableName);
createIndex(dataTableName, indexName, versions);
injectEdge.incrementValue(1); //increment by 1 so we can see our write
long afterInsertSCN = EnvironmentEdgeManager.currentTimeMillis();
//make sure table and index metadata is set up right for versions
TableName dataTable = TableName.valueOf(dataTableName);
TableName indexTable = TableName.valueOf(indexName);
assertTableHasVersions(conn, dataTable, versions);
assertTableHasVersions(conn, indexTable, versions);
//check query optimizer is doing what we expect
String dataTableSelectSql =
String.format("SELECT val2 FROM %s WHERE id = 'a'", dataTableName);
String indexTableSelectSql =
String.format("SELECT val2 FROM %s WHERE val1 = 'ab'", dataTableName);
assertExplainPlan(conn, indexTableSelectSql, dataTableName, indexName);
//make sure the data was inserted correctly in the first place
assertRowHasExpectedValueAtSCN(getUrl(), dataTableSelectSql, afterInsertSCN, firstValue);
assertRowHasExpectedValueAtSCN(getUrl(), indexTableSelectSql, afterInsertSCN, firstValue);
//force first update to get a distinct ts
injectEdge.incrementValue(1);
updateColumn(conn, dataTableName, "id", "a", "val2", secondValue);
injectEdge.incrementValue(1); //now make update visible
long afterFirstUpdateSCN = EnvironmentEdgeManager.currentTimeMillis();
//force second update to get a distinct ts
injectEdge.incrementValue(1);
updateColumn(conn, dataTableName, "id", "a", "val2", thirdValue);
injectEdge.incrementValue(1);
long afterSecondUpdateSCN = EnvironmentEdgeManager.currentTimeMillis();
injectEdge.incrementValue(1);
//check to make sure we can see all three versions at the appropriate times
String[] allValues = {firstValue, secondValue, thirdValue};
long[] allSCNs = {afterInsertSCN, afterFirstUpdateSCN, afterSecondUpdateSCN};
assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
flush(dataTable);
flush(indexTable);
//after flush, check to make sure we can see all three versions at the appropriate times
assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
majorCompact(dataTable);
majorCompact(indexTable);
//after major compaction, check to make sure we can see all three versions
// at the appropriate times
assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);
injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
long afterLookbackAgeSCN = EnvironmentEdgeManager.currentTimeMillis();
majorCompact(dataTable);
majorCompact(indexTable);
//empty column, 1 version of val 1, 3 versions of val2, 1 version of val3 = 6
assertRawCellCount(conn, dataTable, Bytes.toBytes("a"), 6);
//2 versions of empty column, 2 versions of val2,
// 2 versions of val3 (since we write whole rows to index) = 6
assertRawCellCount(conn, indexTable, Bytes.toBytes("ab\u0000a"), 6);
//empty column + 1 version each of val1,2 and 3 = 4
assertRawCellCount(conn, dataTable, Bytes.toBytes("b"), 4);
//1 version of empty column, 1 version of val2, 1 version of val3 = 3
assertRawCellCount(conn, indexTable, Bytes.toBytes("bc\u0000b"), 3);
}
}
private void flush(TableName table) throws IOException {
Admin admin = getUtility().getHBaseAdmin();
admin.flush(table);
}
private void majorCompact(TableName table) throws Exception {
TestUtil.majorCompact(getUtility(), table);
}
private void assertMultiVersionLookbacks(String dataTableSelectSql,
String[] values, long[] scns)
throws Exception {
//make sure we can still look back after updating
for (int k = 0; k < values.length; k++){
assertRowHasExpectedValueAtSCN(getUrl(), dataTableSelectSql, scns[k], values[k]);
}
}
private void updateColumn(Connection conn, String dataTableName,
String idColumn, String id, String valueColumn, String value)
throws SQLException {
String upsertSql = String.format("UPSERT INTO %s (%s, %s) VALUES ('%s', '%s')",
dataTableName, idColumn, valueColumn, id, value);
conn.createStatement().execute(upsertSql);
conn.commit();
}
private void createTable(String tableName) throws SQLException {
try(Connection conn = DriverManager.getConnection(getUrl())) {
String createSql = "create table " + tableName +
" (id varchar(10) not null primary key, val1 varchar(10), " +
"val2 varchar(10), val3 varchar(10))" + tableDDLOptions;
conn.createStatement().execute(createSql);
conn.commit();
}
}
private void populateTable(String tableName) throws SQLException {
try(Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
conn.commit();
conn.createStatement().execute("upsert into " + tableName + " values ('b', 'bc', 'bcd', 'bcde')");
conn.commit();
}
}
private void createIndex(String dataTableName, String indexTableName, int indexVersions)
throws SQLException {
try(Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
dataTableName + " (val1) include (val2, val3)" +
" VERSIONS=" + indexVersions);
conn.commit();
}
}
public static void assertExplainPlan(Connection conn, String selectSql,
String dataTableFullName, String indexTableFullName) throws SQLException {
ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
String actualExplainPlan = QueryUtil.getExplainPlan(rs);
IndexToolIT.assertExplainPlan(false, actualExplainPlan, dataTableFullName, indexTableFullName);
}
}