blob: 5c282d31612b5c205913897548faaa132819b202 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/query-exec-mgr.h"
#include <gperftools/malloc_extension.h>
#include <gutil/strings/substitute.h>
#include <boost/thread/locks.hpp>
#include <boost/thread/lock_guard.hpp>
#include "common/logging.h"
#include "runtime/query-state.h"
#include "runtime/fragment-instance-state.h"
#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
#include "util/debug-util.h"
#include "util/impalad-metrics.h"
#include "util/metrics.h"
#include "util/uid-util.h"
#include "util/thread.h"
#include "common/names.h"
using namespace impala;
// Command-line flag controlling periodic memory-usage logging. It is read in
// QueryExecMgr::ExecuteQueryHelper(): when the flag is positive and the current value
// of the IMPALA_SERVER_NUM_FRAGMENTS metric is a multiple of it, the statistics
// reported by MallocExtension::GetStats() are written to the INFO log. The default of
// 0 disables this logging. The check is compiled out when ADDRESS_SANITIZER or
// THREAD_SANITIZER is defined.
// TODO: this logging should go into a per query log.
DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory usage "
"every log_mem_usage_interval'th fragment completion.");
// Begins execution of a query's fragment instances on this backend.
//
// Obtains a reference on the QueryState for 'query_ctx' (registering a new one if the
// query is not known yet), initializes it from 'request' and 'fragment_info', and then
// spawns a detached thread running ExecuteQueryHelper() which takes over that
// reference. Returns the failing status if QueryState::Init() or the thread creation
// fails; in both cases the references held on behalf of the query are released here.
Status QueryExecMgr::StartQuery(const ExecQueryFInstancesRequestPB* request,
    const TQueryCtx& query_ctx, const TExecPlanFragmentInfo& fragment_info) {
  const TUniqueId& query_id = query_ctx.query_id;
  VLOG(2) << "StartQueryFInstances() query_id=" << PrintId(query_id)
          << " coord=" << TNetworkAddressToString(query_ctx.coord_address);

  // Whether the state was freshly created is irrelevant here.
  bool unused_created;
  QueryState* qs = GetOrCreateQueryState(
      query_ctx, request->per_backend_mem_limit(), &unused_created);

  Status status = qs->Init(request, fragment_info);
  unique_ptr<Thread> exec_thread;
  if (status.ok()) {
    // Avoid blocking the rpc handler thread for too long by running query startup on
    // a new thread. On success that thread owns the QueryState reference.
    status = Thread::Create("query-exec-mgr",
        Substitute("query-state-$0", PrintId(query_id)),
        &QueryExecMgr::ExecuteQueryHelper, this, qs, &exec_thread, true);
  }

  if (!status.ok()) {
    // Either Init() failed or no thread could be started, so ExecuteQueryHelper()
    // will never run for this query; undo the bookkeeping ourselves.
    // Drop the refcount taken in QueryState::Init().
    qs->ReleaseBackendResourceRefcount();
    // Drop the refcount taken in GetOrCreateQueryState().
    ReleaseQueryState(qs);
    return status;
  }

  exec_thread->Detach();
  return Status::OK();
}
// Registers the QueryState for 'query_ctx' with memory limit 'mem_limit' and returns
// it. The returned state carries the reference taken by GetOrCreateQueryState(). The
// query is expected not to be registered already; this is asserted with a DCHECK.
QueryState* QueryExecMgr::CreateQueryState(
    const TQueryCtx& query_ctx, int64_t mem_limit) {
  bool newly_created;
  QueryState* state = GetOrCreateQueryState(query_ctx, mem_limit, &newly_created);
  DCHECK(newly_created);
  return state;
}
// Looks up the QueryState registered under 'query_id'.
//
// Returns nullptr if no such query is registered. Otherwise the state's reference
// count is incremented before it is returned; ReleaseQueryState() is the counterpart
// that decrements it.
QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) {
  QueryState* found = nullptr;
  int new_refcnt;
  {
    // The lookup and the refcount increment both happen while the scoped map
    // reference is alive (presumably it guards the shard containing 'query_id').
    ScopedShardedMapRef<QueryState*> shard(query_id,
        &ExecEnv::GetInstance()->query_exec_mgr()->qs_map_);
    DCHECK(shard.get() != nullptr);

    auto entry = shard->find(query_id);
    if (entry == shard->end()) return nullptr;

    found = entry->second;
    new_refcnt = found->refcnt_.Add(1);
  }
  DCHECK(found != nullptr && new_refcnt > 0);
  VLOG_QUERY << "QueryState: query_id=" << PrintId(query_id)
             << " refcnt=" << new_refcnt;
  return found;
}
// Returns the QueryState for 'query_ctx', registering a new one with memory limit
// 'mem_limit' if the query is not in the map yet.
//
// '*created' is set to true if a new QueryState was registered by this call and to
// false if an existing one was found. In both cases the returned state's reference
// count has been incremented; ReleaseQueryState() decrements it again.
QueryState* QueryExecMgr::GetOrCreateQueryState(
    const TQueryCtx& query_ctx, int64_t mem_limit, bool* created) {
  const TUniqueId& query_id = query_ctx.query_id;
  QueryState* state = nullptr;
  int new_refcnt;
  {
    ScopedShardedMapRef<QueryState*> shard(query_id,
        &ExecEnv::GetInstance()->query_exec_mgr()->qs_map_);
    DCHECK(shard.get() != nullptr);

    auto entry = shard->find(query_id);
    const bool is_new = (entry == shard->end());
    if (is_new) {
      // First time this query is seen on this backend: count it and register a
      // fresh QueryState.
      ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTED->Increment(1);
      ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTING->Increment(1);
      state = new QueryState(query_ctx, mem_limit);
      shard->insert(make_pair(query_id, state));
    } else {
      state = entry->second;
    }
    *created = is_new;

    // Taken on behalf of the caller; decremented by ReleaseQueryState().
    new_refcnt = state->refcnt_.Add(1);
  }
  DCHECK(state != nullptr && new_refcnt > 0);
  return state;
}
// Thread entry point used by StartQuery(): drives the execution of 'qs' and, once it
// is finished, gives up the references that were held on behalf of the query.
void QueryExecMgr::ExecuteQueryHelper(QueryState* qs) {
  // Start the fragment instances; only if that succeeded, wait for them to complete
  // or to report errors.
  if (LIKELY(qs->StartFInstances())) {
    qs->MonitorFInstances();
  }

#if !(defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER))
  // tcmalloc and address or thread sanitizer cannot be used together, so the
  // allocator statistics are only available in non-sanitizer builds.
  if (FLAGS_log_mem_usage_interval > 0) {
    const uint64_t fragments_done =
        ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->GetValue();
    if (fragments_done % FLAGS_log_mem_usage_interval == 0) {
      // Log how much memory is currently being used by this impalad.
      constexpr int kStatsBufferLen = 2048;
      char stats[kStatsBufferLen];
      MallocExtension::instance()->GetStats(stats, kStatsBufferLen);
      LOG(INFO) << stats;
    }
  }
#endif

  // Give back the refcount taken in QueryState::Init().
  qs->ReleaseBackendResourceRefcount();
  // Give back the QueryState refcount taken in StartQuery().
  ReleaseQueryState(qs);
}
// Drops one reference on 'qs'. If that was the last reference, and nobody has taken a
// new one by the time the map entry is re-examined, the QueryState is removed from the
// map and deleted, and BACKEND_NUM_QUERIES_EXECUTING is decremented.
void QueryExecMgr::ReleaseQueryState(QueryState* qs) {
  DCHECK(qs != nullptr);
  // Copy the id by value: it is still needed after 'qs' may have been destroyed.
  TUniqueId query_id = qs->query_id();
  int32_t remaining = qs->refcnt_.Add(-1);
  // Nothing reachable through 'qs' may be touched from here on; another thread might
  // gc it out from under us.
  qs = nullptr;
  VLOG(2) << "ReleaseQueryState(): query_id=" << PrintId(query_id)
          << " refcnt=" << remaining + 1;
  DCHECK_GE(remaining, 0);
  if (remaining > 0) return;

  QueryState* doomed = nullptr;
  {
    ScopedShardedMapRef<QueryState*> shard(query_id,
        &ExecEnv::GetInstance()->query_exec_mgr()->qs_map_);
    DCHECK(shard.get() != nullptr);

    auto entry = shard->find(query_id);
    // Another thread might already have gc'd the entry.
    if (entry == shard->end()) return;

    doomed = entry->second;
    DCHECK(doomed->query_ctx().query_id == query_id);
    int32_t current_refcnt = doomed->refcnt_.Load();
    DCHECK_GE(current_refcnt, 0);
    // Another thread might have taken a new reference in the meantime; if so the
    // state must stay registered.
    if (current_refcnt > 0) return;

    shard->erase(entry);
  }

  // The entry is no longer in the map, so the object can be destroyed after the
  // scoped map reference above has gone out of scope.
  delete doomed;
  VLOG(1) << "ReleaseQueryState(): deleted query_id=" << PrintId(query_id);
  // BACKEND_NUM_QUERIES_EXECUTING is used to detect the backend being quiesced, so it
  // is only decremented once we are completely done with the query.
  ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTING->Increment(-1);
}