blob: 45d327e79584613929f69319f0b4edb3df948c65 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tserver/tablet_copy_source_session.h"
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus_meta_manager.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/fastmem.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/result_tracker.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/ops/op.h"
#include "kudu/tablet/ops/write_op.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/tablet_copy.pb.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/crc.h"
#include "kudu/util/env.h"
#include "kudu/util/faststring.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/dns_resolver.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/threadpool.h"
METRIC_DECLARE_entity(tablet);
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
namespace kudu {
class BlockIdPB;
namespace tserver {
using consensus::ConsensusMetadataManager;
using consensus::RaftConfigPB;
using consensus::RaftPeerPB;
using consensus::kMinimumTerm;
using fs::BlockDeletionTransaction;
using fs::ReadableBlock;
using log::Log;
using log::LogOptions;
using log::LogAnchorRegistry;
using rpc::Messenger;
using rpc::MessengerBuilder;
using strings::Substitute;
using tablet::ColumnDataPB;
using tablet::DeltaDataPB;
using tablet::KuduTabletTest;
using tablet::RowSetDataPB;
using tablet::TabletReplica;
using tablet::TabletSuperBlockPB;
using tablet::WriteOpState;
class TabletCopyTest : public KuduTabletTest {
public:
TabletCopyTest()
: KuduTabletTest(Schema({ ColumnSchema("key", STRING),
ColumnSchema("val", INT32) }, 1)),
dns_resolver_(new DnsResolver) {
CHECK_OK(ThreadPoolBuilder("prepare").Build(&prepare_pool_));
CHECK_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
CHECK_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
}
void SetUp() override {
NO_FATALS(KuduTabletTest::SetUp());
NO_FATALS(SetUpTabletReplica());
NO_FATALS(PopulateTablet());
NO_FATALS(InitSession());
}
void TearDown() override {
session_.reset();
tablet_replica_->Shutdown();
KuduTabletTest::TearDown();
}
protected:
void SetUpTabletReplica() {
scoped_refptr<Log> log;
ASSERT_OK(Log::Open(LogOptions(),
fs_manager(),
/*file_cache=*/ nullptr,
tablet()->tablet_id(),
*tablet()->schema(),
/*schema_version=*/ 0,
/*metric_entity=*/ nullptr,
&log));
scoped_refptr<MetricEntity> metric_entity =
METRIC_ENTITY_tablet.Instantiate(&metric_registry_, CURRENT_TEST_NAME());
// TODO(mpercy): Similar to code in tablet_replica-test, consider refactor.
RaftConfigPB config;
config.set_opid_index(consensus::kInvalidOpIdIndex);
RaftPeerPB* config_peer = config.add_peers();
config_peer->set_permanent_uuid(fs_manager()->uuid());
config_peer->mutable_last_known_addr()->set_host("0.0.0.0");
config_peer->mutable_last_known_addr()->set_port(0);
config_peer->set_member_type(RaftPeerPB::VOTER);
scoped_refptr<ConsensusMetadataManager> cmeta_manager(
new ConsensusMetadataManager(fs_manager()));
ASSERT_OK(cmeta_manager->Create(tablet()->tablet_id(), config, kMinimumTerm));
const auto& tablet_id = tablet()->tablet_id();
tablet_replica_.reset(
new TabletReplica(tablet()->metadata(),
cmeta_manager,
*config_peer,
apply_pool_.get(),
nullptr,
[this, tablet_id](const string& reason) {
this->TabletReplicaStateChangedCallback(tablet_id, reason);
}));
ASSERT_OK(tablet_replica_->Init({ /*quiescing*/nullptr,
/*num_leaders*/nullptr,
raft_pool_.get() }));
shared_ptr<Messenger> messenger;
MessengerBuilder mbuilder(CURRENT_TEST_NAME());
mbuilder.Build(&messenger);
log_anchor_registry_.reset(new LogAnchorRegistry());
tablet_replica_->SetBootstrapping();
consensus::ConsensusBootstrapInfo boot_info;
ASSERT_OK(tablet_replica_->Start(boot_info,
tablet(),
clock(),
messenger,
scoped_refptr<rpc::ResultTracker>(),
log,
prepare_pool_.get(),
dns_resolver_.get()));
ASSERT_OK(tablet_replica_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10)));
ASSERT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
}
void TabletReplicaStateChangedCallback(const string& tablet_id, const string& reason) {
LOG(INFO) << "Tablet replica state changed for tablet " << tablet_id << ". Reason: " << reason;
}
void PopulateTablet() {
for (int32_t i = 0; i < 1000; i++) {
WriteRequestPB req;
req.set_tablet_id(tablet_replica_->tablet_id());
ASSERT_OK(SchemaToPB(client_schema_, req.mutable_schema()));
RowOperationsPB* data = req.mutable_row_operations();
RowOperationsPBEncoder enc(data);
KuduPartialRow row(&client_schema_);
string key = Substitute("key$0", i);
ASSERT_OK(row.SetStringNoCopy(0, key));
ASSERT_OK(row.SetInt32(1, i));
enc.Add(RowOperationsPB::INSERT, row);
WriteResponsePB resp;
CountDownLatch latch(1);
unique_ptr<tablet::WriteOpState> state(
new tablet::WriteOpState(tablet_replica_.get(),
&req,
nullptr, // No RequestIdPB
&resp));
state->set_completion_callback(unique_ptr<tablet::OpCompletionCallback>(
new tablet::LatchOpCompletionCallback<WriteResponsePB>(&latch, &resp)));
ASSERT_OK(tablet_replica_->SubmitWrite(std::move(state)));
latch.Wait();
ASSERT_FALSE(resp.has_error())
<< "Request failed: " << pb_util::SecureShortDebugString(resp.error());
ASSERT_EQ(0, resp.per_row_errors_size())
<< "Insert error: " << pb_util::SecureShortDebugString(resp);
}
ASSERT_OK(tablet()->Flush());
}
void InitSession() {
session_.reset(new TabletCopySourceSession(tablet_replica_.get(), "TestSession", "FakeUUID",
fs_manager(), nullptr /* no metrics */));
ASSERT_OK(session_->Init());
}
// Read the specified BlockId, via the TabletCopySourceSession, into a file.
// 'path' will be populated with the name of the file used.
// 'file' will be set to point to the SequentialFile containing the data.
void FetchBlockToFile(const BlockId& block_id,
string* path,
unique_ptr<SequentialFile>* file) {
string data;
int64_t block_file_size = 0;
TabletCopyErrorPB::Code error_code;
CHECK_OK(session_->GetBlockPiece(block_id, 0, 0, &data, &block_file_size, &error_code));
if (block_file_size > 0) {
CHECK_GT(data.size(), 0);
}
// Write the file to a temporary location.
WritableFileOptions opts;
string path_template = GetTestPath(Substitute("test_block_$0$1.XXXXXX",
block_id.ToString(),
kTmpInfix));
unique_ptr<WritableFile> writable_file;
CHECK_OK(Env::Default()->NewTempWritableFile(opts, path_template, path, &writable_file));
CHECK_OK(writable_file->Append(Slice(data.data(), data.size())));
CHECK_OK(writable_file->Close());
CHECK_OK(Env::Default()->NewSequentialFile(*path, file));
}
MetricRegistry metric_registry_;
scoped_refptr<LogAnchorRegistry> log_anchor_registry_;
unique_ptr<ThreadPool> prepare_pool_;
unique_ptr<ThreadPool> apply_pool_;
unique_ptr<ThreadPool> raft_pool_;
unique_ptr<DnsResolver> dns_resolver_;
scoped_refptr<TabletReplica> tablet_replica_;
scoped_refptr<TabletCopySourceSession> session_;
};
// Ensure that the serialized SuperBlock included in the TabletCopySourceSession is
// equal to the serialized live superblock (on a quiesced tablet).
TEST_F(TabletCopyTest, TestSuperBlocksEqual) {
  // Serialize both the session's superblock and a freshly generated live
  // superblock, then compare the bytes.
  faststring session_buf;
  faststring tablet_buf;

  {
    const TabletSuperBlockPB& session_superblock = session_->tablet_superblock();
    // ByteSizeLong() computes and caches the sizes that
    // SerializeWithCachedSizesToArray() relies on, so it must come first.
    const size_t session_size = session_superblock.ByteSizeLong();
    session_buf.resize(session_size);
    session_superblock.SerializeWithCachedSizesToArray(session_buf.data());
  }

  {
    TabletSuperBlockPB tablet_superblock;
    ASSERT_OK(tablet()->metadata()->ToSuperBlock(&tablet_superblock));
    const size_t tablet_size = tablet_superblock.ByteSizeLong();
    tablet_buf.resize(tablet_size);
    tablet_superblock.SerializeWithCachedSizesToArray(tablet_buf.data());
  }

  ASSERT_EQ(session_buf.size(), tablet_buf.size());
  // Keep the length as size_t; narrowing it to int is unnecessary.
  const size_t size = tablet_buf.size();
  ASSERT_EQ(0, strings::fastmemcmp_inlined(session_buf.data(), tablet_buf.data(), size));
}
// Fetch every column block through the tablet copy session and ensure that its
// size and CRC32C checksum match the same block read directly from the block manager.
TEST_F(TabletCopyTest, TestBlocksEqual) {
  TabletSuperBlockPB tablet_superblock;
  ASSERT_OK(tablet()->metadata()->ToSuperBlock(&tablet_superblock));
  for (int i = 0; i < tablet_superblock.rowsets_size(); i++) {
    const RowSetDataPB& rowset = tablet_superblock.rowsets(i);
    for (int j = 0; j < rowset.columns_size(); j++) {
      const ColumnDataPB& column = rowset.columns(j);
      const BlockIdPB& block_id_pb = column.block();
      BlockId block_id = BlockId::FromPB(block_id_pb);

      // Fetch the block through the session into a temporary file and
      // checksum what was written.
      string path;
      unique_ptr<SequentialFile> file;
      FetchBlockToFile(block_id, &path, &file);
      uint64_t session_block_size = 0;
      ASSERT_OK(Env::Default()->GetFileSize(path, &session_block_size));
      // Each side gets its own buffer: reusing one buffer would let the
      // second read overwrite (or reallocate out from under) the first
      // slice, making the comparison below meaningless.
      faststring session_buf;
      session_buf.resize(session_block_size);
      Slice session_data(session_buf.data(), session_block_size);
      ASSERT_OK(file->Read(&session_data));
      ASSERT_EQ(session_block_size, session_data.size());
      uint32_t session_crc = crc::Crc32c(session_data.data(), session_data.size());
      LOG(INFO) << "session block file has size of " << session_block_size
                << " and CRC32C of " << session_crc << ": " << path;

      // Read the same block directly from the block manager.
      unique_ptr<ReadableBlock> tablet_block;
      ASSERT_OK(fs_manager()->OpenBlock(block_id, &tablet_block));
      uint64_t tablet_block_size = 0;
      ASSERT_OK(tablet_block->Size(&tablet_block_size));
      faststring tablet_buf;
      tablet_buf.resize(tablet_block_size);
      Slice tablet_data(tablet_buf.data(), tablet_block_size);
      ASSERT_OK(tablet_block->Read(0, tablet_data));
      // Checksum the tablet's bytes, not the session's.
      uint32_t tablet_crc = crc::Crc32c(tablet_data.data(), tablet_data.size());
      LOG(INFO) << "tablet block file has size of " << tablet_block_size
                << " and CRC32C of " << tablet_crc
                << ": " << block_id;

      // Compare the blocks.
      ASSERT_EQ(tablet_block_size, session_block_size);
      ASSERT_EQ(tablet_crc, session_crc);
    }
  }
}
// Ensure that blocks are still readable through the open session even
// after they've been deleted.
TEST_F(TabletCopyTest, TestBlocksAreFetchableAfterBeingDeleted) {
  TabletSuperBlockPB superblock;
  ASSERT_OK(tablet()->metadata()->ToSuperBlock(&superblock));

  // Collect every block referenced by each rowset: REDO and UNDO delta
  // blocks, column blocks, and the optional bloom and ad hoc index blocks.
  vector<BlockId> referenced_blocks;
  auto collect = [&referenced_blocks](const BlockIdPB& pb) {
    referenced_blocks.push_back(BlockId::FromPB(pb));
  };
  for (const RowSetDataPB& rs : superblock.rowsets()) {
    for (const DeltaDataPB& delta : rs.redo_deltas()) {
      collect(delta.block());
    }
    for (const DeltaDataPB& delta : rs.undo_deltas()) {
      collect(delta.block());
    }
    for (const ColumnDataPB& col : rs.columns()) {
      collect(col.block());
    }
    if (rs.has_bloom_block()) {
      collect(rs.bloom_block());
    }
    if (rs.has_adhoc_index_block()) {
      collect(rs.adhoc_index_block());
    }
  }

  // Delete all of them in a single deletion transaction and verify that
  // every one was reported as deleted.
  shared_ptr<BlockDeletionTransaction> txn =
      fs_manager()->block_manager()->NewDeletionTransaction();
  for (const BlockId& id : referenced_blocks) {
    txn->AddDeletedBlock(id);
  }
  vector<BlockId> deleted_ids;
  ASSERT_OK(txn->CommitDeletedBlocks(&deleted_ids));
  ASSERT_EQ(referenced_blocks.size(), deleted_ids.size());

  // The session opened these blocks before the deletion, so they should
  // still be open in it and readable through it.
  for (const BlockId& id : referenced_blocks) {
    ASSERT_TRUE(session_->IsBlockOpenForTests(id));
    string piece;
    TabletCopyErrorPB::Code err;
    int64_t total_size;
    ASSERT_OK(session_->GetBlockPiece(id, 0, 0,
                                      &piece, &total_size, &err));
  }
}
} // namespace tserver
} // namespace kudu