| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "runtime/exec-env.h" |
| |
| #include <vector> |
| |
| #include <boost/algorithm/string.hpp> |
| #include <gflags/gflags.h> |
| #include <gutil/strings/substitute.h> |
| |
| #include "common/logging.h" |
| #include "gen-cpp/CatalogService.h" |
| #include "gen-cpp/ImpalaInternalService.h" |
| #include "runtime/backend-client.h" |
| #include "runtime/client-cache.h" |
| #include "runtime/coordinator.h" |
| #include "runtime/data-stream-mgr.h" |
| #include "runtime/disk-io-mgr.h" |
| #include "runtime/hbase-table-factory.h" |
| #include "runtime/hdfs-fs-cache.h" |
| #include "runtime/lib-cache.h" |
| #include "runtime/mem-tracker.h" |
| #include "runtime/thread-resource-mgr.h" |
| #include "runtime/query-exec-mgr.h" |
| #include "runtime/tmp-file-mgr.h" |
| #include "scheduling/request-pool-service.h" |
| #include "scheduling/simple-scheduler.h" |
| #include "service/frontend.h" |
| #include "statestore/statestore-subscriber.h" |
| #include "util/debug-util.h" |
| #include "util/debug-util.h" |
| #include "util/default-path-handlers.h" |
| #include "util/hdfs-bulk-ops.h" |
| #include "util/mem-info.h" |
| #include "util/mem-info.h" |
| #include "util/memory-metrics.h" |
| #include "util/memory-metrics.h" |
| #include "util/metrics.h" |
| #include "util/network-util.h" |
| #include "util/parse-util.h" |
| #include "util/pretty-printer.h" |
| #include "util/thread-pool.h" |
| #include "util/webserver.h" |
| |
| #include "common/names.h" |
| |
| using boost::algorithm::is_any_of; |
| using boost::algorithm::split; |
| using boost::algorithm::to_lower; |
| using boost::algorithm::token_compress_on; |
| using namespace strings; |
| |
| DEFINE_bool(use_statestore, true, |
| "Use an external statestore process to manage cluster membership"); |
| DEFINE_string(catalog_service_host, "localhost", |
| "hostname where CatalogService is running"); |
| DEFINE_bool(enable_webserver, true, "If true, debug webserver is enabled"); |
| DEFINE_string(state_store_host, "localhost", |
| "hostname where StatestoreService is running"); |
| DEFINE_int32(state_store_subscriber_port, 23000, |
| "port where StatestoreSubscriberService should be exported"); |
| DEFINE_int32(num_hdfs_worker_threads, 16, |
| "(Advanced) The number of threads in the global HDFS operation pool"); |
| |
| DECLARE_int32(state_store_port); |
| DECLARE_int32(num_threads_per_core); |
| DECLARE_int32(num_cores); |
| DECLARE_int32(be_port); |
| DECLARE_string(mem_limit); |
| |
| // TODO: Remove the following RM-related flags in Impala 3.0. |
| DEFINE_bool(enable_rm, false, "Deprecated"); |
| DEFINE_int32(llama_callback_port, 28000, "Deprecated"); |
| DEFINE_string(llama_host, "", "Deprecated"); |
| DEFINE_int32(llama_port, 15000, "Deprecated"); |
| DEFINE_string(llama_addresses, "", "Deprecated"); |
| DEFINE_int64(llama_registration_timeout_secs, 30, "Deprecated"); |
| DEFINE_int64(llama_registration_wait_secs, 3, "Deprecated"); |
| DEFINE_int64(llama_max_request_attempts, 5, "Deprecated"); |
| DEFINE_string(cgroup_hierarchy_path, "", "Deprecated"); |
| DEFINE_string(staging_cgroup, "impala_staging", "Deprecated"); |
| DEFINE_int32(resource_broker_cnxn_attempts, 1, "Deprecated"); |
| DEFINE_int32(resource_broker_cnxn_retry_interval_ms, 3000, "Deprecated"); |
| DEFINE_int32(resource_broker_send_timeout, 0, "Deprecated"); |
| DEFINE_int32(resource_broker_recv_timeout, 0, "Deprecated"); |
| |
| DEFINE_int32(coordinator_rpc_threads, 12, "(Advanced) Number of threads available to " |
| "start fragments on remote Impala daemons."); |
| |
| DECLARE_string(ssl_client_ca_certificate); |
| |
| DEFINE_int32(backend_client_connection_num_retries, 3, "Retry backend connections."); |
| // When network is unstable, TCP will retry and sending could take longer time. |
| // Choose 5 minutes as default timeout because we don't want RPC timeout be triggered |
| // by intermittent network issue. The timeout should not be too long either, otherwise |
| // query could hang for a while before it's cancelled. |
| DEFINE_int32(backend_client_rpc_timeout_ms, 300000, "(Advanced) The underlying " |
| "TSocket send/recv timeout in milliseconds for a backend client RPC. "); |
| |
| DEFINE_int32(catalog_client_connection_num_retries, 3, "Retry catalog connections."); |
| DEFINE_int32(catalog_client_rpc_timeout_ms, 0, "(Advanced) The underlying TSocket " |
| "send/recv timeout in milliseconds for a catalog client RPC."); |
| |
// Hadoop configuration key whose value ExecEnv::StartServices() asks the frontend for.
static const string DEFAULT_FS("fs.defaultFS");
| |
| namespace impala { |
| |
| ExecEnv* ExecEnv::exec_env_ = NULL; |
| |
| ExecEnv::ExecEnv() |
| : metrics_(new MetricGroup("impala-metrics")), |
| stream_mgr_(new DataStreamMgr(metrics_.get())), |
| impalad_client_cache_( |
| new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries, 0, |
| FLAGS_backend_client_rpc_timeout_ms, FLAGS_backend_client_rpc_timeout_ms, "", |
| !FLAGS_ssl_client_ca_certificate.empty())), |
| catalogd_client_cache_( |
| new CatalogServiceClientCache(FLAGS_catalog_client_connection_num_retries, 0, |
| FLAGS_catalog_client_rpc_timeout_ms, FLAGS_catalog_client_rpc_timeout_ms, "", |
| !FLAGS_ssl_client_ca_certificate.empty())), |
| htable_factory_(new HBaseTableFactory()), |
| disk_io_mgr_(new DiskIoMgr()), |
| webserver_(new Webserver()), |
| mem_tracker_(NULL), |
| thread_mgr_(new ThreadResourceMgr), |
| hdfs_op_thread_pool_( |
| CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024)), |
| tmp_file_mgr_(new TmpFileMgr), |
| request_pool_service_(new RequestPoolService(metrics_.get())), |
| frontend_(new Frontend()), |
| fragment_exec_thread_pool_(new CallableThreadPool("coordinator-fragment-rpc", |
| "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())), |
| async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)), |
| query_exec_mgr_(new QueryExecMgr()), |
| enable_webserver_(FLAGS_enable_webserver), |
| is_fe_tests_(false), |
| backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) { |
| // Initialize the scheduler either dynamically (with a statestore) or statically (with |
| // a standalone single backend) |
| if (FLAGS_use_statestore) { |
| TNetworkAddress subscriber_address = |
| MakeNetworkAddress(FLAGS_hostname, FLAGS_state_store_subscriber_port); |
| TNetworkAddress statestore_address = |
| MakeNetworkAddress(FLAGS_state_store_host, FLAGS_state_store_port); |
| |
| statestore_subscriber_.reset(new StatestoreSubscriber( |
| Substitute("impalad@$0", TNetworkAddressToString(backend_address_)), |
| subscriber_address, statestore_address, metrics_.get())); |
| |
| scheduler_.reset(new SimpleScheduler(statestore_subscriber_.get(), |
| statestore_subscriber_->id(), backend_address_, metrics_.get(), webserver_.get(), |
| request_pool_service_.get())); |
| } else { |
| vector<TNetworkAddress> addresses; |
| addresses.push_back(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)); |
| scheduler_.reset(new SimpleScheduler( |
| addresses, metrics_.get(), webserver_.get(), request_pool_service_.get())); |
| } |
| if (exec_env_ == NULL) exec_env_ = this; |
| } |
| |
| // TODO: Need refactor to get rid of duplicated code. |
| ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port, |
| int webserver_port, const string& statestore_host, int statestore_port) |
| : metrics_(new MetricGroup("impala-metrics")), |
| stream_mgr_(new DataStreamMgr(metrics_.get())), |
| impalad_client_cache_( |
| new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries, 0, |
| FLAGS_backend_client_rpc_timeout_ms, FLAGS_backend_client_rpc_timeout_ms, "", |
| !FLAGS_ssl_client_ca_certificate.empty())), |
| catalogd_client_cache_( |
| new CatalogServiceClientCache(FLAGS_catalog_client_connection_num_retries, 0, |
| FLAGS_catalog_client_rpc_timeout_ms, FLAGS_catalog_client_rpc_timeout_ms, "", |
| !FLAGS_ssl_client_ca_certificate.empty())), |
| htable_factory_(new HBaseTableFactory()), |
| disk_io_mgr_(new DiskIoMgr()), |
| webserver_(new Webserver(webserver_port)), |
| mem_tracker_(NULL), |
| thread_mgr_(new ThreadResourceMgr), |
| hdfs_op_thread_pool_( |
| CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024)), |
| tmp_file_mgr_(new TmpFileMgr), |
| frontend_(new Frontend()), |
| fragment_exec_thread_pool_(new CallableThreadPool("coordinator-fragment-rpc", |
| "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())), |
| async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)), |
| query_exec_mgr_(new QueryExecMgr()), |
| enable_webserver_(FLAGS_enable_webserver && webserver_port > 0), |
| is_fe_tests_(false), |
| backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) { |
| request_pool_service_.reset(new RequestPoolService(metrics_.get())); |
| |
| if (FLAGS_use_statestore && statestore_port > 0) { |
| TNetworkAddress subscriber_address = |
| MakeNetworkAddress(hostname, subscriber_port); |
| TNetworkAddress statestore_address = |
| MakeNetworkAddress(statestore_host, statestore_port); |
| |
| statestore_subscriber_.reset(new StatestoreSubscriber( |
| Substitute("impalad@$0", TNetworkAddressToString(backend_address_)), |
| subscriber_address, statestore_address, metrics_.get())); |
| |
| scheduler_.reset(new SimpleScheduler(statestore_subscriber_.get(), |
| statestore_subscriber_->id(), backend_address_, metrics_.get(), webserver_.get(), |
| request_pool_service_.get())); |
| } else { |
| vector<TNetworkAddress> addresses; |
| addresses.push_back(MakeNetworkAddress(hostname, backend_port)); |
| scheduler_.reset(new SimpleScheduler( |
| addresses, metrics_.get(), webserver_.get(), request_pool_service_.get())); |
| } |
| if (exec_env_ == NULL) exec_env_ = this; |
| } |
| |
| |
| ExecEnv::~ExecEnv() { |
| } |
| |
| Status ExecEnv::InitForFeTests() { |
| mem_tracker_.reset(new MemTracker(-1, "Process")); |
| is_fe_tests_ = true; |
| return Status::OK(); |
| } |
| |
| Status ExecEnv::StartServices() { |
| LOG(INFO) << "Starting global services"; |
| |
| // Initialize global memory limit. |
| // Depending on the system configuration, we will have to calculate the process |
| // memory limit either based on the available physical memory, or if overcommitting |
| // is turned off, we use the memory commit limit from /proc/meminfo (see |
| // IMPALA-1690). |
| // --mem_limit="" means no memory limit |
| int64_t bytes_limit = 0; |
| bool is_percent; |
| if (MemInfo::vm_overcommit() == 2 && |
| MemInfo::commit_limit() < MemInfo::physical_mem()) { |
| bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent, |
| MemInfo::commit_limit()); |
| // There might be the case of misconfiguration, when on a system swap is disabled |
| // and overcommitting is turned off the actual usable memory is less than the |
| // available physical memory. |
| LOG(WARNING) << "This system shows a discrepancy between the available " |
| << "memory and the memory commit limit allowed by the " |
| << "operating system. ( Mem: " << MemInfo::physical_mem() |
| << "<=> CommitLimit: " |
| << MemInfo::commit_limit() << "). " |
| << "Impala will adhere to the smaller value by setting the " |
| << "process memory limit to " << bytes_limit << " " |
| << "Please verify the system configuration. Specifically, " |
| << "/proc/sys/vm/overcommit_memory and " |
| << "/proc/sys/vm/overcommit_ratio."; |
| } else { |
| bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent, |
| MemInfo::physical_mem()); |
| } |
| |
| if (bytes_limit < 0) { |
| return Status("Failed to parse mem limit from '" + FLAGS_mem_limit + "'."); |
| } |
| // Minimal IO Buffer requirements: |
| // IO buffer (8MB default) * number of IO buffers per thread (5) * |
| // number of threads per core * number of cores |
| int64_t min_requirement = disk_io_mgr_->max_read_buffer_size() * |
| DiskIoMgr::DEFAULT_QUEUE_CAPACITY * |
| FLAGS_num_threads_per_core * FLAGS_num_cores; |
| if (bytes_limit < min_requirement) { |
| LOG(WARNING) << "Memory limit " |
| << PrettyPrinter::Print(bytes_limit, TUnit::BYTES) |
| << " does not meet minimal memory requirement of " |
| << PrettyPrinter::Print(min_requirement, TUnit::BYTES); |
| } |
| |
| metrics_->Init(enable_webserver_ ? webserver_.get() : NULL); |
| impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends"); |
| catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server"); |
| RETURN_IF_ERROR(RegisterMemoryMetrics(metrics_.get(), true)); |
| |
| #ifndef ADDRESS_SANITIZER |
| // Limit of -1 means no memory limit. |
| mem_tracker_.reset(new MemTracker(TcmallocMetric::PHYSICAL_BYTES_RESERVED, |
| bytes_limit > 0 ? bytes_limit : -1, "Process")); |
| |
| // Since tcmalloc does not free unused memory, we may exceed the process mem limit even |
| // if Impala is not actually using that much memory. Add a callback to free any unused |
| // memory if we hit the process limit. |
| mem_tracker_->AddGcFunction(boost::bind(&MallocExtension::ReleaseFreeMemory, |
| MallocExtension::instance())); |
| #else |
| // tcmalloc metrics aren't defined in ASAN builds, just use the default behavior to |
| // track process memory usage (sum of all children trackers). |
| mem_tracker_.reset(new MemTracker(bytes_limit > 0 ? bytes_limit : -1, "Process")); |
| #endif |
| |
| mem_tracker_->RegisterMetrics(metrics_.get(), "mem-tracker.process"); |
| |
| if (bytes_limit > MemInfo::physical_mem()) { |
| LOG(WARNING) << "Memory limit " |
| << PrettyPrinter::Print(bytes_limit, TUnit::BYTES) |
| << " exceeds physical memory of " |
| << PrettyPrinter::Print(MemInfo::physical_mem(), TUnit::BYTES); |
| } |
| LOG(INFO) << "Using global memory limit: " |
| << PrettyPrinter::Print(bytes_limit, TUnit::BYTES); |
| |
| RETURN_IF_ERROR(disk_io_mgr_->Init(mem_tracker_.get())); |
| |
| // Start services in order to ensure that dependencies between them are met |
| if (enable_webserver_) { |
| AddDefaultUrlCallbacks(webserver_.get(), mem_tracker_.get(), metrics_.get()); |
| RETURN_IF_ERROR(webserver_->Start()); |
| } else { |
| LOG(INFO) << "Not starting webserver"; |
| } |
| |
| if (scheduler_ != NULL) RETURN_IF_ERROR(scheduler_->Init()); |
| |
| // Get the fs.defaultFS value set in core-site.xml and assign it to |
| // configured_defaultFs |
| TGetHadoopConfigRequest config_request; |
| config_request.__set_name(DEFAULT_FS); |
| TGetHadoopConfigResponse config_response; |
| frontend_->GetHadoopConfig(config_request, &config_response); |
| if (config_response.__isset.value) { |
| default_fs_ = config_response.value; |
| } else { |
| default_fs_ = "hdfs://"; |
| } |
| // Must happen after all topic registrations / callbacks are done |
| if (statestore_subscriber_.get() != NULL) { |
| Status status = statestore_subscriber_->Start(); |
| if (!status.ok()) { |
| status.AddDetail("Statestore subscriber did not start up."); |
| return status; |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| } |