// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
#define __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__

#include <functional>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include <mesos/http.hpp>
#include <mesos/mesos.hpp>
#include <mesos/resources.hpp>

#include <mesos/resource_provider/resource_provider.hpp>

#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>

#include <mesos/v1/resource_provider.hpp>

#include <process/future.hpp>
#include <process/grpc.hpp>
#include <process/http.hpp>
#include <process/loop.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
#include <process/sequence.hpp>

#include <process/metrics/counter.hpp>
#include <process/metrics/push_gauge.hpp>

#include <stout/bytes.hpp>
#include <stout/duration.hpp>
#include <stout/hashset.hpp>
#include <stout/linkedhashmap.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/try.hpp>
#include <stout/uuid.hpp>

#include "csi/client.hpp"
#include "csi/rpc.hpp"
#include "csi/state.hpp"
#include "csi/utils.hpp"

#include "slave/container_daemon.hpp"

#include "status_update_manager/operation.hpp"

namespace mesos {
namespace internal {

// Storage local resource provider initially picks a random amount of time
// between `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI
// calls related to `CREATE_DISK` or `DESTROY_DISK` operations. Subsequent
// retries are exponentially backed off based on this interval (e.g., 2nd retry
// uses a random value between `[0, b * 2^1]`, 3rd retry between `[0, b * 2^2]`,
// etc) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`.
//
// TODO(chhsiao): Make the retry parameters configurable.
//
// `DEFAULT_CSI_RETRY_BACKOFF_FACTOR` is the base interval `b` used for the
// first retry; `DEFAULT_CSI_RETRY_INTERVAL_MAX` is the cap on the interval
// from which any later retry delay is drawn.
constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10);
constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10);


