// blob: 10d2e926bc89bbb4e8c3e8e9ee30047ae38abf6b [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.testng.annotations.Test;
import com.yahoo.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import com.yahoo.pulsar.broker.service.persistent.PersistentMessageFinder;
import com.yahoo.pulsar.common.api.DoubleByteBuf;
import com.yahoo.pulsar.common.api.proto.PulsarApi;
import com.yahoo.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
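/**
 * Tests {@link PersistentMessageFinder} lookups by timestamp against a managed ledger cursor, and the
 * failure callbacks of {@link PersistentMessageFinder} and {@link PersistentMessageExpiryMonitor}.
 */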
public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {

    /** Upper bound, in seconds, on how long the test waits for one find operation to call back. */
    private static final long FIND_TIMEOUT_SECONDS = 30;

    /**
     * Builds the byte representation of a message as this test writes it to the ledger.
     *
     * <p>The buffer is assembled from a headers part (a 4-byte metadata size followed by the serialized
     * {@link PulsarApi.MessageMetadata}) and a payload part holding the bytes of {@code msg}. The metadata
     * carries the current wall-clock time as publish time, the producer name
     * {@code "createMessageWrittenToLedger"} and sequence id 1.
     *
     * @param msg the message payload; encoded as UTF-8
     * @return the backing array of the NIO view over the combined headers-and-payload buffer
     * @throws Exception if serializing the metadata fails
     */
    public static byte[] createMessageWrittenToLedger(String msg) throws Exception {
        PulsarApi.MessageMetadata.Builder messageMetadataBuilder = PulsarApi.MessageMetadata.newBuilder();
        messageMetadataBuilder.setPublishTime(System.currentTimeMillis());
        messageMetadataBuilder.setProducerName("createMessageWrittenToLedger");
        messageMetadataBuilder.setSequenceId(1);
        PulsarApi.MessageMetadata messageMetadata = messageMetadataBuilder.build();

        // Explicit charset so the encoded payload does not depend on the platform default.
        ByteBuf data = UnpooledByteBufAllocator.DEFAULT.heapBuffer()
                .writeBytes(msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));

        int msgMetadataSize = messageMetadata.getSerializedSize();
        int payloadSize = data.readableBytes();
        // 4 bytes for the metadata-size prefix written with writeInt below.
        int totalSize = 4 + msgMetadataSize + payloadSize;

        ByteBuf headers = PooledByteBufAllocator.DEFAULT.heapBuffer(totalSize, totalSize);
        ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(headers);
        headers.writeInt(msgMetadataSize);
        messageMetadata.writeTo(outStream);

        ByteBuf headersAndPayload = DoubleByteBuf.get(headers, data);
        byte[] byteMessage = headersAndPayload.nioBuffer().array();
        // NOTE(review): presumably releasing the combined buffer also releases headers and data - confirm.
        headersAndPayload.release();
        return byteMessage;
    }

    /** Mutable holder for the outcome reported by a find callback: either a position or an exception. */
    class Result {
        ManagedLedgerException exception = null;
        Position position = null;

        /** Clears both fields so the holder can be reused for the next lookup. */
        void reset() {
            this.exception = null;
            this.position = null;
        }
    }

    /**
     * Starts a {@link PersistentMessageFinder} lookup for {@code timestamp} on the given cursor.
     *
     * @param result holder that receives the found position on success or the exception on failure
     * @param c1 cursor the finder operates on
     * @param timestamp timestamp passed to {@code findMessages}
     * @return a future completed with {@code null} when the lookup succeeds, or completed exceptionally
     *         with the reported {@link ManagedLedgerException} when it fails
     */
    CompletableFuture<Void> findMessage(final Result result, final ManagedCursor c1, final long timestamp) {
        PersistentMessageFinder messageFinder = new PersistentMessageFinder("topicname", c1);
        final CompletableFuture<Void> future = new CompletableFuture<>();
        messageFinder.findMessages(timestamp, new AsyncCallbacks.FindEntryCallback() {
            @Override
            public void findEntryComplete(Position position, Object ctx) {
                result.position = position;
                future.complete(null);
            }

            @Override
            public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
                result.exception = exception;
                future.completeExceptionally(exception);
            }
        });
        return future;
    }

    /**
     * Writes five messages with spaced-out publish times, reopens the ledger and cursor, and checks the
     * position found for three timestamps: one between the third and fourth message, one taken before any
     * message was written, and one taken after all messages were written. Finally checks the failure
     * callbacks of the finder and of the expiry monitor.
     */
    @Test
    void testPersistentMessageFinder() throws Exception {
        final String ledgerAndCursorName = "testPersistentMessageFinder";
        int entriesPerLedger = 2;
        // Taken before any message is written.
        long beginTimestamp = System.currentTimeMillis();

        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setRetentionSizeInMB(10);
        config.setMaxEntriesPerLedger(entriesPerLedger);
        config.setRetentionTime(1, TimeUnit.HOURS);
        ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);

        ledger.addEntry(createMessageWrittenToLedger("retained1"));
        // space apart message publish times
        Thread.sleep(100);
        ledger.addEntry(createMessageWrittenToLedger("retained2"));
        Thread.sleep(100);
        Position newPosition = ledger.addEntry(createMessageWrittenToLedger("retained3"));
        Thread.sleep(100);
        // Falls strictly between "retained3" and "afterresetposition".
        long timestamp = System.currentTimeMillis();
        Thread.sleep(10);
        ledger.addEntry(createMessageWrittenToLedger("afterresetposition"));
        Position lastPosition = ledger.addEntry(createMessageWrittenToLedger("not-read"));

        // Consume the first three entries and mark-delete up to the third before closing.
        List<Entry> entries = c1.readEntries(3);
        c1.markDelete(entries.get(2).getPosition());
        c1.close();
        ledger.close();
        entries.forEach(e -> e.release());

        // give timed ledger trimming a chance to run
        Thread.sleep(1000);

        ledger = factory.open(ledgerAndCursorName, config);
        c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
        // Taken after every message has been written.
        long endTimestamp = System.currentTimeMillis();

        Result result = new Result();

        // Timestamp between the third and fourth message: expect the third message's position.
        CompletableFuture<Void> future = findMessage(result, c1, timestamp);
        // Bounded wait so a callback that never fires fails the test instead of hanging it.
        future.get(FIND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        assertEquals(result.exception, null);
        assertNotEquals(result.position, null);
        assertEquals(result.position, newPosition);

        // Timestamp before the first message: expect no position.
        result.reset();
        future = findMessage(result, c1, beginTimestamp);
        future.get(FIND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        assertEquals(result.exception, null);
        assertEquals(result.position, null);

        // Timestamp after the last message: expect the last message's position.
        result.reset();
        future = findMessage(result, c1, endTimestamp);
        future.get(FIND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        assertEquals(result.exception, null);
        assertNotEquals(result.position, null);
        assertEquals(result.position, lastPosition);

        // A failure reported to the finder must be forwarded to the callback passed as context.
        PersistentMessageFinder messageFinder = new PersistentMessageFinder("topicname", c1);
        final AtomicBoolean ex = new AtomicBoolean(false);
        messageFinder.findEntryFailed(new ManagedLedgerException("failed"), new AsyncCallbacks.FindEntryCallback() {
            @Override
            public void findEntryComplete(Position position, Object ctx) {
            }

            @Override
            public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
                ex.set(true);
            }
        });
        assertTrue(ex.get());

        // After a failed find, the monitor's in-progress flag must read 0. The field is not accessible
        // from here, so it is read reflectively.
        PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1);
        monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), null);
        Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress");
        field.setAccessible(true);
        // TestNG's assertEquals takes (actual, expected).
        assertEquals(field.get(monitor), 0);

        result.reset();
        c1.close();
        ledger.close();
        factory.shutdown();
    }
}