Add NPE check for PulsarService#getAdminClient (#9782) (#9746)
### Motivation
If user set incorrect `brokerClientTlsEnabled` config, the `PulsarService#getAdminClient` would throw NPE and the error logs is not clear. For example, start a standalone pulsar with `brokerClientTlsEnabled=true`, some admin APIs that don't involve `PulsarService#getAdminClient` work well, however some admin APIs like `GET /admin/v2/non-persistent/:tenant/:namespace` will throw NPE with following logs:
```
org.apache.pulsar.broker.PulsarServerException: java.lang.NullPointerException
at org.apache.pulsar.broker.PulsarService.getAdminClient(PulsarService.java:1193)
at org.apache.pulsar.broker.admin.v2.NonPersistentTopics.getList(NonPersistentTopics.java:273)
```
After this PR, the logs became:
```
org.apache.pulsar.broker.PulsarServerException: java.lang.IllegalArgumentException: adminApiUrl is null, isBrokerClientTlsEnabled: true, webServiceAddressTls: null, webServiceAddress: http://localhost:8080
```
### Modifications
- Check if `adminApiUrl` is null in `PulsarService#getAdminClient` and give a human readable error message.
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change is a trivial rework / code cleanup without any test coverage.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 541d05e..324da50 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -481,6 +481,11 @@
throw new IllegalArgumentException("brokerServicePort/brokerServicePortTls must be present");
}
+ if (config.isAuthorizationEnabled() && !config.isAuthenticationEnabled()) {
+ throw new IllegalStateException("Invalid broker configuration. Authentication must be enabled with "
+ + "authenticationEnabled=true when authorization is enabled with authorizationEnabled=true.");
+ }
+
localMetadataStore = createLocalMetadataStore();
coordinationService = new CoordinationServiceImpl(localMetadataStore);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 6f1ce05..3418218 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -123,12 +123,12 @@
public AdminTest() {
super();
- conf.setClusterName(configClusterName);
}
@Override
@BeforeMethod
public void setup() throws Exception {
+ conf.setClusterName(configClusterName);
super.internalSetup();
configurationCache = pulsar.getConfigurationCache();
@@ -194,6 +194,7 @@
@AfterMethod(alwaysRun = true)
public void cleanup() throws Exception {
super.internalCleanup();
+ conf.setClusterName(configClusterName);
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 19135e5..3dea0ba 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -126,7 +126,6 @@
public NamespacesTest() {
super();
- conf.setClusterName(testLocalCluster);
}
@BeforeClass
@@ -149,6 +148,7 @@
@Override
@BeforeMethod
public void setup() throws Exception {
+ conf.setClusterName(testLocalCluster);
super.internalSetup();
namespaces = spy(new Namespaces());
@@ -191,6 +191,7 @@
@AfterMethod(alwaysRun = true)
public void cleanup() throws Exception {
super.internalCleanup();
+ conf.setClusterName(testLocalCluster);
}
@Test
@@ -1439,7 +1440,7 @@
@Test
public void testMaxTopicsPerNamespace() throws Exception {
- super.internalCleanup();
+ cleanup();
conf.setMaxTopicsPerNamespace(15);
super.internalSetup();
@@ -1488,7 +1489,7 @@
// check producer/consumer auto create partitioned topic
- super.internalCleanup();
+ cleanup();
conf.setMaxTopicsPerNamespace(0);
conf.setDefaultNumPartitions(3);
conf.setAllowAutoTopicCreationType("partitioned");
@@ -1518,7 +1519,7 @@
}
// check producer/consumer auto create non-partitioned topic
- super.internalCleanup();
+ cleanup();
conf.setMaxTopicsPerNamespace(0);
conf.setDefaultNumPartitions(1);
conf.setAllowAutoTopicCreationType("non-partitioned");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java
index 6522a7a..7ee3b09 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java
@@ -125,9 +125,9 @@
@Test
public void testTopicPolicyDisabled() throws Exception {
+ super.internalCleanup();
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(false);
- super.internalCleanup();
super.internalSetup();
admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index cda308e..45ba7c2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -25,6 +25,7 @@
import java.util.EnumSet;
import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -46,12 +47,20 @@
@Override
public void setup() throws Exception {
conf.setClusterName("c1");
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthenticationProviders(
+ Sets.newHashSet("org.apache.pulsar.broker.auth.MockAuthenticationProvider"));
conf.setAuthorizationEnabled(true);
conf.setAuthorizationAllowWildcardsMatching(true);
- conf.setSuperUserRoles(Sets.newHashSet("pulsar.super_user"));
+ conf.setSuperUserRoles(Sets.newHashSet("pulsar.super_user", "pass.pass"));
internalSetup();
}
+ @Override
+ protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) {
+ pulsarAdminBuilder.authentication(new MockAuthentication("pass.pass"));
+ }
+
@AfterClass(alwaysRun = true)
@Override
public void cleanup() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/InvalidBrokerConfigForAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/InvalidBrokerConfigForAuthorizationTest.java
new file mode 100644
index 0000000..4192228
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/InvalidBrokerConfigForAuthorizationTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.auth;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.testng.annotations.Test;
+
+public class InvalidBrokerConfigForAuthorizationTest extends MockedPulsarServiceBaseTest {
+
+ @Test
+ void startupShouldFailWhenAuthorizationIsEnabledWithoutAuthentication() throws Exception {
+ conf.setAuthorizationEnabled(true);
+ conf.setAuthenticationEnabled(false);
+ try {
+ internalSetup();
+ fail("An exception should have been thrown");
+ } catch (Exception e) {
+ assertEquals(e.getClass(), PulsarServerException.class);
+ assertEquals(e.getCause().getClass(), IllegalStateException.class);
+ assertEquals(e.getCause().getMessage(), "Invalid broker configuration. Authentication must be "
+ + "enabled with authenticationEnabled=true when authorization is enabled with "
+ + "authorizationEnabled=true.");
+ }
+ }
+
+ @Override
+ protected void setup() throws Exception {
+
+ }
+
+ @Override
+ protected void cleanup() throws Exception {
+ internalCleanup();
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 6198e65..7b52ad2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -49,6 +49,7 @@
import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
@@ -188,6 +189,7 @@
stopBroker();
pulsar = null;
}
+ resetConfig();
if (mockBookKeeper != null) {
mockBookKeeper.reallyShutdown();
mockBookKeeper = null;
@@ -220,7 +222,6 @@
}
bkExecutor = null;
}
-
}
protected abstract void setup() throws Exception;
@@ -253,16 +254,18 @@
if (admin != null) {
admin.close();
}
- admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).build());
+ PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString());
+ customizeNewPulsarAdminBuilder(pulsarAdminBuilder);
+ admin = spy(pulsarAdminBuilder.build());
+ }
+
+ protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) {
+
}
protected PulsarService startBroker(ServiceConfiguration conf) throws Exception {
- boolean isAuthorizationEnabled = conf.isAuthorizationEnabled();
- // enable authorization to initialize authorization service which is used by grant-permission
- conf.setAuthorizationEnabled(true);
PulsarService pulsar = startBrokerWithoutAuthorization(conf);
- conf.setAuthorizationEnabled(isAuthorizationEnabled);
return pulsar;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index aaf3040..568d221 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -911,9 +911,9 @@
int topicMessageTTLSecs = 2;
String namespaceName = "prop/expiry-check-2";
+ cleanup();
this.conf.setSystemTopicEnabled(true);
this.conf.setTopicLevelPoliciesEnabled(true);
- cleanup();
setup();
admin.namespaces().createNamespace(namespaceName);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
index f835643..a303be9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
@@ -55,7 +55,6 @@
@Override
public void cleanup() throws Exception {
super.internalCleanup();
- ;
}
@Test(timeOut = testTimeout)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
index 4a37d63..b6117aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
@@ -224,6 +224,7 @@
@Test(timeOut = 30000)
public void testTopicPolicyTakeSnapshot() throws Exception {
+ super.internalCleanup();
resetConfig();
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
@@ -231,7 +232,6 @@
conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
conf.setBrokerDeduplicationSnapshotIntervalSeconds(7);
conf.setBrokerDeduplicationEntriesInterval(20000);
- super.internalCleanup();
super.internalSetup();
super.producerBaseSetup();
@@ -335,12 +335,12 @@
}
private void testTakeSnapshot(boolean enabledSnapshot) throws Exception {
+ super.internalCleanup();
resetConfig();
conf.setBrokerDeduplicationEnabled(true);
conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(enabledSnapshot ? 1 : 0);
conf.setBrokerDeduplicationSnapshotIntervalSeconds(1);
conf.setBrokerDeduplicationEntriesInterval(20000);
- super.internalCleanup();
super.internalSetup();
super.producerBaseSetup();
@@ -412,12 +412,12 @@
@Test(timeOut = 30000)
public void testNamespacePolicyTakeSnapshot() throws Exception {
+ super.internalCleanup();
resetConfig();
conf.setBrokerDeduplicationEnabled(true);
conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
conf.setBrokerDeduplicationSnapshotIntervalSeconds(3);
conf.setBrokerDeduplicationEntriesInterval(20000);
- super.internalCleanup();
super.internalSetup();
super.producerBaseSetup();
@@ -464,12 +464,12 @@
@Test(timeOut = 30000)
public void testDisableNamespacePolicyTakeSnapshot() throws Exception {
+ super.internalCleanup();
resetConfig();
conf.setBrokerDeduplicationEnabled(true);
conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
conf.setBrokerDeduplicationSnapshotIntervalSeconds(1);
conf.setBrokerDeduplicationEntriesInterval(20000);
- super.internalCleanup();
super.internalSetup();
super.producerBaseSetup();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index 1383bc3..b5603a8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -55,9 +55,9 @@
@BeforeMethod
@Override
protected void setup() throws Exception {
+ this.conf.setClusterName("test");
super.internalSetup();
super.producerBaseSetup();
- this.conf.setClusterName("test");
}
@AfterMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java
index 5483a81..6de954f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java
@@ -48,11 +48,11 @@
@BeforeMethod
@Override
protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
this.conf.setClusterName("test");
this.conf.setTopicPublisherThrottlingTickTimeMillis(1);
this.conf.setBrokerPublisherThrottlingTickTimeMillis(1);
+ super.internalSetup();
+ super.producerBaseSetup();
}
@AfterMethod(alwaysRun = true)
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 16136c0..0e97ca8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -163,6 +163,11 @@
}
public void start() throws Exception {
+ if (proxyConfig.isAuthorizationEnabled() && !proxyConfig.isAuthenticationEnabled()) {
+ throw new IllegalStateException("Invalid proxy configuration. Authentication must be enabled with "
+ + "authenticationEnabled=true when authorization is enabled with authorizationEnabled=true.");
+ }
+
if (!isBlank(proxyConfig.getZookeeperServers()) && !isBlank(proxyConfig.getConfigurationStoreServers())) {
localMetadataStore = createLocalMetadataStore();
configMetadataStore = createConfigurationMetadataStore();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java
new file mode 100644
index 0000000..20532ea
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+public class InvalidProxyConfigForAuthorizationTest {
+
+ @Test
+ void startupShouldFailWhenAuthorizationIsEnabledWithoutAuthentication() throws Exception {
+ ProxyConfiguration proxyConfiguration = new ProxyConfiguration();
+ proxyConfiguration.setAuthorizationEnabled(true);
+ proxyConfiguration.setAuthenticationEnabled(false);
+ try (ProxyService proxyService = new ProxyService(proxyConfiguration,
+ Mockito.mock(AuthenticationService.class))) {
+ proxyService.start();
+ fail("An exception should have been thrown");
+ } catch (Exception e) {
+ assertEquals(e.getClass(), IllegalStateException.class);
+ assertEquals(e.getMessage(), "Invalid proxy configuration. Authentication must be "
+ + "enabled with authenticationEnabled=true when authorization is enabled with "
+ + "authorizationEnabled=true.");
+ }
+ }
+}