// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <iostream>
#include <queue>
#include <string>

#include <mesos/v1/mesos.hpp>
#include <mesos/v1/resources.hpp>
#include <mesos/v1/scheduler.hpp>

#include <process/delay.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>

#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/exit.hpp>
#include <stout/flags.hpp>
#include <stout/foreach.hpp>
#include <stout/lambda.hpp>
#include <stout/none.hpp>
#include <stout/numify.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/uuid.hpp>

#include "common/status_utils.hpp"

#include "logging/flags.hpp"
#include "logging/logging.hpp"

using namespace mesos::v1;

using std::cerr;
using std::cout;
using std::endl;
using std::queue;
using std::string;
using std::vector;

using mesos::v1::scheduler::Call;
using mesos::v1::scheduler::Event;

constexpr Seconds REFUSE_TIME = Seconds::max();

constexpr char FRAMEWORK_NAME[] =
  "CSI User Event Call Scheduler using libprocess (C++)";


/**
 * Example framework performing CSI-related operations.
 *
 * This class implements a framework communicating with Mesos over the v1 HTTP
 * API which transform certain disk resources using CSI operations.
 *
 * It expects to be offered disk resources from resource providers reserved for
 * its role (which likely should not be shared by any other framework in the
 * cluster).  It will then convert `RAW` disk resources into `MOUNT` volumes and
 * unreserve these `MOUNT` resources when they are offered again so that
 * frameworks in other roles can consume the created disk resource.
 *
 * The bulk of the framework logic is implemented in `resourceOffers`.
 */
class HTTPScheduler : public process::Process<HTTPScheduler>
{
public:
  HTTPScheduler(
      const FrameworkInfo& _framework,
      const string& _master)
    : framework(_framework),
      master(_master),
      state(INITIALIZING) {}

  HTTPScheduler(
      const FrameworkInfo& _framework,
      const string& _master,
      const Credential& credential)
    : framework(_framework),
      master(_master),
      state(INITIALIZING) {}

  ~HTTPScheduler() {}

  void connected()
  {
    doReliableRegistration();
  }

  void disconnected()
  {
    state = DISCONNECTED;
  }

  void received(queue<Event> events)
  {
    while (!events.empty()) {
      Event event = events.front();
      events.pop();

      switch (event.type()) {
        case Event::SUBSCRIBED: {
          cout << endl << "Received a SUBSCRIBED event" << endl;

          framework.mutable_id()->CopyFrom(event.subscribed().framework_id());
          state = SUBSCRIBED;

          cout << "Subscribed with ID " << framework.id() << endl;
          break;
        }

        case Event::OFFERS: {
          cout << endl << "Received an OFFERS event" << endl;
          resourceOffers(google::protobuf::convert(event.offers().offers()));
          break;
        }

        case Event::INVERSE_OFFERS: {
          cout << endl << "Received an INVERSE_OFFERS event" << endl;
          break;
        }

        case Event::RESCIND: {
          cout << endl << "Received a RESCIND event" << endl;
          break;
        }

        case Event::RESCIND_INVERSE_OFFER: {
          cout << endl << "Received a RESCIND_INVERSE_OFFER event" << endl;
          break;
        }

        case Event::UPDATE: {
          cout << endl << "Received an UPDATE event" << endl;
          cout << "Unexpected UPDATE event: " << event.update().DebugString()
               << endl;
          process::terminate(self());
          break;
        }

        // TODO(greggomann): Implement handling of offer operation updates.
        case Event::UPDATE_OPERATION_STATUS:
          break;

        case Event::MESSAGE: {
          cout << endl << "Received a MESSAGE event" << endl;
          break;
        }

        case Event::FAILURE: {
          cout << endl << "Received a FAILURE event" << endl;

          if (event.failure().has_agent_id()) {
            // Agent failed.
            cout << "Agent '" << event.failure().agent_id().value()
                 << "' terminated" << endl;
          } else {
            cout << "Unexpected FAILURE event: "
                 << event.failure().DebugString() << endl;
            process::terminate(self());
          }
          break;
        }

        case Event::ERROR: {
          cout << endl << "Received an ERROR event: "
               << event.error().message() << endl;
          process::terminate(self());
          break;
        }

        case Event::HEARTBEAT: {
          cout << endl << "Received a HEARTBEAT event" << endl;
          break;
        }

        case Event::UNKNOWN: {
          LOG(WARNING) << "Received an UNKNOWN event and ignored";
          break;
        }
      }
    }
  }

protected:
  virtual void initialize()
  {
    // We initialize the library here to ensure that callbacks are only invoked
    // after the process has spawned.
    mesos.reset(
        new scheduler::Mesos(
            master,
            mesos::ContentType::PROTOBUF,
            process::defer(self(), &Self::connected),
            process::defer(self(), &Self::disconnected),
            process::defer(self(), &Self::received, lambda::_1),
            None()));
  }

private:
  void resourceOffers(const vector<Offer>& offers)
  {
    foreach (const Offer& offer, offers) {
      cout << "Received offer " << offer.id() << " with "
           << Resources(offer.resources()) << endl;

      // Filter resources and group interesting resources by resource provider.
      //
      // NOTE: We introduce a typedef here so we can use this type
      // with the `foreachvalue` preprocessor macro below.
      using ResourcesByType =
        hashmap<Resource::DiskInfo::Source::Type, Resources>;

      hashmap<ResourceProviderID, ResourcesByType> resourcesByProvider;

      constexpr Resource::DiskInfo::Source::Type RAW =
        Resource::DiskInfo::Source::RAW;

      constexpr Resource::DiskInfo::Source::Type MOUNT =
        Resource::DiskInfo::Source::MOUNT;

      foreach(const Resource& resource, offer.resources()) {
        // Ignore any resources not from a resource provider.
        if (!resource.has_provider_id()) {
          continue;
        }

        // We only convert reserved resources.
        if (!Resources::isReserved(resource)) {
          continue;
        }

        // We either work on `RAW` or `MOUNT` disk resources. Store
        // them by type and ignore any other resources.
        if (Resources::isDisk(resource, RAW)) {
          resourcesByProvider[resource.provider_id()][RAW] += resource;
        } else if (Resources::isDisk(resource, MOUNT)) {
          resourcesByProvider[resource.provider_id()][MOUNT] += resource;
        }
      }

      // Create operations.

      // Create a single call which is either an accept and contains
      // at least one operation, or a decline.
      Call call;
      CHECK(framework.has_id());
      call.mutable_framework_id()->CopyFrom(framework.id());

      // We create operations on resources from each resource provider
      // separately as most operations currently cannot operate on multiple
      // resource providers at once (`LAUNCH` being the obvious exception).
      foreachvalue (
          const ResourcesByType& resourcesByType,
          resourcesByProvider) {
        // Iterate over disk resources grouped by disk type since the
        // performed operation depends on the type.
        foreachpair (
            const Resource::DiskInfo::Source::Type& type,
            const Resources& resources,
            resourcesByType) {
          call.set_type(Call::ACCEPT);
          Call::Accept* accept = call.mutable_accept();
          if (accept->offer_ids().empty()) {
            accept->add_offer_ids()->CopyFrom(offer.id());
          }

          if (type == RAW) {
            // We create `MOUNT` volumes out of `RAW` disk resources.
            foreach (const Resource& resource, resources) {
              cout << "Converting 'RAW' disk to 'MOUNT' disk" << endl;

              Offer::Operation* operation = accept->add_operations();
              operation->set_type(Offer::Operation::CREATE_VOLUME);

              Offer::Operation::CreateVolume* create_volume =
                operation->mutable_create_volume();

              create_volume->mutable_source()->CopyFrom(resource);
              create_volume->set_target_type(
                  Resource::DiskInfo::Source::MOUNT);
            }
          } else if (type == MOUNT) {
            // We unreserve `MOUNT` disk resources so they can be
            // consumed by frameworks in other roles.
            foreach (const Resource& resource, resources) {
              cout << "Unreserving 'MOUNT' disk" << endl;

              Offer::Operation* operation = accept->add_operations();
              operation->set_type(Offer::Operation::UNRESERVE);

              Offer::Operation::Unreserve* unreserve =
                operation->mutable_unreserve();

              unreserve->add_resources()->CopyFrom(resource);
            }
          }
        }
      }

      // If we did not create operations to accept the offer with decline it.
      if (!call.has_accept() || call.accept().operations().empty()) {
        cout << "Declining offer" << endl;

        call.clear_accept();
        call.set_type(Call::DECLINE);
        Call::Decline* decline = call.mutable_decline();
        decline->add_offer_ids()->CopyFrom(offer.id());
        decline->mutable_filters()->set_refuse_seconds(REFUSE_TIME.secs());
      }

      mesos->send(call);
    }
  }

