util: pull jwt code from Impala

Change-Id: Ib546762aa7ff53009f32688c30c5642ee4df494b
Reviewed-on: http://gerrit.cloudera.org:8080/18467
Reviewed-by: Alexey Serbin <alexey@apache.org>
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
Tested-by: Kudu Jenkins
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index 7fe9b5f..e1713e1 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -353,6 +353,17 @@
   gutil)
 
 #######################################
+# kudu_jwt_util
+#######################################
+add_library(kudu_jwt_util
+  jwt-util.cc)
+target_link_libraries(kudu_jwt_util
+  gutil
+  security
+  kudu_curl_util
+  kudu_util)
+
+#######################################
 # kudu_cloud_util
 #######################################
 add_library(kudu_cloud_util
@@ -612,6 +623,15 @@
 endif()
 
 #######################################
+# jwt-util-test
+#######################################
+ADD_KUDU_TEST(jwt-util-test)
+if(NOT NO_TESTS)
+  target_link_libraries(jwt-util-test
+    kudu_jwt_util)
+endif()
+
+#######################################
 # instance_detector-test
 #######################################
 ADD_KUDU_TEST(cloud/instance_detector-test)
diff --git a/src/kudu/util/jwt-util-internal.h b/src/kudu/util/jwt-util-internal.h
new file mode 100644
index 0000000..49b7b3d
--- /dev/null
+++ b/src/kudu/util/jwt-util-internal.h
@@ -0,0 +1,371 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Copied from Impala and adapted to Kudu.
+
+#pragma once
+
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include <jwt-cpp/jwt.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/promise.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/status.h"
+
+using DecodedJWT = jwt::decoded_jwt<jwt::traits::kazuho_picojson>;
+using JWTVerifier = jwt::verifier<jwt::default_clock, jwt::traits::kazuho_picojson>;
+
+namespace kudu {
+
+// Key-Value map for parsing Json keys.
+typedef std::unordered_map<std::string, std::string> JsonKVMap;
+
+// JWTPublicKey:
+// This class represent cryptographic public key for JSON Web Token (JWT) verification.
+class JWTPublicKey {
+ public:
+    JWTPublicKey(std::string algorithm, std::string pub_key)
+    : verifier_(jwt::verify()), algorithm_(std::move(algorithm)), public_key_(std::move(pub_key)) {}
+
+  // Verify the given decoded token.
+  Status Verify(const DecodedJWT& decoded_jwt, const std::string& algorithm) const;
+
+  const std::string& get_algorithm() const { return algorithm_; }
+  const std::string& get_key() const { return public_key_; }
+
+ protected:
+  // JWT Verifier.
+  JWTVerifier verifier_;
+
+ private:
+  // Signing Algorithm:
+  // Currently support following JSON Web Algorithms (JWA):
+  // HS256, HS384, HS512, RS256, RS384, and RS512.
+  const std::string algorithm_;
+  // Public key value:
+  // For EC and RSA families of algorithms, it's the public key converted in PEM-encoded
+  // format since jwt-cpp APIs only accept EC/RSA public keys in PEM-encoded format.
+  // For HMAC-SHA2, it's Octet Sequence key representing secret key.
+  const std::string public_key_;
+};
+
+// JWT Public Key for HS256.
+// HS256: HMAC using SHA-256.
+class HS256JWTPublicKey : public JWTPublicKey {
+ public:
+  // Throw JWT exception if failed to initialize the verifier.
+  HS256JWTPublicKey(std::string algorithm, const std::string& pub_key)
+      : JWTPublicKey(std::move(algorithm), pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::hs256(pub_key));
+  }
+};
+
+// JWT Public Key for HS384.
+// HS384: HMAC using SHA-384.
+class HS384JWTPublicKey : public JWTPublicKey {
+ public:
+  // Throw exception if failed to initialize the JWT verifier.
+  HS384JWTPublicKey(std::string algorithm, const std::string& pub_key)
+      : JWTPublicKey(std::move(algorithm), pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::hs384(pub_key));
+  }
+};
+
+// JWT Public Key for HS512.
+// HS512: HMAC using SHA-512.
+class HS512JWTPublicKey : public JWTPublicKey {
+ public:
+  // Throw JWT exception if failed to initialize the verifier.
+  HS512JWTPublicKey(std::string algorithm, const std::string& pub_key)
+      : JWTPublicKey(std::move(algorithm), pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::hs512(pub_key));
+  }
+};
+
+// JWT Public Key for RS256.
+// RS256: RSASSA-PKCS1-v1_5 using SHA-256.
+class RS256JWTPublicKey : public JWTPublicKey {
+ public:
+  // Throw JWT exception if failed to initialize the verifier.
+  RS256JWTPublicKey(std::string algorithm, const std::string& pub_key)
+      : JWTPublicKey(std::move(algorithm), pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::rs256(pub_key, "", "", ""));
+  }
+};
+
+// JWT Public Key for RS384.
+// RS384: RSASSA-PKCS1-v1_5 using SHA-384.
+class RS384JWTPublicKey : public JWTPublicKey {
+ public:
+  // Throw exception if failed to initialize the JWT verifier.
+  RS384JWTPublicKey(std::string algorithm, const std::string& pub_key)
+      : JWTPublicKey(std::move(algorithm), pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::rs384(pub_key, "", "", ""));
+  }
+};
+
+// JWT Public Key for RS512.
+// RS512: RSASSA-PKCS1-v1_5 using SHA-512.
+class RS512JWTPublicKey : public JWTPublicKey {
+ public:
+  // Throw JWT exception if failed to initialize the verifier.
+  RS512JWTPublicKey(std::string algorithm, const std::string& pub_key)
+      : JWTPublicKey(std::move(algorithm), pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::rs512(pub_key, "", "", ""));
+  }
+};
+
+// JWT Public Key for PS256.
+// PS256: RSASSA-PSS using SHA-256 and MGF1 with SHA-256.
+// RSASSA-PSS is the probabilistic version of RSA.
+class PS256JWTPublicKey : public JWTPublicKey {
+ public:
+  // Throw JWT exception if failed to initialize the verifier.
+  PS256JWTPublicKey(std::string algorithm, const std::string& pub_key)
+      : JWTPublicKey(std::move(algorithm), pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::ps256(pub_key, "", "", ""));
+  }
+};
+
+// JWT Public Key for PS384.
+// PS384: RSASSA-PSS using SHA-384 and MGF1 with SHA-384.
+class PS384JWTPublicKey : public JWTPublicKey {
+ public:
+  // Throw exception if failed to initialize the JWT verifier.
+  PS384JWTPublicKey(std::string algorithm, const std::string& pub_key)
+      : JWTPublicKey(std::move(algorithm), pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::ps384(pub_key, "", "", ""));
+  }
+};
+
+// JWT Public Key for PS512.
+// PS512: RSASSA-PSS using SHA-512 and MGF1 with SHA-512.
+class PS512JWTPublicKey : public JWTPublicKey {
+ public:
+  // Throw JWT exception if failed to initialize the verifier.
+  PS512JWTPublicKey(std::string algorithm, const std::string& pub_key)
+      : JWTPublicKey(std::move(algorithm), pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::ps512(pub_key, "", "", ""));
+  }
+};
+
+// JWT Public Key for ES256.
+// ES256: ECDSA using P-256 and SHA-256.
+class ES256JWTPublicKey : public JWTPublicKey {
+ public:
+  // Throw JWT exception if failed to initialize the verifier.
+  ES256JWTPublicKey(std::string algorithm, const std::string& pub_key)
+      : JWTPublicKey(std::move(algorithm), pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::es256(pub_key, "", "", ""));
+  }
+};
+
+// JWT Public Key for ES384.
+// ES384: ECDSA using P-384 and SHA-384.
+class ES384JWTPublicKey : public JWTPublicKey {
+ public:
+  // Throw exception if failed to initialize the JWT verifier.
+  ES384JWTPublicKey(std::string algorithm, const std::string& pub_key)
+      : JWTPublicKey(std::move(algorithm), pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::es384(pub_key, "", "", ""));
+  }
+};
+
+// JWT Public Key for ES512.
+// ES512: ECDSA using P-521 and SHA-512.
+class ES512JWTPublicKey : public JWTPublicKey {
+ public:
+  // Throw JWT exception if failed to initialize the verifier.
+  ES512JWTPublicKey(std::string algorithm, const std::string& pub_key)
+      : JWTPublicKey(std::move(algorithm), pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::es512(pub_key, "", "", ""));
+  }
+};
+
+// Construct a JWKPublicKey of HS from the JWK.
+class HSJWTPublicKeyBuilder {
+ public:
+  static Status CreateJWKPublicKey(const JsonKVMap& kv_map, JWTPublicKey** pub_key_out);
+};
+
+// Construct a JWKPublicKey of RSA from the JWK.
+class RSAJWTPublicKeyBuilder {
+ public:
+  static Status CreateJWKPublicKey(const JsonKVMap& kv_map, JWTPublicKey** pub_key_out);
+
+ private:
+  // Convert public key of RSA from JWK format to PEM encoded format by using OpenSSL
+  // APIs.
+  static bool ConvertJwkToPem(
+      const std::string& base64_n, const std::string& base64_e, std::string& pub_key);
+};
+
+// Construct a JWKPublicKey of EC from the JWK.
+class ECJWTPublicKeyBuilder {
+ public:
+  static Status CreateJWKPublicKey(const JsonKVMap& kv_map, JWTPublicKey** pub_key_out);
+
+ private:
+  // Convert public key of EC from JWK format to PEM encoded format by using OpenSSL
+  // APIs.
+  static bool ConvertJwkToPem(int eccgrp, const std::string& base64_x,
+      const std::string& base64_y, std::string& pub_key);
+};
+
+// This class load the JWKS from file or URL, store keys in an internal maps for each
+// family of algorithms, and provides API to retrieve key by key-id.
+// It's a snapshot of the current JWKS. The JWKSMgr maintains a consistent copy of this
+// and updates it atomically when the public keys in JWKS are changed. Clients can obtain
+// an immutable copy. Class instances can be created through the implicitly-defined
+// default and copy constructors.
+class JWKSSnapshot {
+ public:
+  JWKSSnapshot() = default;
+  JWKSSnapshot(const JWKSSnapshot&) = default;
+
+  // Map from a key ID (kid) to a JWTPublicKey.
+  typedef std::unordered_map<std::string, std::unique_ptr<JWTPublicKey>> JWTPublicKeyMap;
+
+  // Load JWKS stored in a JSON file. Returns an error if problems were encountered
+  // while parsing/constructing the Json Web keys. If no keys were given in the file,
+  // the internal maps will be empty.
+  Status LoadKeysFromFile(const std::string& jwks_file_path);
+  // Download JWKS JSON file from the given URL, then load the public keys if the
+  // checksum of JWKS object is changed. If no keys were given in the URL, the internal
+  // maps will be empty.
+  Status LoadKeysFromUrl(
+      const std::string& jwks_url, uint64_t cur_jwks_checksum, bool* is_changed);
+
+  // Look up the key ID in the internal key maps and returns the key if the lookup was
+  // successful, otherwise return nullptr.
+  const JWTPublicKey* LookupRSAPublicKey(const std::string& kid) const;
+  const JWTPublicKey* LookupHSKey(const std::string& kid) const;
+  const JWTPublicKey* LookupECPublicKey(const std::string& kid) const;
+
+  // Return number of keys for each family of algorithms.
+  int GetHSKeyNum() const { return static_cast<int>(hs_key_map_.size()); }
+  // Return number of keys for RSA.
+  int GetRSAPublicKeyNum() const { return static_cast<int>(rsa_pub_key_map_.size()); }
+  // Return number of keys for EC.
+  int GetECPublicKeyNum() const { return static_cast<int>(ec_pub_key_map_.size()); }
+
+  // Return all keys for HS.
+  const JWTPublicKeyMap* GetAllHSKeys() const { return &hs_key_map_; }
+  // Return all keys for RSA.
+  const JWTPublicKeyMap* GetAllRSAPublicKeys() const { return &rsa_pub_key_map_; }
+  // Return all keys for EC.
+  const JWTPublicKeyMap* GetAllECPublicKeys() const { return &ec_pub_key_map_; }
+
+  // Return TRUE if there is no key.
+  bool IsEmpty() const {
+    return hs_key_map_.empty() && rsa_pub_key_map_.empty() && ec_pub_key_map_.empty();
+  }
+
+  uint64_t GetChecksum() const { return jwks_checksum_; }
+
+ private:
+  friend class JWKSetParser;
+
+  // Following two functions are called inside Init().
+  // Add a RSA public key.
+  void AddRSAPublicKey(const std::string& key_id, JWTPublicKey* jwk_pub_key);
+  // Add a HS key.
+  void AddHSKey(const std::string& key_id, JWTPublicKey* jwk_pub_key);
+  // Add an EC public key.
+  void AddECPublicKey(const std::string& key_id, JWTPublicKey* jwk_pub_key);
+
+  // Note: According to section 4.5 of RFC 7517 (JSON Web Key), different keys might use
+  // the same "kid" value is if they have different "kty" (key type) values but are
+  // considered to be equivalent alternatives by the application using them. So keys
+  // for each "kty" are saved in different maps.
+
+  // Octet Sequence keys for HS256 (HMAC using SHA-256), HS384 and HS512.
+  // kty (key type): oct.
+  JWTPublicKeyMap hs_key_map_;
+  // Public keys for RSA family of algorithms: RS256, RS384, RS512, PS256, PS384, PS512.
+  // kty (key type): RSA.
+  JWTPublicKeyMap rsa_pub_key_map_;
+  // Public keys for EC family of algorithms: ES256, ES384, ES512.
+  // kty (key type): EC.
+  JWTPublicKeyMap ec_pub_key_map_;
+
+  // 64 bit checksum of JWKS object.
+  // This variable is only used when downloading JWKS from the given URL.
+  uint64_t jwks_checksum_ = 0;
+};
+
+// An immutable shared JWKS snapshot.
+typedef std::shared_ptr<const JWKSSnapshot> JWKSSnapshotPtr;
+
+// JSON Web Key Set (JWKS) conveys the public keys used by the signing party to the
+// clients that need to validate signatures. It represents a cryptographic key set in
+// JSON data structure.
+// This class works as JWKS manager, which load the JWKS from local file or URL.
+// Init() should be called during the initialization of the daemon.
+// The class is thread safe.
+class JWKSMgr {
+ public:
+   JWKSMgr() {}
+
+  // Destructor is only called for backend tests
+  ~JWKSMgr();
+
+  // Load JWKS stored in a JSON file. Returns an error if problems were encountered
+  // while parsing/constructing the Json Web keys. If no keys were given in the file,
+  // the internal maps will be empty.
+  // If the given jwks_uri is a URL, start a working thread which will periodically
+  // checks the JWKS URL for updates. This provides support for key rotation.
+  Status Init(const std::string& jwks_uri, bool is_local_file);
+
+  // Returns a read only snapshot of the current JWKS. This function should be called
+  // after calling Init().
+  JWKSSnapshotPtr GetJWKSSnapshot() const;
+
+ private:
+  // Atomically replaces a JWKS snapshot with a new copy.
+  void SetJWKSSnapshot(const JWKSSnapshotPtr& new_jwks);
+
+  // Helper function for working thread which periodically checks the JWKS URL for
+  // updates.
+  void UpdateJWKSThread();
+
+  // Thread that runs UpdateJWKSThread(). This thread will exit when the
+  // shut_down_promise_ is set.
+  scoped_refptr<Thread> jwks_update_thread_;
+  Promise<bool> shut_down_promise_;
+
+  // JWKS URI.
+  std::string jwks_uri_;
+
+  // The snapshot of the current JWKS. When the checksum of downloaded JWKS json object
+  // has been changed, the public keys will be reloaded and the content of this pointer
+  // will be atomically swapped.
+  JWKSSnapshotPtr current_jwks_;
+  // 64 bit checksum of current JWKS object.
+  uint64_t current_jwks_checksum_ = 0;
+
+  // Protects current_jwks_.
+  mutable std::mutex current_jwks_lock_;
+};
+
+} // namespace kudu
diff --git a/src/kudu/util/jwt-util-test.cc b/src/kudu/util/jwt-util-test.cc
new file mode 100644
index 0000000..b035ad0
--- /dev/null
+++ b/src/kudu/util/jwt-util-test.cc
@@ -0,0 +1,1148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Copied from Impala and adapted to Kudu.
+
+#include "kudu/util/jwt-util.h"
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <chrono>
+#include <cstdio> // file stuff
+#include <iostream>
+#include <memory>
+#include <string>
+#include <type_traits>
+
+#include <gtest/gtest.h>
+#include <jwt-cpp/jwt.h>
+#include <jwt-cpp/traits/kazuho-picojson/defaults.h>
+#include <jwt-cpp/traits/kazuho-picojson/traits.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
+#include "kudu/util/jwt-util-internal.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+namespace kudu {
+
+using std::string;
+using strings::Substitute;
+
+const std::string kRsaPrivKeyPem = R"(-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQC4ZtdaIrd1BPIJ
+tfnF0TjIK5inQAXZ3XlCrUlJdP+XHwIRxdv1FsN12XyMYO/6ymLmo9ryoQeIrsXB
+XYqlET3zfAY+diwCb0HEsVvhisthwMU4gZQu6TYW2s9LnXZB5rVtcBK69hcSlA2k
+ZudMZWxZcj0L7KMfO2rIvaHw/qaVOE9j0T257Z8Kp2CLF9MUgX0ObhIsdumFRLaL
+DvDUmBPr2zuh/34j2XmWwn1yjN/WvGtdfhXW79Ki1S40HcWnygHgLV8sESFKUxxQ
+mKvPUTwDOIwLFL5WtE8Mz7N++kgmDcmWMCHc8kcOIu73Ta/3D4imW7VbKgHZo9+K
+3ESFE3RjAgMBAAECggEBAJTEIyjMqUT24G2FKiS1TiHvShBkTlQdoR5xvpZMlYbN
+tVWxUmrAGqCQ/TIjYnfpnzCDMLhdwT48Ab6mQJw69MfiXwc1PvwX1e9hRscGul36
+ryGPKIVQEBsQG/zc4/L2tZe8ut+qeaK7XuYrPp8bk/X1e9qK5m7j+JpKosNSLgJj
+NIbYsBkG2Mlq671irKYj2hVZeaBQmWmZxK4fw0Istz2WfN5nUKUeJhTwpR+JLUg4
+ELYYoB7EO0Cej9UBG30hbgu4RyXA+VbptJ+H042K5QJROUbtnLWuuWosZ5ATldwO
+u03dIXL0SH0ao5NcWBzxU4F2sBXZRGP2x/jiSLHcqoECgYEA4qD7mXQpu1b8XO8U
+6abpKloJCatSAHzjgdR2eRDRx5PMvloipfwqA77pnbjTUFajqWQgOXsDTCjcdQui
+wf5XAaWu+TeAVTytLQbSiTsBhrnoqVrr3RoyDQmdnwHT8aCMouOgcC5thP9vQ8Us
+rVdjvRRbnJpg3BeSNimH+u9AHgsCgYEA0EzcbOltCWPHRAY7B3Ge/AKBjBQr86Kv
+TdpTlxePBDVIlH+BM6oct2gaSZZoHbqPjbq5v7yf0fKVcXE4bSVgqfDJ/sZQu9Lp
+PTeV7wkk0OsAMKk7QukEpPno5q6tOTNnFecpUhVLLlqbfqkB2baYYwLJR3IRzboJ
+FQbLY93E8gkCgYB+zlC5VlQbbNqcLXJoImqItgQkkuW5PCgYdwcrSov2ve5r/Acz
+FNt1aRdSlx4176R3nXyibQA1Vw+ztiUFowiP9WLoM3PtPZwwe4bGHmwGNHPIfwVG
+m+exf9XgKKespYbLhc45tuC08DATnXoYK7O1EnUINSFJRS8cezSI5eHcbQKBgQDC
+PgqHXZ2aVftqCc1eAaxaIRQhRmY+CgUjumaczRFGwVFveP9I6Gdi+Kca3DE3F9Pq
+PKgejo0SwP5vDT+rOGHN14bmGJUMsX9i4MTmZUZ5s8s3lXh3ysfT+GAhTd6nKrIE
+kM3Nh6HWFhROptfc6BNusRh1kX/cspDplK5x8EpJ0QKBgQDWFg6S2je0KtbV5PYe
+RultUEe2C0jYMDQx+JYxbPmtcopvZQrFEur3WKVuLy5UAy7EBvwMnZwIG7OOohJb
+vkSpADK6VPn9lbqq7O8cTedEHttm6otmLt8ZyEl3hZMaL3hbuRj6ysjmoFKx6CrX
+rK0/Ikt5ybqUzKCMJZg2VKGTxg==
+-----END PRIVATE KEY-----)";
+const std::string kRsaPubKeyPem = R"(-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuGbXWiK3dQTyCbX5xdE4
+yCuYp0AF2d15Qq1JSXT/lx8CEcXb9RbDddl8jGDv+spi5qPa8qEHiK7FwV2KpRE9
+83wGPnYsAm9BxLFb4YrLYcDFOIGULuk2FtrPS512Qea1bXASuvYXEpQNpGbnTGVs
+WXI9C+yjHztqyL2h8P6mlThPY9E9ue2fCqdgixfTFIF9Dm4SLHbphUS2iw7w1JgT
+69s7of9+I9l5lsJ9cozf1rxrXX4V1u/SotUuNB3Fp8oB4C1fLBEhSlMcUJirz1E8
+AziMCxS+VrRPDM+zfvpIJg3JljAh3PJHDiLu902v9w+Iplu1WyoB2aPfitxEhRN0
+YwIDAQAB
+-----END PUBLIC KEY-----)";
+// The public keys in JWK format were converted from PEM formatted crypto keys with
+// pem-to-jwk tool at https://hub.docker.com/r/danedmunds/pem-to-jwk/
+const std::string kRsaPubKeyJwkN =
+    "uGbXWiK3dQTyCbX5xdE4yCuYp0AF2d15Qq1JSXT_lx8CEcXb9RbDddl8jGDv-sp"
+    "i5qPa8qEHiK7FwV2KpRE983wGPnYsAm9BxLFb4YrLYcDFOIGULuk2FtrPS512Qe"
+    "a1bXASuvYXEpQNpGbnTGVsWXI9C-yjHztqyL2h8P6mlThPY9E9ue2fCqdgixfTF"
+    "IF9Dm4SLHbphUS2iw7w1JgT69s7of9-I9l5lsJ9cozf1rxrXX4V1u_SotUuNB3F"
+    "p8oB4C1fLBEhSlMcUJirz1E8AziMCxS-VrRPDM-zfvpIJg3JljAh3PJHDiLu902"
+    "v9w-Iplu1WyoB2aPfitxEhRN0Yw";
+const std::string kRsaPubKeyJwkE = "AQAB";
+const std::string kRsaInvalidPubKeyJwkN =
+    "xzYuc22QSst_dS7geYYK5l5kLxU0tayNdixkEQ17ix-CUcUbKIsnyftZxaCYT46"
+    "rQtXgCaYRdJcbB3hmyrOavkhTpX79xJZnQmfuamMbZBqitvscxW9zRR9tBUL6vd"
+    "i_0rpoUwPMEh8-Bw7CgYR0FK0DhWYBNDfe9HKcyZEv3max8Cdq18htxjEsdYO0i"
+    "wzhtKRXomBWTdhD5ykd_fACVTr4-KEY-IeLvubHVmLUhbE5NgWXxrRpGasDqzKh"
+    "CTmsa2Ysf712rl57SlH0Wz_Mr3F7aM9YpErzeYLrl0GhQr9BVJxOvXcVd4kmY-X"
+    "kiCcrkyS1cnghnllh-LCwQu1sYw";
+
+const std::string kRsa512PrivKeyPem = R"(-----BEGIN RSA PRIVATE KEY-----
+MIICWwIBAAKBgQDdlatRjRjogo3WojgGHFHYLugdUWAY9iR3fy4arWNA1KoS8kVw
+33cJibXr8bvwUAUparCwlvdbH6dvEOfou0/gCFQsHUfQrSDv+MuSUMAe8jzKE4qW
++jK+xQU9a03GUnKHkkle+Q0pX/g6jXZ7r1/xAK5Do2kQ+X5xK9cipRgEKwIDAQAB
+AoGAD+onAtVye4ic7VR7V50DF9bOnwRwNXrARcDhq9LWNRrRGElESYYTQ6EbatXS
+3MCyjjX2eMhu/aF5YhXBwkppwxg+EOmXeh+MzL7Zh284OuPbkglAaGhV9bb6/5Cp
+uGb1esyPbYW+Ty2PC0GSZfIXkXs76jXAu9TOBvD0ybc2YlkCQQDywg2R/7t3Q2OE
+2+yo382CLJdrlSLVROWKwb4tb2PjhY4XAwV8d1vy0RenxTB+K5Mu57uVSTHtrMK0
+GAtFr833AkEA6avx20OHo61Yela/4k5kQDtjEf1N0LfI+BcWZtxsS3jDM3i1Hp0K
+Su5rsCPb8acJo5RO26gGVrfAsDcIXKC+bQJAZZ2XIpsitLyPpuiMOvBbzPavd4gY
+6Z8KWrfYzJoI/Q9FuBo6rKwl4BFoToD7WIUS+hpkagwWiz+6zLoX1dbOZwJACmH5
+fSSjAkLRi54PKJ8TFUeOP15h9sQzydI8zJU+upvDEKZsZc/UhT/SySDOxQ4G/523
+Y0sz/OZtSWcol/UMgQJALesy++GdvoIDLfJX5GBQpuFgFenRiRDabxrE9MNUZ2aP
+FaFp+DyAe+b4nDwuJaW2LURbr8AEZga7oQj0uYxcYw==
+-----END RSA PRIVATE KEY-----)";
+const std::string kRsa512PubKeyPem = R"(-----BEGIN PUBLIC KEY-----
+MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDdlatRjRjogo3WojgGHFHYLugd
+UWAY9iR3fy4arWNA1KoS8kVw33cJibXr8bvwUAUparCwlvdbH6dvEOfou0/gCFQs
+HUfQrSDv+MuSUMAe8jzKE4qW+jK+xQU9a03GUnKHkkle+Q0pX/g6jXZ7r1/xAK5D
+o2kQ+X5xK9cipRgEKwIDAQAB
+-----END PUBLIC KEY-----)";
+const std::string kRsa512PubKeyJwkN =
+    "3ZWrUY0Y6IKN1qI4BhxR2C7oHVFgGPYkd38uGq1jQNSqEvJFcN93CYm16_G78FA"
+    "FKWqwsJb3Wx-nbxDn6LtP4AhULB1H0K0g7_jLklDAHvI8yhOKlvoyvsUFPWtNxl"
+    "Jyh5JJXvkNKV_4Oo12e69f8QCuQ6NpEPl-cSvXIqUYBCs";
+const std::string kRsa512PubKeyJwkE = "AQAB";
+const std::string kRsa512InvalidPubKeyJwkN =
+    "xzYuc22QSst_dS7geYYK5l5kLxU0tayNdixkEQ17ix-CUcUbKIsnyftZxaCYT46"
+    "rQtXgCaYRdJcbB3hmyrOavkhTpX79xJZnQmfuamMbZBqitvscxW9zRR9tBUL6vd"
+    "i_0rpoUwPMEh8-Bw7CgYR0FK0DhWYBNDfe9HKcyZEv3max8Cdq18htxjEsdYO0i"
+    "wzhtKRXomBWTdhD5ykd_fACVTr4-KEY-IeLvubHVmLUhbE5NgWXxrRpGasDqzKh"
+    "CTmsa2Ysf712rl57SlH0Wz_Mr3F7aM9YpErzeYLrl0GhQr9BVJxOvXcVd4kmY-X"
+    "kiCcrkyS1cnghnllh-LCwQu1sYw";
+
+const std::string kRsa1024PrivKeyPem = R"(-----BEGIN RSA PRIVATE KEY-----
+MIICXgIBAAKBgQDT+6sb2SvN69NB+6Zg78B7mdke0tC91CTfixzCSn7wS8JUvvZK
+AO1uMgnrCQdDr2TNeRYr6urawIOCDB1Ybz1+cBSNxouVdt/aT9+cw27kzVQE59NA
+PMpQyLtXaAOR6rD8xzyIgAV12QFmc1kHFl7Sjobwmsu5ZWRqYTwdXvFXIQIDAQAB
+AoGBAJKDLxBgWVZJ2AmS1LvK+U50VwxmyL9rENEwZQAkXPfYZMgN9EvRuEihbRl1
+c//kCde6CQjxpMDsrfgER4QH3odypQWT9A5uXKcdfu/z+xKNtB813rSrew3Q9pXe
+wlOb0q7EcS7XHMrcPxj4gvn2yKqB40vF3TIY6oiSeZbFLUvBAkEA9NaTrGB1+FZj
++3lIAs7UtYbxNggX53OEcXlstDbqhG3O9SzAHiccMbGu2lDBcAAghmtg9poT0Uo6
+V3VCJcnfNwJBAN2lppZFVWAXOLD2k8OMCp4jc9pRHIUtPU6kWoflU8O6kuDNNamD
+AeNMhdHX+Ed/Js3ig75eAGxsd9q+CFp/uGcCQQDFfGb0/YFqZFSVPMhm62oLWeMq
+T/DoEfdciDK0Ui9rzh7HB+eW6rkFJGsDUWwV6SRTCD3X64PcpuDUNpK6ZFCVAkEA
+oaBgAAiDH1UPpAvK6LfALl0P6E1pjLvWjvhOg/Z4xKvS21cJIJlF0ShGFSV2CTzx
+YQUiqLkHegkGxV353XRxVQJAZaW5O2BI5jKy2hK0EoAx3pSnp2X4CmkWrXsSeOgC
+Zz+jDkn8QzPbRwb8cyks/IHc2CBvaFStLFKO2VQj1THDhw==
+-----END RSA PRIVATE KEY-----)";
+const std::string kRsa1024PubKeyPem = R"(-----BEGIN PUBLIC KEY-----
+MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDT+6sb2SvN69NB+6Zg78B7mdke
+0tC91CTfixzCSn7wS8JUvvZKAO1uMgnrCQdDr2TNeRYr6urawIOCDB1Ybz1+cBSN
+xouVdt/aT9+cw27kzVQE59NAPMpQyLtXaAOR6rD8xzyIgAV12QFmc1kHFl7Sjobw
+msu5ZWRqYTwdXvFXIQIDAQAB
+-----END PUBLIC KEY-----)";
+const std::string kRsa1024PubKeyJwkN =
+    "0_urG9krzevTQfumYO_Ae5nZHtLQvdQk34scwkp-8EvCVL72SgDtbjIJ6wkHQ69"
+    "kzXkWK-rq2sCDggwdWG89fnAUjcaLlXbf2k_fnMNu5M1UBOfTQDzKUMi7V2gDke"
+    "qw_Mc8iIAFddkBZnNZBxZe0o6G8JrLuWVkamE8HV7xVyE";
+const std::string kRsa1024PubKeyJwkE = "AQAB";
+
+const std::string kRsa2048PrivKeyPem = R"(-----BEGIN RSA PRIVATE KEY-----
+MIIEpQIBAAKCAQEA0jCHomsNIaRYVlsemWg9yBx3od1B9Fd9RUslk9IVE7IU+QYZ
++T4NvRVPAMjpzuurvPnN4uBVPREycXOgEcWHiJDDQEhlQD4F69W8MFE7SXpdcBih
+zcj5qPYtTFP/52s6Vg7Y8SAUkBDyr0B442ONR1SBD8qEMAxpiLMH1Q/Yap+etvIj
+D1r2zQkQke53An9LvVl7OKkM8KGOcE/0tJRmc7x8ZlLqogPczkXvW6T+YAkwA8Xw
+ginZw0xBzfpoOEnajqm4Yikck0gJ0HwdlYFIp72ih7uozne7PYLVGb9X97cL0H1X
+DiA/SXJiFKo1AKXihcOdIRiw49eo9rzsoWPygQIDAQABAoIBAADe2BT1XgojYNqc
+s9P9UUeEof80Mst6WEe4RQknb9RozVBEX55Ut4sEAqjVbC3MnpBgtXhTfFmNem4W
+BUCa7DyFzZ/fcjc8T9sh7mQB1h3FXraHN5ZUrH9auPsjBuvfBGW/rSjUfJlQefzS
+psgu950Rwxtnt+PuDTrWc6QaKx0ylvESKPIaVoticc11Kcts5Fe/RQ2Az2epDDM7
+ptZamvtzptozPPq5YUIvpSnKCJfzOczAQT4omVewJV/7nbo/MdCALExrqHcIqXFp
+2uMpHV1QhqZ160Bzf1O+iDRCxT3rd4OZ5Y68x/fYV8dRqrqPA5BFep6ukf17cnWM
+svDqsaUCgYEA+Z5RbadUKteAM3v1Deu9RG7TucnxyoNSofpEuwMoVxo3+z+dS44v
+UpC7/MJhx1FBf15yKSPIgtjt5o/LanApcJEZVyghucsNvqy11db027P63NkIL/ic
+AgB04odLvxpgLHNv/qEWy7zHBLHhcazajzDHW+a/xBXrtJa3i2G+poUCgYEA15Ap
+OJPafAx/BPMbrYthpd5pVX5AMExXTur7rMIPi4/wh0O0vqGtulwgX3FiS0X4bAzK
+tNJ23/V2RR0F16IAIVZQqt16pIvmhx52iC55EPp3bZWkGhZ33/8Dxzkbe+rlwECa
+wRK4dOyA9hwsnlRuEb8OHva6sr+EusOxmeN6Us0CgYEAg4O/QTe057GM0RNRJFl8
+6a4+jRdx9hHEmqTCS4m5WlLtBcoZdLJgCm9JLD25yIruKE45daVtwkrK5PwD33ti
+yfUY1cvGIR5zim9yikzry0mDNZJ/ds7UW1WkP6mq5e/elezoJ871tLgsXzPdJMg+
+iszXbHshtA0cl5QE9kG0cgUCgYEAzZf3WLjbxzh75RKhMVIgnfyU5i91tRr6opBH
+3atw/CEavUf8GV1GvtmjHqSbpUNk/ljs9K1PJ6eLV7uomNMv4JvccDqxAENWaUTK
+tHPukBzyzxfL3f3T81XcGqUC65tL6aM0djUOrKXtEc4pWBEasd5Q74NO6bD0PNTs
+jOODBXkCgYEAhaD3gZUCWU+ZA6QmxPotfe9L0tzjmUjsLo0QUgIHJa2VaoHzdnWC
+ClvP3tFFkv2dlD6UW+g0JJFTVWcv+HEiC9WUnD/C6dXK/qA3fRvBhRKy8FTwvOis
+zSVeYds6mvDJwFe+2mk0KQiKnxlx22B4PcYbbN7mZ2ClBFTFrp0+Id4=
+-----END RSA PRIVATE KEY-----)";
+const std::string kRsa2048PubKeyPem = R"(-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA0jCHomsNIaRYVlsemWg9
+yBx3od1B9Fd9RUslk9IVE7IU+QYZ+T4NvRVPAMjpzuurvPnN4uBVPREycXOgEcWH
+iJDDQEhlQD4F69W8MFE7SXpdcBihzcj5qPYtTFP/52s6Vg7Y8SAUkBDyr0B442ON
+R1SBD8qEMAxpiLMH1Q/Yap+etvIjD1r2zQkQke53An9LvVl7OKkM8KGOcE/0tJRm
+c7x8ZlLqogPczkXvW6T+YAkwA8XwginZw0xBzfpoOEnajqm4Yikck0gJ0HwdlYFI
+p72ih7uozne7PYLVGb9X97cL0H1XDiA/SXJiFKo1AKXihcOdIRiw49eo9rzsoWPy
+gQIDAQAB
+-----END PUBLIC KEY-----)";
+const std::string kRsa2048PubKeyJwkN =
+    "0jCHomsNIaRYVlsemWg9yBx3od1B9Fd9RUslk9IVE7IU-QYZ-T4NvRVPAMjpzuu"
+    "rvPnN4uBVPREycXOgEcWHiJDDQEhlQD4F69W8MFE7SXpdcBihzcj5qPYtTFP_52"
+    "s6Vg7Y8SAUkBDyr0B442ONR1SBD8qEMAxpiLMH1Q_Yap-etvIjD1r2zQkQke53A"
+    "n9LvVl7OKkM8KGOcE_0tJRmc7x8ZlLqogPczkXvW6T-YAkwA8XwginZw0xBzfpo"
+    "OEnajqm4Yikck0gJ0HwdlYFIp72ih7uozne7PYLVGb9X97cL0H1XDiA_SXJiFKo"
+    "1AKXihcOdIRiw49eo9rzsoWPygQ";
+const std::string kRsa2048PubKeyJwkE = "AQAB";
+
+const std::string kRsa4096PrivKeyPem = R"(-----BEGIN RSA PRIVATE KEY-----
+MIIJKAIBAAKCAgEAtxmYsvs6ZfhTCFKCHQBW/W3iRfh8wZN+/XPXaOiIx9SXYSFr
+b/WRaTn8UOvflYuRnPYMaRGr5gVTS6/WFVvtNuZVIDOQBgEOBt5MQ0BeM0yPiM6q
+acP15couRwxbJx45ODQNyh5jNF4SdzqThNFTCFHtWakL1qrkGNSKdowIMaM59dm5
+8liMHxp9h9yTmqM4ZgiZkoF6Vy4KYrg9ChVqUZze4KMiyow8Xv6ESM7Eg2ncTbRe
+vuedbtYv7OVlotyozt1geFkWm/8ZUA6Z68lftYMySq0/yjAmjql0DXP1+vPL9k5s
+KGr5lpIUlB7a9JWbXdepNjvy1vslFuLjcM509d1E8e70C5VF61XthKlk3rRIBWK8
+0XupWR7o6clJsMYKxeF+ImBbzDbWrIkMxe0vTbikS6S4CLPVlYx4sMWAWu4UBpxZ
+quw4cVxyiKoZ2j5yTMAD1xiHI/b2/psFzW3qXcgQWTY7dpIP1BInhepCzHlcLSEi
+HtmMoCXNC3+i9ZXxiJ1u8avYfGjH8RrJW8dvVw7BS7zlH9s7rCn001VBJCJcXtkG
+aykw9Zd1E+Jh7IKQJn8gydsQ0enlMmtwsJO/tEvYBojFXbl4XecMWADTiExjXobX
+1y7u9ZTn0KRNkPpX9GTgY3oR0ei+rwOr4d+k2CrUdkMTGfjnfcDHKjHh3LMCAwEA
+AQKCAgBmbM4ryTfY1Pn13NnmSUtgR3jddWysiMrwEz479GCXkIgCEMTeA3wNZh+M
+UPZo3INfT5CPsg/8A5yd6UYT+rGPFXgnJFD72tky5GW69SX9AmYEvL89nR5QJjKP
+Eg1nq5OMqinQmAEcyUcBJWZiVQpizBm/Hz59HmmsrjCqshjfU5TXv60yMXBo8dOp
+Da4QQiAJi+QEvaNnY1zx7mhO3L3125AeD4Ql1B7tcOklJW1uqehQG4coub4qw2xZ
+09VwLonL9rDBgeyQ5ToOu6xE5whALJ0Ugyf8/cSD560A3Y6LjJfbN/FvBrCKFzul
+xEDts0cPTtXcfdqRgjo0PEXI0+U+tfjygf+ZrO1TUC/O0sJuiHD/V9j6aZX7IAui
+ldzoagkZwIBTmTru44Fc4OT9Ajb0h3a7BEt7QBarSgyjzGZgjJmOabDNdH9VVN2w
+iH7zkozXS16NZ2XpX6D3W8ZO3gN45L7K1yvcgy9ORhDSipStpb2loAEw2FepGiXz
+5kxF4sr7Yuj/XdxmU9/WVEv2y0x+kFQJ4lkHUuAzhDaQkBFSqTVyWO+hob9M70sT
+UJhMOLxUcJ08nKYc467yizPZ8VNIB9xZkZs2S5QeBs1femGJnTqJOqepq1YGlsRp
+LanLlWgwTwJM37itZOpGaep5RqO0NrruVOSHRNlIx1xBqgN2EQKCAQEA3Vayxmzf
+mVKilKjinVtyoHAmMWZzMxVXImt/596UvKTXExJZIlzb8eWnaxd9PLlGVQ5yifV9
+Ij1ndwcrCL2NDYmNhOaTtSNdzCsBk+rKvF6IoQC6hKg+oyo69OTQdkN34xZyO3fl
+E9afM0VQWc6IxQpjE60seBGRBvoVm4x2oRuv3+iWfSSYg95/MrSNF0DMC+acWVap
+MzfnWELe7Osgw1E8Km087DMpmdiCWUy2hpVmmWwPe1dOOBTx/lXmCQPYOhK2Kb+O
+se6DRd6ZUfDZMrye36swKpveIpxnP29CrSKu3e08od7e0FMiSy4kXQvLdNUI2YoA
+wtgUL2R6JfAMWwKCAQEA08XwJa9qoYy7UBMRfXcX8QQN3EpZUlvDNbkwlReFtZQw
+ZHnZVXf453IaZ/TzDn41Jui9Gln49XUaLzmMbwTlzsL/3eUgmuW4OAsaFRO//1HP
+awISoJkqi4cqcivkFcfg/3bpuV08dkVuLTsnNGIUVFgwpdFk+TAGVIzS7s4vzgZ7
+NIZRv+D2p8LyYks9CX9/J8ogjtnfxUFj4TCK0JPVq+WB+2AekOQxWarEeJXA2lpd
+fNpg03fWJmpAOsh7lcd6CRhoTUfaiCArrj91YN9YoClv9n5w2b64Mbd8gz6B7Lvl
+mD/KM8hpJOTVVaDLBzssL9IEZc7CPI6zAKaW1iXAiQKCAQEAg2XGt9lGXIUcE1i3
+P2dcgzZQ1h7V4MuYcMyUoBgZAGxzadUIqUerIs2NOBw3subig/gRsyjTYpJFa/oL
+aCLvK8wvAWjI403djykwxJksRetw/POrxrkChma5nUyBHNQsxdk7c2ZXzhEpbYyG
+iOn9c8wYyUOTFKyJBjVMwoz+l+IR5MD1JdGl4RMjO/zHjbhf6ei7hKXXyJo1csYw
+BUIIryr4ps82zZoJ5lUL/Ot3qCnlQMtP3Y8U1mJIzw47g7qOkNsu3VXk5miL8dyV
+9Hkg1+f2AR5ld8YUd0OWX6gzUwk1+nWt+wKOD+pqf2sjF0G7RN57ZHlyvjj8sq3Z
+fdAl5QKCAQAmChwE6OmCc0ECNSqjGs1WIaBLvZ8lyA3cjJNJdJwz7ZZztd9wFsjC
+6iAMJFe0dr8dahjtrtOlY498hB3Ro1OUPDqxpQKiUDky9+uLday7M/rKAelOp7SY
+s4LQV0n1D54+xSFehnzh0b7kqQd1xVhZfi3e2yoECLhaX6FT+/1iSI/A84+jo8kq
+gT4AofsoxZoVj50hi8lCKWjDfnCw3p0271bVzIIxDIxAywfXkS6/ChRY5PEXiyMQ
+a212IaTxVo95KsUxfIKoiP7Pod53tCa7PjY6VKP4uOVlKMxY1tWHrIilPHAZtRoN
+4nzfkK5nch2RyWu4zdbeAdPtff8CIG3hAoIBABqpu+L5lQiP3yrYAgmHbmY1iFXs
+UtXpO6Qn2sEpQl7GbaGtv/lkQ6geA9JG/ka6sO7BoIFFt0ckm1NrhFTMgunPjevm
+eVY6Sn7JZC9qyE+oCrJMg+0hzc5Gw8+/H+e0Jgca8+76WVu8gGcsLdT+NjYNQwXH
+rzo7tuC/a+Da3nd2UnMheqf8ajt7oXaXgrqYjzK9Fx/QJcUel12ny+Nx+NADx4UU
+K43Js4kcyWyYG9ms7S643u1leDDO+hpeB6EN15U2v7zXi8rMrLqvNKrBi9bCRFDu
+3zsKSPS+qeqpNBsefGtx7oluHdiQocA6w20nQ1DzIW2mOo8Pn5nzt7fPPPA=
+-----END RSA PRIVATE KEY-----)";
+const std::string kRsa4096PubKeyPem = R"(-----BEGIN PUBLIC KEY-----
+MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAtxmYsvs6ZfhTCFKCHQBW
+/W3iRfh8wZN+/XPXaOiIx9SXYSFrb/WRaTn8UOvflYuRnPYMaRGr5gVTS6/WFVvt
+NuZVIDOQBgEOBt5MQ0BeM0yPiM6qacP15couRwxbJx45ODQNyh5jNF4SdzqThNFT
+CFHtWakL1qrkGNSKdowIMaM59dm58liMHxp9h9yTmqM4ZgiZkoF6Vy4KYrg9ChVq
+UZze4KMiyow8Xv6ESM7Eg2ncTbRevuedbtYv7OVlotyozt1geFkWm/8ZUA6Z68lf
+tYMySq0/yjAmjql0DXP1+vPL9k5sKGr5lpIUlB7a9JWbXdepNjvy1vslFuLjcM50
+9d1E8e70C5VF61XthKlk3rRIBWK80XupWR7o6clJsMYKxeF+ImBbzDbWrIkMxe0v
+TbikS6S4CLPVlYx4sMWAWu4UBpxZquw4cVxyiKoZ2j5yTMAD1xiHI/b2/psFzW3q
+XcgQWTY7dpIP1BInhepCzHlcLSEiHtmMoCXNC3+i9ZXxiJ1u8avYfGjH8RrJW8dv
+Vw7BS7zlH9s7rCn001VBJCJcXtkGaykw9Zd1E+Jh7IKQJn8gydsQ0enlMmtwsJO/
+tEvYBojFXbl4XecMWADTiExjXobX1y7u9ZTn0KRNkPpX9GTgY3oR0ei+rwOr4d+k
+2CrUdkMTGfjnfcDHKjHh3LMCAwEAAQ==
+-----END PUBLIC KEY-----)";
+const std::string kRsa4096PubKeyJwkN =
+    "txmYsvs6ZfhTCFKCHQBW_W3iRfh8wZN-_XPXaOiIx9SXYSFrb_WRaTn8UOvflYu"
+    "RnPYMaRGr5gVTS6_WFVvtNuZVIDOQBgEOBt5MQ0BeM0yPiM6qacP15couRwxbJx"
+    "45ODQNyh5jNF4SdzqThNFTCFHtWakL1qrkGNSKdowIMaM59dm58liMHxp9h9yTm"
+    "qM4ZgiZkoF6Vy4KYrg9ChVqUZze4KMiyow8Xv6ESM7Eg2ncTbRevuedbtYv7OVl"
+    "otyozt1geFkWm_8ZUA6Z68lftYMySq0_yjAmjql0DXP1-vPL9k5sKGr5lpIUlB7"
+    "a9JWbXdepNjvy1vslFuLjcM509d1E8e70C5VF61XthKlk3rRIBWK80XupWR7o6c"
+    "lJsMYKxeF-ImBbzDbWrIkMxe0vTbikS6S4CLPVlYx4sMWAWu4UBpxZquw4cVxyi"
+    "KoZ2j5yTMAD1xiHI_b2_psFzW3qXcgQWTY7dpIP1BInhepCzHlcLSEiHtmMoCXN"
+    "C3-i9ZXxiJ1u8avYfGjH8RrJW8dvVw7BS7zlH9s7rCn001VBJCJcXtkGaykw9Zd"
+    "1E-Jh7IKQJn8gydsQ0enlMmtwsJO_tEvYBojFXbl4XecMWADTiExjXobX1y7u9Z"
+    "Tn0KRNkPpX9GTgY3oR0ei-rwOr4d-k2CrUdkMTGfjnfcDHKjHh3LM";
+const std::string kRsa4096PubKeyJwkE = "AQAB";
+
+const std::string kEcdsa521PrivKeyPem = R"(-----BEGIN EC PRIVATE KEY-----
+MIHcAgEBBEIAuZxTZjLIZM5hxgZX+JRrqt5FKpAEg/meZ7m9aSE3XbRITqtfz1Uy
+h2Srn7o8+4j/jQpwHTTHZThy10u5jMjaR+mgBwYFK4EEACOhgYkDgYYABAFFah0k
+6m4ddp/tUN/ObrKKwSCp4QUZdiAMaC9eY1HyNBPuuEsH5qCfeY5lmeJwSUpzCosn
+rgW8M2hQ4Kr5V9OXrgHLA5WVtH6//sSkUY2/xYuqc7/Ln8gI5ddtr1qG64Xtgs05
+/CNajSjFZeLm76llakvYiBTTH/ii8hIfrwukW9IP7Q==
+-----END EC PRIVATE KEY-----)";
+const std::string kEcdsa521PubKeyPem = R"(-----BEGIN PUBLIC KEY-----
+MIGbMBAGByqGSM49AgEGBSuBBAAjA4GGAAQBRWodJOpuHXaf7VDfzm6yisEgqeEF
+GXYgDGgvXmNR8jQT7rhLB+agn3mOZZnicElKcwqLJ64FvDNoUOCq+VfTl64BywOV
+lbR+v/7EpFGNv8WLqnO/y5/ICOXXba9ahuuF7YLNOfwjWo0oxWXi5u+pZWpL2IgU
+0x/4ovISH68LpFvSD+0=
+-----END PUBLIC KEY-----)";
+const std::string kEcdsa521PubKeyJwkX =
+    "AUVqHSTqbh12n-1Q385usorBIKnhBRl2IAxoL15jUfI0E-64SwfmoJ95jmWZ4nB"
+    "JSnMKiyeuBbwzaFDgqvlX05eu";
+const std::string kEcdsa521PubKeyJwkY =
+    "AcsDlZW0fr_-xKRRjb_Fi6pzv8ufyAjl122vWobrhe2CzTn8I1qNKMVl4ubvqWV"
+    "qS9iIFNMf-KLyEh-vC6Rb0g_t";
+
+const std::string kEcdsa384PrivKeyPem = R"(-----BEGIN EC PRIVATE KEY-----
+MIGkAgEBBDCrPXJDgQDtNRpM0qNUW/zN1vrCvOVH1CsItVZ+1NeGB+w/2whnIXJQ
+K7U5C1ETPHagBwYFK4EEACKhZANiAAR0JjvVJXc3u1I/7vt5mxzPtAIi1VIqxCwN
+wgISZVySTYZQzyicW2GfhMlFCow28LzqTwH/eCymAvnTAmpK/P1hXhNcnxDBZNOU
+WMbMLFcQrg2wwpIb/k/IXobNwjNPRBo=
+-----END EC PRIVATE KEY-----)";
+const std::string kEcdsa384PubKeyPem = R"(-----BEGIN PUBLIC KEY-----
+MHYwEAYHKoZIzj0CAQYFK4EEACIDYgAEdCY71SV3N7tSP+77eZscz7QCItVSKsQs
+DcICEmVckk2GUM8onFthn4TJRQqMNvC86k8B/3gspgL50wJqSvz9YV4TXJ8QwWTT
+lFjGzCxXEK4NsMKSG/5PyF6GzcIzT0Qa
+-----END PUBLIC KEY-----)";
+const std::string kEcdsa384PubKeyJwkX =
+    "dCY71SV3N7tSP-77eZscz7QCItVSKsQsDcICEmVckk2GUM8onFthn4TJRQqMNvC8";
+const std::string kEcdsa384PubKeyJwkY =
+    "6k8B_3gspgL50wJqSvz9YV4TXJ8QwWTTlFjGzCxXEK4NsMKSG_5PyF6GzcIzT0Qa";
+
+const std::string kEcdsa256PrivKeyPem = R"(-----BEGIN PRIVATE KEY-----
+MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgPGJGAm4X1fvBuC1z
+SpO/4Izx6PXfNMaiKaS5RUkFqEGhRANCAARCBvmeksd3QGTrVs2eMrrfa7CYF+sX
+sjyGg+Bo5mPKGH4Gs8M7oIvoP9pb/I85tdebtKlmiCZHAZE5w4DfJSV6
+-----END PRIVATE KEY-----)";
+const std::string kEcdsa256PubKeyPem = R"(-----BEGIN PUBLIC KEY-----
+MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEQgb5npLHd0Bk61bNnjK632uwmBfr
+F7I8hoPgaOZjyhh+BrPDO6CL6D/aW/yPObXXm7SpZogmRwGROcOA3yUleg==
+-----END PUBLIC KEY-----)";
+const std::string kEcdsa256PubKeyJwkX = "Qgb5npLHd0Bk61bNnjK632uwmBfrF7I8hoPgaOZjyhg";
+const std::string kEcdsa256PubKeyJwkY = "fgazwzugi-g_2lv8jzm115u0qWaIJkcBkTnDgN8lJXo";
+
+const std::string kKid1 = "public:c424b67b-fe28-45d7-b015-f79da50b5b21";
+const std::string kKid2 = "public:9b9d0b47-b9ed-4ba6-9180-52fc5b161a3a";
+
+const std::string kJwksHsFileFormat = R"(
+{
+  "keys": [
+    { "kty": "oct", "kid": "$0", "alg": "$1", "k": "$2" }
+  ]
+})";
+
+const std::string kJwksRsaFileFormat = R"(
+{
+  "keys": [
+    { "kty": "RSA", "kid": "$0", "alg": "$1", "n": "$2", "e": "$3" },
+    { "kty": "RSA", "kid": "$4", "alg": "$5", "n": "$6", "e": "$7" }
+  ]
+})";
+
+const std::string kJwksEcFileFormat = R"(
+{
+  "keys": [
+    { "kty": "EC", "kid": "$0", "crv": "$1", "x": "$2", "y": "$3" }
+  ]
+})";
+
+/// Utility class for creating a file that will be automatically deleted upon test
+/// completion.
+class TempTestDataFile {
+ public:
+  // Creates a temporary file with the specified contents.
+  explicit TempTestDataFile(const std::string& contents);
+
+  ~TempTestDataFile() { Delete(); }
+
+  /// Returns the absolute path to the file.
+  const std::string& Filename() const { return name_; }
+
+ private:
+  std::string name_;
+  bool deleted_;
+
+  // Delete this temporary file
+  void Delete();
+};
+
+TempTestDataFile::TempTestDataFile(const std::string& contents)
+  : name_("/tmp/jwks_XXXXXX"), deleted_(false) {
+  std::unique_ptr<WritableFile> tmp_file;
+  string created_filename;
+  WritableFileOptions opts;
+  opts.is_sensitive = false;
+  Status status;
+  status = Env::Default()->NewTempWritableFile(opts, &name_[0], &created_filename, &tmp_file);
+  if (!status.ok()) {
+    std::cerr << Substitute("Error creating temp file: $0", created_filename);
+  }
+
+  status = WriteStringToFile(Env::Default(), contents, created_filename);
+  if (!status.ok()) {
+    std::cerr << Substitute("Error writing contents to temp file: $0", created_filename);
+  }
+
+  name_ = created_filename;
+}
+
+void TempTestDataFile::Delete() {
+  if (deleted_) return;
+  deleted_ = true;
+  if (remove(name_.c_str()) != 0) {
+    std::cout << "Error deleting temp file; " << strerror(errno) << std::endl;
+    abort();
+  }
+}
+
+TEST(JwtUtilTest, LoadJwksFile) {
+  // Load JWKS from file.
+  TempTestDataFile jwks_file(Substitute(kJwksRsaFileFormat, kKid1, "RS256",
+      kRsaPubKeyJwkN, kRsaPubKeyJwkE, kKid2, "RS256", kRsaInvalidPubKeyJwkN,
+      kRsaPubKeyJwkE));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+  JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
+  ASSERT_FALSE(jwks->IsEmpty());
+  ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kKid1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ("rs256", key1->get_algorithm());
+  ASSERT_EQ(kRsaPubKeyPem, key1->get_key());
+
+  std::string non_existing_kid("public:c424b67b-fe28-45d7-b015-f79da5-xxxxx");
+  const JWTPublicKey* key3 = jwks->LookupRSAPublicKey(non_existing_kid);
+  ASSERT_FALSE(key3 != nullptr);
+}
+
+TEST(JwtUtilTest, LoadInvalidJwksFiles) {
+  // JWK without kid.
+  std::unique_ptr<TempTestDataFile> jwks_file(new TempTestDataFile(
+      "{"
+      "  \"keys\": ["
+      "    {"
+      "      \"use\": \"sig\","
+      "      \"kty\": \"RSA\","
+      "      \"alg\": \"RS256\","
+      "      \"n\": \"sttddbg-_yjXzcFpbMJB1fIFam9lQBeXWbTqzJwbuFbspHMsRowa8FaPw\","
+      "      \"e\": \"AQAB\""
+      "    }"
+      "  ]"
+      "}"));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file->Filename(), true);
+  ASSERT_FALSE(status.ok());
+  ASSERT_STR_CONTAINS(status.ToString(), "parsing key #0")
+      << " Actual error: " << status.ToString();
+  ASSERT_STR_CONTAINS(status.ToString(), "'kid' property is required")
+      << "actual error: " << status.ToString();
+
+  // Invalid JSON format, missing "]" and "}".
+  jwks_file.reset(new TempTestDataFile(
+      "{"
+      "  \"keys\": ["
+      "    {"
+      "      \"use\": \"sig\","
+      "      \"kty\": \"RSA\","
+      "      \"kid\": \"public:c424b67b-fe28-45d7-b015-f79da50b5b21\","
+      "      \"alg\": \"RS256\","
+      "      \"n\": \"sttddbg-_yjXzcFpbMJB1fIFam9lQBeXWbTqzJwbuFbspHMsRowa8FaPw\","
+      "      \"e\": \"AQAB\""
+      "}"));
+  status = jwt_helper.Init(jwks_file->Filename(), true);
+  ASSERT_FALSE(status.ok());
+  ASSERT_STR_CONTAINS(status.ToString(), "Missing a comma or ']' after an array element")
+      << " Actual error: " << status.ToString();
+
+  // JWKS with empty key id.
+  jwks_file.reset(new TempTestDataFile(
+      Substitute(kJwksRsaFileFormat, "", "RS256", kRsaPubKeyJwkN, kRsaPubKeyJwkE,
+          "", "RS256", kRsaInvalidPubKeyJwkN, kRsaPubKeyJwkE)));
+  status = jwt_helper.Init(jwks_file->Filename(), true);
+  ASSERT_FALSE(status.ok());
+  ASSERT_STR_CONTAINS(status.ToString(), "parsing key #0")
+      << " Actual error: " << status.ToString();
+  ASSERT_STR_CONTAINS(status.ToString(), "'kid' property must be a non-empty string")
+      << " Actual error: " << status.ToString();
+
+  // JWKS with empty key value.
+  jwks_file.reset(new TempTestDataFile(
+      Substitute(kJwksRsaFileFormat, kKid1, "RS256", "", "", kKid2, "RS256", "", "")));
+  status = jwt_helper.Init(jwks_file->Filename(), true);
+  ASSERT_FALSE(status.ok());
+  ASSERT_STR_CONTAINS(status.ToString(), "parsing key #0")
+      << " Actual error: " << status.ToString();
+  ASSERT_STR_CONTAINS(status.ToString(), "'n' and 'e' properties must be a non-empty string")
+      << " Actual error: " << status.ToString();
+}
+
+TEST(JwtUtilTest, VerifyJwtHS256) {
+  // Cryptographic algorithm: HS256.
+  // SharedSecret (Generated for MAC key (Base64 encoded)).
+  string shared_secret = "Yx57JSBzhGFDgDj19CabRpH/+kiaKqI6UZI6lDunQKw=";
+  TempTestDataFile jwks_file(
+      Substitute(kJwksHsFileFormat, kKid1, "HS256", shared_secret));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+  JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
+  EXPECT_OK(status);
+  ASSERT_EQ(1, jwks->GetHSKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupHSKey(kKid1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(key1->get_key(), shared_secret);
+
+  // Create a JWT token and sign it with HS256.
+  auto token = jwt::create()
+                   .set_issuer("auth0")
+                   .set_type("JWS")
+                   .set_algorithm("HS256")
+                   .set_key_id(kKid1)
+                   .set_payload_claim("username", picojson::value("impala"))
+                   .sign(jwt::algorithm::hs256(shared_secret));
+
+  // Verify the JWT token with our wrapper class which use public key retrieved from JWKS,
+  // and read username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtHS384) {
+  // Cryptographic algorithm: HS384.
+  // SharedSecret (Generated for MAC key (Base64 encoded)).
+  string shared_secret =
+      "TlqmKRc2PNQJXTC3Go7eAadwPxA7x9byyXCi5I8tSvxrE77tYbuF5pfZAyswrkou";
+  TempTestDataFile jwks_file(
+      Substitute(kJwksHsFileFormat, kKid1, "HS384", shared_secret));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+  JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
+  ASSERT_EQ(1, jwks->GetHSKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupHSKey(kKid1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(key1->get_key(), shared_secret);
+
+  // Create a JWT token and sign it with HS384.
+  auto token = jwt::create()
+                   .set_issuer("auth0")
+                   .set_type("JWS")
+                   .set_algorithm("HS384")
+                   .set_key_id(kKid1)
+                   .set_payload_claim("username", picojson::value("impala"))
+                   .sign(jwt::algorithm::hs384(shared_secret));
+
+  // Verify the JWT token with our wrapper class which use public key retrieved from JWKS,
+  // and read username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtHS512) {
+  // Cryptographic algorithm: HS512.
+  // SharedSecret (Generated for MAC key (Base64 encoded)).
+  string shared_secret = "ywc6DN7+iRw1E5HOqzvrsYodykSLFutT28KN3bJnLZcZpPCNjn0b6gbMfXPcxeY"
+                         "VyuWWGDxh6gCDwPMejbuEEg==";
+  TempTestDataFile jwks_file(
+      Substitute(kJwksHsFileFormat, kKid1, "HS512", shared_secret));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+  JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
+  EXPECT_OK(status);
+  ASSERT_EQ(1, jwks->GetHSKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupHSKey(kKid1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(key1->get_key(), shared_secret);
+
+  // Create a JWT token and sign it with HS512.
+  auto token = jwt::create()
+                   .set_issuer("auth0")
+                   .set_type("JWS")
+                   .set_algorithm("HS512")
+                   .set_key_id(kKid1)
+                   .set_payload_claim("username", picojson::value("impala"))
+                   .sign(jwt::algorithm::hs512(shared_secret));
+
+  // Verify the JWT token with our wrapper class which use public key retrieved from JWKS,
+  // and read username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtRS256) {
+  // Cryptographic algorithm: RS256.
+  TempTestDataFile jwks_file(Substitute(kJwksRsaFileFormat, kKid1, "RS256",
+      kRsaPubKeyJwkN, kRsaPubKeyJwkE, kKid2, "RS256", kRsaInvalidPubKeyJwkN,
+      kRsaPubKeyJwkE));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+  JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
+  ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kKid1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(kRsaPubKeyPem, key1->get_key());
+
+  // Create a JWT token and sign it with RS256.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS256")
+          .set_key_id(kKid1)
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::rs256(kRsaPubKeyPem, kRsaPrivKeyPem, "", ""));
+  ASSERT_EQ(
+      "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1mNzlkYTUwYj"
+      "ViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoiaW1wYWxhIn0.OW5H2SClL"
+      "lsotsCarTHYEbqlbRh43LFwOyo9WubpNTwE7hTuJDsnFoVrvHiWI02W69TZNat7DYcC86A_ogLMfNXagHj"
+      "lMFJaRnvG5Ekag8NRuZNJmHVqfX-qr6x7_8mpOdU554kc200pqbpYLhhuK4Qf7oT7y9mOrtNrUKGDCZ0Q2"
+      "y_mizlbY6SMg4RWqSz0RQwJbRgXIWSgcbZd0GbD_MQQ8x7WRE4nluU-5Fl4N2Wo8T9fNTuxALPiuVeIczO"
+      "25b5n4fryfKasSgaZfmk0CoOJzqbtmQxqiK9QNSJAiH2kaqMwLNgAdgn8fbd-lB1RAEGeyPH8Px8ipqcKs"
+      "Pk0bg",
+      token);
+
+  // Verify the JWT token with jwt-cpp APIs directly.
+  auto jwt_decoded_token = jwt::decode(token);
+  auto verifier = jwt::verify()
+                      .allow_algorithm(jwt::algorithm::rs256(kRsaPubKeyPem, "", "", ""))
+                      .with_issuer("auth0");
+  verifier.verify(jwt_decoded_token);
+
+  // Verify the JWT token with our wrapper class which use public key retrieved from JWKS,
+  // and read username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtRS384) {
+  // Cryptographic algorithm: RS384.
+  TempTestDataFile jwks_file(Substitute(kJwksRsaFileFormat, kKid1, "RS384",
+      kRsaPubKeyJwkN, kRsaPubKeyJwkE, kKid2, "RS384", kRsaInvalidPubKeyJwkN,
+      kRsaPubKeyJwkE));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+  JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
+  ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kKid1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(kRsaPubKeyPem, key1->get_key());
+
+  // Create a JWT token and sign it with RS384.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS384")
+          .set_key_id(kKid1)
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::rs384(kRsaPubKeyPem, kRsaPrivKeyPem, "", ""));
+
+  // Verify the JWT token with our wrapper class which use public key retrieved from JWKS,
+  // and read username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtRS512) {
+  // Cryptographic algorithm: RS512.
+  TempTestDataFile jwks_file(Substitute(kJwksRsaFileFormat, kKid1, "RS512",
+      kRsa512PubKeyJwkN, kRsa512PubKeyJwkE, kKid2, "RS512",
+      kRsa512InvalidPubKeyJwkN, kRsa512PubKeyJwkE));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+  JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
+  ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kKid1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(kRsa512PubKeyPem, key1->get_key());
+
+  // Create a JWT token and sign it with RS512.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS512")
+          .set_key_id(kKid1)
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::rs512(kRsa512PubKeyPem, kRsa512PrivKeyPem, "", ""));
+
+  // Verify the JWT token with our wrapper class which use public key retrieved from JWKS,
+  // and read username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtPS256) {
+  // Cryptographic algorithm: PS256.
+  TempTestDataFile jwks_file(Substitute(kJwksRsaFileFormat, kKid1, "PS256",
+      kRsa1024PubKeyJwkN, kRsa1024PubKeyJwkE, kKid2, "PS256",
+      kRsaInvalidPubKeyJwkN, kRsaPubKeyJwkE));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+  JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
+  ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kKid1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(kRsa1024PubKeyPem, key1->get_key());
+
+  // Create a JWT token and sign it with PS256.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("PS256")
+          .set_key_id(kKid1)
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::ps256(kRsa1024PubKeyPem, kRsa1024PrivKeyPem, "", ""));
+
+  // Verify the JWT token with our wrapper class which use public key retrieved from JWKS,
+  // and read username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtPS384) {
+  // Cryptographic algorithm: PS384.
+  TempTestDataFile jwks_file(Substitute(kJwksRsaFileFormat, kKid1, "PS384",
+      kRsa2048PubKeyJwkN, kRsa2048PubKeyJwkE, kKid2, "PS384",
+      kRsaInvalidPubKeyJwkN, kRsaPubKeyJwkE));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+  JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
+  ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kKid1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(kRsa2048PubKeyPem, key1->get_key());
+
+  // Create a JWT token and sign it with PS384.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("PS384")
+          .set_key_id(kKid1)
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::ps384(kRsa2048PubKeyPem, kRsa2048PrivKeyPem, "", ""));
+
+  // Verify the JWT token with our wrapper class which use public key retrieved from JWKS,
+  // and read username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtPS512) {
+  // Cryptographic algorithm: PS512.
+  TempTestDataFile jwks_file(Substitute(kJwksRsaFileFormat, kKid1, "PS512",
+      kRsa4096PubKeyJwkN, kRsa4096PubKeyJwkE, kKid2, "PS512",
+      kRsaInvalidPubKeyJwkN, kRsaPubKeyJwkE));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+  JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
+  ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kKid1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(kRsa4096PubKeyPem, key1->get_key());
+
+  // Create a JWT token and sign it with PS512.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("PS512")
+          .set_key_id(kKid1)
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::ps512(kRsa4096PubKeyPem, kRsa4096PrivKeyPem, "", ""));
+
+  // Verify the JWT token with our wrapper class which use public key retrieved from JWKS,
+  // and read username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtES256) {
+  // Cryptographic algorithm: ES256.
+  TempTestDataFile jwks_file(Substitute(kJwksEcFileFormat, kKid1, "P-256",
+      kEcdsa256PubKeyJwkX, kEcdsa256PubKeyJwkY));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+  JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
+  ASSERT_EQ(1, jwks->GetECPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupECPublicKey(kKid1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(kEcdsa256PubKeyPem, key1->get_key());
+
+  // Create a JWT token and sign it with ES256.
+  auto token = jwt::create()
+                   .set_issuer("auth0")
+                   .set_type("JWS")
+                   .set_algorithm("ES256")
+                   .set_key_id(kKid1)
+                   .set_payload_claim("username", picojson::value("impala"))
+                   .sign(jwt::algorithm::es256(
+                       kEcdsa256PubKeyPem, kEcdsa256PrivKeyPem, "", ""));
+
+  // Verify the JWT token with jwt-cpp APIs directly.
+  auto jwt_decoded_token = jwt::decode(token);
+  auto verifier =
+      jwt::verify()
+          .allow_algorithm(jwt::algorithm::es256(kEcdsa256PubKeyPem, "", "", ""))
+          .with_issuer("auth0");
+  verifier.verify(jwt_decoded_token);
+
+  // Verify the JWT token with our wrapper class which use public key retrieved from JWKS,
+  // and read username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtES384) {
+  // Cryptographic algorithm: ES384.
+  TempTestDataFile jwks_file(Substitute(kJwksEcFileFormat, kKid1, "P-384",
+      kEcdsa384PubKeyJwkX, kEcdsa384PubKeyJwkY));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+  JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
+  ASSERT_EQ(1, jwks->GetECPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupECPublicKey(kKid1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(kEcdsa384PubKeyPem, key1->get_key());
+
+  // Create a JWT token and sign it with ES384.
+  auto token = jwt::create()
+                   .set_issuer("auth0")
+                   .set_type("JWS")
+                   .set_algorithm("ES384")
+                   .set_key_id(kKid1)
+                   .set_payload_claim("username", picojson::value("impala"))
+                   .sign(jwt::algorithm::es384(
+                       kEcdsa384PubKeyPem, kEcdsa384PrivKeyPem, "", ""));
+
+  // Verify the JWT token with our wrapper class which use public key retrieved from JWKS,
+  // and read username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtES512) {
+  // Cryptographic algorithm: ES512.
+  TempTestDataFile jwks_file(Substitute(kJwksEcFileFormat, kKid1, "P-521",
+      kEcdsa521PubKeyJwkX, kEcdsa521PubKeyJwkY));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+  JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
+  ASSERT_EQ(1, jwks->GetECPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupECPublicKey(kKid1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(kEcdsa521PubKeyPem, key1->get_key());
+
+  // Create a JWT token and sign it with ES512.
+  auto token = jwt::create()
+                   .set_issuer("auth0")
+                   .set_type("JWS")
+                   .set_algorithm("ES512")
+                   .set_key_id(kKid1)
+                   .set_payload_claim("username", picojson::value("impala"))
+                   .sign(jwt::algorithm::es512(
+                       kEcdsa521PubKeyPem, kEcdsa521PrivKeyPem, "", ""));
+
+  // Verify the JWT token with our wrapper class which use public key retrieved from JWKS,
+  // and read username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtNotVerifySignature) {
+  // Create a JWT token and sign it with RS256.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS256")
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::rs256(kRsaPubKeyPem, kRsaPrivKeyPem, "", ""));
+
+  // Do not verify signature.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  Status status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtFailMismatchingAlgorithms) {
+  // JWT algorithm is not matching with algorithm in JWK.
+  TempTestDataFile jwks_file(Substitute(kJwksRsaFileFormat, kKid1, "RS256",
+      kRsaPubKeyJwkN, kRsaPubKeyJwkE, kKid2, "RS256", kRsaInvalidPubKeyJwkN,
+      kRsaPubKeyJwkE));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+
+  // Create a JWT token, but set mismatching algorithm.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS512")
+          .set_key_id(kKid1)
+          .sign(jwt::algorithm::rs256(kRsaPubKeyPem, kRsaPrivKeyPem, "", ""));
+  // Failed to verify the token due to mismatching algorithms.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.ToString().find(
+                  "JWT algorithm 'rs512' is not matching with JWK algorithm 'rs256'")
+      != std::string::npos)
+      << " Actual error: " << status.ToString();
+}
+
+TEST(JwtUtilTest, VerifyJwtFailKeyNotFound) {
+  // The key cannot be found in JWKS.
+  TempTestDataFile jwks_file(Substitute(kJwksRsaFileFormat, kKid1, "RS256",
+      kRsaPubKeyJwkN, kRsaPubKeyJwkE, kKid2, "RS256", kRsaInvalidPubKeyJwkN,
+      kRsaPubKeyJwkE));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+
+  // Create a JWT token with a key ID which can not be found in JWKS.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS256")
+          .set_key_id("unfound-key-id")
+          .sign(jwt::algorithm::rs256(kRsaPubKeyPem, kRsaPrivKeyPem, "", ""));
+  // Failed to verify the token since key is not found in JWKS.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(
+      status.ToString().find("Invalid JWK ID in the JWT token") != std::string::npos)
+      << " Actual error: " << status.ToString();
+}
+
+TEST(JwtUtilTest, VerifyJwtTokenWithoutKeyId) {
+  // Verify JWT token without key ID.
+  TempTestDataFile jwks_file(Substitute(kJwksRsaFileFormat, kKid1, "RS256",
+      kRsaPubKeyJwkN, kRsaPubKeyJwkE, kKid2, "RS256", kRsaInvalidPubKeyJwkN,
+      kRsaPubKeyJwkE));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+
+  // Create a JWT token without key ID.
+  auto token =
+      jwt::create().set_issuer("auth0").set_type("JWS").set_algorithm("RS256").sign(
+          jwt::algorithm::rs256(kRsaPubKeyPem, kRsaPrivKeyPem, "", ""));
+  // Verify the token by trying each key in JWK set and there is one matched key.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+}
+
+TEST(JwtUtilTest, VerifyJwtFailTokenWithoutKeyId) {
+  // Verify JWT token without key ID.
+  TempTestDataFile jwks_file(Substitute(kJwksRsaFileFormat, kKid1, "RS256",
+      kRsaPubKeyJwkN, kRsaPubKeyJwkE, kKid2, "RS256", kRsaInvalidPubKeyJwkN,
+      kRsaPubKeyJwkE));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+
+  // Create a JWT token without key ID.
+  auto token =
+      jwt::create().set_issuer("auth0").set_type("JWS").set_algorithm("RS512").sign(
+          jwt::algorithm::rs512(kRsa512PubKeyPem, kRsa512PrivKeyPem, "", ""));
+  // Verify the token by trying each key in JWK set, but there is no matched key.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  ASSERT_FALSE(status.ok());
+}
+
+TEST(JwtUtilTest, VerifyJwtFailTokenWithoutSignature) {
+  TempTestDataFile jwks_file(Substitute(kJwksRsaFileFormat, kKid1, "RS256",
+      kRsaPubKeyJwkN, kRsaPubKeyJwkE, kKid2, "RS256", kRsaInvalidPubKeyJwkN,
+      kRsaPubKeyJwkE));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+
+  // Create a JWT token without signature.
+  auto token =
+      jwt::create().set_issuer("auth0").set_type("JWS").sign(jwt::algorithm::none{});
+  // Failed to verify the unsigned token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.ToString().find("Unsecured JWT") != std::string::npos)
+      << " Actual error: " << status.ToString();
+}
+
+TEST(JwtUtilTest, VerifyJwtFailExpiredToken) {
+  // Sign JWT token with RS256.
+  TempTestDataFile jwks_file(Substitute(kJwksRsaFileFormat, kKid1, "RS256",
+      kRsaPubKeyJwkN, kRsaPubKeyJwkE, kKid2, "RS256", kRsaInvalidPubKeyJwkN,
+      kRsaPubKeyJwkE));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename(), true);
+  EXPECT_OK(status);
+
+  // Create a JWT token and sign it with RS256.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS256")
+          .set_key_id(kKid1)
+          .set_issued_at(std::chrono::system_clock::now())
+          .set_expires_at(std::chrono::system_clock::now() - std::chrono::seconds{10})
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::rs256(kRsaPubKeyPem, kRsaPrivKeyPem, "", ""));
+
+  // Verify the token, including expiring time.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.ToString().find("Verification failed, error: token expired")
+      != std::string::npos)
+      << " Actual error: " << status.ToString();
+}
+
+} // namespace kudu
diff --git a/src/kudu/util/jwt-util.cc b/src/kudu/util/jwt-util.cc
new file mode 100644
index 0000000..31cc300
--- /dev/null
+++ b/src/kudu/util/jwt-util.cc
@@ -0,0 +1,933 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Copied from Impala and adapted to Kudu.
+
+#include "kudu/util/jwt-util.h"
+
+#include <errno.h>
+#include <openssl/bn.h>
+#include <openssl/crypto.h>
+#include <openssl/ec.h>
+#include <openssl/obj_mac.h>
+#include <openssl/pem.h>
+#include <openssl/ssl.h>
+#include <openssl/x509.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <sys/stat.h>
+
+#include <cstring>
+#include <exception>
+#include <functional>
+#include <mutex>
+#include <ostream>
+#include <stdexcept>
+#include <type_traits>
+#include <typeinfo>
+#include <unordered_map>
+#include <utility>
+
+#include <boost/algorithm/string/case_conv.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <jwt-cpp/jwt.h>
+#include <jwt-cpp/traits/kazuho-picojson/defaults.h>
+#include <jwt-cpp/traits/kazuho-picojson/traits.h>
+#include <rapidjson/document.h>
+#include <rapidjson/error/en.h>
+#include <rapidjson/filereadstream.h>
+#include <rapidjson/rapidjson.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/escaping.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/curl_util.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/hash_util.h"
+#include "kudu/util/jwt-util-internal.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/openssl_util.h"
+#include "kudu/util/promise.h"
+#include "kudu/util/string_case.h"
+#include "kudu/util/thread.h"
+
+using std::string;
+using strings::Substitute;
+
+DEFINE_int32(jwks_update_frequency_s, 60,
+    "The time in seconds to wait between downloading JWKS from the specified URL.");
+DEFINE_int32(jwks_pulling_timeout_s, 10,
+    "The time in seconds for connection timed out when pulling JWKS from the specified URL.");
+
+static bool ValidateBiggerThanZero(const char* name, const int32_t val) {
+  if (val <= 0) {
+    LOG(ERROR) << Substitute("Invalid value for $0 flag: $1", name, val);
+    return false;
+  }
+  return true;
+}
+
+DEFINE_validator(jwks_update_frequency_s, &ValidateBiggerThanZero);
+DEFINE_validator(jwks_pulling_timeout_s, &ValidateBiggerThanZero);
+
+namespace kudu {
+
+using rapidjson::Document;
+using rapidjson::Value;
+
+// JWK Set (JSON Web Key Set) is JSON data structure that represents a set of JWKs.
+// This class parses JWKS file.
+class JWKSetParser {
+  public:
+  explicit JWKSetParser(JWKSSnapshot* jwks) : jwks_(jwks) {}
+
+  // Perform the parsing and populate JWKS's internal map. Return error status if
+  // encountering any error.
+  Status Parse(const Document& rules_doc) {
+    bool found_keys = false;
+    for (Value::ConstMemberIterator member = rules_doc.MemberBegin();
+         member != rules_doc.MemberEnd(); ++member) {
+      if (strcmp("keys", member->name.GetString()) == 0) {
+        found_keys = true;
+        RETURN_NOT_OK(ParseKeys(member->value));
+      } else {
+        return Status::InvalidArgument(
+            Substitute(
+                "Unexpected property '$0' must be removed", member->name.GetString()));
+      }
+    }
+    if (!found_keys) {
+      return Status::InvalidArgument("An array of keys is required");
+    }
+    return Status::OK();
+  }
+
+ private:
+  JWKSSnapshot* jwks_;
+
+  static string NameOfTypeOfJsonValue(const Value& value) {
+    switch (value.GetType()) {
+      case rapidjson::kNullType:
+        return "Null";
+      case rapidjson::kFalseType:
+      case rapidjson::kTrueType:
+        return "Bool";
+      case rapidjson::kObjectType:
+        return "Object";
+      case rapidjson::kArrayType:
+        return "Array";
+      case rapidjson::kStringType:
+        return "String";
+      case rapidjson::kNumberType:
+        if (value.IsInt()) return "Integer";
+        if (value.IsDouble()) return "Float";
+      default:
+        DCHECK(false);
+        return "Unknown";
+    }
+  }
+
+  // Parse an array of keys.
+  Status ParseKeys(const Value& keys) {
+    if (!keys.IsArray()) {
+      return Status::InvalidArgument(
+          Substitute(
+              "'keys' must be of type Array but is a '$0'", NameOfTypeOfJsonValue(keys)));
+    }
+    if (keys.Size() == 0) {
+      return Status::InvalidArgument(Substitute("'keys' must be a non empty Array"));
+    }
+    for (rapidjson::SizeType key_idx = 0; key_idx < keys.Size(); ++key_idx) {
+      const Value& key = keys[key_idx];
+      if (!key.IsObject()) {
+        return Status::InvalidArgument(
+            Substitute("parsing key #$0, key should be a JSON Object but is a '$1'.",
+                key_idx, NameOfTypeOfJsonValue(key)));
+      }
+      Status status = ParseKey(key);
+      if (!status.ok()) {
+        Status parse_status = Status::InvalidArgument(Substitute("parsing key #$0, ", key_idx));
+        return parse_status.CloneAndAppend(status.ToString());
+      }
+    }
+    return Status::OK();
+  }
+
+  // Parse a public key and populate JWKS's internal map.
+  Status ParseKey(const Value& json_key) {
+    std::unordered_map<std::string, std::string> kv_map;
+    string key;
+    string value;
+    for (Value::ConstMemberIterator member = json_key.MemberBegin();
+         member != json_key.MemberEnd(); ++member) {
+      key = string(member->name.GetString());
+      RETURN_NOT_OK(ReadKeyProperty(key, json_key, &value, /*required*/ false));
+      if (kv_map.find(key) == kv_map.end()) {
+        kv_map.insert(make_pair(key, value));
+      } else {
+        LOG(WARNING) << "Duplicate property of JWK: " << key;
+      }
+    }
+
+    auto it_kty = kv_map.find("kty");
+    if (it_kty == kv_map.end()) return Status::InvalidArgument("'kty' property is required");
+    auto it_kid = kv_map.find("kid");
+    if (it_kid == kv_map.end()) return Status::InvalidArgument("'kid' property is required");
+    string key_id = it_kid->second;
+    if (key_id.empty()) {
+      return Status::InvalidArgument(Substitute("'kid' property must be a non-empty string"));
+    }
+
+    Status status;
+    string key_type;
+    ToLowerCase(it_kty->second, &key_type);
+    if (key_type == "oct") {
+      JWTPublicKey* jwt_pub_key;
+      status = HSJWTPublicKeyBuilder::CreateJWKPublicKey(kv_map, &jwt_pub_key);
+      if (status.ok()) jwks_->AddHSKey(key_id, jwt_pub_key);
+    } else if (key_type == "rsa") {
+      JWTPublicKey* jwt_pub_key;
+      status = RSAJWTPublicKeyBuilder::CreateJWKPublicKey(kv_map, &jwt_pub_key);
+      if (status.ok()) jwks_->AddRSAPublicKey(key_id, jwt_pub_key);
+    } else if (key_type == "ec") {
+      JWTPublicKey* jwt_pub_key;
+      status = ECJWTPublicKeyBuilder::CreateJWKPublicKey(kv_map, &jwt_pub_key);
+      if (status.ok()) jwks_->AddECPublicKey(key_id, jwt_pub_key);
+    } else {
+      return Status::InvalidArgument(Substitute("Unsupported kty: '$0'", key_type));
+    }
+    return status;
+  }
+
+  // Reads a key property of the given name and assigns the property value to the out
+  // parameter. A true return value indicates success.
+  template <typename T>
+  Status ReadKeyProperty(
+      const string& name, const Value& json_key, T* value, bool required = true) {
+    const Value& json_value = json_key[name.c_str()];
+    if (json_value.IsNull()) {
+      if (required) {
+        return Status::InvalidArgument(Substitute("'$0' property is required and cannot be null",
+                                                  name));
+      }
+      return Status::OK();
+
+    }
+    return ValidateTypeAndExtractValue(name, json_value, value);
+  }
+
+// Extract a value stored in a rapidjson::Value and assign it to the out parameter.
+// The type will be validated before extraction. A true return value indicates success.
+// The name parameter is only used to generate an error message upon failure.
+#define EXTRACT_VALUE(json_type, cpp_type)                                             \
+  Status ValidateTypeAndExtractValue(                                                  \
+      const string& name, const Value& json_value, cpp_type* value) {                  \
+    if (!json_value.Is##json_type()) {                                                 \
+      return Status::InvalidArgument(                                                  \
+          Substitute("'$0' property must be of type " #json_type " but is a $1", name, \
+              NameOfTypeOfJsonValue(json_value)));                                     \
+    }                                                                                  \
+    *value = json_value.Get##json_type();                                              \
+    return Status::OK();                                                               \
+  }
+
+  EXTRACT_VALUE(String, string)
+  // EXTRACT_VALUE(Bool, bool)
+};
+
+//
+// JWTPublicKey member functions.
+//
+// Verify JWT's signature for the given decoded token with jwt-cpp API.
+Status JWTPublicKey::Verify(
+    const DecodedJWT& decoded_jwt, const std::string& algorithm) const {
+  // Verify if algorithms are matching.
+  if (algorithm_ != algorithm) {
+    return Status::NotAuthorized(
+        Substitute("JWT algorithm '$0' is not matching with JWK algorithm '$1'",
+            algorithm, algorithm_));
+  }
+
+  Status status;
+  try {
+    // Call jwt-cpp API to verify token's signature.
+    verifier_.verify(decoded_jwt);
+  } catch (const jwt::error::rsa_exception& e) {
+    status = Status::NotAuthorized(Substitute("RSA error: $0", e.what()));
+  } catch (const jwt::error::token_verification_exception& e) {
+    status = Status::NotAuthorized(
+        Substitute("Verification failed, error: $0", e.what()));
+  } catch (const std::exception& e) {
+    status = Status::NotAuthorized(
+        Substitute("Verification failed, error: $0", e.what()));
+  }
+  return status;
+}
+
+// Create a JWKPublicKey of HS from the JWK.
+Status HSJWTPublicKeyBuilder::CreateJWKPublicKey(
+    const JsonKVMap& kv_map, JWTPublicKey** pub_key_out) {
+  // Octet Sequence keys for HS256, HS384 or HS512.
+  // JWK Sample:
+  // {
+  //   "kty":"oct",
+  //   "alg":"HS256",
+  //   "k":"f83OJ3D2xF1Bg8vub9tLe1gHMzV76e8Tus9uPHvRVEU",
+  //   "kid":"Id that can be uniquely Identified"
+  // }
+  auto it_alg = kv_map.find("alg");
+  if (it_alg == kv_map.end()) return Status::InvalidArgument("'alg' property is required");
+  string algorithm;
+  ToLowerCase(it_alg->second, &algorithm);
+
+  if (algorithm.empty()) {
+    return Status::InvalidArgument(Substitute("'alg' property must be a non-empty string"));
+  }
+  auto it_k = kv_map.find("k");
+  if (it_k == kv_map.end()) return Status::InvalidArgument("'k' property is required");
+  if (it_k->second.empty()) {
+    return Status::InvalidArgument(Substitute("'k' property must be a non-empty string"));
+  }
+
+  Status status;
+  JWTPublicKey* jwt_pub_key = nullptr;
+  try {
+    if (algorithm == "hs256") {
+      jwt_pub_key = new HS256JWTPublicKey(algorithm, it_k->second);
+    } else if (algorithm == "hs384") {
+      jwt_pub_key = new HS384JWTPublicKey(algorithm, it_k->second);
+    } else if (algorithm == "hs512") {
+      jwt_pub_key = new HS512JWTPublicKey(algorithm, it_k->second);
+    } else {
+      return Status::InvalidArgument(Substitute("Invalid 'alg' property value: '$0'", algorithm));
+    }
+  } catch (const std::exception& e) {
+    status = Status::NotAuthorized(
+        Substitute("Failed to initialize verifier, error: $0", e.what()));
+  }
+  if (!status.ok()) return status;
+  *pub_key_out = jwt_pub_key;
+  return Status::OK();
+}
+
+// Create a JWKPublicKey of RSA from the JWK.
+Status RSAJWTPublicKeyBuilder::CreateJWKPublicKey(
+    const JsonKVMap& kv_map, JWTPublicKey** pub_key_out) {
+  // JWK Sample:
+  // {
+  //   "kty":"RSA",
+  //   "alg":"RS256",
+  //   "n":"sttddbg-_yjXzcFpbMJB1fI9...Q_QDhvqXx8eQ1r9smM",
+  //   "e":"AQAB",
+  //   "kid":"Id that can be uniquely Identified"
+  // }
+  auto it_alg = kv_map.find("alg");
+  if (it_alg == kv_map.end()) return Status::InvalidArgument("'alg' property is required");
+  string algorithm = boost::algorithm::to_lower_copy(it_alg->second);
+  if (algorithm.empty()) {
+    return Status::InvalidArgument(Substitute("'alg' property must be a non-empty string"));
+  }
+
+  auto it_n = kv_map.find("n");
+  auto it_e = kv_map.find("e");
+  if (it_n == kv_map.end() || it_e == kv_map.end()) {
+    return Status::InvalidArgument("'n' and 'e' properties are required");
+  }
+  if (it_n->second.empty() || it_e->second.empty()) {
+    return Status::InvalidArgument("'n' and 'e' properties must be a non-empty string");
+  }
+  // Converts public key to PEM encoded form.
+  string pub_key;
+  if (!ConvertJwkToPem(it_n->second, it_e->second, pub_key)) {
+    return Status::InvalidArgument(
+        Substitute("Invalid public key 'n':'$0', 'e':'$1'", it_n->second, it_e->second));
+  }
+
+  Status status;
+  JWTPublicKey* jwt_pub_key = nullptr;
+  try {
+    if (algorithm == "rs256") {
+      jwt_pub_key = new RS256JWTPublicKey(algorithm, pub_key);
+    } else if (algorithm == "rs384") {
+      jwt_pub_key = new RS384JWTPublicKey(algorithm, pub_key);
+    } else if (algorithm == "rs512") {
+      jwt_pub_key = new RS512JWTPublicKey(algorithm, pub_key);
+    } else if (algorithm == "ps256") {
+      jwt_pub_key = new PS256JWTPublicKey(algorithm, pub_key);
+    } else if (algorithm == "ps384") {
+      jwt_pub_key = new PS384JWTPublicKey(algorithm, pub_key);
+    } else if (algorithm == "ps512") {
+      jwt_pub_key = new PS512JWTPublicKey(algorithm, pub_key);
+    } else {
+      return Status::InvalidArgument(Substitute("Invalid 'alg' property value: '$0'", algorithm));
+    }
+  } catch (const jwt::error::rsa_exception& e) {
+    status = Status::NotAuthorized(Substitute("RSA error: $0", e.what()));
+    RETURN_NOT_OK(status);
+  } catch (const std::exception& e) {
+    status = Status::NotAuthorized(
+        Substitute("Failed to initialize verifier, error: $0", e.what()));
+    RETURN_NOT_OK(status);
+  }
+
+  *pub_key_out = jwt_pub_key;
+  return Status::OK();
+}
+
+// Convert public key of RSA from JWK format to PEM encoded format by using OpenSSL APIs.
+bool RSAJWTPublicKeyBuilder::ConvertJwkToPem(
+    const std::string& base64_n, const std::string& base64_e, std::string& pub_key) {
+  pub_key.clear();
+  string str_n;
+  string str_e;
+  if (!strings::WebSafeBase64Unescape(base64_n, &str_n)) return false;
+  if (!strings::WebSafeBase64Unescape(base64_e, &str_e)) return false;
+  BIGNUM* modul = BN_bin2bn(reinterpret_cast<const unsigned char*>(str_n.c_str()),
+                            static_cast<int>(str_n.size()),
+                            nullptr);
+  BIGNUM* expon = BN_bin2bn(reinterpret_cast<const unsigned char*>(str_e.c_str()),
+                            static_cast<int>(str_e.size()),
+                            nullptr);
+
+  RSA* rsa = RSA_new();
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+  rsa->n = modul;
+  rsa->e = expon;
+#else
+  // RSA_set0_key is a new API introduced in OpenSSL version 1.1
+  RSA_set0_key(rsa, modul, expon, nullptr);
+#endif
+
+  unsigned char desc[1024];
+  memset(desc, 0, 1024);
+  auto bio = security::ssl_make_unique(BIO_new(BIO_s_mem()));
+  PEM_write_bio_RSA_PUBKEY(bio.get(), rsa);
+  if (BIO_read(bio.get(), desc, 1024) > 0) {
+    pub_key = reinterpret_cast<char*>(desc);
+    // Remove last '\n'.
+    if (pub_key.length() > 0 && pub_key[pub_key.length() - 1] == '\n') pub_key.pop_back();
+  }
+  RSA_free(rsa);
+  return !pub_key.empty();
+}
+
+// Create a JWKPublicKey of EC (ES256, ES384 or ES512) from the JWK.
+Status ECJWTPublicKeyBuilder::CreateJWKPublicKey(
+    const JsonKVMap& kv_map, JWTPublicKey** pub_key_out) {
+  // JWK Sample:
+  // {
+  //   "kty":"EC",
+  //   "crv":"P-256",
+  //   "x":"f83OJ3D2xF1Bg8vub9tLe1gHMzV76e8Tus9uPHvRVEU",
+  //   "y":"x_FEzRu9m36HLN_tue659LNpXW6pCyStikYjKIWI5a0",
+  //   "kid":"Id that can be uniquely Identified"
+  // }
+  string algorithm;
+  int eccgrp;
+  auto it_crv = kv_map.find("crv");
+  if (it_crv != kv_map.end()) {
+    string curve = boost::algorithm::to_upper_copy(it_crv->second);
+    if (curve == "P-256") {
+      algorithm = "es256";
+      eccgrp = NID_X9_62_prime256v1;
+    } else if (curve == "P-384") {
+      algorithm = "es384";
+      eccgrp = NID_secp384r1;
+    } else if (curve == "P-521") {
+      algorithm = "es512";
+      eccgrp = NID_secp521r1;
+    } else {
+      return Status::NotSupported(Substitute("Unsupported crv: '$0'", curve));
+    }
+  } else {
+    auto it_alg = kv_map.find("alg");
+    if (it_alg == kv_map.end()) {
+      return Status::InvalidArgument("'alg' or 'crv' property is required");
+    }
+    algorithm = boost::algorithm::to_lower_copy(it_alg->second);
+    if (algorithm.empty()) {
+      return Status::InvalidArgument(Substitute("'alg' property must be a non-empty string"));
+    }
+    if (algorithm == "es256") {
+      // ECDSA using P-256 and SHA-256 (OBJ_txt2nid("prime256v1")).
+      eccgrp = NID_X9_62_prime256v1;
+    } else if (algorithm == "es384") {
+      // ECDSA using P-384 and SHA-384 (OBJ_txt2nid("secp384r1")).
+      eccgrp = NID_secp384r1;
+    } else if (algorithm == "es512") {
+      // ECDSA using P-521 and SHA-512 (OBJ_txt2nid("secp521r1")).
+      eccgrp = NID_secp521r1;
+    } else {
+      return Status::NotSupported(Substitute("Unsupported alg: '$0'", algorithm));
+    }
+  }
+
+  auto it_x = kv_map.find("x");
+  auto it_y = kv_map.find("y");
+  if (it_x == kv_map.end() || it_y == kv_map.end()) {
+    return Status::InvalidArgument("'x' and 'y' properties are required");
+  }
+  if (it_x->second.empty() || it_y->second.empty()) {
+    return Status::InvalidArgument("'x' and 'y' properties must be a non-empty string");
+  }
+  // Converts public key to PEM encoded form.
+  string pub_key;
+  if (!ConvertJwkToPem(eccgrp, it_x->second, it_y->second, pub_key)) {
+    return Status::InvalidArgument(
+        Substitute("Invalid public key 'x':'$0', 'y':'$1'", it_x->second, it_y->second));
+  }
+
+  Status status;
+  JWTPublicKey* jwt_pub_key = nullptr;
+  try {
+    if (algorithm == "es256") {
+      jwt_pub_key = new ES256JWTPublicKey(algorithm, pub_key);
+    } else if (algorithm == "es384") {
+      jwt_pub_key = new ES384JWTPublicKey(algorithm, pub_key);
+    } else {
+      DCHECK(algorithm == "es512");
+      jwt_pub_key = new ES512JWTPublicKey(algorithm, pub_key);
+    }
+  } catch (const jwt::error::ecdsa_exception& e) {
+    status = Status::NotAuthorized(Substitute("EC error: $0", e.what()));
+  } catch (const std::exception& e) {
+    status = Status::NotAuthorized(
+        Substitute("Failed to initialize verifier, error: $0", e.what()));
+  }
+  if (!status.ok()) return status;
+  *pub_key_out = jwt_pub_key;
+  return Status::OK();
+}
+
+// Convert public key of EC from JWK format to PEM encoded format by using OpenSSL APIs.
+bool ECJWTPublicKeyBuilder::ConvertJwkToPem(int eccgrp, const std::string& base64_x,
+    const std::string& base64_y, std::string& pub_key) {
+  pub_key.clear();
+  string ascii_x;
+  string ascii_y;
+  if (!strings::WebSafeBase64Unescape(base64_x, &ascii_x)) return false;
+  if (!strings::WebSafeBase64Unescape(base64_y, &ascii_y)) return false;
+  BIGNUM* x = BN_bin2bn(reinterpret_cast<const unsigned char*>(ascii_x.c_str()),
+                        static_cast<int>(ascii_x.size()),
+                        nullptr);
+  BIGNUM* y = BN_bin2bn(reinterpret_cast<const unsigned char*>(ascii_y.c_str()),
+                        static_cast<int>(ascii_y.size()),
+                          nullptr);
+
+  BIO* bio = nullptr;
+  EC_KEY* ecKey = EC_KEY_new_by_curve_name(eccgrp);
+  EC_KEY_set_asn1_flag(ecKey, OPENSSL_EC_NAMED_CURVE);
+  if (EC_KEY_set_public_key_affine_coordinates(ecKey, x, y) == 0) goto cleanup;
+
+  unsigned char desc[1024];
+  memset(desc, 0, 1024);
+  bio = BIO_new(BIO_s_mem());
+  if (PEM_write_bio_EC_PUBKEY(bio, ecKey) != 0) {
+    if (BIO_read(bio, desc, 1024) > 0) {
+      pub_key = reinterpret_cast<char*>(desc);
+      // Remove last '\n'.
+      if (pub_key.length() > 0 && pub_key[pub_key.length() - 1] == '\n') {
+        pub_key.pop_back();
+      }
+    }
+  }
+
+cleanup:
+  if (bio != nullptr) BIO_free(bio);
+  EC_KEY_free(ecKey);
+  BN_free(x);
+  BN_free(y);
+  return !pub_key.empty();
+}
+
+//
+// JWKSSnapshot member functions.
+//
+
+// Load JWKS from the given local json file.
+Status JWKSSnapshot::LoadKeysFromFile(const string& jwks_file_path) {
+  hs_key_map_.clear();
+  rsa_pub_key_map_.clear();
+
+  // Read the file.
+  FILE* jwks_file = fopen(jwks_file_path.c_str(), "r");
+  if (jwks_file == nullptr) {
+    return Status::RuntimeError(
+        Substitute("Could not open JWKS file '$0'; $1", jwks_file_path, strerror(errno)));
+  }
+  // Check for an empty file and ignore it.
+  struct stat jwks_file_stats;
+  if (fstat(fileno(jwks_file), &jwks_file_stats)) {
+    fclose(jwks_file);
+    return Status::RuntimeError(
+        Substitute("Error reading JWKS file '$0'; $1", jwks_file_path, strerror(errno)));
+  }
+  if (jwks_file_stats.st_size == 0) {
+    fclose(jwks_file);
+    return Status::OK();
+  }
+
+  char readBuffer[65536];
+  rapidjson::FileReadStream stream(jwks_file, readBuffer, sizeof(readBuffer));
+  Document jwks_doc;
+  jwks_doc.ParseStream(stream);
+  fclose(jwks_file);
+  if (jwks_doc.HasParseError()) {
+    return Status::InvalidArgument(GetParseError_En(jwks_doc.GetParseError()));
+  }
+  if (!jwks_doc.IsObject()) {
+    return Status::InvalidArgument("root element must be a JSON Object");
+  }
+  if (!jwks_doc.HasMember("keys")) {
+    return Status::InvalidArgument("keys is required");
+  }
+
+  JWKSetParser jwks_parser(this);
+  return jwks_parser.Parse(jwks_doc);
+}
+
+// Download JWKS from the given URL with Kudu's EasyCurl wrapper.
+Status JWKSSnapshot::LoadKeysFromUrl(
+    const std::string& jwks_url, uint64_t cur_jwks_checksum, bool* is_changed) {
+  kudu::EasyCurl curl;
+  kudu::faststring dst;
+  Status status;
+
+  curl.set_timeout(
+      kudu::MonoDelta::FromMilliseconds(static_cast<int64_t>(FLAGS_jwks_pulling_timeout_s) * 1000));
+  curl.set_verify_peer(false);
+  // TODO support CurlAuthType by calling kudu::EasyCurl::set_auth().
+  RETURN_NOT_OK_PREPEND(curl.FetchURL(jwks_url, &dst),
+      Substitute("Error downloading JWKS from '$0'", jwks_url));
+  if (dst.size() > 0) {
+    // Verify if the checksum of the downloaded JWKS has been changed.
+    jwks_checksum_ = HashUtil::FastHash64(dst.data(), dst.size(), /*seed*/ 0xcafebeef);
+    if (jwks_checksum_ == cur_jwks_checksum) return Status::OK();
+    *is_changed = true;
+    // Append '\0' so that the in-memory object could be parsed as StringStream.
+    dst.push_back('\0');
+#ifndef NDEBUG
+    VLOG(3) << "JWKS: " << dst.data();
+#endif
+    // Parse in-memory JWKS JSON object as StringStream.
+    Document jwks_doc;
+    jwks_doc.Parse(reinterpret_cast<char*>(dst.data()));
+    if (jwks_doc.HasParseError()) {
+      status = Status::InvalidArgument(GetParseError_En(jwks_doc.GetParseError()));
+    } else if (!jwks_doc.IsObject()) {
+      status = Status::InvalidArgument("root element must be a JSON Object");
+    } else if (!jwks_doc.HasMember("keys")) {
+      status = Status::InvalidArgument("keys is required");
+    } else {
+      // Load and initialize public keys.
+      JWKSetParser jwks_parser(this);
+      status = jwks_parser.Parse(jwks_doc);
+    }
+  }
+  return status;
+}
+
+void JWKSSnapshot::AddHSKey(const std::string& key_id, JWTPublicKey* jwk_pub_key) {
+  if (hs_key_map_.find(key_id) == hs_key_map_.end()) {
+    hs_key_map_[key_id].reset(jwk_pub_key);
+  } else {
+    LOG(WARNING) << "Duplicate key ID of JWK for HS key: " << key_id;
+  }
+}
+
+void JWKSSnapshot::AddRSAPublicKey(const std::string& key_id, JWTPublicKey* jwk_pub_key) {
+  if (rsa_pub_key_map_.find(key_id) == rsa_pub_key_map_.end()) {
+    rsa_pub_key_map_[key_id].reset(jwk_pub_key);
+  } else {
+    LOG(WARNING) << "Duplicate key ID of JWK for RSA public key: " << key_id;
+  }
+}
+
+void JWKSSnapshot::AddECPublicKey(const std::string& key_id, JWTPublicKey* jwk_pub_key) {
+  if (ec_pub_key_map_.find(key_id) == ec_pub_key_map_.end()) {
+    ec_pub_key_map_[key_id].reset(jwk_pub_key);
+  } else {
+    LOG(WARNING) << "Duplicate key ID of JWK for EC public key: " << key_id;
+  }
+}
+
+const JWTPublicKey* JWKSSnapshot::LookupHSKey(const std::string& kid) const {
+  return FindPointeeOrNull(hs_key_map_, kid);
+}
+
+const JWTPublicKey* JWKSSnapshot::LookupRSAPublicKey(const std::string& kid) const {
+  return FindPointeeOrNull(rsa_pub_key_map_, kid);
+}
+
+const JWTPublicKey* JWKSSnapshot::LookupECPublicKey(const std::string& kid) const {
+  return FindPointeeOrNull(ec_pub_key_map_, kid);
+}
+
+//
+// JWKSMgr member functions.
+//
+
+JWKSMgr::~JWKSMgr() {
+  shut_down_promise_.Set(true);
+  if (jwks_update_thread_ != nullptr) jwks_update_thread_->Join();
+}
+
+Status JWKSMgr::Init(const std::string& jwks_uri, bool is_local_file) {
+  Status status;
+  jwks_uri_ = jwks_uri;
+  std::shared_ptr<JWKSSnapshot> new_jwks = std::make_shared<JWKSSnapshot>();
+  if (is_local_file) {
+    status = new_jwks->LoadKeysFromFile(jwks_uri);
+    if (!status.ok()) {
+      LOG(ERROR) << "Failed to load JWKS: " << status.ToString();
+      return status;
+    }
+    SetJWKSSnapshot(new_jwks);
+  } else {
+    bool is_changed = false;
+    status = new_jwks->LoadKeysFromUrl(jwks_uri, current_jwks_checksum_, &is_changed);
+    if (!status.ok()) {
+      LOG(ERROR) << "Failed to load JWKS: " << status.ToString();
+      return status;
+    }
+    DCHECK(is_changed);
+    if (is_changed) SetJWKSSnapshot(new_jwks);
+
+    // Start a working thread to periodically check the JWKS URL for updates.
+    RETURN_NOT_OK(Thread::Create("impala-server", "JWKS-mgr",
+        [this] { return UpdateJWKSThread(); }, &jwks_update_thread_));
+  }
+
+  // This is only a warning as JWKS information might be changing over time, if for some reason,
+  // the file which URI is pointing at becomes empty, it'll still be downloaded, but no keys will be
+  // verified successfully (due to no public keys in the JWKS to do so).
+  // Since the UpdateJWKSThread is still alive, if the JWKS file/endpoint is fixed, then the
+  // verification will be successful again.
+  if (new_jwks->IsEmpty()) LOG(WARNING) << "JWKS file is empty.";
+  return Status::OK();
+}
+
+void JWKSMgr::UpdateJWKSThread() {
+  std::shared_ptr<JWKSSnapshot> new_jwks;
+  const MonoDelta &timeout = MonoDelta::FromSeconds(FLAGS_jwks_update_frequency_s);
+
+  while (true) {
+    if (shut_down_promise_.WaitFor(timeout) != nullptr) {
+      // Shutdown has happened, stop updating JWKS.
+      break;
+    }
+
+
+    new_jwks = std::make_shared<JWKSSnapshot>();
+    bool is_changed = false;
+    Status status =
+        new_jwks->LoadKeysFromUrl(jwks_uri_, current_jwks_checksum_, &is_changed);
+    if (!status.ok()) {
+      LOG(WARNING) << "Failed to update JWKS: " << status.ToString();
+    } else if (is_changed) {
+      SetJWKSSnapshot(new_jwks);
+    }
+    new_jwks.reset();
+  }
+  // The promise must be set to true.
+  DCHECK(shut_down_promise_.Get());
+}
+
+JWKSSnapshotPtr JWKSMgr::GetJWKSSnapshot() const {
+  std::lock_guard<std::mutex> l(current_jwks_lock_);
+  DCHECK(current_jwks_.get() != nullptr);
+  JWKSSnapshotPtr jwks = current_jwks_;
+  return jwks;
+}
+
+void JWKSMgr::SetJWKSSnapshot(const JWKSSnapshotPtr& new_jwks) {
+  std::lock_guard<std::mutex> l(current_jwks_lock_);
+  DCHECK(new_jwks.get() != nullptr);
+  current_jwks_ = new_jwks;
+  current_jwks_checksum_ = new_jwks->GetChecksum();
+}
+
+//
+// JWTHelper member functions.
+//
+
+struct JWTHelper::JWTDecodedToken {
+  explicit JWTDecodedToken(DecodedJWT  decoded_jwt) : decoded_jwt_(std::move(decoded_jwt)) {}
+  DecodedJWT decoded_jwt_;
+};
+
+JWTHelper* JWTHelper::jwt_helper_ = new JWTHelper();
+
+void JWTHelper::TokenDeleter::operator()(JWTHelper::JWTDecodedToken* token) const {
+  delete token;
+}
+
+Status JWTHelper::Init(const std::string& jwks_uri, bool is_local_file) {
+  jwks_mgr_.reset(new JWKSMgr());
+  RETURN_NOT_OK(jwks_mgr_->Init(jwks_uri, is_local_file));
+  if (!initialized_) initialized_ = true;
+  return Status::OK();
+}
+
+JWKSSnapshotPtr JWTHelper::GetJWKS() const {
+  DCHECK(initialized_);
+  return jwks_mgr_->GetJWKSSnapshot();
+}
+
+// Decode the given JWT token.
+Status JWTHelper::Decode(const string& token, UniqueJWTDecodedToken& decoded_token_out) {
+  Status status;
+  try {
+    // Call jwt-cpp API to decode the JWT token with default jwt::json_traits
+    // (jwt::picojson_traits).
+    decoded_token_out.reset(new JWTDecodedToken(jwt::decode(token)));
+#ifndef NDEBUG
+    std::stringstream msg;
+    msg << "JWT token header: ";
+    for (auto& [key, val] : decoded_token_out.get()->decoded_jwt_.get_header_claims()) {
+      msg << key << "=" << val.to_json().serialize() << ";";
+    }
+    msg << " JWT token payload: ";
+    for (auto& [key, val] : decoded_token_out.get()->decoded_jwt_.get_payload_claims()) {
+      msg << key << "=" << val.to_json().serialize() << ";";
+    }
+    VLOG(3) << msg.str();
+#endif
+  } catch (const std::invalid_argument& e) {
+    status = Status::NotAuthorized(
+        Substitute("Token is not in correct format, error: $0", e.what()));
+  } catch (const std::runtime_error& e) {
+    status = Status::NotAuthorized(
+        Substitute("Base64 decoding failed or invalid json, error: $0", e.what()));
+  }
+  return status;
+}
+
+// Validate the token's signature with public key.
+Status JWTHelper::Verify(const JWTDecodedToken* decoded_token) const {
+  Status status;
+  DCHECK(initialized_);
+
+  const auto& decoded_jwt = decoded_token->decoded_jwt_;
+
+  if (decoded_jwt.get_signature().empty()) {
+    // Don't accept JWT without a signature.
+    return Status::NotAuthorized("Unsecured JWT");
+  }
+  if (jwks_mgr_ == nullptr) {
+    // Skip to signature validation if JWKS file or url is not specified.
+    return Status::OK();
+  }
+
+  JWKSSnapshotPtr jwks = GetJWKS();
+  if (jwks->IsEmpty()) {
+    return Status::NotAuthorized("Verification failed, no matching valid key");
+  }
+
+  try {
+    string algorithm =
+        boost::algorithm::to_lower_copy(decoded_jwt.get_algorithm());
+    string prefix = algorithm.substr(0, 2);
+    if (decoded_jwt.has_key_id()) {
+      // Get key id from token's header and use it to retrieve the public key from JWKS.
+      std::string key_id = decoded_jwt.get_key_id();
+
+      const JWTPublicKey* pub_key = nullptr;
+      if (prefix == "hs") {
+        pub_key = jwks->LookupHSKey(key_id);
+      } else if (prefix == "rs" || prefix == "ps") {
+        pub_key = jwks->LookupRSAPublicKey(key_id);
+      } else if (prefix == "es") {
+        pub_key = jwks->LookupECPublicKey(key_id);
+      } else {
+        return Status::NotAuthorized(
+            Substitute("Unsupported cryptographic algorithm '$0' for JWT", algorithm));
+      }
+      if (pub_key == nullptr) {
+        return Status::NotAuthorized("Invalid JWK ID in the JWT token");
+      }
+      // Use the public key to verify the token's signature.
+      status = pub_key->Verify(decoded_jwt, algorithm);
+    } else {
+      // According to RFC 7517 (JSON Web Key), 'kid' is OPTIONAL so it's possible there
+      // is no key id in the token's header. In this case, get all of public keys from
+      // JWKS for the family of algorithms.
+      const JWKSSnapshot::JWTPublicKeyMap* key_map = nullptr;
+      if (prefix == "hs") {
+        key_map = jwks->GetAllHSKeys();
+      } else if (prefix == "rs" || prefix == "ps") {
+        key_map = jwks->GetAllRSAPublicKeys();
+      } else if (prefix == "es") {
+        key_map = jwks->GetAllECPublicKeys();
+      } else {
+        return Status::NotAuthorized(
+            Substitute("Unsupported cryptographic algorithm '$0' for JWT", algorithm));
+      }
+      if (key_map->empty()) {
+        return Status::NotAuthorized("Verification failed, no matching key");
+      }
+      // Try each key with matching algorithm util the signature is verified.
+      for (const auto& key : *key_map) {
+        status = key.second->Verify(decoded_jwt, algorithm);
+        if (status.ok()) return status;
+      }
+    }
+  } catch (const std::bad_cast& e) {
+    status = Status::NotAuthorized(
+        Substitute("Claim was present but not a string, error: $0", e.what()));
+  } catch (const jwt::error::claim_not_present_exception& e) {
+    status = Status::NotAuthorized(
+        Substitute("Claim not present in JWT token, error $0", e.what()));
+  } catch (const std::exception& e) {
+    status = Status::NotAuthorized(
+        Substitute("Token verification failed, error: $0", e.what()));
+  }
+  return status;
+}
+
+Status JWTHelper::GetCustomClaimUsername(const JWTDecodedToken* decoded_token,
+    const string& jwt_custom_claim_username, string& username) {
+  DCHECK(!jwt_custom_claim_username.empty());
+  Status status;
+  const auto& decoded_jwt = decoded_token->decoded_jwt_;
+  try {
+    // Get value of custom claim 'username' from the token payload.
+    if (decoded_jwt.has_payload_claim(jwt_custom_claim_username)) {
+      // Assume the claim data type of 'username' is string.
+      username.assign(
+          decoded_jwt.get_payload_claim(jwt_custom_claim_username)
+              .to_json()
+              .to_str());
+      if (username.empty()) {
+        status = Status::NotAuthorized(
+            Substitute("Claim '$0' is empty", jwt_custom_claim_username));
+      }
+    } else {
+      status = Status::NotAuthorized(
+          Substitute("Claim '$0' was not present", jwt_custom_claim_username));
+    }
+  } catch (const std::runtime_error& e) {
+    status = Status::NotAuthorized(
+        Substitute("Claim '$0' was not present, error: $1", jwt_custom_claim_username,
+            e.what()));
+  }
+  return status;
+}
+
+} // namespace kudu
diff --git a/src/kudu/util/jwt-util.h b/src/kudu/util/jwt-util.h
new file mode 100644
index 0000000..76b7f21
--- /dev/null
+++ b/src/kudu/util/jwt-util.h
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Copied from Impala and adapted to Kudu.
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "kudu/util/jwt-util-internal.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// JSON Web Token (JWT) is an Internet proposed standard for creating data with optional
+// signature and/or optional encryption whose payload holds JSON that asserts some
+// number of claims. The tokens are signed either using a private secret or a public/
+// private key.
+// This class works as wrapper for jwt-cpp. It provides APIs to decode/verify JWT token,
+// and extracts custom claim from the payload of JWT token.
+// The class is thread safe.
+class JWTHelper {
+ public:
+  // Opaque types for storing the JWT decoded token. This allows us to avoid including
+  // header file jwt-cpp/jwt.h.
+  struct JWTDecodedToken;
+
+  // Custom deleter: intended for use with std::unique_ptr<JWTDecodedToken>.
+  class TokenDeleter {
+   public:
+    // Called by unique_ptr to free JWTDecodedToken
+    void operator()(JWTHelper::JWTDecodedToken* token) const;
+  };
+  // UniqueJWTDecodedToken -- a wrapper around opaque decoded token structure to
+  // facilitate automatic reference counting.
+  typedef std::unique_ptr<JWTDecodedToken, TokenDeleter> UniqueJWTDecodedToken;
+
+  // Return the single instance.
+  static JWTHelper* GetInstance() { return jwt_helper_; }
+
+  // Load JWKS from a given local JSON file or URL. Returns an error if problems were
+  // encountered.
+  Status Init(const std::string& jwks_uri, bool is_local_file);
+
+  // Decode the given JWT token. The decoding result is stored in decoded_token_out.
+  // Return Status::OK if the decoding is successful.
+  static Status Decode(
+      const std::string& token, UniqueJWTDecodedToken& decoded_token_out);
+
+  // Verify the token's signature with the JWKS. The token should be already decoded by
+  // calling Decode().
+  // Return Status::OK if the verification is successful.
+  Status Verify(const JWTDecodedToken* decoded_token) const;
+
+  // Extract custom claim "Username" from the payload of the decoded JWT token.
+  // Return Status::OK if the extraction is successful.
+  static Status GetCustomClaimUsername(const JWTDecodedToken* decoded_token,
+      const std::string& custom_claim_username, std::string& username);
+
+  // Return snapshot of JWKS.
+  std::shared_ptr<const JWKSSnapshot> GetJWKS() const;
+
+ private:
+  // Single instance.
+  static JWTHelper* jwt_helper_;
+
+  // Set it as TRUE when Init() is called.
+  bool initialized_ = false;
+
+  // JWKS Manager for Json Web Token (JWT) verification.
+  // Only one instance per daemon.
+  std::unique_ptr<JWKSMgr> jwks_mgr_;
+};
+
+} // namespace kudu
diff --git a/src/kudu/util/promise.h b/src/kudu/util/promise.h
index 17f8cec..ab3f824 100644
--- a/src/kudu/util/promise.h
+++ b/src/kudu/util/promise.h
@@ -56,9 +56,8 @@
   const T* WaitFor(const MonoDelta& delta) const {
     if (latch_.WaitFor(delta)) {
       return &val_;
-    } else {
-      return NULL;
     }
+    return nullptr;
   }
 
   // Set the value of this promise.