blob: 2ce965f5337fafb7db0580fcd21ab03ce3a3433f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/client/scan_token-internal.h"
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <boost/optional/optional.hpp>
#include <glog/logging.h>
#include "kudu/client/client-internal.h"
#include "kudu/client/client.h"
#include "kudu/client/meta_cache.h"
#include "kudu/client/replica-internal.h"
#include "kudu/client/scanner-internal.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h"
#include "kudu/client/tablet-internal.h"
#include "kudu/client/tablet_server-internal.h"
#include "kudu/common/column_predicate.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/encoded_key.h"
#include "kudu/common/partition.h"
#include "kudu/common/partition_pruner.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/async_util.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace client {
using internal::MetaCache;
KuduScanToken::Data::Data(KuduTable* table,
ScanTokenPB message,
unique_ptr<KuduTablet> tablet)
: table_(table),
message_(std::move(message)),
tablet_(std::move(tablet)) {
}
Status KuduScanToken::Data::IntoKuduScanner(KuduScanner** scanner) const {
return PBIntoScanner(table_->client(), message_, scanner);
}
Status KuduScanToken::Data::Serialize(string* buf) const {
if (!message_.SerializeToString(buf)) {
return Status::Corruption("unable to serialize scan token");
}
return Status::OK();
}
Status KuduScanToken::Data::DeserializeIntoScanner(KuduClient* client,
const std::string& serialized_token,
KuduScanner** scanner) {
ScanTokenPB message;
if (!message.ParseFromString(serialized_token)) {
return Status::Corruption("unable to deserialize scan token");
}
return PBIntoScanner(client, message, scanner);
}
Status KuduScanToken::Data::PBIntoScanner(KuduClient* client,
const ScanTokenPB& message,
KuduScanner** scanner) {
for (int32_t feature : message.feature_flags()) {
if (!ScanTokenPB::Feature_IsValid(feature) || feature == ScanTokenPB::Unknown) {
return Status::NotSupported(
"scan token requires features not supported by this client version");
}
}
sp::shared_ptr<KuduTable> table;
RETURN_NOT_OK(client->OpenTable(message.table_name(), &table));
Schema* schema = table->schema().schema_;
unique_ptr<KuduScanner> scan_builder(new KuduScanner(table.get()));
vector<int> column_indexes;
for (const ColumnSchemaPB& column : message.projected_columns()) {
int column_idx = schema->find_column(column.name());
if (column_idx == Schema::kColumnNotFound) {
return Status::IllegalState("unknown column in scan token", column.name());
}
DataType expected_type = schema->column(column_idx).type_info()->type();
if (column.type() != expected_type) {
return Status::IllegalState(Substitute(
"invalid type $0 for column '$1' in scan token, expected: $2",
DataType_Name(column.type()), column.name(), DataType_Name(expected_type)));
}
bool expected_is_nullable = schema->column(column_idx).is_nullable();
if (column.is_nullable() != expected_is_nullable) {
return Status::IllegalState(Substitute(
"invalid nullability for column '$0' in scan token, expected: $1",
column.name(), expected_is_nullable ? "NULLABLE" : "NOT NULL"));
}
column_indexes.push_back(column_idx);
}
RETURN_NOT_OK(scan_builder->SetProjectedColumnIndexes(column_indexes));
ScanConfiguration* configuration = scan_builder->data_->mutable_configuration();
for (const ColumnPredicatePB& pb : message.column_predicates()) {
boost::optional<ColumnPredicate> predicate;
RETURN_NOT_OK(ColumnPredicateFromPB(*schema, configuration->arena(), pb, &predicate));
configuration->AddConjunctPredicate(std::move(*predicate));
}
if (message.has_lower_bound_primary_key()) {
RETURN_NOT_OK(scan_builder->AddLowerBoundRaw(message.lower_bound_primary_key()));
}
if (message.has_upper_bound_primary_key()) {
RETURN_NOT_OK(scan_builder->AddExclusiveUpperBoundRaw(message.upper_bound_primary_key()));
}
if (message.has_lower_bound_partition_key()) {
RETURN_NOT_OK(scan_builder->AddLowerBoundPartitionKeyRaw(message.lower_bound_partition_key()));
}
if (message.has_upper_bound_partition_key()) {
RETURN_NOT_OK(scan_builder->AddExclusiveUpperBoundPartitionKeyRaw(
message.upper_bound_partition_key()));
}
if (message.has_limit()) {
// TODO(KUDU-16)
}
if (message.has_read_mode()) {
switch (message.read_mode()) {
case ReadMode::READ_LATEST:
RETURN_NOT_OK(scan_builder->SetReadMode(KuduScanner::READ_LATEST));
break;
case ReadMode::READ_AT_SNAPSHOT:
RETURN_NOT_OK(scan_builder->SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
break;
case ReadMode::READ_YOUR_WRITES:
RETURN_NOT_OK(scan_builder->SetReadMode(KuduScanner::READ_YOUR_WRITES));
break;
default:
return Status::InvalidArgument("scan token has unrecognized read mode");
}
}
if (message.fault_tolerant()) {
RETURN_NOT_OK(scan_builder->SetFaultTolerant());
}
if (message.has_snap_timestamp()) {
RETURN_NOT_OK(scan_builder->SetSnapshotRaw(message.snap_timestamp()));
}
RETURN_NOT_OK(scan_builder->SetCacheBlocks(message.cache_blocks()));
// Since the latest observed timestamp from the given client might be
// more recent than the one when the token is created, the performance
// of the scan could be affected if using READ_YOUR_WRITES mode.
//
// We choose to keep it this way because the code path is simpler.
// Beside, in practice it's very rarely the case that an active client
// is permanently being written to and read from (using scan tokens).
//
// However it is worth to note that this is a possible optimization, if
// we ever notice READ_YOUR_WRITES read stalling with scan tokens.
if (message.has_propagated_timestamp()) {
client->data_->UpdateLatestObservedTimestamp(message.propagated_timestamp());
}
if (message.has_batch_size_bytes()) {
RETURN_NOT_OK(scan_builder->SetBatchSizeBytes(message.batch_size_bytes()));
}
if (message.has_scan_request_timeout_ms()) {
scan_builder->SetTimeoutMillis(message.scan_request_timeout_ms());
}
*scanner = scan_builder.release();
return Status::OK();
}
KuduScanTokenBuilder::Data::Data(KuduTable* table)
: configuration_(table) {
}
Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
KuduTable* table = configuration_.table_;
KuduClient* client = table->client();
configuration_.OptimizeScanSpec();
if (configuration_.spec().CanShortCircuit()) {
return Status::OK();
}
ScanTokenPB pb;
pb.set_table_name(table->name());
RETURN_NOT_OK(SchemaToColumnPBs(*configuration_.projection(), pb.mutable_projected_columns(),
SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES | SCHEMA_PB_WITHOUT_IDS));
if (configuration_.spec().lower_bound_key()) {
pb.mutable_lower_bound_primary_key()->assign(
reinterpret_cast<const char*>(configuration_.spec().lower_bound_key()->encoded_key().data()),
configuration_.spec().lower_bound_key()->encoded_key().size());
} else {
pb.clear_lower_bound_primary_key();
}
if (configuration_.spec().exclusive_upper_bound_key()) {
pb.mutable_upper_bound_primary_key()->assign(reinterpret_cast<const char*>(
configuration_.spec().exclusive_upper_bound_key()->encoded_key().data()),
configuration_.spec().exclusive_upper_bound_key()->encoded_key().size());
} else {
pb.clear_upper_bound_primary_key();
}
for (const auto& predicate_pair : configuration_.spec().predicates()) {
ColumnPredicateToPB(predicate_pair.second, pb.add_column_predicates());
}
const KuduScanner::ReadMode read_mode = configuration_.read_mode();
switch (read_mode) {
case KuduScanner::READ_LATEST:
pb.set_read_mode(kudu::READ_LATEST);
if (configuration_.has_snapshot_timestamp()) {
return Status::InvalidArgument("Snapshot timestamp should only be configured "
"for READ_AT_SNAPSHOT scan mode.");
}
break;
case KuduScanner::READ_AT_SNAPSHOT:
pb.set_read_mode(kudu::READ_AT_SNAPSHOT);
if (configuration_.has_snapshot_timestamp()) {
pb.set_snap_timestamp(configuration_.snapshot_timestamp());
}
break;
case KuduScanner::READ_YOUR_WRITES:
pb.set_read_mode(kudu::READ_YOUR_WRITES);
if (configuration_.has_snapshot_timestamp()) {
return Status::InvalidArgument("Snapshot timestamp should only be configured "
"for READ_AT_SNAPSHOT scan mode.");
}
break;
default:
LOG(FATAL) << Substitute("$0: unexpected read mode", read_mode);
}
pb.set_cache_blocks(configuration_.spec().cache_blocks());
pb.set_fault_tolerant(configuration_.is_fault_tolerant());
pb.set_propagated_timestamp(client->GetLatestObservedTimestamp());
pb.set_scan_request_timeout_ms(configuration_.timeout().ToMilliseconds());
if (configuration_.has_batch_size_bytes()) {
pb.set_batch_size_bytes(configuration_.batch_size_bytes());
}
MonoTime deadline = MonoTime::Now() + client->default_admin_operation_timeout();
PartitionPruner pruner;
pruner.Init(*table->schema().schema_, table->partition_schema(), configuration_.spec());
while (pruner.HasMorePartitionKeyRanges()) {
scoped_refptr<internal::RemoteTablet> tablet;
Synchronizer sync;
const string& partition_key = pruner.NextPartitionKey();
client->data_->meta_cache_->LookupTabletByKey(table,
partition_key,
deadline,
MetaCache::LookupType::kLowerBound,
&tablet,
sync.AsStatusCallback());
Status s = sync.Wait();
if (s.IsNotFound()) {
// No more tablets in the table.
pruner.RemovePartitionKeyRange("");
continue;
} else {
RETURN_NOT_OK(s);
}
// Check if the meta cache returned a tablet covering a partition key range past
// what we asked for. This can happen if the requested partition key falls
// in a non-covered range. In this case we can potentially prune the tablet.
if (partition_key < tablet->partition().partition_key_start() &&
pruner.ShouldPrune(tablet->partition())) {
pruner.RemovePartitionKeyRange(tablet->partition().partition_key_end());
continue;
}
vector<internal::RemoteReplica> replicas;
tablet->GetRemoteReplicas(&replicas);
vector<const KuduReplica*> client_replicas;
ElementDeleter deleter(&client_replicas);
// Convert the replicas from their internal format to something appropriate
// for clients.
for (const auto& r : replicas) {
vector<HostPort> host_ports;
r.ts->GetHostPorts(&host_ports);
if (host_ports.empty()) {
return Status::IllegalState(Substitute(
"No host found for tablet server $0", r.ts->ToString()));
}
unique_ptr<KuduTabletServer> client_ts(new KuduTabletServer);
client_ts->data_ = new KuduTabletServer::Data(r.ts->permanent_uuid(),
host_ports[0]);
bool is_leader = r.role == consensus::RaftPeerPB::LEADER;
bool is_voter = is_leader || r.role == consensus::RaftPeerPB::FOLLOWER;
unique_ptr<KuduReplica> client_replica(new KuduReplica);
client_replica->data_ = new KuduReplica::Data(is_leader, is_voter,
std::move(client_ts));
client_replicas.push_back(client_replica.release());
}
unique_ptr<KuduTablet> client_tablet(new KuduTablet);
client_tablet->data_ = new KuduTablet::Data(tablet->tablet_id(),
std::move(client_replicas));
client_replicas.clear();
// Create the scan token itself.
ScanTokenPB message;
message.CopyFrom(pb);
message.set_lower_bound_partition_key(
tablet->partition().partition_key_start());
message.set_upper_bound_partition_key(
tablet->partition().partition_key_end());
unique_ptr<KuduScanToken> client_scan_token(new KuduScanToken);
client_scan_token->data_ =
new KuduScanToken::Data(table,
std::move(message),
std::move(client_tablet));
tokens->push_back(client_scan_token.release());
pruner.RemovePartitionKeyRange(tablet->partition().partition_key_end());
}
return Status::OK();
}
} // namespace client
} // namespace kudu