// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/tablet_replica-test-base.h"
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/common/common.pb.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/consensus_meta_manager.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/result_tracker.h"
#include "kudu/tablet/ops/op.h"
#include "kudu/tablet/ops/write_op.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_bootstrap.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/monotime.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/test_macros.h"
using kudu::consensus::ConsensusBootstrapInfo;
using kudu::consensus::ConsensusMetadata;
using kudu::consensus::ConsensusMetadataManager;
using kudu::consensus::RaftConfigPB;
using kudu::consensus::RaftPeerPB;
using kudu::log::Log;
using kudu::log::LogOptions;
using kudu::pb_util::SecureDebugString;
using kudu::rpc::MessengerBuilder;
using kudu::rpc::ResultTracker;
using kudu::tablet::KuduTabletTest;
using kudu::tserver::WriteRequestPB;
using kudu::tserver::WriteResponsePB;
using std::shared_ptr;
using std::unique_ptr;
using std::string;
METRIC_DECLARE_entity(tablet);
namespace kudu {
namespace tablet {
namespace {
// Maximum time to wait for the replica's consensus instance to become leader.
// Passed to RaftConsensus::WaitUntilLeader() after the replica is started or
// restarted.
const MonoDelta kLeadershipTimeout = MonoDelta::FromSeconds(10);
} // anonymous namespace
// Submits 'req' to 'replica' as a write op and waits on the latch handed to
// the op's completion callback.
//
// Returns:
//  - the submission error, if SubmitWrite() fails;
//  - the response-level error, if the response carries one;
//  - the error of the first per-row error, if there is any;
//  - OK otherwise.
Status TabletReplicaTestBase::ExecuteWrite(TabletReplica* replica, const WriteRequestPB& req) {
  WriteResponsePB response;
  unique_ptr<WriteOpState> write_state(
      new WriteOpState(replica, &req, nullptr /* No RequestIdPB */, &response));

  // The completion callback is given both the latch and the response, so the
  // response is only inspected after the wait below returns.
  CountDownLatch completion_latch(1);
  unique_ptr<OpCompletionCallback> on_completion(
      new LatchOpCompletionCallback<WriteResponsePB>(&completion_latch, &response));
  write_state->set_completion_callback(std::move(on_completion));

  RETURN_NOT_OK(replica->SubmitWrite(std::move(write_state)));
  completion_latch.Wait();

  // A response-level error takes precedence over per-row errors.
  if (response.has_error()) {
    return StatusFromPB(response.error().status());
  }
  if (response.per_row_errors_size() == 0) {
    return Status::OK();
  }
  // Only the first per-row error is surfaced to the caller.
  return StatusFromPB(response.per_row_errors(0).error());
}
// Test fixture set-up: runs the base KuduTabletTest set-up, then builds the
// thread pools, the messenger, the consensus metadata manager and the metric
// entity used by the replica, and finally creates the replica itself via
// SetUpReplica().
void TabletReplicaTestBase::SetUp() {
  KuduTabletTest::SetUp();

  // Thread pools later handed to the replica (see SetUpReplica() and the
  // Start() calls).
  ASSERT_OK(ThreadPoolBuilder("prepare").Build(&prepare_pool_));
  ASSERT_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
  ASSERT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));

  ASSERT_OK(MessengerBuilder("TxnStatusManagerTest").Build(&messenger_));

