// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/client/client.h"

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <ostream>
#include <set>
#include <string>
#include <type_traits>
#include <vector>

#include <glog/logging.h>
#include <google/protobuf/stubs/common.h>

#include "kudu/client/callbacks.h"
#include "kudu/client/client-internal.h"
#include "kudu/client/client.pb.h"
#include "kudu/client/client_builder-internal.h"
#include "kudu/client/columnar_scan_batch.h"
#include "kudu/client/error-internal.h"
#include "kudu/client/error_collector.h"
#include "kudu/client/master_proxy_rpc.h"
#include "kudu/client/meta_cache.h"
#include "kudu/client/partitioner-internal.h"
#include "kudu/client/replica-internal.h"
#include "kudu/client/row_result.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/scan_configuration.h"
#include "kudu/client/scan_predicate-internal.h"
#include "kudu/client/scan_token-internal.h"
#include "kudu/client/scanner-internal.h"
#include "kudu/client/schema-internal.h"
#include "kudu/client/session-internal.h"
#include "kudu/client/table-internal.h"
#include "kudu/client/table_alterer-internal.h"
#include "kudu/client/table_creator-internal.h"
#include "kudu/client/table_statistics-internal.h"
#include "kudu/client/tablet-internal.h"
#include "kudu/client/tablet_server-internal.h"
#include "kudu/client/transaction-internal.h"
#include "kudu/client/value.h"
#include "kudu/client/write_op.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/partition.h"
#include "kudu/common/partition_pruner.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/row_operations.pb.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/common/txn_id.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/master.proxy.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/request_tracker.h"
#include "kudu/rpc/rpc.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/sasl_common.h"
#include "kudu/rpc/user_credentials.h"
#include "kudu/security/cert.h"
#include "kudu/security/tls_context.h"
#include "kudu/security/token.pb.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_service.proxy.h" // IWYU pragma: keep
#include "kudu/util/async_util.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/init.h"
#include "kudu/util/logging.h"
#include "kudu/util/logging_callback.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/oid_generator.h"
#include "kudu/util/openssl_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/version_info.h"

using kudu::client::internal::AsyncLeaderMasterRpc;
using kudu::client::internal::MetaCache;
using kudu::client::sp::shared_ptr;
using kudu::consensus::RaftPeerPB;
using kudu::master::AlterTableRequestPB;
using kudu::master::AlterTableResponsePB;
using kudu::master::CreateTableRequestPB;
using kudu::master::CreateTableResponsePB;
using kudu::master::GetTableStatisticsRequestPB;
using kudu::master::GetTableStatisticsResponsePB;
using kudu::master::GetTabletLocationsRequestPB;
using kudu::master::GetTabletLocationsResponsePB;
using kudu::master::ListTabletServersRequestPB;
using kudu::master::ListTabletServersResponsePB;
using kudu::master::ListTabletServersResponsePB_Entry;
using kudu::master::MasterServiceProxy;
using kudu::master::TSInfoPB;
using kudu::master::TableIdentifierPB;
using kudu::master::TabletLocationsPB;
using kudu::rpc::BackoffType;
using kudu::rpc::Messenger;
using kudu::rpc::MessengerBuilder;
using kudu::rpc::RpcController;
using kudu::rpc::UserCredentials;
using kudu::tserver::ScanResponsePB;
using std::map;
using std::make_optional;
using std::nullopt;
using std::optional;
using std::set;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;


// Declare limits for the public client enums below. Each invocation names the
// enum type followed by two of its enumerators.
// NOTE(review): MAKE_ENUM_LIMITS is defined outside this file; presumably the
// two enumerators are the minimum and maximum valid values and the macro is
// used for range-checking casts -- confirm against its definition.
MAKE_ENUM_LIMITS(kudu::client::KuduSession::FlushMode,
                 kudu::client::KuduSession::AUTO_FLUSH_SYNC,
                 kudu::client::KuduSession::MANUAL_FLUSH);

MAKE_ENUM_LIMITS(kudu::client::KuduSession::ExternalConsistencyMode,
                 kudu::client::KuduSession::CLIENT_PROPAGATED,
                 kudu::client::KuduSession::COMMIT_WAIT);

MAKE_ENUM_LIMITS(kudu::client::KuduScanner::ReadMode,
                 kudu::client::KuduScanner::READ_LATEST,
                 kudu::client::KuduScanner::READ_YOUR_WRITES);

MAKE_ENUM_LIMITS(kudu::client::KuduScanner::OrderMode,
                 kudu::client::KuduScanner::UNORDERED,
                 kudu::client::KuduScanner::ORDERED);

struct tm;

