// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <list>
#include <map>
#include <string>
#include <vector>
#include <gmock/gmock.h>
#include <mesos/executor.hpp>
#include <mesos/resources.hpp>
#include <mesos/scheduler.hpp>
#include <mesos/v1/executor.hpp>
#include <mesos/v1/mesos.hpp>
#include <mesos/v1/scheduler.hpp>
#include <process/collect.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/http.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <process/timeout.hpp>
#include <stout/duration.hpp>
#include <stout/gtest.hpp>
#include <stout/nothing.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/os/realpath.hpp>
#ifdef __linux__
#include "linux/fs.hpp"
#endif
#include "logging/logging.hpp"
#include "local/local.hpp"
#include "master/master.hpp"
#include "slave/constants.hpp"
#include "slave/flags.hpp"
#include "slave/gc.hpp"
#include "slave/gc_process.hpp"
#include "slave/paths.hpp"
#include "slave/slave.hpp"
#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
#include "tests/utils.hpp"
using mesos::internal::master::Master;
using mesos::internal::slave::GarbageCollector;
using mesos::internal::slave::GarbageCollectorProcess;
using mesos::internal::slave::Slave;
using mesos::master::detector::MasterDetector;
using process::Clock;
using process::Future;
using process::Owned;
using process::PID;
using process::Timeout;
using std::list;
using std::map;
using std::string;
using std::vector;
using testing::_;
using testing::AllOf;
using testing::AtMost;
using testing::DoAll;
using testing::Return;
using testing::SaveArg;
namespace mesos {
namespace internal {
namespace tests {
class GarbageCollectorTest : public TemporaryDirectoryTest {};
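// The tests below exercise the GarbageCollector interface roughly as
// follows (a sketch paraphrased from its usage in this file rather than
// from the header, so treat the exact signatures as assumptions):
//
//   GarbageCollector gc(workDir);
//   Future<Nothing> removal = gc.schedule(delay, path);  // Removed after 'delay'.
//   Future<bool> unscheduled = gc.unschedule(path);      // Cancels a pending removal.
//   gc.prune(maxDelay);  // Forces removal of paths whose remaining delay
//                        // is within 'maxDelay'.
//
// The libprocess clock is paused and advanced manually so that removals
// trigger deterministically.
// This test verifies that scheduled paths are removed once their delays
// elapse and that the gc/path_removals_* metrics are updated accordingly.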
TEST_F(GarbageCollectorTest, Schedule)
{
GarbageCollector gc("work_dir");
// Make some temporary files to gc.
const string& file1 = "file1";
const string& file2 = "file2";
const string& file3 = "file3";
ASSERT_SOME(os::touch(file1));
ASSERT_SOME(os::touch(file2));
ASSERT_SOME(os::touch(file3));
ASSERT_TRUE(os::exists(file1));
ASSERT_TRUE(os::exists(file2));
ASSERT_TRUE(os::exists(file3));
Clock::pause();
Future<Nothing> scheduleDispatch1 =
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
Future<Nothing> scheduleDispatch2 =
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
Future<Nothing> scheduleDispatch3 =
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
// Schedule the gc operations.
Future<Nothing> schedule1 = gc.schedule(Seconds(10), file1);
Future<Nothing> schedule2 = gc.schedule(Seconds(10), file2);
Future<Nothing> schedule3 = gc.schedule(Seconds(15), file3);
// Ensure the dispatches are completed before advancing the clock.
AWAIT_READY(scheduleDispatch1);
AWAIT_READY(scheduleDispatch2);
AWAIT_READY(scheduleDispatch3);
Clock::settle();
JSON::Object metrics = Metrics();
ASSERT_EQ(1u, metrics.values.count("gc/path_removals_pending"));
EXPECT_SOME_EQ(
3u,
metrics.at<JSON::Number>("gc/path_removals_pending"));
// Advance the clock to trigger the GC of file1 and file2.
Clock::advance(Seconds(10));
Clock::settle();
AWAIT_READY(schedule1);
AWAIT_READY(schedule2);
ASSERT_TRUE(schedule3.isPending());
EXPECT_FALSE(os::exists(file1));
EXPECT_FALSE(os::exists(file2));
EXPECT_TRUE(os::exists(file3));
// Trigger the GC of file3.
Clock::advance(Seconds(5));
Clock::settle();
AWAIT_READY(schedule3);
EXPECT_FALSE(os::exists(file3));
metrics = Metrics();
ASSERT_EQ(1u, metrics.values.count("gc/path_removals_pending"));
ASSERT_EQ(1u, metrics.values.count("gc/path_removals_succeeded"));
ASSERT_EQ(1u, metrics.values.count("gc/path_removals_failed"));
EXPECT_SOME_EQ(
0u,
metrics.at<JSON::Number>("gc/path_removals_pending"));
EXPECT_SOME_EQ(
3u,
metrics.at<JSON::Number>("gc/path_removals_succeeded"));
EXPECT_SOME_EQ(
0u,
metrics.at<JSON::Number>("gc/path_removals_failed"));
Clock::resume();
}
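// This test verifies that unscheduling a path discards its pending GC
// future and leaves the file on disk even after the delay elapses.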
TEST_F(GarbageCollectorTest, Unschedule)
{
GarbageCollector gc("work_dir");
// Attempt to unschedule a file that is not scheduled.
AWAIT_ASSERT_FALSE(gc.unschedule("bogus"));
// Make some temporary files to gc.
const string& file1 = "file1";
const string& file2 = "file2";
const string& file3 = "file3";
ASSERT_SOME(os::touch(file1));
ASSERT_SOME(os::touch(file2));
ASSERT_SOME(os::touch(file3));
ASSERT_TRUE(os::exists(file1));
ASSERT_TRUE(os::exists(file2));
ASSERT_TRUE(os::exists(file3));
Clock::pause();
// Schedule the gc operations.
Future<Nothing> schedule1 = gc.schedule(Seconds(10), file1);
Future<Nothing> schedule2 = gc.schedule(Seconds(10), file2);
Future<Nothing> schedule3 = gc.schedule(Seconds(10), file3);
// Unschedule each operation.
AWAIT_ASSERT_TRUE(gc.unschedule(file2));
AWAIT_ASSERT_TRUE(gc.unschedule(file3));
AWAIT_ASSERT_TRUE(gc.unschedule(file1));
// Advance the clock to ensure nothing was GCed.
Clock::advance(Seconds(10));
Clock::settle();
// The unscheduling will have discarded the GC futures.
AWAIT_DISCARDED(schedule1);
AWAIT_DISCARDED(schedule2);
AWAIT_DISCARDED(schedule3);
EXPECT_TRUE(os::exists(file1));
EXPECT_TRUE(os::exists(file2));
EXPECT_TRUE(os::exists(file3));
Clock::resume();
}
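// This test verifies that prune() immediately removes paths whose
// remaining delay falls within the given duration, while unscheduled
// paths and paths with longer delays are left untouched.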
TEST_F(GarbageCollectorTest, Prune)
{
GarbageCollector gc("work_dir");
// Make some temporary files to prune.
const string& file1 = "file1";
const string& file2 = "file2";
const string& file3 = "file3";
const string& file4 = "file4";
ASSERT_SOME(os::touch(file1));
ASSERT_SOME(os::touch(file2));
ASSERT_SOME(os::touch(file3));
ASSERT_SOME(os::touch(file4));
ASSERT_TRUE(os::exists(file1));
ASSERT_TRUE(os::exists(file2));
ASSERT_TRUE(os::exists(file3));
ASSERT_TRUE(os::exists(file4));
Clock::pause();
Future<Nothing> schedule1 = gc.schedule(Seconds(10), file1);
Future<Nothing> schedule2 = gc.schedule(Seconds(10), file2);
Future<Nothing> schedule3 = gc.schedule(Seconds(15), file3);
Future<Nothing> schedule4 = gc.schedule(Seconds(15), file4);
AWAIT_ASSERT_TRUE(gc.unschedule(file3));
AWAIT_DISCARDED(schedule3);
// Prune file1 and file2.
gc.prune(Seconds(10));
AWAIT_READY(schedule1);
AWAIT_READY(schedule2);
ASSERT_TRUE(schedule4.isPending());
// Both file1 and file2 will have been removed.
EXPECT_FALSE(os::exists(file1));
EXPECT_FALSE(os::exists(file2));
EXPECT_TRUE(os::exists(file3));
EXPECT_TRUE(os::exists(file4));
// Prune file4.
gc.prune(Seconds(15));
AWAIT_READY(schedule4);
EXPECT_FALSE(os::exists(file4));
Clock::resume();
}
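// The tests below exercise garbage collection end-to-end with a real
// master, agent, and framework.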
class GarbageCollectorIntegrationTest : public MesosTest {};
// This test ensures that garbage collection does not remove
// the slave working directory after a slave restart.
TEST_F(GarbageCollectorIntegrationTest, Restart)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
// Need to create our own flags because we want to reuse them when
// we (re)start the slave below.
slave::Flags flags = CreateSlaveFlags();
// Set the `executor_shutdown_grace_period` to a small value so that
// the agent does not wait for executors to clean up for too long.
flags.executor_shutdown_grace_period = Milliseconds(50);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &containerizer, flags);
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(_, _, _));
Resources resources = Resources::parse(flags.resources.get()).get();
double cpus = resources.get<Value::Scalar>("cpus")->value();
double mem = resources.get<Value::Scalar>("mem")->value();
EXPECT_CALL(sched, resourceOffers(_, _))
.WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, cpus, mem, "*"))
.WillRepeatedly(Return()); // Ignore subsequent offers.
// Ignore offerRescinded calls. The scheduler might receive them
// because the slave might reregister due to a ping timeout.
EXPECT_CALL(sched, offerRescinded(_, _))
.WillRepeatedly(Return());
EXPECT_CALL(exec, registered(_, _, _, _));
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status));
driver.start();
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status->state());
// Make sure directory exists. Need to do this AFTER getting a
// status update for a task because the directory won't get created
// until the task is launched. We get the slave ID from the
// SlaveRegisteredMessage.
const string& slaveDir = slave::paths::getSlavePath(
flags.work_dir,
slaveRegisteredMessage->slave_id());
ASSERT_TRUE(os::exists(slaveDir));
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
EXPECT_CALL(sched, statusUpdate(_, _))
.Times(AtMost(1)); // Ignore TASK_LOST from killed executor.
Future<Nothing> slaveLost;
EXPECT_CALL(sched, slaveLost(_, _))
.WillOnce(FutureSatisfy(&slaveLost));
// Stop the slave with an explicit shutdown; otherwise, with
// checkpointing enabled, the master will wait for the slave to reconnect.
slave.get()->shutdown();
AWAIT_READY(slaveLost);
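// Restart the agent with the same flags (and hence the same work
// directory); after recovery the old slave directory should survive
// garbage collection.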
Clock::pause();
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
// Wait for the agent to finish recovery.
AWAIT_READY(__recover);
Clock::settle();
Clock::advance(flags.gc_delay);
Clock::settle();
// By this time the old slave directory should not have been cleaned up.
ASSERT_TRUE(os::exists(slaveDir));
Clock::resume();
driver.stop();
driver.join();
}
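// This test verifies that a framework's directory on the agent is
// garbage collected (and disappears from the agent's files endpoint)
// after the framework exits and the GC delay elapses.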
TEST_F(GarbageCollectorIntegrationTest, ExitedFramework)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
slave::Flags flags = CreateSlaveFlags();
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &containerizer, flags);
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
SlaveID slaveId = slaveRegisteredMessage->slave_id();
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
// Scheduler expectations.
FrameworkID frameworkId;
EXPECT_CALL(sched, registered(_, _, _))
.WillOnce(SaveArg<1>(&frameworkId));
Resources resources = Resources::parse(flags.resources.get()).get();
double cpus = resources.get<Value::Scalar>("cpus")->value();
double mem = resources.get<Value::Scalar>("mem")->value();
EXPECT_CALL(sched, resourceOffers(_, _))
.WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, cpus, mem, "*"))
.WillRepeatedly(Return());
// Executor expectations.
EXPECT_CALL(exec, registered(_, _, _, _))
.WillRepeatedly(Return());
EXPECT_CALL(exec, launchTask(_, _))
.WillRepeatedly(SendStatusUpdateFromTask(TASK_RUNNING));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status))
.WillRepeatedly(Return()); // Ignore subsequent updates.
Future<Nothing> executorLaunched =
FUTURE_DISPATCH(_, &Slave::executorLaunched);
driver.start();
// Wait until the slave has been notified about the start of the
// executor. There is a race where the slave might receive status
// updates before it is notified about the start of the executor.
// This matters here because if we shut down the framework without
// waiting, the 'executorLaunched' event might be received after the
// slave gets 'shutdownFramework', leading to shutdown and eventually
// GC of the executor and framework directories. We want the GC to
// happen only after we have set up the expectation on 'gc.schedule'.
AWAIT_READY(executorLaunched);
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status->state());
Future<Nothing> shutdown;
EXPECT_CALL(exec, shutdown(_))
.WillOnce(FutureSatisfy(&shutdown));
// Shutdown the framework.
driver.stop();
driver.join();
Clock::pause();
AWAIT_READY(shutdown);
Clock::settle(); // Wait for Slave::shutdownExecutor to complete.
Future<Nothing> schedule =
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
// Advance clock to kill executor via isolator.
Clock::advance(flags.executor_shutdown_grace_period);
Clock::settle();
AWAIT_READY(schedule);
Clock::settle(); // Wait for GarbageCollectorProcess::schedule to complete.
Clock::advance(flags.gc_delay);
Clock::settle();
// Framework's directory should be gc'ed by now.
const string& frameworkDir = slave::paths::getFrameworkPath(
flags.work_dir, slaveId, frameworkId);
ASSERT_FALSE(os::exists(frameworkDir));
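// The agent's files endpoint should no longer serve the GC'ed
// framework directory.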
process::UPID filesUpid("files", process::address());
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
process::http::NotFound().status,
process::http::get(
filesUpid,
"browse",
"path=" + frameworkDir,
createBasicAuthHeaders(DEFAULT_CREDENTIAL)));
Clock::resume();
}
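// This test verifies that an exited executor's directories are
// scheduled for GC and removed after the GC delay, and that they are
// no longer browsable via the agent's files endpoint.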
TEST_F(GarbageCollectorIntegrationTest, ExitedExecutor)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
slave::Flags flags = CreateSlaveFlags();
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &containerizer, flags);
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
SlaveID slaveId = slaveRegisteredMessage->slave_id();
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(_, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Resources resources = Resources::parse(flags.resources.get()).get();
double cpus = resources.get<Value::Scalar>("cpus")->value();
double mem = resources.get<Value::Scalar>("mem")->value();
EXPECT_CALL(sched, resourceOffers(_, _))
.WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, cpus, mem, "*"))
.WillRepeatedly(Return()); // Ignore subsequent offers.
// Ignore offerRescinded calls. The scheduler might receive them
// because the slave might reregister due to a ping timeout.
EXPECT_CALL(sched, offerRescinded(_, _))
.WillRepeatedly(Return());
EXPECT_CALL(exec, registered(_, _, _, _));
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status));
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status->state());
const string& executorDir = slave::paths::getExecutorPath(
flags.work_dir, slaveId, frameworkId.get(), DEFAULT_EXECUTOR_ID);
ASSERT_TRUE(os::exists(executorDir));
const string& latestDir = slave::paths::getExecutorLatestRunPath(
flags.work_dir, slaveId, frameworkId.get(), DEFAULT_EXECUTOR_ID);
process::UPID latest("files", process::address());
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
process::http::OK().status,
process::http::get(
latest,
"browse",
"path=" + latestDir,
createBasicAuthHeaders(DEFAULT_CREDENTIAL)));
Clock::pause();
// Killing the executor will cause the slave to schedule its
// directory to get garbage collected.
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
Future<Nothing> schedule =
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
EXPECT_CALL(sched, statusUpdate(_, _))
.Times(AtMost(1)); // Ignore TASK_LOST from killed executor.
EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _));
// Kill the executor and inform the slave.
containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);
AWAIT_READY(schedule);
Clock::settle(); // Wait for GarbageCollectorProcess::schedule to complete.
Clock::advance(flags.gc_delay);
Clock::settle();
// Executor's directory should be gc'ed by now.
ASSERT_FALSE(os::exists(executorDir));
process::UPID files("files", process::address());
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
process::http::NotFound().status,
process::http::get(
files,
"browse",
"path=" + executorDir,
createBasicAuthHeaders(DEFAULT_CREDENTIAL)));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
process::http::NotFound().status,
process::http::get(
latest,
"browse",
"path=" + latestDir,
createBasicAuthHeaders(DEFAULT_CREDENTIAL)));
Clock::resume();
driver.stop();
driver.join();
}
// This test verifies that task metadata and sandboxes are scheduled for GC
// when a task finishes while the executor is still running.
TEST_F(GarbageCollectorIntegrationTest, LongLivedDefaultExecutor)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
// We need this for the agent's work directory and GC policy.
slave::Flags flags = CreateSlaveFlags();
// Turn on GC of nested container sandboxes.
flags.gc_non_executor_container_sandboxes = true;
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
// Enable checkpointing; otherwise there will be no metadata to GC.
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true);
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
Future<v1::scheduler::Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return());
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
v1::scheduler::TestMesos mesos(
master.get()->pid,
ContentType::PROTOBUF,
scheduler);
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
v1::Resources resources =
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
v1::DEFAULT_EXECUTOR_ID,
None(),
resources,
v1::ExecutorInfo::DEFAULT,
frameworkId);
// We launch two tasks for this test:
// * One will be a long-lived task to keep the executor alive.
// * One will be a short-lived task to exercise task metadata/sandbox GC.
v1::TaskInfo longLivedTaskInfo =
v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
v1::TaskInfo shortLivedTaskInfo =
v1::createTask(agentId, resources, "exit 0");
// There should be a total of 5 updates:
// * TASK_STARTING/RUNNING from the long-lived task,
// * TASK_STARTING/RUNNING/FINISHED from the short-lived task.
testing::Sequence longTask;
Future<v1::scheduler::Event::Update> longStartingUpdate;
Future<v1::scheduler::Event::Update> longRunningUpdate;
EXPECT_CALL(
*scheduler,
update(_, AllOf(
TaskStatusUpdateTaskIdEq(longLivedTaskInfo.task_id()),
TaskStatusUpdateStateEq(v1::TASK_STARTING))))
.InSequence(longTask)
.WillOnce(DoAll(
FutureArg<1>(&longStartingUpdate),
v1::scheduler::SendAcknowledge(frameworkId, agentId)));
EXPECT_CALL(
*scheduler,
update(_, AllOf(
TaskStatusUpdateTaskIdEq(longLivedTaskInfo.task_id()),
TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
.InSequence(longTask)
.WillOnce(DoAll(
FutureArg<1>(&longRunningUpdate),
v1::scheduler::SendAcknowledge(frameworkId, agentId)));
testing::Sequence shortTask;
Future<v1::scheduler::Event::Update> shortStartingUpdate;
Future<v1::scheduler::Event::Update> shortRunningUpdate;
Future<v1::scheduler::Event::Update> shortFinishedUpdate;
EXPECT_CALL(
*scheduler,
update(_, AllOf(
TaskStatusUpdateTaskIdEq(shortLivedTaskInfo.task_id()),
TaskStatusUpdateStateEq(v1::TASK_STARTING))))
.InSequence(shortTask)
.WillOnce(DoAll(
FutureArg<1>(&shortStartingUpdate),
v1::scheduler::SendAcknowledge(frameworkId, agentId)));
EXPECT_CALL(
*scheduler,
update(_, AllOf(
TaskStatusUpdateTaskIdEq(shortLivedTaskInfo.task_id()),
TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
.InSequence(shortTask)
.WillOnce(DoAll(
FutureArg<1>(&shortRunningUpdate),
v1::scheduler::SendAcknowledge(frameworkId, agentId)));
EXPECT_CALL(
*scheduler,
update(_, AllOf(
TaskStatusUpdateTaskIdEq(shortLivedTaskInfo.task_id()),
TaskStatusUpdateStateEq(v1::TASK_FINISHED))))
.InSequence(shortTask)
.WillOnce(DoAll(
FutureArg<1>(&shortFinishedUpdate),
v1::scheduler::SendAcknowledge(frameworkId, agentId)));
// There should be two directories scheduled for GC:
// the short-lived task's metadata and sandbox.
vector<Future<Nothing>> schedules = {
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule),
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule)
};
mesos.send(
v1::createCallAccept(
frameworkId,
offer,
{
v1::LAUNCH_GROUP(
executorInfo, v1::createTaskGroupInfo({longLivedTaskInfo})),
v1::LAUNCH_GROUP(
executorInfo, v1::createTaskGroupInfo({shortLivedTaskInfo}))
}));
AWAIT_READY(collect(schedules));
AWAIT_READY(longStartingUpdate);
AWAIT_READY(longRunningUpdate);
AWAIT_READY(shortStartingUpdate);
AWAIT_READY(shortRunningUpdate);
AWAIT_READY(shortFinishedUpdate);
// Check that the short-lived task's metadata and sandbox exist.
string shortLivedTaskPath = slave::paths::getTaskPath(
slave::paths::getMetaRootDir(flags.work_dir),
devolve(agentId),
devolve(frameworkId),
devolve(executorInfo.executor_id()),
devolve(
shortStartingUpdate->status()
.container_status().container_id().parent()),
devolve(shortLivedTaskInfo.task_id()));
ASSERT_TRUE(os::exists(shortLivedTaskPath));
string shortLivedSandboxPath = path::join(
slave::paths::getExecutorRunPath(
flags.work_dir,
devolve(agentId),
devolve(frameworkId),
devolve(executorInfo.executor_id()),
devolve(
shortStartingUpdate->status()
.container_status().container_id().parent())),
"containers",
shortStartingUpdate->status().container_status().container_id().value());
ASSERT_TRUE(os::exists(shortLivedSandboxPath));
// Check another metadata directory that should only be GC'd after the
// executor exits.
string executorMetaPath = slave::paths::getExecutorPath(
slave::paths::getMetaRootDir(flags.work_dir),
devolve(agentId),
devolve(frameworkId),
devolve(executorInfo.executor_id()));
ASSERT_TRUE(os::exists(executorMetaPath));
// Trigger garbage collection on the short-lived task's directories
// and check that those are properly deleted.
Clock::pause();
Clock::advance(flags.gc_delay);
Clock::settle();
Clock::resume();
ASSERT_FALSE(os::exists(shortLivedTaskPath));
ASSERT_FALSE(os::exists(shortLivedSandboxPath));
ASSERT_TRUE(os::exists(executorMetaPath));
// Kill the remaining task and trigger garbage collection again.
Future<v1::scheduler::Event::Update> killedUpdate;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(DoAll(
FutureArg<1>(&killedUpdate),
v1::scheduler::SendAcknowledge(frameworkId, agentId)));
// Since this is the last executor belonging to the framework, we expect
// multiple directories to be scheduled for GC:
// * Task, Executor container, Executor, and Framework metadata directories.
// * Executor sandbox and run directories.
// * Framework work directory.
schedules = {
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule),
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule),
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule),
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule),
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule),
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule),
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule)
};
mesos.send(v1::createCallKill(frameworkId, longLivedTaskInfo.task_id()));
AWAIT_READY(killedUpdate);
EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
EXPECT_EQ(longLivedTaskInfo.task_id(), killedUpdate->status().task_id());
AWAIT_READY(collect(schedules));
// Trigger GC and then check one of the directories above.
Clock::pause();
Clock::advance(flags.gc_delay);
Clock::settle();
Clock::resume();
ASSERT_FALSE(os::exists(executorMetaPath));
}
// This test verifies that task metadata and sandboxes are scheduled for GC
// when a task finishes while the executor is still running. This version of
// the test restarts the agent to ensure that recovered tasks are also
// scheduled for GC.
TEST_F(GarbageCollectorIntegrationTest, LongLivedDefaultExecutorRestart)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
// We need this for the agent's work directory and GC policy.
slave::Flags flags = CreateSlaveFlags();
// Turn on GC of nested container sandboxes.
flags.gc_non_executor_container_sandboxes = true;
// Start the slave with a static process ID. This allows the executor to
// reconnect with the slave upon a process restart.
const string id(process::ID::generate("agent"));
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), id, flags, false);
ASSERT_SOME(slave);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
// Enable checkpointing; otherwise there will be no metadata to GC.
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(true);
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
Future<v1::scheduler::Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return());
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
v1::scheduler::TestMesos mesos(
master.get()->pid,
ContentType::PROTOBUF,
scheduler);
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
v1::Resources resources =
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
v1::DEFAULT_EXECUTOR_ID,
None(),
resources,
v1::ExecutorInfo::DEFAULT,
frameworkId);
// We launch two tasks for this test:
// * One will be a long-lived task to keep the executor alive.
// * One will be a short-lived task to exercise task metadata/sandbox GC.
v1::TaskInfo longLivedTaskInfo =
v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
v1::TaskInfo shortLivedTaskInfo =
v1::createTask(agentId, resources, "exit 0");
// There should be a total of 5 updates:
// * TASK_STARTING/RUNNING from the long-lived task,
// * TASK_STARTING/RUNNING/FINISHED from the short-lived task.
testing::Sequence longTask;
Future<v1::scheduler::Event::Update> longStartingUpdate;
Future<v1::scheduler::Event::Update> longRunningUpdate;
EXPECT_CALL(
*scheduler,
update(_, AllOf(
TaskStatusUpdateTaskIdEq(longLivedTaskInfo.task_id()),
TaskStatusUpdateStateEq(v1::TASK_STARTING))))
.InSequence(longTask)
.WillOnce(DoAll(
FutureArg<1>(&longStartingUpdate),
v1::scheduler::SendAcknowledge(frameworkId, agentId)));
EXPECT_CALL(
*scheduler,
update(_, AllOf(
TaskStatusUpdateTaskIdEq(longLivedTaskInfo.task_id()),
TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
.InSequence(longTask)
.WillOnce(DoAll(
FutureArg<1>(&longRunningUpdate),
v1::scheduler::SendAcknowledge(frameworkId, agentId)));
testing::Sequence shortTask;
Future<v1::scheduler::Event::Update> shortStartingUpdate;
Future<v1::scheduler::Event::Update> shortRunningUpdate;
Future<v1::scheduler::Event::Update> shortFinishedUpdate;
EXPECT_CALL(
*scheduler,
update(_, AllOf(
TaskStatusUpdateTaskIdEq(shortLivedTaskInfo.task_id()),
TaskStatusUpdateStateEq(v1::TASK_STARTING))))
.InSequence(shortTask)
.WillOnce(DoAll(
FutureArg<1>(&shortStartingUpdate),
v1::scheduler::SendAcknowledge(frameworkId, agentId)));
EXPECT_CALL(
*scheduler,
update(_, AllOf(
TaskStatusUpdateTaskIdEq(shortLivedTaskInfo.task_id()),
TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
.InSequence(shortTask)
.WillOnce(DoAll(
FutureArg<1>(&shortRunningUpdate),
v1::scheduler::SendAcknowledge(frameworkId, agentId)));
EXPECT_CALL(
*scheduler,
update(_, AllOf(
TaskStatusUpdateTaskIdEq(shortLivedTaskInfo.task_id()),
TaskStatusUpdateStateEq(v1::TASK_FINISHED))))
.InSequence(shortTask)
.WillOnce(DoAll(
FutureArg<1>(&shortFinishedUpdate),
v1::scheduler::SendAcknowledge(frameworkId, agentId)));
// There should be two directories scheduled for GC:
// the short-lived task's metadata and sandbox.
vector<Future<Nothing>> schedules = {
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule),
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule)
};
mesos.send(
v1::createCallAccept(
frameworkId,
offer,
{
v1::LAUNCH_GROUP(
executorInfo, v1::createTaskGroupInfo({longLivedTaskInfo})),
v1::LAUNCH_GROUP(
executorInfo, v1::createTaskGroupInfo({shortLivedTaskInfo}))
}));
AWAIT_READY(collect(schedules));
AWAIT_READY(longStartingUpdate);
AWAIT_READY(longRunningUpdate);
AWAIT_READY(shortStartingUpdate);
AWAIT_READY(shortRunningUpdate);
AWAIT_READY(shortFinishedUpdate);
// Check that the short-lived task's metadata and sandbox exist.
string shortLivedTaskPath = slave::paths::getTaskPath(
slave::paths::getMetaRootDir(flags.work_dir),
devolve(agentId),
devolve(frameworkId),
devolve(executorInfo.executor_id()),
devolve(
shortStartingUpdate->status()
.container_status().container_id().parent()),
devolve(shortLivedTaskInfo.task_id()));
ASSERT_TRUE(os::exists(shortLivedTaskPath));
string shortLivedSandboxPath = path::join(
slave::paths::getExecutorRunPath(
flags.work_dir,
devolve(agentId),
devolve(frameworkId),
devolve(executorInfo.executor_id()),
devolve(
shortStartingUpdate->status()
.container_status().container_id().parent())),
"containers",
shortStartingUpdate->status().container_status().container_id().value());
ASSERT_TRUE(os::exists(shortLivedSandboxPath));
// Restart the agent to wipe out any scheduled GC.
Future<SlaveReregisteredMessage> slaveReregisteredMessage =
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
slave.get()->terminate();
// The agent should reregister once recovery is complete, which also means
// that any finished tasks' metadata/sandboxes should be rescheduled for GC.
slave = StartSlave(detector.get(), id, flags, false);
ASSERT_SOME(slave);
AWAIT_READY(slaveReregisteredMessage);
// Trigger garbage collection on the short-lived task's directories
// and check that those are properly deleted.
Clock::pause();
Clock::advance(flags.gc_delay);
Clock::settle();
Clock::resume();
ASSERT_FALSE(os::exists(shortLivedTaskPath));
ASSERT_FALSE(os::exists(shortLivedSandboxPath));
}
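// This test verifies that when the agent detects high disk usage it
// prunes directories that are already scheduled for GC, without waiting
// for the full GC delay.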
TEST_F(GarbageCollectorIntegrationTest, DiskUsage)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
slave::Flags flags = CreateSlaveFlags();
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &containerizer, flags);
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
SlaveID slaveId = slaveRegisteredMessage->slave_id();
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(_, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Resources resources = Resources::parse(flags.resources.get()).get();
double cpus = resources.get<Value::Scalar>("cpus")->value();
double mem = resources.get<Value::Scalar>("mem")->value();
EXPECT_CALL(sched, resourceOffers(_, _))
.WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, cpus, mem, "*"))
.WillRepeatedly(Return()); // Ignore subsequent offers.
EXPECT_CALL(exec, registered(_, _, _, _));
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status));
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status->state());
const string& executorDir = slave::paths::getExecutorPath(
flags.work_dir, slaveId, frameworkId.get(), DEFAULT_EXECUTOR_ID);
ASSERT_TRUE(os::exists(executorDir));
Clock::pause();
// Killing the executor will cause the slave to schedule its
// directory to get garbage collected.
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
Future<Nothing> schedule =
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
EXPECT_CALL(sched, statusUpdate(_, _))
.Times(AtMost(1)); // Ignore TASK_LOST from killed executor.
EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, slaveId, _));
// Kill the executor and inform the slave.
containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);
AWAIT_READY(schedule);
Clock::settle(); // Wait for GarbageCollectorProcess::schedule to complete.
// We advance the clock here so that the 'removalTime' of the
// executor directory is definitely less than 'flags.gc_delay' when
// 'GarbageCollector::prune()' gets called (below). Otherwise, due
// to double comparison precision in 'prune()', the directory might
// not be deleted.
Clock::advance(Seconds(1));
Future<Nothing> _checkDiskUsage =
FUTURE_DISPATCH(_, &Slave::_checkDiskUsage);
// Simulate a disk full message to the slave.
process::dispatch(
slave.get()->pid,
&Slave::_checkDiskUsage,
Try<double>(1.0 - slave::GC_DISK_HEADROOM));
AWAIT_READY(_checkDiskUsage);
Clock::settle(); // Wait for Slave::_checkDiskUsage to complete.
// Executor's directory should be gc'ed by now.
ASSERT_FALSE(os::exists(executorDir));
process::UPID files("files", process::address());
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
process::http::NotFound().status,
process::http::get(
files,
"browse",
"path=" + executorDir,
createBasicAuthHeaders(DEFAULT_CREDENTIAL)));
Clock::resume();
driver.stop();
driver.join();
}
// This test verifies that the launch of a new executor results in
// unscheduling the framework work directory created by an old
// executor.
TEST_F(GarbageCollectorIntegrationTest, Unschedule)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Future<SlaveRegisteredMessage> slaveRegistered =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
ExecutorInfo executor1 = createExecutorInfo("executor-1", "exit 1");
ExecutorInfo executor2 = createExecutorInfo("executor-2", "exit 1");
MockExecutor exec1(executor1.executor_id());
MockExecutor exec2(executor2.executor_id());
hashmap<ExecutorID, Executor*> execs;
execs[executor1.executor_id()] = &exec1;
execs[executor2.executor_id()] = &exec2;
TestContainerizer containerizer(execs);
slave::Flags flags = CreateSlaveFlags();
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &containerizer, flags);
ASSERT_SOME(slave);
AWAIT_READY(slaveRegistered);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(_, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Resources resources = Resources::parse(flags.resources.get()).get();
double cpus = resources.get<Value::Scalar>("cpus")->value();
double mem = resources.get<Value::Scalar>("mem")->value();
EXPECT_CALL(sched, resourceOffers(_, _))
.WillOnce(LaunchTasks(executor1, 1, cpus, mem, "*"));
EXPECT_CALL(exec1, registered(_, _, _, _));
EXPECT_CALL(exec1, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status));
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status->state());
// TODO(benh/vinod): Would've been great to match the dispatch
// against arguments here.
// NOTE: Since Google Mock selects the last matching expectation
// that is still active, the order of (un)schedule expectations
// below are the reverse of the actual (un)schedule call order.
// Schedule framework work directory.
Future<Nothing> scheduleFrameworkWork =
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
// Schedule top level executor work directory.
Future<Nothing> scheduleExecutorWork =
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
// Schedule executor run work directory.
Future<Nothing> scheduleExecutorRunWork =
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
// Unschedule framework work directory.
Future<Nothing> unscheduleFrameworkWork =
FUTURE_DISPATCH(_, &GarbageCollectorProcess::unschedule);
// We ask the isolator to kill the first executor below.
EXPECT_CALL(exec1, shutdown(_))
.Times(AtMost(1));
EXPECT_CALL(sched, statusUpdate(_, _))
.Times(AtMost(2)); // Once for a TASK_LOST then once for TASK_RUNNING.
// We use the killed executor/tasks resources to run another task.
EXPECT_CALL(sched, resourceOffers(_, _))
.WillOnce(LaunchTasks(executor2, 1, cpus, mem, "*"));
EXPECT_CALL(exec2, registered(_, _, _, _));
EXPECT_CALL(exec2, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
EXPECT_CALL(sched, executorLost(&driver, exec1.id, _, _));
Clock::pause();
// Kill the first executor.
containerizer.destroy(frameworkId.get(), exec1.id);
AWAIT_READY(scheduleExecutorRunWork);
AWAIT_READY(scheduleExecutorWork);
AWAIT_READY(scheduleFrameworkWork);
// Speed up the allocator.
while (unscheduleFrameworkWork.isPending()) {
Clock::advance(Seconds(1));
Clock::settle();
}
AWAIT_READY(unscheduleFrameworkWork);
Clock::resume();
EXPECT_CALL(exec2, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
}
#ifdef __linux__
// This test creates a persistent volume and runs a task that bind-mounts the
// volume inside its sandbox to simulate a dangling mount that the agent failed
// to clean up (see MESOS-8830). We verify that the GC process unmounts the
// dangling mount point successfully and reports success in its metrics.
TEST_F(GarbageCollectorIntegrationTest, ROOT_DanglingMount)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
slave::Flags flags = CreateSlaveFlags();
flags.resources = strings::format("disk(%s):1024", DEFAULT_TEST_ROLE).get();
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched,
frameworkInfo,
master.get()->pid,
DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(_, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return());
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_FALSE(offers->empty());
const Offer& offer = offers->at(0);
string persistenceId = "persistence-id";
string containerPath = "path";
Resource volume = createPersistentVolume(
Megabytes(1024),
DEFAULT_TEST_ROLE,
persistenceId,
containerPath,
None(),
None(),
frameworkInfo.principal());
string mountPoint = "dangling";
string hostPath = slave::paths::getPersistentVolumePath(
flags.work_dir, DEFAULT_TEST_ROLE, persistenceId);
string fileInVolume = "foo.txt";
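// The task bind-mounts the persistent volume into its sandbox and then
// exits without unmounting, leaving behind the dangling mount that GC
// has to clean up.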
TaskInfo task = createTask(
offer.slave_id(),
Resources(volume),
"touch "+ path::join(containerPath, fileInVolume) + "; "
"mkdir " + mountPoint + "; "
"mount --bind " + hostPath + " " + mountPoint,
None(),
"test-task123",
"test-task123");
Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
Future<Nothing> schedule = FUTURE_DISPATCH(
_, &GarbageCollectorProcess::schedule);
Future<Nothing> ack1 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
Future<Nothing> ack2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.acceptOffers(
{offer.id()},
{CREATE(volume), LAUNCH({task})});
AWAIT_READY(status0);
EXPECT_EQ(task.task_id(), status0->task_id());
EXPECT_EQ(TASK_STARTING, status0->state());
AWAIT_READY(status1);
EXPECT_EQ(task.task_id(), status1->task_id());
EXPECT_EQ(TASK_RUNNING, status1->state());
ExecutorID executorId;
executorId.set_value("test-task123");
Result<string> _sandbox = os::realpath(slave::paths::getExecutorLatestRunPath(
flags.work_dir,
offer.slave_id(),
frameworkId.get(),
executorId));
ASSERT_SOME(_sandbox);
string sandbox = _sandbox.get();
EXPECT_TRUE(os::exists(sandbox));
// Wait for the task to create the dangling mount point.
Timeout timeout = Timeout::in(process::TEST_AWAIT_TIMEOUT);
while (!os::exists(path::join(sandbox, mountPoint)) &&
!timeout.expired()) {
os::sleep(Milliseconds(10));
}
ASSERT_TRUE(os::exists(path::join(sandbox, mountPoint)));
AWAIT_READY(status2);
ASSERT_EQ(task.task_id(), status2->task_id());
EXPECT_EQ(TASK_FINISHED, status2->state());
AWAIT_READY(schedule);
ASSERT_TRUE(os::exists(path::join(sandbox, mountPoint)));
ASSERT_TRUE(os::exists(path::join(hostPath, fileInVolume)));
AWAIT_READY(schedule);
Clock::pause();
Clock::advance(flags.gc_delay);
Clock::settle();
// Verify that the GC metrics show no failures.
JSON::Object metrics = Metrics();
ASSERT_EQ(1u, metrics.values.count("gc/path_removals_pending"));
ASSERT_EQ(1u, metrics.values.count("gc/path_removals_succeeded"));
ASSERT_EQ(1u, metrics.values.count("gc/path_removals_failed"));
EXPECT_SOME_EQ(
0u,
metrics.at<JSON::Number>("gc/path_removals_pending"));
EXPECT_SOME_EQ(
0u,
metrics.at<JSON::Number>("gc/path_removals_failed"));
ASSERT_SOME(metrics.at<JSON::Number>("gc/path_removals_succeeded"));
EXPECT_GT(
metrics.at<JSON::Number>("gc/path_removals_succeeded")->as<unsigned>(),
0u);
ASSERT_FALSE(os::exists(path::join(sandbox, mountPoint)));
ASSERT_TRUE(os::exists(path::join(hostPath, fileInVolume)));
Clock::resume();
driver.stop();
driver.join();
}
// This is a regression test for MESOS-9966. It enables the agent
// flag `--gc_non_executor_container_sandboxes`, launches a nested
// container with checkpointing disabled, and then verifies that the
// agent can recover successfully.
//
// TODO(qianzhang): For now, this test is Linux specific because
// the POSIX launcher is not able to destroy orphan containers
// after recovery, see MESOS-8771 for details.
TEST_F(GarbageCollectorIntegrationTest, ROOT_OrphanContainer)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
// Turn on GC of nested container sandboxes.
slave::Flags flags = CreateSlaveFlags();
flags.gc_non_executor_container_sandboxes = true;
flags.launcher = "linux";
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags, false);
ASSERT_SOME(slave);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
// Disable checkpointing so the container launched by the framework will be
// orphaned after the agent restarts.
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_checkpoint(false);
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
Future<v1::scheduler::Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return());
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
v1::scheduler::TestMesos mesos(
master.get()->pid,
ContentType::PROTOBUF,
scheduler);
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
v1::Resources resources =
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
v1::DEFAULT_EXECUTOR_ID,
None(),
resources,
v1::ExecutorInfo::DEFAULT,
frameworkId);
v1::TaskInfo taskInfo =
v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
Future<v1::scheduler::Event::Update> startingUpdate;
Future<v1::scheduler::Event::Update> runningUpdate;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(DoAll(
FutureArg<1>(&startingUpdate),
v1::scheduler::SendAcknowledge(frameworkId, agentId)))
.WillOnce(FutureArg<1>(&runningUpdate))
.WillRepeatedly(Return());
mesos.send(
v1::createCallAccept(
frameworkId,
offer,
{v1::LAUNCH_GROUP(
executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
AWAIT_READY(startingUpdate);
ASSERT_EQ(v1::TASK_STARTING, startingUpdate->status().state());
EXPECT_EQ(taskInfo.task_id(), startingUpdate->status().task_id());
EXPECT_TRUE(startingUpdate->status().has_timestamp());
AWAIT_READY(runningUpdate);
// Restart the agent.
Future<SlaveReregisteredMessage> slaveReregisteredMessage =
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
slave.get()->terminate();
slave = StartSlave(detector.get(), flags, false);
ASSERT_SOME(slave);
AWAIT_READY(slaveReregisteredMessage);
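// A successful reregistration implies the agent recovered despite the
// orphaned nested container left behind by the restart (MESOS-9966).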
}
#endif // __linux__
} // namespace tests {
} // namespace internal {
} // namespace mesos {