c++ client: remove unnecessary code
1. GetTableSchema() was implemented using its own RPC instead of a much
simpler call to SyncLeaderMasterRpc(). An RPC is attractive for
asynchronous use, but since it's never used that way, let's just switch
over to SyncLeaderMasterRpc().
2. KuduTable::Open() was issuing both GetTableSchema() and
GetTableLocations() RPCs. It's not clear why we're doing the latter; the
response is completely ignored. So let's stop doing that.
My main motivation is to remove fragile error-handling and retry logic which
has been duplicated all over the client.
Change-Id: Idda2cc15bc6224df992bfe2eec287c0222adced0
Reviewed-on: http://gerrit.cloudera.org:8080/3809
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
Reviewed-by: Todd Lipcon <todd@apache.org>
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index 69d643d..aa2f98e 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -19,6 +19,7 @@
#include <algorithm>
#include <limits>
+#include <memory>
#include <mutex>
#include <string>
#include <vector>
@@ -46,6 +47,7 @@
using std::set;
using std::shared_ptr;
using std::string;
+using std::unique_ptr;
using std::vector;
namespace kudu {
@@ -453,11 +455,6 @@
&MasterServiceProxy::AlterTable,
std::move(required_feature_flags));
RETURN_NOT_OK(s);
- // TODO: Consider the situation where the request is sent to the
- // server, gets executed on the server and written to the server,
- // but is seen as failed by the client, and is then retried (in which
- // case the retry will fail due to original table being removed, a
- // column being already added, etc...)
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}
@@ -544,197 +541,40 @@
return false;
}
-namespace internal {
-
-// Gets a table's schema from the leader master. If the leader master
-// is down, waits for a new master to become the leader, and then gets
-// the table schema from the new leader master.
-//
-// TODO: When we implement the next fault tolerant client-master RPC
-// call (e.g., CreateTable/AlterTable), we should generalize this
-// method as to enable code sharing.
-class GetTableSchemaRpc : public Rpc {
- public:
- GetTableSchemaRpc(KuduClient* client,
- StatusCallback user_cb,
- string table_name,
- KuduSchema* out_schema,
- PartitionSchema* out_partition_schema,
- string* out_id,
- const MonoTime& deadline,
- const shared_ptr<rpc::Messenger>& messenger);
-
- virtual void SendRpc() OVERRIDE;
-
- virtual string ToString() const OVERRIDE;
-
- virtual ~GetTableSchemaRpc();
-
- private:
- virtual void SendRpcCb(const Status& status) OVERRIDE;
-
- void ResetLeaderMasterAndRetry();
-
- void NewLeaderMasterDeterminedCb(const Status& status);
-
- KuduClient* client_;
- StatusCallback user_cb_;
- const string table_name_;
- KuduSchema* out_schema_;
- PartitionSchema* out_partition_schema_;
- string* out_id_;
- GetTableSchemaResponsePB resp_;
-};
-
-GetTableSchemaRpc::GetTableSchemaRpc(KuduClient* client,
- StatusCallback user_cb,
- string table_name,
- KuduSchema* out_schema,
- PartitionSchema* out_partition_schema,
- string* out_id,
- const MonoTime& deadline,
- const shared_ptr<rpc::Messenger>& messenger)
- : Rpc(deadline, messenger),
- client_(DCHECK_NOTNULL(client)),
- user_cb_(std::move(user_cb)),
- table_name_(std::move(table_name)),
- out_schema_(DCHECK_NOTNULL(out_schema)),
- out_partition_schema_(DCHECK_NOTNULL(out_partition_schema)),
- out_id_(DCHECK_NOTNULL(out_id)) {
-}
-
-GetTableSchemaRpc::~GetTableSchemaRpc() {
-}
-
-void GetTableSchemaRpc::SendRpc() {
- MonoTime now = MonoTime::Now(MonoTime::FINE);
- if (retrier().deadline().ComesBefore(now)) {
- SendRpcCb(Status::TimedOut("GetTableSchema timed out after deadline expired"));
- return;
- }
-
- // See KuduClient::Data::SyncLeaderMasterRpc().
- MonoTime rpc_deadline = now;
- rpc_deadline.AddDelta(client_->default_rpc_timeout());
- mutable_retrier()->mutable_controller()->set_deadline(
- MonoTime::Earliest(rpc_deadline, retrier().deadline()));
-
- GetTableSchemaRequestPB req;
- req.mutable_table()->set_table_name(table_name_);
- client_->data_->master_proxy()->GetTableSchemaAsync(
- req, &resp_,
- mutable_retrier()->mutable_controller(),
- boost::bind(&GetTableSchemaRpc::SendRpcCb, this, Status::OK()));
-}
-
-string GetTableSchemaRpc::ToString() const {
- return Substitute("GetTableSchemaRpc(table_name: $0, num_attempts: $1)",
- table_name_, num_attempts());
-}
-
-void GetTableSchemaRpc::ResetLeaderMasterAndRetry() {
- client_->data_->SetMasterServerProxyAsync(
- client_,
- retrier().deadline(),
- Bind(&GetTableSchemaRpc::NewLeaderMasterDeterminedCb,
- Unretained(this)));
-}
-
-void GetTableSchemaRpc::NewLeaderMasterDeterminedCb(const Status& status) {
- if (status.ok()) {
- mutable_retrier()->mutable_controller()->Reset();
- SendRpc();
- } else {
- LOG(WARNING) << "Failed to determine new Master: " << status.ToString();
- mutable_retrier()->DelayedRetry(this, status);
- }
-}
-
-void GetTableSchemaRpc::SendRpcCb(const Status& status) {
- Status new_status = status;
- if (new_status.ok() && mutable_retrier()->HandleResponse(this, &new_status)) {
- return;
- }
-
- if (new_status.ok() && resp_.has_error()) {
- if (resp_.error().code() == MasterErrorPB::NOT_THE_LEADER ||
- resp_.error().code() == MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
- if (client_->IsMultiMaster()) {
- LOG(WARNING) << "Leader Master has changed ("
- << client_->data_->leader_master_hostport().ToString()
- << " is no longer the leader), re-trying...";
- ResetLeaderMasterAndRetry();
- return;
- }
- }
- new_status = StatusFromPB(resp_.error().status());
- }
-
- if (new_status.IsTimedOut()) {
- if (MonoTime::Now(MonoTime::FINE).ComesBefore(retrier().deadline())) {
- if (client_->IsMultiMaster()) {
- LOG(WARNING) << "Leader Master ("
- << client_->data_->leader_master_hostport().ToString()
- << ") timed out, re-trying...";
- ResetLeaderMasterAndRetry();
- return;
- }
- } else {
- // Operation deadline expired during this latest RPC.
- new_status = new_status.CloneAndPrepend(
- "GetTableSchema timed out after deadline expired");
- }
- }
-
- if (new_status.IsNetworkError()) {
- if (client_->IsMultiMaster()) {
- LOG(WARNING) << "Encountered a network error from the Master("
- << client_->data_->leader_master_hostport().ToString() << "): "
- << new_status.ToString() << ", retrying...";
- ResetLeaderMasterAndRetry();
- return;
- }
- }
-
- if (new_status.ok()) {
- gscoped_ptr<Schema> schema(new Schema());
- new_status = SchemaFromPB(resp_.schema(), schema.get());
- if (new_status.ok()) {
- delete out_schema_->schema_;
- out_schema_->schema_ = schema.release();
- new_status = PartitionSchema::FromPB(resp_.partition_schema(),
- *out_schema_->schema_,
- out_partition_schema_);
-
- *out_id_ = resp_.table_id();
- CHECK_GT(out_id_->size(), 0) << "Running against a too-old master";
- }
- }
- if (!new_status.ok()) {
- LOG(WARNING) << ToString() << " failed: " << new_status.ToString();
- }
- user_cb_.Run(new_status);
-}
-
-} // namespace internal
-
Status KuduClient::Data::GetTableSchema(KuduClient* client,
const string& table_name,
const MonoTime& deadline,
KuduSchema* schema,
PartitionSchema* partition_schema,
string* table_id) {
- Synchronizer sync;
- GetTableSchemaRpc rpc(client,
- sync.AsStatusCallback(),
- table_name,
- schema,
- partition_schema,
- table_id,
- deadline,
- messenger_);
- rpc.SendRpc();
- return sync.Wait();
+ GetTableSchemaRequestPB req;
+ GetTableSchemaResponsePB resp;
+
+ req.mutable_table()->set_table_name(table_name);
+ Status s = SyncLeaderMasterRpc<GetTableSchemaRequestPB, GetTableSchemaResponsePB>(
+ deadline, client, req, &resp,
+ "GetTableSchema", &MasterServiceProxy::GetTableSchema, {});
+ RETURN_NOT_OK(s);
+ if (resp.has_error()) {
+ return StatusFromPB(resp.error().status());
+ }
+
+ // Parse the server schema out of the response.
+ unique_ptr<Schema> new_schema(new Schema());
+ RETURN_NOT_OK(SchemaFromPB(resp.schema(), new_schema.get()));
+
+ // Parse the server partition schema out of the response.
+ PartitionSchema new_partition_schema;
+ RETURN_NOT_OK(PartitionSchema::FromPB(resp.partition_schema(),
+ *new_schema.get(),
+ &new_partition_schema));
+
+ // Parsing was successful; release the schemas to the user.
+ delete schema->schema_;
+ schema->schema_ = new_schema.release();
+ *partition_schema = std::move(new_partition_schema);
+ *table_id = resp.table_id();
+ return Status::OK();
}
void KuduClient::Data::LeaderMasterDetermined(const Status& status,
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 6999a01..f0fba94 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -384,13 +384,10 @@
&partition_schema,
&table_id));
- // In the future, probably will look up the table in some map to reuse KuduTable
- // instances.
- shared_ptr<KuduTable> ret(new KuduTable(shared_from_this(), table_name, table_id,
- schema, partition_schema));
- RETURN_NOT_OK(ret->data_->Open());
- table->swap(ret);
-
+ // TODO: in the future, probably will look up the table in some map to reuse
+ // KuduTable instances.
+ table->reset(new KuduTable(shared_from_this(), table_name, table_id,
+ schema, partition_schema));
return Status::OK();
}
diff --git a/src/kudu/client/table-internal.cc b/src/kudu/client/table-internal.cc
index c9aa5dd..ec0df68 100644
--- a/src/kudu/client/table-internal.cc
+++ b/src/kudu/client/table-internal.cc
@@ -19,22 +19,7 @@
#include <string>
-#include "kudu/client/client-internal.h"
-#include "kudu/common/wire_protocol.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/sysinfo.h"
-#include "kudu/master/master.pb.h"
-#include "kudu/master/master.proxy.h"
-#include "kudu/rpc/rpc_controller.h"
-#include "kudu/util/monotime.h"
-
namespace kudu {
-
-using master::GetTableLocationsRequestPB;
-using master::GetTableLocationsResponsePB;
-using rpc::RpcController;
-using std::string;
-
namespace client {
using sp::shared_ptr;
@@ -54,95 +39,5 @@
KuduTable::Data::~Data() {
}
-Status KuduTable::Data::Open() {
- // TODO: fetch the schema from the master here once catalog is available.
- GetTableLocationsRequestPB req;
- GetTableLocationsResponsePB resp;
-
- MonoTime deadline = MonoTime::Now(MonoTime::FINE);
- deadline.AddDelta(client_->default_admin_operation_timeout());
-
- req.mutable_table()->set_table_id(id_);
- Status s;
- // TODO: replace this with Async RPC-retrier based RPC in the next revision,
- // adding exponential backoff and allowing this to be used safely in a
- // a reactor thread.
- while (true) {
- RpcController rpc;
-
- // Have we already exceeded our deadline?
- MonoTime now = MonoTime::Now(MonoTime::FINE);
- if (deadline.ComesBefore(now)) {
- const char* msg = "OpenTable timed out after deadline expired";
- LOG(ERROR) << msg;
- return Status::TimedOut(msg);
- }
-
- // See KuduClient::Data::SyncLeaderMasterRpc().
- MonoTime rpc_deadline = now;
- rpc_deadline.AddDelta(client_->default_rpc_timeout());
- rpc.set_deadline(MonoTime::Earliest(rpc_deadline, deadline));
-
- s = client_->data_->master_proxy()->GetTableLocations(req, &resp, &rpc);
- if (!s.ok()) {
- // Various conditions cause us to look for the leader master again.
- // It's ok if that eventually fails; we'll retry over and over until
- // the deadline is reached.
-
- if (s.IsNetworkError()) {
- LOG(WARNING) << "Network error talking to the leader master ("
- << client_->data_->leader_master_hostport().ToString() << "): "
- << s.ToString();
- if (client_->IsMultiMaster()) {
- LOG(INFO) << "Determining the leader master again and retrying.";
- WARN_NOT_OK(client_->data_->SetMasterServerProxy(client_.get(), deadline),
- "Failed to determine new Master");
- continue;
- }
- }
-
- if (s.IsTimedOut()
- && MonoTime::Now(MonoTime::FINE).ComesBefore(deadline)) {
- // If the RPC timed out and the operation deadline expired, we'll loop
- // again and time out for good above.
- LOG(WARNING) << "Timed out talking to the leader master ("
- << client_->data_->leader_master_hostport().ToString() << "): "
- << s.ToString();
- if (client_->IsMultiMaster()) {
- LOG(INFO) << "Determining the leader master again and retrying.";
- WARN_NOT_OK(client_->data_->SetMasterServerProxy(client_.get(), deadline),
- "Failed to determine new Master");
- continue;
- }
- }
- }
- if (s.ok() && resp.has_error()) {
- if (resp.error().code() == master::MasterErrorPB::NOT_THE_LEADER ||
- resp.error().code() == master::MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
- LOG(WARNING) << "Master " << client_->data_->leader_master_hostport().ToString()
- << " is no longer the leader master.";
- if (client_->IsMultiMaster()) {
- LOG(INFO) << "Determining the leader master again and retrying.";
- WARN_NOT_OK(client_->data_->SetMasterServerProxy(client_.get(), deadline),
- "Failed to determine new Master");
- continue;
- }
- }
- s = StatusFromPB(resp.error().status());
- }
- if (s.ok()) {
- break;
- } else {
- LOG(WARNING) << "Error getting table locations: " << s.ToString() << ", retrying.";
- }
-
- /* TODO: Use exponential backoff instead */
- base::SleepForMilliseconds(100);
- }
-
- VLOG(1) << "Open Table " << name_ << ", found " << resp.tablet_locations_size() << " tablets";
- return Status::OK();
-}
-
} // namespace client
} // namespace kudu
diff --git a/src/kudu/client/table-internal.h b/src/kudu/client/table-internal.h
index 0a56f0d..17c90f2 100644
--- a/src/kudu/client/table-internal.h
+++ b/src/kudu/client/table-internal.h
@@ -35,8 +35,6 @@
PartitionSchema partition_schema);
~Data();
- Status Open();
-
sp::shared_ptr<KuduClient> client_;
std::string name_;