blob: f98c9e000f1503ebdbf1a393862668087f235913 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed.bucket;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
import com.google.common.collect.Table;
import com.google.common.collect.TreeRangeMap;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.roaringbitmap.RoaringBitmap;
@Slf4j
@ThreadSafe
public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker {
// Cursor property keys that start with this prefix are treated as bucket snapshot entries during recovery.
public static final String DELAYED_BUCKET_KEY_PREFIX = CURSOR_INTERNAL_PROPERTY_PREFIX + "delayed.bucket";
// Already-completed future (null value) used as a stand-in when a bucket has no snapshot-create future.
static final CompletableFuture<Long> NULL_LONG_PROMISE = CompletableFuture.completedFuture(null);
// Base timeout in seconds for blocking waits: close() waits this long, recovery waits five times this.
static final int AsyncOperationTimeoutSeconds = 60;
// Value the wrapped snapshot-create future completes with when creating the snapshot failed.
private static final Long INVALID_BUCKET_ID = -1L;
// Initial (largest) number of adjacent buckets tried for a single merge.
private static final int MAX_MERGE_NUM = 4;
// The mutable bucket must hold at least this many indexes before addMessage seals it.
private final long minIndexCountPerBucket;
// Passed through to the mutable bucket whenever an immutable bucket snapshot is created.
private final long timeStepPerBucketSnapshotSegmentInMillis;
// Passed through to the mutable bucket whenever an immutable bucket snapshot is created.
private final int maxIndexesPerBucketSnapshotSegment;
// When positive, a merge is triggered once the number of immutable buckets exceeds this value.
private final int maxNumBuckets;
// Number of delayed messages currently tracked; incremented by addMessage, decremented as messages are handed out.
private volatile long numberDelayedMessages;
// The single mutable bucket that receives newly added indexes until it is sealed.
@Getter
@VisibleForTesting
private final MutableBucket lastMutableBucket;
// Queue of (timestamp, ledgerId, entryId) triples from which getScheduledMessages takes due messages.
@Getter
@VisibleForTesting
private final TripleLongPriorityQueue sharedBucketPriorityQueue;
// Sealed buckets, keyed by the closed range [startLedgerId, endLedgerId].
@Getter
@VisibleForTesting
private final RangeMap<Long, ImmutableBucket> immutableBuckets;
// (ledgerId, entryId) of the last index of a bucket's loaded snapshot segment -> that bucket.
// Reaching such an index in getScheduledMessages triggers loading the bucket's next segment.
private final Table<Long, Long, ImmutableBucket> snapshotSegmentLastIndexTable;
// Collector for create/merge/load/delete events and bucket gauges.
private final BucketDelayedMessageIndexStats stats;
// In-flight load of a next snapshot segment, or null; getScheduledMessages returns nothing while it is pending.
private CompletableFuture<Void> pendingLoad = null;
/**
 * Creates a tracker that uses the system UTC clock.
 *
 * <p>All arguments are forwarded unchanged to the constructor that takes an explicit {@link Clock}.
 */
public BucketDelayedDeliveryTracker(
        PersistentDispatcherMultipleConsumers dispatcher,
        Timer timer,
        long tickTimeMillis,
        boolean isDelayedDeliveryDeliverAtTimeStrict,
        BucketSnapshotStorage bucketSnapshotStorage,
        long minIndexCountPerBucket,
        long timeStepPerBucketSnapshotSegmentInMillis,
        int maxIndexesPerBucketSnapshotSegment,
        int maxNumBuckets) {
    this(
            dispatcher,
            timer,
            tickTimeMillis,
            Clock.systemUTC(),
            isDelayedDeliveryDeliverAtTimeStrict,
            bucketSnapshotStorage,
            minIndexCountPerBucket,
            timeStepPerBucketSnapshotSegmentInMillis,
            maxIndexesPerBucketSnapshotSegment,
            maxNumBuckets);
}
/**
 * Creates a tracker with an explicit clock and immediately recovers any bucket snapshots recorded in the
 * dispatcher's cursor properties.
 *
 * <p>Recovery runs synchronously inside the constructor (see {@code recoverBucketSnapshot()}), so construction
 * may block and may throw a {@link RuntimeException} if recovery fails.
 *
 * @param dispatcher dispatcher whose name and cursor are handed to the mutable bucket
 * @param timer forwarded to the superclass
 * @param tickTimeMillis forwarded to the superclass
 * @param clock forwarded to the superclass
 * @param isDelayedDeliveryDeliverAtTimeStrict forwarded to the superclass
 * @param bucketSnapshotStorage storage handed to the mutable bucket
 * @param minIndexCountPerBucket minimum mutable-bucket size before it is sealed
 * @param timeStepPerBucketSnapshotSegmentInMillis passed through when snapshots are created
 * @param maxIndexesPerBucketSnapshotSegment passed through when snapshots are created
 * @param maxNumBuckets immutable-bucket count above which a merge is triggered (when positive)
 */
public BucketDelayedDeliveryTracker(
        PersistentDispatcherMultipleConsumers dispatcher,
        Timer timer,
        long tickTimeMillis,
        Clock clock,
        boolean isDelayedDeliveryDeliverAtTimeStrict,
        BucketSnapshotStorage bucketSnapshotStorage,
        long minIndexCountPerBucket,
        long timeStepPerBucketSnapshotSegmentInMillis,
        int maxIndexesPerBucketSnapshotSegment,
        int maxNumBuckets) {
    super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);

    // In-memory index structures.
    this.immutableBuckets = TreeRangeMap.create();
    this.snapshotSegmentLastIndexTable = HashBasedTable.create();
    this.sharedBucketPriorityQueue = new TripleLongPriorityQueue();
    this.stats = new BucketDelayedMessageIndexStats();

    // Bucket sizing / merge configuration.
    this.maxNumBuckets = maxNumBuckets;
    this.maxIndexesPerBucketSnapshotSegment = maxIndexesPerBucketSnapshotSegment;
    this.timeStepPerBucketSnapshotSegmentInMillis = timeStepPerBucketSnapshotSegmentInMillis;
    this.minIndexCountPerBucket = minIndexCountPerBucket;

    this.lastMutableBucket = new MutableBucket(
            dispatcher.getName(),
            dispatcher.getCursor(),
            FutureUtil.Sequencer.create(),
            bucketSnapshotStorage);

    // Must run last: recovery reads the mutable bucket, the stats and the structures created above.
    this.numberDelayedMessages = recoverBucketSnapshot();
}
/**
 * Rebuilds the immutable buckets and the shared queue from the bucket snapshots referenced in the cursor
 * properties.
 *
 * <p>Every cursor property key starting with {@link #DELAYED_BUCKET_KEY_PREFIX} is split on
 * {@code DELIMITER} into exactly three parts; the second and third parts are parsed as longs and passed to
 * the {@code ImmutableBucket} constructor. Each resulting bucket is recovered asynchronously and this method
 * blocks until all recoveries finish (at most {@code AsyncOperationTimeoutSeconds * 5} seconds).
 *
 * @return the sum of {@code numberBucketDelayedMessages} over the buckets kept after recovery, or 0 when no
 *         bucket snapshot is found
 * @throws RuntimeException wrapping the interruption, execution failure or timeout of the blocking wait;
 *         the interrupt flag is restored first when the cause is an {@link InterruptedException}
 * @throws IllegalArgumentException if a matching key does not split into three parts (from checkArgument)
 */
