| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/client/client.h" |
| |
| #include <algorithm> |
| #include <boost/bind.hpp> |
| #include <memory> |
| #include <mutex> |
| #include <set> |
| #include <string> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include "kudu/client/batcher.h" |
| #include "kudu/client/callbacks.h" |
| #include "kudu/client/client-internal.h" |
| #include "kudu/client/client_builder-internal.h" |
| #include "kudu/client/error-internal.h" |
| #include "kudu/client/error_collector.h" |
| #include "kudu/client/meta_cache.h" |
| #include "kudu/client/row_result.h" |
| #include "kudu/client/scan_predicate-internal.h" |
| #include "kudu/client/scan_token-internal.h" |
| #include "kudu/client/scanner-internal.h" |
| #include "kudu/client/schema-internal.h" |
| #include "kudu/client/session-internal.h" |
| #include "kudu/client/table-internal.h" |
| #include "kudu/client/table_alterer-internal.h" |
| #include "kudu/client/table_creator-internal.h" |
| #include "kudu/client/tablet_server-internal.h" |
| #include "kudu/client/write_op.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/partition.h" |
| #include "kudu/common/row_operations.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/master/master.h" // TODO: remove this include - just needed for default port |
| #include "kudu/master/master.pb.h" |
| #include "kudu/master/master.proxy.h" |
| #include "kudu/rpc/messenger.h" |
| #include "kudu/util/init.h" |
| #include "kudu/util/logging.h" |
| #include "kudu/util/net/dns_resolver.h" |
| #include "kudu/util/oid_generator.h" |
| #include "kudu/util/version_info.h" |
| |
| using kudu::master::AlterTableRequestPB; |
| using kudu::master::AlterTableRequestPB_Step; |
| using kudu::master::AlterTableResponsePB; |
| using kudu::master::CreateTableRequestPB; |
| using kudu::master::CreateTableResponsePB; |
| using kudu::master::DeleteTableRequestPB; |
| using kudu::master::DeleteTableResponsePB; |
| using kudu::master::GetTableSchemaRequestPB; |
| using kudu::master::GetTableSchemaResponsePB; |
| using kudu::master::ListTablesRequestPB; |
| using kudu::master::ListTablesResponsePB; |
| using kudu::master::ListTabletServersRequestPB; |
| using kudu::master::ListTabletServersResponsePB; |
| using kudu::master::ListTabletServersResponsePB_Entry; |
| using kudu::master::MasterServiceProxy; |
| using kudu::master::TabletLocationsPB; |
| using kudu::rpc::Messenger; |
| using kudu::rpc::MessengerBuilder; |
| using kudu::rpc::RpcController; |
| using kudu::tserver::ScanResponsePB; |
| using std::pair; |
| using std::set; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| |
| MAKE_ENUM_LIMITS(kudu::client::KuduSession::FlushMode, |
| kudu::client::KuduSession::AUTO_FLUSH_SYNC, |
| kudu::client::KuduSession::MANUAL_FLUSH); |
| |
| MAKE_ENUM_LIMITS(kudu::client::KuduSession::ExternalConsistencyMode, |
| kudu::client::KuduSession::CLIENT_PROPAGATED, |
| kudu::client::KuduSession::COMMIT_WAIT); |
| |
| MAKE_ENUM_LIMITS(kudu::client::KuduScanner::ReadMode, |
| kudu::client::KuduScanner::READ_LATEST, |
| kudu::client::KuduScanner::READ_AT_SNAPSHOT); |
| |
| MAKE_ENUM_LIMITS(kudu::client::KuduScanner::OrderMode, |
| kudu::client::KuduScanner::UNORDERED, |
| kudu::client::KuduScanner::ORDERED); |
| |
| namespace kudu { |
| namespace client { |
| |
| using internal::Batcher; |
| using internal::ErrorCollector; |
| using internal::MetaCache; |
| using sp::shared_ptr; |
| |
| static const char* kProgName = "kudu_client"; |
| |
| // We need to reroute all logging to stderr when the client library is |
| // loaded. GoogleOnceInit() can do that, but there are multiple entry |
| // points into the client code, and it'd need to be called in each one. |
| // So instead, let's use a constructor function. |
| // |
| // Should this be restricted to just the exported client build? Probably |
| // not, as any application using the library probably wants stderr logging |
| // more than file logging. |
| __attribute__((constructor)) |
| static void InitializeBasicLogging() { |
| InitGoogleLoggingSafeBasic(kProgName); |
| } |
| |
| // Adapts between the internal LogSeverity and the client's KuduLogSeverity. |
| static void LoggingAdapterCB(KuduLoggingCallback* user_cb, |
| LogSeverity severity, |
| const char* filename, |
| int line_number, |
| const struct ::tm* time, |
| const char* message, |
| size_t message_len) { |
| KuduLogSeverity client_severity; |
| switch (severity) { |
| case kudu::SEVERITY_INFO: |
| client_severity = SEVERITY_INFO; |
| break; |
| case kudu::SEVERITY_WARNING: |
| client_severity = SEVERITY_WARNING; |
| break; |
| case kudu::SEVERITY_ERROR: |
| client_severity = SEVERITY_ERROR; |
| break; |
| case kudu::SEVERITY_FATAL: |
| client_severity = SEVERITY_FATAL; |
| break; |
| default: |
| LOG(FATAL) << "Unknown Kudu log severity: " << severity; |
| } |
| user_cb->Run(client_severity, filename, line_number, time, |
| message, message_len); |
| } |
| |
| void InstallLoggingCallback(KuduLoggingCallback* cb) { |
| RegisterLoggingCallback(Bind(&LoggingAdapterCB, Unretained(cb))); |
| } |
| |
| void UninstallLoggingCallback() { |
| UnregisterLoggingCallback(); |
| } |
| |
| void SetVerboseLogLevel(int level) { |
| FLAGS_v = level; |
| } |
| |
| Status SetInternalSignalNumber(int signum) { |
| return SetStackTraceSignal(signum); |
| } |
| |
| std::string GetShortVersionString() { |
| return VersionInfo::GetShortVersionString(); |
| } |
| |
| std::string GetAllVersionInfo() { |
| return VersionInfo::GetAllVersionInfo(); |
| } |
| |
| KuduClientBuilder::KuduClientBuilder() |
| : data_(new KuduClientBuilder::Data()) { |
| } |
| |
| KuduClientBuilder::~KuduClientBuilder() { |
| delete data_; |
| } |
| |
| KuduClientBuilder& KuduClientBuilder::clear_master_server_addrs() { |
| data_->master_server_addrs_.clear(); |
| return *this; |
| } |
| |
| KuduClientBuilder& KuduClientBuilder::master_server_addrs(const vector<string>& addrs) { |
| for (const string& addr : addrs) { |
| data_->master_server_addrs_.push_back(addr); |
| } |
| return *this; |
| } |
| |
| KuduClientBuilder& KuduClientBuilder::add_master_server_addr(const string& addr) { |
| data_->master_server_addrs_.push_back(addr); |
| return *this; |
| } |
| |
| KuduClientBuilder& KuduClientBuilder::default_admin_operation_timeout(const MonoDelta& timeout) { |
| data_->default_admin_operation_timeout_ = timeout; |
| return *this; |
| } |
| |
| KuduClientBuilder& KuduClientBuilder::default_rpc_timeout(const MonoDelta& timeout) { |
| data_->default_rpc_timeout_ = timeout; |
| return *this; |
| } |
| |
| Status KuduClientBuilder::Build(shared_ptr<KuduClient>* client) { |
| RETURN_NOT_OK(CheckCPUFlags()); |
| |
| shared_ptr<KuduClient> c(new KuduClient()); |
| |
| // Init messenger. |
| MessengerBuilder builder("client"); |
| RETURN_NOT_OK(builder.Build(&c->data_->messenger_)); |
| |
| c->data_->master_server_addrs_ = data_->master_server_addrs_; |
| c->data_->default_admin_operation_timeout_ = data_->default_admin_operation_timeout_; |
| c->data_->default_rpc_timeout_ = data_->default_rpc_timeout_; |
| |
| // Let's allow for plenty of time for discovering the master the first |
| // time around. |
| MonoTime deadline = MonoTime::Now(MonoTime::FINE); |
| deadline.AddDelta(c->default_admin_operation_timeout()); |
| RETURN_NOT_OK_PREPEND(c->data_->SetMasterServerProxy(c.get(), deadline), |
| "Could not locate the leader master"); |
| |
| c->data_->meta_cache_.reset(new MetaCache(c.get())); |
| c->data_->dns_resolver_.reset(new DnsResolver()); |
| |
| // Init local host names used for locality decisions. |
| RETURN_NOT_OK_PREPEND(c->data_->InitLocalHostNames(), |
| "Could not determine local host names"); |
| |
| c->data_->request_tracker_ = new rpc::RequestTracker(c->data_->client_id_); |
| |
| client->swap(c); |
| return Status::OK(); |
| } |
| |
| KuduClient::KuduClient() |
| : data_(new KuduClient::Data()) { |
| static ObjectIdGenerator oid_generator; |
| data_->client_id_ = oid_generator.Next(); |
| } |
| |
| KuduClient::~KuduClient() { |
| delete data_; |
| } |
| |
| KuduTableCreator* KuduClient::NewTableCreator() { |
| return new KuduTableCreator(this); |
| } |
| |
| Status KuduClient::IsCreateTableInProgress(const string& table_name, |
| bool *create_in_progress) { |
| MonoTime deadline = MonoTime::Now(MonoTime::FINE); |
| deadline.AddDelta(default_admin_operation_timeout()); |
| return data_->IsCreateTableInProgress(this, table_name, deadline, create_in_progress); |
| } |
| |
| Status KuduClient::DeleteTable(const string& table_name) { |
| MonoTime deadline = MonoTime::Now(MonoTime::FINE); |
| deadline.AddDelta(default_admin_operation_timeout()); |
| return data_->DeleteTable(this, table_name, deadline); |
| } |
| |
| KuduTableAlterer* KuduClient::NewTableAlterer(const string& name) { |
| return new KuduTableAlterer(this, name); |
| } |
| |
| Status KuduClient::IsAlterTableInProgress(const string& table_name, |
| bool *alter_in_progress) { |
| MonoTime deadline = MonoTime::Now(MonoTime::FINE); |
| deadline.AddDelta(default_admin_operation_timeout()); |
| return data_->IsAlterTableInProgress(this, table_name, deadline, alter_in_progress); |
| } |
| |
| Status KuduClient::GetTableSchema(const string& table_name, |
| KuduSchema* schema) { |
| MonoTime deadline = MonoTime::Now(MonoTime::FINE); |
| deadline.AddDelta(default_admin_operation_timeout()); |
| string table_id_ignored; |
| PartitionSchema partition_schema; |
| return data_->GetTableSchema(this, |
| table_name, |
| deadline, |
| schema, |
| &partition_schema, |
| &table_id_ignored); |
| } |
| |
| Status KuduClient::ListTabletServers(vector<KuduTabletServer*>* tablet_servers) { |
| ListTabletServersRequestPB req; |
| ListTabletServersResponsePB resp; |
| |
| MonoTime deadline = MonoTime::Now(MonoTime::FINE); |
| deadline.AddDelta(default_admin_operation_timeout()); |
| Status s = |
| data_->SyncLeaderMasterRpc<ListTabletServersRequestPB, ListTabletServersResponsePB>( |
| deadline, |
| this, |
| req, |
| &resp, |
| "ListTabletServers", |
| &MasterServiceProxy::ListTabletServers, |
| {}); |
| RETURN_NOT_OK(s); |
| if (resp.has_error()) { |
| return StatusFromPB(resp.error().status()); |
| } |
| for (int i = 0; i < resp.servers_size(); i++) { |
| const ListTabletServersResponsePB_Entry& e = resp.servers(i); |
| auto ts = new KuduTabletServer(); |
| ts->data_ = new KuduTabletServer::Data(e.instance_id().permanent_uuid(), |
| e.registration().rpc_addresses(0).host()); |
| tablet_servers->push_back(ts); |
| } |
| return Status::OK(); |
| } |
| |
| Status KuduClient::ListTables(vector<string>* tables, |
| const string& filter) { |
| ListTablesRequestPB req; |
| ListTablesResponsePB resp; |
| |
| if (!filter.empty()) { |
| req.set_name_filter(filter); |
| } |
| MonoTime deadline = MonoTime::Now(MonoTime::FINE); |
| deadline.AddDelta(default_admin_operation_timeout()); |
| Status s = |
| data_->SyncLeaderMasterRpc<ListTablesRequestPB, ListTablesResponsePB>( |
| deadline, |
| this, |
| req, |
| &resp, |
| "ListTables", |
| &MasterServiceProxy::ListTables, |
| {}); |
| RETURN_NOT_OK(s); |
| if (resp.has_error()) { |
| return StatusFromPB(resp.error().status()); |
| } |
| for (int i = 0; i < resp.tables_size(); i++) { |
| tables->push_back(resp.tables(i).name()); |
| } |
| return Status::OK(); |
| } |
| |
| Status KuduClient::TableExists(const string& table_name, bool* exists) { |
| std::vector<std::string> tables; |
| RETURN_NOT_OK(ListTables(&tables, table_name)); |
| for (const string& table : tables) { |
| if (table == table_name) { |
| *exists = true; |
| return Status::OK(); |
| } |
| } |
| *exists = false; |
| return Status::OK(); |
| } |
| |
| Status KuduClient::OpenTable(const string& table_name, |
| shared_ptr<KuduTable>* table) { |
| KuduSchema schema; |
| string table_id; |
| PartitionSchema partition_schema; |
| MonoTime deadline = MonoTime::Now(MonoTime::FINE); |
| deadline.AddDelta(default_admin_operation_timeout()); |
| RETURN_NOT_OK(data_->GetTableSchema(this, |
| table_name, |
| deadline, |
| &schema, |
| &partition_schema, |
| &table_id)); |
| |
| // TODO: in the future, probably will look up the table in some map to reuse |
| // KuduTable instances. |
| table->reset(new KuduTable(shared_from_this(), table_name, table_id, |
| schema, partition_schema)); |
| return Status::OK(); |
| } |
| |
| shared_ptr<KuduSession> KuduClient::NewSession() { |
| shared_ptr<KuduSession> ret(new KuduSession(shared_from_this())); |
| ret->data_->Init(ret); |
| return ret; |
| } |
| |
| bool KuduClient::IsMultiMaster() const { |
| return data_->master_server_addrs_.size() > 1; |
| } |
| |
| const MonoDelta& KuduClient::default_admin_operation_timeout() const { |
| return data_->default_admin_operation_timeout_; |
| } |
| |
| const MonoDelta& KuduClient::default_rpc_timeout() const { |
| return data_->default_rpc_timeout_; |
| } |
| |
| const uint64_t KuduClient::kNoTimestamp = 0; |
| |
| uint64_t KuduClient::GetLatestObservedTimestamp() const { |
| return data_->GetLatestObservedTimestamp(); |
| } |
| |
| void KuduClient::SetLatestObservedTimestamp(uint64_t ht_timestamp) { |
| data_->UpdateLatestObservedTimestamp(ht_timestamp); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // KuduTableCreator |
| //////////////////////////////////////////////////////////// |
| |
| KuduTableCreator::KuduTableCreator(KuduClient* client) |
| : data_(new KuduTableCreator::Data(client)) { |
| } |
| |
| KuduTableCreator::~KuduTableCreator() { |
| delete data_; |
| } |
| |
| KuduTableCreator& KuduTableCreator::table_name(const string& name) { |
| data_->table_name_ = name; |
| return *this; |
| } |
| |
| KuduTableCreator& KuduTableCreator::schema(const KuduSchema* schema) { |
| data_->schema_ = schema; |
| return *this; |
| } |
| |
| KuduTableCreator& KuduTableCreator::add_hash_partitions(const std::vector<std::string>& columns, |
| int32_t num_buckets) { |
| return add_hash_partitions(columns, num_buckets, 0); |
| } |
| |
| KuduTableCreator& KuduTableCreator::add_hash_partitions(const std::vector<std::string>& columns, |
| int32_t num_buckets, int32_t seed) { |
| PartitionSchemaPB::HashBucketSchemaPB* bucket_schema = |
| data_->partition_schema_.add_hash_bucket_schemas(); |
| for (const string& col_name : columns) { |
| bucket_schema->add_columns()->set_name(col_name); |
| } |
| bucket_schema->set_num_buckets(num_buckets); |
| bucket_schema->set_seed(seed); |
| return *this; |
| } |
| |
| KuduTableCreator& KuduTableCreator::set_range_partition_columns( |
| const std::vector<std::string>& columns) { |
| PartitionSchemaPB::RangeSchemaPB* range_schema = |
| data_->partition_schema_.mutable_range_schema(); |
| range_schema->Clear(); |
| for (const string& col_name : columns) { |
| range_schema->add_columns()->set_name(col_name); |
| } |
| |
| return *this; |
| } |
| |
| KuduTableCreator& KuduTableCreator::add_range_split(KuduPartialRow* split_row) { |
| data_->range_splits_.emplace_back(split_row); |
| return *this; |
| } |
| |
| KuduTableCreator& KuduTableCreator::split_rows(const vector<const KuduPartialRow*>& rows) { |
| for (const KuduPartialRow* row : rows) { |
| data_->range_splits_.emplace_back(const_cast<KuduPartialRow*>(row)); |
| } |
| return *this; |
| } |
| |
| KuduTableCreator& KuduTableCreator::add_range_bound(KuduPartialRow* lower_bound, |
| KuduPartialRow* upper_bound) { |
| data_->range_bounds_.emplace_back(unique_ptr<KuduPartialRow>(lower_bound), |
| unique_ptr<KuduPartialRow>(upper_bound)); |
| return *this; |
| } |
| |
| KuduTableCreator& KuduTableCreator::num_replicas(int num_replicas) { |
| data_->num_replicas_ = num_replicas; |
| return *this; |
| } |
| |
| KuduTableCreator& KuduTableCreator::timeout(const MonoDelta& timeout) { |
| data_->timeout_ = timeout; |
| return *this; |
| } |
| |
| KuduTableCreator& KuduTableCreator::wait(bool wait) { |
| data_->wait_ = wait; |
| return *this; |
| } |
| |
| Status KuduTableCreator::Create() { |
| if (!data_->table_name_.length()) { |
| return Status::InvalidArgument("Missing table name"); |
| } |
| if (!data_->schema_) { |
| return Status::InvalidArgument("Missing schema"); |
| } |
| if (!data_->partition_schema_.has_range_schema() && |
| data_->partition_schema_.hash_bucket_schemas().empty()) { |
| return Status::InvalidArgument( |
| "Table partitioning must be specified using " |
| "add_hash_partitions or set_range_partition_columns"); |
| } |
| |
| // Build request. |
| CreateTableRequestPB req; |
| req.set_name(data_->table_name_); |
| if (data_->num_replicas_ >= 1) { |
| req.set_num_replicas(data_->num_replicas_); |
| } |
| RETURN_NOT_OK_PREPEND(SchemaToPB(*data_->schema_->schema_, req.mutable_schema()), |
| "Invalid schema"); |
| |
| RowOperationsPBEncoder encoder(req.mutable_split_rows_range_bounds()); |
| |
| for (const auto& row : data_->range_splits_) { |
| if (!row) { |
| return Status::InvalidArgument("range split row must not be null"); |
| } |
| encoder.Add(RowOperationsPB::SPLIT_ROW, *row); |
| } |
| |
| for (const auto& bound : data_->range_bounds_) { |
| if (!bound.first || !bound.second) { |
| return Status::InvalidArgument("range bounds must not be null"); |
| } |
| encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, *bound.first); |
| encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, *bound.second); |
| } |
| |
| req.mutable_partition_schema()->CopyFrom(data_->partition_schema_); |
| |
| MonoTime deadline = MonoTime::Now(MonoTime::FINE); |
| if (data_->timeout_.Initialized()) { |
| deadline.AddDelta(data_->timeout_); |
| } else { |
| deadline.AddDelta(data_->client_->default_admin_operation_timeout()); |
| } |
| |
| RETURN_NOT_OK_PREPEND(data_->client_->data_->CreateTable(data_->client_, |
| req, |
| *data_->schema_, |
| deadline, |
| !data_->range_bounds_.empty()), |
| strings::Substitute("Error creating table $0 on the master", |
| data_->table_name_)); |
| |
| // Spin until the table is fully created, if requested. |
| if (data_->wait_) { |
| RETURN_NOT_OK(data_->client_->data_->WaitForCreateTableToFinish(data_->client_, |
| data_->table_name_, |
| deadline)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // KuduTable |
| //////////////////////////////////////////////////////////// |
| |
| KuduTable::KuduTable(const shared_ptr<KuduClient>& client, |
| const string& name, |
| const string& table_id, |
| const KuduSchema& schema, |
| const PartitionSchema& partition_schema) |
| : data_(new KuduTable::Data(client, name, table_id, schema, partition_schema)) { |
| } |
| |
| KuduTable::~KuduTable() { |
| delete data_; |
| } |
| |
| const string& KuduTable::name() const { |
| return data_->name_; |
| } |
| |
| const string& KuduTable::id() const { |
| return data_->id_; |
| } |
| |
| const KuduSchema& KuduTable::schema() const { |
| return data_->schema_; |
| } |
| |
| KuduInsert* KuduTable::NewInsert() { |
| return new KuduInsert(shared_from_this()); |
| } |
| |
| KuduUpsert* KuduTable::NewUpsert() { |
| return new KuduUpsert(shared_from_this()); |
| } |
| |
| KuduUpdate* KuduTable::NewUpdate() { |
| return new KuduUpdate(shared_from_this()); |
| } |
| |
| KuduDelete* KuduTable::NewDelete() { |
| return new KuduDelete(shared_from_this()); |
| } |
| |
| KuduClient* KuduTable::client() const { |
| return data_->client_.get(); |
| } |
| |
| const PartitionSchema& KuduTable::partition_schema() const { |
| return data_->partition_schema_; |
| } |
| |
| KuduPredicate* KuduTable::NewComparisonPredicate(const Slice& col_name, |
| KuduPredicate::ComparisonOp op, |
| KuduValue* value) { |
| StringPiece name_sp(reinterpret_cast<const char*>(col_name.data()), col_name.size()); |
| const Schema* s = data_->schema_.schema_; |
| int col_idx = s->find_column(name_sp); |
| if (col_idx == Schema::kColumnNotFound) { |
| // Since this function doesn't return an error, instead we create a special |
| // predicate that just returns the errors when we add it to the scanner. |
| // |
| // This makes the API more "fluent". |
| delete value; // we always take ownership of 'value'. |
| return new KuduPredicate(new ErrorPredicateData( |
| Status::NotFound("column not found", col_name))); |
| } |
| |
| return new KuduPredicate(new ComparisonPredicateData(s->column(col_idx), op, value)); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // Error |
| //////////////////////////////////////////////////////////// |
| |
| const Status& KuduError::status() const { |
| return data_->status_; |
| } |
| |
| const KuduWriteOperation& KuduError::failed_op() const { |
| return *data_->failed_op_; |
| } |
| |
| KuduWriteOperation* KuduError::release_failed_op() { |
| CHECK_NOTNULL(data_->failed_op_.get()); |
| return data_->failed_op_.release(); |
| } |
| |
| bool KuduError::was_possibly_successful() const { |
| // TODO: implement me - right now be conservative. |
| return true; |
| } |
| |
| KuduError::KuduError(KuduWriteOperation* failed_op, |
| const Status& status) |
| : data_(new KuduError::Data(gscoped_ptr<KuduWriteOperation>(failed_op), |
| status)) { |
| } |
| |
| KuduError::~KuduError() { |
| delete data_; |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // KuduSession |
| //////////////////////////////////////////////////////////// |
| |
| KuduSession::KuduSession(const shared_ptr<KuduClient>& client) |
| : data_(new KuduSession::Data(client)) { |
| } |
| |
| KuduSession::~KuduSession() { |
| WARN_NOT_OK(data_->Close(true), "Closed Session with pending operations."); |
| delete data_; |
| } |
| |
| Status KuduSession::Close() { |
| return data_->Close(false); |
| } |
| |
| Status KuduSession::SetFlushMode(FlushMode m) { |
| if (m == AUTO_FLUSH_BACKGROUND) { |
| return Status::NotSupported("AUTO_FLUSH_BACKGROUND has not been implemented in the" |
| " c++ client (see KUDU-456)."); |
| } |
| if (data_->batcher_->HasPendingOperations()) { |
| // TODO: there may be a more reasonable behavior here. |
| return Status::IllegalState("Cannot change flush mode when writes are buffered"); |
| } |
| if (!tight_enum_test<FlushMode>(m)) { |
| // Be paranoid in client code. |
| return Status::InvalidArgument("Bad flush mode"); |
| } |
| |
| data_->flush_mode_ = m; |
| return Status::OK(); |
| } |
| |
| Status KuduSession::SetExternalConsistencyMode(ExternalConsistencyMode m) { |
| if (data_->batcher_->HasPendingOperations()) { |
| // TODO: there may be a more reasonable behavior here. |
| return Status::IllegalState("Cannot change external consistency mode when writes are " |
| "buffered"); |
| } |
| if (!tight_enum_test<ExternalConsistencyMode>(m)) { |
| // Be paranoid in client code. |
| return Status::InvalidArgument("Bad external consistency mode"); |
| } |
| |
| data_->external_consistency_mode_ = m; |
| return Status::OK(); |
| } |
| |
| void KuduSession::SetTimeoutMillis(int millis) { |
| CHECK_GE(millis, 0); |
| data_->timeout_ms_ = millis; |
| data_->batcher_->SetTimeoutMillis(millis); |
| } |
| |
| Status KuduSession::Flush() { |
| Synchronizer s; |
| KuduStatusMemberCallback<Synchronizer> ksmcb(&s, &Synchronizer::StatusCB); |
| FlushAsync(&ksmcb); |
| return s.Wait(); |
| } |
| |
| void KuduSession::FlushAsync(KuduStatusCallback* user_callback) { |
| CHECK_NE(data_->flush_mode_, AUTO_FLUSH_BACKGROUND) << |
| "AUTO_FLUSH_BACKGROUND has not been implemented"; |
| |
| // Swap in a new batcher to start building the next batch. |
| // Save off the old batcher. |
| scoped_refptr<Batcher> old_batcher; |
| { |
| std::lock_guard<simple_spinlock> l(data_->lock_); |
| data_->NewBatcher(shared_from_this(), &old_batcher); |
| InsertOrDie(&data_->flushed_batchers_, old_batcher.get()); |
| } |
| |
| // Send off any buffered data. Important to do this outside of the lock |
| // since the callback may itself try to take the lock, in the case that |
| // the batch fails "inline" on the same thread. |
| old_batcher->FlushAsync(user_callback); |
| } |
| |
| bool KuduSession::HasPendingOperations() const { |
| std::lock_guard<simple_spinlock> l(data_->lock_); |
| if (data_->batcher_->HasPendingOperations()) { |
| return true; |
| } |
| for (Batcher* b : data_->flushed_batchers_) { |
| if (b->HasPendingOperations()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| Status KuduSession::Apply(KuduWriteOperation* write_op) { |
| if (!write_op->row().IsKeySet()) { |
| Status status = Status::IllegalState("Key not specified", write_op->ToString()); |
| data_->error_collector_->AddError(gscoped_ptr<KuduError>( |
| new KuduError(write_op, status))); |
| return status; |
| } |
| |
| Status s = data_->batcher_->Add(write_op); |
| if (!PREDICT_FALSE(s.ok())) { |
| data_->error_collector_->AddError(gscoped_ptr<KuduError>( |
| new KuduError(write_op, s))); |
| return s; |
| } |
| |
| if (data_->flush_mode_ == AUTO_FLUSH_SYNC) { |
| return Flush(); |
| } |
| |
| return Status::OK(); |
| } |
| |
| int KuduSession::CountBufferedOperations() const { |
| std::lock_guard<simple_spinlock> l(data_->lock_); |
| CHECK_EQ(data_->flush_mode_, MANUAL_FLUSH); |
| |
| return data_->batcher_->CountBufferedOperations(); |
| } |
| |
| int KuduSession::CountPendingErrors() const { |
| return data_->error_collector_->CountErrors(); |
| } |
| |
| void KuduSession::GetPendingErrors(vector<KuduError*>* errors, bool* overflowed) { |
| data_->error_collector_->GetErrors(errors, overflowed); |
| } |
| |
| KuduClient* KuduSession::client() const { |
| return data_->client_.get(); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // KuduTableAlterer |
| //////////////////////////////////////////////////////////// |
| KuduTableAlterer::KuduTableAlterer(KuduClient* client, const string& name) |
| : data_(new Data(client, name)) { |
| } |
| |
| KuduTableAlterer::~KuduTableAlterer() { |
| delete data_; |
| } |
| |
| KuduTableAlterer* KuduTableAlterer::RenameTo(const string& new_name) { |
| data_->rename_to_ = new_name; |
| return this; |
| } |
| |
| KuduColumnSpec* KuduTableAlterer::AddColumn(const string& name) { |
| Data::Step s = { AlterTableRequestPB::ADD_COLUMN, |
| new KuduColumnSpec(name), nullptr, nullptr }; |
| data_->steps_.emplace_back(std::move(s)); |
| return s.spec; |
| } |
| |
| KuduColumnSpec* KuduTableAlterer::AlterColumn(const string& name) { |
| Data::Step s = { AlterTableRequestPB::ALTER_COLUMN, |
| new KuduColumnSpec(name), nullptr, nullptr }; |
| data_->steps_.emplace_back(std::move(s)); |
| return s.spec; |
| } |
| |
| KuduTableAlterer* KuduTableAlterer::DropColumn(const string& name) { |
| Data::Step s = { AlterTableRequestPB::DROP_COLUMN, |
| new KuduColumnSpec(name), nullptr, nullptr }; |
| data_->steps_.emplace_back(std::move(s)); |
| return this; |
| } |
| |
| KuduTableAlterer* KuduTableAlterer::AddRangePartition(KuduPartialRow* lower_bound, |
| KuduPartialRow* upper_bound) { |
| if (lower_bound == nullptr || upper_bound == nullptr) { |
| data_->status_ = Status::InvalidArgument("range partition bounds may not be null"); |
| } |
| if (!lower_bound->schema()->Equals(*upper_bound->schema())) { |
| data_->status_ = Status::InvalidArgument("range partition bounds must have matching schemas"); |
| } |
| if (data_->schema_ == nullptr) { |
| data_->schema_ = lower_bound->schema(); |
| } else if (!lower_bound->schema()->Equals(*data_->schema_)) { |
| data_->status_ = Status::InvalidArgument("range partition bounds must have matching schemas"); |
| } |
| |
| Data::Step s { AlterTableRequestPB::ADD_RANGE_PARTITION, |
| nullptr, |
| unique_ptr<KuduPartialRow>(lower_bound), |
| unique_ptr<KuduPartialRow>(upper_bound) }; |
| data_->steps_.emplace_back(std::move(s)); |
| data_->has_alter_partitioning_steps = true; |
| return this; |
| } |
| |
| KuduTableAlterer* KuduTableAlterer::DropRangePartition(KuduPartialRow* lower_bound, |
| KuduPartialRow* upper_bound) { |
| if (lower_bound == nullptr || upper_bound == nullptr) { |
| data_->status_ = Status::InvalidArgument("range partition bounds may not be null"); |
| } |
| if (!lower_bound->schema()->Equals(*upper_bound->schema())) { |
| data_->status_ = Status::InvalidArgument("range partition bounds must have matching schemas"); |
| } |
| if (data_->schema_ == nullptr) { |
| data_->schema_ = lower_bound->schema(); |
| } else if (!lower_bound->schema()->Equals(*data_->schema_)) { |
| data_->status_ = Status::InvalidArgument("range partition bounds must have matching schemas"); |
| } |
| |
| Data::Step s { AlterTableRequestPB::DROP_RANGE_PARTITION, |
| nullptr, |
| unique_ptr<KuduPartialRow>(lower_bound), |
| unique_ptr<KuduPartialRow>(upper_bound) }; |
| data_->steps_.emplace_back(std::move(s)); |
| data_->has_alter_partitioning_steps = true; |
| return this; |
| } |
| |
| KuduTableAlterer* KuduTableAlterer::timeout(const MonoDelta& timeout) { |
| data_->timeout_ = timeout; |
| return this; |
| } |
| |
| KuduTableAlterer* KuduTableAlterer::wait(bool wait) { |
| data_->wait_ = wait; |
| return this; |
| } |
| |
| Status KuduTableAlterer::Alter() { |
| AlterTableRequestPB req; |
| RETURN_NOT_OK(data_->ToRequest(&req)); |
| |
| MonoDelta timeout = data_->timeout_.Initialized() ? |
| data_->timeout_ : |
| data_->client_->default_admin_operation_timeout(); |
| MonoTime deadline = MonoTime::Now(MonoTime::FINE); |
| deadline.AddDelta(timeout); |
| RETURN_NOT_OK(data_->client_->data_->AlterTable(data_->client_, req, deadline, |
| data_->has_alter_partitioning_steps)); |
| |
| if (data_->has_alter_partitioning_steps) { |
| // If the table partitions change, clear the local meta cache so that the |
| // new tablets can immediately be written to and scanned, and the old |
| // tablets won't be seen again. This also prevents rows being batched for |
| // the wrong tablet when a partition is dropped and added in the same alter |
| // table transaction. We could clear the meta cache for just the table being |
| // altered or just the partition key ranges being changed, but that would |
| // require opening the table in order to get the ID, schema, and partition |
| // schema. |
| // |
| // It is not necessary to wait for the alteration to be completed before |
| // clearing the cache (i.e. the tablets to be created), because the master |
| // has its soft state updated as part of handling the alter table RPC. When |
| // the meta cache looks up the new tablet locations during a subsequent |
| // write or scan, the master will return a ServiceUnavailable response if |
| // the new tablets are not yet running. The meta cache will automatically |
| // retry after a delay when it encounters this error. |
| data_->client_->data_->meta_cache_->ClearCache(); |
| } |
| |
| if (data_->wait_) { |
| string alter_name = data_->rename_to_.get_value_or(data_->table_name_); |
| RETURN_NOT_OK(data_->client_->data_->WaitForAlterTableToFinish( |
| data_->client_, alter_name, deadline)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // KuduScanner |
| //////////////////////////////////////////////////////////// |
| |
| KuduScanner::KuduScanner(KuduTable* table) |
| : data_(new KuduScanner::Data(table)) { |
| } |
| |
| KuduScanner::~KuduScanner() { |
| Close(); |
| delete data_; |
| } |
| |
| Status KuduScanner::SetProjectedColumns(const vector<string>& col_names) { |
| return SetProjectedColumnNames(col_names); |
| } |
| |
| Status KuduScanner::SetProjectedColumnNames(const vector<string>& col_names) { |
| if (data_->open_) { |
| return Status::IllegalState("Projection must be set before Open()"); |
| } |
| return data_->mutable_configuration()->SetProjectedColumnNames(col_names); |
| } |
| |
| Status KuduScanner::SetProjectedColumnIndexes(const vector<int>& col_indexes) { |
| if (data_->open_) { |
| return Status::IllegalState("Projection must be set before Open()"); |
| } |
| return data_->mutable_configuration()->SetProjectedColumnIndexes(col_indexes); |
| } |
| |
| Status KuduScanner::SetBatchSizeBytes(uint32_t batch_size) { |
| return data_->mutable_configuration()->SetBatchSizeBytes(batch_size); |
| } |
| |
| Status KuduScanner::SetReadMode(ReadMode read_mode) { |
| if (data_->open_) { |
| return Status::IllegalState("Read mode must be set before Open()"); |
| } |
| if (!tight_enum_test<ReadMode>(read_mode)) { |
| return Status::InvalidArgument("Bad read mode"); |
| } |
| return data_->mutable_configuration()->SetReadMode(read_mode); |
| } |
| |
| Status KuduScanner::SetOrderMode(OrderMode order_mode) { |
| if (data_->open_) { |
| return Status::IllegalState("Order mode must be set before Open()"); |
| } |
| if (!tight_enum_test<OrderMode>(order_mode)) { |
| return Status::InvalidArgument("Bad order mode"); |
| } |
| return data_->mutable_configuration()->SetFaultTolerant(order_mode == ORDERED); |
| } |
| |
| Status KuduScanner::SetFaultTolerant() { |
| if (data_->open_) { |
| return Status::IllegalState("Fault-tolerance must be set before Open()"); |
| } |
| return data_->mutable_configuration()->SetFaultTolerant(true); |
| } |
| |
| Status KuduScanner::SetSnapshotMicros(uint64_t snapshot_timestamp_micros) { |
| if (data_->open_) { |
| return Status::IllegalState("Snapshot timestamp must be set before Open()"); |
| } |
| data_->mutable_configuration()->SetSnapshotMicros(snapshot_timestamp_micros); |
| return Status::OK(); |
| } |
| |
| Status KuduScanner::SetSnapshotRaw(uint64_t snapshot_timestamp) { |
| if (data_->open_) { |
| return Status::IllegalState("Snapshot timestamp must be set before Open()"); |
| } |
| data_->mutable_configuration()->SetSnapshotRaw(snapshot_timestamp); |
| return Status::OK(); |
| } |
| |
| Status KuduScanner::SetSelection(KuduClient::ReplicaSelection selection) { |
| if (data_->open_) { |
| return Status::IllegalState("Replica selection must be set before Open()"); |
| } |
| return data_->mutable_configuration()->SetSelection(selection); |
| } |
| |
| Status KuduScanner::SetTimeoutMillis(int millis) { |
| if (data_->open_) { |
| return Status::IllegalState("Timeout must be set before Open()"); |
| } |
| data_->mutable_configuration()->SetTimeoutMillis(millis); |
| return Status::OK(); |
| } |
| |
| Status KuduScanner::AddConjunctPredicate(KuduPredicate* pred) { |
| if (data_->open_) { |
| // Take ownership even if we return a bad status. |
| delete pred; |
| return Status::IllegalState("Predicate must be set before Open()"); |
| } |
| return data_->mutable_configuration()->AddConjunctPredicate(pred); |
| } |
| |
| Status KuduScanner::AddLowerBound(const KuduPartialRow& key) { |
| return data_->mutable_configuration()->AddLowerBound(key); |
| } |
| |
| Status KuduScanner::AddLowerBoundRaw(const Slice& key) { |
| return data_->mutable_configuration()->AddLowerBoundRaw(key); |
| } |
| |
| Status KuduScanner::AddExclusiveUpperBound(const KuduPartialRow& key) { |
| return data_->mutable_configuration()->AddUpperBound(key); |
| } |
| |
| Status KuduScanner::AddExclusiveUpperBoundRaw(const Slice& key) { |
| return data_->mutable_configuration()->AddUpperBoundRaw(key); |
| } |
| |
| Status KuduScanner::AddLowerBoundPartitionKeyRaw(const Slice& partition_key) { |
| return data_->mutable_configuration()->AddLowerBoundPartitionKeyRaw(partition_key); |
| } |
| |
| Status KuduScanner::AddExclusiveUpperBoundPartitionKeyRaw(const Slice& partition_key) { |
| return data_->mutable_configuration()->AddUpperBoundPartitionKeyRaw(partition_key); |
| } |
| |
| Status KuduScanner::SetCacheBlocks(bool cache_blocks) { |
| if (data_->open_) { |
| return Status::IllegalState("Block caching must be set before Open()"); |
| } |
| return data_->mutable_configuration()->SetCacheBlocks(cache_blocks); |
| } |
| |
| KuduSchema KuduScanner::GetProjectionSchema() const { |
| return KuduSchema(*data_->configuration().projection()); |
| } |
| |
| const ResourceMetrics& KuduScanner::GetResourceMetrics() const { |
| return data_->resource_metrics_; |
| } |
| |
| namespace { |
| // Callback for the RPC sent by Close(). |
| // We can't use the KuduScanner response and RPC controller members for this |
| // call, because the scanner object may be destructed while the call is still |
| // being processed. |
| struct CloseCallback { |
| RpcController controller; |
| ScanResponsePB response; |
| string scanner_id; |
| void Callback() { |
| if (!controller.status().ok()) { |
| LOG(WARNING) << "Couldn't close scanner " << scanner_id << ": " |
| << controller.status().ToString(); |
| } |
| delete this; |
| } |
| }; |
| } // anonymous namespace |
| |
| string KuduScanner::ToString() const { |
| return strings::Substitute("$0: $1", |
| data_->table_->name(), |
| data_->configuration() |
| .spec() |
| .ToString(*data_->table_->schema().schema_)); |
| } |
| |
| Status KuduScanner::Open() { |
| CHECK(!data_->open_) << "Scanner already open"; |
| |
| data_->mutable_configuration()->OptimizeScanSpec(); |
| data_->partition_pruner_.Init(*data_->table_->schema().schema_, |
| data_->table_->partition_schema(), |
| data_->configuration().spec()); |
| |
| if (data_->configuration().spec().CanShortCircuit() || |
| !data_->partition_pruner_.HasMorePartitionKeyRanges()) { |
| VLOG(1) << "Short circuiting scan " << ToString(); |
| data_->open_ = true; |
| data_->short_circuit_ = true; |
| return Status::OK(); |
| } |
| |
| VLOG(1) << "Beginning scan " << ToString(); |
| |
| MonoTime deadline = MonoTime::Now(MonoTime::FINE); |
| deadline.AddDelta(data_->configuration().timeout()); |
| set<string> blacklist; |
| |
| RETURN_NOT_OK(data_->OpenNextTablet(deadline, &blacklist)); |
| |
| data_->open_ = true; |
| return Status::OK(); |
| } |
| |
| Status KuduScanner::KeepAlive() { |
| return data_->KeepAlive(); |
| } |
| |
| void KuduScanner::Close() { |
| if (!data_->open_) return; |
| |
| VLOG(1) << "Ending scan " << ToString(); |
| |
| // Close the scanner on the server-side, if necessary. |
| // |
| // If the scan did not match any rows, the tserver will not assign a scanner ID. |
| // This is reflected in the Open() response. In this case, there is no server-side state |
| // to clean up. |
| if (!data_->next_req_.scanner_id().empty()) { |
| CHECK(data_->proxy_); |
| gscoped_ptr<CloseCallback> closer(new CloseCallback); |
| closer->scanner_id = data_->next_req_.scanner_id(); |
| data_->PrepareRequest(KuduScanner::Data::CLOSE); |
| data_->next_req_.set_close_scanner(true); |
| closer->controller.set_timeout(data_->configuration().timeout()); |
| data_->proxy_->ScanAsync(data_->next_req_, &closer->response, &closer->controller, |
| boost::bind(&CloseCallback::Callback, closer.get())); |
| ignore_result(closer.release()); |
| } |
| data_->proxy_.reset(); |
| data_->open_ = false; |
| return; |
| } |
| |
| bool KuduScanner::HasMoreRows() const { |
| CHECK(data_->open_); |
| return !data_->short_circuit_ && // The scan is not short circuited |
| (data_->data_in_open_ || // more data in hand |
| data_->last_response_.has_more_results() || // more data in this tablet |
| data_->MoreTablets()); // more tablets to scan, possibly with more data |
| } |
| |
| Status KuduScanner::NextBatch(vector<KuduRowResult>* rows) { |
| RETURN_NOT_OK(NextBatch(&data_->batch_for_old_api_)); |
| data_->batch_for_old_api_.data_->ExtractRows(rows); |
| return Status::OK(); |
| } |
| |
| Status KuduScanner::NextBatch(KuduScanBatch* batch) { |
| // TODO: do some double-buffering here -- when we return this batch |
| // we should already have fired off the RPC for the next batch, but |
| // need to do some swapping of the response objects around to avoid |
| // stomping on the memory the user is looking at. |
| CHECK(data_->open_); |
| CHECK(data_->proxy_); |
| |
| batch->data_->Clear(); |
| |
| if (data_->short_circuit_) { |
| return Status::OK(); |
| } |
| |
| if (data_->data_in_open_) { |
| // We have data from a previous scan. |
| VLOG(1) << "Extracting data from scan " << ToString(); |
| data_->data_in_open_ = false; |
| return batch->data_->Reset(&data_->controller_, |
| data_->configuration().projection(), |
| data_->configuration().client_projection(), |
| make_gscoped_ptr(data_->last_response_.release_data())); |
| } else if (data_->last_response_.has_more_results()) { |
| // More data is available in this tablet. |
| VLOG(1) << "Continuing scan " << ToString(); |
| |
| MonoTime batch_deadline = MonoTime::Now(MonoTime::FINE); |
| batch_deadline.AddDelta(data_->configuration().timeout()); |
| data_->PrepareRequest(KuduScanner::Data::CONTINUE); |
| |
| while (true) { |
| bool allow_time_for_failover = data_->configuration().is_fault_tolerant(); |
| ScanRpcStatus result = data_->SendScanRpc(batch_deadline, allow_time_for_failover); |
| |
| // Success case. |
| if (result.result == ScanRpcStatus::OK) { |
| if (data_->last_response_.has_last_primary_key()) { |
| data_->last_primary_key_ = data_->last_response_.last_primary_key(); |
| } |
| data_->scan_attempts_ = 0; |
| return batch->data_->Reset(&data_->controller_, |
| data_->configuration().projection(), |
| data_->configuration().client_projection(), |
| make_gscoped_ptr(data_->last_response_.release_data())); |
| } |
| |
| data_->scan_attempts_++; |
| |
| // Error handling. |
| LOG(WARNING) << "Scan at tablet server " << data_->ts_->ToString() << " of tablet " |
| << ToString() << " failed: " << result.status.ToString(); |
| |
| set<string> blacklist; |
| RETURN_NOT_OK(data_->HandleError(result, batch_deadline, &blacklist)); |
| |
| if (data_->configuration().is_fault_tolerant()) { |
| LOG(WARNING) << "Attempting to retry scan of tablet " << ToString() << " elsewhere."; |
| return data_->ReopenCurrentTablet(batch_deadline, &blacklist); |
| } |
| |
| if (blacklist.empty()) { |
| // If we didn't blacklist the current server, we can just retry again. |
| continue; |
| } |
| // If we blacklisted the current server, and it's not fault-tolerant, we can't |
| // retry anywhere, so just propagate the error. |
| return result.status; |
| } |
| } else if (data_->MoreTablets()) { |
| // More data may be available in other tablets. |
| // No need to close the current tablet; we scanned all the data so the |
| // server closed it for us. |
| VLOG(1) << "Scanning next tablet " << ToString(); |
| data_->last_primary_key_.clear(); |
| MonoTime deadline = MonoTime::Now(MonoTime::FINE); |
| deadline.AddDelta(data_->configuration().timeout()); |
| set<string> blacklist; |
| |
| RETURN_NOT_OK(data_->OpenNextTablet(deadline, &blacklist)); |
| // No rows written, the next invocation will pick them up. |
| return Status::OK(); |
| } else { |
| // No more data anywhere. |
| return Status::OK(); |
| } |
| } |
| |
| Status KuduScanner::GetCurrentServer(KuduTabletServer** server) { |
| CHECK(data_->open_); |
| internal::RemoteTabletServer* rts = data_->ts_; |
| CHECK(rts); |
| vector<HostPort> host_ports; |
| rts->GetHostPorts(&host_ports); |
| if (host_ports.empty()) { |
| return Status::IllegalState(strings::Substitute("No HostPort found for RemoteTabletServer $0", |
| rts->ToString())); |
| } |
| *server = new KuduTabletServer(); |
| (*server)->data_ = new KuduTabletServer::Data(rts->permanent_uuid(), |
| host_ports[0].host()); |
| return Status::OK(); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // KuduScanToken |
| //////////////////////////////////////////////////////////// |
| |
| KuduScanToken::KuduScanToken(KuduScanToken::Data* data) |
| : data_(data) { |
| } |
| |
| KuduScanToken::~KuduScanToken() { |
| delete data_; |
| } |
| |
| Status KuduScanToken::IntoKuduScanner(KuduScanner** scanner) const { |
| return data_->IntoKuduScanner(scanner); |
| } |
| |
| const vector<KuduTabletServer*>& KuduScanToken::TabletServers() const { |
| return data_->TabletServers(); |
| } |
| |
| Status KuduScanToken::Serialize(string* buf) const { |
| return data_->Serialize(buf); |
| } |
| |
| Status KuduScanToken::DeserializeIntoScanner(KuduClient* client, |
| const string& serialized_token, |
| KuduScanner** scanner) { |
| return KuduScanToken::Data::DeserializeIntoScanner(client, serialized_token, scanner); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // KuduScanTokenBuilder |
| //////////////////////////////////////////////////////////// |
| |
| KuduScanTokenBuilder::KuduScanTokenBuilder(KuduTable* table) |
| : data_(new KuduScanTokenBuilder::Data(table)) { |
| } |
| |
| KuduScanTokenBuilder::~KuduScanTokenBuilder() { |
| delete data_; |
| } |
| |
| Status KuduScanTokenBuilder::SetProjectedColumnNames(const vector<string>& col_names) { |
| return data_->mutable_configuration()->SetProjectedColumnNames(col_names); |
| } |
| |
| Status KuduScanTokenBuilder::SetProjectedColumnIndexes(const vector<int>& col_indexes) { |
| return data_->mutable_configuration()->SetProjectedColumnIndexes(col_indexes); |
| } |
| |
| Status KuduScanTokenBuilder::SetBatchSizeBytes(uint32_t batch_size) { |
| return data_->mutable_configuration()->SetBatchSizeBytes(batch_size); |
| } |
| |
| Status KuduScanTokenBuilder::SetReadMode(KuduScanner::ReadMode read_mode) { |
| if (!tight_enum_test<KuduScanner::ReadMode>(read_mode)) { |
| return Status::InvalidArgument("Bad read mode"); |
| } |
| return data_->mutable_configuration()->SetReadMode(read_mode); |
| } |
| |
| Status KuduScanTokenBuilder::SetFaultTolerant() { |
| return data_->mutable_configuration()->SetFaultTolerant(true); |
| } |
| |
| Status KuduScanTokenBuilder::SetSnapshotMicros(uint64_t snapshot_timestamp_micros) { |
| data_->mutable_configuration()->SetSnapshotMicros(snapshot_timestamp_micros); |
| return Status::OK(); |
| } |
| |
| Status KuduScanTokenBuilder::SetSnapshotRaw(uint64_t snapshot_timestamp) { |
| data_->mutable_configuration()->SetSnapshotRaw(snapshot_timestamp); |
| return Status::OK(); |
| } |
| |
| Status KuduScanTokenBuilder::SetSelection(KuduClient::ReplicaSelection selection) { |
| return data_->mutable_configuration()->SetSelection(selection); |
| } |
| |
| Status KuduScanTokenBuilder::SetTimeoutMillis(int millis) { |
| data_->mutable_configuration()->SetTimeoutMillis(millis); |
| return Status::OK(); |
| } |
| |
| Status KuduScanTokenBuilder::AddConjunctPredicate(KuduPredicate* pred) { |
| return data_->mutable_configuration()->AddConjunctPredicate(pred); |
| } |
| |
| Status KuduScanTokenBuilder::AddLowerBound(const KuduPartialRow& key) { |
| return data_->mutable_configuration()->AddLowerBound(key); |
| } |
| |
| Status KuduScanTokenBuilder::AddUpperBound(const KuduPartialRow& key) { |
| return data_->mutable_configuration()->AddUpperBound(key); |
| } |
| |
| Status KuduScanTokenBuilder::SetCacheBlocks(bool cache_blocks) { |
| return data_->mutable_configuration()->SetCacheBlocks(cache_blocks); |
| } |
| |
| Status KuduScanTokenBuilder::Build(vector<KuduScanToken*>* tokens) { |
| return data_->Build(tokens); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // KuduTabletServer |
| //////////////////////////////////////////////////////////// |
| |
| KuduTabletServer::KuduTabletServer() |
| : data_(nullptr) { |
| } |
| |
| KuduTabletServer::~KuduTabletServer() { |
| delete data_; |
| } |
| |
| const string& KuduTabletServer::uuid() const { |
| return data_->uuid_; |
| } |
| |
| const string& KuduTabletServer::hostname() const { |
| return data_->hostname_; |
| } |
| |
| } // namespace client |
| } // namespace kudu |