blob: 5689a0db2ffe69dc3e95cc0ed68529123b8561a5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.service;
import static org.apache.impala.analysis.Analyzer.ACCESSTYPE_READWRITE;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.PartitionDropOptions;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.io.IOUtils;
import org.apache.impala.analysis.AlterTableSortByStmt;
import org.apache.impala.analysis.FunctionName;
import org.apache.impala.analysis.TableName;
import org.apache.impala.authorization.AuthorizationConfig;
import org.apache.impala.authorization.AuthorizationDelta;
import org.apache.impala.authorization.AuthorizationManager;
import org.apache.impala.authorization.User;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.ColumnNotFoundException;
import org.apache.impala.catalog.ColumnStats;
import org.apache.impala.catalog.DataSource;
import org.apache.impala.catalog.Db;
import org.apache.impala.catalog.FeCatalogUtils;
import org.apache.impala.catalog.FeFsPartition;
import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.HdfsPartition;
import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.HiveStorageDescriptorFactory;
import org.apache.impala.catalog.IncompleteTable;
import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.catalog.PartitionNotFoundException;
import org.apache.impala.catalog.PartitionStatsUtil;
import org.apache.impala.catalog.RowFormat;
import org.apache.impala.catalog.ScalarFunction;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.TableNotFoundException;
import org.apache.impala.catalog.Transaction;
import org.apache.impala.catalog.Type;
import org.apache.impala.catalog.View;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.JniUtil;
import org.apache.impala.common.Pair;
import org.apache.impala.common.Reference;
import org.apache.impala.common.TransactionException;
import org.apache.impala.common.TransactionKeepalive.HeartbeatContext;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.thrift.JniCatalogConstants;
import org.apache.impala.thrift.TAlterDbParams;
import org.apache.impala.thrift.TAlterDbSetOwnerParams;
import org.apache.impala.thrift.TAlterTableAddColsParams;
import org.apache.impala.thrift.TAlterTableAddDropRangePartitionParams;
import org.apache.impala.thrift.TAlterTableAddPartitionParams;
import org.apache.impala.thrift.TAlterTableAlterColParams;
import org.apache.impala.thrift.TAlterTableDropColParams;
import org.apache.impala.thrift.TAlterTableDropPartitionParams;
import org.apache.impala.thrift.TAlterTableOrViewSetOwnerParams;
import org.apache.impala.thrift.TAlterTableParams;
import org.apache.impala.thrift.TAlterTableReplaceColsParams;
import org.apache.impala.thrift.TAlterTableSetCachedParams;
import org.apache.impala.thrift.TAlterTableSetFileFormatParams;
import org.apache.impala.thrift.TAlterTableSetLocationParams;
import org.apache.impala.thrift.TAlterTableSetRowFormatParams;
import org.apache.impala.thrift.TAlterTableSetTblPropertiesParams;
import org.apache.impala.thrift.TAlterTableType;
import org.apache.impala.thrift.TAlterTableUpdateStatsParams;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TCatalogServiceRequestHeader;
import org.apache.impala.thrift.TCatalogUpdateResult;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TColumnName;
import org.apache.impala.thrift.TColumnStats;
import org.apache.impala.thrift.TColumnType;
import org.apache.impala.thrift.TColumnValue;
import org.apache.impala.thrift.TCommentOnParams;
import org.apache.impala.thrift.TCreateDataSourceParams;
import org.apache.impala.thrift.TCreateDbParams;
import org.apache.impala.thrift.TCreateDropRoleParams;
import org.apache.impala.thrift.TCreateFunctionParams;
import org.apache.impala.thrift.TCreateOrAlterViewParams;
import org.apache.impala.thrift.TCreateTableLikeParams;
import org.apache.impala.thrift.TCreateTableParams;
import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TDdlExecRequest;
import org.apache.impala.thrift.TDdlExecResponse;
import org.apache.impala.thrift.TDropDataSourceParams;
import org.apache.impala.thrift.TDropDbParams;
import org.apache.impala.thrift.TDropFunctionParams;
import org.apache.impala.thrift.TDropStatsParams;
import org.apache.impala.thrift.TDropTableOrViewParams;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TFunctionBinaryType;
import org.apache.impala.thrift.TGrantRevokePrivParams;
import org.apache.impala.thrift.TGrantRevokeRoleParams;
import org.apache.impala.thrift.THdfsCachingOp;
import org.apache.impala.thrift.THdfsFileFormat;
import org.apache.impala.thrift.TCopyTestCaseReq;
import org.apache.impala.thrift.TPartitionDef;
import org.apache.impala.thrift.TPartitionKeyValue;
import org.apache.impala.thrift.TPartitionStats;
import org.apache.impala.thrift.TRangePartitionOperationType;
import org.apache.impala.thrift.TResetMetadataRequest;
import org.apache.impala.thrift.TResetMetadataResponse;
import org.apache.impala.thrift.TResultRow;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.thrift.TResultSetMetadata;
import org.apache.impala.thrift.TSortingOrder;
import org.apache.impala.thrift.TStatus;
import org.apache.impala.thrift.TTable;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.thrift.TTableRowFormat;
import org.apache.impala.thrift.TTableStats;
import org.apache.impala.thrift.TTestCaseData;
import org.apache.impala.thrift.TTruncateParams;
import org.apache.impala.thrift.TUpdateCatalogRequest;
import org.apache.impala.thrift.TUpdateCatalogResponse;
import org.apache.impala.util.CompressionUtil;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.AcidUtils.TblTransaction;
import org.apache.impala.util.FunctionUtils;
import org.apache.impala.util.HdfsCachingUtil;
import org.apache.impala.util.KuduUtil;
import org.apache.impala.util.MetaStoreUtil;
import org.slf4j.Logger;
import org.apache.thrift.TException;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.slf4j.LoggerFactory;
/**
* Class used to execute Catalog Operations, including DDL and refresh/invalidate
* metadata requests. Acts as a bridge between the Thrift catalog operation requests
* and the non-thrift Java Catalog objects.
*
* Updates are applied first to the Hive Metastore and only if they succeed, are then
* applied to the catalog objects. To ensure consistency in the presence of failed HMS
* updates, DDL operations should not directly modify the HMS objects of the catalog
* objects but should operate on copies instead.
*
* The CatalogOpExecutor uses table-level locking to protect table metadata during
* concurrent modifications and is responsible for assigning a new catalog version when
* a table is modified (e.g. alterTable()).
*
* The following locking protocol is employed to ensure that modifying
* the table metadata and assigning a new catalog version is performed atomically and
* consistently in the presence of concurrent DDL operations. The following pattern
* ensures that the catalog lock is never held for a long period of time, preventing
* other DDL operations from making progress. This pattern only applies to single-table
* update operations and requires the use of fair table locks to prevent starvation.
*
* DO {
* Acquire the catalog lock (see CatalogServiceCatalog.versionLock_)
* Try to acquire a table lock
* IF the table lock acquisition fails {
* Release the catalog lock
* YIELD()
* ELSE
* BREAK
* } WHILE (TIMEOUT);
*
* If (TIMEOUT) report error
*
* Increment and get a new catalog version
* Release the catalog lock
* Modify table metadata
* Release table lock
*
* Note: The getCatalogObjects() function is the only case where this locking pattern is
* not used since it accesses multiple catalog entities in order to compute a snapshot
* of catalog metadata.
*
* Operations that CREATE/DROP catalog objects such as tables and databases employ the
* following locking protocol:
* 1. Acquire the metastoreDdlLock_
* 2. Update the Hive Metastore
* 3. Increment and get a new catalog version
* 4. Update the catalog
* 5. Grant/revoke owner privilege if authorization with ownership is enabled.
* 6. Release the metastoreDdlLock_
*
* It is imperative that other operations that need to hold both the catalog lock and
* table locks at the same time follow the same locking protocol and acquire these
* locks in that particular order. Also, operations that modify table metadata
* (e.g. alter table statements) should not acquire the metastoreDdlLock_.
*
* TODO: Refactor the CatalogOpExecutor and CatalogServiceCatalog classes and consolidate
* the locking protocol into a single class.
*
* TODO: Improve catalog's consistency guarantees by using a hierarchical locking scheme.
* Currently, only concurrent modidications to table metadata are guaranteed to be
* serialized. Concurrent DDL operations that DROP/ADD catalog objects,
* especially in the presence of INVALIDATE METADATA and REFRESH, are not guaranteed to
* be consistent (see IMPALA-2774).
*
* TODO: Create a Hive Metastore utility class to move code that interacts with the
* metastore out of this class.
*/
public class CatalogOpExecutor {
private static final Logger LOG = LoggerFactory.getLogger(CatalogOpExecutor.class);
// Format string for exceptions returned by Hive Metastore RPCs.
private final static String HMS_RPC_ERROR_FORMAT_STR =
"Error making '%s' RPC to Hive Metastore: ";
// Error string for inconsistent blacklisted dbs/tables configs between catalogd and
// coordinators.
private final static String BLACKLISTED_DBS_INCONSISTENT_ERR_STR =
"--blacklisted_dbs may be inconsistent between catalogd and coordinators";
private final static String BLACKLISTED_TABLES_INCONSISTENT_ERR_STR =
"--blacklisted_tables may be inconsistent between catalogd and coordinators";
// Table capabilities property name
private static final String CAPABILITIES_KEY = "OBJCAPABILITIES";
// Table default capabilities
private static final String ACIDINSERTONLY_CAPABILITIES =
"HIVEMANAGEDINSERTREAD,HIVEMANAGEDINSERTWRITE";
private static final String NONACID_CAPABILITIES = "EXTREAD,EXTWRITE";
// The maximum number of partitions to update in one Hive Metastore RPC.
// Used when persisting the results of COMPUTE STATS statements.
// It is also used as an upper limit for the number of partitions allowed in one ADD
// PARTITION statement.
public final static short MAX_PARTITION_UPDATES_PER_RPC = 500;
private final CatalogServiceCatalog catalog_;
private final AuthorizationConfig authzConfig_;
private final AuthorizationManager authzManager_;
// Lock used to ensure that CREATE[DROP] TABLE[DATABASE] operations performed in
// catalog_ and the corresponding RPC to apply the change in HMS are atomic.
private final Object metastoreDdlLock_ = new Object();
public CatalogOpExecutor(CatalogServiceCatalog catalog, AuthorizationConfig authzConfig,
AuthorizationManager authzManager) throws ImpalaException {
Preconditions.checkNotNull(authzManager);
catalog_ = Preconditions.checkNotNull(catalog);
authzConfig_ = Preconditions.checkNotNull(authzConfig);
authzManager_ = Preconditions.checkNotNull(authzManager);
}
public CatalogServiceCatalog getCatalog() { return catalog_; }
public AuthorizationManager getAuthzManager() { return authzManager_; }
public TDdlExecResponse execDdlRequest(TDdlExecRequest ddlRequest)
throws ImpalaException {
TDdlExecResponse response = new TDdlExecResponse();
response.setResult(new TCatalogUpdateResult());
response.getResult().setCatalog_service_id(JniCatalog.getServiceId());
User requestingUser = null;
if (ddlRequest.isSetHeader()) {
requestingUser = new User(ddlRequest.getHeader().getRequesting_user());
}
boolean syncDdl = ddlRequest.isSync_ddl();
switch (ddlRequest.ddl_type) {
case ALTER_DATABASE:
alterDatabase(ddlRequest.getAlter_db_params(), response);
break;
case ALTER_TABLE:
alterTable(ddlRequest.getAlter_table_params(), response);
break;
case ALTER_VIEW:
alterView(ddlRequest.getAlter_view_params(), response);
break;
case CREATE_DATABASE:
createDatabase(ddlRequest.getCreate_db_params(), response);
break;
case CREATE_TABLE_AS_SELECT:
response.setNew_table_created(createTable(
ddlRequest.getCreate_table_params(), response, syncDdl));
break;
case CREATE_TABLE:
createTable(ddlRequest.getCreate_table_params(), response, syncDdl);
break;
case CREATE_TABLE_LIKE:
createTableLike(ddlRequest.getCreate_table_like_params(), response, syncDdl);
break;
case CREATE_VIEW:
createView(ddlRequest.getCreate_view_params(), response);
break;
case CREATE_FUNCTION:
createFunction(ddlRequest.getCreate_fn_params(), response);
break;
case CREATE_DATA_SOURCE:
createDataSource(ddlRequest.getCreate_data_source_params(), response);
break;
case COMPUTE_STATS:
Preconditions.checkState(false, "Compute stats should trigger an ALTER TABLE.");
break;
case DROP_STATS:
dropStats(ddlRequest.getDrop_stats_params(), response);
break;
case DROP_DATABASE:
dropDatabase(ddlRequest.getDrop_db_params(), response);
break;
case DROP_TABLE:
case DROP_VIEW:
dropTableOrView(ddlRequest.getDrop_table_or_view_params(), response);
break;
case TRUNCATE_TABLE:
truncateTable(ddlRequest.getTruncate_params(), response);
break;
case DROP_FUNCTION:
dropFunction(ddlRequest.getDrop_fn_params(), response);
break;
case DROP_DATA_SOURCE:
dropDataSource(ddlRequest.getDrop_data_source_params(), response);
break;
case CREATE_ROLE:
createRole(requestingUser, ddlRequest.getCreate_drop_role_params(), response);
break;
case DROP_ROLE:
dropRole(requestingUser, ddlRequest.getCreate_drop_role_params(), response);
break;
case GRANT_ROLE:
grantRoleToGroup(requestingUser, ddlRequest.getGrant_revoke_role_params(),
response);
break;
case REVOKE_ROLE:
revokeRoleFromGroup(requestingUser, ddlRequest.getGrant_revoke_role_params(),
response);
break;
case GRANT_PRIVILEGE:
grantPrivilege(ddlRequest.getHeader(), ddlRequest.getGrant_revoke_priv_params(),
response);
break;
case REVOKE_PRIVILEGE:
revokePrivilege(ddlRequest.getHeader(), ddlRequest.getGrant_revoke_priv_params(),
response);
break;
case COMMENT_ON:
alterCommentOn(ddlRequest.getComment_on_params(), response);
break;
case COPY_TESTCASE:
copyTestCaseData(ddlRequest.getCopy_test_case_params(), response);
break;
default: throw new IllegalStateException("Unexpected DDL exec request type: " +
ddlRequest.ddl_type);
}
// If SYNC_DDL is set, set the catalog update that contains the results of this DDL
// operation. The version of this catalog update is returned to the requesting
// impalad which will wait until this catalog update has been broadcast to all the
// coordinators.
if (syncDdl) {
response.getResult().setVersion(
catalog_.waitForSyncDdlVersion(response.getResult()));
}
// At this point, the operation is considered successful. If any errors occurred
// during execution, this function will throw an exception and the CatalogServer
// will handle setting a bad status code.
response.getResult().setStatus(new TStatus(TErrorCode.OK, new ArrayList<String>()));
return response;
}
/**
* Loads the testcase metadata from the request into the catalog cache and returns
* the query statement this input testcase corresponds to. When loading the table and
* database objects, this method overwrites any existing tables or databases with the
* same name. However, these overwrites are *not* persistent. The old table/db
* states can be recovered by blowing away the cache using INVALIDATE METADATA.
*/
@VisibleForTesting
public String copyTestCaseData(
TCopyTestCaseReq request, TDdlExecResponse response)
throws ImpalaException {
Path inputPath = new Path(Preconditions.checkNotNull(request.input_path));
// Read the data from the source FS.
FileSystem fs;
FSDataInputStream in;
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
fs = FileSystemUtil.getFileSystemForPath(inputPath);
in = fs.open(inputPath);
IOUtils.copyBytes(in, out, fs.getConf(), /*close streams*/true);
} catch (IOException e) {
throw new ImpalaRuntimeException(String.format("Error reading test case data from" +
" path: %s", inputPath), e);
}
byte[] decompressedBytes = CompressionUtil.deflateDecompress(out.toByteArray());
TTestCaseData testCaseData = new TTestCaseData();
try {
JniUtil.deserializeThrift(testCaseData, decompressedBytes);
} catch (ImpalaException e) {
throw new CatalogException(String.format("Error deserializing the testcase data " +
"at path %s. File data may be corrupt or incompatible with the current version "
+ "of Impala.", inputPath.toString()),e);
}
// Add the databases first, followed by the table and views information.
// Overwrites any existing Db/Table objects with name clashes. Since we overwrite
// the state in-memory and do not flush it to HMS, the older state can be recovered
// by loading everything back from HMS. For ex: INVALIDATE METADATA.
int numDbsAdded = 0;
if (testCaseData.getDbs() != null) {
for (TDatabase thriftDb : testCaseData.getDbs()) {
Db db = Db.fromTDatabase(thriftDb);
// Set a new version to force an overwrite if a Db already exists with the same
// name.
db.setCatalogVersion(catalog_.incrementAndGetCatalogVersion());
Db ret = catalog_.addDb(db.getName(), db.getMetaStoreDb());
if (ret != null) {
++numDbsAdded;
response.result.addToUpdated_catalog_objects(db.toTCatalogObject());
}
}
}
int numTblsAdded = 0;
int numViewsAdded = 0;
if (testCaseData.getTables_and_views() != null) {
for (TTable tTable : testCaseData.tables_and_views) {
Db db = catalog_.getDb(tTable.db_name);
// Db should have been created by now.
Preconditions.checkNotNull(db, String.format("Missing db %s", tTable.db_name));
Table t = Table.fromThrift(db, tTable);
// Set a new version to force an overwrite if a table already exists with the same
// name.
t.setCatalogVersion(catalog_.incrementAndGetCatalogVersion());
catalog_.addTable(db, t);
if (t instanceof View) {
++numViewsAdded;
} else {
++numTblsAdded;
}
// The table lock is needed here since toTCatalogObject() calls Table#toThrift()
// which expects the current thread to hold this lock. For more details refer
// to IMPALA-4092.
t.getLock().lock();
try {
response.result.addToUpdated_catalog_objects(t.toTCatalogObject());
} finally {
t.getLock().unlock();
}
}
}
StringBuilder responseStr = new StringBuilder();
responseStr.append(String.format("Testcase generated using Impala version %s. ",
testCaseData.getImpala_version()));
responseStr.append(String.format(
"%d db(s), %d table(s) and %d view(s) imported for query: ", numDbsAdded,
numTblsAdded, numViewsAdded));
responseStr.append("\n\n").append(testCaseData.getQuery_stmt());
LOG.info(String.format("%s. Testcase path: %s", responseStr, inputPath));
addSummary(response, responseStr.toString());
return testCaseData.getQuery_stmt();
}
/**
* Create result set from string 'summary', and attach it to 'response'.
*/
private static void addSummary(TDdlExecResponse response, String summary) {
TColumnValue resultColVal = new TColumnValue();
resultColVal.setString_val(summary);
TResultSet resultSet = new TResultSet();
resultSet.setSchema(new TResultSetMetadata(Lists.newArrayList(new TColumn(
"summary", Type.STRING.toThrift()))));
TResultRow resultRow = new TResultRow();
resultRow.setColVals(Lists.newArrayList(resultColVal));
resultSet.setRows(Lists.newArrayList(resultRow));
response.setResult_set(resultSet);
}
/**
* This method checks if the write lock of 'catalog_' is unlocked. If it's still locked
* then it logs an error and unlocks it.
*/
private void UnlockWriteLockIfErronouslyLocked() {
if(catalog_.getLock().isWriteLockedByCurrentThread()) {
LOG.error("Write lock should have been released.");
catalog_.getLock().writeLock().unlock();
}
}
/**
* Execute the ALTER TABLE command according to the TAlterTableParams and refresh the
* table metadata, except for RENAME, ADD PARTITION and DROP PARTITION. This call is
* thread-safe, i.e. concurrent operations on the same table are serialized.
*/
private void alterTable(TAlterTableParams params, TDdlExecResponse response)
throws ImpalaException {
// When true, loads the file/block metadata.
boolean reloadFileMetadata = false;
// When true, loads the table schema and the column stats from the Hive Metastore.
boolean reloadTableSchema = false;
Reference<Long> numUpdatedPartitions = new Reference<>(0L);
TableName tableName = TableName.fromThrift(params.getTable_name());
Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
"Load for ALTER TABLE");
if (params.getAlter_type() == TAlterTableType.RENAME_VIEW
|| params.getAlter_type() == TAlterTableType.RENAME_TABLE) {
TableName newTableName = TableName.fromThrift(
params.getRename_params().getNew_table_name());
Preconditions.checkState(!catalog_.isBlacklistedTable(newTableName),
String.format("Can't rename to blacklisted table name: %s. %s", newTableName,
BLACKLISTED_DBS_INCONSISTENT_ERR_STR));
}
tryLock(tbl);
// Get a new catalog version to assign to the table being altered.
long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
addCatalogServiceIdentifiers(tbl, catalog_.getCatalogServiceId(), newCatalogVersion);
final Timer.Context context
= tbl.getMetrics().getTimer(Table.ALTER_DURATION_METRIC).time();
try {
if (params.getAlter_type() == TAlterTableType.RENAME_VIEW
|| params.getAlter_type() == TAlterTableType.RENAME_TABLE) {
// RENAME is implemented as an ADD + DROP, so we need to execute it as we hold
// the catalog lock.
try {
alterTableOrViewRename(tbl,
TableName.fromThrift(params.getRename_params().getNew_table_name()),
response);
return;
} finally {
// release the version taken in the tryLock call above
catalog_.getLock().writeLock().unlock();
}
}
Table refreshedTable = null;
boolean reloadMetadata = true;
catalog_.getLock().writeLock().unlock();
if (tbl instanceof KuduTable && altersKuduTable(params.getAlter_type())) {
alterKuduTable(params, response, (KuduTable) tbl, newCatalogVersion);
return;
}
switch (params.getAlter_type()) {
case ADD_COLUMNS:
TAlterTableAddColsParams addColParams = params.getAdd_cols_params();
boolean added = alterTableAddCols(tbl, addColParams.getColumns(),
addColParams.isIf_not_exists());
reloadTableSchema = true;
if (added) {
addSummary(response, "New column(s) have been added to the table.");
} else {
addSummary(response, "No new column(s) have been added to the table.");
}
break;
case REPLACE_COLUMNS:
TAlterTableReplaceColsParams replaceColParams = params.getReplace_cols_params();
alterTableReplaceCols(tbl, replaceColParams.getColumns());
reloadTableSchema = true;
addSummary(response, "Table columns have been replaced.");
break;
case ADD_PARTITION:
// Create and add HdfsPartition objects to the corresponding HdfsTable and load
// their block metadata. Get the new table object with an updated catalog
// version.
refreshedTable = alterTableAddPartitions(tbl, params.getAdd_partition_params());
if (refreshedTable != null) {
refreshedTable.setCatalogVersion(newCatalogVersion);
addTableToCatalogUpdate(refreshedTable, response.result);
}
reloadMetadata = false;
addSummary(response, "New partition has been added to the table.");
break;
case DROP_COLUMN:
TAlterTableDropColParams dropColParams = params.getDrop_col_params();
alterTableDropCol(tbl, dropColParams.getCol_name());
reloadTableSchema = true;
addSummary(response, "Column has been dropped.");
break;
case ALTER_COLUMN:
TAlterTableAlterColParams alterColParams = params.getAlter_col_params();
alterTableAlterCol(tbl, alterColParams.getCol_name(),
alterColParams.getNew_col_def());
reloadTableSchema = true;
addSummary(response, "Column has been altered.");
break;
case DROP_PARTITION:
TAlterTableDropPartitionParams dropPartParams =
params.getDrop_partition_params();
// Drop the partition from the corresponding table. Get the table object
// with an updated catalog version. If the partition does not exist and
// "IfExists" is true, null is returned. If "purge" option is specified
// partition data is purged by skipping Trash, if configured.
refreshedTable = alterTableDropPartition(
tbl, dropPartParams.getPartition_set(),
dropPartParams.isIf_exists(),
dropPartParams.isPurge(), numUpdatedPartitions);
if (refreshedTable != null) {
refreshedTable.setCatalogVersion(newCatalogVersion);
addTableToCatalogUpdate(refreshedTable, response.result);
}
addSummary(response,
"Dropped " + numUpdatedPartitions.getRef() + " partition(s).");
reloadMetadata = false;
break;
case RENAME_TABLE:
case RENAME_VIEW:
Preconditions.checkState(false,
"RENAME TABLE/VIEW operation has been processed");
break;
case SET_FILE_FORMAT:
TAlterTableSetFileFormatParams fileFormatParams =
params.getSet_file_format_params();
reloadFileMetadata = alterTableSetFileFormat(
tbl, fileFormatParams.getPartition_set(),
fileFormatParams.getFile_format(), numUpdatedPartitions);
if (fileFormatParams.isSetPartition_set()) {
addSummary(response,
"Updated " + numUpdatedPartitions.getRef() + " partition(s).");
} else {
addSummary(response, "Updated table.");
}
break;
case SET_ROW_FORMAT:
TAlterTableSetRowFormatParams rowFormatParams =
params.getSet_row_format_params();
reloadFileMetadata = alterTableSetRowFormat(tbl,
rowFormatParams.getPartition_set(), rowFormatParams.getRow_format(),
numUpdatedPartitions);
if (rowFormatParams.isSetPartition_set()) {
addSummary(response,
"Updated " + numUpdatedPartitions.getRef() + " partition(s).");
} else {
addSummary(response, "Updated table.");
}
break;
case SET_LOCATION:
TAlterTableSetLocationParams setLocationParams =
params.getSet_location_params();
List<TPartitionKeyValue> partitionSpec = setLocationParams.getPartition_spec();
reloadFileMetadata = alterTableSetLocation(tbl, partitionSpec,
setLocationParams.getLocation());
if (partitionSpec == null) {
addSummary(response, "New location has been set.");
} else {
addSummary(response, "New location has been set for the specified partition.");
}
break;
case SET_TBL_PROPERTIES:
alterTableSetTblProperties(tbl, params.getSet_tbl_properties_params(),
numUpdatedPartitions);
reloadTableSchema = true;
if (params.getSet_tbl_properties_params().isSetPartition_set()) {
addSummary(response,
"Updated " + numUpdatedPartitions.getRef() + " partition(s).");
} else {
addSummary(response, "Updated table.");
}
break;
case UPDATE_STATS:
Preconditions.checkState(params.isSetUpdate_stats_params());
Reference<Long> numUpdatedColumns = new Reference<>(0L);
alterTableUpdateStats(tbl, params.getUpdate_stats_params(),
numUpdatedPartitions, numUpdatedColumns);
reloadTableSchema = true;
addSummary(response, "Updated " + numUpdatedPartitions.getRef() +
" partition(s) and " + numUpdatedColumns.getRef() + " column(s).");
break;
case SET_CACHED:
Preconditions.checkState(params.isSetSet_cached_params());
String op = params.getSet_cached_params().getCache_op().isSet_cached() ?
"Cached " : "Uncached ";
if (params.getSet_cached_params().getPartition_set() == null) {
reloadFileMetadata =
alterTableSetCached(tbl, params.getSet_cached_params());
addSummary(response, op + "table.");
} else {
alterPartitionSetCached(tbl, params.getSet_cached_params(),
numUpdatedPartitions);
addSummary(response,
op + numUpdatedPartitions.getRef() + " partition(s).");
}
break;
case RECOVER_PARTITIONS:
alterTableRecoverPartitions(tbl);
addSummary(response, "Partitions have been recovered.");
break;
case SET_OWNER:
Preconditions.checkState(params.isSetSet_owner_params());
alterTableOrViewSetOwner(tbl, params.getSet_owner_params(), response);
addSummary(response, "Updated table/view.");
break;
default:
throw new UnsupportedOperationException(
"Unknown ALTER TABLE operation type: " + params.getAlter_type());
}
if (reloadMetadata) {
loadTableMetadata(tbl, newCatalogVersion, reloadFileMetadata,
reloadTableSchema, null, "ALTER TABLE " + params.getAlter_type().name());
addTableToCatalogUpdate(tbl, response.result);
}
// now that HMS alter operation has succeeded, add this version to list of inflight
// events in catalog table if event processing is enabled
catalog_.addVersionsForInflightEvents(tbl, newCatalogVersion);
} finally {
context.stop();
UnlockWriteLockIfErronouslyLocked();
tbl.getLock().unlock();
}
}
/**
* Returns true if the given alteration type changes the underlying table stored in
* Kudu in addition to the HMS table.
*/
private boolean altersKuduTable(TAlterTableType type) {
return type == TAlterTableType.ADD_COLUMNS
|| type == TAlterTableType.REPLACE_COLUMNS
|| type == TAlterTableType.DROP_COLUMN
|| type == TAlterTableType.ALTER_COLUMN
|| type == TAlterTableType.ADD_DROP_RANGE_PARTITION;
}
/**
* Executes the ALTER TABLE command for a Kudu table and reloads its metadata.
*/
private void alterKuduTable(TAlterTableParams params, TDdlExecResponse response,
KuduTable tbl, long newCatalogVersion) throws ImpalaException {
Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
switch (params.getAlter_type()) {
case ADD_COLUMNS:
TAlterTableAddColsParams addColParams = params.getAdd_cols_params();
KuduCatalogOpExecutor.addColumn(tbl, addColParams.getColumns());
addSummary(response, "Column(s) have been added.");
break;
case REPLACE_COLUMNS:
TAlterTableReplaceColsParams replaceColParams = params.getReplace_cols_params();
KuduCatalogOpExecutor.addColumn(tbl, replaceColParams.getColumns());
addSummary(response, "Column(s) have been replaced.");
break;
case DROP_COLUMN:
TAlterTableDropColParams dropColParams = params.getDrop_col_params();
KuduCatalogOpExecutor.dropColumn(tbl, dropColParams.getCol_name());
addSummary(response, "Column has been dropped.");
break;
case ALTER_COLUMN:
TAlterTableAlterColParams alterColParams = params.getAlter_col_params();
KuduCatalogOpExecutor.alterColumn(tbl, alterColParams.getCol_name(),
alterColParams.getNew_col_def());
addSummary(response, "Column has been altered.");
break;
case ADD_DROP_RANGE_PARTITION:
TAlterTableAddDropRangePartitionParams partParams =
params.getAdd_drop_range_partition_params();
KuduCatalogOpExecutor.addDropRangePartition(tbl, partParams);
addSummary(response, "Range partition has been " +
(partParams.type == TRangePartitionOperationType.ADD ?
"added." : "dropped."));
break;
default:
throw new UnsupportedOperationException(
"Unsupported ALTER TABLE operation for Kudu tables: " +
params.getAlter_type());
}
loadTableMetadata(tbl, newCatalogVersion, true, true, null, "ALTER KUDU TABLE " +
params.getAlter_type().name());
addTableToCatalogUpdate(tbl, response.result);
}
/**
* Loads the metadata of a table 'tbl' and assigns a new catalog version.
* 'reloadFileMetadata', 'reloadTableSchema', and 'partitionsToUpdate'
* are used only for HdfsTables and control which metadata to reload.
* Throws a CatalogException if there is an error loading table metadata.
*/
private void loadTableMetadata(Table tbl, long newCatalogVersion,
boolean reloadFileMetadata, boolean reloadTableSchema,
Set<String> partitionsToUpdate, String reason) throws CatalogException {
Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
org.apache.hadoop.hive.metastore.api.Table msTbl =
getMetaStoreTable(msClient, tbl);
if (tbl instanceof HdfsTable) {
((HdfsTable) tbl).load(true, msClient.getHiveClient(), msTbl,
reloadFileMetadata, reloadTableSchema, partitionsToUpdate, reason);
} else {
tbl.load(true, msClient.getHiveClient(), msTbl, reason);
}
}
tbl.setCatalogVersion(newCatalogVersion);
}
/**
* Serializes and adds table 'tbl' to a TCatalogUpdateResult object. Uses the
* version of the serialized table as the version of the catalog update result.
*/
private static void addTableToCatalogUpdate(Table tbl, TCatalogUpdateResult result) {
Preconditions.checkNotNull(tbl);
TCatalogObject updatedCatalogObject = tbl.toTCatalogObject();
// TODO(todd): if client is a 'v2' impalad, only send back invalidation
result.addToUpdated_catalog_objects(updatedCatalogObject);
result.setVersion(updatedCatalogObject.getCatalog_version());
}
private Table addHdfsPartitions(Table tbl, List<Partition> partitions)
throws CatalogException {
Preconditions.checkNotNull(tbl);
Preconditions.checkNotNull(partitions);
if (!(tbl instanceof HdfsTable)) {
throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table");
}
HdfsTable hdfsTable = (HdfsTable) tbl;
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
List<HdfsPartition> hdfsPartitions = hdfsTable.createAndLoadPartitions(
msClient.getHiveClient(), partitions);
for (HdfsPartition hdfsPartition: hdfsPartitions) {
catalog_.addPartition(hdfsPartition);
}
}
return hdfsTable;
}
/**
* Alters an existing view's definition in the metastore. Throws an exception
* if the view does not exist or if the existing metadata entry is
* a table instead of a a view.
*/
private void alterView(TCreateOrAlterViewParams params, TDdlExecResponse resp)
throws ImpalaException {
TableName tableName = TableName.fromThrift(params.getView_name());
Preconditions.checkState(tableName != null && tableName.isFullyQualified());
Preconditions.checkState(params.getColumns() != null &&
params.getColumns().size() > 0,
"Null or empty column list given as argument to DdlExecutor.alterView");
Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
"Load for ALTER VIEW");
Preconditions.checkState(tbl instanceof View, "Expected view: %s",
tableName);
tryLock(tbl);
try {
long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
catalog_.getLock().writeLock().unlock();
addCatalogServiceIdentifiers(tbl, catalog_.getCatalogServiceId(),
newCatalogVersion);
// Operate on a copy of the metastore table to avoid prematurely applying the
// alteration to our cached table in case the actual alteration fails.
org.apache.hadoop.hive.metastore.api.Table msTbl =
tbl.getMetaStoreTable().deepCopy();
if (!msTbl.getTableType().equalsIgnoreCase(
(TableType.VIRTUAL_VIEW.toString()))) {
throw new ImpalaRuntimeException(
String.format("ALTER VIEW not allowed on a table: %s",
tableName.toString()));
}
// Set the altered view attributes and update the metastore.
setAlterViewAttributes(params, msTbl);
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Altering view %s", tableName));
}
applyAlterTable(msTbl);
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
tbl.load(true, msClient.getHiveClient(), msTbl, "ALTER VIEW");
}
addSummary(resp, "View has been altered.");
tbl.setCatalogVersion(newCatalogVersion);
addTableToCatalogUpdate(tbl, resp.result);
} finally {
UnlockWriteLockIfErronouslyLocked();
tbl.getLock().unlock();
}
}
/**
* Adds the catalog service id and the given catalog version to the table
* parameters. No-op if event processing is disabled
*/
private void addCatalogServiceIdentifiers(Table tbl, String catalogServiceId,
long newCatalogVersion) {
if (!catalog_.isExternalEventProcessingEnabled()) return;
org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable();
msTbl.putToParameters(
MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
catalogServiceId);
msTbl.putToParameters(
MetastoreEventPropertyKey.CATALOG_VERSION.getKey(),
String.valueOf(newCatalogVersion));
}
/**
* Alters an existing table's table and/or column statistics. Partitions are updated
* in batches of size 'MAX_PARTITION_UPDATES_PER_RPC'.
* This function is used by COMPUTE STATS, COMPUTE INCREMENTAL STATS and
* ALTER TABLE SET COLUMN STATS.
* Updates table property 'impala.lastComputeStatsTime' for COMPUTE (INCREMENTAL) STATS,
* but not for ALTER TABLE SET COLUMN STATS.
* Returns the number of updated partitions and columns in 'numUpdatedPartitions'
* and 'numUpdatedColumns', respectively.
*/
private void alterTableUpdateStats(Table table, TAlterTableUpdateStatsParams params,
Reference<Long> numUpdatedPartitions, Reference<Long> numUpdatedColumns)
throws ImpalaException {
Preconditions.checkState(table.getLock().isHeldByCurrentThread());
Preconditions.checkState(params.isSetTable_stats() || params.isSetColumn_stats());
TableName tableName = table.getTableName();
Preconditions.checkState(tableName != null && tableName.isFullyQualified());
if (LOG.isInfoEnabled()) {
int numPartitions =
params.isSetPartition_stats() ? params.partition_stats.size() : 0;
int numColumns =
params.isSetColumn_stats() ? params.column_stats.size() : 0;
LOG.info(String.format(
"Updating stats for table %s: table-stats=%s partitions=%d column-stats=%d",
tableName, params.isSetTable_stats(), numPartitions, numColumns));
}
// Deep copy the msTbl to avoid updating our cache before successfully persisting
// the results to the metastore.
org.apache.hadoop.hive.metastore.api.Table msTbl =
table.getMetaStoreTable().deepCopy();
// TODO: Transaction committing / aborting seems weird for stat update, but I don't
// see other ways to get a new write id (which is needed to update
// transactional tables). Hive seems to use internal API for this.
// See IMPALA-8865 about plans to improve this.
TblTransaction tblTxn = null;
try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
try {
if (AcidUtils.isTransactionalTable(msTbl.getParameters())) {
tblTxn = MetastoreShim.createTblTransaction(
msClient.getHiveClient(), msTbl, -1 /* opens new transaction */);
}
alterTableUpdateStatsInner(table, msTbl, params,
numUpdatedPartitions, numUpdatedColumns, msClient, tblTxn);
if (tblTxn != null) {
MetastoreShim.commitTblTransactionIfNeeded(msClient.getHiveClient(), tblTxn);
}
} catch (Exception ex) {
if (tblTxn != null) {
MetastoreShim.abortTblTransactionIfNeeded(msClient.getHiveClient(), tblTxn);
}
throw ex;
}
}
}
private void alterTableUpdateStatsInner(Table table,
org.apache.hadoop.hive.metastore.api.Table msTbl,
TAlterTableUpdateStatsParams params,
Reference<Long> numUpdatedPartitions, Reference<Long> numUpdatedColumns,
MetaStoreClient msClient, TblTransaction tblTxn)
throws ImpalaException {
// Update column stats.
numUpdatedColumns.setRef(0L);
if (params.isSetColumn_stats()) {
ColumnStatistics colStats = createHiveColStats(params, table);
if (colStats.getStatsObjSize() > 0) {
if (tblTxn != null) {
MetastoreShim.setTableColumnStatsTransactional(
msClient.getHiveClient(), msTbl, colStats, tblTxn);
} else {
try {
msClient.getHiveClient().updateTableColumnStatistics(colStats);
} catch (Exception e) {
throw new ImpalaRuntimeException(String.format(HMS_RPC_ERROR_FORMAT_STR,
"updateTableColumnStatistics"), e);
}
}
}
numUpdatedColumns.setRef((long) colStats.getStatsObjSize());
}
// Update partition-level row counts and incremental column stats for
// partitioned Hdfs tables.
List<HdfsPartition> modifiedParts = null;
if (params.isSetPartition_stats() && table.getNumClusteringCols() > 0) {
Preconditions.checkState(table instanceof HdfsTable);
modifiedParts = updatePartitionStats(params, (HdfsTable) table);
bulkAlterPartitions(table, modifiedParts, tblTxn);
}
if (params.isSetTable_stats()) {
// Update table row count and total file bytes.
updateTableStats(params, msTbl);
// Set impala.lastComputeStatsTime just before alter_table to ensure that it is as
// accurate as possible.
Table.updateTimestampProperty(msTbl, HdfsTable.TBL_PROP_LAST_COMPUTE_STATS_TIME);
}
// Apply property changes like numRows.
msTbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
applyAlterTable(msTbl, false, tblTxn);
numUpdatedPartitions.setRef(0L);
if (modifiedParts != null) {
numUpdatedPartitions.setRef((long) modifiedParts.size());
} else if (params.isSetTable_stats()) {
numUpdatedPartitions.setRef(1L);
}
}
/**
* Updates the row counts and incremental column stats of the partitions in the given
* Impala table based on the given update stats parameters. Returns the modified Impala
* partitions.
* Row counts for missing or new partitions as a result of concurrent table alterations
* are set to 0.
*/
private List<HdfsPartition> updatePartitionStats(TAlterTableUpdateStatsParams params,
HdfsTable table) throws ImpalaException {
Preconditions.checkState(params.isSetPartition_stats());
List<HdfsPartition> modifiedParts = Lists.newArrayList();
// TODO(todd) only load the partitions that were modified in 'params'.
Collection<? extends FeFsPartition> parts =
FeCatalogUtils.loadAllPartitions(table);
for (FeFsPartition fePartition: parts) {
// TODO(todd): avoid downcast to implementation class
HdfsPartition partition = (HdfsPartition)fePartition;
// NULL keys are returned as 'NULL' in the partition_stats map, so don't substitute
// this partition's keys with Hive's replacement value.
List<String> partitionValues = partition.getPartitionValuesAsStrings(false);
TPartitionStats partitionStats = params.partition_stats.get(partitionValues);
if (partitionStats == null) {
// No stats were collected for this partition. This means that it was not included
// in the original computation statements. If the backend does not find any rows
// for a partition that should be included, it will generate an empty
// TPartitionStats object.
if (params.expect_all_partitions == false) continue;
// If all partitions are expected, fill in any missing stats with an empty entry.
partitionStats = new TPartitionStats();
if (params.is_incremental) {
partitionStats.intermediate_col_stats = Maps.newHashMap();
}
partitionStats.stats = new TTableStats();
partitionStats.stats.setNum_rows(0L);
}
// Unconditionally update the partition stats and row count, even if the partition
// already has identical ones. This behavior results in possibly redundant work,
// but it is predictable and easy to reason about because it does not depend on the
// existing state of the metadata. See IMPALA-2201.
long numRows = partitionStats.stats.num_rows;
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Updating stats for partition %s: numRows=%d",
partition.getValuesAsString(), numRows));
}
PartitionStatsUtil.partStatsToPartition(partitionStats, partition);
partition.putToParameters(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
// HMS requires this param for stats changes to take effect.
partition.putToParameters(MetastoreShim.statsGeneratedViaStatsTaskParam());
partition.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
modifiedParts.add(partition);
}
return modifiedParts;
}
/**
* Updates the row count and total file bytes of the given HMS table based on the
* the update stats parameters.
*/
private void updateTableStats(TAlterTableUpdateStatsParams params,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws ImpalaException {
Preconditions.checkState(params.isSetTable_stats());
long numRows = params.table_stats.num_rows;
// Update the table's ROW_COUNT and TOTAL_SIZE parameters.
msTbl.putToParameters(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
if (params.getTable_stats().isSetTotal_file_bytes()) {
msTbl.putToParameters(StatsSetupConst.TOTAL_SIZE,
String.valueOf(params.getTable_stats().total_file_bytes));
}
// HMS requires this param for stats changes to take effect.
Pair<String, String> statsTaskParam = MetastoreShim.statsGeneratedViaStatsTaskParam();
msTbl.putToParameters(statsTaskParam.first, statsTaskParam.second);
}
/**
* Create HMS column statistics for the given table based on the give map from column
* name to column stats. Missing or new columns as a result of concurrent table
* alterations are ignored.
*/
private static ColumnStatistics createHiveColStats(
TAlterTableUpdateStatsParams params, Table table) {
Preconditions.checkState(params.isSetColumn_stats());
// Collection of column statistics objects to be returned.
ColumnStatistics colStats = MetastoreShim.createNewHiveColStats();
colStats.setStatsDesc(
new ColumnStatisticsDesc(true, table.getDb().getName(), table.getName()));
// Generate Hive column stats objects from the update stats params.
for (Map.Entry<String, TColumnStats> entry: params.getColumn_stats().entrySet()) {
String colName = entry.getKey();
Column tableCol = table.getColumn(entry.getKey());
// Ignore columns that were dropped in the meantime.
if (tableCol == null) continue;
// If we know the number of rows in the table, cap NDV of the column appropriately.
long ndvCap = params.isSetTable_stats() ? params.table_stats.num_rows : -1;
ColumnStatisticsData colStatsData = ColumnStats.createHiveColStatsData(
ndvCap, entry.getValue(), tableCol.getType());
if (colStatsData == null) continue;
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Updating column stats for %s: numDVs=%d numNulls=%d " +
"maxSize=%d avgSize=%.2f", colName, entry.getValue().getNum_distinct_values(),
entry.getValue().getNum_nulls(), entry.getValue().getMax_size(),
entry.getValue().getAvg_size()));
}
ColumnStatisticsObj colStatsObj = new ColumnStatisticsObj(colName,
tableCol.getType().toString().toLowerCase(), colStatsData);
colStats.addToStatsObj(colStatsObj);
}
return colStats;
}
/**
* Creates a new database in the metastore and adds the db name to the internal
* metadata cache, marking its metadata to be lazily loaded on the next access.
* Re-throws any Hive Meta Store exceptions encountered during the create, these
* may vary depending on the Meta Store connection type (thrift vs direct db).
*/
private void createDatabase(TCreateDbParams params, TDdlExecResponse resp)
throws ImpalaException {
Preconditions.checkNotNull(params);
String dbName = params.getDb();
Preconditions.checkState(dbName != null && !dbName.isEmpty(),
"Null or empty database name passed as argument to Catalog.createDatabase");
Preconditions.checkState(!catalog_.isBlacklistedDb(dbName),
String.format("Can't create blacklisted database: %s. %s", dbName,
BLACKLISTED_DBS_INCONSISTENT_ERR_STR));
Db existingDb = catalog_.getDb(dbName);
if (params.if_not_exists && existingDb != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("Skipping database creation because " + dbName + " already exists "
+ "and IF NOT EXISTS was specified.");
}
Preconditions.checkNotNull(existingDb);
// TODO(todd): if client is a 'v2' impalad, only send back invalidation
resp.getResult().addToUpdated_catalog_objects(existingDb.toTCatalogObject());
resp.getResult().setVersion(existingDb.getCatalogVersion());
addSummary(resp, "Database already exists.");
return;
}
org.apache.hadoop.hive.metastore.api.Database db =
new org.apache.hadoop.hive.metastore.api.Database();
db.setName(dbName);
if (params.getComment() != null) {
db.setDescription(params.getComment());
}
if (params.getLocation() != null) {
db.setLocationUri(params.getLocation());
}
db.setOwnerName(params.getOwner());
db.setOwnerType(PrincipalType.USER);
if (LOG.isTraceEnabled()) LOG.trace("Creating database " + dbName);
Db newDb = null;
synchronized (metastoreDdlLock_) {
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
try {
msClient.getHiveClient().createDatabase(db);
// Load the database back from the HMS. It's unfortunate we need two
// RPCs here, but otherwise we can't populate the location field of the
// DB properly. We'll take the slight chance of a race over the incorrect
// behavior of showing no location in 'describe database' (IMPALA-7439).
db = msClient.getHiveClient().getDatabase(dbName);
newDb = catalog_.addDb(dbName, db);
addSummary(resp, "Database has been created.");
} catch (AlreadyExistsException e) {
if (!params.if_not_exists) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e);
}
addSummary(resp, "Database already exists.");
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Ignoring '%s' when creating database %s because " +
"IF NOT EXISTS was specified.", e, dbName));
}
newDb = catalog_.getDb(dbName);
if (newDb == null) {
try {
org.apache.hadoop.hive.metastore.api.Database msDb =
msClient.getHiveClient().getDatabase(dbName);
newDb = catalog_.addDb(dbName, msDb);
} catch (TException e1) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e1);
}
}
} catch (TException e) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "createDatabase"), e);
}
}
Preconditions.checkNotNull(newDb);
// TODO(todd): if client is a 'v2' impalad, only send back invalidation
resp.result.addToUpdated_catalog_objects(newDb.toTCatalogObject());
if (authzConfig_.isEnabled()) {
authzManager_.updateDatabaseOwnerPrivilege(params.server_name, newDb.getName(),
/* oldOwner */ null, /* oldOwnerType */ null,
newDb.getMetaStoreDb().getOwnerName(), newDb.getMetaStoreDb().getOwnerType(),
resp);
}
}
resp.result.setVersion(newDb.getCatalogVersion());
}
private void createFunction(TCreateFunctionParams params, TDdlExecResponse resp)
throws ImpalaException {
Function fn = Function.fromThrift(params.getFn());
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Adding %s: %s",
fn.getClass().getSimpleName(), fn.signatureString()));
}
boolean isPersistentJavaFn =
(fn.getBinaryType() == TFunctionBinaryType.JAVA) && fn.isPersistent();
synchronized (metastoreDdlLock_) {
Db db = catalog_.getDb(fn.dbName());
if (db == null) {
throw new CatalogException("Database: " + fn.dbName() + " does not exist.");
}
// Get a new catalog version to assign to the database being altered. This is
// needed for events processor as this method creates alter database events.
long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
// Search for existing functions with the same name or signature that would
// conflict with the function being added.
for (Function function: db.getFunctions(fn.functionName())) {
if (isPersistentJavaFn || (function.isPersistent() &&
(function.getBinaryType() == TFunctionBinaryType.JAVA)) ||
function.compare(fn, Function.CompareMode.IS_INDISTINGUISHABLE)) {
if (!params.if_not_exists) {
throw new CatalogException("Function " + fn.functionName() +
" already exists.");
}
addSummary(resp, "Function already exists.");
return;
}
}
List<TCatalogObject> addedFunctions = Lists.newArrayList();
if (isPersistentJavaFn) {
// For persistent Java functions we extract all supported function signatures from
// the corresponding Jar and add each signature to the catalog.
Preconditions.checkState(fn instanceof ScalarFunction);
org.apache.hadoop.hive.metastore.api.Function hiveFn =
((ScalarFunction)fn).toHiveFunction();
List<Function> funcs = FunctionUtils.extractFunctions(fn.dbName(), hiveFn,
BackendConfig.INSTANCE.getBackendCfg().local_library_path);
if (funcs.isEmpty()) {
throw new CatalogException(
"No compatible function signatures found in class: " + hiveFn.getClassName());
}
if (addJavaFunctionToHms(fn.dbName(), hiveFn, params.if_not_exists)) {
for (Function addedFn: funcs) {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Adding function: %s.%s", addedFn.dbName(),
addedFn.signatureString()));
}
Preconditions.checkState(catalog_.addFunction(addedFn));
addedFunctions.add(addedFn.toTCatalogObject());
}
}
} else {
if (catalog_.addFunction(fn)) {
// Flush DB changes to metastore
applyAlterDatabase(db.getMetaStoreDb());
addedFunctions.add(fn.toTCatalogObject());
// now that HMS alter database has succeeded, add this version to list of
// inflight events in catalog database if event processing is enabled.
catalog_.addVersionsForInflightEvents(db, newCatalogVersion);
}
}
if (!addedFunctions.isEmpty()) {
resp.result.setUpdated_catalog_objects(addedFunctions);
resp.result.setVersion(catalog_.getCatalogVersion());
addSummary(resp, "Function has been created.");
} else {
addSummary(resp, "Function already exists.");
}
}
}
private void createDataSource(TCreateDataSourceParams params, TDdlExecResponse resp)
throws ImpalaException {
// TODO(IMPALA-7131): support data sources with LocalCatalog.
if (LOG.isTraceEnabled()) { LOG.trace("Adding DATA SOURCE: " + params.toString()); }
DataSource dataSource = DataSource.fromThrift(params.getData_source());
DataSource existingDataSource = catalog_.getDataSource(dataSource.getName());
if (existingDataSource != null) {
if (!params.if_not_exists) {
throw new ImpalaRuntimeException("Data source " + dataSource.getName() +
" already exists.");
}
addSummary(resp, "Data source already exists.");
resp.result.addToUpdated_catalog_objects(existingDataSource.toTCatalogObject());
resp.result.setVersion(existingDataSource.getCatalogVersion());
return;
}
catalog_.addDataSource(dataSource);
resp.result.addToUpdated_catalog_objects(dataSource.toTCatalogObject());
resp.result.setVersion(dataSource.getCatalogVersion());
addSummary(resp, "Data source has been created.");
}
private void dropDataSource(TDropDataSourceParams params, TDdlExecResponse resp)
throws ImpalaException {
// TODO(IMPALA-7131): support data sources with LocalCatalog.
if (LOG.isTraceEnabled()) LOG.trace("Drop DATA SOURCE: " + params.toString());
DataSource dataSource = catalog_.removeDataSource(params.getData_source());
if (dataSource == null) {
if (!params.if_exists) {
throw new ImpalaRuntimeException("Data source " + params.getData_source() +
" does not exists.");
}
addSummary(resp, "Data source does not exist.");
// No data source was removed.
resp.result.setVersion(catalog_.getCatalogVersion());
return;
}
resp.result.addToRemoved_catalog_objects(dataSource.toTCatalogObject());
resp.result.setVersion(dataSource.getCatalogVersion());
addSummary(resp, "Data source has been dropped.");
}
/**
* Drops all table and column stats from the target table in the HMS and
* updates the Impala catalog. Throws an ImpalaException if any errors are
* encountered as part of this operation. Acquires a lock on the modified table
* to protect against concurrent modifications.
*/
private void dropStats(TDropStatsParams params, TDdlExecResponse resp)
throws ImpalaException {
Table table = getExistingTable(params.getTable_name().getDb_name(),
params.getTable_name().getTable_name(), "Load for DROP STATS");
Preconditions.checkNotNull(table);
// There is no transactional HMS API to drop stats at the moment (HIVE-22104).
Preconditions.checkState(!AcidUtils.isTransactionalTable(
table.getMetaStoreTable().getParameters()));
tryLock(table, "dropping stats");
try {
long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
catalog_.getLock().writeLock().unlock();
if (params.getPartition_set() == null) {
// TODO: Report the number of updated partitions/columns to the user?
// TODO: bulk alter the partitions.
dropColumnStats(table);
dropTableStats(table);
} else {
HdfsTable hdfsTbl = (HdfsTable) table;
List<HdfsPartition> partitions =
hdfsTbl.getPartitionsFromPartitionSet(params.getPartition_set());
if (partitions.isEmpty()) {
addSummary(resp, "No partitions found for table.");
return;
}
for(HdfsPartition partition : partitions) {
if (partition.getPartitionStatsCompressed() != null) {
partition.dropPartitionStats();
try {
applyAlterPartition(table, partition);
} finally {
partition.markDirty();
}
}
}
}
loadTableMetadata(table, newCatalogVersion, false, true, null, "DROP STATS");
addTableToCatalogUpdate(table, resp.result);
addSummary(resp, "Stats have been dropped.");
} finally {
UnlockWriteLockIfErronouslyLocked();
table.getLock().unlock();
}
}
/**
* Drops all column stats from the table in the HMS. Returns the number of columns
* that were updated as part of this operation.
*/
private int dropColumnStats(Table table) throws ImpalaRuntimeException {
Preconditions.checkState(table.getLock().isHeldByCurrentThread());
int numColsUpdated = 0;
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
for (Column col: table.getColumns()) {
// Skip columns that don't have stats.
if (!col.getStats().hasStats()) continue;
try {
MetastoreShim.deleteTableColumnStatistics(msClient.getHiveClient(),
table.getDb().getName(), table.getName(), col.getName());
++numColsUpdated;
} catch (NoSuchObjectException e) {
// We don't care if the column stats do not exist, just ignore the exception.
// We would only expect to make it here if the Impala and HMS metadata
// diverged.
} catch (TException e) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR,
"delete_table_column_statistics"), e);
}
}
}
return numColsUpdated;
}
/**
* Drops all table and partition stats from this table in the HMS.
* Partitions are updated in batches of MAX_PARTITION_UPDATES_PER_RPC. Returns
* the number of partitions updated as part of this operation, or 1 if the table
* is unpartitioned.
*/
private int dropTableStats(Table table) throws ImpalaException {
Preconditions.checkState(table.getLock().isHeldByCurrentThread());
// Delete the ROW_COUNT from the table (if it was set).
org.apache.hadoop.hive.metastore.api.Table msTbl = table.getMetaStoreTable();
int numTargetedPartitions = 0;
boolean droppedRowCount =
msTbl.getParameters().remove(StatsSetupConst.ROW_COUNT) != null;
boolean droppedTotalSize =
msTbl.getParameters().remove(StatsSetupConst.TOTAL_SIZE) != null;
if (droppedRowCount || droppedTotalSize) {
applyAlterTable(msTbl, false, null);
++numTargetedPartitions;
}
if (!(table instanceof HdfsTable) || table.getNumClusteringCols() == 0) {
// If this is not an HdfsTable or if the table is not partitioned, there
// is no more work to be done so just return.
return numTargetedPartitions;
}
// Now clear the stats for all partitions in the table.
HdfsTable hdfsTable = (HdfsTable) table;
Preconditions.checkNotNull(hdfsTable);
// List of partitions that were modified as part of this operation.
List<HdfsPartition> modifiedParts = Lists.newArrayList();
Collection<? extends FeFsPartition> parts =
FeCatalogUtils.loadAllPartitions(hdfsTable);
for (FeFsPartition fePart: parts) {
// TODO(todd): avoid downcast
HdfsPartition part = (HdfsPartition) fePart;
boolean isModified = false;
if (part.getPartitionStatsCompressed() != null) {
part.dropPartitionStats();
isModified = true;
}
// Remove the ROW_COUNT parameter if it has been set.
if (part.getParameters().remove(StatsSetupConst.ROW_COUNT) != null) {
isModified = true;
}
if (isModified) modifiedParts.add(part);
}
bulkAlterPartitions(table, modifiedParts, null);
return modifiedParts.size();
}
/**
* Drops a database from the metastore and removes the database's metadata from the
* internal cache. Attempts to remove the HDFS cache directives of the underlying
* tables. Re-throws any HMS exceptions encountered during the drop.
*/
private void dropDatabase(TDropDbParams params, TDdlExecResponse resp)
throws ImpalaException {
Preconditions.checkNotNull(params);
String dbName = params.getDb();
Preconditions.checkState(dbName != null && !dbName.isEmpty(),
"Null or empty database name passed as argument to Catalog.dropDatabase");
Preconditions.checkState(!catalog_.isBlacklistedDb(dbName) || params.if_exists,
String.format("Can't drop blacklisted database: %s. %s", dbName,
BLACKLISTED_DBS_INCONSISTENT_ERR_STR));
if (catalog_.isBlacklistedDb(dbName)) {
// It's expected to go here if "if_exists" is set to true.
addSummary(resp, "Can't drop blacklisted database: " + dbName);
return;
}
LOG.trace("Dropping database " + dbName);
Db db = catalog_.getDb(dbName);
if (db != null && db.numFunctions() > 0 && !params.cascade) {
throw new CatalogException("Database " + db.getName() + " is not empty");
}
TCatalogObject removedObject = null;
synchronized (metastoreDdlLock_) {
// Remove all the Kudu tables of 'db' from the Kudu storage engine.
if (db != null && params.cascade) dropTablesFromKudu(db);
// The Kudu tables in the HMS should have been dropped at this point
// with the Hive Metastore integration enabled.
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
// HMS client does not have a way to identify if the database was dropped or
// not if the ignoreIfUnknown flag is true. Hence we always pass the
// ignoreIfUnknown as false and catch the NoSuchObjectFoundException and
// determine if we should throw or not
msClient.getHiveClient().dropDatabase(
dbName, /* deleteData */true, /* ignoreIfUnknown */false,
params.cascade);
addSummary(resp, "Database has been dropped.");
} catch (TException e) {
if (e instanceof NoSuchObjectException && params.if_exists) {
// if_exists param was set; we ignore the NoSuchObjectFoundException
addSummary(resp, "Database does not exist.");
} else {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "dropDatabase"), e);
}
}
Db removedDb = catalog_.removeDb(dbName);
if (removedDb == null) {
// Nothing was removed from the catalogd's cache.
resp.result.setVersion(catalog_.getCatalogVersion());
return;
}
// Make sure the cache directives, if any, of the underlying tables are removed
for (String tableName: removedDb.getAllTableNames()) {
uncacheTable(removedDb.getTable(tableName));
}
removedObject = removedDb.toTCatalogObject();
if (authzConfig_.isEnabled()) {
authzManager_.updateDatabaseOwnerPrivilege(params.server_name, dbName,
db.getMetaStoreDb().getOwnerName(), db.getMetaStoreDb().getOwnerType(),
/* newOwner */ null, /* newOwnerType */ null, resp);
}
}
Preconditions.checkNotNull(removedObject);
resp.result.setVersion(removedObject.getCatalog_version());
resp.result.addToRemoved_catalog_objects(removedObject);
// it is possible that HMS database has been removed out of band externally. In
// such a case we still would want to add the summary of the operation as database
// has been dropped since we cleaned up state from CatalogServer
addSummary(resp, "Database has been dropped.");
}
/**
* Drops all the Kudu tables of database 'db' from the Kudu storage engine. Retrieves
* the Kudu table name of each table in 'db' from HMS. Throws an ImpalaException if
* metadata for Kudu tables cannot be loaded from HMS or if an error occurs while
* trying to drop a table from Kudu.
*/
private void dropTablesFromKudu(Db db) throws ImpalaException {
// If the table format isn't available, because the table hasn't been loaded yet,
// the metadata must be fetched from the Hive Metastore.
List<String> incompleteTableNames = Lists.newArrayList();
List<org.apache.hadoop.hive.metastore.api.Table> msTables = Lists.newArrayList();
for (Table table: db.getTables()) {
org.apache.hadoop.hive.metastore.api.Table msTable = table.getMetaStoreTable();
if (msTable == null) {
incompleteTableNames.add(table.getName());
} else {
msTables.add(msTable);
}
}
if (!incompleteTableNames.isEmpty()) {
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
msTables.addAll(msClient.getHiveClient().getTableObjectsByName(
db.getName(), incompleteTableNames));
} catch (TException e) {
LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR, "getTableObjectsByName") +
e.getMessage());
}
}
for (org.apache.hadoop.hive.metastore.api.Table msTable: msTables) {
if (!KuduTable.isKuduTable(msTable) || !KuduTable
.isSynchronizedTable(msTable)) continue;
// The operation will be aborted if the Kudu table cannot be dropped. If for
// some reason Kudu is permanently stuck in a non-functional state, the user is
// expected to ALTER TABLE to either set the table to UNMANAGED or set the format
// to something else.
KuduCatalogOpExecutor.dropTable(msTable, /*if exists*/ true);
}
}
private boolean isKuduHmsIntegrationEnabled(
org.apache.hadoop.hive.metastore.api.Table msTbl)
throws ImpalaRuntimeException {
// Check if Kudu's integration with the Hive Metastore is enabled, and validate
// the configuration.
Preconditions.checkState(KuduTable.isKuduTable(msTbl));
String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
String hmsUris = MetaStoreUtil.getHiveMetastoreUris();
return KuduTable.isHMSIntegrationEnabledAndValidate(masterHosts, hmsUris);
}
/**
* Drops a table or view from the metastore and removes it from the catalog.
* Also drops all associated caching requests on the table and/or table's partitions,
* uncaching all table data. If params.purge is true, table data is permanently
* deleted.
* In case of transactional tables acquires an exclusive HMS table lock before
* executing the drop operation.
*/
private void dropTableOrView(TDropTableOrViewParams params, TDdlExecResponse resp)
throws ImpalaException {
TableName tableName = TableName.fromThrift(params.getTable_name());
Preconditions.checkState(tableName != null && tableName.isFullyQualified());
Preconditions.checkState(!catalog_.isBlacklistedTable(tableName) || params.if_exists,
String.format("Can't drop blacklisted table: %s. %s", tableName,
BLACKLISTED_TABLES_INCONSISTENT_ERR_STR));
if (catalog_.isBlacklistedTable(tableName)) {
// It's expected to go here if "if_exists" is set to true.
addSummary(resp, "Can't drop blacklisted table: " + tableName);
return;
}
LOG.trace(String.format("Dropping table/view %s", tableName));
// If the table exists, ensure that it is loaded before we try to operate on it.
// We do this up here rather than down below to avoid doing too much table-loading
// work while holding the DDL lock. We can't simply use 'getExistingTable' because
// we rely on more granular checks to provide the correct summary message for
// the 'IF EXISTS' case.
//
// In the standard catalogd implementation, the table will most likely already
// be loaded because the planning phase on the impalad side triggered the loading.
// In the LocalCatalog configuration, however, this is often necessary.
try {
catalog_.getOrLoadTable(params.getTable_name().db_name,
params.getTable_name().table_name, "Load for DROP TABLE/VIEW");
} catch (CatalogException e) {
// Ignore exceptions -- the above was just to trigger loading. Failure to load
// or non-existence of the database will be handled down below.
}
Table tbl = catalog_.getTableIfCachedNoThrow(tableName.getDb(), tableName.getTbl());
long lockId = -1;
if (tbl != null && !(tbl instanceof IncompleteTable) &&
AcidUtils.isTransactionalTable(tbl.getMetaStoreTable().getParameters())) {
HeartbeatContext ctx = new HeartbeatContext(
String.format("Drop table/view %s.%s", tableName.getDb(), tableName.getTbl()),
System.nanoTime());
lockId = catalog_.lockTableStandalone(tableName.getDb(), tableName.getTbl(), ctx);
}
try {
dropTableOrViewInternal(params, tableName, resp);
} finally {
if (lockId > 0) catalog_.releaseTableLock(lockId);
}
}
/**
* Helper function for dropTableOrView().
*/
private void dropTableOrViewInternal(TDropTableOrViewParams params,
TableName tableName, TDdlExecResponse resp) throws ImpalaException {
TCatalogObject removedObject = new TCatalogObject();
synchronized (metastoreDdlLock_) {
Db db = catalog_.getDb(params.getTable_name().db_name);
if (db == null) {
String dbNotExist = "Database does not exist: " + params.getTable_name().db_name;
if (params.if_exists) {
addSummary(resp, dbNotExist);
return;
}
throw new CatalogException(dbNotExist);
}
Table existingTbl = db.getTable(params.getTable_name().table_name);
if (existingTbl == null) {
if (params.if_exists) {
addSummary(resp, (params.is_table ? "Table " : "View ") + "does not exist.");
return;
}
throw new CatalogException("Table/View does not exist.");
}
// Retrieve the HMS table to determine if this is a Kudu table.
org.apache.hadoop.hive.metastore.api.Table msTbl = existingTbl.getMetaStoreTable();
if (msTbl == null) {
Preconditions.checkState(existingTbl instanceof IncompleteTable);
Stopwatch hmsLoadSW = new Stopwatch().start();
long hmsLoadTime;
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
msTbl = msClient.getHiveClient().getTable(tableName.getDb(),
tableName.getTbl());
} catch (TException e) {
LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR, "getTable") + e.getMessage());
} finally {
hmsLoadTime = hmsLoadSW.elapsed(TimeUnit.NANOSECONDS);
}
existingTbl.updateHMSLoadTableSchemaTime(hmsLoadTime);
}
boolean isSynchronizedTable = msTbl != null &&
KuduTable.isKuduTable(msTbl) && KuduTable.isSynchronizedTable(msTbl);
if (isSynchronizedTable) {
KuduCatalogOpExecutor.dropTable(msTbl, /* if exists */ true);
}
// Check to make sure we don't drop a view with "drop table" statement and
// vice versa. is_table field is marked optional in TDropTableOrViewParams to
// maintain catalog api compatibility.
// TODO: Remove params.isSetIs_table() check once catalog api compatibility is
// fixed.
if (params.isSetIs_table() && ((params.is_table && existingTbl instanceof View)
|| (!params.is_table && !(existingTbl instanceof View)))) {
String errorMsg = "DROP " + (params.is_table ? "TABLE " : "VIEW ") +
"not allowed on a " + (params.is_table ? "view: " : "table: ") + tableName;
if (params.if_exists) {
addSummary(resp, "Drop " + (params.is_table ? "table " : "view ") +
"is not allowed on a " + (params.is_table ? "view." : "table."));
return;
}
throw new CatalogException(errorMsg);
}
// When Kudu's HMS integration is enabled, Kudu will drop the managed table
// entries automatically. In all other cases, we need to drop the HMS table
// entry ourselves.
if (!isSynchronizedTable || !isKuduHmsIntegrationEnabled(msTbl)) {
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
msClient.getHiveClient().dropTable(
tableName.getDb(), tableName.getTbl(), true,
params.if_exists, params.purge);
} catch (NoSuchObjectException e) {
throw new ImpalaRuntimeException(String.format("Table %s no longer exists " +
"in the Hive MetaStore. Run 'invalidate metadata %s' to update the " +
"Impala catalog.", tableName, tableName));
} catch (TException e) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "dropTable"), e);
}
}
addSummary(resp, (params.is_table ? "Table " : "View ") + "has been dropped.");
Table table = catalog_.removeTable(params.getTable_name().db_name,
params.getTable_name().table_name);
if (table == null) {
// Nothing was removed from the catalogd's cache.
resp.result.setVersion(catalog_.getCatalogVersion());
return;
}
resp.result.setVersion(table.getCatalogVersion());
uncacheTable(table);
if (table.getMetaStoreTable() != null) {
if (authzConfig_.isEnabled()) {
authzManager_.updateTableOwnerPrivilege(params.server_name,
table.getDb().getName(), table.getName(),
table.getMetaStoreTable().getOwner(),
table.getMetaStoreTable().getOwnerType(), /* newOwner */ null,
/* newOwnerType */ null, resp);
}
}
}
removedObject.setType(TCatalogObjectType.TABLE);
removedObject.setTable(new TTable());
removedObject.getTable().setTbl_name(tableName.getTbl());
removedObject.getTable().setDb_name(tableName.getDb());
removedObject.setCatalog_version(resp.result.getVersion());
resp.result.addToRemoved_catalog_objects(removedObject);
}
/**
* Drops all associated caching requests on the table and/or table's partitions,
* uncaching all table data, if applicable. Throws no exceptions, only logs errors.
* Does not update the HMS.
*/
private static void uncacheTable(FeTable table) {
if (!(table instanceof FeFsTable)) return;
FeFsTable hdfsTable = (FeFsTable) table;
if (hdfsTable.isMarkedCached()) {
try {
HdfsCachingUtil.removeTblCacheDirective(table.getMetaStoreTable());
} catch (Exception e) {
LOG.error("Unable to uncache table: " + table.getFullName(), e);
}
}
if (table.getNumClusteringCols() > 0) {
Collection<? extends FeFsPartition> parts =
FeCatalogUtils.loadAllPartitions(hdfsTable);
for (FeFsPartition part: parts) {
if (part.isMarkedCached()) {
try {
HdfsCachingUtil.removePartitionCacheDirective(part);
} catch (Exception e) {
LOG.error("Unable to uncache partition: " + part.getPartitionName(), e);
}
}
}
}
}
/**
* Truncate a table by deleting all files in its partition directories, and dropping
* all column and table statistics. Acquires a table lock to protect against
* concurrent table modifications.
* TODO truncate specified partitions.
*/
private void truncateTable(TTruncateParams params, TDdlExecResponse resp)
throws ImpalaException {
TTableName tblName = params.getTable_name();
Table table = null;
try {
table = getExistingTable(tblName.getDb_name(), tblName.getTable_name(),
"Load for TRUNCATE TABLE");
} catch (TableNotFoundException e) {
if (params.if_exists) {
addSummary(resp, "Table does not exist.");
return;
}
throw e;
}
Preconditions.checkNotNull(table);
if (!(table instanceof HdfsTable)) {
throw new CatalogException(
String.format("TRUNCATE TABLE not supported on non-HDFS table: %s",
table.getFullName()));
}
// Lock table to check transactional properties.
// If non-transactional, the lock will be held during truncation.
// If transactional, the lock will be released for some time to acquire the HMS Acid
// lock. It's safe because transactional -> non-transactional conversion is not
// allowed.
tryLock(table, "truncating");
try {
long newCatalogVersion = 0;
try {
if (AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters())) {
newCatalogVersion = truncateTransactionalTable(params, table);
} else {
newCatalogVersion = truncateNonTransactionalTable(params, table);
}
} catch (Exception e) {
String fqName = tblName.db_name + "." + tblName.table_name;
throw new CatalogException(String.format("Failed to truncate table: %s.\n" +
"Table may be in a partially truncated state.", fqName), e);
}
Preconditions.checkState(newCatalogVersion > 0);
addSummary(resp, "Table has been truncated.");
loadTableMetadata(table, newCatalogVersion, true, true, null, "TRUNCATE");
addTableToCatalogUpdate(table, resp.result);
} finally {
UnlockWriteLockIfErronouslyLocked();
if (table.getLock().isHeldByCurrentThread()) {
table.getLock().unlock();
}
}
}
/**
* Truncates a transactional table. It creates new empty base directories in all
* partitions of the table. That way queries started earlier can still read a
* valid snapshot version of the data. HMS's cleaner should remove obsolete
* directories later.
* After that empty directory creation it removes stats-related parameters of
* the table and its partitions.
*/
private long truncateTransactionalTable(TTruncateParams params, Table table)
throws ImpalaException {
Preconditions.checkState(table.getLock().isHeldByCurrentThread());
Preconditions.checkState(catalog_.getLock().isWriteLockedByCurrentThread());
catalog_.getLock().writeLock().unlock();
TableName tblName = TableName.fromThrift(params.getTable_name());
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
long newCatalogVersion = 0;
IMetaStoreClient hmsClient = msClient.getHiveClient();
HeartbeatContext ctx = new HeartbeatContext(
String.format("Truncate table %s.%s", tblName.getDb(), tblName.getTbl()),
System.nanoTime());
try (Transaction txn = catalog_.openTransaction(hmsClient, ctx)) {
Preconditions.checkState(txn.getId() > 0);
// We need to release catalog table lock here, because HMS Acid table lock
// must be locked in advance to avoid dead lock.
table.getLock().unlock();
//TODO: if possible, set DataOperationType to something better than NO_TXN.
catalog_.lockTableInTransaction(tblName.getDb(), tblName.getTbl(), txn,
DataOperationType.NO_TXN, ctx);
tryLock(table, "truncating");
newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
catalog_.getLock().writeLock().unlock();
TblTransaction tblTxn = MetastoreShim.createTblTransaction(hmsClient,
table.getMetaStoreTable(), txn.getId());
HdfsTable hdfsTable = (HdfsTable)table;
Collection<? extends FeFsPartition> parts =
FeCatalogUtils.loadAllPartitions(hdfsTable);
createEmptyBaseDirectories(parts, tblTxn.writeId);
// Currently Impala cannot update the statistics properly. So instead of
// writing correct stats, let's just remove COLUMN_STATS_ACCURATE parameter from
// each partition.
// TODO(IMPALA-8883): properly update statistics
List<org.apache.hadoop.hive.metastore.api.Partition> hmsPartitions =
Lists.newArrayListWithCapacity(parts.size());
if (table.getNumClusteringCols() > 0) {
for (FeFsPartition part: parts) {
org.apache.hadoop.hive.metastore.api.Partition hmsPart =
((HdfsPartition)part).toHmsPartition();
Preconditions.checkNotNull(hmsPart);
if (hmsPart.getParameters() != null) {
hmsPart.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
hmsPartitions.add(hmsPart);
}
}
}
// For partitioned tables we need to alter all the partitions in HMS.
if (!hmsPartitions.isEmpty()) {
unsetPartitionsColStats(table.getMetaStoreTable(), hmsPartitions, tblTxn);
}
// Remove COLUMN_STATS_ACCURATE property from the table.
unsetTableColStats(table.getMetaStoreTable(), tblTxn);
txn.commit();
}
return newCatalogVersion;
} catch (Exception e) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "truncateTable"), e);
}
}
private long truncateNonTransactionalTable(TTruncateParams params, Table table)
throws Exception {
Preconditions.checkState(table.getLock().isHeldByCurrentThread());
Preconditions.checkState(catalog_.getLock().isWriteLockedByCurrentThread());
long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
catalog_.getLock().writeLock().unlock();
HdfsTable hdfsTable = (HdfsTable) table;
Collection<? extends FeFsPartition> parts = FeCatalogUtils
.loadAllPartitions(hdfsTable);
for (FeFsPartition part : parts) {
FileSystemUtil.deleteAllVisibleFiles(new Path(part.getLocation()));
}
dropColumnStats(table);
dropTableStats(table);
return newCatalogVersion;
}
/**
* Creates new empty base directories for an ACID table. The directories won't be
* really empty, they will contain the hidden "_empty" file. It's needed because
* FileSystemUtil.listFiles() doesn't see empty directories. See IMPALA-8739.
* @param partitions the partitions in which we create new directories.
* @param writeId the write id of the new base directory.
* @throws IOException
*/
private void createEmptyBaseDirectories(
Collection<? extends FeFsPartition> partitions, long writeId) throws IOException {
for (FeFsPartition part: partitions) {
Path partPath = new Path(part.getLocation());
FileSystem fs = FileSystemUtil.getFileSystemForPath(partPath);
String baseDirStr =
part.getLocation() + Path.SEPARATOR + "base_" + String.valueOf(writeId);
fs.mkdirs(new Path(baseDirStr));
String emptyFile = baseDirStr + Path.SEPARATOR + "_empty";
fs.create(new Path(emptyFile));
}
}
private void dropFunction(TDropFunctionParams params, TDdlExecResponse resp)
throws ImpalaException {
FunctionName fName = FunctionName.fromThrift(params.fn_name);
synchronized (metastoreDdlLock_) {
Db db = catalog_.getDb(fName.getDb());
if (db == null) {
if (!params.if_exists) {
throw new CatalogException("Database: " + fName.getDb()
+ " does not exist.");
}
addSummary(resp, "Database does not exist.");
return;
}
// Get a new catalog version to assign to the database being altered. This is
// needed for events processor as this method creates alter database events.
long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
List<TCatalogObject> removedFunctions = Lists.newArrayList();
if (!params.isSetSignature()) {
dropJavaFunctionFromHms(fName.getDb(), fName.getFunction(), params.if_exists);
for (Function fn: db.getFunctions(fName.getFunction())) {
if (fn.getBinaryType() != TFunctionBinaryType.JAVA
|| !fn.isPersistent()) {
continue;
}
Preconditions.checkNotNull(catalog_.removeFunction(fn));
removedFunctions.add(fn.toTCatalogObject());
}
} else {
ArrayList<Type> argTypes = Lists.newArrayList();
for (TColumnType t: params.arg_types) {
argTypes.add(Type.fromThrift(t));
}
Function desc = new Function(fName, argTypes, Type.INVALID, false);
Function fn = catalog_.removeFunction(desc);
if (fn == null) {
if (!params.if_exists) {
throw new CatalogException(
"Function: " + desc.signatureString() + " does not exist.");
}
} else {
// Flush DB changes to metastore
applyAlterDatabase(db.getMetaStoreDb());
removedFunctions.add(fn.toTCatalogObject());
// now that HMS alter operation has succeeded, add this version to list of
// inflight events in catalog database if event processing is enabled.
catalog_.addVersionsForInflightEvents(db, newCatalogVersion);
}
}
if (!removedFunctions.isEmpty()) {
addSummary(resp, "Function has been dropped.");
resp.result.setRemoved_catalog_objects(removedFunctions);
} else {
addSummary(resp, "Function does not exist.");
}
resp.result.setVersion(catalog_.getCatalogVersion());
}
}
/**
* Creates a new table in the metastore and adds an entry to the metadata cache to
* lazily load the new metadata on the next access. If this is a Synchronized Kudu
* table, the table is also created in the Kudu storage engine. Re-throws any HMS or
* Kudu exceptions encountered during the create.
* @param syncDdl tells if SYNC_DDL option is enabled on this DDL request.
* @return true if a new table has been created with the given params, false
* otherwise.
*/
private boolean createTable(TCreateTableParams params, TDdlExecResponse response,
boolean syncDdl) throws ImpalaException {
Preconditions.checkNotNull(params);
TableName tableName = TableName.fromThrift(params.getTable_name());
Preconditions.checkState(tableName != null && tableName.isFullyQualified());
Preconditions.checkState(params.getColumns() != null,
"Null column list given as argument to Catalog.createTable");
Preconditions.checkState(!catalog_.isBlacklistedTable(tableName),
String.format("Can't create blacklisted table: %s. %s", tableName,
BLACKLISTED_TABLES_INCONSISTENT_ERR_STR));
Table existingTbl = catalog_.getTableNoThrow(tableName.getDb(), tableName.getTbl());
if (params.if_not_exists && existingTbl != null) {
addSummary(response, "Table already exists.");
LOG.trace("Skipping table creation because {} already exists and " +
"IF NOT EXISTS was specified.", tableName);
tryLock(existingTbl);
try {
if (syncDdl) {
// When SYNC_DDL is enabled and the table already exists, we force a version
// bump on it so that it is added to the next statestore update. Without this
// we could potentially be referring to a table object that has already been
// GC'ed from the TopicUpdateLog and waitForSyncDdlVersion() cannot find a
// covering topic version (IMPALA-7961).
//
// This is a conservative hack to not break the SYNC_DDL semantics and could
// possibly result in false-positive invalidates on this table. However, that is
// better than breaking the SYNC_DDL semantics and the subsequent queries
// referring to this table failing with "table not found" errors.
long newVersion = catalog_.incrementAndGetCatalogVersion();
existingTbl.setCatalogVersion(newVersion);
LOG.trace("Table {} version bumped to {} because SYNC_DDL is enabled.",
tableName, newVersion);
}
addTableToCatalogUpdate(existingTbl, response.result);
} finally {
// Release the locks held in tryLock().
catalog_.getLock().writeLock().unlock();
existingTbl.getLock().unlock();
}
return false;
}
org.apache.hadoop.hive.metastore.api.Table tbl = createMetaStoreTable(params);
LOG.trace("Creating table {}", tableName);
if (KuduTable.isKuduTable(tbl)) return createKuduTable(tbl, params, response);
Preconditions.checkState(params.getColumns().size() > 0,
"Empty column list given as argument to Catalog.createTable");
return createTable(tbl, params.if_not_exists, params.getCache_op(),
params.server_name, params.getPrimary_keys(), params.getForeign_keys(), response);
}
/**
* Utility function that creates a hive.metastore.api.Table object based on the given
* TCreateTableParams.
* TODO: Extract metastore object creation utility functions into a separate
* helper/factory class.
*/
public static org.apache.hadoop.hive.metastore.api.Table createMetaStoreTable(
TCreateTableParams params) {
Preconditions.checkNotNull(params);
TableName tableName = TableName.fromThrift(params.getTable_name());
org.apache.hadoop.hive.metastore.api.Table tbl =
new org.apache.hadoop.hive.metastore.api.Table();
tbl.setDbName(tableName.getDb());
tbl.setTableName(tableName.getTbl());
tbl.setOwner(params.getOwner());
if (params.isSetTable_properties()) {
tbl.setParameters(params.getTable_properties());
} else {
tbl.setParameters(new HashMap<String, String>());
}
if (params.isSetSort_columns() && !params.sort_columns.isEmpty()) {
tbl.getParameters().put(AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS,
Joiner.on(",").join(params.sort_columns));
TSortingOrder sortingOrder = params.isSetSorting_order() ?
params.sorting_order : TSortingOrder.LEXICAL;
tbl.getParameters().put(AlterTableSortByStmt.TBL_PROP_SORT_ORDER,
sortingOrder.toString());
}
if (params.getComment() != null) {
tbl.getParameters().put("comment", params.getComment());
}
if (params.is_external) {
tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
tbl.putToParameters("EXTERNAL", "TRUE");
} else {
tbl.setTableType(TableType.MANAGED_TABLE.toString());
}
tbl.setSd(createSd(params));
if (params.getPartition_columns() != null) {
// Add in any partition keys that were specified
tbl.setPartitionKeys(buildFieldSchemaList(params.getPartition_columns()));
} else {
tbl.setPartitionKeys(new ArrayList<FieldSchema>());
}
setDefaultTableCapabilities(tbl);
return tbl;
}
private static StorageDescriptor createSd(TCreateTableParams params) {
StorageDescriptor sd = HiveStorageDescriptorFactory.createSd(
params.getFile_format(), RowFormat.fromThrift(params.getRow_format()));
if (params.isSetSerde_properties()) {
if (sd.getSerdeInfo().getParameters() == null) {
sd.getSerdeInfo().setParameters(params.getSerde_properties());
} else {
sd.getSerdeInfo().getParameters().putAll(params.getSerde_properties());
}
}
if (params.getLocation() != null) sd.setLocation(params.getLocation());
// Add in all the columns
sd.setCols(buildFieldSchemaList(params.getColumns()));
return sd;
}
/**
* Creates a new Kudu table. It should be noted that since HIVE-22158, HMS transforms
* a create managed Kudu table request to a create external Kudu table with
* <code>external.table.purge</code> property set to true. Such transformed Kudu
* tables should be treated as managed (synchronized) tables to keep the user facing
* behavior consistent.
*
* For managed tables:
* 1. If Kudu's integration with the Hive Metastore is not enabled, the Kudu
* table is first created in Kudu, then in the HMS.
* 2. Otherwise, when the table is created in Kudu, we rely on Kudu to have
* created the table in the HMS.
* For external tables:
* 1. We only create the table in the HMS (regardless of Kudu's integration
* with the Hive Metastore).
*
* After the above is complete, we create the table in the catalog cache.
*
* 'response' is populated with the results of this operation. Returns true if a new
* table was created as part of this call, false otherwise.
*/
private boolean createKuduTable(org.apache.hadoop.hive.metastore.api.Table newTable,
TCreateTableParams params, TDdlExecResponse response) throws ImpalaException {
Preconditions.checkState(KuduTable.isKuduTable(newTable));
if (KuduTable.isExternalTable(newTable)) {
KuduCatalogOpExecutor.populateExternalTableColsFromKudu(newTable);
} else {
KuduCatalogOpExecutor.createManagedTable(newTable, params);
}
// When Kudu's integration with the Hive Metastore is enabled, Kudu will create
// the HMS table for managed tables.
boolean createsHMSTable = KuduTable.isExternalTable(newTable) ?
true : !isKuduHmsIntegrationEnabled(newTable);
try {
// Add the table to the HMS and the catalog cache. Acquire metastoreDdlLock_ to
// ensure the atomicity of these operations.
synchronized (metastoreDdlLock_) {
if (createsHMSTable) {
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
msClient.getHiveClient().createTable(newTable);
}
}
// Add the table to the catalog cache
Table newTbl = catalog_.addIncompleteTable(newTable.getDbName(),
newTable.getTableName());
addTableToCatalogUpdate(newTbl, response.result);
}
} catch (Exception e) {
try {
// Error creating the table in HMS, drop the managed table from Kudu.
if (!KuduTable.isExternalTable(newTable)) {
KuduCatalogOpExecutor.dropTable(newTable, false);
}
} catch (Exception logged) {
String kuduTableName = newTable.getParameters().get(KuduTable.KEY_TABLE_NAME);
LOG.error(String.format("Failed to drop Kudu table '%s'", kuduTableName),
logged);
throw new RuntimeException(String.format("Failed to create the table '%s' in " +
" the Metastore and the newly created Kudu table '%s' could not be " +
" dropped. The log contains more information.", newTable.getTableName(),
kuduTableName), e);
}
if (e instanceof AlreadyExistsException && params.if_not_exists) {
addSummary(response, "Table already exists.");
return false;
}
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
}
addSummary(response, "Table has been created.");
return true;
}
/**
* Creates a new table. The table is initially created in HMS and, if that operation
* succeeds, it is then added in the catalog cache. It also sets HDFS caching if
* 'cacheOp' is not null. 'response' is populated with the results of this operation.
* Returns true if a new table was created as part of this call, false otherwise.
*/
private boolean createTable(org.apache.hadoop.hive.metastore.api.Table newTable,
boolean if_not_exists, THdfsCachingOp cacheOp, String serverName,
List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
TDdlExecResponse response) throws ImpalaException {
Preconditions.checkState(!KuduTable.isKuduTable(newTable));
synchronized (metastoreDdlLock_) {
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
if (primaryKeys == null && foreignKeys == null) {
msClient.getHiveClient().createTable(newTable);
} else {
MetastoreShim.createTableWithConstraints(
msClient.getHiveClient(), newTable,
primaryKeys == null ? new ArrayList<>() : primaryKeys,
foreignKeys == null ? new ArrayList<>() : foreignKeys);
}
// TODO (HIVE-21807): Creating a table and retrieving the table information is
// not atomic.
addSummary(response, "Table has been created.");
org.apache.hadoop.hive.metastore.api.Table msTable = msClient.getHiveClient()
.getTable(newTable.getDbName(), newTable.getTableName());
long tableCreateTime = msTable.getCreateTime();
response.setTable_name(newTable.getDbName() + "." + newTable.getTableName());
response.setTable_create_time(tableCreateTime);
// For external tables set table location needed for lineage generation.
if (newTable.getTableType() == TableType.EXTERNAL_TABLE.toString()) {
String tableLocation = newTable.getSd().getLocation();
// If location was not specified in the query, get it from newly created
// metastore table.
if (tableLocation == null) {
tableLocation = msTable.getSd().getLocation();
}
response.setTable_location(tableLocation);
}
// If this table should be cached, and the table location was not specified by
// the user, an extra step is needed to read the table to find the location.
if (cacheOp != null && cacheOp.isSet_cached() &&
newTable.getSd().getLocation() == null) {
newTable = msClient.getHiveClient().getTable(
newTable.getDbName(), newTable.getTableName());
}
} catch (Exception e) {
if (e instanceof AlreadyExistsException && if_not_exists) {
addSummary(response, "Table already exists");
return false;
}
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
}
// Submit the cache request and update the table metadata.
if (cacheOp != null && cacheOp.isSet_cached()) {
short replication = cacheOp.isSetReplication() ? cacheOp.getReplication() :
JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
long id = HdfsCachingUtil.submitCacheTblDirective(newTable,
cacheOp.getCache_pool_name(), replication);
catalog_.watchCacheDirs(Lists.<Long>newArrayList(id),
new TTableName(newTable.getDbName(), newTable.getTableName()),
"CREATE TABLE CACHED");
applyAlterTable(newTable);
}
Table newTbl = catalog_.addIncompleteTable(newTable.getDbName(),
newTable.getTableName());
addTableToCatalogUpdate(newTbl, response.result);
if (authzConfig_.isEnabled()) {
authzManager_.updateTableOwnerPrivilege(serverName, newTable.getDbName(),
newTable.getTableName(), /* oldOwner */ null,
/* oldOwnerType */ null, newTable.getOwner(), newTable.getOwnerType(),
response);
}
}
return true;
}
/**
* Creates a new view in the metastore and adds an entry to the metadata cache to
* lazily load the new metadata on the next access. Re-throws any Metastore
* exceptions encountered during the create.
*/
private void createView(TCreateOrAlterViewParams params, TDdlExecResponse response)
throws ImpalaException {
TableName tableName = TableName.fromThrift(params.getView_name());
Preconditions.checkState(tableName != null && tableName.isFullyQualified());
Preconditions.checkState(params.getColumns() != null &&
params.getColumns().size() > 0,
"Null or empty column list given as argument to DdlExecutor.createView");
Preconditions.checkState(!catalog_.isBlacklistedTable(tableName),
String.format("Can't create view with blacklisted table name: %s. %s", tableName,
BLACKLISTED_TABLES_INCONSISTENT_ERR_STR));
if (params.if_not_exists &&
catalog_.containsTable(tableName.getDb(), tableName.getTbl())) {
LOG.trace(String.format("Skipping view creation because %s already exists and " +
"ifNotExists is true.", tableName));
}
// Create new view.
org.apache.hadoop.hive.metastore.api.Table view =
new org.apache.hadoop.hive.metastore.api.Table();
setCreateViewAttributes(params, view);
LOG.trace(String.format("Creating view %s", tableName));
if (!createTable(view, params.if_not_exists, null, params.server_name,
new ArrayList<>(), new ArrayList<>(), response)) {
addSummary(response, "View already exists.");
} else {
addSummary(response, "View has been created.");
}
}
/**
* Creates a new table in the metastore based on the definition of an existing table.
* No data is copied as part of this process, it is a metadata only operation. If the
* creation succeeds, an entry is added to the metadata cache to lazily load the new
* table's metadata on the next access.
* @param syncDdl tells is SYNC_DDL is enabled for this DDL request.
*/
private void createTableLike(TCreateTableLikeParams params, TDdlExecResponse response,
boolean syncDdl) throws ImpalaException {
Preconditions.checkNotNull(params);
THdfsFileFormat fileFormat =
params.isSetFile_format() ? params.getFile_format() : null;
String comment = params.isSetComment() ? params.getComment() : null;
TableName tblName = TableName.fromThrift(params.getTable_name());
TableName srcTblName = TableName.fromThrift(params.getSrc_table_name());
Preconditions.checkState(tblName != null && tblName.isFullyQualified());
Preconditions.checkState(srcTblName != null && srcTblName.isFullyQualified());
Preconditions.checkState(!catalog_.isBlacklistedTable(tblName),
String.format("Can't create blacklisted table: %s. %s", tblName,
BLACKLISTED_TABLES_INCONSISTENT_ERR_STR));
Table existingTbl = catalog_.getTableNoThrow(tblName.getDb(), tblName.getTbl());
if (params.if_not_exists && existingTbl != null) {
addSummary(response, "Table already exists.");
LOG.trace(String.format("Skipping table creation because %s already exists and " +
"IF NOT EXISTS was specified.", tblName));
tryLock(existingTbl);
try {
if (syncDdl) {
// When SYNC_DDL is enabled and the table already exists, we force a version
// bump on it so that it is added to the next statestore update. Without this
// we could potentially be referring to a table object that has already been
// GC'ed from the TopicUpdateLog and waitForSyncDdlVersion() cannot find a
// covering topic version (IMPALA-7961).
//
// This is a conservative hack to not break the SYNC_DDL semantics and could
// possibly result in false-positive invalidates on this table. However, that is
// better than breaking the SYNC_DDL semantics and the subsequent queries
// referring to this table failing with "table not found" errors.
long newVersion = catalog_.incrementAndGetCatalogVersion();
existingTbl.setCatalogVersion(newVersion);
LOG.trace("Table {} version bumped to {} because SYNC_DDL is enabled.",
existingTbl.getFullName(), newVersion);
}
addTableToCatalogUpdate(existingTbl, response.result);
} finally {
// Release the locks held in tryLock().
catalog_.getLock().writeLock().unlock();
existingTbl.getLock().unlock();
}
return;
}
Table srcTable = getExistingTable(srcTblName.getDb(), srcTblName.getTbl(),
"Load source for CREATE TABLE LIKE");
org.apache.hadoop.hive.metastore.api.Table tbl =
srcTable.getMetaStoreTable().deepCopy();
Preconditions.checkState(!KuduTable.isKuduTable(tbl),
"CREATE TABLE LIKE is not supported for Kudu tables.");
tbl.setDbName(tblName.getDb());
tbl.setTableName(tblName.getTbl());
tbl.setOwner(params.getOwner());
if (tbl.getParameters() == null) {
tbl.setParameters(new HashMap<String, String>());
}
if (params.isSetSort_columns() && !params.sort_columns.isEmpty()) {
tbl.getParameters().put(AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS,
Joiner.on(",").join(params.sort_columns));
TSortingOrder sortingOrder = params.isSetSorting_order() ?
params.sorting_order : TSortingOrder.LEXICAL;
tbl.getParameters().put(AlterTableSortByStmt.TBL_PROP_SORT_ORDER,
sortingOrder.toString());
}
if (comment != null) {
tbl.getParameters().put("comment", comment);
}
// The EXTERNAL table property should not be copied from the old table.
if (params.is_external) {
tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
tbl.putToParameters("EXTERNAL", "TRUE");
} else {
tbl.setTableType(TableType.MANAGED_TABLE.toString());
if (tbl.getParameters().containsKey("EXTERNAL")) {
tbl.getParameters().remove("EXTERNAL");
}
}
// We should not propagate hdfs caching parameters to the new table.
if (tbl.getParameters().containsKey(
HdfsCachingUtil.CACHE_DIR_ID_PROP_NAME)) {
tbl.getParameters().remove(HdfsCachingUtil.CACHE_DIR_ID_PROP_NAME);
}
if (tbl.getParameters().containsKey(
HdfsCachingUtil.CACHE_DIR_REPLICATION_PROP_NAME)) {
tbl.getParameters().remove(
HdfsCachingUtil.CACHE_DIR_REPLICATION_PROP_NAME);
}
// The LOCATION property should not be copied from the old table. If the location
// is null (the caller didn't specify a custom location) this will clear the value
// and the table will use the default table location from the parent database.
tbl.getSd().setLocation(params.getLocation());
if (fileFormat != null) {
setStorageDescriptorFileFormat(tbl.getSd(), fileFormat);
} else if (srcTable instanceof View) {
// Here, source table is a view which has no input format. So to be
// consistent with CREATE TABLE, default input format is assumed to be
// TEXT unless otherwise specified.
setStorageDescriptorFileFormat(tbl.getSd(), THdfsFileFormat.TEXT);
}
// Set the row count of this table to unknown.
tbl.putToParameters(StatsSetupConst.ROW_COUNT, "-1");
setDefaultTableCapabilities(tbl);
LOG.trace(String.format("Creating table %s LIKE %s", tblName, srcTblName));
createTable(tbl, params.if_not_exists, null, params.server_name, null, null,
response);
}
private static void setDefaultTableCapabilities(
org.apache.hadoop.hive.metastore.api.Table tbl) {
if (MetastoreShim.getMajorVersion() > 2) {
// This hms table is for create table.
// It needs read/write access type, not default value(0, undefined)
MetastoreShim.setTableAccessType(tbl, ACCESSTYPE_READWRITE);
// Set table default capabilities in HMS
if (tbl.getParameters().containsKey(CAPABILITIES_KEY)) return;
if (AcidUtils.isTransactionalTable(tbl.getParameters())) {
Preconditions.checkState(!AcidUtils.isFullAcidTable(tbl.getParameters()));
tbl.getParameters().put(CAPABILITIES_KEY, ACIDINSERTONLY_CAPABILITIES);
} else {
// Managed KUDU table has issues with extra table properties:
// 1. The property is not stored. 2. The table cannot be found after created.
// Related jira: IMPALA-8751
// Skip adding default capabilities for KUDU tables before the issues are fixed.
if (!KuduTable.isKuduTable(tbl)) {
tbl.getParameters().put(CAPABILITIES_KEY, NONACID_CAPABILITIES);
}
}
}
}
/**
* Sets the given params in the metastore table as appropriate for a
* create view operation.
*/
private void setCreateViewAttributes(TCreateOrAlterViewParams params,
org.apache.hadoop.hive.metastore.api.Table view) {
view.setTableType(TableType.VIRTUAL_VIEW.toString());
view.setViewOriginalText(params.getOriginal_view_def());
view.setViewExpandedText(params.getExpanded_view_def());
view.setDbName(params.getView_name().getDb_name());
view.setTableName(params.getView_name().getTable_name());
view.setOwner(params.getOwner());
if (view.getParameters() == null) view.setParameters(new HashMap<String, String>());
if (params.isSetComment() && params.getComment() != null) {
view.getParameters().put("comment", params.getComment());
}
StorageDescriptor sd = new StorageDescriptor();
// Add all the columns to a new storage descriptor.
sd.setCols(buildFieldSchemaList(params.getColumns()));
// Set a dummy SerdeInfo for Hive.
sd.setSerdeInfo(new SerDeInfo());
view.setSd(sd);
}
/**
* Sets the given params in the metastore table as appropriate for an
* alter view operation.
*/
private void setAlterViewAttributes(TCreateOrAlterViewParams params,
org.apache.hadoop.hive.metastore.api.Table view) {
view.setViewOriginalText(params.getOriginal_view_def());
view.setViewExpandedText(params.getExpanded_view_def());
if (params.isSetComment() && params.getComment() != null) {
view.getParameters().put("comment", params.getComment());
}
// Add all the columns to a new storage descriptor.
view.getSd().setCols(buildFieldSchemaList(params.getColumns()));
}
/**
* Appends one or more columns to the given table. Returns true if there a column was
* added; false otherwise.
*/
private boolean alterTableAddCols(Table tbl, List<TColumn> columns, boolean ifNotExists)
throws ImpalaException {
Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
List<TColumn> colsToAdd = new ArrayList<>();
for (TColumn column: columns) {
Column col = tbl.getColumn(column.getColumnName());
if (ifNotExists && col != null) continue;
if (col != null) {
throw new CatalogException(
String.format("Column '%s' in table '%s' already exists.",
col.getName(), tbl.getName()));
}
colsToAdd.add(column);
}
// Only add columns that do not exist.
if (!colsToAdd.isEmpty()) {
// Append the new column to the existing list of columns.
msTbl.getSd().getCols().addAll(buildFieldSchemaList(colsToAdd));
applyAlterTable(msTbl);
return true;
}
return false;
}
/**
* Replaces all existing columns to the given table.
*/
private void alterTableReplaceCols(Table tbl, List<TColumn> columns)
throws ImpalaException {
Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
List<FieldSchema> newColumns = buildFieldSchemaList(columns);
msTbl.getSd().setCols(newColumns);
String sortByKey = AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS;
if (msTbl.getParameters().containsKey(sortByKey)) {
String oldColumns = msTbl.getParameters().get(sortByKey);
String alteredColumns = MetaStoreUtil.intersectCsvListWithColumNames(oldColumns,
columns);
msTbl.getParameters().put(sortByKey, alteredColumns);
}
applyAlterTable(msTbl);
}
/**
* Changes the column definition of an existing column. This can be used to rename a
* column, add a comment to a column, or change the datatype of a column.
*/
private void alterTableAlterCol(Table tbl, String colName,
TColumn newCol) throws ImpalaException {
Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
// Find the matching column name and change it.
Iterator<FieldSchema> iterator = msTbl.getSd().getColsIterator();
while (iterator.hasNext()) {
FieldSchema fs = iterator.next();
if (fs.getName().toLowerCase().equals(colName.toLowerCase())) {
fs.setName(newCol.getColumnName());
Type type = Type.fromThrift(newCol.getColumnType());
fs.setType(type.toSql().toLowerCase());
// Don't overwrite the existing comment unless a new comment is given
if (newCol.getComment() != null) {
fs.setComment(newCol.getComment());
}
String sortByKey = AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS;
if (msTbl.getParameters().containsKey(sortByKey)) {
String oldColumns = msTbl.getParameters().get(sortByKey);
String alteredColumns = MetaStoreUtil.replaceValueInCsvList(oldColumns, colName,
newCol.getColumnName());
msTbl.getParameters().put(sortByKey, alteredColumns);
}
break;
}
if (!iterator.hasNext()) {
throw new ColumnNotFoundException(String.format(
"Column name %s not found in table %s.", colName, tbl.getFullName()));
}
}
applyAlterTable(msTbl);
}
/**
* Adds new partitions to the given table in HMS. Also creates and adds new
* HdfsPartitions to the corresponding HdfsTable. Returns the table object with an
* updated catalog version or null if the table is not altered because all the
* partitions already exist and IF NOT EXISTS is specified.
* If IF NOT EXISTS is not used and there is a conflict with the partitions that already
* exist in HMS or catalog cache, then:
* - HMS and catalog cache are left intact, and
* - ImpalaRuntimeException is thrown.
* If IF NOT EXISTS is used, conflicts are handled as follows:
* 1. If a partition exists in catalog cache, ignore it.
* 2. If a partition exists in HMS but not in catalog cache, reload partition from HMS.
* Caching directives are only applied to new partitions that were absent from both the
* catalog cache and the HMS.
*/
private Table alterTableAddPartitions(Table tbl,
TAlterTableAddPartitionParams addPartParams) throws ImpalaException {
Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
TableName tableName = tbl.getTableName();
org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
boolean ifNotExists = addPartParams.isIf_not_exists();
List<Partition> allHmsPartitionsToAdd = Lists.newArrayList();
Map<List<String>, THdfsCachingOp> partitionCachingOpMap = Maps.newHashMap();
for (TPartitionDef partParams: addPartParams.getPartitions()) {
List<TPartitionKeyValue> partitionSpec = partParams.getPartition_spec();
if (catalog_.containsHdfsPartition(tableName.getDb(), tableName.getTbl(),
partitionSpec)) {
String partitionSpecStr = Joiner.on(", ").join(partitionSpec);
if (!ifNotExists) {
throw new ImpalaRuntimeException(String.format("Partition already " +
"exists: (%s)", partitionSpecStr));
}
LOG.trace(String.format("Skipping partition creation because (%s) already " +
"exists and IF NOT EXISTS was specified.", partitionSpecStr));
continue;
}
Partition hmsPartition =
createHmsPartition(partitionSpec, msTbl, tableName, partParams.getLocation());
allHmsPartitionsToAdd.add(hmsPartition);
THdfsCachingOp cacheOp = partParams.getCache_op();
if (cacheOp != null) partitionCachingOpMap.put(hmsPartition.getValues(), cacheOp);
}
if (allHmsPartitionsToAdd.isEmpty()) return null;
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
List<Partition> addedHmsPartitions = Lists.newArrayList();
for (List<Partition> hmsSublist :
Lists.partition(allHmsPartitionsToAdd, MAX_PARTITION_UPDATES_PER_RPC)) {
try {
addedHmsPartitions.addAll(msClient.getHiveClient().add_partitions(hmsSublist,
ifNotExists, true));
} catch (TException e) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partitions"), e);
}
}
// Handle HDFS cache. This is done in a separate round bacause we have to apply
// caching only to newly added partitions.
alterTableCachePartitions(msTbl, msClient, tableName, addedHmsPartitions,
partitionCachingOpMap);
// If 'ifNotExists' is true, add_partitions() may fail to add all the partitions to
// HMS because some of them may already exist there. In that case, we load in the
// catalog the partitions that already exist in HMS but aren't in the catalog yet.
if (allHmsPartitionsToAdd.size() != addedHmsPartitions.size()) {
List<Partition> difference = computeDifference(allHmsPartitionsToAdd,
addedHmsPartitions);
addedHmsPartitions.addAll(
getPartitionsFromHms(msTbl, msClient, tableName, difference));
}
addHdfsPartitions(tbl, addedHmsPartitions);
}
return tbl;
}
/**
* Returns the list of Partition objects from 'aList' that cannot be found in 'bList'.
* Partition objects are distinguished by partition values only.
*/
private List<Partition> computeDifference(List<Partition> aList,
List<Partition> bList) {
Set<List<String>> bSet = Sets.newHashSet();
for (Partition b: bList) bSet.add(b.getValues());
List<Partition> diffList = Lists.newArrayList();
for (Partition a: aList) {
if (!bSet.contains(a.getValues())) diffList.add(a);
}
return diffList;
}
/**
* Returns a list of partitions retrieved from HMS for each 'hmsPartitions' element.
*/
private List<Partition> getPartitionsFromHms(
org.apache.hadoop.hive.metastore.api.Table msTbl, MetaStoreClient msClient,
TableName tableName, List<Partition> hmsPartitions)
throws ImpalaException {
List<String> partitionCols = Lists.newArrayList();
for (FieldSchema fs: msTbl.getPartitionKeys()) partitionCols.add(fs.getName());
List<String> partitionNames = Lists.newArrayListWithCapacity(hmsPartitions.size());
for (Partition part: hmsPartitions) {
String partName = org.apache.hadoop.hive.common.FileUtils.makePartName(
partitionCols, part.getValues());
partitionNames.add(partName);
}
try {
return msClient.getHiveClient().getPartitionsByNames(tableName.getDb(),
tableName.getTbl(), partitionNames);
} catch (TException e) {
throw new ImpalaRuntimeException("Metadata inconsistency has occured. Please run "
+ "'invalidate metadata <tablename>' to resolve the problem.", e);
}
}
/**
* Applies HDFS caching ops on 'hmsPartitions' and updates their metadata in Hive
* Metastore.
* 'partitionCachingOpMap' maps partitions (identified by their partition values) to
* their corresponding HDFS caching ops.
*/
private void alterTableCachePartitions(org.apache.hadoop.hive.metastore.api.Table msTbl,
MetaStoreClient msClient, TableName tableName, List<Partition> hmsPartitions,
Map<List<String>, THdfsCachingOp> partitionCachingOpMap)
throws ImpalaException {
// Handle HDFS cache
List<Long> cacheIds = Lists.newArrayList();
List<Partition> hmsPartitionsToCache = Lists.newArrayList();
Long parentTblCacheDirId = HdfsCachingUtil.getCacheDirectiveId(msTbl.getParameters());
for (Partition partition: hmsPartitions) {
THdfsCachingOp cacheOp = partitionCachingOpMap.get(partition.getValues());
String cachePoolName = null;
Short replication = null;
if (cacheOp == null && parentTblCacheDirId != null) {
// The user didn't specify an explicit caching operation, inherit the value
// from the parent table.
cachePoolName = HdfsCachingUtil.getCachePool(parentTblCacheDirId);
Preconditions.checkNotNull(cachePoolName);
replication = HdfsCachingUtil.getCacheReplication(parentTblCacheDirId);
Preconditions.checkNotNull(replication);
} else if (cacheOp != null && cacheOp.isSet_cached()) {
// The user explicitly stated that this partition should be cached.
cachePoolName = cacheOp.getCache_pool_name();
// When the new partition should be cached and and no replication factor
// was specified, inherit the replication factor from the parent table if
// it is cached. If the parent is not cached and no replication factor is
// explicitly set, use the default value.
if (!cacheOp.isSetReplication() && parentTblCacheDirId != null) {
replication = HdfsCachingUtil.getCacheReplication(parentTblCacheDirId);
} else {
replication = HdfsCachingUtil.getReplicationOrDefault(cacheOp);
}
}
// If cache pool name is not null, it indicates this partition should be cached.
if (cachePoolName != null) {
long id = HdfsCachingUtil.submitCachePartitionDirective(partition,
cachePoolName, replication);
cacheIds.add(id);
hmsPartitionsToCache.add(partition);
}
}
// Update the partition metadata to include the cache directive id.
if (!cacheIds.isEmpty()) {
applyAlterHmsPartitions(msTbl, msClient, tableName, hmsPartitionsToCache);
catalog_.watchCacheDirs(cacheIds, tableName.toThrift(),
"ALTER TABLE CACHE PARTITIONS");
}
}
/**
* Drops existing partitions from the given table in Hive. If a partition is cached,
* the associated cache directive will also be removed.
* Also drops the corresponding partitions from its Hdfs table.
* Returns the table object with an updated catalog version. If none of the partitions
* exists and "IfExists" is true, null is returned. If purge is true, partition data is
* permanently deleted. numUpdatedPartitions is used to inform the client how many
* partitions being dropped in this operation.
*/
private Table alterTableDropPartition(Table tbl,
List<List<TPartitionKeyValue>> partitionSet,
boolean ifExists, boolean purge, Reference<Long> numUpdatedPartitions)
throws ImpalaException {
Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
Preconditions.checkNotNull(partitionSet);
TableName tableName = tbl.getTableName();
if (!ifExists) {
Preconditions.checkState(!partitionSet.isEmpty());
} else {
if (partitionSet.isEmpty()) {
LOG.trace(String.format("Ignoring empty partition list when dropping " +
"partitions from %s because ifExists is true.", tableName));
return tbl;
}
}
Preconditions.checkArgument(tbl instanceof HdfsTable);
List<HdfsPartition> parts =
((HdfsTable) tbl).getPartitionsFromPartitionSet(partitionSet);
if (!ifExists && parts.isEmpty()) {
throw new PartitionNotFoundException(
"The partitions being dropped don't exist any more");
}
PartitionDropOptions dropOptions = PartitionDropOptions.instance();
dropOptions.purgeData(purge);
long numTargetedPartitions = 0L;
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
for (HdfsPartition part : parts) {
try {
msClient.getHiveClient().dropPartition(tableName.getDb(), tableName.getTbl(),
part.getPartitionValuesAsStrings(true), dropOptions);
++numTargetedPartitions;
} catch (NoSuchObjectException e) {
if (!ifExists) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "dropPartition"), e);
}
LOG.trace(
String.format("Ignoring '%s' when dropping partitions from %s because" +
" ifExists is true.", e, tableName));
}
}
} catch (TException e) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "dropPartition"), e);
}
numUpdatedPartitions.setRef(numTargetedPartitions);
return catalog_.dropPartitions(tbl, partitionSet);
}
/**
* Removes a column from the given table.
*/
private void alterTableDropCol(Table tbl, String colName) throws ImpalaException {
Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
// Find the matching column name and remove it.
Iterator<FieldSchema> iterator = msTbl.getSd().getColsIterator();
while (iterator.hasNext()) {
FieldSchema fs = iterator.next();
if (fs.getName().toLowerCase().equals(colName.toLowerCase())) {
iterator.remove();
break;
}
if (!iterator.hasNext()) {
throw new ColumnNotFoundException(String.format(
"Column name %s not found in table %s.", colName, tbl.getFullName()));
}
}
String sortByKey = AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS;
if (msTbl.getParameters().containsKey(sortByKey)) {
String oldColumns = msTbl.getParameters().get(sortByKey);
String alteredColumns = MetaStoreUtil.removeValueFromCsvList(oldColumns, colName);
msTbl.getParameters().put(sortByKey, alteredColumns);
}
applyAlterTable(msTbl);
}
/**
* Renames an existing table or view.
* After renaming the table/view, its metadata is marked as invalid and will be
* reloaded on the next access.
*/
private void alterTableOrViewRename(Table oldTbl, TableName newTableName,
TDdlExecResponse response) throws ImpalaException {
Preconditions.checkState(oldTbl.getLock().isHeldByCurrentThread()
&& catalog_.getLock().isWriteLockedByCurrentThread());
TableName tableName = oldTbl.getTableName();
org.apache.hadoop.hive.metastore.api.Table msTbl =
oldTbl.getMetaStoreTable().deepCopy();
msTbl.setDbName(newTableName.getDb());
msTbl.setTableName(newTableName.getTbl());
// If oldTbl is a synchronized Kudu table, rename the underlying Kudu table.
boolean isSynchronizedTable = (oldTbl instanceof KuduTable) &&
KuduTable.isSynchronizedTable(msTbl);
boolean altersHMSTable = true;
if (isSynchronizedTable) {
Preconditions.checkState(KuduTable.isKuduTable(msTbl));
boolean isKuduHmsIntegrationEnabled = isKuduHmsIntegrationEnabled(msTbl);
altersHMSTable = !isKuduHmsIntegrationEnabled;
renameManagedKuduTable((KuduTable) oldTbl, msTbl, newTableName,
isKuduHmsIntegrationEnabled);
}
// Always updates the HMS metadata for non-Kudu tables. For Kudu tables, when
// Kudu is not integrated with the Hive Metastore or if this is an external table,
// Kudu will not automatically update the HMS metadata, we have to do it
// manually.
if (altersHMSTable) {
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
msClient.getHiveClient().alter_table(
tableName.getDb(), tableName.getTbl(), msTbl);
} catch (TException e) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_table"), e);
}
}
// Rename the table in the Catalog and get the resulting catalog object.
// ALTER TABLE/VIEW RENAME is implemented as an ADD + DROP.
Pair<Table, Table> result =
catalog_.renameTable(tableName.toThrift(), newTableName.toThrift());
if (result.first == null || result.second == null) {
// The rename succeeded in the HMS but failed in the catalog cache. The cache is in
// an inconsistent state, but can likely be fixed by running "invalidate metadata".
throw new ImpalaRuntimeException(String.format(
"Table/view rename succeeded in the Hive Metastore, but failed in Impala's " +
"Catalog Server. Running 'invalidate metadata <tbl>' on the old table name " +
"'%s' and the new table name '%s' may fix the problem." , tableName.toString(),
newTableName.toString()));
}
// TODO(todd): if client is a 'v2' impalad, only send back invalidation
response.result.addToRemoved_catalog_objects(result.first.toMinimalTCatalogObject());
response.result.addToUpdated_catalog_objects(result.second.toTCatalogObject());
response.result.setVersion(result.second.getCatalogVersion());
addSummary(response, "Renaming was successful.");
}
/**
* Renames the underlying Kudu table for the given managed table. If the new Kudu
* table name is the same as the old Kudu table name, this method does nothing.
*/
private void renameManagedKuduTable(KuduTable oldTbl,
org.apache.hadoop.hive.metastore.api.Table oldMsTbl,
TableName newTableName, boolean isHMSIntegrationEanbled)
throws ImpalaRuntimeException {
String newKuduTableName = KuduUtil.getDefaultKuduTableName(
newTableName.getDb(), newTableName.getTbl(),
isHMSIntegrationEanbled);
// If the name of the Kudu table has not changed, do nothing
if (oldTbl.getKuduTableName().equals(newKuduTableName)) return;
KuduCatalogOpExecutor.renameTable(oldTbl, newKuduTableName);
// Add the name of the new Kudu table to the HMS table parameters
oldMsTbl.getParameters().put(KuduTable.KEY_TABLE_NAME, newKuduTableName);
}
/**
* Changes the file format for the given table or partitions. This is a metadata only
* operation, existing table data will not be converted to the new format. Returns
* true if the file metadata to be reloaded.
*/
private boolean alterTableSetFileFormat(Table tbl,
List<List<TPartitionKeyValue>> partitionSet, THdfsFileFormat fileFormat,
Reference<Long> numUpdatedPartitions)
throws ImpalaException {
Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
boolean reloadFileMetadata = false;
if (partitionSet == null) {
org.apache.hadoop.hive.metastore.api.Table msTbl =
tbl.getMetaStoreTable().deepCopy();
setStorageDescriptorFileFormat(msTbl.getSd(), fileFormat);
// The prototype partition must be updated if the file format is changed so that new
// partitions are created with the new file format.
if (tbl instanceof HdfsTable) ((HdfsTable) tbl).setPrototypePartition(msTbl.getSd());
applyAlterTable(msTbl);
reloadFileMetadata = true;
} else {
Preconditions.checkArgument(tbl instanceof HdfsTable);
List<HdfsPartition> partitions =
((HdfsTable) tbl).getPartitionsFromPartitionSet(partitionSet);
List<HdfsPartition> modifiedParts = Lists.newArrayList();
for(HdfsPartition partition: partitions) {
partition.setFileFormat(HdfsFileFormat.fromThrift(fileFormat));
modifiedParts.add(partition);
}
bulkAlterPartitions(tbl, modifiedParts, null);
numUpdatedPartitions.setRef((long) modifiedParts.size());
}
return reloadFileMetadata;
}
/**
* Changes the row format for the given table or partitions. This is a metadata only
* operation, existing table data will not be converted to the new format. Returns
* true if the file metadata to be reloaded.
*/
private boolean alterTableSetRowFormat(Table tbl,
List<List<TPartitionKeyValue>> partitionSet, TTableRowFormat tRowFormat,
Reference<Long> numUpdatedPartitions)
throws ImpalaException {
Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
Preconditions.checkArgument(tbl instanceof HdfsTable);
boolean reloadFileMetadata = false;
RowFormat rowFormat = RowFormat.fromThrift(tRowFormat);
if (partitionSet == null) {
org.apache.hadoop.hive.metastore.api.Table msTbl =
tbl.getMetaStoreTable().deepCopy();
StorageDescriptor sd = msTbl.getSd();
HiveStorageDescriptorFactory.setSerdeInfo(rowFormat, sd.getSerdeInfo());
// The prototype partition must be updated if the row format is changed so that new
// partitions are created with the new file format.
((HdfsTable) tbl).setPrototypePartition(msTbl.getSd());
applyAlterTable(msTbl);
reloadFileMetadata = true;
} else {
List<HdfsPartition> partitions =
((HdfsTable) tbl).getPartitionsFromPartitionSet(partitionSet);
List<HdfsPartition> modifiedParts = Lists.newArrayList();
for(HdfsPartition partition: partitions) {
HiveStorageDescriptorFactory.setSerdeInfo(rowFormat, partition.getSerdeInfo());
modifiedParts.add(partition);
}
bulkAlterPartitions(tbl, modifiedParts, null);
numUpdatedPartitions.setRef((long) modifiedParts.size());
}
return reloadFileMetadata;
}
/**
* Helper method for setting the file format on a given storage descriptor.
*/
private static void setStorageDescriptorFileFormat(StorageDescriptor sd,
THdfsFileFormat fileFormat) {
StorageDescriptor tempSd =
HiveStorageDescriptorFactory.createSd(fileFormat, RowFormat.DEFAULT_ROW_FORMAT);
sd.setInputFormat(tempSd.getInputFormat());
sd.setOutputFormat(tempSd.getOutputFormat());
sd.getSerdeInfo().setSerializationLib(tempSd.getSerdeInfo().getSerializationLib());
}
/**
* Changes the HDFS storage location for the given table. This is a metadata only
* operation, existing table data will not be as part of changing the location.
*/
private boolean alterTableSetLocation(Table tbl,
List<TPartitionKeyValue> partitionSpec, String location) throws ImpalaException {
Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
boolean reloadFileMetadata = false;
if (partitionSpec == null) {
org.apache.hadoop.hive.metastore.api.Table msTbl =
tbl.getMetaStoreTable().deepCopy();
if (msTbl.getPartitionKeysSize() == 0) reloadFileMetadata = true;
msTbl.getSd().setLocation(location);
applyAlterTable(msTbl);
} else {
TableName tableName = tbl.getTableName();
HdfsPartition partition = catalog_.getHdfsPartition(
tableName.getDb(), tableName.getTbl(), partitionSpec);
partition.setLocation(location);
try {
applyAlterPartition(tbl, partition);
} finally {
partition.markDirty();
}
}
return reloadFileMetadata;
}
/**
* Appends to the table or partitions property metadata for the given table, replacing
* the values of any keys that already exist.
*/
private void alterTableSetTblProperties(Table tbl,
TAlterTableSetTblPropertiesParams params, Reference<Long> numUpdatedPartitions)
throws ImpalaException {
Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
Map<String, String> properties = params.getProperties();
Preconditions.checkNotNull(properties);
if (params.isSetPartition_set()) {
Preconditions.checkArgument(tbl instanceof HdfsTable);
List<HdfsPartition> partitions =
((HdfsTable) tbl).getPartitionsFromPartitionSet(params.getPartition_set());
List<HdfsPartition> modifiedParts = Lists.newArrayList();
for(HdfsPartition partition: partitions) {
switch (params.getTarget()) {
case TBL_PROPERTY:
partition.getParameters().putAll(properties);
break;
case SERDE_PROPERTY:
partition.getSerdeInfo().getParameters().putAll(properties);
break;
default:
throw new UnsupportedOperationException(
"Unknown target TTablePropertyType: " + params.getTarget());
}
modifiedParts.add(partition);
}
try {
bulkAlterPartitions(tbl, modifiedParts, null);
} finally {
for (HdfsPartition modifiedPart : modifiedParts) {
modifiedPart.markDirty();
}
}
numUpdatedPartitions.setRef((long) modifiedParts.size());
} else {
// Alter table params.
org.apache.hadoop.hive.metastore.api.Table msTbl =
tbl.getMetaStoreTable().deepCopy();
switch (params.getTarget()) {
case TBL_PROPERTY:
if (KuduTable.isKuduTable(msTbl)) {
// If 'kudu.table_name' is specified and this is a synchronized table, rename
// the underlying Kudu table.
// TODO(IMPALA-8618): this should be disallowed since IMPALA-5654
if (properties.containsKey(KuduTable.KEY_TABLE_NAME)
&& !properties.get(KuduTable.KEY_TABLE_NAME).equals(
msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME))
&& KuduTable.isSynchronizedTable(msTbl)) {
KuduCatalogOpExecutor.renameTable((KuduTable) tbl,
properties.get(KuduTable.KEY_TABLE_NAME));
}
msTbl.getParameters().putAll(properties);
// Validate that the new table properties are valid and that
// the Kudu table is accessible.
KuduCatalogOpExecutor.validateKuduTblExists(msTbl);
} else {
msTbl.getParameters().putAll(properties);
}
break;
case SERDE_PROPERTY:
msTbl.getSd().getSerdeInfo().getParameters().putAll(properties);
break;
default:
throw new UnsupportedOperationException(
"Unknown target TTablePropertyType: " + params.getTarget());
}
applyAlterTable(msTbl);
}
}
/**
* Caches or uncaches the HDFS location of the target table and updates the
* table's metadata in Hive Metastore Store. If this is a partitioned table,
* all uncached partitions will also be cached. The table/partition metadata
* will be updated to include the ID of each cache directive that was submitted.
* If the table is being uncached, any outstanding cache directives will be dropped
* and the cache directive ID property key will be cleared. For partitioned tables,
* marks the partitions that are affected as 'dirty'. For unpartitioned tables, it
* returns true to indicate that the file metadata of the table must be reloaded.
*/
private boolean alterTableSetCached(Table tbl, TAlterTableSetCachedParams params)
throws ImpalaException {
Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
THdfsCachingOp cacheOp = params.getCache_op();
Preconditions.checkNotNull(cacheOp);
// Alter table params.
if (!(tbl instanceof HdfsTable)) {
throw new ImpalaRuntimeException("ALTER TABLE SET CACHED/UNCACHED must target " +
"an HDFS table.");
}
boolean loadFileMetadata = false;
TableName tableName = tbl.getTableName();
HdfsTable hdfsTable = (HdfsTable) tbl;
org.apache.hadoop.hive.metastore.api.Table msTbl =
tbl.getMetaStoreTable().deepCopy();
Long cacheDirId = HdfsCachingUtil.getCacheDirectiveId(msTbl.getParameters());
if (cacheOp.isSet_cached()) {
// List of cache directive IDs that were submitted as part of this
// ALTER TABLE operation.
List<Long> cacheDirIds = Lists.newArrayList();
short cacheReplication = HdfsCachingUtil.getReplicationOrDefault(cacheOp);
// If the table was not previously cached (cacheDirId == null) we issue a new
// directive for this table. If the table was already cached, we validate
// the pool name and update the cache replication factor if necessary
if (cacheDirId == null) {
cacheDirIds.add(HdfsCachingUtil.submitCacheTblDirective(msTbl,
cacheOp.getCache_pool_name(), cacheReplication));
} else {
// Check if the cache directive needs to be changed
if (HdfsCachingUtil.isUpdateOp(cacheOp, msTbl.getParameters())) {
HdfsCachingUtil.validateCachePool(cacheOp, cacheDirId, tableName);
cacheDirIds.add(HdfsCachingUtil.modifyCacheDirective(cacheDirId, msTbl,
cacheOp.getCache_pool_name(), cacheReplication));
}
}
if (tbl.getNumClusteringCols() > 0) {
// If this is a partitioned table, submit cache directives for all uncached
// partitions.
Collection<? extends FeFsPartition> parts =
FeCatalogUtils.loadAllPartitions(hdfsTable);
for (FeFsPartition fePartition: parts) {
// TODO(todd): avoid downcast
HdfsPartition partition = (HdfsPartition) fePartition;
// Only issue cache directives if the data is uncached or the cache directive
// needs to be updated
if (!partition.isMarkedCached() ||
HdfsCachingUtil.isUpdateOp(cacheOp, partition.getParameters())) {
try {
// If the partition was already cached, update the directive otherwise
// issue new cache directive
if (!partition.isMarkedCached()) {
cacheDirIds.add(HdfsCachingUtil.submitCachePartitionDirective(
partition, cacheOp.getCache_pool_name(), cacheReplication));
} else {
Long directiveId = HdfsCachingUtil.getCacheDirectiveId(
partition.getParameters());
cacheDirIds.add(HdfsCachingUtil.modifyCacheDirective(directiveId,
partition, cacheOp.getCache_pool_name(), cacheReplication));
}
} catch (ImpalaRuntimeException e) {
if (partition.isMarkedCached()) {
LOG.error("Unable to modify cache partition: " +
partition.getPartitionName(), e);
} else {
LOG.error("Unable to cache partition: " +
partition.getPartitionName(), e);
}
}
// Update the partition metadata.
try {
applyAlterPartition(tbl, partition);
} finally {
partition.markDirty();
}
}
}
} else {
loadFileMetadata = true;
}
// Nothing to do.
if (cacheDirIds.isEmpty()) return loadFileMetadata;
// Submit a request to watch these cache directives. The TableLoadingMgr will
// asynchronously refresh the table metadata once the directives complete.
catalog_.watchCacheDirs(cacheDirIds, tableName.toThrift(),
"ALTER TABLE SET CACHED");
} else {
// Uncache the table.
if (cacheDirId != null) HdfsCachingUtil.removeTblCacheDirective(msTbl);
// Uncache all table partitions.
if (tbl.getNumClusteringCols() > 0) {
Collection<? extends FeFsPartition> parts =
FeCatalogUtils.loadAllPartitions(hdfsTable);
for (FeFsPartition fePartition: parts) {
// TODO(todd): avoid downcast
HdfsPartition partition = (HdfsPartition) fePartition;
if (partition.isMarkedCached()) {
HdfsCachingUtil.removePartitionCacheDirective(partition);
try {
applyAlterPartition(tbl, partition);
} finally {
partition.markDirty();
}
}
}
} else {
loadFileMetadata = true;
}
}
// Update the table metadata.
applyAlterTable(msTbl);
return loadFileMetadata;
}
/**
* Caches or uncaches the HDFS location of the target partitions and updates the
* partitions' metadata in Hive Metastore Store. If a partition is being cached, the
* partition properties will have the ID of the cache directive added. If the partition
* is being uncached, any outstanding cache directive will be dropped and the cache
* directive ID property key will be cleared.
*/
private void alterPartitionSetCached(Table tbl,
TAlterTableSetCachedParams params, Reference<Long> numUpdatedPartitions)
throws ImpalaException {
Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
THdfsCachingOp cacheOp = params.getCache_op();
Preconditions.checkNotNull(cacheOp);
Preconditions.checkNotNull(params.getPartition_set());
TableName tableName = tbl.getTableName();
Preconditions.checkArgument(tbl instanceof HdfsTable);
List<HdfsPartition> partitions =
((HdfsTable) tbl).getPartitionsFromPartitionSet(params.getPartition_set());
List<HdfsPartition> modifiedParts = Lists.newArrayList();
if (cacheOp.isSet_cached()) {
for (HdfsPartition partition : partitions) {
// The directive is null if the partition is not cached
Long directiveId =
HdfsCachingUtil.getCacheDirectiveId(partition.getParameters());
short replication = HdfsCachingUtil.getReplicationOrDefault(cacheOp);
List<Long> cacheDirs = Lists.newArrayList();
if (directiveId == null) {
cacheDirs.add(HdfsCachingUtil.submitCachePartitionDirective(
partition, cacheOp.getCache_pool_name(), replication));
} else {
if (HdfsCachingUtil.isUpdateOp(cacheOp, partition.getParameters())) {
HdfsCachingUtil.validateCachePool(cacheOp, directiveId, tableName, partition);
cacheDirs.add(HdfsCachingUtil.modifyCacheDirective(
directiveId, partition, cacheOp.getCache_pool_name(),
replication));
}
}
// Once the cache directives are submitted, observe the status of the caching
// until no more progress is made -- either fully cached or out of cache memory
if (!cacheDirs.isEmpty()) {
catalog_.watchCacheDirs(cacheDirs, tableName.toThrift(),
"ALTER PARTITION SET CACHED");
}
if (!partition.isMarkedCached()) {
modifiedParts.add(partition);
}
}
} else {
for (HdfsPartition partition : partitions) {
if (partition.isMarkedCached()) {
HdfsCachingUtil.removePartitionCacheDirective(partition);
modifiedParts.add(partition);
}
}
}
try {
bulkAlterPartitions(tbl, modifiedParts, null);
} finally {
for (HdfsPartition modifiedPart : modifiedParts) {
modifiedPart.markDirty();
}
}
numUpdatedPartitions.setRef((long) modifiedParts.size());
}
/**
* Recover partitions of specified table.
* Add partitions to metastore which exist in HDFS but not in metastore.
*/
private void alterTableRecoverPartitions(Table tbl) throws ImpalaException {
Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
if (!(tbl instanceof HdfsTable)) {
throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table");
}
HdfsTable hdfsTable = (HdfsTable) tbl;
List<List<String>> partitionsNotInHms = hdfsTable.getPathsWithoutPartitions();
if (partitionsNotInHms.isEmpty()) return;
List<Partition> hmsPartitions = Lists.newArrayList();
org.apache.hadoop.hive.metastore.api.Table msTbl =
tbl.getMetaStoreTable().deepCopy();
TableName tableName = tbl.getTableName();
for (List<String> partitionSpecValues: partitionsNotInHms) {
hmsPartitions.add(createHmsPartitionFromValues(
partitionSpecValues, msTbl, tableName, null));
}
String cachePoolName = null;
Short replication = null;
List<Long> cacheIds = Lists.newArrayList();
Long parentTblCacheDirId =
HdfsCachingUtil.getCacheDirectiveId(msTbl.getParameters());
if (parentTblCacheDirId != null) {
// Inherit the HDFS cache value from the parent table.
cachePoolName = HdfsCachingUtil.getCachePool(parentTblCacheDirId);
Preconditions.checkNotNull(cachePoolName);
replication = HdfsCachingUtil.getCacheReplication(parentTblCacheDirId);
Preconditions.checkNotNull(replication);
}
// Add partitions to metastore.
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
// Apply the updates in batches of 'MAX_PARTITION_UPDATES_PER_RPC'.
for (List<Partition> hmsSublist :
Lists.partition(hmsPartitions, MAX_PARTITION_UPDATES_PER_RPC)) {
// ifNotExists and needResults are true.
List<Partition> hmsAddedPartitions =
msClient.getHiveClient().add_partitions(hmsSublist, true, true);
addHdfsPartitions(tbl, hmsAddedPartitions);
// Handle HDFS cache.
if (cachePoolName != null) {
for (Partition partition: hmsAddedPartitions) {
long id = HdfsCachingUtil.submitCachePartitionDirective(partition,
cachePoolName, replication);
cacheIds.add(id);
}
// Update the partition metadata to include the cache directive id.
MetastoreShim.alterPartitions(msClient.getHiveClient(), tableName.getDb(),
tableName.getTbl(), hmsAddedPartitions);
}
}
} catch (TException e) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partition"), e);
}
if (!cacheIds.isEmpty()) {
catalog_.watchCacheDirs(cacheIds, tableName.toThrift(),
"ALTER TABLE RECOVER PARTITIONS");
}
}
private void alterTableOrViewSetOwner(Table tbl, TAlterTableOrViewSetOwnerParams params,
TDdlExecResponse response) throws ImpalaException {
org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
String oldOwner = msTbl.getOwner();
PrincipalType oldOwnerType = msTbl.getOwnerType();
msTbl.setOwner(params.owner_name);
msTbl.setOwnerType(PrincipalType.valueOf(params.owner_type.name()));
applyAlterTable(msTbl);
if (authzConfig_.isEnabled()) {
authzManager_.updateTableOwnerPrivilege(params.server_name, msTbl.getDbName(),
msTbl.getTableName(), oldOwner, oldOwnerType, msTbl.getOwner(),
msTbl.getOwnerType(), response);
}
}
/**
* Create a new HMS Partition.
*/
private Partition createHmsPartition(List<TPartitionKeyValue> partitionSpec,
org.apache.hadoop.hive.metastore.api.Table msTbl, TableName tableName,
String location) {
List<String> values = Lists.newArrayList();
// Need to add in the values in the same order they are defined in the table.
for (FieldSchema fs: msTbl.getPartitionKeys()) {
for (TPartitionKeyValue kv: partitionSpec) {
if (fs.getName().toLowerCase().equals(kv.getName().toLowerCase())) {
values.add(kv.getValue());
}
}
}
return createHmsPartitionFromValues(values, msTbl, tableName, location);
}
/**
* Create a new HMS Partition from partition values.
*/
private Partition createHmsPartitionFromValues(List<String> partitionSpecValues,
org.apache.hadoop.hive.metastore.api.Table msTbl, TableName tableName,
String location) {
// Create HMS Partition.
org.apache.hadoop.hive.metastore.api.Partition partition =
new org.apache.hadoop.hive.metastore.api.Partition();
partition.setDbName(tableName.getDb());
partition.setTableName(tableName.getTbl());
partition.setValues(partitionSpecValues);
StorageDescriptor sd = msTbl.getSd().deepCopy();
sd.setLocation(location);
partition.setSd(sd);
// if external event processing is enabled, add the catalog service identifiers
// from table to the partition
addCatalogServiceIdentifiers(msTbl, partition);
return partition;
}
/**
* Adds this catalog service id and the given catalog version to the partition
* parameters from table parameters. No-op if event processing is disabled
*/
private void addCatalogServiceIdentifiers(
org.apache.hadoop.hive.metastore.api.Table msTbl, Partition partition) {
if (!catalog_.isExternalEventProcessingEnabled())
return;
Preconditions.checkState(msTbl.isSetParameters());
Map<String, String> tblParams = msTbl.getParameters();
Preconditions
.checkState(tblParams.containsKey(
MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey()),
"Table parameters must have catalog service identifier before "
+ "adding it to partition parameters");
Preconditions
.checkState(tblParams.containsKey(
MetastoreEventPropertyKey.CATALOG_VERSION.getKey()),
"Table parameters must contain catalog version before adding "
+ "it to partition parameters");
partition.putToParameters(
MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
tblParams.get(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey()));
partition.putToParameters(
MetastoreEventPropertyKey.CATALOG_VERSION.getKey(),
tblParams.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey()));
}
/**
* Creates a new function in the Hive metastore. Returns true if successful
* and false if the call fails and ifNotExists is true.
*/
public boolean addJavaFunctionToHms(String db,
org.apache.hadoop.hive.metastore.api.Function fn, boolean ifNotExists)
throws ImpalaRuntimeException{
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
msClient.getHiveClient().createFunction(fn);
} catch(AlreadyExistsException e) {
if (!ifNotExists) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "createFunction"), e);
}
return false;
} catch (Exception e) {
LOG.error("Error executing createFunction() metastore call: " +
fn.getFunctionName(), e);
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "createFunction"), e);
}
return true;
}
/**
* Drops the given function from Hive metastore. Returns true if successful
* and false if the function does not exist and ifExists is true.
*/
public boolean dropJavaFunctionFromHms(String db, String fn, boolean ifExists)
throws ImpalaRuntimeException {
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
msClient.getHiveClient().dropFunction(db, fn);
} catch (NoSuchObjectException e) {
if (!ifExists) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "dropFunction"), e);
}
return false;
} catch (TException e) {
LOG.error("Error executing dropFunction() metastore call: " + fn, e);
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "dropFunction"), e);
}
return true;
}
/**
* Updates the database object in the metastore.
*/
private void applyAlterDatabase(Database msDb)
throws ImpalaRuntimeException {
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
msClient.getHiveClient().alterDatabase(msDb.getName(), msDb);
} catch (TException e) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "alterDatabase"), e);
}
}
/**
* Conveniance function to call applyAlterTable(3) with default arguments.
*/
private void applyAlterTable(org.apache.hadoop.hive.metastore.api.Table msTbl)
throws ImpalaRuntimeException {
applyAlterTable(msTbl, true, null);
}
/**
* Applies an ALTER TABLE command to the metastore table.
* Note: The metastore interface is not very safe because it only accepts
* an entire metastore.api.Table object rather than a delta of what to change. This
* means an external modification to the table could be overwritten by an ALTER TABLE
* command if the metadata is not completely in-sync. This affects both Hive and
* Impala, but is more important in Impala because the metadata is cached for a
* longer period of time.
* If 'overwriteLastDdlTime' is true, then table property 'transient_lastDdlTime'
* is updated to current time so that metastore does not update it in the alter_table
* call.
*/
private void applyAlterTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
boolean overwriteLastDdlTime, TblTransaction tblTxn)
throws ImpalaRuntimeException {
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
if (overwriteLastDdlTime) {
// It would be enough to remove this table property, as HMS would fill it, but
// this would make it necessary to reload the table after alter_table in order to
// remain consistent with HMS.
Table.updateTimestampProperty(msTbl, Table.TBL_PROP_LAST_DDL_TIME);
}
// Avoid computing/setting stats on the HMS side because that may reset the
// 'numRows' table property (see HIVE-15653). The DO_NOT_UPDATE_STATS flag
// tells the HMS not to recompute/reset any statistics on its own. Any
// stats-related alterations passed in the RPC will still be applied.
msTbl.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
if (tblTxn != null) {
MetastoreShim.alterTableWithTransaction(msClient.getHiveClient(), msTbl, tblTxn);
} else {
try {
msClient.getHiveClient().alter_table(
msTbl.getDbName(), msTbl.getTableName(), msTbl);
}
catch (TException e) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_table"), e);
}
}
}
}
private void applyAlterPartition(Table tbl, HdfsPartition partition)
throws ImpalaException {
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
Partition hmsPartition = partition.toHmsPartition();
addCatalogServiceIdentifiers(tbl.getMetaStoreTable(), hmsPartition);
applyAlterHmsPartitions(tbl.getMetaStoreTable().deepCopy(), msClient,
tbl.getTableName(), Arrays.asList(hmsPartition));
}
}
private void applyAlterHmsPartitions(org.apache.hadoop.hive.metastore.api.Table msTbl,
MetaStoreClient msClient, TableName tableName, List<Partition> hmsPartitions)
throws ImpalaException {
try {
MetastoreShim.alterPartitions(
msClient.getHiveClient(), tableName.getDb(), tableName.getTbl(), hmsPartitions);
} catch (TException e) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_partitions"), e);
}
}
/**
* Creates a role on behalf of the requestingUser.
*/
private void createRole(User requestingUser,
TCreateDropRoleParams createDropRoleParams, TDdlExecResponse resp)
throws ImpalaException {
Preconditions.checkNotNull(requestingUser);
Preconditions.checkNotNull(createDropRoleParams);
Preconditions.checkNotNull(resp);
Preconditions.checkArgument(!createDropRoleParams.isIs_drop());
authzManager_.createRole(requestingUser, createDropRoleParams, resp);
addSummary(resp, "Role has been created.");
}
/**
* Drops a role on behalf of the requestingUser.
*/
private void dropRole(User requestingUser,
TCreateDropRoleParams createDropRoleParams, TDdlExecResponse resp)
throws ImpalaException {
Preconditions.checkNotNull(requestingUser);
Preconditions.checkNotNull(createDropRoleParams);
Preconditions.checkNotNull(resp);
Preconditions.checkArgument(createDropRoleParams.isIs_drop());
authzManager_.dropRole(requestingUser, createDropRoleParams, resp);
addSummary(resp, "Role has been dropped.");
}
/**
* Grants a role to the given group on behalf of the requestingUser.
*/
private void grantRoleToGroup(User requestingUser,
TGrantRevokeRoleParams grantRevokeRoleParams, TDdlExecResponse resp)
throws ImpalaException {
Preconditions.checkNotNull(requestingUser);
Preconditions.checkNotNull(grantRevokeRoleParams);
Preconditions.checkNotNull(resp);
Preconditions.checkArgument(grantRevokeRoleParams.isIs_grant());
authzManager_.grantRoleToGroup(requestingUser, grantRevokeRoleParams, resp);
addSummary(resp, "Role has been granted.");
}
/**
* Revokes a role from the given group on behalf of the requestingUser.
*/
private void revokeRoleFromGroup(User requestingUser,
TGrantRevokeRoleParams grantRevokeRoleParams, TDdlExecResponse resp)
throws ImpalaException {
Preconditions.checkNotNull(requestingUser);
Preconditions.checkNotNull(grantRevokeRoleParams);
Preconditions.checkNotNull(resp);
Preconditions.checkArgument(!grantRevokeRoleParams.isIs_grant());
authzManager_.revokeRoleFromGroup(requestingUser, grantRevokeRoleParams, resp);
addSummary(resp, "Role has been revoked.");
}
/**
* Grants one or more privileges to role on behalf of the requestingUser.
*/
private void grantPrivilege(TCatalogServiceRequestHeader header,
TGrantRevokePrivParams grantRevokePrivParams, TDdlExecResponse resp)
throws ImpalaException {
Preconditions.checkNotNull(header);
Preconditions.checkNotNull(grantRevokePrivParams);
Preconditions.checkNotNull(resp);
Preconditions.checkArgument(grantRevokePrivParams.isIs_grant());
switch (grantRevokePrivParams.principal_type) {
case ROLE:
authzManager_.grantPrivilegeToRole(header, grantRevokePrivParams, resp);
break;
case USER:
authzManager_.grantPrivilegeToUser(header, grantRevokePrivParams,
resp);
break;
case GROUP:
authzManager_.grantPrivilegeToGroup(header, grantRevokePrivParams,
resp);
break;
default:
throw new IllegalArgumentException("Unexpected principal type: " +
grantRevokePrivParams.principal_type);
}
addSummary(resp, "Privilege(s) have been granted.");
}
/**
* Revokes one or more privileges to role on behalf of the requestingUser.
*/
private void revokePrivilege(TCatalogServiceRequestHeader header,
TGrantRevokePrivParams grantRevokePrivParams, TDdlExecResponse resp)
throws ImpalaException {
Preconditions.checkNotNull(header);
Preconditions.checkNotNull(grantRevokePrivParams);
Preconditions.checkNotNull(resp);
Preconditions.checkArgument(!grantRevokePrivParams.isIs_grant());
switch (grantRevokePrivParams.principal_type) {
case ROLE:
authzManager_.revokePrivilegeFromRole(header, grantRevokePrivParams, resp);
break;
case USER:
authzManager_.revokePrivilegeFromUser(header, grantRevokePrivParams, resp);
break;
case GROUP:
authzManager_.revokePrivilegeFromGroup(header, grantRevokePrivParams, resp);
break;
default:
throw new IllegalArgumentException("Unexpected principal type: " +
grantRevokePrivParams.principal_type);
}
addSummary(resp, "Privilege(s) have been revoked.");
}
/**
* Alters partitions in batches of size 'MAX_PARTITION_UPDATES_PER_RPC'. This
* reduces the time spent in a single update and helps avoid metastore client
* timeouts.
*/
private void bulkAlterPartitions(Table tbl, List<HdfsPartition> modifiedParts,
TblTransaction tblTxn) throws ImpalaException {
List<org.apache.hadoop.hive.metastore.api.Partition> hmsPartitions =
Lists.newArrayList();
for (HdfsPartition p: modifiedParts) {
org.apache.hadoop.hive.metastore.api.Partition msPart = p.toHmsPartition();
if (msPart != null) {
addCatalogServiceIdentifiers(tbl.getMetaStoreTable(), msPart);
hmsPartitions.add(msPart);
}
}
if (hmsPartitions.isEmpty()) return;
String dbName = tbl.getDb().getName();
String tableName = tbl.getName();
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
// Apply the updates in batches of 'MAX_PARTITION_UPDATES_PER_RPC'.
for (List<Partition> hmsPartitionsSubList :
Lists.partition(hmsPartitions, MAX_PARTITION_UPDATES_PER_RPC)) {
try {
// Alter partitions in bulk.
if (tblTxn != null) {
MetastoreShim.alterPartitionsWithTransaction(msClient.getHiveClient(), dbName,
tableName, hmsPartitionsSubList, tblTxn);
}
else {
MetastoreShim.alterPartitions(msClient.getHiveClient(), dbName, tableName,
hmsPartitionsSubList);
}
// Mark the corresponding HdfsPartition objects as dirty
for (org.apache.hadoop.hive.metastore.api.Partition msPartition:
hmsPartitionsSubList) {
try {
catalog_.getHdfsPartition(dbName, tableName, msPartition).markDirty();
} catch (PartitionNotFoundException e) {
LOG.error(String.format("Partition of table %s could not be found: %s",
tableName, e.getMessage()));
continue;
}
}
} catch (TException e) {
throw new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_partitions"), e);
}
}
}
}
/**
* Returns the metastore.api.Table object from the Hive Metastore for an existing
* fully loaded table.
*/
private org.apache.hadoop.hive.metastore.api.Table getMetaStoreTable(
MetaStoreClient msClient, Table tbl) throws CatalogException {
Preconditions.checkState(!(tbl instanceof IncompleteTable));
Preconditions.checkNotNull(msClient);
Db db = tbl.getDb();
org.apache.hadoop.hive.metastore.api.Table msTbl = null;
Stopwatch hmsLoadSW = new Stopwatch().start();
long hmsLoadTime;
try {
msTbl = msClient.getHiveClient().getTable(db.getName(), tbl.getName());
} catch (Exception e) {
throw new TableLoadingException("Error loading metadata for table: " +
db.getName() + "." + tbl.getName(), e);
} finally {
hmsLoadTime = hmsLoadSW.elapsed(TimeUnit.NANOSECONDS);
}
tbl.updateHMSLoadTableSchemaTime(hmsLoadTime);
return msTbl;
}
/**
* Returns the metastore.api.Table object from the Hive Metastore for an existing
* fully loaded table. Gets the MetaStore object from 'catalog_'.
*/
private org.apache.hadoop.hive.metastore.api.Table getTableFromMetaStore(
TableName tblName) throws CatalogException {
Preconditions.checkNotNull(tblName);
org.apache.hadoop.hive.metastore.api.Table msTbl = null;
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
msTbl = msClient.getHiveClient().getTable(tblName.getDb(),tblName.getTbl());
} catch (TException e) {
LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR, "getTable") + e.getMessage());
}
return msTbl;
}
private static List<FieldSchema> buildFieldSchemaList(List<TColumn> columns) {
List<FieldSchema> fsList = Lists.newArrayList();
// Add in all the columns
for (TColumn col: columns) {
Type type = Type.fromThrift(col.getColumnType());
// The type string must be lowercase for Hive to read the column metadata properly.
String typeSql = type.toSql().toLowerCase();
FieldSchema fs = new FieldSchema(col.getColumnName(), typeSql, col.getComment());
fsList.add(fs);
}
return fsList;
}
/**
* Executes a TResetMetadataRequest and returns the result as a
* TResetMetadataResponse. Based on the request parameters, this operation
* may do one of three things:
* 1) invalidate the entire catalog, forcing the metadata for all catalog
* objects to be reloaded.
* 2) invalidate a specific table, forcing the metadata to be reloaded
* on the next access.
* 3) perform a synchronous incremental refresh of a specific table.
* 4) perform a refresh on authorization metadata.
*
* For details on the specific commands see comments on their respective
* methods in CatalogServiceCatalog.java.
*/
public TResetMetadataResponse execResetMetadata(TResetMetadataRequest req)
throws CatalogException {
String cmdString = String.format("%s issued by %s",
req.is_refresh ? "REFRESH":"INVALIDATE",
req.header != null ? req.header.requesting_user : " unknown user");
TResetMetadataResponse resp = new TResetMetadataResponse();
resp.setResult(new TCatalogUpdateResult());
resp.getResult().setCatalog_service_id(JniCatalog.getServiceId());
if (req.isSetDb_name()) {
Preconditions.checkState(!catalog_.isBlacklistedDb(req.getDb_name()),
String.format("Can't refresh functions in blacklisted database: %s. %s",
req.getDb_name(), BLACKLISTED_DBS_INCONSISTENT_ERR_STR));
// This is a "refresh functions" operation.
synchronized (metastoreDdlLock_) {
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
List<TCatalogObject> addedFuncs = Lists.newArrayList();
List<TCatalogObject> removedFuncs = Lists.newArrayList();
catalog_.refreshFunctions(msClient, req.getDb_name(), addedFuncs, removedFuncs);
resp.result.setUpdated_catalog_objects(addedFuncs);
resp.result.setRemoved_catalog_objects(removedFuncs);
resp.result.setVersion(catalog_.getCatalogVersion());
for (TCatalogObject removedFn: removedFuncs) {
catalog_.getDeleteLog().addRemovedObject(removedFn);
}
}
}
} else if (req.isSetTable_name()) {
// Results of an invalidate operation, indicating whether the table was removed
// from the Metastore, and whether a new database was added to Impala as a result
// of the invalidate operation. Always false for refresh.
Reference<Boolean> tblWasRemoved = new Reference<Boolean>(false);
Reference<Boolean> dbWasAdded = new Reference<Boolean>(false);
// Thrift representation of the result of the invalidate/refresh operation.
TCatalogObject updatedThriftTable = null;
if (req.isIs_refresh()) {
TableName tblName = TableName.fromThrift(req.getTable_name());
// Quick check to see if the table exists in the catalog without triggering
// a table load.
Table tbl = catalog_.getTable(tblName.getDb(), tblName.getTbl());
if (tbl != null) {
// If the table is not loaded, no need to perform refresh after the initial
// metadata load.
boolean isTableLoadedInCatalog = tbl.isLoaded();
tbl = getExistingTable(tblName.getDb(), tblName.getTbl(),
"Load triggered by " + cmdString);
if (tbl != null) {
if (isTableLoadedInCatalog) {
if (req.isSetPartition_spec()) {
boolean isTransactional = AcidUtils.isTransactionalTable(
tbl.getMetaStoreTable().getParameters());
Preconditions.checkArgument(!isTransactional);
updatedThriftTable = catalog_.reloadPartition(tbl,
req.getPartition_spec(), cmdString);
} else {
// TODO IMPALA-8809: Optimisation for partitioned tables:
// 1: Reload the whole table if schema change happened. Identify
// such scenario by checking Table.TBL_PROP_LAST_DDL_TIME property.
// Note, table level writeId is not updated by HMS for partitioned
// ACID tables, there is a Jira to cover this: HIVE-22062.
// 2: If no need for a full table reload then fetch partition level
// writeIds and reload only the ones that changed.
updatedThriftTable = catalog_.reloadTable(tbl, cmdString);
}
} else {
// Table was loaded from scratch, so it's already "refreshed".
tbl.getLock().lock();
try {
updatedThriftTable = tbl.toTCatalogObject();
} finally {
tbl.getLock().unlock();
}
}
}
}
} else {
updatedThriftTable = catalog_.invalidateTable(
req.getTable_name(), tblWasRemoved, dbWasAdded);
}
if (updatedThriftTable == null) {
// Table does not exist in the Metastore and Impala catalog, throw error.
throw new TableNotFoundException("Table not found: " +
req.getTable_name().getDb_name() + "." +
req.getTable_name().getTable_name());
}
// Return the TCatalogObject in the result to indicate this request can be
// processed as a direct DDL operation.
if (tblWasRemoved.getRef()) {
resp.getResult().addToRemoved_catalog_objects(updatedThriftTable);
} else {
resp.getResult().addToUpdated_catalog_objects(updatedThriftTable);
}
if (dbWasAdded.getRef()) {
Db addedDb = catalog_.getDb(updatedThriftTable.getTable().getDb_name());
if (addedDb == null) {
throw new CatalogException("Database " +
updatedThriftTable.getTable().getDb_name() + " was removed by a " +
"concurrent operation. Try invalidating the table again.");
}
resp.getResult().addToUpdated_catalog_objects(addedDb.toTCatalogObject());
}
resp.getResult().setVersion(updatedThriftTable.getCatalog_version());
} else if (req.isAuthorization()) {
AuthorizationDelta authzDelta = catalog_.refreshAuthorization(false);
resp.result.setUpdated_catalog_objects(authzDelta.getCatalogObjectsAdded());
resp.result.setRemoved_catalog_objects(authzDelta.getCatalogObjectsRemoved());
resp.result.setVersion(catalog_.getCatalogVersion());
} else {
// Invalidate the entire catalog if no table name is provided.
Preconditions.checkArgument(!req.isIs_refresh());
resp.getResult().setVersion(catalog_.reset());
resp.getResult().setIs_invalidate(true);
}
if (req.isSync_ddl()) {
resp.getResult().setVersion(catalog_.waitForSyncDdlVersion(resp.getResult()));
}
resp.getResult().setStatus(new TStatus(TErrorCode.OK, new ArrayList<String>()));
return resp;
}
/**
* Create any new partitions required as a result of an INSERT statement and refreshes
* the table metadata after every INSERT statement. Any new partitions will inherit
* their cache configuration from the parent table. That is, if the parent is cached
* new partitions created will also be cached and will be put in the same pool as the
* parent.
* If the insert touched any pre-existing partitions that were cached, a request to
* watch the associated cache directives will be submitted. This will result in an
* async table refresh once the cache request completes.
*/
public TUpdateCatalogResponse updateCatalog(TUpdateCatalogRequest update)
throws ImpalaException {
TUpdateCatalogResponse response = new TUpdateCatalogResponse();
// Only update metastore for Hdfs tables.
Table table = getExistingTable(update.getDb_name(), update.getTarget_table(),
"Load for INSERT");
if (!(table instanceof HdfsTable)) {
throw new InternalException("Unexpected table type: " +
update.getTarget_table());
}
tryLock(table, "updating the catalog");
final Timer.Context context
= table.getMetrics().getTimer(HdfsTable.CATALOG_UPDATE_DURATION_METRIC).time();
long transactionId = -1;
TblTransaction tblTxn = null;
if (update.isSetTransaction_id()) {
transactionId = update.getTransaction_id();
Preconditions.checkState(transactionId > 0);
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
// Setup transactional parameters needed to do alter table/partitions later.
// TODO: Could be optimized to possibly save some RPCs, as these parameters are
// not always needed + the writeId of the INSERT could be probably reused.
tblTxn = MetastoreShim.createTblTransaction(
msClient.getHiveClient(), table.getMetaStoreTable(), transactionId);
}
}
try {
// Get new catalog version for table in insert.
long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
catalog_.getLock().writeLock().unlock();
// Collects the cache directive IDs of any cached table/partitions that were
// targeted. A watch on these cache directives is submitted to the
// TableLoadingMgr and the table will be refreshed asynchronously after all
// cache directives complete.
List<Long> cacheDirIds = Lists.<Long>newArrayList();
// If the table is cached, get its cache pool name and replication factor. New
// partitions will inherit this property.
Pair<String, Short> cacheInfo = table.getTableCacheInfo(cacheDirIds);
String cachePoolName = cacheInfo.first;
Short cacheReplication = cacheInfo.second;
TableName tblName = new TableName(table.getDb().getName(), table.getName());
List<String> errorMessages = Lists.newArrayList();
HashSet<String> partsToLoadMetadata = null;
Collection<? extends FeFsPartition> parts =
FeCatalogUtils.loadAllPartitions((HdfsTable) table);
List<FeFsPartition> affectedExistingPartitions = new ArrayList<>();
List<org.apache.hadoop.hive.metastore.api.Partition> hmsPartitionsStatsUnset =
Lists.newArrayList();
if (table.getNumClusteringCols() > 0) {
// Set of all partition names targeted by the insert that need to be created
// in the Metastore (partitions that do not currently exist in the catalog).
// In the BE, we don't currently distinguish between which targeted partitions
// are new and which already exist, so initialize the set with all targeted
// partition names and remove the ones that are found to exist.
HashSet<String> partsToCreate =
Sets.newHashSet(update.getCreated_partitions());
partsToLoadMetadata = Sets.newHashSet(partsToCreate);
for (FeFsPartition partition: parts) {
// TODO: In the BE we build partition names without a trailing char. In FE
// we build partition name with a trailing char. We should make this
// consistent.
String partName = partition.getPartitionName() + "/";
// Attempt to remove this partition name from partsToCreate. If remove
// returns true, it indicates the partition already exists.
if (partsToCreate.remove(partName)) {
affectedExistingPartitions.add(partition);
// For existing partitions, we need to unset column_stats_accurate to
// tell hive the statistics is not accurate any longer.
if (partition.getParameters() != null && partition.getParameters()
.containsKey(StatsSetupConst.COLUMN_STATS_ACCURATE)) {
org.apache.hadoop.hive.metastore.api.Partition hmsPartition =
((HdfsPartition) partition).toHmsPartition();
hmsPartition.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
hmsPartitionsStatsUnset.add(hmsPartition);
}
if (partition.isMarkedCached()) {
// The partition was targeted by the insert and is also cached. Since
// data was written to the partition, a watch needs to be placed on the
// cache directive so the TableLoadingMgr can perform an async
// refresh once all data becomes cached.
cacheDirIds.add(HdfsCachingUtil.getCacheDirectiveId(
partition.getParameters()));
}
}
if (partsToCreate.size() == 0) break;
}
if (!partsToCreate.isEmpty()) {
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
org.apache.hadoop.hive.metastore.api.Table msTbl =
table.getMetaStoreTable().deepCopy();
List<org.apache.hadoop.hive.metastore.api.Partition> hmsParts =
Lists.newArrayList();
HiveConf hiveConf = new HiveConf(this.getClass());
Warehouse warehouse = new Warehouse(hiveConf);
for (String partName: partsToCreate) {
org.apache.hadoop.hive.metastore.api.Partition partition =
new org.apache.hadoop.hive.metastore.api.Partition();
hmsParts.add(partition);
partition.setDbName(tblName.getDb());
partition.setTableName(tblName.getTbl());
partition.setValues(MetaStoreUtil.getPartValsFromName(msTbl, partName));
partition.setParameters(new HashMap<String, String>());
partition.setSd(msTbl.getSd().deepCopy());
partition.getSd().setSerdeInfo(msTbl.getSd().getSerdeInfo().deepCopy());
partition.getSd().setLocation(msTbl.getSd().getLocation() + "/" +
partName.substring(0, partName.length() - 1));
MetastoreShim.updatePartitionStatsFast(partition, msTbl, warehouse);
}
// First add_partitions and then alter_partitions the successful ones with
// caching directives. The reason is that some partitions could have been
// added concurrently, and we want to avoid caching a partition twice and
// leaking a caching directive.
List<org.apache.hadoop.hive.metastore.api.Partition> addedHmsParts =
msClient.getHiveClient().add_partitions(hmsParts, true, true);
if (addedHmsParts.size() > 0) {
if (cachePoolName != null) {
List<org.apache.hadoop.hive.metastore.api.Partition> cachedHmsParts =
Lists.newArrayList();
// Submit a new cache directive and update the partition metadata with
// the directive id.
for (org.apache.hadoop.hive.metastore.api.Partition part: addedHmsParts) {
try {
cacheDirIds.add(HdfsCachingUtil.submitCachePartitionDirective(
part, cachePoolName, cacheReplication));
StatsSetupConst.setBasicStatsState(part.getParameters(), "false");
cachedHmsParts.add(part);
} catch (ImpalaRuntimeException e) {
String msg = String.format("Partition %s.%s(%s): State: Not " +
"cached. Action: Cache manully via 'ALTER TABLE'.",
part.getDbName(), part.getTableName(), part.getValues());
LOG.error(msg, e);
errorMessages.add(msg);
}
}
try {
MetastoreShim.alterPartitions(msClient.getHiveClient(), tblName.getDb(),
tblName.getTbl(), cachedHmsParts);
} catch (Exception e) {
LOG.error("Failed in alter_partitions: ", e);
// Try to uncache the partitions when the alteration in the HMS
// failed.
for (org.apache.hadoop.hive.metastore.api.Partition part:
cachedHmsParts) {
try {
HdfsCachingUtil.removePartitionCacheDirective(part);
} catch (ImpalaException e1) {
String msg = String.format(
"Partition %s.%s(%s): State: Leaked caching directive. " +
"Action: Manually uncache directory %s via hdfs " +
"cacheAdmin.", part.getDbName(), part.getTableName(),
part.getValues(), part.getSd().getLocation());
LOG.error(msg, e);
errorMessages.add(msg);
}
}
}
}
}
} catch (AlreadyExistsException e) {
throw new InternalException(
"AlreadyExistsException thrown although ifNotExists given", e);
} catch (Exception e) {
throw new InternalException("Error adding partitions", e);
}
}
// Unset COLUMN_STATS_ACCURATE by calling alter partition to hms.
if (!hmsPartitionsStatsUnset.isEmpty()) {
unsetPartitionsColStats(table.getMetaStoreTable(), hmsPartitionsStatsUnset,
tblTxn);
}
} else {
// For non-partitioned table, only single part exists
FeFsPartition singlePart = Iterables.getOnlyElement((List<FeFsPartition>) parts);
affectedExistingPartitions.add(singlePart);
}
unsetTableColStats(table.getMetaStoreTable(), tblTxn);
// Submit the watch request for the given cache directives.
if (!cacheDirIds.isEmpty()) {
catalog_.watchCacheDirs(cacheDirIds, tblName.toThrift(),
"INSERT into cached partitions");
}
response.setResult(new TCatalogUpdateResult());
response.getResult().setCatalog_service_id(JniCatalog.getServiceId());
if (errorMessages.size() > 0) {
errorMessages.add("Please refer to the catalogd error log for details " +
"regarding the failed un/caching operations.");
response.getResult().setStatus(
new TStatus(TErrorCode.INTERNAL_ERROR, errorMessages));
} else {
response.getResult().setStatus(
new TStatus(TErrorCode.OK, new ArrayList<String>()));
}
// Commit transactional inserts on success. We don't abort the transaction
// here in case of failures, because the client, i.e. query coordinator, is
// always responsible for aborting transactions when queries hit errors.
if (update.isSetTransaction_id()) {
if (response.getResult().getStatus().getStatus_code() == TErrorCode.OK) {
commitTransaction(update.getTransaction_id());
}
}
loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata,
"INSERT");
// After loading metadata, fire insert events if external event processing is
// enabled.
createInsertEvents(table, affectedExistingPartitions, update.is_overwrite);
addTableToCatalogUpdate(table, response.result);
} finally {
context.stop();
UnlockWriteLockIfErronouslyLocked();
table.getLock().unlock();
}
if (update.isSync_ddl()) {
response.getResult().setVersion(
catalog_.waitForSyncDdlVersion(response.getResult()));
}
return response;
}
/**
* Populates insert event data and calls fireInsertEvent() if external event processing
* is enabled. This is no-op if event processing is disabled or there are no existing
* partitions affected by this insert.
*
* @param affectedExistingPartitions List of existing partitions touched by the insert.
* @param isInsertOverwrite indicates if the operation was an insert overwrite. If it
* is not, all the new files added by this insert is calculated.
*/
private void createInsertEvents(Table table,
List<FeFsPartition> affectedExistingPartitions, boolean isInsertOverwrite) {
if (!catalog_.isExternalEventProcessingEnabled() ||
affectedExistingPartitions.size() == 0) return;
// Map of partition names to file names of all existing partitions touched by the
// insert.
Map<String, Set<String>> partitionFilesMapBeforeInsert = new HashMap<>();
if (!isInsertOverwrite) {
partitionFilesMapBeforeInsert =
getPartitionNameFilesMap(affectedExistingPartitions);
}
// If table is partitioned, we add all existing partitions touched by this insert
// to the insert event.
Collection<? extends FeFsPartition> partsPostInsert;
partsPostInsert =
((HdfsTable) table).getPartitionsForNames(
partitionFilesMapBeforeInsert.keySet());
// If it is not an insert overwrite operation, we find new files added by this insert.
Map<String, Set<String>> partitionFilesMapPostInsert = new HashMap<>();
if (!isInsertOverwrite) {
partitionFilesMapPostInsert =
getPartitionNameFilesMap(partsPostInsert);
}
for (FeFsPartition part : partsPostInsert) {
// Find the delta of the files added by the insert if it is not an overwrite
// operation. HMS fireListenerEvent() expects an empty list if no new files are
// added or if the operation is an insert overwrite.
Set<String> deltaFiles = new HashSet<>();
List<String> partVals = null;
if (!isInsertOverwrite) {
String partitionName = part.getPartitionName() + "/";
Set<String> filesPostInsert =
partitionFilesMapPostInsert.get(partitionName);
if (table.getNumClusteringCols() > 0) {
Set<String> filesBeforeInsert =
partitionFilesMapBeforeInsert.get(partitionName);
deltaFiles = Sets.difference(filesBeforeInsert, filesPostInsert);
partVals = part.getPartitionValuesAsStrings(true);
} else {
Map.Entry<String, Set<String>> entry =
partitionFilesMapBeforeInsert.entrySet().iterator().next();
deltaFiles = Sets.difference(entry.getValue(), filesPostInsert);
}
LOG.info("{} new files detected for table {} partition {}.",
filesPostInsert.size(), table.getTableName(), part.getPartitionName());
}
if (deltaFiles != null || isInsertOverwrite) {
try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
MetaStoreUtil
.fireInsertEvent(metaStoreClient.getHiveClient(), table.getDb().getName(),
table.getName(), partVals, deltaFiles, isInsertOverwrite);
} catch (Exception e) {
LOG.error("Failed to fire insert event. Some tables might not be"
+ " refreshed on other impala clusters.", e);
}
}
else {
LOG.info("No new files were created, and is not a replace. Skipping "
+ "generating INSERT event.");
}
}
}
/**
* Util method to return a map of partition names to list of files for that partition.
*/
private static Map<String, Set<String>> getPartitionNameFilesMap(Collection<?
extends FeFsPartition> partitions) {
return partitions.stream().collect(
Collectors.toMap(p -> p.getPartitionName() + "/",
p -> ((HdfsPartition) p).getFileNames()));
}
/**
* Returns an existing, loaded table from the Catalog. Throws an exception if any
* of the following are true:
* - The table does not exist
* - There was an error loading the table metadata.
* - The table is missing (not yet loaded).
* This is to help protect against certain scenarios where the table was
* modified or dropped between the time analysis completed and the the catalog op
* started executing. However, even with these checks it is possible the table was
* modified or dropped/re-created without us knowing. This function also updates the
* table usage counter.
*
* TODO: Track object IDs to
* know when a table has been dropped and re-created with the same name.
*/
private Table getExistingTable(String dbName, String tblName, String reason)
throws CatalogException {
Table tbl = catalog_.getOrLoadTable(dbName, tblName, reason);
if (tbl == null) {
throw new TableNotFoundException("Table not found: " + dbName + "." + tblName);
}
tbl.incrementMetadataOpsCount();
if (!tbl.isLoaded()) {
throw new CatalogException(String.format("Table '%s.%s' was modified while " +
"operation was in progress, aborting execution.", dbName, tblName));
}
if (tbl instanceof IncompleteTable && tbl.isLoaded()) {
// The table loading failed. Throw an exception.
ImpalaException e = ((IncompleteTable) tbl).getCause();
if (e instanceof TableLoadingException) {
throw (TableLoadingException) e;
}
throw new TableLoadingException(e.getMessage(), e);
}
Preconditions.checkNotNull(tbl);
Preconditions.checkState(tbl.isLoaded());
return tbl;
}
private void alterCommentOn(TCommentOnParams params, TDdlExecResponse response)
throws ImpalaRuntimeException, CatalogException, InternalException {
if (params.getDb() != null) {
Preconditions.checkArgument(!params.isSetTable_name() &&
!params.isSetColumn_name());
alterCommentOnDb(params.getDb(), params.getComment(), response);
} else if (params.getTable_name() != null) {
Preconditions.checkArgument(!params.isSetDb() && !params.isSetColumn_name());
alterCommentOnTableOrView(TableName.fromThrift(params.getTable_name()),
params.getComment(), response);
} else if (params.getColumn_name() != null) {
Preconditions.checkArgument(!params.isSetDb() && !params.isSetTable_name());
TColumnName columnName = params.getColumn_name();
alterCommentOnColumn(TableName.fromThrift(columnName.getTable_name()),
columnName.getColumn_name(), params.getComment(), response);
} else {
throw new UnsupportedOperationException("Unsupported COMMENT ON operation");
}
}
private void alterCommentOnDb(String dbName, String comment, TDdlExecResponse response)
throws ImpalaRuntimeException, CatalogException {
Db db = catalog_.getDb(dbName);
if (db == null) {
throw new CatalogException("Database: " + dbName + " does not exist.");
}
synchronized (metastoreDdlLock_) {
Database msDb = db.getMetaStoreDb().deepCopy();
msDb.setDescription(comment);
try {
applyAlterDatabase(msDb);
} catch (ImpalaRuntimeException e) {
throw e;
}
Db updatedDb = catalog_.updateDb(msDb);
addDbToCatalogUpdate(updatedDb, response.result);
}
addSummary(response, "Updated database.");
}
private void alterDatabase(TAlterDbParams params, TDdlExecResponse response)
throws ImpalaException {
Preconditions.checkNotNull(params);
String dbName = params.getDb();
Db db = catalog_.getDb(dbName);
if (db == null) {
throw new CatalogException("Database: " + dbName + " does not exist.");
}
switch (params.getAlter_type()) {
case SET_OWNER:
alterDatabaseSetOwner(db, params.getSet_owner_params(), response);
break;
default:
throw new UnsupportedOperationException(
"Unknown ALTER DATABASE operation type: " + params.getAlter_type());
}
}
private void alterDatabaseSetOwner(Db db, TAlterDbSetOwnerParams params,
TDdlExecResponse response) throws ImpalaException {
Preconditions.checkNotNull(params.owner_name);
Preconditions.checkNotNull(params.owner_type);
synchronized (metastoreDdlLock_) {
// Get a new catalog version to assign to the database being altered.
long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
addCatalogServiceIdentifiers(db, catalog_.getCatalogServiceId(), newCatalogVersion);
Database msDb = db.getMetaStoreDb().deepCopy();
String originalOwnerName = msDb.getOwnerName();
PrincipalType originalOwnerType = msDb.getOwnerType();
msDb.setOwnerName(params.owner_name);
msDb.setOwnerType(PrincipalType.valueOf(params.owner_type.name()));
try {
applyAlterDatabase(msDb);
} catch (ImpalaRuntimeException e) {
throw e;
}
if (authzConfig_.isEnabled()) {
authzManager_.updateDatabaseOwnerPrivilege(params.server_name, db.getName(),
originalOwnerName, originalOwnerType, msDb.getOwnerName(),
msDb.getOwnerType(), response);
}
Db updatedDb = catalog_.updateDb(msDb);
addDbToCatalogUpdate(updatedDb, response.result);
// now that HMS alter operation has succeeded, add this version to list of inflight
// events in catalog database if event processing is enabled
catalog_.addVersionsForInflightEvents(db, newCatalogVersion);
}
addSummary(response, "Updated database.");
}
/**
* Adds the catalog service id and the given catalog version to the database parameters.
* No-op if event processing is disabled
*/
private void addCatalogServiceIdentifiers(
Db db, String catalogServiceId, long newCatalogVersion) {
if (!catalog_.isExternalEventProcessingEnabled()) return;
org.apache.hadoop.hive.metastore.api.Database msDb = db.getMetaStoreDb();
msDb.putToParameters(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
catalogServiceId);
msDb.putToParameters(MetastoreEventPropertyKey.CATALOG_VERSION.getKey(),
String.valueOf(newCatalogVersion));
}
private void addDbToCatalogUpdate(Db db, TCatalogUpdateResult result) {
Preconditions.checkNotNull(db);
TCatalogObject updatedCatalogObject = db.toTCatalogObject();
updatedCatalogObject.setCatalog_version(updatedCatalogObject.getCatalog_version());
// TODO(todd): if client is a 'v2' impalad, only send back invalidation
result.addToUpdated_catalog_objects(updatedCatalogObject);
result.setVersion(updatedCatalogObject.getCatalog_version());
}
private void alterCommentOnTableOrView(TableName tableName, String comment,
TDdlExecResponse response) throws CatalogException, InternalException,
ImpalaRuntimeException {
Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
"Load for ALTER COMMENT");
tryLock(tbl);
try {
long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
catalog_.getLock().writeLock().unlock();
addCatalogServiceIdentifiers(tbl, catalog_.getCatalogServiceId(),
newCatalogVersion);
org.apache.hadoop.hive.metastore.api.Table msTbl =
tbl.getMetaStoreTable().deepCopy();
boolean isView = msTbl.getTableType().equalsIgnoreCase(
TableType.VIRTUAL_VIEW.toString());
if (comment == null) {
msTbl.getParameters().remove("comment");
} else {
msTbl.getParameters().put("comment", comment);
}
applyAlterTable(msTbl);
loadTableMetadata(tbl, newCatalogVersion, false, false, null, "ALTER COMMENT");
addTableToCatalogUpdate(tbl, response.result);
addSummary(response, String.format("Updated %s.", (isView) ? "view" : "table"));
} finally {
tbl.getLock().unlock();
}
}
private void alterCommentOnColumn(TableName tableName, String columnName,
String comment, TDdlExecResponse response) throws CatalogException,
InternalException, ImpalaRuntimeException {
Table tbl = getExistingTable(tableName.getDb(), tableName.getTbl(),
"Load for ALTER COLUMN COMMENT");
tryLock(tbl);
try {
long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
catalog_.getLock().writeLock().unlock();
if (tbl instanceof KuduTable) {
TColumn new_col = new TColumn(columnName,
tbl.getColumn(columnName).getType().toThrift());
new_col.setComment(comment != null ? comment : "");
KuduCatalogOpExecutor.alterColumn((KuduTable) tbl, columnName, new_col);
} else {
org.apache.hadoop.hive.metastore.api.Table msTbl =
tbl.getMetaStoreTable().deepCopy();
if (!updateColumnComment(msTbl.getSd().getColsIterator(), columnName, comment)) {
if (!updateColumnComment(msTbl.getPartitionKeysIterator(), columnName,
comment)) {
throw new ColumnNotFoundException(String.format(
"Column name %s not found in table %s.", columnName, tbl.getFullName()));
}
}
applyAlterTable(msTbl);
}
loadTableMetadata(tbl, newCatalogVersion, false, true, null,
"ALTER COLUMN COMMENT");
addTableToCatalogUpdate(tbl, response.result);
addSummary(response, "Column has been altered.");
} finally {
tbl.getLock().unlock();
}
}
/**
* Find the matching column name in the iterator and update its comment. Return
* true if found; false otherwise.
*/
private static boolean updateColumnComment(Iterator<FieldSchema> iterator,
String columnName, String comment) {
while (iterator.hasNext()) {
FieldSchema fs = iterator.next();
if (fs.getName().equalsIgnoreCase(columnName)) {
fs.setComment(comment);
return true;
}
}
return false;
}
/**
* Try to lock a table in the catalog. Throw an InternalException if the catalog is
* unable to lock the given table.
*/
private void tryLock(Table tbl) throws InternalException {
tryLock(tbl, "altering");
}
/**
* Try to lock a table in the catalog for a given operation. Throw an InternalException
* if the catalog is unable to lock the given table.
*/
private void tryLock(Table tbl, String operation) throws InternalException {
String type = tbl instanceof View ? "view" : "table";
if (!catalog_.tryLockTable(tbl)) {
throw new InternalException(String.format("Error %s (for) %s %s due to " +
"lock contention.", operation, type, tbl.getFullName()));
}
}
/**
* Commits ACID transaction with given transaction id.
* @param transactionId is the id of the transaction.
* @throws TransactionException
*/
private void commitTransaction(long transactionId) throws TransactionException {
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
MetastoreShim.commitTransaction(msClient.getHiveClient(), transactionId);
LOG.info("Committed transaction: " + Long.toString(transactionId));
}
}
/**
* Update table properties to remove the COLUMN_STATS_ACCURATE entry if it exists.
*/
private void unsetTableColStats(org.apache.hadoop.hive.metastore.api.Table msTable,
TblTransaction tblTxn) throws ImpalaRuntimeException{
Map<String, String> params = msTable.getParameters();
if (params != null && params.containsKey(StatsSetupConst.COLUMN_STATS_ACCURATE)) {
params.remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
applyAlterTable(msTable, false, tblTxn);
}
}
/**
* Update partitions properties to remove the COLUMN_STATS_ACCURATE entry from HMS.
* This method assumes the partitions in the input hmsPartitionsStatsUnset already
* had the COLUMN_STATS_ACCURATE removed from their properties.
*/
private void unsetPartitionsColStats(org.apache.hadoop.hive.metastore.api.Table msTable,
List<org.apache.hadoop.hive.metastore.api.Partition> hmsPartitionsStatsUnset,
TblTransaction tblTxn) throws ImpalaRuntimeException{
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
try {
if (tblTxn != null) {
MetastoreShim.alterPartitionsWithTransaction(
msClient.getHiveClient(), msTable.getDbName(), msTable.getTableName(),
hmsPartitionsStatsUnset, tblTxn);
} else {
MetastoreShim.alterPartitions(msClient.getHiveClient(), msTable.getDbName(),
msTable.getTableName(), hmsPartitionsStatsUnset);
}
} catch (TException te) {
new ImpalaRuntimeException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_partitions"), te);
}
}
}
}