private synchronized long recoverBucketSnapshot() throws RuntimeException {
ManagedCursor cursor = this.lastMutableBucket.getCursor();
Map<String, String> cursorProperties = cursor.getCursorProperties();
if (MapUtils.isEmpty(cursorProperties)) {
log.info("[{}] Recover delayed message index bucket snapshot finish, don't find bucket snapshot",
dispatcher.getName());
return 0;
}
// Recovered buckets share the mutable bucket's sequencer and snapshot storage.
FutureUtil.Sequencer<Void> sequencer = this.lastMutableBucket.getSequencer();
// Buckets whose snapshot should be deleted once recovery is done.
Map<Range<Long>, ImmutableBucket> toBeDeletedBucketMap = new HashMap<>();
cursorProperties.keySet().forEach(key -> {
if (key.startsWith(DELAYED_BUCKET_KEY_PREFIX)) {
String[] keys = key.split(DELIMITER);
checkArgument(keys.length == 3);
ImmutableBucket immutableBucket =
new ImmutableBucket(dispatcher.getName(), cursor, sequencer,
this.lastMutableBucket.bucketSnapshotStorage,
Long.parseLong(keys[1]), Long.parseLong(keys[2]));
// Registers the bucket in immutableBuckets and may record overlapped buckets for deletion.
putAndCleanOverlapRange(Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId),
immutableBucket, toBeDeletedBucketMap);
}
});
// Live view of immutableBuckets: removals below go through to the range map.
Map<Range<Long>, ImmutableBucket> immutableBucketMap = immutableBuckets.asMapOfRanges();
if (immutableBucketMap.isEmpty()) {
log.info("[{}] Recover delayed message index bucket snapshot finish, don't find bucket snapshot",
dispatcher.getName());
return 0;
}
// Start recovery of every bucket, then block on all of them together.
Map<Range<Long>, CompletableFuture<List<DelayedIndex>>>
futures = new HashMap<>(immutableBucketMap.size());
for (Map.Entry<Range<Long>, ImmutableBucket> entry : immutableBucketMap.entrySet()) {
Range<Long> key = entry.getKey();
ImmutableBucket immutableBucket = entry.getValue();
futures.put(key, immutableBucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime));
}
try {
FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds * 5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("[{}] Failed to recover delayed message index bucket snapshot.", dispatcher.getName(), e);
if (e instanceof InterruptedException) {
// Preserve the interrupt status for callers further up the stack.
Thread.currentThread().interrupt();
}
throw new RuntimeException(e);
}
for (Map.Entry<Range<Long>, CompletableFuture<List<DelayedIndex>>> entry : futures.entrySet()) {
Range<Long> key = entry.getKey();
// the future will always be completed since it was waited for above
List<DelayedIndex> indexList = entry.getValue().getNow(null);
ImmutableBucket immutableBucket = immutableBucketMap.get(key);
if (CollectionUtils.isEmpty(indexList)) {
// Delete bucket snapshot if indexList is empty
toBeDeletedBucketMap.put(key, immutableBucket);
} else {
// Remember the last recovered index so that reaching it later triggers loading the next segment.
DelayedIndex lastDelayedIndex = indexList.get(indexList.size() - 1);
this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
lastDelayedIndex.getEntryId(), immutableBucket);
for (DelayedIndex index : indexList) {
this.sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
index.getEntryId());
}
}
}
for (Map.Entry<Range<Long>, ImmutableBucket> mapEntry : toBeDeletedBucketMap.entrySet()) {
Range<Long> key = mapEntry.getKey();
ImmutableBucket immutableBucket = mapEntry.getValue();
// Removes only an entry stored under exactly this range key.
immutableBucketMap.remove(key);
// delete asynchronously without waiting for completion
immutableBucket.asyncDeleteBucketSnapshot(stats);
}
// Count the messages of the buckets that are still registered.
MutableLong numberDelayedMessages = new MutableLong(0);
immutableBucketMap.values().forEach(bucket -> {
numberDelayedMessages.add(bucket.numberBucketDelayedMessages);
});
log.info("[{}] Recover delayed message index bucket snapshot finish, buckets: {}, numberDelayedMessages: {}",
dispatcher.getName(), immutableBucketMap.size(), numberDelayedMessages.getValue());
return numberDelayedMessages.getValue();
}
/**
 * Registers {@code immutableBucket} under {@code range} unless it conflicts with buckets already registered.
 *
 * <ul>
 *   <li>No registered bucket intersects {@code range}: the bucket is registered.</li>
 *   <li>At least one intersecting bucket lies entirely inside {@code range}: each such bucket is recorded in
 *       {@code toBeDeletedBucketMap} under its own {@code [startLedgerId, endLedgerId]} range and the new
 *       bucket is registered, replacing them.</li>
 *   <li>Otherwise (the new range is covered by, or only partially overlaps, existing buckets): the new bucket
 *       is not registered and a warning is logged.</li>
 * </ul>
 *
 * <p>The enclosure test uses each existing bucket's own ledger range. The keys of
 * {@code RangeMap.subRangeMap(range)} are already truncated to {@code range}, so testing those keys with
 * {@code range.encloses(..)} is always true and cannot distinguish the cases above.
 *
 * @param range closed ledger range of the new bucket
 * @param immutableBucket bucket to register
 * @param toBeDeletedBucketMap receives the buckets replaced by the new one
 */
