| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.service; |
| |
| import static org.apache.impala.common.ByteUnits.MEGABYTE; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Predicates; |
| import com.google.common.base.Strings; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| import com.google.common.math.IntMath; |
| import com.google.common.math.LongMath; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.common.util.concurrent.Uninterruptibles; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
| |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hive.metastore.IMetaStoreClient; |
| import org.apache.hadoop.hive.metastore.api.DataOperationType; |
| import org.apache.hadoop.hive.metastore.api.LockComponent; |
| import org.apache.hadoop.hive.metastore.api.LockLevel; |
| import org.apache.hadoop.hive.metastore.api.LockType; |
| import org.apache.hadoop.hive.metastore.api.SQLForeignKey; |
| import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; |
| import org.apache.iceberg.HistoryEntry; |
| import org.apache.iceberg.Snapshot; |
| import org.apache.iceberg.Table; |
| import org.apache.impala.analysis.AlterDbStmt; |
| import org.apache.impala.analysis.AnalysisContext; |
| import org.apache.impala.analysis.AnalysisContext.AnalysisResult; |
| import org.apache.impala.analysis.CommentOnStmt; |
| import org.apache.impala.analysis.CopyTestCaseStmt; |
| import org.apache.impala.analysis.CreateDataSrcStmt; |
| import org.apache.impala.analysis.CreateDropRoleStmt; |
| import org.apache.impala.analysis.CreateUdaStmt; |
| import org.apache.impala.analysis.CreateUdfStmt; |
| import org.apache.impala.analysis.DescribeTableStmt; |
| import org.apache.impala.analysis.DescriptorTable; |
| import org.apache.impala.analysis.DmlStatementBase; |
| import org.apache.impala.analysis.DropDataSrcStmt; |
| import org.apache.impala.analysis.DropFunctionStmt; |
| import org.apache.impala.analysis.DropStatsStmt; |
| import org.apache.impala.analysis.DropTableOrViewStmt; |
| import org.apache.impala.analysis.GrantRevokePrivStmt; |
| import org.apache.impala.analysis.GrantRevokeRoleStmt; |
| import org.apache.impala.analysis.InsertStmt; |
| import org.apache.impala.analysis.Parser; |
| import org.apache.impala.analysis.QueryStmt; |
| import org.apache.impala.analysis.ResetMetadataStmt; |
| import org.apache.impala.analysis.ShowFunctionsStmt; |
| import org.apache.impala.analysis.ShowGrantPrincipalStmt; |
| import org.apache.impala.analysis.ShowRolesStmt; |
| import org.apache.impala.analysis.StatementBase; |
| import org.apache.impala.analysis.StmtMetadataLoader; |
| import org.apache.impala.analysis.StmtMetadataLoader.StmtTableCache; |
| import org.apache.impala.analysis.TableName; |
| import org.apache.impala.analysis.TruncateStmt; |
| import org.apache.impala.authentication.saml.ImpalaSamlClient; |
| import org.apache.impala.authorization.AuthorizationChecker; |
| import org.apache.impala.authorization.AuthorizationConfig; |
| import org.apache.impala.authorization.AuthorizationFactory; |
| import org.apache.impala.authorization.AuthorizationManager; |
| import org.apache.impala.authorization.ImpalaInternalAdminUser; |
| import org.apache.impala.authorization.Privilege; |
| import org.apache.impala.authorization.PrivilegeRequest; |
| import org.apache.impala.authorization.PrivilegeRequestBuilder; |
| import org.apache.impala.authorization.User; |
| import org.apache.impala.catalog.Catalog; |
| import org.apache.impala.catalog.Column; |
| import org.apache.impala.catalog.DatabaseNotFoundException; |
| import org.apache.impala.catalog.FeCatalog; |
| import org.apache.impala.catalog.FeCatalogUtils; |
| import org.apache.impala.catalog.FeDataSource; |
| import org.apache.impala.catalog.FeDataSourceTable; |
| import org.apache.impala.catalog.FeDb; |
| import org.apache.impala.catalog.FeFsPartition; |
| import org.apache.impala.catalog.FeFsTable; |
| import org.apache.impala.catalog.FeHBaseTable; |
| import org.apache.impala.catalog.FeIcebergTable; |
| import org.apache.impala.catalog.FeKuduTable; |
| import org.apache.impala.catalog.FeTable; |
| import org.apache.impala.catalog.Function; |
| import org.apache.impala.catalog.IcebergPositionDeleteTable; |
| import org.apache.impala.catalog.ImpaladCatalog; |
| import org.apache.impala.catalog.ImpaladTableUsageTracker; |
| import org.apache.impala.catalog.MaterializedViewHdfsTable; |
| import org.apache.impala.catalog.MetaStoreClientPool; |
| import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; |
| import org.apache.impala.catalog.StructType; |
| import org.apache.impala.catalog.TableLoadingException; |
| import org.apache.impala.catalog.Type; |
| import org.apache.impala.catalog.local.InconsistentMetadataFetchException; |
| import org.apache.impala.catalog.iceberg.IcebergMetadataTable; |
| import org.apache.impala.common.AnalysisException; |
| import org.apache.impala.common.FileSystemUtil; |
| import org.apache.impala.common.ImpalaException; |
| import org.apache.impala.common.ImpalaRuntimeException; |
| import org.apache.impala.common.InternalException; |
| import org.apache.impala.common.KuduTransactionManager; |
| import org.apache.impala.common.NotImplementedException; |
| import org.apache.impala.common.PrintUtils; |
| import org.apache.impala.common.RuntimeEnv; |
| import org.apache.impala.common.TransactionException; |
| import org.apache.impala.common.TransactionKeepalive; |
| import org.apache.impala.common.TransactionKeepalive.HeartbeatContext; |
| import org.apache.impala.compat.MetastoreShim; |
| import org.apache.impala.hooks.QueryCompleteContext; |
| import org.apache.impala.hooks.QueryEventHook; |
| import org.apache.impala.hooks.QueryEventHookManager; |
| import org.apache.impala.planner.HdfsScanNode; |
| import org.apache.impala.planner.PlanFragment; |
| import org.apache.impala.planner.Planner; |
| import org.apache.impala.planner.ScanNode; |
| import org.apache.impala.thrift.TAlterDbParams; |
| import org.apache.impala.thrift.TBackendGflags; |
| import org.apache.impala.thrift.TCatalogOpRequest; |
| import org.apache.impala.thrift.TCatalogOpType; |
| import org.apache.impala.thrift.TCatalogServiceRequestHeader; |
| import org.apache.impala.thrift.TClientRequest; |
| import org.apache.impala.thrift.TColumn; |
| import org.apache.impala.thrift.TColumnValue; |
| import org.apache.impala.thrift.TCommentOnParams; |
| import org.apache.impala.thrift.TCopyTestCaseReq; |
| import org.apache.impala.thrift.TCounter; |
| import org.apache.impala.thrift.TCreateDropRoleParams; |
| import org.apache.impala.thrift.TDdlExecRequest; |
| import org.apache.impala.thrift.TDdlQueryOptions; |
| import org.apache.impala.thrift.TDdlType; |
| import org.apache.impala.thrift.TDescribeHistoryParams; |
| import org.apache.impala.thrift.TDescribeOutputStyle; |
| import org.apache.impala.thrift.TDescribeResult; |
| import org.apache.impala.thrift.TImpalaTableType; |
| import org.apache.impala.thrift.TDescribeTableParams; |
| import org.apache.impala.thrift.TIcebergDmlFinalizeParams; |
| import org.apache.impala.thrift.TIcebergOperation; |
| import org.apache.impala.thrift.TExecRequest; |
| import org.apache.impala.thrift.TExecutorGroupSet; |
| import org.apache.impala.thrift.TExplainResult; |
| import org.apache.impala.thrift.TFinalizeParams; |
| import org.apache.impala.thrift.TFunctionCategory; |
| import org.apache.impala.thrift.TGetCatalogMetricsResult; |
| import org.apache.impala.thrift.TGetTableHistoryResult; |
| import org.apache.impala.thrift.TGetTableHistoryResultItem; |
| import org.apache.impala.thrift.TGrantRevokePrivParams; |
| import org.apache.impala.thrift.TGrantRevokeRoleParams; |
| import org.apache.impala.thrift.TImpalaQueryOptions; |
| import org.apache.impala.thrift.TLineageGraph; |
| import org.apache.impala.thrift.TLoadDataReq; |
| import org.apache.impala.thrift.TLoadDataResp; |
| import org.apache.impala.thrift.TMetadataOpRequest; |
| import org.apache.impala.thrift.TConvertTableRequest; |
| import org.apache.impala.thrift.TPlanExecInfo; |
| import org.apache.impala.thrift.TPlanFragment; |
| import org.apache.impala.thrift.TPoolConfig; |
| import org.apache.impala.thrift.TQueryCtx; |
| import org.apache.impala.thrift.TQueryExecRequest; |
| import org.apache.impala.thrift.TQueryOptions; |
| import org.apache.impala.thrift.TResetMetadataRequest; |
| import org.apache.impala.thrift.TResultRow; |
| import org.apache.impala.thrift.TResultSet; |
| import org.apache.impala.thrift.TResultSetMetadata; |
| import org.apache.impala.thrift.TRuntimeProfileNode; |
| import org.apache.impala.thrift.TShowFilesParams; |
| import org.apache.impala.thrift.TShowStatsOp; |
| import org.apache.impala.thrift.TStmtType; |
| import org.apache.impala.thrift.TTableName; |
| import org.apache.impala.thrift.TTruncateParams; |
| import org.apache.impala.thrift.TUniqueId; |
| import org.apache.impala.thrift.TUnit; |
| import org.apache.impala.thrift.TUpdateCatalogCacheRequest; |
| import org.apache.impala.thrift.TUpdateCatalogCacheResponse; |
| import org.apache.impala.util.AcidUtils; |
| import org.apache.impala.util.DebugUtils; |
| import org.apache.impala.util.EventSequence; |
| import org.apache.impala.util.ExecutorMembershipSnapshot; |
| import org.apache.impala.util.IcebergUtil; |
| import org.apache.impala.util.KuduUtil; |
| import org.apache.impala.util.MigrateTableUtil; |
| import org.apache.impala.util.PatternMatcher; |
| import org.apache.impala.util.RequestPoolService; |
| import org.apache.impala.util.TResultRowBuilder; |
| import org.apache.impala.util.TSessionStateUtil; |
| import org.apache.kudu.client.KuduClient; |
| import org.apache.kudu.client.KuduException; |
| import org.apache.kudu.client.KuduTransaction; |
| import org.apache.thrift.TException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Frontend API for the impalad process. |
| * This class allows the impala daemon to create TQueryExecRequest |
| * in response to TClientRequests. Also handles management of the authorization |
| * policy and query execution hooks. |
| */ |
public class Frontend {
  private final static Logger LOG = LoggerFactory.getLogger(Frontend.class);

  // Max time (in milliseconds) to wait for a catalog update notification.
  public static final long MAX_CATALOG_UPDATE_WAIT_TIME_MS = 2 * 1000;

  // Interval (in seconds) between authorization policy reloads.
  // TODO: Make the reload interval configurable.
  private static final int AUTHORIZATION_POLICY_RELOAD_INTERVAL_SECS = 5 * 60;

  // Maximum number of times to retry a query if it fails due to inconsistent metadata.
  // Guards against BackendConfig.INSTANCE not being initialized yet (presumably in
  // some test environments -- confirm); falls back to no retries in that case.
  private static final int INCONSISTENT_METADATA_NUM_RETRIES =
      (BackendConfig.INSTANCE != null) ?
      BackendConfig.INSTANCE.getLocalCatalogMaxFetchRetries() : 0;

  // Maximum number of threads used to check authorization for the user when executing
  // show tables/databases. Upper bound for the backend-configured pool size.
  private static final int MAX_CHECK_AUTHORIZATION_POOL_SIZE = 128;

  // The default pool name to use when ExecutorMembershipSnapshot is in initial state
  // (i.e., ExecutorMembershipSnapshot.numExecutors_ == 0).
  private static final String DEFAULT_POOL_NAME = "default-pool";

  // Labels for various query profile counters. These strings are user-visible in
  // runtime profiles; do not change them casually.
  private static final String EXECUTOR_GROUPS_CONSIDERED = "ExecutorGroupsConsidered";
  private static final String CPU_COUNT_DIVISOR = "CpuCountDivisor";
  private static final String EFFECTIVE_PARALLELISM = "EffectiveParallelism";
  private static final String VERDICT = "Verdict";
  private static final String MEMORY_MAX = "MemoryMax";
  private static final String MEMORY_ASK = "MemoryAsk";
  private static final String CPU_MAX = "CpuMax";
  private static final String CPU_ASK = "CpuAsk";
| |
| /** |
| * Plan-time context that allows capturing various artifacts created |
| * during the process. |
| * |
| * The context gathers an optional describe string for display to the |
| * user, and the optional plan fragment nodes for use in unit tests. |
| */ |
| public static class PlanCtx { |
| // The query context. |
| protected final TQueryCtx queryCtx_; |
| // The explain string built from the query plan. |
| protected final StringBuilder explainBuf_; |
| // Flag to indicate whether to capture (return) the plan. |
| protected boolean capturePlan_; |
| // Flag to control whether the descriptor table is serialized. This defaults to |
| // true, but some frontend tests set it to false because they are operating on |
| // incomplete structures (e.g. THdfsTable without nullPartitionKeyValue) that cannot |
| // be serialized. |
| protected boolean serializeDescTbl_ = true; |
| |
| // The physical plan, divided by fragment, before conversion to |
| // Thrift. For unit testing. |
| protected List<PlanFragment> plan_; |
| |
| // An inner class to capture the state of compilation for auto-scaling. |
| final class AutoScalingCompilationState { |
| // Flag to indicate whether to disable authorization after analyze. Used by |
| // auto-scalling to avoid redundent authorizations. |
| protected boolean disableAuthorization_ = false; |
| |
| // The estimated memory per host for certain queries that do not populate |
| // TExecRequest.query_exec_request field. |
| protected long estimated_memory_per_host_ = -1; |
| |
| // The processing cores required to execute the query. |
| // Certain queries such as EXPLAIN do not populate TExecRequest.query_exec_request. |
| // Therefore, cores requirement will be set here through setCoresRequired(). |
| protected int cores_required_ = -1; |
| |
| // The initial length of content in explain buffer to help return the buffer |
| // to the initial position prior to another auto-scaling compilation. |
| protected int initialExplainBufLen_ = -1; |
| |
| // The initial query options seen before any compilations, which will be copied |
| // and used by each iteration of auto-scaling compilation. |
| protected TQueryOptions initialQueryOptions_ = null; |
| |
| // The allocated write Id when in an ACID transaction. |
| protected long writeId_ = -1; |
| |
| // The transaction token when in a Kudu transaction. |
| protected byte[] kuduTransactionToken_ = null; |
| |
| // Indicate whether runtime profile/summary can be accessed. Set at the end of |
| // 1st iteration. |
| protected boolean user_has_profile_access_ = false; |
| |
| // The group set being applied in current compilation. |
| protected TExecutorGroupSet group_set_ = null; |
| |
| // Available cores per executor node. |
| // Valid value must be >= 0. Set by Frontend.getTExecRequest(). |
| protected int availableCoresPerNode_ = -1; |
| |
| public boolean disableAuthorization() { return disableAuthorization_; } |
| |
| public long getEstimatedMemoryPerHost() { return estimated_memory_per_host_; } |
| public void setEstimatedMemoryPerHost(long x) { estimated_memory_per_host_ = x; } |
| |
| public int getCoresRequired() { return cores_required_; } |
| public void setCoresRequired(int x) { cores_required_ = x; } |
| |
| public int getAvailableCoresPerNode() { |
| Preconditions.checkState(availableCoresPerNode_ >= 0); |
| return availableCoresPerNode_; |
| } |
| public void setAvailableCoresPerNode(int x) { availableCoresPerNode_ = x; } |
| |
| // Capture the current state and initialize before iterative compilations begin. |
| public void captureState() { |
| disableAuthorization_ = false; |
| estimated_memory_per_host_ = -1; |
| initialExplainBufLen_ = PlanCtx.this.explainBuf_.length(); |
| initialQueryOptions_ = |
| new TQueryOptions(getQueryContext().client_request.getQuery_options()); |
| writeId_ = -1; |
| kuduTransactionToken_ = null; |
| } |
| |
| // Restore to the captured state after an iterative compilation |
| public void restoreState() { |
| // Avoid authorization starting from 2nd iteration. This flag can be set to |
| // false when meta-data change is detected. |
| disableAuthorization_ = true; |
| |
| // Reset estimated memory |
| estimated_memory_per_host_ = -1; |
| |
| // Remove the explain string accumulated |
| explainBuf_.delete(initialExplainBufLen_, explainBuf_.length()); |
| |
| // Use a brand new copy of query options |
| getQueryContext().client_request.setQuery_options( |
| new TQueryOptions(initialQueryOptions_)); |
| |
| // Set the flag to false to avoid serializing TQueryCtx.desc_tbl_testonly (a list |
| // of Descriptors.TDescriptorTable) in next iteration of compilation. |
| // TQueryCtx.desc_tbl_testonly is set in current iteration during planner test. |
| queryCtx_.setDesc_tbl_testonlyIsSet(false); |
| } |
| |
| // When exception InconsistentMetadataFetchException is received and before |
| // next compilation, disable stmt cache and re-authorize. |
| public void disableStmtCacheAndReauthorize() { |
| restoreState(); |
| disableAuthorization_ = false; |
| } |
| |
| long getWriteId() { return writeId_; } |
| void setWriteId(long x) { writeId_ = x; } |
| |
| byte[] getKuduTransactionToken() { return kuduTransactionToken_; } |
| void setKuduTransactionToken(byte[] token) { |
| kuduTransactionToken_ = (token == null) ? null : token.clone(); |
| } |
| |
| boolean userHasProfileAccess() { return user_has_profile_access_; } |
| void setUserHasProfileAccess(boolean x) { user_has_profile_access_ = x; } |
| |
| TExecutorGroupSet getGroupSet() { return group_set_; } |
| void setGroupSet(TExecutorGroupSet x) { group_set_ = x; } |
| } |
| |
| public AutoScalingCompilationState compilationState_; |
| |
| public PlanCtx(TQueryCtx qCtx) { |
| queryCtx_ = qCtx; |
| explainBuf_ = new StringBuilder(); |
| compilationState_ = new AutoScalingCompilationState(); |
| } |
| |
| public PlanCtx(TQueryCtx qCtx, StringBuilder describe) { |
| queryCtx_ = qCtx; |
| explainBuf_ = describe; |
| compilationState_ = new AutoScalingCompilationState(); |
| } |
| |
| /** |
| * Request to capture the plan tree for unit tests. |
| */ |
| public void requestPlanCapture() { capturePlan_ = true; } |
| public boolean planCaptureRequested() { return capturePlan_; } |
| public void disableDescTblSerialization() { serializeDescTbl_ = false; } |
| public boolean serializeDescTbl() { return serializeDescTbl_; } |
| public TQueryCtx getQueryContext() { return queryCtx_; } |
| |
| /** |
| * @return the captured plan tree. Used only for unit tests |
| */ |
| @VisibleForTesting |
| public List<PlanFragment> getPlan() { return plan_; } |
| |
| /** |
| * @return the captured describe string |
| */ |
| public String getExplainString() { return explainBuf_.toString(); } |
| } |
| |
  // Provides the FeCatalog instance(s) used to serve requests.
  private final FeCatalogManager catalogManager_;
  // Factory for authorization checkers/managers.
  private final AuthorizationFactory authzFactory_;
  private final AuthorizationManager authzManager_;
  // The user must hold at least one of these privileges for a database or table to
  // be visible in SHOW statements. See getMinPrivilegeSetForShowStmts().
  private final EnumSet<Privilege> minPrivilegeSetForShowStmts_;
  /**
   * Authorization checker. Initialized on creation, then it is kept up to date
   * via calls to updateCatalogCache().
   */
  private final AtomicReference<AuthorizationChecker> authzChecker_ =
      new AtomicReference<>();

