/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.compaction;

import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
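
/**
 * {@link CompactedTopic} implementation backed by a BookKeeper ledger written by the
 * {@link Compactor}: reads at or before the compaction horizon are served from the compacted
 * ledger, while newer positions fall back to the topic's managed cursor.
 */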
public class CompactedTopicImpl implements CompactedTopic {
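    // Sentinel values returned by findStartPoint(): the requested position is newer than anything
    // in the compacted ledger, or the compacted ledger contains no entries at all.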
    static final long NEWER_THAN_COMPACTED = -0xfeed0fbaL;
    static final long COMPACT_LEDGER_EMPTY = -0xfeed0fbbL;
    static final int DEFAULT_STARTPOINT_CACHE_SIZE = 100;

    private final BookKeeper bk;

    private volatile PositionImpl compactionHorizon = null;
    private volatile CompletableFuture<CompactedTopicContext> compactedTopicContext = null;

    public CompactedTopicImpl(BookKeeper bk) {
        this.bk = bk;
    }
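
    /**
     * Switch this topic over to a newly written compacted ledger.
     *
     * <p>Updates the compaction horizon and asynchronously opens the new ledger. The returned
     * future completes with the previous {@link CompactedTopicContext} (or {@code null} if there
     * was none) once the new ledger is open, so the caller can delete the old ledger.
     */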
    @Override
    public CompletableFuture<CompactedTopicContext> newCompactedLedger(Position p, long compactedLedgerId) {
        synchronized (this) {
            compactionHorizon = (PositionImpl) p;

            CompletableFuture<CompactedTopicContext> previousContext = compactedTopicContext;
            compactedTopicContext = openCompactedLedger(bk, compactedLedgerId);

            // delete the ledger from the old context once the new one is open
            return compactedTopicContext.thenCompose(__ ->
                    previousContext != null ? previousContext : CompletableFuture.completedFuture(null));
        }
    }
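
    /**
     * Delete the given compacted ledger from BookKeeper. Errors are logged but never propagated to
     * the returned future.
     */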
    @Override
    public CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId) {
        return tryDeleteCompactedLedger(bk, compactedLedgerId);
    }
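
    /**
     * Read entries for the given consumer, transparently mixing compacted and non-compacted data:
     * if the cursor is already past the compaction horizon the read goes to the managed cursor,
     * otherwise the entries are served from the compacted ledger and the cursor is advanced past
     * the range that was read.
     */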
    @Override
    @Deprecated
    public void asyncReadEntriesOrWait(ManagedCursor cursor,
                                       int numberOfEntriesToRead,
                                       boolean isFirstRead,
                                       ReadEntriesCallback callback, Consumer consumer) {
        PositionImpl cursorPosition;
        if (isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())) {
            cursorPosition = PositionImpl.EARLIEST;
        } else {
            cursorPosition = (PositionImpl) cursor.getReadPosition();
        }

        // TODO: redeliver epoch link https://github.com/apache/pulsar/issues/13690
        ReadEntriesCtx readEntriesCtx = ReadEntriesCtx.create(consumer, DEFAULT_CONSUMER_EPOCH);

        final PositionImpl currentCompactionHorizon = compactionHorizon;

        if (currentCompactionHorizon == null
            || currentCompactionHorizon.compareTo(cursorPosition) < 0) {
            cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
        } else {
            compactedTopicContext.thenCompose(
                (context) -> findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache)
                    .thenCompose((startPoint) -> {
                        // do not need to read the compaction ledger if it is empty.
                        // the cursor just needs to be set to the compaction horizon
                        if (startPoint == COMPACT_LEDGER_EMPTY || startPoint == NEWER_THAN_COMPACTED) {
                            cursor.seek(currentCompactionHorizon.getNext());
                            callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
                            return CompletableFuture.completedFuture(null);
                        } else {
                            long endPoint = Math.min(context.ledger.getLastAddConfirmed(),
                                    startPoint + numberOfEntriesToRead);
                            return readEntries(context.ledger, startPoint, endPoint)
                                .thenAccept((entries) -> {
                                    Entry lastEntry = entries.get(entries.size() - 1);
                                    // The compaction task depends on the last snapshot and the incremental
                                    // entries to build the new snapshot. So for the compaction cursor, we
                                    // need to force seek the read position to ensure the compactor can read
                                    // the complete last snapshot, because the compactor reads the data
                                    // before the compaction cursor's mark-delete position.
                                    cursor.seek(lastEntry.getPosition().getNext(), true);
                                    callback.readEntriesComplete(entries, readEntriesCtx);
                                });
                        }
                    }))
                .exceptionally((exception) -> {
                    if (exception.getCause() instanceof NoSuchElementException) {
                        cursor.seek(currentCompactionHorizon.getNext());
                        callback.readEntriesComplete(Collections.emptyList(), readEntriesCtx);
                    } else {
                        callback.readEntriesFailed(new ManagedLedgerException(exception), readEntriesCtx);
                    }
                    return null;
                });
        }
    }
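
    /**
     * Locate the first entry in the compacted ledger whose message id is greater than or equal to
     * the given position. Completes with {@link #COMPACT_LEDGER_EMPTY} if the ledger has no
     * entries, or {@link #NEWER_THAN_COMPACTED} if the position is newer than everything stored in
     * the compacted ledger.
     */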
    static CompletableFuture<Long> findStartPoint(PositionImpl p,
                                                  long lastEntryId,
                                                  AsyncLoadingCache<Long, MessageIdData> cache) {
        CompletableFuture<Long> promise = new CompletableFuture<>();
        // if lastEntryId is less than zero it means there are no entries in the compact ledger
        if (lastEntryId < 0) {
            promise.complete(COMPACT_LEDGER_EMPTY);
        } else {
            findStartPointLoop(p, 0, lastEntryId, promise, cache);
        }
        return promise;
    }
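
    /**
     * One step of the binary search used by {@link #findStartPoint}: compare the target position
     * against the message ids at {@code start}, the midpoint and {@code end}, then either complete
     * the promise or recurse into the half of the range that can contain the start point.
     */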
    @VisibleForTesting
    static void findStartPointLoop(PositionImpl p, long start, long end,
                                   CompletableFuture<Long> promise,
                                   AsyncLoadingCache<Long, MessageIdData> cache) {
        long midpoint = start + ((end - start) / 2);

        CompletableFuture<MessageIdData> startEntry = cache.get(start);
        CompletableFuture<MessageIdData> middleEntry = cache.get(midpoint);
        CompletableFuture<MessageIdData> endEntry = cache.get(end);

        CompletableFuture.allOf(startEntry, middleEntry, endEntry).thenRun(
                () -> {
                    if (comparePositionAndMessageId(p, startEntry.join()) <= 0) {
                        promise.complete(start);
                    } else if (comparePositionAndMessageId(p, middleEntry.join()) <= 0) {
                        findStartPointLoop(p, start + 1, midpoint, promise, cache);
                    } else if (comparePositionAndMessageId(p, endEntry.join()) <= 0) {
                        findStartPointLoop(p, midpoint + 1, end, promise, cache);
                    } else {
                        promise.complete(NEWER_THAN_COMPACTED);
                    }
                }).exceptionally((exception) -> {
                    promise.completeExceptionally(exception);
                    return null;
                });
    }
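
    /**
     * Build an async cache from entry id to the {@link MessageIdData} stored at that entry of the
     * compacted ledger, so repeated lookups during the start-point search avoid re-reading entries.
     */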
    static AsyncLoadingCache<Long, MessageIdData> createCache(LedgerHandle lh,
                                                              long maxSize) {
        return Caffeine.newBuilder()
                .maximumSize(maxSize)
                .buildAsync((entryId, executor) -> readOneMessageId(lh, entryId));
    }
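
    /**
     * Read a single entry from the ledger and extract its {@link MessageIdData}, releasing all
     * entry buffers in the returned sequence.
     */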
    private static CompletableFuture<MessageIdData> readOneMessageId(LedgerHandle lh, long entryId) {
        CompletableFuture<MessageIdData> promise = new CompletableFuture<>();

        lh.asyncReadEntries(entryId, entryId,
                (rc, _lh, seq, ctx) -> {
                    if (rc != BKException.Code.OK) {
                        promise.completeExceptionally(BKException.create(rc));
                    } else {
                        // Need to release buffers for all entries in the sequence
                        if (seq.hasMoreElements()) {
                            LedgerEntry entry = seq.nextElement();
                            try (RawMessage m = RawMessageImpl.deserializeFrom(entry.getEntryBuffer())) {
                                entry.getEntryBuffer().release();
                                while (seq.hasMoreElements()) {
                                    seq.nextElement().getEntryBuffer().release();
                                }
                                promise.complete(m.getMessageIdData());
                            }
                        } else {
                            promise.completeExceptionally(new NoSuchElementException(
                                    String.format("No such entry %d in ledger %d", entryId, lh.getId())));
                        }
                    }
                }, null);
        return promise;
    }
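
    /**
     * Open a compacted ledger using the digest type and password that the {@link Compactor} wrote
     * it with, and wrap it together with a fresh start-point cache in a
     * {@link CompactedTopicContext}.
     */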
    private static CompletableFuture<CompactedTopicContext> openCompactedLedger(BookKeeper bk, long id) {
        CompletableFuture<LedgerHandle> promise = new CompletableFuture<>();
        bk.asyncOpenLedger(id,
                Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
                Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD,
                (rc, ledger, ctx) -> {
                    if (rc != BKException.Code.OK) {
                        promise.completeExceptionally(BKException.create(rc));
                    } else {
                        promise.complete(ledger);
                    }
                }, null);
        return promise.thenApply((ledger) -> new CompactedTopicContext(
                ledger, createCache(ledger, DEFAULT_STARTPOINT_CACHE_SIZE)));
    }
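
    /**
     * Best-effort deletion of a compacted ledger: failures are logged and the returned future
     * always completes successfully.
     */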
    private static CompletableFuture<Void> tryDeleteCompactedLedger(BookKeeper bk, long id) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        bk.asyncDeleteLedger(id,
                (rc, ctx) -> {
                    if (rc != BKException.Code.OK) {
                        log.warn("Error deleting compacted topic ledger {}",
                                 id, BKException.create(rc));
                    } else {
                        log.debug("Compacted topic ledger {} deleted successfully", id);
                    }
                    promise.complete(null); // don't propagate any error
                }, null);
        return promise;
    }
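
    /**
     * Read the entries in the inclusive range {@code [from, to]} from the compacted ledger and
     * convert each stored {@link RawMessage} back into an {@link Entry} carrying its original
     * message id.
     */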
    static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh, long from, long to) {
        CompletableFuture<Enumeration<LedgerEntry>> promise = new CompletableFuture<>();
        lh.asyncReadEntries(from, to,
                (rc, _lh, seq, ctx) -> {
                    if (rc != BKException.Code.OK) {
                        promise.completeExceptionally(BKException.create(rc));
                    } else {
                        promise.complete(seq);
                    }
                }, null);

        return promise.thenApply(
                (seq) -> {
                    List<Entry> entries = new ArrayList<>();
                    while (seq.hasMoreElements()) {
                        ByteBuf buf = seq.nextElement().getEntryBuffer();
                        try (RawMessage m = RawMessageImpl.deserializeFrom(buf)) {
                            entries.add(EntryImpl.create(m.getMessageIdData().getLedgerId(),
                                                         m.getMessageIdData().getEntryId(),
                                                         m.getHeadersAndPayload()));
                        } finally {
                            buf.release();
                        }
                    }
                    return entries;
                });
    }

    /**
     * Getter for the current {@link CompactedTopicContext}, blocking until the compacted ledger
     * has been opened.
     * @return the current CompactedTopicContext, or {@link Optional#empty()} if there is none
     */
    public Optional<CompactedTopicContext> getCompactedTopicContext() throws ExecutionException, InterruptedException {
        return compactedTopicContext == null ? Optional.empty() : Optional.of(compactedTopicContext.get());
    }
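
    /**
     * Read the last entry of the compacted ledger, completing with {@code null} if there is no
     * compacted ledger or it contains no entries.
     */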
    @Override
    public CompletableFuture<Entry> readLastEntryOfCompactedLedger() {
        if (compactionHorizon == null) {
            return CompletableFuture.completedFuture(null);
        }
        return compactedTopicContext.thenCompose(context -> {
            if (context.ledger.getLastAddConfirmed() == -1) {
                return CompletableFuture.completedFuture(null);
            }
            return readEntries(
                    context.ledger, context.ledger.getLastAddConfirmed(), context.ledger.getLastAddConfirmed())
                    .thenCompose(entries -> !entries.isEmpty()
                            ? CompletableFuture.completedFuture(entries.get(0))
                            : CompletableFuture.completedFuture(null));
        });
    }
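
    /**
     * Compare a managed-ledger position with a message id, by ledger id first and entry id second.
     */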
    private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) {
        return ComparisonChain.start()
                .compare(p.getLedgerId(), m.getLedgerId())
                .compare(p.getEntryId(), m.getEntryId()).result();
    }

    public Optional<Position> getCompactionHorizon() {
        return Optional.ofNullable(this.compactionHorizon);
    }

    @Nullable
    public CompletableFuture<CompactedTopicContext> getCompactedTopicContextFuture() {
        return compactedTopicContext;
    }

    private static final Logger log = LoggerFactory.getLogger(CompactedTopicImpl.class);
}