// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <stdint.h>

#include <list>
#include <set>
#include <string>

#include <gmock/gmock.h>

#include <mesos/log/log.hpp>

#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include <process/shared.hpp>

#include <stout/gtest.hpp>
#include <stout/none.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/stopwatch.hpp>
#include <stout/try.hpp>

#include <stout/tests/utils.hpp>

#include "log/catchup.hpp"
#include "log/coordinator.hpp"
#include "log/leveldb.hpp"
#include "log/network.hpp"
#include "log/storage.hpp"
#include "log/recover.hpp"
#include "log/replica.hpp"
#include "log/tool/initialize.hpp"

#include "tests/environment.hpp"
#include "tests/mesos.hpp"
#include "tests/utils.hpp"

#ifdef MESOS_HAS_JAVA
#include "tests/zookeeper.hpp"
#endif

using namespace mesos::internal::log;

using namespace process;

using std::list;
using std::set;
using std::string;

using testing::_;
using testing::Eq;
using testing::Invoke;
using testing::Return;

using mesos::log::Log;

namespace mesos {
namespace internal {
namespace tests {


// Exercises Network::watch(). A watch whose condition already holds is
// satisfied immediately with the current network size. Otherwise it stays
// pending until an add()/remove() makes the condition true.
TEST(NetworkTest, Watch)
{
  // Pids taken from temporary ProcessBase objects. The test only needs two
  // distinct values to add to and remove from the network.
  UPID member1 = ProcessBase().self();
  UPID member2 = ProcessBase().self();

  Network network;

  // The network has no members yet, so every condition below already holds.
  // Each watch completes at once and reports a size of 0. The first call
  // leaves the comparison mode at its default.
  Future<size_t> watching = network.watch(1u);
  AWAIT_READY(watching);
  EXPECT_EQ(0u, watching.get());

  watching = network.watch(2u, Network::NOT_EQUAL_TO);
  AWAIT_READY(watching);
  EXPECT_EQ(0u, watching.get());

  watching = network.watch(0u, Network::GREATER_THAN_OR_EQUAL_TO);
  AWAIT_READY(watching);
  EXPECT_EQ(0u, watching.get());

  watching = network.watch(1u, Network::LESS_THAN);
  AWAIT_READY(watching);
  EXPECT_EQ(0u, watching.get());

  // With one member, EQUAL_TO 1 is satisfied straight away...
  network.add(member1);

  watching = network.watch(1u, Network::EQUAL_TO);
  AWAIT_READY(watching);
  EXPECT_EQ(1u, watching.get());

  // ...while GREATER_THAN 1 has to wait for the second member to join.
  watching = network.watch(1u, Network::GREATER_THAN);
  ASSERT_TRUE(watching.isPending());

  network.add(member2);

  AWAIT_READY(watching);
  EXPECT_EQ(2u, watching.get());

  // LESS_THAN_OR_EQUAL_TO 1 is false with two members. It only fires once
  // one of them is removed again.
  watching = network.watch(1u, Network::LESS_THAN_OR_EQUAL_TO);
  ASSERT_TRUE(watching.isPending());

  network.remove(member2);

  AWAIT_READY(watching);
  EXPECT_EQ(1u, watching.get());
}


// Typed fixture so that the same tests run against every Storage
// implementation listed in LogStorageTypes. The tests put their log under
// the current working directory, which TemporaryDirectoryTest presumably
// points at a fresh temporary directory.
template <typename StorageType>
class LogStorageTest : public TemporaryDirectoryTest
{
};


// Storage implementations that the typed tests are instantiated for.
using LogStorageTypes = ::testing::Types<LevelDBStorage>;


TYPED_TEST_CASE(LogStorageTest, LogStorageTypes);


// Verifies three things about persisting a TRUNCATE action:
//  - positions below its `to` become unreadable;
//  - later positions, including the TRUNCATE action itself, stay readable;
//  - a later TRUNCATE can move the cut-off further forward.
TYPED_TEST(LogStorageTest, Truncate)
{
  TypeParam storage;

  Try<Storage::State> state = storage.restore(os::getcwd() + "/.log");
  ASSERT_SOME(state);

  // A freshly restored storage reports an empty log.
  EXPECT_EQ(Metadata::EMPTY, state->metadata.status());
  EXPECT_EQ(0u, state->metadata.promised());
  EXPECT_EQ(0u, state->begin);
  EXPECT_EQ(0u, state->end);

  // Append from position 0 to position 9.
  for (uint64_t i = 0; i < 10; i++) {
    Action action;
    action.set_position(i);
    action.set_promised(1);
    action.set_performed(1);
    action.set_learned(true);
    action.set_type(Action::APPEND);
    action.mutable_append()->set_bytes(stringify(i));

    ASSERT_SOME(storage.persist(action));
  }

  for (uint64_t i = 0; i < 10; i++) {
    Try<Action> action = storage.read(i);
    ASSERT_SOME(action);

    EXPECT_EQ(i, action->position());
    EXPECT_EQ(1u, action->promised());
    EXPECT_EQ(1u, action->performed());
    EXPECT_TRUE(action->learned());
    EXPECT_EQ(Action::APPEND, action->type());
    ASSERT_TRUE(action->has_append());
    EXPECT_EQ(stringify(i), action->append().bytes());
  }

  // Truncate to position 3 (at position 10).
  Action truncate;
  truncate.set_position(10);
  truncate.set_promised(1);
  truncate.set_performed(1);
  truncate.set_learned(true);
  truncate.set_type(Action::TRUNCATE);
  truncate.mutable_truncate()->set_to(3);

  ASSERT_SOME(storage.persist(truncate));

  for (uint64_t i = 0; i < 11; i++) {
    Try<Action> action = storage.read(i);

    if (i < 3) {
      // Position 0, 1 and 2 have been truncated.
      EXPECT_ERROR(action);
      continue;
    }

    // Using `->` on an errored Try aborts the whole test binary (stout
    // semantics). Turn a failed read into an ordinary test failure first.
    ASSERT_SOME(action);

    if (i == 10) {
      // Position 10 is a truncate.
      EXPECT_EQ(i, action->position());
      EXPECT_EQ(1u, action->promised());
      EXPECT_EQ(1u, action->performed());
      EXPECT_TRUE(action->learned());
      EXPECT_EQ(Action::TRUNCATE, action->type());
      ASSERT_TRUE(action->has_truncate());
      EXPECT_EQ(3u, action->truncate().to());
    } else {
      EXPECT_EQ(i, action->position());
      EXPECT_EQ(1u, action->promised());
      EXPECT_EQ(1u, action->performed());
      EXPECT_TRUE(action->learned());
      EXPECT_EQ(Action::APPEND, action->type());
      ASSERT_TRUE(action->has_append());
      EXPECT_EQ(stringify(i), action->append().bytes());
    }
  }

  // Truncate to position 10 (at position 11).
  truncate.set_position(11);
  truncate.set_promised(1);
  truncate.set_performed(1);
  truncate.set_learned(true);
  truncate.set_type(Action::TRUNCATE);
  truncate.mutable_truncate()->set_to(10);

  ASSERT_SOME(storage.persist(truncate));

  for (uint64_t i = 0; i < 12; i++) {
    Try<Action> action = storage.read(i);

    if (i < 10) {
      // Position 0 to 9 have been truncated.
      EXPECT_ERROR(action);
      continue;
    }

    // As above: never dereference an errored Try.
    ASSERT_SOME(action);

    if (i == 10) {
      // Position 10 is a truncate (to position 3).
      EXPECT_EQ(i, action->position());
      EXPECT_EQ(1u, action->promised());
      EXPECT_EQ(1u, action->performed());
      EXPECT_TRUE(action->learned());
      EXPECT_EQ(Action::TRUNCATE, action->type());
      ASSERT_TRUE(action->has_truncate());
      EXPECT_EQ(3u, action->truncate().to());
    } else if (i == 11) {
      // Position 11 is a truncate (to position 10).
      EXPECT_EQ(i, action->position());
      EXPECT_EQ(1u, action->promised());
      EXPECT_EQ(1u, action->performed());
      EXPECT_TRUE(action->learned());
      EXPECT_EQ(Action::TRUNCATE, action->type());
      ASSERT_TRUE(action->has_truncate());
      EXPECT_EQ(10u, action->truncate().to());
    }
  }
}


// Verifies that a TRUNCATE can be persisted into an empty log. Afterwards
// the never-written position 0 is unreadable, and the TRUNCATE itself can
// be read back at position 1.
TYPED_TEST(LogStorageTest, TruncateWithEmptyLog)
{
  TypeParam storage;

  Try<Storage::State> state = storage.restore(os::getcwd() + "/.log");
  ASSERT_SOME(state);

  Action truncate;
  truncate.set_position(1);
  truncate.set_promised(1);
  truncate.set_performed(1);
  truncate.set_learned(true);
  truncate.set_type(Action::TRUNCATE);
  truncate.mutable_truncate()->set_to(0);

  ASSERT_SOME(storage.persist(truncate));

  Try<Action> action0 = storage.read(0);
  EXPECT_ERROR(action0);

  Try<Action> action1 = storage.read(1);

  // Using `->` on an errored Try aborts the test binary (stout semantics).
  // Report a failed read as a normal test failure instead.
  ASSERT_SOME(action1);

  EXPECT_EQ(1u, action1->position());
  EXPECT_EQ(1u, action1->promised());
  EXPECT_EQ(1u, action1->performed());
  EXPECT_TRUE(action1->learned());
  EXPECT_EQ(Action::TRUNCATE, action1->type());
  ASSERT_TRUE(action1->has_truncate());
  EXPECT_EQ(0u, action1->truncate().to());
}


// Verifies that truncating a log whose positions below `to` were never
// written finishes quickly, and that the TRUNCATE can be read back.
TYPED_TEST(LogStorageTest, TruncateWithManyHoles)
{
  TypeParam storage;

  Try<Storage::State> state = storage.restore(os::getcwd() + "/.log");
  ASSERT_SOME(state);

  Action truncate;
  truncate.set_position(600020000);
  truncate.set_promised(1);
  truncate.set_performed(1);
  truncate.set_learned(true);
  truncate.set_type(Action::TRUNCATE);
  truncate.mutable_truncate()->set_to(600000000);

  // Measure the time taken for the truncation.
  Stopwatch stopwatch;
  stopwatch.start();

  ASSERT_SOME(storage.persist(truncate));

  // This truncation should not take much time because no position is
  // actually being truncated.
  EXPECT_GT(Seconds(1), stopwatch.elapsed());

  Try<Action> action = storage.read(600020000);

  // Using `->` on an errored Try aborts the test binary (stout semantics).
  // Report a failed read as a normal test failure instead.
  ASSERT_SOME(action);

  EXPECT_EQ(600020000u, action->position());
  EXPECT_EQ(1u, action->promised());
  EXPECT_EQ(1u, action->performed());
  EXPECT_TRUE(action->learned());
  EXPECT_EQ(Action::TRUNCATE, action->type());
  ASSERT_TRUE(action->has_truncate());
  EXPECT_EQ(600000000u, action->truncate().to());
}


// Fixture shared by the Replica tests.
class ReplicaTest : public TemporaryDirectoryTest
{
protected:
  // Tool that initializes the log at `initializer.flags.path`, moving its
  // status from `EMPTY` to `VOTING`. Tests run it before opening a Replica
  // on that path.
  tool::Initialize initializer;
};


// Verifies the promise phase on an initialized (VOTING) replica:
//  - proposal 2 is accepted;
//  - the lower proposal 1 is then rejected, and the reply reports the
//    highest proposal seen;
//  - the higher proposal 3 is accepted again.
TEST_F(ReplicaTest, Promise)
{
  const string path = os::getcwd() + "/.log";
  initializer.flags.path = path;
  ASSERT_SOME(initializer.execute());

  Replica replica(path);

  PromiseRequest request;

  // Re-sends `request` to the replica after setting its proposal number.
  auto promiseWith = [&](uint64_t proposal) -> Future<PromiseResponse> {
    request.set_proposal(proposal);
    return protocol::promise(replica.pid(), request);
  };

  Future<PromiseResponse> reply = promiseWith(2);
  AWAIT_READY(reply);

  EXPECT_EQ(PromiseResponse::ACCEPT, reply->type());
  EXPECT_TRUE(reply->okay());
  EXPECT_EQ(2u, reply->proposal());
  EXPECT_TRUE(reply->has_position());
  EXPECT_EQ(0u, reply->position());
  EXPECT_FALSE(reply->has_action());

  // A lower proposal is refused. The reply carries the highest proposal
  // seen so far and no position.
  reply = promiseWith(1);
  AWAIT_READY(reply);

  EXPECT_EQ(PromiseResponse::REJECT, reply->type());
  EXPECT_FALSE(reply->okay());
  EXPECT_EQ(2u, reply->proposal());
  EXPECT_FALSE(reply->has_position());
  EXPECT_FALSE(reply->has_action());

  // A higher proposal is accepted again.
  reply = promiseWith(3);
  AWAIT_READY(reply);

  EXPECT_EQ(PromiseResponse::ACCEPT, reply->type());
  EXPECT_TRUE(reply->okay());
  EXPECT_EQ(3u, reply->proposal());
  EXPECT_TRUE(reply->has_position());
  EXPECT_EQ(0u, reply->position());
  EXPECT_FALSE(reply->has_action());
}


// Verifies that after a successful promise, an initialized replica accepts
// a write of an APPEND action at position 1, and that the stored action
// can be read back with the expected fields.
TEST_F(ReplicaTest, Append)
{
  const string path = os::getcwd() + "/.log";
  initializer.flags.path = path;
  ASSERT_SOME(initializer.execute());

  Replica replica(path);

  // Both the promise and the write use this proposal number.
  const uint64_t proposal = 1;

  PromiseRequest promiseRequest;
  promiseRequest.set_proposal(proposal);

  Future<PromiseResponse> promised =
    protocol::promise(replica.pid(), promiseRequest);
  AWAIT_READY(promised);

  EXPECT_EQ(PromiseResponse::ACCEPT, promised->type());
  EXPECT_TRUE(promised->okay());
  EXPECT_EQ(proposal, promised->proposal());
  EXPECT_TRUE(promised->has_position());
  EXPECT_EQ(0u, promised->position());
  EXPECT_FALSE(promised->has_action());

  WriteRequest writeRequest;
  writeRequest.set_proposal(proposal);
  writeRequest.set_position(1);
  writeRequest.set_type(Action::APPEND);
  writeRequest.mutable_append()->set_bytes("hello world");

  Future<WriteResponse> written =
    protocol::write(replica.pid(), writeRequest);
  AWAIT_READY(written);

  EXPECT_EQ(WriteResponse::ACCEPT, written->type());
  EXPECT_TRUE(written->okay());
  EXPECT_EQ(proposal, written->proposal());
  EXPECT_EQ(1u, written->position());

  Future<list<Action>> actions = replica.read(1, 1);
  AWAIT_READY(actions);
  ASSERT_EQ(1u, actions->size());

  // The action is performed but not learned, and it only carries the
  // APPEND payload.
  const Action& stored = actions->front();
  EXPECT_EQ(1u, stored.position());
  EXPECT_EQ(1u, stored.promised());
  EXPECT_TRUE(stored.has_performed());
  EXPECT_EQ(1u, stored.performed());
  EXPECT_FALSE(stored.has_learned());
  EXPECT_TRUE(stored.has_type());
  EXPECT_EQ(Action::APPEND, stored.type());
  EXPECT_FALSE(stored.has_nop());
  EXPECT_TRUE(stored.has_append());
  EXPECT_FALSE(stored.has_truncate());
  EXPECT_EQ("hello world", stored.append().bytes());
}


// Verifies that an action written through one replica is still readable,
// unchanged, by a second replica opened later on the same log path.
TEST_F(ReplicaTest, Restore)
{
  const string path = os::getcwd() + "/.log";
  initializer.flags.path = path;
  ASSERT_SOME(initializer.execute());

  // Checks that `action` is the performed-but-unlearned APPEND of
  // "hello world" at position 1, written under proposal 1.
  auto expectHelloWorld = [](const Action& action) {
    EXPECT_EQ(1u, action.position());
    EXPECT_EQ(1u, action.promised());
    EXPECT_TRUE(action.has_performed());
    EXPECT_EQ(1u, action.performed());
    EXPECT_FALSE(action.has_learned());
    EXPECT_TRUE(action.has_type());
    EXPECT_EQ(Action::APPEND, action.type());
    EXPECT_FALSE(action.has_nop());
    EXPECT_TRUE(action.has_append());
    EXPECT_FALSE(action.has_truncate());
    EXPECT_EQ("hello world", action.append().bytes());
  };

  // Only one process may have the leveldb open at a time. Each replica
  // therefore lives in its own scope, so the writer is destroyed (closing
  // the database) before the reader opens it.
  {
    Replica writer(path);

    const uint64_t proposal = 1;

    PromiseRequest promiseRequest;
    promiseRequest.set_proposal(proposal);

    Future<PromiseResponse> promised =
      protocol::promise(writer.pid(), promiseRequest);
    AWAIT_READY(promised);

    EXPECT_EQ(PromiseResponse::ACCEPT, promised->type());
    EXPECT_TRUE(promised->okay());
    EXPECT_EQ(proposal, promised->proposal());
    EXPECT_TRUE(promised->has_position());
    EXPECT_EQ(0u, promised->position());
    EXPECT_FALSE(promised->has_action());

    WriteRequest writeRequest;
    writeRequest.set_proposal(proposal);
    writeRequest.set_position(1);
    writeRequest.set_type(Action::APPEND);
    writeRequest.mutable_append()->set_bytes("hello world");

    Future<WriteResponse> written =
      protocol::write(writer.pid(), writeRequest);
    AWAIT_READY(written);

    EXPECT_EQ(WriteResponse::ACCEPT, written->type());
    EXPECT_TRUE(written->okay());
    EXPECT_EQ(proposal, written->proposal());
    EXPECT_EQ(1u, written->position());

    Future<list<Action>> actions = writer.read(1, 1);
    AWAIT_READY(actions);
    ASSERT_EQ(1u, actions->size());

    expectHelloWorld(actions->front());
  }

  {
    Replica reader(path);

    Future<list<Action>> actions = reader.read(1, 1);
    AWAIT_READY(actions);
    ASSERT_EQ(1u, actions->size());

    expectHelloWorld(actions->front());
  }
}


// Verifies that a replica whose log was never initialized (so it is not
// VOTING) answers both promise and write requests with IGNORED responses.
// The responses echo the request's proposal and, for writes, its position.
TEST_F(ReplicaTest, NonVoting)
{
  // `initializer` is deliberately not run, so the log stays uninitialized.
  const string path = os::getcwd() + "/.log";

  Replica replica(path);

  PromiseRequest promiseReq;
  promiseReq.set_proposal(2);

  Future<PromiseResponse> promised =
    protocol::promise(replica.pid(), promiseReq);
  AWAIT_READY(promised);

  EXPECT_EQ(PromiseResponse::IGNORED, promised->type());
  EXPECT_FALSE(promised->okay());
  EXPECT_EQ(2u, promised->proposal());

  WriteRequest writeReq;
  writeReq.set_proposal(3);
  writeReq.set_position(1);
  writeReq.set_type(Action::APPEND);
  writeReq.mutable_append()->set_bytes("hello world");

  Future<WriteResponse> written = protocol::write(replica.pid(), writeReq);
  AWAIT_READY(written);

  EXPECT_EQ(WriteResponse::IGNORED, written->type());
  EXPECT_FALSE(written->okay());
  EXPECT_EQ(3u, written->proposal());
  EXPECT_EQ(1u, written->position());
}


// Fixture shared by the Coordinator tests.
class CoordinatorTest : public TemporaryDirectoryTest
{
protected:
  // Tool that initializes the log at `initializer.flags.path`, moving its
  // status from `EMPTY` to `VOTING`. Tests run it once per replica log
  // before opening the replicas.
  tool::Initialize initializer;
};


// Verifies that a coordinator over two initialized replicas, with a quorum
// of 2, is elected at position 0. After the election its local replica
// holds a NOP at that position.
TEST_F(CoordinatorTest, Elect)
{
  const string path1 = os::getcwd() + "/.log1";
  const string path2 = os::getcwd() + "/.log2";

  // Bring both logs from `EMPTY` to `VOTING` before any replica opens them.
  for (const string& path : {path1, path2}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  set<UPID> pids = {replica1->pid(), replica2->pid()};

  Shared<Network> network(new Network(pids));

  Coordinator coord(2, replica1, network);

  Future<Option<uint64_t>> electing = coord.elect();
  AWAIT_READY(electing);
  EXPECT_SOME_EQ(0u, electing.get());

  Future<list<Action>> actions = replica1->read(0, 0);
  AWAIT_READY(actions);
  ASSERT_EQ(1u, actions->size());

  const Action& first = actions->front();
  EXPECT_EQ(0u, first.position());
  ASSERT_TRUE(first.has_type());
  ASSERT_EQ(Action::NOP, first.type());
}


// Verifies that a coordinator for an empty log can be elected while the
// libprocess clock is paused. The election must therefore complete without
// any timer-driven retry.
TEST_F(CoordinatorTest, ElectWithClockPaused)
{
  Clock::pause();

  const string path1 = os::getcwd() + "/.log1";
  const string path2 = os::getcwd() + "/.log2";

  for (const string& path : {path1, path2}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  set<UPID> pids = {replica1->pid(), replica2->pid()};

  Shared<Network> network(new Network(pids));

  Coordinator coord(2, replica1, network);

  Future<Option<uint64_t>> electing = coord.elect();
  AWAIT_READY(electing);
  EXPECT_SOME_EQ(0u, electing.get());

  Clock::resume();
}


// Verifies that a value appended through an elected coordinator is
// assigned position 1 and can be read back from the coordinator's local
// replica as an APPEND with the same bytes.
TEST_F(CoordinatorTest, AppendRead)
{
  const string path1 = os::getcwd() + "/.log1";
  const string path2 = os::getcwd() + "/.log2";

  for (const string& path : {path1, path2}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  set<UPID> pids = {replica1->pid(), replica2->pid()};

  Shared<Network> network(new Network(pids));

  Coordinator coord(2, replica1, network);

  Future<Option<uint64_t>> electing = coord.elect();
  AWAIT_READY(electing);
  EXPECT_SOME_EQ(0u, electing.get());

  Future<Option<uint64_t>> appending = coord.append("hello world");
  AWAIT_READY(appending);
  ASSERT_SOME(appending.get());

  const uint64_t position = appending->get();
  EXPECT_EQ(1u, position);

  Future<list<Action>> actions = replica1->read(position, position);
  AWAIT_READY(actions);
  ASSERT_EQ(1u, actions->size());

  const Action& appended = actions->front();
  EXPECT_EQ(position, appended.position());
  ASSERT_TRUE(appended.has_type());
  ASSERT_EQ(Action::APPEND, appended.type());
  EXPECT_EQ("hello world", appended.append().bytes());
}


// Verifies that reading one position past the last appended entry fails
// with a "past end of log" error.
TEST_F(CoordinatorTest, AppendReadError)
{
  const string path1 = os::getcwd() + "/.log1";
  const string path2 = os::getcwd() + "/.log2";

  for (const string& path : {path1, path2}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  set<UPID> pids = {replica1->pid(), replica2->pid()};

  Shared<Network> network(new Network(pids));

  Coordinator coord(2, replica1, network);

  Future<Option<uint64_t>> electing = coord.elect();
  AWAIT_READY(electing);
  EXPECT_SOME_EQ(0u, electing.get());

  Future<Option<uint64_t>> appending = coord.append("hello world");
  AWAIT_READY(appending);
  ASSERT_SOME(appending.get());

  const uint64_t written = appending->get();
  EXPECT_EQ(1u, written);

  // The position just after the only appended entry.
  const uint64_t pastEnd = written + 1;

  Future<list<Action>> actions = replica1->read(pastEnd, pastEnd);
  AWAIT_FAILED(actions);
  EXPECT_EQ("Bad read range (past end of log)", actions.failure());
}


// Verifies three things after replica2 is terminated:
//  - an append through the elected coordinator is left outstanding;
//  - discarding that append yields a discarded future;
//  - a subsequent append through the same coordinator completes with None.
TEST_F(CoordinatorTest, AppendDiscarded)
{
  const string path1 = os::getcwd() + "/.log1";
  initializer.flags.path = path1;
  ASSERT_SOME(initializer.execute());

  const string path2 = os::getcwd() + "/.log2";
  initializer.flags.path = path2;
  ASSERT_SOME(initializer.execute());

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  set<UPID> pids;
  pids.insert(replica1->pid());
  pids.insert(replica2->pid());

  Shared<Network> network(new Network(pids));

  // Quorum size 2: both replicas must take part.
  Coordinator coord(2, replica1, network);

  {
    Future<Option<uint64_t>> electing = coord.elect();
    AWAIT_READY(electing);
    ASSERT_SOME(electing.get());
    EXPECT_EQ(0u, electing->get());
  }

  // Take replica2 down and wait for its process to exit before dropping
  // the test's reference, leaving the coordinator without a quorum.
  process::terminate(replica2->pid());
  process::wait(replica2->pid());
  replica2.reset();

  {
    // The append is still outstanding right after the call.
    Future<Option<uint64_t>> appending = coord.append("hello world");
    ASSERT_TRUE(appending.isPending());

    appending.discard();
    AWAIT_DISCARDED(appending);
  }

  {
    // NOTE(review): a later append is expected to complete with None
    // rather than hang. Presumably the discarded write leaves the
    // coordinator no longer elected; confirm against Coordinator::append.
    Future<Option<uint64_t>> appending = coord.append("hello moto");
    AWAIT_READY(appending);

    EXPECT_NONE(appending.get());
  }
}


// Verifies that an election requiring a quorum of 2 is still pending after
// ten seconds of paused-clock time when the network holds only a single
// replica.
TEST_F(CoordinatorTest, ElectNoQuorum)
{
  const string path = os::getcwd() + "/.log";
  initializer.flags.path = path;
  ASSERT_SOME(initializer.execute());

  Shared<Replica> replica(new Replica(path));

  set<UPID> pids = {replica->pid()};

  Shared<Network> network(new Network(pids));

  Coordinator coord(2, replica, network);

  Clock::pause();

  Future<Option<uint64_t>> electing = coord.elect();

  // Move simulated time forward so that any timers have a chance to fire.
  Clock::advance(Seconds(10));
  Clock::settle();

  EXPECT_TRUE(electing.isPending());

  Clock::resume();
}


// Verifies that once the second replica is terminated, an append that
// needs a quorum of 2 is still pending after ten seconds of paused-clock
// time.
TEST_F(CoordinatorTest, AppendNoQuorum)
{
  const string path1 = os::getcwd() + "/.log1";
  const string path2 = os::getcwd() + "/.log2";

  for (const string& path : {path1, path2}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  set<UPID> pids = {replica1->pid(), replica2->pid()};

  Shared<Network> network(new Network(pids));

  Coordinator coord(2, replica1, network);

  Future<Option<uint64_t>> electing = coord.elect();
  AWAIT_READY(electing);
  EXPECT_SOME_EQ(0u, electing.get());

  // Stop replica2 and wait for it to exit, leaving only replica1 alive.
  process::terminate(replica2->pid());
  process::wait(replica2->pid());
  replica2.reset();

  Clock::pause();

  Future<Option<uint64_t>> appending = coord.append("hello world");

  Clock::advance(Seconds(10));
  Clock::settle();

  EXPECT_TRUE(appending.isPending());

  Clock::resume();
}


// Verifies failover between coordinators:
//  - coord1 (on replica1) appends an entry;
//  - coord2, running on replica2 over the same pids, is elected at the
//    position of that entry;
//  - replica2 can then read the entry.
TEST_F(CoordinatorTest, Failover)
{
  const string path1 = os::getcwd() + "/.log1";
  const string path2 = os::getcwd() + "/.log2";

  for (const string& path : {path1, path2}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  set<UPID> pids = {replica1->pid(), replica2->pid()};

  Shared<Network> network1(new Network(pids));

  Coordinator coord1(2, replica1, network1);

  Future<Option<uint64_t>> electing1 = coord1.elect();
  AWAIT_READY(electing1);
  EXPECT_SOME_EQ(0u, electing1.get());

  Future<Option<uint64_t>> appending = coord1.append("hello world");
  AWAIT_READY(appending);
  ASSERT_SOME(appending.get());

  const uint64_t position = appending->get();
  EXPECT_EQ(1u, position);

  // A second coordinator with its own network takes over from replica2.
  Shared<Network> network2(new Network(pids));

  Coordinator coord2(2, replica2, network2);

  Future<Option<uint64_t>> electing2 = coord2.elect();
  AWAIT_READY(electing2);
  EXPECT_SOME_EQ(position, electing2.get());

  Future<list<Action>> actions = replica2->read(position, position);
  AWAIT_READY(actions);
  ASSERT_EQ(1u, actions->size());

  const Action& entry = actions->front();
  EXPECT_EQ(position, entry.position());
  ASSERT_TRUE(entry.has_type());
  ASSERT_EQ(Action::APPEND, entry.type());
  EXPECT_EQ("hello world", entry.append().bytes());
}


// Verifies that once a second coordinator is elected over the same
// replicas, appends through the first coordinator complete with None. The
// new coordinator appends at the next position.
TEST_F(CoordinatorTest, Demoted)
{
  const string path1 = os::getcwd() + "/.log1";
  const string path2 = os::getcwd() + "/.log2";

  for (const string& path : {path1, path2}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  set<UPID> pids = {replica1->pid(), replica2->pid()};

  // The first coordinator runs on replica1 and writes "hello world".
  Shared<Network> network1(new Network(pids));

  Coordinator coord1(2, replica1, network1);

  Future<Option<uint64_t>> electing1 = coord1.elect();
  AWAIT_READY(electing1);
  EXPECT_SOME_EQ(0u, electing1.get());

  Future<Option<uint64_t>> appending1 = coord1.append("hello world");
  AWAIT_READY(appending1);
  ASSERT_SOME(appending1.get());

  const uint64_t position1 = appending1->get();
  EXPECT_EQ(1u, position1);

  // A second coordinator on replica2 wins an election...
  Shared<Network> network2(new Network(pids));

  Coordinator coord2(2, replica2, network2);

  Future<Option<uint64_t>> electing2 = coord2.elect();
  AWAIT_READY(electing2);
  EXPECT_SOME_EQ(position1, electing2.get());

  // ...after which the first coordinator can no longer append...
  Future<Option<uint64_t>> rejected = coord1.append("hello moto");
  AWAIT_READY(rejected);
  EXPECT_NONE(rejected.get());

  // ...but the new one can, right after the existing entry.
  Future<Option<uint64_t>> appending2 = coord2.append("hello hello");
  AWAIT_READY(appending2);
  ASSERT_SOME(appending2.get());

  const uint64_t position2 = appending2->get();
  EXPECT_EQ(2u, position2);

  Future<list<Action>> actions = replica2->read(position2, position2);
  AWAIT_READY(actions);
  ASSERT_EQ(1u, actions->size());

  const Action& entry = actions->front();
  EXPECT_EQ(position2, entry.position());
  ASSERT_TRUE(entry.has_type());
  ASSERT_EQ(Action::APPEND, entry.type());
  EXPECT_EQ("hello hello", entry.append().bytes());
}


// Verifies that a coordinator whose own replica (replica3) has no entries
// ends up, once elected, with replica3 holding the entry that an earlier
// coordinator appended through replica1 and replica2.
TEST_F(CoordinatorTest, Fill)
{
  const string path1 = os::getcwd() + "/.log1";
  const string path2 = os::getcwd() + "/.log2";
  const string path3 = os::getcwd() + "/.log3";

  for (const string& path : {path1, path2, path3}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  // First round: coord1 on replica1, network {replica1, replica2}.
  set<UPID> members1 = {replica1->pid(), replica2->pid()};

  Shared<Network> network1(new Network(members1));

  Coordinator coord1(2, replica1, network1);

  Future<Option<uint64_t>> electing1 = coord1.elect();
  AWAIT_READY(electing1);
  EXPECT_SOME_EQ(0u, electing1.get());

  Future<Option<uint64_t>> appending = coord1.append("hello world");
  AWAIT_READY(appending);
  ASSERT_SOME(appending.get());

  const uint64_t position = appending->get();
  EXPECT_EQ(1u, position);

  // Second round: coord2 on a new replica3, network {replica2, replica3}.
  Shared<Replica> replica3(new Replica(path3));

  set<UPID> members2 = {replica2->pid(), replica3->pid()};

  Shared<Network> network2(new Network(members2));

  Coordinator coord2(2, replica3, network2);

  // coord2 takes its proposal number from replica3, whose log is empty.
  // The first election is therefore expected to fail, and a second
  // attempt must succeed.
  Future<Option<uint64_t>> electing2 = coord2.elect();
  AWAIT_READY(electing2);
  ASSERT_NONE(electing2.get());

  electing2 = coord2.elect();
  AWAIT_READY(electing2);
  EXPECT_SOME_EQ(position, electing2.get());

  Future<list<Action>> actions = replica3->read(position, position);
  AWAIT_READY(actions);
  ASSERT_EQ(1u, actions->size());

  const Action& filled = actions->front();
  EXPECT_EQ(position, filled.position());
  ASSERT_TRUE(filled.has_type());
  ASSERT_EQ(Action::APPEND, filled.type());
  EXPECT_EQ("hello world", filled.append().bytes());
}


// Same scenario as a plain fill, except that every LearnedMessage sent to
// replica2 is dropped. replica2 therefore never learns the appended entry.
// The new coordinator on replica3 must still end up with that entry at the
// same position.
TEST_F(CoordinatorTest, NotLearnedFill)
{
  const string path1 = os::getcwd() + "/.log1";
  const string path2 = os::getcwd() + "/.log2";
  const string path3 = os::getcwd() + "/.log3";

  for (const string& path : {path1, path2, path3}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  // This filter matches on replica2's pid, so it can only be installed once
  // replica2 exists. It drops just the LearnedMessages addressed to replica2.
  DROP_PROTOBUFS(LearnedMessage(), _, Eq(replica2->pid()));

  set<UPID> members1 = {replica1->pid(), replica2->pid()};

  Shared<Network> network1(new Network(members1));

  Coordinator coord1(2, replica1, network1);

  Future<Option<uint64_t>> electing1 = coord1.elect();
  AWAIT_READY(electing1);
  EXPECT_SOME_EQ(0u, electing1.get());

  Future<Option<uint64_t>> appending = coord1.append("hello world");
  AWAIT_READY(appending);
  ASSERT_SOME(appending.get());

  const uint64_t position = appending->get();
  EXPECT_EQ(1u, position);

  Shared<Replica> replica3(new Replica(path3));

  set<UPID> members2 = {replica2->pid(), replica3->pid()};

  Shared<Network> network2(new Network(members2));

  Coordinator coord2(2, replica3, network2);

  // coord2 takes its proposal number from replica3, whose log is empty.
  // The first election is therefore expected to fail, and a second
  // attempt must succeed.
  Future<Option<uint64_t>> electing2 = coord2.elect();
  AWAIT_READY(electing2);
  ASSERT_NONE(electing2.get());

  electing2 = coord2.elect();
  AWAIT_READY(electing2);
  EXPECT_SOME_EQ(position, electing2.get());

  Future<list<Action>> actions = replica3->read(position, position);
  AWAIT_READY(actions);
  ASSERT_EQ(1u, actions->size());

  const Action& filled = actions->front();
  EXPECT_EQ(position, filled.position());
  ASSERT_TRUE(filled.has_type());
  ASSERT_EQ(Action::APPEND, filled.type());
  EXPECT_EQ("hello world", filled.append().bytes());
}


// Verifies that ten consecutive appends land at positions 1 through 10.
// Each stored entry's payload is the string form of its own position.
TEST_F(CoordinatorTest, MultipleAppends)
{
  const string path1 = os::getcwd() + "/.log1";
  const string path2 = os::getcwd() + "/.log2";

  for (const string& path : {path1, path2}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  set<UPID> pids = {replica1->pid(), replica2->pid()};

  Shared<Network> network(new Network(pids));

  Coordinator coord(2, replica1, network);

  Future<Option<uint64_t>> electing = coord.elect();
  AWAIT_READY(electing);
  EXPECT_SOME_EQ(0u, electing.get());

  // Append "1".."10"; each should be assigned the matching position.
  for (uint64_t expected = 1; expected <= 10; ++expected) {
    Future<Option<uint64_t>> appending = coord.append(stringify(expected));
    AWAIT_READY(appending);
    EXPECT_SOME_EQ(expected, appending.get());
  }

  Future<list<Action>> actions = replica1->read(1, 10);
  AWAIT_READY(actions);
  EXPECT_EQ(10u, actions->size());

  for (const Action& action : actions.get()) {
    ASSERT_TRUE(action.has_type());
    ASSERT_EQ(Action::APPEND, action.type());
    EXPECT_EQ(stringify(action.position()), action.append().bytes());
  }
}


// Ten entries are appended while every LearnedMessage to replica2 is
// dropped. A new coordinator on an empty replica3, networked with
// replica2, is then elected at position 10. Afterwards replica3 holds all
// ten entries.
TEST_F(CoordinatorTest, MultipleAppendsNotLearnedFill)
{
  const string path1 = os::getcwd() + "/.log1";
  const string path2 = os::getcwd() + "/.log2";
  const string path3 = os::getcwd() + "/.log3";

  for (const string& path : {path1, path2, path3}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  // This filter matches on replica2's pid, so it can only be installed once
  // replica2 exists. It drops just the LearnedMessages addressed to replica2.
  DROP_PROTOBUFS(LearnedMessage(), _, Eq(replica2->pid()));

  set<UPID> members1 = {replica1->pid(), replica2->pid()};

  Shared<Network> network1(new Network(members1));

  Coordinator coord1(2, replica1, network1);

  Future<Option<uint64_t>> electing1 = coord1.elect();
  AWAIT_READY(electing1);
  EXPECT_SOME_EQ(0u, electing1.get());

  for (uint64_t expected = 1; expected <= 10; ++expected) {
    Future<Option<uint64_t>> appending = coord1.append(stringify(expected));
    AWAIT_READY(appending);
    EXPECT_SOME_EQ(expected, appending.get());
  }

  Shared<Replica> replica3(new Replica(path3));

  set<UPID> members2 = {replica2->pid(), replica3->pid()};

  Shared<Network> network2(new Network(members2));

  Coordinator coord2(2, replica3, network2);

  // coord2 takes its proposal number from replica3, whose log is empty.
  // The first election is therefore expected to fail, and a second
  // attempt must succeed.
  Future<Option<uint64_t>> electing2 = coord2.elect();
  AWAIT_READY(electing2);
  ASSERT_NONE(electing2.get());

  electing2 = coord2.elect();
  AWAIT_READY(electing2);
  EXPECT_SOME_EQ(10u, electing2.get());

  Future<list<Action>> actions = replica3->read(1, 10);
  AWAIT_READY(actions);
  EXPECT_EQ(10u, actions->size());

  for (const Action& action : actions.get()) {
    ASSERT_TRUE(action.has_type());
    ASSERT_EQ(Action::APPEND, action.type());
    EXPECT_EQ(stringify(action.position()), action.append().bytes());
  }
}


TEST_F(CoordinatorTest, Truncate)
{
  const string path1 = os::getcwd() + "/.log1";
  const string path2 = os::getcwd() + "/.log2";

  // Move both on-disk logs from EMPTY to VOTING.
  for (const string& path : {path1, path2}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  set<UPID> pids{replica1->pid(), replica2->pid()};
  Shared<Network> network(new Network(pids));

  Coordinator coordinator(2, replica1, network);

  {
    Future<Option<uint64_t>> election = coordinator.elect();
    AWAIT_READY(election);
    EXPECT_SOME_EQ(0u, election.get());
  }

  // Fill positions 1..10; each payload is the entry's own position.
  for (uint64_t expected = 1; expected <= 10; ++expected) {
    Future<Option<uint64_t>> append = coordinator.append(stringify(expected));
    AWAIT_READY(append);
    EXPECT_SOME_EQ(expected, append.get());
  }

  // The truncation itself is recorded at the next free position (11).
  {
    Future<Option<uint64_t>> truncation = coordinator.truncate(7);
    AWAIT_READY(truncation);
    EXPECT_SOME_EQ(11u, truncation.get());
  }

  // Reading from a position below the truncation point must fail.
  {
    Future<list<Action>> entries = replica1->read(6, 10);
    AWAIT_FAILED(entries);
    EXPECT_EQ("Bad read range (truncated position)", entries.failure());
  }

  // Positions 7..10 are still readable and intact.
  {
    Future<list<Action>> entries = replica1->read(7, 10);
    AWAIT_READY(entries);
    EXPECT_EQ(4u, entries->size());
    for (const Action& action : entries.get()) {
      ASSERT_TRUE(action.has_type());
      ASSERT_EQ(Action::APPEND, action.type());
      EXPECT_EQ(stringify(action.position()), action.append().bytes());
    }
  }
}


TEST_F(CoordinatorTest, TruncateNotLearnedFill)
{
  const string path1 = os::getcwd() + "/.log1";
  const string path2 = os::getcwd() + "/.log2";
  const string path3 = os::getcwd() + "/.log3";

  // Move all three on-disk logs from EMPTY to VOTING.
  for (const string& path : {path1, path2, path3}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  // The filter can only be installed once replica2 exists, because it
  // matches on replica2's pid. Only learned messages addressed to
  // replica2 are dropped.
  DROP_PROTOBUFS(LearnedMessage(), _, Eq(replica2->pid()));

  set<UPID> pids1{replica1->pid(), replica2->pid()};
  Shared<Network> network1(new Network(pids1));

  Coordinator coordinator1(2, replica1, network1);

  {
    Future<Option<uint64_t>> election = coordinator1.elect();
    AWAIT_READY(election);
    EXPECT_SOME_EQ(0u, election.get());
  }

  for (uint64_t expected = 1; expected <= 10; ++expected) {
    Future<Option<uint64_t>> append = coordinator1.append(stringify(expected));
    AWAIT_READY(append);
    EXPECT_SOME_EQ(expected, append.get());
  }

  // The truncation is recorded at the next free position (11).
  {
    Future<Option<uint64_t>> truncation = coordinator1.truncate(7);
    AWAIT_READY(truncation);
    EXPECT_SOME_EQ(11u, truncation.get());
  }

  // replica3 only comes up now; it is paired with replica2, which never
  // learned any of the writes above.
  Shared<Replica> replica3(new Replica(path3));

  set<UPID> pids2{replica2->pid(), replica3->pid()};
  Shared<Network> network2(new Network(pids2));

  Coordinator coordinator2(2, replica3, network2);

  {
    // Note that the first election should fail because 'coord2' gets
    // its proposal number from 'replica3' which has an empty log and
    // thus a second attempt will need to be made.
    Future<Option<uint64_t>> election = coordinator2.elect();
    AWAIT_READY(election);
    ASSERT_NONE(election.get());

    election = coordinator2.elect();
    AWAIT_READY(election);
    EXPECT_SOME_EQ(11u, election.get());
  }

  // The filled-in log on replica3 must honor the truncation.
  {
    Future<list<Action>> entries = replica3->read(6, 10);
    AWAIT_FAILED(entries);
    EXPECT_EQ("Bad read range (truncated position)", entries.failure());
  }

  {
    Future<list<Action>> entries = replica3->read(7, 10);
    AWAIT_READY(entries);
    EXPECT_EQ(4u, entries->size());
    for (const Action& action : entries.get()) {
      ASSERT_TRUE(action.has_type());
      ASSERT_EQ(Action::APPEND, action.type());
      EXPECT_EQ(stringify(action.position()), action.append().bytes());
    }
  }
}


TEST_F(CoordinatorTest, TruncateLearnedFill)
{
  const string path1 = os::getcwd() + "/.log1";
  const string path2 = os::getcwd() + "/.log2";
  const string path3 = os::getcwd() + "/.log3";

  // Move all three on-disk logs from EMPTY to VOTING.
  for (const string& path : {path1, path2, path3}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  // Unlike TruncateNotLearnedFill, no messages are dropped here, so
  // replica2 receives the learned messages.
  set<UPID> pids1{replica1->pid(), replica2->pid()};
  Shared<Network> network1(new Network(pids1));

  Coordinator coordinator1(2, replica1, network1);

  {
    Future<Option<uint64_t>> election = coordinator1.elect();
    AWAIT_READY(election);
    EXPECT_SOME_EQ(0u, election.get());
  }

  for (uint64_t expected = 1; expected <= 10; ++expected) {
    Future<Option<uint64_t>> append = coordinator1.append(stringify(expected));
    AWAIT_READY(append);
    EXPECT_SOME_EQ(expected, append.get());
  }

  // The truncation is recorded at the next free position (11).
  {
    Future<Option<uint64_t>> truncation = coordinator1.truncate(7);
    AWAIT_READY(truncation);
    EXPECT_SOME_EQ(11u, truncation.get());
  }

  Shared<Replica> replica3(new Replica(path3));

  set<UPID> pids2{replica2->pid(), replica3->pid()};
  Shared<Network> network2(new Network(pids2));

  Coordinator coordinator2(2, replica3, network2);

  {
    // Note that the first election should fail because 'coord2' gets
    // its proposal number from 'replica3' which has an empty log and
    // thus a second attempt will need to be made.
    Future<Option<uint64_t>> election = coordinator2.elect();
    AWAIT_READY(election);
    ASSERT_NONE(election.get());

    election = coordinator2.elect();
    AWAIT_READY(election);
    EXPECT_SOME_EQ(11u, election.get());
  }

  // The filled-in log on replica3 must honor the truncation.
  {
    Future<list<Action>> entries = replica3->read(6, 10);
    AWAIT_FAILED(entries);
    EXPECT_EQ("Bad read range (truncated position)", entries.failure());
  }

  {
    Future<list<Action>> entries = replica3->read(7, 10);
    AWAIT_READY(entries);
    EXPECT_EQ(4u, entries->size());
    for (const Action& action : entries.get()) {
      ASSERT_TRUE(action.has_type());
      ASSERT_EQ(Action::APPEND, action.type());
      EXPECT_EQ(stringify(action.position()), action.append().bytes());
    }
  }
}


class MockReplica : public Replica
{
public:
  // Forwards `path` straight to the Replica constructor.
  explicit MockReplica(const string& path)
    : Replica(path) {}

  ~MockReplica() override = default;

  // Mocked so a test can intercept each metadata status update and
  // decide when (and with what result) the caller is unblocked.
  MOCK_METHOD1(update, Future<bool>(const Metadata::Status& status));

  // Runs the real Replica::update(), bypassing the mock. Tests Invoke()
  // this from their mock actions to apply the genuine status change.
  Future<bool> _update(const Metadata::Status& status)
  {
    return Replica::update(status);
  }
};


// If a coordinator tries to get elected while there is not a quorum
// of replicas in VOTING state, the non-VOTING replicas should
// instruct the coordinator that they have ignored the coordinator's
// request, so the coordinator can promptly retry. MESOS-3280.
TEST_F(CoordinatorTest, RecoveryRace)
{
  // None of the three logs is initialized with `initializer`. Each
  // replica reaches VOTING through recover() with the trailing `true`
  // argument (the same form the AutoInitialization tests use).
  const string path1 = os::getcwd() + "/.log1";
  const string path2 = os::getcwd() + "/.log2";
  const string path3 = os::getcwd() + "/.log3";

  // Raw pointers are kept so that expectations can be set on the mocks.
  // Ownership is handed to `Owned<Replica>` when recover() is called.
  MockReplica* replica1(new MockReplica(path1));
  MockReplica* replica2(new MockReplica(path2));
  MockReplica* replica3(new MockReplica(path3));

  set<UPID> pids{replica1->pid(), replica2->pid(), replica3->pid()};
  Shared<Network> network(new Network(pids));

  // Set when each replica transitions from EMPTY -> STARTING; the
  // replica will then block until the associated "continue" promise
  // is set.
  Future<Nothing> replica1Starting;
  Future<Nothing> replica2Starting;
  Future<Nothing> replica3Starting;
  process::Promise<bool> replica1ContinueStarting;
  process::Promise<bool> replica2ContinueStarting;
  process::Promise<bool> replica3ContinueStarting;

  // Set when each replica transitions from STARTING -> VOTING.
  Future<Nothing> replica1Voting;
  Future<Nothing> replica2Voting;
  process::Promise<bool> replica1ContinueVoting;
  process::Promise<bool> replica2ContinueVoting;

  // Arrange mocks to allow us to block and unblock each replica when
  // it changes state. Each blocking action applies the real update via
  // _update(), signals the matching future, and then returns the
  // test-controlled "continue" promise in place of _update()'s result.
  // replica1 and replica2 block on both of their transitions. replica3
  // blocks only on its first transition; every later update goes
  // straight to the real implementation.
  // TODO(neilc): Refactor this to reduce duplicated code.
  EXPECT_CALL(*replica1, update(_))
    .WillOnce(DoAll(IgnoreResult(Invoke(replica1, &MockReplica::_update)),
                    FutureSatisfy(&replica1Starting),
                    Return(replica1ContinueStarting.future())))
    .WillOnce(DoAll(IgnoreResult(Invoke(replica1, &MockReplica::_update)),
                    FutureSatisfy(&replica1Voting),
                    Return(replica1ContinueVoting.future())));
  EXPECT_CALL(*replica2, update(_))
    .WillOnce(DoAll(IgnoreResult(Invoke(replica2, &MockReplica::_update)),
                    FutureSatisfy(&replica2Starting),
                    Return(replica2ContinueStarting.future())))
    .WillOnce(DoAll(IgnoreResult(Invoke(replica2, &MockReplica::_update)),
                    FutureSatisfy(&replica2Voting),
                    Return(replica2ContinueVoting.future())));
  EXPECT_CALL(*replica3, update(_))
    .WillOnce(DoAll(IgnoreResult(Invoke(replica3, &MockReplica::_update)),
                    FutureSatisfy(&replica3Starting),
                    Return(replica3ContinueStarting.future())))
    .WillRepeatedly(Invoke(replica3, &MockReplica::_update));

  Future<Owned<Replica>> recovering1 =
    recover(2, Owned<Replica>(replica1), network, true);
  Future<Owned<Replica>> recovering2 =
    recover(2, Owned<Replica>(replica2), network, true);
  Future<Owned<Replica>> recovering3 =
    recover(2, Owned<Replica>(replica3), network, true);

  // All three replicas are now parked in STARTING.
  AWAIT_READY(replica1Starting);
  AWAIT_READY(replica2Starting);
  AWAIT_READY(replica3Starting);

  // Allow replica1 to advance from STARTING -> VOTING.
  // TODO(neilc): Due to an apparent bug in FutureResult (MESOS-3812),
  // we can't save the return value of _update() when setting up the
  // mocks above. Hence, we have to assume that _update() returned
  // true, which we then use to unblock the process.
  replica1ContinueStarting.set(true);
  AWAIT_READY(replica1Voting);
  replica1ContinueVoting.set(true);

  AWAIT_READY(recovering1);
  Owned<Replica> shared_ = recovering1.get();
  Shared<Replica> shared = shared_.share();

  // Electing a coordinator should fail because we don't have a quorum
  // of replicas in VOTING status. The election must complete (with
  // None) rather than hang, which is the MESOS-3280 behavior under test.
  {
    Coordinator coord1(2, shared, network);
    Future<Option<uint64_t>> electing = coord1.elect();
    AWAIT_READY(electing);
    ASSERT_NONE(electing.get());
  }

  // Allow replica2 to advance from STARTING -> VOTING.
  replica2ContinueStarting.set(true);
  AWAIT_READY(replica2Voting);
  replica2ContinueVoting.set(true);

  AWAIT_READY(recovering2);

  // Electing a coordinator should now succeed.
  Coordinator coord2(2, shared, network);
  {
    Future<Option<uint64_t>> electing = coord2.elect();
    AWAIT_READY(electing);
    EXPECT_SOME_EQ(0u, electing.get());
  }

  // Allow replica3 to advance from STARTING -> RECOVERING -> VOTING.
  // TODO(neilc): Transition to RECOVERING is dubious and should
  // probably be omitted.
  replica3ContinueStarting.set(true);

  AWAIT_READY(recovering3);

  // With all replicas recovered, the elected coordinator can write.
  {
    Future<Option<uint64_t>> appending = coord2.append("hello world");
    AWAIT_READY(appending);
    EXPECT_SOME_EQ(1u, appending.get());
  }
}


class RecoverTest : public TemporaryDirectoryTest
{
protected:
  // Log-initialization tool shared by the tests below. A test points
  // `initializer.flags.path` at a log directory and calls
  // `initializer.execute()` to move that log from `EMPTY` to `VOTING`.
  tool::Initialize initializer;
};


// Two replicas that both need recovery compete with each other to catch up.
TEST_F(RecoverTest, RacingCatchup)
{
  // Logs 1-3 are initialized (VOTING). Logs 4 and 5 are left
  // uninitialized and are brought up to date via recover() below.
  const string path1 = os::getcwd() + "/.log1";
  initializer.flags.path = path1;
  ASSERT_SOME(initializer.execute());

  const string path2 = os::getcwd() + "/.log2";
  initializer.flags.path = path2;
  ASSERT_SOME(initializer.execute());

  const string path3 = os::getcwd() + "/.log3";
  initializer.flags.path = path3;
  ASSERT_SOME(initializer.execute());

  const string path4 = os::getcwd() + "/.log4";
  const string path5 = os::getcwd() + "/.log5";

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));
  Shared<Replica> replica3(new Replica(path3));

  set<UPID> pids;
  pids.insert(replica1->pid());
  pids.insert(replica2->pid());
  pids.insert(replica3->pid());

  Shared<Network> network1(new Network(pids));

  Coordinator coord1(3, replica1, network1);

  {
    Future<Option<uint64_t>> electing = coord1.elect();
    AWAIT_READY(electing);
    EXPECT_SOME_EQ(0u, electing.get());
  }

  // Positions 1..10 each carry their own position as payload.
  for (uint64_t position = 1; position <= 10; position++) {
    Future<Option<uint64_t>> appending = coord1.append(stringify(position));
    AWAIT_READY(appending);
    EXPECT_SOME_EQ(position, appending.get());
  }

  // Two replicas both want to recover, concurrently, over the same
  // (enlarged) network.
  Owned<Replica> replica4(new Replica(path4));
  Owned<Replica> replica5(new Replica(path5));

  pids.insert(replica4->pid());
  pids.insert(replica5->pid());

  Shared<Network> network2(new Network(pids));

  Future<Owned<Replica>> recovering4 = recover(3, replica4, network2);
  Future<Owned<Replica>> recovering5 = recover(3, replica5, network2);

  // Wait until recovery is done.
  AWAIT_READY(recovering4);
  AWAIT_READY(recovering5);

  Owned<Replica> shared4_ = recovering4.get();
  Shared<Replica> shared4 = shared4_.share();
  Coordinator coord2(3, shared4, network2);

  {
    // Note that the first election is expected to fail: 'coord2' takes
    // its proposal number from its local replica, the recovered
    // 'replica4'. That number is presumably not higher than the proposals
    // the other replicas have already promised (to 'coord1' and to the
    // racing catch-ups), so a second attempt will need to be made.
    Future<Option<uint64_t>> electing = coord2.elect();
    AWAIT_READY(electing);
    ASSERT_NONE(electing.get());

    electing = coord2.elect();
    AWAIT_READY(electing);
    EXPECT_SOME_EQ(10u, electing.get());
  }

  // The recovered replica must hold every entry written by 'coord1'.
  {
    Future<list<Action>> actions = shared4->read(1, 10);
    AWAIT_READY(actions);
    EXPECT_EQ(10u, actions->size());
    foreach (const Action& action, actions.get()) {
      ASSERT_TRUE(action.has_type());
      ASSERT_EQ(Action::APPEND, action.type());
      EXPECT_EQ(stringify(action.position()), action.append().bytes());
    }
  }

  // ...and the new coordinator can keep writing after them.
  {
    Future<Option<uint64_t>> appending = coord2.append("hello hello");
    AWAIT_READY(appending);
    EXPECT_SOME_EQ(11u, appending.get());
  }

  {
    Future<list<Action>> actions = shared4->read(11u, 11u);
    AWAIT_READY(actions);
    ASSERT_EQ(1u, actions->size());
    EXPECT_EQ(11u, actions->front().position());
    ASSERT_TRUE(actions->front().has_type());
    ASSERT_EQ(Action::APPEND, actions->front().type());
    EXPECT_EQ("hello hello", actions->front().append().bytes());
  }
}


TEST_F(RecoverTest, CatchupRetry)
{
  // Logs 1 and 2 are initialized (VOTING). Log 3 belongs to the replica
  // that will be caught up and is left uninitialized.
  const string path1 = os::getcwd() + "/.log1";
  initializer.flags.path = path1;
  ASSERT_SOME(initializer.execute());

  const string path2 = os::getcwd() + "/.log2";
  initializer.flags.path = path2;
  ASSERT_SOME(initializer.execute());

  const string path3 = os::getcwd() + "/.log3";

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  // Make sure replica2 does not receive learned messages.
  DROP_PROTOBUFS(LearnedMessage(), _, Eq(replica2->pid()));

  set<UPID> pids;
  pids.insert(replica1->pid());
  pids.insert(replica2->pid());

  Shared<Network> network1(new Network(pids));

  Coordinator coord(2, replica1, network1);

  {
    Future<Option<uint64_t>> electing = coord.elect();
    AWAIT_READY(electing);
    EXPECT_SOME_EQ(0u, electing.get());
  }

  // Every position written below; passed to catchup() as the set of
  // positions replica3 has to obtain.
  IntervalSet<uint64_t> positions;

  for (uint64_t position = 1; position <= 10; position++) {
    Future<Option<uint64_t>> appending = coord.append(stringify(position));
    AWAIT_READY(appending);
    EXPECT_SOME_EQ(position, appending.get());
    positions += position;
  }

  Shared<Replica> replica3(new Replica(path3));

  pids.insert(replica3->pid());

  Shared<Network> network2(new Network(pids));

  // Drop a promise request to replica1 so that the catch-up process
  // won't be able to get a quorum of explicit promises. Also, since
  // learned messages are blocked from being sent to replica2, the
  // catch-up process has to wait for a quorum of explicit promises.
  // If we don't allow retry, the catch-up process will get stuck at
  // promise phase even if replica1 reemerges later.
  DROP_PROTOBUF(PromiseRequest(), _, Eq(replica1->pid()));

  // NOTE(review): the trailing Seconds(10) is presumably the retry
  // timeout that the Clock::advance(Seconds(10)) below is meant to
  // fire; confirm against catchup()'s declaration.
  Future<Nothing> catching =
    catchup(2, replica3, network2, None(), positions, Seconds(10));

  // Drive time manually so the test does not actually sleep through
  // the retry and proposal-bump delays.
  Clock::pause();

  // Wait for the retry timer in 'catchup' to be setup.
  Clock::settle();

  // Wait for the proposal number to be bumped.
  Clock::advance(Seconds(1));
  Clock::settle();

  // Wait for 'catchup' to retry.
  Clock::advance(Seconds(10));
  Clock::settle();

  // Wait for another proposal number bump.
  Clock::advance(Seconds(1));
  Clock::settle();

  Clock::resume();

  // The retried attempt reaches replica1 and catch-up completes.
  AWAIT_READY(catching);
}


TEST_F(RecoverTest, RecoverProtocolRetry)
{
  // Only log 1 is initialized (VOTING). Logs 2 and 3 start out EMPTY,
  // so replica3's recovery cannot finish yet (asserted below).
  const string path1 = path::join(os::getcwd(), ".log1");
  initializer.flags.path = path1;
  ASSERT_SOME(initializer.execute());

  const string path2 = path::join(os::getcwd(), ".log2");
  const string path3 = path::join(os::getcwd(), ".log3");

  Owned<Replica> replica1(new Replica(path1));
  Owned<Replica> replica2(new Replica(path2));
  Owned<Replica> replica3(new Replica(path3));

  set<UPID> pids{replica1->pid(), replica2->pid(), replica3->pid()};
  Shared<Network> network(new Network(pids));

  Future<Owned<Replica>> recovering = recover(2, replica3, network);

  // Drive time manually so retries happen without real waiting.
  Clock::pause();

  // Wait for the retry timer to be setup.
  Clock::settle();
  ASSERT_TRUE(recovering.isPending());

  // Wait for recover process to retry.
  Clock::advance(Seconds(10));
  Clock::settle();
  ASSERT_TRUE(recovering.isPending());

  // Take replica2 out of the network so that its log can be
  // initialized. It is safe to have non-const access to the shared
  // Network here, because all Network operations are serialized
  // through a Process.
  const_cast<Network&>(*network).remove(replica2->pid());
  replica2.reset();

  initializer.flags.path = path2;
  ASSERT_SOME(initializer.execute());

  // Bring replica2 back, now on top of a VOTING log.
  replica2.reset(new Replica(path2));
  const_cast<Network&>(*network).add(replica2->pid());

  // Wait for recover process to retry again, now with 2 VOTING
  // replicas. It should successfully finish now.
  Clock::advance(Seconds(10));
  Clock::resume();

  AWAIT_READY(recovering);
}


TEST_F(RecoverTest, AutoInitialization)
{
  // None of the logs is initialized; all three replicas start EMPTY and
  // are recovered with the trailing `true` argument to recover().
  const string path1 = os::getcwd() + "/.log1";
  const string path2 = os::getcwd() + "/.log2";
  const string path3 = os::getcwd() + "/.log3";

  Owned<Replica> replica1(new Replica(path1));
  Owned<Replica> replica2(new Replica(path2));
  Owned<Replica> replica3(new Replica(path3));

  set<UPID> pids;
  pids.insert(replica1->pid());
  pids.insert(replica2->pid());
  pids.insert(replica3->pid());

  Shared<Network> network(new Network(pids));

  Future<Owned<Replica>> recovering1 = recover(2, replica1, network, true);
  Future<Owned<Replica>> recovering2 = recover(2, replica2, network, true);

  // Verifies that replica1 and replica2 cannot transition into VOTING
  // status because replica3 is still in EMPTY status. We flush the
  // event queue before checking.
  Clock::pause();
  Clock::settle();
  Clock::resume();

  EXPECT_TRUE(recovering1.isPending());
  EXPECT_TRUE(recovering2.isPending());

  Future<Owned<Replica>> recovering3 = recover(2, replica3, network, true);

  Clock::pause();
  Clock::settle();

  // At this moment `replica1` and `replica2` are in EMPTY status, and
  // are retrying with a random interval between [0.5 sec, 1 sec]. Since
  // the retry interval is hard coded and is not configurable, we need
  // to advance the clock here to avoid waiting for the backoff time.
  Clock::advance(Seconds(1));
  Clock::resume();

  AWAIT_READY(recovering1);
  AWAIT_READY(recovering2);
  AWAIT_READY(recovering3);

  Owned<Replica> shared_ = recovering1.get();
  Shared<Replica> shared = shared_.share();

  // The auto-initialized log must be fully usable: elect, write, read.
  Coordinator coord(2, shared, network);

  {
    Future<Option<uint64_t>> electing = coord.elect();
    AWAIT_READY(electing);
    EXPECT_SOME_EQ(0u, electing.get());
  }

  {
    Future<Option<uint64_t>> appending = coord.append("hello world");
    AWAIT_READY(appending);
    EXPECT_SOME_EQ(1u, appending.get());
  }

  {
    Future<list<Action>> actions = shared->read(1, 1);
    AWAIT_READY(actions);
    ASSERT_EQ(1u, actions->size());
    EXPECT_EQ(1u, actions->front().position());
    ASSERT_TRUE(actions->front().has_type());
    ASSERT_EQ(Action::APPEND, actions->front().type());
    EXPECT_EQ("hello world", actions->front().append().bytes());
  }
}


TEST_F(RecoverTest, AutoInitializationRetry)
{
  // None of the logs is initialized; all three replicas start EMPTY and
  // are recovered with the trailing `true` argument to recover().
  const string path1 = os::getcwd() + "/.log1";
  const string path2 = os::getcwd() + "/.log2";
  const string path3 = os::getcwd() + "/.log3";

  Owned<Replica> replica1(new Replica(path1));
  Owned<Replica> replica2(new Replica(path2));
  Owned<Replica> replica3(new Replica(path3));

  set<UPID> pids;
  pids.insert(replica1->pid());
  pids.insert(replica2->pid());
  pids.insert(replica3->pid());

  Shared<Network> network(new Network(pids));

  // Simulate the case where replica3 is temporarily removed.
  // NOTE(review): the filter is installed twice, presumably because
  // DROP_PROTOBUF (singular) drops one matching message, and we want to
  // drop the first recover request sent to replica3 by each of replica1
  // and replica2. Confirm against the DROP_PROTOBUF definition.
  DROP_PROTOBUF(RecoverRequest(), _, Eq(replica3->pid()));
  DROP_PROTOBUF(RecoverRequest(), _, Eq(replica3->pid()));

  Clock::pause();

  Future<Owned<Replica>> recovering1 = recover(2, replica1, network, true);
  Future<Owned<Replica>> recovering2 = recover(2, replica2, network, true);

  // Flush the event queue.
  Clock::settle();

  // With replica3 unreachable, neither recovery can finish yet.
  EXPECT_TRUE(recovering1.isPending());
  EXPECT_TRUE(recovering2.isPending());

  Future<Owned<Replica>> recovering3 = recover(2, replica3, network, true);

  // Replica1 and replica2 will retry recovery after 10 seconds.
  Clock::advance(Seconds(10));
  Clock::settle();

  Clock::resume();

  AWAIT_READY(recovering1);
  AWAIT_READY(recovering2);
  AWAIT_READY(recovering3);

  Owned<Replica> shared_ = recovering1.get();
  Shared<Replica> shared = shared_.share();

  // The auto-initialized log must be fully usable: elect, write, read.
  Coordinator coord(2, shared, network);

  {
    Future<Option<uint64_t>> electing = coord.elect();
    AWAIT_READY(electing);
    EXPECT_SOME_EQ(0u, electing.get());
  }

  {
    Future<Option<uint64_t>> appending = coord.append("hello world");
    AWAIT_READY(appending);
    EXPECT_SOME_EQ(1u, appending.get());
  }

  {
    Future<list<Action>> actions = shared->read(1, 1);
    AWAIT_READY(actions);
    ASSERT_EQ(1u, actions->size());
    EXPECT_EQ(1u, actions->front().position());
    ASSERT_TRUE(actions->front().has_type());
    ASSERT_EQ(Action::APPEND, actions->front().type());
    EXPECT_EQ("hello world", actions->front().append().bytes());
  }
}


TEST_F(RecoverTest, CatchupTruncated)
{
  const string path1 = path::join(os::getcwd(), ".log1");
  const string path2 = path::join(os::getcwd(), ".log2");
  const string path3 = path::join(os::getcwd(), ".log3");

  // Only the first two logs are initialized. path3 stays EMPTY and gets
  // its contents purely through catch-up.
  for (const string& path : {path1, path2}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  set<UPID> pids{replica1->pid(), replica2->pid()};
  Shared<Network> network1(new Network(pids));

  Coordinator coordinator(2, replica1, network1);
  Future<Option<uint64_t>> election = coordinator.elect();
  AWAIT_READY(election);
  EXPECT_SOME_EQ(0u, election.get());

  // Write positions 1..10 and record each of them for catch-up.
  IntervalSet<uint64_t> positions;
  for (uint64_t expected = 1; expected <= 10; ++expected) {
    Future<Option<uint64_t>> append = coordinator.append(stringify(expected));
    AWAIT_READY(append);
    EXPECT_SOME_EQ(expected, append.get());
    positions += expected;
  }

  // Cut everything below position 5; the truncation lands at 11.
  Future<Option<uint64_t>> truncation = coordinator.truncate(5);
  AWAIT_READY(truncation);
  EXPECT_SOME_EQ(11u, truncation.get());

  Shared<Replica> replica3(new Replica(path3));

  pids.insert(replica3->pid());
  Shared<Network> network2(new Network(pids));

  // `positions` still starts at 1, so it plays the role of a stale
  // 'begin' position recovered before the truncation happened.
  Future<Nothing> catching = catchup(
      2, replica3, network2, None(), positions, Seconds(10));
  AWAIT_READY(catching);

  AWAIT_EXPECT_EQ(5u, replica3->beginning());
  AWAIT_EXPECT_EQ(10u, replica3->ending());

  // Reopen replica3 from disk to check that storage recovery copes with
  // the result. The DB has to be closed first, so wait until nobody else
  // holds a shared pointer to replica3 before replacing it.
  AWAIT_READY(replica3.own());
  replica3.reset(new Replica(path3));

  AWAIT_EXPECT_EQ(5u, replica3->beginning());
  AWAIT_EXPECT_EQ(10u, replica3->ending());
}


// Verify that we can catch up a VOTING replica that has fallen behind.
TEST_F(RecoverTest, CatchupVoting)
{
  const string path1 = path::join(os::getcwd(), ".log1");
  const string path2 = path::join(os::getcwd(), ".log2");
  const string path3 = path::join(os::getcwd(), ".log3");

  // All three logs start out VOTING.
  for (const string& path : {path1, path2, path3}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  set<UPID> pids{replica1->pid(), replica2->pid()};
  Shared<Network> network1(new Network(pids));

  Coordinator coordinator(2, replica1, network1);
  Future<Option<uint64_t>> election = coordinator.elect();
  AWAIT_READY(election);
  EXPECT_SOME_EQ(0u, election.get());

  // Positions 1..10, each carrying its own position as payload.
  for (uint64_t expected = 1; expected <= 10; ++expected) {
    Future<Option<uint64_t>> append = coordinator.append(stringify(expected));
    AWAIT_READY(append);
    EXPECT_SOME_EQ(expected, append.get());
  }

  Future<Option<uint64_t>> truncation = coordinator.truncate(5);
  AWAIT_READY(truncation);
  EXPECT_SOME_EQ(11u, truncation.get());

  // replica3 is VOTING but joins only now, so it missed both the appends
  // and the truncation.
  Shared<Replica> replica3(new Replica(path3));

  pids.insert(replica3->pid());
  Shared<Network> network2(new Network(pids));

  // Quorum size 3 forces replica3's own (stale) lowest position to be
  // part of what gets recovered.
  Future<uint64_t> caughtUp = catchup(3, replica3, network2);
  AWAIT_EXPECT_EQ(10u, caughtUp);

  Future<uint64_t> beginning = replica3->beginning();
  AWAIT_EXPECT_EQ(5u, beginning);

  Future<uint64_t> ending = replica3->ending();
  AWAIT_EXPECT_EQ(caughtUp.get(), ending);

  Future<list<Action>> entries = replica3->read(beginning.get(), ending.get());
  AWAIT_READY(entries);
  EXPECT_EQ(ending.get() - beginning.get() + 1, entries->size());
  for (const Action& action : entries.get()) {
    ASSERT_TRUE(action.has_type());
    ASSERT_EQ(Action::APPEND, action.type());
    EXPECT_EQ(stringify(action.position()), action.append().bytes());
  }
}


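// Verify that we can catch up a VOTING replica that has fallen behind when
// there is a gap between its local log and the (truncated) beginning of the
// log recovered from the other replicas.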
TEST_F(RecoverTest, CatchupVotingWithGap)
{
  const string path1 = path::join(os::getcwd(), ".log1");
  const string path2 = path::join(os::getcwd(), ".log2");
  const string path3 = path::join(os::getcwd(), ".log3");

  // All three logs start out VOTING.
  for (const string& path : {path1, path2, path3}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));

  set<UPID> pids{replica1->pid(), replica2->pid()};
  Shared<Network> network1(new Network(pids));

  Coordinator coordinator(2, replica1, network1);
  Future<Option<uint64_t>> election = coordinator.elect();
  AWAIT_READY(election);
  EXPECT_SOME_EQ(0u, election.get());

  // Positions 1..10, each carrying its own position as payload.
  for (uint64_t expected = 1; expected <= 10; ++expected) {
    Future<Option<uint64_t>> append = coordinator.append(stringify(expected));
    AWAIT_READY(append);
    EXPECT_SOME_EQ(expected, append.get());
  }

  Future<Option<uint64_t>> truncation = coordinator.truncate(5);
  AWAIT_READY(truncation);
  EXPECT_SOME_EQ(11u, truncation.get());

  // replica3 is VOTING but joins only now, so it missed both the appends
  // and the truncation.
  Shared<Replica> replica3(new Replica(path3));

  pids.insert(replica3->pid());
  Shared<Network> network2(new Network(pids));

  // Keep recover requests away from replica3 so its own stale 'begin'
  // position is never recovered; the range comes from the others only.
  DROP_PROTOBUFS(RecoverRequest(), _, Eq(replica3->pid()));

  Future<uint64_t> caughtUp = catchup(2, replica3, network2);
  AWAIT_EXPECT_EQ(10u, caughtUp);

  Future<uint64_t> beginning = replica3->beginning();
  AWAIT_EXPECT_EQ(5u, beginning);

  Future<uint64_t> ending = replica3->ending();
  AWAIT_EXPECT_EQ(caughtUp.get(), ending);

  Future<list<Action>> entries = replica3->read(beginning.get(), ending.get());
  AWAIT_READY(entries);
  EXPECT_EQ(ending.get() - beginning.get() + 1, entries->size());
  for (const Action& action : entries.get()) {
    ASSERT_TRUE(action.has_type());
    ASSERT_EQ(Action::APPEND, action.type());
    EXPECT_EQ(stringify(action.position()), action.append().bytes());
  }
}


// Verify that catch-up fails if we recover only 1 position.
TEST_F(RecoverTest, CatchupVotingOnePosition)
{
  const string path1 = path::join(os::getcwd(), ".log1");
  const string path2 = path::join(os::getcwd(), ".log2");
  const string path3 = path::join(os::getcwd(), ".log3");

  // All three logs are freshly initialized and nothing is written.
  for (const string& path : {path1, path2, path3}) {
    initializer.flags.path = path;
    ASSERT_SOME(initializer.execute());
  }

  Shared<Replica> replica1(new Replica(path1));
  Shared<Replica> replica2(new Replica(path2));
  Shared<Replica> replica3(new Replica(path3));

  set<UPID> pids{replica1->pid(), replica2->pid(), replica3->pid()};
  Shared<Network> network(new Network(pids));

  // Catch-up must fail and leave replica3's range at [0, 0].
  AWAIT_FAILED(catchup(2, replica3, network));
  AWAIT_EXPECT_EQ(0u, replica3->beginning());
  AWAIT_EXPECT_EQ(0u, replica3->ending());
}


// Fixture for the `Log` API tests. The base fixture supplies a per-test
// temporary working directory in which the log directories are created.
class LogTest : public TemporaryDirectoryTest
{
protected:
  // Moves a replicated log from the `EMPTY` status to `VOTING`: tests
  // point `flags.path` at a log directory and then call `execute()`.
  tool::Initialize initializer;
};


// Append a single entry through a `Log::Writer` and read it back through
// a `Log::Reader` attached to the same log.
TEST_F(LogTest, WriteRead)
{
  const string cwd = os::getcwd();

  const string path1 = cwd + "/.log1";
  const string path2 = cwd + "/.log2";

  // Bring both logs out of the `EMPTY` status.
  initializer.flags.path = path1;
  ASSERT_SOME(initializer.execute());

  initializer.flags.path = path2;
  ASSERT_SOME(initializer.execute());

  // The log at `path2` is created with quorum 2 and `replica1` as its
  // only remote peer.
  Replica replica1(path1);

  set<UPID> peers{replica1.pid()};
  Log log(2, path2, peers);

  Log::Writer writer(&log);

  Future<Option<Log::Position>> elected = writer.start();
  AWAIT_READY(elected);
  ASSERT_SOME(elected.get());

  Future<Option<Log::Position>> appended = writer.append("hello world");
  AWAIT_READY(appended);
  ASSERT_SOME(appended.get());

  const Log::Position& written = appended->get();

  Log::Reader reader(&log);

  // Read back exactly the one position that was just written.
  Future<list<Log::Entry>> fetched = reader.read(written, written);
  AWAIT_READY(fetched);

  ASSERT_EQ(1u, fetched->size());
  EXPECT_EQ(written, fetched->front().position);
  EXPECT_EQ("hello world", fetched->front().data);
}


// Check that `Log::position()` maps the identity of a position returned
// by the writer back to an equal `Log::Position`.
TEST_F(LogTest, Position)
{
  const string cwd = os::getcwd();

  const string path1 = cwd + "/.log1";
  initializer.flags.path = path1;
  ASSERT_SOME(initializer.execute());

  const string path2 = cwd + "/.log2";
  initializer.flags.path = path2;
  ASSERT_SOME(initializer.execute());

  Replica replica1(path1);

  set<UPID> peers{replica1.pid()};
  Log log(2, path2, peers);

  Log::Writer writer(&log);

  Future<Option<Log::Position>> elected = writer.start();
  AWAIT_READY(elected);
  ASSERT_SOME(elected.get());

  Future<Option<Log::Position>> appended = writer.append("hello world");
  AWAIT_READY(appended);
  ASSERT_SOME(appended.get());

  // Round trip: position -> identity -> position.
  const Log::Position& written = appended->get();
  ASSERT_EQ(written, log.position(written.identity()));
}


// Check that a recovered log exports the `log/recovered` and
// `log/ensemble_size` metrics under the prefix passed to its constructor.
TEST_F(LogTest, Metrics)
{
  // TODO(jieyu): Add a check for the case where the log is not
  // recovered once MESOS-5626 is resolved. One way to do that is to
  // create a log without running the initialization tool.

  const string path = os::getcwd() + "/.log";
  initializer.flags.path = path;
  ASSERT_SOME(initializer.execute());

  // Quorum of 1, no remote peers, metrics registered under "prefix/".
  // NOTE(review): the `false` argument presumably disables
  // auto-initialization (the log was already initialized above);
  // confirm against the `Log` constructor.
  Log log(1, path, set<process::UPID>(), false, "prefix/");

  // Make sure the log is recovered. If the log is not recovered, the
  // writer cannot be elected.
  Log::Writer writer(&log);

  Future<Option<Log::Position>> start = writer.start();

  AWAIT_READY(start);
  ASSERT_SOME(start.get());

  JSON::Object snapshot = Metrics();

  ASSERT_EQ(1u, snapshot.values.count("prefix/log/recovered"));
  EXPECT_EQ(1, snapshot.values["prefix/log/recovered"]);

  ASSERT_EQ(1u, snapshot.values.count("prefix/log/ensemble_size"));
  EXPECT_EQ(1, snapshot.values["prefix/log/ensemble_size"]);
}


// A `Log::Reader` opened on a replica that missed every append should be
// able to catch up and then read back the appended entries.
TEST_F(LogTest, ReaderCatchup)
{
  const string cwd = os::getcwd();

  const string logPaths[] = {
    cwd + "/.log1",
    cwd + "/.log2",
    cwd + "/.log3",
  };

  // Bring every log out of the `EMPTY` status.
  foreach (const string& logPath, logPaths) {
    initializer.flags.path = logPath;
    ASSERT_SOME(initializer.execute());
  }

  // Only the first two logs get replicas up front; the third is opened
  // through a `Log` after all appends have been made.
  Shared<Replica> replica1(new Replica(logPaths[0]));
  Shared<Replica> replica2(new Replica(logPaths[1]));

  set<UPID> pids{replica1->pid(), replica2->pid()};
  Shared<Network> network(new Network(pids));

  Coordinator coord(2, replica2, network);

  Future<Option<uint64_t>> electing = coord.elect();
  AWAIT_READY(electing);
  EXPECT_SOME_EQ(0u, electing.get());

  // Append "1" through "10"; each is expected at the matching position.
  for (uint64_t value = 1; value <= 10; ++value) {
    Future<Option<uint64_t>> appending = coord.append(stringify(value));
    AWAIT_READY(appending);
    EXPECT_SOME_EQ(value, appending.get());
  }

  Log log3(2, logPaths[2], pids);
  Log::Reader reader(&log3);

  // Catch-up the replica that missed positions adding.
  Future<Log::Position> end = reader.catchup();
  AWAIT_READY(end);

  Future<Log::Position> begin = reader.beginning();
  AWAIT_READY(begin);

  // 9 entries are expected rather than 10, because the catch-up
  // procedure doesn't catch-up the last recovered position. See
  // comments for RecoverMissingProcess.
  Future<list<Log::Entry>> entries = reader.read(begin.get(), end.get());
  AWAIT_READY(entries);
  ASSERT_EQ(9u, entries->size());

  // Entries are expected in order, carrying the data "1", "2", ...
  uint64_t expected = 0;
  foreach (const Log::Entry& entry, entries.get()) {
    EXPECT_EQ(stringify(++expected), entry.data);
  }
}


#ifdef MESOS_HAS_JAVA
// TODO(jieyu): We copy the code from TemporaryDirectoryTest here
// because we cannot inherit from two test fixtures. In the future,
// we need a way to compose multiple test fixtures together.
class LogZooKeeperTest : public ZooKeeperTest
{
protected:
  void SetUp() override
  {
    ZooKeeperTest::SetUp();

    // Save the current working directory.
    cwd = os::getcwd();

    // Create a temporary directory for the test.
    Try<string> directory = environment->mkdtemp();

    ASSERT_SOME(directory) << "Failed to mkdtemp";

    sandbox = directory.get();

    LOG(INFO) << "Using temporary directory '" << sandbox.get() << "'";

    // Run the test out of the temporary directory we created.
    ASSERT_SOME(os::chdir(sandbox.get()))
      << "Failed to chdir into '" << sandbox.get() << "'";
  }

  void TearDown() override
  {
    // Return to previous working directory and cleanup the sandbox.
    ASSERT_SOME(os::chdir(cwd));

    if (sandbox.isSome()) {
      ASSERT_SOME(os::rmdir(sandbox.get()));
    }

    // `SetUp()` starts with `ZooKeeperTest::SetUp()`, so the base
    // fixture's teardown must run too. It runs last so that teardown
    // happens in the reverse order of setup.
    ZooKeeperTest::TearDown();
  }

  // Used to change the status of a replicated log from `EMPTY` to `VOTING`.
  tool::Initialize initializer;

private:
  // Working directory at the time `SetUp()` ran; restored in `TearDown()`.
  string cwd;

  // Per-test temporary directory; removed in `TearDown()` when set.
  Option<string> sandbox;
};


// Two logs share a ZooKeeper server; an entry appended through one of
// them can be read back through a reader on the same log.
TEST_F(LogZooKeeperTest, WriteRead)
{
  const string cwd = os::getcwd();

  const string path1 = cwd + "/.log1";
  const string path2 = cwd + "/.log2";

  // Bring both logs out of the `EMPTY` status.
  initializer.flags.path = path1;
  ASSERT_SOME(initializer.execute());

  initializer.flags.path = path2;
  ASSERT_SOME(initializer.execute());

  const string servers = server->connectString();

  // NOTE(review): `log1` is never referenced below; it presumably only
  // supplies the second replica needed for the quorum of 2.
  Log log1(2, path1, servers, NO_TIMEOUT, "/log/", None());
  Log log2(2, path2, servers, NO_TIMEOUT, "/log/", None());

  Log::Writer writer(&log2);

  Future<Option<Log::Position>> elected = writer.start();
  AWAIT_READY(elected);
  ASSERT_SOME(elected.get());

  Future<Option<Log::Position>> appended = writer.append("hello world");
  AWAIT_READY(appended);
  ASSERT_SOME(appended.get());

  const Log::Position& written = appended->get();

  Log::Reader reader(&log2);

  Future<list<Log::Entry>> fetched = reader.read(written, written);
  AWAIT_READY(fetched);

  ASSERT_EQ(1u, fetched->size());
  EXPECT_EQ(written, fetched->front().position);
  EXPECT_EQ("hello world", fetched->front().data);
}


// Verify that a writer on a single-replica log that uses ZooKeeper can
// still append and read after the ZooKeeper network is shut down.
TEST_F(LogZooKeeperTest, LostZooKeeper)
{
  const string path = os::getcwd() + "/.log";
  const string servers = server->connectString();

  // We rely on auto-initialization to initialize the log.
  Log log(1, path, servers, NO_TIMEOUT, "/log/", None(), true);

  Log::Writer writer(&log);

  Future<Option<Log::Position>> start = writer.start();

  AWAIT_READY(start);
  ASSERT_SOME(start.get());

  // Shutdown ZooKeeper network.
  server->shutdownNetwork();

  // We should still be able to append as the local replica is in the
  // base set of the ZooKeeper network.
  Future<Option<Log::Position>> position = writer.append("hello world");

  AWAIT_READY(position);
  ASSERT_SOME(position.get());

  Log::Reader reader(&log);

  Future<list<Log::Entry>> entries =
    reader.read(position->get(), position->get());

  AWAIT_READY(entries);

  ASSERT_EQ(1u, entries->size());
  EXPECT_EQ(position->get(), entries->front().position);
  EXPECT_EQ("hello world", entries->front().data);
}
#endif // MESOS_HAS_JAVA


// Placeholders for coordinator scenarios that have no coverage yet.
// Every body below is intentionally empty, so these tests always pass.
TEST_F(CoordinatorTest, RacingElect)
{
  // NOTE: Not implemented yet.
}

TEST_F(CoordinatorTest, FillNoQuorum)
{
  // NOTE: Not implemented yet.
}

TEST_F(CoordinatorTest, FillInconsistent)
{
  // NOTE: Not implemented yet.
}

TEST_F(CoordinatorTest, LearnedOnOneReplica_NotLearnedOnAnother)
{
  // NOTE: Not implemented yet.
}

TEST_F(CoordinatorTest,
       LearnedOnOneReplica_NotLearnedOnAnother_AnotherFailsAndRecovers)
{
  // NOTE: Not implemented yet.
}

} // namespace tests {
} // namespace internal {
} // namespace mesos {