private synchronized void putAndCleanOverlapRange(Range<Long> range, ImmutableBucket immutableBucket,
                                                  Map<Range<Long>, ImmutableBucket> toBeDeletedBucketMap) {
    // Copy the values: subRangeMap is a live view and immutableBuckets is modified below.
    List<ImmutableBucket> overlappingBuckets =
            new ArrayList<>(immutableBuckets.subRangeMap(range).asMapOfRanges().values());
    boolean canPut = overlappingBuckets.isEmpty();
    for (ImmutableBucket existingBucket : overlappingBuckets) {
        Range<Long> existingRange = Range.closed(existingBucket.startLedgerId, existingBucket.endLedgerId);
        if (range.encloses(existingRange)) {
            toBeDeletedBucketMap.put(existingRange, existingBucket);
            canPut = true;
        }
    }
    if (canPut) {
        immutableBuckets.put(range, immutableBucket);
    } else {
        log.warn("[{}] Skip bucket snapshot overlapping with existing buckets, bucketKey: {}, range: {}",
                dispatcher.getName(), immutableBucket.bucketKey(), range);
    }
}
/**
 * Timer callback: moves the mutable bucket's due indexes into the shared queue, then delegates to the
 * superclass.
 *
 * <p>Does nothing when {@code timeout} is null or already cancelled.
 */
@Override
public void run(Timeout timeout) throws Exception {
    synchronized (this) {
        final boolean stillActive = timeout != null && !timeout.isCancelled();
        if (!stillActive) {
            return;
        }
        final long cutoff = getCutoffTime();
        lastMutableBucket.moveScheduledMessageToSharedQueue(cutoff, sharedBucketPriorityQueue);
    }
    // The superclass callback is invoked outside this tracker's monitor, as before.
    super.run(timeout);
}
/**
 * Looks up the immutable bucket whose ledger range contains {@code ledgerId}.
 *
 * @param ledgerId ledger id to look up
 * @return the covering bucket, or empty when no bucket is registered or none covers the ledger
 */
private Optional<ImmutableBucket> findImmutableBucket(long ledgerId) {
    final boolean noBuckets = immutableBuckets.asMapOfRanges().isEmpty();
    return noBuckets
            ? Optional.empty()
            : Optional.ofNullable(immutableBuckets.get(ledgerId));
}
/**
 * Registers a newly created immutable bucket and attaches completion handling to its snapshot-create future.
 *
 * <p>The bucket is put into {@code immutableBuckets} under its closed ledger range, and its last index is
 * recorded in {@code snapshotSegmentLastIndexTable} under {@code (ledgerId, entryId)}.
 *
 * <p>If the snapshot is created successfully, the in-memory snapshot segments are released and a success
 * event is recorded. If creation fails, the segment indexes are put back into the shared queue, the bucket
 * and its last-index entry are unregistered again, and the wrapped future completes with
 * {@code INVALID_BUCKET_ID}.
 *
 * @param immutableBucketDelayedIndexPair the created bucket and its last delayed index; ignored when null
 * @param startTime wall-clock millis at which creation was triggered, used for the success latency metric
 */
