blob: aada20665564e9a038cae391fbddd4553da6e548 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static java.util.Collections.emptyMap;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.apache.pulsar.common.api.proto.CommandAck.AckType.Cumulative;
import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Exclusive;
import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Failover;
import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Key_Shared;
import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Shared;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import java.util.Optional;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
public class MessageCumulativeAckTest {
private final int consumerId = 1;
private BrokerService brokerService;
private ServerCnx serverCnx;
private MetadataStore store;
protected PulsarService pulsar;
private OrderedExecutor executor;
private EventLoopGroup eventLoopGroup;
private PersistentSubscription sub;
@BeforeMethod
public void setup() throws Exception {
executor = OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-cumulative-ack-test").build();
ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
svcConfig.setBrokerShutdownTimeoutMs(0L);
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
svcConfig.setClusterName("pulsar-cluster");
pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig);
doReturn(svcConfig).when(pulsar).getConfiguration();
ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class);
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
doReturn(TransactionTestBase.createMockBookKeeper(executor))
.when(pulsar).getBookKeeperClient();
store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build());
doReturn(store).when(pulsar).getLocalMetadataStore();
doReturn(store).when(pulsar).getConfigurationMetadataStore();
PulsarResources pulsarResources = new PulsarResources(store, store);
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(pulsarResources).when(pulsar).getPulsarResources();
});
serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getValue());
when(serverCnx.ctx()).thenReturn(mock(ChannelHandlerContext.class));
doReturn(new PulsarCommandSenderImpl(null, serverCnx))
.when(serverCnx).getCommandSender();
eventLoopGroup = new NioEventLoopGroup();
brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup);
PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
doReturn(brokerService).when(pulsar).getBrokerService();
});
String topicName = TopicName.get("MessageCumulativeAckTest").toString();
PersistentTopic persistentTopic = new PersistentTopic(topicName, mock(ManagedLedger.class), brokerService);
sub = spy(new PersistentSubscription(persistentTopic, "sub-1",
mock(ManagedCursorImpl.class), false));
doNothing().when(sub).acknowledgeMessage(any(), any(), any());
}
@AfterMethod(alwaysRun = true)
public void shutdown() throws Exception {
if (brokerService != null) {
brokerService.close();
brokerService = null;
}
if (pulsar != null) {
pulsar.close();
pulsar = null;
}
executor.shutdown();
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully().get();
}
store.close();
sub = null;
}
@DataProvider(name = "individualAckModes")
public static Object[][] individualAckModes() {
return new Object[][]{
{Shared},
{Key_Shared},
};
}
@DataProvider(name = "notIndividualAckModes")
public static Object[][] notIndividualAckModes() {
return new Object[][]{
{Exclusive},
{Failover},
};
}
@Test(timeOut = 5000, dataProvider = "individualAckModes")
public void testAckWithIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
Consumer consumer = new Consumer(sub, subType, "topic-1", consumerId, 0,
"Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
CommandAck commandAck = new CommandAck();
commandAck.setAckType(Cumulative);
commandAck.setConsumerId(consumerId);
commandAck.addMessageId().setEntryId(0L).setLedgerId(1L);
consumer.messageAcked(commandAck).get();
verify(sub, never()).acknowledgeMessage(any(), any(), any());
}
@Test(timeOut = 5000, dataProvider = "notIndividualAckModes")
public void testAckWithNotIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
Consumer consumer = new Consumer(sub, subType, "topic-1", consumerId, 0,
"Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
CommandAck commandAck = new CommandAck();
commandAck.setAckType(Cumulative);
commandAck.setConsumerId(consumerId);
commandAck.addMessageId().setEntryId(0L).setLedgerId(1L);
consumer.messageAcked(commandAck).get();
verify(sub, times(1)).acknowledgeMessage(any(), any(), any());
}
@Test(timeOut = 5000)
public void testAckWithMoreThanNoneMessageIds() throws Exception {
Consumer consumer = new Consumer(sub, Failover, "topic-1", consumerId, 0,
"Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
CommandAck commandAck = new CommandAck();
commandAck.setAckType(Cumulative);
commandAck.setConsumerId(consumerId);
commandAck.addMessageId().setEntryId(0L).setLedgerId(1L);
commandAck.addMessageId().setEntryId(0L).setLedgerId(2L);
consumer.messageAcked(commandAck).get();
verify(sub, never()).acknowledgeMessage(any(), any(), any());
}
}