// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/tserver/tablet_service.h"

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <optional>
#include <ostream>
#include <string>
#include <type_traits>
#include <unordered_set>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/port.h>

#include "kudu/clock/clock.h"
#include "kudu/common/column_predicate.h"
#include "kudu/common/columnar_serialization.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/encoded_key.h"
#include "kudu/common/iterator.h"
#include "kudu/common/iterator_stats.h"
#include "kudu/common/key_range.h"
#include "kudu/common/partition.h"
#include "kudu/common/row_operations.pb.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/rowblock_memory.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/types.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/consensus/replica_management.pb.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/inbound_call.h"
#include "kudu/rpc/remote_user.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/rpc_sidecar.h"
#include "kudu/rpc/rpc_verification_util.h"
#include "kudu/security/token.pb.h"
#include "kudu/security/token_verifier.h"
#include "kudu/server/server_base.h"
#include "kudu/tablet/compaction.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/ops/alter_schema_op.h"
#include "kudu/tablet/ops/op.h"
#include "kudu/tablet/ops/participant_op.h"
#include "kudu/tablet/ops/write_op.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_metrics.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tablet/txn_coordinator.h"
#include "kudu/transactions/transactions.pb.h"
#include "kudu/transactions/txn_status_manager.h"
#include "kudu/tserver/scanners.h"
#include "kudu/tserver/tablet_replica_lookup.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/tserver/tserver_service.pb.h"
#include "kudu/util/crc.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/faststring.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/process_memory.h"
#include "kudu/util/random_util.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/threadpool.h"
#include "kudu/util/trace.h"
#include "kudu/util/trace_metrics.h"

// Scan batch sizing. These flags are tagged 'runtime', i.e. they are marked
// as adjustable while the server is running.
DEFINE_int32(scanner_default_batch_size_bytes, 1024 * 1024,
             "The default size for batches of scan results");
TAG_FLAG(scanner_default_batch_size_bytes, advanced);
TAG_FLAG(scanner_default_batch_size_bytes, runtime);

DEFINE_int32(scanner_max_batch_size_bytes, 8 * 1024 * 1024,
             "The maximum batch size that a client may request for "
             "scan results.");
TAG_FLAG(scanner_max_batch_size_bytes, advanced);
TAG_FLAG(scanner_max_batch_size_bytes, runtime);

// The default value is sized to a power of 2 to improve BitmapCopy performance
// when copying a RowBlock (in ORDERED scans).
DEFINE_int32(scanner_batch_size_rows, 128,
             "The number of rows to batch for servicing scan requests.");
TAG_FLAG(scanner_batch_size_rows, advanced);
TAG_FLAG(scanner_batch_size_rows, runtime);

DEFINE_bool(scanner_allow_snapshot_scans_with_logical_timestamps, false,
            "If set, the server will support snapshot scans with logical timestamps.");
TAG_FLAG(scanner_allow_snapshot_scans_with_logical_timestamps, unsafe);

// Upper bound on how long a snapshot scan may wait, independent of the
// client-supplied RPC deadline.
DEFINE_int32(scanner_max_wait_ms, 1000,
             "The maximum amount of time (in milliseconds) a scan "
             "at a snapshot is allowed to wait for safe time to advance "
             "or pending write operations to apply, even if the deadline "
             "of the scan request itself allows for waiting longer.");
TAG_FLAG(scanner_max_wait_ms, advanced);
TAG_FLAG(scanner_max_wait_ms, runtime);

// Fault injection flags. These exist for tests and are tagged 'unsafe'.
DEFINE_int32(scanner_inject_latency_on_each_batch_ms, 0,
             "If set, the scanner will pause the specified number of milliseconds "
             "before reading each batch of data on the tablet server. "
             "Used for tests.");
TAG_FLAG(scanner_inject_latency_on_each_batch_ms, unsafe);

DEFINE_bool(scanner_inject_service_unavailable_on_continue_scan, false,
            "If set, the scanner will return a ServiceUnavailable Status on "
            "any Scan continuation RPC call. Used for tests.");
TAG_FLAG(scanner_inject_service_unavailable_on_continue_scan, unsafe);

DEFINE_bool(scanner_unregister_on_invalid_seq_id, true,
            "If set, an invalid sequence ID will cause a scanner to get unregistered. "
            "Used for tests.");
TAG_FLAG(scanner_unregister_on_invalid_seq_id, unsafe);


// Access control for client RPCs. Tagged 'runtime'.
DEFINE_bool(tserver_enforce_access_control, false,
            "If set, the server will apply fine-grained access control rules "
            "to client RPCs.");
TAG_FLAG(tserver_enforce_access_control, runtime);

// Test-only fault injection: VerifyAuthzTokenOrRespond() in this file passes
// this ratio to MaybeTrue() and, when it fires, rejects the authz token with
// an "INJECTED FAILURE" status.
DEFINE_double(tserver_inject_invalid_authz_token_ratio, 0.0,
              "Fraction of the time that authz token validation will fail. Used for tests.");
TAG_FLAG(tserver_inject_invalid_authz_token_ratio, hidden);

// Hidden toggle for transaction-aware handling of write operations.
DEFINE_bool(tserver_txn_write_op_handling_enabled, true,
            "Whether to enable appropriate handling of write operations "
            "in the context of multi-row transactions");
TAG_FLAG(tserver_txn_write_op_handling_enabled, hidden);

// Flags defined in other translation units, declared here for use in this file.
DECLARE_bool(enable_txn_system_client_init);
DECLARE_bool(raft_prepare_replacement_before_eviction);
DECLARE_int32(memory_limit_warn_threshold_percentage);
DECLARE_int32(tablet_history_max_age_sec);
DECLARE_uint32(txn_keepalive_interval_ms);

// Server-level counter of write requests that were rejected because the op
// apply queue was overloaded.
METRIC_DEFINE_counter(
    server,
    op_apply_queue_overload_rejections,
    "Number of Rejected Write Requests Due to Queue Overloaded Error",
    kudu::MetricUnit::kRequests,
    "Number of rejected write requests due to overloaded op apply queue",
    kudu::MetricLevel::kWarn);

using google::protobuf::RepeatedPtrField;
using kudu::consensus::BulkChangeConfigRequestPB;
using kudu::consensus::ChangeConfigRequestPB;
using kudu::consensus::ChangeConfigResponsePB;
using kudu::consensus::ConsensusRequestPB;
using kudu::consensus::ConsensusResponsePB;
using kudu::consensus::GetLastOpIdRequestPB;
using kudu::consensus::GetNodeInstanceRequestPB;
using kudu::consensus::GetNodeInstanceResponsePB;
using kudu::consensus::LeaderStepDownMode;
using kudu::consensus::LeaderStepDownRequestPB;
using kudu::consensus::LeaderStepDownResponsePB;
using kudu::consensus::OpId;
using kudu::consensus::RaftConsensus;
using kudu::consensus::RaftPeerPB;
using kudu::consensus::RunLeaderElectionRequestPB;
using kudu::consensus::RunLeaderElectionResponsePB;
using kudu::consensus::StartTabletCopyRequestPB;
using kudu::consensus::StartTabletCopyResponsePB;
using kudu::consensus::TimeManager;
using kudu::consensus::UnsafeChangeConfigRequestPB;
using kudu::consensus::UnsafeChangeConfigResponsePB;
using kudu::consensus::VoteRequestPB;
using kudu::consensus::VoteResponsePB;
using kudu::fault_injection::MaybeTrue;
using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::ErrorStatusPB;
using kudu::rpc::ParseTokenVerificationResult;
using kudu::rpc::RpcContext;
using kudu::rpc::RpcSidecar;
using kudu::security::TokenPB;
using kudu::security::TokenVerifier;
using kudu::server::ServerBase;
using kudu::tablet::AlterSchemaOpState;
using kudu::tablet::MvccSnapshot;
using kudu::tablet::OpCompletionCallback;
using kudu::tablet::ParticipantOpState;
using kudu::tablet::TABLET_DATA_COPYING;
using kudu::tablet::TABLET_DATA_DELETED;
using kudu::tablet::TABLET_DATA_TOMBSTONED;
using kudu::tablet::Tablet;
using kudu::tablet::TabletReplica;
using kudu::tablet::TabletStatePB;
using kudu::tablet::TxnMetadataPB;;
using kudu::tablet::WriteAuthorizationContext;
using kudu::tablet::WriteOpState;
using kudu::tablet::WritePrivilegeType;
using kudu::tablet::WritePrivileges;
using std::make_optional;
using std::nullopt;
using std::optional;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
using strings::Substitute;

namespace kudu {

// Metric names owned by the cfile module. Only extern declarations live here;
// the definitions are in another translation unit.
namespace cfile {
extern const char* CFILE_CACHE_MISS_BYTES_METRIC_NAME;
extern const char* CFILE_CACHE_HIT_BYTES_METRIC_NAME;
} // namespace cfile

namespace tserver {

// Name of the scanner "bytes read" metric. The pointer itself is deliberately
// not declared const: a namespace-scope const object would get internal
// linkage, and this symbol has external linkage (presumably so other files can
// refer to it -- verify before changing).
const char* SCANNER_BYTES_READ_METRIC_NAME = "scanner_bytes_read";

namespace {

// Looks up 'tablet_id' through 'tablet_manager', writing the result to
// 'replica'. Only existence is checked; the replica's state is not examined.
//
// On failure, fills in resp->mutable_error() and responds via 'context':
//  - a ServiceUnavailable lookup status (tablet manager not yet initialized)
//    is reported as UNKNOWN_ERROR, which asks the remote to check again soon;
//  - any other failure is reported as TABLET_NOT_FOUND.
//
// Returns true if the lookup succeeded.
template<class RespClass>
bool LookupTabletReplicaOrRespond(TabletReplicaLookupIf* tablet_manager,
                                  const string& tablet_id,
                                  RespClass* resp,
                                  RpcContext* context,
                                  scoped_refptr<TabletReplica>* replica) {
  const Status lookup_status = tablet_manager->GetTabletReplica(tablet_id, replica);
  if (PREDICT_TRUE(lookup_status.ok())) {
    return true;
  }
  const auto code = lookup_status.IsServiceUnavailable()
      ? TabletServerErrorPB::UNKNOWN_ERROR
      : TabletServerErrorPB::TABLET_NOT_FOUND;
  SetupErrorAndRespond(resp->mutable_error(), lookup_status, code, context);
  return false;
}

template<class RespClass>
void RespondTabletNotRunning(const scoped_refptr<TabletReplica>& replica,
                             TabletStatePB tablet_state,
                             RespClass* resp,
                             RpcContext* context) {
  Status s = Status::IllegalState("Tablet not RUNNING",
                                  tablet::TabletStatePB_Name(tablet_state));
  auto error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
  if (replica->tablet_metadata()->tablet_data_state() == TABLET_DATA_TOMBSTONED ||
      replica->tablet_metadata()->tablet_data_state() == TABLET_DATA_DELETED) {
    // Treat tombstoned tablets as if they don't exist for most purposes.
    // This takes precedence over failed, since we don't reset the failed
    // status of a TabletReplica when deleting it. Only tablet copy does that.
    error_code = TabletServerErrorPB::TABLET_NOT_FOUND;
  } else if (tablet_state == tablet::FAILED) {
    s = s.CloneAndAppend(replica->error().ToString());
    error_code = TabletServerErrorPB::TABLET_FAILED;
  }
  SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
}

// Check if the replica is running.
template<class RespClass>
bool CheckTabletReplicaRunningOrRespond(const scoped_refptr<TabletReplica>& replica,
                                        RespClass* resp,
                                        RpcContext* context) {
  // Check RUNNING state.
  TabletStatePB state = replica->state();
  if (PREDICT_FALSE(state != tablet::RUNNING)) {
    RespondTabletNotRunning(replica, state, resp, context);
    return false;
  }
  return true;
}

// Looks up 'tablet_id' and requires the replica to both exist and be RUNNING.
// On failure, responds to the RPC associated with 'context' after setting
// resp->mutable_error() to indicate the reason.
//
// Returns true if successful.
template<class RespClass>
bool LookupRunningTabletReplicaOrRespond(TabletReplicaLookupIf* tablet_manager,
                                         const string& tablet_id,
                                         RespClass* resp,
                                         RpcContext* context,
                                         scoped_refptr<TabletReplica>* replica) {
  // Short-circuit evaluation means the RUNNING check only runs after a
  // successful lookup, so at most one error response is sent.
  return LookupTabletReplicaOrRespond(tablet_manager, tablet_id, resp, context, replica) &&
         CheckTabletReplicaRunningOrRespond(*replica, resp, context);
}

// Verifies that the request's 'dest_uuid' names this server.
//
// A request with no destination UUID is tolerated for compatibility: the
// omission is logged (every 100th occurrence in release builds, DFATAL in
// debug builds) and true is returned. If a UUID is present but differs from
// the local permanent UUID, responds with WRONG_SERVER_UUID and returns false.
// 'method_name' prefixes the log and error messages.
template<class ReqClass, class RespClass>
bool CheckUuidMatchOrRespond(TabletReplicaLookupIf* tablet_manager,
                             const char* method_name,
                             const ReqClass* req,
                             RespClass* resp,
                             RpcContext* context) {
  const string& local_uuid = tablet_manager->NodeInstance().permanent_uuid();
  if (PREDICT_FALSE(!req->has_dest_uuid())) {
    // Maintain compat in release mode, but complain.
    const string complaint = Substitute("$0: Missing destination UUID in request from $1: $2",
                                        method_name, context->requestor_string(),
                                        SecureShortDebugString(*req));
#ifdef NDEBUG
    KLOG_EVERY_N(ERROR, 100) << complaint;
#else
    LOG(DFATAL) << complaint;
#endif
    return true;
  }
  const string& requested_uuid = req->dest_uuid();
  if (PREDICT_TRUE(requested_uuid == local_uuid)) {
    return true;
  }
  const Status mismatch = Status::InvalidArgument(
      Substitute("$0: Wrong destination UUID requested. "
                 "Local UUID: $1. Requested UUID: $2",
                 method_name, local_uuid, requested_uuid));
  LOG(WARNING) << mismatch.ToString() << ": from " << context->requestor_string()
               << ": " << SecureShortDebugString(*req);
  SetupErrorAndRespond(resp->mutable_error(), mismatch,
                       TabletServerErrorPB::WRONG_SERVER_UUID, context);
  return false;
}

template<class RespClass>
bool GetConsensusOrRespond(const scoped_refptr<TabletReplica>& replica,
                           RespClass* resp,
                           RpcContext* context,
                           shared_ptr<RaftConsensus>* consensus_out) {
  shared_ptr<RaftConsensus> tmp_consensus = replica->shared_consensus();
  if (!tmp_consensus) {
    Status s = Status::ServiceUnavailable("Raft Consensus unavailable",
                                          "Tablet replica not initialized");
    SetupErrorAndRespond(resp->mutable_error(), s,
                         TabletServerErrorPB::TABLET_NOT_RUNNING, context);
    return false;
  }
  *consensus_out = std::move(tmp_consensus);
  return true;
}

template<class RespClass>
bool CheckTabletServerNotQuiescingOrRespond(const TabletServer* server, RespClass* resp,
                                            RpcContext* context) {
  if (PREDICT_FALSE(server->quiescing())) {
    Status s = Status::ServiceUnavailable("Tablet server is quiescing");
    SetupErrorAndRespond(resp->mutable_error(), s,
                         TabletServerErrorPB::TABLET_NOT_RUNNING, context);
    return false;
  }
  return true;
}

// Stores the tablet of 'replica' in '*tablet' ('tablet' must be non-null).
// If the replica has no tablet, sets '*error_code' to TABLET_NOT_RUNNING and
// returns IllegalState; otherwise returns OK and leaves '*error_code' as is.
Status GetTabletRef(const scoped_refptr<TabletReplica>& replica,
                    shared_ptr<Tablet>* tablet,
                    TabletServerErrorPB::Code* error_code) {
  DCHECK(tablet != nullptr);
  *tablet = replica->shared_tablet();
  if (*tablet != nullptr) {
    return Status::OK();
  }
  *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
  return Status::IllegalState("Tablet is not running");
}

// Discards whatever has been written into 'resp' and responds via 'context'
// with status 's' under the UNKNOWN_ERROR code.
template <class RespType>
void HandleUnknownError(const Status& s, RespType* resp, RpcContext* context) {
  resp->Clear();
  // Passing resp->mutable_error() directly keeps this call dependent on
  // RespType, so SetupErrorAndRespond (declared later) is found at instantiation.
  SetupErrorAndRespond(resp->mutable_error(), s, TabletServerErrorPB::UNKNOWN_ERROR, context);
}

template <class ReqType, class RespType>
void HandleResponse(const ReqType* req, RespType* resp,
                    RpcContext* context, const Status& s) {
  if (PREDICT_FALSE(!s.ok())) {
    HandleUnknownError(s, resp, context);
    return;
  }
  context->RespondSuccess();
}

} // namespace

// Short alias for the per-tablet entry type of ListTabletsResponsePB.
using StatusAndSchemaPB = ListTabletsResponsePB::StatusAndSchemaPB;

// Populates 'required_column_privileges' with the column-level privileges
// required to perform the scan specified by 'scan_pb', consulting the column
// IDs found in 'schema'.
//
// Users of NewScanRequestPB (e.g. Scans and Checksums) require the following
// privileges:
//   if no projected columns (i.e. a "counting" scan) ||
//       projected columns has virtual column (e.g. "diff" scan):
//     SCAN ON TABLE || foreach (column): SCAN ON COLUMN
//   else:
//     if uses pk (e.g. ORDERED scan, or primary key fields set):
//       foreach(primary key column): SCAN ON COLUMN
//     foreach(projected column): SCAN ON COLUMN
//     foreach(predicated column): SCAN ON COLUMN
//
// Returns false if the request is malformed (e.g. unknown non-virtual column
// name), and sends an error response via 'context' if so. 'req_type' is used
// to add context in logs.
static bool GetScanPrivilegesOrRespond(const NewScanRequestPB& scan_pb, const Schema& schema,
                                       const string& req_type,
                                       unordered_set<ColumnId>* required_column_privileges,
                                       RpcContext* context) {
  const auto respond_not_authorized = [&] (const string& col_name) {
    LOG(WARNING) << Substitute("rejecting $0 request from $1: no column named '$2'",
                               req_type, context->requestor_string(), col_name);
    context->RespondRpcFailure(ErrorStatusPB::FATAL_UNAUTHORIZED,
        Status::NotAuthorized(Substitute("not authorized to $0", req_type)));
  };
  // If there is no projection (i.e. this is a "counting" scan), the user
  // needs full scan privileges on the table.
  if (scan_pb.projected_columns_size() == 0) {
    *required_column_privileges = unordered_set<ColumnId>(schema.column_ids().begin(),
                                                          schema.column_ids().end());
    return true;
  }
  unordered_set<ColumnId> required_privileges;
  // Determine the scan's projected key column IDs.
  for (int i = 0; i < scan_pb.projected_columns_size(); i++) {
    optional<ColumnSchema> projected_column;
    Status s = ColumnSchemaFromPB(scan_pb.projected_columns(i), &projected_column);
    if (PREDICT_FALSE(!s.ok())) {
      LOG(WARNING) << s.ToString();
      context->RespondRpcFailure(ErrorStatusPB::ERROR_INVALID_REQUEST, s);
      return false;
    }
    // A projection may contain virtual columns, which don't exist in the
    // tablet schema. If we were to search for a virtual column, we would
    // incorrectly get a "not found" error. To reconcile this with the fact
    // that we want to return an authorization error if the user has requested
    // a non-virtual column that doesn't exist, we require full scan privileges
    // for virtual columns.
    if (projected_column->type_info()->is_virtual()) {
      *required_column_privileges = unordered_set<ColumnId>(schema.column_ids().begin(),
                                                            schema.column_ids().end());
      return true;
    }
    int col_idx = schema.find_column(projected_column->name());
    if (col_idx == Schema::kColumnNotFound) {
      respond_not_authorized(scan_pb.projected_columns(i).name());
      return false;
    }
    EmplaceIfNotPresent(&required_privileges, schema.column_id(col_idx));
  }
  // Ordered scans and any scans that make use of the primary key require
  // privileges to scan across all primary key columns.
  if (scan_pb.order_mode() == ORDERED ||
      scan_pb.has_start_primary_key() ||
      scan_pb.has_stop_primary_key() ||
      scan_pb.has_last_primary_key()) {
    const auto& key_cols = schema.get_key_column_ids();
    required_privileges.insert(key_cols.begin(), key_cols.end());
  }
  // Determine the scan's predicate column IDs.
  for (int i = 0; i < scan_pb.column_predicates_size(); i++) {
    int col_idx = schema.find_column(scan_pb.column_predicates(i).column());
    if (col_idx == Schema::kColumnNotFound) {
      respond_not_authorized(scan_pb.column_predicates(i).column());
      return false;
    }
    EmplaceIfNotPresent(&required_privileges, schema.column_id(col_idx));
  }
  // Do the same for the DEPRECATED_range_predicates field. Even though this
  // field is deprecated, it is still exposed as a part of our public API and
  // thus needs to be taken into account.
  for (int i = 0; i < scan_pb.deprecated_range_predicates_size(); i++) {
    int col_idx = schema.find_column(scan_pb.deprecated_range_predicates(i).column().name());
    if (col_idx == Schema::kColumnNotFound) {
      respond_not_authorized(scan_pb.deprecated_range_predicates(i).column().name());
      return false;
    }
    EmplaceIfNotPresent(&required_privileges, schema.column_id(col_idx));
  }
  *required_column_privileges = std::move(required_privileges);
  return true;
}

// Checks the column-level privileges required to perform the scan specified by
// 'scan_pb' against the authorized column IDs listed in
// 'authorized_column_ids', consulting the column IDs found in 'schema'.
//
// Returns false if the scan isn't authorized and uses 'context' to send an
// error response. 'req_type' is used for logging'.
static bool CheckScanPrivilegesOrRespond(const NewScanRequestPB& scan_pb, const Schema& schema,
                                         const unordered_set<ColumnId>& authorized_column_ids,
                                         const string& req_type, RpcContext* context) {
  unordered_set<ColumnId> required_column_privileges;
  if (!GetScanPrivilegesOrRespond(scan_pb, schema, req_type,
                                  &required_column_privileges, context)) {
    return false;
  }
  for (const auto& required_col_id : required_column_privileges) {
    if (!ContainsKey(authorized_column_ids, required_col_id)) {
      LOG(WARNING) << Substitute("rejecting $0 request from $1: authz token doesn't "
                                 "authorize column ID $2", req_type, context->requestor_string(),
                                 required_col_id);
      context->RespondRpcFailure(ErrorStatusPB::FATAL_UNAUTHORIZED,
          Status::NotAuthorized(Substitute("not authorized to $0", req_type)));
      return false;
    }
  }
  return true;
}

// Returns true if 'privilege' was issued for 'table_id'. Otherwise logs the
// mismatch, responds via 'context' with ERROR_INVALID_AUTHORIZATION_TOKEN and
// returns false. 'req_type' is used for logging purposes.
static bool CheckMatchingTableIdOrRespond(const security::TablePrivilegePB& privilege,
                                          const string& table_id, const string& req_type,
                                          RpcContext* context) {
  const string& token_table_id = privilege.table_id();
  if (token_table_id == table_id) {
    return true;
  }
  LOG(WARNING) << Substitute("rejecting $0 request from $1: '$2', expected '$3'",
                             req_type, context->requestor_string(),
                             token_table_id, table_id);
  context->RespondRpcFailure(ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN,
      Status::NotAuthorized("authorization token is for the wrong table ID"));
  return false;
}

// Determines whether 'privilege' could permit any scan-like request.
//
// Collects into 'authorized_column_ids' (must be non-null and empty) the ID of
// every column for which 'privilege' grants a column-level scan privilege.
// Returns true if 'privilege' grants table-wide scan privileges or at least one
// column-level scan privilege. Otherwise no scan-like request can be allowed:
// logs, responds via 'context' with FATAL_UNAUTHORIZED and returns false.
// 'req_type' is used in the log and error messages.
static bool CheckMayHaveScanPrivilegesOrRespond(const security::TablePrivilegePB& privilege,
                                                const string& req_type,
                                                unordered_set<ColumnId>* authorized_column_ids,
                                                RpcContext* context) {
  DCHECK(authorized_column_ids);
  DCHECK(authorized_column_ids->empty());
  // Iterating an empty map is a no-op, so no size check is needed first.
  for (const auto& col_id_and_privilege : privilege.column_privileges()) {
    if (col_id_and_privilege.second.scan_privilege()) {
      EmplaceOrDie(authorized_column_ids, col_id_and_privilege.first);
    }
  }
  if (privilege.scan_privilege() || !authorized_column_ids->empty()) {
    return true;
  }
  LOG(WARNING) << Substitute("rejecting $0 request from $1: no column privileges",
                             req_type, context->requestor_string());
  context->RespondRpcFailure(ErrorStatusPB::FATAL_UNAUTHORIZED,
      Status::NotAuthorized(Substitute("not authorized to $0", req_type)));
  return false;
}

// Verifies the authorization token's correctness. Returns false and sends an
// appropriate response if the request's authz token is invalid.
template <class AuthorizableRequest>
static bool VerifyAuthzTokenOrRespond(const TokenVerifier& token_verifier,
                                      const AuthorizableRequest& req,
                                      RpcContext* context,
                                      TokenPB* token) {
  DCHECK(token);
  if (!req.has_authz_token()) {
    context->RespondRpcFailure(ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN,
        Status::NotAuthorized("no authorization token presented"));
    return false;
  }
  TokenPB token_pb;
  const auto result = token_verifier.VerifyTokenSignature(req.authz_token(), &token_pb);
  ErrorStatusPB::RpcErrorCodePB error;
  Status s = ParseTokenVerificationResult(result,
      ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN, &error);
  if (!s.ok()) {
    context->RespondRpcFailure(error, s.CloneAndPrepend("authz token verification failure"));
    return false;
  }
  if (!token_pb.has_authz() ||
      !token_pb.authz().has_table_privilege() ||
      token_pb.authz().username() != context->remote_user().username()) {
    context->RespondRpcFailure(ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN,
        Status::NotAuthorized("invalid authorization token presented"));
    return false;
  }
  if (MaybeTrue(FLAGS_tserver_inject_invalid_authz_token_ratio)) {
    context->RespondRpcFailure(ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN,
        Status::NotAuthorized("INJECTED FAILURE"));
    return false;
  }
  *token = std::move(token_pb);
  return true;
}

// Finishes the RPC associated with 'context' with status 's' and code 'code':
//  - NOT_AUTHORIZED becomes an RPC-level FATAL_UNAUTHORIZED failure, which
//    drops the connection; 'error' is not touched.
//  - A ServiceUnavailable status with code UNKNOWN_ERROR or THROTTLED becomes
//    an RPC-level ERROR_SERVER_TOO_BUSY failure so the client retries later;
//    'error' is not touched.
//  - Anything else is written into 'error' (status and code) and the response
//    is sent with RespondNoCache().
static void SetupErrorAndRespond(TabletServerErrorPB* error,
                                 const Status& s,
                                 TabletServerErrorPB::Code code,
                                 RpcContext* context) {
  switch (code) {
    case TabletServerErrorPB::NOT_AUTHORIZED:
      DCHECK(s.IsNotAuthorized());
      context->RespondRpcFailure(ErrorStatusPB::FATAL_UNAUTHORIZED, s);
      return;
    case TabletServerErrorPB::UNKNOWN_ERROR:
    case TabletServerErrorPB::THROTTLED:
      if (s.IsServiceUnavailable()) {
        context->RespondRpcFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, s);
        return;
      }
      break;
    default:
      break;
  }
  StatusToPB(s, error->mutable_status());
  error->set_code(code);
  context->RespondNoCache();
}

// Clears 'resp' and responds with 's'. If 'error_code' holds a value, that
// value is the tablet server error code; otherwise the error is reported as
// UNKNOWN_ERROR. 'req' is not used.
template <class ReqType, class RespType>
void HandleErrorResponse(const ReqType* req, RespType* resp, RpcContext* context,
                         const optional<TabletServerErrorPB::Code>& error_code,
                         const Status& s) {
  resp->Clear();
  if (!error_code.has_value()) {
    HandleUnknownError(s, resp, context);
    return;
  }
  SetupErrorAndRespond(resp->mutable_error(), s, *error_code, context);
}

// An op completion callback that finishes the RPC when the op completes.
// On success it responds with success. On failure it logs, then responds with
// the op's status and error code (status_ and code_, inherited from
// OpCompletionCallback), using the error field of 'response'.
template<class Response>
class RpcOpCompletionCallback : public OpCompletionCallback {
 public:
  // Keeps raw, non-owning pointers to 'context' and 'response'.
  RpcOpCompletionCallback(RpcContext* context, Response* response)
      : context_(context),
        response_(response) {}

