blob: a87df9633fddcf35ba2767704465cbc24e4082e0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "csi/service_manager.hpp"
#include <algorithm>
#include <functional>
#include <list>
#include <utility>
#include <vector>
#include <mesos/http.hpp>
#include <mesos/type_utils.hpp>
#include <mesos/agent/agent.hpp>
#include <process/after.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/id.hpp>
#include <process/loop.hpp>
#include <process/process.hpp>
#include <process/timeout.hpp>
#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/os.hpp>
#include <stout/result.hpp>
#include <stout/stringify.hpp>
#include <stout/strings.hpp>
#include <stout/try.hpp>
#include <stout/os/realpath.hpp>
#include "common/http.hpp"
#include "csi/paths.hpp"
#include "csi/v0_client.hpp"
#include "csi/v1_client.hpp"
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
#include "slave/container_daemon.hpp"
#include "slave/state.hpp"
namespace http = process::http;
namespace slave = mesos::internal::slave;
using std::list;
using std::string;
using std::vector;
using process::Break;
using process::Continue;
using process::ControlFlow;
using process::Failure;
using process::Future;
using process::Owned;
using process::Process;
using process::ProcessBase;
using process::Promise;
using process::Timeout;
using process::grpc::StatusError;
using process::grpc::client::Runtime;
using slave::ContainerDaemon;
namespace mesos {
namespace csi {
// Timeout for a CSI plugin component to create its endpoint socket. This is
// the upper bound on how long `waitEndpoint` keeps polling for the socket
// file before it gives up with a failure.
//
// TODO(chhsiao): Make the timeout configurable.
constexpr Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1);
// Builds the container ID of the given CSI plugin container. The resulting
// value has the following format:
//
//   <container_prefix><plugin_type>-<plugin_name>--<list_of_services>
//
// where <plugin_type> is the type of the CSI plugin with dots replaced by
// dashes, <plugin_name> is the name of the CSI plugin as given, and
// <list_of_services> lists the CSI services provided by the container,
// concatenated with dashes.
static ContainerID getContainerId(
    const CSIPluginInfo& info,
    const string& containerPrefix,
    const CSIPluginContainerInfo& container)
{
  // NOTE: `container.services()` yields a `RepeatedField<int>`, which would be
  // stringified as plain integers, so the typed list of services is rebuilt
  // through the indexed accessor instead.
  const int serviceCount = container.services_size();

  vector<Service> provided;
  provided.reserve(serviceCount);
  for (int index = 0; index < serviceCount; ++index) {
    provided.push_back(container.services(index));
  }

  const string pluginPart =
    strings::join("-", strings::replace(info.type(), ".", "-"), info.name());

  const string servicesPart = strings::join("-", provided);

  ContainerID result;
  result.set_value(containerPrefix + pluginPart + "--" + servicesPart);

  return result;
}
// Libprocess actor that manages the plugin containers of a single CSI plugin:
// it maps each requested CSI service to the plugin container serving it,
// launches those containers through `ContainerDaemon`s on demand, and hands
// out the URIs of their endpoint sockets once they have been probed.
class ServiceManagerProcess : public Process<ServiceManagerProcess>
{
public:
// `services` lists the CSI services this manager must be able to serve; each
// one must be provided by some container in `_info` (enforced by a `CHECK`).
ServiceManagerProcess(
const http::URL& _agentUrl,
const string& _rootDir,
const CSIPluginInfo& _info,
const hashset<Service>& services,
const string& _containerPrefix,
const Option<string>& _authToken,
const Runtime& _runtime,
Metrics* _metrics);
// Cleans up checkpointed plugin containers that are not up-to-date running
// service containers of this plugin.
Future<Nothing> recover();
// Returns the endpoint URI of the container serving `service`, or a failure
// if the service is not served by this manager.
Future<string> getServiceEndpoint(const Service& service);
// Returns the detected CSI API version, probing a plugin container first if
// no version has been detected yet.
Future<string> getApiVersion();
private:
// Returns the container info of the specified container for this CSI plugin.
Option<CSIPluginContainerInfo> getContainerInfo(
const ContainerID& containerId);
// Returns a map of any existing container of this CSI plugin to its status,
// or `None` if it does not have a status (e.g., being destroyed).
Future<hashmap<ContainerID, Option<ContainerStatus>>> getContainers();
// Waits for the specified plugin container to be terminated.
Future<Nothing> waitContainer(const ContainerID& containerId);
// Kills the specified plugin container.
Future<Nothing> killContainer(const ContainerID& containerId);
// Waits for the endpoint (URI to a Unix domain socket) to be created.
Future<Nothing> waitEndpoint(const string& endpoint);
// Probes the endpoint to detect the API version and check for readiness.
Future<Nothing> probeEndpoint(const string& endpoint);
// Returns the URI of the latest service endpoint for the specified plugin
// container. If the container is not already running, this method will start
// a new container.
Future<string> getEndpoint(const ContainerID& containerId);
// NOTE: Members are initialized in declaration order by the constructor's
// initializer list, so keep the two in sync when adding members.
const http::URL agentUrl;
const string rootDir;
const CSIPluginInfo info;
const string containerPrefix;
const Option<string> authToken;
// Content type used for both the request bodies and the `Accept` header of
// agent API calls.
const ContentType contentType;
Runtime runtime;
Metrics* metrics;
// HTTP headers sent with every agent API call (`Accept`, plus
// `Authorization` when an auth token is given).
http::Headers headers;
// The CSI API version detected by `probeEndpoint`, if any.
Option<string> apiVersion;
// Maps each requested service to the ID of the container serving it.
hashmap<Service, ContainerID> serviceContainers;
// Container daemons created by `getEndpoint`, keyed by container ID.
hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
// Promises of the endpoint URIs, keyed by container ID. A promise is replaced
// by a fresh one whenever its container terminates.
hashmap<ContainerID, Owned<Promise<string>>> endpoints;
};
// Prepares the HTTP headers used for agent API calls and resolves, for every
// requested service, the ID of the plugin container that serves it. Aborts
// (via `CHECK`) if a requested service is not provided by any container of the
// plugin.
ServiceManagerProcess::ServiceManagerProcess(
    const http::URL& _agentUrl,
    const string& _rootDir,
    const CSIPluginInfo& _info,
    const hashset<Service>& services,
    const string& _containerPrefix,
    const Option<string>& _authToken,
    const Runtime& _runtime,
    Metrics* _metrics)
  : ProcessBase(process::ID::generate("csi-service-manager")),
    agentUrl(_agentUrl),
    rootDir(_rootDir),
    info(_info),
    containerPrefix(_containerPrefix),
    authToken(_authToken),
    contentType(ContentType::PROTOBUF),
    runtime(_runtime),
    metrics(_metrics)
{
  headers["Accept"] = stringify(contentType);

  if (authToken.isSome()) {
    headers["Authorization"] = "Bearer " + authToken.get();
  }

  foreach (const Service& service, services) {
    // The first container that lists the service is the one serving it. See
    // `CSIPluginInfo` in `mesos.proto` for details.
    foreach (const CSIPluginContainerInfo& container, info.containers()) {
      const auto& provided = container.services();

      if (std::find(provided.begin(), provided.end(), service) ==
          provided.end()) {
        continue;
      }

      serviceContainers[service] =
        getContainerId(info, containerPrefix, container);

      break;
    }

    CHECK(serviceContainers.contains(service))
      << service << " not found for CSI plugin type '" << info.type()
      << "' and name '" << info.name() << "'";
  }
}
// Recovers the on-disk state of this CSI plugin's containers.
//
// Lists the plugin's containers known to the agent, then walks every
// checkpointed container path under `rootDir`. A container is left untouched
// only if it currently serves one of the requested services, is running, and
// its checkpointed config equals the config in `info`. Every other container
// is killed (if running) and waited for (if known to the agent), after which
// its endpoint directory and container directory are removed.
//
// Fails if the container paths cannot be listed or parsed, if a checkpointed
// config cannot be read, or if any agent call or directory removal fails.
Future<Nothing> ServiceManagerProcess::recover()
{
return getContainers()
.then(process::defer(self(), [=](
const hashmap<ContainerID, Option<ContainerStatus>>& containers)
-> Future<Nothing> {
Try<list<string>> containerPaths =
paths::getContainerPaths(rootDir, info.type(), info.name());
if (containerPaths.isError()) {
return Failure(
"Failed to find service container paths for CSI plugin type '" +
info.type() + "' and name '" + info.name() +
"': " + containerPaths.error());
}
// One cleanup future per container that has to be removed.
vector<Future<Nothing>> futures;
foreach (const string& path, containerPaths.get()) {
Try<paths::ContainerPath> containerPath =
paths::parseContainerPath(rootDir, path);
if (containerPath.isError()) {
return Failure(
"Failed to parse service container path '" + path +
"': " + containerPath.error());
}
CHECK_EQ(info.type(), containerPath->type);
CHECK_EQ(info.name(), containerPath->name);
const ContainerID& containerId = containerPath->containerId;
// NOTE: Since `GET_CONTAINERS` might return containers that are being
// destroyed, to identify if the container is actually running, we check
// if the `executor_pid` field is set as a workaround.
bool isRunningContainer =
containers.contains(containerId) &&
containers.at(containerId).isSome() &&
containers.at(containerId)->has_executor_pid();
// Do not kill the up-to-date running controller or node container.
if (serviceContainers.contains_value(containerId) &&
isRunningContainer) {
const string configPath = paths::getContainerInfoPath(
rootDir, info.type(), info.name(), containerId);
if (os::exists(configPath)) {
Result<CSIPluginContainerInfo> config =
slave::state::read<CSIPluginContainerInfo>(configPath);
if (config.isError()) {
return Failure(
"Failed to read service container config from '" +
configPath + "': " + config.error());
}
// Keep the container only if its checkpointed config still matches
// the config of this plugin; a missing or different config falls
// through to the cleanup below.
if (config.isSome() &&
getContainerInfo(containerId) == config.get()) {
continue;
}
}
}
LOG(INFO) << "Cleaning up plugin container '" << containerId << "'";
// Otherwise, kill the container if it is running. We always wait for
// the container to be destroyed before performing the cleanup even if
// it is not killed here.
Future<Nothing> cleanup = Nothing();
if (containers.contains(containerId)) {
if (isRunningContainer) {
cleanup = killContainer(containerId);
}
cleanup = cleanup
.then(process::defer(self(), &Self::waitContainer, containerId));
}
// Once the container is gone, remove the directory the endpoint symlink
// resolves to (if it resolves) and then the container directory itself.
cleanup = cleanup
.then(process::defer(self(), [=]() -> Future<Nothing> {
Result<string> endpointDir =
os::realpath(paths::getEndpointDirSymlinkPath(
rootDir, info.type(), info.name(), containerId));
if (endpointDir.isSome()) {
Try<Nothing> rmdir = os::rmdir(endpointDir.get());
if (rmdir.isError()) {
return Failure(
"Failed to remove endpoint directory '" +
endpointDir.get() + "': " + rmdir.error());
}
}
Try<Nothing> rmdir = os::rmdir(path);
if (rmdir.isError()) {
return Failure(
"Failed to remove plugin container directory '" + path +
"': " + rmdir.error());
}
return Nothing();
}));
futures.push_back(cleanup);
}
// Recovery is done once all the cleanups have been collected.
return process::collect(futures).then([] { return Nothing(); });
}));
}
// Returns the endpoint URI of the plugin container serving `service`. Fails if
// no container was resolved for the service when this process was constructed.
Future<string> ServiceManagerProcess::getServiceEndpoint(const Service& service)
{
  const auto entry = serviceContainers.find(service);

  if (entry != serviceContainers.end()) {
    return getEndpoint(entry->second);
  }

  return Failure(
      stringify(service) + " not found for CSI plugin type '" + info.type() +
      "' and name '" + info.name() + "'");
}
// Returns the CSI API version of the plugin. If no version has been detected
// yet, an endpoint is requested first, since probing the endpoint is what
// detects the API version.
Future<string> ServiceManagerProcess::getApiVersion()
{
  if (apiVersion.isNone()) {
    // Any service container will do: `getEndpoint` only completes after the
    // endpoint has been probed, at which point the version is known.
    CHECK(!serviceContainers.empty());

    return getEndpoint(serviceContainers.begin()->second)
      .then(process::defer(self(), [this] {
        return CHECK_NOTNONE(apiVersion);
      }));
  }

  return apiVersion.get();
}
// Looks up the container config of this plugin whose derived container ID
// equals `containerId`. Returns `None` if no container of the plugin matches.
Option<CSIPluginContainerInfo> ServiceManagerProcess::getContainerInfo(
    const ContainerID& containerId)
{
  const auto& candidates = info.containers();

  const auto match = std::find_if(
      candidates.begin(),
      candidates.end(),
      [&](const CSIPluginContainerInfo& candidate) {
        return getContainerId(info, containerPrefix, candidate) == containerId;
      });

  if (match == candidates.end()) {
    return None();
  }

  return *match;
}
// Asks the agent for its standalone (non-nested) containers through the
// `GET_CONTAINERS` call and returns those whose ID starts with
// `containerPrefix`, each mapped to its status, or to `None` if the agent did
// not report a status for it. Fails on a non-OK response or if the response
// body cannot be deserialized.
Future<hashmap<ContainerID, Option<ContainerStatus>>>
ServiceManagerProcess::getContainers()
{
  agent::Call call;
  call.set_type(agent::Call::GET_CONTAINERS);

  auto* query = call.mutable_get_containers();
  query->set_show_nested(false);
  query->set_show_standalone(true);

  return http::post(
      agentUrl,
      headers,
      internal::serialize(contentType, internal::evolve(call)),
      stringify(contentType))
    .then(process::defer(self(), [this](const http::Response& httpResponse)
        -> Future<hashmap<ContainerID, Option<ContainerStatus>>> {
      const bool succeeded = httpResponse.status == http::OK().status;
      if (!succeeded) {
        return Failure(
            "Failed to get containers: Unexpected response '" +
            httpResponse.status + "' (" + httpResponse.body + ")");
      }

      Try<mesos::v1::agent::Response> parsed =
        internal::deserialize<mesos::v1::agent::Response>(
            contentType, httpResponse.body);

      if (parsed.isError()) {
        return Failure("Failed to get containers: " + parsed.error());
      }

      const agent::Response response = internal::devolve(parsed.get());

      hashmap<ContainerID, Option<ContainerStatus>> found;

      foreach (const agent::Response::GetContainers::Container& entry,
               response.get_containers().containers()) {
        const ContainerID& id = entry.container_id();

        // Container IDs of this CSI plugin must contain the given prefix. See
        // `LocalResourceProvider::principal` for details.
        if (strings::startsWith(id.value(), containerPrefix)) {
          Option<ContainerStatus> status =
            entry.has_container_status() ? entry.container_status()
                                         : Option<ContainerStatus>::none();

          found.put(id, std::move(status));
        }
      }

      return std::move(found);
    }));
}
// Issues a `WAIT_CONTAINER` call to the agent for the given container. Both
// an OK and a "not found" response count as success; any other response
// status fails the returned future.
Future<Nothing> ServiceManagerProcess::waitContainer(
    const ContainerID& containerId)
{
  agent::Call call;
  call.set_type(agent::Call::WAIT_CONTAINER);
  call.mutable_wait_container()->mutable_container_id()->CopyFrom(containerId);

  return http::post(
      agentUrl,
      headers,
      internal::serialize(contentType, internal::evolve(call)),
      stringify(contentType))
    .then([containerId](const http::Response& response) -> Future<Nothing> {
      const bool acceptable =
        response.status == http::OK().status ||
        response.status == http::NotFound().status;

      if (acceptable) {
        return Nothing();
      }

      return Failure(
          "Failed to wait for container '" + stringify(containerId) +
          "': Unexpected response '" + response.status + "' (" + response.body
          + ")");
    });
}
// Issues a `KILL_CONTAINER` call to the agent for the given container. Both
// an OK and a "not found" response count as success; any other response
// status fails the returned future.
Future<Nothing> ServiceManagerProcess::killContainer(
    const ContainerID& containerId)
{
  agent::Call call;
  call.set_type(agent::Call::KILL_CONTAINER);
  call.mutable_kill_container()->mutable_container_id()->CopyFrom(containerId);

  return http::post(
      agentUrl,
      headers,
      internal::serialize(contentType, internal::evolve(call)),
      stringify(contentType))
    .then([containerId](const http::Response& response) -> Future<Nothing> {
      const bool acceptable =
        response.status == http::OK().status ||
        response.status == http::NotFound().status;

      if (acceptable) {
        return Nothing();
      }

      return Failure(
          "Failed to kill container '" + stringify(containerId) +
          "': Unexpected response '" + response.status + "' (" + response.body
          + ")");
    });
}
// Waits until the Unix domain socket behind `endpoint` (which must be a
// `unix://` URI) exists on the filesystem. Returns immediately if it already
// exists; otherwise re-checks every 10 milliseconds and fails once
// `CSI_ENDPOINT_CREATION_TIMEOUT` has expired.
Future<Nothing> ServiceManagerProcess::waitEndpoint(const string& endpoint)
{
  CHECK(strings::startsWith(endpoint, "unix://"));

  const string socketPath =
    strings::remove(endpoint, "unix://", strings::PREFIX);

  if (!os::exists(socketPath)) {
    // Poll for the socket to appear until the deadline passes.
    Timeout deadline = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT);

    return process::loop(
        [=]() -> Future<Nothing> {
          if (!deadline.expired()) {
            return process::after(Milliseconds(10));
          }

          return Failure("Timed out waiting for endpoint '" + endpoint + "'");
        },
        [=](const Nothing&) -> ControlFlow<Nothing> {
          if (!os::exists(socketPath)) {
            return Continue();
          }

          return Break();
        });
  }

