blob: fa34e417155cfb4e13c2e28a2c972163e2a6d22a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string>
#include <tuple>
#include <mesos/http.hpp>
#include <mesos/v1/resources.hpp>
#include <mesos/v1/master/master.hpp>
#include <mesos/v1/scheduler/scheduler.hpp>
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/http.hpp>
#include <process/owned.hpp>
#include <stout/gtest.hpp>
#include <stout/jsonify.hpp>
#include <stout/nothing.hpp>
#include <stout/recordio.hpp>
#include <stout/stringify.hpp>
#include <stout/try.hpp>
#include "common/http.hpp"
#include "common/protobuf_utils.hpp"
#include "common/recordio.hpp"
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
#include "master/detector/standalone.hpp"
#include "slave/slave.hpp"
#include "slave/containerizer/fetcher.hpp"
#include "slave/containerizer/mesos/containerizer.hpp"
#include "tests/allocator.hpp"
#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
#include "tests/resources_utils.hpp"
#include "tests/containerizer/mock_containerizer.hpp"
namespace http = process::http;
using google::protobuf::RepeatedPtrField;
using mesos::master::detector::MasterDetector;
using mesos::master::detector::StandaloneMasterDetector;
using mesos::slave::ContainerTermination;
using mesos::internal::devolve;
using mesos::internal::evolve;
using mesos::internal::recordio::Reader;
using mesos::internal::slave::Fetcher;
using mesos::internal::slave::MesosContainerizer;
using mesos::internal::slave::Slave;
using mesos::internal::protobuf::maintenance::createSchedule;
using mesos::internal::protobuf::maintenance::createUnavailability;
using mesos::internal::protobuf::maintenance::createWindow;
using process::Clock;
using process::Failure;
using process::Future;
using process::Owned;
using process::Promise;
using recordio::Decoder;
using std::string;
using std::tuple;
using std::vector;
using testing::_;
using testing::AtMost;
using testing::DoAll;
using testing::Eq;
using testing::Return;
using testing::WithParamInterface;
namespace mesos {
namespace internal {
namespace tests {
// Fixture for the master's v1 operator API tests. It is parameterized by the
// `ContentType` used to encode requests and decode responses.
class MasterAPITest
  : public MesosTest,
    public WithParamInterface<ContentType>
{
public:
  // Posts `call` to the "api/v1" endpoint of the master identified by `pid`,
  // authenticating with `DEFAULT_CREDENTIAL`. `contentType` is used both as
  // the request's content type and as its "Accept" header.
  //
  // Returns a future that carries the result of deserializing the response
  // body when the HTTP status equals that of `http::OK()`, and a failure
  // ("Unexpected response status ...") for any other status.
  Future<v1::master::Response> post(
      const process::PID<master::Master>& pid,
      const v1::master::Call& call,
      const ContentType& contentType)
  {
    const string mediaType = stringify(contentType);

    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    headers["Accept"] = mediaType;

    Future<http::Response> httpResponse = http::post(
        pid,
        "api/v1",
        headers,
        serialize(contentType, call),
        mediaType);

    return httpResponse.then(
        [contentType](const http::Response& reply)
            -> Future<v1::master::Response> {
          if (reply.status == http::OK().status) {
            return deserialize<v1::master::Response>(contentType, reply.body);
          }

          return Failure("Unexpected response status " + reply.status);
        });
  }
};
// These tests are parameterized by the content type of the HTTP request.
// Every `TEST_P(MasterAPITest, ...)` is instantiated once for each of the two
// values listed below (`ContentType::PROTOBUF` and `ContentType::JSON`).
INSTANTIATE_TEST_CASE_P(
ContentType,
MasterAPITest,
::testing::Values(ContentType::PROTOBUF, ContentType::JSON));
// Starts a master and one agent (each with its own domain) and checks that a
// `GET_AGENTS` call lists exactly that agent with the expected hostname,
// domain, pid, activity flag, version and number of total resources.
TEST_P(MasterAPITest, GetAgents)
{
  master::Flags masterFlags = CreateMasterFlags();
  masterFlags.domain = createDomainInfo("region-abc", "zone-123");

  Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags);
  ASSERT_SOME(master);

  const auto& masterPid = master.get()->pid;

  Owned<MasterDetector> detector = master.get()->createDetector();

  // Capture the `SlaveRegisteredMessage` sent from the master; the API call
  // below is only made after this message has been observed.
  Future<SlaveRegisteredMessage> registration =
    FUTURE_PROTOBUF(SlaveRegisteredMessage(), masterPid, _);

  // Start one agent.
  slave::Flags agentFlags = CreateSlaveFlags();
  agentFlags.hostname = "host";
  agentFlags.domain = createDomainInfo("region-xyz", "zone-456");

  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), agentFlags);
  ASSERT_SOME(agent);

  AWAIT_READY(registration);

  v1::master::Call call;
  call.set_type(v1::master::Call::GET_AGENTS);

  Future<v1::master::Response> response = post(masterPid, call, GetParam());

  AWAIT_READY(response);
  ASSERT_TRUE(response->IsInitialized());
  ASSERT_EQ(v1::master::Response::GET_AGENTS, response->type());

  const v1::master::Response::GetAgents& listing = response->get_agents();
  ASSERT_EQ(listing.agents_size(), 1);

  const v1::master::Response::GetAgents::Agent& reported = listing.agents(0);

