blob: da787370b1ed9ac86538cf8a0b8946d8df354478 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "tests/containerizer.hpp"
#include <mutex>
#include "stout/synchronized.hpp"
#include "tests/mesos.hpp"
using process::Failure;
using process::Future;
using process::Owned;
using process::http::Connection;
using std::map;
using std::shared_ptr;
using std::string;
using std::vector;
using testing::_;
using testing::Invoke;
using testing::Return;
using mesos::slave::ContainerClass;
using mesos::slave::ContainerConfig;
using mesos::slave::ContainerTermination;
using mesos::v1::executor::Mesos;
namespace mesos {
namespace internal {
namespace tests {
class TestContainerizerProcess
: public process::Process<TestContainerizerProcess>
{
public:
TestContainerizerProcess() {}
TestContainerizerProcess(
const ExecutorID& executorId,
const std::shared_ptr<v1::MockHTTPExecutor>& executor)
{
executors[executorId] = Owned<ExecutorData>(new ExecutorData());
executors.at(executorId)->v1ExecutorMock = executor;
}
TestContainerizerProcess(
const hashmap<ExecutorID, Executor*>& _executors)
{
foreachpair (const ExecutorID& executorId, Executor* executor, _executors) {
executors[executorId] = Owned<ExecutorData>(new ExecutorData());
executors.at(executorId)->executor = executor;
}
}
~TestContainerizerProcess() override
{
foreachvalue (const Owned<ExecutorData>& data, executors) {
if (data->driver.get() != nullptr) {
data->driver->stop();
data->driver->join();
}
}
}
Future<Nothing> recover(
const Option<slave::state::SlaveState>& state)
{
return Nothing();
}
Future<slave::Containerizer::LaunchResult> launch(
const ContainerID& containerId,
const ContainerConfig& containerConfig,
const map<string, string>& environment,
const Option<string>& pidCheckpointPath)
{
CHECK(!terminatedContainers.contains(containerId))
<< "Failed to launch nested container " << containerId
<< " for executor '" << containerConfig.executor_info().executor_id()
<< "' of framework " << containerConfig.executor_info().framework_id()
<< " because this ContainerID is being re-used with"
<< " a previously terminated container";
CHECK(!containers_.contains(containerId))
<< "Failed to launch container " << containerId
<< " for executor '" << containerConfig.executor_info().executor_id()
<< "' of framework " << containerConfig.executor_info().framework_id()
<< " because it is already launched";
containers_[containerId] = Owned<ContainerData>(new ContainerData());
if (containerId.has_parent()) {
// Launching a nested container via the test containerizer is a
// no-op for now.
return slave::Containerizer::LaunchResult::SUCCESS;
}
CHECK(executors.contains(containerConfig.executor_info().executor_id()))
<< "Failed to launch executor '"
<< containerConfig.executor_info().executor_id()
<< "' of framework " << containerConfig.executor_info().framework_id()
<< " because it is unknown to the containerizer";
containers_.at(containerId)->executorId =
containerConfig.executor_info().executor_id();
containers_.at(containerId)->frameworkId =
containerConfig.executor_info().framework_id();
// Assemble the environment for the executor.
//
// NOTE: Since in this case the executor will live in the same OS process,
// pass the environment into the executor driver (library) c-tor directly
// instead of manipulating `setenv`/`getenv` to avoid concurrent
// modification of the environment.
map<string, string> fullEnvironment = os::environment();
fullEnvironment.insert(environment.begin(), environment.end());
// TODO(benh): Can this be removed and done exclusively in the
// 'executorEnvironment()' function? There are other places in the
// code where we do this as well and it's likely we can do this once
// in 'executorEnvironment()'.
foreach (const Environment::Variable& variable,
containerConfig.executor_info()
.command().environment().variables()) {
fullEnvironment.emplace(variable.name(), variable.value());
}
fullEnvironment.emplace("MESOS_LOCAL", "1");
const Owned<ExecutorData>& executorData =
executors.at(containerConfig.executor_info().executor_id());
if (executorData->executor != nullptr) {
executorData->driver = Owned<MesosExecutorDriver>(
new MesosExecutorDriver(executorData->executor, fullEnvironment));
executorData->driver->start();
} else {
shared_ptr<v1::MockHTTPExecutor> executor =
executorData->v1ExecutorMock;
executorData->v1Library = Owned<v1::executor::TestMesos>(
new v1::executor::TestMesos(
ContentType::PROTOBUF, executor, fullEnvironment));
}
// Checkpoint the forked pid if requested by the agent.
if (pidCheckpointPath.isSome()) {
Try<Nothing> checkpointed = slave::state::checkpoint(
pidCheckpointPath.get(), stringify(::getpid()));
if (checkpointed.isError()) {
LOG(ERROR) << "Failed to checkpoint container's forked pid to '"
<< pidCheckpointPath.get() << "': " << checkpointed.error();
return Failure("Could not checkpoint container's pid");
}
}
return slave::Containerizer::LaunchResult::SUCCESS;
}
Future<Nothing> update(
const ContainerID& containerId,
const Resources& resourceRequests,
const google::protobuf::Map<string, Value::Scalar>& resourceLimits)
{
return Nothing();
}
Future<Connection> attach(
const ContainerID& containerId)
{
return Failure("Unsupported");
}
Future<ResourceStatistics> usage(
const ContainerID& containerId)
{
return ResourceStatistics();
}
Future<ContainerStatus> status(
const ContainerID& containerId)
{
return ContainerStatus();
}
Future<Option<mesos::slave::ContainerTermination>> wait(
const ContainerID& containerId)
{
if (terminatedContainers.contains(containerId)) {
return terminatedContainers.at(containerId);
}
// An unknown container is possible for tests where we "drop" the
// 'launch' in order to verify recovery still works correctly.
if (!containers_.contains(containerId)) {
return None();
}
return containers_.at(containerId)->termination.future()
.then(Option<ContainerTermination>::some);
}
Future<Option<mesos::slave::ContainerTermination>> destroy(
const ContainerID& containerId)
{
if (!containers_.contains(containerId)) {
return None();
}
const Owned<ContainerData>& containerData = containers_.at(containerId);
if (containerData->executorId.isSome()) {
CHECK(executors.contains(containerData->executorId.get()));
const Owned<ExecutorData>& executorData =
executors.at(containerData->executorId.get());
if (executorData->driver.get() != nullptr) {
executorData->driver->stop();
executorData->driver->join();
}
executors.erase(containerData->executorId.get());
}
ContainerTermination termination;
termination.set_message("Killed executor");
termination.set_status(0);
containerData->termination.set(termination);
containers_.erase(containerId);
terminatedContainers[containerId] = termination;
return termination;
}
// Additional destroy method for testing because we won't know the
// ContainerID created for each container.
Future<Option<mesos::slave::ContainerTermination>> destroy(
const FrameworkID& frameworkId,
const ExecutorID& executorId)
{
Option<ContainerID> containerId = None();
foreachpair (const ContainerID& containerId_,
const Owned<ContainerData>& container,
containers_) {
if (container->frameworkId == frameworkId &&
container->executorId == executorId) {
containerId = containerId_;
}
}
if (containerId.isNone()) {
LOG(WARNING) << "Ignoring destroy of unknown container"
<< " for executor '" << executorId << "'"
<< " of framework " << frameworkId;
return None();
}
return destroy(containerId.get());
}
Future<bool> kill(const ContainerID& containerId, int /* signal */)
{
return destroy(containerId)
.then([]() { return true; });
}
Future<hashset<ContainerID>> containers()
{
return containers_.keys();
}
Future<Nothing> pruneImages(const vector<Image>& excludedImages)
{
return Nothing();
}
private:
struct ContainerData
{
Option<ExecutorID> executorId;
Option<FrameworkID> frameworkId;
process::Promise<mesos::slave::ContainerTermination> termination;
};
// We also store the terminated containers to allow callers to
// "reap" the termination if a container is already destroyed.
// This mimics the behavior of the mesos containerizer.
hashmap<ContainerID, process::Owned<ContainerData>> containers_;
hashmap<ContainerID, mesos::slave::ContainerTermination> terminatedContainers;
struct ExecutorData
{
// Pre-HTTP executors.
Executor* executor;
process::Owned<MesosExecutorDriver> driver;
// HTTP executors. Note that `mesos::v1::executor::Mesos`
// requires that we provide it a shared pointer to the executor.
shared_ptr<v1::MockHTTPExecutor> v1ExecutorMock;
process::Owned<v1::executor::TestMesos> v1Library;
};
// TODO(bmahler): The test containerizer currently assumes that
// executor IDs are unique across frameworks (see the constructors).
hashmap<ExecutorID, process::Owned<ExecutorData>> executors;
};
TestContainerizer::TestContainerizer(
const ExecutorID& executorId,
const shared_ptr<v1::MockHTTPExecutor>& executor)
: process(new TestContainerizerProcess(executorId, executor))
{
process::spawn(process.get());
setup();
}
TestContainerizer::TestContainerizer(
const hashmap<ExecutorID, Executor*>& executors)
: process(new TestContainerizerProcess(executors))
{
process::spawn(process.get());
setup();
}
TestContainerizer::TestContainerizer(
const ExecutorID& executorId,
Executor* executor)
: process(new TestContainerizerProcess({{executorId, executor}}))
{
process::spawn(process.get());
setup();
}
TestContainerizer::TestContainerizer(MockExecutor* executor)
: process(new TestContainerizerProcess({{executor->id, executor}}))
{
process::spawn(process.get());
setup();
}
TestContainerizer::TestContainerizer()
: process(new TestContainerizerProcess())
{
process::spawn(process.get());
setup();
}
TestContainerizer::~TestContainerizer()
{
process::terminate(process.get());
process::wait(process.get());
}
void TestContainerizer::setup()
{
// NOTE: We use 'EXPECT_CALL' and 'WillRepeatedly' here instead of
// 'ON_CALL' and 'WillByDefault' because the latter gives the gmock
// warning "Uninteresting mock function call" unless each tests puts
// the expectations in place which would make the tests much more
// verbose.
//
// TODO(bmahler): Update this to use the same style as the
// TestAllocator, which allows us to have default actions
// 'DoDefault', without requiring each test to put expectations in
// place.
EXPECT_CALL(*this, recover(_))
.WillRepeatedly(Invoke(this, &TestContainerizer::_recover));
EXPECT_CALL(*this, usage(_))
.WillRepeatedly(Invoke(this, &TestContainerizer::_usage));
EXPECT_CALL(*this, status(_))
.WillRepeatedly(Invoke(this, &TestContainerizer::_status));
EXPECT_CALL(*this, update(_, _, _))
.WillRepeatedly(Invoke(this, &TestContainerizer::_update));
EXPECT_CALL(*this, launch(_, _, _, _))
.WillRepeatedly(Invoke(this, &TestContainerizer::_launch));
EXPECT_CALL(*this, attach(_))
.WillRepeatedly(Invoke(this, &TestContainerizer::_attach));
EXPECT_CALL(*this, wait(_))
.WillRepeatedly(Invoke(this, &TestContainerizer::_wait));
EXPECT_CALL(*this, destroy(_))
.WillRepeatedly(Invoke(this, &TestContainerizer::_destroy));
EXPECT_CALL(*this, kill(_, _))
.WillRepeatedly(Invoke(this, &TestContainerizer::_kill));
EXPECT_CALL(*this, pruneImages(_))
.WillRepeatedly(Invoke(this, &TestContainerizer::_pruneImages));
}
Future<Nothing> TestContainerizer::_recover(
const Option<slave::state::SlaveState>& state)
{
return process::dispatch(
process.get(),
&TestContainerizerProcess::recover,
state);
}
Future<slave::Containerizer::LaunchResult> TestContainerizer::_launch(
const ContainerID& containerId,
const ContainerConfig& containerConfig,
const map<string, string>& environment,
const Option<string>& pidCheckpointPath)
{
return process::dispatch(
process.get(),
&TestContainerizerProcess::launch,
containerId,
containerConfig,
environment,
pidCheckpointPath);
}
Future<Connection> TestContainerizer::_attach(
const ContainerID& containerId)
{
return process::dispatch(
process.get(),
&TestContainerizerProcess::attach,
containerId);
}
Future<Nothing> TestContainerizer::_update(
const ContainerID& containerId,
const Resources& resourceRequests,
const google::protobuf::Map<string, Value::Scalar>& resourceLimits)
{
return process::dispatch(
process.get(),
&TestContainerizerProcess::update,
containerId,
resourceRequests,
resourceLimits);
}
Future<ResourceStatistics> TestContainerizer::_usage(
const ContainerID& containerId)
{
return process::dispatch(
process.get(),
&TestContainerizerProcess::usage,
containerId);
}
Future<ContainerStatus> TestContainerizer::_status(
const ContainerID& containerId)
{
return process::dispatch(
process.get(),
&TestContainerizerProcess::status,
containerId);
}
Future<Option<mesos::slave::ContainerTermination>> TestContainerizer::_wait(
const ContainerID& containerId)
{
return process::dispatch(
process.get(),
&TestContainerizerProcess::wait,
containerId);
}
Future<Option<mesos::slave::ContainerTermination>> TestContainerizer::_destroy(
const ContainerID& containerId)
{
// Need to disambiguate for the compiler.
Future<Option<mesos::slave::ContainerTermination>> (
TestContainerizerProcess::*destroy)(const ContainerID&) =
&TestContainerizerProcess::destroy;
return process::dispatch(
process.get(),
destroy,
containerId);
}
Future<bool> TestContainerizer::_kill(
const ContainerID& containerId,
int signal)
{
return process::dispatch(
process.get(),
&TestContainerizerProcess::kill,
containerId,
signal);
}
Future<Option<mesos::slave::ContainerTermination>> TestContainerizer::destroy(
const FrameworkID& frameworkId,
const ExecutorID& executorId)
{
// Need to disambiguate for the compiler.
Future<Option<mesos::slave::ContainerTermination>> (
TestContainerizerProcess::*destroy)(
const FrameworkID&, const ExecutorID&) =
&TestContainerizerProcess::destroy;
return process::dispatch(
process.get(),
destroy,
frameworkId,
executorId);
}
Future<hashset<ContainerID>> TestContainerizer::containers()
{
return process::dispatch(
process.get(),
&TestContainerizerProcess::containers);
}
Future<Nothing> TestContainerizer::_pruneImages(
const vector<Image>& excludedImages)
{
return process::dispatch(
process.get(),
&TestContainerizerProcess::pruneImages,
excludedImages);
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {