blob: 325ad2b99753c854c9da255079e0a24106ba062a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
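/**
* Tests for {@link BrokerService}: topic ownership checks, topic and namespace stats, TLS access and
* TLS authentication, client-side lookup throttling, and topic-loading failure handling.
*/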
@Slf4j
public class BrokerServiceTest extends BrokerTestBase {
// Relative paths to the server and client certificate/key files used by the TLS tests in this class.
private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt";
private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key";
/**
* Runs once before the tests of this class (TestNG {@code @BeforeClass}) and delegates to
* {@code super.baseSetup()}.
*
* @throws Exception if the base setup fails
*/
@BeforeClass
@Override
protected void setup() throws Exception {
super.baseSetup();
}
/**
* Runs once after the tests of this class, even if earlier configuration or tests failed
* ({@code alwaysRun = true}), and delegates to {@code super.internalCleanup()}.
*
* @throws Exception if the base cleanup fails
*/
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
/**
* Resets the test state explicitly by calling {@link #cleanup()} followed by {@link #setup()}.
* This is required because setup and cleanup are bound to {@code @BeforeClass} / {@code @AfterClass}
* and therefore do not run between individual test methods.
*
* @throws Exception if either the cleanup or the setup fails
*/
private void resetState() throws Exception {
cleanup();
setup();
}
/**
* Calls {@code getOrCreateTopic} for the same topic twice: once before and once after an admin lookup
* of that topic. The first call is expected to fail because the namespace is not owned, the second is
* expected to succeed and make the topic reference available.
*
* @throws Exception if waiting on the latches or the admin lookup fails
*/
@Test
public void testOwnedNsCheck() throws Exception {
final String topic = "persistent://prop/ns-abc/successTopic";
BrokerService service = pulsar.getBrokerService();
// Attempt 1: issued before this test performs any lookup for the topic.
final CountDownLatch latch1 = new CountDownLatch(1);
// NOTE(review): fail()/assertTrue() below execute inside future callbacks; an AssertionError thrown
// there is presumably captured by the future chain rather than failing the test thread - confirm.
service.getOrCreateTopic(topic).thenAccept(t -> {
latch1.countDown();
fail("should fail as NS is not owned");
}).exceptionally(exception -> {
// NOTE(review): if this assertion throws, the countDown() on the next line is skipped and the
// untimed latch1.await() below would keep waiting - confirm this is intended.
assertTrue(exception.getCause() instanceof IOException);
latch1.countDown();
return null;
});
latch1.await();
// Look the topic up through the admin client; presumably this makes the broker take ownership of the
// namespace bundle so that the second attempt can succeed.
admin.lookups().lookupTopic(topic);
// Attempt 2: the topic is expected to be created and its reference to be retrievable.
final CountDownLatch latch2 = new CountDownLatch(1);
service.getOrCreateTopic(topic).thenAccept(t -> {
try {
assertNotNull(service.getTopicReference(topic));
} catch (Exception e) {
fail("should not fail");
}
latch2.countDown();
}).exceptionally(exception -> {
// Release the latch first so the test thread is not blocked when this path is taken.
latch2.countDown();
fail("should not fail");
return null;
});
latch2.await();
}
/**
* Checks the stats reported by a persistent topic through three phases: after a consumer subscribes,
* after a producer publishes ten messages, and after the consumer has acknowledged all of them and
* closed. Each phase waits {@code ASYNC_EVENT_COMPLETION_WAIT} and calls
* {@code rolloverPerIntervalStats()} before reading the stats.
*
* @throws Exception if any client, broker or sleep operation fails
*/
@Test
public void testBrokerServicePersistentTopicStats() throws Exception {
// this test might fail if there are stats from other tests
resetState();
final String topicName = "persistent://prop/ns-abc/successTopic";
final String subName = "successSub";
TopicStats stats;
SubscriptionStats subStats;
// Phase 1: a single consumer subscribes (acknowledgment group time set to 0).
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);
rolloverPerIntervalStats();
stats = topicRef.getStats(false, false);
subStats = stats.subscriptions.values().iterator().next();
// subscription stats
assertEquals(stats.subscriptions.keySet().size(), 1);
assertEquals(subStats.msgBacklog, 0);
assertEquals(subStats.consumers.size(), 1);
// storage stats
assertEquals(stats.offloadedStorageSize, 0);
// Phase 2: publish ten messages and verify publisher, consumer and aggregated stats.
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
rolloverPerIntervalStats();
stats = topicRef.getStats(false, false);
subStats = stats.subscriptions.values().iterator().next();
// publisher stats
assertEquals(subStats.msgBacklog, 10);
assertEquals(stats.publishers.size(), 1);
assertTrue(stats.publishers.get(0).msgRateIn > 0.0);
assertTrue(stats.publishers.get(0).msgThroughputIn > 0.0);
assertTrue(stats.publishers.get(0).averageMsgSize > 0.0);
assertNotNull(stats.publishers.get(0).getClientVersion());
// aggregated publish stats
assertEquals(stats.msgRateIn, stats.publishers.get(0).msgRateIn);
assertEquals(stats.msgThroughputIn, stats.publishers.get(0).msgThroughputIn);
// average message size is compared with a small tolerance rather than for exact equality
double diff = stats.averageMsgSize - stats.publishers.get(0).averageMsgSize;
assertTrue(Math.abs(diff) < 0.000001);
// consumer stats
assertTrue(subStats.consumers.get(0).msgRateOut > 0.0);
assertTrue(subStats.consumers.get(0).msgThroughputOut > 0.0);
// aggregated consumer stats
assertEquals(subStats.msgRateOut, subStats.consumers.get(0).msgRateOut);
assertEquals(subStats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut);
assertEquals(stats.msgRateOut, subStats.consumers.get(0).msgRateOut);
assertEquals(stats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut);
assertNotNull(subStats.consumers.get(0).getClientVersion());
assertEquals(stats.offloadedStorageSize, 0);
// Phase 3: receive and acknowledge all ten messages, close the consumer, expect an empty backlog.
Message<byte[]> msg;
for (int i = 0; i < 10; i++) {
msg = consumer.receive();
consumer.acknowledge(msg);
}
consumer.close();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
rolloverPerIntervalStats();
stats = topicRef.getStats(false, false);
subStats = stats.subscriptions.values().iterator().next();
assertEquals(stats.offloadedStorageSize, 0);
assertEquals(subStats.msgBacklog, 0);
}
/**
 * Verifies that the {@code storageSize} reported by a persistent topic is zero right after the topic
 * is created by a producer and becomes positive once ten 10-byte messages have been published.
 * <p>
 * NOTE(review): despite the method name, no subscription is created here and the topic is named
 * {@code no-subscription} - confirm which of the two reflects the intent.
 *
 * @throws Exception if the producer cannot be created, a send fails, or the topic lookup fails
 */
@Test
public void testStatsOfStorageSizeWithSubscription() throws Exception {
    final String topicName = "persistent://prop/ns-abc/no-subscription";
    // Close the producer when the test ends (also on assertion failure) so it does not leak into
    // the tests that share this broker.
    @Cleanup
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
    PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
    assertNotNull(topicRef);
    assertEquals(topicRef.getStats(false, false).storageSize, 0);
    for (int i = 0; i < 10; i++) {
        producer.send(new byte[10]);
    }
    assertTrue(topicRef.getStats(false, false).storageSize > 0);
}
/**
* Checks the stats of a persistent topic with a {@code Shared} subscription, including the redelivery
* rate: after subscribing, after publishing ten messages (all unacknowledged), after requesting
* redelivery of the unacknowledged messages, and after acknowledging everything and closing the consumer.
*
* @throws Exception if any client, broker or sleep operation fails
*/
@Test
public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
// this test might fail if there are stats from other tests
resetState();
final String topicName = "persistent://prop/ns-abc/successSharedTopic";
final String subName = "successSharedSub";
TopicStats stats;
SubscriptionStats subStats;
// Phase 1: one consumer on a Shared subscription.
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);
rolloverPerIntervalStats();
stats = topicRef.getStats(false, false);
subStats = stats.subscriptions.values().iterator().next();
// subscription stats
assertEquals(stats.subscriptions.keySet().size(), 1);
assertEquals(subStats.msgBacklog, 0);
assertEquals(subStats.consumers.size(), 1);
// Phase 2: publish ten messages; none are acknowledged yet.
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
rolloverPerIntervalStats();
stats = topicRef.getStats(false, false);
subStats = stats.subscriptions.values().iterator().next();
// publisher stats
assertEquals(subStats.msgBacklog, 10);
assertEquals(stats.publishers.size(), 1);
assertTrue(stats.publishers.get(0).msgRateIn > 0.0);
assertTrue(stats.publishers.get(0).msgThroughputIn > 0.0);
assertTrue(stats.publishers.get(0).averageMsgSize > 0.0);
// aggregated publish stats
assertEquals(stats.msgRateIn, stats.publishers.get(0).msgRateIn);
assertEquals(stats.msgThroughputIn, stats.publishers.get(0).msgThroughputIn);
// average message size is compared with a small tolerance rather than for exact equality
double diff = stats.averageMsgSize - stats.publishers.get(0).averageMsgSize;
assertTrue(Math.abs(diff) < 0.000001);
// consumer stats
assertTrue(subStats.consumers.get(0).msgRateOut > 0.0);
assertTrue(subStats.consumers.get(0).msgThroughputOut > 0.0);
assertEquals(subStats.msgRateRedeliver, 0.0);
assertEquals(subStats.consumers.get(0).unackedMessages, 10);
// aggregated consumer stats
assertEquals(subStats.msgRateOut, subStats.consumers.get(0).msgRateOut);
assertEquals(subStats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut);
assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver);
assertEquals(stats.msgRateOut, subStats.consumers.get(0).msgRateOut);
assertEquals(stats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut);
// NOTE(review): this repeats the msgRateRedeliver assertion made three lines above.
assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver);
assertEquals(subStats.unackedMessages, subStats.consumers.get(0).unackedMessages);
// Phase 3: ask for redelivery of the unacknowledged messages and expect a positive redelivery rate.
consumer.redeliverUnacknowledgedMessages();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
rolloverPerIntervalStats();
stats = topicRef.getStats(false, false);
subStats = stats.subscriptions.values().iterator().next();
assertTrue(subStats.msgRateRedeliver > 0.0);
assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver);
// Phase 4: receive and acknowledge all ten messages, close the consumer, expect an empty backlog.
Message<byte[]> msg;
for (int i = 0; i < 10; i++) {
msg = consumer.receive();
consumer.acknowledge(msg);
}
consumer.close();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
rolloverPerIntervalStats();
stats = topicRef.getStats(false, false);
subStats = stats.subscriptions.values().iterator().next();
assertEquals(subStats.msgBacklog, 0);
}
/**
 * Publishes and consumes ten messages on a topic in {@code prop/ns-abc} and then checks that the
 * metrics returned by the broker-stats admin client contain at least one entry whose
 * {@code dimensions} mention that namespace.
 *
 * @throws Exception if any client, admin or sleep operation fails
 */
@Test
public void testBrokerStatsMetrics() throws Exception {
    final String topicName = "persistent://prop/ns-abc/newTopic";
    final String subName = "newSub";
    BrokerStats statsClient = admin.brokerStats();

    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subName)
            .subscribe();
    Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
    Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

    for (int sent = 0; sent < 10; sent++) {
        producer.send(("my-message-" + sent).getBytes());
    }
    Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

    for (int received = 0; received < 10; received++) {
        Message<byte[]> next = consumer.receive();
        consumer.acknowledge(next);
    }
    consumer.close();
    Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

    JsonArray metrics = statsClient.getMetrics();
    // The metrics do not arrive in a stable order, so every entry is scanned instead of indexing
    // a fixed position.
    boolean namespaceDimensionFound = false;
    boolean topicLoadTimesDimensionFound = false;
    for (int idx = 0; idx < metrics.size(); idx++) {
        String dimensions;
        try {
            dimensions = metrics.get(idx).getAsJsonObject().get("dimensions").toString();
        } catch (Exception ignored) {
            // it's possible there's no dimensions
            continue;
        }
        // NOTE(review): both flags are driven by the same "prop/ns-abc" substring, so the second
        // flag adds no extra coverage - confirm which dimension it was meant to look for.
        if (dimensions.contains("prop/ns-abc")) {
            namespaceDimensionFound = true;
            topicLoadTimesDimensionFound = true;
        }
    }
    assertTrue(namespaceDimensionFound && topicLoadTimesDimensionFound);
    Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
}
/**
 * Creates two namespaces with four bundles each, opens producers on two topics per namespace and
 * verifies that the topic stats returned by the broker-stats admin client contain exactly those two
 * namespaces and list every topic under its bundle range in the {@code persistent} section.
 * Producers, topics and namespaces are removed again at the end.
 *
 * @throws Exception if any admin or client operation fails
 */
@Test
public void testBrokerServiceNamespaceStats() throws Exception {
    // this test fails if there is state from other tests
    resetState();

    final int numBundles = 4;
    List<String> namespaces = Lists.newArrayList("prop/stats1", "prop/stats2");
    List<Producer<byte[]>> producers = Lists.newArrayList();
    BrokerStats statsClient = admin.brokerStats();

    for (String namespace : namespaces) {
        admin.namespaces().createNamespace(namespace, numBundles);
        admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));
        String firstTopic = String.format("persistent://%s/topic1", namespace);
        producers.add(pulsarClient.newProducer().topic(firstTopic).create());
        String secondTopic = String.format("persistent://%s/topic2", namespace);
        producers.add(pulsarClient.newProducer().topic(secondTopic).create());
    }

    rolloverPerIntervalStats();
    JsonObject topicStats = statsClient.getTopics();
    assertEquals(topicStats.size(), 2, topicStats.toString());

    // Layout walked below: namespace -> bundle range -> "persistent" -> topic name.
    for (String namespace : namespaces) {
        JsonObject namespaceStats = topicStats.getAsJsonObject(namespace);
        for (String topic : admin.namespaces().getTopics(namespace)) {
            NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(TopicName.get(topic));
            JsonObject persistentTopics = namespaceStats
                    .getAsJsonObject(bundle.getBundleRange())
                    .getAsJsonObject("persistent");
            boolean topicPresent = persistentTopics.entrySet().stream()
                    .anyMatch(entry -> entry.getKey().equals(topic));
            assertTrue(topicPresent);
        }
    }

    for (Producer<?> openProducer : producers) {
        openProducer.close();
    }
    for (String namespace : namespaces) {
        for (String topic : admin.namespaces().getTopics(namespace)) {
            admin.topics().delete(topic);
        }
        admin.namespaces().deleteNamespace(namespace);
    }
}
/**
 * Restarts the broker with authentication disabled and verifies that a client connecting to the
 * plain-text service URL can subscribe, while a client connecting to the TLS service URL fails with
 * an error mentioning {@code ConnectException}.
 *
 * @throws Exception if the broker restart or closing a client fails
 */
@Test
public void testTlsDisabled() throws Exception {
    final String topicName = "persistent://prop/ns-abc/newTopic";
    final String subName = "newSub";
    PulsarClient pulsarClient = null;
    conf.setAuthenticationEnabled(false);
    restartBroker();

    // Case 1: Access without TLS
    try {
        pulsarClient = PulsarClient.builder().serviceUrl(brokerUrl.toString())
                .statsInterval(0, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .subscribe();
    } catch (Exception e) {
        // Attach the cause so an unexpected failure can be diagnosed from the test report.
        fail("should not fail", e);
    } finally {
        // The client is still null when build() itself failed; closing it unguarded would replace
        // the real failure with a NullPointerException.
        if (pulsarClient != null) {
            pulsarClient.close();
        }
    }

    // Case 2: Access with TLS
    // Forget the client closed above so the finally block never closes it a second time.
    pulsarClient = null;
    try {
        pulsarClient = PulsarClient.builder().serviceUrl(brokerUrlTls.toString()).enableTls(true)
                .statsInterval(0, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .subscribe();
        fail("TLS connection should fail");
    } catch (Exception e) {
        assertTrue(e.getMessage().contains("ConnectException"));
    } finally {
        if (pulsarClient != null) {
            pulsarClient.close();
        }
    }
}
/**
 * Restarts the broker with TLS ports and the server certificate/key configured (authentication
 * disabled) and verifies four client configurations:
 * <ol>
 * <li>plain-text access succeeds;</li>
 * <li>TLS access with insecure connections allowed succeeds;</li>
 * <li>TLS access with insecure connections disallowed and no trust certificate fails with
 * {@code General OpenSslEngine problem};</li>
 * <li>TLS access with the server certificate as trust certificate succeeds.</li>
 * </ol>
 *
 * @throws Exception if the broker restart or closing a client fails
 */
@Test
public void testTlsEnabled() throws Exception {
    final String topicName = "persistent://prop/ns-abc/newTopic";
    final String subName = "newSub";
    conf.setAuthenticationEnabled(false);
    conf.setBrokerServicePortTls(Optional.of(0));
    conf.setWebServicePortTls(Optional.of(0));
    conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
    conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
    conf.setNumExecutorThreadPoolSize(5);
    restartBroker();

    // Case 1: Access without TLS
    PulsarClient pulsarClient = null;
    try {
        pulsarClient = PulsarClient.builder().serviceUrl(brokerUrl.toString())
                .statsInterval(0, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .subscribe();
    } catch (Exception e) {
        // Attach the cause so an unexpected failure can be diagnosed from the test report.
        fail("should not fail", e);
    } finally {
        // Guard against build() having failed, which would otherwise surface as an NPE here.
        if (pulsarClient != null) {
            pulsarClient.close();
        }
    }

    // Case 2: Access with TLS (Allow insecure TLS connection)
    // Reset before each case so a failed build() never leads to closing the previous client again.
    pulsarClient = null;
    try {
        pulsarClient = PulsarClient.builder().serviceUrl(brokerUrlTls.toString()).enableTls(true)
                .allowTlsInsecureConnection(true).statsInterval(0, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .subscribe();
    } catch (Exception e) {
        fail("should not fail", e);
    } finally {
        if (pulsarClient != null) {
            pulsarClient.close();
        }
    }

    // Case 3: Access with TLS (Disallow insecure TLS connection)
    pulsarClient = null;
    try {
        pulsarClient = PulsarClient.builder().serviceUrl(brokerUrlTls.toString()).enableTls(true)
                .allowTlsInsecureConnection(false).statsInterval(0, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .subscribe();
        fail("should fail");
    } catch (Exception e) {
        assertTrue(e.getMessage().contains("General OpenSslEngine problem"));
    } finally {
        if (pulsarClient != null) {
            pulsarClient.close();
        }
    }

    // Case 4: Access with TLS (Use trusted certificates)
    pulsarClient = null;
    try {
        pulsarClient = PulsarClient.builder().serviceUrl(brokerUrlTls.toString()).enableTls(true)
                .allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH)
                .statsInterval(0, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .subscribe();
    } catch (Exception e) {
        fail("should not fail", e);
    } finally {
        if (pulsarClient != null) {
            pulsarClient.close();
        }
    }
}
/**
 * Restarts the broker with TLS authentication enabled and insecure TLS connections allowed, then
 * verifies that a TLS client without a client certificate is rejected with {@code Unauthorized}
 * while a TLS client presenting the client certificate can subscribe.
 *
 * @throws Exception if the broker restart or closing a client fails
 */
@SuppressWarnings("deprecation")
@Test
public void testTlsAuthAllowInsecure() throws Exception {
    final String topicName = "persistent://prop/ns-abc/newTopic";
    final String subName = "newSub";

    Set<String> providers = new HashSet<>();
    providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
    conf.setAuthenticationEnabled(true);
    conf.setAuthenticationProviders(providers);
    conf.setBrokerServicePortTls(Optional.of(0));
    conf.setWebServicePortTls(Optional.of(0));
    conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
    conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
    conf.setTlsAllowInsecureConnection(true);
    conf.setNumExecutorThreadPoolSize(5);
    restartBroker();

    Map<String, String> authParams = new HashMap<>();
    authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
    authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);

    PulsarClient pulsarClient = null;
    // Case 1: Access without client certificate
    try {
        pulsarClient = PulsarClient.builder().serviceUrl(brokerUrlTls.toString()).enableTls(true)
                .allowTlsInsecureConnection(true).statsInterval(0, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .subscribe();
        fail("should fail");
    } catch (Exception e) {
        assertTrue(e.getMessage().contains("Unauthorized"));
    } finally {
        // Guard against build() having failed, which would otherwise surface as an NPE here.
        if (pulsarClient != null) {
            pulsarClient.close();
        }
    }

    // Case 2: Access with client certificate
    // Reset so a failure before build() does not close the client from case 1 a second time.
    pulsarClient = null;
    try {
        Authentication auth = new AuthenticationTls();
        auth.configure(authParams);
        pulsarClient = PulsarClient.builder().authentication(auth).serviceUrl(brokerUrlTls.toString())
                .enableTls(true).allowTlsInsecureConnection(true).statsInterval(0, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .subscribe();
    } catch (Exception e) {
        // Attach the cause so an unexpected failure can be diagnosed from the test report.
        fail("should not fail", e);
    } finally {
        if (pulsarClient != null) {
            pulsarClient.close();
        }
    }
}
/**
 * Restarts the broker with TLS authentication enabled and insecure TLS connections disallowed
 * (no trust certificate is configured by this test), then verifies that a TLS client is rejected
 * with {@code Unauthorized} both without and with a client certificate.
 *
 * @throws Exception if the broker restart or closing a client fails
 */
@SuppressWarnings("deprecation")
@Test
public void testTlsAuthDisallowInsecure() throws Exception {
    final String topicName = "persistent://prop/my-ns/newTopic";
    final String subName = "newSub";

    Set<String> providers = new HashSet<>();
    providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
    conf.setAuthenticationEnabled(true);
    conf.setAuthenticationProviders(providers);
    conf.setBrokerServicePortTls(Optional.of(0));
    conf.setWebServicePortTls(Optional.of(0));
    conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
    conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
    conf.setTlsAllowInsecureConnection(false);
    conf.setNumExecutorThreadPoolSize(5);
    restartBroker();

    Map<String, String> authParams = new HashMap<>();
    authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
    authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);

    PulsarClient pulsarClient = null;
    // Case 1: Access without client certificate
    try {
        pulsarClient = PulsarClient.builder().serviceUrl(brokerUrlTls.toString()).enableTls(true)
                .allowTlsInsecureConnection(true).statsInterval(0, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .subscribe();
        fail("should fail");
    } catch (Exception e) {
        assertTrue(e.getMessage().contains("Unauthorized"));
    } finally {
        // Guard against build() having failed, which would otherwise surface as an NPE here.
        if (pulsarClient != null) {
            pulsarClient.close();
        }
    }

    // Case 2: Access with client certificate
    // Reset so a failure before build() does not close the client from case 1 a second time.
    pulsarClient = null;
    try {
        Authentication auth = new AuthenticationTls();
        auth.configure(authParams);
        pulsarClient = PulsarClient.builder().authentication(auth).serviceUrl(brokerUrlTls.toString())
                .enableTls(true).allowTlsInsecureConnection(true).statsInterval(0, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .subscribe();
        fail("should fail");
    } catch (Exception e) {
        assertTrue(e.getMessage().contains("Unauthorized"));
    } finally {
        if (pulsarClient != null) {
            pulsarClient.close();
        }
    }
}
/**
 * Restarts the broker with TLS authentication enabled, insecure TLS connections disallowed and the
 * client certificate configured as the broker's trust certificate, then verifies that a TLS client
 * without a client certificate is rejected with {@code Unauthorized} while a TLS client presenting
 * the client certificate can subscribe.
 *
 * @throws Exception if the broker restart or closing a client fails
 */
@SuppressWarnings("deprecation")
@Test
public void testTlsAuthUseTrustCert() throws Exception {
    final String topicName = "persistent://prop/ns-abc/newTopic";
    final String subName = "newSub";

    Set<String> providers = new HashSet<>();
    providers.add("org.apache.pulsar.broker.authentication.AuthenticationProviderTls");
    conf.setAuthenticationEnabled(true);
    conf.setAuthenticationProviders(providers);
    conf.setBrokerServicePortTls(Optional.of(0));
    conf.setWebServicePortTls(Optional.of(0));
    conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
    conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
    conf.setTlsAllowInsecureConnection(false);
    conf.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
    conf.setNumExecutorThreadPoolSize(5);
    restartBroker();

    Map<String, String> authParams = new HashMap<>();
    authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
    authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);

    PulsarClient pulsarClient = null;
    // Case 1: Access without client certificate
    try {
        pulsarClient = PulsarClient.builder().serviceUrl(brokerUrlTls.toString()).enableTls(true)
                .allowTlsInsecureConnection(true).statsInterval(0, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .subscribe();
        fail("should fail");
    } catch (Exception e) {
        assertTrue(e.getMessage().contains("Unauthorized"));
    } finally {
        // Guard against build() having failed, which would otherwise surface as an NPE here.
        if (pulsarClient != null) {
            pulsarClient.close();
        }
    }

    // Case 2: Access with client certificate
    // Reset so a failure before build() does not close the client from case 1 a second time.
    pulsarClient = null;
    try {
        Authentication auth = new AuthenticationTls();
        auth.configure(authParams);
        pulsarClient = PulsarClient.builder().authentication(auth).serviceUrl(brokerUrlTls.toString())
                .enableTls(true).allowTlsInsecureConnection(true).statsInterval(0, TimeUnit.SECONDS)
                .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .subscribe();
    } catch (Exception e) {
        // Attach the cause so an unexpected failure can be diagnosed from the test report.
        fail("should not fail", e);
    } finally {
        if (pulsarClient != null) {
            pulsarClient.close();
        }
    }
}
/**
 * Verifies client-side lookup throttling: with {@code maxConcurrentLookupRequests(1)} and
 * {@code maxLookupRequests(2)}, two concurrent subscriptions succeed while three concurrent
 * subscriptions fail with a {@code TooManyRequestsException} cause.
 *
 * @throws Exception if the client cannot be built or closed
 */
@Test
public void testLookupThrottlingForClientByClient() throws Exception {
    final String topicName = "persistent://prop/ns-abc/newTopic";
    // The dedicated client is closed when the test ends (also on failure) so that it, and any
    // consumer that was not closed explicitly below, does not leak into later tests.
    @Cleanup
    PulsarClient pulsarClient = PulsarClient.builder()
            .serviceUrl(pulsar.getBrokerServiceUrl())
            .statsInterval(0, TimeUnit.SECONDS)
            .maxConcurrentLookupRequests(1)
            .maxLookupRequests(2)
            .build();

    // 2 lookup will success.
    try {
        CompletableFuture<Consumer<byte[]>> consumer1 = pulsarClient.newConsumer().topic(topicName)
                .subscriptionName("mysub1").subscribeAsync();
        CompletableFuture<Consumer<byte[]>> consumer2 = pulsarClient.newConsumer().topic(topicName)
                .subscriptionName("mysub2").subscribeAsync();
        consumer1.get().close();
        consumer2.get().close();
    } catch (Exception e) {
        // Attach the cause so an unexpected failure can be diagnosed from the test report.
        fail("Subscribe should success with 2 requests", e);
    }

    // 3 lookup will fail
    try {
        CompletableFuture<Consumer<byte[]>> consumer1 = pulsarClient.newConsumer().topic(topicName)
                .subscriptionName("mysub11").subscribeAsync();
        CompletableFuture<Consumer<byte[]>> consumer2 = pulsarClient.newConsumer().topic(topicName)
                .subscriptionName("mysub22").subscribeAsync();
        CompletableFuture<Consumer<byte[]>> consumer3 = pulsarClient.newConsumer().topic(topicName)
                .subscriptionName("mysub33").subscribeAsync();
        consumer1.get().close();
        consumer2.get().close();
        consumer3.get().close();
        fail("It should fail as throttling should only receive 2 requests");
    } catch (Exception e) {
        if (!(e.getCause() instanceof
                org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) {
            fail("Subscribe should fail with TooManyRequestsException", e);
        }
    }
}
/**
 * Verifies that loading a persistent topic fails with a {@code ServiceUnitNotReadyException} cause
 * once the state of its namespace bundle has been updated to {@code false} in the ownership cache.
 *
 * @throws Exception if namespace setup, producer creation or the bundle-state update fails
 */
@Test
public void testTopicLoadingOnDisableNamespaceBundle() throws Exception {
    final String namespace = "prop/disableBundle";
    try {
        admin.namespaces().createNamespace(namespace);
    } catch (PulsarAdminException.ConflictException alreadyExists) {
        // Ok.. (if test fails intermittently and namespace is already created)
    }
    admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test"));

    // Creating (and immediately closing) a producer makes this broker own the namespace bundle.
    final String topicName = "persistent://" + namespace + "/my-topic";
    TopicName topic = TopicName.get(topicName);
    pulsarClient.newProducer().topic(topicName).create().close();

    // disable namespace-bundle
    NamespaceBundle ownedBundle = pulsar.getNamespaceService().getBundle(topic);
    pulsar.getNamespaceService().getOwnershipCache().updateBundleState(ownedBundle, false).join();

    // try to create topic which should fail as bundle is disable
    CompletableFuture<Optional<Topic>> loadAttempt =
            pulsar.getBrokerService().loadOrCreatePersistentTopic(topicName, true);
    try {
        loadAttempt.get();
        fail("Topic creation should fail due to disable bundle");
    } catch (Exception e) {
        boolean serviceUnitNotReady =
                e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException;
        if (!serviceUnitNotReady) {
            fail("Topic creation should fail with ServiceUnitNotReadyException");
        }
    }
}
/**
 * Verifies that a topic-creation failure does not dead-lock the broker service: the managed-ledger
 * config lookup of a spied {@link BrokerService} is made to fail with a {@link NullPointerException},
 * and the resulting {@code getOrCreateTopic} future must complete exceptionally with that cause
 * within one second.
 */
@Test(timeOut = 3000)
public void testTopicFailureShouldNotHaveDeadLock() {
    final String namespace = "prop/ns-abc";
    final String deadLockTestTopic = "persistent://" + namespace + "/deadLockTestTopic";

    // let this broker own this namespace bundle by creating a topic
    try {
        final String successfulTopic = "persistent://" + namespace + "/ownBundleTopic";
        Producer<byte[]> producer = pulsarClient.newProducer().topic(successfulTopic).create();
        producer.close();
    } catch (Exception e) {
        // Attach the cause so the original stack trace is not lost.
        fail(e.getMessage(), e);
    }

    BrokerService service = spy(pulsar.getBrokerService());
    // create topic will fail to get managedLedgerConfig
    CompletableFuture<ManagedLedgerConfig> failedManagedLedgerConfig = new CompletableFuture<>();
    failedManagedLedgerConfig.completeExceptionally(new NullPointerException("failed to peristent policy"));
    doReturn(failedManagedLedgerConfig).when(service).getManagedLedgerConfig(any());

    CompletableFuture<Void> topicCreation = new CompletableFuture<>();
    // The executor is created right before the try block so that it is always shut down.
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        // create topic async and wait on the future completion
        executor.submit(() -> {
            service.getOrCreateTopic(deadLockTestTopic)
                    .thenAccept(topic -> topicCreation.complete(null))
                    .exceptionally(e -> {
                        topicCreation.completeExceptionally(e.getCause());
                        return null;
                    });
        });
        // future-result should be completed with exception
        topicCreation.get(1, TimeUnit.SECONDS);
    } catch (TimeoutException | InterruptedException e) {
        fail("there is a dead-lock and it should have been prevented", e);
    } catch (ExecutionException e) {
        assertTrue(e.getCause() instanceof NullPointerException);
    } finally {
        executor.shutdownNow();
    }
}
/**
 * Verifies that a managed-ledger open failure does not dead-lock the broker service: the
 * managed-ledger config lookup of a spied {@link BrokerService} succeeds, but an already-failed
 * future is injected (via reflection) into the managed-ledger factory's {@code ledgers} map for the
 * test topic. The resulting {@code getOrCreateTopic} future must complete exceptionally with a
 * {@link PersistenceException} within one second.
 *
 * @throws Exception if the owning topic cannot be created or the reflective access fails
 */
@Test
public void testLedgerOpenFailureShouldNotHaveDeadLock() throws Exception {
    final String namespace = "prop/ns-abc";
    final String deadLockTestTopic = "persistent://" + namespace + "/deadLockTestTopic";

    // let this broker own this namespace bundle by creating a topic
    try {
        final String successfulTopic = "persistent://" + namespace + "/ownBundleTopic";
        Producer<byte[]> producer = pulsarClient.newProducer().topic(successfulTopic).create();
        producer.close();
    } catch (Exception e) {
        // Attach the cause so the original stack trace is not lost.
        fail(e.getMessage(), e);
    }

    BrokerService service = spy(pulsar.getBrokerService());
    // the managed-ledger config lookup succeeds here; the failure is injected at ledger-open time
    CompletableFuture<ManagedLedgerConfig> managedLedgerConfig =
            CompletableFuture.completedFuture(new ManagedLedgerConfig());
    doReturn(managedLedgerConfig).when(service).getManagedLedgerConfig(any());

    CompletableFuture<Void> topicCreation = new CompletableFuture<>();

    // fail managed-ledger future
    Field ledgerField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
    ledgerField.setAccessible(true);
    @SuppressWarnings("unchecked")
    ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers =
            (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) ledgerField
                    .get(pulsar.getManagedLedgerFactory());
    CompletableFuture<ManagedLedgerImpl> failedLedger = new CompletableFuture<>();
    failedLedger.completeExceptionally(new ManagedLedgerException("ledger opening failed"));

    // Executor creation and ledger injection sit next to the try block so both are always undone.
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        ledgers.put(namespace + "/persistent/deadLockTestTopic", failedLedger);
        // create topic async and wait on the future completion
        executor.submit(() -> {
            service.getOrCreateTopic(deadLockTestTopic)
                    .thenAccept(topic -> topicCreation.complete(null))
                    .exceptionally(e -> {
                        topicCreation.completeExceptionally(e.getCause());
                        return null;
                    });
        });
        // future-result should be completed with exception
        topicCreation.get(1, TimeUnit.SECONDS);
    } catch (TimeoutException | InterruptedException e) {
        fail("there is a dead-lock and it should have been prevented", e);
    } catch (ExecutionException e) {
        assertEquals(e.getCause().getClass(), PersistenceException.class);
    } finally {
        executor.shutdownNow();
        // NOTE(review): this removes every entry of the factory's ledger map, not only the one
        // injected above - confirm that wiping the other entries is intended.
        ledgers.clear();
    }
}
/**
 * Verifies that policiesCache() copies global-policy data into local-policy data and returns the
 * combined result: after a namespace is created with a given number of bundles, the local policies
 * looked up through the cache must be present and report that bundle count.
 *
 * @throws Exception if the namespace cannot be created or the cache lookup fails
 */
@Test
public void testCreateNamespacePolicy() throws Exception {
    final String namespace = "prop/testPolicy";
    final int totalBundle = 3;

    admin.namespaces().createNamespace(namespace, new BundlesData(totalBundle));

    // path of the namespace entry below LOCAL_POLICIES_ROOT
    final String localPoliciesPath = joinPath(LOCAL_POLICIES_ROOT, namespace);

    // clear the cache first so the lookup below is not answered from an existing entry
    pulsar.getLocalZkCacheService().policiesCache().clear();
    Optional<LocalPolicies> policy = pulsar.getLocalZkCacheService().policiesCache().get(localPoliciesPath);

    assertTrue(policy.isPresent());
    assertEquals(policy.get().bundles.numBundles, totalBundle);
}
/**
 * Verifies that unloading a bundle gracefully closes the managed-ledger before removing ownership
 * (to avoid a bad-zk version), even when a producer's disconnect never completes: after the unload
 * call the topic's managed ledger must no longer be registered in the managed-ledger factory.
 *
 * @throws Exception if the client, the reflective field access or the bundle unload fails
 */
@Test
public void testStuckTopicUnloading() throws Exception {
    final String namespace = "prop/ns-abc";
    final String topicName = "persistent://" + namespace + "/unoadTopic";
    final String topicMlName = namespace + "/persistent/unoadTopic";

    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
            .subscribe();
    consumer.close();

    // a producer has to be attached to the topic: its broker-side object is replaced by a spy below
    ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName).sendTimeout(5,
            TimeUnit.SECONDS);
    Producer<byte[]> producer = producerBuilder.create();

    PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();

    ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerClientFactory()
            .getManagedLedgerFactory();
    Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
    ledgersField.setAccessible(true);
    @SuppressWarnings("unchecked")
    ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers =
            (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) ledgersField.get(mlFactory);
    // the managed ledger is registered in the factory while the topic is loaded
    assertNotNull(ledgers.get(topicMlName));

    // swap the topic's only producer for a spy whose disconnect() returns a future that never completes
    org.apache.pulsar.broker.service.Producer prod =
            (org.apache.pulsar.broker.service.Producer) spy(topic.producers.values().toArray()[0]);
    topic.producers.clear();
    topic.producers.put(prod.getProducerName(), prod);
    CompletableFuture<Void> waitFuture = new CompletableFuture<Void>();
    doReturn(waitFuture).when(prod).disconnect();

    // unload every owned bundle of the test namespace, allowing 2 seconds per bundle
    for (NamespaceBundle bundle : pulsar.getNamespaceService().getOwnedServiceUnits()) {
        if (namespace.equals(bundle.getNamespaceObject().toString())) {
            pulsar.getNamespaceService().unloadNamespaceBundle(bundle, 2, TimeUnit.SECONDS);
        }
    }

    // the managed ledger must have been removed from the factory despite the stuck disconnect
    assertNull(ledgers.get(topicMlName));
}
/**
 * Verifies that a raw Prometheus metrics provider registered with the broker has its output included
 * in the body served by the "/metrics" endpoint.
 *
 * @throws IOException if the HTTP request fails or the response body cannot be read
 */
@Test
public void testMetricsProvider() throws IOException {
    PrometheusRawMetricsProvider rawMetricsProvider = new PrometheusRawMetricsProvider() {
        @Override
        public void generate(SimpleTextOutputStream stream) {
            stream.write("test_metrics{label1=\"xyz\"} 10 \n");
        }
    };
    getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);

    HttpClient httpClient = HttpClientBuilder.create().build();
    final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
    HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));

    // read the whole body; try-with-resources closes the reader and the underlying response stream
    // even when reading fails, and the charset is explicit instead of the platform default
    StringBuilder body = new StringBuilder();
    try (InputStream inputStream = response.getEntity().getContent();
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(inputStream, java.nio.charset.StandardCharsets.UTF_8))) {
        String line;
        while ((line = reader.readLine()) != null) {
            body.append(line);
        }
    }

    Assert.assertTrue(body.toString().contains("test_metrics"));
}
/**
 * Verifies that a producer can still create a topic while another thread concurrently requests the
 * stats of that same, not yet existing topic through the admin client.
 *
 * @throws Exception if waiting on the latch or thread is interrupted, or producer creation fails
 */
@Test
public void shouldNotPreventCreatingTopicWhenNonexistingTopicIsCached() throws Exception {
// run multiple iterations to increase the chance of reproducing a race condition in the topic cache
for (int i = 0; i < 100; i++) {
// a distinct topic name per iteration
final String topicName = "persistent://prop/ns-abc/topic-caching-test-topic" + i;
// released by the helper thread once it is running, so the producer is created only after that
CountDownLatch latch = new CountDownLatch(1);
Thread getStatsThread = new Thread(() -> {
try {
latch.countDown();
// create race condition with a short delay
// the bug might not reproduce in all environments, this works at least on i7-10750H CPU
Thread.sleep(1);
admin.topics().getStats(topicName);
// NOTE(review): fail() runs on this helper thread; the resulting error is not caught below and
// nothing in this method reports it back to the test thread - confirm that this is intended
fail("The topic should not exist yet.");
} catch (PulsarAdminException.NotFoundException e) {
// expected exception
} catch (PulsarAdminException | InterruptedException e) {
// any other admin failure or an interruption is only logged
log.error("Exception in {}", Thread.currentThread().getName(), e);
}
}, "getStatsThread#" + i);
getStatsThread.start();
// wait until the helper thread has started before creating the topic
latch.await();
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
assertNotNull(producer);
// let the helper thread finish before starting the next iteration
getStatsThread.join();
}
}
}