namespace kudu {

class BlockBloomFilter;
class simple_spinlock;

namespace client {

class ResourceMetrics;

// Name of the environment variable that SetVerboseLevelFromEnvVar() reads to
// pick the client's verbose logging level.
const char* kVerboseEnvVar = "KUDU_CLIENT_VERBOSE";

#if defined(kudu_client_exported_EXPORTS)
// Program name handed to the logging initialization below.
static const char* kProgName = "kudu_client";

// We need to reroute all logging to stderr when the client library is
// loaded. GoogleOnceInit() can do that, but there are multiple entry
// points into the client code, and it'd need to be called in each one.
// So instead, let's use a constructor function.
//
// This is restricted to the exported client builds only. In case of linking
// with non-exported kudu client library, logging must be initialized
// from the main() function of the corresponding binary: usually, that's done
// by calling InitGoogleLoggingSafe(argv[0]).
__attribute__((constructor))
static void InitializeBasicLogging() {
  InitGoogleLoggingSafeBasic(kProgName);

  // Once logging is initialized, honor the verbosity requested via the
  // environment, if any.
  SetVerboseLevelFromEnvVar();
}
#endif

// Applies the verbose logging level requested through the environment
// variable named by kVerboseEnvVar. Nothing happens when the variable is not
// set; a value that does not parse as a non-negative 32-bit integer only
// produces a warning.
void SetVerboseLevelFromEnvVar() {
  const char* raw_value = std::getenv(kVerboseEnvVar);
  if (raw_value == nullptr) {
    return;
  }

  int32_t parsed_level = 0;
  const bool is_valid = safe_strto32(raw_value, &parsed_level) && parsed_level >= 0;
  if (!is_valid) {
    LOG(WARNING) << "Invalid verbose level from environment variable " << kVerboseEnvVar;
    return;
  }
  SetVerboseLogLevel(parsed_level);
}

// Translates the internal LogSeverity into the client-facing KuduLogSeverity
// and hands the log record to the user-supplied callback. A severity outside
// the four known values is reported through LOG(FATAL).
static void LoggingAdapterCB(KuduLoggingCallback* user_cb,
                             LogSeverity severity,
                             const char* filename,
                             int line_number,
                             const struct ::tm* time,
                             const char* message,
                             size_t message_len) {
  KuduLogSeverity mapped_severity;
  switch (severity) {
    case kudu::SEVERITY_INFO:    mapped_severity = SEVERITY_INFO;    break;
    case kudu::SEVERITY_WARNING: mapped_severity = SEVERITY_WARNING; break;
    case kudu::SEVERITY_ERROR:   mapped_severity = SEVERITY_ERROR;   break;
    case kudu::SEVERITY_FATAL:   mapped_severity = SEVERITY_FATAL;   break;
    default:
      LOG(FATAL) << "Unknown Kudu log severity: " << severity;
  }
  user_cb->Run(mapped_severity, filename, line_number, time, message, message_len);
}

// Registers a logging callback that routes every log record through
// LoggingAdapterCB() to the user's callback 'cb'. Only the pointer is
// captured, so 'cb' must stay valid while the callback is installed.
void InstallLoggingCallback(KuduLoggingCallback* cb) {
  auto adapter = [cb](LogSeverity severity,
                      const char* filename,
                      int line_number,
                      const struct ::tm* time,
                      const char* message,
                      size_t message_len) {
    LoggingAdapterCB(cb, severity, filename, line_number, time, message, message_len);
  };
  RegisterLoggingCallback(std::move(adapter));
}

// Removes the logging callback by delegating to UnregisterLoggingCallback().
void UninstallLoggingCallback() {
  UnregisterLoggingCallback();
}

// Sets the verbose logging level by writing 'level' to the FLAGS_v flag.
void SetVerboseLogLevel(int level) {
  FLAGS_v = level;
}

// Forwards 'signum' to SetStackTraceSignal() and returns its status.
Status SetInternalSignalNumber(int signum) {
  return SetStackTraceSignal(signum);
}

// Client-namespace entry point that delegates to
// kudu::rpc::DisableSaslInitialization() and returns its status.
Status DisableSaslInitialization() {
  return kudu::rpc::DisableSaslInitialization();
}

// Client-namespace entry point that delegates to
// kudu::security::DisableOpenSSLInitialization() and returns its status.
Status DisableOpenSSLInitialization() {
  return kudu::security::DisableOpenSSLInitialization();
}

// Returns the string produced by VersionInfo::GetVersionInfo().
string GetShortVersionString() {
  return VersionInfo::GetVersionInfo();
}

// Returns the string produced by VersionInfo::GetAllVersionInfo().
string GetAllVersionInfo() {
  return VersionInfo::GetAllVersionInfo();
}

// Allocates the builder's private Data object, which accumulates the settings
// applied through the setters; it is released in the destructor.
KuduClientBuilder::KuduClientBuilder()
  : data_(new KuduClientBuilder::Data()) {
}

// Frees the Data object allocated by the constructor.
KuduClientBuilder::~KuduClientBuilder() {
  delete data_;
}

// Drops every master address accumulated so far. Returns *this for chaining.
KuduClientBuilder& KuduClientBuilder::clear_master_server_addrs() {
  data_->master_server_addrs_.clear();
  return *this;
}

// Appends every address in 'addrs', in order, to the list of master addresses
// collected so far (existing entries are kept). Returns *this for chaining.
KuduClientBuilder& KuduClientBuilder::master_server_addrs(const vector<string>& addrs) {
  auto& known_addrs = data_->master_server_addrs_;
  known_addrs.insert(known_addrs.end(), addrs.begin(), addrs.end());
  return *this;
}

// Appends a single master address to the list. Returns *this for chaining.
KuduClientBuilder& KuduClientBuilder::add_master_server_addr(const string& addr) {
  data_->master_server_addrs_.push_back(addr);
  return *this;
}

// Records the default timeout for administrative operations; Build() copies
// it into the client. Returns *this for chaining.
KuduClientBuilder& KuduClientBuilder::default_admin_operation_timeout(const MonoDelta& timeout) {
  data_->default_admin_operation_timeout_ = timeout;
  return *this;
}

// Records the default RPC timeout; Build() copies it into the client.
// Returns *this for chaining.
KuduClientBuilder& KuduClientBuilder::default_rpc_timeout(const MonoDelta& timeout) {
  data_->default_rpc_timeout_ = timeout;
  return *this;
}

// Records the connection negotiation timeout. Build() passes it to the
// MessengerBuilder (in milliseconds) only if the value is initialized.
// Returns *this for chaining.
KuduClientBuilder& KuduClientBuilder::connection_negotiation_timeout(
    const MonoDelta& timeout) {
  data_->connection_negotiation_timeout_ = timeout;
  return *this;
}

// Stores the serialized authentication credentials (taken by value and moved
// into place); Build() parses and imports them when the string is non-empty.
// Returns *this for chaining.
KuduClientBuilder& KuduClientBuilder::import_authentication_credentials(string authn_creds) {
  data_->authn_creds_ = std::move(authn_creds);
  return *this;
}

// Records the requested number of reactors; Build() forwards it to the
// MessengerBuilder when a value has been set. Returns *this for chaining.
KuduClientBuilder& KuduClientBuilder::num_reactors(int num_reactors) {
  data_->num_reactors_ = num_reactors;
  return *this;
}

// Records the SASL protocol name; Build() forwards it to the MessengerBuilder
// when it is non-empty. Returns *this for chaining.
KuduClientBuilder& KuduClientBuilder::sasl_protocol_name(const string& sasl_protocol_name) {
  data_->sasl_protocol_name_ = sasl_protocol_name;
  return *this;
}

// Records the encryption policy that Build() translates into RPC encryption
// settings on the MessengerBuilder. Returns *this for chaining.
KuduClientBuilder& KuduClientBuilder::encryption_policy(EncryptionPolicy encryption_policy) {
  data_->encryption_policy_ = encryption_policy;
  return *this;
}

// Records whether authentication is mandatory; when true, Build() sets RPC
// authentication to "required". Returns *this for chaining.
KuduClientBuilder& KuduClientBuilder::require_authentication(bool require_authentication) {
  data_->require_authentication_ = require_authentication;
  return *this;
}

namespace {
// Parses 'authn_creds' as a serialized AuthenticationCredentialsPB and applies
// its contents:
//   * the authn token, if present and complete, is installed on 'messenger';
//   * the real user, if present, is set on 'user_credentials';
//   * every DER-encoded CA certificate is added to the messenger's trusted set.
//
// Returns InvalidArgument if the data cannot be parsed or the token lacks any
// of its fields, or the prefixed error from importing/trusting a certificate.
Status ImportAuthnCreds(const string& authn_creds,
                        Messenger* messenger,
                        UserCredentials* user_credentials) {
  AuthenticationCredentialsPB creds_pb;
  if (!creds_pb.ParseFromString(authn_creds)) {
    return Status::InvalidArgument("invalid authentication data");
  }

  if (creds_pb.has_authn_token()) {
    const auto& token = creds_pb.authn_token();
    const bool token_is_complete = token.has_token_data() &&
                                   token.has_signature() &&
                                   token.has_signing_key_seq_num();
    if (!token_is_complete) {
      return Status::InvalidArgument("invalid authentication token");
    }
    messenger->set_authn_token(token);
  }

  if (creds_pb.has_real_user()) {
    user_credentials->set_real_user(creds_pb.real_user());
  }

  for (const string& der : creds_pb.ca_cert_ders()) {
    security::Cert ca_cert;
    RETURN_NOT_OK_PREPEND(ca_cert.FromString(der, security::DataFormat::DER),
                          "could not import CA cert");
    RETURN_NOT_OK_PREPEND(messenger->mutable_tls_context()->AddTrustedCertificate(ca_cert),
                          "could not trust CA cert");
  }
  return Status::OK();
}
} // anonymous namespace

// Creates a KuduClient from the settings accumulated in this builder.
//
// The steps are, in order: verify CPU flags, build the RPC messenger, import
// authentication credentials (or fall back to the logged-in user), create the
// client object and copy the settings into it, connect to the cluster,
// create the meta cache, determine local host names, and create the request
// tracker. On success '*client' is swapped with the new client; on any
// failure the error is returned and '*client' is left untouched.
Status KuduClientBuilder::Build(shared_ptr<KuduClient>* client) {
  RETURN_NOT_OK(CheckCPUFlags());

  // Init messenger.
  MessengerBuilder builder("client");
  if (data_->connection_negotiation_timeout_.Initialized()) {
    builder.set_rpc_negotiation_timeout_ms(
        data_->connection_negotiation_timeout_.ToMilliseconds());
  }
  if (data_->num_reactors_) {
    builder.set_num_reactors(*data_->num_reactors_);
  }
  if (!data_->sasl_protocol_name_.empty()) {
    builder.set_sasl_proto_name(data_->sasl_protocol_name_);
  }
  if (data_->require_authentication_) {
    builder.set_rpc_authentication("required");
  }
  // Any policy other than OPTIONAL makes RPC encryption mandatory; REQUIRED
  // additionally turns on encryption for loopback connections.
  if (data_->encryption_policy_ != OPTIONAL) {
    builder.set_rpc_encryption("required");
    if (data_->encryption_policy_ == REQUIRED) {
      builder.set_rpc_loopback_encryption(true);
    }
  }
  std::shared_ptr<Messenger> messenger;
  RETURN_NOT_OK(builder.Build(&messenger));
  UserCredentials user_credentials;

  // Parse and import the provided authn data, if any.
  if (!data_->authn_creds_.empty()) {
    RETURN_NOT_OK(ImportAuthnCreds(data_->authn_creds_, messenger.get(), &user_credentials));
  }
  if (!user_credentials.has_real_user()) {
    // If there are no authentication credentials, then set the real user to the
    // currently logged-in user.
    RETURN_NOT_OK(user_credentials.SetLoggedInRealUser());
  }

  // The client is held by a shared_ptr from here on, so every early return
  // below releases it.
  shared_ptr<KuduClient> c(new KuduClient);
  c->data_->messenger_ = std::move(messenger);
  c->data_->user_credentials_ = std::move(user_credentials);
  c->data_->master_server_addrs_ = data_->master_server_addrs_;
  c->data_->default_admin_operation_timeout_ = data_->default_admin_operation_timeout_;
  c->data_->default_rpc_timeout_ = data_->default_rpc_timeout_;

  // Let's allow for plenty of time for discovering the master the first
  // time around.
  MonoTime deadline = MonoTime::Now() + c->default_admin_operation_timeout();
  RETURN_NOT_OK_PREPEND(c->data_->ConnectToCluster(c.get(), deadline),
                        "Could not connect to the cluster");

  c->data_->meta_cache_.reset(new MetaCache(c.get(), data_->replica_visibility_));

  // Init local host names used for locality decisions.
  RETURN_NOT_OK_PREPEND(c->data_->InitLocalHostNames(),
                        "Could not determine local host names");

  c->data_->request_tracker_ = new rpc::RequestTracker(c->data_->client_id_);

  // Publish the fully initialized client to the caller.
  client->swap(c);
  return Status::OK();
}

// Allocates the options' private Data object; it is released in the
// destructor.
KuduTransaction::SerializationOptions::SerializationOptions()
    : data_(new Data) {
}

// Frees the Data object allocated by the constructor.
KuduTransaction::SerializationOptions::~SerializationOptions() {
  delete data_;
}

// Returns the current value of the keepalive flag set via enable_keepalive().
bool KuduTransaction::SerializationOptions::keepalive() const {
  return data_->enable_keepalive_;
}

// Sets the keepalive flag to 'enable'. Returns *this for chaining.
KuduTransaction::SerializationOptions&
KuduTransaction::SerializationOptions::enable_keepalive(bool enable) {
  data_->enable_keepalive_ = enable;
  return *this;
}

// Allocates the transaction's private Data object, constructed from 'client';
// it is released in the destructor.
KuduTransaction::KuduTransaction(const sp::shared_ptr<KuduClient>& client)
    : data_(new KuduTransaction::Data(client)) {
}

// Frees the Data object allocated by the constructor.
KuduTransaction::~KuduTransaction() {
  delete data_;
}

// Delegates to Data::CreateSession(), which fills in '*session'.
Status KuduTransaction::CreateSession(sp::shared_ptr<KuduSession>* session) {
  return data_->CreateSession(session);
}

// Commits the transaction using CommitMode::WAIT_FOR_COMPLETION; compare
// with StartCommit(), which uses CommitMode::START_ONLY.
Status KuduTransaction::Commit() {
  return data_->Commit(KuduTransaction::Data::CommitMode::WAIT_FOR_COMPLETION);
}

// Commits the transaction using CommitMode::START_ONLY; compare with
// Commit(), which uses CommitMode::WAIT_FOR_COMPLETION.
Status KuduTransaction::StartCommit() {
  return data_->Commit(KuduTransaction::Data::CommitMode::START_ONLY);
}

// Delegates to Data::IsCommitComplete(), passing both output parameters
// through unchanged.
Status KuduTransaction::IsCommitComplete(
    bool* is_complete, Status* completion_status) {
  return data_->IsCommitComplete(is_complete, completion_status);
}

// Delegates to Data::Rollback() and returns its status.
Status KuduTransaction::Rollback() {
  return data_->Rollback();
}

// Delegates to Data::Serialize(), which writes the serialized form into
// '*serialized_txn' according to 'options'.
Status KuduTransaction::Serialize(
    string* serialized_txn,
    const SerializationOptions& options) const {
  return data_->Serialize(serialized_txn, options);
}

// Delegates to the static Data::Deserialize(), which produces '*txn' from
// 'serialized_txn' for the given 'client'.
Status KuduTransaction::Deserialize(const sp::shared_ptr<KuduClient>& client,
                                    const string& serialized_txn,
                                    sp::shared_ptr<KuduTransaction>* txn) {
  return Data::Deserialize(client, serialized_txn, txn);
}

// Allocates the client's private Data object and assigns the client an ID.
KuduClient::KuduClient()
  : data_(new KuduClient::Data()) {
  // A single function-local generator is shared by every KuduClient
  // constructed in this process.
  static ObjectIdGenerator oid_generator;
  data_->client_id_ = oid_generator.Next();
}

// Frees the Data object allocated by the constructor.
KuduClient::~KuduClient() {
  delete data_;
}

// Returns a heap-allocated KuduTableCreator bound to this client. The object
// is returned as a raw pointer; nothing in this function retains it.
KuduTableCreator* KuduClient::NewTableCreator() {
  return new KuduTableCreator(this);
}

// Asks whether creation of the table named 'table_name' is still in progress,
// storing the answer in '*create_in_progress'. The request is bounded by the
// default admin operation timeout, counted from now.
Status KuduClient::IsCreateTableInProgress(const string& table_name,
                                           bool* create_in_progress) {
  TableIdentifierPB table_id_pb;
  table_id_pb.set_table_name(table_name);
  const MonoTime rpc_deadline = MonoTime::Now() + default_admin_operation_timeout();
  return data_->IsCreateTableInProgress(
      this, std::move(table_id_pb), rpc_deadline, create_in_progress);
}

// Deletes the table by delegating to SoftDeleteTable() with that method's
// default 'reserve_seconds' (the default is declared outside this file).
Status KuduClient::DeleteTable(const string& table_name) {
  return SoftDeleteTable(table_name);
}

// Delegates to DeleteTableInCatalogs() with 'modify_external_catalogs' set to
// true, passing 'reserve_seconds' through.
Status KuduClient::SoftDeleteTable(const string& table_name,
                                   uint32_t reserve_seconds) {
  return DeleteTableInCatalogs(table_name, true, reserve_seconds);
}

// Issues the table deletion through KuduClient::Data::DeleteTable(), bounded
// by the default admin operation timeout counted from now. Both
// 'modify_external_catalogs' and 'reserve_seconds' are passed through as-is.
Status KuduClient::DeleteTableInCatalogs(const string& table_name,
                                         bool modify_external_catalogs,
                                         uint32_t reserve_seconds) {
  const MonoTime rpc_deadline = MonoTime::Now() + default_admin_operation_timeout();
  return KuduClient::Data::DeleteTable(this,
                                       table_name,
                                       rpc_deadline,
                                       modify_external_catalogs,
                                       reserve_seconds);
}

// Recalls the table identified by 'table_id' through
// KuduClient::Data::RecallTable(), passing 'new_table_name' along. The call
// is bounded by the default admin operation timeout counted from now.
Status KuduClient::RecallTable(const string& table_id, const string& new_table_name) {
  const MonoTime rpc_deadline = MonoTime::Now() + default_admin_operation_timeout();
  return KuduClient::Data::RecallTable(this, table_id, rpc_deadline, new_table_name);
}

// Returns a heap-allocated KuduTableAlterer for 'table_name' bound to this
// client. The object is returned as a raw pointer; nothing in this function
// retains it.
KuduTableAlterer* KuduClient::NewTableAlterer(const string& table_name) {
  return new KuduTableAlterer(this, table_name);
}

// Asks whether an alteration of the table named 'table_name' is still in
// progress, storing the answer in '*alter_in_progress'. The request is
// bounded by the default admin operation timeout, counted from now.
Status KuduClient::IsAlterTableInProgress(const string& table_name,
                                          bool* alter_in_progress) {
  TableIdentifierPB table_id_pb;
  table_id_pb.set_table_name(table_name);
  const MonoTime rpc_deadline = MonoTime::Now() + default_admin_operation_timeout();
  return data_->IsAlterTableInProgress(
      this, std::move(table_id_pb), rpc_deadline, alter_in_progress);
}

// Fetches the schema of the table named 'table_name' into 'schema'. Only the
// schema is requested: every other optional output of Data::GetTableSchema()
// is passed as nullptr. The call is bounded by the default admin operation
// timeout counted from now.
Status KuduClient::GetTableSchema(const string& table_name,
                                  KuduSchema* schema) {
  TableIdentifierPB table_id_pb;
  table_id_pb.set_table_name(table_name);
  const MonoTime rpc_deadline = MonoTime::Now() + default_admin_operation_timeout();
  return data_->GetTableSchema(this,
                               rpc_deadline,
                               table_id_pb,
                               schema,
                               /*partition schema=*/ nullptr,
                               /*table id=*/ nullptr,
                               /*table name=*/ nullptr,
                               /*number of replicas=*/ nullptr,
                               /*owner=*/ nullptr,
                               /*comment=*/ nullptr,
                               /*extra configs=*/ nullptr);
}

// Lists the tablet servers known to the master and appends one newly
// allocated KuduTabletServer per entry to '*tablet_servers' (existing
// elements are kept). The appended objects are raw pointers released to the
// caller.
//
// Returns the error from the underlying ListTabletServers call, or
// IllegalState if a server in the response has no RPC address. On any error
// '*tablet_servers' is left unmodified.
Status KuduClient::ListTabletServers(vector<KuduTabletServer*>* tablet_servers) {
  ListTabletServersRequestPB req;
  ListTabletServersResponsePB resp;
  MonoTime deadline = MonoTime::Now() + default_admin_operation_timeout();
  RETURN_NOT_OK(data_->ListTabletServers(this, deadline, req, &resp));

  // Build the result locally first so an invalid entry in the middle of the
  // response neither leaks objects nor hands the caller a partial list.
  vector<unique_ptr<KuduTabletServer>> servers;
  servers.reserve(resp.servers_size());
  for (const ListTabletServersResponsePB_Entry& e : resp.servers()) {
    // Same validation as GetTablet(): indexing rpc_addresses(0) on an empty
    // list is invalid.
    if (e.registration().rpc_addresses_size() == 0) {
      return Status::IllegalState(Substitute(
          "No RPC addresses found for tserver $0",
          e.instance_id().permanent_uuid()));
    }
    HostPort hp = HostPortFromPB(e.registration().rpc_addresses(0));
    unique_ptr<KuduTabletServer> ts(new KuduTabletServer);
    ts->data_ = new KuduTabletServer::Data(e.instance_id().permanent_uuid(), hp, e.location());
    servers.emplace_back(std::move(ts));
  }

  tablet_servers->reserve(tablet_servers->size() + servers.size());
  for (auto& ts : servers) {
    tablet_servers->push_back(ts.release());
  }
  return Status::OK();
}

// Replaces the contents of '*tables' with the names of the tables reported by
// Data::ListTablesWithInfo() for 'filter'. On error '*tables' is left
// untouched.
Status KuduClient::ListTables(vector<string>* tables, const string& filter) {
  vector<Data::TableInfo> infos;
  RETURN_NOT_OK(data_->ListTablesWithInfo(this, &infos, filter, false));

  tables->clear();
  tables->reserve(infos.size());
  // The infos are discarded afterwards, so the names can be moved out.
  for (Data::TableInfo& entry : infos) {
    tables->push_back(std::move(entry.table_name));
  }
  return Status::OK();
}

// Replaces the contents of '*tables' with the names of the soft-deleted
// tables reported by Data::ListTablesWithInfo() for 'filter'. On error
// '*tables' is left untouched.
Status KuduClient::ListSoftDeletedTables(vector<string>* tables, const string& filter) {
  vector<Data::TableInfo> infos;
  RETURN_NOT_OK(data_->ListTablesWithInfo(this,
                                          &infos,
                                          filter,
                                          /*list_tablet_with_partition=*/ true,
                                          /*show_soft_deleted=*/ true));

  tables->clear();
  tables->reserve(infos.size());
  // The infos are discarded afterwards, so the names can be moved out.
  for (Data::TableInfo& entry : infos) {
    tables->push_back(std::move(entry.table_name));
  }
  return Status::OK();
}

// Determines whether a table named 'table_name' exists by attempting to fetch
// its schema (the schema itself is not requested). A NotFound result means
// "does not exist" and is reported as success with '*exists' set to false.
// Any other error is returned as-is and '*exists' is not written.
Status KuduClient::TableExists(const string& table_name, bool* exists) {
  Status lookup = GetTableSchema(table_name, nullptr);
  if (lookup.ok()) {
    *exists = true;
    return lookup;
  }
  if (lookup.IsNotFound()) {
    *exists = false;
    return Status::OK();
  }
  return lookup;
}

// Opens the table named 'table_name' by delegating to Data::OpenTable(),
// which fills in '*table'.
Status KuduClient::OpenTable(const string& table_name,
                             shared_ptr<KuduTable>* table) {
  TableIdentifierPB id_pb;
  id_pb.set_table_name(table_name);
  return data_->OpenTable(this, id_pb, table);
}

// Creates a session that refers to this client via shared_from_this() and
// initializes its Data with the session's own shared pointer.
shared_ptr<KuduSession> KuduClient::NewSession() {
  shared_ptr<KuduSession> ret(new KuduSession(shared_from_this()));
  ret->data_->Init(ret);
  return ret;
}

// Creates a transaction bound to this client and begins it. '*txn' is
// assigned only if Begin() succeeds; otherwise the error is returned and
// '*txn' is left untouched.
Status KuduClient::NewTransaction(sp::shared_ptr<KuduTransaction>* txn) {
  shared_ptr<KuduTransaction> new_txn(new KuduTransaction(shared_from_this()));
  const auto begin_status = new_txn->data_->Begin(new_txn);
  if (!begin_status.ok()) {
    return begin_status;
  }
  *txn = std::move(new_txn);
  return begin_status;
}

// Looks up the tablet identified by 'tablet_id' on the leader master and, on
// success, stores a newly allocated KuduTablet (released to the caller as a
// raw pointer) in '*tablet'.
//
// Returns:
//   * the RPC error, or the error carried in the response, if the lookup
//     fails;
//   * NotFound if the response contains no tablet location;
//   * IllegalState if the response contains more than one location, a replica
//     whose tablet server has no RPC address, or an interned replica that
//     references a tablet server index outside the response's ts_infos list.
// '*tablet' is not written on error.
Status KuduClient::GetTablet(const string& tablet_id, KuduTablet** tablet) {
  GetTabletLocationsRequestPB req;
  GetTabletLocationsResponsePB resp;

  req.add_tablet_ids(tablet_id);
  req.set_intern_ts_infos_in_response(true);
  MonoTime deadline = MonoTime::Now() + default_admin_operation_timeout();
  Synchronizer sync;
  AsyncLeaderMasterRpc<GetTabletLocationsRequestPB, GetTabletLocationsResponsePB> rpc(
      deadline, this, BackoffType::EXPONENTIAL, req, &resp,
      &MasterServiceProxy::GetTabletLocationsAsync, "GetTabletLocations",
      sync.AsStatusCallback(), {});
  rpc.SendRpc();
  // Block until the asynchronous RPC reports its final status.
  RETURN_NOT_OK(sync.Wait());
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }
  if (resp.tablet_locations_size() == 0) {
    return Status::NotFound(Substitute("$0: tablet not found", tablet_id));
  }
  if (resp.tablet_locations_size() != 1) {
    return Status::IllegalState(Substitute(
        "Expected only one tablet, but received $0",
        resp.tablet_locations_size()));
  }
  const TabletLocationsPB& t = resp.tablet_locations(0);