  void doReliableRegistration()
  {
    if (state == SUBSCRIBED) {
      return;
    }

    Call call;
    if (framework.has_id()) {
      call.mutable_framework_id()->CopyFrom(framework.id());
    }
    call.set_type(Call::SUBSCRIBE);

    Call::Subscribe* subscribe = call.mutable_subscribe();
    subscribe->mutable_framework_info()->CopyFrom(framework);

    mesos->send(call);

    process::delay(Seconds(1), self(), &Self::doReliableRegistration);
  }

  FrameworkInfo framework;
  string master;
  process::Owned<scheduler::Mesos> mesos;

  enum State
  {
    INITIALIZING = 0,
    SUBSCRIBED = 1,
    DISCONNECTED = 2
  } state;
};


class Flags : public virtual mesos::internal::logging::Flags
{
public:
  Flags()
  {
    add(&Flags::role, "role", "Role to use when registering", "*");

    add(&Flags::master, "master", "ip:port of master to connect");

    add(&Flags::checkpoint,
        "checkpoint",
        "Whether this framework should be checkpointed.",
        false);

    add(&Flags::principal,
        "principal",
        "Authentication principal of the framework");
  }

  string role;
  string master;
  string principal;
  bool checkpoint;
};


int main(int argc, char** argv)
{
  Flags flags;

  Try<flags::Warnings> load = flags.load(None(), argc, argv);

  if (flags.help) {
    cout << flags.usage() << endl;
    return EXIT_SUCCESS;
  }

  if (load.isError()) {
    cerr << flags.usage(load.error()) << endl;
    return EXIT_FAILURE;
  }

  mesos::internal::logging::initialize(argv[0], true, flags); // Catch signals.

  // Log any flag warnings.
  foreach (const flags::Warning& warning, load->warnings) {
    LOG(WARNING) << warning.message;
  }

  FrameworkInfo framework;
  framework.set_name(FRAMEWORK_NAME);
  framework.add_roles(flags.role);
  framework.add_capabilities()->set_type(
      FrameworkInfo::Capability::MULTI_ROLE);
  framework.add_capabilities()->set_type(
      FrameworkInfo::Capability::RESERVATION_REFINEMENT);

  const Result<string> user = os::user();

  CHECK_SOME(user);
  framework.set_user(user.get());
  framework.set_checkpoint(flags.checkpoint);
  framework.set_principal(flags.principal);

  process::Owned<HTTPScheduler> scheduler(
      new HTTPScheduler(framework, flags.master));

  process::spawn(scheduler.get());
  process::wait(scheduler.get());

  return EXIT_SUCCESS;
}
