blob: 62fefc72cb673a842ad70b2ef88f82aa36a28875 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.util;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.CURRENT_CLIENT_VERSION;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.getVersion;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CACHE_SIZE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
import static org.apache.phoenix.query.QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.LocalIndexSplitter;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTable.LinkType;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PChar;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PDecimal;
import org.apache.phoenix.schema.types.PDouble;
import org.apache.phoenix.schema.types.PFloat;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.schema.types.PVarchar;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
public class UpgradeUtil {
/** Logger shared by all static upgrade helpers in this class. */
private static final Logger logger = LoggerFactory.getLogger(UpgradeUtil.class);
/**
 * The separator byte(s) followed by the literal "_SEQ_". addSaltByte compares the start of a
 * row key against this prefix to recognize sequences created for view indexes.
 */
private static final byte[] SEQ_PREFIX_BYTES = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes("_SEQ_"));
/** Byte form of the name "UPGRADE_TO_4_7". Not referenced in the part of the file reviewed here. */
public static final byte[] UPGRADE_TO_4_7_COLUMN_NAME = Bytes.toBytes("UPGRADE_TO_4_7");
/**
* Attribute for Phoenix's internal purposes only. When this attribute is set on a phoenix connection, then
* the upgrade code for upgrading the cluster to the new minor release is not triggered. Note that presence
* of this attribute overrides a true value for {@value QueryServices#AUTO_UPGRADE_ENABLED}.
*/
private static final String DO_NOT_UPGRADE = "DoNotUpgrade";
/**
 * Upsert into SYSTEM.CATALOG that writes BASE_COLUMN_COUNT for one row.
 * Bind parameters, in order: TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY,
 * BASE_COLUMN_COUNT.
 * Declared {@code final} so the shared statement text cannot be reassigned at runtime.
 */
public static final String UPSERT_BASE_COLUMN_COUNT_IN_HEADER_ROW = "UPSERT "
        + "INTO SYSTEM.CATALOG "
        + "(TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, BASE_COLUMN_COUNT) "
        + "VALUES (?, ?, ?, ?, ?, ?) ";
/**
 * Format template for reading BASE_COLUMN_COUNT from the SYSTEM.CATALOG row whose COLUMN_NAME
 * and COLUMN_FAMILY are both null. The two {@code %s} placeholders receive the predicates for
 * TENANT_ID and TABLE_SCHEM (in that order) via {@link String#format}; the single {@code ?}
 * is bound to TABLE_NAME.
 * Declared {@code final} so the shared statement text cannot be reassigned at runtime.
 */
public static final String SELECT_BASE_COLUMN_COUNT_FROM_HEADER_ROW = "SELECT "
        + "BASE_COLUMN_COUNT "
        + "FROM SYSTEM.CATALOG "
        + "WHERE "
        + "COLUMN_NAME IS NULL "
        + "AND "
        + "COLUMN_FAMILY IS NULL "
        + "AND "
        + "TENANT_ID %s "
        + "AND "
        + "TABLE_SCHEM %s "
        + "AND "
        + "TABLE_NAME = ? ";
/**
 * Format template that copies matching physical-table link rows of the catalog into new rows
 * whose COLUMN_FAMILY is the value substituted for {@code %s}.
 * Bind parameters, in order: TABLE_SCHEM, TABLE_SCHEM again (for the IS NULL test), TABLE_NAME,
 * COLUMN_FAMILY. Only rows whose LINK_TYPE equals the serialized PHYSICAL_TABLE value match.
 * Note that the catalog table name is double-quoted in this statement.
 */
private static final String UPDATE_LINK =
"UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
TENANT_ID + "," +
TABLE_SCHEM + "," +
TABLE_NAME + "," +
COLUMN_FAMILY + "," +
LINK_TYPE + "," +
TABLE_SEQ_NUM +"," +
TABLE_TYPE +
") SELECT " + TENANT_ID + "," + TABLE_SCHEM + "," + TABLE_NAME + ",'%s' AS "
+ COLUMN_FAMILY + " ," + LINK_TYPE + "," + TABLE_SEQ_NUM + "," + TABLE_TYPE +" FROM " + SYSTEM_CATALOG_SCHEMA + ".\""
+ SYSTEM_CATALOG_TABLE + "\" WHERE (" + TABLE_SCHEM + "=? OR (" + TABLE_SCHEM + " IS NULL AND ? IS NULL)) AND " + TABLE_NAME + "=? AND " + COLUMN_FAMILY + "=? AND " + LINK_TYPE + " = "
+ LinkType.PHYSICAL_TABLE.getSerializedValue();
/**
 * Deletes physical-table link rows from the catalog.
 * Bind parameters, in order: TABLE_SCHEM, TABLE_SCHEM again (for the IS NULL test), TABLE_NAME,
 * COLUMN_FAMILY. Unlike UPDATE_LINK, the catalog table name is not quoted here.
 */
private static final String DELETE_LINK = "DELETE FROM " + SYSTEM_CATALOG_SCHEMA + "." + SYSTEM_CATALOG_TABLE
+ " WHERE (" + TABLE_SCHEM + "=? OR (" + TABLE_SCHEM + " IS NULL AND ? IS NULL)) AND " + TABLE_NAME + "=? AND " + COLUMN_FAMILY + "=? AND " + LINK_TYPE + " = " + LinkType.PHYSICAL_TABLE.getSerializedValue();
/**
 * Selects TENANT_ID, TABLE_SCHEM and TABLE_NAME of catalog rows that carry a physical-table
 * link to the COLUMN_FAMILY value bound to the single {@code ?}, restricted to rows whose
 * TABLE_TYPE is the serialized VIEW value or null, ordered by TENANT_ID.
 */
private static final String GET_VIEWS_QUERY = "SELECT " + TENANT_ID + "," + TABLE_SCHEM + "," + TABLE_NAME
+ " FROM " + SYSTEM_CATALOG_SCHEMA + "." + SYSTEM_CATALOG_TABLE + " WHERE " + COLUMN_FAMILY + " = ? AND "
+ LINK_TYPE + " = " + LinkType.PHYSICAL_TABLE.getSerializedValue() + " AND ( " + TABLE_TYPE + "=" + "'"
+ PTableType.VIEW.getSerializedValue() + "' OR " + TABLE_TYPE + " IS NULL) ORDER BY "+TENANT_ID;
/** Private constructor: this class only exposes static members and is not meant to be instantiated. */
private UpgradeUtil() {
}
/**
 * Builds the name of the backup table used while SYSTEM.SEQUENCE is being re-created:
 * the sequence table name prefixed with "_BAK_".
 *
 * @return the backup table name as bytes
 */
private static byte[] getSequenceSnapshotName() {
    final String backupName = "_BAK_" + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME;
    return Bytes.toBytes(backupName);
}
/**
 * Creates the backup table (single column family, same family as SYSTEM.SEQUENCE) and
 * copies the current contents of SYSTEM.SEQUENCE into it.
 *
 * @param admin HBase admin used to create the backup table
 * @param conn connection whose query services supply the table handles for the copy
 * @throws SQLException if the copy fails, or wrapping any IOException raised while creating the table
 */
private static void createSequenceSnapshot(HBaseAdmin admin, PhoenixConnection conn) throws SQLException {
    final byte[] backupTable = getSequenceSnapshotName();
    final HColumnDescriptor family = new HColumnDescriptor(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES);
    final HTableDescriptor backupDesc = new HTableDescriptor(TableName.valueOf(backupTable));
    backupDesc.addFamily(family);
    try {
        admin.createTable(backupDesc);
        copyTable(conn, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES, backupTable);
    } catch (IOException ioe) {
        throw ServerUtil.parseServerException(ioe);
    }
}
/**
 * Copies the contents of the backup table back into SYSTEM.SEQUENCE.
 *
 * @param admin not used by this method
 * @param conn connection whose query services supply the table handles for the copy
 * @throws SQLException if the copy fails
 */
private static void restoreSequenceSnapshot(HBaseAdmin admin, PhoenixConnection conn) throws SQLException {
    copyTable(conn, getSequenceSnapshotName(), PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES);
}
/**
 * Disables and then deletes the backup table created by createSequenceSnapshot.
 *
 * @param admin HBase admin used to drop the backup table
 * @throws SQLException wrapping any IOException raised by the admin calls
 */
private static void deleteSequenceSnapshot(HBaseAdmin admin) throws SQLException {
    final byte[] backupTable = getSequenceSnapshotName();
    try {
        // A table has to be disabled before it can be deleted.
        admin.disableTable(backupTable);
        admin.deleteTable(backupTable);
    } catch (IOException ioe) {
        throw ServerUtil.parseServerException(ioe);
    }
}
/**
 * Copies cells from one HBase table to another using a raw scan over all retained versions.
 * Cells of type Put are re-written as puts and cells of type Delete are re-written as delete
 * markers; cells of any other type are counted toward the batch size but not copied.
 * Mutations are flushed to the target once the accumulated cell size reaches 100K, and once
 * more at the end for any remainder.
 *
 * @param conn connection whose query services supply both table handles
 * @param sourceName name of the table to read from
 * @param targetName name of the table to write to
 * @throws SQLException rethrown as-is, or wrapping any other exception raised during the copy
 */
@SuppressWarnings("deprecation")
private static void copyTable(PhoenixConnection conn, byte[] sourceName, byte[] targetName) throws SQLException {
    final int flushThresholdBytes = 100 * 1024; // 100K chunks
    int pendingBytes = 0;
    List<Mutation> pending = Lists.newArrayListWithExpectedSize(10000);
    Scan rawScan = new Scan();
    rawScan.setRaw(true);
    rawScan.setMaxVersions(MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS);
    ResultScanner rows = null;
    HTableInterface from = null;
    HTableInterface to = null;
    try {
        from = conn.getQueryServices().getTable(sourceName);
        to = conn.getQueryServices().getTable(targetName);
        rows = from.getScanner(rawScan);
        for (Result row = rows.next(); row != null; row = rows.next()) {
            for (KeyValue cell : row.raw()) {
                pendingBytes += cell.getLength();
                final KeyValue.Type cellType = KeyValue.Type.codeToType(cell.getType());
                if (cellType == KeyValue.Type.Put) {
                    // Re-issue the value as a put on the same row.
                    Put copy = new Put(cell.getRow());
                    copy.add(cell);
                    pending.add(copy);
                } else if (cellType == KeyValue.Type.Delete) {
                    // Carry the delete marker over so it keeps masking the
                    // value it preceded in the source table.
                    Delete marker = new Delete(cell.getRow());
                    marker.addDeleteMarker(cell);
                    pending.add(marker);
                }
            }
            // The threshold is checked once per row, after all of its cells were queued.
            if (pendingBytes >= flushThresholdBytes) {
                logger.info("Committing bactch of temp rows");
                to.batch(pending);
                pending.clear();
                pendingBytes = 0;
            }
        }
        if (!pending.isEmpty()) {
            logger.info("Committing last bactch of temp rows");
            to.batch(pending);
        }
        logger.info("Successfully completed copy");
    } catch (SQLException e) {
        throw e;
    } catch (Exception e) {
        throw ServerUtil.parseServerException(e);
    } finally {
        // Release scanner, then source, then target; a failure closing a table is only logged.
        try {
            if (rows != null) {
                rows.close();
            }
        } finally {
            try {
                if (from != null) {
                    from.close();
                }
            } catch (IOException e) {
                logger.warn("Exception during close of source table",e);
            } finally {
                try {
                    if (to != null) {
                        to.close();
                    }
                } catch (IOException e) {
                    logger.warn("Exception during close of target table",e);
                }
            }
        }
    }
}
/**
 * Re-creates SYSTEM.SEQUENCE pre-split on salt-byte boundaries. The existing rows are copied to
 * a backup table, the sequence table is dropped and created again with the split points, and the
 * rows are copied back. The backup table is removed only when the whole procedure succeeded.
 * Nothing is done (apart from opening and closing the admin) when {@code nSaltBuckets <= 0}.
 *
 * @param conn connection whose query services supply the HBase admin and table handles
 * @param nSaltBuckets number of salt buckets to split by
 * @throws SQLException on failure; IOExceptions from the admin calls made directly here are
 *         wrapped with the message "Unable to pre-split SYSTEM.SEQUENCE table"
 */
private static void preSplitSequenceTable(PhoenixConnection conn, int nSaltBuckets) throws SQLException {
    HBaseAdmin admin = conn.getQueryServices().getAdmin();
    boolean backupTaken = false;
    boolean completed = false;
    try {
        if (nSaltBuckets > 0) {
            logger.warn("Pre-splitting SYSTEM.SEQUENCE table " + nSaltBuckets + "-ways. This may take some time - please do not close window.");
            // Capture the descriptor before the table is dropped so it can be re-created identically.
            HTableDescriptor sequenceDesc = admin.getTableDescriptor(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES);
            createSequenceSnapshot(admin, conn);
            backupTaken = true;
            admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME);
            admin.deleteTable(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME);
            byte[][] saltSplits = SaltingUtil.getSalteByteSplitPoints(nSaltBuckets);
            admin.createTable(sequenceDesc, saltSplits);
            restoreSequenceSnapshot(admin, conn);
            completed = true;
            logger.warn("Completed pre-splitting SYSTEM.SEQUENCE table");
        }
    } catch (IOException e) {
        throw new SQLException("Unable to pre-split SYSTEM.SEQUENCE table", e);
    } finally {
        try {
            // Keep the backup around unless everything went through.
            if (backupTaken && completed) {
                try {
                    deleteSequenceSnapshot(admin);
                } catch (SQLException e) {
                    logger.warn("Exception while deleting SYSTEM.SEQUENCE snapshot during pre-split", e);
                }
            }
        } finally {
            try {
                admin.close();
            } catch (IOException e) {
                logger.warn("Exception while closing admin during pre-split", e);
            }
        }
    }
}
/**
 * Drops and re-creates every local index listed in SYSTEM.CATALOG.
 *
 * <p>On the first local index found, each data table that backs a local-index HBase table gets
 * a local-index column family per existing family (copying the family's attributes) and has
 * the LocalIndexSplitter coprocessor removed; all HBase tables matching the local index prefix
 * are then disabled and deleted. For every local index row, the indexed and covered columns are
 * read back from the catalog, the index is dropped, and an equivalent
 * {@code CREATE LOCAL INDEX ... ASYNC} statement is executed (through a tenant-specific
 * connection when the index row has a tenant id). Finally catalog rows whose table name starts
 * with the local index prefix are deleted.
 *
 * <p>Both catalog result sets are opened in try-with-resources so they are released as soon as
 * they have been consumed instead of lingering until the connection is closed.
 *
 * @param metaConnection connection the working connections are derived from; it is closed
 *        before this method returns, whether or not the upgrade succeeded
 * @return a new connection created from the working connection's query services; its
 *         CURRENT_SCN is the caller's original value when one was set, otherwise the
 *         latest-timestamp value used during the upgrade
 * @throws SQLException if a statement fails or a connection cannot be closed
 * @throws IOException if an HBase admin operation fails
 */
