[pulsar-client] support input-stream for trustStore cert (#7442)

* [pulsar-client] support input-stream for trustStore cert

remove file closing

fix check-style

* fix flaky test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
index 9f1eac8..614e75e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
@@ -142,16 +142,18 @@
         log.info("-- Starting {} test --", methodName);
         String topicName = "persistent://my-property/use/my-ns/my-topic1";
         ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrlTls())
-                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
+                .enableTls(true).allowTlsInsecureConnection(false)
                 .operationTimeout(1000, TimeUnit.MILLISECONDS);
         AtomicInteger index = new AtomicInteger(0);
 
         ByteArrayInputStream certStream = createByteInputStream(TLS_CLIENT_CERT_FILE_PATH);
         ByteArrayInputStream keyStream = createByteInputStream(TLS_CLIENT_KEY_FILE_PATH);
+        ByteArrayInputStream trustStoreStream = createByteInputStream(TLS_TRUST_CERT_FILE_PATH);
 
         Supplier<ByteArrayInputStream> certProvider = () -> getStream(index, certStream);
         Supplier<ByteArrayInputStream> keyProvider = () -> getStream(index, keyStream);
-        AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider);
+        Supplier<ByteArrayInputStream> trustStoreProvider = () -> getStream(index, trustStoreStream);
+        AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider, trustStoreProvider);
         clientBuilder.authentication(auth);
         @Cleanup
         PulsarClient pulsarClient = clientBuilder.build();
@@ -196,16 +198,20 @@
     public void testTlsCertsFromDynamicStreamExpiredAndRenewCert() throws Exception {
         log.info("-- Starting {} test --", methodName);
         ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrlTls())
-                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
+                .enableTls(true).allowTlsInsecureConnection(false)
                 .operationTimeout(1000, TimeUnit.MILLISECONDS);
         AtomicInteger certIndex = new AtomicInteger(1);
         AtomicInteger keyIndex = new AtomicInteger(0);
+        AtomicInteger trustStoreIndex = new AtomicInteger(1);
         ByteArrayInputStream certStream = createByteInputStream(TLS_CLIENT_CERT_FILE_PATH);
         ByteArrayInputStream keyStream = createByteInputStream(TLS_CLIENT_KEY_FILE_PATH);
+        ByteArrayInputStream trustStoreStream = createByteInputStream(TLS_TRUST_CERT_FILE_PATH);
         Supplier<ByteArrayInputStream> certProvider = () -> getStream(certIndex, certStream,
                 keyStream/* invalid cert file */);
         Supplier<ByteArrayInputStream> keyProvider = () -> getStream(keyIndex, keyStream);
-        AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider);
+        Supplier<ByteArrayInputStream> trustStoreProvider = () -> getStream(trustStoreIndex, trustStoreStream,
+                keyStream/* invalid cert file */);
+        AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider, trustStoreProvider);
         clientBuilder.authentication(auth);
         @Cleanup
         PulsarClient pulsarClient = clientBuilder.build();
@@ -219,6 +225,15 @@
         }
 
         certIndex.set(0);
