Fix bug where a consumer that specifies an incorrect subscription hangs up w… (#1256)
* Fix bug where a consumer that specifies an incorrect subscription hangs when subscription_auth_mode is Prefix
* Add test for subscription prefix authorization
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index b50eee6..bdcc6e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -670,6 +670,11 @@
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
+ }).exceptionally(e -> {
+ String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole);
+ log.warn(msg);
+ ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage()));
+ return null;
});
} else {
final String msg = "Proxy Client is not authorized to subscribe";
@@ -856,6 +861,11 @@
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
+ }).exceptionally(e -> {
+ String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole);
+ log.warn(msg);
+ ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage()));
+ return null;
});
} else {
final String msg = "Proxy Client is not authorized to Produce";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 4f4fcd6..465cc6d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.mockito.Mockito.spy;
import java.io.IOException;
@@ -29,6 +30,7 @@
import javax.naming.AuthenticationException;
+import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -148,6 +150,46 @@
}
@Test
+ public void testSubscriptionPrefixAuthorization() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+ // Use the authorization provider that requires subscription names to start with the role.
+ conf.setAuthorizationProvider(TestAuthorizationProviderWithSubscriptionPrefix.class.getName());
+ setup();
+
+ ClientConfiguration adminConf = new ClientConfiguration();
+ Authentication adminAuthentication = new ClientAuthentication("superUser");
+ adminConf.setAuthentication(adminAuthentication);
+ admin = spy(new PulsarAdmin(brokerUrl, adminConf));
+
+ String lookupUrl;
+ lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
+
+ ClientConfiguration clientConfValid = new ClientConfiguration();
+ Authentication authentication = new ClientAuthentication(clientRole);
+ clientConfValid.setAuthentication(authentication);
+
+ pulsarClient = PulsarClient.create(lookupUrl, clientConfValid);
+
+ admin.properties().createProperty("prop-prefix",
+ new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+ admin.namespaces().createNamespace("prop-prefix/use/ns");
+
+ // (1) A subscription name prefixed by the client role is approved by the authorization service
+ Consumer consumer = pulsarClient.subscribe("persistent://prop-prefix/use/ns/t1", clientRole + "-sub1");
+ consumer.close();
+
+ // (2) A subscription name without the role prefix is rejected by the authorization service
+ try {
+ consumer = pulsarClient.subscribe("persistent://prop-prefix/use/ns/t1", "sub1");
+ Assert.fail("should have failed with authorization error");
+ } catch (PulsarClientException.AuthorizationException pa) {
+ // Expected: the subscribe call fails with an authorization error instead of hanging
+ }
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ @Test
public void testGrantPermission() throws Exception {
log.info("-- Starting {} test --", methodName);
@@ -337,6 +379,24 @@
}
}
+ public static class TestAuthorizationProviderWithSubscriptionPrefix extends TestAuthorizationProvider {
+ // Test provider whose consume check fails exceptionally when a non-blank subscription does not start with the role.
+ @Override
+ public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
+ AuthenticationDataSource authenticationData, String subscription) {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ if (isNotBlank(subscription)) {
+ if (!subscription.startsWith(role)) {
+ future.completeExceptionally(new PulsarServerException(
+ "The subscription name needs to be prefixed by the authentication role"));
+ }
+ }
+ future.complete(clientRole.equals(role)); // no effect if the future was already completed exceptionally above
+ return future;
+ }
+
+ }
+
public static class TestAuthorizationProviderWithGrantPermission extends TestAuthorizationProvider {
private Set<String> grantRoles = Sets.newHashSet();