blob: 09c2962cd65655b2ee3b2ef6ac5c03d4f8fa9f7f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker")
public class AbstractBaseDispatcherTest {
private AbstractBaseDispatcherTestHelper helper;
private ServiceConfiguration svcConfig;
private PersistentSubscription subscriptionMock;
@BeforeMethod
public void setup() throws Exception {
this.svcConfig = mock(ServiceConfiguration.class);
this.subscriptionMock = mock(PersistentSubscription.class);
this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig);
}
@Test
public void testFilterEntriesForConsumerOfNullElement() {
List<Entry> entries = new ArrayList<>();
entries.add(null);
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false, null);
assertEquals(size, 0);
}
@Test
public void testFilterEntriesForConsumerOfEntryFilter() {
Topic mockTopic = mock(Topic.class);
when(this.subscriptionMock.getTopic()).thenReturn(mockTopic);
BrokerService mockBrokerService = mock(BrokerService.class);
when(mockTopic.getBrokerService()).thenReturn(mockBrokerService);
EntryFilterWithClassLoader mockFilter = mock(EntryFilterWithClassLoader.class);
when(mockFilter.filterEntry(any(Entry.class), any(FilterContext.class))).thenReturn(
EntryFilter.FilterResult.REJECT);
ImmutableMap<String, EntryFilterWithClassLoader> entryFilters = ImmutableMap.of("key", mockFilter);
when(mockBrokerService.getEntryFilters()).thenReturn(entryFilters);
this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig);
List<Entry> entries = new ArrayList<>();
entries.add(EntryImpl.create(1, 2, createMessage("message1", 1)));
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
//
int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false, null);
assertEquals(size, 0);
}
@Test
public void testFilterEntriesForConsumerOfTxnMsgAbort() {
List<Entry> entries = new ArrayList<>();
entries.add(EntryImpl.create(1, 1, createTnxAbortMessage("message1", 1)));
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false, null);
assertEquals(size, 0);
}
@Test
public void testFilterEntriesForConsumerOfTxnBufferAbort() {
PersistentTopic mockTopic = mock(PersistentTopic.class);
when(this.subscriptionMock.getTopic()).thenReturn(mockTopic);
when(mockTopic.isTxnAborted(any(TxnID.class))).thenReturn(true);
List<Entry> entries = new ArrayList<>();
entries.add(EntryImpl.create(1, 1, createTnxMessage("message1", 1)));
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false, null);
assertEquals(size, 0);
}
@Test
public void testFilterEntriesForConsumerOfServerOnlyMarker() {
List<Entry> entries = new ArrayList<>();
ByteBuf markerMessage =
Markers.newReplicatedSubscriptionsSnapshotRequest("testSnapshotId", "testSourceCluster");
entries.add(EntryImpl.create(1, 1, markerMessage));
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false, null);
assertEquals(size, 0);
}
@Test
public void testFilterEntriesForConsumerOfDelayedMsg() {
List<Entry> entries = new ArrayList<>();
entries.add(EntryImpl.create(1, 1, createDelayedMessage("message1", 1)));
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false, null);
assertEquals(size, 0);
}
private ByteBuf createMessage(String message, int sequenceId) {
MessageMetadata messageMetadata = new MessageMetadata()
.setSequenceId(sequenceId)
.setProducerName("testProducer")
.setPartitionKeyB64Encoded(false)
.setPublishTime(System.currentTimeMillis());
return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata,
Unpooled.copiedBuffer(message.getBytes(UTF_8)));
}
private ByteBuf createTnxMessage(String message, int sequenceId) {
MessageMetadata messageMetadata = new MessageMetadata()
.setSequenceId(sequenceId)
.setProducerName("testProducer")
.setPartitionKeyB64Encoded(false)
.setPublishTime(System.currentTimeMillis())
.setTxnidMostBits(8)
.setTxnidLeastBits(0);
return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata,
Unpooled.copiedBuffer(message.getBytes(UTF_8)));
}
private ByteBuf createTnxAbortMessage(String message, int sequenceId) {
MessageMetadata messageMetadata = new MessageMetadata()
.setSequenceId(sequenceId)
.setProducerName("testProducer")
.setPartitionKeyB64Encoded(false)
.setPublishTime(System.currentTimeMillis())
.setTxnidMostBits(8)
.setTxnidLeastBits(0)
.setMarkerType(MarkerType.TXN_ABORT_VALUE);
return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata,
Unpooled.copiedBuffer(message.getBytes(UTF_8)));
}
private ByteBuf createDelayedMessage(String message, int sequenceId) {
MessageMetadata messageMetadata = new MessageMetadata()
.setSequenceId(sequenceId)
.setProducerName("testProducer")
.setPartitionKeyB64Encoded(false)
.setPublishTime(System.currentTimeMillis())
.setDeliverAtTime(System.currentTimeMillis() + 5000);
return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata,
Unpooled.copiedBuffer(message.getBytes(UTF_8)));
}
private static class AbstractBaseDispatcherTestHelper extends AbstractBaseDispatcher {
protected AbstractBaseDispatcherTestHelper(Subscription subscription,
ServiceConfiguration serviceConfig) {
super(subscription, serviceConfig);
}
@Override
protected boolean isConsumersExceededOnSubscription() {
return false;
}
@Override
public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
//for test.
return true;
}
@Override
protected void reScheduleRead() {
}
@Override
public void addConsumer(Consumer consumer) throws BrokerServiceException {
}
@Override
public void removeConsumer(Consumer consumer) throws BrokerServiceException {
}
@Override
public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
}
@Override
public boolean isConsumerConnected() {
return false;
}
@Override
public List<Consumer> getConsumers() {
return null;
}
@Override
public boolean canUnsubscribe(Consumer consumer) {
return false;
}
@Override
public CompletableFuture<Void> close() {
return null;
}
@Override
public boolean isClosed() {
return false;
}
@Override
public CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
return null;
}
@Override
public CompletableFuture<Void> disconnectAllConsumers(boolean isResetCursor) {
return null;
}
@Override
public void reset() {
}
@Override
public CommandSubscribe.SubType getType() {
return null;
}
@Override
public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
}
@Override
public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
}
@Override
public void addUnAckedMessages(int unAckMessages) {
}
@Override
public RedeliveryTracker getRedeliveryTracker() {
return null;
}
}
}