private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair,
                                        long startTime) {
    if (immutableBucketDelayedIndexPair == null) {
        return;
    }
    ImmutableBucket immutableBucket = immutableBucketDelayedIndexPair.getLeft();
    immutableBuckets.put(Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId),
            immutableBucket);

    DelayedIndex lastDelayedIndex = immutableBucketDelayedIndexPair.getRight();
    // Capture the table key once so the failure path removes exactly the entry that is put here.
    final long lastIndexLedgerId = lastDelayedIndex.getLedgerId();
    final long lastIndexEntryId = lastDelayedIndex.getEntryId();
    snapshotSegmentLastIndexTable.put(lastIndexLedgerId, lastIndexEntryId, immutableBucket);

    immutableBucket.getSnapshotCreateFuture().ifPresent(createFuture -> {
        CompletableFuture<Long> future = createFuture.handle((bucketId, ex) -> {
            if (ex == null) {
                immutableBucket.setSnapshotSegments(null);
                immutableBucket.asyncUpdateSnapshotLength();
                log.info("[{}] Create bucket snapshot finish, bucketKey: {}", dispatcher.getName(),
                        immutableBucket.bucketKey());
                stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create,
                        System.currentTimeMillis() - startTime);
                return bucketId;
            }

            log.error("[{}] Failed to create bucket snapshot, bucketKey: {}", dispatcher.getName(),
                    immutableBucket.bucketKey(), ex);
            stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.create);

            // Put indexes back into the shared queue and downgrade to memory mode
            synchronized (BucketDelayedDeliveryTracker.this) {
                immutableBucket.getSnapshotSegments().ifPresent(snapshotSegments -> {
                    for (SnapshotSegment snapshotSegment : snapshotSegments) {
                        for (DelayedIndex delayedIndex : snapshotSegment.getIndexesList()) {
                            sharedBucketPriorityQueue.add(delayedIndex.getTimestamp(),
                                    delayedIndex.getLedgerId(), delayedIndex.getEntryId());
                        }
                    }
                    immutableBucket.setSnapshotSegments(null);
                });
                immutableBucket.setCurrentSegmentEntryId(immutableBucket.lastSegmentEntryId);
                immutableBuckets.asMapOfRanges().remove(
                        Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId));
                // The table is keyed by (ledgerId, entryId); removing by timestamp would leave a stale entry.
                snapshotSegmentLastIndexTable.remove(lastIndexLedgerId, lastIndexEntryId);
            }
            return INVALID_BUCKET_ID;
        });
        immutableBucket.setSnapshotCreateFuture(future);
    });
}
/**
 * Starts tracking a delayed message.
 *
 * <p>When the message belongs to a ledger beyond the mutable bucket's range and the mutable bucket is large
 * enough, the mutable bucket is first sealed into an immutable bucket (and a merge is triggered if there are
 * now more than {@code maxNumBuckets} immutable buckets).
 *
 * @param ledgerId ledger id of the message
 * @param entryId entry id of the message
 * @param deliverAt delivery timestamp of the message
 * @return {@code true} if the message is tracked (including when it already was), {@code false} if
 *         {@code deliverAt} is negative or not after the current cutoff time
 */
@Override
public synchronized boolean addMessage(long ledgerId, long entryId, long deliverAt) {
    if (containsMessage(ledgerId, entryId)) {
        return true;
    }
    if (deliverAt < 0 || deliverAt <= getCutoffTime()) {
        return false;
    }

    final boolean coveredByImmutableBucket = findImmutableBucket(ledgerId).isPresent();

    // Create bucket snapshot
    final boolean shouldSeal = !coveredByImmutableBucket
            && ledgerId > lastMutableBucket.endLedgerId
            && lastMutableBucket.size() >= minIndexCountPerBucket
            && !lastMutableBucket.isEmpty();
    if (shouldSeal) {
        final long sealStartedAt = System.currentTimeMillis();
        stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
        final Pair<ImmutableBucket, DelayedIndex> sealed = lastMutableBucket.sealBucketAndAsyncPersistent(
                this.timeStepPerBucketSnapshotSegmentInMillis,
                this.maxIndexesPerBucketSnapshotSegment,
                this.sharedBucketPriorityQueue);
        afterCreateImmutableBucket(sealed, sealStartedAt);
        lastMutableBucket.resetLastMutableBucketRange();

        final boolean tooManyBuckets =
                maxNumBuckets > 0 && immutableBuckets.asMapOfRanges().size() > maxNumBuckets;
        if (tooManyBuckets) {
            asyncMergeBucketSnapshot();
        }
    }

    final boolean belongsToEarlierRange = ledgerId < lastMutableBucket.startLedgerId || coveredByImmutableBucket;
    if (!belongsToEarlierRange) {
        checkArgument(ledgerId >= lastMutableBucket.endLedgerId);
        lastMutableBucket.addMessage(ledgerId, entryId, deliverAt);
    } else {
        // The index belongs to a previous bucket range: it goes straight into the shared queue,
        // and only its bit is recorded in the mutable bucket.
        sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId);
        lastMutableBucket.putIndexBit(ledgerId, entryId);
    }
    numberDelayedMessages++;

    if (log.isDebugEnabled()) {
        log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId,
                deliverAt - clock.millis());
    }
    updateTimer();
    return true;
}
/**
 * Chooses a run of {@code mergeNum} adjacent buckets to merge.
 *
 * <p>Only runs in which every bucket still has unloaded segments
 * ({@code lastSegmentEntryId > currentSegmentEntryId}) and is not already merging are considered. A run
 * becomes the current choice when its total message count is not larger than the smallest count seen so far
 * and its earliest next-segment schedule timestamp is strictly smaller than the smallest seen so far. If no
 * run qualifies, the search is repeated with {@code mergeNum - 1}, down to a run length of 2.
 *
 * @param values candidate buckets, in the order in which adjacency is defined
 * @param mergeNum run length to try first; must be smaller than {@code values.size()}
 * @return a sub-list view of {@code values} to merge, or an empty list when nothing qualifies
 * @throws IllegalArgumentException if {@code mergeNum >= values.size()} (from checkArgument)
 */
