// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tserver/remote_bootstrap_service.h"

#include <algorithm>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <string>
#include <vector>

#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/log.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/map-util.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/tserver/remote_bootstrap_session.h"
#include "kudu/tserver/tablet_peer_lookup.h"
#include "kudu/tablet/tablet_peer.h"
#include "kudu/util/crc.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"

// Responds to the in-flight RPC with an application error and then returns
// from the enclosing function. It expands to a call to SetupErrorAndRespond()
// followed by a bare 'return', so it can only be used inside functions that
// return void.
//
// Note, this macro assumes the existence of a local var named 'context'.
#define RPC_RETURN_APP_ERROR(app_err, message, s) \
  do { \
    SetupErrorAndRespond(context, app_err, message, s); \
    return; \
  } while (false)

// Evaluates 'expr' once into a local Status named 's'. If that status is not
// OK, responds to the RPC through RPC_RETURN_APP_ERROR, which also returns
// from the enclosing function. 'app_err' and 'message' are only evaluated on
// the failure path, after 'expr' has run.
//
// Because it expands RPC_RETURN_APP_ERROR, this macro also relies on a local
// variable named 'context' being in scope.
#define RPC_RETURN_NOT_OK(expr, app_err, message) \
  do { \
    Status s = (expr); \
    if (!s.ok()) { \
      RPC_RETURN_APP_ERROR(app_err, message, s); \
    } \
  } while (false)

// Idle timeout applied to each session. It is added to the current time in
// ResetSessionExpirationUnlocked() and is also reported back to the client in
// the BeginTabletCopySession response.
DEFINE_uint64(tablet_copy_idle_timeout_ms, 180000,
              "Amount of time without activity before a tablet copy "
              "session will expire, in millis");
TAG_FLAG(tablet_copy_idle_timeout_ms, hidden);

// Wait between two passes of EndExpiredSessions(); the wait is cut short when
// the shutdown latch is counted down.
DEFINE_uint64(tablet_copy_timeout_poll_period_ms, 10000,
              "How often the tablet_copy service polls for expired "
              "tablet copy sessions, in millis");
TAG_FLAG(tablet_copy_timeout_poll_period_ms, hidden);

// Fault-injection knob passed to MAYBE_FAULT() in FetchData(), right after the
// session lookup succeeds.
DEFINE_double(fault_crash_on_handle_rb_fetch_data, 0.0,
              "Fraction of the time when the tablet will crash while "
              "servicing a TabletCopyService FetchData() RPC call. "
              "(For testing only!)");
TAG_FLAG(fault_crash_on_handle_rb_fetch_data, unsafe);

