blob: d1d101f287ef82f08ba99cbe46acc41d784b7ca7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.phoenix.mapreduce.PhoenixTTLTool;
import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@Category(NeedsOwnMiniClusterTest.class)
public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
private final long PHOENIX_TTL_EXPIRE_IN_A_SECOND = 1;
private final long MILLISECOND = 1000;
private final long PHOENIX_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
private final String VIEW_PREFIX1 = "V01";
private final String VIEW_PREFIX2 = "V02";
private final String UPSERT_TO_GLOBAL_VIEW_QUERY =
"UPSERT INTO %s (PK1,A,B,C,D) VALUES(1,1,1,1,1)";
private final String UPSERT_TO_LEAF_VIEW_QUERY =
"UPSERT INTO %s (PK1,A,B,C,D,E,F) VALUES(1,1,1,1,1,1,1)";
private final String VIEW_DDL_WITH_ID_PREFIX_AND_TTL = "CREATE VIEW %s (" +
"PK1 BIGINT PRIMARY KEY,A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
" AS SELECT * FROM %s WHERE ID = '%s' PHOENIX_TTL = %d";
private final String VIEW_INDEX_DDL = "CREATE INDEX %s ON %s(%s)";
private final String TENANT_VIEW_DDL =
"CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s";
private void verifyNumberOfRowsFromHBaseLevel(String tableName, String regrex, int expectedRows)
throws Exception {
try (Table table = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES)
.getTable(SchemaUtil.getTableNameAsBytes(
SchemaUtil.getSchemaNameFromFullName(tableName),
SchemaUtil.getTableNameFromFullName(tableName)))) {
Filter filter =
new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regrex));
Scan scan = new Scan();
scan.setFilter(filter);
assertEquals(expectedRows, getRowCount(table,scan));
}
}
private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
Connection conn) throws Exception {
String query = "SELECT COUNT(*) FROM " + tableName;
if (tenantId != null) {
query = query + " WHERE TENANT_ID = '" + tenantId + "'";
}
try (Statement stm = conn.createStatement()) {
ResultSet rs = stm.executeQuery(query);
assertTrue(rs.next());
assertEquals(expectedRows, rs.getInt(1));
}
}
private long getRowCount(Table table, Scan scan) throws Exception {
ResultScanner scanner = table.getScanner(scan);
int numMatchingRows = 0;
for (Result result = scanner.next(); result != null; result = scanner.next()) {
numMatchingRows++;
}
scanner.close();
return numMatchingRows;
}
private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
String ddl = "CREATE TABLE " + tableName +
" (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
"PK PRIMARY KEY (TENANT_ID,ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
try (Statement stmt = conn.createStatement()) {
stmt.execute(ddl);
}
}
/*
BaseMultiTenantTable
GlobalView1 with TTL(1 ms)
Index1 Index2
Creating 2 tenantViews and Upserting data.
After running the MR job, it should delete all data.
*/
@Test
public void testTenantViewOnGlobalViewWithMoreThanOneIndex() throws Exception {
String schema = generateUniqueName();
String baseTableFullName = schema + "." + generateUniqueName();
String indexTable1 = generateUniqueName() + "_IDX";
String indexTable2 = generateUniqueName() + "_IDX";
String globalViewName = schema + "." + generateUniqueName();
String tenant1 = generateUniqueName();
String tenant2 = generateUniqueName();
String tenantView1 = schema + "." + generateUniqueName();
String tenantView2 = schema + "." + generateUniqueName();
String indexTable = "_IDX_" + baseTableFullName;
try (Connection globalConn = DriverManager.getConnection(getUrl());
Connection tenant1Connection =
PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
Connection tenant2Connection =
PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
createMultiTenantTable(globalConn, baseTableFullName);
globalConn.createStatement().execute(String.format(VIEW_DDL_WITH_ID_PREFIX_AND_TTL,
globalViewName, baseTableFullName, VIEW_PREFIX1,
PHOENIX_TTL_EXPIRE_IN_A_SECOND));
globalConn.createStatement().execute(
String.format(VIEW_INDEX_DDL, indexTable1, globalViewName, "A,B"));
globalConn.createStatement().execute(
String.format(VIEW_INDEX_DDL, indexTable2, globalViewName, "C,D"));
tenant1Connection.setAutoCommit(true);
tenant2Connection.setAutoCommit(true);
tenant1Connection.createStatement().execute(
String.format(TENANT_VIEW_DDL,tenantView1, globalViewName));
tenant2Connection.createStatement().execute(
String.format(TENANT_VIEW_DDL,tenantView2, globalViewName));
tenant1Connection.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantView1));
verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn);
tenant2Connection.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantView2));
// wait the row to be expired and index to be updated
Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
verifyNumberOfRows(baseTableFullName, tenant2, 1, globalConn);
// the view has 2 view indexes, so upsert 1 row(base table) will result
// 2 rows(index table)
verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 2);
verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 2);
// running MR job to delete expired rows.
PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
Configuration conf = new Configuration(getUtility().getConfiguration());
phoenixTtlTool.setConf(conf);
int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
assertEquals(0, status);
verifyNumberOfRows(baseTableFullName, tenant1, 0, globalConn);
verifyNumberOfRows(baseTableFullName, tenant2, 0, globalConn);
verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 0);
verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 0);
}
}
/*
BaseMultiTenantTable
GlobalView1 with TTL(1 ms) GlobalView2 with TTL(1 DAY)
Index1 Index2 Index3 Index4
Upserting data to both global views and run the MR job.
It should only delete GlobalView1 data not remove GlobalView2 data.
*/
@Test
public void testGlobalViewWithMoreThanOneIndex() throws Exception {
String schema = generateUniqueName();
String baseTableFullName = schema + "." + generateUniqueName();
String globalViewName1 = schema + "." + generateUniqueName();
String globalViewName2 = schema + "." + generateUniqueName();
String indexTable1 = generateUniqueName() + "_IDX";
String indexTable2 = generateUniqueName() + "_IDX";
String indexTable3 = generateUniqueName() + "_IDX";
String indexTable4 = generateUniqueName() + "_IDX";
String indexTable = "_IDX_" + baseTableFullName;
String tenant1 = generateUniqueName();
String tenant2 = generateUniqueName();
try (Connection globalConn = DriverManager.getConnection(getUrl());
Connection tenant1Connection =
PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
Connection tenant2Connection =
PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
createMultiTenantTable(globalConn, baseTableFullName);
globalConn.createStatement().execute(String.format(VIEW_DDL_WITH_ID_PREFIX_AND_TTL,
globalViewName1, baseTableFullName, VIEW_PREFIX1,
PHOENIX_TTL_EXPIRE_IN_A_SECOND));
globalConn.createStatement().execute(String.format(VIEW_DDL_WITH_ID_PREFIX_AND_TTL,
globalViewName2, baseTableFullName, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_DAY));
globalConn.createStatement().execute(
String.format(VIEW_INDEX_DDL, indexTable1, globalViewName1, "A,B"));
globalConn.createStatement().execute(
String.format(VIEW_INDEX_DDL, indexTable2, globalViewName1, "C,D"));
globalConn.createStatement().execute(
String.format(VIEW_INDEX_DDL, indexTable3, globalViewName2, "A,B"));
globalConn.createStatement().execute(
String.format(VIEW_INDEX_DDL, indexTable4, globalViewName2, "C,D"));
tenant1Connection.setAutoCommit(true);
tenant2Connection.setAutoCommit(true);
tenant1Connection.createStatement().execute(
String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1));
tenant1Connection.createStatement().execute(
String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2));
tenant2Connection.createStatement().execute(
String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1));
tenant2Connection.createStatement().execute(
String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2));
// wait the row to be expired and index to be updated
Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
verifyNumberOfRows(baseTableFullName, tenant1, 2, globalConn);
verifyNumberOfRows(baseTableFullName, tenant2, 2, globalConn);
verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 4);
verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 4);
// running MR job to delete expired rows.
PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
Configuration conf = new Configuration(getUtility().getConfiguration());
phoenixTtlTool.setConf(conf);
int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
assertEquals(0, status);
verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn);
verifyNumberOfRows(baseTableFullName, tenant2, 1, globalConn);
verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 2);
verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 2);
}
}
/*
BaseMultiTenantTable
GlobalView1 with TTL(1 ms) GlobalView2 with TTL(1 DAY)
Index1 Index2 Index3 Index4
TenantView1 TenantView2
Upserting data to both global views, and run the MR job.
It should only delete GlobalView1 data not remove GlobalView2 data.
*/
@Test
public void testTenantViewCase() throws Exception {
String schema = generateUniqueName();
String baseTableFullName = schema + "." + generateUniqueName();
String globalViewName1 = schema + "." + generateUniqueName();
String globalViewName2 = schema + "." + generateUniqueName();
String tenantViewName1 = schema + "." + generateUniqueName();
String tenantViewName2 = schema + "." + generateUniqueName();
String indexTable1 = generateUniqueName() + "_IDX";
String indexTable2 = generateUniqueName() + "_IDX";
String indexTable3 = generateUniqueName() + "_IDX";
String indexTable4 = generateUniqueName() + "_IDX";
String indexTable = "_IDX_" + baseTableFullName;
String tenant1 = generateUniqueName();
String tenant2 = generateUniqueName();
try (Connection globalConn = DriverManager.getConnection(getUrl());
Connection tenant1Connection =
PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
Connection tenant2Connection =
PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
createMultiTenantTable(globalConn, baseTableFullName);
String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
"A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
" AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' PHOENIX_TTL = %d";
globalConn.createStatement().execute(
String.format(ddl, globalViewName1, VIEW_PREFIX1,
PHOENIX_TTL_EXPIRE_IN_A_SECOND));
globalConn.createStatement().execute(
String.format(ddl, globalViewName2, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_DAY));
ddl = "CREATE INDEX %s ON %s(%s)";
globalConn.createStatement().execute(
String.format(ddl, indexTable1, globalViewName1, "A,B"));
globalConn.createStatement().execute(
String.format(ddl, indexTable2, globalViewName1, "C,D"));
globalConn.createStatement().execute(
String.format(ddl, indexTable3, globalViewName2, "A,B"));
globalConn.createStatement().execute(
String.format(ddl, indexTable4, globalViewName2, "C,D"));
ddl = "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s";
tenant1Connection.createStatement().execute(
String.format(ddl, tenantViewName1, globalViewName1));
tenant1Connection.createStatement().execute(
String.format(ddl, tenantViewName2, globalViewName2));
tenant2Connection.createStatement().execute(
String.format(ddl, tenantViewName1, globalViewName1));
tenant2Connection.createStatement().execute(
String.format(ddl, tenantViewName2, globalViewName2));
tenant1Connection.setAutoCommit(true);
tenant2Connection.setAutoCommit(true);
tenant1Connection.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName1));
tenant1Connection.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName2));
tenant2Connection.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName1));
tenant2Connection.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName2));
// wait the row to be expired and index to be updated
Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
verifyNumberOfRows(baseTableFullName, tenant1, 2, globalConn);
verifyNumberOfRows(baseTableFullName, tenant2, 2, globalConn);
verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 4);
verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 4);
// running MR job to delete expired rows.
PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
Configuration conf = new Configuration(getUtility().getConfiguration());
phoenixTtlTool.setConf(conf);
int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
assertEquals(0, status);
verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn);
verifyNumberOfRows(baseTableFullName, tenant2, 1, globalConn);
verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 2);
verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 2);
}
}
/*
BaseMultiTenantTable
GlobalView1 with TTL(1 ms) GlobalView2 with TTL(1 DAY)
Upserting data to both global views, and run the MR job.
It should only delete GlobalView1 data not remove GlobalView2 data.
*/
@Test
public void testGlobalViewWithNoIndex() throws Exception {
String schema = generateUniqueName();
String baseTableFullName = schema + "." + generateUniqueName();
String globalViewName1 = schema + "." + generateUniqueName();
String globalViewName2 = schema + "." + generateUniqueName();
String tenant1 = generateUniqueName();
String tenant2 = generateUniqueName();
try (Connection globalConn = DriverManager.getConnection(getUrl());
Connection tenant1Connection =
PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
Connection tenant2Connection =
PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
createMultiTenantTable(globalConn, baseTableFullName);
String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
"A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
" AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' PHOENIX_TTL = %d";
globalConn.createStatement().execute(
String.format(ddl, globalViewName1, VIEW_PREFIX1,
PHOENIX_TTL_EXPIRE_IN_A_SECOND));
globalConn.createStatement().execute(
String.format(ddl, globalViewName2, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_DAY));
tenant1Connection.setAutoCommit(true);
tenant2Connection.setAutoCommit(true);
tenant1Connection.createStatement().execute(
String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1));
tenant1Connection.createStatement().execute(
String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2));
tenant2Connection.createStatement().execute(
String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1));
tenant2Connection.createStatement().execute(
String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2));
// wait the row to be expired and index to be updated
Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
verifyNumberOfRows(baseTableFullName, tenant1, 2, globalConn);
verifyNumberOfRows(baseTableFullName, tenant2, 2, globalConn);
// running MR job to delete expired rows.
PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
Configuration conf = new Configuration(getUtility().getConfiguration());
phoenixTtlTool.setConf(conf);
int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
assertEquals(0, status);
verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn);
verifyNumberOfRows(baseTableFullName, tenant2, 1, globalConn);
}
}
/*
BaseTable
GlobalView1 with TTL(1 ms) GlobalView2 with TTL(1 DAY)
Upserting data to both global views, and run the MR job.
It should only delete GlobalView1 data not remove GlobalView2 data.
*/
@Test
public void testGlobalViewOnNonMultiTenantTable() throws Exception {
String schema = generateUniqueName();
String baseTableFullName = schema + "." + generateUniqueName();
String globalViewName1 = schema + "." + generateUniqueName();
String globalViewName2 = schema + "." + generateUniqueName();
try (Connection globalConn = DriverManager.getConnection(getUrl())) {
String ddl = "CREATE TABLE " + baseTableFullName +
" (ID CHAR(10) NOT NULL PRIMARY KEY, NUM BIGINT)";
globalConn.createStatement().execute(ddl);
ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
"A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
" AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' PHOENIX_TTL = %d";
globalConn.createStatement().execute(
String.format(ddl, globalViewName1, VIEW_PREFIX1,
PHOENIX_TTL_EXPIRE_IN_A_SECOND));
globalConn.createStatement().execute(
String.format(ddl, globalViewName2, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_DAY));
globalConn.setAutoCommit(true);
globalConn.createStatement().execute(
String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1));
globalConn.createStatement().execute(
String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2));
// wait the row to be expired and index to be updated
Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 1);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1);
// running MR job to delete expired rows.
PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
Configuration conf = new Configuration(getUtility().getConfiguration());
phoenixTtlTool.setConf(conf);
int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
assertEquals(0, status);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 0);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1);
}
}
/*
BaseTable
GlobalView1 with TTL(1 ms) GlobalView2 with TTL(1 DAY)
Index1 Index2 Index3 Index4
Upserting data to both global views, and run the MR job.
It should only delete GlobalView1 data not remove GlobalView2 data.
*/
@Test
public void testGlobalViewOnNonMultiTenantTableWithIndex() throws Exception {
String schema = generateUniqueName();
String baseTableFullName = schema + "." + generateUniqueName();
String globalViewName1 = schema + "." + generateUniqueName();
String globalViewName2 = schema + "." + generateUniqueName();
String indexTable1 = generateUniqueName() + "_IDX";
String indexTable2 = generateUniqueName() + "_IDX";
String indexTable3 = generateUniqueName() + "_IDX";
String indexTable4 = generateUniqueName() + "_IDX";
String indexTable = "_IDX_" + baseTableFullName;
try (Connection globalConn = DriverManager.getConnection(getUrl())) {
String ddl = "CREATE TABLE " + baseTableFullName +
" (PK1 BIGINT NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
"PK PRIMARY KEY (PK1,ID))";
globalConn.createStatement().execute(ddl);
ddl = "CREATE VIEW %s (PK2 BIGINT PRIMARY KEY, " +
"A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
" AS SELECT * FROM " + baseTableFullName + " WHERE PK1=%d PHOENIX_TTL = %d";
globalConn.createStatement().execute(
String.format(ddl, globalViewName1, 1, PHOENIX_TTL_EXPIRE_IN_A_SECOND));
globalConn.createStatement().execute(
String.format(ddl, globalViewName2, 2, PHOENIX_TTL_EXPIRE_IN_A_DAY));
ddl = "CREATE INDEX %s ON %s(%s)";
globalConn.createStatement().execute(
String.format(ddl, indexTable1, globalViewName1, "A,ID,B"));
globalConn.createStatement().execute(
String.format(ddl, indexTable2, globalViewName1, "C,ID,D"));
globalConn.createStatement().execute(
String.format(ddl, indexTable3, globalViewName2, "A,ID,B"));
globalConn.createStatement().execute(
String.format(ddl, indexTable4, globalViewName2, "C,ID,D"));
globalConn.setAutoCommit(true);
String query = "UPSERT INTO %s (PK2,A,B,C,D,ID) VALUES(1,1,1,1,1,'%s')";
globalConn.createStatement().execute(
String.format(query, globalViewName1, VIEW_PREFIX1));
globalConn.createStatement().execute(
String.format(query, globalViewName2, VIEW_PREFIX2));
// wait the row to be expired and index to be updated
Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 1);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1);
verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + VIEW_PREFIX1 + ".*", 2);
verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + VIEW_PREFIX2 + ".*", 2);
// running MR job to delete expired rows.
PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
Configuration conf = new Configuration(getUtility().getConfiguration());
phoenixTtlTool.setConf(conf);
int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
assertEquals(0, status);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 0);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1);
verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + VIEW_PREFIX1 + ".*", 0);
verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + VIEW_PREFIX2 + ".*", 2);
}
}
/*
BaseMultiTenantTable
GlobalView1
TenantView1 TenantView2
*/
@Test
public void testDeleteByViewAndTenant() throws Exception {
String schema = generateUniqueName();
String baseTableFullName = schema + "." + generateUniqueName();
String globalViewName1 = schema + "." + generateUniqueName();
String tenantViewName1 = schema + "." + generateUniqueName();
String tenantViewName2 = schema + "." + generateUniqueName();
String tenant1 = generateUniqueName();
try (Connection globalConn = DriverManager.getConnection(getUrl());
Connection tenant1Connection =
PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1)) {
createMultiTenantTable(globalConn, baseTableFullName);
String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
"A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
" AS SELECT * FROM " + baseTableFullName + " WHERE NUM = 1";
globalConn.createStatement().execute(String.format(ddl, globalViewName1));
ddl = "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s " +
"WHERE ID = '%s' PHOENIX_TTL = %d";
tenant1Connection.createStatement().execute(
String.format(ddl, tenantViewName1, globalViewName1, VIEW_PREFIX1,
PHOENIX_TTL_EXPIRE_IN_A_SECOND));
tenant1Connection.createStatement().execute(
String.format(ddl, tenantViewName2, globalViewName1, VIEW_PREFIX2,
PHOENIX_TTL_EXPIRE_IN_A_SECOND));
tenant1Connection.setAutoCommit(true);
tenant1Connection.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName1));
tenant1Connection.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName2));
// wait the row to be expired and index to be updated
Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
verifyNumberOfRows(baseTableFullName, tenant1, 2, globalConn);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 1);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1);
// running MR job to delete expired rows.
PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
Configuration conf = new Configuration(getUtility().getConfiguration());
phoenixTtlTool.setConf(conf);
int status = phoenixTtlTool.run(new String[]{"-runfg", "-v", tenantViewName2, "-i", tenant1});
assertEquals(0, status);
verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 1);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 0);
}
}
/*
BaseMultiTenantTable
GlobalView1 GlobalView1
TenantView1 TenantView2 TenantView1 TenantView2
*/
@Test
public void testDeleteByTenant() throws Exception {
String schema = generateUniqueName();
String baseTableFullName = schema + "." + generateUniqueName();
String globalViewName1 = schema + "." + generateUniqueName();
String globalViewName2 = schema + "." + generateUniqueName();
String tenantViewName1 = schema + "." + generateUniqueName();
String tenantViewName2 = schema + "." + generateUniqueName();
String tenantViewName3 = schema + "." + generateUniqueName();
String tenantViewName4 = schema + "." + generateUniqueName();
String tenant1 = generateUniqueName();
String tenant2 = generateUniqueName();
try (Connection globalConn = DriverManager.getConnection(getUrl());
Connection tenant1Connection =
PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
Connection tenant2Connection =
PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
createMultiTenantTable(globalConn, baseTableFullName);
String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
"A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
" AS SELECT * FROM " + baseTableFullName + " WHERE NUM = %d";
globalConn.createStatement().execute(String.format(ddl, globalViewName1, 1));
globalConn.createStatement().execute(String.format(ddl, globalViewName2, 2));
ddl = "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s " +
"WHERE ID = '%s' PHOENIX_TTL = %d";
tenant1Connection.createStatement().execute(
String.format(ddl, tenantViewName1, globalViewName1, VIEW_PREFIX1,
PHOENIX_TTL_EXPIRE_IN_A_SECOND));
tenant1Connection.createStatement().execute(
String.format(ddl, tenantViewName2, globalViewName2, VIEW_PREFIX2,
PHOENIX_TTL_EXPIRE_IN_A_SECOND));
tenant2Connection.createStatement().execute(
String.format(ddl, tenantViewName3, globalViewName1, VIEW_PREFIX1,
PHOENIX_TTL_EXPIRE_IN_A_SECOND));
tenant2Connection.createStatement().execute(
String.format(ddl, tenantViewName4, globalViewName2, VIEW_PREFIX2,
PHOENIX_TTL_EXPIRE_IN_A_SECOND));
tenant1Connection.setAutoCommit(true);
tenant2Connection.setAutoCommit(true);
tenant1Connection.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName1));
tenant1Connection.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName2));
tenant2Connection.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName3));
tenant2Connection.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName4));
// wait the row to be expired and index to be updated
Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + tenant1 + ".*", 2);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + tenant2 + ".*", 2);
// running MR job to delete expired rows.
PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
Configuration conf = new Configuration(getUtility().getConfiguration());
phoenixTtlTool.setConf(conf);
int status = phoenixTtlTool.run(new String[]{"-runfg", "-i", tenant1});
assertEquals(0, status);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + tenant2 + ".*", 2);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + tenant1 + ".*", 0);
}
}
/*
BaseMultiTenantTable
GlobalView1 GlobalView1
TenantView1 TenantView2 TenantView1 TenantView2
*/
@Test
public void testDeleteByViewName() throws Exception {
String schema = generateUniqueName();
String baseTableFullName = schema + "." + generateUniqueName();
String globalViewName1 = schema + "." + generateUniqueName();
String globalViewName2 = schema + "." + generateUniqueName();
String tenantViewName1 = schema + "." + generateUniqueName();
String tenantViewName2 = schema + "." + generateUniqueName();
String tenantViewName3 = schema + "." + generateUniqueName();
String tenantViewName4 = schema + "." + generateUniqueName();
String tenant1 = generateUniqueName();
String tenant2 = generateUniqueName();
try (Connection globalConn = DriverManager.getConnection(getUrl());
Connection tenant1Connection =
PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1);
Connection tenant2Connection =
PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) {
createMultiTenantTable(globalConn, baseTableFullName);
String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
"A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
" AS SELECT * FROM " + baseTableFullName + " WHERE NUM = %d PHOENIX_TTL = %d";
globalConn.createStatement().execute(
String.format(ddl, globalViewName1, 1, PHOENIX_TTL_EXPIRE_IN_A_SECOND));
globalConn.createStatement().execute(
String.format(ddl, globalViewName2, 2, PHOENIX_TTL_EXPIRE_IN_A_SECOND));
ddl = "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s WHERE ID = '%s'";
tenant1Connection.createStatement().execute(
String.format(ddl, tenantViewName1, globalViewName1, VIEW_PREFIX1));
tenant1Connection.createStatement().execute(
String.format(ddl, tenantViewName2, globalViewName2, VIEW_PREFIX2));
tenant2Connection.createStatement().execute(
String.format(ddl, tenantViewName3, globalViewName1, VIEW_PREFIX1));
tenant2Connection.createStatement().execute(
String.format(ddl, tenantViewName4, globalViewName2, VIEW_PREFIX2));
tenant1Connection.setAutoCommit(true);
tenant2Connection.setAutoCommit(true);
tenant1Connection.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName1));
tenant1Connection.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName2));
tenant2Connection.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName3));
tenant2Connection.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName4));
// wait the row to be expired and index to be updated
Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 2);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 2);
// running MR job to delete expired rows.
PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
Configuration conf = new Configuration(getUtility().getConfiguration());
phoenixTtlTool.setConf(conf);
int status = phoenixTtlTool.run(new String[]{"-runfg", "-v", globalViewName1});
assertEquals(0, status);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 2);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 0);
}
}
/*
BaseTable
GlobalView1 with TTL
MiddleLevelView1 with TTL(1 ms) MiddleLevelView2 with TTL(1 DAY)
LeafView1 LeafView2
Upserting data to both leafView, and run the MR job.
It should only delete MiddleLevelView1 data not remove MiddleLevelView2 data.
*/
@Test
public void testCleanMoreThanThreeLevelViewCase() throws Exception {
String schema = generateUniqueName();
String baseTableFullName = schema + "." + generateUniqueName();
String globalViewName = schema + "." + generateUniqueName();
String middleLevelViewName1 = schema + "." + generateUniqueName();
String middleLevelViewName2 = schema + "." + generateUniqueName();
String leafViewName1 = schema + "." + generateUniqueName();
String leafViewName2 = schema + "." + generateUniqueName();
try (Connection globalConn = DriverManager.getConnection(getUrl())) {
String baseTableDdl = "CREATE TABLE " + baseTableFullName +
" (ID CHAR(10) NOT NULL PRIMARY KEY, NUM BIGINT)";
globalConn.createStatement().execute(baseTableDdl);
String globalViewDdl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " +
"A BIGINT, B BIGINT)" + " AS SELECT * FROM " + baseTableFullName;
globalConn.createStatement().execute(String.format(globalViewDdl, globalViewName));
String middleLevelViewDdl = "CREATE VIEW %s (C BIGINT, D BIGINT)" +
" AS SELECT * FROM %s WHERE ID ='%s' PHOENIX_TTL = %d";
globalConn.createStatement().execute(String.format(middleLevelViewDdl,
middleLevelViewName1, globalViewName,
VIEW_PREFIX1, PHOENIX_TTL_EXPIRE_IN_A_SECOND));
globalConn.createStatement().execute(String.format(middleLevelViewDdl,
middleLevelViewName2, globalViewName, VIEW_PREFIX2,
PHOENIX_TTL_EXPIRE_IN_A_DAY));
String leafViewDdl = "CREATE VIEW %s (E BIGINT, F BIGINT)" +
" AS SELECT * FROM %s";
globalConn.createStatement().execute(String.format(leafViewDdl,
leafViewName1, middleLevelViewName1));
globalConn.createStatement().execute(String.format(leafViewDdl,
leafViewName2, middleLevelViewName2));
globalConn.setAutoCommit(true);
globalConn.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, leafViewName1));
globalConn.createStatement().execute(
String.format(UPSERT_TO_LEAF_VIEW_QUERY, leafViewName2));
// wait the row to be expired and index to be updated
Thread.sleep(PHOENIX_TTL_EXPIRE_IN_A_SECOND * MILLISECOND);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 1);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1);
// running MR job to delete expired rows.
PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
Configuration conf = new Configuration(getUtility().getConfiguration());
phoenixTtlTool.setConf(conf);
int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
assertEquals(0, status);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1);
verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 0);
}
}
@Test
public void testNoViewCase() throws Exception {
PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool();
Configuration conf = new Configuration(getUtility().getConfiguration());
phoenixTtlTool.setConf(conf);
int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"});
assertEquals(0, status);
}
}