// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "master/detector/zookeeper.hpp"

#include <set>
#include <string>

#include <mesos/master/detector.hpp>

#include <mesos/zookeeper/detector.hpp>
#include <mesos/zookeeper/group.hpp>
#include <mesos/zookeeper/url.hpp>

#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/id.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>

#include <stout/duration.hpp>
#include <stout/lambda.hpp>
#include <stout/protobuf.hpp>

#include "common/protobuf_utils.hpp"

#include "master/constants.hpp"

using namespace process;
using namespace zookeeper;

using std::set;
using std::string;

namespace mesos {
namespace master {
namespace detector {

const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT = Seconds(10);

class ZooKeeperMasterDetectorProcess
  : public Process<ZooKeeperMasterDetectorProcess>
{
public:
  explicit ZooKeeperMasterDetectorProcess(
      const zookeeper::URL& url,
      const Duration& sessionTimeout);

  explicit ZooKeeperMasterDetectorProcess(Owned<Group> group);
  ~ZooKeeperMasterDetectorProcess();

  virtual void initialize();
  Future<Option<MasterInfo>> detect(const Option<MasterInfo>& previous);

private:
  void discard(const Future<Option<MasterInfo>>& future);

  // Invoked when the group leadership has changed.
  void detected(const Future<Option<Group::Membership>>& leader);

  // Invoked when we have fetched the data associated with the leader.
  void fetched(
      const Group::Membership& membership,
      const Future<Option<string>>& data);

  Owned<Group> group;
  LeaderDetector detector;

  // The leading Master.
  Option<MasterInfo> leader;
  set<Promise<Option<MasterInfo>>*> promises;

  // Potential non-retryable error.
  Option<Error> error;
};


ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
    const zookeeper::URL& url,
    const Duration& sessionTimeout)
  : ZooKeeperMasterDetectorProcess(Owned<Group>(
    new Group(url.servers,
              sessionTimeout,
              url.path,
              url.authentication))) {}


ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
    Owned<Group> _group)
  : ProcessBase(ID::generate("zookeeper-master-detector")),
    group(_group),
    detector(group.get()),
    leader(None()) {}


ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
{
  discardPromises(&promises);
}


void ZooKeeperMasterDetectorProcess::initialize()
{
  detector.detect()
    .onAny(defer(self(), &Self::detected, lambda::_1));
}


void ZooKeeperMasterDetectorProcess::discard(
    const Future<Option<MasterInfo>>& future)
{
  // Discard the promise holding this future.
  discardPromises(&promises, future);
}


Future<Option<MasterInfo>> ZooKeeperMasterDetectorProcess::detect(
    const Option<MasterInfo>& previous)
{
  // Return immediately if the detector is no longer operational due
  // to a non-retryable error.
  if (error.isSome()) {
    return Failure(error.get().message);
  }

  if (leader != previous) {
    return leader;
  }

  Promise<Option<MasterInfo>>* promise = new Promise<Option<MasterInfo>>();

  promise->future()
    .onDiscard(defer(self(), &Self::discard, promise->future()));

  promises.insert(promise);
  return promise->future();
}


void ZooKeeperMasterDetectorProcess::detected(
    const Future<Option<Group::Membership>>& _leader)
{
  CHECK(!_leader.isDiscarded());

  if (_leader.isFailed()) {
    LOG(ERROR) << "Failed to detect the leader: " << _leader.failure();

    // Setting this error stops the detection loop and the detector
    // transitions to an erroneous state. Further calls to detect()
    // will directly fail as a result.
    error = Error(_leader.failure());
    leader = None();

    failPromises(&promises, _leader.failure());

    return;
  }

  if (_leader.get().isNone()) {
    leader = None();

    setPromises(&promises, leader);
  } else {
    // Fetch the data associated with the leader.
    group->data(_leader.get().get())
      .onAny(defer(self(), &Self::fetched, _leader.get().get(), lambda::_1));
  }

  // Keep trying to detect leadership changes.
  detector.detect(_leader.get())
    .onAny(defer(self(), &Self::detected, lambda::_1));
}


void ZooKeeperMasterDetectorProcess::fetched(
    const Group::Membership& membership,
    const Future<Option<string>>& data)
{
  CHECK(!data.isDiscarded());

  if (data.isFailed()) {
    leader = None();
    failPromises(&promises, data.failure());
    return;
  } else if (data.get().isNone()) {
    // Membership is gone before we can read its data.
    leader = None();
    setPromises(&promises, leader);
    return;
  }

  // Parse the data based on the membership label and cache the
  // leader for subsequent requests.
  Option<string> label = membership.label();
  if (label.isNone()) {
    // If we are here it means some masters are still creating znodes
    // with the old format.
    UPID pid = UPID(data.get().get());
    LOG(WARNING) << "Leading master " << pid << " has data in old format";
    leader = mesos::internal::protobuf::createMasterInfo(pid);
  } else if (label.isSome() &&
             label.get() == internal::master::MASTER_INFO_LABEL) {
    MasterInfo info;
    if (!info.ParseFromString(data.get().get())) {
      leader = None();
      failPromises(&promises,
          "Failed to parse data into MasterInfo");
      return;
    }
    LOG(WARNING) << "Leading master " << info.pid()
                 << " is using a Protobuf binary format when registering with "
                 << "ZooKeeper (" << label.get() << "): this will be deprecated"
                 << " as of Mesos 0.24 (see MESOS-2340)";
    leader = info;
  } else if (label.isSome() &&
             label.get() == internal::master::MASTER_INFO_JSON_LABEL) {
    Try<JSON::Object> object = JSON::parse<JSON::Object>(data.get().get());

    if (object.isError()) {
      leader = None();
      failPromises(
          &promises,
          "Failed to parse data into valid JSON: " + object.error());
      return;
    }

    Try<mesos::MasterInfo> info =
      ::protobuf::parse<mesos::MasterInfo>(object.get());

    if (info.isError()) {
      leader = None();
      failPromises(
          &promises,
          "Failed to parse JSON into a valid MasterInfo protocol buffer: " +
          info.error());
      return;
    }

    leader = info.get();
  } else {
    leader = None();
    failPromises(
        &promises,
        "Failed to parse data of unknown label '" + label.get() + "'");
    return;
  }

  LOG(INFO) << "A new leading master (UPID="
            << UPID(leader.get().pid()) << ") is detected";

  setPromises(&promises, leader);
}


ZooKeeperMasterDetector::ZooKeeperMasterDetector(
    const zookeeper::URL& url,
    const Duration& sessionTimeout)
{
  process = new ZooKeeperMasterDetectorProcess(url, sessionTimeout);
  spawn(process);
}


ZooKeeperMasterDetector::ZooKeeperMasterDetector(Owned<Group> group)
{
  process = new ZooKeeperMasterDetectorProcess(group);
  spawn(process);
}


ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
{
  terminate(process);
  process::wait(process);
  delete process;
}


Future<Option<MasterInfo>> ZooKeeperMasterDetector::detect(
    const Option<MasterInfo>& previous)
{
  return dispatch(process, &ZooKeeperMasterDetectorProcess::detect, previous);
}

} // namespace detector {
} // namespace master {
} // namespace mesos {
