| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.service; |
| |
| import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; |
| import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper; |
| import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.anyString; |
| import static org.mockito.ArgumentMatchers.argThat; |
| import static org.mockito.ArgumentMatchers.eq; |
| import static org.mockito.Mockito.CALLS_REAL_METHODS; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.doReturn; |
| import static org.mockito.Mockito.matches; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertFalse; |
| import static org.testng.Assert.assertNotNull; |
| import static org.testng.Assert.assertNull; |
| import static org.testng.Assert.assertTrue; |
| import com.google.common.collect.Maps; |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.Unpooled; |
| import io.netty.channel.ChannelHandler; |
| import io.netty.channel.EventLoopGroup; |
| import io.netty.channel.embedded.EmbeddedChannel; |
| import io.netty.channel.nio.NioEventLoopGroup; |
| import io.netty.handler.codec.LengthFieldBasedFrameDecoder; |
| import java.io.IOException; |
| import java.lang.reflect.Field; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.Predicate; |
| import java.util.function.Supplier; |
| import org.apache.bookkeeper.common.util.OrderedExecutor; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; |
| import org.apache.bookkeeper.mledger.ManagedCursor; |
| import org.apache.bookkeeper.mledger.ManagedLedger; |
| import org.apache.bookkeeper.mledger.ManagedLedgerConfig; |
| import org.apache.bookkeeper.mledger.ManagedLedgerException; |
| import org.apache.bookkeeper.mledger.ManagedLedgerFactory; |
| import org.apache.bookkeeper.mledger.impl.PositionImpl; |
| import org.apache.pulsar.broker.PulsarService; |
| import org.apache.pulsar.broker.ServiceConfiguration; |
| import org.apache.pulsar.broker.auth.MockAlwaysExpiredAuthenticationProvider; |
| import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; |
| import org.apache.pulsar.broker.auth.MockAuthenticationProvider; |
| import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider; |
| import org.apache.pulsar.broker.authentication.AuthenticationProvider; |
| import org.apache.pulsar.broker.authentication.AuthenticationService; |
| import org.apache.pulsar.broker.authorization.AuthorizationService; |
| import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; |
| import org.apache.pulsar.broker.cache.ConfigurationCacheService; |
| import org.apache.pulsar.broker.intercept.BrokerInterceptor; |
| import org.apache.pulsar.broker.namespace.NamespaceService; |
| import org.apache.pulsar.broker.resources.NamespaceResources; |
| import org.apache.pulsar.broker.resources.PulsarResources; |
| import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; |
| import org.apache.pulsar.broker.service.ServerCnx.State; |
| import org.apache.pulsar.broker.service.persistent.PersistentTopic; |
| import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService; |
| import org.apache.pulsar.broker.service.utils.ClientChannelHelper; |
| import org.apache.pulsar.common.api.AuthData; |
| import org.apache.pulsar.common.api.proto.AuthMethod; |
| import org.apache.pulsar.common.api.proto.BaseCommand; |
| import org.apache.pulsar.common.api.proto.BaseCommand.Type; |
| import org.apache.pulsar.common.api.proto.CommandAck.AckType; |
| import org.apache.pulsar.common.api.proto.CommandAuthChallenge; |
| import org.apache.pulsar.common.api.proto.CommandAuthResponse; |
| import org.apache.pulsar.common.api.proto.CommandCloseProducer; |
| import org.apache.pulsar.common.api.proto.CommandConnected; |
| import org.apache.pulsar.common.api.proto.CommandError; |
| import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; |
| import org.apache.pulsar.common.api.proto.CommandProducerSuccess; |
| import org.apache.pulsar.common.api.proto.CommandSendError; |
| import org.apache.pulsar.common.api.proto.CommandSendReceipt; |
| import org.apache.pulsar.common.api.proto.CommandSubscribe; |
| import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; |
| import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; |
| import org.apache.pulsar.common.api.proto.CommandSuccess; |
| import org.apache.pulsar.common.api.proto.MessageMetadata; |
| import org.apache.pulsar.common.api.proto.ProtocolVersion; |
| import org.apache.pulsar.common.api.proto.ServerError; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.AuthAction; |
| import org.apache.pulsar.common.policies.data.Policies; |
| import org.apache.pulsar.common.policies.data.TopicOperation; |
| import org.apache.pulsar.common.protocol.ByteBufPair; |
| import org.apache.pulsar.common.protocol.Commands; |
| import org.apache.pulsar.common.protocol.Commands.ChecksumType; |
| import org.apache.pulsar.common.protocol.PulsarHandler; |
| import org.apache.pulsar.common.util.FutureUtil; |
| import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; |
| import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; |
| import org.apache.pulsar.metadata.impl.ZKMetadataStore; |
| import org.apache.pulsar.zookeeper.ZooKeeperDataCache; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.awaitility.Awaitility; |
| import org.mockito.ArgumentCaptor; |
| import org.mockito.MockedStatic; |
| import org.mockito.Mockito; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.Test; |
| |
| @SuppressWarnings("unchecked") |
| @Test(groups = "broker") |
| public class ServerCnxTest { |
| protected EmbeddedChannel channel; |
| private ServiceConfiguration svcConfig; |
| private ServerCnx serverCnx; |
| protected BrokerService brokerService; |
| private ManagedLedgerFactory mlFactoryMock; |
| private ClientChannelHelper clientChannelHelper; |
| private PulsarService pulsar; |
| private MetadataStoreExtended store; |
| private ConfigurationCacheService configCacheService; |
| private NamespaceResources namespaceResources; |
| protected NamespaceService namespaceService; |
| private final int currentProtocolVersion = ProtocolVersion.values()[ProtocolVersion.values().length - 1] |
| .getValue(); |
| |
| protected final String successTopicName = "persistent://prop/use/ns-abc/successTopic"; |
| private final String failTopicName = "persistent://prop/use/ns-abc/failTopic"; |
| private final String nonOwnedTopicName = "persistent://prop/use/ns-abc/success-not-owned-topic"; |
| private final String encryptionRequiredTopicName = "persistent://prop/use/ns-abc/successEncryptionRequiredTopic"; |
| private final String successSubName = "successSub"; |
| private final String nonExistentTopicName = "persistent://nonexistent-prop/nonexistent-cluster/nonexistent-namespace/successNonExistentTopic"; |
| private final String topicWithNonLocalCluster = "persistent://prop/usw/ns-abc/successTopic"; |
| |
| private final ManagedLedger ledgerMock = mock(ManagedLedger.class); |
| private final ManagedCursor cursorMock = mock(ManagedCursor.class); |
| private OrderedExecutor executor; |
| private EventLoopGroup eventLoopGroup; |
| |
| @BeforeMethod |
| public void setup() throws Exception { |
| eventLoopGroup = new NioEventLoopGroup(); |
| executor = OrderedExecutor.newBuilder().numThreads(1).build(); |
| svcConfig = spy(ServiceConfiguration.class); |
| svcConfig.setBrokerShutdownTimeoutMs(0L); |
| pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); |
| doReturn(new DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService(); |
| |
| svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS)); |
| svcConfig.setBacklogQuotaCheckEnabled(false); |
| doReturn(svcConfig).when(pulsar).getConfiguration(); |
| doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources(); |
| |
| doReturn("use").when(svcConfig).getClusterName(); |
| |
| mlFactoryMock = mock(ManagedLedgerFactory.class); |
| doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory(); |
| |
| ZooKeeper mockZk = createMockZooKeeper(); |
| doReturn(createMockBookKeeper(executor)) |
| .when(pulsar).getBookKeeperClient(); |
| |
| store = new ZKMetadataStore(mockZk); |
| |
| doReturn(store).when(pulsar).getLocalMetadataStore(); |
| doReturn(store).when(pulsar).getConfigurationMetadataStore(); |
| |
| brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup); |
| BrokerInterceptor interceptor = mock(BrokerInterceptor.class); |
| doReturn(interceptor).when(brokerService).getInterceptor(); |
| doReturn(brokerService).when(pulsar).getBrokerService(); |
| doReturn(executor).when(pulsar).getOrderedExecutor(); |
| |
| PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store); |
| namespaceResources = spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30); |
| doReturn(namespaceResources).when(pulsarResources).getNamespaceResources(); |
| doReturn(pulsarResources).when(pulsar).getPulsarResources(); |
| |
| namespaceService = mock(NamespaceService.class); |
| doReturn(CompletableFuture.completedFuture(null)).when(namespaceService).getBundleAsync(any()); |
| doReturn(namespaceService).when(pulsar).getNamespaceService(); |
| doReturn(true).when(namespaceService).isServiceUnitOwned(any()); |
| doReturn(true).when(namespaceService).isServiceUnitActive(any()); |
| doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any()); |
| doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any()); |
| |
| setupMLAsyncCallbackMocks(); |
| |
| clientChannelHelper = new ClientChannelHelper(); |
| } |
| |
| private int inSec(int time, TimeUnit unit) { |
| return (int) TimeUnit.SECONDS.convert(time, unit); |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| public void teardown() throws Exception { |
| serverCnx.close(); |
| channel.close(); |
| pulsar.close(); |
| brokerService.close(); |
| executor.shutdownNow(); |
| eventLoopGroup.shutdownGracefully().get(); |
| store.close(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testConnectCommand() throws Exception { |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| // test server response to CONNECT |
| ByteBuf clientCommand = Commands.newConnect("none", "", null); |
| channel.writeInbound(clientCommand); |
| |
| assertEquals(serverCnx.getState(), State.Connected); |
| assertTrue(getResponse() instanceof CommandConnected); |
| channel.finish(); |
| } |
| |
| private static ByteBuf newConnect(AuthMethod authMethod, String authData, int protocolVersion) { |
| BaseCommand cmd = new BaseCommand().setType(Type.CONNECT); |
| cmd.setConnect() |
| .setClientVersion("Pulsar Client") |
| .setAuthMethod(authMethod) |
| .setAuthData(authData.getBytes(StandardCharsets.UTF_8)) |
| .setProtocolVersion(protocolVersion); |
| return Commands.serializeWithSize(cmd); |
| } |
| |
| /** |
| * Ensure that old clients may still connect to new servers |
| * |
| * @throws Exception |
| */ |
| @Test(timeOut = 30000) |
| public void testConnectCommandWithEnum() throws Exception { |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| // test server response to CONNECT |
| ByteBuf clientCommand = newConnect(AuthMethod.AuthMethodNone, "", Commands.getCurrentProtocolVersion()); |
| channel.writeInbound(clientCommand); |
| |
| assertEquals(serverCnx.getState(), State.Connected); |
| assertTrue(getResponse() instanceof CommandConnected); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testConnectCommandWithProtocolVersion() throws Exception { |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| // test server response to CONNECT |
| ByteBuf clientCommand = Commands.newConnect("none", "", null); |
| channel.writeInbound(clientCommand); |
| |
| assertEquals(serverCnx.getState(), State.Connected); |
| CommandConnected response = (CommandConnected) getResponse(); |
| assertEquals(response.getProtocolVersion(), currentProtocolVersion); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testKeepAlive() throws Exception { |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| // test server response to CONNECT |
| ByteBuf clientCommand = Commands.newConnect("none", "", null); |
| channel.writeInbound(clientCommand); |
| |
| assertEquals(serverCnx.getState(), State.Connected); |
| CommandConnected response = (CommandConnected) getResponse(); |
| assertEquals(response.getProtocolVersion(), currentProtocolVersion); |
| |
| // Connection will be closed in 2 seconds, in the meantime give chance to run the cleanup logic |
| for (int i = 0; i < 3; i++) { |
| channel.runPendingTasks(); |
| Thread.sleep(1000); |
| } |
| |
| assertFalse(channel.isActive()); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testKeepAliveNotEnforcedWithOlderClients() throws Exception { |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| // test server response to CONNECT |
| ByteBuf clientCommand = Commands.newConnect("none", "", ProtocolVersion.v0.getValue(), null, null, null, null, null); |
| channel.writeInbound(clientCommand); |
| |
| assertEquals(serverCnx.getState(), State.Connected); |
| CommandConnected response = (CommandConnected) getResponse(); |
| // Server is responding with same version as client |
| assertEquals(response.getProtocolVersion(), ProtocolVersion.v0.getValue()); |
| |
| // Connection will *not* be closed in 2 seconds |
| for (int i = 0; i < 3; i++) { |
| channel.runPendingTasks(); |
| Thread.sleep(1000); |
| } |
| assertTrue(channel.isActive()); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testKeepAliveBeforeHandshake() throws Exception { |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| // test server doesn't received a CONNECT command and should close the connection after timeout |
| |
| // Connection will be closed in 2 seconds, in the meantime give chance to run the cleanup logic |
| for (int i = 0; i < 3; i++) { |
| channel.runPendingTasks(); |
| Thread.sleep(1000); |
| } |
| |
| assertFalse(channel.isActive()); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testConnectCommandWithAuthenticationPositive() throws Exception { |
| AuthenticationService authenticationService = mock(AuthenticationService.class); |
| AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); |
| String authMethodName = authenticationProvider.getAuthMethodName(); |
| |
| when(brokerService.getAuthenticationService()).thenReturn(authenticationService); |
| when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); |
| svcConfig.setAuthenticationEnabled(true); |
| |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| // test server response to CONNECT |
| ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", null); |
| channel.writeInbound(clientCommand); |
| |
| assertTrue(getResponse() instanceof CommandConnected); |
| assertEquals(serverCnx.getState(), State.Connected); |
| assertEquals(serverCnx.getPrincipal(), "pass.client"); |
| assertTrue(serverCnx.isActive()); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testConnectCommandWithoutOriginalAuthInfoWhenAuthenticateOriginalAuthData() throws Exception { |
| AuthenticationService authenticationService = mock(AuthenticationService.class); |
| AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); |
| String authMethodName = authenticationProvider.getAuthMethodName(); |
| |
| when(brokerService.getAuthenticationService()).thenReturn(authenticationService); |
| when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); |
| svcConfig.setAuthenticationEnabled(true); |
| svcConfig.setAuthenticateOriginalAuthData(true); |
| |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", ""); |
| channel.writeInbound(clientCommand); |
| |
| Object response1 = getResponse(); |
| assertTrue(response1 instanceof CommandConnected); |
| assertEquals(serverCnx.getState(), State.Connected); |
| assertEquals(serverCnx.getAuthRole(), "pass.client"); |
| assertEquals(serverCnx.getPrincipal(), "pass.client"); |
| assertNull(serverCnx.getOriginalPrincipal()); |
| assertTrue(serverCnx.isActive()); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testConnectCommandWithPassingOriginalAuthData() throws Exception { |
| AuthenticationService authenticationService = mock(AuthenticationService.class); |
| AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); |
| String authMethodName = authenticationProvider.getAuthMethodName(); |
| |
| when(brokerService.getAuthenticationService()).thenReturn(authenticationService); |
| when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); |
| svcConfig.setAuthenticationEnabled(true); |
| svcConfig.setAuthenticateOriginalAuthData(true); |
| svcConfig.setProxyRoles(Collections.singleton("pass.proxy")); |
| |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null, |
| null, "client", "pass.client", authMethodName); |
| channel.writeInbound(clientCommand); |
| |
| Object response1 = getResponse(); |
| assertTrue(response1 instanceof CommandConnected); |
| assertEquals(serverCnx.getState(), State.Connected); |
| // Note that this value will change to the client's data if the broker sends an AuthChallenge to the |
| // proxy/client. Details described here https://github.com/apache/pulsar/issues/19332. |
| assertEquals(serverCnx.getAuthRole(), "pass.proxy"); |
| // These are all taken without verifying the auth data |
| assertEquals(serverCnx.getPrincipal(), "pass.client"); |
| assertEquals(serverCnx.getOriginalPrincipal(), "pass.client"); |
| assertTrue(serverCnx.isActive()); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testConnectCommandWithPassingOriginalPrincipal() throws Exception { |
| AuthenticationService authenticationService = mock(AuthenticationService.class); |
| AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); |
| String authMethodName = authenticationProvider.getAuthMethodName(); |
| |
| when(brokerService.getAuthenticationService()).thenReturn(authenticationService); |
| when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); |
| svcConfig.setAuthenticationEnabled(true); |
| svcConfig.setAuthenticateOriginalAuthData(false); |
| svcConfig.setProxyRoles(Collections.singleton("pass.proxy")); |
| |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null, |
| null, "client", "pass.client", authMethodName); |
| channel.writeInbound(clientCommand); |
| |
| Object response1 = getResponse(); |
| assertTrue(response1 instanceof CommandConnected); |
| assertEquals(serverCnx.getState(), State.Connected); |
| assertEquals(serverCnx.getAuthRole(), "pass.proxy"); |
| // These are all taken without verifying the auth data |
| assertEquals(serverCnx.getPrincipal(), "client"); |
| assertEquals(serverCnx.getOriginalPrincipal(), "client"); |
| assertTrue(serverCnx.isActive()); |
| channel.finish(); |
| } |
| |
| public void testAuthChallengePrincipalChangeFails() throws Exception { |
| AuthenticationService authenticationService = mock(AuthenticationService.class); |
| AuthenticationProvider authenticationProvider = new MockAlwaysExpiredAuthenticationProvider(); |
| String authMethodName = authenticationProvider.getAuthMethodName(); |
| |
| when(brokerService.getAuthenticationService()).thenReturn(authenticationService); |
| when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); |
| svcConfig.setAuthenticationEnabled(true); |
| svcConfig.setAuthenticationRefreshCheckSeconds(1); |
| |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| // Don't want the keep alive task affecting which messages are handled |
| serverCnx.cancelKeepAliveTask(); |
| |
| ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", ""); |
| channel.writeInbound(clientCommand); |
| |
| Object responseConnected = getResponse(); |
| assertTrue(responseConnected instanceof CommandConnected); |
| assertEquals(serverCnx.getState(), State.Connected); |
| assertEquals(serverCnx.getPrincipal(), "pass.client"); |
| assertTrue(serverCnx.isActive()); |
| |
| // Trigger the ServerCnx to check if authentication is expired (it is because of our special implementation) |
| // and then force channel to run the task |
| while (channel.outboundMessages().isEmpty()) { |
| Thread.sleep(100); |
| channel.runPendingTasks(); |
| } |
| Object responseAuthChallenge1 = getResponse(); |
| assertTrue(responseAuthChallenge1 instanceof CommandAuthChallenge); |
| |
| // Respond with valid info that will both pass and be the same |
| ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName, AuthData.of("pass.client".getBytes()), 1, ""); |
| channel.writeInbound(authResponse1); |
| |
| // Trigger the ServerCnx to check if authentication is expired again |
| while (channel.outboundMessages().isEmpty()) { |
| Thread.sleep(100); |
| channel.runPendingTasks(); |
| } |
| Object responseAuthChallenge2 = getResponse(); |
| assertTrue(responseAuthChallenge2 instanceof CommandAuthChallenge); |
| |
| // Respond with invalid info that will pass but have a different authRole |
| ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName, AuthData.of("pass.client2".getBytes()), 1, ""); |
| channel.writeInbound(authResponse2); |
| |
| // Expect the connection to disconnect |
| Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive()); |
| |
| channel.finish(); |
| } |
| |
| public void testAuthChallengeOriginalPrincipalChangeFails() throws Exception { |
| AuthenticationService authenticationService = mock(AuthenticationService.class); |
| AuthenticationProvider authenticationProvider = new MockAlwaysExpiredAuthenticationProvider(); |
| String authMethodName = authenticationProvider.getAuthMethodName(); |
| |
| when(brokerService.getAuthenticationService()).thenReturn(authenticationService); |
| when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); |
| svcConfig.setAuthenticationEnabled(true); |
| svcConfig.setAuthenticateOriginalAuthData(true); |
| svcConfig.setProxyRoles(Collections.singleton("pass.proxy")); |
| svcConfig.setAuthenticationRefreshCheckSeconds(1); |
| |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| // Don't want the keep alive task affecting which messages are handled |
| serverCnx.cancelKeepAliveTask(); |
| |
| ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null, |
| null, "pass.client", "pass.client", authMethodName); |
| channel.writeInbound(clientCommand); |
| |
| Object responseConnected = getResponse(); |
| assertTrue(responseConnected instanceof CommandConnected); |
| assertEquals(serverCnx.getState(), State.Connected); |
| assertEquals(serverCnx.getAuthRole(), "pass.proxy"); |
| // These are all taken without verifying the auth data |
| assertEquals(serverCnx.getPrincipal(), "pass.client"); |
| assertEquals(serverCnx.getOriginalPrincipal(), "pass.client"); |
| assertTrue(serverCnx.isActive()); |
| |
| // Trigger the ServerCnx to check if authentication is expired (it is because of our special implementation) |
| // and then force channel to run the task |
| while (channel.outboundMessages().isEmpty()) { |
| Thread.sleep(100); |
| channel.runPendingTasks(); |
| } |
| Object responseAuthChallenge1 = getResponse(); |
| assertTrue(responseAuthChallenge1 instanceof CommandAuthChallenge); |
| |
| // Respond with valid info that will both pass and be the same |
| ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName, AuthData.of("pass.client".getBytes()), 1, ""); |
| channel.writeInbound(authResponse1); |
| |
| // Trigger the ServerCnx to check if authentication is expired again |
| while (channel.outboundMessages().isEmpty()) { |
| Thread.sleep(100); |
| channel.runPendingTasks(); |
| } |
| Object responseAuthChallenge2 = getResponse(); |
| assertTrue(responseAuthChallenge2 instanceof CommandAuthChallenge); |
| |
| // Respond with invalid info that will pass but have a different authRole |
| ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName, AuthData.of("pass.client2".getBytes()), 1, ""); |
| channel.writeInbound(authResponse2); |
| |
| // Expect the connection to disconnect |
| Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive()); |
| |
| channel.finish(); |
| } |
| |
| // This test is different in branch-2.11 and older because the behavior changes after branch-2.11. |
| // See https://github.com/apache/pulsar/pull/19830 for additional information. |
| @Test(timeOut = 30000) |
| public void testConnectCommandWithDifferentRoleCombinations() throws Exception { |
| AuthenticationService authenticationService = mock(AuthenticationService.class); |
| AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); |
| String authMethodName = authenticationProvider.getAuthMethodName(); |
| |
| when(brokerService.getAuthenticationService()).thenReturn(authenticationService); |
| when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); |
| svcConfig.setAuthenticationEnabled(true); |
| svcConfig.setAuthenticateOriginalAuthData(false); |
| svcConfig.setAuthorizationEnabled(true); |
| svcConfig.setProxyRoles(Collections.singleton("pass.proxy")); |
| |
| // Invalid combinations where authData is proxy role |
| verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", "pass.proxy", false); |
| verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", "", false); |
| verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.proxy", null, false); |
| // Only considered valid because there is no requirement for that only a proxy role can pass |
| // an original principal |
| verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.client", "pass.proxy", true); |
| verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.client1", "pass.client", true); |
| verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.client", "pass.client", true); |
| verifyAuthRoleAndOriginalPrincipalBehavior(authMethodName, "pass.client", "pass.client1", true); |
| } |
| |
| private void verifyAuthRoleAndOriginalPrincipalBehavior(String authMethodName, String authData, |
| String originalPrincipal, |
| boolean shouldPass) throws Exception { |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| ByteBuf clientCommand = Commands.newConnect(authMethodName, authData, 1,null, |
| null, originalPrincipal, null, null); |
| channel.writeInbound(clientCommand); |
| |
| Object response = getResponse(); |
| if (shouldPass) { |
| assertTrue(response instanceof CommandConnected); |
| assertEquals(serverCnx.getState(), State.Connected); |
| assertTrue(serverCnx.isActive()); |
| } else { |
| assertTrue(response instanceof CommandError); |
| assertEquals(((CommandError) response).getError(), ServerError.AuthorizationError); |
| assertEquals(serverCnx.getState(), State.Failed); |
| } |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testConnectCommandWithAuthenticationNegative() throws Exception { |
| AuthenticationService authenticationService = mock(AuthenticationService.class); |
| doReturn(authenticationService).when(brokerService).getAuthenticationService(); |
| doReturn(Optional.empty()).when(authenticationService).getAnonymousUserRole(); |
| doReturn(true).when(brokerService).isAuthenticationEnabled(); |
| |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| // test server response to CONNECT |
| ByteBuf clientCommand = Commands.newConnect("none", "", null); |
| channel.writeInbound(clientCommand); |
| |
| assertEquals(serverCnx.getState(), State.Failed); |
| assertTrue(getResponse() instanceof CommandError); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testConnectCommandWithFailingOriginalAuthData() throws Exception { |
| AuthenticationService authenticationService = mock(AuthenticationService.class); |
| AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); |
| String authMethodName = authenticationProvider.getAuthMethodName(); |
| |
| when(brokerService.getAuthenticationService()).thenReturn(authenticationService); |
| when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); |
| svcConfig.setAuthenticationEnabled(true); |
| svcConfig.setAuthenticateOriginalAuthData(true); |
| svcConfig.setProxyRoles(Collections.singleton("pass.proxy")); |
| |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1,null, |
| null, "client", "fail", authMethodName); |
| channel.writeInbound(clientCommand); |
| |
| Object response1 = getResponse(); |
| assertTrue(response1 instanceof CommandError); |
| assertEquals(((CommandError) response1).getMessage(), "Unable to authenticate"); |
| assertEquals(serverCnx.getState(), State.Failed); |
| assertFalse(serverCnx.isActive()); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testAuthResponseWithFailingAuthData() throws Exception { |
| AuthenticationService authenticationService = mock(AuthenticationService.class); |
| AuthenticationProvider authenticationProvider = new MockMultiStageAuthenticationProvider(); |
| String authMethodName = authenticationProvider.getAuthMethodName(); |
| |
| when(brokerService.getAuthenticationService()).thenReturn(authenticationService); |
| when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); |
| svcConfig.setAuthenticationEnabled(true); |
| |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| // Trigger connect command to result in AuthChallenge |
| ByteBuf clientCommand = Commands.newConnect(authMethodName, "challenge.client", "1"); |
| channel.writeInbound(clientCommand); |
| |
| Object challenge1 = getResponse(); |
| assertTrue(challenge1 instanceof CommandAuthChallenge); |
| assertEquals(serverCnx.getState(), State.Connecting); |
| |
| // Trigger another AuthChallenge to verify that code path continues to challenge |
| ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName, AuthData.of("challenge.client".getBytes()), 1, "1"); |
| channel.writeInbound(authResponse1); |
| |
| Object challenge2 = getResponse(); |
| assertTrue(challenge2 instanceof CommandAuthChallenge); |
| assertEquals(serverCnx.getState(), State.Connecting); |
| |
| // Trigger failure |
| ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName, AuthData.of("fail.client".getBytes()), 1, "1"); |
| channel.writeInbound(authResponse2); |
| |
| Object response3 = getResponse(); |
| assertTrue(response3 instanceof CommandError); |
| assertEquals(((CommandError) response3).getMessage(), "Do not pass"); |
| assertEquals(serverCnx.getState(), State.Failed); |
| assertFalse(serverCnx.isActive()); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testOriginalAuthDataTriggersAuthChallengeFailure() throws Exception { |
| // Test verifies the current behavior in the absence of a solution for |
| // https://github.com/apache/pulsar/issues/19291. When that issue is completed, we can update this test |
| // to correctly verify that behavior. |
| AuthenticationService authenticationService = mock(AuthenticationService.class); |
| AuthenticationProvider authenticationProvider = new MockMultiStageAuthenticationProvider(); |
| String authMethodName = authenticationProvider.getAuthMethodName(); |
| |
| when(brokerService.getAuthenticationService()).thenReturn(authenticationService); |
| when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); |
| svcConfig.setAuthenticationEnabled(true); |
| svcConfig.setAuthenticateOriginalAuthData(true); |
| svcConfig.setProxyRoles(Collections.singleton("pass.proxy")); |
| |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| // Trigger connect command to result in AuthChallenge |
| ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, "1", |
| "localhost", "client", "challenge.client", authMethodName); |
| channel.writeInbound(clientCommand); |
| |
| Object response = getResponse(); |
| assertTrue(response instanceof CommandError); |
| |
| assertEquals(((CommandError) response).getMessage(), "Unable to authenticate"); |
| assertEquals(serverCnx.getState(), State.Failed); |
| assertFalse(serverCnx.isActive()); |
| channel.finish(); |
| } |
| |
| // This test used to be in the ServerCnxAuthorizationTest class, but it was migrated here because the mocking |
| // in that class was too extensive. There is some overlap with this test and other tests in this class. The primary |
| // role of this test is verifying that the correct role and AuthenticationDataSource are passed to the |
| // AuthorizationService. |
| @Test |
| public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() throws Exception { |
| AuthenticationService authenticationService = mock(AuthenticationService.class); |
| AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); |
| String authMethodName = authenticationProvider.getAuthMethodName(); |
| when(brokerService.getAuthenticationService()).thenReturn(authenticationService); |
| when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); |
| svcConfig.setAuthenticationEnabled(true); |
| svcConfig.setAuthenticateOriginalAuthData(true); |
| svcConfig.setProxyRoles(Collections.singleton("pass.pass")); |
| |
| svcConfig.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider"); |
| AuthorizationService authorizationService = |
| spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, |
| pulsar.getPulsarResources()); |
| when(brokerService.getAuthorizationService()).thenReturn(authorizationService); |
| svcConfig.setAuthorizationEnabled(true); |
| |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| // Connect |
| // This client role integrates with the MockAuthenticationProvider and MockAuthorizationProvider |
| // to pass authentication and fail authorization |
| String proxyRole = "pass.pass"; |
| String clientRole = "pass.fail"; |
| // Submit a failing originalPrincipal to show that it is not used at all. |
| ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, "test", "localhost", |
| "fail.fail", clientRole, authMethodName); |
| channel.writeInbound(connect); |
| Object connectResponse = getResponse(); |
| assertTrue(connectResponse instanceof CommandConnected); |
| assertEquals(serverCnx.getOriginalAuthData().getCommandData(), clientRole); |
| assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), clientRole); |
| assertEquals(serverCnx.getOriginalPrincipal(), clientRole); |
| assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole); |
| assertEquals(serverCnx.getAuthRole(), proxyRole); |
| assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole); |
| |
| // Lookup |
| TopicName topicName = TopicName.get("persistent://public/default/test-topic"); |
| ByteBuf lookup = Commands.newLookup(topicName.toString(), false, 1); |
| channel.writeInbound(lookup); |
| Object lookupResponse = getResponse(); |
| assertTrue(lookupResponse instanceof CommandLookupTopicResponse); |
| assertEquals(((CommandLookupTopicResponse) lookupResponse).getError(), ServerError.AuthorizationError); |
| assertEquals(((CommandLookupTopicResponse) lookupResponse).getRequestId(), 1); |
| verify(authorizationService, times(1)) |
| .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, proxyRole, serverCnx.getAuthData()); |
| verify(authorizationService, times(1)) |
| .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, clientRole, serverCnx.getOriginalAuthData()); |
| |
| // producer |
| ByteBuf producer = Commands.newProducer(topicName.toString(), 1, 2, "test-producer", new HashMap<>(), false); |
| channel.writeInbound(producer); |
| Object producerResponse = getResponse(); |
| assertTrue(producerResponse instanceof CommandError); |
| assertEquals(((CommandError) producerResponse).getError(), ServerError.AuthorizationError); |
| assertEquals(((CommandError) producerResponse).getRequestId(), 2); |
| verify(authorizationService, times(1)) |
| .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, clientRole, serverCnx.getOriginalAuthData()); |
| verify(authorizationService, times(1)) |
| .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, proxyRole, serverCnx.getAuthData()); |
| |
| // consumer |
| String subscriptionName = "test-subscribe"; |
| ByteBuf subscribe = Commands.newSubscribe(topicName.toString(), subscriptionName, 1, 3, |
| CommandSubscribe.SubType.Shared, 0, "consumer", 0); |
| channel.writeInbound(subscribe); |
| Object subscribeResponse = getResponse(); |
| assertTrue(subscribeResponse instanceof CommandError); |
| assertEquals(((CommandError) subscribeResponse).getError(), ServerError.AuthorizationError); |
| assertEquals(((CommandError) subscribeResponse).getRequestId(), 3); |
| verify(authorizationService, times(1)).allowTopicOperationAsync( |
| eq(topicName), eq(TopicOperation.CONSUME), |
| eq(clientRole), argThat(arg -> { |
| assertTrue(arg instanceof AuthenticationDataSubscription); |
| assertEquals(arg.getCommandData(), clientRole); |
| assertEquals(arg.getSubscription(), subscriptionName); |
| return true; |
| })); |
| verify(authorizationService, times(1)).allowTopicOperationAsync( |
| eq(topicName), eq(TopicOperation.CONSUME), |
| eq(proxyRole), argThat(arg -> { |
| assertTrue(arg instanceof AuthenticationDataSubscription); |
| assertEquals(arg.getCommandData(), proxyRole); |
| assertEquals(arg.getSubscription(), subscriptionName); |
| return true; |
| })); |
| } |
| |
| // This test used to be in the ServerCnxAuthorizationTest class, but it was migrated here because the mocking |
| // in that class was too extensive. There is some overlap with this test and other tests in this class. The primary |
| // role of this test is verifying that the correct role and AuthenticationDataSource are passed to the |
| // AuthorizationService. |
| public void testVerifyOriginalPrincipalWithoutAuthDataForwardedFromProxy() throws Exception { |
| AuthenticationService authenticationService = mock(AuthenticationService.class); |
| AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); |
| String authMethodName = authenticationProvider.getAuthMethodName(); |
| when(brokerService.getAuthenticationService()).thenReturn(authenticationService); |
| when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); |
| svcConfig.setAuthenticationEnabled(true); |
| svcConfig.setAuthenticateOriginalAuthData(false); |
| svcConfig.setProxyRoles(Collections.singleton("pass.pass")); |
| |
| svcConfig.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider"); |
| AuthorizationService authorizationService = |
| spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, |
| pulsar.getPulsarResources()); |
| when(brokerService.getAuthorizationService()).thenReturn(authorizationService); |
| svcConfig.setAuthorizationEnabled(true); |
| |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| // Connect |
| // This client role integrates with the MockAuthenticationProvider and MockAuthorizationProvider |
| // to pass authentication and fail authorization |
| String proxyRole = "pass.pass"; |
| String clientRole = "pass.fail"; |
| ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, "test", "localhost", |
| clientRole, null, null); |
| channel.writeInbound(connect); |
| Object connectResponse = getResponse(); |
| assertTrue(connectResponse instanceof CommandConnected); |
| assertNull(serverCnx.getOriginalAuthData()); |
| assertNull(serverCnx.getOriginalAuthState()); |
| assertEquals(serverCnx.getOriginalPrincipal(), clientRole); |
| assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole); |
| assertEquals(serverCnx.getAuthRole(), proxyRole); |
| assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole); |
| |
| // Lookup |
| TopicName topicName = TopicName.get("persistent://public/default/test-topic"); |
| ByteBuf lookup = Commands.newLookup(topicName.toString(), false, 1); |
| channel.writeInbound(lookup); |
| Object lookupResponse = getResponse(); |
| assertTrue(lookupResponse instanceof CommandLookupTopicResponse); |
| assertEquals(((CommandLookupTopicResponse) lookupResponse).getError(), ServerError.AuthorizationError); |
| assertEquals(((CommandLookupTopicResponse) lookupResponse).getRequestId(), 1); |
| verify(authorizationService, times(1)) |
| .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, proxyRole, serverCnx.getAuthData()); |
| // This test is an example of https://github.com/apache/pulsar/issues/19332. Essentially, we're passing |
| // the proxy's auth data because it is all we have. This test should be updated when we resolve that issue. |
| verify(authorizationService, times(1)) |
| .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, clientRole, serverCnx.getAuthData()); |
| |
| // producer |
| ByteBuf producer = Commands.newProducer(topicName.toString(), 1, 2, "test-producer", new HashMap<>(), false); |
| channel.writeInbound(producer); |
| Object producerResponse = getResponse(); |
| assertTrue(producerResponse instanceof CommandError); |
| assertEquals(((CommandError) producerResponse).getError(), ServerError.AuthorizationError); |
| assertEquals(((CommandError) producerResponse).getRequestId(), 2); |
| // See https://github.com/apache/pulsar/issues/19332 for justification of this assertion. |
| verify(authorizationService, times(1)) |
| .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, clientRole, serverCnx.getAuthData()); |
| verify(authorizationService, times(1)) |
| .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, proxyRole, serverCnx.getAuthData()); |
| |
| // consumer |
| String subscriptionName = "test-subscribe"; |
| ByteBuf subscribe = Commands.newSubscribe(topicName.toString(), subscriptionName, 1, 3, |
| CommandSubscribe.SubType.Shared, 0, "consumer", 0); |
| channel.writeInbound(subscribe); |
| Object subscribeResponse = getResponse(); |
| assertTrue(subscribeResponse instanceof CommandError); |
| assertEquals(((CommandError) subscribeResponse).getError(), ServerError.AuthorizationError); |
| assertEquals(((CommandError) subscribeResponse).getRequestId(), 3); |
| verify(authorizationService, times(1)).allowTopicOperationAsync( |
| eq(topicName), eq(TopicOperation.CONSUME), |
| eq(clientRole), argThat(arg -> { |
| assertTrue(arg instanceof AuthenticationDataSubscription); |
| // We assert that the role is clientRole and commandData is proxyRole due to |
| // https://github.com/apache/pulsar/issues/19332. |
| assertEquals(arg.getCommandData(), proxyRole); |
| assertEquals(arg.getSubscription(), subscriptionName); |
| return true; |
| })); |
| verify(authorizationService, times(1)).allowTopicOperationAsync( |
| eq(topicName), eq(TopicOperation.CONSUME), |
| eq(proxyRole), argThat(arg -> { |
| assertTrue(arg instanceof AuthenticationDataSubscription); |
| assertEquals(arg.getCommandData(), proxyRole); |
| assertEquals(arg.getSubscription(), subscriptionName); |
| return true; |
| })); |
| } |
| |
| // This test used to be in the ServerCnxAuthorizationTest class, but it was migrated here because the mocking |
| // in that class was too extensive. There is some overlap with this test and other tests in this class. The primary |
| // role of this test is verifying that the correct role and AuthenticationDataSource are passed to the |
| // AuthorizationService. |
| @Test |
| public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() throws Exception { |
| AuthenticationService authenticationService = mock(AuthenticationService.class); |
| AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); |
| String authMethodName = authenticationProvider.getAuthMethodName(); |
| when(brokerService.getAuthenticationService()).thenReturn(authenticationService); |
| when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); |
| svcConfig.setAuthenticationEnabled(true); |
| |
| svcConfig.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider"); |
| AuthorizationService authorizationService = |
| spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, |
| pulsar.getPulsarResources()); |
| when(brokerService.getAuthorizationService()).thenReturn(authorizationService); |
| svcConfig.setAuthorizationEnabled(true); |
| |
| resetChannel(); |
| assertTrue(channel.isActive()); |
| assertEquals(serverCnx.getState(), State.Start); |
| |
| // connect |
| // This client role integrates with the MockAuthenticationProvider and MockAuthorizationProvider |
| // to pass authentication and fail authorization |
| String clientRole = "pass.fail"; |
| ByteBuf connect = Commands.newConnect(authMethodName, clientRole, "test"); |
| channel.writeInbound(connect); |
| |
| Object connectResponse = getResponse(); |
| assertTrue(connectResponse instanceof CommandConnected); |
| assertNull(serverCnx.getOriginalAuthData()); |
| assertNull(serverCnx.getOriginalAuthState()); |
| assertNull(serverCnx.getOriginalPrincipal()); |
| assertEquals(serverCnx.getAuthData().getCommandData(), clientRole); |
| assertEquals(serverCnx.getAuthRole(), clientRole); |
| assertEquals(serverCnx.getAuthState().getAuthRole(), clientRole); |
| |
| // lookup |
| TopicName topicName = TopicName.get("persistent://public/default/test-topic"); |
| ByteBuf lookup = Commands.newLookup(topicName.toString(), false, 1); |
| channel.writeInbound(lookup); |
| Object lookupResponse = getResponse(); |
| assertTrue(lookupResponse instanceof CommandLookupTopicResponse); |
| assertEquals(((CommandLookupTopicResponse) lookupResponse).getError(), ServerError.AuthorizationError); |
| assertEquals(((CommandLookupTopicResponse) lookupResponse).getRequestId(), 1); |
| verify(authorizationService, times(1)) |
| .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, clientRole, serverCnx.getAuthData()); |
| |
| // producer |
| ByteBuf producer = Commands.newProducer(topicName.toString(), 1, 2, "test-producer", new HashMap<>(), false); |
| channel.writeInbound(producer); |
| Object producerResponse = getResponse(); |
| assertTrue(producerResponse instanceof CommandError); |
| assertEquals(((CommandError) producerResponse).getError(), ServerError.AuthorizationError); |
| assertEquals(((CommandError) producerResponse).getRequestId(), 2); |
| verify(authorizationService, times(1)) |
| .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, clientRole, serverCnx.getAuthData()); |
| |
| // consumer |
| String subscriptionName = "test-subscribe"; |
| ByteBuf subscribe = Commands.newSubscribe(topicName.toString(), subscriptionName, 1, 3, |
| CommandSubscribe.SubType.Shared, 0, "consumer", 0); |
| channel.writeInbound(subscribe); |
| Object subscribeResponse = getResponse(); |
| assertTrue(subscribeResponse instanceof CommandError); |
| assertEquals(((CommandError) subscribeResponse).getError(), ServerError.AuthorizationError); |
| assertEquals(((CommandError) subscribeResponse).getRequestId(), 3); |
| verify(authorizationService, times(1)).allowTopicOperationAsync( |
| eq(topicName), eq(TopicOperation.CONSUME), |
| eq(clientRole), argThat(arg -> { |
| assertTrue(arg instanceof AuthenticationDataSubscription); |
| assertEquals(arg.getCommandData(), clientRole); |
| assertEquals(arg.getSubscription(), subscriptionName); |
| return true; |
| })); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testProducerCommand() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| // test PRODUCER success case |
| ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, |
| "prod-name", Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| assertTrue(getResponse() instanceof CommandProducerSuccess); |
| |
| PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName).get(); |
| |
| assertNotNull(topicRef); |
| assertEquals(topicRef.getProducers().size(), 1); |
| |
| // test PRODUCER error case |
| clientCommand = Commands.newProducer(failTopicName, 2, 2, |
| "prod-name-2", Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| |
| assertTrue(getResponse() instanceof CommandError); |
| assertFalse(pulsar.getBrokerService().getTopicReference(failTopicName).isPresent()); |
| |
| channel.finish(); |
| assertEquals(topicRef.getProducers().size(), 0); |
| } |
| |
| @Test(timeOut = 5000) |
| public void testDuplicateConcurrentProducerCommand() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| CompletableFuture<Topic> delayFuture = new CompletableFuture<>(); |
| doReturn(delayFuture).when(brokerService).getOrCreateTopic(any(String.class)); |
| // Create producer first time |
| ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, |
| "prod-name", Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| |
| // Create producer second time |
| clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, |
| "prod-name", Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| |
| Object response = getResponse(); |
| assertTrue(response instanceof CommandError); |
| CommandError error = (CommandError) response; |
| assertEquals(error.getError(), ServerError.ServiceNotReady); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testProducerOnNotOwnedTopic() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| // Force the case where the broker doesn't own any topic |
| doReturn(CompletableFuture.completedFuture(false)).when(namespaceService) |
| .isServiceUnitActiveAsync(any(TopicName.class)); |
| |
| // test PRODUCER failure case |
| ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /* producer id */, 1 /* request id */, |
| "prod-name", Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| |
| Object response = getResponse(); |
| assertEquals(response.getClass(), CommandError.class); |
| |
| CommandError errorResponse = (CommandError) response; |
| assertEquals(errorResponse.getError(), ServerError.ServiceNotReady); |
| |
| assertFalse(pulsar.getBrokerService().getTopicReference(nonOwnedTopicName).isPresent()); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testProducerCommandWithAuthorizationPositive() throws Exception { |
| AuthorizationService authorizationService = mock(AuthorizationService.class); |
| doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(), |
| Mockito.any(), Mockito.any(), Mockito.any()); |
| doReturn(authorizationService).when(brokerService).getAuthorizationService(); |
| doReturn(true).when(brokerService).isAuthenticationEnabled(); |
| resetChannel(); |
| setChannelConnected(); |
| |
| // test PRODUCER success case |
| ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, |
| "prod-name", Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| assertEquals(getResponse().getClass(), CommandProducerSuccess.class); |
| |
| PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName).get(); |
| |
| assertNotNull(topicRef); |
| assertEquals(topicRef.getProducers().size(), 1); |
| |
| channel.finish(); |
| assertEquals(topicRef.getProducers().size(), 0); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testNonExistentTopic() throws Exception { |
| AuthorizationService authorizationService = |
| spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources()); |
| doReturn(authorizationService).when(brokerService).getAuthorizationService(); |
| doReturn(true).when(brokerService).isAuthorizationEnabled(); |
| svcConfig.setAuthorizationEnabled(true); |
| Field providerField = AuthorizationService.class.getDeclaredField("provider"); |
| providerField.setAccessible(true); |
| PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, |
| pulsar.getPulsarResources())); |
| providerField.set(authorizationService, authorizationProvider); |
| doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any()); |
| |
| // Test producer creation |
| resetChannel(); |
| setChannelConnected(); |
| ByteBuf newProducerCmd = Commands.newProducer(nonExistentTopicName, 1 /* producer id */, 1 /* request id */, |
| "prod-name", Collections.emptyMap(), false); |
| channel.writeInbound(newProducerCmd); |
| assertTrue(getResponse() instanceof CommandError); |
| channel.finish(); |
| |
| // Test consumer creation |
| resetChannel(); |
| setChannelConnected(); |
| ByteBuf newSubscribeCmd = Commands.newSubscribe(nonExistentTopicName, // |
| successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, "test" /* consumer name */, 0); |
| channel.writeInbound(newSubscribeCmd); |
| assertTrue(getResponse() instanceof CommandError); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testClusterAccess() throws Exception { |
| svcConfig.setAuthorizationEnabled(true); |
| AuthorizationService authorizationService = |
| spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources()); |
| Field providerField = AuthorizationService.class.getDeclaredField("provider"); |
| providerField.setAccessible(true); |
| PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, |
| pulsar.getPulsarResources())); |
| providerField.set(authorizationService, authorizationProvider); |
| doReturn(authorizationService).when(brokerService).getAuthorizationService(); |
| doReturn(true).when(brokerService).isAuthorizationEnabled(); |
| doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any()); |
| doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).validateTenantAdminAccess(Mockito.anyString(), Mockito.any(), Mockito.any()); |
| doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).checkPermission(any(TopicName.class), Mockito.anyString(), |
| any(AuthAction.class)); |
| |
| resetChannel(); |
| setChannelConnected(); |
| ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, |
| "prod-name", Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| assertTrue(getResponse() instanceof CommandProducerSuccess); |
| |
| resetChannel(); |
| setChannelConnected(); |
| clientCommand = Commands.newProducer(topicWithNonLocalCluster, 1 /* producer id */, 1 /* request id */, |
| "prod-name", Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| assertTrue(getResponse() instanceof CommandError); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testNonExistentTopicSuperUserAccess() throws Exception { |
| AuthorizationService authorizationService = |
| spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources()); |
| doReturn(authorizationService).when(brokerService).getAuthorizationService(); |
| doReturn(true).when(brokerService).isAuthorizationEnabled(); |
| Field providerField = AuthorizationService.class.getDeclaredField("provider"); |
| providerField.setAccessible(true); |
| PulsarAuthorizationProvider authorizationProvider = spy(new PulsarAuthorizationProvider(svcConfig, pulsar.getPulsarResources())); |
| providerField.set(authorizationService, authorizationProvider); |
| doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any()); |
| |
| // Test producer creation |
| resetChannel(); |
| setChannelConnected(); |
| ByteBuf newProducerCmd = Commands.newProducer(nonExistentTopicName, 1 /* producer id */, 1 /* request id */, |
| "prod-name", Collections.emptyMap(), false); |
| channel.writeInbound(newProducerCmd); |
| assertTrue(getResponse() instanceof CommandProducerSuccess); |
| |
| PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(nonExistentTopicName).get(); |
| assertNotNull(topicRef); |
| assertEquals(topicRef.getProducers().size(), 1); |
| channel.finish(); |
| |
| // Test consumer creation |
| resetChannel(); |
| setChannelConnected(); |
| ByteBuf newSubscribeCmd = Commands.newSubscribe(nonExistentTopicName, // |
| successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, |
| "test" /* consumer name */, 0 /* avoid reseting cursor */); |
| channel.writeInbound(newSubscribeCmd); |
| topicRef = (PersistentTopic) brokerService.getTopicReference(nonExistentTopicName).get(); |
| assertNotNull(topicRef); |
| assertTrue(topicRef.getSubscriptions().containsKey(successSubName)); |
| assertTrue(topicRef.getSubscription(successSubName).getDispatcher().isConsumerConnected()); |
| assertTrue(getResponse() instanceof CommandSuccess); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testProducerCommandWithAuthorizationNegative() throws Exception { |
| AuthorizationService authorizationService = mock(AuthorizationService.class); |
| doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(), |
| Mockito.any(), Mockito.any(), Mockito.any()); |
| doReturn(authorizationService).when(brokerService).getAuthorizationService(); |
| doReturn(true).when(brokerService).isAuthenticationEnabled(); |
| doReturn(true).when(brokerService).isAuthorizationEnabled(); |
| doReturn("prod1").when(brokerService).generateUniqueProducerName(); |
| resetChannel(); |
| setChannelConnected(); |
| |
| ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, |
| null, Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| assertTrue(getResponse() instanceof CommandError); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testSendCommand() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, |
| "prod-name", Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| assertTrue(getResponse() instanceof CommandProducerSuccess); |
| |
| // test SEND success |
| sendMessage(); |
| |
| assertTrue(getResponse() instanceof CommandSendReceipt); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testSendCommandBeforeCreatingProducer() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| // test SEND before producer is created |
| sendMessage(); |
| |
| // Then expect channel to close |
| Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive()); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testSendCommandAfterBrokerClosedProducer() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| setConnectionVersion(ProtocolVersion.v5.getValue()); |
| serverCnx.cancelKeepAliveTask(); |
| |
| String producerName = "my-producer"; |
| |
| ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand1); |
| assertTrue(getResponse() instanceof CommandProducerSuccess); |
| |
| // Call disconnect method on producer to trigger activity similar to unloading |
| Producer producer = serverCnx.getProducers().get(1).get(); |
| assertNotNull(producer); |
| producer.disconnect(); |
| channel.runPendingTasks(); |
| assertTrue(getResponse() instanceof CommandCloseProducer); |
| |
| // Send message and expect no response |
| sendMessage(); |
| |
| // Move clock forward to trigger scheduled clean up task |
| Thread.sleep(1000); |
| channel.runScheduledPendingTasks(); |
| assertTrue(channel.outboundMessages().isEmpty()); |
| assertTrue(channel.isActive()); |
| |
| // Send message and expect closed connection |
| sendMessage(); |
| |
| // Then expect channel to close |
| Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive()); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testBrokerClosedProducerClientRecreatesProducerThenSendCommand() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| setConnectionVersion(ProtocolVersion.v5.getValue()); |
| serverCnx.cancelKeepAliveTask(); |
| |
| String producerName = "my-producer"; |
| |
| ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand1); |
| assertTrue(getResponse() instanceof CommandProducerSuccess); |
| |
| // Call disconnect method on producer to trigger activity similar to unloading |
| Producer producer = serverCnx.getProducers().get(1).get(); |
| assertNotNull(producer); |
| producer.disconnect(); |
| channel.runPendingTasks(); |
| assertTrue(getResponse() instanceof CommandCloseProducer); |
| |
| // Send message and expect no response |
| sendMessage(); |
| |
| assertTrue(channel.outboundMessages().isEmpty()); |
| |
| // Move clock forward to trigger scheduled clean up task |
| ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(createProducer2); |
| assertTrue(getResponse() instanceof CommandProducerSuccess); |
| |
| // Send message and expect success |
| sendMessage(); |
| |
| assertTrue(getResponse() instanceof CommandSendReceipt); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testClientClosedProducerThenSendsMessageAndGetsClosed() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| setConnectionVersion(ProtocolVersion.v5.getValue()); |
| serverCnx.cancelKeepAliveTask(); |
| |
| String producerName = "my-producer"; |
| |
| ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand1); |
| assertTrue(getResponse() instanceof CommandProducerSuccess); |
| |
| ByteBuf closeProducer = Commands.newCloseProducer(1,2); |
| channel.writeInbound(closeProducer); |
| assertTrue(getResponse() instanceof CommandSuccess); |
| |
| // Send message and get disconnected |
| sendMessage(); |
| Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive()); |
| channel.finish(); |
| } |
| |
| private void sendMessage() { |
| MessageMetadata messageMetadata = new MessageMetadata() |
| .setPublishTime(System.currentTimeMillis()) |
| .setProducerName("prod-name") |
| .setSequenceId(0); |
| ByteBuf data = Unpooled.buffer(1024); |
| |
| ByteBuf clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1, |
| ChecksumType.None, messageMetadata, data)); |
| channel.writeInbound(Unpooled.copiedBuffer(clientCommand)); |
| clientCommand.release(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testUseSameProducerName() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| String producerName = "my-producer"; |
| |
| ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand1); |
| assertTrue(getResponse() instanceof CommandProducerSuccess); |
| |
| ByteBuf clientCommand2 = Commands.newProducer(successTopicName, 2 /* producer id */, 2 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand2); |
| assertTrue(getResponse() instanceof CommandError); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testRecreateSameProducer() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| // Recreating a producer with the same id should succeed |
| |
| String producerName = "my-producer"; |
| |
| ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(createProducer1); |
| |
| // Producer create succeeds |
| Object response = getResponse(); |
| assertEquals(response.getClass(), CommandProducerSuccess.class); |
| assertEquals(((CommandProducerSuccess) response).getRequestId(), 1); |
| |
| ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 2 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(createProducer2); |
| |
| // 2nd producer create succeeds as well |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandProducerSuccess.class); |
| assertEquals(((CommandProducerSuccess) response).getRequestId(), 2); |
| |
| // We should not receive response for 1st producer, since it was cancelled by the close |
| assertTrue(channel.outboundMessages().isEmpty()); |
| assertTrue(channel.isActive()); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testSubscribeMultipleTimes() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| Object response; |
| |
| // Sending multiple subscribe commands for the same consumer should succeed |
| |
| ByteBuf subscribe1 = Commands.newSubscribe(successTopicName, // |
| successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, |
| "test" /* consumer name */, 0 /* avoid reseting cursor */); |
| channel.writeInbound(subscribe1); |
| |
| // 1st subscribe succeeds |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandSuccess.class); |
| assertEquals(((CommandSuccess) response).getRequestId(), 1); |
| |
| ByteBuf subscribe2 = Commands.newSubscribe(successTopicName, // |
| successSubName, 1 /* consumer id */, 2 /* request id */, SubType.Exclusive, 0, |
| "test" /* consumer name */, 0 /* avoid reseting cursor */); |
| channel.writeInbound(subscribe2); |
| |
| // 2nd subscribe succeeds |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandSuccess.class); |
| assertEquals(((CommandSuccess) response).getRequestId(), 2); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testDuplicateConcurrentSubscribeCommand() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| CompletableFuture<Topic> delayFuture = new CompletableFuture<>(); |
| doReturn(delayFuture).when(brokerService).getOrCreateTopic(any(String.class)); |
| // Create subscriber first time |
| ByteBuf clientCommand = Commands.newSubscribe(successTopicName, // |
| successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, |
| "test" /* consumer name */, 0 /* avoid reseting cursor */); |
| channel.writeInbound(clientCommand); |
| |
| // Create producer second time |
| clientCommand = Commands.newSubscribe(successTopicName, // |
| successSubName, 2 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, |
| "test" /* consumer name */, 0 /* avoid reseting cursor */); |
| channel.writeInbound(clientCommand); |
| |
| Awaitility.await().untilAsserted(() -> { |
| Object response = getResponse(); |
| assertTrue(response instanceof CommandError, "Response is not CommandError but " + response); |
| CommandError error = (CommandError) response; |
| assertEquals(error.getError(), ServerError.ConsumerBusy); |
| }); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testCreateProducerTimeout() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| // Delay the topic creation in a deterministic way |
| CompletableFuture<Runnable> openTopicFuture = new CompletableFuture<>(); |
| doAnswer(invocationOnMock -> { |
| openTopicFuture.complete(() -> { |
| ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); |
| }); |
| return null; |
| }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), |
| any(OpenLedgerCallback.class), any(Supplier.class), any()); |
| |
| // In a create producer timeout from client side we expect to see this sequence of commands : |
| // 1. create producer |
| // 2. close producer (when the timeout is triggered, which may be before the producer was created on the broker |
| // 3. create producer (triggered by reconnection logic) |
| |
| // These operations need to be serialized, to allow the last create producer to finally succeed |
| // (There can be more create/close pairs in the sequence, depending on the client timeout |
| |
| String producerName = "my-producer"; |
| |
| ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(createProducer1); |
| |
| ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */ ); |
| channel.writeInbound(closeProducer); |
| |
| ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(createProducer2); |
| |
| // Complete the topic opening: It will make 2nd producer creation successful |
| openTopicFuture.get().run(); |
| |
| // Close succeeds |
| Object response = getResponse(); |
| assertEquals(response.getClass(), CommandSuccess.class); |
| assertEquals(((CommandSuccess) response).getRequestId(), 2); |
| |
| // 2nd producer will be successfully created as topic is open by then |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandProducerSuccess.class); |
| assertEquals(((CommandProducerSuccess) response).getRequestId(), 3); |
| |
| assertTrue(channel.isActive()); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testCreateProducerTimeoutThenCreateSameNamedProducerShouldFail() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| // Delay the topic creation in a deterministic way |
| CompletableFuture<Runnable> openTopicFuture = new CompletableFuture<>(); |
| doAnswer(invocationOnMock -> { |
| openTopicFuture.complete(() -> { |
| ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); |
| }); |
| return null; |
| }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), |
| any(OpenLedgerCallback.class), any(Supplier.class), any()); |
| |
| // In a create producer timeout from client side we expect to see this sequence of commands : |
| // 1. create producer |
| // 2. close producer (when the timeout is triggered, which may be before the producer was created on the broker |
| // 3. create producer (triggered by reconnection logic) |
| // Then, when another producer is created with the same name, it should fail. Because we only have one |
| // channel here, we just use a different producer id |
| |
| // These operations need to be serialized, to allow the last create producer to finally succeed |
| // (There can be more create/close pairs in the sequence, depending on the client timeout |
| |
| String producerName = "my-producer"; |
| |
| ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(createProducer1); |
| |
| ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */ ); |
| channel.writeInbound(closeProducer); |
| |
| ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(createProducer2); |
| |
| // Complete the topic opening: It will make 2nd producer creation successful |
| openTopicFuture.get().run(); |
| |
| // Close succeeds |
| Object response = getResponse(); |
| assertEquals(response.getClass(), CommandSuccess.class); |
| assertEquals(((CommandSuccess) response).getRequestId(), 2); |
| |
| // 2nd producer will be successfully created as topic is open by then |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandProducerSuccess.class); |
| assertEquals(((CommandProducerSuccess) response).getRequestId(), 3); |
| |
| // Send create command after getting the CommandProducerSuccess to ensure correct ordering |
| ByteBuf createProducer3 = Commands.newProducer(successTopicName, 2 /* producer id */, 4 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(createProducer3); |
| |
| // 3nd producer will fail |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandError.class); |
| assertEquals(((CommandError) response).getRequestId(), 4); |
| |
| assertTrue(channel.isActive()); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000, enabled = false) |
| public void testCreateProducerMultipleTimeouts() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| // Delay the topic creation in a deterministic way |
| CountDownLatch topicCreationDelayLatch = new CountDownLatch(1); |
| doAnswer(new Answer<Object>() { |
| @Override |
| public Object answer(InvocationOnMock invocationOnMock) throws Throwable { |
| topicCreationDelayLatch.await(); |
| |
| ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); |
| return null; |
| } |
| }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), |
| any(OpenLedgerCallback.class), any(Supplier.class), any()); |
| |
| // In a create producer timeout from client side we expect to see this sequence of commands : |
| // 1. create producer |
| // 2. close producer (when the timeout is triggered, which may be before the producer was created on the broker |
| // 3. create producer (triggered by reconnection logic) |
| |
| // These operations need to be serialized, to allow the last create producer to finally succeed |
| // (There can be more create/close pairs in the sequence, depending on the client timeout |
| |
| String producerName = "my-producer"; |
| |
| ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(createProducer1); |
| |
| ByteBuf closeProducer1 = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */ ); |
| channel.writeInbound(closeProducer1); |
| |
| ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(createProducer2); |
| |
| ByteBuf createProducer3 = Commands.newProducer(successTopicName, 1 /* producer id */, 4 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(createProducer3); |
| |
| ByteBuf createProducer4 = Commands.newProducer(successTopicName, 1 /* producer id */, 5 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(createProducer4); |
| |
| // Close succeeds |
| Object response = getResponse(); |
| assertEquals(response.getClass(), CommandSuccess.class); |
| assertEquals(((CommandSuccess) response).getRequestId(), 2); |
| |
| // Now allow topic creation to complete |
| topicCreationDelayLatch.countDown(); |
| |
| // 1st producer it's not acked |
| |
| // 2nd producer fails |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandError.class); |
| assertEquals(((CommandError) response).getRequestId(), 3); |
| |
| // 3rd producer fails |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandError.class); |
| assertEquals(((CommandError) response).getRequestId(), 4); |
| |
| // 4nd producer fails |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandError.class); |
| assertEquals(((CommandError) response).getRequestId(), 5); |
| |
| Thread.sleep(100); |
| |
| // We should not receive response for 1st producer, since it was cancelled by the close |
| assertTrue(channel.outboundMessages().isEmpty()); |
| assertTrue(channel.isActive()); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000, skipFailedInvocations = true) |
| public void testCreateProducerBookieTimeout() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| // Delay the topic creation in a deterministic way |
| CompletableFuture<Runnable> openFailedTopic = new CompletableFuture<>(); |
| doAnswer(invocationOnMock -> { |
| openFailedTopic.complete(() -> { |
| ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); |
| }); |
| return null; |
| }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class), |
| any(OpenLedgerCallback.class), any(Supplier.class), any()); |
| |
| // In a create producer timeout from client side we expect to see this sequence of commands : |
| // 1. create a failure producer which will timeout creation after 100msec |
| // 2. close producer |
| // 3. Recreate producer (triggered by reconnection logic) |
| // 4. Wait till the timeout of 1, and create producer again. |
| |
| // These operations need to be serialized, to allow the last create producer to finally succeed |
| // (There can be more create/close pairs in the sequence, depending on the client timeout |
| |
| String producerName = "my-producer"; |
| |
| ByteBuf createProducer1 = Commands.newProducer(failTopicName, 1 /* producer id */, 1 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(createProducer1); |
| |
| ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 2 /* request id */ ); |
| channel.writeInbound(closeProducer); |
| |
| ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(createProducer2); |
| |
| // Now the topic gets opened.. It will make 2nd producer creation successful |
| openFailedTopic.get().run(); |
| |
| // Close succeeds |
| Object response = getResponse(); |
| assertEquals(response.getClass(), CommandSuccess.class); |
| assertEquals(((CommandSuccess) response).getRequestId(), 2); |
| |
| // 2nd producer success as topic is opened |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandProducerSuccess.class); |
| assertEquals(((CommandProducerSuccess) response).getRequestId(), 3); |
| |
| // Wait till the failtopic timeout interval |
| Thread.sleep(500); |
| ByteBuf createProducer3 = Commands.newProducer(successTopicName, 1 /* producer id */, 4 /* request id */, |
| producerName, Collections.emptyMap(), false); |
| channel.writeInbound(createProducer3); |
| |
| // 3rd producer succeeds because 2nd is already connected |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandProducerSuccess.class); |
| assertEquals(((CommandProducerSuccess) response).getRequestId(), 4); |
| |
| Thread.sleep(500); |
| |
| // We should not receive response for 1st producer, since it was cancelled by the close |
| assertTrue(channel.outboundMessages().isEmpty()); |
| assertTrue(channel.isActive()); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testSubscribeTimeout() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| // Delay the topic creation in a deterministic way |
| CompletableFuture<Runnable> openTopicTask = new CompletableFuture<>(); |
| doAnswer(invocationOnMock -> { |
| openTopicTask.complete(() -> { |
| ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); |
| }); |
| |
| return null; |
| }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), |
| any(OpenLedgerCallback.class), any(Supplier.class), any()); |
| |
| // In a subscribe timeout from client side we expect to see this sequence of commands : |
| // 1. Subscribe |
| // 2. close consumer (when the timeout is triggered, which may be before the consumer was created on the broker) |
| // 3. Subscribe (triggered by reconnection logic) |
| |
| // These operations need to be serialized, to allow the last subscribe operation to finally succeed |
| // (There can be more subscribe/close pairs in the sequence, depending on the client timeout |
| |
| ByteBuf subscribe1 = Commands.newSubscribe(successTopicName, // |
| successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, |
| "test" /* consumer name */, 0 /* avoid reseting cursor */); |
| channel.writeInbound(subscribe1); |
| |
| ByteBuf subscribe2 = Commands.newSubscribe(successTopicName, // |
| successSubName, 1 /* consumer id */, 3 /* request id */, SubType.Exclusive, 0, |
| "test" /* consumer name */, 0 /* avoid reseting cursor */); |
| channel.writeInbound(subscribe2); |
| |
| ByteBuf subscribe3 = Commands.newSubscribe(successTopicName, // |
| successSubName, 1 /* consumer id */, 4 /* request id */, SubType.Exclusive, 0, |
| "test" /* consumer name */, 0 /* avoid reseting cursor */); |
| channel.writeInbound(subscribe3); |
| |
| ByteBuf subscribe4 = Commands.newSubscribe(successTopicName, // |
| successSubName, 1 /* consumer id */, 5 /* request id */, SubType.Exclusive, 0, |
| "test" /* consumer name */, 0 /* avoid reseting cursor */); |
| channel.writeInbound(subscribe4); |
| |
| openTopicTask.get().run(); |
| |
| Object response; |
| |
| synchronized (this) { |
| |
| // All other subscribe should fail |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandError.class); |
| assertEquals(((CommandError) response).getRequestId(), 3); |
| |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandError.class); |
| assertEquals(((CommandError) response).getRequestId(), 4); |
| |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandError.class); |
| assertEquals(((CommandError) response).getRequestId(), 5); |
| |
| // We should receive response for 1st producer, since it was not cancelled by the close |
| Awaitility.await().untilAsserted(() -> assertFalse(channel.outboundMessages().isEmpty())); |
| |
| assertTrue(channel.isActive()); |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandSuccess.class); |
| assertEquals(((CommandSuccess) response).getRequestId(), 1); |
| } |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testSubscribeBookieTimeout() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| // Delay the topic creation in a deterministic way |
| CompletableFuture<Runnable> openTopicSuccess = new CompletableFuture<>(); |
| doAnswer(invocationOnMock -> { |
| openTopicSuccess.complete(() -> { |
| ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); |
| }); |
| return null; |
| }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), |
| any(OpenLedgerCallback.class), any(Supplier.class), any()); |
| |
| CompletableFuture<Runnable> openTopicFail = new CompletableFuture<>(); |
| doAnswer(invocationOnMock -> { |
| openTopicFail.complete(() -> { |
| ((OpenLedgerCallback) invocationOnMock.getArguments()[2]) |
| .openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null); |
| }); |
| return null; |
| }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class), |
| any(OpenLedgerCallback.class), any(Supplier.class), any()); |
| |
| // In a subscribe timeout from client side we expect to see this sequence of commands : |
| // 1. Subscribe against failtopic which will fail after 100msec |
| // 2. close consumer |
| // 3. Resubscribe (triggered by reconnection logic) |
| // 4. Wait till the timeout of 1, and subscribe again. |
| |
| // These operations need to be serialized, to allow the last subscribe operation to finally succeed |
| // (There can be more subscribe/close pairs in the sequence, depending on the client timeout |
| ByteBuf subscribe1 = Commands.newSubscribe(failTopicName, // |
| successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, |
| "test" /* consumer name */, 0 /* avoid reseting cursor */); |
| channel.writeInbound(subscribe1); |
| |
| ByteBuf closeConsumer = Commands.newCloseConsumer(1 /* consumer id */, 2 /* request id */ ); |
| channel.writeInbound(closeConsumer); |
| |
| ByteBuf subscribe2 = Commands.newSubscribe(successTopicName, // |
| successSubName, 1 /* consumer id */, 3 /* request id */, SubType.Exclusive, 0, |
| "test" /* consumer name */, 0 /* avoid reseting cursor */); |
| channel.writeInbound(subscribe2); |
| |
| openTopicFail.get().run(); |
| |
| Object response; |
| |
| // Close succeeds |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandSuccess.class); |
| assertEquals(((CommandSuccess) response).getRequestId(), 2); |
| |
| // Subscribe fails |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandError.class); |
| assertEquals(((CommandError) response).getRequestId(), 3); |
| |
| Awaitility.await().until(() -> !serverCnx.hasConsumer(1)); |
| |
| ByteBuf subscribe3 = Commands.newSubscribe(successTopicName, // |
| successSubName, 1 /* consumer id */, 4 /* request id */, SubType.Exclusive, 0, |
| "test" /* consumer name */, 0 /* avoid reseting cursor */); |
| channel.writeInbound(subscribe3); |
| |
| openTopicSuccess.get().run(); |
| |
| // Subscribe succeeds |
| response = getResponse(); |
| assertEquals(response.getClass(), CommandSuccess.class); |
| assertEquals(((CommandSuccess) response).getRequestId(), 4); |
| |
| Thread.sleep(100); |
| |
| // We should not receive response for 1st producer, since it was cancelled by the close |
| assertTrue(channel.outboundMessages().isEmpty()); |
| assertTrue(channel.isActive()); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testSubscribeCommand() throws Exception { |
| final String failSubName = "failSub"; |
| |
| resetChannel(); |
| setChannelConnected(); |
| doReturn(false).when(brokerService).isAuthenticationEnabled(); |
| doReturn(false).when(brokerService).isAuthorizationEnabled(); |
| // test SUBSCRIBE on topic and cursor creation success |
| ByteBuf clientCommand = Commands.newSubscribe(successTopicName, // |
| successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, |
| "test" /* consumer name */, 0 /* avoid reseting cursor */); |
| channel.writeInbound(clientCommand); |
| assertTrue(getResponse() instanceof CommandSuccess); |
| |
| PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName).get(); |
| |
| assertNotNull(topicRef); |
| assertTrue(topicRef.getSubscriptions().containsKey(successSubName)); |
| assertTrue(topicRef.getSubscription(successSubName).getDispatcher().isConsumerConnected()); |
| |
| // test SUBSCRIBE on topic creation success and cursor failure |
| clientCommand = Commands.newSubscribe(successTopicName, failSubName, 2, 2, SubType.Exclusive, |
| 0, "test", 0 /*avoid reseting cursor*/); |
| channel.writeInbound(clientCommand); |
| assertTrue(getResponse() instanceof CommandError); |
| |
| // test SUBSCRIBE on topic creation failure |
| clientCommand = Commands.newSubscribe(failTopicName, successSubName, 3, 3, SubType.Exclusive, |
| 0, "test", 0 /*avoid reseting cursor*/); |
| channel.writeInbound(clientCommand); |
| assertEquals(getResponse().getClass(), CommandError.class); |
| |
| // Server will not close the connection |
| assertTrue(channel.isOpen()); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testUnsupportedBatchMsgSubscribeCommand() throws Exception { |
| final String failSubName = "failSub"; |
| |
| resetChannel(); |
| setChannelConnected(); |
| setConnectionVersion(ProtocolVersion.v3.getValue()); |
| doReturn(false).when(brokerService).isAuthenticationEnabled(); |
| doReturn(false).when(brokerService).isAuthorizationEnabled(); |
| // test SUBSCRIBE on topic and cursor creation success |
| ByteBuf clientCommand = Commands.newSubscribe(successTopicName, // |
| successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0 /* priority */, |
| "test" /* consumer name */, 0 /*avoid reseting cursor*/); |
| channel.writeInbound(clientCommand); |
| assertTrue(getResponse() instanceof CommandSuccess); |
| |
| PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName).get(); |
| topicRef.markBatchMessagePublished(); |
| // test SUBSCRIBE on topic and cursor creation success |
| clientCommand = Commands.newSubscribe(successTopicName, failSubName, 2, 2, SubType.Exclusive, 0 /* priority */, |
| "test" /* consumer name */, 0 /*avoid reseting cursor*/); |
| channel.writeInbound(clientCommand); |
| Object response = getResponse(); |
| assertTrue(response instanceof CommandError); |
| assertEquals(ServerError.UnsupportedVersionError, ((CommandError) response).getError()); |
| |
| // Server will not close the connection |
| assertTrue(channel.isOpen()); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testSubscribeCommandWithAuthorizationPositive() throws Exception { |
| AuthorizationService authorizationService = mock(AuthorizationService.class); |
| doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(), |
| Mockito.any(), Mockito.any(), Mockito.any()); |
| doReturn(authorizationService).when(brokerService).getAuthorizationService(); |
| doReturn(true).when(brokerService).isAuthenticationEnabled(); |
| doReturn(true).when(brokerService).isAuthorizationEnabled(); |
| resetChannel(); |
| setChannelConnected(); |
| |
| // test SUBSCRIBE on topic and cursor creation success |
| ByteBuf clientCommand = Commands.newSubscribe(successTopicName, // |
| successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, |
| "test" /* consumer name */, 0 /* avoid reseting cursor */); |
| channel.writeInbound(clientCommand); |
| |
| assertTrue(getResponse() instanceof CommandSuccess); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testSubscribeCommandWithAuthorizationNegative() throws Exception { |
| AuthorizationService authorizationService = mock(AuthorizationService.class); |
| doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(), |
| Mockito.any(), Mockito.any(), Mockito.any()); |
| doReturn(authorizationService).when(brokerService).getAuthorizationService(); |
| doReturn(true).when(brokerService).isAuthenticationEnabled(); |
| doReturn(true).when(brokerService).isAuthorizationEnabled(); |
| |
| resetChannel(); |
| setChannelConnected(); |
| |
| // test SUBSCRIBE on topic and cursor creation success |
| ByteBuf clientCommand = Commands.newSubscribe(successTopicName, // |
| successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, "test" /* consumer name */, 0 /*avoid reseting cursor*/); |
| channel.writeInbound(clientCommand); |
| assertTrue(getResponse() instanceof CommandError); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testAckCommand() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| ByteBuf clientCommand = Commands.newSubscribe(successTopicName, successSubName, 1 /* consumer id */, |
| 1 /* |
| * request id |
| */, SubType.Exclusive, 0, "test" /* consumer name */, 0 /*avoid reseting cursor*/); |
| channel.writeInbound(clientCommand); |
| assertTrue(getResponse() instanceof CommandSuccess); |
| |
| PositionImpl pos = new PositionImpl(0, 0); |
| |
| clientCommand = Commands.newAck(1 /* consumer id */, pos.getLedgerId(), pos.getEntryId(), null, AckType.Individual, |
| null, Collections.emptyMap(), -1); |
| channel.writeInbound(clientCommand); |
| |
| // verify nothing is sent out on the wire after ack |
| assertNull(channel.outboundMessages().peek()); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testFlowCommand() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| ByteBuf clientCommand = Commands.newSubscribe(successTopicName, successSubName, // |
| 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, "test" /* consumer name */, |
| 0 /* avoid reseting cursor */); |
| channel.writeInbound(clientCommand); |
| assertTrue(getResponse() instanceof CommandSuccess); |
| |
| clientCommand = Commands.newFlow(1 /* consumer id */, 1 /* message permits */); |
| channel.writeInbound(clientCommand); |
| |
| // cursor is mocked |
| // verify nothing is sent out on the wire after ack |
| assertNull(channel.outboundMessages().peek()); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testProducerSuccessOnEncryptionRequiredTopic() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| // Set encryption_required to true |
| Policies policies = mock(Policies.class); |
| policies.encryption_required = true; |
| policies.topicDispatchRate = Maps.newHashMap(); |
| // add `clusterDispatchRate` otherwise there will be a NPE |
| // `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate` |
| policies.clusterDispatchRate = Maps.newHashMap(); |
| doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources) |
| .getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject()); |
| |
| // test success case: encrypted producer can connect |
| ByteBuf clientCommand = Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /* request id */, |
| "encrypted-producer", true, Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| |
| Object response = getResponse(); |
| assertEquals(response.getClass(), CommandProducerSuccess.class); |
| PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(encryptionRequiredTopicName).get(); |
| assertNotNull(topicRef); |
| assertEquals(topicRef.getProducers().size(), 1); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testProducerFailureOnEncryptionRequiredTopic() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| // Set encryption_required to true |
| Policies policies = mock(Policies.class); |
| policies.encryption_required = true; |
| policies.topicDispatchRate = Maps.newHashMap(); |
| // add `clusterDispatchRate` otherwise there will be a NPE |
| // `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate` |
| policies.clusterDispatchRate = Maps.newHashMap(); |
| doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources) |
| .getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject()); |
| |
| // test failure case: unencrypted producer cannot connect |
| ByteBuf clientCommand = Commands.newProducer(encryptionRequiredTopicName, 2 /* producer id */, 2 /* request id */, |
| "unencrypted-producer", false, Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| |
| Object response = getResponse(); |
| assertEquals(response.getClass(), CommandError.class); |
| CommandError errorResponse = (CommandError) response; |
| assertEquals(errorResponse.getError(), ServerError.MetadataError); |
| PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(encryptionRequiredTopicName).get(); |
| assertNotNull(topicRef); |
| assertEquals(topicRef.getProducers().size(), 0); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testProducerFailureOnEncryptionRequiredOnBroker() throws Exception { |
| // (a) Set encryption-required at broker level |
| pulsar.getConfig().setEncryptionRequireOnProducer(true); |
| resetChannel(); |
| setChannelConnected(); |
| |
| // (b) Set encryption_required to false on policy |
| Policies policies = mock(Policies.class); |
| // Namespace policy doesn't require encryption |
| policies.encryption_required = false; |
| policies.topicDispatchRate = Maps.newHashMap(); |
| // add `clusterDispatchRate` otherwise there will be a NPE |
| policies.clusterDispatchRate = Maps.newHashMap(); |
| doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources) |
| .getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject()); |
| |
| // test failure case: unencrypted producer cannot connect |
| ByteBuf clientCommand = Commands.newProducer(encryptionRequiredTopicName, 2 /* producer id */, 2 /* request id */, |
| "unencrypted-producer", false, Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| |
| Object response = getResponse(); |
| assertEquals(response.getClass(), CommandError.class); |
| CommandError errorResponse = (CommandError) response; |
| assertEquals(errorResponse.getError(), ServerError.MetadataError); |
| PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(encryptionRequiredTopicName).get(); |
| assertNotNull(topicRef); |
| assertEquals(topicRef.getProducers().size(), 0); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testSendSuccessOnEncryptionRequiredTopic() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| // Set encryption_required to true |
| Policies policies = mock(Policies.class); |
| policies.encryption_required = true; |
| policies.topicDispatchRate = Maps.newHashMap(); |
| // add `clusterDispatchRate` otherwise there will be a NPE |
| // `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate` |
| policies.clusterDispatchRate = Maps.newHashMap(); |
| doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources) |
| .getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject()); |
| |
| ByteBuf clientCommand = Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /* request id */, |
| "prod-name", true, Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| assertTrue(getResponse() instanceof CommandProducerSuccess); |
| |
| // test success case: encrypted messages can be published |
| MessageMetadata messageMetadata = new MessageMetadata() |
| .setPublishTime(System.currentTimeMillis()) |
| .setProducerName("prod-name") |
| .setSequenceId(0); |
| messageMetadata.addEncryptionKey() |
| .setKey("testKey") |
| .setValue("testVal".getBytes()); |
| ByteBuf data = Unpooled.buffer(1024); |
| |
| clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data)); |
| channel.writeInbound(Unpooled.copiedBuffer(clientCommand)); |
| clientCommand.release(); |
| assertTrue(getResponse() instanceof CommandSendReceipt); |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testSendFailureOnEncryptionRequiredTopic() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| // Set encryption_required to true |
| Policies policies = mock(Policies.class); |
| policies.encryption_required = true; |
| policies.topicDispatchRate = Maps.newHashMap(); |
| // add `clusterDispatchRate` otherwise there will be a NPE |
| // `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate` |
| policies.clusterDispatchRate = Maps.newHashMap(); |
| doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources) |
| .getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject()); |
| |
| ByteBuf clientCommand = Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /* request id */, |
| "prod-name", true, Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| assertTrue(getResponse() instanceof CommandProducerSuccess); |
| |
| // test failure case: unencrypted messages cannot be published |
| MessageMetadata messageMetadata = new MessageMetadata() |
| .setPublishTime(System.currentTimeMillis()) |
| .setProducerName("prod-name") |
| .setSequenceId(0); |
| ByteBuf data = Unpooled.buffer(1024); |
| |
| clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data)); |
| channel.writeInbound(Unpooled.copiedBuffer(clientCommand)); |
| clientCommand.release(); |
| assertTrue(getResponse() instanceof CommandSendError); |
| channel.finish(); |
| } |
| |
| protected void resetChannel() throws Exception { |
| int MaxMessageSize = 5 * 1024 * 1024; |
| if (channel != null && channel.isActive()) { |
| serverCnx.close(); |
| channel.close().get(); |
| } |
| serverCnx = new ServerCnx(pulsar); |
| serverCnx.setAuthRole(""); |
| channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder( |
| MaxMessageSize, |
| 0, |
| 4, |
| 0, |
| 4), |
| (ChannelHandler) serverCnx); |
| } |
| |
| protected void setChannelConnected() throws Exception { |
| Field channelState = ServerCnx.class.getDeclaredField("state"); |
| channelState.setAccessible(true); |
| channelState.set(serverCnx, State.Connected); |
| } |
| |
| private void setConnectionVersion(int version) throws Exception { |
| PulsarHandler cnx = serverCnx; |
| Field versionField = PulsarHandler.class.getDeclaredField("remoteEndpointProtocolVersion"); |
| versionField.setAccessible(true); |
| versionField.set(cnx, version); |
| } |
| |
| protected Object getResponse() throws Exception { |
| // Wait at most for 10s to get a response |
| final long sleepTimeMs = 10; |
| final long iterations = TimeUnit.SECONDS.toMillis(10) / sleepTimeMs; |
| for (int i = 0; i < iterations; i++) { |
| if (!channel.outboundMessages().isEmpty()) { |
| Object outObject = channel.outboundMessages().remove(); |
| return clientChannelHelper.getCommand(outObject); |
| } else { |
| Thread.sleep(sleepTimeMs); |
| } |
| } |
| |
| throw new IOException("Failed to get response from socket within 10s"); |
| } |
| |
| private void setupMLAsyncCallbackMocks() { |
| doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors(); |
| |
| // call openLedgerComplete with ledgerMock on ML factory asyncOpen |
| doAnswer(new Answer<Object>() { |
| @Override |
| public Object answer(InvocationOnMock invocationOnMock) throws Throwable { |
| Thread.sleep(300); |
| ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null); |
| return null; |
| } |
| }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), |
| any(OpenLedgerCallback.class), any(Supplier.class), any()); |
| |
| // call openLedgerFailed on ML factory asyncOpen |
| doAnswer(new Answer<Object>() { |
| @Override |
| public Object answer(InvocationOnMock invocationOnMock) throws Throwable { |
| Thread.sleep(300); |
| new Thread(() -> { |
| ((OpenLedgerCallback) invocationOnMock.getArguments()[2]) |
| .openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null); |
| }).start(); |
| |
| return null; |
| } |
| }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class), |
| any(OpenLedgerCallback.class), any(Supplier.class), any()); |
| |
| // call addComplete on ledger asyncAddEntry |
| doAnswer(new Answer<Object>() { |
| @Override |
| public Object answer(InvocationOnMock invocationOnMock) throws Throwable { |
| ((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete( |
| new PositionImpl(-1, -1), |
| null, |
| invocationOnMock.getArguments()[2]); |
| return null; |
| } |
| }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any()); |
| |
| doAnswer((Answer<Object>) invocationOnMock -> true).when(cursorMock).isDurable(); |
| |
| doAnswer((Answer<Object>) invocationOnMock -> { |
| Thread.sleep(300); |
| ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null); |
| return null; |
| }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any()); |
| |
| doAnswer((Answer<Object>) invocationOnMock -> { |
| Thread.sleep(300); |
| ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null); |
| return null; |
| }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class), |
| any(OpenCursorCallback.class), any()); |
| |
| doAnswer((Answer<Object>) invocationOnMock -> { |
| Thread.sleep(300); |
| ((OpenCursorCallback) invocationOnMock.getArguments()[2]) |
| .openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null); |
| return null; |
| }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any()); |
| |
| doAnswer((Answer<Object>) invocationOnMock -> { |
| Thread.sleep(300); |
| ((OpenCursorCallback) invocationOnMock.getArguments()[3]) |
| .openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null); |
| return null; |
| }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(Map.class), |
| any(OpenCursorCallback.class), any()); |
| |
| doAnswer((Answer<Object>) invocationOnMock -> { |
| ((DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete(null); |
| return null; |
| }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any()); |
| |
| doAnswer((Answer<Object>) invocationOnMock -> { |
| ((DeleteCursorCallback) invocationOnMock.getArguments()[1]) |
| .deleteCursorFailed(new ManagedLedgerException("Managed ledger failure"), null); |
| return null; |
| }).when(ledgerMock).asyncDeleteCursor(matches(".*fail.*"), any(DeleteCursorCallback.class), any()); |
| |
| doAnswer((Answer<Object>) invocationOnMock -> { |
| ((CloseCallback) invocationOnMock.getArguments()[0]).closeComplete(null); |
| return null; |
| }).when(cursorMock).asyncClose(any(CloseCallback.class), any()); |
| |
| doReturn(successSubName).when(cursorMock).getName(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testInvalidTopicOnLookup() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| String invalidTopicName = "xx/ass/aa/aaa"; |
| |
| resetChannel(); |
| setChannelConnected(); |
| |
| |
| channel.writeInbound(Commands.newLookup(invalidTopicName, true, 1)); |
| Object obj = getResponse(); |
| assertEquals(obj.getClass(), CommandLookupTopicResponse.class); |
| CommandLookupTopicResponse res = (CommandLookupTopicResponse) obj; |
| assertEquals(res.getError(), ServerError.InvalidTopicName); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testInvalidTopicOnProducer() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| String invalidTopicName = "xx/ass/aa/aaa"; |
| |
| resetChannel(); |
| setChannelConnected(); |
| |
| ByteBuf clientCommand = Commands.newProducer(invalidTopicName, 1 /* producer id */, 1 /* request id */, |
| "prod-name", Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| Object obj = getResponse(); |
| assertEquals(obj.getClass(), CommandError.class); |
| CommandError res = (CommandError) obj; |
| assertEquals(res.getError(), ServerError.InvalidTopicName); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testInvalidTopicOnSubscribe() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| String invalidTopicName = "xx/ass/aa/aaa"; |
| |
| resetChannel(); |
| setChannelConnected(); |
| |
| channel.writeInbound(Commands.newSubscribe(invalidTopicName, "test-subscription", 1, 1, SubType.Exclusive, 0, |
| "consumerName", 0 /*avoid reseting cursor*/)); |
| Object obj = getResponse(); |
| assertEquals(obj.getClass(), CommandError.class); |
| CommandError res = (CommandError) obj; |
| assertEquals(res.getError(), ServerError.InvalidTopicName); |
| |
| channel.finish(); |
| } |
| |
| @Test |
| public void testDelayedClosedProducer() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| CompletableFuture<Topic> delayFuture = new CompletableFuture<>(); |
| doReturn(delayFuture).when(brokerService).getOrCreateTopic(any(String.class)); |
| // Create producer first time |
| int producerId = 1; |
| ByteBuf clientCommand = Commands.newProducer(successTopicName, producerId /* producer id */, 1 /* request id */, |
| "prod-name", Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| |
| ByteBuf closeProducerCmd = Commands.newCloseProducer(producerId, 2); |
| channel.writeInbound(closeProducerCmd); |
| |
| Topic topic = mock(Topic.class); |
| doReturn(CompletableFuture.completedFuture(topic)).when(brokerService).getOrCreateTopic(any(String.class)); |
| doReturn(CompletableFuture.completedFuture(false)).when(topic).hasSchema(); |
| |
| clientCommand = Commands.newProducer(successTopicName, producerId /* producer id */, 1 /* request id */, |
| "prod-name", Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand); |
| |
| Object response = getResponse(); |
| assertTrue(response instanceof CommandSuccess); |
| |
| channel.finish(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testTopicIsNotReady() throws Exception { |
| resetChannel(); |
| setChannelConnected(); |
| |
| // 1st subscribe command to load the topic |
| ByteBuf clientCommand1 = Commands.newSubscribe(successTopicName, successSubName, 1 /* consumer id */, |
| 1 /* request id */, SubType.Shared, 0 /* priority level */, "c1" /* consumer name */, |
| 0 /* avoid reseting cursor */); |
| channel.writeInbound(clientCommand1); |
| |
| Object response1 = getResponse(); |
| assertEquals(response1.getClass(), CommandSuccess.class); |
| assertEquals(((CommandSuccess) response1).getRequestId(), 1); |
| |
| // Force the checkTopicNsOwnership method to throw ServiceUnitNotReadyException |
| doReturn(FutureUtil.failedFuture(new ServiceUnitNotReadyException("Service unit is not ready"))) |
| .when(brokerService).checkTopicNsOwnership(anyString()); |
| |
| // 2nd subscribe command when the service unit is not ready |
| ByteBuf clientCommand2 = Commands.newSubscribe(successTopicName, successSubName, 2 /* consumer id */, |
| 2 /* request id */, SubType.Shared, 0 /* priority level */, "c2" /* consumer name */, |
| 0 /* avoid reseting cursor */); |
| channel.writeInbound(clientCommand2); |
| |
| Object response2 = getResponse(); |
| assertEquals(response2.getClass(), CommandError.class); |
| assertEquals(((CommandError) response2).getError(), ServerError.ServiceNotReady); |
| assertEquals(((CommandError) response2).getRequestId(), 2); |
| |
| // Producer command when the service unit is not ready |
| ByteBuf clientCommand3 = Commands.newProducer(successTopicName, 1 /* producer id */, 3 /* request id */, |
| "p1" /* producer name */, Collections.emptyMap(), false); |
| channel.writeInbound(clientCommand3); |
| |
| Object response3 = getResponse(); |
| assertEquals(response3.getClass(), CommandError.class); |
| assertEquals(((CommandError) response3).getError(), ServerError.ServiceNotReady); |
| assertEquals(((CommandError) response3).getRequestId(), 3); |
| |
| channel.finish(); |
| } |
| |
| @Test(enabled = false) |
| public void testNeverDelayConsumerFutureWhenNotFail() throws Exception{ |
| // Mock ServerCnx.field: consumers |
| ConcurrentLongHashMap.Builder mapBuilder = Mockito.mock(ConcurrentLongHashMap.Builder.class); |
| Mockito.when(mapBuilder.expectedItems(Mockito.anyInt())).thenReturn(mapBuilder); |
| Mockito.when(mapBuilder.concurrencyLevel(Mockito.anyInt())).thenReturn(mapBuilder); |
| ConcurrentLongHashMap consumers = Mockito.mock(ConcurrentLongHashMap.class); |
| Mockito.when(mapBuilder.build()).thenReturn(consumers); |
| ArgumentCaptor<Long> ignoreArgumentCaptor = ArgumentCaptor.forClass(Long.class); |
| final ArgumentCaptor<CompletableFuture> deleteTimesMark = ArgumentCaptor.forClass(CompletableFuture.class); |
| Mockito.when(consumers.remove(ignoreArgumentCaptor.capture())).thenReturn(true); |
| Mockito.when(consumers.remove(ignoreArgumentCaptor.capture(), deleteTimesMark.capture())).thenReturn(true); |
| // case1: exists existingConsumerFuture, already complete or delay done after execute 'isDone()' many times |
| // case2: exists existingConsumerFuture, delay complete after execute 'isDone()' many times |
| // Why is the design so complicated, see: https://github.com/apache/pulsar/pull/15051 |
| // Try a delay of 3 stages. The simulation is successful after repeated judgments. |
| for(AtomicInteger futureWillDoneAfterDelayTimes = new AtomicInteger(1); |
| futureWillDoneAfterDelayTimes.intValue() <= 3; |
| futureWillDoneAfterDelayTimes.incrementAndGet()){ |
| final AtomicInteger futureCallTimes = new AtomicInteger(); |
| final Consumer mockConsumer = Mockito.mock(Consumer.class); |
| CompletableFuture existingConsumerFuture = new CompletableFuture<Consumer>(){ |
| |
| private boolean complete; |
| |
| // delay complete after execute 'isDone()' many times |
| @Override |
| public boolean isDone() { |
| if (complete) { |
| return true; |
| } |
| int executeIsDoneCommandTimes = futureCallTimes.incrementAndGet(); |
| return executeIsDoneCommandTimes >= futureWillDoneAfterDelayTimes.intValue(); |
| } |
| |
| // if trig "getNow()", then complete |
| @Override |
| public Consumer get(){ |
| complete = true; |
| return mockConsumer; |
| } |
| |
| // if trig "get()", then complete |
| @Override |
| public Consumer get(long timeout, TimeUnit unit){ |
| complete = true; |
| return mockConsumer; |
| } |
| |
| // if trig "get()", then complete |
| @Override |
| public Consumer getNow(Consumer ifAbsent){ |
| complete = true; |
| return mockConsumer; |
| } |
| |
| // never fail |
| public boolean isCompletedExceptionally(){ |
| return false; |
| } |
| }; |
| Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture); |
| // do test: delay complete after execute 'isDone()' many times |
| // Why is the design so complicated, see: https://github.com/apache/pulsar/pull/15051 |
| try (MockedStatic<ConcurrentLongHashMap> theMock = Mockito.mockStatic(ConcurrentLongHashMap.class)) { |
| // Inject consumers to ServerCnx |
| theMock.when(ConcurrentLongHashMap::newBuilder).thenReturn(mapBuilder); |
| // reset channels( serverChannel, clientChannel ) |
| resetChannel(); |
| setChannelConnected(); |
| // auth check disable |
| doReturn(false).when(brokerService).isAuthenticationEnabled(); |
| doReturn(false).when(brokerService).isAuthorizationEnabled(); |
| // do subscribe |
| ByteBuf clientCommand = Commands.newSubscribe(successTopicName, // |
| successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, |
| "test" /* consumer name */, 0 /* avoid reseting cursor */); |
| channel.writeInbound(clientCommand); |
| Object responseObj = getResponse(); |
| Predicate<Object> responseAssert = obj -> { |
| if (responseObj instanceof CommandSuccess) { |
| return true; |
| } |
| if (responseObj instanceof CommandError) { |
| CommandError commandError = (CommandError) responseObj; |
| return ServerError.ServiceNotReady == commandError.getError(); |
| } |
| return false; |
| }; |
| // assert no consumer-delete event occur |
| assertFalse(deleteTimesMark.getAllValues().contains(existingConsumerFuture)); |
| // assert without another error occur |
| assertTrue(responseAssert.test(responseAssert)); |
| // Server will not close the connection |
| assertTrue(channel.isOpen()); |
| channel.finish(); |
| } |
| } |
| // case3: exists existingConsumerFuture, already complete and exception |
| CompletableFuture existingConsumerFuture = Mockito.mock(CompletableFuture.class); |
| Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture); |
| // make consumerFuture delay finish |
| Mockito.when(existingConsumerFuture.isDone()).thenReturn(true); |
| // when sync get return, future will return success value. |
| Mockito.when(existingConsumerFuture.get()).thenThrow(new NullPointerException()); |
| Mockito.when(existingConsumerFuture.get(Mockito.anyLong(), Mockito.any())). |
| thenThrow(new NullPointerException()); |
| Mockito.when(existingConsumerFuture.isCompletedExceptionally()).thenReturn(true); |
| Mockito.when(existingConsumerFuture.getNow(Mockito.any())).thenThrow(new NullPointerException()); |
| try (MockedStatic<ConcurrentLongHashMap> theMock = Mockito.mockStatic(ConcurrentLongHashMap.class)) { |
| // Inject consumers to ServerCnx |
| theMock.when(ConcurrentLongHashMap::newBuilder).thenReturn(mapBuilder); |
| // reset channels( serverChannel, clientChannel ) |
| resetChannel(); |
| setChannelConnected(); |
| // auth check disable |
| doReturn(false).when(brokerService).isAuthenticationEnabled(); |
| doReturn(false).when(brokerService).isAuthorizationEnabled(); |
| // do subscribe |
| ByteBuf clientCommand = Commands.newSubscribe(successTopicName, // |
| successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, |
| "test" /* consumer name */, 0 /* avoid reseting cursor */); |
| channel.writeInbound(clientCommand); |
| Object responseObj = getResponse(); |
| Predicate<Object> responseAssert = obj -> { |
| if (responseObj instanceof CommandError) { |
| CommandError commandError = (CommandError) responseObj; |
| return ServerError.ServiceNotReady != commandError.getError(); |
| } |
| return false; |
| }; |
| // assert error response |
| assertTrue(responseAssert.test(responseAssert)); |
| // assert consumer-delete event occur |
| assertEquals(1L, |
| deleteTimesMark.getAllValues().stream().filter(f -> f == existingConsumerFuture).count()); |
| // Server will not close the connection |
| assertTrue(channel.isOpen()); |
| channel.finish(); |
| } |
| } |
| } |