public static PhoenixConnection upgradeLocalIndexes(PhoenixConnection metaConnection)
        throws SQLException, IOException, org.apache.hadoop.hbase.TableNotFoundException {
    Properties props = PropertiesUtil.deepCopy(metaConnection.getClientInfo());
    Long originalScn = null;
    String str = props.getProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB);
    if (str != null) {
        originalScn = Long.valueOf(str);
    }
    // Work at the latest timestamp so every catalog row is visible.
    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(HConstants.LATEST_TIMESTAMP));
    PhoenixConnection globalConnection = null;
    PhoenixConnection toReturn = null;
    globalConnection = new PhoenixConnection(metaConnection, metaConnection.getQueryServices(), props);
    SQLException sqlEx = null;
    try (HBaseAdmin admin = globalConnection.getQueryServices().getAdmin()) {
        try (ResultSet rs = globalConnection.createStatement().executeQuery("SELECT TABLE_SCHEM, TABLE_NAME, DATA_TABLE_NAME, TENANT_ID, MULTI_TENANT, SALT_BUCKETS FROM SYSTEM.CATALOG "
                + " WHERE COLUMN_NAME IS NULL"
                + " AND COLUMN_FAMILY IS NULL"
                + " AND INDEX_TYPE=" + IndexType.LOCAL.getSerializedValue())) {
            boolean droppedLocalIndexes = false;
            while (rs.next()) {
                if (!droppedLocalIndexes) {
                    // One-time HBase-level preparation, done only if at least one local index exists.
                    HTableDescriptor[] localIndexTables = admin.listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+".*");
                    String localIndexSplitter = LocalIndexSplitter.class.getName();
                    for (HTableDescriptor table : localIndexTables) {
                        HTableDescriptor dataTableDesc = admin.getTableDescriptor(TableName.valueOf(MetaDataUtil.getLocalIndexUserTableName(table.getNameAsString())));
                        HColumnDescriptor[] columnFamilies = dataTableDesc.getColumnFamilies();
                        boolean modifyTable = false;
                        for (HColumnDescriptor cf : columnFamilies) {
                            String localIndexCf = QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX+cf.getNameAsString();
                            if (dataTableDesc.getFamily(Bytes.toBytes(localIndexCf)) == null) {
                                // Mirror the data family's attributes onto the new local-index family.
                                HColumnDescriptor colDef = new HColumnDescriptor(localIndexCf);
                                for (Entry<ImmutableBytesWritable, ImmutableBytesWritable> keyValue : cf.getValues().entrySet()) {
                                    colDef.setValue(keyValue.getKey().copyBytes(), keyValue.getValue().copyBytes());
                                }
                                dataTableDesc.addFamily(colDef);
                                modifyTable = true;
                            }
                        }
                        List<String> coprocessors = dataTableDesc.getCoprocessors();
                        for (String coprocessor : coprocessors) {
                            if (coprocessor.equals(localIndexSplitter)) {
                                dataTableDesc.removeCoprocessor(localIndexSplitter);
                                modifyTable = true;
                            }
                        }
                        if (modifyTable) {
                            admin.modifyTable(dataTableDesc.getName(), dataTableDesc);
                        }
                    }
                    admin.disableTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+".*");
                    admin.deleteTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+".*");
                    droppedLocalIndexes = true;
                }
                String schemaName = rs.getString(1);
                String indexTableName = rs.getString(2);
                String dataTableName = rs.getString(3);
                String tenantId = rs.getString(4);
                boolean multiTenantTable = rs.getBoolean(5);
                // Leading key columns to ignore: one always, plus one more for multi-tenant tables.
                int numColumnsToSkip = 1 + (multiTenantTable ? 1 : 0);
                String getColumns =
                        "SELECT COLUMN_NAME, COLUMN_FAMILY FROM SYSTEM.CATALOG WHERE TABLE_SCHEM "
                                + (schemaName == null ? "IS NULL " : "='" + schemaName+ "'")
                                + " AND TENANT_ID "+(tenantId == null ? "IS NULL " : "='" + tenantId + "'")
                                + " and TABLE_NAME='" + indexTableName
                                + "' AND COLUMN_NAME IS NOT NULL AND KEY_SEQ > "+ numColumnsToSkip +" ORDER BY KEY_SEQ";
                List<String> indexedColumns = new ArrayList<String>(1);
                List<String> coveredColumns = new ArrayList<String>(1);
                try (ResultSet getColumnsRs = globalConnection.createStatement().executeQuery(getColumns)) {
                    while (getColumnsRs.next()) {
                        String column = getColumnsRs.getString(1);
                        String columnName = IndexUtil.getDataColumnName(column);
                        String columnFamily = IndexUtil.getDataColumnFamilyName(column);
                        // Rows without a COLUMN_FAMILY are treated as indexed columns, the rest as covered.
                        if (getColumnsRs.getString(2) == null) {
                            if (columnFamily != null && !columnFamily.isEmpty()) {
                                if (SchemaUtil.normalizeIdentifier(columnFamily).equals(QueryConstants.DEFAULT_COLUMN_FAMILY)) {
                                    indexedColumns.add(columnName);
                                } else {
                                    indexedColumns.add(SchemaUtil.getCaseSensitiveColumnDisplayName(
                                            columnFamily, columnName));
                                }
                            } else {
                                indexedColumns.add(columnName);
                            }
                        } else {
                            coveredColumns.add(SchemaUtil.normalizeIdentifier(columnFamily)
                                    .equals(QueryConstants.DEFAULT_COLUMN_FAMILY) ? columnName
                                    : SchemaUtil.getCaseSensitiveColumnDisplayName(
                                            columnFamily, columnName));
                        }
                    }
                }
                StringBuilder createIndex = new StringBuilder("CREATE LOCAL INDEX ");
                createIndex.append(indexTableName);
                createIndex.append(" ON ");
                createIndex.append(SchemaUtil.getTableName(schemaName, dataTableName));
                createIndex.append("(");
                for (int i = 0; i < indexedColumns.size(); i++) {
                    createIndex.append(indexedColumns.get(i));
                    if (i < indexedColumns.size() - 1) {
                        createIndex.append(",");
                    }
                }
                createIndex.append(")");
                if (!coveredColumns.isEmpty()) {
                    createIndex.append(" INCLUDE(");
                    for (int i = 0; i < coveredColumns.size(); i++) {
                        createIndex.append(coveredColumns.get(i));
                        if (i < coveredColumns.size() - 1) {
                            createIndex.append(",");
                        }
                    }
                    createIndex.append(")");
                }
                createIndex.append(" ASYNC");
                logger.info("Index creation query is : " + createIndex.toString());
                logger.info("Dropping the index " + indexTableName
                        + " to clean up the index details from SYSTEM.CATALOG.");
                PhoenixConnection localConnection = null;
                if (tenantId != null) {
                    // Tenant-owned index: run the DDL through a tenant-specific connection.
                    props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
                    localConnection = new PhoenixConnection(globalConnection, globalConnection.getQueryServices(), props);
                }
                try {
                    (localConnection == null ? globalConnection : localConnection).createStatement().execute(
                            "DROP INDEX IF EXISTS " + indexTableName + " ON "
                                    + SchemaUtil.getTableName(schemaName, dataTableName));
                    logger.info("Recreating the index " + indexTableName);
                    (localConnection == null ? globalConnection : localConnection).createStatement().execute(createIndex.toString());
                    logger.info("Created the index " + indexTableName);
                } finally {
                    // Do not let the tenant id leak into the next iteration or the returned connection.
                    props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
                    if (localConnection != null) {
                        sqlEx = closeConnection(localConnection, sqlEx);
                        if (sqlEx != null) {
                            throw sqlEx;
                        }
                    }
                }
            }
        }
        globalConnection.createStatement().execute("DELETE FROM SYSTEM.CATALOG WHERE SUBSTR(TABLE_NAME,0,11)='"+MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+"'");
        if (originalScn != null) {
            props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(originalScn));
        }
        toReturn = new PhoenixConnection(globalConnection, globalConnection.getQueryServices(), props);
    } catch (SQLException e) {
        sqlEx = e;
    } finally {
        // Both the caller's connection and the working connection are closed on every path.
        sqlEx = closeConnection(metaConnection, sqlEx);
        sqlEx = closeConnection(globalConnection, sqlEx);
        if (sqlEx != null) {
            throw sqlEx;
        }
    }
    return toReturn;
}
/**
 * Disables every view index found in the catalog and truncates the HBase tables that back them.
 *
 * <p>For each catalog row with a non-null VIEW_INDEX_ID an {@code ALTER INDEX ... DISABLE} is
 * executed. For rows that have a tenant id this is done through a tenant-specific connection, and
 * the ORDINAL_POSITION values of the index's tenant-id column and view-index-id column are
 * swapped in the catalog. Each distinct view index physical table is disabled and truncated once.
 *
 * @param connParam connection the working connection is derived from; it is closed before this
 *        method returns
 * @return a new connection created from the working connection's query services; its CURRENT_SCN
 *         is the caller's original value when one was set, otherwise the latest-timestamp value
 * @throws SQLException if a statement fails or a connection cannot be closed
 * @throws IOException if an HBase admin operation fails
 */
