blob: 8957ce54b36c80436038a48b509140a2aab12aed [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tserver/tablet_service.h"
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <numeric>
#include <ostream>
#include <string>
#include <type_traits>
#include <unordered_set>
#include <vector>
#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include "kudu/clock/clock.h"
#include "kudu/common/column_predicate.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/encoded_key.h"
#include "kudu/common/iterator.h"
#include "kudu/common/iterator_stats.h"
#include "kudu/common/key_range.h"
#include "kudu/common/partition.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/types.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/consensus/replica_management.pb.h"
#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/rpc_sidecar.h"
#include "kudu/server/server_base.h"
#include "kudu/tablet/compaction.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_metrics.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tablet/transactions/alter_schema_transaction.h"
#include "kudu/tablet/transactions/transaction.h"
#include "kudu/tablet/transactions/write_transaction.h"
#include "kudu/tserver/scanners.h"
#include "kudu/tserver/tablet_replica_lookup.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/tserver/tserver_service.pb.h"
#include "kudu/util/auto_release_pool.h"
#include "kudu/util/crc.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/faststring.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/process_memory.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/status_callback.h"
#include "kudu/util/trace.h"
#include "kudu/util/trace_metrics.h"
// Target size (bytes) of a scan batch when the client does not request one.
DEFINE_int32(scanner_default_batch_size_bytes, 1024 * 1024,
             "The default size for batches of scan results");
TAG_FLAG(scanner_default_batch_size_bytes, advanced);
TAG_FLAG(scanner_default_batch_size_bytes, runtime);

// Upper bound applied to client-requested scan batch sizes.
DEFINE_int32(scanner_max_batch_size_bytes, 8 * 1024 * 1024,
             "The maximum batch size that a client may request for "
             "scan results.");
TAG_FLAG(scanner_max_batch_size_bytes, advanced);
TAG_FLAG(scanner_max_batch_size_bytes, runtime);

DEFINE_int32(scanner_batch_size_rows, 100,
             "The number of rows to batch for servicing scan requests.");
TAG_FLAG(scanner_batch_size_rows, advanced);
TAG_FLAG(scanner_batch_size_rows, runtime);

DEFINE_bool(scanner_allow_snapshot_scans_with_logical_timestamps, false,
            "If set, the server will support snapshot scans with logical timestamps.");
TAG_FLAG(scanner_allow_snapshot_scans_with_logical_timestamps, unsafe);

DEFINE_int32(scanner_max_wait_ms, 1000,
             "The maximum amount of time (in milliseconds) we'll hang a scanner thread waiting for "
             "safe time to advance or transactions to commit, even if its deadline allows waiting "
             "longer.");
TAG_FLAG(scanner_max_wait_ms, advanced);

// Fault injection flags.
DEFINE_int32(scanner_inject_latency_on_each_batch_ms, 0,
             "If set, the scanner will pause the specified number of milliseconds "
             "before reading each batch of data on the tablet server. "
             "Used for tests.");
TAG_FLAG(scanner_inject_latency_on_each_batch_ms, unsafe);

// Flags defined in other translation units and referenced from this file.
DECLARE_bool(raft_prepare_replacement_before_eviction);
DECLARE_int32(memory_limit_warn_threshold_percentage);
DECLARE_int32(tablet_history_max_age_sec);
using google::protobuf::RepeatedPtrField;
using kudu::consensus::BulkChangeConfigRequestPB;
using kudu::consensus::ChangeConfigRequestPB;
using kudu::consensus::ChangeConfigResponsePB;
using kudu::consensus::ConsensusRequestPB;
using kudu::consensus::ConsensusResponsePB;
using kudu::consensus::GetLastOpIdRequestPB;
using kudu::consensus::GetNodeInstanceRequestPB;
using kudu::consensus::GetNodeInstanceResponsePB;
using kudu::consensus::LeaderStepDownRequestPB;
using kudu::consensus::LeaderStepDownResponsePB;
using kudu::consensus::OpId;
using kudu::consensus::RaftConsensus;
using kudu::consensus::RunLeaderElectionRequestPB;
using kudu::consensus::RunLeaderElectionResponsePB;
using kudu::consensus::StartTabletCopyRequestPB;
using kudu::consensus::StartTabletCopyResponsePB;
using kudu::consensus::TimeManager;
using kudu::consensus::UnsafeChangeConfigRequestPB;
using kudu::consensus::UnsafeChangeConfigResponsePB;
using kudu::consensus::VoteRequestPB;
using kudu::consensus::VoteResponsePB;
using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::RpcContext;
using kudu::rpc::RpcSidecar;
using kudu::server::ServerBase;
using kudu::tablet::AlterSchemaTransactionState;
using kudu::tablet::TABLET_DATA_COPYING;
using kudu::tablet::TABLET_DATA_DELETED;
using kudu::tablet::TABLET_DATA_TOMBSTONED;
using kudu::tablet::Tablet;
using kudu::tablet::TabletReplica;
using kudu::tablet::TransactionCompletionCallback;
using kudu::tablet::WriteTransactionState;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace cfile {
extern const char* CFILE_CACHE_MISS_BYTES_METRIC_NAME;
extern const char* CFILE_CACHE_HIT_BYTES_METRIC_NAME;
}
namespace tserver {
namespace {
// Looks up 'tablet_id' through 'tablet_manager', checking only that a replica
// with that ID is registered (its state is not examined).
//
// If the lookup fails for any reason, fills in resp->mutable_error() with
// TABLET_NOT_FOUND and responds to the RPC associated with 'context'.
//
// Returns true iff the lookup succeeded; the replica is stored in '*replica'.
template<class RespClass>
bool LookupTabletReplicaOrRespond(TabletReplicaLookupIf* tablet_manager,
                                  const string& tablet_id,
                                  RespClass* resp,
                                  rpc::RpcContext* context,
                                  scoped_refptr<TabletReplica>* replica) {
  const Status lookup_status = tablet_manager->GetTabletReplica(tablet_id, replica);
  if (PREDICT_TRUE(lookup_status.ok())) {
    return true;
  }
  SetupErrorAndRespond(resp->mutable_error(), lookup_status,
                       TabletServerErrorPB::TABLET_NOT_FOUND, context);
  return false;
}
template<class RespClass>
void RespondTabletNotRunning(const scoped_refptr<TabletReplica>& replica,
tablet::TabletStatePB tablet_state,
RespClass* resp,
rpc::RpcContext* context) {
Status s = Status::IllegalState("Tablet not RUNNING",
tablet::TabletStatePB_Name(tablet_state));
auto error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
if (replica->tablet_metadata()->tablet_data_state() == TABLET_DATA_TOMBSTONED ||
replica->tablet_metadata()->tablet_data_state() == TABLET_DATA_DELETED) {
// Treat tombstoned tablets as if they don't exist for most purposes.
// This takes precedence over failed, since we don't reset the failed
// status of a TabletReplica when deleting it. Only tablet copy does that.
error_code = TabletServerErrorPB::TABLET_NOT_FOUND;
} else if (tablet_state == tablet::FAILED) {
s = s.CloneAndAppend(replica->error().ToString());
error_code = TabletServerErrorPB::TABLET_FAILED;
}
SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
}
// Check if the replica is running.
template<class RespClass>
bool CheckTabletReplicaRunningOrRespond(const scoped_refptr<TabletReplica>& replica,
RespClass* resp,
rpc::RpcContext* context) {
// Check RUNNING state.
tablet::TabletStatePB state = replica->state();
if (PREDICT_FALSE(state != tablet::RUNNING)) {
RespondTabletNotRunning(replica, state, resp, context);
return false;
}
return true;
}
// Looks up 'tablet_id' and verifies that the replica both exists and is
// RUNNING. On either failure, the RPC associated with 'context' has already
// been responded to (with resp->mutable_error() populated) when this returns.
//
// Returns true iff the replica was found and is RUNNING.
template<class RespClass>
bool LookupRunningTabletReplicaOrRespond(TabletReplicaLookupIf* tablet_manager,
                                         const string& tablet_id,
                                         RespClass* resp,
                                         rpc::RpcContext* context,
                                         scoped_refptr<TabletReplica>* replica) {
  // Each helper responds on its own failure; short-circuiting guarantees that
  // at most one response is sent.
  return LookupTabletReplicaOrRespond(tablet_manager, tablet_id, resp, context, replica) &&
         CheckTabletReplicaRunningOrRespond(*replica, resp, context);
}
// Validates that the request's 'dest_uuid' names this server.
//
// - If 'dest_uuid' is present and equals the local permanent UUID, returns true.
// - If it is present but different, logs a warning, responds with
//   WRONG_SERVER_UUID and returns false.
// - If it is absent, the request is accepted for compatibility: release builds
//   log an error (every 100th occurrence) and debug builds LOG(DFATAL).
//   Returns true.
//
// 'method_name' is only used to prefix messages.
template<class ReqClass, class RespClass>
bool CheckUuidMatchOrRespond(TabletReplicaLookupIf* tablet_manager,
                             const char* method_name,
                             const ReqClass* req,
                             RespClass* resp,
                             rpc::RpcContext* context) {
  const string& local_uuid = tablet_manager->NodeInstance().permanent_uuid();
  if (PREDICT_TRUE(req->has_dest_uuid())) {
    if (PREDICT_TRUE(req->dest_uuid() == local_uuid)) {
      return true;
    }
    const Status mismatch = Status::InvalidArgument(Substitute(
        "$0: Wrong destination UUID requested. Local UUID: $1. Requested UUID: $2",
        method_name, local_uuid, req->dest_uuid()));
    LOG(WARNING) << mismatch.ToString() << ": from " << context->requestor_string()
                 << ": " << SecureShortDebugString(*req);
    SetupErrorAndRespond(resp->mutable_error(), mismatch,
                         TabletServerErrorPB::WRONG_SERVER_UUID, context);
    return false;
  }

  // No destination UUID: maintain compat in release mode, but complain.
  const string msg = Substitute("$0: Missing destination UUID in request from $1: $2",
                                method_name, context->requestor_string(),
                                SecureShortDebugString(*req));
#ifdef NDEBUG
  KLOG_EVERY_N(ERROR, 100) << msg;
#else
  LOG(DFATAL) << msg;
#endif
  return true;
}
template<class RespClass>
bool GetConsensusOrRespond(const scoped_refptr<TabletReplica>& replica,
RespClass* resp,
rpc::RpcContext* context,
shared_ptr<RaftConsensus>* consensus_out) {
shared_ptr<RaftConsensus> tmp_consensus = replica->shared_consensus();
if (!tmp_consensus) {
Status s = Status::ServiceUnavailable("Raft Consensus unavailable",
"Tablet replica not initialized");
SetupErrorAndRespond(resp->mutable_error(), s,
TabletServerErrorPB::TABLET_NOT_RUNNING, context);
return false;
}
*consensus_out = std::move(tmp_consensus);
return true;
}
// Stores a reference to the tablet owned by 'replica' into '*tablet' (which
// must be non-null).
//
// If the replica currently has no tablet, sets '*error_code' to
// TABLET_NOT_RUNNING and returns IllegalState. On success '*error_code' is
// not modified.
Status GetTabletRef(const scoped_refptr<TabletReplica>& replica,
                    shared_ptr<Tablet>* tablet,
                    TabletServerErrorPB::Code* error_code) {
  shared_ptr<Tablet>& out = *DCHECK_NOTNULL(tablet);
  out = replica->shared_tablet();
  if (PREDICT_TRUE(out != nullptr)) {
    return Status::OK();
  }
  *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
  return Status::IllegalState("Tablet is not running");
}
// Discards whatever has been written into 'resp' so far, then responds to the
// RPC associated with 'context' with 's' under the UNKNOWN_ERROR code.
template <class RespType>
void HandleUnknownError(const Status& s, RespType* resp, RpcContext* context) {
  // Wipe partial results first so the caller only ever sees the error.
  resp->Clear();
  SetupErrorAndRespond(resp->mutable_error(), s, TabletServerErrorPB::UNKNOWN_ERROR, context);
}
template <class ReqType, class RespType>
void HandleResponse(const ReqType* req, RespType* resp,
RpcContext* context, const Status& s) {
if (PREDICT_FALSE(!s.ok())) {
HandleUnknownError(s, resp, context);
return;
}
context->RespondSuccess();
}
// Returns a status callback that, when invoked with a Status, forwards it to
// HandleResponse() for the given request, response and RPC context. The three
// pointers are captured by value, so they must remain valid until the
// callback runs.
template <class ReqType, class RespType>
static StdStatusCallback BindHandleResponse(
    const ReqType* req,
    RespType* resp,
    RpcContext* context) {
  return [req, resp, context](const Status& s) {
    HandleResponse<ReqType, RespType>(req, resp, context, s);
  };
}
} // namespace
typedef ListTabletsResponsePB::StatusAndSchemaPB StatusAndSchemaPB;
// Responds to the RPC associated with 'context' with error 's'.
//
// A ServiceUnavailable status reported under UNKNOWN_ERROR or THROTTLED is
// turned into an RPC-level ERROR_SERVER_TOO_BUSY failure, which makes the
// client retry later. Any other error is recorded in '*error' (status and
// 'code') and sent with RespondNoCache().
static void SetupErrorAndRespond(TabletServerErrorPB* error,
                                 const Status& s,
                                 TabletServerErrorPB::Code code,
                                 rpc::RpcContext* context) {
  const bool generic_code = code == TabletServerErrorPB::UNKNOWN_ERROR ||
                            code == TabletServerErrorPB::THROTTLED;
  if (generic_code && s.IsServiceUnavailable()) {
    context->RespondRpcFailure(rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY, s);
    return;
  }
  StatusToPB(s, error->mutable_status());
  error->set_code(code);
  context->RespondNoCache();
}
// Clears 'resp' and responds to the RPC associated with 'context' with error
// 's'. If 'error_code' is set, that code is used; otherwise the error is
// reported as UNKNOWN_ERROR. 'req' is not consulted.
template <class ReqType, class RespType>
void HandleErrorResponse(const ReqType* req, RespType* resp, RpcContext* context,
                         const boost::optional<TabletServerErrorPB::Code>& error_code,
                         const Status& s) {
  resp->Clear();
  if (!error_code) {
    HandleUnknownError(s, resp, context);
    return;
  }
  SetupErrorAndRespond(resp->mutable_error(), s, error_code.get(), context);
}
// A transaction completion callback that responds to the RPC when the
// transaction completes.
//
// If the status recorded on this callback ('status_', inherited from
// TransactionCompletionCallback) is not OK, that status and the recorded
// error code ('code_', also inherited) are copied into 'response''s error
// field before responding; otherwise the RPC is answered with success.
//
// 'context' and 'response' are not owned and must remain valid until
// TransactionCompleted() runs.
template<class Response>
class RpcTransactionCompletionCallback : public TransactionCompletionCallback {
 public:
  RpcTransactionCompletionCallback(rpc::RpcContext* context,
                                   Response* response)
      : context_(context),
        response_(response) {}

  virtual void TransactionCompleted() OVERRIDE {
    if (!status_.ok()) {
      SetupErrorAndRespond(get_error(), status_, code_, context_);
    } else {
      context_->RespondSuccess();
    }
  }

 private:
  TabletServerErrorPB* get_error() {
    return response_->mutable_error();
  }

  // Neither pointer is owned. (A previously declared 'state_' member was never
  // initialized nor read, and has been removed.)
  rpc::RpcContext* const context_;
  Response* const response_;
};
// Generic interface to handle scan results.
//
// Implementations receive each scanned RowBlock through HandleRowBlock() and
// report how much response data they have accumulated.
class ScanResultCollector {
 public:
  // Virtual so that a concrete collector can be safely destroyed through a
  // pointer to this interface.
  virtual ~ScanResultCollector() = default;

  // Processes one block of rows produced by 'scanner'.
  virtual void HandleRowBlock(Scanner* scanner,
                              const RowBlock& row_block) = 0;

  // Returns number of bytes which will be returned in the response.
  virtual int64_t ResponseSize() const = 0;

  // Returns the last processed row's primary key.
  virtual const faststring& last_primary_key() const = 0;

  // Return the number of rows actually returned to the client.
  virtual int64_t NumRowsReturned() const = 0;

  // Sets row format flags on the ScanResultCollector.
  //
  // This is a setter instead of a constructor argument passed to specific
  // collector implementations because, currently, the collector is built
  // before the request is decoded and checked for 'row_format_flags'.
  //
  // Does nothing by default.
  virtual void set_row_format_flags(uint64_t /* row_format_flags */) {}
};
namespace {
// Encodes the primary key of the last selected row in 'row_block' into
// '*last_primary_key'. When no row in the block is selected,
// '*last_primary_key' is left untouched.
void SetLastRow(const RowBlock& row_block, faststring* last_primary_key) {
  const SelectionVector* sel = row_block.selection_vector();
  if (!sel->AnySelected()) {
    return;
  }
  // Walk backwards from the end to the last selected row.
  int idx = sel->nrows() - 1;
  while (idx >= 0 && !sel->IsRowSelected(idx)) {
    --idx;
  }
  if (idx < 0) {
    return;
  }
  RowBlockRow last_row = row_block.row(idx);
  last_row.schema()->EncodeComparableKey(last_row, last_primary_key);
}
} // namespace
// Serializes the selected rows of every scanned RowBlock into a row-wise
// block PB plus row/indirect data buffers, for returning to the client.
//
// This is the collector for the common case of a client scan that wants the
// data back, as opposed to collectors that aggregate or checksum rows
// server-side and never ship the rows themselves.
class ScanResultCopier : public ScanResultCollector {
 public:
  // The three output buffers are not owned; they must be non-null and remain
  // valid for as long as this collector is used.
  ScanResultCopier(RowwiseRowBlockPB* rowblock_pb,
                   faststring* rows_data,
                   faststring* indirect_data)
      : rowblock_pb_(DCHECK_NOTNULL(rowblock_pb)),
        rows_data_(DCHECK_NOTNULL(rows_data)),
        indirect_data_(DCHECK_NOTNULL(indirect_data)) {}

  void HandleRowBlock(Scanner* scanner, const RowBlock& row_block) override {
    const int64_t selected = row_block.selection_vector()->CountSelected();
    // A block with nothing selected (no predicate matches, or every row was
    // deleted) contributes nothing, so all bookkeeping is skipped for it.
    if (selected != 0) {
      num_rows_returned_ += selected;
      scanner->add_num_rows_returned(selected);
      SerializeRowBlock(row_block, rowblock_pb_, scanner->client_projection_schema(),
                        rows_data_, indirect_data_, pad_unixtime_micros_to_16_bytes_);
      SetLastRow(row_block, &last_primary_key_);
    }
  }

  // Total bytes buffered so far across the row and indirect data buffers.
  int64_t ResponseSize() const override {
    const size_t buffered_bytes = rows_data_->size() + indirect_data_->size();
    return buffered_bytes;
  }

  const faststring& last_primary_key() const override { return last_primary_key_; }

  int64_t NumRowsReturned() const override { return num_rows_returned_; }

  // Padding can only be switched on here; a later call without the flag does
  // not switch it back off.
  void set_row_format_flags(uint64_t row_format_flags) override {
    if (row_format_flags & RowFormatFlags::PAD_UNIX_TIME_MICROS_TO_16_BYTES) {
      pad_unixtime_micros_to_16_bytes_ = true;
    }
  }

 private:
  RowwiseRowBlockPB* const rowblock_pb_;
  faststring* const rows_data_;
  faststring* const indirect_data_;
  int64_t num_rows_returned_ = 0;
  faststring last_primary_key_;
  bool pad_unixtime_micros_to_16_bytes_ = false;

  DISALLOW_COPY_AND_ASSIGN(ScanResultCopier);
};
// Checksums the scan result.
//
// Instead of returning rows, this collector computes a CRC32C over each
// selected row (using the client projection schema when the scanner has one,
// else the RowBlock's own schema) and adds the per-row CRCs into a 64-bit
// running sum. Unsigned arithmetic means the sum wraps modulo 2^64.
// NOTE(review): the exact bytes fed into the CRC (see CalcRowCrc32) determine
// the checksum value; changing them changes results seen by callers.
class ScanResultChecksummer : public ScanResultCollector {
public:
ScanResultChecksummer()
: crc_(crc::GetCrc32cInstance()),
agg_checksum_(0),
rows_checksummed_(0) {
}
// Adds the CRC of every selected row in 'row_block' to the aggregate checksum
// and records the encoded key of the block's last selected row.
virtual void HandleRowBlock(Scanner* scanner,
const RowBlock& row_block) OVERRIDE {
const Schema* client_projection_schema = scanner->client_projection_schema();
if (!client_projection_schema) {
// No client projection: fall back to the schema of the block itself.
client_projection_schema = &row_block.schema();
}
size_t nrows = row_block.nrows();
for (size_t i = 0; i < nrows; i++) {
if (!row_block.selection_vector()->IsRowSelected(i)) continue;
uint32_t row_crc = CalcRowCrc32(*client_projection_schema, row_block.row(i));
agg_checksum_ += row_crc;
rows_checksummed_++;
}
// Find the last selected row and save its encoded key.
SetLastRow(row_block, &encoded_last_row_);
}
// Returns a constant -- we only return checksum based on a time budget.
virtual int64_t ResponseSize() const OVERRIDE { return sizeof(agg_checksum_); }
virtual const faststring& last_primary_key() const OVERRIDE { return encoded_last_row_; }
// No rows are shipped to the client by this collector, so this is always 0;
// see rows_checksummed() for the number of rows processed.
virtual int64_t NumRowsReturned() const OVERRIDE {
return 0;
}
// Number of selected rows folded into the checksum so far.
int64_t rows_checksummed() const {
return rows_checksummed_;
}
// Accessors for initializing / setting the checksum.
void set_agg_checksum(uint64_t value) { agg_checksum_ = value; }
uint64_t agg_checksum() const { return agg_checksum_; }
private:
// Calculates a CRC32C for the given row.
//
// For each column j of 'projection', the buffer receives, in order:
//  - j as a 4-byte uint32_t (native byte order);
//  - for nullable columns, a 1-byte "is defined" flag (0 = NULL, 1 = set);
//    a NULL cell contributes nothing further;
//  - for BINARY physical types, the bytes the Slice points to (not the Slice
//    struct itself); for other types, the cell's raw fixed-size bytes.
// The CRC of the whole buffer is then computed starting from 0.
uint32_t CalcRowCrc32(const Schema& projection, const RowBlockRow& row) {
tmp_buf_.clear();
for (size_t j = 0; j < projection.num_columns(); j++) {
uint32_t col_index = static_cast<uint32_t>(j); // For the CRC.
tmp_buf_.append(&col_index, sizeof(col_index));
ColumnBlockCell cell = row.cell(j);
if (cell.is_nullable()) {
uint8_t is_defined = cell.is_null() ? 0 : 1;
tmp_buf_.append(&is_defined, sizeof(is_defined));
if (!is_defined) continue;
}
if (cell.typeinfo()->physical_type() == BINARY) {
// The cell stores a Slice; hash the referenced bytes, not the pointer.
const Slice* data = reinterpret_cast<const Slice *>(cell.ptr());
tmp_buf_.append(data->data(), data->size());
} else {
tmp_buf_.append(cell.ptr(), cell.size());
}
}
uint64_t row_crc = 0;
crc_->Compute(tmp_buf_.data(), tmp_buf_.size(), &row_crc, nullptr);
return static_cast<uint32_t>(row_crc); // CRC32 only uses the lower 32 bits.
}
// Scratch buffer reused across rows to avoid reallocating per row.
faststring tmp_buf_;
crc::Crc* const crc_;
uint64_t agg_checksum_;
int64_t rows_checksummed_;
faststring encoded_last_row_;
DISALLOW_COPY_AND_ASSIGN(ScanResultChecksummer);
};
// Returns the response-size hint, in bytes, to use for 'req'.
//
// A request without 'batch_size_bytes' gets --scanner_default_batch_size_bytes
// as-is. Otherwise the requested size is capped at
// --scanner_max_batch_size_bytes. The value is a soft threshold rather than a
// hard limit: a batch may overshoot it, hopefully only slightly.
static size_t GetMaxBatchSizeBytesHint(const ScanRequestPB* req) {
  if (req->has_batch_size_bytes()) {
    const uint32_t server_cap = implicit_cast<uint32_t>(FLAGS_scanner_max_batch_size_bytes);
    return std::min(req->batch_size_bytes(), server_cap);
  }
  return FLAGS_scanner_default_batch_size_bytes;
}
TabletServiceImpl::TabletServiceImpl(TabletServer* server)
    : TabletServerServiceIf(server->metric_entity(), server->result_tracker()),
      server_(server) {}

// Permits super-users, regular users and service users.
bool TabletServiceImpl::AuthorizeClientOrServiceUser(const google::protobuf::Message* /*req*/,
                                                     google::protobuf::Message* /*resp*/,
                                                     rpc::RpcContext* context) {
  const auto allowed_roles =
      ServerBase::SUPER_USER | ServerBase::USER | ServerBase::SERVICE_USER;
  return server_->Authorize(context, allowed_roles);
}

// Permits super-users and regular users.
bool TabletServiceImpl::AuthorizeClient(const google::protobuf::Message* /*req*/,
                                        google::protobuf::Message* /*resp*/,
                                        rpc::RpcContext* context) {
  const auto allowed_roles = ServerBase::SUPER_USER | ServerBase::USER;
  return server_->Authorize(context, allowed_roles);
}

// Liveness check: always answers with success.
void TabletServiceImpl::Ping(const PingRequestPB* /*req*/,
                             PingResponsePB* /*resp*/,
                             rpc::RpcContext* context) {
  context->RespondSuccess();
}
TabletServiceAdminImpl::TabletServiceAdminImpl(TabletServer* server)
    : TabletServerAdminServiceIf(server->metric_entity(), server->result_tracker()),
      server_(server) {}

// Permits super-users and service users only.
bool TabletServiceAdminImpl::AuthorizeServiceUser(const google::protobuf::Message* /*req*/,
                                                  google::protobuf::Message* /*resp*/,
                                                  rpc::RpcContext* context) {
  const auto allowed_roles = ServerBase::SUPER_USER | ServerBase::SERVICE_USER;
  return server_->Authorize(context, allowed_roles);
}
// Handles an AlterSchema RPC for a RUNNING replica.
//
// - If the tablet is already at the requested schema version and the schemas
//   match, responds with success immediately (the alter was already applied).
// - If the versions match but the schemas differ (confirmed by re-reading the
//   version), responds with MISMATCHED_SCHEMA / Corruption.
// - If the tablet's schema version is newer than the request's, responds with
//   TABLET_HAS_A_NEWER_SCHEMA.
// - Otherwise submits an alter-schema transaction; the RPC is then answered
//   asynchronously by RpcTransactionCompletionCallback.
void TabletServiceAdminImpl::AlterSchema(const AlterSchemaRequestPB* req,
AlterSchemaResponsePB* resp,
rpc::RpcContext* context) {
if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "AlterSchema", req, resp, context)) {
return;
}
DVLOG(3) << "Received Alter Schema RPC: " << SecureDebugString(*req);
scoped_refptr<TabletReplica> replica;
if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp,
context, &replica)) {
return;
}
uint32_t schema_version = replica->tablet_metadata()->schema_version();
// If the schema was already applied, respond as succeeded.
if (schema_version == req->schema_version()) {
// Sanity check, to verify that the tablet should have the same schema
// specified in the request.
Schema req_schema;
Status s = SchemaFromPB(req->schema(), &req_schema);
if (!s.ok()) {
SetupErrorAndRespond(resp->mutable_error(), s,
TabletServerErrorPB::INVALID_SCHEMA, context);
return;
}
Schema tablet_schema = replica->tablet_metadata()->schema();
if (req_schema.Equals(tablet_schema)) {
context->RespondSuccess();
return;
}
// The schemas differ. Re-read the version before declaring corruption.
// NOTE(review): presumably this tolerates the tablet's schema having changed
// since the first read; if the version moved on, control falls through to
// the "newer schema" check below instead.
schema_version = replica->tablet_metadata()->schema_version();
if (schema_version == req->schema_version()) {
LOG(ERROR) << "The current schema does not match the request schema."
<< " version=" << schema_version
<< " current-schema=" << tablet_schema.ToString()
<< " request-schema=" << req_schema.ToString()
<< " (corruption)";
SetupErrorAndRespond(resp->mutable_error(),
Status::Corruption("got a different schema for the same version number"),
TabletServerErrorPB::MISMATCHED_SCHEMA, context);
return;
}
}
// If the current schema is newer than the one in the request reject the request.
if (schema_version > req->schema_version()) {
SetupErrorAndRespond(resp->mutable_error(),
Status::InvalidArgument("Tablet has a newer schema"),
TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA, context);
return;
}
unique_ptr<AlterSchemaTransactionState> tx_state(
new AlterSchemaTransactionState(replica.get(), req, resp));
// The callback responds to the RPC once the transaction completes.
tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
new RpcTransactionCompletionCallback<AlterSchemaResponsePB>(context,
resp)));
// Submit the alter schema op. The RPC will be responded to asynchronously.
Status s = replica->SubmitAlterSchema(std::move(tx_state));
if (PREDICT_FALSE(!s.ok())) {
SetupErrorAndRespond(resp->mutable_error(), s,
TabletServerErrorPB::UNKNOWN_ERROR,
context);
return;
}
}
// Handles a CreateTablet RPC: decodes the schema, partition schema and
// partition from the request, then asks the tablet manager to create the
// tablet. Responds with INVALID_SCHEMA if decoding fails,
// TABLET_ALREADY_EXISTS if the tablet manager reports AlreadyPresent,
// UNKNOWN_ERROR for any other creation failure, and success otherwise.
void TabletServiceAdminImpl::CreateTablet(const CreateTabletRequestPB* req,
                                          CreateTabletResponsePB* resp,
                                          rpc::RpcContext* context) {
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "CreateTablet", req, resp, context)) {
    return;
  }
  TRACE_EVENT1("tserver", "CreateTablet",
               "tablet_id", req->tablet_id());

  Schema schema;
  Status s = SchemaFromPB(req->schema(), &schema);
  if (!s.ok()) {
    SetupErrorAndRespond(resp->mutable_error(),
                         Status::InvalidArgument("Invalid Schema."),
                         TabletServerErrorPB::INVALID_SCHEMA, context);
    return;
  }
  // Checked only after a successful decode: a schema whose decoding failed may
  // be incomplete, and asserting on it would abort debug builds on malformed
  // requests instead of answering them with INVALID_SCHEMA.
  DCHECK(schema.has_column_ids());

  PartitionSchema partition_schema;
  s = PartitionSchema::FromPB(req->partition_schema(), schema, &partition_schema);
  if (!s.ok()) {
    SetupErrorAndRespond(resp->mutable_error(),
                         Status::InvalidArgument("Invalid PartitionSchema."),
                         TabletServerErrorPB::INVALID_SCHEMA, context);
    return;
  }

  Partition partition;
  Partition::FromPB(req->partition(), &partition);

  LOG(INFO) << "Processing CreateTablet for tablet " << req->tablet_id()
            << " (table=" << req->table_name()
            << " [id=" << req->table_id() << "]), partition="
            << partition_schema.PartitionDebugString(partition, schema);
  VLOG(1) << "Full request: " << SecureDebugString(*req);

  s = server_->tablet_manager()->CreateNewTablet(req->table_id(),
                                                 req->tablet_id(),
                                                 partition,
                                                 req->table_name(),
                                                 schema,
                                                 partition_schema,
                                                 req->config(),
                                                 nullptr);
  if (PREDICT_FALSE(!s.ok())) {
    const TabletServerErrorPB::Code code = s.IsAlreadyPresent()
        ? TabletServerErrorPB::TABLET_ALREADY_EXISTS
        : TabletServerErrorPB::UNKNOWN_ERROR;
    SetupErrorAndRespond(resp->mutable_error(), s, code, context);
    return;
  }
  context->RespondSuccess();
}
void TabletServiceAdminImpl::DeleteTablet(const DeleteTabletRequestPB* req,
DeleteTabletResponsePB* resp,
rpc::RpcContext* context) {
if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "DeleteTablet", req, resp, context)) {
return;
}
TRACE_EVENT2("tserver", "DeleteTablet",
"tablet_id", req->tablet_id(),
"reason", req->reason());
tablet::TabletDataState delete_type = tablet::TABLET_DATA_UNKNOWN;
if (req->has_delete_type()) {
delete_type = req->delete_type();
}
LOG(INFO) << "Processing DeleteTablet for tablet " << req->tablet_id()
<< " with delete_type " << TabletDataState_Name(delete_type)
<< (req->has_reason() ? (" (" + req->reason() + ")") : "")
<< " from " << context->requestor_string();
VLOG(1) << "Full request: " << SecureDebugString(*req);
boost::optional<int64_t> cas_config_opid_index_less_or_equal;
if (req->has_cas_config_opid_index_less_or_equal()) {
cas_config_opid_index_less_or_equal = req->cas_config_opid_index_less_or_equal();
}
auto response_callback = [context, req, resp](const Status& s, TabletServerErrorPB::Code code) {
if (PREDICT_FALSE(!s.ok())) {
HandleErrorResponse(req, resp, context, code, s);
return;
}
context->RespondSuccess();
};
server_->tablet_manager()->DeleteTabletAsync(req->tablet_id(),
delete_type,
cas_config_opid_index_less_or_equal,
response_callback);
}
// Handles a Write RPC. The steps, in order:
//  1. resolve a RUNNING replica and its tablet;
//  2. apply admission control (per-tablet throttling, then the process soft
//     memory limit);
//  3. check that the clock supports the requested external consistency mode;
//  4. if the client propagated a timestamp, advance the local clock past it;
//  5. submit the write transaction.
// Every early exit responds immediately with an error. After a successful
// submission, the RPC is answered asynchronously by the transaction's
// completion callback.
void TabletServiceImpl::Write(const WriteRequestPB* req,
                              WriteResponsePB* resp,
                              rpc::RpcContext* context) {
  TRACE_EVENT1("tserver", "TabletServiceImpl::Write",
               "tablet_id", req->tablet_id());
  DVLOG(3) << "Received Write RPC: " << SecureDebugString(*req);

  scoped_refptr<TabletReplica> replica;
  if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp,
                                           context, &replica)) {
    return;
  }

  shared_ptr<Tablet> tablet;
  TabletServerErrorPB::Code error_code;
  Status s = GetTabletRef(replica, &tablet, &error_code);
  if (PREDICT_FALSE(!s.ok())) {
    SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
    return;
  }

  // Throttle on the encoded size of the row operations.
  uint64_t bytes = req->row_operations().rows().size() +
                   req->row_operations().indirect_data().size();
  if (!tablet->ShouldThrottleAllow(bytes)) {
    SetupErrorAndRespond(resp->mutable_error(),
                         Status::ServiceUnavailable("Rejecting Write request: throttled"),
                         TabletServerErrorPB::THROTTLED,
                         context);
    return;
  }

  // Check for memory pressure; don't bother doing any additional work if we've
  // exceeded the limit.
  double capacity_pct;
  if (process_memory::SoftLimitExceeded(&capacity_pct)) {
    // NOTE(review): a Tablet may be built without metrics (e.g. no metric
    // registry), so guard instead of dereferencing unconditionally.
    if (tablet->metrics()) {
      tablet->metrics()->leader_memory_pressure_rejections->Increment();
    }
    string msg = StringPrintf("Soft memory limit exceeded (at %.2f%% of capacity)", capacity_pct);
    if (capacity_pct >= FLAGS_memory_limit_warn_threshold_percentage) {
      KLOG_EVERY_N_SECS(WARNING, 1) << "Rejecting Write request: " << msg << THROTTLE_MSG;
    } else {
      KLOG_EVERY_N_SECS(INFO, 1) << "Rejecting Write request: " << msg << THROTTLE_MSG;
    }
    SetupErrorAndRespond(resp->mutable_error(), Status::ServiceUnavailable(msg),
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
    return;
  }

  if (!server_->clock()->SupportsExternalConsistencyMode(req->external_consistency_mode())) {
    // Build the status inline rather than in a new local that would shadow 's'.
    SetupErrorAndRespond(resp->mutable_error(),
                         Status::NotSupported("The configured clock does not support the"
                                              " required consistency mode."),
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
    return;
  }

  unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(
      replica.get(),
      req,
      context->AreResultsTracked() ? context->request_id() : nullptr,
      resp));

  // If the client sent us a timestamp, decode it and update the clock so that all future
  // timestamps are greater than the passed timestamp.
  if (req->has_propagated_timestamp()) {
    Timestamp ts(req->propagated_timestamp());
    s = server_->clock()->Update(ts);
    // Check this status here, where it is produced, instead of re-testing a
    // stale 's' when no timestamp was propagated.
    if (PREDICT_FALSE(!s.ok())) {
      SetupErrorAndRespond(resp->mutable_error(), s,
                           TabletServerErrorPB::UNKNOWN_ERROR,
                           context);
      return;
    }
  }

  tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
      new RpcTransactionCompletionCallback<WriteResponsePB>(context,
                                                            resp)));

  // Submit the write. The RPC will be responded to asynchronously.
  s = replica->SubmitWrite(std::move(tx_state));

  // Check that we could submit the write
  if (PREDICT_FALSE(!s.ok())) {
    SetupErrorAndRespond(resp->mutable_error(), s,
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
  }
}
ConsensusServiceImpl::ConsensusServiceImpl(ServerBase* server,
                                           TabletReplicaLookupIf* tablet_manager)
    : ConsensusServiceIf(server->metric_entity(), server->result_tracker()),
      server_(server),
      tablet_manager_(tablet_manager) {}

