// blob: 98e89eacd9e7c4710262812913eae5a86dd439e6
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_CLIENT_SCANNER_INTERNAL_H
#define KUDU_CLIENT_SCANNER_INTERNAL_H
#include <cstddef>
#include <cstdint>
#include <memory>
#include <ostream>
#include <set>
#include <string>
#include <vector>
#include <glog/logging.h>
#include "kudu/client/client.h"
#include "kudu/client/columnar_scan_batch.h"
#include "kudu/client/resource_metrics.h"
#include "kudu/client/row_result.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/scan_configuration.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/common/partition_pruner.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
namespace kudu {
class MonoTime;
class Schema;
namespace tserver {
class TabletServerServiceProxy;
} // namespace tserver
namespace client {
class KuduSchema;
namespace internal {
class RemoteTablet;
class RemoteTabletServer;
} // namespace internal
// Outcome of KuduScanner::Data::AnalyzeResponse.
//
// Pairs a Status with an enum that is more specific than the Status code
// alone, so callers can tell apart the different ways a Scan RPC can fail.
struct ScanRpcStatus {
  enum Result {
    OK,

    // The request itself was malformed (a bad schema, for example).
    INVALID_REQUEST,

    // The server got the request but could not serve it right away. It may
    // have been too busy, or lacked the resources or information needed, yet
    // it expects to be able to serve the request very soon; retrying later is
    // therefore worthwhile.
    SERVICE_UNAVAILABLE,

    // The deadline covering the whole batch has passed.
    OVERALL_DEADLINE_EXCEEDED,

    // The deadline of a single RPC has passed, but there is still time left
    // to try other hosts.
    RPC_DEADLINE_EXCEEDED,

    // The client's authentication token is invalid; most likely it expired.
    RPC_INVALID_AUTHENTICATION_TOKEN,

    // The client's authorization token is invalid; most likely it expired.
    RPC_INVALID_AUTHORIZATION_TOKEN,

    // The requestor lacked the authorization to make this request.
    SCAN_NOT_AUTHORIZED,

    // Some other RPC-system error (e.g. a NetworkError because the TS was down).
    RPC_ERROR,

    // The server-side scanner expired.
    SCANNER_EXPIRED,

    // The destination tablet was not running (e.g. it was still bootstrapping).
    TABLET_NOT_RUNNING,

    // The destination tablet does not exist (e.g. the replica was deleted).
    TABLET_NOT_FOUND,

    // An unknown tablet server error: the TS was running, but something went
    // wrong that none of the values above describes.
    OTHER_TS_ERROR
  };

  // Which of the outcomes above the RPC ended in.
  Result result;

  // Status associated with the outcome.
  // NOTE(review): presumably carries the error detail for non-OK results;
  // confirm against AnalyzeResponse's implementation.
  Status status;
};
// Private implementation state and helpers behind KuduScanner.
class KuduScanner::Data {
 public:
  explicit Data(KuduTable* table);
  ~Data();

  // Works out a deadline for the scanner's next RPC and sends that RPC.
  //
  // 'allow_time_for_failover' selects how the RPC deadline is derived:
  //   - true:  the deadline is shortened so that, should this attempt time
  //            out, enough time remains for another attempt on a different
  //            server;
  //   - false: 'overall_deadline' is used as the RPC deadline.
  //
  // The RPC and the TS proxy must already be set up in next_req_, proxy_, etc.
  ScanRpcStatus SendScanRpc(const MonoTime& overall_deadline,
                            bool allow_time_for_failover);

  // Invoked when KuduScanner::NextBatch or KuduScanner::Data::OpenTablet ends
  // in an RPC error or a server error.
  //
  // When 'err' describes a retryable error, Status::OK() is returned and the
  // current server may be added to 'blacklist' if the retry ought to go to a
  // different replica. When the current server looks healthy but the scanner
  // expired, 'needs_reopen' is set to true to tell the client to re-open a
  // new scanner.
  //
  // 'needs_reopen' may be nullptr, in which case it is left unset.
  //
  // May sleep if the error suggests that backing off is necessary.
  Status HandleError(const ScanRpcStatus& err,
                     const MonoTime& deadline,
                     std::set<std::string>* blacklist,
                     bool* needs_reopen);

  // Opens the scan's next tablet; returns Status::NotFound once no tablets
  // remain to be scanned.
  //
  // 'deadline' is the time budget for the operation. 'blacklist' temporarily
  // filters out nodes that are hitting transient errors, and the callee may
  // modify it.
  Status OpenNextTablet(const MonoTime& deadline,
                        std::set<std::string>* blacklist);

  // Re-opens the tablet currently being scanned.
  // The arguments have the same meaning as for OpenNextTablet.
  Status ReopenCurrentTablet(const MonoTime& deadline,
                             std::set<std::string>* blacklist);

  // Opens the tablet to scan.
  Status OpenTablet(const std::string& partition_key,
                    const MonoTime& deadline,
                    std::set<std::string>* blacklist);

