| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.delayed.bucket; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX; |
| import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.HashBasedTable; |
| import com.google.common.collect.Range; |
| import com.google.common.collect.RangeMap; |
| import com.google.common.collect.Table; |
| import com.google.common.collect.TreeRangeMap; |
| import io.netty.util.Timeout; |
| import io.netty.util.Timer; |
| import java.time.Clock; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NavigableSet; |
| import java.util.Optional; |
| import java.util.TreeSet; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.stream.Collectors; |
| import javax.annotation.concurrent.ThreadSafe; |
| import lombok.Getter; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.bookkeeper.mledger.ManagedCursor; |
| import org.apache.bookkeeper.mledger.impl.PositionImpl; |
| import org.apache.commons.collections4.CollectionUtils; |
| import org.apache.commons.lang3.mutable.MutableLong; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker; |
| import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat; |
| import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex; |
| import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; |
| import org.apache.pulsar.common.policies.data.stats.TopicMetricBean; |
| import org.apache.pulsar.common.util.FutureUtil; |
| import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue; |
| import org.roaringbitmap.RoaringBitmap; |
| |
| @Slf4j |
| @ThreadSafe |
| public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker { |
| |
| public static final String DELAYED_BUCKET_KEY_PREFIX = CURSOR_INTERNAL_PROPERTY_PREFIX + "delayed.bucket"; |
| |
| static final CompletableFuture<Long> NULL_LONG_PROMISE = CompletableFuture.completedFuture(null); |
| |
| static final int AsyncOperationTimeoutSeconds = 60; |
| |
| private static final Long INVALID_BUCKET_ID = -1L; |
| |
| private static final int MAX_MERGE_NUM = 4; |
| |
| private final long minIndexCountPerBucket; |
| |
| private final long timeStepPerBucketSnapshotSegmentInMillis; |
| |
| private final int maxIndexesPerBucketSnapshotSegment; |
| |
| private final int maxNumBuckets; |
| |
| private volatile long numberDelayedMessages; |
| |
| @Getter |
| @VisibleForTesting |
| private final MutableBucket lastMutableBucket; |
| |
| @Getter |
| @VisibleForTesting |
| private final TripleLongPriorityQueue sharedBucketPriorityQueue; |
| |
| @Getter |
| @VisibleForTesting |
| private final RangeMap<Long, ImmutableBucket> immutableBuckets; |
| |
| private final Table<Long, Long, ImmutableBucket> snapshotSegmentLastIndexTable; |
| |
| private final BucketDelayedMessageIndexStats stats; |
| |
| private CompletableFuture<Void> pendingLoad = null; |
| |
| public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, |
| Timer timer, long tickTimeMillis, |
| boolean isDelayedDeliveryDeliverAtTimeStrict, |
| BucketSnapshotStorage bucketSnapshotStorage, |
| long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis, |
| int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) { |
| this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict, |
| bucketSnapshotStorage, minIndexCountPerBucket, timeStepPerBucketSnapshotSegmentInMillis, |
| maxIndexesPerBucketSnapshotSegment, maxNumBuckets); |
| } |
| |
| public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, |
| Timer timer, long tickTimeMillis, Clock clock, |
| boolean isDelayedDeliveryDeliverAtTimeStrict, |
| BucketSnapshotStorage bucketSnapshotStorage, |
| long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis, |
| int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) { |
| super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict); |
| this.minIndexCountPerBucket = minIndexCountPerBucket; |
| this.timeStepPerBucketSnapshotSegmentInMillis = timeStepPerBucketSnapshotSegmentInMillis; |
| this.maxIndexesPerBucketSnapshotSegment = maxIndexesPerBucketSnapshotSegment; |
| this.maxNumBuckets = maxNumBuckets; |
| this.sharedBucketPriorityQueue = new TripleLongPriorityQueue(); |
| this.immutableBuckets = TreeRangeMap.create(); |
| this.snapshotSegmentLastIndexTable = HashBasedTable.create(); |
| this.lastMutableBucket = |
| new MutableBucket(dispatcher.getName(), dispatcher.getCursor(), FutureUtil.Sequencer.create(), |
| bucketSnapshotStorage); |
| this.stats = new BucketDelayedMessageIndexStats(); |
| this.numberDelayedMessages = recoverBucketSnapshot(); |
| } |
| |
| private synchronized long recoverBucketSnapshot() throws RuntimeException { |
| ManagedCursor cursor = this.lastMutableBucket.getCursor(); |
| FutureUtil.Sequencer<Void> sequencer = this.lastMutableBucket.getSequencer(); |
| Map<Range<Long>, ImmutableBucket> toBeDeletedBucketMap = new HashMap<>(); |
| cursor.getCursorProperties().keySet().forEach(key -> { |
| if (key.startsWith(DELAYED_BUCKET_KEY_PREFIX)) { |
| String[] keys = key.split(DELIMITER); |
| checkArgument(keys.length == 3); |
| ImmutableBucket immutableBucket = |
| new ImmutableBucket(dispatcher.getName(), cursor, sequencer, |
| this.lastMutableBucket.bucketSnapshotStorage, |
| Long.parseLong(keys[1]), Long.parseLong(keys[2])); |
| putAndCleanOverlapRange(Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId), |
| immutableBucket, toBeDeletedBucketMap); |
| } |
| }); |
| |
| Map<Range<Long>, ImmutableBucket> immutableBucketMap = immutableBuckets.asMapOfRanges(); |
| if (immutableBucketMap.isEmpty()) { |
| log.info("[{}] Recover delayed message index bucket snapshot finish, don't find bucket snapshot", |
| dispatcher.getName()); |
| return 0; |
| } |
| |
| Map<Range<Long>, CompletableFuture<List<DelayedIndex>>> |
| futures = new HashMap<>(immutableBucketMap.size()); |
| for (Map.Entry<Range<Long>, ImmutableBucket> entry : immutableBucketMap.entrySet()) { |
| Range<Long> key = entry.getKey(); |
| ImmutableBucket immutableBucket = entry.getValue(); |
| futures.put(key, immutableBucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime)); |
| } |
| |
| try { |
| FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds * 2, TimeUnit.SECONDS); |
| } catch (InterruptedException | ExecutionException | TimeoutException e) { |
| log.error("[{}] Failed to recover delayed message index bucket snapshot.", dispatcher.getName(), e); |
| if (e instanceof InterruptedException) { |
| Thread.currentThread().interrupt(); |
| } |
| throw new RuntimeException(e); |
| } |
| |
| for (Map.Entry<Range<Long>, CompletableFuture<List<DelayedIndex>>> entry : futures.entrySet()) { |
| Range<Long> key = entry.getKey(); |
| // the future will always be completed since it was waited for above |
| List<DelayedIndex> indexList = entry.getValue().getNow(null); |
| ImmutableBucket immutableBucket = immutableBucketMap.get(key); |
| if (CollectionUtils.isEmpty(indexList)) { |
| // Delete bucket snapshot if indexList is empty |
| toBeDeletedBucketMap.put(key, immutableBucket); |
| } else { |
| DelayedIndex lastDelayedIndex = indexList.get(indexList.size() - 1); |
| this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), |
| lastDelayedIndex.getEntryId(), immutableBucket); |
| for (DelayedIndex index : indexList) { |
| this.sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), |
| index.getEntryId()); |
| } |
| } |
| } |
| |
| for (Map.Entry<Range<Long>, ImmutableBucket> mapEntry : toBeDeletedBucketMap.entrySet()) { |
| Range<Long> key = mapEntry.getKey(); |
| ImmutableBucket immutableBucket = mapEntry.getValue(); |
| immutableBucketMap.remove(key); |
| // delete asynchronously without waiting for completion |
| immutableBucket.asyncDeleteBucketSnapshot(stats); |
| } |
| |
| MutableLong numberDelayedMessages = new MutableLong(0); |
| immutableBucketMap.values().forEach(bucket -> { |
| numberDelayedMessages.add(bucket.numberBucketDelayedMessages); |
| }); |
| |
| log.info("[{}] Recover delayed message index bucket snapshot finish, buckets: {}, numberDelayedMessages: {}", |
| dispatcher.getName(), immutableBucketMap.size(), numberDelayedMessages.getValue()); |
| |
| return numberDelayedMessages.getValue(); |
| } |
| |
| private synchronized void putAndCleanOverlapRange(Range<Long> range, ImmutableBucket immutableBucket, |
| Map<Range<Long>, ImmutableBucket> toBeDeletedBucketMap) { |
| RangeMap<Long, ImmutableBucket> subRangeMap = immutableBuckets.subRangeMap(range); |
| boolean canPut = false; |
| if (!subRangeMap.asMapOfRanges().isEmpty()) { |
| for (Map.Entry<Range<Long>, ImmutableBucket> rangeEntry : subRangeMap.asMapOfRanges().entrySet()) { |
| if (range.encloses(rangeEntry.getKey())) { |
| toBeDeletedBucketMap.put(rangeEntry.getKey(), rangeEntry.getValue()); |
| canPut = true; |
| } |
| } |
| } else { |
| canPut = true; |
| } |
| |
| if (canPut) { |
| immutableBuckets.put(range, immutableBucket); |
| } |
| } |
| |
| @Override |
| public void run(Timeout timeout) throws Exception { |
| synchronized (this) { |
| if (timeout == null || timeout.isCancelled()) { |
| return; |
| } |
| lastMutableBucket.moveScheduledMessageToSharedQueue(getCutoffTime(), sharedBucketPriorityQueue); |
| } |
| super.run(timeout); |
| } |
| |
| private Optional<ImmutableBucket> findImmutableBucket(long ledgerId) { |
| if (immutableBuckets.asMapOfRanges().isEmpty()) { |
| return Optional.empty(); |
| } |
| |
| return Optional.ofNullable(immutableBuckets.get(ledgerId)); |
| } |
| |
| private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair, |
| long startTime) { |
| if (immutableBucketDelayedIndexPair != null) { |
| ImmutableBucket immutableBucket = immutableBucketDelayedIndexPair.getLeft(); |
| immutableBuckets.put(Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId), |
| immutableBucket); |
| |
| DelayedIndex lastDelayedIndex = immutableBucketDelayedIndexPair.getRight(); |
| snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId(), |
| immutableBucket); |
| |
| immutableBucket.getSnapshotCreateFuture().ifPresent(createFuture -> { |
| CompletableFuture<Long> future = createFuture.handle((bucketId, ex) -> { |
| if (ex == null) { |
| immutableBucket.setSnapshotSegments(null); |
| immutableBucket.asyncUpdateSnapshotLength(); |
| log.info("[{}] Create bucket snapshot finish, bucketKey: {}", dispatcher.getName(), |
| immutableBucket.bucketKey()); |
| |
| stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create, |
| System.currentTimeMillis() - startTime); |
| |
| return bucketId; |
| } |
| |
| log.error("[{}] Failed to create bucket snapshot, bucketKey: {}", dispatcher.getName(), |
| immutableBucket.bucketKey(), ex); |
| stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.create); |
| |
| // Put indexes back into the shared queue and downgrade to memory mode |
| synchronized (BucketDelayedDeliveryTracker.this) { |
| immutableBucket.getSnapshotSegments().ifPresent(snapshotSegments -> { |
| for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment : |
| snapshotSegments) { |
| for (DelayedIndex delayedIndex : snapshotSegment.getIndexesList()) { |
| sharedBucketPriorityQueue.add(delayedIndex.getTimestamp(), |
| delayedIndex.getLedgerId(), delayedIndex.getEntryId()); |
| } |
| } |
| immutableBucket.setSnapshotSegments(null); |
| }); |
| |
| immutableBucket.setCurrentSegmentEntryId(immutableBucket.lastSegmentEntryId); |
| immutableBuckets.asMapOfRanges().remove( |
| Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId)); |
| snapshotSegmentLastIndexTable.remove(lastDelayedIndex.getLedgerId(), |
| lastDelayedIndex.getTimestamp()); |
| } |
| return INVALID_BUCKET_ID; |
| }); |
| immutableBucket.setSnapshotCreateFuture(future); |
| }); |
| } |
| } |
| |
| @Override |
| public synchronized boolean addMessage(long ledgerId, long entryId, long deliverAt) { |
| if (containsMessage(ledgerId, entryId)) { |
| return true; |
| } |
| |
| if (deliverAt < 0 || deliverAt <= getCutoffTime()) { |
| return false; |
| } |
| |
| boolean existBucket = findImmutableBucket(ledgerId).isPresent(); |
| |
| // Create bucket snapshot |
| if (!existBucket && ledgerId > lastMutableBucket.endLedgerId |
| && lastMutableBucket.size() >= minIndexCountPerBucket |
| && !lastMutableBucket.isEmpty()) { |
| long createStartTime = System.currentTimeMillis(); |
| stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create); |
| Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair = |
| lastMutableBucket.sealBucketAndAsyncPersistent( |
| this.timeStepPerBucketSnapshotSegmentInMillis, |
| this.maxIndexesPerBucketSnapshotSegment, |
| this.sharedBucketPriorityQueue); |
| afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime); |
| lastMutableBucket.resetLastMutableBucketRange(); |
| |
| if (maxNumBuckets > 0 && immutableBuckets.asMapOfRanges().size() > maxNumBuckets) { |
| asyncMergeBucketSnapshot(); |
| } |
| } |
| |
| if (ledgerId < lastMutableBucket.startLedgerId || existBucket) { |
| // If (ledgerId < startLedgerId || existBucket) means that message index belong to previous bucket range, |
| // enter sharedBucketPriorityQueue directly |
| sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId); |
| } else { |
| checkArgument(ledgerId >= lastMutableBucket.endLedgerId); |
| lastMutableBucket.addMessage(ledgerId, entryId, deliverAt); |
| } |
| |
| numberDelayedMessages++; |
| |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId, |
| deliverAt - clock.millis()); |
| } |
| |
| updateTimer(); |
| |
| return true; |
| } |
| |
| private synchronized List<ImmutableBucket> selectMergedBuckets(final List<ImmutableBucket> values, int mergeNum) { |
| checkArgument(mergeNum < values.size()); |
| long minNumberMessages = Long.MAX_VALUE; |
| long minScheduleTimestamp = Long.MAX_VALUE; |
| int minIndex = -1; |
| for (int i = 0; i + (mergeNum - 1) < values.size(); i++) { |
| List<ImmutableBucket> immutableBuckets = values.subList(i, i + mergeNum); |
| if (immutableBuckets.stream().allMatch(bucket -> { |
| // We should skip the bucket which last segment already been load to memory, |
| // avoid record replicated index. |
| return bucket.lastSegmentEntryId > bucket.currentSegmentEntryId && !bucket.merging; |
| })) { |
| long numberMessages = immutableBuckets.stream() |
| .mapToLong(bucket -> bucket.numberBucketDelayedMessages) |
| .sum(); |
| if (numberMessages <= minNumberMessages) { |
| minNumberMessages = numberMessages; |
| long scheduleTimestamp = immutableBuckets.stream() |
| .mapToLong(bucket -> bucket.firstScheduleTimestamps.get(bucket.currentSegmentEntryId + 1)) |
| .min().getAsLong(); |
| if (scheduleTimestamp < minScheduleTimestamp) { |
| minScheduleTimestamp = scheduleTimestamp; |
| minIndex = i; |
| } |
| } |
| } |
| } |
| |
| if (minIndex >= 0) { |
| return values.subList(minIndex, minIndex + mergeNum); |
| } else if (mergeNum > 2){ |
| return selectMergedBuckets(values, mergeNum - 1); |
| } else { |
| return Collections.emptyList(); |
| } |
| } |
| |
| private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() { |
| List<ImmutableBucket> immutableBucketList = immutableBuckets.asMapOfRanges().values().stream().toList(); |
| List<ImmutableBucket> toBeMergeImmutableBuckets = selectMergedBuckets(immutableBucketList, MAX_MERGE_NUM); |
| |
| if (toBeMergeImmutableBuckets.isEmpty()) { |
| log.warn("[{}] Can't find able merged buckets", dispatcher.getName()); |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| final String bucketsStr = toBeMergeImmutableBuckets.stream().map(Bucket::bucketKey).collect( |
| Collectors.joining(",")).replaceAll(DELAYED_BUCKET_KEY_PREFIX + "_", ""); |
| if (log.isDebugEnabled()) { |
| log.info("[{}] Merging bucket snapshot, bucketKeys: {}", dispatcher.getName(), bucketsStr); |
| } |
| |
| for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) { |
| immutableBucket.merging = true; |
| } |
| |
| long mergeStartTime = System.currentTimeMillis(); |
| stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.merge); |
| return asyncMergeBucketSnapshot(toBeMergeImmutableBuckets).whenComplete((__, ex) -> { |
| synchronized (this) { |
| for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) { |
| immutableBucket.merging = false; |
| } |
| } |
| if (ex != null) { |
| log.error("[{}] Failed to merge bucket snapshot, bucketKeys: {}", |
| dispatcher.getName(), bucketsStr, ex); |
| |
| stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.merge); |
| } else { |
| log.info("[{}] Merge bucket snapshot finish, bucketKeys: {}, bucketNum: {}", |
| dispatcher.getName(), bucketsStr, immutableBuckets.asMapOfRanges().size()); |
| |
| stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.merge, |
| System.currentTimeMillis() - mergeStartTime); |
| } |
| }); |
| } |
| |
| private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<ImmutableBucket> buckets) { |
| List<CompletableFuture<Long>> createFutures = |
| buckets.stream().map(bucket -> bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE)) |
| .toList(); |
| |
| return FutureUtil.waitForAll(createFutures).thenCompose(bucketId -> { |
| if (createFutures.stream().anyMatch(future -> INVALID_BUCKET_ID.equals(future.join()))) { |
| return FutureUtil.failedFuture(new RuntimeException("Can't merge buckets due to bucket create failed")); |
| } |
| |
| List<CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>> getRemainFutures = |
| buckets.stream().map(ImmutableBucket::getRemainSnapshotSegment).toList(); |
| |
| return FutureUtil.waitForAll(getRemainFutures) |
| .thenApply(__ -> { |
| return CombinedSegmentDelayedIndexQueue.wrap( |
| getRemainFutures.stream().map(CompletableFuture::join).toList()); |
| }) |
| .thenAccept(combinedDelayedIndexQueue -> { |
| synchronized (BucketDelayedDeliveryTracker.this) { |
| long createStartTime = System.currentTimeMillis(); |
| stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create); |
| Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair = |
| lastMutableBucket.createImmutableBucketAndAsyncPersistent( |
| timeStepPerBucketSnapshotSegmentInMillis, |
| maxIndexesPerBucketSnapshotSegment, |
| sharedBucketPriorityQueue, combinedDelayedIndexQueue, |
| buckets.get(0).startLedgerId, |
| buckets.get(buckets.size() - 1).endLedgerId); |
| |
| // Merge bit map to new bucket |
| Map<Long, RoaringBitmap> delayedIndexBitMap = |
| new HashMap<>(buckets.get(0).getDelayedIndexBitMap()); |
| for (int i = 1; i < buckets.size(); i++) { |
| buckets.get(i).delayedIndexBitMap.forEach((ledgerId, bitMapB) -> { |
| delayedIndexBitMap.compute(ledgerId, (k, bitMap) -> { |
| if (bitMap == null) { |
| return bitMapB; |
| } |
| |
| bitMap.or(bitMapB); |
| return bitMap; |
| }); |
| }); |
| } |
| immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap); |
| |
| afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime); |
| |
| immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture() |
| .orElse(NULL_LONG_PROMISE).thenCompose(___ -> { |
| List<CompletableFuture<Void>> removeFutures = |
| buckets.stream().map(bucket -> bucket.asyncDeleteBucketSnapshot(stats)) |
| .toList(); |
| return FutureUtil.waitForAll(removeFutures); |
| }); |
| |
| for (ImmutableBucket bucket : buckets) { |
| immutableBuckets.asMapOfRanges() |
| .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); |
| } |
| } |
| }); |
| }); |
| } |
| |
| @Override |
| public synchronized boolean hasMessageAvailable() { |
| long cutoffTime = getCutoffTime(); |
| |
| boolean hasMessageAvailable = getNumberOfDelayedMessages() > 0 && nextDeliveryTime() <= cutoffTime; |
| if (!hasMessageAvailable) { |
| updateTimer(); |
| } |
| return hasMessageAvailable; |
| } |
| |
| @Override |
| protected long nextDeliveryTime() { |
| if (lastMutableBucket.isEmpty() && !sharedBucketPriorityQueue.isEmpty()) { |
| return sharedBucketPriorityQueue.peekN1(); |
| } else if (sharedBucketPriorityQueue.isEmpty() && !lastMutableBucket.isEmpty()) { |
| return lastMutableBucket.nextDeliveryTime(); |
| } |
| long timestamp = lastMutableBucket.nextDeliveryTime(); |
| long bucketTimestamp = sharedBucketPriorityQueue.peekN1(); |
| return Math.min(timestamp, bucketTimestamp); |
| } |
| |
| @Override |
| public long getNumberOfDelayedMessages() { |
| return numberDelayedMessages; |
| } |
| |
| @Override |
| public long getBufferMemoryUsage() { |
| return this.lastMutableBucket.getBufferMemoryUsage() + sharedBucketPriorityQueue.bytesCapacity(); |
| } |
| |
| @Override |
| public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) { |
| if (!checkPendingOpDone()) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Skip getScheduledMessages to wait for bucket snapshot load finish.", |
| dispatcher.getName()); |
| } |
| return Collections.emptyNavigableSet(); |
| } |
| |
| long cutoffTime = getCutoffTime(); |
| |
| lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, sharedBucketPriorityQueue); |
| |
| NavigableSet<PositionImpl> positions = new TreeSet<>(); |
| int n = maxMessages; |
| |
| while (n > 0 && !sharedBucketPriorityQueue.isEmpty()) { |
| long timestamp = sharedBucketPriorityQueue.peekN1(); |
| if (timestamp > cutoffTime) { |
| break; |
| } |
| |
| long ledgerId = sharedBucketPriorityQueue.peekN2(); |
| long entryId = sharedBucketPriorityQueue.peekN3(); |
| |
| ImmutableBucket bucket = snapshotSegmentLastIndexTable.get(ledgerId, entryId); |
| if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) { |
| // All message of current snapshot segment are scheduled, try load next snapshot segment |
| if (bucket.merging) { |
| log.info("[{}] Skip load to wait for bucket snapshot merge finish, bucketKey:{}", |
| dispatcher.getName(), bucket.bucketKey()); |
| break; |
| } |
| |
| final int preSegmentEntryId = bucket.currentSegmentEntryId; |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}", |
| dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1); |
| } |
| boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone(); |
| if (!createFutureDone) { |
| log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}", |
| dispatcher.getName(), bucket.bucketKey()); |
| break; |
| } |
| |
| long loadStartTime = System.currentTimeMillis(); |
| stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load); |
| CompletableFuture<Void> loadFuture = pendingLoad = bucket.asyncLoadNextBucketSnapshotEntry() |
| .thenAccept(indexList -> { |
| synchronized (BucketDelayedDeliveryTracker.this) { |
| this.snapshotSegmentLastIndexTable.remove(ledgerId, entryId); |
| if (CollectionUtils.isEmpty(indexList)) { |
| immutableBuckets.asMapOfRanges() |
| .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); |
| bucket.asyncDeleteBucketSnapshot(stats); |
| return; |
| } |
| DelayedMessageIndexBucketSnapshotFormat.DelayedIndex |
| lastDelayedIndex = indexList.get(indexList.size() - 1); |
| this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), |
| lastDelayedIndex.getEntryId(), bucket); |
| for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) { |
| sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), |
| index.getEntryId()); |
| } |
| } |
| }).whenComplete((__, ex) -> { |
| if (ex != null) { |
| // Back bucket state |
| bucket.setCurrentSegmentEntryId(preSegmentEntryId); |
| |
| log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}", |
| dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex); |
| |
| stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load); |
| } else { |
| log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}", |
| dispatcher.getName(), bucket.bucketKey(), |
| (preSegmentEntryId == bucket.lastSegmentEntryId) ? "-1" : preSegmentEntryId + 1); |
| |
| stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load, |
| System.currentTimeMillis() - loadStartTime); |
| } |
| synchronized (this) { |
| if (timeout != null) { |
| timeout.cancel(); |
| } |
| timeout = timer.newTimeout(this, tickTimeMillis, TimeUnit.MILLISECONDS); |
| } |
| }); |
| |
| if (!checkPendingOpDone() || loadFuture.isCompletedExceptionally()) { |
| break; |
| } |
| } |
| |
| positions.add(new PositionImpl(ledgerId, entryId)); |
| |
| sharedBucketPriorityQueue.pop(); |
| removeIndexBit(ledgerId, entryId); |
| |
| --n; |
| --numberDelayedMessages; |
| } |
| |
| updateTimer(); |
| |
| return positions; |
| } |
| |
| private synchronized boolean checkPendingOpDone() { |
| if (pendingLoad == null || pendingLoad.isDone()) { |
| pendingLoad = null; |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean shouldPauseAllDeliveries() { |
| return false; |
| } |
| |
| @Override |
| public synchronized CompletableFuture<Void> clear() { |
| CompletableFuture<Void> future = cleanImmutableBuckets(); |
| sharedBucketPriorityQueue.clear(); |
| lastMutableBucket.clear(); |
| snapshotSegmentLastIndexTable.clear(); |
| numberDelayedMessages = 0; |
| return future; |
| } |
| |
| @Override |
| public synchronized void close() { |
| super.close(); |
| lastMutableBucket.close(); |
| sharedBucketPriorityQueue.close(); |
| try { |
| List<CompletableFuture<Long>> completableFutures = immutableBuckets.asMapOfRanges().values().stream() |
| .map(bucket -> bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE)).toList(); |
| FutureUtil.waitForAll(completableFutures).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS); |
| } catch (Exception e) { |
| log.warn("[{}] Failed wait to snapshot generate", dispatcher.getName(), e); |
| } |
| } |
| |
| private CompletableFuture<Void> cleanImmutableBuckets() { |
| List<CompletableFuture<Void>> futures = new ArrayList<>(); |
| Iterator<ImmutableBucket> iterator = immutableBuckets.asMapOfRanges().values().iterator(); |
| while (iterator.hasNext()) { |
| ImmutableBucket bucket = iterator.next(); |
| futures.add(bucket.clear(stats)); |
| numberDelayedMessages -= bucket.getNumberBucketDelayedMessages(); |
| iterator.remove(); |
| } |
| return FutureUtil.waitForAll(futures); |
| } |
| |
| private boolean removeIndexBit(long ledgerId, long entryId) { |
| if (lastMutableBucket.removeIndexBit(ledgerId, entryId)) { |
| return true; |
| } |
| |
| return findImmutableBucket(ledgerId).map(bucket -> bucket.removeIndexBit(ledgerId, entryId)) |
| .orElse(false); |
| } |
| |
| public boolean containsMessage(long ledgerId, long entryId) { |
| if (lastMutableBucket.containsMessage(ledgerId, entryId)) { |
| return true; |
| } |
| |
| return findImmutableBucket(ledgerId).map(bucket -> bucket.containsMessage(ledgerId, entryId)) |
| .orElse(false); |
| } |
| |
| public Map<String, TopicMetricBean> genTopicMetricMap() { |
| stats.recordNumOfBuckets(immutableBuckets.asMapOfRanges().size() + 1); |
| stats.recordDelayedMessageIndexLoaded(this.sharedBucketPriorityQueue.size() + this.lastMutableBucket.size()); |
| MutableLong totalSnapshotLength = new MutableLong(); |
| immutableBuckets.asMapOfRanges().values().forEach(immutableBucket -> { |
| totalSnapshotLength.add(immutableBucket.getSnapshotLength()); |
| }); |
| stats.recordBucketSnapshotSizeBytes(totalSnapshotLength.longValue()); |
| return stats.genTopicMetricMap(); |
| } |
| } |