blob: 0cf52b4e36d4c1fbb60f180be7531bd794897b21 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.doAnswer;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
import org.mockito.stubbing.Answer;
import org.testng.annotations.Test;
import org.testng.collections.Sets;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
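/**
 * Tests for {@link PersistentMessageFinder} and {@link PersistentMessageExpiryMonitor}, run against the
 * managed-ledger test fixture inherited from {@link MockedBookKeeperTestCase}.
 */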
public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
/**
 * Builds a serialized message and returns its bytes, ready to be passed to {@code ManagedLedger#addEntry}.
 *
 * <p>The layout written here is: a 4-byte metadata length, the serialized {@link MessageMetadata}
 * (publish time = now, producer name {@code "createMessageWrittenToLedger"}, sequence id 1), followed by
 * the payload bytes of {@code msg}.
 *
 * @param msg payload text; encoded with the platform default charset
 * @return a freshly allocated array holding exactly the readable bytes of the coalesced buffer
 * @throws Exception if building or serializing the message fails
 */
public static byte[] createMessageWrittenToLedger(String msg) throws Exception {
    MessageMetadata messageMetadata = new MessageMetadata()
            .setPublishTime(System.currentTimeMillis())
            .setProducerName("createMessageWrittenToLedger")
            .setSequenceId(1);
    ByteBuf data = UnpooledByteBufAllocator.DEFAULT.heapBuffer().writeBytes(msg.getBytes());
    int msgMetadataSize = messageMetadata.getSerializedSize();
    int payloadSize = data.readableBytes();
    // 4 bytes for the metadata-length prefix written below
    int totalSize = 4 + msgMetadataSize + payloadSize;
    ByteBuf headers = PulsarByteBufAllocator.DEFAULT.heapBuffer(totalSize, totalSize);
    headers.writeInt(msgMetadataSize);
    messageMetadata.writeTo(headers);
    ByteBuf headersAndPayload = ByteBufPair.coalesce(ByteBufPair.get(headers, data));
    try {
        // Copy the readable region explicitly instead of returning nioBuffer().array(): the backing array
        // of a ByteBuffer ignores offset/limit and is unavailable for direct buffers, so it is only
        // correct when the buffer happens to be an exactly-sized heap buffer.
        byte[] byteMessage = new byte[headersAndPayload.readableBytes()];
        headersAndPayload.getBytes(headersAndPayload.readerIndex(), byteMessage);
        return byteMessage;
    } finally {
        // Release even if the copy fails so the buffer is not leaked.
        headersAndPayload.release();
    }
}
/**
 * Builds the same wire layout as {@code createMessageWrittenToLedger} (4-byte metadata length, serialized
 * {@link MessageMetadata}, payload) but hands back the coalesced {@link ByteBuf} instead of a byte array.
 *
 * @param msg payload text; encoded with the platform default charset
 * @return the buffer produced by {@link ByteBufPair#coalesce}; the caller is responsible for it
 * @throws Exception if building or serializing the message fails
 */
public static ByteBuf createMessageByteBufWrittenToLedger(String msg) throws Exception {
    final MessageMetadata metadata = new MessageMetadata();
    metadata.setPublishTime(System.currentTimeMillis());
    metadata.setProducerName("createMessageWrittenToLedger");
    metadata.setSequenceId(1);

    final ByteBuf payload = UnpooledByteBufAllocator.DEFAULT.heapBuffer().writeBytes(msg.getBytes());

    final int metadataLength = metadata.getSerializedSize();
    // length prefix (4 bytes) + metadata + payload
    final int capacity = 4 + metadataLength + payload.readableBytes();

    final ByteBuf header = PulsarByteBufAllocator.DEFAULT.heapBuffer(capacity, capacity);
    header.writeInt(metadataLength);
    metadata.writeTo(header);

    final ByteBufPair headerAndPayload = ByteBufPair.get(header, payload);
    return ByteBufPair.coalesce(headerAndPayload);
}
/**
 * Prepends broker entry metadata to an already serialized message and returns the resulting bytes.
 *
 * <p>The metadata is produced by {@link Commands#addBrokerEntryMetadata} using the interceptors from
 * {@link #getBrokerEntryMetadataInterceptors()} and a message count of 1.
 *
 * @param headerAndPayloads serialized message (headers and payload) to wrap
 * @return a freshly allocated array holding exactly the readable bytes of the wrapped message
 * @throws Exception if adding the broker entry metadata fails
 */
public static byte[] appendBrokerTimestamp(ByteBuf headerAndPayloads) throws Exception {
    ByteBuf msgWithEntryMeta =
            Commands.addBrokerEntryMetadata(headerAndPayloads, getBrokerEntryMetadataInterceptors(), 1);
    try {
        // Copy the readable region explicitly rather than relying on nioBuffer().array(), which exposes
        // the raw backing array (ignoring offset/limit) and is unsupported for direct buffers.
        byte[] byteMessage = new byte[msgWithEntryMeta.readableBytes()];
        msgWithEntryMeta.getBytes(msgWithEntryMeta.readerIndex(), byteMessage);
        return byteMessage;
    } finally {
        // Release even if the copy fails so the buffer is not leaked.
        msgWithEntryMeta.release();
    }
}
/**
 * Mutable holder for the outcome of one find operation: either the position that was found or the
 * exception that was reported. Both fields start out as {@code null}.
 */
class Result {
    ManagedLedgerException exception;
    Position position;

    /** Clears both fields so the holder can be reused for the next lookup. */
    void reset() {
        position = null;
        exception = null;
    }
}
/**
 * Starts a timestamp lookup on the given cursor through a new {@link PersistentMessageFinder} and
 * records the outcome in {@code result}.
 *
 * @param result    holder that receives the found position, or the exception on failure
 * @param c1        cursor to search
 * @param timestamp timestamp passed to {@code findMessages}
 * @return a future completed with {@code null} once a position is reported, or completed exceptionally
 *         with the reported {@link ManagedLedgerException}
 */
CompletableFuture<Void> findMessage(final Result result, final ManagedCursor c1, final long timestamp) {
    final CompletableFuture<Void> completion = new CompletableFuture<>();

    final AsyncCallbacks.FindEntryCallback callback = new AsyncCallbacks.FindEntryCallback() {
        @Override
        public void findEntryComplete(Position position, Object ctx) {
            result.position = position;
            completion.complete(null);
        }

        @Override
        public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition,
                                    Object ctx) {
            result.exception = exception;
            completion.completeExceptionally(exception);
        }
    };

    new PersistentMessageFinder("topicname", c1).findMessages(timestamp, callback);
    return completion;
}
/**
 * Exercises timestamp lookups through {@link PersistentMessageFinder} after the ledger and cursor have
 * been closed and reopened, then checks the failure callbacks of {@link PersistentMessageFinder} and
 * {@link PersistentMessageExpiryMonitor}.
 *
 * <p>The test relies on wall-clock spacing between writes ({@code Thread.sleep}); the statement order is
 * therefore significant.
 *
 * @throws Exception on any managed-ledger, reflection or interruption failure
 */
@Test
void testPersistentMessageFinder() throws Exception {
    final String ledgerAndCursorName = "testPersistentMessageFinder";
    int entriesPerLedger = 2;
    // taken before any entry is written
    long beginTimestamp = System.currentTimeMillis();
    ManagedLedgerConfig config = new ManagedLedgerConfig();
    config.setRetentionSizeInMB(10);
    config.setMaxEntriesPerLedger(entriesPerLedger);
    config.setRetentionTime(1, TimeUnit.HOURS);
    ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
    ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
    ledger.addEntry(createMessageWrittenToLedger("retained1"));
    // space apart message publish times
    Thread.sleep(100);
    ledger.addEntry(createMessageWrittenToLedger("retained2"));
    Thread.sleep(100);
    Position newPosition = ledger.addEntry(createMessageWrittenToLedger("retained3"));
    Thread.sleep(100);
    // taken after "retained3" and before "afterresetposition"
    long timestamp = System.currentTimeMillis();
    Thread.sleep(10);
    ledger.addEntry(createMessageWrittenToLedger("afterresetposition"));
    Position lastPosition = ledger.addEntry(createMessageWrittenToLedger("not-read"));
    List<Entry> entries = c1.readEntries(3);
    c1.markDelete(entries.get(2).getPosition());
    c1.close();
    ledger.close();
    entries.forEach(Entry::release);
    // give timed ledger trimming a chance to run
    Thread.sleep(1000);
    ledger = factory.open(ledgerAndCursorName, config);
    c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
    // taken after every entry has been written
    long endTimestamp = System.currentTimeMillis();

    // Lookup at the mid-point timestamp must land on "retained3".
    Result result = new Result();
    CompletableFuture<Void> future = findMessage(result, c1, timestamp);
    future.get();
    assertNull(result.exception);
    assertNotNull(result.position);
    assertEquals(result.position, newPosition);

    // Lookup at the timestamp taken before the first write must report no position.
    result.reset();
    future = findMessage(result, c1, beginTimestamp);
    future.get();
    assertNull(result.exception);
    assertNull(result.position);

    // Lookup at the timestamp taken after the last write must land on the last entry.
    result.reset();
    future = findMessage(result, c1, endTimestamp);
    future.get();
    assertNull(result.exception);
    assertNotNull(result.position);
    assertEquals(result.position, lastPosition);

    // A failure reported to the finder must be forwarded to the callback passed as context.
    PersistentMessageFinder messageFinder = new PersistentMessageFinder("topicname", c1);
    final AtomicBoolean ex = new AtomicBoolean(false);
    messageFinder.findEntryFailed(new ManagedLedgerException("failed"), Optional.empty(),
            new AsyncCallbacks.FindEntryCallback() {
                @Override
                public void findEntryComplete(Position position, Object ctx) {
                }

                @Override
                public void findEntryFailed(ManagedLedgerException exception,
                                            Optional<Position> failedReadPosition, Object ctx) {
                    ex.set(true);
                }
            });
    assertTrue(ex.get());

    // After a failed find, the monitor's private expirationCheckInProgress field must read 0.
    PersistentMessageExpiryMonitor monitor =
            new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
    monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"),
            Optional.empty(), null);
    Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress");
    field.setAccessible(true);
    // TestNG order is (actual, expected)
    assertEquals(field.get(monitor), 0);

    result.reset();
    c1.close();
    ledger.close();
    factory.shutdown();
}
/**
 * Compares timestamp lookups on two ledgers: one whose entries carry only the publish time in the
 * message metadata, and one whose entries additionally carry broker entry metadata added via
 * {@code appendBrokerTimestamp}.
 *
 * <p>The test relies on wall-clock spacing between writes ({@code Thread.sleep}); the statement order is
 * therefore significant.
 *
 * @throws Exception on any managed-ledger or interruption failure
 */
@Test
void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception {
    final String ledgerAndCursorName = "publishTime";
    final String ledgerAndCursorNameForBrokerTimestampMessage = "brokerTimestamp";

    // Part 1: entries without broker entry metadata.
    ManagedLedgerConfig config = new ManagedLedgerConfig();
    ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
    ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
    ledger.addEntry(createMessageWrittenToLedger("message1"));
    // space apart message publish times
    Thread.sleep(100);
    ledger.addEntry(createMessageWrittenToLedger("message2"));
    Thread.sleep(100);
    Position position = ledger.addEntry(createMessageWrittenToLedger("message3"));
    Thread.sleep(100);
    long timestamp = System.currentTimeMillis();
    Result result = new Result();
    CompletableFuture<Void> future = findMessage(result, cursor, timestamp);
    future.get();
    assertNull(result.exception);
    assertNotNull(result.position);
    assertEquals(result.position, position);
    List<Entry> entryList = cursor.readEntries(3);
    for (Entry entry : entryList) {
        // entry has no raw metadata if BrokerTimestampForMessage is disabled
        assertNull(Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer()));
    }
    // entries returned by readEntries are owned by the caller; release them as the other test does
    entryList.forEach(Entry::release);
    result.reset();
    cursor.close();
    ledger.close();

    // Part 2: entries with broker entry metadata prepended.
    ManagedLedgerConfig configNew = new ManagedLedgerConfig();
    ManagedLedger ledgerNew = factory.open(ledgerAndCursorNameForBrokerTimestampMessage, configNew);
    ManagedCursorImpl cursorNew =
            (ManagedCursorImpl) ledgerNew.openCursor(ledgerAndCursorNameForBrokerTimestampMessage);
    // build message which has publish time first
    ByteBuf msg1 = createMessageByteBufWrittenToLedger("message1");
    ByteBuf msg2 = createMessageByteBufWrittenToLedger("message2");
    ByteBuf msg3 = createMessageByteBufWrittenToLedger("message3");
    Thread.sleep(10);
    // later than every publish time above, earlier than every broker timestamp added below
    long timeAfterPublishTime = System.currentTimeMillis();
    Thread.sleep(10);
    // append broker timestamp as entry metadata
    ledgerNew.addEntry(appendBrokerTimestamp(msg1));
    // space apart message publish times
    Thread.sleep(100);
    ledgerNew.addEntry(appendBrokerTimestamp(msg2));
    Thread.sleep(100);
    Position newPosition = ledgerNew.addEntry(appendBrokerTimestamp(msg3));
    Thread.sleep(100);
    long timeAfterBrokerTimestamp = System.currentTimeMillis();
    CompletableFuture<Void> publishTimeFuture = findMessage(result, cursorNew, timeAfterPublishTime);
    publishTimeFuture.get();
    assertNull(result.exception);
    // position should be null, since broker timestamp for message is bigger than timeAfterPublishTime
    assertNull(result.position);
    result.reset();
    CompletableFuture<Void> brokerTimestampFuture = findMessage(result, cursorNew, timeAfterBrokerTimestamp);
    brokerTimestampFuture.get();
    assertNull(result.exception);
    assertNotNull(result.position);
    assertEquals(result.position, newPosition);
    List<Entry> entryListNew = cursorNew.readEntries(4);
    for (Entry entry : entryListNew) {
        // entry should have raw metadata since BrokerTimestampForMessage is enabled
        BrokerEntryMetadata brokerMetadata = Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
        assertNotNull(brokerMetadata);
        assertTrue(brokerMetadata.getBrokerTimestamp() > timeAfterPublishTime);
    }
    // entries returned by readEntries are owned by the caller; release them as the other test does
    entryListNew.forEach(Entry::release);
    result.reset();
    cursorNew.close();
    ledgerNew.close();
    factory.shutdown();
}
/**
 * Loads the two broker entry metadata interceptors used by these tests (broker timestamp and index),
 * resolving them by class name through the current thread's context class loader.
 *
 * @return the interceptors returned by {@link BrokerEntryMetadataUtils#loadBrokerEntryMetadataInterceptors}
 */
public static Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
    final Set<String> classNames = new HashSet<>();
    classNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");
    classNames.add("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor");

    final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
    return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(classNames, contextLoader);
}
/**
 * It tests that message expiry doesn't get stuck if it can't read a deleted ledger's entry.
 *
 * <p>Ten entries are written two per ledger, the first three ledgers are deleted directly through the
 * BookKeeper client, and expiry is then triggered repeatedly. The cursor's mark-delete position is
 * expected to end up on the last entry of the last ledger.
 *
 * @throws Exception on any managed-ledger, BookKeeper or interruption failure
 */
@Test
void testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
    final String ledgerAndCursorName = "testPersistentMessageExpiryWithNonRecoverableLedgers";
    final int entriesPerLedger = 2;
    final int totalEntries = 10;
    final int ttlSeconds = 1;
    ManagedLedgerConfig config = new ManagedLedgerConfig();
    config.setRetentionSizeInMB(10);
    config.setMaxEntriesPerLedger(entriesPerLedger);
    config.setRetentionTime(1, TimeUnit.HOURS);
    config.setAutoSkipNonRecoverableData(true);
    ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config);
    ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
    for (int i = 0; i < totalEntries; i++) {
        ledger.addEntry(createMessageWrittenToLedger("msg" + i));
    }
    List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
    LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1);
    assertEquals(ledgers.size(), totalEntries / entriesPerLedger);
    // this will make sure that all entries should be deleted
    Thread.sleep(TimeUnit.SECONDS.toMillis(ttlSeconds));
    // remove the first three ledgers behind the managed ledger's back
    bkc.deleteLedger(ledgers.get(0).getLedgerId());
    bkc.deleteLedger(ledgers.get(1).getLedgerId());
    bkc.deleteLedger(ledgers.get(2).getLedgerId());
    PersistentMessageExpiryMonitor monitor =
            new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
    Position previousMarkDelete = null;
    for (int i = 0; i < totalEntries; i++) {
        monitor.expireMessages(1);
        // wait (bounded) for the mark-delete position to move away from the previous one
        Position previousPos = previousMarkDelete;
        retryStrategically(
                (test) -> c1.getMarkDeletedPosition() != null
                        && !c1.getMarkDeletedPosition().equals(previousPos),
                5, 100);
        previousMarkDelete = c1.getMarkDeletedPosition();
    }
    PositionImpl markDeletePosition = (PositionImpl) c1.getMarkDeletedPosition();
    // TestNG order is (actual, expected)
    assertEquals(markDeletePosition.getLedgerId(), lastLedgerInfo.getLedgerId());
    assertEquals(markDeletePosition.getEntryId(), lastLedgerInfo.getEntries() - 1);
    c1.close();
    ledger.close();
    factory.shutdown();
}
/**
 * Checks position-based expiry on {@link PersistentMessageExpiryMonitor}: the return value of
 * {@code expireMessages(Position)} and the resulting mark-delete position of the cursor for positions
 * at, beyond, before and after the current mark-delete position, plus the case where a previous check is
 * still in progress.
 *
 * @throws Exception on any managed-ledger failure
 */
@Test
void testMessageExpiryWithPosition() throws Exception {
    final String name = "testPersistentMessageExpiryWithPositionNonRecoverableLedgers";
    final int entriesPerLedger = 5;
    final int totalEntries = 30;

    ManagedLedgerConfig config = new ManagedLedgerConfig();
    config.setRetentionSizeInMB(10);
    config.setMaxEntriesPerLedger(entriesPerLedger);
    config.setRetentionTime(1, TimeUnit.HOURS);
    config.setAutoSkipNonRecoverableData(true);
    ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(name, config);
    ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(name);

    PersistentSubscription subscription = mock(PersistentSubscription.class);
    Topic topic = mock(Topic.class);
    when(subscription.getTopic()).thenReturn(topic);

    List<Position> positions = new ArrayList<>();
    for (int i = 0; i < totalEntries; i++) {
        positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + i)));
    }
    when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1));

    PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor("topicname",
            cursor.getName(), cursor, subscription));
    assertEquals((PositionImpl) cursor.getMarkDeletedPosition(),
            PositionImpl.get(positions.get(0).getLedgerId(), -1));

    final Position fifteenth = positions.get(15);
    final Position sixteenth = positions.get(16);
    boolean issued;

    // Expire by position and verify mark delete position of cursor.
    issued = monitor.expireMessages(fifteenth);
    awaitSingleFindEntryComplete(monitor);
    assertMarkDeletedAt(cursor, fifteenth);
    assertTrue(issued);
    clearInvocations(monitor);

    // Expire by position beyond last position and nothing should happen.
    issued = monitor.expireMessages(PositionImpl.get(100, 100));
    assertMarkDeletedAt(cursor, fifteenth);
    assertFalse(issued);

    // Expire by position again and verify mark delete position of cursor didn't change.
    issued = monitor.expireMessages(fifteenth);
    awaitSingleFindEntryComplete(monitor);
    assertMarkDeletedAt(cursor, fifteenth);
    assertTrue(issued);
    clearInvocations(monitor);

    // Expire by position before current mark delete position and verify mark delete position of cursor
    // didn't change.
    issued = monitor.expireMessages(positions.get(10));
    awaitSingleFindEntryComplete(monitor);
    assertMarkDeletedAt(cursor, fifteenth);
    assertTrue(issued);
    clearInvocations(monitor);

    // Expire by position after current mark delete position and verify mark delete position of cursor
    // move to new position.
    issued = monitor.expireMessages(sixteenth);
    awaitSingleFindEntryComplete(monitor);
    assertMarkDeletedAt(cursor, sixteenth);
    assertTrue(issued);
    clearInvocations(monitor);

    ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class);
    PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor("topicname",
            cursor.getName(), mockCursor, subscription));
    // Not calling findEntryComplete to clear expirationCheckInProgress condition, so following call to
    // expire message shouldn't issue.
    doAnswer(invocation -> null).when(mockCursor).asyncFindNewestMatching(any(), any(), any(), any());
    issued = mockMonitor.expireMessages(fifteenth);
    assertTrue(issued);
    issued = mockMonitor.expireMessages(fifteenth);
    assertFalse(issued);

    cursor.close();
    ledger.close();
    factory.shutdown();
}

/** Waits up to two seconds until the spied monitor has seen exactly one findEntryComplete call. */
private static void awaitSingleFindEntryComplete(PersistentMessageExpiryMonitor monitor) {
    Awaitility.await().atMost(2, TimeUnit.SECONDS)
            .untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any()));
}

/** Asserts that the cursor's mark-delete position has the ledger id and entry id of {@code expected}. */
private static void assertMarkDeletedAt(ManagedCursorImpl cursor, Position expected) {
    assertEquals((PositionImpl) cursor.getMarkDeletedPosition(),
            PositionImpl.get(expected.getLedgerId(), expected.getEntryId()));
}
/**
 * Wraps a {@link ResetCursorData} (ledger id 1, entry id 1, excluded flag set) in a JSON
 * {@link Entity} and prints it to standard output. The test makes no assertions.
 */
@Test
public void test() {
    final ResetCursorData cursorData = new ResetCursorData(1, 1);
    cursorData.setExcluded(true);
    final Entity<ResetCursorData> jsonEntity = Entity.entity(cursorData, MediaType.APPLICATION_JSON);
    System.out.println(jsonEntity);
}
}