ConsensusServiceImpl::~ConsensusServiceImpl() = default;

// Permits super-users and service users only.
bool ConsensusServiceImpl::AuthorizeServiceUser(const google::protobuf::Message* /*req*/,
                                                google::protobuf::Message* /*resp*/,
                                                rpc::RpcContext* rpc) {
  const auto allowed_roles = ServerBase::SUPER_USER | ServerBase::SERVICE_USER;
  return server_->Authorize(rpc, allowed_roles);
}
void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req,
ConsensusResponsePB* resp,
rpc::RpcContext* context) {
DVLOG(3) << "Received Consensus Update RPC: " << SecureDebugString(*req);
if (!CheckUuidMatchOrRespond(tablet_manager_, "UpdateConsensus", req, resp, context)) {
return;
}
scoped_refptr<TabletReplica> replica;
if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
&replica)) {
return;
}
// Submit the update directly to the TabletReplica's RaftConsensus instance.
shared_ptr<RaftConsensus> consensus;
if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
Status s = consensus->Update(req, resp);
if (PREDICT_FALSE(!s.ok())) {
// Clear the response first, since a partially-filled response could
// result in confusing a caller, or in having missing required fields
// in embedded optional messages.
resp->Clear();
SetupErrorAndRespond(resp->mutable_error(), s,
TabletServerErrorPB::UNKNOWN_ERROR,
context);
return;
}
context->RespondSuccess();
}
// Handles a Raft RequestVote RPC.
//
// Unlike the other consensus handlers in this file, the replica is looked up
// with LookupTabletReplicaOrRespond() rather than the "Running" variant, so a
// replica that is not RUNNING can still be asked to vote. Replicas whose data
// state is TABLET_DATA_DELETED are refused; for COPYING or TOMBSTONED replicas
// the last-logged OpId recorded in the TabletMetadata is supplied to
// RaftConsensus::RequestVote() via TabletVotingState.
void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req,
                                                VoteResponsePB* resp,
                                                rpc::RpcContext* context) {
  DVLOG(3) << "Received Consensus Request Vote RPC: " << SecureDebugString(*req);
  if (!CheckUuidMatchOrRespond(tablet_manager_, "RequestConsensusVote", req, resp, context)) {
    return;
  }
  // Because the last-logged opid is stored in the TabletMetadata we go through
  // the following dance:
  // 1. Get a reference to the currently-registered TabletReplica.
  // 2. Fetch (non-atomically) the current data state and last-logged opid from
  // the TabletMetadata.
  // 3. If the data state is COPYING or TOMBSTONED, pass the last-logged opid
  // from the TabletMetadata into RaftConsensus::RequestVote().
  //
  // The reason this sequence is safe to do without atomic locks is the
  // RaftConsensus object associated with the TabletReplica will be Shutdown()
  // and thus unable to vote if another TabletCopy operation comes between
  // steps 1 and 3.
  //
  // TODO(mpercy): Move the last-logged opid into ConsensusMetadata to avoid
  // this hacky plumbing. An additional benefit would be that we would be able
  // to easily "tombstoned vote" while the tablet is bootstrapping.
  scoped_refptr<TabletReplica> replica;
  // Step 1: no RUNNING requirement here, see the function comment.
  if (!LookupTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, &replica)) {
    return;
  }
  // Left unset (boost::none) unless the replica is COPYING or TOMBSTONED.
  boost::optional<OpId> last_logged_opid;
  // Step 2: snapshot the data state once; every decision below uses this copy.
  tablet::TabletDataState data_state = replica->tablet_metadata()->tablet_data_state();
  LOG(INFO) << "Received RequestConsensusVote() RPC: " << SecureShortDebugString(*req);
  // We cannot vote while DELETED. This check is not racy because DELETED is a
  // terminal state; it is not possible to transition out of DELETED.
  if (data_state == TABLET_DATA_DELETED) {
    RespondTabletNotRunning(replica, replica->state(), resp, context);
    return;
  }
  // Attempt to vote while copying or tombstoned.
  if (data_state == TABLET_DATA_COPYING || data_state == TABLET_DATA_TOMBSTONED) {
    last_logged_opid = replica->tablet_metadata()->tombstone_last_logged_opid();
  }
  // Submit the vote request directly to the consensus instance.
  shared_ptr<RaftConsensus> consensus;
  if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
  // Step 3: the opid (possibly none) and data state travel together to Raft.
  Status s = consensus->RequestVote(req,
                                    consensus::TabletVotingState(std::move(last_logged_opid),
                                                                 data_state),
                                    resp);
  if (PREDICT_FALSE(!s.ok())) {
    SetupErrorAndRespond(resp->mutable_error(), s,
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
    return;
  }
  context->RespondSuccess();
}
void ConsensusServiceImpl::ChangeConfig(const ChangeConfigRequestPB* req,
ChangeConfigResponsePB* resp,
RpcContext* context) {
VLOG(1) << "Received ChangeConfig RPC: " << SecureDebugString(*req);
if (!CheckUuidMatchOrRespond(tablet_manager_, "ChangeConfig", req, resp, context)) {
return;
}
scoped_refptr<TabletReplica> replica;
if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
&replica)) {
return;
}
shared_ptr<RaftConsensus> consensus;
if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
boost::optional<TabletServerErrorPB::Code> error_code;
Status s = consensus->ChangeConfig(*req, BindHandleResponse(req, resp, context), &error_code);
if (PREDICT_FALSE(!s.ok())) {
HandleErrorResponse(req, resp, context, error_code, s);
return;
}
// The success case is handled when the callback fires.
}
void ConsensusServiceImpl::BulkChangeConfig(const BulkChangeConfigRequestPB* req,
ChangeConfigResponsePB* resp,
RpcContext* context) {
VLOG(1) << "Received BulkChangeConfig RPC: " << SecureDebugString(*req);
if (!CheckUuidMatchOrRespond(tablet_manager_, "BulkChangeConfig", req, resp, context)) {
return;
}
scoped_refptr<TabletReplica> replica;
if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
&replica)) {
return;
}
shared_ptr<RaftConsensus> consensus;
if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
boost::optional<TabletServerErrorPB::Code> error_code;
Status s = consensus->BulkChangeConfig(*req, BindHandleResponse(req, resp, context), &error_code);
if (PREDICT_FALSE(!s.ok())) {
HandleErrorResponse(req, resp, context, error_code, s);
return;
}
// The success case is handled when the callback fires.
}
void ConsensusServiceImpl::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB* req,
UnsafeChangeConfigResponsePB* resp,
RpcContext* context) {
LOG(INFO) << "Received UnsafeChangeConfig RPC: " << SecureDebugString(*req)
<< " from " << context->requestor_string();
if (!CheckUuidMatchOrRespond(tablet_manager_, "UnsafeChangeConfig", req, resp, context)) {
return;
}
scoped_refptr<TabletReplica> replica;
if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
&replica)) {
return;
}
shared_ptr<RaftConsensus> consensus;
if (!GetConsensusOrRespond(replica, resp, context, &consensus)) {
return;
}
boost::optional<TabletServerErrorPB::Code> error_code;
const Status s = consensus->UnsafeChangeConfig(*req, &error_code);
if (PREDICT_FALSE(!s.ok())) {
HandleErrorResponse(req, resp, context, error_code, s);
return;
}
context->RespondSuccess();
}
// Replies with the node instance reported by the tablet manager. No UUID
// check is performed, so the caller can use this to learn who it is talking to.
void ConsensusServiceImpl::GetNodeInstance(const GetNodeInstanceRequestPB* req,
                                           GetNodeInstanceResponsePB* resp,
                                           rpc::RpcContext* context) {
  VLOG(1) << "Received Get Node Instance RPC: " << SecureDebugString(*req);
  *resp->mutable_node_instance() = tablet_manager_->NodeInstance();
  context->RespondSuccess();
}
void ConsensusServiceImpl::RunLeaderElection(const RunLeaderElectionRequestPB* req,
RunLeaderElectionResponsePB* resp,
rpc::RpcContext* context) {
LOG(INFO) << "Received Run Leader Election RPC: " << SecureDebugString(*req)
<< " from " << context->requestor_string();
if (!CheckUuidMatchOrRespond(tablet_manager_, "RunLeaderElection", req, resp, context)) {
return;
}
scoped_refptr<TabletReplica> replica;
if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
&replica)) {
return;
}
shared_ptr<RaftConsensus> consensus;
if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
Status s = consensus->StartElection(
consensus::RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE,
consensus::RaftConsensus::EXTERNAL_REQUEST);
if (PREDICT_FALSE(!s.ok())) {
SetupErrorAndRespond(resp->mutable_error(), s,
TabletServerErrorPB::UNKNOWN_ERROR,
context);
return;
}
context->RespondSuccess();
}
void ConsensusServiceImpl::LeaderStepDown(const LeaderStepDownRequestPB* req,
LeaderStepDownResponsePB* resp,
RpcContext* context) {
LOG(INFO) << "Received LeaderStepDown RPC: " << SecureDebugString(*req)
<< " from " << context->requestor_string();
if (!CheckUuidMatchOrRespond(tablet_manager_, "LeaderStepDown", req, resp, context)) {
return;
}
scoped_refptr<TabletReplica> replica;
if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
&replica)) {
return;
}
shared_ptr<RaftConsensus> consensus;
if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
Status s = consensus->StepDown(resp);
if (PREDICT_FALSE(!s.ok())) {
SetupErrorAndRespond(resp->mutable_error(), s,
TabletServerErrorPB::UNKNOWN_ERROR,
context);
return;
}
context->RespondSuccess();
}
// Returns the replica's last OpId of the kind named by 'opid_type'.
//
// Errors:
//  - replica not RUNNING              -> TABLET_NOT_RUNNING (ServiceUnavailable)
//  - opid_type == UNKNOWN_OPID_TYPE   -> InvalidArgument via HandleUnknownError()
//  - Raft has no OpId to report       -> TABLET_NOT_RUNNING (IllegalState)
void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *req,
                                       consensus::GetLastOpIdResponsePB *resp,
                                       rpc::RpcContext *context) {
  DVLOG(3) << "Received GetLastOpId RPC: " << SecureDebugString(*req);
  if (!CheckUuidMatchOrRespond(tablet_manager_, "GetLastOpId", req, resp, context)) {
    return;
  }

  scoped_refptr<TabletReplica> tablet_replica;
  if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
                                           &tablet_replica)) {
    return;
  }
  if (tablet_replica->state() != tablet::RUNNING) {
    SetupErrorAndRespond(resp->mutable_error(),
                         Status::ServiceUnavailable("TabletReplica not in RUNNING state"),
                         TabletServerErrorPB::TABLET_NOT_RUNNING, context);
    return;
  }

  shared_ptr<RaftConsensus> raft;
  if (!GetConsensusOrRespond(tablet_replica, resp, context, &raft)) {
    return;
  }

  const auto requested_type = req->opid_type();
  if (PREDICT_FALSE(requested_type == consensus::UNKNOWN_OPID_TYPE)) {
    HandleUnknownError(Status::InvalidArgument("Invalid opid_type specified to GetLastOpId()"),
                       resp, context);
    return;
  }

  const boost::optional<OpId> last_opid = raft->GetLastOpId(requested_type);
  if (!last_opid) {
    SetupErrorAndRespond(resp->mutable_error(),
                         Status::IllegalState("Cannot fetch last OpId in WAL"),
                         TabletServerErrorPB::TABLET_NOT_RUNNING,
                         context);
    return;
  }
  *resp->mutable_opid() = *last_opid;
  context->RespondSuccess();
}
// Reports the Raft consensus state of tablet replicas hosted on this server.
//
// If 'tablet_ids' is empty every replica is considered; otherwise only the
// listed ones. Replicas without a consensus instance, or whose
// ConsensusState() call fails, are silently omitted. The failure is expected
// to be IllegalState; this is DCHECKed. The response also carries the replica
// replacement scheme selected by --raft_prepare_replacement_before_eviction.
void ConsensusServiceImpl::GetConsensusState(const consensus::GetConsensusStateRequestPB* req,
                                             consensus::GetConsensusStateResponsePB* resp,
                                             rpc::RpcContext* context) {
  DVLOG(3) << "Received GetConsensusState RPC: " << SecureDebugString(*req);
  if (!CheckUuidMatchOrRespond(tablet_manager_, "GetConsensusState", req, resp, context)) {
    return;
  }

  const unordered_set<string> wanted_ids(req->tablet_ids().begin(), req->tablet_ids().end());
  vector<scoped_refptr<TabletReplica>> replicas;
  tablet_manager_->GetTabletReplicas(&replicas);

  for (const auto& replica : replicas) {
    const bool is_wanted = wanted_ids.empty() || ContainsKey(wanted_ids, replica->tablet_id());
    if (!is_wanted) {
      continue;
    }

    const shared_ptr<RaftConsensus> raft = replica->shared_consensus();
    if (!raft) {
      continue;
    }

    consensus::GetConsensusStateResponsePB_TabletConsensusInfoPB info;
    const Status state_status = raft->ConsensusState(info.mutable_cstate(),
                                                     req->report_health());
    if (!state_status.ok()) {
      DCHECK(state_status.IsIllegalState()) << state_status.ToString();
      continue;
    }
    info.set_tablet_id(replica->tablet_id());
    *resp->add_tablets() = std::move(info);
  }

  resp->mutable_replica_management_info()->set_replacement_scheme(
      FLAGS_raft_prepare_replacement_before_eviction
          ? consensus::ReplicaManagementInfoPB::PREPARE_REPLACEMENT_BEFORE_EVICTION
          : consensus::ReplicaManagementInfoPB::EVICT_FIRST);
  context->RespondSuccess();
}
// Kicks off a tablet copy through the tablet manager. The RPC is answered from
// the completion callback. Failures are written into 'resp' verbatim, without
// SetupErrorAndRespond()'s status transformation, and sent with
// RespondNoCache().
void ConsensusServiceImpl::StartTabletCopy(const StartTabletCopyRequestPB* req,
                                           StartTabletCopyResponsePB* resp,
                                           rpc::RpcContext* context) {
  if (!CheckUuidMatchOrRespond(tablet_manager_, "StartTabletCopy", req, resp, context)) {
    return;
  }

  auto on_copy_done = [context, resp](const Status& status, TabletServerErrorPB::Code code) {
    if (status.ok()) {
      context->RespondSuccess();
      return;
    }
    // Skip calling SetupErrorAndRespond since this path doesn't need the
    // error to be transformed.
    TabletServerErrorPB* error = resp->mutable_error();
    StatusToPB(status, error->mutable_status());
    error->set_code(code);
    context->RespondNoCache();
  };
  tablet_manager_->StartTabletCopy(req, on_copy_done);
}
void TabletServiceImpl::ScannerKeepAlive(const ScannerKeepAliveRequestPB *req,
ScannerKeepAliveResponsePB *resp,
rpc::RpcContext *context) {
DCHECK(req->has_scanner_id());
SharedScanner scanner;
if (!server_->scanner_manager()->LookupScanner(req->scanner_id(), &scanner)) {
resp->mutable_error()->set_code(TabletServerErrorPB::SCANNER_EXPIRED);
Status s = Status::NotFound(Substitute("Scanner $0 not found (it may have expired)",
req->scanner_id()));
StatusToPB(s, resp->mutable_error()->mutable_status());
LOG(INFO) << Substitute("ScannerKeepAlive: $0: remote=$1",
s.ToString(), context->requestor_string());
context->RespondSuccess();
return;
}
scanner->UpdateAccessTime();
context->RespondSuccess();
}
namespace {
// Copies the CFile cache hit/miss byte counters accumulated in the RPC's
// trace into the response's ResourceMetricsPB.
void SetResourceMetrics(ResourceMetricsPB* metrics, rpc::RpcContext* context) {
  auto* trace_metrics = context->trace()->metrics();
  metrics->set_cfile_cache_miss_bytes(
      trace_metrics->GetMetric(cfile::CFILE_CACHE_MISS_BYTES_METRIC_NAME));
  metrics->set_cfile_cache_hit_bytes(
      trace_metrics->GetMetric(cfile::CFILE_CACHE_HIT_BYTES_METRIC_NAME));
}
} // anonymous namespace
// Handles a Scan RPC. Exactly one of 'new_scan_request' (open a scanner on a
// running tablet) or 'scanner_id' (continue an existing scanner) must be set.
//
// Result rows are collected by a ScanResultCopier into a RowwiseRowBlockPB
// plus two byte buffers. The buffers are attached to the RPC as sidecars:
//  - the row data, always;
//  - the indirect data, only if it is non-empty.
// The response also carries:
//  - the last primary key seen by the collector (if any);
//  - a propagated timestamp from the server clock;
//  - resource metrics taken from the RPC trace.
void TabletServiceImpl::Scan(const ScanRequestPB* req,
                             ScanResponsePB* resp,
                             rpc::RpcContext* context) {
  TRACE_EVENT0("tserver", "TabletServiceImpl::Scan");
  // Validate the request: user must pass a new_scan_request or
  // a scanner ID, but not both.
  if (PREDICT_FALSE(req->has_scanner_id() &&
                    req->has_new_scan_request())) {
    context->RespondFailure(Status::InvalidArgument(
        "Must not pass both a scanner_id and new_scan_request"));
    return;
  }
  // Pre-size both buffers ~10% above the client's batch-size hint, presumably
  // to avoid regrowth when a batch slightly overshoots the hint.
  size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);
  unique_ptr<faststring> rows_data(new faststring(batch_size_bytes * 11 / 10));
  unique_ptr<faststring> indirect_data(new faststring(batch_size_bytes * 11 / 10));
  RowwiseRowBlockPB data;
  ScanResultCopier collector(&data, rows_data.get(), indirect_data.get());
  bool has_more_results = false;
  // Default error code; the Handle*ScanRequest helpers may overwrite it.
  TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
  if (req->has_new_scan_request()) {
    const NewScanRequestPB& scan_pb = req->new_scan_request();
    scoped_refptr<TabletReplica> replica;
    if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), scan_pb.tablet_id(), resp,
                                             context, &replica)) {
      return;
    }
    string scanner_id;
    Timestamp scan_timestamp;
    Status s = HandleNewScanRequest(replica.get(), req, context,
                                    &collector, &scanner_id, &scan_timestamp, &has_more_results,
                                    &error_code);
    if (PREDICT_FALSE(!s.ok())) {
      SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
      return;
    }
    // Only set the scanner id if we have more results.
    if (has_more_results) {
      resp->set_scanner_id(scanner_id);
    }
    // A snapshot timestamp is reported only when the helper produced a valid one.
    if (scan_timestamp != Timestamp::kInvalidTimestamp) {
      resp->set_snap_timestamp(scan_timestamp.ToUint64());
    }
  } else if (req->has_scanner_id()) {
    Status s = HandleContinueScanRequest(req, context, &collector, &has_more_results, &error_code);
    if (PREDICT_FALSE(!s.ok())) {
      SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
      return;
    }
  } else {
    context->RespondFailure(Status::InvalidArgument(
        "Must pass either a scanner_id or new_scan_request"));
    return;
  }
  resp->set_has_more_results(has_more_results);
  resp->mutable_data()->CopyFrom(data);
  // Add sidecar data to context and record the returned indices.
  // Ownership of the buffers moves into the RPC context here.
  int rows_idx;
  CHECK_OK(context->AddOutboundSidecar(
      RpcSidecar::FromFaststring((std::move(rows_data))), &rows_idx));
  resp->mutable_data()->set_rows_sidecar(rows_idx);
  // Add indirect data as a sidecar, if applicable.
  if (indirect_data->size() > 0) {
    int indirect_idx;
    CHECK_OK(context->AddOutboundSidecar(
        RpcSidecar::FromFaststring(std::move(indirect_data)), &indirect_idx));
    resp->mutable_data()->set_indirect_data_sidecar(indirect_idx);
  }
  // Set the last row found by the collector.
  //
  // We could have an empty batch if all the remaining rows are filtered by the
  // predicate, in which case do not set the last row.
  const faststring& last = collector.last_primary_key();
  if (last.length() > 0) {
    resp->set_last_primary_key(last.ToString());
  }
  resp->set_propagated_timestamp(server_->clock()->Now().ToUint64());
  SetResourceMetrics(resp->mutable_resource_metrics(), context);
  context->RespondSuccess();
}
void TabletServiceImpl::ListTablets(const ListTabletsRequestPB* req,
ListTabletsResponsePB* resp,
rpc::RpcContext* context) {
vector<scoped_refptr<TabletReplica>> replicas;
server_->tablet_manager()->GetTabletReplicas(&replicas);
RepeatedPtrField<StatusAndSchemaPB>* replica_status = resp->mutable_status_and_schema();
for (const scoped_refptr<TabletReplica>& replica : replicas) {
StatusAndSchemaPB* status = replica_status->Add();
replica->GetTabletStatusPB(status->mutable_tablet_status());
if (req->need_schema_info()) {
CHECK_OK(SchemaToPB(replica->tablet_metadata()->schema(),
status->mutable_schema()));
replica->tablet_metadata()->partition_schema().ToPB(status->mutable_partition_schema());
}
}
context->RespondSuccess();
}
void TabletServiceImpl::SplitKeyRange(const SplitKeyRangeRequestPB* req,
SplitKeyRangeResponsePB* resp,
rpc::RpcContext* context) {
TRACE_EVENT1("tserver", "TabletServiceImpl::SplitKeyRange",
"tablet_id", req->tablet_id());
DVLOG(3) << "Received SplitKeyRange RPC: " << SecureDebugString(*req);
scoped_refptr<TabletReplica> replica;
if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp,
context, &replica)) {
return;
}
shared_ptr<Tablet> tablet;
TabletServerErrorPB::Code error_code;
Status s = GetTabletRef(replica, &tablet, &error_code);
if (PREDICT_FALSE(!s.ok())) {
SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
return;
}
// Decode encoded key
Arena arena(256);
Schema tablet_schema = replica->tablet_metadata()->schema();
gscoped_ptr<EncodedKey> start, stop;
if (req->has_start_primary_key()) {
s = EncodedKey::DecodeEncodedString(tablet_schema, &arena, req->start_primary_key(), &start);
if (PREDICT_FALSE(!s.ok())) {
SetupErrorAndRespond(resp->mutable_error(),
Status::InvalidArgument("Invalid SplitKeyRange start primary key"),
TabletServerErrorPB::UNKNOWN_ERROR,
context);
return;
}
}
if (req->has_stop_primary_key()) {
s = EncodedKey::DecodeEncodedString(tablet_schema, &arena, req->stop_primary_key(), &stop);
if (PREDICT_FALSE(!s.ok())) {
SetupErrorAndRespond(resp->mutable_error(),
Status::InvalidArgument("Invalid SplitKeyRange stop primary key"),
TabletServerErrorPB::UNKNOWN_ERROR,
context);
return;
}
}
if (req->has_start_primary_key() && req->has_stop_primary_key()) {
// Validate the start key is less than the stop key, if they are both set
if (start->encoded_key().compare(stop->encoded_key()) > 0) {
SetupErrorAndRespond(resp->mutable_error(),
Status::InvalidArgument("Invalid primary key range"),
TabletServerErrorPB::UNKNOWN_ERROR,
context);
return;
}
}
// Validate the column are valid
Schema schema;
s = ColumnPBsToSchema(req->columns(), &schema);
if (PREDICT_FALSE(!s.ok())) {
SetupErrorAndRespond(resp->mutable_error(),
s,
TabletServerErrorPB::INVALID_SCHEMA,
context);
return;
}
if (schema.has_column_ids()) {
SetupErrorAndRespond(resp->mutable_error(),
Status::InvalidArgument("User requests should not have Column IDs"),
TabletServerErrorPB::INVALID_SCHEMA,
context);
return;
}
vector<ColumnId> column_ids;
for (const ColumnSchema& column : schema.columns()) {
int column_id = tablet_schema.find_column(column.name());
if (PREDICT_FALSE(column_id == Schema::kColumnNotFound)) {
SetupErrorAndRespond(resp->mutable_error(),
Status::InvalidArgument(
"Invalid SplitKeyRange column name", column.name()),
TabletServerErrorPB::INVALID_SCHEMA,
context);
return;
}
column_ids.emplace_back(column_id);
}
// Validate the target chunk size are valid
if (req->target_chunk_size_bytes() == 0) {
SetupErrorAndRespond(resp->mutable_error(),
Status::InvalidArgument("Invalid SplitKeyRange target chunk size"),
TabletServerErrorPB::UNKNOWN_ERROR,
context);
return;
}
vector<KeyRange> ranges;
tablet->SplitKeyRange(start.get(), stop.get(), column_ids,
req->target_chunk_size_bytes(), &ranges);
for (auto range : ranges) {
range.ToPB(resp->add_ranges());
}
context->RespondSuccess();
}
// Handles a Checksum RPC by running a scan whose rows are folded into a
// checksum by a ScanResultChecksummer instead of being returned.
//
// Exactly one of 'new_request' (start a new checksum scan) or
// 'continue_request' (resume a scanner, seeding the aggregate with
// 'previous_checksum') must be set. The request is translated into a
// ScanRequestPB and served by the same helpers as Scan(). The response carries:
//  - the aggregate checksum;
//  - whether more results remain;
//  - the number of rows checksummed;
//  - resource metrics.
void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
                                 ChecksumResponsePB* resp,
                                 rpc::RpcContext* context) {
  VLOG(1) << "Full request: " << SecureDebugString(*req);

  // Validate the request: user must pass a new_request or a
  // continue_request, but not both.
  if (PREDICT_FALSE(req->has_new_request() &&
                    req->has_continue_request())) {
    context->RespondFailure(Status::InvalidArgument(
        "Must not pass both new_request and continue_request"));
    return;
  }

  // Convert ChecksumRequestPB to a ScanRequestPB.
  ScanRequestPB scan_req;
  if (req->has_call_seq_id()) scan_req.set_call_seq_id(req->call_seq_id());
  if (req->has_batch_size_bytes()) scan_req.set_batch_size_bytes(req->batch_size_bytes());
  if (req->has_close_scanner()) scan_req.set_close_scanner(req->close_scanner());

  ScanResultChecksummer collector;
  bool has_more = false;
  // Initialized like in Scan(), so an error path that does not set a more
  // specific code never reads an indeterminate value.
  TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
  if (req->has_new_request()) {
    scan_req.mutable_new_scan_request()->CopyFrom(req->new_request());
    const NewScanRequestPB& new_req = req->new_request();
    scoped_refptr<TabletReplica> replica;
    if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), new_req.tablet_id(), resp,
                                             context, &replica)) {
      return;
    }
    string scanner_id;
    Timestamp snap_timestamp;
    Status s = HandleNewScanRequest(replica.get(), &scan_req, context,
                                    &collector, &scanner_id, &snap_timestamp, &has_more,
                                    &error_code);
    if (PREDICT_FALSE(!s.ok())) {
      SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
      return;
    }
    resp->set_scanner_id(scanner_id);
    if (snap_timestamp != Timestamp::kInvalidTimestamp) {
      resp->set_snap_timestamp(snap_timestamp.ToUint64());
    }
  } else if (req->has_continue_request()) {
    const ContinueChecksumRequestPB& continue_req = req->continue_request();
    // Resume aggregation from the checksum the client accumulated so far.
    collector.set_agg_checksum(continue_req.previous_checksum());
    scan_req.set_scanner_id(continue_req.scanner_id());
    Status s = HandleContinueScanRequest(&scan_req, context, &collector, &has_more, &error_code);
    if (PREDICT_FALSE(!s.ok())) {
      SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
      return;
    }
  } else {
    context->RespondFailure(Status::InvalidArgument(
        "Must pass either new_request or continue_request"));
    return;
  }

  resp->set_checksum(collector.agg_checksum());
  resp->set_has_more_results(has_more);
  SetResourceMetrics(resp->mutable_resource_metrics(), context);
  resp->set_rows_checksummed(collector.rows_checksummed());
  context->RespondSuccess();
}
// Feature negotiation: this tablet service advertises exactly two features,
// COLUMN_PREDICATES and PAD_UNIXTIME_MICROS_TO_16_BYTES. Any other value is
// reported as unsupported.
bool TabletServiceImpl::SupportsFeature(uint32_t feature) const {
  return feature == TabletServerFeatures::COLUMN_PREDICATES ||
         feature == TabletServerFeatures::PAD_UNIXTIME_MICROS_TO_16_BYTES;
}
// Intentionally a no-op: the tablet service performs no work of its own on
// shutdown.
void TabletServiceImpl::Shutdown() {}
// Extract a void* pointer suitable for use in a ColumnRangePredicate from the
// user-specified protobuf field.
//
// For BINARY physical types the bytes are copied into 'arena' and *result
// points to an arena-allocated Slice over the copy. For all other types the
// value must be exactly type_info()->size() bytes long; it is copied into
// 'arena' and *result points at the copy.
//
// Returns InvalidArgument, without touching 'arena' or *result, if a
// fixed-width value has the wrong length.
static Status ExtractPredicateValue(const ColumnSchema& schema,
                                    const string& pb_value,
                                    Arena* arena,
                                    const void** result) {
  const bool is_binary = schema.type_info()->physical_type() == BINARY;

  // Validate fixed-width values before copying, so a malformed (untrusted)
  // request does not consume arena memory.
  if (!is_binary) {
    // TODO: add test case for this invalid request
    const size_t expected_size = schema.type_info()->size();
    if (pb_value.size() != expected_size) {
      return Status::InvalidArgument(
          StringPrintf("Bad predicate on %s. Expected value size %zu, got %zu",
                       schema.ToString().c_str(), expected_size, pb_value.size()));
    }
  }

  // Copy the data from the protobuf into the Arena. memcpy() is skipped for
  // an empty value so a possibly-null zero-byte allocation is never passed
  // to it.
  uint8_t* data_copy = static_cast<uint8_t*>(arena->AllocateBytes(pb_value.size()));
  if (!pb_value.empty()) {
    memcpy(data_copy, pb_value.data(), pb_value.size());
  }

  if (is_binary) {
    // Variable-length predicates are represented by a Slice over the copy.
    *result = arena->NewObject<Slice>(data_copy, pb_value.size());
  } else {
    *result = data_copy;
  }
  return Status::OK();
}
// Translates the encoded primary-key bounds of a new scan request into
// lower/exclusive-upper bounds on 'spec'.
//
// For ORDERED scans that resume after 'last_primary_key', the lower bound is
// that key incremented by one, so the last returned row is not repeated.
// Supplying both a start key and a last key in that case is InvalidArgument.
// Decoded keys are allocated from the scanner's arena, and their ownership is
// handed to the scanner's autorelease pool once installed on 'spec'.
static Status DecodeEncodedKeyRange(const NewScanRequestPB& scan_pb,
                                    const Schema& tablet_schema,
                                    const SharedScanner& scanner,
                                    ScanSpec* spec) {
  auto* key_arena = scanner->arena();
  gscoped_ptr<EncodedKey> lower_key;
  gscoped_ptr<EncodedKey> upper_key;

  if (scan_pb.has_start_primary_key()) {
    RETURN_NOT_OK_PREPEND(EncodedKey::DecodeEncodedString(
                              tablet_schema, key_arena,
                              scan_pb.start_primary_key(), &lower_key),
                          "Invalid scan start key");
  }
  if (scan_pb.has_stop_primary_key()) {
    RETURN_NOT_OK_PREPEND(EncodedKey::DecodeEncodedString(
                              tablet_schema, key_arena,
                              scan_pb.stop_primary_key(), &upper_key),
                          "Invalid scan stop key");
  }

  const bool resume_after_last_key =
      scan_pb.order_mode() == ORDERED && scan_pb.has_last_primary_key();
  if (resume_after_last_key) {
    if (lower_key) {
      return Status::InvalidArgument("Cannot specify both a start key and a last key");
    }
    // Start from the last key of the previous result ...
    RETURN_NOT_OK_PREPEND(EncodedKey::DecodeEncodedString(tablet_schema, key_arena,
                                                          scan_pb.last_primary_key(), &lower_key),
                          "Failed to decode last primary key");
    // ... bumped by one so that row is not returned a second time.
    RETURN_NOT_OK_PREPEND(EncodedKey::IncrementEncodedKey(tablet_schema, &lower_key, key_arena),
                          "Failed to increment encoded last row key");
  }

  if (lower_key) {
    spec->SetLowerBoundKey(lower_key.get());
    scanner->autorelease_pool()->Add(lower_key.release());
  }
  if (upper_key) {
    spec->SetExclusiveUpperBoundKey(upper_key.get());
    scanner->autorelease_pool()->Add(upper_key.release());
  }
  return Status::OK();
}
// Builds the ScanSpec for a new scan request and stores it in '*spec'.
//
// The spec is assembled from, in order:
//  - the request's cache_blocks setting;
//  - its ColumnPredicatePBs;
//  - its deprecated range predicates;
//  - its encoded key bounds (via DecodeEncodedKeyRange);
//  - its row limit.
//
// Every column that the scan needs but that is absent from 'projection' is
// appended to 'missing_cols' exactly once. "Needs" means: referenced by a
// predicate, or a key column when the scan is ORDERED and the projection
// lacks some keys (needed to encode the last row key in the response).
//
// Predicate values and decoded keys are allocated from the scanner's arena.
// On error '*spec' is left untouched.
static Status SetupScanSpec(const NewScanRequestPB& scan_pb,
                            const Schema& tablet_schema,
                            const Schema& projection,
                            vector<ColumnSchema>* missing_cols,
                            gscoped_ptr<ScanSpec>* spec,
                            const SharedScanner& scanner) {
  gscoped_ptr<ScanSpec> ret(new ScanSpec);
  ret->set_cache_blocks(scan_pb.cache_blocks());

  // Names already appended to 'missing_cols', so each column is added once.
  unordered_set<string> missing_col_names;
  // Records 'col' in 'missing_cols' if the projection lacks it and it has not
  // been recorded yet.
  auto note_if_missing = [&](const ColumnSchema& col) {
    if (projection.find_column(col.name()) == Schema::kColumnNotFound &&
        missing_col_names.insert(col.name()).second) {
      missing_cols->push_back(col);
    }
  };

  // First the column predicates.
  for (const ColumnPredicatePB& pred_pb : scan_pb.column_predicates()) {
    boost::optional<ColumnPredicate> predicate;
    RETURN_NOT_OK(ColumnPredicateFromPB(tablet_schema, scanner->arena(), pred_pb, &predicate));
    note_if_missing(predicate->column());
    ret->AddPredicate(std::move(*predicate));
  }

  // Then the column range predicates.
  // TODO: remove this once all clients have moved to ColumnPredicatePB and
  // backwards compatibility can be broken.
  for (const ColumnRangePredicatePB& pred_pb : scan_pb.deprecated_range_predicates()) {
    if (!pred_pb.has_lower_bound() && !pred_pb.has_inclusive_upper_bound()) {
      return Status::InvalidArgument(
          string("Invalid predicate ") + SecureShortDebugString(pred_pb) +
          ": has no lower or upper bound.");
    }
    ColumnSchema col(ColumnSchemaFromPB(pred_pb.column()));
    note_if_missing(col);

    // A missing bound stays nullptr, i.e. the range is open on that side.
    const void* lower_bound = nullptr;
    const void* upper_bound = nullptr;
    if (pred_pb.has_lower_bound()) {
      RETURN_NOT_OK(ExtractPredicateValue(col, pred_pb.lower_bound(),
                                          scanner->arena(),
                                          &lower_bound));
    }
    if (pred_pb.has_inclusive_upper_bound()) {
      RETURN_NOT_OK(ExtractPredicateValue(col, pred_pb.inclusive_upper_bound(),
                                          scanner->arena(),
                                          &upper_bound));
    }

    auto pred = ColumnPredicate::InclusiveRange(col, lower_bound, upper_bound, scanner->arena());
    if (pred) {
      if (VLOG_IS_ON(3)) {
        VLOG(3) << "Parsed predicate " << pred->ToString()
                << " from " << SecureShortDebugString(scan_pb);
      }
      ret->AddPredicate(std::move(*pred));
    }
  }

  // When doing an ordered scan, we need to include the key columns to be able to encode
  // the last row key for the scan response.
  if (scan_pb.order_mode() == kudu::ORDERED &&
      projection.num_key_columns() != tablet_schema.num_key_columns()) {
    for (size_t i = 0; i < tablet_schema.num_key_columns(); ++i) {
      note_if_missing(tablet_schema.column(i));
    }
  }

  // Then any encoded key range predicates.
  RETURN_NOT_OK(DecodeEncodedKeyRange(scan_pb, tablet_schema, scanner, ret.get()));

  // If the scanner has a limit, set it now.
  if (scan_pb.has_limit()) {
    ret->set_limit(scan_pb.limit());
  }

  spec->swap(ret);
  return Status::OK();
}
namespace {
// For READ_AT_SNAPSHOT and READ_YOUR_WRITES scans, rejects 'timestamp' with
// InvalidArgument when the tablet's history GC options classify it as ancient
// history. The message includes the snapshot timestamp, the ancient history
// mark, and their physical-time difference. Other read modes, and non-ancient
// timestamps, yield Status::OK().
Status VerifyNotAncientHistory(Tablet* tablet, ReadMode read_mode, Timestamp timestamp) {
  tablet::HistoryGcOpts gc_opts = tablet->GetHistoryGcOpts();
  const bool is_snapshot_read = read_mode == READ_AT_SNAPSHOT || read_mode == READ_YOUR_WRITES;
  if (!is_snapshot_read || !gc_opts.IsAncientHistory(timestamp)) {
    return Status::OK();
  }

  const Timestamp ahm = gc_opts.ancient_history_mark();
  const auto& clk = tablet->clock();
  return Status::InvalidArgument(
      Substitute("Snapshot timestamp is earlier than the ancient history mark. Consider "
                 "increasing the value of the configuration parameter "
                 "--tablet_history_max_age_sec. Snapshot timestamp: $0 "
                 "Ancient History Mark: $1 Physical time difference: $2",
                 clk->Stringify(timestamp),
                 clk->Stringify(ahm),
                 clk->GetPhysicalComponentDifference(timestamp, ahm).ToString()));
}
} // anonymous namespace
// Start a new scan.
Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
const ScanRequestPB* req,
const RpcContext* rpc_context,
ScanResultCollector* result_collector,
std::string* scanner_id,
Timestamp* snap_timestamp,
bool* has_more_results,
TabletServerErrorPB::Code* error_code) {
DCHECK(result_collector != nullptr);
DCHECK(error_code != nullptr);
DCHECK(req->has_new_scan_request());
const NewScanRequestPB& scan_pb = req->new_scan_request();
TRACE_EVENT1("tserver", "TabletServiceImpl::HandleNewScanRequest",
"tablet_id", scan_pb.tablet_id());
const Schema& tablet_schema = replica->tablet_metadata()->schema();
SharedScanner scanner;
server_->scanner_manager()->NewScanner(replica,
rpc_context->requestor_string(),
scan_pb.row_format_flags(),
&scanner);
TRACE("Created scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());
// If we early-exit out of this function, automatically unregister
// the scanner.
ScopedUnregisterScanner unreg_scanner(server_->scanner_manager(), scanner->id());
// Create the user's requested projection.
// TODO: add test cases for bad projections including 0 columns
Schema projection;
Status s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection);
if (PREDICT_FALSE(!s.ok())) {
*error_code = TabletServerErrorPB::INVALID_SCHEMA;
return s;
}
if (projection.has_column_ids()) {
*error_code = TabletServerErrorPB::INVALID_SCHEMA;
return Status::InvalidArgument("User requests should not have Column IDs");
}
if (scan_pb.order_mode() == ORDERED) {
// Ordered scans must be at a snapshot so that we perform a serializable read (which can be
// resumed). Otherwise, this would be read committed isolation, which is not resumable.
if (scan_pb.read_mode() != READ_AT_SNAPSHOT) {
*error_code = TabletServerErrorPB::INVALID_SNAPSHOT;
return Status::InvalidArgument("Cannot do an ordered scan that is not a snapshot read");
}
}
gscoped_ptr<ScanSpec> spec(new ScanSpec);
// Missing columns will contain the columns that are not mentioned in the client
// projection but are actually needed for the scan, such as columns referred to by
// predicates or key columns (if this is an ORDERED scan).
vector<ColumnSchema> missing_cols;
s = SetupScanSpec(scan_pb, tablet_schema, projection, &missing_cols, &spec, scanner);
if (PREDICT_FALSE(!s.ok())) {
*error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
return s;
}
VLOG(3) << "Before optimizing scan spec: " << spec->ToString(tablet_schema);
spec->OptimizeScan(tablet_schema, scanner->arena(), scanner->autorelease_pool(), true);
VLOG(3) << "After optimizing scan spec: " << spec->ToString(tablet_schema);
if (spec->CanShortCircuit()) {
VLOG(1) << "short-circuiting without creating a server-side scanner.";
*has_more_results = false;
return Status::OK();
}
// Store the original projection.
gscoped_ptr<Schema> orig_projection(new Schema(projection));
scanner->set_client_projection_schema(std::move(orig_projection));
// Build a new projection with the projection columns and the missing columns. Make
// sure to set whether the column is a key column appropriately.
SchemaBuilder projection_builder;
vector<ColumnSchema> projection_columns = projection.columns();
for (const ColumnSchema& col : missing_cols) {
projection_columns.push_back(col);
}
for (const ColumnSchema& col : projection_columns) {
CHECK_OK(projection_builder.AddColumn(col, tablet_schema.is_key_column(col.name())));
}
projection = projection_builder.BuildWithoutIds();
gscoped_ptr<RowwiseIterator> iter;
// Preset the error code for when creating the iterator on the tablet fails
TabletServerErrorPB::Code tmp_error_code = TabletServerErrorPB::MISMATCHED_SCHEMA;
// It's important to keep the reference to the tablet for the case when the
// tablet replica's shutdown is run concurrently with the code below.
shared_ptr<Tablet> tablet;
RETURN_NOT_OK(GetTabletRef(replica, &tablet, error_code));
// Ensure the tablet has a valid clean time.
s = tablet->mvcc_manager()->CheckIsSafeTimeInitialized();
if (!s.ok()) {
LOG(WARNING) << Substitute("Rejecting scan request for tablet $0: $1",
tablet->tablet_id(), s.ToString());
// Return TABLET_NOT_RUNNING so the scan can be handled appropriately (fail
// over to another tablet server if fault-tolerant).
*error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
return s;
}
{
TRACE("Creating iterator");
TRACE_EVENT0("tserver", "Create iterator");
switch (scan_pb.read_mode()) {
case UNKNOWN_READ_MODE: {
*error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
s = Status::NotSupported("Unknown read mode.");
return s;
}
case READ_LATEST: {
s = tablet->NewRowIterator(projection, &iter);
break;
}
case READ_YOUR_WRITES: // Fallthrough intended
case READ_AT_SNAPSHOT: {
scoped_refptr<consensus::TimeManager> time_manager = replica->time_manager();
s = HandleScanAtSnapshot(scan_pb, rpc_context, projection, tablet.get(),
time_manager.get(), &iter, snap_timestamp);
// If we got a Status::ServiceUnavailable() from HandleScanAtSnapshot() it might
// mean we're just behind so let the client try again.
if (s.IsServiceUnavailable()) {
*error_code = TabletServerErrorPB::THROTTLED;
return s;
}
if (!s.ok()) {
tmp_error_code = TabletServerErrorPB::INVALID_SNAPSHOT;
}
break;
}
}
TRACE("Iterator created");
}
// Make a copy of the optimized spec before it's passed to the iterator.
// This copy will be given to the Scanner so it can report its predicates to
// /scans. The copy is necessary because the original spec will be modified
// as its predicates are pushed into lower-level iterators.
gscoped_ptr<ScanSpec> orig_spec(new ScanSpec(*spec));
if (PREDICT_TRUE(s.ok())) {
TRACE_EVENT0("tserver", "iter->Init");
s = iter->Init(spec.get());
}
TRACE("Iterator init: $0", s.ToString());
if (PREDICT_FALSE(s.IsInvalidArgument())) {
// An invalid projection returns InvalidArgument above.
// TODO: would be nice if we threaded these more specific
// error codes throughout Kudu.
*error_code = tmp_error_code;
return s;
}
if (PREDICT_FALSE(!s.ok())) {
LOG(WARNING) << Substitute("Error setting up scanner with request $0: $1",
SecureShortDebugString(*req), s.ToString());
// If the replica has been stopped, e.g. due to disk failure, return
// TABLET_FAILED so the scan can be handled appropriately (fail over to
// another tablet server if fault-tolerant).
*error_code = tablet->HasBeenStopped() ?
TabletServerErrorPB::TABLET_FAILED : TabletServerErrorPB::UNKNOWN_ERROR;
return s;
}
// If this is a snapshot scan and the user specified a specific timestamp to
// scan at, then check that we are not attempting to scan at a time earlier
// than the ancient history mark. Only perform this check if tablet history
// GC is enabled.
//
// TODO: This validation essentially prohibits scans with READ_AT_SNAPSHOT
// when history_max_age is set to zero. There is a tablet history GC related
// race when the history max age is set to very low, or zero. Imagine a case
// where a scan was started and READ_AT_SNAPSHOT was specified without
// specifying a snapshot timestamp, and --tablet_history_max_age_sec=0. The
// above code path will select the latest timestamp (under a lock) prior to
// calling RowIterator::Init(), which actually opens the blocks. That means
// that there is an opportunity in between those two calls for tablet history
// GC to kick in and delete some history. In fact, we may easily not actually
// end up with a valid snapshot in that case. It would be more correct to
// initialize the row iterator and then select the latest timestamp
// represented by those open files in that case.
//
// Now that we have initialized our row iterator at a snapshot, return an
// error if the snapshot timestamp was prior to the ancient history mark.
// We have to check after we open the iterator in order to avoid a TOCTOU
// error.
s = VerifyNotAncientHistory(tablet.get(), scan_pb.read_mode(), *snap_timestamp);
if (!s.ok()) {
*error_code = TabletServerErrorPB::INVALID_SNAPSHOT;
return s;
}
*has_more_results = iter->HasNext() && !scanner->has_fulfilled_limit();
TRACE("has_more: $0", *has_more_results);
if (!*has_more_results) {
// If there are no more rows, we can short circuit some work and respond immediately.
VLOG(1) << "No more rows, short-circuiting out without creating a server-side scanner.";
return Status::OK();
}
scanner->Init(std::move(iter), std::move(orig_spec));
unreg_scanner.Cancel();
*scanner_id = scanner->id();
VLOG(1) << "Started scanner " << scanner->id() << ": " << scanner->iter()->ToString();
size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);
if (batch_size_bytes > 0) {
TRACE("Continuing scan request");
// TODO: instead of copying the pb, instead split HandleContinueScanRequest
// and call the second half directly
ScanRequestPB continue_req(*req);
continue_req.set_scanner_id(scanner->id());
RETURN_NOT_OK(HandleContinueScanRequest(&continue_req, rpc_context, result_collector,
has_more_results, error_code));
} else {
// Increment the scanner call sequence ID. HandleContinueScanRequest handles
// this in the non-empty scan case.
scanner->IncrementCallSeqId();
}
return Status::OK();
}
// Continue an existing scan request.
//
// Looks up the scanner identified by 'req->scanner_id()', checks the call
// sequence ID, and copies row blocks from the scanner's iterator into
// 'result_collector' until one of the following happens: the iterator is
// exhausted, the scan's row limit is fulfilled, the collected response size
// reaches the batch-size hint, or a fixed 500 ms time budget elapses.
// Afterwards, per-tablet scan metrics are updated (when the tablet reference
// can still be obtained) and '*has_more_results' is set.
//
// Unless more results remain, the scanner is unregistered when this function
// returns, including on early error returns. A request that only asks to close
// a scanner (zero batch-size hint with close_scanner set) succeeds even if the
// scanner no longer exists.
//
// On failure, '*error_code' is set to describe the problem.
Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
                                                    const RpcContext* rpc_context,
                                                    ScanResultCollector* result_collector,
                                                    bool* has_more_results,
                                                    TabletServerErrorPB::Code* error_code) {
  DCHECK(req->has_scanner_id());
  TRACE_EVENT1("tserver", "TabletServiceImpl::HandleContinueScanRequest",
               "scanner_id", req->scanner_id());

  size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);

  // TODO(todd): need some kind of concurrency control on these scanner objects
  // in case multiple RPCs hit the same scanner at the same time. Probably
  // just a trylock and fail the RPC if it contends.
  SharedScanner scanner;
  if (!server_->scanner_manager()->LookupScanner(req->scanner_id(), &scanner)) {
    if (batch_size_bytes == 0 && req->close_scanner()) {
      // Silently ignore any request to close a non-existent scanner.
      return Status::OK();
    }

    *error_code = TabletServerErrorPB::SCANNER_EXPIRED;
    Status s = Status::NotFound(Substitute("Scanner $0 not found (it may have expired)",
                                           req->scanner_id()));
    LOG(INFO) << Substitute("Scan: $0: call sequence id=$1, remote=$2",
                            s.ToString(), req->call_seq_id(), rpc_context->requestor_string());
    return s;
  }

  // Set the row format flags on the ScanResultCollector.
  result_collector->set_row_format_flags(scanner->row_format_flags());

  // If we early-exit out of this function, automatically unregister the scanner.
  ScopedUnregisterScanner unreg_scanner(server_->scanner_manager(), scanner->id());

  VLOG(2) << "Found existing scanner " << scanner->id() << " for request: "
          << SecureShortDebugString(*req);
  TRACE("Found scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());

  // A pure close request: nothing to read, and the scoped unregister above
  // removes the scanner on return.
  if (batch_size_bytes == 0 && req->close_scanner()) {
    *has_more_results = false;
    return Status::OK();
  }

  if (req->call_seq_id() != scanner->call_seq_id()) {
    *error_code = TabletServerErrorPB::INVALID_SCAN_CALL_SEQ_ID;
    return Status::InvalidArgument("Invalid call sequence ID in scan request");
  }
  scanner->IncrementCallSeqId();
  scanner->UpdateAccessTime();

  RowwiseIterator* iter = scanner->iter();

  // TODO(todd): could size the RowBlock based on the user's requested batch size?
  // If people had really large indirect objects, we would currently overshoot
  // their requested batch size by a lot.
  Arena arena(32 * 1024);
  RowBlock block(iter->schema(), FLAGS_scanner_batch_size_rows, &arena);

  // TODO(todd): in the future, use the client timeout to set a budget. For now,
  // just use a half second, which should be plenty to amortize call overhead.
  const int budget_ms = 500;
  MonoTime deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(budget_ms);

  int64_t rows_scanned = 0;
  while (iter->HasNext() && !scanner->has_fulfilled_limit()) {
    if (PREDICT_FALSE(FLAGS_scanner_inject_latency_on_each_batch_ms > 0)) {
      SleepFor(MonoDelta::FromMilliseconds(FLAGS_scanner_inject_latency_on_each_batch_ms));
    }

    Status s = iter->NextBlock(&block);
    if (PREDICT_FALSE(!s.ok())) {
      // Include the failure itself so the log line explains what went wrong.
      LOG(WARNING) << "Copying rows from internal iterator for request "
                   << SecureShortDebugString(*req) << " failed: " << s.ToString();
      *error_code = TabletServerErrorPB::UNKNOWN_ERROR;
      return s;
    }

    if (PREDICT_TRUE(block.nrows() > 0)) {
      // Count the number of rows scanned, regardless of predicates or deletions.
      // The collector will separately count the number of rows actually returned to
      // the client.
      rows_scanned += block.nrows();
      if (scanner->spec().has_limit()) {
        // Deselect rows beyond the remaining limit so the collector never
        // returns more rows than the client asked for.
        int64_t rows_left = scanner->spec().limit() - scanner->num_rows_returned();
        DCHECK_GT(rows_left, 0); // Guaranteed by has_fulfilled_limit()
        block.selection_vector()->ClearToSelectAtMost(static_cast<size_t>(rows_left));
      }
      result_collector->HandleRowBlock(scanner.get(), block);
    }

    int64_t response_size = result_collector->ResponseSize();

    if (VLOG_IS_ON(2)) {
      // This may be fairly expensive if row block size is small
      TRACE("Copied block (nrows=$0), new size=$1", block.nrows(), response_size);
    }

    // TODO: should check if RPC got cancelled, once we implement RPC cancellation.
    if (PREDICT_FALSE(MonoTime::Now() >= deadline)) {
      TRACE("Deadline expired - responding early");
      break;
    }

    if (response_size >= batch_size_bytes) {
      break;
    }
  }

  scoped_refptr<TabletReplica> replica = scanner->tablet_replica();
  shared_ptr<Tablet> tablet;
  // Initialized so that it never holds an indeterminate value; it is only
  // consulted when GetTabletRef() fails.
  TabletServerErrorPB::Code tablet_ref_error_code = TabletServerErrorPB::UNKNOWN_ERROR;
  const Status s = GetTabletRef(replica, &tablet, &tablet_ref_error_code);
  // If the tablet is not running, but the scan operation in progress
  // has reached this point, the tablet server has the necessary data to
  // send in response for the scan continuation request.
  if (PREDICT_FALSE(!s.ok() && tablet_ref_error_code !=
                        TabletServerErrorPB::TABLET_NOT_RUNNING)) {
    *error_code = tablet_ref_error_code;
    return s;
  }

  // Update metrics based on this scan request.
  if (tablet) {
    // First, the number of rows/cells/bytes actually returned to the user.
    tablet->metrics()->scanner_rows_returned->IncrementBy(
        result_collector->NumRowsReturned());
    tablet->metrics()->scanner_cells_returned->IncrementBy(
        result_collector->NumRowsReturned() *
            scanner->client_projection_schema()->num_columns());
    tablet->metrics()->scanner_bytes_returned->IncrementBy(
        result_collector->ResponseSize());
  }

  // Then the number of rows/cells/bytes actually processed. Here we have to dig
  // into the per-column iterator stats, sum them up, and then subtract out the
  // total that we already reported in a previous scan.
  vector<IteratorStats> stats_by_col;
  scanner->GetIteratorStats(&stats_by_col);
  IteratorStats total_stats = std::accumulate(stats_by_col.begin(),
                                              stats_by_col.end(),
                                              IteratorStats());

  IteratorStats delta_stats = total_stats - scanner->already_reported_stats();
  scanner->set_already_reported_stats(total_stats);

  if (tablet) {
    tablet->metrics()->scanner_rows_scanned->IncrementBy(rows_scanned);
    tablet->metrics()->scanner_cells_scanned_from_disk->IncrementBy(delta_stats.cells_read);
    tablet->metrics()->scanner_bytes_scanned_from_disk->IncrementBy(delta_stats.bytes_read);
  }

  scanner->UpdateAccessTime();
  *has_more_results = !req->close_scanner() && iter->HasNext() &&
      !scanner->has_fulfilled_limit();
  if (*has_more_results) {
    // Keep the scanner registered so the client can continue the scan.
    unreg_scanner.Cancel();
  } else {
    VLOG(2) << "Scanner " << scanner->id() << " complete: removing...";
  }

  return Status::OK();
}
namespace {

// Bounds how long the server is willing to wait on behalf of a scan.
// Returns 'deadline' unchanged when it is at most --scanner_max_wait_ms after
// the current time; otherwise returns now + --scanner_max_wait_ms.
// '*was_clamped' is set to true exactly when the latter substitution happens.
MonoTime ClampScanDeadlineForWait(const MonoTime& deadline, bool* was_clamped) {
  MonoTime current = MonoTime::Now();
  const bool beyond_max_wait =
      deadline.GetDeltaSince(current).ToMilliseconds() > FLAGS_scanner_max_wait_ms;
  *was_clamped = beyond_max_wait;
  if (!beyond_max_wait) {
    return deadline;
  }
  return current + MonoDelta::FromMilliseconds(FLAGS_scanner_max_wait_ms);
}

} // anonymous namespace
// Creates '*iter', an iterator over 'tablet' for 'projection' at an MVCC
// snapshot, for READ_AT_SNAPSHOT and READ_YOUR_WRITES scans (any other read
// mode is a programming error and crashes via LOG(FATAL)).
//
// Picks the snapshot timestamp with PickAndVerifyTimestamp(), waits for safe
// time to reach it (bounded by the client deadline, clamped to
// --scanner_max_wait_ms), and then waits for all operations in the snapshot to
// commit. On success, the chosen timestamp is stored in '*snap_timestamp'.
//
// Returns:
//   - InvalidArgument if the request has an unknown order mode;
//   - ServiceUnavailable if a wait timed out while the deadline had been
//     clamped, so that the client retries;
//   - otherwise, any error from picking the timestamp, waiting, or creating
//     the iterator.
Status TabletServiceImpl::HandleScanAtSnapshot(const NewScanRequestPB& scan_pb,
                                               const RpcContext* rpc_context,
                                               const Schema& projection,
                                               Tablet* tablet,
                                               consensus::TimeManager* time_manager,
                                               gscoped_ptr<RowwiseIterator>* iter,
                                               Timestamp* snap_timestamp) {
  switch (scan_pb.read_mode()) {
    case READ_AT_SNAPSHOT: // Fallthrough intended
    case READ_YOUR_WRITES:
      break;
    default:
      LOG(FATAL) << "Unsupported snapshot scan mode specified.";
  }

  // Validate the request before touching the clock or blocking. Previously this
  // check ran only after the waits below, so a malformed request could sleep
  // for up to --scanner_max_wait_ms (or be reported as ServiceUnavailable)
  // before being rejected.
  if (scan_pb.order_mode() == UNKNOWN_ORDER_MODE) {
    return Status::InvalidArgument("Unknown order mode specified");
  }

  // Based on the read mode, pick a timestamp and verify it.
  Timestamp tmp_snap_timestamp;
  RETURN_NOT_OK(PickAndVerifyTimestamp(scan_pb, tablet, &tmp_snap_timestamp));

  // Reduce the client's deadline by a few msecs to allow for overhead.
  MonoTime client_deadline = rpc_context->GetClientDeadline() - MonoDelta::FromMilliseconds(10);

  // Its not good for the tablet server or for the client if we hang here forever. The tablet
  // server will have one less available thread and the client might be stuck spending all
  // of the allotted time for the scan on a partitioned server that will never have a consistent
  // snapshot at 'snap_timestamp'.
  // Because of this we clamp the client's deadline to the max. configured. If the client
  // sets a long timeout then it can use it by trying in other servers.
  bool was_clamped = false;
  MonoTime final_deadline = ClampScanDeadlineForWait(client_deadline, &was_clamped);

  // Wait for the tablet to know that 'snap_timestamp' is safe. I.e. that all operations
  // that came before it are, at least, started. This, together with waiting for the mvcc
  // snapshot to be clean below, allows us to always return the same data when scanning at
  // the same timestamp (repeatable reads).
  TRACE("Waiting safe time to advance");
  MonoTime before = MonoTime::Now();
  Status s = time_manager->WaitUntilSafe(tmp_snap_timestamp, final_deadline);

  tablet::MvccSnapshot snap;
  tablet::MvccManager* mvcc_manager = tablet->mvcc_manager();
  if (s.ok()) {
    // Wait for the in-flights in the snapshot to be finished.
    // NOTE(review): this wait is bounded by the unclamped 'client_deadline',
    // not 'final_deadline', yet a timeout here is still turned into
    // ServiceUnavailable below when 'was_clamped' is set -- confirm intended.
    TRACE("Waiting for operations to commit");
    s = mvcc_manager->WaitForSnapshotWithAllCommitted(tmp_snap_timestamp, &snap, client_deadline);
  }

  // If we got an TimeOut but we had clamped the deadline, return a ServiceUnavailable instead
  // so that the client retries.
  if (s.IsTimedOut() && was_clamped) {
    return Status::ServiceUnavailable(s.CloneAndPrepend(
        "could not wait for desired snapshot timestamp to be consistent").ToString());
  }
  RETURN_NOT_OK(s);

  uint64_t duration_usec = (MonoTime::Now() - before).ToMicroseconds();
  tablet->metrics()->snapshot_read_inflight_wait_duration->Increment(duration_usec);
  TRACE("All operations in snapshot committed. Waited for $0 microseconds", duration_usec);

  RETURN_NOT_OK(tablet->NewRowIterator(projection, snap, scan_pb.order_mode(), iter));

  // Return the picked snapshot timestamp for both READ_AT_SNAPSHOT
  // and READ_YOUR_WRITES mode.
  *snap_timestamp = tmp_snap_timestamp;
  return Status::OK();
}
// Checks that a client-supplied snapshot timestamp is not in the future.
//
// The upper bound comes from the server clock's GetGlobalLatest(). If the clock
// reports NotSupported, the scan is rejected with NotSupported unless
// --scanner_allow_snapshot_scans_with_logical_timestamps is set. If the bound
// is not filled in (e.g. with a logical clock), 'max_allowed_ts' keeps its
// default-constructed value, kInvalidTimestamp. That value is expected to
// compare above any valid 'snap_timestamp', so the future check then passes.
//
// Returns InvalidArgument when 'snap_timestamp' exceeds the allowed maximum.
Status TabletServiceImpl::ValidateTimestamp(const Timestamp& snap_timestamp) {
  Timestamp max_allowed_ts;
  const Status clock_status = server_->clock()->GetGlobalLatest(&max_allowed_ts);
  if (PREDICT_FALSE(clock_status.IsNotSupported()) &&
      PREDICT_TRUE(!FLAGS_scanner_allow_snapshot_scans_with_logical_timestamps)) {
    return Status::NotSupported("Snapshot scans not supported on this server",
                                clock_status.ToString());
  }

  if (!(snap_timestamp > max_allowed_ts)) {
    return Status::OK();
  }
  return Status::InvalidArgument(
      Substitute("Snapshot time $0 in the future. Max allowed timestamp is $1",
                 server_->clock()->Stringify(snap_timestamp),
                 server_->clock()->Stringify(max_allowed_ts)));
}
// Chooses the snapshot timestamp for the scan described by 'scan_pb' against
// 'tablet' and, on success, stores it in '*snap_timestamp'.
//
// If the request carries a propagated timestamp, the server clock is first
// updated with it, and any error from that update is returned. The timestamp is
// then chosen as follows:
//   - READ_AT_SNAPSHOT without a snapshot timestamp: the current clock time;
//   - READ_AT_SNAPSHOT with a snapshot timestamp: the client's value, once
//     ValidateTimestamp() accepts it;
//   - any other read mode (READ_YOUR_WRITES): MAX(propagated timestamp + 1,
//     MVCC clean timestamp), using Timestamp::kMin when nothing was propagated.
// Finally, VerifyNotAncientHistory() must accept the chosen timestamp.
Status TabletServiceImpl::PickAndVerifyTimestamp(const NewScanRequestPB& scan_pb,
                                                 Tablet* tablet,
                                                 Timestamp* snap_timestamp) {
  if (scan_pb.has_propagated_timestamp()) {
    // Advance our clock so that we never pick a snapshot below what the client
    // has already observed. This has no effect if the propagated value is not
    // ahead of 'now', and fails if it is too far into the future.
    Timestamp client_observed_ts(scan_pb.propagated_timestamp());
    RETURN_NOT_OK(server_->clock()->Update(client_observed_ts));
  }

  const ReadMode read_mode = scan_pb.read_mode();
  Timestamp picked_ts;
  if (read_mode != READ_AT_SNAPSHOT) {
    // READ_YOUR_WRITES: no future check is needed, because the MVCC clean
    // timestamp is bounded by safe time, and the propagated timestamp already
    // passed the clock update above.
    const uint64_t clean_ts = tablet->mvcc_manager()->GetCleanTimestamp().ToUint64();
    const uint64_t observed_ts = scan_pb.has_propagated_timestamp()
        ? scan_pb.propagated_timestamp()
        : Timestamp::kMin.ToUint64();
    picked_ts = Timestamp(std::max(observed_ts + 1, clean_ts));
  } else if (scan_pb.has_snap_timestamp()) {
    // Client-chosen snapshot: make sure it is not too far in the future.
    picked_ts.FromUint64(scan_pb.snap_timestamp());
    RETURN_NOT_OK(ValidateTimestamp(picked_ts));
  } else {
    // No snapshot given: scan as of the current clock time.
    picked_ts = server_->clock()->Now();
  }

  // Reject timestamps below the ancient history mark before anyone waits on
  // them. This is checked again once the iterators are open.
  RETURN_NOT_OK(VerifyNotAncientHistory(tablet, read_mode, picked_ts));
  *snap_timestamp = picked_ts;
  return Status::OK();
}
} // namespace tserver
} // namespace kudu