[pulsar-client] support input-stream for trustStore cert (#7442)
* [pulsar-client] support input-stream for trustStore cert
remove file closing
fix check-style
* fix flaky test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
index 9f1eac8..614e75e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
@@ -142,16 +142,18 @@
log.info("-- Starting {} test --", methodName);
String topicName = "persistent://my-property/use/my-ns/my-topic1";
ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrlTls())
- .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
+ .enableTls(true).allowTlsInsecureConnection(false)
.operationTimeout(1000, TimeUnit.MILLISECONDS);
AtomicInteger index = new AtomicInteger(0);
ByteArrayInputStream certStream = createByteInputStream(TLS_CLIENT_CERT_FILE_PATH);
ByteArrayInputStream keyStream = createByteInputStream(TLS_CLIENT_KEY_FILE_PATH);
+ ByteArrayInputStream trustStoreStream = createByteInputStream(TLS_TRUST_CERT_FILE_PATH);
Supplier<ByteArrayInputStream> certProvider = () -> getStream(index, certStream);
Supplier<ByteArrayInputStream> keyProvider = () -> getStream(index, keyStream);
- AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider);
+ Supplier<ByteArrayInputStream> trustStoreProvider = () -> getStream(index, trustStoreStream);
+ AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider, trustStoreProvider);
clientBuilder.authentication(auth);
@Cleanup
PulsarClient pulsarClient = clientBuilder.build();
@@ -196,16 +198,20 @@
public void testTlsCertsFromDynamicStreamExpiredAndRenewCert() throws Exception {
log.info("-- Starting {} test --", methodName);
ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrlTls())
- .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
+ .enableTls(true).allowTlsInsecureConnection(false)
.operationTimeout(1000, TimeUnit.MILLISECONDS);
AtomicInteger certIndex = new AtomicInteger(1);
AtomicInteger keyIndex = new AtomicInteger(0);
+ AtomicInteger trustStoreIndex = new AtomicInteger(1);
ByteArrayInputStream certStream = createByteInputStream(TLS_CLIENT_CERT_FILE_PATH);
ByteArrayInputStream keyStream = createByteInputStream(TLS_CLIENT_KEY_FILE_PATH);
+ ByteArrayInputStream trustStoreStream = createByteInputStream(TLS_TRUST_CERT_FILE_PATH);
Supplier<ByteArrayInputStream> certProvider = () -> getStream(certIndex, certStream,
keyStream/* invalid cert file */);
Supplier<ByteArrayInputStream> keyProvider = () -> getStream(keyIndex, keyStream);
- AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider);
+ Supplier<ByteArrayInputStream> trustStoreProvider = () -> getStream(trustStoreIndex, trustStoreStream,
+ keyStream/* invalid cert file */);
+ AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider, trustStoreProvider);
clientBuilder.authentication(auth);
@Cleanup
PulsarClient pulsarClient = clientBuilder.build();
@@ -219,6 +225,15 @@
}
certIndex.set(0);
+ try {
+ consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
+ .subscriptionName("my-subscriber-name").subscribe();
+ Assert.fail("should have failed due to invalid tls cert");
+ } catch (PulsarClientException e) {
+ // Ok..
+ }
+
+ trustStoreIndex.set(0);
consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();
consumer.close();
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index 70373fb..1cb21ff 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -139,11 +139,15 @@
} else {
SslContext sslCtx = null;
if (authData.hasDataForTls()) {
- sslCtx = SecurityUtility.createNettySslContextForClient(
- conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
- conf.getTlsTrustCertsFilePath(),
- authData.getTlsCertificates(),
- authData.getTlsPrivateKey());
+ sslCtx = authData.getTlsTrustStoreStream() == null
+ ? SecurityUtility.createNettySslContextForClient(
+ conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
+ conf.getTlsTrustCertsFilePath(), authData.getTlsCertificates(),
+ authData.getTlsPrivateKey())
+ : SecurityUtility.createNettySslContextForClient(
+ conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
+ authData.getTlsTrustStoreStream(), authData.getTlsCertificates(),
+ authData.getTlsPrivateKey());
} else {
sslCtx = SecurityUtility.createNettySslContextForClient(
conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
index 77eafe5..ea15cda 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/AuthenticationDataProvider.java
@@ -20,6 +20,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
+import java.io.InputStream;
import java.io.Serializable;
import java.security.PrivateKey;
import java.security.cert.Certificate;
@@ -64,6 +65,15 @@
}
/**
+ *
+ * @return an input-stream of the trust store, or null if the trust-store provided at
+ * {@link ClientConfigurationData#getTlsTrustStorePath()}
+ */
+ default InputStream getTlsTrustStoreStream() {
+ return null;
+ }
+
+ /**
* Used for TLS authentication with keystore type.
*
* @return a KeyStoreParams for the client certificate chain, or null if the data are not available
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 3ba02bb..1d3839c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -35,6 +35,7 @@
import io.netty.handler.ssl.SslContext;
import javax.net.ssl.SSLContext;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -112,11 +113,13 @@
} else {
SslContext sslCtx = null;
if (authData.hasDataForTls()) {
- sslCtx = SecurityUtility.createNettySslContextForClient(
- conf.isTlsAllowInsecureConnection(),
- conf.getTlsTrustCertsFilePath(),
- authData.getTlsCertificates(),
- authData.getTlsPrivateKey());
+ sslCtx = authData.getTlsTrustStoreStream() == null
+ ? SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
+ conf.getTlsTrustCertsFilePath(), authData.getTlsCertificates(),
+ authData.getTlsPrivateKey())
+ : SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
+ authData.getTlsTrustStoreStream(), authData.getTlsCertificates(),
+ authData.getTlsPrivateKey());
}
else {
sslCtx = SecurityUtility.createNettySslContextForClient(
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
index e418904..ef2a78b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java
@@ -18,17 +18,9 @@
*/
package org.apache.pulsar.client.impl;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslHandler;
import java.security.cert.X509Certificate;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
-import lombok.extern.slf4j.Slf4j;
-import lombok.Getter;
-import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -39,6 +31,15 @@
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslHandler;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
@Slf4j
public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> {
@@ -86,9 +87,13 @@
// Set client certificate if available
AuthenticationDataProvider authData = conf.getAuthentication().getAuthData();
if (authData.hasDataForTls()) {
- return SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
- conf.getTlsTrustCertsFilePath(), (X509Certificate[]) authData.getTlsCertificates(),
- authData.getTlsPrivateKey());
+ return authData.getTlsTrustStoreStream() == null
+ ? SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
+ conf.getTlsTrustCertsFilePath(),
+ (X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey())
+ : SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
+ authData.getTlsTrustStoreStream(),
+ (X509Certificate[]) authData.getTlsCertificates(), authData.getTlsPrivateKey());
} else {
return SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection(),
conf.getTlsTrustCertsFilePath());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
index 0d3df12..f11e974 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataTls.java
@@ -41,7 +41,7 @@
private FileModifiedTimeUpdater certFile, keyFile;
// key and cert using stream
private InputStream certStream, keyStream;
- private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider;
+ private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider, trustStoreStreamProvider;
public AuthenticationDataTls(String certFilePath, String keyFilePath) throws KeyManagementException {
if (certFilePath == null) {
@@ -58,6 +58,12 @@
public AuthenticationDataTls(Supplier<ByteArrayInputStream> certStreamProvider,
Supplier<ByteArrayInputStream> keyStreamProvider) throws KeyManagementException {
+ this(certStreamProvider, keyStreamProvider, null);
+ }
+
+ public AuthenticationDataTls(Supplier<ByteArrayInputStream> certStreamProvider,
+ Supplier<ByteArrayInputStream> keyStreamProvider, Supplier<ByteArrayInputStream> trustStoreStreamProvider)
+ throws KeyManagementException {
if (certStreamProvider == null || certStreamProvider.get() == null) {
throw new IllegalArgumentException("certStream provider or stream must not be null");
}
@@ -66,12 +72,12 @@
}
this.certStreamProvider = certStreamProvider;
this.keyStreamProvider = keyStreamProvider;
+ this.trustStoreStreamProvider = trustStoreStreamProvider;
this.certStream = certStreamProvider.get();
this.keyStream = keyStreamProvider.get();
this.tlsCertificates = SecurityUtility.loadCertificatesFromPemStream(certStream);
this.tlsPrivateKey = SecurityUtility.loadPrivateKeyFromPemStream(keyStream);
}
-
/*
* TLS
*/
@@ -121,5 +127,10 @@
return this.tlsPrivateKey;
}
+ @Override
+ public InputStream getTlsTrustStoreStream() {
+ return trustStoreStreamProvider != null ? trustStoreStreamProvider.get() : null;
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(AuthenticationDataTls.class);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
index d899146..326fa46 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
@@ -45,7 +45,7 @@
private String certFilePath;
private String keyFilePath;
- private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider;
+ private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider, trustStoreStreamProvider;
public AuthenticationTls() {
}
@@ -55,9 +55,16 @@
this.keyFilePath = keyFilePath;
}
- public AuthenticationTls(Supplier<ByteArrayInputStream> certStreamProvider, Supplier<ByteArrayInputStream> keyStreamProvider) {
+ public AuthenticationTls(Supplier<ByteArrayInputStream> certStreamProvider,
+ Supplier<ByteArrayInputStream> keyStreamProvider) {
+ this(certStreamProvider, keyStreamProvider, null);
+ }
+
+ public AuthenticationTls(Supplier<ByteArrayInputStream> certStreamProvider,
+ Supplier<ByteArrayInputStream> keyStreamProvider, Supplier<ByteArrayInputStream> trustStoreStreamProvider) {
this.certStreamProvider = certStreamProvider;
this.keyStreamProvider = keyStreamProvider;
+ this.trustStoreStreamProvider = trustStoreStreamProvider;
}
@Override
@@ -76,7 +83,7 @@
if (certFilePath != null && keyFilePath != null) {
return new AuthenticationDataTls(certFilePath, keyFilePath);
} else if (certStreamProvider != null && keyStreamProvider != null) {
- return new AuthenticationDataTls(certStreamProvider, keyStreamProvider);
+ return new AuthenticationDataTls(certStreamProvider, keyStreamProvider, trustStoreStreamProvider);
}
} catch (Exception e) {
throw new PulsarClientException(e);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java
index 48cf992..35919c9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/NettyClientSslContextRefresher.java
@@ -19,12 +19,14 @@
package org.apache.pulsar.common.util;
import io.netty.handler.ssl.SslContext;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLException;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.pulsar.client.api.AuthenticationDataProvider;
/**
@@ -52,9 +54,13 @@
public synchronized SslContext update()
throws SSLException, FileNotFoundException, GeneralSecurityException, IOException {
if (authData != null && authData.hasDataForTls()) {
- this.sslNettyContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
- this.tlsTrustCertsFilePath.getFileName(), (X509Certificate[]) authData.getTlsCertificates(),
- authData.getTlsPrivateKey());
+ this.sslNettyContext = authData.getTlsTrustStoreStream() == null
+ ? SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
+ tlsTrustCertsFilePath.getFileName(), (X509Certificate[]) authData.getTlsCertificates(),
+ authData.getTlsPrivateKey())
+ : SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
+ authData.getTlsTrustStoreStream(), (X509Certificate[]) authData.getTlsCertificates(),
+ authData.getTlsPrivateKey());
} else {
this.sslNettyContext = SecurityUtility.createNettySslContextForClient(this.tlsAllowInsecureConnection,
this.tlsTrustCertsFilePath.getFileName());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
index 555cbac..2e3633d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
@@ -56,6 +56,8 @@
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.lang3.StringUtils;
import org.eclipse.jetty.util.ssl.SslContextFactory;
/**
@@ -165,8 +167,23 @@
public static SslContext createNettySslContextForClient(boolean allowInsecureConnection, String trustCertsFilePath,
Certificate[] certificates, PrivateKey privateKey)
throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
+
+ if (StringUtils.isNotBlank(trustCertsFilePath)) {
+ try (FileInputStream trustCertsStream = new FileInputStream(trustCertsFilePath)) {
+ return createNettySslContextForClient(allowInsecureConnection, trustCertsStream, certificates,
+ privateKey);
+ }
+ } else {
+ return createNettySslContextForClient(allowInsecureConnection, (InputStream) null, certificates,
+ privateKey);
+ }
+ }
+
+ public static SslContext createNettySslContextForClient(boolean allowInsecureConnection,
+ InputStream trustCertsStream, Certificate[] certificates, PrivateKey privateKey)
+ throws GeneralSecurityException, SSLException, FileNotFoundException, IOException {
SslContextBuilder builder = SslContextBuilder.forClient();
- setupTrustCerts(builder, allowInsecureConnection, trustCertsFilePath);
+ setupTrustCerts(builder, allowInsecureConnection, trustCertsStream);
setupKeyManager(builder, privateKey, (X509Certificate[]) certificates);
return builder.build();
}
@@ -181,7 +198,13 @@
SslContextBuilder builder = SslContextBuilder.forServer(privateKey, (X509Certificate[]) certificates);
setupCiphers(builder, ciphers);
setupProtocols(builder, protocols);
- setupTrustCerts(builder, allowInsecureConnection, trustCertsFilePath);
+ if (StringUtils.isNotBlank(trustCertsFilePath)) {
+ try (FileInputStream trustCertsStream = new FileInputStream(trustCertsFilePath)) {
+ setupTrustCerts(builder, allowInsecureConnection, trustCertsStream);
+ }
+ } else {
+ setupTrustCerts(builder, allowInsecureConnection, null);
+ }
setupKeyManager(builder, privateKey, certificates);
setupClientAuthentication(builder, requireTrustedClientCertOnConnect);
return builder.build();
@@ -320,14 +343,12 @@
}
private static void setupTrustCerts(SslContextBuilder builder, boolean allowInsecureConnection,
- String trustCertsFilePath) throws IOException, FileNotFoundException {
+ InputStream trustCertsStream) throws IOException, FileNotFoundException {
if (allowInsecureConnection) {
builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
} else {
- if (trustCertsFilePath != null && trustCertsFilePath.length() != 0) {
- try (FileInputStream input = new FileInputStream(trustCertsFilePath)) {
- builder.trustManager(input);
- }
+ if (trustCertsStream != null) {
+ builder.trustManager(trustCertsStream);
} else {
builder.trustManager((File) null);
}