// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/query-exec-mgr.h"
#include <memory>
#include <ostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <gflags/gflags.h>
#include <gperftools/malloc_extension.h>
#include <gutil/strings/substitute.h>
#include "common/compiler-util.h"
#include "common/logging.h"
#include "gen-cpp/Types_types.h"
#include "gen-cpp/control_service.pb.h"
#include "runtime/exec-env.h"
#include "runtime/query-state.h"
#include "util/container-util.h"
#include "util/debug-util.h"
#include "util/impalad-metrics.h"
#include "util/metrics.h"
#include "util/network-util.h"
#include "util/thread-pool.h"
#include "common/names.h"
using namespace impala;

// TODO: this logging should go into a per query log.
DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory usage "
    "every log_mem_usage_interval'th fragment completion.");

DEFINE_int32(query_exec_mgr_cancellation_thread_pool_size, 1,
    "(Advanced) Size of the QueryExecMgr thread-pool processing cancellations due to "
    "coordinator failure");

const uint32_t QUERY_EXEC_MGR_MAX_CANCELLATION_QUEUE_SIZE = 65536;

QueryExecMgr::QueryExecMgr() {
  // Initialise the cancellation thread pool with 1 thread (by default). The max queue
  // size is deliberately set so high that it should never fill; if it does we fill the
  // queue up to the maximum limit and ignore the rest. The ignored queries will get
  // cancelled when they time out trying to send status reports.
  cancellation_thread_pool_.reset(new ThreadPool<QueryCancellationTask>("query-exec-mgr",
      "cancellation-worker", FLAGS_query_exec_mgr_cancellation_thread_pool_size,
      QUERY_EXEC_MGR_MAX_CANCELLATION_QUEUE_SIZE,
      bind<void>(&QueryExecMgr::CancelFromThreadPool, this, _2)));
  ABORT_IF_ERROR(cancellation_thread_pool_->Init());
}

QueryExecMgr::~QueryExecMgr() {}

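// Starts execution of the fragment instances assigned to this backend for the given
// query. The QueryState reference created here is handed off to a separate startup
// thread; on failure, the references taken in QueryState::Init() and
// GetOrCreateQueryState() are released before returning the error.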
Status QueryExecMgr::StartQuery(const ExecQueryFInstancesRequestPB* request,
const TQueryCtx& query_ctx, const TExecPlanFragmentInfo& fragment_info) {
TUniqueId query_id = query_ctx.query_id;
VLOG(2) << "StartQueryFInstances() query_id=" << PrintId(query_id)
<< " coord=" << TNetworkAddressToString(query_ctx.coord_address);
bool dummy;
QueryState* qs =
GetOrCreateQueryState(query_ctx, request->per_backend_mem_limit(), &dummy);
Status status = qs->Init(request, fragment_info);
if (!status.ok()) {
qs->ReleaseBackendResourceRefcount(); // Release refcnt acquired in Init().
ReleaseQueryState(qs);
return status;
}
// avoid blocking the rpc handler thread for too long by starting a new thread for
// query startup (which takes ownership of the QueryState reference)
unique_ptr<Thread> t;
status = Thread::Create("query-exec-mgr",
Substitute("query-state-$0", PrintId(query_id)),
&QueryExecMgr::ExecuteQueryHelper, this, qs, &t, true);
if (!status.ok()) {
// decrement refcount taken in QueryState::Init()
qs->ReleaseBackendResourceRefcount();
// decrement refcount taken in GetOrCreateQueryState()
ReleaseQueryState(qs);
return status;
}
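  // Let the startup thread run independently of this handler; it owns the QueryState
  // reference taken above and releases it in ExecuteQueryHelper().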
  t->Detach();
  return Status::OK();
}

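// Creates and registers a new QueryState, DCHECKing that one does not already exist for
// this query on this backend. The reference taken here must be released with
// ReleaseQueryState().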
QueryState* QueryExecMgr::CreateQueryState(
const TQueryCtx& query_ctx, int64_t mem_limit) {
bool created;
QueryState* qs = GetOrCreateQueryState(query_ctx, mem_limit, &created);
DCHECK(created);
return qs;
}
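// Returns the QueryState registered for 'query_id' with an extra reference taken, or
// nullptr if no such query is executing on this backend. The caller must release the
// reference with ReleaseQueryState().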
QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) {
  QueryState* qs = nullptr;
  int refcnt;
  {
    ScopedShardedMapRef<QueryState*> map_ref(query_id,
        &ExecEnv::GetInstance()->query_exec_mgr()->qs_map_);
    DCHECK(map_ref.get() != nullptr);
    auto it = map_ref->find(query_id);
    if (it == map_ref->end()) return nullptr;
    qs = it->second;
    refcnt = qs->refcnt_.Add(1);
  }
  DCHECK(qs != nullptr && refcnt > 0);
  VLOG_QUERY << "QueryState: query_id=" << PrintId(query_id) << " refcnt=" << refcnt;
  return qs;
}

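// Returns the QueryState for 'query_ctx', creating and registering it first if
// necessary. Sets '*created' accordingly and takes a reference that is released by
// ReleaseQueryState().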
QueryState* QueryExecMgr::GetOrCreateQueryState(
const TQueryCtx& query_ctx, int64_t mem_limit, bool* created) {
QueryState* qs = nullptr;
int refcnt;
{
ScopedShardedMapRef<QueryState*> map_ref(query_ctx.query_id,
&ExecEnv::GetInstance()->query_exec_mgr()->qs_map_);
DCHECK(map_ref.get() != nullptr);
auto it = map_ref->find(query_ctx.query_id);
if (it == map_ref->end()) {
// Register new QueryState. This marks when the query first starts executing on
// this backend.
ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTED->Increment(1);
ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTING->Increment(1);
qs = new QueryState(query_ctx, mem_limit);
map_ref->insert(make_pair(query_ctx.query_id, qs));
*created = true;
} else {
qs = it->second;
*created = false;
}
// decremented by ReleaseQueryState()
refcnt = qs->refcnt_.Add(1);
}
DCHECK(qs != nullptr && refcnt > 0);
return qs;
}
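// Thread target started by StartQuery(): runs the query's fragment instances to
// completion, optionally logs process memory usage, then releases the references taken
// in QueryState::Init() and StartQuery().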
void QueryExecMgr::ExecuteQueryHelper(QueryState* qs) {
  // Start the query fragment instances and wait for completion or errors.
  if (LIKELY(qs->StartFInstances())) qs->MonitorFInstances();

#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
  // tcmalloc and address or thread sanitizer cannot be used together
  if (FLAGS_log_mem_usage_interval > 0) {
    uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->GetValue();
    if (num_complete % FLAGS_log_mem_usage_interval == 0) {
      char buf[2048];
      // This outputs how much memory is currently being used by this impalad
      MallocExtension::instance()->GetStats(buf, 2048);
      LOG(INFO) << buf;
    }
  }
#endif

  // decrement refcount taken in QueryState::Init()
  qs->ReleaseBackendResourceRefcount();
  // decrement refcount taken in StartQuery()
  ReleaseQueryState(qs);
}

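// Drops one reference to 'qs'. When the count reaches zero the QueryState is removed
// from the map and deleted, and the executing-queries metric is decremented.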
void QueryExecMgr::ReleaseQueryState(QueryState* qs) {
  DCHECK(qs != nullptr);
  TUniqueId query_id = qs->query_id();
  int32_t cnt = qs->refcnt_.Add(-1);
  // don't reference anything from 'qs' beyond this point, 'qs' might get
  // gc'd out from under us
  qs = nullptr;
  VLOG(2) << "ReleaseQueryState(): query_id=" << PrintId(query_id)
      << " refcnt=" << cnt + 1;
  DCHECK_GE(cnt, 0);
  if (cnt > 0) return;

  QueryState* qs_from_map = nullptr;
  {
    ScopedShardedMapRef<QueryState*> map_ref(query_id,
        &ExecEnv::GetInstance()->query_exec_mgr()->qs_map_);
    DCHECK(map_ref.get() != nullptr);
    auto it = map_ref->find(query_id);
    // someone else might have gc'd the entry
    if (it == map_ref->end()) return;
    qs_from_map = it->second;
    DCHECK(qs_from_map->query_ctx().query_id == query_id);
    int32_t cnt = qs_from_map->refcnt_.Load();
    DCHECK_GE(cnt, 0);
    // someone else might have increased the refcnt in the meantime
    if (cnt > 0) return;
    map_ref->erase(it);
  }
  delete qs_from_map;
  VLOG(1) << "ReleaseQueryState(): deleted query_id=" << PrintId(query_id);
  // BACKEND_NUM_QUERIES_EXECUTING is used to detect the backend being quiesced, so we
  // decrement it after we're completely done with the query.
  ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTING->Increment(-1);
}

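// Takes an extra reference on 'qs' on behalf of the caller, which is expected to already
// hold the corresponding sharded map lock (e.g. from within DoFuncForAllEntries()).
// Released later via ReleaseQueryState().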
void QueryExecMgr::AcquireQueryStateLocked(QueryState* qs) {
  if (qs == nullptr) return;
  int refcnt = qs->refcnt_.Add(1);
  DCHECK(refcnt > 0);
}

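// Queues cancellation, via the cancellation thread pool, of every running query whose
// coordinator is no longer present in 'current_membership'.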
void QueryExecMgr::CancelQueriesForFailedCoordinators(
const std::unordered_set<BackendIdPB>& current_membership) {
// Build a list of queries that are scheduled by failed coordinators (as
// evidenced by their absence from the cluster membership list).
std::vector<QueryCancellationTask> to_cancel;
ExecEnv::GetInstance()->query_exec_mgr()->qs_map_.DoFuncForAllEntries(
[&](QueryState* qs) {
if (qs != nullptr && !qs->IsCancelled()) {
if (current_membership.find(qs->coord_backend_id())
== current_membership.end()) {
// decremented by ReleaseQueryState()
AcquireQueryStateLocked(qs);
to_cancel.push_back(QueryCancellationTask(qs));
}
}
});
// Since we are the only producer for the cancellation thread pool, we can find the
// remaining capacity of the pool and submit the new cancellation requests without
// blocking.
int query_num_to_cancel = to_cancel.size();
int remaining_queue_size = QUERY_EXEC_MGR_MAX_CANCELLATION_QUEUE_SIZE
- cancellation_thread_pool_->GetQueueSize();
if (query_num_to_cancel > remaining_queue_size) {
// Fill the queue up to maximum limit, and ignore the rest which will get cancelled
// eventually anyways when QueryState::ReportExecStatus() hits the timeout.
LOG_EVERY_N(WARNING, 60) << "QueryExecMgr cancellation queue is full";
query_num_to_cancel = remaining_queue_size;
for (int i = query_num_to_cancel; i < to_cancel.size(); ++i) {
ReleaseQueryState(to_cancel[i].GetQueryState());
}
}
for (int i = 0; i < query_num_to_cancel; ++i) {
cancellation_thread_pool_->Offer(to_cancel[i]);
}
}
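// Callback run by the cancellation thread pool: cancels the query, marks its coordinator
// as inactive and drops the reference taken in CancelQueriesForFailedCoordinators().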
void QueryExecMgr::CancelFromThreadPool(const QueryCancellationTask& cancellation_task) {
  QueryState* qs = cancellation_task.GetQueryState();
  VLOG(1) << "CancelFromThreadPool(): cancel query " << qs->query_id();
  qs->Cancel();
  qs->is_coord_active_.Store(false);
  ReleaseQueryState(qs);
}