  void OpCompleted() override {
    if (status_.ok()) {
      context_->RespondSuccess();
      return;
    }
    LOG(WARNING) << Substitute("failed op from $0: $1",
                               context_->requestor_string(), status_.ToString());
    SetupErrorAndRespond(get_error(), status_, code_, context_);
  }

 private:
  // The error field of the response being built.
  TabletServerErrorPB* get_error() {
    return response_->mutable_error();
  }

  rpc::RpcContext* context_;
  Response* response_;
};

// Completion callback for transactional writes. If the op failed with
// TXN_LOCKED_ABORT, it first runs 'abort_func' (a failure there is only
// logged), then responds exactly like RpcOpCompletionCallback.
class TxnWriteCompletionCallback : public RpcOpCompletionCallback<WriteResponsePB> {
 public:
  TxnWriteCompletionCallback(RpcContext* context, WriteResponsePB* response,
                             std::function<Status(void)> abort_func)
      : RpcOpCompletionCallback(context, response),
        abort_func_(std::move(abort_func)) {}

  void OpCompleted() override {
    const bool needs_abort = code_ == TabletServerErrorPB::TXN_LOCKED_ABORT;
    if (PREDICT_FALSE(needs_abort)) {
      WARN_NOT_OK(abort_func_(), "Error running txn abort callback");
    }
    RpcOpCompletionCallback<WriteResponsePB>::OpCompleted();
  }

 private:
  // Run only when the op failed with TXN_LOCKED_ABORT.
  std::function<Status(void)> abort_func_;
};

// Generic interface to handle scan results.
//
// This is a polymorphic base (it has pure virtual methods), so it declares a
// virtual destructor, as ResultSerializer does. Without one, destroying a
// derived collector through a ScanResultCollector* is undefined behavior.
class ScanResultCollector {
 public:
  virtual ~ScanResultCollector() = default;

  // Consumes one block of rows produced by 'scanner'.
  virtual void HandleRowBlock(Scanner* scanner,
                              const RowBlock& row_block) = 0;

  // Returns number of bytes which will be returned in the response.
  virtual int64_t ResponseSize() const = 0;

  // Return the number of rows actually returned to the client.
  virtual int64_t NumRowsReturned() const = 0;

  // Initialize the serializer with the given row format flags.
  //
  // This is a separate function instead of a constructor argument passed to specific
  // collector implementations because, currently, the collector is built before the
  // request is decoded and checked for 'row_format_flags'.
  //
  // Does nothing by default.
  virtual Status InitSerializer(uint64_t /* row_format_flags */,
                                const Schema& /* scanner_schema */,
                                const Schema& /* client_schema */) {
    return Status::OK();
  }

  // Mutable access to this collector's CPU time accounting.
  CpuTimes* cpu_times() {
    return &cpu_times_;
  }

 private:
  CpuTimes cpu_times_;
};

namespace {

// Given a RowBlock, set last_primary_key to the primary key of the last selected row
// in the RowBlock. If no row is selected, last_primary_key is not set.
void SetLastRow(const RowBlock& row_block, faststring* last_primary_key) {
  // Find the last selected row and save its encoded key.
  const SelectionVector* sel = row_block.selection_vector();
  if (sel->AnySelected()) {
    for (int i = sel->nrows() - 1; i >= 0; i--) {
      if (sel->IsRowSelected(i)) {
        RowBlockRow last_row = row_block.row(i);
        const Schema* schema = last_row.schema();
        schema->EncodeComparableKey(last_row, last_primary_key);
        break;
      }
    }
  }
}

// Abstract accumulator of scan results that are later written into a
// ScanResponsePB by SetupResponse().
class ResultSerializer {
 public:
  virtual ~ResultSerializer() = default;

  // Appends the selected rows of 'row_block' to the pending response and
  // returns the number of rows appended. 'client_projection_schema' is the
  // projection requested by the client; implementations may ignore it.
  virtual int SerializeRowBlock(const RowBlock& row_block,
                                const Schema* client_projection_schema) = 0;

  // Approximate number of bytes currently pending. When this exceeds the
  // client's requested batch size, the pending rows are sent back.
  virtual size_t ResponseSize() const = 0;

  // Moves the pending rows into 'resp' (attaching sidecars to 'context' as
  // needed). May be invoked at most once per serializer.
  virtual void SetupResponse(RpcContext* context, ScanResponsePB* resp) = 0;
};

// Serializes scan results in the row-wise wire format. Cell data for the
// selected rows is accumulated in 'rows_data_' and indirect data in
// 'indirect_data_'; SetupResponse() attaches both buffers to the RPC as
// sidecars and records their indices in the response.
class RowwiseResultSerializer : public ResultSerializer {
 public:
  // 'batch_size_bytes' is the batch size hint used to pre-size the buffers.
  // 'flags' are the client's row format flags; only
  // PAD_UNIX_TIME_MICROS_TO_16_BYTES is consulted here.
  RowwiseResultSerializer(int batch_size_bytes, uint64_t flags)
      : rows_data_(InitialBufferCapacity(batch_size_bytes)),
        indirect_data_(InitialBufferCapacity(batch_size_bytes)),
        pad_unixtime_micros_to_16_bytes_(flags & RowFormatFlags::PAD_UNIX_TIME_MICROS_TO_16_BYTES) {
    // TODO(todd): use a chain of faststrings instead of a single one to avoid
    // allocating this large buffer. Large buffer allocations are slow and
    // potentially wasteful.
  }

  int SerializeRowBlock(const RowBlock& row_block,
                        const Schema* client_projection_schema) override {
    // SetupResponse() moves the buffers out; appending afterwards would put
    // rows into moved-from buffers that never reach the client. Fail loudly,
    // matching ColumnarResultSerializer.
    CHECK(!done_);
    // TODO(todd) create some kind of serializer object that caches the projection
    // information to avoid recalculating it on every SerializeRowBlock call.
    int num_selected = kudu::SerializeRowBlock(
        row_block, client_projection_schema,
        &rows_data_, &indirect_data_, pad_unixtime_micros_to_16_bytes_);
    rowblock_pb_.set_num_rows(rowblock_pb_.num_rows() + num_selected);
    return num_selected;
  }

  size_t ResponseSize() const override {
    return rows_data_.size() + indirect_data_.size();
  }

  void SetupResponse(RpcContext* context, ScanResponsePB* resp) override {
    CHECK(!done_);
    done_ = true;

    *resp->mutable_data() = std::move(rowblock_pb_);
    // Add sidecar data to context and record the returned indices.
    int rows_idx;
    CHECK_OK(context->AddOutboundSidecar(
        RpcSidecar::FromFaststring(std::move(rows_data_)), &rows_idx));
    resp->mutable_data()->set_rows_sidecar(rows_idx);

    // Add indirect data as a sidecar, if applicable.
    if (indirect_data_.size() > 0) {
      int indirect_idx;
      CHECK_OK(context->AddOutboundSidecar(
          RpcSidecar::FromFaststring(std::move(indirect_data_)), &indirect_idx));
      resp->mutable_data()->set_indirect_data_sidecar(indirect_idx);
    }
  }

 private:
  // Initial capacity of each buffer: the batch size hint plus 10%. The
  // arithmetic is done in 64 bits so a large hint cannot overflow 'int'. The
  // result is clamped at zero so a negative hint cannot become an enormous
  // size_t capacity.
  static size_t InitialBufferCapacity(int batch_size_bytes) {
    const int64_t with_headroom = static_cast<int64_t>(batch_size_bytes) * 11 / 10;
    return static_cast<size_t>(std::max<int64_t>(with_headroom, 0));
  }

  RowwiseRowBlockPB rowblock_pb_;
  faststring rows_data_;
  faststring indirect_data_;
  bool pad_unixtime_micros_to_16_bytes_;
  bool done_ = false;
};

// Serializes scan results in the columnar wire format. Each projected column
// is sent as its own data sidecar, plus optional varlen-data and non-null
// bitmap sidecars.
class ColumnarResultSerializer : public ResultSerializer {
 public:
  // Creates a columnar serializer into '*serializer'. Returns InvalidArgument
  // if 'flags' contains any flag other than COLUMNAR_LAYOUT, since no other
  // row format flags are supported together with the columnar layout.
  static Status Create(uint64_t flags,
                       int batch_size_bytes,
                       const Schema& scanner_schema,
                       const Schema& client_schema,
                       unique_ptr<ResultSerializer>* serializer) {
    if (flags & ~RowFormatFlags::COLUMNAR_LAYOUT) {
      return Status::InvalidArgument("Row format flags not supported with columnar layout");
    }
    // The constructor is private, so allocate directly rather than via a
    // make_unique-style helper.
    serializer->reset(new ColumnarResultSerializer(
        scanner_schema, client_schema, batch_size_bytes));
    return Status::OK();
  }

  int SerializeRowBlock(const RowBlock& row_block,
                        const Schema* /* unused */) override {
    CHECK(!done_);
    int n_sel = results_.AddRowBlock(row_block);
    num_rows_ += n_sel;
    return n_sel;
  }

  // Sum of the sizes of every column's data, varlen data and non-null bitmap
  // buffered so far. Must not be called after SetupResponse().
  size_t ResponseSize() const override {
    CHECK(!done_);

    // Accumulate in size_t (the return type) rather than int, so a large
    // buffered batch cannot overflow the running total.
    size_t total = 0;
    for (const auto& col : results_.columns()) {
      total += col.data.size();
      if (col.varlen_data) {
        total += col.varlen_data->size();
      }
      if (col.non_null_bitmap) {
        total += col.non_null_bitmap->size();
      }
    }
    return total;
  }

