blob: e1b0812b8b467ee4061b23d39552e2417d65a14a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "slave/container_daemon.hpp"
#include <process/defer.hpp>
#include <process/id.hpp>
#include <stout/lambda.hpp>
#include <stout/stringify.hpp>
#include <stout/unreachable.hpp>
#include "internal/evolve.hpp"
#include "slave/container_daemon_process.hpp"
namespace http = process::http;
using std::string;
using mesos::agent::Call;
using process::Failure;
using process::Future;
using process::Owned;
using process::Process;
using process::Promise;
using process::defer;
namespace mesos {
namespace internal {
namespace slave {
// Builds the HTTP headers used when calling the V1 agent API: when
// `authToken` is set, the result carries a single 'Authorization'
// header holding the token as a 'Bearer' credential; otherwise the
// result is empty.
//
// TODO(chhsiao): Currently we assume the JWT authenticator is used for
// the agent operator API.
static inline http::Headers getAuthHeader(const Option<string>& authToken)
{
  // Nothing to attach without a token.
  if (authToken.isNone()) {
    return http::Headers();
  }

  http::Headers authHeader;
  authHeader["Authorization"] = "Bearer " + authToken.get();

  return authHeader;
}
// Stores the agent endpoint, the optional auth token and the optional
// hooks, and pre-builds the two agent API calls (`LAUNCH_CONTAINER` and
// `WAIT_CONTAINER`) that this process posts over and over. Optional
// launch parameters are only copied into the launch call when present.
ContainerDaemonProcess::ContainerDaemonProcess(
    const http::URL& _agentUrl,
    const Option<string>& _authToken,
    const ContainerID& containerId,
    const Option<CommandInfo>& commandInfo,
    const Option<Resources>& resources,
    const Option<ContainerInfo>& containerInfo,
    const Option<std::function<Future<Nothing>()>>& _postStartHook,
    const Option<std::function<Future<Nothing>()>>& _postStopHook)
  : ProcessBase(process::ID::generate("container-daemon")),
    agentUrl(_agentUrl),
    authToken(_authToken),
    contentType(ContentType::PROTOBUF),
    postStartHook(_postStartHook),
    postStopHook(_postStopHook)
{
  // The wait call only carries the container ID.
  waitCall.set_type(Call::WAIT_CONTAINER);
  waitCall.mutable_wait_container()
    ->mutable_container_id()->CopyFrom(containerId);

  // The launch call carries the container ID plus whichever of the
  // optional command, resources and container info were provided.
  launchCall.set_type(Call::LAUNCH_CONTAINER);

  auto* launch = launchCall.mutable_launch_container();
  launch->mutable_container_id()->CopyFrom(containerId);

  if (commandInfo.isSome()) {
    launch->mutable_command()->CopyFrom(commandInfo.get());
  }

  if (resources.isSome()) {
    launch->mutable_resources()->CopyFrom(resources.get());
  }

  if (containerInfo.isSome()) {
    launch->mutable_container()->CopyFrom(containerInfo.get());
  }
}
// Returns the future associated with the `terminated` promise. Within
// this file that promise is only ever failed or discarded, which happens
// when launching or waiting for the container fails or is discarded.
Future<Nothing> ContainerDaemonProcess::wait()
{
  Future<Nothing> termination = terminated.future();
  return termination;
}
void ContainerDaemonProcess::initialize()
{
launchContainer();
}
void ContainerDaemonProcess::launchContainer()
{
const ContainerID& containerId = launchCall.launch_container().container_id();
LOG(INFO) << "Launching container '" << containerId << "'";
http::post(
agentUrl,
getAuthHeader(authToken),
serialize(contentType, evolve(launchCall)),
stringify(contentType))
.then(defer(self(), [=](
const http::Response& response) -> Future<Nothing> {
if (response.status != http::OK().status &&
response.status != http::Accepted().status) {
return Failure(
"Failed to launch container '" +
stringify(launchCall.launch_container().container_id()) +
"': Unexpected response '" + response.status + "' (" +
response.body + ")");
}
if (postStartHook.isSome()) {
LOG(INFO)
<< "Invoking post-start hook for container '" << containerId << "'";
return postStartHook.get()();
}
return Nothing();
}))
.onReady(defer(self(), &Self::waitContainer))
.onFailed(defer(self(), [=](const string& failure) {
LOG(ERROR)
<< "Failed to launch container '"
<< launchCall.launch_container().container_id() << "': " << failure;
terminated.fail(failure);
}))
.onDiscarded(defer(self(), [=] {
LOG(ERROR)
<< "Failed to launch container '"
<< launchCall.launch_container().container_id()
<< "': future discarded";
terminated.discard();
}));
}
void ContainerDaemonProcess::waitContainer()
{
const ContainerID& containerId = waitCall.wait_container().container_id();
LOG(INFO) << "Waiting for container '" << containerId << "'";
http::post(
agentUrl,
getAuthHeader(authToken),
serialize(contentType, evolve(waitCall)),
stringify(contentType))
.then(defer(self(), [=](const http::Response& response) -> Future<Nothing> {
if (response.status != http::OK().status &&
response.status != http::NotFound().status) {
return Failure(
"Failed to wait for container '" +
stringify(waitCall.wait_container().container_id()) +
"': Unexpected response '" + response.status + "' (" +
response.body + ")");
}
if (postStopHook.isSome()) {
LOG(INFO)
<< "Invoking post-stop hook for container '" << containerId << "'";
return postStopHook.get()();
}
return Nothing();
}))
.onReady(defer(self(), &Self::launchContainer))
.onFailed(defer(self(), [=](const string& failure) {
LOG(ERROR)
<< "Failed to wait for container '"
<< waitCall.wait_container().container_id() << "': " << failure;
terminated.fail(failure);
}))
.onDiscarded(defer(self(), [=] {
LOG(ERROR)
<< "Failed to wait for container '"
<< waitCall.wait_container().container_id() << "': future discarded";
terminated.discard();
}));
}
// Factory for `ContainerDaemon`: forwards all arguments unchanged to the
// constructor and hands the new daemon back wrapped in an `Owned`. As
// written, this function never returns an error.
Try<Owned<ContainerDaemon>> ContainerDaemon::create(
    const http::URL& agentUrl,
    const Option<string>& authToken,
    const ContainerID& containerId,
    const Option<CommandInfo>& commandInfo,
    const Option<Resources>& resources,
    const Option<ContainerInfo>& containerInfo,
    const Option<std::function<Future<Nothing>()>>& postStartHook,
    const Option<std::function<Future<Nothing>()>>& postStopHook)
{
  Owned<ContainerDaemon> daemon(new ContainerDaemon(
      agentUrl,
      authToken,
      containerId,
      commandInfo,
      resources,
      containerInfo,
      postStartHook,
      postStopHook));

  return daemon;
}
// Creates the underlying `ContainerDaemonProcess` from the given
// arguments and spawns it right away.
ContainerDaemon::ContainerDaemon(
    const http::URL& agentUrl,
    const Option<string>& authToken,
    const ContainerID& containerId,
    const Option<CommandInfo>& commandInfo,
    const Option<Resources>& resources,
    const Option<ContainerInfo>& containerInfo,
    const Option<std::function<Future<Nothing>()>>& postStartHook,
    const Option<std::function<Future<Nothing>()>>& postStopHook)
  : process(new ContainerDaemonProcess(
        agentUrl,
        authToken,
        containerId,
        commandInfo,
        resources,
        containerInfo,
        postStartHook,
        postStopHook))
{
  // The process was just allocated above, so it must be non-null.
  ContainerDaemonProcess* daemonProcess = CHECK_NOTNULL(process.get());

  spawn(daemonProcess);
}
// Terminates the underlying process and blocks until it has exited.
ContainerDaemon::~ContainerDaemon()
{
  ContainerDaemonProcess* daemonProcess = process.get();

  process::terminate(daemonProcess);
  process::wait(daemonProcess);
}
// Forwards to `ContainerDaemonProcess::wait()` on the underlying process
// and returns the future it hands back.
Future<Nothing> ContainerDaemon::wait()
{
  Future<Nothing> termination = process->wait();
  return termination;
}
} // namespace slave {
} // namespace internal {
} // namespace mesos {