public static PhoenixConnection disableViewIndexes(PhoenixConnection connParam) throws SQLException, IOException, InterruptedException, TimeoutException {
Properties props = PropertiesUtil.deepCopy(connParam.getClientInfo());
Long originalScn = null;
String str = props.getProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB);
if (str != null) {
originalScn = Long.valueOf(str);
}
// don't use the passed timestamp as scn because we want to query all view indexes up to now.
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(HConstants.LATEST_TIMESTAMP));
// Physical view-index tables already truncated, so each one is processed only once.
Set<String> physicalTables = new HashSet<>();
SQLException sqlEx = null;
PhoenixConnection globalConnection = null;
PhoenixConnection toReturn = null;
try {
globalConnection = new PhoenixConnection(connParam, connParam.getQueryServices(), props);
String tenantId = null;
try (HBaseAdmin admin = globalConnection.getQueryServices().getAdmin()) {
String fetchViewIndexes = "SELECT " + TENANT_ID + ", " + TABLE_SCHEM + ", " + TABLE_NAME +
", " + DATA_TABLE_NAME + " FROM " + SYSTEM_CATALOG_NAME + " WHERE " + VIEW_INDEX_ID
+ " IS NOT NULL";
String disableIndexDDL = "ALTER INDEX %s ON %s DISABLE";
try (ResultSet rs = globalConnection.createStatement().executeQuery(fetchViewIndexes)) {
while (rs.next()) {
tenantId = rs.getString(1);
String indexSchema = rs.getString(2);
String indexName = rs.getString(3);
String viewName = rs.getString(4);
String fullIndexName = SchemaUtil.getTableName(indexSchema, indexName);
// The view name is qualified with the index's schema.
String fullViewName = SchemaUtil.getTableName(indexSchema, viewName);
PTable viewPTable = null;
// Disable the view index and truncate the underlying hbase table.
// Users would need to rebuild the view indexes.
if (tenantId != null && !tenantId.isEmpty()) {
Properties newProps = PropertiesUtil.deepCopy(globalConnection.getClientInfo());
newProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
PTable indexPTable = null;
// Tenant-owned index: resolve the tables and run the DDL through a tenant connection.
try (PhoenixConnection tenantConnection = new PhoenixConnection(globalConnection, globalConnection.getQueryServices(), newProps)) {
viewPTable = PhoenixRuntime.getTable(tenantConnection, fullViewName);
tenantConnection.createStatement().execute(String.format(disableIndexDDL, indexName, fullViewName));
indexPTable = PhoenixRuntime.getTable(tenantConnection, fullIndexName);
}
// A salted index (non-null bucket number) has one extra leading column.
int offset = indexPTable.getBucketNum() != null ? 1 : 0;
int existingTenantIdPosition = ++offset; // positions are stored 1 based
int existingViewIdxIdPosition = ++offset;
// The two columns trade places: each takes the other's current position.
int newTenantIdPosition = existingViewIdxIdPosition;
int newViewIdxPosition = existingTenantIdPosition;
String tenantIdColumn = indexPTable.getColumns().get(existingTenantIdPosition - 1).getName().getString();
int index = 0;
// Upsert-select that rewrites ORDINAL_POSITION for one column row of the index.
// Binds: new position, tenant id, index name, [index schema when non-null], column name.
String updatePosition =
"UPSERT INTO "
+ SYSTEM_CATALOG_NAME
+ " ( "
+ TENANT_ID
+ ","
+ TABLE_SCHEM
+ ","
+ TABLE_NAME
+ ","
+ COLUMN_NAME
+ ","
+ COLUMN_FAMILY
+ ","
+ ORDINAL_POSITION
+ ") SELECT "
+ TENANT_ID
+ ","
+ TABLE_SCHEM
+ ","
+ TABLE_NAME
+ ","
+ COLUMN_NAME
+ ","
+ COLUMN_FAMILY
+ ","
+ "?"
+ " FROM "
+ SYSTEM_CATALOG_NAME
+ " WHERE "
+ TENANT_ID
+ " = ? "
+ " AND "
+ TABLE_NAME
+ " = ? "
+ " AND "
+ (indexSchema == null ? TABLE_SCHEM + " IS NULL" : TABLE_SCHEM + " = ? ")
+ " AND "
+ COLUMN_NAME
+ " = ? ";
// update view index position
try (PreparedStatement s = globalConnection.prepareStatement(updatePosition)) {
index = 0;
s.setInt(++index, newViewIdxPosition);
s.setString(++index, tenantId);
s.setString(++index, indexName);
if (indexSchema != null) {
s.setString(++index, indexSchema);
}
s.setString(++index, MetaDataUtil.getViewIndexIdColumnName());
s.executeUpdate();
}
// update tenant id position
try (PreparedStatement s = globalConnection.prepareStatement(updatePosition)) {
index = 0;
s.setInt(++index, newTenantIdPosition);
s.setString(++index, tenantId);
s.setString(++index, indexName);
if (indexSchema != null) {
s.setString(++index, indexSchema);
}
s.setString(++index, tenantIdColumn);
s.executeUpdate();
}
globalConnection.commit();
} else {
// No tenant id: the global connection can resolve the view and run the DDL directly.
viewPTable = PhoenixRuntime.getTable(globalConnection, fullViewName);
globalConnection.createStatement().execute(String.format(disableIndexDDL, indexName, fullViewName));
}
String indexPhysicalTableName = MetaDataUtil.getViewIndexTableName(viewPTable.getPhysicalName().getString());
// Set.add returns true only the first time a name is seen.
if (physicalTables.add(indexPhysicalTableName)) {
final TableName tableName = TableName.valueOf(indexPhysicalTableName);
admin.disableTable(tableName);
admin.truncateTable(tableName, false);
}
}
}
}
if (originalScn != null) {
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(originalScn));
}
toReturn = new PhoenixConnection(globalConnection, globalConnection.getQueryServices(), props);
} catch (SQLException e) {
sqlEx = e;
} finally {
// NOTE(review): globalConnection is still null here if its constructor threw; verify that
// closeConnection tolerates a null connection so the original failure is not masked.
sqlEx = closeConnection(connParam, sqlEx);
sqlEx = closeConnection(globalConnection, sqlEx);
if (sqlEx != null) {
throw sqlEx;
}
}
return toReturn;
}
/**
 * Closes a connection without throwing, folding any failure into an exception chain.
 *
 * <p>A {@code null} connection is accepted and treated as already closed, so this can be called
 * from cleanup code for a connection whose construction may have failed without replacing the
 * original failure with a NullPointerException.
 *
 * @param conn the connection to close; may be {@code null}
 * @param sqlEx an exception collected earlier, or {@code null} if there is none
 * @return {@code sqlEx} when closing succeeds (or {@code conn} is {@code null}); {@code sqlEx}
 *         with the close failure attached via {@link SQLException#setNextException} when both
 *         exist; the close failure itself when {@code sqlEx} is {@code null}
 */
public static SQLException closeConnection(PhoenixConnection conn, SQLException sqlEx) {
    if (conn == null) {
        return sqlEx;
    }
    try {
        conn.close();
        return sqlEx;
    } catch (SQLException closeFailure) {
        if (sqlEx == null) {
            return closeFailure;
        }
        // Keep the earlier exception as the primary one and chain the close failure to it.
        sqlEx.setNextException(closeFailure);
        return sqlEx;
    }
}
/**
 * Upgrades SYSTEM.SEQUENCE to a salted, pre-split table.
 *
 * <p>The SALT_BUCKETS cell of the SYSTEM.SEQUENCE header row in the catalog is written with a
 * checkAndPut that only succeeds when no value is present, so that a single client performs the
 * upgrade. If the cell was already present, the table is only pre-split in two cases: when
 * {@code oldTable} is {@code null}, or when its timestamp equals the 4.2.0 system-table timestamp
 * and a second checkAndPut on TABLE_SEQ_NUM succeeds. If the cell was absent and the old table's
 * timestamp is at or before the 4.1.0 system-table timestamp, every row is rewritten with a
 * leading salt byte and the table is then pre-split.
 *
 * @param conn connection whose query services supply the HBase table handles
 * @param nSaltBuckets number of salt buckets to record and to salt/split by
 * @param oldTable metadata of the existing SYSTEM.SEQUENCE table
 * @return {@code true} if this call salted and/or pre-split the table, {@code false} otherwise
 * @throws SQLException wrapping any I/O failure, or raised by the pre-split step
 */
@SuppressWarnings("deprecation")
public static boolean upgradeSequenceTable(PhoenixConnection conn, int nSaltBuckets, PTable oldTable) throws SQLException {
    logger.info("Upgrading SYSTEM.SEQUENCE table");
    byte[] seqTableKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE);
    HTableInterface sysTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
    try {
        // Report the value that is actually written below.
        logger.info("Setting SALT_BUCKETS property of SYSTEM.SEQUENCE to " + nSaltBuckets);
        KeyValue saltKV = KeyValueUtil.newKeyValue(seqTableKey,
                PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES,
                MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
                PInteger.INSTANCE.toBytes(nSaltBuckets));
        Put saltPut = new Put(seqTableKey);
        saltPut.add(saltKV);
        // Prevent multiple clients from doing this upgrade
        if (!sysTable.checkAndPut(seqTableKey,
                PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES, null, saltPut)) {
            if (oldTable == null) { // Unexpected, but to be safe just run pre-split code
                preSplitSequenceTable(conn, nSaltBuckets);
                return true;
            }
            // If upgrading from 4.2.0, then we need this special case of pre-splitting the table.
            // This is needed as a fix for https://issues.apache.org/jira/browse/PHOENIX-1401
            if (oldTable.getTimeStamp() == MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_2_0) {
                byte[] oldSeqNum = PLong.INSTANCE.toBytes(oldTable.getSequenceNumber());
                KeyValue seqNumKV = KeyValueUtil.newKeyValue(seqTableKey,
                        PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                        PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES,
                        MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
                        PLong.INSTANCE.toBytes(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
                Put seqNumPut = new Put(seqTableKey);
                seqNumPut.add(seqNumKV);
                // Replace TABLE_SEQ_NUM with a checkAndPut (expected value: the old sequence
                // number) as a semaphore so that only a single client pre-splits the sequence table.
                if (sysTable.checkAndPut(seqTableKey,
                        PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                        PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES, oldSeqNum, seqNumPut)) {
                    preSplitSequenceTable(conn, nSaltBuckets);
                    return true;
                }
            }
            logger.info("SYSTEM.SEQUENCE table has already been upgraded");
            return false;
        }
        // if the SYSTEM.SEQUENCE table is at 4.1.0 or before then we need to salt the table
        // and pre-split it.
        // NOTE(review): oldTable is dereferenced here without the null check used in the branch
        // above; confirm callers never pass null when SALT_BUCKETS has not been set yet.
        if (oldTable.getTimeStamp() <= MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) {
            int batchSizeBytes = 100 * 1024; // 100K chunks
            int sizeBytes = 0;
            List<Mutation> mutations = Lists.newArrayListWithExpectedSize(10000);
            boolean success = false;
            Scan scan = new Scan();
            scan.setRaw(true);
            scan.setMaxVersions(MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS);
            HTableInterface seqTable = conn.getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES);
            try {
                // Tracks whether any batch was written, which decides if recovery is still possible.
                boolean committed = false;
                logger.info("Adding salt byte to all SYSTEM.SEQUENCE rows");
                ResultScanner scanner = seqTable.getScanner(scan);
                try {
                    Result result;
                    while ((result = scanner.next()) != null) {
                        for (KeyValue keyValue : result.raw()) {
                            KeyValue newKeyValue = addSaltByte(keyValue, nSaltBuckets);
                            if (newKeyValue != null) {
                                sizeBytes += newKeyValue.getLength();
                                if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Put) {
                                    // Delete old value
                                    byte[] buf = keyValue.getBuffer();
                                    Delete delete = new Delete(keyValue.getRow());
                                    KeyValue deleteKeyValue = new KeyValue(buf, keyValue.getRowOffset(), keyValue.getRowLength(),
                                            buf, keyValue.getFamilyOffset(), keyValue.getFamilyLength(),
                                            buf, keyValue.getQualifierOffset(), keyValue.getQualifierLength(),
                                            keyValue.getTimestamp(), KeyValue.Type.Delete,
                                            ByteUtil.EMPTY_BYTE_ARRAY,0,0);
                                    delete.addDeleteMarker(deleteKeyValue);
                                    mutations.add(delete);
                                    sizeBytes += deleteKeyValue.getLength();
                                    // Put new value
                                    Put put = new Put(newKeyValue.getRow());
                                    put.add(newKeyValue);
                                    mutations.add(put);
                                } else if (KeyValue.Type.codeToType(newKeyValue.getType()) == KeyValue.Type.Delete){
                                    // Copy delete marker using new key so that it continues
                                    // to delete the key value preceding it that will be updated
                                    // as well.
                                    Delete delete = new Delete(newKeyValue.getRow());
                                    delete.addDeleteMarker(newKeyValue);
                                    mutations.add(delete);
                                }
                            }
                            if (sizeBytes >= batchSizeBytes) {
                                logger.info("Committing bactch of SYSTEM.SEQUENCE rows");
                                seqTable.batch(mutations);
                                mutations.clear();
                                sizeBytes = 0;
                                committed = true;
                            }
                        }
                    }
                    if (!mutations.isEmpty()) {
                        logger.info("Committing last bactch of SYSTEM.SEQUENCE rows");
                        seqTable.batch(mutations);
                    }
                    preSplitSequenceTable(conn, nSaltBuckets);
                    logger.info("Successfully completed upgrade of SYSTEM.SEQUENCE");
                    success = true;
                    return true;
                } catch (InterruptedException e) {
                    throw ServerUtil.parseServerException(e);
                } finally {
                    try {
                        scanner.close();
                    } finally {
                        if (!success) {
                            if (!committed) { // Try to recover by setting salting back to off, as we haven't successfully committed anything
                                // Don't use Delete here as we'd never be able to change it again at this timestamp.
                                KeyValue unsaltKV = KeyValueUtil.newKeyValue(seqTableKey,
                                        PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                                        PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES,
                                        MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP,
                                        PInteger.INSTANCE.toBytes(0));
                                Put unsaltPut = new Put(seqTableKey);
                                unsaltPut.add(unsaltKV);
                                try {
                                    sysTable.put(unsaltPut);
                                    // Reused here to mean "the recovery put went through".
                                    success = true;
                                } finally {
                                    if (!success) logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE");
                                }
                            } else { // We're screwed b/c we've already committed some salted sequences...
                                logger.error("SYSTEM.SEQUENCE TABLE LEFT IN CORRUPT STATE");
                            }
                        }
                    }
                }
            } catch (IOException e) {
                throw ServerUtil.parseServerException(e);
            } finally {
                try {
                    seqTable.close();
                } catch (IOException e) {
                    logger.warn("Exception during close",e);
                }
            }
        }
        return false;
    } catch (IOException e) {
        throw ServerUtil.parseServerException(e);
    } finally {
        try {
            sysTable.close();
        } catch (IOException e) {
            logger.warn("Exception during close",e);
        }
    }
}
/**
 * Produces a copy of a SYSTEM.SEQUENCE cell whose row key carries a salt byte.
 *
 * <p>Row keys that start with the view-index sequence prefix are rebuilt from their schema and
 * table name parts through MetaDataUtil.getViewIndexSequenceKey; all other row keys are
 * prefixed with the salt byte computed for {@code nSaltBuckets}. Family, qualifier, timestamp,
 * type and value are taken unchanged from the input cell.
 *
 * @param keyValue the existing cell
 * @param nSaltBuckets number of salt buckets
 * @return the re-keyed cell, or {@code null} when the row is not a view-index sequence and
 *         {@code nSaltBuckets} is 0 (nothing to change)
 */
