blob: a7cdde1a4a759493c48ff09f60f05c53ac3d260c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tserver/remote_bootstrap_client.h"
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/gutil/walltime.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/transfer.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tablet/tablet_bootstrap.h"
#include "kudu/tablet/tablet_peer.h"
#include "kudu/tserver/remote_bootstrap.pb.h"
#include "kudu/tserver/remote_bootstrap.proxy.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/util/crc.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/net/net_util.h"
// RPC timeout for BeginTabletCopySession, read in TabletCopyClient::Start().
// EndRemoteSession() reuses the same value for EndTabletCopySession.
DEFINE_int32(tablet_copy_begin_session_timeout_ms, 3000,
"Tablet server RPC client timeout for BeginTabletCopySession calls. "
"Also used for EndTabletCopySession calls.");
TAG_FLAG(tablet_copy_begin_session_timeout_ms, hidden);
// Debug aid: when set, Finish() and WriteConsensusMetadata() copy the tablet
// metadata / consensus metadata files to "<path>.copy.<start_time_micros_>.tmp".
// The flag is read at each use site (and is tagged runtime).
DEFINE_bool(tablet_copy_save_downloaded_metadata, false,
"Save copies of the downloaded tablet copy files for debugging purposes. "
"Note: This is only intended for debugging and should not be normally used!");
TAG_FLAG(tablet_copy_save_downloaded_metadata, advanced);
TAG_FLAG(tablet_copy_save_downloaded_metadata, hidden);
TAG_FLAG(tablet_copy_save_downloaded_metadata, runtime);
// Test-only knob: DownloadFile() sleeps this many milliseconds after appending
// each fetched chunk. NOTE: "dowload" is misspelled, but the flag name is part
// of the command-line interface, so it is intentionally kept unchanged.
DEFINE_int32(tablet_copy_dowload_file_inject_latency_ms, 0,
"Injects latency into the loop that downloads files, causing tablet copy "
"to take much longer. For use in tests only.");
TAG_FLAG(tablet_copy_dowload_file_inject_latency_ms, hidden);
// Maximum number of bytes requested per FetchData RPC in DownloadFile().
// Defined in another translation unit (presumably next to the tablet copy
// service — confirm).
DECLARE_int32(tablet_copy_transfer_chunk_size_bytes);
// RETURN_NOT_OK_PREPEND() with a remote-error unwinding step.
// UnwindRemoteError() first appends the decoded TabletCopyErrorPB details from
// `controller` when `status` is a RemoteError. Then `msg` is prepended and the
// result is returned on failure. The expansion calls UnwindRemoteError()
// unqualified, so the macro is meant for use inside TabletCopyClient members.
#define RETURN_NOT_OK_UNWIND_PREPEND(status, controller, msg) \
RETURN_NOT_OK_PREPEND(UnwindRemoteError(status, controller), msg)
namespace kudu {
namespace tserver {
using consensus::ConsensusMetadata;
using consensus::ConsensusStatePB;
using consensus::OpId;
using consensus::RaftConfigPB;
using consensus::RaftPeerPB;
using env_util::CopyFile;
using fs::WritableBlock;
using rpc::Messenger;
using std::shared_ptr;
using std::string;
using std::vector;
using strings::Substitute;
using tablet::ColumnDataPB;
using tablet::DeltaDataPB;
using tablet::RowSetDataPB;
using tablet::TabletDataState;
using tablet::TabletDataState_Name;
using tablet::TabletMetadata;
using tablet::TabletStatusListener;
using tablet::TabletSuperBlockPB;
TabletCopyClient::TabletCopyClient(std::string tablet_id,
FsManager* fs_manager,
shared_ptr<Messenger> messenger)
: tablet_id_(std::move(tablet_id)),
fs_manager_(fs_manager),
messenger_(std::move(messenger)),
started_(false),
downloaded_wal_(false),
downloaded_blocks_(false),
replace_tombstoned_tablet_(false),
status_listener_(nullptr),
session_idle_timeout_millis_(0),
start_time_micros_(0) {}
// Best-effort teardown. Ending the tablet copy session releases the anchors
// held on the remote. A destructor cannot report failure, so any error is
// only logged as a warning.
TabletCopyClient::~TabletCopyClient() {
  const Status end_status = EndRemoteSession();
  WARN_NOT_OK(end_status, "Unable to close tablet copy session");
}
// Configures this client to overwrite the local tombstoned replica described
// by `meta`, instead of creating brand-new tablet metadata in Start().
//
// `caller_term` is the term of the peer requesting the copy. The copy is
// refused if the tombstoned replica has already logged an op from a higher
// term.
//
// Consensus metadata left on disk by the tombstoned replica is loaded, so that
// WriteConsensusMetadata() merges the source's committed state into it instead
// of creating a new file. A missing consensus metadata file is not an error.
//
// Returns:
//   - IllegalState if `meta` is not TABLET_DATA_TOMBSTONED;
//   - InvalidArgument if the tombstone's last logged term exceeds `caller_term`;
//   - the error from loading existing consensus metadata (other than NotFound).
// The client's state is modified only after every check passes, so a failed
// call leaves the client unchanged.
Status TabletCopyClient::SetTabletToReplace(const scoped_refptr<TabletMetadata>& meta,
                                            int64_t caller_term) {
  CHECK_EQ(tablet_id_, meta->tablet_id());

  TabletDataState data_state = meta->tablet_data_state();
  if (data_state != tablet::TABLET_DATA_TOMBSTONED) {
    return Status::IllegalState(Substitute("Tablet $0 not in tombstoned state: $1 ($2)",
                                           tablet_id_,
                                           TabletDataState_Name(data_state),
                                           data_state));
  }

  int64_t last_logged_term = meta->tombstone_last_logged_opid().term();
  if (last_logged_term > caller_term) {
    return Status::InvalidArgument(
        Substitute("Leader has term $0 but the last log entry written by the tombstoned replica "
                   "for tablet $1 has higher term $2. Refusing tablet copy from leader",
                   caller_term, tablet_id_, last_logged_term));
  }

  // Load the old consensus metadata, if it exists.
  gscoped_ptr<ConsensusMetadata> cmeta;
  Status s = ConsensusMetadata::Load(fs_manager_, tablet_id_,
                                     fs_manager_->uuid(), &cmeta);
  // NotFound is tolerated: the consensus metadata may never have been written
  // to disk, possibly due to a failed tablet copy. Any other error is fatal.
  if (!s.ok() && !s.IsNotFound()) {
    return s;
  }

  // Every check passed: commit the new state in one place.
  replace_tombstoned_tablet_ = true;
  meta_ = meta;
  if (s.ok()) {
    cmeta_.swap(cmeta);
  }
  return Status::OK();
}
// Opens a tablet copy session with the peer at `copy_source_addr` and prepares
// the local tablet metadata for the download.
//
// On success this client holds:
//   - the remote superblock (forced into TABLET_DATA_COPYING);
//   - the list of remote WAL segment sequence numbers;
//   - the remote committed consensus state.
// The local tablet metadata has been persisted in the COPYING state. That is
// done by replacing the superblock of the tombstoned replica registered via
// SetTabletToReplace(), or else by creating new metadata. If `meta` is
// non-null it receives that metadata.
//
// The client is marked as started as soon as the remote session exists. If a
// later step fails, the destructor therefore still ends the session, rather
// than leaving it open on the source until it idles out. Calling Start() again
// after it has opened a session (even one that then failed) trips the
// CHECK below.
Status TabletCopyClient::Start(const HostPort& copy_source_addr,
                               scoped_refptr<TabletMetadata>* meta) {
  CHECK(!started_);
  start_time_micros_ = GetCurrentTimeMicros();

  Sockaddr addr;
  RETURN_NOT_OK(SockaddrFromHostPort(copy_source_addr, &addr));
  if (addr.IsWildcard()) {
    return Status::InvalidArgument("Invalid wildcard address to tablet copy from",
                                   Substitute("$0 (resolved to $1)",
                                              copy_source_addr.host(), addr.host()));
  }
  LOG_WITH_PREFIX(INFO) << "Beginning tablet copy session"
                        << " from remote peer at address " << copy_source_addr.ToString();

  // Set up an RPC proxy for the TabletCopyService.
  proxy_.reset(new TabletCopyServiceProxy(messenger_, addr));

  BeginTabletCopySessionRequestPB req;
  req.set_requestor_uuid(fs_manager_->uuid());
  req.set_tablet_id(tablet_id_);

  rpc::RpcController controller;
  controller.set_timeout(MonoDelta::FromMilliseconds(
      FLAGS_tablet_copy_begin_session_timeout_ms));

  // Begin the tablet copy session with the remote peer.
  BeginTabletCopySessionResponsePB resp;
  RETURN_NOT_OK_UNWIND_PREPEND(proxy_->BeginTabletCopySession(req, &resp, &controller),
                               controller,
                               "Unable to begin tablet copy session");

  // From here on a session exists on the source peer. Record it and mark the
  // client as started immediately. EndRemoteSession(), called from the
  // destructor, is a no-op while started_ is false, so setting it only at the
  // end would leak the remote session on every failure path below.
  session_id_ = resp.session_id();
  session_idle_timeout_millis_ = resp.session_idle_timeout_millis();
  started_ = true;

  string copy_peer_uuid = resp.has_responder_uuid()
      ? resp.responder_uuid() : "(unknown uuid)";
  if (resp.superblock().tablet_data_state() != tablet::TABLET_DATA_READY) {
    Status s = Status::IllegalState("Remote peer (" + copy_peer_uuid + ")" +
                                    " is currently copying itself!",
                                    resp.superblock().ShortDebugString());
    LOG_WITH_PREFIX(WARNING) << s.ToString();
    return s;
  }

  superblock_.reset(resp.release_superblock());
  // The local copy stays COPYING until Finish() writes a READY superblock.
  superblock_->set_tablet_data_state(tablet::TABLET_DATA_COPYING);
  wal_seqnos_.assign(resp.wal_segment_seqnos().begin(), resp.wal_segment_seqnos().end());
  remote_committed_cstate_.reset(resp.release_initial_committed_cstate());

  Schema schema;
  RETURN_NOT_OK_PREPEND(SchemaFromPB(superblock_->schema(), &schema),
                        "Cannot deserialize schema from remote superblock");

  if (replace_tombstoned_tablet_) {
    // Also validate the term of the source peer, in case they are
    // different. This is a sanity check that protects us in case a bug or
    // misconfiguration causes us to attempt to copy from an out-of-date
    // source peer, even after passing the term check from the caller in
    // SetTabletToReplace().
    int64_t last_logged_term = meta_->tombstone_last_logged_opid().term();
    if (last_logged_term > remote_committed_cstate_->current_term()) {
      return Status::InvalidArgument(
          Substitute("Tablet $0: source peer has term $1 but "
                     "tombstoned replica has last-logged opid with higher term $2. "
                     "Refusing tablet copy from source peer $3",
                     tablet_id_,
                     remote_committed_cstate_->current_term(),
                     last_logged_term,
                     copy_peer_uuid));
    }

    // This will flush to disk, but we set the data state to COPYING above.
    RETURN_NOT_OK_PREPEND(meta_->ReplaceSuperBlock(*superblock_),
                          "Tablet Copy unable to replace superblock on tablet " +
                          tablet_id_);
  } else {
    Partition partition;
    Partition::FromPB(superblock_->partition(), &partition);
    PartitionSchema partition_schema;
    RETURN_NOT_OK(PartitionSchema::FromPB(superblock_->partition_schema(),
                                          schema, &partition_schema));

    // Create the superblock on disk.
    RETURN_NOT_OK(TabletMetadata::CreateNew(fs_manager_, tablet_id_,
                                            superblock_->table_name(),
                                            superblock_->table_id(),
                                            schema,
                                            partition_schema,
                                            partition,
                                            tablet::TABLET_DATA_COPYING,
                                            &meta_));
  }

  if (meta) {
    *meta = meta_;
  }
  return Status::OK();
}
// Downloads everything the source advertised in Start(): first all data
// blocks referenced by the remote superblock, then all WAL segments. The
// first failure stops the fetch and is returned. `status_listener` may be
// null; when set it receives progress messages.
Status TabletCopyClient::FetchAll(TabletStatusListener* status_listener) {
  CHECK(started_);
  status_listener_ = status_listener;

  // Downloads run one after another for now; they could be parallelized later.
  Status result = DownloadBlocks();
  if (result.ok()) {
    result = DownloadWALs();
  }
  return result;
}
// Completes the copy after FetchAll() has downloaded every block and WAL
// segment:
//   1. persists the consensus metadata;
//   2. replaces the tablet superblock with the rewritten one built by
//      DownloadBlocks(), now marked TABLET_DATA_READY.
// With --tablet_copy_save_downloaded_metadata set, a copy of the resulting
// tablet metadata file is also written next to it for debugging.
Status TabletCopyClient::Finish() {
  CHECK(meta_);
  CHECK(started_);
  CHECK(downloaded_wal_);
  CHECK(downloaded_blocks_);

  RETURN_NOT_OK(WriteConsensusMetadata());

  LOG_WITH_PREFIX(INFO) << "Tablet Copy complete. Replacing tablet superblock.";
  UpdateStatusMessage("Replacing tablet superblock");
  // new_superblock_ was copied from the COPYING superblock. READY is recorded
  // only now, after all data has been downloaded.
  new_superblock_->set_tablet_data_state(tablet::TABLET_DATA_READY);
  RETURN_NOT_OK(meta_->ReplaceSuperBlock(*new_superblock_));

  if (!FLAGS_tablet_copy_save_downloaded_metadata) {
    return Status::OK();
  }
  const string meta_path = fs_manager_->GetTabletMetadataPath(tablet_id_);
  const string meta_copy_path = Substitute("$0.copy.$1.tmp", meta_path, start_time_micros_);
  RETURN_NOT_OK_PREPEND(CopyFile(Env::Default(), meta_path, meta_copy_path,
                                 WritableFileOptions()),
                        "Unable to make copy of tablet metadata");
  return Status::OK();
}
// Turns the ErrorStatusPB attached to a failed tablet copy RPC into a Status.
// If the TabletCopyErrorPB extension is present, its embedded status is
// returned with the tablet copy error code prepended. Otherwise the result is
// an InvalidArgument that carries the raw error PB as text.
Status TabletCopyClient::ExtractRemoteError(const rpc::ErrorStatusPB& remote_error) {
  if (PREDICT_FALSE(!remote_error.HasExtension(TabletCopyErrorPB::tablet_copy_error_ext))) {
    return Status::InvalidArgument("Unable to decode tablet copy RPC error message",
                                   remote_error.ShortDebugString());
  }
  const TabletCopyErrorPB& copy_error =
      remote_error.GetExtension(TabletCopyErrorPB::tablet_copy_error_ext);
  const string prefix = "Received error code " +
                        TabletCopyErrorPB::Code_Name(copy_error.code()) +
                        " from remote service";
  return StatusFromPB(copy_error.status()).CloneAndPrepend(prefix);
}
// Returns `status` unchanged unless it is a RemoteError. In that case the
// details decoded by ExtractRemoteError() are appended to the message.
// Assumes controller.error_response() is non-null whenever the status is a
// RemoteError (NOTE(review): relies on the RPC layer's contract).
Status TabletCopyClient::UnwindRemoteError(const Status& status,
                                           const rpc::RpcController& controller) {
  if (status.IsRemoteError()) {
    const Status remote_details = ExtractRemoteError(*controller.error_response());
    return status.CloneAndAppend(remote_details.ToString());
  }
  return status;
}
// Forwards a progress message, prefixed with "TabletCopy: ", to the listener
// registered by FetchAll(). Does nothing when no listener is attached.
void TabletCopyClient::UpdateStatusMessage(const string& message) {
  if (status_listener_ == nullptr) {
    return;
  }
  status_listener_->StatusMessage("TabletCopy: " + message);
}
// Asks the source peer to end the tablet copy session opened by Start(). This
// is a no-op until the client has been marked started. The RPC uses the
// --tablet_copy_begin_session_timeout_ms timeout.
Status TabletCopyClient::EndRemoteSession() {
  if (!started_) {
    return Status::OK();
  }

  EndTabletCopySessionRequestPB req;
  req.set_session_id(session_id_);
  // NOTE(review): is_success is reported as true unconditionally, even when
  // the copy failed. Confirm whether the source peer relies on this flag.
  req.set_is_success(true);

  rpc::RpcController controller;
  controller.set_timeout(MonoDelta::FromMilliseconds(
      FLAGS_tablet_copy_begin_session_timeout_ms));

  EndTabletCopySessionResponsePB resp;
  RETURN_NOT_OK_UNWIND_PREPEND(proxy_->EndTabletCopySession(req, &resp, &controller),
                               controller,
                               "Failure ending tablet copy session");
  return Status::OK();
}
// Rebuilds the tablet's local WAL directory from the WAL segments listed by
// the source in Start(), downloading them one at a time in list order. Any
// existing WAL directory is deleted first, so stray files from earlier copies
// or runs are not kept.
Status TabletCopyClient::DownloadWALs() {
  CHECK(started_);
  Env* env = fs_manager_->env();

  const string wal_dir = fs_manager_->GetTabletWalDir(tablet_id_);
  if (env->FileExists(wal_dir)) {
    RETURN_NOT_OK(env->DeleteRecursively(wal_dir));
  }
  RETURN_NOT_OK(env->CreateDir(wal_dir));
  // fsync() the parent directory so the new WAL directory entry is synced.
  RETURN_NOT_OK(env->SyncDir(DirName(wal_dir)));

  const int num_segments = wal_seqnos_.size();
  LOG_WITH_PREFIX(INFO) << "Starting download of " << num_segments << " WAL segments...";
  for (size_t idx = 0; idx < wal_seqnos_.size(); ++idx) {
    const uint64_t seqno = wal_seqnos_[idx];
    UpdateStatusMessage(Substitute("Downloading WAL segment with seq. number $0 ($1/$2)",
                                   seqno, idx + 1, num_segments));
    RETURN_NOT_OK(DownloadWAL(seqno));
  }

  downloaded_wal_ = true;
  return Status::OK();
}
// Downloads every data block referenced by the remote superblock and builds
// new_superblock_. That is a copy of the remote superblock in which each block
// ID has been rewritten to the ID of the locally written copy.
//
// Within each rowset, blocks are fetched in this order: columns, REDO deltas,
// UNDO deltas, bloom block, ad hoc index block. The remote's orphaned block
// list is dropped from the result.
Status TabletCopyClient::DownloadBlocks() {
  CHECK(started_);

  // Tally the blocks up front so progress messages can show "i/total".
  int num_blocks = 0;
  for (const RowSetDataPB& rs : superblock_->rowsets()) {
    num_blocks += rs.columns_size() + rs.redo_deltas_size() + rs.undo_deltas_size();
    num_blocks += rs.has_bloom_block() ? 1 : 0;
    num_blocks += rs.has_adhoc_index_block() ? 1 : 0;
  }

  // Rewrite block IDs in a copy as each block lands locally.
  gscoped_ptr<TabletSuperBlockPB> rewritten(new TabletSuperBlockPB());
  rewritten->CopyFrom(*superblock_);
  int downloaded = 0;

  LOG_WITH_PREFIX(INFO) << "Starting download of " << num_blocks << " data blocks...";
  for (RowSetDataPB& rs : *rewritten->mutable_rowsets()) {
    for (ColumnDataPB& column : *rs.mutable_columns()) {
      RETURN_NOT_OK(DownloadAndRewriteBlock(column.mutable_block(),
                                            &downloaded, num_blocks));
    }
    for (DeltaDataPB& delta : *rs.mutable_redo_deltas()) {
      RETURN_NOT_OK(DownloadAndRewriteBlock(delta.mutable_block(),
                                            &downloaded, num_blocks));
    }
    for (DeltaDataPB& delta : *rs.mutable_undo_deltas()) {
      RETURN_NOT_OK(DownloadAndRewriteBlock(delta.mutable_block(),
                                            &downloaded, num_blocks));
    }
    if (rs.has_bloom_block()) {
      RETURN_NOT_OK(DownloadAndRewriteBlock(rs.mutable_bloom_block(),
                                            &downloaded, num_blocks));
    }
    if (rs.has_adhoc_index_block()) {
      RETURN_NOT_OK(DownloadAndRewriteBlock(rs.mutable_adhoc_index_block(),
                                            &downloaded, num_blocks));
    }
  }

  // Orphaned block IDs name blocks in the remote's storage and mean nothing here.
  rewritten->clear_orphaned_blocks();
  new_superblock_.swap(rewritten);
  downloaded_blocks_ = true;
  return Status::OK();
}
// Downloads the WAL segment with sequence number `wal_segment_seqno` from the
// source peer into this tablet's local segment file.
//
// The file is opened with sync_on_close, so its contents are only known to be
// synced once Close() succeeds. Close() is therefore called explicitly and its
// Status propagated, as DownloadBlock() does for data blocks. Leaving the close
// to the WritableFile destructor, which cannot return a Status, would let a
// failed sync/close go unreported while the segment counted as downloaded.
Status TabletCopyClient::DownloadWAL(uint64_t wal_segment_seqno) {
  VLOG_WITH_PREFIX(1) << "Downloading WAL segment with seqno " << wal_segment_seqno;
  DataIdPB data_id;
  data_id.set_type(DataIdPB::LOG_SEGMENT);
  data_id.set_wal_segment_seqno(wal_segment_seqno);
  string dest_path = fs_manager_->GetWalSegmentFileName(tablet_id_, wal_segment_seqno);

  WritableFileOptions opts;
  opts.sync_on_close = true;
  gscoped_ptr<WritableFile> writer;
  RETURN_NOT_OK_PREPEND(fs_manager_->env()->NewWritableFile(opts, dest_path, &writer),
                        "Unable to open file for writing");
  RETURN_NOT_OK_PREPEND(DownloadFile(data_id, writer.get()),
                        Substitute("Unable to download WAL segment with seq. number $0",
                                   wal_segment_seqno));
  // Surface any sync/close failure instead of losing it in the destructor.
  RETURN_NOT_OK_PREPEND(writer->Close(),
                        Substitute("Unable to close WAL segment with seq. number $0",
                                   wal_segment_seqno));
  return Status::OK();
}
// Persists the consensus metadata for the copied tablet from the committed
// consensus state received from the source in Start().
//
// If SetTabletToReplace() loaded existing consensus metadata, the remote
// committed state is merged into it and flushed. Otherwise new consensus
// metadata is created from the remote config and term. On either path, when
// --tablet_copy_save_downloaded_metadata is set, a debugging copy of the
// resulting file is written to "<path>.copy.<start_time_micros_>.tmp".
Status TabletCopyClient::WriteConsensusMetadata() {
  if (!cmeta_) {
    // No previous consensus meta file was found, so create one. The returned
    // object is discarded here, so Create() is presumably what persists the
    // file. The debug copy below relies on that file existing.
    gscoped_ptr<ConsensusMetadata> cmeta;
    RETURN_NOT_OK(ConsensusMetadata::Create(fs_manager_, tablet_id_, fs_manager_->uuid(),
                                            remote_committed_cstate_->config(),
                                            remote_committed_cstate_->current_term(),
                                            &cmeta));
  } else {
    // Update the existing consensus metadata to reflect the config and term
    // sent by the tablet copy source.
    cmeta_->MergeCommittedConsensusStatePB(*remote_committed_cstate_);
    RETURN_NOT_OK(cmeta_->Flush());
  }

  if (FLAGS_tablet_copy_save_downloaded_metadata) {
    string cmeta_path = fs_manager_->GetConsensusMetadataPath(tablet_id_);
    string cmeta_copy_path = Substitute("$0.copy.$1.tmp", cmeta_path, start_time_micros_);
    RETURN_NOT_OK_PREPEND(CopyFile(Env::Default(), cmeta_path, cmeta_copy_path,
                                   WritableFileOptions()),
                          "Unable to make copy of consensus metadata");
  }
  return Status::OK();
}
// Downloads the block identified by `*block_id` and rewrites `*block_id` in
// place to the ID of the newly written local block.
//
// `*block_count` is the number of blocks already downloaded and is incremented
// on success. `num_blocks` is the overall total and is used only for progress
// reporting. Progress is 1-based ("1/N" for the first block, "N/N" for the
// last), the same as the WAL segment progress in DownloadWALs().
Status TabletCopyClient::DownloadAndRewriteBlock(BlockIdPB* block_id,
                                                 int* block_count, int num_blocks) {
  BlockId old_block_id(BlockId::FromPB(*block_id));
  // *block_count counts completed blocks; the one starting now is +1.
  UpdateStatusMessage(Substitute("Downloading block $0 ($1/$2)",
                                 old_block_id.ToString(), *block_count + 1,
                                 num_blocks));
  BlockId new_block_id;
  RETURN_NOT_OK_PREPEND(DownloadBlock(old_block_id, &new_block_id),
                        "Unable to download block with id " + old_block_id.ToString());

  new_block_id.CopyToPB(block_id);
  (*block_count)++;
  return Status::OK();
}
// Creates a new local block and fills it with the contents of the remote
// block `old_block_id`, fetched from the source peer. `*new_block_id` receives
// the ID of the new local block; the block is then closed, and a close
// failure is returned.
Status TabletCopyClient::DownloadBlock(const BlockId& old_block_id,
                                       BlockId* new_block_id) {
  VLOG_WITH_PREFIX(1) << "Downloading block with block_id " << old_block_id.ToString();

  DataIdPB remote_id;
  remote_id.set_type(DataIdPB::BLOCK);
  old_block_id.CopyToPB(remote_id.mutable_block_id());

  gscoped_ptr<WritableBlock> local_block;
  RETURN_NOT_OK_PREPEND(fs_manager_->CreateNewBlock(&local_block),
                        "Unable to create new block");

  Status fetch_status = DownloadFile(remote_id, local_block.get());
  RETURN_NOT_OK_PREPEND(fetch_status,
                        Substitute("Unable to download block $0",
                                   old_block_id.ToString()));

  *new_block_id = local_block->id();
  RETURN_NOT_OK_PREPEND(local_block->Close(), "Unable to close block");
  return Status::OK();
}
// Streams the remote item `data_id` from the source peer into `appendable`.
// Each chunk is one FetchData RPC requesting at most
// --tablet_copy_transfer_chunk_size_bytes bytes. `Appendable` must provide
// Append() accepting the chunk data; WritableFile and WritableBlock are used
// here.
//
// Each chunk is validated before it is appended:
//   - VerifyData() checks the expected offset and the CRC32C.
//   - The chunk must not extend past the advertised total length; otherwise
//     data past the end would be written and the exact-end check below could
//     never fire.
//   - An empty chunk before the end is rejected; otherwise the loop would
//     re-request the same offset forever.
// Both size violations are reported as Corruption.
template<class Appendable>
Status TabletCopyClient::DownloadFile(const DataIdPB& data_id,
                                      Appendable* appendable) {
  rpc::RpcController controller;
  controller.set_timeout(MonoDelta::FromMilliseconds(session_idle_timeout_millis_));

  // Only the offset changes between chunk requests.
  FetchDataRequestPB req;
  req.set_session_id(session_id_);
  req.mutable_data_id()->CopyFrom(data_id);
  req.set_max_length(FLAGS_tablet_copy_transfer_chunk_size_bytes);

  uint64_t offset = 0;
  bool done = false;
  while (!done) {
    controller.Reset();
    req.set_offset(offset);
    FetchDataResponsePB resp;
    RETURN_NOT_OK_UNWIND_PREPEND(proxy_->FetchData(req, &resp, &controller),
                                 controller,
                                 "Unable to fetch data from remote");

    // Sanity-check for corruption.
    RETURN_NOT_OK_PREPEND(VerifyData(offset, resp.chunk()),
                          Substitute("Error validating data item $0", data_id.ShortDebugString()));

    const uint64_t chunk_len = resp.chunk().data().size();
    const uint64_t total_len = resp.chunk().total_data_length();
    const uint64_t end_offset = offset + chunk_len;
    if (PREDICT_FALSE(end_offset > total_len)) {
      return Status::Corruption(
          Substitute("Error validating data item $0: chunk at offset $1 with size $2 "
                     "extends past total data length $3",
                     data_id.ShortDebugString(), offset, chunk_len, total_len));
    }
    if (PREDICT_FALSE(chunk_len == 0 && end_offset != total_len)) {
      return Status::Corruption(
          Substitute("Error validating data item $0: received empty chunk at offset $1 "
                     "before reaching total data length $2",
                     data_id.ShortDebugString(), offset, total_len));
    }

    // Write the data.
    RETURN_NOT_OK(appendable->Append(resp.chunk().data()));

    if (PREDICT_FALSE(FLAGS_tablet_copy_dowload_file_inject_latency_ms > 0)) {
      LOG_WITH_PREFIX(INFO) << "Injecting latency into file download: " <<
          FLAGS_tablet_copy_dowload_file_inject_latency_ms;
      SleepFor(MonoDelta::FromMilliseconds(FLAGS_tablet_copy_dowload_file_inject_latency_ms));
    }

    done = (end_offset == total_len);
    offset = end_offset;
  }
  return Status::OK();
}
// Validates one chunk returned by FetchData. The chunk must start at `offset`,
// the position that was requested; otherwise InvalidArgument is returned. Its
// data must also match the chunk's CRC32C; otherwise Corruption is returned.
Status TabletCopyClient::VerifyData(uint64_t offset, const DataChunkPB& chunk) {
  if (chunk.offset() != offset) {
    return Status::InvalidArgument("Offset did not match what was asked for",
                                   Substitute("$0 vs $1", offset, chunk.offset()));
  }

  const string& payload = chunk.data();
  const uint32_t actual_crc = crc::Crc32c(payload.data(), payload.length());
  if (PREDICT_TRUE(actual_crc == chunk.crc32())) {
    return Status::OK();
  }
  return Status::Corruption(
      Substitute("CRC32 does not match at offset $0 size $1: $2 vs $3",
                 offset, payload.size(), actual_crc, chunk.crc32()));
}
// Prefix for every log line this client emits. It names the tablet (T) and
// the local peer (P, the FsManager UUID).
string TabletCopyClient::LogPrefix() {
  const string& local_uuid = fs_manager_->uuid();
  return Substitute("T $0 P $1: Tablet Copy client: ", tablet_id_, local_uuid);
}
} // namespace tserver
} // namespace kudu