[python] allow adding trusted CA certs when using JWT

This patch updates Kudu Python client to allow for adding trusted
CA certificates when authenticating to Kudu servers via JWT.

TestJwt was correspondingly updated and re-enabled.

Change-Id: Ie13ea1877e7711d00ede41d141c01a5240bb9205
Reviewed-on: http://gerrit.cloudera.org:8080/19913
Tested-by: Kudu Jenkins
Reviewed-by: Marton Greber <greber.mx@gmail.com>
Reviewed-by: Abhishek Chennaka <achennaka@cloudera.com>
diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py
index 7c90518..f0990e5 100644
--- a/python/kudu/__init__.py
+++ b/python/kudu/__init__.py
@@ -62,7 +62,7 @@
 
 def connect(host, port=7051, admin_timeout_ms=None, rpc_timeout_ms=None,
             require_authentication=False, encryption_policy=ENCRYPTION_OPTIONAL,
-            jwt=None):
+            jwt=None, trusted_certificates=None):
     """
     Connect to a Kudu master server
 
@@ -83,6 +83,10 @@
       Whether to require encryption
     jwt : string, optional
       The JSON web token to set.
+    trusted_certificates : string/list, optional
+      TLS certificates to trust (in PEM format) when negotiating an RPC
+      connection with Kudu servers. This is needed only if authenticating
+      to Kudu servers using JWT.
 
     Returns
     -------
@@ -105,10 +109,13 @@
         else:
             addresses.append('{0}:{1}'.format(host, port))
 
-    return Client(addresses, admin_timeout_ms=admin_timeout_ms,
+    return Client(addresses,
+                  admin_timeout_ms=admin_timeout_ms,
                   rpc_timeout_ms=rpc_timeout_ms,
                   encryption_policy=encryption_policy,
-                  require_authentication=require_authentication, jwt=jwt)
+                  require_authentication=require_authentication,
+                  jwt=jwt,
+                  trusted_certificates=trusted_certificates)
 
 
 def timedelta(seconds=0, millis=0, micros=0, nanos=0):
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index 2cdb539..edf56e3 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -293,7 +293,8 @@
     def __cinit__(self, addr_or_addrs, admin_timeout_ms=None,
                   rpc_timeout_ms=None, sasl_protocol_name=None,
                   require_authentication=False,
-                  encryption_policy=ENCRYPTION_OPTIONAL, jwt=None):
+                  encryption_policy=ENCRYPTION_OPTIONAL, jwt=None,
+                  trusted_certificates=None):
         cdef:
             string c_addr
             vector[string] c_addrs
@@ -344,6 +345,10 @@
         if jwt is not None:
             builder.jwt(tobytes(jwt))
 
+        if trusted_certificates is not None:
+            for c in trusted_certificates:
+                builder.trusted_certificate(tobytes(c))
+
         builder.encryption_policy(encryption_policy)
 
 
diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd
index e9522b3..9cd375f 100644
--- a/python/kudu/libkudu_client.pxd
+++ b/python/kudu/libkudu_client.pxd
@@ -600,6 +600,8 @@
 
         KuduClientBuilder& jwt(const string& jwt)
 
+        KuduClientBuilder& trusted_certificate(const string& cert_pem)
+
         Status Build(shared_ptr[KuduClient]* client)
 
     cdef cppclass KuduTabletServer:
diff --git a/python/kudu/tests/common.py b/python/kudu/tests/common.py
index a59922d..85b4fef 100644
--- a/python/kudu/tests/common.py
+++ b/python/kudu/tests/common.py
@@ -63,6 +63,7 @@
 
         master_hosts = []
         master_ports = []
+        master_http_hostports = []
 
         # Start the mini-cluster control process.
         args = ["{0}/kudu".format(bin_path), "test", "mini_cluster"]
@@ -107,8 +108,11 @@
         for m in masters["getMasters"]["masters"]:
             master_hosts.append(m["boundRpcAddress"]["host"])
             master_ports.append(m["boundRpcAddress"]["port"])
+            master_http_hostports.append(
+                '{0}:{1}'.format(m["boundHttpAddress"]["host"],
+                                 m["boundHttpAddress"]["port"]))
 
-        return p, master_hosts, master_ports
+        return p, master_hosts, master_ports, master_http_hostports
 
     @classmethod
     def stop_cluster(cls):
@@ -119,7 +123,9 @@
 
     @classmethod
     def setUpClass(cls):
-        cls.cluster_proc, cls.master_hosts, cls.master_ports = cls.start_cluster()
+        cls.cluster_proc, cls.master_hosts, cls.master_ports, \
+                cls.master_http_hostports = cls.start_cluster()
+
         cls.client = kudu.connect(cls.master_hosts, cls.master_ports)
 
         cls.schema = cls.example_schema()
diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py
index 7c836d7..d3bb965 100755
--- a/python/kudu/tests/test_client.py
+++ b/python/kudu/tests/test_client.py
@@ -29,8 +29,12 @@
                          KuduValue)
 import kudu
 import datetime
-import unittest
 from pytz import utc
+try:
+    from urllib.error import HTTPError
+    from urllib.request import Request, urlopen
+except ImportError:
+    from urllib2 import Request, HTTPError, urlopen
 
 
 class TestClient(KuduTestBase, CompatUnitTest):
@@ -896,18 +900,43 @@
                      require_authentication=True)
 
 class TestJwt(KuduTestBase, CompatUnitTest):
-    @unittest.skip("needs Kudu IPKI CA cert to be in the client's cert bundle")
     def test_jwt(self):
-        jwt = self.get_jwt(valid=True)
-        client = kudu.connect(self.master_hosts, self.master_ports,
-                              require_authentication=True, jwt=jwt)
+        certs=[]
+        for hp in self.master_http_hostports:
+            req = Request("http://{0}/ipki-ca-cert".format(hp))
+            try:
+                resp = urlopen(req)
+                certs.append(resp.read())
+                break
+            except HTTPError as e:
+                if e.code == 503:
+                    continue
+                else:
+                    raise
 
+        self.assertNotEqual(0, len(certs))
+
+        # A sub-case of valid JWT: the client should be able to successfully
+        # connect to the cluster.
+        jwt = self.get_jwt(valid=True)
+        client = kudu.connect(self.master_hosts,
+                              self.master_ports,
+                              require_authentication=True,
+                              jwt=jwt,
+                              trusted_certificates=certs)
+
+        # A sub-case of invalid JWT: the client should fail connecting to the
+        # cluster, and the error message should contain corresponding details
+        # on JWT authentication failure.
         jwt = self.get_jwt(valid=False)
         error_msg = ('FATAL_INVALID_JWT: Not authorized: JWT verification failed: ' +
         'failed to verify signature: VerifyFinal failed')
         with self.assertRaisesRegex(kudu.KuduBadStatus, error_msg):
-            client = kudu.connect(self.master_hosts, self.master_ports,
-                              require_authentication=True, jwt=jwt)
+            client = kudu.connect(self.master_hosts,
+                                  self.master_ports,
+                                  require_authentication=True,
+                                  jwt=jwt,
+                                  trusted_certificates=certs)
 
 class TestMonoDelta(CompatUnitTest):