@SuppressWarnings("deprecation")
private static KeyValue addSaltByte(KeyValue keyValue, int nSaltBuckets) {
    byte[] buf = keyValue.getBuffer();
    int length = keyValue.getRowLength();
    int offset = keyValue.getRowOffset();
    boolean isViewSeq = length > SEQ_PREFIX_BYTES.length && Bytes.compareTo(SEQ_PREFIX_BYTES, 0, SEQ_PREFIX_BYTES.length, buf, offset, SEQ_PREFIX_BYTES.length) == 0;
    if (!isViewSeq && nSaltBuckets == 0) {
        return null;
    }
    byte[] newBuf;
    if (isViewSeq) { // We messed up the name for the sequences for view indexes so we'll take this opportunity to fix it
        // The row key occupies buf[offset, offset + length), so its last byte is at
        // offset + length - 1 (the backing buffer also holds the rest of the cell).
        if (buf[offset + length - 1] == 0) { // Global indexes on views have trailing null byte
            length--;
        }
        byte[][] rowKeyMetaData = new byte[3][];
        SchemaUtil.getVarChars(buf, offset, length, 0, rowKeyMetaData);
        byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
        // Strip the view-index sequence prefix from the schema part.
        byte[] unprefixedSchemaName = new byte[schemaName.length - MetaDataUtil.VIEW_INDEX_SEQUENCE_PREFIX_BYTES.length];
        System.arraycopy(schemaName, MetaDataUtil.VIEW_INDEX_SEQUENCE_PREFIX_BYTES.length, unprefixedSchemaName, 0, unprefixedSchemaName.length);
        byte[] tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
        PName physicalName = PNameFactory.newName(unprefixedSchemaName);
        // Reformulate key based on correct data
        newBuf = MetaDataUtil.getViewIndexSequenceKey(tableName == null ? null : Bytes.toString(tableName),
                physicalName, nSaltBuckets, false).getKey();
    } else {
        // Leave room for the salt byte in front, then compute it over the original key bytes.
        newBuf = new byte[length + 1];
        System.arraycopy(buf, offset, newBuf, SaltingUtil.NUM_SALTING_BYTES, length);
        newBuf[0] = SaltingUtil.getSaltingByte(newBuf, SaltingUtil.NUM_SALTING_BYTES, length, nSaltBuckets);
    }
    return new KeyValue(newBuf, 0, newBuf.length,
            buf, keyValue.getFamilyOffset(), keyValue.getFamilyLength(),
            buf, keyValue.getQualifierOffset(), keyValue.getQualifierLength(),
            keyValue.getTimestamp(), KeyValue.Type.codeToType(keyValue.getType()),
            buf, keyValue.getValueOffset(), keyValue.getValueLength());
}
/**
 * Upgrade the metadata in the catalog table to enable adding columns to tables with views.
 * <p>
 * For every physical table that has views linked to it, the leading columns of each view are
 * compared, in ordinal order, with the columns of the physical table. A base column count is then
 * upserted into the view's header row: the number of base table columns when they all match, or
 * {@code DIVERGED_VIEW_BASE_COLUMN_COUNT} when the view differs from the base table. The header row
 * of the physical table itself is stamped with {@code BASE_TABLE_BASE_COLUMN_COUNT}, and the
 * changes are committed once per physical table.
 *
 * @param oldMetaConnection caller should take care of closing the passed connection appropriately
 * @throws SQLException if querying or updating the catalog fails
 */
public static void upgradeTo4_5_0(PhoenixConnection oldMetaConnection) throws SQLException {
    PhoenixConnection metaConnection = null;
    try {
        // Need to use own connection with max time stamp to be able to read all data from SYSTEM.CATALOG
        metaConnection = new PhoenixConnection(oldMetaConnection, HConstants.LATEST_TIMESTAMP);
        logger.info("Upgrading metadata to support adding columns to tables with views");
        String getBaseTableAndViews = "SELECT "
                + COLUMN_FAMILY + " AS BASE_PHYSICAL_TABLE, "
                + TENANT_ID + ", "
                + TABLE_SCHEM + " AS VIEW_SCHEMA, "
                + TABLE_NAME + " AS VIEW_NAME "
                + "FROM " + SYSTEM_CATALOG_NAME
                + " WHERE " + COLUMN_FAMILY + " IS NOT NULL " // column_family column points to the physical table name.
                + " AND " + COLUMN_NAME + " IS NULL "
                + " AND " + LINK_TYPE + " = ? ";
        // Build a map of base table name -> list of views on the table.
        Map<String, List<ViewKey>> parentTableViewsMap = new HashMap<>();
        try (PreparedStatement stmt = metaConnection.prepareStatement(getBaseTableAndViews)) {
            // Get back view rows that have links back to the base physical table. This takes care
            // of cases when we have a hierarchy of views too.
            stmt.setByte(1, LinkType.PHYSICAL_TABLE.getSerializedValue());
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    // this is actually SCHEMANAME.TABLENAME
                    String parentTable = rs.getString("BASE_PHYSICAL_TABLE");
                    String tenantId = rs.getString(TENANT_ID);
                    String viewSchema = rs.getString("VIEW_SCHEMA");
                    String viewName = rs.getString("VIEW_NAME");
                    List<ViewKey> viewKeysList = parentTableViewsMap.get(parentTable);
                    if (viewKeysList == null) {
                        viewKeysList = new ArrayList<>();
                        parentTableViewsMap.put(parentTable, viewKeysList);
                    }
                    viewKeysList.add(new ViewKey(tenantId, viewSchema, viewName));
                }
            }
        }
        boolean clearCache = false;
        for (Entry<String, List<ViewKey>> entry : parentTableViewsMap.entrySet()) {
            // Fetch column information for the base physical table
            String physicalTable = entry.getKey();
            String schemaFromFullName = SchemaUtil.getSchemaNameFromFullName(physicalTable);
            String baseTableSchemaName = schemaFromFullName.equals(StringUtil.EMPTY_STRING) ? null : schemaFromFullName;
            String baseTableName = SchemaUtil.getTableNameFromFullName(physicalTable);
            List<ColumnDetails> basePhysicalTableColumns = new ArrayList<>();
            // Columns fetched in order of ordinal position
            String fetchColumnInfoForBasePhysicalTable = "SELECT " +
                    COLUMN_NAME + "," +
                    COLUMN_FAMILY + "," +
                    DATA_TYPE + "," +
                    COLUMN_SIZE + "," +
                    DECIMAL_DIGITS + "," +
                    ORDINAL_POSITION + "," +
                    SORT_ORDER + "," +
                    ARRAY_SIZE + " " +
                    "FROM SYSTEM.CATALOG " +
                    "WHERE " +
                    "TABLE_SCHEM %s " +
                    "AND TABLE_NAME = ? " +
                    "AND COLUMN_NAME IS NOT NULL " +
                    "ORDER BY " +
                    ORDINAL_POSITION;
            // A null schema must be matched with IS NULL rather than a bind parameter
            boolean hasSchema = baseTableSchemaName != null;
            fetchColumnInfoForBasePhysicalTable =
                    String.format(fetchColumnInfoForBasePhysicalTable, hasSchema ? " = ? " : "IS NULL ");
            try (PreparedStatement stmt = metaConnection.prepareStatement(fetchColumnInfoForBasePhysicalTable)) {
                int baseParamIndex = 1;
                if (hasSchema) {
                    stmt.setString(baseParamIndex++, baseTableSchemaName);
                }
                stmt.setString(baseParamIndex, baseTableName);
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        basePhysicalTableColumns.add(new ColumnDetails(rs.getString(COLUMN_FAMILY), rs
                                .getString(COLUMN_NAME), rs.getInt(ORDINAL_POSITION), rs
                                .getInt(DATA_TYPE), rs.getInt(COLUMN_SIZE), rs.getInt(DECIMAL_DIGITS),
                                rs.getInt(SORT_ORDER), rs.getInt(ARRAY_SIZE)));
                    }
                }
            }
            // Fetch column information for all the views on the base physical table ordered by ordinal position.
            List<ViewKey> viewKeys = entry.getValue();
            StringBuilder sb = new StringBuilder();
            sb.append("SELECT " +
                    TENANT_ID + "," +
                    TABLE_SCHEM + "," +
                    TABLE_NAME + "," +
                    COLUMN_NAME + "," +
                    COLUMN_FAMILY + "," +
                    DATA_TYPE + "," +
                    COLUMN_SIZE + "," +
                    DECIMAL_DIGITS + "," +
                    ORDINAL_POSITION + "," +
                    SORT_ORDER + "," +
                    ARRAY_SIZE + " " +
                    "FROM SYSTEM.CATALOG " +
                    "WHERE " +
                    COLUMN_NAME + " IS NOT NULL " +
                    "AND " +
                    ORDINAL_POSITION + " <= ? " + // fetch only those columns that would impact setting of base column count
                    "AND " +
                    "(" + TENANT_ID+ ", " + TABLE_SCHEM + ", " + TABLE_NAME + ") IN (");
            int numViews = viewKeys.size();
            for (int i = 0; i < numViews; i++) {
                sb.append(" (?, ?, ?) ");
                if (i < numViews - 1) {
                    sb.append(", ");
                }
            }
            sb.append(" ) ");
            sb.append(" GROUP BY " +
                    TENANT_ID + "," +
                    TABLE_SCHEM + "," +
                    TABLE_NAME + "," +
                    COLUMN_NAME + "," +
                    COLUMN_FAMILY + "," +
                    DATA_TYPE + "," +
                    COLUMN_SIZE + "," +
                    DECIMAL_DIGITS + "," +
                    ORDINAL_POSITION + "," +
                    SORT_ORDER + "," +
                    ARRAY_SIZE + " " +
                    "ORDER BY " +
                    TENANT_ID + "," + TABLE_SCHEM + ", " + TABLE_NAME + ", " + ORDINAL_POSITION);
            String fetchViewColumnsSql = sb.toString();
            int numColsInBaseTable = basePhysicalTableColumns.size();
            try (PreparedStatement stmt = metaConnection.prepareStatement(fetchViewColumnsSql)) {
                int paramIndex = 1;
                stmt.setInt(paramIndex++, numColsInBaseTable);
                for (ViewKey view : viewKeys) {
                    stmt.setString(paramIndex++, view.tenantId);
                    stmt.setString(paramIndex++, view.schema);
                    stmt.setString(paramIndex++, view.name);
                }
                String currentTenantId = null;
                String currentViewSchema = null;
                String currentViewName = null;
                try (ResultSet rs = stmt.executeQuery()) {
                    int numBaseTableColsMatched = 0;
                    boolean ignore = false;
                    boolean baseColumnCountUpserted = false;
                    while (rs.next()) {
                        String viewTenantId = rs.getString(TENANT_ID);
                        String viewSchema = rs.getString(TABLE_SCHEM);
                        String viewName = rs.getString(TABLE_NAME);
                        if (!(Objects.equal(viewTenantId, currentTenantId) && Objects.equal(viewSchema, currentViewSchema) && Objects.equal(viewName, currentViewName))) {
                            // We are about to iterate through columns of a different view. Check whether base column count was upserted.
                            // If it wasn't then it is likely the case that a column inherited from the base table was dropped from view.
                            if (currentViewName != null && !baseColumnCountUpserted && numBaseTableColsMatched < numColsInBaseTable) {
                                upsertBaseColumnCountInHeaderRow(metaConnection, currentTenantId, currentViewSchema, currentViewName, DIVERGED_VIEW_BASE_COLUMN_COUNT);
                                clearCache = true;
                            }
                            // reset the values as we are now going to iterate over columns of a new view.
                            numBaseTableColsMatched = 0;
                            currentTenantId = viewTenantId;
                            currentViewSchema = viewSchema;
                            currentViewName = viewName;
                            ignore = false;
                            baseColumnCountUpserted = false;
                        }
                        if (!ignore) {
                            /*
                             * Iterate over all the columns of the base physical table and the columns of the view. Compare the
                             * two till one of the following happens:
                             *
                             * 1) We run into a view column which is different from column in the base physical table.
                             * This means that the view has diverged from the base physical table. In such a case
                             * we will set a special value for the base column count. That special value will also be used
                             * on the server side to filter out the diverged view so that meta-data changes on the base
                             * physical table are not propagated to it.
                             *
                             * 2) Every physical table column is present in the view. In that case we set the base column count
                             * as the number of columns in the base physical table. At that point we ignore rest of the columns
                             * of the view.
                             *
                             */
                            ColumnDetails baseTableColumn = basePhysicalTableColumns.get(numBaseTableColsMatched);
                            String columName = rs.getString(COLUMN_NAME);
                            String columnFamily = rs.getString(COLUMN_FAMILY);
                            int ordinalPos = rs.getInt(ORDINAL_POSITION);
                            int dataType = rs.getInt(DATA_TYPE);
                            int columnSize = rs.getInt(COLUMN_SIZE);
                            int decimalDigits = rs.getInt(DECIMAL_DIGITS);
                            int sortOrder = rs.getInt(SORT_ORDER);
                            int arraySize = rs.getInt(ARRAY_SIZE);
                            ColumnDetails viewColumn = new ColumnDetails(columnFamily, columName, ordinalPos, dataType, columnSize, decimalDigits, sortOrder, arraySize);
                            if (baseTableColumn.equals(viewColumn)) {
                                numBaseTableColsMatched++;
                                if (numBaseTableColsMatched == numColsInBaseTable) {
                                    upsertBaseColumnCountInHeaderRow(metaConnection, viewTenantId, viewSchema, viewName, numColsInBaseTable);
                                    baseColumnCountUpserted = true;
                                    clearCache = true;
                                    // The query only returns columns with ordinal position <= numColsInBaseTable, so no
                                    // more rows are expected for this view; skipping any that do show up keeps the
                                    // lookup into basePhysicalTableColumns within bounds.
                                    ignore = true;
                                }
                            } else {
                                // special value to denote that the view has diverged from the base physical table.
                                upsertBaseColumnCountInHeaderRow(metaConnection, viewTenantId, viewSchema, viewName, DIVERGED_VIEW_BASE_COLUMN_COUNT);
                                baseColumnCountUpserted = true;
                                clearCache = true;
                                // ignore rest of the rows for the view.
                                ignore = true;
                            }
                        }
                    }
                    // The check made when switching views never runs for the last view in the result set,
                    // so apply it here once all rows have been consumed.
                    if (currentViewName != null && !baseColumnCountUpserted && numBaseTableColsMatched < numColsInBaseTable) {
                        upsertBaseColumnCountInHeaderRow(metaConnection, currentTenantId, currentViewSchema, currentViewName, DIVERGED_VIEW_BASE_COLUMN_COUNT);
                        clearCache = true;
                    }
                }
            }
            // set base column count for the header row of the base table too. We use this information
            // to figure out whether the upgrade is in progress or hasn't started.
            upsertBaseColumnCountInHeaderRow(metaConnection, null, baseTableSchemaName, baseTableName, BASE_TABLE_BASE_COLUMN_COUNT);
            metaConnection.commit();
        }
        // clear metadata cache on region servers to force loading of the latest metadata
        if (clearCache) {
            metaConnection.getQueryServices().clearCache();
        }
    } finally {
        if (metaConnection != null) {
            metaConnection.close();
        }
    }
}
/**
 * Executes {@code UPSERT_BASE_COLUMN_COUNT_IN_HEADER_ROW} for one table or view.
 * The first three bind parameters identify the row (tenant id, schema, name), the fourth and
 * fifth are always bound to null, and the sixth carries the base column count.
 *
 * @param metaConnection connection used to prepare and run the upsert
 * @param tenantId tenant id of the view, may be null
 * @param schemaName schema of the table or view, may be null
 * @param viewOrTableName name of the table or view
 * @param baseColumnCount value to store
 * @throws SQLException if the statement cannot be prepared or executed
 */