// The libprocess process behind the storage local resource provider. Based on
// the declarations below, it recovers checkpointed state, manages CSI plugin
// containers and their `csi::v0` clients, drives volumes through their CSI
// lifecycle states, and applies/reports operations received through the
// resource provider API.
//
// The class is neither copyable nor copy-assignable.
class StorageLocalResourceProviderProcess
  : public process::Process<StorageLocalResourceProviderProcess>
{
public:
  // NOTE(review): The arguments presumably initialize the same-named data
  // members (`url`, `workDir`, `info`, `slaveId`, `authToken`, `strict`);
  // confirm against the definition in the implementation file.
  explicit StorageLocalResourceProviderProcess(
      const process::http::URL& _url,
      const std::string& _workDir,
      const ResourceProviderInfo& _info,
      const SlaveID& _slaveId,
      const Option<std::string>& _authToken,
      bool _strict);

  StorageLocalResourceProviderProcess(
      const StorageLocalResourceProviderProcess& other) = delete;

  StorageLocalResourceProviderProcess& operator=(
      const StorageLocalResourceProviderProcess& other) = delete;

  // Connection and event callbacks.
  // NOTE(review): These are presumably invoked on behalf of `driver` when the
  // connection to the resource provider API changes or an event arrives;
  // confirm in the implementation file.
  void connected();
  void disconnected();
  void received(const resource_provider::Event& event);

  // Wrapper functions to make CSI calls and update RPC metrics. Made public for
  // testing purpose.
  //
  // The call is made asynchronously and thus no guarantee is provided on the
  // order in which calls are sent. Callers need to either ensure to not have
  // multiple conflicting calls in flight, or treat results idempotently.
  //
  // NOTE: We currently ensure this by 1) resource locking to forbid concurrent
  // calls on the same volume, and 2) no profile update while there are ongoing
  // `CREATE_DISK` or `DESTROY_DISK` operations.
  //
  // NOTE: Since this function uses `getService` to obtain the latest service
  // future, which depends on probe results, it is disabled for making probe
  // calls; `_call` should be used directly instead.
  template <
      csi::v0::RPC rpc,
      typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0>
  process::Future<csi::v0::Response<rpc>> call(
      const ContainerID& containerId,
      const csi::v0::Request<rpc>& request,
      const bool retry = false); // remains const in a mutable lambda.

  // Makes a single CSI call through the given client. The returned future
  // carries either the response or the gRPC status error of the call.
  template <csi::v0::RPC rpc>
  process::Future<Try<csi::v0::Response<rpc>, process::grpc::StatusError>>
  _call(csi::v0::Client client, const csi::v0::Request<rpc>& request);

  // Maps the result of `_call` to a `process::ControlFlow`.
  // NOTE(review): Presumably decides whether to break out of the retry loop
  // with the response or to continue after `backoff`; confirm in the
  // implementation file.
  template <csi::v0::RPC rpc>
  process::Future<process::ControlFlow<csi::v0::Response<rpc>>> __call(
      const Try<csi::v0::Response<rpc>, process::grpc::StatusError>& result,
      const Option<Duration>& backoff);

private:
  // Per-volume bookkeeping: the volume state plus a sequence used to
  // serialize CSI operations on that volume.
  struct VolumeData
  {
    // Takes ownership of `_state`. The parameter is a named rvalue reference
    // and hence an lvalue inside the constructor, so it must be explicitly
    // moved; otherwise the state would be copied.
    //
    // NOTE: Intentionally not `explicit`, since callers may rely on the
    // implicit conversion from `csi::state::VolumeState&&`.
    VolumeData(csi::state::VolumeState&& _state)
      : state(std::move(_state)),
        sequence(new process::Sequence("volume-sequence")) {}

    csi::state::VolumeState state;

    // We run all CSI operations for the same volume on a sequence to
    // ensure that they are processed in a sequential order.
    process::Owned<process::Sequence> sequence;
  };

  // Overrides `process::Process::initialize`.
  void initialize() override;

  // NOTE(review): Presumably handles unrecoverable errors; the exact behavior
  // is defined in the implementation file.
  void fatal();

  // The recover functions are responsible to recover the state of the
  // resource provider and CSI volumes from checkpointed data.
  process::Future<Nothing> recover();
  process::Future<Nothing> recoverServices();
  process::Future<Nothing> recoverVolumes();
  process::Future<Nothing> recoverResourceProviderState();

  void doReliableRegistration();

  // The reconcile functions are responsible to reconcile the state of
  // the resource provider from the recovered state and other sources of
  // truth, such as CSI plugin responses or the status update manager.
  process::Future<Nothing> reconcileResourceProviderState();
  process::Future<Nothing> reconcileOperationStatuses();
  ResourceConversion reconcileResources(
      const Resources& checkpointed,
      const Resources& discovered);

  // Spawns a loop to watch for changes in the set of known profiles and update
  // the profile mapping and storage pools accordingly.
  void watchProfiles();

  // Update the profile mapping when the set of known profiles changes.
  // NOTE: This function never fails. If it fails to translate a new
  // profile, the resource provider will continue to operate with the
  // set of profiles it knows about.
  process::Future<Nothing> updateProfiles(const hashset<std::string>& profiles);

  // Reconcile the storage pools when the set of known profiles changes,
  // or a volume with an unknown profile is destroyed.
  process::Future<Nothing> reconcileStoragePools();

  // Returns true if the storage pools are allowed to be reconciled when
  // the operation is being applied.
  static bool allowsReconciliation(const Offer::Operation& operation);

  // Functions for received events.
  void subscribed(const resource_provider::Event::Subscribed& subscribed);
  void applyOperation(
      const resource_provider::Event::ApplyOperation& operation);
  void publishResources(
      const resource_provider::Event::PublishResources& publish);
  void acknowledgeOperationStatus(
      const resource_provider::Event::AcknowledgeOperationStatus& acknowledge);
  void reconcileOperations(
      const resource_provider::Event::ReconcileOperations& reconcile);

  // Returns a future of a CSI client that waits for the endpoint socket to
  // appear if necessary, then connects to the socket and checks its readiness.
  process::Future<csi::v0::Client> waitService(const std::string& endpoint);

  // Returns a future of the latest CSI client for the specified plugin
  // container. If the container is not already running, this method will start
  // a new container daemon.
  process::Future<csi::v0::Client> getService(const ContainerID& containerId);

  // Lists all running plugin containers for this resource provider.
  // NOTE: This might return containers that are not actually running, e.g., if
  // they are being destroyed.
  process::Future<hashmap<ContainerID, Option<ContainerStatus>>>
  getContainers();

  // Waits for the specified plugin container to be terminated.
  process::Future<Nothing> waitContainer(const ContainerID& containerId);

  // Kills the specified plugin container.
  process::Future<Nothing> killContainer(const ContainerID& containerId);

  // The prepare functions must be called in the order documented on each of
  // them: identity service first, then controller service, then node service.
  process::Future<Nothing> prepareIdentityService();

  // NOTE: This can only be called after `prepareIdentityService`.
  process::Future<Nothing> prepareControllerService();

  // NOTE: This can only be called after `prepareIdentityService` and
  // `prepareControllerService`.
  process::Future<Nothing> prepareNodeService();

  // Transitions the state of the specified volume from `CREATED` or
  // `CONTROLLER_PUBLISH` to `NODE_READY`.
  //
  // NOTE: This can only be called after `prepareControllerService` and
  // `prepareNodeService`.
  process::Future<Nothing> controllerPublish(const std::string& volumeId);

  // Transitions the state of the specified volume from `NODE_READY`,
  // `CONTROLLER_PUBLISH` or `CONTROLLER_UNPUBLISH` to `CREATED`.
  //
  // NOTE: This can only be called after `prepareControllerService` and
  // `prepareNodeService`.
  process::Future<Nothing> controllerUnpublish(const std::string& volumeId);

  // Transitions the state of the specified volume from `NODE_READY` or
  // `NODE_STAGE` to `VOL_READY`.
  //
  // NOTE: This can only be called after `prepareNodeService`.
  process::Future<Nothing> nodeStage(const std::string& volumeId);

  // Transitions the state of the specified volume from `VOL_READY`,
  // `NODE_STAGE` or `NODE_UNSTAGE` to `NODE_READY`.
  //
  // NOTE: This can only be called after `prepareNodeService`.
  process::Future<Nothing> nodeUnstage(const std::string& volumeId);

  // Transitions the state of the specified volume from `VOL_READY` or
  // `NODE_PUBLISH` to `PUBLISHED`.
  //
  // NOTE: This can only be called after `prepareNodeService`.
  process::Future<Nothing> nodePublish(const std::string& volumeId);

  // Transitions the state of the specified volume from `PUBLISHED`,
  // `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`.
  //
  // NOTE: This can only be called after `prepareNodeService`.
  process::Future<Nothing> nodeUnpublish(const std::string& volumeId);

  // Returns a CSI volume ID.
  //
  // NOTE: This can only be called after `prepareControllerService`.
  process::Future<std::string> createVolume(
      const std::string& name,
      const Bytes& capacity,
      const DiskProfileAdaptor::ProfileInfo& profileInfo);

  // Returns true if the volume has been deprovisioned.
  //
  // NOTE: This can only be called after `prepareControllerService` and
  // `prepareNodeService` (since it may require `NodeUnpublishVolume`).
  process::Future<bool> deleteVolume(const std::string& volumeId);

  // Validates if a volume supports the capability of the specified profile.
  //
  // NOTE: This can only be called after `prepareIdentityService`.
  process::Future<Nothing> validateVolume(
      const std::string& volumeId,
      const Option<Labels>& metadata,
      const DiskProfileAdaptor::ProfileInfo& profileInfo);

  // NOTE: This can only be called after `prepareControllerService` and the
  // resource provider ID has been obtained.
  process::Future<Resources> listVolumes();

  // NOTE: This can only be called after `prepareControllerService` and the
  // resource provider ID has been obtained.
  process::Future<Resources> getCapacities();

  // Applies the operation. Speculative operations will be synchronously
  // applied. Do nothing if the operation is already in a terminal state.
  process::Future<Nothing> _applyOperation(const id::UUID& operationUuid);

  // Sends `OPERATION_DROPPED` status update. The operation status will be
  // checkpointed if `operation` is set.
  void dropOperation(
      const id::UUID& operationUuid,
      const Option<FrameworkID>& frameworkId,
      const Option<Offer::Operation>& operation,
      const std::string& message);

  // Asynchronous handlers for disk operations; each yields the resource
  // conversions to be applied.
  // NOTE(review): Presumably these back the `CREATE_DISK` and `DESTROY_DISK`
  // operations mentioned elsewhere in this file; confirm in the
  // implementation file.
  process::Future<std::vector<ResourceConversion>> applyCreateDisk(
      const Resource& resource,
      const id::UUID& operationUuid,
      const Resource::DiskInfo::Source::Type& targetType,
      const Option<std::string>& targetProfile);

  process::Future<std::vector<ResourceConversion>> applyDestroyDisk(
      const Resource& resource);

  // Synchronously creates persistent volumes.
  Try<std::vector<ResourceConversion>> applyCreate(
      const Offer::Operation& operation) const;

  // Synchronously cleans up and destroys persistent volumes.
  Try<std::vector<ResourceConversion>> applyDestroy(
      const Offer::Operation& operation) const;

  // Synchronously updates `totalResources` and the operation status and
  // then asks the status update manager to send status updates.
  Try<Nothing> updateOperationStatus(
      const id::UUID& operationUuid,
      const Try<std::vector<ResourceConversion>>& conversions);

  // NOTE(review): Presumably remove the on-disk paths associated with the
  // given operation or volume; confirm in the implementation file.
  void garbageCollectOperationPath(const id::UUID& operationUuid);
  void garbageCollectMountPath(const std::string& volumeId);

  // NOTE(review): Presumably persist the respective state so that the
  // `recover*` functions above can restore it; confirm in the implementation
  // file.
  void checkpointResourceProviderState();
  void checkpointVolumeState(const std::string& volumeId);

  void sendResourceProviderStateUpdate();

  void sendOperationStatusUpdate(
      const UpdateOperationStatusMessage& update);

  // Lifecycle state of the resource provider process.
  // NOTE(review): The enumerators appear to be listed in the order they are
  // normally reached; confirm the transitions in the implementation file.
  enum State
  {
    RECOVERING,
    DISCONNECTED,
    CONNECTED,
    SUBSCRIBED,
    READY
  } state;

  const process::http::URL url;
  const std::string workDir;
  const std::string metaDir;
  const ContentType contentType;
  ResourceProviderInfo info;
  const std::string vendor;
  const SlaveID slaveId;
  const Option<std::string> authToken;
  const bool strict;

  std::shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;

  std::string bootId;
  process::grpc::client::Runtime runtime;
  process::Owned<v1::resource_provider::Driver> driver;
  OperationStatusUpdateManager statusUpdateManager;

  // The mapping of known profiles fetched from the DiskProfileAdaptor.
  hashmap<std::string, DiskProfileAdaptor::ProfileInfo> profileInfos;

  // Container daemons and CSI client promises, keyed by the ID of the plugin
  // container they belong to.
  hashmap<ContainerID, process::Owned<slave::ContainerDaemon>> daemons;
  hashmap<ContainerID, process::Owned<process::Promise<csi::v0::Client>>>
    services;

  Option<ContainerID> nodeContainerId;
  Option<ContainerID> controllerContainerId;
  Option<csi::v0::GetPluginInfoResponse> pluginInfo;
  csi::v0::PluginCapabilities pluginCapabilities;
  csi::v0::ControllerCapabilities controllerCapabilities;
  csi::v0::NodeCapabilities nodeCapabilities;
  Option<std::string> nodeId;

  // We maintain the following invariant: if one operation depends on
  // another, they cannot be in PENDING state at the same time, i.e.,
  // the result of the preceding operation must have been reflected in
  // the total resources.
  //
  // NOTE: We store the list of operations in a `LinkedHashMap` to
  // preserve the order we receive the operations in case we need it.
  LinkedHashMap<id::UUID, Operation> operations;
  Resources totalResources;
  id::UUID resourceVersion;

  // Per-volume data keyed by a string.
  // NOTE(review): The key is presumably the CSI volume ID that the volume
  // functions above take as `volumeId`; confirm in the implementation file.
  hashmap<std::string, VolumeData> volumes;

  // If pending, it means that the storage pools are being reconciled, and all
  // incoming operations that disallow reconciliation will be dropped.
  process::Future<Nothing> reconciled;

  // We maintain a sequence to coordinate reconciliations of storage pools. It
  // keeps track of pending operations that disallow reconciliation, and ensures
  // that any reconciliation waits for these operations to finish.
  process::Sequence sequence;

  // Metrics exposed by this resource provider.
  // NOTE(review): `prefix` is presumably prepended to every metric name, and
  // the constructor/destructor presumably add/remove the metrics; confirm in
  // the implementation file.
  struct Metrics
  {
    explicit Metrics(const std::string& prefix);
    ~Metrics();

    // CSI plugin metrics.
    process::metrics::Counter csi_plugin_container_terminations;
    hashmap<csi::v0::RPC, process::metrics::PushGauge> csi_plugin_rpcs_pending;
    hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_successes;
    hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_errors;
    hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_cancelled;

    // Operation state metrics.
    hashmap<Offer::Operation::Type, process::metrics::PushGauge>
      operations_pending;
    hashmap<Offer::Operation::Type, process::metrics::Counter>
      operations_finished;
    hashmap<Offer::Operation::Type, process::metrics::Counter>
      operations_failed;
    hashmap<Offer::Operation::Type, process::metrics::Counter>
      operations_dropped;
  } metrics;
};

} // namespace internal {
} // namespace mesos {

#endif // __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