  void SetupResponse(RpcContext* context, ScanResponsePB* resp) override {
    CHECK(!done_);
    done_ = true;
    ColumnarRowBlockPB* data = resp->mutable_columnar_data();
    auto cols = std::move(results_).TakeColumns();
    for (auto& col : cols) {
      auto* col_pb = data->add_columns();
      int sidecar_idx;
      CHECK_OK(context->AddOutboundSidecar(
          RpcSidecar::FromFaststring(std::move(col.data)), &sidecar_idx));
      col_pb->set_data_sidecar(sidecar_idx);

      if (col.varlen_data) {
        CHECK_OK(context->AddOutboundSidecar(
            RpcSidecar::FromFaststring(std::move(*col.varlen_data)), &sidecar_idx));
        col_pb->set_varlen_data_sidecar(sidecar_idx);
      }

      if (col.non_null_bitmap) {
        CHECK_OK(context->AddOutboundSidecar(
            RpcSidecar::FromFaststring(std::move(*col.non_null_bitmap)), &sidecar_idx));
        col_pb->set_non_null_bitmap_sidecar(sidecar_idx);
      }
    }
    data->set_num_rows(num_rows_);
  }

 private:
  ColumnarResultSerializer(const Schema& scanner_schema,
                           const Schema& client_schema,
                           int batch_size_bytes)
      : results_(scanner_schema, client_schema, batch_size_bytes) {
  }

  int64_t num_rows_ = 0;
  ColumnarSerializedBatch results_;
  bool done_ = false;
};

} // anonymous namespace

// Collector that copies scan results into the scan response via a
// ResultSerializer (row-wise or columnar, chosen in InitSerializer()).
//
// This is the ordinary case: a client runs a scan and wants the rows back.
// Other collectors may instead aggregate or compute statistics during a
// server-side scan and never return the row data itself.
class ScanResultCopier : public ScanResultCollector {
 public:
  explicit ScanResultCopier(int batch_size_bytes)
      : batch_size_bytes_(batch_size_bytes) {
  }

  void HandleRowBlock(Scanner* scanner, const RowBlock& row_block) override {
    const int selected = serializer_->SerializeRowBlock(
        row_block, scanner->client_projection_schema());
    if (selected <= 0) {
      return;
    }
    num_rows_returned_ += selected;
    scanner->add_num_rows_returned(selected);
    SetLastRow(row_block, &last_primary_key_);
  }

  // Bytes currently buffered for the response.
  int64_t ResponseSize() const override { return serializer_->ResponseSize(); }

  int64_t NumRowsReturned() const override { return num_rows_returned_; }

  Status InitSerializer(uint64_t row_format_flags,
                        const Schema& scanner_schema,
                        const Schema& client_schema) override {
    // TODO(todd) for the NewScanner case, this gets called twice
    // which is a bit ugly. Refactor to avoid!
    if (serializer_) {
      return Status::OK();
    }
    if ((row_format_flags & COLUMNAR_LAYOUT) == 0) {
      serializer_.reset(new RowwiseResultSerializer(batch_size_bytes_, row_format_flags));
      return Status::OK();
    }
    return ColumnarResultSerializer::Create(
        row_format_flags, batch_size_bytes_, scanner_schema, client_schema, &serializer_);
  }

  // Moves the buffered rows (if a serializer exists) into 'resp' and records
  // the last returned primary key. When every remaining row was filtered out
  // by the predicates, no key was captured and none is set.
  void SetupResponse(RpcContext* context, ScanResponsePB* resp) {
    if (serializer_ != nullptr) {
      serializer_->SetupResponse(context, resp);
    }
    if (last_primary_key_.length() != 0) {
      resp->set_last_primary_key(last_primary_key_.ToString());
    }
  }

 private:
  int batch_size_bytes_;
  int64_t num_rows_returned_ = 0;
  faststring last_primary_key_;
  unique_ptr<ResultSerializer> serializer_;

  DISALLOW_COPY_AND_ASSIGN(ScanResultCopier);
};

// Checksums the scan result.
class ScanResultChecksummer : public ScanResultCollector {
 public:
  ScanResultChecksummer()
      : crc_(crc::GetCrc32cInstance()),
        agg_checksum_(0),
        rows_checksummed_(0) {
  }

  virtual void HandleRowBlock(Scanner* scanner,
                              const RowBlock& row_block) OVERRIDE {

    const Schema* client_projection_schema = scanner->client_projection_schema();
    if (!client_projection_schema) {
      client_projection_schema = row_block.schema();
    }

    size_t nrows = row_block.nrows();
    for (size_t i = 0; i < nrows; i++) {
      if (!row_block.selection_vector()->IsRowSelected(i)) continue;
      uint32_t row_crc = CalcRowCrc32(*client_projection_schema, row_block.row(i));
      agg_checksum_ += row_crc;
      rows_checksummed_++;
    }
    // Find the last selected row and save its encoded key.
    SetLastRow(row_block, &encoded_last_row_);
  }

  // Returns a constant -- we only return checksum based on a time budget.
  virtual int64_t ResponseSize() const OVERRIDE { return sizeof(agg_checksum_); }

  virtual int64_t NumRowsReturned() const OVERRIDE {
    return 0;
  }

  int64_t rows_checksummed() const {
    return rows_checksummed_;
  }

  // Accessors for initializing / setting the checksum.
  void set_agg_checksum(uint64_t value) { agg_checksum_ = value; }
  uint64_t agg_checksum() const { return agg_checksum_; }

 private:
  // Calculates a CRC32C for the given row.
  uint32_t CalcRowCrc32(const Schema& projection, const RowBlockRow& row) {
    tmp_buf_.clear();

    for (size_t j = 0; j < projection.num_columns(); j++) {
      uint32_t col_index = static_cast<uint32_t>(j);  // For the CRC.
      tmp_buf_.append(&col_index, sizeof(col_index));
      ColumnBlockCell cell = row.cell(j);
      if (cell.is_nullable()) {
        uint8_t is_defined = cell.is_null() ? 0 : 1;
        tmp_buf_.append(&is_defined, sizeof(is_defined));
        if (!is_defined) continue;
      }
      if (cell.typeinfo()->physical_type() == BINARY) {
        const Slice* data = reinterpret_cast<const Slice *>(cell.ptr());
        tmp_buf_.append(data->data(), data->size());
      } else {
        tmp_buf_.append(cell.ptr(), cell.size());
      }
    }

    uint64_t row_crc = 0;
    crc_->Compute(tmp_buf_.data(), tmp_buf_.size(), &row_crc, nullptr);
    return static_cast<uint32_t>(row_crc); // CRC32 only uses the lower 32 bits.
  }


  faststring tmp_buf_;
  crc::Crc* const crc_;
  uint64_t agg_checksum_;
  int64_t rows_checksummed_;
  faststring encoded_last_row_;

  DISALLOW_COPY_AND_ASSIGN(ScanResultChecksummer);
};

// Returns the batch size hint (in bytes) for 'req'. If the client supplied a
// batch size, it is capped at --scanner_max_batch_size_bytes; otherwise
// --scanner_default_batch_size_bytes is used. The value is a threshold rather
// than a strict limit: a returned batch may exceed it slightly.
static size_t GetMaxBatchSizeBytesHint(const ScanRequestPB* req) {
  if (req->has_batch_size_bytes()) {
    const uint32_t server_max = implicit_cast<uint32_t>(FLAGS_scanner_max_batch_size_bytes);
    return std::min(req->batch_size_bytes(), server_max);
  }
  return FLAGS_scanner_default_batch_size_bytes;
}

TabletServiceImpl::TabletServiceImpl(TabletServer* server)
    : TabletServerServiceIf(server->metric_entity(), server->result_tracker()),
      server_(server),
      rng_(GetRandomSeed32()) {
  // Find (or create) the counter for op apply queue overload rejections in
  // the server's metric entity.
  num_op_apply_queue_rejections_ =
      server_->metric_entity()->FindOrCreateCounter(
          &METRIC_op_apply_queue_overload_rejections);
}

bool TabletServiceImpl::AuthorizeClientOrServiceUser(const google::protobuf::Message* /*req*/,
                                                     google::protobuf::Message* /*resp*/,
                                                     RpcContext* context) {
  // Super users, regular users and the service user are all accepted.
  const auto allowed_roles =
      ServerBase::SUPER_USER | ServerBase::USER | ServerBase::SERVICE_USER;
  return server_->Authorize(context, allowed_roles);
}

bool TabletServiceImpl::AuthorizeListTablets(const google::protobuf::Message* req,
                                             google::protobuf::Message* resp,
                                             RpcContext* context) {
  // With access control enforced, only super users may list tablets;
  // otherwise, fall back to the ordinary client authorization.
  return FLAGS_tserver_enforce_access_control
      ? server_->Authorize(context, ServerBase::SUPER_USER)
      : AuthorizeClient(req, resp, context);
}

bool TabletServiceImpl::AuthorizeClient(const google::protobuf::Message* /*req*/,
                                        google::protobuf::Message* /*resp*/,
                                        RpcContext* context) {
  // Super users and regular users are accepted.
  const auto allowed_roles = ServerBase::SUPER_USER | ServerBase::USER;
  return server_->Authorize(context, allowed_roles);
}

// Liveness probe: responds successfully without inspecting the request.
void TabletServiceImpl::Ping(const PingRequestPB* /*req*/,
                             PingResponsePB* /*resp*/,
                             RpcContext* context) {
  context->RespondSuccess();
}

// The admin service shares the server's metric entity and result tracker.
TabletServiceAdminImpl::TabletServiceAdminImpl(TabletServer* server)
    : TabletServerAdminServiceIf(server->metric_entity(), server->result_tracker()),
      server_(server) {
}

bool TabletServiceAdminImpl::AuthorizeServiceUser(const google::protobuf::Message* /*req*/,
                                                  google::protobuf::Message* /*resp*/,
                                                  RpcContext* context) {
  // Admin operations are limited to super users and the service user.
  const auto allowed_roles = ServerBase::SUPER_USER | ServerBase::SERVICE_USER;
  return server_->Authorize(context, allowed_roles);
}

void TabletServiceAdminImpl::AlterSchema(const AlterSchemaRequestPB* req,
                                         AlterSchemaResponsePB* resp,
                                         RpcContext* context) {
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "AlterSchema", req, resp, context)) {
    return;
  }
  DVLOG(3) << "Received Alter Schema RPC: " << SecureDebugString(*req);

  scoped_refptr<TabletReplica> replica;
  if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp,
                                           context, &replica)) {
    return;
  }

  uint32_t schema_version = replica->tablet_metadata()->schema_version();

  // If the schema was already applied, respond as succeded
  if (schema_version == req->schema_version()) {
    // Sanity check, to verify that the tablet should have the same schema
    // specified in the request.
    Schema req_schema;
    Status s = SchemaFromPB(req->schema(), &req_schema);
    if (!s.ok()) {
      SetupErrorAndRespond(resp->mutable_error(), s,
                           TabletServerErrorPB::INVALID_SCHEMA, context);
      return;
    }

    const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema();
    const Schema& tablet_schema = *tablet_schema_ptr;
    if (req_schema == tablet_schema) {
      context->RespondSuccess();
      return;
    }

    schema_version = replica->tablet_metadata()->schema_version();
    if (schema_version == req->schema_version()) {
      LOG(ERROR) << "The current schema does not match the request schema."
                 << " version=" << schema_version
                 << " current-schema=" << tablet_schema.ToString()
                 << " request-schema=" << req_schema.ToString()
                 << " (corruption)";
      SetupErrorAndRespond(resp->mutable_error(),
                           Status::Corruption("got a different schema for the same version number"),
                           TabletServerErrorPB::MISMATCHED_SCHEMA, context);
      return;
    }
  }

  // If the current schema is newer than the one in the request reject the request.
  if (schema_version > req->schema_version()) {
    SetupErrorAndRespond(resp->mutable_error(),
                         Status::InvalidArgument("Tablet has a newer schema"),
                         TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA, context);
    return;
  }

  unique_ptr<AlterSchemaOpState> op_state(
    new AlterSchemaOpState(replica.get(), req, resp));

  op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
      new RpcOpCompletionCallback<AlterSchemaResponsePB>(context, resp)));

  // Submit the alter schema op. The RPC will be responded to asynchronously.
  Status s = replica->SubmitAlterSchema(std::move(op_state));
  if (PREDICT_FALSE(!s.ok())) {
    SetupErrorAndRespond(resp->mutable_error(), s,
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
    return;
  }
}

namespace {
// Returns an error if 'op' is missing any required fields, or if it's of an
// unknown type.
//
// Every known op type requires a txn id; REGISTER_PARTICIPANT additionally
// requires a participant id.
Status ValidateCoordinatorOpFields(const CoordinatorOpPB& op) {
  const auto type = op.type();
  switch (type) {
    case CoordinatorOpPB::REGISTER_PARTICIPANT:
      if (!op.has_txn_participant_id()) {
        return Status::InvalidArgument(Substitute("Missing participant id: $0",
                                                  SecureShortDebugString(op)));
      }
      // A participant registration also needs a txn id; share that check.
      [[fallthrough]];
    case CoordinatorOpPB::BEGIN_TXN:
    case CoordinatorOpPB::BEGIN_COMMIT_TXN:
    case CoordinatorOpPB::ABORT_TXN:
    case CoordinatorOpPB::GET_TXN_STATUS:
    case CoordinatorOpPB::KEEP_TXN_ALIVE:
      if (!op.has_txn_id()) {
        return Status::InvalidArgument(Substitute("Missing txn id: $0",
                                                  SecureShortDebugString(op)));
      }
      return Status::OK();
    default:
      return Status::InvalidArgument(Substitute("Unknown op type: $0", type));
  }
  __builtin_unreachable();
}
} // anonymous namespace

void TabletServiceAdminImpl::CoordinateTransaction(const CoordinateTransactionRequestPB* req,
                                                   CoordinateTransactionResponsePB* resp,
                                                   RpcContext* context) {
  if (PREDICT_FALSE(!req->has_txn_status_tablet_id() ||
                    !req->has_op())) {
    Status s = Status::InvalidArgument(
        Substitute("Missing fields in request: $0", SecureShortDebugString(*req)));
    SetupErrorAndRespond(resp->mutable_error(), s,
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
    return;
  }
  const auto& op = req->op();
  Status s = ValidateCoordinatorOpFields(op);
  if (PREDICT_FALSE(!s.ok())) {
    SetupErrorAndRespond(resp->mutable_error(), s,
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
    return;
  }
  scoped_refptr<TabletReplica> replica;
  if (PREDICT_FALSE(!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(),
                                                         req->txn_status_tablet_id(),
                                                         resp, context, &replica))) {
    return;
  }
  tablet::TxnCoordinator* txn_coordinator = replica->txn_coordinator();
  if (PREDICT_FALSE(!txn_coordinator)) {
    Status s = Status::InvalidArgument(
        Substitute("Requested tablet is not a txn coordinator: $0", replica->tablet_id()));
    SetupErrorAndRespond(resp->mutable_error(), s,
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
    return;
  }
  // Catch any replication errors in this 'ts_error' so we can return an
  // appropriate error to the caller if need be.
  TabletServerErrorPB ts_error;
  transactions::TxnStatusEntryPB txn_status;
  const auto& user = op.user();
  const auto& txn_id = op.txn_id();
  int64_t highest_seen_txn_id = -1;
  {
    transactions::TxnStatusManager::ScopedLeaderSharedLock l(txn_coordinator);
    if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, context)) {
      return;
    }
    switch (op.type()) {
      case CoordinatorOpPB::BEGIN_TXN:
        s = txn_coordinator->BeginTransaction(
            txn_id, user, &highest_seen_txn_id, &ts_error);
        break;
      case CoordinatorOpPB::REGISTER_PARTICIPANT:
        s = txn_coordinator->RegisterParticipant(txn_id, op.txn_participant_id(), user, &ts_error);
        break;
      case CoordinatorOpPB::BEGIN_COMMIT_TXN:
        s = txn_coordinator->BeginCommitTransaction(txn_id, user, &ts_error);
        break;
      case CoordinatorOpPB::ABORT_TXN:
        s = txn_coordinator->AbortTransaction(txn_id, user, &ts_error);
        break;
      case CoordinatorOpPB::GET_TXN_STATUS:
        s = txn_coordinator->GetTransactionStatus(
            txn_id, user, &txn_status, &ts_error);
        break;
      case CoordinatorOpPB::KEEP_TXN_ALIVE:
        s = txn_coordinator->KeepTransactionAlive(txn_id, user, &ts_error);
        break;
      default:
        s = Status::InvalidArgument(Substitute("Unknown op type: $0", op.type()));
   }
  }
  if (ts_error.has_status() && ts_error.status().code() != AppStatusPB::OK) {
    *resp->mutable_error() = std::move(ts_error);
    context->RespondNoCache();
    return;
  }
  // From here on out, errors are considered application errors.
  if (PREDICT_FALSE(!s.ok())) {
    StatusToPB(s, resp->mutable_op_result()->mutable_op_error());
  } else if (op.type() == CoordinatorOpPB::GET_TXN_STATUS) {
    // Populate corresponding field in the response.
    *(resp->mutable_op_result()->mutable_txn_status()) = std::move(txn_status);
  } else if (op.type() == CoordinatorOpPB::BEGIN_TXN) {
    resp->mutable_op_result()->set_keepalive_millis(
        FLAGS_txn_keepalive_interval_ms);
  }
  if (op.type() == CoordinatorOpPB::BEGIN_TXN && !s.IsServiceUnavailable()) {
    DCHECK_GE(highest_seen_txn_id, 0);
    resp->mutable_op_result()->set_highest_seen_txn_id(highest_seen_txn_id);
  }
  context->RespondSuccess();
}

void TabletServiceAdminImpl::ParticipateInTransaction(const ParticipantRequestPB* req,
                                                      ParticipantResponsePB* resp,
                                                      RpcContext* context) {
  if (!req->has_op() || !req->op().has_type() || !req->op().has_txn_id() ||
      !req->has_tablet_id()) {
    Status s = Status::InvalidArgument(
        Substitute("Missing fields in request: $0", SecureShortDebugString(*req)));
    SetupErrorAndRespond(resp->mutable_error(), s,
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
    return;
  }
  scoped_refptr<TabletReplica> replica;
  if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp,
                                           context, &replica)) {
    return;
  }
  shared_ptr<Tablet> tablet;
  TabletServerErrorPB::Code error_code;
  Status s = GetTabletRef(replica, &tablet, &error_code);
  if (PREDICT_FALSE(!s.ok())) {
    SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
    return;
  }
  const auto& op = req->op();
  if (op.type() == ParticipantOpPB::GET_METADATA) {
    TxnMetadataPB pb;
    const auto* meta = tablet->metadata();
    if (meta->GetTxnMetadataPB(op.txn_id(), &pb)) {
      *resp->mutable_metadata() = std::move(pb);
      context->RespondSuccess();
      return;
    }
    SetupErrorAndRespond(
        resp->mutable_error(),
        Status::InvalidArgument(Substitute("txn ID $0 has no metadata", op.txn_id())),
        TabletServerErrorPB::UNKNOWN_ERROR,
        context);
    return;
  }

  // TODO(awong): consider memory-based throttling?
  // TODO(awong): we should also persist the transaction's owner, and prevent
  // other users from mutating it.
  unique_ptr<ParticipantOpState> op_state(
      new ParticipantOpState(replica.get(), tablet->txn_participant(), req, resp));
  op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
      new RpcOpCompletionCallback<ParticipantResponsePB>(context, resp)));
  s = replica->SubmitTxnParticipantOp(std::move(op_state));
  if (PREDICT_FALSE(!s.ok())) {
    SetupErrorAndRespond(resp->mutable_error(), s,
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
  }
}

