| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| #include "kudu/tserver/tablet_copy-test-base.h" |
| |
| #include "kudu/consensus/quorum_util.h" |
| #include "kudu/gutil/strings/fastmem.h" |
| #include "kudu/tablet/tablet_bootstrap.h" |
| #include "kudu/tserver/tablet_copy_client.h" |
| #include "kudu/util/env_util.h" |
| |
| using std::shared_ptr; |
| |
| namespace kudu { |
| namespace tserver { |
| |
| using consensus::GetRaftConfigLeader; |
| using consensus::RaftPeerPB; |
| using tablet::TabletMetadata; |
| using tablet::TabletStatusListener; |
| |
| // Test fixture for TabletCopyClient. |
| // |
| // On top of whatever TabletCopyTest::SetUp() prepares, this fixture creates a |
| // separate FsManager rooted at the "client_tablet" test path, builds a |
| // messenger, and starts a TabletCopyClient against the leader of the committed |
| // Raft config. Tests therefore begin with a started client ('client_') and the |
| // metadata handed back by Start() ('meta_'). |
| class TabletCopyClientTest : public TabletCopyTest { |
|  public: |
|   virtual void SetUp() OVERRIDE { |
|     NO_FATALS(TabletCopyTest::SetUp()); |
| |
|     // The client writes into its own filesystem root, distinct from the one |
|     // set up by the base fixture. |
|     fs_manager_.reset(new FsManager(Env::Default(), GetTestPath("client_tablet"))); |
|     ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout()); |
|     ASSERT_OK(fs_manager_->Open()); |
| |
|     // Check the outcome of both calls rather than discarding it, so that a |
|     // consensus start-up timeout or a messenger construction failure is |
|     // reported here instead of surfacing later as an unrelated error. |
|     ASSERT_OK(tablet_peer_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10.0))); |
|     ASSERT_OK(rpc::MessengerBuilder(CURRENT_TEST_NAME()).Build(&messenger_)); |
|     client_.reset(new TabletCopyClient(GetTabletId(), |
|                                        fs_manager_.get(), |
|                                        messenger_)); |
|     ASSERT_OK(GetRaftConfigLeader(tablet_peer_->consensus() |
|         ->ConsensusState(consensus::CONSENSUS_CONFIG_COMMITTED), &leader_)); |
| |
|     // Start the copy session against the leader's last known address. |
|     HostPort host_port; |
|     ASSERT_OK(HostPortFromPB(leader_.last_known_addr(), &host_port)); |
|     ASSERT_OK(client_->Start(host_port, &meta_)); |
|   } |
| |
|  protected: |
|   // Returns OK if the files at 'path1' and 'path2' have the same size and |
|   // contents, Corruption if they differ, or the error of the failing |
|   // open/size/read operation. |
|   Status CompareFileContents(const string& path1, const string& path2); |
| |
|   // Filesystem root used by the client side of the copy. |
|   gscoped_ptr<FsManager> fs_manager_; |
|   // Messenger passed to the client. |
|   shared_ptr<rpc::Messenger> messenger_; |
|   // Client under test; already started by SetUp(). |
|   gscoped_ptr<TabletCopyClient> client_; |
|   // Tablet metadata filled in by TabletCopyClient::Start(). |
|   scoped_refptr<TabletMetadata> meta_; |
|   // Leader of the committed Raft config, used as the copy source. |
|   RaftPeerPB leader_; |
| }; |
| |
| // Compares two files byte for byte. |
| // |
| // Both files are opened through the fixture's Env and read fully into memory. |
| // Returns Corruption when the sizes or the contents differ, OK when they are |
| // identical, and otherwise the Status of whichever open/size/read call failed. |
| Status TabletCopyClientTest::CompareFileContents(const string& path1, const string& path2) { |
|   shared_ptr<RandomAccessFile> lhs_file; |
|   shared_ptr<RandomAccessFile> rhs_file; |
|   RETURN_NOT_OK(env_util::OpenFileForRandom(fs_manager_->env(), path1, &lhs_file)); |
|   RETURN_NOT_OK(env_util::OpenFileForRandom(fs_manager_->env(), path2, &rhs_file)); |
| |
|   // A size mismatch is reported on its own, before any data is read. |
|   uint64_t lhs_size; |
|   uint64_t rhs_size; |
|   RETURN_NOT_OK(lhs_file->Size(&lhs_size)); |
|   RETURN_NOT_OK(rhs_file->Size(&rhs_size)); |
|   if (lhs_size != rhs_size) { |
|     return Status::Corruption("Sizes of files don't match", |
|                               strings::Substitute("$0 vs $1 bytes", lhs_size, rhs_size)); |
|   } |
| |
|   // Read each file in full into its own scratch buffer. |
|   Slice lhs_data; |
|   Slice rhs_data; |
|   faststring lhs_scratch; |
|   faststring rhs_scratch; |
|   lhs_scratch.resize(lhs_size); |
|   rhs_scratch.resize(rhs_size); |
|   RETURN_NOT_OK(env_util::ReadFully(lhs_file.get(), 0, lhs_size, &lhs_data, |
|                                     lhs_scratch.data())); |
|   RETURN_NOT_OK(env_util::ReadFully(rhs_file.get(), 0, rhs_size, &rhs_data, |
|                                     rhs_scratch.data())); |
| |
|   // Sizes are equal at this point, so either one bounds the comparison. |
|   if (strings::fastmemcmp_inlined(lhs_data.data(), rhs_data.data(), lhs_size) != 0) { |
|     return Status::Corruption("Files do not match"); |
|   } |
|   return Status::OK(); |
| } |
| |
| // Smoke test for a whole tablet copy session: the session itself is started |
| // in SetUp(), so this only fetches everything and then finishes. |
| TEST_F(TabletCopyClientTest, TestBeginEndSession) { |
|   TabletStatusListener status_listener(meta_); |
|   ASSERT_OK(client_->FetchAll(&status_listener)); |
|   ASSERT_OK(client_->Finish()); |
| } |
| |
| // Basic data block download unit test. |
| // |
| // Downloads the block returned by FirstColumnBlockId() for the client's |
| // superblock and checks that it is readable locally under the newly assigned |
| // block ID, while the source block ID remains absent from the local FsManager. |
| TEST_F(TabletCopyClientTest, TestDownloadBlock) { |
|   BlockId block_id = FirstColumnBlockId(*client_->superblock_); |
|   Slice slice; |
|   faststring scratch; |
| |
|   // Ensure the block wasn't there before (it shouldn't be, we use our own FsManager dir). |
|   Status s = ReadLocalBlockFile(fs_manager_.get(), block_id, &scratch, &slice); |
|   ASSERT_TRUE(s.IsNotFound()) << "Expected block not found: " << s.ToString(); |
| |
|   // Check that the client downloaded the block and verification passed. |
|   BlockId new_block_id; |
|   ASSERT_OK(client_->DownloadBlock(block_id, &new_block_id)); |
| |
|   // Ensure it placed the block where we expected it to: the source ID must |
|   // still be unknown locally, and the new ID must be readable. |
|   s = ReadLocalBlockFile(fs_manager_.get(), block_id, &scratch, &slice); |
|   ASSERT_TRUE(s.IsNotFound()) << "Expected block not found: " << s.ToString(); |
|   ASSERT_OK(ReadLocalBlockFile(fs_manager_.get(), new_block_id, &scratch, &slice)); |
| } |
| |
| // Basic WAL segment download unit test. |
| // |
| // Downloads the first WAL sequence number known to the client and checks that |
| // the resulting local file is byte-identical to the first segment in the |
| // source tablet's log. |
| TEST_F(TabletCopyClientTest, TestDownloadWalSegment) { |
|   ASSERT_OK(fs_manager_->CreateDirIfMissing(fs_manager_->GetTabletWalDir(GetTabletId()))); |
| |
|   // Fail with a clear assertion rather than indexing out of bounds if the |
|   // client has no WAL sequence numbers. |
|   ASSERT_FALSE(client_->wal_seqnos_.empty()); |
|   uint64_t seqno = client_->wal_seqnos_[0]; |
|   string path = fs_manager_->GetWalSegmentFileName(GetTabletId(), seqno); |
| |
|   // The segment file must appear only as a result of the download. |
|   ASSERT_FALSE(fs_manager_->Exists(path)); |
|   ASSERT_OK(client_->DownloadWAL(seqno)); |
|   ASSERT_TRUE(fs_manager_->Exists(path)); |
| |
|   log::SegmentSequence local_segments; |
|   ASSERT_OK(tablet_peer_->log()->reader()->GetSegmentsSnapshot(&local_segments)); |
|   // Same guard for the source side before taking the first segment. |
|   ASSERT_FALSE(local_segments.empty()); |
|   const scoped_refptr<log::ReadableLogSegment>& segment = local_segments[0]; |
|   string server_path = segment->path(); |
| |
|   // Compare the downloaded file with the source file. |
|   ASSERT_OK(CompareFileContents(path, server_path)); |
| } |
| |
| // Ensure that we detect data corruption at the per-transfer level. |
| // |
| // Covers three cases of VerifyData(): a well-formed chunk, a chunk whose |
| // offset differs from the expected one, and a chunk whose payload no longer |
| // matches its CRC32. |
| TEST_F(TabletCopyClientTest, TestVerifyData) { |
|   const int kGoodOffset = 0; |
|   const int kBadOffset = 1; |
|   const int64_t kDataTotalLen = std::numeric_limits<int64_t>::max(); // Ignored. |
|   string good_payload = "This is a known good string"; |
|   string bad_payload = "This is a known bad! string"; |
| |
|   // Create a known-good PB. |
|   DataChunkPB valid_chunk; |
|   valid_chunk.set_offset(kGoodOffset); |
|   valid_chunk.set_data(good_payload); |
|   valid_chunk.set_crc32(crc::Crc32c(good_payload.data(), good_payload.length())); |
|   valid_chunk.set_total_data_length(kDataTotalLen); |
| |
|   // Make sure we work on the happy case. |
|   ASSERT_OK(client_->VerifyData(kGoodOffset, valid_chunk)); |
| |
|   // Test unexpected offset. |
|   DataChunkPB wrong_offset_chunk = valid_chunk; |
|   wrong_offset_chunk.set_offset(kBadOffset); |
|   Status s = client_->VerifyData(kGoodOffset, wrong_offset_chunk); |
|   ASSERT_TRUE(s.IsInvalidArgument()) << "Bad offset expected: " << s.ToString(); |
|   ASSERT_STR_CONTAINS(s.ToString(), "Offset did not match"); |
|   LOG(INFO) << "Expected error returned: " << s.ToString(); |
| |
|   // Test bad checksum: swap the payload but keep the CRC of the good one. |
|   DataChunkPB wrong_crc_chunk = valid_chunk; |
|   wrong_crc_chunk.set_data(bad_payload); |
|   s = client_->VerifyData(kGoodOffset, wrong_crc_chunk); |
|   ASSERT_TRUE(s.IsCorruption()) << "Invalid checksum expected: " << s.ToString(); |
|   ASSERT_STR_CONTAINS(s.ToString(), "CRC32 does not match"); |
|   LOG(INFO) << "Expected error returned: " << s.ToString(); |
| } |
| |
| namespace { |
| |
| // Gathers the ID of every block referenced by the rowsets of 'sb': redo |
| // deltas, undo deltas, column data, and the bloom and ad-hoc index blocks when |
| // a rowset has them. The IDs are returned sorted with BlockIdCompare so that |
| // callers can feed them to sorted-range algorithms. |
| vector<BlockId> GetAllSortedBlocks(const tablet::TabletSuperBlockPB& sb) { |
|   vector<BlockId> block_ids; |
| |
|   for (const auto& rs : sb.rowsets()) { |
|     for (const auto& redo_delta : rs.redo_deltas()) { |
|       block_ids.push_back(BlockId::FromPB(redo_delta.block())); |
|     } |
|     for (const auto& undo_delta : rs.undo_deltas()) { |
|       block_ids.push_back(BlockId::FromPB(undo_delta.block())); |
|     } |
|     for (const auto& col : rs.columns()) { |
|       block_ids.push_back(BlockId::FromPB(col.block())); |
|     } |
|     // The remaining two blocks are optional per rowset. |
|     if (rs.has_bloom_block()) { |
|       block_ids.push_back(BlockId::FromPB(rs.bloom_block())); |
|     } |
|     if (rs.has_adhoc_index_block()) { |
|       block_ids.push_back(BlockId::FromPB(rs.adhoc_index_block())); |
|     } |
|   } |
| |
|   std::sort(block_ids.begin(), block_ids.end(), BlockIdCompare()); |
|   return block_ids; |
| } |
| |
| } // anonymous namespace |
| |
| // Downloads every block and checks that the client's new superblock refers |
| // to a disjoint, equally sized set of block IDs, all of which exist locally, |
| // while none of the original IDs do. |
| TEST_F(TabletCopyClientTest, TestDownloadAllBlocks) { |
|   // Download all the blocks. |
|   ASSERT_OK(client_->DownloadBlocks()); |
| |
|   // Verify that the new superblock reflects the changes in block IDs. |
|   // |
|   // As long as block IDs are generated with UUIDs or something equally |
|   // unique, there's no danger of a block in the new superblock somehow |
|   // being assigned the same ID as a block in the existing superblock. |
|   vector<BlockId> old_ids = GetAllSortedBlocks(*client_->superblock_); |
|   vector<BlockId> new_ids = GetAllSortedBlocks(*client_->new_superblock_); |
|   vector<BlockId> common_ids; |
|   std::set_intersection(old_ids.begin(), old_ids.end(), |
|                         new_ids.begin(), new_ids.end(), |
|                         std::back_inserter(common_ids), BlockIdCompare()); |
|   ASSERT_TRUE(common_ids.empty()); |
|   ASSERT_EQ(old_ids.size(), new_ids.size()); |
| |
|   // Verify that the old blocks aren't found. We're using a different |
|   // FsManager than 'tablet_peer', so the only way an old block could end |
|   // up in ours is due to a tablet copy client bug. |
|   for (const BlockId& old_id : old_ids) { |
|     gscoped_ptr<fs::ReadableBlock> opened; |
|     Status s = fs_manager_->OpenBlock(old_id, &opened); |
|     ASSERT_TRUE(s.IsNotFound()) << "Expected block not found: " << s.ToString(); |
|   } |
|   // And the new blocks are all present. |
|   for (const BlockId& new_id : new_ids) { |
|     gscoped_ptr<fs::ReadableBlock> opened; |
|     ASSERT_OK(fs_manager_->OpenBlock(new_id, &opened)); |
|   } |
| } |
| |
| } // namespace tserver |
| } // namespace kudu |