| // Copyright 2014 Cloudera, Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include "kudu/client/scanner-internal.h" |
| |
| #include <algorithm> |
| #include <boost/bind.hpp> |
| #include <boost/foreach.hpp> |
| #include <cmath> |
| #include <cstdlib> |
| #include <set> |
| #include <string> |
| #include <vector> |
| |
| #include "kudu/client/client-internal.h" |
| #include "kudu/client/meta_cache.h" |
| #include "kudu/client/row_result.h" |
| #include "kudu/client/table-internal.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/util/hexdump.h" |
| |
| using std::set; |
| using std::string; |
| using std::vector; |
| |
| namespace kudu { |
| |
| using rpc::RpcController; |
| using tserver::ColumnRangePredicatePB; |
| using tserver::NewScanRequestPB; |
| using tserver::ScanResponsePB; |
| |
| namespace client { |
| |
| using internal::RemoteTabletServer; |
| |
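| // Sentinel value indicating that no snapshot timestamp has been set. |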
| static const int64_t kNoTimestamp = -1; |
| |
| KuduScanner::Data::Data(KuduTable* table) |
| : open_(false), |
| data_in_open_(false), |
| has_batch_size_bytes_(false), |
| batch_size_bytes_(0), |
| selection_(KuduClient::CLOSEST_REPLICA), |
| read_mode_(READ_LATEST), |
| is_fault_tolerant_(false), |
| snapshot_timestamp_(kNoTimestamp), |
| table_(DCHECK_NOTNULL(table)), |
| projection_(table->schema().schema_), |
| arena_(1024, 1024*1024), |
| spec_encoder_(table->schema().schema_, &arena_), |
| timeout_(MonoDelta::FromMilliseconds(kScanTimeoutMillis)), |
| scan_attempts_(0) { |
| } |
| |
| KuduScanner::Data::~Data() { |
| } |
| |
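| // Returns the error from the last scan RPC response as a Status, or |
| // Status::OK() if the response carried no error. |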
| Status KuduScanner::Data::CheckForErrors() { |
| if (PREDICT_TRUE(!last_response_.has_error())) { |
| return Status::OK(); |
| } |
| |
| return StatusFromPB(last_response_.error().status()); |
| } |
| |
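| // Copies the raw predicate bound 'bound_src' of column 'col' into |
| // 'bound_dst', the string representation sent on the wire. |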
| void KuduScanner::Data::CopyPredicateBound(const ColumnSchema& col, |
| const void* bound_src, |
| string* bound_dst) { |
| const void* src; |
| size_t size; |
| if (col.type_info()->physical_type() == BINARY) { |
| // Copying a string involves an extra level of indirection through its |
| // owning slice. |
| const Slice* s = reinterpret_cast<const Slice*>(bound_src); |
| src = s->data(); |
| size = s->size(); |
| } else { |
| src = bound_src; |
| size = col.type_info()->size(); |
| } |
| bound_dst->assign(reinterpret_cast<const char*>(src), size); |
| } |
| |
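| // Analyzes the result of a failed scan RPC and decides whether the scan can |
| // be retried. Returns Status::OK() if a retry is allowed (possibly after |
| // sleeping, marking the tablet server as failed, or blacklisting it as a |
| // side effect); otherwise returns the original, fatal error. |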
| Status KuduScanner::Data::CanBeRetried(const bool is_new_scan, |
| const Status& rpc_status, const Status& server_status, |
| const MonoTime& actual_deadline, const MonoTime& deadline, |
| const vector<RemoteTabletServer*>& candidates, |
| set<string>* blacklist) { |
| CHECK(!rpc_status.ok() || !server_status.ok()); |
| |
| // Check for ERROR_SERVER_TOO_BUSY, which should result in a retry after a delay. |
| if (server_status.ok() && |
| !rpc_status.ok() && |
| controller_.error_response() && |
| controller_.error_response()->code() == rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY) { |
| |
| // Exponential backoff with jitter anchored between 10ms and 20ms, and an |
| // upper bound between 2.5s and 5s. |
| MonoDelta sleep = MonoDelta::FromMilliseconds( |
| (10 + rand() % 10) * static_cast<int>(std::pow(2.0, std::min(8, scan_attempts_ - 1)))); |
| MonoTime now = MonoTime::Now(MonoTime::FINE); |
| now.AddDelta(sleep); |
| if (deadline.ComesBefore(now)) { |
| return Status::TimedOut("unable to retry before timeout", rpc_status.ToString()); |
| } |
| LOG(INFO) << "Retrying scan to busy tablet server " << ts_->ToString() |
| << " after " << sleep.ToString() << "; attempt " << scan_attempts_; |
| SleepFor(sleep); |
| return Status::OK(); |
| } |
| |
| // Start by checking network errors. |
| if (!rpc_status.ok()) { |
| if (rpc_status.IsTimedOut() && actual_deadline.Equals(deadline)) { |
| // If we ended because of the overall deadline, we're done. |
| // We didn't wait a full RPC timeout though, so don't mark the tserver as failed. |
| LOG(INFO) << "Scan of tablet " << remote_->tablet_id() << " at " |
| << ts_->ToString() << " deadline expired."; |
| return rpc_status; |
| } else { |
| // All other types of network errors are retriable, and they also indicate |
| // that the tserver has failed. |
| table_->client()->data_->meta_cache_->MarkTSFailed(ts_, rpc_status); |
| } |
| } |
| |
| // If we're in the middle of a batch and doing a non-fault-tolerant scan, then |
| // we cannot retry. Non-fault-tolerant scans can still be retried on a tablet |
| // boundary (i.e. an OpenTablet call). |
| if (!is_new_scan && !is_fault_tolerant_) { |
| return !rpc_status.ok() ? rpc_status : server_status; |
| } |
| |
| // For retries, the correct action depends on the particular failure condition. |
| // |
| // On an RPC error, we retry at a different tablet server. |
| // |
| // If the server returned an error code, it depends: |
| // |
| // - SCANNER_EXPIRED : The scan can be retried at the same tablet server. |
| // |
| // - TABLET_NOT_RUNNING : The scan can be retried at a different tablet server, subject |
| // to the client's specified selection criteria. |
| // |
| // - Any other error : Fatal. This indicates an unexpected error while processing the scan |
| // request. |
| if (rpc_status.ok() && !server_status.ok()) { |
| const tserver::TabletServerErrorPB& error = last_response_.error(); |
| if (error.code() == tserver::TabletServerErrorPB::SCANNER_EXPIRED) { |
| VLOG(1) << "Got SCANNER_EXPIRED error code, non-fatal error."; |
| } else if (error.code() == tserver::TabletServerErrorPB::TABLET_NOT_RUNNING) { |
| VLOG(1) << "Got TABLET_NOT_RUNNING error code, temporarily blacklisting node " |
| << ts_->permanent_uuid(); |
| blacklist->insert(ts_->permanent_uuid()); |
| // We've blacklisted all the live candidate tservers. |
| // Do a short random sleep, clear the temp blacklist, then do another round of retries. |
| if (!candidates.empty() && candidates.size() == blacklist->size()) { |
| MonoDelta sleep_delta = MonoDelta::FromMilliseconds((random() % 5000) + 1000); |
| LOG(INFO) << "All live candidate nodes are unavailable because of transient errors." |
| << " Sleeping for " << sleep_delta.ToMilliseconds() << " ms before trying again."; |
| SleepFor(sleep_delta); |
| blacklist->clear(); |
| } |
| } else { |
| // All other server errors are fatal. Usually indicates a malformed request, e.g. a bad scan |
| // specification. |
| return server_status; |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
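| // Opens a new scanner on the tablet whose partition contains |
| // 'partition_key', retrying at other replicas (subject to 'blacklist') |
| // until 'deadline' expires. |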
| Status KuduScanner::Data::OpenTablet(const string& partition_key, |
| const MonoTime& deadline, |
| set<string>* blacklist) { |
| |
| PrepareRequest(KuduScanner::Data::NEW); |
| next_req_.clear_scanner_id(); |
| NewScanRequestPB* scan = next_req_.mutable_new_scan_request(); |
| switch (read_mode_) { |
| case READ_LATEST: scan->set_read_mode(kudu::READ_LATEST); break; |
| case READ_AT_SNAPSHOT: scan->set_read_mode(kudu::READ_AT_SNAPSHOT); break; |
| default: LOG(FATAL) << "Unexpected read mode."; |
| } |
| |
| if (is_fault_tolerant_) { |
| scan->set_order_mode(kudu::ORDERED); |
| } else { |
| scan->set_order_mode(kudu::UNORDERED); |
| } |
| |
| if (!last_primary_key_.empty()) { |
| VLOG(1) << "Setting NewScanRequestPB last_primary_key to hex value " |
| << HexDump(last_primary_key_); |
| scan->set_last_primary_key(last_primary_key_); |
| } |
| |
| scan->set_cache_blocks(spec_.cache_blocks()); |
| |
| if (snapshot_timestamp_ != kNoTimestamp) { |
| if (PREDICT_FALSE(read_mode_ != READ_AT_SNAPSHOT)) { |
| LOG(WARNING) << "Scan snapshot timestamp set but read mode is not READ_AT_SNAPSHOT." |
| " Ignoring timestamp."; |
| } else { |
| scan->set_snap_timestamp(snapshot_timestamp_); |
| } |
| } |
| |
| // Set up the predicates. |
| scan->clear_range_predicates(); |
| BOOST_FOREACH(const ColumnRangePredicate& pred, spec_.predicates()) { |
| const ColumnSchema& col = pred.column(); |
| const ValueRange& range = pred.range(); |
| ColumnRangePredicatePB* pb = scan->add_range_predicates(); |
| if (range.has_lower_bound()) { |
| CopyPredicateBound(col, range.lower_bound(), |
| pb->mutable_lower_bound()); |
| } |
| if (range.has_upper_bound()) { |
| CopyPredicateBound(col, range.upper_bound(), |
| pb->mutable_upper_bound()); |
| } |
| ColumnSchemaToPB(col, pb->mutable_column()); |
| } |
| |
| if (spec_.lower_bound_key()) { |
| scan->mutable_start_primary_key()->assign( |
| reinterpret_cast<const char*>(spec_.lower_bound_key()->encoded_key().data()), |
| spec_.lower_bound_key()->encoded_key().size()); |
| } else { |
| scan->clear_start_primary_key(); |
| } |
| if (spec_.exclusive_upper_bound_key()) { |
| scan->mutable_stop_primary_key()->assign( |
| reinterpret_cast<const char*>(spec_.exclusive_upper_bound_key()->encoded_key().data()), |
| spec_.exclusive_upper_bound_key()->encoded_key().size()); |
| } else { |
| scan->clear_stop_primary_key(); |
| } |
| RETURN_NOT_OK(SchemaToColumnPBs(*projection_, scan->mutable_projected_columns(), |
| SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES | SCHEMA_PB_WITHOUT_IDS)); |
| |
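| // Try to open the scanner, moving on to other replicas whenever |
| // CanBeRetried() allows it. |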
| for (int attempt = 1;; attempt++) { |
| Synchronizer sync; |
| table_->client()->data_->meta_cache_->LookupTabletByKey(table_, |
| partition_key, |
| deadline, |
| &remote_, |
| sync.AsStatusCallback()); |
| RETURN_NOT_OK(sync.Wait()); |
| |
| scan->set_tablet_id(remote_->tablet_id()); |
| |
| RemoteTabletServer *ts; |
| vector<RemoteTabletServer*> candidates; |
| Status lookup_status = table_->client()->data_->GetTabletServer( |
| table_->client(), |
| remote_->tablet_id(), |
| selection_, |
| *blacklist, |
| &candidates, |
| &ts); |
| // If we get ServiceUnavailable, this indicates that the tablet doesn't |
| // currently have any known leader. We should sleep and retry, since |
| // it's likely that the tablet is undergoing a leader election and will |
| // soon have one. |
| if (lookup_status.IsServiceUnavailable() && |
| MonoTime::Now(MonoTime::FINE).ComesBefore(deadline)) { |
| int sleep_ms = attempt * 100; |
| VLOG(1) << "Tablet " << remote_->tablet_id() << " currently unavailable: " |
| << lookup_status.ToString() << ". Sleeping for " << sleep_ms << "ms " |
| << "and retrying..."; |
| SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); |
| continue; |
| } |
| RETURN_NOT_OK(lookup_status); |
| |
| // Recalculate the deadlines. |
| // If we have other replicas beyond this one to try, then we'll try to |
| // open the scanner with the default RPC timeout. That gives us time to |
| // try other replicas later. Otherwise, we open the scanner using the |
| // full remaining deadline for the user's call. |
| MonoTime rpc_deadline; |
| if (candidates.size() > blacklist->size() + 1) { |
| rpc_deadline = MonoTime::Now(MonoTime::FINE); |
| rpc_deadline.AddDelta(table_->client()->default_rpc_timeout()); |
| rpc_deadline = MonoTime::Earliest(deadline, rpc_deadline); |
| } else { |
| rpc_deadline = deadline; |
| } |
| |
| controller_.Reset(); |
| controller_.set_deadline(rpc_deadline); |
| |
| ts_ = CHECK_NOTNULL(ts); |
| CHECK(ts_->proxy()); |
| proxy_ = ts_->proxy(); |
| const Status rpc_status = proxy_->Scan(next_req_, &last_response_, &controller_); |
| const Status server_status = CheckForErrors(); |
| if (rpc_status.ok() && server_status.ok()) { |
| scan_attempts_ = 0; |
| break; |
| } |
| scan_attempts_++; |
| RETURN_NOT_OK(CanBeRetried(true, rpc_status, server_status, rpc_deadline, deadline, |
| candidates, blacklist)); |
| } |
| |
| next_req_.clear_new_scan_request(); |
| data_in_open_ = last_response_.has_data(); |
| if (last_response_.has_more_results()) { |
| next_req_.set_scanner_id(last_response_.scanner_id()); |
| VLOG(1) << "Opened tablet " << remote_->tablet_id() |
| << ", scanner ID " << last_response_.scanner_id(); |
| } else if (last_response_.has_data()) { |
| VLOG(1) << "Opened tablet " << remote_->tablet_id() << ", no scanner ID assigned"; |
| } else { |
| VLOG(1) << "Opened tablet " << remote_->tablet_id() << " (no rows), no scanner ID assigned"; |
| } |
| |
| // For fault-tolerant scans, record the snapshot timestamp and the encoded |
| // last primary key returned by the server; they are needed to resume the |
| // scan at another replica if this one fails. The last primary key is also |
| // updated on each subsequent scan response. |
| if (is_fault_tolerant_) { |
| CHECK(last_response_.has_snap_timestamp()); |
| snapshot_timestamp_ = last_response_.snap_timestamp(); |
| if (last_response_.has_last_primary_key()) { |
| last_primary_key_ = last_response_.last_primary_key(); |
| } |
| } |
| |
| if (last_response_.has_snap_timestamp()) { |
| table_->client()->data_->UpdateLatestObservedTimestamp(last_response_.snap_timestamp()); |
| } |
| |
| return Status::OK(); |
| } |
| |
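| // Pings the tablet server hosting the open scanner so that the scanner is |
| // not garbage-collected between batches. |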
| Status KuduScanner::Data::KeepAlive() { |
| if (!open_) return Status::IllegalState("Scanner is not open."); |
| // If there is no scanner to keep alive, we still return Status::OK(). |
| if (!last_response_.IsInitialized() || !last_response_.has_more_results() || |
| !next_req_.has_scanner_id()) { |
| return Status::OK(); |
| } |
| |
| RpcController controller; |
| controller.set_timeout(timeout_); |
| tserver::ScannerKeepAliveRequestPB request; |
| request.set_scanner_id(next_req_.scanner_id()); |
| tserver::ScannerKeepAliveResponsePB response; |
| RETURN_NOT_OK(proxy_->ScannerKeepAlive(request, &response, &controller)); |
| if (response.has_error()) { |
| return StatusFromPB(response.error().status()); |
| } |
| return Status::OK(); |
| } |
| |
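| // Extracts the rows from the last scan response into 'rows'. |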
| Status KuduScanner::Data::ExtractRows(vector<KuduRowResult>* rows) { |
| return ExtractRows(controller_, projection_, &last_response_, rows); |
| } |
| |
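| // Extracts the rows carried in the sidecars of 'controller' into 'rows', |
| // according to 'resp' and 'projection'. The resulting rows point into the |
| // RPC's sidecar data, so they must not outlive 'controller'. |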
| Status KuduScanner::Data::ExtractRows(const RpcController& controller, |
| const Schema* projection, |
| ScanResponsePB* resp, |
| vector<KuduRowResult>* rows) { |
| // First, fetch the row data from the RPC sidecars and rewrite the rows' |
| // relative offsets into absolute pointers. |
| RowwiseRowBlockPB* rowblock_pb = resp->mutable_data(); |
| Slice direct, indirect; |
| |
| if (PREDICT_FALSE(!rowblock_pb->has_rows_sidecar())) { |
| return Status::Corruption("Server sent invalid response: no row data"); |
| } else { |
| Status s = controller.GetSidecar(rowblock_pb->rows_sidecar(), &direct); |
| if (!s.ok()) { |
| return Status::Corruption("Server sent invalid response: row data " |
| "sidecar index corrupt", s.ToString()); |
| } |
| } |
| |
| if (rowblock_pb->has_indirect_data_sidecar()) { |
| Status s = controller.GetSidecar(rowblock_pb->indirect_data_sidecar(), |
| &indirect); |
| if (!s.ok()) { |
| return Status::Corruption("Server sent invalid response: indirect data " |
| "sidecar index corrupt", s.ToString()); |
| } |
| } |
| |
| RETURN_NOT_OK(RewriteRowBlockPointers(*projection, *rowblock_pb, indirect, &direct)); |
| |
| int n_rows = rowblock_pb->num_rows(); |
| if (PREDICT_FALSE(n_rows == 0)) { |
| // Early-out here to avoid a UBSAN failure. |
| VLOG(1) << "Extracted 0 rows"; |
| return Status::OK(); |
| } |
| |
| // Next, allocate a block of KuduRowResults in 'rows'. |
| size_t before = rows->size(); |
| rows->resize(before + n_rows); |
| |
| // Lastly, initialize each KuduRowResult with data from the response. |
| // |
| // Doing this resize and array indexing turns out to be noticeably faster |
| // than using reserve and push_back. |
| int projected_row_size = CalculateProjectedRowSize(*projection); |
| const uint8_t* src = direct.data(); |
| KuduRowResult* dst = &(*rows)[before]; |
| while (n_rows > 0) { |
| dst->Init(projection, src); |
| dst++; |
| src += projected_row_size; |
| n_rows--; |
| } |
| VLOG(1) << "Extracted " << rows->size() - before << " rows"; |
| return Status::OK(); |
| } |
| |
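| // Returns true if there may be further tablets to scan after the current |
| // one, based on the current tablet's end partition key and the scan's |
| // upper bounds. |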
| bool KuduScanner::Data::MoreTablets() const { |
| CHECK(open_); |
| // TODO(KUDU-565): add a test which has a scan end on a tablet boundary |
| |
| if (remote_->partition().partition_key_end().empty()) { |
| // Last tablet -- nothing more to scan. |
| return false; |
| } |
| |
| if (!spec_.exclusive_upper_bound_partition_key().empty() && |
| spec_.exclusive_upper_bound_partition_key() <= remote_->partition().partition_key_end()) { |
| // The scan's exclusive upper bound partition key falls within this tablet, |
| // so the scan ends here. |
| return false; |
| } |
| |
| if (!table_->partition_schema().IsSimplePKRangePartitioning(*table_->schema().schema_)) { |
| // We can't do culling yet if the partitioning isn't simple. |
| return true; |
| } |
| |
| if (spec_.exclusive_upper_bound_key() == NULL) { |
| // No upper bound - keep going! |
| return true; |
| } |
| |
| // Otherwise, we have to compare the upper bound. |
| return spec_.exclusive_upper_bound_key()->encoded_key() |
| .compare(remote_->partition().partition_key_end()) > 0; |
| } |
| |
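| // Sets the batch size and call sequence id of 'next_req_' for the given |
| // request type. A CLOSE request asks for a batch size of zero; a NEW |
| // request resets the call sequence id. |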
| void KuduScanner::Data::PrepareRequest(RequestType state) { |
| if (state == KuduScanner::Data::CLOSE) { |
| next_req_.set_batch_size_bytes(0); |
| } else if (has_batch_size_bytes_) { |
| next_req_.set_batch_size_bytes(batch_size_bytes_); |
| } else { |
| next_req_.clear_batch_size_bytes(); |
| } |
| |
| if (state == KuduScanner::Data::NEW) { |
| next_req_.set_call_seq_id(0); |
| } else { |
| next_req_.set_call_seq_id(next_req_.call_seq_id() + 1); |
| } |
| } |
| |
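| // Returns the number of bytes that a single row of 'proj' occupies in the |
| // direct data block: the schema's fixed byte size, plus a null bitmap if |
| // any projected column is nullable. |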
| size_t KuduScanner::Data::CalculateProjectedRowSize(const Schema& proj) { |
| return proj.byte_size() + |
| (proj.has_nullables() ? BitmapSize(proj.num_columns()) : 0); |
| } |
| |
| } // namespace client |
| } // namespace kudu |