// Returns true for the TabletServerFeatures values this service advertises.
bool TabletServiceAdminImpl::SupportsFeature(uint32_t feature) const {
  switch (feature) {
    // TODO(awong): once transactions are useable, add a feature flag.
    case TabletServerFeatures::COLUMN_PREDICATES:
    case TabletServerFeatures::PAD_UNIXTIME_MICROS_TO_16_BYTES:
    case TabletServerFeatures::QUIESCING:
    case TabletServerFeatures::BLOOM_FILTER_PREDICATE_V2:
      return true;
    default:
      break;
  }
  return false;
}

// Optionally sets the server's quiescing state (when 'req->quiesce' is
// present) and optionally reports leader/scanner counts (when
// 'req->return_stats' is set). Always reports the resulting quiescing state.
void TabletServiceAdminImpl::Quiesce(const QuiesceTabletServerRequestPB* req,
                                     QuiesceTabletServerResponsePB* resp,
                                     RpcContext* context) {
  if (req->has_quiesce()) {
    const bool quiesce = req->quiesce();
    *server_->mutable_quiescing() = quiesce;
    const char* state_desc = quiesce ? "quiescing" : "not quiescing";
    LOG(INFO) << Substitute("Tablet server $0 set to $1",
                            server_->fs_manager()->uuid(), state_desc);
  }

  if (req->return_stats()) {
    resp->set_num_leaders(server_->num_raft_leaders()->value());
    resp->set_num_active_scanners(server_->scanner_manager()->CountActiveScanners());
    LOG(INFO) << Substitute("Tablet server has $0 leaders and $1 scanners",
                            resp->num_leaders(), resp->num_active_scanners());
  }

  resp->set_is_quiescing(server_->quiescing());
  context->RespondSuccess();
}

// Creates a new tablet replica on this server as described by 'req'.
//
// Responds with INVALID_SCHEMA if the schema or partition schema cannot be
// decoded. If CreateNewTablet() fails, responds with TABLET_ALREADY_EXISTS
// when it reported AlreadyPresent and with UNKNOWN_ERROR otherwise.
void TabletServiceAdminImpl::CreateTablet(const CreateTabletRequestPB* req,
                                          CreateTabletResponsePB* resp,
                                          RpcContext* context) {
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "CreateTablet", req, resp, context)) {
    return;
  }
  TRACE_EVENT1("tserver", "CreateTablet",
               "tablet_id", req->tablet_id());

  Schema schema;
  Status s = SchemaFromPB(req->schema(), &schema);
  if (!s.ok()) {
    SetupErrorAndRespond(resp->mutable_error(),
                         Status::InvalidArgument("Invalid Schema."),
                         TabletServerErrorPB::INVALID_SCHEMA, context);
    return;
  }
  // Sanity-check the schema only after SchemaFromPB() succeeded. Checking it
  // before the error check could fire on a malformed request in debug builds
  // instead of responding with INVALID_SCHEMA.
  DCHECK(schema.has_column_ids());

  PartitionSchema partition_schema;
  s = PartitionSchema::FromPB(req->partition_schema(), schema, &partition_schema);
  if (!s.ok()) {
    SetupErrorAndRespond(resp->mutable_error(),
                         Status::InvalidArgument("Invalid PartitionSchema."),
                         TabletServerErrorPB::INVALID_SCHEMA, context);
    return;
  }

  Partition partition;
  Partition::FromPB(req->partition(), &partition);

  LOG(INFO) << Substitute("Processing CreateTablet for tablet $0 ($1table=$2 [id=$3]), "
                          "partition=$4", req->tablet_id(),
                          req->has_table_type() ? TableTypePB_Name(req->table_type()) + " ": "",
                          req->table_name(), req->table_id(),
                          partition_schema.PartitionDebugString(partition, schema));
  VLOG(1) << "Full request: " << SecureDebugString(*req);

  s = server_->tablet_manager()->CreateNewTablet(
      req->table_id(),
      req->tablet_id(),
      partition,
      req->table_name(),
      schema,
      partition_schema,
      req->config(),
      req->has_extra_config() ? make_optional(req->extra_config()) : nullopt,
      req->has_dimension_label() ? make_optional(req->dimension_label()) : nullopt,
      req->has_table_type() && req->table_type() != TableTypePB::DEFAULT_TABLE ?
          make_optional(req->table_type()) : nullopt,
      nullptr);
  if (PREDICT_FALSE(!s.ok())) {
    const TabletServerErrorPB::Code code = s.IsAlreadyPresent()
        ? TabletServerErrorPB::TABLET_ALREADY_EXISTS
        : TabletServerErrorPB::UNKNOWN_ERROR;
    SetupErrorAndRespond(resp->mutable_error(), s, code, context);
    return;
  }
  context->RespondSuccess();
}

void TabletServiceAdminImpl::DeleteTablet(const DeleteTabletRequestPB* req,
                                          DeleteTabletResponsePB* resp,
                                          RpcContext* context) {
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "DeleteTablet", req, resp, context)) {
    return;
  }
  TRACE_EVENT2("tserver", "DeleteTablet",
               "tablet_id", req->tablet_id(),
               "reason", req->reason());

  tablet::TabletDataState delete_type = tablet::TABLET_DATA_UNKNOWN;
  if (req->has_delete_type()) {
    delete_type = req->delete_type();
  }
  LOG(INFO) << "Processing DeleteTablet for tablet " << req->tablet_id()
            << " with delete_type " << TabletDataState_Name(delete_type)
            << (req->has_reason() ? (" (" + req->reason() + ")") : "")
            << " from " << context->requestor_string();
  VLOG(1) << "Full request: " << SecureDebugString(*req);

  optional<int64_t> cas_config_opid_index_less_or_equal;
  if (req->has_cas_config_opid_index_less_or_equal()) {
    cas_config_opid_index_less_or_equal = req->cas_config_opid_index_less_or_equal();
  }

  auto response_callback = [context, req, resp](const Status& s, TabletServerErrorPB::Code code) {
    if (PREDICT_FALSE(!s.ok())) {
      HandleErrorResponse(req, resp, context, code, s);
      return;
    }
    context->RespondSuccess();
  };
  server_->tablet_manager()->DeleteTabletAsync(req->tablet_id(),
                                               delete_type,
                                               cas_config_opid_index_less_or_equal,
                                               response_callback);
}

// Handles a Write RPC against a single tablet.
//
// The request passes through a sequence of admission checks, and the first
// failing check responds to the RPC and returns. The checks, in order:
// running-replica lookup, optional authz token and write-privilege checks,
// obtaining the tablet reference, per-tablet throttling, the process soft
// memory limit, clock support for the requested external consistency mode,
// and probabilistic rejection while the apply queue is overloaded.
//
// If every check passes, the write is submitted to the replica (as a
// transactional write when 'txn_id' is set and txn write handling is
// enabled). The RPC is then responded to asynchronously by the op's
// completion callback. The only exception is a failed submission, which is
// responded to at the end of this method.
void TabletServiceImpl::Write(const WriteRequestPB* req,
                              WriteResponsePB* resp,
                              RpcContext* context) {
  const auto& tablet_id = req->tablet_id();
  TRACE_EVENT1("tserver", "TabletServiceImpl::Write",
               "tablet_id", tablet_id);
  DVLOG(3) << "Received Write RPC: " << SecureDebugString(*req);
  scoped_refptr<TabletReplica> replica;
  if (!LookupRunningTabletReplicaOrRespond(
        server_->tablet_manager(), tablet_id, resp, context, &replica)) {
    return;
  }
  // Populated only when access control is enforced. It carries the caller's
  // table-level write privileges into the op state constructed below.
  optional<WriteAuthorizationContext> authz_context;
  if (FLAGS_tserver_enforce_access_control) {
    TokenPB token;
    if (!VerifyAuthzTokenOrRespond(server_->token_verifier(), *req, context, &token)) {
      return;
    }
    const auto& privilege = token.authz().table_privilege();
    if (!CheckMatchingTableIdOrRespond(privilege, replica->tablet_metadata()->table_id(),
                                       "Write", context)) {
      return;
    }
    // Translate the token's privilege bits into the set of permitted write types.
    WritePrivileges privileges;
    if (privilege.insert_privilege()) {
      InsertOrDie(&privileges, WritePrivilegeType::INSERT);
    }
    if (privilege.update_privilege()) {
      InsertOrDie(&privileges, WritePrivilegeType::UPDATE);
    }
    if (privilege.delete_privilege()) {
      InsertOrDie(&privileges, WritePrivilegeType::DELETE);
    }
    if (privileges.empty()) {
      // If we know there are no write-related privileges outright, we can
      // short-circuit further checking and reject the request immediately.
      // Otherwise, we'll defer the checking to the prepare phase of the
      // op after decoding the operations.
      LOG(WARNING) << Substitute("rejecting Write request from $0: no write privileges",
                                 context->requestor_string());
      context->RespondRpcFailure(ErrorStatusPB::FATAL_UNAUTHORIZED,
          Status::NotAuthorized("not authorized to write"));
      return;
    }
    // The requested op types are unknown until the operations are decoded, so
    // they start out empty here.
    authz_context = WriteAuthorizationContext{ privileges, /*requested_op_types=*/{} };
  }

  shared_ptr<Tablet> tablet;
  TabletServerErrorPB::Code error_code;
  Status s = GetTabletRef(replica, &tablet, &error_code);
  if (PREDICT_FALSE(!s.ok())) {
    SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
    return;
  }

  // Throttling is charged by the size of the encoded row data plus its
  // indirect (out-of-line) data.
  uint64_t bytes = req->row_operations().rows().size() +
      req->row_operations().indirect_data().size();
  if (!tablet->ShouldThrottleAllow(bytes)) {
    SetupErrorAndRespond(resp->mutable_error(),
                         Status::ServiceUnavailable("Rejecting Write request: throttled"),
                         TabletServerErrorPB::THROTTLED,
                         context);
    return;
  }

  // Check for memory pressure; don't bother doing any additional work if we've
  // exceeded the limit.
  double capacity_pct;
  if (process_memory::SoftLimitExceeded(&capacity_pct)) {
    tablet->metrics()->leader_memory_pressure_rejections->Increment();
    string msg = StringPrintf("Soft memory limit exceeded (at %.2f%% of capacity)", capacity_pct);
    // Log at WARNING only past the configured warning threshold; either way the
    // log is rate-limited to once per second.
    if (capacity_pct >= FLAGS_memory_limit_warn_threshold_percentage) {
      KLOG_EVERY_N_SECS(WARNING, 1) << "Rejecting Write request: " << msg << THROTTLE_MSG;
    } else {
      KLOG_EVERY_N_SECS(INFO, 1) << "Rejecting Write request: " << msg << THROTTLE_MSG;
    }
    SetupErrorAndRespond(resp->mutable_error(), Status::ServiceUnavailable(msg),
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
    return;
  }

  if (!server_->clock()->SupportsExternalConsistencyMode(req->external_consistency_mode())) {
    Status s = Status::NotSupported("The configured clock does not support the"
        " required consistency mode.");
    SetupErrorAndRespond(resp->mutable_error(), s,
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
    return;
  }

  // If the apply queue is overloaded, the write request might be rejected.
  // The longer the queue was in overloaded state, the higher the probability
  // of rejecting the request.
  MonoDelta queue_otime;
  MonoDelta threshold;
  if (server_->tablet_apply_pool()->QueueOverloaded(&queue_otime, &threshold)) {
    DCHECK(threshold.Initialized());
    DCHECK_GT(threshold.ToMilliseconds(), 0);
    auto overload_threshold_ms = threshold.ToMilliseconds();
    // The longer the queue has been in the overloaded state, the higher the
    // probability of an op to be rejected.
    // time_factor = floor(overloaded time / threshold) + 1. The request is
    // admitted with probability 1/(time_factor^2 + 1), i.e. it is rejected
    // unless rng_.OneIn() fires.
    auto time_factor = queue_otime.ToMilliseconds() / overload_threshold_ms + 1;
    if (!rng_.OneIn(time_factor * time_factor + 1)) {
      static const Status kStatus = Status::ServiceUnavailable(
          "op apply queue is overloaded");
      num_op_apply_queue_rejections_->Increment();
      return SetupErrorAndRespond(resp->mutable_error(),
                                  kStatus,
                                  TabletServerErrorPB::THROTTLED,
                                  context);
    }
  }

  // Only pass the request id along when results are tracked for this RPC.
  unique_ptr<WriteOpState> op_state(new WriteOpState(
      replica.get(),
      req,
      context->AreResultsTracked() ? context->request_id() : nullptr,
      resp,
      std::move(authz_context)));

  // If the client sent us a timestamp, decode it and update the clock so that all future
  // timestamps are greater than the passed timestamp.
  // When no timestamp was propagated, 's' still holds the OK status left by
  // GetTabletRef() above, so the check below passes.
  if (req->has_propagated_timestamp()) {
    Timestamp ts(req->propagated_timestamp());
    s = server_->clock()->Update(ts);
  }
  if (PREDICT_FALSE(!s.ok())) {
    return SetupErrorAndRespond(
        resp->mutable_error(), s, TabletServerErrorPB::UNKNOWN_ERROR, context);
  }

  const auto deadline = context->GetClientDeadline();
  const auto& username = context->remote_user().username();

  // Non-transactional path: also taken for txn writes when the
  // tserver_txn_write_op_handling_enabled flag is off.
  if (!req->has_txn_id() ||
      PREDICT_FALSE(!FLAGS_tserver_txn_write_op_handling_enabled)) {
    op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
        new RpcOpCompletionCallback<WriteResponsePB>(context, resp)));

    // Submit the write operation. The RPC will be responded asynchronously.
    s = replica->SubmitWrite(std::move(op_state));
  } else {
    if (!FLAGS_enable_txn_system_client_init) {
      return SetupErrorAndRespond(
          resp->mutable_error(),
          Status::NotSupported(Substitute("txns not supported on server $0",
                                          replica->permanent_uuid())),
          TabletServerErrorPB::UNKNOWN_ERROR, context);
    }
    // Handed to TxnWriteCompletionCallback; schedules an abort of this txn via
    // the tablet manager.
    // NOTE(review): 'username' is captured by reference and refers to data
    // owned by 'context'; presumably this is only invoked before the RPC is
    // responded to -- confirm.
    auto abort_func = [this, txn_id = req->txn_id(), &username] {
      return server_->tablet_manager()->ScheduleAbortTxn(txn_id, username);
    };
    op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
        new TxnWriteCompletionCallback(context, resp, std::move(abort_func))));

    // If it's a write operation in the context of a multi-row transaction,
    // schedule running preliminary tasks if necessary: register the tablet as
    // a participant in the transaction and begin transaction for the
    // participating tablet.
    //
    // This functor is to schedule preliminary tasks prior to submitting
    // the write operation via TabletReplica::SubmitWrite().
    // NOTE(review): the lambda is not 'mutable', so 'std::move(replica)' below
    // operates on a const capture and copies the reference rather than moving
    // it. 'username' is again captured by reference -- confirm lifetime.
    const auto scheduler = [this, &username, replica, deadline](
        int64_t txn_id, tablet::RegisteredTxnCallback began_txn_cb) {
      return server_->tablet_manager()->SchedulePreliminaryTasksForTxnWrite(
          std::move(replica), txn_id, username, deadline, std::move(began_txn_cb));
    };
    s = replica->SubmitTxnWrite(std::move(op_state), scheduler);
    VLOG(2) << Substitute("submitting txn write op: $0", s.ToString());
  }

  // Check that we could submit the write
  if (PREDICT_FALSE(!s.ok())) {
    return SetupErrorAndRespond(
        resp->mutable_error(), s, TabletServerErrorPB::UNKNOWN_ERROR, context);
  }
}

// Builds the consensus RPC service. The generated service base is registered
// against the server's metric entity and result tracker. 'tablet_manager' is
// used to resolve tablet replicas for incoming requests.
ConsensusServiceImpl::ConsensusServiceImpl(ServerBase* server,
                                           TabletReplicaLookupIf* tablet_manager)
    : ConsensusServiceIf(server->metric_entity(), server->result_tracker()),
      server_(server),
      tablet_manager_(tablet_manager) {}

// Nothing to release explicitly; members clean up after themselves.
ConsensusServiceImpl::~ConsensusServiceImpl() = default;

// Authorization hook for consensus RPCs. The request and response are not
// inspected; the caller is admitted only if ServerBase::Authorize() accepts it
// as either a super-user or the service user.
bool ConsensusServiceImpl::AuthorizeServiceUser(const google::protobuf::Message* /*req*/,
                                                google::protobuf::Message* /*resp*/,
                                                RpcContext* rpc) {
  const auto allowed_roles = ServerBase::SUPER_USER | ServerBase::SERVICE_USER;
  return server_->Authorize(rpc, allowed_roles);
}

void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req,
                                           ConsensusResponsePB* resp,
                                           RpcContext* context) {
  DVLOG(3) << "Received Consensus Update RPC: " << SecureDebugString(*req);
  if (!CheckUuidMatchOrRespond(tablet_manager_, "UpdateConsensus", req, resp, context)) {
    return;
  }
  scoped_refptr<TabletReplica> replica;
  if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
                                           &replica)) {
    return;
  }

  // Submit the update directly to the TabletReplica's RaftConsensus instance.
  shared_ptr<RaftConsensus> consensus;
  if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
  Status s = consensus->Update(req, resp);
  if (PREDICT_FALSE(!s.ok())) {
    // Clear the response first, since a partially-filled response could
    // result in confusing a caller, or in having missing required fields
    // in embedded optional messages.
    resp->Clear();

    SetupErrorAndRespond(resp->mutable_error(), s,
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
    return;
  }
  context->RespondSuccess();
}

