/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdint.h>

#include <set>

#include <mesos/mesos.hpp>

#include <process/owned.hpp>
#include <process/pid.hpp>

#include <stout/check.hpp>
#include <stout/exit.hpp>
#include <stout/flags.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/stringify.hpp>
#include <stout/strings.hpp>
#include <stout/try.hpp>

#include "authorizer/authorizer.hpp"

#include "common/build.hpp"
#include "common/protobuf_utils.hpp"

#include "logging/flags.hpp"
#include "logging/logging.hpp"

#include "master/allocator.hpp"
#include "master/contender.hpp"
#include "master/detector.hpp"
#include "master/drf_sorter.hpp"
#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
#include "master/registrar.hpp"
#include "master/repairer.hpp"

#include "state/in_memory.hpp"
#include "state/log.hpp"
#include "state/protobuf.hpp"
#include "state/storage.hpp"


#include "zookeeper/detector.hpp"

using namespace mesos::internal;
using namespace mesos::internal::log;
using namespace mesos::internal::master;
using namespace zookeeper;

using mesos::MasterInfo;

using process::Owned;
using process::UPID;

using std::cerr;
using std::cout;
using std::endl;
using std::set;
using std::string;


void usage(const char* argv0, const flags::FlagsBase& flags)
{
  cerr << "Usage: " << os::basename(argv0).get() << " [...]" << endl
       << endl
       << "Supported options:" << endl
       << flags.usage();
}


void version()
{
  cout << "mesos" << " " << MESOS_VERSION << endl;
}