private static void upsertBaseColumnCountInHeaderRow(PhoenixConnection metaConnection,
        String tenantId, String schemaName, String viewOrTableName, int baseColumnCount)
        throws SQLException {
    // Bind values for parameters 1..5, in order; the last two are always null.
    String[] keyValues = { tenantId, schemaName, viewOrTableName, null, null };
    try (PreparedStatement upsert =
            metaConnection.prepareStatement(UPSERT_BASE_COLUMN_COUNT_IN_HEADER_ROW)) {
        int position = 0;
        for (String keyValue : keyValues) {
            upsert.setString(++position, keyValue);
        }
        upsert.setInt(++position, baseColumnCount);
        upsert.executeUpdate();
    }
}
/**
 * Immutable value object describing one column as read from the catalog. Two instances are equal
 * when every attribute (family, name, ordinal position, data type, size, scale, sort order and
 * array size) matches.
 */
private static class ColumnDetails {
    @Nullable
    private final String columnFamily;
    @Nonnull
    private final String columnName;
    private final int ordinalValue;
    private final int dataType;
    private final int maxLength;
    private final int scale;
    private final int sortOrder;
    private final int arraySize;

    /**
     * @param columnFamily column family, may be null
     * @param columnName column name, must not be null
     * @param ordinalValue ordinal position of the column
     * @param dataType SQL type code of the column
     * @param maxLength column size
     * @param scale decimal digits
     * @param sortOrder sort order value
     * @param arraySize array size
     */
    ColumnDetails(String columnFamily, String columnName, int ordinalValue, int dataType,
            int maxLength, int scale, int sortOrder, int arraySize) {
        // Only the reference-typed name needs a null check; the int arguments can never be null.
        this.columnName = checkNotNull(columnName);
        this.columnFamily = columnFamily;
        this.ordinalValue = ordinalValue;
        this.dataType = dataType;
        this.maxLength = maxLength;
        this.scale = scale;
        this.sortOrder = sortOrder;
        this.arraySize = arraySize;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + columnName.hashCode();
        result = prime * result + ((columnFamily == null) ? 0 : columnFamily.hashCode());
        result = prime * result + arraySize;
        result = prime * result + dataType;
        result = prime * result + maxLength;
        result = prime * result + ordinalValue;
        result = prime * result + scale;
        result = prime * result + sortOrder;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null) return false;
        if (getClass() != obj.getClass()) return false;
        ColumnDetails other = (ColumnDetails) obj;
        if (!columnName.equals(other.columnName)) return false;
        if (columnFamily == null) {
            if (other.columnFamily != null) return false;
        } else if (!columnFamily.equals(other.columnFamily)) return false;
        if (arraySize != other.arraySize) return false;
        if (dataType != other.dataType) return false;
        if (maxLength != other.maxLength) return false;
        if (ordinalValue != other.ordinalValue) return false;
        if (scale != other.scale) return false;
        if (sortOrder != other.sortOrder) return false;
        return true;
    }
}
/**
 * Identifies a view by tenant id, schema and name. Tenant id and schema may be null;
 * equality and hashing take all three parts into account.
 */
private static class ViewKey {
    @Nullable
    private final String tenantId;
    @Nullable
    private final String schema;
    @Nonnull
    private final String name;

    private ViewKey(String tenantId, String schema, String viewName) {
        this.tenantId = tenantId;
        this.schema = schema;
        this.name = viewName;
    }

    @Override
    public int hashCode() {
        // Same 31-based accumulation over tenant id, name and schema (in that order)
        int hash = 31 + (tenantId == null ? 0 : tenantId.hashCode());
        hash = 31 * hash + name.hashCode();
        hash = 31 * hash + (schema == null ? 0 : schema.hashCode());
        return hash;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        ViewKey that = (ViewKey) obj;
        boolean sameTenant = tenantId == null ? that.tenantId == null : tenantId.equals(that.tenantId);
        if (!sameTenant) {
            return false;
        }
        if (!name.equals(that.name)) {
            return false;
        }
        return schema == null ? that.schema == null : schema.equals(that.schema);
    }
}
/**
 * Renders a flattened list of (tenant id, schema name, table name) triples as the right-hand side
 * of a row value constructor IN clause, e.g. {@code (('t','S','T'),(null,null,'U'))}.
 * Null tenant ids and schema names are emitted as the SQL keyword {@code null}; everything else is
 * wrapped in single quotes without further escaping.
 *
 * @param tableNames flattened triples; its size is expected to be a multiple of three
 * @return the parenthesized list of tuples
 */
private static String getTableRVC(List<String> tableNames) {
    StringBuilder rvc = new StringBuilder("(");
    int idx = 0;
    while (idx < tableNames.size()) {
        String tenantId = tableNames.get(idx);
        String schemaName = tableNames.get(idx + 1);
        String tableName = tableNames.get(idx + 2);
        idx += 3;
        String tenantLiteral = tenantId == null ? "null" : ("'" + tenantId + "'");
        String schemaLiteral = schemaName == null ? "null" : ("'" + schemaName + "'");
        rvc.append('(')
                .append(tenantLiteral)
                .append(',')
                .append(schemaLiteral)
                .append(',')
                .append("'" + tableName + "'")
                .append("),");
    }
    // Replace trailing , with ) to end IN expression
    rvc.setCharAt(rvc.length() - 1, ')');
    return rvc.toString();
}
/**
 * Reads (tenant id, schema, table) triples from {@code rs} and looks up the catalog header rows
 * of those that have no ROW_KEY_ORDER_OPTIMIZABLE value yet. Header rows of type TABLE are added
 * to {@code physicalTables} by full name; header rows of {@code otherType} are returned to the
 * caller for further resolution.
 *
 * @param conn connection used to query SYSTEM.CATALOG
 * @param rs result set whose first three columns are tenant id, schema name and table name; it is
 *        consumed but not closed here
 * @param otherType the additional table type (besides TABLE) to look for
 * @param physicalTables receives the full names of matching physical tables
 * @return flattened (tenant id, schema, table) triples of the matching {@code otherType} rows, or
 *         an empty list when {@code rs} had no rows
 * @throws SQLException if reading either result set fails
 */
private static List<String> addPhysicalTables(PhoenixConnection conn, ResultSet rs, PTableType otherType, Set<String> physicalTables) throws SQLException {
    List<String> tableNames = Lists.newArrayListWithExpectedSize(1024);
    while (rs.next()) {
        tableNames.add(rs.getString(1));
        tableNames.add(rs.getString(2));
        tableNames.add(rs.getString(3));
    }
    if (tableNames.isEmpty()) {
        return Collections.emptyList();
    }
    List<String> otherTables = Lists.newArrayListWithExpectedSize(tableNames.size());
    // Find the header rows for tables that have not been upgraded already.
    // We don't care about views, as the row key cannot be different than the table.
    // We need this query to find physical tables which won't have a link row.
    String query = "SELECT TENANT_ID,TABLE_SCHEM,TABLE_NAME,TABLE_TYPE\n" +
            "FROM SYSTEM.CATALOG (ROW_KEY_ORDER_OPTIMIZABLE BOOLEAN)\n" +
            "WHERE COLUMN_NAME IS NULL\n" +
            "AND COLUMN_FAMILY IS NULL\n" +
            "AND ROW_KEY_ORDER_OPTIMIZABLE IS NULL\n" +
            "AND TABLE_TYPE IN ('" + PTableType.TABLE.getSerializedValue() + "','" + otherType.getSerializedValue() + "')\n" +
            "AND (TENANT_ID, TABLE_SCHEM, TABLE_NAME) IN " + getTableRVC(tableNames);
    // Close the statement and its result set once the header rows have been read
    try (java.sql.Statement stmt = conn.createStatement();
            ResultSet headerRows = stmt.executeQuery(query)) {
        while (headerRows.next()) {
            if (PTableType.TABLE.getSerializedValue().equals(headerRows.getString(4))) {
                physicalTables.add(SchemaUtil.getTableName(headerRows.getString(2), headerRows.getString(3)));
            } else {
                otherTables.add(headerRows.getString(1));
                otherTables.add(headerRows.getString(2));
                otherTables.add(headerRows.getString(3));
            }
        }
    }
    return otherTables;
}
// Return all types that are descending and either:
// 1) variable length, which includes all array types (PHOENIX-2067)
// 2) fixed length with padding (PHOENIX-2120)
// 3) float and double (PHOENIX-2171)
// We exclude VARBINARY as we no longer support DESC for it.
// The result is a parenthesized, comma separated list of SQL type codes suitable for an IN clause.
private static String getAffectedDataTypes() {
    StringBuilder buf = new StringBuilder("(");
    buf.append(PVarchar.INSTANCE.getSqlType()).append(',');
    buf.append(PChar.INSTANCE.getSqlType()).append(',');
    buf.append(PBinary.INSTANCE.getSqlType()).append(',');
    buf.append(PFloat.INSTANCE.getSqlType()).append(',');
    buf.append(PDouble.INSTANCE.getSqlType()).append(',');
    buf.append(PDecimal.INSTANCE.getSqlType()).append(',');
    for (PDataType type : PDataType.values()) {
        if (type.isArrayType()) {
            buf.append(type.getSqlType());
            buf.append(',');
        }
    }
    // Replace the trailing comma with the closing parenthesis
    buf.setCharAt(buf.length()-1, ')');
    return buf.toString();
}
/**
 * Identify the tables that are DESC VARBINARY as this is no longer supported.
 *
 * @param conn connection used to query SYSTEM.CATALOG
 * @return sorted full names of the physical tables found
 * @throws SQLException if the catalog cannot be queried
 */
public static List<String> getPhysicalTablesWithDescVarbinaryRowKey(PhoenixConnection conn) throws SQLException {
    // Row key columns (no column family) that are DESC and of type VARBINARY
    StringBuilder sql = new StringBuilder();
    sql.append("SELECT TENANT_ID,TABLE_SCHEM,TABLE_NAME\n");
    sql.append("FROM SYSTEM.CATALOG cat1\n");
    sql.append("WHERE COLUMN_NAME IS NOT NULL\n");
    sql.append("AND COLUMN_FAMILY IS NULL\n");
    sql.append("AND SORT_ORDER = ").append(SortOrder.DESC.getSystemValue()).append("\n");
    sql.append("AND DATA_TYPE = ").append(PVarbinary.INSTANCE.getSqlType()).append("\n");
    sql.append("GROUP BY TENANT_ID,TABLE_SCHEM,TABLE_NAME");
    return getPhysicalTablesWithDescRowKey(sql.toString(), conn);
}
/**
 * Identify the tables that need to be upgraded due to PHOENIX-2067 and PHOENIX-2120.
 *
 * @param conn connection used to query SYSTEM.CATALOG
 * @return sorted full names of the physical tables found
 * @throws SQLException if the catalog cannot be queried
 */
public static List<String> getPhysicalTablesWithDescRowKey(PhoenixConnection conn) throws SQLException {
    // Row key columns that are either DESC with an affected type, or ASC BINARY wider than one byte
    StringBuilder sql = new StringBuilder();
    sql.append("SELECT TENANT_ID,TABLE_SCHEM,TABLE_NAME\n");
    sql.append("FROM SYSTEM.CATALOG cat1\n");
    sql.append("WHERE COLUMN_NAME IS NOT NULL\n");
    sql.append("AND COLUMN_FAMILY IS NULL\n");
    sql.append("AND ( ( SORT_ORDER = ").append(SortOrder.DESC.getSystemValue()).append("\n");
    sql.append(" AND DATA_TYPE IN ").append(getAffectedDataTypes()).append(")\n");
    sql.append(" OR ( SORT_ORDER = ").append(SortOrder.ASC.getSystemValue()).append("\n");
    sql.append(" AND DATA_TYPE = ").append(PBinary.INSTANCE.getSqlType()).append("\n");
    sql.append(" AND COLUMN_SIZE > 1 ) )\n");
    sql.append("GROUP BY TENANT_ID,TABLE_SCHEM,TABLE_NAME");
    return getPhysicalTablesWithDescRowKey(sql.toString(), conn);
}
/**
 * Identify the tables that need to be upgraded due to PHOENIX-2067.
 * <p>
 * The given query yields (tenant id, schema, table) rows for catalog entries with affected
 * columns. Those entries are resolved to physical tables in up to three steps: directly (tables),
 * through index links (indexes to their tables/views) and through physical table links (views to
 * their physical tables).
 *
 * @param query query returning tenant id, schema name and table name of affected entries
 * @param conn connection used to query SYSTEM.CATALOG
 * @return sorted full names of the physical tables found
 * @throws SQLException if any of the catalog queries fails
 */
