blob: b366818767adb259ee6805f9978fd330b790a264 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
#include <stdint.h>
#include <stdlib.h>
#include <set>
#include <mesos/type_utils.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/id.hpp>
#include <process/process.hpp>
#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/lambda.hpp>
#include <stout/none.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include "log/catchup.hpp"
#include "log/recover.hpp"
#include "messages/log.hpp"
using namespace process;
using std::set;
namespace mesos {
namespace internal {
namespace log {
// This class is responsible for executing the log recover protocol.
// Any time a replica in non-VOTING status starts, we will run this
// protocol. We first broadcast a recover request to all the replicas
// in the network, and then collect recover responses to decide what
// status the local replica should be in next. The details of the
// recover protocol are shown as follows:
// A) Broadcast a RecoverRequest to all replicas in the network.
// B) Collect RecoverResponse from each replica
// B1) If a quorum of replicas are found in VOTING status (no matter
// what status the local replica is in currently), the local
// replica will be put in RECOVERING status next.
// B2) If the local replica is in EMPTY status and all replicas are
// found in either EMPTY status or STARTING status, the local
// replica will be put in STARTING status next.
// B3) If the local replica is in STARTING status and all replicas
// are found in either STARTING status or VOTING status, the
// local replica will be put in VOTING status next.
// B4) Otherwise, goto (A).
// (B2 and B3 are used to do the two-phase auto initialization.)
// We re-use RecoverResponse to specify the return value. The 'status'
// field specifies the next status of the local replica. If the next
// status is RECOVERING, we set the fields 'begin' and 'end' to be the
// lowest begin and highest end position seen in these responses.
// Process that runs the recover protocol and reports its outcome through
// 'promise' (exposed to callers via 'future()').
class RecoverProtocolProcess : public Process<RecoverProtocolProcess>
// Constructor parameters. Each is presumably stored in the same-named
// member declared further below -- TODO confirm, the full initializer
// list is not visible in this view:
//   _quorum:         quorum size used when waiting for and counting replicas.
//   _network:        the network of replicas that requests are broadcast to.
//   _status:         current status of the local replica.
//   _autoInitialize: whether the two-phase auto-initialization is enabled.
//   _timeout:        time limit for one run of the protocol.
size_t _quorum,
const Shared<Network>& _network,
const Metadata::Status& _status,
bool _autoInitialize,
const Duration& _timeout)
// Each instance gets a generated id with the "log-recover-protocol" prefix.
: ProcessBase(ID::generate("log-recover-protocol")),
// Starts out false; only 'discard()' sets it to true.
terminating(false) {}
// Returns the future associated with 'promise'.
Future<RecoverResponse> future() { return promise.future(); }
// Initialization hook of the process. Wires discard requests on the
// returned future to 'discard()'.
// NOTE(review): only the discard registration is visible in this view;
// confirm whether the protocol is also kicked off from here.
virtual void initialize()
// Register a callback to handle user initiated discard. The callback is
// deferred to this process through 'self()'.
promise.future().onDiscard(defer(self(), &Self::discard));
// Timeout handler passed to '.after(...)' in 'start()'. Logs that the
// protocol did not finish in time and hands back the given future.
//   future:  the protocol chain that did not complete within 'timeout'.
//   timeout: the time limit that was exceeded (used in the log message).
// Returns 'future' itself.
static Future<Option<RecoverResponse>> timedout(
Future<Option<RecoverResponse>> future,
const Duration& timeout)
LOG(INFO) << "Unable to finish the recover protocol in "
<< timeout << ", retrying";
// The 'future' will eventually become DISCARDED, at which time we
// will re-run the recover protocol. We use the boolean flag
// 'terminating' to distinguish between a user initiated discard
// and a timeout induced discard.
// NOTE(review): no statement that triggers the discard of 'future' is
// visible in this view -- confirm where it happens.
return future;
// Handler for a user initiated discard (registered in 'initialize()').
// Sets 'terminating' so that 'finished()' can tell this discard apart
// from one caused by a timeout.
void discard()
// TODO(jieyu): The discard logic here is problematic. It is
// likely that this process never terminates after 'discard' has
// been called on the returned future. See details in MESOS-5626.
terminating = true;
// Starts one run of the protocol and stores the resulting future chain
// in 'chain'. The chain waits for enough replicas, broadcasts the
// request, processes the responses, and finally reports to 'finished()'.
void start()
// NOTE(review): the log message below misspells "quorum" as "quroum".
// It is runtime text, so it is left untouched here.
VLOG(2) << "Starting to wait for enough quorum of replicas before running "
<< "recovery protocol, expected quroum size: " << stringify(quorum);
// Wait until there are enough (i.e., quorum of) replicas in the
// network to avoid unnecessary retries.
chain = network->watch(quorum, Network::GREATER_THAN_OR_EQUAL_TO)
// Then send the recover request to the replicas.
.then(defer(self(), &Self::broadcast))
// Then consume the responses one by one.
.then(defer(self(), &Self::receive))
// If the chain is not done within 'timeout', 'timedout' is invoked
// with the pending future.
.after(timeout, lambda::bind(&Self::timedout, lambda::_1, timeout))
// Whatever the final state is, report it to 'finished()'.
.onAny(defer(self(), &Self::finished, lambda::_1));
// Sends a RecoverRequest to the replicas in 'network' and forwards the
// result of the broadcast to 'broadcasted()'.
// Returns the future produced by that continuation.
Future<Nothing> broadcast()
VLOG(2) << "Broadcasting recover request to all replicas";
// Broadcast recover request to all replicas.
return network->broadcast(protocol::recover, RecoverRequest())
.then(defer(self(), &Self::broadcasted, lambda::_1));
// Continuation of 'broadcast()'. Remembers the pending response futures
// and clears the position trackers for the new run.
//   _responses: the response futures handed over by the broadcast.
// Returns Nothing.
Future<Nothing> broadcasted(const set<Future<RecoverResponse>>& _responses)
VLOG(2) << "Broadcast request completed";
responses = _responses;
// Reset the counters.
// NOTE(review): only the two position trackers are reset in this view;
// confirm whether 'responsesReceived' is cleared as well.
lowestBeginPosition = None();
highestEndPosition = None();
return Nothing();
// Waits for the next response among the pending ones in 'responses' and
// passes it to 'received()'.
// Returns None if we need to re-run the protocol.
Future<Option<RecoverResponse>> receive()
if (responses.empty()) {
// All responses have been received but we haven't received
// enough (i.e., a quorum of) responses from VOTING replicas to
// start the catch-up. We will re-run the recovery protocol.
return None();
// Instead of using a for loop here, we use select to process
// responses one after another so that we can ignore the rest if
// we have collected enough responses.
return select(responses)
.then(defer(self(), &Self::received, lambda::_1));
// Processes a single response selected by 'receive()' and decides
// whether the protocol can conclude.
//   future: the response future picked by 'select'.
// Returns a RecoverResponse describing the next status of the local
// replica when a decision can be made; otherwise continues with the
// next response through 'receive()'.
Future<Option<RecoverResponse>> received(
const Future<RecoverResponse>& future)
// NOTE(review): the statements that the next two comments describe (a
// check on 'future' and its removal from 'responses') are not visible
// in this view.
// Enforced by the select semantics.
// Remove this future from 'responses' so that we do not listen on
// it the next time we invoke select.
const RecoverResponse& response = future.get();
LOG(INFO) << "Received a recover response from a replica in "
<< response.status() << " status";
// NOTE(review): the update of 'responsesReceived' for this response is
// not visible in this view, although the counters are read below.
// We need to remember the lowest begin position and highest end
// position seen from VOTING replicas.
if (response.status() == Metadata::VOTING) {
CHECK(response.has_begin() && response.has_end());
// The trackers are Option values; presumably 'min'/'max' treat a None
// tracker as "no value yet" -- TODO confirm.
lowestBeginPosition = min(lowestBeginPosition, response.begin());
highestEndPosition = max(highestEndPosition, response.end());
// If we got responses from a quorum of VOTING replicas, the local
// replica will be put in RECOVERING status and start catching up.
// It is likely that the local replica is in RECOVERING status
// already. This is the case where the replica crashes during
// catch-up. When it restarts, we need to recalculate the lowest
// begin position and the highest end position since we haven't
// persisted this information on disk.
if (responsesReceived[Metadata::VOTING] >= quorum) {
CHECK_LE(lowestBeginPosition.get(), highestEndPosition.get());
// NOTE(review): the statements that fill in 'result' (status and the
// begin/end positions) are not visible in this view.
RecoverResponse result;
return result;
// TODO(jieyu): Currently, we simply calculate the size of the
// cluster from the quorum size. In the future, we may want to
// allow users to specify the cluster size in case they want to
// use a non-standard quorum size (e.g., cluster size = 5, quorum
// size = 4).
size_t clusterSize = (2 * quorum) - 1;
if (autoInitialize) {
// The following code handles the auto-initialization. Our idea
// is: we allow a replica in EMPTY status to become VOTING
// immediately if it finds ALL (i.e., 2 * quorum - 1) replicas
// are in EMPTY status. This is based on the assumption that the
// only time ALL replicas are in EMPTY status is during start-up.
// This may not be true if we have a catastrophic failure in
// which all replicas are gone, and that's exactly the reason we
// allow users to disable auto-initialization.
// To do auto-initialization, if we use a single phase protocol
// and allow a replica to directly transit from EMPTY status to
// VOTING status, we may run into a state where we cannot make
// progress even if all replicas are in EMPTY status initially.
// For example, say the quorum size is 2. All replicas are in
// EMPTY status initially. One replica broadcasts a recover
// request and becomes VOTING before other replicas start
// broadcasting recover requests. In that case, no replica can
// make progress. To solve this problem, we use a two-phase
// protocol and introduce an intermediate transient status
// (STARTING) between EMPTY and VOTING status. A replica in
// EMPTY status can transit to STARTING status if it finds all
// replicas are in either EMPTY or STARTING status. A replica in
// STARTING status can transit to VOTING status if it finds all
// replicas are in either STARTING or VOTING status. In that
// way, in our previous example, all replicas will be in
// STARTING status before any of them can transit to VOTING
// status.
switch (status) {
case Metadata::EMPTY:
// First phase: every replica in the cluster answered EMPTY or STARTING.
if ((responsesReceived[Metadata::EMPTY] +
responsesReceived[Metadata::STARTING]) >= clusterSize) {
// NOTE(review): the assignment of the result status is not visible here.
RecoverResponse result;
return result;
case Metadata::STARTING:
// Second phase: every replica in the cluster answered STARTING or VOTING.
if ((responsesReceived[Metadata::STARTING] +
responsesReceived[Metadata::VOTING]) >= clusterSize) {
// NOTE(review): the assignment of the result status is not visible here.
RecoverResponse result;
return result;
// Ignore all other cases.
} else {
// Since auto initialization is disabled, we print an advisory
// message to remind the user to initialize the log manually.
if (responsesReceived[Metadata::EMPTY] >= clusterSize) {
LOG(WARNING) << "\n"
<< "----------------------------------------------------\n"
<< "Replicated log has not been initialized. Did you\n"
<< "forget to manually initialize the log (i.e.,\n"
<< "mesos-log initialize --path=<PATH>)? Note that all\n"
<< "replicas are not initialized and the above command\n"
<< "needs to be run on each host!\n"
<< "----------------------------------------------------";
// Handle the next response.
return receive();
// Final callback of the chain built in 'start()'. Depending on the state
// of 'future' the protocol is re-run (immediately after a timeout, or
// after a randomized delay when no decision could be made).
// NOTE(review): the bodies of the "terminating", "failed" and final
// "else" branches are not visible in this view; they presumably settle
// 'promise' and stop the process -- TODO confirm.
void finished(const Future<Option<RecoverResponse>>& future)
if (future.isDiscarded()) {
// We use the boolean flag 'terminating' to distinguish between
// a user initiated discard and a timeout induced discard. In
// the case of a user initiated discard, the flag 'terminating'
// will be set to true in 'Self::discard()'.
if (terminating) {
} else {
VLOG(2) << "Log recovery timed out waiting for responses, retrying";
start(); // Re-run the recover protocol after timeout.
// NOTE(review): the ';' right after '{' on the next line is an empty
// statement and looks unintended.
} else if (future.isFailed()) {;
} else if (future.get().isNone()) {
// Re-run the protocol. We add a random delay before each retry
// because we do not want to saturate the network/disk IO in
// some cases. The delay is chosen randomly to reduce the
// likelihood of conflicts (i.e., a replica receives a recover
// request while it is changing its status).
static const Duration T = Milliseconds(500);
// 'd' ranges from T to 2 * T (500ms to 1s), assuming 'os::random()'
// yields a value in [0, RAND_MAX] -- TODO confirm.
Duration d = T * (1.0 + (double) os::random() / RAND_MAX);
VLOG(2) << "Didn't receive enough responses for recovery, retrying "
<< "in " << stringify(d);
delay(d, self(), &Self::start);
} else {
// Quorum size: used to wait for enough replicas and to count VOTING
// responses; the cluster size is derived from it as (2 * quorum) - 1.
const size_t quorum;
// The network of replicas that is watched and broadcast to.
const Shared<Network> network;
// Status of the local replica; selects the auto-initialization phase.
const Metadata::Status status;
// Whether the two-phase auto-initialization is enabled.
const bool autoInitialize;
// Time limit applied to each run of the protocol chain.
const Duration timeout;
// Response futures that have not been processed yet.
set<Future<RecoverResponse>> responses;
// Number of responses seen so far, keyed by the responder's status.
hashmap<Metadata::Status, size_t> responsesReceived;
// Lowest begin / highest end position reported by VOTING replicas.
Option<uint64_t> lowestBeginPosition;
Option<uint64_t> highestEndPosition;
// The future chain of the current run (assigned in 'start()').
Future<Option<RecoverResponse>> chain;
// Set by 'discard()' to mark a user initiated discard.
bool terminating;
// Carries the final result exposed through 'future()'.
process::Promise<RecoverResponse> promise;
// The wrapper for running the recover protocol. We will re-run the
// recover protocol if it cannot be finished within 'timeout'.
//   quorum:         quorum size of the replicated log.
//   network:        the network of replicas to talk to.
//   status:         current status of the local replica.
//   autoInitialize: whether auto-initialization is enabled.
//   timeout:        time limit per run; defaults to 10 seconds.
// Returns the future of the spawned RecoverProtocolProcess.
static Future<RecoverResponse> runRecoverProtocol(
size_t quorum,
const Shared<Network>& network,
const Metadata::Status& status,
bool autoInitialize,
const Duration& timeout = Seconds(10))
// NOTE(review): the constructor argument list is not visible in this
// view; presumably the parameters above are forwarded in order.
RecoverProtocolProcess* process =
new RecoverProtocolProcess(
// The future is obtained before 'spawn'. Presumably the 'true' argument
// hands ownership of 'process' to libprocess, so 'process' should not be
// touched afterwards -- TODO confirm.
Future<RecoverResponse> future = process->future();
spawn(process, true);
return future;
// This process is used to recover a replica. We first check the
// status of the local replica. If it is in VOTING status, the recover
// process will terminate immediately. If the local replica is in
// non-VOTING status, we will run the log recover protocol described
// above to decide what status the local replica should be in next. If
// the next status is determined to be RECOVERING, we will start doing
// catch-up. Later, if the local replica has caught up, we will set
// the status of the local replica to VOTING and terminate the
// process, indicating the recovery has completed. If all replicas are
// in EMPTY status and auto-initialization is enabled, a two-phase
// protocol will be used to bootstrap the replicated log.
// Here, we list a few scenarios and show how the recover process will
// respond in those scenarios. All the examples assume a quorum size
// of 2. Remember that a new replica is always put in EMPTY status
// initially.
// 1) Replica A, B and C are all in VOTING status. The operator adds
// replica D. In that case, D will go into RECOVERING status and
// then go into VOTING status. Therefore, we should avoid adding a
// new replica unless we know that one replica has been removed.
// 2) Replica A and B are in VOTING status. The operator adds replica
// C. In that case, C will go into RECOVERING status and then go
// into VOTING status, which is expected.
// 3) Replica A is in VOTING status. The operator adds replica B. In
// that case, B will stay in EMPTY status forever. This is expected
// because we cannot make progress if VOTING replicas are not
// enough (i.e., less than quorum).
// 4) Replica A is in VOTING status and B is in EMPTY status. The
// operator adds replica C. In that case, C will stay in EMPTY
// status forever similar to case 3).
// 5) Replica A, B and C are all in EMPTY status. Depending on whether
// auto-initialization is enabled or not, the replicas will behave
// differently. If auto-initialization is enabled, all replicas
// will first go into STARTING status. Once *all* replicas have
// transitioned out of EMPTY status, the replicas will go into
// VOTING status. If auto-initialization is disabled, all replicas
// will remain in EMPTY status.
// Process that recovers the local replica and reports the recovered
// replica through 'promise' (exposed to callers via 'future()').
class RecoverProcess : public Process<RecoverProcess>
// Constructor parameters. Each is presumably stored in the same-named
// member declared further below -- TODO confirm, only the last
// initializer is visible in this view:
//   _quorum:         quorum size of the replicated log.
//   _replica:        the local replica to recover.
//   _network:        the network of replicas.
//   _autoInitialize: whether auto-initialization is enabled.
size_t _quorum,
const Owned<Replica>& _replica,
const Shared<Network>& _network,
bool _autoInitialize)
// Each instance gets a generated id with the "log-recover" prefix.
: ProcessBase(ID::generate("log-recover")),
autoInitialize(_autoInitialize) {}
// Returns the future associated with 'promise'.
Future<Owned<Replica>> future() { return promise.future(); }
// Initialization hook of the process. Registers discard handling and
// starts the recovery by querying the status of the local replica.
virtual void initialize()
LOG(INFO) << "Starting replica recovery";
// Register a callback to handle user initiated discard.
promise.future().onDiscard(defer(self(), &Self::discard));
// Check the current status of the local replica and decide if
// recovery is needed. Recovery is needed only if the local
// replica is not in VOTING status.
chain = replica->status()
// The status is handed to 'recover()'.
.then(defer(self(), &Self::recover, lambda::_1))
// Whatever the final state is, report it to 'finished()'.
.onAny(defer(self(), &Self::finished, lambda::_1));
// Finalization hook of the process; only logs that the process ended.
virtual void finalize()
VLOG(1) << "Recover process terminated";
// Handler for a user initiated discard (registered in 'initialize()').
// NOTE(review): the body is not visible in this view; presumably it
// discards 'chain' -- TODO confirm.
void discard()
// Decides whether recovery is needed for the given replica status.
//   status: the current status of the local replica.
// Returns Nothing right away when the replica is already VOTING;
// otherwise runs the recover protocol and continues in '_recover()'.
Future<Nothing> recover(const Metadata::Status& status)
LOG(INFO) << "Replica is in " << status << " status";
if (status == Metadata::VOTING) {
// No need to do recovery.
return Nothing();
} else {
// The protocol is run with its default timeout.
return runRecoverProtocol(quorum, network, status, autoInitialize)
.then(defer(self(), &Self::_recover, lambda::_1));
// Acts on the outcome of the recover protocol.
//   result: the protocol result; its 'status' is the next status of the
//           local replica.
// Returns a future for the follow-up work, or a Failure when the status
// is none of STARTING, VOTING or RECOVERING.
Future<Nothing> _recover(const RecoverResponse& result)
switch (result.status()) {
case Metadata::STARTING:
// This is the auto-initialization case. As mentioned above, we
// use a two-phase protocol to bootstrap. When the control
// reaches here, the first phase just ended. We start the second
// phase by re-running the recover protocol.
return updateReplicaStatus(Metadata::STARTING)
.then(defer(self(), &Self::recover, Metadata::STARTING));
case Metadata::VOTING:
// This is also the auto-initialization case. When the
// control reaches here, the second phase just ended.
return updateReplicaStatus(Metadata::VOTING);
case Metadata::RECOVERING:
// The catch-up range is taken from the result, so both ends must be set.
CHECK(result.has_begin() && result.has_end());
return updateReplicaStatus(Metadata::RECOVERING)
.then(defer(self(), &Self::catchup, result.begin(), result.end()));
return Failure("Unexpected status returned from the recover protocol");
// Catches up the local replica on a range of positions and then moves
// it to VOTING status.
//   begin: first position of the range to catch up.
//   end:   last position of the range to catch up; must not be smaller
//          than 'begin'.
// Returns the future of the whole catch-up / ownership / status chain.
Future<Nothing> catchup(uint64_t begin, uint64_t end)
// We reach here either because the log is empty (uninitialized),
// or the log is not empty but a previous unfinished catch-up
// attempt has been detected (the process crashes/killed when
// catching up). In either case, the local replica may have lost
// some data and Paxos states, and should not be allowed to vote.
// Otherwise, we may introduce inconsistency in the log as the
// local replica could have accepted a write which it would not
// have accepted if the data and the Paxos states were not lost.
// Now, the question is how many positions the local replica
// should catch up before it can be allowed to vote. We find that
// it is sufficient to catch-up positions from _begin_ to _end_
// where _begin_ is the smallest position seen in a quorum of
// VOTING replicas and _end_ is the largest position seen in a
// quorum of VOTING replicas. Here is the correctness argument.
// For a position _e_ larger than _end_, obviously no value has
// been agreed on for that position. Otherwise, we should find at
// least one VOTING replica in a quorum of replicas such that its
// end position is larger than _end_. For the same reason, a
// coordinator should not have collected enough promises for
// position _e_. Therefore, it's safe for the local replica to
// vote for that position. For a position _b_ smaller than
// _begin_, it should have already been truncated and the
// truncation should have already been agreed. Therefore, allowing
// the local replica to vote for that position is safe.
CHECK_LE(begin, end);
LOG(INFO) << "Starting catch-up from position " << begin << " to " << end;
// NOTE(review): the constructor arguments of 'positions' are not
// visible in this view; presumably they describe the range from
// 'begin' to 'end' -- TODO confirm the bound types.
IntervalSet<uint64_t> positions(
// Share the ownership of the replica. From this point until the
// point where the ownership of the replica is regained, we should
// not access the 'replica' field.
Shared<Replica> shared = replica.share();
// Since we do not know what proposal number to use (the log is
// empty), we use none and leave log::catchup to automatically
// bump the proposal number.
return log::catchup(quorum, shared, network, None(), positions)
// Regain exclusive ownership of the replica once catch-up is done.
.then(defer(self(), &Self::getReplicaOwnership, shared))
// With ownership regained, mark the replica as VOTING.
.then(defer(self(), &Self::updateReplicaStatus, Metadata::VOTING));
// Asks the local replica to change its status and checks the outcome in
// '_updateReplicaStatus()'.
//   status: the status to set on the replica.
// Returns the future produced by that continuation.
Future<Nothing> updateReplicaStatus(const Metadata::Status& status)
LOG(INFO) << "Updating replica status to " << status;
return replica->update(status)
.then(defer(self(), &Self::_updateReplicaStatus, lambda::_1, status));
// Continuation of 'updateReplicaStatus()'.
//   updated: the result reported by the replica's update.
//   status:  the status that was requested.
// Returns a Failure when the update did not succeed, Nothing otherwise.
Future<Nothing> _updateReplicaStatus(
bool updated, const Metadata::Status& status)
if (!updated) {
return Failure("Failed to update replica status");
// Reaching VOTING is logged as joining the Paxos group.
if (status == Metadata::VOTING) {
LOG(INFO) << "Successfully joined the Paxos group";
return Nothing();
// Requests exclusive ownership of the replica that was shared in
// 'catchup()' and stores it via '_getReplicaOwnership()'.
//   shared: the shared handle to the replica.
Future<Nothing> getReplicaOwnership(Shared<Replica> shared)
// Try to re-gain the ownership of the replica.
return shared.own()
.then(defer(self(), &Self::_getReplicaOwnership, lambda::_1));
// Continuation of 'getReplicaOwnership()': puts the regained replica
// back into the 'replica' field.
//   owned: the exclusively owned replica.
// Returns Nothing.
Future<Nothing> _getReplicaOwnership(Owned<Replica> owned)
replica = owned;
return Nothing();
// Final callback of the chain built in 'initialize()'. Branches on
// whether 'future' was discarded, failed, or completed.
// NOTE(review): none of the branch bodies are visible in this view;
// presumably each settles 'promise' accordingly and stops the process --
// TODO confirm.
void finished(const Future<Nothing>& future)
if (future.isDiscarded()) {
// NOTE(review): the ';' right after '{' on the next line is an empty
// statement and looks unintended.
} else if (future.isFailed()) {;
} else {
// Quorum size passed on to the recover protocol and to log::catchup.
const size_t quorum;
// The local replica being recovered. Not const: it is re-assigned once
// ownership is regained after catch-up.
Owned<Replica> replica;
// The network of replicas.
const Shared<Network> network;
// Whether auto-initialization is enabled.
const bool autoInitialize;
// The future chain started in 'initialize()'.
Future<Nothing> chain;
// Carries the final result exposed through 'future()'.
process::Promise<Owned<Replica>> promise;
// Entry point for recovering a replica: creates and spawns a
// RecoverProcess.
//   quorum:         quorum size of the replicated log.
//   replica:        the local replica to recover.
//   network:        the network of replicas.
//   autoInitialize: whether auto-initialization is enabled.
// Returns the future of the spawned RecoverProcess.
Future<Owned<Replica>> recover(
size_t quorum,
const Owned<Replica>& replica,
const Shared<Network>& network,
bool autoInitialize)
// NOTE(review): the constructor argument list is not visible in this
// view; presumably the parameters above are forwarded in order.
RecoverProcess* process =
new RecoverProcess(
// The future is obtained before 'spawn'. Presumably the 'true' argument
// hands ownership of 'process' to libprocess, so 'process' should not be
// touched afterwards -- TODO confirm.
Future<Owned<Replica>> future = process->future();
spawn(process, true);
return future;
} // namespace log {
} // namespace internal {
} // namespace mesos {