  // Replicas are raw pointers while being collected.
  // NOTE(review): ElementDeleter comes from gutil; presumably it deletes
  // whatever is still in the vector at scope exit, covering the early
  // returns below -- confirm.
  vector<const KuduReplica*> replicas;
  ElementDeleter deleter(&replicas);

  // Builds a KuduReplica (with its KuduTabletServer) from a tablet server
  // info and a Raft role, and appends it to 'replicas'.
  auto add_replica_func = [](const TSInfoPB& ts_info,
                             const RaftPeerPB::Role role,
                             vector<const KuduReplica*>* replicas) {
    if (ts_info.rpc_addresses_size() == 0) {
      return Status::IllegalState(Substitute(
          "No RPC addresses found for tserver $0",
          ts_info.permanent_uuid()));
    }
    HostPort hp = HostPortFromPB(ts_info.rpc_addresses(0));
    unique_ptr<KuduTabletServer> ts(new KuduTabletServer);
    ts->data_ = new KuduTabletServer::Data(ts_info.permanent_uuid(), hp, ts_info.location());

    // TODO(aserbin): try to use member_type instead of role for metacache.
    bool is_leader = role == RaftPeerPB::LEADER;
    bool is_voter = is_leader || role == RaftPeerPB::FOLLOWER;
    unique_ptr<KuduReplica> replica(new KuduReplica);
    replica->data_ = new KuduReplica::Data(is_leader, is_voter, std::move(ts));

    replicas->push_back(replica.release());
    return Status::OK();
  };

  // Handle "old-style" non-interned replicas. It's used for backward compatibility.
  for (const auto& r : t.deprecated_replicas()) {
    RETURN_NOT_OK(add_replica_func(r.ts_info(), r.role(), &replicas));
  }
  // Handle interned replicas.
  for (const auto& r : t.interned_replicas()) {
    // The index comes from the master's response: validate it before using it
    // to access the repeated ts_infos field.
    const int64_t ts_info_idx = r.ts_info_idx();
    const int64_t num_ts_infos = resp.ts_infos_size();
    if (ts_info_idx < 0 || ts_info_idx >= num_ts_infos) {
      return Status::IllegalState(Substitute(
          "Invalid tserver info index $0 for tablet $1: response contains $2 entries",
          ts_info_idx, tablet_id, num_ts_infos));
    }
    RETURN_NOT_OK(add_replica_func(
        resp.ts_infos(static_cast<int>(ts_info_idx)), r.role(), &replicas));
  }

  unique_ptr<KuduTablet> client_tablet(new KuduTablet);
  client_tablet->data_ = new KuduTablet::Data(tablet_id, std::move(replicas));
  // Ownership of the replicas has moved into the tablet; make sure the local
  // vector is empty so nothing is left for 'deleter' to act on.
  replicas.clear();

  *tablet = client_tablet.release();
  return Status::OK();
}

// Fetches statistics for the table named 'table_name' from the leader master
// and, on success, stores a newly allocated KuduTableStatistics (released to
// the caller as a raw pointer) in '*statistics'. Each statistic missing from
// the response is represented as nullopt. Returns the RPC error or the error
// carried in the response; '*statistics' is not written on error.
Status KuduClient::GetTableStatistics(const string& table_name,
                                      KuduTableStatistics** statistics) {
  GetTableStatisticsRequestPB req;
  req.mutable_table()->set_table_name(table_name);
  GetTableStatisticsResponsePB resp;

  const MonoTime rpc_deadline = MonoTime::Now() + default_admin_operation_timeout();
  Synchronizer sync;
  AsyncLeaderMasterRpc<GetTableStatisticsRequestPB, GetTableStatisticsResponsePB> rpc(
      rpc_deadline, this, BackoffType::EXPONENTIAL, req, &resp,
      &MasterServiceProxy::GetTableStatisticsAsync, "GetTableStatistics",
      sync.AsStatusCallback(), {});
  rpc.SendRpc();
  // Block until the asynchronous RPC reports its final status.
  RETURN_NOT_OK(sync.Wait());
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }

  // Translate "field present in the response" into an engaged optional.
  const optional<int64_t> on_disk_size =
      resp.has_on_disk_size() ? optional<int64_t>(resp.on_disk_size()) : nullopt;
  const optional<int64_t> live_row_count =
      resp.has_live_row_count() ? optional<int64_t>(resp.live_row_count()) : nullopt;
  const optional<int64_t> disk_size_limit =
      resp.has_disk_size_limit() ? optional<int64_t>(resp.disk_size_limit()) : nullopt;
  const optional<int64_t> row_count_limit =
      resp.has_row_count_limit() ? optional<int64_t>(resp.row_count_limit()) : nullopt;

  unique_ptr<KuduTableStatistics> result(new KuduTableStatistics);
  result->data_ = new KuduTableStatistics::Data(
      on_disk_size, live_row_count, disk_size_limit, row_count_limit);

  *statistics = result.release();
  return Status::OK();
}

// Returns the master host/ports held by Data, formatted by
// HostPort::ToCommaSeparatedString().
string KuduClient::GetMasterAddresses() const {
  return HostPort::ToCommaSeparatedString(data_->master_hostports());
}

// Returns true when the client was configured with more than one master
// address.
bool KuduClient::IsMultiMaster() const {
  return data_->master_server_addrs_.size() > 1;
}

// Returns a reference to the default admin operation timeout stored in Data.
const MonoDelta& KuduClient::default_admin_operation_timeout() const {
  return data_->default_admin_operation_timeout_;
}

// Returns a reference to the default RPC timeout stored in Data.
const MonoDelta& KuduClient::default_rpc_timeout() const {
  return data_->default_rpc_timeout_;
}

// Returns the messenger's RPC negotiation timeout converted from milliseconds
// to a MonoDelta. The messenger must already be set (checked with DCHECK).
MonoDelta KuduClient::connection_negotiation_timeout() const {
  DCHECK(data_->messenger_);
  return MonoDelta::FromMilliseconds(
      data_->messenger_->rpc_negotiation_timeout_ms());
}

// Out-of-class definition of the static constant, with value 0.
// NOTE(review): presumably the sentinel for "no timestamp observed" --
// confirm against the declaration in client.h.
const uint64_t KuduClient::kNoTimestamp = 0;

// Returns the value reported by Data::GetLatestObservedTimestamp().
uint64_t KuduClient::GetLatestObservedTimestamp() const {
  return data_->GetLatestObservedTimestamp();
}

// Passes 'ht_timestamp' to Data::UpdateLatestObservedTimestamp().
void KuduClient::SetLatestObservedTimestamp(uint64_t ht_timestamp) {
  data_->UpdateLatestObservedTimestamp(ht_timestamp);
}

// Serializes the client's authentication state into '*authn_creds' as an
// AuthenticationCredentialsPB containing the messenger's authn token (if it
// has one), the real user, and the DER dump of the messenger's trusted
// certificates. Returns the prefixed error if the certificates cannot be
// dumped, or RuntimeError if serialization fails.
Status KuduClient::ExportAuthenticationCredentials(string* authn_creds) const {
  AuthenticationCredentialsPB creds_pb;

  const auto token = data_->messenger_->authn_token();
  if (token) {
    creds_pb.mutable_authn_token()->CopyFrom(*token);
  }
  creds_pb.set_real_user(data_->user_credentials_.real_user());

  vector<string> trusted_cert_ders;
  RETURN_NOT_OK_PREPEND(data_->messenger_->tls_context().DumpTrustedCerts(&trusted_cert_ders),
                        "could not export trusted certs");
  // The dumped strings are not needed afterwards, so move them into the PB.
  for (string& cert_der : trusted_cert_ders) {
    *creds_pb.add_ca_cert_ders() = std::move(cert_der);
  }

  if (creds_pb.SerializeToString(authn_creds)) {
    return Status::OK();
  }
  return Status::RuntimeError("could not serialize authentication data");
}

// Returns a copy of the Hive Metastore URIs stored in Data, read while
// holding leader_master_lock_.
string KuduClient::GetHiveMetastoreUris() const {
  std::lock_guard<simple_spinlock> l(data_->leader_master_lock_);
  return data_->hive_metastore_uris_;
}

// Returns the Hive Metastore SASL-enabled flag stored in Data, read while
// holding leader_master_lock_.
bool KuduClient::GetHiveMetastoreSaslEnabled() const {
  std::lock_guard<simple_spinlock> l(data_->leader_master_lock_);
  return data_->hive_metastore_sasl_enabled_;
}

// Returns a copy of the Hive Metastore UUID stored in Data, read while
// holding leader_master_lock_.
string KuduClient::GetHiveMetastoreUuid() const {
  std::lock_guard<simple_spinlock> l(data_->leader_master_lock_);
  return data_->hive_metastore_uuid_;
}

// Returns the value reported by Data::location().
string KuduClient::location() const {
  return data_->location();
}

// Returns the value reported by Data::cluster_id().
string KuduClient::cluster_id() const {
  return data_->cluster_id();
}

////////////////////////////////////////////////////////////
// KuduTableCreator
////////////////////////////////////////////////////////////

// Allocates the creator's private data, bound to 'client'. The data is
// released in the destructor.
KuduTableCreator::KuduTableCreator(KuduClient* client)
  : data_(new KuduTableCreator::Data(client)) {
}

// Frees the private data allocated by the constructor.
KuduTableCreator::~KuduTableCreator() {
  delete data_;
}

// Records the name of the table to create. Returns *this for chaining.
KuduTableCreator& KuduTableCreator::table_name(const string& name) {
  data_->table_name_ = name;
  return *this;
}

// Stores the given schema pointer (the schema itself is not copied here).
// Returns *this for chaining.
KuduTableCreator& KuduTableCreator::schema(const KuduSchema* schema) {
  data_->schema_ = schema;
  return *this;
}

// Convenience overload: adds a hash dimension over 'columns' with
// 'num_buckets' buckets and a seed of 0.
KuduTableCreator& KuduTableCreator::add_hash_partitions(const vector<string>& columns,
                                                        int32_t num_buckets) {
  return add_hash_partitions(columns, num_buckets, 0);
}

// Appends one hash dimension to the table-wide partition schema: the given
// columns, the bucket count and the hash seed. Returns *this for chaining.
KuduTableCreator& KuduTableCreator::add_hash_partitions(const vector<string>& columns,
                                                        int32_t num_buckets,
                                                        int32_t seed) {
  auto* dimension_pb = data_->partition_schema_.add_hash_schema();
  dimension_pb->set_num_buckets(num_buckets);
  dimension_pb->set_seed(seed);
  for (const auto& column : columns) {
    auto* column_pb = dimension_pb->add_columns();
    column_pb->set_name(column);
  }
  return *this;
}

// Replaces the range partitioning columns: the range schema is cleared and
// then repopulated with 'columns' in order. Returns *this for chaining.
KuduTableCreator& KuduTableCreator::set_range_partition_columns(const vector<string>& columns) {
  auto* range_schema_pb = data_->partition_schema_.mutable_range_schema();
  range_schema_pb->Clear();
  for (const auto& column : columns) {
    auto* column_pb = range_schema_pb->add_columns();
    column_pb->set_name(column);
  }
  return *this;
}

// Appends 'split_row' to the list of range partition split rows.
// NOTE(review): the container presumably takes ownership of the row --
// confirm against KuduTableCreator::Data. Returns *this for chaining.
KuduTableCreator& KuduTableCreator::add_range_partition_split(KuduPartialRow* split_row) {
  data_->range_partition_splits_.emplace_back(split_row);
  return *this;
}

// Records the owner to set on the new table. Returns *this for chaining.
KuduTableCreator& KuduTableCreator::set_owner(const string& owner) {
  data_->owner_ = owner;
  return *this;
}

// Records the comment to set on the new table. Returns *this for chaining.
KuduTableCreator& KuduTableCreator::set_comment(const string& comment) {
  data_->comment_ = comment;
  return *this;
}

// Appends every row in 'rows' to the list of range partition split rows.
// The const qualifier is cast away because the split list stores mutable
// row pointers. Returns *this for chaining.
KuduTableCreator& KuduTableCreator::split_rows(const vector<const KuduPartialRow*>& rows) {
  auto& splits = data_->range_partition_splits_;
  for (size_t idx = 0; idx < rows.size(); ++idx) {
    splits.emplace_back(const_cast<KuduPartialRow*>(rows[idx]));
  }
  return *this;
}

// Registers a range partition described by its two bounds and bound types.
// Partitions added through this method are flagged as using the table-wide
// hash schema. Returns *this for chaining.
KuduTableCreator& KuduTableCreator::add_range_partition(
    KuduPartialRow* lower_bound,
    KuduPartialRow* upper_bound,
    RangePartitionBound lower_bound_type,
    RangePartitionBound upper_bound_type) {
  unique_ptr<KuduRangePartition> table_wide_range(
      new KuduRangePartition(lower_bound,
                             upper_bound,
                             lower_bound_type,
                             upper_bound_type));
  table_wide_range->data_->is_table_wide_hash_schema_ = true;
  data_->range_partitions_.emplace_back(std::move(table_wide_range));
  return *this;
}

// Appends a caller-built range partition to the list of range partitions.
// 'partition' must not be null (CHECK'd).
// NOTE(review): the list presumably takes ownership of 'partition' --
// confirm against KuduTableCreator::Data. Returns *this for chaining.
KuduTableCreator& KuduTableCreator::add_custom_range_partition(
    KuduRangePartition* partition) {
  CHECK(partition);
  data_->range_partitions_.emplace_back(partition);
  return *this;
}

// Records the replication factor for the new table. Returns *this for
// chaining.
KuduTableCreator& KuduTableCreator::num_replicas(int num_replicas) {
  data_->num_replicas_ = num_replicas;
  return *this;
}

// Records the dimension label for the new table. Returns *this for chaining.
KuduTableCreator& KuduTableCreator::dimension_label(const std::string& dimension_label) {
  data_->dimension_label_ = dimension_label;
  return *this;
}

// Records a copy of the extra configuration properties for the new table.
// Returns *this for chaining.
KuduTableCreator& KuduTableCreator::extra_configs(const map<string, string>& extra_configs) {
  data_->extra_configs_ = extra_configs;
  return *this;
}

// Records the timeout that Create() uses to compute its deadline. Returns
// *this for chaining.
KuduTableCreator& KuduTableCreator::timeout(const MonoDelta& timeout) {
  data_->timeout_ = timeout;
  return *this;
}

// Records whether Create() should wait for the table creation to finish.
// Returns *this for chaining.
KuduTableCreator& KuduTableCreator::wait(bool wait) {
  data_->wait_ = wait;
  return *this;
}

