blob: 15d059f0bbda4e8cb93c65c09327dde1e34d3e7b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <list>
#include <vector>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/lambda.hpp>
#include <stout/os.hpp>
#include "slave/state.hpp"
#include "slave/containerizer/containerizer.hpp"
#include "slave/containerizer/composing.hpp"
using std::list;
using std::string;
using std::vector;
using namespace process;
namespace mesos {
namespace internal {
namespace slave {
class ComposingContainerizerProcess
: public Process<ComposingContainerizerProcess>
{
public:
ComposingContainerizerProcess(
const vector<Containerizer*>& containerizers)
: containerizers_(containerizers) {}
virtual ~ComposingContainerizerProcess();
Future<Nothing> recover(
const Option<state::SlaveState>& state);
Future<bool> launch(
const ContainerID& containerId,
const ExecutorInfo& executorInfo,
const string& directory,
const Option<string>& user,
const SlaveID& slaveId,
const PID<Slave>& slavePid,
bool checkpoint);
Future<bool> launch(
const ContainerID& containerId,
const TaskInfo& taskInfo,
const ExecutorInfo& executorInfo,
const string& directory,
const Option<string>& user,
const SlaveID& slaveId,
const PID<Slave>& slavePid,
bool checkpoint);
Future<Nothing> update(
const ContainerID& containerId,
const Resources& resources);
Future<ResourceStatistics> usage(
const ContainerID& containerId);
Future<ContainerStatus> status(
const ContainerID& containerId);
Future<containerizer::Termination> wait(
const ContainerID& containerId);
void destroy(const ContainerID& containerId);
Future<hashset<ContainerID>> containers();
private:
// Continuations.
Future<Nothing> _recover();
Future<Nothing> __recover(
Containerizer* containerizer,
const hashset<ContainerID>& containers);
static Future<Nothing> ___recover();
Future<bool> _launch(
const ContainerID& containerId,
const Option<TaskInfo>& taskInfo,
const ExecutorInfo& executorInfo,
const string& directory,
const Option<string>& user,
const SlaveID& slaveId,
const PID<Slave>& slavePid,
bool checkpoint,
vector<Containerizer*>::iterator containerizer,
bool launched);
vector<Containerizer*> containerizers_;
// The states that the composing containerizer cares about for the
// container it is asked to launch.
enum State
{
LAUNCHING,
LAUNCHED,
DESTROYED
};
struct Container
{
State state;
Containerizer* containerizer;
};
hashmap<ContainerID, Container*> containers_;
};
Try<ComposingContainerizer*> ComposingContainerizer::create(
const vector<Containerizer*>& containerizers)
{
return new ComposingContainerizer(containerizers);
}
ComposingContainerizer::ComposingContainerizer(
const vector<Containerizer*>& containerizers)
{
process = new ComposingContainerizerProcess(containerizers);
spawn(process);
}
ComposingContainerizer::~ComposingContainerizer()
{
terminate(process);
process::wait(process);
delete process;
}
Future<Nothing> ComposingContainerizer::recover(
const Option<state::SlaveState>& state)
{
return dispatch(process, &ComposingContainerizerProcess::recover, state);
}
Future<bool> ComposingContainerizer::launch(
const ContainerID& containerId,
const ExecutorInfo& executorInfo,
const string& directory,
const Option<string>& user,
const SlaveID& slaveId,
const PID<Slave>& slavePid,
bool checkpoint)
{
return dispatch(process,
&ComposingContainerizerProcess::launch,
containerId,
executorInfo,
directory,
user,
slaveId,
slavePid,
checkpoint);
}
Future<bool> ComposingContainerizer::launch(
const ContainerID& containerId,
const TaskInfo& taskInfo,
const ExecutorInfo& executorInfo,
const string& directory,
const Option<string>& user,
const SlaveID& slaveId,
const PID<Slave>& slavePid,
bool checkpoint)
{
return dispatch(process,
&ComposingContainerizerProcess::launch,
containerId,
taskInfo,
executorInfo,
directory,
user,
slaveId,
slavePid,
checkpoint);
}
Future<Nothing> ComposingContainerizer::update(
const ContainerID& containerId,
const Resources& resources)
{
return dispatch(process,
&ComposingContainerizerProcess::update,
containerId,
resources);
}
Future<ResourceStatistics> ComposingContainerizer::usage(
const ContainerID& containerId)
{
return dispatch(process, &ComposingContainerizerProcess::usage, containerId);
}
Future<ContainerStatus> ComposingContainerizer::status(
const ContainerID& containerId)
{
return dispatch(process, &ComposingContainerizerProcess::status, containerId);
}
Future<containerizer::Termination> ComposingContainerizer::wait(
const ContainerID& containerId)
{
return dispatch(process, &ComposingContainerizerProcess::wait, containerId);
}
void ComposingContainerizer::destroy(const ContainerID& containerId)
{
dispatch(process, &ComposingContainerizerProcess::destroy, containerId);
}
Future<hashset<ContainerID>> ComposingContainerizer::containers()
{
return dispatch(process, &ComposingContainerizerProcess::containers);
}
ComposingContainerizerProcess::~ComposingContainerizerProcess()
{
foreach (Containerizer* containerizer, containerizers_) {
delete containerizer;
}
foreachvalue (Container* container, containers_) {
delete container;
}
containerizers_.clear();
containers_.clear();
}
Future<Nothing> ComposingContainerizerProcess::recover(
const Option<state::SlaveState>& state)
{
// Recover each containerizer in parallel.
list<Future<Nothing>> futures;
foreach (Containerizer* containerizer, containerizers_) {
futures.push_back(containerizer->recover(state));
}
return collect(futures)
.then(defer(self(), &Self::_recover));
}
Future<Nothing> ComposingContainerizerProcess::_recover()
{
// Now collect all the running containers in order to multiplex.
list<Future<Nothing>> futures;
foreach (Containerizer* containerizer, containerizers_) {
Future<Nothing> future = containerizer->containers()
.then(defer(self(), &Self::__recover, containerizer, lambda::_1));
futures.push_back(future);
}
return collect(futures)
.then(lambda::bind(&Self::___recover));
}
Future<Nothing> ComposingContainerizerProcess::__recover(
Containerizer* containerizer,
const hashset<ContainerID>& containers)
{
foreach (const ContainerID& containerId, containers) {
Container* container = new Container();
container->state = LAUNCHED;
container->containerizer = containerizer;
containers_[containerId] = container;
}
return Nothing();
}
Future<Nothing> ComposingContainerizerProcess::___recover()
{
return Nothing();
}
Future<bool> ComposingContainerizerProcess::launch(
const ContainerID& containerId,
const ExecutorInfo& executorInfo,
const string& directory,
const Option<string>& user,
const SlaveID& slaveId,
const PID<Slave>& slavePid,
bool checkpoint)
{
if (containers_.contains(containerId)) {
return Failure("Container '" + containerId.value() +
"' is already launching");
}
// Try each containerizer. If none of them handle the
// TaskInfo/ExecutorInfo then return a Failure.
vector<Containerizer*>::iterator containerizer = containerizers_.begin();
Container* container = new Container();
container->state = LAUNCHING;
container->containerizer = *containerizer;
containers_[containerId] = container;
return (*containerizer)->launch(
containerId,
executorInfo,
directory,
user,
slaveId,
slavePid,
checkpoint)
.then(defer(self(),
&Self::_launch,
containerId,
None(),
executorInfo,
directory,
user,
slaveId,
slavePid,
checkpoint,
containerizer,
lambda::_1));
}
Future<bool> ComposingContainerizerProcess::_launch(
const ContainerID& containerId,
const Option<TaskInfo>& taskInfo,
const ExecutorInfo& executorInfo,
const string& directory,
const Option<string>& user,
const SlaveID& slaveId,
const PID<Slave>& slavePid,
bool checkpoint,
vector<Containerizer*>::iterator containerizer,
bool launched)
{
// The container struct won't be cleaned up by destroy because
// in destroy we only forward the destroy, and wait until the
// launch returns and clean up here.
CHECK(containers_.contains(containerId));
Container* container = containers_[containerId];
if (container->state == DESTROYED) {
containers_.erase(containerId);
delete container;
return Failure("Container was destroyed while launching");
}
if (launched) {
container->state = LAUNCHED;
return true;
}
// Try the next containerizer.
++containerizer;
if (containerizer == containerizers_.end()) {
containers_.erase(containerId);
delete container;
return false;
}
container->containerizer = *containerizer;
Future<bool> f = taskInfo.isSome() ?
(*containerizer)->launch(
containerId,
taskInfo.get(),
executorInfo,
directory,
user,
slaveId,
slavePid,
checkpoint) :
(*containerizer)->launch(
containerId,
executorInfo,
directory,
user,
slaveId,
slavePid,
checkpoint);
return f.then(
defer(self(),
&Self::_launch,
containerId,
taskInfo,
executorInfo,
directory,
user,
slaveId,
slavePid,
checkpoint,
containerizer,
lambda::_1));
}
Future<bool> ComposingContainerizerProcess::launch(
const ContainerID& containerId,
const TaskInfo& taskInfo,
const ExecutorInfo& executorInfo,
const string& directory,
const Option<string>& user,
const SlaveID& slaveId,
const PID<Slave>& slavePid,
bool checkpoint)
{
if (containers_.contains(containerId)) {
return Failure("Container '" + stringify(containerId) +
"' is already launching");
}
// Try each containerizer. If none of them handle the
// TaskInfo/ExecutorInfo then return a Failure.
vector<Containerizer*>::iterator containerizer = containerizers_.begin();
Container* container = new Container();
container->state = LAUNCHING;
container->containerizer = *containerizer;
containers_[containerId] = container;
return (*containerizer)->launch(
containerId,
taskInfo,
executorInfo,
directory,
user,
slaveId,
slavePid,
checkpoint)
.then(defer(self(),
&Self::_launch,
containerId,
taskInfo,
executorInfo,
directory,
user,
slaveId,
slavePid,
checkpoint,
containerizer,
lambda::_1));
}
Future<Nothing> ComposingContainerizerProcess::update(
const ContainerID& containerId,
const Resources& resources)
{
if (!containers_.contains(containerId)) {
return Failure("Container '" + containerId.value() + "' not found");
}
return containers_[containerId]->containerizer->update(
containerId, resources);
}
Future<ResourceStatistics> ComposingContainerizerProcess::usage(
const ContainerID& containerId)
{
if (!containers_.contains(containerId)) {
return Failure("Container '" + containerId.value() + "' not found");
}
return containers_[containerId]->containerizer->usage(containerId);
}
Future<ContainerStatus> ComposingContainerizerProcess::status(
const ContainerID& containerId)
{
if (!containers_.contains(containerId)) {
return Failure("Container " + containerId.value() + " not found");
}
return containers_[containerId]->containerizer->status(containerId);
}
Future<containerizer::Termination> ComposingContainerizerProcess::wait(
const ContainerID& containerId)
{
if (!containers_.contains(containerId)) {
return Failure("Container '" + containerId.value() + "' not found");
}
return containers_[containerId]->containerizer->wait(containerId);
}
void ComposingContainerizerProcess::destroy(const ContainerID& containerId)
{
if (!containers_.contains(containerId)) {
LOG(WARNING) << "Container '" << containerId.value() << "' not found";
return;
}
Container* container = containers_[containerId];
if (container->state == DESTROYED) {
LOG(WARNING) << "Container '" << containerId.value()
<< "' is already destroyed";
return;
}
// It's ok to forward destroy to any containerizer which is currently
// launching the container, because we expect each containerizer to
// handle calling destroy on non-existing container.
// The composing containerizer will not move to the next
// containerizer for a container that is destroyed as well.
container->containerizer->destroy(containerId);
if (container->state == LAUNCHING) {
// Record the fact that this container was asked to be destroyed
// so that we won't try and launch this container using any other
// containerizers in the event the current containerizer has
// decided it can't launch the container.
container->state = DESTROYED;
return;
}
// If the container is launched, then we can simply cleanup.
containers_.erase(containerId);
delete container;
}
Future<hashset<ContainerID>> ComposingContainerizerProcess::containers()
{
return containers_.keys();
}
} // namespace slave {
} // namespace internal {
} // namespace mesos {