// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <dlfcn.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <arpa/inet.h>

#include <iostream>
#include <memory>
#include <queue>
#include <string>
#include <sstream>
#include <tuple>

#include <mesos/v1/mesos.hpp>
#include <mesos/v1/scheduler.hpp>

#include <mesos/master/detector.hpp>

#include <process/async.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/http.hpp>
#include <process/id.hpp>
#include <process/mutex.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>

#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/flags.hpp>
#include <stout/hashmap.hpp>
#include <stout/ip.hpp>
#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/recordio.hpp>
#include <stout/unreachable.hpp>
#include <stout/uuid.hpp>

#include "common/http.hpp"
#include "common/recordio.hpp"

#include "internal/devolve.hpp"
#include "internal/evolve.hpp"

#include "local/local.hpp"

#include "logging/logging.hpp"

#include "master/validation.hpp"

#include "messages/messages.hpp"

#include "scheduler/flags.hpp"

using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::master;

using namespace process;

using std::get;
using std::ostream;
using std::queue;
using std::shared_ptr;
using std::string;
using std::tuple;
using std::vector;

using mesos::internal::recordio::Reader;

using mesos::master::detector::MasterDetector;

using process::collect;
using process::Owned;
using process::wait; // Necessary on some OS's to disambiguate.

using process::http::Connection;
using process::http::Pipe;
using process::http::post;
using process::http::Request;
using process::http::Response;
using process::http::URL;

using ::recordio::Decoder;

namespace mesos {
namespace v1 {
namespace scheduler {

struct Connections
{
  bool operator==(const Connections& that) const
  {
    return subscribe == that.subscribe && nonSubscribe == that.nonSubscribe;
  }

  Connection subscribe; // Used for subscribe call/response.
  Connection nonSubscribe; // Used for all other calls/responses.
};


// The process (below) is responsible for sending/receiving HTTP messages
// to/from the master.
class MesosProcess : public ProtobufProcess<MesosProcess>
{
public:
  MesosProcess(
      const string& master,
      ContentType _contentType,
      const lambda::function<void()>& connected,
      const lambda::function<void()>& disconnected,
      const lambda::function<void(const queue<Event>&)>& received,
      const Option<Credential>& _credential,
      const Option<shared_ptr<MasterDetector>>& _detector,
      const Flags& _flags)
    : ProcessBase(ID::generate("scheduler")),
      state(DISCONNECTED),
      contentType(_contentType),
      callbacks {connected, disconnected, received},
      credential(_credential),
      local(false),
      flags(_flags)
  {
    GOOGLE_PROTOBUF_VERIFY_VERSION;

    // Initialize libprocess (done here since at some point we might
    // want to use flags to initialize libprocess).
    process::initialize();

    if (self().address.ip.isLoopback()) {
      LOG(WARNING) << "\n**************************************************\n"
                   << "Scheduler driver bound to loopback interface!"
                   << " Cannot communicate with remote master(s)."
                   << " You might want to set 'LIBPROCESS_IP' environment"
                   << " variable to use a routable IP address.\n"
                   << "**************************************************";
    }

    // Initialize logging.
    if (flags.initialize_driver_logging) {
      logging::initialize("mesos", flags);
    } else {
      VLOG(1) << "Disabling initialization of GLOG logging";
    }

    LOG(INFO) << "Version: " << MESOS_VERSION;

    // Launch a local cluster if necessary.
    Option<UPID> pid = None();
    if (master == "local") {
      pid = local::launch(flags);
      local = true;
    }

    if (_detector.isNone()) {
      Try<MasterDetector*> create =
        MasterDetector::create(pid.isSome() ? string(pid.get()) : master);

      if (create.isError()) {
        EXIT(EXIT_FAILURE)
          << "Failed to create a master detector: " << create.error();
      }

      detector.reset(create.get());
    } else {
      detector = _detector.get();
    }
  }

  virtual ~MesosProcess()
  {
    disconnect();

    // Check and see if we need to shutdown a local cluster.
    if (local) {
      local::shutdown();
    }

    // Note that we ignore any callbacks that are enqueued.
  }