  // NOTE(review): undocumented in this header; presumably asks the server to
  // keep the current server-side scanner from expiring -- confirm in the .cc.
  Status KeepAlive();

  // Tells whether more tablets may be left to scan.
  //
  // Non-covered range partitions that may exist in the table are not taken
  // into account, so treat the answer as a hint only.
  //
  // Note: subsequent tablets may turn out to contain no matching rows at all;
  // that cannot be known before they are scanned.
  bool MoreTablets() const;

  // The kinds of scan request that can be issued.
  enum RequestType {
    // Starts a new scan of a particular tablet.
    NEW,
    // Continues an existing scan in order to read more rows.
    CONTINUE,
    // Closes a partially-completed scan. The tablet server closes completed
    // scans automatically.
    CLOSE
  };

  // Adjusts the fields of 'next_req_' to get it ready for a new request.
  void PrepareRequest(RequestType state);

  // Updates 'last_error_' if need be. Call this whenever a non-fatal
  // (i.e. retriable) scan error is encountered.
  void UpdateLastError(const Status& error);

  // Read-only access to the scan configuration.
  const ScanConfiguration& configuration() const { return configuration_; }

  // Mutable access to the scan configuration.
  ScanConfiguration* mutable_configuration() { return &configuration_; }

  ScanConfiguration configuration_;

  // NOTE(review): the two flags below are undocumented in this header; their
  // exact meaning should be confirmed against the implementation.
  bool open_;
  bool data_in_open_;

  // True when predicates and primary key bounds already show that the scan
  // is empty.
  bool short_circuit_;

  // Encoded last primary key taken from the most recent tablet scan response.
  std::string last_primary_key_;

  internal::RemoteTabletServer* ts_;

  // Although the proxy could be obtained from the RemoteTabletServer, doing
  // so means retaking the meta cache lock. Holding a shared_ptr of our own
  // avoids that overhead.
  std::shared_ptr<tserver::TabletServerServiceProxy> proxy_;

  // The scan request that will be sent next. Kept as a field because most
  // scan requests carry the same scanner ID as the request before them.
  tserver::ScanRequestPB next_req_;

  // The most recent response from the server, cached so its buffers can be
  // reused.
  tserver::ScanResponsePB last_response_;

  // RPC controller of the last in-flight RPC.
  rpc::RpcController controller_;

  // The table being scanned.
  sp::shared_ptr<KuduTable> table_;

  PartitionPruner partition_pruner_;

  // The tablet being scanned.
  scoped_refptr<internal::RemoteTablet> remote_;

  // How many attempts have been made since the last successful scan.
  int scan_attempts_;

  // How many rows have been returned so far.
  int64_t num_rows_returned_;

  // The deprecated "NextBatch(vector<KuduRowResult>*)" API needs local
  // storage for the actual row data. When that API is used, this member holds
  // the storage backing the returned batch.
  KuduScanBatch batch_for_old_api_;

  // The most recent error that made this scan retry. Should the scan time
  // out, this error is incorporated into the status handed back to the client.
  //
  // TODO: This and the overall scan retry logic duplicates much of RpcRetrier.
  Status last_error_;

  // Resource metrics accumulated by the scanner since the scan started.
  ResourceMetrics resource_metrics_;

  // Produces a textual description of the scan for debug printing.
  //
  // Sensitive predicate information is not included, which makes the result
  // suitable for client-side logging (as opposed to Scanner::ToString).
  std::string DebugString() const;

 private:
  // Analyzes the response to the last Scan RPC this scanner made.
  //
  // Error handling for a scan RPC is fairly involved: some errors arise at
  // the network layer, some generically at the RPC layer, and some are
  // specific to scanners. This function folds all of those situations into a
  // single enum code plus a Status.
  //
  // 'rpc_status': the Status returned directly by the RPC Scan() method.
  // 'overall_deadline': the user-provided deadline for the scanner batch.
  // 'rpc_deadline': the deadline used for this particular RPC; it may be
  //                 earlier than the overall deadline so that time is left
  //                 for further retries on other hosts.
  ScanRpcStatus AnalyzeResponse(const Status& rpc_status,
                                const MonoTime& overall_deadline,
                                const MonoTime& rpc_deadline);

  // Extends the status message with extra details such as the number of
  // retries and the original cause of the error. Returns a cloned object.
  Status EnrichStatusMessage(Status s) const;

  // NOTE(review): undocumented in this header; presumably refreshes
  // 'resource_metrics_' -- confirm in the .cc.
  void UpdateResourceMetrics();

  DISALLOW_COPY_AND_ASSIGN(Data);
};
namespace internal {
// Interface shared by the scan batch data holders declared in this file
// (KuduScanBatch::Data and KuduColumnarScanBatch::Data).
class ScanBatchDataInterface {
 public:
  virtual ~ScanBatchDataInterface() = default;

