blob: e290b3a219b9a6fdae6f11cca5d11fd887c26ae7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "resource_provider/storage/provider.hpp"
#include <algorithm>
#include <cctype>
#include <functional>
#include <list>
#include <memory>
#include <numeric>
#include <queue>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include <mesos/http.hpp>
#include <mesos/resources.hpp>
#include <mesos/type_utils.hpp>
#include <mesos/resource_provider/resource_provider.hpp>
#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
#include <mesos/v1/resource_provider.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/future.hpp>
#include <process/grpc.hpp>
#include <process/id.hpp>
#include <process/loop.hpp>
#include <process/process.hpp>
#include <process/sequence.hpp>
#include <process/metrics/counter.hpp>
#include <process/metrics/metrics.hpp>
#include <process/metrics/push_gauge.hpp>
#include <stout/bytes.hpp>
#include <stout/duration.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/lambda.hpp>
#include <stout/linkedhashmap.hpp>
#include <stout/nothing.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/strings.hpp>
#include <stout/unreachable.hpp>
#include <stout/uuid.hpp>
#include <stout/os/realpath.hpp>
#include "common/protobuf_utils.hpp"
#include "common/resources_utils.hpp"
#include "csi/metrics.hpp"
#include "csi/paths.hpp"
#include "csi/service_manager.hpp"
#include "csi/volume_manager.hpp"
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
#include "resource_provider/detector.hpp"
#include "resource_provider/state.hpp"
#include "slave/paths.hpp"
#include "slave/state.hpp"
#include "status_update_manager/operation.hpp"
namespace http = process::http;
using std::accumulate;
using std::find;
using std::list;
using std::queue;
using std::shared_ptr;
using std::string;
using std::vector;
using process::await;
using process::collect;
using process::Continue;
using process::ControlFlow;
using process::defer;
using process::delay;
using process::Failure;
using process::Future;
using process::loop;
using process::Owned;
using process::Process;
using process::ProcessBase;
using process::Promise;
using process::Sequence;
using process::spawn;
using process::grpc::StatusError;
using process::grpc::client::Runtime;
using process::http::authentication::Principal;
using process::metrics::Counter;
using process::metrics::PushGauge;
using mesos::csi::ServiceManager;
using mesos::csi::VolumeInfo;
using mesos::csi::VolumeManager;
using mesos::internal::protobuf::convertLabelsToStringMap;
using mesos::internal::protobuf::convertStringMapToLabels;
using mesos::resource_provider::Call;
using mesos::resource_provider::Event;
using mesos::resource_provider::ResourceProviderState;
using mesos::v1::resource_provider::Driver;
namespace mesos {
namespace internal {
// Returns true if the string is a valid name for a resource provider
// component: non-empty and composed only of alphanumeric characters and
// underscores. (Similar to a Java identifier, except that a leading digit
// is also accepted here.)
static bool isValidName(const std::string& s)
{
  if (s.empty()) {
    return false;
  }

  for (const char c : s) {
    // NOTE: Passing a plain `char` to `isalnum` is undefined behavior if
    // the value is negative (possible where `char` is signed), so cast to
    // `unsigned char` first.
    if (!isalnum(static_cast<unsigned char>(c)) && c != '_') {
      return false;
    }
  }

  return true;
}
// Returns true if the string is a valid Java package name.
static bool isValidType(const string& s)
{
if (s.empty()) {
return false;
}
foreach (const string& token, strings::split(s, ".")) {
if (!isValidName(token)) {
return false;
}
}
return true;
}
// Returns the parent endpoint as a URL.
// TODO(jieyu): Consider using a more reliable way to get the agent v1
// operator API endpoint URL.
static inline http::URL extractParentEndpoint(const http::URL& url)
{
  http::URL result = url;

  // Strip the last path component to obtain the parent endpoint.
  result.path = Path(url.path).dirname();

  return result;
}
// Returns the container ID prefix claimed in the resource provider's
// principal, which is used to name the CSI plugin's standalone containers.
static string getContainerPrefix(const ResourceProviderInfo& info)
{
  const Principal principal = LocalResourceProvider::principal(info);

  // The principal for a local resource provider is expected to carry a
  // 'cid_prefix' claim; crash early if this invariant does not hold.
  CHECK(principal.claims.contains("cid_prefix"));
  return principal.claims.at("cid_prefix");
}
// Constructs a RAW disk resource of the given capacity provided by this
// resource provider, with the optional profile, vendor, volume ID and
// metadata set on the disk source. The capacity is expressed in megabytes
// in the resulting scalar resource.
static inline Resource createRawDiskResource(
    const ResourceProviderInfo& info,
    const Bytes& capacity,
    const Option<string>& profile,
    const Option<string>& vendor,
    const Option<string>& id = None(),
    const Option<Labels>& metadata = None())
{
  CHECK(info.has_id());
  CHECK(info.has_storage());

  Resource resource;
  resource.set_name("disk");
  resource.set_type(Value::SCALAR);
  resource.mutable_scalar()
    ->set_value(static_cast<double>(capacity.bytes()) / Bytes::MEGABYTES);

  // NOTE: These were previously joined by an accidental comma operator;
  // they are two independent statements.
  resource.mutable_provider_id()->CopyFrom(info.id());
  resource.mutable_reservations()->CopyFrom(info.default_reservations());

  Resource::DiskInfo::Source* source =
    resource.mutable_disk()->mutable_source();
  source->set_type(Resource::DiskInfo::Source::RAW);

  if (profile.isSome()) {
    source->set_profile(profile.get());
  }

  if (vendor.isSome()) {
    source->set_vendor(vendor.get());
  }

  if (id.isSome()) {
    source->set_id(id.get());
  }

  if (metadata.isSome()) {
    source->mutable_metadata()->CopyFrom(metadata.get());
  }

  return resource;
}
// Process backing a storage local resource provider: it recovers and
// manages the CSI plugin's services and volumes, subscribes to the agent's
// resource provider manager, and applies offer operations on the provided
// disk resources.
class StorageLocalResourceProviderProcess
  : public Process<StorageLocalResourceProviderProcess>
{
public:
  explicit StorageLocalResourceProviderProcess(
      const http::URL& _url,
      const string& _workDir,
      const ResourceProviderInfo& _info,
      const SlaveID& _slaveId,
      const Option<string>& _authToken,
      bool _strict);

  // Non-copyable: this process owns unique resources (driver, service and
  // volume managers).
  StorageLocalResourceProviderProcess(
      const StorageLocalResourceProviderProcess& other) = delete;

  StorageLocalResourceProviderProcess& operator=(
      const StorageLocalResourceProviderProcess& other) = delete;

  // Callbacks invoked by the resource provider driver.
  void connected();
  void disconnected();
  void received(const Event& event);

private:
  void initialize() override;

  // Resets the driver and terminates this process.
  void fatal();

  // Recovers the CSI services, volumes and checkpointed provider state,
  // then starts the driver to connect to the agent.
  Future<Nothing> recover();

  // Repeatedly sends `SUBSCRIBE` calls until subscribed.
  void doReliableRegistration();

  // The reconcile functions are responsible to reconcile the state of
  // the resource provider from the recovered state and other sources of
  // truth, such as CSI plugin responses or the status update manager.
  Future<Nothing> reconcileResourceProviderState();
  Future<Nothing> reconcileOperationStatuses();
  ResourceConversion reconcileResources(
      const Resources& checkpointed,
      const Resources& discovered);

  // Returns a list of resource conversions to updates volume contexts for
  // existing volumes, remove disappeared unconverted volumes, and add newly
  // appeared ones.
  Future<vector<ResourceConversion>> getExistingVolumes();

  // Returns a list of resource conversions to remove disappeared unconverted
  // storage pools and add newly appeared ones.
  Future<vector<ResourceConversion>> getStoragePools();

  // Spawns a loop to watch for changes in the set of known profiles and update
  // the profile mapping and storage pools accordingly.
  void watchProfiles();

  // Update the profile mapping when the set of known profiles changes.
  // NOTE: This function never fails. If it fails to translate a new
  // profile, the resource provider will continue to operate with the
  // set of profiles it knows about.
  Future<Nothing> updateProfiles(const hashset<string>& profiles);

  // Reconcile the storage pools when the set of known profiles changes,
  // or a volume with an unknown profile is destroyed.
  Future<Nothing> reconcileStoragePools();

  // Returns true if the storage pools are allowed to be reconciled when
  // the operation is being applied.
  static bool allowsReconciliation(const Offer::Operation& operation);

  // Functions for received events.
  void subscribed(const Event::Subscribed& subscribed);
  void applyOperation(const Event::ApplyOperation& operation);
  void publishResources(const Event::PublishResources& publish);
  void acknowledgeOperationStatus(
      const Event::AcknowledgeOperationStatus& acknowledge);
  void reconcileOperations(const Event::ReconcileOperations& reconcile);

  // Applies the operation. Speculative operations will be synchronously
  // applied. Do nothing if the operation is already in a terminal state.
  Future<Nothing> _applyOperation(const id::UUID& operationUuid);

  // Sends `OPERATION_DROPPED` status update. The operation status will be
  // checkpointed if `operation` is set.
  void dropOperation(
      const id::UUID& operationUuid,
      const Option<FrameworkID>& frameworkId,
      const Option<Offer::Operation>& operation,
      const string& message);

  Future<vector<ResourceConversion>> applyCreateDisk(
      const Resource& resource,
      const id::UUID& operationUuid,
      const Resource::DiskInfo::Source::Type& targetType,
      const Option<string>& targetProfile);
  Future<vector<ResourceConversion>> applyDestroyDisk(
      const Resource& resource);

  // Synchronously creates persistent volumes.
  Try<vector<ResourceConversion>> applyCreate(
      const Offer::Operation& operation) const;

  // Synchronously cleans up and destroys persistent volumes.
  Try<vector<ResourceConversion>> applyDestroy(
      const Offer::Operation& operation) const;

  // Synchronously updates `totalResources` and the operation status and
  // then asks the status update manager to send status updates.
  Try<Nothing> updateOperationStatus(
      const id::UUID& operationUuid,
      const Try<vector<ResourceConversion>>& conversions);

  void garbageCollectOperationPath(const id::UUID& operationUuid);

  void checkpointResourceProviderState();

  void sendResourceProviderStateUpdate();
  void sendOperationStatusUpdate(const UpdateOperationStatusMessage& update);

  // Lifecycle states of this process.
  enum State
  {
    RECOVERING,
    DISCONNECTED,
    CONNECTED,
    SUBSCRIBED,
    READY
  } state;

  const http::URL url;
  const string workDir;
  const string metaDir;
  const ContentType contentType;
  ResourceProviderInfo info;

  // Set to "<plugin type>.<plugin name>" in the constructor.
  const string vendor;
  const SlaveID slaveId;
  const Option<string> authToken;
  const bool strict;

  shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;

  Owned<Driver> driver;
  OperationStatusUpdateManager statusUpdateManager;

  // The mapping of known profiles fetched from the DiskProfileAdaptor.
  hashmap<string, DiskProfileAdaptor::ProfileInfo> profileInfos;

  Runtime runtime;

  // NOTE: `metrics` must be destructed after `volumeManager` and
  // `serviceManager` since they hold a pointer to it.
  struct Metrics : public csi::Metrics
  {
    explicit Metrics(const string& prefix);
    ~Metrics();

    // Per-operation-type gauges/counters keyed by `Offer::Operation::Type`.
    hashmap<Offer::Operation::Type, PushGauge> operations_pending;
    hashmap<Offer::Operation::Type, Counter> operations_finished;
    hashmap<Offer::Operation::Type, Counter> operations_failed;
    hashmap<Offer::Operation::Type, Counter> operations_dropped;
  } metrics;

  // NOTE: `serviceManager` must be destructed after `volumeManager` since the
  // latter holds a pointer of the former.
  Owned<ServiceManager> serviceManager;
  Owned<VolumeManager> volumeManager;

  // We maintain the following invariant: if one operation depends on
  // another, they cannot be in PENDING state at the same time, i.e.,
  // the result of the preceding operation must have been reflected in
  // the total resources.
  //
  // NOTE: We store the list of operations in a `LinkedHashMap` to
  // preserve the order we receive the operations in case we need it.
  LinkedHashMap<id::UUID, Operation> operations;
  Resources totalResources;
  id::UUID resourceVersion;

  // If pending, it means that the storage pools are being reconciled, and all
  // incoming operations that disallow reconciliation will be dropped.
  Future<Nothing> reconciled;

  // We maintain a sequence to coordinate reconciliations of storage pools. It
  // keeps track of pending operations that disallow reconciliation, and ensures
  // that any reconciliation waits for these operations to finish.
  Sequence sequence;
};
// Constructs the process in the `RECOVERING` state; actual recovery starts
// in `initialize`.
StorageLocalResourceProviderProcess::StorageLocalResourceProviderProcess(
    const http::URL& _url,
    const string& _workDir,
    const ResourceProviderInfo& _info,
    const SlaveID& _slaveId,
    const Option<string>& _authToken,
    bool _strict)
  : ProcessBase(process::ID::generate("storage-local-resource-provider")),
    state(RECOVERING),
    url(_url),
    workDir(_workDir),
    metaDir(slave::paths::getMetaRootDir(_workDir)),
    contentType(ContentType::PROTOBUF),
    info(_info),
    // The vendor string is "<plugin type>.<plugin name>".
    vendor(
        info.storage().plugin().type() + "." + info.storage().plugin().name()),
    slaveId(_slaveId),
    authToken(_authToken),
    strict(_strict),
    metrics("resource_providers/" + info.type() + "." + info.name() + "/"),
    resourceVersion(id::UUID::random()),
    sequence("storage-local-resource-provider-sequence")
{
  diskProfileAdaptor = DiskProfileAdaptor::getAdaptor();
  CHECK_NOTNULL(diskProfileAdaptor.get());
}
// Driver callback: the HTTP connection to the resource provider manager
// has been (re)established, so kick off the subscription loop.
void StorageLocalResourceProviderProcess::connected()
{
  CHECK_EQ(DISCONNECTED, state);

  LOG(INFO) << "Connected to resource provider manager";

  state = CONNECTED;

  doReliableRegistration();
}
// Driver callback: the connection dropped. Stop emitting operation status
// updates until we are subscribed again.
void StorageLocalResourceProviderProcess::disconnected()
{
  CHECK(state == CONNECTED || state == SUBSCRIBED || state == READY);

  LOG(INFO) << "Disconnected from resource provider manager";

  state = DISCONNECTED;

  statusUpdateManager.pause();
}
// Driver callback: dispatches an incoming event to its handler.
void StorageLocalResourceProviderProcess::received(const Event& event)
{
  LOG(INFO) << "Received " << event.type() << " event";

  switch (event.type()) {
    case Event::SUBSCRIBED:
      CHECK(event.has_subscribed());
      subscribed(event.subscribed());
      break;
    case Event::APPLY_OPERATION:
      CHECK(event.has_apply_operation());
      applyOperation(event.apply_operation());
      break;
    case Event::PUBLISH_RESOURCES:
      CHECK(event.has_publish_resources());
      publishResources(event.publish_resources());
      break;
    case Event::ACKNOWLEDGE_OPERATION_STATUS:
      CHECK(event.has_acknowledge_operation_status());
      acknowledgeOperationStatus(event.acknowledge_operation_status());
      break;
    case Event::RECONCILE_OPERATIONS:
      CHECK(event.has_reconcile_operations());
      reconcileOperations(event.reconcile_operations());
      break;
    case Event::TEARDOWN:
      // TODO(bbannier): Clean up state after teardown.
      break;
    case Event::UNKNOWN:
      LOG(WARNING) << "Received an UNKNOWN event and ignored";
      break;
  }
}
// Starts recovery as soon as the process is spawned; any recovery failure
// is fatal for this resource provider.
void StorageLocalResourceProviderProcess::initialize()
{
  auto onError = [=](const string& message) {
    LOG(ERROR)
      << "Failed to recover resource provider with type '" << info.type()
      << "' and name '" << info.name() << "': " << message;
    fatal();
  };

  // NOTE: Most resource provider events rely on the plugins being
  // prepared. To avoid race conditions, we connect to the agent after
  // preparing the plugins.
  recover()
    .onFailed(defer(self(), std::bind(onError, lambda::_1)))
    .onDiscarded(defer(self(), std::bind(onError, "future discarded")));
}
// Shuts this resource provider down after an unrecoverable error.
void StorageLocalResourceProviderProcess::fatal()
{
  // Force the disconnection early.
  driver.reset();

  process::terminate(self());
}
// Recovers the resource provider: starts the CSI service manager and volume
// manager, restores any checkpointed provider ID, operations, resources and
// profiles, and finally starts the driver to connect to the agent.
Future<Nothing> StorageLocalResourceProviderProcess::recover()
{
  CHECK_EQ(RECOVERING, state);

  // Launch the CSI plugin and connect to its controller and node services.
  serviceManager.reset(new ServiceManager(
      extractParentEndpoint(url),
      slave::paths::getCsiRootDir(workDir),
      info.storage().plugin(),
      {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE},
      getContainerPrefix(info),
      authToken,
      runtime,
      &metrics));

  return serviceManager->recover()
    .then(defer(self(), [=] {
      return serviceManager->getApiVersion();
    }))
    .then(defer(self(), [=](const string& apiVersion) -> Future<Nothing> {
      // The volume manager is created only after the CSI API version is
      // known, since its behavior depends on it.
      Try<Owned<VolumeManager>> volumeManager_ = VolumeManager::create(
          slave::paths::getCsiRootDir(workDir),
          info.storage().plugin(),
          {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE},
          apiVersion,
          runtime,
          serviceManager.get(),
          &metrics);

      if (volumeManager_.isError()) {
        return Failure(
            "Failed to create CSI volume manager for resource provider with "
            "type '" + info.type() + "' and name '" + info.name() + "': " +
            volumeManager_.error());
      }

      volumeManager = std::move(volumeManager_.get());

      return volumeManager->recover();
    }))
    .then(defer(self(), [=]() -> Future<Nothing> {
      // Recover the resource provider ID and state from the latest symlink. If
      // the symlink does not exist, this is a new resource provider, and the
      // total resources will be empty, which is fine since new resources will
      // be added during reconciliation.
      Result<string> realpath =
        os::realpath(slave::paths::getLatestResourceProviderPath(
            metaDir, slaveId, info.type(), info.name()));

      if (realpath.isError()) {
        return Failure(
            "Failed to read the latest symlink for resource provider with type "
            "'" + info.type() + "' and name '" + info.name() + "': " +
            realpath.error());
      }

      if (realpath.isSome()) {
        // The basename of the symlink target is the resource provider ID.
        info.mutable_id()->set_value(Path(realpath.get()).basename());

        const string statePath = slave::paths::getResourceProviderStatePath(
            metaDir, slaveId, info.type(), info.name(), info.id());

        if (os::exists(statePath)) {
          Result<ResourceProviderState> resourceProviderState =
            slave::state::read<ResourceProviderState>(statePath);

          if (resourceProviderState.isError()) {
            return Failure(
                "Failed to read resource provider state from '" + statePath +
                "': " + resourceProviderState.error());
          }

          if (resourceProviderState.isSome()) {
            // Restore the checkpointed operations, keyed by their UUIDs.
            foreach (const Operation& operation,
                     resourceProviderState->operations()) {
              Try<id::UUID> uuid =
                id::UUID::fromBytes(operation.uuid().value());

              operations[CHECK_NOTERROR(uuid)] = operation;
            }

            totalResources = resourceProviderState->resources();

            const ResourceProviderState::Storage& storage =
              resourceProviderState->storage();

            using ProfileEntry = google::protobuf::
              MapPair<string, ResourceProviderState::Storage::ProfileInfo>;

            // Restore the checkpointed profile mapping.
            foreach (const ProfileEntry& entry, storage.profiles()) {
              profileInfos.put(
                  entry.first,
                  {entry.second.capability(), entry.second.parameters()});
            }

            // We only checkpoint profiles associated with storage pools (i.e.,
            // resources without IDs) in `checkpointResourceProviderState` as
            // only these profiles might be used by pending operations, so we
            // validate here that all such profiles exist.
            foreach (const Resource& resource, totalResources) {
              if (!resource.disk().source().has_id() &&
                  resource.disk().source().has_profile() &&
                  !profileInfos.contains(resource.disk().source().profile())) {
                return Failure(
                    "Cannot recover profile for storage pool '" +
                    stringify(resource) + "' from '" + statePath + "'");
              }
            }
          }
        }
      }

      LOG(INFO)
        << "Recovered resources '" << totalResources << "' and "
        << operations.size() << " operations for resource provider with type '"
        << info.type() << "' and name '" << info.name() << "'";

      state = DISCONNECTED;

      // Hold back status updates until we are subscribed again.
      statusUpdateManager.pause();

      driver.reset(new Driver(
          Owned<EndpointDetector>(new ConstantEndpointDetector(url)),
          contentType,
          defer(self(), &Self::connected),
          defer(self(), &Self::disconnected),
          defer(self(), [this](queue<v1::resource_provider::Event> events) {
            // Drain the queued v1 events and dispatch each after devolving
            // it to the internal (unversioned) type.
            while(!events.empty()) {
              const v1::resource_provider::Event& event = events.front();
              received(devolve(event));
              events.pop();
            }
          }),
          authToken));

      driver->start();

      return Nothing();
    }));
}
// Sends a `SUBSCRIBE` call and reschedules itself every second until the
// subscription takes effect (or we get disconnected).
void StorageLocalResourceProviderProcess::doReliableRegistration()
{
  // Stop retrying once subscribed; while disconnected, `connected` will
  // restart this loop.
  if (state == DISCONNECTED || state == SUBSCRIBED || state == READY) {
    return;
  }

  CHECK_EQ(CONNECTED, state);

  Call call;
  call.set_type(Call::SUBSCRIBE);
  call.mutable_subscribe()->mutable_resource_provider_info()->CopyFrom(info);

  auto err = [](const ResourceProviderInfo& info, const string& message) {
    LOG(ERROR)
      << "Failed to subscribe resource provider with type '" << info.type()
      << "' and name '" << info.name() << "': " << message;
  };

  driver->send(evolve(call))
    .onFailed(std::bind(err, info, lambda::_1))
    .onDiscarded(std::bind(err, info, "future discarded"));

  // TODO(chhsiao): Consider doing an exponential backoff.
  delay(Seconds(1), self(), &Self::doReliableRegistration);
}
// Reconciles the provider's total resources after (re)subscription: replays
// checkpointed operation statuses, then merges the volumes and storage
// pools reported by the CSI plugin, and pushes the resulting state to the
// agent before entering the `READY` state.
Future<Nothing>
StorageLocalResourceProviderProcess::reconcileResourceProviderState()
{
  return reconcileOperationStatuses()
    .then(defer(self(), [=] {
      return collect<vector<ResourceConversion>>(
          {getExistingVolumes(), getStoragePools()})
        .then(defer(self(), [=](
            const vector<vector<ResourceConversion>>& collected) {
          // Apply the conversions from both sources on top of the
          // checkpointed total resources.
          Resources result = totalResources;
          foreach (const vector<ResourceConversion>& conversions, collected) {
            result = CHECK_NOTERROR(result.apply(conversions));
          }

          if (result != totalResources) {
            LOG(INFO)
              << "Removing '" << (totalResources - result) << "' and adding '"
              << (result - totalResources) << "' to the total resources";

            totalResources = result;
            checkpointResourceProviderState();
          }

          // NOTE: Since this is the first `UPDATE_STATE` call of the
          // current subscription, there must be no racing speculative
          // operation, thus no need to update the resource version.
          sendResourceProviderStateUpdate();
          statusUpdateManager.resume();

          LOG(INFO)
            << "Resource provider " << info.id() << " is in READY state";

          state = READY;

          return Nothing();
        }));
    }));
}
// Recovers the operation status update streams, garbage-collects completed
// operations, resends any statuses that were never handed to the status
// update manager, and replays all non-terminal operations so their effects
// are reflected in the total resources before reconciliation.
Future<Nothing>
StorageLocalResourceProviderProcess::reconcileOperationStatuses()
{
  CHECK(info.has_id());

  const string resourceProviderDir = slave::paths::getResourceProviderPath(
      metaDir, slaveId, info.type(), info.name(), info.id());

  statusUpdateManager.initialize(
      defer(self(), &Self::sendOperationStatusUpdate, lambda::_1),
      std::bind(
          &slave::paths::getOperationUpdatesPath,
          resourceProviderDir,
          lambda::_1));

  Try<list<string>> operationPaths = slave::paths::getOperationPaths(
      slave::paths::getResourceProviderPath(
          metaDir, slaveId, info.type(), info.name(), info.id()));

  if (operationPaths.isError()) {
    return Failure(
        "Failed to find operations for resource provider " +
        stringify(info.id()) + ": " + operationPaths.error());
  }

  // Collect the UUIDs of checkpointed operations that are still known.
  list<id::UUID> operationUuids;
  foreach (const string& path, operationPaths.get()) {
    Try<id::UUID> uuid =
      slave::paths::parseOperationPath(resourceProviderDir, path);

    if (uuid.isError()) {
      return Failure(
          "Failed to parse operation path '" + path + "': " +
          uuid.error());
    }

    // NOTE: This could happen if we failed to remove the operation path before.
    if (!operations.contains(uuid.get())) {
      LOG(WARNING)
        << "Ignoring unknown operation (uuid: " << uuid.get()
        << ") for resource provider " << info.id();

      garbageCollectOperationPath(uuid.get());
      continue;
    }

    operationUuids.emplace_back(std::move(uuid.get()));
  }

  return statusUpdateManager.recover(operationUuids, strict)
    .then(defer(self(), [=](
        const OperationStatusUpdateManagerState& statusUpdateManagerState)
        -> Future<Nothing> {
      using StreamState =
        typename OperationStatusUpdateManagerState::StreamState;

      // Clean up the operations that are completed.
      vector<id::UUID> completedOperations;
      foreachpair (const id::UUID& uuid,
                   const Option<StreamState>& stream,
                   statusUpdateManagerState.streams) {
        if (stream.isSome() && stream->terminated) {
          operations.erase(uuid);
          completedOperations.push_back(uuid);
        }
      }

      // Garbage collect the operation streams after checkpointing.
      checkpointResourceProviderState();
      foreach (const id::UUID& uuid, completedOperations) {
        garbageCollectOperationPath(uuid);
      }

      // Send updates for all missing statuses.
      foreachpair (const id::UUID& uuid,
                   const Operation& operation,
                   operations) {
        if (operation.latest_status().state() == OPERATION_PENDING) {
          continue;
        }

        // Number of statuses already tracked by the status update manager
        // for this operation; any checkpointed statuses beyond this count
        // still need to be forwarded to it.
        const int numStatuses =
          statusUpdateManagerState.streams.contains(uuid) &&
          statusUpdateManagerState.streams.at(uuid).isSome()
            ? statusUpdateManagerState.streams.at(uuid)->updates.size() : 0;

        for (int i = numStatuses; i < operation.statuses().size(); i++) {
          UpdateOperationStatusMessage update =
            protobuf::createUpdateOperationStatusMessage(
                protobuf::createUUID(uuid),
                operation.statuses(i),
                None(),
                operation.has_framework_id()
                  ? operation.framework_id() : Option<FrameworkID>::none(),
                slaveId);

          auto die = [=](const string& message) {
            LOG(ERROR)
              << "Failed to update status of operation (uuid: " << uuid << "): "
              << message;
            fatal();
          };

          statusUpdateManager.update(std::move(update))
            .onFailed(defer(self(), std::bind(die, lambda::_1)))
            .onDiscarded(defer(self(), std::bind(die, "future discarded")));
        }
      }

      // We replay all pending operations here, so that if a volume is
      // created or deleted before the last failover, the result will be
      // reflected in the total resources before reconciliation.
      vector<Future<Nothing>> futures;

      foreachpair (const id::UUID& uuid,
                   const Operation& operation,
                   operations) {
        // Bump the metric matching the operation's latest state.
        switch (operation.latest_status().state()) {
          case OPERATION_PENDING:
            ++metrics.operations_pending.at(operation.info().type());
            break;
          case OPERATION_FINISHED:
            ++metrics.operations_finished.at(operation.info().type());
            break;
          case OPERATION_FAILED:
            ++metrics.operations_failed.at(operation.info().type());
            break;
          case OPERATION_DROPPED:
            ++metrics.operations_dropped.at(operation.info().type());
            break;
          case OPERATION_UNSUPPORTED:
          case OPERATION_ERROR:
          case OPERATION_UNREACHABLE:
          case OPERATION_GONE_BY_OPERATOR:
          case OPERATION_RECOVERING:
          case OPERATION_UNKNOWN:
            UNREACHABLE();
        }

        if (protobuf::isTerminalState(operation.latest_status().state())) {
          continue;
        }

        auto err = [](const id::UUID& uuid, const string& message) {
          LOG(ERROR)
            << "Failed to apply operation (uuid: " << uuid << "): "
            << message;
        };

        futures.push_back(_applyOperation(uuid)
          .onFailed(std::bind(err, uuid, lambda::_1))
          .onDiscarded(std::bind(err, uuid, "future discarded")));
      }

      // We await the futures instead of collect them because it is OK
      // for operations to fail.
      return await(futures).then([] { return Nothing(); });
    }));
}
// Refreshes the storage pool resources from the CSI plugin and, if they
// changed, checkpoints and announces the new totals. Must only be invoked
// while a reconciliation is in flight (`reconciled` pending); any failure
// here is fatal.
Future<Nothing> StorageLocalResourceProviderProcess::reconcileStoragePools()
{
  CHECK_PENDING(reconciled);

  auto die = [=](const string& message) {
    LOG(ERROR)
      << "Failed to reconcile storage pools for resource provider " << info.id()
      << ": " << message;
    fatal();
  };

  return getStoragePools()
    .then(defer(self(), [=](const vector<ResourceConversion>& conversions) {
      Resources result = CHECK_NOTERROR(totalResources.apply(conversions));

      if (result != totalResources) {
        LOG(INFO)
          << "Removing '" << (totalResources - result) << "' and adding '"
          << (result - totalResources) << "' to the total resources";

        totalResources = result;
        checkpointResourceProviderState();

        // NOTE: We always update the resource version before sending
        // an `UPDATE_STATE`, so that any racing speculative operation
        // will be rejected. Otherwise, the speculative resource
        // conversion done on the master will be cancelled out.
        resourceVersion = id::UUID::random();
        sendResourceProviderStateUpdate();
      }

      return Nothing();
    }))
    .onFailed(defer(self(), std::bind(die, lambda::_1)))
    .onDiscarded(defer(self(), std::bind(die, "future discarded")));
}
// Returns whether the storage pools may be reconciled while the given
// operation is being applied.
bool StorageLocalResourceProviderProcess::allowsReconciliation(
    const Offer::Operation& operation)
{
  switch (operation.type()) {
    case Offer::Operation::RESERVE:
    case Offer::Operation::UNRESERVE: {
      // (Un)reserving is only compatible with a concurrent reconciliation
      // if it consumes no storage pool (a profiled RAW disk).
      auto isStoragePool = [](const Resource& r) {
        return r.disk().source().has_profile() &&
          r.disk().source().type() == Resource::DiskInfo::Source::RAW;
      };

      return CHECK_NOTERROR(protobuf::getConsumedResources(operation))
        .filter(isStoragePool)
        .empty();
    }
    case Offer::Operation::CREATE:
    case Offer::Operation::DESTROY: {
      return true;
    }
    case Offer::Operation::CREATE_DISK:
    case Offer::Operation::DESTROY_DISK: {
      return false;
    }
    case Offer::Operation::GROW_VOLUME:
    case Offer::Operation::SHRINK_VOLUME: {
      // TODO(chhsiao): These operations are currently not supported for
      // resource providers, and should have been validated by the master.
      UNREACHABLE();
    }
    case Offer::Operation::UNKNOWN:
    case Offer::Operation::LAUNCH:
    case Offer::Operation::LAUNCH_GROUP: {
      UNREACHABLE();
    }
  }

  UNREACHABLE();
}
// Computes a single conversion that removes checkpointed resources which
// disappeared (and were never converted) and adds newly discovered ones.
ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
    const Resources& checkpointed,
    const Resources& discovered)
{
  // NOTE: If a resource in the checkpointed resources is missing in the
  // discovered resources, we will still keep it if it is converted by
  // an operation before (i.e., has extra info other than the default
  // reservations). The reason is that we want to maintain a consistent
  // view with frameworks, and do not want to lose any data on
  // persistent volumes due to some temporarily CSI plugin faults. Other
  // missing resources that are "unconverted" by any framework will be
  // removed. Then, any newly discovered resource will be added.
  Resources toRemove;
  Resources toAdd = discovered;

  foreach (const Resource& resource, checkpointed) {
    // Reconstruct what this checkpointed resource looked like before any
    // conversion, so it can be matched against the discovered RAW disks.
    Resources unconverted = createRawDiskResource(
        info,
        Bytes(resource.scalar().value() * Bytes::MEGABYTES),
        resource.disk().source().has_profile()
          ? resource.disk().source().profile() : Option<string>::none(),
        resource.disk().source().has_vendor()
          ? resource.disk().source().vendor() : Option<string>::none(),
        resource.disk().source().has_id()
          ? resource.disk().source().id() : Option<string>::none(),
        resource.disk().source().has_metadata()
          ? resource.disk().source().metadata() : Option<Labels>::none());

    if (toAdd.contains(unconverted)) {
      // If the remaining of the discovered resources contain the
      // "unconverted" version of a checkpointed resource, this is not a
      // new resource.
      toAdd -= unconverted;
    } else if (unconverted == resource) {
      // If the remaining of the discovered resources does not contain
      // the "unconverted" version of the checkpointed resource, the
      // resource is missing. However, if it remains unconverted in the
      // checkpoint, we can safely remove it from the total resources.
      toRemove += unconverted;
    } else {
      LOG(WARNING)
        << "Missing converted resource '" << resource
        << "'. This might cause further operations to fail.";
    }
  }

  return ResourceConversion(std::move(toRemove), std::move(toAdd));
}
// Lists the volumes known to the CSI plugin and produces conversions that
// (1) de-duplicate volumes and refresh changed volume contexts, and
// (2) reconcile the remaining checkpointed volumes against the discovered
// ones.
Future<vector<ResourceConversion>>
StorageLocalResourceProviderProcess::getExistingVolumes()
{
  CHECK(info.has_id());

  return volumeManager->listVolumes()
    .then(defer(self(), [=](const vector<VolumeInfo>& volumeInfos) {
      // If a volume is duplicated or a volume context has been changed by a
      // non-conforming CSI plugin, we need to construct a resources conversion
      // to remove the duplicate and update the metadata, so we maintain the
      // resources to be removed and those to be added here.
      Resources toRemove;
      Resources toAdd;

      // Since we only support "exclusive" (MOUNT or BLOCK) disks, there should
      // be only one checkpointed resource for each volume ID.
      hashmap<string, Resource> checkpointedMap;
      foreach (const Resource& resource, totalResources) {
        if (resource.disk().source().has_id()) {
          // If the checkpointed resources contain duplicated volumes because of
          // a non-conforming CSI plugin, remove the duplicate.
          if (checkpointedMap.contains(resource.disk().source().id())) {
            LOG(WARNING) << "Removing duplicated volume '" << resource
                         << "' from the total resources";
            toRemove += resource;
          } else {
            checkpointedMap.put(resource.disk().source().id(), resource);
          }
        }
      }

      // The "discovered" resources consist of RAW disk resources, one for each
      // volume reported by the CSI plugin.
      Resources discovered;
      hashset<string> discoveredVolumeIds;

      foreach (const VolumeInfo& volumeInfo, volumeInfos) {
        // Carry over the checkpointed profile for a known volume, if any.
        const Option<string> profile =
          checkpointedMap.contains(volumeInfo.id) &&
          checkpointedMap.at(volumeInfo.id).disk().source().has_profile()
            ? checkpointedMap.at(volumeInfo.id).disk().source().profile()
            : Option<string>::none();

        const Option<Labels> metadata = volumeInfo.context.empty()
          ? Option<Labels>::none()
          : convertStringMapToLabels(volumeInfo.context);

        const Resource resource = createRawDiskResource(
            info,
            volumeInfo.capacity,
            profile,
            vendor,
            volumeInfo.id,
            metadata);

        if (discoveredVolumeIds.contains(volumeInfo.id)) {
          LOG(WARNING) << "Dropping duplicated volume '" << resource
                       << "' from the discovered resources";
          continue;
        }

        discovered += resource;
        discoveredVolumeIds.insert(volumeInfo.id);

        if (checkpointedMap.contains(volumeInfo.id)) {
          const Resource& resource = checkpointedMap.at(volumeInfo.id);

          // If the volume context has been changed by a non-conforming CSI
          // plugin, the changes will be reflected in a resource conversion.
          if (resource.disk().source().metadata() !=
              metadata.getOrElse(Labels())) {
            toRemove += resource;

            Resource changed = resource;
            if (metadata.isSome()) {
              *changed.mutable_disk()->mutable_source()->mutable_metadata() =
                *metadata;
            } else {
              changed.mutable_disk()->mutable_source()->clear_metadata();
            }

            toAdd += changed;
          }
        }
      }

      ResourceConversion metadataConversion(
          std::move(toRemove), std::move(toAdd));

      // Apply the de-dup/metadata conversion to the checkpointed volumes
      // (resources with IDs) before reconciling against the discovered set.
      Resources checkpointed = CHECK_NOTERROR(
          totalResources.filter([](const Resource& resource) {
            return resource.disk().source().has_id();
          }).apply(metadataConversion));

      return vector<ResourceConversion>{
        std::move(metadataConversion),
        reconcileResources(std::move(checkpointed), std::move(discovered))};
    }));
}
// Returns resource conversions that reconcile the checkpointed storage pools
// (RAW disk resources carrying a profile but no volume ID) against the
// capacities currently reported by the CSI plugin for each known profile.
Future<vector<ResourceConversion>>
StorageLocalResourceProviderProcess::getStoragePools()
{
  CHECK(info.has_id());

  // Query the available capacity for every known profile, and wrap each
  // reported capacity in a profiled RAW disk resource with no volume ID
  // (i.e., a storage pool).
  vector<Future<Resource>> futures;
  foreachpair (const string& profile,
               const DiskProfileAdaptor::ProfileInfo& profileInfo,
               profileInfos) {
    futures.push_back(
        volumeManager->getCapacity(
            profileInfo.capability, profileInfo.parameters)
          .then(std::bind(
              &createRawDiskResource,
              info,
              lambda::_1, // The capacity returned for this profile.
              profile,
              vendor,
              None(),     // No volume ID for a storage pool.
              None())));  // No metadata.
  }

  return collect(futures)
    .then(defer(self(), [=](const vector<Resource>& resources) {
      Resources discovered(resources); // Zero resources will be ignored.

      // The checkpointed storage pools are the RAW disk resources in the
      // total resources that carry no volume ID.
      Resources checkpointed =
        totalResources.filter([](const Resource& resource) {
          return Resources::isDisk(resource, Resource::DiskInfo::Source::RAW) &&
            !resource.disk().source().has_id();
        });

      return vector<ResourceConversion>{
          reconcileResources(std::move(checkpointed), std::move(discovered))};
    }));
}
// Starts an endless watch on the disk profile adaptor: whenever the set of
// profiles for this resource provider changes, the profile mapping is updated
// and the storage pools are reconciled.
void StorageLocalResourceProviderProcess::watchProfiles()
{
  auto err = [](const string& message) {
    LOG(ERROR) << "Failed to watch for DiskProfileAdaptor: " << message;
  };

  // TODO(chhsiao): Consider retrying with backoff.
  loop(
      self(),
      [=] {
        // Watch for a profile set that differs from the currently known one.
        return diskProfileAdaptor->watch(profileInfos.keys(), info);
      },
      [=](const hashset<string>& profiles) {
        CHECK(info.has_id());

        LOG(INFO)
          << "Updating profiles " << stringify(profiles)
          << " for resource provider " << info.id();

        std::function<Future<Nothing>()> update = defer(self(), [=] {
          return updateProfiles(profiles)
            .then(defer(self(), &Self::reconcileStoragePools));
        });

        // Update the profile mapping and storage pools in `sequence` to wait
        // for any pending operation that disallow reconciliation or the last
        // reconciliation (if any) to finish, and set up `reconciled` to drop
        // incoming operations that disallow reconciliation until the storage
        // pools are reconciled.
        reconciled = sequence.add(update);

        return reconciled
          .then(defer(self(), [=]() -> ControlFlow<Nothing> {
            // Keep looping to pick up the next profile change.
            return Continue();
          }));
      })
    .onFailed(std::bind(err, lambda::_1))
    .onDiscarded(std::bind(err, "future discarded"));
}
// Brings `profileInfos` in sync with the given profile set: profiles that
// disappeared are erased, and newly appeared profiles are translated through
// the disk profile adaptor and added. The returned future never fails.
Future<Nothing> StorageLocalResourceProviderProcess::updateProfiles(
    const hashset<string>& profiles)
{
  // Remove disappeared profiles.
  foreach (const string& profile, profileInfos.keys()) {
    if (!profiles.contains(profile)) {
      profileInfos.erase(profile);
    }
  }

  // Translate and add newly appeared profiles.
  vector<Future<Nothing>> futures;
  foreach (const string& profile, profiles) {
    // Since profiles are immutable after creation, we do not need to
    // translate any profile that is already in the mapping.
    if (profileInfos.contains(profile)) {
      continue;
    }

    auto err = [](const string& profile, const string& message) {
      LOG(ERROR)
        << "Failed to translate profile '" << profile << "': " << message;
    };

    futures.push_back(diskProfileAdaptor->translate(profile, info)
      .then(defer(self(), [=](
          const DiskProfileAdaptor::ProfileInfo& profileInfo) {
        // Record the translation; deferred so the map is only mutated on
        // this actor's context.
        profileInfos.put(profile, profileInfo);
        return Nothing();
      }))
      .onFailed(std::bind(err, profile, lambda::_1))
      .onDiscarded(std::bind(err, profile, "future discarded")));
  }

  // We use `await` here to return a future that never fails, so the loop in
  // `watchProfiles` will continue to watch for profile changes. If any profile
  // translation fails, the profile will not be added to the set of known
  // profiles and thus the disk profile adaptor will notify the resource
  // provider again.
  return await(futures).then([] { return Nothing(); });
}
// Handles the SUBSCRIBED event: records the assigned resource provider ID
// (creating the provider's meta directory on a first-time subscription), then
// reconciles the resource provider state and, once reconciled, starts
// watching for profile changes.
void StorageLocalResourceProviderProcess::subscribed(
    const Event::Subscribed& subscribed)
{
  CHECK_EQ(CONNECTED, state);

  LOG(INFO) << "Subscribed with ID " << subscribed.provider_id().value();

  state = SUBSCRIBED;

  if (!info.has_id()) {
    // New subscription.
    info.mutable_id()->CopyFrom(subscribed.provider_id());
    slave::paths::createResourceProviderDirectory(
        metaDir,
        slaveId,
        info.type(),
        info.name(),
        info.id());
  }

  // A failed reconciliation is unrecoverable here, so terminate the process.
  auto die = [=](const string& message) {
    LOG(ERROR)
      << "Failed to reconcile resource provider " << info.id() << ": "
      << message;
    fatal();
  };

  // Reconcile resources after obtaining the resource provider ID and start
  // watching for profile changes after the reconciliation.
  // TODO(chhsiao): Reconcile and watch for profile changes early.
  reconciled = reconcileResourceProviderState()
    .onReady(defer(self(), &Self::watchProfiles))
    .onFailed(defer(self(), std::bind(die, lambda::_1)))
    .onDiscarded(defer(self(), std::bind(die, "future discarded")));
}
// Handles the APPLY_OPERATION event: validates the operation against the
// current state and resource version, checkpoints it as pending, then kicks
// off its asynchronous application via `_applyOperation`.
void StorageLocalResourceProviderProcess::applyOperation(
    const Event::ApplyOperation& operation)
{
  CHECK(state == SUBSCRIBED || state == READY);

  Try<id::UUID> uuid = id::UUID::fromBytes(operation.operation_uuid().value());
  CHECK_SOME(uuid);

  LOG(INFO)
    << "Received " << operation.info().type() << " operation '"
    << operation.info().id() << "' (uuid: " << uuid.get() << ")";

  Option<FrameworkID> frameworkId = operation.has_framework_id()
    ? operation.framework_id() : Option<FrameworkID>::none();

  // Operations cannot be applied until the provider reaches READY.
  if (state == SUBSCRIBED) {
    return dropOperation(
        uuid.get(),
        frameworkId,
        operation.info(),
        "Cannot apply operation in SUBSCRIBED state");
  }

  // While a storage pool reconciliation is in flight, only operations that
  // allow reconciliation may proceed.
  if (reconciled.isPending() && !allowsReconciliation(operation.info())) {
    return dropOperation(
        uuid.get(),
        frameworkId,
        operation.info(),
        "Cannot apply operation when reconciling storage pools");
  }

  // Drop operations issued against a stale view of the resources.
  Try<id::UUID> operationVersion =
    id::UUID::fromBytes(operation.resource_version_uuid().value());
  CHECK_SOME(operationVersion);

  if (operationVersion.get() != resourceVersion) {
    return dropOperation(
        uuid.get(),
        frameworkId,
        operation.info(),
        "Mismatched resource version " + stringify(operationVersion.get()) +
        " (expected: " + stringify(resourceVersion) + ")");
  }

  // Record and checkpoint the operation as pending before applying it.
  CHECK(!operations.contains(uuid.get()));
  operations[uuid.get()] = protobuf::createOperation(
      operation.info(),
      protobuf::createOperationStatus(
          OPERATION_PENDING,
          operation.info().has_id()
            ? operation.info().id() : Option<OperationID>::none(),
          None(),
          None(),
          None(),
          slaveId,
          info.id()),
      frameworkId,
      slaveId,
      protobuf::createUUID(uuid.get()));

  checkpointResourceProviderState();

  ++metrics.operations_pending.at(operation.info().type());

  auto err = [](const id::UUID& uuid, const string& message) {
    LOG(ERROR)
      << "Failed to apply operation (uuid: " << uuid << "): " << message;
  };

  _applyOperation(uuid.get())
    .onFailed(std::bind(err, uuid.get(), lambda::_1))
    .onDiscarded(std::bind(err, uuid.get(), "future discarded"));
}
// Handles the PUBLISH_RESOURCES event: validates that every resource to be
// published is known and is an exclusive (PATH/MOUNT/BLOCK) volume, publishes
// the corresponding volumes through the volume manager, and reports the
// overall status back to the resource provider manager.
void StorageLocalResourceProviderProcess::publishResources(
    const Event::PublishResources& publish)
{
  Option<Error> error;
  hashset<string> volumeIds;

  if (state == SUBSCRIBED) {
    error = Error("Cannot publish resources in SUBSCRIBED state");
  } else {
    CHECK_EQ(READY, state);

    // Strip the allocation info before checking against the total resources,
    // which are tracked unallocated.
    Resources resources = publish.resources();
    resources.unallocate();

    foreach (const Resource& resource, resources) {
      if (!totalResources.contains(resource)) {
        error = Error(
            "Cannot publish unknown resource '" + stringify(resource) + "'");
        break;
      }

      switch (resource.disk().source().type()) {
        case Resource::DiskInfo::Source::PATH:
        case Resource::DiskInfo::Source::MOUNT:
        case Resource::DiskInfo::Source::BLOCK: {
          CHECK(resource.disk().source().has_id());
          volumeIds.insert(resource.disk().source().id());
          break;
        }
        case Resource::DiskInfo::Source::UNKNOWN:
        case Resource::DiskInfo::Source::RAW: {
          error = Error(
              "Cannot publish volume of " +
              stringify(resource.disk().source().type()) + " type");
          break;
        }
      }

      // NOTE: The `break` in the error case above only exits the `switch`,
      // so check here to stop at the first error; otherwise a later resource
      // would overwrite the recorded error and we would keep collecting
      // volume IDs needlessly.
      if (error.isSome()) {
        break;
      }
    }
  }

  Future<vector<Nothing>> allPublished;

  if (error.isSome()) {
    allPublished = Failure(error.get());
  } else {
    vector<Future<Nothing>> futures;
    foreach (const string& volumeId, volumeIds) {
      futures.push_back(volumeManager->publishVolume(volumeId));
    }

    allPublished = collect(futures);
  }

  allPublished
    .onAny(defer(self(), [=](const Future<vector<Nothing>>& future) {
      // TODO(chhsiao): Currently there is no way to reply to the
      // resource provider manager with a failure message, so we log the
      // failure here.
      if (!future.isReady()) {
        LOG(ERROR)
          << "Failed to publish resources '" << publish.resources() << "': "
          << (future.isFailed() ? future.failure() : "future discarded");
      }

      Call call;
      call.mutable_resource_provider_id()->CopyFrom(info.id());
      call.set_type(Call::UPDATE_PUBLISH_RESOURCES_STATUS);

      Call::UpdatePublishResourcesStatus* update =
        call.mutable_update_publish_resources_status();
      update->mutable_uuid()->CopyFrom(publish.uuid());
      update->set_status(future.isReady()
        ? Call::UpdatePublishResourcesStatus::OK
        : Call::UpdatePublishResourcesStatus::FAILED);

      auto err = [](const mesos::UUID& uuid, const string& message) {
        LOG(ERROR)
          << "Failed to send status update for publish "
          << id::UUID::fromBytes(uuid.value()).get() << ": " << message;
      };

      driver->send(evolve(call))
        .onFailed(std::bind(err, publish.uuid(), lambda::_1))
        .onDiscarded(std::bind(err, publish.uuid(), "future discarded"));
    }));
}
// Handles the ACKNOWLEDGE_OPERATION_STATUS event: forwards the acknowledgement
// to the status update manager and, if no further status updates remain for
// the operation, forgets the operation and garbage collects its metadata.
void StorageLocalResourceProviderProcess::acknowledgeOperationStatus(
    const Event::AcknowledgeOperationStatus& acknowledge)
{
  CHECK_EQ(READY, state);

  Try<id::UUID> operationUuid =
    id::UUID::fromBytes(acknowledge.operation_uuid().value());
  CHECK_SOME(operationUuid);

  Try<id::UUID> statusUuid =
    id::UUID::fromBytes(acknowledge.status_uuid().value());
  CHECK_SOME(statusUuid);

  auto err = [](const id::UUID& uuid, const string& message) {
    LOG(ERROR)
      << "Failed to acknowledge status update for operation (uuid: " << uuid
      << "): " << message;
  };

  // NOTE: It is possible that an incoming acknowledgement races with an
  // outgoing retry of status update, and then a duplicated
  // acknowledgement will be received. In this case, the following call
  // will fail, so we just leave an error log.
  statusUpdateManager.acknowledgement(operationUuid.get(), statusUuid.get())
    .then(defer(self(), [=](bool continuation) {
      // `continuation` is false once the terminal status update has been
      // acknowledged, i.e., nothing more will be sent for this operation.
      if (!continuation) {
        operations.erase(operationUuid.get());
        checkpointResourceProviderState();
        garbageCollectOperationPath(operationUuid.get());
      }

      return Nothing();
    }))
    .onFailed(std::bind(err, operationUuid.get(), lambda::_1))
    .onDiscarded(std::bind(err, operationUuid.get(), "future discarded"));
}
// Handles the RECONCILE_OPERATIONS event: any operation the agent asks about
// that this provider does not know is explicitly dropped.
void StorageLocalResourceProviderProcess::reconcileOperations(
    const Event::ReconcileOperations& reconcile)
{
  CHECK_EQ(READY, state);

  foreach (const mesos::UUID& uuid_, reconcile.operation_uuids()) {
    Try<id::UUID> operationUuid = id::UUID::fromBytes(uuid_.value());
    CHECK_SOME(operationUuid);

    if (!operations.contains(operationUuid.get())) {
      // TODO(chhsiao): Consider sending `OPERATION_UNKNOWN` instead.
      dropOperation(
          operationUuid.get(),
          None(),
          None(),
          "Unknown operation");
    }

    // Otherwise the operation is known: its `APPLY_OPERATION` event raced
    // with the last `UPDATE_STATE` call and arrived after it. Since the
    // event was received, nothing needs to be done here.
  }
}
// Applies the given pending operation. Speculative operations (RESERVE,
// UNRESERVE, CREATE, DESTROY) are applied synchronously and returned
// directly; CREATE_DISK and DESTROY_DISK are applied asynchronously through
// the volume manager and their results are funneled into
// `updateOperationStatus` when ready.
Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
    const id::UUID& operationUuid)
{
  CHECK(operations.contains(operationUuid));
  const Operation& operation = operations.at(operationUuid);

  CHECK(!protobuf::isTerminalState(operation.latest_status().state()));

  Future<vector<ResourceConversion>> conversions;

  switch (operation.info().type()) {
    case Offer::Operation::RESERVE:
    case Offer::Operation::UNRESERVE: {
      // Synchronously apply the speculative operations to ensure that its
      // result is reflected in the total resources before any of its succeeding
      // operations is applied.
      return updateOperationStatus(
          operationUuid,
          getResourceConversions(operation.info()));
    }
    case Offer::Operation::CREATE: {
      // Synchronously create the persistent volumes to ensure that its result
      // is reflected in the total resources before any of its succeeding
      // operations is applied.
      return updateOperationStatus(
          operationUuid, applyCreate(operation.info()));
    }
    case Offer::Operation::DESTROY: {
      // Synchronously clean up and destroy the persistent volumes to ensure
      // that its result is reflected in the total resources before any of its
      // succeeding operations is applied.
      return updateOperationStatus(
          operationUuid, applyDestroy(operation.info()));
    }
    case Offer::Operation::CREATE_DISK: {
      CHECK(operation.info().has_create_disk());

      conversions = applyCreateDisk(
          operation.info().create_disk().source(),
          operationUuid,
          operation.info().create_disk().target_type(),
          operation.info().create_disk().has_target_profile()
            ? operation.info().create_disk().target_profile()
            : Option<string>::none());

      break;
    }
    case Offer::Operation::DESTROY_DISK: {
      CHECK(operation.info().has_destroy_disk());

      conversions = applyDestroyDisk(
          operation.info().destroy_disk().source());

      break;
    }
    case Offer::Operation::GROW_VOLUME:
    case Offer::Operation::SHRINK_VOLUME: {
      // TODO(chhsiao): These operations are currently not supported for
      // resource providers, and should have been validated by the master.
      UNREACHABLE();
    }
    case Offer::Operation::UNKNOWN:
    case Offer::Operation::LAUNCH:
    case Offer::Operation::LAUNCH_GROUP: {
      UNREACHABLE();
    }
  }

  // Only the asynchronous (non-speculative) operations reach this point.
  CHECK(!protobuf::isSpeculativeOperation(operation.info()))
    << "Unexpected speculative operation: " << operation.info().type();

  shared_ptr<Promise<Nothing>> promise(new Promise<Nothing>());

  conversions
    .onAny(defer(self(), [=](const Future<vector<ResourceConversion>>& future) {
      // Fold a failure or discard of the conversions into an `Error` so that
      // `updateOperationStatus` reports the operation as failed.
      Try<vector<ResourceConversion>> conversions = future.isReady()
        ? Try<vector<ResourceConversion>>::some(future.get())
        : Error(future.isFailed() ? future.failure() : "future discarded");

      if (conversions.isSome()) {
        LOG(INFO)
          << "Applying conversion from '" << conversions->at(0).consumed
          << "' to '" << conversions->at(0).converted
          << "' for operation (uuid: " << operationUuid << ")";
      }

      promise->associate(
          updateOperationStatus(operationUuid, conversions));
    }));

  Future<Nothing> future = promise->future();

  if (!allowsReconciliation(operation.info())) {
    // We place the future in `sequence` so it can be waited before reconciling
    // storage pools.
    sequence.add(std::function<Future<Nothing>()>([future] { return future; }));
  }

  return future;
}
// Drops an operation with an OPERATION_DROPPED status update. If the
// operation info is provided, the operation is checkpointed and the status
// update is retried through the status update manager; otherwise a one-shot
// (unretried) status update is sent.
void StorageLocalResourceProviderProcess::dropOperation(
    const id::UUID& operationUuid,
    const Option<FrameworkID>& frameworkId,
    const Option<Offer::Operation>& operation,
    const string& message)
{
  LOG(WARNING)
    << "Dropping operation (uuid: " << operationUuid << "): " << message;

  CHECK(!operations.contains(operationUuid));

  UpdateOperationStatusMessage update =
    protobuf::createUpdateOperationStatusMessage(
        protobuf::createUUID(operationUuid),
        protobuf::createOperationStatus(
            OPERATION_DROPPED,
            None(),
            message,
            None(),
            None(),
            slaveId,
            info.id()),
        None(),
        frameworkId,
        slaveId);

  if (operation.isSome()) {
    // This operation is dropped intentionally. We have to persist the operation
    // in the resource provider state and retry the status update.
    *update.mutable_status()->mutable_uuid() = protobuf::createUUID();
    if (operation->has_id()) {
      *update.mutable_status()->mutable_operation_id() = operation->id();
    }

    operations[operationUuid] = protobuf::createOperation(
        operation.get(),
        update.status(),
        frameworkId,
        slaveId,
        update.operation_uuid());

    checkpointResourceProviderState();

    // Failing to persist or retry the update is unrecoverable, so terminate
    // the process.
    auto die = [=](const string& message) {
      LOG(ERROR)
        << "Failed to update status of operation (uuid: " << operationUuid
        << "): " << message;
      fatal();
    };

    statusUpdateManager.update(std::move(update))
      .onFailed(defer(self(), std::bind(die, lambda::_1)))
      .onDiscarded(defer(self(), std::bind(die, "future discarded")));
  } else {
    // This operation is unknown to the resource provider because of a
    // disconnection, and is being asked for reconciliation. In this case, we
    // send a status update without a retry. If it is dropped because of another
    // disconnection, another reconciliation will be triggered by the master
    // after a reregistration.
    sendOperationStatusUpdate(std::move(update));
  }

  ++metrics.operations_dropped.at(
      operation.isSome() ? operation->type() : Offer::Operation::UNKNOWN);
}
// Converts a RAW disk resource into a MOUNT or BLOCK disk: either creates a
// new volume from a storage pool (RAW disk with a profile), or validates a
// preprovisioned volume (RAW disk with a volume ID) against the target
// profile. Returns the resulting resource conversion.
Future<vector<ResourceConversion>>
StorageLocalResourceProviderProcess::applyCreateDisk(
    const Resource& resource,
    const id::UUID& operationUuid,
    const Resource::DiskInfo::Source::Type& targetType,
    const Option<string>& targetProfile)
{
  CHECK_EQ(Resource::DiskInfo::Source::RAW, resource.disk().source().type());

  // NOTE: Currently we only support two types of RAW disk resources:
  //   1. RAW disk from `GetCapacity` with a profile but no volume ID.
  //   2. RAW disk from `ListVolumes` for a preprovisioned volume, which has a
  //      volume ID but no profile.
  //
  // For 1, we check if its profile is mount or block capable, then
  // call `createVolume` with the operation UUID as the name (so that
  // the same volume will be returned when recovering from a failover).
  //
  // For 2, the target profile will be specified, so we first check if the
  // profile is mount or block capable. Then, we call `validateVolume` to handle
  // the following two scenarios:
  //   a. If the volume has a checkpointed state (because it is created by a
  //      previous resource provider), we simply check if its checkpointed
  //      capability and parameters match the profile.
  //   b. If the volume is newly discovered, `ValidateVolumeCapabilities` is
  //      called with the capability of the profile.
  //
  // The `CHECK_NE` below enforces that exactly one of the two cases holds.
  CHECK_NE(resource.disk().source().has_profile(),
           resource.disk().source().has_id() && targetProfile.isSome());

  const string profile =
    targetProfile.getOrElse(resource.disk().source().profile());

  if (!profileInfos.contains(profile)) {
    return Failure("Profile '" + profile + "' not found");
  }

  // Check that the profile is capable of the requested disk type.
  const DiskProfileAdaptor::ProfileInfo& profileInfo = profileInfos.at(profile);
  switch (targetType) {
    case Resource::DiskInfo::Source::MOUNT: {
      if (!profileInfo.capability.has_mount()) {
        return Failure(
            "Profile '" + profile + "' cannot be used to create a MOUNT disk");
      }
      break;
    }
    case Resource::DiskInfo::Source::BLOCK: {
      if (!profileInfo.capability.has_block()) {
        return Failure(
            "Profile '" + profile + "' cannot be used to create a BLOCK disk");
      }
      break;
    }
    case Resource::DiskInfo::Source::UNKNOWN:
    case Resource::DiskInfo::Source::PATH:
    case Resource::DiskInfo::Source::RAW: {
      UNREACHABLE();
    }
  }

  // TODO(chhsiao): Consider calling `createVolume` sequentially with other
  // create or delete operations, and send an `UPDATE_STATE` for storage pools
  // afterward. See MESOS-9254.
  Future<VolumeInfo> created;
  if (resource.disk().source().has_profile()) {
    // Case 1: create a new volume out of a storage pool. The scalar value is
    // in MB, so scale it to bytes for the volume manager.
    created = volumeManager->createVolume(
        operationUuid.toString(),
        resource.scalar().value() * Bytes::MEGABYTES,
        profileInfo.capability,
        profileInfo.parameters);
  } else {
    // Case 2: validate the preprovisioned volume against the target profile.
    VolumeInfo volumeInfo = {
      resource.scalar().value() * Bytes::MEGABYTES,
      resource.disk().source().id(),
      CHECK_NOTERROR(convertLabelsToStringMap(
          resource.disk().source().metadata()))
    };

    created = volumeManager->validateVolume(
        volumeInfo, profileInfo.capability, profileInfo.parameters)
      .then([resource, profile, volumeInfo](
          const Option<Error>& error) -> Future<VolumeInfo> {
        if (error.isSome()) {
          return Failure(
              "Cannot apply profile '" + profile + "' to resource '" +
              stringify(resource) + "': " + error->message);
        }

        return volumeInfo;
      });
  }

  return created
    .then(defer(self(), [=](const VolumeInfo& volumeInfo) {
      // Build the converted resource: same disk, now typed, with the volume
      // ID, profile and (if any) metadata from the created/validated volume.
      Resource converted = resource;
      converted.mutable_disk()->mutable_source()->set_id(volumeInfo.id);
      converted.mutable_disk()->mutable_source()->set_type(targetType);
      converted.mutable_disk()->mutable_source()->set_profile(profile);

      if (!volumeInfo.context.empty()) {
        *converted.mutable_disk()->mutable_source()->mutable_metadata() =
          convertStringMapToLabels(volumeInfo.context);
      }

      const string mountRootDir = csi::paths::getMountRootDir(
          slave::paths::getCsiRootDir("."),
          info.storage().plugin().type(),
          info.storage().plugin().name());

      switch (targetType) {
        case Resource::DiskInfo::Source::MOUNT: {
          // Set the root path relative to agent work dir.
          converted.mutable_disk()->mutable_source()->mutable_mount()->set_root(
              mountRootDir);

          break;
        }
        case Resource::DiskInfo::Source::BLOCK: {
          break;
        }
        case Resource::DiskInfo::Source::UNKNOWN:
        case Resource::DiskInfo::Source::PATH:
        case Resource::DiskInfo::Source::RAW: {
          UNREACHABLE();
        }
      }

      vector<ResourceConversion> conversions;
      conversions.emplace_back(resource, std::move(converted));

      return conversions;
    }));
}
// Deletes the volume backing the given disk resource and returns a conversion
// turning the disk back into a RAW resource: either an empty/profile-less
// pool entry (if the volume was deprovisioned) or a preprovisioned RAW disk
// keeping its volume ID (if only the data was wiped).
Future<vector<ResourceConversion>>
StorageLocalResourceProviderProcess::applyDestroyDisk(
    const Resource& resource)
{
  CHECK(!Resources::isPersistentVolume(resource));
  CHECK(resource.disk().source().has_id());

  return volumeManager->deleteVolume(resource.disk().source().id())
    .then(defer(self(), [=](bool deprovisioned) {
      Resource converted = resource;
      converted.mutable_disk()->mutable_source()->set_type(
          Resource::DiskInfo::Source::RAW);

      // Drop type-specific fields that do not apply to a RAW disk.
      switch (resource.disk().source().type()) {
        case Resource::DiskInfo::Source::MOUNT: {
          converted.mutable_disk()->mutable_source()->clear_mount();
          break;
        }
        case Resource::DiskInfo::Source::BLOCK:
        case Resource::DiskInfo::Source::RAW: {
          break;
        }
        case Resource::DiskInfo::Source::UNKNOWN:
        case Resource::DiskInfo::Source::PATH: {
          UNREACHABLE(); // Should have been validated by the master.
        }
      }

      // We clear the volume ID and metadata if the volume has been
      // deprovisioned. Otherwise, we clear the profile.
      if (deprovisioned) {
        converted.mutable_disk()->mutable_source()->clear_id();
        converted.mutable_disk()->mutable_source()->clear_metadata();

        if (!resource.disk().source().has_profile() ||
            !profileInfos.contains(resource.disk().source().profile())) {
          // The destroyed volume is converted into an empty resource to prevent
          // the freed disk from being sent out with a disappeared profile.
          converted.mutable_scalar()->set_value(0);

          // Since the profile disappears, The freed disk might be claimed by
          // other appeared profiles. If there is an ongoing reconciliation, it
          // is waiting for this operation to finish and will recover the freed
          // disk, so no reconciliation should be done here. Otherwise, we
          // reconcile the storage pools to recover the freed disk.
          if (!reconciled.isPending()) {
            CHECK(info.has_id());

            LOG(INFO)
              << "Reconciling storage pools for resource provider " << info.id()
              << " after resource '" << resource << "' has been freed";

            // Reconcile the storage pools in `sequence` to wait for any other
            // pending operation that disallow reconciliation to finish, and set
            // up `reconciled` to drop incoming operations that disallow
            // reconciliation until the storage pools are reconciled.
            reconciled = sequence.add(std::function<Future<Nothing>()>(
                defer(self(), &Self::reconcileStoragePools)));
          }
        }
      } else {
        converted.mutable_disk()->mutable_source()->clear_profile();
      }

      vector<ResourceConversion> conversions;
      conversions.emplace_back(resource, std::move(converted));

      return conversions;
    }));
}
// Validates a CREATE operation (persistent volumes may only be created on
// MOUNT disks) and, if valid, returns its speculative resource conversions.
Try<vector<ResourceConversion>>
StorageLocalResourceProviderProcess::applyCreate(
    const Offer::Operation& operation) const
{
  CHECK(operation.has_create());

  foreach (const Resource& volume, operation.create().volumes()) {
    CHECK(Resources::isPersistentVolume(volume));
    CHECK(volume.disk().source().has_id());

    // TODO(chhsiao): Support persistent BLOCK volumes.
    if (volume.disk().source().type() != Resource::DiskInfo::Source::MOUNT) {
      return Error(
          "Cannot create persistent volume '" +
          stringify(volume.disk().persistence().id()) + "' on a " +
          stringify(volume.disk().source().type()) + " disk");
    }

    // TODO(chhsiao): Ideally, we could perform a sanity check to verify that
    // the target path is empty before creating a new persistent volume.
    // However, right now we cannot distinguish the case where a framework is
    // recreating its own persistent volume after the agent ID changes from the
    // case where existing data is being leaked to another framework.
  }

  return getResourceConversions(operation);
}
// Validates a DESTROY operation, wipes the data under each persistent
// volume's mount target path, and returns its speculative resource
// conversions.
Try<vector<ResourceConversion>>
StorageLocalResourceProviderProcess::applyDestroy(
    const Offer::Operation& operation) const
{
  CHECK(operation.has_destroy());

  foreach (const Resource& volume, operation.destroy().volumes()) {
    CHECK(Resources::isPersistentVolume(volume));
    CHECK(volume.disk().source().has_id());

    // TODO(chhsiao): Support cleaning up persistent BLOCK volumes, presumably
    // with `dd` or any other utility to zero out the block device.
    CHECK_EQ(Resource::DiskInfo::Source::MOUNT,
             volume.disk().source().type());

    const string mountTargetPath = csi::paths::getMountTargetPath(
        csi::paths::getMountRootDir(
            slave::paths::getCsiRootDir(workDir),
            info.storage().plugin().type(),
            info.storage().plugin().name()),
        volume.disk().source().id());

    if (!os::exists(mountTargetPath)) {
      continue;
    }

    // NOTE: We always clean up the data in the target path (but not the
    // directory itself) even if the volume is not published, in which case
    // this should be a no-op.
    Try<Nothing> rmdir = os::rmdir(mountTargetPath, true, false);
    if (rmdir.isError()) {
      return Error(
          "Failed to remove persistent volume '" +
          stringify(volume.disk().persistence().id()) + "' at '" +
          mountTargetPath + "': " + rmdir.error());
    }
  }

  return getResourceConversions(operation);
}
// Finalizes an operation with the given conversion result: applies the
// conversions to the total resources (or records the failure), updates and
// checkpoints the operation's terminal status, retries the status update
// through the status update manager, and adjusts the operation metrics.
// Returns the error if the operation failed.
Try<Nothing> StorageLocalResourceProviderProcess::updateOperationStatus(
    const id::UUID& operationUuid,
    const Try<vector<ResourceConversion>>& conversions)
{
  Option<Error> error;
  Resources convertedResources;

  CHECK(operations.contains(operationUuid));
  Operation& operation = operations.at(operationUuid);

  if (conversions.isSome()) {
    // Strip away the allocation info when applying the conversion to
    // the total resources.
    vector<ResourceConversion> _conversions;
    foreach (ResourceConversion conversion, conversions.get()) {
      // Intentionally copied, since the conversion is mutated below.
      convertedResources += conversion.converted;
      conversion.consumed.unallocate();
      conversion.converted.unallocate();
      _conversions.emplace_back(std::move(conversion));
    }

    Try<Resources> result = totalResources.apply(_conversions);
    if (result.isSome()) {
      totalResources = result.get();
    } else {
      error = result.error();
    }
  } else {
    error = conversions.error();
  }

  // Record the terminal status (FINISHED on success, FAILED otherwise) and
  // checkpoint it before sending it out.
  operation.mutable_latest_status()->CopyFrom(protobuf::createOperationStatus(
      error.isNone() ? OPERATION_FINISHED : OPERATION_FAILED,
      operation.info().has_id()
        ? operation.info().id() : Option<OperationID>::none(),
      error.isNone() ? Option<string>::none() : error->message,
      error.isNone() ? convertedResources : Option<Resources>::none(),
      id::UUID::random(),
      slaveId,
      info.id()));

  operation.add_statuses()->CopyFrom(operation.latest_status());

  checkpointResourceProviderState();

  // Send out the status update for the operation.
  UpdateOperationStatusMessage update =
    protobuf::createUpdateOperationStatusMessage(
        protobuf::createUUID(operationUuid),
        operation.latest_status(),
        None(),
        operation.has_framework_id()
          ? operation.framework_id() : Option<FrameworkID>::none(),
        slaveId);

  // Failing to persist or retry the update is unrecoverable, so terminate
  // the process.
  auto die = [=](const string& message) {
    LOG(ERROR)
      << "Failed to update status of operation (uuid: " << operationUuid
      << "): " << message;
    fatal();
  };

  statusUpdateManager.update(std::move(update))
    .onFailed(defer(self(), std::bind(die, lambda::_1)))
    .onDiscarded(defer(self(), std::bind(die, "future discarded")));

  // The operation is no longer pending; bump the matching terminal metric.
  --metrics.operations_pending.at(operation.info().type());

  switch (operation.latest_status().state()) {
    case OPERATION_FINISHED:
      ++metrics.operations_finished.at(operation.info().type());
      break;
    case OPERATION_FAILED:
      ++metrics.operations_failed.at(operation.info().type());
      break;
    case OPERATION_UNSUPPORTED:
    case OPERATION_PENDING:
    case OPERATION_ERROR:
    case OPERATION_DROPPED:
    case OPERATION_UNREACHABLE:
    case OPERATION_GONE_BY_OPERATOR:
    case OPERATION_RECOVERING:
    case OPERATION_UNKNOWN:
      UNREACHABLE();
  }

  if (error.isSome()) {
    // We only send `UPDATE_STATE` for failed speculative operations.
    if (protobuf::isSpeculativeOperation(operation.info())) {
      resourceVersion = id::UUID::random();
      sendResourceProviderStateUpdate();
    }

    return error.get();
  }

  return Nothing();
}
// Removes the checkpoint directory of a forgotten operation, if it exists.
// The operation must already have been erased from the in-memory map.
void StorageLocalResourceProviderProcess::garbageCollectOperationPath(
    const id::UUID& operationUuid)
{
  CHECK(!operations.contains(operationUuid));

  const string operationPath = slave::paths::getOperationPath(
      slave::paths::getResourceProviderPath(
          metaDir, slaveId, info.type(), info.name(), info.id()),
      operationUuid);

  if (!os::exists(operationPath)) {
    return;
  }

  Try<Nothing> rmdir = os::rmdir(operationPath);
  if (rmdir.isError()) {
    LOG(ERROR)
      << "Failed to remove directory '" << operationPath << "': "
      << rmdir.error();
  }
}
void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
{
ResourceProviderState state;
foreachvalue (const Operation& operation, operations) {
state.add_operations()->CopyFrom(operation);
}
state.mutable_resources()->CopyFrom(totalResources);
ResourceProviderState::Storage* storage = state.mutable_storage();
// NOTE: We only checkpoint profiles associated with any storage
// pool (i.e., resource that has no volume ID) in the total resources.
// We do not need to checkpoint profiles for resources that have
// volume IDs, as their volume capabilities are already checkpointed.
hashset<string> requiredProfiles;
foreach (const Resource& resource, totalResources) {
if (!resource.disk().source().has_id()) {
CHECK(resource.disk().source().has_profile());
requiredProfiles.insert(resource.disk().source().profile());
}
}
foreach (const string& profile, requiredProfiles) {
CHECK(profileInfos.contains(profile));
const DiskProfileAdaptor::ProfileInfo& profileInfo =
profileInfos.at(profile);
ResourceProviderState::Storage::ProfileInfo& profileInfo_ =
(*storage->mutable_profiles())[profile];
*profileInfo_.mutable_capability() = profileInfo.capability;
*profileInfo_.mutable_parameters() = profileInfo.parameters;
}
const string statePath = slave::paths::getResourceProviderStatePath(
metaDir, slaveId, info.type(), info.name(), info.id());
// NOTE: We ensure the checkpoint is synced to the filesystem to avoid
// resulting in a stale or empty checkpoint when a system crash happens.
Try<Nothing> checkpoint = slave::state::checkpoint(statePath, state, true);
CHECK_SOME(checkpoint)
<< "Failed to checkpoint resource provider state to '" << statePath << "': "
<< checkpoint.error();
}
void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
{
Call call;
call.set_type(Call::UPDATE_STATE);
call.mutable_resource_provider_id()->CopyFrom(info.id());
Call::UpdateState* update = call.mutable_update_state();
update->mutable_resources()->CopyFrom(totalResources);
update->mutable_resource_version_uuid()->CopyFrom(
protobuf::createUUID(resourceVersion));
foreachvalue (const Operation& operation, operations) {
update->add_operations()->CopyFrom(operation);
}
LOG(INFO)
<< "Sending UPDATE_STATE call with resources '" << totalResources
<< "' and " << update->operations_size() << " operations to agent "
<< slaveId;
// NOTE: We terminate the resource provider here if the state cannot be
// updated, so that the state is in sync with the agent's view.
auto die = [=](const ResourceProviderID& id, const string& message) {
LOG(ERROR)
<< "Failed to update state for resource provider " << id << ": "
<< message;
fatal();
};
driver->send(evolve(call))
.onFailed(defer(self(), std::bind(die, info.id(), lambda::_1)))
.onDiscarded(defer(self(), std::bind(die, info.id(), "future discarded")));
}
void StorageLocalResourceProviderProcess::sendOperationStatusUpdate(
const UpdateOperationStatusMessage& _update)
{
Call call;
call.set_type(Call::UPDATE_OPERATION_STATUS);
call.mutable_resource_provider_id()->CopyFrom(info.id());
Call::UpdateOperationStatus* update =
call.mutable_update_operation_status();
update->mutable_operation_uuid()->CopyFrom(_update.operation_uuid());
update->mutable_status()->CopyFrom(_update.status());
if (_update.has_framework_id()) {
update->mutable_framework_id()->CopyFrom(_update.framework_id());
}
if (_update.has_latest_status()) {
update->mutable_latest_status()->CopyFrom(_update.latest_status());
}
auto err = [](const id::UUID& uuid, const string& message) {
LOG(ERROR)
<< "Failed to send status update for operation (uuid: " << uuid << "): "
<< message;
};
Try<id::UUID> uuid =
id::UUID::fromBytes(_update.operation_uuid().value());
CHECK_SOME(uuid);
driver->send(evolve(call))
.onFailed(std::bind(err, uuid.get(), lambda::_1))
.onDiscarded(std::bind(err, uuid.get(), "future discarded"));
}
// Registers per-operation-type metrics (pending gauge plus finished/failed/
// dropped counters) under the given prefix, e.g.
// "<prefix>operations/<type>/pending".
StorageLocalResourceProviderProcess::Metrics::Metrics(const string& prefix)
: csi::Metrics(prefix)
{
vector<Offer::Operation::Type> operationTypes;
// NOTE: We use a switch statement here as a compile-time sanity check so we
// won't forget to add metrics for new operations in the future.
//
// The cases below intentionally fall through: starting the switch at
// `RESERVE` accumulates every supported operation type into
// `operationTypes` in one pass, while the absence of a `default` case
// means the compiler can warn (-Wswitch) if a new enum value is added
// without being handled here.
Offer::Operation::Type firstOperationType = Offer::Operation::RESERVE;
switch (firstOperationType) {
case Offer::Operation::RESERVE:
operationTypes.push_back(Offer::Operation::RESERVE);
case Offer::Operation::UNRESERVE:
operationTypes.push_back(Offer::Operation::UNRESERVE);
case Offer::Operation::CREATE:
operationTypes.push_back(Offer::Operation::CREATE);
case Offer::Operation::DESTROY:
operationTypes.push_back(Offer::Operation::DESTROY);
case Offer::Operation::CREATE_DISK:
operationTypes.push_back(Offer::Operation::CREATE_DISK);
case Offer::Operation::DESTROY_DISK:
operationTypes.push_back(Offer::Operation::DESTROY_DISK);
break;
case Offer::Operation::GROW_VOLUME:
case Offer::Operation::SHRINK_VOLUME:
// TODO(chhsiao): These operations are currently not supported for
// resource providers, and should have been validated by the master.
UNREACHABLE();
case Offer::Operation::UNKNOWN:
case Offer::Operation::LAUNCH:
case Offer::Operation::LAUNCH_GROUP:
UNREACHABLE();
};
// Create and globally register the four metrics for each supported type.
foreach (const Offer::Operation::Type& type, operationTypes) {
const string name = strings::lower(Offer::Operation::Type_Name(type));
operations_pending.put(type, PushGauge(
prefix + "operations/" + name + "/pending"));
operations_finished.put(type, Counter(
prefix + "operations/" + name + "/finished"));
operations_failed.put(type, Counter(
prefix + "operations/" + name + "/failed"));
operations_dropped.put(type, Counter(
prefix + "operations/" + name + "/dropped"));
process::metrics::add(operations_pending.at(type));
process::metrics::add(operations_finished.at(type));
process::metrics::add(operations_failed.at(type));
process::metrics::add(operations_dropped.at(type));
}
// Special metric for counting the number of `OPERATION_DROPPED` statuses when
// receiving explicit reconciliation for unknown operation UUIDs.
operations_dropped.put(
Offer::Operation::UNKNOWN,
Counter(prefix + "operations/unknown/dropped"));
process::metrics::add(operations_dropped.at(Offer::Operation::UNKNOWN));
}
// Unregisters every metric that the constructor added to the global
// metrics registry.
StorageLocalResourceProviderProcess::Metrics::~Metrics()
{
  foreachvalue (const PushGauge& pendingGauge, operations_pending) {
    process::metrics::remove(pendingGauge);
  }

  foreachvalue (const Counter& finishedCounter, operations_finished) {
    process::metrics::remove(finishedCounter);
  }

  foreachvalue (const Counter& failedCounter, operations_failed) {
    process::metrics::remove(failedCounter);
  }

  // This also covers the special `operations/unknown/dropped` counter.
  foreachvalue (const Counter& droppedCounter, operations_dropped) {
    process::metrics::remove(droppedCounter);
  }
}
// Validates `info` and, on success, constructs a new provider wrapping a
// spawned `StorageLocalResourceProviderProcess`. Returns the validation
// error otherwise.
Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create(
    const http::URL& url,
    const string& workDir,
    const ResourceProviderInfo& info,
    const SlaveID& slaveId,
    const Option<string>& authToken,
    bool strict)
{
  const Option<Error> validationError = validate(info);

  if (validationError.isSome()) {
    return validationError.get();
  }

  return Owned<LocalResourceProvider>(new StorageLocalResourceProvider(
      url, workDir, info, slaveId, authToken, strict));
}
// Checks that `info` is a well-formed storage resource provider config:
// no pre-set ID, valid names, a `storage` field, and a CSI plugin that
// provides the node service. Returns `None()` when valid.
Option<Error> StorageLocalResourceProvider::validate(
    const ResourceProviderInfo& info)
{
  // The ID is assigned by the agent, so it must not be set up front.
  if (info.has_id()) {
    return Error("'ResourceProviderInfo.id' must not be set");
  }

  // Verify that the name follows Java package naming convention.
  // TODO(chhsiao): We should move this check to a validation function
  // for `ResourceProviderInfo`.
  if (!isValidName(info.name())) {
    return Error(
        "Resource provider name '" + info.name() +
        "' does not follow Java package naming convention");
  }

  if (!info.has_storage()) {
    return Error("'ResourceProviderInfo.storage' must be set");
  }

  // Verify that the type and name of the CSI plugin follow Java package
  // naming convention.
  // TODO(chhsiao): We should move this check to a validation function
  // for `CSIPluginInfo`.
  if (!isValidType(info.storage().plugin().type()) ||
      !isValidName(info.storage().plugin().name())) {
    return Error(
        "CSI plugin type '" + info.storage().plugin().type() +
        "' and name '" + info.storage().plugin().name() +
        "' does not follow Java package naming convention");
  }

  // Verify that the plugin provides the CSI node service.
  // TODO(chhsiao): We should move this check to a validation function
  // for `CSIPluginInfo`.
  bool nodeServiceFound = false;

  foreach (const CSIPluginContainerInfo& container,
           info.storage().plugin().containers()) {
    const auto& services = container.services();
    if (std::find(services.begin(), services.end(), csi::NODE_SERVICE) !=
        services.end()) {
      nodeServiceFound = true;
      break;
    }
  }

  if (!nodeServiceFound) {
    return Error(stringify(csi::NODE_SERVICE) + " not found");
  }

  return None();
}
// Constructs the provider and spawns its underlying libprocess actor,
// which performs all of the actual work; this wrapper only manages the
// actor's lifetime.
StorageLocalResourceProvider::StorageLocalResourceProvider(
const http::URL& url,
const string& workDir,
const ResourceProviderInfo& info,
const SlaveID& slaveId,
const Option<string>& authToken,
bool strict)
: process(new StorageLocalResourceProviderProcess(
url, workDir, info, slaveId, authToken, strict))
{
// The process must have been allocated above; spawn starts its event loop.
spawn(CHECK_NOTNULL(process.get()));
}
// Terminates the underlying actor and blocks until it has fully exited,
// so the process is never destroyed while still running.
StorageLocalResourceProvider::~StorageLocalResourceProvider()
{
process::terminate(process.get());
process::wait(process.get());
}
} // namespace internal {
} // namespace mesos {