  void send(const Call& call)
  {
    Option<Error> error = validation::scheduler::call::validate(devolve(call));

    if (error.isSome()) {
      drop(call, error->message);
      return;
    }

    if (call.type() == Call::SUBSCRIBE && state != CONNECTED) {
      // It might be possible that the scheduler is retrying. We drop the
      // request if we have an ongoing subscribe request in flight or if the
      // scheduler is already subscribed.
      drop(call, "Scheduler is in state " + stringify(state));
      return;
    }

    if (call.type() != Call::SUBSCRIBE && state != SUBSCRIBED) {
      // We drop all non-subscribe calls if we are not currently subscribed.
      drop(call, "Scheduler is in state " + stringify(state));
      return;
    }

    VLOG(1) << "Sending " << call.type() << " call to " << master.get();

    // TODO(vinod): Add support for sending MESSAGE calls directly
    // to the slave, instead of relaying it through the master, as
    // the scheduler driver does.

    ::Request request;
    request.method = "POST";
    request.url = master.get();
    request.body = serialize(contentType, call);
    request.keepAlive = true;
    request.headers = {{"Accept", stringify(contentType)},
                       {"Content-Type", stringify(contentType)}};

    // TODO(anand): Add support for other authentication schemes.

    if (credential.isSome()) {
      request.headers["Authorization"] =
        "Basic " +
        base64::encode(credential->principal() + ":" + credential->secret());
    }

    CHECK_SOME(connections);

    Future<Response> response;
    if (call.type() == Call::SUBSCRIBE) {
      state = SUBSCRIBING;

      // Send a streaming request for Subscribe call.
      response = connections->subscribe.send(request, true);
    } else {
      CHECK_SOME(streamId);

      // Set the stream ID associated with this connection.
      request.headers["Mesos-Stream-Id"] = streamId->toString();

      response = connections->nonSubscribe.send(request);
    }

    CHECK_SOME(connectionId);
    response.onAny(defer(self(),
                         &Self::_send,
                         connectionId.get(),
                         call,
                         lambda::_1));
  }

  void reconnect()
  {
    // Ignore the reconnection request if we are currently disconnected
    // from the master.
    if (state == DISCONNECTED) {
      VLOG(1) << "Ignoring reconnect request from scheduler since we are"
              << " disconnected";

      return;
    }

    CHECK_SOME(connectionId);

    disconnected(connectionId.get(),
                 "Received reconnect request from scheduler");
  }

protected:
  virtual void initialize()
  {
    // Start detecting masters.
    detection = detector->detect()
      .onAny(defer(self(), &MesosProcess::detected, lambda::_1));
  }

  void connect(const UUID& _connectionId)
  {
    // It is possible that a new master was detected while we were waiting
    // to establish a connection with the old master.
    if (connectionId != _connectionId) {
      VLOG(1) << "Ignoring connection attempt from stale connection";
      return;
    }

    CHECK_EQ(DISCONNECTED, state);
    CHECK_SOME(master);

    state = CONNECTING;

    auto connector = [this]() -> Future<Connection> {
      return process::http::connect(master.get());
    };

    // We create two persistent connections here, one for subscribe
    // call/streaming response and another for non-subscribe calls/responses.
    collect(connector(), connector())
      .onAny(defer(self(), &Self::connected, connectionId.get(), lambda::_1));
  }

  void connected(
      const UUID& _connectionId,
      const Future<tuple<Connection, Connection>>& _connections)
  {
    // It is possible that a new master was detected while we had an ongoing
    // (re-)connection attempt with the old master.
    if (connectionId != _connectionId) {
      VLOG(1) << "Ignoring connection attempt from stale connection";
      return;
    }

    CHECK_EQ(CONNECTING, state);
    CHECK_SOME(connectionId);

    if (!_connections.isReady()) {
      disconnected(connectionId.get(),
                   _connections.isFailed()
                     ? _connections.failure()
                     : "Connection future discarded");
      return;
    }

    VLOG(1) << "Connected with the master at " << master.get();

    state = CONNECTED;

    connections =
      Connections {get<0>(_connections.get()), get<1>(_connections.get())};

    connections->subscribe.disconnected()
      .onAny(defer(self(),
                   &Self::disconnected,
                   connectionId.get(),
                   "Subscribe connection interrupted"));

    connections->nonSubscribe.disconnected()
      .onAny(defer(self(),
                   &Self::disconnected,
                   connectionId.get(),
                   "Non-subscribe connection interrupted"));

    // Invoke the connected callback once we have established both subscribe
    // and non-subscribe connections with the master.
    mutex.lock()
      .then(defer(self(), [this]() {
        return async(callbacks.connected);
      }))
      .onAny(lambda::bind(&Mutex::unlock, mutex));
  }