+        try {
+            consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
+                    .subscriptionName("my-subscriber-name").subscribe();
+            Assert.fail("should have failed due to invalid tls cert");
+        } catch (PulsarClientException e) {
+            // Ok..
+        }
+        
+        trustStoreIndex.set(0);
         consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name").subscribe();
         consumer.close();
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index 70373fb..1cb21ff 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -139,11 +139,15 @@
                 } else {
                     SslContext sslCtx = null;
                     if (authData.hasDataForTls()) {
-                        sslCtx = SecurityUtility.createNettySslContextForClient(
-                                conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
-                                conf.getTlsTrustCertsFilePath(),
-                                authData.getTlsCertificates(),
-                                authData.getTlsPrivateKey());
+                        sslCtx = authData.getTlsTrustStoreStream() == null
+                                ? SecurityUtility.createNettySslContextForClient(
+                                        conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
+                                        conf.getTlsTrustCertsFilePath(), authData.getTlsCertificates(),
+                                        authData.getTlsPrivateKey())
+                                : SecurityUtility.createNettySslContextForClient(
+                                        conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
+                                        authData.getTlsTrustStoreStream(), authData.getTlsCertificates(),
+                                        authData.getTlsPrivateKey());
                     } else {
                         sslCtx = SecurityUtility.createNettySslContextForClient(
                                 conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
index 77eafe5..ea15cda 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
@@ -20,6 +20,7 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import java.io.InputStream;
 import java.io.Serializable;
 import java.security.PrivateKey;
 import java.security.cert.Certificate;
@@ -64,6 +65,15 @@
     }
 
     /**
+     *
+     * @return an input-stream of the trust store, or null if the trust-store provided at
+     *         {@link ClientConfigurationData#getTlsTrustStorePath()}
+     */
+    default InputStream getTlsTrustStoreStream() {
+        return null;
+    }
+
+    /**
      * Used for TLS authentication with keystore type.
      *
      * @return a KeyStoreParams for the client certificate chain, or null if the data are not available
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 3ba02bb..1d3839c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -35,6 +35,7 @@
 import io.netty.handler.ssl.SslContext;
 import javax.net.ssl.SSLContext;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -112,11 +113,13 @@
                 } else {
                     SslContext sslCtx = null;
                     if (authData.hasDataForTls()) {
-                        sslCtx = SecurityUtility.createNettySslContextForClient(
-                                conf.isTlsAllowInsecureConnection(),
-                                conf.getTlsTrustCertsFilePath(),
-                                authData.getTlsCertificates(),
-                                authData.getTlsPrivateKey());
+                        sslCtx = authData.getTlsTrustStoreStream() == null
+                                ? SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
+                                        conf.getTlsTrustCertsFilePath(), authData.getTlsCertificates(),
+                                        authData.getTlsPrivateKey())
+                                : SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
+                                        authData.getTlsTrustStoreStream(), authData.getTlsCertificates(),
+                                        authData.getTlsPrivateKey());
                     }
                     else {
                         sslCtx = SecurityUtility.createNettySslContextForClient(
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index e418904..ef2a78b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -18,17 +18,9 @@
  */
 package org.apache.pulsar.client.impl;
 
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslHandler;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
-import lombok.extern.slf4j.Slf4j;
-import lombok.Getter;
-import lombok.Setter;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -39,6 +31,15 @@
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
 
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslHandler;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
 @Slf4j
 public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> {
 
@@ -86,9 +87,13 @@
                     // Set client certificate if available
                     AuthenticationDataProvider authData = conf.getAuthentication().getAuthData();
                     if (authData.hasDataForTls()) {
-                        return SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
-                                conf.getTlsTrustCertsFilePath(), (X509Certificate[]) authData.getTlsCertificates(),
-                                authData.getTlsPrivateKey());
+                        return authData.getTlsTrustStoreStream() == null
+                                ? SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
+                                        conf.getTlsTrustCertsFilePath(),
+                                        (X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey())
+                                : SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
+                                        authData.getTlsTrustStoreStream(),
+                                        (X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey());
                     } else {
                         return SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
                                 conf.getTlsTrustCertsFilePath());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
index 0d3df12..f11e974 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
@@ -41,7 +41,7 @@
     private FileModifiedTimeUpdater certFile, keyFile;
     // key and cert using stream
     private InputStream certStream, keyStream;
-    private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider;
+    private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider, trustStoreStreamProvider;
 
     public AuthenticationDataTls(String certFilePath, String keyFilePath) throws KeyManagementException {
         if (certFilePath == null) {
@@ -58,6 +58,12 @@
 
     public AuthenticationDataTls(Supplier<ByteArrayInputStream> certStreamProvider,
             Supplier<ByteArrayInputStream> keyStreamProvider) throws KeyManagementException {
+        this(certStreamProvider, keyStreamProvider, null);
+    }
+
+    public AuthenticationDataTls(Supplier<ByteArrayInputStream> certStreamProvider,
+            Supplier<ByteArrayInputStream> keyStreamProvider, Supplier<ByteArrayInputStream> trustStoreStreamProvider)
+            throws KeyManagementException {
         if (certStreamProvider == null || certStreamProvider.get() == null) {
             throw new IllegalArgumentException("certStream provider or stream must not be null");
         }
@@ -66,12 +72,12 @@
         }
         this.certStreamProvider = certStreamProvider;
         this.keyStreamProvider = keyStreamProvider;
+        this.trustStoreStreamProvider = trustStoreStreamProvider;
         this.certStream = certStreamProvider.get();
         this.keyStream = keyStreamProvider.get();
         this.tlsCertificates = SecurityUtility.loadCertificatesFromPemStream(certStream);
         this.tlsPrivateKey = SecurityUtility.loadPrivateKeyFromPemStream(keyStream);
     }
-
     /*
      * TLS
      */
@@ -121,5 +127,10 @@
         return this.tlsPrivateKey;
     }
 
+    @Override
+    public InputStream getTlsTrustStoreStream() {
+        return trustStoreStreamProvider != null ? trustStoreStreamProvider.get() : null;
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(AuthenticationDataTls.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
index d899146..326fa46 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
@@ -45,7 +45,7 @@
 
     private String certFilePath;
     private String keyFilePath;
-    private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider;
+    private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider, trustStoreStreamProvider;
 
     public AuthenticationTls() {
     }
@@ -55,9 +55,16 @@
         this.keyFilePath = keyFilePath;
     }
 
-    public AuthenticationTls(Supplier<ByteArrayInputStream> certStreamProvider, Supplier<ByteArrayInputStream> keyStreamProvider) {
+    public AuthenticationTls(Supplier<ByteArrayInputStream> certStreamProvider,
+            Supplier<ByteArrayInputStream> keyStreamProvider) {
+        this(certStreamProvider, keyStreamProvider, null);
+    }
+
+    public AuthenticationTls(Supplier<ByteArrayInputStream> certStreamProvider,
+            Supplier<ByteArrayInputStream> keyStreamProvider, Supplier<ByteArrayInputStream> trustStoreStreamProvider) {
         this.certStreamProvider = certStreamProvider;
         this.keyStreamProvider = keyStreamProvider;
+        this.trustStoreStreamProvider = trustStoreStreamProvider;
     }
 
     @Override
@@ -76,7 +83,7 @@
             if (certFilePath != null && keyFilePath != null) {
                 return new AuthenticationDataTls(certFilePath, keyFilePath);
             } else if (certStreamProvider != null && keyStreamProvider != null) {
-                return new AuthenticationDataTls(certStreamProvider, keyStreamProvider);
+                return new AuthenticationDataTls(certStreamProvider, keyStreamProvider, trustStoreStreamProvider);
             }
         } catch (Exception e) {
             throw new PulsarClientException(e);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java
index 48cf992..35919c9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java
@@ -19,12 +19,14 @@
 package org.apache.pulsar.common.util;
 
 import io.netty.handler.ssl.SslContext;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
 import java.security.cert.X509Certificate;
 import javax.net.ssl.SSLException;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 
 /**
@@ -52,9 +54,13 @@
     public synchronized SslContext update()
             throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
         if (authData != null && authData.hasDataForTls()) {
-            this.sslNettyContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
-                    this.tlsTrustCertsFilePath.getFileName(), (X509Certificate[]) authData.getTlsCertificates(),
-                    authData.getTlsPrivateKey());
+            this.sslNettyContext = authData.getTlsTrustStoreStream() == null
+                    ? SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
+                            tlsTrustCertsFilePath.getFileName(), (X509Certificate[]) authData.getTlsCertificates(),
+                            authData.getTlsPrivateKey())
+                    : SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
+                            authData.getTlsTrustStoreStream(), (X509Certificate[]) authData.getTlsCertificates(),
+                            authData.getTlsPrivateKey());
         } else {
             this.sslNettyContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
                     this.tlsTrustCertsFilePath.getFileName());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
index 555cbac..2e3633d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
@@ -56,6 +56,8 @@
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.lang3.StringUtils;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 
 /**
@@ -165,8 +167,23 @@
     public static SslContext createNettySslContextForClient(boolean allowInsecureConnection, String trustCertsFilePath,
             Certificate[] certificates, PrivateKey privateKey)
             throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
+
+        if (StringUtils.isNotBlank(trustCertsFilePath)) {
+            try (FileInputStream trustCertsStream = new FileInputStream(trustCertsFilePath)) {
+                return createNettySslContextForClient(allowInsecureConnection, trustCertsStream, certificates,
+                        privateKey);
+            }
+        } else {
+            return createNettySslContextForClient(allowInsecureConnection, (InputStream) null, certificates,
+                    privateKey);
+        }
+    }
+
+    public static SslContext createNettySslContextForClient(boolean allowInsecureConnection,
+            InputStream trustCertsStream, Certificate[] certificates, PrivateKey privateKey)
+            throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
         SslContextBuilder builder = SslContextBuilder.forClient();
-        setupTrustCerts(builder, allowInsecureConnection, trustCertsFilePath);
+        setupTrustCerts(builder, allowInsecureConnection, trustCertsStream);
         setupKeyManager(builder, privateKey, (X509Certificate[]) certificates);
         return builder.build();
     }
@@ -181,7 +198,13 @@
         SslContextBuilder builder = SslContextBuilder.forServer(privateKey, (X509Certificate[]) certificates);
         setupCiphers(builder, ciphers);
         setupProtocols(builder, protocols);
-        setupTrustCerts(builder, allowInsecureConnection, trustCertsFilePath);
+        if (StringUtils.isNotBlank(trustCertsFilePath)) {
+            try (FileInputStream trustCertsStream = new FileInputStream(trustCertsFilePath)) {
+                setupTrustCerts(builder, allowInsecureConnection, trustCertsStream);
+            }
+        } else {
+            setupTrustCerts(builder, allowInsecureConnection, null);
+        }
         setupKeyManager(builder, privateKey, certificates);
         setupClientAuthentication(builder, requireTrustedClientCertOnConnect);
         return builder.build();
@@ -320,14 +343,12 @@
     }
 
     private static void setupTrustCerts(SslContextBuilder builder, boolean allowInsecureConnection,
-            String trustCertsFilePath) throws IOException, FileNotFoundException {
+            InputStream trustCertsStream) throws IOException, FileNotFoundException {
         if (allowInsecureConnection) {
             builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
         } else {
-            if (trustCertsFilePath != null && trustCertsFilePath.length() != 0) {
-                try (FileInputStream input = new FileInputStream(trustCertsFilePath)) {
-                    builder.trustManager(input);
-                }
+            if (trustCertsStream != null) {
+                builder.trustManager(trustCertsStream);
             } else {
                 builder.trustManager((File) null);
             }