// Validates the accumulated settings, builds a CreateTableRequestPB from
// them, and sends it to the master through the client's private data.
//
// Returns InvalidArgument if the table name or the schema is missing, if
// neither hash nor range partitioning was specified, if a split row or a
// range bound is null, or if split rows are combined with per-range custom
// hash schemas. Errors from the schema conversion and from the CreateTable
// call are returned with a descriptive prefix. When wait_ is set, the method
// additionally waits for the creation to finish, using the same deadline.
Status KuduTableCreator::Create() {
  if (!data_->table_name_.length()) {
    return Status::InvalidArgument("Missing table name");
  }
  if (!data_->schema_) {
    return Status::InvalidArgument("Missing schema");
  }
  if (!data_->partition_schema_.has_range_schema() &&
      data_->partition_schema_.hash_schema().empty()) {
    return Status::InvalidArgument(
        "Table partitioning must be specified using "
        "add_hash_partitions or set_range_partition_columns");
  }

  // Build request. Optional settings are copied only if they were set.
  CreateTableRequestPB req;
  req.set_name(data_->table_name_);
  if (data_->num_replicas_) {
    req.set_num_replicas(*data_->num_replicas_);
  }
  if (data_->dimension_label_) {
    req.set_dimension_label(*data_->dimension_label_);
  }
  if (data_->extra_configs_) {
    req.mutable_extra_configs()->insert(data_->extra_configs_->begin(),
                                        data_->extra_configs_->end());
  }
  if (data_->owner_) {
    req.set_owner(*data_->owner_);
  }
  if (data_->comment_) {
    req.set_comment(*data_->comment_);
  }
  RETURN_NOT_OK_PREPEND(SchemaToPB(*data_->schema_->schema_, req.mutable_schema(),
                                   SCHEMA_PB_WITHOUT_WRITE_DEFAULT),
                        "Invalid schema");

  // Encode the split rows into the request's split_rows_range_bounds field.
  // The same encoder is reused further down for the range bounds of
  // partitions that use the table-wide hash schema.
  bool has_range_splits = false;
  RowOperationsPBEncoder splits_encoder(req.mutable_split_rows_range_bounds());
  for (const auto& row : data_->range_partition_splits_) {
    if (!row) {
      return Status::InvalidArgument("range split row must not be null");
    }
    splits_encoder.Add(RowOperationsPB::SPLIT_ROW, *row);
    has_range_splits = true;
  }

  // Find out whether at least one range partition carries its own hash
  // schema; this decides how all the range bounds are encoded below.
  bool has_range_with_custom_hash_schema = false;
  for (const auto& p : data_->range_partitions_) {
    if (!p->data_->is_table_wide_hash_schema_) {
      has_range_with_custom_hash_schema = true;
      break;
    }
  }

  if (has_range_splits && has_range_with_custom_hash_schema) {
    // For simplicity, don't allow having both range splits (deprecated) and
    // custom hash bucket schemas per range partition.
    return Status::InvalidArgument(
        "split rows and custom hash bucket schemas for ranges are incompatible: "
        "choose one or the other");
  }

  auto* partition_schema = req.mutable_partition_schema();
  partition_schema->CopyFrom(data_->partition_schema_);

  for (const auto& p : data_->range_partitions_) {
    const auto* range = p->data_;
    if (!range->lower_bound_ || !range->upper_bound_) {
      return Status::InvalidArgument("range bounds must not be null");
    }

    // Translate the client-facing bound types into row operation types.
    const RowOperationsPB_Type lower_bound_type =
        range->lower_bound_type_ == KuduTableCreator::INCLUSIVE_BOUND
        ? RowOperationsPB::RANGE_LOWER_BOUND
        : RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND;

    const RowOperationsPB_Type upper_bound_type =
        range->upper_bound_type_ == KuduTableCreator::EXCLUSIVE_BOUND
        ? RowOperationsPB::RANGE_UPPER_BOUND
        : RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND;

    if (!has_range_with_custom_hash_schema) {
      // No custom hash schemas at all: the bounds go into the request-level
      // split_rows_range_bounds field.
      splits_encoder.Add(lower_bound_type, *range->lower_bound_);
      splits_encoder.Add(upper_bound_type, *range->upper_bound_);
    } else {
      auto* range_pb = partition_schema->add_custom_hash_schema_ranges();
      RowOperationsPBEncoder encoder(range_pb->mutable_range_bounds());
      encoder.Add(lower_bound_type, *range->lower_bound_);
      encoder.Add(upper_bound_type, *range->upper_bound_);
      // Now, after adding the information range bounds, add the information
      // on hash schema for the range.
      if (range->is_table_wide_hash_schema_) {
        // With the presence of a range with custom hash schema when the
        // table-wide hash schema is used for this particular range, also add an
        // element into PartitionSchemaPB::custom_hash_schema_ranges to satisfy
        // the convention used by the backend.
        range_pb->mutable_hash_schema()->CopyFrom(
            data_->partition_schema_.hash_schema());
      } else {
        // In case of per-range custom hash bucket schema, add corresponding
        // element into PartitionSchemaPB::custom_hash_schema_ranges.
        for (const auto& hash_dimension : range->hash_schema_) {
          auto* hash_dimension_pb = range_pb->add_hash_schema();
          hash_dimension_pb->set_seed(hash_dimension.seed);
          hash_dimension_pb->set_num_buckets(hash_dimension.num_buckets);
          for (const auto& column_name : hash_dimension.column_names) {
            hash_dimension_pb->add_columns()->set_name(column_name);
          }
        }
      }
    }
  }

  // Detect whether any column of the schema is immutable; the flag is passed
  // to CreateTable() below.
  bool has_immutable_column_schema = false;
  for (size_t i = 0; i < data_->schema_->num_columns(); i++) {
    const auto& col_schema = data_->schema_->Column(i);
    if (col_schema.is_immutable()) {
      has_immutable_column_schema = true;
      break;
    }
  }

  if (data_->table_type_) {
    req.set_table_type(*data_->table_type_);
  }

  // An explicitly set timeout wins; otherwise fall back to the client's
  // default admin operation timeout.
  MonoTime deadline = MonoTime::Now();
  if (data_->timeout_.Initialized()) {
    deadline += data_->timeout_;
  } else {
    deadline += data_->client_->default_admin_operation_timeout();
  }

  CreateTableResponsePB resp;
  RETURN_NOT_OK_PREPEND(
      data_->client_->data_->CreateTable(data_->client_,
                                         req,
                                         &resp,
                                         deadline,
                                         !data_->range_partitions_.empty(),
                                         has_range_with_custom_hash_schema,
                                         has_immutable_column_schema),
      Substitute("Error creating table $0 on the master", data_->table_name_));
  // Spin until the table is fully created, if requested.
  if (data_->wait_) {
    TableIdentifierPB table;
    table.set_table_id(resp.table_id());
    RETURN_NOT_OK(data_->client_->data_->WaitForCreateTableToFinish(
        data_->client_, table, deadline));
  }

  return Status::OK();
}

// Allocates the private data describing a range partition with the given
// bounds and bound types. The data is released in the destructor.
KuduRangePartition::KuduRangePartition(
    KuduPartialRow* lower_bound,
    KuduPartialRow* upper_bound,
    KuduTableCreator::RangePartitionBound lower_bound_type,
    KuduTableCreator::RangePartitionBound upper_bound_type)
    : data_(new Data(lower_bound, upper_bound, lower_bound_type, upper_bound_type)) {
}

// Frees the private data allocated by the constructor.
KuduRangePartition::~KuduRangePartition() {
  delete data_;
}

// Adds a hash dimension to this range partition's custom hash schema.
// Returns InvalidArgument for a negative 'seed'; otherwise returns whatever
// the private data's add_hash_partitions() reports.
Status KuduRangePartition::add_hash_partitions(
    const vector<string>& columns,
    int32_t num_buckets,
    int32_t seed) {
  // The seed is a signed int32_t only to stay "compatible" with the 'seed'
  // parameter of KuduTableCreator::add_hash_partitions(); negative values
  // are not accepted.
  if (seed >= 0) {
    return data_->add_hash_partitions(columns, num_buckets, seed);
  }
  return Status::InvalidArgument("hash seed must be non-negative");
}

////////////////////////////////////////////////////////////
// KuduTableStatistics
////////////////////////////////////////////////////////////

// Creates a statistics object without private data; data_ starts as nullptr.
// NOTE(review): the accessors dereference data_ unconditionally, so it is
// presumably populated elsewhere before they are called -- confirm.
KuduTableStatistics::KuduTableStatistics() : data_(nullptr) {
}

// Frees the private data (deleting a null pointer is a no-op).
KuduTableStatistics::~KuduTableStatistics() {
  delete data_;
}

// Returns the stored on-disk size, or -1 when no value is set.
int64_t KuduTableStatistics::on_disk_size() const {
  const auto& maybe_size = data_->on_disk_size_;
  if (!maybe_size) {
    return -1;
  }
  return *maybe_size;
}

// Returns the stored live row count, or -1 when no value is set.
int64_t KuduTableStatistics::live_row_count() const {
  const auto& maybe_count = data_->live_row_count_;
  if (!maybe_count) {
    return -1;
  }
  return *maybe_count;
}

// Returns the stored on-disk size limit, or -1 when no value is set.
int64_t KuduTableStatistics::on_disk_size_limit() const {
  const auto& maybe_limit = data_->on_disk_size_limit_;
  if (!maybe_limit) {
    return -1;
  }
  return *maybe_limit;
}

// Returns the stored live row count limit, or -1 when no value is set.
int64_t KuduTableStatistics::live_row_count_limit() const {
  const auto& maybe_limit = data_->live_row_count_limit_;
  if (!maybe_limit) {
    return -1;
  }
  return *maybe_limit;
}

// Returns the string representation produced by the private data.
std::string KuduTableStatistics::ToString() const {
  return data_->ToString();
}

////////////////////////////////////////////////////////////
// KuduTable
////////////////////////////////////////////////////////////

// Allocates the table's private data from the given client, identity
// (name, id), replication factor, owner, comment, schema, partition schema
// and extra configuration. The data is released in the destructor.
KuduTable::KuduTable(const shared_ptr<KuduClient>& client,
                     const string& name,
                     const string& id,
                     int num_replicas,
                     const string& owner,
                     const string& comment,
                     const KuduSchema& schema,
                     const PartitionSchema& partition_schema,
                     const map<string, string>& extra_configs)
  : data_(new KuduTable::Data(client, name, id, num_replicas, owner, comment,
                              schema, partition_schema, extra_configs)) {
}

// Frees the private data allocated by the constructor.
KuduTable::~KuduTable() {
  delete data_;
}

// Returns a reference to the table name held in the private data.
const string& KuduTable::name() const {
  return data_->name_;
}

// Returns a reference to the table ID held in the private data.
const string& KuduTable::id() const {
  return data_->id_;
}

// Returns a reference to the table schema held in the private data.
const KuduSchema& KuduTable::schema() const {
  return data_->schema_;
}

// Returns a reference to the table comment held in the private data.
const string& KuduTable::comment() const {
  return data_->comment_;
}

// Returns the replication factor held in the private data.
int KuduTable::num_replicas() const {
  return data_->num_replicas_;
}

// Returns a reference to the table owner held in the private data.
const string& KuduTable::owner() const {
  return data_->owner_;
}

// Allocates a new KuduInsert bound to this table (via shared_from_this())
// and returns it as a raw pointer.
// NOTE(review): presumably the caller owns the result -- confirm in header.
KuduInsert* KuduTable::NewInsert() {
  return new KuduInsert(shared_from_this());
}

// Allocates a new KuduInsertIgnore bound to this table (via
// shared_from_this()) and returns it as a raw pointer.
// NOTE(review): presumably the caller owns the result -- confirm in header.
KuduInsertIgnore* KuduTable::NewInsertIgnore() {
  return new KuduInsertIgnore(shared_from_this());
}

// Allocates a new KuduUpsert bound to this table (via shared_from_this())
// and returns it as a raw pointer.
// NOTE(review): presumably the caller owns the result -- confirm in header.
KuduUpsert* KuduTable::NewUpsert() {
  return new KuduUpsert(shared_from_this());
}

// Allocates a new KuduUpsertIgnore bound to this table (via
// shared_from_this()) and returns it as a raw pointer.
// NOTE(review): presumably the caller owns the result -- confirm in header.
KuduUpsertIgnore* KuduTable::NewUpsertIgnore() {
  return new KuduUpsertIgnore(shared_from_this());
}

// Allocates a new KuduUpdate bound to this table (via shared_from_this())
// and returns it as a raw pointer.
// NOTE(review): presumably the caller owns the result -- confirm in header.
KuduUpdate* KuduTable::NewUpdate() {
  return new KuduUpdate(shared_from_this());
}

// Allocates a new KuduUpdateIgnore bound to this table (via
// shared_from_this()) and returns it as a raw pointer.
// NOTE(review): presumably the caller owns the result -- confirm in header.
KuduUpdateIgnore* KuduTable::NewUpdateIgnore() {
  return new KuduUpdateIgnore(shared_from_this());
}

// Allocates a new KuduDelete bound to this table (via shared_from_this())
// and returns it as a raw pointer.
// NOTE(review): presumably the caller owns the result -- confirm in header.
KuduDelete* KuduTable::NewDelete() {
  return new KuduDelete(shared_from_this());
}

// Allocates a new KuduDeleteIgnore bound to this table (via
// shared_from_this()) and returns it as a raw pointer.
// NOTE(review): presumably the caller owns the result -- confirm in header.
KuduDeleteIgnore* KuduTable::NewDeleteIgnore() {
  return new KuduDeleteIgnore(shared_from_this());
}

// Returns a non-owning pointer to the client this table holds a shared
// reference to.
KuduClient* KuduTable::client() const {
  return data_->client_.get();
}

// Returns a reference to the partition schema held in the private data.
const PartitionSchema& KuduTable::partition_schema() const {
  return data_->partition_schema_;
}

// Returns a reference to the extra configuration map held in the private
// data.
const map<string, string>& KuduTable::extra_configs() const {
  return data_->extra_configs_;
}

// Creates a comparison predicate (column 'col_name' <op> 'value').
//
// 'value' is always consumed by this call: it is handed to the predicate
// when the factory below runs, and deleted here otherwise.
KuduPredicate* KuduTable::NewComparisonPredicate(const Slice& col_name,
                                                 KuduPredicate::ComparisonOp op,
                                                 KuduValue* value) {
  // Deletes 'value' on exit unless the factory cancels the guard.
  auto value_guard = MakeScopedCleanup([&]() {
    delete value;
  });
  auto make_comparison = [&](const ColumnSchema& col_schema) {
    // From here on the predicate data is responsible for 'value'.
    value_guard.cancel();
    auto* predicate_data = new ComparisonPredicateData(col_schema, op, value);
    return new KuduPredicate(predicate_data);
  };
  return data_->MakePredicate(col_name, make_comparison);
}

// Creates an IN-Bloom-filter predicate on column 'col_name' from a vector of
// heap-allocated Bloom filters.
//
// The filters are always consumed: on exit every remaining non-null element
// of 'bloom_filters' is deleted and the vector is cleared. An empty vector
// yields an error predicate carrying InvalidArgument.
KuduPredicate* KuduTable::NewInBloomFilterPredicate(const Slice& col_name,
                                                    vector<KuduBloomFilter*>* bloom_filters) {
  // Runs on every exit path; see the note in the factory below on why it is
  // never cancelled.
  auto input_cleanup = MakeScopedCleanup([&]() {
    STLDeleteElements(bloom_filters);
  });

  // A predicate without any Bloom filter would select all rows, so it is
  // rejected.
  if (bloom_filters->empty()) {
    return new KuduPredicate(
        new ErrorPredicateData(Status::InvalidArgument("No Bloom filters supplied")));
  }

  // Move the raw pointers into unique_ptrs one by one, nulling each source
  // slot as it is transferred. Should emplace_back() throw, the filters
  // already transferred are released by the unique_ptrs and the ones still
  // in the input vector by the scoped cleanup above.
  vector<unique_ptr<KuduBloomFilter>> owned_filters;
  owned_filters.reserve(bloom_filters->size());
  for (size_t idx = 0; idx < bloom_filters->size(); ++idx) {
    KuduBloomFilter*& raw_filter = (*bloom_filters)[idx];
    owned_filters.emplace_back(raw_filter);
    raw_filter = nullptr;
  }

  auto make_in_bloom_filter = [&](const ColumnSchema& col_schema) {
    // The scoped cleanup is deliberately left armed: besides deleting the
    // (now null) elements it also clears the input vector, which is what
    // the caller expects.
    auto* predicate_data =
        new InBloomFilterPredicateData(col_schema, std::move(owned_filters));
    return new KuduPredicate(predicate_data);
  };
  return data_->MakePredicate(col_name, make_in_bloom_filter);
}

