// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.compat;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.ACCESSTYPE_NONE;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.ACCESSTYPE_READONLY;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.ACCESSTYPE_READWRITE;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.ACCESSTYPE_WRITEONLY;
import static org.apache.impala.service.MetadataOp.TABLE_TYPE_TABLE;
import static org.apache.impala.service.MetadataOp.TABLE_TYPE_VIEW;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.LockRequestBuilder;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidInputException;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.metastore.messaging.MessageSerializer;
import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hive.service.rpc.thrift.TGetColumnsReq;
import org.apache.hive.service.rpc.thrift.TGetFunctionsReq;
import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
import org.apache.hive.service.rpc.thrift.TGetTablesReq;
import org.apache.impala.authorization.User;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.Pair;
import org.apache.impala.common.TransactionException;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.Frontend;
import org.apache.impala.service.MetadataOp;
import org.apache.impala.thrift.TMetadataOpRequest;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.AcidUtils.TblTransaction;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
/**
* A wrapper around some of Hive's Metastore APIs to abstract away differences
* between major versions of Hive. This implements the shimmed methods for Hive 3.
*/
public class MetastoreShim {
private static final Logger LOG = Logger.getLogger(MetastoreShim.class);
private static final String EXTWRITE = "EXTWRITE";
private static final String EXTREAD = "EXTREAD";
private static final String HIVEBUCKET2 = "HIVEBUCKET2";
private static final String HIVEFULLACIDREAD = "HIVEFULLACIDREAD";
private static final String HIVEFULLACIDWRITE = "HIVEFULLACIDWRITE";
private static final String HIVEMANAGEDINSERTREAD = "HIVEMANAGEDINSERTREAD";
private static final String HIVEMANAGEDINSERTWRITE = "HIVEMANAGEDINSERTWRITE";
private static final String HIVEMANAGESTATS = "HIVEMANAGESTATS";
// Materialized View
private static final String HIVEMQT = "HIVEMQT";
// Virtual View
private static final String HIVESQL = "HIVESQL";
private static final long MAJOR_VERSION = 3;
private static boolean capabilitiesSet_ = false;
// Number of retries to acquire an HMS ACID lock.
private static final int LOCK_RETRIES = 10;
// Time interval in seconds between retries of acquiring an HMS ACID lock.
private static final int LOCK_RETRY_WAIT_SECONDS = 3;
private final static String HMS_RPC_ERROR_FORMAT_STR =
"Error making '%s' RPC to Hive Metastore: ";
// Id used to register transactions / locks.
// Not final, as it makes sense to set it based on role + instance, see IMPALA-8853.
public static String TRANSACTION_USER_ID = "Impala";
/**
* Initializes and returns a TblTransaction object for table 'tbl'.
* Opens a new transaction if txnId is not valid.
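*
* A minimal usage sketch ('client' and 'tbl' are assumed to be valid handles;
* passing a non-positive txnId opens a new transaction owned by the returned
* object):
* <pre>{@code
* TblTransaction tblTxn = MetastoreShim.createTblTransaction(client, tbl, -1);
* try {
*   MetastoreShim.alterTableWithTransaction(client, tbl, tblTxn);
*   MetastoreShim.commitTblTransactionIfNeeded(client, tblTxn);
* } catch (ImpalaException e) {
*   MetastoreShim.abortTblTransactionIfNeeded(client, tblTxn);
*   throw e;
* }
* }</pre>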
*/
public static TblTransaction createTblTransaction(
IMetaStoreClient client, Table tbl, long txnId)
throws TransactionException {
TblTransaction tblTxn = new TblTransaction();
try {
if (txnId <= 0) {
txnId = openTransaction(client);
tblTxn.ownsTxn = true;
}
tblTxn.txnId = txnId;
tblTxn.writeId =
allocateTableWriteId(client, txnId, tbl.getDbName(), tbl.getTableName());
tblTxn.validWriteIds =
getValidWriteIdListInTxn(client, tbl.getDbName(), tbl.getTableName(), txnId);
return tblTxn;
} catch (TException e) {
if (tblTxn.ownsTxn) abortTransactionNoThrow(client, tblTxn.txnId);
throw new TransactionException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "createTblTransaction"), e);
}
}
public static void commitTblTransactionIfNeeded(IMetaStoreClient client,
TblTransaction tblTxn) throws TransactionException {
if (tblTxn.ownsTxn) commitTransaction(client, tblTxn.txnId);
}
public static void abortTblTransactionIfNeeded(IMetaStoreClient client,
TblTransaction tblTxn) {
if (tblTxn.ownsTxn) abortTransactionNoThrow(client, tblTxn.txnId);
}
/**
* Constant storing the engine value needed to store / access Impala column
* statistics.
*/
protected static final String IMPALA_ENGINE = "impala";
/**
* Wrapper around MetaStoreUtils.validateName() to deal with added arguments.
*/
public static boolean validateName(String name) {
return MetaStoreUtils.validateName(name, null);
}
/**
* Wrapper around IMetaStoreClient.alter_table with validWriteIds as a param.
*/
public static void alterTableWithTransaction(IMetaStoreClient client,
Table tbl, TblTransaction tblTxn)
throws ImpalaRuntimeException {
tbl.setWriteId(tblTxn.writeId);
try {
client.alter_table(null, tbl.getDbName(), tbl.getTableName(),
tbl, null, tblTxn.validWriteIds);
} catch (TException e) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_table"), e);
}
}
/**
* Wrapper around IMetaStoreClient.alter_partition() to deal with added
* arguments.
*/
public static void alterPartition(IMetaStoreClient client, Partition partition)
throws InvalidOperationException, MetaException, TException {
client.alter_partition(
partition.getDbName(), partition.getTableName(), partition, null);
}
/**
* Wrapper around IMetaStoreClient.alter_partitions() to deal with added
* arguments.
*/
public static void alterPartitions(IMetaStoreClient client, String dbName,
String tableName, List<Partition> partitions)
throws InvalidOperationException, MetaException, TException {
client.alter_partitions(dbName, tableName, partitions, null);
}
/**
* Wrapper around IMetaStoreClient.createTableWithConstraints() to deal with added
* arguments. Hive 3 added four new arguments: uniqueConstraints,
* notNullConstraints, defaultConstraints, and checkConstraints.
*/
public static void createTableWithConstraints(IMetaStoreClient client,
Table newTbl, List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys)
throws InvalidOperationException, MetaException, TException {
client.createTableWithConstraints(newTbl, primaryKeys, foreignKeys, null, null,
null, null);
}
/**
* Wrapper around IMetaStoreClient.alter_partitions with transaction information
*/
public static void alterPartitionsWithTransaction(IMetaStoreClient client,
String dbName, String tblName, List<Partition> partitions, TblTransaction tblTxn
) throws InvalidOperationException, MetaException, TException {
for (Partition part : partitions) {
part.setWriteId(tblTxn.writeId);
}
// A correct validWriteIdList is needed
// to commit the alter partitions operation on the HMS side.
client.alter_partitions(dbName, tblName, partitions, null,
tblTxn.validWriteIds, tblTxn.writeId);
}
/**
* Wrapper around IMetaStoreClient.getTableColumnStatistics() to deal with added
* arguments.
*/
public static List<ColumnStatisticsObj> getTableColumnStatistics(
IMetaStoreClient client, String dbName, String tableName, List<String> colNames)
throws NoSuchObjectException, MetaException, TException {
return client.getTableColumnStatistics(dbName, tableName, colNames,
/*engine*/IMPALA_ENGINE);
}
/**
* Wrapper around IMetaStoreClient.deleteTableColumnStatistics() to deal with added
* arguments.
*/
public static boolean deleteTableColumnStatistics(IMetaStoreClient client,
String dbName, String tableName, String colName)
throws NoSuchObjectException, MetaException, InvalidObjectException, TException,
InvalidInputException {
return client.deleteTableColumnStatistics(dbName, tableName, colName,
/*engine*/IMPALA_ENGINE);
}
/**
* Wrapper around ColumnStatistics c'tor to deal with the added engine property.
*/
public static ColumnStatistics createNewHiveColStats() {
ColumnStatistics colStats = new ColumnStatistics();
colStats.setEngine(IMPALA_ENGINE);
return colStats;
}
/**
* Wrapper around MetaStoreUtils.updatePartitionStatsFast() to deal with added
* arguments.
*/
public static void updatePartitionStatsFast(Partition partition, Table tbl,
Warehouse warehouse) throws MetaException {
MetaStoreUtils.updatePartitionStatsFast(partition, tbl, warehouse, /*madeDir*/false,
/*forceRecompute*/false,
/*environmentContext*/null, /*isCreate*/false);
}
/**
* Return the maximum number of Metastore objects that should be retrieved in
* a batch.
*/
public static String metastoreBatchRetrieveObjectsMaxConfigKey() {
return MetastoreConf.ConfVars.BATCH_RETRIEVE_OBJECTS_MAX.toString();
}
/**
* Return the key and value that should be set in the partition parameters to
* mark that the stats were generated automatically by a stats task.
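*
* A minimal sketch of how the pair might be applied ('partition' is a
* placeholder for an HMS Partition object):
* <pre>{@code
* Pair<String, String> p = MetastoreShim.statsGeneratedViaStatsTaskParam();
* partition.getParameters().put(p.first, p.second);
* }</pre>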
*/
public static Pair<String, String> statsGeneratedViaStatsTaskParam() {
return Pair.create(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
}
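/**
* Executes the HiveServer2 GetFunctions metadata operation through Impala's
* frontend.
*/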
public static TResultSet execGetFunctions(
Frontend frontend, TMetadataOpRequest request, User user) throws ImpalaException {
TGetFunctionsReq req = request.getGet_functions_req();
return MetadataOp.getFunctions(
frontend, req.getCatalogName(), req.getSchemaName(), req.getFunctionName(), user);
}
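/**
* Executes the HiveServer2 GetColumns metadata operation through Impala's
* frontend.
*/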
public static TResultSet execGetColumns(
Frontend frontend, TMetadataOpRequest request, User user) throws ImpalaException {
TGetColumnsReq req = request.getGet_columns_req();
return MetadataOp.getColumns(frontend, req.getCatalogName(), req.getSchemaName(),
req.getTableName(), req.getColumnName(), user);
}
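/**
* Executes the HiveServer2 GetTables metadata operation through Impala's
* frontend.
*/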
public static TResultSet execGetTables(
Frontend frontend, TMetadataOpRequest request, User user) throws ImpalaException {
TGetTablesReq req = request.getGet_tables_req();
return MetadataOp.getTables(frontend, req.getCatalogName(), req.getSchemaName(),
req.getTableName(), req.getTableTypes(), user);
}
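/**
* Executes the HiveServer2 GetSchemas metadata operation through Impala's
* frontend.
*/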
public static TResultSet execGetSchemas(
Frontend frontend, TMetadataOpRequest request, User user) throws ImpalaException {
TGetSchemasReq req = request.getGet_schemas_req();
return MetadataOp.getSchemas(
frontend, req.getCatalogName(), req.getSchemaName(), user);
}
/**
* Supported HMS-3 types
*/
public static final EnumSet<TableType> IMPALA_SUPPORTED_TABLE_TYPES = EnumSet
.of(TableType.EXTERNAL_TABLE, TableType.MANAGED_TABLE, TableType.VIRTUAL_VIEW,
TableType.MATERIALIZED_VIEW);
/**
* Mapping between the HMS-3 table types and the Impala table types.
*/
public static final ImmutableMap<String, String> HMS_TO_IMPALA_TYPE =
new ImmutableMap.Builder<String, String>()
.put("EXTERNAL_TABLE", TABLE_TYPE_TABLE)
.put("MANAGED_TABLE", TABLE_TYPE_TABLE)
.put("INDEX_TABLE", TABLE_TYPE_TABLE)
.put("VIRTUAL_VIEW", TABLE_TYPE_VIEW)
.put("MATERIALIZED_VIEW", TABLE_TYPE_VIEW).build();
/**
* Maps Metastore's TableType to Impala's table type. Note that materialized
* views are not supported in Metastore 2.
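* For example, {@code mapToInternalTableType("virtual_view")} returns
* TABLE_TYPE_VIEW, while null or unrecognized type strings fall back to
* TABLE_TYPE_TABLE.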
*/
public static String mapToInternalTableType(String typeStr) {
String defaultTableType = TABLE_TYPE_TABLE;
TableType tType;
if (typeStr == null) return defaultTableType;
try {
tType = TableType.valueOf(typeStr.toUpperCase());
} catch (Exception e) {
return defaultTableType;
}
switch (tType) {
case EXTERNAL_TABLE:
case MANAGED_TABLE:
// Deprecated and removed in Hive-3. TODO: throw an exception?
case INDEX_TABLE:
return TABLE_TYPE_TABLE;
case VIRTUAL_VIEW:
case MATERIALIZED_VIEW:
return TABLE_TYPE_VIEW;
default:
return defaultTableType;
}
}
// Hive-3 has a different class to encode and decode event messages.
private static final MessageEncoder eventMessageEncoder_ =
MessageFactory.getDefaultInstance(MetastoreConf.newMetastoreConf());
/**
* Wrapper method which returns the HMS-3 message deserializer when Impala is
* built against Hive-3.
*/
public static MessageDeserializer getMessageDeserializer() {
return eventMessageEncoder_.getDeserializer();
}
/**
* Wrapper around the HMS-3 message encoder to get the serializer.
*/
public static MessageSerializer getMessageSerializer() {
return eventMessageEncoder_.getSerializer();
}
/**
* Wrapper around FileUtils.makePartName to deal with package relocation in Hive 3.
* This method uses the metastore's FileUtils method instead of the one from
* hive-exec.
* @param partitionColNames the partition column names
* @param values the partition values, in the same order as the column names
* @return the partition name, e.g. "p1=v1/p2=v2"
*/
public static String makePartName(List<String> partitionColNames, List<String> values) {
return FileUtils.makePartName(partitionColNames, values);
}
/**
* Wrapper method around message factory's build alter table message due to added
* arguments in hive 3.
*/
@VisibleForTesting
public static AlterTableMessage buildAlterTableMessage(Table before, Table after,
boolean isTruncateOp, long writeId) {
return MessageBuilder.getInstance().buildAlterTableMessage(before, after,
isTruncateOp, writeId);
}
/**
* Wrapper around HMS-3 message serializer
* @param message the event message to serialize
* @return serialized string to be used in the NotificationEvent's message field
*/
@VisibleForTesting
public static String serializeEventMessage(EventMessage message) {
return getMessageSerializer().serialize(message);
}
/**
* Wrapper method to get the formatted string representing the columns of a
* metastore table. This method changed significantly in Hive-3 compared to
* Hive-2. In order to avoid adding an unnecessary dependency on hive-exec, this
* method copies the source code of this method from Hive-2's MetaDataFormatUtils
* class.
* TODO: In order to avoid this copy, move this code from Hive's ql module to a
* util method in MetastoreUtils in the metastore module.
*/
public static String getAllColumnsInformation(List<FieldSchema> tabCols,
List<FieldSchema> partitionCols, boolean printHeader, boolean isOutputPadded,
boolean showPartColsSeparately) {
return HiveMetadataFormatUtils
.getAllColumnsInformation(tabCols, partitionCols, printHeader, isOutputPadded,
showPartColsSeparately);
}
/**
* Wrapper method around Hive's MetadataFormatUtils.getTableInformation, which has
* changed significantly in Hive-3.
*/
public static String getTableInformation(
org.apache.hadoop.hive.ql.metadata.Table table) {
return HiveMetadataFormatUtils.getTableInformation(table.getTTable(), false);
}
/**
* This method has been copied from Hive's BaseSemanticAnalyzer class and is fairly
* stable now (as of April 2019, the last change was in mid 2016). Copying is
* preferred over adding a dependency on that class, which pulls in a lot of other
* transitive dependencies from hive-exec.
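*
* For example, {@code unescapeSQLString("'\\u0041'")} returns "A", and
* {@code unescapeSQLString("'a\\tb'")} returns "a", a tab character, then "b".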
*/
public static String unescapeSQLString(String stringLiteral) {
Character enclosure = null;
// Some of the strings can be passed in as unicode. For example, the
// delimiter can be passed in as \002 - So, we first check if the
// string is a unicode number, else go back to the old behavior
StringBuilder sb = new StringBuilder(stringLiteral.length());
for (int i = 0; i < stringLiteral.length(); i++) {
char currentChar = stringLiteral.charAt(i);
if (enclosure == null) {
if (currentChar == '\'' || currentChar == '\"') {
enclosure = currentChar;
}
// ignore all other chars outside the enclosure
continue;
}
if (enclosure.equals(currentChar)) {
enclosure = null;
continue;
}
if (currentChar == '\\' && (i + 6 < stringLiteral.length())
&& stringLiteral.charAt(i + 1) == 'u') {
int code = 0;
int base = i + 2;
for (int j = 0; j < 4; j++) {
int digit = Character.digit(stringLiteral.charAt(j + base), 16);
code = (code << 4) + digit;
}
sb.append((char)code);
i += 5;
continue;
}
if (currentChar == '\\' && (i + 4 < stringLiteral.length())) {
char i1 = stringLiteral.charAt(i + 1);
char i2 = stringLiteral.charAt(i + 2);
char i3 = stringLiteral.charAt(i + 3);
if ((i1 >= '0' && i1 <= '1') && (i2 >= '0' && i2 <= '7')
&& (i3 >= '0' && i3 <= '7')) {
byte bVal = (byte) ((i3 - '0') + ((i2 - '0') * 8) + ((i1 - '0') * 8 * 8));
byte[] bValArr = new byte[1];
bValArr[0] = bVal;
String tmp = new String(bValArr);
sb.append(tmp);
i += 3;
continue;
}
}
if (currentChar == '\\' && (i + 2 < stringLiteral.length())) {
char n = stringLiteral.charAt(i + 1);
switch (n) {
case '0':
sb.append("\0");
break;
case '\'':
sb.append("'");
break;
case '"':
sb.append("\"");
break;
case 'b':
sb.append("\b");
break;
case 'n':
sb.append("\n");
break;
case 'r':
sb.append("\r");
break;
case 't':
sb.append("\t");
break;
case 'Z':
sb.append("\u001A");
break;
case '\\':
sb.append("\\");
break;
// The following 2 cases are exactly what MySQL does. TODO: why do we do this?
case '%':
sb.append("\\%");
break;
case '_':
sb.append("\\_");
break;
default:
sb.append(n);
}
i++;
} else {
sb.append(currentChar);
}
}
return sb.toString();
}
/**
* Gets the valid write ids from HMS for the given ACID table.
* @param client the client to access HMS
* @param tableFullName the full name of the table, e.g. "db.tbl"
* @return ValidWriteIdList object
*/
public static ValidWriteIdList fetchValidWriteIds(IMetaStoreClient client,
String tableFullName) throws TException {
return client.getValidWriteIds(tableFullName);
}
/**
* Gets a ValidWriteIdList object from the given string.
* @param validWriteIds serialized ValidWriteIdList in String form
* @return ValidWriteIdList object
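*
* A minimal round-trip sketch using the helpers in this class:
* <pre>{@code
* ValidWriteIdList ids = MetastoreShim.fetchValidWriteIds(client, "db.tbl");
* ValidWriteIdList copy =
*     MetastoreShim.getValidWriteIdListFromString(ids.writeToString());
* }</pre>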
*/
public static ValidWriteIdList getValidWriteIdListFromString(String validWriteIds) {
Preconditions.checkNotNull(validWriteIds);
return new ValidReaderWriteIdList(validWriteIds);
}
/**
* Returns a ValidTxnList object that helps to identify in-progress and aborted
* transactions.
*/
public static ValidTxnList getValidTxns(IMetaStoreClient client) throws TException {
return client.getValidTxns();
}
/**
* Gets the validWriteIds in string form for the given table within the
* transaction identified by txnId.
*/
private static String getValidWriteIdListInTxn(IMetaStoreClient client, String dbName,
String tblName, long txnId)
throws TException {
ValidTxnList txns = client.getValidTxns(txnId);
String tableFullName = dbName + "." + tblName;
List<TableValidWriteIds> writeIdsObj = client.getValidWriteIds(
Lists.newArrayList(tableFullName), txns.toString());
ValidTxnWriteIdList validTxnWriteIdList = new ValidTxnWriteIdList(txnId);
for (TableValidWriteIds tableWriteIds : writeIdsObj) {
validTxnWriteIdList.addTableValidWriteIdList(
createValidReaderWriteIdList(tableWriteIds));
}
String validWriteIds =
validTxnWriteIdList.getTableValidWriteIdList(tableFullName).writeToString();
return validWriteIds;
}
/**
* Wrapper around the HMS Partition object to get its writeId.
* WriteIds were introduced in ACID 2 and are used to detect changes of the
* partition.
*/
public static long getWriteIdFromMSPartition(Partition partition) {
Preconditions.checkNotNull(partition);
return partition.getWriteId();
}
/**
* Wrapper around the HMS Table object to get its writeId.
* Per-table writeIds were introduced in ACID 2 and are used to detect changes
* of the table.
*/
public static long getWriteIdFromMSTable(Table msTbl) {
Preconditions.checkNotNull(msTbl);
return msTbl.getWriteId();
}
/**
* Opens a new transaction. The user id is set to TRANSACTION_USER_ID.
* @param client is the HMS client to be used.
* @return the new transaction id.
* @throws TransactionException
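*
* A minimal usage sketch of the transaction helpers in this class ('client',
* "db" and "tbl" are placeholders):
* <pre>{@code
* long txnId = MetastoreShim.openTransaction(client);
* try {
*   long writeId = MetastoreShim.allocateTableWriteId(client, txnId, "db", "tbl");
*   // ... write data under writeId ...
*   MetastoreShim.commitTransaction(client, txnId);
* } catch (TransactionException e) {
*   MetastoreShim.abortTransactionNoThrow(client, txnId);
*   throw e;
* }
* }</pre>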
*/
public static long openTransaction(IMetaStoreClient client)
throws TransactionException {
try {
return client.openTxn(TRANSACTION_USER_ID);
} catch (Exception e) {
throw new TransactionException(e.getMessage());
}
}
/**
* Commits a transaction.
* @param client is the HMS client to be used.
* @param txnId is the transaction id.
* @throws TransactionException
*/
public static void commitTransaction(IMetaStoreClient client, long txnId)
throws TransactionException {
try {
client.commitTxn(txnId);
} catch (Exception e) {
throw new TransactionException(e.getMessage());
}
}
/**
* Aborts a transaction.
* @param client is the HMS client to be used.
* @param txnId is the transaction id.
* @throws TransactionException
*/
public static void abortTransaction(IMetaStoreClient client, long txnId)
throws TransactionException {
try {
client.abortTxns(Arrays.asList(txnId));
} catch (Exception e) {
throw new TransactionException(e.getMessage());
}
}
/**
* Heartbeats a transaction and/or lock to keep them alive.
* @param client is the HMS client to be used.
* @param txnId is the transaction id.
* @param lockId is the lock id.
* @return true on success, false if the transaction or lock no longer exists.
* @throws TransactionException in case of any other failure.
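*
* A minimal sketch of a heartbeat loop (the interval is a placeholder; it should
* stay well below the HMS transaction timeout):
* <pre>{@code
* while (workInProgress) {
*   if (!MetastoreShim.heartbeat(client, txnId, lockId)) break; // txn/lock gone
*   Thread.sleep(30 * 1000); // InterruptedException handling omitted
* }
* }</pre>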
*/
public static boolean heartbeat(IMetaStoreClient client,
long txnId, long lockId) throws TransactionException {
String errorMsg = "Caught exception during heartbeating transaction " +
String.valueOf(txnId) + " lock " + String.valueOf(lockId);
LOG.info("Sending heartbeat for transaction " + String.valueOf(txnId) +
" lock " + String.valueOf(lockId));
try {
client.heartbeat(txnId, lockId);
} catch (NoSuchLockException e) {
LOG.info(errorMsg, e);
return false;
} catch (NoSuchTxnException e) {
LOG.info(errorMsg, e);
return false;
} catch (TxnAbortedException e) {
LOG.info(errorMsg, e);
return false;
} catch (TException e) {
throw new TransactionException(e.getMessage());
}
return true;
}
/**
* Creates a lock for the given lock components. Returns the acquired lock id;
* this might involve some waiting.
* @param client is the HMS client to be used.
* @param txnId The transaction ID associated with the lock. Zero if the lock doesn't
* belong to a transaction.
* @param lockComponents the lock components to include in this lock.
* @return the lock id
* @throws TransactionException in case of failure
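*
* A minimal sketch (the database and table names are placeholders):
* <pre>{@code
* LockComponent comp =
*     new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "db");
* comp.setTablename("tbl");
* long lockId = MetastoreShim.acquireLock(client, txnId, Lists.newArrayList(comp));
* // ... do the work that required the lock ...
* MetastoreShim.releaseLock(client, lockId);
* }</pre>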
*/
public static long acquireLock(IMetaStoreClient client, long txnId,
List<LockComponent> lockComponents)
throws TransactionException {
LockRequestBuilder lockRequestBuilder = new LockRequestBuilder();
lockRequestBuilder.setUser(TRANSACTION_USER_ID);
if (txnId > 0) lockRequestBuilder.setTransactionId(txnId);
for (LockComponent lockComponent : lockComponents) {
lockRequestBuilder.addLockComponent(lockComponent);
}
LockRequest lockRequest = lockRequestBuilder.build();
try {
LockResponse lockResponse = client.lock(lockRequest);
long lockId = lockResponse.getLockid();
int retries = 0;
while (lockResponse.getState() == LockState.WAITING && retries < LOCK_RETRIES) {
try {
//TODO: add profile counter for lock waits.
LOG.info("Waiting " + String.valueOf(LOCK_RETRY_WAIT_SECONDS) +
" seconds for lock " + String.valueOf(lockId) + " of transaction " +
Long.toString(txnId));
Thread.sleep(LOCK_RETRY_WAIT_SECONDS * 1000);
++retries;
lockResponse = client.checkLock(lockId);
} catch (InterruptedException e) {
// The wait time and number of retries are fixed constants, so it wouldn't
// add much value to make acquireLock() interruptible; we just swallow the
// exception here.
}
}
if (lockResponse.getState() == LockState.ACQUIRED) return lockId;
if (lockId > 0) {
try {
releaseLock(client, lockId);
} catch (TransactionException te) {
LOG.error("Failed to release lock as a cleanup step after acquiring a lock " +
"has failed: " + lockId + " " + te.getMessage());
}
}
throw new TransactionException("Failed to acquire lock for transaction " +
String.valueOf(txnId));
} catch (TException e) {
throw new TransactionException(e.getMessage());
}
}
/**
* Releases a lock in HMS.
* @param client is the HMS client to be used.
* @param lockId is the lock ID to be released.
* @throws TransactionException
*/
public static void releaseLock(IMetaStoreClient client, long lockId)
throws TransactionException {
try {
client.unlock(lockId);
} catch (Exception e) {
throw new TransactionException(e.getMessage());
}
}
/**
* Aborts a transaction and logs the error if there is an exception.
* @param client is the HMS client to be used.
* @param txnId is the transaction id.
*/
public static void abortTransactionNoThrow(IMetaStoreClient client, long txnId) {
try {
client.abortTxns(Arrays.asList(txnId));
} catch (Exception e) {
LOG.error("Error in abortTxns.", e);
}
}
/**
* Allocates a write id for the given table.
* @param client is the HMS client to be used.
* @param txnId is the transaction id.
* @param dbName is the database name.
* @param tableName is the target table name.
* @return the allocated write id.
* @throws TransactionException
*/
public static long allocateTableWriteId(IMetaStoreClient client, long txnId,
String dbName, String tableName) throws TransactionException {
try {
return client.allocateTableWriteId(txnId, dbName, tableName);
} catch (Exception e) {
throw new TransactionException(e.getMessage());
}
}
/**
* Sets Impala capabilities on the Hive client.
* Impala supports:
* - external table read/write
* - insert-only ACID table read/write
* - virtual view read
* - materialized view read
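*
* A minimal usage sketch; this should typically be called once per process
* before any HMS client is created:
* <pre>{@code
* MetastoreShim.setHiveClientCapabilities();
* IMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
* }</pre>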
*/
public static synchronized void setHiveClientCapabilities() {
String hostName;
if (capabilitiesSet_) return;
try {
hostName = InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException ue) {
hostName = "unknown";
}
String buildVersion = BackendConfig.INSTANCE != null ?
BackendConfig.INSTANCE.getImpalaBuildVersion() : String.valueOf(MAJOR_VERSION);
if (buildVersion == null) buildVersion = String.valueOf(MAJOR_VERSION);
String impalaId = String.format("Impala%s@%s", buildVersion, hostName);
String[] capabilities = new String[] {
EXTWRITE, // External table write
EXTREAD, // External table read
HIVEMANAGEDINSERTREAD, // Insert-only table read
HIVEMANAGEDINSERTWRITE, // Insert-only table write
HIVESQL,
HIVEMQT,
HIVEBUCKET2 // Includes the capability to get the correct bucket number.
// Currently, without this capability, for an external bucketed
// table, Hive will return the table as Read-only with bucket
// number -1. It makes clients unable to know it is a bucketed table.
// TODO: will remove this capability when Hive can provide
// API calls to tell the changing of bucket number.
};
HiveMetaStoreClient.setProcessorIdentifier(impalaId);
HiveMetaStoreClient.setProcessorCapabilities(capabilities);
capabilitiesSet_ = true;
}
/**
* Checks if a table has a capability.
* @param msTbl hms table
* @param requiredCapability hive access types or a combination of them
* @return true if the table has the capability
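*
* For example, for a table whose accessType is ACCESSTYPE_READWRITE (binary
* 00001000), the following check passes because the bitwise AND with the
* required mask is non-zero:
* <pre>{@code
* boolean writable = MetastoreShim.hasTableCapability(
*     msTbl, (byte) (ACCESSTYPE_WRITEONLY | ACCESSTYPE_READWRITE));
* }</pre>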
*/
public static boolean hasTableCapability(Table msTbl, byte requiredCapability) {
Preconditions.checkNotNull(msTbl);
// access types in binary:
// ACCESSTYPE_NONE: 00000001
// ACCESSTYPE_READONLY: 00000010
// ACCESSTYPE_WRITEONLY: 00000100
// ACCESSTYPE_READWRITE: 00001000
return requiredCapability != ACCESSTYPE_NONE
&& ((msTbl.getAccessType() & requiredCapability) != 0);
}
/**
* Gets the table access type as a string.
* @param msTbl hms table
* @return the string representing the table access type.
*/
public static String getTableAccessType(Table msTbl) {
Preconditions.checkNotNull(msTbl);
switch (msTbl.getAccessType()) {
case ACCESSTYPE_READONLY:
return "READONLY";
case ACCESSTYPE_WRITEONLY:
return "WRITEONLY";
case ACCESSTYPE_READWRITE:
return "READWRITE";
case ACCESSTYPE_NONE:
default:
return "NONE";
}
}
/**
* Sets the table access type. This is useful for an hms Table object constructed
* for a create table statement. For example, to create a table we need read/write
* capabilities, not the default 0 (not defined).
*/
public static void setTableAccessType(Table msTbl, byte accessType) {
Preconditions.checkNotNull(msTbl);
msTbl.setAccessType(accessType);
}
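/**
* Sets table-level column statistics on a transactional table. The write id and
* validWriteIdList from 'tblTxn' are attached to the request so that HMS accepts
* the update.
*/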
public static void setTableColumnStatsTransactional(IMetaStoreClient client,
Table msTbl, ColumnStatistics colStats, TblTransaction tblTxn)
throws ImpalaRuntimeException {
List<ColumnStatistics> colStatsList = new ArrayList<>();
colStatsList.add(colStats);
SetPartitionsStatsRequest request = new SetPartitionsStatsRequest(colStatsList);
request.setWriteId(tblTxn.writeId);
request.setValidWriteIdList(tblTxn.validWriteIds);
try {
// Despite its name, the function below can (and currently must) be used
// to set table-level column statistics on transactional tables.
client.setPartitionColumnStatistics(request);
} catch (TException e) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "setPartitionColumnStatistics"), e);
}
}
/**
* @return the hive major version
*/
public static long getMajorVersion() {
return MAJOR_VERSION;
}
/**
* Code borrowed from Hive. This assumes that the caller intends to read the
* files, and thus treats both open and aborted write ids as invalid.
* @param tableWriteIds valid write ids for the given table from the metastore
* @return a valid write IDs list for the input table
*/
private static ValidReaderWriteIdList createValidReaderWriteIdList(
TableValidWriteIds tableWriteIds) {
String fullTableName = tableWriteIds.getFullTableName();
long highWater = tableWriteIds.getWriteIdHighWaterMark();
List<Long> invalids = tableWriteIds.getInvalidWriteIds();
BitSet abortedBits = BitSet.valueOf(tableWriteIds.getAbortedBits());
long[] exceptions = new long[invalids.size()];
int i = 0;
for (long writeId : invalids) {
exceptions[i++] = writeId;
}
if (tableWriteIds.isSetMinOpenWriteId()) {
return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits,
highWater, tableWriteIds.getMinOpenWriteId());
} else {
return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits,
highWater);
}
}
/**
* Return the default table path for a new table.
*
* Hive-3 doesn't allow a managed table to be non-transactional after HIVE-22158.
* Creating a non-transactional managed table will ultimately result in an external
* table with the table property "external.table.purge" set to true. As the table
* type becomes EXTERNAL, the location will be under
* "metastore.warehouse.external.dir" (HIVE-19837, introduced in hive-2.7, not in
* hive-2.1.x-cdh6.x yet).
*/
public static String getPathForNewTable(Database db, Table tbl)
throws MetaException {
Warehouse wh = new Warehouse(new HiveConf());
// Non transactional tables are all translated to external tables by HMS's default
// transformer (HIVE-22158). Note that external tables can't be transactional.
// So the request and result of the default transformer is:
// non transactional managed table => external table
// non transactional external table => external table
// transactional managed table => managed table
// transactional external table (not allowed)
boolean isExternal = !AcidUtils.isTransactionalTable(tbl.getParameters());
// TODO(IMPALA-9088): deal with customized transformer in HMS.
return wh.getDefaultTablePath(db, tbl.getTableName().toLowerCase(), isExternal)
.toString();
}
}