c++ client: remove unnecessary code

1. GetTableSchema() was implemented using its own Rpc subclass
   (GetTableSchemaRpc) instead of a much simpler call to
   SyncLeaderMasterRpc(). A dedicated Rpc is attractive for asynchronous
   use, but since it's never used that way, let's just switch over to
   SyncLeaderMasterRpc() (see the sketch after this list).
2. KuduTable::Open() was issuing both GetTableSchema() and
   GetTableLocations() RPCs. It's not clear why we're doing the latter; the
   response is completely ignored. So let's stop doing that.
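
For reference, here is the shape of the new synchronous path, condensed
from the client-internal.cc hunk below (all names come from that hunk).
The leader-failover, retry, and deadline handling that GetTableSchemaRpc
used to duplicate now lives behind SyncLeaderMasterRpc(), so the caller
only builds the request, translates the application-level error, and
parses the response:

    Status KuduClient::Data::GetTableSchema(KuduClient* client,
                                            const string& table_name,
                                            const MonoTime& deadline,
                                            KuduSchema* schema,
                                            PartitionSchema* partition_schema,
                                            string* table_id) {
      GetTableSchemaRequestPB req;
      GetTableSchemaResponsePB resp;
      req.mutable_table()->set_table_name(table_name);

      // One call replaces the hand-rolled GetTableSchemaRpc state machine.
      Status s = SyncLeaderMasterRpc<GetTableSchemaRequestPB, GetTableSchemaResponsePB>(
          deadline, client, req, &resp,
          "GetTableSchema", &MasterServiceProxy::GetTableSchema, {});
      RETURN_NOT_OK(s);
      if (resp.has_error()) {
        return StatusFromPB(resp.error().status());
      }

      // Parse into temporaries first so a parse failure leaves the
      // caller's out-parameters untouched.
      unique_ptr<Schema> new_schema(new Schema());
      RETURN_NOT_OK(SchemaFromPB(resp.schema(), new_schema.get()));
      PartitionSchema new_partition_schema;
      RETURN_NOT_OK(PartitionSchema::FromPB(resp.partition_schema(),
                                            *new_schema.get(),
                                            &new_partition_schema));

      // Parsing was successful; release the schemas to the user.
      delete schema->schema_;
      schema->schema_ = new_schema.release();
      *partition_schema = std::move(new_partition_schema);
      *table_id = resp.table_id();
      return Status::OK();
    }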

My main motivation is to remove the fragile error-handling and retry logic
that has been duplicated all over the client.

Change-Id: Idda2cc15bc6224df992bfe2eec287c0222adced0
Reviewed-on: http://gerrit.cloudera.org:8080/3809
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
Reviewed-by: Todd Lipcon <todd@apache.org>
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index 69d643d..aa2f98e 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <limits>
+#include <memory>
 #include <mutex>
 #include <string>
 #include <vector>
@@ -46,6 +47,7 @@
 using std::set;
 using std::shared_ptr;
 using std::string;
+using std::unique_ptr;
 using std::vector;
 
 namespace kudu {
@@ -453,11 +455,6 @@
           &MasterServiceProxy::AlterTable,
           std::move(required_feature_flags));
   RETURN_NOT_OK(s);
-  // TODO: Consider the situation where the request is sent to the
-  // server, gets executed on the server and written to the server,
-  // but is seen as failed by the client, and is then retried (in which
-  // case the retry will fail due to original table being removed, a
-  // column being already added, etc...)
   if (resp.has_error()) {
     return StatusFromPB(resp.error().status());
   }
@@ -544,197 +541,40 @@
   return false;
 }
 
-namespace internal {
-
-// Gets a table's schema from the leader master. If the leader master
-// is down, waits for a new master to become the leader, and then gets
-// the table schema from the new leader master.
-//
-// TODO: When we implement the next fault tolerant client-master RPC
-// call (e.g., CreateTable/AlterTable), we should generalize this
-// method as to enable code sharing.
-class GetTableSchemaRpc : public Rpc {
- public:
-  GetTableSchemaRpc(KuduClient* client,
-                    StatusCallback user_cb,
-                    string table_name,
-                    KuduSchema* out_schema,
-                    PartitionSchema* out_partition_schema,
-                    string* out_id,
-                    const MonoTime& deadline,
-                    const shared_ptr<rpc::Messenger>& messenger);
-
-  virtual void SendRpc() OVERRIDE;
-
-  virtual string ToString() const OVERRIDE;
-
-  virtual ~GetTableSchemaRpc();
-
- private:
-  virtual void SendRpcCb(const Status& status) OVERRIDE;
-
-  void ResetLeaderMasterAndRetry();
-
-  void NewLeaderMasterDeterminedCb(const Status& status);
-
-  KuduClient* client_;
-  StatusCallback user_cb_;
-  const string table_name_;
-  KuduSchema* out_schema_;
-  PartitionSchema* out_partition_schema_;
-  string* out_id_;
-  GetTableSchemaResponsePB resp_;
-};
-
-GetTableSchemaRpc::GetTableSchemaRpc(KuduClient* client,
-                                     StatusCallback user_cb,
-                                     string table_name,
-                                     KuduSchema* out_schema,
-                                     PartitionSchema* out_partition_schema,
-                                     string* out_id,
-                                     const MonoTime& deadline,
-                                     const shared_ptr<rpc::Messenger>& messenger)
-    : Rpc(deadline, messenger),
-      client_(DCHECK_NOTNULL(client)),
-      user_cb_(std::move(user_cb)),
-      table_name_(std::move(table_name)),
-      out_schema_(DCHECK_NOTNULL(out_schema)),
-      out_partition_schema_(DCHECK_NOTNULL(out_partition_schema)),
-      out_id_(DCHECK_NOTNULL(out_id)) {
-}
-
-GetTableSchemaRpc::~GetTableSchemaRpc() {
-}
-
-void GetTableSchemaRpc::SendRpc() {
-  MonoTime now = MonoTime::Now(MonoTime::FINE);
-  if (retrier().deadline().ComesBefore(now)) {
-    SendRpcCb(Status::TimedOut("GetTableSchema timed out after deadline expired"));
-    return;
-  }
-
-  // See KuduClient::Data::SyncLeaderMasterRpc().
-  MonoTime rpc_deadline = now;
-  rpc_deadline.AddDelta(client_->default_rpc_timeout());
-  mutable_retrier()->mutable_controller()->set_deadline(
-      MonoTime::Earliest(rpc_deadline, retrier().deadline()));
-
-  GetTableSchemaRequestPB req;
-  req.mutable_table()->set_table_name(table_name_);
-  client_->data_->master_proxy()->GetTableSchemaAsync(
-      req, &resp_,
-      mutable_retrier()->mutable_controller(),
-      boost::bind(&GetTableSchemaRpc::SendRpcCb, this, Status::OK()));
-}
-
-string GetTableSchemaRpc::ToString() const {
-  return Substitute("GetTableSchemaRpc(table_name: $0, num_attempts: $1)",
-                    table_name_, num_attempts());
-}
-
-void GetTableSchemaRpc::ResetLeaderMasterAndRetry() {
-  client_->data_->SetMasterServerProxyAsync(
-      client_,
-      retrier().deadline(),
-      Bind(&GetTableSchemaRpc::NewLeaderMasterDeterminedCb,
-           Unretained(this)));
-}
-
-void GetTableSchemaRpc::NewLeaderMasterDeterminedCb(const Status& status) {
-  if (status.ok()) {
-    mutable_retrier()->mutable_controller()->Reset();
-    SendRpc();
-  } else {
-    LOG(WARNING) << "Failed to determine new Master: " << status.ToString();
-    mutable_retrier()->DelayedRetry(this, status);
-  }
-}
-
-void GetTableSchemaRpc::SendRpcCb(const Status& status) {
-  Status new_status = status;
-  if (new_status.ok() && mutable_retrier()->HandleResponse(this, &new_status)) {
-    return;
-  }
-
-  if (new_status.ok() && resp_.has_error()) {
-    if (resp_.error().code() == MasterErrorPB::NOT_THE_LEADER ||
-        resp_.error().code() == MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
-      if (client_->IsMultiMaster()) {
-        LOG(WARNING) << "Leader Master has changed ("
-                     << client_->data_->leader_master_hostport().ToString()
-                     << " is no longer the leader), re-trying...";
-        ResetLeaderMasterAndRetry();
-        return;
-      }
-    }
-    new_status = StatusFromPB(resp_.error().status());
-  }
-
-  if (new_status.IsTimedOut()) {
-    if (MonoTime::Now(MonoTime::FINE).ComesBefore(retrier().deadline())) {
-      if (client_->IsMultiMaster()) {
-        LOG(WARNING) << "Leader Master ("
-            << client_->data_->leader_master_hostport().ToString()
-            << ") timed out, re-trying...";
-        ResetLeaderMasterAndRetry();
-        return;
-      }
-    } else {
-      // Operation deadline expired during this latest RPC.
-      new_status = new_status.CloneAndPrepend(
-          "GetTableSchema timed out after deadline expired");
-    }
-  }
-
-  if (new_status.IsNetworkError()) {
-    if (client_->IsMultiMaster()) {
-      LOG(WARNING) << "Encountered a network error from the Master("
-                   << client_->data_->leader_master_hostport().ToString() << "): "
-                   << new_status.ToString() << ", retrying...";
-      ResetLeaderMasterAndRetry();
-      return;
-    }
-  }
-
-  if (new_status.ok()) {
-    gscoped_ptr<Schema> schema(new Schema());
-    new_status = SchemaFromPB(resp_.schema(), schema.get());
-    if (new_status.ok()) {
-      delete out_schema_->schema_;
-      out_schema_->schema_ = schema.release();
-      new_status = PartitionSchema::FromPB(resp_.partition_schema(),
-                                           *out_schema_->schema_,
-                                           out_partition_schema_);
-
-      *out_id_ = resp_.table_id();
-      CHECK_GT(out_id_->size(), 0) << "Running against a too-old master";
-    }
-  }
-  if (!new_status.ok()) {
-    LOG(WARNING) << ToString() << " failed: " << new_status.ToString();
-  }
-  user_cb_.Run(new_status);
-}
-
-} // namespace internal
-
 Status KuduClient::Data::GetTableSchema(KuduClient* client,
                                         const string& table_name,
                                         const MonoTime& deadline,
                                         KuduSchema* schema,
                                         PartitionSchema* partition_schema,
                                         string* table_id) {
-  Synchronizer sync;
-  GetTableSchemaRpc rpc(client,
-                        sync.AsStatusCallback(),
-                        table_name,
-                        schema,
-                        partition_schema,
-                        table_id,
-                        deadline,
-                        messenger_);
-  rpc.SendRpc();
-  return sync.Wait();
+  GetTableSchemaRequestPB req;
+  GetTableSchemaResponsePB resp;
+
+  req.mutable_table()->set_table_name(table_name);
+  Status s = SyncLeaderMasterRpc<GetTableSchemaRequestPB, GetTableSchemaResponsePB>(
+      deadline, client, req, &resp,
+      "GetTableSchema", &MasterServiceProxy::GetTableSchema, {});
+  RETURN_NOT_OK(s);
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+
+  // Parse the server schema out of the response.
+  unique_ptr<Schema> new_schema(new Schema());
+  RETURN_NOT_OK(SchemaFromPB(resp.schema(), new_schema.get()));
+
+  // Parse the server partition schema out of the response.
+  PartitionSchema new_partition_schema;
+  RETURN_NOT_OK(PartitionSchema::FromPB(resp.partition_schema(),
+                                        *new_schema.get(),
+                                        &new_partition_schema));
+
+  // Parsing was successful; release the schemas to the user.
+  delete schema->schema_;
+  schema->schema_ = new_schema.release();
+  *partition_schema = std::move(new_partition_schema);
+  *table_id = resp.table_id();
+  return Status::OK();
 }
 
 void KuduClient::Data::LeaderMasterDetermined(const Status& status,
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 6999a01..f0fba94 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -384,13 +384,10 @@
                                       &partition_schema,
                                       &table_id));
 
-  // In the future, probably will look up the table in some map to reuse KuduTable
-  // instances.
-  shared_ptr<KuduTable> ret(new KuduTable(shared_from_this(), table_name, table_id,
-                                          schema, partition_schema));
-  RETURN_NOT_OK(ret->data_->Open());
-  table->swap(ret);
-
+  // TODO: in the future, probably will look up the table in some map to reuse
+  // KuduTable instances.
+  table->reset(new KuduTable(shared_from_this(), table_name, table_id,
+                             schema, partition_schema));
   return Status::OK();
 }
 
diff --git a/src/kudu/client/table-internal.cc b/src/kudu/client/table-internal.cc
index c9aa5dd..ec0df68 100644
--- a/src/kudu/client/table-internal.cc
+++ b/src/kudu/client/table-internal.cc
@@ -19,22 +19,7 @@
 
 #include <string>
 
-#include "kudu/client/client-internal.h"
-#include "kudu/common/wire_protocol.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/sysinfo.h"
-#include "kudu/master/master.pb.h"
-#include "kudu/master/master.proxy.h"
-#include "kudu/rpc/rpc_controller.h"
-#include "kudu/util/monotime.h"
-
 namespace kudu {
-
-using master::GetTableLocationsRequestPB;
-using master::GetTableLocationsResponsePB;
-using rpc::RpcController;
-using std::string;
-
 namespace client {
 
 using sp::shared_ptr;
@@ -54,95 +39,5 @@
 KuduTable::Data::~Data() {
 }
 
-Status KuduTable::Data::Open() {
-  // TODO: fetch the schema from the master here once catalog is available.
-  GetTableLocationsRequestPB req;
-  GetTableLocationsResponsePB resp;
-
-  MonoTime deadline = MonoTime::Now(MonoTime::FINE);
-  deadline.AddDelta(client_->default_admin_operation_timeout());
-
-  req.mutable_table()->set_table_id(id_);
-  Status s;
-  // TODO: replace this with Async RPC-retrier based RPC in the next revision,
-  // adding exponential backoff and allowing this to be used safely in a
-  // a reactor thread.
-  while (true) {
-    RpcController rpc;
-
-    // Have we already exceeded our deadline?
-    MonoTime now = MonoTime::Now(MonoTime::FINE);
-    if (deadline.ComesBefore(now)) {
-      const char* msg = "OpenTable timed out after deadline expired";
-      LOG(ERROR) << msg;
-      return Status::TimedOut(msg);
-    }
-
-    // See KuduClient::Data::SyncLeaderMasterRpc().
-    MonoTime rpc_deadline = now;
-    rpc_deadline.AddDelta(client_->default_rpc_timeout());
-    rpc.set_deadline(MonoTime::Earliest(rpc_deadline, deadline));
-
-    s = client_->data_->master_proxy()->GetTableLocations(req, &resp, &rpc);
-    if (!s.ok()) {
-      // Various conditions cause us to look for the leader master again.
-      // It's ok if that eventually fails; we'll retry over and over until
-      // the deadline is reached.
-
-      if (s.IsNetworkError()) {
-        LOG(WARNING) << "Network error talking to the leader master ("
-                     << client_->data_->leader_master_hostport().ToString() << "): "
-                     << s.ToString();
-        if (client_->IsMultiMaster()) {
-          LOG(INFO) << "Determining the leader master again and retrying.";
-          WARN_NOT_OK(client_->data_->SetMasterServerProxy(client_.get(), deadline),
-                      "Failed to determine new Master");
-          continue;
-        }
-      }
-
-      if (s.IsTimedOut()
-          && MonoTime::Now(MonoTime::FINE).ComesBefore(deadline)) {
-        // If the RPC timed out and the operation deadline expired, we'll loop
-        // again and time out for good above.
-        LOG(WARNING) << "Timed out talking to the leader master ("
-                     << client_->data_->leader_master_hostport().ToString() << "): "
-                     << s.ToString();
-        if (client_->IsMultiMaster()) {
-          LOG(INFO) << "Determining the leader master again and retrying.";
-          WARN_NOT_OK(client_->data_->SetMasterServerProxy(client_.get(), deadline),
-                      "Failed to determine new Master");
-          continue;
-        }
-      }
-    }
-    if (s.ok() && resp.has_error()) {
-      if (resp.error().code() == master::MasterErrorPB::NOT_THE_LEADER ||
-          resp.error().code() == master::MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
-        LOG(WARNING) << "Master " << client_->data_->leader_master_hostport().ToString()
-                     << " is no longer the leader master.";
-        if (client_->IsMultiMaster()) {
-          LOG(INFO) << "Determining the leader master again and retrying.";
-          WARN_NOT_OK(client_->data_->SetMasterServerProxy(client_.get(), deadline),
-                      "Failed to determine new Master");
-          continue;
-        }
-      }
-      s = StatusFromPB(resp.error().status());
-    }
-    if (s.ok()) {
-      break;
-    } else {
-      LOG(WARNING) << "Error getting table locations: " << s.ToString() << ", retrying.";
-    }
-
-    /* TODO: Use exponential backoff instead */
-    base::SleepForMilliseconds(100);
-  }
-
-  VLOG(1) << "Open Table " << name_ << ", found " << resp.tablet_locations_size() << " tablets";
-  return Status::OK();
-}
-
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/client/table-internal.h b/src/kudu/client/table-internal.h
index 0a56f0d..17c90f2 100644
--- a/src/kudu/client/table-internal.h
+++ b/src/kudu/client/table-internal.h
@@ -35,8 +35,6 @@
        PartitionSchema partition_schema);
   ~Data();
 
-  Status Open();
-
   sp::shared_ptr<KuduClient> client_;
 
   std::string name_;