blob: 4cb70720ec6df86ad113170fb664de1bfbd809aa [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <mesos/http.hpp>
#include <mesos/v1/resources.hpp>
#include <mesos/v1/master/master.hpp>
#include <mesos/v1/scheduler/scheduler.hpp>
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/http.hpp>
#include <process/owned.hpp>
#include <stout/gtest.hpp>
#include <stout/jsonify.hpp>
#include <stout/nothing.hpp>
#include <stout/recordio.hpp>
#include <stout/stringify.hpp>
#include <stout/try.hpp>
#include "common/http.hpp"
#include "common/protobuf_utils.hpp"
#include "common/recordio.hpp"
#include "internal/evolve.hpp"
#include "master/detector/standalone.hpp"
#include "slave/slave.hpp"
#include "tests/allocator.hpp"
#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
using google::protobuf::RepeatedPtrField;
using mesos::master::detector::MasterDetector;
using mesos::master::detector::StandaloneMasterDetector;
using mesos::internal::recordio::Reader;
using mesos::internal::slave::Slave;
using mesos::internal::protobuf::maintenance::createSchedule;
using mesos::internal::protobuf::maintenance::createUnavailability;
using mesos::internal::protobuf::maintenance::createWindow;
using process::Clock;
using process::Failure;
using process::Future;
using process::Owned;
using process::http::Accepted;
using process::http::NotFound;
using process::http::OK;
using process::http::Pipe;
using process::http::Response;
using recordio::Decoder;
using testing::_;
using testing::AtMost;
using testing::DoAll;
using testing::Eq;
using testing::Return;
using testing::WithParamInterface;
namespace mesos {
namespace internal {
namespace tests {
class MasterAPITest
: public MesosTest,
public WithParamInterface<ContentType>
{
public:
// Helper function to post a request to "/api/v1" master endpoint and return
// the response.
Future<v1::master::Response> post(
const process::PID<master::Master>& pid,
const v1::master::Call& call,
const ContentType& contentType)
{
process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
return process::http::post(
pid,
"api/v1",
headers,
serialize(contentType, call),
stringify(contentType))
.then([contentType](const Response& response)
-> Future<v1::master::Response> {
if (response.status != OK().status) {
return Failure("Unexpected response status " + response.status);
}
return deserialize<v1::master::Response>(contentType, response.body);
});
}
// Helper for evolving a type by serializing/parsing when the types
// have not changed across versions.
template <typename T>
static T evolve(const google::protobuf::Message& message)
{
T t;
string data;
// NOTE: We need to use 'SerializePartialToString' instead of
// 'SerializeToString' because some required fields might not be set
// and we don't want an exception to get thrown.
CHECK(message.SerializePartialToString(&data))
<< "Failed to serialize " << message.GetTypeName()
<< " while evolving to " << t.GetTypeName();
// NOTE: We need to use 'ParsePartialFromString' instead of
// 'ParsePartialFromString' because some required fields might not
// be set and we don't want an exception to get thrown.
CHECK(t.ParsePartialFromString(data))
<< "Failed to parse " << t.GetTypeName()
<< " while evolving from " << message.GetTypeName();
return t;
}
};
// These tests are parameterized by the content type of the HTTP request.
INSTANTIATE_TEST_CASE_P(
ContentType,
MasterAPITest,
::testing::Values(ContentType::PROTOBUF, ContentType::JSON));
TEST_P(MasterAPITest, GetAgents)
{
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
// Start one agent.
Future<SlaveRegisteredMessage> agentRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
slave::Flags flags = CreateSlaveFlags();
flags.hostname = "host";
Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), flags);
ASSERT_SOME(agent);
AWAIT_READY(agentRegisteredMessage);
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::GET_AGENTS);
ContentType contentType = GetParam();
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::master::Response::GET_AGENTS, v1Response.get().type());
ASSERT_EQ(v1Response.get().get_agents().agents_size(), 1);
const v1::master::Response::GetAgents::Agent& v1Agent =
v1Response.get().get_agents().agents(0);
ASSERT_EQ("host", v1Agent.agent_info().hostname());
ASSERT_EQ(agent.get()->pid, v1Agent.pid());
ASSERT_TRUE(v1Agent.active());
ASSERT_EQ("1.0.0", v1Agent.version());
ASSERT_EQ(4, v1Agent.total_resources_size());
}
TEST_P(MasterAPITest, GetFlags)
{
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::GET_FLAGS);
ContentType contentType = GetParam();
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::master::Response::GET_FLAGS, v1Response.get().type());
}
TEST_P(MasterAPITest, GetFrameworks)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
Future<Nothing> registered;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureSatisfy(&registered));
driver.start();
AWAIT_READY(registered);
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::GET_FRAMEWORKS);
ContentType contentType = GetParam();
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::master::Response::GET_FRAMEWORKS, v1Response.get().type());
v1::master::Response::GetFrameworks frameworks =
v1Response.get().get_frameworks();
ASSERT_EQ(1, frameworks.frameworks_size());
ASSERT_EQ("default", frameworks.frameworks(0).framework_info().name());
ASSERT_EQ("*", frameworks.frameworks(0).framework_info().role());
ASSERT_FALSE(frameworks.frameworks(0).framework_info().checkpoint());
ASSERT_TRUE(frameworks.frameworks(0).active());
ASSERT_TRUE(frameworks.frameworks(0).connected());
driver.stop();
driver.join();
}
TEST_P(MasterAPITest, GetHealth)
{
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::GET_HEALTH);
ContentType contentType = GetParam();
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::master::Response::GET_HEALTH, v1Response.get().type());
ASSERT_TRUE(v1Response.get().get_health().healthy());
}
TEST_P(MasterAPITest, GetVersion)
{
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::GET_VERSION);
ContentType contentType = GetParam();
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::master::Response::GET_VERSION, v1Response.get().type());
ASSERT_EQ(MESOS_VERSION,
v1Response.get().get_version().version_info().version());
}
TEST_P(MasterAPITest, GetMetrics)
{
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
Duration timeout = Seconds(5);
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::GET_METRICS);
v1Call.mutable_get_metrics()->mutable_timeout()->set_nanoseconds(
timeout.ns());
ContentType contentType = GetParam();
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::master::Response::GET_METRICS, v1Response.get().type());
hashmap<string, double> metrics;
foreach (const v1::Metric& metric,
v1Response.get().get_metrics().metrics()) {
ASSERT_TRUE(metric.has_value());
metrics[metric.name()] = metric.value();
}
// Verifies that the response metrics is not empty.
ASSERT_LE(0, metrics.size());
}
TEST_P(MasterAPITest, GetExecutors)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
// For capturing the SlaveID so we can use it to verify GET_EXECUTORS API
// call.
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
SlaveID slaveId = slaveRegisteredMessage.get().slave_id();
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
TaskInfo task;
task.set_name("test");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_resources()->MergeFrom(offers.get()[0].resources());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
EXPECT_TRUE(status.get().has_executor_id());
EXPECT_EQ(exec.id, status.get().executor_id());
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::GET_EXECUTORS);
ContentType contentType = GetParam();
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::master::Response::GET_EXECUTORS, v1Response.get().type());
ASSERT_EQ(1, v1Response.get().get_executors().executors_size());
ASSERT_EQ(evolve<v1::AgentID>(slaveId),
v1Response.get().get_executors().executors(0).agent_id());
v1::ExecutorInfo executorInfo =
v1Response.get().get_executors().executors(0).executor_info();
ASSERT_EQ(evolve<v1::ExecutorID>(exec.id), executorInfo.executor_id());
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
}
TEST_P(MasterAPITest, GetState)
{
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::GET_STATE);
master::Flags flags = CreateMasterFlags();
flags.hostname = "localhost";
flags.cluster = "test-cluster";
Try<Owned<cluster::Master>> master = StartMaster(flags);
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
EXPECT_NE(0u, offers.get().size());
ContentType contentType = GetParam();
{
// GetState before task launch and check we have one framework, one agent
// and zero tasks/executors.
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response->IsInitialized());
ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());
const v1::master::Response::GetState& getState = v1Response->get_state();
ASSERT_EQ(1u, getState.get_frameworks().frameworks_size());
ASSERT_EQ(1u, getState.get_agents().agents_size());
ASSERT_EQ(0u, getState.get_tasks().tasks_size());
ASSERT_EQ(0u, getState.get_executors().executors_size());
}
TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
Future<ExecutorDriver*> execDriver;
EXPECT_CALL(exec, registered(_, _, _, _))
.WillOnce(FutureArg<0>(&execDriver));
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY(execDriver);
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
{
// GetState after task launch and check we have a running task.
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response->IsInitialized());
ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());
const v1::master::Response::GetState& getState = v1Response->get_state();
ASSERT_EQ(1u, getState.get_tasks().tasks_size());
ASSERT_EQ(0u, getState.get_tasks().completed_tasks_size());
}
Future<StatusUpdateAcknowledgementMessage> acknowledgement =
FUTURE_PROTOBUF(
StatusUpdateAcknowledgementMessage(),
_,
Eq(slave.get()->pid));
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status2));
// Send a terminal update so that the task transitions to completed.
TaskStatus status3;
status3.mutable_task_id()->CopyFrom(task.task_id());
status3.set_state(TASK_FINISHED);
execDriver.get()->sendStatusUpdate(status3);
AWAIT_READY(status2);
EXPECT_EQ(TASK_FINISHED, status2.get().state());
AWAIT_READY(acknowledgement);
{
// GetState after task finished and check we have a completed task.
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response->IsInitialized());
ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());
const v1::master::Response::GetState& getState = v1Response->get_state();
ASSERT_EQ(1u, getState.get_tasks().completed_tasks_size());
ASSERT_EQ(0u, getState.get_tasks().tasks_size());
}
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
}
TEST_P(MasterAPITest, GetTasksNoRunningTask)
{
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::GET_TASKS);
ContentType contentType = GetParam();
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::master::Response::GET_TASKS, v1Response.get().type());
ASSERT_EQ(0, v1Response.get().get_tasks().pending_tasks().size());
ASSERT_EQ(0, v1Response.get().get_tasks().tasks().size());
ASSERT_EQ(0, v1Response.get().get_tasks().completed_tasks().size());
ASSERT_EQ(0, v1Response.get().get_tasks().orphan_tasks().size());
}
// This test verifies that the GetTasks v1 API call returns responses correctly
// when the task transitions from being active to completed.
TEST_P(MasterAPITest, GetTasks)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
TaskInfo task;
task.set_name("test");
task.mutable_task_id()->set_value("1");
task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_resources()->MergeFrom(offers.get()[0].resources());
task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
Future<ExecutorDriver*> execDriver;
EXPECT_CALL(exec, registered(_, _, _, _))
.WillOnce(FutureArg<0>(&execDriver));
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY(execDriver);
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
EXPECT_TRUE(status.get().has_executor_id());
EXPECT_EQ(exec.id, status.get().executor_id());
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::GET_TASKS);
ContentType contentType = GetParam();
{
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::master::Response::GET_TASKS, v1Response.get().type());
ASSERT_EQ(1, v1Response.get().get_tasks().tasks().size());
ASSERT_EQ(v1::TaskState::TASK_RUNNING,
v1Response.get().get_tasks().tasks(0).state());
ASSERT_EQ("test", v1Response.get().get_tasks().tasks(0).name());
ASSERT_EQ("1", v1Response.get().get_tasks().tasks(0).task_id().value());
}
Future<StatusUpdateAcknowledgementMessage> acknowledgement =
FUTURE_PROTOBUF(
StatusUpdateAcknowledgementMessage(),
_,
Eq(slave.get()->pid));
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status2));
// Send a terminal update so that the task transitions to completed.
TaskStatus status3;
status3.mutable_task_id()->CopyFrom(task.task_id());
status3.set_state(TASK_FINISHED);
execDriver.get()->sendStatusUpdate(status3);
AWAIT_READY(status2);
EXPECT_EQ(TASK_FINISHED, status2.get().state());
AWAIT_READY(acknowledgement);
{
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::master::Response::GET_TASKS, v1Response.get().type());
ASSERT_EQ(0, v1Response.get().get_tasks().tasks().size());
ASSERT_EQ(1, v1Response.get().get_tasks().completed_tasks().size());
ASSERT_EQ(v1::TaskState::TASK_FINISHED,
v1Response.get().get_tasks().completed_tasks(0).state());
ASSERT_EQ("test", v1Response.get().get_tasks().completed_tasks(0).name());
ASSERT_EQ(
"1",
v1Response.get().get_tasks().completed_tasks(0).task_id().value());
}
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
}
TEST_P(MasterAPITest, GetLoggingLevel)
{
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::GET_LOGGING_LEVEL);
ContentType contentType = GetParam();
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::master::Response::GET_LOGGING_LEVEL, v1Response.get().type());
ASSERT_LE(0, FLAGS_v);
ASSERT_EQ(
v1Response.get().get_logging_level().level(),
static_cast<uint32_t>(FLAGS_v));
}
// Test the logging level toggle and revert after specific toggle duration.
TEST_P(MasterAPITest, SetLoggingLevel)
{
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
// We capture the original logging level first; it would be used to verify
// the logging level revert works.
uint32_t originalLevel = static_cast<uint32_t>(FLAGS_v);
// Send request to master to toggle the logging level.
uint32_t toggleLevel = originalLevel + 1;
Duration toggleDuration = Seconds(60);
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::SET_LOGGING_LEVEL);
v1::master::Call_SetLoggingLevel* setLoggingLevel =
v1Call.mutable_set_logging_level();
setLoggingLevel->set_level(toggleLevel);
setLoggingLevel->mutable_duration()->set_nanoseconds(toggleDuration.ns());
ContentType contentType = GetParam();
process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
Future<Nothing> v1Response = process::http::post(
master.get()->pid,
"api/v1",
headers,
serialize(contentType, v1Call),
stringify(contentType))
.then([contentType](const Response& response) -> Future<Nothing> {
if (response.status != OK().status) {
return Failure("Unexpected response status " + response.status);
}
return Nothing();
});
AWAIT_READY(v1Response);
ASSERT_EQ(toggleLevel, static_cast<uint32_t>(FLAGS_v));
// Speedup the logging level revert.
Clock::pause();
Clock::advance(toggleDuration);
Clock::settle();
// Verifies the logging level reverted successfully.
ASSERT_EQ(originalLevel, static_cast<uint32_t>(FLAGS_v));
Clock::resume();
}
// This test verifies if we can retrieve the file listing for a directory
// in the master.
TEST_P(MasterAPITest, ListFiles)
{
Files files;
ASSERT_SOME(os::mkdir("1/2"));
ASSERT_SOME(os::mkdir("1/3"));
ASSERT_SOME(os::write("1/two", "two"));
AWAIT_EXPECT_READY(files.attach("1", "one"));
// Get the `FileInfo` for "1/two" file.
struct stat s;
ASSERT_EQ(0, stat("1/two", &s));
FileInfo file = protobuf::createFileInfo("one/two", s);
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::LIST_FILES);
v1Call.mutable_list_files()->set_path("one/");
ContentType contentType = GetParam();
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::master::Response::LIST_FILES, v1Response.get().type());
ASSERT_EQ(3, v1Response.get().list_files().file_infos().size());
ASSERT_EQ(internal::evolve(file),
v1Response.get().list_files().file_infos(2));
}
// This test verifies that the client will receive a `NotFound` response when it
// tries to make a `LIST_FILES` call with an invalid path.
TEST_P(MasterAPITest, ListFilesInvalidPath)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::LIST_FILES);
v1Call.mutable_list_files()->set_path("five/");
ContentType contentType = GetParam();
Future<Response> response = process::http::post(
master.get()->pid,
"api/v1",
createBasicAuthHeaders(DEFAULT_CREDENTIAL),
serialize(contentType, v1Call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(NotFound().status, response);
}
TEST_P(MasterAPITest, GetRoles)
{
master::Flags masterFlags = CreateMasterFlags();
masterFlags.roles = "role1";
masterFlags.weights = "role1=2.5";
Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags);
ASSERT_SOME(master);
slave::Flags slaveFlags = CreateSlaveFlags();
slaveFlags.resources =
"cpus(role1):0.5;mem(role1):512;ports(role1):[31000-31001];"
"disk(role1):1024;gpus(role1):0";
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_role("role1");
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
driver.start();
AWAIT_READY(offers);
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::GET_ROLES);
ContentType contentType = GetParam();
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response->IsInitialized());
ASSERT_EQ(v1::master::Response::GET_ROLES, v1Response->type());
ASSERT_EQ(2, v1Response->get_roles().roles().size());
EXPECT_EQ("role1", v1Response->get_roles().roles(1).name());
EXPECT_EQ(2.5, v1Response->get_roles().roles(1).weight());
ASSERT_EQ(v1::Resources::parse(slaveFlags.resources.get()).get(),
v1Response->get_roles().roles(1).resources());
driver.stop();
driver.join();
}
TEST_P(MasterAPITest, GetLeadingMaster)
{
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::GET_LEADING_MASTER);
ContentType contentType = GetParam();
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response->IsInitialized());
ASSERT_EQ(v1::master::Response::GET_LEADING_MASTER, v1Response->type());
ASSERT_EQ(master.get()->getMasterInfo().ip(),
v1Response->get_leading_master().master_info().ip());
}
// This test verifies that an operator can reserve available resources through
// the `RESERVE_RESOURCES` call.
TEST_P(MasterAPITest, ReserveResources)
{
TestAllocator<> allocator;
EXPECT_CALL(allocator, initialize(_, _, _, _, _));
Try<Owned<cluster::Master>> master = StartMaster(&allocator);
ASSERT_SOME(master);
Future<SlaveID> slaveId;
EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
.WillOnce(DoAll(InvokeAddSlave(&allocator),
FutureArg<0>(&slaveId)));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo .set_role("role");
Resources unreserved = Resources::parse("cpus:1;mem:512").get();
Resources dynamicallyReserved = unreserved.flatten(
frameworkInfo.role(),
createReservationInfo(DEFAULT_CREDENTIAL.principal()));
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
Future<vector<Offer>> offers;
EXPECT_CALL(sched, registered(&driver, _, _));
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
driver.start();
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
// Expect an offer to be rescinded!
EXPECT_CALL(sched, offerRescinded(_, _));
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::RESERVE_RESOURCES);
v1::master::Call::ReserveResources* reserveResources =
v1Call.mutable_reserve_resources();
reserveResources->mutable_agent_id()->CopyFrom(
internal::evolve(slaveId.get()));
reserveResources->mutable_resources()->CopyFrom(
internal::evolve<v1::Resource>(
static_cast<const RepeatedPtrField<Resource>&>(dynamicallyReserved)));
ContentType contentType = GetParam();
Future<Response> response = process::http::post(
master.get()->pid,
"api/v1",
createBasicAuthHeaders(DEFAULT_CREDENTIAL),
serialize(contentType, v1Call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(Accepted().status, response);
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
driver.stop();
driver.join();
}
// This test verifies that an operator can unreserve reserved resources through
// the `UNRESERVE_RESOURCES` call.
TEST_P(MasterAPITest, UnreserveResources)
{
TestAllocator<> allocator;
EXPECT_CALL(allocator, initialize(_, _, _, _, _));
Try<Owned<cluster::Master>> master = StartMaster(&allocator);
ASSERT_SOME(master);
Future<SlaveID> slaveId;
EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
.WillOnce(DoAll(InvokeAddSlave(&allocator),
FutureArg<0>(&slaveId)));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo .set_role("role");
Resources unreserved = Resources::parse("cpus:1;mem:512").get();
Resources dynamicallyReserved = unreserved.flatten(
frameworkInfo.role(),
createReservationInfo(DEFAULT_CREDENTIAL.principal()));
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
Future<vector<Offer>> offers;
EXPECT_CALL(sched, registered(&driver, _, _));
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
driver.start();
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
// Expect an offer to be rescinded!
EXPECT_CALL(sched, offerRescinded(_, _));
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::RESERVE_RESOURCES);
v1::master::Call::ReserveResources* reserveResources =
v1Call.mutable_reserve_resources();
reserveResources->mutable_agent_id()->CopyFrom(
internal::evolve(slaveId.get()));
reserveResources->mutable_resources()->CopyFrom(
internal::evolve<v1::Resource>(
static_cast<const RepeatedPtrField<Resource>&>(dynamicallyReserved)));
ContentType contentType = GetParam();
Future<Response> reserveResponse = process::http::post(
master.get()->pid,
"api/v1",
createBasicAuthHeaders(DEFAULT_CREDENTIAL),
serialize(contentType, v1Call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(Accepted().status, reserveResponse);
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
// Expect an offer to be rescinded!
EXPECT_CALL(sched, offerRescinded(_, _));
// Unreserve the resources.
v1Call.set_type(v1::master::Call::UNRESERVE_RESOURCES);
v1::master::Call::UnreserveResources* unreserveResources =
v1Call.mutable_unreserve_resources();
unreserveResources->mutable_agent_id()->CopyFrom(
internal::evolve(slaveId.get()));
unreserveResources->mutable_resources()->CopyFrom(
internal::evolve<v1::Resource>(
static_cast<const RepeatedPtrField<Resource>&>(dynamicallyReserved)));
Future<Response> unreserveResponse = process::http::post(
master.get()->pid,
"api/v1",
createBasicAuthHeaders(DEFAULT_CREDENTIAL),
serialize(contentType, v1Call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(Accepted().status, unreserveResponse);
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
driver.stop();
driver.join();
}
// Test updates a maintenance schedule and verifies it saved via query.
TEST_P(MasterAPITest, UpdateAndGetMaintenanceSchedule)
{
// Set up a master.
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
ContentType contentType = GetParam();
process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
// Generate `MachineID`s that can be used in this test.
MachineID machine1;
MachineID machine2;
machine1.set_hostname("Machine1");
machine2.set_ip("0.0.0.2");
// Try to schedule maintenance on an unscheduled machine.
maintenance::Schedule schedule = createSchedule(
{createWindow({machine1, machine2}, createUnavailability(Clock::now()))});
v1::maintenance::Schedule v1Schedule =
evolve<v1::maintenance::Schedule>(schedule);
v1::master::Call v1UpdateScheduleCall;
v1UpdateScheduleCall.set_type(v1::master::Call::UPDATE_MAINTENANCE_SCHEDULE);
v1::master::Call_UpdateMaintenanceSchedule* maintenanceSchedule =
v1UpdateScheduleCall.mutable_update_maintenance_schedule();
maintenanceSchedule->mutable_schedule()->CopyFrom(v1Schedule);
Future<Nothing> v1UpdateScheduleResponse = process::http::post(
master.get()->pid,
"api/v1",
headers,
serialize(contentType, v1UpdateScheduleCall),
stringify(contentType))
.then([contentType](const Response& response) -> Future<Nothing> {
if (response.status != OK().status) {
return Failure("Unexpected response status " + response.status);
}
return Nothing();
});
AWAIT_READY(v1UpdateScheduleResponse);
// Query maintenance schedule.
v1::master::Call v1GetScheduleCall;
v1GetScheduleCall.set_type(v1::master::Call::GET_MAINTENANCE_SCHEDULE);
Future<v1::master::Response> v1GetScheduleResponse =
post(master.get()->pid, v1GetScheduleCall, contentType);
AWAIT_READY(v1GetScheduleResponse);
ASSERT_TRUE(v1GetScheduleResponse.get().IsInitialized());
ASSERT_EQ(
v1::master::Response::GET_MAINTENANCE_SCHEDULE,
v1GetScheduleResponse.get().type());
// Verify maintenance schedule matches the expectation.
v1::maintenance::Schedule respSchedule =
v1GetScheduleResponse.get().get_maintenance_schedule().schedule();
ASSERT_EQ(1, respSchedule.windows().size());
ASSERT_EQ(2, respSchedule.windows(0).machine_ids().size());
ASSERT_EQ("Machine1", respSchedule.windows(0).machine_ids(0).hostname());
ASSERT_EQ("0.0.0.2", respSchedule.windows(0).machine_ids(1).ip());
}
// Test queries for machine maintenance status.
TEST_P(MasterAPITest, GetMaintenanceStatus)
{
// Set up a master.
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
ContentType contentType = GetParam();
process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
// Generate `MachineID`s that can be used in this test.
MachineID machine1;
MachineID machine2;
machine1.set_hostname("Machine1");
machine2.set_ip("0.0.0.2");
// Try to schedule maintenance on an unscheduled machine.
maintenance::Schedule schedule = createSchedule(
{createWindow({machine1, machine2}, createUnavailability(Clock::now()))});
v1::maintenance::Schedule v1Schedule =
evolve<v1::maintenance::Schedule>(schedule);
v1::master::Call v1UpdateScheduleCall;
v1UpdateScheduleCall.set_type(v1::master::Call::UPDATE_MAINTENANCE_SCHEDULE);
v1::master::Call_UpdateMaintenanceSchedule* maintenanceSchedule =
v1UpdateScheduleCall.mutable_update_maintenance_schedule();
maintenanceSchedule->mutable_schedule()->CopyFrom(v1Schedule);
Future<Nothing> v1UpdateScheduleResponse = process::http::post(
master.get()->pid,
"api/v1",
headers,
serialize(contentType, v1UpdateScheduleCall),
stringify(contentType))
.then([contentType](const Response& response) -> Future<Nothing> {
if (response.status != OK().status) {
return Failure("Unexpected response status " + response.status);
}
return Nothing();
});
AWAIT_READY(v1UpdateScheduleResponse);
// Query maintenance status.
v1::master::Call v1GetStatusCall;
v1GetStatusCall.set_type(v1::master::Call::GET_MAINTENANCE_STATUS);
Future<v1::master::Response> v1GetStatusResponse =
post(master.get()->pid, v1GetStatusCall, contentType);
AWAIT_READY(v1GetStatusResponse);
ASSERT_TRUE(v1GetStatusResponse.get().IsInitialized());
ASSERT_EQ(
v1::master::Response::GET_MAINTENANCE_STATUS,
v1GetStatusResponse.get().type());
// Verify maintenance status matches the expectation.
v1::maintenance::ClusterStatus status =
v1GetStatusResponse.get().get_maintenance_status().status();
ASSERT_EQ(2, status.draining_machines().size());
ASSERT_EQ(0, status.down_machines().size());
}
// Test start machine maintenance and stop machine maintenance APIs.
// In this test case, we start maintenance on a machine and stop maintenance,
// and then verify that the associated maintenance window disappears.
TEST_P(MasterAPITest, StartAndStopMaintenance)
{
// Set up a master.
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
ContentType contentType = GetParam();
process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
// Generate `MachineID`s that can be used in this test.
MachineID machine1;
MachineID machine2;
MachineID machine3;
machine1.set_hostname("Machine1");
machine2.set_ip("0.0.0.2");
machine3.set_hostname("Machine3");
machine3.set_ip("0.0.0.3");
// Try to schedule maintenance on unscheduled machines.
Unavailability unavailability = createUnavailability(Clock::now());
maintenance::Schedule schedule = createSchedule({
createWindow({machine1, machine2}, unavailability),
createWindow({machine3}, unavailability)
});
v1::maintenance::Schedule v1Schedule =
evolve<v1::maintenance::Schedule>(schedule);
v1::master::Call v1UpdateScheduleCall;
v1UpdateScheduleCall.set_type(v1::master::Call::UPDATE_MAINTENANCE_SCHEDULE);
v1::master::Call_UpdateMaintenanceSchedule* maintenanceSchedule =
v1UpdateScheduleCall.mutable_update_maintenance_schedule();
maintenanceSchedule->mutable_schedule()->CopyFrom(v1Schedule);
Future<Nothing> v1UpdateScheduleResponse = process::http::post(
master.get()->pid,
"api/v1",
headers,
serialize(contentType, v1UpdateScheduleCall),
stringify(contentType))
.then([contentType](const Response& response) -> Future<Nothing> {
if (response.status != OK().status) {
return Failure("Unexpected response status " + response.status);
}
return Nothing();
});
AWAIT_READY(v1UpdateScheduleResponse);
// Start maintenance on machine3.
v1::master::Call v1StartMaintenanceCall;
v1StartMaintenanceCall.set_type(v1::master::Call::START_MAINTENANCE);
v1::master::Call_StartMaintenance* startMaintenance =
v1StartMaintenanceCall.mutable_start_maintenance();
startMaintenance->add_machines()->CopyFrom(evolve<v1::MachineID>(machine3));
Future<Nothing> v1StartMaintenanceResponse = process::http::post(
master.get()->pid,
"api/v1",
headers,
serialize(contentType, v1StartMaintenanceCall),
stringify(contentType))
.then([contentType](const Response& response) -> Future<Nothing> {
if (response.status != OK().status) {
return Failure("Unexpected response status " + response.status);
}
return Nothing();
});
AWAIT_READY(v1StartMaintenanceResponse);
// Stop maintenance on machine3.
v1::master::Call v1StopMaintenanceCall;
v1StopMaintenanceCall.set_type(v1::master::Call::STOP_MAINTENANCE);
v1::master::Call_StopMaintenance* stopMaintenance =
v1StopMaintenanceCall.mutable_stop_maintenance();
stopMaintenance->add_machines()->CopyFrom(evolve<v1::MachineID>(machine3));
Future<Nothing> v1StopMaintenanceResponse = process::http::post(
master.get()->pid,
"api/v1",
headers,
serialize(contentType, v1StopMaintenanceCall),
stringify(contentType))
.then([contentType](const Response& response) -> Future<Nothing> {
if (response.status != OK().status) {
return Failure("Unexpected response status " + response.status);
}
return Nothing();
});
AWAIT_READY(v1StopMaintenanceResponse);
// Query maintenance schedule.
v1::master::Call v1GetScheduleCall;
v1GetScheduleCall.set_type(v1::master::Call::GET_MAINTENANCE_SCHEDULE);
Future<v1::master::Response> v1GetScheduleResponse =
post(master.get()->pid, v1GetScheduleCall, contentType);
AWAIT_READY(v1GetScheduleResponse);
ASSERT_TRUE(v1GetScheduleResponse.get().IsInitialized());
ASSERT_EQ(
v1::master::Response::GET_MAINTENANCE_SCHEDULE,
v1GetScheduleResponse.get().type());
// Check that only one maintenance window remains.
v1::maintenance::Schedule respSchedule =
v1GetScheduleResponse.get().get_maintenance_schedule().schedule();
ASSERT_EQ(1, respSchedule.windows().size());
ASSERT_EQ(2, respSchedule.windows(0).machine_ids().size());
ASSERT_EQ("Machine1", respSchedule.windows(0).machine_ids(0).hostname());
ASSERT_EQ("0.0.0.2", respSchedule.windows(0).machine_ids(1).ip());
}
// This test tries to verify that a client subscribed to the 'api/v1'
// endpoint is able to receive `TASK_ADDED`/`TASK_UPDATED` events.
TEST_P(MasterAPITest, Subscribe)
{
ContentType contentType = GetParam();
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<MockV1HTTPScheduler>();
auto executor = std::make_shared<MockV1HTTPExecutor>();
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
TestContainerizer containerizer(executorId, executor);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
scheduler::TestV1Mesos mesos(master.get()->pid, contentType, scheduler);
AWAIT_READY(connected);
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<v1::scheduler::Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
{
v1::scheduler::Call call;
call.set_type(v1::scheduler::Call::SUBSCRIBE);
v1::scheduler::Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
// Launch a task using the scheduler. This should result in a `TASK_ADDED`
// event when the task is launched followed by a `TASK_UPDATED` event after
// the task transitions to running state.
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
EXPECT_NE(0, offers->offers().size());
// Create event stream after seeing first offer but before first task is
// launched. We should see one framework, one agent and zero task/executor.
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::SUBSCRIBE);
process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
Future<Response> response = process::http::streaming::post(
master.get()->pid,
"api/v1",
headers,
serialize(contentType, v1Call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
ASSERT_EQ(Response::PIPE, response.get().type);
ASSERT_SOME(response->reader);
Pipe::Reader reader = response->reader.get();
auto deserializer =
lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
Reader<v1::master::Event> decoder(
Decoder<v1::master::Event>(deserializer), reader);
Future<Result<v1::master::Event>> event = decoder.read();
AWAIT_READY(event);
EXPECT_EQ(v1::master::Event::SUBSCRIBED, event.get().get().type());
const v1::master::Response::GetState& getState =
event.get().get().subscribed().get_state();
EXPECT_EQ(1u, getState.get_frameworks().frameworks_size());
EXPECT_EQ(1u, getState.get_agents().agents_size());
EXPECT_EQ(0u, getState.get_tasks().tasks_size());
EXPECT_EQ(0u, getState.get_executors().executors_size());
event = decoder.read();
EXPECT_TRUE(event.isPending());
const v1::Offer& offer = offers->offers(0);
TaskInfo task = createTask(internal::devolve(offer), "", executorId);
EXPECT_CALL(*scheduler, update(_, _))
.Times(2);
EXPECT_CALL(*executor, connected(_))
.WillOnce(executor::SendSubscribe(
frameworkId, internal::evolve(executorId)));
EXPECT_CALL(*executor, subscribed(_, _));
EXPECT_CALL(*executor, launch(_, _))
.WillOnce(executor::SendUpdateFromTask(
frameworkId, internal::evolve(executorId), v1::TASK_RUNNING));
EXPECT_CALL(*executor, acknowledged(_, _));
{
v1::scheduler::Call call;
call.set_type(v1::scheduler::Call::ACCEPT);
call.mutable_framework_id()->CopyFrom(frameworkId);
v1::scheduler::Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(
internal::evolve(task));
mesos.send(call);
}
AWAIT_READY(event);
ASSERT_EQ(v1::master::Event::TASK_ADDED, event.get().get().type());
ASSERT_EQ(internal::evolve(task.task_id()),
event.get().get().task_added().task().task_id());
event = decoder.read();
AWAIT_READY(event);
ASSERT_EQ(v1::master::Event::TASK_UPDATED, event.get().get().type());
ASSERT_EQ(v1::TASK_RUNNING, event.get().get().task_updated().state());
event = decoder.read();
// After we advance the clock, the status update manager would retry the
// `TASK_RUNNING` update. Since, the state of the task is not changed, this
// should not result in another `TASK_UPDATED` event.
Clock::pause();
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
EXPECT_TRUE(event.isPending());
EXPECT_TRUE(reader.close());
EXPECT_CALL(*executor, shutdown(_))
.Times(AtMost(1));
EXPECT_CALL(*executor, disconnected(_))
.Times(AtMost(1));
}
// This test verifies if we can retrieve the current quota status through
// `GET_QUOTA` call, after we set quota resources through `SET_QUOTA` call.
TEST_P(MasterAPITest, GetQuota)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
v1::Resources quotaResources =
v1::Resources::parse("cpus:1;mem:512").get();
ContentType contentType = GetParam();
{
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::SET_QUOTA);
v1::quota::QuotaRequest* quotaRequest =
v1Call.mutable_set_quota()->mutable_quota_request();
// Use the force flag for setting quota that cannot be satisfied in
// this empty cluster without any agents.
quotaRequest->set_force(true);
quotaRequest->set_role("role1");
quotaRequest->mutable_guarantee()->CopyFrom(quotaResources);
// Send a quota request for the specified role.
Future<Response> response = process::http::post(
master.get()->pid,
"api/v1",
createBasicAuthHeaders(DEFAULT_CREDENTIAL),
serialize(contentType, v1Call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
}
// Verify the quota is set using the `GET_QUOTA` call.
{
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::GET_QUOTA);
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response->IsInitialized());
ASSERT_EQ(v1::master::Response::GET_QUOTA, v1Response->type());
ASSERT_EQ(1, v1Response->get_quota().status().infos().size());
EXPECT_EQ(quotaResources,
v1Response->get_quota().status().infos(0).guarantee());
}
}
TEST_P(MasterAPITest, SetQuota)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
v1::Resources quotaResources =
v1::Resources::parse("cpus:1;mem:512").get();
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::SET_QUOTA);
v1::quota::QuotaRequest* quotaRequest =
v1Call.mutable_set_quota()->mutable_quota_request();
// Use the force flag for setting quota that cannot be satisfied in
// this empty cluster without any agents.
quotaRequest->set_force(true);
quotaRequest->set_role("role1");
quotaRequest->mutable_guarantee()->CopyFrom(quotaResources);
ContentType contentType = GetParam();
// Send a quota request for the specified role.
Future<Response> response = process::http::post(
master.get()->pid,
"api/v1",
createBasicAuthHeaders(DEFAULT_CREDENTIAL),
serialize(contentType, v1Call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
}
// This test verifies if we can remove a quota through `REMOVE_QUOTA` call,
// after we set quota resources through `SET_QUOTA` call.
TEST_P(MasterAPITest, RemoveQuota)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
v1::Resources quotaResources =
v1::Resources::parse("cpus:1;mem:512").get();
{
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::SET_QUOTA);
v1::quota::QuotaRequest* quotaRequest =
v1Call.mutable_set_quota()->mutable_quota_request();
// Use the force flag for setting quota that cannot be satisfied in
// this empty cluster without any agents.
quotaRequest->set_force(true);
quotaRequest->set_role("role1");
quotaRequest->mutable_guarantee()->CopyFrom(quotaResources);
ContentType contentType = GetParam();
// Send a quota request for the specified role.
Future<Response> response = process::http::post(
master.get()->pid,
"api/v1",
createBasicAuthHeaders(DEFAULT_CREDENTIAL),
serialize(contentType, v1Call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
}
// Verify if the quota is set using `GET_QUOTA` call.
{
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::GET_QUOTA);
ContentType contentType = GetParam();
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response->IsInitialized());
ASSERT_EQ(v1::master::Response::GET_QUOTA, v1Response->type());
ASSERT_EQ(1, v1Response->get_quota().status().infos().size());
EXPECT_EQ(quotaResources,
v1Response->get_quota().status().infos(0).guarantee());
}
// Remove the quota using `REMOVE_QUOTA` call.
{
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::REMOVE_QUOTA);
v1Call.mutable_remove_quota()->set_role("role1");
ContentType contentType = GetParam();
// Send a quota request for the specified role.
Future<Response> response = process::http::post(
master.get()->pid,
"api/v1",
createBasicAuthHeaders(DEFAULT_CREDENTIAL),
serialize(contentType, v1Call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
}
// Verify if the quota is removed using `GET_QUOTA` call.
{
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::GET_QUOTA);
ContentType contentType = GetParam();
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response->IsInitialized());
ASSERT_EQ(v1::master::Response::GET_QUOTA, v1Response->type());
ASSERT_EQ(0, v1Response->get_quota().status().infos().size());
}
}
// Test create and destroy persistent volumes through the master operator API.
// In this test case, we create a persistent volume with the API, then launch a
// task using the volume. Then we destroy the volume with the API after the task
// is finished.
TEST_P(MasterAPITest, CreateAndDestroyVolumes)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
// For capturing the SlaveID so we can use it in the create/destroy volumes
// API call.
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
Owned<MasterDetector> detector = master.get()->createDetector();
slave::Flags slaveFlags = CreateSlaveFlags();
// Do Static reservation so we can create persistent volumes from it.
slaveFlags.resources = "disk(role1):1024";
Try<Owned<cluster::Slave>> slave = StartSlave(
detector.get(), &containerizer, slaveFlags);
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
SlaveID slaveId = slaveRegisteredMessage.get().slave_id();
// Create the persistent volume.
v1::master::Call v1CreateVolumesCall;
v1CreateVolumesCall.set_type(v1::master::Call::CREATE_VOLUMES);
v1::master::Call_CreateVolumes* createVolumes =
v1CreateVolumesCall.mutable_create_volumes();
Resource volume = createPersistentVolume(
Megabytes(64),
"role1",
"id1",
"path1",
None(),
None(),
DEFAULT_CREDENTIAL.principal());
createVolumes->add_volumes()->CopyFrom(evolve<v1::Resource>(volume));
createVolumes->mutable_agent_id()->CopyFrom(evolve<v1::AgentID>(slaveId));
ContentType contentType = GetParam();
process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
Future<Nothing> v1CreateVolumesResponse = process::http::post(
master.get()->pid,
"api/v1",
headers,
serialize(contentType, v1CreateVolumesCall),
stringify(contentType))
.then([contentType](const Response& response) -> Future<Nothing> {
if (response.status != Accepted().status) {
return Failure("Unexpected response status " + response.status);
}
return Nothing();
});
AWAIT_READY(v1CreateVolumesResponse);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_role("role1");
// Start a framework and launch a task on the persistent volume.
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _))
.Times(1);
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(volume));
Resources taskResources = Resources::parse(
"disk:256",
frameworkInfo.role()).get();
TaskInfo taskInfo = createTask(
offer.slave_id(),
taskResources,
"sleep 1",
DEFAULT_EXECUTOR_ID);
EXPECT_CALL(exec, registered(_, _, _, _))
.Times(1);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
AWAIT_READY(status);
EXPECT_EQ(TASK_FINISHED, status.get().state());
// Destroy the persistent volume.
v1::master::Call v1DestroyVolumesCall;
v1DestroyVolumesCall.set_type(v1::master::Call::DESTROY_VOLUMES);
v1::master::Call_DestroyVolumes* destroyVolumes =
v1DestroyVolumesCall.mutable_destroy_volumes();
destroyVolumes->mutable_agent_id()->CopyFrom(evolve<v1::AgentID>(slaveId));
destroyVolumes->add_volumes()->CopyFrom(evolve<v1::Resource>(volume));
Future<Nothing> v1DestroyVolumesResponse = process::http::post(
master.get()->pid,
"api/v1",
headers,
serialize(contentType, v1DestroyVolumesCall),
stringify(contentType))
.then([contentType](const Response& response) -> Future<Nothing> {
if (response.status != Accepted().status) {
return Failure("Unexpected response status " + response.status);
}
return Nothing();
});
AWAIT_READY(v1DestroyVolumesResponse);
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
}
TEST_P(MasterAPITest, GetWeights)
{
// Start a master with `--weights` flag.
master::Flags masterFlags = CreateMasterFlags();
masterFlags.weights = "role=2.0";
Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::GET_WEIGHTS);
ContentType contentType = GetParam();
Future<v1::master::Response> v1Response =
post(master.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response->IsInitialized());
ASSERT_EQ(v1::master::Response::GET_WEIGHTS, v1Response->type());
ASSERT_EQ(1, v1Response->get_weights().weight_infos_size());
ASSERT_EQ("role", v1Response->get_weights().weight_infos().Get(0).role());
ASSERT_EQ(2.0, v1Response->get_weights().weight_infos().Get(0).weight());
}
TEST_P(MasterAPITest, UpdateWeights)
{
// Start a master with `--weights` flag.
master::Flags masterFlags = CreateMasterFlags();
masterFlags.weights = "role=2.0";
Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
v1::master::Call getCall, updateCall;
getCall.set_type(v1::master::Call::GET_WEIGHTS);
updateCall.set_type(v1::master::Call::UPDATE_WEIGHTS);
ContentType contentType = GetParam();
Future<v1::master::Response> getResponse =
post(master.get()->pid, getCall, contentType);
AWAIT_READY(getResponse);
ASSERT_TRUE(getResponse->IsInitialized());
ASSERT_EQ(v1::master::Response::GET_WEIGHTS, getResponse->type());
ASSERT_EQ(1, getResponse->get_weights().weight_infos_size());
ASSERT_EQ("role", getResponse->get_weights().weight_infos().Get(0).role());
ASSERT_EQ(2.0, getResponse->get_weights().weight_infos().Get(0).weight());
v1::WeightInfo* weightInfo =
updateCall.mutable_update_weights()->add_weight_infos();
weightInfo->set_role("role");
weightInfo->set_weight(4.0);
process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
Future<Nothing> updateResponse = process::http::post(
master.get()->pid,
"api/v1",
headers,
serialize(contentType, updateCall),
stringify(contentType))
.then([contentType](const Response& response) -> Future<Nothing> {
if (response.status != OK().status) {
return Failure("Unexpected response status " + response.status);
}
return Nothing();
});
AWAIT_READY(updateResponse);
getResponse = post(master.get()->pid, getCall, contentType);
AWAIT_READY(getResponse);
ASSERT_TRUE(getResponse->IsInitialized());
ASSERT_EQ(v1::master::Response::GET_WEIGHTS, getResponse->type());
ASSERT_EQ(1, getResponse->get_weights().weight_infos_size());
ASSERT_EQ("role", getResponse->get_weights().weight_infos().Get(0).role());
ASSERT_EQ(4.0, getResponse->get_weights().weight_infos().Get(0).weight());
}
class AgentAPITest
: public MesosTest,
public WithParamInterface<ContentType>
{
public:
// Helper function to post a request to "/api/v1" agent endpoint and return
// the response.
Future<v1::agent::Response> post(
const process::PID<slave::Slave>& pid,
const v1::agent::Call& call,
const ContentType& contentType)
{
process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
return process::http::post(
pid,
"api/v1",
headers,
serialize(contentType, call),
stringify(contentType))
.then([contentType](const Response& response)
-> Future<v1::agent::Response> {
if (response.status != OK().status) {
return Failure("Unexpected response status " + response.status);
}
return deserialize<v1::agent::Response>(contentType, response.body);
});
}
};
// These tests are parameterized by the content type of the HTTP request.
INSTANTIATE_TEST_CASE_P(
ContentType,
AgentAPITest,
::testing::Values(ContentType::PROTOBUF, ContentType::JSON));
TEST_P(AgentAPITest, GetFlags)
{
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
StandaloneMasterDetector detector;
Try<Owned<cluster::Slave>> slave = this->StartSlave(&detector);
ASSERT_SOME(slave);
// Wait until the agent has finished recovery.
AWAIT_READY(__recover);
v1::agent::Call v1Call;
v1Call.set_type(v1::agent::Call::GET_FLAGS);
ContentType contentType = GetParam();
Future<v1::agent::Response> v1Response =
post(slave.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::agent::Response::GET_FLAGS, v1Response.get().type());
}
TEST_P(AgentAPITest, GetHealth)
{
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
StandaloneMasterDetector detector;
Try<Owned<cluster::Slave>> slave = this->StartSlave(&detector);
ASSERT_SOME(slave);
// Wait until the agent has finished recovery.
AWAIT_READY(__recover);
v1::agent::Call v1Call;
v1Call.set_type(v1::agent::Call::GET_HEALTH);
ContentType contentType = GetParam();
Future<v1::agent::Response> v1Response =
post(slave.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::agent::Response::GET_HEALTH, v1Response.get().type());
ASSERT_TRUE(v1Response.get().get_health().healthy());
}
TEST_P(AgentAPITest, GetVersion)
{
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
StandaloneMasterDetector detector;
Try<Owned<cluster::Slave>> slave = this->StartSlave(&detector);
ASSERT_SOME(slave);
// Wait until the agent has finished recovery.
AWAIT_READY(__recover);
v1::agent::Call v1Call;
v1Call.set_type(v1::agent::Call::GET_VERSION);
ContentType contentType = GetParam();
Future<v1::agent::Response> v1Response =
post(slave.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::agent::Response::GET_VERSION, v1Response.get().type());
ASSERT_EQ(MESOS_VERSION,
v1Response.get().get_version().version_info().version());
}
TEST_P(AgentAPITest, GetMetrics)
{
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
StandaloneMasterDetector detector;
Try<Owned<cluster::Slave>> slave = this->StartSlave(&detector);
ASSERT_SOME(slave);
// Wait until the agent has finished recovery.
AWAIT_READY(__recover);
Duration timeout = Seconds(5);
v1::agent::Call v1Call;
v1Call.set_type(v1::agent::Call::GET_METRICS);
v1Call.mutable_get_metrics()->mutable_timeout()->set_nanoseconds(
timeout.ns());
ContentType contentType = GetParam();
Future<v1::agent::Response> v1Response =
post(slave.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::agent::Response::GET_METRICS, v1Response.get().type());
hashmap<string, double> metrics;
foreach (const v1::Metric& metric,
v1Response.get().get_metrics().metrics()) {
ASSERT_TRUE(metric.has_value());
metrics[metric.name()] = metric.value();
}
// Verifies that the response metrics is not empty.
ASSERT_LE(0, metrics.size());
}
TEST_P(AgentAPITest, GetLoggingLevel)
{
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
StandaloneMasterDetector detector;
Try<Owned<cluster::Slave>> slave = this->StartSlave(&detector);
ASSERT_SOME(slave);
// Wait until the agent has finished recovery.
AWAIT_READY(__recover);
v1::agent::Call v1Call;
v1Call.set_type(v1::agent::Call::GET_LOGGING_LEVEL);
ContentType contentType = GetParam();
Future<v1::agent::Response> v1Response =
post(slave.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::agent::Response::GET_LOGGING_LEVEL, v1Response.get().type());
ASSERT_LE(0, FLAGS_v);
ASSERT_EQ(
v1Response.get().get_logging_level().level(),
static_cast<uint32_t>(FLAGS_v));
}
// Test the logging level toggle and revert after specific toggle duration.
TEST_P(AgentAPITest, SetLoggingLevel)
{
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
StandaloneMasterDetector detector;
Try<Owned<cluster::Slave>> slave = this->StartSlave(&detector);
ASSERT_SOME(slave);
// Wait until the agent has finished recovery.
AWAIT_READY(__recover);
// We capture the original logging level first; it would be used to verify
// the logging level revert works.
uint32_t originalLevel = static_cast<uint32_t>(FLAGS_v);
// Send request to agent to toggle the logging level.
uint32_t toggleLevel = originalLevel + 1;
Duration toggleDuration = Seconds(60);
v1::agent::Call v1Call;
v1Call.set_type(v1::agent::Call::SET_LOGGING_LEVEL);
v1::agent::Call_SetLoggingLevel* setLoggingLevel =
v1Call.mutable_set_logging_level();
setLoggingLevel->set_level(toggleLevel);
setLoggingLevel->mutable_duration()->set_nanoseconds(toggleDuration.ns());
ContentType contentType = GetParam();
process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
Future<Nothing> v1Response = process::http::post(
slave.get()->pid,
"api/v1",
headers,
serialize(contentType, v1Call),
stringify(contentType))
.then([contentType](const Response& response) -> Future<Nothing> {
if (response.status != OK().status) {
return Failure("Unexpected response status " + response.status);
}
return Nothing();
});
AWAIT_READY(v1Response);
ASSERT_EQ(toggleLevel, static_cast<uint32_t>(FLAGS_v));
// Speedup the logging level revert.
Clock::pause();
Clock::advance(toggleDuration);
Clock::settle();
// Verifies the logging level reverted successfully.
ASSERT_EQ(originalLevel, static_cast<uint32_t>(FLAGS_v));
Clock::resume();
}
// This test verifies if we can retrieve the file listing for a directory
// in an agent.
TEST_P(AgentAPITest, ListFiles)
{
Files files;
ASSERT_SOME(os::mkdir("1/2"));
ASSERT_SOME(os::mkdir("1/3"));
ASSERT_SOME(os::write("1/two", "two"));
AWAIT_EXPECT_READY(files.attach("1", "one"));
// Get the `FileInfo` for "1/two" file.
struct stat s;
ASSERT_EQ(0, stat("1/two", &s));
FileInfo file = protobuf::createFileInfo("one/two", s);
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
StandaloneMasterDetector detector;
Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
ASSERT_SOME(slave);
AWAIT_READY(__recover);
// Wait until the agent has finished recovery.
Clock::pause();
Clock::settle();
v1::agent::Call v1Call;
v1Call.set_type(v1::agent::Call::LIST_FILES);
v1Call.mutable_list_files()->set_path("one/");
ContentType contentType = GetParam();
Future<v1::agent::Response> v1Response =
post(slave.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::agent::Response::LIST_FILES, v1Response.get().type());
ASSERT_EQ(3, v1Response.get().list_files().file_infos().size());
ASSERT_EQ(internal::evolve(file),
v1Response.get().list_files().file_infos(2));
}
// This test verifies that the client will receive a `NotFound` response when it
// tries to make a `LIST_FILES` call with an invalid path.
TEST_P(AgentAPITest, ListFilesInvalidPath)
{
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
StandaloneMasterDetector detector;
Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
ASSERT_SOME(slave);
AWAIT_READY(__recover);
// Wait until the agent has finished recovery.
Clock::pause();
Clock::settle();
v1::agent::Call v1Call;
v1Call.set_type(v1::agent::Call::LIST_FILES);
v1Call.mutable_list_files()->set_path("five/");
ContentType contentType = GetParam();
Future<Response> response = process::http::post(
slave.get()->pid,
"api/v1",
createBasicAuthHeaders(DEFAULT_CREDENTIAL),
serialize(contentType, v1Call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(NotFound().status, response);
}
TEST_P(AgentAPITest, GetContainers)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
StandaloneMasterDetector detector(master.get()->pid);
Try<Owned<cluster::Slave>> slave = StartSlave(&detector, &containerizer);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(_, _, _));
EXPECT_CALL(exec, registered(_, _, _, _));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
const Offer& offer = offers.get()[0];
TaskInfo task = createTask(
offer.slave_id(),
Resources::parse("cpus:0.1;mem:32").get(),
"sleep 1000",
exec.id);
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
// No tasks launched, we should expect zero containers in Response.
{
v1::agent::Call v1Call;
v1Call.set_type(v1::agent::Call::GET_CONTAINERS);
ContentType contentType = GetParam();
Future<v1::agent::Response> v1Response =
post(slave.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::agent::Response::GET_CONTAINERS, v1Response.get().type());
ASSERT_EQ(0, v1Response.get().get_containers().containers_size());
}
driver.launchTasks(offer.id(), {task});
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
ResourceStatistics statistics;
statistics.set_mem_limit_bytes(2048);
// We have to set timestamp here since serializing protobuf without
// filling all required fields generates errors.
statistics.set_timestamp(0);
EXPECT_CALL(containerizer, usage(_))
.WillOnce(Return(statistics));
ContainerStatus containerStatus;
NetworkInfo* networkInfo = containerStatus.add_network_infos();
NetworkInfo::IPAddress* ipAddr = networkInfo->add_ip_addresses();
ipAddr->set_ip_address("192.168.1.20");
EXPECT_CALL(containerizer, status(_))
.WillOnce(Return(containerStatus));
v1::agent::Call v1Call;
v1Call.set_type(v1::agent::Call::GET_CONTAINERS);
ContentType contentType = GetParam();
Future<v1::agent::Response> v1Response =
post(slave.get()->pid, v1Call, contentType);
AWAIT_READY(v1Response);
ASSERT_TRUE(v1Response.get().IsInitialized());
ASSERT_EQ(v1::agent::Response::GET_CONTAINERS, v1Response.get().type());
ASSERT_EQ(1, v1Response.get().get_containers().containers_size());
ASSERT_EQ("192.168.1.20",
v1Response.get().get_containers().containers(0).container_status()
.network_infos(0).ip_addresses(0).ip_address());
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {