| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/tserver/tablet_service.h" |
| |
| #include <algorithm> |
| #include <boost/optional.hpp> |
| #include <memory> |
| #include <string> |
| #include <vector> |
| |
| #include "kudu/common/iterator.h" |
| #include "kudu/common/scan_spec.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/consensus/consensus.h" |
| #include "kudu/gutil/bind.h" |
| #include "kudu/gutil/casts.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/stringprintf.h" |
| #include "kudu/gutil/strings/escaping.h" |
| #include "kudu/rpc/rpc_context.h" |
| #include "kudu/rpc/rpc_sidecar.h" |
| #include "kudu/server/hybrid_clock.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tablet/tablet_bootstrap.h" |
| #include "kudu/tablet/tablet_metrics.h" |
| #include "kudu/tablet/tablet_peer.h" |
| #include "kudu/tablet/transactions/alter_schema_transaction.h" |
| #include "kudu/tablet/transactions/write_transaction.h" |
| #include "kudu/tserver/tablet_copy_service.h" |
| #include "kudu/tserver/scanners.h" |
| #include "kudu/tserver/tablet_server.h" |
| #include "kudu/tserver/ts_tablet_manager.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/util/crc.h" |
| #include "kudu/util/debug/trace_event.h" |
| #include "kudu/util/faststring.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/logging.h" |
| #include "kudu/util/mem_tracker.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/status_callback.h" |
| #include "kudu/util/trace.h" |
| |
| DEFINE_int32(scanner_default_batch_size_bytes, 1024 * 1024, |
| "The default size for batches of scan results"); |
| TAG_FLAG(scanner_default_batch_size_bytes, advanced); |
| TAG_FLAG(scanner_default_batch_size_bytes, runtime); |
| |
| DEFINE_int32(scanner_max_batch_size_bytes, 8 * 1024 * 1024, |
| "The maximum batch size that a client may request for " |
| "scan results."); |
| TAG_FLAG(scanner_max_batch_size_bytes, advanced); |
| TAG_FLAG(scanner_max_batch_size_bytes, runtime); |
| |
| DEFINE_int32(scanner_batch_size_rows, 100, |
| "The number of rows to batch for servicing scan requests."); |
| TAG_FLAG(scanner_batch_size_rows, advanced); |
| TAG_FLAG(scanner_batch_size_rows, runtime); |
| |
| // Fault injection flags. |
| DEFINE_int32(scanner_inject_latency_on_each_batch_ms, 0, |
| "If set, the scanner will pause the specified number of milliesconds " |
| "before reading each batch of data on the tablet server. " |
| "Used for tests."); |
| TAG_FLAG(scanner_inject_latency_on_each_batch_ms, unsafe); |
| |
| DECLARE_int32(memory_limit_warn_threshold_percentage); |
| |
// Declarations of metric-name constants from the kudu::cfile namespace. They
// are only declared (extern) here; their definitions live in another
// translation unit, and any use of them is outside this portion of the file.
namespace kudu {
namespace cfile {
extern const char* CFILE_CACHE_MISS_BYTES_METRIC_NAME;
extern const char* CFILE_CACHE_HIT_BYTES_METRIC_NAME;
} // namespace cfile
} // namespace kudu
| |
| namespace kudu { |
| namespace tserver { |
| |
| using consensus::ChangeConfigRequestPB; |
| using consensus::ChangeConfigResponsePB; |
| using consensus::CONSENSUS_CONFIG_ACTIVE; |
| using consensus::CONSENSUS_CONFIG_COMMITTED; |
| using consensus::Consensus; |
| using consensus::ConsensusConfigType; |
| using consensus::ConsensusRequestPB; |
| using consensus::ConsensusResponsePB; |
| using consensus::GetLastOpIdRequestPB; |
| using consensus::GetNodeInstanceRequestPB; |
| using consensus::GetNodeInstanceResponsePB; |
| using consensus::LeaderStepDownRequestPB; |
| using consensus::LeaderStepDownResponsePB; |
| using consensus::RunLeaderElectionRequestPB; |
| using consensus::RunLeaderElectionResponsePB; |
| using consensus::StartTabletCopyRequestPB; |
| using consensus::StartTabletCopyResponsePB; |
| using consensus::VoteRequestPB; |
| using consensus::VoteResponsePB; |
| |
| using google::protobuf::RepeatedPtrField; |
| using rpc::ResultTracker; |
| using rpc::RpcContext; |
| using std::shared_ptr; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| using tablet::AlterSchemaTransactionState; |
| using tablet::Tablet; |
| using tablet::TabletPeer; |
| using tablet::TabletStatusPB; |
| using tablet::TransactionCompletionCallback; |
| using tablet::WriteTransactionState; |
| |
| namespace { |
| |
| // Lookup the given tablet, ensuring that it both exists and is RUNNING. |
| // If it is not, responds to the RPC associated with 'context' after setting |
| // resp->mutable_error() to indicate the failure reason. |
| // |
| // Returns true if successful. |
| template<class RespClass> |
| bool LookupTabletPeerOrRespond(TabletPeerLookupIf* tablet_manager, |
| const string& tablet_id, |
| RespClass* resp, |
| rpc::RpcContext* context, |
| scoped_refptr<TabletPeer>* peer) { |
| if (PREDICT_FALSE(!tablet_manager->GetTabletPeer(tablet_id, peer).ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::NotFound("Tablet not found"), |
| TabletServerErrorPB::TABLET_NOT_FOUND, context); |
| return false; |
| } |
| |
| // Check RUNNING state. |
| tablet::TabletStatePB state = (*peer)->state(); |
| if (PREDICT_FALSE(state != tablet::RUNNING)) { |
| Status s = Status::IllegalState("Tablet not RUNNING", |
| tablet::TabletStatePB_Name(state)); |
| if (state == tablet::FAILED) { |
| s = s.CloneAndAppend((*peer)->error().ToString()); |
| } |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::TABLET_NOT_RUNNING, context); |
| return false; |
| } |
| return true; |
| } |
| |
| template<class ReqClass, class RespClass> |
| bool CheckUuidMatchOrRespond(TabletPeerLookupIf* tablet_manager, |
| const char* method_name, |
| const ReqClass* req, |
| RespClass* resp, |
| rpc::RpcContext* context) { |
| const string& local_uuid = tablet_manager->NodeInstance().permanent_uuid(); |
| if (PREDICT_FALSE(!req->has_dest_uuid())) { |
| // Maintain compat in release mode, but complain. |
| string msg = Substitute("$0: Missing destination UUID in request from $1: $2", |
| method_name, context->requestor_string(), req->ShortDebugString()); |
| #ifdef NDEBUG |
| KLOG_EVERY_N(ERROR, 100) << msg; |
| #else |
| LOG(DFATAL) << msg; |
| #endif |
| return true; |
| } |
| if (PREDICT_FALSE(req->dest_uuid() != local_uuid)) { |
| Status s = Status::InvalidArgument(Substitute("$0: Wrong destination UUID requested. " |
| "Local UUID: $1. Requested UUID: $2", |
| method_name, local_uuid, req->dest_uuid())); |
| LOG(WARNING) << s.ToString() << ": from " << context->requestor_string() |
| << ": " << req->ShortDebugString(); |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::WRONG_SERVER_UUID, context); |
| return false; |
| } |
| return true; |
| } |
| |
| template<class RespClass> |
| bool GetConsensusOrRespond(const scoped_refptr<TabletPeer>& tablet_peer, |
| RespClass* resp, |
| rpc::RpcContext* context, |
| scoped_refptr<Consensus>* consensus) { |
| *consensus = tablet_peer->shared_consensus(); |
| if (!*consensus) { |
| Status s = Status::ServiceUnavailable("Consensus unavailable. Tablet not running"); |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::TABLET_NOT_RUNNING, context); |
| return false; |
| } |
| return true; |
| } |
| |
| Status GetTabletRef(const scoped_refptr<TabletPeer>& tablet_peer, |
| shared_ptr<Tablet>* tablet, |
| TabletServerErrorPB::Code* error_code) { |
| *DCHECK_NOTNULL(tablet) = tablet_peer->shared_tablet(); |
| if (PREDICT_FALSE(!*tablet)) { |
| *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING; |
| return Status::IllegalState("Tablet is not running"); |
| } |
| return Status::OK(); |
| } |
| |
| template <class RespType> |
| void HandleUnknownError(const Status& s, RespType* resp, RpcContext* context) { |
| resp->Clear(); |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| } |
| |
| template <class ReqType, class RespType> |
| void HandleResponse(const ReqType* req, RespType* resp, |
| RpcContext* context, const Status& s) { |
| if (PREDICT_FALSE(!s.ok())) { |
| HandleUnknownError(s, resp, context); |
| return; |
| } |
| context->RespondSuccess(); |
| } |
| |
| template <class ReqType, class RespType> |
| static StatusCallback BindHandleResponse(const ReqType* req, RespType* resp, RpcContext* context) { |
| return Bind(&HandleResponse<ReqType, RespType>, req, resp, context); |
| } |
| |
| } // namespace |
| |
| typedef ListTabletsResponsePB::StatusAndSchemaPB StatusAndSchemaPB; |
| |
| static void SetupErrorAndRespond(TabletServerErrorPB* error, |
| const Status& s, |
| TabletServerErrorPB::Code code, |
| rpc::RpcContext* context) { |
| // Generic "service unavailable" errors will cause the client to retry later. |
| if ((code == TabletServerErrorPB::UNKNOWN_ERROR || |
| code == TabletServerErrorPB::THROTTLED) && s.IsServiceUnavailable()) { |
| context->RespondRpcFailure(rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY, s); |
| return; |
| } |
| |
| StatusToPB(s, error->mutable_status()); |
| error->set_code(code); |
| context->RespondNoCache(); |
| } |
| |
| template <class ReqType, class RespType> |
| void HandleErrorResponse(const ReqType* req, RespType* resp, RpcContext* context, |
| const boost::optional<TabletServerErrorPB::Code>& error_code, |
| const Status& s) { |
| resp->Clear(); |
| if (error_code) { |
| SetupErrorAndRespond(resp->mutable_error(), s, *error_code, context); |
| } else { |
| HandleUnknownError(s, resp, context); |
| } |
| } |
| |
| // A transaction completion callback that responds to the client when transactions |
| // complete and sets the client error if there is one to set. |
| template<class Response> |
| class RpcTransactionCompletionCallback : public TransactionCompletionCallback { |
| public: |
| RpcTransactionCompletionCallback(rpc::RpcContext* context, |
| Response* response) |
| : context_(context), |
| response_(response) {} |
| |
| virtual void TransactionCompleted() OVERRIDE { |
| if (!status_.ok()) { |
| SetupErrorAndRespond(get_error(), status_, code_, context_); |
| } else { |
| context_->RespondSuccess(); |
| } |
| }; |
| |
| private: |
| |
| TabletServerErrorPB* get_error() { |
| return response_->mutable_error(); |
| } |
| |
| rpc::RpcContext* context_; |
| Response* response_; |
| tablet::TransactionState* state_; |
| }; |
| |
| // Generic interface to handle scan results. |
| class ScanResultCollector { |
| public: |
| virtual void HandleRowBlock(const Schema* client_projection_schema, |
| const RowBlock& row_block) = 0; |
| |
| // Returns number of times HandleRowBlock() was called. |
| virtual int BlocksProcessed() const = 0; |
| |
| // Returns number of bytes which will be returned in the response. |
| virtual int64_t ResponseSize() const = 0; |
| |
| // Returns the last processed row's primary key. |
| virtual const faststring& last_primary_key() const = 0; |
| |
| // Return the number of rows actually returned to the client. |
| virtual int64_t NumRowsReturned() const = 0; |
| }; |
| |
| namespace { |
| |
| // Given a RowBlock, set last_primary_key to the primary key of the last selected row |
| // in the RowBlock. If no row is selected, last_primary_key is not set. |
| void SetLastRow(const RowBlock& row_block, faststring* last_primary_key) { |
| // Find the last selected row and save its encoded key. |
| const SelectionVector* sel = row_block.selection_vector(); |
| if (sel->AnySelected()) { |
| for (int i = sel->nrows() - 1; i >= 0; i--) { |
| if (sel->IsRowSelected(i)) { |
| RowBlockRow last_row = row_block.row(i); |
| const Schema* schema = last_row.schema(); |
| schema->EncodeComparableKey(last_row, last_primary_key); |
| break; |
| } |
| } |
| } |
| } |
| |
| } // namespace |
| |
| // Copies the scan result to the given row block PB and data buffers. |
| // |
| // This implementation is used in the common case where a client is running |
| // a scan and the data needs to be returned to the client. |
| // |
| // (This is in contrast to some other ScanResultCollector implementation that |
| // might do an aggregation or gather some other types of statistics via a |
| // server-side scan and thus never need to return the actual data.) |
| class ScanResultCopier : public ScanResultCollector { |
| public: |
| ScanResultCopier(RowwiseRowBlockPB* rowblock_pb, faststring* rows_data, faststring* indirect_data) |
| : rowblock_pb_(DCHECK_NOTNULL(rowblock_pb)), |
| rows_data_(DCHECK_NOTNULL(rows_data)), |
| indirect_data_(DCHECK_NOTNULL(indirect_data)), |
| blocks_processed_(0), |
| num_rows_returned_(0) { |
| } |
| |
| virtual void HandleRowBlock(const Schema* client_projection_schema, |
| const RowBlock& row_block) OVERRIDE { |
| blocks_processed_++; |
| num_rows_returned_ += row_block.selection_vector()->CountSelected(); |
| SerializeRowBlock(row_block, rowblock_pb_, client_projection_schema, |
| rows_data_, indirect_data_); |
| SetLastRow(row_block, &last_primary_key_); |
| } |
| |
| virtual int BlocksProcessed() const OVERRIDE { return blocks_processed_; } |
| |
| // Returns number of bytes buffered to return. |
| virtual int64_t ResponseSize() const OVERRIDE { |
| return rows_data_->size() + indirect_data_->size(); |
| } |
| |
| virtual const faststring& last_primary_key() const OVERRIDE { |
| return last_primary_key_; |
| } |
| |
| virtual int64_t NumRowsReturned() const OVERRIDE { |
| return num_rows_returned_; |
| } |
| |
| private: |
| RowwiseRowBlockPB* const rowblock_pb_; |
| faststring* const rows_data_; |
| faststring* const indirect_data_; |
| int blocks_processed_; |
| int64_t num_rows_returned_; |
| faststring last_primary_key_; |
| |
| DISALLOW_COPY_AND_ASSIGN(ScanResultCopier); |
| }; |
| |
// Checksums the scan result.
//
// Instead of returning row data, every selected row is hashed with CRC32C and
// the per-row values are summed into a uint64_t (wrapping on overflow).
// NOTE(review): the checksum is presumably compared against values computed
// elsewhere, so the per-row byte layout in CalcRowCrc32() should be treated
// as fixed — confirm before changing it.
class ScanResultChecksummer : public ScanResultCollector {
 public:
  ScanResultChecksummer()
      : crc_(crc::GetCrc32cInstance()),
        agg_checksum_(0),
        blocks_processed_(0),
        rows_checksummed_(0) {
  }

  // Adds the CRC of each selected row, computed over 'client_projection_schema'
  // (or the block's own schema when it is null), to the running checksum, and
  // records the encoded key of the block's last selected row.
  virtual void HandleRowBlock(const Schema* client_projection_schema,
                              const RowBlock& row_block) OVERRIDE {
    blocks_processed_++;
    if (!client_projection_schema) {
      client_projection_schema = &row_block.schema();
    }

    size_t nrows = row_block.nrows();
    for (size_t i = 0; i < nrows; i++) {
      if (!row_block.selection_vector()->IsRowSelected(i)) continue;
      uint32_t row_crc = CalcRowCrc32(*client_projection_schema, row_block.row(i));
      agg_checksum_ += row_crc;
      rows_checksummed_++;
    }
    // Find the last selected row and save its encoded key.
    SetLastRow(row_block, &encoded_last_row_);
  }

  virtual int BlocksProcessed() const OVERRIDE { return blocks_processed_; }

  // Returns a constant -- we only return checksum based on a time budget.
  virtual int64_t ResponseSize() const OVERRIDE { return sizeof(agg_checksum_); }

  virtual const faststring& last_primary_key() const OVERRIDE { return encoded_last_row_; }

  // This collector always reports zero rows returned to the client.
  virtual int64_t NumRowsReturned() const OVERRIDE {
    return 0;
  }

  // Number of selected rows folded into the checksum so far.
  int64_t rows_checksummed() const {
    return rows_checksummed_;
  }

  // Accessors for initializing / setting the checksum.
  void set_agg_checksum(uint64_t value) { agg_checksum_ = value; }
  uint64_t agg_checksum() const { return agg_checksum_; }

 private:
  // Calculates a CRC32C for the given row.
  //
  // For each projected column j the hashed buffer receives, in host byte order:
  //   - j as a uint32_t;
  //   - for nullable columns, a one-byte flag (0 = NULL, 1 = set); nothing
  //     more is appended for a NULL cell;
  //   - the cell payload: the referenced bytes for BINARY physical types (the
  //     cell holds a Slice), otherwise the cell's raw memory.
  uint32_t CalcRowCrc32(const Schema& projection, const RowBlockRow& row) {
    tmp_buf_.clear();

    for (size_t j = 0; j < projection.num_columns(); j++) {
      uint32_t col_index = static_cast<uint32_t>(j); // For the CRC.
      tmp_buf_.append(&col_index, sizeof(col_index));
      ColumnBlockCell cell = row.cell(j);
      if (cell.is_nullable()) {
        uint8_t is_defined = cell.is_null() ? 0 : 1;
        tmp_buf_.append(&is_defined, sizeof(is_defined));
        if (!is_defined) continue;
      }
      if (cell.typeinfo()->physical_type() == BINARY) {
        const Slice* data = reinterpret_cast<const Slice *>(cell.ptr());
        tmp_buf_.append(data->data(), data->size());
      } else {
        tmp_buf_.append(cell.ptr(), cell.size());
      }
    }

    uint64_t row_crc = 0;
    crc_->Compute(tmp_buf_.data(), tmp_buf_.size(), &row_crc, nullptr);
    return static_cast<uint32_t>(row_crc); // CRC32 only uses the lower 32 bits.
  }


  // Scratch buffer reused across rows to build the bytes hashed per row.
  faststring tmp_buf_;
  crc::Crc* const crc_;
  uint64_t agg_checksum_;
  int blocks_processed_;
  int64_t rows_checksummed_;
  faststring encoded_last_row_;

  DISALLOW_COPY_AND_ASSIGN(ScanResultChecksummer);
};
| |
| // Return the batch size to use for a given request, after clamping |
| // the user-requested request within the server-side allowable range. |
| // This is only a hint, really more of a threshold since returned bytes |
| // may exceed this limit, but hopefully only by a little bit. |
| static size_t GetMaxBatchSizeBytesHint(const ScanRequestPB* req) { |
| if (!req->has_batch_size_bytes()) { |
| return FLAGS_scanner_default_batch_size_bytes; |
| } |
| |
| return std::min(req->batch_size_bytes(), |
| implicit_cast<uint32_t>(FLAGS_scanner_max_batch_size_bytes)); |
| } |
| |
| TabletServiceImpl::TabletServiceImpl(TabletServer* server) |
| : TabletServerServiceIf(server->metric_entity(), server->result_tracker()), |
| server_(server) { |
| } |
| |
// Always answers the RPC with success, without inspecting 'req' or filling in
// 'resp' (presumably a liveness probe).
void TabletServiceImpl::Ping(const PingRequestPB* req,
                             PingResponsePB* resp,
                             rpc::RpcContext* context) {
  context->RespondSuccess();
}
| |
| TabletServiceAdminImpl::TabletServiceAdminImpl(TabletServer* server) |
| : TabletServerAdminServiceIf(server->metric_entity(), server->result_tracker()), |
| server_(server) { |
| } |
| |
// Handles an AlterSchema RPC for one tablet.
//
// When the tablet is already at the requested schema version, the request's
// schema is decoded (INVALID_SCHEMA on failure) and compared with the tablet's.
// An equal schema is treated as already applied and answered with success. A
// different schema at the same version is reported as MISMATCHED_SCHEMA with a
// Corruption status. A request older than the tablet's version is rejected with
// TABLET_HAS_A_NEWER_SCHEMA. Otherwise an AlterSchemaTransactionState is
// submitted and the RPC is answered asynchronously by its completion callback.
void TabletServiceAdminImpl::AlterSchema(const AlterSchemaRequestPB* req,
                                         AlterSchemaResponsePB* resp,
                                         rpc::RpcContext* context) {
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "AlterSchema", req, resp, context)) {
    return;
  }
  DVLOG(3) << "Received Alter Schema RPC: " << req->DebugString();

  scoped_refptr<TabletPeer> tablet_peer;
  if (!LookupTabletPeerOrRespond(server_->tablet_manager(), req->tablet_id(), resp, context,
                                 &tablet_peer)) {
    return;
  }

  uint32_t schema_version = tablet_peer->tablet_metadata()->schema_version();

  // If the schema was already applied, respond as succeeded.
  if (schema_version == req->schema_version()) {
    // Sanity check, to verify that the tablet should have the same schema
    // specified in the request.
    Schema req_schema;
    Status s = SchemaFromPB(req->schema(), &req_schema);
    if (!s.ok()) {
      SetupErrorAndRespond(resp->mutable_error(), s,
                           TabletServerErrorPB::INVALID_SCHEMA, context);
      return;
    }

    Schema tablet_schema = tablet_peer->tablet_metadata()->schema();
    if (req_schema.Equals(tablet_schema)) {
      context->RespondSuccess();
      return;
    }

    // Re-read the version before declaring corruption. NOTE(review): presumably
    // this tolerates a concurrent alter having bumped the version since the
    // first read; if it changed, control falls through to the checks below.
    schema_version = tablet_peer->tablet_metadata()->schema_version();
    if (schema_version == req->schema_version()) {
      LOG(ERROR) << "The current schema does not match the request schema."
                 << " version=" << schema_version
                 << " current-schema=" << tablet_schema.ToString()
                 << " request-schema=" << req_schema.ToString()
                 << " (corruption)";
      SetupErrorAndRespond(resp->mutable_error(),
                           Status::Corruption("got a different schema for the same version number"),
                           TabletServerErrorPB::MISMATCHED_SCHEMA, context);
      return;
    }
  }

  // If the current schema is newer than the one in the request reject the request.
  if (schema_version > req->schema_version()) {
    SetupErrorAndRespond(resp->mutable_error(),
                         Status::InvalidArgument("Tablet has a newer schema"),
                         TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA, context);
    return;
  }

  unique_ptr<AlterSchemaTransactionState> tx_state(
      new AlterSchemaTransactionState(tablet_peer.get(), req, resp));

  // The callback answers the RPC once the transaction completes.
  tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
      new RpcTransactionCompletionCallback<AlterSchemaResponsePB>(context,
                                                                  resp)));

  // Submit the alter schema op. The RPC will be responded to asynchronously.
  Status s = tablet_peer->SubmitAlterSchema(std::move(tx_state));
  if (PREDICT_FALSE(!s.ok())) {
    SetupErrorAndRespond(resp->mutable_error(), s,
                         TabletServerErrorPB::UNKNOWN_ERROR,
                         context);
    return;
  }
}
| |
| void TabletServiceAdminImpl::CreateTablet(const CreateTabletRequestPB* req, |
| CreateTabletResponsePB* resp, |
| rpc::RpcContext* context) { |
| if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "CreateTablet", req, resp, context)) { |
| return; |
| } |
| TRACE_EVENT1("tserver", "CreateTablet", |
| "tablet_id", req->tablet_id()); |
| |
| Schema schema; |
| Status s = SchemaFromPB(req->schema(), &schema); |
| DCHECK(schema.has_column_ids()); |
| if (!s.ok()) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::InvalidArgument("Invalid Schema."), |
| TabletServerErrorPB::INVALID_SCHEMA, context); |
| return; |
| } |
| |
| PartitionSchema partition_schema; |
| s = PartitionSchema::FromPB(req->partition_schema(), schema, &partition_schema); |
| if (!s.ok()) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::InvalidArgument("Invalid PartitionSchema."), |
| TabletServerErrorPB::INVALID_SCHEMA, context); |
| return; |
| } |
| |
| Partition partition; |
| Partition::FromPB(req->partition(), &partition); |
| |
| LOG(INFO) << "Processing CreateTablet for tablet " << req->tablet_id() |
| << " (table=" << req->table_name() |
| << " [id=" << req->table_id() << "]), partition=" |
| << partition_schema.PartitionDebugString(partition, schema); |
| VLOG(1) << "Full request: " << req->DebugString(); |
| |
| s = server_->tablet_manager()->CreateNewTablet(req->table_id(), |
| req->tablet_id(), |
| partition, |
| req->table_name(), |
| schema, |
| partition_schema, |
| req->config(), |
| nullptr); |
| if (PREDICT_FALSE(!s.ok())) { |
| TabletServerErrorPB::Code code; |
| if (s.IsAlreadyPresent()) { |
| code = TabletServerErrorPB::TABLET_ALREADY_EXISTS; |
| } else { |
| code = TabletServerErrorPB::UNKNOWN_ERROR; |
| } |
| SetupErrorAndRespond(resp->mutable_error(), s, code, context); |
| return; |
| } |
| context->RespondSuccess(); |
| } |
| |
| void TabletServiceAdminImpl::DeleteTablet(const DeleteTabletRequestPB* req, |
| DeleteTabletResponsePB* resp, |
| rpc::RpcContext* context) { |
| if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "DeleteTablet", req, resp, context)) { |
| return; |
| } |
| TRACE_EVENT2("tserver", "DeleteTablet", |
| "tablet_id", req->tablet_id(), |
| "reason", req->reason()); |
| |
| tablet::TabletDataState delete_type = tablet::TABLET_DATA_UNKNOWN; |
| if (req->has_delete_type()) { |
| delete_type = req->delete_type(); |
| } |
| LOG(INFO) << "Processing DeleteTablet for tablet " << req->tablet_id() |
| << " with delete_type " << TabletDataState_Name(delete_type) |
| << (req->has_reason() ? (" (" + req->reason() + ")") : "") |
| << " from " << context->requestor_string(); |
| VLOG(1) << "Full request: " << req->DebugString(); |
| |
| boost::optional<int64_t> cas_config_opid_index_less_or_equal; |
| if (req->has_cas_config_opid_index_less_or_equal()) { |
| cas_config_opid_index_less_or_equal = req->cas_config_opid_index_less_or_equal(); |
| } |
| boost::optional<TabletServerErrorPB::Code> error_code; |
| Status s = server_->tablet_manager()->DeleteTablet(req->tablet_id(), |
| delete_type, |
| cas_config_opid_index_less_or_equal, |
| &error_code); |
| if (PREDICT_FALSE(!s.ok())) { |
| HandleErrorResponse(req, resp, context, error_code, s); |
| return; |
| } |
| context->RespondSuccess(); |
| } |
| |
| void TabletServiceImpl::Write(const WriteRequestPB* req, |
| WriteResponsePB* resp, |
| rpc::RpcContext* context) { |
| TRACE_EVENT1("tserver", "TabletServiceImpl::Write", |
| "tablet_id", req->tablet_id()); |
| DVLOG(3) << "Received Write RPC: " << req->DebugString(); |
| |
| scoped_refptr<TabletPeer> tablet_peer; |
| if (!LookupTabletPeerOrRespond(server_->tablet_manager(), req->tablet_id(), resp, context, |
| &tablet_peer)) { |
| return; |
| } |
| |
| shared_ptr<Tablet> tablet; |
| TabletServerErrorPB::Code error_code; |
| Status s = GetTabletRef(tablet_peer, &tablet, &error_code); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, error_code, context); |
| return; |
| } |
| |
| uint64_t bytes = req->row_operations().rows().size() + |
| req->row_operations().indirect_data().size(); |
| if (!tablet->ShouldThrottleAllow(bytes)) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::ServiceUnavailable("Rejecting Write request: throttled"), |
| TabletServerErrorPB::THROTTLED, |
| context); |
| return; |
| } |
| |
| // Check for memory pressure; don't bother doing any additional work if we've |
| // exceeded the limit. |
| double capacity_pct; |
| if (tablet->mem_tracker()->AnySoftLimitExceeded(&capacity_pct)) { |
| tablet->metrics()->leader_memory_pressure_rejections->Increment(); |
| string msg = StringPrintf( |
| "Soft memory limit exceeded (at %.2f%% of capacity)", |
| capacity_pct); |
| if (capacity_pct >= FLAGS_memory_limit_warn_threshold_percentage) { |
| KLOG_EVERY_N_SECS(WARNING, 1) << "Rejecting Write request: " << msg << THROTTLE_MSG; |
| } else { |
| KLOG_EVERY_N_SECS(INFO, 1) << "Rejecting Write request: " << msg << THROTTLE_MSG; |
| } |
| SetupErrorAndRespond(resp->mutable_error(), Status::ServiceUnavailable(msg), |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| |
| if (!server_->clock()->SupportsExternalConsistencyMode(req->external_consistency_mode())) { |
| Status s = Status::NotSupported("The configured clock does not support the" |
| " required consistency mode."); |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| |
| unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState( |
| tablet_peer.get(), |
| req, |
| context->AreResultsTracked() ? context->request_id() : nullptr, |
| resp)); |
| |
| // If the client sent us a timestamp, decode it and update the clock so that all future |
| // timestamps are greater than the passed timestamp. |
| if (req->has_propagated_timestamp()) { |
| Timestamp ts(req->propagated_timestamp()); |
| s = server_->clock()->Update(ts); |
| } |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| |
| tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>( |
| new RpcTransactionCompletionCallback<WriteResponsePB>(context, |
| resp))); |
| |
| // Submit the write. The RPC will be responded to asynchronously. |
| s = tablet_peer->SubmitWrite(std::move(tx_state)); |
| |
| // Check that we could submit the write |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| } |
| return; |
| } |
| |
| ConsensusServiceImpl::ConsensusServiceImpl(const scoped_refptr<MetricEntity>& metric_entity, |
| const scoped_refptr<ResultTracker>& result_tracker, |
| TabletPeerLookupIf* tablet_manager) |
| : ConsensusServiceIf(metric_entity, result_tracker), |
| tablet_manager_(tablet_manager) { |
| } |
| |
| ConsensusServiceImpl::~ConsensusServiceImpl() { |
| } |
| |
| void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req, |
| ConsensusResponsePB* resp, |
| rpc::RpcContext* context) { |
| DVLOG(3) << "Received Consensus Update RPC: " << req->DebugString(); |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "UpdateConsensus", req, resp, context)) { |
| return; |
| } |
| scoped_refptr<TabletPeer> tablet_peer; |
| if (!LookupTabletPeerOrRespond(tablet_manager_, req->tablet_id(), resp, context, &tablet_peer)) { |
| return; |
| } |
| |
| tablet_peer->permanent_uuid(); |
| |
| // Submit the update directly to the TabletPeer's Consensus instance. |
| scoped_refptr<Consensus> consensus; |
| if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus)) return; |
| Status s = consensus->Update(req, resp); |
| if (PREDICT_FALSE(!s.ok())) { |
| // Clear the response first, since a partially-filled response could |
| // result in confusing a caller, or in having missing required fields |
| // in embedded optional messages. |
| resp->Clear(); |
| |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| context->RespondSuccess(); |
| } |
| |
| void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req, |
| VoteResponsePB* resp, |
| rpc::RpcContext* context) { |
| DVLOG(3) << "Received Consensus Request Vote RPC: " << req->DebugString(); |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "RequestConsensusVote", req, resp, context)) { |
| return; |
| } |
| scoped_refptr<TabletPeer> tablet_peer; |
| if (!LookupTabletPeerOrRespond(tablet_manager_, req->tablet_id(), resp, context, &tablet_peer)) { |
| return; |
| } |
| |
| // Submit the vote request directly to the consensus instance. |
| scoped_refptr<Consensus> consensus; |
| if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus)) return; |
| Status s = consensus->RequestVote(req, resp); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| context->RespondSuccess(); |
| } |
| |
| void ConsensusServiceImpl::ChangeConfig(const ChangeConfigRequestPB* req, |
| ChangeConfigResponsePB* resp, |
| RpcContext* context) { |
| DVLOG(3) << "Received ChangeConfig RPC: " << req->DebugString(); |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "ChangeConfig", req, resp, context)) { |
| return; |
| } |
| scoped_refptr<TabletPeer> tablet_peer; |
| if (!LookupTabletPeerOrRespond(tablet_manager_, req->tablet_id(), resp, context, |
| &tablet_peer)) { |
| return; |
| } |
| |
| scoped_refptr<Consensus> consensus; |
| if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus)) return; |
| boost::optional<TabletServerErrorPB::Code> error_code; |
| Status s = consensus->ChangeConfig(*req, BindHandleResponse(req, resp, context), &error_code); |
| if (PREDICT_FALSE(!s.ok())) { |
| HandleErrorResponse(req, resp, context, error_code, s); |
| return; |
| } |
| // The success case is handled when the callback fires. |
| } |
| |
| void ConsensusServiceImpl::GetNodeInstance(const GetNodeInstanceRequestPB* req, |
| GetNodeInstanceResponsePB* resp, |
| rpc::RpcContext* context) { |
| DVLOG(3) << "Received Get Node Instance RPC: " << req->DebugString(); |
| resp->mutable_node_instance()->CopyFrom(tablet_manager_->NodeInstance()); |
| context->RespondSuccess(); |
| } |
| |
| void ConsensusServiceImpl::RunLeaderElection(const RunLeaderElectionRequestPB* req, |
| RunLeaderElectionResponsePB* resp, |
| rpc::RpcContext* context) { |
| DVLOG(3) << "Received Run Leader Election RPC: " << req->DebugString(); |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "RunLeaderElection", req, resp, context)) { |
| return; |
| } |
| scoped_refptr<TabletPeer> tablet_peer; |
| if (!LookupTabletPeerOrRespond(tablet_manager_, req->tablet_id(), resp, context, &tablet_peer)) { |
| return; |
| } |
| |
| scoped_refptr<Consensus> consensus; |
| if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus)) return; |
| Status s = consensus->StartElection( |
| consensus::Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| context->RespondSuccess(); |
| } |
| |
| void ConsensusServiceImpl::LeaderStepDown(const LeaderStepDownRequestPB* req, |
| LeaderStepDownResponsePB* resp, |
| RpcContext* context) { |
| DVLOG(3) << "Received Leader stepdown RPC: " << req->DebugString(); |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "LeaderStepDown", req, resp, context)) { |
| return; |
| } |
| scoped_refptr<TabletPeer> tablet_peer; |
| if (!LookupTabletPeerOrRespond(tablet_manager_, req->tablet_id(), resp, context, &tablet_peer)) { |
| return; |
| } |
| |
| scoped_refptr<Consensus> consensus; |
| if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus)) return; |
| Status s = consensus->StepDown(resp); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| context->RespondSuccess(); |
| } |
| |
| void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *req, |
| consensus::GetLastOpIdResponsePB *resp, |
| rpc::RpcContext *context) { |
| DVLOG(3) << "Received GetLastOpId RPC: " << req->DebugString(); |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "GetLastOpId", req, resp, context)) { |
| return; |
| } |
| scoped_refptr<TabletPeer> tablet_peer; |
| if (!LookupTabletPeerOrRespond(tablet_manager_, req->tablet_id(), resp, context, &tablet_peer)) { |
| return; |
| } |
| |
| if (tablet_peer->state() != tablet::RUNNING) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::ServiceUnavailable("Tablet Peer not in RUNNING state"), |
| TabletServerErrorPB::TABLET_NOT_RUNNING, context); |
| return; |
| } |
| scoped_refptr<Consensus> consensus; |
| if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus)) return; |
| if (PREDICT_FALSE(req->opid_type() == consensus::UNKNOWN_OPID_TYPE)) { |
| HandleUnknownError(Status::InvalidArgument("Invalid opid_type specified to GetLastOpId()"), |
| resp, context); |
| return; |
| } |
| Status s = consensus->GetLastOpId(req->opid_type(), resp->mutable_opid()); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| context->RespondSuccess(); |
| } |
| |
| void ConsensusServiceImpl::GetConsensusState(const consensus::GetConsensusStateRequestPB *req, |
| consensus::GetConsensusStateResponsePB *resp, |
| rpc::RpcContext *context) { |
| DVLOG(3) << "Received GetConsensusState RPC: " << req->DebugString(); |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "GetConsensusState", req, resp, context)) { |
| return; |
| } |
| scoped_refptr<TabletPeer> tablet_peer; |
| if (!LookupTabletPeerOrRespond(tablet_manager_, req->tablet_id(), resp, context, &tablet_peer)) { |
| return; |
| } |
| |
| scoped_refptr<Consensus> consensus; |
| if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus)) return; |
| ConsensusConfigType type = req->type(); |
| if (PREDICT_FALSE(type != CONSENSUS_CONFIG_ACTIVE && type != CONSENSUS_CONFIG_COMMITTED)) { |
| HandleUnknownError( |
| Status::InvalidArgument(Substitute("Unsupported ConsensusConfigType $0 ($1)", |
| ConsensusConfigType_Name(type), type)), |
| resp, context); |
| return; |
| } |
| *resp->mutable_cstate() = consensus->ConsensusState(req->type()); |
| context->RespondSuccess(); |
| } |
| |
| void ConsensusServiceImpl::StartTabletCopy(const StartTabletCopyRequestPB* req, |
| StartTabletCopyResponsePB* resp, |
| rpc::RpcContext* context) { |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "StartTabletCopy", req, resp, context)) { |
| return; |
| } |
| boost::optional<TabletServerErrorPB::Code> error_code; |
| Status s = tablet_manager_->StartTabletCopy(*req, &error_code); |
| if (!s.ok()) { |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| error_code.get_value_or(TabletServerErrorPB::UNKNOWN_ERROR), |
| context); |
| return; |
| } |
| context->RespondSuccess(); |
| } |
| |
| void TabletServiceImpl::ScannerKeepAlive(const ScannerKeepAliveRequestPB *req, |
| ScannerKeepAliveResponsePB *resp, |
| rpc::RpcContext *context) { |
| DCHECK(req->has_scanner_id()); |
| SharedScanner scanner; |
| if (!server_->scanner_manager()->LookupScanner(req->scanner_id(), &scanner)) { |
| resp->mutable_error()->set_code(TabletServerErrorPB::SCANNER_EXPIRED); |
| StatusToPB(Status::NotFound("Scanner not found"), |
| resp->mutable_error()->mutable_status()); |
| return; |
| } |
| scanner->UpdateAccessTime(); |
| context->RespondSuccess(); |
| } |
| |
| namespace { |
| void SetResourceMetrics(ResourceMetricsPB* metrics, rpc::RpcContext* context) { |
| metrics->set_cfile_cache_miss_bytes( |
| context->trace()->metrics()->GetMetric(cfile::CFILE_CACHE_MISS_BYTES_METRIC_NAME)); |
| metrics->set_cfile_cache_hit_bytes( |
| context->trace()->metrics()->GetMetric(cfile::CFILE_CACHE_HIT_BYTES_METRIC_NAME)); |
| } |
| } // anonymous namespace |
| |
// Handles a Scan RPC. The request must carry exactly one of:
//  - new_scan_request: open a new scanner (HandleNewScanRequest()), or
//  - scanner_id: fetch the next batch from an existing scanner
//    (HandleContinueScanRequest()).
// Passing both or neither fails the RPC with RespondFailure(InvalidArgument).
//
// Errors reported by the handlers are sent back through SetupErrorAndRespond()
// with the TabletServerErrorPB code the handler chose. When at least one row
// block was processed, the row data is attached as an RPC sidecar, and the
// indirect data is attached as a second sidecar if it is non-empty. The
// response then records the sidecar indices.
void TabletServiceImpl::Scan(const ScanRequestPB* req,
                             ScanResponsePB* resp,
                             rpc::RpcContext* context) {
  TRACE_EVENT0("tserver", "TabletServiceImpl::Scan");
  // Validate the request: user must pass a new_scan_request or
  // a scanner ID, but not both.
  if (PREDICT_FALSE(req->has_scanner_id() &&
                    req->has_new_scan_request())) {
    context->RespondFailure(Status::InvalidArgument(
        "Must not pass both a scanner_id and new_scan_request"));
    return;
  }

  // Output buffers for the collector. They are constructed with 11/10 of the
  // client's batch size hint (presumably as initial capacity, leaving headroom
  // for the last block to overshoot the hint -- confirm against faststring).
  size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);
  gscoped_ptr<faststring> rows_data(new faststring(batch_size_bytes * 11 / 10));
  gscoped_ptr<faststring> indirect_data(new faststring(batch_size_bytes * 11 / 10));
  RowwiseRowBlockPB data;
  // The collector only borrows these buffers; ownership moves into RPC sidecars
  // below.
  ScanResultCopier collector(&data, rows_data.get(), indirect_data.get());

  bool has_more_results = false;
  // Filled in by the handlers below when they return a non-OK status.
  TabletServerErrorPB::Code error_code;
  if (req->has_new_scan_request()) {
    const NewScanRequestPB& scan_pb = req->new_scan_request();
    scoped_refptr<TabletPeer> tablet_peer;
    if (!LookupTabletPeerOrRespond(server_->tablet_manager(), scan_pb.tablet_id(), resp, context,
                                   &tablet_peer)) {
      return;
    }
    string scanner_id;
    Timestamp scan_timestamp;
    Status s = HandleNewScanRequest(tablet_peer.get(), req, context,
                                    &collector, &scanner_id, &scan_timestamp, &has_more_results,
                                    &error_code);
    if (PREDICT_FALSE(!s.ok())) {
      SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
      return;
    }

    // Only set the scanner id if we have more results.
    if (has_more_results) {
      resp->set_scanner_id(scanner_id);
    }
    if (scan_timestamp != Timestamp::kInvalidTimestamp) {
      resp->set_snap_timestamp(scan_timestamp.ToUint64());
    }
  } else if (req->has_scanner_id()) {
    Status s = HandleContinueScanRequest(req, &collector, &has_more_results, &error_code);
    if (PREDICT_FALSE(!s.ok())) {
      SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
      return;
    }
  } else {
    context->RespondFailure(Status::InvalidArgument(
        "Must pass either a scanner_id or new_scan_request"));
    return;
  }
  resp->set_has_more_results(has_more_results);

  DVLOG(2) << "Blocks processed: " << collector.BlocksProcessed();
  if (collector.BlocksProcessed() > 0) {
    resp->mutable_data()->CopyFrom(data);

    // Add sidecar data to context and record the returned indices.
    // 'rows_data' is moved into the sidecar and must not be used afterwards.
    int rows_idx;
    CHECK_OK(context->AddRpcSidecar(make_gscoped_ptr(
        new rpc::RpcSidecar(std::move(rows_data))), &rows_idx));
    resp->mutable_data()->set_rows_sidecar(rows_idx);

    // Add indirect data as a sidecar, if applicable.
    if (indirect_data->size() > 0) {
      int indirect_idx;
      CHECK_OK(context->AddRpcSidecar(make_gscoped_ptr(
          new rpc::RpcSidecar(std::move(indirect_data))), &indirect_idx));
      resp->mutable_data()->set_indirect_data_sidecar(indirect_idx);
    }

    // Set the last row found by the collector.
    // We could have an empty batch if all the remaining rows are filtered by the predicate,
    // in which case do not set the last row.
    const faststring& last = collector.last_primary_key();
    if (last.length() > 0) {
      resp->set_last_primary_key(last.ToString());
    }
  }
  SetResourceMetrics(resp->mutable_resource_metrics(), context);
  context->RespondSuccess();
}
| |
| void TabletServiceImpl::ListTablets(const ListTabletsRequestPB* req, |
| ListTabletsResponsePB* resp, |
| rpc::RpcContext* context) { |
| vector<scoped_refptr<TabletPeer> > peers; |
| server_->tablet_manager()->GetTabletPeers(&peers); |
| RepeatedPtrField<StatusAndSchemaPB>* peer_status = resp->mutable_status_and_schema(); |
| for (const scoped_refptr<TabletPeer>& peer : peers) { |
| StatusAndSchemaPB* status = peer_status->Add(); |
| peer->GetTabletStatusPB(status->mutable_tablet_status()); |
| |
| if (req->need_schema_info()) { |
| CHECK_OK(SchemaToPB(peer->status_listener()->schema(), |
| status->mutable_schema())); |
| peer->tablet_metadata()->partition_schema().ToPB(status->mutable_partition_schema()); |
| } |
| } |
| context->RespondSuccess(); |
| } |
| |
| void TabletServiceImpl::Checksum(const ChecksumRequestPB* req, |
| ChecksumResponsePB* resp, |
| rpc::RpcContext* context) { |
| VLOG(1) << "Full request: " << req->DebugString(); |
| |
| // Validate the request: user must pass a new_scan_request or |
| // a scanner ID, but not both. |
| if (PREDICT_FALSE(req->has_new_request() && |
| req->has_continue_request())) { |
| context->RespondFailure(Status::InvalidArgument( |
| "Must not pass both a scanner_id and new_scan_request")); |
| return; |
| } |
| |
| // Convert ChecksumRequestPB to a ScanRequestPB. |
| ScanRequestPB scan_req; |
| if (req->has_call_seq_id()) scan_req.set_call_seq_id(req->call_seq_id()); |
| if (req->has_batch_size_bytes()) scan_req.set_batch_size_bytes(req->batch_size_bytes()); |
| if (req->has_close_scanner()) scan_req.set_close_scanner(req->close_scanner()); |
| |
| ScanResultChecksummer collector; |
| bool has_more = false; |
| TabletServerErrorPB::Code error_code; |
| if (req->has_new_request()) { |
| scan_req.mutable_new_scan_request()->CopyFrom(req->new_request()); |
| const NewScanRequestPB& new_req = req->new_request(); |
| scoped_refptr<TabletPeer> tablet_peer; |
| if (!LookupTabletPeerOrRespond(server_->tablet_manager(), new_req.tablet_id(), resp, context, |
| &tablet_peer)) { |
| return; |
| } |
| |
| string scanner_id; |
| Timestamp snap_timestamp; |
| Status s = HandleNewScanRequest(tablet_peer.get(), &scan_req, context, |
| &collector, &scanner_id, &snap_timestamp, &has_more, |
| &error_code); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, error_code, context); |
| return; |
| } |
| resp->set_scanner_id(scanner_id); |
| if (snap_timestamp != Timestamp::kInvalidTimestamp) { |
| resp->set_snap_timestamp(snap_timestamp.ToUint64()); |
| } |
| } else if (req->has_continue_request()) { |
| const ContinueChecksumRequestPB& continue_req = req->continue_request(); |
| collector.set_agg_checksum(continue_req.previous_checksum()); |
| scan_req.set_scanner_id(continue_req.scanner_id()); |
| Status s = HandleContinueScanRequest(&scan_req, &collector, &has_more, &error_code); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, error_code, context); |
| return; |
| } |
| } else { |
| context->RespondFailure(Status::InvalidArgument( |
| "Must pass either new_request or continue_request")); |
| return; |
| } |
| |
| resp->set_checksum(collector.agg_checksum()); |
| resp->set_has_more_results(has_more); |
| SetResourceMetrics(resp->mutable_resource_metrics(), context); |
| resp->set_rows_checksummed(collector.rows_checksummed()); |
| context->RespondSuccess(); |
| } |
| |
| bool TabletServiceImpl::SupportsFeature(uint32_t feature) const { |
| return feature == TabletServerFeatures::COLUMN_PREDICATES; |
| } |
| |
| void TabletServiceImpl::Shutdown() { |
| } |
| |
| // Extract a void* pointer suitable for use in a ColumnRangePredicate from the |
| // user-specified protobuf field. |
| // This validates that the pb_value has the correct length, copies the data into |
| // 'arena', and sets *result to point to it. |
| // Returns bad status if the user-specified value is the wrong length. |
| static Status ExtractPredicateValue(const ColumnSchema& schema, |
| const string& pb_value, |
| Arena* arena, |
| const void** result) { |
| // Copy the data from the protobuf into the Arena. |
| uint8_t* data_copy = static_cast<uint8_t*>(arena->AllocateBytes(pb_value.size())); |
| memcpy(data_copy, &pb_value[0], pb_value.size()); |
| |
| // If the type is of variable length, then we need to return a pointer to a Slice |
| // element pointing to the string. Otherwise, just verify that the provided |
| // value was the right size. |
| if (schema.type_info()->physical_type() == BINARY) { |
| *result = arena->NewObject<Slice>(data_copy, pb_value.size()); |
| } else { |
| // TODO: add test case for this invalid request |
| size_t expected_size = schema.type_info()->size(); |
| if (pb_value.size() != expected_size) { |
| return Status::InvalidArgument( |
| StringPrintf("Bad predicate on %s. Expected value size %zd, got %zd", |
| schema.ToString().c_str(), expected_size, pb_value.size())); |
| } |
| *result = data_copy; |
| } |
| |
| return Status::OK(); |
| } |
| |
| static Status DecodeEncodedKeyRange(const NewScanRequestPB& scan_pb, |
| const Schema& tablet_schema, |
| const SharedScanner& scanner, |
| ScanSpec* spec) { |
| gscoped_ptr<EncodedKey> start, stop; |
| if (scan_pb.has_start_primary_key()) { |
| RETURN_NOT_OK_PREPEND(EncodedKey::DecodeEncodedString( |
| tablet_schema, scanner->arena(), |
| scan_pb.start_primary_key(), &start), |
| "Invalid scan start key"); |
| } |
| |
| if (scan_pb.has_stop_primary_key()) { |
| RETURN_NOT_OK_PREPEND(EncodedKey::DecodeEncodedString( |
| tablet_schema, scanner->arena(), |
| scan_pb.stop_primary_key(), &stop), |
| "Invalid scan stop key"); |
| } |
| |
| if (scan_pb.order_mode() == ORDERED && scan_pb.has_last_primary_key()) { |
| if (start) { |
| return Status::InvalidArgument("Cannot specify both a start key and a last key"); |
| } |
| // Set the start key to the last key from a previous scan result. |
| RETURN_NOT_OK_PREPEND(EncodedKey::DecodeEncodedString(tablet_schema, scanner->arena(), |
| scan_pb.last_primary_key(), &start), |
| "Failed to decode last primary key"); |
| // Increment the start key, so we don't return the last row again. |
| RETURN_NOT_OK_PREPEND(EncodedKey::IncrementEncodedKey(tablet_schema, &start, scanner->arena()), |
| "Failed to increment encoded last row key"); |
| } |
| |
| if (start) { |
| spec->SetLowerBoundKey(start.get()); |
| scanner->autorelease_pool()->Add(start.release()); |
| } |
| if (stop) { |
| spec->SetExclusiveUpperBoundKey(stop.get()); |
| scanner->autorelease_pool()->Add(stop.release()); |
| } |
| |
| return Status::OK(); |
| } |
| |
| static Status SetupScanSpec(const NewScanRequestPB& scan_pb, |
| const Schema& tablet_schema, |
| const Schema& projection, |
| vector<ColumnSchema>* missing_cols, |
| gscoped_ptr<ScanSpec>* spec, |
| const SharedScanner& scanner) { |
| gscoped_ptr<ScanSpec> ret(new ScanSpec); |
| ret->set_cache_blocks(scan_pb.cache_blocks()); |
| |
| unordered_set<string> missing_col_names; |
| |
| // First the column predicates. |
| for (const ColumnPredicatePB& pred_pb : scan_pb.column_predicates()) { |
| boost::optional<ColumnPredicate> predicate; |
| RETURN_NOT_OK(ColumnPredicateFromPB(tablet_schema, scanner->arena(), pred_pb, &predicate)); |
| |
| if (projection.find_column(predicate->column().name()) == Schema::kColumnNotFound && |
| !ContainsKey(missing_col_names, predicate->column().name())) { |
| InsertOrDie(&missing_col_names, predicate->column().name()); |
| missing_cols->push_back(predicate->column()); |
| } |
| |
| ret->AddPredicate(std::move(*predicate)); |
| } |
| |
| // Then the column range predicates. |
| // TODO: remove this once all clients have moved to ColumnPredicatePB and |
| // backwards compatibility can be broken. |
| for (const ColumnRangePredicatePB& pred_pb : scan_pb.deprecated_range_predicates()) { |
| if (!pred_pb.has_lower_bound() && !pred_pb.has_inclusive_upper_bound()) { |
| return Status::InvalidArgument( |
| string("Invalid predicate ") + pred_pb.ShortDebugString() + |
| ": has no lower or upper bound."); |
| } |
| ColumnSchema col(ColumnSchemaFromPB(pred_pb.column())); |
| if (projection.find_column(col.name()) == Schema::kColumnNotFound && |
| !ContainsKey(missing_col_names, col.name())) { |
| missing_cols->push_back(col); |
| InsertOrDie(&missing_col_names, col.name()); |
| } |
| |
| const void* lower_bound = nullptr; |
| const void* upper_bound = nullptr; |
| if (pred_pb.has_lower_bound()) { |
| const void* val; |
| RETURN_NOT_OK(ExtractPredicateValue(col, pred_pb.lower_bound(), |
| scanner->arena(), |
| &val)); |
| lower_bound = val; |
| } |
| if (pred_pb.has_inclusive_upper_bound()) { |
| const void* val; |
| RETURN_NOT_OK(ExtractPredicateValue(col, pred_pb.inclusive_upper_bound(), |
| scanner->arena(), |
| &val)); |
| upper_bound = val; |
| } |
| |
| auto pred = ColumnPredicate::InclusiveRange(col, lower_bound, upper_bound, scanner->arena()); |
| if (pred) { |
| if (VLOG_IS_ON(3)) { |
| VLOG(3) << "Parsed predicate " << pred->ToString() |
| << " from " << scan_pb.ShortDebugString(); |
| } |
| ret->AddPredicate(*pred); |
| } |
| } |
| |
| // When doing an ordered scan, we need to include the key columns to be able to encode |
| // the last row key for the scan response. |
| if (scan_pb.order_mode() == kudu::ORDERED && |
| projection.num_key_columns() != tablet_schema.num_key_columns()) { |
| for (int i = 0; i < tablet_schema.num_key_columns(); i++) { |
| const ColumnSchema &col = tablet_schema.column(i); |
| if (projection.find_column(col.name()) == -1 && |
| !ContainsKey(missing_col_names, col.name())) { |
| missing_cols->push_back(col); |
| InsertOrDie(&missing_col_names, col.name()); |
| } |
| } |
| } |
| // Then any encoded key range predicates. |
| RETURN_NOT_OK(DecodeEncodedKeyRange(scan_pb, tablet_schema, scanner, ret.get())); |
| |
| spec->swap(ret); |
| return Status::OK(); |
| } |
| |
| // Start a new scan. |
| Status TabletServiceImpl::HandleNewScanRequest(TabletPeer* tablet_peer, |
| const ScanRequestPB* req, |
| const RpcContext* rpc_context, |
| ScanResultCollector* result_collector, |
| std::string* scanner_id, |
| Timestamp* snap_timestamp, |
| bool* has_more_results, |
| TabletServerErrorPB::Code* error_code) { |
| DCHECK(result_collector != nullptr); |
| DCHECK(error_code != nullptr); |
| DCHECK(req->has_new_scan_request()); |
| const NewScanRequestPB& scan_pb = req->new_scan_request(); |
| TRACE_EVENT1("tserver", "TabletServiceImpl::HandleNewScanRequest", |
| "tablet_id", scan_pb.tablet_id()); |
| |
| const Schema& tablet_schema = tablet_peer->tablet_metadata()->schema(); |
| |
| SharedScanner scanner; |
| server_->scanner_manager()->NewScanner(tablet_peer, |
| rpc_context->requestor_string(), |
| &scanner); |
| |
| // If we early-exit out of this function, automatically unregister |
| // the scanner. |
| ScopedUnregisterScanner unreg_scanner(server_->scanner_manager(), scanner->id()); |
| |
| // Create the user's requested projection. |
| // TODO: add test cases for bad projections including 0 columns |
| Schema projection; |
| Status s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection); |
| if (PREDICT_FALSE(!s.ok())) { |
| *error_code = TabletServerErrorPB::INVALID_SCHEMA; |
| return s; |
| } |
| |
| if (projection.has_column_ids()) { |
| *error_code = TabletServerErrorPB::INVALID_SCHEMA; |
| return Status::InvalidArgument("User requests should not have Column IDs"); |
| } |
| |
| if (scan_pb.order_mode() == ORDERED) { |
| // Ordered scans must be at a snapshot so that we perform a serializable read (which can be |
| // resumed). Otherwise, this would be read committed isolation, which is not resumable. |
| if (scan_pb.read_mode() != READ_AT_SNAPSHOT) { |
| *error_code = TabletServerErrorPB::INVALID_SNAPSHOT; |
| return Status::InvalidArgument("Cannot do an ordered scan that is not a snapshot read"); |
| } |
| } |
| |
| gscoped_ptr<ScanSpec> spec(new ScanSpec); |
| |
| // Missing columns will contain the columns that are not mentioned in the client |
| // projection but are actually needed for the scan, such as columns referred to by |
| // predicates or key columns (if this is an ORDERED scan). |
| vector<ColumnSchema> missing_cols; |
| s = SetupScanSpec(scan_pb, tablet_schema, projection, &missing_cols, &spec, scanner); |
| if (PREDICT_FALSE(!s.ok())) { |
| *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC; |
| return s; |
| } |
| |
| VLOG(3) << "Before optimizing scan spec: " << spec->ToString(tablet_schema); |
| spec->OptimizeScan(tablet_schema, scanner->arena(), scanner->autorelease_pool(), true); |
| VLOG(3) << "After optimizing scan spec: " << spec->ToString(tablet_schema); |
| |
| if (spec->CanShortCircuit()) { |
| VLOG(1) << "short-circuiting without creating a server-side scanner."; |
| *has_more_results = false; |
| return Status::OK(); |
| } |
| |
| // Store the original projection. |
| gscoped_ptr<Schema> orig_projection(new Schema(projection)); |
| scanner->set_client_projection_schema(std::move(orig_projection)); |
| |
| // Build a new projection with the projection columns and the missing columns. Make |
| // sure to set whether the column is a key column appropriately. |
| SchemaBuilder projection_builder; |
| vector<ColumnSchema> projection_columns = projection.columns(); |
| for (const ColumnSchema& col : missing_cols) { |
| projection_columns.push_back(col); |
| } |
| for (const ColumnSchema& col : projection_columns) { |
| CHECK_OK(projection_builder.AddColumn(col, tablet_schema.is_key_column(col.name()))); |
| } |
| projection = projection_builder.BuildWithoutIds(); |
| |
| gscoped_ptr<RowwiseIterator> iter; |
| // Preset the error code for when creating the iterator on the tablet fails |
| TabletServerErrorPB::Code tmp_error_code = TabletServerErrorPB::MISMATCHED_SCHEMA; |
| |
| shared_ptr<Tablet> tablet; |
| RETURN_NOT_OK(GetTabletRef(tablet_peer, &tablet, error_code)); |
| { |
| TRACE("Creating iterator"); |
| TRACE_EVENT0("tserver", "Create iterator"); |
| |
| switch (scan_pb.read_mode()) { |
| case UNKNOWN_READ_MODE: { |
| *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC; |
| s = Status::NotSupported("Unknown read mode."); |
| return s; |
| } |
| case READ_LATEST: { |
| s = tablet->NewRowIterator(projection, &iter); |
| break; |
| } |
| case READ_AT_SNAPSHOT: { |
| s = HandleScanAtSnapshot(scan_pb, rpc_context, projection, tablet, &iter, snap_timestamp); |
| if (!s.ok()) { |
| tmp_error_code = TabletServerErrorPB::INVALID_SNAPSHOT; |
| } |
| } |
| TRACE("Iterator created"); |
| } |
| } |
| |
| if (PREDICT_TRUE(s.ok())) { |
| TRACE_EVENT0("tserver", "iter->Init"); |
| s = iter->Init(spec.get()); |
| } |
| |
| TRACE("Iterator init: $0", s.ToString()); |
| |
| if (PREDICT_FALSE(s.IsInvalidArgument())) { |
| // An invalid projection returns InvalidArgument above. |
| // TODO: would be nice if we threaded these more specific |
| // error codes throughout Kudu. |
| *error_code = tmp_error_code; |
| return s; |
| } else if (PREDICT_FALSE(!s.ok())) { |
| LOG(WARNING) << "Error setting up scanner with request " << req->ShortDebugString(); |
| *error_code = TabletServerErrorPB::UNKNOWN_ERROR; |
| return s; |
| } |
| |
| *has_more_results = iter->HasNext(); |
| TRACE("has_more: $0", *has_more_results); |
| if (!*has_more_results) { |
| // If there are no more rows, we can short circuit some work and respond immediately. |
| VLOG(1) << "No more rows, short-circuiting out without creating a server-side scanner."; |
| return Status::OK(); |
| } |
| |
| scanner->Init(std::move(iter), std::move(spec)); |
| unreg_scanner.Cancel(); |
| *scanner_id = scanner->id(); |
| |
| VLOG(1) << "Started scanner " << scanner->id() << ": " << scanner->iter()->ToString(); |
| |
| size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req); |
| if (batch_size_bytes > 0) { |
| TRACE("Continuing scan request"); |
| // TODO: instead of copying the pb, instead split HandleContinueScanRequest |
| // and call the second half directly |
| ScanRequestPB continue_req(*req); |
| continue_req.set_scanner_id(scanner->id()); |
| RETURN_NOT_OK(HandleContinueScanRequest(&continue_req, result_collector, has_more_results, |
| error_code)); |
| } else { |
| // Increment the scanner call sequence ID. HandleContinueScanRequest handles |
| // this in the non-empty scan case. |
| scanner->IncrementCallSeqId(); |
| } |
| return Status::OK(); |
| } |
| |
// Continue an existing scan request.
//
// Looks up the scanner named by 'req->scanner_id()' and checks the request's
// call sequence id. It then feeds row blocks from the scanner's iterator into
// 'result_collector' until one of these happens:
//  - the iterator is exhausted;
//  - the collector's response size reaches the batch size hint;
//  - a 500ms time budget elapses.
// At least one block is processed if the iterator has any. Afterwards the
// tablet's scanner metrics are updated and '*has_more_results' is set.
//
// The scanner stays registered only if more rows remain and the client did not
// ask to close it; otherwise it is unregistered on return. A request with a
// zero batch size and close_scanner set is a pure close and succeeds even if
// the scanner is unknown.
//
// On the error paths in this function '*error_code' is set. GetTabletRef() is
// also handed 'error_code'.
Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
                                                    ScanResultCollector* result_collector,
                                                    bool* has_more_results,
                                                    TabletServerErrorPB::Code* error_code) {
  DCHECK(req->has_scanner_id());
  TRACE_EVENT1("tserver", "TabletServiceImpl::HandleContinueScanRequest",
               "scanner_id", req->scanner_id());

  size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);

  // TODO: need some kind of concurrency control on these scanner objects
  // in case multiple RPCs hit the same scanner at the same time. Probably
  // just a trylock and fail the RPC if it contends.
  SharedScanner scanner;
  if (!server_->scanner_manager()->LookupScanner(req->scanner_id(), &scanner)) {
    if (batch_size_bytes == 0 && req->close_scanner()) {
      // A request to close a non-existent scanner.
      return Status::OK();
    } else {
      *error_code = TabletServerErrorPB::SCANNER_EXPIRED;
      return Status::NotFound("Scanner not found");
    }
  }

  // If we early-exit out of this function, automatically unregister the scanner.
  ScopedUnregisterScanner unreg_scanner(server_->scanner_manager(), scanner->id());

  VLOG(2) << "Found existing scanner " << scanner->id() << " for request: "
          << req->ShortDebugString();
  TRACE("Found scanner $0", scanner->id());

  // A pure close of a live scanner: nothing to read. The ScopedUnregisterScanner
  // above removes the scanner when we return.
  if (batch_size_bytes == 0 && req->close_scanner()) {
    *has_more_results = false;
    return Status::OK();
  }

  // Each continuation must carry the scanner's current call sequence id.
  if (req->call_seq_id() != scanner->call_seq_id()) {
    *error_code = TabletServerErrorPB::INVALID_SCAN_CALL_SEQ_ID;
    return Status::InvalidArgument("Invalid call sequence ID in scan request");
  }
  scanner->IncrementCallSeqId();
  scanner->UpdateAccessTime();

  RowwiseIterator* iter = scanner->iter();

  // TODO: could size the RowBlock based on the user's requested batch size?
  // If people had really large indirect objects, we would currently overshoot
  // their requested batch size by a lot.
  Arena arena(32 * 1024, 1 * 1024 * 1024);
  RowBlock block(scanner->iter()->schema(),
                 FLAGS_scanner_batch_size_rows, &arena);

  // TODO: in the future, use the client timeout to set a budget. For now,
  // just use a half second, which should be plenty to amortize call overhead.
  int budget_ms = 500;
  MonoTime deadline = MonoTime::Now(MonoTime::COARSE);
  deadline.AddDelta(MonoDelta::FromMilliseconds(budget_ms));

  int64_t rows_scanned = 0;
  while (iter->HasNext()) {
    if (PREDICT_FALSE(FLAGS_scanner_inject_latency_on_each_batch_ms > 0)) {
      SleepFor(MonoDelta::FromMilliseconds(FLAGS_scanner_inject_latency_on_each_batch_ms));
    }

    Status s = iter->NextBlock(&block);
    if (PREDICT_FALSE(!s.ok())) {
      LOG(WARNING) << "Copying rows from internal iterator for request " << req->ShortDebugString();
      *error_code = TabletServerErrorPB::UNKNOWN_ERROR;
      return s;
    }

    if (PREDICT_TRUE(block.nrows() > 0)) {
      // Count the number of rows scanned, regardless of predicates or deletions.
      // The collector will separately count the number of rows actually returned to
      // the client.
      rows_scanned += block.nrows();
      result_collector->HandleRowBlock(scanner->client_projection_schema(), block);
    }

    int64_t response_size = result_collector->ResponseSize();

    if (VLOG_IS_ON(2)) {
      // This may be fairly expensive if row block size is small
      TRACE("Copied block (nrows=$0), new size=$1", block.nrows(), response_size);
    }

    // TODO: should check if RPC got cancelled, once we implement RPC cancellation.
    MonoTime now = MonoTime::Now(MonoTime::COARSE);
    if (PREDICT_FALSE(!now.ComesBefore(deadline))) {
      TRACE("Deadline expired - responding early");
      break;
    }

    // Stop once the collected response reaches the client's batch size hint.
    if (response_size >= batch_size_bytes) {
      break;
    }
  }

  // Update metrics based on this scan request.
  scoped_refptr<TabletPeer> tablet_peer = scanner->tablet_peer();
  shared_ptr<Tablet> tablet;
  RETURN_NOT_OK(GetTabletRef(tablet_peer, &tablet, error_code));

  // First, the number of rows/cells/bytes actually returned to the user.
  tablet->metrics()->scanner_rows_returned->IncrementBy(
      result_collector->NumRowsReturned());
  tablet->metrics()->scanner_cells_returned->IncrementBy(
      result_collector->NumRowsReturned() * scanner->client_projection_schema()->num_columns());
  tablet->metrics()->scanner_bytes_returned->IncrementBy(
      result_collector->ResponseSize());

  // Then the number of rows/cells/bytes actually processed. Here we have to dig
  // into the per-column iterator stats, sum them up, and then subtract out the
  // total that we already reported in a previous scan.
  vector<IteratorStats> stats_by_col;
  scanner->GetIteratorStats(&stats_by_col);
  IteratorStats total_stats;
  for (const IteratorStats& stats : stats_by_col) {
    total_stats.AddStats(stats);
  }
  IteratorStats delta_stats = total_stats;
  delta_stats.SubtractStats(scanner->already_reported_stats());
  scanner->set_already_reported_stats(total_stats);

  tablet->metrics()->scanner_rows_scanned->IncrementBy(
      rows_scanned);
  tablet->metrics()->scanner_cells_scanned_from_disk->IncrementBy(
      delta_stats.cells_read_from_disk);
  tablet->metrics()->scanner_bytes_scanned_from_disk->IncrementBy(
      delta_stats.bytes_read_from_disk);

  scanner->UpdateAccessTime();
  // Keep the scanner registered only if rows remain and the client did not ask
  // to close it. Otherwise the ScopedUnregisterScanner removes it on return.
  *has_more_results = !req->close_scanner() && iter->HasNext();
  if (*has_more_results) {
    unreg_scanner.Cancel();
  } else {
    VLOG(2) << "Scanner " << scanner->id() << " complete: removing...";
  }

  return Status::OK();
}
| |
| Status TabletServiceImpl::HandleScanAtSnapshot(const NewScanRequestPB& scan_pb, |
| const RpcContext* rpc_context, |
| const Schema& projection, |
| const shared_ptr<Tablet>& tablet, |
| gscoped_ptr<RowwiseIterator>* iter, |
| Timestamp* snap_timestamp) { |
| |
| // TODO check against the earliest boundary (i.e. how early can we go) right |
| // now we're keeping all undos/redos forever! |
| |
| // If the client sent a timestamp update our clock with it. |
| if (scan_pb.has_propagated_timestamp()) { |
| Timestamp propagated_timestamp(scan_pb.propagated_timestamp()); |
| |
| // Update the clock so that we never generate snapshots lower that |
| // 'propagated_timestamp'. If 'propagated_timestamp' is lower than |
| // 'now' this call has no effect. If 'propagated_timestamp' is too much |
| // into the future this will fail and we abort. |
| RETURN_NOT_OK(server_->clock()->Update(propagated_timestamp)); |
| } |
| |
| Timestamp tmp_snap_timestamp; |
| |
| // If the client provided no snapshot timestamp we take the current clock |
| // time as the snapshot timestamp. |
| if (!scan_pb.has_snap_timestamp()) { |
| tmp_snap_timestamp = server_->clock()->Now(); |
| // ... else we use the client provided one, but make sure it is not too far |
| // in the future as to be invalid. |
| } else { |
| tmp_snap_timestamp.FromUint64(scan_pb.snap_timestamp()); |
| Timestamp max_allowed_ts; |
| Status s = server_->clock()->GetGlobalLatest(&max_allowed_ts); |
| if (!s.ok()) { |
| return Status::NotSupported("Snapshot scans not supported on this server", |
| s.ToString()); |
| } |
| if (tmp_snap_timestamp.CompareTo(max_allowed_ts) > 0) { |
| return Status::InvalidArgument( |
| Substitute("Snapshot time $0 in the future. Max allowed timestamp is $1", |
| server_->clock()->Stringify(tmp_snap_timestamp), |
| server_->clock()->Stringify(max_allowed_ts))); |
| } |
| } |
| |
| tablet::MvccSnapshot snap; |
| |
| // Wait for the in-flights in the snapshot to be finished. |
| // We'll use the client-provided deadline, but not if it's more than 5 seconds from |
| // now -- it's better to make the client retry than hold RPC threads busy. |
| // |
| // TODO(KUDU-1127): even this may not be sufficient -- perhaps we should check how long it |
| // has been since the MVCC manager was able to advance its safe time. If it has been |
| // a long time, it's likely that the majority of voters for this tablet are down |
| // and some writes are "stuck" and therefore won't be committed. |
| MonoTime client_deadline = rpc_context->GetClientDeadline(); |
| // Subtract a little bit from the client deadline so that it's more likely we actually |
| // have time to send our response sent back before it times out. |
| client_deadline.AddDelta(MonoDelta::FromMilliseconds(-10)); |
| |
| MonoTime deadline = MonoTime::Now(MonoTime::FINE); |
| deadline.AddDelta(MonoDelta::FromSeconds(5)); |
| if (client_deadline.ComesBefore(deadline)) { |
| deadline = client_deadline; |
| } |
| |
| TRACE("Waiting for operations in snapshot to commit"); |
| MonoTime before = MonoTime::Now(MonoTime::FINE); |
| RETURN_NOT_OK_PREPEND( |
| tablet->mvcc_manager()->WaitForCleanSnapshotAtTimestamp( |
| tmp_snap_timestamp, &snap, deadline), |
| "could not wait for desired snapshot timestamp to be consistent"); |
| |
| uint64_t duration_usec = MonoTime::Now(MonoTime::FINE).GetDeltaSince(before).ToMicroseconds(); |
| tablet->metrics()->snapshot_read_inflight_wait_duration->Increment(duration_usec); |
| TRACE("All operations in snapshot committed. Waited for $0 microseconds", duration_usec); |
| |
| tablet::Tablet::OrderMode order; |
| switch (scan_pb.order_mode()) { |
| case UNORDERED: order = tablet::Tablet::UNORDERED; break; |
| case ORDERED: order = tablet::Tablet::ORDERED; break; |
| default: LOG(FATAL) << "Unexpected order mode."; |
| } |
| RETURN_NOT_OK(tablet->NewRowIterator(projection, snap, order, iter)); |
| *snap_timestamp = tmp_snap_timestamp; |
| return Status::OK(); |
| } |
| |
| } // namespace tserver |
| } // namespace kudu |