private synchronized List<ImmutableBucket> selectMergedBuckets(final List<ImmutableBucket> values, int mergeNum) {
    checkArgument(mergeNum < values.size());

    long fewestMessages = Long.MAX_VALUE;
    long earliestSchedule = Long.MAX_VALUE;
    int bestStart = -1;

    final int lastStart = values.size() - mergeNum;
    for (int start = 0; start <= lastStart; start++) {
        final List<ImmutableBucket> window = values.subList(start, start + mergeNum);

        // We should skip the bucket which last segment already been load to memory,
        // avoid record replicated index.
        boolean allMergeable = true;
        for (ImmutableBucket candidate : window) {
            if (candidate.lastSegmentEntryId <= candidate.currentSegmentEntryId || candidate.merging) {
                allMergeable = false;
                break;
            }
        }
        if (!allMergeable) {
            continue;
        }

        long windowMessages = 0;
        for (ImmutableBucket candidate : window) {
            windowMessages += candidate.numberBucketDelayedMessages;
        }
        if (windowMessages > fewestMessages) {
            continue;
        }
        fewestMessages = windowMessages;

        final long windowSchedule = window.stream()
                .mapToLong(bucket -> bucket.firstScheduleTimestamps.get(bucket.currentSegmentEntryId + 1))
                .min().getAsLong();
        if (windowSchedule < earliestSchedule) {
            earliestSchedule = windowSchedule;
            bestStart = start;
        }
    }

    if (bestStart >= 0) {
        return values.subList(bestStart, bestStart + mergeNum);
    }
    if (mergeNum > 2) {
        return selectMergedBuckets(values, mergeNum - 1);
    }
    return Collections.emptyList();
}
/**
 * Selects a run of adjacent immutable buckets and merges them asynchronously.
 *
 * <p>The selected buckets are flagged as {@code merging} for the duration of the merge; the flag is cleared
 * under this tracker's lock when the merge completes, whether it succeeded or failed. A merge trigger event
 * and a success or failure event are recorded in the stats.
 *
 * @return a future that completes when the merge finishes, or an already-completed future when no
 *         mergeable buckets are found
 */
private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
    List<ImmutableBucket> immutableBucketList = immutableBuckets.asMapOfRanges().values().stream().toList();
    List<ImmutableBucket> toBeMergeImmutableBuckets = selectMergedBuckets(immutableBucketList, MAX_MERGE_NUM);

    if (toBeMergeImmutableBuckets.isEmpty()) {
        log.warn("[{}] Can't find able merged buckets", dispatcher.getName());
        return CompletableFuture.completedFuture(null);
    }

    // Strip the common key prefix for shorter logs. The prefix is a literal, not a regular expression,
    // so use replace() rather than replaceAll().
    final String bucketsStr = toBeMergeImmutableBuckets.stream().map(Bucket::bucketKey).collect(
            Collectors.joining(",")).replace(DELAYED_BUCKET_KEY_PREFIX + "_", "");
    if (log.isDebugEnabled()) {
        log.debug("[{}] Merging bucket snapshot, bucketKeys: {}", dispatcher.getName(), bucketsStr);
    }

    for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) {
        immutableBucket.merging = true;
    }

    long mergeStartTime = System.currentTimeMillis();
    stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.merge);
    return asyncMergeBucketSnapshot(toBeMergeImmutableBuckets).whenComplete((__, ex) -> {
        final int bucketNum;
        synchronized (this) {
            for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) {
                immutableBucket.merging = false;
            }
            // Read the bucket count under the lock: the range map is mutated by other synchronized paths.
            bucketNum = immutableBuckets.asMapOfRanges().size();
        }
        if (ex != null) {
            log.error("[{}] Failed to merge bucket snapshot, bucketKeys: {}",
                    dispatcher.getName(), bucketsStr, ex);
            stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.merge);
        } else {
            log.info("[{}] Merge bucket snapshot finish, bucketKeys: {}, bucketNum: {}",
                    dispatcher.getName(), bucketsStr, bucketNum);
            stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.merge,
                    System.currentTimeMillis() - mergeStartTime);
        }
    });
}
/**
 * Merges the given buckets into one new immutable bucket.
 *
 * <p>Steps: wait for every bucket's snapshot-create future; fail if any of them reported
 * {@code INVALID_BUCKET_ID}; fetch the remaining snapshot segments of every bucket; then, under this
 * tracker's lock, create the merged bucket spanning from the first bucket's {@code startLedgerId} to the last
 * bucket's {@code endLedgerId}, give it the union of the buckets' index bitmaps, register it, and unregister
 * the source buckets. The source snapshots are deleted once the merged bucket's create future completes.
 *
 * @param buckets buckets to merge; the first and last elements define the merged ledger range
 * @return a future that completes once the merged bucket has been registered and the source buckets
 *         unregistered (it does not wait for the source snapshot deletion), or fails if a source bucket's
 *         creation failed or a step in the chain fails
 */