void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req,
                                                VoteResponsePB* resp,
                                                RpcContext* context) {
  DVLOG(3) << "Received Consensus Request Vote RPC: " << SecureDebugString(*req);
  if (!CheckUuidMatchOrRespond(tablet_manager_, "RequestConsensusVote", req, resp, context)) {
    return;
  }

  // Because the last-logged opid is stored in the TabletMetadata we go through
  // the following dance:
  // 1. Get a reference to the currently-registered TabletReplica.
  // 2. Fetch (non-atomically) the current data state and last-logged opid from
  //    the TabletMetadata.
  // 3. If the data state is COPYING or TOMBSTONED, pass the last-logged opid
  // from the TabletMetadata into RaftConsensus::RequestVote().
  //
  // The reason this sequence is safe to do without atomic locks is the
  // RaftConsensus object associated with the TabletReplica will be Shutdown()
  // and thus unable to vote if another TabletCopy operation comes between
  // steps 1 and 3.
  //
  // TODO(mpercy): Move the last-logged opid into ConsensusMetadata to avoid
  // this hacky plumbing. An additional benefit would be that we would be able
  // to easily "tombstoned vote" while the tablet is bootstrapping.

  scoped_refptr<TabletReplica> replica;
  if (!LookupTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, &replica)) {
    return;
  }

  optional<OpId> last_logged_opid;
  tablet::TabletDataState data_state = replica->tablet_metadata()->tablet_data_state();

  LOG(INFO) << "Received RequestConsensusVote() RPC: " << SecureShortDebugString(*req);

  // We cannot vote while DELETED. This check is not racy because DELETED is a
  // terminal state; it is not possible to transition out of DELETED.
  if (data_state == TABLET_DATA_DELETED) {
    RespondTabletNotRunning(replica, replica->state(), resp, context);
    return;
  }

  // Attempt to vote while copying or tombstoned.
  if (data_state == TABLET_DATA_COPYING || data_state == TABLET_DATA_TOMBSTONED) {
    last_logged_opid = replica->tablet_metadata()->tombstone_last_logged_opid();
  }

  // Submit the vote request directly to the consensus instance.
  shared_ptr<RaftConsensus> consensus;
  if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;

  Status s = consensus->RequestVote(req,
                                    consensus::TabletVotingState(std::move(last_logged_opid),
                                                                 data_state),
                                    resp);
  if (PREDICT_FALSE(!s.ok())) {
    SetupErrorAndRespond(resp->mutable_error(), s,
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
    return;
  }
  context->RespondSuccess();
}

void ConsensusServiceImpl::ChangeConfig(const ChangeConfigRequestPB* req,
                                        ChangeConfigResponsePB* resp,
                                        RpcContext* context) {
  VLOG(1) << "Received ChangeConfig RPC: " << SecureDebugString(*req);
  if (!CheckUuidMatchOrRespond(tablet_manager_, "ChangeConfig", req, resp, context)) {
    return;
  }
  scoped_refptr<TabletReplica> replica;
  if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
                                           &replica)) {
    return;
  }

  shared_ptr<RaftConsensus> consensus;
  if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
  optional<TabletServerErrorPB::Code> error_code;
  Status s = consensus->ChangeConfig(
      *req, [req, resp, context](const Status& s) {
        HandleResponse(req, resp, context, s);
      },
      &error_code);
  if (PREDICT_FALSE(!s.ok())) {
    HandleErrorResponse(req, resp, context, error_code, s);
    return;
  }
  // The success case is handled when the callback fires.
}

void ConsensusServiceImpl::BulkChangeConfig(const BulkChangeConfigRequestPB* req,
                                            ChangeConfigResponsePB* resp,
                                            RpcContext* context) {
  VLOG(1) << "Received BulkChangeConfig RPC: " << SecureDebugString(*req);
  if (!CheckUuidMatchOrRespond(tablet_manager_, "BulkChangeConfig", req, resp, context)) {
    return;
  }
  scoped_refptr<TabletReplica> replica;
  if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
                                           &replica)) {
    return;
  }

  shared_ptr<RaftConsensus> consensus;
  if (!GetConsensusOrRespond(replica, resp, context, &consensus)) {
    return;
  }
  optional<TabletServerErrorPB::Code> error_code;
  Status s = consensus->BulkChangeConfig(
      *req, [req, resp, context](const Status& s) {
        HandleResponse(req, resp, context, s);
      },
      &error_code);
  if (PREDICT_FALSE(!s.ok())) {
    HandleErrorResponse(req, resp, context, error_code, s);
    return;
  }
  // The success case is handled when the callback fires.
}

void ConsensusServiceImpl::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB* req,
                                              UnsafeChangeConfigResponsePB* resp,
                                              RpcContext* context) {
  LOG(INFO) << "Received UnsafeChangeConfig RPC: " << SecureDebugString(*req)
            << " from " << context->requestor_string();
  if (!CheckUuidMatchOrRespond(tablet_manager_, "UnsafeChangeConfig", req, resp, context)) {
    return;
  }
  scoped_refptr<TabletReplica> replica;
  if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
                                           &replica)) {
    return;
  }

  shared_ptr<RaftConsensus> consensus;
  if (!GetConsensusOrRespond(replica, resp, context, &consensus)) {
    return;
  }
  optional<TabletServerErrorPB::Code> error_code;
  const Status s = consensus->UnsafeChangeConfig(*req, &error_code);
  if (PREDICT_FALSE(!s.ok())) {
    HandleErrorResponse(req, resp, context, error_code, s);
    return;
  }
  context->RespondSuccess();
}

// Replies with the node instance reported by the tablet manager. No UUID or
// tablet checks are performed for this RPC.
void ConsensusServiceImpl::GetNodeInstance(const GetNodeInstanceRequestPB* req,
                                           GetNodeInstanceResponsePB* resp,
                                           RpcContext* context) {
  VLOG(1) << "Received Get Node Instance RPC: " << SecureDebugString(*req);
  auto* node_instance = resp->mutable_node_instance();
  node_instance->CopyFrom(tablet_manager_->NodeInstance());
  context->RespondSuccess();
}

void ConsensusServiceImpl::RunLeaderElection(const RunLeaderElectionRequestPB* req,
                                             RunLeaderElectionResponsePB* resp,
                                             RpcContext* context) {
  LOG(INFO) << "Received Run Leader Election RPC: " << SecureDebugString(*req)
            << " from " << context->requestor_string();
  if (!CheckUuidMatchOrRespond(tablet_manager_, "RunLeaderElection", req, resp, context)) {
    return;
  }
  scoped_refptr<TabletReplica> replica;
  if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
                                           &replica)) {
    return;
  }

  shared_ptr<RaftConsensus> consensus;
  if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
  Status s = consensus->StartElection(
      consensus::RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE,
      consensus::RaftConsensus::EXTERNAL_REQUEST);
  if (PREDICT_FALSE(!s.ok())) {
    SetupErrorAndRespond(resp->mutable_error(), s,
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
    return;
  }
  context->RespondSuccess();
}

// Handles a LeaderStepDown RPC.
//
// Modes:
//   ABRUPT   -- the leader steps down immediately via RaftConsensus::StepDown().
//               A target successor may not be specified in this mode.
//   GRACEFUL -- leadership is transferred via
//               RaftConsensus::TransferLeadership(), optionally to the peer
//               named in 'new_leader_uuid'.
// Any other mode is rejected as InvalidArgument.
void ConsensusServiceImpl::LeaderStepDown(const LeaderStepDownRequestPB* req,
                                          LeaderStepDownResponsePB* resp,
                                          RpcContext* context) {
  LOG(INFO) << "Received LeaderStepDown RPC: " << SecureDebugString(*req)
            << " from " << context->requestor_string();
  if (PREDICT_FALSE(!req->new_leader_uuid().empty() &&
                    req->mode() == LeaderStepDownMode::ABRUPT)) {
    Status s = Status::InvalidArgument(
        "cannot specify a new leader uuid for an abrupt step down");
    SetupErrorAndRespond(resp->mutable_error(), s,
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
    // The RPC has been answered. Without this return, execution would fall
    // through and respond to the same RpcContext a second time.
    return;
  }
  if (!CheckUuidMatchOrRespond(tablet_manager_, "LeaderStepDown", req, resp, context)) {
    return;
  }
  scoped_refptr<TabletReplica> replica;
  if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
                                           &replica)) {
    return;
  }

  shared_ptr<RaftConsensus> consensus;
  if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
  switch (req->mode()) {
    case LeaderStepDownMode::ABRUPT:
      HandleResponse(req, resp, context, consensus->StepDown(resp));
      break;
    case LeaderStepDownMode::GRACEFUL: {
      // An empty 'new_leader_uuid' means "let consensus pick the successor".
      const auto new_leader_uuid = req->new_leader_uuid().empty()
          ? nullopt
          : make_optional(req->new_leader_uuid());
      Status s = consensus->TransferLeadership(new_leader_uuid, resp);
      HandleResponse(req, resp, context, s);
      break;
    }
    default: {
      Status s = Status::InvalidArgument(
          Substitute("unknown LeaderStepDown mode: $0", req->mode()));
      HandleUnknownError(s, resp, context);
      break;
    }
  }
}

// Returns the last OpId of the requested type ('opid_type') for a running
// tablet replica.
//
// Error responses:
//   TABLET_NOT_RUNNING -- the replica isn't RUNNING, or consensus could not
//                         produce an OpId.
//   UNKNOWN_ERROR      -- 'opid_type' is UNKNOWN_OPID_TYPE.
void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *req,
                                       consensus::GetLastOpIdResponsePB *resp,
                                       RpcContext *context) {
  DVLOG(3) << "Received GetLastOpId RPC: " << SecureDebugString(*req);
  if (!CheckUuidMatchOrRespond(tablet_manager_, "GetLastOpId", req, resp, context)) {
    return;
  }
  scoped_refptr<TabletReplica> replica;
  if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
                                           &replica)) {
    return;
  }

  // Re-check the state after the lookup.
  const bool is_running = replica->state() == tablet::RUNNING;
  if (!is_running) {
    SetupErrorAndRespond(resp->mutable_error(),
                         Status::ServiceUnavailable("TabletReplica not in RUNNING state"),
                         TabletServerErrorPB::TABLET_NOT_RUNNING, context);
    return;
  }
  shared_ptr<RaftConsensus> consensus;
  if (!GetConsensusOrRespond(replica, resp, context, &consensus)) {
    return;
  }
  if (PREDICT_FALSE(req->opid_type() == consensus::UNKNOWN_OPID_TYPE)) {
    HandleUnknownError(Status::InvalidArgument("Invalid opid_type specified to GetLastOpId()"),
                       resp, context);
    return;
  }

  const optional<OpId> last_opid = consensus->GetLastOpId(req->opid_type());
  if (!last_opid.has_value()) {
    SetupErrorAndRespond(resp->mutable_error(),
                         Status::IllegalState("Cannot fetch last OpId in WAL"),
                         TabletServerErrorPB::TABLET_NOT_RUNNING,
                         context);
    return;
  }
  *resp->mutable_opid() = *last_opid;
  context->RespondSuccess();
}

// Reports the consensus state of tablet replicas hosted on this server.
//
// If 'tablet_ids' is empty, every replica is reported; otherwise only the
// listed ones are. Replicas without a consensus instance, and replicas whose
// ConsensusState() call fails, are silently omitted. The response also carries
// the replica replacement scheme selected by
// --raft_prepare_replacement_before_eviction.
void ConsensusServiceImpl::GetConsensusState(const consensus::GetConsensusStateRequestPB* req,
                                             consensus::GetConsensusStateResponsePB* resp,
                                             RpcContext* context) {
  DVLOG(3) << "Received GetConsensusState RPC: " << SecureDebugString(*req);
  if (!CheckUuidMatchOrRespond(tablet_manager_, "GetConsensusState", req, resp, context)) {
    return;
  }

  const unordered_set<string> wanted_ids(req->tablet_ids().begin(), req->tablet_ids().end());
  const bool report_every_replica = wanted_ids.empty();

  vector<scoped_refptr<TabletReplica>> replicas;
  tablet_manager_->GetTabletReplicas(&replicas);
  for (const auto& replica : replicas) {
    const bool is_wanted =
        report_every_replica || ContainsKey(wanted_ids, replica->tablet_id());
    if (!is_wanted) {
      continue;
    }

    const shared_ptr<RaftConsensus> consensus = replica->shared_consensus();
    if (!consensus) {
      continue;
    }

    consensus::GetConsensusStateResponsePB_TabletConsensusInfoPB info;
    const Status s = consensus->ConsensusState(info.mutable_cstate(), req->report_health());
    if (!s.ok()) {
      // Only IllegalState is expected here; such replicas are skipped.
      DCHECK(s.IsIllegalState()) << s.ToString();
      continue;
    }
    info.set_tablet_id(replica->tablet_id());
    *resp->add_tablets() = std::move(info);
  }

  resp->mutable_replica_management_info()->set_replacement_scheme(
      FLAGS_raft_prepare_replacement_before_eviction
          ? consensus::ReplicaManagementInfoPB::PREPARE_REPLACEMENT_BEFORE_EVICTION
          : consensus::ReplicaManagementInfoPB::EVICT_FIRST);

  context->RespondSuccess();
}

// Handles a StartTabletCopy RPC by delegating to the tablet manager. The RPC is
// answered from the completion callback the manager invokes.
void ConsensusServiceImpl::StartTabletCopy(const StartTabletCopyRequestPB* req,
                                           StartTabletCopyResponsePB* resp,
                                           RpcContext* context) {
  if (!CheckUuidMatchOrRespond(tablet_manager_, "StartTabletCopy", req, resp, context)) {
    return;
  }
  auto on_copy_done = [context, resp](const Status& s, TabletServerErrorPB::Code code) {
    if (!s.ok()) {
      // Errors are written to the response verbatim; SetupErrorAndRespond()
      // is deliberately not used because no transformation is needed here.
      TabletServerErrorPB* error = resp->mutable_error();
      StatusToPB(s, error->mutable_status());
      error->set_code(code);
      context->RespondNoCache();
      return;
    }
    context->RespondSuccess();
  };
  tablet_manager_->StartTabletCopy(req, on_copy_done);
}

// Handles a ScannerKeepAlive RPC: looks up the scanner owned by the calling
// user and refreshes its last-access time.
//
// A scanner that isn't found is reported inside a successful RPC response
// (error code + status in 'resp->error'). Any other lookup failure is expected
// to be NotAuthorized and goes through SetupErrorAndRespond().
void TabletServiceImpl::ScannerKeepAlive(const ScannerKeepAliveRequestPB *req,
                                         ScannerKeepAliveResponsePB *resp,
                                         RpcContext *context) {
  DCHECK(req->has_scanner_id());
  SharedScanner scanner;
  TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
  Status s = server_->scanner_manager()->LookupScanner(req->scanner_id(),
                                                       context->remote_user().username(),
                                                       &error_code,
                                                       &scanner);
  if (!s.ok()) {
    StatusToPB(s, resp->mutable_error()->mutable_status());
    LOG(INFO) << Substitute("ScannerKeepAlive: $0: remote=$1",
                            s.ToString(), context->requestor_string());
    if (PREDICT_TRUE(s.IsNotFound())) {
      // The status was already serialized into the response above; only the
      // error code remains to be set.
      resp->mutable_error()->set_code(error_code);
      context->RespondSuccess();
      return;
    }
    DCHECK(s.IsNotAuthorized());
    SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
    return;
  }
  {
    // Locking for access has the side effect of updating the access time.
    // Here we do a trylock -- a failure indicates that there is already another
    // thread currently accessing the scanner, so that thread will update the
    // access time upon release of the lock.
    auto lock = scanner->TryLockForAccess();
  }

  context->RespondSuccess();
}

namespace {
void SetResourceMetrics(const RpcContext* context,
                        const CpuTimes* cpu_times,
                        ResourceMetricsPB* metrics) {
  metrics->set_cfile_cache_miss_bytes(
    context->trace()->metrics()->GetMetric(cfile::CFILE_CACHE_MISS_BYTES_METRIC_NAME));
  metrics->set_cfile_cache_hit_bytes(
    context->trace()->metrics()->GetMetric(cfile::CFILE_CACHE_HIT_BYTES_METRIC_NAME));

  metrics->set_bytes_read(
    context->trace()->metrics()->GetMetric(SCANNER_BYTES_READ_METRIC_NAME));

  rpc::InboundCallTiming timing;
  timing.time_handled = context->GetTimeHandled();
  timing.time_received = context->GetTimeReceived();
  timing.time_completed = MonoTime::Now();

  metrics->set_queue_duration_nanos(timing.QueueDuration().ToNanoseconds());
  metrics->set_total_duration_nanos(timing.TotalDuration().ToNanoseconds());
  metrics->set_cpu_system_nanos(cpu_times->system);
  metrics->set_cpu_user_nanos(cpu_times->user);
}
} // anonymous namespace

// Handles a Scan RPC, which either opens a new scanner ('new_scan_request') or
// continues an existing one ('scanner_id'). Exactly one of the two must be set.
//
// When access control is enforced, new scan requests are checked against the
// authz token before anything else happens. Continuation requests skip that
// block. Results are gathered into a ScanResultCopier, and the response also
// carries 'has_more_results', a propagated timestamp taken from the server
// clock, and per-RPC resource metrics.
void TabletServiceImpl::Scan(const ScanRequestPB* req,
                             ScanResponsePB* resp,
                             RpcContext* context) {
  TRACE_EVENT0("tserver", "TabletServiceImpl::Scan");

  // Validate the request: user must pass a new_scan_request or
  // a scanner ID, but not both.
  if (PREDICT_FALSE(req->has_scanner_id() &&
                    req->has_new_scan_request())) {
    context->RespondFailure(Status::InvalidArgument(
                            "Must not pass both a scanner_id and new_scan_request"));
    return;
  }

  // If this is a new scan request, we must enforce the appropriate privileges.
  TokenPB token;
  if (FLAGS_tserver_enforce_access_control && req->has_new_scan_request()) {
    const auto& scan_pb = req->new_scan_request();
    if (!VerifyAuthzTokenOrRespond(server_->token_verifier(),
                                   req->new_scan_request(), context, &token)) {
      return;
    }
    // This replica lookup is only used for the authorization checks; the scan
    // path below performs its own lookup.
    scoped_refptr<TabletReplica> replica;
    if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(),
        req->new_scan_request().tablet_id(), resp, context, &replica)) {
      return;
    }
    const auto& privilege = token.authz().table_privilege();
    if (!CheckMatchingTableIdOrRespond(privilege, replica->tablet_metadata()->table_id(),
                                       "Scan", context)) {
      return;
    }
    unordered_set<ColumnId> authorized_column_ids;
    if (!CheckMayHaveScanPrivilegesOrRespond(privilege, "Scan", &authorized_column_ids, context)) {
      return;
    }
    // If the token doesn't have full scan privileges for the table, check
    // for required privileges based on the scan request.
    if (!privilege.scan_privilege()) {
      const SchemaPtr schema_ptr = replica->tablet_metadata()->schema();
      if (!CheckScanPrivilegesOrRespond(scan_pb, *schema_ptr, authorized_column_ids,
                                        "Scan", context)) {
        return;
      }
    }
  }

  // Accumulates result rows, bounded by the batch-size hint derived from the
  // request.
  ScanResultCopier collector(GetMaxBatchSizeBytesHint(req));

  bool has_more_results = false;
  TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
  if (req->has_new_scan_request()) {
    // Only new scans are refused while the server is quiescing; continuations
    // of existing scanners are not checked here.
    if (!CheckTabletServerNotQuiescingOrRespond(server_, resp, context)) {
      return;
    }
    const NewScanRequestPB& scan_pb = req->new_scan_request();
    scoped_refptr<TabletReplica> replica;
    if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), scan_pb.tablet_id(), resp,
                                             context, &replica)) {
      return;
    }
    string scanner_id;
    Timestamp scan_timestamp;
    Status s = HandleNewScanRequest(replica.get(), req, context,
                                    &collector, &scanner_id, &scan_timestamp, &has_more_results,
                                    &error_code);
    if (PREDICT_FALSE(!s.ok())) {
      SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
      return;
    }

    // Only set the scanner id if we have more results.
    if (has_more_results) {
      resp->set_scanner_id(scanner_id);
    }
    // Report the snapshot timestamp only when HandleNewScanRequest() set one.
    if (scan_timestamp != Timestamp::kInvalidTimestamp) {
      resp->set_snap_timestamp(scan_timestamp.ToUint64());
    }
  } else if (req->has_scanner_id()) {
    Status s = HandleContinueScanRequest(req, context, &collector, &has_more_results, &error_code);
    if (PREDICT_FALSE(!s.ok())) {
      SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
      return;
    }
  } else {
    context->RespondFailure(Status::InvalidArgument(
                              "Must pass either a scanner_id or new_scan_request"));
    return;
  }

  collector.SetupResponse(context, resp);
  resp->set_has_more_results(has_more_results);
  resp->set_propagated_timestamp(server_->clock()->Now().ToUint64());

  SetResourceMetrics(context, collector.cpu_times(), resp->mutable_resource_metrics());
  context->RespondSuccess();
}

