| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "scheduling/admission-controller.h" |
| |
| #include "common/names.h" |
| #include "kudu/util/logging.h" |
| #include "kudu/util/logging_test_util.h" |
| #include "runtime/bufferpool/reservation-util.h" |
| #include "runtime/exec-env.h" |
| #include "runtime/runtime-state.h" |
| #include "runtime/test-env.h" |
| #include "scheduling/query-schedule.h" |
| #include "service/fe-support.h" |
| #include "service/impala-server.h" |
| #include "testutil/gtest-util.h" |
| #include "testutil/scoped-flag-setter.h" |
| #include "util/metrics.h" |
| |
| // Access the flags that are defined in RequestPoolService. |
| DECLARE_string(fair_scheduler_allocation_path); |
| DECLARE_string(llama_site_path); |
| |
| namespace impala { |
| |
| static const string IMPALA_HOME(getenv("IMPALA_HOME")); |
| |
| // Queues used in the configuration files fair-scheduler-test2.xml and |
| // llama-site-test2.xml. |
| static const string QUEUE_A = "root.queueA"; |
| static const string QUEUE_B = "root.queueB"; |
| static const string QUEUE_C = "root.queueC"; |
| static const string QUEUE_D = "root.queueD"; |
| |
| // Host names |
| static const string HOST_1 = "host1:25000"; |
| static const string HOST_2 = "host2:25000"; |
| |
| static const int64_t MEGABYTE = 1024L * 1024L; |
| static const int64_t GIGABYTE = 1024L * MEGABYTE; |
| |
| /// Parent class for Admission Controller tests. |
| /// Common code and constants should go here. |
| /// These are single threaded tests so we access the internal data structures of |
| /// the AdmissionController object, and call methods such as 'GetPoolStats' without |
| /// taking the admission_ctrl_lock_ lock. |
| class AdmissionControllerTest : public testing::Test { |
| protected: |
| boost::scoped_ptr<TestEnv> test_env_; |
| |
| // Pool for objects to be destroyed during test teardown. |
| ObjectPool pool_; |
| |
| // Saved configuration flags for restoring the values at the end of the test. |
| std::unique_ptr<google::FlagSaver> flag_saver_; |
| |
| virtual void SetUp() { |
| // Establish a TestEnv so that ExecEnv works in tests. |
| test_env_.reset(new TestEnv); |
| flag_saver_.reset(new google::FlagSaver()); |
| ASSERT_OK(test_env_->Init()); |
| } |
| |
| virtual void TearDown() { |
| flag_saver_.reset(); |
| pool_.Clear(); |
| } |
| |
| /// Make a QuerySchedule with dummy parameters that can be used to test admission and |
| /// rejection in AdmissionController. |
| QuerySchedule* MakeQuerySchedule(string request_pool_name, int64_t mem_limit, |
| TPoolConfig& config, const int num_hosts, const int per_host_mem_estimate, |
| const int coord_mem_estimate, bool is_dedicated_coord, |
| const string& executor_group = ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME) { |
| DCHECK_GT(num_hosts, 0); |
| TQueryExecRequest* request = pool_.Add(new TQueryExecRequest()); |
| request->query_ctx.request_pool = request_pool_name; |
| request->__set_per_host_mem_estimate(per_host_mem_estimate); |
| request->__set_dedicated_coord_mem_estimate(coord_mem_estimate); |
| request->__set_stmt_type(TStmtType::QUERY); |
| |
| RuntimeProfile* profile = RuntimeProfile::Create(&pool_, "pool1"); |
| TUniqueId* query_id = pool_.Add(new TUniqueId()); // always 0,0 |
| TQueryOptions* query_options = pool_.Add(new TQueryOptions()); |
| query_options->__set_mem_limit(mem_limit); |
| QuerySchedule* query_schedule = pool_.Add(new QuerySchedule( |
| *query_id, *request, *query_options, profile)); |
| query_schedule->set_executor_group(executor_group); |
| SetHostsInQuerySchedule(*query_schedule, num_hosts, is_dedicated_coord); |
| query_schedule->UpdateMemoryRequirements(config); |
| return query_schedule; |
| } |
| |
| /// Same as previous MakeQuerySchedule with fewer input (more default params). |
| QuerySchedule* MakeQuerySchedule(string request_pool_name, TPoolConfig& config, |
| const int num_hosts, const int per_host_mem_estimate, |
| const string& executor_group = ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME) { |
| return MakeQuerySchedule(request_pool_name, 0, config, num_hosts, |
| per_host_mem_estimate, per_host_mem_estimate, false, executor_group); |
| } |
| |
| /// Replace the per-backend hosts in the schedule with one having 'count' hosts. |
| /// Note: no FInstanceExecParams are added so |
| /// QuerySchedule::UseDedicatedCoordEstimates() would consider this schedule as not |
| /// having anything scheduled on the backend which would result in always returning true |
| /// if a dedicated coordinator backend exists. |
| void SetHostsInQuerySchedule(QuerySchedule& query_schedule, const int count, |
| bool is_dedicated_coord, int64_t min_mem_reservation_bytes = 0, |
| int64_t admit_mem_limit = 200L * MEGABYTE, int slots_to_use = 1, |
| int slots_available = 8) { |
| PerBackendExecParams* per_backend_exec_params = pool_.Add(new PerBackendExecParams()); |
| for (int i = 0; i < count; ++i) { |
| const string host_name = Substitute("host$0", i); |
| TNetworkAddress host_addr = MakeNetworkAddress(host_name, 25000); |
| BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams()); |
| backend_exec_params->min_mem_reservation_bytes = min_mem_reservation_bytes; |
| backend_exec_params->slots_to_use = slots_to_use; |
| backend_exec_params->be_desc.__set_admit_mem_limit(admit_mem_limit); |
| backend_exec_params->be_desc.__set_admission_slots(slots_available); |
| backend_exec_params->be_desc.__set_is_executor(true); |
| backend_exec_params->be_desc.__set_address(host_addr); |
| if (i == 0) { |
| // Add first element as the coordinator. |
| backend_exec_params->is_coord_backend = true; |
| backend_exec_params->be_desc.__set_is_executor(!is_dedicated_coord); |
| } |
| per_backend_exec_params->emplace(host_addr, *backend_exec_params); |
| } |
| query_schedule.set_per_backend_exec_params(*per_backend_exec_params); |
| } |
| |
| /// Extract the host network addresses from 'schedule'. |
| vector<TNetworkAddress> GetHostAddrs(const QuerySchedule& schedule) { |
| vector<TNetworkAddress> host_addrs; |
| for (auto& backend_state : schedule.per_backend_exec_params()) { |
| host_addrs.push_back(backend_state.first); |
| } |
| return host_addrs; |
| } |
| |
| /// Set the slots in use for all the hosts in 'host_addrs'. |
| void SetSlotsInUse(AdmissionController* admission_controller, |
| const vector<TNetworkAddress>& host_addrs, int slots_in_use) { |
| for (TNetworkAddress host_addr : host_addrs) { |
| string host = TNetworkAddressToString(host_addr); |
| admission_controller->host_stats_[host].slots_in_use = slots_in_use; |
| } |
| } |
| |
| /// Build a TTopicDelta object for IMPALA_REQUEST_QUEUE_TOPIC. |
| static TTopicDelta MakeTopicDelta(const bool is_delta) { |
| TTopicDelta delta; |
| delta.topic_name = Statestore::IMPALA_REQUEST_QUEUE_TOPIC; |
| delta.is_delta = is_delta; |
| return delta; |
| } |
| |
| /// Build a TPoolStats object. |
| static TPoolStats MakePoolStats(const int backend_mem_reserved, |
| const int num_admitted_running, const int num_queued) { |
| TPoolStats stats; |
| stats.backend_mem_reserved = backend_mem_reserved; |
| stats.num_admitted_running = num_admitted_running; |
| stats.num_queued = num_queued; |
| return stats; |
| } |
| |
| /// Add a TPoolStats to the TTopicDelta 'delta' with a key created from 'host' and |
| /// 'pool_name' |
| static void AddStatsToTopic( |
| TTopicDelta* topic, const string host, const string pool_name, TPoolStats stats) { |
| // Build topic item. |
| TTopicItem item; |
| item.key = AdmissionController::MakePoolTopicKey(pool_name, host); |
| ThriftSerializer serializer(false); |
| Status status = serializer.SerializeToString(&stats, &item.value); |
| DCHECK(status.ok()); |
| |
| // Add to the topic. |
| topic->topic_entries.push_back(item); |
| } |
| |
| /// Check that PoolConfig can be read from a RequestPoolService, and that the |
| /// configured values are as expected. |
| static void CheckPoolConfig(RequestPoolService& request_pool_service, |
| const string pool_name, const int64_t max_requests, const int64_t max_mem_resources, |
| const int64_t queue_timeout_ms, const bool clamp_mem_limit_query_option, |
| const int64_t min_query_mem_limit = 0, const int64_t max_query_mem_limit = 0, |
| const double max_running_queries_multiple = 0.0, |
| const double max_queued_queries_multiple = 0.0, |
| const int64_t max_memory_multiple = 0) { |
| TPoolConfig config; |
| ASSERT_OK(request_pool_service.GetPoolConfig(pool_name, &config)); |
| |
| ASSERT_EQ(max_requests, config.max_requests); |
| ASSERT_EQ(max_mem_resources, config.max_mem_resources); |
| ASSERT_EQ(queue_timeout_ms, config.queue_timeout_ms); |
| ASSERT_EQ(clamp_mem_limit_query_option, config.clamp_mem_limit_query_option); |
| ASSERT_EQ(min_query_mem_limit, config.min_query_mem_limit); |
| ASSERT_EQ(max_query_mem_limit, config.max_query_mem_limit); |
| ASSERT_EQ(max_running_queries_multiple, config.max_running_queries_multiple); |
| ASSERT_EQ(max_queued_queries_multiple, config.max_queued_queries_multiple); |
| ASSERT_EQ(max_memory_multiple, config.max_memory_multiple); |
| } |
| |
| /// Check that a PoolStats object has all zero values. |
| static void CheckPoolStatsEmpty(AdmissionController::PoolStats* pool_stats) { |
| ASSERT_EQ(0, pool_stats->agg_mem_reserved_); |
| ASSERT_EQ(0, pool_stats->agg_num_queued_); |
| ASSERT_EQ(0, pool_stats->agg_num_running_); |
| ASSERT_EQ(0, pool_stats->local_mem_admitted_); |
| ASSERT_EQ(0, pool_stats->local_stats_.num_queued); |
| ASSERT_EQ(0, pool_stats->local_stats_.num_admitted_running); |
| ASSERT_EQ(0, pool_stats->local_stats_.backend_mem_reserved); |
| ASSERT_EQ(0, pool_stats->metrics()->agg_num_queued->GetValue()); |
| ASSERT_EQ(0, pool_stats->metrics()->agg_num_running->GetValue()); |
| } |
| |
| /// Check the calculations made by GetMaxQueuedForPool and GetMaxRequestsForPool are |
| /// rounded correctly. |
| static void CheckRoundingForPool(AdmissionController* admission_controller, |
| const int expected_result, const double multiple, const int host_count) { |
| TPoolConfig config_round; |
| config_round.max_queued_queries_multiple = multiple; |
| config_round.max_queued = 0; |
| config_round.max_running_queries_multiple = multiple; |
| config_round.max_requests = 0; |
| |
| int64_t num_queued_rounded = |
| admission_controller->GetMaxQueuedForPool(config_round, host_count); |
| ASSERT_EQ(expected_result, num_queued_rounded) |
| << "with max_queued_queries_multiple=" << config_round.max_queued_queries_multiple |
| << " host_count=" << host_count; |
| |
| int64_t num_requests_rounded = |
| admission_controller->GetMaxRequestsForPool(config_round, host_count); |
| ASSERT_EQ(expected_result, num_requests_rounded) |
| << "with max_running_queries_multiple=" |
| << config_round.max_running_queries_multiple << " host_count=" << host_count; |
| } |
| |
| /// Return the path of the configuration file in the test resources directory |
| /// that has name 'file_name'. |
| static string GetResourceFile(const string& file_name) { |
| return Substitute("$0/fe/src/test/resources/$1", IMPALA_HOME, file_name); |
| } |
| |
| /// Make an AdmissionController with some dummy parameters |
| AdmissionController* MakeAdmissionController() { |
| // Create a RequestPoolService which will read the configuration files. |
| MetricGroup* metric_group = pool_.Add(new MetricGroup("impala-metrics")); |
| RequestPoolService* request_pool_service = |
| pool_.Add(new RequestPoolService(metric_group)); |
| TNetworkAddress* addr = pool_.Add(new TNetworkAddress()); |
| addr->__set_hostname("host0"); |
| addr->__set_port(25000); |
| ClusterMembershipMgr* cmm = |
| pool_.Add(new ClusterMembershipMgr("", nullptr, metric_group)); |
| return pool_.Add(new AdmissionController( |
| cmm, nullptr, request_pool_service, metric_group, *addr)); |
| } |
| |
| static void checkPoolDisabled(bool expected_result, int64_t max_requests, |
| double max_running_queries_multiple, int64_t max_mem_resources, |
| int64_t max_memory_multiple) { |
| TPoolConfig pool_config; |
| pool_config.max_requests = max_requests; |
| pool_config.max_running_queries_multiple = max_running_queries_multiple; |
| pool_config.max_mem_resources = max_mem_resources; |
| pool_config.max_memory_multiple = max_memory_multiple; |
| ASSERT_EQ(expected_result, AdmissionController::PoolDisabled(pool_config)); |
| } |
| }; |
| |
| /// Test that AdmissionController will admit a query into a pool, then simulate other |
| /// work being added to the pool, and then test that the AdmissionController will no |
| /// longer admit the query. |
| TEST_F(AdmissionControllerTest, Simple) { |
| // Pass the paths of the configuration files as command line flags |
| FLAGS_fair_scheduler_allocation_path = GetResourceFile("fair-scheduler-test2.xml"); |
| FLAGS_llama_site_path = GetResourceFile("llama-site-test2.xml"); |
| |
| AdmissionController* admission_controller = MakeAdmissionController(); |
| RequestPoolService* request_pool_service = admission_controller->request_pool_service_; |
| |
| // Get the PoolConfig for QUEUE_C ("root.queueC"). |
| TPoolConfig config_c; |
| ASSERT_OK(request_pool_service->GetPoolConfig(QUEUE_C, &config_c)); |
| |
| // Create a QuerySchedule to run on QUEUE_C. |
| QuerySchedule* query_schedule = MakeQuerySchedule(QUEUE_C, config_c, 1, 64L * MEGABYTE); |
| query_schedule->UpdateMemoryRequirements(config_c); |
| |
| // Check that the AdmissionController initially has no data about other hosts. |
| ASSERT_EQ(0, admission_controller->host_stats_.size()); |
| |
| // Check that the query can be admitted. |
| string not_admitted_reason; |
| int64_t host_count = 1; |
| ASSERT_TRUE(admission_controller->CanAdmitRequest( |
| *query_schedule, config_c, host_count, true, ¬_admitted_reason)); |
| |
| // Create a QuerySchedule just like 'query_schedule' but to run on 3 hosts. |
| QuerySchedule* query_schedule_3_hosts = |
| MakeQuerySchedule(QUEUE_C, config_c, 3, 64L * MEGABYTE); |
| host_count = 3; |
| // This won't run as configuration using 'max_mem_resources' is not scalable. |
| ASSERT_FALSE(admission_controller->CanAdmitRequest( |
| *query_schedule_3_hosts, config_c, host_count, true, ¬_admitted_reason)); |
| EXPECT_STR_CONTAINS(not_admitted_reason, |
| "Not enough aggregate memory available in pool root.queueC with max mem " |
| "resources 128.00 MB (configured statically). Needed 192.00 MB but only 128.00 " |
| "MB was available."); |
| |
| // Make a TopicDeltaMap describing some activity on host1 and host2. |
| TTopicDelta membership = MakeTopicDelta(false); |
| AddStatsToTopic(&membership, HOST_1, QUEUE_B, MakePoolStats(1000, 1, 0)); |
| AddStatsToTopic(&membership, HOST_1, QUEUE_C, MakePoolStats(5000, 10, 0)); |
| AddStatsToTopic(&membership, HOST_2, QUEUE_C, MakePoolStats(5000, 1, 0)); |
| |
| // Imitate the StateStore passing updates on query activity to the |
| // AdmissionController. |
| StatestoreSubscriber::TopicDeltaMap incoming_topic_deltas; |
| incoming_topic_deltas.emplace(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, membership); |
| vector<TTopicDelta> outgoing_topic_updates; |
| admission_controller->UpdatePoolStats(incoming_topic_deltas, &outgoing_topic_updates); |
| |
| // Check that the AdmissionController has aggregated the remote stats. |
| ASSERT_EQ(3, admission_controller->host_stats_.size()); |
| ASSERT_EQ(6000, admission_controller->host_stats_[HOST_1].mem_reserved); |
| ASSERT_EQ(5000, admission_controller->host_stats_[HOST_2].mem_reserved); |
| |
| // Check the PoolStats for QUEUE_C. |
| AdmissionController::PoolStats* pool_stats = |
| admission_controller->GetPoolStats(QUEUE_C); |
| ASSERT_EQ(10000, pool_stats->agg_mem_reserved_); |
| ASSERT_EQ(11, pool_stats->agg_num_running_); |
| |
| // Test that the query cannot be admitted now. |
| ASSERT_FALSE(admission_controller->CanAdmitRequest( |
| *query_schedule, config_c, host_count, true, ¬_admitted_reason)); |
| EXPECT_STR_CONTAINS(not_admitted_reason, |
| "number of running queries 11 is at or over limit 10 (configured statically)."); |
| } |
| |
| /// Test rounding of scalable configuration parameters. |
| TEST_F(AdmissionControllerTest, CheckRounding) { |
| // Pass the paths of the configuration files as command line flags. |
| FLAGS_fair_scheduler_allocation_path = GetResourceFile("fair-scheduler-test2.xml"); |
| FLAGS_llama_site_path = GetResourceFile("llama-site-test2.xml"); |
| |
| AdmissionController* ac = MakeAdmissionController(); // Short name to make code neat. |
| |
| // The scalable configuration parameters 'max_running_queries_multiple' and |
| // 'max_queued_queries_multiple' are scaled by multiplying by the number of hosts. |
| // If the result is non-zero then this is rounded up. |
| CheckRoundingForPool(ac, /*expected*/ 0, /*parameter*/ 0, /*num hosts*/ 100); |
| CheckRoundingForPool(ac, /*expected*/ 3, /*parameter*/ 0.3, /*num hosts*/ 10); |
| CheckRoundingForPool(ac, /*expected*/ 4, /*parameter*/ 0.31, /*num hosts*/ 10); |
| CheckRoundingForPool(ac, /*expected*/ 1, /*parameter*/ 0.3, /*num hosts*/ 3); |
| CheckRoundingForPool(ac, /*expected*/ 1, /*parameter*/ 0.1, /*num hosts*/ 3); |
| CheckRoundingForPool(ac, /*expected*/ 3, /*parameter*/ 0.3, /*num hosts*/ 9); |
| CheckRoundingForPool(ac, /*expected*/ 2, /*parameter*/ 0.5, /*num hosts*/ 3); |
| CheckRoundingForPool(ac, /*expected*/ 10000, /*parameter*/ 100, /*num hosts*/ 100); |
| } |
| |
| /// Test CanAdmitRequest using scalable memory parameter 'max_memory_multiple'. |
| TEST_F(AdmissionControllerTest, CanAdmitRequestMemory) { |
| // Pass the paths of the configuration files as command line flags. |
| FLAGS_fair_scheduler_allocation_path = GetResourceFile("fair-scheduler-test2.xml"); |
| FLAGS_llama_site_path = GetResourceFile("llama-site-test2.xml"); |
| |
| AdmissionController* admission_controller = MakeAdmissionController(); |
| RequestPoolService* request_pool_service = admission_controller->request_pool_service_; |
| |
| // Get the PoolConfig for QUEUE_D ("root.queueD"). |
| TPoolConfig config_d; |
| ASSERT_OK(request_pool_service->GetPoolConfig(QUEUE_D, &config_d)); |
| // This queue is using a scalable amount of memory. |
| ASSERT_EQ(40L * MEGABYTE, config_d.max_memory_multiple); |
| |
| // Check the PoolStats for QUEUE_D. |
| AdmissionController::PoolStats* pool_stats = |
| admission_controller->GetPoolStats(QUEUE_D); |
| CheckPoolStatsEmpty(pool_stats); |
| |
| // Create a QuerySchedule to run on QUEUE_D with per_host_mem_estimate of 30MB. |
| int64_t host_count = 2; |
| QuerySchedule* query_schedule = |
| MakeQuerySchedule(QUEUE_D, config_d, host_count, 30L * MEGABYTE); |
| |
| // Check that the query can be admitted. |
| string not_admitted_reason; |
| ASSERT_TRUE(admission_controller->CanAdmitRequest( |
| *query_schedule, config_d, host_count, true, ¬_admitted_reason)); |
| |
| // The query scales with cluster size of 1000. |
| host_count = 1000; |
| QuerySchedule* query_schedule1000 = |
| MakeQuerySchedule(QUEUE_D, config_d, host_count, 30L * MEGABYTE); |
| ASSERT_TRUE(admission_controller->CanAdmitRequest( |
| *query_schedule1000, config_d, host_count, true, ¬_admitted_reason)); |
| |
| // Create a QuerySchedule to run on QUEUE_D with per_host_mem_estimate of 50MB. |
| // which is going to be too much memory. |
| host_count = 10; |
| QuerySchedule* query_schedule_10_fail = |
| MakeQuerySchedule(QUEUE_D, config_d, host_count, 50L * MEGABYTE); |
| |
| // Test that this query cannot be admitted. |
| ASSERT_FALSE(admission_controller->CanAdmitRequest( |
| *query_schedule_10_fail, config_d, host_count, true, ¬_admitted_reason)); |
| EXPECT_STR_CONTAINS(not_admitted_reason, |
| "Not enough aggregate memory available in pool root.queueD with max mem resources " |
| "400.00 MB (calculated as 10 backends each with 40.00 MB). Needed 500.00 MB but " |
| "only 400.00 MB was available."); |
| } |
| |
| /// Test CanAdmitRequest using scalable parameter 'max_running_queries_multiple'. |
| TEST_F(AdmissionControllerTest, CanAdmitRequestCount) { |
| // Pass the paths of the configuration files as command line flags. |
| FLAGS_fair_scheduler_allocation_path = GetResourceFile("fair-scheduler-test2.xml"); |
| FLAGS_llama_site_path = GetResourceFile("llama-site-test2.xml"); |
| |
| AdmissionController* admission_controller = MakeAdmissionController(); |
| RequestPoolService* request_pool_service = admission_controller->request_pool_service_; |
| |
| // Get the PoolConfig for QUEUE_D ("root.queueD"). |
| TPoolConfig config_d; |
| ASSERT_OK(request_pool_service->GetPoolConfig(QUEUE_D, &config_d)); |
| |
| // This queue can run a scalable number of queries. |
| ASSERT_EQ(0.5, config_d.max_running_queries_multiple); |
| ASSERT_EQ(2.5, config_d.max_queued_queries_multiple); |
| |
| // Check the PoolStats for QUEUE_D. |
| AdmissionController::PoolStats* pool_stats = |
| admission_controller->GetPoolStats(QUEUE_D); |
| CheckPoolStatsEmpty(pool_stats); |
| |
| // Create a QuerySchedule to run on QUEUE_D on 12 hosts. |
| int64_t host_count = 12; |
| QuerySchedule* query_schedule = |
| MakeQuerySchedule(QUEUE_D, config_d, host_count, 30L * MEGABYTE); |
| string not_admitted_reason; |
| |
| // Simulate that there are 2 queries queued. |
| pool_stats->local_stats_.num_queued = 2; |
| |
| // Query can be admitted from queue... |
| ASSERT_TRUE(admission_controller->CanAdmitRequest( |
| *query_schedule, config_d, host_count, true, ¬_admitted_reason)); |
| // ... but same Query cannot be admitted directly. |
| ASSERT_FALSE(admission_controller->CanAdmitRequest( |
| *query_schedule, config_d, host_count, false, ¬_admitted_reason)); |
| EXPECT_STR_CONTAINS(not_admitted_reason, |
| "queue is not empty (size 2); queued queries are executed first"); |
| |
| // Simulate that there are 7 queries already running. |
| pool_stats->agg_num_running_ = 7; |
| ASSERT_FALSE(admission_controller->CanAdmitRequest( |
| *query_schedule, config_d, host_count, true, ¬_admitted_reason)); |
| EXPECT_STR_CONTAINS(not_admitted_reason, |
| "number of running queries 7 is at or over limit 6 (calculated as 12 backends each " |
| "with 0.5 queries)"); |
| } |
| |
| /// Test CanAdmitRequest() using the slots mechanism that is enabled with non-default |
| /// executor groups. |
| TEST_F(AdmissionControllerTest, CanAdmitRequestSlots) { |
| // Pass the paths of the configuration files as command line flags. |
| FLAGS_fair_scheduler_allocation_path = GetResourceFile("fair-scheduler-test2.xml"); |
| FLAGS_llama_site_path = GetResourceFile("llama-site-test2.xml"); |
| |
| AdmissionController* admission_controller = MakeAdmissionController(); |
| RequestPoolService* request_pool_service = admission_controller->request_pool_service_; |
| |
| // Get the PoolConfig for QUEUE_D ("root.queueD"). |
| TPoolConfig config_d; |
| ASSERT_OK(request_pool_service->GetPoolConfig(QUEUE_D, &config_d)); |
| |
| // Create QuerySchedules to run on QUEUE_D on 12 hosts. |
| // Running in both default and non-default executor groups is simulated. |
| int64_t host_count = 12; |
| int64_t slots_per_host = 16; |
| int64_t slots_per_query = 4; |
| QuerySchedule* default_group_schedule = |
| MakeQuerySchedule(QUEUE_D, config_d, host_count, 30L * MEGABYTE); |
| QuerySchedule* other_group_schedule = |
| MakeQuerySchedule(QUEUE_D, config_d, host_count, 30L * MEGABYTE, "other_group"); |
| for (QuerySchedule* schedule : {default_group_schedule, other_group_schedule}) { |
| SetHostsInQuerySchedule(*schedule, 2, false, |
| MEGABYTE, 200L * MEGABYTE, slots_per_query, slots_per_host); |
| } |
| vector<TNetworkAddress> host_addrs = GetHostAddrs(*default_group_schedule); |
| string not_admitted_reason; |
| |
| // Simulate that there are just enough slots free for the query on all hosts. |
| SetSlotsInUse(admission_controller, host_addrs, slots_per_host - slots_per_query); |
| |
| // Enough slots are available so it can be admitted in both cases. |
| ASSERT_TRUE(admission_controller->CanAdmitRequest( |
| *default_group_schedule, config_d, host_count, true, ¬_admitted_reason)) |
| << not_admitted_reason; |
| ASSERT_TRUE(admission_controller->CanAdmitRequest( |
| *other_group_schedule, config_d, host_count, true, ¬_admitted_reason)) |
| << not_admitted_reason; |
| |
| // Simulate that almost all the slots are in use, which prevents admission in the |
| // non-default group. |
| SetSlotsInUse(admission_controller, host_addrs, slots_per_host - 1); |
| |
| ASSERT_TRUE(admission_controller->CanAdmitRequest( |
| *default_group_schedule, config_d, host_count, true, ¬_admitted_reason)) |
| << not_admitted_reason; |
| ASSERT_FALSE(admission_controller->CanAdmitRequest( |
| *other_group_schedule, config_d, host_count, true, ¬_admitted_reason)); |
| EXPECT_STR_CONTAINS(not_admitted_reason, |
| "Not enough admission control slots available on host host1:25000. Needed 4 " |
| "slots but 15/16 are already in use."); |
| } |
| |
| /// Tests that query rejection works as expected by calling RejectForSchedule() and |
| /// RejectForCluster() directly. |
| TEST_F(AdmissionControllerTest, QueryRejection) { |
| // Pass the paths of the configuration files as command line flags. |
| FLAGS_fair_scheduler_allocation_path = GetResourceFile("fair-scheduler-test2.xml"); |
| FLAGS_llama_site_path = GetResourceFile("llama-site-test2.xml"); |
| |
| AdmissionController* admission_controller = MakeAdmissionController(); |
| RequestPoolService* request_pool_service = admission_controller->request_pool_service_; |
| |
| // Get the PoolConfig for QUEUE_D ("root.queueD"). |
| TPoolConfig config_d; |
| ASSERT_OK(request_pool_service->GetPoolConfig(QUEUE_D, &config_d)); |
| |
| // Check the PoolStats for QUEUE_D. |
| AdmissionController::PoolStats* pool_stats = |
| admission_controller->GetPoolStats(QUEUE_D); |
| CheckPoolStatsEmpty(pool_stats); |
| |
| // Create a QuerySchedule to run on QUEUE_D with per_host_mem_estimate of 50MB |
| // which is going to be too much memory. |
| int host_count = 10; |
| QuerySchedule* query_schedule = |
| MakeQuerySchedule(QUEUE_D, config_d, host_count, 50L * MEGABYTE); |
| |
| // Check messages from RejectForSchedule(). |
| string rejected_reason; |
| ASSERT_TRUE(admission_controller->RejectForSchedule( |
| *query_schedule, config_d, host_count, host_count, &rejected_reason)); |
| EXPECT_STR_CONTAINS(rejected_reason, |
| "request memory needed 500.00 MB is greater than pool max mem resources 400.00 MB " |
| "(calculated as 10 backends each with 40.00 MB)"); |
| |
| // Adjust the QuerySchedule to have minimum memory reservation of 45MB. |
| // This will be rejected immediately as minimum memory reservation is too high. |
| SetHostsInQuerySchedule(*query_schedule, host_count, false, 45L * MEGABYTE); |
| string rejected_reserved_reason; |
| ASSERT_TRUE(admission_controller->RejectForSchedule( |
| *query_schedule, config_d, host_count, host_count, &rejected_reserved_reason)); |
| EXPECT_STR_CONTAINS(rejected_reserved_reason, |
| "minimum memory reservation needed is greater than pool max mem resources. Pool " |
| "max mem resources: 400.00 MB (calculated as 10 backends each with 40.00 MB). " |
| "Cluster-wide memory reservation needed: 450.00 MB. Increase the pool max mem " |
| "resources."); |
| |
| // Adjust the QuerySchedule to require many slots per node. |
| // This will be rejected immediately in non-default executor groups |
| // as the nodes do not have that many slots. Because of IMPALA-8757, this check |
| // does not yet occur for the default executor group. |
| SetHostsInQuerySchedule(*query_schedule, 2, false, MEGABYTE, 200L * MEGABYTE, 16, 4); |
| string rejected_slots_reason; |
| // Don't reject for default executor group. |
| EXPECT_FALSE(admission_controller->RejectForSchedule( |
| *query_schedule, config_d, host_count, host_count, &rejected_slots_reason)) |
| << rejected_slots_reason; |
| // Reject for non-default executor group. |
| QuerySchedule* other_group_schedule = MakeQuerySchedule( |
| QUEUE_D, config_d, host_count, 50L * MEGABYTE, "a_different_executor_group"); |
| SetHostsInQuerySchedule( |
| *other_group_schedule, 2, false, MEGABYTE, 200L * MEGABYTE, 16, 4); |
| EXPECT_TRUE(admission_controller->RejectForSchedule( |
| *other_group_schedule, config_d, host_count, host_count, &rejected_slots_reason)); |
| EXPECT_STR_CONTAINS(rejected_slots_reason, "number of admission control slots needed " |
| "(16) on backend 'host1:25000' is greater than total slots available 4. Reduce " |
| "mt_dop to less than 4 to ensure that the query can execute."); |
| rejected_slots_reason = ""; |
| // Reduce mt_dop to ensure it can execute. |
| SetHostsInQuerySchedule( |
| *other_group_schedule, 2, false, MEGABYTE, 200L * MEGABYTE, 4, 4); |
| EXPECT_FALSE(admission_controller->RejectForSchedule( |
| *other_group_schedule, config_d, host_count, host_count, &rejected_slots_reason)) |
| << rejected_slots_reason; |
| |
| // Overwrite min_query_mem_limit and max_query_mem_limit in config_d to test a message. |
| // After this config_d is unusable. |
| config_d.min_query_mem_limit = 600L * MEGABYTE; |
| config_d.max_query_mem_limit = 700L * MEGABYTE; |
| string rejected_invalid_config_reason; |
| ASSERT_TRUE(admission_controller->RejectForCluster(QUEUE_D, config_d, |
| /* admit_from_queue=*/false, host_count, &rejected_invalid_config_reason)); |
| EXPECT_STR_CONTAINS(rejected_invalid_config_reason, |
| "The min_query_mem_limit 629145600 is greater than the current max_mem_resources " |
| "419430400 (calculated as 10 backends each with 40.00 MB); queries will not be " |
| "admitted until more executors are available."); |
| |
| TPoolConfig config_disabled_queries; |
| config_disabled_queries.max_requests = 0; |
| string rejected_queries_reason; |
| ASSERT_TRUE(admission_controller->RejectForCluster(QUEUE_D, config_disabled_queries, |
| /* admit_from_queue=*/false, host_count, &rejected_queries_reason)); |
| EXPECT_STR_CONTAINS(rejected_queries_reason, "disabled by requests limit set to 0"); |
| |
| TPoolConfig config_disabled_memory; |
| config_disabled_memory.max_requests = 1; |
| config_disabled_memory.max_mem_resources = 0; |
| config_disabled_memory.max_memory_multiple = 0; |
| string rejected_mem_reason; |
| ASSERT_TRUE(admission_controller->RejectForCluster(QUEUE_D, config_disabled_memory, |
| /* admit_from_queue=*/false, host_count, &rejected_mem_reason)); |
| EXPECT_STR_CONTAINS(rejected_mem_reason, "disabled by pool max mem resources set to 0"); |
| |
| TPoolConfig config_queue_small; |
| config_queue_small.max_requests = 1; |
| config_queue_small.max_queued = 3; |
| config_queue_small.max_mem_resources = 600 * MEGABYTE; |
| pool_stats->agg_num_queued_ = 3; |
| string rejected_queue_length_reason; |
| ASSERT_TRUE(admission_controller->RejectForCluster(QUEUE_D, config_queue_small, |
| /* admit_from_queue=*/false, host_count, &rejected_queue_length_reason)); |
| EXPECT_STR_CONTAINS(rejected_queue_length_reason, |
| "queue full, limit=3 (configured statically), num_queued=3."); |
| |
| // Make max_queued_queries_multiple small so that rejection is becasue of number of |
| // queries that can run be queued. |
| config_queue_small.max_queued_queries_multiple = 0.3; |
| string rejected_queue_multiple_reason; |
| ASSERT_TRUE(admission_controller->RejectForCluster(QUEUE_D, config_queue_small, |
| /* admit_from_queue=*/false, host_count, &rejected_queue_multiple_reason)); |
| EXPECT_STR_CONTAINS(rejected_queue_multiple_reason, |
| "queue full, limit=3 (calculated as 10 backends each with 0.3 queries), " |
| "num_queued=3."); |
| } |
| |
| /// Test GetMaxToDequeue() method. |
| TEST_F(AdmissionControllerTest, GetMaxToDequeue) { |
| // Pass the paths of the configuration files as command line flags |
| FLAGS_fair_scheduler_allocation_path = GetResourceFile("fair-scheduler-test2.xml"); |
| FLAGS_llama_site_path = GetResourceFile("llama-site-test2.xml"); |
| |
| AdmissionController* admission_controller = MakeAdmissionController(); |
| RequestPoolService* request_pool_service = admission_controller->request_pool_service_; |
| |
| // Get the PoolConfig for QUEUE_C and QUEUE_D |
| TPoolConfig config_c; |
| TPoolConfig config_d; |
| TPoolConfig config; |
| ASSERT_OK(request_pool_service->GetPoolConfig(QUEUE_D, &config_d)); |
| AdmissionController::RequestQueue& queue_c = |
| admission_controller->request_queue_map_[QUEUE_C]; |
| ASSERT_OK(request_pool_service->GetPoolConfig(QUEUE_C, &config_c)); |
| AdmissionController::RequestQueue& queue_d = |
| admission_controller->request_queue_map_[QUEUE_D]; |
| |
| AdmissionController::PoolStats* stats_c = admission_controller->GetPoolStats(QUEUE_C); |
| AdmissionController::PoolStats* stats_d = admission_controller->GetPoolStats(QUEUE_D); |
| |
| int64_t max_to_dequeue; |
| int64_t host_count = 1; |
| |
| // Queue is empty, so nothing to dequeue |
| max_to_dequeue = |
| admission_controller->GetMaxToDequeue(queue_c, stats_c, config_c, host_count); |
| ASSERT_EQ(0, max_to_dequeue); |
| max_to_dequeue = |
| admission_controller->GetMaxToDequeue(queue_d, stats_d, config_d, host_count); |
| ASSERT_EQ(0, max_to_dequeue); |
| |
| AdmissionController::PoolStats stats(admission_controller, "test"); |
| |
| // First of all test non-scalable configuration. |
| |
| // Queue holds 10 with 10 running - cannot dequeue |
| config.max_requests = 10; |
| stats.local_stats_.num_queued = 10; |
| stats.agg_num_queued_ = 20; |
| stats.agg_num_running_ = 10; |
| max_to_dequeue = |
| admission_controller->GetMaxToDequeue(queue_c, &stats, config, host_count); |
| ASSERT_EQ(0, max_to_dequeue); |
| |
| // Can only dequeue 1. |
| stats.agg_num_running_ = 9; |
| max_to_dequeue = |
| admission_controller->GetMaxToDequeue(queue_c, &stats, config, host_count); |
| ASSERT_EQ(1, max_to_dequeue); |
| |
| // There is space for 10 but it looks like there are 2 coordinators. |
| stats.agg_num_running_ = 0; |
| max_to_dequeue = |
| admission_controller->GetMaxToDequeue(queue_c, &stats, config, host_count); |
| ASSERT_EQ(5, max_to_dequeue); |
| |
| // Now test scalable configuration. |
| |
| config.max_running_queries_multiple = 0.5; |
| max_to_dequeue = |
| admission_controller->GetMaxToDequeue(queue_c, &stats, config, host_count); |
| ASSERT_EQ(1, max_to_dequeue); |
| |
| config.max_running_queries_multiple = 5; |
| // At this point the host_count is one, so the estimate of the pool |
| // size will be 1. This coordinator will take its share (1/2) of the 5 that can run |
| max_to_dequeue = |
| admission_controller->GetMaxToDequeue(queue_c, &stats, config, host_count); |
| ASSERT_EQ(2, max_to_dequeue); |
| |
| // Add a lot of hosts, limitation will now be number queued |
| host_count = 100; |
| max_to_dequeue = |
| admission_controller->GetMaxToDequeue(queue_c, &stats, config, host_count); |
| ASSERT_EQ(stats.local_stats_.num_queued, max_to_dequeue); |
| |
| // Increase number queued. |
| host_count = 200; |
| stats.local_stats_.num_queued = host_count; |
| stats.agg_num_queued_ = host_count; |
| max_to_dequeue = |
| admission_controller->GetMaxToDequeue(queue_c, &stats, config, host_count); |
| ASSERT_EQ(stats.local_stats_.num_queued, max_to_dequeue); |
| |
| // Test max_running_queries_multiple less than 1. |
| config.max_running_queries_multiple = 0.5; |
| max_to_dequeue = |
| admission_controller->GetMaxToDequeue(queue_c, &stats, config, host_count); |
| ASSERT_EQ(100, max_to_dequeue); |
| } |
| |
| /// Test that RequestPoolService correctly reads configuration files. |
| TEST_F(AdmissionControllerTest, Config) { |
| // Pass the paths of the configuration files as command line flags |
| FLAGS_fair_scheduler_allocation_path = GetResourceFile("fair-scheduler-test2.xml"); |
| FLAGS_llama_site_path = GetResourceFile("llama-site-test2.xml"); |
| |
| // Create a RequestPoolService which will read the configuration files. |
| MetricGroup metric_group("impala-metrics"); |
| RequestPoolService request_pool_service(&metric_group); |
| |
| // Test that the pool configurations can be read correctly. |
| CheckPoolConfig(request_pool_service, "non-existent queue", 5, -1, 30000, true); |
| CheckPoolConfig(request_pool_service, QUEUE_A, 1, 100000L * MEGABYTE, 50, true); |
| CheckPoolConfig(request_pool_service, QUEUE_B, 5, -1, 600000, true); |
| CheckPoolConfig(request_pool_service, QUEUE_C, 10, 128L * MEGABYTE, 30000, true); |
| CheckPoolConfig(request_pool_service, QUEUE_D, 5, MEGABYTE * 1024L, 30000, true, 10, |
| 60L * MEGABYTE, 0.5, 2.5, 40L * MEGABYTE); |
| } |
| |
| /// Unit test for PoolStats |
| TEST_F(AdmissionControllerTest, PoolStats) { |
| AdmissionController* admission_controller = MakeAdmissionController(); |
| RequestPoolService* request_pool_service = admission_controller->request_pool_service_; |
| |
| // Get the PoolConfig for QUEUE_C ("root.queueC"). |
| TPoolConfig config_c; |
| ASSERT_OK(request_pool_service->GetPoolConfig(QUEUE_C, &config_c)); |
| |
| // Create a QuerySchedule to run on QUEUE_C. |
| QuerySchedule* query_schedule = MakeQuerySchedule(QUEUE_C, config_c, 1, 1000); |
| |
| // Get the PoolStats for QUEUE_C. |
| AdmissionController::PoolStats* pool_stats = |
| admission_controller->GetPoolStats(QUEUE_C); |
| CheckPoolStatsEmpty(pool_stats); |
| |
| // Show that Queue and Dequeue leave stats at zero. |
| pool_stats->Queue(); |
| ASSERT_EQ(1, pool_stats->agg_num_queued()); |
| ASSERT_EQ(1, pool_stats->metrics()->agg_num_queued->GetValue()); |
| pool_stats->Dequeue(false); |
| CheckPoolStatsEmpty(pool_stats); |
| |
| // Show that Admit and Release leave stats at zero. |
| pool_stats->AdmitQueryAndMemory(*query_schedule); |
| ASSERT_EQ(1, pool_stats->agg_num_running()); |
| ASSERT_EQ(1, pool_stats->metrics()->agg_num_running->GetValue()); |
| int64_t mem_to_release = 0; |
| for (auto& backend_state : query_schedule->per_backend_exec_params()) { |
| mem_to_release += |
| admission_controller->GetMemToAdmit(*query_schedule, backend_state.second); |
| } |
| pool_stats->ReleaseMem(mem_to_release); |
| pool_stats->ReleaseQuery(0); |
| CheckPoolStatsEmpty(pool_stats); |
| } |
| |
| /// Test that PoolDisabled works |
| TEST_F(AdmissionControllerTest, PoolDisabled) { |
| checkPoolDisabled(true, /* max_requests */ 0, /* max_running_queries_multiple */ 0, |
| /* max_mem_resources */ 0, /* max_memory_multiple */ 0); |
| checkPoolDisabled(false, /* max_requests */ 1, /* max_running_queries_multiple */ 0, |
| /* max_mem_resources */ 1, /* max_memory_multiple */ 0); |
| checkPoolDisabled(false, /* max_requests */ 0, /* max_running_queries_multiple */ 1.0, |
| /* max_mem_resources */ 0, /* max_memory_multiple */ 1); |
| checkPoolDisabled(true, /* max_requests */ 0, /* max_running_queries_multiple */ 0, |
| /* max_mem_resources */ 0, /* max_memory_multiple */ 1); |
| checkPoolDisabled(true, /* max_requests */ 0, /* max_running_queries_multiple */ 1.0, |
| /* max_mem_resources */ 0, /* max_memory_multiple */ 0); |
| } |
| |
| // Basic tests of the QuerySchedule object to confirm that a query with different |
| // coordinator and executor memory estimates calculates memory to admit correctly |
| // for various combinations of memory limit configurations. |
| TEST_F(AdmissionControllerTest, DedicatedCoordQuerySchedule) { |
| AdmissionController* admission_controller = MakeAdmissionController(); |
| RequestPoolService* request_pool_service = admission_controller->request_pool_service_; |
| |
| const int64_t PER_EXEC_MEM_ESTIMATE = 512 * MEGABYTE; |
| const int64_t COORD_MEM_ESTIMATE = 150 * MEGABYTE; |
| TPoolConfig pool_config; |
| ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config)); |
| |
| // For query only running on the coordinator, the per_backend_mem_to_admit should be 0. |
| QuerySchedule* query_schedule = MakeQuerySchedule( |
| "default", 0, pool_config, 1, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true); |
| query_schedule->UpdateMemoryRequirements(pool_config); |
| ASSERT_EQ(0, query_schedule->per_backend_mem_to_admit()); |
| ASSERT_EQ(COORD_MEM_ESTIMATE, query_schedule->coord_backend_mem_to_admit()); |
| |
| // Make sure executors and coordinators are assigned memory to admit appropriately and |
| // that the cluster memory to admitted is calculated correctly. |
| query_schedule = MakeQuerySchedule( |
| "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true); |
| ASSERT_EQ(COORD_MEM_ESTIMATE, query_schedule->GetDedicatedCoordMemoryEstimate()); |
| ASSERT_EQ(PER_EXEC_MEM_ESTIMATE, query_schedule->GetPerExecutorMemoryEstimate()); |
| query_schedule->UpdateMemoryRequirements(pool_config); |
| ASSERT_EQ(PER_EXEC_MEM_ESTIMATE, query_schedule->per_backend_mem_to_admit()); |
| ASSERT_EQ(COORD_MEM_ESTIMATE, query_schedule->coord_backend_mem_to_admit()); |
| ASSERT_EQ(-1, query_schedule->per_backend_mem_limit()); |
| ASSERT_EQ(-1, query_schedule->coord_backend_mem_limit()); |
| ASSERT_EQ(COORD_MEM_ESTIMATE + PER_EXEC_MEM_ESTIMATE, |
| query_schedule->GetClusterMemoryToAdmit()); |
| |
| // Set the min_query_mem_limit in pool_config. min_query_mem_limit should |
| // not be enforced on the coordinator. Also ensure mem limits are set for both. |
| query_schedule = MakeQuerySchedule( |
| "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true); |
| ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config)); |
| pool_config.__set_min_query_mem_limit(700 * MEGABYTE); |
| query_schedule->UpdateMemoryRequirements(pool_config); |
| ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_to_admit()); |
| ASSERT_EQ(COORD_MEM_ESTIMATE, query_schedule->coord_backend_mem_to_admit()); |
| ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_limit()); |
| ASSERT_EQ(COORD_MEM_ESTIMATE, query_schedule->coord_backend_mem_limit()); |
| |
| // Make sure coordinator's mem to admit is adjusted based on its own minimum mem |
| // reservation. |
| query_schedule = MakeQuerySchedule( |
| "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true); |
| int64_t coord_min_reservation = 200 * MEGABYTE; |
| int64_t min_coord_mem_limit_required = |
| ReservationUtil::GetMinMemLimitFromReservation(coord_min_reservation); |
| pool_config.__set_min_query_mem_limit(700 * MEGABYTE); |
| query_schedule->set_coord_min_reservation(200 * MEGABYTE); |
| query_schedule->UpdateMemoryRequirements(pool_config); |
| ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_to_admit()); |
| ASSERT_EQ(min_coord_mem_limit_required, query_schedule->coord_backend_mem_to_admit()); |
| ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_limit()); |
| ASSERT_EQ(min_coord_mem_limit_required, query_schedule->coord_backend_mem_limit()); |
| |
| // Set mem_limit query option. |
| ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config)); |
| query_schedule = MakeQuerySchedule("default", GIGABYTE, pool_config, 2, |
| PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true); |
| query_schedule->UpdateMemoryRequirements(pool_config); |
| ASSERT_EQ(GIGABYTE, query_schedule->per_backend_mem_to_admit()); |
| ASSERT_EQ(GIGABYTE, query_schedule->coord_backend_mem_to_admit()); |
| ASSERT_EQ(GIGABYTE, query_schedule->per_backend_mem_limit()); |
| ASSERT_EQ(GIGABYTE, query_schedule->coord_backend_mem_limit()); |
| |
| // Set mem_limit query option and max_query_mem_limit. In this case, max_query_mem_limit |
| // will be enforced on both coordinator and executor. |
| query_schedule = MakeQuerySchedule("default", GIGABYTE, pool_config, 2, |
| PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true); |
| ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config)); |
| pool_config.__set_max_query_mem_limit(700 * MEGABYTE); |
| query_schedule->UpdateMemoryRequirements(pool_config); |
| ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_to_admit()); |
| ASSERT_EQ(700 * MEGABYTE, query_schedule->coord_backend_mem_to_admit()); |
| ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_limit()); |
| ASSERT_EQ(700 * MEGABYTE, query_schedule->coord_backend_mem_limit()); |
| } |
| |
| // Test admission decisions for clusters with dedicated coordinators, where different |
| // amounts of memory should be admitted on coordinators and executors. |
| TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) { |
| AdmissionController* admission_controller = MakeAdmissionController(); |
| RequestPoolService* request_pool_service = admission_controller->request_pool_service_; |
| |
| TPoolConfig pool_config; |
| ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config)); |
| pool_config.__set_max_mem_resources(2*GIGABYTE); // to enable memory based admission. |
| |
| // Set up a query schedule to test. |
| const int64_t PER_EXEC_MEM_ESTIMATE = GIGABYTE; |
| const int64_t COORD_MEM_ESTIMATE = 150 * MEGABYTE; |
| QuerySchedule* query_schedule = MakeQuerySchedule( |
| "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true); |
| PerBackendExecParams* per_backend_exec_params = pool_.Add(new PerBackendExecParams()); |
| // Add coordinator backend. |
| BackendExecParams* coord_exec_params = pool_.Add(new BackendExecParams()); |
| coord_exec_params->is_coord_backend = true; |
| coord_exec_params->thread_reservation = 1; |
| coord_exec_params->be_desc.__set_admit_mem_limit(512 * MEGABYTE); |
| coord_exec_params->be_desc.__set_is_executor(false); |
| const string coord_host_name = Substitute("host$0", 1); |
| TNetworkAddress coord_addr = MakeNetworkAddress(coord_host_name, 25000); |
| const string coord_host = TNetworkAddressToString(coord_addr); |
| per_backend_exec_params->emplace(coord_addr, *coord_exec_params); |
| // Add executor backend. |
| BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams()); |
| backend_exec_params->thread_reservation = 1; |
| backend_exec_params->be_desc.__set_admit_mem_limit(GIGABYTE); |
| backend_exec_params->be_desc.__set_is_executor(true); |
| const string exec_host_name = Substitute("host$0", 2); |
| TNetworkAddress exec_addr = MakeNetworkAddress(exec_host_name, 25000); |
| const string exec_host = TNetworkAddressToString(exec_addr); |
| per_backend_exec_params->emplace(exec_addr, *backend_exec_params); |
| query_schedule->set_per_backend_exec_params(*per_backend_exec_params); |
| string not_admitted_reason; |
| // Test 1: coord's admit_mem_limit < executor's admit_mem_limit. Query should not |
| // be rejected because query fits on both executor and coordinator. It should be |
| // queued if there is not enough capacity. |
| query_schedule->UpdateMemoryRequirements(pool_config); |
| ASSERT_FALSE(admission_controller->RejectForSchedule( |
| *query_schedule, pool_config, 2, 2, ¬_admitted_reason)); |
| ASSERT_TRUE(admission_controller->HasAvailableMemResources( |
| *query_schedule, pool_config, 2, ¬_admitted_reason)); |
| // Coord does not have enough available memory. |
| admission_controller->host_stats_[coord_host].mem_reserved = 500 * MEGABYTE; |
| ASSERT_FALSE(admission_controller->HasAvailableMemResources( |
| *query_schedule, pool_config, 2, ¬_admitted_reason)); |
| EXPECT_STR_CONTAINS(not_admitted_reason, |
| "Not enough memory available on host host1:25000. Needed 150.00 MB but only " |
| "12.00 MB out of 512.00 MB was available."); |
| not_admitted_reason.clear(); |
| // Neither coordinator or executor has enough available memory. |
| admission_controller->host_stats_[exec_host].mem_reserved = 500 * MEGABYTE; |
| ASSERT_FALSE(admission_controller->HasAvailableMemResources( |
| *query_schedule, pool_config, 2, ¬_admitted_reason)); |
| EXPECT_STR_CONTAINS(not_admitted_reason, |
| "Not enough memory available on host host2:25000. Needed 1.00 GB but only " |
| "524.00 MB out of 1.00 GB was available."); |
| not_admitted_reason.clear(); |
| // Executor does not have enough available memory. |
| admission_controller->host_stats_[coord_host].mem_reserved = 0; |
| ASSERT_FALSE(admission_controller->HasAvailableMemResources( |
| *query_schedule, pool_config, 2, ¬_admitted_reason)); |
| EXPECT_STR_CONTAINS(not_admitted_reason, |
| "Not enough memory available on host host2:25000. Needed 1.00 GB but only " |
| "524.00 MB out of 1.00 GB was available."); |
| not_admitted_reason.clear(); |
| admission_controller->host_stats_[exec_host].mem_reserved = 0; |
| |
| // Test 2: coord's admit_mem_limit < executor's admit_mem_limit. Query rejected because |
| // coord's admit_mem_limit is less than mem_to_admit on the coord. |
| // Re-using previous QuerySchedule object. |
| coord_exec_params->be_desc.__set_admit_mem_limit(100 * MEGABYTE); |
| (*per_backend_exec_params)[coord_addr] = *coord_exec_params; |
| query_schedule->set_per_backend_exec_params(*per_backend_exec_params); |
| ASSERT_TRUE(admission_controller->RejectForSchedule( |
| *query_schedule, pool_config, 2, 2, ¬_admitted_reason)); |
| EXPECT_STR_CONTAINS(not_admitted_reason, |
| "request memory needed 150.00 MB is greater than memory available for " |
| "admission 100.00 MB of host1:25000"); |
| ASSERT_FALSE(admission_controller->HasAvailableMemResources( |
| *query_schedule, pool_config, 2, ¬_admitted_reason)); |
| EXPECT_STR_CONTAINS(not_admitted_reason, |
| "Not enough memory available on host host1:25000. Needed 150.00 MB but only " |
| "100.00 MB out of 100.00 MB was available."); |
| not_admitted_reason.clear(); |
| |
| // Test 3: Make sure that coord and executors have separate checks on for whether their |
| // mem limits can accommodate their respective initial reservations. |
| query_schedule = MakeQuerySchedule( |
| "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true); |
| pool_config.__set_min_query_mem_limit(MEGABYTE); // to auto set mem_limit(s). |
| query_schedule->UpdateMemoryRequirements(pool_config); |
| query_schedule->set_largest_min_reservation(600 * MEGABYTE); |
| query_schedule->set_coord_min_reservation(50 * MEGABYTE); |
| ASSERT_TRUE(AdmissionController::CanAccommodateMaxInitialReservation( |
| *query_schedule, pool_config, ¬_admitted_reason)); |
| // Coordinator reservation doesn't fit. |
| query_schedule->set_coord_min_reservation(200 * MEGABYTE); |
| ASSERT_FALSE(AdmissionController::CanAccommodateMaxInitialReservation( |
| *query_schedule, pool_config, ¬_admitted_reason)); |
| EXPECT_STR_CONTAINS(not_admitted_reason, "minimum memory reservation is greater " |
| "than memory available to the query for buffer reservations. Memory reservation " |
| "needed given the current plan: 200.00 MB"); |
| // Neither coordinator or executor reservation fits. |
| query_schedule->set_largest_min_reservation(GIGABYTE); |
| ASSERT_FALSE(AdmissionController::CanAccommodateMaxInitialReservation( |
| *query_schedule, pool_config, ¬_admitted_reason)); |
| EXPECT_STR_CONTAINS(not_admitted_reason, "minimum memory reservation is greater " |
| "than memory available to the query for buffer reservations. Memory reservation " |
| "needed given the current plan: 1.00 GB"); |
| // Coordinator reservation doesn't fit. |
| query_schedule->set_coord_min_reservation(50 * MEGABYTE); |
| query_schedule->set_largest_min_reservation(GIGABYTE); |
| ASSERT_FALSE(AdmissionController::CanAccommodateMaxInitialReservation( |
| *query_schedule, pool_config, ¬_admitted_reason)); |
| EXPECT_STR_CONTAINS(not_admitted_reason, "minimum memory reservation is greater " |
| "than memory available to the query for buffer reservations. Memory reservation " |
| "needed given the current plan: 1.00 GB"); |
| } |
| |
| } // end namespace impala |