  void disconnected(
      const UUID& _connectionId,
      const string& failure)
  {
    // Ignore if the disconnection happened from an old stale connection.
    if (connectionId != _connectionId) {
      VLOG(1) << "Ignoring disconnection attempt from stale connection";
      return;
    }

    // We can reach here if we noticed a disconnection for either of
    // subscribe/non-subscribe connections. We discard the future here to
    // trigger a master re-detection.
    detection.discard();
  }

  void disconnect()
  {
    if (connections.isSome()) {
      connections->subscribe.disconnect();
      connections->nonSubscribe.disconnect();
    }

    if (subscribed.isSome()) {
      subscribed->reader.close();
    }

    state = DISCONNECTED;

    connections = None();
    connectionId = None();
    subscribed = None();
  }

  void detected(const Future<Option<mesos::MasterInfo>>& future)
  {
    if (future.isFailed()) {
      error("Failed to detect a master: " + future.failure());
      return;
    }

    if (state == CONNECTED || state == SUBSCRIBING || state == SUBSCRIBED) {
      // Invoke the disconnected callback if we were previously connected.
      mutex.lock()
        .then(defer(self(), [this]() {
          return async(callbacks.disconnected);
        }))
      .onAny(lambda::bind(&Mutex::unlock, mutex));
    }

    // Disconnect any active connections.
    disconnect();

    Option<mesos::MasterInfo> latest;
    if (future.isDiscarded()) {
      LOG(INFO) << "Re-detecting master";
      master = None();
      latest = None();
    } else if (future->isNone()) {
      LOG(INFO) << "Lost leading master";
      master = None();
      latest = None();
    } else {
      const UPID& upid = future.get().get().pid();
      latest = future.get();

      string scheme = "http";

#ifdef USE_SSL_SOCKET
      Option<string> value;

      value = os::getenv("SSL_ENABLED");
      if (value.isSome() && (value.get() == "1" || value.get() == "true")) {
        scheme = "https";
      }
#endif

      master = ::URL(
        scheme,
        upid.address.ip,
        upid.address.port,
        upid.id +
        "/api/v1/scheduler");

      LOG(INFO) << "New master detected at " << upid;

      connectionId = UUID::random();

      // Wait for a random duration between 0 and `flags.connectionDelayMax`
      // before (re-)connecting with the master.
      Duration delay = flags.connectionDelayMax * ((double) os::random()
                       / RAND_MAX);

      VLOG(1) << "Waiting for " << delay << " before initiating a "
              << "re-(connection) attempt with the master";

      process::delay(delay, self(), &MesosProcess::connect, connectionId.get());
    }

    // Keep detecting masters.
    detection = detector->detect(latest)
      .onAny(defer(self(), &MesosProcess::detected, lambda::_1));
  }

  Future<Nothing> _receive()
  {
    Future<Nothing> future = async(callbacks.received, events);
    events = queue<Event>();
    return future;
  }

  // Helper for injecting an ERROR event.
  void error(const string& message)
  {
    Event event;

    event.set_type(Event::ERROR);

    Event::Error* error = event.mutable_error();

    error->set_message(message);

    receive(event, true);
  }

  void drop(const Call& call, const string& message)
  {
    LOG(WARNING) << "Dropping " << call.type() << ": " << message;
  }