  return Nothing();
}
// Probes the given endpoint to detect or confirm the CSI API version.
//
// If an API version has already been detected, only the prober of that
// version is used. Otherwise CSI v1 is probed first and CSI v0 is tried only
// if v1 turns out to be unimplemented. On success the detected version is
// stored in `apiVersion`; the returned future fails if the probe fails, if no
// known version is implemented, or if the probed version differs from the one
// detected earlier.
//
// The whole detection counts as a single RPC in the plugin RPC metrics.
Future<Nothing> ServiceManagerProcess::probeEndpoint(const string& endpoint)
{
// Each probe function returns its API version if the probe is successful,
// an error if the API version is implemented but the probe fails, or a `None`
// if the API version is not implemented.
static const hashmap<
string,
std::function<Future<Result<string>>(const string&, const Runtime&)>>
probers = {
{v0::API_VERSION,
[](const string& endpoint, const Runtime& runtime) {
LOG(INFO) << "Probing endpoint '" << endpoint << "' with CSI v0";
return v0::Client(endpoint, runtime)
.probe(v0::ProbeRequest())
.then([](const v0::RPCResult<v0::ProbeResponse>& result) {
// A gRPC `UNIMPLEMENTED` status means this API version is not
// implemented by the plugin, which maps to `None` rather than an error.
return result.isError()
? (result.error().status.error_code() == grpc::UNIMPLEMENTED
? Result<string>::none() : result.error())
: v0::API_VERSION;
});
}},
{v1::API_VERSION,
[](const string& endpoint, const Runtime& runtime) {
LOG(INFO) << "Probing endpoint '" << endpoint << "' with CSI v1";
return v1::Client(endpoint, runtime).probe(v1::ProbeRequest())
.then([](const v1::RPCResult<v1::ProbeResponse>& result) {
// TODO(chhsiao): Retry when `result->ready` is false.
return result.isError()
? (result.error().status.error_code() == grpc::UNIMPLEMENTED
? Result<string>::none() : result.error())
: v1::API_VERSION;
});
}},
};
// Balanced by the decrement in the `onAny` callback at the end of the chain.
++metrics->csi_plugin_rpcs_pending;
Future<Result<string>> probed;
if (apiVersion.isSome()) {
CHECK(probers.contains(apiVersion.get()));
probed = probers.at(apiVersion.get())(endpoint, runtime);
} else {
// No version detected yet: try v1 and fall back to v0 only when v1 is
// reported as not implemented (`None`).
probed = probers.at(v1::API_VERSION)(endpoint, runtime)
.then(process::defer(self(), [=](const Result<string>& result) {
return result.isNone()
? probers.at(v0::API_VERSION)(endpoint, runtime) : result;
}));
}
return probed
.then(process::defer(self(), [=](
const Result<string>& result) -> Future<Nothing> {
if (result.isError()) {
return Failure(
"Failed to probe endpoint '" + endpoint + "': " + result.error());
}
if (result.isNone()) {
return Failure(
"Failed to probe endpoint '" + endpoint + "': Unknown API version");
}
// Record the version on first detection; afterwards every probe must agree
// with the recorded one.
if (apiVersion.isNone()) {
apiVersion = result.get();
} else if (apiVersion != result.get()) {
return Failure(
"Failed to probe endpoint '" + endpoint +
"': Inconsistent API version");
}
return Nothing();
}))
.onAny(process::defer(self(), [this](const Future<Nothing>& future) {
// We only update the metrics after the whole detection loop is done so
// it won't introduce much noise.
--metrics->csi_plugin_rpcs_pending;
if (future.isReady()) {
++metrics->csi_plugin_rpcs_finished;
} else if (future.isDiscarded()) {
++metrics->csi_plugin_rpcs_cancelled;
} else {
++metrics->csi_plugin_rpcs_failed;
}
}));
}
// Returns the URI of the latest service endpoint of the specified plugin
// container. The first call for a container checkpoints its config, prepares
// its command, environment and volumes, and creates a `ContainerDaemon` that
// launches it; later calls return the future of the current endpoint promise.
//
// The endpoint promise of a container is satisfied by the daemon's post-start
// hook once the endpoint socket exists and has been probed, and is replaced by
// a fresh promise by the post-stop hook whenever the container terminates.
//
// Fails if the config cannot be checkpointed, the endpoint path cannot be
// resolved, the mount root directory cannot be created, or the container
// daemon cannot be created. In the last case no endpoint promise is left
// behind, so a subsequent call retries instead of waiting forever.
Future<string> ServiceManagerProcess::getEndpoint(
    const ContainerID& containerId)
{
  if (endpoints.contains(containerId)) {
    return endpoints.at(containerId)->future();
  }

  CHECK(!daemons.contains(containerId));

  Option<CSIPluginContainerInfo> config = getContainerInfo(containerId);
  CHECK_SOME(config);

  // We checkpoint the config first to keep track of the plugin container even
  // if we fail to create its container daemon.
  const string configPath =
    paths::getContainerInfoPath(rootDir, info.type(), info.name(), containerId);

  Try<Nothing> checkpoint = slave::state::checkpoint(configPath, config.get());
  if (checkpoint.isError()) {
    return Failure(
        "Failed to checkpoint plugin container config to '" + configPath +
        "': " + checkpoint.error());
  }

  CommandInfo commandInfo;
  if (config->has_command()) {
    commandInfo = config->command();
  }

  // Set the `CSI_ENDPOINT` environment variable.
  Try<string> endpointPath = paths::getEndpointSocketPath(
      rootDir, info.type(), info.name(), containerId);

  if (endpointPath.isError()) {
    return Failure(
        "Failed to resolve endpoint path for plugin container '" +
        stringify(containerId) + "': " + endpointPath.error());
  }

  const string endpoint = "unix://" + endpointPath.get();
  Environment::Variable* endpoint_ =
    commandInfo.mutable_environment()->add_variables();
  endpoint_->set_name("CSI_ENDPOINT");
  endpoint_->set_value(endpoint);

  ContainerInfo containerInfo;
  if (config->has_container()) {
    containerInfo = config->container();
  } else {
    containerInfo.set_type(ContainerInfo::MESOS);
  }

  // Prepare a volume where the endpoint socket will be placed.
  const string endpointDir = Path(endpointPath.get()).dirname();
  Volume* endpointVolume = containerInfo.add_volumes();
  endpointVolume->set_mode(Volume::RW);
  endpointVolume->set_container_path(endpointDir);
  endpointVolume->set_host_path(endpointDir);

  // Prepare the directory where the mount points will be placed.
  const string mountRootDir =
    paths::getMountRootDir(rootDir, info.type(), info.name());

  Try<Nothing> mkdir = os::mkdir(mountRootDir);
  if (mkdir.isError()) {
    return Failure(
        "Failed to create directory '" + mountRootDir + "': " + mkdir.error());
  }

  // Prepare a volume where the mount points will be placed.
  Volume* mountVolume = containerInfo.add_volumes();
  mountVolume->set_mode(Volume::RW);
  mountVolume->set_container_path(mountRootDir);
  mountVolume->mutable_source()->set_type(Volume::Source::HOST_PATH);
  mountVolume->mutable_source()->mutable_host_path()->set_path(mountRootDir);
  mountVolume->mutable_source()->mutable_host_path()
    ->mutable_mount_propagation()->set_mode(MountPropagation::BIDIRECTIONAL);

  // Register the endpoint promise before creating the daemon so that the
  // hooks below always find an entry for this container.
  CHECK(!endpoints.contains(containerId));
  endpoints[containerId].reset(new Promise<string>());

  Try<Owned<ContainerDaemon>> daemon = ContainerDaemon::create(
      agentUrl,
      authToken,
      containerId,
      std::move(commandInfo),
      config->resources(),
      std::move(containerInfo),
      // Post-start hook: wait for the endpoint socket, probe it, and satisfy
      // the current endpoint promise with the endpoint URI.
      std::function<Future<Nothing>()>(
          process::defer(self(), [=]() -> Future<Nothing> {
            LOG(INFO)
              << "Connecting to endpoint '" << endpoint
              << "' of CSI plugin container " << containerId;

            CHECK(endpoints.at(containerId)->associate(
                waitEndpoint(endpoint)
                  .then(process::defer(self(), &Self::probeEndpoint, endpoint))
                  .then([endpoint]() -> string { return endpoint; })));

            return endpoints.at(containerId)->future().then([] {
              return Nothing();
            });
          })),
      // Post-stop hook: invalidate the current endpoint promise, install a
      // fresh one for the next run, and remove the stale endpoint socket.
      std::function<Future<Nothing>()>(
          process::defer(self(), [=]() -> Future<Nothing> {
            ++metrics->csi_plugin_container_terminations;

            endpoints.at(containerId)->discard();
            endpoints.at(containerId).reset(new Promise<string>());

            LOG(INFO)
              << "Disconnected from endpoint '" << endpoint
              << "' of CSI plugin container " << containerId;

            const string endpointPath =
              strings::remove(endpoint, "unix://", strings::PREFIX);

            if (os::exists(endpointPath)) {
              Try<Nothing> rm = os::rm(endpointPath);
              if (rm.isError()) {
                return Failure(
                    "Failed to remove endpoint socket '" + endpointPath +
                    "': " + rm.error());
              }
            }

            return Nothing();
          })));

  if (daemon.isError()) {
    // No daemon exists that could ever satisfy the promise registered above.
    // Leaving it in `endpoints` would make every subsequent call take the
    // early return at the top and hand out a future that never completes, so
    // drop it and let the next call retry from scratch.
    endpoints.erase(containerId);

    return Failure(
        "Failed to create container daemon for plugin container '" +
        stringify(containerId) + "': " + daemon.error());
  }

  daemon.get()->wait()
    .recover(process::defer(self(), [this, containerId](
        const Future<Nothing>& future) {
      LOG(ERROR)
        << "Container daemon for '" << containerId << "' failed: "
        << (future.isFailed() ? future.failure() : "future discarded");

      // Fail or discard the corresponding endpoint promise if it has not been
      // associated by the post-start hook above yet.
      endpoints.at(containerId)->associate(
          future.then([]() -> string { UNREACHABLE(); }));

      return future;
    }));

  daemons.put(containerId, std::move(daemon.get()));

  return endpoints.at(containerId)->future();
}
// Creates and spawns the underlying `ServiceManagerProcess`, then immediately
// kicks off its recovery; the outcome is kept in `recovered`.
ServiceManager::ServiceManager(
    const http::URL& agentUrl,
    const string& rootDir,
    const CSIPluginInfo& info,
    const hashset<Service>& services,
    const string& containerPrefix,
    const Option<string>& authToken,
    const Runtime& runtime,
    Metrics* metrics)
  : process(new ServiceManagerProcess(
        agentUrl,
        rootDir,
        info,
        services,
        containerPrefix,
        authToken,
        runtime,
        metrics))
{
  auto* const actor = CHECK_NOTNULL(process.get());

  process::spawn(actor);

  recovered = process::dispatch(actor, &ServiceManagerProcess::recover);
}
// Requests a discard of the recovery future, then terminates the underlying
// process and blocks until it has exited.
ServiceManager::~ServiceManager()
{
  recovered.discard();

  auto* const actor = process.get();

  process::terminate(actor);
  process::wait(actor);
}
// Returns the future of the recovery that was started by the constructor; no
// new recovery is triggered by calling this.
Future<Nothing> ServiceManager::recover()
{
return recovered;
}
// Returns the endpoint URI of the container serving `service`. The request is
// chained on `recovered`, so it is only forwarded to the underlying process
// after recovery has completed.
Future<string> ServiceManager::getServiceEndpoint(const Service& service)
{
return recovered
.then(process::defer(
process.get(), &ServiceManagerProcess::getServiceEndpoint, service));
}
// Returns the CSI API version of the plugin. The request is chained on
// `recovered`, so it is only forwarded to the underlying process after
// recovery has completed.
Future<string> ServiceManager::getApiVersion()
{
return recovered
.then(process::defer(process.get(), &ServiceManagerProcess::getApiVersion));
}
} // namespace csi {
} // namespace mesos {