// blob: a5536b3d735e01eb1c4dc52d0602d973155f3c93 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
#define __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
#include <functional>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#include <mesos/http.hpp>
#include <mesos/mesos.hpp>
#include <mesos/resources.hpp>
#include <mesos/resource_provider/resource_provider.hpp>
#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
#include <mesos/v1/resource_provider.hpp>
#include <process/future.hpp>
#include <process/grpc.hpp>
#include <process/http.hpp>
#include <process/loop.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
#include <process/sequence.hpp>
#include <process/metrics/counter.hpp>
#include <process/metrics/push_gauge.hpp>
#include <stout/bytes.hpp>
#include <stout/duration.hpp>
#include <stout/hashset.hpp>
#include <stout/linkedhashmap.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/try.hpp>
#include <stout/uuid.hpp>
#include "csi/client.hpp"
#include "csi/rpc.hpp"
#include "csi/state.hpp"
#include "csi/utils.hpp"
#include "slave/container_daemon.hpp"
#include "status_update_manager/operation.hpp"
namespace mesos {
namespace internal {
// Storage local resource provider initially picks a random amount of time
// between `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI
// calls related to `CREATE_DISK` or `DESTROY_DISK` operations. Subsequent
// retries are exponentially backed off based on this interval (e.g., 2nd retry
// uses a random value between `[0, b * 2^1]`, 3rd retry between `[0, b * 2^2]`,
// etc) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`.
//
// TODO(chhsiao): Make the retry parameters configurable.

// Base interval `b` of the randomized exponential backoff described above.
constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10);

// Cap on the backoff interval, no matter how many retries have happened.
constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10);
// The libprocess process behind the storage local resource provider.
//
// As declared below, it keeps track of the resource provider state (`state`,
// `totalResources`, `operations`, `volumes`), talks to CSI plugin containers
// through `csi::v0` clients, and handles the resource provider events passed
// to `received`. The class is non-copyable.
class StorageLocalResourceProviderProcess
  : public process::Process<StorageLocalResourceProviderProcess>
{
public:
  // NOTE(review): The parameters presumably initialize the same-named members
  // (`url`, `workDir`, `info`, `slaveId`, `authToken`, `strict`); the
  // definition is not in this header, so confirm in the implementation.
  explicit StorageLocalResourceProviderProcess(
      const process::http::URL& _url,
      const std::string& _workDir,
      const ResourceProviderInfo& _info,
      const SlaveID& _slaveId,
      const Option<std::string>& _authToken,
      bool _strict);

  // Copying is disabled.
  StorageLocalResourceProviderProcess(
      const StorageLocalResourceProviderProcess& other) = delete;

  StorageLocalResourceProviderProcess& operator=(
      const StorageLocalResourceProviderProcess& other) = delete;

  // Handlers for connection changes and incoming resource provider events.
  // NOTE(review): Presumably invoked through `driver`; confirm in the
  // implementation.
  void connected();
  void disconnected();
  void received(const resource_provider::Event& event);

  // Wrapper functions to make CSI calls and update RPC metrics. Made public for
  // testing purpose.
  //
  // The call is made asynchronously and thus no guarantee is provided on the
  // order in which calls are sent. Callers need to either ensure to not have
  // multiple conflicting calls in flight, or treat results idempotently.
  //
  // NOTE: We currently ensure this by 1) resource locking to forbid concurrent
  // calls on the same volume, and 2) no profile update while there are ongoing
  // `CREATE_DISK` or `DESTROY_DISK` operations.
  //
  // NOTE: Since this function uses `getService` to obtain the latest service
  // future, which depends on probe results, it is disabled for making probe
  // calls; `_call` should be used directly instead.
  template <
      csi::v0::RPC rpc,
      typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0>
  process::Future<csi::v0::Response<rpc>> call(
      const ContainerID& containerId,
      const csi::v0::Request<rpc>& request,
      const bool retry = false); // remains const in a mutable lambda.

  // Makes a single CSI call through the given client. The future carries
  // either the response or the gRPC status error.
  template <csi::v0::RPC rpc>
  process::Future<Try<csi::v0::Response<rpc>, process::grpc::StatusError>>
  _call(csi::v0::Client client, const csi::v0::Request<rpc>& request);

  // Turns the result of `_call` into a `process::ControlFlow`.
  // NOTE(review): Presumably breaks out of the retry loop on success and
  // continues after `backoff` on a retryable error; confirm in the
  // implementation.
  template <csi::v0::RPC rpc>
  process::Future<process::ControlFlow<csi::v0::Response<rpc>>> __call(
      const Try<csi::v0::Response<rpc>, process::grpc::StatusError>& result,
      const Option<Duration>& backoff);

private:
  // Per-volume bookkeeping: the volume state plus the sequence on which the
  // CSI operations for that volume are run.
  struct VolumeData
  {
    // Takes ownership of `_state`. A named rvalue reference is an lvalue, so
    // `std::move` is needed here to move the state instead of copying it.
    //
    // NOTE: Intentionally not `explicit`, so callers that rely on the
    // implicit conversion from `csi::state::VolumeState&&` keep working.
    VolumeData(csi::state::VolumeState&& _state)
      : state(std::move(_state)),
        sequence(new process::Sequence("volume-sequence")) {}

    csi::state::VolumeState state;

    // We run all CSI operations for the same volume on a sequence to
    // ensure that they are processed in a sequential order.
    process::Owned<process::Sequence> sequence;
  };

  void initialize() override;
  void fatal();

  // The recover functions are responsible to recover the state of the
  // resource provider and CSI volumes from checkpointed data.
  process::Future<Nothing> recover();
  process::Future<Nothing> recoverServices();
  process::Future<Nothing> recoverVolumes();
  process::Future<Nothing> recoverResourceProviderState();

  void doReliableRegistration();

  // The reconcile functions are responsible to reconcile the state of
  // the resource provider from the recovered state and other sources of
  // truth, such as CSI plugin responses or the status update manager.
  process::Future<Nothing> reconcileResourceProviderState();
  process::Future<Nothing> reconcileOperationStatuses();
  ResourceConversion reconcileResources(
      const Resources& checkpointed,
      const Resources& discovered);

  // Spawns a loop to watch for changes in the set of known profiles and update
  // the profile mapping and storage pools accordingly.
  void watchProfiles();

  // Update the profile mapping when the set of known profiles changes.
  // NOTE: This function never fails. If it fails to translate a new
  // profile, the resource provider will continue to operate with the
  // set of profiles it knows about.
  process::Future<Nothing> updateProfiles(const hashset<std::string>& profiles);

  // Reconcile the storage pools when the set of known profiles changes,
  // or a volume with an unknown profile is destroyed.
  process::Future<Nothing> reconcileStoragePools();

  // Returns true if the storage pools are allowed to be reconciled when
  // the operation is being applied.
  static bool allowsReconciliation(const Offer::Operation& operation);

  // Functions for received events.
  void subscribed(const resource_provider::Event::Subscribed& subscribed);
  void applyOperation(
      const resource_provider::Event::ApplyOperation& operation);
  void publishResources(
      const resource_provider::Event::PublishResources& publish);
  void acknowledgeOperationStatus(
      const resource_provider::Event::AcknowledgeOperationStatus& acknowledge);
  void reconcileOperations(
      const resource_provider::Event::ReconcileOperations& reconcile);

  // Returns a future of a CSI client that waits for the endpoint socket to
  // appear if necessary, then connects to the socket and checks its readiness.
  process::Future<csi::v0::Client> waitService(const std::string& endpoint);

  // Returns a future of the latest CSI client for the specified plugin
  // container. If the container is not already running, this method will start
  // a new container daemon.
  process::Future<csi::v0::Client> getService(const ContainerID& containerId);

  // Lists all running plugin containers for this resource provider.
  // NOTE: This might return containers that are not actually running, e.g., if
  // they are being destroyed.
  process::Future<hashmap<ContainerID, Option<ContainerStatus>>>
  getContainers();

  // Waits for the specified plugin container to be terminated.
  process::Future<Nothing> waitContainer(const ContainerID& containerId);

  // Kills the specified plugin container.
  process::Future<Nothing> killContainer(const ContainerID& containerId);

  process::Future<Nothing> prepareIdentityService();

  // NOTE: This can only be called after `prepareIdentityService`.
  process::Future<Nothing> prepareControllerService();

  // NOTE: This can only be called after `prepareIdentityService` and
  // `prepareControllerService`.
  process::Future<Nothing> prepareNodeService();

  // Transitions the state of the specified volume from `CREATED` or
  // `CONTROLLER_PUBLISH` to `NODE_READY`.
  //
  // NOTE: This can only be called after `prepareControllerService` and
  // `prepareNodeService`.
  process::Future<Nothing> controllerPublish(const std::string& volumeId);

  // Transitions the state of the specified volume from `NODE_READY`,
  // `CONTROLLER_PUBLISH` or `CONTROLLER_UNPUBLISH` to `CREATED`.
  //
  // NOTE: This can only be called after `prepareControllerService` and
  // `prepareNodeService`.
  process::Future<Nothing> controllerUnpublish(const std::string& volumeId);

  // Transitions the state of the specified volume from `NODE_READY` or
  // `NODE_STAGE` to `VOL_READY`.
  //
  // NOTE: This can only be called after `prepareNodeService`.
  process::Future<Nothing> nodeStage(const std::string& volumeId);

  // Transitions the state of the specified volume from `VOL_READY`,
  // `NODE_STAGE` or `NODE_UNSTAGE` to `NODE_READY`.
  //
  // NOTE: This can only be called after `prepareNodeService`.
  process::Future<Nothing> nodeUnstage(const std::string& volumeId);

  // Transitions the state of the specified volume from `VOL_READY` or
  // `NODE_PUBLISH` to `PUBLISHED`.
  //
  // NOTE: This can only be called after `prepareNodeService`.
  process::Future<Nothing> nodePublish(const std::string& volumeId);

  // Transitions the state of the specified volume from `PUBLISHED`,
  // `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`.
  //
  // NOTE: This can only be called after `prepareNodeService`.
  process::Future<Nothing> nodeUnpublish(const std::string& volumeId);

  // Returns a CSI volume ID.
  //
  // NOTE: This can only be called after `prepareControllerService`.
  process::Future<std::string> createVolume(
      const std::string& name,
      const Bytes& capacity,
      const DiskProfileAdaptor::ProfileInfo& profileInfo);

  // Returns true if the volume has been deprovisioned.
  //
  // NOTE: This can only be called after `prepareControllerService` and
  // `prepareNodeService` (since it may require `NodeUnpublishVolume`).
  process::Future<bool> deleteVolume(const std::string& volumeId);

  // Validates if a volume supports the capability of the specified profile.
  //
  // NOTE: This can only be called after `prepareIdentityService`.
  process::Future<Nothing> validateVolume(
      const std::string& volumeId,
      const Option<Labels>& metadata,
      const DiskProfileAdaptor::ProfileInfo& profileInfo);

  // NOTE: This can only be called after `prepareControllerService` and the
  // resource provider ID has been obtained.
  process::Future<Resources> listVolumes();

  // NOTE: This can only be called after `prepareControllerService` and the
  // resource provider ID has been obtained.
  process::Future<Resources> getCapacities();

  // Applies the operation. Speculative operations will be synchronously
  // applied. Do nothing if the operation is already in a terminal state.
  process::Future<Nothing> _applyOperation(const id::UUID& operationUuid);

  // Sends `OPERATION_DROPPED` status update. The operation status will be
  // checkpointed if `operation` is set.
  void dropOperation(
      const id::UUID& operationUuid,
      const Option<FrameworkID>& frameworkId,
      const Option<Offer::Operation>& operation,
      const std::string& message);

  process::Future<std::vector<ResourceConversion>> applyCreateDisk(
      const Resource& resource,
      const id::UUID& operationUuid,
      const Resource::DiskInfo::Source::Type& targetType,
      const Option<std::string>& targetProfile);
  process::Future<std::vector<ResourceConversion>> applyDestroyDisk(
      const Resource& resource);

  // Synchronously creates persistent volumes.
  Try<std::vector<ResourceConversion>> applyCreate(
      const Offer::Operation& operation) const;

  // Synchronously cleans up and destroys persistent volumes.
  Try<std::vector<ResourceConversion>> applyDestroy(
      const Offer::Operation& operation) const;

  // Synchronously updates `totalResources` and the operation status and
  // then asks the status update manager to send status updates.
  Try<Nothing> updateOperationStatus(
      const id::UUID& operationUuid,
      const Try<std::vector<ResourceConversion>>& conversions);

  void garbageCollectOperationPath(const id::UUID& operationUuid);
  void garbageCollectMountPath(const std::string& volumeId);

  void checkpointResourceProviderState();
  void checkpointVolumeState(const std::string& volumeId);

  void sendResourceProviderStateUpdate();
  void sendOperationStatusUpdate(
      const UpdateOperationStatusMessage& update);

  // Lifecycle state of the resource provider.
  enum State
  {
    RECOVERING,
    DISCONNECTED,
    CONNECTED,
    SUBSCRIBED,
    READY
  } state;

  const process::http::URL url;
  const std::string workDir;
  const std::string metaDir;
  const ContentType contentType;
  ResourceProviderInfo info;
  const std::string vendor;
  const SlaveID slaveId;
  const Option<std::string> authToken;
  const bool strict;

  std::shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;

  std::string bootId;
  process::grpc::client::Runtime runtime;
  process::Owned<v1::resource_provider::Driver> driver;
  OperationStatusUpdateManager statusUpdateManager;

  // The mapping of known profiles fetched from the DiskProfileAdaptor.
  hashmap<std::string, DiskProfileAdaptor::ProfileInfo> profileInfos;

  // Container daemons and CSI client promises, keyed by plugin container ID.
  hashmap<ContainerID, process::Owned<slave::ContainerDaemon>> daemons;
  hashmap<ContainerID, process::Owned<process::Promise<csi::v0::Client>>>
    services;

  Option<ContainerID> nodeContainerId;
  Option<ContainerID> controllerContainerId;
  Option<csi::v0::GetPluginInfoResponse> pluginInfo;
  csi::v0::PluginCapabilities pluginCapabilities;
  csi::v0::ControllerCapabilities controllerCapabilities;
  csi::v0::NodeCapabilities nodeCapabilities;
  Option<std::string> nodeId;

  // We maintain the following invariant: if one operation depends on
  // another, they cannot be in PENDING state at the same time, i.e.,
  // the result of the preceding operation must have been reflected in
  // the total resources.
  //
  // NOTE: We store the list of operations in a `LinkedHashMap` to
  // preserve the order we receive the operations in case we need it.
  LinkedHashMap<id::UUID, Operation> operations;
  Resources totalResources;
  id::UUID resourceVersion;

  // NOTE(review): Presumably keyed by CSI volume ID; confirm in the
  // implementation.
  hashmap<std::string, VolumeData> volumes;

  // If pending, it means that the storage pools are being reconciled, and all
  // incoming operations that disallow reconciliation will be dropped.
  process::Future<Nothing> reconciled;

  // We maintain a sequence to coordinate reconciliations of storage pools. It
  // keeps track of pending operations that disallow reconciliation, and ensures
  // that any reconciliation waits for these operations to finish.
  process::Sequence sequence;

  // Metrics exposed by this resource provider, grouped into per-RPC CSI plugin
  // metrics and per-operation-type state metrics.
  struct Metrics
  {
    explicit Metrics(const std::string& prefix);
    ~Metrics();

    // CSI plugin metrics.
    process::metrics::Counter csi_plugin_container_terminations;
    hashmap<csi::v0::RPC, process::metrics::PushGauge> csi_plugin_rpcs_pending;
    hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_successes;
    hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_errors;
    hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_cancelled;

    // Operation state metrics.
    hashmap<Offer::Operation::Type, process::metrics::PushGauge>
      operations_pending;
    hashmap<Offer::Operation::Type, process::metrics::Counter>
      operations_finished;
    hashmap<Offer::Operation::Type, process::metrics::Counter>
      operations_failed;
    hashmap<Offer::Operation::Type, process::metrics::Counter>
      operations_dropped;
  } metrics;
};
} // namespace internal {
} // namespace mesos {
#endif // __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__