// Creates an IN-Bloom-filter predicate on column 'col_name' from Bloom
// filters supplied as opaque slices.
//
// The data pointer of each slice is reinterpreted as a BlockBloomFilter*.
// The predicate does NOT take ownership of those filters. An empty
// 'bloom_filters' yields an error predicate carrying InvalidArgument.
KuduPredicate* KuduTable::NewInBloomFilterPredicate(const Slice& col_name,
                                                    const vector<Slice>& bloom_filters) {
  // Empty vector of bloom filters will select all rows. Hence disallowed.
  if (bloom_filters.empty()) {
    return new KuduPredicate(
        new ErrorPredicateData(Status::InvalidArgument("No Bloom filters supplied")));
  }

  // Extract the Block Bloom filters.
  vector<DirectBlockBloomFilterUniqPtr> bbf_vec;
  // The final size is known up front, so allocate once instead of growing
  // the vector while appending.
  bbf_vec.reserve(bloom_filters.size());
  for (const auto& bf_slice : bloom_filters) {
    auto* bbf =
        reinterpret_cast<BlockBloomFilter*>(const_cast<uint8_t*>(bf_slice.data()));
    // In this case, the Block Bloom filters are supplied as opaque pointers
    // and the predicate will convert them to well-defined pointer types
    // but will NOT take ownership of those pointers. Hence a custom deleter,
    // DirectBloomFilterDataDeleter, is used that gives control over ownership.
    DirectBlockBloomFilterUniqPtr bf_uniq_ptr(
        bbf, DirectBloomFilterDataDeleter<BlockBloomFilter>(false /*owned*/));
    bbf_vec.emplace_back(std::move(bf_uniq_ptr));
  }

  return data_->MakePredicate(col_name, [&](const ColumnSchema& col_schema) {
    return new KuduPredicate(
        new InDirectBloomFilterPredicateData(col_schema, std::move(bbf_vec)));
  });
}

// Creates an IN-list predicate on column 'col_name'.
//
// 'values' is always consumed by this call: it is handed to the predicate
// when the factory below runs; otherwise its elements are deleted here.
KuduPredicate* KuduTable::NewInListPredicate(const Slice& col_name,
                                             vector<KuduValue*>* values) {
  // Deletes the elements of 'values' on exit unless the factory cancels it.
  auto values_guard = MakeScopedCleanup([&]() {
    STLDeleteElements(values);
  });
  auto make_in_list = [&](const ColumnSchema& col_schema) {
    // From here on the predicate data is responsible for 'values'.
    values_guard.cancel();
    auto* predicate_data = new InListPredicateData(col_schema, values);
    return new KuduPredicate(predicate_data);
  };
  return data_->MakePredicate(col_name, make_in_list);
}

// Creates an IS NOT NULL predicate on column 'col_name'.
KuduPredicate* KuduTable::NewIsNotNullPredicate(const Slice& col_name) {
  auto make_is_not_null = [&](const ColumnSchema& col_schema) {
    auto* predicate_data = new IsNotNullPredicateData(col_schema);
    return new KuduPredicate(predicate_data);
  };
  return data_->MakePredicate(col_name, make_is_not_null);
}

// Creates an IS NULL predicate on column 'col_name'.
KuduPredicate* KuduTable::NewIsNullPredicate(const Slice& col_name) {
  auto make_is_null = [&](const ColumnSchema& col_schema) {
    auto* predicate_data = new IsNullPredicateData(col_schema);
    return new KuduPredicate(predicate_data);
  };
  return data_->MakePredicate(col_name, make_is_null);
}

// Fills 'partitions' (cleared first) with the partition of every tablet of
// this table, found by walking the partition key space through the client's
// meta cache. The approach is adapted from
// KuduScanTokenBuilder::Data::Build.
//
// A NotFound lookup result ends the walk; any other lookup error is
// returned to the caller.
Status KuduTable::ListPartitions(vector<Partition>* partitions) {
  DCHECK(partitions);
  partitions->clear();

  const auto& kudu_client = data_->client_;
  // A single deadline covers all the lookups performed below.
  const auto lookup_deadline =
      MonoTime::Now() + kudu_client->default_admin_operation_timeout();

  // An empty scan spec is used, so the pruner starts from the full key space.
  PartitionPruner key_ranges;
  key_ranges.Init(*data_->schema_.schema_, data_->partition_schema_, ScanSpec());

  while (key_ranges.HasMorePartitionKeyRanges()) {
    scoped_refptr<client::internal::RemoteTablet> found_tablet;
    Synchronizer lookup_done;
    const auto& next_key = key_ranges.NextPartitionKey();
    kudu_client->data_->meta_cache_->LookupTabletByKey(
        this,
        next_key,
        lookup_deadline,
        client::internal::MetaCache::LookupType::kLowerBound,
        &found_tablet,
        lookup_done.AsStatusCallback());
    // The lookup is asynchronous; block until its callback fires.
    Status lookup_status = lookup_done.Wait();
    if (lookup_status.IsNotFound()) {
      // No more tablets.
      break;
    }
    RETURN_NOT_OK(lookup_status);

    // Record the partition and skip past the key range it covers.
    const auto& tablet_partition = found_tablet->partition();
    partitions->emplace_back(tablet_partition);
    key_ranges.RemovePartitionKeyRange(tablet_partition.end());
  }
  return Status::OK();
}

////////////////////////////////////////////////////////////
// Error
////////////////////////////////////////////////////////////

// Returns a reference to the status stored with this error.
const Status& KuduError::status() const {
  return data_->status_;
}

// Returns a reference to the failed write operation stored with this error.
// The stored pointer is dereferenced without a null check, so this must not
// be called after release_failed_op().
const KuduWriteOperation& KuduError::failed_op() const {
  return *data_->failed_op_;
}

// Releases the stored failed operation and returns it as a raw pointer; the
// error no longer holds it afterwards. CHECK-fails if no operation is held
// (e.g. on a second call).
KuduWriteOperation* KuduError::release_failed_op() {
  CHECK_NOTNULL(data_->failed_op_.get());
  return data_->failed_op_.release();
}

// Currently always returns true: a conservative placeholder until the real
// logic is implemented (see the TODO below).
bool KuduError::was_possibly_successful() const {
  // TODO: implement me - right now be conservative.
  return true;
}

// Allocates the private data for an error. 'failed_op' is wrapped in a
// unique_ptr, so the error takes ownership of it; 'status' is passed along
// to the private data.
KuduError::KuduError(KuduWriteOperation* failed_op,
                     const Status& status)
  : data_(new KuduError::Data(unique_ptr<KuduWriteOperation>(failed_op),
                              status)) {
}

// Frees the private data allocated by the constructor.
KuduError::~KuduError() {
  delete data_;
}

////////////////////////////////////////////////////////////
// KuduSession
////////////////////////////////////////////////////////////

// Allocates the session's private data from 'client' and the client's
// messenger.
KuduSession::KuduSession(const shared_ptr<KuduClient>& client)
    : data_(new KuduSession::Data(client, client->data_->messenger_)) {
}

// Allocates the session's private data from 'client', the client's
// messenger and the given transaction ID.
KuduSession::KuduSession(const shared_ptr<KuduClient>& client, const TxnId& txn_id)
    : data_(new KuduSession::Data(client, client->data_->messenger_, txn_id)) {
}

// Calls Close(true) on the private data, logging a warning if that returns
// an error, and then frees the private data.
KuduSession::~KuduSession() {
  WARN_NOT_OK(data_->Close(true), "Closed Session with pending operations.");
  delete data_;
}

// Sets the session's flush mode. Returns InvalidArgument if 'm' is not a
// valid FlushMode enumerator; otherwise returns the result of applying the
// mode to the private data.
Status KuduSession::SetFlushMode(FlushMode m) {
  // Be paranoid in client code: validate the enum value before using it.
  const bool is_known_mode = tight_enum_test<FlushMode>(m);
  if (is_known_mode) {
    return data_->SetFlushMode(m);
  }
  return Status::InvalidArgument("Bad flush mode");
}

// Sets the session's external consistency mode. Returns InvalidArgument if
// 'm' is not a valid ExternalConsistencyMode enumerator; otherwise returns
// the result of applying the mode to the private data.
Status KuduSession::SetExternalConsistencyMode(ExternalConsistencyMode m) {
  // Be paranoid in client code: validate the enum value before using it.
  const bool is_known_mode = tight_enum_test<ExternalConsistencyMode>(m);
  if (is_known_mode) {
    return data_->SetExternalConsistencyMode(m);
  }
  return Status::InvalidArgument("Bad external consistency mode");
}

// Forwards 'size' to the private data as the buffer byte limit and returns
// the resulting status.
Status KuduSession::SetMutationBufferSpace(size_t size) {
  return data_->SetBufferBytesLimit(size);
}

// Sets the buffer flush watermark. 'watermark_pct' is scaled by 100 and
// truncated to an integer before being handed to the private data, whose
// status is returned.
//
// Returns InvalidArgument if the scaled value is NaN or does not fit into an
// int32_t.
Status KuduSession::SetMutationBufferFlushWatermark(double watermark_pct) {
  const double scaled_pct = 100.0 * watermark_pct;
  // Converting a double to an integer type is undefined behavior when the
  // truncated value cannot be represented, so such input is rejected before
  // the cast. The comparison is written in negated form so that NaN (for
  // which every comparison is false) is rejected as well.
  const bool fits_int32 =
      scaled_pct > static_cast<double>(INT32_MIN) - 1.0 &&
      scaled_pct < static_cast<double>(INT32_MAX) + 1.0;
  if (!fits_int32) {
    return Status::InvalidArgument("Bad flush watermark");
  }
  return data_->SetBufferFlushWatermark(static_cast<int32_t>(scaled_pct));
}

// Forwards 'millis' to the private data as the buffer flush interval and
// returns the resulting status.
Status KuduSession::SetMutationBufferFlushInterval(unsigned int millis) {
  return data_->SetBufferFlushInterval(millis);
}

// Forwards 'max_num' to the private data as the maximum number of batchers
// and returns the resulting status.
Status KuduSession::SetMutationBufferMaxNum(unsigned int max_num) {
  return data_->SetMaxBatchersNum(max_num);
}

// Forwards the timeout, in milliseconds, to the private data.
void KuduSession::SetTimeoutMillis(int timeout_ms) {
  data_->SetTimeoutMillis(timeout_ms);
}

// Applies 'write_op' to the session. In AUTO_FLUSH_SYNC mode the session is
// also flushed before returning, and the flush status is the result.
// Returns early with the error if applying the operation fails.
Status KuduSession::Apply(KuduWriteOperation* write_op) {
  RETURN_NOT_OK(data_->ApplyWriteOp(write_op));
  // Thread-safety note: this method must not run concurrently with other
  // methods that modify KuduSession::Data members, which is why flush_mode_
  // is read here without any protection.
  const bool flush_now = (data_->flush_mode_ == AUTO_FLUSH_SYNC);
  if (!flush_now) {
    return Status::OK();
  }
  return data_->Flush();
}

// Flushes through the private data and returns the resulting status.
Status KuduSession::Flush() {
  return data_->Flush();
}

// Starts an asynchronous flush through the private data, passing along
// 'user_callback'.
void KuduSession::FlushAsync(KuduStatusCallback* user_callback) {
  data_->FlushAsync(user_callback);
}

// Calls Close(false) on the private data and returns its status. (The
// destructor, in contrast, calls Close(true).)
Status KuduSession::Close() {
  return data_->Close(false);
}

// Returns whether the private data reports pending operations.
bool KuduSession::HasPendingOperations() const {
  return data_->HasPendingOperations();
}

// Returns the number of buffered operations reported by the private data.
int KuduSession::CountBufferedOperations() const {
  return data_->CountBufferedOperations();
}

// Sets the maximum memory size of the session's error collector to
// 'size_bytes' and returns the resulting status.
Status KuduSession::SetErrorBufferSpace(size_t size_bytes) {
  return data_->error_collector_->SetMaxMemSize(size_bytes);
}

// Returns the number of errors currently held by the error collector.
int KuduSession::CountPendingErrors() const {
  return data_->error_collector_->CountErrors();
}

// Retrieves the collected errors from the error collector into 'errors' and
// lets it report through 'overflowed'.
// NOTE(review): presumably the caller owns the returned KuduError objects
// and 'overflowed' signals dropped errors -- confirm in ErrorCollector.
void KuduSession::GetPendingErrors(vector<KuduError*>* errors, bool* overflowed) {
  data_->error_collector_->GetErrors(errors, overflowed);
}

// Returns a non-owning pointer to the client this session holds a shared
// reference to.
KuduClient* KuduSession::client() const {
  return data_->client_.get();
}

// Returns a reference to the write operation metrics held in the private
// data.
const ResourceMetrics& KuduSession::GetWriteOpMetrics() const {
  return data_->write_op_metrics_;
}

////////////////////////////////////////////////////////////
// KuduTableAlterer
////////////////////////////////////////////////////////////
// Allocates the alterer's private data for the table called 'name', bound
// to 'client'. The data is released in the destructor.
KuduTableAlterer::KuduTableAlterer(KuduClient* client, const string& name)
  : data_(new Data(client, name)) {
}

// Frees the private data allocated by the constructor.
KuduTableAlterer::~KuduTableAlterer() {
  delete data_;
}

// Records the new table name. Returns this for chaining.
KuduTableAlterer* KuduTableAlterer::RenameTo(const string& new_name) {
  data_->rename_to_ = new_name;
  return this;
}

// Records the new table owner. Returns this for chaining.
KuduTableAlterer* KuduTableAlterer::SetOwner(const string& new_owner) {
  data_->set_owner_to_ = new_owner;
  return this;
}

// Records the new table comment. Returns this for chaining.
KuduTableAlterer* KuduTableAlterer::SetComment(const string& new_comment) {
  data_->set_comment_to_ = new_comment;
  return this;
}

// Queues an ADD_COLUMN step for a new column called 'name' and returns the
// column spec created for it, so the caller can configure the column.
KuduColumnSpec* KuduTableAlterer::AddColumn(const string& name) {
  auto* column_spec = new KuduColumnSpec(name);
  Data::Step add_step = { AlterTableRequestPB::ADD_COLUMN,
                          column_spec, nullptr };
  data_->steps_.emplace_back(std::move(add_step));
  return column_spec;
}

// Queues an ALTER_COLUMN step for the column called 'name' and returns the
// column spec created for it, so the caller can describe the changes.
KuduColumnSpec* KuduTableAlterer::AlterColumn(const string& name) {
  auto* column_spec = new KuduColumnSpec(name);
  Data::Step alter_step = { AlterTableRequestPB::ALTER_COLUMN,
                            column_spec, nullptr };
  data_->steps_.emplace_back(std::move(alter_step));
  return column_spec;
}