  ASSERT_EQ("host", reported.agent_info().hostname());
  ASSERT_EQ(evolve(agentFlags.domain.get()), reported.agent_info().domain());
  ASSERT_EQ(agent.get()->pid, reported.pid());
  ASSERT_TRUE(reported.active());
  ASSERT_EQ(MESOS_VERSION, reported.version());
  ASSERT_EQ(4, reported.total_resources_size());
}
// Checks that a `GET_FLAGS` call against a freshly started master produces an
// initialized response of type `GET_FLAGS`.
TEST_P(MasterAPITest, GetFlags)
{
  Try<Owned<cluster::Master>> master = this->StartMaster();
  ASSERT_SOME(master);

  v1::master::Call call;
  call.set_type(v1::master::Call::GET_FLAGS);

  Future<v1::master::Response> response =
    post(master.get()->pid, call, GetParam());

  AWAIT_READY(response);
  ASSERT_TRUE(response->IsInitialized());
  ASSERT_EQ(v1::master::Response::GET_FLAGS, response->type());
}
// Registers a single framework through the scheduler driver and checks that a
// `GET_FRAMEWORKS` call reports it with the expected name, role and status
// flags.
TEST_P(MasterAPITest, GetFrameworks)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  const auto& masterPid = master.get()->pid;

  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, frameworkInfo, masterPid, DEFAULT_CREDENTIAL);

  // Satisfied when the scheduler's `registered` callback is invoked.
  Future<Nothing> schedulerRegistered;
  EXPECT_CALL(sched, registered(&driver, _, _))
    .WillOnce(FutureSatisfy(&schedulerRegistered));

  driver.start();

  AWAIT_READY(schedulerRegistered);

  v1::master::Call call;
  call.set_type(v1::master::Call::GET_FRAMEWORKS);

  Future<v1::master::Response> response = post(masterPid, call, GetParam());

  AWAIT_READY(response);
  ASSERT_TRUE(response->IsInitialized());
  ASSERT_EQ(v1::master::Response::GET_FRAMEWORKS, response->type());

  const auto& listing = response->get_frameworks();
  ASSERT_EQ(1, listing.frameworks_size());

  const auto& reported = listing.frameworks(0);

  ASSERT_EQ("default", reported.framework_info().name());
  ASSERT_EQ("*", reported.framework_info().roles(0));
  ASSERT_FALSE(reported.framework_info().checkpoint());
  ASSERT_TRUE(reported.active());
  ASSERT_TRUE(reported.connected());
  ASSERT_FALSE(reported.recovered());

  driver.stop();
  driver.join();
}
// Checks that a `GET_HEALTH` call against a freshly started master reports
// the master as healthy.
TEST_P(MasterAPITest, GetHealth)
{
  Try<Owned<cluster::Master>> master = this->StartMaster();
  ASSERT_SOME(master);

  v1::master::Call call;
  call.set_type(v1::master::Call::GET_HEALTH);

  Future<v1::master::Response> response =
    post(master.get()->pid, call, GetParam());

  AWAIT_READY(response);
  ASSERT_TRUE(response->IsInitialized());
  ASSERT_EQ(v1::master::Response::GET_HEALTH, response->type());
  ASSERT_TRUE(response->get_health().healthy());
}
// Checks that a `GET_VERSION` call returns version information whose version
// string equals `MESOS_VERSION`.
TEST_P(MasterAPITest, GetVersion)
{
  Try<Owned<cluster::Master>> master = this->StartMaster();
  ASSERT_SOME(master);

  v1::master::Call call;
  call.set_type(v1::master::Call::GET_VERSION);

  Future<v1::master::Response> response =
    post(master.get()->pid, call, GetParam());

  AWAIT_READY(response);
  ASSERT_TRUE(response->IsInitialized());
  ASSERT_EQ(v1::master::Response::GET_VERSION, response->type());

  const auto& versionInfo = response->get_version().version_info();
  ASSERT_EQ(MESOS_VERSION, versionInfo.version());
}
// Checks that a `GET_METRICS` call (issued with a 5 second timeout) returns a
// response of the matching type in which every metric carries a value and at
// least one metric is present.
TEST_P(MasterAPITest, GetMetrics)
{
  Try<Owned<cluster::Master>> master = this->StartMaster();
  ASSERT_SOME(master);

  // Timeout passed along in the `GET_METRICS` call, encoded in nanoseconds.
  Duration timeout = Seconds(5);

  v1::master::Call v1Call;
  v1Call.set_type(v1::master::Call::GET_METRICS);
  v1Call.mutable_get_metrics()->mutable_timeout()->set_nanoseconds(
      timeout.ns());

  ContentType contentType = GetParam();

  Future<v1::master::Response> v1Response =
    post(master.get()->pid, v1Call, contentType);

  AWAIT_READY(v1Response);
  ASSERT_TRUE(v1Response->IsInitialized());
  ASSERT_EQ(v1::master::Response::GET_METRICS, v1Response->type());

  // Collect the metrics by name, requiring each one to have a value.
  hashmap<string, double> metrics;

  foreach (const v1::Metric& metric,
           v1Response->get_metrics().metrics()) {
    ASSERT_TRUE(metric.has_value());
    metrics[metric.name()] = metric.value();
  }

  // Verifies that the response metrics is not empty. A `0 <= size()` check
  // on an unsigned size can never fail, so emptiness is tested directly.
  ASSERT_FALSE(metrics.empty());
}
// Launches one task with the default executor and checks that a
// `GET_EXECUTORS` call reports exactly that executor together with the ID of
// the agent it runs on.
TEST_P(MasterAPITest, GetExecutors)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  // The agent ID carried by this message is compared against the
  // `GET_EXECUTORS` response further below.
  Future<SlaveRegisteredMessage> registration =
    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  TestContainerizer containerizer(&exec);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> slave =
    StartSlave(detector.get(), &containerizer);
  ASSERT_SOME(slave);

  AWAIT_READY(registration);
  const SlaveID agentId = registration->slave_id();

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);
  EXPECT_NE(0u, offers->size());

  const Offer& offer = offers.get()[0];

  // A task that consumes the entire first offer and runs under the default
  // executor.
  TaskInfo task;
  task.set_name("test");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offer.slave_id());
  task.mutable_resources()->MergeFrom(offer.resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  EXPECT_CALL(exec, registered(_, _, _, _));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  Future<TaskStatus> runningStatus;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&runningStatus));

  driver.launchTasks(offer.id(), {task});

  AWAIT_READY(runningStatus);
  EXPECT_EQ(TASK_RUNNING, runningStatus->state());
  EXPECT_TRUE(runningStatus->has_executor_id());
  EXPECT_EQ(exec.id, runningStatus->executor_id());

  v1::master::Call call;
  call.set_type(v1::master::Call::GET_EXECUTORS);

  Future<v1::master::Response> response =
    post(master.get()->pid, call, GetParam());

  AWAIT_READY(response);
  ASSERT_TRUE(response->IsInitialized());
  ASSERT_EQ(v1::master::Response::GET_EXECUTORS, response->type());

  const auto& listing = response->get_executors();
  ASSERT_EQ(1, listing.executors_size());
  ASSERT_EQ(evolve(agentId), listing.executors(0).agent_id());

  v1::ExecutorInfo reportedInfo = listing.executors(0).executor_info();
  ASSERT_EQ(evolve(exec.id), reportedInfo.executor_id());

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver.stop();
  driver.join();
}
// Exercises the `GET_STATE` call at three points of a task's life:
//   1. before any task is launched (one framework, one agent, no tasks or
//      executors),
//   2. while the task is running (one task, no completed tasks),
//   3. after the task has finished (one completed task, no active tasks).
TEST_P(MasterAPITest, GetState)
{
  v1::master::Call v1Call;
  v1Call.set_type(v1::master::Call::GET_STATE);

  master::Flags flags = CreateMasterFlags();
  flags.hostname = "localhost";
  flags.cluster = "test-cluster";

  Try<Owned<cluster::Master>> master = StartMaster(flags);
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  TestContainerizer containerizer(&exec);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  // `offers` is only satisfied once the scheduler receives its first
  // `resourceOffers` callback, so wait for it before dereferencing the
  // future.
  // NOTE(review): dereferencing a still-pending libprocess future is
  // believed to block without any test timeout - confirm.
  AWAIT_READY(offers);
  EXPECT_NE(0u, offers->size());

  ContentType contentType = GetParam();

  {
    // GetState before task launch and check we have one framework, one agent
    // and zero tasks/executors.
    Future<v1::master::Response> v1Response =
      post(master.get()->pid, v1Call, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());

    const v1::master::Response::GetState& getState = v1Response->get_state();
    ASSERT_EQ(1u, getState.get_frameworks().frameworks_size());
    ASSERT_EQ(1u, getState.get_agents().agents_size());
    ASSERT_EQ(0u, getState.get_tasks().tasks_size());
    ASSERT_EQ(0u, getState.get_executors().executors_size());
  }

  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);

  // The executor driver is captured so that the test can send the terminal
  // status update itself later on.
  Future<ExecutorDriver*> execDriver;
  EXPECT_CALL(exec, registered(_, _, _, _))
    .WillOnce(FutureArg<0>(&execDriver));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  // Acknowledgement of the TASK_RUNNING update, travelling from the master
  // to the agent.
  Future<StatusUpdateAcknowledgementMessage> acknowledgement =
    FUTURE_PROTOBUF(
        StatusUpdateAcknowledgementMessage(),
        Eq(master.get()->pid),
        Eq(slave.get()->pid));

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status));

  driver.launchTasks(offers.get()[0].id(), {task});

  AWAIT_READY(execDriver);

  AWAIT_READY(status);
  EXPECT_EQ(TASK_RUNNING, status->state());

  AWAIT_READY(acknowledgement);

  {
    // GetState after task launch and check we have a running task.
    Future<v1::master::Response> v1Response =
      post(master.get()->pid, v1Call, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());

    const v1::master::Response::GetState& getState = v1Response->get_state();
    ASSERT_EQ(1u, getState.get_tasks().tasks_size());
    ASSERT_EQ(0u, getState.get_tasks().completed_tasks_size());
  }

  // Re-arm the acknowledgement future for the terminal update.
  acknowledgement = FUTURE_PROTOBUF(
      StatusUpdateAcknowledgementMessage(),
      Eq(master.get()->pid),
      Eq(slave.get()->pid));

  Future<TaskStatus> status2;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status2));

  // Send a terminal update so that the task transitions to completed.
  TaskStatus status3;
  status3.mutable_task_id()->CopyFrom(task.task_id());
  status3.set_state(TASK_FINISHED);

  execDriver.get()->sendStatusUpdate(status3);

  AWAIT_READY(status2);
  EXPECT_EQ(TASK_FINISHED, status2->state());

  AWAIT_READY(acknowledgement);

  {
    // GetState after task finished and check we have a completed task.
    Future<v1::master::Response> v1Response =
      post(master.get()->pid, v1Call, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());

    const v1::master::Response::GetState& getState = v1Response->get_state();
    ASSERT_EQ(1u, getState.get_tasks().completed_tasks_size());
    ASSERT_EQ(0u, getState.get_tasks().tasks_size());
  }

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver.stop();
  driver.join();
}
// Checks that a `GET_TASKS` call against a master without any frameworks or
// agents returns empty pending, active, completed and orphan task lists.
TEST_P(MasterAPITest, GetTasksNoRunningTask)
{
  Try<Owned<cluster::Master>> master = this->StartMaster();
  ASSERT_SOME(master);

  v1::master::Call call;
  call.set_type(v1::master::Call::GET_TASKS);

  Future<v1::master::Response> response =
    post(master.get()->pid, call, GetParam());

  AWAIT_READY(response);
  ASSERT_TRUE(response->IsInitialized());
  ASSERT_EQ(v1::master::Response::GET_TASKS, response->type());

  const auto& listing = response->get_tasks();
  ASSERT_EQ(0, listing.pending_tasks().size());
  ASSERT_EQ(0, listing.tasks().size());
  ASSERT_EQ(0, listing.completed_tasks().size());
  ASSERT_EQ(0, listing.orphan_tasks().size());
}
// Checks the `GET_TASKS` v1 API call across a task's transition from active
// to completed: the task is first reported under `tasks` in state
// TASK_RUNNING and, after a terminal update, under `completed_tasks` in state
// TASK_FINISHED.
TEST_P(MasterAPITest, GetTasks)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  TestContainerizer containerizer(&exec);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> slave =
    StartSlave(detector.get(), &containerizer);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);
  EXPECT_NE(0u, offers->size());

  const Offer& offer = offers.get()[0];

  // A task named "test" with ID "1" that uses all resources of the first
  // offer and runs under the default executor.
  TaskInfo task;
  task.set_name("test");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offer.slave_id());
  task.mutable_resources()->MergeFrom(offer.resources());
  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);

  // Keep hold of the executor driver; it is used below to send the terminal
  // status update by hand.
  Future<ExecutorDriver*> executorDriver;
  EXPECT_CALL(exec, registered(_, _, _, _))
    .WillOnce(FutureArg<0>(&executorDriver));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  Future<StatusUpdateAcknowledgementMessage> ack =
    FUTURE_PROTOBUF(
        StatusUpdateAcknowledgementMessage(),
        Eq(master.get()->pid),
        Eq(slave.get()->pid));

  Future<TaskStatus> runningStatus;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&runningStatus));

  driver.launchTasks(offer.id(), {task});

  AWAIT_READY(executorDriver);

  AWAIT_READY(runningStatus);
  EXPECT_EQ(TASK_RUNNING, runningStatus->state());
  EXPECT_TRUE(runningStatus->has_executor_id());
  EXPECT_EQ(exec.id, runningStatus->executor_id());

  AWAIT_READY(ack);

  v1::master::Call call;
  call.set_type(v1::master::Call::GET_TASKS);

  ContentType contentType = GetParam();

  {
    // The running task must be listed as an active task.
    Future<v1::master::Response> response =
      post(master.get()->pid, call, contentType);

    AWAIT_READY(response);
    ASSERT_TRUE(response->IsInitialized());
    ASSERT_EQ(v1::master::Response::GET_TASKS, response->type());

    const auto& listing = response->get_tasks();
    ASSERT_EQ(1, listing.tasks().size());
    ASSERT_EQ(v1::TaskState::TASK_RUNNING, listing.tasks(0).state());
    ASSERT_EQ("test", listing.tasks(0).name());
    ASSERT_EQ("1", listing.tasks(0).task_id().value());
  }

  // Watch for the acknowledgement of the upcoming terminal update.
  ack = FUTURE_PROTOBUF(
      StatusUpdateAcknowledgementMessage(),
      Eq(master.get()->pid),
      Eq(slave.get()->pid));

  Future<TaskStatus> finishedStatus;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&finishedStatus));

  // Send a terminal update so that the task transitions to completed.
  TaskStatus terminalUpdate;
  terminalUpdate.mutable_task_id()->CopyFrom(task.task_id());
  terminalUpdate.set_state(TASK_FINISHED);

  executorDriver.get()->sendStatusUpdate(terminalUpdate);

  AWAIT_READY(finishedStatus);
  EXPECT_EQ(TASK_FINISHED, finishedStatus->state());

  AWAIT_READY(ack);

  {
    // The finished task must have moved to the completed tasks.
    Future<v1::master::Response> response =
      post(master.get()->pid, call, contentType);

    AWAIT_READY(response);
    ASSERT_TRUE(response->IsInitialized());
    ASSERT_EQ(v1::master::Response::GET_TASKS, response->type());

    const auto& listing = response->get_tasks();
    ASSERT_EQ(0, listing.tasks().size());
    ASSERT_EQ(1, listing.completed_tasks().size());
    ASSERT_EQ(
        v1::TaskState::TASK_FINISHED,
        listing.completed_tasks(0).state());
    ASSERT_EQ("test", listing.completed_tasks(0).name());
    ASSERT_EQ("1", listing.completed_tasks(0).task_id().value());
  }

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver.stop();
  driver.join();
}
// Checks that the level returned by a `GET_LOGGING_LEVEL` call equals the
// current value of `FLAGS_v`.
TEST_P(MasterAPITest, GetLoggingLevel)
{
  Try<Owned<cluster::Master>> master = this->StartMaster();
  ASSERT_SOME(master);

  v1::master::Call call;
  call.set_type(v1::master::Call::GET_LOGGING_LEVEL);

  Future<v1::master::Response> response =
    post(master.get()->pid, call, GetParam());

  AWAIT_READY(response);
  ASSERT_TRUE(response->IsInitialized());
  ASSERT_EQ(v1::master::Response::GET_LOGGING_LEVEL, response->type());

  // `FLAGS_v` must be non-negative for the unsigned comparison below to be
  // meaningful.
  ASSERT_LE(0, FLAGS_v);
  ASSERT_EQ(
      response->get_logging_level().level(),
      static_cast<uint32_t>(FLAGS_v));
}
// Tests toggling the logging level through `SET_LOGGING_LEVEL`: a principal
// that the ACLs deny receives `Forbidden`, an allowed principal changes
// `FLAGS_v`, and the level is back at its original value once the toggle
// duration has elapsed on the (paused) clock.
TEST_P(MasterAPITest, SetLoggingLevel)
{
  master::Flags flags = CreateMasterFlags();

  {
    // Default principal 2 is not allowed to set the logging level.
    mesos::ACL::SetLogLevel* denied = flags.acls.get().add_set_log_level();
    denied->mutable_principals()->add_values(
        DEFAULT_CREDENTIAL_2.principal());
    denied->mutable_level()->set_type(mesos::ACL::Entity::NONE);
  }

  Try<Owned<cluster::Master>> master = this->StartMaster(flags);
  ASSERT_SOME(master);

  // Remember the level in effect now; it is what the revert must restore.
  const uint32_t originalLevel = static_cast<uint32_t>(FLAGS_v);

  // The request asks for one level above the original, for 60 seconds.
  const uint32_t toggleLevel = originalLevel + 1;
  const Duration toggleDuration = Seconds(60);

  v1::master::Call call;
  call.set_type(v1::master::Call::SET_LOGGING_LEVEL);

  v1::master::Call_SetLoggingLevel* request =
    call.mutable_set_logging_level();
  request->set_level(toggleLevel);
  request->mutable_duration()->set_nanoseconds(toggleDuration.ns());

  ContentType contentType = GetParam();

  {
    // The denied principal must be rejected.
    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL_2);
    headers["Accept"] = stringify(contentType);

    Future<http::Response> response = http::post(
        master.get()->pid,
        "api/v1",
        headers,
        serialize(contentType, call),
        stringify(contentType));

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, response);
  }

  {
    // The default principal succeeds and the new level takes effect.
    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    headers["Accept"] = stringify(contentType);

    Future<http::Response> response = http::post(
        master.get()->pid,
        "api/v1",
        headers,
        serialize(contentType, call),
        stringify(contentType));

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
    AWAIT_READY(response);
    ASSERT_EQ(toggleLevel, static_cast<uint32_t>(FLAGS_v));
  }

  // Speedup the logging level revert.
  Clock::pause();
  Clock::advance(toggleDuration);
  Clock::settle();

  // Verifies the logging level reverted successfully.
  ASSERT_EQ(originalLevel, static_cast<uint32_t>(FLAGS_v));

  Clock::resume();
}
// This test verifies that the file listing of a directory can be retrieved
// from the master through the `LIST_FILES` call.
TEST_P_TEMP_DISABLED_ON_WINDOWS(MasterAPITest, ListFiles)
{
  Files files;

  // Create directory "1" holding two sub-directories and one regular file,
  // then attach it under the name "one".
  ASSERT_SOME(os::mkdir("1/2"));
  ASSERT_SOME(os::mkdir("1/3"));
  ASSERT_SOME(os::write("1/two", "two"));

  AWAIT_EXPECT_READY(files.attach("1", "one"));

  // Build the `FileInfo` for the "1/two" file, addressed as "one/two".
  struct stat fileStat;
  ASSERT_EQ(0, stat("1/two", &fileStat));
  FileInfo expected = protobuf::createFileInfo("one/two", fileStat);

  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  v1::master::Call call;
  call.set_type(v1::master::Call::LIST_FILES);
  call.mutable_list_files()->set_path("one/");

  Future<v1::master::Response> response =
    post(master.get()->pid, call, GetParam());

  AWAIT_READY(response);
  ASSERT_TRUE(response->IsInitialized());
  ASSERT_EQ(v1::master::Response::LIST_FILES, response->type());

  // Three entries are expected; the regular file is compared with the one at
  // index 2.
  const auto& listing = response->list_files();
  ASSERT_EQ(3, listing.file_infos().size());
  ASSERT_EQ(evolve(expected), listing.file_infos(2));
}
// This test verifies that a `LIST_FILES` call naming a path that was never
// attached ("five/") is answered with a `NotFound` response.
TEST_P(MasterAPITest, ListFilesInvalidPath)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  v1::master::Call call;
  call.set_type(v1::master::Call::LIST_FILES);
  call.mutable_list_files()->set_path("five/");

  ContentType contentType = GetParam();

  // The raw HTTP response is inspected here, hence the fixture's `post`
  // helper (which only yields successful responses) is not used.
  Future<http::Response> httpResponse = http::post(
      master.get()->pid,
      "api/v1",
      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
      serialize(contentType, call),
      stringify(contentType));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::NotFound().status, httpResponse);
}
// Starts a master that knows "role1" (weight 2.5), an agent whose resources
// are all reserved for "role1" and a framework subscribed to that role, then
// checks the name, weight and resources reported for the role by `GET_ROLES`.
TEST_P(MasterAPITest, GetRoles)
{
  master::Flags masterFlags = CreateMasterFlags();
  masterFlags.roles = "role1";
  masterFlags.weights = "role1=2.5";

  Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags);
  ASSERT_SOME(master);

  slave::Flags agentFlags = CreateSlaveFlags();
  agentFlags.resources =
    "cpus(role1):0.5;mem(role1):512;ports(role1):[31000-31001];"
    "disk(role1):1024;gpus(role1):0";

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
  ASSERT_SOME(slave);

  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo.set_roles(0, "role1");

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers));

  driver.start();

  // The roles are queried only after the framework has received an offer.
  AWAIT_READY(offers);

  v1::master::Call call;
  call.set_type(v1::master::Call::GET_ROLES);

  Future<v1::master::Response> response =
    post(master.get()->pid, call, GetParam());

  AWAIT_READY(response);
  ASSERT_TRUE(response->IsInitialized());
  ASSERT_EQ(v1::master::Response::GET_ROLES, response->type());

  const auto& listing = response->get_roles();
  ASSERT_EQ(2, listing.roles().size());

  // "role1" is expected at index 1 of the two reported roles.
  const auto& role1 = listing.roles(1);
  EXPECT_EQ("role1", role1.name());
  EXPECT_EQ(2.5, role1.weight());

  ASSERT_EQ(
      allocatedResources(
          devolve(v1::Resources::parse(agentFlags.resources.get()).get()),
          "role1"),
      devolve(role1.resources()));

  driver.stop();
  driver.join();
}
// Checks that the `MasterInfo` returned by a `GET_MASTER` call carries the
// domain configured through the master flags and the master's IP.
TEST_P(MasterAPITest, GetMaster)
{
  master::Flags masterFlags = CreateMasterFlags();
  masterFlags.domain = createDomainInfo("region-abc", "zone-123");

  Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags);
  ASSERT_SOME(master);

  v1::master::Call call;
  call.set_type(v1::master::Call::GET_MASTER);

  Future<v1::master::Response> response =
    post(master.get()->pid, call, GetParam());

  AWAIT_READY(response);
  ASSERT_TRUE(response->IsInitialized());
  ASSERT_EQ(v1::master::Response::GET_MASTER, response->type());

  const mesos::v1::MasterInfo& reported =
    response->get_master().master_info();

  ASSERT_EQ(evolve(masterFlags.domain.get()), reported.domain());
  ASSERT_EQ(master.get()->getMasterInfo().ip(), reported.ip());
}
// This test verifies that an operator can reserve available resources through
// the `RESERVE_RESOURCES` call: the framework is first offered the resources
// unreserved and, once the call has been accepted, offered them again as
// dynamically reserved resources.
TEST_P(MasterAPITest, ReserveResources)
{
  TestAllocator<> allocator;

  EXPECT_CALL(allocator, initialize(_, _, _, _, _, _, _));

  // Set a low allocation interval to speed up this test.
  master::Flags flags = MesosTest::CreateMasterFlags();
  flags.allocation_interval = Milliseconds(50);

  Try<Owned<cluster::Master>> master = StartMaster(&allocator, flags);
  ASSERT_SOME(master);

  // The agent's ID is captured from the allocator's `addSlave` call.
  Future<SlaveID> slaveId;
  EXPECT_CALL(allocator, addSlave(_, _, _, _, _, _))
    .WillOnce(DoAll(InvokeAddSlave(&allocator),
                    FutureArg<0>(&slaveId)));

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
  ASSERT_SOME(slave);

  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo.set_roles(0, "role");

  const string& role = frameworkInfo.roles(0);

  Resources unreserved = Resources::parse("cpus:1;mem:512").get();
  Resources dynamicallyReserved =
    unreserved.pushReservation(createDynamicReservationInfo(
        role, DEFAULT_CREDENTIAL.principal()));

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);

  Future<vector<Offer>> offers;

  EXPECT_CALL(sched, registered(&driver, _, _));

  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers));

  driver.start();

  // First offer: the resources are still unreserved.
  AWAIT_READY(offers);
  ASSERT_EQ(1u, offers->size());

  Offer offer = offers.get()[0];
  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(unreserved, role)));

  // Re-arm `offers` for the offer that follows the reservation.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers));

  // Expect an offer to be rescinded!
  EXPECT_CALL(sched, offerRescinded(_, _));

  v1::master::Call call;
  call.set_type(v1::master::Call::RESERVE_RESOURCES);

  v1::master::Call::ReserveResources* reserve =
    call.mutable_reserve_resources();
  reserve->mutable_agent_id()->CopyFrom(evolve(slaveId.get()));
  reserve->mutable_resources()->CopyFrom(evolve(dynamicallyReserved));

  ContentType contentType = GetParam();

  Future<http::Response> httpResponse = http::post(
      master.get()->pid,
      "api/v1",
      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
      serialize(contentType, call),
      stringify(contentType));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Accepted().status, httpResponse);

  // Second offer: the resources now show up as dynamically reserved.
  AWAIT_READY(offers);
  ASSERT_EQ(1u, offers->size());

  offer = offers.get()[0];
  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(dynamicallyReserved, role)));

  driver.stop();
  driver.join();
}
// This test verifies that an operator can unreserve reserved resources through
// the `UNRESERVE_RESOURCES` call. The resources are first reserved via
// `RESERVE_RESOURCES`, offered to a framework as reserved resources, then
// unreserved, after which the framework is offered them as unreserved again.
TEST_P(MasterAPITest, UnreserveResources)
{
  TestAllocator<> allocator;

  EXPECT_CALL(allocator, initialize(_, _, _, _, _, _, _));

  // Set a low allocation interval to speed up this test.
  master::Flags flags = MesosTest::CreateMasterFlags();
  flags.allocation_interval = Milliseconds(50);

  Try<Owned<cluster::Master>> master = StartMaster(&allocator, flags);
  ASSERT_SOME(master);

  // The agent's ID is captured from the allocator's `addSlave` call.
  Future<SlaveID> slaveId;
  EXPECT_CALL(allocator, addSlave(_, _, _, _, _, _))
    .WillOnce(DoAll(InvokeAddSlave(&allocator),
                    FutureArg<0>(&slaveId)));

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
  ASSERT_SOME(slave);

  // Nothing else in this test waits for the agent to be added before the ID
  // is read below, so wait for it explicitly; a missing registration then
  // surfaces as an ordinary test failure.
  // NOTE(review): `Future::get()` on a pending future is believed to block
  // without a timeout - confirm.
  AWAIT_READY(slaveId);

  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo.set_roles(0, "role");

  Resources unreserved = Resources::parse("cpus:1;mem:512").get();
  Resources dynamicallyReserved =
    unreserved.pushReservation(createDynamicReservationInfo(
        frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal()));

  // Step 1: reserve the resources through the operator API.
  v1::master::Call v1Call;
  v1Call.set_type(v1::master::Call::RESERVE_RESOURCES);
  v1::master::Call::ReserveResources* reserveResources =
    v1Call.mutable_reserve_resources();

  reserveResources->mutable_agent_id()->CopyFrom(evolve(slaveId.get()));
  reserveResources->mutable_resources()->CopyFrom(evolve(dynamicallyReserved));

  ContentType contentType = GetParam();

  Future<http::Response> reserveResponse = http::post(
      master.get()->pid,
      "api/v1",
      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
      serialize(contentType, v1Call),
      stringify(contentType));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Accepted().status, reserveResponse);

  // Step 2: a framework in the role is offered the reserved resources.
  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);

  Future<vector<Offer>> offers;

  EXPECT_CALL(sched, registered(&driver, _, _));

  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers));

  driver.start();

  AWAIT_READY(offers);
  ASSERT_EQ(1u, offers->size());

  Offer offer = offers.get()[0];
  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(dynamicallyReserved, frameworkInfo.roles(0))));

  // Re-arm `offers` for the offer that follows the unreservation.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers));

  // Expect an offer to be rescinded!
  EXPECT_CALL(sched, offerRescinded(_, _));

  // Step 3: unreserve the same resources. `v1Call` is reused; its type is
  // switched and the unreserve payload is filled in.
  v1Call.set_type(v1::master::Call::UNRESERVE_RESOURCES);
  v1::master::Call::UnreserveResources* unreserveResources =
    v1Call.mutable_unreserve_resources();

  unreserveResources->mutable_agent_id()->CopyFrom(evolve(slaveId.get()));
  unreserveResources->mutable_resources()->CopyFrom(
      evolve(dynamicallyReserved));

  Future<http::Response> unreserveResponse = http::post(
      master.get()->pid,
      "api/v1",
      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
      serialize(contentType, v1Call),
      stringify(contentType));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Accepted().status, unreserveResponse);

  AWAIT_READY(offers);
  ASSERT_EQ(1u, offers->size());

  offer = offers.get()[0];

  // Verifies if the resources are unreserved.
  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(unreserved, frameworkInfo.roles(0))));

  driver.stop();
  driver.join();
}
// Verifies that a maintenance schedule submitted via
// `UPDATE_MAINTENANCE_SCHEDULE` is returned by a later
// `GET_MAINTENANCE_SCHEDULE` call, and that both calls honor their ACLs.
TEST_P(MasterAPITest, UpdateAndGetMaintenanceSchedule)
{
  const ContentType contentType = GetParam();

  // Configure a master on which `DEFAULT_CREDENTIAL_2` may neither update
  // nor view the maintenance schedule of any machine.
  master::Flags flags = CreateMasterFlags();

  mesos::ACL::UpdateMaintenanceSchedule* updateAcl =
    flags.acls.get().add_update_maintenance_schedules();

  updateAcl->mutable_principals()->add_values(
      DEFAULT_CREDENTIAL_2.principal());
  updateAcl->mutable_machines()->set_type(mesos::ACL::Entity::NONE);

  mesos::ACL::GetMaintenanceSchedule* viewAcl =
    flags.acls.get().add_get_maintenance_schedules();

  viewAcl->mutable_principals()->add_values(
      DEFAULT_CREDENTIAL_2.principal());
  viewAcl->mutable_machines()->set_type(mesos::ACL::Entity::NONE);

  Try<Owned<cluster::Master>> master = StartMaster(flags);
  ASSERT_SOME(master);

  // Posts `call` to the master's 'api/v1' endpoint with the supplied
  // authentication headers, requesting the response in `contentType`.
  auto send = [&](http::Headers headers, const v1::master::Call& call)
      -> Future<http::Response> {
    headers["Accept"] = stringify(contentType);

    return http::post(
        master.get()->pid,
        "api/v1",
        headers,
        serialize(contentType, call),
        stringify(contentType));
  };

  // Two machines: one identified by hostname, the other by IP.
  MachineID hostMachine;
  hostMachine.set_hostname("Machine1");

  MachineID ipMachine;
  ipMachine.set_ip("0.0.0.2");

  // A schedule with a single window that covers both machines.
  maintenance::Schedule schedule = createSchedule(
      {createWindow(
          {hostMachine, ipMachine},
          createUnavailability(Clock::now()))});

  v1::master::Call updateCall;
  updateCall.set_type(v1::master::Call::UPDATE_MAINTENANCE_SCHEDULE);
  updateCall.mutable_update_maintenance_schedule()
    ->mutable_schedule()
    ->CopyFrom(evolve(schedule));

  // The principal without permission must be rejected.
  {
    Future<http::Response> rejected =
      send(createBasicAuthHeaders(DEFAULT_CREDENTIAL_2), updateCall);

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, rejected);
  }

  // The default principal is allowed to install the schedule.
  {
    Future<http::Response> accepted =
      send(createBasicAuthHeaders(DEFAULT_CREDENTIAL), updateCall);

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, accepted);
  }

  v1::master::Call getCall;
  getCall.set_type(v1::master::Call::GET_MAINTENANCE_SCHEDULE);

  // The principal without view permission gets a successful response, but
  // the schedule in it contains no windows.
  {
    Future<http::Response> httpResponse =
      send(createBasicAuthHeaders(DEFAULT_CREDENTIAL_2), getCall);

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, httpResponse);

    Future<v1::master::Response> filtered =
      deserialize<v1::master::Response>(contentType, httpResponse->body);

    AWAIT_READY(filtered);

    const v1::maintenance::Schedule& visible =
      filtered->get_maintenance_schedule().schedule();

    ASSERT_EQ(0, visible.windows_size());
  }

  // Querying through the fixture's `post` helper returns the schedule that
  // was stored above.
  {
    Future<v1::master::Response> stored =
      post(master.get()->pid, getCall, contentType);

    AWAIT_READY(stored);
    ASSERT_TRUE(stored->IsInitialized());
    ASSERT_EQ(
        v1::master::Response::GET_MAINTENANCE_SCHEDULE,
        stored->type());

    const v1::maintenance::Schedule& result =
      stored->get_maintenance_schedule().schedule();

    ASSERT_EQ(1, result.windows_size());

    const v1::maintenance::Window& window = result.windows(0);

    ASSERT_EQ(2, window.machine_ids_size());
    ASSERT_EQ("Machine1", window.machine_ids(0).hostname());
    ASSERT_EQ("0.0.0.2", window.machine_ids(1).ip());
  }
}
// Test queries for machine maintenance status.
//
// A schedule covering two machines is installed, after which
// `GET_MAINTENANCE_STATUS` is issued twice: once as a principal that is not
// allowed to view the maintenance status of any machine (expects an empty
// status) and once through the fixture's `post` helper (expects both
// machines to be reported as draining).
TEST_P(MasterAPITest, GetMaintenanceStatus)
{
  // Set up a master.
  master::Flags flags = CreateMasterFlags();

  {
    // Default principal 2 is not allowed to view any maintenance status.
    mesos::ACL::GetMaintenanceStatus* acl =
      flags.acls.get().add_get_maintenance_statuses();

    acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL_2.principal());
    acl->mutable_machines()->set_type(mesos::ACL::Entity::NONE);
  }

  Try<Owned<cluster::Master>> master = StartMaster(flags);
  ASSERT_SOME(master);

  ContentType contentType = GetParam();

  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  headers["Accept"] = stringify(contentType);

  // Generate `MachineID`s that can be used in this test.
  MachineID machine1;
  MachineID machine2;
  machine1.set_hostname("Machine1");
  machine2.set_ip("0.0.0.2");

  // Schedule maintenance on the two (so far unscheduled) machines.
  maintenance::Schedule schedule = createSchedule(
      {createWindow({machine1, machine2}, createUnavailability(Clock::now()))});

  v1::maintenance::Schedule v1Schedule = evolve(schedule);

  v1::master::Call v1UpdateScheduleCall;
  v1UpdateScheduleCall.set_type(v1::master::Call::UPDATE_MAINTENANCE_SCHEDULE);

  v1::master::Call_UpdateMaintenanceSchedule* maintenanceSchedule =
    v1UpdateScheduleCall.mutable_update_maintenance_schedule();

  maintenanceSchedule->mutable_schedule()->CopyFrom(v1Schedule);

  // Turn any non-OK HTTP status into a failed future so that the
  // `AWAIT_READY` below stops the test if the schedule was not accepted.
  Future<Nothing> v1UpdateScheduleResponse = http::post(
      master.get()->pid,
      "api/v1",
      headers,
      serialize(contentType, v1UpdateScheduleCall),
      stringify(contentType))
    .then([](const http::Response& response) -> Future<Nothing> {
      if (response.status != http::OK().status) {
        return Failure("Unexpected response status " + response.status);
      }

      return Nothing();
    });

  AWAIT_READY(v1UpdateScheduleResponse);

  // Query maintenance status.
  v1::master::Call v1GetStatusCall;
  v1GetStatusCall.set_type(v1::master::Call::GET_MAINTENANCE_STATUS);

  // The principal without view permission receives an OK response whose
  // status lists no machines.
  {
    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL_2);
    headers["Accept"] = stringify(contentType);

    Future<http::Response> response = http::post(
        master.get()->pid,
        "api/v1",
        headers,
        serialize(contentType, v1GetStatusCall),
        stringify(contentType));

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);

    Try<v1::master::Response> v1Response =
      deserialize<v1::master::Response>(contentType, response->body);

    // Fail the test here instead of dereferencing an errored `Try` below.
    ASSERT_SOME(v1Response);

    v1::maintenance::ClusterStatus status =
      v1Response->get_maintenance_status().status();

    ASSERT_EQ(0, status.draining_machines().size());
    ASSERT_EQ(0, status.down_machines().size());
  }

  {
    Future<v1::master::Response> v1Response =
      post(master.get()->pid, v1GetStatusCall, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::master::Response::GET_MAINTENANCE_STATUS, v1Response->type());

    // Verify maintenance status matches the expectation: both scheduled
    // machines are draining and none is down.
    v1::maintenance::ClusterStatus status =
      v1Response->get_maintenance_status().status();

    ASSERT_EQ(2, status.draining_machines().size());
    ASSERT_EQ(0, status.down_machines().size());
  }
}
// Exercises the `START_MAINTENANCE` and `STOP_MAINTENANCE` calls.
// One machine is put into maintenance and then brought back; afterwards the
// maintenance window that contained it must be gone from the schedule while
// the other window is still present.
TEST_P(MasterAPITest, StartAndStopMaintenance)
{
  // Configure a master on which `DEFAULT_CREDENTIAL_2` may neither start
  // nor stop maintenance on any machine.
  master::Flags flags = CreateMasterFlags();

  mesos::ACL::StartMaintenance* startAcl =
    flags.acls.get().add_start_maintenances();

  startAcl->mutable_principals()->add_values(
      DEFAULT_CREDENTIAL_2.principal());
  startAcl->mutable_machines()->set_type(mesos::ACL::Entity::NONE);

  mesos::ACL::StopMaintenance* stopAcl =
    flags.acls.get().add_stop_maintenances();

  stopAcl->mutable_principals()->add_values(
      DEFAULT_CREDENTIAL_2.principal());
  stopAcl->mutable_machines()->set_type(mesos::ACL::Entity::NONE);

  Try<Owned<cluster::Master>> master = StartMaster(flags);
  ASSERT_SOME(master);

  const ContentType contentType = GetParam();

  // Posts `call` to the master's 'api/v1' endpoint with the supplied
  // authentication headers, requesting the response in `contentType`.
  auto send = [&](http::Headers headers, const v1::master::Call& call)
      -> Future<http::Response> {
    headers["Accept"] = stringify(contentType);

    return http::post(
        master.get()->pid,
        "api/v1",
        headers,
        serialize(contentType, call),
        stringify(contentType));
  };

  // Three machines: hostname only, IP only, and hostname plus IP.
  MachineID machine1;
  machine1.set_hostname("Machine1");

  MachineID machine2;
  machine2.set_ip("0.0.0.2");

  MachineID machine3;
  machine3.set_hostname("Machine3");
  machine3.set_ip("0.0.0.3");

  // Two windows sharing one unavailability: the first holds machine1 and
  // machine2, the second holds only machine3.
  Unavailability unavailability = createUnavailability(Clock::now());

  maintenance::Schedule schedule = createSchedule({
      createWindow({machine1, machine2}, unavailability),
      createWindow({machine3}, unavailability)
  });

  // Install the schedule as the default principal.
  {
    v1::master::Call updateCall;
    updateCall.set_type(v1::master::Call::UPDATE_MAINTENANCE_SCHEDULE);
    updateCall.mutable_update_maintenance_schedule()
      ->mutable_schedule()
      ->CopyFrom(evolve(schedule));

    Future<http::Response> updated =
      send(createBasicAuthHeaders(DEFAULT_CREDENTIAL), updateCall);

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, updated);
  }

  // Start maintenance on machine3: forbidden for principal 2, accepted for
  // the default principal.
  v1::master::Call startCall;
  startCall.set_type(v1::master::Call::START_MAINTENANCE);
  startCall.mutable_start_maintenance()->add_machines()->CopyFrom(
      evolve(machine3));

  {
    Future<http::Response> rejected =
      send(createBasicAuthHeaders(DEFAULT_CREDENTIAL_2), startCall);

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, rejected);
  }

  {
    Future<http::Response> started =
      send(createBasicAuthHeaders(DEFAULT_CREDENTIAL), startCall);

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, started);
  }

  // Stop maintenance on machine3 with the same pair of principals.
  v1::master::Call stopCall;
  stopCall.set_type(v1::master::Call::STOP_MAINTENANCE);
  stopCall.mutable_stop_maintenance()->add_machines()->CopyFrom(
      evolve(machine3));

  {
    Future<http::Response> rejected =
      send(createBasicAuthHeaders(DEFAULT_CREDENTIAL_2), stopCall);

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, rejected);
  }

  {
    Future<http::Response> stopped =
      send(createBasicAuthHeaders(DEFAULT_CREDENTIAL), stopCall);

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, stopped);
  }

  // Fetch the schedule again through the fixture's `post` helper.
  v1::master::Call getCall;
  getCall.set_type(v1::master::Call::GET_MAINTENANCE_SCHEDULE);

  Future<v1::master::Response> fetched =
    post(master.get()->pid, getCall, contentType);

  AWAIT_READY(fetched);
  ASSERT_TRUE(fetched->IsInitialized());
  ASSERT_EQ(
      v1::master::Response::GET_MAINTENANCE_SCHEDULE,
      fetched->type());

  // Only the window with machine1 and machine2 is left.
  const v1::maintenance::Schedule& remaining =
    fetched->get_maintenance_schedule().schedule();

  ASSERT_EQ(1, remaining.windows_size());

  const v1::maintenance::Window& window = remaining.windows(0);

  ASSERT_EQ(2, window.machine_ids_size());
  ASSERT_EQ("Machine1", window.machine_ids(0).hostname());
  ASSERT_EQ("0.0.0.2", window.machine_ids(1).ip());
}
// This test verifies that a subscriber can receive `AGENT_ADDED`
// and `AGENT_REMOVED` events.
//
// The test opens a streaming `SUBSCRIBE` connection to the master's
// 'api/v1' endpoint before any agent exists, then starts and shuts down a
// single agent, checking the event decoded from the stream after each step.
TEST_P(MasterAPITest, SubscribeAgentEvents)
{
ContentType contentType = GetParam();
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
// Build the `SUBSCRIBE` call and send it as a streaming request, asking
// for events in the parameterized content type.
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::SUBSCRIBE);
http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
Future<http::Response> response = http::streaming::post(
master.get()->pid,
"api/v1",
headers,
serialize(contentType, v1Call),
stringify(contentType));
// The response must be a chunked stream exposed through a pipe reader.
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
ASSERT_EQ(http::Response::PIPE, response->type);
ASSERT_SOME(response->reader);
http::Pipe::Reader reader = response->reader.get();
// Wrap the pipe in a decoder that deserializes each record into a
// `v1::master::Event` using the same content type.
auto deserializer =
lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
Reader<v1::master::Event> decoder(
Decoder<v1::master::Event>(deserializer), reader);
// The first event is `SUBSCRIBED`; its state snapshot is expected to be
// empty because nothing has been started besides the master.
Future<Result<v1::master::Event>> event = decoder.read();
AWAIT_READY(event);
EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
const v1::master::Response::GetState& getState =
event->get().subscribed().get_state();
EXPECT_EQ(0u, getState.get_frameworks().frameworks_size());
EXPECT_EQ(0u, getState.get_agents().agents_size());
EXPECT_EQ(0u, getState.get_tasks().tasks_size());
EXPECT_EQ(0u, getState.get_executors().executors_size());
// Start one agent.
// The future for `SlaveRegisteredMessage` is set up before the agent is
// started so that the registration message cannot be missed.
Future<SlaveRegisteredMessage> agentRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
slave::Flags flags = CreateSlaveFlags();
flags.hostname = "host";
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
AWAIT_READY(agentRegisteredMessage);
// The next event on the stream must describe the agent that registered.
event = decoder.read();
AWAIT_READY(event);
SlaveID agentID = agentRegisteredMessage->slave_id();
{
ASSERT_EQ(v1::master::Event::AGENT_ADDED, event->get().type());
const v1::master::Response::GetAgents::Agent& agent =
event->get().agent_added().agent();
ASSERT_EQ("host", agent.agent_info().hostname());
ASSERT_EQ(evolve(agentID), agent.agent_info().id());
ASSERT_EQ(slave.get()->pid, agent.pid());
ASSERT_EQ(MESOS_VERSION, agent.version());
// NOTE(review): 4 presumably corresponds to the resource kinds produced
// by `CreateSlaveFlags()` (e.g. cpus, mem, disk, ports) - confirm.
ASSERT_EQ(4, agent.total_resources_size());
}
// Forcefully trigger a shutdown on the slave so that master will remove it.
slave.get()->shutdown();
slave->reset();
// The removal must be reported with the ID of the agent that was added.
event = decoder.read();
AWAIT_READY(event);
{
ASSERT_EQ(v1::master::Event::AGENT_REMOVED, event->get().type());
ASSERT_EQ(evolve(agentID), event->get().agent_removed().agent_id());
}
}
// This test verifies that no information about reservations and/or allocations
// is returned to unauthorized users in response to the GET_AGENTS call.
//
// Two principals are each allowed to view exactly one role: the default
// principal sees 'superhero' (a dynamic reservation made during the test)
// and principal 2 sees 'muggle' (a static reservation on the agent). Each
// principal issues `GET_AGENTS` and must only see unreserved resources plus
// the resources reserved for its own role.
TEST_P(MasterAPITest, GetAgentsFiltering)
{
  master::Flags flags = CreateMasterFlags();

  const string roleSuperhero = "superhero";
  const string roleMuggle = "muggle";

  {
    // Default principal may view `roleSuperhero` ...
    mesos::ACL::ViewRole* acl = flags.acls.get().add_view_roles();
    acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal());
    acl->mutable_roles()->add_values(roleSuperhero);

    // ... and no other role.
    acl = flags.acls.get().add_view_roles();
    acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal());
    acl->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
  }

  {
    // Default principal 2 may view `roleMuggle` ...
    mesos::ACL::ViewRole* acl = flags.acls.get().add_view_roles();
    acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL_2.principal());
    acl->mutable_roles()->add_values(roleMuggle);

    // ... and no other role.
    acl = flags.acls.get().add_view_roles();
    acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL_2.principal());
    acl->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
  }

  Try<Owned<cluster::Master>> master = StartMaster(flags);
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();

  // Set up the registration future before the agent is started.
  Future<SlaveRegisteredMessage> agentRegisteredMessage =
    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);

  slave::Flags slaveFlags = this->CreateSlaveFlags();

  // Statically reserve some resources on the agent.
  slaveFlags.resources =
    "cpus(muggle):1;cpus(*):2;gpus(*):0;mem(muggle):1024;mem(*):1024;"
    "disk(muggle):1024;disk(*):1024;ports(muggle):[30000-30999];"
    "ports(*):[31000-32000]";

  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
  ASSERT_SOME(agent);

  AWAIT_READY(agentRegisteredMessage);
  const SlaveID& agentId = agentRegisteredMessage->slave_id();

  // Create dynamic reservation for `roleSuperhero` through the master's
  // 'reserve' endpoint.
  {
    RepeatedPtrField<Resource> reservation =
      Resources::parse("cpus:1;mem:12")->pushReservation(
          createDynamicReservationInfo(
              roleSuperhero,
              DEFAULT_CREDENTIAL.principal()));

    Future<http::Response> response = process::http::post(
        master.get()->pid,
        "reserve",
        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
        strings::format(
            "slaveId=%s&resources=%s",
            agentId,
            JSON::protobuf(reservation)).get());

    AWAIT_READY(response);
    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Accepted().status, response);
  }

  v1::master::Call v1Call;
  v1Call.set_type(v1::master::Call::GET_AGENTS);

  ContentType contentType = GetParam();

  // Default credential principal should only be allowed to see resources
  // which are reserved for the role 'superhero'.
  {
    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    headers["Accept"] = stringify(contentType);

    Future<http::Response> response = http::post(
        master.get()->pid,
        "api/v1",
        headers,
        serialize(contentType, v1Call),
        stringify(contentType));

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);

    Try<v1::master::Response> v1Response =
      deserialize<v1::master::Response>(contentType, response->body);

    // Fail the test here instead of dereferencing an errored `Try` below.
    ASSERT_SOME(v1Response);

    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::master::Response::GET_AGENTS, v1Response->type());
    ASSERT_EQ(1, v1Response->get_agents().agents_size());

    // AgentInfo.resources is not passed through `convertResourceFormat()` so
    // its format is different.
    foreach (const v1::Resource& resource,
             v1Response->get_agents().agents(0).agent_info().resources()) {
      EXPECT_FALSE(resource.has_role());
      EXPECT_FALSE(resource.has_allocation_info());
      EXPECT_FALSE(resource.has_reservation());
      EXPECT_EQ(0, resource.reservations_size());
    }

    vector<RepeatedPtrField<v1::Resource>> resourceFields = {
      v1Response->get_agents().agents(0).total_resources(),
      v1Response->get_agents().agents(0).allocated_resources(),
      v1Response->get_agents().agents(0).offered_resources()
    };

    // Guards against the loops below passing vacuously: at least one
    // reserved resource must have been inspected.
    bool hasReservedResources = false;

    foreach (const RepeatedPtrField<v1::Resource>& resources, resourceFields) {
      foreach (const v1::Resource& resource, resources) {
        EXPECT_TRUE(resource.has_role());
        EXPECT_TRUE(roleSuperhero == resource.role() || "*" == resource.role());

        EXPECT_FALSE(resource.has_allocation_info());

        if (resource.role() != "*") {
          hasReservedResources = true;

          EXPECT_TRUE(resource.has_reservation());
          EXPECT_FALSE(resource.reservation().has_role());

          EXPECT_NE(0, resource.reservations_size());
          foreach (const v1::Resource::ReservationInfo& reservation,
                   resource.reservations()) {
            EXPECT_EQ(roleSuperhero, reservation.role());
          }
        } else {
          EXPECT_FALSE(resource.has_reservation());
          EXPECT_EQ(0, resource.reservations_size());
        }
      }
    }

    EXPECT_TRUE(hasReservedResources);
  }

  // Default credential principal 2 should only be allowed to see resources
  // which are reserved for the role 'muggle'.
  {
    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL_2);
    headers["Accept"] = stringify(contentType);

    Future<http::Response> response = http::post(
        master.get()->pid,
        "api/v1",
        headers,
        serialize(contentType, v1Call),
        stringify(contentType));

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);

    Try<v1::master::Response> v1Response =
      deserialize<v1::master::Response>(contentType, response->body);

    // Fail the test here instead of dereferencing an errored `Try` below.
    ASSERT_SOME(v1Response);

    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::master::Response::GET_AGENTS, v1Response->type());
    ASSERT_EQ(1, v1Response->get_agents().agents_size());

    // AgentInfo.resources is not passed through `convertResourceFormat()` so
    // its format is different.
    foreach (const v1::Resource& resource,
             v1Response->get_agents().agents(0).agent_info().resources()) {
      EXPECT_FALSE(resource.has_role());
      EXPECT_FALSE(resource.has_allocation_info());
      EXPECT_FALSE(resource.has_reservation());

      if (resource.reservations_size() > 0) {
        foreach (const v1::Resource::ReservationInfo& reservation,
                 resource.reservations()) {
          EXPECT_EQ(roleMuggle, reservation.role());
        }
      }
    }

    vector<RepeatedPtrField<v1::Resource>> resourceFields = {
      v1Response->get_agents().agents(0).total_resources(),
      v1Response->get_agents().agents(0).allocated_resources(),
      v1Response->get_agents().agents(0).offered_resources()
    };

    // Guards against the loops below passing vacuously: at least one
    // reserved resource must have been inspected.
    bool hasReservedResources = false;

    foreach (const RepeatedPtrField<v1::Resource>& resources, resourceFields) {
      foreach (const v1::Resource& resource, resources) {
        EXPECT_TRUE(resource.has_role());
        EXPECT_TRUE(roleMuggle == resource.role() || "*" == resource.role());

        EXPECT_FALSE(resource.has_allocation_info());

        if (resource.role() != "*") {
          hasReservedResources = true;

          EXPECT_FALSE(resource.has_reservation());

          EXPECT_NE(0, resource.reservations_size());
          foreach (const v1::Resource::ReservationInfo& reservation,
                   resource.reservations()) {
            EXPECT_EQ(roleMuggle, reservation.role());
          }
        } else {
          EXPECT_FALSE(resource.has_reservation());
          EXPECT_EQ(0, resource.reservations_size());
        }
      }
    }

    EXPECT_TRUE(hasReservedResources);
  }
}
// Checks that an agent which the master has recovered from the registry,
// but which has not yet reregistered, is listed in the `recovered_agents`
// field of the `GetAgents` response. Authorization is turned on so that the
// authorization-based filtering is exercised against recovered agents,
// whose resources are currently stored in the pre-reservation-refinement
// format (see MESOS-7851).
TEST_P_TEMP_DISABLED_ON_WINDOWS(MasterAPITest, GetRecoveredAgents)
{
  const ContentType contentType = GetParam();

  // The same `GET_AGENTS` call is issued at every stage of the test.
  v1::master::Call getAgentsCall;
  getAgentsCall.set_type(v1::master::Call::GET_AGENTS);

  master::Flags masterFlags = CreateMasterFlags();
  masterFlags.registry = "replicated_log";

  // Adding an ACL forces the authorizer to be initialized.
  {
    mesos::ACL::ViewRole* viewAcl = masterFlags.acls.get().add_view_roles();
    viewAcl->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal());
    viewAcl->mutable_roles()->set_type(mesos::ACL::Entity::ANY);
  }

  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
  ASSERT_SOME(master);

  Future<SlaveRegisteredMessage> registered =
    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);

  // The flags are kept around so that both `StartSlave()` invocations use
  // the same work_dir.
  slave::Flags slaveFlags = this->CreateSlaveFlags();

  // Static reservations for role 'foo' on the agent.
  slaveFlags.resources =
    "cpus(foo):1;cpus(*):2;gpus(*):0;mem(foo):1024;mem(*):1024;"
    "disk(foo):1024;disk(*):1024;ports(*):[31000-32000]";

  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
  ASSERT_SOME(slave);

  AWAIT_READY(registered);

  const v1::AgentID agentId = evolve(registered->slave_id());

  // Dynamically reserve some resources for role 'bar' through the master's
  // 'reserve' endpoint.
  {
    RepeatedPtrField<Resource> reserved =
      Resources::parse("cpus:1;mem:12")->pushReservation(
          createDynamicReservationInfo(
              "bar",
              DEFAULT_CREDENTIAL.principal()));

    Future<http::Response> reserveResponse = process::http::post(
        master.get()->pid,
        "reserve",
        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
        strings::format(
            "slaveId=%s&resources=%s",
            agentId,
            JSON::protobuf(reserved)).get());

    AWAIT_READY(reserveResponse);
    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Accepted().status, reserveResponse);
  }

  // Stage 1: the agent is registered, so it shows up in `GetAgents.agents`
  // and `GetAgents.recovered_agents` is empty.
  {
    Future<v1::master::Response> agents =
      post(master.get()->pid, getAgentsCall, contentType);

    AWAIT_READY(agents);
    ASSERT_TRUE(agents->IsInitialized());
    ASSERT_EQ(v1::master::Response::GET_AGENTS, agents->type());
    ASSERT_EQ(1, agents->get_agents().agents_size());
    ASSERT_EQ(agentId, agents->get_agents().agents(0).agent_info().id());
    ASSERT_EQ(0, agents->get_agents().recovered_agents_size());
  }

  // Take the master down first, then stop the agent while it is down.
  master->reset();
  slave.get()->terminate();
  slave->reset();

  // Bring the master back with the same flags.
  master = StartMaster(masterFlags);
  ASSERT_SOME(master);

  // Stage 2: the agent has not reregistered, so it is only listed in
  // `GetAgents.recovered_agents` and `GetAgents.agents` is empty.
  {
    Future<v1::master::Response> agents =
      post(master.get()->pid, getAgentsCall, contentType);

    AWAIT_READY(agents);
    ASSERT_TRUE(agents->IsInitialized());
    ASSERT_EQ(v1::master::Response::GET_AGENTS, agents->type());
    ASSERT_EQ(0u, agents->get_agents().agents_size());
    ASSERT_EQ(1u, agents->get_agents().recovered_agents_size());
    ASSERT_EQ(agentId, agents->get_agents().recovered_agents(0).id());
  }

  Future<SlaveReregisteredMessage> reregistered =
    FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get()->pid, _);

  // Restart the agent against the new master so that it reregisters.
  detector = master.get()->createDetector();
  slave = StartSlave(detector.get(), slaveFlags);
  ASSERT_SOME(slave);

  AWAIT_READY(reregistered);

  // Stage 3: once the agent has reregistered it moves back to
  // `GetAgents.agents` and `GetAgents.recovered_agents` is empty again.
  {
    Future<v1::master::Response> agents =
      post(master.get()->pid, getAgentsCall, contentType);

    AWAIT_READY(agents);
    ASSERT_TRUE(agents->IsInitialized());
    ASSERT_EQ(v1::master::Response::GET_AGENTS, agents->type());
    ASSERT_EQ(1u, agents->get_agents().agents_size());
    ASSERT_EQ(0u, agents->get_agents().recovered_agents_size());
  }
}
// This test tries to verify that a client subscribed to the 'api/v1'
// endpoint is able to receive `TASK_ADDED`/`TASK_UPDATED` events.
//
// Outline: start a master and an agent, subscribe a v1 scheduler, open an
// operator event stream, launch one task through the scheduler and check
// the events that appear on the stream. Finally verify that a retried
// status update does not produce a further event.
TEST_P(MasterAPITest, Subscribe)
{
ContentType contentType = GetParam();
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
// Mock scheduler and executor; the executor is handed to the test
// containerizer under `executorId`.
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto executor = std::make_shared<v1::MockHTTPExecutor>();
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
TestContainerizer containerizer(executorId, executor);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
ASSERT_SOME(slave);
// Wait for the scheduler library to connect to the master.
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
// Expectations for the subscription: capture the `Subscribed` event and
// the first batch of offers.
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<v1::scheduler::Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
// Subscribe the scheduler with the default framework info.
{
v1::scheduler::Call call;
call.set_type(v1::scheduler::Call::SUBSCRIBE);
v1::scheduler::Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
// Launch a task using the scheduler. This should result in a `TASK_ADDED`
// event when the task is launched followed by a `TASK_UPDATED` event after
// the task transitions to running state.
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
EXPECT_NE(0, offers->offers().size());
// Create event stream after seeing first offer but before first task is
// launched. We should see one framework, one agent and zero task/executor.
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::SUBSCRIBE);
http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
Future<http::Response> response = http::streaming::post(
master.get()->pid,
"api/v1",
headers,
serialize(contentType, v1Call),
stringify(contentType));
// The response must be a chunked stream exposed through a pipe reader.
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
ASSERT_EQ(http::Response::PIPE, response->type);
ASSERT_SOME(response->reader);
http::Pipe::Reader reader = response->reader.get();
// Wrap the pipe in a decoder that deserializes each record into a
// `v1::master::Event` using the same content type.
auto deserializer =
lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
Reader<v1::master::Event> decoder(
Decoder<v1::master::Event>(deserializer), reader);
// First event: `SUBSCRIBED`, carrying the state snapshot.
Future<Result<v1::master::Event>> event = decoder.read();
AWAIT_READY(event);
EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
const v1::master::Response::GetState& getState =
event->get().subscribed().get_state();
EXPECT_EQ(1u, getState.get_frameworks().frameworks_size());
EXPECT_EQ(1u, getState.get_agents().agents_size());
EXPECT_EQ(0u, getState.get_tasks().tasks_size());
EXPECT_EQ(0u, getState.get_executors().executors_size());
// Start the next read now; it must stay pending because no task has been
// launched yet.
event = decoder.read();
EXPECT_TRUE(event.isPending());
// Build a task from the first offer, to be run by the mock executor.
const v1::Offer& offer = offers->offers(0);
TaskInfo task = createTask(devolve(offer), "", executorId);
Future<Nothing> update;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureSatisfy(&update));
// The executor subscribes when it connects and answers the launch with a
// `TASK_RUNNING` status update.
EXPECT_CALL(*executor, connected(_))
.WillOnce(v1::executor::SendSubscribe(frameworkId, evolve(executorId)));
EXPECT_CALL(*executor, subscribed(_, _));
EXPECT_CALL(*executor, launch(_, _))
.WillOnce(v1::executor::SendUpdateFromTask(
frameworkId, evolve(executorId), v1::TASK_RUNNING));
EXPECT_CALL(*executor, acknowledged(_, _));
// Accept the offer with a single `LAUNCH` operation for the task.
{
v1::scheduler::Call call;
call.set_type(v1::scheduler::Call::ACCEPT);
call.mutable_framework_id()->CopyFrom(frameworkId);
v1::scheduler::Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(evolve(task));
mesos.send(call);
}
// The pending read completes with `TASK_ADDED` for the launched task.
AWAIT_READY(event);
AWAIT_READY(update);
ASSERT_EQ(v1::master::Event::TASK_ADDED, event->get().type());
ASSERT_EQ(evolve(task.task_id()),
event->get().task_added().task().task_id());
// The following event is `TASK_UPDATED` reporting `TASK_RUNNING`.
event = decoder.read();
AWAIT_READY(event);
ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
ASSERT_EQ(v1::TASK_RUNNING,
event->get().task_updated().state());
ASSERT_EQ(v1::TASK_RUNNING,
event->get().task_updated().status().state());
ASSERT_EQ(evolve(task.task_id()),
event->get().task_updated().status().task_id());
// Start another read; it is checked below to still be pending after the
// retried update has been delivered to the scheduler.
event = decoder.read();
Future<Nothing> update2;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureSatisfy(&update2));
// After we advance the clock, the status update manager would retry the
// `TASK_RUNNING` update. Since, the state of the task is not changed, this
// should not result in another `TASK_UPDATED` event.
Clock::pause();
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
AWAIT_READY(update2);
EXPECT_TRUE(event.isPending());
// Close the read end of the event stream.
EXPECT_TRUE(reader.close());
// NOTE(review): these look like allowances for executor callbacks that may
// or may not fire while the test is torn down - confirm.
EXPECT_CALL(*executor, shutdown(_))
.Times(AtMost(1));
EXPECT_CALL(*executor, disconnected(_))
.Times(AtMost(1));
}
// This test tries to verify that a client subscribed to the 'api/v1' endpoint
// can receive `FRAMEWORK_ADDED`, `FRAMEWORK_UPDATED` and `FRAMEWORK_REMOVED`
// events.
TEST_P(MasterAPITest, FrameworksEvent)
{
ContentType contentType = GetParam();
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::SUBSCRIBE);
http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
Future<http::Response> response = http::streaming::post(
master.get()->pid,
"api/v1",
headers,
serialize(contentType, v1Call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
ASSERT_EQ(http::Response::PIPE, response->type);
ASSERT_SOME(response->reader);
http::Pipe::Reader reader = response->reader.get();
auto deserializer =
lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
Reader<v1::master::Event> decoder(
Decoder<v1::master::Event>(deserializer), reader);
Future<Result<v1::master::Event>> event = decoder.read();
AWAIT_READY(event);
EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
const v1::master::Response::GetState& getState =
event->get().subscribed().get_state();
EXPECT_EQ(0u, getState.get_frameworks().frameworks_size());
EXPECT_EQ(0u, getState.get_agents().agents_size());
EXPECT_EQ(0u, getState.get_tasks().tasks_size());
EXPECT_EQ(0u, getState.get_executors().executors_size());
event = decoder.read();
EXPECT_TRUE(event.isPending());
// Start a scheduler. The subscriber will receive a 'FRAMEWORK_ADDED' event
// when the scheduler subscribes with the master.
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
auto detector = std::make_shared<StandaloneMasterDetector>(master.get()->pid);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler,
detector);
AWAIT_READY(connected);
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
// Set the timeout to a large value to avoid the framework being removed
// when it reconnects.
frameworkInfo.set_failover_timeout(Weeks(2).secs());
{
v1::scheduler::Call call;
call.set_type(v1::scheduler::Call::SUBSCRIBE);
v1::scheduler::Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
mesos.send(call);
}
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId = subscribed->framework_id();
frameworkInfo.mutable_id()->CopyFrom(frameworkId);
AWAIT_READY(event);
{
EXPECT_EQ(v1::master::Event::FRAMEWORK_ADDED, event.get().get().type());
const v1::master::Response::GetFrameworks::Framework& framework =
event.get().get().framework_added().framework();
EXPECT_EQ(frameworkInfo, framework.framework_info());
EXPECT_TRUE(framework.active());
EXPECT_TRUE(framework.connected());
}
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
Future<Nothing> disconnected;
EXPECT_CALL(*scheduler, disconnected(_))
.WillOnce(FutureSatisfy(&disconnected));
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected))
.WillRepeatedly(Return()); // Ignore future invocations.
// Force a reconnection with the master. This should result in a
// 'FRAMEWORK_UPDATED' event when the scheduler re-registers with the master.
mesos.reconnect();
AWAIT_READY(disconnected);
// The scheduler should be able to immediately reconnect with the master.
AWAIT_READY(connected);
{
v1::scheduler::Call call;
call.set_type(v1::scheduler::Call::SUBSCRIBE);
call.mutable_framework_id()->CopyFrom(frameworkId);
v1::scheduler::Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
mesos.send(call);
}
event = decoder.read();
AWAIT_READY(event);
{
EXPECT_EQ(v1::master::Event::FRAMEWORK_UPDATED, event.get().get().type());
const v1::master::Response::GetFrameworks::Framework& framework =
event.get().get().framework_updated().framework();
EXPECT_EQ(frameworkInfo, framework.framework_info());
}
EXPECT_CALL(*scheduler, disconnected(_))
.WillOnce(FutureSatisfy(&disconnected));
// Send a teardown request to the master to teardown the framework.
// The subscriber will receive a 'FRAMEWORK_REMOVED' event from the master.
{
Future<http::Response> response = process::http::post(
master.get()->pid,
"teardown",
createBasicAuthHeaders(DEFAULT_CREDENTIAL),
"frameworkId=" + frameworkId.value());
AWAIT_READY(response);
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
}
AWAIT_READY(disconnected);
event = decoder.read();
AWAIT_READY(event);
{
EXPECT_EQ(v1::master::Event::FRAMEWORK_REMOVED, event.get().get().type());
const v1::FrameworkID& frameworkId_ =
event.get().get().framework_removed().framework_info().id();
EXPECT_EQ(frameworkId, frameworkId_);
}
}
// This test verifies that a quota guarantee stored through a `SET_QUOTA`
// call is reported back unchanged by a subsequent `GET_QUOTA` call.
TEST_P(MasterAPITest, GetQuota)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  v1::Resources quotaResources =
    v1::Resources::parse("cpus:1;mem:512").get();

  ContentType contentType = GetParam();

  // Step 1: set a quota for "role1".
  {
    v1::master::Call setQuotaCall;
    setQuotaCall.set_type(v1::master::Call::SET_QUOTA);

    v1::quota::QuotaRequest* request =
      setQuotaCall.mutable_set_quota()->mutable_quota_request();

    request->set_role("role1");
    request->mutable_guarantee()->CopyFrom(quotaResources);

    // There are no agents in this cluster, so the quota cannot be
    // satisfied; the force flag lets the request through anyway.
    request->set_force(true);

    Future<http::Response> setQuotaResponse = http::post(
        master.get()->pid,
        "api/v1",
        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
        serialize(contentType, setQuotaCall),
        stringify(contentType));

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, setQuotaResponse);
  }

  // Step 2: read the quota status back and compare the guarantee.
  {
    v1::master::Call getQuotaCall;
    getQuotaCall.set_type(v1::master::Call::GET_QUOTA);

    Future<v1::master::Response> quotaStatus =
      post(master.get()->pid, getQuotaCall, contentType);

    AWAIT_READY(quotaStatus);
    ASSERT_TRUE(quotaStatus->IsInitialized());
    ASSERT_EQ(v1::master::Response::GET_QUOTA, quotaStatus->type());
    ASSERT_EQ(1, quotaStatus->get_quota().status().infos().size());
    EXPECT_EQ(quotaResources,
              quotaStatus->get_quota().status().infos(0).guarantee());
  }
}
// This test verifies that a forced `SET_QUOTA` call against a master without
// any agents is answered with `200 OK`.
TEST_P(MasterAPITest, SetQuota)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  v1::Resources quotaResources =
    v1::Resources::parse("cpus:1;mem:512").get();

  v1::master::Call setQuotaCall;
  setQuotaCall.set_type(v1::master::Call::SET_QUOTA);

  v1::quota::QuotaRequest* request =
    setQuotaCall.mutable_set_quota()->mutable_quota_request();

  request->set_role("role1");
  request->mutable_guarantee()->CopyFrom(quotaResources);

  // The cluster is empty (no agents), so the quota cannot be satisfied;
  // the force flag lets the request through anyway.
  request->set_force(true);

  ContentType contentType = GetParam();

  // Submit the quota request for the role configured above.
  Future<http::Response> setQuotaResponse = http::post(
      master.get()->pid,
      "api/v1",
      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
      serialize(contentType, setQuotaCall),
      stringify(contentType));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, setQuotaResponse);
}
// This test verifies that a quota set through `SET_QUOTA` can be removed
// again through `REMOVE_QUOTA`. `GET_QUOTA` is used after each step to
// observe the resulting quota status.
TEST_P(MasterAPITest, RemoveQuota)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  v1::Resources quotaResources =
    v1::Resources::parse("cpus:1;mem:512").get();

  // All four calls below use the content type this test is parameterized by.
  ContentType contentType = GetParam();

  // Step 1: set a quota for "role1".
  {
    v1::master::Call setQuotaCall;
    setQuotaCall.set_type(v1::master::Call::SET_QUOTA);

    v1::quota::QuotaRequest* request =
      setQuotaCall.mutable_set_quota()->mutable_quota_request();

    request->set_role("role1");
    request->mutable_guarantee()->CopyFrom(quotaResources);

    // There are no agents in this cluster, so the quota cannot be
    // satisfied; the force flag lets the request through anyway.
    request->set_force(true);

    Future<http::Response> setQuotaResponse = http::post(
        master.get()->pid,
        "api/v1",
        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
        serialize(contentType, setQuotaCall),
        stringify(contentType));

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, setQuotaResponse);
  }

  // Step 2: `GET_QUOTA` must now report exactly the guarantee set above.
  {
    v1::master::Call getQuotaCall;
    getQuotaCall.set_type(v1::master::Call::GET_QUOTA);

    Future<v1::master::Response> quotaStatus =
      post(master.get()->pid, getQuotaCall, contentType);

    AWAIT_READY(quotaStatus);
    ASSERT_TRUE(quotaStatus->IsInitialized());
    ASSERT_EQ(v1::master::Response::GET_QUOTA, quotaStatus->type());
    ASSERT_EQ(1, quotaStatus->get_quota().status().infos().size());
    EXPECT_EQ(quotaResources,
              quotaStatus->get_quota().status().infos(0).guarantee());
  }

  // Step 3: remove the quota of "role1".
  {
    v1::master::Call removeQuotaCall;
    removeQuotaCall.set_type(v1::master::Call::REMOVE_QUOTA);
    removeQuotaCall.mutable_remove_quota()->set_role("role1");

    Future<http::Response> removeQuotaResponse = http::post(
        master.get()->pid,
        "api/v1",
        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
        serialize(contentType, removeQuotaCall),
        stringify(contentType));

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, removeQuotaResponse);
  }

  // Step 4: `GET_QUOTA` must no longer report any quota.
  {
    v1::master::Call getQuotaCall;
    getQuotaCall.set_type(v1::master::Call::GET_QUOTA);

    Future<v1::master::Response> quotaStatus =
      post(master.get()->pid, getQuotaCall, contentType);

    AWAIT_READY(quotaStatus);
    ASSERT_TRUE(quotaStatus->IsInitialized());
    ASSERT_EQ(v1::master::Response::GET_QUOTA, quotaStatus->type());
    ASSERT_EQ(0, quotaStatus->get_quota().status().infos().size());
  }
}
// Test create and destroy persistent volumes through the master operator API.
// In this test case, we create a persistent volume with the API, then launch a
// task using the volume. Then we destroy the volume with the API after the task
// is finished.
TEST_P(MasterAPITest, CreateAndDestroyVolumes)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  // For capturing the SlaveID so we can use it in the create/destroy volumes
  // API call.
  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  TestContainerizer containerizer(&exec);

  Owned<MasterDetector> detector = master.get()->createDetector();

  slave::Flags slaveFlags = CreateSlaveFlags();

  // Do Static reservation so we can create persistent volumes from it.
  slaveFlags.resources = "disk(role1):1024";

  Try<Owned<cluster::Slave>> slave = StartSlave(
      detector.get(), &containerizer, slaveFlags);
  ASSERT_SOME(slave);

  AWAIT_READY(slaveRegisteredMessage);
  SlaveID slaveId = slaveRegisteredMessage->slave_id();

  // Create the persistent volume.
  v1::master::Call v1CreateVolumesCall;
  v1CreateVolumesCall.set_type(v1::master::Call::CREATE_VOLUMES);
  v1::master::Call_CreateVolumes* createVolumes =
    v1CreateVolumesCall.mutable_create_volumes();

  Resource volume = createPersistentVolume(
      Megabytes(64),
      "role1",
      "id1",
      "path1",
      None(),
      None(),
      DEFAULT_CREDENTIAL.principal());

  createVolumes->add_volumes()->CopyFrom(evolve(volume));
  createVolumes->mutable_agent_id()->CopyFrom(evolve(slaveId));

  ContentType contentType = GetParam();

  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  headers["Accept"] = stringify(contentType);

  // The call is expected to be answered with `202 Accepted`; any other
  // status fails the returned future.
  Future<Nothing> v1CreateVolumesResponse = http::post(
      master.get()->pid,
      "api/v1",
      headers,
      serialize(contentType, v1CreateVolumesCall),
      stringify(contentType))
    .then([](const http::Response& response) -> Future<Nothing> {
      if (response.status != http::Accepted().status) {
        return Failure("Unexpected response status " + response.status);
      }
      return Nothing();
    });

  AWAIT_READY(v1CreateVolumesResponse);

  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo.set_roles(0, "role1");

  // Start a framework and launch a task on the persistent volume.
  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);

  // This must be a fatal assertion: the first offer is indexed right below,
  // which would be undefined behavior on an empty vector.
  ASSERT_FALSE(offers->empty());

  Offer offer = offers.get()[0];

  // The offer must contain the volume created above, allocated to the
  // framework's role.
  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(volume, frameworkInfo.roles(0))));

  Resources taskResources = Resources::parse(
      "disk:256",
      frameworkInfo.roles(0)).get();

  TaskInfo taskInfo = createTask(
      offer.slave_id(),
      taskResources,
      "sleep 1",
      DEFAULT_EXECUTOR_ID);

  EXPECT_CALL(exec, registered(_, _, _, _));

  // The mock executor reports the task as finished as soon as it is launched.
  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status));

  driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});

  AWAIT_READY(status);
  EXPECT_EQ(TASK_FINISHED, status->state());

  // Destroy the persistent volume.
  v1::master::Call v1DestroyVolumesCall;
  v1DestroyVolumesCall.set_type(v1::master::Call::DESTROY_VOLUMES);
  v1::master::Call_DestroyVolumes* destroyVolumes =
    v1DestroyVolumesCall.mutable_destroy_volumes();

  destroyVolumes->mutable_agent_id()->CopyFrom(evolve(slaveId));
  destroyVolumes->add_volumes()->CopyFrom(evolve(volume));

  // As with creation, `202 Accepted` is the only status treated as success.
  Future<Nothing> v1DestroyVolumesResponse = http::post(
      master.get()->pid,
      "api/v1",
      headers,
      serialize(contentType, v1DestroyVolumesCall),
      stringify(contentType))
    .then([](const http::Response& response) -> Future<Nothing> {
      if (response.status != http::Accepted().status) {
        return Failure("Unexpected response status " + response.status);
      }
      return Nothing();
    });

  AWAIT_READY(v1DestroyVolumesResponse);

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver.stop();
  driver.join();
}
// This test verifies that a weight configured through the master's
// `--weights` flag is reported by the `GET_WEIGHTS` call.
TEST_P(MasterAPITest, GetWeights)
{
  ContentType contentType = GetParam();

  // Configure a single role weight on the master before starting it.
  master::Flags masterFlags = CreateMasterFlags();
  masterFlags.weights = "role=2.0";

  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
  ASSERT_SOME(master);

  v1::master::Call getWeightsCall;
  getWeightsCall.set_type(v1::master::Call::GET_WEIGHTS);

  Future<v1::master::Response> weights =
    post(master.get()->pid, getWeightsCall, contentType);

  AWAIT_READY(weights);
  ASSERT_TRUE(weights->IsInitialized());
  ASSERT_EQ(v1::master::Response::GET_WEIGHTS, weights->type());

  // Exactly the weight from the flag must come back.
  ASSERT_EQ(1, weights->get_weights().weight_infos_size());
  ASSERT_EQ("role", weights->get_weights().weight_infos().Get(0).role());
  ASSERT_EQ(2.0, weights->get_weights().weight_infos().Get(0).weight());
}
// This test verifies that a weight configured through the master's
// `--weights` flag can be changed with `UPDATE_WEIGHTS`, and that the new
// value is reported by a subsequent `GET_WEIGHTS` call.
TEST_P(MasterAPITest, UpdateWeights)
{
  ContentType contentType = GetParam();

  // Configure a single role weight on the master before starting it.
  master::Flags masterFlags = CreateMasterFlags();
  masterFlags.weights = "role=2.0";

  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
  ASSERT_SOME(master);

  v1::master::Call getCall, updateCall;
  getCall.set_type(v1::master::Call::GET_WEIGHTS);
  updateCall.set_type(v1::master::Call::UPDATE_WEIGHTS);

  // Before the update, the weight from the flag is in effect.
  Future<v1::master::Response> weights =
    post(master.get()->pid, getCall, contentType);

  AWAIT_READY(weights);
  ASSERT_TRUE(weights->IsInitialized());
  ASSERT_EQ(v1::master::Response::GET_WEIGHTS, weights->type());
  ASSERT_EQ(1, weights->get_weights().weight_infos_size());
  ASSERT_EQ("role", weights->get_weights().weight_infos().Get(0).role());
  ASSERT_EQ(2.0, weights->get_weights().weight_infos().Get(0).weight());

  // Request a new weight of 4.0 for the same role.
  v1::WeightInfo* newWeight =
    updateCall.mutable_update_weights()->add_weight_infos();

  newWeight->set_role("role");
  newWeight->set_weight(4.0);

  http::Headers requestHeaders = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  requestHeaders["Accept"] = stringify(contentType);

  // Any status other than `200 OK` fails the returned future.
  Future<Nothing> updated = http::post(
      master.get()->pid,
      "api/v1",
      requestHeaders,
      serialize(contentType, updateCall),
      stringify(contentType))
    .then([](const http::Response& response) -> Future<Nothing> {
      if (response.status != http::OK().status) {
        return Failure("Unexpected response status " + response.status);
      }
      return Nothing();
    });

  AWAIT_READY(updated);

  // After the update, the new weight must be reported.
  weights = post(master.get()->pid, getCall, contentType);

  AWAIT_READY(weights);
  ASSERT_TRUE(weights->IsInitialized());
  ASSERT_EQ(v1::master::Response::GET_WEIGHTS, weights->type());
  ASSERT_EQ(1, weights->get_weights().weight_infos_size());
  ASSERT_EQ("role", weights->get_weights().weight_infos().Get(0).role());
  ASSERT_EQ(4.0, weights->get_weights().weight_infos().Get(0).weight());
}
// This test verifies that file data can be retrieved from the master with
// the `READ_FILE` call. A four byte file ("body") is attached under the name
// "myname" and then read with several offset/length combinations.
TEST_P(MasterAPITest, ReadFile)
{
  Files files;

  // Create the file on disk and attach it under a virtual name.
  ASSERT_SOME(os::write("file", "body"));
  AWAIT_EXPECT_READY(files.attach("file", "myname"));

  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  ContentType contentType = GetParam();

  // Read a range that lies fully inside the file.
  {
    v1::master::Call call;
    call.set_type(v1::master::Call::READ_FILE);

    v1::master::Call::ReadFile* request = call.mutable_read_file();
    request->set_path("myname");
    request->set_offset(1);
    request->set_length(2);

    Future<v1::master::Response> fileResponse =
      post(master.get()->pid, call, contentType);

    AWAIT_READY(fileResponse);
    ASSERT_TRUE(fileResponse->IsInitialized());
    ASSERT_EQ(v1::master::Response::READ_FILE, fileResponse->type());
    ASSERT_EQ("od", fileResponse->read_file().data());
    ASSERT_EQ(4, fileResponse->read_file().size());
  }

  // Read the file with `offset >= size`. This should return the size of file
  // and empty data.
  {
    v1::master::Call call;
    call.set_type(v1::master::Call::READ_FILE);

    v1::master::Call::ReadFile* request = call.mutable_read_file();
    request->set_path("myname");
    request->set_offset(5);
    request->set_length(2);

    Future<v1::master::Response> fileResponse =
      post(master.get()->pid, call, contentType);

    AWAIT_READY(fileResponse);
    ASSERT_TRUE(fileResponse->IsInitialized());
    ASSERT_EQ(v1::master::Response::READ_FILE, fileResponse->type());
    ASSERT_EQ("", fileResponse->read_file().data());
    ASSERT_EQ(4, fileResponse->read_file().size());
  }

  // Read the file without length being set and `offset=0`. This should read
  // the entire file.
  {
    v1::master::Call call;
    call.set_type(v1::master::Call::READ_FILE);

    v1::master::Call::ReadFile* request = call.mutable_read_file();
    request->set_path("myname");
    request->set_offset(0);

    Future<v1::master::Response> fileResponse =
      post(master.get()->pid, call, contentType);

    AWAIT_READY(fileResponse);
    ASSERT_TRUE(fileResponse->IsInitialized());
    ASSERT_EQ(v1::master::Response::READ_FILE, fileResponse->type());
    ASSERT_EQ("body", fileResponse->read_file().data());
    ASSERT_EQ(4, fileResponse->read_file().size());
  }

  // Read the file with `length > size - offset`. This should return the
  // data actually read.
  {
    v1::master::Call call;
    call.set_type(v1::master::Call::READ_FILE);

    v1::master::Call::ReadFile* request = call.mutable_read_file();
    request->set_path("myname");
    request->set_offset(1);
    request->set_length(6);

    Future<v1::master::Response> fileResponse =
      post(master.get()->pid, call, contentType);

    AWAIT_READY(fileResponse);
    ASSERT_TRUE(fileResponse->IsInitialized());
    ASSERT_EQ(v1::master::Response::READ_FILE, fileResponse->type());
    ASSERT_EQ("ody", fileResponse->read_file().data());
    ASSERT_EQ(4, fileResponse->read_file().size());
  }
}
// This test verifies that a `READ_FILE` call naming a path that was never
// attached is answered by the master with a `NotFound` response.
TEST_P(MasterAPITest, ReadFileInvalidPath)
{
  ContentType contentType = GetParam();

  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  // Build a read request for a file that does not exist.
  v1::master::Call call;
  call.set_type(v1::master::Call::READ_FILE);

  v1::master::Call::ReadFile* request = call.mutable_read_file();
  request->set_path("invalid_file");
  request->set_offset(1);
  request->set_length(2);

  Future<http::Response> notFound = http::post(
      master.get()->pid,
      "api/v1",
      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
      serialize(contentType, call),
      stringify(contentType));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::NotFound().status, notFound);
}
// Fixture for the agent operator API tests, parameterized by the content
// type used for the HTTP request and response.
class AgentAPITest
  : public MesosTest,
    public WithParamInterface<ContentType>
{
public:
  // Posts `call` to the "/api/v1" endpoint of the agent identified by `pid`,
  // serialized as `contentType` and authenticated with the default
  // credential. The returned future holds the deserialized response on
  // `200 OK`, and fails for any other response status.
  Future<v1::agent::Response> post(
      const process::PID<slave::Slave>& pid,
      const v1::agent::Call& call,
      const ContentType& contentType)
  {
    http::Headers requestHeaders = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    requestHeaders["Accept"] = stringify(contentType);

    Future<http::Response> httpResponse = http::post(
        pid,
        "api/v1",
        requestHeaders,
        serialize(contentType, call),
        stringify(contentType));

    return httpResponse
      .then([contentType](const http::Response& response)
                -> Future<v1::agent::Response> {
        if (response.status == http::OK().status) {
          return deserialize<v1::agent::Response>(contentType, response.body);
        }

        return Failure("Unexpected response status " + response.status);
      });
  }
};
// Every `AgentAPITest` test is parameterized by the content type of the HTTP
// request and is instantiated once for `PROTOBUF` and once for `JSON`.
INSTANTIATE_TEST_CASE_P(
ContentType,
AgentAPITest,
::testing::Values(ContentType::PROTOBUF, ContentType::JSON));
// Reads the pipe `reader` until EOF, decodes its contents as a sequence of
// `ProcessIO` records encoded with `contentType`, and returns a tuple of
// (stdout, stderr): the concatenated payloads of all `STDOUT` records and of
// all `STDERR` records, respectively. The returned future fails if the
// stream, or any single record, cannot be decoded.
//
// NOTE: Records whose data type is neither `STDOUT` nor `STDERR` (e.g.,
// `ProcessIO::Control` records) do not contribute to the output.
static Future<tuple<string, string>> getProcessIOData(
    ContentType contentType,
    http::Pipe::Reader reader)
{
  return reader.readAll()
    .then([contentType](const string& data) -> Future<tuple<string, string>> {
      ::recordio::Decoder<v1::agent::ProcessIO> decoder(lambda::bind(
          deserialize<v1::agent::ProcessIO>, contentType, lambda::_1));

      Try<std::deque<Try<v1::agent::ProcessIO>>> decoded =
        decoder.decode(data);

      if (decoded.isError()) {
        return process::Failure(decoded.error());
      }

      string out;
      string err;

      // Walk the records in stream order; the first undecodable record
      // aborts the whole read.
      for (const Try<v1::agent::ProcessIO>& entry : decoded.get()) {
        if (entry.isError()) {
          return process::Failure(entry.error());
        }

        const v1::agent::ProcessIO::Data& payload = entry->data();

        if (payload.type() == v1::agent::ProcessIO::Data::STDOUT) {
          out += payload.data();
        } else if (payload.type() == v1::agent::ProcessIO::Data::STDERR) {
          err += payload.data();
        }
      }

      return std::make_tuple(out, err);
    });
}
// This test verifies that a recovered agent answers the `GET_FLAGS` call
// with an initialized response of the matching type.
TEST_P(AgentAPITest, GetFlags)
{
  ContentType contentType = GetParam();

  // Intercept the dispatch of `Slave::__recover` so the test can tell when
  // the agent gets there.
  Future<Nothing> recovered = FUTURE_DISPATCH(_, &Slave::__recover);

  StandaloneMasterDetector detector;

  Try<Owned<cluster::Slave>> slave = this->StartSlave(&detector);
  ASSERT_SOME(slave);

  AWAIT_READY(recovered);

  // Wait until the agent has finished recovery.
  Clock::pause();
  Clock::settle();

  v1::agent::Call call;
  call.set_type(v1::agent::Call::GET_FLAGS);

  Future<v1::agent::Response> flagsResponse =
    post(slave.get()->pid, call, contentType);

  AWAIT_READY(flagsResponse);
  ASSERT_TRUE(flagsResponse->IsInitialized());
  ASSERT_EQ(v1::agent::Response::GET_FLAGS, flagsResponse->type());
}
// This test verifies that a recovered agent reports itself as healthy in
// response to the `GET_HEALTH` call.
TEST_P(AgentAPITest, GetHealth)
{
  ContentType contentType = GetParam();

  // Intercept the dispatch of `Slave::__recover` so the test can tell when
  // the agent gets there.
  Future<Nothing> recovered = FUTURE_DISPATCH(_, &Slave::__recover);

  StandaloneMasterDetector detector;

  Try<Owned<cluster::Slave>> slave = this->StartSlave(&detector);
  ASSERT_SOME(slave);

  AWAIT_READY(recovered);

  // Wait until the agent has finished recovery.
  Clock::pause();
  Clock::settle();

  v1::agent::Call call;
  call.set_type(v1::agent::Call::GET_HEALTH);

  Future<v1::agent::Response> healthResponse =
    post(slave.get()->pid, call, contentType);

  AWAIT_READY(healthResponse);
  ASSERT_TRUE(healthResponse->IsInitialized());
  ASSERT_EQ(v1::agent::Response::GET_HEALTH, healthResponse->type());
  ASSERT_TRUE(healthResponse->get_health().healthy());
}
// This test verifies that the version reported by the agent's `GET_VERSION`
// call equals `MESOS_VERSION`.
TEST_P(AgentAPITest, GetVersion)
{
  ContentType contentType = GetParam();

  // Intercept the dispatch of `Slave::__recover` so the test can tell when
  // the agent gets there.
  Future<Nothing> recovered = FUTURE_DISPATCH(_, &Slave::__recover);

  StandaloneMasterDetector detector;

  Try<Owned<cluster::Slave>> slave = this->StartSlave(&detector);
  ASSERT_SOME(slave);

  AWAIT_READY(recovered);

  // Wait until the agent has finished recovery.
  Clock::pause();
  Clock::settle();

  v1::agent::Call call;
  call.set_type(v1::agent::Call::GET_VERSION);

  Future<v1::agent::Response> versionResponse =
    post(slave.get()->pid, call, contentType);

  AWAIT_READY(versionResponse);
  ASSERT_TRUE(versionResponse->IsInitialized());
  ASSERT_EQ(v1::agent::Response::GET_VERSION, versionResponse->type());
  ASSERT_EQ(MESOS_VERSION,
            versionResponse->get_version().version_info().version());
}
// This test verifies that a recovered agent answers the `GET_METRICS` call
// with a non-empty set of metrics, each of which carries a value.
TEST_P(AgentAPITest, GetMetrics)
{
  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);

  StandaloneMasterDetector detector;

  Try<Owned<cluster::Slave>> slave = this->StartSlave(&detector);
  ASSERT_SOME(slave);

  AWAIT_READY(__recover);

  // Wait until the agent has finished recovery.
  Clock::pause();
  Clock::settle();

  // The timeout is passed along with the call, expressed in nanoseconds.
  Duration timeout = Seconds(5);

  v1::agent::Call v1Call;
  v1Call.set_type(v1::agent::Call::GET_METRICS);
  v1Call.mutable_get_metrics()->mutable_timeout()->set_nanoseconds(
      timeout.ns());

  ContentType contentType = GetParam();

  Future<v1::agent::Response> v1Response =
    post(slave.get()->pid, v1Call, contentType);

  AWAIT_READY(v1Response);
  ASSERT_TRUE(v1Response->IsInitialized());
  ASSERT_EQ(v1::agent::Response::GET_METRICS, v1Response->type());

  // Collect the metrics by name; every returned metric must have a value.
  hashmap<string, double> metrics;

  foreach (const v1::Metric& metric,
           v1Response->get_metrics().metrics()) {
    ASSERT_TRUE(metric.has_value());
    metrics[metric.name()] = metric.value();
  }

  // Verifies that the response metrics is not empty. The previous check,
  // `ASSERT_LE(0, metrics.size())`, could never fail because a size is
  // always >= 0.
  ASSERT_FALSE(metrics.empty());
}
// This test verifies that the level reported by the agent's
// `GET_LOGGING_LEVEL` call equals the current value of `FLAGS_v`.
TEST_P(AgentAPITest, GetLoggingLevel)
{
  ContentType contentType = GetParam();

  // Intercept the dispatch of `Slave::__recover` so the test can tell when
  // the agent gets there.
  Future<Nothing> recovered = FUTURE_DISPATCH(_, &Slave::__recover);

  StandaloneMasterDetector detector;

  Try<Owned<cluster::Slave>> slave = this->StartSlave(&detector);
  ASSERT_SOME(slave);

  AWAIT_READY(recovered);

  // Wait until the agent has finished recovery.
  Clock::pause();
  Clock::settle();

  v1::agent::Call call;
  call.set_type(v1::agent::Call::GET_LOGGING_LEVEL);

  Future<v1::agent::Response> levelResponse =
    post(slave.get()->pid, call, contentType);

  AWAIT_READY(levelResponse);
  ASSERT_TRUE(levelResponse->IsInitialized());
  ASSERT_EQ(v1::agent::Response::GET_LOGGING_LEVEL, levelResponse->type());

  // `FLAGS_v` is signed; make sure it is non-negative before casting it to
  // the unsigned type used for the comparison.
  ASSERT_LE(0, FLAGS_v);
  ASSERT_EQ(
      levelResponse->get_logging_level().level(),
      static_cast<uint32_t>(FLAGS_v));
}
// This test verifies that `SET_LOGGING_LEVEL` changes `FLAGS_v` to the
// requested level, and that the level goes back to its original value once
// the requested duration has elapsed.
TEST_P(AgentAPITest, SetLoggingLevel)
{
  ContentType contentType = GetParam();

  // Intercept the dispatch of `Slave::__recover` so the test can tell when
  // the agent gets there.
  Future<Nothing> recovered = FUTURE_DISPATCH(_, &Slave::__recover);

  StandaloneMasterDetector detector;

  Try<Owned<cluster::Slave>> slave = this->StartSlave(&detector);
  ASSERT_SOME(slave);

  AWAIT_READY(recovered);

  // Wait until the agent has finished recovery.
  Clock::pause();
  Clock::settle();

  // Remember the level in effect before the toggle; it is what the level
  // must revert to at the end.
  uint32_t levelBefore = static_cast<uint32_t>(FLAGS_v);

  // Ask the agent to raise the level by one for a limited duration.
  uint32_t requestedLevel = levelBefore + 1;
  Duration revertAfter = Seconds(60);

  v1::agent::Call call;
  call.set_type(v1::agent::Call::SET_LOGGING_LEVEL);

  v1::agent::Call_SetLoggingLevel* request =
    call.mutable_set_logging_level();

  request->set_level(requestedLevel);
  request->mutable_duration()->set_nanoseconds(revertAfter.ns());

  http::Headers requestHeaders = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  requestHeaders["Accept"] = stringify(contentType);

  // Any status other than `200 OK` fails the returned future.
  Future<Nothing> toggled = http::post(
      slave.get()->pid,
      "api/v1",
      requestHeaders,
      serialize(contentType, call),
      stringify(contentType))
    .then([](const http::Response& response) -> Future<Nothing> {
      if (response.status != http::OK().status) {
        return Failure("Unexpected response status " + response.status);
      }
      return Nothing();
    });

  AWAIT_READY(toggled);
  ASSERT_EQ(requestedLevel, static_cast<uint32_t>(FLAGS_v));

  // Jump ahead by the toggle duration instead of waiting for it.
  Clock::pause();
  Clock::advance(revertAfter);
  Clock::settle();

  // The level must be back at its original value.
  ASSERT_EQ(levelBefore, static_cast<uint32_t>(FLAGS_v));

  Clock::resume();
}
// This test verifies that the agent's `LIST_FILES` call returns the listing
// of an attached directory. The directory "1" (two sub-directories and one
// file) is attached under the name "one" and then listed through the API.
TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, ListFiles)
{
  Files files;

  // Lay out the directory on disk and attach it under a virtual name.
  ASSERT_SOME(os::mkdir("1/2"));
  ASSERT_SOME(os::mkdir("1/3"));
  ASSERT_SOME(os::write("1/two", "two"));

  AWAIT_EXPECT_READY(files.attach("1", "one"));

  // Build the `FileInfo` expected for the "1/two" file, named by its
  // virtual path.
  struct stat fileStat;
  ASSERT_EQ(0, stat("1/two", &fileStat));
  FileInfo expected = protobuf::createFileInfo("one/two", fileStat);

  // Intercept the dispatch of `Slave::__recover` so the test can tell when
  // the agent gets there.
  Future<Nothing> recovered = FUTURE_DISPATCH(_, &Slave::__recover);

  StandaloneMasterDetector detector;

  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
  ASSERT_SOME(slave);

  AWAIT_READY(recovered);

  // Wait until the agent has finished recovery.
  Clock::pause();
  Clock::settle();

  v1::agent::Call call;
  call.set_type(v1::agent::Call::LIST_FILES);
  call.mutable_list_files()->set_path("one/");

  ContentType contentType = GetParam();

  Future<v1::agent::Response> listing =
    post(slave.get()->pid, call, contentType);

  AWAIT_READY(listing);
  ASSERT_TRUE(listing->IsInitialized());
  ASSERT_EQ(v1::agent::Response::LIST_FILES, listing->type());

  // Three entries are expected, with the regular file as the last one.
  ASSERT_EQ(3, listing->list_files().file_infos().size());
  ASSERT_EQ(evolve(expected), listing->list_files().file_infos(2));
}
// This test verifies that a `LIST_FILES` call naming a path that was never
// attached is answered by the agent with a `NotFound` response.
TEST_P(AgentAPITest, ListFilesInvalidPath)
{
  ContentType contentType = GetParam();

  // Intercept the dispatch of `Slave::__recover` so the test can tell when
  // the agent gets there.
  Future<Nothing> recovered = FUTURE_DISPATCH(_, &Slave::__recover);

  StandaloneMasterDetector detector;

  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
  ASSERT_SOME(slave);

  AWAIT_READY(recovered);

  // Wait until the agent has finished recovery.
  Clock::pause();
  Clock::settle();

  // List a directory that does not exist.
  v1::agent::Call call;
  call.set_type(v1::agent::Call::LIST_FILES);
  call.mutable_list_files()->set_path("five/");

  Future<http::Response> notFound = http::post(
      slave.get()->pid,
      "api/v1",
      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
      serialize(contentType, call),
      stringify(contentType));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::NotFound().status, notFound);
}
// This test verifies the agent's `GET_CONTAINERS` call: it must report no
// containers before any task is launched and, once a task is running,
// exactly one container carrying the status returned by the containerizer.
TEST_P(AgentAPITest, GetContainers)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  TestContainerizer containerizer(&exec);

  StandaloneMasterDetector detector(master.get()->pid);

  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, &containerizer);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(_, _, _));
  EXPECT_CALL(exec, registered(_, _, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);

  // This must be a fatal assertion: the first offer is indexed right below,
  // which would be undefined behavior on an empty vector.
  ASSERT_FALSE(offers->empty());

  const Offer& offer = offers.get()[0];

  TaskInfo task = createTask(
      offer.slave_id(),
      Resources::parse("cpus:0.1;mem:32").get(),
      "sleep 1000",
      exec.id);

  // The mock executor reports the task as running as soon as it is launched.
  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&status));

  // No tasks launched, we should expect zero containers in Response.
  {
    v1::agent::Call v1Call;
    v1Call.set_type(v1::agent::Call::GET_CONTAINERS);

    ContentType contentType = GetParam();

    Future<v1::agent::Response> v1Response =
      post(slave.get()->pid, v1Call, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::agent::Response::GET_CONTAINERS, v1Response->type());
    ASSERT_EQ(0, v1Response->get_containers().containers_size());
  }

  driver.launchTasks(offer.id(), {task});

  AWAIT_READY(status);
  EXPECT_EQ(TASK_RUNNING, status->state());

  // Resource statistics the mocked containerizer hands back for `usage`.
  ResourceStatistics statistics;
  statistics.set_mem_limit_bytes(2048);

  // We have to set timestamp here since serializing protobuf without
  // filling all required fields generates errors.
  statistics.set_timestamp(0);

  EXPECT_CALL(containerizer, usage(_))
    .WillOnce(Return(statistics));

  // Container status the mocked containerizer hands back for `status`; its
  // IP address is looked up in the API response below.
  ContainerStatus containerStatus;
  NetworkInfo* networkInfo = containerStatus.add_network_infos();
  NetworkInfo::IPAddress* ipAddr = networkInfo->add_ip_addresses();
  ipAddr->set_ip_address("192.168.1.20");

  EXPECT_CALL(containerizer, status(_))
    .WillOnce(Return(containerStatus));

  v1::agent::Call v1Call;
  v1Call.set_type(v1::agent::Call::GET_CONTAINERS);

  ContentType contentType = GetParam();

  Future<v1::agent::Response> v1Response =
    post(slave.get()->pid, v1Call, contentType);

  AWAIT_READY(v1Response);
  ASSERT_TRUE(v1Response->IsInitialized());
  ASSERT_EQ(v1::agent::Response::GET_CONTAINERS, v1Response->type());
  ASSERT_EQ(1, v1Response->get_containers().containers_size());
  ASSERT_EQ("192.168.1.20",
            v1Response->get_containers().containers(0).container_status()
              .network_infos(0).ip_addresses(0).ip_address());

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver.stop();
  driver.join();
}
// This test verifies that file data can be retrieved from the agent
// via the `READ_FILE` call, for several offset/length combinations.
TEST_P(AgentAPITest, ReadFile)
{
  Files files;

  // Write a four-byte file and attach it under the name `myname`,
  // which is the path every `READ_FILE` call below asks for.
  ASSERT_SOME(os::write("file", "body"));
  AWAIT_EXPECT_READY(files.attach("file", "myname"));

  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);

  StandaloneMasterDetector detector;

  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
  ASSERT_SOME(slave);

  AWAIT_READY(__recover);

  // Wait until the agent has finished recovery.
  Clock::pause();
  Clock::settle();

  ContentType contentType = GetParam();

  // One row per request, issued in the order listed. When `hasLength`
  // is false the `length` field of the call is left unset.
  struct ReadRequest
  {
    uint64_t offset;
    bool hasLength;
    uint64_t length;
    std::string expectedData;
  };

  const ReadRequest requests[] = {
    // Read two bytes starting at offset 1.
    {1, true, 2, "od"},

    // Read the file with `offset >= size`. This should return the size
    // of file and empty data.
    {5, true, 2, ""},

    // Read the file without length being set and `offset=0`. This
    // should read the entire file.
    {0, false, 0, "body"},

    // Read the file with `length > size - offset`. This should return
    // the data actually read.
    {1, true, 6, "ody"},
  };

  for (const ReadRequest& request : requests) {
    v1::agent::Call v1Call;
    v1Call.set_type(v1::agent::Call::READ_FILE);

    v1::agent::Call::ReadFile* readFile = v1Call.mutable_read_file();
    readFile->set_offset(request.offset);

    if (request.hasLength) {
      readFile->set_length(request.length);
    }

    readFile->set_path("myname");

    Future<v1::agent::Response> v1Response =
      post(slave.get()->pid, v1Call, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::agent::Response::READ_FILE, v1Response->type());
    ASSERT_EQ(request.expectedData, v1Response->read_file().data());

    // Every response is expected to report the size of the whole file.
    ASSERT_EQ(4, v1Response->read_file().size());
  }
}
// This test verifies that a `READ_FILE` call naming a path that this test
// never attached is answered with a `NotFound` HTTP response.
TEST_P(AgentAPITest, ReadFileInvalidPath)
{
  const ContentType contentType = GetParam();

  Future<Nothing> recovered = FUTURE_DISPATCH(_, &Slave::__recover);

  StandaloneMasterDetector detector;

  Try<Owned<cluster::Slave>> agent = StartSlave(&detector);
  ASSERT_SOME(agent);

  AWAIT_READY(recovered);

  // Wait until the agent has finished recovery.
  Clock::pause();
  Clock::settle();

  // Build a read request for a file that does not exist.
  v1::agent::Call call;
  call.set_type(v1::agent::Call::READ_FILE);
  call.mutable_read_file()->set_offset(1);
  call.mutable_read_file()->set_length(2);
  call.mutable_read_file()->set_path("invalid_file");

  // Use the raw HTTP helper (rather than `post`) because the assertion
  // is on the HTTP status code and not on a decoded agent response.
  Future<http::Response> httpResponse = http::post(
      agent.get()->pid,
      "api/v1",
      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
      serialize(contentType, call),
      stringify(contentType));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::NotFound().status, httpResponse);
}
// This test verifies the `GET_FRAMEWORKS` call in three phases: no
// frameworks before a task is launched, one active framework once the
// task is running, and one completed framework after the scheduler
// driver has been stopped and the executor termination was dispatched.
TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, GetFrameworks)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(_, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);

  // This has to be a fatal assertion: the first offer is indexed right
  // below, which is undefined behavior on an empty vector.
  ASSERT_FALSE(offers->empty());

  const Offer& offer = offers.get()[0];

  // A command task that consumes all resources of the first offer.
  TaskInfo task;
  task.set_name("");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offer.slave_id());
  task.mutable_resources()->MergeFrom(offer.resources());

  CommandInfo command;
  command.set_value("sleep 1000");
  task.mutable_command()->MergeFrom(command);

  Future<TaskStatus> statusRunning;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&statusRunning));

  ContentType contentType = GetParam();

  // No tasks launched, we should expect zero frameworks in Response.
  {
    v1::agent::Call v1Call;
    v1Call.set_type(v1::agent::Call::GET_FRAMEWORKS);

    Future<v1::agent::Response> v1Response =
      post(slave.get()->pid, v1Call, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::agent::Response::GET_FRAMEWORKS, v1Response->type());
    ASSERT_EQ(0, v1Response->get_frameworks().frameworks_size());
    ASSERT_EQ(0, v1Response->get_frameworks().completed_frameworks_size());
  }

  driver.launchTasks(offer.id(), {task});

  AWAIT_READY(statusRunning);
  EXPECT_EQ(TASK_RUNNING, statusRunning->state());

  // A task launched, we expect one framework in Response.
  {
    v1::agent::Call v1Call;
    v1Call.set_type(v1::agent::Call::GET_FRAMEWORKS);

    Future<v1::agent::Response> v1Response =
      post(slave.get()->pid, v1Call, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::agent::Response::GET_FRAMEWORKS, v1Response->type());
    ASSERT_EQ(1, v1Response->get_frameworks().frameworks_size());
    ASSERT_EQ(0, v1Response->get_frameworks().completed_frameworks_size());
  }

  // Make sure the executor terminated. The future has to be set up
  // before the driver is stopped so that the dispatch is not missed.
  Future<Nothing> executorTerminated =
    FUTURE_DISPATCH(_, &Slave::executorTerminated);

  driver.stop();
  driver.join();

  AWAIT_READY(executorTerminated);

  // After the executor terminated, we should expect one completed framework in
  // Response.
  {
    v1::agent::Call v1Call;
    v1Call.set_type(v1::agent::Call::GET_FRAMEWORKS);

    Future<v1::agent::Response> v1Response =
      post(slave.get()->pid, v1Call, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::agent::Response::GET_FRAMEWORKS, v1Response->type());
    ASSERT_EQ(0, v1Response->get_frameworks().frameworks_size());
    ASSERT_EQ(1, v1Response->get_frameworks().completed_frameworks_size());
  }
}
// This test verifies the `GET_EXECUTORS` call in three phases: no
// executors before a task is launched, one active executor once the
// task is running, and one completed executor after the scheduler
// driver has been stopped and the executor termination was processed.
TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, GetExecutors)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(_, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);

  // This has to be a fatal assertion: the first offer is indexed right
  // below, which is undefined behavior on an empty vector.
  ASSERT_FALSE(offers->empty());

  const Offer& offer = offers.get()[0];

  // A command task that consumes all resources of the first offer.
  TaskInfo task;
  task.set_name("");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offer.slave_id());
  task.mutable_resources()->MergeFrom(offer.resources());

  CommandInfo command;
  command.set_value("sleep 1000");
  task.mutable_command()->MergeFrom(command);

  Future<TaskStatus> statusRunning;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&statusRunning));

  ContentType contentType = GetParam();

  // No tasks launched, we should expect zero executors in Response.
  {
    v1::agent::Call v1Call;
    v1Call.set_type(v1::agent::Call::GET_EXECUTORS);

    Future<v1::agent::Response> v1Response =
      post(slave.get()->pid, v1Call, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::agent::Response::GET_EXECUTORS, v1Response->type());
    ASSERT_EQ(0, v1Response->get_executors().executors_size());
    ASSERT_EQ(0, v1Response->get_executors().completed_executors_size());
  }

  driver.launchTasks(offer.id(), {task});

  AWAIT_READY(statusRunning);
  EXPECT_EQ(TASK_RUNNING, statusRunning->state());

  // A task launched, we expect one executor in Response.
  {
    v1::agent::Call v1Call;
    v1Call.set_type(v1::agent::Call::GET_EXECUTORS);

    Future<v1::agent::Response> v1Response =
      post(slave.get()->pid, v1Call, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::agent::Response::GET_EXECUTORS, v1Response->type());
    ASSERT_EQ(1, v1Response->get_executors().executors_size());
    ASSERT_EQ(0, v1Response->get_executors().completed_executors_size());
  }

  // Make sure the executor terminated. The future has to be set up
  // before the driver is stopped so that the dispatch is not missed.
  Future<Nothing> executorTerminated =
    FUTURE_DISPATCH(_, &Slave::executorTerminated);

  driver.stop();
  driver.join();

  AWAIT_READY(executorTerminated);

  // Make sure `Framework::destroyExecutor()` is processed.
  Clock::pause();
  Clock::settle();

  // After the executor terminated, we should expect one completed executor in
  // Response.
  {
    v1::agent::Call v1Call;
    v1Call.set_type(v1::agent::Call::GET_EXECUTORS);

    Future<v1::agent::Response> v1Response =
      post(slave.get()->pid, v1Call, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::agent::Response::GET_EXECUTORS, v1Response->type());
    ASSERT_EQ(0, v1Response->get_executors().executors_size());
    ASSERT_EQ(1, v1Response->get_executors().completed_executors_size());
  }
}
// This test verifies the `GET_TASKS` call in three phases: no tasks in
// any list before a launch, one launched task once the task is running,
// and one completed task after the task has been killed and the agent
// has handled the acknowledgement of the `TASK_KILLED` update.
TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, GetTasks)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(_, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);

  // This has to be a fatal assertion: the first offer is indexed right
  // below, which is undefined behavior on an empty vector.
  ASSERT_FALSE(offers->empty());

  const Offer& offer = offers.get()[0];

  // A command task that consumes all resources of the first offer.
  TaskInfo task;
  task.set_name("");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offer.slave_id());
  task.mutable_resources()->MergeFrom(offer.resources());

  CommandInfo command;
  command.set_value("sleep 1000");
  task.mutable_command()->MergeFrom(command);

  Future<TaskStatus> statusRunning;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&statusRunning));

  ContentType contentType = GetParam();

  // No tasks launched, we should expect zero tasks in Response.
  {
    v1::agent::Call v1Call;
    v1Call.set_type(v1::agent::Call::GET_TASKS);

    Future<v1::agent::Response> v1Response =
      post(slave.get()->pid, v1Call, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::agent::Response::GET_TASKS, v1Response->type());
    ASSERT_EQ(0, v1Response->get_tasks().pending_tasks_size());
    ASSERT_EQ(0, v1Response->get_tasks().queued_tasks_size());
    ASSERT_EQ(0, v1Response->get_tasks().launched_tasks_size());
    ASSERT_EQ(0, v1Response->get_tasks().terminated_tasks_size());
    ASSERT_EQ(0, v1Response->get_tasks().completed_tasks_size());
  }

  driver.launchTasks(offer.id(), {task});

  AWAIT_READY(statusRunning);
  EXPECT_EQ(TASK_RUNNING, statusRunning->state());

  // A task launched, we expect one task in Response.
  {
    v1::agent::Call v1Call;
    v1Call.set_type(v1::agent::Call::GET_TASKS);

    Future<v1::agent::Response> v1Response =
      post(slave.get()->pid, v1Call, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::agent::Response::GET_TASKS, v1Response->type());
    ASSERT_EQ(0, v1Response->get_tasks().pending_tasks_size());
    ASSERT_EQ(0, v1Response->get_tasks().queued_tasks_size());
    ASSERT_EQ(1, v1Response->get_tasks().launched_tasks_size());
    ASSERT_EQ(0, v1Response->get_tasks().terminated_tasks_size());
    ASSERT_EQ(0, v1Response->get_tasks().completed_tasks_size());
  }

  Clock::pause();

  // Kill the task.
  Future<TaskStatus> statusKilled;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&statusKilled));

  Future<Nothing> _statusUpdateAcknowledgement =
    FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdateAcknowledgement);

  driver.killTask(statusRunning->task_id());

  AWAIT_READY(statusKilled);
  EXPECT_EQ(TASK_KILLED, statusKilled->state());

  // Make sure the agent receives and properly handles the ACK.
  AWAIT_READY(_statusUpdateAcknowledgement);
  Clock::settle();
  Clock::resume();

  // After the kill has been acknowledged, we should expect one completed
  // task in Response and nothing in any of the other task lists.
  {
    v1::agent::Call v1Call;
    v1Call.set_type(v1::agent::Call::GET_TASKS);

    Future<v1::agent::Response> v1Response =
      post(slave.get()->pid, v1Call, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::agent::Response::GET_TASKS, v1Response->type());
    ASSERT_EQ(0, v1Response->get_tasks().pending_tasks_size());
    ASSERT_EQ(0, v1Response->get_tasks().queued_tasks_size());
    ASSERT_EQ(0, v1Response->get_tasks().launched_tasks_size());
    ASSERT_EQ(0, v1Response->get_tasks().terminated_tasks_size());
    ASSERT_EQ(1, v1Response->get_tasks().completed_tasks_size());
  }

  driver.stop();
  driver.join();
}
// This test verifies that the `GET_AGENT` call returns agent info whose
// hostname and domain match the flags the agent was started with.
TEST_P(AgentAPITest, GetAgent)
{
  Future<Nothing> recovered = FUTURE_DISPATCH(_, &Slave::__recover);

  // Start the agent with an explicit hostname and fault domain so that
  // there are known values to compare the response against.
  slave::Flags agentFlags = CreateSlaveFlags();
  agentFlags.hostname = "host";
  agentFlags.domain = createDomainInfo("region-xyz", "zone-456");

  StandaloneMasterDetector detector;

  Try<Owned<cluster::Slave>> agent = StartSlave(&detector, agentFlags);
  ASSERT_SOME(agent);

  AWAIT_READY(recovered);

  // Wait until the agent has finished recovery.
  Clock::pause();
  Clock::settle();

  v1::agent::Call call;
  call.set_type(v1::agent::Call::GET_AGENT);

  Future<v1::agent::Response> response =
    post(agent.get()->pid, call, GetParam());

  AWAIT_READY(response);
  ASSERT_TRUE(response->IsInitialized());
  ASSERT_EQ(v1::agent::Response::GET_AGENT, response->type());

  const mesos::v1::AgentInfo& info = response->get_agent().agent_info();

  ASSERT_EQ(agentFlags.hostname, info.hostname());

  // The flag holds an unversioned domain; evolve it to the v1 type
  // before comparing it with the v1 response.
  ASSERT_EQ(evolve(agentFlags.domain.get()), info.domain());
}
// This test verifies the `GET_STATE` call in three phases: nothing is
// reported before a task is launched; one active framework/task/executor
// is reported once the task is running; and one completed
// framework/task/executor is reported after the task has been killed,
// the driver stopped and the executor termination processed.
TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, GetState)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(_, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver.start();

  AWAIT_READY(offers);

  // This has to be a fatal assertion: the first offer is indexed right
  // below, which is undefined behavior on an empty vector.
  ASSERT_FALSE(offers->empty());

  const Offer& offer = offers.get()[0];

  // A command task that consumes all resources of the first offer.
  TaskInfo task;
  task.set_name("");
  task.mutable_task_id()->set_value("1");
  task.mutable_slave_id()->MergeFrom(offer.slave_id());
  task.mutable_resources()->MergeFrom(offer.resources());

  CommandInfo command;
  command.set_value("sleep 1000");
  task.mutable_command()->MergeFrom(command);

  Future<TaskStatus> statusRunning;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&statusRunning));

  ContentType contentType = GetParam();

  // GetState before task launch, we should expect zero
  // frameworks/tasks/executors in Response.
  {
    v1::agent::Call v1Call;
    v1Call.set_type(v1::agent::Call::GET_STATE);

    Future<v1::agent::Response> v1Response =
      post(slave.get()->pid, v1Call, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::agent::Response::GET_STATE, v1Response->type());

    // The `*_size()` accessors are compared against signed literals,
    // as in the sibling tests of this file.
    const v1::agent::Response::GetState& getState = v1Response->get_state();
    ASSERT_EQ(0, getState.get_frameworks().frameworks_size());
    ASSERT_EQ(0, getState.get_tasks().launched_tasks_size());
    ASSERT_EQ(0, getState.get_executors().executors_size());
  }

  driver.launchTasks(offer.id(), {task});

  AWAIT_READY(statusRunning);
  EXPECT_EQ(TASK_RUNNING, statusRunning->state());

  // GetState after task launch and check we have a running
  // framework/task/executor.
  {
    v1::agent::Call v1Call;
    v1Call.set_type(v1::agent::Call::GET_STATE);

    Future<v1::agent::Response> v1Response =
      post(slave.get()->pid, v1Call, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::agent::Response::GET_STATE, v1Response->type());

    const v1::agent::Response::GetState& getState = v1Response->get_state();
    ASSERT_EQ(1, getState.get_frameworks().frameworks_size());
    ASSERT_EQ(0, getState.get_frameworks().completed_frameworks_size());
    ASSERT_EQ(1, getState.get_tasks().launched_tasks_size());
    ASSERT_EQ(0, getState.get_tasks().completed_tasks_size());
    ASSERT_EQ(1, getState.get_executors().executors_size());
    ASSERT_EQ(0, getState.get_executors().completed_executors_size());
  }

  Clock::pause();

  // Kill the task.
  Future<TaskStatus> statusKilled;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&statusKilled));

  Future<Nothing> _statusUpdateAcknowledgement =
    FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdateAcknowledgement);

  driver.killTask(statusRunning->task_id());

  AWAIT_READY(statusKilled);
  EXPECT_EQ(TASK_KILLED, statusKilled->state());

  // Make sure the agent receives and properly handles the ACK of `TASK_KILLED`.
  AWAIT_READY(_statusUpdateAcknowledgement);
  Clock::settle();
  Clock::resume();

  // Make sure the executor terminated. The future has to be set up
  // before the driver is stopped so that the dispatch is not missed.
  Future<Nothing> executorTerminated =
    FUTURE_DISPATCH(_, &Slave::executorTerminated);

  driver.stop();
  driver.join();

  AWAIT_READY(executorTerminated);

  // Make sure `Framework::destroyExecutor()` is processed.
  Clock::pause();
  Clock::settle();

  // After the executor terminated, we should expect a completed
  // framework/task/executor.
  {
    v1::agent::Call v1Call;
    v1Call.set_type(v1::agent::Call::GET_STATE);

    Future<v1::agent::Response> v1Response =
      post(slave.get()->pid, v1Call, contentType);

    AWAIT_READY(v1Response);
    ASSERT_TRUE(v1Response->IsInitialized());
    ASSERT_EQ(v1::agent::Response::GET_STATE, v1Response->type());

    const v1::agent::Response::GetState& getState = v1Response->get_state();
    ASSERT_EQ(0, getState.get_frameworks().frameworks_size());
    ASSERT_EQ(1, getState.get_frameworks().completed_frameworks_size());
    ASSERT_EQ(0, getState.get_tasks().launched_tasks_size());
    ASSERT_EQ(1, getState.get_tasks().completed_tasks_size());
    ASSERT_EQ(0, getState.get_executors().executors_size());
    ASSERT_EQ(1, getState.get_executors().completed_executors_size());
  }
}
// This test verifies that a `WAIT_NESTED_CONTAINER` call for a container
// ID generated at random (and hence unknown to the agent) is answered
// with a `NotFound` HTTP response.
TEST_P(AgentAPITest, NestedContainerWaitNotFound)
{
  ContentType contentType = GetParam();

  Clock::pause();

  StandaloneMasterDetector detector;
  MockContainerizer mockContainerizer;

  // Recovery of the mocked containerizer completes immediately.
  EXPECT_CALL(mockContainerizer, recover(_))
    .WillOnce(Return(Future<Nothing>(Nothing())));

  Future<Nothing> recovered = FUTURE_DISPATCH(_, &Slave::__recover);

  Try<Owned<cluster::Slave>> agent =
    StartSlave(&detector, &mockContainerizer);

  ASSERT_SOME(agent);

  // Wait for the agent to finish recovery.
  AWAIT_READY(recovered);
  Clock::settle();

  // Expect a 404 for waiting on unknown containers. Both the container
  // ID and its parent ID are fresh random UUIDs.
  v1::agent::Call waitCall;
  waitCall.set_type(v1::agent::Call::WAIT_NESTED_CONTAINER);

  v1::ContainerID* unknownId =
    waitCall.mutable_wait_nested_container()->mutable_container_id();

  unknownId->set_value(UUID::random().toString());
  unknownId->mutable_parent()->set_value(UUID::random().toString());

  Future<http::Response> httpResponse = http::post(
      agent.get()->pid,
      "api/v1",
      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
      serialize(contentType, waitCall),
      stringify(contentType));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::NotFound().status, httpResponse);

  // The destructor of `cluster::Slave` will try to clean up any
  // remaining containers by inspecting the result of `containers()`.
  EXPECT_CALL(mockContainerizer, containers())
    .WillRepeatedly(Return(hashset<ContainerID>()));
}
// This test verifies that a `KILL_NESTED_CONTAINER` call for a container
// ID generated at random (and hence unknown to the agent) is answered
// with a `NotFound` HTTP response.
TEST_P(AgentAPITest, NestedContainerKillNotFound)
{
  ContentType contentType = GetParam();

  Clock::pause();

  StandaloneMasterDetector detector;
  MockContainerizer mockContainerizer;

  // Recovery of the mocked containerizer completes immediately.
  EXPECT_CALL(mockContainerizer, recover(_))
    .WillOnce(Return(Future<Nothing>(Nothing())));

  Future<Nothing> recovered = FUTURE_DISPATCH(_, &Slave::__recover);

  Try<Owned<cluster::Slave>> agent =
    StartSlave(&detector, &mockContainerizer);

  ASSERT_SOME(agent);

  // Wait for the agent to finish recovery.
  AWAIT_READY(recovered);
  Clock::settle();

  // Expect a 404 for killing unknown containers. Both the container
  // ID and its parent ID are fresh random UUIDs.
  v1::agent::Call killCall;
  killCall.set_type(v1::agent::Call::KILL_NESTED_CONTAINER);

  v1::ContainerID* unknownId =
    killCall.mutable_kill_nested_container()->mutable_container_id();

  unknownId->set_value(UUID::random().toString());
  unknownId->mutable_parent()->set_value(UUID::random().toString());

  Future<http::Response> httpResponse = http::post(
      agent.get()->pid,
      "api/v1",
      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
      serialize(contentType, killCall),
      stringify(contentType));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::NotFound().status, httpResponse);

  // The destructor of `cluster::Slave` will try to clean up any
  // remaining containers by inspecting the result of `containers()`.
  EXPECT_CALL(mockContainerizer, containers())
    .WillRepeatedly(Return(hashset<ContainerID>()));
}
// When containerizer returns false from launching a nested
// container, it is considered a bad request (e.g. image
// type is not supported).
//
// The test launches a task with a mock executor, uses the resulting
// container as the parent of a new nested container, makes the
// containerizer answer the nested launch with `false`, and expects the
// `LAUNCH_NESTED_CONTAINER` call to yield a `BadRequest` HTTP response.
TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, NestedContainerLaunchFalse)
{
ContentType contentType = GetParam();
// The clock stays paused for the whole test; agent authentication and
// registration are driven by the explicit `Clock::advance` calls below.
Clock::pause();
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
slave::Flags agentFlags = CreateSlaveFlags();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &containerizer, agentFlags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
// Launch a single task on the first offer using the default executor info.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 0.1, 32, "*"))
.WillRepeatedly(Return()); // Ignore subsequent offers.
Future<Nothing> executorRegistered;
EXPECT_CALL(exec, registered(_, _, _, _))
.WillOnce(FutureSatisfy(&executorRegistered));
EXPECT_CALL(exec, launchTask(_, _));
driver.start();
// Trigger authentication and registration for the agent.
Clock::advance(agentFlags.authentication_backoff_factor);
Clock::advance(agentFlags.registration_backoff_factor);
AWAIT_READY(executorRegistered);
// Exactly one container is expected at this point; its ID is used as
// the parent of the nested container below.
Future<hashset<ContainerID>> containerIds = containerizer.containers();
AWAIT_READY(containerIds);
ASSERT_EQ(1u, containerIds->size());
// Try to launch an "unsupported" container.
v1::ContainerID containerId;
containerId.set_value(UUID::random().toString());
containerId.mutable_parent()->set_value(containerIds->begin()->value());
{
// Return false here to indicate "unsupported".
EXPECT_CALL(containerizer, launch(_, _, _, _))
.WillOnce(Return(Future<bool>(false)));
v1::agent::Call call;
call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER);
call.mutable_launch_nested_container()->mutable_container_id()
->CopyFrom(containerId);
// The raw HTTP helper is used because the assertion is on the HTTP
// status code of the response.
Future<http::Response> response = http::post(
slave.get()->pid,
"api/v1",
createBasicAuthHeaders(DEFAULT_CREDENTIAL),
serialize(contentType, call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::BadRequest().status, response);
}
// Allow, but do not require, one `shutdown` callback on the executor
// while the driver is being stopped.
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
}
// This test launches a nested container under the container of a running
// mock executor, issues a `WAIT_NESTED_CONTAINER` call that is expected to
// stay pending, kills the nested container through `KILL_NESTED_CONTAINER`,
// and then checks that the wait completes with exit status 0.
TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, NestedContainerLaunch)
{
ContentType contentType = GetParam();
// The clock stays paused for the whole test; agent authentication and
// registration are driven by the explicit `Clock::advance` calls below.
Clock::pause();
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
slave::Flags agentFlags = CreateSlaveFlags();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &containerizer, agentFlags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
// Launch a single task on the first offer using the default executor info.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 0.1, 32, "*"))
.WillRepeatedly(Return()); // Ignore subsequent offers.
Future<Nothing> executorRegistered;
EXPECT_CALL(exec, registered(_, _, _, _))
.WillOnce(FutureSatisfy(&executorRegistered));
EXPECT_CALL(exec, launchTask(_, _));
driver.start();
// Trigger authentication and registration for the agent.
Clock::advance(agentFlags.authentication_backoff_factor);
Clock::advance(agentFlags.registration_backoff_factor);
AWAIT_READY(executorRegistered);
// Exactly one container is expected at this point; its ID is used as
// the parent of the nested container below.
Future<hashset<ContainerID>> containerIds = containerizer.containers();
AWAIT_READY(containerIds);
ASSERT_EQ(1u, containerIds->size());
// Launch a nested container and wait for it to finish.
v1::ContainerID containerId;
containerId.set_value(UUID::random().toString());
containerId.mutable_parent()->set_value(containerIds->begin()->value());
{
v1::agent::Call call;
call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER);
call.mutable_launch_nested_container()->mutable_container_id()
->CopyFrom(containerId);
Future<http::Response> response = http::post(
slave.get()->pid,
"api/v1",
createBasicAuthHeaders(DEFAULT_CREDENTIAL),
serialize(contentType, call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
}
// Declared outside the scope below because the result is awaited only
// after the container has been killed.
Future<v1::agent::Response> wait;
{
v1::agent::Call call;
call.set_type(v1::agent::Call::WAIT_NESTED_CONTAINER);
call.mutable_wait_nested_container()->mutable_container_id()
->CopyFrom(containerId);
wait = post(slave.get()->pid, call, contentType);
// After settling, the wait is expected to still be pending since the
// container is only killed further below.
Clock::settle();
EXPECT_TRUE(wait.isPending());
}
// Now kill the nested container.
{
v1::agent::Call call;
call.set_type(v1::agent::Call::KILL_NESTED_CONTAINER);
call.mutable_kill_nested_container()->mutable_container_id()
->CopyFrom(containerId);
Future<http::Response> response = http::post(
slave.get()->pid,
"api/v1",
createBasicAuthHeaders(DEFAULT_CREDENTIAL),
serialize(contentType, call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
}
AWAIT_READY(wait);
ASSERT_EQ(v1::agent::Response::WAIT_NESTED_CONTAINER, wait->type());
// The test containerizer sets exit status to 0 when destroyed.
EXPECT_EQ(0, wait->wait_nested_container().exit_status());
// Allow, but do not require, one `shutdown` callback on the executor
// while the driver is being stopped.
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
}
// This test launches a nested "parent" container under the container of a
// running mock executor and a "child" container nested under that parent.
// It then waits on both, kills the child first (checking that the parent's
// wait is still pending), kills the parent, and checks that both waits
// complete with exit status 0.
TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, TwoLevelNestedContainerLaunch)
{
ContentType contentType = GetParam();
// The clock stays paused for the whole test; agent authentication and
// registration are driven by the explicit `Clock::advance` calls below.
Clock::pause();
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
slave::Flags agentFlags = CreateSlaveFlags();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &containerizer, agentFlags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
// Launch a single task on the first offer using the default executor info.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 0.1, 32, "*"))
.WillRepeatedly(Return()); // Ignore subsequent offers.
Future<Nothing> executorRegistered;
EXPECT_CALL(exec, registered(_, _, _, _))
.WillOnce(FutureSatisfy(&executorRegistered));
EXPECT_CALL(exec, launchTask(_, _));
driver.start();
// Trigger authentication and registration for the agent.
Clock::advance(agentFlags.authentication_backoff_factor);
Clock::advance(agentFlags.registration_backoff_factor);
AWAIT_READY(executorRegistered);
// Exactly one container is expected at this point; its ID is used as
// the parent of the first-level nested container below.
Future<hashset<ContainerID>> containerIds = containerizer.containers();
AWAIT_READY(containerIds);
ASSERT_EQ(1u, containerIds->size());
// Launch a two level nested parent/child container and then wait for them to
// finish.
v1::ContainerID parentContainerId;
parentContainerId.set_value(UUID::random().toString());
parentContainerId.mutable_parent()->set_value(containerIds->begin()->value());
// Launch the parent container.
{
v1::agent::Call call;
call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER);
call.mutable_launch_nested_container()->mutable_container_id()
->CopyFrom(parentContainerId);
Future<http::Response> response = http::post(
slave.get()->pid,
"api/v1",
createBasicAuthHeaders(DEFAULT_CREDENTIAL),
serialize(contentType, call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
}
// Launch the child container.
// Its parent is the full ID of the nested container launched above,
// which makes it a second-level nested container.
v1::ContainerID childContainerId;
childContainerId.set_value(UUID::random().toString());
childContainerId.mutable_parent()->CopyFrom(parentContainerId);
{
v1::agent::Call call;
call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER);
call.mutable_launch_nested_container()->mutable_container_id()
->CopyFrom(childContainerId);
Future<http::Response> response = http::post(
slave.get()->pid,
"api/v1",
createBasicAuthHeaders(DEFAULT_CREDENTIAL),
serialize(contentType, call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
}
// Wait for the parent container.
Future<v1::agent::Response> waitParent;
{
v1::agent::Call call;
call.set_type(v1::agent::Call::WAIT_NESTED_CONTAINER);
call.mutable_wait_nested_container()->mutable_container_id()
->CopyFrom(parentContainerId);
waitParent = post(slave.get()->pid, call, contentType);
// After settling, the wait is expected to still be pending since the
// parent is only killed further below.
Clock::settle();
EXPECT_TRUE(waitParent.isPending());
}
// Wait for the child container.
Future<v1::agent::Response> waitChild;
{
v1::agent::Call call;
call.set_type(v1::agent::Call::WAIT_NESTED_CONTAINER);
call.mutable_wait_nested_container()->mutable_container_id()
->CopyFrom(childContainerId);
waitChild = post(slave.get()->pid, call, contentType);
// After settling, the wait is expected to still be pending since the
// child is only killed further below.
Clock::settle();
EXPECT_TRUE(waitChild.isPending());
}
// Kill the child container.
{
v1::agent::Call call;
call.set_type(v1::agent::Call::KILL_NESTED_CONTAINER);
call.mutable_kill_nested_container()->mutable_container_id()
->CopyFrom(childContainerId);
Future<http::Response> response = http::post(
slave.get()->pid,
"api/v1",
createBasicAuthHeaders(DEFAULT_CREDENTIAL),
serialize(contentType, call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
}
AWAIT_READY(waitChild);
ASSERT_EQ(v1::agent::Response::WAIT_NESTED_CONTAINER, waitChild->type());
// The test containerizer sets exit status to 0 when destroyed.
EXPECT_EQ(0, waitChild->wait_nested_container().exit_status());
// The parent container should still be running.
EXPECT_TRUE(waitParent.isPending());
// Kill the parent container.
{
v1::agent::Call call;
call.set_type(v1::agent::Call::KILL_NESTED_CONTAINER);
call.mutable_kill_nested_container()->mutable_container_id()
->CopyFrom(parentContainerId);
Future<http::Response> response = http::post(
slave.get()->pid,
"api/v1",
createBasicAuthHeaders(DEFAULT_CREDENTIAL),
serialize(contentType, call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
}
AWAIT_READY(waitParent);
ASSERT_EQ(v1::agent::Response::WAIT_NESTED_CONTAINER, waitParent->type());
// The test containerizer sets exit status to 0 when destroyed.
EXPECT_EQ(0, waitParent->wait_nested_container().exit_status());
// Allow, but do not require, one `shutdown` callback on the executor
// while the driver is being stopped.
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
}
// This test verifies that launch nested container session fails when
// attaching to the output of the container fails. Consequently, the
// launched container should be destroyed.
TEST_P_TEMP_DISABLED_ON_WINDOWS(
    AgentAPITest,
    LaunchNestedContainerSessionAttachFailure)
{
  const ContentType contentType = GetParam();

  // The clock stays paused for the rest of the test; the agent's
  // authentication and registration are driven by hand further down.
  Clock::pause();

  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();

  // The agent runs a mock executor inside the test containerizer.
  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  TestContainerizer containerizer(&exec);

  slave::Flags flags = CreateSlaveFlags();

  Try<Owned<cluster::Slave>> slave =
    StartSlave(detector.get(), &containerizer, flags);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  // Launch one task from the first offer and ignore any later offers.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 0.1, 32, "*"))
    .WillRepeatedly(Return());

  Future<Nothing> executorUp;
  EXPECT_CALL(exec, registered(_, _, _, _))
    .WillOnce(FutureSatisfy(&executorUp));

  EXPECT_CALL(exec, launchTask(_, _));

  driver.start();

  // With the clock paused, advance it past the agent's authentication
  // and registration backoff so that both are triggered.
  Clock::advance(flags.authentication_backoff_factor);
  Clock::advance(flags.registration_backoff_factor);

  AWAIT_READY(executorUp);

  // Exactly one (top-level) container is expected at this point; it
  // becomes the parent of the nested container session.
  Future<hashset<ContainerID>> runningIds = containerizer.containers();
  AWAIT_READY(runningIds);
  ASSERT_EQ(1u, runningIds->size());

  v1::ContainerID nestedId;
  nestedId.set_value(UUID::random().toString());
  nestedId.mutable_parent()->set_value(runningIds->begin()->value());

  v1::agent::Call launchCall;
  launchCall.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);
  launchCall.mutable_launch_nested_container_session()
    ->mutable_container_id()
    ->CopyFrom(nestedId);

  Future<http::Response> launchResponse = http::streaming::post(
      slave.get()->pid,
      "api/v1",
      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
      serialize(contentType, launchCall),
      stringify(contentType));

  // The test containerizer doesn't support `attach`, so the launch
  // is expected to fail.
  AWAIT_EXPECT_RESPONSE_STATUS_EQ(
      http::InternalServerError().status, launchResponse);

  // Let every pending callback run before inspecting the containers.
  Clock::settle();

  // The failed attach should have destroyed the nested container,
  // leaving only the original container behind.
  Future<hashset<ContainerID>> remainingIds = containerizer.containers();
  AWAIT_READY(remainingIds);
  ASSERT_EQ(1u, remainingIds->size());
  EXPECT_FALSE(remainingIds->contains(devolve(nestedId)));

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver.stop();
  driver.join();
}
// This test verifies that launching a nested container session results
// in stdout and stderr being streamed correctly.
TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPITest, LaunchNestedContainerSession)
{
  ContentType contentType = GetParam();

  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  // Run the agent on top of a real `MesosContainerizer`.
  slave::Flags flags = CreateSlaveFlags();
  Fetcher fetcher(flags);

  Try<MesosContainerizer*> _containerizer =
    MesosContainerizer::create(flags, false, &fetcher);
  ASSERT_SOME(_containerizer);

  Owned<slave::Containerizer> containerizer(_containerizer.get());

  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> slave =
    StartSlave(detector.get(), containerizer.get(), flags);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(_, _))
    .WillOnce(FutureArg<1>(&offers));

  driver.start();

  AWAIT_READY(offers);

  // This check must be fatal: the first offer is indexed unconditionally
  // below, which would be undefined behavior on an empty vector.
  ASSERT_FALSE(offers->empty());

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(_, _))
    .WillOnce(FutureArg<1>(&status));

  // Launch a long-running task; its container becomes the parent of
  // the nested container session.
  TaskInfo task = createTask(offers.get()[0], "sleep 1000");

  driver.launchTasks(offers.get()[0].id(), {task});

  AWAIT_READY(status);
  ASSERT_EQ(TASK_RUNNING, status->state());

  // Launch a nested container session that runs a command
  // that writes something to stdout and stderr and exits.
  Future<hashset<ContainerID>> containerIds = containerizer->containers();
  AWAIT_READY(containerIds);
  ASSERT_EQ(1u, containerIds->size());

  v1::ContainerID containerId;
  containerId.set_value(UUID::random().toString());
  containerId.mutable_parent()->set_value(containerIds->begin()->value());

  string output = "output";
  string error = "error";
  string command = "printf " + output + " && printf " + error + " 1>&2";

  v1::agent::Call call;
  call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);

  call.mutable_launch_nested_container_session()->mutable_container_id()
    ->CopyFrom(containerId);

  call.mutable_launch_nested_container_session()->mutable_command()->set_value(
      command);

  // Ask for the response in the same content type as the request.
  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  headers["Accept"] = stringify(contentType);

  Future<http::Response> response = http::streaming::post(
      slave.get()->pid,
      "api/v1",
      headers,
      serialize(contentType, call),
      stringify(contentType));

  // The response must be a streaming (PIPE) response in the requested
  // content type, without a 'Message-Content-Type' header.
  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
  ASSERT_EQ(stringify(contentType), response->headers.at("Content-Type"));
  ASSERT_NONE(response->headers.get(MESSAGE_CONTENT_TYPE));
  ASSERT_EQ(http::Response::PIPE, response->type);
  ASSERT_SOME(response->reader);

  // Collect everything streamed back, split into stdout and stderr.
  Future<tuple<string, string>> received =
    getProcessIOData(contentType, response->reader.get());

  AWAIT_READY(received);

  string stdoutReceived;
  string stderrReceived;

  tie(stdoutReceived, stderrReceived) = received.get();

  EXPECT_EQ(output, stdoutReceived);
  EXPECT_EQ(error, stderrReceived);

  driver.stop();
  driver.join();
}
// This test verifies that unauthorized principals are unable to
// launch nested container sessions.
TEST_P_TEMP_DISABLED_ON_WINDOWS(
    AgentAPITest,
    LaunchNestedContainerSessionUnauthorized)
{
  ContentType contentType = GetParam();

  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  slave::Flags flags = CreateSlaveFlags();
  Fetcher fetcher(flags);

  Try<MesosContainerizer*> mesosContainerizer =
    MesosContainerizer::create(flags, false, &fetcher);
  ASSERT_SOME(mesosContainerizer);

  Owned<slave::Containerizer> containerizer(mesosContainerizer.get());

  Owned<MasterDetector> detector = master.get()->createDetector();

  {
    // Default principal is not allowed to launch nested container sessions.
    mesos::ACL::LaunchNestedContainerSessionUnderParentWithUser* acl =
      flags.acls.get()
        .add_launch_nested_container_sessions_under_parent_with_user();

    acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal());
    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
  }

  // The ACL above must be in `flags` before the agent is started.
  Try<Owned<cluster::Slave>> slave =
    StartSlave(detector.get(), containerizer.get(), flags);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(_, _))
    .WillOnce(FutureArg<1>(&offers));

  driver.start();

  AWAIT_READY(offers);

  // This check must be fatal: the first offer is indexed unconditionally
  // below, which would be undefined behavior on an empty vector.
  ASSERT_FALSE(offers->empty());

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(_, _))
    .WillOnce(FutureArg<1>(&status));

  TaskInfo task = createTask(offers.get()[0], "sleep 1000");

  driver.launchTasks(offers.get()[0].id(), {task});

  AWAIT_READY(status);
  ASSERT_EQ(TASK_RUNNING, status->state());

  // Attempt to launch a nested container session under the task's
  // container; the request is expected to be rejected.
  Future<hashset<ContainerID>> containerIds = containerizer->containers();
  AWAIT_READY(containerIds);
  ASSERT_EQ(1u, containerIds->size());

  v1::ContainerID containerId;
  containerId.set_value(UUID::random().toString());
  containerId.mutable_parent()->set_value(containerIds->begin()->value());

  string command = "sleep 1000";

  v1::agent::Call call;
  call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);

  call.mutable_launch_nested_container_session()->mutable_container_id()
    ->CopyFrom(containerId);

  call.mutable_launch_nested_container_session()->mutable_command()->set_value(
      command);

  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  headers["Accept"] = stringify(contentType);

  Future<http::Response> response = http::streaming::post(
      slave.get()->pid,
      "api/v1",
      headers,
      serialize(contentType, call),
      stringify(contentType));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, response);

  // The rejected call must not have added a container: only the
  // task's container should still be known to the containerizer.
  containerIds = containerizer->containers();
  AWAIT_READY(containerIds);
  EXPECT_EQ(1u, containerIds->size());

  driver.stop();
  driver.join();
}
// This test verifies that launching a nested container session with `TTYInfo`
// results in stdout and stderr being streamed to the client as stdout.
TEST_P_TEMP_DISABLED_ON_WINDOWS(
    AgentAPITest,
    LaunchNestedContainerSessionWithTTY)
{
  ContentType contentType = GetParam();

  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  // Run the agent on top of a real `MesosContainerizer`.
  slave::Flags flags = CreateSlaveFlags();
  Fetcher fetcher(flags);

  Try<MesosContainerizer*> _containerizer =
    MesosContainerizer::create(flags, false, &fetcher);
  ASSERT_SOME(_containerizer);

  Owned<slave::Containerizer> containerizer(_containerizer.get());

  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> slave =
    StartSlave(detector.get(), containerizer.get(), flags);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(_, _))
    .WillOnce(FutureArg<1>(&offers));

  driver.start();

  AWAIT_READY(offers);

  // This check must be fatal: the first offer is indexed unconditionally
  // below, which would be undefined behavior on an empty vector.
  ASSERT_FALSE(offers->empty());

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(_, _))
    .WillOnce(FutureArg<1>(&status));

  // Launch a long-running task; its container becomes the parent of
  // the nested container session.
  TaskInfo task = createTask(offers.get()[0], "sleep 1000");

  driver.launchTasks(offers.get()[0].id(), {task});

  AWAIT_READY(status);
  ASSERT_EQ(TASK_RUNNING, status->state());

  // Launch a nested container session that runs a command
  // that writes something to stdout and stderr and exits.
  Future<hashset<ContainerID>> containerIds = containerizer->containers();
  AWAIT_READY(containerIds);
  ASSERT_EQ(1u, containerIds->size());

  v1::ContainerID containerId;
  containerId.set_value(UUID::random().toString());
  containerId.mutable_parent()->set_value(containerIds->begin()->value());

  string output = "output";
  string error = "error";
  string command = "printf " + output + " && printf " + error + " 1>&2";

  v1::agent::Call call;
  call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);

  call.mutable_launch_nested_container_session()->mutable_container_id()
    ->CopyFrom(containerId);

  call.mutable_launch_nested_container_session()->mutable_command()->set_value(
      command);

  call.mutable_launch_nested_container_session()->mutable_container()->set_type(
      mesos::v1::ContainerInfo::MESOS);

  // Calling `mutable_tty_info()` marks the (otherwise empty) `TTYInfo`
  // as set, which is what requests a TTY for the session.
  call.mutable_launch_nested_container_session()->mutable_container()
    ->mutable_tty_info();

  // Ask for the response in the same content type as the request.
  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  headers["Accept"] = stringify(contentType);

  Future<http::Response> response = http::streaming::post(
      slave.get()->pid,
      "api/v1",
      headers,
      serialize(contentType, call),
      stringify(contentType));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
  ASSERT_EQ(stringify(contentType), response->headers.at("Content-Type"));
  ASSERT_EQ(http::Response::PIPE, response->type);
  ASSERT_SOME(response->reader);

  // Collect everything streamed back, split into stdout and stderr.
  Future<tuple<string, string>> received =
    getProcessIOData(contentType, response->reader.get());

  AWAIT_READY(received);

  string stdoutReceived;
  string stderrReceived;

  tie(stdoutReceived, stderrReceived) = received.get();

  // With a TTY, both streams are expected to arrive as stdout.
  EXPECT_EQ(output + error, stdoutReceived);
  EXPECT_EQ("", stderrReceived);

  driver.stop();
  driver.join();
}
// This test verifies that the nested container session is destroyed
// upon a client disconnection.
TEST_P_TEMP_DISABLED_ON_WINDOWS(
    AgentAPITest,
    LaunchNestedContainerSessionDisconnected)
{
  ContentType contentType = GetParam();

  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  // Run the agent on top of a real `MesosContainerizer`.
  slave::Flags flags = CreateSlaveFlags();
  Fetcher fetcher(flags);

  Try<MesosContainerizer*> _containerizer =
    MesosContainerizer::create(flags, false, &fetcher);
  ASSERT_SOME(_containerizer);

  Owned<slave::Containerizer> containerizer(_containerizer.get());

  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> slave =
    StartSlave(detector.get(), containerizer.get(), flags);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(_, _))
    .WillOnce(FutureArg<1>(&offers));

  driver.start();

  AWAIT_READY(offers);

  // This check must be fatal: the first offer is indexed unconditionally
  // below, which would be undefined behavior on an empty vector.
  ASSERT_FALSE(offers->empty());

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(_, _))
    .WillOnce(FutureArg<1>(&status));

  TaskInfo task = createTask(offers.get()[0], "sleep 1000");

  driver.launchTasks(offers.get()[0].id(), {task});

  AWAIT_READY(status);
  ASSERT_EQ(TASK_RUNNING, status->state());

  // Launch a nested container session that runs `cat` so that it never exits.
  Future<hashset<ContainerID>> containerIds = containerizer->containers();
  AWAIT_READY(containerIds);
  ASSERT_EQ(1u, containerIds->size());

  v1::ContainerID containerId;
  containerId.set_value(UUID::random().toString());
  containerId.mutable_parent()->set_value(containerIds->begin()->value());

  v1::agent::Call call;
  call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);

  call.mutable_launch_nested_container_session()->mutable_container_id()
    ->CopyFrom(containerId);

  call.mutable_launch_nested_container_session()->mutable_command()->set_value(
      "cat");

  // TODO(vinod): Ideally, we can use `http::post` here but we cannot currently
  // because the caller currently doesn't have a way to disconnect the
  // connection (e.g., by closing the response reader pipe).
  http::URL agent = http::URL(
      "http",
      slave.get()->pid.address.ip,
      slave.get()->pid.address.port,
      slave.get()->pid.id +
      "/api/v1");

  Future<http::Connection> _connection = http::connect(agent);
  AWAIT_READY(_connection);

  http::Connection connection = _connection.get(); // Remove const.

  // Build the request by hand since it is sent over the explicit
  // connection rather than through `http::post`.
  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  headers["Accept"] = stringify(contentType);
  headers["Content-Type"] = stringify(contentType);

  http::Request request;
  request.url = agent;
  request.method = "POST";
  request.headers = headers;
  request.body = serialize(contentType, call);

  Future<http::Response> response = connection.send(request, true);

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
  ASSERT_EQ(stringify(contentType), response->headers.at("Content-Type"));
  ASSERT_EQ(http::Response::PIPE, response->type);

  // Disconnect the launch connection. This should
  // result in the nested container being destroyed.
  AWAIT_READY(connection.disconnect());

  AWAIT_READY(containerizer->wait(devolve(containerId)));

  driver.stop();
  driver.join();
}
// This test verifies that attaching to the output of a container fails if the
// containerizer doesn't support the operation.
TEST_P(AgentAPITest, AttachContainerOutputFailure)
{
  ContentType contentType = GetParam();

  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  slave::Flags flags = CreateSlaveFlags();

  // Disable authorization in the agent.
  flags.acls = None();

  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  TestContainerizer containerizer(&exec);

  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> slave =
    StartSlave(detector.get(), &containerizer, flags);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(_, _))
    .WillOnce(FutureArg<1>(&offers));

  driver.start();

  AWAIT_READY(offers);

  // This check must be fatal: the first offer is indexed unconditionally
  // below, which would be undefined behavior on an empty vector.
  ASSERT_FALSE(offers->empty());

  EXPECT_CALL(exec, registered(_, _, _, _));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(_, _))
    .WillOnce(FutureArg<1>(&status));

  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);

  driver.launchTasks(offers.get()[0].id(), {task});

  AWAIT_READY(status);
  ASSERT_EQ(TASK_RUNNING, status->state());

  // Attach to the output of the only container that is running.
  Future<hashset<ContainerID>> containerIds = containerizer.containers();
  AWAIT_READY(containerIds);
  ASSERT_EQ(1u, containerIds->size());

  v1::agent::Call call;
  call.set_type(v1::agent::Call::ATTACH_CONTAINER_OUTPUT);

  call.mutable_attach_container_output()->mutable_container_id()
    ->set_value(containerIds->begin()->value());

  // Make the containerizer fail the attach; the failure message is
  // expected to be returned as the response body.
  EXPECT_CALL(containerizer, attach(_))
    .WillOnce(Return(process::Failure("Unsupported")));

  Future<http::Response> response = http::post(
      slave.get()->pid,
      "api/v1",
      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
      serialize(contentType, call),
      stringify(contentType));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::InternalServerError().status, response);
  AWAIT_EXPECT_RESPONSE_BODY_EQ("Unsupported", response);

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver.stop();
  driver.join();
}
// This test verifies that attaching to the input of a container fails if the
// containerizer doesn't support the operation.
TEST_F(AgentAPITest, AttachContainerInputFailure)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  slave::Flags flags = CreateSlaveFlags();

  // Disable authorization in the agent.
  flags.acls = None();

  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  TestContainerizer containerizer(&exec);

  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> slave =
    StartSlave(detector.get(), &containerizer, flags);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(_, _))
    .WillOnce(FutureArg<1>(&offers));

  driver.start();

  AWAIT_READY(offers);

  // This check must be fatal: the first offer is indexed unconditionally
  // below, which would be undefined behavior on an empty vector.
  ASSERT_FALSE(offers->empty());

  EXPECT_CALL(exec, registered(_, _, _, _));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(_, _))
    .WillOnce(FutureArg<1>(&status));

  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);

  driver.launchTasks(offers.get()[0].id(), {task});

  AWAIT_READY(status);
  ASSERT_EQ(TASK_RUNNING, status->state());

  // Attach to the input of the only container that is running.
  Future<hashset<ContainerID>> containerIds = containerizer.containers();
  AWAIT_READY(containerIds);
  ASSERT_EQ(1u, containerIds->size());

  v1::agent::Call call;
  call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);

  call.mutable_attach_container_input()->set_type(
      v1::agent::Call::AttachContainerInput::CONTAINER_ID);

  call.mutable_attach_container_input()->mutable_container_id()
    ->set_value(containerIds->begin()->value());

  // The request body is a RecordIO stream whose records are protobuf
  // encoded; the record encoding goes in the 'Message-Content-Type' header.
  ContentType contentType = ContentType::RECORDIO;
  ContentType messageContentType = ContentType::PROTOBUF;

  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  headers[MESSAGE_CONTENT_TYPE] = stringify(messageContentType);

  ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
      serialize, messageContentType, lambda::_1));

  // Make the containerizer fail the attach; the failure message is
  // expected to be returned as the response body.
  EXPECT_CALL(containerizer, attach(_))
    .WillOnce(Return(process::Failure("Unsupported")));

  Future<http::Response> response = http::post(
      slave.get()->pid,
      "api/v1",
      headers,
      encoder.encode(call),
      stringify(contentType));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::InternalServerError().status, response);
  AWAIT_EXPECT_RESPONSE_BODY_EQ("Unsupported", response);

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver.stop();
  driver.join();
}
// Verifies that unauthorized users are not able to attach to a
// nested container input.
TEST_P_TEMP_DISABLED_ON_WINDOWS(
    AgentAPITest,
    AttachContainerInputAuthorization)
{
  ContentType contentType = GetParam();

  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  slave::Flags flags = CreateSlaveFlags();

  {
    // Default principal is not allowed to attach to any container's input.
    mesos::ACL::AttachContainerInput* acl =
      flags.acls->add_attach_containers_input();

    acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal());
    acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
  }

  Fetcher fetcher(flags);

  Try<MesosContainerizer*> _containerizer =
    MesosContainerizer::create(flags, false, &fetcher);
  ASSERT_SOME(_containerizer);

  Owned<slave::Containerizer> containerizer(_containerizer.get());

  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> slave =
    StartSlave(detector.get(), containerizer.get(), flags);
  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(_, _))
    .WillOnce(FutureArg<1>(&offers));

  driver.start();

  AWAIT_READY(offers);

  // This check must be fatal: the first offer is indexed unconditionally
  // below, which would be undefined behavior on an empty vector.
  ASSERT_FALSE(offers->empty());

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(_, _))
    .WillOnce(FutureArg<1>(&status));

  TaskInfo task = createTask(offers.get()[0], "sleep 1000");

  driver.launchTasks(offers.get()[0].id(), {task});

  AWAIT_READY(status);
  ASSERT_EQ(TASK_RUNNING, status->state());

  // Launch a nested container session which runs a shell.
  Future<hashset<ContainerID>> containerIds = containerizer->containers();
  AWAIT_READY(containerIds);
  ASSERT_EQ(1u, containerIds->size());

  v1::ContainerID containerId;
  containerId.set_value(UUID::random().toString());
  containerId.mutable_parent()->set_value(containerIds->begin()->value());

  {
    string command = "sh";

    v1::agent::Call call;
    call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);

    call.mutable_launch_nested_container_session()->mutable_container_id()
      ->CopyFrom(containerId);

    call.mutable_launch_nested_container_session()
      ->mutable_command()->set_value(command);

    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    headers["Accept"] = stringify(contentType);

    Future<http::Response> response = http::streaming::post(
        slave.get()->pid,
        "api/v1",
        headers,
        serialize(contentType, call),
        stringify(contentType));

    // Launching the session itself is expected to succeed; only
    // attaching to its input is denied by the ACL above.
    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
  }

  // Attempt to attach to the container session's input.
  {
    v1::agent::Call call;
    call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);

    call.mutable_attach_container_input()
      ->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);

    call.mutable_attach_container_input()->mutable_container_id()
      ->CopyFrom(containerId);

    // NOTE: This `contentType` shadows the test parameter within this
    // scope. The request body is a RecordIO stream of protobuf records.
    ContentType contentType = ContentType::RECORDIO;
    ContentType messageContentType = ContentType::PROTOBUF;

    ::recordio::Encoder<v1::agent::Call> encoder(
        lambda::bind(serialize, messageContentType, lambda::_1));

    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    headers[MESSAGE_CONTENT_TYPE] = stringify(messageContentType);

    Future<http::Response> response = http::post(
        slave.get()->pid,
        "api/v1",
        headers,
        encoder.encode(call),
        stringify(contentType));

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, response);
  }

  driver.stop();
  driver.join();
}
// This test verifies that malformed `ATTACH_CONTAINER_INPUT` calls
// are rejected with a 400 (Bad Request) response.
TEST_F(AgentAPITest, AttachContainerInputValidation)
{
  Clock::pause();

  // Set up the dispatch expectation before the agent is started.
  Future<Nothing> recovered = FUTURE_DISPATCH(_, &Slave::__recover);

  StandaloneMasterDetector detector;

  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
  ASSERT_SOME(slave);

  // The agent must have finished recovery before any call is sent.
  AWAIT_READY(recovered);
  Clock::settle();

  // Every request below is a RecordIO stream of protobuf-encoded
  // records, announced via the 'Message-Content-Type' header.
  const ContentType contentType = ContentType::RECORDIO;
  const ContentType messageContentType = ContentType::PROTOBUF;

  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  headers[MESSAGE_CONTENT_TYPE] = stringify(messageContentType);

  // Case 1: 'attach_container_input.container_id' is missing.
  {
    v1::agent::Call missingId;
    missingId.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
    missingId.mutable_attach_container_input()->set_type(
        v1::agent::Call::AttachContainerInput::CONTAINER_ID);

    ::recordio::Encoder<v1::agent::Call> encoder(
        lambda::bind(serialize, messageContentType, lambda::_1));

    Future<http::Response> response = http::post(
        slave.get()->pid,
        "api/v1",
        headers,
        encoder.encode(missingId),
        stringify(contentType));

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::BadRequest().status, response);
  }

  // Case 2: the first call on the request stream is of type
  // 'AttachContainerInput::PROCESS_IO' rather than the required
  // 'AttachContainerInput::CONTAINER_ID'.
  {
    v1::agent::Call wrongFirstCall;
    wrongFirstCall.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
    wrongFirstCall.mutable_attach_container_input()->set_type(
        v1::agent::Call::AttachContainerInput::PROCESS_IO);

    ::recordio::Encoder<v1::agent::Call> encoder(
        lambda::bind(serialize, messageContentType, lambda::_1));

    Future<http::Response> response = http::post(
        slave.get()->pid,
        "api/v1",
        headers,
        encoder.encode(wrongFirstCall),
        stringify(contentType));

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::BadRequest().status, response);
  }
}
// This test verifies that any missing headers or unsupported media
// types in the request result in a 4xx response.
TEST_F(AgentAPITest, HeaderValidation)
{
  Clock::pause();

  // Set up the dispatch expectation before the agent is started.
  Future<Nothing> recovered = FUTURE_DISPATCH(_, &Slave::__recover);

  StandaloneMasterDetector detector;

  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
  ASSERT_SOME(slave);

  // The agent must have finished recovery before any call is sent.
  AWAIT_READY(recovered);
  Clock::settle();

  // Case 1: a streaming request without the 'Message-Content-Type'
  // header is a bad request.
  {
    v1::agent::Call attachInput;
    attachInput.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
    attachInput.mutable_attach_container_input()->set_type(
        v1::agent::Call::AttachContainerInput::CONTAINER_ID);

    ::recordio::Encoder<v1::agent::Call> encoder(
        lambda::bind(serialize, ContentType::PROTOBUF, lambda::_1));

    Future<http::Response> response = http::post(
        slave.get()->pid,
        "api/v1",
        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
        encoder.encode(attachInput),
        stringify(ContentType::RECORDIO));

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::BadRequest().status, response);
  }

  // Case 2: a streaming request whose 'Message-Content-Type' names an
  // unsupported media type.
  {
    v1::agent::Call attachInput;
    attachInput.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);
    attachInput.mutable_attach_container_input()->set_type(
        v1::agent::Call::AttachContainerInput::CONTAINER_ID);

    ::recordio::Encoder<v1::agent::Call> encoder(
        lambda::bind(serialize, ContentType::PROTOBUF, lambda::_1));

    http::Headers requestHeaders = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    requestHeaders[MESSAGE_CONTENT_TYPE] = "unsupported/media-type";

    Future<http::Response> response = http::post(
        slave.get()->pid,
        "api/v1",
        requestHeaders,
        encoder.encode(attachInput),
        stringify(ContentType::RECORDIO));

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
        http::UnsupportedMediaType().status, response);
  }

  // Case 3: a streaming response is requested, but 'Message-Accept'
  // names an unsupported media type.
  {
    v1::agent::Call attachOutput;
    attachOutput.set_type(v1::agent::Call::ATTACH_CONTAINER_OUTPUT);

    v1::ContainerID randomId;
    randomId.set_value(UUID::random().toString());

    attachOutput.mutable_attach_container_output()->mutable_container_id()
      ->CopyFrom(randomId);

    const ContentType requestType = ContentType::PROTOBUF;

    http::Headers requestHeaders = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    requestHeaders["Accept"] = stringify(ContentType::RECORDIO);
    requestHeaders[MESSAGE_ACCEPT] = "unsupported/media-type";

    Future<http::Response> response = http::post(
        slave.get()->pid,
        "api/v1",
        requestHeaders,
        serialize(requestType, attachOutput),
        stringify(requestType));

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::NotAcceptable().status, response);
  }

  // Case 4: 'Message-Content-Type' is set although the request body
  // itself is not a stream.
  {
    v1::ContainerID randomId;
    randomId.set_value(UUID::random().toString());

    v1::agent::Call attachOutput;
    attachOutput.set_type(v1::agent::Call::ATTACH_CONTAINER_OUTPUT);
    attachOutput.mutable_attach_container_output()->mutable_container_id()
      ->CopyFrom(randomId);

    const ContentType requestType = ContentType::PROTOBUF;

    http::Headers requestHeaders = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    requestHeaders[MESSAGE_CONTENT_TYPE] = stringify(ContentType::PROTOBUF);

    Future<http::Response> response = http::streaming::post(
        slave.get()->pid,
        "api/v1",
        requestHeaders,
        serialize(requestType, attachOutput),
        stringify(requestType));

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
        http::UnsupportedMediaType().status, response);
  }
}
// This test verifies that the default 'Accept' for the
// Agent API endpoint is `APPLICATION_JSON`.
TEST_P(AgentAPITest, DefaultAccept)
{
  // Set up the dispatch expectation before the agent is started.
  Future<Nothing> recovered = FUTURE_DISPATCH(_, &Slave::__recover);

  StandaloneMasterDetector detector;

  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
  ASSERT_SOME(slave);

  AWAIT_READY(recovered);

  // Pause and settle the clock so that the agent has finished
  // recovery before the call is sent.
  Clock::pause();
  Clock::settle();

  // A wildcard 'Accept' expresses no preference, so the agent is
  // expected to fall back to its default response content type.
  http::Headers requestHeaders = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  requestHeaders["Accept"] = "*/*";

  v1::agent::Call getState;
  getState.set_type(v1::agent::Call::GET_STATE);

  const ContentType contentType = GetParam();

  Future<http::Response> response = http::post(
      slave.get()->pid,
      "api/v1",
      requestHeaders,
      serialize(contentType, getState),
      stringify(contentType));

  // Whatever the request was encoded as, the reply must be JSON.
  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
  AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
}
// Fixture for the streaming agent API tests. It declares no members
// of its own; it combines `MesosTest` with a `ContentType` test
// parameter.
class AgentAPIStreamingTest
  : public MesosTest,
    public WithParamInterface<ContentType>
{
};
// These tests are parameterized by the content type of the
// streaming HTTP request.
// `ContentType` is the instantiation name; every `AgentAPIStreamingTest`
// case runs once with PROTOBUF and once with JSON as its parameter.
INSTANTIATE_TEST_CASE_P(
    ContentType,
    AgentAPIStreamingTest,
    ::testing::Values(ContentType::PROTOBUF, ContentType::JSON));