void TabletServiceImpl::ListTablets(const ListTabletsRequestPB* req,
                                    ListTabletsResponsePB* resp,
                                    RpcContext* context) {
  vector<scoped_refptr<TabletReplica>> replicas;
  server_->tablet_manager()->GetTabletReplicas(&replicas);
  RepeatedPtrField<StatusAndSchemaPB>* replica_status = resp->mutable_status_and_schema();
  for (const scoped_refptr<TabletReplica>& replica : replicas) {
    StatusAndSchemaPB* status = replica_status->Add();
    replica->GetTabletStatusPB(status->mutable_tablet_status());
    if (status->tablet_status().state() == tablet::RUNNING) {
      status->set_role(replica->consensus()->role());
    } else {
      status->set_role(RaftPeerPB::UNKNOWN_ROLE);
    }

    if (req->need_schema_info()) {
      const SchemaPtr schema_ptr = replica->tablet_metadata()->schema();
      const Schema& tablet_schema = *schema_ptr;
      CHECK_OK(SchemaToPB(tablet_schema, status->mutable_schema()));
      CHECK_OK(replica->tablet_metadata()->partition_schema().ToPB(
          tablet_schema, status->mutable_partition_schema()));
      status->set_schema_version(replica->tablet_metadata()->schema_version());
    }
  }
  context->RespondSuccess();
}

void TabletServiceImpl::SplitKeyRange(const SplitKeyRangeRequestPB* req,
                                      SplitKeyRangeResponsePB* resp,
                                      RpcContext* context) {
  TRACE_EVENT1("tserver", "TabletServiceImpl::SplitKeyRange",
               "tablet_id", req->tablet_id());
  DVLOG(3) << "Received SplitKeyRange RPC: " << SecureDebugString(*req);

  scoped_refptr<TabletReplica> replica;
  if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp,
                                           context, &replica)) {
    return;
  }

  if (FLAGS_tserver_enforce_access_control) {
    TokenPB token;
    if (!VerifyAuthzTokenOrRespond(server_->token_verifier(), *req, context, &token)) {
      return;
    }
    const auto& privilege = token.authz().table_privilege();
    if (!CheckMatchingTableIdOrRespond(privilege, replica->tablet_metadata()->table_id(),
                                       "SplitKeyRange", context)) {
      return;
    }
    // Split-key requests require:
    //   if uses pk (e.g. primary key fields set):
    //     foreach(primary key column): SCAN ON COLUMN
    //   foreach(requested column): SCAN ON COLUMN
    //
    // If the privilege doesn't have full scan privileges, or column-level scan
    // privileges, the user is definitely not authorized to perform a scan.
    unordered_set<ColumnId> authorized_column_ids;
    if (!CheckMayHaveScanPrivilegesOrRespond(privilege, "SplitKeyRange",
                                             &authorized_column_ids, context)) {
      return;
    }
    if (!privilege.scan_privilege()) {
      const SchemaPtr schema_ptr = replica->tablet_metadata()->schema();
      const Schema& schema = *schema_ptr;
      unordered_set<ColumnId> required_column_privileges;
      if (req->has_start_primary_key() || req->has_stop_primary_key()) {
        const auto& key_cols = schema.get_key_column_ids();
        required_column_privileges.insert(key_cols.begin(), key_cols.end());
      }
      bool is_authorized = true;
      const string rejection_prefix = Substitute("rejecting SplitKeyRange request from $0",
                                                 context->requestor_string());
      for (int i = 0; i < req->columns_size(); i++) {
        const auto& column_name = req->columns(i).name();
        int col_idx = schema.find_column(req->columns(i).name());
        if (col_idx == Schema::kColumnNotFound) {
          LOG(WARNING) << Substitute("$0: no column named '$1'", rejection_prefix, column_name);
          is_authorized = false;
          break;
        }
        EmplaceIfNotPresent(&required_column_privileges, schema.column_id(col_idx));
      }
      for (const auto& required_col_id : required_column_privileges) {
        if (!ContainsKey(authorized_column_ids, required_col_id)) {
          LOG(WARNING) << Substitute("$0: authz token doesn't authorize column ID $1",
                                     rejection_prefix, required_col_id);
          is_authorized = false;
          break;
        }
      }
      if (!is_authorized) {
        context->RespondRpcFailure(ErrorStatusPB::FATAL_UNAUTHORIZED,
            Status::NotAuthorized("not authorized to SplitKeyRange"));
        return;
      }
    }
  }

  shared_ptr<Tablet> tablet;
  TabletServerErrorPB::Code error_code;
  Status s = GetTabletRef(replica, &tablet, &error_code);
  if (PREDICT_FALSE(!s.ok())) {
    SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
    return;
  }

  // Decode encoded key
  Arena arena(256);
  const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema();
  const Schema& tablet_schema = *tablet_schema_ptr;
  EncodedKey* start = nullptr;
  EncodedKey* stop = nullptr;
  if (req->has_start_primary_key()) {
    s = EncodedKey::DecodeEncodedString(tablet_schema, &arena, req->start_primary_key(), &start);
    if (PREDICT_FALSE(!s.ok())) {
      SetupErrorAndRespond(resp->mutable_error(),
                           Status::InvalidArgument("Invalid SplitKeyRange start primary key"),
                           TabletServerErrorPB::UNKNOWN_ERROR,
                           context);
      return;
    }
  }
  if (req->has_stop_primary_key()) {
    s = EncodedKey::DecodeEncodedString(tablet_schema, &arena, req->stop_primary_key(), &stop);
    if (PREDICT_FALSE(!s.ok())) {
      SetupErrorAndRespond(resp->mutable_error(),
                           Status::InvalidArgument("Invalid SplitKeyRange stop primary key"),
                           TabletServerErrorPB::UNKNOWN_ERROR,
                           context);
      return;
    }
  }
  if (req->has_start_primary_key() && req->has_stop_primary_key()) {
    // Validate the start key is less than the stop key, if they are both set
    if (start->encoded_key() > stop->encoded_key()) {
      SetupErrorAndRespond(resp->mutable_error(),
                           Status::InvalidArgument("Invalid primary key range"),
                           TabletServerErrorPB::UNKNOWN_ERROR,
                           context);
      return;
    }
  }

  // Validate the column are valid
  Schema schema;
  s = ColumnPBsToSchema(req->columns(), &schema);
  if (PREDICT_FALSE(!s.ok())) {
    SetupErrorAndRespond(resp->mutable_error(),
                         s,
                         TabletServerErrorPB::INVALID_SCHEMA,
                         context);
    return;
  }
  if (schema.has_column_ids()) {
    SetupErrorAndRespond(resp->mutable_error(),
                         Status::InvalidArgument("User requests should not have Column IDs"),
                         TabletServerErrorPB::INVALID_SCHEMA,
                         context);
    return;
  }

  vector<ColumnId> column_ids;
  for (const ColumnSchema& column : schema.columns()) {
    int column_idx = tablet_schema.find_column(column.name());
    if (PREDICT_FALSE(column_idx == Schema::kColumnNotFound)) {
      SetupErrorAndRespond(resp->mutable_error(),
                           Status::InvalidArgument(
                               "Invalid SplitKeyRange column name", column.name()),
                           TabletServerErrorPB::INVALID_SCHEMA,
                           context);
      return;
    }
    column_ids.emplace_back(tablet_schema.column_id(column_idx));
  }

  // Validate the target chunk size are valid
  if (req->target_chunk_size_bytes() == 0) {
    SetupErrorAndRespond(resp->mutable_error(),
                         Status::InvalidArgument("Invalid SplitKeyRange target chunk size"),
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
    return;
  }

  vector<KeyRange> ranges;
  tablet->SplitKeyRange(start, stop, column_ids, req->target_chunk_size_bytes(), &ranges);
  for (const auto& range : ranges) {
    range.ToPB(resp->add_ranges());
  }

  context->RespondSuccess();
}

void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
                                 ChecksumResponsePB* resp,
                                 RpcContext* context) {
  VLOG(1) << "Full request: " << SecureDebugString(*req);

  // Validate the request: user must pass a new_scan_request or
  // a scanner ID, but not both.
  if (PREDICT_FALSE(req->has_new_request() &&
                    req->has_continue_request())) {
    context->RespondFailure(Status::InvalidArgument(
                            "Must not pass both a scanner_id and new_scan_request"));
    return;
  }

  // Convert ChecksumRequestPB to a ScanRequestPB.
  ScanRequestPB scan_req;
  if (req->has_call_seq_id()) scan_req.set_call_seq_id(req->call_seq_id());
  if (req->has_batch_size_bytes()) scan_req.set_batch_size_bytes(req->batch_size_bytes());
  if (req->has_close_scanner()) scan_req.set_close_scanner(req->close_scanner());

  ScanResultChecksummer collector;
  bool has_more = false;
  TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
  // TODO(KUDU-2870): the CLI tool doesn't currently fetch authz tokens when
  // checksumming. Until it does, allow the super-user to avoid fine-grained
  // privilege checking.
  if (FLAGS_tserver_enforce_access_control &&
      !server_->IsFromSuperUser(context) &&
      req->has_new_request()) {
    const NewScanRequestPB& new_req = req->new_request();
    TokenPB token;
    if (!VerifyAuthzTokenOrRespond(server_->token_verifier(), req->new_request(),
                                    context, &token)) {
      return;
    }
    scoped_refptr<TabletReplica> replica;
    if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), new_req.tablet_id(), resp,
                                             context, &replica)) {
      return;
    }
    const auto& privilege = token.authz().table_privilege();
    if (!CheckMatchingTableIdOrRespond(privilege, replica->tablet_metadata()->table_id(),
                                       "Checksum", context)) {
      return;
    }
    unordered_set<ColumnId> authorized_column_ids;
    if (!CheckMayHaveScanPrivilegesOrRespond(privilege, "Checksum",
                                             &authorized_column_ids, context)) {
      return;
    }
    // If the token doesn't have full scan privileges for the table, check
    // for required privileges based on the checksum request.
    if (!privilege.scan_privilege()) {
      const SchemaPtr schema_ptr = replica->tablet_metadata()->schema();
      if (!CheckScanPrivilegesOrRespond(new_req, *schema_ptr, authorized_column_ids,
                                        "Checksum", context)) {
        return;
      }
    }
  }
  if (req->has_new_request()) {
    if (!CheckTabletServerNotQuiescingOrRespond(server_, resp, context)) {
      return;
    }
    const NewScanRequestPB& new_req = req->new_request();
    scan_req.mutable_new_scan_request()->CopyFrom(req->new_request());
    scoped_refptr<TabletReplica> replica;
    if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), new_req.tablet_id(), resp,
                                             context, &replica)) {
      return;
    }

    string scanner_id;
    Timestamp snap_timestamp;
    Status s = HandleNewScanRequest(replica.get(), &scan_req, context,
                                    &collector, &scanner_id, &snap_timestamp, &has_more,
                                    &error_code);
    if (PREDICT_FALSE(!s.ok())) {
      SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
      return;
    }
    resp->set_scanner_id(scanner_id);
    if (snap_timestamp != Timestamp::kInvalidTimestamp) {
      resp->set_snap_timestamp(snap_timestamp.ToUint64());
    }
  } else if (req->has_continue_request()) {
    const ContinueChecksumRequestPB& continue_req = req->continue_request();
    collector.set_agg_checksum(continue_req.previous_checksum());
    scan_req.set_scanner_id(continue_req.scanner_id());
    Status s = HandleContinueScanRequest(&scan_req, context, &collector, &has_more, &error_code);
    if (PREDICT_FALSE(!s.ok())) {
      SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
      return;
    }
  } else {
    context->RespondFailure(Status::InvalidArgument(
                            "Must pass either new_request or continue_request"));
    return;
  }

  resp->set_checksum(collector.agg_checksum());
  resp->set_has_more_results(has_more);
  resp->set_rows_checksummed(collector.rows_checksummed());

  SetResourceMetrics(context, collector.cpu_times(), resp->mutable_resource_metrics());
  context->RespondSuccess();
}

// Reports whether this tablet service supports the client-requested feature
// flag 'feature' (a TabletServerFeatures value). Unknown values are unsupported.
bool TabletServiceImpl::SupportsFeature(uint32_t feature) const {
  bool supported = false;
  switch (feature) {
    case TabletServerFeatures::COLUMN_PREDICATES:
    case TabletServerFeatures::PAD_UNIXTIME_MICROS_TO_16_BYTES:
    case TabletServerFeatures::QUIESCING:
    case TabletServerFeatures::BLOOM_FILTER_PREDICATE_V2:
    case TabletServerFeatures::COLUMNAR_LAYOUT_FEATURE:
      supported = true;
      break;
    default:
      break;
  }
  return supported;
}

void TabletServiceImpl::Shutdown() {
  // Intentionally a no-op.
}

// Extract a void* pointer suitable for use in a ColumnRangePredicate from the
// user-specified protobuf field.
//
// For columns whose physical type is BINARY, the bytes of 'pb_value' are copied
// into 'arena' and *result is set to an arena-allocated Slice referencing that
// copy. For every other physical type, 'pb_value' must be exactly the type's
// size; its bytes are copied into 'arena' and *result points at the copy.
//
// Returns Status::InvalidArgument if a fixed-size value has the wrong length.
// In that case nothing is allocated from 'arena' and *result is left untouched.
static Status ExtractPredicateValue(const ColumnSchema& schema,
                                    const string& pb_value,
                                    Arena* arena,
                                    const void** result) {
  const bool is_binary = schema.type_info()->physical_type() == BINARY;

  // Validate fixed-size values before copying, so that a malformed request
  // does not consume arena memory.
  if (!is_binary) {
    // TODO: add test case for this invalid request
    const size_t expected_size = schema.type_info()->size();
    if (pb_value.size() != expected_size) {
      return Status::InvalidArgument(
        StringPrintf("Bad predicate on %s. Expected value size %zu, got %zu",
                     schema.ToString().c_str(), expected_size, pb_value.size()));
    }
  }

  // Copy the data from the protobuf into the Arena. Skip the memcpy for an
  // empty value: a zero-byte allocation may not yield a usable pointer.
  uint8_t* data_copy = static_cast<uint8_t*>(arena->AllocateBytes(pb_value.size()));
  if (!pb_value.empty()) {
    memcpy(data_copy, pb_value.data(), pb_value.size());
  }

  // Variable-length values are represented by a Slice pointing at the copied
  // bytes; fixed-size values are referenced directly.
  if (is_binary) {
    *result = arena->NewObject<Slice>(data_copy, pb_value.size());
  } else {
    *result = data_copy;
  }

  return Status::OK();
}

// Decodes the encoded primary key bounds of 'scan_pb' (start, stop, and for
// ORDERED scans the resume-after 'last_primary_key') against 'tablet_schema'.
// It then installs them on 'spec' as an inclusive lower bound and an exclusive
// upper bound. Decoded keys are allocated from the scanner's arena.
//
// Returns a non-OK status if any key fails to decode. It also fails if an
// ORDERED scan specifies both a start key and a last key.
static Status DecodeEncodedKeyRange(const NewScanRequestPB& scan_pb,
                                    const Schema& tablet_schema,
                                    const SharedScanner& scanner,
                                    ScanSpec* spec) {
  EncodedKey* lower_key = nullptr;  // Arena allocated.
  EncodedKey* upper_key = nullptr;  // Arena allocated.

  if (scan_pb.has_start_primary_key()) {
    RETURN_NOT_OK_PREPEND(
        EncodedKey::DecodeEncodedString(tablet_schema, scanner->arena(),
                                        scan_pb.start_primary_key(), &lower_key),
        "Invalid scan start key");
  }
  if (scan_pb.has_stop_primary_key()) {
    RETURN_NOT_OK_PREPEND(
        EncodedKey::DecodeEncodedString(tablet_schema, scanner->arena(),
                                        scan_pb.stop_primary_key(), &upper_key),
        "Invalid scan stop key");
  }

  // An ORDERED scan may be resumed from the last key returned by a previous
  // scan; that key then becomes the (exclusive) starting point.
  const bool resume_from_last_key =
      scan_pb.order_mode() == ORDERED && scan_pb.has_last_primary_key();
  if (resume_from_last_key) {
    if (lower_key != nullptr) {
      return Status::InvalidArgument("Cannot specify both a start key and a last key");
    }
    RETURN_NOT_OK_PREPEND(
        EncodedKey::DecodeEncodedString(tablet_schema, scanner->arena(),
                                        scan_pb.last_primary_key(), &lower_key),
        "Failed to decode last primary key");
    // Step past the last key so that row is not returned again.
    RETURN_NOT_OK_PREPEND(
        EncodedKey::IncrementEncodedKey(tablet_schema, &lower_key, scanner->arena()),
        "Failed to increment encoded last row key");
  }

  if (lower_key != nullptr) {
    spec->SetLowerBoundKey(lower_key);
  }
  if (upper_key != nullptr) {
    spec->SetExclusiveUpperBoundKey(upper_key);
  }
  return Status::OK();
}

