blob: 1d7104cbe4e8c710f10843a81b3d138e2709156f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tserver/remote_bootstrap_service.h"
#include <algorithm>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <string>
#include <vector>
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/log.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/map-util.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/tserver/remote_bootstrap_session.h"
#include "kudu/tserver/tablet_peer_lookup.h"
#include "kudu/tablet/tablet_peer.h"
#include "kudu/util/crc.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
// Note, this macro assumes the existence of a local var named 'context'.
#define RPC_RETURN_APP_ERROR(app_err, message, s) \
do { \
SetupErrorAndRespond(context, app_err, message, s); \
return; \
} while (false)
#define RPC_RETURN_NOT_OK(expr, app_err, message) \
do { \
Status s = (expr); \
if (!s.ok()) { \
RPC_RETURN_APP_ERROR(app_err, message, s); \
} \
} while (false)
DEFINE_uint64(tablet_copy_idle_timeout_ms, 180000,
"Amount of time without activity before a tablet copy "
"session will expire, in millis");
TAG_FLAG(tablet_copy_idle_timeout_ms, hidden);
DEFINE_uint64(tablet_copy_timeout_poll_period_ms, 10000,
"How often the tablet_copy service polls for expired "
"tablet copy sessions, in millis");
TAG_FLAG(tablet_copy_timeout_poll_period_ms, hidden);
DEFINE_double(fault_crash_on_handle_tc_fetch_data, 0.0,
"Fraction of the time when the tablet will crash while "
"servicing a TabletCopyService FetchData() RPC call. "
"(For testing only!)");
TAG_FLAG(fault_crash_on_handle_tc_fetch_data, unsafe);
namespace kudu {
namespace tserver {
using crc::Crc32c;
using strings::Substitute;
using tablet::TabletPeer;
static void SetupErrorAndRespond(rpc::RpcContext* context,
TabletCopyErrorPB::Code code,
const string& message,
const Status& s) {
LOG(WARNING) << "Error handling TabletCopyService RPC request from "
<< context->requestor_string() << ": "
<< s.ToString();
TabletCopyErrorPB error;
StatusToPB(s, error.mutable_status());
error.set_code(code);
context->RespondApplicationError(TabletCopyErrorPB::tablet_copy_error_ext.number(),
message, error);
}
TabletCopyServiceImpl::TabletCopyServiceImpl(
FsManager* fs_manager,
TabletPeerLookupIf* tablet_peer_lookup,
const scoped_refptr<MetricEntity>& metric_entity,
const scoped_refptr<rpc::ResultTracker>& result_tracker)
: TabletCopyServiceIf(metric_entity, result_tracker),
fs_manager_(CHECK_NOTNULL(fs_manager)),
tablet_peer_lookup_(CHECK_NOTNULL(tablet_peer_lookup)),
shutdown_latch_(1) {
CHECK_OK(Thread::Create("tablet-copy", "tc-session-exp",
&TabletCopyServiceImpl::EndExpiredSessions, this,
&session_expiration_thread_));
}
void TabletCopyServiceImpl::BeginTabletCopySession(
const BeginTabletCopySessionRequestPB* req,
BeginTabletCopySessionResponsePB* resp,
rpc::RpcContext* context) {
const string& requestor_uuid = req->requestor_uuid();
const string& tablet_id = req->tablet_id();
// For now, we use the requestor_uuid with the tablet id as the session id,
// but there is no guarantee this will not change in the future.
const string session_id = Substitute("$0-$1", requestor_uuid, tablet_id);
scoped_refptr<TabletPeer> tablet_peer;
RPC_RETURN_NOT_OK(tablet_peer_lookup_->GetTabletPeer(tablet_id, &tablet_peer),
TabletCopyErrorPB::TABLET_NOT_FOUND,
Substitute("Unable to find specified tablet: $0", tablet_id));
scoped_refptr<TabletCopySession> session;
{
MutexLock l(sessions_lock_);
if (!FindCopy(sessions_, session_id, &session)) {
LOG(INFO) << Substitute(
"Beginning new tablet copy session on tablet $0 from peer $1"
" at $2: session id = $3",
tablet_id, requestor_uuid, context->requestor_string(), session_id);
session.reset(new TabletCopySession(tablet_peer, session_id,
requestor_uuid, fs_manager_));
RPC_RETURN_NOT_OK(session->Init(),
TabletCopyErrorPB::UNKNOWN_ERROR,
Substitute("Error initializing tablet copy session for tablet $0",
tablet_id));
InsertOrDie(&sessions_, session_id, session);
} else {
LOG(INFO) << Substitute(
"Re-sending initialization info for existing tablet copy session on tablet $0"
" from peer $1 at $2: session_id = $3",
tablet_id, requestor_uuid, context->requestor_string(), session_id);
}
ResetSessionExpirationUnlocked(session_id);
}
resp->set_responder_uuid(fs_manager_->uuid());
resp->set_session_id(session_id);
resp->set_session_idle_timeout_millis(FLAGS_tablet_copy_idle_timeout_ms);
resp->mutable_superblock()->CopyFrom(session->tablet_superblock());
resp->mutable_initial_committed_cstate()->CopyFrom(session->initial_committed_cstate());
for (const scoped_refptr<log::ReadableLogSegment>& segment : session->log_segments()) {
resp->add_wal_segment_seqnos(segment->header().sequence_number());
}
context->RespondSuccess();
}
void TabletCopyServiceImpl::CheckSessionActive(
const CheckTabletCopySessionActiveRequestPB* req,
CheckTabletCopySessionActiveResponsePB* resp,
rpc::RpcContext* context) {
const string& session_id = req->session_id();
// Look up and validate tablet copy session.
scoped_refptr<TabletCopySession> session;
MutexLock l(sessions_lock_);
TabletCopyErrorPB::Code app_error;
Status status = FindSessionUnlocked(session_id, &app_error, &session);
if (status.ok()) {
if (req->keepalive()) {
ResetSessionExpirationUnlocked(session_id);
}
resp->set_session_is_active(true);
context->RespondSuccess();
return;
} else if (app_error == TabletCopyErrorPB::NO_SESSION) {
resp->set_session_is_active(false);
context->RespondSuccess();
return;
} else {
RPC_RETURN_NOT_OK(status, app_error,
Substitute("Error trying to check whether session $0 is active", session_id));
}
}
void TabletCopyServiceImpl::FetchData(const FetchDataRequestPB* req,
FetchDataResponsePB* resp,
rpc::RpcContext* context) {
const string& session_id = req->session_id();
// Look up and validate tablet copy session.
scoped_refptr<TabletCopySession> session;
{
MutexLock l(sessions_lock_);
TabletCopyErrorPB::Code app_error;
RPC_RETURN_NOT_OK(FindSessionUnlocked(session_id, &app_error, &session),
app_error, "No such session");
ResetSessionExpirationUnlocked(session_id);
}
MAYBE_FAULT(FLAGS_fault_crash_on_handle_tc_fetch_data);
uint64_t offset = req->offset();
int64_t client_maxlen = req->max_length();
const DataIdPB& data_id = req->data_id();
TabletCopyErrorPB::Code error_code = TabletCopyErrorPB::UNKNOWN_ERROR;
RPC_RETURN_NOT_OK(ValidateFetchRequestDataId(data_id, &error_code, session),
error_code, "Invalid DataId");
DataChunkPB* data_chunk = resp->mutable_chunk();
string* data = data_chunk->mutable_data();
int64_t total_data_length = 0;
if (data_id.type() == DataIdPB::BLOCK) {
// Fetching a data block chunk.
const BlockId& block_id = BlockId::FromPB(data_id.block_id());
RPC_RETURN_NOT_OK(session->GetBlockPiece(block_id, offset, client_maxlen,
data, &total_data_length, &error_code),
error_code, "Unable to get piece of data block");
} else {
// Fetching a log segment chunk.
uint64_t segment_seqno = data_id.wal_segment_seqno();
RPC_RETURN_NOT_OK(session->GetLogSegmentPiece(segment_seqno, offset, client_maxlen,
data, &total_data_length, &error_code),
error_code, "Unable to get piece of log segment");
}
data_chunk->set_total_data_length(total_data_length);
data_chunk->set_offset(offset);
// Calculate checksum.
uint32_t crc32 = Crc32c(data->data(), data->length());
data_chunk->set_crc32(crc32);
context->RespondSuccess();
}
void TabletCopyServiceImpl::EndTabletCopySession(
const EndTabletCopySessionRequestPB* req,
EndTabletCopySessionResponsePB* resp,
rpc::RpcContext* context) {
{
MutexLock l(sessions_lock_);
TabletCopyErrorPB::Code app_error;
LOG(INFO) << "Request end of tablet copy session " << req->session_id()
<< " received from " << context->requestor_string();
RPC_RETURN_NOT_OK(DoEndTabletCopySessionUnlocked(req->session_id(), &app_error),
app_error, "No such session");
}
context->RespondSuccess();
}
void TabletCopyServiceImpl::Shutdown() {
shutdown_latch_.CountDown();
session_expiration_thread_->Join();
// Destroy all tablet copy sessions.
vector<string> session_ids;
for (const MonoTimeMap::value_type& entry : session_expirations_) {
session_ids.push_back(entry.first);
}
for (const string& session_id : session_ids) {
LOG(INFO) << "Destroying tablet copy session " << session_id << " due to service shutdown";
TabletCopyErrorPB::Code app_error;
CHECK_OK(DoEndTabletCopySessionUnlocked(session_id, &app_error));
}
}
Status TabletCopyServiceImpl::FindSessionUnlocked(
const string& session_id,
TabletCopyErrorPB::Code* app_error,
scoped_refptr<TabletCopySession>* session) const {
if (!FindCopy(sessions_, session_id, session)) {
*app_error = TabletCopyErrorPB::NO_SESSION;
return Status::NotFound(
Substitute("Tablet Copy session with Session ID \"$0\" not found", session_id));
}
return Status::OK();
}
Status TabletCopyServiceImpl::ValidateFetchRequestDataId(
const DataIdPB& data_id,
TabletCopyErrorPB::Code* app_error,
const scoped_refptr<TabletCopySession>& session) const {
if (PREDICT_FALSE(data_id.has_block_id() && data_id.has_wal_segment_seqno())) {
*app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
return Status::InvalidArgument(
Substitute("Only one of BlockId or segment sequence number are required, "
"but both were specified. DataTypeID: $0", data_id.ShortDebugString()));
} else if (PREDICT_FALSE(!data_id.has_block_id() && !data_id.has_wal_segment_seqno())) {
*app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
return Status::InvalidArgument(
Substitute("Only one of BlockId or segment sequence number are required, "
"but neither were specified. DataTypeID: $0", data_id.ShortDebugString()));
}
if (data_id.type() == DataIdPB::BLOCK) {
if (PREDICT_FALSE(!data_id.has_block_id())) {
return Status::InvalidArgument("block_id must be specified for type == BLOCK",
data_id.ShortDebugString());
}
} else {
if (PREDICT_FALSE(!data_id.wal_segment_seqno())) {
return Status::InvalidArgument(
"segment sequence number must be specified for type == LOG_SEGMENT",
data_id.ShortDebugString());
}
}
return Status::OK();
}
void TabletCopyServiceImpl::ResetSessionExpirationUnlocked(const std::string& session_id) {
MonoTime expiration(MonoTime::Now(MonoTime::FINE));
expiration.AddDelta(MonoDelta::FromMilliseconds(FLAGS_tablet_copy_idle_timeout_ms));
InsertOrUpdate(&session_expirations_, session_id, expiration);
}
Status TabletCopyServiceImpl::DoEndTabletCopySessionUnlocked(
const std::string& session_id,
TabletCopyErrorPB::Code* app_error) {
scoped_refptr<TabletCopySession> session;
RETURN_NOT_OK(FindSessionUnlocked(session_id, app_error, &session));
// Remove the session from the map.
// It will get destroyed once there are no outstanding refs.
LOG(INFO) << "Ending tablet copy session " << session_id << " on tablet "
<< session->tablet_id() << " with peer " << session->requestor_uuid();
CHECK_EQ(1, sessions_.erase(session_id));
CHECK_EQ(1, session_expirations_.erase(session_id));
return Status::OK();
}
void TabletCopyServiceImpl::EndExpiredSessions() {
do {
MutexLock l(sessions_lock_);
MonoTime now = MonoTime::Now(MonoTime::FINE);
vector<string> expired_session_ids;
for (const MonoTimeMap::value_type& entry : session_expirations_) {
const string& session_id = entry.first;
const MonoTime& expiration = entry.second;
if (expiration.ComesBefore(now)) {
expired_session_ids.push_back(session_id);
}
}
for (const string& session_id : expired_session_ids) {
LOG(INFO) << "Tablet Copy session " << session_id
<< " has expired. Terminating session.";
TabletCopyErrorPB::Code app_error;
CHECK_OK(DoEndTabletCopySessionUnlocked(session_id, &app_error));
}
} while (!shutdown_latch_.WaitFor(MonoDelta::FromMilliseconds(
FLAGS_tablet_copy_timeout_poll_period_ms)));
}
} // namespace tserver
} // namespace kudu