| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| #include "kudu/tserver/tablet_copy_service.h" |
| |
| #include <cstdint> |
| #include <ostream> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/consensus/log.pb.h" |
| #include "kudu/consensus/log_util.h" |
| #include "kudu/consensus/metadata.pb.h" |
| #include "kudu/fs/block_id.h" |
| #include "kudu/fs/fs_manager.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/rpc/rpc_context.h" |
| #include "kudu/server/server_base.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/tserver/tablet_replica_lookup.h" |
| #include "kudu/util/crc.h" |
| #include "kudu/util/fault_injection.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/logging.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/random_util.h" |
| |
| #define RPC_RETURN_NOT_OK(expr, app_err, message, context) \ |
| do { \ |
| const Status& s = (expr); \ |
| if (PREDICT_FALSE(!s.ok())) { \ |
| SetupErrorAndRespond(context, app_err, message, s); \ |
| return; \ |
| } \ |
| } while (false) |
| |
| DEFINE_uint64(tablet_copy_idle_timeout_sec, 600, |
| "Amount of time without activity before a tablet copy " |
| "session will expire, in seconds"); |
| TAG_FLAG(tablet_copy_idle_timeout_sec, advanced); |
| TAG_FLAG(tablet_copy_idle_timeout_sec, evolving); |
| |
| DEFINE_uint64(tablet_copy_timeout_poll_period_ms, 10000, |
| "How often the tablet_copy service polls for expired " |
| "tablet copy sessions, in millis"); |
| TAG_FLAG(tablet_copy_timeout_poll_period_ms, hidden); |
| |
| DEFINE_double(fault_crash_on_handle_tc_fetch_data, 0.0, |
| "Fraction of the time when the tablet will crash while " |
| "servicing a TabletCopyService FetchData() RPC call. " |
| "(For testing only!)"); |
| TAG_FLAG(fault_crash_on_handle_tc_fetch_data, unsafe); |
| |
| DEFINE_double(tablet_copy_early_session_timeout_prob, 0, |
| "The probability that a tablet copy session will time out early, " |
| "resulting in tablet copy failure. (For testing only!)"); |
| TAG_FLAG(tablet_copy_early_session_timeout_prob, runtime); |
| TAG_FLAG(tablet_copy_early_session_timeout_prob, unsafe); |
| |
| using std::string; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace google { |
| namespace protobuf { |
| class Message; |
| } |
| } |
| |
| namespace kudu { |
| |
| using crc::Crc32c; |
| using server::ServerBase; |
| using pb_util::SecureShortDebugString; |
| using tablet::TabletReplica; |
| |
| namespace tserver { |
| |
| TabletCopyServiceImpl::TabletCopyServiceImpl( |
| ServerBase* server, |
| TabletReplicaLookupIf* tablet_replica_lookup) |
| : TabletCopyServiceIf(server->metric_entity(), server->result_tracker()), |
| server_(server), |
| fs_manager_(CHECK_NOTNULL(server->fs_manager())), |
| tablet_replica_lookup_(CHECK_NOTNULL(tablet_replica_lookup)), |
| rand_(GetRandomSeed32()), |
| shutdown_latch_(1), |
| tablet_copy_metrics_(server->metric_entity()) { |
| CHECK_OK(Thread::Create("tablet-copy", "tc-session-exp", |
| &TabletCopyServiceImpl::EndExpiredSessions, this, |
| &session_expiration_thread_)); |
| } |
| |
| TabletCopyServiceImpl::SessionEntry::SessionEntry(scoped_refptr<TabletCopySourceSession> session_in) |
| : session(std::move(session_in)), |
| last_accessed_time(MonoTime::Now()), |
| expire_timeout(MonoDelta::FromSeconds(FLAGS_tablet_copy_idle_timeout_sec)) { |
| } |
| |
| bool TabletCopyServiceImpl::AuthorizeServiceUser(const google::protobuf::Message* /*req*/, |
| google::protobuf::Message* /*resp*/, |
| rpc::RpcContext* rpc) { |
| return server_->Authorize(rpc, ServerBase::SUPER_USER | ServerBase::SERVICE_USER); |
| } |
| |
| void TabletCopyServiceImpl::BeginTabletCopySession( |
| const BeginTabletCopySessionRequestPB* req, |
| BeginTabletCopySessionResponsePB* resp, |
| rpc::RpcContext* context) { |
| const string& requestor_uuid = req->requestor_uuid(); |
| const string& tablet_id = req->tablet_id(); |
| |
| LOG_WITH_PREFIX(INFO) << Substitute( |
| "Received BeginTabletCopySession request for tablet $0 from peer $1 ($2)", |
| tablet_id, requestor_uuid, context->requestor_string()); |
| |
| // For now, we use the requestor_uuid with the tablet id as the session id, |
| // but there is no guarantee this will not change in the future. |
| const string session_id = Substitute("$0-$1", requestor_uuid, tablet_id); |
| |
| scoped_refptr<TabletReplica> tablet_replica; |
| RPC_RETURN_NOT_OK(tablet_replica_lookup_->GetTabletReplica(tablet_id, &tablet_replica), |
| TabletCopyErrorPB::TABLET_NOT_FOUND, |
| Substitute("Unable to find specified tablet: $0", tablet_id), |
| context); |
| |
| scoped_refptr<TabletCopySourceSession> session; |
| bool new_session; |
| { |
| MutexLock l(sessions_lock_); |
| const SessionEntry* session_entry = FindOrNull(sessions_, session_id); |
| new_session = session_entry == nullptr; |
| if (new_session) { |
| LOG_WITH_PREFIX(INFO) << Substitute( |
| "Beginning new tablet copy session on tablet $0 from peer $1" |
| " at $2: session id = $3", |
| tablet_id, requestor_uuid, context->requestor_string(), session_id); |
| session.reset(new TabletCopySourceSession(tablet_replica, session_id, |
| requestor_uuid, fs_manager_, |
| &tablet_copy_metrics_)); |
| InsertOrDie(&sessions_, session_id, SessionEntry(session)); |
| } else { |
| session = session_entry->session; |
| LOG_WITH_PREFIX(INFO) << Substitute( |
| "Re-sending initialization info for existing tablet copy session on tablet $0" |
| " from peer $1 at $2: session_id = $3", |
| tablet_id, requestor_uuid, context->requestor_string(), session_id); |
| } |
| ResetSessionExpirationUnlocked(session_id); |
| } |
| |
| if (!new_session && !session->IsInitialized()) { |
| RPC_RETURN_NOT_OK( |
| Status::ServiceUnavailable( |
| Substitute("tablet copy session for tablet $0 is initializing", tablet_id)), |
| TabletCopyErrorPB::UNKNOWN_ERROR, |
| "try again later", |
| context); |
| } |
| |
| // Ensure that the session initialization succeeds. If the initialization |
| // fails, then remove it from the sessions map. |
| Status status = session->Init(); |
| if (!status.ok()) { |
| MutexLock l(sessions_lock_); |
| SessionEntry* session_entry = FindOrNull(sessions_, session_id); |
| // Identity check the session to ensure that other interleaved threads have |
| // not already removed the failed session, and replaced it with a new one. |
| if (session_entry && session == session_entry->session) { |
| sessions_.erase(session_id); |
| } |
| } |
| RPC_RETURN_NOT_OK(status, |
| TabletCopyErrorPB::UNKNOWN_ERROR, |
| Substitute("Error beginning tablet copy session for tablet $0", tablet_id), |
| context); |
| |
| resp->set_responder_uuid(fs_manager_->uuid()); |
| resp->set_session_id(session_id); |
| resp->set_session_idle_timeout_millis(FLAGS_tablet_copy_idle_timeout_sec * 1000); |
| resp->mutable_superblock()->CopyFrom(session->tablet_superblock()); |
| resp->mutable_initial_cstate()->CopyFrom(session->initial_cstate()); |
| |
| for (const scoped_refptr<log::ReadableLogSegment>& segment : session->log_segments()) { |
| resp->add_wal_segment_seqnos(segment->header().sequence_number()); |
| } |
| |
| // For testing: Close the session prematurely if unsafe gflag is set but |
| // still respond as if it was opened. |
| const auto timeout_prob = FLAGS_tablet_copy_early_session_timeout_prob; |
| if (PREDICT_FALSE(timeout_prob > 0 && rand_.NextDoubleFraction() <= timeout_prob)) { |
| LOG_WITH_PREFIX(WARNING) << "Timing out tablet copy session due to flag " |
| << "--tablet_copy_early_session_timeout_prob " |
| << "being set to " << timeout_prob; |
| MutexLock l(sessions_lock_); |
| TabletCopyErrorPB::Code app_error; |
| WARN_NOT_OK(TabletCopyServiceImpl::DoEndTabletCopySessionUnlocked(session_id, &app_error), |
| Substitute("Unable to forcibly end tablet copy session $0", session_id)); |
| } |
| |
| context->RespondSuccess(); |
| } |
| |
| void TabletCopyServiceImpl::CheckSessionActive( |
| const CheckTabletCopySessionActiveRequestPB* req, |
| CheckTabletCopySessionActiveResponsePB* resp, |
| rpc::RpcContext* context) { |
| const string& session_id = req->session_id(); |
| |
| // Look up and validate tablet copy session. |
| scoped_refptr<TabletCopySourceSession> session; |
| MutexLock l(sessions_lock_); |
| TabletCopyErrorPB::Code app_error; |
| Status status = FindSessionUnlocked(session_id, &app_error, &session); |
| if (status.ok()) { |
| if (req->keepalive()) { |
| ResetSessionExpirationUnlocked(session_id); |
| } |
| resp->set_session_is_active(true); |
| context->RespondSuccess(); |
| return; |
| } else if (app_error == TabletCopyErrorPB::NO_SESSION) { |
| resp->set_session_is_active(false); |
| context->RespondSuccess(); |
| return; |
| } else { |
| RPC_RETURN_NOT_OK(status, app_error, |
| Substitute("Error trying to check whether session $0 is active", session_id), |
| context); |
| } |
| } |
| |
| void TabletCopyServiceImpl::FetchData(const FetchDataRequestPB* req, |
| FetchDataResponsePB* resp, |
| rpc::RpcContext* context) { |
| const string& session_id = req->session_id(); |
| |
| // Look up and validate tablet copy session. |
| scoped_refptr<TabletCopySourceSession> session; |
| { |
| MutexLock l(sessions_lock_); |
| TabletCopyErrorPB::Code app_error; |
| RPC_RETURN_NOT_OK(FindSessionUnlocked(session_id, &app_error, &session), |
| app_error, "No such session", context); |
| ResetSessionExpirationUnlocked(session_id); |
| } |
| |
| if (!session->IsInitialized()) { |
| RPC_RETURN_NOT_OK( |
| Status::ServiceUnavailable("tablet copy session for tablet $0 is initializing", |
| session->tablet_id()), |
| TabletCopyErrorPB::UNKNOWN_ERROR, |
| "try again later", |
| context); |
| } |
| |
| MAYBE_FAULT(FLAGS_fault_crash_on_handle_tc_fetch_data); |
| |
| uint64_t offset = req->offset(); |
| int64_t client_maxlen = req->max_length(); |
| |
| const DataIdPB& data_id = req->data_id(); |
| TabletCopyErrorPB::Code error_code = TabletCopyErrorPB::UNKNOWN_ERROR; |
| RPC_RETURN_NOT_OK(ValidateFetchRequestDataId(data_id, &error_code, session), |
| error_code, "Invalid DataId", context); |
| |
| DataChunkPB* data_chunk = resp->mutable_chunk(); |
| string* data = data_chunk->mutable_data(); |
| int64_t total_data_length = 0; |
| if (data_id.type() == DataIdPB::BLOCK) { |
| // Fetching a data block chunk. |
| const BlockId& block_id = BlockId::FromPB(data_id.block_id()); |
| RPC_RETURN_NOT_OK(session->GetBlockPiece(block_id, offset, client_maxlen, |
| data, &total_data_length, &error_code), |
| error_code, "Unable to get piece of data block", context); |
| } else { |
| // Fetching a log segment chunk. |
| uint64_t segment_seqno = data_id.wal_segment_seqno(); |
| RPC_RETURN_NOT_OK(session->GetLogSegmentPiece(segment_seqno, offset, client_maxlen, |
| data, &total_data_length, &error_code), |
| error_code, "Unable to get piece of log segment", context); |
| } |
| |
| data_chunk->set_total_data_length(total_data_length); |
| data_chunk->set_offset(offset); |
| |
| tablet_copy_metrics_.bytes_sent->IncrementBy(resp->chunk().data().size()); |
| |
| // Calculate checksum. |
| uint32_t crc32 = Crc32c(data->data(), data->length()); |
| data_chunk->set_crc32(crc32); |
| |
| context->RespondSuccess(); |
| } |
| |
| void TabletCopyServiceImpl::EndTabletCopySession( |
| const EndTabletCopySessionRequestPB* req, |
| EndTabletCopySessionResponsePB* /*resp*/, |
| rpc::RpcContext* context) { |
| { |
| MutexLock l(sessions_lock_); |
| TabletCopyErrorPB::Code app_error; |
| LOG_WITH_PREFIX(INFO) << "Request end of tablet copy session " << req->session_id() |
| << " received from " << context->requestor_string(); |
| RPC_RETURN_NOT_OK(DoEndTabletCopySessionUnlocked(req->session_id(), &app_error), |
| app_error, "No such session", context); |
| } |
| context->RespondSuccess(); |
| } |
| |
| void TabletCopyServiceImpl::Shutdown() { |
| shutdown_latch_.CountDown(); |
| session_expiration_thread_->Join(); |
| |
| // Destroy all tablet copy sessions. |
| MutexLock l(sessions_lock_); |
| auto iter = sessions_.cbegin(); |
| while (iter != sessions_.cend()) { |
| const string& session_id = iter->first; |
| // Increment the iterator before erasing the corresponding session from the |
| // map in DoEndTabletCopySessionUnlocked(). |
| ++iter; // Don't use until next iteration of the loop. |
| LOG_WITH_PREFIX(INFO) << "Destroying tablet copy session " << session_id |
| << " due to service shutdown"; |
| TabletCopyErrorPB::Code app_error; |
| WARN_NOT_OK(DoEndTabletCopySessionUnlocked(session_id, &app_error), |
| "Unable to end tablet copy session during service shutdown"); |
| } |
| } |
| |
| Status TabletCopyServiceImpl::FindSessionUnlocked( |
| const string& session_id, |
| TabletCopyErrorPB::Code* app_error, |
| scoped_refptr<TabletCopySourceSession>* session) const { |
| const SessionEntry* session_entry = FindOrNull(sessions_, session_id); |
| if (!session_entry) { |
| *app_error = TabletCopyErrorPB::NO_SESSION; |
| return Status::NotFound( |
| Substitute("Tablet Copy session with Session ID \"$0\" not found", session_id)); |
| } |
| *session = session_entry->session; |
| return Status::OK(); |
| } |
| |
| Status TabletCopyServiceImpl::ValidateFetchRequestDataId( |
| const DataIdPB& data_id, |
| TabletCopyErrorPB::Code* app_error, |
| const scoped_refptr<TabletCopySourceSession>& session) const { |
| if (PREDICT_FALSE(data_id.has_block_id() && data_id.has_wal_segment_seqno())) { |
| *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST; |
| return Status::InvalidArgument( |
| Substitute("Only one of BlockId or segment sequence number are required, " |
| "but both were specified. DataTypeID: $0", SecureShortDebugString(data_id))); |
| } else if (PREDICT_FALSE(!data_id.has_block_id() && !data_id.has_wal_segment_seqno())) { |
| *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST; |
| return Status::InvalidArgument( |
| Substitute("Only one of BlockId or segment sequence number are required, " |
| "but neither were specified. DataTypeID: $0", SecureShortDebugString(data_id))); |
| } |
| |
| if (data_id.type() == DataIdPB::BLOCK) { |
| if (PREDICT_FALSE(!data_id.has_block_id())) { |
| return Status::InvalidArgument("block_id must be specified for type == BLOCK", |
| SecureShortDebugString(data_id)); |
| } |
| } else { |
| if (PREDICT_FALSE(!data_id.wal_segment_seqno())) { |
| return Status::InvalidArgument( |
| "segment sequence number must be specified for type == LOG_SEGMENT", |
| SecureShortDebugString(data_id)); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| void TabletCopyServiceImpl::ResetSessionExpirationUnlocked(const std::string& session_id) { |
| SessionEntry* session_entry = FindOrNull(sessions_, session_id); |
| if (!session_entry) return; |
| session_entry->last_accessed_time = MonoTime::Now(); |
| } |
| |
| Status TabletCopyServiceImpl::DoEndTabletCopySessionUnlocked( |
| const std::string& session_id, |
| TabletCopyErrorPB::Code* app_error) { |
| sessions_lock_.AssertAcquired(); |
| scoped_refptr<TabletCopySourceSession> session; |
| RETURN_NOT_OK(FindSessionUnlocked(session_id, app_error, &session)); |
| // Remove the session from the map. |
| // It will get destroyed once there are no outstanding refs. |
| LOG_WITH_PREFIX(INFO) << "ending tablet copy session " << session_id << " on tablet " |
| << session->tablet_id() << " with peer " << session->requestor_uuid(); |
| CHECK_EQ(1, sessions_.erase(session_id)); |
| return Status::OK(); |
| } |
| |
| void TabletCopyServiceImpl::EndExpiredSessions() { |
| do { |
| MutexLock l(sessions_lock_); |
| const MonoTime now = MonoTime::Now(); |
| |
| vector<SessionEntry> expired_session_entries; |
| for (const auto& entry : sessions_) { |
| const MonoTime& expiration = entry.second.last_accessed_time + entry.second.expire_timeout; |
| if (expiration < now) { |
| expired_session_entries.push_back(entry.second); |
| } |
| } |
| for (const auto& session_entry : expired_session_entries) { |
| auto idle_time = MonoTime::Now() - session_entry.last_accessed_time; |
| auto& session = session_entry.session; |
| LOG_WITH_PREFIX(INFO) << "tablet copy session " << session->session_id() |
| << " on tablet " << session->tablet_id() |
| << " with peer " << session->requestor_uuid() |
| << " has expired after " << idle_time.ToString() |
| << " of idle time"; |
| TabletCopyErrorPB::Code app_error; |
| CHECK_OK(DoEndTabletCopySessionUnlocked(session->session_id(), &app_error)); |
| } |
| } while (!shutdown_latch_.WaitFor(MonoDelta::FromMilliseconds( |
| FLAGS_tablet_copy_timeout_poll_period_ms))); |
| } |
| |
| string TabletCopyServiceImpl::LogPrefix() const { |
| // We use a truncated form of the "T xxxx P yyyy" prefix here, with only the |
| // "P" part, because we don't want it to appear that tablet 'foo' is running |
| // when logging error messages like "Can't find tablet 'foo'". |
| return Substitute("P $0: ", fs_manager_->uuid()); |
| } |
| |
| void TabletCopyServiceImpl::SetupErrorAndRespond( |
| rpc::RpcContext* context, |
| TabletCopyErrorPB::Code code, |
| const string& message, |
| const Status& s) { |
| LOG_WITH_PREFIX(WARNING) << "Error handling TabletCopyService RPC request from " |
| << context->requestor_string() |
| << ": " << message << ": " << s.ToString(); |
| TabletCopyErrorPB error; |
| StatusToPB(s, error.mutable_status()); |
| error.set_code(code); |
| context->RespondApplicationError(TabletCopyErrorPB::tablet_copy_error_ext.number(), |
| message, error); |
| } |
| |
| } // namespace tserver |
| } // namespace kudu |