private static List<String> getPhysicalTablesWithDescRowKey(String query, PhoenixConnection conn) throws SQLException {
    Set<String> physicalTables = Sets.newHashSetWithExpectedSize(1024);
    List<String> remainingTableNames;
    // First query finds column rows of tables that need to be upgraded.
    // We cannot tell if the column is from a table, view, or index however.
    try (java.sql.Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery(query)) {
        remainingTableNames = addPhysicalTables(conn, rs, PTableType.INDEX, physicalTables);
    }
    if (!remainingTableNames.isEmpty()) {
        // Find tables/views for index
        String indexLinkQuery = "SELECT TENANT_ID,TABLE_SCHEM,TABLE_NAME\n" +
                "FROM SYSTEM.CATALOG\n" +
                "WHERE COLUMN_NAME IS NULL\n" +
                "AND (TENANT_ID, TABLE_SCHEM, COLUMN_FAMILY) IN " + getTableRVC(remainingTableNames) + "\n" +
                "AND LINK_TYPE = " + LinkType.INDEX_TABLE.getSerializedValue();
        try (java.sql.Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery(indexLinkQuery)) {
            remainingTableNames = addPhysicalTables(conn, rs, PTableType.VIEW, physicalTables);
        }
        if (!remainingTableNames.isEmpty()) {
            // Find physical table name from views, splitting on '.' to get schema name and table name
            // NOTE(review): SUBSTR(COLUMN_FAMILY,1,INSTR(COLUMN_FAMILY,'.')) looks like it keeps the
            // '.' separator as part of the schema name - confirm against Phoenix SUBSTR/INSTR semantics.
            String physicalLinkQuery = "SELECT null, " +
                    " CASE WHEN INSTR(COLUMN_FAMILY,'.') = 0 THEN NULL ELSE SUBSTR(COLUMN_FAMILY,1,INSTR(COLUMN_FAMILY,'.')) END,\n" +
                    " CASE WHEN INSTR(COLUMN_FAMILY,'.') = 0 THEN COLUMN_FAMILY ELSE SUBSTR(COLUMN_FAMILY,INSTR(COLUMN_FAMILY,'.')+1) END\n" +
                    "FROM SYSTEM.CATALOG\n" +
                    "WHERE COLUMN_NAME IS NULL\n" +
                    "AND COLUMN_FAMILY IS NOT NULL\n" +
                    "AND (TENANT_ID, TABLE_SCHEM, TABLE_NAME) IN " + getTableRVC(remainingTableNames) + "\n" +
                    "AND LINK_TYPE = " + LinkType.PHYSICAL_TABLE.getSerializedValue();
            try (java.sql.Statement stmt = conn.createStatement();
                    ResultSet rs = stmt.executeQuery(physicalLinkQuery)) {
                // Add any tables (which will all be physical tables) which have not already been upgraded.
                addPhysicalTables(conn, rs, PTableType.TABLE, physicalTables);
            }
        }
    }
    List<String> sortedPhysicalTables = new ArrayList<String>(physicalTables);
    Collections.sort(sortedPhysicalTables);
    return sortedPhysicalTables;
}
/**
 * Upgrades a single table or index and marks it (plus, for a table, the views linked to it) as
 * ROW_KEY_ORDER_OPTIMIZABLE in SYSTEM.CATALOG.
 * <p>
 * For a physical table that is actually upgraded (not bypassed) a snapshot is taken first; if the
 * upgrade fails the snapshot is restored, and the snapshot is deleted afterwards unless the restore
 * itself failed. Exceptions raised during the upgrade are logged rather than rethrown.
 *
 * @param upgradeConn connection used to run the query that performs the upgrade
 * @param globalConn connection used for catalog reads/writes, cache invalidation and admin access
 * @param schemaName schema of the table or index, may be null
 * @param tableName name of the table or index
 * @param isTable true when upgrading a physical table (enables snapshotting and view marking)
 * @param bypassUpgrade true to only mark the metadata as upgraded without touching the data
 * @throws SQLException declared for callers; failures inside the upgrade are logged
 */
private static void upgradeDescVarLengthRowKeys(PhoenixConnection upgradeConn, PhoenixConnection globalConn, String schemaName, String tableName, boolean isTable, boolean bypassUpgrade) throws SQLException {
    String physicalName = SchemaUtil.getTableName(schemaName, tableName);
    long currentTime = System.currentTimeMillis();
    String snapshotName = physicalName + "_" + currentTime;
    HBaseAdmin admin = null;
    if (isTable && !bypassUpgrade) {
        admin = globalConn.getQueryServices().getAdmin();
    }
    boolean restoreSnapshot = false;
    boolean success = false;
    try {
        if (isTable && !bypassUpgrade) {
            String msg = "Taking snapshot of physical table " + physicalName + " prior to upgrade...";
            System.out.println(msg);
            logger.info(msg);
            admin.disableTable(physicalName);
            admin.snapshot(snapshotName, physicalName);
            admin.enableTable(physicalName);
            restoreSnapshot = true;
        }
        String escapedTableName = SchemaUtil.getEscapedTableName(schemaName, tableName);
        String tenantInfo = "";
        PName tenantId = PName.EMPTY_NAME;
        if (upgradeConn.getTenantId() != null) {
            tenantId = upgradeConn.getTenantId();
            tenantInfo = " for tenant " + tenantId.getString();
        }
        String msg = "Starting upgrade of " + escapedTableName + tenantInfo + "...";
        System.out.println(msg);
        logger.info(msg);
        if (!bypassUpgrade) {
            try (java.sql.Statement stmt = upgradeConn.createStatement();
                    ResultSet rs = stmt.executeQuery("SELECT /*+ NO_INDEX */ count(*) FROM " + escapedTableName)) {
                rs.next(); // Run query
            }
        }
        List<String> tableNames = Lists.newArrayListWithExpectedSize(1024);
        tableNames.add(tenantId == PName.EMPTY_NAME ? null : tenantId.getString());
        tableNames.add(schemaName);
        tableNames.add(tableName);
        // Find views to mark as upgraded
        if (isTable) {
            String query =
                    "SELECT TENANT_ID,TABLE_SCHEM,TABLE_NAME\n" +
                    "FROM SYSTEM.CATALOG\n" +
                    "WHERE COLUMN_NAME IS NULL\n" +
                    "AND COLUMN_FAMILY = '" + physicalName + "'\n" +
                    "AND LINK_TYPE = " + LinkType.PHYSICAL_TABLE.getSerializedValue();
            try (java.sql.Statement stmt = globalConn.createStatement();
                    ResultSet rs = stmt.executeQuery(query)) {
                while (rs.next()) {
                    tableNames.add(rs.getString(1));
                    tableNames.add(rs.getString(2));
                    tableNames.add(rs.getString(3));
                }
            }
        }
        // Mark the table and views as upgraded now
        try (java.sql.Statement upsertStmt = globalConn.createStatement()) {
            for (int i = 0; i < tableNames.size(); i += 3) {
                String theTenantId = tableNames.get(i);
                String theSchemaName = tableNames.get(i+1);
                String theTableName = tableNames.get(i+2);
                upsertStmt.execute("UPSERT INTO " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME +
                        " (" + PhoenixDatabaseMetaData.TENANT_ID + "," +
                        PhoenixDatabaseMetaData.TABLE_SCHEM + "," +
                        PhoenixDatabaseMetaData.TABLE_NAME + "," +
                        MetaDataEndpointImpl.ROW_KEY_ORDER_OPTIMIZABLE + " BOOLEAN"
                        + ") VALUES (" +
                        "'" + (theTenantId == null ? StringUtil.EMPTY_STRING : theTenantId) + "'," +
                        "'" + (theSchemaName == null ? StringUtil.EMPTY_STRING : theSchemaName) + "'," +
                        "'" + theTableName + "'," +
                        "TRUE)");
            }
        }
        globalConn.commit();
        // Evict every marked table/view from the metadata cache so the new flag is picked up
        for (int i = 0; i < tableNames.size(); i += 3) {
            String theTenantId = tableNames.get(i);
            String theSchemaName = tableNames.get(i+1);
            String theTableName = tableNames.get(i+2);
            globalConn.getQueryServices().clearTableFromCache(
                    theTenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : Bytes.toBytes(theTenantId),
                    // Use the schema of the entry being cleared, not the schema of the upgraded table
                    theSchemaName == null ? ByteUtil.EMPTY_BYTE_ARRAY : Bytes.toBytes(theSchemaName),
                    Bytes.toBytes(theTableName), HConstants.LATEST_TIMESTAMP);
        }
        success = true;
        msg = "Completed upgrade of " + escapedTableName + tenantInfo;
        System.out.println(msg);
        logger.info(msg);
    } catch (Exception e) {
        logger.error("Exception during upgrade of " + physicalName + ":", e);
    } finally {
        boolean restored = false;
        try {
            if (!success && restoreSnapshot) {
                admin.disableTable(physicalName);
                admin.restoreSnapshot(snapshotName, false);
                admin.enableTable(physicalName);
                String msg = "Restored snapshot of " + physicalName + " due to failure of upgrade";
                System.out.println(msg);
                logger.info(msg);
            }
            // Also true when no restore was needed; only a failed restore keeps the snapshot around
            restored = true;
        } catch (Exception e) {
            logger.warn("Unable to restoring snapshot " + snapshotName + " after failed upgrade", e);
        } finally {
            try {
                if (restoreSnapshot && restored) {
                    admin.deleteSnapshot(snapshotName);
                }
            } catch (Exception e) {
                logger.warn("Unable to delete snapshot " + snapshotName + " after upgrade:", e);
            } finally {
                try {
                    if (admin != null) {
                        admin.close();
                    }
                } catch (IOException e) {
                    logger.warn("Unable to close admin after upgrade:", e);
                }
            }
        }
    }
}
/**
 * Tells whether the given table must be rejected for upgrade. Only global (tenant-less) entries of
 * type TABLE whose physical name equals their own name are accepted.
 *
 * @param table the table to check
 * @return true if the table is not a valid upgrade target
 */
private static boolean isInvalidTableToUpgrade(PTable table) throws SQLException {
    // Must be a table
    if (table.getType() != PTableType.TABLE) {
        return true;
    }
    // Must be global
    if (table.getTenantId() != null) {
        return true;
    }
    // Must be the physical table
    return !table.getPhysicalName().equals(table.getName());
}
/**
 * Upgrade tables and their indexes due to a bug causing descending row keys to have a row key that
 * prevents them from being sorted correctly (PHOENIX-2067).
 * <p>
 * All names are resolved and validated up front; if any of them is not a global physical table an
 * exception listing the offending names is thrown before anything is upgraded. Each accepted table
 * is then upgraded together with its global indexes, view indexes and local indexes.
 *
 * @param conn connection used to resolve the tables and for catalog updates
 * @param tablesToUpgrade full names of the physical tables to upgrade
 * @param bypassUpgrade true to only mark the metadata as upgraded
 * @throws SQLException if a table cannot be resolved or is not a valid upgrade target
 */
public static void upgradeDescVarLengthRowKeys(PhoenixConnection conn, List<String> tablesToUpgrade, boolean bypassUpgrade) throws SQLException {
    if (tablesToUpgrade.isEmpty()) {
        return;
    }
    List<PTable> accepted = Lists.newArrayListWithExpectedSize(tablesToUpgrade.size());
    List<String> rejected = Lists.newArrayListWithExpectedSize(tablesToUpgrade.size());
    for (String candidateName : tablesToUpgrade) {
        PTable candidate = PhoenixRuntime.getTable(conn, candidateName);
        if (!isInvalidTableToUpgrade(candidate)) {
            accepted.add(candidate);
        } else {
            rejected.add(candidateName);
        }
    }
    if (!rejected.isEmpty()) {
        StringBuilder message = new StringBuilder("Only physical tables should be upgraded as their views and indexes will be updated with them: ");
        for (String rejectedName : rejected) {
            message.append(rejectedName).append(' ');
        }
        throw new SQLException(message.toString());
    }
    PhoenixConnection upgradeConn = new PhoenixConnection(conn, true, true);
    try {
        upgradeConn.setAutoCommit(true);
        for (PTable table : accepted) {
            // The data table itself
            boolean tableUpgraded = !table.rowKeyOrderOptimizable();
            if (tableUpgraded) {
                upgradeDescVarLengthRowKeys(upgradeConn, conn, table.getSchemaName().getString(), table.getTableName().getString(), true, bypassUpgrade);
            }
            // Upgrade global indexes
            boolean globalIndexUpgraded = false;
            for (PTable index : table.getIndexes()) {
                if (index.rowKeyOrderOptimizable() || index.getIndexType() == IndexType.LOCAL) {
                    continue;
                }
                globalIndexUpgraded = true;
                upgradeDescVarLengthRowKeys(upgradeConn, conn, index.getSchemaName().getString(), index.getTableName().getString(), false, bypassUpgrade);
            }
            // Upgrade view indexes
            String sharedViewIndexName = Bytes.toString(MetaDataUtil.getViewIndexPhysicalName(table.getName().getBytes()));
            boolean viewIndexUpgraded = upgradeSharedIndex(upgradeConn, conn, sharedViewIndexName, bypassUpgrade);
            // Upgrade local indexes
            String sharedLocalIndexName = Bytes.toString(MetaDataUtil.getLocalIndexPhysicalName(table.getName().getBytes()));
            boolean localIndexUpgraded = upgradeSharedIndex(upgradeConn, conn, sharedLocalIndexName, bypassUpgrade);
            boolean anythingUpgraded = tableUpgraded || globalIndexUpgraded || viewIndexUpgraded || localIndexUpgraded;
            if (!anythingUpgraded) {
                System.out.println("Upgrade not required for this table or its indexes: " + table.getName().getString());
            }
        }
    } finally {
        upgradeConn.close();
    }
}
/**
 * Upgrade shared indexes by querying for all that are associated with our
 * physical table.
 * <p>
 * Rows are read ordered by tenant id. Whenever a new non-null tenant id is encountered a
 * tenant-specific connection is opened (closing the previous one) and used to resolve the table.
 * Any tenant connection still open is closed before returning, including when an exception is
 * thrown.
 *
 * @param upgradeConn connection used to run the upgrade itself
 * @param globalConn connection used to query the catalog and as template for tenant connections
 * @param physicalName physical name of the shared index table
 * @param bypassUpgrade true to only mark the metadata as upgraded
 * @return true if any upgrades were performed and false otherwise.
 * @throws SQLException if querying the catalog or resolving a table fails
 */
