/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.matches;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockAlwaysExpiredAuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.ServerCnx.State;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.AuthMethod;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.BaseCommand.Type;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
import org.apache.pulsar.common.api.proto.CommandSendError;
import org.apache.pulsar.common.api.proto.CommandSendReceipt;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandSuccess;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@SuppressWarnings("unchecked")
@Test(groups = "broker")
public class ServerCnxTest {
    // In-memory Netty channel the tests write client commands into and inspect for outbound messages.
    protected EmbeddedChannel channel;
    // Broker configuration; created as a Mockito spy in setup() so that individual getters can be stubbed.
    private ServiceConfiguration svcConfig;
    // Connection handler under test.
    // NOTE(review): it is never assigned in the visible part of this file - presumably resetChannel() does it.
    private ServerCnx serverCnx;
    // Spy of the real BrokerService, built in setup() on top of the spied PulsarService.
    protected BrokerService brokerService;
    // Mocked managed-ledger factory handed out by pulsar.getManagedLedgerFactory().
    private ManagedLedgerFactory mlFactoryMock;
    private ClientChannelHelper clientChannelHelper;
    // Spy of PulsarService whose collaborators are replaced with mocks/spies in setup().
    private PulsarService pulsar;
    // Metadata store built on the mock ZooKeeper; serves as both the local and the configuration store.
    private MetadataStoreExtended store;
    private ConfigurationCacheService configCacheService;
    private NamespaceResources namespaceResources;
    // Mocked namespace service; setup() stubs it so that ownership/activity checks always succeed.
    protected NamespaceService namespaceService;
    // Numeric value of the last constant declared in ProtocolVersion; tests compare the protocol version
    // reported in CONNECTED responses against it.
    private final int currentProtocolVersion = ProtocolVersion.values()[ProtocolVersion.values().length - 1]
            .getValue();

    // Topic and subscription names shared by the tests.
    // NOTE(review): most usages are outside the visible part of this file; the intent suggested by each
    // name (success / fail / not owned / ...) should be confirmed against the mocks that consume them.
    protected final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
    private final String failTopicName = "persistent://prop/use/ns-abc/failTopic";
    private final String nonOwnedTopicName = "persistent://prop/use/ns-abc/success-not-owned-topic";
    private final String encryptionRequiredTopicName = "persistent://prop/use/ns-abc/successEncryptionRequiredTopic";
    private final String successSubName = "successSub";
    private final String nonExistentTopicName = "persistent://nonexistent-prop/nonexistent-cluster/nonexistent-namespace/successNonExistentTopic";
    private final String topicWithNonLocalCluster = "persistent://prop/usw/ns-abc/successTopic";

    // Shared Mockito mocks for the managed ledger and its cursor.
    private final ManagedLedger ledgerMock = mock(ManagedLedger.class);
    private final ManagedCursor cursorMock = mock(ManagedCursor.class);
    // Per-test executor and event loop group; created in setup() and shut down in teardown().
    private OrderedExecutor executor;
    private EventLoopGroup eventLoopGroup;

    /**
     * Builds a fresh broker test fixture before every test method: a spied {@link PulsarService} and
     * {@link BrokerService} wired to mocked/spied collaborators (managed-ledger factory, BookKeeper client,
     * ZooKeeper-backed metadata store, namespace service).
     *
     * <p>The order of the statements matters: stubs installed on {@code pulsar} before the
     * {@link BrokerService} spy is constructed are the ones in effect during its construction.
     *
     * @throws Exception if any part of the fixture cannot be created
     */
    @BeforeMethod
    public void setup() throws Exception {
        eventLoopGroup = new NioEventLoopGroup();
        executor = OrderedExecutor.newBuilder().numThreads(1).build();
        // Spy (not mock) so real setters/getters keep working while selected getters can be stubbed.
        svcConfig = spy(ServiceConfiguration.class);
        svcConfig.setBrokerShutdownTimeoutMs(0L);
        pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig);
        doReturn(new DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();

        // One-second keep-alive interval; the keep-alive tests rely on the connection timing out quickly.
        svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS));
        svcConfig.setBacklogQuotaCheckEnabled(false);
        doReturn(svcConfig).when(pulsar).getConfiguration();
        // Placeholder PulsarResources mock; it is replaced further down by a spy backed by the metadata
        // store, but this mock is what getPulsarResources() returns while BrokerService is constructed.
        doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();

        // Local cluster name; matches the "use" cluster segment of most topic names used by the tests.
        doReturn("use").when(svcConfig).getClusterName();

        mlFactoryMock = mock(ManagedLedgerFactory.class);
        doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();

        ZooKeeper mockZk = createMockZooKeeper();
        doReturn(createMockBookKeeper(executor))
            .when(pulsar).getBookKeeperClient();

        store = new ZKMetadataStore(mockZk);

        // The same store instance serves as both the local and the configuration metadata store.
        doReturn(store).when(pulsar).getLocalMetadataStore();
        doReturn(store).when(pulsar).getConfigurationMetadataStore();

        brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup);
        BrokerInterceptor interceptor = mock(BrokerInterceptor.class);
        doReturn(interceptor).when(brokerService).getInterceptor();
        doReturn(brokerService).when(pulsar).getBrokerService();
        doReturn(executor).when(pulsar).getOrderedExecutor();

        // Replace the placeholder PulsarResources with a spy built on the metadata store.
        PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
        namespaceResources = spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30);
        doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();
        doReturn(pulsarResources).when(pulsar).getPulsarResources();

        // Namespace service mock: every ownership / activity check reports success.
        namespaceService = mock(NamespaceService.class);
        doReturn(CompletableFuture.completedFuture(null)).when(namespaceService).getBundleAsync(any());
        doReturn(namespaceService).when(pulsar).getNamespaceService();
        doReturn(true).when(namespaceService).isServiceUnitOwned(any());
        doReturn(true).when(namespaceService).isServiceUnitActive(any());
        doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any());
        doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any());

        // Defined outside the visible part of this file.
        setupMLAsyncCallbackMocks();

        clientChannelHelper = new ClientChannelHelper();
    }

    /**
     * Converts a duration to whole seconds.
     *
     * @param time the duration, expressed in {@code unit}
     * @param unit the unit {@code time} is expressed in
     * @return the duration in seconds (fractions are dropped by the conversion), narrowed to an {@code int}
     */
    private int inSec(int time, TimeUnit unit) {
        final long seconds = unit.toSeconds(time);
        return (int) seconds;
    }

    /**
     * Releases the per-test fixture after every test method.
     *
     * <p>Because this runs with {@code alwaysRun = true}, it is also invoked when {@link #setup()} failed
     * part-way or when the test never created a connection. Each resource is therefore closed only if it
     * was actually created; otherwise a {@link NullPointerException} here would mask the original failure
     * and skip the remaining cleanup steps.
     *
     * @throws Exception if closing any of the resources fails
     */
    @AfterMethod(alwaysRun = true)
    public void teardown() throws Exception {
        if (serverCnx != null) {
            serverCnx.close();
        }
        if (channel != null) {
            channel.close();
        }
        if (pulsar != null) {
            pulsar.close();
        }
        if (brokerService != null) {
            brokerService.close();
        }
        if (executor != null) {
            executor.shutdownNow();
        }
        if (eventLoopGroup != null) {
            // Block until the event loop threads have terminated so they do not leak into the next test.
            eventLoopGroup.shutdownGracefully().get();
        }
        if (store != null) {
            store.close();
        }
    }

    /**
     * A CONNECT command using the "none" auth method moves the connection from {@code Start} to
     * {@code Connected} and is answered with a {@link CommandConnected}.
     */
    @Test(timeOut = 30000)
    public void testConnectCommand() throws Exception {
        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        // Send CONNECT and check how the server reacts.
        channel.writeInbound(Commands.newConnect("none", "", null));

        assertEquals(serverCnx.getState(), State.Connected);
        final Object reply = getResponse();
        assertTrue(reply instanceof CommandConnected);
        channel.finish();
    }

    /**
     * Builds a size-prefixed CONNECT command that carries the auth method as an {@link AuthMethod} enum
     * value.
     *
     * @param authMethod      auth method enum value to put into the command
     * @param authData        auth data; sent as its UTF-8 bytes
     * @param protocolVersion protocol version announced by the simulated client
     * @return the serialized command
     */
    private static ByteBuf newConnect(AuthMethod authMethod, String authData, int protocolVersion) {
        final byte[] authBytes = authData.getBytes(StandardCharsets.UTF_8);

        final BaseCommand command = new BaseCommand();
        command.setType(Type.CONNECT);
        command.setConnect()
                .setClientVersion("Pulsar Client")
                .setAuthMethod(authMethod)
                .setAuthData(authBytes)
                .setProtocolVersion(protocolVersion);

        return Commands.serializeWithSize(command);
    }

    /**
     * Ensures that old clients may still connect to new servers: the CONNECT command is built with the
     * {@link AuthMethod} enum (via the local {@code newConnect} helper) instead of an auth method name.
     *
     * @throws Exception if the channel cannot be set up or the response cannot be read
     */
    @Test(timeOut = 30000)
    public void testConnectCommandWithEnum() throws Exception {
        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        // Send the enum-based CONNECT and check how the server reacts.
        final int protocolVersion = Commands.getCurrentProtocolVersion();
        channel.writeInbound(newConnect(AuthMethod.AuthMethodNone, "", protocolVersion));

        assertEquals(serverCnx.getState(), State.Connected);
        final Object reply = getResponse();
        assertTrue(reply instanceof CommandConnected);
        channel.finish();
    }

    /**
     * The CONNECTED response to a default CONNECT command reports {@code currentProtocolVersion}.
     */
    @Test(timeOut = 30000)
    public void testConnectCommandWithProtocolVersion() throws Exception {
        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        // Send CONNECT and check how the server reacts.
        channel.writeInbound(Commands.newConnect("none", "", null));

        assertEquals(serverCnx.getState(), State.Connected);
        final CommandConnected connected = (CommandConnected) getResponse();
        final int reportedVersion = connected.getProtocolVersion();
        assertEquals(reportedVersion, currentProtocolVersion);
        channel.finish();
    }

    /**
     * After a successful handshake, a client that stays silent is disconnected by the keep-alive logic.
     */
    @Test(timeOut = 30000)
    public void testKeepAlive() throws Exception {
        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        // Send CONNECT and check how the server reacts.
        channel.writeInbound(Commands.newConnect("none", "", null));

        assertEquals(serverCnx.getState(), State.Connected);
        final CommandConnected connected = (CommandConnected) getResponse();
        assertEquals(connected.getProtocolVersion(), currentProtocolVersion);

        // The connection is expected to be closed within 2 seconds; keep pumping the embedded channel's
        // scheduled tasks in the meantime so the cleanup logic gets a chance to run.
        int roundsLeft = 3;
        while (roundsLeft > 0) {
            channel.runPendingTasks();
            Thread.sleep(1000);
            roundsLeft--;
        }

        assertFalse(channel.isActive());

        channel.finish();
    }

    /**
     * A client that connects with protocol version {@code v0} is not disconnected by the keep-alive logic,
     * and the server answers with that same protocol version.
     */
    @Test(timeOut = 30000)
    public void testKeepAliveNotEnforcedWithOlderClients() throws Exception {
        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        // Send a CONNECT that announces the oldest protocol version.
        final int oldVersion = ProtocolVersion.v0.getValue();
        channel.writeInbound(
                Commands.newConnect("none", "", oldVersion, null, null, null, null, null));

        assertEquals(serverCnx.getState(), State.Connected);
        final CommandConnected connected = (CommandConnected) getResponse();
        // The server responds with the same version as the client.
        assertEquals(connected.getProtocolVersion(), ProtocolVersion.v0.getValue());

        // Unlike newer clients, the connection must *not* be closed after 2 seconds.
        int roundsLeft = 3;
        while (roundsLeft > 0) {
            channel.runPendingTasks();
            Thread.sleep(1000);
            roundsLeft--;
        }
        assertTrue(channel.isActive());

        channel.finish();
    }

    /**
     * A connection on which no CONNECT command is ever received is closed by the server after the
     * keep-alive timeout.
     */
    @Test(timeOut = 30000)
    public void testKeepAliveBeforeHandshake() throws Exception {
        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        // No CONNECT is sent on purpose. The connection is expected to be closed within 2 seconds; keep
        // pumping the embedded channel's scheduled tasks so the cleanup logic gets a chance to run.
        int roundsLeft = 3;
        while (roundsLeft > 0) {
            channel.runPendingTasks();
            Thread.sleep(1000);
            roundsLeft--;
        }

        assertFalse(channel.isActive());

        channel.finish();
    }

    /**
     * With authentication enabled, a CONNECT carrying auth data accepted by {@link MockAuthenticationProvider}
     * succeeds and the connection's principal is the supplied auth data.
     */
    @Test(timeOut = 30000)
    public void testConnectCommandWithAuthenticationPositive() throws Exception {
        final AuthenticationService authService = mock(AuthenticationService.class);
        final AuthenticationProvider provider = new MockAuthenticationProvider();
        final String methodName = provider.getAuthMethodName();

        when(brokerService.getAuthenticationService()).thenReturn(authService);
        when(authService.getAuthenticationProvider(methodName)).thenReturn(provider);
        svcConfig.setAuthenticationEnabled(true);

        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        // Send CONNECT and check how the server reacts.
        channel.writeInbound(Commands.newConnect(methodName, "pass.client", null));

        final Object reply = getResponse();
        assertTrue(reply instanceof CommandConnected);
        assertEquals(serverCnx.getState(), State.Connected);
        assertEquals(serverCnx.getPrincipal(), "pass.client");
        assertTrue(serverCnx.isActive());
        channel.finish();
    }

    /**
     * With {@code authenticateOriginalAuthData} enabled, a CONNECT that carries no original auth
     * information still succeeds: auth role and principal are both the supplied auth data and the
     * original principal stays {@code null}.
     */
    @Test(timeOut = 30000)
    public void testConnectCommandWithoutOriginalAuthInfoWhenAuthenticateOriginalAuthData() throws Exception {
        final AuthenticationService authService = mock(AuthenticationService.class);
        final AuthenticationProvider provider = new MockAuthenticationProvider();
        final String methodName = provider.getAuthMethodName();

        when(brokerService.getAuthenticationService()).thenReturn(authService);
        when(authService.getAuthenticationProvider(methodName)).thenReturn(provider);
        svcConfig.setAuthenticationEnabled(true);
        svcConfig.setAuthenticateOriginalAuthData(true);

        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        channel.writeInbound(Commands.newConnect(methodName, "pass.client", ""));

        final Object reply = getResponse();
        assertTrue(reply instanceof CommandConnected);
        assertEquals(serverCnx.getState(), State.Connected);
        assertEquals(serverCnx.getAuthRole(), "pass.client");
        assertEquals(serverCnx.getPrincipal(), "pass.client");
        assertNull(serverCnx.getOriginalPrincipal());
        assertTrue(serverCnx.isActive());
        channel.finish();
    }

    /**
     * With {@code authenticateOriginalAuthData} enabled and "pass.proxy" configured as a proxy role, a
     * CONNECT from the proxy that also carries original auth data "pass.client" succeeds; the auth role is
     * the proxy's while principal and original principal are "pass.client".
     */
    @Test(timeOut = 30000)
    public void testConnectCommandWithPassingOriginalAuthData() throws Exception {
        final AuthenticationService authService = mock(AuthenticationService.class);
        final AuthenticationProvider provider = new MockAuthenticationProvider();
        final String methodName = provider.getAuthMethodName();

        when(brokerService.getAuthenticationService()).thenReturn(authService);
        when(authService.getAuthenticationProvider(methodName)).thenReturn(provider);
        svcConfig.setAuthenticationEnabled(true);
        svcConfig.setAuthenticateOriginalAuthData(true);
        svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));

        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        final ByteBuf connect = Commands.newConnect(
                methodName, "pass.proxy", 1, null, null, "client", "pass.client", methodName);
        channel.writeInbound(connect);

        final Object reply = getResponse();
        assertTrue(reply instanceof CommandConnected);
        assertEquals(serverCnx.getState(), State.Connected);
        // Note that this value will change to the client's data if the broker sends an AuthChallenge to
        // the proxy/client. Details are described in https://github.com/apache/pulsar/issues/19332.
        assertEquals(serverCnx.getAuthRole(), "pass.proxy");
        // These are all taken without verifying the auth data.
        assertEquals(serverCnx.getPrincipal(), "pass.client");
        assertEquals(serverCnx.getOriginalPrincipal(), "pass.client");
        assertTrue(serverCnx.isActive());
        channel.finish();
    }

    /**
     * With {@code authenticateOriginalAuthData} disabled and "pass.proxy" configured as a proxy role, a
     * CONNECT from the proxy succeeds; the auth role is the proxy's while principal and original principal
     * are the original principal passed in the command ("client").
     */
    @Test(timeOut = 30000)
    public void testConnectCommandWithPassingOriginalPrincipal() throws Exception {
        final AuthenticationService authService = mock(AuthenticationService.class);
        final AuthenticationProvider provider = new MockAuthenticationProvider();
        final String methodName = provider.getAuthMethodName();

        when(brokerService.getAuthenticationService()).thenReturn(authService);
        when(authService.getAuthenticationProvider(methodName)).thenReturn(provider);
        svcConfig.setAuthenticationEnabled(true);
        svcConfig.setAuthenticateOriginalAuthData(false);
        svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));

        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        final ByteBuf connect = Commands.newConnect(
                methodName, "pass.proxy", 1, null, null, "client", "pass.client", methodName);
        channel.writeInbound(connect);

        final Object reply = getResponse();
        assertTrue(reply instanceof CommandConnected);
        assertEquals(serverCnx.getState(), State.Connected);
        assertEquals(serverCnx.getAuthRole(), "pass.proxy");
        // These are all taken without verifying the auth data.
        assertEquals(serverCnx.getPrincipal(), "client");
        assertEquals(serverCnx.getOriginalPrincipal(), "client");
        assertTrue(serverCnx.isActive());
        channel.finish();
    }

    /**
     * Verifies that the connection is closed when a client answers an auth challenge with credentials
     * that authenticate successfully but resolve to a different role than the one used at connect time.
     *
     * <p>{@link MockAlwaysExpiredAuthenticationProvider} is used together with a 1 second refresh check so
     * the broker keeps issuing auth challenges. The first challenge is answered with the original
     * credentials, the second with different ones, after which the channel is expected to become inactive.
     *
     * <p>The explicit {@code timeOut} matters here: the polling loops below have no upper bound of their
     * own, so without it a missing auth challenge would hang the whole test run.
     */
    @Test(timeOut = 30000)
    public void testAuthChallengePrincipalChangeFails() throws Exception {
        AuthenticationService authenticationService = mock(AuthenticationService.class);
        AuthenticationProvider authenticationProvider = new MockAlwaysExpiredAuthenticationProvider();
        String authMethodName = authenticationProvider.getAuthMethodName();

        when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
        when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
        svcConfig.setAuthenticationEnabled(true);
        svcConfig.setAuthenticationRefreshCheckSeconds(1);

        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);
        // Don't want the keep alive task affecting which messages are handled
        serverCnx.cancelKeepAliveTask();

        ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", "");
        channel.writeInbound(clientCommand);

        Object responseConnected = getResponse();
        assertTrue(responseConnected instanceof CommandConnected);
        assertEquals(serverCnx.getState(), State.Connected);
        assertEquals(serverCnx.getPrincipal(), "pass.client");
        assertTrue(serverCnx.isActive());

        // Trigger the ServerCnx to check if authentication is expired (it is because of our special
        // implementation) and then force the channel to run the task. Bounded only by the test timeOut.
        while (channel.outboundMessages().isEmpty()) {
            Thread.sleep(100);
            channel.runPendingTasks();
        }
        Object responseAuthChallenge1 = getResponse();
        assertTrue(responseAuthChallenge1 instanceof CommandAuthChallenge);

        // Respond with valid info that will both pass and be the same.
        // An explicit charset keeps the payload independent of the platform default encoding.
        ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName,
                AuthData.of("pass.client".getBytes(StandardCharsets.UTF_8)), 1, "");
        channel.writeInbound(authResponse1);

        // Trigger the ServerCnx to check if authentication is expired again
        while (channel.outboundMessages().isEmpty()) {
            Thread.sleep(100);
            channel.runPendingTasks();
        }
        Object responseAuthChallenge2 = getResponse();
        assertTrue(responseAuthChallenge2 instanceof CommandAuthChallenge);

        // Respond with invalid info that will pass but have a different authRole
        ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName,
                AuthData.of("pass.client2".getBytes(StandardCharsets.UTF_8)), 1, "");
        channel.writeInbound(authResponse2);

        // Expect the connection to disconnect
        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());

        channel.finish();
    }

    /**
     * Proxy variant of the principal-change check: with {@code authenticateOriginalAuthData} enabled and
     * "pass.proxy" configured as a proxy role, the connection is expected to be closed when an auth
     * challenge is answered with credentials ("pass.client2") that differ from the ones supplied at
     * connect time ("pass.client").
     *
     * <p>{@link MockAlwaysExpiredAuthenticationProvider} is used together with a 1 second refresh check so
     * the broker keeps issuing auth challenges.
     *
     * <p>The explicit {@code timeOut} matters here: the polling loops below have no upper bound of their
     * own, so without it a missing auth challenge would hang the whole test run.
     */
    @Test(timeOut = 30000)
    public void testAuthChallengeOriginalPrincipalChangeFails() throws Exception {
        AuthenticationService authenticationService = mock(AuthenticationService.class);
        AuthenticationProvider authenticationProvider = new MockAlwaysExpiredAuthenticationProvider();
        String authMethodName = authenticationProvider.getAuthMethodName();

        when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
        when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
        svcConfig.setAuthenticationEnabled(true);
        svcConfig.setAuthenticateOriginalAuthData(true);
        svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));
        svcConfig.setAuthenticationRefreshCheckSeconds(1);

        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);
        // Don't want the keep alive task affecting which messages are handled
        serverCnx.cancelKeepAliveTask();

        ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null,
                null, "pass.client", "pass.client", authMethodName);
        channel.writeInbound(clientCommand);

        Object responseConnected = getResponse();
        assertTrue(responseConnected instanceof CommandConnected);
        assertEquals(serverCnx.getState(), State.Connected);
        assertEquals(serverCnx.getAuthRole(), "pass.proxy");
        // These are all taken without verifying the auth data
        assertEquals(serverCnx.getPrincipal(), "pass.client");
        assertEquals(serverCnx.getOriginalPrincipal(), "pass.client");
        assertTrue(serverCnx.isActive());

        // Trigger the ServerCnx to check if authentication is expired (it is because of our special
        // implementation) and then force the channel to run the task. Bounded only by the test timeOut.
        while (channel.outboundMessages().isEmpty()) {
            Thread.sleep(100);
            channel.runPendingTasks();
        }
        Object responseAuthChallenge1 = getResponse();
        assertTrue(responseAuthChallenge1 instanceof CommandAuthChallenge);

        // Respond with valid info that will both pass and be the same.
        // An explicit charset keeps the payload independent of the platform default encoding.
        ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName,
                AuthData.of("pass.client".getBytes(StandardCharsets.UTF_8)), 1, "");
        channel.writeInbound(authResponse1);

        // Trigger the ServerCnx to check if authentication is expired again
        while (channel.outboundMessages().isEmpty()) {
            Thread.sleep(100);
            channel.runPendingTasks();
        }
        Object responseAuthChallenge2 = getResponse();
        assertTrue(responseAuthChallenge2 instanceof CommandAuthChallenge);

        // Respond with invalid info that will pass but have a different authRole
        ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName,
                AuthData.of("pass.client2".getBytes(StandardCharsets.UTF_8)), 1, "");
        channel.writeInbound(authResponse2);

        // Expect the connection to disconnect
        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());

        channel.finish();
    }

    /**
     * Runs a series of CONNECT attempts with different (auth data, original principal) pairs while
     * authentication and authorization are enabled and "pass.proxy" is the only configured proxy role.
     *
     * <p>This test is different in branch-2.11 and older because the behavior changes after branch-2.11.
     * See https://github.com/apache/pulsar/pull/19830 for additional information.
     */
    @Test(timeOut = 30000)
    public void testConnectCommandWithDifferentRoleCombinations() throws Exception {
        final AuthenticationService authService = mock(AuthenticationService.class);
        final AuthenticationProvider provider = new MockAuthenticationProvider();
        final String authMethodName = provider.getAuthMethodName();

        when(brokerService.getAuthenticationService()).thenReturn(authService);
        when(authService.getAuthenticationProvider(authMethodName)).thenReturn(provider);
        svcConfig.setAuthenticationEnabled(true);
        svcConfig.setAuthenticateOriginalAuthData(false);
        svcConfig.setAuthorizationEnabled(true);
        svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));

        // Invalid combinations: the auth data is the proxy role and the original principal is the proxy
        // role itself, empty, or missing.
        final String[] rejectedOriginalPrincipals = {"pass.proxy", "", null};
        for (String originalPrincipal : rejectedOriginalPrincipals) {
            verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", originalPrincipal, false);
        }

        // Valid combinations, given as {authData, originalPrincipal}. They are only considered valid
        // because there is no requirement that only a proxy role may pass an original principal.
        final String[][] acceptedCombinations = {
                {"pass.client", "pass.proxy"},
                {"pass.client1", "pass.client"},
                {"pass.client", "pass.client"},
                {"pass.client", "pass.client1"},
        };
        for (String[] combination : acceptedCombinations) {
            verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, combination[0], combination[1], true);
        }
    }

    /**
     * Resets the channel, sends one CONNECT command and checks the outcome.
     *
     * @param authMethodName    auth method name placed in the CONNECT command
     * @param authData          auth data placed in the CONNECT command
     * @param originalPrincipal original principal placed in the CONNECT command; may be {@code null}
     * @param shouldPass        when {@code true} a {@link CommandConnected} response and the
     *                          {@code Connected} state are expected; otherwise a {@link CommandError} with
     *                          {@link ServerError#AuthorizationError} and the {@code Failed} state
     * @throws Exception if the channel cannot be reset or the response cannot be read
     */
    private void verifyAuthRoleAndOriginalPrincipalBehavior(String authMethodName, String authData,
                                                            String originalPrincipal,
                                                            boolean shouldPass) throws Exception {
        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        channel.writeInbound(
                Commands.newConnect(authMethodName, authData, 1, null, null, originalPrincipal, null, null));

        final Object reply = getResponse();
        if (!shouldPass) {
            // Rejected: the broker answers with an authorization error and marks the connection failed.
            assertTrue(reply instanceof CommandError);
            final CommandError error = (CommandError) reply;
            assertEquals(error.getError(), ServerError.AuthorizationError);
            assertEquals(serverCnx.getState(), State.Failed);
        } else {
            // Accepted: the handshake completes and the connection stays active.
            assertTrue(reply instanceof CommandConnected);
            assertEquals(serverCnx.getState(), State.Connected);
            assertTrue(serverCnx.isActive());
        }
        channel.finish();
    }

    /**
     * With authentication reported as enabled, no anonymous user role, and no provider stubbed for the
     * "none" auth method, a CONNECT command fails: the state becomes {@code Failed} and a
     * {@link CommandError} is returned.
     */
    @Test(timeOut = 30000)
    public void testConnectCommandWithAuthenticationNegative() throws Exception {
        final AuthenticationService authService = mock(AuthenticationService.class);
        doReturn(authService).when(brokerService).getAuthenticationService();
        doReturn(Optional.empty()).when(authService).getAnonymousUserRole();
        doReturn(true).when(brokerService).isAuthenticationEnabled();

        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        // Send CONNECT and check how the server reacts.
        channel.writeInbound(Commands.newConnect("none", "", null));

        assertEquals(serverCnx.getState(), State.Failed);
        final Object reply = getResponse();
        assertTrue(reply instanceof CommandError);
        channel.finish();
    }

    /**
     * A CONNECT from the configured proxy role that forwards original auth data of "fail" must be
     * answered with an "Unable to authenticate" {@code CommandError}, leaving the connection in the
     * Failed state and inactive.
     */
    @Test(timeOut = 30000)
    public void testConnectCommandWithFailingOriginalAuthData() throws Exception {
        AuthenticationProvider provider = new MockAuthenticationProvider();
        String method = provider.getAuthMethodName();
        AuthenticationService authService = mock(AuthenticationService.class);

        when(brokerService.getAuthenticationService()).thenReturn(authService);
        when(authService.getAuthenticationProvider(method)).thenReturn(provider);
        svcConfig.setAuthenticationEnabled(true);
        svcConfig.setAuthenticateOriginalAuthData(true);
        svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));

        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        // Proxy auth data is "pass.proxy"; the forwarded client auth data is "fail"
        channel.writeInbound(Commands.newConnect(method, "pass.proxy", 1, null,
                null, "client", "fail", method));

        Object reply = getResponse();
        assertTrue(reply instanceof CommandError);
        CommandError error = (CommandError) reply;
        assertEquals(error.getMessage(), "Unable to authenticate");
        assertEquals(serverCnx.getState(), State.Failed);
        assertFalse(serverCnx.isActive());
        channel.finish();
    }

    /**
     * Drives a multi-stage authentication exchange: the CONNECT and the first AUTH_RESPONSE each
     * yield a {@code CommandAuthChallenge} while the connection stays in the Connecting state, and a
     * final AUTH_RESPONSE carrying "fail.client" yields a "Do not pass" {@code CommandError} with the
     * connection Failed and inactive.
     */
    @Test(timeOut = 30000)
    public void testAuthResponseWithFailingAuthData() throws Exception {
        AuthenticationService authenticationService = mock(AuthenticationService.class);
        AuthenticationProvider authenticationProvider = new MockMultiStageAuthenticationProvider();
        String authMethodName = authenticationProvider.getAuthMethodName();

        when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
        when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
        svcConfig.setAuthenticationEnabled(true);

        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        // Trigger connect command to result in AuthChallenge
        ByteBuf clientCommand = Commands.newConnect(authMethodName, "challenge.client", "1");
        channel.writeInbound(clientCommand);

        Object challenge1 = getResponse();
        assertTrue(challenge1 instanceof CommandAuthChallenge);
        assertEquals(serverCnx.getState(), State.Connecting);

        // Trigger another AuthChallenge to verify that code path continues to challenge.
        // The charset is given explicitly so the encoded bytes do not depend on the platform default.
        ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName,
                AuthData.of("challenge.client".getBytes(java.nio.charset.StandardCharsets.UTF_8)), 1, "1");
        channel.writeInbound(authResponse1);

        Object challenge2 = getResponse();
        assertTrue(challenge2 instanceof CommandAuthChallenge);
        assertEquals(serverCnx.getState(), State.Connecting);

        // Trigger failure
        ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName,
                AuthData.of("fail.client".getBytes(java.nio.charset.StandardCharsets.UTF_8)), 1, "1");
        channel.writeInbound(authResponse2);

        Object response3 = getResponse();
        assertTrue(response3 instanceof CommandError);
        assertEquals(((CommandError) response3).getMessage(), "Do not pass");
        assertEquals(serverCnx.getState(), State.Failed);
        assertFalse(serverCnx.isActive());
        channel.finish();
    }

    /**
     * Pins the current behavior while https://github.com/apache/pulsar/issues/19291 is unresolved:
     * a proxy CONNECT whose forwarded original auth data is "challenge.client" is answered with an
     * "Unable to authenticate" {@code CommandError}. Update this test once that issue is addressed.
     */
    @Test(timeOut = 30000)
    public void testOriginalAuthDataTriggersAuthChallengeFailure() throws Exception {
        AuthenticationProvider provider = new MockMultiStageAuthenticationProvider();
        String method = provider.getAuthMethodName();
        AuthenticationService authService = mock(AuthenticationService.class);

        when(brokerService.getAuthenticationService()).thenReturn(authService);
        when(authService.getAuthenticationProvider(method)).thenReturn(provider);
        svcConfig.setAuthenticationEnabled(true);
        svcConfig.setAuthenticateOriginalAuthData(true);
        svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));

        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        // CONNECT as the proxy, forwarding "challenge.client" as the original auth data
        channel.writeInbound(Commands.newConnect(method, "pass.proxy", 1, "1",
                "localhost", "client", "challenge.client", method));

        Object reply = getResponse();
        assertTrue(reply instanceof CommandError);
        CommandError error = (CommandError) reply;

        assertEquals(error.getMessage(), "Unable to authenticate");
        assertEquals(serverCnx.getState(), State.Failed);
        assertFalse(serverCnx.isActive());
        channel.finish();
    }

    // Migrated from the ServerCnxAuthorizationTest class because the mocking there was too extensive.
    // It overlaps somewhat with other tests in this class; its main purpose is to check that the
    // AuthorizationService receives the expected role and AuthenticationDataSource when a proxy
    // forwards the client's auth data.
    @Test
    public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() throws Exception {
        AuthenticationProvider provider = new MockAuthenticationProvider();
        String method = provider.getAuthMethodName();
        AuthenticationService authService = mock(AuthenticationService.class);
        when(brokerService.getAuthenticationService()).thenReturn(authService);
        when(authService.getAuthenticationProvider(method)).thenReturn(provider);
        svcConfig.setAuthenticationEnabled(true);
        svcConfig.setAuthenticateOriginalAuthData(true);
        svcConfig.setProxyRoles(Collections.singleton("pass.pass"));

        svcConfig.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider");
        AuthorizationService authzSpy = spyWithClassAndConstructorArgs(
                AuthorizationService.class, svcConfig, pulsar.getPulsarResources());
        when(brokerService.getAuthorizationService()).thenReturn(authzSpy);
        svcConfig.setAuthorizationEnabled(true);

        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        // ---- CONNECT ----
        // The client role is chosen so that, with the MockAuthenticationProvider and
        // MockAuthorizationProvider, authentication passes and authorization fails.
        String proxyRole = "pass.pass";
        String clientRole = "pass.fail";
        // The originalPrincipal in the command is a failing value to demonstrate it is ignored.
        channel.writeInbound(Commands.newConnect(method, proxyRole, "test", "localhost",
                "fail.fail", clientRole, method));
        Object connectReply = getResponse();
        assertTrue(connectReply instanceof CommandConnected);
        assertEquals(serverCnx.getOriginalAuthData().getCommandData(), clientRole);
        assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), clientRole);
        assertEquals(serverCnx.getOriginalPrincipal(), clientRole);
        assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole);
        assertEquals(serverCnx.getAuthRole(), proxyRole);
        assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole);

        // ---- LOOKUP ----
        TopicName topic = TopicName.get("persistent://public/default/test-topic");
        channel.writeInbound(Commands.newLookup(topic.toString(), false, 1));
        Object lookupReply = getResponse();
        assertTrue(lookupReply instanceof CommandLookupTopicResponse);
        CommandLookupTopicResponse lookupResult = (CommandLookupTopicResponse) lookupReply;
        assertEquals(lookupResult.getError(), ServerError.AuthorizationError);
        assertEquals(lookupResult.getRequestId(), 1);
        verify(authzSpy, times(1))
                .allowTopicOperationAsync(topic, TopicOperation.LOOKUP, proxyRole, serverCnx.getAuthData());
        verify(authzSpy, times(1))
                .allowTopicOperationAsync(topic, TopicOperation.LOOKUP, clientRole, serverCnx.getOriginalAuthData());

        // ---- PRODUCER ----
        channel.writeInbound(
                Commands.newProducer(topic.toString(), 1, 2, "test-producer", new HashMap<>(), false));
        Object producerReply = getResponse();
        assertTrue(producerReply instanceof CommandError);
        CommandError producerError = (CommandError) producerReply;
        assertEquals(producerError.getError(), ServerError.AuthorizationError);
        assertEquals(producerError.getRequestId(), 2);
        verify(authzSpy, times(1))
                .allowTopicOperationAsync(topic, TopicOperation.PRODUCE, clientRole, serverCnx.getOriginalAuthData());
        // NOTE(review): this re-checks the proxy role's LOOKUP call rather than a PRODUCE call;
        // confirm whether that is intentional.
        verify(authzSpy, times(1))
                .allowTopicOperationAsync(topic, TopicOperation.LOOKUP, proxyRole, serverCnx.getAuthData());

        // ---- CONSUMER ----
        String subscriptionName = "test-subscribe";
        channel.writeInbound(Commands.newSubscribe(topic.toString(), subscriptionName, 1, 3,
                CommandSubscribe.SubType.Shared, 0, "consumer", 0));
        Object subscribeReply = getResponse();
        assertTrue(subscribeReply instanceof CommandError);
        CommandError subscribeError = (CommandError) subscribeReply;
        assertEquals(subscribeError.getError(), ServerError.AuthorizationError);
        assertEquals(subscribeError.getRequestId(), 3);
        // Client role: the auth data must wrap the client's command data and the subscription
        verify(authzSpy, times(1)).allowTopicOperationAsync(
                eq(topic), eq(TopicOperation.CONSUME),
                eq(clientRole), argThat(data -> {
                    assertTrue(data instanceof AuthenticationDataSubscription);
                    assertEquals(data.getCommandData(), clientRole);
                    assertEquals(data.getSubscription(), subscriptionName);
                    return true;
                }));
        // Proxy role: the auth data must wrap the proxy's command data and the subscription
        verify(authzSpy, times(1)).allowTopicOperationAsync(
                eq(topic), eq(TopicOperation.CONSUME),
                eq(proxyRole), argThat(data -> {
                    assertTrue(data instanceof AuthenticationDataSubscription);
                    assertEquals(data.getCommandData(), proxyRole);
                    assertEquals(data.getSubscription(), subscriptionName);
                    return true;
                }));
    }

    // This test used to be in the ServerCnxAuthorizationTest class, but it was migrated here because the mocking
    // in that class was too extensive. There is some overlap with this test and other tests in this class. The primary
    // role of this test is verifying that the correct role and AuthenticationDataSource are passed to the
    // AuthorizationService when the proxy forwards only the original principal (no original auth data).
    // Annotated explicitly with @Test, like the sibling tests, so it does not depend on any class-level annotation.
    @Test
    public void testVerifyOriginalPrincipalWithoutAuthDataForwardedFromProxy() throws Exception {
        AuthenticationService authenticationService = mock(AuthenticationService.class);
        AuthenticationProvider authenticationProvider = new MockAuthenticationProvider();
        String authMethodName = authenticationProvider.getAuthMethodName();
        when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
        when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
        svcConfig.setAuthenticationEnabled(true);
        // Original auth data is not authenticated in this scenario; only the principal is forwarded.
        svcConfig.setAuthenticateOriginalAuthData(false);
        svcConfig.setProxyRoles(Collections.singleton("pass.pass"));

        svcConfig.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider");
        AuthorizationService authorizationService =
                spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig,
                        pulsar.getPulsarResources());
        when(brokerService.getAuthorizationService()).thenReturn(authorizationService);
        svcConfig.setAuthorizationEnabled(true);

        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        // Connect
        // This client role integrates with the MockAuthenticationProvider and MockAuthorizationProvider
        // to pass authentication and fail authorization
        String proxyRole = "pass.pass";
        String clientRole = "pass.fail";
        ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, "test", "localhost",
                clientRole, null, null);
        channel.writeInbound(connect);
        Object connectResponse = getResponse();
        assertTrue(connectResponse instanceof CommandConnected);
        // No original auth data was sent, so only the original principal is recorded.
        assertNull(serverCnx.getOriginalAuthData());
        assertNull(serverCnx.getOriginalAuthState());
        assertEquals(serverCnx.getOriginalPrincipal(), clientRole);
        assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole);
        assertEquals(serverCnx.getAuthRole(), proxyRole);
        assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole);

        // Lookup
        TopicName topicName = TopicName.get("persistent://public/default/test-topic");
        ByteBuf lookup = Commands.newLookup(topicName.toString(), false, 1);
        channel.writeInbound(lookup);
        Object lookupResponse = getResponse();
        assertTrue(lookupResponse instanceof CommandLookupTopicResponse);
        assertEquals(((CommandLookupTopicResponse) lookupResponse).getError(), ServerError.AuthorizationError);
        assertEquals(((CommandLookupTopicResponse) lookupResponse).getRequestId(), 1);
        verify(authorizationService, times(1))
                .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, proxyRole, serverCnx.getAuthData());
        // This test is an example of https://github.com/apache/pulsar/issues/19332. Essentially, we're passing
        // the proxy's auth data because it is all we have. This test should be updated when we resolve that issue.
        verify(authorizationService, times(1))
                .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, clientRole, serverCnx.getAuthData());

        // producer
        ByteBuf producer = Commands.newProducer(topicName.toString(), 1, 2, "test-producer", new HashMap<>(), false);
        channel.writeInbound(producer);
        Object producerResponse = getResponse();
        assertTrue(producerResponse instanceof CommandError);
        assertEquals(((CommandError) producerResponse).getError(), ServerError.AuthorizationError);
        assertEquals(((CommandError) producerResponse).getRequestId(), 2);
        // See https://github.com/apache/pulsar/issues/19332 for justification of this assertion.
        verify(authorizationService, times(1))
                .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, clientRole, serverCnx.getAuthData());
        verify(authorizationService, times(1))
                .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, proxyRole, serverCnx.getAuthData());

        // consumer
        String subscriptionName = "test-subscribe";
        ByteBuf subscribe = Commands.newSubscribe(topicName.toString(), subscriptionName, 1, 3,
                CommandSubscribe.SubType.Shared, 0, "consumer", 0);
        channel.writeInbound(subscribe);
        Object subscribeResponse = getResponse();
        assertTrue(subscribeResponse instanceof CommandError);
        assertEquals(((CommandError) subscribeResponse).getError(), ServerError.AuthorizationError);
        assertEquals(((CommandError) subscribeResponse).getRequestId(), 3);
        verify(authorizationService, times(1)).allowTopicOperationAsync(
                eq(topicName), eq(TopicOperation.CONSUME),
                eq(clientRole), argThat(arg -> {
                    assertTrue(arg instanceof AuthenticationDataSubscription);
                    // We assert that the role is clientRole and commandData is proxyRole due to
                    // https://github.com/apache/pulsar/issues/19332.
                    assertEquals(arg.getCommandData(), proxyRole);
                    assertEquals(arg.getSubscription(), subscriptionName);
                    return true;
                }));
        verify(authorizationService, times(1)).allowTopicOperationAsync(
                eq(topicName), eq(TopicOperation.CONSUME),
                eq(proxyRole), argThat(arg -> {
                    assertTrue(arg instanceof AuthenticationDataSubscription);
                    assertEquals(arg.getCommandData(), proxyRole);
                    assertEquals(arg.getSubscription(), subscriptionName);
                    return true;
                }));
    }

    // Migrated from the ServerCnxAuthorizationTest class because the mocking there was too extensive.
    // It overlaps somewhat with other tests in this class; its main purpose is to check that the
    // AuthorizationService receives the expected role and AuthenticationDataSource when a client
    // connects without an original principal or original auth data.
    @Test
    public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() throws Exception {
        AuthenticationProvider provider = new MockAuthenticationProvider();
        String method = provider.getAuthMethodName();
        AuthenticationService authService = mock(AuthenticationService.class);
        when(brokerService.getAuthenticationService()).thenReturn(authService);
        when(authService.getAuthenticationProvider(method)).thenReturn(provider);
        svcConfig.setAuthenticationEnabled(true);

        svcConfig.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider");
        AuthorizationService authzSpy = spyWithClassAndConstructorArgs(
                AuthorizationService.class, svcConfig, pulsar.getPulsarResources());
        when(brokerService.getAuthorizationService()).thenReturn(authzSpy);
        svcConfig.setAuthorizationEnabled(true);

        resetChannel();
        assertTrue(channel.isActive());
        assertEquals(serverCnx.getState(), State.Start);

        // ---- CONNECT ----
        // The client role is chosen so that, with the MockAuthenticationProvider and
        // MockAuthorizationProvider, authentication passes and authorization fails.
        String clientRole = "pass.fail";
        channel.writeInbound(Commands.newConnect(method, clientRole, "test"));

        Object connectReply = getResponse();
        assertTrue(connectReply instanceof CommandConnected);
        assertNull(serverCnx.getOriginalAuthData());
        assertNull(serverCnx.getOriginalAuthState());
        assertNull(serverCnx.getOriginalPrincipal());
        assertEquals(serverCnx.getAuthData().getCommandData(), clientRole);
        assertEquals(serverCnx.getAuthRole(), clientRole);
        assertEquals(serverCnx.getAuthState().getAuthRole(), clientRole);

        // ---- LOOKUP ----
        TopicName topic = TopicName.get("persistent://public/default/test-topic");
        channel.writeInbound(Commands.newLookup(topic.toString(), false, 1));
        Object lookupReply = getResponse();
        assertTrue(lookupReply instanceof CommandLookupTopicResponse);
        CommandLookupTopicResponse lookupResult = (CommandLookupTopicResponse) lookupReply;
        assertEquals(lookupResult.getError(), ServerError.AuthorizationError);
        assertEquals(lookupResult.getRequestId(), 1);
        verify(authzSpy, times(1))
                .allowTopicOperationAsync(topic, TopicOperation.LOOKUP, clientRole, serverCnx.getAuthData());

        // ---- PRODUCER ----
        channel.writeInbound(
                Commands.newProducer(topic.toString(), 1, 2, "test-producer", new HashMap<>(), false));
        Object producerReply = getResponse();
        assertTrue(producerReply instanceof CommandError);
        CommandError producerError = (CommandError) producerReply;
        assertEquals(producerError.getError(), ServerError.AuthorizationError);
        assertEquals(producerError.getRequestId(), 2);
        verify(authzSpy, times(1))
                .allowTopicOperationAsync(topic, TopicOperation.PRODUCE, clientRole, serverCnx.getAuthData());

        // ---- CONSUMER ----
        String subscriptionName = "test-subscribe";
        channel.writeInbound(Commands.newSubscribe(topic.toString(), subscriptionName, 1, 3,
                CommandSubscribe.SubType.Shared, 0, "consumer", 0));
        Object subscribeReply = getResponse();
        assertTrue(subscribeReply instanceof CommandError);
        CommandError subscribeError = (CommandError) subscribeReply;
        assertEquals(subscribeError.getError(), ServerError.AuthorizationError);
        assertEquals(subscribeError.getRequestId(), 3);
        // The auth data handed to the service must wrap the client's command data and the subscription
        verify(authzSpy, times(1)).allowTopicOperationAsync(
                eq(topic), eq(TopicOperation.CONSUME),
                eq(clientRole), argThat(data -> {
                    assertTrue(data instanceof AuthenticationDataSubscription);
                    assertEquals(data.getCommandData(), clientRole);
                    assertEquals(data.getSubscription(), subscriptionName);
                    return true;
                }));
    }

    /**
     * Creates a producer on the success topic (expecting {@code CommandProducerSuccess} and one
     * registered producer), then on the fail topic (expecting {@code CommandError} and no topic
     * reference), and finally checks that finishing the channel leaves the topic with no producers.
     */
    @Test(timeOut = 30000)
    public void testProducerCommand() throws Exception {
        resetChannel();
        setChannelConnected();

        // PRODUCER success case
        channel.writeInbound(Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
                "prod-name", Collections.emptyMap(), false));
        Object successReply = getResponse();
        assertTrue(successReply instanceof CommandProducerSuccess);

        PersistentTopic topic = (PersistentTopic) brokerService.getTopicReference(successTopicName).get();

        assertNotNull(topic);
        assertEquals(topic.getProducers().size(), 1);

        // PRODUCER error case
        channel.writeInbound(Commands.newProducer(failTopicName, 2, 2,
                "prod-name-2", Collections.emptyMap(), false));

        Object failureReply = getResponse();
        assertTrue(failureReply instanceof CommandError);
        assertFalse(pulsar.getBrokerService().getTopicReference(failTopicName).isPresent());

        channel.finish();
        assertEquals(topic.getProducers().size(), 0);
    }

    /**
     * Sends the same PRODUCER command twice while topic creation is held on a future that this test
     * never completes, and expects a {@code ServiceNotReady} error in response.
     */
    @Test(timeOut = 5000)
    public void testDuplicateConcurrentProducerCommand() throws Exception {
        resetChannel();
        setChannelConnected();

        // Topic creation hangs on this future for the duration of the test
        CompletableFuture<Topic> pendingTopic = new CompletableFuture<>();
        doReturn(pendingTopic).when(brokerService).getOrCreateTopic(any(String.class));

        // First producer creation request
        channel.writeInbound(Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
                "prod-name", Collections.emptyMap(), false));

        // Identical second request while the first is still pending
        channel.writeInbound(Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
                "prod-name", Collections.emptyMap(), false));

        Object reply = getResponse();
        assertTrue(reply instanceof CommandError);
        CommandError notReady = (CommandError) reply;
        assertEquals(notReady.getError(), ServerError.ServiceNotReady);
        channel.finish();
    }

    /**
     * With the namespace service stubbed to report every service unit as inactive, a PRODUCER
     * command must be answered with a {@code ServiceNotReady} error and no topic reference may exist.
     */
    @Test(timeOut = 30000)
    public void testProducerOnNotOwnedTopic() throws Exception {
        resetChannel();
        setChannelConnected();

        // Simulate a broker that owns no topics
        doReturn(CompletableFuture.completedFuture(false)).when(namespaceService)
                .isServiceUnitActiveAsync(any(TopicName.class));

        // PRODUCER failure case
        channel.writeInbound(Commands.newProducer(nonOwnedTopicName, 1 /* producer id */, 1 /* request id */,
                "prod-name", Collections.emptyMap(), false));

        Object reply = getResponse();
        assertEquals(reply.getClass(), CommandError.class);

        CommandError notReady = (CommandError) reply;
        assertEquals(notReady.getError(), ServerError.ServiceNotReady);

        assertFalse(pulsar.getBrokerService().getTopicReference(nonOwnedTopicName).isPresent());

        channel.finish();
    }

    /**
     * With a mocked AuthorizationService that allows every topic operation, a PRODUCER command on
     * the success topic must succeed and register exactly one producer, which is removed again once
     * the channel is finished.
     */
    @Test(timeOut = 30000)
    public void testProducerCommandWithAuthorizationPositive() throws Exception {
        AuthorizationService permissiveAuthz = mock(AuthorizationService.class);
        doReturn(CompletableFuture.completedFuture(true)).when(permissiveAuthz)
                .allowTopicOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
        doReturn(permissiveAuthz).when(brokerService).getAuthorizationService();
        // NOTE(review): only isAuthenticationEnabled is stubbed here, whereas
        // testProducerCommandWithAuthorizationNegative also stubs isAuthorizationEnabled;
        // confirm this difference is intended.
        doReturn(true).when(brokerService).isAuthenticationEnabled();
        resetChannel();
        setChannelConnected();

        // PRODUCER success case
        channel.writeInbound(Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
                "prod-name", Collections.emptyMap(), false));
        Object reply = getResponse();
        assertEquals(reply.getClass(), CommandProducerSuccess.class);

        PersistentTopic topic = (PersistentTopic) brokerService.getTopicReference(successTopicName).get();

        assertNotNull(topic);
        assertEquals(topic.getProducers().size(), 1);

        channel.finish();
        assertEquals(topic.getProducers().size(), 0);
    }

    /**
     * With authorization enabled and the provider's super-user check stubbed to {@code false},
     * both producer and consumer creation on the non-existent topic must yield a {@code CommandError}.
     */
    @Test(timeOut = 30000)
    public void testNonExistentTopic() throws Exception {
        AuthorizationService authzSpy = spyWithClassAndConstructorArgs(
                AuthorizationService.class, svcConfig, pulsar.getPulsarResources());
        doReturn(authzSpy).when(brokerService).getAuthorizationService();
        doReturn(true).when(brokerService).isAuthorizationEnabled();
        svcConfig.setAuthorizationEnabled(true);

        // Replace the service's private "provider" field with a spy we can stub
        Field providerSlot = AuthorizationService.class.getDeclaredField("provider");
        providerSlot.setAccessible(true);
        PulsarAuthorizationProvider providerSpy =
                spy(new PulsarAuthorizationProvider(svcConfig, pulsar.getPulsarResources()));
        providerSlot.set(authzSpy, providerSpy);
        doReturn(CompletableFuture.completedFuture(false)).when(providerSpy)
                .isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());

        // Producer creation
        resetChannel();
        setChannelConnected();
        channel.writeInbound(Commands.newProducer(nonExistentTopicName, 1 /* producer id */, 1 /* request id */,
                "prod-name", Collections.emptyMap(), false));
        Object producerReply = getResponse();
        assertTrue(producerReply instanceof CommandError);
        channel.finish();

        // Consumer creation
        resetChannel();
        setChannelConnected();
        channel.writeInbound(Commands.newSubscribe(nonExistentTopicName,
                successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0));
        Object subscribeReply = getResponse();
        assertTrue(subscribeReply instanceof CommandError);
        channel.finish();
    }

    /**
     * With authorization enabled, super-user and tenant-admin checks stubbed to {@code false} and
     * the permission check stubbed to {@code true}, producer creation succeeds on the success topic
     * but is answered with a {@code CommandError} on the topic with a non-local cluster.
     */
    @Test(timeOut = 30000)
    public void testClusterAccess() throws Exception {
        svcConfig.setAuthorizationEnabled(true);
        AuthorizationService authzSpy = spyWithClassAndConstructorArgs(
                AuthorizationService.class, svcConfig, pulsar.getPulsarResources());

        // Replace the service's private "provider" field with a spy we can stub
        Field providerSlot = AuthorizationService.class.getDeclaredField("provider");
        providerSlot.setAccessible(true);
        PulsarAuthorizationProvider providerSpy =
                spy(new PulsarAuthorizationProvider(svcConfig, pulsar.getPulsarResources()));
        providerSlot.set(authzSpy, providerSpy);

        doReturn(authzSpy).when(brokerService).getAuthorizationService();
        doReturn(true).when(brokerService).isAuthorizationEnabled();
        doReturn(CompletableFuture.completedFuture(false)).when(providerSpy)
                .isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());
        doReturn(CompletableFuture.completedFuture(false)).when(providerSpy)
                .validateTenantAdminAccess(Mockito.anyString(), Mockito.any(), Mockito.any());
        doReturn(CompletableFuture.completedFuture(true)).when(providerSpy)
                .checkPermission(any(TopicName.class), Mockito.anyString(), any(AuthAction.class));

        // Producer on the success topic
        resetChannel();
        setChannelConnected();
        channel.writeInbound(Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
                "prod-name", Collections.emptyMap(), false));
        Object localReply = getResponse();
        assertTrue(localReply instanceof CommandProducerSuccess);

        // Producer on the topic with a non-local cluster
        resetChannel();
        setChannelConnected();
        channel.writeInbound(Commands.newProducer(topicWithNonLocalCluster, 1 /* producer id */,
                1 /* request id */, "prod-name", Collections.emptyMap(), false));
        Object remoteReply = getResponse();
        assertTrue(remoteReply instanceof CommandError);
        channel.finish();
    }

    /**
     * With the provider's super-user check stubbed to {@code true}, both producer and consumer
     * creation on the non-existent topic must succeed, leaving a topic reference with the producer
     * and, afterwards, a connected consumer on the subscription.
     */
    @Test(timeOut = 30000)
    public void testNonExistentTopicSuperUserAccess() throws Exception {
        AuthorizationService authzSpy = spyWithClassAndConstructorArgs(
                AuthorizationService.class, svcConfig, pulsar.getPulsarResources());
        doReturn(authzSpy).when(brokerService).getAuthorizationService();
        doReturn(true).when(brokerService).isAuthorizationEnabled();

        // Replace the service's private "provider" field with a spy we can stub
        Field providerSlot = AuthorizationService.class.getDeclaredField("provider");
        providerSlot.setAccessible(true);
        PulsarAuthorizationProvider providerSpy =
                spy(new PulsarAuthorizationProvider(svcConfig, pulsar.getPulsarResources()));
        providerSlot.set(authzSpy, providerSpy);
        doReturn(CompletableFuture.completedFuture(true)).when(providerSpy)
                .isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());

        // Producer creation
        resetChannel();
        setChannelConnected();
        channel.writeInbound(Commands.newProducer(nonExistentTopicName, 1 /* producer id */, 1 /* request id */,
                "prod-name", Collections.emptyMap(), false));
        Object producerReply = getResponse();
        assertTrue(producerReply instanceof CommandProducerSuccess);

        PersistentTopic topic = (PersistentTopic) brokerService.getTopicReference(nonExistentTopicName).get();
        assertNotNull(topic);
        assertEquals(topic.getProducers().size(), 1);
        channel.finish();

        // Consumer creation
        resetChannel();
        setChannelConnected();
        channel.writeInbound(Commands.newSubscribe(nonExistentTopicName,
                successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0 /* avoid resetting cursor */));
        topic = (PersistentTopic) brokerService.getTopicReference(nonExistentTopicName).get();
        assertNotNull(topic);
        assertTrue(topic.getSubscriptions().containsKey(successSubName));
        assertTrue(topic.getSubscription(successSubName).getDispatcher().isConsumerConnected());
        Object subscribeReply = getResponse();
        assertTrue(subscribeReply instanceof CommandSuccess);
        channel.finish();
    }

    /**
     * A PRODUCER command must be answered with an error when the (mocked) authorization service denies every
     * topic operation.
     *
     * @throws Exception if the channel setup or the broker interaction fails
     */
    @Test(timeOut = 30000)
    public void testProducerCommandWithAuthorizationNegative() throws Exception {
        // Authorization mock that rejects whatever topic operation is asked for
        AuthorizationService denyingAuthz = mock(AuthorizationService.class);
        doReturn(CompletableFuture.completedFuture(false))
                .when(denyingAuthz)
                .allowTopicOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());

        doReturn(denyingAuthz).when(brokerService).getAuthorizationService();
        doReturn(true).when(brokerService).isAuthenticationEnabled();
        doReturn(true).when(brokerService).isAuthorizationEnabled();
        doReturn("prod1").when(brokerService).generateUniqueProducerName();

        resetChannel();
        setChannelConnected();

        // No producer name is supplied with the request
        ByteBuf newProducerCmd = Commands.newProducer(successTopicName, 1 /* producer id */,
                1 /* request id */, null, Collections.emptyMap(), false);
        channel.writeInbound(newProducerCmd);

        Object reply = getResponse();
        assertTrue(reply instanceof CommandError);

        channel.finish();
    }

    /**
     * Happy path for SEND: after a producer has been created on the connection, a message sent through
     * {@link #sendMessage()} is answered with a send receipt.
     *
     * @throws Exception if the channel setup or the broker interaction fails
     */
    @Test(timeOut = 30000)
    public void testSendCommand() throws Exception {
        resetChannel();
        setChannelConnected();

        // Create the producer the message will be sent with
        ByteBuf newProducerCmd = Commands.newProducer(successTopicName, 1 /* producer id */,
                1 /* request id */, "prod-name", Collections.emptyMap(), false);
        channel.writeInbound(newProducerCmd);
        Object producerReply = getResponse();
        assertTrue(producerReply instanceof CommandProducerSuccess);

        // SEND is expected to succeed
        sendMessage();
        Object sendReply = getResponse();
        assertTrue(sendReply instanceof CommandSendReceipt);

        channel.finish();
    }

    /**
     * Sending a message on a connection that never created a producer must cause the channel to become
     * inactive.
     *
     * @throws Exception if the channel setup fails
     */
    @Test(timeOut = 30000)
    public void testSendCommandBeforeCreatingProducer() throws Exception {
        resetChannel();
        setChannelConnected();

        // SEND without any prior PRODUCER command
        sendMessage();

        // The channel is expected to go inactive within ten seconds
        Awaitility.await()
                .atMost(10, TimeUnit.SECONDS)
                .until(() -> !channel.isActive());

        channel.finish();
    }

    /**
     * After the broker disconnects a producer, a first message sent by the client is ignored (no reply, the
     * channel stays active); once the scheduled tasks have run, a further message makes the channel go
     * inactive.
     *
     * @throws Exception if the channel setup or the broker interaction fails
     */
    @Test(timeOut = 30000)
    public void testSendCommandAfterBrokerClosedProducer() throws Exception {
        resetChannel();
        setChannelConnected();
        setConnectionVersion(ProtocolVersion.v5.getValue());
        serverCnx.cancelKeepAliveTask();

        String name = "my-producer";

        ByteBuf newProducerCmd = Commands.newProducer(successTopicName, 1 /* producer id */,
                1 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(newProducerCmd);
        assertTrue(getResponse() instanceof CommandProducerSuccess);

        // Disconnect the producer from the broker side, mimicking what a topic unload would do
        Producer brokerSideProducer = serverCnx.getProducers().get(1).get();
        assertNotNull(brokerSideProducer);
        brokerSideProducer.disconnect();
        channel.runPendingTasks();
        assertTrue(getResponse() instanceof CommandCloseProducer);

        // First message after the close: no reply is expected
        sendMessage();

        // Give the scheduled clean up task time to become due, then run it
        Thread.sleep(1000);
        channel.runScheduledPendingTasks();
        assertTrue(channel.outboundMessages().isEmpty());
        assertTrue(channel.isActive());

        // Second message: this time the connection is expected to be closed
        sendMessage();

        Awaitility.await()
                .atMost(10, TimeUnit.SECONDS)
                .until(() -> !channel.isActive());
        channel.finish();
    }

    /**
     * After the broker disconnects a producer, a message sent by the client gets no reply; when the client
     * then re-creates the producer with the same id, the next message is answered with a send receipt.
     *
     * @throws Exception if the channel setup or the broker interaction fails
     */
    @Test(timeOut = 30000)
    public void testBrokerClosedProducerClientRecreatesProducerThenSendCommand() throws Exception {
        resetChannel();
        setChannelConnected();
        setConnectionVersion(ProtocolVersion.v5.getValue());
        serverCnx.cancelKeepAliveTask();

        String name = "my-producer";

        ByteBuf firstCreate = Commands.newProducer(successTopicName, 1 /* producer id */,
                1 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(firstCreate);
        assertTrue(getResponse() instanceof CommandProducerSuccess);

        // Disconnect the producer from the broker side, mimicking what a topic unload would do
        Producer brokerSideProducer = serverCnx.getProducers().get(1).get();
        assertNotNull(brokerSideProducer);
        brokerSideProducer.disconnect();
        channel.runPendingTasks();
        assertTrue(getResponse() instanceof CommandCloseProducer);

        // A message sent now must not produce any reply
        sendMessage();
        assertTrue(channel.outboundMessages().isEmpty());

        // The client re-creates the producer, reusing the same producer id and request id
        ByteBuf secondCreate = Commands.newProducer(successTopicName, 1 /* producer id */,
                1 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(secondCreate);
        assertTrue(getResponse() instanceof CommandProducerSuccess);

        // Sending works again
        sendMessage();
        assertTrue(getResponse() instanceof CommandSendReceipt);

        channel.finish();
    }

    /**
     * When the client itself closes a producer and afterwards still sends a message, the channel is expected
     * to become inactive.
     *
     * @throws Exception if the channel setup or the broker interaction fails
     */
    @Test(timeOut = 30000)
    public void testClientClosedProducerThenSendsMessageAndGetsClosed() throws Exception {
        resetChannel();
        setChannelConnected();
        setConnectionVersion(ProtocolVersion.v5.getValue());
        serverCnx.cancelKeepAliveTask();

        String name = "my-producer";

        ByteBuf newProducerCmd = Commands.newProducer(successTopicName, 1 /* producer id */,
                1 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(newProducerCmd);
        assertTrue(getResponse() instanceof CommandProducerSuccess);

        // Client-initiated close of producer 1, acknowledged by the broker
        ByteBuf closeProducerCmd = Commands.newCloseProducer(1, 2);
        channel.writeInbound(closeProducerCmd);
        assertTrue(getResponse() instanceof CommandSuccess);

        // A message sent after the close gets the connection dropped
        sendMessage();
        Awaitility.await()
                .atMost(10, TimeUnit.SECONDS)
                .until(() -> !channel.isActive());

        channel.finish();
    }

    /**
     * Writes one SEND command into the test channel.
     *
     * <p>The command is built with {@code Commands.newSend(1, 0, 1, ...)}, no checksum, producer name
     * {@code "prod-name"}, sequence id 0 and a freshly allocated 1024-byte-capacity buffer as payload. The
     * channel receives a copy of the coalesced command; the coalesced buffer itself is released afterwards.
     */
    private void sendMessage() {
        MessageMetadata metadata = new MessageMetadata();
        metadata.setPublishTime(System.currentTimeMillis());
        metadata.setProducerName("prod-name");
        metadata.setSequenceId(0);

        ByteBuf payload = Unpooled.buffer(1024);

        ByteBuf sendCommand = ByteBufPair.coalesce(
                Commands.newSend(1, 0, 1, ChecksumType.None, metadata, payload));
        ByteBuf inboundCopy = Unpooled.copiedBuffer(sendCommand);
        channel.writeInbound(inboundCopy);
        sendCommand.release();
    }

    /**
     * Two producers with different ids but the same name on the same topic: the first is accepted, the second
     * is answered with an error.
     *
     * @throws Exception if the channel setup or the broker interaction fails
     */
    @Test(timeOut = 30000)
    public void testUseSameProducerName() throws Exception {
        resetChannel();
        setChannelConnected();

        String sharedName = "my-producer";

        // Producer id 1 takes the name
        ByteBuf firstCreate = Commands.newProducer(successTopicName, 1 /* producer id */,
                1 /* request id */, sharedName, Collections.emptyMap(), false);
        channel.writeInbound(firstCreate);
        Object firstReply = getResponse();
        assertTrue(firstReply instanceof CommandProducerSuccess);

        // Producer id 2 asks for the very same name
        ByteBuf secondCreate = Commands.newProducer(successTopicName, 2 /* producer id */,
                2 /* request id */, sharedName, Collections.emptyMap(), false);
        channel.writeInbound(secondCreate);
        Object secondReply = getResponse();
        assertTrue(secondReply instanceof CommandError);

        channel.finish();
    }

    /**
     * Creating a producer twice with the same producer id and name (but different request ids) must succeed
     * both times, each reply carrying the request id of its own command.
     *
     * @throws Exception if the channel setup or the broker interaction fails
     */
    @Test(timeOut = 30000)
    public void testRecreateSameProducer() throws Exception {
        resetChannel();
        setChannelConnected();

        String name = "my-producer";

        // First creation, request id 1
        ByteBuf firstCreate = Commands.newProducer(successTopicName, 1 /* producer id */,
                1 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(firstCreate);

        Object firstReply = getResponse();
        assertEquals(firstReply.getClass(), CommandProducerSuccess.class);
        assertEquals(((CommandProducerSuccess) firstReply).getRequestId(), 1);

        // Second creation for the same producer id, request id 2
        ByteBuf secondCreate = Commands.newProducer(successTopicName, 1 /* producer id */,
                2 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(secondCreate);

        Object secondReply = getResponse();
        assertEquals(secondReply.getClass(), CommandProducerSuccess.class);
        assertEquals(((CommandProducerSuccess) secondReply).getRequestId(), 2);

        // Nothing else may be queued on the channel, and the connection must still be open
        assertTrue(channel.outboundMessages().isEmpty());
        assertTrue(channel.isActive());

        channel.finish();
    }

    /**
     * Issuing SUBSCRIBE twice for the same consumer id and subscription (with different request ids) must
     * succeed both times, each reply carrying the request id of its own command.
     *
     * @throws Exception if the channel setup or the broker interaction fails
     */
    @Test(timeOut = 30000)
    public void testSubscribeMultipleTimes() throws Exception {
        resetChannel();
        setChannelConnected();

        // First subscribe, request id 1
        ByteBuf firstSubscribe = Commands.newSubscribe(successTopicName, successSubName,
                1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0 /* avoid reseting cursor */);
        channel.writeInbound(firstSubscribe);

        Object firstReply = getResponse();
        assertEquals(firstReply.getClass(), CommandSuccess.class);
        assertEquals(((CommandSuccess) firstReply).getRequestId(), 1);

        // Second subscribe for the same consumer id, request id 2
        ByteBuf secondSubscribe = Commands.newSubscribe(successTopicName, successSubName,
                1 /* consumer id */, 2 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0 /* avoid reseting cursor */);
        channel.writeInbound(secondSubscribe);

        Object secondReply = getResponse();
        assertEquals(secondReply.getClass(), CommandSuccess.class);
        assertEquals(((CommandSuccess) secondReply).getRequestId(), 2);

        channel.finish();
    }

    /**
     * Two exclusive SUBSCRIBE commands for the same subscription are written while topic lookup is stubbed
     * with a future that this test never completes; a {@code ConsumerBusy} error is expected on the channel.
     *
     * @throws Exception if the channel setup or the broker interaction fails
     */
    @Test(timeOut = 30000)
    public void testDuplicateConcurrentSubscribeCommand() throws Exception {
        resetChannel();
        setChannelConnected();

        // Topic lookup hands out a future that stays incomplete for the whole test
        CompletableFuture<Topic> pendingTopic = new CompletableFuture<>();
        doReturn(pendingTopic).when(brokerService).getOrCreateTopic(any(String.class));

        // First subscribe (consumer id 1)
        ByteBuf subscribeCmd = Commands.newSubscribe(successTopicName, successSubName,
                1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0 /* avoid reseting cursor */);
        channel.writeInbound(subscribeCmd);

        // Second subscribe on the same subscription (consumer id 2)
        subscribeCmd = Commands.newSubscribe(successTopicName, successSubName,
                2 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0 /* avoid reseting cursor */);
        channel.writeInbound(subscribeCmd);

        Awaitility.await().untilAsserted(() -> {
            Object reply = getResponse();
            assertTrue(reply instanceof CommandError, "Response is not CommandError but " + reply);
            assertEquals(((CommandError) reply).getError(), ServerError.ConsumerBusy);
        });

        channel.finish();
    }

    /**
     * Replays what a client-side create-producer timeout looks like on the wire: create producer, close
     * producer (the timeout may fire before the broker created the producer), then create producer again (the
     * client's reconnection logic). The commands have to be handled in order so that the last create can
     * succeed.
     *
     * <p>The managed-ledger open for the success topic is held back until all three commands have been
     * written; afterwards the close (request id 2) and the second create (request id 3) are expected to be
     * acknowledged, in that order.
     *
     * @throws Exception if the channel setup or the broker interaction fails
     */
    @Test(timeOut = 30000)
    public void testCreateProducerTimeout() throws Exception {
        resetChannel();
        setChannelConnected();

        // Capture the ledger-open completion instead of running it, so the test decides when the topic opens
        CompletableFuture<Runnable> pendingOpen = new CompletableFuture<>();
        doAnswer(invocation -> {
            OpenLedgerCallback callback = (OpenLedgerCallback) invocation.getArguments()[2];
            pendingOpen.complete(() -> callback.openLedgerComplete(ledgerMock, null));
            return null;
        }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
                any(OpenLedgerCallback.class), any(Supplier.class), any());

        String name = "my-producer";

        // 1. create producer
        ByteBuf firstCreate = Commands.newProducer(successTopicName, 1 /* producer id */,
                1 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(firstCreate);

        // 2. close producer
        ByteBuf closeCmd = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */);
        channel.writeInbound(closeCmd);

        // 3. create producer again
        ByteBuf secondCreate = Commands.newProducer(successTopicName, 1 /* producer id */,
                3 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(secondCreate);

        // Let the topic open now; this is what allows the second create to succeed
        pendingOpen.get().run();

        // The close is acknowledged first
        Object closeReply = getResponse();
        assertEquals(closeReply.getClass(), CommandSuccess.class);
        assertEquals(((CommandSuccess) closeReply).getRequestId(), 2);

        // Then the second create is acknowledged
        Object createReply = getResponse();
        assertEquals(createReply.getClass(), CommandProducerSuccess.class);
        assertEquals(((CommandProducerSuccess) createReply).getRequestId(), 3);

        assertTrue(channel.isActive());

        channel.finish();
    }

    /**
     * Same create / close / create sequence as a client-side create-producer timeout (create producer, close
     * producer, create producer again through reconnection), followed by a further create that reuses the
     * producer name under a different producer id. Only one channel is available here, so the different
     * producer id stands in for "another producer". That last create is expected to fail.
     *
     * <p>The managed-ledger open for the success topic is held back until the first three commands have been
     * written.
     *
     * @throws Exception if the channel setup or the broker interaction fails
     */
    @Test(timeOut = 30000)
    public void testCreateProducerTimeoutThenCreateSameNamedProducerShouldFail() throws Exception {
        resetChannel();
        setChannelConnected();

        // Capture the ledger-open completion instead of running it, so the test decides when the topic opens
        CompletableFuture<Runnable> pendingOpen = new CompletableFuture<>();
        doAnswer(invocation -> {
            OpenLedgerCallback callback = (OpenLedgerCallback) invocation.getArguments()[2];
            pendingOpen.complete(() -> callback.openLedgerComplete(ledgerMock, null));
            return null;
        }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
                any(OpenLedgerCallback.class), any(Supplier.class), any());

        String name = "my-producer";

        // 1. create producer
        ByteBuf firstCreate = Commands.newProducer(successTopicName, 1 /* producer id */,
                1 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(firstCreate);

        // 2. close producer
        ByteBuf closeCmd = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */);
        channel.writeInbound(closeCmd);

        // 3. create producer again
        ByteBuf secondCreate = Commands.newProducer(successTopicName, 1 /* producer id */,
                3 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(secondCreate);

        // Let the topic open now; this is what allows the second create to succeed
        pendingOpen.get().run();

        // The close is acknowledged first
        Object reply = getResponse();
        assertEquals(reply.getClass(), CommandSuccess.class);
        assertEquals(((CommandSuccess) reply).getRequestId(), 2);

        // Then the second create is acknowledged
        reply = getResponse();
        assertEquals(reply.getClass(), CommandProducerSuccess.class);
        assertEquals(((CommandProducerSuccess) reply).getRequestId(), 3);

        // Only now send the same-named create with producer id 2, so that ordering is unambiguous
        ByteBuf sameNameCreate = Commands.newProducer(successTopicName, 2 /* producer id */,
                4 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(sameNameCreate);

        // The 3rd create is rejected
        reply = getResponse();
        assertEquals(reply.getClass(), CommandError.class);
        assertEquals(((CommandError) reply).getRequestId(), 4);

        assertTrue(channel.isActive());

        channel.finish();
    }

    /**
     * Disabled test ({@code enabled = false}). It writes one create, one close and three more creates for the
     * same producer id while the managed-ledger open for the success topic waits on a latch, and expects the
     * close to be acknowledged, the three later creates to be answered with errors, and no reply at all for
     * the first create.
     *
     * <p>NOTE(review): the stubbed answer blocks on the latch inside {@code asyncOpen}; if that call runs on
     * the test thread the latch can never be released, which may be why the test is disabled - confirm before
     * re-enabling.
     *
     * @throws Exception if the channel setup or the broker interaction fails
     */
    @Test(timeOut = 30000, enabled = false)
    public void testCreateProducerMultipleTimeouts() throws Exception {
        resetChannel();
        setChannelConnected();

        // The ledger open only proceeds once this latch has been counted down
        CountDownLatch openGate = new CountDownLatch(1);
        doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Throwable {
                openGate.await();

                OpenLedgerCallback callback = (OpenLedgerCallback) invocation.getArguments()[2];
                callback.openLedgerComplete(ledgerMock, null);
                return null;
            }
        }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
                any(OpenLedgerCallback.class), any(Supplier.class), any());

        String name = "my-producer";

        // create (request 1), close (request 2), then three more creates (requests 3, 4 and 5)
        ByteBuf create1 = Commands.newProducer(successTopicName, 1 /* producer id */,
                1 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(create1);

        ByteBuf close1 = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */);
        channel.writeInbound(close1);

        ByteBuf create2 = Commands.newProducer(successTopicName, 1 /* producer id */,
                3 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(create2);

        ByteBuf create3 = Commands.newProducer(successTopicName, 1 /* producer id */,
                4 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(create3);

        ByteBuf create4 = Commands.newProducer(successTopicName, 1 /* producer id */,
                5 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(create4);

        // The close is acknowledged
        Object reply = getResponse();
        assertEquals(reply.getClass(), CommandSuccess.class);
        assertEquals(((CommandSuccess) reply).getRequestId(), 2);

        // Release the topic creation
        openGate.countDown();

        // The 1st create gets no acknowledgement; the 2nd fails
        reply = getResponse();
        assertEquals(reply.getClass(), CommandError.class);
        assertEquals(((CommandError) reply).getRequestId(), 3);

        // The 3rd create fails
        reply = getResponse();
        assertEquals(reply.getClass(), CommandError.class);
        assertEquals(((CommandError) reply).getRequestId(), 4);

        // The 4th create fails
        reply = getResponse();
        assertEquals(reply.getClass(), CommandError.class);
        assertEquals(((CommandError) reply).getRequestId(), 5);

        Thread.sleep(100);

        // Still nothing for the 1st create, which was cancelled by the close
        assertTrue(channel.outboundMessages().isEmpty());
        assertTrue(channel.isActive());

        channel.finish();
    }

    /**
     * Command sequence exercised here:
     * <ol>
     *   <li>create a producer on the fail topic, whose managed-ledger open is held back by this test;</li>
     *   <li>close that producer;</li>
     *   <li>re-create the producer, this time on the success topic (the client's reconnection logic);</li>
     *   <li>after a pause, create the producer on the success topic once more.</li>
     * </ol>
     * The close and both later creates are expected to be acknowledged, and no reply is expected for the
     * first create.
     *
     * @throws Exception if the channel setup or the broker interaction fails
     */
    @Test(timeOut = 30000, skipFailedInvocations = true)
    public void testCreateProducerBookieTimeout() throws Exception {
        resetChannel();
        setChannelConnected();

        // Capture the ledger-open completion for the fail topic instead of running it right away
        CompletableFuture<Runnable> pendingFailTopicOpen = new CompletableFuture<>();
        doAnswer(invocation -> {
            OpenLedgerCallback callback = (OpenLedgerCallback) invocation.getArguments()[2];
            pendingFailTopicOpen.complete(() -> callback.openLedgerComplete(ledgerMock, null));
            return null;
        }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
                any(OpenLedgerCallback.class), any(Supplier.class), any());

        String name = "my-producer";

        // 1. create on the fail topic
        ByteBuf failTopicCreate = Commands.newProducer(failTopicName, 1 /* producer id */,
                1 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(failTopicCreate);

        // 2. close
        ByteBuf closeCmd = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */);
        channel.writeInbound(closeCmd);

        // 3. re-create on the success topic
        ByteBuf secondCreate = Commands.newProducer(successTopicName, 1 /* producer id */,
                3 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(secondCreate);

        // Run the captured open completion; the 2nd create is expected to succeed afterwards
        pendingFailTopicOpen.get().run();

        // The close is acknowledged
        Object reply = getResponse();
        assertEquals(reply.getClass(), CommandSuccess.class);
        assertEquals(((CommandSuccess) reply).getRequestId(), 2);

        // The 2nd create is acknowledged
        reply = getResponse();
        assertEquals(reply.getClass(), CommandProducerSuccess.class);
        assertEquals(((CommandProducerSuccess) reply).getRequestId(), 3);

        // 4. pause, then create once more
        Thread.sleep(500);
        ByteBuf thirdCreate = Commands.newProducer(successTopicName, 1 /* producer id */,
                4 /* request id */, name, Collections.emptyMap(), false);
        channel.writeInbound(thirdCreate);

        // The 3rd create succeeds because the 2nd one is already connected
        reply = getResponse();
        assertEquals(reply.getClass(), CommandProducerSuccess.class);
        assertEquals(((CommandProducerSuccess) reply).getRequestId(), 4);

        Thread.sleep(500);

        // No reply may ever show up for the 1st create, which was cancelled by the close
        assertTrue(channel.outboundMessages().isEmpty());
        assertTrue(channel.isActive());

        channel.finish();
    }

    /**
     * Writes four SUBSCRIBE commands for the same consumer id (request ids 1, 3, 4 and 5) while the
     * managed-ledger open for the success topic is held back, as a client retrying after subscribe timeouts
     * would. No close-consumer command is sent in this test.
     *
     * <p>Once the open is released, the three later subscribes are expected to be answered with errors and
     * the first one with a success.
     *
     * @throws Exception if the channel setup or the broker interaction fails
     */
    @Test(timeOut = 30000)
    public void testSubscribeTimeout() throws Exception {
        resetChannel();
        setChannelConnected();

        // Capture the ledger-open completion instead of running it, so the test decides when the topic opens
        CompletableFuture<Runnable> pendingOpen = new CompletableFuture<>();
        doAnswer(invocation -> {
            OpenLedgerCallback callback = (OpenLedgerCallback) invocation.getArguments()[2];
            pendingOpen.complete(() -> callback.openLedgerComplete(ledgerMock, null));
            return null;
        }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
                any(OpenLedgerCallback.class), any(Supplier.class), any());

        // Request id 1
        ByteBuf firstSubscribe = Commands.newSubscribe(successTopicName, successSubName,
                1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0 /* avoid reseting cursor */);
        channel.writeInbound(firstSubscribe);

        // Request id 3
        ByteBuf secondSubscribe = Commands.newSubscribe(successTopicName, successSubName,
                1 /* consumer id */, 3 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0 /* avoid reseting cursor */);
        channel.writeInbound(secondSubscribe);

        // Request id 4
        ByteBuf thirdSubscribe = Commands.newSubscribe(successTopicName, successSubName,
                1 /* consumer id */, 4 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0 /* avoid reseting cursor */);
        channel.writeInbound(thirdSubscribe);

        // Request id 5
        ByteBuf fourthSubscribe = Commands.newSubscribe(successTopicName, successSubName,
                1 /* consumer id */, 5 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0 /* avoid reseting cursor */);
        channel.writeInbound(fourthSubscribe);

        // Let the topic open now
        pendingOpen.get().run();

        Object reply;

        synchronized (this) {

            // Every subscribe after the first one is rejected
            reply = getResponse();
            assertEquals(reply.getClass(), CommandError.class);
            assertEquals(((CommandError) reply).getRequestId(), 3);

            reply = getResponse();
            assertEquals(reply.getClass(), CommandError.class);
            assertEquals(((CommandError) reply).getRequestId(), 4);

            reply = getResponse();
            assertEquals(reply.getClass(), CommandError.class);
            assertEquals(((CommandError) reply).getRequestId(), 5);

            // The 1st subscribe was never cancelled, so its reply must still arrive
            Awaitility.await().untilAsserted(() -> assertFalse(channel.outboundMessages().isEmpty()));

            assertTrue(channel.isActive());
            reply = getResponse();
            assertEquals(reply.getClass(), CommandSuccess.class);
            assertEquals(((CommandSuccess) reply).getRequestId(), 1);
        }

        channel.finish();
    }

    /**
     * Command sequence exercised here:
     * <ol>
     *   <li>subscribe on the fail topic, whose managed-ledger open is held back and later fails;</li>
     *   <li>close that consumer;</li>
     *   <li>subscribe again with the same consumer id on the success topic (the client's reconnection logic),
     *       which is expected to be rejected;</li>
     *   <li>once the connection no longer knows consumer 1, subscribe once more, which is expected to
     *       succeed.</li>
     * </ol>
     * No reply is expected for the first subscribe.
     *
     * @throws Exception if the channel setup or the broker interaction fails
     */
    @Test(timeOut = 30000)
    public void testSubscribeBookieTimeout() throws Exception {
        resetChannel();
        setChannelConnected();

        // Success topic: capture the successful open completion so the test decides when it runs
        CompletableFuture<Runnable> pendingSuccessOpen = new CompletableFuture<>();
        doAnswer(invocation -> {
            OpenLedgerCallback callback = (OpenLedgerCallback) invocation.getArguments()[2];
            pendingSuccessOpen.complete(() -> callback.openLedgerComplete(ledgerMock, null));
            return null;
        }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
                any(OpenLedgerCallback.class), any(Supplier.class), any());

        // Fail topic: capture a completion that reports a managed-ledger failure
        CompletableFuture<Runnable> pendingFailedOpen = new CompletableFuture<>();
        doAnswer(invocation -> {
            OpenLedgerCallback callback = (OpenLedgerCallback) invocation.getArguments()[2];
            pendingFailedOpen.complete(() ->
                    callback.openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null));
            return null;
        }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
                any(OpenLedgerCallback.class), any(Supplier.class), any());

        // 1. subscribe on the fail topic
        ByteBuf failTopicSubscribe = Commands.newSubscribe(failTopicName, successSubName,
                1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0 /* avoid reseting cursor */);
        channel.writeInbound(failTopicSubscribe);

        // 2. close the consumer
        ByteBuf closeCmd = Commands.newCloseConsumer(1 /* consumer id */, 2 /* request id */);
        channel.writeInbound(closeCmd);

        // 3. resubscribe on the success topic
        ByteBuf secondSubscribe = Commands.newSubscribe(successTopicName, successSubName,
                1 /* consumer id */, 3 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0 /* avoid reseting cursor */);
        channel.writeInbound(secondSubscribe);

        // Report the failed open of the fail topic
        pendingFailedOpen.get().run();

        // The close is acknowledged
        Object reply = getResponse();
        assertEquals(reply.getClass(), CommandSuccess.class);
        assertEquals(((CommandSuccess) reply).getRequestId(), 2);

        // The resubscribe is rejected
        reply = getResponse();
        assertEquals(reply.getClass(), CommandError.class);
        assertEquals(((CommandError) reply).getRequestId(), 3);

        // Wait until consumer 1 is gone from the connection before trying again
        Awaitility.await().until(() -> !serverCnx.hasConsumer(1));

        // 4. subscribe once more
        ByteBuf thirdSubscribe = Commands.newSubscribe(successTopicName, successSubName,
                1 /* consumer id */, 4 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0 /* avoid reseting cursor */);
        channel.writeInbound(thirdSubscribe);

        // Let the success topic open
        pendingSuccessOpen.get().run();

        // This subscribe is acknowledged
        reply = getResponse();
        assertEquals(reply.getClass(), CommandSuccess.class);
        assertEquals(((CommandSuccess) reply).getRequestId(), 4);

        Thread.sleep(100);

        // No reply may show up for the 1st subscribe, which was cancelled by the close
        assertTrue(channel.outboundMessages().isEmpty());
        assertTrue(channel.isActive());

        channel.finish();
    }

    /**
     * Sends three SUBSCRIBE commands over a single connection with authentication and
     * authorization disabled: one on the success topic/subscription, one whose subscription
     * name is {@code "failSub"} (cursor creation failure), and one on the failing topic.
     * The connection is expected to remain open after the two error responses.
     */
    @Test(timeOut = 30000)
    public void testSubscribeCommand() throws Exception {
        final String failSubName = "failSub";

        resetChannel();
        setChannelConnected();
        doReturn(false).when(brokerService).isAuthenticationEnabled();
        doReturn(false).when(brokerService).isAuthorizationEnabled();

        // Case 1: topic and cursor creation both succeed.
        ByteBuf subscribeOk = Commands.newSubscribe(successTopicName, successSubName,
                1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0 /* avoid resetting cursor */);
        channel.writeInbound(subscribeOk);
        Object firstResponse = getResponse();
        assertTrue(firstResponse instanceof CommandSuccess);

        // The topic must now be loaded, own the subscription and have a connected consumer.
        PersistentTopic loadedTopic =
                (PersistentTopic) brokerService.getTopicReference(successTopicName).get();
        assertNotNull(loadedTopic);
        assertTrue(loadedTopic.getSubscriptions().containsKey(successSubName));
        assertTrue(loadedTopic.getSubscription(successSubName).getDispatcher().isConsumerConnected());

        // Case 2: topic creation succeeds but cursor creation fails.
        ByteBuf subscribeCursorFail = Commands.newSubscribe(successTopicName, failSubName,
                2, 2, SubType.Exclusive, 0, "test", 0 /* avoid resetting cursor */);
        channel.writeInbound(subscribeCursorFail);
        Object secondResponse = getResponse();
        assertTrue(secondResponse instanceof CommandError);

        // Case 3: topic creation itself fails.
        ByteBuf subscribeTopicFail = Commands.newSubscribe(failTopicName, successSubName,
                3, 3, SubType.Exclusive, 0, "test", 0 /* avoid resetting cursor */);
        channel.writeInbound(subscribeTopicFail);
        Object thirdResponse = getResponse();
        assertEquals(thirdResponse.getClass(), CommandError.class);

        // Errors must not cause the server to close the connection.
        assertTrue(channel.isOpen());

        channel.finish();
    }

    /**
     * Verifies that a connection whose remote protocol version is forced to
     * {@code ProtocolVersion.v3} gets {@link ServerError#UnsupportedVersionError} when it
     * subscribes after {@code markBatchMessagePublished()} has been called on the topic.
     * A subscribe issued before that call is expected to succeed, and the connection must
     * stay open after the error.
     */
    @Test(timeOut = 30000)
    public void testUnsupportedBatchMsgSubscribeCommand() throws Exception {
        final String failSubName = "failSub";

        resetChannel();
        setChannelConnected();
        setConnectionVersion(ProtocolVersion.v3.getValue());
        doReturn(false).when(brokerService).isAuthenticationEnabled();
        doReturn(false).when(brokerService).isAuthorizationEnabled();

        // First SUBSCRIBE: topic and cursor creation succeed.
        ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
                successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0 /* priority */,
                "test" /* consumer name */, 0 /* avoid resetting cursor */);
        channel.writeInbound(clientCommand);
        assertTrue(getResponse() instanceof CommandSuccess);

        PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName).get();
        topicRef.markBatchMessagePublished();

        // Second SUBSCRIBE: expected to be rejected with UnsupportedVersionError.
        clientCommand = Commands.newSubscribe(successTopicName, failSubName, 2, 2, SubType.Exclusive, 0 /* priority */,
                "test" /* consumer name */, 0 /* avoid resetting cursor */);
        channel.writeInbound(clientCommand);
        Object response = getResponse();
        assertTrue(response instanceof CommandError);
        // TestNG's assertEquals takes (actual, expected); keeping that order gives a correct failure message.
        assertEquals(((CommandError) response).getError(), ServerError.UnsupportedVersionError);

        // Server will not close the connection
        assertTrue(channel.isOpen());

        channel.finish();
    }

    /**
     * With authentication and authorization enabled and a mocked {@link AuthorizationService}
     * whose {@code allowTopicOperationAsync} always completes with {@code true}, a SUBSCRIBE
     * command must be answered with {@link CommandSuccess}.
     */
    @Test(timeOut = 30000)
    public void testSubscribeCommandWithAuthorizationPositive() throws Exception {
        // Authorization service that grants every topic operation.
        AuthorizationService permissiveAuthz = mock(AuthorizationService.class);
        CompletableFuture<Boolean> granted = CompletableFuture.completedFuture(true);
        doReturn(granted).when(permissiveAuthz)
                .allowTopicOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
        doReturn(permissiveAuthz).when(brokerService).getAuthorizationService();
        doReturn(true).when(brokerService).isAuthenticationEnabled();
        doReturn(true).when(brokerService).isAuthorizationEnabled();
        resetChannel();
        setChannelConnected();

        // SUBSCRIBE where topic and cursor creation succeed.
        ByteBuf subscribeCommand = Commands.newSubscribe(successTopicName, successSubName,
                1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0 /* avoid resetting cursor */);
        channel.writeInbound(subscribeCommand);

        Object response = getResponse();
        assertTrue(response instanceof CommandSuccess);

        channel.finish();
    }

    /**
     * With authentication and authorization enabled and a mocked {@link AuthorizationService}
     * whose {@code allowTopicOperationAsync} always completes with {@code false}, a SUBSCRIBE
     * command must be answered with {@link CommandError}.
     */
    @Test(timeOut = 30000)
    public void testSubscribeCommandWithAuthorizationNegative() throws Exception {
        // Authorization service that denies every topic operation.
        AuthorizationService denyingAuthz = mock(AuthorizationService.class);
        CompletableFuture<Boolean> denied = CompletableFuture.completedFuture(false);
        doReturn(denied).when(denyingAuthz)
                .allowTopicOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
        doReturn(denyingAuthz).when(brokerService).getAuthorizationService();
        doReturn(true).when(brokerService).isAuthenticationEnabled();
        doReturn(true).when(brokerService).isAuthorizationEnabled();

        resetChannel();
        setChannelConnected();

        // SUBSCRIBE on the success topic/subscription; it is expected to be rejected.
        ByteBuf subscribeCommand = Commands.newSubscribe(successTopicName, successSubName,
                1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0 /* avoid resetting cursor */);
        channel.writeInbound(subscribeCommand);
        Object response = getResponse();
        assertTrue(response instanceof CommandError);

        channel.finish();
    }

    /**
     * Subscribes a consumer, sends an individual ACK for position (0, 0) and checks that
     * the ACK produces no outbound message on the channel.
     */
    @Test(timeOut = 30000)
    public void testAckCommand() throws Exception {
        resetChannel();
        setChannelConnected();

        ByteBuf subscribeCommand = Commands.newSubscribe(successTopicName, successSubName,
                1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0 /* avoid resetting cursor */);
        channel.writeInbound(subscribeCommand);
        Object subscribeResponse = getResponse();
        assertTrue(subscribeResponse instanceof CommandSuccess);

        PositionImpl ackedPosition = new PositionImpl(0, 0);

        ByteBuf ackCommand = Commands.newAck(1 /* consumer id */, ackedPosition.getLedgerId(),
                ackedPosition.getEntryId(), null, AckType.Individual, null, Collections.emptyMap(), -1);
        channel.writeInbound(ackCommand);

        // An ack must not put anything on the wire.
        assertNull(channel.outboundMessages().peek());
        channel.finish();
    }

    /**
     * Subscribes a consumer, grants it one message permit with a FLOW command and checks
     * that nothing is written to the channel as a result (the cursor is a mock).
     */
    @Test(timeOut = 30000)
    public void testFlowCommand() throws Exception {
        resetChannel();
        setChannelConnected();

        ByteBuf subscribeCommand = Commands.newSubscribe(successTopicName, successSubName,
                1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
                "test" /* consumer name */, 0 /* avoid resetting cursor */);
        channel.writeInbound(subscribeCommand);
        Object subscribeResponse = getResponse();
        assertTrue(subscribeResponse instanceof CommandSuccess);

        ByteBuf flowCommand = Commands.newFlow(1 /* consumer id */, 1 /* message permits */);
        channel.writeInbound(flowCommand);

        // The cursor is mocked, so the flow request must not put anything on the wire.
        assertNull(channel.outboundMessages().peek());
        channel.finish();
    }

    /**
     * When the namespace policies returned for the encryption-required topic have
     * {@code encryption_required = true}, a producer created with the encryption flag set
     * must be accepted and registered on the topic.
     */
    @Test(timeOut = 30000)
    public void testProducerSuccessOnEncryptionRequiredTopic() throws Exception {
        resetChannel();
        setChannelConnected();

        // Namespace policies that require encryption.
        Policies nsPolicies = mock(Policies.class);
        nsPolicies.encryption_required = true;
        nsPolicies.topicDispatchRate = Maps.newHashMap();
        // `clusterDispatchRate` must be set, otherwise there will be a NPE in
        // `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
        nsPolicies.clusterDispatchRate = Maps.newHashMap();
        CompletableFuture<Optional<Policies>> policiesFuture =
                CompletableFuture.completedFuture(Optional.of(nsPolicies));
        doReturn(policiesFuture).when(namespaceResources)
                .getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());

        // Success case: an encrypted producer can connect.
        ByteBuf producerCommand = Commands.newProducer(encryptionRequiredTopicName,
                1 /* producer id */, 1 /* request id */,
                "encrypted-producer", true, Collections.emptyMap(), false);
        channel.writeInbound(producerCommand);

        Object producerResponse = getResponse();
        assertEquals(producerResponse.getClass(), CommandProducerSuccess.class);
        PersistentTopic encryptedTopic =
                (PersistentTopic) brokerService.getTopicReference(encryptionRequiredTopicName).get();
        assertNotNull(encryptedTopic);
        assertEquals(encryptedTopic.getProducers().size(), 1);

        channel.finish();
    }

    /**
     * When the namespace policies returned for the encryption-required topic have
     * {@code encryption_required = true}, a producer created without the encryption flag
     * must be rejected with {@link ServerError#MetadataError} and must not be registered
     * on the topic.
     */
    @Test(timeOut = 30000)
    public void testProducerFailureOnEncryptionRequiredTopic() throws Exception {
        resetChannel();
        setChannelConnected();

        // Namespace policies that require encryption.
        Policies nsPolicies = mock(Policies.class);
        nsPolicies.encryption_required = true;
        nsPolicies.topicDispatchRate = Maps.newHashMap();
        // `clusterDispatchRate` must be set, otherwise there will be a NPE in
        // `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
        nsPolicies.clusterDispatchRate = Maps.newHashMap();
        CompletableFuture<Optional<Policies>> policiesFuture =
                CompletableFuture.completedFuture(Optional.of(nsPolicies));
        doReturn(policiesFuture).when(namespaceResources)
                .getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());

        // Failure case: an unencrypted producer cannot connect.
        ByteBuf producerCommand = Commands.newProducer(encryptionRequiredTopicName,
                2 /* producer id */, 2 /* request id */,
                "unencrypted-producer", false, Collections.emptyMap(), false);
        channel.writeInbound(producerCommand);

        Object producerResponse = getResponse();
        assertEquals(producerResponse.getClass(), CommandError.class);
        CommandError rejection = (CommandError) producerResponse;
        assertEquals(rejection.getError(), ServerError.MetadataError);
        PersistentTopic encryptedTopic =
                (PersistentTopic) brokerService.getTopicReference(encryptionRequiredTopicName).get();
        assertNotNull(encryptedTopic);
        assertEquals(encryptedTopic.getProducers().size(), 0);

        channel.finish();
    }

    /**
     * When encryption is required through the broker configuration
     * ({@code setEncryptionRequireOnProducer(true)}) while the namespace policy has
     * {@code encryption_required = false}, a producer created without the encryption flag
     * must still be rejected with {@link ServerError#MetadataError}.
     */
    @Test(timeOut = 30000)
    public void testProducerFailureOnEncryptionRequiredOnBroker() throws Exception {
        // (a) Require encryption at broker level.
        pulsar.getConfig().setEncryptionRequireOnProducer(true);
        resetChannel();
        setChannelConnected();

        // (b) The namespace policy itself does not require encryption.
        Policies nsPolicies = mock(Policies.class);
        nsPolicies.encryption_required = false;
        nsPolicies.topicDispatchRate = Maps.newHashMap();
        // `clusterDispatchRate` must be set, otherwise there will be a NPE
        nsPolicies.clusterDispatchRate = Maps.newHashMap();
        CompletableFuture<Optional<Policies>> policiesFuture =
                CompletableFuture.completedFuture(Optional.of(nsPolicies));
        doReturn(policiesFuture).when(namespaceResources)
                .getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());

        // Failure case: an unencrypted producer cannot connect.
        ByteBuf producerCommand = Commands.newProducer(encryptionRequiredTopicName,
                2 /* producer id */, 2 /* request id */,
                "unencrypted-producer", false, Collections.emptyMap(), false);
        channel.writeInbound(producerCommand);

        Object producerResponse = getResponse();
        assertEquals(producerResponse.getClass(), CommandError.class);
        CommandError rejection = (CommandError) producerResponse;
        assertEquals(rejection.getError(), ServerError.MetadataError);
        PersistentTopic encryptedTopic =
                (PersistentTopic) brokerService.getTopicReference(encryptionRequiredTopicName).get();
        assertNotNull(encryptedTopic);
        assertEquals(encryptedTopic.getProducers().size(), 0);

        channel.finish();
    }

    /**
     * On a topic whose namespace policies have {@code encryption_required = true}, a message
     * whose metadata carries an encryption key must be accepted: the SEND command is
     * expected to be answered with {@link CommandSendReceipt}.
     */
    @Test(timeOut = 30000)
    public void testSendSuccessOnEncryptionRequiredTopic() throws Exception {
        resetChannel();
        setChannelConnected();

        // Namespace policies that require encryption.
        Policies nsPolicies = mock(Policies.class);
        nsPolicies.encryption_required = true;
        nsPolicies.topicDispatchRate = Maps.newHashMap();
        // `clusterDispatchRate` must be set, otherwise there will be a NPE in
        // `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
        nsPolicies.clusterDispatchRate = Maps.newHashMap();
        CompletableFuture<Optional<Policies>> policiesFuture =
                CompletableFuture.completedFuture(Optional.of(nsPolicies));
        doReturn(policiesFuture).when(namespaceResources)
                .getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());

        ByteBuf producerCommand = Commands.newProducer(encryptionRequiredTopicName,
                1 /* producer id */, 1 /* request id */,
                "prod-name", true, Collections.emptyMap(), false);
        channel.writeInbound(producerCommand);
        Object producerResponse = getResponse();
        assertTrue(producerResponse instanceof CommandProducerSuccess);

        // Success case: encrypted messages can be published.
        MessageMetadata encryptedMetadata = new MessageMetadata();
        encryptedMetadata.setPublishTime(System.currentTimeMillis());
        encryptedMetadata.setProducerName("prod-name");
        encryptedMetadata.setSequenceId(0);
        encryptedMetadata.addEncryptionKey()
                .setKey("testKey")
                .setValue("testVal".getBytes());
        ByteBuf payload = Unpooled.buffer(1024);

        // The inbound write gets its own copy, so the coalesced buffer is released here.
        ByteBuf sendCommand = ByteBufPair.coalesce(
                Commands.newSend(1, 0, 1, ChecksumType.None, encryptedMetadata, payload));
        channel.writeInbound(Unpooled.copiedBuffer(sendCommand));
        sendCommand.release();
        Object sendResponse = getResponse();
        assertTrue(sendResponse instanceof CommandSendReceipt);
        channel.finish();
    }

    /**
     * On a topic whose namespace policies have {@code encryption_required = true}, a message
     * whose metadata carries no encryption key must be refused: the SEND command is
     * expected to be answered with {@link CommandSendError}.
     */
    @Test(timeOut = 30000)
    public void testSendFailureOnEncryptionRequiredTopic() throws Exception {
        resetChannel();
        setChannelConnected();

        // Namespace policies that require encryption.
        Policies nsPolicies = mock(Policies.class);
        nsPolicies.encryption_required = true;
        nsPolicies.topicDispatchRate = Maps.newHashMap();
        // `clusterDispatchRate` must be set, otherwise there will be a NPE in
        // `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
        nsPolicies.clusterDispatchRate = Maps.newHashMap();
        CompletableFuture<Optional<Policies>> policiesFuture =
                CompletableFuture.completedFuture(Optional.of(nsPolicies));
        doReturn(policiesFuture).when(namespaceResources)
                .getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());

        ByteBuf producerCommand = Commands.newProducer(encryptionRequiredTopicName,
                1 /* producer id */, 1 /* request id */,
                "prod-name", true, Collections.emptyMap(), false);
        channel.writeInbound(producerCommand);
        Object producerResponse = getResponse();
        assertTrue(producerResponse instanceof CommandProducerSuccess);

        // Failure case: unencrypted messages cannot be published.
        MessageMetadata plainMetadata = new MessageMetadata();
        plainMetadata.setPublishTime(System.currentTimeMillis());
        plainMetadata.setProducerName("prod-name");
        plainMetadata.setSequenceId(0);
        ByteBuf payload = Unpooled.buffer(1024);

        // The inbound write gets its own copy, so the coalesced buffer is released here.
        ByteBuf sendCommand = ByteBufPair.coalesce(
                Commands.newSend(1, 0, 1, ChecksumType.None, plainMetadata, payload));
        channel.writeInbound(Unpooled.copiedBuffer(sendCommand));
        sendCommand.release();
        Object sendResponse = getResponse();
        assertTrue(sendResponse instanceof CommandSendError);
        channel.finish();
    }

    /**
     * Replaces {@code serverCnx} and {@code channel} with fresh instances.
     *
     * <p>If a channel exists and is still active, the current connection and channel are
     * closed first. The new {@link EmbeddedChannel} pipeline consists of a length-field
     * frame decoder followed by the new {@link ServerCnx}.
     *
     * @throws Exception if waiting for the previous channel to close fails
     */
    protected void resetChannel() throws Exception {
        final int maxFrameLength = 5 * 1024 * 1024;

        boolean previousChannelAlive = channel != null && channel.isActive();
        if (previousChannelAlive) {
            serverCnx.close();
            channel.close().get();
        }

        serverCnx = new ServerCnx(pulsar);
        serverCnx.setAuthRole("");

        // Frames carry a 4-byte length prefix at offset 0, which is stripped before dispatch.
        LengthFieldBasedFrameDecoder frameDecoder =
                new LengthFieldBasedFrameDecoder(maxFrameLength, 0, 4, 0, 4);
        channel = new EmbeddedChannel(frameDecoder, (ChannelHandler) serverCnx);
    }

    /**
     * Reflectively sets the {@code state} field of the current {@code serverCnx} to
     * {@code State.Connected}, so tests can send commands without a CONNECT exchange.
     *
     * @throws Exception if the field cannot be found or written
     */
    protected void setChannelConnected() throws Exception {
        Field stateField = ServerCnx.class.getDeclaredField("state");
        stateField.setAccessible(true);
        stateField.set(serverCnx, State.Connected);
    }

    /**
     * Reflectively overwrites the {@code remoteEndpointProtocolVersion} field declared in
     * {@link PulsarHandler} on the current {@code serverCnx}.
     *
     * @param version protocol version value to store
     * @throws Exception if the field cannot be found or written
     */
    private void setConnectionVersion(int version) throws Exception {
        Field protocolVersionField = PulsarHandler.class.getDeclaredField("remoteEndpointProtocolVersion");
        protocolVersionField.setAccessible(true);
        protocolVersionField.set(serverCnx, version);
    }

    /**
     * Polls the channel's outbound queue until a message is available and returns it decoded
     * by {@code clientChannelHelper}.
     *
     * <p>The queue is checked every 10 ms, for at most 1000 checks (about 10 seconds).
     *
     * @return the decoded command for the first outbound message
     * @throws IOException if no outbound message appears within the polling budget
     */
    protected Object getResponse() throws Exception {
        final long pollIntervalMs = 10;
        final long maxPolls = TimeUnit.SECONDS.toMillis(10) / pollIntervalMs;

        for (int attempt = 0; attempt < maxPolls; attempt++) {
            if (channel.outboundMessages().isEmpty()) {
                // Nothing written yet; back off briefly and retry.
                Thread.sleep(pollIntervalMs);
                continue;
            }
            Object outbound = channel.outboundMessages().remove();
            return clientChannelHelper.getCommand(outbound);
        }

        throw new IOException("Failed to get response from socket within 10s");
    }

    /**
     * Stubs the managed-ledger factory, ledger and cursor mocks so that their asynchronous
     * operations finish by invoking the supplied callbacks.
     *
     * <p>Names matching {@code .*success.*} complete successfully and names matching
     * {@code .*fail.*} fail with a {@link ManagedLedgerException}. Ledger-open and
     * cursor-open stubs sleep 300 ms before invoking the callback.
     */
    private void setupMLAsyncCallbackMocks() {
        doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();

        // asyncOpen on a "success" name: complete the open with ledgerMock.
        doAnswer((Answer<Object>) invocation -> {
            Thread.sleep(300);
            OpenLedgerCallback openCallback = (OpenLedgerCallback) invocation.getArguments()[2];
            openCallback.openLedgerComplete(ledgerMock, null);
            return null;
        }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
                any(OpenLedgerCallback.class), any(Supplier.class), any());

        // asyncOpen on a "fail" name: report the failure from a separate thread.
        doAnswer((Answer<Object>) invocation -> {
            Thread.sleep(300);
            Thread failureNotifier = new Thread(() -> {
                ((OpenLedgerCallback) invocation.getArguments()[2])
                        .openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null);
            });
            failureNotifier.start();
            return null;
        }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
                any(OpenLedgerCallback.class), any(Supplier.class), any());

        // asyncAddEntry: immediately acknowledge the add at position (-1, -1), echoing the context.
        doAnswer((Answer<Object>) invocation -> {
            Object[] args = invocation.getArguments();
            AddEntryCallback addCallback = (AddEntryCallback) args[1];
            addCallback.addComplete(new PositionImpl(-1, -1), null, args[2]);
            return null;
        }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());

        doAnswer((Answer<Object>) invocation -> true).when(cursorMock).isDurable();

        // asyncOpenCursor (4-argument overload), "success" name: hand back cursorMock.
        doAnswer((Answer<Object>) invocation -> {
            Thread.sleep(300);
            OpenCursorCallback cursorCallback = (OpenCursorCallback) invocation.getArguments()[2];
            cursorCallback.openCursorComplete(cursorMock, null);
            return null;
        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class),
                any(OpenCursorCallback.class), any());

        // asyncOpenCursor (5-argument overload with properties map), "success" name.
        doAnswer((Answer<Object>) invocation -> {
            Thread.sleep(300);
            OpenCursorCallback cursorCallback = (OpenCursorCallback) invocation.getArguments()[3];
            cursorCallback.openCursorComplete(cursorMock, null);
            return null;
        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
                any(OpenCursorCallback.class), any());

        // asyncOpenCursor (4-argument overload), "fail" name: report a failure.
        doAnswer((Answer<Object>) invocation -> {
            Thread.sleep(300);
            OpenCursorCallback cursorCallback = (OpenCursorCallback) invocation.getArguments()[2];
            cursorCallback.openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
            return null;
        }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class),
                any(OpenCursorCallback.class), any());

        // asyncOpenCursor (5-argument overload with properties map), "fail" name.
        doAnswer((Answer<Object>) invocation -> {
            Thread.sleep(300);
            OpenCursorCallback cursorCallback = (OpenCursorCallback) invocation.getArguments()[3];
            cursorCallback.openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
            return null;
        }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(Map.class),
                any(OpenCursorCallback.class), any());

        // asyncDeleteCursor: succeed or fail according to the cursor name.
        doAnswer((Answer<Object>) invocation -> {
            DeleteCursorCallback deleteCallback = (DeleteCursorCallback) invocation.getArguments()[1];
            deleteCallback.deleteCursorComplete(null);
            return null;
        }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any());

        doAnswer((Answer<Object>) invocation -> {
            DeleteCursorCallback deleteCallback = (DeleteCursorCallback) invocation.getArguments()[1];
            deleteCallback.deleteCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
            return null;
        }).when(ledgerMock).asyncDeleteCursor(matches(".*fail.*"), any(DeleteCursorCallback.class), any());

        // Cursor close always completes successfully.
        doAnswer((Answer<Object>) invocation -> {
            CloseCallback closeCallback = (CloseCallback) invocation.getArguments()[0];
            closeCallback.closeComplete(null);
            return null;
        }).when(cursorMock).asyncClose(any(CloseCallback.class), any());

        doReturn(successSubName).when(cursorMock).getName();
    }

    /**
     * A LOOKUP for the malformed topic name {@code "xx/ass/aa/aaa"} must be answered with a
     * {@link CommandLookupTopicResponse} carrying {@link ServerError#InvalidTopicName}.
     */
    @Test(timeOut = 30000)
    public void testInvalidTopicOnLookup() throws Exception {
        // A single reset is enough; the channel was previously reset twice in a row.
        resetChannel();
        setChannelConnected();

        String invalidTopicName = "xx/ass/aa/aaa";

        channel.writeInbound(Commands.newLookup(invalidTopicName, true, 1));
        Object obj = getResponse();
        assertEquals(obj.getClass(), CommandLookupTopicResponse.class);
        CommandLookupTopicResponse res = (CommandLookupTopicResponse) obj;
        assertEquals(res.getError(), ServerError.InvalidTopicName);

        channel.finish();
    }

    /**
     * A PRODUCER command for the malformed topic name {@code "xx/ass/aa/aaa"} must be
     * answered with a {@link CommandError} carrying {@link ServerError#InvalidTopicName}.
     */
    @Test(timeOut = 30000)
    public void testInvalidTopicOnProducer() throws Exception {
        // A single reset is enough; the channel was previously reset twice in a row.
        resetChannel();
        setChannelConnected();

        String invalidTopicName = "xx/ass/aa/aaa";

        ByteBuf clientCommand = Commands.newProducer(invalidTopicName, 1 /* producer id */, 1 /* request id */,
                "prod-name", Collections.emptyMap(), false);
        channel.writeInbound(clientCommand);
        Object obj = getResponse();
        assertEquals(obj.getClass(), CommandError.class);
        CommandError res = (CommandError) obj;
        assertEquals(res.getError(), ServerError.InvalidTopicName);

        channel.finish();
    }

    /**
     * A SUBSCRIBE command for the malformed topic name {@code "xx/ass/aa/aaa"} must be
     * answered with a {@link CommandError} carrying {@link ServerError#InvalidTopicName}.
     */
    @Test(timeOut = 30000)
    public void testInvalidTopicOnSubscribe() throws Exception {
        // A single reset is enough; the channel was previously reset twice in a row.
        resetChannel();
        setChannelConnected();

        String invalidTopicName = "xx/ass/aa/aaa";

        channel.writeInbound(Commands.newSubscribe(invalidTopicName, "test-subscription", 1, 1, SubType.Exclusive, 0,
                "consumerName", 0 /* avoid resetting cursor */));
        Object obj = getResponse();
        assertEquals(obj.getClass(), CommandError.class);
        CommandError res = (CommandError) obj;
        assertEquals(res.getError(), ServerError.InvalidTopicName);

        channel.finish();
    }

    /**
     * Creates a producer while {@code getOrCreateTopic} returns a future that never
     * completes, closes that producer, then re-stubs {@code getOrCreateTopic} to return a
     * ready mock topic and sends a second PRODUCER command with the same producer id.
     * The first response read from the channel must be a {@link CommandSuccess}
     * (presumably the acknowledgement of the close request — confirm against ServerCnx).
     */
    @Test
    public void testDelayedClosedProducer() throws Exception {
        resetChannel();
        setChannelConnected();

        // Topic loading hangs: this future is never completed by the test.
        CompletableFuture<Topic> pendingTopicFuture = new CompletableFuture<>();
        doReturn(pendingTopicFuture).when(brokerService).getOrCreateTopic(any(String.class));

        // First producer creation, stuck behind the pending topic future.
        int producerId = 1;
        ByteBuf firstProducerCommand = Commands.newProducer(successTopicName, producerId,
                1 /* request id */, "prod-name", Collections.emptyMap(), false);
        channel.writeInbound(firstProducerCommand);

        // Close the producer while its creation is still in flight.
        ByteBuf closeProducerCommand = Commands.newCloseProducer(producerId, 2);
        channel.writeInbound(closeProducerCommand);

        // From now on topic loading completes immediately with a mock topic.
        Topic readyTopic = mock(Topic.class);
        doReturn(CompletableFuture.completedFuture(readyTopic)).when(brokerService)
                .getOrCreateTopic(any(String.class));
        doReturn(CompletableFuture.completedFuture(false)).when(readyTopic).hasSchema();

        // Second producer creation re-using the same producer id and request id.
        ByteBuf secondProducerCommand = Commands.newProducer(successTopicName, producerId,
                1 /* request id */, "prod-name", Collections.emptyMap(), false);
        channel.writeInbound(secondProducerCommand);

        Object firstResponse = getResponse();
        assertTrue(firstResponse instanceof CommandSuccess);

        channel.finish();
    }

    /**
     * After a first successful subscribe, stubs {@code checkTopicNsOwnership} to fail with
     * {@link ServiceUnitNotReadyException} and checks that both a second SUBSCRIBE and a
     * PRODUCER command are answered with {@link ServerError#ServiceNotReady} carrying the
     * matching request id.
     */
    @Test(timeOut = 30000)
    public void testTopicIsNotReady() throws Exception {
        resetChannel();
        setChannelConnected();

        // First subscribe: loads the topic and must succeed.
        ByteBuf initialSubscribe = Commands.newSubscribe(successTopicName, successSubName,
                1 /* consumer id */, 1 /* request id */, SubType.Shared, 0 /* priority level */,
                "c1" /* consumer name */, 0 /* avoid resetting cursor */);
        channel.writeInbound(initialSubscribe);

        Object initialResponse = getResponse();
        assertEquals(initialResponse.getClass(), CommandSuccess.class);
        CommandSuccess initialSuccess = (CommandSuccess) initialResponse;
        assertEquals(initialSuccess.getRequestId(), 1);

        // From here on, the ownership check fails with ServiceUnitNotReadyException.
        doReturn(FutureUtil.failedFuture(new ServiceUnitNotReadyException("Service unit is not ready")))
                .when(brokerService).checkTopicNsOwnership(anyString());

        // Second subscribe while the service unit is not ready.
        ByteBuf subscribeWhileNotReady = Commands.newSubscribe(successTopicName, successSubName,
                2 /* consumer id */, 2 /* request id */, SubType.Shared, 0 /* priority level */,
                "c2" /* consumer name */, 0 /* avoid resetting cursor */);
        channel.writeInbound(subscribeWhileNotReady);

        Object subscribeResponse = getResponse();
        assertEquals(subscribeResponse.getClass(), CommandError.class);
        CommandError subscribeError = (CommandError) subscribeResponse;
        assertEquals(subscribeError.getError(), ServerError.ServiceNotReady);
        assertEquals(subscribeError.getRequestId(), 2);

        // Producer creation while the service unit is not ready.
        ByteBuf producerWhileNotReady = Commands.newProducer(successTopicName,
                1 /* producer id */, 3 /* request id */,
                "p1" /* producer name */, Collections.emptyMap(), false);
        channel.writeInbound(producerWhileNotReady);

        Object producerResponse = getResponse();
        assertEquals(producerResponse.getClass(), CommandError.class);
        CommandError producerError = (CommandError) producerResponse;
        assertEquals(producerError.getError(), ServerError.ServiceNotReady);
        assertEquals(producerError.getRequestId(), 3);

        channel.finish();
    }

    @Test(enabled = false)
    public void testNeverDelayConsumerFutureWhenNotFail() throws Exception{
        // Mock ServerCnx.field: consumers
        ConcurrentLongHashMap.Builder mapBuilder = Mockito.mock(ConcurrentLongHashMap.Builder.class);
        Mockito.when(mapBuilder.expectedItems(Mockito.anyInt())).thenReturn(mapBuilder);
        Mockito.when(mapBuilder.concurrencyLevel(Mockito.anyInt())).thenReturn(mapBuilder);
        ConcurrentLongHashMap consumers = Mockito.mock(ConcurrentLongHashMap.class);
        Mockito.when(mapBuilder.build()).thenReturn(consumers);
        ArgumentCaptor<Long> ignoreArgumentCaptor = ArgumentCaptor.forClass(Long.class);
        final ArgumentCaptor<CompletableFuture> deleteTimesMark = ArgumentCaptor.forClass(CompletableFuture.class);
        Mockito.when(consumers.remove(ignoreArgumentCaptor.capture())).thenReturn(true);
        Mockito.when(consumers.remove(ignoreArgumentCaptor.capture(), deleteTimesMark.capture())).thenReturn(true);
        // case1: exists existingConsumerFuture, already complete or delay done after execute 'isDone()' many times
        // case2: exists existingConsumerFuture, delay complete after execute 'isDone()' many times
        // Why is the design so complicated, see: https://github.com/apache/pulsar/pull/15051
        // Try a delay of 3 stages. The simulation is successful after repeated judgments.
        for(AtomicInteger futureWillDoneAfterDelayTimes = new AtomicInteger(1);
                                            futureWillDoneAfterDelayTimes.intValue() <= 3;
                                            futureWillDoneAfterDelayTimes.incrementAndGet()){
            final AtomicInteger futureCallTimes = new AtomicInteger();
            final Consumer mockConsumer = Mockito.mock(Consumer.class);
            CompletableFuture existingConsumerFuture = new CompletableFuture<Consumer>(){

                private boolean complete;

                // delay complete after execute 'isDone()' many times
                @Override
                public boolean isDone() {
                    if (complete) {
                        return true;
                    }
                    int executeIsDoneCommandTimes = futureCallTimes.incrementAndGet();
                    return executeIsDoneCommandTimes >= futureWillDoneAfterDelayTimes.intValue();
                }

                // if trig "getNow()", then complete
                @Override
                public Consumer get(){
                    complete = true;
                    return mockConsumer;
                }

                // if trig "get()", then complete
                @Override
                public Consumer get(long timeout, TimeUnit unit){
                    complete = true;
                    return mockConsumer;
                }

                // if trig "get()", then complete
                @Override
                public Consumer getNow(Consumer ifAbsent){
                    complete = true;
                    return mockConsumer;
                }

                // never fail
                public boolean isCompletedExceptionally(){
                    return false;
                }
            };
            Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
            // do test: delay complete after execute 'isDone()' many times
            // Why is the design so complicated, see: https://github.com/apache/pulsar/pull/15051
            try (MockedStatic<ConcurrentLongHashMap> theMock = Mockito.mockStatic(ConcurrentLongHashMap.class)) {
                // Inject consumers to ServerCnx
                theMock.when(ConcurrentLongHashMap::newBuilder).thenReturn(mapBuilder);
                // reset channels( serverChannel, clientChannel )
                resetChannel();
                setChannelConnected();
                // auth check disable
                doReturn(false).when(brokerService).isAuthenticationEnabled();
                doReturn(false).when(brokerService).isAuthorizationEnabled();
                // do subscribe
                ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
                        successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
                        "test" /* consumer name */, 0 /* avoid reseting cursor */);
                channel.writeInbound(clientCommand);
                Object responseObj = getResponse();
                Predicate<Object> responseAssert = obj -> {
                    if (responseObj instanceof CommandSuccess) {
                        return true;
                    }
                    if (responseObj instanceof CommandError) {
                        CommandError commandError = (CommandError) responseObj;
                        return ServerError.ServiceNotReady == commandError.getError();
                    }
                    return false;
                };
                // assert that no consumer-delete event occurred
                assertFalse(deleteTimesMark.getAllValues().contains(existingConsumerFuture));
                // assert that no other error occurred
                // (the predicate ignores its argument and inspects the captured responseObj)
                assertTrue(responseAssert.test(responseAssert));
                // Server will not close the connection
                assertTrue(channel.isOpen());
                channel.finish();
            }
        }
        // case3: an existingConsumerFuture exists and has already completed exceptionally
        CompletableFuture existingConsumerFuture = Mockito.mock(CompletableFuture.class);
        Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
        // the existing consumerFuture reports that it is already done
        Mockito.when(existingConsumerFuture.isDone()).thenReturn(true);
        // every way of reading the result throws, because the future completed exceptionally
        Mockito.when(existingConsumerFuture.get()).thenThrow(new NullPointerException());
        Mockito.when(existingConsumerFuture.get(Mockito.anyLong(), Mockito.any())).
                thenThrow(new NullPointerException());
        Mockito.when(existingConsumerFuture.isCompletedExceptionally()).thenReturn(true);
        Mockito.when(existingConsumerFuture.getNow(Mockito.any())).thenThrow(new NullPointerException());
        try (MockedStatic<ConcurrentLongHashMap> theMock = Mockito.mockStatic(ConcurrentLongHashMap.class)) {
            // Inject consumers to ServerCnx
            theMock.when(ConcurrentLongHashMap::newBuilder).thenReturn(mapBuilder);
            // reset channels( serverChannel, clientChannel )
            resetChannel();
            setChannelConnected();
            // auth check disable
            doReturn(false).when(brokerService).isAuthenticationEnabled();
            doReturn(false).when(brokerService).isAuthorizationEnabled();
            // do subscribe
            ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
                    successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
                    "test" /* consumer name */, 0 /* avoid reseting cursor */);
            channel.writeInbound(clientCommand);
            Object responseObj = getResponse();
            Predicate<Object> responseAssert = obj -> {
                if (responseObj instanceof CommandError) {
                    CommandError commandError = (CommandError) responseObj;
                    return ServerError.ServiceNotReady != commandError.getError();
                }
                return false;
            };
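            // assert the response is an error other than ServiceNotReady
            // (the predicate ignores its argument and inspects the captured responseObj)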
            assertTrue(responseAssert.test(responseAssert));
            // assert consumer-delete event occur
            assertEquals(1L,
                    deleteTimesMark.getAllValues().stream().filter(f -> f == existingConsumerFuture).count());
            // Server will not close the connection
            assertTrue(channel.isOpen());
            channel.finish();
        }
    }
}