  private final ImpaladTableUsageTracker impaladTableUsageTracker_;

  // Manages query event hooks configured via BackendConfig.
  private final QueryEventHookManager queryHookManager_;

  // Stores metastore clients for direct accesses to HMS. Null in backend tests.
  private final MetaStoreClientPool metaStoreClientPool_;

  // Heartbeats HMS transactions/locks. Null in backend tests and when the HMS major
  // version is <= 2 (see the constructor).
  private final TransactionKeepalive transactionKeepalive_;

  // Thread pool used for authorization checks when executing show tables/databases.
  // NOTE(review): declared static but assigned from the instance constructor --
  // presumably only one Frontend exists per process; confirm.
  private static ExecutorService checkAuthorizationPool_;

  // SAML2 SSO client. Null when no SAML2 IdP metadata is configured.
  private final ImpalaSamlClient saml2Client_;

  private final KuduTransactionManager kuduTxnManager_;
| |
  /**
   * Creates a Frontend whose catalog manager is built from the backend configuration.
   * When 'isBackendTest' is true, the HMS client pool and transaction keepalive are
   * not created (see the private constructor).
   */
  public Frontend(AuthorizationFactory authzFactory, boolean isBackendTest)
      throws ImpalaException {
    this(authzFactory, FeCatalogManager.createFromBackendConfig(), isBackendTest);
  }
| |
| /** |
| * Create a frontend with a specific catalog instance which will not allow |
| * updates and will be used for all requests. |
| */ |
| @VisibleForTesting |
| public Frontend(AuthorizationFactory authzFactory, FeCatalog testCatalog) |
| throws ImpalaException { |
| // This signature is only used for frontend tests, so pass false for isBackendTest |
| this(authzFactory, FeCatalogManager.createForTests(testCatalog), false); |
| } |
| |
  /**
   * Shared constructor logic. Sets up authorization (optionally backed by a thread
   * pool for authorization checks), the HMS client pool and transaction keepalive
   * (both skipped for backend tests), the optional SAML2 client, and the Kudu
   * transaction manager.
   */
  private Frontend(AuthorizationFactory authzFactory, FeCatalogManager catalogManager,
      boolean isBackendTest) throws ImpalaException {
    catalogManager_ = catalogManager;
    authzFactory_ = authzFactory;

    AuthorizationConfig authzConfig = authzFactory.getAuthorizationConfig();
    if (authzConfig.isEnabled()) {
      authzChecker_.set(authzFactory.newAuthorizationChecker(
          getCatalog().getAuthPolicy()));
      int numThreads = BackendConfig.INSTANCE.getNumCheckAuthorizationThreads();
      Preconditions.checkState(numThreads > 0
          && numThreads <= MAX_CHECK_AUTHORIZATION_POOL_SIZE);
      if (numThreads == 1) {
        // Single-threaded: run authorization checks directly on the caller's thread
        // instead of paying for a pool.
        checkAuthorizationPool_ = MoreExecutors.newDirectExecutorService();
      } else {
        LOG.info("Using a thread pool of size {} for authorization", numThreads);
        checkAuthorizationPool_ = Executors.newFixedThreadPool(numThreads,
            new ThreadFactoryBuilder()
                .setNameFormat("AuthorizationCheckerThread-%d").build());
      }
    } else {
      authzChecker_.set(authzFactory.newAuthorizationChecker());
    }
    catalogManager_.setAuthzChecker(authzChecker_);
    authzManager_ = authzFactory.newAuthorizationManager(catalogManager_,
        authzChecker_::get);
    minPrivilegeSetForShowStmts_ = getMinPrivilegeSetForShowStmts();
    impaladTableUsageTracker_ = ImpaladTableUsageTracker.createFromConfig(
        BackendConfig.INSTANCE);
    queryHookManager_ = QueryEventHookManager.createFromConfig(BackendConfig.INSTANCE);
    if (!isBackendTest) {
      TBackendGflags cfg = BackendConfig.INSTANCE.getBackendCfg();
      metaStoreClientPool_ = new MetaStoreClientPool(1, cfg.initial_hms_cnxn_timeout_s);
      // Transaction keepalive is only created for HMS major version 3+ (presumably
      // because transactional-table support requires Hive 3 -- confirm).
      if (MetastoreShim.getMajorVersion() > 2) {
        transactionKeepalive_ = new TransactionKeepalive(metaStoreClientPool_);
      } else {
        transactionKeepalive_ = null;
      }
    } else {
      // Backend tests run without a live HMS.
      metaStoreClientPool_ = null;
      transactionKeepalive_ = null;
    }
    // SAML2 SSO is enabled iff IdP metadata is configured.
    if (!BackendConfig.INSTANCE.getSaml2IdpMetadata().isEmpty()) {
      saml2Client_ = ImpalaSamlClient.get();
    } else {
      saml2Client_ = null;
    }
    kuduTxnManager_ = new KuduTransactionManager();
  }
| |
| /** |
| * Returns the required privilege set for showing a database or table. |
| */ |
| private EnumSet<Privilege> getMinPrivilegeSetForShowStmts() throws InternalException { |
| String configStr = BackendConfig.INSTANCE.getMinPrivilegeSetForShowStmts(); |
| if (Strings.isNullOrEmpty(configStr)) return EnumSet.of(Privilege.ANY); |
| EnumSet<Privilege> privileges = EnumSet.noneOf(Privilege.class); |
| for (String pStr : configStr.toUpperCase().split(",")) { |
| try { |
| privileges.add(Privilege.valueOf(pStr.trim())); |
| } catch (IllegalArgumentException e) { |
| LOG.error("Illegal privilege name '{}'", pStr, e); |
| throw new InternalException("Failed to parse privileges: " + configStr, e); |
| } |
| } |
| return privileges.isEmpty() ? EnumSet.of(Privilege.ANY) : privileges; |
| } |
| |
  /** Returns the catalog to serve requests with, creating it if necessary. */
  public FeCatalog getCatalog() { return catalogManager_.getOrCreateCatalog(); }

  /** Returns the factory used to build authorization components. */
  public AuthorizationFactory getAuthzFactory() { return authzFactory_; }

  /** Returns the current authorization checker; may be replaced on catalog updates. */
  public AuthorizationChecker getAuthzChecker() { return authzChecker_.get(); }

  /** Returns the authorization manager. */
  public AuthorizationManager getAuthzManager() { return authzManager_; }

  /** Returns the table usage tracker for this impalad. */
  public ImpaladTableUsageTracker getImpaladTableUsageTracker() {
    return impaladTableUsageTracker_;
  }

