| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| |
| #include "runtime/query-exec-mgr.h" |
| |
| #include <gperftools/malloc_extension.h> |
| #include <gutil/strings/substitute.h> |
| #include <boost/thread/locks.hpp> |
| #include <boost/thread/lock_guard.hpp> |
| |
| #include "common/logging.h" |
| #include "runtime/query-state.h" |
| #include "runtime/fragment-instance-state.h" |
| #include "runtime/exec-env.h" |
| #include "runtime/mem-tracker.h" |
| #include "util/uid-util.h" |
| #include "util/thread.h" |
| #include "util/impalad-metrics.h" |
| #include "util/debug-util.h" |
| |
| using boost::lock_guard; |
| using namespace impala; |
| |
// Controls periodic logging of process-wide malloc statistics: when positive,
// ExecFInstance() logs the allocator stats each time the fragment counter is a
// multiple of this value. The default of 0 disables the logging.
// TODO: this logging should go into a per query log.
DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory usage "
    "every log_mem_usage_interval'th fragment completion.");
| |
| Status QueryExecMgr::StartFInstance(const TExecPlanFragmentParams& params) { |
| TUniqueId instance_id = params.fragment_instance_ctx.fragment_instance_id; |
| VLOG_QUERY << "StartFInstance() instance_id=" << PrintId(instance_id) |
| << " coord=" << params.query_ctx.coord_address; |
| |
| // Starting a new fragment instance creates a thread and consumes a non-trivial |
| // amount of memory. If we are already starved for memory, cancel the instance as |
| // early as possible to avoid digging the hole deeper. |
| MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker(); |
| if (process_mem_tracker->LimitExceeded()) { |
| string msg = Substitute("Instance $0 of plan fragment $1 could not " |
| "start because the backend Impala daemon is over its memory limit", |
| PrintId(instance_id), params.fragment_ctx.fragment.display_name); |
| return process_mem_tracker->MemLimitExceeded(NULL, msg, 0); |
| } |
| |
| QueryState* qs = nullptr; |
| int refcnt; |
| { |
| lock_guard<mutex> l(qs_map_lock_); |
| TUniqueId query_id = params.query_ctx.query_id; |
| auto it = qs_map_.find(query_id); |
| if (it == qs_map_.end()) { |
| // register new QueryState |
| qs = new QueryState(params.query_ctx); |
| qs_map_.insert(make_pair(query_id, qs)); |
| VLOG_QUERY << "new QueryState: query_id=" << query_id; |
| } else { |
| qs = it->second; |
| } |
| // decremented at the end of ExecFInstance() |
| refcnt = qs->refcnt_.Add(1); |
| } |
| DCHECK(qs != nullptr && qs->refcnt_.Load() > 0); |
| VLOG_QUERY << "QueryState: query_id=" << params.query_ctx.query_id |
| << " refcnt=" << refcnt; |
| |
| DCHECK(params.__isset.fragment_ctx); |
| DCHECK(params.__isset.fragment_instance_ctx); |
| FragmentInstanceState* fis = qs->obj_pool()->Add( |
| new FragmentInstanceState(qs, params.fragment_ctx, params.fragment_instance_ctx, |
| params.query_ctx.desc_tbl)); |
| // register instance before returning so that async Cancel() calls can |
| // find the instance |
| qs->RegisterFInstance(fis); |
| // start new thread to execute instance |
| Thread t("query-exec-mgr", |
| Substitute("exec-fragment-instance-$0", PrintId(instance_id)), |
| &QueryExecMgr::ExecFInstance, this, fis); |
| t.Detach(); |
| |
| ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(1L); |
| ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L); |
| return Status::OK(); |
| } |
| |
| void QueryExecMgr::ExecFInstance(FragmentInstanceState* fis) { |
| fis->Exec(); |
| |
| ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(-1L); |
| VLOG_QUERY << "Instance completed. instance_id=" << PrintId(fis->instance_id()); |
| |
| #ifndef ADDRESS_SANITIZER |
| // tcmalloc and address sanitizer can not be used together |
| if (FLAGS_log_mem_usage_interval > 0) { |
| uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->value(); |
| if (num_complete % FLAGS_log_mem_usage_interval == 0) { |
| char buf[2048]; |
| // This outputs how much memory is currently being used by this impalad |
| MallocExtension::instance()->GetStats(buf, 2048); |
| LOG(INFO) << buf; |
| } |
| } |
| #endif |
| |
| // decrement refcount taken in StartFInstance() |
| ReleaseQueryState(fis->query_state()); |
| } |
| |
| QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) { |
| VLOG_QUERY << "GetQueryState(): query_id=" << PrintId(query_id); |
| lock_guard<mutex> l(qs_map_lock_); |
| auto it = qs_map_.find(query_id); |
| if (it == qs_map_.end()) return nullptr; |
| QueryState* qs = it->second; |
| int32_t cnt = qs->refcnt_.Add(1); |
| DCHECK_GT(cnt, 0); |
| return qs; |
| } |
| |
| void QueryExecMgr::ReleaseQueryState(QueryState* qs) { |
| DCHECK(qs != nullptr); |
| TUniqueId query_id; |
| { |
| int32_t cnt = qs->refcnt_.Add(-1); |
| VLOG_QUERY << "ReleaseQueryState(): query_id=" << PrintId(qs->query_id()) |
| << " refcnt=" << cnt + 1; |
| DCHECK_GE(cnt, 0); |
| if (cnt > 0) return; |
| // don't reference anything from 'qs' beyond this point, 'qs' might get |
| // gc'd out from under us |
| query_id = qs->query_id(); |
| } |
| |
| { |
| // for now, gc right away |
| lock_guard<mutex> l(qs_map_lock_); |
| auto it = qs_map_.find(query_id); |
| // someone else might have gc'd the entry |
| if (it == qs_map_.end()) return; |
| qs = it->second; |
| DCHECK_EQ(qs->query_ctx().query_id, query_id); |
| int32_t cnt = qs->refcnt_.Load(); |
| DCHECK_GE(cnt, 0); |
| // someone else might have increased the refcnt in the meantime |
| if (cnt > 0) return; |
| size_t num_erased = qs_map_.erase(qs->query_ctx().query_id); |
| DCHECK_EQ(num_erased, 1); |
| } |
| // TODO: send final status report during gc, but do this from a different thread |
| delete qs; |
| } |
| |