blob: fb1ed7381b8b1996609eee8c80bcc44122f07fab [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/exec-env.h"
#include <vector>
#include <boost/algorithm/string.hpp>
#include <gflags/gflags.h>
#include <gutil/strings/substitute.h>
#include "catalog/catalog-service-client-wrapper.h"
#include "codegen/llvm-codegen-cache.h"
#include "common/logging.h"
#include "common/object-pool.h"
#include "exec/kudu/kudu-util.h"
#include "kudu/rpc/service_if.h"
#include "rpc/rpc-mgr.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/client-cache.h"
#include "runtime/coordinator.h"
#include "runtime/hbase-table-factory.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/krpc-data-stream-mgr.h"
#include "runtime/lib-cache.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-exec-mgr.h"
#include "runtime/thread-resource-mgr.h"
#include "runtime/tmp-file-mgr.h"
#include "scheduling/admission-controller.h"
#include "scheduling/cluster-membership-mgr.h"
#include "scheduling/request-pool-service.h"
#include "scheduling/scheduler.h"
#include "service/control-service.h"
#include "service/data-stream-service.h"
#include "service/frontend.h"
#include "service/impala-server.h"
#include "statestore/statestore-subscriber-catalog.h"
#include "statestore/statestore-subscriber.h"
#include "util/cpu-info.h"
#include "util/debug-util.h"
#include "util/default-path-handlers.h"
#include "util/hdfs-bulk-ops.h"
#include "util/impalad-metrics.h"
#include "util/mem-info.h"
#include "util/memory-metrics.h"
#include "util/metrics.h"
#include "util/network-util.h"
#include "util/openssl-util.h"
#include "util/parse-util.h"
#include "util/periodic-counter-updater.h"
#include "util/pretty-printer.h"
#include "util/system-state-info.h"
#include "util/test-info.h"
#include "util/thread-pool.h"
#include "util/uid-util.h"
#include "util/webserver.h"
#include "common/names.h"
using boost::algorithm::join;
using kudu::rpc::GeneratedServiceIf;
using namespace strings;
DEFINE_string(catalog_service_host, "localhost",
"hostname where CatalogService is running");
DEFINE_bool(enable_webserver, true, "If true, debug webserver is enabled");
DEFINE_bool(ping_expose_webserver_url, true,
"If true, debug webserver url is exposed via PingImpalaService/PingImpalaHS2Service "
"RPC calls");
DEFINE_int32(state_store_subscriber_port, 23000,
"port where StatestoreSubscriberService should be exported");
DEFINE_int32(num_hdfs_worker_threads, 16,
"(Advanced) The number of threads in the global HDFS operation pool");
DEFINE_int32(max_concurrent_queries, 0,
"(Deprecated) This has been replaced with --admission_control_slots, which "
"better accounts for the higher parallelism of queries with mt_dop > 1. "
"If --admission_control_slots is not set, the value of --max_concurrent_queries "
"is used instead for backward compatibility.");
DEFINE_int32(admission_control_slots, 0,
"(Advanced) The maximum degree of parallelism to run queries with on this backend. "
"This determines the number of slots available to queries in admission control for "
"this backend. The degree of parallelism of the query determines the number of slots "
"that it needs. Defaults to number of cores / -num_cores for executors, and 8x that "
"value for dedicated coordinators).");
DEFINE_string(codegen_cache_capacity, "1GB",
"Specify the capacity of the codegen cache. If set to 0, codegen cache is disabled.");
DEFINE_bool(use_local_catalog, false,
"Use the on-demand metadata feature in coordinators. If this is set, coordinators "
"pull metadata as needed from catalogd and cache it locally. The cached metadata "
"gets evicted automatically under memory pressure or after an expiration time.");
DEFINE_int32(local_catalog_cache_mb, -1,
"If --use_local_catalog is enabled, configures the size of the catalog "
"cache within each impalad. If this is set to -1, the cache is auto-"
"configured to 60% of the configured Java heap size. Note that the Java "
"heap size is distinct from and typically smaller than the overall "
"Impala memory limit.");
DEFINE_int32(local_catalog_cache_expiration_s, 60 * 60,
"If --use_local_catalog is enabled, configures the expiration time "
"of the catalog cache within each impalad. Even if the configured "
"cache capacity has not been reached, items are removed from the cache "
"if they have not been accessed in this amount of time.");
DEFINE_int32(local_catalog_max_fetch_retries, 40,
"If --use_local_catalog is enabled, configures the maximum number of times "
"the frontend retries when fetching a metadata object from the impalad "
"coordinator's local catalog cache.");
DEFINE_int32(local_catalog_cache_concurrency_level, 4,
"If --use_local_catalog is enabled, configures the local cache's concurrency "
"level to avoid lock contention, the default value 4 is consistent with the "
"default value of the original cache.");
DECLARE_int32(num_threads_per_core);
DECLARE_int32(num_cores);
DECLARE_int32(krpc_port);
DECLARE_string(mem_limit);
DECLARE_bool(mem_limit_includes_jvm);
DECLARE_string(buffer_pool_limit);
DECLARE_string(buffer_pool_clean_pages_limit);
DECLARE_int64(min_buffer_size);
DECLARE_bool(is_coordinator);
DECLARE_bool(is_executor);
DECLARE_string(webserver_interface);
DECLARE_int32(webserver_port);
DECLARE_int64(tcmalloc_max_total_thread_cache_bytes);
DECLARE_string(admission_service_host);
DECLARE_int32(admission_service_port);
DECLARE_string(catalog_service_host);
DECLARE_int32(catalog_service_port);
DECLARE_string(state_store_host);
DECLARE_int32(state_store_port);
DECLARE_string(state_store_2_host);
DECLARE_int32(state_store_2_port);
DECLARE_string(debug_actions);
DECLARE_string(ssl_client_ca_certificate);
DEFINE_int32(backend_client_connection_num_retries, 3, "Retry backend connections.");
// When network is unstable, TCP will retry and sending could take longer time.
// Choose 5 minutes as default timeout because we don't want RPC timeout be triggered
// by intermittent network issue. The timeout should not be too long either, otherwise
// query could hang for a while before it's cancelled.
DEFINE_int32(backend_client_rpc_timeout_ms, 300000, "(Advanced) The underlying "
"TSocket send/recv timeout in milliseconds for a backend client RPC. ");
DEFINE_int32(catalog_client_connection_num_retries, 10, "The number of times connections "
"or RPCs to the catalog should be retried.");
DEFINE_int32(catalog_client_rpc_timeout_ms, 36000000, "(Advanced) The underlying TSocket "
"send/recv timeout in milliseconds for a catalog client RPC. The default is 10 hours."
" Operations take longer than this are usually abnormal and hanging.");
DEFINE_int32(catalog_lightweight_rpc_timeout_ms, 1800000, "(Advanced) The underlying "
"TSocket send/recv timeout in milliseconds for a lightweight catalog RPC which "
"shouldn't take long in catalogd, e.g. fetching db/table list. The default is 30 "
"minutes which is long enough to tolerate TCP timeout due to retransmission.");
DEFINE_int32(catalog_client_rpc_retry_interval_ms, 3000, "(Advanced) The time to wait "
"before retrying when the catalog RPC client fails to connect to catalogd or when "
"RPCs to the catalogd fail.");
DEFINE_int32(metrics_webserver_port, 0,
"If non-zero, the port to run the metrics webserver on, which exposes the /metrics, "
"/jsonmetrics, /metrics_prometheus, and /healthz endpoints without authentication "
"enabled.");
DEFINE_string(metrics_webserver_interface, "",
"Interface to start metrics webserver on. If blank, webserver binds to 0.0.0.0");
const static string DEFAULT_FS = "fs.defaultFS";
// The max percentage that the codegen cache can take from the total process memory.
// The value is set to 10%.
const double MAX_CODEGEN_CACHE_MEM_PERCENT = 0.1;
// The multiplier for how many queries a dedicated coordinator can run compared to an
// executor. This is only effective when using non-default settings for executor groups
// and the absolute value can be overridden by the '--admission_control_slots' flag.
const static int COORDINATOR_CONCURRENCY_MULTIPLIER = 8;
namespace {
using namespace impala;
/// Helper method to forward cluster membership updates to the frontend.
/// For additional details see comments for PopulateExecutorMembershipRequest()
/// in cluster-membership-mgr.cc
void SendClusterMembershipToFrontend(ClusterMembershipMgr::SnapshotPtr& snapshot,
const vector<TExecutorGroupSet>& expected_exec_group_sets, Frontend* frontend) {
TUpdateExecutorMembershipRequest update_req;
PopulateExecutorMembershipRequest(snapshot, expected_exec_group_sets, update_req);
Status status = frontend->UpdateExecutorMembership(update_req);
if (!status.ok()) {
LOG(WARNING) << "Error updating frontend membership snapshot: " << status.GetDetail();
}
}
}
namespace impala {
ExecEnv* ExecEnv::exec_env_ = nullptr;
ExecEnv::ExecEnv(bool external_fe)
: ExecEnv(FLAGS_krpc_port, FLAGS_state_store_subscriber_port, FLAGS_webserver_port,
FLAGS_state_store_host, FLAGS_state_store_port, FLAGS_state_store_2_host,
FLAGS_state_store_2_port, external_fe) {}
ExecEnv::ExecEnv(int krpc_port, int subscriber_port, int webserver_port,
const string& statestore_host, int statestore_port,
const std::string& statestore2_host, int statestore2_port, bool external_fe)
: obj_pool_(new ObjectPool),
metrics_(new MetricGroup("impala-metrics")),
// Create the CatalogServiceClientCache with num_retries = 1 and wait_ms = 0.
// Connections are still retried, but the retry mechanism is driven by
// DoRpcWithRetry. Clients should always use DoRpcWithRetry rather than DoRpc to
// ensure that both RPCs and connections are retried.
catalogd_client_cache_(new CatalogServiceClientCache(1, 0,
FLAGS_catalog_client_rpc_timeout_ms, FLAGS_catalog_client_rpc_timeout_ms, "",
!FLAGS_ssl_client_ca_certificate.empty())),
catalogd_lightweight_req_client_cache_(new CatalogServiceClientCache(1, 0,
FLAGS_catalog_lightweight_rpc_timeout_ms,
FLAGS_catalog_lightweight_rpc_timeout_ms, "",
!FLAGS_ssl_client_ca_certificate.empty())),
htable_factory_(new HBaseTableFactory()),
disk_io_mgr_(new io::DiskIoMgr()),
webserver_(new Webserver(FLAGS_webserver_interface, webserver_port, metrics_.get())),
pool_mem_trackers_(new PoolMemTrackerRegistry),
thread_mgr_(new ThreadResourceMgr),
tmp_file_mgr_(new TmpFileMgr),
frontend_(external_fe ? nullptr : new Frontend()),
async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
query_exec_mgr_(new QueryExecMgr()),
rpc_metrics_(metrics_->GetOrCreateChildGroup("rpc")),
enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
external_fe_(external_fe),
configured_backend_address_(MakeNetworkAddress(FLAGS_hostname, krpc_port)) {
UUIDToUniqueIdPB(boost::uuids::random_generator()(), &backend_id_);
// Resolve hostname to IP address.
ABORT_IF_ERROR(HostnameToIpAddr(FLAGS_hostname, &ip_address_));
// KRPC relies on resolved IP address.
rpc_mgr_.reset(new RpcMgr(IsInternalTlsConfigured()));
krpc_address_ = MakeNetworkAddressPB(
ip_address_, krpc_port, backend_id_, rpc_mgr_->GetUdsAddressUniqueId());
stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get()));
request_pool_service_.reset(new RequestPoolService(metrics_.get()));
TNetworkAddress subscriber_address =
MakeNetworkAddress(FLAGS_hostname, subscriber_port);
TNetworkAddress statestore_address =
MakeNetworkAddress(statestore_host, statestore_port);
TNetworkAddress statestore2_address =
MakeNetworkAddress(statestore2_host, statestore2_port);
catalogd_address_ = std::make_shared<const TNetworkAddress>(
MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port));
active_catalogd_version_checker_.reset(new ActiveCatalogdVersionChecker());
TStatestoreSubscriberType::type subscriber_type = TStatestoreSubscriberType::UNKNOWN;
if (FLAGS_is_coordinator) {
subscriber_type = FLAGS_is_executor ? TStatestoreSubscriberType::COORDINATOR_EXECUTOR
: TStatestoreSubscriberType::COORDINATOR;
} else if (FLAGS_is_executor) {
subscriber_type = TStatestoreSubscriberType::EXECUTOR;
}
// Set StatestoreSubscriber::subscriber_id as hostname + krpc_port.
statestore_subscriber_.reset(new StatestoreSubscriber(
Substitute("impalad@$0:$1", FLAGS_hostname, FLAGS_krpc_port), subscriber_address,
statestore_address, statestore2_address, metrics_.get(), subscriber_type));
if (FLAGS_is_coordinator) {
hdfs_op_thread_pool_.reset(
CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024));
StatestoreSubscriber::UpdateCatalogdCallback update_catalogd_cb =
bind<void>(mem_fn(&ExecEnv::UpdateActiveCatalogd), this, _1, _2, _3);
statestore_subscriber_->AddUpdateCatalogdTopic(update_catalogd_cb);
}
StatestoreSubscriber::CompleteRegistrationCallback complete_registration_cb =
bind<void>(mem_fn(&ExecEnv::SetStatestoreRegistrationCompleted), this);
statestore_subscriber_->AddCompleteRegistrationTopic(complete_registration_cb);
if (FLAGS_is_coordinator && !AdmissionServiceEnabled()) {
// We only need a Scheduler if we're performing admission control locally, i.e. if
// this is a coordinator and there isn't an admissiond.
scheduler_.reset(new Scheduler(metrics_.get(), request_pool_service_.get()));
}
cluster_membership_mgr_.reset(new ClusterMembershipMgr(
PrintId(backend_id_), statestore_subscriber_.get(), metrics_.get()));
// TODO: Consider removing AdmissionController from executor only impalads.
admission_controller_.reset(new AdmissionController(cluster_membership_mgr_.get(),
statestore_subscriber_.get(), request_pool_service_.get(), metrics_.get(),
scheduler_.get(), pool_mem_trackers_.get(), configured_backend_address_));
if (FLAGS_metrics_webserver_port > 0) {
metrics_webserver_.reset(new Webserver(FLAGS_metrics_webserver_interface,
FLAGS_metrics_webserver_port, metrics_.get(), Webserver::AuthMode::NONE));
}
exec_env_ = this;
}
ExecEnv::~ExecEnv() {
if (buffer_reservation_ != nullptr) buffer_reservation_->Close();
if (rpc_mgr_ != nullptr) rpc_mgr_->Shutdown();
disk_io_mgr_.reset(); // Need to tear down before mem_tracker_.
}
Status ExecEnv::InitForFeSupport() {
mem_tracker_.reset(new MemTracker(-1, "Process"));
is_fe_tests_ = true;
return Status::OK();
}
Status ExecEnv::Init() {
LOG(INFO) << "Initializing impalad with backend uuid: " << PrintId(backend_id_);
// Initialize thread pools
if (FLAGS_is_coordinator) {
RETURN_IF_ERROR(hdfs_op_thread_pool_->Init());
}
RETURN_IF_ERROR(async_rpc_pool_->Init());
int64_t bytes_limit;
RETURN_IF_ERROR(ChooseProcessMemLimit(&bytes_limit));
// Need to register JVM metrics first so that we can use them to compute the buffer pool
// limit.
JvmMemoryMetric::InitMetrics(metrics_.get());
if (!BitUtil::IsPowerOf2(FLAGS_min_buffer_size)) {
return Status(Substitute(
"--min_buffer_size must be a power-of-two: $0", FLAGS_min_buffer_size));
}
// The bytes limit we want to size everything else as a fraction of, excluding the
// JVM.
admit_mem_limit_ = bytes_limit;
if (FLAGS_mem_limit_includes_jvm) {
// The JVM max heap size is static and therefore known at this point. Other categories
// of JVM memory consumption are much smaller and dynamic so it is simpler not to
// include them here.
int64_t jvm_max_heap_size = JvmMemoryMetric::HEAP_MAX_USAGE->GetValue();
admit_mem_limit_ -= jvm_max_heap_size;
if (admit_mem_limit_ <= 0) {
return Status(
Substitute("Invalid combination of --mem_limit_includes_jvm and JVM max heap "
"size $0, which must be smaller than process memory limit $1",
jvm_max_heap_size, bytes_limit));
}
}
bool is_percent = false;
int64_t codegen_cache_capacity =
ParseUtil::ParseMemSpec(FLAGS_codegen_cache_capacity, &is_percent, 0);
if (codegen_cache_capacity > 0) {
// If codegen_cache_capacity is larger than 0, the number should not be a percentage.
DCHECK(!is_percent);
int64_t codegen_cache_limit = admit_mem_limit_ * MAX_CODEGEN_CACHE_MEM_PERCENT;
DCHECK(codegen_cache_limit > 0);
if (codegen_cache_capacity > codegen_cache_limit) {
LOG(INFO) << "CodeGen Cache capacity changed from "
<< PrettyPrinter::Print(codegen_cache_capacity, TUnit::BYTES) << " to "
<< PrettyPrinter::Print(codegen_cache_limit, TUnit::BYTES)
<< " due to reaching the limit.";
codegen_cache_capacity = codegen_cache_limit;
}
codegen_cache_.reset(new CodeGenCache(metrics_.get()));
RETURN_IF_ERROR(codegen_cache_->Init(codegen_cache_capacity));
LOG(INFO) << "CodeGen Cache initialized with capacity "
<< PrettyPrinter::Print(codegen_cache_capacity, TUnit::BYTES);
// Preserve the memory for codegen cache.
admit_mem_limit_ -= codegen_cache_capacity;
DCHECK_GT(admit_mem_limit_, 0);
} else {
LOG(INFO) << "CodeGen Cache is disabled.";
}
LOG(INFO) << "Admit memory limit: "
<< PrettyPrinter::Print(admit_mem_limit_, TUnit::BYTES);
int64_t buffer_pool_limit = ParseUtil::ParseMemSpec(FLAGS_buffer_pool_limit,
&is_percent, admit_mem_limit_);
if (buffer_pool_limit <= 0) {
return Status(Substitute("Invalid --buffer_pool_limit value, must be a "
"positive bytes value or percentage: $0",
FLAGS_buffer_pool_limit));
}
buffer_pool_limit = BitUtil::RoundDown(buffer_pool_limit, FLAGS_min_buffer_size);
LOG(INFO) << "Buffer pool limit: "
<< PrettyPrinter::Print(buffer_pool_limit, TUnit::BYTES);
int64_t clean_pages_limit = ParseUtil::ParseMemSpec(FLAGS_buffer_pool_clean_pages_limit,
&is_percent, buffer_pool_limit);
if (clean_pages_limit <= 0) {
return Status(Substitute("Invalid --buffer_pool_clean_pages_limit value, must be a "
"positive bytes value or percentage: $0",
FLAGS_buffer_pool_clean_pages_limit));
}
InitBufferPool(FLAGS_min_buffer_size, buffer_pool_limit, clean_pages_limit);
admission_slots_ = CpuInfo::num_cores();
if (FLAGS_admission_control_slots > 0) {
if (FLAGS_max_concurrent_queries > 0) {
LOG(WARNING) << "Ignored --max_concurrent_queries, --admission_control_slots was "
<< "set and takes precedence.";
}
admission_slots_ = FLAGS_admission_control_slots;
} else if (FLAGS_max_concurrent_queries > 0) {
admission_slots_ = FLAGS_max_concurrent_queries;
} else if (FLAGS_is_coordinator && !FLAGS_is_executor) {
// By default we assume that dedicated coordinators can handle more queries than
// executors.
admission_slots_ *= COORDINATOR_CONCURRENCY_MULTIPLIER;
}
InitSystemStateInfo();
if (enable_webserver_) {
RETURN_IF_ERROR(metrics_->RegisterHttpHandlers(webserver_.get()));
}
if (FLAGS_metrics_webserver_port > 0) {
RETURN_IF_ERROR(metrics_->RegisterHttpHandlers(metrics_webserver_.get()));
RETURN_IF_ERROR(metrics_webserver_->Start());
}
catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
catalogd_lightweight_req_client_cache_->InitMetrics(
metrics_.get(), "catalog.server", "for-lightweight-rpc");
RETURN_IF_ERROR(RegisterMemoryMetrics(
metrics_.get(), true, buffer_reservation_.get(), buffer_pool_.get()));
// Initialize impalad metrics
ImpaladMetrics::CreateMetrics(
exec_env_->metrics()->GetOrCreateChildGroup("impala-server"));
InitMemTracker(bytes_limit);
// Initializes the RPCMgr, ControlServices and DataStreamServices.
// Initialization needs to happen in the following order due to dependencies:
// - RPC manager, DataStreamService and DataStreamManager.
RETURN_IF_ERROR(rpc_mgr_->Init(krpc_address_));
control_svc_.reset(new ControlService(rpc_metrics_));
RETURN_IF_ERROR(control_svc_->Init());
data_svc_.reset(new DataStreamService(rpc_metrics_));
RETURN_IF_ERROR(data_svc_->Init());
RETURN_IF_ERROR(stream_mgr_->Init(data_svc_->mem_tracker()));
// Bump thread cache to 1GB to reduce contention for TCMalloc central
// list's spinlock.
if (FLAGS_tcmalloc_max_total_thread_cache_bytes == 0) {
FLAGS_tcmalloc_max_total_thread_cache_bytes = 1 << 30;
}
#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
// Change the total TCMalloc thread cache size if necessary.
if (FLAGS_tcmalloc_max_total_thread_cache_bytes > 0 &&
!MallocExtension::instance()->SetNumericProperty(
"tcmalloc.max_total_thread_cache_bytes",
FLAGS_tcmalloc_max_total_thread_cache_bytes)) {
return Status("Failed to change TCMalloc total thread cache size.");
}
// A MemTracker for TCMalloc overhead which is the difference between the physical bytes
// reserved (TcmallocMetric::PHYSICAL_BYTES_RESERVED) and the bytes in use
// (TcmallocMetrics::BYTES_IN_USE). This overhead accounts for all the cached freelists
// used by TCMalloc.
IntGauge* negated_bytes_in_use = obj_pool_->Add(new NegatedGauge(
MakeTMetricDef("negated_tcmalloc_bytes_in_use", TMetricKind::GAUGE, TUnit::BYTES),
TcmallocMetric::BYTES_IN_USE));
vector<IntGauge*> overhead_metrics;
overhead_metrics.push_back(negated_bytes_in_use);
overhead_metrics.push_back(TcmallocMetric::PHYSICAL_BYTES_RESERVED);
SumGauge* tcmalloc_overhead = obj_pool_->Add(new SumGauge(
MakeTMetricDef("tcmalloc_overhead", TMetricKind::GAUGE, TUnit::BYTES),
overhead_metrics));
obj_pool_->Add(
new MemTracker(tcmalloc_overhead, -1, "TCMalloc Overhead", mem_tracker_.get()));
#endif
mem_tracker_->RegisterMetrics(metrics_.get(), "mem-tracker.process");
RETURN_IF_ERROR(disk_io_mgr_->Init());
// Start services in order to ensure that dependencies between them are met
if (enable_webserver_) {
AddDefaultUrlCallbacks(webserver_.get(), metrics_.get(), mem_tracker_.get());
RETURN_IF_ERROR(webserver_->Start());
} else {
LOG(INFO) << "Not starting webserver";
}
RETURN_IF_ERROR(cluster_membership_mgr_->Init());
if (FLAGS_is_coordinator && frontend_ != nullptr) {
cluster_membership_mgr_->RegisterUpdateCallbackFn(
[this](ClusterMembershipMgr::SnapshotPtr snapshot) {
SendClusterMembershipToFrontend(snapshot,
this->cluster_membership_mgr()->GetExpectedExecGroupSets(),
this->frontend());
});
}
RETURN_IF_ERROR(admission_controller_->Init());
RETURN_IF_ERROR(InitHadoopConfig());
return Status::OK();
}
Status ExecEnv::InitHadoopConfig() {
if (frontend_ != nullptr) {
// Get the fs.defaultFS value set in core-site.xml and assign it to
// configured_defaultFs
TGetHadoopConfigRequest config_request;
config_request.__set_name(DEFAULT_FS);
TGetHadoopConfigResponse config_response;
RETURN_IF_ERROR(frontend_->GetHadoopConfig(config_request, &config_response));
if (config_response.__isset.value) {
default_fs_ = config_response.value;
} else {
default_fs_ = "hdfs://";
}
}
return Status::OK();
}
Status ExecEnv::StartStatestoreSubscriberService() {
LOG(INFO) << "Starting statestore subscriber service";
// Must happen after all topic registrations / callbacks are done
if (statestore_subscriber_.get() != nullptr) {
Status status = statestore_subscriber_->Start();
if (!status.ok()) {
status.AddDetail("Statestore subscriber did not start up.");
return status;
}
if (statestore_subscriber_->IsRegistered()) SetStatestoreRegistrationCompleted();
}
return Status::OK();
}
Status ExecEnv::StartKrpcService() {
LOG(INFO) << "Starting KRPC service";
RETURN_IF_ERROR(rpc_mgr_->StartServices());
return Status::OK();
}
void ExecEnv::SetImpalaServer(ImpalaServer* server) {
DCHECK(impala_server_ == nullptr) << "ImpalaServer already registered";
DCHECK(server != nullptr);
impala_server_ = server;
// Register the ImpalaServer with the cluster membership manager
cluster_membership_mgr_->SetLocalBeDescFn([server]() {
return server->GetLocalBackendDescriptor();
});
if (FLAGS_is_coordinator) {
cluster_membership_mgr_->RegisterUpdateCallbackFn(
[server](ClusterMembershipMgr::SnapshotPtr snapshot) {
std::unordered_set<BackendIdPB> current_backend_set;
for (const auto& it : snapshot->current_backends) {
current_backend_set.insert(it.second.backend_id());
}
server->CancelQueriesOnFailedBackends(current_backend_set);
});
}
if (FLAGS_is_executor && !TestInfo::is_test()) {
cluster_membership_mgr_->RegisterUpdateCallbackFn(
[](ClusterMembershipMgr::SnapshotPtr snapshot) {
std::unordered_set<BackendIdPB> current_backend_set;
for (const auto& it : snapshot->current_backends) {
current_backend_set.insert(it.second.backend_id());
}
ExecEnv::GetInstance()->query_exec_mgr()->CancelQueriesForFailedCoordinators(
current_backend_set);
});
}
}
void ExecEnv::InitTcMallocAggressiveDecommit() {
#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
// Aggressive decommit is required so that unused pages in the TCMalloc page heap are
// not backed by physical pages and do not contribute towards memory consumption.
// Enable it in TCMalloc before InitBufferPool().
MallocExtension::instance()->SetNumericProperty(
"tcmalloc.aggressive_memory_decommit", 1);
#endif
}
void ExecEnv::InitBufferPool(int64_t min_buffer_size, int64_t capacity,
int64_t clean_pages_limit) {
InitTcMallocAggressiveDecommit();
buffer_pool_.reset(
new BufferPool(metrics_.get(), min_buffer_size, capacity, clean_pages_limit));
buffer_reservation_.reset(new ReservationTracker());
buffer_reservation_->InitRootTracker(nullptr, capacity);
}
void ExecEnv::InitMemTracker(int64_t bytes_limit) {
DCHECK(AggregateMemoryMetrics::TOTAL_USED != nullptr) << "Memory metrics not reg'd";
mem_tracker_.reset(
new MemTracker(AggregateMemoryMetrics::TOTAL_USED, bytes_limit, "Process"));
if (FLAGS_mem_limit_includes_jvm) {
// Add JVM metrics that should count against the process memory limit.
obj_pool_->Add(new MemTracker(
JvmMemoryMetric::HEAP_MAX_USAGE, -1, "JVM: max heap size", mem_tracker_.get()));
obj_pool_->Add(new MemTracker(JvmMemoryMetric::NON_HEAP_COMMITTED, -1,
"JVM: non-heap committed", mem_tracker_.get()));
}
if (buffer_pool_ != nullptr) {
// Tests can create multiple buffer pools, meaning that the metrics are not usable.
if (!TestInfo::is_test()) {
// Add BufferPool MemTrackers for cached memory that is not tracked against queries
// but is included in process memory consumption.
obj_pool_->Add(new MemTracker(BufferPoolMetric::FREE_BUFFER_BYTES, -1,
"Buffer Pool: Free Buffers", mem_tracker_.get()));
obj_pool_->Add(new MemTracker(BufferPoolMetric::CLEAN_PAGE_BYTES, -1,
"Buffer Pool: Clean Pages", mem_tracker_.get()));
// Also need a MemTracker for unused reservations as a negative value. Unused
// reservations are counted against queries but not against the process memory
// consumption. This accounts for that difference.
IntGauge* negated_unused_reservation = obj_pool_->Add(new NegatedGauge(
MakeTMetricDef("negated_unused_reservation", TMetricKind::GAUGE, TUnit::BYTES),
BufferPoolMetric::UNUSED_RESERVATION_BYTES));
obj_pool_->Add(new MemTracker(negated_unused_reservation, -1,
"Buffer Pool: Unused Reservation", mem_tracker_.get()));
}
mem_tracker_->AddGcFunction([buffer_pool=buffer_pool_.get()] (int64_t bytes_to_free)
{
// Only free memory in excess of the current reservation - the buffer pool
// does not need to give up cached memory that is offset by an unused reservation.
int64_t reserved = BufferPoolMetric::RESERVED->GetValue();
int64_t allocated_from_sys = BufferPoolMetric::SYSTEM_ALLOCATED->GetValue();
if (reserved >= allocated_from_sys) return;
buffer_pool->ReleaseMemory(min(bytes_to_free, allocated_from_sys - reserved));
});
}
}
void ExecEnv::InitSystemStateInfo() {
system_state_info_.reset(new SystemStateInfo());
PeriodicCounterUpdater::RegisterUpdateFunction([s = system_state_info_.get()]() {
s->CaptureSystemStateSnapshot();
}, true);
}
Status ExecEnv::GetKuduClient(const vector<string>& master_addresses,
kudu::client::sp::shared_ptr<kudu::client::KuduClient>* client) {
string master_addr_concat = join(master_addresses, ",");
lock_guard<SpinLock> l(kudu_client_map_lock_);
auto kudu_client_map_it = kudu_client_map_.find(master_addr_concat);
if (kudu_client_map_it == kudu_client_map_.end()) {
// KuduClient doesn't exist, create it.
LOG(INFO) << "Creating a new KuduClient for masters=" << master_addr_concat;
kudu::client::sp::shared_ptr<kudu::client::KuduClient> kudu_client;
RETURN_IF_ERROR(CreateKuduClient(master_addresses, &kudu_client));
kudu_client_map_.insert(make_pair(master_addr_concat, kudu_client));
*client = kudu_client;
} else {
// Return existing KuduClient
*client = kudu_client_map_it->second;
}
return Status::OK();
}
bool ExecEnv::AdmissionServiceEnabled() const {
return !FLAGS_admission_service_host.empty();
}
Status ExecEnv::GetAdmissionServiceAddress(
NetworkAddressPB& admission_service_address) const {
if (AdmissionServiceEnabled()) {
IpAddr ip;
RETURN_IF_ERROR(HostnameToIpAddr(FLAGS_admission_service_host, &ip));
// TODO: get BackendId of admissiond in global admission control mode.
// Use admissiond's IP address as unique ID for UDS now.
admission_service_address = MakeNetworkAddressPB(
ip, FLAGS_admission_service_port, UdsAddressUniqueIdPB::IP_ADDRESS);
}
return Status::OK();
}
void ExecEnv::UpdateActiveCatalogd(bool is_registration_reply,
int64 active_catalogd_version, const TCatalogRegistration& catalogd_registration) {
std::lock_guard<std::mutex> l(catalogd_address_lock_);
if (!active_catalogd_version_checker_->CheckActiveCatalogdVersion(
is_registration_reply, active_catalogd_version)) {
return;
}
if (catalogd_registration.address.hostname.empty()
|| catalogd_registration.address.port == 0) {
return;
}
if (catalogd_registration.protocol
!= statestore_subscriber_->GetCatalogProtocolVersion()) {
// This should not happen now. Since we bump up catalog and statestore service
// versions at same time, coordinators must have same protocol versions as catalogd
// if they can join one cluster. But in future, it may happen if the two protocol
// versions are not updated at same time for some reason.
// Note that Impalad coordinators must wait for their local replica of the catalog to
// be initialized from the statestore prior to opening up client ports. If the
// coordinator has incompatible protocol version from catalogd, it should not open
// client ports.
LOG(INFO) << "Protocol version of Catalog service does not match the protocol "
<< "version of registered catalogd: "
<< statestore_subscriber_->GetCatalogProtocolVersion()
<< " vs. " << catalogd_registration.protocol;
}
DCHECK(catalogd_address_.get() != nullptr);
if (!is_catalogd_address_metric_set_) {
// At least set the metric once.
is_catalogd_address_metric_set_ = true;
ImpaladMetrics::ACTIVE_CATALOGD_ADDRESS->SetValue(
TNetworkAddressToString(catalogd_registration.address));
}
bool is_matching = (catalogd_registration.address.port == catalogd_address_->port
&& catalogd_registration.address.hostname == catalogd_address_->hostname);
if (!is_matching) {
RETURN_VOID_IF_ERROR(
DebugAction(FLAGS_debug_actions, "IGNORE_NEW_ACTIVE_CATALOGD_ADDR"));
LOG(INFO) << "The address of Catalog service is changed from "
<< TNetworkAddressToString(*catalogd_address_.get())
<< " to " << TNetworkAddressToString(catalogd_registration.address);
catalogd_address_ =
std::make_shared<const TNetworkAddress>(catalogd_registration.address);
ImpaladMetrics::ACTIVE_CATALOGD_ADDRESS->SetValue(
TNetworkAddressToString(catalogd_registration.address));
}
}
std::shared_ptr<const TNetworkAddress> ExecEnv::GetCatalogdAddress() const {
std::lock_guard<std::mutex> l(catalogd_address_lock_);
DCHECK(catalogd_address_.get() != nullptr);
std::shared_ptr<const TNetworkAddress> address = catalogd_address_;
return address;
}
} // namespace impala