namespace kudu {
namespace tserver {

using crc::Crc32c;
using strings::Substitute;
using tablet::TabletPeer;

// Logs the failed Status 's' together with the requestor's identity, then
// answers the RPC with an application error.
//
// The error payload is a TabletCopyErrorPB carrying 'code' and the protobuf
// form of 's'; 'message' is passed through to RespondApplicationError().
static void SetupErrorAndRespond(rpc::RpcContext* context,
                                 TabletCopyErrorPB::Code code,
                                 const string& message,
                                 const Status& s) {
  LOG(WARNING) << "Error handling TabletCopyService RPC request from "
               << context->requestor_string() << ": " << s.ToString();

  TabletCopyErrorPB error_pb;
  error_pb.set_code(code);
  StatusToPB(s, error_pb.mutable_status());

  const int error_ext_id = TabletCopyErrorPB::tablet_copy_error_ext.number();
  context->RespondApplicationError(error_ext_id, message, error_pb);
}

// Stores the filesystem manager and the tablet peer lookup interface, sets up
// the shutdown latch with a count of one, and starts the background thread
// that runs EndExpiredSessions().
//
// Both pointers are checked with CHECK_NOTNULL, and a failure to create the
// thread is checked with CHECK_OK.
TabletCopyServiceImpl::TabletCopyServiceImpl(
    FsManager* fs_manager,
    TabletPeerLookupIf* tablet_peer_lookup,
    const scoped_refptr<MetricEntity>& metric_entity,
    const scoped_refptr<rpc::ResultTracker>& result_tracker)
    : TabletCopyServiceIf(metric_entity, result_tracker),
      fs_manager_(CHECK_NOTNULL(fs_manager)),
      tablet_peer_lookup_(CHECK_NOTNULL(tablet_peer_lookup)),
      shutdown_latch_(1) {
  const Status thread_status = Thread::Create(
      "remote-bootstrap", "rb-session-exp",
      &TabletCopyServiceImpl::EndExpiredSessions, this,
      &session_expiration_thread_);
  CHECK_OK(thread_status);
}

void TabletCopyServiceImpl::BeginTabletCopySession(
        const BeginTabletCopySessionRequestPB* req,
        BeginTabletCopySessionResponsePB* resp,
        rpc::RpcContext* context) {
  const string& requestor_uuid = req->requestor_uuid();
  const string& tablet_id = req->tablet_id();

  // For now, we use the requestor_uuid with the tablet id as the session id,
  // but there is no guarantee this will not change in the future.
  const string session_id = Substitute("$0-$1", requestor_uuid, tablet_id);

  scoped_refptr<TabletPeer> tablet_peer;
  RPC_RETURN_NOT_OK(tablet_peer_lookup_->GetTabletPeer(tablet_id, &tablet_peer),
                    TabletCopyErrorPB::TABLET_NOT_FOUND,
                    Substitute("Unable to find specified tablet: $0", tablet_id));

  scoped_refptr<TabletCopySession> session;
  {
    MutexLock l(sessions_lock_);
    if (!FindCopy(sessions_, session_id, &session)) {
      LOG(INFO) << Substitute(
          "Beginning new tablet copy session on tablet $0 from peer $1"
          " at $2: session id = $3",
          tablet_id, requestor_uuid, context->requestor_string(), session_id);
      session.reset(new TabletCopySession(tablet_peer, session_id,
                                               requestor_uuid, fs_manager_));
      RPC_RETURN_NOT_OK(session->Init(),
                        TabletCopyErrorPB::UNKNOWN_ERROR,
                        Substitute("Error initializing tablet copy session for tablet $0",
                                   tablet_id));
      InsertOrDie(&sessions_, session_id, session);
    } else {
      LOG(INFO) << Substitute(
          "Re-sending initialization info for existing tablet copy session on tablet $0"
          " from peer $1 at $2: session_id = $3",
          tablet_id, requestor_uuid, context->requestor_string(), session_id);
    }
    ResetSessionExpirationUnlocked(session_id);
  }

  resp->set_responder_uuid(fs_manager_->uuid());
  resp->set_session_id(session_id);
  resp->set_session_idle_timeout_millis(FLAGS_tablet_copy_idle_timeout_ms);
  resp->mutable_superblock()->CopyFrom(session->tablet_superblock());
  resp->mutable_initial_committed_cstate()->CopyFrom(session->initial_committed_cstate());

  for (const scoped_refptr<log::ReadableLogSegment>& segment : session->log_segments()) {
    resp->add_wal_segment_seqnos(segment->header().sequence_number());
  }

  context->RespondSuccess();
}

void TabletCopyServiceImpl::CheckSessionActive(
        const CheckTabletCopySessionActiveRequestPB* req,
        CheckTabletCopySessionActiveResponsePB* resp,
        rpc::RpcContext* context) {
  const string& session_id = req->session_id();

  // Look up and validate tablet copy session.
  scoped_refptr<TabletCopySession> session;
  MutexLock l(sessions_lock_);
  TabletCopyErrorPB::Code app_error;
  Status status = FindSessionUnlocked(session_id, &app_error, &session);
  if (status.ok()) {
    if (req->keepalive()) {
      ResetSessionExpirationUnlocked(session_id);
    }
    resp->set_session_is_active(true);
    context->RespondSuccess();
    return;
  } else if (app_error == TabletCopyErrorPB::NO_SESSION) {
    resp->set_session_is_active(false);
    context->RespondSuccess();
    return;
  } else {
    RPC_RETURN_NOT_OK(status, app_error,
                      Substitute("Error trying to check whether session $0 is active", session_id));
  }
}

void TabletCopyServiceImpl::FetchData(const FetchDataRequestPB* req,
                                           FetchDataResponsePB* resp,
                                           rpc::RpcContext* context) {
  const string& session_id = req->session_id();

  // Look up and validate tablet copy session.
  scoped_refptr<TabletCopySession> session;
  {
    MutexLock l(sessions_lock_);
    TabletCopyErrorPB::Code app_error;
    RPC_RETURN_NOT_OK(FindSessionUnlocked(session_id, &app_error, &session),
                      app_error, "No such session");
    ResetSessionExpirationUnlocked(session_id);
  }

  MAYBE_FAULT(FLAGS_fault_crash_on_handle_rb_fetch_data);

  uint64_t offset = req->offset();
  int64_t client_maxlen = req->max_length();

  const DataIdPB& data_id = req->data_id();
  TabletCopyErrorPB::Code error_code = TabletCopyErrorPB::UNKNOWN_ERROR;
  RPC_RETURN_NOT_OK(ValidateFetchRequestDataId(data_id, &error_code, session),
                    error_code, "Invalid DataId");

  DataChunkPB* data_chunk = resp->mutable_chunk();
  string* data = data_chunk->mutable_data();
  int64_t total_data_length = 0;
  if (data_id.type() == DataIdPB::BLOCK) {
    // Fetching a data block chunk.
    const BlockId& block_id = BlockId::FromPB(data_id.block_id());
    RPC_RETURN_NOT_OK(session->GetBlockPiece(block_id, offset, client_maxlen,
                                             data, &total_data_length, &error_code),
                      error_code, "Unable to get piece of data block");
  } else {
    // Fetching a log segment chunk.
    uint64_t segment_seqno = data_id.wal_segment_seqno();
    RPC_RETURN_NOT_OK(session->GetLogSegmentPiece(segment_seqno, offset, client_maxlen,
                                                  data, &total_data_length, &error_code),
                      error_code, "Unable to get piece of log segment");
  }

  data_chunk->set_total_data_length(total_data_length);
  data_chunk->set_offset(offset);

  // Calculate checksum.
  uint32_t crc32 = Crc32c(data->data(), data->length());
  data_chunk->set_crc32(crc32);

  context->RespondSuccess();
}

void TabletCopyServiceImpl::EndTabletCopySession(
        const EndTabletCopySessionRequestPB* req,
        EndTabletCopySessionResponsePB* resp,
        rpc::RpcContext* context) {
  {
    MutexLock l(sessions_lock_);
    TabletCopyErrorPB::Code app_error;
    LOG(INFO) << "Request end of tablet copy session " << req->session_id()
      << " received from " << context->requestor_string();
    RPC_RETURN_NOT_OK(DoEndTabletCopySessionUnlocked(req->session_id(), &app_error),
                      app_error, "No such session");
  }
  context->RespondSuccess();
}

void TabletCopyServiceImpl::Shutdown() {
  shutdown_latch_.CountDown();
  session_expiration_thread_->Join();

  // Destroy all tablet copy sessions.
  vector<string> session_ids;
  for (const MonoTimeMap::value_type& entry : session_expirations_) {
    session_ids.push_back(entry.first);
  }
  for (const string& session_id : session_ids) {
    LOG(INFO) << "Destroying tablet copy session " << session_id << " due to service shutdown";
    TabletCopyErrorPB::Code app_error;
    CHECK_OK(DoEndTabletCopySessionUnlocked(session_id, &app_error));
  }
}

// Looks up 'session_id' in the session map.
//
// On a hit, copies the session reference into '*session' and returns OK;
// '*app_error' is left untouched. On a miss, sets '*app_error' to NO_SESSION
// and returns NotFound.
//
// The "Unlocked" suffix follows the file's convention: callers in this file
// invoke it while holding 'sessions_lock_'.
Status TabletCopyServiceImpl::FindSessionUnlocked(
        const string& session_id,
        TabletCopyErrorPB::Code* app_error,
        scoped_refptr<TabletCopySession>* session) const {
  if (FindCopy(sessions_, session_id, session)) {
    return Status::OK();
  }

  *app_error = TabletCopyErrorPB::NO_SESSION;
  return Status::NotFound(
      Substitute("Tablet Copy session with Session ID \"$0\" not found", session_id));
}

// Checks that 'data_id' is well formed for a FetchData request.
//
// A valid data id specifies exactly one of a block id or a WAL segment
// sequence number, and the one it specifies must match its declared type:
// a block id for type BLOCK, a sequence number for any other type.
//
// Returns OK if the data id is valid. Otherwise returns InvalidArgument and
// sets '*app_error' to INVALID_TABLET_COPY_REQUEST on every failure path, so
// callers can report a precise error code. 'session' is currently unused.
//
// The sequence number is checked for presence rather than for a non-zero
// value, matching how the block id is checked.
Status TabletCopyServiceImpl::ValidateFetchRequestDataId(
        const DataIdPB& data_id,
        TabletCopyErrorPB::Code* app_error,
        const scoped_refptr<TabletCopySession>& session) const {
  const bool has_block_id = data_id.has_block_id();
  const bool has_seqno = data_id.has_wal_segment_seqno();

  if (PREDICT_FALSE(has_block_id && has_seqno)) {
    *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
    return Status::InvalidArgument(
        Substitute("Only one of BlockId or segment sequence number are required, "
            "but both were specified. DataTypeID: $0", data_id.ShortDebugString()));
  } else if (PREDICT_FALSE(!has_block_id && !has_seqno)) {
    *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
    return Status::InvalidArgument(
        Substitute("Only one of BlockId or segment sequence number are required, "
            "but neither were specified. DataTypeID: $0", data_id.ShortDebugString()));
  }

  // Exactly one field is set at this point; make sure it is the one that the
  // declared type calls for.
  if (data_id.type() == DataIdPB::BLOCK) {
    if (PREDICT_FALSE(!has_block_id)) {
      *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
      return Status::InvalidArgument("block_id must be specified for type == BLOCK",
                                     data_id.ShortDebugString());
    }
  } else {
    if (PREDICT_FALSE(!has_seqno)) {
      *app_error = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
      return Status::InvalidArgument(
          "segment sequence number must be specified for type == LOG_SEGMENT",
          data_id.ShortDebugString());
    }
  }

  return Status::OK();
}

void TabletCopyServiceImpl::ResetSessionExpirationUnlocked(const std::string& session_id) {
  MonoTime expiration(MonoTime::Now(MonoTime::FINE));
  expiration.AddDelta(MonoDelta::FromMilliseconds(FLAGS_tablet_copy_idle_timeout_ms));
  InsertOrUpdate(&session_expirations_, session_id, expiration);
}

// Unregisters the session named 'session_id'.
//
// If the session is not found, returns the lookup's error status and leaves
// the failure code in '*app_error'. Otherwise logs the termination and
// erases the session from both the session map and the expiration map; each
// erase is CHECKed to have removed exactly one entry.
Status TabletCopyServiceImpl::DoEndTabletCopySessionUnlocked(
        const std::string& session_id,
        TabletCopyErrorPB::Code* app_error) {
  scoped_refptr<TabletCopySession> session;
  const Status lookup_status = FindSessionUnlocked(session_id, app_error, &session);
  if (!lookup_status.ok()) {
    return lookup_status;
  }

  // Dropping the map entry only releases the map's reference; the session
  // object is destroyed once no outstanding references remain.
  LOG(INFO) << "Ending tablet copy session " << session_id << " on tablet "
            << session->tablet_id() << " with peer " << session->requestor_uuid();
  CHECK_EQ(1, sessions_.erase(session_id));
  CHECK_EQ(1, session_expirations_.erase(session_id));

  return Status::OK();
}

void TabletCopyServiceImpl::EndExpiredSessions() {
  do {
    MutexLock l(sessions_lock_);
    MonoTime now = MonoTime::Now(MonoTime::FINE);

    vector<string> expired_session_ids;
    for (const MonoTimeMap::value_type& entry : session_expirations_) {
      const string& session_id = entry.first;
      const MonoTime& expiration = entry.second;
      if (expiration.ComesBefore(now)) {
        expired_session_ids.push_back(session_id);
      }
    }
    for (const string& session_id : expired_session_ids) {
      LOG(INFO) << "Tablet Copy session " << session_id
                << " has expired. Terminating session.";
      TabletCopyErrorPB::Code app_error;
      CHECK_OK(DoEndTabletCopySessionUnlocked(session_id, &app_error));
    }
  } while (!shutdown_latch_.WaitFor(MonoDelta::FromMilliseconds(
                                    FLAGS_tablet_copy_timeout_poll_period_ms)));
}

} // namespace tserver
} // namespace kudu
