// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "resource_provider/storage/provider.hpp"
#include <algorithm>
#include <cctype>
#include <cstdlib>
#include <functional>
#include <list>
#include <memory>
#include <numeric>
#include <queue>
#include <type_traits>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include <process/after.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/grpc.hpp>
#include <process/id.hpp>
#include <process/loop.hpp>
#include <process/process.hpp>
#include <process/sequence.hpp>
#include <process/timeout.hpp>
#include <process/metrics/counter.hpp>
#include <process/metrics/metrics.hpp>
#include <process/metrics/push_gauge.hpp>
#include <mesos/http.hpp>
#include <mesos/resources.hpp>
#include <mesos/type_utils.hpp>
#include <mesos/resource_provider/resource_provider.hpp>
#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
#include <mesos/v1/resource_provider.hpp>
#include <stout/bytes.hpp>
#include <stout/duration.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/linkedhashmap.hpp>
#include <stout/nothing.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/strings.hpp>
#include <stout/unreachable.hpp>
#include <stout/uuid.hpp>
#include <stout/os/realpath.hpp>
#include "common/http.hpp"
#include "common/protobuf_utils.hpp"
#include "common/resources_utils.hpp"
#include "csi/client.hpp"
#include "csi/paths.hpp"
#include "csi/rpc.hpp"
#include "csi/state.hpp"
#include "csi/utils.hpp"
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
#include "resource_provider/detector.hpp"
#include "resource_provider/state.hpp"
#include "resource_provider/storage/provider_process.hpp"
#include "slave/container_daemon.hpp"
#include "slave/paths.hpp"
#include "slave/state.hpp"
#include "status_update_manager/operation.hpp"
namespace http = process::http;
using std::accumulate;
using std::find;
using std::list;
using std::queue;
using std::shared_ptr;
using std::string;
using std::vector;
using process::after;
using process::await;
using process::Break;
using process::collect;
using process::Continue;
using process::ControlFlow;
using process::defer;
using process::delay;
using process::dispatch;
using process::Failure;
using process::Future;
using process::loop;
using process::Owned;
using process::ProcessBase;
using process::Promise;
using process::Sequence;
using process::spawn;
using process::Timeout;
using process::grpc::StatusError;
using process::http::authentication::Principal;
using process::metrics::Counter;
using process::metrics::PushGauge;
using mesos::csi::state::VolumeState;
using mesos::internal::protobuf::convertLabelsToStringMap;
using mesos::internal::protobuf::convertStringMapToLabels;
using mesos::internal::slave::ContainerDaemon;
using mesos::resource_provider::Call;
using mesos::resource_provider::Event;
using mesos::resource_provider::ResourceProviderState;
using mesos::v1::resource_provider::Driver;
namespace mesos {
namespace internal {
// Timeout for a CSI plugin component to create its endpoint socket.
//
// TODO(chhsiao): Make the timeout configurable.
constexpr Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1);
// Returns true if the string is a valid Java identifier.
static bool isValidName(const string& s)
{
if (s.empty()) {
return false;
}
foreach (const char c, s) {
if (!isalnum(c) && c != '_') {
return false;
}
}
return true;
}
// Returns true if the string is a valid Java package name.
static bool isValidType(const string& s)
{
if (s.empty()) {
return false;
}
foreach (const string& token, strings::split(s, ".")) {
if (!isValidName(token)) {
return false;
}
}
return true;
}
// Returns the container ID of the standalone container to run a CSI plugin
// component. The container ID is of the following format:
// <cid_prefix><csi_type>-<csi_name>--<list_of_services>
// where <cid_prefix> comes from the principal of the resource provider,
// <csi_type> and <csi_name> are the type and name of the CSI plugin, with dots
// replaced by dashes, and <list_of_services> lists the CSI services provided
// by the component, concatenated with dashes.
static inline ContainerID getContainerId(
const ResourceProviderInfo& info,
const CSIPluginContainerInfo& container)
{
const Principal principal = LocalResourceProvider::principal(info);
CHECK(principal.claims.contains("cid_prefix"));
string value = principal.claims.at("cid_prefix") + strings::join(
"-",
strings::replace(info.storage().plugin().type(), ".", "-"),
info.storage().plugin().name(),
"");
for (int i = 0; i < container.services_size(); i++) {
value += "-" + stringify(container.services(i));
}
ContainerID containerId;
containerId.set_value(value);
return containerId;
}
static Option<CSIPluginContainerInfo> getCSIPluginContainerInfo(
const ResourceProviderInfo& info,
const ContainerID& containerId)
{
foreach (const CSIPluginContainerInfo& container,
info.storage().plugin().containers()) {
if (getContainerId(info, container) == containerId) {
return container;
}
}
return None();
}
// Returns the parent endpoint as a URL.
// TODO(jieyu): Consider using a more reliable way to get the agent v1
// operator API endpoint URL.
static inline http::URL extractParentEndpoint(const http::URL& url)
{
http::URL parent = url;
parent.path = Path(url.path).dirname();
return parent;
}
// Returns the 'Bearer' credential as a header for calling the V1 agent
// API if the `authToken` is present, or empty otherwise.
static inline http::Headers getAuthHeader(const Option<string>& authToken)
{
http::Headers headers;
if (authToken.isSome()) {
headers["Authorization"] = "Bearer " + authToken.get();
}
return headers;
}
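// Returns a RAW disk resource of the given capacity for this resource
// provider, carrying its default reservations and, when provided, the
// profile, vendor, volume ID and metadata on the disk source.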
static inline Resource createRawDiskResource(
const ResourceProviderInfo& info,
const Bytes& capacity,
const Option<string>& profile,
const Option<string>& vendor,
const Option<string>& id = None(),
const Option<Labels>& metadata = None())
{
CHECK(info.has_id());
CHECK(info.has_storage());
Resource resource;
resource.set_name("disk");
resource.set_type(Value::SCALAR);
resource.mutable_scalar()
->set_value(static_cast<double>(capacity.bytes()) / Bytes::MEGABYTES);
resource.mutable_provider_id()->CopyFrom(info.id());
resource.mutable_reservations()->CopyFrom(info.default_reservations());
Resource::DiskInfo::Source* source =
resource.mutable_disk()->mutable_source();
source->set_type(Resource::DiskInfo::Source::RAW);
if (profile.isSome()) {
source->set_profile(profile.get());
}
if (vendor.isSome()) {
source->set_vendor(vendor.get());
}
if (id.isSome()) {
source->set_id(id.get());
}
if (metadata.isSome()) {
source->mutable_metadata()->CopyFrom(metadata.get());
}
return resource;
}
StorageLocalResourceProviderProcess::StorageLocalResourceProviderProcess(
const http::URL& _url,
const string& _workDir,
const ResourceProviderInfo& _info,
const SlaveID& _slaveId,
const Option<string>& _authToken,
bool _strict)
: ProcessBase(process::ID::generate("storage-local-resource-provider")),
state(RECOVERING),
url(_url),
workDir(_workDir),
metaDir(slave::paths::getMetaRootDir(_workDir)),
contentType(ContentType::PROTOBUF),
info(_info),
vendor(
info.storage().plugin().type() + "." + info.storage().plugin().name()),
slaveId(_slaveId),
authToken(_authToken),
strict(_strict),
resourceVersion(id::UUID::random()),
sequence("storage-local-resource-provider-sequence"),
metrics("resource_providers/" + info.type() + "." + info.name() + "/")
{
diskProfileAdaptor = DiskProfileAdaptor::getAdaptor();
CHECK_NOTNULL(diskProfileAdaptor.get());
}
void StorageLocalResourceProviderProcess::connected()
{
CHECK_EQ(DISCONNECTED, state);
LOG(INFO) << "Connected to resource provider manager";
state = CONNECTED;
doReliableRegistration();
}
void StorageLocalResourceProviderProcess::disconnected()
{
CHECK(state == CONNECTED || state == SUBSCRIBED || state == READY);
LOG(INFO) << "Disconnected from resource provider manager";
state = DISCONNECTED;
statusUpdateManager.pause();
}
void StorageLocalResourceProviderProcess::received(const Event& event)
{
LOG(INFO) << "Received " << event.type() << " event";
switch (event.type()) {
case Event::SUBSCRIBED: {
CHECK(event.has_subscribed());
subscribed(event.subscribed());
break;
}
case Event::APPLY_OPERATION: {
CHECK(event.has_apply_operation());
applyOperation(event.apply_operation());
break;
}
case Event::PUBLISH_RESOURCES: {
CHECK(event.has_publish_resources());
publishResources(event.publish_resources());
break;
}
case Event::ACKNOWLEDGE_OPERATION_STATUS: {
CHECK(event.has_acknowledge_operation_status());
acknowledgeOperationStatus(event.acknowledge_operation_status());
break;
}
case Event::RECONCILE_OPERATIONS: {
CHECK(event.has_reconcile_operations());
reconcileOperations(event.reconcile_operations());
break;
}
case Event::UNKNOWN: {
LOG(WARNING) << "Received an UNKNOWN event and ignored";
break;
}
}
}
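// Performs a CSI call against the plugin running in the given container.
// When `retry` is set, retryable gRPC errors are retried with a randomized
// exponential backoff (see `__call` below); otherwise any error is surfaced
// as a failure.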
template <
csi::v0::RPC rpc,
typename std::enable_if<rpc != csi::v0::PROBE, int>::type>
Future<csi::v0::Response<rpc>> StorageLocalResourceProviderProcess::call(
const ContainerID& containerId,
const csi::v0::Request<rpc>& request,
const bool retry)
{
Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
return loop(
self(),
[=] {
// Perform the call with the latest service future.
return getService(containerId)
.then(defer(
self(),
&StorageLocalResourceProviderProcess::_call<rpc>,
lambda::_1,
request));
},
[=](const Try<csi::v0::Response<rpc>, StatusError>& result) mutable
-> Future<ControlFlow<csi::v0::Response<rpc>>> {
Option<Duration> backoff = retry
? maxBackoff * (static_cast<double>(os::random()) / RAND_MAX)
: Option<Duration>::none();
maxBackoff = std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
// We dispatch `__call` for testing purposes.
return dispatch(
self(),
&StorageLocalResourceProviderProcess::__call<rpc>,
result,
backoff);
});
}
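// Issues the CSI call on the given client and maintains the per-RPC metrics
// for pending, successful, cancelled and failed calls.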
template <csi::v0::RPC rpc>
Future<Try<csi::v0::Response<rpc>, StatusError>>
StorageLocalResourceProviderProcess::_call(
csi::v0::Client client, const csi::v0::Request<rpc>& request)
{
++metrics.csi_plugin_rpcs_pending.at(rpc);
return client.call<rpc>(request)
.onAny(defer(self(), [=](
const Future<Try<csi::v0::Response<rpc>, StatusError>>& future) {
--metrics.csi_plugin_rpcs_pending.at(rpc);
if (future.isReady() && future->isSome()) {
++metrics.csi_plugin_rpcs_successes.at(rpc);
} else if (future.isDiscarded()) {
++metrics.csi_plugin_rpcs_cancelled.at(rpc);
} else {
++metrics.csi_plugin_rpcs_errors.at(rpc);
}
}));
}
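// Examines the result of a CSI call: breaks out of the retry loop with the
// response on success, continues after the given backoff on retryable gRPC
// errors (`DEADLINE_EXCEEDED`, `UNAVAILABLE`), and fails otherwise or when
// no backoff is given.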
template <csi::v0::RPC rpc>
Future<ControlFlow<csi::v0::Response<rpc>>>
StorageLocalResourceProviderProcess::__call(
const Try<csi::v0::Response<rpc>, StatusError>& result,
const Option<Duration>& backoff)
{
if (result.isSome()) {
return Break(result.get());
}
if (backoff.isNone()) {
return Failure(result.error());
}
// See the link below for retryable status codes:
// https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT
switch (result.error().status.error_code()) {
case grpc::DEADLINE_EXCEEDED:
case grpc::UNAVAILABLE: {
LOG(ERROR)
<< "Received '" << result.error() << "' while calling " << rpc
<< ". Retrying in " << backoff.get();
return after(backoff.get())
.then([]() -> Future<ControlFlow<csi::v0::Response<rpc>>> {
return Continue();
});
}
case grpc::CANCELLED:
case grpc::UNKNOWN:
case grpc::INVALID_ARGUMENT:
case grpc::NOT_FOUND:
case grpc::ALREADY_EXISTS:
case grpc::PERMISSION_DENIED:
case grpc::UNAUTHENTICATED:
case grpc::RESOURCE_EXHAUSTED:
case grpc::FAILED_PRECONDITION:
case grpc::ABORTED:
case grpc::OUT_OF_RANGE:
case grpc::UNIMPLEMENTED:
case grpc::INTERNAL:
case grpc::DATA_LOSS: {
return Failure(result.error());
}
case grpc::OK:
case grpc::DO_NOT_USE: {
UNREACHABLE();
}
}
UNREACHABLE();
}
void StorageLocalResourceProviderProcess::initialize()
{
Try<string> _bootId = os::bootId();
if (_bootId.isError()) {
LOG(ERROR) << "Failed to get boot ID: " << _bootId.error();
return fatal();
}
bootId = _bootId.get();
foreach (const CSIPluginContainerInfo& container,
info.storage().plugin().containers()) {
if (container.services().end() != find(
container.services().begin(),
container.services().end(),
CSIPluginContainerInfo::NODE_SERVICE)) {
nodeContainerId = getContainerId(info, container);
break;
}
}
CHECK_SOME(nodeContainerId);
foreach (const CSIPluginContainerInfo& container,
info.storage().plugin().containers()) {
if (container.services().end() != find(
container.services().begin(),
container.services().end(),
CSIPluginContainerInfo::CONTROLLER_SERVICE)) {
controllerContainerId = getContainerId(info, container);
break;
}
}
auto die = [=](const string& message) {
LOG(ERROR)
<< "Failed to recover resource provider with type '" << info.type()
<< "' and name '" << info.name() << "': " << message;
fatal();
};
// NOTE: Most resource provider events rely on the plugins being
// prepared. To avoid race conditions, we connect to the agent after
// preparing the plugins.
recover()
.onFailed(defer(self(), std::bind(die, lambda::_1)))
.onDiscarded(defer(self(), std::bind(die, "future discarded")));
}
void StorageLocalResourceProviderProcess::fatal()
{
// Force the disconnection early.
driver.reset();
process::terminate(self());
}
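// Recovers the CSI services, volume states and checkpointed resource provider
// state, then creates and starts the driver that connects to the resource
// provider manager.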
Future<Nothing> StorageLocalResourceProviderProcess::recover()
{
CHECK_EQ(RECOVERING, state);
return recoverServices()
.then(defer(self(), &Self::recoverVolumes))
.then(defer(self(), &Self::recoverResourceProviderState))
.then(defer(self(), [=]() -> Future<Nothing> {
LOG(INFO)
<< "Finished recovery for resource provider with type '" << info.type()
<< "' and name '" << info.name() << "'";
state = DISCONNECTED;
statusUpdateManager.pause();
driver.reset(new Driver(
Owned<EndpointDetector>(new ConstantEndpointDetector(url)),
contentType,
defer(self(), &Self::connected),
defer(self(), &Self::disconnected),
defer(self(), [this](queue<v1::resource_provider::Event> events) {
while (!events.empty()) {
const v1::resource_provider::Event& event = events.front();
received(devolve(event));
events.pop();
}
}),
authToken));
driver->start();
return Nothing();
}));
}
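// Recovers CSI plugin containers: up-to-date running controller and node
// containers are kept, while stale or exited containers are killed (if
// running) and their endpoint and container directories removed, before the
// identity, controller and node services are prepared.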
Future<Nothing> StorageLocalResourceProviderProcess::recoverServices()
{
return getContainers()
.then(defer(self(), [=](
const hashmap<ContainerID, Option<ContainerStatus>>& runningContainers)
-> Future<Nothing> {
Try<list<string>> containerPaths = csi::paths::getContainerPaths(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name());
if (containerPaths.isError()) {
return Failure(
"Failed to find plugin containers for CSI plugin type '" +
info.storage().plugin().type() + "' and name '" +
info.storage().plugin().name() + "': " +
containerPaths.error());
}
vector<Future<Nothing>> futures;
foreach (const string& path, containerPaths.get()) {
Try<csi::paths::ContainerPath> containerPath =
csi::paths::parseContainerPath(
slave::paths::getCsiRootDir(workDir),
path);
if (containerPath.isError()) {
return Failure(
"Failed to parse container path '" + path + "': " +
containerPath.error());
}
CHECK_EQ(info.storage().plugin().type(), containerPath->type);
CHECK_EQ(info.storage().plugin().name(), containerPath->name);
const ContainerID& containerId = containerPath->containerId;
// NOTE: Since `getContainers` might return containers that are not
// actually running, as a workaround we check if the `executor_pid`
// field is set to determine whether the container is running.
bool isRunningContainer = runningContainers.contains(containerId) &&
runningContainers.at(containerId).isSome() &&
runningContainers.at(containerId)->has_executor_pid();
// Do not kill the up-to-date running controller or node container.
if ((nodeContainerId == containerId ||
controllerContainerId == containerId) && isRunningContainer) {
const string configPath = csi::paths::getContainerInfoPath(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name(),
containerId);
if (os::exists(configPath)) {
Result<CSIPluginContainerInfo> config =
slave::state::read<CSIPluginContainerInfo>(configPath);
if (config.isError()) {
return Failure(
"Failed to read plugin container config from '" + configPath +
"': " + config.error());
}
if (config.isSome() &&
getCSIPluginContainerInfo(info, containerId) == config.get()) {
continue;
}
}
}
LOG(INFO) << "Cleaning up plugin container '" << containerId << "'";
// Otherwise, kill the container only if it is actually running (i.e.,
// not already being destroyed), then wait for the container to be
// destroyed before performing the cleanup, regardless of whether we
// killed it.
Future<Nothing> cleanup = Nothing();
if (runningContainers.contains(containerId)) {
if (isRunningContainer) {
cleanup = killContainer(containerId);
}
cleanup = cleanup
.then(defer(self(), &Self::waitContainer, containerId));
}
cleanup = cleanup
.then(defer(self(), [=]() -> Future<Nothing> {
Result<string> endpointDir =
os::realpath(csi::paths::getEndpointDirSymlinkPath(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name(),
containerId));
if (endpointDir.isSome()) {
Try<Nothing> rmdir = os::rmdir(endpointDir.get());
if (rmdir.isError()) {
return Failure(
"Failed to remove endpoint directory '" +
endpointDir.get() + "': " + rmdir.error());
}
}
Try<Nothing> rmdir = os::rmdir(path);
if (rmdir.isError()) {
return Failure(
"Failed to remove plugin container directory '" + path +
"': " + rmdir.error());
}
return Nothing();
}));
futures.push_back(cleanup);
}
return collect(futures).then([] { return Nothing(); });
}))
// NOTE: The `Controller` service is supported if the plugin has the
// `CONTROLLER_SERVICE` capability, and the `NodeGetId` call is supported if
// the `Controller` service has the `PUBLISH_UNPUBLISH_VOLUME` capability.
// So we first launch the node plugin to get the plugin capabilities, then
// decide if we need to launch the controller plugin and get the node ID.
.then(defer(self(), &Self::prepareIdentityService))
.then(defer(self(), &Self::prepareControllerService))
.then(defer(self(), &Self::prepareNodeService));
}
Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
{
// Recover the states of CSI volumes.
Try<list<string>> volumePaths = csi::paths::getVolumePaths(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name());
if (volumePaths.isError()) {
return Failure(
"Failed to find volumes for CSI plugin type '" +
info.storage().plugin().type() + "' and name '" +
info.storage().plugin().name() + "': " + volumePaths.error());
}
vector<Future<Nothing>> futures;
foreach (const string& path, volumePaths.get()) {
Try<csi::paths::VolumePath> volumePath =
csi::paths::parseVolumePath(slave::paths::getCsiRootDir(workDir), path);
if (volumePath.isError()) {
return Failure(
"Failed to parse volume path '" + path + "': " + volumePath.error());
}
CHECK_EQ(info.storage().plugin().type(), volumePath->type);
CHECK_EQ(info.storage().plugin().name(), volumePath->name);
const string& volumeId = volumePath->volumeId;
const string statePath = csi::paths::getVolumeStatePath(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name(),
volumeId);
if (!os::exists(statePath)) {
continue;
}
Result<VolumeState> volumeState =
slave::state::read<VolumeState>(statePath);
if (volumeState.isError()) {
return Failure(
"Failed to read volume state from '" + statePath + "': " +
volumeState.error());
}
if (volumeState.isSome()) {
volumes.put(volumeId, std::move(volumeState.get()));
// To avoid any race with, e.g., `deleteVolume` calls, we sequentialize
// this lambda with any other operation on the same volume below, so the
// volume is guaranteed to exist in the deferred execution.
std::function<Future<Nothing>()> recoverVolume = defer(self(), [=]()
-> Future<Nothing> {
VolumeData& volume = volumes.at(volumeId);
Future<Nothing> recovered = Nothing();
// First, bring the volume back to a "good" state.
if (VolumeState::State_IsValid(volume.state.state())) {
switch (volume.state.state()) {
case VolumeState::CREATED:
case VolumeState::NODE_READY: {
break;
}
case VolumeState::VOL_READY:
case VolumeState::PUBLISHED: {
if (volume.state.boot_id() != bootId) {
// The node has been restarted since the volume is made
// publishable, so it is reset to `NODE_READY` state.
volume.state.set_state(VolumeState::NODE_READY);
volume.state.clear_boot_id();
checkpointVolumeState(volumeId);
}
break;
}
case VolumeState::CONTROLLER_PUBLISH: {
recovered = controllerPublish(volumeId);
break;
}
case VolumeState::CONTROLLER_UNPUBLISH: {
recovered = controllerUnpublish(volumeId);
break;
}
case VolumeState::NODE_STAGE: {
recovered = nodeStage(volumeId);
break;
}
case VolumeState::NODE_UNSTAGE: {
if (volume.state.boot_id() != bootId) {
// The node has been restarted since the volume is made
// publishable, so it is reset to `NODE_READY` state.
volume.state.set_state(VolumeState::NODE_READY);
volume.state.clear_boot_id();
checkpointVolumeState(volumeId);
} else {
recovered = nodeUnstage(volumeId);
}
break;
}
case VolumeState::NODE_PUBLISH: {
if (volume.state.boot_id() != bootId) {
// The node has been restarted since the volume is made
// publishable, so it is reset to `NODE_READY` state.
volume.state.set_state(VolumeState::NODE_READY);
volume.state.clear_boot_id();
checkpointVolumeState(volumeId);
} else {
recovered = nodePublish(volumeId);
}
break;
}
case VolumeState::NODE_UNPUBLISH: {
if (volume.state.boot_id() != bootId) {
// The node has been restarted since the volume is made
// publishable, so it is reset to `NODE_READY` state.
volume.state.set_state(VolumeState::NODE_READY);
volume.state.clear_boot_id();
checkpointVolumeState(volumeId);
} else {
recovered = nodeUnpublish(volumeId);
}
break;
}
case VolumeState::UNKNOWN: {
return Failure(
"Volume '" + volumeId + "' is in " +
stringify(volume.state.state()) + " state");
}
// NOTE: We avoid using a default clause for the following values in
// proto3's open enum to enable the compiler to detect missing enum
// cases for us. See: https://github.com/google/protobuf/issues/3917
case google::protobuf::kint32min:
case google::protobuf::kint32max: {
UNREACHABLE();
}
}
} else {
return Failure("Volume '" + volumeId + "' is in UNDEFINED state");
}
auto err = [](const string& volumeId, const string& message) {
LOG(ERROR)
<< "Failed to recover volume '" << volumeId << "': " << message;
};
// Second, if the volume has been used by a container before recovery,
// we have to bring the volume back to `PUBLISHED` so data can be
// cleaned up synchronously upon `DESTROY`. Otherwise, we skip the error
// and continue recovery.
if (volume.state.node_publish_required()) {
recovered = recovered
.then(defer(self(), [this, volumeId]() -> Future<Nothing> {
const VolumeData& volume = volumes.at(volumeId);
Future<Nothing> published = Nothing();
CHECK(VolumeState::State_IsValid(volume.state.state()));
switch (volume.state.state()) {
case VolumeState::NODE_READY: {
published = published
.then(defer(self(), &Self::nodeStage, volumeId));
// NOTE: We continue to the next case to recover the volume in
// `VOL_READY` state once the above is done.
}
case VolumeState::VOL_READY: {
published = published
.then(defer(self(), &Self::nodePublish, volumeId));
// NOTE: We continue to the next case to recover the volume in
// `PUBLISHED` state once the above is done.
}
case VolumeState::PUBLISHED: {
break;
}
case VolumeState::UNKNOWN:
case VolumeState::CREATED:
case VolumeState::CONTROLLER_PUBLISH:
case VolumeState::CONTROLLER_UNPUBLISH:
case VolumeState::NODE_STAGE:
case VolumeState::NODE_UNSTAGE:
case VolumeState::NODE_PUBLISH:
case VolumeState::NODE_UNPUBLISH: {
UNREACHABLE();
}
// NOTE: We avoid using a default clause for the following
// values in proto3's open enum to enable the compiler to detect
// missing enum cases for us. See:
// https://github.com/google/protobuf/issues/3917
case google::protobuf::kint32min:
case google::protobuf::kint32max: {
UNREACHABLE();
}
}
return published;
}))
.onFailed(std::bind(err, volumeId, lambda::_1))
.onDiscarded(std::bind(err, volumeId, "future discarded"));
} else {
recovered = recovered
.onFailed(std::bind(err, volumeId, lambda::_1))
.onDiscarded(std::bind(err, volumeId, "future discarded"))
.recover([](const Future<Nothing>& future) { return Nothing(); });
}
return recovered;
});
futures.push_back(volumes.at(volumeId).sequence->add(recoverVolume));
}
}
// Garbage collect leftover mount paths that we failed to remove before.
const string mountRootDir = csi::paths::getMountRootDir(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name());
Try<list<string>> mountPaths = csi::paths::getMountPaths(mountRootDir);
if (mountPaths.isError()) {
// TODO(chhsiao): This could indicate that something is seriously wrong. To
// help debug the problem, we should surface the error via MESOS-8745.
return Failure(
"Failed to find mount paths for CSI plugin type '" +
info.storage().plugin().type() + "' and name '" +
info.storage().plugin().name() + "': " + mountPaths.error());
}
foreach (const string& path, mountPaths.get()) {
Try<string> volumeId = csi::paths::parseMountPath(mountRootDir, path);
if (volumeId.isError()) {
return Failure(
"Failed to parse mount path '" + path + "': " + volumeId.error());
}
if (!volumes.contains(volumeId.get())) {
garbageCollectMountPath(volumeId.get());
}
}
return collect(futures).then([] { return Nothing(); });
}
Future<Nothing>
StorageLocalResourceProviderProcess::recoverResourceProviderState()
{
// Recover the resource provider ID and state from the latest
// symlink. If the symlink does not exist, this is a new resource
// provider, and the total resources will be empty, which is fine
// since new resources will be added during reconciliation.
Result<string> realpath = os::realpath(
slave::paths::getLatestResourceProviderPath(
metaDir, slaveId, info.type(), info.name()));
if (realpath.isError()) {
return Failure(
"Failed to read the latest symlink for resource provider with type '" +
info.type() + "' and name '" + info.name() + "': " + realpath.error());
}
if (realpath.isSome()) {
info.mutable_id()->set_value(Path(realpath.get()).basename());
const string statePath = slave::paths::getResourceProviderStatePath(
metaDir, slaveId, info.type(), info.name(), info.id());
if (!os::exists(statePath)) {
return Nothing();
}
Result<ResourceProviderState> resourceProviderState =
slave::state::read<ResourceProviderState>(statePath);
if (resourceProviderState.isError()) {
return Failure(
"Failed to read resource provider state from '" + statePath +
"': " + resourceProviderState.error());
}
if (resourceProviderState.isSome()) {
foreach (const Operation& operation,
resourceProviderState->operations()) {
Try<id::UUID> uuid = id::UUID::fromBytes(operation.uuid().value());
CHECK_SOME(uuid);
operations[uuid.get()] = operation;
}
totalResources = resourceProviderState->resources();
const ResourceProviderState::Storage& storage =
resourceProviderState->storage();
using ProfileEntry = google::protobuf::MapPair<
string, ResourceProviderState::Storage::ProfileInfo>;
foreach (const ProfileEntry& entry, storage.profiles()) {
profileInfos.put(
entry.first,
{entry.second.capability(), entry.second.parameters()});
}
// We only checkpoint profiles associated with storage pools (i.e.,
// resources without IDs) in `checkpointResourceProviderState` as only
// these profiles might be used by pending operations, so we validate here
// that all such profiles exist.
foreach (const Resource& resource, totalResources) {
if (!resource.disk().source().has_id() &&
resource.disk().source().has_profile() &&
!profileInfos.contains(resource.disk().source().profile())) {
return Failure(
"Cannot recover profile for storage pool '" +
stringify(resource) + "' from '" + statePath + "'");
}
}
}
}
return Nothing();
}
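// Sends a SUBSCRIBE call to the resource provider manager and schedules a
// retry every second until the provider leaves the CONNECTED state.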
void StorageLocalResourceProviderProcess::doReliableRegistration()
{
if (state == DISCONNECTED || state == SUBSCRIBED || state == READY) {
return;
}
CHECK_EQ(CONNECTED, state);
Call call;
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_resource_provider_info()->CopyFrom(info);
auto err = [](const ResourceProviderInfo& info, const string& message) {
LOG(ERROR)
<< "Failed to subscribe resource provider with type '" << info.type()
<< "' and name '" << info.name() << "': " << message;
};
driver->send(evolve(call))
.onFailed(std::bind(err, info, lambda::_1))
.onDiscarded(std::bind(err, info, "future discarded"));
// TODO(chhsiao): Consider doing an exponential backoff.
delay(Seconds(1), self(), &Self::doReliableRegistration);
}
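// Reconciles the checkpointed total resources against the volumes and
// capacities reported by the plugin, sends the resulting `UPDATE_STATE`,
// resumes the status update manager, and transitions the provider to READY.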
Future<Nothing>
StorageLocalResourceProviderProcess::reconcileResourceProviderState()
{
return reconcileOperationStatuses()
.then(defer(self(), [=] {
return collect(vector<Future<Resources>>{listVolumes(), getCapacities()})
.then(defer(self(), [=](const vector<Resources>& discovered) {
ResourceConversion conversion = reconcileResources(
totalResources,
accumulate(discovered.begin(), discovered.end(), Resources()));
Try<Resources> result = totalResources.apply(conversion);
CHECK_SOME(result);
if (result.get() != totalResources) {
LOG(INFO)
<< "Removing '" << conversion.consumed << "' and adding '"
<< conversion.converted << "' to the total resources";
totalResources = result.get();
checkpointResourceProviderState();
}
// NOTE: Since this is the first `UPDATE_STATE` call of the
// current subscription, there must be no racing speculative
// operation, thus no need to update the resource version.
sendResourceProviderStateUpdate();
statusUpdateManager.resume();
LOG(INFO)
<< "Resource provider " << info.id() << " is in READY state";
state = READY;
return Nothing();
}));
}));
}
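// Recovers the operation status update manager, garbage collects completed
// operations, resends operation status updates that the manager has not yet
// checkpointed, and replays all non-terminal operations so their effects are
// reflected in the total resources before reconciliation.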
Future<Nothing>
StorageLocalResourceProviderProcess::reconcileOperationStatuses()
{
CHECK(info.has_id());
const string resourceProviderDir = slave::paths::getResourceProviderPath(
metaDir, slaveId, info.type(), info.name(), info.id());
statusUpdateManager.initialize(
defer(self(), &Self::sendOperationStatusUpdate, lambda::_1),
std::bind(
&slave::paths::getOperationUpdatesPath,
resourceProviderDir,
lambda::_1));
Try<list<string>> operationPaths = slave::paths::getOperationPaths(
slave::paths::getResourceProviderPath(
metaDir, slaveId, info.type(), info.name(), info.id()));
if (operationPaths.isError()) {
return Failure(
"Failed to find operations for resource provider " +
stringify(info.id()) + ": " + operationPaths.error());
}
list<id::UUID> operationUuids;
foreach (const string& path, operationPaths.get()) {
Try<id::UUID> uuid =
slave::paths::parseOperationPath(resourceProviderDir, path);
if (uuid.isError()) {
return Failure(
"Failed to parse operation path '" + path + "': " +
uuid.error());
}
// NOTE: This could happen if we failed to remove the operation path before.
if (!operations.contains(uuid.get())) {
LOG(WARNING)
<< "Ignoring unknown operation (uuid: " << uuid.get()
<< ") for resource provider " << info.id();
garbageCollectOperationPath(uuid.get());
continue;
}
operationUuids.emplace_back(std::move(uuid.get()));
}
return statusUpdateManager.recover(operationUuids, strict)
.then(defer(self(), [=](
const OperationStatusUpdateManagerState& statusUpdateManagerState)
-> Future<Nothing> {
using StreamState =
typename OperationStatusUpdateManagerState::StreamState;
// Clean up the operations that are completed.
vector<id::UUID> completedOperations;
foreachpair (const id::UUID& uuid,
const Option<StreamState>& stream,
statusUpdateManagerState.streams) {
if (stream.isSome() && stream->terminated) {
operations.erase(uuid);
completedOperations.push_back(uuid);
}
}
// Garbage collect the operation streams after checkpointing.
checkpointResourceProviderState();
foreach (const id::UUID& uuid, completedOperations) {
garbageCollectOperationPath(uuid);
}
// Send updates for all missing statuses.
foreachpair (const id::UUID& uuid,
const Operation& operation,
operations) {
if (operation.latest_status().state() == OPERATION_PENDING) {
continue;
}
const int numStatuses =
statusUpdateManagerState.streams.contains(uuid) &&
statusUpdateManagerState.streams.at(uuid).isSome()
? statusUpdateManagerState.streams.at(uuid)->updates.size() : 0;
for (int i = numStatuses; i < operation.statuses().size(); i++) {
UpdateOperationStatusMessage update =
protobuf::createUpdateOperationStatusMessage(
protobuf::createUUID(uuid),
operation.statuses(i),
None(),
operation.has_framework_id()
? operation.framework_id() : Option<FrameworkID>::none(),
slaveId);
auto die = [=](const string& message) {
LOG(ERROR)
<< "Failed to update status of operation (uuid: " << uuid << "): "
<< message;
fatal();
};
statusUpdateManager.update(std::move(update))
.onFailed(defer(self(), std::bind(die, lambda::_1)))
.onDiscarded(defer(self(), std::bind(die, "future discarded")));
}
}
// We replay all pending operations here, so that if a volume is
// created or deleted before the last failover, the result will be
// reflected in the total resources before reconciliation.
vector<Future<Nothing>> futures;
foreachpair (const id::UUID& uuid,
const Operation& operation,
operations) {
switch (operation.latest_status().state()) {
case OPERATION_PENDING:
++metrics.operations_pending.at(operation.info().type());
break;
case OPERATION_FINISHED:
++metrics.operations_finished.at(operation.info().type());
break;
case OPERATION_FAILED:
++metrics.operations_failed.at(operation.info().type());
break;
case OPERATION_DROPPED:
++metrics.operations_dropped.at(operation.info().type());
break;
case OPERATION_UNSUPPORTED:
case OPERATION_ERROR:
case OPERATION_UNREACHABLE:
case OPERATION_GONE_BY_OPERATOR:
case OPERATION_RECOVERING:
case OPERATION_UNKNOWN:
UNREACHABLE();
}
if (protobuf::isTerminalState(operation.latest_status().state())) {
continue;
}
auto err = [](const id::UUID& uuid, const string& message) {
LOG(ERROR)
<< "Failed to apply operation (uuid: " << uuid << "): "
<< message;
};
futures.push_back(_applyOperation(uuid)
.onFailed(std::bind(err, uuid, lambda::_1))
.onDiscarded(std::bind(err, uuid, "future discarded")));
}
// We await the futures instead of collecting them because it is OK
// for operations to fail.
return await(futures).then([] { return Nothing(); });
}));
}
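// Reconciles storage pools (RAW disk resources without volume IDs) against
// the capacities reported by the plugin, bumping the resource version before
// sending an `UPDATE_STATE` if the total resources changed.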
Future<Nothing> StorageLocalResourceProviderProcess::reconcileStoragePools()
{
CHECK_PENDING(reconciled);
auto die = [=](const string& message) {
LOG(ERROR)
<< "Failed to reconcile storage pools for resource provider " << info.id()
<< ": " << message;
fatal();
};
return getCapacities()
.then(defer(self(), [=](const Resources& discovered) {
ResourceConversion conversion = reconcileResources(
totalResources.filter(
[](const Resource& r) { return !r.disk().source().has_id(); }),
discovered);
Try<Resources> result = totalResources.apply(conversion);
CHECK_SOME(result);
if (result.get() != totalResources) {
LOG(INFO)
<< "Removing '" << conversion.consumed << "' and adding '"
<< conversion.converted << "' to the total resources";
totalResources = result.get();
checkpointResourceProviderState();
// NOTE: We always update the resource version before sending
// an `UPDATE_STATE`, so that any racing speculative operation
// will be rejected. Otherwise, the speculative resource
// conversion done on the master will be cancelled out.
resourceVersion = id::UUID::random();
sendResourceProviderStateUpdate();
}
return Nothing();
}))
.onFailed(defer(self(), std::bind(die, lambda::_1)))
.onDiscarded(defer(self(), std::bind(die, "future discarded")));
}
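// Returns true if the given operation can be applied while a storage pool
// reconciliation is pending, i.e., if it does not consume or convert
// profiled RAW (storage pool) resources.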
bool StorageLocalResourceProviderProcess::allowsReconciliation(
const Offer::Operation& operation)
{
switch (operation.type()) {
case Offer::Operation::RESERVE:
case Offer::Operation::UNRESERVE: {
Resources consumedStoragePools =
CHECK_NOTERROR(protobuf::getConsumedResources(operation))
.filter([](const Resource& r) {
return r.disk().source().has_profile() &&
r.disk().source().type() == Resource::DiskInfo::Source::RAW;
});
return consumedStoragePools.empty();
}
case Offer::Operation::CREATE:
case Offer::Operation::DESTROY: {
return true;
}
case Offer::Operation::CREATE_DISK:
case Offer::Operation::DESTROY_DISK: {
return false;
}
case Offer::Operation::GROW_VOLUME:
case Offer::Operation::SHRINK_VOLUME: {
// TODO(chhsiao): These operations are currently not supported for
// resource providers, and should have been validated by the master.
UNREACHABLE();
}
case Offer::Operation::UNKNOWN:
case Offer::Operation::LAUNCH:
case Offer::Operation::LAUNCH_GROUP: {
UNREACHABLE();
}
}
UNREACHABLE();
}
ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
const Resources& checkpointed,
const Resources& discovered)
{
// NOTE: If a resource in the checkpointed resources is missing in the
// discovered resources, we will still keep it if it is converted by
// an operation before (i.e., has extra info other than the default
// reservations). The reason is that we want to maintain a consistent
// view with frameworks, and do not want to lose any data on
// persistent volumes due to some temporary CSI plugin faults. Other
// missing resources that are "unconverted" by any framework will be
// removed. Then, any newly discovered resource will be added.
Resources toRemove;
Resources toAdd = discovered;
foreach (const Resource& resource, checkpointed) {
Resource unconverted = createRawDiskResource(
info,
Bytes(resource.scalar().value() * Bytes::MEGABYTES),
resource.disk().source().has_profile()
? resource.disk().source().profile() : Option<string>::none(),
resource.disk().source().has_vendor()
? resource.disk().source().vendor() : Option<string>::none(),
resource.disk().source().has_id()
? resource.disk().source().id() : Option<string>::none(),
resource.disk().source().has_metadata()
? resource.disk().source().metadata() : Option<Labels>::none());
if (toAdd.contains(unconverted)) {
// If the remaining discovered resources contain the "unconverted"
// version of a checkpointed resource, this is not a new resource.
toAdd -= unconverted;
} else if (checkpointed.contains(unconverted)) {
// If the remaining discovered resources do not contain the
// "unconverted" version of the checkpointed resource, the resource is
// missing. However, if it remains unconverted in the checkpoint, we
// can safely remove it from the total resources.
toRemove += unconverted;
} else {
LOG(WARNING)
<< "Missing converted resource '" << resource
<< "'. This might cause further operations to fail.";
}
}
return ResourceConversion(std::move(toRemove), std::move(toAdd));
}
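// Watches the disk profile adaptor for changes to the set of known profiles,
// then updates the profile mapping and reconciles storage pools whenever the
// set changes.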
void StorageLocalResourceProviderProcess::watchProfiles()
{
auto err = [](const string& message) {
LOG(ERROR) << "Failed to watch for DiskProfileAdaptor: " << message;
};
// TODO(chhsiao): Consider retrying with backoff.
loop(
self(),
[=] {
return diskProfileAdaptor->watch(profileInfos.keys(), info);
},
[=](const hashset<string>& profiles) {
CHECK(info.has_id());
LOG(INFO)
<< "Updating profiles " << stringify(profiles)
<< " for resource provider " << info.id();
std::function<Future<Nothing>()> update = defer(self(), [=] {
return updateProfiles(profiles)
.then(defer(self(), &Self::reconcileStoragePools));
});
// Update the profile mapping and storage pools in `sequence` to wait
// for any pending operation that disallows reconciliation or the last
// reconciliation (if any) to finish, and set up `reconciled` to drop
// incoming operations that disallow reconciliation until the storage
// pools are reconciled.
reconciled = sequence.add(update);
return reconciled
.then(defer(self(), [=]() -> ControlFlow<Nothing> {
return Continue();
}));
})
.onFailed(std::bind(err, lambda::_1))
.onDiscarded(std::bind(err, "future discarded"));
}
Future<Nothing> StorageLocalResourceProviderProcess::updateProfiles(
const hashset<string>& profiles)
{
// Remove disappeared profiles.
foreach (const string& profile, profileInfos.keys()) {
if (!profiles.contains(profile)) {
profileInfos.erase(profile);
}
}
// Translate and add newly appeared profiles.
vector<Future<Nothing>> futures;
foreach (const string& profile, profiles) {
// Since profiles are immutable after creation, we do not need to
// translate any profile that is already in the mapping.
if (profileInfos.contains(profile)) {
continue;
}
auto err = [](const string& profile, const string& message) {
LOG(ERROR)
<< "Failed to translate profile '" << profile << "': " << message;
};
futures.push_back(diskProfileAdaptor->translate(profile, info)
.then(defer(self(), [=](
const DiskProfileAdaptor::ProfileInfo& profileInfo) {
profileInfos.put(profile, profileInfo);
return Nothing();
}))
.onFailed(std::bind(err, profile, lambda::_1))
.onDiscarded(std::bind(err, profile, "future discarded")));
}
// We use `await` here to return a future that never fails, so the loop in
// `watchProfiles` will continue to watch for profile changes. If any profile
// translation fails, the profile will not be added to the set of known
// profiles and thus the disk profile adaptor will notify the resource
// provider again.
return await(futures).then([] { return Nothing(); });
}
void StorageLocalResourceProviderProcess::subscribed(
const Event::Subscribed& subscribed)
{
CHECK_EQ(CONNECTED, state);
LOG(INFO) << "Subscribed with ID " << subscribed.provider_id().value();
state = SUBSCRIBED;
if (!info.has_id()) {
// New subscription.
info.mutable_id()->CopyFrom(subscribed.provider_id());
slave::paths::createResourceProviderDirectory(
metaDir,
slaveId,
info.type(),
info.name(),
info.id());
}
auto die = [=](const string& message) {
LOG(ERROR)
<< "Failed to reconcile resource provider " << info.id() << ": "
<< message;
fatal();
};
// Reconcile resources after obtaining the resource provider ID and start
// watching for profile changes after the reconciliation.
// TODO(chhsiao): Reconcile and watch for profile changes early.
reconciled = reconcileResourceProviderState()
.onReady(defer(self(), &Self::watchProfiles))
.onFailed(defer(self(), std::bind(die, lambda::_1)))
.onDiscarded(defer(self(), std::bind(die, "future discarded")));
}
void StorageLocalResourceProviderProcess::applyOperation(
const Event::ApplyOperation& operation)
{
CHECK(state == SUBSCRIBED || state == READY);
Try<id::UUID> uuid = id::UUID::fromBytes(operation.operation_uuid().value());
CHECK_SOME(uuid);
LOG(INFO)
<< "Received " << operation.info().type() << " operation '"
<< operation.info().id() << "' (uuid: " << uuid.get() << ")";
Option<FrameworkID> frameworkId = operation.has_framework_id()
? operation.framework_id() : Option<FrameworkID>::none();
if (state == SUBSCRIBED) {
return dropOperation(
uuid.get(),
frameworkId,
operation.info(),
"Cannot apply operation in SUBSCRIBED state");
}
if (reconciled.isPending() && !allowsReconciliation(operation.info())) {
return dropOperation(
uuid.get(),
frameworkId,
operation.info(),
"Cannot apply operation when reconciling storage pools");
}
Try<id::UUID> operationVersion =
id::UUID::fromBytes(operation.resource_version_uuid().value());
CHECK_SOME(operationVersion);
if (operationVersion.get() != resourceVersion) {
return dropOperation(
uuid.get(),
frameworkId,
operation.info(),
"Mismatched resource version " + stringify(operationVersion.get()) +
" (expected: " + stringify(resourceVersion) + ")");
}
CHECK(!operations.contains(uuid.get()));
operations[uuid.get()] = protobuf::createOperation(
operation.info(),
protobuf::createOperationStatus(
OPERATION_PENDING,
operation.info().has_id()
? operation.info().id() : Option<OperationID>::none(),
None(),
None(),
None(),
slaveId,
info.id()),
frameworkId,
slaveId,
protobuf::createUUID(uuid.get()));
checkpointResourceProviderState();
++metrics.operations_pending.at(operation.info().type());
auto err = [](const id::UUID& uuid, const string& message) {
LOG(ERROR)
<< "Failed to apply operation (uuid: " << uuid << "): " << message;
};
_applyOperation(uuid.get())
.onFailed(std::bind(err, uuid.get(), lambda::_1))
.onDiscarded(std::bind(err, uuid.get(), "future discarded"));
}
void StorageLocalResourceProviderProcess::publishResources(
const Event::PublishResources& publish)
{
Option<Error> error;
hashset<string> volumeIds;
if (state == SUBSCRIBED) {
error = Error("Cannot publish resources in SUBSCRIBED state");
} else {
CHECK_EQ(READY, state);
Resources resources = publish.resources();
resources.unallocate();
foreach (const Resource& resource, resources) {
if (!totalResources.contains(resource)) {
error = Error(
"Cannot publish unknown resource '" + stringify(resource) + "'");
break;
}
switch (resource.disk().source().type()) {
case Resource::DiskInfo::Source::PATH:
case Resource::DiskInfo::Source::MOUNT:
case Resource::DiskInfo::Source::BLOCK: {
CHECK(resource.disk().source().has_id());
CHECK(volumes.contains(resource.disk().source().id()));
volumeIds.insert(resource.disk().source().id());
break;
}
case Resource::DiskInfo::Source::UNKNOWN:
case Resource::DiskInfo::Source::RAW: {
error = Error(
"Cannot publish volume of " +
stringify(resource.disk().source().type()) + " type");
break;
}
}
}
}
Future<vector<Nothing>> allPublished;
if (error.isSome()) {
allPublished = Failure(error.get());
} else {
vector<Future<Nothing>> futures;
foreach (const string& volumeId, volumeIds) {
// We check the state of the volume along with the CSI calls
// atomically with respect to other publish or deletion requests
// for the same volume by dispatching the whole lambda on the
// volume's sequence.
std::function<Future<Nothing>()> controllerAndNodePublish =
defer(self(), [=] {
CHECK(volumes.contains(volumeId));
const VolumeData& volume = volumes.at(volumeId);
Future<Nothing> published = Nothing();
CHECK(VolumeState::State_IsValid(volume.state.state()));
switch (volume.state.state()) {
case VolumeState::CONTROLLER_UNPUBLISH: {
published = published
.then(defer(self(), &Self::controllerUnpublish, volumeId));
// NOTE: We continue to the next case to publish the volume in
// `CREATED` state once the above is done.
}
case VolumeState::CREATED:
case VolumeState::CONTROLLER_PUBLISH: {
published = published
.then(defer(self(), &Self::controllerPublish, volumeId))
.then(defer(self(), &Self::nodeStage, volumeId))
.then(defer(self(), &Self::nodePublish, volumeId));
break;
}
case VolumeState::NODE_UNSTAGE: {
published = published
.then(defer(self(), &Self::nodeUnstage, volumeId));
// NOTE: We continue to the next case to publish the volume in
// `NODE_READY` state once the above is done.
}
case VolumeState::NODE_READY:
case VolumeState::NODE_STAGE: {
published = published
.then(defer(self(), &Self::nodeStage, volumeId))
.then(defer(self(), &Self::nodePublish, volumeId));
break;
}
case VolumeState::NODE_UNPUBLISH: {
published = published
.then(defer(self(), &Self::nodeUnpublish, volumeId));
// NOTE: We continue to the next case to publish the volume in
// `VOL_READY` state once the above is done.
}
case VolumeState::VOL_READY:
case VolumeState::NODE_PUBLISH: {
published = published
.then(defer(self(), &Self::nodePublish, volumeId));
break;
}
case VolumeState::PUBLISHED: {
break;
}
case VolumeState::UNKNOWN: {
UNREACHABLE();
}
// NOTE: We avoid using a default clause for the following
// values in proto3's open enum to enable the compiler to detect
// missing enum cases for us. See:
// https://github.com/google/protobuf/issues/3917
case google::protobuf::kint32min:
case google::protobuf::kint32max: {
UNREACHABLE();
}
}
return published;
});
futures.push_back(
volumes.at(volumeId).sequence->add(controllerAndNodePublish));
}
allPublished = collect(futures);
}
allPublished
.onAny(defer(self(), [=](const Future<vector<Nothing>>& future) {
// TODO(chhsiao): Currently there is no way to reply to the
// resource provider manager with a failure message, so we log the
// failure here.
if (!future.isReady()) {
LOG(ERROR)
<< "Failed to publish resources '" << publish.resources() << "': "
<< (future.isFailed() ? future.failure() : "future discarded");
}
Call call;
call.mutable_resource_provider_id()->CopyFrom(info.id());
call.set_type(Call::UPDATE_PUBLISH_RESOURCES_STATUS);
Call::UpdatePublishResourcesStatus* update =
call.mutable_update_publish_resources_status();
update->mutable_uuid()->CopyFrom(publish.uuid());
update->set_status(future.isReady()
? Call::UpdatePublishResourcesStatus::OK
: Call::UpdatePublishResourcesStatus::FAILED);
auto err = [](const mesos::UUID& uuid, const string& message) {
LOG(ERROR)
<< "Failed to send status update for publish "
<< id::UUID::fromBytes(uuid.value()).get() << ": " << message;
};
driver->send(evolve(call))
.onFailed(std::bind(err, publish.uuid(), lambda::_1))
.onDiscarded(std::bind(err, publish.uuid(), "future discarded"));
}));
}
void StorageLocalResourceProviderProcess::acknowledgeOperationStatus(
const Event::AcknowledgeOperationStatus& acknowledge)
{
CHECK_EQ(READY, state);
Try<id::UUID> operationUuid =
id::UUID::fromBytes(acknowledge.operation_uuid().value());
CHECK_SOME(operationUuid);
Try<id::UUID> statusUuid =
id::UUID::fromBytes(acknowledge.status_uuid().value());
CHECK_SOME(statusUuid);
auto err = [](const id::UUID& uuid, const string& message) {
LOG(ERROR)
<< "Failed to acknowledge status update for operation (uuid: " << uuid
<< "): " << message;
};
// NOTE: It is possible that an incoming acknowledgement races with an
// outgoing retry of status update, and then a duplicated
// acknowledgement will be received. In this case, the following call
// will fail, so we just leave an error log.
statusUpdateManager.acknowledgement(operationUuid.get(), statusUuid.get())
.then(defer(self(), [=](bool continuation) {
if (!continuation) {
operations.erase(operationUuid.get());
checkpointResourceProviderState();
garbageCollectOperationPath(operationUuid.get());
}
return Nothing();
}))
.onFailed(std::bind(err, operationUuid.get(), lambda::_1))
.onDiscarded(std::bind(err, operationUuid.get(), "future discarded"));
}
void StorageLocalResourceProviderProcess::reconcileOperations(
const Event::ReconcileOperations& reconcile)
{
CHECK_EQ(READY, state);
foreach (const mesos::UUID& operationUuid, reconcile.operation_uuids()) {
Try<id::UUID> uuid = id::UUID::fromBytes(operationUuid.value());
CHECK_SOME(uuid);
if (operations.contains(uuid.get())) {
// When the agent asks for reconciliation for a known operation,
// that means the `APPLY_OPERATION` event races with the last
// `UPDATE_STATE` call and arrives after the call. Since the event
// is received, nothing needs to be done here.
continue;
}
// TODO(chhsiao): Consider sending `OPERATION_UNKNOWN` instead.
dropOperation(
uuid.get(),
None(),
None(),
"Unknown operation");
}
}
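// Waits up to `CSI_ENDPOINT_CREATION_TIMEOUT` for the endpoint socket to
// appear, then probes the plugin before returning a connected CSI client.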
Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService(
const string& endpoint)
{
Future<csi::v0::Client> service;
if (os::exists(endpoint)) {
service = csi::v0::Client("unix://" + endpoint, runtime);
} else {
// Wait for the endpoint socket to appear until the timeout expires.
Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT);
service = loop(
self(),
[=]() -> Future<Nothing> {
if (timeout.expired()) {
return Failure("Timed out waiting for endpoint '" + endpoint + "'");
}
return after(Milliseconds(10));
},
[=](const Nothing&) -> ControlFlow<csi::v0::Client> {
if (os::exists(endpoint)) {
return Break(csi::v0::Client("unix://" + endpoint, runtime));
}
return Continue();
});
}
return service
.then(defer(self(), [=](csi::v0::Client client) {
return _call<csi::v0::PROBE>(client, csi::v0::ProbeRequest())
.then([=](const Try<csi::v0::ProbeResponse, StatusError>& result)
-> Future<csi::v0::Client> {
if (result.isError()) {
return Failure(result.error());
}
return client;
});
}));
}
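// Returns a future to the CSI client for the given plugin container. If no
// container daemon exists for the container yet, checkpoints its config and
// launches a daemon that (re)starts the container and associates the client
// once the endpoint becomes available.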
Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
const ContainerID& containerId)
{
if (daemons.contains(containerId)) {
CHECK(services.contains(containerId));
return services.at(containerId)->future();
}
Option<CSIPluginContainerInfo> config =
getCSIPluginContainerInfo(info, containerId);
CHECK_SOME(config);
// We checkpoint the config first to keep track of the plugin container even
// if we fail to create its container daemon.
const string configPath = csi::paths::getContainerInfoPath(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name(),
containerId);
Try<Nothing> checkpoint = slave::state::checkpoint(configPath, config.get());
if (checkpoint.isError()) {
return Failure(
"Failed to checkpoint plugin container config to '" + configPath +
"': " + checkpoint.error());
}
CommandInfo commandInfo;
if (config->has_command()) {
commandInfo.CopyFrom(config->command());
}
// Set the `CSI_ENDPOINT` environment variable.
Try<string> endpoint = csi::paths::getEndpointSocketPath(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name(),
containerId);
if (endpoint.isError()) {
return Failure(
"Failed to resolve endpoint path for plugin container '" +
stringify(containerId) + "': " + endpoint.error());
}
const string& endpointPath = endpoint.get();
Environment::Variable* endpointVar =
commandInfo.mutable_environment()->add_variables();
endpointVar->set_name("CSI_ENDPOINT");
endpointVar->set_value("unix://" + endpointPath);
ContainerInfo containerInfo;
if (config->has_container()) {
containerInfo.CopyFrom(config->container());
} else {
containerInfo.set_type(ContainerInfo::MESOS);
}
// Prepare a volume where the endpoint socket will be placed.
const string endpointDir = Path(endpointPath).dirname();
Volume* endpointVolume = containerInfo.add_volumes();
endpointVolume->set_mode(Volume::RW);
endpointVolume->set_container_path(endpointDir);
endpointVolume->set_host_path(endpointDir);
// Prepare the directory where the mount points will be placed.
const string mountRootDir = csi::paths::getMountRootDir(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name());
Try<Nothing> mkdir = os::mkdir(mountRootDir);
if (mkdir.isError()) {
return Failure(
"Failed to create directory '" + mountRootDir + "': " + mkdir.error());
}
// Prepare a volume where the mount points will be placed.
Volume* mountVolume = containerInfo.add_volumes();
mountVolume->set_mode(Volume::RW);
mountVolume->set_container_path(mountRootDir);
mountVolume->mutable_source()->set_type(Volume::Source::HOST_PATH);
mountVolume->mutable_source()->mutable_host_path()->set_path(mountRootDir);
mountVolume->mutable_source()->mutable_host_path()
->mutable_mount_propagation()->set_mode(MountPropagation::BIDIRECTIONAL);
CHECK(!services.contains(containerId));
services[containerId].reset(new Promise<csi::v0::Client>());
Try<Owned<ContainerDaemon>> daemon = ContainerDaemon::create(
extractParentEndpoint(url),
authToken,
containerId,
commandInfo,
config->resources(),
containerInfo,
std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> {
LOG(INFO)
<< "CSI plugin container '" << containerId << "' started for plugin"
<< " type '" << info.storage().plugin().type() << "' and name '"
<< info.storage().plugin().name() << "'";
CHECK(services.at(containerId)->associate(waitService(endpointPath)));
return services.at(containerId)->future()
.then([] { return Nothing(); });
})),
std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> {
++metrics.csi_plugin_container_terminations;
services.at(containerId)->discard();
services.at(containerId).reset(new Promise<csi::v0::Client>());
LOG(INFO)
<< "CSI plugin container '" << containerId << "' stopped for plugin"
<< " type '" << info.storage().plugin().type() << "' and name '"
<< info.storage().plugin().name() << "'";
if (os::exists(endpointPath)) {
Try<Nothing> rm = os::rm(endpointPath);
if (rm.isError()) {
return Failure(
"Failed to remove endpoint '" + endpointPath + "': " +
rm.error());
}
}
return Nothing();
})));
if (daemon.isError()) {
return Failure(
"Failed to create container daemon for plugin container '" +
stringify(containerId) + "': " + daemon.error());
}
auto die = [=](const string& message) {
LOG(ERROR)
<< "Container daemon for '" << containerId << "' failed: " << message;
fatal();
};
daemons[containerId] = daemon.get();
daemon.get()->wait()
.onFailed(defer(self(), std::bind(die, lambda::_1)))
.onDiscarded(defer(self(), std::bind(die, "future discarded")));
return services.at(containerId)->future();
}
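// Returns the statuses of containers on the agent whose container IDs start
// with this resource provider's container ID prefix (the `cid_prefix` claim
// of its principal), queried through the agent's `GET_CONTAINERS` API.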
Future<hashmap<ContainerID, Option<ContainerStatus>>>
StorageLocalResourceProviderProcess::getContainers()
{
agent::Call call;
call.set_type(agent::Call::GET_CONTAINERS);
call.mutable_get_containers()->set_show_nested(false);
call.mutable_get_containers()->set_show_standalone(true);
return http::post(
extractParentEndpoint(url),
getAuthHeader(authToken) +
http::Headers{{"Accept", stringify(contentType)}},
serialize(contentType, evolve(call)),
stringify(contentType))
.then(defer(self(), [=](const http::Response& httpResponse)
-> Future<hashmap<ContainerID, Option<ContainerStatus>>> {
hashmap<ContainerID, Option<ContainerStatus>> result;
if (httpResponse.status != http::OK().status) {
return Failure(
"Failed to get containers: Unexpected response '" +
httpResponse.status + "' (" + httpResponse.body + ")");
}
Try<v1::agent::Response> v1Response =
deserialize<v1::agent::Response>(contentType, httpResponse.body);
if (v1Response.isError()) {
return Failure("Failed to get containers: " + v1Response.error());
}
const Principal principal = LocalResourceProvider::principal(info);
CHECK(principal.claims.contains("cid_prefix"));
const string& cidPrefix = principal.claims.at("cid_prefix");
agent::Response response = devolve(v1Response.get());
foreach (const agent::Response::GetContainers::Container& container,
response.get_containers().containers()) {
if (strings::startsWith(container.container_id().value(), cidPrefix)) {
result.put(
container.container_id(),
container.has_container_status()
? container.container_status()
: Option<ContainerStatus>::none());
}
}
return result;
}));
}
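// Waits for the given container to terminate through the agent's
// `WAIT_CONTAINER` API. A 404 response is not treated as an error, since the
// container may no longer exist.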
Future<Nothing> StorageLocalResourceProviderProcess::waitContainer(
const ContainerID& containerId)
{
agent::Call call;
call.set_type(agent::Call::WAIT_CONTAINER);
call.mutable_wait_container()->mutable_container_id()->CopyFrom(containerId);
return http::post(
extractParentEndpoint(url),
getAuthHeader(authToken),
serialize(contentType, evolve(call)),
stringify(contentType))
.then([containerId](const http::Response& response) -> Future<Nothing> {
if (response.status != http::OK().status &&
response.status != http::NotFound().status) {
return Failure(
"Failed to wait for container '" + stringify(containerId) +
"': Unexpected response '" + response.status + "' (" + response.body
+ ")");
}
return Nothing();
});
}
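// Kills the given container through the agent's `KILL_CONTAINER` API. As
// with `waitContainer`, a 404 response is not treated as an error.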
Future<Nothing> StorageLocalResourceProviderProcess::killContainer(
const ContainerID& containerId)
{
agent::Call call;
call.set_type(agent::Call::KILL_CONTAINER);
call.mutable_kill_container()->mutable_container_id()->CopyFrom(containerId);
return http::post(
extractParentEndpoint(url),
getAuthHeader(authToken),
serialize(contentType, evolve(call)),
stringify(contentType))
.then([containerId](const http::Response& response) -> Future<Nothing> {
if (response.status != http::OK().status &&
response.status != http::NotFound().status) {
return Failure(
"Failed to kill container '" + stringify(containerId) +
"': Unexpected response '" + response.status + "' (" + response.body
+ ")");
}
return Nothing();
});
}
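// Fetches and caches the plugin info and plugin capabilities from the node
// plugin container through the CSI identity service RPCs.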
Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
{
CHECK_SOME(nodeContainerId);
// Get the plugin info.
return call<csi::v0::GET_PLUGIN_INFO>(
nodeContainerId.get(), csi::v0::GetPluginInfoRequest())
.then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) {
pluginInfo = response;
LOG(INFO) << "Node plugin loaded: " << stringify(pluginInfo.get());
// Get the plugin capabilities.
return call<csi::v0::GET_PLUGIN_CAPABILITIES>(
nodeContainerId.get(), csi::v0::GetPluginCapabilitiesRequest());
}))
.then(defer(self(), [=](
const csi::v0::GetPluginCapabilitiesResponse& response) {
pluginCapabilities = response.capabilities();
return Nothing();
}));
}
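// Fetches and caches the controller capabilities. This is a no-op if the
// plugin does not advertise the controller service; otherwise the controller
// plugin info is first checked for consistency with the node plugin.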
Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
{
CHECK_SOME(pluginInfo);
if (!pluginCapabilities.controllerService) {
return Nothing();
}
if (controllerContainerId.isNone()) {
return Failure(
stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found");
}
// Get the controller plugin info and check for consistency.
return call<csi::v0::GET_PLUGIN_INFO>(
controllerContainerId.get(), csi::v0::GetPluginInfoRequest())
.then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) {
LOG(INFO) << "Controller plugin loaded: " << stringify(response);
if (pluginInfo->name() != response.name() ||
pluginInfo->vendor_version() != response.vendor_version()) {
LOG(WARNING)
<< "Inconsistent controller and node plugin components. Please check "
"with the plugin vendor to ensure compatibility.";
}
// Get the controller capabilities.
return call<csi::v0::CONTROLLER_GET_CAPABILITIES>(
controllerContainerId.get(),
csi::v0::ControllerGetCapabilitiesRequest());
}))
.then(defer(self(), [=](
const csi::v0::ControllerGetCapabilitiesResponse& response) {
controllerCapabilities = response.capabilities();
return Nothing();
}));
}
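// Fetches and caches the node capabilities, and additionally the node ID if
// the controller supports publishing and unpublishing volumes.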
Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
{
CHECK_SOME(nodeContainerId);
// Get the node capabilities.
return call<csi::v0::NODE_GET_CAPABILITIES>(
nodeContainerId.get(), csi::v0::NodeGetCapabilitiesRequest())
.then(defer(self(), [=](
const csi::v0::NodeGetCapabilitiesResponse& response)
-> Future<Nothing> {
nodeCapabilities = response.capabilities();
if (!controllerCapabilities.publishUnpublishVolume) {
return Nothing();
}
// Get the node ID.
return call<csi::v0::NODE_GET_ID>(
nodeContainerId.get(), csi::v0::NodeGetIdRequest())
.then(defer(self(), [=](const csi::v0::NodeGetIdResponse& response) {
nodeId = response.node_id();
return Nothing();
}));
}));
}
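// Transitions a volume from `CREATED` to `NODE_READY` through a
// `ControllerPublishVolume` call, checkpointing the volume state along the
// way. If the controller does not support publish/unpublish, the transition
// is made without an RPC.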
Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
const string& volumeId)
{
CHECK(volumes.contains(volumeId));
VolumeData& volume = volumes.at(volumeId);
if (!controllerCapabilities.publishUnpublishVolume) {
CHECK_EQ(VolumeState::CREATED, volume.state.state());
volume.state.set_state(VolumeState::NODE_READY);
checkpointVolumeState(volumeId);
return Nothing();
}
if (volume.state.state() == VolumeState::CREATED) {
volume.state.set_state(VolumeState::CONTROLLER_PUBLISH);
checkpointVolumeState(volumeId);
}
CHECK_EQ(VolumeState::CONTROLLER_PUBLISH, volume.state.state());
CHECK_SOME(nodeId);
csi::v0::ControllerPublishVolumeRequest request;
request.set_volume_id(volumeId);
request.set_node_id(nodeId.get());
*request.mutable_volume_capability() = volume.state.volume_capability();
request.set_readonly(false);
*request.mutable_volume_attributes() = volume.state.volume_attributes();
CHECK_SOME(controllerContainerId);
return call<csi::v0::CONTROLLER_PUBLISH_VOLUME>(
controllerContainerId.get(), std::move(request))
.then(defer(self(), [this, volumeId](
const csi::v0::ControllerPublishVolumeResponse& response) {
VolumeData& volume = volumes.at(volumeId);
volume.state.set_state(VolumeState::NODE_READY);
*volume.state.mutable_publish_info() = response.publish_info();
checkpointVolumeState(volumeId);
return Nothing();
}));
}
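// Transitions a volume from `NODE_READY` (or a partially completed
// `CONTROLLER_PUBLISH`) back to `CREATED` through a
// `ControllerUnpublishVolume` call, checkpointing the volume state along the
// way.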
Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
const string& volumeId)
{
CHECK(volumes.contains(volumeId));
VolumeData& volume = volumes.at(volumeId);
if (!controllerCapabilities.publishUnpublishVolume) {
CHECK_EQ(VolumeState::NODE_READY, volume.state.state());
volume.state.set_state(VolumeState::CREATED);
checkpointVolumeState(volumeId);
return Nothing();
}
// A previously failed `ControllerPublishVolume` call can be recovered through
// the current `ControllerUnpublishVolume` call. See:
// https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerpublishvolume // NOLINT
if (volume.state.state() == VolumeState::NODE_READY ||
volume.state.state() == VolumeState::CONTROLLER_PUBLISH) {
volume.state.set_state(VolumeState::CONTROLLER_UNPUBLISH);
checkpointVolumeState(volumeId);
}
CHECK_EQ(VolumeState::CONTROLLER_UNPUBLISH, volume.state.state());
CHECK_SOME(nodeId);
csi::v0::ControllerUnpublishVolumeRequest request;
request.set_volume_id(volumeId);
request.set_node_id(nodeId.get());
CHECK_SOME(controllerContainerId);
return call<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>(
controllerContainerId.get(), std::move(request))
.then(defer(self(), [this, volumeId] {
VolumeData& volume = volumes.at(volumeId);
volume.state.set_state(VolumeState::CREATED);
volume.state.mutable_publish_info()->clear();
checkpointVolumeState(volumeId);
return Nothing();
}));
}
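// Transitions a volume from `NODE_READY` to `VOL_READY` through a
// `NodeStageVolume` call against the volume's staging path, checkpointing the
// volume state (including the current boot ID) along the way.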
Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
const string& volumeId)
{
CHECK(volumes.contains(volumeId));
VolumeData& volume = volumes.at(volumeId);
if (!nodeCapabilities.stageUnstageVolume) {
CHECK_EQ(VolumeState::NODE_READY, volume.state.state());
volume.state.set_state(VolumeState::VOL_READY);
volume.state.set_boot_id(bootId);
checkpointVolumeState(volumeId);
return Nothing();
}
const string stagingPath = csi::paths::getMountStagingPath(
csi::paths::getMountRootDir(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name()),
volumeId);
// NOTE: The staging path will be cleaned up in `deleteVolume`.
Try<Nothing> mkdir = os::mkdir(stagingPath);
if (mkdir.isError()) {
return Failure(
"Failed to create mount staging path '" + stagingPath +
"': " + mkdir.error());
}
if (volume.state.state() == VolumeState::NODE_READY) {
volume.state.set_state(VolumeState::NODE_STAGE);
checkpointVolumeState(volumeId);
}
CHECK_EQ(VolumeState::NODE_STAGE, volume.state.state());
csi::v0::NodeStageVolumeRequest request;
request.set_volume_id(volumeId);
*request.mutable_publish_info() = volume.state.publish_info();
request.set_staging_target_path(stagingPath);
*request.mutable_volume_capability() = volume.state.volume_capability();
*request.mutable_volume_attributes() = volume.state.volume_attributes();
CHECK_SOME(nodeContainerId);
return call<csi::v0::NODE_STAGE_VOLUME>(
nodeContainerId.get(), std::move(request))
.then(defer(self(), [this, volumeId] {
VolumeData& volume = volumes.at(volumeId);
volume.state.set_state(VolumeState::VOL_READY);
volume.state.set_boot_id(bootId);
checkpointVolumeState(volumeId);
return Nothing();
}));
}
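// Transitions a volume from `VOL_READY` (or a partially completed
// `NODE_STAGE`) back to `NODE_READY` through a `NodeUnstageVolume` call,
// clearing the checkpointed boot ID.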
Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
const string& volumeId)
{
CHECK(volumes.contains(volumeId));
VolumeData& volume = volumes.at(volumeId);
if (!nodeCapabilities.stageUnstageVolume) {
CHECK_EQ(VolumeState::VOL_READY, volume.state.state());
volume.state.set_state(VolumeState::NODE_READY);
volume.state.clear_boot_id();
checkpointVolumeState(volumeId);
return Nothing();
}
const string stagingPath = csi::paths::getMountStagingPath(
csi::paths::getMountRootDir(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name()),
volumeId);
CHECK(os::exists(stagingPath));
// A previously failed `NodeStageVolume` call can be recovered through the
// current `NodeUnstageVolume` call. See:
// https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT
if (volume.state.state() == VolumeState::VOL_READY ||
volume.state.state() == VolumeState::NODE_STAGE) {
volume.state.set_state(VolumeState::NODE_UNSTAGE);
checkpointVolumeState(volumeId);
}
CHECK_EQ(VolumeState::NODE_UNSTAGE, volume.state.state());
csi::v0::NodeUnstageVolumeRequest request;
request.set_volume_id(volumeId);
request.set_staging_target_path(stagingPath);
CHECK_SOME(nodeContainerId);
return call<csi::v0::NODE_UNSTAGE_VOLUME>(
nodeContainerId.get(), std::move(request))
.then(defer(self(), [this, volumeId] {
VolumeData& volume = volumes.at(volumeId);
volume.state.set_state(VolumeState::NODE_READY);
volume.state.clear_boot_id();
checkpointVolumeState(volumeId);
return Nothing();
}));
}
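// Transitions a volume from `VOL_READY` to `PUBLISHED` through a
// `NodePublishVolume` call against the volume's target path, and sets
// `node_publish_required` in the checkpointed state.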
Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
const string& volumeId)
{
CHECK(volumes.contains(volumeId));
VolumeData& volume = volumes.at(volumeId);
const string targetPath = csi::paths::getMountTargetPath(
csi::paths::getMountRootDir(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name()),
volumeId);
// NOTE: The target path will be cleaned up in `deleteVolume`.
Try<Nothing> mkdir = os::mkdir(targetPath);
if (mkdir.isError()) {
return Failure(
"Failed to create mount target path '" + targetPath +
"': " + mkdir.error());
}
if (volume.state.state() == VolumeState::VOL_READY) {
volume.state.set_state(VolumeState::NODE_PUBLISH);
checkpointVolumeState(volumeId);
}
CHECK_EQ(VolumeState::NODE_PUBLISH, volume.state.state());
csi::v0::NodePublishVolumeRequest request;
request.set_volume_id(volumeId);
*request.mutable_publish_info() = volume.state.publish_info();
request.set_target_path(targetPath);
*request.mutable_volume_capability() = volume.state.volume_capability();
request.set_readonly(false);
*request.mutable_volume_attributes() = volume.state.volume_attributes();
if (nodeCapabilities.stageUnstageVolume) {
const string stagingPath = csi::paths::getMountStagingPath(
csi::paths::getMountRootDir(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name()),
volumeId);
CHECK(os::exists(stagingPath));
request.set_staging_target_path(stagingPath);
}
CHECK_SOME(nodeContainerId);
return call<csi::v0::NODE_PUBLISH_VOLUME>(
nodeContainerId.get(), std::move(request))
.then(defer(self(), [this, volumeId] {
VolumeData& volume = volumes.at(volumeId);
volume.state.set_state(VolumeState::PUBLISHED);
// NOTE: The `node_publish_required` field is always set by a successful
// `nodePublish` call, as it indicates that a container is going to use the
// volume. However, it will not be cleared by a `nodeUnpublish` call, but by
// a `deleteVolume` call instead.
volume.state.set_node_publish_required(true);
checkpointVolumeState(volumeId);
return Nothing();
}));
}
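// Transitions a volume from `PUBLISHED` (or a partially completed
// `NODE_PUBLISH`) back to `VOL_READY` through a `NodeUnpublishVolume` call
// against the volume's target path.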
Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
const string& volumeId)
{
CHECK(volumes.contains(volumeId));
VolumeData& volume = volumes.at(volumeId);
const string targetPath = csi::paths::getMountTargetPath(
csi::paths::getMountRootDir(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name()),
volumeId);
CHECK(os::exists(targetPath));
// A previously failed `NodePublishVolume` call can be recovered through the
// current `NodeUnpublishVolume` call. See:
// https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodepublishvolume // NOLINT
if (volume.state.state() == VolumeState::PUBLISHED ||
volume.state.state() == VolumeState::NODE_PUBLISH) {
volume.state.set_state(VolumeState::NODE_UNPUBLISH);
checkpointVolumeState(volumeId);
}
CHECK_EQ(VolumeState::NODE_UNPUBLISH, volume.state.state());
csi::v0::NodeUnpublishVolumeRequest request;
request.set_volume_id(volumeId);
request.set_target_path(targetPath);
CHECK_SOME(nodeContainerId);
return call<csi::v0::NODE_UNPUBLISH_VOLUME>(
nodeContainerId.get(), std::move(request))
.then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> {
VolumeData& volume = volumes.at(volumeId);
volume.state.set_state(VolumeState::VOL_READY);
checkpointVolumeState(volumeId);
return Nothing();
}));
}
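// Creates a volume with the given name, capacity and profile through a
// retried `CreateVolume` call, checkpoints it in `CREATED` state if not
// already known, and returns its volume ID.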
Future<string> StorageLocalResourceProviderProcess::createVolume(
const string& name,
const Bytes& capacity,
const DiskProfileAdaptor::ProfileInfo& profileInfo)
{
if (!controllerCapabilities.createDeleteVolume) {
return Failure(
"Controller capability 'CREATE_DELETE_VOLUME' is not supported");
}
csi::v0::CreateVolumeRequest request;
request.set_name(name);
request.mutable_capacity_range()->set_required_bytes(capacity.bytes());
request.mutable_capacity_range()->set_limit_bytes(capacity.bytes());
*request.add_volume_capabilities() = profileInfo.capability;
*request.mutable_parameters() = profileInfo.parameters;
CHECK_SOME(controllerContainerId);
return call<csi::v0::CREATE_VOLUME>(
controllerContainerId.get(), std::move(request), true) // Retry.
.then(defer(self(), [=](
const csi::v0::CreateVolumeResponse& response) -> string {
const csi::v0::Volume& volume = response.volume();
if (volumes.contains(volume.id())) {
// The resource provider failed over after the last `createVolume` call,
// but before the operation status was checkpointed.
CHECK_EQ(VolumeState::CREATED, volumes.at(volume.id()).state.state());
} else {
VolumeState volumeState;
volumeState.set_state(VolumeState::CREATED);
*volumeState.mutable_volume_capability() = profileInfo.capability;
*volumeState.mutable_parameters() = profileInfo.parameters;
*volumeState.mutable_volume_attributes() = volume.attributes();
volumes.put(volume.id(), std::move(volumeState));
checkpointVolumeState(volume.id());
}
return volume.id();
}));
}
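// Deletes the given volume. If the volume has a checkpointed state, it is
// first transitioned back to `CREATED` (unpublishing and unstaging as
// needed). The CSI `DeleteVolume` call is only made if the controller
// supports `CREATE_DELETE_VOLUME`; otherwise the volume is left as
// preprovisioned. Returns whether the volume has been deprovisioned.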
Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
const string& volumeId)
{
Future<Nothing> deleted = Nothing();
// If the volume has a checkpointed state, we transition it to `CREATED` state
// before deleting. Otherwise, we have to delete it directly because we have
// no idea what state this volume is in.
if (volumes.contains(volumeId)) {
VolumeData& volume = volumes.at(volumeId);
if (volume.state.node_publish_required()) {
CHECK_EQ(VolumeState::PUBLISHED, volume.state.state());
const string targetPath = csi::paths::getMountTargetPath(
csi::paths::getMountRootDir(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name()),
volumeId);
// NOTE: Normally the volume should have been cleaned up before
// `deleteVolume` is called. However, this may not be true for
// preprovisioned volumes (e.g., leftover from a previous resource
// provider instance). To prevent data leakage in such cases, we clean up
// the data (but not the target path) here.
Try<Nothing> rmdir = os::rmdir(targetPath, true, false);
if (rmdir.isError()) {
return Failure(
"Failed to clean up volume '" + volumeId + "': " + rmdir.error());
}
volume.state.set_node_publish_required(false);
checkpointVolumeState(volumeId);
}
CHECK(VolumeState::State_IsValid(volume.state.state()));
switch (volume.state.state()) {
case VolumeState::PUBLISHED:
case VolumeState::NODE_PUBLISH:
case VolumeState::NODE_UNPUBLISH: {
deleted = deleted.then(defer(self(), &Self::nodeUnpublish, volumeId));
// NOTE: We continue to the next case to delete the volume in
// `VOL_READY` state once the above is done.
}
case VolumeState::VOL_READY:
case VolumeState::NODE_STAGE:
case VolumeState::NODE_UNSTAGE: {
deleted = deleted.then(defer(self(), &Self::nodeUnstage, volumeId));
// NOTE: We continue to the next case to delete the volume in
// `NODE_READY` state once the above is done.
}
case VolumeState::NODE_READY:
case VolumeState::CONTROLLER_PUBLISH:
case VolumeState::CONTROLLER_UNPUBLISH: {
deleted =
deleted.then(defer(self(), &Self::controllerUnpublish, volumeId));
// NOTE: We continue to the next case to delete the volume in `CREATED`
// state once the above is done.
}
case VolumeState::CREATED: {
break;
}
case VolumeState::UNKNOWN: {
UNREACHABLE();
}
// NOTE: We avoid using a default clause for the following values in
// proto3's open enum to enable the compiler to detect missing enum cases
// for us. See:
// https://github.com/google/protobuf/issues/3917
case google::protobuf::kint32min:
case google::protobuf::kint32max: {
UNREACHABLE();
}
}
}
// We only delete the volume if the `CREATE_DELETE_VOLUME` capability is
// supported. Otherwise, we simply leave it as a preprovisioned volume.
if (controllerCapabilities.createDeleteVolume) {
deleted = deleted.then(defer(self(), [this, volumeId] {
csi::v0::DeleteVolumeRequest request;
request.set_volume_id(volumeId);
CHECK_SOME(controllerContainerId);
return call<csi::v0::DELETE_VOLUME>(
controllerContainerId.get(), std::move(request), true) // Retry.
.then([] { return Nothing(); });
}));
}
// NOTE: The last asynchronous continuation of `deleteVolume`, which is
// supposed to be run in the volume's sequence if it exists, would cause the
// sequence to be destructed, which would in turn discard the returned future.
// However, since the continuation would have already been run, the returned
// future will become ready.
return deleted
.then(defer(self(), [this, volumeId] {
if (volumes.contains(volumeId)) {
volumes.erase(volumeId);
const string volumePath = csi::paths::getVolumePath(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name(),
volumeId);
Try<Nothing> rmdir = os::rmdir(volumePath);
CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '"
<< volumePath << "': " << rmdir.error();
garbageCollectMountPath(volumeId);
}
return controllerCapabilities.createDeleteVolume;
}));
}
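// Validates a preprovisioned volume against the given profile: if the volume
// has a checkpointed state, the checkpointed capability and parameters must
// match the profile; otherwise `ValidateVolumeCapabilities` is called and the
// volume is checkpointed in `CREATED` state on success.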
Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
const string& volumeId,
const Option<Labels>& metadata,
const DiskProfileAdaptor::ProfileInfo& profileInfo)
{
// If the volume has a checkpointed state, the validation succeeds only if the
// capability and parameters of the specified profile are the same as those in
// the checkpoint.
if (volumes.contains(volumeId)) {
const VolumeState& volumeState = volumes.at(volumeId).state;
if (volumeState.volume_capability() != profileInfo.capability) {
return Failure("Invalid volume capability for volume '" + volumeId + "'");
}
if (volumeState.parameters() != profileInfo.parameters) {
return Failure("Invalid parameters for volume '" + volumeId + "'");
}
return Nothing();
}
if (!pluginCapabilities.controllerService) {
return Failure(
"Plugin capability 'CONTROLLER_SERVICE' is not supported");
}
google::protobuf::Map<string, string> volumeAttributes;
if (metadata.isSome()) {
volumeAttributes = CHECK_NOTERROR(convertLabelsToStringMap(metadata.get()));
}
// TODO(chhsiao): Validate the volume against the parameters of the profile
// once we get CSI v1.
csi::v0::ValidateVolumeCapabilitiesRequest request;
request.set_volume_id(volumeId);
*request.add_volume_capabilities() = profileInfo.capability;
*request.mutable_volume_attributes() = volumeAttributes;
CHECK_SOME(controllerContainerId);
return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(
controllerContainerId.get(), std::move(request))
.then(defer(self(), [=](
const csi::v0::ValidateVolumeCapabilitiesResponse& response)
-> Future<Nothing> {
if (!response.supported()) {
return Failure(
"Unsupported volume capability for volume '" + volumeId + "': " +
response.message());
}
VolumeState volumeState;
volumeState.set_state(VolumeState::CREATED);
*volumeState.mutable_volume_capability() = profileInfo.capability;
*volumeState.mutable_parameters() = profileInfo.parameters;
*volumeState.mutable_volume_attributes() = volumeAttributes;
volumes.put(volumeId, std::move(volumeState));
checkpointVolumeState(volumeId);
return Nothing();
}));
}
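// Returns RAW disk resources for the volumes reported by the plugin's
// `ListVolumes` call, recovering disk profiles from the total resources.
// This is only used for reconciliation, so no failure is returned if the
// capability is unsupported.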
Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
{
CHECK(info.has_id());
// This is only used for reconciliation so no failure is returned.
if (!controllerCapabilities.listVolumes) {
return Resources();
}
CHECK_SOME(controllerContainerId);
// TODO(chhsiao): Set the max entries and use a loop to do
// multiple `ListVolumes` calls.
return call<csi::v0::LIST_VOLUMES>(
controllerContainerId.get(), csi::v0::ListVolumesRequest())
.then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) {
Resources resources;
// Recover disk profiles from the checkpointed state.
hashmap<string, string> volumesToProfiles;
foreach (const Resource& resource, totalResources) {
if (resource.disk().source().has_id() &&
resource.disk().source().has_profile()) {
volumesToProfiles.put(
resource.disk().source().id(),
resource.disk().source().profile());
}
}
foreach (const auto& entry, response.entries()) {
resources += createRawDiskResource(
info,
Bytes(entry.volume().capacity_bytes()),
volumesToProfiles.contains(entry.volume().id())
? volumesToProfiles.at(entry.volume().id())
: Option<string>::none(),
vendor,
entry.volume().id(),
entry.volume().attributes().empty()
? Option<Labels>::none()
: convertStringMapToLabels(entry.volume().attributes()));
}
return resources;
}));
}
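// Returns RAW disk resources describing the available capacity reported by
// the plugin's `GetCapacity` call for each known profile. This is only used
// for reconciliation, so no failure is returned if the capability is
// unsupported.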
Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
{
CHECK(info.has_id());
// This is only used for reconciliation so no failure is returned.
if (!controllerCapabilities.getCapacity) {
return Resources();
}
CHECK_SOME(controllerContainerId);
vector<Future<Resources>> futures;
foreachpair (const string& profile,
const DiskProfileAdaptor::ProfileInfo& profileInfo,
profileInfos) {
csi::v0::GetCapacityRequest request;
*request.add_volume_capabilities() = profileInfo.capability;
*request.mutable_parameters() = profileInfo.parameters;
futures.push_back(
call<csi::v0::GET_CAPACITY>(
controllerContainerId.get(), std::move(request))
.then(defer(self(), [=](
const csi::v0::GetCapacityResponse& response) -> Resources {
if (response.available_capacity() == 0) {
return Resources();
}
return createRawDiskResource(
info, Bytes(response.available_capacity()), profile, vendor);
})));
}
return collect(futures)
.then([](const vector<Resources>& resources) {
return accumulate(resources.begin(), resources.end(), Resources());
});
}
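// Applies the given pending operation: speculative operations (RESERVE,
// UNRESERVE, CREATE, DESTROY) are applied synchronously, while CREATE_DISK
// and DESTROY_DISK are applied asynchronously. If the operation disallows
// reconciliation, its future is also added to `sequence` so that storage
// pool reconciliation can wait for it.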
Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
const id::UUID& operationUuid)
{
CHECK(operations.contains(operationUuid));
const Operation& operation = operations.at(operationUuid);
CHECK(!protobuf::isTerminalState(operation.latest_status().state()));
Future<vector<ResourceConversion>> conversions;
switch (operation.info().type()) {
case Offer::Operation::RESERVE:
case Offer::Operation::UNRESERVE: {
// Synchronously apply the speculative operation to ensure that its result
// is reflected in the total resources before any succeeding operation is
// applied.
return updateOperationStatus(
operationUuid,
getResourceConversions(operation.info()));
}
case Offer::Operation::CREATE: {
// Synchronously create the persistent volumes to ensure that their results
// are reflected in the total resources before any succeeding operation is
// applied.
return updateOperationStatus(
operationUuid, applyCreate(operation.info()));
}
case Offer::Operation::DESTROY: {
// Synchronously clean up and destroy the persistent volumes to ensure that
// their results are reflected in the total resources before any succeeding
// operation is applied.
return updateOperationStatus(
operationUuid, applyDestroy(operation.info()));
}
case Offer::Operation::CREATE_DISK: {
CHECK(operation.info().has_create_disk());
conversions = applyCreateDisk(
operation.info().create_disk().source(),
operationUuid,
operation.info().create_disk().target_type(),
operation.info().create_disk().has_target_profile()
? operation.info().create_disk().target_profile()
: Option<string>::none());
break;
}
case Offer::Operation::DESTROY_DISK: {
CHECK(operation.info().has_destroy_disk());
conversions = applyDestroyDisk(
operation.info().destroy_disk().source());
break;
}
case Offer::Operation::GROW_VOLUME:
case Offer::Operation::SHRINK_VOLUME: {
// TODO(chhsiao): These operations are currently not supported for
// resource providers, and should have been validated by the master.
UNREACHABLE();
}
case Offer::Operation::UNKNOWN:
case Offer::Operation::LAUNCH:
case Offer::Operation::LAUNCH_GROUP: {
UNREACHABLE();
}
}
CHECK(!protobuf::isSpeculativeOperation(operation.info()))
<< "Unexpected speculative operation: " << operation.info().type();
shared_ptr<Promise<Nothing>> promise(new Promise<Nothing>());
conversions
.onAny(defer(self(), [=](const Future<vector<ResourceConversion>>& future) {
Try<vector<ResourceConversion>> conversions = future.isReady()
? Try<vector<ResourceConversion>>::some(future.get())
: Error(future.isFailed() ? future.failure() : "future discarded");
if (conversions.isSome()) {
LOG(INFO)
<< "Applying conversion from '" << conversions->at(0).consumed
<< "' to '" << conversions->at(0).converted
<< "' for operation (uuid: " << operationUuid << ")";
}
promise->associate(
updateOperationStatus(operationUuid, conversions));
}));
Future<Nothing> future = promise->future();
if (!allowsReconciliation(operation.info())) {
// We place the future in `sequence` so that it can be waited on before
// reconciling storage pools.
sequence.add(std::function<Future<Nothing>()>([future] { return future; }));
}
return future;
}
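// Sends an `OPERATION_DROPPED` status update for the given operation. If the
// operation info is provided (i.e., the operation is dropped intentionally),
// the operation is checkpointed and the update is retried through the status
// update manager; otherwise a one-shot update is sent in response to a
// reconciliation request.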
void StorageLocalResourceProviderProcess::dropOperation(
const id::UUID& operationUuid,
const Option<FrameworkID>& frameworkId,
const Option<Offer::Operation>& operation,
const string& message)
{
LOG(WARNING)
<< "Dropping operation (uuid: " << operationUuid << "): " << message;
CHECK(!operations.contains(operationUuid));
UpdateOperationStatusMessage update =
protobuf::createUpdateOperationStatusMessage(
protobuf::createUUID(operationUuid),
protobuf::createOperationStatus(
OPERATION_DROPPED,
None(),
message,
None(),
None(),
slaveId,
info.id()),
None(),
frameworkId,
slaveId);
if (operation.isSome()) {
// This operation is dropped intentionally. We have to persist the operation
// in the resource provider state and retry the status update.
*update.mutable_status()->mutable_uuid() = protobuf::createUUID();
if (operation->has_id()) {
*update.mutable_status()->mutable_operation_id() = operation->id();
}
operations[operationUuid] = protobuf::createOperation(
operation.get(),
update.status(),
frameworkId,
slaveId,
update.operation_uuid());
checkpointResourceProviderState();
auto die = [=](const string& message) {
LOG(ERROR)
<< "Failed to update status of operation (uuid: " << operationUuid
<< "): " << message;
fatal();
};
statusUpdateManager.update(std::move(update))
.onFailed(defer(self(), std::bind(die, lambda::_1)))
.onDiscarded(defer(self(), std::bind(die, "future discarded")));
} else {
// This operation is unknown to the resource provider because of a
// disconnection, and is being asked for reconciliation. In this case, we
// send a status update without a retry. If it is dropped because of another
// disconnection, another reconciliation will be triggered by the master
// after a reregistration.
sendOperationStatusUpdate(std::move(update));
}
++metrics.operations_dropped.at(
operation.isSome() ? operation->type() : Offer::Operation::UNKNOWN);
}
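// Converts a RAW disk resource into a MOUNT or BLOCK disk of the requested
// profile, either by creating a new volume (for a storage pool with a
// profile) or by validating a preprovisioned volume (when a volume ID and
// target profile are given).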
Future<vector<ResourceConversion>>
StorageLocalResourceProviderProcess::applyCreateDisk(
const Resource& resource,
const id::UUID& operationUuid,
const Resource::DiskInfo::Source::Type& targetType,
const Option<string>& targetProfile)
{
CHECK_EQ(Resource::DiskInfo::Source::RAW, resource.disk().source().type());
// NOTE: Currently we only support two types of RAW disk resources:
// 1. RAW disk from `GetCapacity` with a profile but no volume ID.
// 2. RAW disk from `ListVolumes` for a preprovisioned volume, which has a
// volume ID but no profile.
//
// For 1, we check if its profile is mount or block capable, then
// call `createVolume` with the operation UUID as the name (so that
// the same volume will be returned when recovering from a failover).
//
// For 2, the target profile will be specified, so we first check if the
// profile is mount or block capable. Then, we call `validateVolume` to handle
// the following two scenarios:
// a. If the volume has a checkpointed state (because it is created by a
// previous resource provider), we simply check if its checkpointed
// capability and parameters match the profile.
// b. If the volume is newly discovered, `ValidateVolumeCapabilities` is
// called with the capability of the profile.
CHECK_NE(resource.disk().source().has_profile(),
resource.disk().source().has_id() && targetProfile.isSome());
const string profile =
targetProfile.getOrElse(resource.disk().source().profile());
if (!profileInfos.contains(profile)) {
return Failure("Profile '" + profile + "' not found");
}
const DiskProfileAdaptor::ProfileInfo& profileInfo = profileInfos.at(profile);
switch (targetType) {
case Resource::DiskInfo::Source::MOUNT: {
if (!profileInfo.capability.has_mount()) {
return Failure(
"Profile '" + profile + "' cannot be used to create a MOUNT disk");
}
break;
}
case Resource::DiskInfo::Source::BLOCK: {
if (!profileInfo.capability.has_block()) {
return Failure(
"Profile '" + profile + "' cannot be used to create a BLOCK disk");
}
break;
}
case Resource::DiskInfo::Source::UNKNOWN:
case Resource::DiskInfo::Source::PATH:
case Resource::DiskInfo::Source::RAW: {
UNREACHABLE();
}
}
// TODO(chhsiao): Consider calling `createVolume` sequentially with other
// create or delete operations, and send an `UPDATE_STATE` for storage pools
// afterward. See MESOS-9254.
Future<string> created = resource.disk().source().has_profile()
? createVolume(
operationUuid.toString(),
Bytes(resource.scalar().value() * Bytes::MEGABYTES),
profileInfo)
: validateVolume(
resource.disk().source().id(),
resource.disk().source().has_metadata()
? resource.disk().source().metadata()
: Option<Labels>::none(),
profileInfo)
.then([=]() -> string { return resource.disk().source().id(); });
return created
.then(defer(self(), [=](const string& volumeId) {
CHECK(volumes.contains(volumeId));
const VolumeState& volumeState = volumes.at(volumeId).state;
Resource converted = resource;
converted.mutable_disk()->mutable_source()->set_id(volumeId);
converted.mutable_disk()->mutable_source()->set_type(targetType);
converted.mutable_disk()->mutable_source()->set_profile(profile);
if (!volumeState.volume_attributes().empty()) {
converted.mutable_disk()->mutable_source()->mutable_metadata()
->CopyFrom(convertStringMapToLabels(volumeState.volume_attributes()));
}
const string mountRootDir = csi::paths::getMountRootDir(
slave::paths::getCsiRootDir("."),
info.storage().plugin().type(),
info.storage().plugin().name());
switch (targetType) {
case Resource::DiskInfo::Source::MOUNT: {
// Set the root path relative to agent work dir.
converted.mutable_disk()->mutable_source()->mutable_mount()
->set_root(mountRootDir);
break;
}
case Resource::DiskInfo::Source::BLOCK: {
break;
}
case Resource::DiskInfo::Source::UNKNOWN:
case Resource::DiskInfo::Source::PATH:
case Resource::DiskInfo::Source::RAW: {
UNREACHABLE();
}
}
vector<ResourceConversion> conversions;
conversions.emplace_back(resource, std::move(converted));
return conversions;
}));
}
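// Converts a MOUNT or BLOCK disk resource back into a RAW disk,
// deprovisioning the backing volume through `deleteVolume` (sequentialized
// with other operations on the same volume). If the volume is deprovisioned
// but its profile has disappeared, the freed disk is converted into an empty
// resource and a storage pool reconciliation is scheduled if none is pending.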
Future<vector<ResourceConversion>>
StorageLocalResourceProviderProcess::applyDestroyDisk(
const Resource& resource)
{
CHECK(!Resources::isPersistentVolume(resource));
CHECK(resource.disk().source().has_id());
const string& volumeId = resource.disk().source().id();
// Sequentialize the deletion with other operations on the same volume.
Future<bool> deleted =
volumes.contains(volumeId)
? volumes.at(volumeId).sequence->add(std::function<Future<bool>()>(
defer(self(), &Self::deleteVolume, volumeId)))
: deleteVolume(volumeId);
return deleted
.then(defer(self(), [=](bool deprovisioned) {
Resource converted = resource;
converted.mutable_disk()->mutable_source()->set_type(
Resource::DiskInfo::Source::RAW);
switch (resource.disk().source().type()) {
case Resource::DiskInfo::Source::MOUNT: {
converted.mutable_disk()->mutable_source()->clear_mount();
break;
}
case Resource::DiskInfo::Source::BLOCK:
case Resource::DiskInfo::Source::RAW: {
break;
}
case Resource::DiskInfo::Source::UNKNOWN:
case Resource::DiskInfo::Source::PATH: {
UNREACHABLE(); // Should have been validated by the master.
}
}
// We clear the volume ID and metadata if the volume has been
// deprovisioned. Otherwise, we clear the profile.
if (deprovisioned) {
converted.mutable_disk()->mutable_source()->clear_id();
converted.mutable_disk()->mutable_source()->clear_metadata();
if (!resource.disk().source().has_profile() ||
!profileInfos.contains(resource.disk().source().profile())) {
// The destroyed volume is converted into an empty resource to prevent
// the freed disk from being sent out with a disappeared profile.
converted.mutable_scalar()->set_value(0);
// Since the profile has disappeared, the freed disk might be claimed by
// other profiles that have appeared since. If there is an ongoing
// reconciliation, it
// is waiting for this operation to finish and will recover the freed
// disk, so no reconciliation should be done here. Otherwise, we
// reconcile the storage pools to recover the freed disk.
if (!reconciled.isPending()) {
CHECK(info.has_id());
LOG(INFO)
<< "Reconciling storage pools for resource provider " << info.id()
<< " after resource '" << resource << "' has been freed";
// Reconcile the storage pools in `sequence` to wait for any other
// pending operation that disallow reconciliation to finish, and set
// up `reconciled` to drop incoming operations that disallow
// reconciliation until the storage pools are reconciled.
reconciled = sequence.add(std::function<Future<Nothing>()>(
defer(self(), &Self::reconcileStoragePools)));
}
}
} else {
converted.mutable_disk()->mutable_source()->clear_profile();
}
vector<ResourceConversion> conversions;
conversions.emplace_back(resource, std::move(converted));
return conversions;
}));
}
Try<vector<ResourceConversion>>
StorageLocalResourceProviderProcess::applyCreate(
const Offer::Operation& operation) const
{
CHECK(operation.has_create());
foreach (const Resource& resource, operation.create().volumes()) {
CHECK(Resources::isPersistentVolume(resource));
// TODO(chhsiao): Support persistent BLOCK volumes.
if (resource.disk().source().type() != Resource::DiskInfo::Source::MOUNT) {
return Error(
"Cannot create persistent volume '" +
stringify(resource.disk().persistence().id()) + "' on a " +
stringify(resource.disk().source().type()) + " disk");
}
}
return getResourceConversions(operation);
}
Try<vector<ResourceConversion>>
StorageLocalResourceProviderProcess::applyDestroy(
const Offer::Operation& operation) const
{
CHECK(operation.has_destroy());
foreach (const Resource& resource, operation.destroy().volumes()) {
// TODO(chhsiao): Support cleaning up persistent BLOCK volumes, presumably
// with `dd` or any other utility to zero out the block device.
CHECK(Resources::isPersistentVolume(resource));
CHECK(resource.disk().source().type() == Resource::DiskInfo::Source::MOUNT);
CHECK(resource.disk().source().has_id());
const string& volumeId = resource.disk().source().id();
CHECK(volumes.contains(volumeId));
const VolumeState& volumeState = volumes.at(volumeId).state;
// NOTE: Data can only be written to the persistent volume when it is in the
// `PUBLISHED` state (i.e., mounted). Once a volume has been transitioned
// to `PUBLISHED`, we will set the `node_publish_required` field and always
// recover it back to `PUBLISHED` after a failover, until a `DESTROY_DISK`
// is applied, which only comes after `DESTROY`. So we only need to clean up
// the volume if it has the field set.
if (!volumeState.node_publish_required()) {
continue;
}
CHECK_EQ(VolumeState::PUBLISHED, volumeState.state());
const string targetPath = csi::paths::getMountTargetPath(
csi::paths::getMountRootDir(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name()),
volumeId);
// Only the data in the target path, but not the path itself, should be
// removed.
Try<Nothing> rmdir = os::rmdir(targetPath, true, false);
if (rmdir.isError()) {
return Error(
"Failed to remove persistent volume '" +
stringify(resource.disk().persistence().id()) + "' at '" +
targetPath + "': " + rmdir.error());
}
}
return getResourceConversions(operation);
}
Try<Nothing> StorageLocalResourceProviderProcess::updateOperationStatus(
const id::UUID& operationUuid,
const Try<vector<ResourceConversion>>& conversions)
{
Option<Error> error;
Resources convertedResources;
CHECK(operations.contains(operationUuid));
Operation& operation = operations.at(operationUuid);
if (conversions.isSome()) {
// Strip away the allocation info when applying the conversion to
// the total resources.
vector<ResourceConversion> _conversions;
foreach (ResourceConversion conversion, conversions.get()) {
convertedResources += conversion.converted;
conversion.consumed.unallocate();
conversion.converted.unallocate();
_conversions.emplace_back(std::move(conversion));
}
Try<Resources> result = totalResources.apply(_conversions);
if (result.isSome()) {
totalResources = result.get();
} else {
error = result.error();
}
} else {
error = conversions.error();
}
operation.mutable_latest_status()->CopyFrom(protobuf::createOperationStatus(
error.isNone() ? OPERATION_FINISHED : OPERATION_FAILED,
operation.info().has_id()
? operation.info().id() : Option<OperationID>::none(),
error.isNone() ? Option<string>::none() : error->message,
error.isNone() ? convertedResources : Option<Resources>::none(),
id::UUID::random(),
slaveId,
info.id()));
operation.add_statuses()->CopyFrom(operation.latest_status());
checkpointResourceProviderState();
// Send out the status update for the operation.
UpdateOperationStatusMessage update =
protobuf::createUpdateOperationStatusMessage(
protobuf::createUUID(operationUuid),
operation.latest_status(),
None(),
operation.has_framework_id()
? operation.framework_id() : Option<FrameworkID>::none(),
slaveId);
auto die = [=](const string& message) {
LOG(ERROR)
<< "Failed to update status of operation (uuid: " << operationUuid
<< "): " << message;
fatal();
};
statusUpdateManager.update(std::move(update))
.onFailed(defer(self(), std::bind(die, lambda::_1)))
.onDiscarded(defer(self(), std::bind(die, "future discarded")));
--metrics.operations_pending.at(operation.info().type());
switch (operation.latest_status().state()) {
case OPERATION_FINISHED:
++metrics.operations_finished.at(operation.info().type());
break;
case OPERATION_FAILED:
++metrics.operations_failed.at(operation.info().type());
break;
case OPERATION_UNSUPPORTED:
case OPERATION_PENDING:
case OPERATION_ERROR:
case OPERATION_DROPPED:
case OPERATION_UNREACHABLE:
case OPERATION_GONE_BY_OPERATOR:
case OPERATION_RECOVERING:
case OPERATION_UNKNOWN:
UNREACHABLE();
}
if (error.isSome()) {
// We only send `UPDATE_STATE` for failed speculative operations.
if (protobuf::isSpeculativeOperation(operation.info())) {
resourceVersion = id::UUID::random();
sendResourceProviderStateUpdate();
}
return error.get();
}
return Nothing();
}
void StorageLocalResourceProviderProcess::garbageCollectOperationPath(
const id::UUID& operationUuid)
{
CHECK(!operations.contains(operationUuid));
const string path = slave::paths::getOperationPath(
slave::paths::getResourceProviderPath(
metaDir, slaveId, info.type(), info.name(), info.id()),
operationUuid);
// NOTE: We check if the path exists since we do not checkpoint some status
// updates, such as OPERATION_DROPPED.
if (os::exists(path)) {
Try<Nothing> rmdir = os::rmdir(path);
if (rmdir.isError()) {
LOG(ERROR)
<< "Failed to remove directory '" << path << "': " << rmdir.error();
}
}
}
void StorageLocalResourceProviderProcess::garbageCollectMountPath(
const string& volumeId)
{
CHECK(!volumes.contains(volumeId));
const string path = csi::paths::getMountPath(
csi::paths::getMountRootDir(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name()),
volumeId);
if (os::exists(path)) {
Try<Nothing> rmdir = os::rmdir(path);
if (rmdir.isError()) {
LOG(ERROR)
<< "Failed to remove directory '" << path << "': " << rmdir.error();
}
}
}
void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
{
ResourceProviderState state;
foreachvalue (const Operation& operation, operations) {
state.add_operations()->CopyFrom(operation);
}
state.mutable_resources()->CopyFrom(totalResources);
ResourceProviderState::Storage* storage = state.mutable_storage();
// NOTE: We only checkpoint profiles associated with any storage
// pool (i.e., resource that has no volume ID) in the total resources.
// We do not need to checkpoint profiles for resources that have
// volume IDs, as their volume capabilities are already checkpointed.
hashset<string> requiredProfiles;
foreach (const Resource& resource, totalResources) {
if (!resource.disk().source().has_id()) {
CHECK(resource.disk().source().has_profile());
requiredProfiles.insert(resource.disk().source().profile());
}
}
foreach (const string& profile, requiredProfiles) {
CHECK(profileInfos.contains(profile));
const DiskProfileAdaptor::ProfileInfo& profileInfo =
profileInfos.at(profile);
ResourceProviderState::Storage::ProfileInfo& profileInfo_ =
(*storage->mutable_profiles())[profile];
*profileInfo_.mutable_capability() = profileInfo.capability;
*profileInfo_.mutable_parameters() = profileInfo.parameters;
}
const string statePath = slave::paths::getResourceProviderStatePath(
metaDir, slaveId, info.type(), info.name(), info.id());
// NOTE: We ensure the checkpoint is synced to the filesystem to avoid
// ending up with a stale or empty checkpoint if the system crashes.
Try<Nothing> checkpoint = slave::state::checkpoint(statePath, state, true);
CHECK_SOME(checkpoint)
<< "Failed to checkpoint resource provider state to '" << statePath << "': "
<< checkpoint.error();
}
void StorageLocalResourceProviderProcess::checkpointVolumeState(
const string& volumeId)
{
const string statePath = csi::paths::getVolumeStatePath(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name(),
volumeId);
// NOTE: We ensure the checkpoint is synced to the filesystem to avoid
// ending up with a stale or empty checkpoint if the system crashes.
Try<Nothing> checkpoint =
slave::state::checkpoint(statePath, volumes.at(volumeId).state, true);
CHECK_SOME(checkpoint)
<< "Failed to checkpoint volume state to '" << statePath << "': "
<< checkpoint.error();
}
void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
{
Call call;
call.set_type(Call::UPDATE_STATE);
call.mutable_resource_provider_id()->CopyFrom(info.id());
Call::UpdateState* update = call.mutable_update_state();
update->mutable_resources()->CopyFrom(totalResources);
update->mutable_resource_version_uuid()->CopyFrom(
protobuf::createUUID(resourceVersion));
foreachvalue (const Operation& operation, operations) {
update->add_operations()->CopyFrom(operation);
}
LOG(INFO)
<< "Sending UPDATE_STATE call with resources '" << totalResources
<< "' and " << update->operations_size() << " operations to agent "
<< slaveId;
// NOTE: We terminate the resource provider here if the state cannot be
// updated, so that the state is in sync with the agent's view.
auto die = [=](const ResourceProviderID& id, const string& message) {
LOG(ERROR)
<< "Failed to update state for resource provider " << id << ": "
<< message;
fatal();
};
driver->send(evolve(call))
.onFailed(defer(self(), std::bind(die, info.id(), lambda::_1)))
.onDiscarded(defer(self(), std::bind(die, info.id(), "future discarded")));
}
void StorageLocalResourceProviderProcess::sendOperationStatusUpdate(
const UpdateOperationStatusMessage& _update)
{
Call call;
call.set_type(Call::UPDATE_OPERATION_STATUS);
call.mutable_resource_provider_id()->CopyFrom(info.id());
Call::UpdateOperationStatus* update =
call.mutable_update_operation_status();
update->mutable_operation_uuid()->CopyFrom(_update.operation_uuid());
update->mutable_status()->CopyFrom(_update.status());
if (_update.has_framework_id()) {
update->mutable_framework_id()->CopyFrom(_update.framework_id());
}
if (_update.has_latest_status()) {
update->mutable_latest_status()->CopyFrom(_update.latest_status());
}
auto err = [](const id::UUID& uuid, const string& message) {
LOG(ERROR)
<< "Failed to send status update for operation (uuid: " << uuid << "): "
<< message;
};
Try<id::UUID> uuid =
id::UUID::fromBytes(_update.operation_uuid().value());
CHECK_SOME(uuid);
driver->send(evolve(call))
.onFailed(std::bind(err, uuid.get(), lambda::_1))
.onDiscarded(std::bind(err, uuid.get(), "future discarded"));
}
StorageLocalResourceProviderProcess::Metrics::Metrics(const string& prefix)
: csi_plugin_container_terminations(
prefix + "csi_plugin/container_terminations")
{
process::metrics::add(csi_plugin_container_terminations);
vector<csi::v0::RPC> rpcs;
// NOTE: We use a switch statement here as a compile-time sanity check so we
// won't forget to add metrics for new RPCs in the future.
csi::v0::RPC firstRpc = csi::v0::GET_PLUGIN_INFO;
switch (firstRpc) {
case csi::v0::GET_PLUGIN_INFO:
rpcs.push_back(csi::v0::GET_PLUGIN_INFO);
case csi::v0::GET_PLUGIN_CAPABILITIES:
rpcs.push_back(csi::v0::GET_PLUGIN_CAPABILITIES);
case csi::v0::PROBE:
rpcs.push_back(csi::v0::PROBE);
case csi::v0::CREATE_VOLUME:
rpcs.push_back(csi::v0::CREATE_VOLUME);
case csi::v0::DELETE_VOLUME:
rpcs.push_back(csi::v0::DELETE_VOLUME);
case csi::v0::CONTROLLER_PUBLISH_VOLUME:
rpcs.push_back(csi::v0::CONTROLLER_PUBLISH_VOLUME);
case csi::v0::CONTROLLER_UNPUBLISH_VOLUME:
rpcs.push_back(csi::v0::CONTROLLER_UNPUBLISH_VOLUME);
case csi::v0::VALIDATE_VOLUME_CAPABILITIES:
rpcs.push_back(csi::v0::VALIDATE_VOLUME_CAPABILITIES);
case csi::v0::LIST_VOLUMES:
rpcs.push_back(csi::v0::LIST_VOLUMES);
case csi::v0::GET_CAPACITY:
rpcs.push_back(csi::v0::GET_CAPACITY);
case csi::v0::CONTROLLER_GET_CAPABILITIES:
rpcs.push_back(csi::v0::CONTROLLER_GET_CAPABILITIES);
case csi::v0::NODE_STAGE_VOLUME:
rpcs.push_back(csi::v0::NODE_STAGE_VOLUME);
case csi::v0::NODE_UNSTAGE_VOLUME:
rpcs.push_back(csi::v0::NODE_UNSTAGE_VOLUME);
case csi::v0::NODE_PUBLISH_VOLUME:
rpcs.push_back(csi::v0::NODE_PUBLISH_VOLUME);
case csi::v0::NODE_UNPUBLISH_VOLUME:
rpcs.push_back(csi::v0::NODE_UNPUBLISH_VOLUME);
case csi::v0::NODE_GET_ID:
rpcs.push_back(csi::v0::NODE_GET_ID);
case csi::v0::NODE_GET_CAPABILITIES:
rpcs.push_back(csi::v0::NODE_GET_CAPABILITIES);
}
foreach (const csi::v0::RPC& rpc, rpcs) {
const string name = stringify(rpc);
csi_plugin_rpcs_pending.put(
rpc, PushGauge(prefix + "csi_plugin/rpcs/" + name + "/pending"));
csi_plugin_rpcs_successes.put(
rpc, Counter(prefix + "csi_plugin/rpcs/" + name + "/successes"));
csi_plugin_rpcs_errors.put(
rpc, Counter(prefix + "csi_plugin/rpcs/" + name + "/errors"));
csi_plugin_rpcs_cancelled.put(
rpc, Counter(prefix + "csi_plugin/rpcs/" + name + "/cancelled"));
process::metrics::add(csi_plugin_rpcs_pending.at(rpc));
process::metrics::add(csi_plugin_rpcs_successes.at(rpc));
process::metrics::add(csi_plugin_rpcs_errors.at(rpc));
process::metrics::add(csi_plugin_rpcs_cancelled.at(rpc));
}
vector<Offer::Operation::Type> operationTypes;
// NOTE: We use a switch statement here as a compile-time sanity check so we
// won't forget to add metrics for new operations in the future.
Offer::Operation::Type firstOperationType = Offer::Operation::RESERVE;
switch (firstOperationType) {
case Offer::Operation::RESERVE:
operationTypes.push_back(Offer::Operation::RESERVE);
case Offer::Operation::UNRESERVE:
operationTypes.push_back(Offer::Operation::UNRESERVE);
case Offer::Operation::CREATE:
operationTypes.push_back(Offer::Operation::CREATE);
case Offer::Operation::DESTROY:
operationTypes.push_back(Offer::Operation::DESTROY);
case Offer::Operation::CREATE_DISK:
operationTypes.push_back(Offer::Operation::CREATE_DISK);
case Offer::Operation::DESTROY_DISK:
operationTypes.push_back(Offer::Operation::DESTROY_DISK);
break;
case Offer::Operation::GROW_VOLUME:
case Offer::Operation::SHRINK_VOLUME:
// TODO(chhsiao): These operations are currently not supported for
// resource providers, and should have been validated by the master.
UNREACHABLE();
case Offer::Operation::UNKNOWN:
case Offer::Operation::LAUNCH:
case Offer::Operation::LAUNCH_GROUP:
UNREACHABLE();
};
foreach (const Offer::Operation::Type& type, operationTypes) {
const string name = strings::lower(Offer::Operation::Type_Name(type));
operations_pending.put(type, PushGauge(
prefix + "operations/" + name + "/pending"));
operations_finished.put(type, Counter(
prefix + "operations/" + name + "/finished"));
operations_failed.put(type, Counter(
prefix + "operations/" + name + "/failed"));
operations_dropped.put(type, Counter(
prefix + "operations/" + name + "/dropped"));
process::metrics::add(operations_pending.at(type));
process::metrics::add(operations_finished.at(type));
process::metrics::add(operations_failed.at(type));
process::metrics::add(operations_dropped.at(type));
}
// Special metric for counting the number of `OPERATION_DROPPED` statuses when
// receiving explicit reconciliation for unknown operation UUIDs.
operations_dropped.put(
Offer::Operation::UNKNOWN,
Counter(prefix + "operations/unknown/dropped"));
process::metrics::add(operations_dropped.at(Offer::Operation::UNKNOWN));
}
StorageLocalResourceProviderProcess::Metrics::~Metrics()
{
process::metrics::remove(csi_plugin_container_terminations);
foreachvalue (const PushGauge& gauge, csi_plugin_rpcs_pending) {
process::metrics::remove(gauge);
}
foreachvalue (const Counter& counter, csi_plugin_rpcs_successes) {
process::metrics::remove(counter);
}
foreachvalue (const Counter& counter, csi_plugin_rpcs_errors) {
process::metrics::remove(counter);
}
foreachvalue (const Counter& counter, csi_plugin_rpcs_cancelled) {
process::metrics::remove(counter);
}
foreachvalue (const PushGauge& gauge, operations_pending) {
process::metrics::remove(gauge);
}
foreachvalue (const Counter& counter, operations_finished) {
process::metrics::remove(counter);
}
foreachvalue (const Counter& counter, operations_failed) {
process::metrics::remove(counter);
}
foreachvalue (const Counter& counter, operations_dropped) {
process::metrics::remove(counter);
}
}
Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create(
const http::URL& url,
const string& workDir,
const ResourceProviderInfo& info,
const SlaveID& slaveId,
const Option<string>& authToken,
bool strict)
{
Option<Error> error = validate(info);
if (error.isSome()) {
return error.get();
}
return Owned<LocalResourceProvider>(new StorageLocalResourceProvider(
url, workDir, info, slaveId, authToken, strict));
}
Option<Error> StorageLocalResourceProvider::validate(
const ResourceProviderInfo& info)
{
if (info.has_id()) {
return Error("'ResourceProviderInfo.id' must not be set");
}
// Verify that the name follows Java package naming convention.
// TODO(chhsiao): We should move this check to a validation function
// for `ResourceProviderInfo`.
if (!isValidName(info.name())) {
return Error(
"Resource provider name '" + info.name() +
"' does not follow Java package naming convention");
}
if (!info.has_storage()) {
return Error("'ResourceProviderInfo.storage' must be set");
}
// Verify that the type and name of the CSI plugin follow Java package
// naming convention.
// TODO(chhsiao): We should move this check to a validation function
// for `CSIPluginInfo`.
if (!isValidType(info.storage().plugin().type()) ||
!isValidName(info.storage().plugin().name())) {
return Error(
"CSI plugin type '" + info.storage().plugin().type() +
"' and name '" + info.storage().plugin().name() +
"' do not follow Java package naming convention");
}
// Verify that the plugin provides the CSI node service.
// TODO(chhsiao): We should move this check to a validation function
// for `CSIPluginInfo`.
bool hasNodeService = false;
foreach (const CSIPluginContainerInfo& container,
info.storage().plugin().containers()) {
if (container.services().end() != find(
container.services().begin(),
container.services().end(),
CSIPluginContainerInfo::NODE_SERVICE)) {
hasNodeService = true;
break;
}
}
if (!hasNodeService) {
return Error(
stringify(CSIPluginContainerInfo::NODE_SERVICE) + " not found");
}
return None();
}
StorageLocalResourceProvider::StorageLocalResourceProvider(
const http::URL& url,
const string& workDir,
const ResourceProviderInfo& info,
const SlaveID& slaveId,
const Option<string>& authToken,
bool strict)
: process(new StorageLocalResourceProviderProcess(
url, workDir, info, slaveId, authToken, strict))
{
spawn(CHECK_NOTNULL(process.get()));
}
StorageLocalResourceProvider::~StorageLocalResourceProvider()
{
process::terminate(process.get());
process::wait(process.get());
}
} // namespace internal {
} // namespace mesos {