| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.bookkeeper.mledger.impl; |
| |
| import com.google.common.collect.BoundType; |
| import com.google.common.collect.Range; |
| import com.google.protobuf.InvalidProtocolBufferException; |
| import java.util.Enumeration; |
| import java.util.List; |
| import java.util.NavigableMap; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.bookkeeper.client.AsyncCallback; |
| import org.apache.bookkeeper.client.BKException; |
| import org.apache.bookkeeper.client.BookKeeper; |
| import org.apache.bookkeeper.client.BookKeeperAdmin; |
| import org.apache.bookkeeper.client.LedgerEntry; |
| import org.apache.bookkeeper.client.LedgerHandle; |
| import org.apache.bookkeeper.client.api.DigestType; |
| import org.apache.bookkeeper.mledger.ManagedLedgerException; |
| import org.apache.bookkeeper.mledger.proto.MLDataFormats; |
| import org.apache.bookkeeper.mledger.util.Errors; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; |
| import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; |
| import org.apache.pulsar.metadata.api.Stat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| */ |
| public class ManagedLedgerOfflineBacklog { |
| |
| private final byte[] password; |
| private final BookKeeper.DigestType digestType; |
| private static final int META_READ_TIMEOUT_SECONDS = 60; |
| private final boolean accurate; |
| private final String brokerName; |
| |
| public ManagedLedgerOfflineBacklog(DigestType digestType, byte[] password, String brokerName, |
| boolean accurate) { |
| this.digestType = BookKeeper.DigestType.fromApiDigestType(digestType); |
| this.password = password; |
| this.accurate = accurate; |
| this.brokerName = brokerName; |
| } |
| |
| // need a better way than to duplicate the functionality below from ML |
| private long getNumberOfEntries(Range<PositionImpl> range, |
| NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers) { |
| PositionImpl fromPosition = range.lowerEndpoint(); |
| boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED; |
| PositionImpl toPosition = range.upperEndpoint(); |
| boolean toIncluded = range.upperBoundType() == BoundType.CLOSED; |
| |
| if (fromPosition.getLedgerId() == toPosition.getLedgerId()) { |
| // If the 2 positions are in the same ledger |
| long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1; |
| count += fromIncluded ? 1 : 0; |
| count += toIncluded ? 1 : 0; |
| return count; |
| } else { |
| long count = 0; |
| // If the from & to are pointing to different ledgers, then we need to : |
| // 1. Add the entries in the ledger pointed by toPosition |
| count += toPosition.getEntryId(); |
| count += toIncluded ? 1 : 0; |
| |
| // 2. Add the entries in the ledger pointed by fromPosition |
| MLDataFormats.ManagedLedgerInfo.LedgerInfo li = ledgers.get(fromPosition.getLedgerId()); |
| if (li != null) { |
| count += li.getEntries() - (fromPosition.getEntryId() + 1); |
| count += fromIncluded ? 1 : 0; |
| } |
| |
| // 3. Add the whole ledgers entries in between |
| for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : ledgers |
| .subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false).values()) { |
| count += ls.getEntries(); |
| } |
| |
| return count; |
| } |
| } |
| |
| public PersistentOfflineTopicStats getEstimatedUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory, |
| String managedLedgerName) throws Exception { |
| return estimateUnloadedTopicBacklog(factory, TopicName.get("persistent://" + managedLedgerName)); |
| } |
| |
| public PersistentOfflineTopicStats estimateUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory, |
| TopicName topicName) throws Exception { |
| String managedLedgerName = topicName.getPersistenceNamingEncoding(); |
| long numberOfEntries = 0; |
| long totalSize = 0; |
| final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = new ConcurrentSkipListMap<>(); |
| final PersistentOfflineTopicStats offlineTopicStats = new PersistentOfflineTopicStats(managedLedgerName, |
| brokerName); |
| |
| // calculate total managed ledger size and number of entries without loading the topic |
| readLedgerMeta(factory, topicName, ledgers); |
| for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : ledgers.values()) { |
| numberOfEntries += ls.getEntries(); |
| totalSize += ls.getSize(); |
| if (accurate) { |
| offlineTopicStats.addLedgerDetails(ls.getEntries(), ls.getTimestamp(), ls.getSize(), ls.getLedgerId()); |
| } |
| } |
| offlineTopicStats.totalMessages = numberOfEntries; |
| offlineTopicStats.storageSize = totalSize; |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Total number of entries - {} and size - {}", managedLedgerName, numberOfEntries, totalSize); |
| } |
| |
| // calculate per cursor message backlog |
| calculateCursorBacklogs(factory, topicName, ledgers, offlineTopicStats); |
| offlineTopicStats.statGeneratedAt.setTime(System.currentTimeMillis()); |
| |
| return offlineTopicStats; |
| } |
| |
| private void readLedgerMeta(final ManagedLedgerFactoryImpl factory, final TopicName topicName, |
| final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers) throws Exception { |
| String managedLedgerName = topicName.getPersistenceNamingEncoding(); |
| MetaStore store = factory.getMetaStore(); |
| BookKeeper bk = factory.getBookKeeper(); |
| final CountDownLatch mlMetaCounter = new CountDownLatch(1); |
| |
| store.getManagedLedgerInfo(managedLedgerName, false /* createIfMissing */, |
| new MetaStore.MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() { |
| @Override |
| public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) { |
| for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : mlInfo.getLedgerInfoList()) { |
| ledgers.put(ls.getLedgerId(), ls); |
| } |
| |
| // find no of entries in last ledger |
| if (!ledgers.isEmpty()) { |
| final long id = ledgers.lastKey(); |
| AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Opened ledger {}: {}", managedLedgerName, id, |
| BKException.getMessage(rc)); |
| } |
| if (rc == BKException.Code.OK) { |
| MLDataFormats.ManagedLedgerInfo.LedgerInfo info = |
| MLDataFormats.ManagedLedgerInfo.LedgerInfo |
| .newBuilder().setLedgerId(id).setEntries(lh.getLastAddConfirmed() + 1) |
| .setSize(lh.getLength()).setTimestamp(System.currentTimeMillis()).build(); |
| ledgers.put(id, info); |
| mlMetaCounter.countDown(); |
| } else if (Errors.isNoSuchLedgerExistsException(rc)) { |
| log.warn("[{}] Ledger not found: {}", managedLedgerName, ledgers.lastKey()); |
| ledgers.remove(ledgers.lastKey()); |
| mlMetaCounter.countDown(); |
| } else { |
| log.error("[{}] Failed to open ledger {}: {}", managedLedgerName, id, |
| BKException.getMessage(rc)); |
| mlMetaCounter.countDown(); |
| } |
| }; |
| |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Opening ledger {}", managedLedgerName, id); |
| } |
| try { |
| bk.asyncOpenLedgerNoRecovery(id, digestType, password, opencb, null); |
| } catch (Exception e) { |
| log.warn("[{}] Failed to open ledger {}: {}", managedLedgerName, id, e); |
| mlMetaCounter.countDown(); |
| } |
| } else { |
| log.warn("[{}] Ledger list empty", managedLedgerName); |
| mlMetaCounter.countDown(); |
| } |
| } |
| |
| @Override |
| public void operationFailed(ManagedLedgerException.MetaStoreException e) { |
| log.warn("[{}] Unable to obtain managed ledger metadata - {}", managedLedgerName, e); |
| mlMetaCounter.countDown(); |
| } |
| }); |
| |
| if (accurate) { |
| // block until however long it takes for operation to complete |
| mlMetaCounter.await(); |
| } else { |
| mlMetaCounter.await(META_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS); |
| |
| } |
| } |
| |
| private void calculateCursorBacklogs(final ManagedLedgerFactoryImpl factory, final TopicName topicName, |
| final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers, |
| final PersistentOfflineTopicStats offlineTopicStats) throws Exception { |
| |
| if (ledgers.isEmpty()) { |
| return; |
| } |
| String managedLedgerName = topicName.getPersistenceNamingEncoding(); |
| MetaStore store = factory.getMetaStore(); |
| BookKeeper bk = factory.getBookKeeper(); |
| final CountDownLatch allCursorsCounter = new CountDownLatch(1); |
| final long errorInReadingCursor = -1; |
| ConcurrentOpenHashMap<String, Long> ledgerRetryMap = |
| ConcurrentOpenHashMap.<String, Long>newBuilder().build(); |
| |
| final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.lastEntry().getValue(); |
| final PositionImpl lastLedgerPosition = new PositionImpl(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Last ledger position {}", managedLedgerName, lastLedgerPosition); |
| } |
| |
| store.getCursors(managedLedgerName, new MetaStore.MetaStoreCallback<List<String>>() { |
| @Override |
| public void operationComplete(List<String> cursors, Stat v) { |
| // Load existing cursors |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Found {} cursors", managedLedgerName, cursors.size()); |
| } |
| |
| if (cursors.isEmpty()) { |
| allCursorsCounter.countDown(); |
| return; |
| } |
| |
| final CountDownLatch cursorCounter = new CountDownLatch(cursors.size()); |
| |
| for (final String cursorName : cursors) { |
| // determine subscription position from cursor ledger |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Loading cursor {}", managedLedgerName, cursorName); |
| } |
| |
| AsyncCallback.OpenCallback cursorLedgerOpenCb = (rc, lh, ctx1) -> { |
| long ledgerId = lh.getId(); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Opened cursor ledger {} for cursor {}. rc={}", managedLedgerName, ledgerId, |
| cursorName, rc); |
| } |
| if (rc != BKException.Code.OK) { |
| log.warn("[{}] Error opening metadata ledger {} for cursor {}: {}", managedLedgerName, |
| ledgerId, cursorName, BKException.getMessage(rc)); |
| cursorCounter.countDown(); |
| return; |
| } |
| long lac = lh.getLastAddConfirmed(); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Cursor {} LAC {} read from ledger {}", managedLedgerName, cursorName, lac, |
| ledgerId); |
| } |
| |
| if (lac == LedgerHandle.INVALID_ENTRY_ID) { |
| // save the ledger id and cursor to retry outside of this call back |
| // since we are trying to read the same cursor ledger, we will block until |
| // this current callback completes, since an attempt to read the entry |
| // will block behind this current operation to complete |
| ledgerRetryMap.put(cursorName, ledgerId); |
| log.info("[{}] Cursor {} LAC {} read from ledger {}", managedLedgerName, cursorName, lac, |
| ledgerId); |
| cursorCounter.countDown(); |
| return; |
| } |
| final long entryId = lac; |
| // read last acked message position for subscription |
| lh.asyncReadEntries(entryId, entryId, new AsyncCallback.ReadCallback() { |
| @Override |
| public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, |
| Object ctx) { |
| try { |
| if (log.isDebugEnabled()) { |
| log.debug("readComplete rc={} entryId={}", rc, entryId); |
| } |
| if (rc != BKException.Code.OK) { |
| log.warn("[{}] Error reading from metadata ledger {} for cursor {}: {}", |
| managedLedgerName, ledgerId, cursorName, BKException.getMessage(rc)); |
| // indicate that this cursor should be excluded |
| offlineTopicStats.addCursorDetails(cursorName, errorInReadingCursor, |
| lh.getId()); |
| } else { |
| LedgerEntry entry = seq.nextElement(); |
| MLDataFormats.PositionInfo positionInfo; |
| try { |
| positionInfo = MLDataFormats.PositionInfo.parseFrom(entry.getEntry()); |
| } catch (InvalidProtocolBufferException e) { |
| log.warn( |
| "[{}] Error reading position from metadata ledger {} for cursor {}: {}", |
| managedLedgerName, ledgerId, cursorName, e); |
| offlineTopicStats.addCursorDetails(cursorName, errorInReadingCursor, |
| lh.getId()); |
| return; |
| } |
| final PositionImpl lastAckedMessagePosition = new PositionImpl(positionInfo); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Cursor {} MD {} read last ledger position {}", |
| managedLedgerName, cursorName, lastAckedMessagePosition, |
| lastLedgerPosition); |
| } |
| // calculate cursor backlog |
| Range<PositionImpl> range = Range.openClosed(lastAckedMessagePosition, |
| lastLedgerPosition); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Calculating backlog for cursor {} using range {}", |
| managedLedgerName, cursorName, range); |
| } |
| long cursorBacklog = getNumberOfEntries(range, ledgers); |
| offlineTopicStats.messageBacklog += cursorBacklog; |
| offlineTopicStats.addCursorDetails(cursorName, cursorBacklog, lh.getId()); |
| } |
| } finally { |
| cursorCounter.countDown(); |
| } |
| } |
| }, null); |
| |
| }; // end of cursor meta read callback |
| |
| store.asyncGetCursorInfo(managedLedgerName, cursorName, |
| new MetaStore.MetaStoreCallback<MLDataFormats.ManagedCursorInfo>() { |
| @Override |
| public void operationComplete(MLDataFormats.ManagedCursorInfo info, |
| Stat stat) { |
| long cursorLedgerId = info.getCursorsLedgerId(); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Cursor {} meta-data read ledger id {}", managedLedgerName, |
| cursorName, cursorLedgerId); |
| } |
| if (cursorLedgerId != -1) { |
| bk.asyncOpenLedgerNoRecovery(cursorLedgerId, digestType, password, |
| cursorLedgerOpenCb, null); |
| } else { |
| PositionImpl lastAckedMessagePosition = new PositionImpl( |
| info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId()); |
| Range<PositionImpl> range = Range.openClosed(lastAckedMessagePosition, |
| lastLedgerPosition); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Calculating backlog for cursor {} using range {}", |
| managedLedgerName, cursorName, range); |
| } |
| long cursorBacklog = getNumberOfEntries(range, ledgers); |
| offlineTopicStats.messageBacklog += cursorBacklog; |
| offlineTopicStats.addCursorDetails(cursorName, cursorBacklog, cursorLedgerId); |
| cursorCounter.countDown(); |
| } |
| |
| } |
| |
| @Override |
| public void operationFailed(ManagedLedgerException.MetaStoreException e) { |
| log.warn("[{}] Unable to obtain cursor ledger for cursor {}: {}", managedLedgerName, |
| cursorName, e); |
| cursorCounter.countDown(); |
| } |
| }); |
| } // for every cursor find backlog |
| try { |
| if (accurate) { |
| cursorCounter.await(); |
| } else { |
| cursorCounter.await(META_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS); |
| } |
| } catch (Exception e) { |
| log.warn("[{}] Error reading subscription positions{}", managedLedgerName, e); |
| } finally { |
| allCursorsCounter.countDown(); |
| } |
| } |
| |
| @Override |
| public void operationFailed(ManagedLedgerException.MetaStoreException e) { |
| log.warn("[{}] Failed to get the cursors list", managedLedgerName, e); |
| allCursorsCounter.countDown(); |
| } |
| }); |
| if (accurate) { |
| allCursorsCounter.await(); |
| } else { |
| allCursorsCounter.await(META_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS); |
| } |
| |
| // go through ledgers where LAC was -1 |
| if (accurate && ledgerRetryMap.size() > 0) { |
| ledgerRetryMap.forEach((cursorName, ledgerId) -> { |
| if (log.isDebugEnabled()) { |
| log.debug("Cursor {} Ledger {} Trying to obtain MD from BkAdmin", cursorName, ledgerId); |
| } |
| PositionImpl lastAckedMessagePosition = tryGetMDPosition(bk, ledgerId, cursorName); |
| if (lastAckedMessagePosition == null) { |
| log.warn("[{}] Cursor {} read from ledger {}. Unable to determine cursor position", |
| managedLedgerName, cursorName, ledgerId); |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Cursor {} read from ledger using bk admin {}. position {}", managedLedgerName, |
| cursorName, ledgerId, lastAckedMessagePosition); |
| } |
| // calculate cursor backlog |
| Range<PositionImpl> range = Range.openClosed(lastAckedMessagePosition, lastLedgerPosition); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Calculating backlog for cursor {} using range {}", managedLedgerName, |
| cursorName, range); |
| } |
| long cursorBacklog = getNumberOfEntries(range, ledgers); |
| offlineTopicStats.messageBacklog += cursorBacklog; |
| offlineTopicStats.addCursorDetails(cursorName, cursorBacklog, ledgerId); |
| } |
| }); |
| } |
| } |
| |
| private PositionImpl tryGetMDPosition(BookKeeper bookKeeper, long ledgerId, String cursorName) { |
| BookKeeperAdmin bookKeeperAdmin = null; |
| long lastEntry = LedgerHandle.INVALID_ENTRY_ID; |
| PositionImpl lastAckedMessagePosition = null; |
| try { |
| bookKeeperAdmin = new BookKeeperAdmin(bookKeeper); |
| for (LedgerEntry ledgerEntry : bookKeeperAdmin.readEntries(ledgerId, 0, lastEntry)) { |
| lastEntry = ledgerEntry.getEntryId(); |
| if (log.isDebugEnabled()) { |
| log.debug(" Read entry {} from ledger {} for cursor {}", lastEntry, ledgerId, cursorName); |
| } |
| MLDataFormats.PositionInfo positionInfo = MLDataFormats.PositionInfo.parseFrom(ledgerEntry.getEntry()); |
| lastAckedMessagePosition = new PositionImpl(positionInfo); |
| if (log.isDebugEnabled()) { |
| log.debug("Cursor {} read position {}", cursorName, lastAckedMessagePosition); |
| } |
| } |
| } catch (Exception e) { |
| log.warn("Unable to determine LAC for ledgerId {} for cursor {}: {}", ledgerId, cursorName, e); |
| } finally { |
| if (bookKeeperAdmin != null) { |
| try { |
| bookKeeperAdmin.close(); |
| } catch (Exception e) { |
| log.warn("Unable to close bk admin for ledgerId {} for cursor {}", ledgerId, cursorName, e); |
| } |
| } |
| |
| } |
| return lastAckedMessagePosition; |
| } |
| |
| private static final Logger log = LoggerFactory.getLogger(ManagedLedgerOfflineBacklog.class); |
| } |