  void _send(
      const UUID& _connectionId,
      const Call& call,
      const Future<Response>& response)
  {
    // It is possible that we detected a new master before a response could
    // be received.
    if (connectionId != _connectionId) {
      return;
    }

    CHECK(!response.isDiscarded());
    CHECK(state == SUBSCRIBING || state == SUBSCRIBED) << state;

    // This can happen during a master failover or a network blip
    // causing the socket to timeout. Eventually, the scheduler would
    // detect the disconnection via ZK(disconnect()) or lack of heartbeats.
    if (response.isFailed()) {
      LOG(ERROR) << "Request for call type " << call.type() << " failed: "
                 << response.failure();
      return;
    }

    if (response->code == process::http::Status::OK) {
      // Only SUBSCRIBE call should get a "200 OK" response.
      CHECK_EQ(Call::SUBSCRIBE, call.type());
      CHECK_EQ(response->type, http::Response::PIPE);
      CHECK_SOME(response->reader);

      state = SUBSCRIBED;

      Pipe::Reader reader = response->reader.get();

      auto deserializer =
        lambda::bind(deserialize<Event>, contentType, lambda::_1);

      Owned<Reader<Event>> decoder(
          new Reader<Event>(Decoder<Event>(deserializer), reader));

      subscribed = SubscribedResponse {reader, decoder};

      // Responses to SUBSCRIBE calls should always include a stream ID.
      CHECK(response->headers.contains("Mesos-Stream-Id"));

      Try<UUID> uuid =
        UUID::fromString(response->headers.at("Mesos-Stream-Id"));

      CHECK_SOME(uuid);

      streamId = uuid.get();

      read();

      return;
    }

    if (response->code == process::http::Status::ACCEPTED) {
      // Only non SUBSCRIBE calls should get a "202 Accepted" response.
      CHECK_NE(Call::SUBSCRIBE, call.type());
      return;
    }

    // We reset the state to connected if the subscribe call did not
    // succceed (e.g., the master was still recovering). The scheduler can
    // then retry the subscribe call.
    if (call.type() == Call::SUBSCRIBE) {
      state = CONNECTED;
    }

    if (response->code == process::http::Status::SERVICE_UNAVAILABLE) {
      // This could happen if the master hasn't realized it is the leader yet
      // or is still in the process of recovery.
      LOG(WARNING) << "Received '" << response->status << "' ("
                   << response->body << ") for " << call.type();
      return;
    }

    if (response->code == process::http::Status::NOT_FOUND) {
      // This could happen if the master libprocess process has not yet set up
      // HTTP routes.
      LOG(WARNING) << "Received '" << response->status << "' ("
                   << response->body << ") for " << call.type();
      return;
    }

    if (response->code == process::http::Status::TEMPORARY_REDIRECT) {
      // This could happen if the detector detects a new leading master before
      // master itself realizes it (e.g., ZK watch delay).
      LOG(WARNING) << "Received '" << response->status << "' ("
                   << response->body << ") for " << call.type();
      return;
    }

    // We should be able to get here only for AuthN errors which is not
    // yet supported for HTTP frameworks.
    error("Received unexpected '" + response->status + "' (" +
          response->body + ") for " + stringify(call.type()));
  }

  void read()
  {
    subscribed->decoder->read()
      .onAny(defer(self(),
                   &Self::_read,
                   subscribed->reader,
                   lambda::_1));
  }

  void _read(const Pipe::Reader& reader, const Future<Result<Event>>& event)
  {
    CHECK(!event.isDiscarded());

    // Ignore enqueued events from the previous Subscribe call reader.
    if (!subscribed.isSome() || subscribed->reader != reader) {
      VLOG(1) << "Ignoring event from old stale connection";
      return;
    }

    CHECK_EQ(SUBSCRIBED, state);
    CHECK_SOME(connectionId);

    // This could happen if the master failed over while sending a event.
    if (event.isFailed()) {
      LOG(ERROR) << "Failed to decode the stream of events: "
                 << event.failure();
      disconnected(connectionId.get(), event.failure());
      return;
    }

    // This could happen if the master failed over after sending an event.
    if (!event->isSome()) {
      const string error = "End-Of-File received from master. The master "
                           "closed the event stream";
      LOG(ERROR) << error;

      disconnected(connectionId.get(), error);
      return;
    }

    if (event->isError()) {
      error("Failed to de-serialize event: " + event->error());
    } else {
      receive(event.get().get(), false);
    }

    read();
  }