// Populates 'spec' from the scan request 'scan_pb':
//  - the 'cache_blocks' hint,
//  - column predicates (ColumnPredicatePB),
//  - deprecated column range predicates (ColumnRangePredicatePB),
//  - the encoded primary key range (see DecodeEncodedKeyRange()),
//  - the row limit, if one was requested.
// Predicate values and keys are decoded using the scanner's arena.
//
// Returns a non-OK status if any predicate or key fails to decode, or if a
// deprecated range predicate has neither a lower nor an upper bound.
static Status SetupScanSpec(const NewScanRequestPB& scan_pb,
                            const Schema& tablet_schema,
                            const SharedScanner& scanner,
                            ScanSpec* spec) {
  spec->set_cache_blocks(scan_pb.cache_blocks());

  // First the column predicates.
  for (const ColumnPredicatePB& pred_pb : scan_pb.column_predicates()) {
    optional<ColumnPredicate> predicate;
    RETURN_NOT_OK(ColumnPredicateFromPB(tablet_schema, scanner->arena(), pred_pb, &predicate));
    spec->AddPredicate(std::move(*predicate));
  }

  // Then the column range predicates.
  // TODO: remove this once all clients have moved to ColumnPredicatePB and
  // backwards compatibility can be broken.
  for (const ColumnRangePredicatePB& pred_pb : scan_pb.deprecated_range_predicates()) {
    if (!pred_pb.has_lower_bound() && !pred_pb.has_inclusive_upper_bound()) {
      return Status::InvalidArgument(
        string("Invalid predicate ") + SecureShortDebugString(pred_pb) +
        ": has no lower or upper bound.");
    }
    optional<ColumnSchema> col;
    RETURN_NOT_OK(ColumnSchemaFromPB(pred_pb.column(), &col));

    // A missing bound stays nullptr, i.e. the range is open on that side.
    const void* lower_bound = nullptr;
    const void* upper_bound = nullptr;
    if (pred_pb.has_lower_bound()) {
      RETURN_NOT_OK(ExtractPredicateValue(*col, pred_pb.lower_bound(),
                                          scanner->arena(),
                                          &lower_bound));
    }
    if (pred_pb.has_inclusive_upper_bound()) {
      RETURN_NOT_OK(ExtractPredicateValue(*col, pred_pb.inclusive_upper_bound(),
                                          scanner->arena(),
                                          &upper_bound));
    }

    auto pred = ColumnPredicate::InclusiveRange(*col, lower_bound, upper_bound, scanner->arena());
    if (pred) {
      VLOG(3) << Substitute("Parsed predicate $0 from $1",
                            pred->ToString(), SecureShortDebugString(scan_pb));
      // Move rather than copy, matching the column-predicate loop above.
      spec->AddPredicate(std::move(*pred));
    }
  }

  // Then any encoded key range predicates.
  RETURN_NOT_OK(DecodeEncodedKeyRange(scan_pb, tablet_schema, scanner, spec));

  // If the scanner has a limit, set it now.
  if (scan_pb.has_limit()) {
    spec->set_limit(scan_pb.limit());
  }

  return Status::OK();
}

namespace {
// Returns Status::InvalidArgument when this is a READ_AT_SNAPSHOT or
// READ_YOUR_WRITES scan and 'timestamp' is before the tablet's ancient history
// mark (AHM). Returns Status::OK() otherwise. 'timestamp_desc' names the
// timestamp in the error message.
Status VerifyNotAncientHistory(Tablet* tablet, ReadMode read_mode, Timestamp timestamp,
                               const string& timestamp_desc) {
  tablet::HistoryGcOpts gc_opts = tablet->GetHistoryGcOpts();
  const bool reads_snapshot = read_mode == READ_AT_SNAPSHOT || read_mode == READ_YOUR_WRITES;
  if (!reads_snapshot || !gc_opts.IsAncientHistory(timestamp)) {
    return Status::OK();
  }

  const auto ahm = gc_opts.ancient_history_mark();
  return Status::InvalidArgument(
      Substitute("$0 is earlier than the ancient history mark. Consider "
                 "increasing the value of the configuration parameter "
                 "--tablet_history_max_age_sec. Snapshot timestamp: $1 "
                 "Ancient History Mark: $2 Physical time difference: $3",
                 timestamp_desc,
                 tablet->clock()->Stringify(timestamp),
                 tablet->clock()->Stringify(ahm),
                 tablet->clock()->GetPhysicalComponentDifference(timestamp, ahm).ToString()));
}

// Checks that the snapshot end timestamp, and the start timestamp when one is
// given, are not before the ancient history mark. Also checks that the start
// timestamp does not come after the end timestamp.
Status VerifyLegalSnapshotTimestamps(Tablet* tablet, ReadMode read_mode,
                                     const optional<Timestamp>& snap_start_timestamp,
                                     const Timestamp& snap_end_timestamp) {
  RETURN_NOT_OK(VerifyNotAncientHistory(tablet, read_mode, snap_end_timestamp,
                                        "snapshot scan end timestamp"));
  if (!snap_start_timestamp) {
    return Status::OK();
  }

  // A start timestamp is present (diff scan): validate it as well.
  const Timestamp& start_ts = *snap_start_timestamp;
  RETURN_NOT_OK(VerifyNotAncientHistory(tablet, read_mode, start_ts,
                                        "snapshot scan start timestamp"));
  if (start_ts.CompareTo(snap_end_timestamp) <= 0) {
    return Status::OK();
  }
  return Status::InvalidArgument(
      Substitute("start timestamp ($0) must be less than or equal to end timestamp ($1)",
                 tablet->clock()->Stringify(start_ts),
                 tablet->clock()->Stringify(snap_end_timestamp)));
}
} // anonymous namespace

// Start a new scan.
//
// Registers a new scanner for 'replica', then validates the requested
// projection, order mode and scan spec. It creates and initializes a row
// iterator for the requested read mode, and verifies the snapshot timestamps
// against the tablet's ancient history mark.
//
// If the spec can be short-circuited, or the iterator has no rows, the scanner
// is unregistered and *has_more_results is set to false. Otherwise the scanner
// stays registered and *scanner_id is set. If the request carries a positive
// batch size hint, the first batch is also collected via
// HandleContinueScanRequest().
//
// For snapshot read modes, *snap_timestamp is filled in by
// HandleScanAtSnapshot(). On failure, returns a non-OK status and may set
// *error_code to a more specific code.
Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
                                               const ScanRequestPB* req,
                                               const RpcContext* rpc_context,
                                               ScanResultCollector* result_collector,
                                               string* scanner_id,
                                               Timestamp* snap_timestamp,
                                               bool* has_more_results,
                                               TabletServerErrorPB::Code* error_code) {
  DCHECK(result_collector != nullptr);
  DCHECK(error_code != nullptr);
  DCHECK(req->has_new_scan_request());
  const NewScanRequestPB& scan_pb = req->new_scan_request();
  TRACE_EVENT1("tserver", "TabletServiceImpl::HandleNewScanRequest",
               "tablet_id", scan_pb.tablet_id());

  SharedScanner scanner;
  server_->scanner_manager()->NewScanner(replica,
                                         rpc_context->remote_user(),
                                         scan_pb.row_format_flags(),
                                         &scanner);
  TRACE("Created scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());
  auto scanner_lock = scanner->LockForAccess();

  // If we early-exit out of this function, automatically unregister
  // the scanner.
  ScopedUnregisterScanner unreg_scanner(server_->scanner_manager(), scanner->id());
  ScopedAddScannerTiming scanner_timer(scanner.get(), result_collector->cpu_times());

  // Create the user's requested projection.
  // TODO(todd): Add test cases for bad projections including 0 columns.
  Schema projection;
  Status s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection);
  if (PREDICT_FALSE(!s.ok())) {
    *error_code = TabletServerErrorPB::INVALID_SCHEMA;
    return s;
  }

  if (projection.has_column_ids()) {
    *error_code = TabletServerErrorPB::INVALID_SCHEMA;
    return Status::InvalidArgument("User requests should not have Column IDs");
  }

  if (scan_pb.order_mode() == UNKNOWN_ORDER_MODE) {
    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
    return Status::InvalidArgument("Unknown order mode specified");
  }

  if (scan_pb.order_mode() == ORDERED) {
    // Ordered scans must be at a snapshot so that we perform a serializable read (which can be
    // resumed). Otherwise, this would be read committed isolation, which is not resumable.
    if (scan_pb.read_mode() != READ_AT_SNAPSHOT) {
      *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
      return Status::InvalidArgument("Cannot do an ordered scan that is not a snapshot read");
    }
  }

  const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema();
  const Schema& tablet_schema = *tablet_schema_ptr;

  ScanSpec spec;
  s = SetupScanSpec(scan_pb, tablet_schema, scanner, &spec);
  if (PREDICT_FALSE(!s.ok())) {
    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
    return s;
  }

  VLOG(3) << "Before optimizing scan spec: " << spec.ToString(tablet_schema);
  spec.PruneInlistValuesIfPossible(tablet_schema,
                                   replica->tablet_metadata()->partition(),
                                   replica->tablet_metadata()->partition_schema());
  spec.OptimizeScan(tablet_schema, scanner->arena(), true);
  VLOG(3) << "After optimizing scan spec: " << spec.ToString(tablet_schema);

  // Missing columns will contain the columns that are not mentioned in the
  // client projection but are actually needed for the scan, such as columns
  // referred to by predicates.
  //
  // NOTE: We should build the missing column after optimizing scan which will
  // remove unnecessary predicates.
  vector<ColumnSchema> missing_cols = spec.GetMissingColumns(projection);

  // Build a new projection with the projection columns and the missing columns,
  // annotating each column as a key column appropriately.
  //
  // Note: the projection is a consistent schema (i.e. no duplicate columns).
  // However, it has some different semantics as compared to the tablet schema:
  // - It might not contain all of the columns in the tablet.
  // - It might contain extra columns not found in the tablet schema. Virtual
  //   columns are permitted, while others will cause the scan to fail later,
  //   when the tablet validates the projection.
  // - It doesn't know which of its columns are key columns. That's fine for
  //   an UNORDERED scan, but we'll need to fix this for an ORDERED scan, which
  //   requires all key columns in tablet schema order.
  SchemaBuilder projection_builder;
  if (scan_pb.order_mode() == ORDERED) {
    for (int i = 0; i < tablet_schema.num_key_columns(); i++) {
      const auto& col = tablet_schema.column(i);
      // CHECK_OK is safe because the tablet schema has no duplicate columns.
      CHECK_OK(projection_builder.AddColumn(col, /* is_key= */ true));
    }
    for (int i = 0; i < projection.num_columns(); i++) {
      const auto& col = projection.column(i);
      // Any key columns in the projection will be ignored.
      ignore_result(projection_builder.AddColumn(col, /* is_key= */ false));
    }
    for (const ColumnSchema& col : missing_cols) {
      // Any key columns in 'missing_cols' will be ignored.
      ignore_result(projection_builder.AddColumn(col, /* is_key= */ false));
    }
  } else {
    projection_builder.Reset(projection);
    for (const ColumnSchema& col : missing_cols) {
      // CHECK_OK is safe because the builder's columns (from the projection)
      // and the missing columns are disjoint sets.
      //
      // UNORDERED scans don't need to know which columns are part of the key.
      CHECK_OK(projection_builder.AddColumn(col, /* is_key= */ false));
    }
  }

  // Store the client's specified projection, prior to adding any missing
  // columns for predicates, etc.
  unique_ptr<Schema> client_projection(new Schema(std::move(projection)));
  projection = projection_builder.BuildWithoutIds();
  VLOG(3) << "Scan projection: " << projection.ToString(Schema::BASE_INFO);

  s = result_collector->InitSerializer(scan_pb.row_format_flags(),
                                       projection,
                                       *client_projection);
  if (!s.ok()) {
    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
    return s;
  }

  if (spec.CanShortCircuit()) {
    VLOG(1) << "short-circuiting without creating a server-side scanner.";
    *has_more_results = false;
    return Status::OK();
  }

  // It's important to keep the reference to the tablet for the case when the
  // tablet replica's shutdown is run concurrently with the code below.
  shared_ptr<Tablet> tablet;
  RETURN_NOT_OK(GetTabletRef(replica, &tablet, error_code));

  // Ensure the tablet has a valid clean time.
  s = tablet->mvcc_manager()->CheckIsCleanTimeInitialized();
  if (!s.ok()) {
    LOG(WARNING) << Substitute("Rejecting scan request for tablet $0: $1",
                               tablet->tablet_id(), s.ToString());
    // Return TABLET_NOT_RUNNING so the scan can be handled appropriately (fail
    // over to another tablet server if fault-tolerant).
    *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
    return s;
  }

  unique_ptr<RowwiseIterator> iter;
  optional<Timestamp> snap_start_timestamp;

  {
    TRACE("Creating iterator");
    TRACE_EVENT0("tserver", "Create iterator");

    switch (scan_pb.read_mode()) {
      case UNKNOWN_READ_MODE: {
        *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
        return Status::NotSupported("Unknown read mode.");
      }
      case READ_LATEST: {
        if (scan_pb.has_snap_start_timestamp()) {
          *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
          return Status::InvalidArgument("scan start timestamp is only supported "
                                         "in READ_AT_SNAPSHOT read mode");
        }
        s = tablet->NewRowIterator(projection, &iter);
        break;
      }
      case READ_YOUR_WRITES: // Fallthrough intended
      case READ_AT_SNAPSHOT: {
        s = HandleScanAtSnapshot(
            scan_pb, rpc_context, projection, tablet.get(), replica->time_manager(),
            &iter, &snap_start_timestamp, snap_timestamp, error_code);
        break;
      }
    }
    TRACE("Iterator created");
  }

  // Make a copy of the optimized spec before it's passed to the iterator.
  // This copy will be given to the Scanner so it can report its predicates to
  // /scans. The copy is necessary because the original spec will be modified
  // as its predicates are pushed into lower-level iterators.
  unique_ptr<ScanSpec> orig_spec(new ScanSpec(spec));

  if (PREDICT_TRUE(s.ok())) {
    TRACE_EVENT0("tserver", "iter->Init");
    s = iter->Init(&spec);
    if (PREDICT_FALSE(s.IsInvalidArgument())) {
      // Tablet::Iterator::Init() returns InvalidArgument when an invalid
      // projection is specified.
      // TODO(todd): would be nice if we threaded these more specific
      // error codes throughout Kudu.
      *error_code = TabletServerErrorPB::MISMATCHED_SCHEMA;
      return s;
    }
  }

  TRACE("Iterator init: $0", s.ToString());

  if (PREDICT_FALSE(!s.ok())) {
    // The format places the request before the status, so pass the arguments
    // in that order.
    LOG(WARNING) << Substitute(
        "error setting up scanner $0 with request: $1: $2",
        scanner->id(), SecureShortDebugString(*req), s.ToString());
    // If the replica has been stopped, e.g. due to disk failure, return
    // TABLET_FAILED so the scan can be handled appropriately (fail over to
    // another tablet server if fault-tolerant).
    if (tablet->HasBeenStopped()) {
      *error_code = TabletServerErrorPB::TABLET_FAILED;
    }
    return s;
  }

  // If this is a snapshot scan and the user specified a specific timestamp to
  // scan at, then check that we are not attempting to scan at a time earlier
  // than the ancient history mark. Only perform this check if tablet history
  // GC is enabled.
  //
  // TODO: This validation essentially prohibits scans with READ_AT_SNAPSHOT
  // when history_max_age is set to zero. There is a tablet history GC related
  // race when the history max age is set to very low, or zero. Imagine a case
  // where a scan was started and READ_AT_SNAPSHOT was specified without
  // specifying a snapshot timestamp, and --tablet_history_max_age_sec=0. The
  // above code path will select the latest timestamp (under a lock) prior to
  // calling RowIterator::Init(), which actually opens the blocks. That means
  // that there is an opportunity in between those two calls for tablet history
  // GC to kick in and delete some history. In fact, we may easily not actually
  // end up with a valid snapshot in that case. It would be more correct to
  // initialize the row iterator and then select the latest timestamp
  // represented by those open files in that case.
  //
  // Now that we have initialized our row iterator at a snapshot, return an
  // error if the snapshot timestamp was prior to the ancient history mark.
  // We have to check after we open the iterator in order to avoid a TOCTOU
  // error since it's possible that initializing the row iterator could race
  // against the tablet history GC maintenance task.
  RETURN_NOT_OK_EVAL(VerifyLegalSnapshotTimestamps(tablet.get(), scan_pb.read_mode(),
                                                   snap_start_timestamp, *snap_timestamp),
                     *error_code = TabletServerErrorPB::INVALID_SNAPSHOT);

  *has_more_results = iter->HasNext() && !scanner->has_fulfilled_limit();
  TRACE("has_more: $0", *has_more_results);
  if (!*has_more_results) {
    // If there are no more rows, we can short circuit some work and respond immediately.
    VLOG(1) << "No more rows, short-circuiting out without creating a server-side scanner.";
    return Status::OK();
  }

  scanner->Init(std::move(iter), std::move(orig_spec), std::move(client_projection));

  // Stop the scanner timer because ContinueScanRequest starts its own timer.
  scanner_timer.Stop();
  // From here on the scanner outlives this call, so don't unregister it on exit.
  unreg_scanner.Cancel();
  *scanner_id = scanner->id();

  VLOG(1) << "Started scanner " << scanner->id() << ": " << scanner->iter()->ToString();

  if (GetMaxBatchSizeBytesHint(req) > 0) {
    TRACE("Continuing scan request");
    // TODO(wdberkeley): Instead of copying the pb, instead split
    // HandleContinueScanRequest and call the second half directly. Once that's
    // done, remove the call to ScopedAddScannerTiming::Stop() above (and the
    // method as it won't be used) and start the timing for continue requests
    // from the first half that is no longer executed in this codepath.
    ScanRequestPB continue_req(*req);
    continue_req.set_scanner_id(scanner->id());
    // HandleContinueScanRequest() acquires the scanner lock itself.
    scanner_lock.Unlock();
    return HandleContinueScanRequest(
        &continue_req, rpc_context, result_collector, has_more_results, error_code);
  }

  // Increment the scanner call sequence ID. HandleContinueScanRequest handles
  // this in the non-empty scan case.
  scanner->IncrementCallSeqId();
  return Status::OK();
}

