#include "runtime/query-exec-mgr.h"
#include <gperftools/malloc_extension.h>
#include <gutil/strings/substitute.h>
#include <boost/thread/locks.hpp>
#include <boost/thread/lock_guard.hpp>
#include "common/logging.h"
#include "runtime/query-state.h"
#include "runtime/fragment-instance-state.h"
#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
#include "util/debug-util.h"
#include "util/impalad-metrics.h"
#include "util/metrics.h"
#include "util/uid-util.h"
#include "util/thread.h"
#include "common/names.h"
using namespace impala;
// TODO: this logging should go into a per query log.
// Read by QueryExecMgr::ExecuteQueryHelper(): when the flag is positive, process memory
// statistics are written to the INFO log each time the value of
// ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS is a multiple of the flag. The default of
// 0 disables this logging.
DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory usage "
"every log_mem_usage_interval'th fragment completion.");
// Starts execution of a query's fragment instances on this backend.
//
// Obtains (creating it if necessary) the QueryState for 'query_ctx', initializes it from
// 'request' and 'fragment_info', and hands it to a newly created thread that runs
// ExecuteQueryHelper(), so the calling thread is not blocked by query startup.
//
// Returns the error from QueryState::Init() or Thread::Create() if either fails,
// otherwise Status::OK(). On success the spawned thread takes over the QueryState
// references; on every failure path they are released here before returning.
Status QueryExecMgr::StartQuery(const ExecQueryFInstancesRequestPB* request,
    const TQueryCtx& query_ctx, const TExecPlanFragmentInfo& fragment_info) {
  TUniqueId query_id = query_ctx.query_id;
  VLOG(2) << "StartQueryFInstances() query_id=" << PrintId(query_id)
          << " coord=" << TNetworkAddressToString(query_ctx.coord_address);

  bool dummy;
  QueryState* qs =
      GetOrCreateQueryState(query_ctx, request->per_backend_mem_limit(), &dummy);
  Status status = qs->Init(request, fragment_info);
  if (!status.ok()) {
    qs->ReleaseBackendResourceRefcount(); // Release refcnt acquired in Init().
    // Also drop the reference taken in GetOrCreateQueryState(); nothing else will
    // release it because no execution thread was started.
    ReleaseQueryState(qs);
    return status;
  }

  // avoid blocking the rpc handler thread for too long by starting a new thread for
  // query startup (which takes ownership of the QueryState reference)
  unique_ptr<Thread> t;
  status = Thread::Create("query-exec-mgr",
      Substitute("query-state-$0", PrintId(query_id)),
      &QueryExecMgr::ExecuteQueryHelper, this, qs, &t, true);
  if (!status.ok()) {
    // decrement refcount taken in QueryState::Init()
    qs->ReleaseBackendResourceRefcount();
    // decrement refcount taken in GetOrCreateQueryState()
    ReleaseQueryState(qs);
    return status;
  }
  // NOTE(review): 't' is destroyed when this function returns while the thread is
  // still running - confirm that Thread's destructor leaves the thread alive, or
  // detach it explicitly here.
  return Status::OK();
}
// Returns the QueryState for 'query_ctx', registering a new one with 'mem_limit' if the
// query is not yet known on this backend. The returned QueryState carries the reference
// taken by GetOrCreateQueryState(), which is dropped again via ReleaseQueryState().
QueryState* QueryExecMgr::CreateQueryState(
    const TQueryCtx& query_ctx, int64_t mem_limit) {
  // NOTE(review): 'created' is not inspected, so an already-registered QueryState is
  // returned silently - confirm whether callers expect a freshly created state.
  bool created;
  QueryState* qs = GetOrCreateQueryState(query_ctx, mem_limit, &created);
  return qs;
}
// Looks up the QueryState registered under 'query_id'.
// Returns nullptr if the map has no entry for 'query_id'. Otherwise increments the
// QueryState's refcnt_, logs the new count and returns the pointer.
// NOTE(review): no closing brace for this function is visible before the next
// definition - confirm the file has not lost lines.
QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) {
QueryState* qs = nullptr;
int refcnt;
// NOTE(review): the constructor call below ends on a trailing comma; its second
// argument and the closing ");" appear to be missing.
ScopedShardedMapRef<QueryState*> map_ref(query_id,
DCHECK(map_ref.get() != nullptr);
auto it = map_ref->find(query_id);
// Unknown query: return without touching any refcount.
if (it == map_ref->end()) return nullptr;
qs = it->second;
// Increment the reference count of the QueryState that is about to be returned.
refcnt = qs->refcnt_.Add(1);
DCHECK(qs != nullptr && refcnt > 0);
VLOG_QUERY << "QueryState: query_id=" << PrintId(query_id) << " refcnt=" << refcnt;
return qs;
// Returns the QueryState for 'query_ctx.query_id', allocating and inserting a new one
// (constructed from 'query_ctx' and 'mem_limit') when the map has no entry for that id.
// '*created' is set to true when a new QueryState was allocated and to false when an
// existing one was found. The QueryState's refcnt_ is incremented before returning.
// NOTE(review): neither the "else" block nor the function body has a visible closing
// brace - confirm the file has not lost lines.
QueryState* QueryExecMgr::GetOrCreateQueryState(
const TQueryCtx& query_ctx, int64_t mem_limit, bool* created) {
QueryState* qs = nullptr;
int refcnt;
// NOTE(review): the constructor call below ends on a trailing comma; its second
// argument and the closing ");" appear to be missing.
ScopedShardedMapRef<QueryState*> map_ref(query_ctx.query_id,
DCHECK(map_ref.get() != nullptr);
auto it = map_ref->find(query_ctx.query_id);
if (it == map_ref->end()) {
// Register new QueryState. This marks when the query first starts executing on
// this backend.
qs = new QueryState(query_ctx, mem_limit);
map_ref->insert(make_pair(query_ctx.query_id, qs));
*created = true;
} else {
qs = it->second;
*created = false;
// NOTE(review): as written the statements below fall inside the "else" branch;
// presumably the increment and return are meant to run for both branches.
// decremented by ReleaseQueryState()
refcnt = qs->refcnt_.Add(1);
DCHECK(qs != nullptr && refcnt > 0);
return qs;
// Entry point of the thread created by StartQuery(). Starts the fragment instances of
// 'qs' and, if that succeeds, monitors them until they finish. Optionally logs process
// memory statistics (see --log_mem_usage_interval) and finally releases the QueryState
// references that StartQuery() handed over to this thread.
void QueryExecMgr::ExecuteQueryHelper(QueryState* qs) {
  // Start the query fragment instances and wait for completion or errors.
  if (LIKELY(qs->StartFInstances())) qs->MonitorFInstances();

  // tcmalloc and address or thread sanitizer cannot be used together
  // NOTE(review): no sanitizer build guard is visible around this block - confirm
  // whether the MallocExtension call must be compiled out in sanitizer builds.
  if (FLAGS_log_mem_usage_interval > 0) {
    uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->GetValue();
    if (num_complete % FLAGS_log_mem_usage_interval == 0) {
      char buf[2048];
      // This outputs how much memory is currently being used by this impalad
      MallocExtension::instance()->GetStats(buf, sizeof(buf));
      LOG(INFO) << buf;
    }
  }

  // decrement refcount taken in QueryState::Init();
  qs->ReleaseBackendResourceRefcount();
  // decrement refcount taken in StartQuery()
  ReleaseQueryState(qs);
}
// Decrements the refcnt_ of 'qs'. If the count is still positive afterwards, returns
// immediately. Otherwise looks the query up in the map again, re-reads the refcount,
// and deletes the QueryState only if the count is still not positive.
// NOTE(review): the end of this definition is not visible here (no closing brace), so
// the notes below describe only the statements that are shown.
void QueryExecMgr::ReleaseQueryState(QueryState* qs) {
DCHECK(qs != nullptr);
// Copy the id before decrementing; 'qs' is not dereferenced after the decrement.
TUniqueId query_id = qs->query_id();
int32_t cnt = qs->refcnt_.Add(-1);
// don't reference anything from 'qs' beyond this point, 'qs' might get
// gc'd out from under us
qs = nullptr;
// Logs cnt + 1, i.e. presumably the reference count as it was before this release.
VLOG(2) << "ReleaseQueryState(): query_id=" << PrintId(query_id)
<< " refcnt=" << cnt + 1;
DCHECK_GE(cnt, 0);
if (cnt > 0) return;
QueryState* qs_from_map = nullptr;
// NOTE(review): the constructor call below ends on a trailing comma; its second
// argument and the closing ");" appear to be missing.
ScopedShardedMapRef<QueryState*> map_ref(query_id,
DCHECK(map_ref.get() != nullptr);
auto it = map_ref->find(query_id);
// someone else might have gc'd the entry
if (it == map_ref->end()) return;
qs_from_map = it->second;
DCHECK(qs_from_map->query_ctx().query_id == query_id);
// NOTE(review): 'cnt' is already declared above and no nested scope is visible
// here, so this reads as a redeclaration - confirm an enclosing "{ ... }" block was
// not lost.
int32_t cnt = qs_from_map->refcnt_.Load();
DCHECK_GE(cnt, 0);
// someone else might have increased the refcnt in the meantime
if (cnt > 0) return;
// NOTE(review): no statement removing 'it' from the map is visible before the
// delete below - confirm the entry is erased so the map does not keep a dangling
// pointer.
delete qs_from_map;
VLOG(1) << "ReleaseQueryState(): deleted query_id=" << PrintId(query_id);
// BACKEND_NUM_QUERIES_EXECUTING is used to detect the backend being quiesced, so we
// decrement it after we're completely done with the query.
// NOTE(review): the decrement described above is not visible in this chunk.