  void receive(const Event& event, bool isLocallyInjected)
  {
    // Check if we're are no longer subscribed but received an event.
    if (!isLocallyInjected && state != SUBSCRIBED) {
      LOG(WARNING) << "Ignoring " << stringify(event.type())
                   << " event because we're no longer subscribed";
      return;
    }

    if (isLocallyInjected) {
      VLOG(1) << "Enqueuing locally injected event " << stringify(event.type());
    } else {
      VLOG(1) << "Enqueuing event " << stringify(event.type()) << " received"
              << " from " << master.get();
    }

    // Queue up the event and invoke the 'received' callback if this
    // is the first event (between now and when the 'received'
    // callback actually gets invoked more events might get queued).
    events.push(event);

    if (events.size() == 1) {
      mutex.lock()
        .then(defer(self(), &Self::_receive))
        .onAny(lambda::bind(&Mutex::unlock, mutex));
    }
  }

private:
  struct Callbacks
  {
    lambda::function<void(void)> connected;
    lambda::function<void(void)> disconnected;
    lambda::function<void(const queue<Event>&)> received;
  };

  struct SubscribedResponse
  {
    Pipe::Reader reader;
    process::Owned<Reader<Event>> decoder;
  };

  enum State
  {
    DISCONNECTED, // Either of subscribe/non-subscribe connection is broken.
    CONNECTING, // Trying to establish subscribe and non-subscribe connections.
    CONNECTED, // Established subscribe and non-subscribe connections.
    SUBSCRIBING, // Trying to subscribe with the master.
    SUBSCRIBED // Subscribed with the master.
  } state;

  friend ostream& operator<<(ostream& stream, State state)
  {
    switch (state) {
      case DISCONNECTED: return stream << "DISCONNECTED";
      case CONNECTING:   return stream << "CONNECTING";
      case CONNECTED:    return stream << "CONNECTED";
      case SUBSCRIBING:  return stream << "SUBSCRIBING";
      case SUBSCRIBED:   return stream << "SUBSCRIBED";
    }

    UNREACHABLE();
  }

  // There can be multiple simulataneous ongoing (re-)connection attempts with
  // the master (e.g., the master failed over while an attempt was in progress).
  // This helps us in uniquely identifying the current connection instance and
  // ignoring the stale instance.
  Option<UUID> connectionId; // UUID to identify the connection instance.

  Option<Connections> connections;
  Option<SubscribedResponse> subscribed;
  ContentType contentType;
  Callbacks callbacks;
  const Option<Credential> credential;
  Mutex mutex; // Used to serialize the callback invocations.
  bool local; // Whether or not we launched a local cluster.
  shared_ptr<MasterDetector> detector;
  queue<Event> events;
  Option<::URL> master;
  Option<UUID> streamId;
  const Flags flags;

  // Master detection future.
  process::Future<Option<mesos::MasterInfo>> detection;
};


Mesos::Mesos(
    const string& master,
    ContentType contentType,
    const lambda::function<void()>& connected,
    const lambda::function<void()>& disconnected,
    const lambda::function<void(const queue<Event>&)>& received,
    const Option<Credential>& credential,
    const Option<shared_ptr<MasterDetector>>& detector)
{
  Flags flags;

  Try<flags::Warnings> load = flags.load("MESOS_");

  if (load.isError()) {
    EXIT(EXIT_FAILURE) << "Failed to load flags: " << load.error();
  }

  // Log any flag warnings (after logging is initialized).
  foreach (const flags::Warning& warning, load->warnings) {
    LOG(WARNING) << warning.message;
  }

  process = new MesosProcess(
      master,
      contentType,
      connected,
      disconnected,
      received,
      credential,
      detector,
      flags);

  spawn(process);
}


Mesos::Mesos(
    const string& master,
    ContentType contentType,
    const lambda::function<void()>& connected,
    const lambda::function<void()>& disconnected,
    const lambda::function<void(const queue<Event>&)>& received,
    const Option<Credential>& credential)
  : Mesos(master,
          contentType,
          connected,
          disconnected,
          received,
          credential,
          None()) {}


Mesos::~Mesos()
{
  if (process != nullptr) {
    stop();
  }
}


void Mesos::send(const Call& call)
{
  dispatch(process, &MesosProcess::send, call);
}


void Mesos::reconnect()
{
  dispatch(process, &MesosProcess::reconnect);
}


void Mesos::stop()
{
  if (process != nullptr) {
    terminate(process);
    wait(process);

    delete process;
    process = nullptr;
  }
}

} // namespace scheduler {
} // namespace v1 {
} // namespace mesos {
