blob: e25c60431a208977088d370b32749f3c50f2bba6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <vector>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/http.hpp>
#include <process/id.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/lambda.hpp>
#include <stout/os.hpp>
#include "common/protobuf_utils.hpp"
#include "slave/state.hpp"
#include "slave/containerizer/containerizer.hpp"
#include "slave/containerizer/composing.hpp"
using namespace process;
using std::map;
using std::string;
using std::vector;
using mesos::slave::ContainerClass;
using mesos::slave::ContainerConfig;
using mesos::slave::ContainerTermination;
namespace mesos {
namespace internal {
namespace slave {
// Process backing `ComposingContainerizer`. It holds a list of underlying
// containerizers and a map from each tracked container to the
// containerizer currently associated with it; per-container operations
// are forwarded to that containerizer.
class ComposingContainerizerProcess
  : public Process<ComposingContainerizerProcess>
{
public:
  // Stores a copy of the given containerizer pointers. The pointed-to
  // containerizers are deleted by the destructor.
  //
  // Marked `explicit` so that a vector of containerizers is never
  // implicitly converted into a process object.
  explicit ComposingContainerizerProcess(
      const vector<Containerizer*>& containerizers)
    : ProcessBase(process::ID::generate("composing-containerizer")),
      containerizers_(containerizers) {}

  // Deletes all underlying containerizers and all tracked containers.
  ~ComposingContainerizerProcess() override;

  // Recovers every underlying containerizer and then rebuilds the
  // container-to-containerizer map from the containers they report.
  Future<Nothing> recover(
      const Option<state::SlaveState>& state);

  // Launches a container. Nested containers go to the containerizer
  // of their root container; other containers are offered to each
  // underlying containerizer in order until one reports success.
  Future<Containerizer::LaunchResult> launch(
      const ContainerID& containerId,
      const ContainerConfig& config,
      const map<string, string>& environment,
      const Option<std::string>& pidCheckpointPath);

  // The following calls are forwarded to the containerizer associated
  // with `containerId` and fail with "Container not found" if the
  // container is not tracked.
  Future<http::Connection> attach(
      const ContainerID& containerId);

  Future<Nothing> update(
      const ContainerID& containerId,
      const Resources& resourceRequests,
      const google::protobuf::Map<
          std::string, Value::Scalar>& resourceLimits = {});

  Future<ResourceStatistics> usage(
      const ContainerID& containerId);

  Future<ContainerStatus> status(
      const ContainerID& containerId);

  // Forwarded to the containerizer of the *root* container; yields
  // `None` if the root container is not tracked.
  Future<Option<ContainerTermination>> wait(
      const ContainerID& containerId);

  // Marks the container as DESTROYING and forwards the destroy. For an
  // untracked container this falls back to `wait()`.
  Future<Option<ContainerTermination>> destroy(
      const ContainerID& containerId);

  // Yields `false` if the container is not tracked.
  Future<bool> kill(const ContainerID& containerId, int signal);

  // Returns the IDs of all currently tracked containers.
  Future<hashset<ContainerID>> containers();

  // Forwarded to the containerizer of the root container.
  Future<Nothing> remove(const ContainerID& containerId);

  // Forwarded to every underlying containerizer.
  Future<Nothing> pruneImages(const vector<Image>& excludedImages);

private:
  // Continuations.
  Future<Nothing> _recover();
  Future<Nothing> __recover(
      Containerizer* containerizer,
      const hashset<ContainerID>& containers);
  static Future<Nothing> ___recover();

  // Continuation for top-level containers: `containerizer` points at
  // the containerizer that produced `launchResult`.
  Future<Containerizer::LaunchResult> _launch(
      const ContainerID& containerId,
      const ContainerConfig& config,
      const map<string, string>& environment,
      const Option<std::string>& pidCheckpointPath,
      vector<Containerizer*>::iterator containerizer,
      Containerizer::LaunchResult launchResult);

  // Continuation for nested containers.
  Future<Containerizer::LaunchResult> _launch(
      const ContainerID& containerId,
      Containerizer::LaunchResult launchResult);

  vector<Containerizer*> containerizers_;

  // The states that the composing containerizer cares about for the
  // container it is asked to launch.
  enum State
  {
    LAUNCHING,
    LAUNCHED,
    DESTROYING,
    // No need for DESTROYED, since we remove containers
    // once the destroy completes.
  };

  // Bookkeeping for one tracked container. The members carry explicit
  // defaults so that a `Container` never holds an indeterminate state
  // or pointer, regardless of how it is constructed.
  struct Container
  {
    State state = LAUNCHING;
    Containerizer* containerizer = nullptr;
  };

  // Owns the `Container` objects; entries are deleted when erased.
  hashmap<ContainerID, Container*> containers_;
};
// Factory: allocates a composing containerizer over `containerizers`.
// The returned pointer is heap-allocated with `new`.
Try<ComposingContainerizer*> ComposingContainerizer::create(
    const vector<Containerizer*>& containerizers)
{
  ComposingContainerizer* composing =
    new ComposingContainerizer(containerizers);

  return composing;
}
// Creates the backing process for this containerizer and spawns it.
ComposingContainerizer::ComposingContainerizer(
    const vector<Containerizer*>& containerizers)
{
  // The process is released again in the destructor.
  process = new ComposingContainerizerProcess(containerizers);

  spawn(process);
}
// Shuts the backing process down and frees it.
ComposingContainerizer::~ComposingContainerizer()
{
  // Terminate first, then wait for the process before deleting it.
  terminate(process);
  process::wait(process);

  delete process;
}
// Forwards `recover` to the backing process via `dispatch`.
Future<Nothing> ComposingContainerizer::recover(
    const Option<state::SlaveState>& state)
{
  return dispatch(
      process,
      &ComposingContainerizerProcess::recover,
      state);
}
// Forwards `launch` to the backing process via `dispatch`.
Future<Containerizer::LaunchResult> ComposingContainerizer::launch(
    const ContainerID& containerId,
    const ContainerConfig& containerConfig,
    const map<string, string>& environment,
    const Option<std::string>& pidCheckpointPath)
{
  return dispatch(
      process,
      &ComposingContainerizerProcess::launch,
      containerId,
      containerConfig,
      environment,
      pidCheckpointPath);
}
// Forwards `update` to the backing process via `dispatch`.
Future<Nothing> ComposingContainerizer::update(
    const ContainerID& containerId,
    const Resources& resourceRequests,
    const google::protobuf::Map<string, Value::Scalar>& resourceLimits)
{
  return dispatch(
      process,
      &ComposingContainerizerProcess::update,
      containerId,
      resourceRequests,
      resourceLimits);
}
// Forwards `attach` to the backing process via `dispatch`.
Future<http::Connection> ComposingContainerizer::attach(
    const ContainerID& containerId)
{
  return dispatch(
      process,
      &ComposingContainerizerProcess::attach,
      containerId);
}
// Forwards `usage` to the backing process via `dispatch`.
Future<ResourceStatistics> ComposingContainerizer::usage(
    const ContainerID& containerId)
{
  return dispatch(
      process,
      &ComposingContainerizerProcess::usage,
      containerId);
}
// Forwards `status` to the backing process via `dispatch`.
Future<ContainerStatus> ComposingContainerizer::status(
    const ContainerID& containerId)
{
  return dispatch(
      process,
      &ComposingContainerizerProcess::status,
      containerId);
}
// Forwards `wait` to the backing process via `dispatch`.
Future<Option<ContainerTermination>> ComposingContainerizer::wait(
    const ContainerID& containerId)
{
  return dispatch(
      process,
      &ComposingContainerizerProcess::wait,
      containerId);
}
// Forwards `destroy` to the backing process via `dispatch`.
Future<Option<ContainerTermination>> ComposingContainerizer::destroy(
    const ContainerID& containerId)
{
  return dispatch(
      process,
      &ComposingContainerizerProcess::destroy,
      containerId);
}
// Forwards `kill` (with the given signal) to the backing process via
// `dispatch`.
Future<bool> ComposingContainerizer::kill(
    const ContainerID& containerId,
    int signal)
{
  return dispatch(
      process,
      &ComposingContainerizerProcess::kill,
      containerId,
      signal);
}
// Forwards `containers` to the backing process via `dispatch`.
Future<hashset<ContainerID>> ComposingContainerizer::containers()
{
  return dispatch(
      process,
      &ComposingContainerizerProcess::containers);
}
// Forwards `remove` to the backing process via `dispatch`.
Future<Nothing> ComposingContainerizer::remove(const ContainerID& containerId)
{
  return dispatch(
      process,
      &ComposingContainerizerProcess::remove,
      containerId);
}
// Forwards `pruneImages` to the backing process via `dispatch`.
Future<Nothing> ComposingContainerizer::pruneImages(
    const vector<Image>& excludedImages)
{
  return dispatch(
      process,
      &ComposingContainerizerProcess::pruneImages,
      excludedImages);
}
// Releases everything this process holds: the underlying
// containerizers and the per-container bookkeeping records.
ComposingContainerizerProcess::~ComposingContainerizerProcess()
{
  for (Containerizer* underlying : containerizers_) {
    delete underlying;
  }

  for (const auto& entry : containers_) {
    delete entry.second;
  }

  // Drop the now-dangling pointers.
  containerizers_.clear();
  containers_.clear();
}
// Starts recovery on every underlying containerizer and, once all of
// them have finished, continues with `_recover`.
Future<Nothing> ComposingContainerizerProcess::recover(
    const Option<state::SlaveState>& state)
{
  // The recoveries are all started up front and then collected.
  vector<Future<Nothing>> recoveries;

  for (Containerizer* underlying : containerizers_) {
    recoveries.push_back(underlying->recover(state));
  }

  return collect(recoveries)
    .then(defer(self(), &Self::_recover));
}
// Second recovery step: asks each underlying containerizer for its
// containers and records them via `__recover`, so that later calls
// can be routed to the right containerizer.
Future<Nothing> ComposingContainerizerProcess::_recover()
{
  vector<Future<Nothing>> recorded;

  for (Containerizer* underlying : containerizers_) {
    recorded.push_back(
        underlying->containers()
          .then(defer(self(), &Self::__recover, underlying, lambda::_1)));
  }

  return collect(recorded)
    .then(lambda::bind(&Self::___recover));
}
// Records every container reported by `containerizer` as LAUNCHED and
// owned by that containerizer.
Future<Nothing> ComposingContainerizerProcess::__recover(
    Containerizer* containerizer,
    const hashset<ContainerID>& containers)
{
  for (const ContainerID& containerId : containers) {
    Container* recovered = new Container();
    recovered->containerizer = containerizer;
    recovered->state = LAUNCHED;

    containers_[containerId] = recovered;

    // Once the wait on the container completes (in any way), drop it
    // from the set of active containers if it is still tracked.
    containerizer->wait(containerId)
      .onAny(defer(self(), [=](const Future<Option<ContainerTermination>>&) {
        auto entry = containers_.find(containerId);
        if (entry != containers_.end()) {
          delete entry->second;
          containers_.erase(entry);
        }
      }));
  }

  return Nothing();
}
// Final recovery step: only logs that recovery is complete.
Future<Nothing> ComposingContainerizerProcess::___recover()
{
  LOG(INFO) << "Finished recovering all containerizers";

  return Nothing();
}
// Continuation of a top-level launch. `containerizer` points at the
// containerizer that just produced `launchResult`. On success the
// container is marked LAUNCHED; otherwise the next containerizer in
// the list is tried, unless the list is exhausted or a destroy is in
// progress.
Future<Containerizer::LaunchResult> ComposingContainerizerProcess::_launch(
    const ContainerID& containerId,
    const ContainerConfig& containerConfig,
    const map<string, string>& environment,
    const Option<std::string>& pidCheckpointPath,
    vector<Containerizer*>::iterator containerizer,
    Containerizer::LaunchResult launchResult)
{
  auto tracked = containers_.find(containerId);
  if (tracked == containers_.end()) {
    // A destroy started and finished while the launch was in flight.
    return launchResult;
  }

  Container* record = tracked->second;

  if (launchResult == Containerizer::LaunchResult::SUCCESS) {
    // The state is left alone if a destroy is in progress.
    if (record->state == LAUNCHING) {
      record->state = LAUNCHED;

      // Once the wait on the container completes (in any way), drop
      // it from the set of active containers if it is still tracked.
      record->containerizer->wait(containerId)
        .onAny(defer(self(), [=](const Future<Option<ContainerTermination>>&) {
          auto entry = containers_.find(containerId);
          if (entry != containers_.end()) {
            delete entry->second;
            containers_.erase(entry);
          }
        }));
    }

    // An in-progress destroy does not affect the reported result.
    return Containerizer::LaunchResult::SUCCESS;
  }

  // The current containerizer did not take the launch; move on to the
  // next one.
  ++containerizer;

  const bool exhausted = (containerizer == containerizers_.end());
  if (exhausted) {
    // No containerizer supports the launch. For simplicity the record
    // is removed even if a destroy is already in progress.
    containers_.erase(containerId);
    delete record;

    return Containerizer::LaunchResult::NOT_SUPPORTED;
  }

  if (record->state == DESTROYING) {
    // There is at least one more containerizer that might have been
    // able to launch this container, but it is not tried because a
    // destroy is in progress. Hence a failure rather than
    // NOT_SUPPORTED.
    return Failure("Container was destroyed while launching");
  }

  Containerizer* next = *containerizer;
  record->containerizer = next;

  return next->launch(
      containerId,
      containerConfig,
      environment,
      pidCheckpointPath)
    .then(defer(
        self(),
        &Self::_launch,
        containerId,
        containerConfig,
        environment,
        pidCheckpointPath,
        containerizer,
        lambda::_1));
}
// Entry point for launching a container.
//
// Returns ALREADY_LAUNCHED if the container is already tracked.
// A nested container is launched with the containerizer of its root
// container (failing if the root is not tracked). Any other container
// is offered to the underlying containerizers in order, starting with
// the first; the `_launch` continuation moves on to the next one when
// a containerizer does not take the launch.
Future<Containerizer::LaunchResult> ComposingContainerizerProcess::launch(
    const ContainerID& containerId,
    const ContainerConfig& containerConfig,
    const map<string, string>& environment,
    const Option<std::string>& pidCheckpointPath)
{
  if (containers_.contains(containerId)) {
    return Containerizer::LaunchResult::ALREADY_LAUNCHED;
  }

  // With no underlying containerizers there is nothing that could
  // support a top-level launch. Bail out before dereferencing
  // `containerizers_.begin()` below, which would be undefined
  // behavior on an empty vector.
  if (!containerId.has_parent() && containerizers_.empty()) {
    return Containerizer::LaunchResult::NOT_SUPPORTED;
  }

  Container* container = new Container();
  container->state = LAUNCHING;
  containers_[containerId] = container;

  // For nested containers, use the containerizer that launched the
  // root container. This code path uses a different continuation
  // function because there is no need to try other containerizers.
  if (containerId.has_parent()) {
    ContainerID rootContainerId = protobuf::getRootContainerId(containerId);

    if (!containers_.contains(rootContainerId)) {
      // We do cleanup here, otherwise we cannot remove or destroy the nested
      // container due to its undefined `containerizer` field.
      containers_.erase(containerId);
      delete container;

      return Failure(
          "Root container " + stringify(rootContainerId) + " not found");
    }

    Containerizer* containerizer =
      containers_.at(rootContainerId)->containerizer;

    container->containerizer = containerizer;

    return containerizer->launch(
        containerId,
        containerConfig,
        environment,
        pidCheckpointPath)
      .then(defer(self(),
                  &Self::_launch,
                  containerId,
                  lambda::_1));
  }

  // Try each containerizer in turn, starting with the first one. The
  // continuation advances to the next containerizer when the current
  // one does not take the launch.
  vector<Containerizer*>::iterator containerizer = containerizers_.begin();

  container->containerizer = *containerizer;

  return (*containerizer)->launch(
      containerId,
      containerConfig,
      environment,
      pidCheckpointPath)
    .then(defer(self(),
                &Self::_launch,
                containerId,
                containerConfig,
                environment,
                pidCheckpointPath,
                containerizer,
                lambda::_1));
}
// Continuation of a nested-container launch. Only the root container's
// containerizer is ever tried, so anything other than success removes
// the container and reports NOT_SUPPORTED.
Future<Containerizer::LaunchResult> ComposingContainerizerProcess::_launch(
    const ContainerID& containerId,
    Containerizer::LaunchResult launchResult)
{
  auto tracked = containers_.find(containerId);
  if (tracked == containers_.end()) {
    // A destroy started and finished while the launch was in flight.
    return launchResult;
  }

  Container* record = tracked->second;

  if (launchResult != Containerizer::LaunchResult::SUCCESS) {
    // The containerizer did not take the launch. For simplicity the
    // record is removed even if a destroy is already in progress.
    containers_.erase(containerId);
    delete record;

    return Containerizer::LaunchResult::NOT_SUPPORTED;
  }

  // The state is left alone if a destroy is in progress.
  if (record->state == LAUNCHING) {
    record->state = LAUNCHED;

    // Once the wait on the container completes (in any way), drop it
    // from the set of active containers if it is still tracked.
    record->containerizer->wait(containerId)
      .onAny(defer(self(), [=](const Future<Option<ContainerTermination>>&) {
        auto entry = containers_.find(containerId);
        if (entry != containers_.end()) {
          delete entry->second;
          containers_.erase(entry);
        }
      }));
  }

  // An in-progress destroy does not affect the reported result.
  return Containerizer::LaunchResult::SUCCESS;
}
// Forwards `attach` to the containerizer associated with the
// container; fails if the container is not tracked.
Future<http::Connection> ComposingContainerizerProcess::attach(
    const ContainerID& containerId)
{
  if (containers_.contains(containerId)) {
    return containers_.at(containerId)->containerizer->attach(containerId);
  }

  return Failure("Container not found");
}
// Forwards `update` to the containerizer associated with the
// container; fails if the container is not tracked.
Future<Nothing> ComposingContainerizerProcess::update(
    const ContainerID& containerId,
    const Resources& resourceRequests,
    const google::protobuf::Map<string, Value::Scalar>& resourceLimits)
{
  if (containers_.contains(containerId)) {
    return containers_.at(containerId)->containerizer->update(
        containerId,
        resourceRequests,
        resourceLimits);
  }

  return Failure("Container not found");
}
// Forwards `usage` to the containerizer associated with the
// container; fails if the container is not tracked.
Future<ResourceStatistics> ComposingContainerizerProcess::usage(
    const ContainerID& containerId)
{
  if (containers_.contains(containerId)) {
    return containers_.at(containerId)->containerizer->usage(containerId);
  }

  return Failure("Container not found");
}
// Forwards `status` to the containerizer associated with the
// container; fails if the container is not tracked.
Future<ContainerStatus> ComposingContainerizerProcess::status(
    const ContainerID& containerId)
{
  if (containers_.contains(containerId)) {
    return containers_.at(containerId)->containerizer->status(containerId);
  }

  return Failure("Container not found");
}
// Waits on a container through the containerizer of its root
// container; yields `None` if the root container is not tracked.
Future<Option<ContainerTermination>> ComposingContainerizerProcess::wait(
    const ContainerID& containerId)
{
  // The lookup goes through the root container rather than
  // `containerId` itself: a nested container may already have
  // terminated and been dropped from `containers_` while its exit
  // status has been checkpointed. The containerizer that launched the
  // root container should still be able to retrieve that status.
  const ContainerID rootContainerId = protobuf::getRootContainerId(containerId);

  if (containers_.contains(rootContainerId)) {
    return containers_.at(rootContainerId)->containerizer->wait(containerId);
  }

  return None();
}
// Destroys a container: marks it DESTROYING, forwards the destroy to
// its containerizer and drops the record once that completes. For an
// untracked container this logs a warning and falls back to `wait()`.
Future<Option<ContainerTermination>> ComposingContainerizerProcess::destroy(
    const ContainerID& containerId)
{
  if (!containers_.contains(containerId)) {
    // TODO(bmahler): Currently the agent does not log destroy
    // failures or unknown containers, so we log it here for now.
    // Move this logging into the callers.
    LOG(WARNING) << "Attempted to destroy unknown container " << containerId;

    // A nested container might already have terminated and thus be
    // missing from `containers_`, while its exit status might have
    // been checkpointed.
    return wait(containerId);
  }

  Container* record = containers_.at(containerId);

  switch (record->state) {
    case LAUNCHING:
    case LAUNCHED:
      // Note that this method might be called between two successive
      // attempts to launch a container using different containerizers.
      // In this case, we will return `None`, because there is no
      // underlying containerizer that is actually aware of launching
      // a container.
      record->state = DESTROYING;
      break;
    case DESTROYING:
      break;
  }

  CHECK_EQ(record->state, DESTROYING);

  return record->containerizer->destroy(containerId)
    .onAny(defer(self(), [=](const Future<Option<ContainerTermination>>&) {
      // Drop the record if it is still tracked.
      auto entry = containers_.find(containerId);
      if (entry != containers_.end()) {
        delete entry->second;
        containers_.erase(entry);
      }
    }));
}
// Forwards `kill` to the containerizer associated with the container;
// yields `false` if the container is not tracked.
Future<bool> ComposingContainerizerProcess::kill(
    const ContainerID& containerId,
    int signal)
{
  if (containers_.contains(containerId)) {
    return containers_.at(containerId)->containerizer->kill(
        containerId,
        signal);
  }

  return false;
}
// Returns the IDs of all containers currently tracked by this process.
Future<hashset<ContainerID>> ComposingContainerizerProcess::containers()
{
  hashset<ContainerID> tracked = containers_.keys();

  return tracked;
}
// Removes a container through the containerizer of its root
// container; fails if the root container is not tracked.
Future<Nothing> ComposingContainerizerProcess::remove(
    const ContainerID& containerId)
{
  // A precondition of this method is that the nested container has
  // already terminated, so `containers_` will not contain it. As a
  // workaround the containerizer that launched the root container is
  // used to remove the nested container.
  const ContainerID rootContainerId = protobuf::getRootContainerId(containerId);

  if (containers_.contains(rootContainerId)) {
    return containers_.at(rootContainerId)->containerizer->remove(containerId);
  }

  return Failure(
      "Root container " + stringify(rootContainerId) + " not found");
}
// Asks every underlying containerizer to prune images (sparing
// `excludedImages`) and completes once all of them have.
Future<Nothing> ComposingContainerizerProcess::pruneImages(
    const vector<Image>& excludedImages)
{
  vector<Future<Nothing>> pruned;

  for (Containerizer* underlying : containerizers_) {
    pruned.push_back(underlying->pruneImages(excludedImages));
  }

  // Collapse the collected results into a single `Nothing`.
  return collect(pruned)
    .then([]() { return Nothing(); });
}
} // namespace slave {
} // namespace internal {
} // namespace mesos {