// Queues a DROP_COLUMN step for the column called 'name'. The column is
// identified by a freshly created spec. Returns this for chaining.
KuduTableAlterer* KuduTableAlterer::DropColumn(const string& name) {
  auto* column_spec = new KuduColumnSpec(name);
  Data::Step drop_step = { AlterTableRequestPB::DROP_COLUMN,
                           column_spec, nullptr };
  data_->steps_.emplace_back(std::move(drop_step));
  return this;
}

// Convenience overload: adds a range partition with an empty dimension
// label by delegating to AddRangePartitionWithDimension().
KuduTableAlterer* KuduTableAlterer::AddRangePartition(
    KuduPartialRow* lower_bound,
    KuduPartialRow* upper_bound,
    KuduTableCreator::RangePartitionBound lower_bound_type,
    KuduTableCreator::RangePartitionBound upper_bound_type) {
  return AddRangePartitionWithDimension(
      lower_bound, upper_bound, "", lower_bound_type, upper_bound_type);
}

// Queues an ADD_RANGE_PARTITION step for the range given by the two bounds.
// An empty 'dimension_label' means that no label is attached to the step.
//
// Validation failures are not reported directly: they are recorded in
// data_->status_ and no step is queued. The bounds must be non-null, must
// share the same schema, and that schema must match the one remembered from
// earlier partitioning steps (the first step's schema is remembered).
// Always returns this for chaining.
KuduTableAlterer* KuduTableAlterer::AddRangePartitionWithDimension(
    KuduPartialRow* lower_bound,
    KuduPartialRow* upper_bound,
    const std::string& dimension_label,
    KuduTableCreator::RangePartitionBound lower_bound_type,
    KuduTableCreator::RangePartitionBound upper_bound_type) {

  if (!lower_bound || !upper_bound) {
    data_->status_ = Status::InvalidArgument("range partition bounds may not be null");
    return this;
  }

  const auto* bounds_schema = lower_bound->schema();
  if (*bounds_schema != *upper_bound->schema()) {
    data_->status_ = Status::InvalidArgument("range partition bounds must have matching schemas");
    return this;
  }
  if (data_->schema_ != nullptr) {
    if (*bounds_schema != *data_->schema_) {
      data_->status_ = Status::InvalidArgument("range partition bounds must have matching schemas");
      return this;
    }
  } else {
    data_->schema_ = bounds_schema;
  }

  std::unique_ptr<KuduRangePartition> new_range(new KuduRangePartition(
      lower_bound, upper_bound, lower_bound_type, upper_bound_type));
  Data::Step add_step { AlterTableRequestPB::ADD_RANGE_PARTITION,
                        nullptr,
                        std::move(new_range),
                        dimension_label.empty() ? nullopt : make_optional(dimension_label) };
  data_->steps_.emplace_back(std::move(add_step));
  data_->has_alter_partitioning_steps = true;
  return this;
}

// Queues an ADD_RANGE_PARTITION step for a caller-built range partition,
// which may carry a custom hash schema. 'partition' must not be null
// (CHECK'd). Once the step is queued, the alterer holds the partition in a
// unique_ptr.
//
// Validation failures are not reported directly: they are recorded in
// data_->status_ and no step is queued. The bounds must be non-null, must
// share the same schema, and that schema must match the one remembered from
// earlier partitioning steps (the first step's schema is remembered).
// Always returns this for chaining.
//
// NOTE(review): on the validation-failure paths 'partition' is neither
// stored nor deleted here -- confirm who is expected to free it.
KuduTableAlterer* KuduTableAlterer::AddRangePartition(
    KuduRangePartition* partition) {
  CHECK(partition);
  const auto& lower_bound = partition->data_->lower_bound_;
  const auto& upper_bound = partition->data_->upper_bound_;
  if (lower_bound == nullptr || upper_bound == nullptr) {
    data_->status_ = Status::InvalidArgument("range partition bounds may not be null");
    return this;
  }
  // Schemas are compared by value, as in the other range partition methods
  // of this class: comparing the pointers would reject bounds whose schemas
  // are equal but live in different objects.
  if (*lower_bound->schema() != *upper_bound->schema()) {
    data_->status_ = Status::InvalidArgument("range partition bounds must have matching schemas");
    return this;
  }
  if (data_->schema_ == nullptr) {
    data_->schema_ = lower_bound->schema();
  } else if (*lower_bound->schema() != *data_->schema_) {
    data_->status_ = Status::InvalidArgument("range partition bounds must have matching schemas");
    return this;
  }

  Data::Step s { AlterTableRequestPB::ADD_RANGE_PARTITION,
                 nullptr,
                 std::unique_ptr<KuduRangePartition>(partition),
                 nullopt };
  data_->steps_.emplace_back(std::move(s));
  data_->has_alter_partitioning_steps = true;
  // Remember that at least one added range brings its own hash schema; the
  // flag is passed on by Alter().
  if (!data_->steps_.back().range_partition->data_->is_table_wide_hash_schema_) {
    data_->adding_range_with_custom_hash_schema = true;
  }
  return this;
}

// Queues a DROP_RANGE_PARTITION step for the range given by the two bounds.
//
// Validation failures are not reported directly: they are recorded in
// data_->status_ and no step is queued. The bounds must be non-null, must
// share the same schema, and that schema must match the one remembered from
// earlier partitioning steps (the first step's schema is remembered).
// Always returns this for chaining.
KuduTableAlterer* KuduTableAlterer::DropRangePartition(
    KuduPartialRow* lower_bound,
    KuduPartialRow* upper_bound,
    KuduTableCreator::RangePartitionBound lower_bound_type,
    KuduTableCreator::RangePartitionBound upper_bound_type) {
  if (!lower_bound || !upper_bound) {
    data_->status_ = Status::InvalidArgument("range partition bounds may not be null");
    return this;
  }

  const auto* bounds_schema = lower_bound->schema();
  if (*bounds_schema != *upper_bound->schema()) {
    data_->status_ = Status::InvalidArgument("range partition bounds must have matching schemas");
    return this;
  }
  if (data_->schema_ != nullptr) {
    if (*bounds_schema != *data_->schema_) {
      data_->status_ = Status::InvalidArgument("range partition bounds must have matching schemas");
      return this;
    }
  } else {
    data_->schema_ = bounds_schema;
  }

  std::unique_ptr<KuduRangePartition> dropped_range(new KuduRangePartition(
      lower_bound, upper_bound, lower_bound_type, upper_bound_type));
  Data::Step drop_step { AlterTableRequestPB::DROP_RANGE_PARTITION,
                         nullptr,
                         std::move(dropped_range) };
  data_->steps_.emplace_back(std::move(drop_step));
  data_->has_alter_partitioning_steps = true;
  return this;
}

// Records a copy of the new extra configuration properties. Returns this
// for chaining.
KuduTableAlterer* KuduTableAlterer::AlterExtraConfig(const map<string, string>& extra_configs) {
  data_->new_extra_configs_ = extra_configs;
  return this;
}

// Records the new disk size limit for the table. Returns this for chaining.
KuduTableAlterer* KuduTableAlterer::SetTableDiskSizeLimit(int64_t disk_size_limit) {
  data_->disk_size_limit_ = disk_size_limit;
  return this;
}

// Records the new row count limit for the table. Returns this for chaining.
KuduTableAlterer* KuduTableAlterer::SetTableRowCountLimit(int64_t row_count_limit) {
  data_->row_count_limit_ = row_count_limit;
  return this;
}

// Records the timeout that Alter() uses to compute its deadline. Returns
// this for chaining.
KuduTableAlterer* KuduTableAlterer::timeout(const MonoDelta& timeout) {
  data_->timeout_ = timeout;
  return this;
}

// Records whether Alter() should wait for the alteration to finish. Returns
// this for chaining.
KuduTableAlterer* KuduTableAlterer::wait(bool wait) {
  data_->wait_ = wait;
  return this;
}

// Records the modify_external_catalogs flag in the private data. Returns
// this for chaining.
KuduTableAlterer* KuduTableAlterer::modify_external_catalogs(
    bool modify_external_catalogs) {
  data_->modify_external_catalogs_ = modify_external_catalogs;
  return this;
}

// Converts the queued changes into an AlterTableRequestPB and sends it to
// the master through the client's private data.
//
// Returns the error from building the request or from the AlterTable call.
// When wait_ is set, the method additionally waits for the alteration to
// finish using the same deadline; NotSupported is returned if the response
// carries no table ID to wait on.
Status KuduTableAlterer::Alter() {
  AlterTableRequestPB alter_req;
  AlterTableResponsePB alter_resp;
  RETURN_NOT_OK(data_->ToRequest(&alter_req));

  // Look for an ADD_COLUMN step whose spec has 'immutable' set; the result
  // is passed to AlterTable() below.
  bool adds_immutable_column = false;
  for (const auto& step : data_->steps_) {
    if (step.step_type != AlterTableRequestPB::ADD_COLUMN) {
      continue;
    }
    if (step.spec->data_->immutable) {
      adds_immutable_column = true;
      break;
    }
  }

  // An explicitly set timeout wins; otherwise fall back to the client's
  // default admin operation timeout.
  MonoDelta effective_timeout;
  if (data_->timeout_.Initialized()) {
    effective_timeout = data_->timeout_;
  } else {
    effective_timeout = data_->client_->default_admin_operation_timeout();
  }
  MonoTime deadline = MonoTime::Now() + effective_timeout;

  const bool alters_partitioning = data_->has_alter_partitioning_steps;
  RETURN_NOT_OK(data_->client_->data_->AlterTable(
      data_->client_, alter_req, &alter_resp, deadline,
      alters_partitioning,
      data_->adding_range_with_custom_hash_schema,
      adds_immutable_column));

  if (alters_partitioning) {
    // The table's partitions changed, so drop the whole local meta cache:
    // new tablets become usable for writes and scans right away, and old
    // tablets are not seen again. It also keeps rows from being batched for
    // the wrong tablet when one alter table transaction drops and adds a
    // partition. Clearing only this table's entries, or only the affected
    // partition key ranges, would be possible, but that would require
    // opening the table to learn its ID, schema, and partition schema.
    //
    // There is no need to wait for the alteration to complete (i.e. for the
    // tablets to be created) before clearing the cache: the master updates
    // its soft state while handling the alter table RPC. If a later write
    // or scan makes the meta cache look up tablets that are not running
    // yet, the master answers with ServiceUnavailable and the meta cache
    // retries automatically after a delay.
    data_->client_->data_->meta_cache_->ClearCache();
  }

  if (!data_->wait_) {
    return Status::OK();
  }

  if (!alter_resp.has_table_id()) {
    return Status::NotSupported("Alter Table succeeded but the server's "
        "response did not include a table ID. This server is too old to wait "
        "for alter table to finish");
  }
  TableIdentifierPB table_identifier;
  table_identifier.set_table_id(alter_resp.table_id());
  RETURN_NOT_OK(data_->client_->data_->WaitForAlterTableToFinish(
      data_->client_, table_identifier, deadline));

  return Status::OK();
}

////////////////////////////////////////////////////////////
// KuduScanner
////////////////////////////////////////////////////////////

// Creates a scanner for 'table'. All scanner state is held in a
// heap-allocated KuduScanner::Data, which the destructor deletes.
KuduScanner::KuduScanner(KuduTable* table)
  : data_(new KuduScanner::Data(table)) {
}

// Closes the scanner (Close() returns immediately when the scanner is not
// open) and then frees the private state.
KuduScanner::~KuduScanner() {
  Close();
  delete data_;
}

// Alias that simply forwards to SetProjectedColumnNames().
Status KuduScanner::SetProjectedColumns(const vector<string>& col_names) {
  return SetProjectedColumnNames(col_names);
}

// Selects the projected columns by name.
//
// Returns IllegalState if the scanner has already been opened; otherwise
// returns the status reported by the scan configuration.
Status KuduScanner::SetProjectedColumnNames(const vector<string>& col_names) {
  if (!data_->open_) {
    auto& config = *data_->mutable_configuration();
    return config.SetProjectedColumnNames(col_names);
  }
  return Status::IllegalState("Projection must be set before Open()");
}

// Selects the projected columns by index.
//
// Returns IllegalState if the scanner has already been opened; otherwise
// returns the status reported by the scan configuration.
Status KuduScanner::SetProjectedColumnIndexes(const vector<int>& col_indexes) {
  const bool already_open = data_->open_;
  if (already_open) {
    return Status::IllegalState("Projection must be set before Open()");
  }
  auto& config = *data_->mutable_configuration();
  return config.SetProjectedColumnIndexes(col_indexes);
}

// Forwards the requested batch size to the scan configuration and returns
// its status. Note that, unlike most setters of this class, no check is made
// on whether the scanner is already open.
Status KuduScanner::SetBatchSizeBytes(uint32_t batch_size) {
  auto& config = *data_->mutable_configuration();
  return config.SetBatchSizeBytes(batch_size);
}

// Sets the read mode of the scan.
//
// Returns IllegalState if the scanner is already open, InvalidArgument if
// 'read_mode' fails the enum range test, and otherwise the status reported by
// the scan configuration.
Status KuduScanner::SetReadMode(ReadMode read_mode) {
  if (data_->open_) {
    return Status::IllegalState("Read mode must be set before Open()");
  }
  const bool is_known_mode = tight_enum_test<ReadMode>(read_mode);
  if (!is_known_mode) {
    return Status::InvalidArgument("Bad read mode");
  }
  auto& config = *data_->mutable_configuration();
  return config.SetReadMode(read_mode);
}

// Sets the order mode of the scan. The order mode is stored as the
// configuration's fault-tolerance flag: ORDERED turns it on, any other valid
// value turns it off.
//
// Returns IllegalState if the scanner is already open and InvalidArgument if
// 'order_mode' fails the enum range test.
Status KuduScanner::SetOrderMode(OrderMode order_mode) {
  if (data_->open_) {
    return Status::IllegalState("Order mode must be set before Open()");
  }
  const bool is_known_mode = tight_enum_test<OrderMode>(order_mode);
  if (!is_known_mode) {
    return Status::InvalidArgument("Bad order mode");
  }
  const bool fault_tolerant = (order_mode == ORDERED);
  auto& config = *data_->mutable_configuration();
  return config.SetFaultTolerant(fault_tolerant);
}

// Enables fault tolerance in the scan configuration.
//
// Returns IllegalState if the scanner has already been opened.
Status KuduScanner::SetFaultTolerant() {
  if (!data_->open_) {
    auto& config = *data_->mutable_configuration();
    return config.SetFaultTolerant(true);
  }
  return Status::IllegalState("Fault-tolerance must be set before Open()");
}

// Stores the snapshot timestamp (given in microseconds) in the scan
// configuration.
//
// Returns IllegalState if the scanner has already been opened, OK otherwise.
Status KuduScanner::SetSnapshotMicros(uint64_t snapshot_timestamp_micros) {
  if (!data_->open_) {
    auto& config = *data_->mutable_configuration();
    config.SetSnapshotMicros(snapshot_timestamp_micros);
    return Status::OK();
  }
  return Status::IllegalState("Snapshot timestamp must be set before Open()");
}

// Stores a raw snapshot timestamp in the scan configuration.
//
// Returns IllegalState if the scanner has already been opened or if
// 'snapshot_timestamp' is zero; returns OK otherwise.
Status KuduScanner::SetSnapshotRaw(uint64_t snapshot_timestamp) {
  const bool already_open = data_->open_;
  if (already_open) {
    return Status::IllegalState("Snapshot timestamp must be set before Open()");
  }
  // Zero is rejected as a snapshot timestamp.
  const bool is_zero_timestamp = (snapshot_timestamp == 0);
  if (is_zero_timestamp) {
    return Status::IllegalState("Snapshot timestamp must be set bigger than 0");
  }
  auto& config = *data_->mutable_configuration();
  config.SetSnapshotRaw(snapshot_timestamp);
  return Status::OK();
}

// Configures a diff scan between 'start_timestamp' and 'end_timestamp'.
//
// Returns IllegalState if the scanner has already been opened; otherwise
// returns the status reported by the scan configuration.
Status KuduScanner::SetDiffScan(uint64_t start_timestamp, uint64_t end_timestamp) {
  if (!data_->open_) {
    auto& config = *data_->mutable_configuration();
    return config.SetDiffScan(start_timestamp, end_timestamp);
  }
  return Status::IllegalState("Diff scan must be set before Open()");
}

// Sets the replica selection policy for the scan.
//
// Returns IllegalState if the scanner has already been opened; otherwise
// returns the status reported by the scan configuration.
Status KuduScanner::SetSelection(KuduClient::ReplicaSelection selection) {
  const bool already_open = data_->open_;
  if (already_open) {
    return Status::IllegalState("Replica selection must be set before Open()");
  }
  auto& config = *data_->mutable_configuration();
  return config.SetSelection(selection);
}

// Stores the scan timeout, given in milliseconds, in the scan configuration.
//
// Returns IllegalState if the scanner has already been opened, OK otherwise.
Status KuduScanner::SetTimeoutMillis(int millis) {
  if (!data_->open_) {
    auto& config = *data_->mutable_configuration();
    config.SetTimeoutMillis(millis);
    return Status::OK();
  }
  return Status::IllegalState("Timeout must be set before Open()");
}

// Adds 'pred' to the scan configuration. Ownership of 'pred' is always taken,
// whether or not the call succeeds.
//
// Returns IllegalState if the scanner has already been opened; otherwise
// returns the status reported by the scan configuration.
Status KuduScanner::AddConjunctPredicate(KuduPredicate* pred) {
  // Wrap the raw pointer right away so the predicate is released on every
  // exit path, including the error return at the bottom.
  unique_ptr<KuduPredicate> owned_pred(pred);
  if (!data_->open_) {
    auto& config = *data_->mutable_configuration();
    return config.AddConjunctPredicate(std::move(owned_pred));
  }
  return Status::IllegalState("Predicate must be set before Open()");
}

// Forwards the lower-bound row 'key' to the scan configuration and returns
// its status.
Status KuduScanner::AddLowerBound(const KuduPartialRow& key) {
  auto& config = *data_->mutable_configuration();
  return config.AddLowerBound(key);
}

// Forwards the raw lower-bound 'key' to the scan configuration and returns
// its status.
Status KuduScanner::AddLowerBoundRaw(const Slice& key) {
  auto& config = *data_->mutable_configuration();
  return config.AddLowerBoundRaw(key);
}

// Forwards the upper-bound row 'key' to the scan configuration's
// AddUpperBound() and returns its status.
Status KuduScanner::AddExclusiveUpperBound(const KuduPartialRow& key) {
  auto& config = *data_->mutable_configuration();
  return config.AddUpperBound(key);
}

// Forwards the raw upper-bound 'key' to the scan configuration's
// AddUpperBoundRaw() and returns its status.
Status KuduScanner::AddExclusiveUpperBoundRaw(const Slice& key) {
  auto& config = *data_->mutable_configuration();
  return config.AddUpperBoundRaw(key);
}

// Adds a lower bound expressed as an encoded partition key. The raw string is
// converted into a partition key before it is handed to the scan
// configuration, whose status is returned.
Status KuduScanner::AddLowerBoundPartitionKeyRaw(const Slice& partition_key) {
  // Every hash schema of a table has the same number of hash dimensions; this
  // invariant is checked throughout the code. The table-wide hash schema can
  // therefore stand in for any range-specific one when working out where the
  // hash-related prefix of the encoded partition key ends.
  //
  // TODO(KUDU-2671) update this code if allowing for different number of
  //                 dimensions in range-specific hash schemas
  const auto num_hash_dimensions =
      data_->table_->partition_schema().hash_schema().size();
  auto& config = *data_->mutable_configuration();
  return config.AddLowerBoundPartitionKeyRaw(
      Partition::StringToPartitionKey(partition_key.ToString(),
                                      num_hash_dimensions));
}

// Adds an upper bound expressed as an encoded partition key. The raw string
// is converted into a partition key before it is handed to the scan
// configuration, whose status is returned.
Status KuduScanner::AddExclusiveUpperBoundPartitionKeyRaw(const Slice& partition_key) {
  // Every hash schema of a table has the same number of hash dimensions; this
  // invariant is checked throughout the code. The table-wide hash schema can
  // therefore stand in for any range-specific one when working out where the
  // hash-related prefix of the encoded partition key ends.
  //
  // TODO(KUDU-2671) update this code if allowing for different number of
  //                 dimensions in range-specific hash schemas
  const auto num_hash_dimensions =
      data_->table_->partition_schema().hash_schema().size();
  auto& config = *data_->mutable_configuration();
  return config.AddUpperBoundPartitionKeyRaw(
      Partition::StringToPartitionKey(partition_key.ToString(),
                                      num_hash_dimensions));
}

// Sets the block-caching flag of the scan.
//
// Returns IllegalState if the scanner has already been opened; otherwise
// returns the status reported by the scan configuration.
Status KuduScanner::SetCacheBlocks(bool cache_blocks) {
  if (!data_->open_) {
    auto& config = *data_->mutable_configuration();
    return config.SetCacheBlocks(cache_blocks);
  }
  return Status::IllegalState("Block caching must be set before Open()");
}

// Returns a KuduSchema built from the projection currently held by the scan
// configuration.
KuduSchema KuduScanner::GetProjectionSchema() const {
  const auto& config = data_->configuration();
  return KuduSchema::FromSchema(*config.projection());
}

// Returns a shared pointer to the table this scanner was created for.
shared_ptr<KuduTable> KuduScanner::GetKuduTable() {
  return data_->table_;
}

// Sets the row format flags of the scan.
//
// 'flags' must be exactly one of NO_FLAGS, PAD_UNIXTIME_MICROS_TO_16_BYTES or
// COLUMNAR_LAYOUT; any other value yields InvalidArgument. The value is
// validated before the open state is checked, so an invalid value is reported
// as InvalidArgument even on an open scanner. A valid value on an open
// scanner yields IllegalState.
Status KuduScanner::SetRowFormatFlags(uint64_t flags) {
  const bool is_supported = flags == NO_FLAGS ||
                            flags == PAD_UNIXTIME_MICROS_TO_16_BYTES ||
                            flags == COLUMNAR_LAYOUT;
  if (!is_supported) {
    return Status::InvalidArgument(Substitute("Invalid row format flags: $0", flags));
  }
  if (data_->open_) {
    return Status::IllegalState("Row format flags must be set before Open()");
  }

  auto& config = *data_->mutable_configuration();
  return config.SetRowFormatFlags(flags);
}

// Sets the limit of the scan.
//
// Returns IllegalState if the scanner has already been opened; otherwise
// returns the status reported by the scan configuration.
Status KuduScanner::SetLimit(int64_t limit) {
  const bool already_open = data_->open_;
  if (already_open) {
    return Status::IllegalState("Limit must be set before Open()");
  }
  auto& config = *data_->mutable_configuration();
  return config.SetLimit(limit);
}

// Returns a reference to the resource metrics object stored in the scanner's
// private state.
const ResourceMetrics& KuduScanner::GetResourceMetrics() const {
  return data_->resource_metrics_;
}

namespace {
// Self-owning state for the RPC issued by KuduScanner::Close().
//
// The scanner's own response and RPC controller members cannot be used for
// that call, because the scanner object may be destructed while the call is
// still being processed. An instance of this struct therefore carries
// everything the call needs and deletes itself once Callback() has run.
struct CloseCallback {
  RpcController controller;
  ScanResponsePB response;
  string scanner_id;

  // Logs a warning if the RPC failed, then destroys this object. Nothing may
  // touch the object after this returns.
  void Callback() {
    const bool rpc_failed = !controller.status().ok();
    if (rpc_failed) {
      LOG(WARNING) << "Couldn't close scanner " << scanner_id << ": "
                   << controller.status().ToString();
    }
    delete this;
  }
};
} // anonymous namespace

// Returns "<table name>: <scan spec>", where the scan spec is rendered
// against the table's schema.
//
// The whole formatting expression is evaluated inside KUDU_DISABLE_REDACTION;
// presumably that keeps log redaction off while the spec is stringified --
// confirm against the macro's definition before moving any of it outside.
string KuduScanner::ToString() const {
  return KUDU_DISABLE_REDACTION(Substitute(
      "$0: $1",
      data_->table_->name(),
      data_->configuration().spec().ToString(*data_->table_->schema().schema_)));
}

// Opens the scanner. Calling this on a scanner that is already open is a
// programming error and trips a CHECK.
//
// Returns OK once the scanner is marked open, either because the scan was
// short-circuited or because the first tablet was opened successfully.
// Returns a non-OK status, leaving the scanner not open, if adding the
// is-deleted column fails, if a snapshot timestamp is configured for a read
// mode other than READ_AT_SNAPSHOT, or if opening the first tablet fails.
Status KuduScanner::Open() {
  CHECK(!data_->open_) << "Scanner already open";

  // A configured start timestamp requires the is-deleted column in the
  // configuration.
  if (data_->configuration().has_start_timestamp()) {
    RETURN_NOT_OK(data_->mutable_configuration()->AddIsDeletedColumn());
  }
  // Optimize the scan spec first; the partition pruner is initialized from
  // the resulting spec.
  data_->mutable_configuration()->OptimizeScanSpec();
  data_->partition_pruner_.Init(*data_->table_->schema().schema_,
                                data_->table_->partition_schema(),
                                data_->configuration().spec());

  // If the spec can short-circuit, or the pruner has no partition key ranges
  // left, mark the scanner as open and short-circuited and return without
  // opening any tablet.
  if (data_->configuration().spec().CanShortCircuit() ||
      !data_->partition_pruner_.HasMorePartitionKeyRanges()) {
    VLOG(2) << "Short circuiting scan " << data_->DebugString();
    data_->open_ = true;
    data_->short_circuit_ = true;
    return Status::OK();
  }

  // For READ_YOUR_WRITES scan mode, get the latest observed timestamp and
  // store it in the scan configuration. Always use this one as the propagation
  // timestamp for the duration of the scan to avoid waiting unnecessarily.
  if (data_->configuration().read_mode() == READ_YOUR_WRITES) {
    const uint64_t lo_ts = data_->table_->client()->data_->GetLatestObservedTimestamp();
    data_->mutable_configuration()->SetScanLowerBoundTimestampRaw(lo_ts);
  }

  // NOTE(review): this validation runs after the short-circuit return above,
  // so a short-circuited scan never reports this misconfiguration.
  if (data_->configuration().read_mode() != READ_AT_SNAPSHOT &&
      data_->configuration().has_snapshot_timestamp()) {
    return Status::InvalidArgument("Snapshot timestamp should only be configured "
                                   "for READ_AT_SNAPSHOT scan mode.");
  }

  VLOG(2) << "Beginning " << data_->DebugString();

  // The configured timeout bounds the opening of the first tablet.
  MonoTime deadline = MonoTime::Now() + data_->configuration().timeout();
  set<string> blacklist;

  RETURN_NOT_OK(data_->OpenNextTablet(deadline, &blacklist));

  // Only mark the scanner open once the first tablet was opened successfully.
  data_->open_ = true;
  return Status::OK();
}

// Forwards to the private implementation's KeepAlive() and returns its
// status.
Status KuduScanner::KeepAlive() {
  return data_->KeepAlive();
}

void KuduScanner::Close() {
  if (!data_->open_) return;

  VLOG(2) << "Ending " << data_->DebugString();

  // Close the scanner on the server-side, if necessary.
  //
  // If the scan did not match any rows, the tserver will not assign a scanner ID.
  // This is reflected in the Open() response. In this case, there is no server-side state
  // to clean up.
  if (!data_->next_req_.scanner_id().empty()) {
    CHECK(data_->proxy_);
    unique_ptr<CloseCallback> closer(new CloseCallback);
    closer->scanner_id = data_->next_req_.scanner_id();
    data_->PrepareRequest(KuduScanner::Data::CLOSE);
    data_->next_req_.set_close_scanner(true);
    closer->controller.set_timeout(data_->configuration().timeout());
    // CloseCallback::Callback() deletes the closer.
    CloseCallback* closer_raw = closer.release();
    data_->proxy_->ScanAsync(data_->next_req_, &closer_raw->response, &closer_raw->controller,
                             [closer_raw]() { closer_raw->Callback(); });
  }
  data_->proxy_.reset();
  data_->open_ = false;
  return;
}

// Reports whether the scan may still produce rows. The scanner must be open
// (enforced with CHECK).
bool KuduScanner::HasMoreRows() const {
  CHECK(data_->open_);
  // A short-circuited scan never yields rows.
  if (data_->short_circuit_) {
    return false;
  }
  // More data in hand.
  if (data_->data_in_open_) {
    return true;
  }
  // More data in this tablet.
  if (data_->last_response_.has_more_results()) {
    return true;
  }
  // More tablets to scan, possibly with more data.
  return data_->MoreTablets();
}

// Row-vector flavor of NextBatch(): fetches the next batch into the
// scanner's internal batch object and then extracts its rows into 'rows'.
//
// Returns IllegalState if any row format flag has been selected, and
// propagates any failure from fetching the batch.
Status KuduScanner::NextBatch(vector<KuduRowResult>* rows) {
  const auto format_flags = data_->configuration().row_format_flags();
  if (PREDICT_FALSE(format_flags != KuduScanner::NO_FLAGS)) {
    return Status::IllegalState(
        Substitute("Cannot extract rows. Row format modifier flags were selected: $0",
                   format_flags));
  }
  auto& legacy_batch = data_->batch_for_old_api_;
  RETURN_NOT_OK(NextBatch(&legacy_batch));
  legacy_batch.data_->ExtractRows(rows);
  return Status::OK();
}

// Fetches the next batch into 'batch' by forwarding its private data object
// to another NextBatch() overload.
Status KuduScanner::NextBatch(KuduScanBatch* batch) {
  return NextBatch(batch->data_);
}

// Fetches the next batch into the columnar 'batch' by forwarding its private
// data object to another NextBatch() overload.
Status KuduScanner::NextBatch(KuduColumnarScanBatch* batch) {
  return NextBatch(batch->data_);
}