// This test launches a child container with TTY and the 'cat' command
// as the entrypoint and attaches to its STDOUT via the attach output call.
// It then verifies that any data streamed to the container via the
// attach input call is received by the client on the output stream.
//
// TODO(alexr): Enable this test once MESOS-6780 is resolved.
TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentAPIStreamingTest,
                                DISABLED_AttachContainerInput)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  slave::Flags flags = CreateSlaveFlags();

  Fetcher fetcher(flags);

  Try<MesosContainerizer*> _containerizer =
    MesosContainerizer::create(flags, false, &fetcher);

  ASSERT_SOME(_containerizer);

  Owned<slave::Containerizer> containerizer(_containerizer.get());

  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> slave =
    StartSlave(detector.get(), containerizer.get(), flags);

  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers));

  driver.start();

  AWAIT_READY(offers);

  // This must be fatal: the first offer is indexed unconditionally
  // right below, which would be out of bounds for an empty vector.
  ASSERT_FALSE(offers->empty());

  const Offer& offer = offers.get()[0];

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(_, _))
    .WillOnce(FutureArg<1>(&status));

  TaskInfo taskInfo = createTask(offer, "sleep 1000");

  driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});

  AWAIT_READY(status);
  ASSERT_EQ(TASK_RUNNING, status->state());

  Future<hashset<ContainerID>> containerIds = containerizer->containers();
  AWAIT_READY(containerIds);
  ASSERT_EQ(1u, containerIds->size());

  // The child container gets a fresh random ID and is parented to the
  // single container reported by the containerizer.
  v1::ContainerID containerId;
  containerId.set_value(UUID::random().toString());
  containerId.mutable_parent()->set_value(containerIds->begin()->value());

  // Launch the child container with TTY and then attach to its output.
  {
    v1::agent::Call call;
    call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER);

    call.mutable_launch_nested_container()->mutable_container_id()
      ->CopyFrom(containerId);

    call.mutable_launch_nested_container()->mutable_command()
      ->CopyFrom(v1::createCommandInfo("cat"));

    call.mutable_launch_nested_container()->mutable_container()
      ->set_type(mesos::v1::ContainerInfo::MESOS);

    // Merely creating the (empty) `tty_info` message marks it as set.
    call.mutable_launch_nested_container()->mutable_container()
      ->mutable_tty_info();

    Future<http::Response> response = http::post(
        slave.get()->pid,
        "api/v1",
        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
        serialize(ContentType::PROTOBUF, call),
        stringify(ContentType::PROTOBUF));

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
  }

  ContentType messageContentType = GetParam();

  Option<http::Pipe::Reader> output;

  {
    v1::agent::Call call;
    call.set_type(v1::agent::Call::ATTACH_CONTAINER_OUTPUT);

    call.mutable_attach_container_output()->mutable_container_id()
      ->CopyFrom(containerId);

    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    headers["Accept"] = stringify(messageContentType);

    Future<http::Response> response = http::streaming::post(
        slave.get()->pid,
        "api/v1",
        headers,
        serialize(messageContentType, call),
        stringify(messageContentType));

    // Fatal on purpose: `response` is dereferenced right below, which
    // is only safe once the future is known to be ready.
    AWAIT_ASSERT_RESPONSE_STATUS_EQ(http::OK().status, response);

    ASSERT_SOME(response->reader);
    output = response->reader.get();
  }

  string data =
    "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
    "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
    "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
    "aliquip ex ea commodo consequat. Duis aute irure dolor in "
    "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
    "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
    "culpa qui officia deserunt mollit anim id est laborum.\n";

  // Terminal transforms "\n" to "\r\n".
  string stdoutExpected = strings::trim(data) + "\r\n";

  http::Pipe pipe;
  http::Pipe::Writer writer = pipe.writer();
  http::Pipe::Reader reader = pipe.reader();

  ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
      serialize, messageContentType, lambda::_1));

  // Prepare the data that needs to be streamed to the entrypoint
  // of the container. The first record on the stream only carries
  // the ID of the container to attach to.
  {
    v1::agent::Call call;
    call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);

    v1::agent::Call::AttachContainerInput* attach =
      call.mutable_attach_container_input();

    attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);
    attach->mutable_container_id()->CopyFrom(containerId);

    writer.write(encoder.encode(call));
  }

  // Send the payload as a sequence of STDIN records of at most
  // `chunkSize` bytes each.
  size_t offset = 0;
  size_t chunkSize = 4096;
  while (offset < data.length()) {
    string dataChunk = data.substr(offset, chunkSize);
    offset += chunkSize;

    v1::agent::Call call;
    call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);

    v1::agent::Call::AttachContainerInput* attach =
      call.mutable_attach_container_input();

    attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO);

    v1::agent::ProcessIO* processIO = attach->mutable_process_io();
    processIO->set_type(v1::agent::ProcessIO::DATA);
    processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
    processIO->mutable_data()->set_data(dataChunk);

    writer.write(encoder.encode(call));
  }

  // Signal `EOT` to the terminal so that it sends `EOF` to `cat` command.
  {
    v1::agent::Call call;
    call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);

    v1::agent::Call::AttachContainerInput* attach =
      call.mutable_attach_container_input();

    attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO);

    v1::agent::ProcessIO* processIO = attach->mutable_process_io();
    processIO->set_type(v1::agent::ProcessIO::DATA);
    processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
    processIO->mutable_data()->set_data("\x04");

    writer.write(encoder.encode(call));
  }

  writer.close();

  // TODO(anand): Add a `post()` overload that handles request streaming.
  {
    http::URL agent = http::URL(
        "http",
        slave.get()->pid.address.ip,
        slave.get()->pid.address.port,
        slave.get()->pid.id +
        "/api/v1");

    Future<http::Connection> _connection = http::connect(agent);
    AWAIT_READY(_connection);

    http::Connection connection = _connection.get(); // Remove const.

    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    headers["Content-Type"] = stringify(ContentType::RECORDIO);
    headers[MESSAGE_CONTENT_TYPE] = stringify(messageContentType);

    // The request body is streamed from the read end of the pipe that
    // was filled (and closed) above.
    http::Request request;
    request.url = agent;
    request.method = "POST";
    request.type = http::Request::PIPE;
    request.reader = reader;
    request.headers = headers;

    Future<http::Response> response = connection.send(request);
    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
  }

  ASSERT_SOME(output);

  Future<tuple<string, string>> received =
    getProcessIOData(messageContentType, output.get());

  AWAIT_READY(received);

  string stdoutReceived;
  string stderrReceived;

  tie(stdoutReceived, stderrReceived) = received.get();

  // `stdoutExpected` appears twice in stdout because the terminal in raw mode
  // echoes the data once and `cat` outputs it once.
  ASSERT_EQ(stdoutExpected + stdoutExpected, stdoutReceived);
  ASSERT_TRUE(stderrReceived.empty());
}
// This test launches a nested container session with 'cat' as its
// entrypoint and verifies that any data streamed to the container via
// an ATTACH_CONTAINER_INPUT call is received by the client on the
// output stream.
TEST_P_TEMP_DISABLED_ON_WINDOWS(
    AgentAPIStreamingTest,
    AttachInputToNestedContainerSession)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  slave::Flags flags = CreateSlaveFlags();

  Fetcher fetcher(flags);

  Try<MesosContainerizer*> _containerizer =
    MesosContainerizer::create(flags, false, &fetcher);

  ASSERT_SOME(_containerizer);

  Owned<slave::Containerizer> containerizer(_containerizer.get());

  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> slave =
    StartSlave(detector.get(), containerizer.get(), flags);

  ASSERT_SOME(slave);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched, registered(&driver, _, _));

  Future<vector<Offer>> offers;
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers));

  driver.start();

  AWAIT_READY(offers);

  // This must be fatal: the first offer is indexed unconditionally
  // right below, which would be out of bounds for an empty vector.
  ASSERT_FALSE(offers->empty());

  const Offer& offer = offers.get()[0];

  Future<TaskStatus> status;
  EXPECT_CALL(sched, statusUpdate(_, _))
    .WillOnce(FutureArg<1>(&status));

  TaskInfo taskInfo = createTask(offer, "sleep 1000");

  driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});

  AWAIT_READY(status);
  ASSERT_EQ(TASK_RUNNING, status->state());

  Future<hashset<ContainerID>> containerIds = containerizer->containers();
  AWAIT_READY(containerIds);

  // Fatal on purpose: `begin()` is dereferenced right below, which is
  // invalid if the set turns out to be empty.
  ASSERT_EQ(1u, containerIds->size());

  // The nested container gets a fresh random ID and is parented to the
  // single container reported by the containerizer.
  v1::ContainerID containerId;
  containerId.set_value(UUID::random().toString());
  containerId.mutable_parent()->set_value(containerIds->begin()->value());

  ContentType messageContentType = GetParam();

  Future<http::Response> sessionResponse;

  // Start a new LAUNCH_NESTED_CONTAINER_SESSION with `cat` as the
  // command being launched.
  {
    v1::agent::Call call;
    call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);

    call.mutable_launch_nested_container_session()->mutable_container_id()
      ->CopyFrom(containerId);

    call.mutable_launch_nested_container_session()->mutable_command()
      ->CopyFrom(v1::createCommandInfo("cat"));

    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    headers["Accept"] = stringify(ContentType::RECORDIO);
    headers[MESSAGE_ACCEPT] = stringify(messageContentType);

    sessionResponse = http::streaming::post(
        slave.get()->pid,
        "api/v1",
        headers,
        serialize(messageContentType, call),
        stringify(messageContentType));

    // Fatal on purpose: `sessionResponse` is dereferenced further down
    // to obtain the output reader, which is only safe once the future
    // is known to be ready.
    AWAIT_ASSERT_RESPONSE_STATUS_EQ(http::OK().status, sessionResponse);
  }

  // Prepare the data to send to `cat` and send it over an
  // `ATTACH_CONTAINER_INPUT` stream.
  string data =
    "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
    "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
    "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
    "aliquip ex ea commodo consequat. Duis aute irure dolor in "
    "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
    "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
    "culpa qui officia deserunt mollit anim id est laborum.";

  // Keep doubling the payload until it is at least 1MB so that it is
  // split across many chunks below.
  while (Bytes(data.size()) < Megabytes(1)) {
    data.append(data);
  }

  http::Pipe pipe;
  http::Pipe::Writer writer = pipe.writer();
  http::Pipe::Reader reader = pipe.reader();

  ::recordio::Encoder<v1::agent::Call> encoder(lambda::bind(
      serialize, messageContentType, lambda::_1));

  // The first record on the stream only carries the ID of the
  // container to attach to.
  {
    v1::agent::Call call;
    call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);

    v1::agent::Call::AttachContainerInput* attach =
      call.mutable_attach_container_input();

    attach->set_type(v1::agent::Call::AttachContainerInput::CONTAINER_ID);
    attach->mutable_container_id()->CopyFrom(containerId);

    writer.write(encoder.encode(call));
  }

  // Send the payload as a sequence of STDIN records of at most
  // `chunkSize` bytes each.
  size_t offset = 0;
  size_t chunkSize = 4096;
  while (offset < data.length()) {
    string dataChunk = data.substr(offset, chunkSize);
    offset += chunkSize;

    v1::agent::Call call;
    call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);

    v1::agent::Call::AttachContainerInput* attach =
      call.mutable_attach_container_input();

    attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO);

    v1::agent::ProcessIO* processIO = attach->mutable_process_io();
    processIO->set_type(v1::agent::ProcessIO::DATA);
    processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
    processIO->mutable_data()->set_data(dataChunk);

    writer.write(encoder.encode(call));
  }

  // Signal `EOF` to the 'cat' command.
  {
    v1::agent::Call call;
    call.set_type(v1::agent::Call::ATTACH_CONTAINER_INPUT);

    v1::agent::Call::AttachContainerInput* attach =
      call.mutable_attach_container_input();

    attach->set_type(v1::agent::Call::AttachContainerInput::PROCESS_IO);

    v1::agent::ProcessIO* processIO = attach->mutable_process_io();
    processIO->set_type(v1::agent::ProcessIO::DATA);
    processIO->mutable_data()->set_type(v1::agent::ProcessIO::Data::STDIN);
    processIO->mutable_data()->set_data("");

    writer.write(encoder.encode(call));
  }

  writer.close();

  {
    // TODO(anand): Add a `post()` overload that handles request streaming.
    http::URL agent = http::URL(
        "http",
        slave.get()->pid.address.ip,
        slave.get()->pid.address.port,
        slave.get()->pid.id +
        "/api/v1");

    Future<http::Connection> _connection = http::connect(agent);
    AWAIT_READY(_connection);

    http::Connection connection = _connection.get(); // Remove const.

    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    headers["Content-Type"] = stringify(ContentType::RECORDIO);
    headers[MESSAGE_CONTENT_TYPE] = stringify(messageContentType);

    // The request body is streamed from the read end of the pipe that
    // was filled (and closed) above.
    http::Request request;
    request.url = agent;
    request.method = "POST";
    request.type = http::Request::PIPE;
    request.reader = reader;
    request.headers = headers;

    Future<http::Response> response = connection.send(request);
    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
  }

  // Read the output from the LAUNCH_NESTED_CONTAINER_SESSION.
  ASSERT_SOME(sessionResponse->reader);

  Option<http::Pipe::Reader> output = sessionResponse->reader.get();
  ASSERT_SOME(output);

  Future<tuple<string, string>> received =
    getProcessIOData(messageContentType, output.get());

  AWAIT_READY(received);

  string stdoutReceived;
  string stderrReceived;

  tie(stdoutReceived, stderrReceived) = received.get();

  // Verify the output matches what we sent.
  ASSERT_TRUE(stderrReceived.empty());
  ASSERT_EQ(data, stdoutReceived);
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {