| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/tserver/tablet_service.h" |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <cstdint> |
| #include <cstring> |
| #include <functional> |
| #include <memory> |
| #include <optional> |
| #include <ostream> |
| #include <string> |
| #include <type_traits> |
| #include <unordered_set> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <google/protobuf/stubs/port.h> |
| |
| #include "kudu/clock/clock.h" |
| #include "kudu/common/column_predicate.h" |
| #include "kudu/common/columnar_serialization.h" |
| #include "kudu/common/columnblock.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/encoded_key.h" |
| #include "kudu/common/iterator.h" |
| #include "kudu/common/iterator_stats.h" |
| #include "kudu/common/key_range.h" |
| #include "kudu/common/partition.h" |
| #include "kudu/common/row_operations.pb.h" |
| #include "kudu/common/rowblock.h" |
| #include "kudu/common/rowblock_memory.h" |
| #include "kudu/common/scan_spec.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/timestamp.h" |
| #include "kudu/common/types.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/common/wire_protocol.pb.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/metadata.pb.h" |
| #include "kudu/consensus/opid.pb.h" |
| #include "kudu/consensus/raft_consensus.h" |
| #include "kudu/consensus/replica_management.pb.h" |
| #include "kudu/consensus/time_manager.h" |
| #include "kudu/fs/fs_manager.h" |
| #include "kudu/gutil/basictypes.h" |
| #include "kudu/gutil/casts.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/stringprintf.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/rpc/inbound_call.h" |
| #include "kudu/rpc/remote_user.h" |
| #include "kudu/rpc/rpc_context.h" |
| #include "kudu/rpc/rpc_header.pb.h" |
| #include "kudu/rpc/rpc_sidecar.h" |
| #include "kudu/rpc/rpc_verification_util.h" |
| #include "kudu/security/token.pb.h" |
| #include "kudu/security/token_verifier.h" |
| #include "kudu/server/server_base.h" |
| #include "kudu/tablet/compaction.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tablet/mvcc.h" |
| #include "kudu/tablet/ops/alter_schema_op.h" |
| #include "kudu/tablet/ops/op.h" |
| #include "kudu/tablet/ops/participant_op.h" |
| #include "kudu/tablet/ops/write_op.h" |
| #include "kudu/tablet/rowset.h" |
| #include "kudu/tablet/tablet.h" |
| #include "kudu/tablet/tablet.pb.h" |
| #include "kudu/tablet/tablet_metadata.h" |
| #include "kudu/tablet/tablet_metrics.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/tablet/txn_coordinator.h" |
| #include "kudu/transactions/transactions.pb.h" |
| #include "kudu/transactions/txn_status_manager.h" |
| #include "kudu/tserver/scanners.h" |
| #include "kudu/tserver/tablet_replica_lookup.h" |
| #include "kudu/tserver/tablet_server.h" |
| #include "kudu/tserver/ts_tablet_manager.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/tserver/tserver_admin.pb.h" |
| #include "kudu/tserver/tserver_service.pb.h" |
| #include "kudu/util/crc.h" |
| #include "kudu/util/debug/trace_event.h" |
| #include "kudu/util/faststring.h" |
| #include "kudu/util/fault_injection.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/logging.h" |
| #include "kudu/util/memory/arena.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/process_memory.h" |
| #include "kudu/util/random_util.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/stopwatch.h" |
| #include "kudu/util/threadpool.h" |
| #include "kudu/util/trace.h" |
| #include "kudu/util/trace_metrics.h" |
| |
| DEFINE_int32(scanner_default_batch_size_bytes, 1024 * 1024, |
| "The default size for batches of scan results"); |
| TAG_FLAG(scanner_default_batch_size_bytes, advanced); |
| TAG_FLAG(scanner_default_batch_size_bytes, runtime); |
| |
| DEFINE_int32(scanner_max_batch_size_bytes, 8 * 1024 * 1024, |
| "The maximum batch size that a client may request for " |
| "scan results."); |
| TAG_FLAG(scanner_max_batch_size_bytes, advanced); |
| TAG_FLAG(scanner_max_batch_size_bytes, runtime); |
| |
| // The default value is sized to a power of 2 to improve BitmapCopy performance |
| // when copying a RowBlock (in ORDERED scans). |
| DEFINE_int32(scanner_batch_size_rows, 128, |
| "The number of rows to batch for servicing scan requests."); |
| TAG_FLAG(scanner_batch_size_rows, advanced); |
| TAG_FLAG(scanner_batch_size_rows, runtime); |
| |
| DEFINE_bool(scanner_allow_snapshot_scans_with_logical_timestamps, false, |
| "If set, the server will support snapshot scans with logical timestamps."); |
| TAG_FLAG(scanner_allow_snapshot_scans_with_logical_timestamps, unsafe); |
| |
| DEFINE_int32(scanner_max_wait_ms, 1000, |
| "The maximum amount of time (in milliseconds) a scan " |
| "at a snapshot is allowed to wait for safe time to advance " |
| "or pending write operations to apply, even if the deadline " |
| "of the scan request itself allows for waiting longer."); |
| TAG_FLAG(scanner_max_wait_ms, advanced); |
| TAG_FLAG(scanner_max_wait_ms, runtime); |
| |
| // Fault injection flags. |
| DEFINE_int32(scanner_inject_latency_on_each_batch_ms, 0, |
| "If set, the scanner will pause the specified number of milliesconds " |
| "before reading each batch of data on the tablet server. " |
| "Used for tests."); |
| TAG_FLAG(scanner_inject_latency_on_each_batch_ms, unsafe); |
| |
| DEFINE_bool(scanner_inject_service_unavailable_on_continue_scan, false, |
| "If set, the scanner will return a ServiceUnavailable Status on " |
| "any Scan continuation RPC call. Used for tests."); |
| TAG_FLAG(scanner_inject_service_unavailable_on_continue_scan, unsafe); |
| |
| DEFINE_bool(scanner_unregister_on_invalid_seq_id, true, |
| "If set, an invalid sequence ID will cause a scanner to get unregistered. " |
| "Used for tests."); |
| TAG_FLAG(scanner_unregister_on_invalid_seq_id, unsafe); |
| |
| |
| DEFINE_bool(tserver_enforce_access_control, false, |
| "If set, the server will apply fine-grained access control rules " |
| "to client RPCs."); |
| TAG_FLAG(tserver_enforce_access_control, runtime); |
| |
| DEFINE_double(tserver_inject_invalid_authz_token_ratio, 0.0, |
| "Fraction of the time that authz token validation will fail. Used for tests."); |
| TAG_FLAG(tserver_inject_invalid_authz_token_ratio, hidden); |
| |
| DEFINE_bool(tserver_txn_write_op_handling_enabled, true, |
| "Whether to enable appropriate handling of write operations " |
| "in the context of multi-row transactions"); |
| TAG_FLAG(tserver_txn_write_op_handling_enabled, hidden); |
| |
| DECLARE_bool(enable_txn_system_client_init); |
| DECLARE_bool(raft_prepare_replacement_before_eviction); |
| DECLARE_int32(memory_limit_warn_threshold_percentage); |
| DECLARE_int32(tablet_history_max_age_sec); |
| DECLARE_uint32(txn_keepalive_interval_ms); |
| |
| METRIC_DEFINE_counter( |
| server, |
| op_apply_queue_overload_rejections, |
| "Number of Rejected Write Requests Due to Queue Overloaded Error", |
| kudu::MetricUnit::kRequests, |
| "Number of rejected write requests due to overloaded op apply queue", |
| kudu::MetricLevel::kWarn); |
| |
| using google::protobuf::RepeatedPtrField; |
| using kudu::consensus::BulkChangeConfigRequestPB; |
| using kudu::consensus::ChangeConfigRequestPB; |
| using kudu::consensus::ChangeConfigResponsePB; |
| using kudu::consensus::ConsensusRequestPB; |
| using kudu::consensus::ConsensusResponsePB; |
| using kudu::consensus::GetLastOpIdRequestPB; |
| using kudu::consensus::GetNodeInstanceRequestPB; |
| using kudu::consensus::GetNodeInstanceResponsePB; |
| using kudu::consensus::LeaderStepDownMode; |
| using kudu::consensus::LeaderStepDownRequestPB; |
| using kudu::consensus::LeaderStepDownResponsePB; |
| using kudu::consensus::OpId; |
| using kudu::consensus::RaftConsensus; |
| using kudu::consensus::RaftPeerPB; |
| using kudu::consensus::RunLeaderElectionRequestPB; |
| using kudu::consensus::RunLeaderElectionResponsePB; |
| using kudu::consensus::StartTabletCopyRequestPB; |
| using kudu::consensus::StartTabletCopyResponsePB; |
| using kudu::consensus::TimeManager; |
| using kudu::consensus::UnsafeChangeConfigRequestPB; |
| using kudu::consensus::UnsafeChangeConfigResponsePB; |
| using kudu::consensus::VoteRequestPB; |
| using kudu::consensus::VoteResponsePB; |
| using kudu::fault_injection::MaybeTrue; |
| using kudu::pb_util::SecureDebugString; |
| using kudu::pb_util::SecureShortDebugString; |
| using kudu::rpc::ErrorStatusPB; |
| using kudu::rpc::ParseTokenVerificationResult; |
| using kudu::rpc::RpcContext; |
| using kudu::rpc::RpcSidecar; |
| using kudu::security::TokenPB; |
| using kudu::security::TokenVerifier; |
| using kudu::server::ServerBase; |
| using kudu::tablet::AlterSchemaOpState; |
| using kudu::tablet::MvccSnapshot; |
| using kudu::tablet::OpCompletionCallback; |
| using kudu::tablet::ParticipantOpState; |
| using kudu::tablet::TABLET_DATA_COPYING; |
| using kudu::tablet::TABLET_DATA_DELETED; |
| using kudu::tablet::TABLET_DATA_TOMBSTONED; |
| using kudu::tablet::Tablet; |
| using kudu::tablet::TabletReplica; |
| using kudu::tablet::TabletStatePB; |
| using kudu::tablet::TxnMetadataPB;; |
| using kudu::tablet::WriteAuthorizationContext; |
| using kudu::tablet::WriteOpState; |
| using kudu::tablet::WritePrivilegeType; |
| using kudu::tablet::WritePrivileges; |
| using std::make_optional; |
| using std::nullopt; |
| using std::optional; |
| using std::shared_ptr; |
| using std::string; |
| using std::unique_ptr; |
| using std::unordered_set; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| |
| namespace cfile { |
| extern const char* CFILE_CACHE_MISS_BYTES_METRIC_NAME; |
| extern const char* CFILE_CACHE_HIT_BYTES_METRIC_NAME; |
| } |
| |
| namespace tserver { |
| |
| const char* SCANNER_BYTES_READ_METRIC_NAME = "scanner_bytes_read"; |
| |
| namespace { |
| |
| // Lookup the given tablet, only ensuring that it exists. |
| // If it does not, responds to the RPC associated with 'context' after setting |
| // resp->mutable_error() to indicate the failure reason. |
| // |
| // Returns true if successful. |
| template<class RespClass> |
| bool LookupTabletReplicaOrRespond(TabletReplicaLookupIf* tablet_manager, |
| const string& tablet_id, |
| RespClass* resp, |
| RpcContext* context, |
| scoped_refptr<TabletReplica>* replica) { |
| Status s = tablet_manager->GetTabletReplica(tablet_id, replica); |
| if (PREDICT_FALSE(!s.ok())) { |
| if (s.IsServiceUnavailable()) { |
| // If the tablet manager isn't initialized, the remote should check again |
| // soon. |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, context); |
| } else { |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::TABLET_NOT_FOUND, context); |
| } |
| return false; |
| } |
| return true; |
| } |
| |
| template<class RespClass> |
| void RespondTabletNotRunning(const scoped_refptr<TabletReplica>& replica, |
| TabletStatePB tablet_state, |
| RespClass* resp, |
| RpcContext* context) { |
| Status s = Status::IllegalState("Tablet not RUNNING", |
| tablet::TabletStatePB_Name(tablet_state)); |
| auto error_code = TabletServerErrorPB::TABLET_NOT_RUNNING; |
| if (replica->tablet_metadata()->tablet_data_state() == TABLET_DATA_TOMBSTONED || |
| replica->tablet_metadata()->tablet_data_state() == TABLET_DATA_DELETED) { |
| // Treat tombstoned tablets as if they don't exist for most purposes. |
| // This takes precedence over failed, since we don't reset the failed |
| // status of a TabletReplica when deleting it. Only tablet copy does that. |
| error_code = TabletServerErrorPB::TABLET_NOT_FOUND; |
| } else if (tablet_state == tablet::FAILED) { |
| s = s.CloneAndAppend(replica->error().ToString()); |
| error_code = TabletServerErrorPB::TABLET_FAILED; |
| } |
| SetupErrorAndRespond(resp->mutable_error(), s, error_code, context); |
| } |
| |
| // Check if the replica is running. |
| template<class RespClass> |
| bool CheckTabletReplicaRunningOrRespond(const scoped_refptr<TabletReplica>& replica, |
| RespClass* resp, |
| RpcContext* context) { |
| // Check RUNNING state. |
| TabletStatePB state = replica->state(); |
| if (PREDICT_FALSE(state != tablet::RUNNING)) { |
| RespondTabletNotRunning(replica, state, resp, context); |
| return false; |
| } |
| return true; |
| } |
| |
| // Lookup the given tablet, ensuring that it both exists and is RUNNING. |
| // If it is not, responds to the RPC associated with 'context' after setting |
| // resp->mutable_error() to indicate the failure reason. |
| // |
| // Returns true if successful. |
| template<class RespClass> |
| bool LookupRunningTabletReplicaOrRespond(TabletReplicaLookupIf* tablet_manager, |
| const string& tablet_id, |
| RespClass* resp, |
| RpcContext* context, |
| scoped_refptr<TabletReplica>* replica) { |
| if (!LookupTabletReplicaOrRespond(tablet_manager, tablet_id, resp, context, replica)) { |
| return false; |
| } |
| if (!CheckTabletReplicaRunningOrRespond(*replica, resp, context)) { |
| return false; |
| } |
| return true; |
| } |
| |
| template<class ReqClass, class RespClass> |
| bool CheckUuidMatchOrRespond(TabletReplicaLookupIf* tablet_manager, |
| const char* method_name, |
| const ReqClass* req, |
| RespClass* resp, |
| RpcContext* context) { |
| const string& local_uuid = tablet_manager->NodeInstance().permanent_uuid(); |
| if (PREDICT_FALSE(!req->has_dest_uuid())) { |
| // Maintain compat in release mode, but complain. |
| string msg = Substitute("$0: Missing destination UUID in request from $1: $2", |
| method_name, context->requestor_string(), SecureShortDebugString(*req)); |
| #ifdef NDEBUG |
| KLOG_EVERY_N(ERROR, 100) << msg; |
| #else |
| LOG(DFATAL) << msg; |
| #endif |
| return true; |
| } |
| if (PREDICT_FALSE(req->dest_uuid() != local_uuid)) { |
| Status s = Status::InvalidArgument(Substitute("$0: Wrong destination UUID requested. " |
| "Local UUID: $1. Requested UUID: $2", |
| method_name, local_uuid, req->dest_uuid())); |
| LOG(WARNING) << s.ToString() << ": from " << context->requestor_string() |
| << ": " << SecureShortDebugString(*req); |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::WRONG_SERVER_UUID, context); |
| return false; |
| } |
| return true; |
| } |
| |
| template<class RespClass> |
| bool GetConsensusOrRespond(const scoped_refptr<TabletReplica>& replica, |
| RespClass* resp, |
| RpcContext* context, |
| shared_ptr<RaftConsensus>* consensus_out) { |
| shared_ptr<RaftConsensus> tmp_consensus = replica->shared_consensus(); |
| if (!tmp_consensus) { |
| Status s = Status::ServiceUnavailable("Raft Consensus unavailable", |
| "Tablet replica not initialized"); |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::TABLET_NOT_RUNNING, context); |
| return false; |
| } |
| *consensus_out = std::move(tmp_consensus); |
| return true; |
| } |
| |
| template<class RespClass> |
| bool CheckTabletServerNotQuiescingOrRespond(const TabletServer* server, RespClass* resp, |
| RpcContext* context) { |
| if (PREDICT_FALSE(server->quiescing())) { |
| Status s = Status::ServiceUnavailable("Tablet server is quiescing"); |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::TABLET_NOT_RUNNING, context); |
| return false; |
| } |
| return true; |
| } |
| |
| Status GetTabletRef(const scoped_refptr<TabletReplica>& replica, |
| shared_ptr<Tablet>* tablet, |
| TabletServerErrorPB::Code* error_code) { |
| *DCHECK_NOTNULL(tablet) = replica->shared_tablet(); |
| if (PREDICT_FALSE(!*tablet)) { |
| *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING; |
| return Status::IllegalState("Tablet is not running"); |
| } |
| return Status::OK(); |
| } |
| |
| template <class RespType> |
| void HandleUnknownError(const Status& s, RespType* resp, RpcContext* context) { |
| resp->Clear(); |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| } |
| |
| template <class ReqType, class RespType> |
| void HandleResponse(const ReqType* req, RespType* resp, |
| RpcContext* context, const Status& s) { |
| if (PREDICT_FALSE(!s.ok())) { |
| HandleUnknownError(s, resp, context); |
| return; |
| } |
| context->RespondSuccess(); |
| } |
| |
| } // namespace |
| |
| typedef ListTabletsResponsePB::StatusAndSchemaPB StatusAndSchemaPB; |
| |
| // Populates 'required_column_privileges' with the column-level privileges |
| // required to perform the scan specified by 'scan_pb', consulting the column |
| // IDs found in 'schema'. |
| // |
| // Users of NewScanRequestPB (e.g. Scans and Checksums) require the following |
| // privileges: |
| // if no projected columns (i.e. a "counting" scan) || |
| // projected columns has virtual column (e.g. "diff" scan): |
| // SCAN ON TABLE || foreach (column): SCAN ON COLUMN |
| // else: |
| // if uses pk (e.g. ORDERED scan, or primary key fields set): |
| // foreach(primary key column): SCAN ON COLUMN |
| // foreach(projected column): SCAN ON COLUMN |
| // foreach(predicated column): SCAN ON COLUMN |
| // |
| // Returns false if the request is malformed (e.g. unknown non-virtual column |
| // name), and sends an error response via 'context' if so. 'req_type' is used |
| // to add context in logs. |
| static bool GetScanPrivilegesOrRespond(const NewScanRequestPB& scan_pb, const Schema& schema, |
| const string& req_type, |
| unordered_set<ColumnId>* required_column_privileges, |
| RpcContext* context) { |
| const auto respond_not_authorized = [&] (const string& col_name) { |
| LOG(WARNING) << Substitute("rejecting $0 request from $1: no column named '$2'", |
| req_type, context->requestor_string(), col_name); |
| context->RespondRpcFailure(ErrorStatusPB::FATAL_UNAUTHORIZED, |
| Status::NotAuthorized(Substitute("not authorized to $0", req_type))); |
| }; |
| // If there is no projection (i.e. this is a "counting" scan), the user |
| // needs full scan privileges on the table. |
| if (scan_pb.projected_columns_size() == 0) { |
| *required_column_privileges = unordered_set<ColumnId>(schema.column_ids().begin(), |
| schema.column_ids().end()); |
| return true; |
| } |
| unordered_set<ColumnId> required_privileges; |
| // Determine the scan's projected key column IDs. |
| for (int i = 0; i < scan_pb.projected_columns_size(); i++) { |
| optional<ColumnSchema> projected_column; |
| Status s = ColumnSchemaFromPB(scan_pb.projected_columns(i), &projected_column); |
| if (PREDICT_FALSE(!s.ok())) { |
| LOG(WARNING) << s.ToString(); |
| context->RespondRpcFailure(ErrorStatusPB::ERROR_INVALID_REQUEST, s); |
| return false; |
| } |
| // A projection may contain virtual columns, which don't exist in the |
| // tablet schema. If we were to search for a virtual column, we would |
| // incorrectly get a "not found" error. To reconcile this with the fact |
| // that we want to return an authorization error if the user has requested |
| // a non-virtual column that doesn't exist, we require full scan privileges |
| // for virtual columns. |
| if (projected_column->type_info()->is_virtual()) { |
| *required_column_privileges = unordered_set<ColumnId>(schema.column_ids().begin(), |
| schema.column_ids().end()); |
| return true; |
| } |
| int col_idx = schema.find_column(projected_column->name()); |
| if (col_idx == Schema::kColumnNotFound) { |
| respond_not_authorized(scan_pb.projected_columns(i).name()); |
| return false; |
| } |
| EmplaceIfNotPresent(&required_privileges, schema.column_id(col_idx)); |
| } |
| // Ordered scans and any scans that make use of the primary key require |
| // privileges to scan across all primary key columns. |
| if (scan_pb.order_mode() == ORDERED || |
| scan_pb.has_start_primary_key() || |
| scan_pb.has_stop_primary_key() || |
| scan_pb.has_last_primary_key()) { |
| const auto& key_cols = schema.get_key_column_ids(); |
| required_privileges.insert(key_cols.begin(), key_cols.end()); |
| } |
| // Determine the scan's predicate column IDs. |
| for (int i = 0; i < scan_pb.column_predicates_size(); i++) { |
| int col_idx = schema.find_column(scan_pb.column_predicates(i).column()); |
| if (col_idx == Schema::kColumnNotFound) { |
| respond_not_authorized(scan_pb.column_predicates(i).column()); |
| return false; |
| } |
| EmplaceIfNotPresent(&required_privileges, schema.column_id(col_idx)); |
| } |
| // Do the same for the DEPRECATED_range_predicates field. Even though this |
| // field is deprecated, it is still exposed as a part of our public API and |
| // thus needs to be taken into account. |
| for (int i = 0; i < scan_pb.deprecated_range_predicates_size(); i++) { |
| int col_idx = schema.find_column(scan_pb.deprecated_range_predicates(i).column().name()); |
| if (col_idx == Schema::kColumnNotFound) { |
| respond_not_authorized(scan_pb.deprecated_range_predicates(i).column().name()); |
| return false; |
| } |
| EmplaceIfNotPresent(&required_privileges, schema.column_id(col_idx)); |
| } |
| *required_column_privileges = std::move(required_privileges); |
| return true; |
| } |
| |
| // Checks the column-level privileges required to perform the scan specified by |
| // 'scan_pb' against the authorized column IDs listed in |
| // 'authorized_column_ids', consulting the column IDs found in 'schema'. |
| // |
| // Returns false if the scan isn't authorized and uses 'context' to send an |
| // error response. 'req_type' is used for logging'. |
| static bool CheckScanPrivilegesOrRespond(const NewScanRequestPB& scan_pb, const Schema& schema, |
| const unordered_set<ColumnId>& authorized_column_ids, |
| const string& req_type, RpcContext* context) { |
| unordered_set<ColumnId> required_column_privileges; |
| if (!GetScanPrivilegesOrRespond(scan_pb, schema, req_type, |
| &required_column_privileges, context)) { |
| return false; |
| } |
| for (const auto& required_col_id : required_column_privileges) { |
| if (!ContainsKey(authorized_column_ids, required_col_id)) { |
| LOG(WARNING) << Substitute("rejecting $0 request from $1: authz token doesn't " |
| "authorize column ID $2", req_type, context->requestor_string(), |
| required_col_id); |
| context->RespondRpcFailure(ErrorStatusPB::FATAL_UNAUTHORIZED, |
| Status::NotAuthorized(Substitute("not authorized to $0", req_type))); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| // Returns false if the table ID of 'privilege' doesn't match 'table_id', |
| // responding with an error via 'context' if so. Otherwise, returns true. |
| // 'req_type' is used for logging purposes. |
| static bool CheckMatchingTableIdOrRespond(const security::TablePrivilegePB& privilege, |
| const string& table_id, const string& req_type, |
| RpcContext* context) { |
| if (privilege.table_id() != table_id) { |
| LOG(WARNING) << Substitute("rejecting $0 request from $1: '$2', expected '$3'", |
| req_type, context->requestor_string(), |
| privilege.table_id(), table_id); |
| context->RespondRpcFailure(ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN, |
| Status::NotAuthorized("authorization token is for the wrong table ID")); |
| return false; |
| } |
| return true; |
| } |
| |
| // Returns false if the privilege has neither full scan privileges nor any |
| // column-level scan privileges, in which case any scan-like request should be |
| // rejected. Otherwise returns true, and returns any column-level scan |
| // privileges in 'privilege'. |
| static bool CheckMayHaveScanPrivilegesOrRespond(const security::TablePrivilegePB& privilege, |
| const string& req_type, |
| unordered_set<ColumnId>* authorized_column_ids, |
| RpcContext* context) { |
| DCHECK(authorized_column_ids); |
| DCHECK(authorized_column_ids->empty()); |
| if (privilege.column_privileges_size() > 0) { |
| for (const auto& col_id_and_privilege : privilege.column_privileges()) { |
| if (col_id_and_privilege.second.scan_privilege()) { |
| EmplaceOrDie(authorized_column_ids, col_id_and_privilege.first); |
| } |
| } |
| } |
| if (privilege.scan_privilege() || !authorized_column_ids->empty()) { |
| return true; |
| } |
| LOG(WARNING) << Substitute("rejecting $0 request from $1: no column privileges", |
| req_type, context->requestor_string()); |
| context->RespondRpcFailure(ErrorStatusPB::FATAL_UNAUTHORIZED, |
| Status::NotAuthorized(Substitute("not authorized to $0", req_type))); |
| return false; |
| } |
| |
| // Verifies the authorization token's correctness. Returns false and sends an |
| // appropriate response if the request's authz token is invalid. |
| template <class AuthorizableRequest> |
| static bool VerifyAuthzTokenOrRespond(const TokenVerifier& token_verifier, |
| const AuthorizableRequest& req, |
| RpcContext* context, |
| TokenPB* token) { |
| DCHECK(token); |
| if (!req.has_authz_token()) { |
| context->RespondRpcFailure(ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN, |
| Status::NotAuthorized("no authorization token presented")); |
| return false; |
| } |
| TokenPB token_pb; |
| const auto result = token_verifier.VerifyTokenSignature(req.authz_token(), &token_pb); |
| ErrorStatusPB::RpcErrorCodePB error; |
| Status s = ParseTokenVerificationResult(result, |
| ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN, &error); |
| if (!s.ok()) { |
| context->RespondRpcFailure(error, s.CloneAndPrepend("authz token verification failure")); |
| return false; |
| } |
| if (!token_pb.has_authz() || |
| !token_pb.authz().has_table_privilege() || |
| token_pb.authz().username() != context->remote_user().username()) { |
| context->RespondRpcFailure(ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN, |
| Status::NotAuthorized("invalid authorization token presented")); |
| return false; |
| } |
| if (MaybeTrue(FLAGS_tserver_inject_invalid_authz_token_ratio)) { |
| context->RespondRpcFailure(ErrorStatusPB::ERROR_INVALID_AUTHORIZATION_TOKEN, |
| Status::NotAuthorized("INJECTED FAILURE")); |
| return false; |
| } |
| *token = std::move(token_pb); |
| return true; |
| } |
| |
| static void SetupErrorAndRespond(TabletServerErrorPB* error, |
| const Status& s, |
| TabletServerErrorPB::Code code, |
| RpcContext* context) { |
| // Non-authorized errors will drop the connection. |
| if (code == TabletServerErrorPB::NOT_AUTHORIZED) { |
| DCHECK(s.IsNotAuthorized()); |
| context->RespondRpcFailure(ErrorStatusPB::FATAL_UNAUTHORIZED, s); |
| return; |
| } |
| // Generic "service unavailable" errors will cause the client to retry later. |
| if ((code == TabletServerErrorPB::UNKNOWN_ERROR || |
| code == TabletServerErrorPB::THROTTLED) && s.IsServiceUnavailable()) { |
| context->RespondRpcFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, s); |
| return; |
| } |
| |
| StatusToPB(s, error->mutable_status()); |
| error->set_code(code); |
| context->RespondNoCache(); |
| } |
| |
| template <class ReqType, class RespType> |
| void HandleErrorResponse(const ReqType* req, RespType* resp, RpcContext* context, |
| const optional<TabletServerErrorPB::Code>& error_code, |
| const Status& s) { |
| resp->Clear(); |
| if (error_code) { |
| SetupErrorAndRespond(resp->mutable_error(), s, *error_code, context); |
| } else { |
| HandleUnknownError(s, resp, context); |
| } |
| } |
| |
| // A op completion callback that responds to the client when ops complete and |
| // sets the client error if there is one to set. |
| template<class Response> |
| class RpcOpCompletionCallback : public OpCompletionCallback { |
| public: |
| RpcOpCompletionCallback(RpcContext* context, Response* response) |
| : context_(context), |
| response_(response) {} |
| |
| void OpCompleted() override { |
| if (!status_.ok()) { |
| LOG(WARNING) << Substitute("failed op from $0: $1", |
| context_->requestor_string(), status_.ToString()); |
| SetupErrorAndRespond(get_error(), status_, code_, context_); |
| } else { |
| context_->RespondSuccess(); |
| } |
| } |
| |
| private: |
| TabletServerErrorPB* get_error() { |
| return response_->mutable_error(); |
| } |
| |
| rpc::RpcContext* context_; |
| Response* response_; |
| }; |
| |
| class TxnWriteCompletionCallback : public RpcOpCompletionCallback<WriteResponsePB> { |
| public: |
| TxnWriteCompletionCallback(RpcContext* context, WriteResponsePB* response, |
| std::function<Status(void)> abort_func) |
| : RpcOpCompletionCallback(context, response), |
| abort_func_(std::move(abort_func)) {} |
| |
| void OpCompleted() override { |
| if (PREDICT_FALSE(code_ == TabletServerErrorPB::TXN_LOCKED_ABORT)) { |
| WARN_NOT_OK(abort_func_(), "Error running txn abort callback"); |
| } |
| RpcOpCompletionCallback::OpCompleted(); |
| } |
| |
| private: |
| std::function<Status(void)> abort_func_; |
| }; |
| |
| // Generic interface to handle scan results. |
| class ScanResultCollector { |
| public: |
| virtual void HandleRowBlock(Scanner* scanner, |
| const RowBlock& row_block) = 0; |
| |
| // Returns number of bytes which will be returned in the response. |
| virtual int64_t ResponseSize() const = 0; |
| |
| // Return the number of rows actually returned to the client. |
| virtual int64_t NumRowsReturned() const = 0; |
| |
| // Initialize the serializer with the given row format flags. |
| // |
| // This is a separate function instead of a constructor argument passed to specific |
| // collector implementations because, currently, the collector is built before the |
| // request is decoded and checked for 'row_format_flags'. |
| // |
| // Does nothing by default. |
| virtual Status InitSerializer(uint64_t /* row_format_flags */, |
| const Schema& /* scanner_schema */, |
| const Schema& /* client_schema */) { |
| return Status::OK(); |
| } |
| |
| CpuTimes* cpu_times() { |
| return &cpu_times_; |
| } |
| |
| private: |
| CpuTimes cpu_times_; |
| }; |
| |
| namespace { |
| |
| // Given a RowBlock, set last_primary_key to the primary key of the last selected row |
| // in the RowBlock. If no row is selected, last_primary_key is not set. |
| void SetLastRow(const RowBlock& row_block, faststring* last_primary_key) { |
| // Find the last selected row and save its encoded key. |
| const SelectionVector* sel = row_block.selection_vector(); |
| if (sel->AnySelected()) { |
| for (int i = sel->nrows() - 1; i >= 0; i--) { |
| if (sel->IsRowSelected(i)) { |
| RowBlockRow last_row = row_block.row(i); |
| const Schema* schema = last_row.schema(); |
| schema->EncodeComparableKey(last_row, last_primary_key); |
| break; |
| } |
| } |
| } |
| } |
| |
| // Interface for serializing results into a scan response. |
| class ResultSerializer { |
| public: |
| virtual ~ResultSerializer() = default; |
| |
| // Add the given RowBlock to the pending response. |
| virtual int SerializeRowBlock(const RowBlock& row_block, |
| const Schema* client_projection_schema) = 0; |
| |
| // Return the approximate size (in bytes) of the pending response. Once this |
| // result is greater than the client's requested batch size, the pending rows |
| // will be returned to the client. |
| virtual size_t ResponseSize() const = 0; |
| |
| // Serialize the pending rows into the response protobuf. |
| // Must be called at most once. |
| virtual void SetupResponse(RpcContext* context, ScanResponsePB* resp) = 0; |
| }; |
| |
| class RowwiseResultSerializer : public ResultSerializer { |
| public: |
| RowwiseResultSerializer(int batch_size_bytes, uint64_t flags) |
| : rows_data_(batch_size_bytes * 11 / 10), |
| indirect_data_(batch_size_bytes * 11 / 10), |
| pad_unixtime_micros_to_16_bytes_(flags & RowFormatFlags::PAD_UNIX_TIME_MICROS_TO_16_BYTES) { |
| // TODO(todd): use a chain of faststrings instead of a single one to avoid |
| // allocating this large buffer. Large buffer allocations are slow and |
| // potentially wasteful. |
| } |
| |
| int SerializeRowBlock(const RowBlock& row_block, |
| const Schema* client_projection_schema) override { |
| // TODO(todd) create some kind of serializer object that caches the projection |
| // information to avoid recalculating it on every SerializeRowBlock call. |
| int num_selected = kudu::SerializeRowBlock( |
| row_block, client_projection_schema, |
| &rows_data_, &indirect_data_, pad_unixtime_micros_to_16_bytes_); |
| rowblock_pb_.set_num_rows(rowblock_pb_.num_rows() + num_selected); |
| return num_selected; |
| } |
| |
| size_t ResponseSize() const override { |
| return rows_data_.size() + indirect_data_.size(); |
| } |
| |
| void SetupResponse(RpcContext* context, ScanResponsePB* resp) override { |
| CHECK(!done_); |
| done_ = true; |
| |
| *resp->mutable_data() = std::move(rowblock_pb_); |
| // Add sidecar data to context and record the returned indices. |
| int rows_idx; |
| CHECK_OK(context->AddOutboundSidecar( |
| RpcSidecar::FromFaststring((std::move(rows_data_))), &rows_idx)); |
| resp->mutable_data()->set_rows_sidecar(rows_idx); |
| |
| // Add indirect data as a sidecar, if applicable. |
| if (indirect_data_.size() > 0) { |
| int indirect_idx; |
| CHECK_OK(context->AddOutboundSidecar( |
| RpcSidecar::FromFaststring(std::move(indirect_data_)), &indirect_idx)); |
| resp->mutable_data()->set_indirect_data_sidecar(indirect_idx); |
| } |
| } |
| |
| private: |
| RowwiseRowBlockPB rowblock_pb_; |
| faststring rows_data_; |
| faststring indirect_data_; |
| bool pad_unixtime_micros_to_16_bytes_; |
| bool done_ = false; |
| }; |
| |
| class ColumnarResultSerializer : public ResultSerializer { |
| public: |
| static Status Create(uint64_t flags, |
| int batch_size_bytes, |
| const Schema& scanner_schema, |
| const Schema& client_schema, |
| unique_ptr<ResultSerializer>* serializer) { |
| if (flags & ~RowFormatFlags::COLUMNAR_LAYOUT) { |
| return Status::InvalidArgument("Row format flags not supported with columnar layout"); |
| } |
| serializer->reset(new ColumnarResultSerializer( |
| scanner_schema, client_schema, batch_size_bytes)); |
| return Status::OK(); |
| } |
| |
| int SerializeRowBlock(const RowBlock& row_block, |
| const Schema* /* unused */) override { |
| CHECK(!done_); |
| int n_sel = results_.AddRowBlock(row_block); |
| num_rows_ += n_sel; |
| return n_sel; |
| } |
| |
| size_t ResponseSize() const override { |
| CHECK(!done_); |
| |
| int total = 0; |
| for (const auto& col : results_.columns()) { |
| total += col.data.size(); |
| if (col.varlen_data) { |
| total += col.varlen_data->size(); |
| } |
| if (col.non_null_bitmap) { |
| total += col.non_null_bitmap->size(); |
| } |
| } |
| return total; |
| } |
| |
| void SetupResponse(RpcContext* context, ScanResponsePB* resp) override { |
| CHECK(!done_); |
| done_ = true; |
| ColumnarRowBlockPB* data = resp->mutable_columnar_data(); |
| auto cols = std::move(results_).TakeColumns(); |
| for (auto& col : cols) { |
| auto* col_pb = data->add_columns(); |
| int sidecar_idx; |
| CHECK_OK(context->AddOutboundSidecar( |
| RpcSidecar::FromFaststring((std::move(col.data))), &sidecar_idx)); |
| col_pb->set_data_sidecar(sidecar_idx); |
| |
| if (col.varlen_data) { |
| CHECK_OK(context->AddOutboundSidecar( |
| RpcSidecar::FromFaststring((std::move(*col.varlen_data))), &sidecar_idx)); |
| col_pb->set_varlen_data_sidecar(sidecar_idx); |
| } |
| |
| if (col.non_null_bitmap) { |
| CHECK_OK(context->AddOutboundSidecar( |
| RpcSidecar::FromFaststring((std::move(*col.non_null_bitmap))), &sidecar_idx)); |
| col_pb->set_non_null_bitmap_sidecar(sidecar_idx); |
| } |
| } |
| data->set_num_rows(num_rows_); |
| } |
| |
| private: |
| ColumnarResultSerializer(const Schema& scanner_schema, |
| const Schema& client_schema, |
| int batch_size_bytes) |
| : results_(scanner_schema, client_schema, batch_size_bytes) { |
| } |
| |
| int64_t num_rows_ = 0; |
| ColumnarSerializedBatch results_; |
| bool done_ = false; |
| }; |
| |
| } // anonymous namespace |
| |
| // Copies the scan result to the given row block PB and data buffers. |
| // |
| // This implementation is used in the common case where a client is running |
| // a scan and the data needs to be returned to the client. |
| // |
| // (This is in contrast to some other ScanResultCollector implementation that |
| // might do an aggregation or gather some other types of statistics via a |
| // server-side scan and thus never need to return the actual data.) |
| class ScanResultCopier : public ScanResultCollector { |
| public: |
| explicit ScanResultCopier(int batch_size_bytes) |
| : batch_size_bytes_(batch_size_bytes), |
| num_rows_returned_(0) { |
| } |
| |
| void HandleRowBlock(Scanner* scanner, const RowBlock& row_block) override { |
| int num_selected = serializer_->SerializeRowBlock( |
| row_block, scanner->client_projection_schema()); |
| |
| if (num_selected > 0) { |
| num_rows_returned_ += num_selected; |
| scanner->add_num_rows_returned(num_selected); |
| SetLastRow(row_block, &last_primary_key_); |
| } |
| } |
| |
| // Returns number of bytes buffered to return. |
| int64_t ResponseSize() const override { |
| return serializer_->ResponseSize(); |
| } |
| |
| int64_t NumRowsReturned() const override { |
| return num_rows_returned_; |
| } |
| |
| Status InitSerializer(uint64_t row_format_flags, |
| const Schema& scanner_schema, |
| const Schema& client_schema) override { |
| if (serializer_) { |
| // TODO(todd) for the NewScanner case, this gets called twice |
| // which is a bit ugly. Refactor to avoid! |
| return Status::OK(); |
| } |
| if (row_format_flags & COLUMNAR_LAYOUT) { |
| return ColumnarResultSerializer::Create( |
| row_format_flags, batch_size_bytes_, scanner_schema, client_schema, &serializer_); |
| } |
| serializer_.reset(new RowwiseResultSerializer(batch_size_bytes_, row_format_flags)); |
| return Status::OK(); |
| } |
| |
| void SetupResponse(RpcContext* context, ScanResponsePB* resp) { |
| if (serializer_) { |
| serializer_->SetupResponse(context, resp); |
| } |
| |
| // Set the last row found by the collector. |
| // |
| // We could have an empty batch if all the remaining rows are filtered by the |
| // predicate, in which case do not set the last row. |
| if (last_primary_key_.length() > 0) { |
| resp->set_last_primary_key(last_primary_key_.ToString()); |
| } |
| } |
| |
| private: |
| int batch_size_bytes_; |
| int64_t num_rows_returned_; |
| faststring last_primary_key_; |
| unique_ptr<ResultSerializer> serializer_; |
| |
| DISALLOW_COPY_AND_ASSIGN(ScanResultCopier); |
| }; |
| |
| // Checksums the scan result. |
| class ScanResultChecksummer : public ScanResultCollector { |
| public: |
| ScanResultChecksummer() |
| : crc_(crc::GetCrc32cInstance()), |
| agg_checksum_(0), |
| rows_checksummed_(0) { |
| } |
| |
| virtual void HandleRowBlock(Scanner* scanner, |
| const RowBlock& row_block) OVERRIDE { |
| |
| const Schema* client_projection_schema = scanner->client_projection_schema(); |
| if (!client_projection_schema) { |
| client_projection_schema = row_block.schema(); |
| } |
| |
| size_t nrows = row_block.nrows(); |
| for (size_t i = 0; i < nrows; i++) { |
| if (!row_block.selection_vector()->IsRowSelected(i)) continue; |
| uint32_t row_crc = CalcRowCrc32(*client_projection_schema, row_block.row(i)); |
| agg_checksum_ += row_crc; |
| rows_checksummed_++; |
| } |
| // Find the last selected row and save its encoded key. |
| SetLastRow(row_block, &encoded_last_row_); |
| } |
| |
| // Returns a constant -- we only return checksum based on a time budget. |
| virtual int64_t ResponseSize() const OVERRIDE { return sizeof(agg_checksum_); } |
| |
| virtual int64_t NumRowsReturned() const OVERRIDE { |
| return 0; |
| } |
| |
| int64_t rows_checksummed() const { |
| return rows_checksummed_; |
| } |
| |
| // Accessors for initializing / setting the checksum. |
| void set_agg_checksum(uint64_t value) { agg_checksum_ = value; } |
| uint64_t agg_checksum() const { return agg_checksum_; } |
| |
| private: |
| // Calculates a CRC32C for the given row. |
| uint32_t CalcRowCrc32(const Schema& projection, const RowBlockRow& row) { |
| tmp_buf_.clear(); |
| |
| for (size_t j = 0; j < projection.num_columns(); j++) { |
| uint32_t col_index = static_cast<uint32_t>(j); // For the CRC. |
| tmp_buf_.append(&col_index, sizeof(col_index)); |
| ColumnBlockCell cell = row.cell(j); |
| if (cell.is_nullable()) { |
| uint8_t is_defined = cell.is_null() ? 0 : 1; |
| tmp_buf_.append(&is_defined, sizeof(is_defined)); |
| if (!is_defined) continue; |
| } |
| if (cell.typeinfo()->physical_type() == BINARY) { |
| const Slice* data = reinterpret_cast<const Slice *>(cell.ptr()); |
| tmp_buf_.append(data->data(), data->size()); |
| } else { |
| tmp_buf_.append(cell.ptr(), cell.size()); |
| } |
| } |
| |
| uint64_t row_crc = 0; |
| crc_->Compute(tmp_buf_.data(), tmp_buf_.size(), &row_crc, nullptr); |
| return static_cast<uint32_t>(row_crc); // CRC32 only uses the lower 32 bits. |
| } |
| |
| |
| faststring tmp_buf_; |
| crc::Crc* const crc_; |
| uint64_t agg_checksum_; |
| int64_t rows_checksummed_; |
| faststring encoded_last_row_; |
| |
| DISALLOW_COPY_AND_ASSIGN(ScanResultChecksummer); |
| }; |
| |
| // Return the batch size to use for a given request, after clamping |
| // the user-requested request within the server-side allowable range. |
| // This is only a hint, really more of a threshold since returned bytes |
| // may exceed this limit, but hopefully only by a little bit. |
| static size_t GetMaxBatchSizeBytesHint(const ScanRequestPB* req) { |
| if (!req->has_batch_size_bytes()) { |
| return FLAGS_scanner_default_batch_size_bytes; |
| } |
| |
| return std::min(req->batch_size_bytes(), |
| implicit_cast<uint32_t>(FLAGS_scanner_max_batch_size_bytes)); |
| } |
| |
| TabletServiceImpl::TabletServiceImpl(TabletServer* server) |
| : TabletServerServiceIf(server->metric_entity(), server->result_tracker()), |
| server_(server), |
| rng_(GetRandomSeed32()) { |
| num_op_apply_queue_rejections_ = server_->metric_entity()->FindOrCreateCounter( |
| &METRIC_op_apply_queue_overload_rejections); |
| } |
| |
| bool TabletServiceImpl::AuthorizeClientOrServiceUser(const google::protobuf::Message* /*req*/, |
| google::protobuf::Message* /*resp*/, |
| RpcContext* context) { |
| return server_->Authorize(context, ServerBase::SUPER_USER | ServerBase::USER | |
| ServerBase::SERVICE_USER); |
| } |
| |
| bool TabletServiceImpl::AuthorizeListTablets(const google::protobuf::Message* req, |
| google::protobuf::Message* resp, |
| RpcContext* context) { |
| if (FLAGS_tserver_enforce_access_control) { |
| return server_->Authorize(context, ServerBase::SUPER_USER); |
| } |
| return AuthorizeClient(req, resp, context); |
| } |
| |
| bool TabletServiceImpl::AuthorizeClient(const google::protobuf::Message* /*req*/, |
| google::protobuf::Message* /*resp*/, |
| RpcContext* context) { |
| return server_->Authorize(context, ServerBase::SUPER_USER | ServerBase::USER); |
| } |
| |
| void TabletServiceImpl::Ping(const PingRequestPB* /*req*/, |
| PingResponsePB* /*resp*/, |
| RpcContext* context) { |
| context->RespondSuccess(); |
| } |
| |
| TabletServiceAdminImpl::TabletServiceAdminImpl(TabletServer* server) |
| : TabletServerAdminServiceIf(server->metric_entity(), server->result_tracker()), |
| server_(server) { |
| } |
| |
| bool TabletServiceAdminImpl::AuthorizeServiceUser(const google::protobuf::Message* /*req*/, |
| google::protobuf::Message* /*resp*/, |
| RpcContext* context) { |
| return server_->Authorize(context, ServerBase::SUPER_USER | ServerBase::SERVICE_USER); |
| } |
| |
| void TabletServiceAdminImpl::AlterSchema(const AlterSchemaRequestPB* req, |
| AlterSchemaResponsePB* resp, |
| RpcContext* context) { |
| if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "AlterSchema", req, resp, context)) { |
| return; |
| } |
| DVLOG(3) << "Received Alter Schema RPC: " << SecureDebugString(*req); |
| |
| scoped_refptr<TabletReplica> replica; |
| if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp, |
| context, &replica)) { |
| return; |
| } |
| |
| uint32_t schema_version = replica->tablet_metadata()->schema_version(); |
| |
| // If the schema was already applied, respond as succeded |
| if (schema_version == req->schema_version()) { |
| // Sanity check, to verify that the tablet should have the same schema |
| // specified in the request. |
| Schema req_schema; |
| Status s = SchemaFromPB(req->schema(), &req_schema); |
| if (!s.ok()) { |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::INVALID_SCHEMA, context); |
| return; |
| } |
| |
| const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema(); |
| const Schema& tablet_schema = *tablet_schema_ptr; |
| if (req_schema == tablet_schema) { |
| context->RespondSuccess(); |
| return; |
| } |
| |
| schema_version = replica->tablet_metadata()->schema_version(); |
| if (schema_version == req->schema_version()) { |
| LOG(ERROR) << "The current schema does not match the request schema." |
| << " version=" << schema_version |
| << " current-schema=" << tablet_schema.ToString() |
| << " request-schema=" << req_schema.ToString() |
| << " (corruption)"; |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::Corruption("got a different schema for the same version number"), |
| TabletServerErrorPB::MISMATCHED_SCHEMA, context); |
| return; |
| } |
| } |
| |
| // If the current schema is newer than the one in the request reject the request. |
| if (schema_version > req->schema_version()) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::InvalidArgument("Tablet has a newer schema"), |
| TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA, context); |
| return; |
| } |
| |
| unique_ptr<AlterSchemaOpState> op_state( |
| new AlterSchemaOpState(replica.get(), req, resp)); |
| |
| op_state->set_completion_callback(unique_ptr<OpCompletionCallback>( |
| new RpcOpCompletionCallback<AlterSchemaResponsePB>(context, resp))); |
| |
| // Submit the alter schema op. The RPC will be responded to asynchronously. |
| Status s = replica->SubmitAlterSchema(std::move(op_state)); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| } |
| |
| namespace { |
| // Returns an error if 'op' is missing any required fields, or if it's of an |
| // unknown type. |
| Status ValidateCoordinatorOpFields(const CoordinatorOpPB& op) { |
| const auto& type = op.type(); |
| Status s; |
| switch (type) { |
| case CoordinatorOpPB::REGISTER_PARTICIPANT: |
| if (!op.has_txn_participant_id()) { |
| return Status::InvalidArgument(Substitute("Missing participant id: $0", |
| SecureShortDebugString(op))); |
| } |
| [[fallthrough]]; |
| case CoordinatorOpPB::BEGIN_TXN: |
| case CoordinatorOpPB::BEGIN_COMMIT_TXN: |
| case CoordinatorOpPB::ABORT_TXN: |
| case CoordinatorOpPB::GET_TXN_STATUS: |
| case CoordinatorOpPB::KEEP_TXN_ALIVE: |
| if (!op.has_txn_id()) { |
| return Status::InvalidArgument(Substitute("Missing txn id: $0", |
| SecureShortDebugString(op))); |
| } |
| return Status::OK(); |
| default: |
| return Status::InvalidArgument(Substitute("Unknown op type: $0", type)); |
| } |
| __builtin_unreachable(); |
| } |
| } // anonymous namespace |
| |
| void TabletServiceAdminImpl::CoordinateTransaction(const CoordinateTransactionRequestPB* req, |
| CoordinateTransactionResponsePB* resp, |
| RpcContext* context) { |
| if (PREDICT_FALSE(!req->has_txn_status_tablet_id() || |
| !req->has_op())) { |
| Status s = Status::InvalidArgument( |
| Substitute("Missing fields in request: $0", SecureShortDebugString(*req))); |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| const auto& op = req->op(); |
| Status s = ValidateCoordinatorOpFields(op); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| scoped_refptr<TabletReplica> replica; |
| if (PREDICT_FALSE(!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), |
| req->txn_status_tablet_id(), |
| resp, context, &replica))) { |
| return; |
| } |
| tablet::TxnCoordinator* txn_coordinator = replica->txn_coordinator(); |
| if (PREDICT_FALSE(!txn_coordinator)) { |
| Status s = Status::InvalidArgument( |
| Substitute("Requested tablet is not a txn coordinator: $0", replica->tablet_id())); |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| // Catch any replication errors in this 'ts_error' so we can return an |
| // appropriate error to the caller if need be. |
| TabletServerErrorPB ts_error; |
| transactions::TxnStatusEntryPB txn_status; |
| const auto& user = op.user(); |
| const auto& txn_id = op.txn_id(); |
| int64_t highest_seen_txn_id = -1; |
| { |
| transactions::TxnStatusManager::ScopedLeaderSharedLock l(txn_coordinator); |
| if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, context)) { |
| return; |
| } |
| switch (op.type()) { |
| case CoordinatorOpPB::BEGIN_TXN: |
| s = txn_coordinator->BeginTransaction( |
| txn_id, user, &highest_seen_txn_id, &ts_error); |
| break; |
| case CoordinatorOpPB::REGISTER_PARTICIPANT: |
| s = txn_coordinator->RegisterParticipant(txn_id, op.txn_participant_id(), user, &ts_error); |
| break; |
| case CoordinatorOpPB::BEGIN_COMMIT_TXN: |
| s = txn_coordinator->BeginCommitTransaction(txn_id, user, &ts_error); |
| break; |
| case CoordinatorOpPB::ABORT_TXN: |
| s = txn_coordinator->AbortTransaction(txn_id, user, &ts_error); |
| break; |
| case CoordinatorOpPB::GET_TXN_STATUS: |
| s = txn_coordinator->GetTransactionStatus( |
| txn_id, user, &txn_status, &ts_error); |
| break; |
| case CoordinatorOpPB::KEEP_TXN_ALIVE: |
| s = txn_coordinator->KeepTransactionAlive(txn_id, user, &ts_error); |
| break; |
| default: |
| s = Status::InvalidArgument(Substitute("Unknown op type: $0", op.type())); |
| } |
| } |
| if (ts_error.has_status() && ts_error.status().code() != AppStatusPB::OK) { |
| *resp->mutable_error() = std::move(ts_error); |
| context->RespondNoCache(); |
| return; |
| } |
| // From here on out, errors are considered application errors. |
| if (PREDICT_FALSE(!s.ok())) { |
| StatusToPB(s, resp->mutable_op_result()->mutable_op_error()); |
| } else if (op.type() == CoordinatorOpPB::GET_TXN_STATUS) { |
| // Populate corresponding field in the response. |
| *(resp->mutable_op_result()->mutable_txn_status()) = std::move(txn_status); |
| } else if (op.type() == CoordinatorOpPB::BEGIN_TXN) { |
| resp->mutable_op_result()->set_keepalive_millis( |
| FLAGS_txn_keepalive_interval_ms); |
| } |
| if (op.type() == CoordinatorOpPB::BEGIN_TXN && !s.IsServiceUnavailable()) { |
| DCHECK_GE(highest_seen_txn_id, 0); |
| resp->mutable_op_result()->set_highest_seen_txn_id(highest_seen_txn_id); |
| } |
| context->RespondSuccess(); |
| } |
| |
| void TabletServiceAdminImpl::ParticipateInTransaction(const ParticipantRequestPB* req, |
| ParticipantResponsePB* resp, |
| RpcContext* context) { |
| if (!req->has_op() || !req->op().has_type() || !req->op().has_txn_id() || |
| !req->has_tablet_id()) { |
| Status s = Status::InvalidArgument( |
| Substitute("Missing fields in request: $0", SecureShortDebugString(*req))); |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| scoped_refptr<TabletReplica> replica; |
| if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp, |
| context, &replica)) { |
| return; |
| } |
| shared_ptr<Tablet> tablet; |
| TabletServerErrorPB::Code error_code; |
| Status s = GetTabletRef(replica, &tablet, &error_code); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, error_code, context); |
| return; |
| } |
| const auto& op = req->op(); |
| if (op.type() == ParticipantOpPB::GET_METADATA) { |
| TxnMetadataPB pb; |
| const auto* meta = tablet->metadata(); |
| if (meta->GetTxnMetadataPB(op.txn_id(), &pb)) { |
| *resp->mutable_metadata() = std::move(pb); |
| context->RespondSuccess(); |
| return; |
| } |
| SetupErrorAndRespond( |
| resp->mutable_error(), |
| Status::InvalidArgument(Substitute("txn ID $0 has no metadata", op.txn_id())), |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| |
| // TODO(awong): consider memory-based throttling? |
| // TODO(awong): we should also persist the transaction's owner, and prevent |
| // other users from mutating it. |
| unique_ptr<ParticipantOpState> op_state( |
| new ParticipantOpState(replica.get(), tablet->txn_participant(), req, resp)); |
| op_state->set_completion_callback(unique_ptr<OpCompletionCallback>( |
| new RpcOpCompletionCallback<ParticipantResponsePB>(context, resp))); |
| s = replica->SubmitTxnParticipantOp(std::move(op_state)); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| } |
| } |
| |
| bool TabletServiceAdminImpl::SupportsFeature(uint32_t feature) const { |
| switch (feature) { |
| case TabletServerFeatures::COLUMN_PREDICATES: |
| case TabletServerFeatures::PAD_UNIXTIME_MICROS_TO_16_BYTES: |
| case TabletServerFeatures::QUIESCING: |
| case TabletServerFeatures::BLOOM_FILTER_PREDICATE_V2: |
| // TODO(awong): once transactions are useable, add a feature flag. |
| return true; |
| default: |
| return false; |
| } |
| } |
| |
| void TabletServiceAdminImpl::Quiesce(const QuiesceTabletServerRequestPB* req, |
| QuiesceTabletServerResponsePB* resp, |
| RpcContext* context) { |
| if (req->has_quiesce()) { |
| bool quiesce_tserver = req->quiesce(); |
| *server_->mutable_quiescing() = quiesce_tserver; |
| LOG(INFO) << Substitute("Tablet server $0 set to $1", |
| server_->fs_manager()->uuid(), |
| (quiesce_tserver ? "quiescing" : "not quiescing")); |
| } |
| if (req->return_stats()) { |
| resp->set_num_leaders(server_->num_raft_leaders()->value()); |
| resp->set_num_active_scanners(server_->scanner_manager()->CountActiveScanners()); |
| LOG(INFO) << Substitute("Tablet server has $0 leaders and $1 scanners", |
| resp->num_leaders(), resp->num_active_scanners()); |
| } |
| resp->set_is_quiescing(server_->quiescing()); |
| context->RespondSuccess(); |
| } |
| |
| void TabletServiceAdminImpl::CreateTablet(const CreateTabletRequestPB* req, |
| CreateTabletResponsePB* resp, |
| RpcContext* context) { |
| if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "CreateTablet", req, resp, context)) { |
| return; |
| } |
| TRACE_EVENT1("tserver", "CreateTablet", |
| "tablet_id", req->tablet_id()); |
| |
| Schema schema; |
| Status s = SchemaFromPB(req->schema(), &schema); |
| DCHECK(schema.has_column_ids()); |
| if (!s.ok()) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::InvalidArgument("Invalid Schema."), |
| TabletServerErrorPB::INVALID_SCHEMA, context); |
| return; |
| } |
| |
| PartitionSchema partition_schema; |
| s = PartitionSchema::FromPB(req->partition_schema(), schema, &partition_schema); |
| if (!s.ok()) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::InvalidArgument("Invalid PartitionSchema."), |
| TabletServerErrorPB::INVALID_SCHEMA, context); |
| return; |
| } |
| |
| Partition partition; |
| Partition::FromPB(req->partition(), &partition); |
| |
| LOG(INFO) << Substitute("Processing CreateTablet for tablet $0 ($1table=$2 [id=$3]), " |
| "partition=$4", req->tablet_id(), |
| req->has_table_type() ? TableTypePB_Name(req->table_type()) + " ": "", |
| req->table_name(), req->table_id(), |
| partition_schema.PartitionDebugString(partition, schema)); |
| VLOG(1) << "Full request: " << SecureDebugString(*req); |
| |
| s = server_->tablet_manager()->CreateNewTablet( |
| req->table_id(), |
| req->tablet_id(), |
| partition, |
| req->table_name(), |
| schema, |
| partition_schema, |
| req->config(), |
| req->has_extra_config() ? make_optional(req->extra_config()) : nullopt, |
| req->has_dimension_label() ? make_optional(req->dimension_label()) : nullopt, |
| req->has_table_type() && req->table_type() != TableTypePB::DEFAULT_TABLE ? |
| make_optional(req->table_type()) : nullopt, |
| nullptr); |
| if (PREDICT_FALSE(!s.ok())) { |
| TabletServerErrorPB::Code code; |
| if (s.IsAlreadyPresent()) { |
| code = TabletServerErrorPB::TABLET_ALREADY_EXISTS; |
| } else { |
| code = TabletServerErrorPB::UNKNOWN_ERROR; |
| } |
| SetupErrorAndRespond(resp->mutable_error(), s, code, context); |
| return; |
| } |
| context->RespondSuccess(); |
| } |
| |
| void TabletServiceAdminImpl::DeleteTablet(const DeleteTabletRequestPB* req, |
| DeleteTabletResponsePB* resp, |
| RpcContext* context) { |
| if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "DeleteTablet", req, resp, context)) { |
| return; |
| } |
| TRACE_EVENT2("tserver", "DeleteTablet", |
| "tablet_id", req->tablet_id(), |
| "reason", req->reason()); |
| |
| tablet::TabletDataState delete_type = tablet::TABLET_DATA_UNKNOWN; |
| if (req->has_delete_type()) { |
| delete_type = req->delete_type(); |
| } |
| LOG(INFO) << "Processing DeleteTablet for tablet " << req->tablet_id() |
| << " with delete_type " << TabletDataState_Name(delete_type) |
| << (req->has_reason() ? (" (" + req->reason() + ")") : "") |
| << " from " << context->requestor_string(); |
| VLOG(1) << "Full request: " << SecureDebugString(*req); |
| |
| optional<int64_t> cas_config_opid_index_less_or_equal; |
| if (req->has_cas_config_opid_index_less_or_equal()) { |
| cas_config_opid_index_less_or_equal = req->cas_config_opid_index_less_or_equal(); |
| } |
| |
| auto response_callback = [context, req, resp](const Status& s, TabletServerErrorPB::Code code) { |
| if (PREDICT_FALSE(!s.ok())) { |
| HandleErrorResponse(req, resp, context, code, s); |
| return; |
| } |
| context->RespondSuccess(); |
| }; |
| server_->tablet_manager()->DeleteTabletAsync(req->tablet_id(), |
| delete_type, |
| cas_config_opid_index_less_or_equal, |
| response_callback); |
| } |
| |
| void TabletServiceImpl::Write(const WriteRequestPB* req, |
| WriteResponsePB* resp, |
| RpcContext* context) { |
| const auto& tablet_id = req->tablet_id(); |
| TRACE_EVENT1("tserver", "TabletServiceImpl::Write", |
| "tablet_id", tablet_id); |
| DVLOG(3) << "Received Write RPC: " << SecureDebugString(*req); |
| scoped_refptr<TabletReplica> replica; |
| if (!LookupRunningTabletReplicaOrRespond( |
| server_->tablet_manager(), tablet_id, resp, context, &replica)) { |
| return; |
| } |
| optional<WriteAuthorizationContext> authz_context; |
| if (FLAGS_tserver_enforce_access_control) { |
| TokenPB token; |
| if (!VerifyAuthzTokenOrRespond(server_->token_verifier(), *req, context, &token)) { |
| return; |
| } |
| const auto& privilege = token.authz().table_privilege(); |
| if (!CheckMatchingTableIdOrRespond(privilege, replica->tablet_metadata()->table_id(), |
| "Write", context)) { |
| return; |
| } |
| WritePrivileges privileges; |
| if (privilege.insert_privilege()) { |
| InsertOrDie(&privileges, WritePrivilegeType::INSERT); |
| } |
| if (privilege.update_privilege()) { |
| InsertOrDie(&privileges, WritePrivilegeType::UPDATE); |
| } |
| if (privilege.delete_privilege()) { |
| InsertOrDie(&privileges, WritePrivilegeType::DELETE); |
| } |
| if (privileges.empty()) { |
| // If we know there are no write-related privileges outright, we can |
| // short-circuit further checking and reject the request immediately. |
| // Otherwise, we'll defer the checking to the prepare phase of the |
| // op after decoding the operations. |
| LOG(WARNING) << Substitute("rejecting Write request from $0: no write privileges", |
| context->requestor_string()); |
| context->RespondRpcFailure(ErrorStatusPB::FATAL_UNAUTHORIZED, |
| Status::NotAuthorized("not authorized to write")); |
| return; |
| } |
| authz_context = WriteAuthorizationContext{ privileges, /*requested_op_types=*/{} }; |
| } |
| |
| shared_ptr<Tablet> tablet; |
| TabletServerErrorPB::Code error_code; |
| Status s = GetTabletRef(replica, &tablet, &error_code); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, error_code, context); |
| return; |
| } |
| |
| uint64_t bytes = req->row_operations().rows().size() + |
| req->row_operations().indirect_data().size(); |
| if (!tablet->ShouldThrottleAllow(bytes)) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::ServiceUnavailable("Rejecting Write request: throttled"), |
| TabletServerErrorPB::THROTTLED, |
| context); |
| return; |
| } |
| |
| // Check for memory pressure; don't bother doing any additional work if we've |
| // exceeded the limit. |
| double capacity_pct; |
| if (process_memory::SoftLimitExceeded(&capacity_pct)) { |
| tablet->metrics()->leader_memory_pressure_rejections->Increment(); |
| string msg = StringPrintf("Soft memory limit exceeded (at %.2f%% of capacity)", capacity_pct); |
| if (capacity_pct >= FLAGS_memory_limit_warn_threshold_percentage) { |
| KLOG_EVERY_N_SECS(WARNING, 1) << "Rejecting Write request: " << msg << THROTTLE_MSG; |
| } else { |
| KLOG_EVERY_N_SECS(INFO, 1) << "Rejecting Write request: " << msg << THROTTLE_MSG; |
| } |
| SetupErrorAndRespond(resp->mutable_error(), Status::ServiceUnavailable(msg), |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| |
| if (!server_->clock()->SupportsExternalConsistencyMode(req->external_consistency_mode())) { |
| Status s = Status::NotSupported("The configured clock does not support the" |
| " required consistency mode."); |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| |
| // If the apply queue is overloaded, the write request might be rejected. |
| // The longer the queue was in overloaded state, the higher the probability |
| // of rejecting the request. |
| MonoDelta queue_otime; |
| MonoDelta threshold; |
| if (server_->tablet_apply_pool()->QueueOverloaded(&queue_otime, &threshold)) { |
| DCHECK(threshold.Initialized()); |
| DCHECK_GT(threshold.ToMilliseconds(), 0); |
| auto overload_threshold_ms = threshold.ToMilliseconds(); |
| // The longer the queue has been in the overloaded state, the higher the |
| // probability of an op to be rejected. |
| auto time_factor = queue_otime.ToMilliseconds() / overload_threshold_ms + 1; |
| if (!rng_.OneIn(time_factor * time_factor + 1)) { |
| static const Status kStatus = Status::ServiceUnavailable( |
| "op apply queue is overloaded"); |
| num_op_apply_queue_rejections_->Increment(); |
| return SetupErrorAndRespond(resp->mutable_error(), |
| kStatus, |
| TabletServerErrorPB::THROTTLED, |
| context); |
| } |
| } |
| |
| unique_ptr<WriteOpState> op_state(new WriteOpState( |
| replica.get(), |
| req, |
| context->AreResultsTracked() ? context->request_id() : nullptr, |
| resp, |
| std::move(authz_context))); |
| |
| // If the client sent us a timestamp, decode it and update the clock so that all future |
| // timestamps are greater than the passed timestamp. |
| if (req->has_propagated_timestamp()) { |
| Timestamp ts(req->propagated_timestamp()); |
| s = server_->clock()->Update(ts); |
| } |
| if (PREDICT_FALSE(!s.ok())) { |
| return SetupErrorAndRespond( |
| resp->mutable_error(), s, TabletServerErrorPB::UNKNOWN_ERROR, context); |
| } |
| |
| const auto deadline = context->GetClientDeadline(); |
| const auto& username = context->remote_user().username(); |
| |
| if (!req->has_txn_id() || |
| PREDICT_FALSE(!FLAGS_tserver_txn_write_op_handling_enabled)) { |
| op_state->set_completion_callback(unique_ptr<OpCompletionCallback>( |
| new RpcOpCompletionCallback<WriteResponsePB>(context, resp))); |
| |
| // Submit the write operation. The RPC will be responded asynchronously. |
| s = replica->SubmitWrite(std::move(op_state)); |
| } else { |
| if (!FLAGS_enable_txn_system_client_init) { |
| return SetupErrorAndRespond( |
| resp->mutable_error(), |
| Status::NotSupported(Substitute("txns not supported on server $0", |
| replica->permanent_uuid())), |
| TabletServerErrorPB::UNKNOWN_ERROR, context); |
| } |
| auto abort_func = [this, txn_id = req->txn_id(), &username] { |
| return server_->tablet_manager()->ScheduleAbortTxn(txn_id, username); |
| }; |
| op_state->set_completion_callback(unique_ptr<OpCompletionCallback>( |
| new TxnWriteCompletionCallback(context, resp, std::move(abort_func)))); |
| |
| // If it's a write operation in the context of a multi-row transaction, |
| // schedule running preliminary tasks if necessary: register the tablet as |
| // a participant in the transaction and begin transaction for the |
| // participating tablet. |
| // |
| // This functor is to schedule preliminary tasks prior to submitting |
| // the write operation via TabletReplica::SubmitWrite(). |
| const auto scheduler = [this, &username, replica, deadline]( |
| int64_t txn_id, tablet::RegisteredTxnCallback began_txn_cb) { |
| return server_->tablet_manager()->SchedulePreliminaryTasksForTxnWrite( |
| std::move(replica), txn_id, username, deadline, std::move(began_txn_cb)); |
| }; |
| s = replica->SubmitTxnWrite(std::move(op_state), scheduler); |
| VLOG(2) << Substitute("submitting txn write op: $0", s.ToString()); |
| } |
| |
| // Check that we could submit the write |
| if (PREDICT_FALSE(!s.ok())) { |
| return SetupErrorAndRespond( |
| resp->mutable_error(), s, TabletServerErrorPB::UNKNOWN_ERROR, context); |
| } |
| } |
| |
| ConsensusServiceImpl::ConsensusServiceImpl(ServerBase* server, |
| TabletReplicaLookupIf* tablet_manager) |
| : ConsensusServiceIf(server->metric_entity(), server->result_tracker()), |
| server_(server), |
| tablet_manager_(tablet_manager) { |
| } |
| |
| ConsensusServiceImpl::~ConsensusServiceImpl() { |
| } |
| |
| bool ConsensusServiceImpl::AuthorizeServiceUser(const google::protobuf::Message* /*req*/, |
| google::protobuf::Message* /*resp*/, |
| RpcContext* rpc) { |
| return server_->Authorize(rpc, ServerBase::SUPER_USER | ServerBase::SERVICE_USER); |
| } |
| |
| void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req, |
| ConsensusResponsePB* resp, |
| RpcContext* context) { |
| DVLOG(3) << "Received Consensus Update RPC: " << SecureDebugString(*req); |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "UpdateConsensus", req, resp, context)) { |
| return; |
| } |
| scoped_refptr<TabletReplica> replica; |
| if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, |
| &replica)) { |
| return; |
| } |
| |
| // Submit the update directly to the TabletReplica's RaftConsensus instance. |
| shared_ptr<RaftConsensus> consensus; |
| if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; |
| Status s = consensus->Update(req, resp); |
| if (PREDICT_FALSE(!s.ok())) { |
| // Clear the response first, since a partially-filled response could |
| // result in confusing a caller, or in having missing required fields |
| // in embedded optional messages. |
| resp->Clear(); |
| |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| context->RespondSuccess(); |
| } |
| |
| void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req, |
| VoteResponsePB* resp, |
| RpcContext* context) { |
| DVLOG(3) << "Received Consensus Request Vote RPC: " << SecureDebugString(*req); |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "RequestConsensusVote", req, resp, context)) { |
| return; |
| } |
| |
| // Because the last-logged opid is stored in the TabletMetadata we go through |
| // the following dance: |
| // 1. Get a reference to the currently-registered TabletReplica. |
| // 2. Fetch (non-atomically) the current data state and last-logged opid from |
| // the TabletMetadata. |
| // 3. If the data state is COPYING or TOMBSTONED, pass the last-logged opid |
| // from the TabletMetadata into RaftConsensus::RequestVote(). |
| // |
| // The reason this sequence is safe to do without atomic locks is the |
| // RaftConsensus object associated with the TabletReplica will be Shutdown() |
| // and thus unable to vote if another TabletCopy operation comes between |
| // steps 1 and 3. |
| // |
| // TODO(mpercy): Move the last-logged opid into ConsensusMetadata to avoid |
| // this hacky plumbing. An additional benefit would be that we would be able |
| // to easily "tombstoned vote" while the tablet is bootstrapping. |
| |
| scoped_refptr<TabletReplica> replica; |
| if (!LookupTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, &replica)) { |
| return; |
| } |
| |
| optional<OpId> last_logged_opid; |
| tablet::TabletDataState data_state = replica->tablet_metadata()->tablet_data_state(); |
| |
| LOG(INFO) << "Received RequestConsensusVote() RPC: " << SecureShortDebugString(*req); |
| |
| // We cannot vote while DELETED. This check is not racy because DELETED is a |
| // terminal state; it is not possible to transition out of DELETED. |
| if (data_state == TABLET_DATA_DELETED) { |
| RespondTabletNotRunning(replica, replica->state(), resp, context); |
| return; |
| } |
| |
| // Attempt to vote while copying or tombstoned. |
| if (data_state == TABLET_DATA_COPYING || data_state == TABLET_DATA_TOMBSTONED) { |
| last_logged_opid = replica->tablet_metadata()->tombstone_last_logged_opid(); |
| } |
| |
| // Submit the vote request directly to the consensus instance. |
| shared_ptr<RaftConsensus> consensus; |
| if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; |
| |
| Status s = consensus->RequestVote(req, |
| consensus::TabletVotingState(std::move(last_logged_opid), |
| data_state), |
| resp); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| context->RespondSuccess(); |
| } |
| |
| void ConsensusServiceImpl::ChangeConfig(const ChangeConfigRequestPB* req, |
| ChangeConfigResponsePB* resp, |
| RpcContext* context) { |
| VLOG(1) << "Received ChangeConfig RPC: " << SecureDebugString(*req); |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "ChangeConfig", req, resp, context)) { |
| return; |
| } |
| scoped_refptr<TabletReplica> replica; |
| if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, |
| &replica)) { |
| return; |
| } |
| |
| shared_ptr<RaftConsensus> consensus; |
| if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; |
| optional<TabletServerErrorPB::Code> error_code; |
| Status s = consensus->ChangeConfig( |
| *req, [req, resp, context](const Status& s) { |
| HandleResponse(req, resp, context, s); |
| }, |
| &error_code); |
| if (PREDICT_FALSE(!s.ok())) { |
| HandleErrorResponse(req, resp, context, error_code, s); |
| return; |
| } |
| // The success case is handled when the callback fires. |
| } |
| |
| void ConsensusServiceImpl::BulkChangeConfig(const BulkChangeConfigRequestPB* req, |
| ChangeConfigResponsePB* resp, |
| RpcContext* context) { |
| VLOG(1) << "Received BulkChangeConfig RPC: " << SecureDebugString(*req); |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "BulkChangeConfig", req, resp, context)) { |
| return; |
| } |
| scoped_refptr<TabletReplica> replica; |
| if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, |
| &replica)) { |
| return; |
| } |
| |
| shared_ptr<RaftConsensus> consensus; |
| if (!GetConsensusOrRespond(replica, resp, context, &consensus)) { |
| return; |
| } |
| optional<TabletServerErrorPB::Code> error_code; |
| Status s = consensus->BulkChangeConfig( |
| *req, [req, resp, context](const Status& s) { |
| HandleResponse(req, resp, context, s); |
| }, |
| &error_code); |
| if (PREDICT_FALSE(!s.ok())) { |
| HandleErrorResponse(req, resp, context, error_code, s); |
| return; |
| } |
| // The success case is handled when the callback fires. |
| } |
| |
| void ConsensusServiceImpl::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB* req, |
| UnsafeChangeConfigResponsePB* resp, |
| RpcContext* context) { |
| LOG(INFO) << "Received UnsafeChangeConfig RPC: " << SecureDebugString(*req) |
| << " from " << context->requestor_string(); |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "UnsafeChangeConfig", req, resp, context)) { |
| return; |
| } |
| scoped_refptr<TabletReplica> replica; |
| if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, |
| &replica)) { |
| return; |
| } |
| |
| shared_ptr<RaftConsensus> consensus; |
| if (!GetConsensusOrRespond(replica, resp, context, &consensus)) { |
| return; |
| } |
| optional<TabletServerErrorPB::Code> error_code; |
| const Status s = consensus->UnsafeChangeConfig(*req, &error_code); |
| if (PREDICT_FALSE(!s.ok())) { |
| HandleErrorResponse(req, resp, context, error_code, s); |
| return; |
| } |
| context->RespondSuccess(); |
| } |
| |
| void ConsensusServiceImpl::GetNodeInstance(const GetNodeInstanceRequestPB* req, |
| GetNodeInstanceResponsePB* resp, |
| RpcContext* context) { |
| VLOG(1) << "Received Get Node Instance RPC: " << SecureDebugString(*req); |
| resp->mutable_node_instance()->CopyFrom(tablet_manager_->NodeInstance()); |
| context->RespondSuccess(); |
| } |
| |
| void ConsensusServiceImpl::RunLeaderElection(const RunLeaderElectionRequestPB* req, |
| RunLeaderElectionResponsePB* resp, |
| RpcContext* context) { |
| LOG(INFO) << "Received Run Leader Election RPC: " << SecureDebugString(*req) |
| << " from " << context->requestor_string(); |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "RunLeaderElection", req, resp, context)) { |
| return; |
| } |
| scoped_refptr<TabletReplica> replica; |
| if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, |
| &replica)) { |
| return; |
| } |
| |
| shared_ptr<RaftConsensus> consensus; |
| if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; |
| Status s = consensus->StartElection( |
| consensus::RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, |
| consensus::RaftConsensus::EXTERNAL_REQUEST); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| context->RespondSuccess(); |
| } |
| |
| void ConsensusServiceImpl::LeaderStepDown(const LeaderStepDownRequestPB* req, |
| LeaderStepDownResponsePB* resp, |
| RpcContext* context) { |
| LOG(INFO) << "Received LeaderStepDown RPC: " << SecureDebugString(*req) |
| << " from " << context->requestor_string(); |
| if (PREDICT_FALSE(!req->new_leader_uuid().empty() && |
| req->mode() == LeaderStepDownMode::ABRUPT)) { |
| Status s = Status::InvalidArgument( |
| "cannot specify a new leader uuid for an abrupt step down"); |
| SetupErrorAndRespond(resp->mutable_error(), s, |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| } |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "LeaderStepDown", req, resp, context)) { |
| return; |
| } |
| scoped_refptr<TabletReplica> replica; |
| if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, |
| &replica)) { |
| return; |
| } |
| |
| shared_ptr<RaftConsensus> consensus; |
| if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; |
| switch (req->mode()) { |
| case LeaderStepDownMode::ABRUPT: |
| HandleResponse(req, resp, context, consensus->StepDown(resp)); |
| break; |
| case LeaderStepDownMode::GRACEFUL: { |
| const auto new_leader_uuid = req->new_leader_uuid().empty() |
| ? nullopt |
| : make_optional(req->new_leader_uuid()); |
| Status s = consensus->TransferLeadership(new_leader_uuid, resp); |
| HandleResponse(req, resp, context, s); |
| break; |
| } |
| default: |
| Status s = Status::InvalidArgument( |
| Substitute("unknown LeaderStepDown mode: $0", req->mode())); |
| HandleUnknownError(s, resp, context); |
| } |
| } |
| |
| void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *req, |
| consensus::GetLastOpIdResponsePB *resp, |
| RpcContext *context) { |
| DVLOG(3) << "Received GetLastOpId RPC: " << SecureDebugString(*req); |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "GetLastOpId", req, resp, context)) { |
| return; |
| } |
| scoped_refptr<TabletReplica> replica; |
| if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, |
| &replica)) { |
| return; |
| } |
| |
| if (replica->state() != tablet::RUNNING) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::ServiceUnavailable("TabletReplica not in RUNNING state"), |
| TabletServerErrorPB::TABLET_NOT_RUNNING, context); |
| return; |
| } |
| shared_ptr<RaftConsensus> consensus; |
| if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; |
| if (PREDICT_FALSE(req->opid_type() == consensus::UNKNOWN_OPID_TYPE)) { |
| HandleUnknownError(Status::InvalidArgument("Invalid opid_type specified to GetLastOpId()"), |
| resp, context); |
| return; |
| } |
| optional<OpId> opid = consensus->GetLastOpId(req->opid_type()); |
| if (!opid) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::IllegalState("Cannot fetch last OpId in WAL"), |
| TabletServerErrorPB::TABLET_NOT_RUNNING, |
| context); |
| return; |
| } |
| *resp->mutable_opid() = *opid; |
| context->RespondSuccess(); |
| } |
| |
| void ConsensusServiceImpl::GetConsensusState(const consensus::GetConsensusStateRequestPB* req, |
| consensus::GetConsensusStateResponsePB* resp, |
| RpcContext* context) { |
| DVLOG(3) << "Received GetConsensusState RPC: " << SecureDebugString(*req); |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "GetConsensusState", req, resp, context)) { |
| return; |
| } |
| |
| unordered_set<string> requested_ids(req->tablet_ids().begin(), req->tablet_ids().end()); |
| bool all_ids = requested_ids.empty(); |
| |
| vector<scoped_refptr<TabletReplica>> tablet_replicas; |
| tablet_manager_->GetTabletReplicas(&tablet_replicas); |
| for (const scoped_refptr<TabletReplica>& replica : tablet_replicas) { |
| if (!all_ids && !ContainsKey(requested_ids, replica->tablet_id())) { |
| continue; |
| } |
| |
| shared_ptr<RaftConsensus> consensus(replica->shared_consensus()); |
| if (!consensus) { |
| continue; |
| } |
| |
| consensus::GetConsensusStateResponsePB_TabletConsensusInfoPB tablet_info; |
| Status s = consensus->ConsensusState(tablet_info.mutable_cstate(), req->report_health()); |
| if (!s.ok()) { |
| DCHECK(s.IsIllegalState()) << s.ToString(); |
| continue; |
| } |
| tablet_info.set_tablet_id(replica->tablet_id()); |
| *resp->add_tablets() = std::move(tablet_info); |
| } |
| const auto scheme = FLAGS_raft_prepare_replacement_before_eviction |
| ? consensus::ReplicaManagementInfoPB::PREPARE_REPLACEMENT_BEFORE_EVICTION |
| : consensus::ReplicaManagementInfoPB::EVICT_FIRST; |
| resp->mutable_replica_management_info()->set_replacement_scheme(scheme); |
| |
| context->RespondSuccess(); |
| } |
| |
| void ConsensusServiceImpl::StartTabletCopy(const StartTabletCopyRequestPB* req, |
| StartTabletCopyResponsePB* resp, |
| RpcContext* context) { |
| if (!CheckUuidMatchOrRespond(tablet_manager_, "StartTabletCopy", req, resp, context)) { |
| return; |
| } |
| auto response_callback = [context, resp](const Status& s, TabletServerErrorPB::Code code) { |
| if (s.ok()) { |
| context->RespondSuccess(); |
| } else { |
| // Skip calling SetupErrorAndRespond since this path doesn't need the |
| // error to be transformed. |
| StatusToPB(s, resp->mutable_error()->mutable_status()); |
| resp->mutable_error()->set_code(code); |
| context->RespondNoCache(); |
| } |
| }; |
| tablet_manager_->StartTabletCopy(req, response_callback); |
| } |
| |
| void TabletServiceImpl::ScannerKeepAlive(const ScannerKeepAliveRequestPB *req, |
| ScannerKeepAliveResponsePB *resp, |
| RpcContext *context) { |
| DCHECK(req->has_scanner_id()); |
| SharedScanner scanner; |
| TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR; |
| Status s = server_->scanner_manager()->LookupScanner(req->scanner_id(), |
| context->remote_user().username(), |
| &error_code, |
| &scanner); |
| if (!s.ok()) { |
| StatusToPB(s, resp->mutable_error()->mutable_status()); |
| LOG(INFO) << Substitute("ScannerKeepAlive: $0: remote=$1", |
| s.ToString(), context->requestor_string()); |
| if (PREDICT_TRUE(s.IsNotFound())) { |
| resp->mutable_error()->set_code(error_code); |
| StatusToPB(s, resp->mutable_error()->mutable_status()); |
| context->RespondSuccess(); |
| return; |
| } |
| DCHECK(s.IsNotAuthorized()); |
| SetupErrorAndRespond(resp->mutable_error(), s, error_code, context); |
| return; |
| } |
| { |
| // Locking for access has the side effect of updating the access time. |
| // Here we do a trylock -- a failure indicates that there is already another |
| // thread currently accessing the scanner, so that thread will update the |
| // access time upon release of the lock. |
| auto lock = scanner->TryLockForAccess(); |
| } |
| |
| context->RespondSuccess(); |
| } |
| |
| namespace { |
| void SetResourceMetrics(const RpcContext* context, |
| const CpuTimes* cpu_times, |
| ResourceMetricsPB* metrics) { |
| metrics->set_cfile_cache_miss_bytes( |
| context->trace()->metrics()->GetMetric(cfile::CFILE_CACHE_MISS_BYTES_METRIC_NAME)); |
| metrics->set_cfile_cache_hit_bytes( |
| context->trace()->metrics()->GetMetric(cfile::CFILE_CACHE_HIT_BYTES_METRIC_NAME)); |
| |
| metrics->set_bytes_read( |
| context->trace()->metrics()->GetMetric(SCANNER_BYTES_READ_METRIC_NAME)); |
| |
| rpc::InboundCallTiming timing; |
| timing.time_handled = context->GetTimeHandled(); |
| timing.time_received = context->GetTimeReceived(); |
| timing.time_completed = MonoTime::Now(); |
| |
| metrics->set_queue_duration_nanos(timing.QueueDuration().ToNanoseconds()); |
| metrics->set_total_duration_nanos(timing.TotalDuration().ToNanoseconds()); |
| metrics->set_cpu_system_nanos(cpu_times->system); |
| metrics->set_cpu_user_nanos(cpu_times->user); |
| } |
| } // anonymous namespace |
| |
| void TabletServiceImpl::Scan(const ScanRequestPB* req, |
| ScanResponsePB* resp, |
| RpcContext* context) { |
| TRACE_EVENT0("tserver", "TabletServiceImpl::Scan"); |
| |
| // Validate the request: user must pass a new_scan_request or |
| // a scanner ID, but not both. |
| if (PREDICT_FALSE(req->has_scanner_id() && |
| req->has_new_scan_request())) { |
| context->RespondFailure(Status::InvalidArgument( |
| "Must not pass both a scanner_id and new_scan_request")); |
| return; |
| } |
| |
| // If this is a new scan request, we must enforce the appropriate privileges. |
| TokenPB token; |
| if (FLAGS_tserver_enforce_access_control && req->has_new_scan_request()) { |
| const auto& scan_pb = req->new_scan_request(); |
| if (!VerifyAuthzTokenOrRespond(server_->token_verifier(), |
| req->new_scan_request(), context, &token)) { |
| return; |
| } |
| scoped_refptr<TabletReplica> replica; |
| if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), |
| req->new_scan_request().tablet_id(), resp, context, &replica)) { |
| return; |
| } |
| const auto& privilege = token.authz().table_privilege(); |
| if (!CheckMatchingTableIdOrRespond(privilege, replica->tablet_metadata()->table_id(), |
| "Scan", context)) { |
| return; |
| } |
| unordered_set<ColumnId> authorized_column_ids; |
| if (!CheckMayHaveScanPrivilegesOrRespond(privilege, "Scan", &authorized_column_ids, context)) { |
| return; |
| } |
| // If the token doesn't have full scan privileges for the table, check |
| // for required privileges based on the scan request. |
| if (!privilege.scan_privilege()) { |
| const SchemaPtr schema_ptr = replica->tablet_metadata()->schema(); |
| if (!CheckScanPrivilegesOrRespond(scan_pb, *schema_ptr, authorized_column_ids, |
| "Scan", context)) { |
| return; |
| } |
| } |
| } |
| |
| ScanResultCopier collector(GetMaxBatchSizeBytesHint(req)); |
| |
| bool has_more_results = false; |
| TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR; |
| if (req->has_new_scan_request()) { |
| if (!CheckTabletServerNotQuiescingOrRespond(server_, resp, context)) { |
| return; |
| } |
| const NewScanRequestPB& scan_pb = req->new_scan_request(); |
| scoped_refptr<TabletReplica> replica; |
| if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), scan_pb.tablet_id(), resp, |
| context, &replica)) { |
| return; |
| } |
| string scanner_id; |
| Timestamp scan_timestamp; |
| Status s = HandleNewScanRequest(replica.get(), req, context, |
| &collector, &scanner_id, &scan_timestamp, &has_more_results, |
| &error_code); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, error_code, context); |
| return; |
| } |
| |
| // Only set the scanner id if we have more results. |
| if (has_more_results) { |
| resp->set_scanner_id(scanner_id); |
| } |
| if (scan_timestamp != Timestamp::kInvalidTimestamp) { |
| resp->set_snap_timestamp(scan_timestamp.ToUint64()); |
| } |
| } else if (req->has_scanner_id()) { |
| Status s = HandleContinueScanRequest(req, context, &collector, &has_more_results, &error_code); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, error_code, context); |
| return; |
| } |
| } else { |
| context->RespondFailure(Status::InvalidArgument( |
| "Must pass either a scanner_id or new_scan_request")); |
| return; |
| } |
| |
| collector.SetupResponse(context, resp); |
| resp->set_has_more_results(has_more_results); |
| resp->set_propagated_timestamp(server_->clock()->Now().ToUint64()); |
| |
| SetResourceMetrics(context, collector.cpu_times(), resp->mutable_resource_metrics()); |
| context->RespondSuccess(); |
| } |
| |
| void TabletServiceImpl::ListTablets(const ListTabletsRequestPB* req, |
| ListTabletsResponsePB* resp, |
| RpcContext* context) { |
| vector<scoped_refptr<TabletReplica>> replicas; |
| server_->tablet_manager()->GetTabletReplicas(&replicas); |
| RepeatedPtrField<StatusAndSchemaPB>* replica_status = resp->mutable_status_and_schema(); |
| for (const scoped_refptr<TabletReplica>& replica : replicas) { |
| StatusAndSchemaPB* status = replica_status->Add(); |
| replica->GetTabletStatusPB(status->mutable_tablet_status()); |
| if (status->tablet_status().state() == tablet::RUNNING) { |
| status->set_role(replica->consensus()->role()); |
| } else { |
| status->set_role(RaftPeerPB::UNKNOWN_ROLE); |
| } |
| |
| if (req->need_schema_info()) { |
| const SchemaPtr schema_ptr = replica->tablet_metadata()->schema(); |
| const Schema& tablet_schema = *schema_ptr; |
| CHECK_OK(SchemaToPB(tablet_schema, status->mutable_schema())); |
| CHECK_OK(replica->tablet_metadata()->partition_schema().ToPB( |
| tablet_schema, status->mutable_partition_schema())); |
| status->set_schema_version(replica->tablet_metadata()->schema_version()); |
| } |
| } |
| context->RespondSuccess(); |
| } |
| |
| void TabletServiceImpl::SplitKeyRange(const SplitKeyRangeRequestPB* req, |
| SplitKeyRangeResponsePB* resp, |
| RpcContext* context) { |
| TRACE_EVENT1("tserver", "TabletServiceImpl::SplitKeyRange", |
| "tablet_id", req->tablet_id()); |
| DVLOG(3) << "Received SplitKeyRange RPC: " << SecureDebugString(*req); |
| |
| scoped_refptr<TabletReplica> replica; |
| if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp, |
| context, &replica)) { |
| return; |
| } |
| |
| if (FLAGS_tserver_enforce_access_control) { |
| TokenPB token; |
| if (!VerifyAuthzTokenOrRespond(server_->token_verifier(), *req, context, &token)) { |
| return; |
| } |
| const auto& privilege = token.authz().table_privilege(); |
| if (!CheckMatchingTableIdOrRespond(privilege, replica->tablet_metadata()->table_id(), |
| "SplitKeyRange", context)) { |
| return; |
| } |
| // Split-key requests require: |
| // if uses pk (e.g. primary key fields set): |
| // foreach(primary key column): SCAN ON COLUMN |
| // foreach(requested column): SCAN ON COLUMN |
| // |
| // If the privilege doesn't have full scan privileges, or column-level scan |
| // privileges, the user is definitely not authorized to perform a scan. |
| unordered_set<ColumnId> authorized_column_ids; |
| if (!CheckMayHaveScanPrivilegesOrRespond(privilege, "SplitKeyRange", |
| &authorized_column_ids, context)) { |
| return; |
| } |
| if (!privilege.scan_privilege()) { |
| const SchemaPtr schema_ptr = replica->tablet_metadata()->schema(); |
| const Schema& schema = *schema_ptr; |
| unordered_set<ColumnId> required_column_privileges; |
| if (req->has_start_primary_key() || req->has_stop_primary_key()) { |
| const auto& key_cols = schema.get_key_column_ids(); |
| required_column_privileges.insert(key_cols.begin(), key_cols.end()); |
| } |
| bool is_authorized = true; |
| const string rejection_prefix = Substitute("rejecting SplitKeyRange request from $0", |
| context->requestor_string()); |
| for (int i = 0; i < req->columns_size(); i++) { |
| const auto& column_name = req->columns(i).name(); |
| int col_idx = schema.find_column(req->columns(i).name()); |
| if (col_idx == Schema::kColumnNotFound) { |
| LOG(WARNING) << Substitute("$0: no column named '$1'", rejection_prefix, column_name); |
| is_authorized = false; |
| break; |
| } |
| EmplaceIfNotPresent(&required_column_privileges, schema.column_id(col_idx)); |
| } |
| for (const auto& required_col_id : required_column_privileges) { |
| if (!ContainsKey(authorized_column_ids, required_col_id)) { |
| LOG(WARNING) << Substitute("$0: authz token doesn't authorize column ID $1", |
| rejection_prefix, required_col_id); |
| is_authorized = false; |
| break; |
| } |
| } |
| if (!is_authorized) { |
| context->RespondRpcFailure(ErrorStatusPB::FATAL_UNAUTHORIZED, |
| Status::NotAuthorized("not authorized to SplitKeyRange")); |
| return; |
| } |
| } |
| } |
| |
| shared_ptr<Tablet> tablet; |
| TabletServerErrorPB::Code error_code; |
| Status s = GetTabletRef(replica, &tablet, &error_code); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, error_code, context); |
| return; |
| } |
| |
| // Decode encoded key |
| Arena arena(256); |
| const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema(); |
| const Schema& tablet_schema = *tablet_schema_ptr; |
| EncodedKey* start = nullptr; |
| EncodedKey* stop = nullptr; |
| if (req->has_start_primary_key()) { |
| s = EncodedKey::DecodeEncodedString(tablet_schema, &arena, req->start_primary_key(), &start); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::InvalidArgument("Invalid SplitKeyRange start primary key"), |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| } |
| if (req->has_stop_primary_key()) { |
| s = EncodedKey::DecodeEncodedString(tablet_schema, &arena, req->stop_primary_key(), &stop); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::InvalidArgument("Invalid SplitKeyRange stop primary key"), |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| } |
| if (req->has_start_primary_key() && req->has_stop_primary_key()) { |
| // Validate the start key is less than the stop key, if they are both set |
| if (start->encoded_key() > stop->encoded_key()) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::InvalidArgument("Invalid primary key range"), |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| } |
| |
| // Validate the column are valid |
| Schema schema; |
| s = ColumnPBsToSchema(req->columns(), &schema); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| s, |
| TabletServerErrorPB::INVALID_SCHEMA, |
| context); |
| return; |
| } |
| if (schema.has_column_ids()) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::InvalidArgument("User requests should not have Column IDs"), |
| TabletServerErrorPB::INVALID_SCHEMA, |
| context); |
| return; |
| } |
| |
| vector<ColumnId> column_ids; |
| for (const ColumnSchema& column : schema.columns()) { |
| int column_idx = tablet_schema.find_column(column.name()); |
| if (PREDICT_FALSE(column_idx == Schema::kColumnNotFound)) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::InvalidArgument( |
| "Invalid SplitKeyRange column name", column.name()), |
| TabletServerErrorPB::INVALID_SCHEMA, |
| context); |
| return; |
| } |
| column_ids.emplace_back(tablet_schema.column_id(column_idx)); |
| } |
| |
| // Validate the target chunk size are valid |
| if (req->target_chunk_size_bytes() == 0) { |
| SetupErrorAndRespond(resp->mutable_error(), |
| Status::InvalidArgument("Invalid SplitKeyRange target chunk size"), |
| TabletServerErrorPB::UNKNOWN_ERROR, |
| context); |
| return; |
| } |
| |
| vector<KeyRange> ranges; |
| tablet->SplitKeyRange(start, stop, column_ids, req->target_chunk_size_bytes(), &ranges); |
| for (const auto& range : ranges) { |
| range.ToPB(resp->add_ranges()); |
| } |
| |
| context->RespondSuccess(); |
| } |
| |
| void TabletServiceImpl::Checksum(const ChecksumRequestPB* req, |
| ChecksumResponsePB* resp, |
| RpcContext* context) { |
| VLOG(1) << "Full request: " << SecureDebugString(*req); |
| |
| // Validate the request: user must pass a new_scan_request or |
| // a scanner ID, but not both. |
| if (PREDICT_FALSE(req->has_new_request() && |
| req->has_continue_request())) { |
| context->RespondFailure(Status::InvalidArgument( |
| "Must not pass both a scanner_id and new_scan_request")); |
| return; |
| } |
| |
| // Convert ChecksumRequestPB to a ScanRequestPB. |
| ScanRequestPB scan_req; |
| if (req->has_call_seq_id()) scan_req.set_call_seq_id(req->call_seq_id()); |
| if (req->has_batch_size_bytes()) scan_req.set_batch_size_bytes(req->batch_size_bytes()); |
| if (req->has_close_scanner()) scan_req.set_close_scanner(req->close_scanner()); |
| |
| ScanResultChecksummer collector; |
| bool has_more = false; |
| TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR; |
| // TODO(KUDU-2870): the CLI tool doesn't currently fetch authz tokens when |
| // checksumming. Until it does, allow the super-user to avoid fine-grained |
| // privilege checking. |
| if (FLAGS_tserver_enforce_access_control && |
| !server_->IsFromSuperUser(context) && |
| req->has_new_request()) { |
| const NewScanRequestPB& new_req = req->new_request(); |
| TokenPB token; |
| if (!VerifyAuthzTokenOrRespond(server_->token_verifier(), req->new_request(), |
| context, &token)) { |
| return; |
| } |
| scoped_refptr<TabletReplica> replica; |
| if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), new_req.tablet_id(), resp, |
| context, &replica)) { |
| return; |
| } |
| const auto& privilege = token.authz().table_privilege(); |
| if (!CheckMatchingTableIdOrRespond(privilege, replica->tablet_metadata()->table_id(), |
| "Checksum", context)) { |
| return; |
| } |
| unordered_set<ColumnId> authorized_column_ids; |
| if (!CheckMayHaveScanPrivilegesOrRespond(privilege, "Checksum", |
| &authorized_column_ids, context)) { |
| return; |
| } |
| // If the token doesn't have full scan privileges for the table, check |
| // for required privileges based on the checksum request. |
| if (!privilege.scan_privilege()) { |
| const SchemaPtr schema_ptr = replica->tablet_metadata()->schema(); |
| if (!CheckScanPrivilegesOrRespond(new_req, *schema_ptr, authorized_column_ids, |
| "Checksum", context)) { |
| return; |
| } |
| } |
| } |
| if (req->has_new_request()) { |
| if (!CheckTabletServerNotQuiescingOrRespond(server_, resp, context)) { |
| return; |
| } |
| const NewScanRequestPB& new_req = req->new_request(); |
| scan_req.mutable_new_scan_request()->CopyFrom(req->new_request()); |
| scoped_refptr<TabletReplica> replica; |
| if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), new_req.tablet_id(), resp, |
| context, &replica)) { |
| return; |
| } |
| |
| string scanner_id; |
| Timestamp snap_timestamp; |
| Status s = HandleNewScanRequest(replica.get(), &scan_req, context, |
| &collector, &scanner_id, &snap_timestamp, &has_more, |
| &error_code); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, error_code, context); |
| return; |
| } |
| resp->set_scanner_id(scanner_id); |
| if (snap_timestamp != Timestamp::kInvalidTimestamp) { |
| resp->set_snap_timestamp(snap_timestamp.ToUint64()); |
| } |
| } else if (req->has_continue_request()) { |
| const ContinueChecksumRequestPB& continue_req = req->continue_request(); |
| collector.set_agg_checksum(continue_req.previous_checksum()); |
| scan_req.set_scanner_id(continue_req.scanner_id()); |
| Status s = HandleContinueScanRequest(&scan_req, context, &collector, &has_more, &error_code); |
| if (PREDICT_FALSE(!s.ok())) { |
| SetupErrorAndRespond(resp->mutable_error(), s, error_code, context); |
| return; |
| } |
| } else { |
| context->RespondFailure(Status::InvalidArgument( |
| "Must pass either new_request or continue_request")); |
| return; |
| } |
| |
| resp->set_checksum(collector.agg_checksum()); |
| resp->set_has_more_results(has_more); |
| resp->set_rows_checksummed(collector.rows_checksummed()); |
| |
| SetResourceMetrics(context, collector.cpu_times(), resp->mutable_resource_metrics()); |
| context->RespondSuccess(); |
| } |
| |
| bool TabletServiceImpl::SupportsFeature(uint32_t feature) const { |
| switch (feature) { |
| case TabletServerFeatures::COLUMN_PREDICATES: |
| case TabletServerFeatures::PAD_UNIXTIME_MICROS_TO_16_BYTES: |
| case TabletServerFeatures::QUIESCING: |
| case TabletServerFeatures::BLOOM_FILTER_PREDICATE_V2: |
| case TabletServerFeatures::COLUMNAR_LAYOUT_FEATURE: |
| return true; |
| default: |
| return false; |
| } |
| } |
| |
| void TabletServiceImpl::Shutdown() { |
| } |
| |
| // Extract a void* pointer suitable for use in a ColumnRangePredicate from the |
| // user-specified protobuf field. |
| // This validates that the pb_value has the correct length, copies the data into |
| // 'arena', and sets *result to point to it. |
| // Returns bad status if the user-specified value is the wrong length. |
| static Status ExtractPredicateValue(const ColumnSchema& schema, |
| const string& pb_value, |
| Arena* arena, |
| const void** result) { |
| // Copy the data from the protobuf into the Arena. |
| uint8_t* data_copy = static_cast<uint8_t*>(arena->AllocateBytes(pb_value.size())); |
| memcpy(data_copy, &pb_value[0], pb_value.size()); |
| |
| // If the type is of variable length, then we need to return a pointer to a Slice |
| // element pointing to the string. Otherwise, just verify that the provided |
| // value was the right size. |
| if (schema.type_info()->physical_type() == BINARY) { |
| *result = arena->NewObject<Slice>(data_copy, pb_value.size()); |
| } else { |
| // TODO: add test case for this invalid request |
| size_t expected_size = schema.type_info()->size(); |
| if (pb_value.size() != expected_size) { |
| return Status::InvalidArgument( |
| StringPrintf("Bad predicate on %s. Expected value size %zd, got %zd", |
| schema.ToString().c_str(), expected_size, pb_value.size())); |
| } |
| *result = data_copy; |
| } |
| |
| return Status::OK(); |
| } |
| |
| static Status DecodeEncodedKeyRange(const NewScanRequestPB& scan_pb, |
| const Schema& tablet_schema, |
| const SharedScanner& scanner, |
| ScanSpec* spec) { |
| EncodedKey* start = nullptr; // Arena allocated. |
| EncodedKey* stop = nullptr; // Arena allocated. |
| if (scan_pb.has_start_primary_key()) { |
| RETURN_NOT_OK_PREPEND(EncodedKey::DecodeEncodedString( |
| tablet_schema, scanner->arena(), |
| scan_pb.start_primary_key(), &start), |
| "Invalid scan start key"); |
| } |
| |
| if (scan_pb.has_stop_primary_key()) { |
| RETURN_NOT_OK_PREPEND(EncodedKey::DecodeEncodedString( |
| tablet_schema, scanner->arena(), |
| scan_pb.stop_primary_key(), &stop), |
| "Invalid scan stop key"); |
| } |
| |
| if (scan_pb.order_mode() == ORDERED && scan_pb.has_last_primary_key()) { |
| if (start) { |
| return Status::InvalidArgument("Cannot specify both a start key and a last key"); |
| } |
| // Set the start key to the last key from a previous scan result. |
| RETURN_NOT_OK_PREPEND(EncodedKey::DecodeEncodedString(tablet_schema, scanner->arena(), |
| scan_pb.last_primary_key(), &start), |
| "Failed to decode last primary key"); |
| // Increment the start key, so we don't return the last row again. |
| RETURN_NOT_OK_PREPEND(EncodedKey::IncrementEncodedKey(tablet_schema, &start, scanner->arena()), |
| "Failed to increment encoded last row key"); |
| } |
| |
| if (start) { |
| spec->SetLowerBoundKey(start); |
| } |
| if (stop) { |
| spec->SetExclusiveUpperBoundKey(stop); |
| } |
| |
| return Status::OK(); |
| } |
| |
| static Status SetupScanSpec(const NewScanRequestPB& scan_pb, |
| const Schema& tablet_schema, |
| const SharedScanner& scanner, |
| ScanSpec* spec) { |
| spec->set_cache_blocks(scan_pb.cache_blocks()); |
| |
| // First the column predicates. |
| for (const ColumnPredicatePB& pred_pb : scan_pb.column_predicates()) { |
| optional<ColumnPredicate> predicate; |
| RETURN_NOT_OK(ColumnPredicateFromPB(tablet_schema, scanner->arena(), pred_pb, &predicate)); |
| spec->AddPredicate(std::move(*predicate)); |
| } |
| |
| // Then the column range predicates. |
| // TODO: remove this once all clients have moved to ColumnPredicatePB and |
| // backwards compatibility can be broken. |
| for (const ColumnRangePredicatePB& pred_pb : scan_pb.deprecated_range_predicates()) { |
| if (!pred_pb.has_lower_bound() && !pred_pb.has_inclusive_upper_bound()) { |
| return Status::InvalidArgument( |
| string("Invalid predicate ") + SecureShortDebugString(pred_pb) + |
| ": has no lower or upper bound."); |
| } |
| optional<ColumnSchema> col; |
| RETURN_NOT_OK(ColumnSchemaFromPB(pred_pb.column(), &col)); |
| |
| const void* lower_bound = nullptr; |
| const void* upper_bound = nullptr; |
| if (pred_pb.has_lower_bound()) { |
| RETURN_NOT_OK(ExtractPredicateValue(*col, pred_pb.lower_bound(), |
| scanner->arena(), |
| &lower_bound)); |
| } |
| if (pred_pb.has_inclusive_upper_bound()) { |
| RETURN_NOT_OK(ExtractPredicateValue(*col, pred_pb.inclusive_upper_bound(), |
| scanner->arena(), |
| &upper_bound)); |
| } |
| |
| auto pred = ColumnPredicate::InclusiveRange(*col, lower_bound, upper_bound, scanner->arena()); |
| if (pred) { |
| VLOG(3) << Substitute("Parsed predicate $0 from $1", |
| pred->ToString(), SecureShortDebugString(scan_pb)); |
| spec->AddPredicate(*pred); |
| } |
| } |
| |
| // Then any encoded key range predicates. |
| RETURN_NOT_OK(DecodeEncodedKeyRange(scan_pb, tablet_schema, scanner, spec)); |
| |
| // If the scanner has a limit, set it now. |
| if (scan_pb.has_limit()) { |
| spec->set_limit(scan_pb.limit()); |
| } |
| |
| return Status::OK(); |
| } |
| |
| namespace { |
| // Checks if 'timestamp' is before the tablet's AHM if this is a |
| // READ_AT_SNAPSHOT/READ_YOUR_WRITES scan. Returns Status::OK() if it's |
| // not or Status::InvalidArgument() if it is. |
| Status VerifyNotAncientHistory(Tablet* tablet, ReadMode read_mode, Timestamp timestamp, |
| const string& timestamp_desc) { |
| tablet::HistoryGcOpts history_gc_opts = tablet->GetHistoryGcOpts(); |
| if ((read_mode == READ_AT_SNAPSHOT || read_mode == READ_YOUR_WRITES) && |
| history_gc_opts.IsAncientHistory(timestamp)) { |
| return Status::InvalidArgument( |
| Substitute("$0 is earlier than the ancient history mark. Consider " |
| "increasing the value of the configuration parameter " |
| "--tablet_history_max_age_sec. Snapshot timestamp: $1 " |
| "Ancient History Mark: $2 Physical time difference: $3", |
| timestamp_desc, |
| tablet->clock()->Stringify(timestamp), |
| tablet->clock()->Stringify(history_gc_opts.ancient_history_mark()), |
| tablet->clock()->GetPhysicalComponentDifference( |
| timestamp, history_gc_opts.ancient_history_mark()).ToString())); |
| } |
| return Status::OK(); |
| } |
| |
| // Verify that the start (if specified) and end snapshot timestamps are legal |
| // to read by checking against the ancient history mark and ensuring that the |
| // start timestamp is earlier than the end timestamp. |
| Status VerifyLegalSnapshotTimestamps(Tablet* tablet, ReadMode read_mode, |
| const optional<Timestamp>& snap_start_timestamp, |
| const Timestamp& snap_end_timestamp) { |
| RETURN_NOT_OK(VerifyNotAncientHistory(tablet, read_mode, snap_end_timestamp, |
| "snapshot scan end timestamp")); |
| if (snap_start_timestamp) { |
| // Validate diff scan start timestamp, if set. |
| RETURN_NOT_OK(VerifyNotAncientHistory(tablet, read_mode, *snap_start_timestamp, |
| "snapshot scan start timestamp")); |
| if (snap_start_timestamp->CompareTo(snap_end_timestamp) > 0) { |
| return Status::InvalidArgument( |
| Substitute("start timestamp ($0) must be less than or equal to end timestamp ($1)", |
| tablet->clock()->Stringify(*snap_start_timestamp), |
| tablet->clock()->Stringify(snap_end_timestamp))); |
| } |
| } |
| return Status::OK(); |
| } |
| } // anonymous namespace |
| |
| // Start a new scan. |
| Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica, |
| const ScanRequestPB* req, |
| const RpcContext* rpc_context, |
| ScanResultCollector* result_collector, |
| string* scanner_id, |
| Timestamp* snap_timestamp, |
| bool* has_more_results, |
| TabletServerErrorPB::Code* error_code) { |
| DCHECK(result_collector != nullptr); |
| DCHECK(error_code != nullptr); |
| DCHECK(req->has_new_scan_request()); |
| const NewScanRequestPB& scan_pb = req->new_scan_request(); |
| TRACE_EVENT1("tserver", "TabletServiceImpl::HandleNewScanRequest", |
| "tablet_id", scan_pb.tablet_id()); |
| |
| SharedScanner scanner; |
| server_->scanner_manager()->NewScanner(replica, |
| rpc_context->remote_user(), |
| scan_pb.row_format_flags(), |
| &scanner); |
| TRACE("Created scanner $0 for tablet $1", scanner->id(), scanner->tablet_id()); |
| auto scanner_lock = scanner->LockForAccess(); |
| |
| // If we early-exit out of this function, automatically unregister |
| // the scanner. |
| ScopedUnregisterScanner unreg_scanner(server_->scanner_manager(), scanner->id()); |
| ScopedAddScannerTiming scanner_timer(scanner.get(), result_collector->cpu_times()); |
| |
| // Create the user's requested projection. |
| // TODO(todd): Add test cases for bad projections including 0 columns. |
| Schema projection; |
| Status s = ColumnPBsToSchema(scan_pb.projected_columns(), &projection); |
| if (PREDICT_FALSE(!s.ok())) { |
| *error_code = TabletServerErrorPB::INVALID_SCHEMA; |
| return s; |
| } |
| |
| if (projection.has_column_ids()) { |
| *error_code = TabletServerErrorPB::INVALID_SCHEMA; |
| return Status::InvalidArgument("User requests should not have Column IDs"); |
| } |
| |
| if (scan_pb.order_mode() == UNKNOWN_ORDER_MODE) { |
| *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC; |
| return Status::InvalidArgument("Unknown order mode specified"); |
| } |
| |
| if (scan_pb.order_mode() == ORDERED) { |
| // Ordered scans must be at a snapshot so that we perform a serializable read (which can be |
| // resumed). Otherwise, this would be read committed isolation, which is not resumable. |
| if (scan_pb.read_mode() != READ_AT_SNAPSHOT) { |
| *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC; |
| return Status::InvalidArgument("Cannot do an ordered scan that is not a snapshot read"); |
| } |
| } |
| |
| const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema(); |
| const Schema& tablet_schema = *tablet_schema_ptr; |
| |
| ScanSpec spec; |
| s = SetupScanSpec(scan_pb, tablet_schema, scanner, &spec); |
| if (PREDICT_FALSE(!s.ok())) { |
| *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC; |
| return s; |
| } |
| |
| VLOG(3) << "Before optimizing scan spec: " << spec.ToString(tablet_schema); |
| spec.PruneInlistValuesIfPossible(tablet_schema, |
| replica->tablet_metadata()->partition(), |
| replica->tablet_metadata()->partition_schema()); |
| spec.OptimizeScan(tablet_schema, scanner->arena(), true); |
| VLOG(3) << "After optimizing scan spec: " << spec.ToString(tablet_schema); |
| |
| // Missing columns will contain the columns that are not mentioned in the |
| // client projection but are actually needed for the scan, such as columns |
| // referred to by predicates. |
| // |
| // NOTE: We should build the missing column after optimizing scan which will |
| // remove unnecessary predicates. |
| vector<ColumnSchema> missing_cols = spec.GetMissingColumns(projection); |
| |
| // Build a new projection with the projection columns and the missing columns, |
| // annotating each column as a key column appropriately. |
| // |
| // Note: the projection is a consistent schema (i.e. no duplicate columns). |
| // However, it has some different semantics as compared to the tablet schema: |
| // - It might not contain all of the columns in the tablet. |
| // - It might contain extra columns not found in the tablet schema. Virtual |
| // columns are permitted, while others will cause the scan to fail later, |
| // when the tablet validates the projection. |
| // - It doesn't know which of its columns are key columns. That's fine for |
| // an UNORDERED scan, but we'll need to fix this for an ORDERED scan, which |
| // requires all key columns in tablet schema order. |
| SchemaBuilder projection_builder; |
| if (scan_pb.order_mode() == ORDERED) { |
| for (int i = 0; i < tablet_schema.num_key_columns(); i++) { |
| const auto& col = tablet_schema.column(i); |
| // CHECK_OK is safe because the tablet schema has no duplicate columns. |
| CHECK_OK(projection_builder.AddColumn(col, /* is_key= */ true)); |
| } |
| for (int i = 0; i < projection.num_columns(); i++) { |
| const auto& col = projection.column(i); |
| // Any key columns in the projection will be ignored. |
| ignore_result(projection_builder.AddColumn(col, /* is_key= */ false)); |
| } |
| for (const ColumnSchema& col : missing_cols) { |
| // Any key columns in 'missing_cols' will be ignored. |
| ignore_result(projection_builder.AddColumn(col, /* is_key= */ false)); |
| } |
| } else { |
| projection_builder.Reset(projection); |
| for (const ColumnSchema& col : missing_cols) { |
| // CHECK_OK is safe because the builder's columns (from the projection) |
| // and the missing columns are disjoint sets. |
| // |
| // UNORDERED scans don't need to know which columns are part of the key. |
| CHECK_OK(projection_builder.AddColumn(col, /* is_key= */ false)); |
| } |
| } |
| |
| // Store the client's specified projection, prior to adding any missing |
| // columns for predicates, etc. |
| unique_ptr<Schema> client_projection(new Schema(std::move(projection))); |
| projection = projection_builder.BuildWithoutIds(); |
| VLOG(3) << "Scan projection: " << projection.ToString(Schema::BASE_INFO); |
| |
| s = result_collector->InitSerializer(scan_pb.row_format_flags(), |
| projection, |
| *client_projection); |
| if (!s.ok()) { |
| *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC; |
| return s; |
| } |
| |
| if (spec.CanShortCircuit()) { |
| VLOG(1) << "short-circuiting without creating a server-side scanner."; |
| *has_more_results = false; |
| return Status::OK(); |
| } |
| |
| // It's important to keep the reference to the tablet for the case when the |
| // tablet replica's shutdown is run concurrently with the code below. |
| shared_ptr<Tablet> tablet; |
| RETURN_NOT_OK(GetTabletRef(replica, &tablet, error_code)); |
| |
| // Ensure the tablet has a valid clean time. |
| s = tablet->mvcc_manager()->CheckIsCleanTimeInitialized(); |
| if (!s.ok()) { |
| LOG(WARNING) << Substitute("Rejecting scan request for tablet $0: $1", |
| tablet->tablet_id(), s.ToString()); |
| // Return TABLET_NOT_RUNNING so the scan can be handled appropriately (fail |
| // over to another tablet server if fault-tolerant). |
| *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING; |
| return s; |
| } |
| |
| unique_ptr<RowwiseIterator> iter; |
| optional<Timestamp> snap_start_timestamp; |
| |
| { |
| TRACE("Creating iterator"); |
| TRACE_EVENT0("tserver", "Create iterator"); |
| |
| switch (scan_pb.read_mode()) { |
| case UNKNOWN_READ_MODE: { |
| *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC; |
| return Status::NotSupported("Unknown read mode."); |
| } |
| case READ_LATEST: { |
| if (scan_pb.has_snap_start_timestamp()) { |
| *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC; |
| return Status::InvalidArgument("scan start timestamp is only supported " |
| "in READ_AT_SNAPSHOT read mode"); |
| } |
| s = tablet->NewRowIterator(projection, &iter); |
| break; |
| } |
| case READ_YOUR_WRITES: // Fallthrough intended |
| case READ_AT_SNAPSHOT: { |
| s = HandleScanAtSnapshot( |
| scan_pb, rpc_context, projection, tablet.get(), replica->time_manager(), |
| &iter, &snap_start_timestamp, snap_timestamp, error_code); |
| break; |
| } |
| } |
| TRACE("Iterator created"); |
| } |
| |
| // Make a copy of the optimized spec before it's passed to the iterator. |
| // This copy will be given to the Scanner so it can report its predicates to |
| // /scans. The copy is necessary because the original spec will be modified |
| // as its predicates are pushed into lower-level iterators. |
| unique_ptr<ScanSpec> orig_spec(new ScanSpec(spec)); |
| |
| if (PREDICT_TRUE(s.ok())) { |
| TRACE_EVENT0("tserver", "iter->Init"); |
| s = iter->Init(&spec); |
| if (PREDICT_FALSE(s.IsInvalidArgument())) { |
| // Tablet::Iterator::Init() returns InvalidArgument when an invalid |
| // projection is specified. |
| // TODO(todd): would be nice if we threaded these more specific |
| // error codes throughout Kudu. |
| *error_code = TabletServerErrorPB::MISMATCHED_SCHEMA; |
| return s; |
| } |
| } |
| |
| TRACE("Iterator init: $0", s.ToString()); |
| |
| if (PREDICT_FALSE(!s.ok())) { |
| LOG(WARNING) << Substitute( |
| "error setting up scanner $0 with request: $1: $2", |
| scanner->id(), s.ToString(), SecureShortDebugString(*req)); |
| // If the replica has been stopped, e.g. due to disk failure, return |
| // TABLET_FAILED so the scan can be handled appropriately (fail over to |
| // another tablet server if fault-tolerant). |
| if (tablet->HasBeenStopped()) { |
| *error_code = TabletServerErrorPB::TABLET_FAILED; |
| } |
| return s; |
| } |
| |
| // If this is a snapshot scan and the user specified a specific timestamp to |
| // scan at, then check that we are not attempting to scan at a time earlier |
| // than the ancient history mark. Only perform this check if tablet history |
| // GC is enabled. |
| // |
| // TODO: This validation essentially prohibits scans with READ_AT_SNAPSHOT |
| // when history_max_age is set to zero. There is a tablet history GC related |
| // race when the history max age is set to very low, or zero. Imagine a case |
| // where a scan was started and READ_AT_SNAPSHOT was specified without |
| // specifying a snapshot timestamp, and --tablet_history_max_age_sec=0. The |
| // above code path will select the latest timestamp (under a lock) prior to |
| // calling RowIterator::Init(), which actually opens the blocks. That means |
| // that there is an opportunity in between those two calls for tablet history |
| // GC to kick in and delete some history. In fact, we may easily not actually |
| // end up with a valid snapshot in that case. It would be more correct to |
| // initialize the row iterator and then select the latest timestamp |
| // represented by those open files in that case. |
| // |
| // Now that we have initialized our row iterator at a snapshot, return an |
| // error if the snapshot timestamp was prior to the ancient history mark. |
| // We have to check after we open the iterator in order to avoid a TOCTOU |
| // error since it's possible that initializing the row iterator could race |
| // against the tablet history GC maintenance task. |
| RETURN_NOT_OK_EVAL(VerifyLegalSnapshotTimestamps(tablet.get(), scan_pb.read_mode(), |
| snap_start_timestamp, *snap_timestamp), |
| *error_code = TabletServerErrorPB::INVALID_SNAPSHOT); |
| |
| *has_more_results = iter->HasNext() && !scanner->has_fulfilled_limit(); |
| TRACE("has_more: $0", *has_more_results); |
| if (!*has_more_results) { |
| // If there are no more rows, we can short circuit some work and respond immediately. |
| VLOG(1) << "No more rows, short-circuiting out without creating a server-side scanner."; |
| return Status::OK(); |
| } |
| |
| scanner->Init(std::move(iter), std::move(orig_spec), std::move(client_projection)); |
| |
| // Stop the scanner timer because ContinueScanRequest starts its own timer. |
| scanner_timer.Stop(); |
| unreg_scanner.Cancel(); |
| *scanner_id = scanner->id(); |
| |
| VLOG(1) << "Started scanner " << scanner->id() << ": " << scanner->iter()->ToString(); |
| |
| if (GetMaxBatchSizeBytesHint(req) > 0) { |
| TRACE("Continuing scan request"); |
| // TODO(wdberkeley): Instead of copying the pb, instead split |
| // HandleContinueScanRequest and call the second half directly. Once that's |
| // done, remove the call to ScopedAddScannerTiming::Stop() above (and the |
| // method as it won't be used) and start the timing for continue requests |
| // from the first half that is no longer executed in this codepath. |
| ScanRequestPB continue_req(*req); |
| continue_req.set_scanner_id(scanner->id()); |
| scanner_lock.Unlock(); |
| return HandleContinueScanRequest( |
| &continue_req, rpc_context, result_collector, has_more_results, error_code); |
| } |
| |
| // Increment the scanner call sequence ID. HandleContinueScanRequest handles |
| // this in the non-empty scan case. |
| scanner->IncrementCallSeqId(); |
| return Status::OK(); |
| } |
| |
| // Continue an existing scan request. |
| Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req, |
| const RpcContext* rpc_context, |
| ScanResultCollector* result_collector, |
| bool* has_more_results, |
| TabletServerErrorPB::Code* error_code) { |
| DCHECK(req->has_scanner_id()); |
| TRACE_EVENT1("tserver", "TabletServiceImpl::HandleContinueScanRequest", |
| "scanner_id", req->scanner_id()); |
| |
| size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req); |
| |
| SharedScanner scanner; |
| TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR; |
| Status s = server_->scanner_manager()->LookupScanner(req->scanner_id(), |
| rpc_context->remote_user().username(), |
| &code, |
| &scanner); |
| if (!s.ok()) { |
| if (s.IsNotFound() && batch_size_bytes == 0 && req->close_scanner()) { |
| // Silently ignore any request to close a non-existent scanner. |
| return Status::OK(); |
| } |
| LOG(INFO) << Substitute("Scan: $0: call sequence id=$1, remote=$2", |
| s.ToString(), req->call_seq_id(), rpc_context->requestor_string()); |
| *error_code = code; |
| return s; |
| } |
| // TODO(todd) consider TryLockForAccess and return ServiceUnavailable in the case that |
| // another thread is already using the scanner? This should be rare in real |
| // circumstances -- only relevant when a client performs some retries on timeout. |
| auto scanner_lock = scanner->LockForAccess(); |
| |
| if (PREDICT_FALSE(FLAGS_scanner_inject_service_unavailable_on_continue_scan)) { |
| return Status::ServiceUnavailable("Injecting service unavailable status on Scan due to " |
| "--scanner_inject_service_unavailable_on_continue_scan"); |
| } |
| |
| // If we early-exit out of this function, automatically unregister the scanner. |
| ScopedUnregisterScanner unreg_scanner(server_->scanner_manager(), scanner->id()); |
| ScopedAddScannerTiming scanner_timer(scanner.get(), result_collector->cpu_times()); |
| |
| VLOG(2) << "Found existing scanner " << scanner->id() << " for request: " |
| << SecureShortDebugString(*req); |
| TRACE("Found scanner $0 for tablet $1", scanner->id(), scanner->tablet_id()); |
| |
| if (batch_size_bytes == 0 && req->close_scanner()) { |
| *has_more_results = false; |
| return Status::OK(); |
| } |
| |
| if (req->call_seq_id() != scanner->call_seq_id()) { |
| *error_code = TabletServerErrorPB::INVALID_SCAN_CALL_SEQ_ID; |
| if (!FLAGS_scanner_unregister_on_invalid_seq_id) { |
| unreg_scanner.Cancel(); |
| } |
| return Status::InvalidArgument("Invalid call sequence ID in scan request"); |
| } |
| scanner->IncrementCallSeqId(); |
| |
| RowwiseIterator* iter = scanner->iter(); |
| |
| // Set the row format flags on the ScanResultCollector. |
| s = result_collector->InitSerializer(scanner->row_format_flags(), |
| iter->schema(), |
| *scanner->client_projection_schema()); |
| if (!s.ok()) { |
| *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC; |
| return s; |
| } |
| |
| // TODO(todd): could size the RowBlock based on the user's requested batch size? |
| // If people had really large indirect objects, we would currently overshoot |
| // their requested batch size by a lot. |
| RowBlockMemory mem(32 * 1024); |
| RowBlock block(&iter->schema(), FLAGS_scanner_batch_size_rows, &mem); |
| |
| // TODO(todd): in the future, use the client timeout to set a budget. For now, |
| // just use a half second, which should be plenty to amortize call overhead. |
| int budget_ms = 500; |
| MonoTime deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(budget_ms); |
| |
| int64_t rows_scanned = 0; |
| while (iter->HasNext() && !scanner->has_fulfilled_limit()) { |
| if (PREDICT_FALSE(FLAGS_scanner_inject_latency_on_each_batch_ms > 0)) { |
| SleepFor(MonoDelta::FromMilliseconds(FLAGS_scanner_inject_latency_on_each_batch_ms)); |
| } |
| |
| Status s = iter->NextBlock(&block); |
| if (PREDICT_FALSE(!s.ok())) { |
| LOG(WARNING) << "Copying rows from internal iterator for request " |
| << SecureShortDebugString(*req); |
| *error_code = TabletServerErrorPB::UNKNOWN_ERROR; |
| return s; |
| } |
| |
| if (PREDICT_TRUE(block.nrows() > 0)) { |
| // Count the number of rows scanned, regardless of predicates or deletions. |
| // The collector will separately count the number of rows actually returned to |
| // the client. |
| rows_scanned += block.nrows(); |
| if (scanner->spec().has_limit()) { |
| int64_t rows_left = scanner->spec().limit() - scanner->num_rows_returned(); |
| DCHECK_GT(rows_left, 0); // Guaranteed by has_fulfilled_limit() |
| block.selection_vector()->ClearToSelectAtMost(static_cast<size_t>(rows_left)); |
| } |
| result_collector->HandleRowBlock(scanner.get(), block); |
| } |
| |
| int64_t response_size = result_collector->ResponseSize(); |
| |
| if (VLOG_IS_ON(2)) { |
| // This may be fairly expensive if row block size is small |
| TRACE("Copied block (nrows=$0), new size=$1", block.nrows(), response_size); |
| } |
| |
| // TODO: should check if RPC got cancelled, once we implement RPC cancellation. |
| if (PREDICT_FALSE(MonoTime::Now() >= deadline)) { |
| TRACE("Deadline expired - responding early"); |
| break; |
| } |
| |
| if (response_size >= batch_size_bytes) { |
| break; |
| } |
| } |
| |
| scoped_refptr<TabletReplica> replica = scanner->tablet_replica(); |
| shared_ptr<Tablet> tablet; |
| TabletServerErrorPB::Code tablet_ref_error_code; |
| s = GetTabletRef(replica, &tablet, &tablet_ref_error_code); |
| // If the tablet is not running, but the scan operation in progress |
| // has reached this point, the tablet server has the necessary data to |
| // send in response for the scan continuation request. |
| if (PREDICT_FALSE(!s.ok() && tablet_ref_error_code != |
| TabletServerErrorPB::TABLET_NOT_RUNNING)) { |
| *error_code = tablet_ref_error_code; |
| return s; |
| } |
| |
| // Calculate the number of rows/cells/bytes actually processed. |
| IteratorStats delta_stats = scanner->UpdateStatsAndGetDelta(); |
| TRACE_COUNTER_INCREMENT(SCANNER_BYTES_READ_METRIC_NAME, delta_stats.bytes_read); |
| |
| // Update metrics based on this scan request. |
| if (tablet) { |
| // The number of rows/cells/bytes actually returned to the user. |
| tablet->metrics()->scanner_rows_returned->IncrementBy( |
| result_collector->NumRowsReturned()); |
| tablet->metrics()->scanner_cells_returned->IncrementBy( |
| result_collector->NumRowsReturned() * |
| scanner->client_projection_schema()->num_columns()); |
| tablet->metrics()->scanner_bytes_returned->IncrementBy( |
| result_collector->ResponseSize()); |
| |
| // The number of rows/cells/bytes actually processed. |
| tablet->metrics()->scanner_rows_scanned->IncrementBy(rows_scanned); |
| tablet->metrics()->scanner_cells_scanned_from_disk->IncrementBy(delta_stats.cells_read); |
| tablet->metrics()->scanner_bytes_scanned_from_disk->IncrementBy(delta_stats.bytes_read); |
| tablet->metrics()->scanner_predicates_disabled->IncrementBy(delta_stats.predicates_disabled); |
| |
| // Last read timestamp. |
| tablet->UpdateLastReadTime(); |
| } |
| |
| *has_more_results = !req->close_scanner() && iter->HasNext() && |
| !scanner->has_fulfilled_limit(); |
| if (*has_more_results) { |
| unreg_scanner.Cancel(); |
| } else { |
| VLOG(2) << "Scanner " << scanner->id() << " complete: removing..."; |
| } |
| |
| return Status::OK(); |
| } |
| |
| namespace { |
| // Helper to clamp a client deadline for a scan to the max supported by the server. |
| MonoTime ClampScanDeadlineForWait(const MonoTime& deadline, bool* was_clamped) { |
| const MonoTime now = MonoTime::Now(); |
| if ((deadline - now).ToMilliseconds() > FLAGS_scanner_max_wait_ms) { |
| *was_clamped = true; |
| return now + MonoDelta::FromMilliseconds(FLAGS_scanner_max_wait_ms); |
| } |
| *was_clamped = false; |
| return deadline; |
| } |
| } // anonymous namespace |
| |
| Status TabletServiceImpl::HandleScanAtSnapshot(const NewScanRequestPB& scan_pb, |
| const RpcContext* rpc_context, |
| const Schema& projection, |
| Tablet* tablet, |
| TimeManager* time_manager, |
| unique_ptr<RowwiseIterator>* iter, |
| optional<Timestamp>* snap_start_timestamp, |
| Timestamp* snap_timestamp, |
| TabletServerErrorPB::Code* error_code) { |
| const auto read_mode = scan_pb.read_mode(); |
| switch (read_mode) { |
| case READ_AT_SNAPSHOT: // Fallthrough intended |
| case READ_YOUR_WRITES: |
| break; |
| default: |
| LOG(FATAL) << Substitute("$0: unsupported snapshot scan mode", read_mode); |
| } |
| |
| // Validate other input parameters as well in the very beginning. |
| if (scan_pb.has_snap_start_timestamp()) { |
| if (read_mode != READ_AT_SNAPSHOT) { |
| // TODO(mpercy): Should we allow READ_YOUR_WRITES mode? There is no |
| // obvious use for it, but also no obvious reason not to support it, |
| // except for the fact that we would also have to test it. |
| *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC; |
| return Status::InvalidArgument("scan start timestamp is only supported " |
| "in READ_AT_SNAPSHOT read mode"); |
| } |
| if (scan_pb.order_mode() != ORDERED) { |
| *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC; |
| return Status::InvalidArgument("scan start timestamp is only supported " |
| "in ORDERED order mode"); |
| } |
| } |
| |
| // Based on the read mode, pick a timestamp and verify it. |
| Timestamp tmp_snap_timestamp; |
| Status s = PickAndVerifyTimestamp(scan_pb, tablet, &tmp_snap_timestamp); |
| if (PREDICT_FALSE(!s.ok())) { |
| *error_code = TabletServerErrorPB::INVALID_SNAPSHOT; |
| return s.CloneAndPrepend("cannot verify timestamp"); |
| } |
| |
| // Reduce the client's deadline by a few msecs to allow for overhead. |
| const MonoTime client_deadline = |
| rpc_context->GetClientDeadline() - MonoDelta::FromMilliseconds(10); |
| |
| // Its not good for the tablet server or for the client if we hang here forever. The tablet |
| // server will have one less available thread and the client might be stuck spending all |
| // of the allotted time for the scan on a partitioned server that will never have a consistent |
| // snapshot at 'snap_timestamp'. |
| // Because of this we clamp the client's deadline to the maximum configured |
| // scanner wait time. If the client sets a longer timeout then it can use it |
| // by retrying (possibly on other servers). |
| bool was_clamped = false; |
| MonoTime final_deadline = ClampScanDeadlineForWait(client_deadline, &was_clamped); |
| |
| // Wait for the tablet to know that 'snap_timestamp' is safe. I.e. that all operations |
| // that came before it are, at least, started. This, together with waiting for the mvcc |
| // snapshot to be clean below, allows us to always return the same data when scanning at |
| // the same timestamp (repeatable reads). |
| TRACE("Waiting safe time to advance"); |
| const MonoTime before = MonoTime::Now(); |
| s = time_manager->WaitUntilSafe(tmp_snap_timestamp, final_deadline); |
| |
| MvccSnapshot snap; |
| if (PREDICT_TRUE(s.ok())) { |
| // Wait for the in-flights in the snapshot to be finished. |
| TRACE("Waiting for operations to commit"); |
| s = tablet->mvcc_manager()->WaitForSnapshotWithAllApplied( |
| tmp_snap_timestamp, &snap, client_deadline); |
| } |
| |
| // If we got an TimeOut but we had clamped the deadline, return |
| // ServiceUnavailable to make the client retry the scan operation soon. |
| if (s.IsTimedOut() && was_clamped) { |
| *error_code = TabletServerErrorPB::THROTTLED; |
| return Status::ServiceUnavailable(s.CloneAndPrepend( |
| "could not wait for desired snapshot timestamp to be consistent").ToString()); |
| } |
| RETURN_NOT_OK(s); |
| |
| const auto duration_usec = (MonoTime::Now() - before).ToMicroseconds(); |
| tablet->metrics()->snapshot_read_inflight_wait_duration->Increment(duration_usec); |
| TRACE("All operations in snapshot committed. Waited for $0 microseconds", duration_usec); |
| |
| tablet::RowIteratorOptions opts; |
| opts.projection = &projection; |
| opts.snap_to_include = snap; |
| opts.order = scan_pb.order_mode(); |
| |
| optional<Timestamp> tmp_snap_start_timestamp; |
| if (scan_pb.has_snap_start_timestamp()) { |
| tmp_snap_start_timestamp = Timestamp(scan_pb.snap_start_timestamp()); |
| opts.snap_to_exclude = MvccSnapshot(*tmp_snap_start_timestamp); |
| opts.include_deleted_rows = true; |
| } |
| |
| // Before we open / wait on anything check that the timestamp(s) are after |
| // the AHM. This is not the final check. We'll check this again after the |
| // iterators are open but there is no point in doing the work to initialize |
| // the iterators and spending the time to wait for a snapshot timestamp to be |
| // readable when we can't read back to one of the requested timestamps. |
| RETURN_NOT_OK_EVAL(VerifyLegalSnapshotTimestamps(tablet, read_mode, |
| tmp_snap_start_timestamp, |
| tmp_snap_timestamp), |
| *error_code = TabletServerErrorPB::INVALID_SNAPSHOT); |
| |
| RETURN_NOT_OK(tablet->NewRowIterator(std::move(opts), iter)); |
| |
| // Return the picked snapshot timestamp for both READ_AT_SNAPSHOT |
| // and READ_YOUR_WRITES mode, as well as the parsed start timestamp for |
| // READ_AT_SNAPSHOT mode, if specified. |
| *snap_start_timestamp = std::move(tmp_snap_start_timestamp); |
| *snap_timestamp = tmp_snap_timestamp; |
| return Status::OK(); |
| } |
| |
| Status TabletServiceImpl::ValidateTimestamp(const Timestamp& snap_timestamp) { |
| Timestamp max_allowed_ts; |
| Status s = server_->clock()->GetGlobalLatest(&max_allowed_ts); |
| if (PREDICT_FALSE(s.IsNotSupported()) && |
| PREDICT_TRUE(!FLAGS_scanner_allow_snapshot_scans_with_logical_timestamps)) { |
| return Status::NotSupported("Snapshot scans not supported on this server", |
| s.ToString()); |
| } |
| |
| // Note: if 'max_allowed_ts' is not obtained from clock_->GetGlobalLatest(), e.g., |
| // in case logical clock is used, it's guaranteed to be higher than 'tmp_snap_timestamp', |
| // since 'max_allowed_ts' is default-constructed to kInvalidTimestamp (MAX_LONG - 1). |
| if (snap_timestamp > max_allowed_ts) { |
| return Status::InvalidArgument( |
| Substitute("Snapshot time $0 in the future. Max allowed timestamp is $1", |
| server_->clock()->Stringify(snap_timestamp), |
| server_->clock()->Stringify(max_allowed_ts))); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status TabletServiceImpl::PickAndVerifyTimestamp(const NewScanRequestPB& scan_pb, |
| Tablet* tablet, |
| Timestamp* snap_timestamp) { |
| // If the client sent a timestamp update our clock with it. |
| if (scan_pb.has_propagated_timestamp()) { |
| Timestamp propagated_timestamp(scan_pb.propagated_timestamp()); |
| |
| // Update the clock so that we never generate snapshots lower than |
| // 'propagated_timestamp'. If 'propagated_timestamp' is lower than |
| // 'now' this call has no effect. If 'propagated_timestamp' is too far |
| // into the future this will fail and we abort. |
| RETURN_NOT_OK(server_->clock()->Update(propagated_timestamp)); |
| } |
| |
| Timestamp tmp_snap_timestamp; |
| ReadMode read_mode = scan_pb.read_mode(); |
| |
| if (read_mode == READ_AT_SNAPSHOT) { |
| // For READ_AT_SNAPSHOT mode, |
| // 1) if the client provided no snapshot timestamp we take the current |
| // clock time as the snapshot timestamp. |
| // 2) else we use the client provided one, but make sure it is not too |
| // far in the future as to be invalid. |
| if (!scan_pb.has_snap_timestamp()) { |
| tmp_snap_timestamp = server_->clock()->Now(); |
| } else { |
| tmp_snap_timestamp.FromUint64(scan_pb.snap_timestamp()); |
| RETURN_NOT_OK(ValidateTimestamp(tmp_snap_timestamp)); |
| } |
| } else { |
| // For READ_YOUR_WRITES mode, we use the following to choose a |
| // snapshot timestamp: MAX(propagated timestamp + 1, 'clean' timestamp). |
| // There is no need to validate if the chosen timestamp is too far in |
| // the future, since: |
| // 1) MVCC 'clean' timestamp is by definition in the past (it's maximally |
| // bounded by safe time). |
| // 2) the propagated timestamp was used to update the clock above and the |
| // update would have returned an error if the the timestamp was too |
| // far in the future. |
| uint64_t clean_timestamp = |
| tablet->mvcc_manager()->GetCleanTimestamp().ToUint64(); |
| uint64_t propagated_timestamp = scan_pb.has_propagated_timestamp() |
| ? scan_pb.propagated_timestamp() |
| : Timestamp::kMin.ToUint64(); |
| tmp_snap_timestamp = Timestamp(std::max(propagated_timestamp + 1, |
| clean_timestamp)); |
| } |
| *snap_timestamp = tmp_snap_timestamp; |
| return Status::OK(); |
| } |
| |
| } // namespace tserver |
| } // namespace kudu |