| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| #include "kudu/tserver/tablet_copy_source_session.h" |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <cstring> |
| #include <optional> |
| #include <ostream> |
| #include <type_traits> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/consensus_meta.h" |
| #include "kudu/consensus/consensus_meta_manager.h" |
| #include "kudu/consensus/log.h" |
| #include "kudu/consensus/log.pb.h" |
| #include "kudu/consensus/log_index.h" |
| #include "kudu/consensus/log_reader.h" |
| #include "kudu/consensus/opid.pb.h" |
| #include "kudu/consensus/opid_util.h" |
| #include "kudu/consensus/raft_consensus.h" |
| #include "kudu/fs/block_manager.h" |
| #include "kudu/fs/data_dirs.h" |
| #include "kudu/fs/fs.pb.h" |
| #include "kudu/fs/fs_manager.h" |
| #include "kudu/gutil/basictypes.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/gutil/type_traits.h" |
| #include "kudu/rpc/transfer.h" |
| #include "kudu/tablet/tablet_metadata.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/stopwatch.h" |
| #include "kudu/util/trace.h" |
| |
| DEFINE_int32(tablet_copy_transfer_chunk_size_bytes, 4 * 1024 * 1024, |
| "Size of chunks to transfer when copying tablets between " |
| "tablet servers."); |
| TAG_FLAG(tablet_copy_transfer_chunk_size_bytes, hidden); |
| |
| METRIC_DEFINE_counter(server, tablet_copy_bytes_sent, |
| "Bytes Sent For Tablet Copy", |
| kudu::MetricUnit::kBytes, |
| "Number of bytes sent during tablet copy operations since server start", |
| kudu::MetricLevel::kDebug); |
| |
| METRIC_DEFINE_gauge_int32(server, tablet_copy_open_source_sessions, |
| "Open Table Copy Source Sessions", |
| kudu::MetricUnit::kSessions, |
| "Number of currently open tablet copy source sessions on this server", |
| kudu::MetricLevel::kInfo); |
| |
| DEFINE_int32(tablet_copy_session_inject_latency_on_init_ms, 0, |
| "How much latency (in ms) to inject when a tablet copy session is initialized. " |
| "(For testing only!)"); |
| TAG_FLAG(tablet_copy_session_inject_latency_on_init_ms, unsafe); |
| TAG_FLAG(tablet_copy_session_inject_latency_on_init_ms, hidden); |
| |
| namespace kudu { |
| namespace tserver { |
| |
| using consensus::MinimumOpId; |
| using consensus::OpId; |
| using fs::ReadableBlock; |
| using log::ReadableLogSegment; |
| using std::shared_ptr; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| using tablet::TabletMetadata; |
| using tablet::TabletReplica; |
| |
| TabletCopySourceMetrics::TabletCopySourceMetrics(const scoped_refptr<MetricEntity>& metric_entity) |
| : bytes_sent(METRIC_tablet_copy_bytes_sent.Instantiate(metric_entity)), |
| open_source_sessions(METRIC_tablet_copy_open_source_sessions.Instantiate(metric_entity, 0)) { |
| } |
| |
| TabletCopySourceSession::TabletCopySourceSession( |
| string tablet_id, |
| FsManager* fs_manager, |
| TabletCopySourceMetrics* tablet_copy_metrics) |
| : tablet_id_(std::move(tablet_id)), |
| fs_manager_(fs_manager), |
| blocks_deleter_(&blocks_), |
| logs_deleter_(&logs_), |
| tablet_copy_metrics_(tablet_copy_metrics) { |
| if (tablet_copy_metrics_) { |
| tablet_copy_metrics_->open_source_sessions->Increment(); |
| } |
| } |
| |
| TabletCopySourceSession::~TabletCopySourceSession() { |
| if (tablet_copy_metrics_) { |
| tablet_copy_metrics_->open_source_sessions->IncrementBy(-1); |
| } |
| } |
| |
| Status TabletCopySourceSession::Init() { |
| return init_once_.Init(&TabletCopySourceSession::InitOnce, this); |
| } |
| |
| Status RemoteTabletCopySourceSession::InitOnce() { |
| // Inject latency during Init() for testing purposes. |
| if (PREDICT_FALSE(FLAGS_tablet_copy_session_inject_latency_on_init_ms > 0)) { |
| TRACE("Injecting $0ms of latency due to --tablet_copy_session_inject_latency_on_init_ms", |
| FLAGS_tablet_copy_session_inject_latency_on_init_ms); |
| SleepFor(MonoDelta::FromMilliseconds(FLAGS_tablet_copy_session_inject_latency_on_init_ms)); |
| } |
| |
| RETURN_NOT_OK(tablet_replica_->CheckRunning()); |
| RETURN_NOT_OK(CheckHealthyDirGroup()); |
| |
| // Prevent log GC while we grab log segments and Tablet metadata. |
| string anchor_owner_token = Substitute("TabletCopy-$0", session_id_); |
| tablet_replica_->log_anchor_registry()->Register( |
| MinimumOpId().index(), anchor_owner_token, &log_anchor_); |
| |
| // Read the SuperBlock from disk. |
| const scoped_refptr<TabletMetadata>& metadata = tablet_replica_->tablet_metadata(); |
| RETURN_NOT_OK_PREPEND(metadata->ReadSuperBlockFromDisk(&tablet_superblock_), |
| Substitute("Unable to access superblock for tablet $0", |
| tablet_id_)); |
| |
| // Anchor the data blocks by opening them and adding them to the cache. |
| // |
| // All subsequent requests should reuse the opened blocks. |
| vector<BlockIdPB> data_blocks = |
| TabletMetadata::CollectBlockIdPBs(tablet_superblock_); |
| for (const BlockIdPB& block_id : data_blocks) { |
| VLOG(1) << "Opening block " << pb_util::SecureDebugString(block_id); |
| RETURN_NOT_OK(OpenBlock(BlockId::FromPB(block_id))); |
| } |
| |
| // Get the latest opid in the log at this point in time so we can re-anchor. |
| // TODO(mpercy): Do we need special handling for std::nullopt case? |
| std::optional<OpId> last_logged_opid = |
| tablet_replica_->consensus()->GetLastOpId(consensus::RECEIVED_OPID); |
| if (!last_logged_opid) { |
| last_logged_opid = MinimumOpId(); |
| } |
| |
| // Get the current segments from the log, including the active segment. |
| // The Log doesn't add the active segment to the log reader's list until |
| // a header has been written to it (but it will not have a footer). |
| shared_ptr<log::LogReader> reader = tablet_replica_->log()->reader(); |
| if (!reader) { |
| tablet::TabletStatePB tablet_state = tablet_replica_->state(); |
| return Status::IllegalState(Substitute( |
| "Unable to initialize tablet copy session for tablet $0. " |
| "Log reader is not available. Tablet state: $1 ($2)", |
| tablet_id_, tablet::TabletStatePB_Name(tablet_state), tablet_state)); |
| } |
| reader->GetSegmentsSnapshot(&log_segments_); |
| for (const scoped_refptr<ReadableLogSegment>& segment : log_segments_) { |
| RETURN_NOT_OK(OpenLogSegment(segment->header().sequence_number())); |
| } |
| |
| // Look up the committed consensus state. |
| // We do this after snapshotting the log to avoid a scenario where the latest |
| // entry in the log has a term higher than the term stored in the consensus |
| // metadata, which will results in a CHECK failure on RaftConsensus init. |
| shared_ptr<consensus::RaftConsensus> consensus = tablet_replica_->shared_consensus(); |
| if (!consensus) { |
| tablet::TabletStatePB tablet_state = tablet_replica_->state(); |
| return Status::IllegalState(Substitute( |
| "Unable to initialize tablet copy session for tablet $0. " |
| "Raft Consensus is not available. Tablet state: $1 ($2)", |
| tablet_id_, tablet::TabletStatePB_Name(tablet_state), tablet_state)); |
| } |
| RETURN_NOT_OK_PREPEND(consensus->ConsensusState(&initial_cstate_), |
| "Consensus state not available"); |
| |
| // Re-anchor on the highest OpId that was in the log right before we |
| // snapshotted the log segments. This helps ensure that we don't end up in a |
| // tablet copy loop due to a follower falling too far behind the |
| // leader's log when tablet copy is slow. The remote controls when |
| // this anchor is released by ending the tablet copy session. |
| RETURN_NOT_OK(tablet_replica_->log_anchor_registry()->RegisterOrUpdate( |
| last_logged_opid->index(), anchor_owner_token, &log_anchor_)); |
| |
| LOG(INFO) << Substitute( |
| "T $0 P $1: Tablet Copy: opened $2 blocks and $3 log segments", |
| tablet_id_, consensus->peer_uuid(), data_blocks.size(), log_segments_.size()); |
| return Status::OK(); |
| } |
| |
| // Determine the length of the data chunk to return to the client. |
| static int64_t DetermineReadLength(int64_t bytes_remaining, int64_t requested_len) { |
| // Overhead in the RPC for things like headers, protobuf data, etc. |
| static const int kOverhead = 4096; |
| |
| if (requested_len <= 0) { |
| requested_len = FLAGS_tablet_copy_transfer_chunk_size_bytes; |
| } else { |
| requested_len = std::min<int64_t>(requested_len, |
| FLAGS_tablet_copy_transfer_chunk_size_bytes); |
| } |
| requested_len = std::min<int64_t>(requested_len, FLAGS_rpc_max_message_size - kOverhead); |
| CHECK_GT(requested_len, 0) << "rpc_max_message_size is too low to transfer data: " |
| << FLAGS_rpc_max_message_size; |
| return std::min(bytes_remaining, requested_len); |
| } |
| |
| // Calculate the size of the data to return given a maximum client message |
| // length, the file itself, and the offset into the file to be read from. |
| static Status GetResponseDataSize(int64_t total_size, |
| uint64_t offset, int64_t client_maxlen, |
| TabletCopyErrorPB::Code* error_code, int64_t* data_size) { |
| // If requested offset is off the end of the data, bail. |
| if (offset >= total_size) { |
| *error_code = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST; |
| return Status::InvalidArgument( |
| Substitute("Requested offset ($0) is beyond the data size ($1)", |
| offset, total_size)); |
| } |
| |
| int64_t bytes_remaining = total_size - offset; |
| |
| *data_size = DetermineReadLength(bytes_remaining, client_maxlen); |
| DCHECK_GT(*data_size, 0); |
| if (client_maxlen > 0) { |
| DCHECK_LE(*data_size, client_maxlen); |
| } |
| |
| return Status::OK(); |
| } |
| |
| // Read a chunk of a file into a buffer. |
| // data_name provides a string for the block/log to be used in error messages. |
| template <class Info> |
| static Status ReadFileChunkToBuf(const Info* info, |
| uint64_t offset, int64_t client_maxlen, |
| const string& data_name, |
| string* data, int64_t* file_size, |
| TabletCopyErrorPB::Code* error_code) { |
| int64_t response_data_size = 0; |
| RETURN_NOT_OK_PREPEND(GetResponseDataSize(info->size, offset, client_maxlen, error_code, |
| &response_data_size), |
| Substitute("Error reading $0", data_name)); |
| |
| Stopwatch chunk_timer(Stopwatch::THIS_THREAD); |
| chunk_timer.start(); |
| |
| // Writing into a std::string buffer is basically guaranteed to work on C++11, |
| // however any modern compiler should be compatible with it. |
| // Violates the API contract, but avoids excessive copies. |
| data->resize(response_data_size); |
| uint8_t* buf = reinterpret_cast<uint8_t*>(const_cast<char*>(data->data())); |
| Slice slice(buf, response_data_size); |
| Status s = info->Read(offset, slice); |
| if (PREDICT_FALSE(!s.ok())) { |
| s = s.CloneAndPrepend( |
| Substitute("Unable to read existing file for $0", data_name)); |
| LOG(WARNING) << s.ToString(); |
| *error_code = TabletCopyErrorPB::IO_ERROR; |
| return s; |
| } |
| // Figure out if Slice points to buf or if Slice points to the mmap. |
| // If it points to the mmap then copy into buf. |
| if (slice.data() != buf) { |
| memcpy(buf, slice.data(), slice.size()); |
| } |
| chunk_timer.stop(); |
| TRACE("Tablet Copy: $0: $1 total bytes read. Total time elapsed: $2", |
| data_name, response_data_size, chunk_timer.elapsed().ToString()); |
| |
| *file_size = info->size; |
| return Status::OK(); |
| } |
| |
| Status TabletCopySourceSession::GetBlockPiece(const BlockId& block_id, |
| uint64_t offset, int64_t client_maxlen, |
| string* data, int64_t* block_file_size, |
| TabletCopyErrorPB::Code* error_code) { |
| DCHECK(init_once_.init_succeeded()); |
| RETURN_NOT_OK_PREPEND(CheckHealthyDirGroup(error_code), |
| "Tablet copy source could not get block"); |
| ImmutableReadableBlockInfo* block_info; |
| RETURN_NOT_OK(FindBlock(block_id, &block_info, error_code)); |
| |
| RETURN_NOT_OK(ReadFileChunkToBuf(block_info, offset, client_maxlen, |
| Substitute("block $0", block_id.ToString()), |
| data, block_file_size, error_code)); |
| |
| // Note: We do not eagerly close the block, as doing so may delete the |
| // underlying data if this was its last reader and it had been previously |
| // marked for deletion. This would be a problem for parallel readers in |
| // the same session; they would not be able to find the block. |
| |
| return Status::OK(); |
| } |
| |
| Status TabletCopySourceSession::GetLogSegmentPiece(uint64_t segment_seqno, |
| uint64_t offset, int64_t client_maxlen, |
| string* data, int64_t* log_file_size, |
| TabletCopyErrorPB::Code* error_code) { |
| DCHECK(init_once_.init_succeeded()); |
| RETURN_NOT_OK_PREPEND(CheckHealthyDirGroup(error_code), |
| "Tablet copy source could not get log segment"); |
| ImmutableRWFileInfo* file_info = nullptr; |
| RETURN_NOT_OK(FindLogSegment(segment_seqno, &file_info, error_code)); |
| CHECK(file_info); |
| const auto& kHeaderSize = file_info->readable->GetEncryptionHeaderSize(); |
| // To make sure unencrypted tservers can copy tablets from encrypted tservers |
| // and vice versa (which is a valid scenario when encrypting an existing |
| // cluster in place), we add the encryption header size here and start at 0 at |
| // the client side. |
| RETURN_NOT_OK(ReadFileChunkToBuf(file_info, offset + kHeaderSize, client_maxlen, |
| Substitute("log segment $0", segment_seqno), |
| data, log_file_size, error_code)); |
| *log_file_size -= kHeaderSize; |
| |
| // Note: We do not eagerly close log segment files, since we share ownership |
| // of the LogSegment objects with the Log itself. |
| return Status::OK(); |
| } |
| |
| bool TabletCopySourceSession::IsBlockOpenForTests(const BlockId& block_id) const { |
| DCHECK(init_once_.init_succeeded()); |
| return ContainsKey(blocks_, block_id); |
| } |
| |
| // Add a file to the cache and populate the given ImmutableRandomAcccessFileInfo |
| // object with the file ref and size. |
| template <class Collection, class Key, class Readable> |
| static Status AddImmutableFileToMap(Collection* const cache, |
| const Key& key, |
| const Readable& readable, |
| uint64_t size) { |
| // Sanity check for 0-length files. |
| if (size == 0) { |
| return Status::Corruption("Found 0-length object"); |
| } |
| |
| // Looks good, add it to the cache. |
| typedef typename Collection::mapped_type InfoPtr; |
| typedef typename base::remove_pointer<InfoPtr>::type Info; |
| InsertOrDie(cache, key, new Info(readable, size)); |
| |
| return Status::OK(); |
| } |
| |
| Status TabletCopySourceSession::CheckHealthyDirGroup(TabletCopyErrorPB::Code* error_code) const { |
| if (fs_manager_->dd_manager()->IsTabletInFailedDir(tablet_id_)) { |
| if (error_code) { |
| *error_code = TabletCopyErrorPB::IO_ERROR; |
| } |
| return Status::IOError( |
| Substitute("Tablet $0 is in a failed directory", tablet_id_)); |
| } |
| return Status::OK(); |
| } |
| |
| Status TabletCopySourceSession::OpenBlock(const BlockId& block_id) { |
| unique_ptr<ReadableBlock> block; |
| Status s = fs_manager_->OpenBlock(block_id, &block); |
| if (PREDICT_FALSE(!s.ok())) { |
| LOG(WARNING) << "Unable to open requested (existing) block file: " |
| << block_id.ToString() << ": " << s.ToString(); |
| return s.CloneAndPrepend(Substitute("Unable to open block file for block $0", |
| block_id.ToString())); |
| } |
| |
| uint64_t size; |
| s = block->Size(&size); |
| if (PREDICT_FALSE(!s.ok())) { |
| return s.CloneAndPrepend("Unable to get size of block"); |
| } |
| |
| s = AddImmutableFileToMap(&blocks_, block_id, block.get(), size); |
| if (!s.ok()) { |
| s = s.CloneAndPrepend(Substitute("Error accessing data for block $0", block_id.ToString())); |
| LOG(DFATAL) << "Data block disappeared: " << s.ToString(); |
| } else { |
| ignore_result(block.release()); |
| } |
| return s; |
| } |
| |
| Status TabletCopySourceSession::FindBlock(const BlockId& block_id, |
| ImmutableReadableBlockInfo** block_info, |
| TabletCopyErrorPB::Code* error_code) { |
| if (!FindCopy(blocks_, block_id, block_info)) { |
| *error_code = TabletCopyErrorPB::BLOCK_NOT_FOUND; |
| return Status::NotFound("Block not found", block_id.ToString()); |
| } |
| return Status::OK(); |
| } |
| |
| Status TabletCopySourceSession::OpenLogSegment(uint64_t segment_seqno) { |
| scoped_refptr<log::ReadableLogSegment> log_segment; |
| int position = -1; |
| if (!log_segments_.empty()) { |
| position = segment_seqno - log_segments_[0]->header().sequence_number(); |
| } |
| if (position < 0 || position >= log_segments_.size()) { |
| return Status::NotFound(Substitute("Segment with sequence number $0 not found", |
| segment_seqno)); |
| } |
| log_segment = log_segments_[position]; |
| CHECK_EQ(log_segment->header().sequence_number(), segment_seqno); |
| |
| uint64_t size = log_segment->readable_up_to(); |
| Status s = AddImmutableFileToMap(&logs_, segment_seqno, log_segment->file(), size); |
| if (!s.ok()) { |
| s = s.CloneAndPrepend( |
| Substitute("Error accessing data for log segment with seqno $0", |
| segment_seqno)); |
| LOG(INFO) << s.ToString(); |
| } |
| return s; |
| } |
| |
| Status TabletCopySourceSession::FindLogSegment(uint64_t segment_seqno, |
| ImmutableRWFileInfo** file_info, |
| TabletCopyErrorPB::Code* error_code) { |
| if (!FindCopy(logs_, segment_seqno, file_info)) { |
| *error_code = TabletCopyErrorPB::WAL_SEGMENT_NOT_FOUND; |
| return Status::NotFound(Substitute("Segment with sequence number $0 not found", |
| segment_seqno)); |
| } |
| return Status::OK(); |
| } |
| |
| RemoteTabletCopySourceSession::RemoteTabletCopySourceSession( |
| scoped_refptr<TabletReplica> tablet_replica, |
| string session_id, |
| string requestor_uuid, |
| FsManager* fs_manager, |
| TabletCopySourceMetrics* tablet_copy_metrics) |
| : TabletCopySourceSession(tablet_replica->tablet_id(), |
| fs_manager, |
| tablet_copy_metrics), |
| tablet_replica_(std::move(tablet_replica)), |
| session_id_(std::move(session_id)), |
| requestor_uuid_(std::move(requestor_uuid)) { |
| } |
| |
| RemoteTabletCopySourceSession::~RemoteTabletCopySourceSession() { |
| // No lock taken in the destructor, should only be 1 thread with access now. |
| CHECK_OK(UnregisterAnchorIfNeededUnlocked()); |
| } |
| |
| Status RemoteTabletCopySourceSession::UnregisterAnchorIfNeededUnlocked() { |
| return tablet_replica_->log_anchor_registry()->UnregisterIfAnchored(&log_anchor_); |
| } |
| |
| LocalTabletCopySourceSession::LocalTabletCopySourceSession( |
| string tablet_id, |
| FsManager* fs_manager, |
| TabletCopySourceMetrics* tablet_copy_metrics) |
| : TabletCopySourceSession(std::move(tablet_id), |
| fs_manager, |
| tablet_copy_metrics) { |
| } |
| |
| Status LocalTabletCopySourceSession::InitOnce() { |
| RETURN_NOT_OK(CheckHealthyDirGroup()); |
| |
| // Read the SuperBlock from disk. |
| string path = fs_manager_->GetTabletMetadataPath(tablet_id_); |
| RETURN_NOT_OK_PREPEND( |
| pb_util::ReadPBContainerFromPath(fs_manager_->env(), path, &tablet_superblock_, |
| pb_util::SENSITIVE), |
| Substitute("Unable to access superblock for tablet $0", tablet_id_)); |
| |
| // Open the data blocks and add them to the cache. |
| // NOTE: Since the server is stopped, it's not needed to anchor either |
| // data blocks or log segments. |
| // |
| // All subsequent requests should reuse the opened blocks. |
| vector<BlockIdPB> data_blocks = |
| TabletMetadata::CollectBlockIdPBs(tablet_superblock_); |
| for (const BlockIdPB& block_id : data_blocks) { |
| VLOG(1) << "Opening block " << pb_util::SecureDebugString(block_id); |
| RETURN_NOT_OK(OpenBlock(BlockId::FromPB(block_id))); |
| } |
| |
| // Get the current segments from the log. |
| shared_ptr<log::LogReader> reader; |
| RETURN_NOT_OK(log::LogReader::Open(fs_manager_, |
| /*index*/nullptr, |
| tablet_id_, |
| /*metric_entity*/nullptr, |
| /*file_cache*/nullptr, |
| &reader)); |
| reader->GetSegmentsSnapshot(&log_segments_); |
| for (const scoped_refptr<ReadableLogSegment>& segment : log_segments_) { |
| RETURN_NOT_OK(OpenLogSegment(segment->header().sequence_number())); |
| } |
| |
| // Load the current consensus state. |
| scoped_refptr<consensus::ConsensusMetadataManager> cmeta_mgr( |
| new consensus::ConsensusMetadataManager(fs_manager_)); |
| scoped_refptr<consensus::ConsensusMetadata> cmeta; |
| RETURN_NOT_OK(cmeta_mgr->Load(tablet_id_, &cmeta)); |
| initial_cstate_ = cmeta->ToConsensusStatePB(); |
| |
| LOG(INFO) << Substitute( |
| "T $0: Tablet Copy: opened $1 blocks and $2 log segments", |
| tablet_id_, data_blocks.size(), log_segments_.size()); |
| return Status::OK(); |
| } |
| |
| } // namespace tserver |
| } // namespace kudu |