int main(int argc, char** argv)
{
  GOOGLE_PROTOBUF_VERIFY_VERSION;

  master::Flags flags;

  // The following flags are executable specific (e.g., since we only
  // have one instance of libprocess per execution, we only want to
  // advertise the IP and port option once, here).
  Option<string> ip;
  flags.add(&ip, "ip", "IP address to listen on");

  uint16_t port;
  flags.add(&port, "port", "Port to listen on", MasterInfo().port());

  Option<string> zk;
  flags.add(&zk,
            "zk",
            "ZooKeeper URL (used for leader election amongst masters)\n"
            "May be one of:\n"
            "  zk://host1:port1,host2:port2,.../path\n"
            "  zk://username:password@host1:port1,host2:port2,.../path\n"
            "  file:///path/to/file (where file contains one of the above)");

  bool help;
  flags.add(&help,
            "help",
            "Prints this help message",
            false);

  Try<Nothing> load = flags.load("MESOS_", argc, argv);

  if (load.isError()) {
    cerr << load.error() << endl;
    usage(argv[0], flags);
    exit(1);
  }

  if (flags.version) {
    version();
    exit(0);
  }

  if (help) {
    usage(argv[0], flags);
    exit(1);
  }

  // Initialize libprocess.
  if (ip.isSome()) {
    os::setenv("LIBPROCESS_IP", ip.get());
  }

  os::setenv("LIBPROCESS_PORT", stringify(port));

  process::initialize("master");

  logging::initialize(argv[0], flags, true); // Catch signals.

  LOG(INFO) << "Build: " << build::DATE << " by " << build::USER;

  LOG(INFO) << "Version: " << MESOS_VERSION;

  if (build::GIT_TAG.isSome()) {
    LOG(INFO) << "Git tag: " << build::GIT_TAG.get();
  }

  if (build::GIT_SHA.isSome()) {
    LOG(INFO) << "Git SHA: " << build::GIT_SHA.get();
  }

  allocator::AllocatorProcess* allocatorProcess =
    new allocator::HierarchicalDRFAllocatorProcess();
  allocator::Allocator* allocator =
    new allocator::Allocator(allocatorProcess);

  state::Storage* storage = NULL;
  Log* log = NULL;

  if (flags.registry == "in_memory") {
    if (flags.registry_strict) {
      EXIT(1) << "Cannot use '--registry_strict' when using in-memory storage"
              << " based registry";
    }
    storage = new state::InMemoryStorage();
  } else if (flags.registry == "replicated_log" ||
             flags.registry == "log_storage") {
    // TODO(bmahler): "log_storage" is present for backwards
    // compatibility, can be removed before 0.19.0.
    if (flags.work_dir.isNone()) {
      EXIT(1) << "--work_dir needed for replicated log based registry";
    }

    Try<Nothing> mkdir = os::mkdir(flags.work_dir.get());
    if (mkdir.isError()) {
      EXIT(1) << "Failed to create work directory '" << flags.work_dir.get()
              << "': " << mkdir.error();
    }

    if (zk.isSome()) {
      // Use replicated log with ZooKeeper.
      if (flags.quorum.isNone()) {
        EXIT(1) << "Need to specify --quorum for replicated log based"
                << " registry when using ZooKeeper";
      }

      string zk_;
      if (strings::startsWith(zk.get(), "file://")) {
        const string& path = zk.get().substr(7);
        const Try<string> read = os::read(path);
        if (read.isError()) {
          EXIT(1) << "Failed to read from file at '" + path + "': "
                  << read.error();
        }
        zk_ = read.get();
      } else {
        zk_ = zk.get();
      }

      Try<URL> url = URL::parse(zk_);
      if (url.isError()) {
        EXIT(1) << "Error parsing ZooKeeper URL: " << url.error();
      }

      log = new Log(
          flags.quorum.get(),
          path::join(flags.work_dir.get(), "replicated_log"),
          url.get().servers,
          flags.zk_session_timeout,
          path::join(url.get().path, "log_replicas"),
          url.get().authentication,
          flags.log_auto_initialize);
    } else {
      // Use replicated log without ZooKeeper.
      log = new Log(
          1,
          path::join(flags.work_dir.get(), "replicated_log"),
          set<UPID>(),
          flags.log_auto_initialize);
    }
    storage = new state::LogStorage(log);
  } else {
    EXIT(1) << "'" << flags.registry << "' is not a supported"
            << " option for registry persistence";
  }

  CHECK_NOTNULL(storage);

  state::protobuf::State* state = new state::protobuf::State(storage);
  Registrar* registrar = new Registrar(flags, state);
  Repairer* repairer = new Repairer();

  Files files;

  MasterContender* contender;
  MasterDetector* detector;

  // TODO(vinod): 'MasterContender::create()' should take
  // Option<string>.
  Try<MasterContender*> contender_ = MasterContender::create(zk.get(""));
  if (contender_.isError()) {
    EXIT(1) << "Failed to create a master contender: " << contender_.error();
  }
  contender = contender_.get();

  // TODO(vinod): 'MasterDetector::create()' should take
  // Option<string>.
  Try<MasterDetector*> detector_ = MasterDetector::create(zk.get(""));
  if (detector_.isError()) {
    EXIT(1) << "Failed to create a master detector: " << detector_.error();
  }
  detector = detector_.get();

  Option<Authorizer*> authorizer = None();
  if (flags.acls.isSome()) {
    Try<Owned<Authorizer> > authorizer_ = Authorizer::create(flags.acls.get());
    if (authorizer_.isError()) {
      EXIT(1) << "Failed to initialize the authorizer: "
              << authorizer_.error() << " (see --acls flag)";
    }
    Owned<Authorizer> authorizer__ = authorizer_.get();
    authorizer = authorizer__.release();
  }

  LOG(INFO) << "Starting Mesos master";

  Master* master =
    new Master(
      allocator,
      registrar,
      repairer,
      &files,
      contender,
      detector,
      authorizer,
      flags);

  if (zk.isNone()) {
    // It means we are using the standalone detector so we need to
    // appoint this Master as the leader.
    dynamic_cast<StandaloneMasterDetector*>(detector)->appoint(master->info());
  }

  process::spawn(master);

  process::wait(master->self());
  delete master;
  delete allocator;
  delete allocatorProcess;

  delete registrar;
  delete repairer;
  delete state;
  delete storage;
  delete log;

  delete contender;
  delete detector;

  if (authorizer.isSome()) {
    delete authorizer.get();
  }

  return 0;
}
