// (Web source-viewer header, not part of the original file: blob 3452d6ee07ec8fcae21de65d0f62815213f4c6f8)
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
#include <gtest/gtest_prod.h>
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"
#include "kudu/util/random.h"
#include "kudu/util/status.h"
namespace kudu {
// Forward declarations. The types below are referenced in this header only
// through references, raw pointers, or smart pointers, so their full
// definitions are not needed here.
class BlockId;
class BlockIdPB;
class FsManager;
class HostPort;
class ThreadPool;
namespace consensus {
class ConsensusMetadata;
class ConsensusMetadataManager;
class ConsensusStatePB;
} // namespace consensus
namespace fs {
class BlockCreationTransaction;
} // namespace fs
namespace rpc {
class Messenger;
class RpcController;
} // namespace rpc
namespace tablet {
class RowSetDataPB;
class TabletMetadata;
class TabletReplica;
class TabletSuperBlockPB;
} // namespace tablet
namespace tserver {
// Forward declarations of tserver types that this header uses only by
// reference (DataChunkPB, DataIdPB) or via std::shared_ptr
// (TabletCopyServiceProxy).
class DataChunkPB;
class DataIdPB;
class TabletCopyServiceProxy;
// Server-wide tablet copy metrics.
//
// A pointer to an instance of this struct is handed to each TabletCopyClient
// at construction time (see the TabletCopyClient constructor).
struct TabletCopyClientMetrics {
// Builds the metrics from 'metric_entity'.
// NOTE(review): presumably instantiates the two metrics below against
// 'metric_entity' -- the definition is not in this header; confirm there.
explicit TabletCopyClientMetrics(const scoped_refptr<MetricEntity>& metric_entity);
// Counter metric. NOTE(review): by its name, the number of bytes fetched by
// tablet copy clients -- confirm against the metric definition.
scoped_refptr<Counter> bytes_fetched;
// 32-bit atomic gauge metric. NOTE(review): by its name, the number of
// currently open client sessions -- confirm against the metric definition.
scoped_refptr<AtomicGauge<int32_t>> open_client_sessions;
};
// Client class for using tablet copy to copy a tablet from another host.
// This class is not thread-safe.
//
// Call order, as stated by the method comments below: optionally
// SetTabletToReplace(), then Start(), then FetchAll(), then Finish().
// Abort() discards an in-progress copy and is a no-op after Finish().
class TabletCopyClient {
public:
// Construct the tablet copy client.
//
// The arguments are stored in the members of the corresponding names
// declared at the bottom of this class ('tablet_id_', 'fs_manager_',
// 'cmeta_manager_', 'messenger_', 'tablet_copy_metrics_').
//
// Objects behind raw pointers must remain valid until this object is destroyed.
TabletCopyClient(std::string tablet_id,
FsManager* fs_manager,
scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager,
std::shared_ptr<rpc::Messenger> messenger,
TabletCopyClientMetrics* tablet_copy_metrics);
// Attempt to clean up resources on the remote end by sending an
// EndTabletCopySession() RPC.
~TabletCopyClient();
// Pass in the existing metadata for a tombstoned tablet, which will be
// replaced if validation checks pass in Start().
// 'meta' is the metadata for the tombstoned tablet and 'caller_term' is the
// term provided by the caller (assumed to be the current leader of the
// consensus config) for validation purposes.
// If the consensus metadata exists on disk for this tablet, and if
// 'caller_term' is lower than the current term stored in that consensus
// metadata, then this method will fail with a Status::InvalidArgument error.
Status SetTabletToReplace(const scoped_refptr<tablet::TabletMetadata>& meta,
int64_t caller_term);
// Start up a tablet copy session to bootstrap from the specified
// bootstrap peer ('copy_source_addr'). Place a new superblock indicating
// that tablet copy is in progress. If the 'meta' pointer is passed as NULL,
// it is ignored; otherwise the TabletMetadata object resulting from the
// initial remote bootstrap response is returned through it.
//
// Upon success, tablet metadata will be created and the tablet will be
// assigned to a data directory group.
Status Start(const HostPort& copy_source_addr,
scoped_refptr<tablet::TabletMetadata>* meta);
// Runs a "full" tablet copy, copying the physical layout of a tablet
// from the leader of the specified consensus configuration.
// Adds all downloaded blocks during copying to the tablet copy's transaction,
// which will be closed together at Finish().
Status FetchAll(const scoped_refptr<tablet::TabletReplica>& tablet_replica);
// After downloading all files successfully, commit all downloaded blocks.
// Write out the completed replacement superblock.
// Must be called after Start() and FetchAll(). Must not be called after Abort().
Status Finish();
// Abort an in-progress transfer and immediately delete the data blocks and
// WALs downloaded so far. Does nothing if called after Finish().
Status Abort();
private:
// Grant the listed gtest test cases access to this class's private members.
FRIEND_TEST(TabletCopyClientTest, TestNoBlocksAtStart);
FRIEND_TEST(TabletCopyClientTest, TestBeginEndSession);
FRIEND_TEST(TabletCopyClientTest, TestDownloadBlock);
FRIEND_TEST(TabletCopyClientTest, TestLifeCycle);
FRIEND_TEST(TabletCopyClientTest, TestVerifyData);
FRIEND_TEST(TabletCopyClientTest, TestDownloadBlockMayFail);
FRIEND_TEST(TabletCopyClientTest, TestDownloadWalMayFail);
FRIEND_TEST(TabletCopyClientTest, TestDownloadWalSegment);
FRIEND_TEST(TabletCopyClientTest, TestDownloadAllBlocks);
FRIEND_TEST(TabletCopyClientAbortTest, TestAbort);
// State machine that guides the progression of a single tablet copy.
// A tablet copy will go through the states below; every state may also
// transition directly to kFinished:
//
//   kInitialized --> kStarting --> kStarted --> kFinished
//        |               |             |          ^^^
//        |               |             +----------+||
//        |               +-------------------------+|
//        +------------------------------------------+
enum State {
// The copy has yet to do anything and no changes have been made on disk.
kInitialized,
// The copy has begun updating metadata on disk, but has not begun
// receiving WAL segments or blocks from the tablet copy source. Being in
// this state or beyond implies that 'meta_' is non-null.
kStarting,
// The metadata has been updated on disk to indicate the start of a new
// copy. Blocks, metadata, and WAL segments may be received from the tablet
// copy source in this state.
kStarted,
// The copy is finished and needs no cleanup.
kFinished,
};
// Static helper taking the status of an RPC and the controller used for it.
// NOTE(review): presumably converts a remote error carried by 'controller'
// into a more specific Status -- not documented here; confirm in the .cc.
static Status UnwindRemoteError(const Status& status, const rpc::RpcController& controller);
// Returns an error if any directories in the tablet's directory group are
// unhealthy.
Status CheckHealthyDirGroup() const;
// Set a new status message on the TabletReplica.
// The string "TabletCopy: " will be prepended to each message.
void SetStatusMessage(const std::string& message);
// End the tablet copy session.
Status EndRemoteSession();
// Download all WAL files in parallel.
Status DownloadWALs();
// Download a single WAL file, identified by its segment sequence number.
// Assumes the WAL directories have already been created.
// WAL file is opened with options so that it will fsync() on close.
Status DownloadWAL(uint64_t wal_segment_seqno);
// Write out the Consensus Metadata file based on the ConsensusStatePB
// downloaded as part of initiating the tablet copy session.
Status WriteConsensusMetadata();
// Count the number of blocks on the remote (from 'remote_superblock_').
int CountRemoteBlocks() const;
// A task to download the blocks of the specified rowset.
// In case of a failure, a thread notifies others via 'end_status',
// so they can abort downloading of their rowsets' blocks.
// If all threads succeed, 'end_status' is set to Status::OK().
//
// NOTE(review): 'block_count' is declared as std::atomic<int>* here but as
// std::atomic<int32_t>* in the DownloadAndRewriteBlock*() methods below;
// consider unifying the spelling.
void DownloadRowset(const tablet::RowSetDataPB& src_rowset,
int num_remote_blocks,
std::atomic<int>* block_count,
Status* end_status);
// Download all blocks belonging to a tablet sequentially. Add all
// downloaded blocks to the tablet copy's transaction.
//
// Blocks are given new IDs upon creation. On success, 'superblock_'
// is populated to reflect the new block IDs.
//
// NOTE(review): "sequentially" looks at odds with the per-rowset
// DownloadRowset() task and 'tablet_download_pool_' declared in this class;
// confirm against the implementation and update this comment if needed.
Status DownloadBlocks();
// Download the remote block specified by 'src_block_id'. 'num_blocks' should
// be given as the total number of blocks there are to download (for logging
// purposes). Add the block to the tablet copy's transaction, to close blocks
// belonging to the transaction together when the copying is complete.
//
// On success:
// - 'dest_block_id' is set to the new ID of the downloaded block.
// - 'block_count' is incremented by 1.
Status DownloadAndRewriteBlock(const BlockIdPB& src_block_id,
int num_blocks,
std::atomic<int32_t>* block_count,
BlockIdPB* dest_block_id);
// Download and rewrite the remote block if 'end_status' has not already been
// set to a failure. This method calls DownloadAndRewriteBlock() and returns
// its status.
// This method is thread-safe.
//
// On failure, 'end_status' is set to the error status returned by
// DownloadAndRewriteBlock().
Status DownloadAndRewriteBlockIfEndStatusOK(const BlockIdPB& src_block_id,
int num_blocks,
std::atomic<int32_t>* block_count,
BlockIdPB* dest_block_id,
Status* end_status);
// Download a single block.
// Data block is opened with new ID. After downloading, the block is finalized
// and added to the tablet copy's transaction.
//
// On success, 'new_block_id' is set to the new ID of the downloaded block.
Status DownloadBlock(const BlockId& old_block_id,
BlockId* new_block_id);
// Download a single remote file. The block and WAL implementations delegate
// to this method when downloading files.
//
// An Appendable is typically a WritableBlock (block) or WritableFile (WAL).
//
// Only used in one compilation unit, otherwise the implementation would
// need to be in the header.
template<class Appendable>
Status DownloadFile(const DataIdPB& data_id, Appendable* appendable);
// Checks a received data chunk against the offset at which it was expected.
// NOTE(review): the exact checks (e.g. offset match, checksum) are not
// documented in this header -- confirm in the .cc before relying on them.
Status VerifyData(uint64_t offset, const DataChunkPB& chunk);
// Runs the provided functor, which must send an RPC and return the result
// status, until it succeeds, times out, or fails with a non-retriable error.
template<typename F>
Status SendRpcWithRetry(rpc::RpcController* controller, F f);
// Return standard log prefix.
std::string LogPrefix();
// Set-once members.
const std::string tablet_id_;
FsManager* const fs_manager_;
const scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager_;
const std::shared_ptr<rpc::Messenger> messenger_;
// State of the progress of the tablet copy operation.
State state_;
// Session-specific data items.
bool replace_tombstoned_tablet_;
// Local tablet metadata file.
scoped_refptr<tablet::TabletMetadata> meta_;
// Local Consensus metadata file. This may initially be NULL if this is
// bootstrapping a new replica (rather than replacing an old one) but it is
// guaranteed to be set after a successful call to Start().
scoped_refptr<consensus::ConsensusMetadata> cmeta_;
// Remaining per-session state.
// NOTE(review): these members carry no documentation in this header; their
// roles should be confirmed against the implementation rather than inferred
// from their names.
scoped_refptr<tablet::TabletReplica> tablet_replica_;
std::shared_ptr<TabletCopyServiceProxy> proxy_;
std::string session_id_;
uint64_t session_idle_timeout_millis_;
std::unique_ptr<tablet::TabletSuperBlockPB> remote_superblock_;
std::unique_ptr<tablet::TabletSuperBlockPB> superblock_;
std::unique_ptr<consensus::ConsensusStatePB> remote_cstate_;
std::vector<uint64_t> wal_seqnos_;
int64_t start_time_micros_;
Random rng_;
// Not owned; see the constructor comment on raw-pointer lifetimes.
TabletCopyClientMetrics* tablet_copy_metrics_;
// Block transaction for the tablet copy.
std::unique_ptr<fs::BlockCreationTransaction> transaction_;
// Thread pool for downloading all data blocks and wals in parallel.
std::unique_ptr<ThreadPool> tablet_download_pool_;
// Protects adding/creating blocks, adding a rowset,
// reading/updating rowset/wal download status.
simple_spinlock simple_lock_;
DISALLOW_COPY_AND_ASSIGN(TabletCopyClient);
};
} // namespace tserver
} // namespace kudu