blob: 49fdba64e597c6cc0fb5ce21f8dca3f5706f6ee6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX;
import static org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX_LOCKED;
import static org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX_UNLOCKED;
import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
import static org.apache.phoenix.query.QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT;
import static org.apache.phoenix.util.UpgradeUtil.SELECT_BASE_COLUMN_COUNT_FROM_HEADER_ROW;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.exception.UpgradeInProgressException;
import org.apache.phoenix.exception.UpgradeRequiredException;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.ConnectionQueryServicesImpl;
import org.apache.phoenix.query.DelegateConnectionQueryServices;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.UpgradeUtil;
import org.junit.Before;
import org.junit.Test;
public class UpgradeIT extends ParallelStatsDisabledIT {
private String tenantId;
@Before
public void generateTenantId() {
tenantId = "T_" + generateUniqueName();
}
@Test
public void testUpgradeForTenantViewWithSameColumnsAsBaseTable() throws Exception {
String tableWithViewName = generateUniqueName();
String viewTableName = generateUniqueName();
testViewUpgrade(true, tenantId, null, tableWithViewName + "1", null, viewTableName + "1", ColumnDiff.EQUAL);
testViewUpgrade(true, tenantId, "TABLESCHEMA", tableWithViewName + "", null, viewTableName + "2",
ColumnDiff.EQUAL);
testViewUpgrade(true, tenantId, null, tableWithViewName + "3", viewTableName + "SCHEMA", viewTableName + "3",
ColumnDiff.EQUAL);
testViewUpgrade(true, tenantId, "TABLESCHEMA", tableWithViewName + "4", viewTableName + "SCHEMA", viewTableName + "4",
ColumnDiff.EQUAL);
testViewUpgrade(true, tenantId, "SAMESCHEMA", tableWithViewName + "5", "SAMESCHEMA", viewTableName + "5",
ColumnDiff.EQUAL);
}
@Test
public void testUpgradeForTenantViewWithMoreColumnsThanBaseTable() throws Exception {
String tableWithViewName = generateUniqueName();
String viewTableName = generateUniqueName();
testViewUpgrade(true, tenantId, null, tableWithViewName + "1", null, viewTableName + "1", ColumnDiff.MORE);
testViewUpgrade(true, tenantId, "TABLESCHEMA", tableWithViewName + "", null, viewTableName + "2",
ColumnDiff.MORE);
testViewUpgrade(true, tenantId, null, tableWithViewName + "3", "VIEWSCHEMA", viewTableName + "3",
ColumnDiff.MORE);
testViewUpgrade(true, tenantId, "TABLESCHEMA", tableWithViewName + "4", "VIEWSCHEMA", viewTableName + "4",
ColumnDiff.MORE);
testViewUpgrade(true, tenantId, "SAMESCHEMA", tableWithViewName + "5", "SAMESCHEMA", viewTableName + "5",
ColumnDiff.MORE);
}
@Test
public void testUpgradeForViewWithSameColumnsAsBaseTable() throws Exception {
String tableWithViewName = generateUniqueName();
String viewTableName = generateUniqueName();
testViewUpgrade(false, null, null, tableWithViewName + "1", null, viewTableName + "1", ColumnDiff.EQUAL);
testViewUpgrade(false, null, "TABLESCHEMA", tableWithViewName + "", null, viewTableName + "2",
ColumnDiff.EQUAL);
testViewUpgrade(false, null, null, tableWithViewName + "3", "VIEWSCHEMA", viewTableName + "3",
ColumnDiff.EQUAL);
testViewUpgrade(false, null, "TABLESCHEMA", tableWithViewName + "4", "VIEWSCHEMA", viewTableName + "4",
ColumnDiff.EQUAL);
testViewUpgrade(false, null, "SAMESCHEMA", tableWithViewName + "5", "SAMESCHEMA", viewTableName + "5",
ColumnDiff.EQUAL);
}
@Test
public void testUpgradeForViewWithMoreColumnsThanBaseTable() throws Exception {
String tableWithViewName = generateUniqueName();
String viewTableName = generateUniqueName();
testViewUpgrade(false, null, null, tableWithViewName + "1", null, viewTableName + "1", ColumnDiff.MORE);
testViewUpgrade(false, null, "TABLESCHEMA", tableWithViewName + "", null, viewTableName + "2", ColumnDiff.MORE);
testViewUpgrade(false, null, null, tableWithViewName + "3", "VIEWSCHEMA", viewTableName + "3", ColumnDiff.MORE);
testViewUpgrade(false, null, "TABLESCHEMA", tableWithViewName + "4", "VIEWSCHEMA", viewTableName + "4",
ColumnDiff.MORE);
testViewUpgrade(false, null, "SAMESCHEMA", tableWithViewName + "5", "SAMESCHEMA", viewTableName + "5",
ColumnDiff.MORE);
}
@Test
public void testSettingBaseColumnCountWhenBaseTableColumnDropped() throws Exception {
String tableWithViewName = generateUniqueName();
String viewTableName = generateUniqueName();
testViewUpgrade(true, tenantId, null, tableWithViewName + "1", null, viewTableName + "1", ColumnDiff.MORE);
testViewUpgrade(true, tenantId, "TABLESCHEMA", tableWithViewName + "", null, viewTableName + "2",
ColumnDiff.LESS);
testViewUpgrade(true, tenantId, null, tableWithViewName + "3", "VIEWSCHEMA", viewTableName + "3",
ColumnDiff.LESS);
testViewUpgrade(true, tenantId, "TABLESCHEMA", tableWithViewName + "4", "VIEWSCHEMA", viewTableName + "4",
ColumnDiff.LESS);
testViewUpgrade(true, tenantId, "SAMESCHEMA", tableWithViewName + "5", "SAMESCHEMA", viewTableName + "5",
ColumnDiff.LESS);
}
@Test
public void testMapTableToNamespaceDuringUpgrade()
throws SQLException, IOException, IllegalArgumentException, InterruptedException {
String[] strings = new String[] { "a", "b", "c", "d" };
try (Connection conn = DriverManager.getConnection(getUrl())) {
String schemaName = "TEST";
String phoenixFullTableName = schemaName + "." + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
String localIndexName = "LIDX_" + generateUniqueName();
String viewName = "VIEW_" + generateUniqueName();
String viewIndexName = "VIDX_" + generateUniqueName();
String[] tableNames = new String[] { phoenixFullTableName, schemaName + "." + indexName,
schemaName + "." + localIndexName, "diff." + viewName, "test." + viewName, viewName};
String[] viewIndexes = new String[] { "diff." + viewIndexName, "test." + viewIndexName };
conn.createStatement().execute("CREATE TABLE " + phoenixFullTableName
+ "(k VARCHAR PRIMARY KEY, v INTEGER, f INTEGER, g INTEGER NULL, h INTEGER NULL)");
PreparedStatement upsertStmt = conn
.prepareStatement("UPSERT INTO " + phoenixFullTableName + " VALUES(?, ?, 0, 0, 0)");
int i = 1;
for (String str : strings) {
upsertStmt.setString(1, str);
upsertStmt.setInt(2, i++);
upsertStmt.execute();
}
conn.commit();
// creating local index
conn.createStatement()
.execute("create local index " + localIndexName + " on " + phoenixFullTableName + "(K)");
// creating global index
conn.createStatement().execute("create index " + indexName + " on " + phoenixFullTableName + "(k)");
// creating view in schema 'diff'
conn.createStatement().execute("CREATE VIEW diff." + viewName + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
// creating view in schema 'test'
conn.createStatement().execute("CREATE VIEW test." + viewName + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
conn.createStatement().execute("CREATE VIEW " + viewName + "(col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
// Creating index on views
conn.createStatement().execute("create index " + viewIndexName + " on diff." + viewName + "(col)");
conn.createStatement().execute("create index " + viewIndexName + " on test." + viewName + "(col)");
// validate data
for (String tableName : tableNames) {
ResultSet rs = conn.createStatement().executeQuery("select * from " + tableName);
for (String str : strings) {
assertTrue(rs.next());
assertEquals(str, rs.getString(1));
}
}
// validate view Index data
for (String viewIndex : viewIndexes) {
ResultSet rs = conn.createStatement().executeQuery("select * from " + viewIndex);
for (String str : strings) {
assertTrue(rs.next());
assertEquals(str, rs.getString(2));
}
}
HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
assertTrue(admin.tableExists(phoenixFullTableName));
assertTrue(admin.tableExists(schemaName + QueryConstants.NAME_SEPARATOR + indexName));
assertTrue(admin.tableExists(MetaDataUtil.getViewIndexPhysicalName(Bytes.toBytes(phoenixFullTableName))));
Properties props = new Properties();
props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(true));
props.setProperty(QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, Boolean.toString(false));
admin.close();
PhoenixConnection phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
UpgradeUtil.upgradeTable(phxConn, phoenixFullTableName);
UpgradeUtil.mapChildViewsToNamespace(phxConn, phoenixFullTableName,props);
phxConn.close();
props = new Properties();
phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
admin = phxConn.getQueryServices().getAdmin();
String hbaseTableName = SchemaUtil.getPhysicalTableName(Bytes.toBytes(phoenixFullTableName), true)
.getNameAsString();
assertTrue(admin.tableExists(hbaseTableName));
assertTrue(admin.tableExists(Bytes.toBytes(hbaseTableName)));
assertTrue(admin.tableExists(schemaName + QueryConstants.NAMESPACE_SEPARATOR + indexName));
assertTrue(admin.tableExists(MetaDataUtil.getViewIndexPhysicalName(Bytes.toBytes(hbaseTableName))));
i = 0;
// validate data
for (String tableName : tableNames) {
ResultSet rs = phxConn.createStatement().executeQuery("select * from " + tableName);
for (String str : strings) {
assertTrue(rs.next());
assertEquals(str, rs.getString(1));
}
}
// validate view Index data
for (String viewIndex : viewIndexes) {
ResultSet rs = conn.createStatement().executeQuery("select * from " + viewIndex);
for (String str : strings) {
assertTrue(rs.next());
assertEquals(str, rs.getString(2));
}
}
PName tenantId = phxConn.getTenantId();
PName physicalName = PNameFactory.newName(hbaseTableName);
String oldSchemaName = MetaDataUtil.getViewIndexSequenceSchemaName(PNameFactory.newName(phoenixFullTableName),
false);
String newSchemaName = MetaDataUtil.getViewIndexSequenceSchemaName(physicalName, true);
String newSequenceName = MetaDataUtil.getViewIndexSequenceName(physicalName, tenantId, true);
ResultSet rs = phxConn.createStatement()
.executeQuery("SELECT " + PhoenixDatabaseMetaData.CURRENT_VALUE + " FROM "
+ PhoenixDatabaseMetaData.SYSTEM_SEQUENCE + " WHERE " + PhoenixDatabaseMetaData.TENANT_ID
+ " IS NULL AND " + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + " = '" + newSchemaName
+ "' AND " + PhoenixDatabaseMetaData.SEQUENCE_NAME + "='" + newSequenceName + "'");
assertTrue(rs.next());
assertEquals("-32765", rs.getString(1));
rs = phxConn.createStatement().executeQuery("SELECT " + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + ","
+ PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + "," + PhoenixDatabaseMetaData.CURRENT_VALUE + " FROM "
+ PhoenixDatabaseMetaData.SYSTEM_SEQUENCE + " WHERE " + PhoenixDatabaseMetaData.TENANT_ID
+ " IS NULL AND " + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + " = '" + oldSchemaName + "'");
assertFalse(rs.next());
phxConn.close();
admin.close();
}
}
@Test
public void testMapMultiTenantTableToNamespaceDuringUpgrade() throws SQLException, SnapshotCreationException,
IllegalArgumentException, IOException, InterruptedException {
String[] strings = new String[] { "a", "b", "c", "d" };
String schemaName1 = "S_" +generateUniqueName(); // TEST
String schemaName2 = "S_" +generateUniqueName(); // DIFF
String phoenixFullTableName = schemaName1 + "." + generateUniqueName();
String hbaseTableName = SchemaUtil.getPhysicalTableName(Bytes.toBytes(phoenixFullTableName), true)
.getNameAsString();
String indexName = "IDX_" + generateUniqueName();
String viewName = "V_" + generateUniqueName();
String viewName1 = "V1_" + generateUniqueName();
String viewIndexName = "V_IDX_" + generateUniqueName();
String tenantViewIndexName = "V1_IDX_" + generateUniqueName();
String[] tableNames = new String[] { phoenixFullTableName, schemaName2 + "." + viewName1, schemaName1 + "." + viewName1, viewName1 };
String[] viewIndexes = new String[] { schemaName1 + "." + viewIndexName, schemaName2 + "." + viewIndexName };
String[] tenantViewIndexes = new String[] { schemaName1 + "." + tenantViewIndexName, schemaName2 + "." + tenantViewIndexName };
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement().execute("CREATE TABLE " + phoenixFullTableName
+ "(k VARCHAR not null, v INTEGER not null, f INTEGER, g INTEGER NULL, h INTEGER NULL CONSTRAINT pk PRIMARY KEY(k,v)) MULTI_TENANT=true");
PreparedStatement upsertStmt = conn
.prepareStatement("UPSERT INTO " + phoenixFullTableName + " VALUES(?, ?, 0, 0, 0)");
int i = 1;
for (String str : strings) {
upsertStmt.setString(1, str);
upsertStmt.setInt(2, i++);
upsertStmt.execute();
}
conn.commit();
// creating global index
conn.createStatement().execute("create index " + indexName + " on " + phoenixFullTableName + "(f)");
// creating view in schema 'diff'
conn.createStatement().execute("CREATE VIEW " + schemaName2 + "." + viewName + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
// creating view in schema 'test'
conn.createStatement().execute("CREATE VIEW " + schemaName1 + "." + viewName + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
conn.createStatement().execute("CREATE VIEW " + viewName + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
// Creating index on views
conn.createStatement().execute("create local index " + viewIndexName + " on " + schemaName2 + "." + viewName + "(col)");
conn.createStatement().execute("create local index " + viewIndexName + " on " + schemaName1 + "." + viewName + "(col)");
}
Properties props = new Properties();
String tenantId = strings[0];
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
PreparedStatement upsertStmt = conn
.prepareStatement("UPSERT INTO " + phoenixFullTableName + "(k,v,f,g,h) VALUES(?, ?, 0, 0, 0)");
int i = 1;
for (String str : strings) {
upsertStmt.setString(1, str);
upsertStmt.setInt(2, i++);
upsertStmt.execute();
}
conn.commit();
// creating view in schema 'diff'
conn.createStatement()
.execute("CREATE VIEW " + schemaName2 + "." + viewName1 + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
// creating view in schema 'test'
conn.createStatement()
.execute("CREATE VIEW " + schemaName1 + "." + viewName1 + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
conn.createStatement().execute("CREATE VIEW " + viewName1 + " (col VARCHAR) AS SELECT * FROM " + phoenixFullTableName);
// Creating index on views
conn.createStatement().execute("create index " + tenantViewIndexName + " on " + schemaName2 + "." + viewName1 + "(col)");
conn.createStatement().execute("create index " + tenantViewIndexName + " on " + schemaName1 + "." + viewName1 + "(col)");
}
props = new Properties();
props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(true));
props.setProperty(QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, Boolean.toString(false));
PhoenixConnection phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
UpgradeUtil.upgradeTable(phxConn, phoenixFullTableName);
UpgradeUtil.mapChildViewsToNamespace(phxConn,phoenixFullTableName,props);
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
int i = 1;
String indexPhysicalTableName = Bytes
.toString(MetaDataUtil.getViewIndexPhysicalName(Bytes.toBytes(hbaseTableName)));
// validate data with tenant
for (String tableName : tableNames) {
assertTableUsed(phxConn, tableName, hbaseTableName);
ResultSet rs = phxConn.createStatement().executeQuery("select * from " + tableName);
assertTrue(rs.next());
do {
assertEquals(i++, rs.getInt(1));
} while (rs.next());
i = 1;
}
// validate view Index data
for (String viewIndex : tenantViewIndexes) {
assertTableUsed(phxConn, viewIndex, indexPhysicalTableName);
ResultSet rs = phxConn.createStatement().executeQuery("select * from " + viewIndex);
assertTrue(rs.next());
do {
assertEquals(i++, rs.getInt(2));
} while (rs.next());
i = 1;
}
phxConn.close();
props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
phxConn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
// validate view Index data
for (String viewIndex : viewIndexes) {
assertTableUsed(phxConn, viewIndex, hbaseTableName);
ResultSet rs = phxConn.createStatement().executeQuery("select * from " + viewIndex);
for (String str : strings) {
assertTrue(rs.next());
assertEquals(str, rs.getString(1));
}
}
phxConn.close();
}
public void assertTableUsed(Connection conn, String phoenixTableName, String hbaseTableName) throws SQLException {
ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + phoenixTableName);
assertTrue(rs.next());
assertTrue(rs.getString(1).contains(hbaseTableName));
}
@Test
public void testSettingBaseColumnCountForMultipleViewsOnTable() throws Exception {
String baseSchema = "S_" + generateUniqueName();
String baseTable = "T_" + generateUniqueName();
String fullBaseTableName = SchemaUtil.getTableName(baseSchema, baseTable);
try (Connection conn = DriverManager.getConnection(getUrl())) {
String baseTableDDL = "CREATE TABLE " + fullBaseTableName + " (TENANT_ID VARCHAR NOT NULL, PK1 VARCHAR NOT NULL, V1 INTEGER, V2 INTEGER CONSTRAINT NAME_PK PRIMARY KEY(TENANT_ID, PK1)) MULTI_TENANT = true";
conn.createStatement().execute(baseTableDDL);
String[] tenants = new String[] {"T_" + generateUniqueName(), "T_" + generateUniqueName()};
Collections.sort(Arrays.asList(tenants));
String[] tenantViews = new String[] {"V_" + generateUniqueName(), "V_" + generateUniqueName(), "V_" + generateUniqueName()};
Collections.sort(Arrays.asList(tenantViews));
String[] globalViews = new String[] {"G_" + generateUniqueName(), "G_" + generateUniqueName(), "G_" + generateUniqueName()};
Collections.sort(Arrays.asList(globalViews));
for (int i = 0; i < 2; i++) {
// Create views for tenants;
String tenant = tenants[i];
try (Connection tenantConn = createTenantConnection(tenant)) {
String view = tenantViews[0];
// view with its own column
String viewDDL = "CREATE VIEW " + view + " AS SELECT * FROM " + fullBaseTableName;
tenantConn.createStatement().execute(viewDDL);
String addCols = "ALTER VIEW " + view + " ADD COL1 VARCHAR ";
tenantConn.createStatement().execute(addCols);
removeBaseColumnCountKV(tenant, null, view);
// view that has the last base table column removed
view = tenantViews[1];
viewDDL = "CREATE VIEW " + view + " AS SELECT * FROM " + fullBaseTableName;
tenantConn.createStatement().execute(viewDDL);
String droplastBaseCol = "ALTER VIEW " + view + " DROP COLUMN V2";
tenantConn.createStatement().execute(droplastBaseCol);
removeBaseColumnCountKV(tenant, null, view);
// view that has the middle base table column removed
view = tenantViews[2];
viewDDL = "CREATE VIEW " + view + " AS SELECT * FROM " + fullBaseTableName;
tenantConn.createStatement().execute(viewDDL);
String dropMiddileBaseCol = "ALTER VIEW " + view + " DROP COLUMN V1";
tenantConn.createStatement().execute(dropMiddileBaseCol);
removeBaseColumnCountKV(tenant, null, view);
}
}
// create global views
try (Connection globalConn = DriverManager.getConnection(getUrl())) {
String globalView = globalViews[0];
// view with its own column
String viewDDL = "CREATE VIEW " + globalView + " AS SELECT * FROM " + fullBaseTableName;
globalConn.createStatement().execute(viewDDL);
String addCols = "ALTER VIEW " + globalView + " ADD COL1 VARCHAR ";
globalConn.createStatement().execute(addCols);
removeBaseColumnCountKV(null, null, globalView);
// view that has the last base table column removed
globalView = globalViews[1];
viewDDL = "CREATE VIEW " + globalView + " AS SELECT * FROM " + fullBaseTableName;
globalConn.createStatement().execute(viewDDL);
String droplastBaseCol = "ALTER VIEW " + globalView + " DROP COLUMN V2";
globalConn.createStatement().execute(droplastBaseCol);
removeBaseColumnCountKV(null, null, globalView);
// view that has the middle base table column removed
globalView = globalViews[2];
viewDDL = "CREATE VIEW " + globalView + " AS SELECT * FROM " + fullBaseTableName;
globalConn.createStatement().execute(viewDDL);
String dropMiddileBaseCol = "ALTER VIEW " + globalView + " DROP COLUMN V1";
globalConn.createStatement().execute(dropMiddileBaseCol);
removeBaseColumnCountKV(null, null, globalView);
}
// run upgrade
UpgradeUtil.upgradeTo4_5_0(conn.unwrap(PhoenixConnection.class));
// Verify base column counts for tenant specific views
for (int i = 0; i < 2 ; i++) {
String tenantId = tenants[i];
checkBaseColumnCount(tenantId, null, tenantViews[0], 4);
checkBaseColumnCount(tenantId, null, tenantViews[1], DIVERGED_VIEW_BASE_COLUMN_COUNT);
checkBaseColumnCount(tenantId, null, tenantViews[2], DIVERGED_VIEW_BASE_COLUMN_COUNT);
}
// Verify base column count for global views
checkBaseColumnCount(null, null, globalViews[0], 4);
checkBaseColumnCount(null, null, globalViews[1], DIVERGED_VIEW_BASE_COLUMN_COUNT);
checkBaseColumnCount(null, null, globalViews[2], DIVERGED_VIEW_BASE_COLUMN_COUNT);
}
}
private enum ColumnDiff {
MORE, EQUAL, LESS
};
private void testViewUpgrade(boolean tenantView, String tenantId, String baseTableSchema,
String baseTableName, String viewSchema, String viewName, ColumnDiff diff)
throws Exception {
if (tenantView) {
checkNotNull(tenantId);
} else {
checkArgument(tenantId == null);
}
Connection conn = DriverManager.getConnection(getUrl());
String fullViewName = SchemaUtil.getTableName(viewSchema, viewName);
String fullBaseTableName = SchemaUtil.getTableName(baseTableSchema, baseTableName);
try {
int expectedBaseColumnCount;
conn.createStatement().execute(
"CREATE TABLE IF NOT EXISTS " + fullBaseTableName + " ("
+ " TENANT_ID CHAR(15) NOT NULL, " + " PK1 integer NOT NULL, "
+ "PK2 bigint NOT NULL, " + "CF1.V1 VARCHAR, " + "CF2.V2 VARCHAR, "
+ "V3 CHAR(100) ARRAY[4] "
+ " CONSTRAINT NAME_PK PRIMARY KEY (TENANT_ID, PK1, PK2)"
+ " ) MULTI_TENANT= true");
// create a view with same columns as base table.
try (Connection conn2 = getConnection(tenantView, tenantId)) {
conn2.createStatement().execute(
"CREATE VIEW " + fullViewName + " AS SELECT * FROM " + fullBaseTableName);
}
if (diff == ColumnDiff.MORE) {
// add a column to the view
try (Connection conn3 = getConnection(tenantView, tenantId)) {
conn3.createStatement().execute(
"ALTER VIEW " + fullViewName + " ADD VIEW_COL1 VARCHAR");
}
}
if (diff == ColumnDiff.LESS) {
try (Connection conn3 = getConnection(tenantView, tenantId)) {
conn3.createStatement().execute(
"ALTER VIEW " + fullViewName + " DROP COLUMN CF2.V2");
}
expectedBaseColumnCount = DIVERGED_VIEW_BASE_COLUMN_COUNT;
} else {
expectedBaseColumnCount = 6;
}
checkBaseColumnCount(tenantId, viewSchema, viewName, expectedBaseColumnCount);
checkBaseColumnCount(null, baseTableSchema, baseTableName, BASE_TABLE_BASE_COLUMN_COUNT);
// remove base column count kv so we can check whether the upgrade code is setting the
// base column count correctly.
removeBaseColumnCountKV(tenantId, viewSchema, viewName);
removeBaseColumnCountKV(null, baseTableSchema, baseTableName);
// assert that the removing base column count key value worked correctly.
checkBaseColumnCount(tenantId, viewSchema, viewName, 0);
checkBaseColumnCount(null, baseTableSchema, baseTableName, 0);
// run upgrade
UpgradeUtil.upgradeTo4_5_0(conn.unwrap(PhoenixConnection.class));
checkBaseColumnCount(tenantId, viewSchema, viewName, expectedBaseColumnCount);
checkBaseColumnCount(null, baseTableSchema, baseTableName, BASE_TABLE_BASE_COLUMN_COUNT);
} finally {
conn.close();
}
}
private static void checkBaseColumnCount(String tenantId, String schemaName, String tableName,
int expectedBaseColumnCount) throws Exception {
checkNotNull(tableName);
Connection conn = DriverManager.getConnection(getUrl());
String sql = SELECT_BASE_COLUMN_COUNT_FROM_HEADER_ROW;
sql =
String.format(sql, tenantId == null ? " IS NULL " : " = ? ",
schemaName == null ? "IS NULL" : " = ? ");
int paramIndex = 1;
PreparedStatement stmt = conn.prepareStatement(sql);
if (tenantId != null) {
stmt.setString(paramIndex++, tenantId);
}
if (schemaName != null) {
stmt.setString(paramIndex++, schemaName);
}
stmt.setString(paramIndex, tableName);
ResultSet rs = stmt.executeQuery();
assertTrue(rs.next());
assertEquals(expectedBaseColumnCount, rs.getInt(1));
assertFalse(rs.next());
}
private static void
removeBaseColumnCountKV(String tenantId, String schemaName, String tableName)
throws Exception {
byte[] rowKey =
SchemaUtil.getTableKey(tenantId == null ? new byte[0] : Bytes.toBytes(tenantId),
schemaName == null ? new byte[0] : Bytes.toBytes(schemaName),
Bytes.toBytes(tableName));
Put viewColumnDefinitionPut = new Put(rowKey, HConstants.LATEST_TIMESTAMP);
viewColumnDefinitionPut.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES, HConstants.LATEST_TIMESTAMP, null);
try (PhoenixConnection conn =
(DriverManager.getConnection(getUrl())).unwrap(PhoenixConnection.class)) {
try (HTableInterface htable =
conn.getQueryServices().getTable(
Bytes.toBytes(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME))) {
RowMutations mutations = new RowMutations(rowKey);
mutations.add(viewColumnDefinitionPut);
htable.mutateRow(mutations);
}
}
}
@Test
public void testUpgradeRequiredPreventsSQL() throws SQLException {
String tableName = generateUniqueName();
try (Connection conn = getConnection(false, null)) {
conn.createStatement().execute(
"CREATE TABLE " + tableName
+ " (PK1 VARCHAR NOT NULL, PK2 VARCHAR, KV1 VARCHAR, KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2))");
final ConnectionQueryServices delegate = conn.unwrap(PhoenixConnection.class).getQueryServices();
ConnectionQueryServices servicesWithUpgrade = new DelegateConnectionQueryServices(delegate) {
@Override
public boolean isUpgradeRequired() {
return true;
}
};
try (PhoenixConnection phxConn = new PhoenixConnection(servicesWithUpgrade,
conn.unwrap(PhoenixConnection.class), HConstants.LATEST_TIMESTAMP)) {
try {
phxConn.createStatement().execute(
"CREATE TABLE " + generateUniqueName()
+ " (k1 VARCHAR NOT NULL, k2 VARCHAR, CONSTRAINT PK PRIMARY KEY(K1,K2))");
fail("CREATE TABLE should have failed with UpgradeRequiredException");
} catch (UpgradeRequiredException expected) {
}
try {
phxConn.createStatement().execute("SELECT * FROM " + tableName);
fail("SELECT should have failed with UpgradeRequiredException");
} catch (UpgradeRequiredException expected) {
}
try {
phxConn.createStatement().execute("DELETE FROM " + tableName);
fail("DELETE should have failed with UpgradeRequiredException");
} catch (UpgradeRequiredException expected) {
}
try {
phxConn.createStatement().execute(
"CREATE INDEX " + tableName + "_IDX ON " + tableName + " (KV1) INCLUDE (KV2)" );
fail("CREATE INDEX should have failed with UpgradeRequiredException");
} catch (UpgradeRequiredException expected) {
}
try {
phxConn.createStatement().execute(
"UPSERT INTO " + tableName + " VALUES ('PK1', 'PK2', 'KV1', 'KV2')" );
fail("UPSERT VALUES should have failed with UpgradeRequiredException");
} catch (UpgradeRequiredException expected) {
}
}
}
}
@Test
public void testUpgradingConnectionBypassesUpgradeRequiredCheck() throws Exception {
String tableName = generateUniqueName();
try (Connection conn = getConnection(false, null)) {
conn.createStatement()
.execute(
"CREATE TABLE "
+ tableName
+ " (PK1 VARCHAR NOT NULL, PK2 VARCHAR, KV1 VARCHAR, KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2))");
final ConnectionQueryServices delegate = conn.unwrap(PhoenixConnection.class).getQueryServices();
ConnectionQueryServices servicesWithUpgrade = new DelegateConnectionQueryServices(delegate) {
@Override
public boolean isUpgradeRequired() {
return true;
}
};
try (PhoenixConnection phxConn = new PhoenixConnection(servicesWithUpgrade,
conn.unwrap(PhoenixConnection.class), HConstants.LATEST_TIMESTAMP)) {
// Because upgrade is required, this SQL should fail.
try {
phxConn.createStatement().executeQuery("SELECT * FROM " + tableName);
fail("SELECT should have failed with UpgradeRequiredException");
} catch (UpgradeRequiredException expected) {
}
// Marking connection as the one running upgrade should let SQL execute fine.
phxConn.setRunningUpgrade(true);
phxConn.createStatement().execute(
"UPSERT INTO " + tableName + " VALUES ('PK1', 'PK2', 'KV1', 'KV2')" );
phxConn.commit();
try (ResultSet rs = phxConn.createStatement().executeQuery("SELECT * FROM " + tableName)) {
assertTrue(rs.next());
assertFalse(rs.next());
}
}
}
}
private void putUnlockKVInSysMutex(byte[] row) throws Exception {
try (Connection conn = getConnection(false, null)) {
ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
try (HTableInterface sysMutexTable = services.getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) {
byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
byte[] qualifier = UPGRADE_MUTEX;
Put put = new Put(row);
put.add(family, qualifier, UPGRADE_MUTEX_UNLOCKED);
sysMutexTable.put(put);
sysMutexTable.flushCommits();
}
}
}
@Test
public void testAcquiringAndReleasingUpgradeMutex() throws Exception {
ConnectionQueryServices services = null;
byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
generateUniqueName());
try (Connection conn = getConnection(false, null)) {
services = conn.unwrap(PhoenixConnection.class).getQueryServices();
putUnlockKVInSysMutex(mutexRowKey);
assertTrue(((ConnectionQueryServicesImpl)services)
.acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey));
try {
((ConnectionQueryServicesImpl)services)
.acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey);
fail();
} catch (UpgradeInProgressException expected) {
}
assertTrue(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey));
assertFalse(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey));
}
}
@Test
public void testConcurrentUpgradeThrowsUprgadeInProgressException() throws Exception {
final AtomicBoolean mutexStatus1 = new AtomicBoolean(false);
final AtomicBoolean mutexStatus2 = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(2);
final AtomicInteger numExceptions = new AtomicInteger(0);
ConnectionQueryServices services = null;
final byte[] mutexKey = Bytes.toBytes(generateUniqueName());
try (Connection conn = getConnection(false, null)) {
services = conn.unwrap(PhoenixConnection.class).getQueryServices();
putUnlockKVInSysMutex(mutexKey);
FutureTask<Void> task1 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus1, services, latch, numExceptions, mutexKey));
FutureTask<Void> task2 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus2, services, latch, numExceptions, mutexKey));
Thread t1 = new Thread(task1);
t1.setDaemon(true);
Thread t2 = new Thread(task2);
t2.setDaemon(true);
t1.start();
t2.start();
latch.await();
// make sure tasks didn't fail by calling get()
task1.get();
task2.get();
assertTrue("One of the threads should have acquired the mutex", mutexStatus1.get() || mutexStatus2.get());
assertNotEquals("One and only one thread should have acquired the mutex ", mutexStatus1.get(),
mutexStatus2.get());
assertEquals("One and only one thread should have caught UpgradeRequiredException ", 1, numExceptions.get());
}
}
private static class AcquireMutexRunnable implements Callable<Void> {
private final AtomicBoolean acquireStatus;
private final ConnectionQueryServices services;
private final CountDownLatch latch;
private final AtomicInteger numExceptions;
private final byte[] mutexRowKey;
public AcquireMutexRunnable(AtomicBoolean acquireStatus, ConnectionQueryServices services, CountDownLatch latch, AtomicInteger numExceptions, byte[] mutexKey) {
this.acquireStatus = acquireStatus;
this.services = services;
this.latch = latch;
this.numExceptions = numExceptions;
this.mutexRowKey = mutexKey;
}
@Override
public Void call() throws Exception {
try {
((ConnectionQueryServicesImpl)services).acquireUpgradeMutex(
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey);
acquireStatus.set(true);
} catch (UpgradeInProgressException e) {
numExceptions.incrementAndGet();
} finally {
latch.countDown();
}
return null;
}
}
private Connection createTenantConnection(String tenantId) throws SQLException {
Properties props = new Properties();
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
return DriverManager.getConnection(getUrl(), props);
}
private Connection getConnection(boolean tenantSpecific, String tenantId) throws SQLException {
if (tenantSpecific) {
checkNotNull(tenantId);
return createTenantConnection(tenantId);
}
return DriverManager.getConnection(getUrl());
}
}