// Continue an existing scan request.
//
// Looks up the scanner named by 'req->scanner_id()' on behalf of the RPC's
// remote user, checks the call sequence ID, and feeds row blocks from the
// scanner's iterator into 'result_collector'. It stops when any of these happens:
//   - the iterator is exhausted;
//   - the scan limit is fulfilled;
//   - the response reaches the requested batch size;
//   - a 500ms time budget expires.
// If the tablet reference can still be obtained, the per-tablet scanner
// metrics are updated.
//
// A request to close a scanner that no longer exists (batch size 0 plus
// close_scanner) is silently treated as success.
//
// On success, '*has_more_results' reports whether more rows remain. Only in
// that case does the scanner stay registered for follow-up requests. Once the
// scanner has been found and locked, any early return unregisters it. The one
// exception is an invalid call sequence ID with
// --scanner_unregister_on_invalid_seq_id disabled. Most error paths also set
// '*error_code'.
Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
                                                    const RpcContext* rpc_context,
                                                    ScanResultCollector* result_collector,
                                                    bool* has_more_results,
                                                    TabletServerErrorPB::Code* error_code) {
  DCHECK(req->has_scanner_id());
  TRACE_EVENT1("tserver", "TabletServiceImpl::HandleContinueScanRequest",
               "scanner_id", req->scanner_id());

  const size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);

  SharedScanner scanner;
  TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
  Status s = server_->scanner_manager()->LookupScanner(req->scanner_id(),
                                                       rpc_context->remote_user().username(),
                                                       &code,
                                                       &scanner);
  if (!s.ok()) {
    if (s.IsNotFound() && batch_size_bytes == 0 && req->close_scanner()) {
      // Silently ignore any request to close a non-existent scanner.
      return Status::OK();
    }
    LOG(INFO) << Substitute("Scan: $0: call sequence id=$1, remote=$2",
                            s.ToString(), req->call_seq_id(), rpc_context->requestor_string());
    *error_code = code;
    return s;
  }
  // TODO(todd) consider TryLockForAccess and return ServiceUnavailable in the case that
  // another thread is already using the scanner? This should be rare in real
  // circumstances -- only relevant when a client performs some retries on timeout.
  auto scanner_lock = scanner->LockForAccess();

  if (PREDICT_FALSE(FLAGS_scanner_inject_service_unavailable_on_continue_scan)) {
    return Status::ServiceUnavailable("Injecting service unavailable status on Scan due to "
                                      "--scanner_inject_service_unavailable_on_continue_scan");
  }

  // If we early-exit out of this function, automatically unregister the scanner.
  ScopedUnregisterScanner unreg_scanner(server_->scanner_manager(), scanner->id());
  ScopedAddScannerTiming scanner_timer(scanner.get(), result_collector->cpu_times());

  VLOG(2) << "Found existing scanner " << scanner->id() << " for request: "
          << SecureShortDebugString(*req);
  TRACE("Found scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());

  if (batch_size_bytes == 0 && req->close_scanner()) {
    *has_more_results = false;
    return Status::OK();
  }

  if (req->call_seq_id() != scanner->call_seq_id()) {
    *error_code = TabletServerErrorPB::INVALID_SCAN_CALL_SEQ_ID;
    if (!FLAGS_scanner_unregister_on_invalid_seq_id) {
      unreg_scanner.Cancel();
    }
    return Status::InvalidArgument("Invalid call sequence ID in scan request");
  }
  scanner->IncrementCallSeqId();

  RowwiseIterator* iter = scanner->iter();

  // Set the row format flags on the ScanResultCollector.
  s = result_collector->InitSerializer(scanner->row_format_flags(),
                                       iter->schema(),
                                       *scanner->client_projection_schema());
  if (!s.ok()) {
    *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
    return s;
  }

  // TODO(todd): could size the RowBlock based on the user's requested batch size?
  // If people had really large indirect objects, we would currently overshoot
  // their requested batch size by a lot.
  RowBlockMemory mem(32 * 1024);
  RowBlock block(&iter->schema(), FLAGS_scanner_batch_size_rows, &mem);

  // TODO(todd): in the future, use the client timeout to set a budget. For now,
  // just use a half second, which should be plenty to amortize call overhead.
  int budget_ms = 500;
  MonoTime deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(budget_ms);

  int64_t rows_scanned = 0;
  while (iter->HasNext() && !scanner->has_fulfilled_limit()) {
    if (PREDICT_FALSE(FLAGS_scanner_inject_latency_on_each_batch_ms > 0)) {
      SleepFor(MonoDelta::FromMilliseconds(FLAGS_scanner_inject_latency_on_each_batch_ms));
    }

    // Reuse the function-level status rather than shadowing it in the loop.
    s = iter->NextBlock(&block);
    if (PREDICT_FALSE(!s.ok())) {
      // Include the failure itself so the warning is actionable.
      LOG(WARNING) << "Copying rows from internal iterator for request "
                   << SecureShortDebugString(*req) << " failed: " << s.ToString();
      *error_code = TabletServerErrorPB::UNKNOWN_ERROR;
      return s;
    }

    if (PREDICT_TRUE(block.nrows() > 0)) {
      // Count the number of rows scanned, regardless of predicates or deletions.
      // The collector will separately count the number of rows actually returned to
      // the client.
      rows_scanned += block.nrows();
      if (scanner->spec().has_limit()) {
        int64_t rows_left = scanner->spec().limit() - scanner->num_rows_returned();
        DCHECK_GT(rows_left, 0);  // Guaranteed by has_fulfilled_limit()
        block.selection_vector()->ClearToSelectAtMost(static_cast<size_t>(rows_left));
      }
      result_collector->HandleRowBlock(scanner.get(), block);
    }

    const int64_t response_size = result_collector->ResponseSize();

    if (VLOG_IS_ON(2)) {
      // This may be fairly expensive if row block size is small
      TRACE("Copied block (nrows=$0), new size=$1", block.nrows(), response_size);
    }

    // TODO: should check if RPC got cancelled, once we implement RPC cancellation.
    if (PREDICT_FALSE(MonoTime::Now() >= deadline)) {
      TRACE("Deadline expired - responding early");
      break;
    }

    // A response size is never negative, so the explicit conversion keeps the
    // previous (implicit) semantics while avoiding a signed/unsigned comparison.
    if (static_cast<size_t>(response_size) >= batch_size_bytes) {
      break;
    }
  }

  scoped_refptr<TabletReplica> replica = scanner->tablet_replica();
  shared_ptr<Tablet> tablet;
  // Initialized defensively so it never holds an indeterminate value.
  TabletServerErrorPB::Code tablet_ref_error_code = TabletServerErrorPB::UNKNOWN_ERROR;
  s = GetTabletRef(replica, &tablet, &tablet_ref_error_code);
  // If the tablet is not running, but the scan operation in progress
  // has reached this point, the tablet server has the necessary data to
  // send in response for the scan continuation request.
  if (PREDICT_FALSE(!s.ok() && tablet_ref_error_code !=
                        TabletServerErrorPB::TABLET_NOT_RUNNING)) {
    *error_code = tablet_ref_error_code;
    return s;
  }

  // Calculate the number of rows/cells/bytes actually processed.
  IteratorStats delta_stats = scanner->UpdateStatsAndGetDelta();
  TRACE_COUNTER_INCREMENT(SCANNER_BYTES_READ_METRIC_NAME, delta_stats.bytes_read);

  // Update metrics based on this scan request.
  if (tablet) {
    // The number of rows/cells/bytes actually returned to the user.
    tablet->metrics()->scanner_rows_returned->IncrementBy(
        result_collector->NumRowsReturned());
    tablet->metrics()->scanner_cells_returned->IncrementBy(
        result_collector->NumRowsReturned() *
            scanner->client_projection_schema()->num_columns());
    tablet->metrics()->scanner_bytes_returned->IncrementBy(
        result_collector->ResponseSize());

    // The number of rows/cells/bytes actually processed.
    tablet->metrics()->scanner_rows_scanned->IncrementBy(rows_scanned);
    tablet->metrics()->scanner_cells_scanned_from_disk->IncrementBy(delta_stats.cells_read);
    tablet->metrics()->scanner_bytes_scanned_from_disk->IncrementBy(delta_stats.bytes_read);
    tablet->metrics()->scanner_predicates_disabled->IncrementBy(delta_stats.predicates_disabled);

    // Last read timestamp.
    tablet->UpdateLastReadTime();
  }

  *has_more_results = !req->close_scanner() && iter->HasNext() &&
      !scanner->has_fulfilled_limit();
  if (*has_more_results) {
    // Keep the scanner registered for the next continuation request.
    unreg_scanner.Cancel();
  } else {
    VLOG(2) << "Scanner " << scanner->id() << " complete: removing...";
  }

  return Status::OK();
}

namespace {
// Caps a client-provided scan deadline at --scanner_max_wait_ms from now.
//
// The cap applies when the whole-millisecond time remaining until 'deadline'
// exceeds --scanner_max_wait_ms. In that case the function returns
// now + --scanner_max_wait_ms and sets '*was_clamped' to true. Otherwise it
// returns 'deadline' unchanged and sets '*was_clamped' to false.
MonoTime ClampScanDeadlineForWait(const MonoTime& deadline, bool* was_clamped) {
  const MonoTime now = MonoTime::Now();
  // Compare in (truncated) milliseconds, exactly as the flag is expressed.
  const auto remaining_ms = (deadline - now).ToMilliseconds();
  const bool exceeds_cap = remaining_ms > FLAGS_scanner_max_wait_ms;
  *was_clamped = exceeds_cap;
  if (!exceeds_cap) {
    return deadline;
  }
  return now + MonoDelta::FromMilliseconds(FLAGS_scanner_max_wait_ms);
}
} // anonymous namespace

// Opens a row iterator over 'tablet' for a READ_AT_SNAPSHOT or READ_YOUR_WRITES
// scan. Any other read mode is a fatal error.
//
// Steps:
//   1. Validate the request. A start timestamp (diff scan) is only accepted in
//      READ_AT_SNAPSHOT mode with ORDERED order mode.
//   2. Pick the snapshot timestamp.
//   3. Check the timestamp(s) against the tablet's ancient history mark (AHM).
//   4. Wait until the snapshot timestamp is safe and all operations in the
//      snapshot have been applied.
//   5. Open the iterator into '*iter'.
//
// On success, returns the picked snapshot timestamp in '*snap_timestamp' and
// the parsed start timestamp (if any) in '*snap_start_timestamp'.
//
// Error reporting:
//   - Invalid specs set '*error_code' to INVALID_SCAN_SPEC.
//   - Unusable timestamps set it to INVALID_SNAPSHOT.
//   - A wait that timed out because the deadline was clamped to
//     --scanner_max_wait_ms returns ServiceUnavailable with THROTTLED.
Status TabletServiceImpl::HandleScanAtSnapshot(const NewScanRequestPB& scan_pb,
                                               const RpcContext* rpc_context,
                                               const Schema& projection,
                                               Tablet* tablet,
                                               TimeManager* time_manager,
                                               unique_ptr<RowwiseIterator>* iter,
                                               optional<Timestamp>* snap_start_timestamp,
                                               Timestamp* snap_timestamp,
                                               TabletServerErrorPB::Code* error_code) {
  const auto read_mode = scan_pb.read_mode();
  switch (read_mode) {
    case READ_AT_SNAPSHOT: // Fallthrough intended
    case READ_YOUR_WRITES:
      break;
    default:
      LOG(FATAL) << Substitute("$0: unsupported snapshot scan mode", read_mode);
  }

  // Validate other input parameters as well in the very beginning.
  if (scan_pb.has_snap_start_timestamp()) {
    if (read_mode != READ_AT_SNAPSHOT) {
      // TODO(mpercy): Should we allow READ_YOUR_WRITES mode? There is no
      // obvious use for it, but also no obvious reason not to support it,
      // except for the fact that we would also have to test it.
      *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
      return Status::InvalidArgument("scan start timestamp is only supported "
                                     "in READ_AT_SNAPSHOT read mode");
    }
    if (scan_pb.order_mode() != ORDERED) {
      *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
      return Status::InvalidArgument("scan start timestamp is only supported "
                                     "in ORDERED order mode");
    }
  }

  // Based on the read mode, pick a timestamp and verify it.
  Timestamp tmp_snap_timestamp;
  Status s = PickAndVerifyTimestamp(scan_pb, tablet, &tmp_snap_timestamp);
  if (PREDICT_FALSE(!s.ok())) {
    *error_code = TabletServerErrorPB::INVALID_SNAPSHOT;
    return s.CloneAndPrepend("cannot verify timestamp");
  }

  // Parse the optional start timestamp of a diff scan up front, so that both
  // requested timestamps can be checked before any waiting happens.
  optional<Timestamp> tmp_snap_start_timestamp;
  if (scan_pb.has_snap_start_timestamp()) {
    tmp_snap_start_timestamp = Timestamp(scan_pb.snap_start_timestamp());
  }

  // Before we open / wait on anything, check that the timestamp(s) are after
  // the AHM. This is not the final check; it is repeated after the iterators
  // are open. There is no point in waiting (possibly up to
  // --scanner_max_wait_ms) for a snapshot timestamp to become readable, or in
  // initializing the iterators, when we can't read back to one of the
  // requested timestamps.
  RETURN_NOT_OK_EVAL(VerifyLegalSnapshotTimestamps(tablet, read_mode,
                                                   tmp_snap_start_timestamp,
                                                   tmp_snap_timestamp),
                     *error_code = TabletServerErrorPB::INVALID_SNAPSHOT);

  // Reduce the client's deadline by a few msecs to allow for overhead.
  const MonoTime client_deadline =
      rpc_context->GetClientDeadline() - MonoDelta::FromMilliseconds(10);

  // It's not good for the tablet server or for the client if we hang here
  // forever. The tablet server would have one less available thread. The
  // client might spend all of the time allotted for the scan on a partitioned
  // server that will never have a consistent snapshot at 'snap_timestamp'.
  // Because of this, we clamp the client's deadline to the maximum configured
  // scanner wait time. If the client sets a longer timeout, it can use it by
  // retrying (possibly on other servers).
  bool was_clamped = false;
  MonoTime final_deadline = ClampScanDeadlineForWait(client_deadline, &was_clamped);

  // Wait for the tablet to know that 'snap_timestamp' is safe, i.e. that all
  // operations that came before it have at least started. Together with
  // waiting for the MVCC snapshot to be clean below, this lets us always
  // return the same data when scanning at the same timestamp (repeatable
  // reads).
  TRACE("Waiting safe time to advance");
  const MonoTime before = MonoTime::Now();
  s = time_manager->WaitUntilSafe(tmp_snap_timestamp, final_deadline);

  MvccSnapshot snap;
  if (PREDICT_TRUE(s.ok())) {
    // Wait for the in-flights in the snapshot to be finished.
    TRACE("Waiting for operations to commit");
    s = tablet->mvcc_manager()->WaitForSnapshotWithAllApplied(
          tmp_snap_timestamp, &snap, client_deadline);
  }

  // If we got a TimeOut but we had clamped the deadline, return
  // ServiceUnavailable to make the client retry the scan operation soon.
  if (s.IsTimedOut() && was_clamped) {
    *error_code = TabletServerErrorPB::THROTTLED;
    return Status::ServiceUnavailable(s.CloneAndPrepend(
        "could not wait for desired snapshot timestamp to be consistent").ToString());
  }
  RETURN_NOT_OK(s);

  const auto duration_usec = (MonoTime::Now() - before).ToMicroseconds();
  tablet->metrics()->snapshot_read_inflight_wait_duration->Increment(duration_usec);
  TRACE("All operations in snapshot committed. Waited for $0 microseconds", duration_usec);

  tablet::RowIteratorOptions opts;
  opts.projection = &projection;
  opts.snap_to_include = snap;
  opts.order = scan_pb.order_mode();
  if (tmp_snap_start_timestamp.has_value()) {
    // Diff scan: exclude everything visible at the start timestamp and
    // surface deleted rows as well.
    opts.snap_to_exclude = MvccSnapshot(*tmp_snap_start_timestamp);
    opts.include_deleted_rows = true;
  }

  RETURN_NOT_OK(tablet->NewRowIterator(std::move(opts), iter));

  // Return the picked snapshot timestamp for both READ_AT_SNAPSHOT
  // and READ_YOUR_WRITES mode, as well as the parsed start timestamp for
  // READ_AT_SNAPSHOT mode, if specified.
  *snap_start_timestamp = std::move(tmp_snap_start_timestamp);
  *snap_timestamp = tmp_snap_timestamp;
  return Status::OK();
}

// Checks that a client-supplied snapshot timestamp is not in the future.
//
// The upper bound comes from the server clock's GetGlobalLatest(). Outcomes:
//   - The clock reports NotSupported: returns NotSupported, unless
//     --scanner_allow_snapshot_scans_with_logical_timestamps is set. In that
//     case the default-constructed bound is used.
//   - The clock reports any other error: that error is returned instead of
//     being silently ignored.
//   - 'snap_timestamp' exceeds the bound: returns InvalidArgument.
Status TabletServiceImpl::ValidateTimestamp(const Timestamp& snap_timestamp) {
  Timestamp max_allowed_ts;
  Status s = server_->clock()->GetGlobalLatest(&max_allowed_ts);
  if (PREDICT_FALSE(s.IsNotSupported())) {
    if (PREDICT_TRUE(!FLAGS_scanner_allow_snapshot_scans_with_logical_timestamps)) {
      return Status::NotSupported("Snapshot scans not supported on this server",
                                  s.ToString());
    }
    // Logical timestamps are explicitly allowed: fall through with the
    // default-constructed 'max_allowed_ts'.
  } else if (PREDICT_FALSE(!s.ok())) {
    // Without a valid upper bound we cannot tell whether 'snap_timestamp' is
    // in the future, so surface the failure instead of accepting it blindly.
    return s.CloneAndPrepend("unable to determine the maximum allowed snapshot timestamp");
  }

  // Note: if 'max_allowed_ts' is not obtained from clock_->GetGlobalLatest()
  // (e.g. when the logical clock is used), it is guaranteed to be higher than
  // 'snap_timestamp'. This is because 'max_allowed_ts' is default-constructed
  // to kInvalidTimestamp (MAX_LONG - 1).
  if (snap_timestamp > max_allowed_ts) {
    return Status::InvalidArgument(
        Substitute("Snapshot time $0 in the future. Max allowed timestamp is $1",
                   server_->clock()->Stringify(snap_timestamp),
                   server_->clock()->Stringify(max_allowed_ts)));
  }

  return Status::OK();
}

// Chooses the snapshot timestamp for a snapshot scan and stores it in
// '*snap_timestamp'.
//
// If the client propagated a timestamp, the server clock is first updated with
// it, and any failure of that update is returned. The timestamp is then chosen
// by read mode:
//   - READ_AT_SNAPSHOT with no client timestamp: the current clock time.
//   - READ_AT_SNAPSHOT with a client timestamp: that timestamp, after checking
//     it with ValidateTimestamp().
//   - Any other mode (READ_YOUR_WRITES): MAX(propagated timestamp + 1, MVCC
//     clean timestamp). A missing propagated timestamp counts as
//     Timestamp::kMin.
Status TabletServiceImpl::PickAndVerifyTimestamp(const NewScanRequestPB& scan_pb,
                                                 Tablet* tablet,
                                                 Timestamp* snap_timestamp) {
  // Make sure the clock never hands out snapshots below what the client has
  // already observed. A timestamp at or below 'now' is a no-op; one too far in
  // the future makes the update fail, which aborts the scan.
  if (scan_pb.has_propagated_timestamp()) {
    Timestamp client_observed_ts(scan_pb.propagated_timestamp());
    RETURN_NOT_OK(server_->clock()->Update(client_observed_ts));
  }

  Timestamp chosen;
  if (scan_pb.read_mode() != READ_AT_SNAPSHOT) {
    // READ_YOUR_WRITES: neither candidate needs a "too far in the future"
    // check. The MVCC clean timestamp is bounded by safe time, so it is always
    // in the past. The propagated timestamp already passed the clock update
    // above, which would have failed had it been too far in the future.
    const uint64_t clean_ts =
        tablet->mvcc_manager()->GetCleanTimestamp().ToUint64();
    const uint64_t floor_ts = scan_pb.has_propagated_timestamp()
        ? scan_pb.propagated_timestamp()
        : Timestamp::kMin.ToUint64();
    chosen = Timestamp(std::max(floor_ts + 1, clean_ts));
  } else if (scan_pb.has_snap_timestamp()) {
    // READ_AT_SNAPSHOT with an explicit timestamp: accept it only if it is
    // not too far in the future.
    chosen.FromUint64(scan_pb.snap_timestamp());
    RETURN_NOT_OK(ValidateTimestamp(chosen));
  } else {
    // READ_AT_SNAPSHOT without a timestamp: snapshot at the current time.
    chosen = server_->clock()->Now();
  }

  *snap_timestamp = chosen;
  return Status::OK();
}

} // namespace tserver
} // namespace kudu
