| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/client/scanner-internal.h" |
| |
| #include <algorithm> |
| #include <boost/bind.hpp> |
| #include <cmath> |
| #include <string> |
| #include <vector> |
| |
| #include "kudu/client/client-internal.h" |
| #include "kudu/client/meta_cache.h" |
| #include "kudu/client/row_result.h" |
| #include "kudu/client/table-internal.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/util/hexdump.h" |
| |
| using std::set; |
| using std::string; |
| |
| namespace kudu { |
| |
| using kudu::ColumnPredicatePB; |
| using rpc::RpcController; |
| using strings::Substitute; |
| using strings::SubstituteAndAppend; |
| using tserver::NewScanRequestPB; |
| using tserver::TabletServerFeatures; |
| |
| namespace client { |
| |
| using internal::RemoteTabletServer; |
| |
| static const int64_t kNoTimestamp = -1; |
| |
// Constructs per-scanner state bound to 'table' (must be non-NULL; enforced
// by DCHECK_NOTNULL).
//
// Defaults established here: scan the closest replica, READ_LATEST read
// mode, not fault tolerant, no snapshot timestamp, no explicit batch size,
// and a projection equal to the table's full schema.
KuduScanner::Data::Data(KuduTable* table)
  : open_(false),
    data_in_open_(false),
    has_batch_size_bytes_(false),
    batch_size_bytes_(0),
    selection_(KuduClient::CLOSEST_REPLICA),
    read_mode_(READ_LATEST),
    is_fault_tolerant_(false),
    snapshot_timestamp_(kNoTimestamp),  // kNoTimestamp (-1) means "unset"
    short_circuit_(false),
    table_(DCHECK_NOTNULL(table)),
    arena_(1024, 1024*1024),  // presumably initial/max arena sizes in bytes — confirm against Arena's ctor
    timeout_(MonoDelta::FromMilliseconds(kScanTimeoutMillis)),
    scan_attempts_(0) {
  SetProjectionSchema(table->schema().schema_);
}
| |
| KuduScanner::Data::~Data() { |
| } |
| |
| namespace { |
| void CopyPredicateBound(const ColumnSchema& col, |
| const void* bound_src, |
| string* bound_dst) { |
| const void* src; |
| size_t size; |
| if (col.type_info()->physical_type() == BINARY) { |
| // Copying a string involves an extra level of indirection through its |
| // owning slice. |
| const Slice* s = reinterpret_cast<const Slice*>(bound_src); |
| src = s->data(); |
| size = s->size(); |
| } else { |
| src = bound_src; |
| size = col.type_info()->size(); |
| } |
| bound_dst->assign(reinterpret_cast<const char*>(src), size); |
| } |
| |
| void ColumnPredicateIntoPB(const ColumnPredicate& predicate, |
| ColumnPredicatePB* pb) { |
| pb->set_column(predicate.column().name()); |
| switch (predicate.predicate_type()) { |
| case PredicateType::Equality: { |
| CopyPredicateBound(predicate.column(), |
| predicate.raw_lower(), |
| pb->mutable_equality()->mutable_value()); |
| return; |
| }; |
| case PredicateType::Range: { |
| auto range_pred = pb->mutable_range(); |
| if (predicate.raw_lower() != nullptr) { |
| CopyPredicateBound(predicate.column(), |
| predicate.raw_lower(), |
| range_pred->mutable_lower()); |
| } |
| if (predicate.raw_upper() != nullptr) { |
| CopyPredicateBound(predicate.column(), |
| predicate.raw_upper(), |
| range_pred->mutable_upper()); |
| } |
| return; |
| }; |
| case PredicateType::IsNotNull: { |
| pb->mutable_is_not_null(); |
| return; |
| }; |
| case PredicateType::None: LOG(FATAL) << "None predicate may not be converted to protobuf"; |
| } |
| LOG(FATAL) << "unknown predicate type"; |
| } |
| } // anonymous namespace |
| |
// Reacts to the scan RPC failure described by 'err'.
//
// Returns OK when the caller may retry the scan, after applying the
// side effects appropriate to the error class: marking the tserver failed
// in the meta cache, inserting it into 'blacklist', marking the tablet's
// cached locations stale, and/or sleeping with exponential backoff.
// Returns a terminal error — annotated with the last recorded error, if
// any — when the scan must give up.
Status KuduScanner::Data::HandleError(const ScanRpcStatus& err,
                                      const MonoTime& deadline,
                                      set<string>* blacklist) {
  // If we timed out because of the overall deadline, we're done.
  // We didn't wait a full RPC timeout, though, so don't mark the tserver as failed.
  if (err.result == ScanRpcStatus::OVERALL_DEADLINE_EXCEEDED) {
    LOG(INFO) << "Scan of tablet " << remote_->tablet_id() << " at "
        << ts_->ToString() << " deadline expired.";
    return last_error_.ok()
        ? err.status : err.status.CloneAndAppend(last_error_.ToString());
  }

  UpdateLastError(err.status);

  // Decide which recovery actions apply to this class of error.
  bool mark_ts_failed = false;
  bool blacklist_location = false;
  bool mark_locations_stale = false;
  bool can_retry = true;
  bool backoff = false;
  switch (err.result) {
    case ScanRpcStatus::SERVER_BUSY:
      // The server is up but overloaded: retry after a delay without
      // penalizing it.
      backoff = true;
      break;
    case ScanRpcStatus::RPC_DEADLINE_EXCEEDED:
    case ScanRpcStatus::RPC_ERROR:
      // Transport-level failure: treat the server as unhealthy.
      blacklist_location = true;
      mark_ts_failed = true;
      break;
    case ScanRpcStatus::SCANNER_EXPIRED:
      // No extra action needed; the retry issues a fresh scan request.
      break;
    case ScanRpcStatus::TABLET_NOT_RUNNING:
      blacklist_location = true;
      break;
    case ScanRpcStatus::TABLET_NOT_FOUND:
      // There was either a tablet configuration change or the table was
      // deleted, since at the time of this writing we don't support splits.
      // Force a re-fetch of the tablet metadata.
      mark_locations_stale = true;
      blacklist_location = true;
      break;
    default:
      // e.g. INVALID_REQUEST or OTHER_TS_ERROR: retrying won't help.
      can_retry = false;
      break;
  }

  if (mark_ts_failed) {
    table_->client()->data_->meta_cache_->MarkTSFailed(ts_, err.status);
    // We never mark a tserver failed without also blacklisting it.
    DCHECK(blacklist_location);
  }

  if (blacklist_location) {
    blacklist->insert(ts_->permanent_uuid());
  }

  if (mark_locations_stale) {
    remote_->MarkStale();
  }

  if (backoff) {
    // Exponential backoff with jitter anchored between 10ms and 20ms, and an
    // upper bound between 2.5s and 5s.
    MonoDelta sleep = MonoDelta::FromMilliseconds(
        (10 + rand() % 10) * static_cast<int>(std::pow(2.0, std::min(8, scan_attempts_ - 1))));
    MonoTime now = MonoTime::Now(MonoTime::FINE);
    now.AddDelta(sleep);
    // Don't bother sleeping if the backoff would overrun the deadline.
    if (deadline.ComesBefore(now)) {
      Status ret = Status::TimedOut("unable to retry before timeout",
                                    err.status.ToString());
      return last_error_.ok() ?
          ret : ret.CloneAndAppend(last_error_.ToString());
    }
    LOG(INFO) << "Error scanning on server " << ts_->ToString() << ": "
              << err.status.ToString() << ". Will retry after "
              << sleep.ToString() << "; attempt " << scan_attempts_;
    SleepFor(sleep);
  }
  if (can_retry) {
    return Status::OK();
  }
  return err.status;
}
| |
| ScanRpcStatus KuduScanner::Data::AnalyzeResponse(const Status& rpc_status, |
| const MonoTime& overall_deadline, |
| const MonoTime& deadline) { |
| if (rpc_status.ok() && !last_response_.has_error()) { |
| return ScanRpcStatus{ScanRpcStatus::OK, Status::OK()}; |
| } |
| |
| // Check for various RPC-level errors. |
| if (!rpc_status.ok()) { |
| // Handle various RPC-system level errors that came back from the server. These |
| // errors indicate that the TS is actually up. |
| if (rpc_status.IsRemoteError()) { |
| DCHECK(controller_.error_response()); |
| switch (controller_.error_response()->code()) { |
| case rpc::ErrorStatusPB::ERROR_INVALID_REQUEST: |
| return ScanRpcStatus{ScanRpcStatus::INVALID_REQUEST, rpc_status}; |
| case rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY: |
| return ScanRpcStatus{ScanRpcStatus::SERVER_BUSY, rpc_status}; |
| default: |
| return ScanRpcStatus{ScanRpcStatus::RPC_ERROR, rpc_status}; |
| } |
| } |
| |
| if (rpc_status.IsTimedOut()) { |
| if (overall_deadline.Equals(deadline)) { |
| return ScanRpcStatus{ScanRpcStatus::OVERALL_DEADLINE_EXCEEDED, rpc_status}; |
| } else { |
| return ScanRpcStatus{ScanRpcStatus::RPC_DEADLINE_EXCEEDED, rpc_status}; |
| } |
| } |
| return ScanRpcStatus{ScanRpcStatus::RPC_ERROR, rpc_status}; |
| } |
| |
| // If we got this far, it indicates that the tserver service actually handled the |
| // call, but it was an error for some reason. |
| Status server_status = StatusFromPB(last_response_.error().status()); |
| DCHECK(!server_status.ok()); |
| const tserver::TabletServerErrorPB& error = last_response_.error(); |
| switch (error.code()) { |
| case tserver::TabletServerErrorPB::SCANNER_EXPIRED: |
| return ScanRpcStatus{ScanRpcStatus::SCANNER_EXPIRED, server_status}; |
| case tserver::TabletServerErrorPB::TABLET_NOT_RUNNING: |
| return ScanRpcStatus{ScanRpcStatus::TABLET_NOT_RUNNING, server_status}; |
| case tserver::TabletServerErrorPB::TABLET_NOT_FOUND: |
| return ScanRpcStatus{ScanRpcStatus::TABLET_NOT_FOUND, server_status}; |
| default: |
| return ScanRpcStatus{ScanRpcStatus::OTHER_TS_ERROR, server_status}; |
| } |
| } |
| |
| Status KuduScanner::Data::OpenNextTablet(const MonoTime& deadline, |
| std::set<std::string>* blacklist) { |
| return OpenTablet(partition_pruner_.NextPartitionKey(), |
| deadline, |
| blacklist); |
| } |
| |
| Status KuduScanner::Data::ReopenCurrentTablet(const MonoTime& deadline, |
| std::set<std::string>* blacklist) { |
| return OpenTablet(remote_->partition().partition_key_start(), |
| deadline, |
| blacklist); |
| } |
| |
| ScanRpcStatus KuduScanner::Data::SendScanRpc(const MonoTime& overall_deadline, |
| bool allow_time_for_failover) { |
| // The user has specified a timeout which should apply to the total time for each call |
| // to NextBatch(). However, for fault-tolerant scans, or for when we are first opening |
| // a scanner, it's preferable to set a shorter timeout (the "default RPC timeout") for |
| // each individual RPC call. This gives us time to fail over to a different server |
| // if the first server we try happens to be hung. |
| MonoTime rpc_deadline; |
| if (allow_time_for_failover) { |
| rpc_deadline = MonoTime::Now(MonoTime::FINE); |
| rpc_deadline.AddDelta(table_->client()->default_rpc_timeout()); |
| rpc_deadline = MonoTime::Earliest(overall_deadline, rpc_deadline); |
| } else { |
| rpc_deadline = overall_deadline; |
| } |
| |
| controller_.Reset(); |
| controller_.set_deadline(rpc_deadline); |
| if (!spec_.predicates().empty()) { |
| controller_.RequireServerFeature(TabletServerFeatures::COLUMN_PREDICATES); |
| } |
| return AnalyzeResponse( |
| proxy_->Scan(next_req_, |
| &last_response_, |
| &controller_), |
| rpc_deadline, overall_deadline); |
| } |
| |
// Opens a scanner on the tablet covering 'partition_key', retrying against
// other replicas (and re-resolving tablet locations) until the scan opens
// successfully or 'deadline' passes.
//
// On success: remote_/ts_/proxy_ identify the chosen tablet and server, the
// scanner ID and any first batch of data from the response are recorded,
// and the partition pruner is advanced past this tablet. 'blacklist'
// accumulates tservers that should not be retried; it is cleared if every
// candidate has been blacklisted.
Status KuduScanner::Data::OpenTablet(const string& partition_key,
                                     const MonoTime& deadline,
                                     set<string>* blacklist) {

  PrepareRequest(KuduScanner::Data::NEW);
  next_req_.clear_scanner_id();
  NewScanRequestPB* scan = next_req_.mutable_new_scan_request();
  switch (read_mode_) {
    case READ_LATEST: scan->set_read_mode(kudu::READ_LATEST); break;
    case READ_AT_SNAPSHOT: scan->set_read_mode(kudu::READ_AT_SNAPSHOT); break;
    default: LOG(FATAL) << "Unexpected read mode.";
  }

  // Fault-tolerant scans require an ordered scan so that they can be
  // resumed from last_primary_key_ after a failure (see below).
  if (is_fault_tolerant_) {
    scan->set_order_mode(kudu::ORDERED);
  } else {
    scan->set_order_mode(kudu::UNORDERED);
  }

  // Resume from just after the last row seen, if we're retrying a scan.
  if (last_primary_key_.length() > 0) {
    VLOG(1) << "Setting NewScanRequestPB last_primary_key to hex value "
        << HexDump(last_primary_key_);
    scan->set_last_primary_key(last_primary_key_);
  }

  scan->set_cache_blocks(spec_.cache_blocks());

  if (snapshot_timestamp_ != kNoTimestamp) {
    if (PREDICT_FALSE(read_mode_ != READ_AT_SNAPSHOT)) {
      LOG(WARNING) << "Scan snapshot timestamp set but read mode was READ_LATEST."
          " Ignoring timestamp.";
    } else {
      scan->set_snap_timestamp(snapshot_timestamp_);
    }
  }

  // Set up the predicates.
  scan->clear_column_predicates();
  for (const auto& col_pred : spec_.predicates()) {
    ColumnPredicateIntoPB(col_pred.second, scan->add_column_predicates());
  }

  // Encoded primary key bounds from the scan spec, if any.
  if (spec_.lower_bound_key()) {
    scan->mutable_start_primary_key()->assign(
      reinterpret_cast<const char*>(spec_.lower_bound_key()->encoded_key().data()),
      spec_.lower_bound_key()->encoded_key().size());
  } else {
    scan->clear_start_primary_key();
  }
  if (spec_.exclusive_upper_bound_key()) {
    scan->mutable_stop_primary_key()->assign(
      reinterpret_cast<const char*>(spec_.exclusive_upper_bound_key()->encoded_key().data()),
      spec_.exclusive_upper_bound_key()->encoded_key().size());
  } else {
    scan->clear_stop_primary_key();
  }
  RETURN_NOT_OK(SchemaToColumnPBs(*projection_, scan->mutable_projected_columns(),
                                  SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES | SCHEMA_PB_WITHOUT_IDS));

  // Retry loop: each attempt re-resolves the tablet location (a previous
  // failure may have marked the cached locations stale), picks a server,
  // and sends the scan RPC.
  for (int attempt = 1;; attempt++) {
    Synchronizer sync;
    table_->client()->data_->meta_cache_->LookupTabletByKey(table_,
                                                            partition_key,
                                                            deadline,
                                                            &remote_,
                                                            sync.AsStatusCallback());
    RETURN_NOT_OK(sync.Wait());

    scan->set_tablet_id(remote_->tablet_id());

    RemoteTabletServer *ts;
    vector<RemoteTabletServer*> candidates;
    Status lookup_status = table_->client()->data_->GetTabletServer(
        table_->client(),
        remote_,
        selection_,
        *blacklist,
        &candidates,
        &ts);
    // If we get ServiceUnavailable, this indicates that the tablet doesn't
    // currently have any known leader. We should sleep and retry, since
    // it's likely that the tablet is undergoing a leader election and will
    // soon have one.
    if (lookup_status.IsServiceUnavailable() &&
        MonoTime::Now(MonoTime::FINE).ComesBefore(deadline)) {

      // ServiceUnavailable means that we have already blacklisted all of the candidate
      // tablet servers. So, we clear the list so that we will cycle through them all
      // another time.
      blacklist->clear();
      int sleep_ms = attempt * 100;  // linear backoff per attempt
      // TODO: should ensure that sleep_ms does not pass the provided deadline.
      VLOG(1) << "Tablet " << remote_->tablet_id() << " current unavailable: "
              << lookup_status.ToString() << ". Sleeping for " << sleep_ms << "ms "
              << "and retrying...";
      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
      continue;
    }
    RETURN_NOT_OK(lookup_status);
    CHECK(ts->proxy());
    ts_ = CHECK_NOTNULL(ts);
    proxy_ = ts_->proxy();

    // Only allow the shorter per-RPC failover timeout when more than one
    // non-blacklisted replica remains.
    // NOTE(review): this mixes int and size_t, so the subtraction is
    // performed unsigned — confirm blacklist->size() can never exceed
    // candidates.size() here, or the comparison could wrap.
    bool allow_time_for_failover = static_cast<int>(candidates.size()) - blacklist->size() > 1;
    ScanRpcStatus scan_status = SendScanRpc(deadline, allow_time_for_failover);
    if (scan_status.result == ScanRpcStatus::OK) {
      last_error_ = Status::OK();
      scan_attempts_ = 0;
      break;
    }
    scan_attempts_++;
    // HandleError returns OK if it's safe to retry; otherwise we bail.
    RETURN_NOT_OK(HandleError(scan_status, deadline, blacklist));
  }

  partition_pruner_.RemovePartitionKeyRange(remote_->partition().partition_key_end());

  next_req_.clear_new_scan_request();
  data_in_open_ = last_response_.has_data();
  if (last_response_.has_more_results()) {
    next_req_.set_scanner_id(last_response_.scanner_id());
    VLOG(1) << "Opened tablet " << remote_->tablet_id()
            << ", scanner ID " << last_response_.scanner_id();
  } else if (last_response_.has_data()) {
    VLOG(1) << "Opened tablet " << remote_->tablet_id() << ", no scanner ID assigned";
  } else {
    VLOG(1) << "Opened tablet " << remote_->tablet_id() << " (no rows), no scanner ID assigned";
  }

  // If present in the response, set the snapshot timestamp and the encoded last
  // primary key. This is used when retrying the scan elsewhere. The last
  // primary key is also updated on each scan response.
  if (is_fault_tolerant_) {
    CHECK(last_response_.has_snap_timestamp());
    snapshot_timestamp_ = last_response_.snap_timestamp();
    if (last_response_.has_last_primary_key()) {
      last_primary_key_ = last_response_.last_primary_key();
    }
  }

  if (last_response_.has_snap_timestamp()) {
    table_->client()->data_->UpdateLatestObservedTimestamp(last_response_.snap_timestamp());
  }

  return Status::OK();
}
| |
| Status KuduScanner::Data::KeepAlive() { |
| if (!open_) return Status::IllegalState("Scanner was not open."); |
| // If there is no scanner to keep alive, we still return Status::OK(). |
| if (!last_response_.IsInitialized() || !last_response_.has_more_results() || |
| !next_req_.has_scanner_id()) { |
| return Status::OK(); |
| } |
| |
| RpcController controller; |
| controller.set_timeout(timeout_); |
| tserver::ScannerKeepAliveRequestPB request; |
| request.set_scanner_id(next_req_.scanner_id()); |
| tserver::ScannerKeepAliveResponsePB response; |
| RETURN_NOT_OK(proxy_->ScannerKeepAlive(request, &response, &controller)); |
| if (response.has_error()) { |
| return StatusFromPB(response.error().status()); |
| } |
| return Status::OK(); |
| } |
| |
// Returns true if there are more tablets left to scan after the current
// one. May only be called once the scanner has been opened.
bool KuduScanner::Data::MoreTablets() const {
  CHECK(open_);
  // TODO(KUDU-565): add a test which has a scan end on a tablet boundary
  return partition_pruner_.HasMorePartitionKeyRanges();
}
| |
| void KuduScanner::Data::PrepareRequest(RequestType state) { |
| if (state == KuduScanner::Data::CLOSE) { |
| next_req_.set_batch_size_bytes(0); |
| } else if (has_batch_size_bytes_) { |
| next_req_.set_batch_size_bytes(batch_size_bytes_); |
| } else { |
| next_req_.clear_batch_size_bytes(); |
| } |
| |
| if (state == KuduScanner::Data::NEW) { |
| next_req_.set_call_seq_id(0); |
| } else { |
| next_req_.set_call_seq_id(next_req_.call_seq_id() + 1); |
| } |
| } |
| |
| void KuduScanner::Data::UpdateLastError(const Status& error) { |
| if (last_error_.ok() || last_error_.IsTimedOut()) { |
| last_error_ = error; |
| } |
| } |
| |
| void KuduScanner::Data::SetProjectionSchema(const Schema* schema) { |
| projection_ = schema; |
| client_projection_ = KuduSchema(*schema); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // KuduScanBatch |
| //////////////////////////////////////////////////////////// |
| |
| KuduScanBatch::Data::Data() : projection_(NULL) {} |
| |
| KuduScanBatch::Data::~Data() {} |
| |
| size_t KuduScanBatch::Data::CalculateProjectedRowSize(const Schema& proj) { |
| return proj.byte_size() + |
| (proj.has_nullables() ? BitmapSize(proj.num_columns()) : 0); |
| } |
| |
// Takes ownership of a scan response: swaps in the RPC controller (which
// owns the sidecar buffers) and the row block PB, resolves the sidecar
// indices into slices, and rewrites the rows' relative pointers into
// absolute addresses.
//
// Returns Status::Corruption if the response is missing its row sidecar or
// references an invalid sidecar index.
Status KuduScanBatch::Data::Reset(RpcController* controller,
                                  const Schema* projection,
                                  const KuduSchema* client_projection,
                                  gscoped_ptr<RowwiseRowBlockPB> data) {
  CHECK(controller->finished());
  // Swap rather than copy: the controller owns the sidecar memory that
  // direct_data_/indirect_data_ will point into, so it must live as long
  // as this batch.
  controller_.Swap(controller);
  projection_ = projection;
  client_projection_ = client_projection;
  resp_data_.Swap(data.get());

  // First, rewrite the relative addresses into absolute ones.
  if (PREDICT_FALSE(!resp_data_.has_rows_sidecar())) {
    return Status::Corruption("Server sent invalid response: no row data");
  } else {
    Status s = controller_.GetSidecar(resp_data_.rows_sidecar(), &direct_data_);
    if (!s.ok()) {
      return Status::Corruption("Server sent invalid response: row data "
                                "sidecar index corrupt", s.ToString());
    }
  }

  // The indirect data sidecar is optional; not every response carries one.
  if (resp_data_.has_indirect_data_sidecar()) {
    Status s = controller_.GetSidecar(resp_data_.indirect_data_sidecar(),
                                      &indirect_data_);
    if (!s.ok()) {
      return Status::Corruption("Server sent invalid response: indirect data "
                                "sidecar index corrupt", s.ToString());
    }
  }

  RETURN_NOT_OK(RewriteRowBlockPointers(*projection_, resp_data_, indirect_data_, &direct_data_));
  projected_row_size_ = CalculateProjectedRowSize(*projection_);
  return Status::OK();
}
| |
| void KuduScanBatch::Data::ExtractRows(vector<KuduScanBatch::RowPtr>* rows) { |
| int n_rows = resp_data_.num_rows(); |
| rows->resize(n_rows); |
| |
| if (PREDICT_FALSE(n_rows == 0)) { |
| // Early-out here to avoid a UBSAN failure. |
| VLOG(1) << "Extracted 0 rows"; |
| return; |
| } |
| |
| // Initialize each RowPtr with data from the response. |
| // |
| // Doing this resize and array indexing turns out to be noticeably faster |
| // than using reserve and push_back. |
| const uint8_t* src = direct_data_.data(); |
| KuduScanBatch::RowPtr* dst = &(*rows)[0]; |
| while (n_rows > 0) { |
| *dst = KuduScanBatch::RowPtr(projection_, src); |
| dst++; |
| src += projected_row_size_; |
| n_rows--; |
| } |
| VLOG(1) << "Extracted " << rows->size() << " rows"; |
| } |
| |
// Discards the current batch: clears the row block PB and resets the RPC
// controller, releasing the sidecar buffers that backed the row data.
void KuduScanBatch::Data::Clear() {
  resp_data_.Clear();
  controller_.Reset();
}
| |
| } // namespace client |
| } // namespace kudu |