private static boolean upgradeSharedIndex(PhoenixConnection upgradeConn, PhoenixConnection globalConn, String physicalName, boolean bypassUpgrade) throws SQLException {
    String query =
            "SELECT TENANT_ID,TABLE_SCHEM,TABLE_NAME\n" +
            "FROM SYSTEM.CATALOG cat1\n" +
            "WHERE COLUMN_NAME IS NULL\n" +
            "AND COLUMN_FAMILY = '" + physicalName + "'\n" +
            "AND LINK_TYPE = " + LinkType.PHYSICAL_TABLE.getSerializedValue() + "\n" +
            "ORDER BY TENANT_ID";
    String lastTenantId = null;
    // Non-null only while a tenant-specific connection is open
    Connection tenantConn = null;
    String url = globalConn.getURL();
    boolean wasUpgraded = false;
    try (java.sql.Statement stmt = globalConn.createStatement();
            ResultSet rs = stmt.executeQuery(query)) {
        while (rs.next()) {
            String fullTableName = SchemaUtil.getTableName(
                    rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM),
                    rs.getString(PhoenixDatabaseMetaData.TABLE_NAME));
            String tenantId = rs.getString(1);
            if (tenantId != null && !tenantId.equals(lastTenantId)) {
                if (tenantConn != null) {
                    // Drop the reference first so the finally block never closes it a second time
                    Connection previous = tenantConn;
                    tenantConn = null;
                    previous.close();
                }
                // Open tenant-specific connection when we find a new one
                Properties props = new Properties(globalConn.getClientInfo());
                props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
                tenantConn = DriverManager.getConnection(url, props);
                lastTenantId = tenantId;
            }
            Connection conn = tenantConn != null ? tenantConn : globalConn;
            PTable table = PhoenixRuntime.getTable(conn, fullTableName);
            String tableTenantId = table.getTenantId() == null ? null : table.getTenantId().getString();
            if (Objects.equal(lastTenantId, tableTenantId) && !table.rowKeyOrderOptimizable()) {
                upgradeDescVarLengthRowKeys(upgradeConn, globalConn, table.getSchemaName().getString(), table.getTableName().getString(), false, bypassUpgrade);
                wasUpgraded = true;
            }
        }
    } finally {
        if (tenantConn != null) {
            tenantConn.close();
        }
    }
    return wasUpgraded;
}
/**
 * Appends a Put to {@code tableMetadata} that sets the ROW_KEY_ORDER_OPTIMIZABLE cell of the given
 * table header row to true.
 *
 * @param tableMetadata list of mutations the new Put is added to
 * @param tableHeaderRowKey row key of the table header row
 * @param clientTimeStamp timestamp used for the Put
 */
public static void addRowKeyOrderOptimizableCell(List<Mutation> tableMetadata, byte[] tableHeaderRowKey, long clientTimeStamp) {
    Put headerRowPut = new Put(tableHeaderRowKey, clientTimeStamp);
    byte[] optimizableFlag = PBoolean.INSTANCE.toBytes(true);
    headerRowPut.addColumn(
            PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
            MetaDataEndpointImpl.ROW_KEY_ORDER_OPTIMIZABLE_BYTES,
            optimizableFlag);
    tableMetadata.add(headerRowPut);
}
/**
 * Deletes every cell of the stats table if the SYSTEM.STATS header row in the meta table carries a
 * timestamp older than {@code MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0}.
 * <p>
 * A marker cell ({@code UPGRADE_TO_4_7_COLUMN_NAME}) is written with checkAndPut so that only a
 * single client drops the rows. The stats table is then scanned raw with all versions, and a
 * delete marker is issued for each Put cell found.
 *
 * @param metaTable table holding the SYSTEM.STATS header row
 * @param statsTable table whose rows are deleted
 * @return true if this call performed the truncation, false if it was not needed or another
 *         client already claimed it
 * @throws IOException if reading or writing either table fails
 * @throws InterruptedException if a batch of deletes is interrupted
 */
public static boolean truncateStats(HTableInterface metaTable, HTableInterface statsTable)
        throws IOException, InterruptedException {
    byte[] statsTableKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME,
            PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE);
    List<Cell> columnCells = metaTable.get(new Get(statsTableKey))
            .getColumnCells(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
    long timestamp;
    if (!columnCells.isEmpty() && (timestamp = columnCells.get(0)
            .getTimestamp()) < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) {
        KeyValue upgradeKV = KeyValueUtil.newKeyValue(statsTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                UPGRADE_TO_4_7_COLUMN_NAME, timestamp, PBoolean.INSTANCE.toBytes(true));
        Put upgradePut = new Put(statsTableKey);
        upgradePut.add(upgradeKV);
        // check for null in UPGRADE_TO_4_7_COLUMN_NAME in checkAndPut so that only single client
        // drop the rows of SYSTEM.STATS
        if (metaTable.checkAndPut(statsTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                UPGRADE_TO_4_7_COLUMN_NAME, null, upgradePut)) {
            List<Mutation> mutations = Lists.newArrayListWithExpectedSize(1000);
            Scan scan = new Scan();
            scan.setRaw(true);
            scan.setMaxVersions();
            ResultScanner statsScanner = statsTable.getScanner(scan);
            try {
                Result r;
                int count = 0;
                while ((r = statsScanner.next()) != null) {
                    Delete delete = null;
                    for (KeyValue keyValue : r.raw()) {
                        // Only Put cells need a matching delete marker
                        if (KeyValue.Type.codeToType(keyValue.getType()) == KeyValue.Type.Put) {
                            if (delete == null) {
                                delete = new Delete(keyValue.getRow());
                            }
                            KeyValue deleteKeyValue = new KeyValue(keyValue.getRowArray(), keyValue.getRowOffset(),
                                    keyValue.getRowLength(), keyValue.getFamilyArray(), keyValue.getFamilyOffset(),
                                    keyValue.getFamilyLength(), keyValue.getQualifierArray(),
                                    keyValue.getQualifierOffset(), keyValue.getQualifierLength(),
                                    keyValue.getTimestamp(), KeyValue.Type.Delete, ByteUtil.EMPTY_BYTE_ARRAY, 0, 0);
                            delete.addDeleteMarker(deleteKeyValue);
                        }
                    }
                    if (delete != null) {
                        mutations.add(delete);
                        // Flush the pending deletes in small batches
                        if (count > 10) {
                            statsTable.batch(mutations);
                            mutations.clear();
                            count = 0;
                        }
                        count++;
                    }
                }
                if (!mutations.isEmpty()) {
                    statsTable.batch(mutations);
                }
            } finally {
                // Release the scanner even if a scan or batch call fails
                statsScanner.close();
            }
            return true;
        }
    }
    return false;
}
/**
 * Moves a physical HBase table to its namespace-mapped name (when required) and flags the Phoenix
 * table as namespace mapped in the meta table.
 * <p>
 * The physical move is only attempted when the source table exists, the type is TABLE, INDEX or
 * SYSTEM, and the destination table does not exist yet. It is done by disabling the source table,
 * snapshotting it, cloning the snapshot into the destination, and then deleting the source table
 * and the snapshot.
 *
 * @param admin HBase admin used for the existence checks and the snapshot based move
 * @param metatable table holding the Phoenix metadata row of {@code phoenixTableName}
 * @param srcTableName current physical table name; normalized before use
 * @param destTableName physical table name to move the data to
 * @param props properties consulted to check whether namespace mapping is enabled
 * @param ts timestamp for the metadata update; when {@code null} the timestamp of the table row's
 *            empty column cell is used
 * @param phoenixTableName full Phoenix name of the table whose metadata row is updated
 * @param pTableType type of the table being mapped
 * @param tenantId tenant owning the table, or {@code null}
 * @throws IllegalArgumentException if namespace mapping is not enabled for {@code pTableType}, or if
 *             {@code ts} is {@code null}, no timestamp can be derived from the meta table and the
 *             table is not a SYSTEM table
 */
private static void mapTableToNamespace(HBaseAdmin admin, HTableInterface metatable, String srcTableName,
        String destTableName, ReadOnlyProps props, Long ts, String phoenixTableName, PTableType pTableType,
        PName tenantId) throws SnapshotCreationException, IllegalArgumentException, IOException,
        InterruptedException, SQLException {
    srcTableName = SchemaUtil.normalizeIdentifier(srcTableName);
    if (!SchemaUtil.isNamespaceMappingEnabled(pTableType, props)) {
        throw new IllegalArgumentException(SchemaUtil.isSystemTable(srcTableName.getBytes())
                ? "For system table " + QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE
                        + " also needs to be enabled along with " + QueryServices.IS_NAMESPACE_MAPPING_ENABLED
                : QueryServices.IS_NAMESPACE_MAPPING_ENABLED + " is not enabled");
    }
    boolean srcTableExists = admin.tableExists(srcTableName);
    // we need to move physical table in actual namespace for TABLE and Index
    if (srcTableExists && (PTableType.TABLE.equals(pTableType) || PTableType.INDEX.equals(pTableType)
            || PTableType.SYSTEM.equals(pTableType))) {
        boolean destTableExists = admin.tableExists(destTableName);
        if (!destTableExists) {
            String snapshotName = QueryConstants.UPGRADE_TABLE_SNAPSHOT_PREFIX + srcTableName;
            logger.info("Disabling table " + srcTableName + " ..");
            admin.disableTable(srcTableName);
            logger.info(String.format("Taking snapshot %s of table %s..", snapshotName, srcTableName));
            admin.snapshot(snapshotName, srcTableName);
            logger.info(
                    String.format("Restoring snapshot %s in destination table %s..", snapshotName, destTableName));
            admin.cloneSnapshot(Bytes.toBytes(snapshotName), Bytes.toBytes(destTableName));
            logger.info(String.format("deleting old table %s..", srcTableName));
            admin.deleteTable(srcTableName);
            logger.info(String.format("deleting snapshot %s..", snapshotName));
            admin.deleteSnapshot(snapshotName);
        }
    }
    byte[] tableKey = SchemaUtil.getTableKey(tenantId != null ? tenantId.getString() : null,
            SchemaUtil.getSchemaNameFromFullName(phoenixTableName),
            SchemaUtil.getTableNameFromFullName(phoenixTableName));
    List<Cell> columnCells = metatable.get(new Get(tableKey))
            .getColumnCells(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
    if (ts == null) {
        if (!columnCells.isEmpty()) {
            // Fall back to the timestamp of the existing table row.
            ts = columnCells.get(0).getTimestamp();
        } else if (PTableType.SYSTEM != pTableType) {
            // Render the row key readably; concatenating a byte[] would only print its identity hash.
            throw new IllegalArgumentException("Timestamp passed is null and cannot derive timestamp for "
                    + Bytes.toStringBinary(tableKey) + " from meta table!!");
        }
    }
    if (ts != null) {
        // Update flag to represent table is mapped to namespace
        logger.info(String.format("Updating meta information of phoenix table '%s' to map to namespace..",
                phoenixTableName));
        Put put = new Put(tableKey, ts);
        put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED_BYTES,
                PBoolean.INSTANCE.toBytes(Boolean.TRUE));
        metatable.put(put);
    }
}
/*
* Method to map an existing phoenix table to a namespace. Should not be used if the table has views or indexes;
* instead use the map table utility in psql.py
*/
public static void mapTableToNamespace(HBaseAdmin admin, HTableInterface metatable, String tableName,
        ReadOnlyProps props, Long ts, PTableType pTableType, PName tenantId) throws SnapshotCreationException,
        IllegalArgumentException, IOException, InterruptedException, SQLException {
    // Work out the physical name the table gets under the supplied properties.
    String physicalName = SchemaUtil.getPhysicalTableName(tableName, props).getNameAsString();
    String normalizedDestName = SchemaUtil.normalizeIdentifier(physicalName);
    // The given name is used both as the source physical table and as the Phoenix table name.
    mapTableToNamespace(admin, metatable, tableName, normalizedDestName, props, ts, tableName, pTableType,
            tenantId);
}
/**
 * Upgrades a Phoenix table or view, together with its indexes, so that it is mapped to a namespace.
 * <p>
 * The schema is created if it does not exist, the data table is mapped, the cached table is
 * refreshed, and then every index that is not yet migrated is mapped and has its link updated.
 * For views the link of the view itself is updated as well.
 *
 * @param conn connection to use; it must not have a schema set
 * @param srcTable full name of the table or view to upgrade; normalized before use
 * @throws IllegalArgumentException if the connection has a schema set, namespace mapping is not
 *             enabled, the table has no schema name (and is not a view), or the table is already
 *             namespace mapped
 * @throws TableNotFoundException if the table cannot be reloaded after the data table was mapped
 * @throws RuntimeException if the reloaded table is still not flagged as namespace mapped
 */