// Fetches the next batch of results into 'batch_data'. The scanner must be
// open (enforced with CHECK). 'batch_data' is always cleared first.
//
// The cases are tried in this order:
//   1. Short-circuited scan: return OK with the cleared batch.
//   2. Data left over from opening the tablet: hand it out.
//   3. The last response advertised more results: send a CONTINUE request,
//      retrying or reopening the tablet on error as described below.
//   4. More tablets remain: open the next one.
//   5. Otherwise: return OK with the cleared batch.
Status KuduScanner::NextBatch(internal::ScanBatchDataInterface* batch_data) {

  // TODO: do some double-buffering here -- when we return this batch
  // we should already have fired off the RPC for the next batch, but
  // need to do some swapping of the response objects around to avoid
  // stomping on the memory the user is looking at.
  CHECK(data_->open_);

  batch_data->Clear();

  if (data_->short_circuit_) {
    return Status::OK();
  }

  if (data_->data_in_open_) {
    // We have data from a previous scan.
    CHECK(data_->proxy_);
    VLOG(2) << "Extracting data from " << data_->DebugString();
    // Clear the flag so the same data is not handed out twice.
    data_->data_in_open_ = false;
    return batch_data->Reset(&data_->controller_,
                             data_->configuration().projection(),
                             data_->configuration().client_projection(),
                             data_->configuration().row_format_flags(),
                             &data_->last_response_);
  }

  if (data_->last_response_.has_more_results()) {
    // More data is available in this tablet.
    CHECK(data_->proxy_);
    VLOG(2) << "Continuing " << data_->DebugString();

    // A single deadline covers every attempt made for this batch.
    MonoTime batch_deadline = MonoTime::Now() + data_->configuration().timeout();
    data_->PrepareRequest(KuduScanner::Data::CONTINUE);

    // Loop until the RPC succeeds or an error is returned to the caller.
    while (true) {
      bool allow_time_for_failover = data_->configuration().is_fault_tolerant();
      ScanRpcStatus result = data_->SendScanRpc(batch_deadline, allow_time_for_failover);

      // Success case.
      if (result.result == ScanRpcStatus::OK) {
        // Remember the last primary key reported by the server, if any.
        if (data_->last_response_.has_last_primary_key()) {
          data_->last_primary_key_ = data_->last_response_.last_primary_key();
        }
        data_->scan_attempts_ = 0;
        return batch_data->Reset(&data_->controller_,
                                 data_->configuration().projection(),
                                 data_->configuration().client_projection(),
                                 data_->configuration().row_format_flags(),
                                 &data_->last_response_);
      }

      data_->scan_attempts_++;

      // Error handling.
      set<string> blacklist;
      bool needs_reopen = false;
      Status s = data_->HandleError(result, batch_deadline, &blacklist, &needs_reopen);
      if (!s.ok()) {
        // The warning logs the RPC status, while the status returned to the
        // caller is the one produced by HandleError().
        LOG(WARNING) << "Scan on tablet server " << data_->ts_->ToString() << " with "
                     << data_->DebugString() << " failed: " << result.status.ToString();
        return s;
      }

      // Fault-tolerant scans reopen the current tablet, passing along the
      // blacklist filled in by HandleError().
      if (data_->configuration().is_fault_tolerant()) {
        LOG(WARNING) << "Attempting to retry " << data_->DebugString()
                     << " elsewhere.";
        return data_->ReopenCurrentTablet(batch_deadline, &blacklist);
      }

      if (blacklist.empty() && !needs_reopen) {
        // If we didn't blacklist the current server, we can just retry again.
        continue;
      }
      // If we blacklisted the current server, and it's not fault-tolerant, we can't
      // retry anywhere, so just propagate the error.
      return result.status;
    }
  } else if (data_->MoreTablets()) {
    // More data may be available in other tablets.
    // No need to close the current tablet; we scanned all the data so the
    // server closed it for us.
    VLOG(2) << "Scanning next tablet " << data_->DebugString();
    // Reset the remembered primary key before moving on to the next tablet.
    data_->last_primary_key_.clear();
    MonoTime deadline = MonoTime::Now() + data_->configuration().timeout();
    set<string> blacklist;

    RETURN_NOT_OK(data_->OpenNextTablet(deadline, &blacklist));
    if (data_->data_in_open_) {
      // Avoid returning an empty batch in between tablets if we have data
      // we can return from this call.
      return NextBatch(batch_data);
    }
    return Status::OK();
  } else {
    // No more data anywhere.
    return Status::OK();
  }
}

// Builds a KuduTabletServer describing the tablet server this scanner is
// currently using and stores a pointer to it in '*server'. The object is
// released from its owning smart pointer, so the caller receives the raw
// pointer.
//
// The scanner must be open and have a current tablet server (both enforced
// with CHECK). Returns IllegalState, leaving '*server' untouched, if the
// server has no known host/port.
Status KuduScanner::GetCurrentServer(KuduTabletServer** server) {
  CHECK(data_->open_);
  internal::RemoteTabletServer* remote_ts = data_->ts_;
  CHECK(remote_ts);

  vector<HostPort> addresses;
  remote_ts->GetHostPorts(&addresses);
  if (addresses.empty()) {
    return Status::IllegalState(Substitute("No HostPort found for RemoteTabletServer $0",
                                           remote_ts->ToString()));
  }

  // Only the first reported address is exposed through the returned object.
  unique_ptr<KuduTabletServer> result(new KuduTabletServer);
  result->data_ = new KuduTabletServer::Data(remote_ts->permanent_uuid(),
                                             addresses[0],
                                             remote_ts->location());
  *server = result.release();
  return Status::OK();
}

////////////////////////////////////////////////////////////
// KuduScanToken
////////////////////////////////////////////////////////////

// Creates a token with no private data attached; 'data_' starts out null.
KuduScanToken::KuduScanToken()
    : data_(nullptr) {
}

// Frees the private data, if any (deleting a null pointer is a no-op).
KuduScanToken::~KuduScanToken() {
  delete data_;
}

// Forwards to the private implementation's IntoKuduScanner() and returns its
// status. 'data_' is dereferenced without a null check.
Status KuduScanToken::IntoKuduScanner(KuduScanner** scanner) const {
  return data_->IntoKuduScanner(scanner);
}

// Returns the tablet reference provided by the private implementation.
const KuduTablet& KuduScanToken::tablet() const {
  return data_->tablet();
}

// Forwards to the private implementation's Serialize(), passing 'buf' along,
// and returns its status.
Status KuduScanToken::Serialize(string* buf) const {
  return data_->Serialize(buf);
}

// Forwards all three arguments to KuduScanToken::Data::DeserializeIntoScanner()
// and returns its status. No token instance is involved in the call.
Status KuduScanToken::DeserializeIntoScanner(KuduClient* client,
                                             const string& serialized_token,
                                             KuduScanner** scanner) {
  return KuduScanToken::Data::DeserializeIntoScanner(
      client, serialized_token, scanner);
}

////////////////////////////////////////////////////////////
// KuduScanTokenBuilder
////////////////////////////////////////////////////////////

// Creates a builder for 'table'. All builder state is held in a
// heap-allocated KuduScanTokenBuilder::Data, which the destructor deletes.
KuduScanTokenBuilder::KuduScanTokenBuilder(KuduTable* table)
    : data_(new KuduScanTokenBuilder::Data(table)) {
}

// Frees the private state.
KuduScanTokenBuilder::~KuduScanTokenBuilder() {
  delete data_;
}

// Selects the projected columns by name; returns the status reported by the
// scan configuration.
Status KuduScanTokenBuilder::SetProjectedColumnNames(const vector<string>& col_names) {
  auto& config = *data_->mutable_configuration();
  return config.SetProjectedColumnNames(col_names);
}

// Selects the projected columns by index; returns the status reported by the
// scan configuration.
Status KuduScanTokenBuilder::SetProjectedColumnIndexes(const vector<int>& col_indexes) {
  auto& config = *data_->mutable_configuration();
  return config.SetProjectedColumnIndexes(col_indexes);
}

// Forwards the requested batch size to the scan configuration and returns
// its status.
Status KuduScanTokenBuilder::SetBatchSizeBytes(uint32_t batch_size) {
  auto& config = *data_->mutable_configuration();
  return config.SetBatchSizeBytes(batch_size);
}

// Sets the read mode for the scans described by the built tokens.
//
// Returns InvalidArgument if 'read_mode' fails the enum range test; otherwise
// returns the status reported by the scan configuration.
Status KuduScanTokenBuilder::SetReadMode(KuduScanner::ReadMode read_mode) {
  const bool is_known_mode = tight_enum_test<KuduScanner::ReadMode>(read_mode);
  if (is_known_mode) {
    auto& config = *data_->mutable_configuration();
    return config.SetReadMode(read_mode);
  }
  return Status::InvalidArgument("Bad read mode");
}

// Enables fault tolerance in the scan configuration and returns its status.
Status KuduScanTokenBuilder::SetFaultTolerant() {
  auto& config = *data_->mutable_configuration();
  return config.SetFaultTolerant(true);
}

// Stores the snapshot timestamp (given in microseconds) in the scan
// configuration. Always returns OK.
Status KuduScanTokenBuilder::SetSnapshotMicros(uint64_t snapshot_timestamp_micros) {
  auto& config = *data_->mutable_configuration();
  config.SetSnapshotMicros(snapshot_timestamp_micros);
  return Status::OK();
}

// Configures a diff scan between 'start_timestamp' and 'end_timestamp';
// returns the status reported by the scan configuration.
Status KuduScanTokenBuilder::SetDiffScan(uint64_t start_timestamp, uint64_t end_timestamp) {
  auto& config = *data_->mutable_configuration();
  return config.SetDiffScan(start_timestamp, end_timestamp);
}

// Stores a raw snapshot timestamp in the scan configuration. Always returns
// OK; the value is passed through without validation here.
Status KuduScanTokenBuilder::SetSnapshotRaw(uint64_t snapshot_timestamp) {
  auto& config = *data_->mutable_configuration();
  config.SetSnapshotRaw(snapshot_timestamp);
  return Status::OK();
}

// Sets the replica selection policy; returns the status reported by the scan
// configuration.
Status KuduScanTokenBuilder::SetSelection(KuduClient::ReplicaSelection selection) {
  auto& config = *data_->mutable_configuration();
  return config.SetSelection(selection);
}

// Stores the scan timeout, given in milliseconds, in the scan configuration.
// Always returns OK.
Status KuduScanTokenBuilder::SetTimeoutMillis(int millis) {
  auto& config = *data_->mutable_configuration();
  config.SetTimeoutMillis(millis);
  return Status::OK();
}

// Passes 'include_metadata' to the private implementation's
// IncludeTableMetadata(). Always returns OK.
Status KuduScanTokenBuilder::IncludeTableMetadata(bool include_metadata) {
  data_->IncludeTableMetadata(include_metadata);
  return Status::OK();
}

// Passes 'include_metadata' to the private implementation's
// IncludeTabletMetadata(). Always returns OK.
Status KuduScanTokenBuilder::IncludeTabletMetadata(bool include_metadata) {
  data_->IncludeTabletMetadata(include_metadata);
  return Status::OK();
}

// Adds 'pred' to the scan configuration and returns its status. Ownership of
// 'pred' is taken: it is wrapped in a unique_ptr that is moved into the
// configuration call.
Status KuduScanTokenBuilder::AddConjunctPredicate(KuduPredicate* pred) {
  unique_ptr<KuduPredicate> owned_pred(pred);
  auto& config = *data_->mutable_configuration();
  return config.AddConjunctPredicate(std::move(owned_pred));
}

// Forwards the lower-bound row 'key' to the scan configuration and returns
// its status.
Status KuduScanTokenBuilder::AddLowerBound(const KuduPartialRow& key) {
  auto& config = *data_->mutable_configuration();
  return config.AddLowerBound(key);
}

// Forwards the upper-bound row 'key' to the scan configuration and returns
// its status.
Status KuduScanTokenBuilder::AddUpperBound(const KuduPartialRow& key) {
  auto& config = *data_->mutable_configuration();
  return config.AddUpperBound(key);
}

// Sets the block-caching flag; returns the status reported by the scan
// configuration.
Status KuduScanTokenBuilder::SetCacheBlocks(bool cache_blocks) {
  auto& config = *data_->mutable_configuration();
  return config.SetCacheBlocks(cache_blocks);
}

// Forwards to the private implementation's Build(), passing 'tokens' along,
// and returns its status.
Status KuduScanTokenBuilder::Build(vector<KuduScanToken*>* tokens) {
  return data_->Build(tokens);
}

////////////////////////////////////////////////////////////
// KuduReplica
////////////////////////////////////////////////////////////

// Creates a replica with no private data attached; 'data_' starts out null.
KuduReplica::KuduReplica()
  : data_(nullptr) {
}

// Frees the private data, if any (deleting a null pointer is a no-op).
KuduReplica::~KuduReplica() {
  delete data_;
}

// Returns the leader flag stored in the private data. 'data_' is
// dereferenced without a null check.
bool KuduReplica::is_leader() const {
  return data_->is_leader_;
}

// Returns a reference to the tablet server object that the private data's
// 'ts_' points to. Neither 'data_' nor 'ts_' is checked for null.
const KuduTabletServer& KuduReplica::ts() const {
  return *data_->ts_;
}

////////////////////////////////////////////////////////////
// KuduTablet
////////////////////////////////////////////////////////////

// Creates a tablet with no private data attached; 'data_' starts out null.
KuduTablet::KuduTablet()
  : data_(nullptr) {
}

// Frees the private data, if any (deleting a null pointer is a no-op).
KuduTablet::~KuduTablet() {
  delete data_;
}

// Returns a reference to the tablet ID string stored in the private data.
const string& KuduTablet::id() const {
  return data_->id_;
}

// Returns a reference to the vector of replica pointers stored in the
// private data.
const vector<const KuduReplica*>& KuduTablet::replicas() const {
  return data_->replicas_;
}

////////////////////////////////////////////////////////////
// KuduTabletServer
////////////////////////////////////////////////////////////

// Creates a tablet server object with no private data attached; 'data_'
// starts out null.
KuduTabletServer::KuduTabletServer()
  : data_(nullptr) {
}

// Frees the private data, if any (deleting a null pointer is a no-op).
KuduTabletServer::~KuduTabletServer() {
  delete data_;
}

// Returns a reference to the UUID string stored in the private data.
const string& KuduTabletServer::uuid() const {
  return data_->uuid_;
}

// Returns the host part of the host/port pair stored in the private data.
const string& KuduTabletServer::hostname() const {
  return data_->hp_.host();
}

// Returns the port part of the host/port pair stored in the private data.
uint16_t KuduTabletServer::port() const {
  return data_->hp_.port();
}

// Returns a reference to the location string stored in the private data.
const string& KuduTabletServer::location() const {
  return data_->location_;
}

////////////////////////////////////////////////////////////
// KuduPartitionerBuilder
////////////////////////////////////////////////////////////
// Creates a builder for 'table'. The shared pointer is moved into the
// heap-allocated private state, which the destructor deletes.
KuduPartitionerBuilder::KuduPartitionerBuilder(sp::shared_ptr<KuduTable> table)
    : data_(new Data(std::move(table))) {
}

// Frees the private state.
KuduPartitionerBuilder::~KuduPartitionerBuilder() {
  delete data_;
}

// Passes 'timeout' to the private implementation's SetBuildTimeout() and
// returns this builder so that calls can be chained.
KuduPartitionerBuilder* KuduPartitionerBuilder::SetBuildTimeout(MonoDelta timeout) {
  data_->SetBuildTimeout(timeout);
  return this;
}

// Forwards to the private implementation's Build(), passing 'partitioner'
// along, and returns its status.
Status KuduPartitionerBuilder::Build(KuduPartitioner** partitioner) {
  return data_->Build(partitioner);
}

////////////////////////////////////////////////////////////
// KuduPartitioner
////////////////////////////////////////////////////////////
// Wraps 'data', which must not be null (enforced with CHECK_NOTNULL). The
// destructor deletes it.
KuduPartitioner::KuduPartitioner(Data* data)
    : data_(CHECK_NOTNULL(data)) {
}

// Frees the private state.
KuduPartitioner::~KuduPartitioner() {
  delete data_;
}

// Returns the partition count stored in the private state.
int KuduPartitioner::NumPartitions() const {
  return data_->num_partitions_;
}

// Forwards to the private implementation's PartitionRow(), passing 'row' and
// the 'partition' out-parameter along, and returns its status.
Status KuduPartitioner::PartitionRow(const KuduPartialRow& row, int* partition) {
  return data_->PartitionRow(row, partition);
}

} // namespace client
} // namespace kudu