  // The consensus metadata manager is bound to the harness's FsManager.
  cmeta_manager_.reset(new ConsensusMetadataManager(fs_manager()));
  metric_entity_ = METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "test-tablet");

  ASSERT_OK(SetUpReplica());
}
// Test fixture tear-down: shuts down the replica and the thread pools, then
// runs the base KuduTabletTest tear-down.
//
// gtest invokes TearDown() even when SetUp() returned early because of a fatal
// assertion failure, in which case the replica and/or some of the pools may
// never have been created. Each member is therefore checked before use so that
// a failed SetUp() is reported as such instead of crashing here on a null
// dereference.
void TabletReplicaTestBase::TearDown() {
  if (tablet_replica_) {
    tablet_replica_->Shutdown();
  }
  if (prepare_pool_) {
    prepare_pool_->Shutdown();
  }
  if (apply_pool_) {
    apply_pool_->Shutdown();
  }
  if (raft_pool_) {
    raft_pool_->Shutdown();
  }
  KuduTabletTest::TearDown();
}
// Creates and initializes 'tablet_replica_' (which must currently be unset)
// on top of the harness's tablet.
//
// The replica is configured with a Raft config containing a single VOTER peer:
// the local FsManager's UUID with a last-known address of 0.0.0.0:0. If
// 'new_replica' is true, consensus metadata is first created for the tablet
// with that config at consensus::kMinimumTerm; otherwise no consensus metadata
// is created here.
//
// Returns a non-OK status if creating the consensus metadata or initializing
// the replica fails.
Status TabletReplicaTestBase::SetUpReplica(bool new_replica) {
  DCHECK(tablet_replica_ == nullptr);
  const auto& tablet_id = tablet()->tablet_id();

  // Single-peer Raft config: this server is the only voter.
  RaftConfigPB raft_config;
  raft_config.set_opid_index(consensus::kInvalidOpIdIndex);
  RaftPeerPB* local_peer = raft_config.add_peers();
  local_peer->set_permanent_uuid(tablet()->metadata()->fs_manager()->uuid());
  auto* last_known_addr = local_peer->mutable_last_known_addr();
  last_known_addr->set_host("0.0.0.0");
  last_known_addr->set_port(0);
  local_peer->set_member_type(RaftPeerPB::VOTER);

  if (new_replica) {
    RETURN_NOT_OK(cmeta_manager_->Create(tablet_id, raft_config, consensus::kMinimumTerm));
  }

  // "Bootstrap" and start the TabletReplica. The state change callback only
  // logs; the tablet ID is captured by value.
  auto log_state_change = [tablet_id] (const string& reason) {
    LOG(INFO) << Substitute(
        "state change callback run for $0: $1", tablet_id, reason);
  };
  tablet_replica_.reset(
      new TabletReplica(tablet()->shared_metadata(),
                        cmeta_manager_,
                        *local_peer,
                        apply_pool_.get(),
                        /*reload_txn_status_tablet_pool*/nullptr,
                        /*txn_coordinator_factory*/nullptr,
                        std::move(log_state_change)));
  RETURN_NOT_OK(tablet_replica_->Init({ /*quiescing*/nullptr,
                                        /*num_leaders*/nullptr,
                                        raft_pool_.get() }));

  // Make the replica use the same LogAnchorRegistry as the tablet harness.
  // TODO(mpercy): Refactor TabletHarness to allow taking a LogAnchorRegistry,
  // while also providing TabletMetadata for consumption by TabletReplica
  // before Tablet is instantiated.
  tablet_replica_->log_anchor_registry_ = tablet()->log_anchor_registry_;
  return Status::OK();
}
// Opens a log for the harness's tablet, marks the replica as bootstrapping and
// starts it with the given bootstrap 'info'.
//
// Returns a non-OK status if the log cannot be opened, or whatever status
// TabletReplica::Start() returns.
Status TabletReplicaTestBase::StartReplica(const ConsensusBootstrapInfo& info) {
  // Open the log using the harness tablet's ID, schema and schema version.
  scoped_refptr<Log> wal;
  RETURN_NOT_OK(Log::Open(LogOptions(),
                          fs_manager(),
                          /*file_cache*/nullptr,
                          tablet()->tablet_id(),
                          *tablet()->schema(),
                          tablet()->metadata()->schema_version(),
                          metric_entity_.get(),
                          &wal));

  tablet_replica_->SetBootstrapping();
  // No result tracker is used: an empty scoped_refptr is passed instead.
  return tablet_replica_->Start(info,
                                tablet(),
                                clock(),
                                messenger_,
                                scoped_refptr<ResultTracker>(),
                                wal,
                                prepare_pool_.get(),
                                dns_resolver_.get());
}
// Starts the replica via StartReplica() and then waits, for at most
// kLeadershipTimeout, until its consensus instance reports being leader.
//
// Returns the StartReplica() error if starting fails; otherwise returns the
// result of the wait.
Status TabletReplicaTestBase::StartReplicaAndWaitUntilLeader(const ConsensusBootstrapInfo& info) {
  const Status start_status = StartReplica(info);
  if (!start_status.ok()) {
    return start_status;
  }
  return tablet_replica_->consensus()->WaitUntilLeader(kLeadershipTimeout);
}
// Shuts down and discards the current replica, then brings up a fresh one:
// the replica is re-created without creating new consensus metadata, the
// tablet is bootstrapped, the replica is started on the bootstrapped tablet
// and log, and finally this waits (for at most kLeadershipTimeout) for the
// replica to become leader.
//
// If 'reset_tablet' is true, the harness's tablet is re-created first, along
// with the consensus metadata manager that depends on the harness's FsManager.
//
// Returns the first non-OK status encountered along the way.
Status TabletReplicaTestBase::RestartReplica(bool reset_tablet) {
  tablet_replica_->Shutdown();
  tablet_replica_.reset();

  // Reset the underlying harness's tablet if requested.
  if (reset_tablet) {
    CreateTestTablet();
    // NOTE: the FsManager is owned by the harness, so refreshing the harness
    // means we have to refresh anything that depends on its FsManager.
    cmeta_manager_.reset(new ConsensusMetadataManager(fs_manager()));
  }

  // Re-create the replica; the consensus metadata is loaded rather than
  // created.
  RETURN_NOT_OK(SetUpReplica(/*new_replica=*/ false));
  scoped_refptr<ConsensusMetadata> loaded_cmeta;
  RETURN_NOT_OK(cmeta_manager_->Load(tablet_replica_->tablet_id(), &loaded_cmeta));

  // Bootstrap the tablet; this yields the tablet, its log and the bootstrap
  // info that the replica is subsequently started with.
  shared_ptr<Tablet> bootstrapped_tablet;
  scoped_refptr<Log> bootstrapped_log;
  ConsensusBootstrapInfo consensus_info;
  tablet_replica_->SetBootstrapping();
  RETURN_NOT_OK(BootstrapTablet(tablet_replica_->tablet_metadata(),
                                loaded_cmeta->CommittedConfig(),
                                clock(),
                                /*mem_tracker*/nullptr,
                                /*result_tracker*/nullptr,
                                &metric_registry_,
                                /*file_cache*/nullptr,
                                tablet_replica_,
                                tablet_replica_->log_anchor_registry(),
                                &bootstrapped_tablet,
                                &bootstrapped_log,
                                &consensus_info));

  RETURN_NOT_OK(tablet_replica_->Start(consensus_info,
                                       bootstrapped_tablet,
                                       clock(),
                                       messenger_,
                                       scoped_refptr<ResultTracker>(),
                                       bootstrapped_log,
                                       prepare_pool_.get(),
                                       dns_resolver_.get()));

  // Wait for the replica to be usable.
  return tablet_replica_->consensus()->WaitUntilLeader(kLeadershipTimeout);
}
} // namespace tablet
} // namespace kudu