  /** Returns the SAML2 client, or null when SAML2 SSO is not configured. */
  public ImpalaSamlClient getSaml2Client() { return saml2Client_; }
| |
| public TUpdateCatalogCacheResponse updateCatalogCache( |
| TUpdateCatalogCacheRequest req) throws ImpalaException, TException { |
| TUpdateCatalogCacheResponse resp = catalogManager_.updateCatalogCache(req); |
| if (!req.is_delta) { |
| // In the case that it was a non-delta update, the catalog might have reloaded |
| // itself, and we need to reset the AuthorizationChecker accordingly. |
| authzChecker_.set(authzFactory_.newAuthorizationChecker( |
| getCatalog().getAuthPolicy())); |
| } |
| return resp; |
| } |
| |
| /** |
| * Constructs a TCatalogOpRequest and attaches it, plus any metadata, to the |
| * result argument. |
| */ |
| private void createCatalogOpRequest(AnalysisResult analysis, TExecRequest result) |
| throws InternalException { |
| TCatalogOpRequest ddl = new TCatalogOpRequest(); |
| TResultSetMetadata metadata = new TResultSetMetadata(); |
| if (analysis.isUseStmt()) { |
| ddl.op_type = TCatalogOpType.USE; |
| ddl.setUse_db_params(analysis.getUseStmt().toThrift()); |
| metadata.setColumns(Collections.<TColumn>emptyList()); |
| } else if (analysis.isShowTablesStmt()) { |
| ddl.op_type = TCatalogOpType.SHOW_TABLES; |
| ddl.setShow_tables_params(analysis.getShowTablesStmt().toThrift()); |
| metadata.setColumns(Arrays.asList( |
| new TColumn("name", Type.STRING.toThrift()))); |
| } else if (analysis.isShowViewsStmt()) { |
| ddl.op_type = TCatalogOpType.SHOW_VIEWS; |
| ddl.setShow_tables_params(analysis.getShowViewsStmt().toThrift()); |
| metadata.setColumns(Arrays.asList( |
| new TColumn("name", Type.STRING.toThrift()))); |
| } else if (analysis.isShowDbsStmt()) { |
| ddl.op_type = TCatalogOpType.SHOW_DBS; |
| ddl.setShow_dbs_params(analysis.getShowDbsStmt().toThrift()); |
| metadata.setColumns(Arrays.asList( |
| new TColumn("name", Type.STRING.toThrift()), |
| new TColumn("comment", Type.STRING.toThrift()))); |
| } else if (analysis.isShowDataSrcsStmt()) { |
| ddl.op_type = TCatalogOpType.SHOW_DATA_SRCS; |
| ddl.setShow_data_srcs_params(analysis.getShowDataSrcsStmt().toThrift()); |
| metadata.setColumns(Arrays.asList( |
| new TColumn("name", Type.STRING.toThrift()), |
| new TColumn("location", Type.STRING.toThrift()), |
| new TColumn("class name", Type.STRING.toThrift()), |
| new TColumn("api version", Type.STRING.toThrift()))); |
| } else if (analysis.isShowStatsStmt()) { |
| ddl.op_type = TCatalogOpType.SHOW_STATS; |
| ddl.setShow_stats_params(analysis.getShowStatsStmt().toThrift()); |
| metadata.setColumns(Arrays.asList( |
| new TColumn("name", Type.STRING.toThrift()))); |
| } else if (analysis.isShowFunctionsStmt()) { |
| ddl.op_type = TCatalogOpType.SHOW_FUNCTIONS; |
| ShowFunctionsStmt stmt = (ShowFunctionsStmt)analysis.getStmt(); |
| ddl.setShow_fns_params(stmt.toThrift()); |
| metadata.setColumns(Arrays.asList( |
| new TColumn("return type", Type.STRING.toThrift()), |
| new TColumn("signature", Type.STRING.toThrift()), |
| new TColumn("binary type", Type.STRING.toThrift()), |
| new TColumn("is persistent", Type.STRING.toThrift()))); |
| } else if (analysis.isShowCreateTableStmt()) { |
| ddl.op_type = TCatalogOpType.SHOW_CREATE_TABLE; |
| ddl.setShow_create_table_params(analysis.getShowCreateTableStmt().toThrift()); |
| metadata.setColumns(Arrays.asList( |
| new TColumn("result", Type.STRING.toThrift()))); |
| } else if (analysis.isShowCreateFunctionStmt()) { |
| ddl.op_type = TCatalogOpType.SHOW_CREATE_FUNCTION; |
| ddl.setShow_create_function_params(analysis.getShowCreateFunctionStmt().toThrift()); |
| metadata.setColumns(Arrays.asList( |
| new TColumn("result", Type.STRING.toThrift()))); |
| } else if (analysis.isShowFilesStmt()) { |
| ddl.op_type = TCatalogOpType.SHOW_FILES; |
| ddl.setShow_files_params(analysis.getShowFilesStmt().toThrift()); |
| metadata.setColumns(Collections.<TColumn>emptyList()); |
| } else if (analysis.isDescribeHistoryStmt()) { |
| ddl.op_type = TCatalogOpType.DESCRIBE_HISTORY; |
| ddl.setDescribe_history_params(analysis.getDescribeHistoryStmt().toThrift()); |
| metadata.setColumns(Arrays.asList( |
| new TColumn("creation_time", Type.STRING.toThrift()), |
| new TColumn("snapshot_id", Type.STRING.toThrift()), |
| new TColumn("parent_id", Type.STRING.toThrift()), |
| new TColumn("is_current_ancestor", Type.STRING.toThrift()))); |
| } else if (analysis.isDescribeDbStmt()) { |
| ddl.op_type = TCatalogOpType.DESCRIBE_DB; |
| ddl.setDescribe_db_params(analysis.getDescribeDbStmt().toThrift()); |
| metadata.setColumns(Arrays.asList( |
| new TColumn("name", Type.STRING.toThrift()), |
| new TColumn("location", Type.STRING.toThrift()), |
| new TColumn("comment", Type.STRING.toThrift()))); |
| } else if (analysis.isDescribeTableStmt()) { |
| ddl.op_type = TCatalogOpType.DESCRIBE_TABLE; |
| DescribeTableStmt descStmt = analysis.getDescribeTableStmt(); |
| ddl.setDescribe_table_params(descStmt.toThrift()); |
| List<TColumn> columns = Lists.newArrayList( |
| new TColumn("name", Type.STRING.toThrift()), |
| new TColumn("type", Type.STRING.toThrift()), |
| new TColumn("comment", Type.STRING.toThrift())); |
| if (descStmt.targetsTable() |
| && descStmt.getOutputStyle() == TDescribeOutputStyle.MINIMAL) { |
| if (descStmt.getTable() instanceof FeKuduTable) { |
| columns.add(new TColumn("primary_key", Type.STRING.toThrift())); |
| columns.add(new TColumn("key_unique", Type.STRING.toThrift())); |
| columns.add(new TColumn("nullable", Type.STRING.toThrift())); |
| columns.add(new TColumn("default_value", Type.STRING.toThrift())); |
| columns.add(new TColumn("encoding", Type.STRING.toThrift())); |
| columns.add(new TColumn("compression", Type.STRING.toThrift())); |
| columns.add(new TColumn("block_size", Type.STRING.toThrift())); |
| } else if ((descStmt.getTable() instanceof FeIcebergTable |
| || descStmt.getTable() instanceof IcebergMetadataTable)) { |
| columns.add(new TColumn("nullable", Type.STRING.toThrift())); |
| } |
| } |
| metadata.setColumns(columns); |
| } else if (analysis.isAlterTableStmt()) { |
| ddl.op_type = TCatalogOpType.DDL; |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(TDdlType.ALTER_TABLE); |
| req.setAlter_table_params(analysis.getAlterTableStmt().toThrift()); |
| ddl.setDdl_params(req); |
| } else if (analysis.isAlterViewStmt()) { |
| ddl.op_type = TCatalogOpType.DDL; |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(TDdlType.ALTER_VIEW); |
| req.setAlter_view_params(analysis.getAlterViewStmt().toThrift()); |
| ddl.setDdl_params(req); |
| } else if (analysis.isCreateTableStmt()) { |
| ddl.op_type = TCatalogOpType.DDL; |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(TDdlType.CREATE_TABLE); |
| req.setCreate_table_params(analysis.getCreateTableStmt().toThrift()); |
| ddl.setDdl_params(req); |
| } else if (analysis.isCreateTableAsSelectStmt()) { |
| ddl.op_type = TCatalogOpType.DDL; |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(TDdlType.CREATE_TABLE_AS_SELECT); |
| req.setCreate_table_params( |
| analysis.getCreateTableAsSelectStmt().getCreateStmt().toThrift()); |
| ddl.setDdl_params(req); |
| } else if (analysis.isCreateTableLikeStmt()) { |
| ddl.op_type = TCatalogOpType.DDL; |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(TDdlType.CREATE_TABLE_LIKE); |
| req.setCreate_table_like_params(analysis.getCreateTableLikeStmt().toThrift()); |
| ddl.setDdl_params(req); |
| } else if (analysis.isCreateViewStmt()) { |
| ddl.op_type = TCatalogOpType.DDL; |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(TDdlType.CREATE_VIEW); |
| req.setCreate_view_params(analysis.getCreateViewStmt().toThrift()); |
| ddl.setDdl_params(req); |
| } else if (analysis.isCreateDbStmt()) { |
| ddl.op_type = TCatalogOpType.DDL; |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(TDdlType.CREATE_DATABASE); |
| req.setCreate_db_params(analysis.getCreateDbStmt().toThrift()); |
| ddl.setDdl_params(req); |
| } else if (analysis.isCreateUdfStmt()) { |
| ddl.op_type = TCatalogOpType.DDL; |
| CreateUdfStmt stmt = (CreateUdfStmt) analysis.getStmt(); |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(TDdlType.CREATE_FUNCTION); |
| req.setCreate_fn_params(stmt.toThrift()); |
| ddl.setDdl_params(req); |
| } else if (analysis.isCreateUdaStmt()) { |
| ddl.op_type = TCatalogOpType.DDL; |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(TDdlType.CREATE_FUNCTION); |
| CreateUdaStmt stmt = (CreateUdaStmt)analysis.getStmt(); |
| req.setCreate_fn_params(stmt.toThrift()); |
| ddl.setDdl_params(req); |
| } else if (analysis.isCreateDataSrcStmt()) { |
| ddl.op_type = TCatalogOpType.DDL; |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(TDdlType.CREATE_DATA_SOURCE); |
| CreateDataSrcStmt stmt = (CreateDataSrcStmt)analysis.getStmt(); |
| req.setCreate_data_source_params(stmt.toThrift()); |
| ddl.setDdl_params(req); |
| } else if (analysis.isComputeStatsStmt()) { |
| ddl.op_type = TCatalogOpType.DDL; |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(TDdlType.COMPUTE_STATS); |
| req.setCompute_stats_params(analysis.getComputeStatsStmt().toThrift()); |
| ddl.setDdl_params(req); |
| } else if (analysis.isDropDbStmt()) { |
| ddl.op_type = TCatalogOpType.DDL; |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(TDdlType.DROP_DATABASE); |
| req.setDrop_db_params(analysis.getDropDbStmt().toThrift()); |
| ddl.setDdl_params(req); |
| } else if (analysis.isDropTableOrViewStmt()) { |
| ddl.op_type = TCatalogOpType.DDL; |
| TDdlExecRequest req = new TDdlExecRequest(); |
| DropTableOrViewStmt stmt = analysis.getDropTableOrViewStmt(); |
| req.setDdl_type(stmt.isDropTable() ? TDdlType.DROP_TABLE : TDdlType.DROP_VIEW); |
| req.setDrop_table_or_view_params(stmt.toThrift()); |
| ddl.setDdl_params(req); |
| } else if (analysis.isTruncateStmt()) { |
| ddl.op_type = TCatalogOpType.DDL; |
| TDdlExecRequest req = new TDdlExecRequest(); |
| TruncateStmt stmt = analysis.getTruncateStmt(); |
| req.setDdl_type(TDdlType.TRUNCATE_TABLE); |
| TTruncateParams truncateParams = stmt.toThrift(); |
| TQueryOptions queryOptions = result.getQuery_options(); |
| // if DELETE_STATS_IN_TRUNCATE option is unset forward it to catalogd |
| // so that it can skip deleting the statistics during truncate execution |
| if (!queryOptions.isDelete_stats_in_truncate()) { |
| truncateParams.setDelete_stats(false); |
| } |
| req.setTruncate_params(truncateParams); |
| ddl.setDdl_params(req); |
| } else if (analysis.isDropFunctionStmt()) { |
| ddl.op_type = TCatalogOpType.DDL; |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(TDdlType.DROP_FUNCTION); |
| DropFunctionStmt stmt = (DropFunctionStmt)analysis.getStmt(); |
| req.setDrop_fn_params(stmt.toThrift()); |
| ddl.setDdl_params(req); |
| } else if (analysis.isDropDataSrcStmt()) { |
| ddl.op_type = TCatalogOpType.DDL; |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(TDdlType.DROP_DATA_SOURCE); |
| DropDataSrcStmt stmt = (DropDataSrcStmt)analysis.getStmt(); |
| req.setDrop_data_source_params(stmt.toThrift()); |
| ddl.setDdl_params(req); |
| } else if (analysis.isDropStatsStmt()) { |
| ddl.op_type = TCatalogOpType.DDL; |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(TDdlType.DROP_STATS); |
| DropStatsStmt stmt = (DropStatsStmt) analysis.getStmt(); |
| req.setDrop_stats_params(stmt.toThrift()); |
| ddl.setDdl_params(req); |
| } else if (analysis.isResetMetadataStmt()) { |
| ddl.op_type = TCatalogOpType.RESET_METADATA; |
| ResetMetadataStmt resetMetadataStmt = (ResetMetadataStmt) analysis.getStmt(); |
| TResetMetadataRequest req = resetMetadataStmt.toThrift(); |
| ddl.setReset_metadata_params(req); |
| metadata.setColumns(Collections.emptyList()); |
| } else if (analysis.isShowRolesStmt()) { |
| ddl.op_type = TCatalogOpType.SHOW_ROLES; |
| ShowRolesStmt showRolesStmt = (ShowRolesStmt) analysis.getStmt(); |
| ddl.setShow_roles_params(showRolesStmt.toThrift()); |
| metadata.setColumns(Arrays.asList( |
| new TColumn("role_name", Type.STRING.toThrift()))); |
| } else if (analysis.isShowGrantPrincipalStmt()) { |
| ddl.op_type = TCatalogOpType.SHOW_GRANT_PRINCIPAL; |
| ShowGrantPrincipalStmt showGrantPrincipalStmt = |
| (ShowGrantPrincipalStmt) analysis.getStmt(); |
| ddl.setShow_grant_principal_params(showGrantPrincipalStmt.toThrift()); |
| metadata.setColumns(Arrays.asList(new TColumn("name", Type.STRING.toThrift()))); |
| } else if (analysis.isCreateDropRoleStmt()) { |
| CreateDropRoleStmt createDropRoleStmt = (CreateDropRoleStmt) analysis.getStmt(); |
| TCreateDropRoleParams params = createDropRoleStmt.toThrift(); |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(params.isIs_drop() ? TDdlType.DROP_ROLE : TDdlType.CREATE_ROLE); |
| req.setCreate_drop_role_params(params); |
| ddl.op_type = TCatalogOpType.DDL; |
| ddl.setDdl_params(req); |
| } else if (analysis.isGrantRevokeRoleStmt()) { |
| GrantRevokeRoleStmt grantRoleStmt = (GrantRevokeRoleStmt) analysis.getStmt(); |
| TGrantRevokeRoleParams params = grantRoleStmt.toThrift(); |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(params.isIs_grant() ? TDdlType.GRANT_ROLE : TDdlType.REVOKE_ROLE); |
| req.setGrant_revoke_role_params(params); |
| ddl.op_type = TCatalogOpType.DDL; |
| ddl.setDdl_params(req); |
| } else if (analysis.isGrantRevokePrivStmt()) { |
| GrantRevokePrivStmt grantRevokePrivStmt = (GrantRevokePrivStmt) analysis.getStmt(); |
| TGrantRevokePrivParams params = grantRevokePrivStmt.toThrift(); |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(params.isIs_grant() ? |
| TDdlType.GRANT_PRIVILEGE : TDdlType.REVOKE_PRIVILEGE); |
| req.setGrant_revoke_priv_params(params); |
| ddl.op_type = TCatalogOpType.DDL; |
| ddl.setDdl_params(req); |
| } else if (analysis.isCommentOnStmt()) { |
| CommentOnStmt commentOnStmt = analysis.getCommentOnStmt(); |
| TCommentOnParams params = commentOnStmt.toThrift(); |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(TDdlType.COMMENT_ON); |
| req.setComment_on_params(params); |
| ddl.op_type = TCatalogOpType.DDL; |
| ddl.setDdl_params(req); |
| } else if (analysis.isAlterDbStmt()) { |
| AlterDbStmt alterDbStmt = analysis.getAlterDbStmt(); |
| TAlterDbParams params = alterDbStmt.toThrift(); |
| TDdlExecRequest req = new TDdlExecRequest(); |
| req.setDdl_type(TDdlType.ALTER_DATABASE); |
| req.setAlter_db_params(params); |
| ddl.op_type = TCatalogOpType.DDL; |
| ddl.setDdl_params(req); |
| } else if (analysis.isTestCaseStmt()){ |
| CopyTestCaseStmt stmt = (CopyTestCaseStmt) analysis.getStmt(); |
| TCopyTestCaseReq req = new TCopyTestCaseReq(stmt.getHdfsPath()); |
| TDdlExecRequest ddlReq = new TDdlExecRequest(); |
| ddlReq.setCopy_test_case_params(req); |
| ddlReq.setDdl_type(TDdlType.COPY_TESTCASE); |
| ddl.op_type = TCatalogOpType.DDL; |
| ddl.setDdl_params(ddlReq); |
| } else { |
| throw new IllegalStateException("Unexpected CatalogOp statement type."); |
| } |
| // All DDL commands return a string summarizing the outcome of the DDL. |
| if (ddl.op_type == TCatalogOpType.DDL) { |
| metadata.setColumns(Arrays.asList(new TColumn("summary", Type.STRING.toThrift()))); |
| } |
| result.setResult_set_metadata(metadata); |
| ddl.setSync_ddl(result.getQuery_options().isSync_ddl()); |
| result.setCatalog_op_request(ddl); |
| if (ddl.getOp_type() == TCatalogOpType.DDL) { |
| TCatalogServiceRequestHeader header = new TCatalogServiceRequestHeader(); |
| header.setRequesting_user(analysis.getAnalyzer().getUser().getName()); |
| TQueryCtx queryCtx = analysis.getAnalyzer().getQueryCtx(); |
| header.setQuery_id(queryCtx.query_id); |
| header.setClient_ip(queryCtx.getSession().getNetwork_address().getHostname()); |
| TClientRequest clientRequest = queryCtx.getClient_request(); |
| header.setRedacted_sql_stmt(clientRequest.isSetRedacted_stmt() ? |
| clientRequest.getRedacted_stmt() : clientRequest.getStmt()); |
| header.setWant_minimal_response( |
| BackendConfig.INSTANCE.getBackendCfg().use_local_catalog); |
| header.setCoordinator_hostname(BackendConfig.INSTANCE.getHostname()); |
| ddl.getDdl_params().setHeader(header); |
| // Forward relevant query options to the catalogd. |
| TDdlQueryOptions ddlQueryOpts = new TDdlQueryOptions(); |
| ddlQueryOpts.setSync_ddl(result.getQuery_options().isSync_ddl()); |
| if (result.getQuery_options().isSetDebug_action()) { |
| ddlQueryOpts.setDebug_action(result.getQuery_options().getDebug_action()); |
| } |
| ddlQueryOpts.setLock_max_wait_time_s( |
| result.getQuery_options().lock_max_wait_time_s); |
| ddlQueryOpts.setKudu_table_reserve_seconds( |
| result.getQuery_options().kudu_table_reserve_seconds); |
| ddl.getDdl_params().setQuery_options(ddlQueryOpts); |
| } else if (ddl.getOp_type() == TCatalogOpType.RESET_METADATA) { |
| ddl.getReset_metadata_params().setSync_ddl(ddl.isSync_ddl()); |
| ddl.getReset_metadata_params().setRefresh_updated_hms_partitions( |
| result.getQuery_options().isRefresh_updated_hms_partitions()); |
| ddl.getReset_metadata_params().getHeader().setWant_minimal_response( |
| BackendConfig.INSTANCE.getBackendCfg().use_local_catalog); |
| // forward debug_actions to the catalogd |
| if (result.getQuery_options().isSetDebug_action()) { |
| ddl.getReset_metadata_params() |
| .setDebug_action(result.getQuery_options().getDebug_action()); |
| } |
| } |
| } |
| |
| /** |
| * Loads a table or partition with one or more data files. If the "overwrite" flag |
| * in the request is true, all existing data in the table/partition will be replaced. |
| * If the "overwrite" flag is false, the files will be added alongside any existing |
| * data files. |
| */ |
| public TLoadDataResp loadTableData(TLoadDataReq request) throws ImpalaException, |
| IOException { |
| TTableName tableName = request.getTable_name(); |
| RetryTracker retries = new RetryTracker( |
| String.format("load table data %s.%s", tableName.db_name, tableName.table_name)); |
| while (true) { |
| try { |
| if (!request.iceberg_tbl) |
| return doLoadTableData(request); |
| else |
| return doLoadIcebergTableData(request); |
| } catch(InconsistentMetadataFetchException e) { |
| retries.handleRetryOrThrow(e); |
| } |
| } |
| } |
| |
| /** |
| * Migrate external Hdfs tables to Iceberg tables. |
| */ |
| public void convertTable(TExecRequest execRequest) |
| throws DatabaseNotFoundException, ImpalaRuntimeException, InternalException { |
| Preconditions.checkState(execRequest.isSetConvert_table_request()); |
| TQueryOptions queryOptions = execRequest.query_options; |
| TConvertTableRequest convertTableRequest = execRequest.convert_table_request; |
| TTableName tableName = convertTableRequest.getHdfs_table_name(); |
| FeTable table = getCatalog().getTable(tableName.getDb_name(), |
| tableName.getTable_name()); |
| Preconditions.checkNotNull(table); |
| Preconditions.checkState(table instanceof FeFsTable); |
| try (MetaStoreClient client = metaStoreClientPool_.getClient()) { |
| MigrateTableUtil.migrateToIcebergTable(client.getHiveClient(), convertTableRequest, |
| (FeFsTable) table, queryOptions); |
| } |
| } |
| |
/**
 * Implements LOAD DATA for filesystem (non-Iceberg) tables. The source file(s) are
 * first staged into a temporary subdirectory of the destination, then moved into the
 * final location; for OVERWRITE loads the pre-existing destination files are deleted
 * between those two steps so the newly staged files are unaffected.
 *
 * Returns a response carrying a human-readable load summary, the list of loaded file
 * paths, and (for partition targets) the partition name.
 */
private TLoadDataResp doLoadTableData(TLoadDataReq request) throws ImpalaException,
    IOException {
  TableName tableName = TableName.fromThrift(request.getTable_name());

  // Get the destination for the load. If the load is targeting a partition,
  // this is the partition location. Otherwise this is the table location.
  String destPathString = null;
  String partitionName = null;
  FeCatalog catalog = getCatalog();
  if (request.isSetPartition_spec()) {
    FeFsPartition partition = catalog.getHdfsPartition(tableName.getDb(),
        tableName.getTbl(), request.getPartition_spec());
    destPathString = partition.getLocation();
    partitionName = partition.getPartitionName();
  } else {
    destPathString = catalog.getTable(tableName.getDb(), tableName.getTbl())
        .getMetaStoreTable().getSd().getLocation();
  }

  Path destPath = new Path(destPathString);
  Path sourcePath = new Path(request.source_path);
  FileSystem destFs = destPath.getFileSystem(FileSystemUtil.getConfiguration());
  FileSystem sourceFs = sourcePath.getFileSystem(FileSystemUtil.getConfiguration());

  // Create a temporary directory within the final destination directory to stage the
  // file move.
  Path tmpDestPath = FileSystemUtil.makeTmpSubdirectory(destPath);

  // Stage the new files into the tmp directory. A directory source loads every
  // visible (non-hidden) file under it; a single-file source loads exactly one file.
  int numFilesLoaded = 0;
  if (sourceFs.isDirectory(sourcePath)) {
    numFilesLoaded = FileSystemUtil.relocateAllVisibleFiles(sourcePath, tmpDestPath);
  } else {
    FileSystemUtil.relocateFile(sourcePath, tmpDestPath, true);
    numFilesLoaded = 1;
  }

  // If this is an OVERWRITE, delete all files in the destination. This happens after
  // staging, so the files just placed under tmpDestPath survive the delete.
  if (request.isOverwrite()) {
    FileSystemUtil.deleteAllVisibleFiles(destPath);
  }

  List<Path> filesLoaded = new ArrayList<>();
  // Move the files from the temporary location to the final destination.
  FileSystemUtil.relocateAllVisibleFiles(tmpDestPath, destPath, filesLoaded);
  // Cleanup the tmp directory.
  destFs.delete(tmpDestPath, true);
  TLoadDataResp response = new TLoadDataResp();
  TColumnValue col = new TColumnValue();
  String loadMsg = String.format(
      "Loaded %d file(s). Total files in destination location: %d",
      numFilesLoaded, FileSystemUtil.getTotalNumVisibleFiles(destPath));
  col.setString_val(loadMsg);
  response.setLoad_summary(new TResultRow(Lists.newArrayList(col)));
  response.setLoaded_files(filesLoaded.stream().map(Path::toString)
      .collect(Collectors.toList()));
  // Only set the partition name when the load targeted a partition.
  if (partitionName != null && !partitionName.isEmpty()) {
    response.setPartition_name(partitionName);
  }
  return response;
}
| |
| private TLoadDataResp doLoadIcebergTableData(TLoadDataReq request) |
| throws ImpalaException, IOException { |
| TLoadDataResp response = new TLoadDataResp(); |
| TableName tableName = TableName.fromThrift(request.getTable_name()); |
| FeCatalog catalog = getCatalog(); |
| String destPathString = catalog.getTable(tableName.getDb(), tableName.getTbl()) |
| .getMetaStoreTable().getSd().getLocation(); |
| Path destPath = new Path(destPathString); |
| Path sourcePath = new Path(request.source_path); |
| FileSystem sourceFs = sourcePath.getFileSystem(FileSystemUtil.getConfiguration()); |
| |
| // Create a temporary directory within the final destination directory to stage the |
| // file move. |
| Path tmpDestPath = FileSystemUtil.makeTmpSubdirectory(destPath); |
| |
| int numFilesLoaded = 0; |
| List<Path> filesLoaded = new ArrayList<>(); |
| String destFile; |
| if (sourceFs.isDirectory(sourcePath)) { |
| numFilesLoaded = FileSystemUtil.relocateAllVisibleFiles(sourcePath, |
| tmpDestPath, filesLoaded); |
| destFile = filesLoaded.get(0).toString(); |
| } else { |
| Path destFilePath = FileSystemUtil.relocateFile(sourcePath, tmpDestPath, |
| true); |
| filesLoaded.add(new Path(tmpDestPath.toString() + Path.SEPARATOR |
| + sourcePath.getName())); |
| numFilesLoaded = 1; |
| destFile = destFilePath.toString(); |
| } |
| |
| String createTmpTblQuery = String.format(request.create_tmp_tbl_query_template, |
| destFile, tmpDestPath.toString()); |
| |
| TColumnValue col = new TColumnValue(); |
| String loadMsg = String.format("Loaded %d file(s).", numFilesLoaded); |
| col.setString_val(loadMsg); |
| response.setLoaded_files(filesLoaded.stream().map(Path::toString) |
| .collect(Collectors.toList())); |
| response.setLoad_summary(new TResultRow(Lists.newArrayList(col))); |
| response.setCreate_tmp_tbl_query(createTmpTblQuery); |
| response.setCreate_location(tmpDestPath.toString()); |
| return response; |
| } |
| |
| /** |
| * Parses and plans a query in order to generate its explain string. This method does |
| * not increase the query id counter. |
| */ |
| public String getExplainString(TQueryCtx queryCtx) throws ImpalaException { |
| PlanCtx planCtx = new PlanCtx(queryCtx); |
| createExecRequest(planCtx); |
| return planCtx.getExplainString(); |
| } |
| |
| public TGetCatalogMetricsResult getCatalogMetrics() { |
| TGetCatalogMetricsResult resp = new TGetCatalogMetricsResult(); |
| if (BackendConfig.INSTANCE.getBackendCfg().use_local_catalog) { |
| // Don't track these two metrics in LocalCatalog mode since they might introduce |
| // catalogd RPCs when the db list or some table lists are not cached. |
| resp.num_dbs = -1; |
| resp.num_tables = -1; |
| FeCatalogUtils.populateCacheMetrics(getCatalog(), resp); |
| } else { |
| for (FeDb db : getCatalog().getDbs(PatternMatcher.MATCHER_MATCH_ALL)) { |
| resp.num_dbs++; |
| resp.num_tables += db.getAllTableNames().size(); |
| } |
| } |
| return resp; |
| } |
| |
| /** |
| * Keeps track of retries when handling InconsistentMetadataFetchExceptions. |
| * Whenever a Catalog object is acquired (e.g., getCatalog), operations that access |
| * finer-grained objects, such as tables and partitions, can throw such a runtime |
| * exception. Inconsistent metadata comes up due to interleaving catalog object updates |
| * with retrieving those objects. Instead of bubbling up the issue to the user, retrying |
| * can get the user's operation to run on a consistent snapshot and to succeed. |
| * Retries are *not* needed for accessing top-level objects such as databases, since |
| * they do not have a parent, so cannot be inconsistent. |
| * TODO: this class is typically used in a loop at the call-site. replace with lambdas |
| * in Java 8 to simplify the looping boilerplate. |
| */ |
| public static class RetryTracker { |
| // Number of exceptions seen |
| private int attempt_ = 0; |
| // Message to add when logging retries. |
| private final String msg_; |
| |
| public RetryTracker(String msg) { msg_ = msg; } |
| |
| /** |
| * Record a retry. If the number of retries exceeds to configured maximum, the |
| * exception is thrown. Otherwise, the number of retries is incremented and logged. |
| * TODO: record these retries in the profile as done for query retries. |
| */ |
| public void handleRetryOrThrow(InconsistentMetadataFetchException exception) { |
| if (attempt_++ >= INCONSISTENT_METADATA_NUM_RETRIES) throw exception; |
| if (attempt_ > 1) { |
| // Back-off a bit on later retries. |
| Uninterruptibles.sleepUninterruptibly(200 * attempt_, TimeUnit.MILLISECONDS); |
| } |
| LOG.warn(String.format("Retried %s: (retry #%s of %s)", msg_, |
| attempt_, INCONSISTENT_METADATA_NUM_RETRIES), exception); |
| } |
| } |
| |
| /** |
| * A Callable wrapper used for checking authorization to tables/databases. |
| */ |
| private class CheckAuthorization implements Callable<Boolean> { |
| private final String dbName_; |
| private final String tblName_; |
| private final String owner_; |
| private final User user_; |
| |
| public CheckAuthorization(String dbName, String tblName, String owner, User user) { |
| // dbName and user cannot be null, tblName and owner can be null. |
| Preconditions.checkNotNull(dbName); |
| Preconditions.checkNotNull(user); |
| dbName_ = dbName; |
| tblName_ = tblName; |
| owner_ = owner; |
| user_ = user; |
| } |
| |
| @Override |
| public Boolean call() throws Exception { |
| return Boolean.valueOf(isAccessibleToUser(dbName_, tblName_, owner_, user_)); |
| } |
| } |
| |
| public List<String> getTableNames(String dbName, PatternMatcher matcher, User user) |
| throws ImpalaException { |
| return getTableNames(dbName, matcher, user, /*tableTypes*/ Collections.emptySet()); |
| } |
| |
| /** |
| * Returns tables of types specified in 'tableTypes' in database 'dbName' that |
| * match the pattern of 'matcher' and are accessible to 'user'. |
| */ |
| public List<String> getTableNames(String dbName, PatternMatcher matcher, |
| User user, Set<TImpalaTableType> tableTypes) throws ImpalaException { |
| RetryTracker retries = new RetryTracker( |
| String.format("fetching %s table names", dbName)); |
| while (true) { |
| try { |
| return doGetTableNames(dbName, matcher, user, tableTypes); |
| } catch(InconsistentMetadataFetchException e) { |
| retries.handleRetryOrThrow(e); |
| } |
| } |
| } |
| |
| /** |
| * This method filters out elements from the given list based on the the results |
| * of the pendingCheckTasks. |
| */ |
| private void filterUnaccessibleElements(List<Future<Boolean>> pendingCheckTasks, |
| List<?> checkList) throws InternalException { |
| int failedCheckTasks = 0; |
| int index = 0; |
| Iterator<?> iter = checkList.iterator(); |
| |
| Preconditions.checkState(checkList.size() == pendingCheckTasks.size()); |
| while (iter.hasNext()) { |
| iter.next(); |
| try { |
| if (!pendingCheckTasks.get(index).get()) iter.remove(); |
| index++; |
| } catch (ExecutionException | InterruptedException e) { |
| failedCheckTasks++; |
| LOG.error("Encountered an error checking access", e); |
| break; |
| } |
| } |
| |
| if (failedCheckTasks > 0) |
| throw new InternalException("Failed to check access." + |
| "Check the server log for more details."); |
| } |
| |
/**
 * Single attempt at listing the tables in 'dbName' matching 'matcher' and
 * 'tableTypes' that 'user' may see. When authorization is enabled and the user has
 * no db-wide privilege, a per-table access check is submitted to
 * checkAuthorizationPool_ for each candidate and inaccessible names are filtered
 * out. May throw InconsistentMetadataFetchException; the public getTableNames()
 * wrapper handles retries.
 */
private List<String> doGetTableNames(String dbName, PatternMatcher matcher,
    User user, Set<TImpalaTableType> tableTypes)
    throws ImpalaException {
  FeCatalog catalog = getCatalog();
  List<String> tblNames = catalog.getTableNames(dbName, matcher, tableTypes);

  // Per-table checks are only needed when authorization is on and the user lacks a
  // privilege covering the whole database.
  boolean needsAuthChecks = authzFactory_.getAuthorizationConfig().isEnabled()
      && !userHasAccessForWholeDb(user, dbName);

  if (needsAuthChecks) {
    List<Future<Boolean>> pendingCheckTasks = Lists.newArrayList();
    Iterator<String> iter = tblNames.iterator();
    while (iter.hasNext()) {
      String tblName = iter.next();
      // Get the owner information. Do not force load the table, only get it
      // from cache, if it is already loaded. This means that we cannot access
      // ownership information for unloaded tables and they will not be listed
      // here. This might result in situations like 'show tables' not listing
      // 'owned' tables for a given user just because the metadata is not loaded.
      // TODO(IMPALA-8937): Figure out a way to load Table/Database ownership
      // information when fetching the table lists from HMS.
      // NOTE(review): 'table' is dereferenced unconditionally below — assumes
      // getTableIfCached() never returns null for a name that was just listed;
      // confirm this holds if the table is dropped concurrently.
      FeTable table = catalog.getTableIfCached(dbName, tblName);
      String tableOwner = table.getOwnerUser();
      if (tableOwner == null) {
        // Despite the log wording, the name is still submitted for an auth check
        // below (with a null owner); it is only dropped if that check denies access.
        LOG.info("Table {} not yet loaded, ignoring it in table listing.",
            dbName + "." + tblName);
      }
      // Checks run in parallel; results are joined in submission order below.
      pendingCheckTasks.add(checkAuthorizationPool_.submit(
          new CheckAuthorization(dbName, tblName, tableOwner, user)));
    }

    filterUnaccessibleElements(pendingCheckTasks, tblNames);
  }

  return tblNames;
}
| |
| /** |
| * Returns a list of columns of a table using 'matcher' and are accessible |
| * to the given user. |
| */ |
| public List<Column> getColumns(FeTable table, PatternMatcher matcher, |
| User user) throws InternalException { |
| Preconditions.checkNotNull(table); |
| Preconditions.checkNotNull(matcher); |
| List<Column> columns = Lists.newArrayList(); |
| for (Column column: table.getColumnsInHiveOrder()) { |
| String colName = column.getName(); |
| if (!matcher.matches(colName)) continue; |
| if (authzFactory_.getAuthorizationConfig().isEnabled()) { |
| PrivilegeRequest privilegeRequest = new PrivilegeRequestBuilder( |
| authzFactory_.getAuthorizableFactory()) |
| .any().onColumn(table.getTableName().getDb(), table.getTableName().getTbl(), |
| colName, table.getOwnerUser()).build(); |
| if (!authzChecker_.get().hasAccess(user, privilegeRequest)) continue; |
| } |
| columns.add(column); |
| } |
| return columns; |
| } |
| |
| /** |
| * Returns a list of primary keys for a given table only if the user has access to all |
| * the columns that form the primary key. This is because all SQLPrimaryKeys for a |
| * given table together form the primary key. |
| */ |
| public List<SQLPrimaryKey> getPrimaryKeys(FeTable table, User user) |
| throws InternalException { |
| Preconditions.checkNotNull(table); |
| List<SQLPrimaryKey> pkList; |
| pkList = table.getSqlConstraints().getPrimaryKeys(); |
| Preconditions.checkNotNull(pkList); |
| for (SQLPrimaryKey pk : pkList) { |
| if (authzFactory_.getAuthorizationConfig().isEnabled()) { |
| PrivilegeRequest privilegeRequest = new PrivilegeRequestBuilder( |
| authzFactory_.getAuthorizableFactory()) |
| .any().onColumn(table.getTableName().getDb(), table.getTableName().getTbl(), |
| pk.getColumn_name(), table.getOwnerUser()).build(); |
| // If any of the pk columns is not accessible to the user, we return an empty |
| // list. |
| if (!authzChecker_.get().hasAccess(user, privilegeRequest)) { |
| return new ArrayList<>(); |
| } |
| } |
| } |
| return pkList; |
| } |
| |
| /** |
| * Returns a list of foreign keys for a given table only if both the primary key |
| * column and the foreign key columns are accessible to user. |
| */ |
| public List<SQLForeignKey> getForeignKeys(FeTable table, User user) |
| throws InternalException { |
| Preconditions.checkNotNull(table); |
| // Consider an example: |
| // A child table has the following foreign keys. |
| // 1) A composite foreign key (col1, col2) referencing parent_table_1 columns (a, b). |
| // 2) A foreign key (col3) referencing a different parent_table_2 column (c). |
| // In the above scenario, three "SQLForeignKey" structures are stored in HMS. Two |
| // SQLForiegnKey for 1) above which share the same FkName and will have key_seq 1 |
| // and 2 respectively and one for 2) above. In other words, within a foreign key |
| // definition, we will have one "SQLForeignKey" structure for each column in the |
| // definition. They share fkName but will have different key_seq numbers. For the |
| // purpose of authorization, we do not want to show only a part of a sequence of |
| // keys. So if any of the keys in a sequence has incorrect privileges, we omit the |
| // entire sequence. For instance, in a request for all the foreign keys on |
| // child_table above, if we discover that the user does not have privilege on col1 |
| // in the child_table, we omit both the "SQLForeignKey" associated with col1 and col2 |
| // but we return the "SQLFOreignKey" for col3. |
| Set<String> omitList = new HashSet<>(); |
| List<SQLForeignKey> fkList = new ArrayList<>(); |
| List<SQLForeignKey> foreignKeys = table.getSqlConstraints().getForeignKeys(); |
| Preconditions.checkNotNull(foreignKeys); |
| for (SQLForeignKey fk : foreignKeys) { |
| String fkName = fk.getFk_name(); |
| if (!omitList.contains(fkName)) { |
| if (authzFactory_.getAuthorizationConfig().isEnabled()) { |
| PrivilegeRequest fkPrivilegeRequest = new PrivilegeRequestBuilder( |
| authzFactory_.getAuthorizableFactory()) |
| .any() |
| .onColumn(table.getTableName().getDb(), table.getTableName().getTbl(), |
| fk.getFkcolumn_name(), table.getOwnerUser()).build(); |
| |
| // Build privilege request for PK table. |
| FeTable pkTable = |
| getCatalog().getTableNoThrow(fk.getPktable_db(), fk.getPktable_name()); |
| PrivilegeRequest pkPrivilegeRequest = new PrivilegeRequestBuilder( |
| authzFactory_.getAuthorizableFactory()) |
| .any().onColumn(pkTable.getTableName().getDb(), |
| pkTable.getTableName().getTbl(), fk.getPkcolumn_name(), |
| pkTable.getOwnerUser()).build(); |
| if (!authzChecker_.get().hasAccess(user, fkPrivilegeRequest) || |
| !authzChecker_.get().hasAccess(user, pkPrivilegeRequest)) { |
| omitList.add(fkName); |
| } |
| } |
| } |
| } |
| for (SQLForeignKey fk : foreignKeys) { |
| if (!omitList.contains(fk.getFk_name())) { |
| fkList.add(fk); |
| } |
| } |
| return fkList; |
| } |
| |
| /** |
| * Returns all databases in catalog cache that match the pattern of 'matcher' and are |
| * accessible to 'user'. |
| */ |
| public List<? extends FeDb> getDbs(PatternMatcher matcher, User user) |
| throws InternalException { |
| List<? extends FeDb> dbs = getCatalog().getDbs(matcher); |
| |
| boolean needsAuthChecks = authzFactory_.getAuthorizationConfig().isEnabled() |
| && !userHasAccessForWholeServer(user); |
| |
| // Filter out the databases the user does not have permissions on. |
| if (needsAuthChecks) { |
| Iterator<? extends FeDb> iter = dbs.iterator(); |
| List<Future<Boolean>> pendingCheckTasks = Lists.newArrayList(); |
| while (iter.hasNext()) { |
| FeDb db = iter.next(); |
| pendingCheckTasks.add(checkAuthorizationPool_.submit( |
| new CheckAuthorization(db.getName(), null, db.getOwnerUser(), user))); |
| } |
| |
| filterUnaccessibleElements(pendingCheckTasks, dbs); |
| } |
| |
| return dbs; |
| } |
| |
| /** |
| * Handles DESCRIBE HISTORY queries. |
| */ |
| public TGetTableHistoryResult getTableHistory(TDescribeHistoryParams params) |
| throws DatabaseNotFoundException, TableLoadingException { |
| FeTable feTable = getCatalog().getTable(params.getTable_name().db_name, |
| params.getTable_name().table_name); |
| FeIcebergTable feIcebergTable = (FeIcebergTable) feTable; |
| Table table = feIcebergTable.getIcebergApiTable(); |
| Set<Long> ancestorIds = Sets.newHashSet(IcebergUtil.currentAncestorIds(table)); |
| TGetTableHistoryResult historyResult = new TGetTableHistoryResult(); |
| |
| List<HistoryEntry> filteredHistoryEntries = table.history(); |
| if (params.isSetFrom_time()) { |
| // DESCRIBE HISTORY <table> FROM <ts> |
| filteredHistoryEntries = table.history().stream() |
| .filter(c -> c.timestampMillis() >= params.from_time) |
| .collect(Collectors.toList()); |
| } else if (params.isSetBetween_start_time() && params.isSetBetween_end_time()) { |
| // DESCRIBE HISTORY <table> BETWEEN <ts> AND <ts> |
| filteredHistoryEntries = table.history().stream() |
| .filter(x -> x.timestampMillis() >= params.between_start_time && |
| x.timestampMillis() <= params.between_end_time) |
| .collect(Collectors.toList()); |
| } |
| |
| List<TGetTableHistoryResultItem> result = Lists.newArrayList(); |
| for (HistoryEntry historyEntry : filteredHistoryEntries) { |
| TGetTableHistoryResultItem resultItem = new TGetTableHistoryResultItem(); |
| long snapshotId = historyEntry.snapshotId(); |
| resultItem.setCreation_time(historyEntry.timestampMillis()); |
| resultItem.setSnapshot_id(snapshotId); |
| Snapshot snapshot = table.snapshot(snapshotId); |
| if (snapshot != null && snapshot.parentId() != null) { |
| resultItem.setParent_id(snapshot.parentId()); |
| } |
| resultItem.setIs_current_ancestor(ancestorIds.contains(snapshotId)); |
| result.add(resultItem); |
| } |
| historyResult.setResult(result); |
| return historyResult; |
| } |
| |
| /** |
| * Check whether table/database is accessible to given user. |
| */ |
| private boolean isAccessibleToUser(String dbName, String tblName, |
| String owner, User user) throws InternalException { |
| Preconditions.checkNotNull(dbName); |
| if (tblName == null && |
| dbName.toLowerCase().equals(Catalog.DEFAULT_DB.toLowerCase())) { |
| // Default DB should always be shown. |
| return true; |
| } |
| |
| PrivilegeRequestBuilder builder = new PrivilegeRequestBuilder( |
| authzFactory_.getAuthorizableFactory()) |
| .anyOf(minPrivilegeSetForShowStmts_); |
| if (tblName == null) { |
| // Check database |
| builder = builder.onAnyColumn(dbName, owner); |
| } else { |
| // Check table |
| builder = builder.onAnyColumn(dbName, tblName, owner); |
| } |
| |
| return authzChecker_.get().hasAnyAccess(user, builder.buildSet()); |
| } |
| |
| /** |
| * Check whether the whole server is accessible to given user. |
| */ |
| private boolean userHasAccessForWholeServer(User user) |
| throws InternalException { |
| if (authEngineSupportsDenyRules()) return false; |
| PrivilegeRequestBuilder builder = new PrivilegeRequestBuilder( |
| authzFactory_.getAuthorizableFactory()).anyOf(minPrivilegeSetForShowStmts_) |
| .onServer(authzFactory_.getAuthorizationConfig().getServerName()); |
| return authzChecker_.get().hasAnyAccess(user, builder.buildSet()); |
| } |
| |
| /** |
| * Check whether the whole database is accessible to given user. |
| */ |
| private boolean userHasAccessForWholeDb(User user, String dbName) |
| throws InternalException, DatabaseNotFoundException { |
| if (authEngineSupportsDenyRules()) return false; |
| // FeDb is needed to respect ownership in Ranger, dbName would be enough for |
| // the privilege request otherwise. |
| FeDb db = getCatalog().getDb(dbName); |
| if (db == null) { |
| throw new DatabaseNotFoundException("Database '" + dbName + "' not found"); |
| } |
| PrivilegeRequestBuilder builder = new PrivilegeRequestBuilder( |
| authzFactory_.getAuthorizableFactory()).anyOf(minPrivilegeSetForShowStmts_) |
| .onDb(db); |
| return authzChecker_.get().hasAnyAccess(user, builder.buildSet()); |
| } |
| |
| /** |
| * Returns whether the authorization engine supports deny rules. If it does, |
| * then a privilege on a higher level object does not imply privilege on lower |
| * level objects in the hierarchy. |
| */ |
| private boolean authEngineSupportsDenyRules() { |
| // Sentry did not support deny rules, but Ranger does. So, this now returns true. |
| // TODO: could check config for Ranger and return true if deny rules are disabled |
| return true; |
| } |
| |
| /** |
| * Returns all data sources that match the pattern. If pattern is null, |
| * matches all data sources. |
| */ |
| public List<? extends FeDataSource> getDataSrcs(String pattern) { |
| // TODO: handle InconsistentMetadataException for data sources. |
| return getCatalog().getDataSources( |
| PatternMatcher.createHivePatternMatcher(pattern)); |
| } |
| |
| /** |
| * Generate result set and schema for a SHOW COLUMN STATS command. |
| */ |
| public TResultSet getColumnStats(String dbName, String tableName, boolean showMinMax) |
| throws ImpalaException { |
| RetryTracker retries = new RetryTracker( |
| String.format("fetching column stats from %s.%s", dbName, tableName)); |
| while (true) { |
| try { |
| return doGetColumnStats(dbName, tableName, showMinMax); |
| } catch(InconsistentMetadataFetchException e) { |
| retries.handleRetryOrThrow(e); |
| } |
| } |
| } |
| |
| private TResultSet doGetColumnStats(String dbName, String tableName, boolean showMinMax) |
| throws ImpalaException { |
| FeTable table = getCatalog().getTable(dbName, tableName); |
| TResultSet result = new TResultSet(); |
| TResultSetMetadata resultSchema = new TResultSetMetadata(); |
| result.setSchema(resultSchema); |
| resultSchema.addToColumns(new TColumn("Column", Type.STRING.toThrift())); |
| resultSchema.addToColumns(new TColumn("Type", Type.STRING.toThrift())); |
| resultSchema.addToColumns( |
| new TColumn("#Distinct Values", Type.BIGINT.toThrift())); |
| resultSchema.addToColumns(new TColumn("#Nulls", Type.BIGINT.toThrift())); |
| resultSchema.addToColumns(new TColumn("Max Size", Type.BIGINT.toThrift())); |
| resultSchema.addToColumns(new TColumn("Avg Size", Type.DOUBLE.toThrift())); |
| resultSchema.addToColumns(new TColumn("#Trues", Type.BIGINT.toThrift())); |
| resultSchema.addToColumns(new TColumn("#Falses", Type.BIGINT.toThrift())); |
| if (showMinMax) { |
| resultSchema.addToColumns(new TColumn("Min", Type.STRING.toThrift())); |
| resultSchema.addToColumns(new TColumn("Max", Type.STRING.toThrift())); |
| } |
| |
| for (Column c: table.getColumnsInHiveOrder()) { |
| TResultRowBuilder rowBuilder = new TResultRowBuilder(); |
| // Add name, type, NDVs, numNulls, max size, avg size, and conditionally |
| // the min value and max value. |
| if (showMinMax) { |
| rowBuilder.add(c.getName()) |
| .add(c.getType().toSql()) |
| .add(c.getStats().getNumDistinctValues()) |
| .add(c.getStats().getNumNulls()) |
| .add(c.getStats().getMaxSize()) |
| .add(c.getStats().getAvgSize()) |
| .add(c.getStats().getNumTrues()) |
| .add(c.getStats().getNumFalses()) |
| .add(c.getStats().getLowValueAsString()) |
| .add(c.getStats().getHighValueAsString()); |
| } else { |
| rowBuilder.add(c.getName()) |
| .add(c.getType().toSql()) |
| .add(c.getStats().getNumDistinctValues()) |
| .add(c.getStats().getNumNulls()) |
| .add(c.getStats().getMaxSize()) |
| .add(c.getStats().getAvgSize()) |
| .add(c.getStats().getNumTrues()) |
| .add(c.getStats().getNumFalses()); |
| } |
| result.addToRows(rowBuilder.get()); |
| } |
| return result; |
| } |
| |
| /** |
| * Generate result set and schema for a SHOW TABLE STATS command. |
| */ |
| public TResultSet getTableStats(String dbName, String tableName, TShowStatsOp op) |
| throws ImpalaException { |
| RetryTracker retries = new RetryTracker( |
| String.format("fetching table stats from %s.%s", dbName, tableName)); |
| while (true) { |
| try { |
| return doGetTableStats(dbName, tableName, op); |
| } catch(InconsistentMetadataFetchException e) { |
| retries.handleRetryOrThrow(e); |
| } |
| } |
| } |
| |
| private TResultSet doGetTableStats(String dbName, String tableName, TShowStatsOp op) |
| throws ImpalaException { |
| FeTable table = getCatalog().getTable(dbName, tableName); |
| if (table instanceof FeFsTable) { |
| if (table instanceof FeIcebergTable && op == TShowStatsOp.PARTITIONS) { |
| return FeIcebergTable.Utils.getPartitionStats((FeIcebergTable) table); |
| } |
| if (table instanceof FeIcebergTable && op == TShowStatsOp.TABLE_STATS) { |
| return FeIcebergTable.Utils.getTableStats((FeIcebergTable) table); |
| } |
| return ((FeFsTable) table).getTableStats(); |
| } else if (table instanceof FeHBaseTable) { |
| return ((FeHBaseTable) table).getTableStats(); |
| } else if (table instanceof FeDataSourceTable) { |
| return ((FeDataSourceTable) table).getTableStats(); |
| } else if (table instanceof FeKuduTable) { |
| if (op == TShowStatsOp.RANGE_PARTITIONS) { |
| return FeKuduTable.Utils.getRangePartitions((FeKuduTable) table, false); |
| } else if (op == TShowStatsOp.HASH_SCHEMA) { |
| return FeKuduTable.Utils.getRangePartitions((FeKuduTable) table, true); |
| } else if (op == TShowStatsOp.PARTITIONS) { |
| return FeKuduTable.Utils.getPartitions((FeKuduTable) table); |
| } else { |
| Preconditions.checkState(op == TShowStatsOp.TABLE_STATS); |
| return FeKuduTable.Utils.getTableStats((FeKuduTable) table); |
| } |
| } else if (table instanceof MaterializedViewHdfsTable) { |
| return ((MaterializedViewHdfsTable) table).getTableStats(); |
| } else { |
| throw new InternalException("Invalid table class: " + table.getClass()); |
| } |
| } |
| |
| /** |
| * Returns all function signatures that match the pattern. If pattern is null, |
| * matches all functions. If exactMatch is true, treats fnPattern as a function |
| * name instead of pattern and returns exact match only. |
| */ |
| public List<Function> getFunctions(TFunctionCategory category, |
| String dbName, String fnPattern, boolean exactMatch) |
| throws DatabaseNotFoundException { |
| RetryTracker retries = new RetryTracker( |
| String.format("fetching functions from %s", dbName)); |
| while (true) { |
| try { |
| return doGetFunctions(category, dbName, fnPattern, exactMatch); |
| } catch(InconsistentMetadataFetchException e) { |
| retries.handleRetryOrThrow(e); |
| } |
| } |
| } |
| |
| private List<Function> doGetFunctions(TFunctionCategory category, |
| String dbName, String fnPattern, boolean exactMatch) |
| throws DatabaseNotFoundException { |
| FeDb db = getCatalog().getDb(dbName); |
| if (db == null) { |
| throw new DatabaseNotFoundException("Database '" + dbName + "' not found"); |
| } |
| List<Function> fns; |
| if (exactMatch) { |
| Preconditions.checkNotNull(fnPattern, "Invalid function name"); |
| fns = db.getFunctions(category, fnPattern); |
| } else { |
| fns = db.getFunctions( |
| category, PatternMatcher.createHivePatternMatcher(fnPattern)); |
| } |
| Collections.sort(fns, |
| new Comparator<Function>() { |
| @Override |
| public int compare(Function f1, Function f2) { |
| return f1.signatureString().compareTo(f2.signatureString()); |
| } |
| }); |
| return fns; |
| } |
| |
| /** |
| * Returns database metadata, in the specified database. Throws an exception if db is |
| * not found or if there is an error loading the db metadata. |
| */ |
| public TDescribeResult describeDb(String dbName, TDescribeOutputStyle outputStyle) |
| throws ImpalaException { |
| FeDb db = getCatalog().getDb(dbName); |
| return DescribeResultFactory.buildDescribeDbResult(db, outputStyle); |
| } |
| |
| /** |
| * Returns table metadata, such as the column descriptors, in the specified table. |
| * Throws an exception if the table or db is not found or if there is an error loading |
| * the table metadata. |
| */ |
| public TDescribeResult describeTable(TDescribeTableParams params, User user) |
| throws ImpalaException { |
| if (params.isSetTable_name()) { |
| RetryTracker retries = new RetryTracker( |
| String.format("fetching table %s.%s", params.table_name.db_name, |
| params.table_name.table_name)); |
| while (true) { |
| try { |
| return doDescribeTable(params.table_name, params.output_style, user, |
| params.metadata_table_name); |
| } catch(InconsistentMetadataFetchException e) { |
| retries.handleRetryOrThrow(e); |
| } |
| } |
| } else { |
| Preconditions.checkState(params.output_style == TDescribeOutputStyle.MINIMAL); |
| Preconditions.checkNotNull(params.result_struct); |
| StructType structType = (StructType)Type.fromThrift(params.result_struct); |
| return DescribeResultFactory.buildDescribeMinimalResult(structType); |
| } |
| } |
| |
| /** |
| * Filters out columns that the user is not authorized to see. |
| */ |
| private List<Column> filterAuthorizedColumnsForDescribeTable(FeTable table, User user) |
| throws InternalException { |
| List<Column> authFilteredColumns; |
| if (authzFactory_.getAuthorizationConfig().isEnabled()) { |
| // First run a table check |
| PrivilegeRequest privilegeRequest = new PrivilegeRequestBuilder( |
| authzFactory_.getAuthorizableFactory()) |
| .allOf(Privilege.VIEW_METADATA).onTable(table).build(); |
| if (!authzChecker_.get().hasAccess(user, privilegeRequest)) { |
| // Filter out columns that the user is not authorized to see. |
| authFilteredColumns = new ArrayList<Column>(); |
| for (Column col: table.getColumnsInHiveOrder()) { |
| String colName = col.getName(); |
| privilegeRequest = new PrivilegeRequestBuilder( |
| authzFactory_.getAuthorizableFactory()) |
| .allOf(Privilege.VIEW_METADATA) |
| .onColumn(table.getDb().getName(), |
| table.getName(), colName, table.getOwnerUser()).build(); |
| if (authzChecker_.get().hasAccess(user, privilegeRequest)) { |
| authFilteredColumns.add(col); |
| } |
| } |
| } else { |
| // User has table-level access |
| authFilteredColumns = table.getColumnsInHiveOrder(); |
| } |
| } else { |
| // Authorization is disabled |
| authFilteredColumns = table.getColumnsInHiveOrder(); |
| } |
| return authFilteredColumns; |
| } |
| |
| private TDescribeResult doDescribeTable(TTableName tableName, |
| TDescribeOutputStyle outputStyle, User user, String vTableName) |
| throws ImpalaException { |
| FeTable table = getCatalog().getTable(tableName.db_name, |
| tableName.table_name); |
| List<Column> filteredColumns = filterAuthorizedColumnsForDescribeTable(table, user); |
| if (outputStyle == TDescribeOutputStyle.MINIMAL) { |
| if (table instanceof FeKuduTable) { |
| return DescribeResultFactory.buildKuduDescribeMinimalResult(filteredColumns); |
| } else if (table instanceof FeIcebergTable) { |
| if (vTableName == null) { |
| return DescribeResultFactory.buildIcebergDescribeMinimalResult(filteredColumns); |
| } else { |
| Preconditions.checkArgument(vTableName != null); |
| return DescribeResultFactory.buildIcebergMetadataDescribeMinimalResult(table, |
| vTableName); |
| } |
| } else { |
| return DescribeResultFactory.buildDescribeMinimalResult( |
| Column.columnsToStruct(filteredColumns)); |
| } |
| } else { |
| Preconditions.checkArgument(outputStyle == TDescribeOutputStyle.FORMATTED || |
| outputStyle == TDescribeOutputStyle.EXTENDED); |
| Preconditions.checkArgument(vTableName == null); |
| TDescribeResult result = DescribeResultFactory.buildDescribeFormattedResult(table, |
| filteredColumns); |
| // Filter out LOCATION text |
| if (authzFactory_.getAuthorizationConfig().isEnabled()) { |
| PrivilegeRequest privilegeRequest = new PrivilegeRequestBuilder( |
| authzFactory_.getAuthorizableFactory()) |
| .allOf(Privilege.VIEW_METADATA).onTable(table).build(); |
| // Only filter if the user doesn't have table access. |
| if (!authzChecker_.get().hasAccess(user, privilegeRequest)) { |
| List<TResultRow> results = new ArrayList<>(); |
| for(TResultRow row: result.getResults()) { |
| String stringVal = row.getColVals().get(0).getString_val(); |
| if (!stringVal.contains("Location")) { |
| results.add(row); |
| } |
| } |
| result.setResults(results); |
| } |
| } |
| return result; |
| } |
| } |
| |
| /** |
| * Waits indefinitely for the local catalog to be ready. The catalog is "ready" after |
| * the first catalog update with a version > INITIAL_CATALOG_VERSION is received from |
| * the statestore. |
| * |
| * @see ImpaladCatalog#isReady() |
| */ |
| public void waitForCatalog() { |
| LOG.info("Waiting for first catalog update from the statestore."); |
| int numTries = 0; |
| long startTimeMs = System.currentTimeMillis(); |
| while (true) { |
| if (getCatalog().isReady()) { |
| LOG.info("Local catalog initialized after: " + |
| (System.currentTimeMillis() - startTimeMs) + " ms."); |
| return; |
| } |
| LOG.info("Waiting for local catalog to be initialized, attempt: " + numTries); |
| getCatalog().waitForCatalogUpdate(MAX_CATALOG_UPDATE_WAIT_TIME_MS); |
| ++numTries; |
| } |
| } |
| |
| /** |
| * Return a TPlanExecInfo corresponding to the plan with root fragment 'planRoot'. |
| */ |
| public static TPlanExecInfo createPlanExecInfo(PlanFragment planRoot, |
| TQueryCtx queryCtx) { |
| TPlanExecInfo result = new TPlanExecInfo(); |
| List<PlanFragment> fragments = planRoot.getFragmentsInPlanPreorder(); |
| |
| // collect ScanNodes |
| List<ScanNode> scanNodes = Lists.newArrayList(); |
| for (PlanFragment fragment : fragments) { |
| fragment.collectPlanNodes( |
| Predicates.instanceOf(ScanNode.class), scanNodes); |
| } |
| |
| // Set scan ranges/locations for scan nodes. |
| LOG.trace("get scan range locations"); |
| Set<TTableName> tablesMissingStats = Sets.newTreeSet(); |
| Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet(); |
| Set<TTableName> tablesWithMissingDiskIds = Sets.newTreeSet(); |
| for (ScanNode scanNode: scanNodes) { |
| result.putToPer_node_scan_ranges( |
| scanNode.getId().asInt(), scanNode.getScanRangeSpecs()); |
| |
| TTableName tableName = scanNode.getTupleDesc().getTableName().toThrift(); |
| if (scanNode.isTableMissingStats()) tablesMissingStats.add(tableName); |
| if (scanNode.hasCorruptTableStats()) tablesWithCorruptStats.add(tableName); |
| if (scanNode instanceof HdfsScanNode && |
| ((HdfsScanNode) scanNode).hasMissingDiskIds()) { |
| tablesWithMissingDiskIds.add(tableName); |
| } |
| } |
| |
| // Clear pre-existing lists to avoid adding duplicate entries in FE tests. |
| queryCtx.unsetTables_missing_stats(); |
| queryCtx.unsetTables_with_corrupt_stats(); |
| for (TTableName tableName: tablesMissingStats) { |
| queryCtx.addToTables_missing_stats(tableName); |
| } |
| for (TTableName tableName: tablesWithCorruptStats) { |
| queryCtx.addToTables_with_corrupt_stats(tableName); |
| } |
| for (TTableName tableName: tablesWithMissingDiskIds) { |
| queryCtx.addToTables_missing_diskids(tableName); |
| } |
| |
| // The fragment at this point has all state set, serialize it to thrift. |
| for (PlanFragment fragment: fragments) { |
| TPlanFragment thriftFragment = fragment.toThrift(); |
| result.addToFragments(thriftFragment); |
| } |
| |
| return result; |
| } |
| |
| /** |
| * Create a populated TQueryExecRequest, corresponding to the supplied planner. |
| */ |
| private TQueryExecRequest createExecRequest( |
| Planner planner, PlanCtx planCtx) throws ImpalaException { |
| TQueryCtx queryCtx = planner.getQueryCtx(); |
| List<PlanFragment> planRoots = planner.createPlans(); |
| if (planCtx.planCaptureRequested()) { |
| planCtx.plan_ = planRoots; |
| } |
| |
| // Compute resource requirements of the final plans. |
| TQueryExecRequest result = new TQueryExecRequest(); |
| Planner.reduceCardinalityByRuntimeFilter(planRoots, planner.getPlannerCtx()); |
| Planner.computeProcessingCost(planRoots, result, planner.getPlannerCtx()); |
| Planner.computeResourceReqs(planRoots, queryCtx, result, |
| planner.getPlannerCtx(), planner.getAnalysisResult().isQueryStmt()); |
| |
| // create per-plan exec info; |
| // also assemble list of names of tables with missing or corrupt stats for |
| // assembling a warning message |
| for (PlanFragment planRoot: planRoots) { |
| result.addToPlan_exec_info( |
| createPlanExecInfo(planRoot, queryCtx)); |
| } |
| |
| // Optionally disable spilling in the backend. Allow spilling if there are plan hints |
| // or if all tables have stats. |
| boolean disableSpilling = |
| queryCtx.client_request.query_options.isDisable_unsafe_spills() |
| && queryCtx.isSetTables_missing_stats() |
| && !queryCtx.tables_missing_stats.isEmpty() |
| && !planner.getAnalysisResult().getAnalyzer().hasPlanHints(); |
| queryCtx.setDisable_spilling(disableSpilling); |
| |
| // assign fragment idx |
| int idx = 0; |
| for (TPlanExecInfo planExecInfo: result.plan_exec_info) { |
| for (TPlanFragment fragment: planExecInfo.fragments) fragment.setIdx(idx++); |
| } |
| |
| // create EXPLAIN output after setting everything else |
| result.setQuery_ctx(queryCtx); // needed by getExplainString() |
| List<PlanFragment> allFragments = planRoots.get(0).getNodesPreOrder(); |
| planCtx.explainBuf_.append(planner.getExplainString(allFragments, result)); |
| result.setQuery_plan(planCtx.getExplainString()); |
| |
| // copy estimated memory per host to planCtx for auto-scaling. |
| planCtx.compilationState_.setEstimatedMemoryPerHost( |
| result.getPer_host_mem_estimate()); |
| |
| planCtx.compilationState_.setCoresRequired(result.getCores_required()); |
| |
| return result; |
| } |
| |
| /** |
| * Create a TExecRequest for the query and query context provided in the plan |
| * context. Fills in the EXPLAIN string and optionally the internal plan tree. |
| */ |
| public TExecRequest createExecRequest(PlanCtx planCtx) |
| throws ImpalaException { |
| // Timeline of important events in the planning process, used for debugging |
| // and profiling. |
| try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope()) { |
| EventSequence timeline = new EventSequence("Query Compilation"); |
| TExecRequest result = getTExecRequest(planCtx, timeline); |
| timeline.markEvent("Planning finished"); |
| result.setTimeline(timeline.toThrift()); |
| result.setProfile(FrontendProfile.getCurrent().emitAsThrift()); |
| result.setProfile_children(FrontendProfile.getCurrent().emitChildrenAsThrift()); |
| return result; |
| } |
| } |
| |
| /** |
| * Marks 'timeline' with the number of query planning retries that were needed. |
| * Includes a 'msg' that explains the cause of retries. If there were no retries, then |
| * 'timeline' is not written. |
| */ |
| private void markTimelineRetries(int numRetries, String msg, EventSequence timeline) { |
| if (numRetries == 0) return; |
| timeline.markEvent( |
| String.format("Retried query planning due to inconsistent metadata " |
| + "%s of %s times: ",numRetries, INCONSISTENT_METADATA_NUM_RETRIES) + msg); |
| } |
| |
| /** |
| * Examines the input 'executorGroupSets', removes non-default and useless group sets |
| * from it and fills the max_mem_limit field. A group set is considered useless if its |
| * name is not a suffix of 'request_pool'. The max_mem_limit is set to the |
| * max_query_mem_limit from the pool service for the surviving group sets. |
| * |
| * Also imposes the artificial two-executor groups for testing when needed. |
| */ |
| public static List<TExecutorGroupSet> setupThresholdsForExecutorGroupSets( |
| List<TExecutorGroupSet> executorGroupSets, String request_pool, |
| boolean default_executor_group, boolean test_replan) throws ImpalaException { |
| RequestPoolService poolService = RequestPoolService.getInstance(); |
| |
| List<TExecutorGroupSet> result = Lists.newArrayList(); |
| if (default_executor_group) { |
| TExecutorGroupSet e = executorGroupSets.get(0); |
| if (e.getCurr_num_executors() == 0) { |
| // The default group has 0 executors. Return one with the number of default |
| // executors as the number of expected executors. |
| result.add(new TExecutorGroupSet(e)); |
| result.get(0).setCurr_num_executors(e.getExpected_num_executors()); |
| result.get(0).setMax_mem_limit(Long.MAX_VALUE); |
| result.get(0).setNum_cores_per_executor(Integer.MAX_VALUE); |
| } else if (test_replan) { |
| ExecutorMembershipSnapshot cluster = ExecutorMembershipSnapshot.getCluster(); |
| int num_nodes = cluster.numExecutors(); |
| Preconditions.checkState(e.getCurr_num_executors() == num_nodes); |
| // Form a two-executor group testing environment so that we can exercise |
| // auto-scaling logic (see getTExecRequest()). |
| TExecutorGroupSet s = new TExecutorGroupSet(e); |
| s.setExec_group_name_prefix("small"); |
| s.setMax_mem_limit(64*MEGABYTE); |
| s.setNum_cores_per_executor(8); |
| result.add(s); |
| TExecutorGroupSet l = new TExecutorGroupSet(e); |
| String newName = "large"; |
| if (e.isSetExec_group_name_prefix()) { |
| String currentName = e.getExec_group_name_prefix(); |
| if (currentName.length() > 0) newName = currentName; |
| } |
| l.setExec_group_name_prefix(newName); |
| l.setMax_mem_limit(Long.MAX_VALUE); |
| l.setNum_cores_per_executor(Integer.MAX_VALUE); |
| result.add(l); |
| } else { |
| // Copy and augment the group with the maximally allowed max_mem_limit value. |
| result.add(new TExecutorGroupSet(e)); |
| result.get(0).setMax_mem_limit(Long.MAX_VALUE); |
| result.get(0).setNum_cores_per_executor(Integer.MAX_VALUE); |
| } |
| return result; |
| } |
| |
| // If there are no executor groups in the cluster, Create a group set with 1 executor. |
| if (executorGroupSets.size() == 0) { |
| result.add(new TExecutorGroupSet(1, 20, DEFAULT_POOL_NAME)); |
| result.get(0).setMax_mem_limit(Long.MAX_VALUE); |
| result.get(0).setNum_cores_per_executor(Integer.MAX_VALUE); |
| return result; |
| } |
| |
| // Executor groups exist in the cluster. Identify those that can be used. |
| for (TExecutorGroupSet e : executorGroupSets) { |
| // If defined, request_pool can be a suffix of the group name prefix. For example |
| // group_set_prefix = root.queue1 |
| // request_pool = queue1 |
| if (StringUtils.isNotEmpty(request_pool) |
| && !e.getExec_group_name_prefix().endsWith(request_pool)) { |
| continue; |
| } |
| TExecutorGroupSet new_entry = new TExecutorGroupSet(e); |
| if (poolService != null) { |
| // Find out the max_mem_limit from the pool service. Set to max_mem_limit when it |
| // is greater than 0, otherwise set to max possible threshold value. |
| TPoolConfig poolConfig = |
| poolService.getPoolConfig(e.getExec_group_name_prefix()); |
| Preconditions.checkNotNull(poolConfig); |
| new_entry.setMax_mem_limit(poolConfig.getMax_query_mem_limit() > 0 ? |
| poolConfig.getMax_query_mem_limit() : Long.MAX_VALUE); |
| new_entry.setNum_cores_per_executor( |
| poolConfig.getMax_query_cpu_core_per_node_limit() > 0 ? |
| (int)poolConfig.getMax_query_cpu_core_per_node_limit() : Integer.MAX_VALUE); |
| } else { |
| // Set to max possible threshold value when there is no pool service |
| new_entry.setMax_mem_limit(Long.MAX_VALUE); |
| new_entry.setNum_cores_per_executor(Integer.MAX_VALUE); |
| } |
| result.add(new_entry); |
| } |
| if (executorGroupSets.size() > 0 && result.size() == 0 |
| && StringUtils.isNotEmpty(request_pool)) { |
| throw new AnalysisException("Request pool: " + request_pool |
| + " does not map to any known executor group set."); |
| } |
| |
| // Sort 'executorGroupSets' by |
| // <max_mem_limit, expected_num_executors * num_cores_per_executor> |
| // in ascending order. Use exec_group_name_prefix to break the tie. |
| Collections.sort(result, new Comparator<TExecutorGroupSet>() { |
| @Override |
| public int compare(TExecutorGroupSet e1, TExecutorGroupSet e2) { |
| int i = Long.compare(e1.getMax_mem_limit(), e2.getMax_mem_limit()); |
| if (i == 0) { |
| i = Long.compare(expectedTotalCores(e1), expectedTotalCores(e2)); |
| if (i == 0) { |
| i = e1.getExec_group_name_prefix().compareTo(e2.getExec_group_name_prefix()); |
| } |
| } |
| return i; |
| } |
| }); |
| return result; |
| } |
| |
| // Only the following types of statements are considered auto scalable since each |
| // can be planned by the distributed planner utilizing the number of executors in |
| // an executor group as input. |
| public static boolean canStmtBeAutoScaled(TExecRequest req) { |
| return req.stmt_type == TStmtType.EXPLAIN || req.stmt_type == TStmtType.QUERY |
| || req.stmt_type == TStmtType.DML |
| || (req.stmt_type == TStmtType.DDL && req.query_exec_request != null |
| && req.query_exec_request.stmt_type == TStmtType.DML /* CTAS */); |
| } |
| |
| private static int expectedNumExecutor(TExecutorGroupSet execGroupSet) { |
| return execGroupSet.getCurr_num_executors() > 0 ? |
| execGroupSet.getCurr_num_executors() : |
| execGroupSet.getExpected_num_executors(); |
| } |
| |
| private static int expectedTotalCores(TExecutorGroupSet execGroupSet) { |
| return IntMath.saturatedMultiply( |
| expectedNumExecutor(execGroupSet), execGroupSet.getNum_cores_per_executor()); |
| } |
| |
| private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline) |
| throws ImpalaException { |
| TQueryCtx queryCtx = planCtx.getQueryContext(); |
| LOG.info("Analyzing query: " + queryCtx.client_request.stmt + " db: " |
| + queryCtx.session.database); |
| |
| TQueryOptions queryOptions = queryCtx.client_request.getQuery_options(); |
| boolean enable_replan = queryOptions.isEnable_replan(); |
| final boolean clientSetRequestPool = queryOptions.isSetRequest_pool(); |
| Preconditions.checkState( |
| !clientSetRequestPool || !queryOptions.getRequest_pool().isEmpty()); |
| |
| List<TExecutorGroupSet> originalExecutorGroupSets = |
| ExecutorMembershipSnapshot.getAllExecutorGroupSets(); |
| |
| LOG.info("The original executor group sets from executor membership snapshot: " |
| + originalExecutorGroupSets); |
| |
| boolean default_executor_group = false; |
| if (originalExecutorGroupSets.size() == 1) { |
| TExecutorGroupSet e = originalExecutorGroupSets.get(0); |
| default_executor_group = e.getExec_group_name_prefix() == null |
| || e.getExec_group_name_prefix().isEmpty(); |
| } |
| List<TExecutorGroupSet> executorGroupSetsToUse = |
| Frontend.setupThresholdsForExecutorGroupSets(originalExecutorGroupSets, |
| queryOptions.getRequest_pool(), default_executor_group, |
| enable_replan |
| && (RuntimeEnv.INSTANCE.isTestEnv() || queryOptions.isTest_replan())); |
| |
| int num_executor_group_sets = executorGroupSetsToUse.size(); |
| if (num_executor_group_sets == 0) { |
| throw new AnalysisException( |
| "No suitable executor group sets can be identified and used."); |
| } |
| LOG.info("A total of {} executor group sets to be considered for auto-scaling: " |
| + executorGroupSetsToUse, num_executor_group_sets); |
| |
| TExecRequest req = null; |
| |
| // Capture the current state. |
| planCtx.compilationState_.captureState(); |
| boolean isComputeCost = queryOptions.isCompute_processing_cost(); |
| |
| double cpuCountDivisor = BackendConfig.INSTANCE.getQueryCpuCountDivisor(); |
| if (isComputeCost) { |
| if (queryOptions.isSetQuery_cpu_count_divisor()) { |
| cpuCountDivisor = queryOptions.getQuery_cpu_count_divisor(); |
| } |
| FrontendProfile.getCurrent().setToCounter(CPU_COUNT_DIVISOR, TUnit.DOUBLE_VALUE, |
| Double.doubleToLongBits(cpuCountDivisor)); |
| } |
| |
| TExecutorGroupSet group_set = null; |
| String reason = "Unknown"; |
| int attempt = 0; |
| int lastExecutorGroupTotalCores = |
| expectedTotalCores(executorGroupSetsToUse.get(num_executor_group_sets - 1)); |
| int i = 0; |
| while (i < num_executor_group_sets) { |
| group_set = executorGroupSetsToUse.get(i); |
| planCtx.compilationState_.setGroupSet(group_set); |
| if (isComputeCost) { |
| planCtx.compilationState_.setAvailableCoresPerNode( |
| Math.max(queryOptions.getProcessing_cost_min_threads(), |
| lastExecutorGroupTotalCores / expectedNumExecutor(group_set))); |
| } else { |
| planCtx.compilationState_.setAvailableCoresPerNode(queryOptions.getMt_dop()); |
| } |
| LOG.info("Consider executor group set: " + group_set + " with assumption of " |
| + planCtx.compilationState_.getAvailableCoresPerNode() + " cores per node."); |
| |
| String retryMsg = ""; |
| while (true) { |
| try { |
| req = doCreateExecRequest(planCtx, timeline); |
| markTimelineRetries(attempt, retryMsg, timeline); |
| break; |
| } catch (InconsistentMetadataFetchException e) { |
| if (attempt++ == INCONSISTENT_METADATA_NUM_RETRIES) { |
| markTimelineRetries(attempt, e.getMessage(), timeline); |
| throw e; |
| } |
| planCtx.compilationState_.disableStmtCacheAndReauthorize(); |
| if (attempt > 1) { |
| // Back-off a bit on later retries. |
| Uninterruptibles.sleepUninterruptibly(200 * attempt, TimeUnit.MILLISECONDS); |
| } |
| retryMsg = e.getMessage(); |
| LOG.warn("Retrying plan of query {}: {} (retry #{} of {})", |
| queryCtx.client_request.stmt, retryMsg, attempt, |
| INCONSISTENT_METADATA_NUM_RETRIES); |
| } |
| } |
| |
| // Counters about this group set. |
| int available_cores = expectedTotalCores(group_set); |
| String profileName = "Executor group " + (i + 1); |
| if (group_set.isSetExec_group_name_prefix() |
| && !group_set.getExec_group_name_prefix().isEmpty()) { |
| profileName += " (" + group_set.getExec_group_name_prefix() + ")"; |
| } |
| TRuntimeProfileNode groupSetProfile = createTRuntimeProfileNode(profileName); |
| addCounter(groupSetProfile, |
| new TCounter(MEMORY_MAX, TUnit.BYTES, |
| LongMath.saturatedMultiply( |
| expectedNumExecutor(group_set), group_set.getMax_mem_limit()))); |
| if (isComputeCost) { |
| addCounter(groupSetProfile, new TCounter(CPU_MAX, TUnit.UNIT, available_cores)); |
| } |
| |
| // If it is for a single node plan, enable_replan is disabled, or it is not a query |
| // that can be auto scaled, return the 1st plan generated. |
| boolean notScalable = false; |
| if (queryOptions.num_nodes == 1) { |
| reason = "the number of nodes is 1"; |
| notScalable = true; |
| } else if (!enable_replan) { |
| reason = "query option ENABLE_REPLAN=false"; |
| notScalable = true; |
| } else if (!Frontend.canStmtBeAutoScaled(req)) { |
| reason = "query is not auto-scalable"; |
| notScalable = true; |
| } |
| |
| if (notScalable) { |
| setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set); |
| addInfoString( |
| groupSetProfile, VERDICT, "Assign to first group because " + reason); |
| FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile); |
| break; |
| } |
| |
| // Find out the per host memory estimated from two possible sources. |
| long per_host_mem_estimate = -1; |
| int cores_requirement = -1; |
| if (req.query_exec_request != null) { |
| // For non-explain queries |
| per_host_mem_estimate = req.query_exec_request.per_host_mem_estimate; |
| cores_requirement = req.query_exec_request.getCores_required(); |
| } else { |
| // For explain queries |
| per_host_mem_estimate = planCtx.compilationState_.getEstimatedMemoryPerHost(); |
| cores_requirement = planCtx.compilationState_.getCoresRequired(); |
| } |
| |
| Preconditions.checkState(per_host_mem_estimate >= 0); |
| boolean memReqSatisfied = per_host_mem_estimate <= group_set.getMax_mem_limit(); |
| addCounter(groupSetProfile, |
| new TCounter(MEMORY_ASK, TUnit.BYTES, |
| LongMath.saturatedMultiply( |
| expectedNumExecutor(group_set), per_host_mem_estimate))); |
| |
| boolean cpuReqSatisfied = true; |
| int scaled_cores_requirement = -1; |
| if (isComputeCost) { |
| Preconditions.checkState(cores_requirement > 0); |
| if (queryOptions.getProcessing_cost_min_threads() |
| > queryOptions.getMax_fragment_instances_per_node()) { |
| throw new AnalysisException( |
| TImpalaQueryOptions.PROCESSING_COST_MIN_THREADS.name() + " (" |
| + queryOptions.getProcessing_cost_min_threads() |
| + ") can not be larger than " |
| + TImpalaQueryOptions.MAX_FRAGMENT_INSTANCES_PER_NODE.name() + " (" |
| + queryOptions.getMax_fragment_instances_per_node() + ")."); |
| } |
| |
| scaled_cores_requirement = (int) Math.min( |
| Integer.MAX_VALUE, Math.ceil(cores_requirement / cpuCountDivisor)); |
| cpuReqSatisfied = scaled_cores_requirement <= available_cores; |
| |
| addCounter( |
| groupSetProfile, new TCounter(CPU_ASK, TUnit.UNIT, scaled_cores_requirement)); |
| addCounter(groupSetProfile, |
| new TCounter(EFFECTIVE_PARALLELISM, TUnit.UNIT, cores_requirement)); |
| } |
| |
| boolean matchFound = false; |
| if (clientSetRequestPool) { |
| if (!default_executor_group) { |
| Preconditions.checkState(group_set.getExec_group_name_prefix().endsWith( |
| queryOptions.getRequest_pool())); |
| } |
| reason = "query option REQUEST_POOL=" + queryOptions.getRequest_pool() |
| + " is set. Memory and cpu limit checking is skipped."; |
| addInfoString(groupSetProfile, VERDICT, reason); |
| matchFound = true; |
| } else if (memReqSatisfied && cpuReqSatisfied) { |
| reason = "suitable group found (estimated per-host memory=" |
| + PrintUtils.printBytes(per_host_mem_estimate) |
| + ", estimated cpu cores required=" + cores_requirement |
| + ", scaled cpu cores=" + scaled_cores_requirement + ")"; |
| addInfoString(groupSetProfile, VERDICT, "Match"); |
| matchFound = true; |
| } |
| |
| if (!matchFound && (i >= num_executor_group_sets - 1) |
| && BackendConfig.INSTANCE.isSkipResourceCheckingOnLastExecutorGroupSet()) { |
| reason = "no executor group set fit. Admit to last executor group set."; |
| addInfoString(groupSetProfile, VERDICT, reason); |
| matchFound = true; |
| } |
| |
| // Append this exec group set profile node. |
| FrontendProfile.getCurrent().addChildrenProfile(groupSetProfile); |
| |
| if (matchFound) { |
| setGroupNamePrefix(default_executor_group, clientSetRequestPool, req, group_set); |
| break; |
| } |
| |
| List<String> verdicts = Lists.newArrayListWithCapacity(2); |
| List<String> reasons = Lists.newArrayListWithCapacity(2); |
| if (!memReqSatisfied) { |
| String verdict = "not enough per-host memory"; |
| verdicts.add(verdict); |
| reasons.add(verdict + " (require=" + per_host_mem_estimate |
| + ", max=" + group_set.getMax_mem_limit() + ")"); |
| } |
| if (!cpuReqSatisfied) { |
| String verdict = "not enough cpu cores"; |
| verdicts.add(verdict); |
| reasons.add(verdict + " (require=" + scaled_cores_requirement |
| + ", max=" + available_cores + ")"); |
| } |
| reason = String.join(", ", reasons); |
| addInfoString(groupSetProfile, VERDICT, String.join(", ", verdicts)); |
| group_set = null; |
| |
| // Restore to the captured state. |
| planCtx.compilationState_.restoreState(); |
| FrontendProfile.getCurrent().addToCounter( |
| EXECUTOR_GROUPS_CONSIDERED, TUnit.UNIT, 1); |
| i++; |
| } |
| |
| if (group_set == null) { |
| if (reason.equals("Unknown")) { |
| throw new AnalysisException("The query does not fit any executor group sets."); |
| } else { |
| throw new AnalysisException( |
| "The query does not fit largest executor group sets. Reason: " + reason |
| + "."); |
| } |
| } else { |
| // This group_set is a match. |
| FrontendProfile.getCurrent().addToCounter( |
| EXECUTOR_GROUPS_CONSIDERED, TUnit.UNIT, 1); |
| } |
| |
| LOG.info("Selected executor group: " + group_set + ", reason: " + reason); |
| |
| // Transfer the profile access flag which is collected during 1st compilation. |
| req.setUser_has_profile_access(planCtx.compilationState_.userHasProfileAccess()); |
| |
| return req; |
| } |
| |
| private static void setGroupNamePrefix(boolean default_executor_group, |
| boolean clientSetRequestPool, TExecRequest req, TExecutorGroupSet group_set) { |
| // Set the group name prefix in both the returned query options and |
| // the query context for non default group setup. |
| if (!default_executor_group) { |
| String namePrefix = group_set.getExec_group_name_prefix(); |
| req.query_options.setRequest_pool(namePrefix); |
| req.setRequest_pool_set_by_frontend(!clientSetRequestPool); |
| if (req.query_exec_request != null) { |
| req.query_exec_request.query_ctx.setRequest_pool(namePrefix); |
| } |
| } |
| } |
| |
| public static TRuntimeProfileNode createTRuntimeProfileNode( |
| String childrenProfileName) { |
| return new TRuntimeProfileNode(childrenProfileName, |
| /*num_children=*/0, |
| /*counters=*/new ArrayList<>(), |
| /*metadata=*/-1L, |
| /*indent=*/true, |
| /*info_strings=*/new HashMap<>(), |
| /*info_strings_display_order*/ new ArrayList<>(), |
| /*child_counters_map=*/ImmutableMap.of("", new HashSet<>())); |
| } |
| |
| /** |
| * Add counter into node profile. |
| * <p> |
| * Caller must make sure that there is no other counter existing in node profile that |
| * share the same counter name. |
| */ |
| private static void addCounter(TRuntimeProfileNode node, TCounter counter) { |
| Preconditions.checkNotNull(node.child_counters_map.get("")); |
| node.addToCounters(counter); |
| node.child_counters_map.get("").add(counter.getName()); |
| } |
| |
| private static void addInfoString(TRuntimeProfileNode node, String key, String value) { |
| if (node.getInfo_strings().put(key, value) == null) { |
| node.getInfo_strings_display_order().add(key); |
| } |
| } |
| |
  /**
   * Performs one compilation attempt for the statement in {@code planCtx}: parse,
   * metadata load, analysis/authorization, and (for plannable statements)
   * planning. Returns the populated TExecRequest. The caller invokes this in a
   * retry loop that handles InconsistentMetadataFetchException. If this attempt
   * fails after opening an HMS or Kudu transaction, the transaction is aborted
   * before the exception is rethrown.
   */
  private TExecRequest doCreateExecRequest(PlanCtx planCtx,
      EventSequence timeline) throws ImpalaException {
    TQueryCtx queryCtx = planCtx.getQueryContext();
    // Parse stmt and collect/load metadata to populate a stmt-local table cache
    StatementBase stmt = Parser.parse(
        queryCtx.client_request.stmt, queryCtx.client_request.query_options);
    User user = new User(TSessionStateUtil.getEffectiveUser(queryCtx.session));
    StmtMetadataLoader metadataLoader = new StmtMetadataLoader(
        this, queryCtx.session.database, timeline, user, queryCtx.getQuery_id());
    //TODO (IMPALA-8788): should load table write ids in transaction context.
    StmtTableCache stmtTableCache = metadataLoader.loadTables(stmt);
    // Test hook: optionally inject a delay after table loading.
    if (queryCtx.client_request.query_options.isSetDebug_action()) {
      DebugUtils.executeDebugAction(
          queryCtx.client_request.query_options.getDebug_action(),
          DebugUtils.LOAD_TABLES_DELAY);
    }

    // Add referenced tables to frontend profile
    FrontendProfile.getCurrent().addInfoString("Referenced Tables",
        stmtTableCache.tables.keySet()
            .stream()
            .map(TableName::toString)
            .collect(Collectors.joining(", ")));

    // Analyze and authorize stmt
    AnalysisContext analysisCtx = new AnalysisContext(queryCtx, authzFactory_, timeline);
    AnalysisResult analysisResult = analysisCtx.analyzeAndAuthorize(stmt, stmtTableCache,
        authzChecker_.get(), planCtx.compilationState_.disableAuthorization());
    if (!planCtx.compilationState_.disableAuthorization()) {
      LOG.info("Analysis and authorization finished.");
      // Remember the profile-access verdict so later attempts that skip
      // authorization can reuse it.
      planCtx.compilationState_.setUserHasProfileAccess(
          analysisResult.userHasProfileAccess());
    } else {
      LOG.info("Analysis finished.");
    }
    Preconditions.checkNotNull(analysisResult.getStmt());
    TExecRequest result = createBaseExecRequest(queryCtx, analysisResult);

    // Transfer the expected number of executors in executor group set to
    // analyzer's global state. The info is needed to compute the number of nodes to be
    // used during planner phase for scans (see HdfsScanNode.computeNumNodes()).
    analysisResult.getAnalyzer().setNumExecutorsForPlanning(
        expectedNumExecutor(planCtx.compilationState_.getGroupSet()));
    analysisResult.getAnalyzer().setAvailableCoresPerNode(
        Math.max(1, planCtx.compilationState_.getAvailableCoresPerNode()));

    try {
      TQueryOptions queryOptions = queryCtx.client_request.query_options;
      if (analysisResult.isCatalogOp()) {
        result.stmt_type = TStmtType.DDL;
        createCatalogOpRequest(analysisResult, result);
        TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
        if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
          result.catalog_op_request.setLineage_graph(thriftLineageGraph);
        }
        setMtDopForCatalogOp(analysisResult, queryOptions);
        // All DDL operations except for CTAS are done with analysis at this point.
        if (!analysisResult.isCreateTableAsSelectStmt()) {
          return result;
        }
      }
      // ACID INSERT/CTAS: open a transaction on the first compilation and reuse
      // its writeId on subsequent compilations of the same query.
      if (!analysisResult.isExplainStmt() &&
          (analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt())) {
        InsertStmt insertStmt = analysisResult.getInsertStmt();
        FeTable targetTable = insertStmt.getTargetTable();
        if (AcidUtils.isTransactionalTable(
            targetTable.getMetaStoreTable().getParameters())) {

          if (planCtx.compilationState_.getWriteId() == -1) {
            // 1st time compilation. Open a transaction and save the writeId.
            long txnId = openTransaction(queryCtx);
            timeline.markEvent("Transaction opened (" + String.valueOf(txnId) + ")");
            Collection<FeTable> tables = stmtTableCache.tables.values();
            String staticPartitionTarget = null;
            if (insertStmt.isStaticPartitionTarget()) {
              staticPartitionTarget = FeCatalogUtils.getPartitionName(
                  insertStmt.getPartitionKeyValues());
            }
            long writeId = allocateWriteId(queryCtx, targetTable);
            insertStmt.setWriteId(writeId);
            createLockForInsert(txnId, tables, targetTable, insertStmt.isOverwrite(),
                staticPartitionTarget, queryOptions);

            planCtx.compilationState_.setWriteId(writeId);
          } else {
            // Continue the transaction by reusing the writeId.
            insertStmt.setWriteId(planCtx.compilationState_.getWriteId());
          }
        }
      } else if (analysisResult.isLoadDataStmt()) {
        // The branches below handle statements that do not go through the
        // planner; each returns early with statement-specific result metadata.
        result.stmt_type = TStmtType.LOAD;
        result.setResult_set_metadata(new TResultSetMetadata(
            Collections.singletonList(new TColumn("summary", Type.STRING.toThrift()))));
        result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
        return result;
      } else if (analysisResult.isSetStmt()) {
        result.stmt_type = TStmtType.SET;
        result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
            new TColumn("option", Type.STRING.toThrift()),
            new TColumn("value", Type.STRING.toThrift()),
            new TColumn("level", Type.STRING.toThrift()))));
        result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
        return result;
      } else if (analysisResult.isAdminFnStmt()) {
        result.stmt_type = TStmtType.ADMIN_FN;
        result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
            new TColumn("summary", Type.STRING.toThrift()))));
        result.setAdmin_request(analysisResult.getAdminFnStmt().toThrift());
        return result;
      } else if (analysisResult.isConvertTableToIcebergStmt()) {
        result.stmt_type = TStmtType.CONVERT;
        result.setResult_set_metadata(new TResultSetMetadata(
            Collections.singletonList(new TColumn("summary", Type.STRING.toThrift()))));
        result.setConvert_table_request(
            analysisResult.getConvertTableToIcebergStmt().toThrift());
        return result;
      } else if (analysisResult.isTestCaseStmt()) {
        CopyTestCaseStmt testCaseStmt = ((CopyTestCaseStmt) stmt);
        if (testCaseStmt.isTestCaseExport()) {
          result.setStmt_type(TStmtType.TESTCASE);
          result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
              new TColumn("Test case data output path", Type.STRING.toThrift()))));
          result.setTestcase_data_path(testCaseStmt.writeTestCaseData());
        } else {
          // Mimic it as a DDL.
          result.setStmt_type(TStmtType.DDL);
          createCatalogOpRequest(analysisResult, result);
        }
        return result;
      }

      // Open or continue Kudu transaction if Kudu transaction is enabled and target table
      // is Kudu table.
      if (!analysisResult.isExplainStmt() && queryOptions.isEnable_kudu_transaction()) {
        if ((analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt())
            && analysisResult.getInsertStmt().isTargetTableKuduTable()) {
          // For INSERT/UPSERT/CTAS statements.
          openOrContinueKuduTransaction(planCtx, queryCtx, analysisResult,
              analysisResult.getInsertStmt().getTargetTable(), timeline);
        } else if (analysisResult.isUpdateStmt()
            && analysisResult.getUpdateStmt().isTargetTableKuduTable()) {
          // For UPDATE statement.
          openOrContinueKuduTransaction(planCtx, queryCtx, analysisResult,
              analysisResult.getUpdateStmt().getTargetTable(), timeline);
        } else if (analysisResult.isDeleteStmt()
            && analysisResult.getDeleteStmt().isTargetTableKuduTable()) {
          // For DELETE statement.
          openOrContinueKuduTransaction(planCtx, queryCtx, analysisResult,
              analysisResult.getDeleteStmt().getTargetTable(), timeline);
        }
      }

      // If unset, set MT_DOP to 0 to simplify the rest of the code.
      if (!queryOptions.isSetMt_dop()) queryOptions.setMt_dop(0);

      // create TQueryExecRequest
      TQueryExecRequest queryExecRequest =
          getPlannedExecRequest(planCtx, analysisResult, timeline);

      TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
      if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
        queryExecRequest.setLineage_graph(thriftLineageGraph);
      }

      // Override the per_host_mem_estimate sent to the backend if needed. The explain
      // string is already generated at this point so this does not change the estimate
      // shown in the plan.
      checkAndOverrideMemEstimate(queryExecRequest, queryOptions);

      if (analysisResult.isExplainStmt()) {
        // Return the EXPLAIN request
        createExplainRequest(planCtx.getExplainString(), result);
        return result;
      }

      result.setQuery_exec_request(queryExecRequest);
      if (analysisResult.isQueryStmt()) {
        result.stmt_type = TStmtType.QUERY;
        result.query_exec_request.stmt_type = result.stmt_type;
        // fill in the metadata
        result.setResult_set_metadata(createQueryResultSetMetadata(analysisResult));
      } else if (analysisResult.isInsertStmt() ||
          analysisResult.isCreateTableAsSelectStmt()) {
        // For CTAS the overall TExecRequest statement type is DDL, but the
        // query_exec_request should be DML
        result.stmt_type =
            analysisResult.isCreateTableAsSelectStmt() ? TStmtType.DDL : TStmtType.DML;
        result.query_exec_request.stmt_type = TStmtType.DML;
        // create finalization params of insert stmt
        addFinalizationParamsForInsert(
            queryCtx, queryExecRequest, analysisResult.getInsertStmt());
      } else {
        Preconditions.checkState(
            analysisResult.isUpdateStmt() || analysisResult.isDeleteStmt() ||
            analysisResult.isOptimizeStmt());
        result.stmt_type = TStmtType.DML;
        result.query_exec_request.stmt_type = TStmtType.DML;
        if (analysisResult.isDeleteStmt()) {
          addFinalizationParamsForIcebergModify(queryCtx, queryExecRequest,
              analysisResult.getDeleteStmt(), TIcebergOperation.DELETE);
        } else if (analysisResult.isUpdateStmt()) {
          addFinalizationParamsForIcebergModify(queryCtx, queryExecRequest,
              analysisResult.getUpdateStmt(), TIcebergOperation.UPDATE);
        } else if (analysisResult.isOptimizeStmt()) {
          addFinalizationParamsForIcebergModify(queryCtx, queryExecRequest,
              analysisResult.getOptimizeStmt(), TIcebergOperation.OPTIMIZE);
        }
      }
      return result;
    } catch (Exception e) {
      // Compilation failed: roll back any transaction this attempt (or a prior
      // one for the same query) opened, then rethrow.
      if (queryCtx.isSetTransaction_id()) {
        try {
          // Reset the writeId so a retry opens a fresh transaction.
          planCtx.compilationState_.setWriteId(-1);
          abortTransaction(queryCtx.getTransaction_id());
          timeline.markEvent("Transaction aborted");
        } catch (TransactionException te) {
          LOG.error("Could not abort transaction because: " + te.getMessage());
        }
      } else if (queryCtx.isIs_kudu_transactional()) {
        try {
          planCtx.compilationState_.setKuduTransactionToken(null);
          abortKuduTransaction(queryCtx.getQuery_id());
          timeline.markEvent(
              "Kudu transaction aborted: " + queryCtx.getQuery_id().toString());
        } catch (TransactionException te) {
          LOG.error("Could not abort transaction because: " + te.getMessage());
        }
      }
      throw e;
    }
  }