private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<ImmutableBucket> buckets) {
// Buckets without a create future are treated as already created.
List<CompletableFuture<Long>> createFutures =
buckets.stream().map(bucket -> bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE))
.toList();
return FutureUtil.waitForAll(createFutures).thenCompose(bucketId -> {
// INVALID_BUCKET_ID is what afterCreateImmutableBucket's handler returns when creation failed.
if (createFutures.stream().anyMatch(future -> INVALID_BUCKET_ID.equals(future.join()))) {
return FutureUtil.failedFuture(new RuntimeException("Can't merge buckets due to bucket create failed"));
}
List<CompletableFuture<List<SnapshotSegment>>> getRemainFutures =
buckets.stream().map(ImmutableBucket::getRemainSnapshotSegment).toList();
return FutureUtil.waitForAll(getRemainFutures)
.thenApply(__ -> {
// All futures are complete here, so join() does not block.
return CombinedSegmentDelayedIndexQueue.wrap(
getRemainFutures.stream().map(CompletableFuture::join).toList());
})
.thenAccept(combinedDelayedIndexQueue -> {
synchronized (BucketDelayedDeliveryTracker.this) {
long createStartTime = System.currentTimeMillis();
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
timeStepPerBucketSnapshotSegmentInMillis,
maxIndexesPerBucketSnapshotSegment,
sharedBucketPriorityQueue, combinedDelayedIndexQueue,
buckets.get(0).startLedgerId,
buckets.get(buckets.size() - 1).endLedgerId);
// Merge bit map to new bucket
// The HashMap copy is shallow: bitmaps taken from the source buckets are OR-ed into in place.
Map<Long, RoaringBitmap> delayedIndexBitMap =
new HashMap<>(buckets.get(0).getDelayedIndexBitMap());
for (int i = 1; i < buckets.size(); i++) {
buckets.get(i).delayedIndexBitMap.forEach((ledgerId, bitMapB) -> {
delayedIndexBitMap.compute(ledgerId, (k, bitMap) -> {
if (bitMap == null) {
return bitMapB;
}
bitMap.or(bitMapB);
return bitMap;
});
});
}
// optimize bm
delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize);
// NOTE(review): afterCreateImmutableBucket tolerates a null pair, but getLeft() here would throw
// a NullPointerException first — confirm createImmutableBucketAndAsyncPersistent cannot return null here.
immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);
afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
// Delete the source snapshots after the merged bucket's create future completes.
// The resulting future is not retained, so deletion is not awaited by the caller.
immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
.orElse(NULL_LONG_PROMISE).thenCompose(___ -> {
List<CompletableFuture<Void>> removeFutures =
buckets.stream().map(bucket -> bucket.asyncDeleteBucketSnapshot(stats))
.toList();
return FutureUtil.waitForAll(removeFutures);
});
// Unregister the source buckets by their exact range keys.
for (ImmutableBucket bucket : buckets) {
immutableBuckets.asMapOfRanges()
.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
}
}
});
});
}
/**
 * Reports whether at least one tracked message is due, i.e. the next delivery time is not after the current
 * cutoff time.
 *
 * <p>When nothing is due the timer is updated before returning.
 *
 * @return {@code true} if a message is ready for delivery
 */
@Override
public synchronized boolean hasMessageAvailable() {
    final long cutoff = getCutoffTime();
    if (getNumberOfDelayedMessages() > 0 && nextDeliveryTime() <= cutoff) {
        return true;
    }
    updateTimer();
    return false;
}
/**
 * Returns the earliest delivery timestamp across the mutable bucket and the shared queue.
 *
 * <p>When exactly one of the two is empty, only the non-empty one is consulted; otherwise both are queried
 * and the smaller timestamp is returned.
 */
