blob: e6d7b7de2d606e567f656c552cea020e9c76fe13 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/consensus/local_consensus.h"
#include <boost/thread/locks.hpp>
#include <iostream>
#include "kudu/consensus/log.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/server/metadata.h"
#include "kudu/server/clock.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/logging.h"
#include "kudu/util/trace.h"
namespace kudu {
namespace consensus {
using base::subtle::Barrier_AtomicIncrement;
using log::Log;
using log::LogEntryBatch;
using strings::Substitute;
LocalConsensus::LocalConsensus(ConsensusOptions options,
gscoped_ptr<ConsensusMetadata> cmeta,
string peer_uuid,
const scoped_refptr<server::Clock>& clock,
ReplicaTransactionFactory* txn_factory, Log* log)
: peer_uuid_(std::move(peer_uuid)),
options_(std::move(options)),
cmeta_(std::move(cmeta)),
txn_factory_(DCHECK_NOTNULL(txn_factory)),
log_(DCHECK_NOTNULL(log)),
clock_(clock),
state_(kInitializing),
next_op_id_index_(-1) {
CHECK(cmeta_) << "Passed ConsensusMetadata object is NULL";
}
Status LocalConsensus::Start(const ConsensusBootstrapInfo& info) {
TRACE_EVENT0("consensus", "LocalConsensus::Start");
CHECK_EQ(state_, kInitializing);
LOG_WITH_PREFIX(INFO) << "Starting LocalConsensus...";
{
boost::lock_guard<simple_spinlock> lock(lock_);
const RaftConfigPB& config = cmeta_->committed_config();
CHECK(config.local()) << "Local consensus must be passed a local config";
RETURN_NOT_OK_PREPEND(VerifyRaftConfig(config, COMMITTED_QUORUM),
"Invalid config found in LocalConsensus::Start()");
next_op_id_index_ = info.last_id.index() + 1;
CHECK(config.peers(0).has_permanent_uuid()) << config.ShortDebugString();
cmeta_->set_leader_uuid(config.peers(0).permanent_uuid());
RETURN_NOT_OK_PREPEND(ResubmitOrphanedReplicates(info.orphaned_replicates),
"Could not restart replicated operations");
state_ = kRunning;
}
TRACE("Consensus started");
return Status::OK();
}
Status LocalConsensus::ResubmitOrphanedReplicates(const std::vector<ReplicateMsg*> replicates) {
for (ReplicateMsg* msg : replicates) {
DCHECK_LT(msg->id().index(), next_op_id_index_)
<< "Orphaned replicate " << OpIdToString(msg->id())
<< " is newer than next op index " << next_op_id_index_;
LOG_WITH_PREFIX(INFO) << "Resubmitting operation "
<< OpIdToString(msg->id()) << " after restart";
ReplicateRefPtr replicate_ptr = make_scoped_refptr_replicate(new ReplicateMsg(*msg));
scoped_refptr<ConsensusRound> round(new ConsensusRound(this, replicate_ptr));
RETURN_NOT_OK(txn_factory_->StartReplicaTransaction(round));
round->NotifyReplicationFinished(Status::OK());
}
return Status::OK();
}
bool LocalConsensus::IsRunning() const {
boost::lock_guard<simple_spinlock> lock(lock_);
return state_ == kRunning;
}
Status LocalConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
TRACE_EVENT0("consensus", "LocalConsensus::Replicate");
DCHECK_GE(state_, kConfiguring);
ReplicateMsg* msg = round->replicate_msg();
OpId* cur_op_id = DCHECK_NOTNULL(msg)->mutable_id();
cur_op_id->set_term(0);
// Pre-cache the ByteSize outside of the lock, since this is somewhat
// expensive.
ignore_result(msg->ByteSize());
LogEntryBatch* reserved_entry_batch;
{
boost::lock_guard<simple_spinlock> lock(lock_);
// create the new op id for the entry.
cur_op_id->set_index(next_op_id_index_++);
// Reserve the correct slot in the log for the replication operation.
// It's important that we do this under the same lock as we generate
// the op id, so that we log things in-order.
gscoped_ptr<log::LogEntryBatchPB> entry_batch;
log::CreateBatchFromAllocatedOperations({ round->replicate_scoped_refptr() }, &entry_batch);
RETURN_NOT_OK(log_->Reserve(log::REPLICATE, std::move(entry_batch),
&reserved_entry_batch));
// Local consensus transactions are always committed so we
// can just persist the configuration, if this is a change config.
if (round->replicate_msg()->op_type() == CHANGE_CONFIG_OP) {
RaftConfigPB new_config = round->replicate_msg()->change_config_record().new_config();
DCHECK(!new_config.has_opid_index());
new_config.set_opid_index(round->replicate_msg()->id().index());
cmeta_->set_committed_config(new_config);
CHECK_OK(cmeta_->Flush());
}
}
// Serialize and mark the message as ready to be appended.
// When the Log actually fsync()s this message to disk, 'repl_callback'
// is triggered.
RETURN_NOT_OK(log_->AsyncAppend(
reserved_entry_batch,
Bind(&ConsensusRound::NotifyReplicationFinished, round)));
return Status::OK();
}
RaftPeerPB::Role LocalConsensus::role() const {
return RaftPeerPB::LEADER;
}
Status LocalConsensus::Update(const ConsensusRequestPB* request,
ConsensusResponsePB* response) {
return Status::NotSupported("LocalConsensus does not support Update() calls.");
}
Status LocalConsensus::RequestVote(const VoteRequestPB* request,
VoteResponsePB* response) {
return Status::NotSupported("LocalConsensus does not support RequestVote() calls.");
}
ConsensusStatePB LocalConsensus::ConsensusState(ConsensusConfigType type) const {
boost::lock_guard<simple_spinlock> lock(lock_);
return cmeta_->ToConsensusStatePB(type);
}
RaftConfigPB LocalConsensus::CommittedConfig() const {
boost::lock_guard<simple_spinlock> lock(lock_);
return cmeta_->committed_config();
}
void LocalConsensus::Shutdown() {
VLOG_WITH_PREFIX(1) << "LocalConsensus Shutdown!";
}
void LocalConsensus::DumpStatusHtml(std::ostream& out) const {
out << "<h1>Local Consensus Status</h1>\n";
boost::lock_guard<simple_spinlock> lock(lock_);
out << "next op: " << next_op_id_index_;
}
std::string LocalConsensus::LogPrefix() const {
return Substitute("T $0 P $1: ", options_.tablet_id, peer_uuid_);
}
} // end namespace consensus
} // end namespace kudu