[client] allow adding trusted CA certs for RPC connection negotiation

This patch updates Kudu C++ client API to allow for adding trusted
CA certificates into the TLS context of the client's messenger.
Doing so allows to negotiate an RPC connection with Kudu servers
when the client is using JWT for authentication.  Corresponding tests
have been updated as well.

Change-Id: I43a84a7a8e23556d85706b7714c3b40d8c568447
Reviewed-on: http://gerrit.cloudera.org:8080/19907
Tested-by: Kudu Jenkins
Reviewed-by: Zoltan Chovan <zchovan@cloudera.com>
Reviewed-by: Abhishek Chennaka <achennaka@cloudera.com>
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index a5ec83d..0c8c2c7 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -319,6 +319,11 @@
   return *this;
 }
 
+KuduClientBuilder& KuduClientBuilder::trusted_certificate(const string& cert_pem) {
+  data_->trusted_certs_pem_.emplace_back(cert_pem);
+  return *this;
+}
+
 KuduClientBuilder& KuduClientBuilder::num_reactors(int num_reactors) {
   data_->num_reactors_ = num_reactors;
   return *this;
@@ -445,6 +450,11 @@
     // currently logged-in user.
     RETURN_NOT_OK(user_credentials.SetLoggedInRealUser());
   }
+  for (const auto& cert_str : data_->trusted_certs_pem_) {
+    security::Cert cert;
+    RETURN_NOT_OK(cert.FromString(cert_str, security::DataFormat::PEM));
+    RETURN_NOT_OK(messenger->mutable_tls_context()->AddTrustedCertificate(cert));
+  }
 
   shared_ptr<KuduClient> c(new KuduClient);
   c->data_->messenger_ = std::move(messenger);
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 2640675..a1702ab 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -330,6 +330,12 @@
   /// @return Reference to the updated object.
   KuduClientBuilder& import_authentication_credentials(std::string authn_creds);
 
+  /// Add a trusted root CA certificate into the client's TLS certificate bundle.
+  ///
+  /// @param [in] cert_pem_str
+  ///   The trusted certificate to add, in PEM format.
+  KuduClientBuilder& trusted_certificate(const std::string& cert_pem);
+
   /// @brief Set the number of reactors for the RPC messenger.
   ///
   /// The reactor threads are used for sending and receiving. If not provided,
diff --git a/src/kudu/client/client_builder-internal.h b/src/kudu/client/client_builder-internal.h
index 2817124..71a0f90 100644
--- a/src/kudu/client/client_builder-internal.h
+++ b/src/kudu/client/client_builder-internal.h
@@ -45,6 +45,7 @@
   std::string sasl_protocol_name_;
   bool require_authentication_;
   EncryptionPolicy encryption_policy_;
+  std::vector<std::string> trusted_certs_pem_;
 
   DISALLOW_COPY_AND_ASSIGN(Data);
 };
diff --git a/src/kudu/integration-tests/security-itest.cc b/src/kudu/integration-tests/security-itest.cc
index 5e22745..7604696 100644
--- a/src/kudu/integration-tests/security-itest.cc
+++ b/src/kudu/integration-tests/security-itest.cc
@@ -204,8 +204,25 @@
     return proxy.Checksum(req, &resp, &rpc);
   }
 
+  // Retrieve the cluster's IPKI certificate. Effectively, this waits until
+  // the catalog manager is initialized and has the IPKI CA ready to process
+  // requests to the /ipki-ca-cert HTTP endpoint. This also allows to have
+  // the master's TLS certificate used for RPC signed with the IPKI CA,
+  // so it will list the JWT authentication mechanism as available.
+  Status FetchClusterCACert(string* ca_cert_pem) {
+    int leader_idx;
+    RETURN_NOT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+    const auto& http_hp = cluster_->master(leader_idx)->bound_http_hostport();
+    string url = Substitute("http://$0/ipki-ca-cert", http_hp.ToString());
+    EasyCurl curl;
+    faststring dst;
+    auto res = curl.FetchURL(url, &dst);
+    *ca_cert_pem = dst.ToString();
+    return res;
+  }
+
  private:
-  std::shared_ptr<Messenger> NewMessengerOrDie() {
+  static std::shared_ptr<Messenger> NewMessengerOrDie() {
     std::shared_ptr<Messenger> messenger;
     CHECK_OK(rpc::MessengerBuilder("test-messenger")
              .set_num_reactors(1)
@@ -520,25 +537,13 @@
   (*binary) = JoinPathSegments(DirName(exe), *binary);
 }
 
-namespace {
-
-// In this testing environment, TLS certificates of Kudu servers are
-// self-signed since Kudu uses its own IPKI. To simplify the testing, the
-// certificate chain of the client isn't modified to include the Kudu's
-// IPKI CA, so it's necessary to allow the client sending its JWT to a server
-// without trusted TLS certificate.
-void RelaxJwtAuthnClientRequirements() {
-  FLAGS_jwt_client_require_trusted_tls_cert = false;
-}
-
-} // anonymous namespace
-
 TEST_F(SecurityITest, TestJwtMiniCluster) {
   SKIP_IF_SLOW_NOT_ALLOWED();
 
   cluster_opts_.enable_kerberos = false;
   cluster_opts_.num_tablet_servers = 0;
   cluster_opts_.enable_client_jwt = true;
+
   MiniOidcOptions oidc_opts;
   const auto* const kValidAccount = "valid";
   const auto* const kInvalidAccount = "invalid";
@@ -567,10 +572,11 @@
   cluster_opts_.mini_oidc_options = std::move(oidc_opts);
   ASSERT_OK(StartCluster());
 
-  // Wait for the catalog manager to finish initialization: this is necessary
-  // to make sure the master's TLS certificate used for RPC connections is
-  // signed with the IPKI CA, so it will list JWT as authn mechanism available.
-  ASSERT_OK(cluster_->master()->WaitForCatalogManager());
+  string cluster_cert_pem;
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(FetchClusterCACert(&cluster_cert_pem));
+  });
+  ASSERT_FALSE(cluster_cert_pem.empty());
 
   const auto* const kSubject = "kudu-user";
   const auto configure_builder_for =
@@ -580,10 +586,10 @@
     }
     b->jwt(cluster_->oidc()->CreateJwt(account_id, kSubject, true));
     b->require_authentication(true);