@Override
protected long nextDeliveryTime() {
    final boolean mutableBucketEmpty = lastMutableBucket.isEmpty();
    final boolean sharedQueueEmpty = sharedBucketPriorityQueue.isEmpty();

    if (mutableBucketEmpty && !sharedQueueEmpty) {
        return sharedBucketPriorityQueue.peekN1();
    }
    if (sharedQueueEmpty && !mutableBucketEmpty) {
        return lastMutableBucket.nextDeliveryTime();
    }

    final long mutableNext = lastMutableBucket.nextDeliveryTime();
    final long sharedNext = sharedBucketPriorityQueue.peekN1();
    return Math.min(mutableNext, sharedNext);
}
/**
 * Returns the current value of the delayed-message counter.
 *
 * <p>Not synchronized; the counter is a volatile field.
 */
@Override
public long getNumberOfDelayedMessages() {
return numberDelayedMessages;
}
/**
 * Returns the memory usage reported by the mutable bucket plus the byte capacity of the shared queue.
 */
@Override
public long getBufferMemoryUsage() {
    final long mutableBucketBytes = this.lastMutableBucket.getBufferMemoryUsage();
    final long sharedQueueBytes = sharedBucketPriorityQueue.bytesCapacity();
    return mutableBucketBytes + sharedQueueBytes;
}
/**
 * Removes and returns up to {@code maxMessages} positions whose delivery time is not after the cutoff time.
 *
 * <p>Returns an empty set while a snapshot segment load is still pending. When the message at the head of the
 * shared queue is the last index of a bucket's loaded snapshot segment, loading of that bucket's next segment
 * is started before the message is handed out; the loop stops early if the bucket is merging, its snapshot is
 * still being created, or the load has not completed successfully yet.
 *
 * @param maxMessages maximum number of positions to return
 * @return the due positions, removed from the tracker
 */
@Override
public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
if (!checkPendingLoadDone()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skip getScheduledMessages to wait for bucket snapshot load finish.",
dispatcher.getName());
}
return Collections.emptyNavigableSet();
}
long cutoffTime = getCutoffTime();
// Make the mutable bucket's due indexes visible in the shared queue first.
lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, sharedBucketPriorityQueue);
NavigableSet<PositionImpl> positions = new TreeSet<>();
int n = maxMessages;
while (n > 0 && !sharedBucketPriorityQueue.isEmpty()) {
long timestamp = sharedBucketPriorityQueue.peekN1();
if (timestamp > cutoffTime) {
break;
}
long ledgerId = sharedBucketPriorityQueue.peekN2();
long entryId = sharedBucketPriorityQueue.peekN3();
// Non-null when this index is the last one of some bucket's currently loaded segment.
ImmutableBucket bucket = snapshotSegmentLastIndexTable.get(ledgerId, entryId);
if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) {
// All message of current snapshot segment are scheduled, try load next snapshot segment
if (bucket.merging) {
log.info("[{}] Skip load to wait for bucket snapshot merge finish, bucketKey:{}",
dispatcher.getName(), bucket.bucketKey());
break;
}
// Remembered so the bucket's segment position can be restored if the load fails.
final int preSegmentEntryId = bucket.currentSegmentEntryId;
if (log.isDebugEnabled()) {
log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1);
}
boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone();
if (!createFutureDone) {
log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}",
dispatcher.getName(), bucket.bucketKey());
break;
}
long loadStartTime = System.currentTimeMillis();
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
CompletableFuture<Void> loadFuture = pendingLoad = bucket.asyncLoadNextBucketSnapshotEntry()
.thenAccept(indexList -> {
synchronized (BucketDelayedDeliveryTracker.this) {
this.snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
if (CollectionUtils.isEmpty(indexList)) {
// Nothing more to load: unregister the bucket and delete its snapshot.
immutableBuckets.asMapOfRanges()
.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
bucket.asyncDeleteBucketSnapshot(stats);
return;
}
// Track the last index of the newly loaded segment and enqueue all of its indexes.
DelayedIndex
lastDelayedIndex = indexList.get(indexList.size() - 1);
this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
lastDelayedIndex.getEntryId(), bucket);
for (DelayedIndex index : indexList) {
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
index.getEntryId());
}
}
}).whenComplete((__, ex) -> {
if (ex != null) {
// Back bucket state
bucket.setCurrentSegmentEntryId(preSegmentEntryId);
log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex);
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
} else {
log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(),
(preSegmentEntryId == bucket.lastSegmentEntryId) ? "-1" : preSegmentEntryId + 1);
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
System.currentTimeMillis() - loadStartTime);
}
// Replace any scheduled timeout with one that fires immediately, so run() is invoked again.
synchronized (this) {
if (timeout != null) {
timeout.cancel();
}
timeout = timer.newTimeout(this, 0, TimeUnit.MILLISECONDS);
}
});
// Continue in this pass only if the load already completed successfully;
// otherwise the current message stays in the queue.
if (!checkPendingLoadDone() || loadFuture.isCompletedExceptionally()) {
break;
}
}
// Hand out the message and drop it from the queue, the index bitmap and the counter.
positions.add(new PositionImpl(ledgerId, entryId));
sharedBucketPriorityQueue.pop();
removeIndexBit(ledgerId, entryId);
--n;
--numberDelayedMessages;
}
updateTimer();
return positions;
}
/**
 * Checks whether no snapshot segment load is in flight.
 *
 * <p>When there is no pending load, or the pending load has completed, the reference is cleared.
 *
 * @return {@code true} if no load is pending, {@code false} if one is still running
 */
