Fix bug that consumer which specify incorrect subscription hangs up w… (#1256)

* Fix bug that consumer which specify incorrect subscription hangs up when subscription_auth_mode is Prefix

* Add test for subscription prefix authorization
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index b50eee6..bdcc6e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -670,6 +670,11 @@
                         ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
                     }
                     return null;
+                }).exceptionally(e -> {
+                    String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole);
+                    log.warn(msg);
+                    ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage()));
+                    return null;
                 });
             } else {
                 final String msg = "Proxy Client is not authorized to subscribe";
@@ -856,6 +861,11 @@
                         ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
                     }
                     return null;
+                }).exceptionally(e -> {
+                    String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole);
+                    log.warn(msg);
+                    ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage()));
+                    return null;
                 });
             } else {
                 final String msg = "Proxy Client is not authorized to Produce";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 4f4fcd6..465cc6d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
@@ -29,6 +30,7 @@
 
 import javax.naming.AuthenticationException;
 
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -148,6 +150,46 @@
     }
 
     @Test
+    public void testSubscriptionPrefixAuthorization() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        conf.setAuthorizationProvider(TestAuthorizationProviderWithSubscriptionPrefix.class.getName());
+        setup();
+
+        ClientConfiguration adminConf = new ClientConfiguration();
+        Authentication adminAuthentication = new ClientAuthentication("superUser");
+        adminConf.setAuthentication(adminAuthentication);
+        admin = spy(new PulsarAdmin(brokerUrl, adminConf));
+
+        String lookupUrl;
+        lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
+
+        ClientConfiguration clientConfValid = new ClientConfiguration();
+        Authentication authentication = new ClientAuthentication(clientRole);
+        clientConfValid.setAuthentication(authentication);
+
+        pulsarClient = PulsarClient.create(lookupUrl, clientConfValid);
+
+        admin.properties().createProperty("prop-prefix",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("prop-prefix/use/ns");
+
+        // (1) Valid subscription name will be approved by authorization service
+        Consumer consumer = pulsarClient.subscribe("persistent://prop-prefix/use/ns/t1", clientRole + "-sub1");
+        consumer.close();
+
+        // (2) InValid subscription name will be rejected by authorization service
+        try {
+            consumer = pulsarClient.subscribe("persistent://prop-prefix/use/ns/t1", "sub1");
+            Assert.fail("should have failed with authorization error");
+        } catch (PulsarClientException.AuthorizationException pa) {
+            // Ok
+        }
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    @Test
     public void testGrantPermission() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
@@ -337,6 +379,24 @@
         }
     }
 
+    public static class TestAuthorizationProviderWithSubscriptionPrefix extends TestAuthorizationProvider {
+
+        @Override
+        public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
+                AuthenticationDataSource authenticationData, String subscription) {
+            CompletableFuture<Boolean> future = new CompletableFuture<>();
+            if (isNotBlank(subscription)) {
+                if (!subscription.startsWith(role)) {
+                    future.completeExceptionally(new PulsarServerException(
+                            "The subscription name needs to be prefixed by the authentication role"));
+                }
+            }
+            future.complete(clientRole.equals(role));
+            return future;
+        }
+
+    }
+
     public static class TestAuthorizationProviderWithGrantPermission extends TestAuthorizationProvider {
 
         private Set<String> grantRoles = Sets.newHashSet();