+    b->trusted_certificate(cluster_cert_pem);
     SleepFor(MonoDelta::FromMilliseconds(delay_ms));
   };
 
-  RelaxJwtAuthnClientRequirements();
   {
     SCOPED_TRACE("Valid JWT");
     KuduClientBuilder valid_builder;
@@ -623,6 +629,43 @@
     ASSERT_TRUE(s. IsNotAuthorized()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(), "Not authorized");
   }
+  {
+    SCOPED_TRACE("Valid JWT but client does not trust master's TLS cert");
+    KuduClientBuilder cb;
+    for (auto i = 0; i < cluster_->num_masters(); ++i) {
+      cb.add_master_server_addr(cluster_->master(i)->bound_rpc_addr().ToString());
+    }
+    cb.jwt(cluster_->oidc()->CreateJwt(kValidAccount, kSubject, true));
+    cb.require_authentication(true);
+
+    shared_ptr<KuduClient> client;
+    auto s = cb.Build(&client);
+    ASSERT_TRUE(s. IsNotAuthorized()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+        "client requires authentication, but server does not have");
+  }
+  {
+    SCOPED_TRACE("Valid JWT with relaxed requirements for server's TLS cert");
+    KuduClientBuilder cb;
+    for (auto i = 0; i < cluster_->num_masters(); ++i) {
+      cb.add_master_server_addr(cluster_->master(i)->bound_rpc_addr().ToString());
+    }
+    cb.jwt(cluster_->oidc()->CreateJwt(kValidAccount, kSubject, true));
+    cb.require_authentication(true);
+
+    // If not adding the CA certificate that Kudu RPC server certificates are
+    // signed with, in simplified test scenarios it's possible to relax the
+    // requirements at the client side of the Kudu RPC connection negotiation
+    // protocol. With --jwt_client_require_trusted_tls_cert=false, the client
+    // does not verify the server's TLS certificate before sending its JWT
+    // to the server for authentication.
+    FLAGS_jwt_client_require_trusted_tls_cert = false;
+
+    shared_ptr<KuduClient> client;
+    ASSERT_OK(cb.Build(&client));
+    vector<string> tables;
+    ASSERT_OK(client->ListTables(&tables));
+  }
 }
 
 TEST_F(SecurityITest, TestJwtMiniClusterWithInvalidCert) {
@@ -658,17 +701,18 @@
   cluster_opts_.mini_oidc_options = std::move(oidc_opts);
   ASSERT_OK(StartCluster());
 
-  // Wait for the catalog manager to finish initialization: this is necessary
-  // to make sure the master's TLS certificate used for RPC connections is
-  // signed with the IPKI CA, so it will list JWT as authn mechanism available.
-  ASSERT_OK(cluster_->master()->WaitForCatalogManager());
+  string cluster_cert_pem;
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(FetchClusterCACert(&cluster_cert_pem));
+  });
+  ASSERT_FALSE(cluster_cert_pem.empty());
 
-  RelaxJwtAuthnClientRequirements();
   KuduClientBuilder client_builder;
   for (auto i = 0; i < cluster_->num_masters(); ++i) {
     client_builder.add_master_server_addr(cluster_->master(i)->bound_rpc_addr().ToString());
   }
   client_builder.jwt(cluster_->oidc()->CreateJwt(kValidAccount, kSubject, true));
+  client_builder.trusted_certificate(cluster_cert_pem);
   client_builder.require_authentication(true);
 
   shared_ptr<KuduClient> client;
@@ -706,17 +750,18 @@
   cluster_opts_.mini_oidc_options = std::move(oidc_opts);
   ASSERT_OK(StartCluster());
 
-  // Wait for the catalog manager to finish initialization: this is necessary
-  // to make sure the master's TLS certificate used for RPC connections is
-  // signed with the IPKI CA, so it will list JWT as authn mechanism available.
-  ASSERT_OK(cluster_->master()->WaitForCatalogManager());
+  string cluster_cert_pem;
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(FetchClusterCACert(&cluster_cert_pem));
+  });
+  ASSERT_FALSE(cluster_cert_pem.empty());
 
-  RelaxJwtAuthnClientRequirements();
   KuduClientBuilder client_builder;
   for (auto i = 0; i < cluster_->num_masters(); ++i) {
     client_builder.add_master_server_addr(cluster_->master(i)->bound_rpc_addr().ToString());
   }
   client_builder.jwt(cluster_->oidc()->CreateJwt(kValidAccount, kSubject, true));
+  client_builder.trusted_certificate(cluster_cert_pem);
   client_builder.require_authentication(true);
 
   shared_ptr<KuduClient> client;