| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <gmock/gmock.h> |
| |
| #include <process/clock.hpp> |
| #include <process/future.hpp> |
| #include <process/gmock.hpp> |
| #include <process/pid.hpp> |
| |
| #include <process/metrics/metrics.hpp> |
| |
| #include "master/allocator.hpp" |
| #include "master/flags.hpp" |
| #include "master/master.hpp" |
| |
| #include "tests/mesos.hpp" |
| |
| using namespace mesos; |
| using namespace mesos::internal; |
| using namespace mesos::internal::tests; |
| |
| using mesos::internal::master::Master; |
| |
| using mesos::internal::master::allocator::AllocatorProcess; |
| |
| using process::metrics::internal::MetricsProcess; |
| |
| using process::Clock; |
| using process::Future; |
| using process::PID; |
| |
| using std::string; |
| |
| using testing::_; |
| using testing::Eq; |
| using testing::Return; |
| |
| namespace mesos { |
| namespace internal { |
| namespace master { |
| |
| // Query the snapshot endpoint of MetricsProcess over HTTP, check that the |
| // reply is JSON and evaluate (as a statement expression) to the parsed JSON::Object. |
| #define METRICS_SNAPSHOT \ |
| ({ Future<process::http::Response> response = \ |
| process::http::get(MetricsProcess::instance()->self(), "snapshot"); \ |
| AWAIT_READY(response); \ |
| \ |
| EXPECT_SOME_EQ( \ |
| "application/json", \ |
| response.get().headers.get("Content-Type")); \ |
| \ |
| Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body); \ |
| ASSERT_SOME(parse); \ |
| \ |
| parse.get(); }) |
| |
| |
| // Fixture for the framework API rate limiting tests, which also cover |
| // the metrics exported for API call rates. |
| class RateLimitingTest : public MesosTest |
| { |
| public: |
| virtual master::Flags CreateMasterFlags() |
| { |
| master::Flags masterFlags = MesosTest::CreateMasterFlags(); |
| |
| RateLimits rateLimits; |
| RateLimit* defaultLimit = rateLimits.mutable_limits()->Add(); |
| defaultLimit->set_principal(DEFAULT_CREDENTIAL.principal()); |
| // Use 1qps for the default principal. The metrics endpoint also |
| // throttles requests (at 2qps), so the tests advance the Clock in |
| // half-second steps; 1qps keeps those steps from interfering with |
| // framework rate limiting. |
| defaultLimit->set_qps(1); |
| masterFlags.rate_limits = rateLimits; |
| |
| return masterFlags; |
| } |
| }; |
| |
| |
| // Verify that a framework's message counters are added when the |
| // framework registers, count messages correctly while it is given |
| // unlimited rate, and are removed when the framework terminates. |
| TEST_F(RateLimitingTest, NoRateLimiting) |
| { |
| // Give the framework unlimited rate explicitly by specifying a |
| // RateLimit entry without 'qps'. |
| master::Flags flags = CreateMasterFlags(); |
| RateLimits limits; |
| RateLimit* limit = limits.mutable_limits()->Add(); |
| limit->set_principal(DEFAULT_CREDENTIAL.principal()); |
| flags.rate_limits = limits; |
| |
| Try<PID<Master> > master = StartMaster(flags); |
| ASSERT_SOME(master); |
| |
| Clock::pause(); |
| |
| // Settle to make sure master is ready for incoming requests, i.e., |
| // '_recover()' completes. |
| Clock::settle(); |
| |
| // Advance before the test so that the 1st call to Metrics endpoint |
| // is not throttled. MetricsProcess which hosts the endpoint |
| // throttles requests at 2qps and its singleton instance is shared |
| // across tests. |
| Clock::advance(Milliseconds(501)); |
| |
| // Message counters not present before the framework registers. |
| { |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| EXPECT_EQ( |
| 0u, |
| metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() + |
| "/messages_received")); |
| |
| EXPECT_EQ( |
| 0u, |
| metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() + |
| "/messages_processed")); |
| } |
| |
| MockScheduler sched; |
| // Create MesosSchedulerDriver on the heap because of the need to |
| // destroy it during the test due to MESOS-1456. |
| MesosSchedulerDriver* driver = new MesosSchedulerDriver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(driver, _, _)) |
| .Times(1); |
| |
| // Capture the RegisterFrameworkMessage (to replay it later) and the master's reply. |
| Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF( |
| RegisterFrameworkMessage(), _, master.get()); |
| Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE( |
| Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); |
| |
| ASSERT_EQ(DRIVER_RUNNING, driver->start()); |
| |
| AWAIT_READY(registerFrameworkMessage); |
| AWAIT_READY(frameworkRegisteredMessage); |
| |
| const process::UPID schedulerPid = frameworkRegisteredMessage.get().to; |
| |
| // Advance so that the next metrics endpoint query is not throttled (2qps). |
| Clock::advance(Milliseconds(501)); |
| |
| // Send a duplicate RegisterFrameworkMessage. Master sends |
| // FrameworkRegisteredMessage back after processing it. |
| { |
| Future<process::Message> duplicateFrameworkRegisteredMessage = |
| FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), |
| master.get(), |
| _); |
| |
| process::post(schedulerPid, master.get(), registerFrameworkMessage.get()); |
| AWAIT_READY(duplicateFrameworkRegisteredMessage); |
| |
| // Verify that one message is received and processed (after |
| // registration). |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| const string& messages_received = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received"; |
| EXPECT_EQ(1u, metrics.values.count(messages_received)); |
| EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value); |
| |
| const string& messages_processed = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed"; |
| EXPECT_EQ(1u, metrics.values.count(messages_processed)); |
| EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value); |
| } |
| |
| Future<Nothing> frameworkRemoved = |
| FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved); |
| |
| driver->stop(); |
| driver->join(); |
| delete driver; |
| |
| // The fact that UnregisterFrameworkMessage (the 2nd message from |
| // 'sched' that reaches Master after its registration) gets |
| // processed without Clock advances proves that the framework is |
| // given unlimited rate. |
| AWAIT_READY(frameworkRemoved); |
| |
| // Advance so that the next metrics endpoint query is not throttled (2qps). |
| Clock::advance(Milliseconds(501)); |
| |
| // Message counters removed after the framework is unregistered. |
| { |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| EXPECT_EQ( |
| 0u, |
| metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() + |
| "/messages_received")); |
| |
| EXPECT_EQ( |
| 0u, |
| metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() + |
| "/messages_processed")); |
| } |
| |
| Shutdown(); |
| } |
| |
| |
| // Verify that a framework is throttled at the rate configured by the |
| // fixture (1qps for the default principal). |
| TEST_F(RateLimitingTest, RateLimitingEnabled) |
| { |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Clock::pause(); |
| |
| // Settle to make sure master is ready for incoming requests, i.e., |
| // '_recover()' completes. |
| Clock::settle(); |
| |
| // Advance before the test so that the 1st call to Metrics endpoint |
| // is not throttled. MetricsProcess which hosts the endpoint |
| // throttles requests at 2qps and its singleton instance is shared |
| // across tests. |
| Clock::advance(Milliseconds(501)); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .Times(1); |
| |
| // Capture the RegisterFrameworkMessage (to replay it later) and the master's reply. |
| Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF( |
| RegisterFrameworkMessage(), _, master.get()); |
| Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE( |
| Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); |
| |
| ASSERT_EQ(DRIVER_RUNNING, driver.start()); |
| |
| AWAIT_READY(registerFrameworkMessage); |
| AWAIT_READY(frameworkRegisteredMessage); |
| |
| const process::UPID schedulerPid = frameworkRegisteredMessage.get().to; |
| |
| // Keep sending duplicate RegisterFrameworkMessages. Master sends |
| // FrameworkRegisteredMessage back after processing each of them. |
| { |
| Future<process::Message> duplicateFrameworkRegisteredMessage = |
| FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), |
| master.get(), |
| _); |
| |
| process::post(schedulerPid, master.get(), registerFrameworkMessage.get()); |
| |
| // The first message is not throttled because it's at the head of |
| // the queue. |
| AWAIT_READY(duplicateFrameworkRegisteredMessage); |
| |
| // Verify that one message is received and processed (after |
| // registration). |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| const string& messages_received = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received"; |
| EXPECT_EQ(1u, metrics.values.count(messages_received)); |
| EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value); |
| |
| const string& messages_processed = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed"; |
| EXPECT_EQ(1u, metrics.values.count(messages_processed)); |
| EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value); |
| } |
| |
| // The 2nd message is throttled for a second (1qps). |
| Future<process::Message> duplicateFrameworkRegisteredMessage = |
| FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), |
| master.get(), |
| _); |
| |
| process::post(schedulerPid, master.get(), registerFrameworkMessage.get()); |
| |
| // Advance for half a second and verify that the message is still |
| // not processed. |
| Clock::advance(Milliseconds(501)); |
| |
| // Settle to make sure all events not delayed are processed. |
| Clock::settle(); |
| |
| { |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| const string& messages_received = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received"; |
| EXPECT_EQ(1u, metrics.values.count(messages_received)); |
| const string& messages_processed = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed"; |
| EXPECT_EQ(1u, metrics.values.count(messages_processed)); |
| |
| // The 2nd message is received but not yet processed after half |
| // a second because of throttling. |
| EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value); |
| EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value); |
| EXPECT_TRUE(duplicateFrameworkRegisteredMessage.isPending()); |
| } |
| |
| // After another half a second the message should be processed. |
| Clock::advance(Milliseconds(501)); |
| AWAIT_READY(duplicateFrameworkRegisteredMessage); |
| |
| // Verify counters after processing of the message. |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| const string& messages_received = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received"; |
| EXPECT_EQ(1u, metrics.values.count(messages_received)); |
| const string& messages_processed = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed"; |
| EXPECT_EQ(1u, metrics.values.count(messages_processed)); |
| |
| EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value); |
| EXPECT_EQ(2, metrics.values[messages_processed].as<JSON::Number>().value); |
| |
| EXPECT_EQ(DRIVER_STOPPED, driver.stop()); |
| EXPECT_EQ(DRIVER_STOPPED, driver.join()); |
| |
| Shutdown(); |
| } |
| |
| |
| // Verify that framework message counters and rate limiters work with |
| // frameworks of different principals which are throttled at |
| // different rates (1qps explicitly, 0.5qps via the aggregate default). |
| TEST_F(RateLimitingTest, DifferentPrincipalFrameworks) |
| { |
| master::Flags flags = CreateMasterFlags(); |
| |
| // Configure RateLimits to be 1qps and 0.5qps for two frameworks. |
| // Rate for the second framework is implicitly specified via |
| // 'aggregate_default_qps'. |
| RateLimits limits; |
| RateLimit* limit1 = limits.mutable_limits()->Add(); |
| limit1->set_principal("framework1"); |
| limit1->set_qps(1); |
| limits.set_aggregate_default_qps(0.5); |
| flags.rate_limits = limits; |
| |
| flags.authenticate_frameworks = false; |
| |
| Try<PID<Master> > master = StartMaster(flags); |
| ASSERT_SOME(master); |
| |
| Clock::pause(); |
| |
| // Settle to make sure master is ready for incoming requests, i.e., |
| // '_recover()' completes. |
| Clock::settle(); |
| |
| // Advance before the test so that the 1st call to Metrics endpoint |
| // is not throttled. MetricsProcess which hosts the endpoint |
| // throttles requests at 2qps and its singleton instance is shared |
| // across tests. |
| Clock::advance(Milliseconds(501)); |
| |
| // 1. Register two frameworks. |
| |
| // 1.1. Create the first framework. |
| FrameworkInfo frameworkInfo1; // Bug in gcc 4.1.*, must assign on next line. |
| frameworkInfo1 = DEFAULT_FRAMEWORK_INFO; |
| frameworkInfo1.set_principal("framework1"); |
| |
| MockScheduler sched1; |
| // Create MesosSchedulerDriver on the heap because of the need to |
| // destroy it during the test due to MESOS-1456. |
| MesosSchedulerDriver* driver1 = |
| new MesosSchedulerDriver(&sched1, frameworkInfo1, master.get()); |
| |
| EXPECT_CALL(sched1, registered(driver1, _, _)) |
| .Times(1); |
| |
| // Grab the stuff we need to replay the RegisterFrameworkMessage |
| // for sched1. |
| Future<RegisterFrameworkMessage> registerFrameworkMessage1 = FUTURE_PROTOBUF( |
| RegisterFrameworkMessage(), _, master.get()); |
| Future<process::Message> frameworkRegisteredMessage1 = FUTURE_MESSAGE( |
| Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); |
| |
| ASSERT_EQ(DRIVER_RUNNING, driver1->start()); |
| |
| AWAIT_READY(registerFrameworkMessage1); |
| AWAIT_READY(frameworkRegisteredMessage1); |
| |
| const process::UPID sched1Pid = frameworkRegisteredMessage1.get().to; |
| |
| // 1.2. Create the second framework. |
| FrameworkInfo frameworkInfo2; // Bug in gcc 4.1.*, must assign on next line. |
| frameworkInfo2 = DEFAULT_FRAMEWORK_INFO; |
| frameworkInfo2.set_principal("framework2"); |
| |
| MockScheduler sched2; |
| MesosSchedulerDriver driver2(&sched2, frameworkInfo2, master.get()); |
| |
| EXPECT_CALL(sched2, registered(&driver2, _, _)) |
| .Times(1); |
| |
| // Grab the stuff we need to replay the RegisterFrameworkMessage |
| // for sched2. |
| Future<RegisterFrameworkMessage> registerFrameworkMessage2 = FUTURE_PROTOBUF( |
| RegisterFrameworkMessage(), _, master.get()); |
| Future<process::Message> frameworkRegisteredMessage2 = FUTURE_MESSAGE( |
| Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); |
| |
| ASSERT_EQ(DRIVER_RUNNING, driver2.start()); |
| |
| AWAIT_READY(registerFrameworkMessage2); |
| AWAIT_READY(frameworkRegisteredMessage2); |
| |
| const process::UPID sched2Pid = frameworkRegisteredMessage2.get().to; |
| |
| // 2. Send duplicate RegisterFrameworkMessages from the two |
| // schedulers to Master. |
| |
| // The first messages are not throttled because they are at the |
| // head of the queue. |
| { |
| Future<process::Message> duplicateFrameworkRegisteredMessage1 = |
| FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), |
| master.get(), |
| sched1Pid); |
| Future<process::Message> duplicateFrameworkRegisteredMessage2 = |
| FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), |
| master.get(), |
| sched2Pid); |
| |
| process::post(sched1Pid, master.get(), registerFrameworkMessage1.get()); |
| process::post(sched2Pid, master.get(), registerFrameworkMessage2.get()); |
| |
| AWAIT_READY(duplicateFrameworkRegisteredMessage1); |
| AWAIT_READY(duplicateFrameworkRegisteredMessage2); |
| } |
| |
| // Send the second batch of messages which should be throttled. |
| { |
| Future<process::Message> duplicateFrameworkRegisteredMessage1 = |
| FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), |
| master.get(), |
| sched1Pid); |
| Future<process::Message> duplicateFrameworkRegisteredMessage2 = |
| FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), |
| master.get(), |
| sched2Pid); |
| |
| process::post(sched1Pid, master.get(), registerFrameworkMessage1.get()); |
| process::post(sched2Pid, master.get(), registerFrameworkMessage2.get()); |
| |
| // Settle to make sure the pending futures below are indeed due |
| // to throttling. |
| Clock::settle(); |
| |
| EXPECT_TRUE(duplicateFrameworkRegisteredMessage1.isPending()); |
| EXPECT_TRUE(duplicateFrameworkRegisteredMessage2.isPending()); |
| |
| { |
| // Verify counters also indicate that messages are received but |
| // not processed. |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| EXPECT_EQ( |
| 1u, metrics.values.count("frameworks/framework1/messages_received")); |
| EXPECT_EQ( |
| 1u, metrics.values.count("frameworks/framework1/messages_processed")); |
| EXPECT_EQ( |
| 1u, metrics.values.count("frameworks/framework2/messages_received")); |
| EXPECT_EQ( |
| 1u, metrics.values.count("frameworks/framework2/messages_processed")); |
| |
| EXPECT_EQ( |
| 2, |
| metrics.values["frameworks/framework1/messages_received"] |
| .as<JSON::Number>().value); |
| EXPECT_EQ( |
| 2, |
| metrics.values["frameworks/framework2/messages_received"] |
| .as<JSON::Number>().value); |
| EXPECT_EQ( |
| 1, |
| metrics.values["frameworks/framework1/messages_processed"] |
| .as<JSON::Number>().value); |
| EXPECT_EQ( |
| 1, |
| metrics.values["frameworks/framework2/messages_processed"] |
| .as<JSON::Number>().value); |
| } |
| |
| // Advance for a second so the message from framework1 (1qps) |
| // should be processed. |
| Clock::advance(Seconds(1)); |
| AWAIT_READY(duplicateFrameworkRegisteredMessage1); |
| EXPECT_TRUE(duplicateFrameworkRegisteredMessage2.isPending()); |
| |
| // Framework1's message is processed and framework2's is not |
| // because it's throttled at a lower rate. |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| EXPECT_EQ( |
| 2, |
| metrics.values["frameworks/framework1/messages_processed"] |
| .as<JSON::Number>().value); |
| EXPECT_EQ( |
| 1, |
| metrics.values["frameworks/framework2/messages_processed"] |
| .as<JSON::Number>().value); |
| |
| // After another second framework2 (0.5qps)'s message is |
| // processed as well. |
| Clock::advance(Seconds(1)); |
| AWAIT_READY(duplicateFrameworkRegisteredMessage2); |
| } |
| |
| // 3. Counters confirm that both frameworks' messages are processed. |
| { |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| EXPECT_EQ( |
| 1u, metrics.values.count("frameworks/framework1/messages_received")); |
| EXPECT_EQ( |
| 1u, metrics.values.count("frameworks/framework1/messages_processed")); |
| EXPECT_EQ( |
| 1u, metrics.values.count("frameworks/framework2/messages_received")); |
| EXPECT_EQ( |
| 1u, metrics.values.count("frameworks/framework2/messages_processed")); |
| EXPECT_EQ( |
| 2, |
| metrics.values["frameworks/framework1/messages_received"] |
| .as<JSON::Number>().value); |
| EXPECT_EQ( |
| 2, |
| metrics.values["frameworks/framework2/messages_received"] |
| .as<JSON::Number>().value); |
| EXPECT_EQ( |
| 2, |
| metrics.values["frameworks/framework1/messages_processed"] |
| .as<JSON::Number>().value); |
| EXPECT_EQ( |
| 2, |
| metrics.values["frameworks/framework2/messages_processed"] |
| .as<JSON::Number>().value); |
| } |
| |
| // 4. Remove a framework and its message counters are deleted while |
| // the other framework's counters stay. |
| Future<Nothing> frameworkRemoved = |
| FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved); |
| |
| driver1->stop(); |
| driver1->join(); |
| delete driver1; |
| |
| // No need to advance again because we already advanced 1sec for |
| // sched2 so the RateLimiter for sched1 doesn't impose a delay this |
| // time. |
| AWAIT_READY(frameworkRemoved); |
| |
| // Settle to avoid the race between the removal of the counters and |
| // the metrics endpoint query. |
| Clock::settle(); |
| |
| // Advance for Metrics rate limiting. |
| Clock::advance(Milliseconds(501)); |
| |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| EXPECT_EQ( |
| 0u, metrics.values.count("frameworks/framework1/messages_received")); |
| EXPECT_EQ( |
| 0u, metrics.values.count("frameworks/framework1/messages_processed")); |
| EXPECT_EQ( |
| 1u, metrics.values.count("frameworks/framework2/messages_received")); |
| EXPECT_EQ( |
| 1u, metrics.values.count("frameworks/framework2/messages_processed")); |
| |
| driver2.stop(); |
| driver2.join(); |
| |
| Shutdown(); |
| } |
| |
| |
| // Verify that if multiple frameworks use the same principal, they |
| // share the same counters, are throttled at the same rate and |
| // removing one framework doesn't remove the counters. |
| TEST_F(RateLimitingTest, SamePrincipalFrameworks) |
| { |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Clock::pause(); |
| |
| // Settle to make sure master is ready for incoming requests, i.e., |
| // '_recover()' completes. |
| Clock::settle(); |
| |
| // Advance before the test so that the 1st call to Metrics endpoint |
| // is not throttled. MetricsProcess which hosts the endpoint |
| // throttles requests at 2qps and its singleton instance is shared |
| // across tests. |
| Clock::advance(Milliseconds(501)); |
| |
| // 1. Register two frameworks. |
| |
| // 1.1. Create the first framework. |
| MockScheduler sched1; |
| // Create MesosSchedulerDriver on the heap because of the need to |
| // destroy it during the test due to MESOS-1456. |
| MesosSchedulerDriver* driver1 = new MesosSchedulerDriver( |
| &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched1, registered(driver1, _, _)) |
| .Times(1); |
| |
| // Grab the stuff we need to replay the RegisterFrameworkMessage |
| // for sched1. |
| Future<RegisterFrameworkMessage> registerFrameworkMessage1 = FUTURE_PROTOBUF( |
| RegisterFrameworkMessage(), _, master.get()); |
| Future<process::Message> frameworkRegisteredMessage1 = FUTURE_MESSAGE( |
| Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); |
| |
| ASSERT_EQ(DRIVER_RUNNING, driver1->start()); |
| |
| AWAIT_READY(registerFrameworkMessage1); |
| AWAIT_READY(frameworkRegisteredMessage1); |
| |
| const process::UPID sched1Pid = frameworkRegisteredMessage1.get().to; |
| |
| // 1.2. Create the second framework. |
| |
| // 'sched2' uses the same DEFAULT_CREDENTIAL principal as 'sched1'. |
| MockScheduler sched2; |
| MesosSchedulerDriver driver2( |
| &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched2, registered(&driver2, _, _)) |
| .Times(1); |
| |
| // Grab the stuff we need to replay the RegisterFrameworkMessage |
| // for sched2. |
| Future<RegisterFrameworkMessage> registerFrameworkMessage2 = FUTURE_PROTOBUF( |
| RegisterFrameworkMessage(), _, master.get()); |
| Future<process::Message> frameworkRegisteredMessage2 = FUTURE_MESSAGE( |
| Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); |
| |
| ASSERT_EQ(DRIVER_RUNNING, driver2.start()); |
| |
| AWAIT_READY(registerFrameworkMessage2); |
| AWAIT_READY(frameworkRegisteredMessage2); |
| |
| const process::UPID sched2Pid = frameworkRegisteredMessage2.get().to; |
| |
| // Message counters added after both frameworks are registered. |
| { |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| EXPECT_EQ( |
| 1u, |
| metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() + |
| "/messages_received")); |
| EXPECT_EQ( |
| 1u, |
| metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() + |
| "/messages_processed")); |
| } |
| |
| // The 1st message from sched1 is not throttled as it's at the head |
| // of the queue but the 1st message from sched2 is because it's |
| // throttled by the same RateLimiter. |
| Future<process::Message> duplicateFrameworkRegisteredMessage1 = |
| FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), |
| master.get(), |
| sched1Pid); |
| Future<process::Message> duplicateFrameworkRegisteredMessage2 = |
| FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), |
| master.get(), |
| sched2Pid); |
| |
| process::post(sched1Pid, master.get(), registerFrameworkMessage1.get()); |
| process::post(sched2Pid, master.get(), registerFrameworkMessage2.get()); |
| |
| AWAIT_READY(duplicateFrameworkRegisteredMessage1); |
| |
| // Settle to make sure the pending future is indeed caused by |
| // throttling. |
| Clock::settle(); |
| EXPECT_TRUE(duplicateFrameworkRegisteredMessage2.isPending()); |
| |
| // Advance so that the next metrics endpoint query is not throttled (2qps). |
| Clock::advance(Milliseconds(501)); |
| |
| { |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| // Two messages received and one processed. |
| const string& messages_received = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received"; |
| EXPECT_EQ(1u, metrics.values.count(messages_received)); |
| EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value); |
| |
| const string& messages_processed = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed"; |
| EXPECT_EQ(1u, metrics.values.count(messages_processed)); |
| EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value); |
| } |
| |
| // Advance for another half a second to make sure the throttled |
| // message is processed. |
| Clock::advance(Milliseconds(501)); |
| |
| AWAIT_READY(duplicateFrameworkRegisteredMessage2); |
| |
| Future<Nothing> frameworkRemoved = |
| FUTURE_DISPATCH(_, &AllocatorProcess::frameworkRemoved); |
| |
| driver1->stop(); |
| driver1->join(); |
| delete driver1; |
| |
| // Advance to let UnregisterFrameworkMessage come through. |
| Clock::settle(); |
| Clock::advance(Seconds(1)); |
| |
| AWAIT_READY(frameworkRemoved); |
| |
| // Message counters are not removed after the first framework is |
| // unregistered. |
| |
| // Advance so that the next metrics endpoint query is not throttled (2qps). |
| Clock::advance(Milliseconds(501)); |
| |
| { |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| EXPECT_EQ( |
| 1u, |
| metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() + |
| "/messages_received")); |
| EXPECT_EQ( |
| 1u, |
| metrics.values.count("frameworks/" + DEFAULT_CREDENTIAL.principal() + |
| "/messages_processed")); |
| } |
| |
| driver2.stop(); |
| driver2.join(); |
| |
| Shutdown(); |
| } |
| |
| |
| // Verify that when a scheduler fails over, the new scheduler |
| // instance continues to use the same counters and RateLimiter. |
| TEST_F(RateLimitingTest, SchedulerFailover) |
| { |
| Try<PID<Master> > master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Clock::pause(); |
| |
| // Settle to make sure master is ready for incoming requests, i.e., |
| // '_recover()' completes. |
| Clock::settle(); |
| |
| // Advance before the test so that the 1st call to Metrics endpoint |
| // is not throttled. MetricsProcess which hosts the endpoint |
| // throttles requests at 2qps and its singleton instance is shared |
| // across tests. |
| Clock::advance(Milliseconds(501)); |
| |
| // 1. Launch the first (i.e., failing) scheduler and verify its |
| // counters. |
| |
| MockScheduler sched1; |
| MesosSchedulerDriver driver1( |
| &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched1, registered(&driver1, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| { |
| // Grab the stuff we need to replay the RegisterFrameworkMessage. |
| Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF( |
| RegisterFrameworkMessage(), _, master.get()); |
| Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE( |
| Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); |
| |
| // Check the start status, as the other tests in this file do, so a |
| // driver that fails to start is reported here and not as a timeout. |
| ASSERT_EQ(DRIVER_RUNNING, driver1.start()); |
| |
| AWAIT_READY(registerFrameworkMessage); |
| AWAIT_READY(frameworkRegisteredMessage); |
| AWAIT_READY(frameworkId); |
| |
| const process::UPID schedulerPid = frameworkRegisteredMessage.get().to; |
| |
| // Send a duplicate RegisterFrameworkMessage. Master replies |
| // with a duplicate FrameworkRegisteredMessage. |
| Future<process::Message> duplicateFrameworkRegisteredMessage = |
| FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), |
| master.get(), |
| _); |
| |
| process::post(schedulerPid, master.get(), registerFrameworkMessage.get()); |
| |
| // Now one message has been received and processed by Master in |
| // addition to the RegisterFrameworkMessage. |
| AWAIT_READY(duplicateFrameworkRegisteredMessage); |
| |
| // Settle to make sure the messages_processed counter is updated. |
| Clock::settle(); |
| |
| // Verify the message counters. |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| // One message received and processed after the framework is |
| // registered. |
| const string& messages_received = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received"; |
| EXPECT_EQ(1u, metrics.values.count(messages_received)); |
| EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value); |
| |
| const string& messages_processed = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed"; |
| EXPECT_EQ(1u, metrics.values.count(messages_processed)); |
| EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value); |
| } |
| |
| // 2. Now launch the second (i.e., failover) scheduler using the |
| // framework id recorded from the first scheduler and verify that |
| // its counters are not reset to zero. |
| |
| MockScheduler sched2; |
| |
| FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line. |
| framework2 = DEFAULT_FRAMEWORK_INFO; |
| framework2.mutable_id()->MergeFrom(frameworkId.get()); |
| |
| MesosSchedulerDriver driver2( |
| &sched2, framework2, master.get(), DEFAULT_CREDENTIAL); |
| |
| // Scheduler driver ignores duplicate FrameworkRegisteredMessages. |
| EXPECT_CALL(sched2, registered(&driver2, frameworkId.get(), _)) |
| .Times(1); |
| |
| Future<Nothing> sched1Error; |
| EXPECT_CALL(sched1, error(&driver1, "Framework failed over")) |
| .WillOnce(FutureSatisfy(&sched1Error)); |
| |
| // Grab the stuff we need to replay the ReregisterFrameworkMessage. |
| Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE( |
| Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); |
| Future<ReregisterFrameworkMessage> reregisterFrameworkMessage = |
| FUTURE_PROTOBUF(ReregisterFrameworkMessage(), _, master.get()); |
| |
| // Same start status check for the failover driver. |
| ASSERT_EQ(DRIVER_RUNNING, driver2.start()); |
| |
| AWAIT_READY(reregisterFrameworkMessage); |
| AWAIT_READY(sched1Error); |
| AWAIT_READY(frameworkRegisteredMessage); |
| |
| const process::UPID schedulerPid = frameworkRegisteredMessage.get().to; |
| |
| Future<process::Message> duplicateFrameworkRegisteredMessage = |
| FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), |
| master.get(), |
| _); |
| |
| // Sending a duplicate ReregisterFrameworkMessage to test the |
| // message counters with the new scheduler instance. |
| process::post(schedulerPid, master.get(), reregisterFrameworkMessage.get()); |
| |
| // Settle to make sure everything not delayed is processed. |
| Clock::settle(); |
| |
| // Throttled because the same RateLimiter instance is |
| // throttling the new scheduler instance. |
| EXPECT_TRUE(duplicateFrameworkRegisteredMessage.isPending()); |
| |
| // Advance so that the next metrics endpoint query is not throttled (2qps). |
| Clock::advance(Milliseconds(501)); |
| |
| { |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| // Verify that the counters correctly indicate the message is |
| // received but not processed. |
| const string& messages_received = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received"; |
| EXPECT_EQ(1u, metrics.values.count(messages_received)); |
| EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value); |
| |
| const string& messages_processed = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed"; |
| EXPECT_EQ(1u, metrics.values.count(messages_processed)); |
| EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value); |
| } |
| |
| // Need another half a second to have it processed. |
| Clock::advance(Milliseconds(501)); |
| |
| AWAIT_READY(duplicateFrameworkRegisteredMessage); |
| |
| // Advance so that the next metrics endpoint query is not throttled (2qps). |
| Clock::advance(Milliseconds(501)); |
| |
| { |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| // One message from sched2 after it reregistered plus the earlier |
| // one from sched1: both are now received and processed. |
| const string& messages_received = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received"; |
| EXPECT_EQ(1u, metrics.values.count(messages_received)); |
| EXPECT_EQ(2, metrics.values[messages_received].as<JSON::Number>().value); |
| |
| const string& messages_processed = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed"; |
| EXPECT_EQ(1u, metrics.values.count(messages_processed)); |
| EXPECT_EQ(2, metrics.values[messages_processed].as<JSON::Number>().value); |
| } |
| |
| EXPECT_EQ(DRIVER_STOPPED, driver2.stop()); |
| EXPECT_EQ(DRIVER_STOPPED, driver2.join()); |
| |
| EXPECT_EQ(DRIVER_ABORTED, driver1.stop()); |
| EXPECT_EQ(DRIVER_STOPPED, driver1.join()); |
| |
| Shutdown(); |
| } |
| |
| |
| TEST_F(RateLimitingTest, CapacityReached) |
| { |
| master::Flags flags = CreateMasterFlags(); |
| RateLimits limits; |
| RateLimit* limit = limits.mutable_limits()->Add(); |
| limit->set_principal(DEFAULT_CREDENTIAL.principal()); |
| limit->set_qps(1); |
| limit->set_capacity(2); |
| flags.rate_limits = limits; |
| |
| Try<PID<Master> > master = StartMaster(flags); |
| ASSERT_SOME(master); |
| |
| Clock::pause(); |
| |
| // Advance before the test so that the 1st call to Metrics endpoint |
| // is not throttled. MetricsProcess which hosts the endpoint |
| // throttles requests at 2qps and its singleton instance is shared |
| // across tests. |
| Clock::advance(Milliseconds(501)); |
| |
| MockScheduler sched; |
| |
| FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line. |
| frameworkInfo = DEFAULT_FRAMEWORK_INFO; |
| |
| // Use a long failover timeout so the master doesn't unregister the |
| // framework right away when it aborts. |
| frameworkInfo.set_failover_timeout(10); |
| |
| // Create MesosSchedulerDriver on the heap because of the need to |
| // destroy it during the test due to MESOS-1456. |
| MesosSchedulerDriver* driver = new MesosSchedulerDriver( |
| &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); |
| |
| EXPECT_CALL(sched, registered(driver, _, _)) |
| .Times(1); |
| |
| // Grab the stuff we need to replay the RegisterFrameworkMessage. |
| Future<RegisterFrameworkMessage> registerFrameworkMessage = FUTURE_PROTOBUF( |
| RegisterFrameworkMessage(), _, master.get()); |
| Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE( |
| Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); |
| |
| ASSERT_EQ(DRIVER_RUNNING, driver->start()); |
| |
| AWAIT_READY(registerFrameworkMessage); |
| AWAIT_READY(frameworkRegisteredMessage); |
| |
| const process::UPID schedulerPid = frameworkRegisteredMessage.get().to; |
| |
| // Keep sending duplicate RegisterFrameworkMessages. Master sends |
| // FrameworkRegisteredMessage back after processing each of them. |
| { |
| Future<process::Message> duplicateFrameworkRegisteredMessage = |
| FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), |
| master.get(), |
| _); |
| |
| process::post(schedulerPid, master.get(), registerFrameworkMessage.get()); |
| |
| // The first message is not throttled because it's at the head of |
| // the queue. |
| AWAIT_READY(duplicateFrameworkRegisteredMessage); |
| |
| // Verify that one message is received and processed (after |
| // registration). |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| const string& messages_received = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received"; |
| EXPECT_EQ(1u, metrics.values.count(messages_received)); |
| EXPECT_EQ(1, metrics.values[messages_received].as<JSON::Number>().value); |
| |
| const string& messages_processed = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed"; |
| EXPECT_EQ(1u, metrics.values.count(messages_processed)); |
| EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value); |
| } |
| |
| // The subsequent messages are going to be throttled. |
| Future<process::Message> frameworkErrorMessage = |
| FUTURE_MESSAGE(Eq(FrameworkErrorMessage().GetTypeName()), |
| master.get(), |
| _); |
| |
| // Send two messages which will be queued up. This will reach but not |
| // exceed the capacity. |
| for (int i = 0; i < 2; i++) { |
| process::post(schedulerPid, master.get(), registerFrameworkMessage.get()); |
| } |
| |
| // Settle to make sure no error is sent just yet. |
| Clock::settle(); |
| EXPECT_TRUE(frameworkErrorMessage.isPending()); |
| |
| // The 3rd message results in an immediate error. |
| Future<Nothing> error; |
| EXPECT_CALL(sched, error( |
| driver, |
| "Message mesos.internal.RegisterFrameworkMessage dropped: capacity(2) " |
| "exceeded")) |
| .WillOnce(FutureSatisfy(&error)); |
| |
| process::post(schedulerPid, master.get(), registerFrameworkMessage.get()); |
| AWAIT_READY(frameworkErrorMessage); |
| |
| // Settle to make sure scheduler aborts and its |
| // DeactivateFrameworkMessage is received by master. |
| Clock::settle(); |
| |
| AWAIT_READY(error); |
| |
| // Stop the driver but indicate it wants to failover. |
| EXPECT_EQ(DRIVER_ABORTED, driver->stop(true)); |
| EXPECT_EQ(DRIVER_STOPPED, driver->join()); |
| delete driver; |
| |
| // Wait for half a second for metrics endpoint. |
| Clock::advance(Milliseconds(501)); |
| |
| { |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| const string& messages_received = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received"; |
| EXPECT_EQ(1u, metrics.values.count(messages_received)); |
| EXPECT_EQ(5, metrics.values[messages_received].as<JSON::Number>().value); |
| const string& messages_processed = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed"; |
| EXPECT_EQ(1u, metrics.values.count(messages_processed)); |
| // Four messages not processed, two in the queue and two dropped. |
| EXPECT_EQ(1, metrics.values[messages_processed].as<JSON::Number>().value); |
| } |
| |
| // Advance three times for the two pending messages and the exited |
| // event to be processed. |
| for (int i = 0; i < 3; i++) { |
| Clock::advance(Milliseconds(1001)); |
| Clock::settle(); |
| } |
| |
| // Counters are not removed because the scheduler is not |
| // unregistered and the master expects it to failover. |
| JSON::Object metrics = METRICS_SNAPSHOT; |
| |
| const string& messages_received = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_received"; |
| EXPECT_EQ(1u, metrics.values.count(messages_received)); |
| EXPECT_EQ(5, metrics.values[messages_received].as<JSON::Number>().value); |
| const string& messages_processed = |
| "frameworks/" + DEFAULT_CREDENTIAL.principal() + "/messages_processed"; |
| EXPECT_EQ(1u, metrics.values.count(messages_processed)); |
| // Two messages are dropped. |
| EXPECT_EQ(3, metrics.values[messages_processed].as<JSON::Number>().value); |
| |
| Shutdown(); |
| } |
| |
| } // namespace master { |
| } // namespace internal { |
| } // namespace mesos { |