| |
| /** |
| * Set MT_DOP based on the analysis result |
| */ |
| private static void setMtDopForCatalogOp( |
| AnalysisResult analysisResult, TQueryOptions queryOptions) { |
| // Set MT_DOP=4 for COMPUTE STATS, unless the user has already provided another |
| // value for MT_DOP. |
| if (!queryOptions.isSetMt_dop() && analysisResult.isComputeStatsStmt()) { |
| queryOptions.setMt_dop(4); |
| } |
| // If unset, set MT_DOP to 0 to simplify the rest of the code. |
| if (!queryOptions.isSetMt_dop()) queryOptions.setMt_dop(0); |
| } |
| |
| /** |
| * Create the TExecRequest and initialize it |
| */ |
| private static TExecRequest createBaseExecRequest( |
| TQueryCtx queryCtx, AnalysisResult analysisResult) { |
| TExecRequest result = new TExecRequest(); |
| result.setQuery_options(queryCtx.client_request.getQuery_options()); |
| result.setAccess_events(Lists.newArrayList(analysisResult.getAccessEvents())); |
| result.analysis_warnings = analysisResult.getAnalyzer().getWarnings(); |
| result.setUser_has_profile_access(analysisResult.userHasProfileAccess()); |
| return result; |
| } |
| |
| /** |
| * Add the finalize params for an insert statement to the queryExecRequest |
| */ |
| private static void addFinalizationParamsForInsert( |
| TQueryCtx queryCtx, TQueryExecRequest queryExecRequest, InsertStmt insertStmt) { |
| FeTable targetTable = insertStmt.getTargetTable(); |
| addFinalizationParamsForInsert(queryCtx, queryExecRequest, |
| targetTable, insertStmt.getWriteId(), insertStmt.isOverwrite()); |
| } |
| |
| /** |
| * Add the finalize params to the queryExecRequest for a non-INSERT Iceberg DML |
| * statement: DELETE, UPDATE and OPTIMIZE. |
| */ |
| private static void addFinalizationParamsForIcebergModify(TQueryCtx queryCtx, |
| TQueryExecRequest queryExecRequest, DmlStatementBase dmlStmt, |
| TIcebergOperation iceOperation) { |
| Preconditions.checkState(!(dmlStmt instanceof InsertStmt)); |
| FeTable targetTable = dmlStmt.getTargetTable(); |
| if (!(targetTable instanceof FeIcebergTable)) return; |
| if (iceOperation == TIcebergOperation.DELETE) { |
| Preconditions.checkState(targetTable instanceof IcebergPositionDeleteTable); |
| targetTable = ((IcebergPositionDeleteTable)targetTable).getBaseTable(); |
| } |
| TFinalizeParams finalizeParams = addFinalizationParamsForDml( |
| queryCtx, targetTable, false); |
| TIcebergDmlFinalizeParams iceFinalizeParams = |
| addFinalizationParamsForIcebergDml((FeIcebergTable)targetTable, iceOperation); |
| finalizeParams.setIceberg_params(iceFinalizeParams); |
| queryExecRequest.setFinalize_params(finalizeParams); |
| } |
| |
| private static TIcebergDmlFinalizeParams addFinalizationParamsForIcebergDml( |
| FeIcebergTable iceTable, TIcebergOperation iceOperation) { |
| TIcebergDmlFinalizeParams iceFinalizeParams = new TIcebergDmlFinalizeParams(); |
| iceFinalizeParams.operation = iceOperation; |
| iceFinalizeParams.setSpec_id(iceTable.getDefaultPartitionSpecId()); |
| iceFinalizeParams.setInitial_snapshot_id(iceTable.snapshotId()); |
| return iceFinalizeParams; |
| } |
| |
| // This is public to allow external frontends to utilize this method to fill in the |
| // finalization parameters for externally generated INSERTs. |
| public static void addFinalizationParamsForInsert( |
| TQueryCtx queryCtx, TQueryExecRequest queryExecRequest, FeTable targetTable, |
| long writeId, boolean isOverwrite) { |
| TFinalizeParams finalizeParams = addFinalizationParamsForDml( |
| queryCtx, targetTable, isOverwrite); |
| if (targetTable instanceof FeFsTable) { |
| if (writeId != -1) { |
| Preconditions.checkState(queryCtx.isSetTransaction_id()); |
| finalizeParams.setTransaction_id(queryCtx.getTransaction_id()); |
| finalizeParams.setWrite_id(writeId); |
| } else if (targetTable instanceof FeIcebergTable) { |
| FeIcebergTable iceTable = (FeIcebergTable)targetTable; |
| TIcebergDmlFinalizeParams iceFinalizeParams = |
| addFinalizationParamsForIcebergDml(iceTable, TIcebergOperation.INSERT); |
| finalizeParams.setIceberg_params(iceFinalizeParams); |
| } else { |
| // TODO: Currently this flag only controls the removal of the query-level staging |
| // directory. HdfsTableSink (that creates the staging dir) calculates the path |
| // independently. So it'd be better to either remove this option, or make it used |
| // everywhere where the staging directory is referenced. |
| FeFsTable hdfsTable = (FeFsTable) targetTable; |
| finalizeParams.setStaging_dir( |
| hdfsTable.getHdfsBaseDir() + "/_impala_insert_staging"); |
| } |
| queryExecRequest.setFinalize_params(finalizeParams); |
| } |
| } |
| |
| private static TFinalizeParams addFinalizationParamsForDml(TQueryCtx queryCtx, |
| FeTable targetTable, boolean isOverwrite) { |
| TFinalizeParams finalizeParams = new TFinalizeParams(); |
| if (targetTable instanceof FeFsTable) { |
| finalizeParams.setIs_overwrite(isOverwrite); |
| finalizeParams.setTable_name(targetTable.getTableName().getTbl()); |
| finalizeParams.setTable_id(DescriptorTable.TABLE_SINK_ID); |
| String db = targetTable.getTableName().getDb(); |
| finalizeParams.setTable_db(db == null ? queryCtx.session.database : db); |
| FeFsTable hdfsTable = (FeFsTable) targetTable; |
| finalizeParams.setHdfs_base_dir(hdfsTable.getHdfsBaseDir()); |
| } |
| return finalizeParams; |
| } |
| |
| /** |
| * Add the metadata for the result set |
| */ |
| private static TResultSetMetadata createQueryResultSetMetadata( |
| AnalysisResult analysisResult) { |
| LOG.trace("create result set metadata"); |
| TResultSetMetadata metadata = new TResultSetMetadata(); |
| QueryStmt queryStmt = analysisResult.getQueryStmt(); |
| int colCnt = queryStmt.getColLabels().size(); |
| for (int i = 0; i < colCnt; ++i) { |
| TColumn colDesc = new TColumn(); |
| colDesc.columnName = queryStmt.getColLabels().get(i); |
| colDesc.columnType = queryStmt.getResultExprs().get(i).getType().toThrift(); |
| metadata.addToColumns(colDesc); |
| } |
| return metadata; |
| } |
| |
| /** |
| * Get the TQueryExecRequest and use it to populate the query context |
| */ |
| private TQueryExecRequest getPlannedExecRequest(PlanCtx planCtx, |
| AnalysisResult analysisResult, EventSequence timeline) |
| throws ImpalaException { |
| Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt() |
| || analysisResult.isCreateTableAsSelectStmt()); |
| TQueryCtx queryCtx = planCtx.getQueryContext(); |
| Planner planner = new Planner(analysisResult, queryCtx, timeline); |
| TQueryExecRequest queryExecRequest = createExecRequest(planner, planCtx); |
| if (planCtx.serializeDescTbl()) { |
| queryCtx.setDesc_tbl_serialized( |
| planner.getAnalysisResult().getAnalyzer().getDescTbl().toSerializedThrift()); |
| } else { |
| queryCtx.setDesc_tbl_testonly( |
| planner.getAnalysisResult().getAnalyzer().getDescTbl().toThrift()); |
| } |
| queryExecRequest.setQuery_ctx(queryCtx); |
| queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList()); |
| return queryExecRequest; |
| } |
| |
| /** |
| * The MAX_MEM_ESTIMATE_FOR_ADMISSION query option can override the planner memory |
| * estimate if set. Sets queryOptions.per_host_mem_estimate if the override is |
| * effective. |
| */ |
| private void checkAndOverrideMemEstimate(TQueryExecRequest queryExecRequest, |
| TQueryOptions queryOptions) { |
| if (queryOptions.isSetMax_mem_estimate_for_admission() |
| && queryOptions.getMax_mem_estimate_for_admission() > 0) { |
| long effectivePerHostMemEstimate = Math.min( |
| queryExecRequest.getPer_host_mem_estimate(), |
| queryOptions.getMax_mem_estimate_for_admission()); |
| queryExecRequest.setPer_host_mem_estimate(effectivePerHostMemEstimate); |
| long effectiveCoordinatorMemEstimate = Math.min( |
| queryExecRequest.getDedicated_coord_mem_estimate(), |
| queryOptions.getMax_mem_estimate_for_admission()); |
| queryExecRequest.setDedicated_coord_mem_estimate(effectiveCoordinatorMemEstimate); |
| } |
| } |
| |
| /** |
| * Attaches the explain result to the TExecRequest. |
| */ |
| private void createExplainRequest(String explainString, TExecRequest result) { |
| // update the metadata - one string column |
| TColumn colDesc = new TColumn("Explain String", Type.STRING.toThrift()); |
| TResultSetMetadata metadata = new TResultSetMetadata(Lists.newArrayList(colDesc)); |
| result.setResult_set_metadata(metadata); |
| |
| // create the explain result set - split the explain string into one line per row |
| String[] explainStringArray = explainString.split("\n"); |
| TExplainResult explainResult = new TExplainResult(); |
| explainResult.results = Lists.newArrayList(); |
| for (int i = 0; i < explainStringArray.length; ++i) { |
| TColumnValue col = new TColumnValue(); |
| col.setString_val(explainStringArray[i]); |
| TResultRow row = new TResultRow(Lists.newArrayList(col)); |
| explainResult.results.add(row); |
| } |
| result.setExplain_result(explainResult); |
| result.stmt_type = TStmtType.EXPLAIN; |
| } |
| |
| /** |
| * Executes a HiveServer2 metadata operation and returns a TResultSet |
| */ |
| public TResultSet execHiveServer2MetadataOp(TMetadataOpRequest request) |
| throws ImpalaException { |
| User user = request.isSetSession() ? |
| new User(TSessionStateUtil.getEffectiveUser(request.session)) : |
| ImpalaInternalAdminUser.getInstance(); |
| switch (request.opcode) { |
| case GET_TYPE_INFO: return MetadataOp.getTypeInfo(); |
| case GET_SCHEMAS: return MetastoreShim.execGetSchemas(this, request, user); |
| case GET_TABLES: return MetastoreShim.execGetTables(this, request, user); |
| case GET_COLUMNS: return MetastoreShim.execGetColumns(this, request, user); |
| case GET_CATALOGS: return MetadataOp.getCatalogs(); |
| case GET_TABLE_TYPES: return MetadataOp.getTableTypes(); |
| case GET_FUNCTIONS: return MetastoreShim.execGetFunctions(this, request, user); |
| case GET_PRIMARY_KEYS: return MetadataOp.getPrimaryKeys(this, request, |
| user); |
| case GET_CROSS_REFERENCE: return MetadataOp.getCrossReference(this, |
| request, user); |
| default: |
| throw new NotImplementedException(request.opcode + " has not been implemented."); |
| } |
| } |
| |
| /** |
| * Returns all files info of a table or partition. |
| */ |
| public TResultSet getTableFiles(TShowFilesParams request) |
| throws ImpalaException { |
| TTableName tableName = request.getTable_name(); |
| RetryTracker retries = new RetryTracker( |
| String.format("getting table files %s.%s", tableName.db_name, |
| tableName.table_name)); |
| while (true) { |
| try { |
| return doGetTableFiles(request); |
| } catch(InconsistentMetadataFetchException e) { |
| retries.handleRetryOrThrow(e); |
| } |
| } |
| } |
| |
| private TResultSet doGetTableFiles(TShowFilesParams request) |
| throws ImpalaException{ |
| FeTable table = getCatalog().getTable(request.getTable_name().getDb_name(), |
| request.getTable_name().getTable_name()); |
| if (table instanceof FeFsTable) { |
| return FeFsTable.Utils.getFiles((FeFsTable)table, request.getPartition_set()); |
| } else { |
| throw new InternalException("SHOW FILES only supports Hdfs table. " + |
| "Unsupported table class: " + table.getClass()); |
| } |
| } |
| |
| /** |
| * Executes the {@link QueryEventHook#onQueryComplete(QueryCompleteContext)} |
| * execution hooks for each hook registered in this instance's |
| * {@link QueryEventHookManager}. |
| * |
| * <h3>Service Guarantees</h3> |
| * |
| * Impala makes the following guarantees about how this method executes hooks: |
| * |
| * <h4>Hooks are executed asynchronously</h4> |
| * |
| * All hook execution happens asynchronously of the query that triggered |
| * them. Hooks may still be executing after the query response has returned |
| * to the caller. Additionally, hooks may execute concurrently if the |
| * hook executor thread size is configured appropriately. |
| * |
| * <h4>Hook Invocation is in Configuration Order</h4> |
| * |
| * The <i>submission</i> of the hook execution tasks occurs in the order |
| * that the hooks were defined in configuration. This generally means that |
| * hooks will <i>start</i> executing in order, but there are no guarantees |
| * about finishing order. |
| * <p> |
| * For example, if configured with {@code query_event_hook_classes=hook1,hook2,hook3}, |
| * then hook1 will start before hook2, and hook2 will start before hook3. |
| * If you need to guarantee that hook1 <i>completes</i> before hook2 starts, then |
| * you should specify {@code query_event_hook_nthreads=1} for serial hook |
| * execution. |
| * </p> |
| * |
| * <h4>Hook Execution Blocks</h4> |
| * |
| * A hook will block the thread it executes on until it completes. If a hook hangs, |
| * then the thread also hangs. Impala (currently) will not check for hanging hooks to |
| * take any action. This means that if you have {@code query_event_hook_nthreads} |
| * less than the number of hooks, then 1 hook may effectively block others from |
| * executing. |
| * |
| * <h4>Hook Exceptions are non-fatal</h4> |
| * |
| * Any exception thrown from this hook method will be logged and ignored. Therefore, |
| * an exception in 1 hook will not affect another hook (when no shared resources are |
| * involved). |
| * |
| * <h4>Hook Execution may end abruptly at Impala shutdown</h4> |
| * |
| * If a hook is still executing when Impala is shutdown, there are no guarantees |
| * that it will complete execution before being killed. |
| * |
| * @see QueryCompleteContext |
| * @see QueryEventHookManager |
| * |
| * @param context the execution context of the query |
| */ |
| public void callQueryCompleteHooks(QueryCompleteContext context) { |
| // TODO (IMPALA-8571): can we make use of the futures to implement better |
| // error-handling? Currently, the queryHookManager simply |
| // logs-then-rethrows any exception thrown from a hook.postQueryExecute |
| final List<Future<QueryEventHook>> futures |
| = this.queryHookManager_.executeQueryCompleteHooks(context); |
| } |
| |
| /** |
| * Adds a transaction to the keepalive object. |
| * @param queryCtx context that the transaction is associated with |
| * @throws TransactionException |
| */ |
| public void addTransaction(TQueryCtx queryCtx) throws TransactionException { |
| Preconditions.checkState(queryCtx.isSetTransaction_id()); |
| long transactionId = queryCtx.getTransaction_id(); |
| HeartbeatContext ctx = new HeartbeatContext(queryCtx, System.nanoTime()); |
| transactionKeepalive_.addTransaction(transactionId, ctx); |
| LOG.info("Opened transaction: " + Long.toString(transactionId)); |
| } |
| |
| /** |
| * Opens a new transaction and registers it to the keepalive object. |
| * @param queryCtx context of the query that requires the transaction. |
| * @return the transaction id. |
| * @throws TransactionException |
| */ |
| private long openTransaction(TQueryCtx queryCtx) throws TransactionException { |
| try (MetaStoreClient client = metaStoreClientPool_.getClient()) { |
| IMetaStoreClient hmsClient = client.getHiveClient(); |
| queryCtx.setTransaction_id(MetastoreShim.openTransaction(hmsClient)); |
| addTransaction(queryCtx); |
| } |
| return queryCtx.getTransaction_id(); |
| } |
| |
| /** |
| * Aborts a transaction. |
| * @param txnId is the id of the transaction to abort. |
| * @throws TransactionException |
| * TODO: maybe we should make it async. |
| */ |
| public void abortTransaction(long txnId) throws TransactionException { |
| LOG.error("Aborting transaction: " + Long.toString(txnId)); |
| try (MetaStoreClient client = metaStoreClientPool_.getClient()) { |
| IMetaStoreClient hmsClient = client.getHiveClient(); |
| transactionKeepalive_.deleteTransaction(txnId); |
| MetastoreShim.abortTransaction(hmsClient, txnId); |
| } |
| } |
| |
| /** |
| * Unregisters an already committed transaction. |
| * @param txnId is the id of the transaction to clear. |
| */ |
| public void unregisterTransaction(long txnId) { |
| LOG.info("Unregistering already committed transaction: " + Long.toString(txnId)); |
| transactionKeepalive_.deleteTransaction(txnId); |
| } |
| |
| /** |
| * Allocates write id for transactional table. |
| * @param queryCtx the query context that contains the transaction id. |
| * @param table the target table of the write operation. |
| * @return the allocated write id |
| * @throws TransactionException |
| */ |
| private long allocateWriteId(TQueryCtx queryCtx, FeTable table) |
| throws TransactionException { |
| Preconditions.checkState(queryCtx.isSetTransaction_id()); |
| Preconditions.checkState(table instanceof FeFsTable); |
| Preconditions.checkState( |
| AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters())); |
| try (MetaStoreClient client = metaStoreClientPool_.getClient()) { |
| IMetaStoreClient hmsClient = client.getHiveClient(); |
| long txnId = queryCtx.getTransaction_id(); |
| return MetastoreShim.allocateTableWriteId(hmsClient, txnId, table.getDb().getName(), |
| table.getName()); |
| } |
| } |
| |
| /** |
| * Creates Lock object for the insert statement. |
| * @param txnId the transaction id to be used. |
| * @param tables the tables in the query. |
| * @param targetTable the target table of INSERT. Must be transactional. |
| * @param isOverwrite true when the INSERT stmt is an INSERT OVERWRITE |
| * @param staticPartitionTarget the static partition target in case of static partition |
| * INSERT |
| * @param queryOptions the query options for this INSERT statement |
| * @throws TransactionException |
| */ |
| private void createLockForInsert(Long txnId, Collection<FeTable> tables, |
| FeTable targetTable, boolean isOverwrite, String staticPartitionTarget, |
| TQueryOptions queryOptions) |
| throws TransactionException { |
| Preconditions.checkState( |
| AcidUtils.isTransactionalTable(targetTable.getMetaStoreTable().getParameters())); |
| List<LockComponent> lockComponents = new ArrayList<>(tables.size()); |
| List<FeTable> lockTables = new ArrayList<>(tables); |
| if (!lockTables.contains(targetTable)) lockTables.add(targetTable); |
| for (FeTable table : lockTables) { |
| if (!AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters())) { |
| continue; |
| } |
| LockComponent lockComponent = new LockComponent(); |
| lockComponent.setDbname(table.getDb().getName()); |
| lockComponent.setTablename(table.getName()); |
| if (table == targetTable) { |
| if (isOverwrite) { |
| lockComponent.setType(LockType.EXCLUSIVE); |
| } else { |
| lockComponent.setType(LockType.SHARED_READ); |
| } |
| lockComponent.setOperationType(DataOperationType.INSERT); |
| if (staticPartitionTarget != null) { |
| lockComponent.setLevel(LockLevel.PARTITION); |
| lockComponent.setPartitionname(staticPartitionTarget); |
| } else { |
| lockComponent.setLevel(LockLevel.TABLE); |
| } |
| } else { |
| lockComponent.setLevel(LockLevel.TABLE); |
| lockComponent.setType(LockType.SHARED_READ); |
| lockComponent.setOperationType(DataOperationType.SELECT); |
| } |
| lockComponents.add(lockComponent); |
| } |
| try (MetaStoreClient client = metaStoreClientPool_.getClient()) { |
| IMetaStoreClient hmsClient = client.getHiveClient(); |
| MetastoreShim.acquireLock(hmsClient, txnId, lockComponents, |
| queryOptions.lock_max_wait_time_s); |
| } |
| } |
| |
| /** |
| * Opens or continue a Kudu transaction. |
| */ |
| private void openOrContinueKuduTransaction(PlanCtx planCtx, TQueryCtx queryCtx, |
| AnalysisResult analysisResult, FeTable targetTable, EventSequence timeline) |
| throws TransactionException { |
| |
| byte[] token = planCtx.compilationState_.getKuduTransactionToken(); |
| // Continue the transaction by reusing the token. |
| if (token != null) { |
| if (analysisResult.isUpdateStmt()) { |
| analysisResult.getUpdateStmt().setKuduTransactionToken(token); |
| } else if (analysisResult.isDeleteStmt()) { |
| analysisResult.getDeleteStmt().setKuduTransactionToken(token); |
| } else { |
| analysisResult.getInsertStmt().setKuduTransactionToken(token); |
| } |
| return; |
| } |
| |
| FeKuduTable kuduTable = (FeKuduTable) targetTable; |
| KuduClient client = KuduUtil.getKuduClient(kuduTable.getKuduMasterHosts()); |
| KuduTransaction txn = null; |
| try { |
| // Open Kudu transaction. |
| LOG.info("Open Kudu transaction: " + queryCtx.getQuery_id().toString()); |
| txn = client.newTransaction(); |
| timeline.markEvent( |
| "Kudu transaction opened with query id: " + queryCtx.getQuery_id().toString()); |
| token = txn.serialize(); |
| if (analysisResult.isUpdateStmt()) { |
| analysisResult.getUpdateStmt().setKuduTransactionToken(token); |
| } else if (analysisResult.isDeleteStmt()) { |
| analysisResult.getDeleteStmt().setKuduTransactionToken(token); |
| } else { |
| analysisResult.getInsertStmt().setKuduTransactionToken(token); |
| } |
| kuduTxnManager_.addTransaction(queryCtx.getQuery_id(), txn); |
| queryCtx.setIs_kudu_transactional(true); |
| |
| planCtx.compilationState_.setKuduTransactionToken(token); |
| |
| } catch (IOException e) { |
| if (txn != null) txn.close(); |
| throw new TransactionException(e.getMessage()); |
| } |
| } |
| |
| /** |
| * Aborts a Kudu transaction. |
| * @param queryId is the id of the query. |
| */ |
| public void abortKuduTransaction(TUniqueId queryId) throws TransactionException { |
| LOG.info("Abort Kudu transaction: " + queryId.toString()); |
| KuduTransaction txn = kuduTxnManager_.deleteTransaction(queryId); |
| Preconditions.checkNotNull(txn); |
| if (txn != null) { |
| try { |
| txn.rollback(); |
| } catch (KuduException e) { |
| throw new TransactionException(e.getMessage()); |
| } finally { |
| // Call KuduTransaction.close() explicitly to stop sending automatic |
| // keepalive messages by the 'txn' handle. |
| txn.close(); |
| } |
| } |
| } |
| |
| /** |
| * Commits a Kudu transaction. |
| * @param queryId is the id of the query. |
| */ |
| public void commitKuduTransaction(TUniqueId queryId) throws TransactionException { |
| LOG.info("Commit Kudu transaction: " + queryId.toString()); |
| KuduTransaction txn = kuduTxnManager_.deleteTransaction(queryId); |
| Preconditions.checkNotNull(txn); |
| if (txn != null) { |
| try { |
| txn.commit(); |
| } catch (KuduException e) { |
| throw new TransactionException(e.getMessage()); |
| } finally { |
| // Call KuduTransaction.close() explicitly to stop sending automatic |
| // keepalive messages by the 'txn' handle. |
| txn.close(); |
| } |
| } |
| } |
| } |