blob: 55ef98a3a7a187269f02deb86afa430485ce7813 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tserver/tablet_copy_service.h"
#include <cstdint>
#include <functional>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/log.pb.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/server/server_base.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/tablet_replica_lookup.h"
#include "kudu/util/crc.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/random_util.h"
#include "kudu/util/thread.h"
#define RPC_RETURN_NOT_OK(expr, app_err, message, context) \
do { \
const Status& s = (expr); \
if (PREDICT_FALSE(!s.ok())) { \
SetupErrorAndRespond(context, app_err, message, s); \
return; \
} \
} while (false)
DEFINE_uint64(tablet_copy_idle_timeout_sec, 600,
"Amount of time without activity before a tablet copy "
"session will expire, in seconds");
TAG_FLAG(tablet_copy_idle_timeout_sec, advanced);
TAG_FLAG(tablet_copy_idle_timeout_sec, evolving);
DEFINE_uint64(tablet_copy_timeout_poll_period_ms, 10000,
"How often the tablet_copy service polls for expired "
"tablet copy sessions, in millis");
TAG_FLAG(tablet_copy_timeout_poll_period_ms, hidden);
DEFINE_double(fault_crash_on_handle_tc_fetch_data, 0.0,
"Fraction of the time when the tablet will crash while "
"servicing a TabletCopyService FetchData() RPC call. "
"(For testing only!)");
TAG_FLAG(fault_crash_on_handle_tc_fetch_data, unsafe);
DEFINE_double(tablet_copy_early_session_timeout_prob, 0,
"The probability that a tablet copy session will time out early, "
"resulting in tablet copy failure. (For testing only!)");
TAG_FLAG(tablet_copy_early_session_timeout_prob, runtime);
TAG_FLAG(tablet_copy_early_session_timeout_prob, unsafe);
using std::string;
using std::vector;
using strings::Substitute;
namespace google {
namespace protobuf {
class Message;
}
}
namespace kudu {
using crc::Crc32c;
using server::ServerBase;
using pb_util::SecureShortDebugString;
using tablet::TabletReplica;
namespace tserver {
TabletCopyServiceImpl::TabletCopyServiceImpl(
ServerBase* server,
TabletReplicaLookupIf* tablet_replica_lookup)
: TabletCopyServiceIf(server->metric_entity(), server->result_tracker()),
server_(server),
fs_manager_(CHECK_NOTNULL(server->fs_manager())),
tablet_replica_lookup_(CHECK_NOTNULL(tablet_replica_lookup)),
rand_(GetRandomSeed32()),
shutdown_latch_(1),
tablet_copy_metrics_(server->metric_entity()) {
CHECK_OK(Thread::Create("tablet-copy", "tc-session-exp",
[this]() { this->EndExpiredSessions(); },
&session_expiration_thread_));
}
TabletCopyServiceImpl::SessionEntry::SessionEntry(scoped_refptr<TabletCopySourceSession> session_in)
: session(std::move(session_in)),
last_accessed_time(MonoTime::Now()),
expire_timeout(MonoDelta::FromSeconds(FLAGS_tablet_copy_idle_timeout_sec)) {
}
bool TabletCopyServiceImpl::AuthorizeServiceUser(const google::protobuf::Message* /*req*/,
google::protobuf::Message* /*resp*/,
rpc::RpcContext* rpc) {
return server_->Authorize(rpc, ServerBase::SUPER_USER | ServerBase::SERVICE_USER);
}
void TabletCopyServiceImpl::BeginTabletCopySession(
const BeginTabletCopySessionRequestPB* req,
BeginTabletCopySessionResponsePB* resp,
rpc::RpcContext* context) {
const string& requestor_uuid = req->requestor_uuid();
const string& tablet_id = req->tablet_id();
LOG_WITH_PREFIX(INFO) << Substitute(
"Received BeginTabletCopySession request for tablet $0 from peer $1 ($2)",
tablet_id, requestor_uuid, context->requestor_string());
// For now, we use the requestor_uuid with the tablet id as the session id,
// but there is no guarantee this will not change in the future.
const string session_id = Substitute("$0-$1", requestor_uuid, tablet_id);
scoped_refptr<TabletReplica> tablet_replica;
RPC_RETURN_NOT_OK(tablet_replica_lookup_->GetTabletReplica(tablet_id, &tablet_replica),
TabletCopyErrorPB::TABLET_NOT_FOUND,
Substitute("Unable to find specified tablet: $0", tablet_id),
context);
scoped_refptr<TabletCopySourceSession> session;
bool new_session;
{
MutexLock l(sessions_lock_);
const SessionEntry* session_entry = FindOrNull(sessions_, session_id);
new_session = session_entry == nullptr;
if (new_session) {
LOG_WITH_PREFIX(INFO) << Substitute(
"Beginning new tablet copy session on tablet $0 from peer $1"
" at $2: session id = $3",
tablet_id, requestor_uuid, context->requestor_string(), session_id);
session.reset(new TabletCopySourceSession(tablet_replica, session_id,
requestor_uuid, fs_manager_,
&tablet_copy_metrics_));
InsertOrDie(&sessions_, session_id, SessionEntry(session));
} else {
session = session_entry->session;
LOG_WITH_PREFIX(INFO) << Substitute(
"Re-sending initialization info for existing tablet copy session on tablet $0"
" from peer $1 at $2: session_id = $3",
tablet_id, requestor_uuid, context->requestor_string(), session_id);
}
ResetSessionExpirationUnlocked(session_id);
}
if (!new_session && !session->IsInitialized()) {
RPC_RETURN_NOT_OK(
Status::ServiceUnavailable(
Substitute("tablet copy session for tablet $0 is initializing", tablet_id)),
TabletCopyErrorPB::UNKNOWN_ERROR,
"try again later",
context);
}
// Ensure that the session initialization succeeds. If the initialization
// fails, then remove it from the sessions map.
Status status = session->Init();
if (!status.ok()) {
MutexLock l(sessions_lock_);
SessionEntry* session_entry = FindOrNull(sessions_, session_id);
// Identity check the session to ensure that other interleaved threads have
// not already removed the failed session, and replaced it with a new one.
if (session_entry && session == session_entry->session) {
sessions_.erase(session_id);
}
}
RPC_RETURN_NOT_OK(status,
TabletCopyErrorPB::UNKNOWN_ERROR,
Substitute("Error beginning tablet copy session for tablet $0", tablet_id),
context);
resp->set_responder_uuid(fs_manager_->uuid());
resp->set_session_id(session_id);
resp->set_session_idle_timeout_millis(FLAGS_tablet_copy_idle_timeout_sec * 1000);
resp->mutable_superblock()->CopyFrom(session->tablet_superblock());
resp->mutable_initial_cstate()->CopyFrom(session->initial_cstate());
for (const scoped_refptr<log::ReadableLogSegment>& segment : session->log_segments()) {
resp->add_wal_segment_seqnos(segment->header().sequence_number());
}
// For testing: Close the session prematurely if unsafe gflag is set but
// still respond as if it was opened.
const auto timeout_prob = FLAGS_tablet_copy_early_session_timeout_prob;
if (PREDICT_FALSE(timeout_prob > 0 && rand_.NextDoubleFraction() <= timeout_prob)) {
LOG_WITH_PREFIX(WARNING) << "Timing out tablet copy session due to flag "
<< "--tablet_copy_early_session_timeout_prob "
<< "being set to " << timeout_prob;
MutexLock l(sessions_lock_);
TabletCopyErrorPB::Code app_error;
WARN_NOT_OK(TabletCopyServiceImpl::DoEndTabletCopySessionUnlocked(session_id, &app_error),
Substitute("Unable to forcibly end tablet copy session $0", session_id));
}
context->RespondSuccess();
}
void TabletCopyServiceImpl::CheckSessionActive(
const CheckTabletCopySessionActiveRequestPB* req,
CheckTabletCopySessionActiveResponsePB* resp,
rpc::RpcContext* context) {
const string& session_id = req->session_id();
// Look up and validate tablet copy session.
scoped_refptr<TabletCopySourceSession> session;
MutexLock l(sessions_lock_);
TabletCopyErrorPB::Code app_error;
Status status = FindSessionUnlocked(session_id, &app_error, &session);
if (status.ok()) {
if (req->keepalive()) {
ResetSessionExpirationUnlocked(session_id);
}
resp->set_session_is_active(true);
context->RespondSuccess();
return;
} else if (app_error == TabletCopyErrorPB::NO_SESSION) {
resp->set_session_is_active(false);
context->RespondSuccess();
return;
} else {
RPC_RETURN_NOT_OK(status, app_error,
Substitute("Error trying to check whether session $0 is active", session_id),
context);
}
}
void TabletCopyServiceImpl::FetchData(const FetchDataRequestPB* req,
FetchDataResponsePB* resp,
rpc::RpcContext* context) {
const string& session_id = req->session_id();
// Look up and validate tablet copy session.
scoped_refptr<TabletCopySourceSession> session;
{
MutexLock l(sessions_lock_);
TabletCopyErrorPB::Code app_error;
RPC_RETURN_NOT_OK(FindSessionUnlocked(session_id, &app_error, &session),
app_error, "No such session", context);
ResetSessionExpirationUnlocked(session_id);
}
if (!session->IsInitialized()) {
RPC_RETURN_NOT_OK(
Status::ServiceUnavailable("tablet copy session for tablet $0 is initializing",
session->tablet_id()),
TabletCopyErrorPB::UNKNOWN_ERROR,
"try again later",
context);
}
MAYBE_FAULT(FLAGS_fault_crash_on_handle_tc_fetch_data);
uint64_t offset = req->offset();
int64_t client_maxlen = req->max_length();
const DataIdPB& data_id = req->data_id();
TabletCopyErrorPB::Code error_code = TabletCopyErrorPB::UNKNOWN_ERROR;
RPC_RETURN_NOT_OK(ValidateFetchRequestDataId(data_id, &error_code, session),
error_code, "Invalid DataId", context);
DataChunkPB* data_chunk = resp->mutable_chunk();
string* data = data_chunk->mutable_data();
int64_t total_data_length = 0;
if (data_id.type() == DataIdPB::BLOCK) {
// Fetching a data block chunk.
const BlockId& block_id = BlockId::FromPB(data_id.block_id());
RPC_RETURN_NOT_OK(session->GetBlockPiece(block_id, offset, client_maxlen,
data, &total_data_length, &error_code),
error_code, "Unable to get piece of data block", context);
} else {
// Fetching a log segment chunk.
uint64_t segment_seqno = data_id.wal_segment_seqno();
RPC_RETURN_NOT_OK(session->GetLogSegmentPiece(segment_seqno, offset, client_maxlen,
data, &total_data_length, &error_code),
error_code, "Unable to get piece of log segment", context);
}
data_chunk->set_total_data_length(total_data_length);
data_chunk->set_offset(offset);
tablet_copy_metrics_.bytes_sent->IncrementBy(resp->chunk().data().size());
// Calculate checksum.
uint32_t crc32 = Crc32c(data->data(), data->length());
data_chunk->set_crc32(crc32);
context->RespondSuccess();
}
void TabletCopyServiceImpl::EndTabletCopySession(
const EndTabletCopySessionRequestPB* req,
EndTabletCopySessionResponsePB* /*resp*/,
rpc::RpcContext* context) {
{
MutexLock l(sessions_lock_);
TabletCopyErrorPB::Code app_error;
LOG_WITH_PREFIX(INFO) << "Request end of tablet copy session " << req->session_id()
<< " received from " << context->requestor_string();
RPC_RETURN_NOT_OK(DoEndTabletCopySessionUnlocked(req->session_id(), &app_error),
app_error, "No such session", context);
}
context->RespondSuccess();
}
void TabletCopyServiceImpl::Shutdown() {
shutdown_latch_.CountDown();
session_expiration_thread_->Join();
// Destroy all tablet copy sessions.
MutexLock l(sessions_lock_);
auto iter = sessions_.cbegin();
while (iter != sessions_.cend()) {
const string& session_id = iter->first;
// Increment the iterator before erasing the corresponding session from the
// map in DoEndTabletCopySessionUnlocked().
++iter; // Don't use until next iteration of the loop.
LOG_WITH_PREFIX(INFO) << "Destroying tablet copy session " << session_id
<< " due to service shutdown";
TabletCopyErrorPB::Code app_error;
WARN_NOT_OK(DoEndTabletCopySessionUnlocked(session_id, &app_error),
"Unable to end tablet copy session during service shutdown");
}
}
Status TabletCopyServiceImpl::FindSessionUnlocked(
const string& session_id,
TabletCopyErrorPB::Code* app_error,
scoped_refptr<TabletCopySourceSession>* session) const {
const SessionEntry* session_entry = FindOrNull(sessions_, session_id);
if (!session_entry) {
*app_error = TabletCopyErrorPB::NO_SESSION;
return Status::NotFound(
Substitute("Tablet Copy session with Session ID \"$0\" not found", session_id));
}
*session = session_entry->session;
return Status::OK();
}
Status TabletCopyServiceImpl::ValidateFetchRequestDataId(
const DataIdPB& data_id,
TabletCopyErrorPB::Code* app_error,
const scoped_refptr<TabletCopySourceSession>& session) const {
if (PREDICT_FALSE(data_id.has_block_id() && data_id.has_wal_segment_seqno())) {
*app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
return Status::InvalidArgument(
Substitute("Only one of BlockId or segment sequence number are required, "
"but both were specified. DataTypeID: $0", SecureShortDebugString(data_id)));
} else if (PREDICT_FALSE(!data_id.has_block_id() && !data_id.has_wal_segment_seqno())) {
*app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
return Status::InvalidArgument(
Substitute("Only one of BlockId or segment sequence number are required, "
"but neither were specified. DataTypeID: $0", SecureShortDebugString(data_id)));
}
if (data_id.type() == DataIdPB::BLOCK) {
if (PREDICT_FALSE(!data_id.has_block_id())) {
return Status::InvalidArgument("block_id must be specified for type == BLOCK",
SecureShortDebugString(data_id));
}
} else {
if (PREDICT_FALSE(!data_id.wal_segment_seqno())) {
return Status::InvalidArgument(
"segment sequence number must be specified for type == LOG_SEGMENT",
SecureShortDebugString(data_id));
}
}
return Status::OK();
}
void TabletCopyServiceImpl::ResetSessionExpirationUnlocked(const std::string& session_id) {
SessionEntry* session_entry = FindOrNull(sessions_, session_id);
if (!session_entry) return;
session_entry->last_accessed_time = MonoTime::Now();
}
Status TabletCopyServiceImpl::DoEndTabletCopySessionUnlocked(
const std::string& session_id,
TabletCopyErrorPB::Code* app_error) {
sessions_lock_.AssertAcquired();
scoped_refptr<TabletCopySourceSession> session;
RETURN_NOT_OK(FindSessionUnlocked(session_id, app_error, &session));
// Remove the session from the map.
// It will get destroyed once there are no outstanding refs.
LOG_WITH_PREFIX(INFO) << "ending tablet copy session " << session_id << " on tablet "
<< session->tablet_id() << " with peer " << session->requestor_uuid();
CHECK_EQ(1, sessions_.erase(session_id));
return Status::OK();
}
void TabletCopyServiceImpl::EndExpiredSessions() {
do {
MutexLock l(sessions_lock_);
const MonoTime now = MonoTime::Now();
vector<SessionEntry> expired_session_entries;
for (const auto& entry : sessions_) {
const MonoTime& expiration = entry.second.last_accessed_time + entry.second.expire_timeout;
if (expiration < now) {
expired_session_entries.push_back(entry.second);
}
}
for (const auto& session_entry : expired_session_entries) {
auto idle_time = MonoTime::Now() - session_entry.last_accessed_time;
auto& session = session_entry.session;
LOG_WITH_PREFIX(INFO) << "tablet copy session " << session->session_id()
<< " on tablet " << session->tablet_id()
<< " with peer " << session->requestor_uuid()
<< " has expired after " << idle_time.ToString()
<< " of idle time";
TabletCopyErrorPB::Code app_error;
CHECK_OK(DoEndTabletCopySessionUnlocked(session->session_id(), &app_error));
}
} while (!shutdown_latch_.WaitFor(MonoDelta::FromMilliseconds(
FLAGS_tablet_copy_timeout_poll_period_ms)));
}
string TabletCopyServiceImpl::LogPrefix() const {
// We use a truncated form of the "T xxxx P yyyy" prefix here, with only the
// "P" part, because we don't want it to appear that tablet 'foo' is running
// when logging error messages like "Can't find tablet 'foo'".
return Substitute("P $0: ", fs_manager_->uuid());
}
void TabletCopyServiceImpl::SetupErrorAndRespond(
rpc::RpcContext* context,
TabletCopyErrorPB::Code code,
const string& message,
const Status& s) {
LOG_WITH_PREFIX(WARNING) << "Error handling TabletCopyService RPC request from "
<< context->requestor_string()
<< ": " << message << ": " << s.ToString();
TabletCopyErrorPB error;
StatusToPB(s, error.mutable_status());
error.set_code(code);
context->RespondApplicationError(TabletCopyErrorPB::tablet_copy_error_ext.number(),
message, error);
}
} // namespace tserver
} // namespace kudu