public static void upgradeTable(PhoenixConnection conn, String srcTable) throws SQLException,
        SnapshotCreationException, IllegalArgumentException, IOException, InterruptedException {
    ReadOnlyProps readOnlyProps = conn.getQueryServices().getProps();
    if (conn.getSchema() != null) {
        throw new IllegalArgumentException("Schema should not be set for connection!!");
    }
    if (!SchemaUtil.isNamespaceMappingEnabled(PTableType.TABLE, readOnlyProps)) {
        throw new IllegalArgumentException(QueryServices.IS_NAMESPACE_MAPPING_ENABLED + " is not enabled!!");
    }
    try (HBaseAdmin admin = conn.getQueryServices().getAdmin();
            HTableInterface metatable = conn.getQueryServices()
                    .getTable(SchemaUtil
                            .getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, readOnlyProps)
                            .getName());) {
        String tableName = SchemaUtil.normalizeIdentifier(srcTable);
        String schemaName = SchemaUtil.getSchemaNameFromFullName(tableName);
        // Confirm table is not already upgraded
        PTable table = PhoenixRuntime.getTable(conn, tableName);
        // Upgrade is not required if schemaName is not present.
        if (schemaName.equals("") && !PTableType.VIEW.equals(table.getType())) {
            throw new IllegalArgumentException("Table doesn't have schema name");
        }
        if (table.isNamespaceMapped()) {
            throw new IllegalArgumentException("Table is already upgraded");
        }
        if (!schemaName.equals("")) {
            logger.info(String.format("Creating schema %s..", schemaName));
            // Close the statement once the DDL has run instead of leaking it.
            try (java.sql.Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
            }
        }
        String oldPhysicalName = table.getPhysicalName().getString();
        String newPhysicalTablename = SchemaUtil.normalizeIdentifier(
                SchemaUtil.getPhysicalTableName(oldPhysicalName, readOnlyProps).getNameAsString());
        logger.info(String.format("Upgrading %s %s..", table.getType(), tableName));
        // Upgrade the data or main table
        mapTableToNamespace(admin, metatable, tableName, newPhysicalTablename, readOnlyProps,
                PhoenixRuntime.getCurrentScn(readOnlyProps), tableName, table.getType(), conn.getTenantId());
        // clear the cache and get new table
        conn.getQueryServices().clearTableFromCache(
                conn.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY : conn.getTenantId().getBytes(),
                table.getSchemaName().getBytes(), table.getTableName().getBytes(),
                PhoenixRuntime.getCurrentScn(readOnlyProps));
        MetaDataMutationResult result = new MetaDataClient(conn).updateCache(conn.getTenantId(), schemaName,
                SchemaUtil.getTableNameFromFullName(tableName), true);
        if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
            throw new TableNotFoundException(tableName);
        }
        table = result.getTable();
        // check whether table is properly upgraded before upgrading indexes
        if (table.isNamespaceMapped()) {
            for (PTable index : table.getIndexes()) {
                String srcTableName = index.getPhysicalName().getString();
                String destTableName;
                String phoenixTableName = index.getName().getString();
                if (srcTableName.contains(QueryConstants.NAMESPACE_SEPARATOR)) {
                    // Skip already migrated
                    logger.info(String.format("skipping as it seems index '%s' is already upgraded..",
                            index.getName()));
                    continue;
                }
                if (MetaDataUtil.isLocalIndex(srcTableName)) {
                    logger.info(String.format("local index '%s' found with physical hbase table name '%s'..",
                            index.getName(), srcTableName));
                    destTableName = Bytes
                            .toString(MetaDataUtil.getLocalIndexPhysicalName(newPhysicalTablename.getBytes()));
                    // update parent_table property in local index table descriptor
                    try (java.sql.Statement stmt = conn.createStatement()) {
                        stmt.execute(String.format("ALTER TABLE %s set " + MetaDataUtil.PARENT_TABLE_KEY + "='%s'",
                                phoenixTableName, table.getPhysicalName()));
                    }
                } else if (MetaDataUtil.isViewIndex(srcTableName)) {
                    logger.info(String.format("View index '%s' found with physical hbase table name '%s'..",
                            index.getName(), srcTableName));
                    destTableName = Bytes
                            .toString(MetaDataUtil.getViewIndexPhysicalName(newPhysicalTablename.getBytes()));
                } else {
                    logger.info(String.format("Global index '%s' found with physical hbase table name '%s'..",
                            index.getName(), srcTableName));
                    destTableName = SchemaUtil
                            .getPhysicalTableName(index.getPhysicalName().getString(), readOnlyProps)
                            .getNameAsString();
                }
                logger.info(String.format("Upgrading index %s..", index.getName()));
                // Skipped only when upgrading a view whose index is neither a view index nor a local index.
                if (!(table.getType() == PTableType.VIEW && !MetaDataUtil.isViewIndex(srcTableName)
                        && IndexType.LOCAL != index.getIndexType())) {
                    mapTableToNamespace(admin, metatable, srcTableName, destTableName, readOnlyProps,
                            PhoenixRuntime.getCurrentScn(readOnlyProps), phoenixTableName, index.getType(),
                            conn.getTenantId());
                }
                logger.info(String.format("Updating link information for index '%s' ..", index.getName()));
                updateLink(conn, srcTableName, destTableName, index.getSchemaName(), index.getTableName());
                conn.commit();
                conn.getQueryServices().clearTableFromCache(
                        conn.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY : conn.getTenantId().getBytes(),
                        index.getSchemaName().getBytes(), index.getTableName().getBytes(),
                        PhoenixRuntime.getCurrentScn(readOnlyProps));
            }
            updateIndexesSequenceIfPresent(conn, table);
            conn.commit();
        } else {
            throw new RuntimeException("Error: problem occured during upgrade. Table is not upgraded successfully");
        }
        if (table.getType() == PTableType.VIEW) {
            updateLink(conn, oldPhysicalName, newPhysicalTablename, table.getSchemaName(), table.getTableName());
            conn.commit();
        }
    }
}
/**
 * Copies the view index sequence rows of a data table from the old sequence schema name to the
 * new (namespace mapped) schema and sequence name, then deletes the old sequences.
 *
 * @param connection connection used to run the upsert and the delete
 * @param dataTable data table whose physical name is already in the namespace mapped format
 * @throws SQLException if the upsert or the delete of the old sequences fails
 */
private static void updateIndexesSequenceIfPresent(PhoenixConnection connection, PTable dataTable)
        throws SQLException {
    PName tenantId = connection.getTenantId();
    PName physicalName = dataTable.getPhysicalName();
    // Rebuild the pre-upgrade physical name by swapping the namespace separator for the name separator.
    PName oldPhysicalName = PNameFactory.newName(
            physicalName.toString().replace(QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR));
    String oldSchemaName = MetaDataUtil.getViewIndexSequenceSchemaName(oldPhysicalName, false);
    String newSchemaName = MetaDataUtil.getViewIndexSequenceSchemaName(physicalName, true);
    String newSequenceName = MetaDataUtil.getViewIndexSequenceName(physicalName, tenantId, true);
    // create new entry with new schema format
    // NOTE(review): the statement is assembled by concatenation from names derived from table metadata.
    String upsert = "UPSERT INTO " + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE + " SELECT REGEXP_SPLIT("
            + PhoenixDatabaseMetaData.SEQUENCE_NAME + ",'_')[3] ,\'" + newSchemaName + "\',\'" + newSequenceName
            + "\'," + START_WITH + "," + CURRENT_VALUE + "," + INCREMENT_BY + "," + CACHE_SIZE + "," + MIN_VALUE
            + "," + MAX_VALUE + "," + CYCLE_FLAG + "," + LIMIT_REACHED_FLAG + " FROM "
            + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE + " WHERE " + PhoenixDatabaseMetaData.TENANT_ID
            + " IS NULL AND " + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + " = '" + oldSchemaName + "'";
    // Close the statement once the upsert has been issued instead of leaking it.
    try (java.sql.Statement stmt = connection.createStatement()) {
        stmt.executeUpdate(upsert);
    }
    // delete old sequence
    MetaDataUtil.deleteViewIndexSequences(connection, oldPhysicalName, false);
}
/**
 * Runs the {@code UPDATE_LINK} statement (formatted with the destination table name) followed by
 * the {@code DELETE_LINK} statement for the given table, binding the same four parameters to both.
 *
 * @param conn connection on which both statements are executed; this method does not commit
 * @param srcTableName old physical table name, bound as the fourth parameter
 * @param destTableName new physical table name, substituted into {@code UPDATE_LINK}
 * @param schemaName schema of the table, bound as the first and second parameter
 * @param tableName name of the table, bound as the third parameter
 * @throws SQLException if preparing or executing either statement fails
 */
private static void updateLink(PhoenixConnection conn, String srcTableName, String destTableName, PName schemaName, PName tableName)
        throws SQLException {
    // try-with-resources ensures both prepared statements are closed even when execution fails.
    try (PreparedStatement updateLinkStatment = conn.prepareStatement(String.format(UPDATE_LINK, destTableName))) {
        updateLinkStatment.setString(1, schemaName.getString());
        updateLinkStatment.setString(2, schemaName.getString());
        updateLinkStatment.setString(3, tableName.getString());
        updateLinkStatment.setString(4, srcTableName);
        updateLinkStatment.execute();
    }
    try (PreparedStatement deleteLinkStatment = conn.prepareStatement(DELETE_LINK)) {
        deleteLinkStatment.setString(1, schemaName.getString());
        deleteLinkStatment.setString(2, schemaName.getString());
        deleteLinkStatment.setString(3, tableName.getString());
        deleteLinkStatment.setString(4, srcTableName);
        deleteLinkStatment.execute();
    }
}
/**
 * Upgrades every view returned by {@code GET_VIEWS_QUERY} for the given table, switching to a
 * connection for the view's tenant whenever the tenant id changes between consecutive rows.
 * <p>
 * Connections opened by this method are closed before it returns; the connection passed in is
 * left open. Note that {@code props} is modified: the tenant id attribute is set or removed as
 * tenants change.
 *
 * @param conn connection used to query the views and to upgrade views while the tenant id is unchanged
 * @param table name of the table whose views are upgraded; normalized before use
 * @param props properties used to open the per-tenant connections
 * @throws SQLException if querying the views, opening a connection or upgrading a view fails
 */
public static void mapChildViewsToNamespace(PhoenixConnection conn, String table, Properties props)
        throws SQLException, SnapshotCreationException, IllegalArgumentException, IOException,
        InterruptedException {
    PhoenixConnection passedConn = conn;
    try (PreparedStatement preparedStatment = conn.prepareStatement(GET_VIEWS_QUERY)) {
        preparedStatment.setString(1, SchemaUtil.normalizeIdentifier(table));
        try (ResultSet rs = preparedStatment.executeQuery()) {
            String prevTenantId = null;
            while (rs.next()) {
                String tenantId = rs.getString(1);
                // Compare by value (null-safe); a reference comparison would treat equal ids as different.
                boolean tenantChanged = tenantId == null ? prevTenantId != null : !tenantId.equals(prevTenantId);
                if (tenantChanged) {
                    if (tenantId != null) {
                        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
                    } else {
                        props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
                    }
                    // Read the URL before closing so a closed connection is never queried.
                    String url = conn.getURL();
                    if (passedConn != conn) {
                        conn.close();
                        conn = passedConn;
                    }
                    conn = DriverManager.getConnection(url, props).unwrap(PhoenixConnection.class);
                }
                String viewName = SchemaUtil.getTableName(rs.getString(2), rs.getString(3));
                logger.info(String.format("Upgrading view %s for tenantId %s..", viewName, tenantId));
                UpgradeUtil.upgradeTable(conn, viewName);
                prevTenantId = tenantId;
            }
        }
    } finally {
        // Release the last per-tenant connection, even when an upgrade failed part-way.
        if (passedConn != conn) {
            conn.close();
        }
    }
}
/**
 * Builds the snapshot name in the form
 * {@code SNAPSHOT_<catalog>_<fromVersion>_TO_<clientVersion>_<yyyyMMddHHmmss>}.
 *
 * @param currentSystemTableTimestamp timestamp passed to {@code getVersion} to obtain the version
 *            being upgraded from
 * @return the snapshot name, suffixed with the current time
 */
public static final String getSysCatalogSnapshotName(long currentSystemTableTimestamp) {
    // Time suffix at second granularity, taken from the current wall-clock time.
    String timeSuffix = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System.currentTimeMillis()));
    String fromVersion = getVersion(currentSystemTableTimestamp);
    StringBuilder snapshotName = new StringBuilder("SNAPSHOT_");
    snapshotName.append(SYSTEM_CATALOG_NAME).append("_").append(fromVersion);
    snapshotName.append("_TO_").append(CURRENT_CLIENT_VERSION).append("_").append(timeSuffix);
    return snapshotName.toString();
}
/**
 * Tells whether the {@code DO_NOT_UPGRADE} property is set to {@code "true"} (ignoring case).
 *
 * @param props properties to inspect
 * @return {@code true} only if the property is present and parses as boolean true
 */
public static boolean isNoUpgradeSet(Properties props) {
    String flagValue = props.getProperty(DO_NOT_UPGRADE);
    // A missing property yields null, which parses as false.
    return Boolean.parseBoolean(flagValue);
}
/**
 * Sets the {@code DO_NOT_UPGRADE} property to {@code "true"} on the given properties.
 *
 * @param props properties to modify in place
 */
public static void doNotUpgradeOnFirstConnection(Properties props) {
    String enabled = Boolean.toString(true);
    props.setProperty(DO_NOT_UPGRADE, enabled);
}
}