  // Points the batch at the result of a scan RPC.
  //
  // 'controller' and 'response' are the RPC controller and the response PB of
  // the scan RPC, 'projection' is the projection being scanned,
  // 'client_projection' is the KuduSchema form of that projection, and
  // 'row_format_flags' are the row format flags of the scan.
  //
  // NOTE(review): what is moved or swapped out of 'controller' and 'response'
  // is decided by each implementation -- confirm there.
  virtual Status Reset(rpc::RpcController* controller,
                       const Schema* projection,
                       const KuduSchema* client_projection,
                       uint64_t row_format_flags,
                       tserver::ScanResponsePB* response) = 0;

  // NOTE(review): presumably drops the data currently held by the batch;
  // confirm in the implementations.
  virtual void Clear() = 0;
};
} // namespace internal
// Backing data of a row-wise KuduScanBatch.
class KuduScanBatch::Data : public internal::ScanBatchDataInterface {
 public:
  Data();
  ~Data() override;

  // See internal::ScanBatchDataInterface::Reset().
  Status Reset(rpc::RpcController* controller,
               const Schema* projection,
               const KuduSchema* client_projection,
               uint64_t row_format_flags,
               tserver::ScanResponsePB* response) override;

  // Number of rows in the batch, as recorded in the row block PB.
  int num_rows() const {
    return resp_data_.num_rows();
  }

  // Returns the row at position 'idx' of the batch.
  //
  // 'idx' must lie in [0, num_rows()) and no row format flags may have been
  // set on the scanner; both conditions are enforced with DCHECKs only.
  // If the batch carries no direct data, the returned row wraps a null data
  // pointer.
  KuduRowResult row(int idx) {
    DCHECK_EQ(row_format_flags_, KuduScanner::NO_FLAGS)
        << "Cannot decode individual rows. Row format flags were set: "
        << row_format_flags_;
    DCHECK_GE(idx, 0);
    DCHECK_LT(idx, num_rows());
    if (direct_data_.empty()) {
      return KuduRowResult(projection_, nullptr);
    }
    // Keep the byte offset in size_t. The product is evaluated in size_t
    // anyway, and narrowing it to int would truncate (or turn negative) once
    // it exceeds INT_MAX, before being widened again to index the slice.
    const size_t offset = static_cast<size_t>(idx) * projected_row_size_;
    return KuduRowResult(projection_, &direct_data_[offset]);
  }

  void ExtractRows(std::vector<KuduScanBatch::RowPtr>* rows);

  // See internal::ScanBatchDataInterface::Clear().
  void Clear() override;

  // Returns the size of a row for the given projection 'proj'.
  static size_t CalculateProjectedRowSize(const Schema& proj);

  // The RPC controller for the RPC which returned this batch.
  // Holding on to the controller ensures we hold on to the indirect data
  // which contains the rows.
  rpc::RpcController controller_;

  // The PB which contains the "direct data" slice.
  RowwiseRowBlockPB resp_data_;

  // Slices into the direct and indirect row data, whose lifetime is ensured
  // by the members above.
  Slice direct_data_;
  Slice indirect_data_;

  // The projection being scanned.
  const Schema* projection_;

  // The KuduSchema version of 'projection_'
  const KuduSchema* client_projection_;

  // The row format flags that were passed to the KuduScanner.
  // See: KuduScanner::SetRowFormatFlags()
  uint64_t row_format_flags_;

  // The number of bytes of direct data for each row.
  size_t projected_row_size_;
};
// Backing data of a KuduColumnarScanBatch.
class KuduColumnarScanBatch::Data : public internal::ScanBatchDataInterface {
 public:
  // See internal::ScanBatchDataInterface::Reset().
  Status Reset(rpc::RpcController* controller,
               const Schema* projection,
               const KuduSchema* client_projection,
               uint64_t row_format_flags,
               tserver::ScanResponsePB* response) override;

  // See internal::ScanBatchDataInterface::Clear().
  void Clear() override;

  // Column accessors. 'idx' identifies the column and the results are handed
  // back through the output Slices.
  // NOTE(review): presumably 'idx' is an index into the projection and the
  // returned slices point at data kept alive by 'controller_' -- confirm in
  // the implementation.
  Status GetFixedLengthColumn(int idx, Slice* data) const;
  Status GetVariableLengthColumn(int idx, Slice* offsets, Slice* data) const;
  Status GetNonNullBitmapForColumn(int idx, Slice* data) const;

 private:
  // NOTE(review): presumably validates 'idx' on behalf of the accessors above.
  Status CheckColumnIndex(int idx) const;

  friend class KuduColumnarScanBatch;

  // The RPC controller for the RPC which returned this batch.
  // Holding on to the controller ensures we hold on to the
  // sidecars which contain the actual data.
  rpc::RpcController controller_;

  // The PB which contains the "direct data" slice.
  ColumnarRowBlockPB resp_data_;

  // The projection being scanned.
  // This class declares no constructor, so without an initializer the pointer
  // would be indeterminate whenever the object is default-initialized;
  // start out as null instead.
  const Schema* projection_ = nullptr;

  // The KuduSchema version of 'projection_'. Null-initialized for the same
  // reason as 'projection_'.
  const KuduSchema* client_projection_ = nullptr;
};
} // namespace client
} // namespace kudu
#endif