private synchronized boolean checkPendingLoadDone() {
    final boolean stillLoading = pendingLoad != null && !pendingLoad.isDone();
    if (stillLoading) {
        return false;
    }
    pendingLoad = null;
    return true;
}
/**
 * Always returns {@code false}: this tracker never requests pausing of all deliveries.
 */
@Override
public boolean shouldPauseAllDeliveries() {
return false;
}
/**
 * Drops all tracked state: immutable buckets, the last-index table, the shared queue and the mutable bucket.
 * The message counter is reset to zero.
 *
 * @return a future that completes when all immutable buckets have been cleared
 */
@Override
public synchronized CompletableFuture<Void> clear() {
    final CompletableFuture<Void> bucketsCleared = cleanImmutableBuckets();
    snapshotSegmentLastIndexTable.clear();
    sharedBucketPriorityQueue.clear();
    lastMutableBucket.clear();
    numberDelayedMessages = 0;
    return bucketsCleared;
}
/**
 * Closes the tracker: closes the superclass, the mutable bucket and the shared queue, then waits up to
 * {@code AsyncOperationTimeoutSeconds} seconds for in-flight bucket snapshot creations to finish.
 *
 * <p>Failures while waiting are logged and otherwise ignored, as closing is best-effort. If the waiting
 * thread is interrupted, its interrupt status is restored so callers can still observe the interruption.
 */
@Override
public synchronized void close() {
    super.close();
    lastMutableBucket.close();
    sharedBucketPriorityQueue.close();
    try {
        List<CompletableFuture<Long>> completableFutures = immutableBuckets.asMapOfRanges().values().stream()
                .map(bucket -> bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE)).toList();
        FutureUtil.waitForAll(completableFutures).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // Do not swallow the interruption: restore the flag for the caller.
        Thread.currentThread().interrupt();
        log.warn("[{}] Failed wait to snapshot generate", dispatcher.getName(), e);
    } catch (Exception e) {
        log.warn("[{}] Failed wait to snapshot generate", dispatcher.getName(), e);
    }
}
/**
 * Clears every immutable bucket and removes it from the range map, subtracting each bucket's message count
 * from the tracker's counter.
 *
 * @return a future that completes when all bucket clear operations have completed
 */
private CompletableFuture<Void> cleanImmutableBuckets() {
    final List<CompletableFuture<Void>> clearFutures = new ArrayList<>();
    // Iterator-based removal so buckets can be dropped from the live view while walking it.
    for (Iterator<ImmutableBucket> it = immutableBuckets.asMapOfRanges().values().iterator(); it.hasNext();) {
        final ImmutableBucket current = it.next();
        clearFutures.add(current.clear(stats));
        numberDelayedMessages -= current.getNumberBucketDelayedMessages();
        it.remove();
    }
    return FutureUtil.waitForAll(clearFutures);
}
/**
 * Removes the index bit of a message, trying the mutable bucket first and then the immutable bucket that
 * covers the ledger.
 *
 * @param ledgerId ledger id of the message
 * @param entryId entry id of the message
 * @return {@code true} if either bucket reported that the bit was removed
 */
private boolean removeIndexBit(long ledgerId, long entryId) {
    final boolean removedFromMutable = lastMutableBucket.removeIndexBit(ledgerId, entryId);
    if (removedFromMutable) {
        return true;
    }
    final Optional<ImmutableBucket> owner = findImmutableBucket(ledgerId);
    return owner.isPresent() && owner.get().removeIndexBit(ledgerId, entryId);
}
/**
 * Reports whether a message is already tracked, checking the mutable bucket first and then the immutable
 * bucket that covers the ledger.
 *
 * @param ledgerId ledger id of the message
 * @param entryId entry id of the message
 * @return {@code true} if either bucket contains the message
 */
public boolean containsMessage(long ledgerId, long entryId) {
    final boolean inMutableBucket = lastMutableBucket.containsMessage(ledgerId, entryId);
    if (inMutableBucket) {
        return true;
    }
    final Optional<ImmutableBucket> owner = findImmutableBucket(ledgerId);
    return owner.isPresent() && owner.get().containsMessage(ledgerId, entryId);
}
/**
 * Refreshes the gauge-style stats and returns the topic metric map produced by the stats collector.
 *
 * <p>Recorded values: number of buckets (immutable buckets plus the mutable one), number of indexes loaded
 * (shared queue size plus mutable bucket size), and total snapshot length over all immutable buckets.
 *
 * @return the metric map generated by the stats collector
 */
public Map<String, TopicMetricBean> genTopicMetricMap() {
    stats.recordNumOfBuckets(immutableBuckets.asMapOfRanges().size() + 1);
    stats.recordDelayedMessageIndexLoaded(this.sharedBucketPriorityQueue.size() + this.lastMutableBucket.size());

    long snapshotBytes = 0L;
    for (ImmutableBucket bucket : immutableBuckets.asMapOfRanges().values()) {
        snapshotBytes += bucket.getSnapshotLength();
    }
    stats.recordBucketSnapshotSizeBytes(snapshotBytes);

    return stats.genTopicMetricMap();
}
}