/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.bookkeeper.mledger.impl;

import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Enumeration;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.util.Errors;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Estimates the message backlog and storage size of a topic that is not loaded in the broker
 * (an "offline" topic) by reading managed ledger and cursor metadata directly from the
 * metadata store and BookKeeper, without loading the topic.
 */
public class ManagedLedgerOfflineBacklog {
private final byte[] password;
private final BookKeeper.DigestType digestType;
private static final int META_READ_TIMEOUT_SECONDS = 60;
private final boolean accurate;
private final String brokerName;
public ManagedLedgerOfflineBacklog(DigestType digestType, byte[] password, String brokerName,
boolean accurate) {
this.digestType = BookKeeper.DigestType.fromApiDigestType(digestType);
this.password = password;
this.accurate = accurate;
this.brokerName = brokerName;
}

    // TODO: find a better way than duplicating this logic from ManagedLedgerImpl
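    /**
     * Counts the number of entries covered by the given position range, using the per-ledger
     * entry counts from the managed ledger metadata. The range bound types decide whether each
     * endpoint is itself counted.
     */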
private long getNumberOfEntries(Range<PositionImpl> range,
NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers) {
PositionImpl fromPosition = range.lowerEndpoint();
boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
PositionImpl toPosition = range.upperEndpoint();
boolean toIncluded = range.upperBoundType() == BoundType.CLOSED;
if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
// If the 2 positions are in the same ledger
long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1;
count += fromIncluded ? 1 : 0;
count += toIncluded ? 1 : 0;
return count;
} else {
long count = 0;
// If the from & to are pointing to different ledgers, then we need to :
// 1. Add the entries in the ledger pointed by toPosition
count += toPosition.getEntryId();
count += toIncluded ? 1 : 0;
// 2. Add the entries in the ledger pointed by fromPosition
MLDataFormats.ManagedLedgerInfo.LedgerInfo li = ledgers.get(fromPosition.getLedgerId());
if (li != null) {
count += li.getEntries() - (fromPosition.getEntryId() + 1);
count += fromIncluded ? 1 : 0;
}
// 3. Add the whole ledgers entries in between
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : ledgers
.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false).values()) {
count += ls.getEntries();
}
return count;
}
}
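
    /**
     * Convenience overload that derives the persistent topic name from the managed ledger name.
     */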
public PersistentOfflineTopicStats getEstimatedUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory,
String managedLedgerName) throws Exception {
return estimateUnloadedTopicBacklog(factory, TopicName.get("persistent://" + managedLedgerName));
}
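
    /**
     * Builds {@link PersistentOfflineTopicStats} for the topic: total message count and storage
     * size are summed from the ledger metadata, and the per-cursor backlog is computed against
     * the last position of the last ledger, all without loading the topic.
     */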
public PersistentOfflineTopicStats estimateUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory,
TopicName topicName) throws Exception {
String managedLedgerName = topicName.getPersistenceNamingEncoding();
long numberOfEntries = 0;
long totalSize = 0;
final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = new ConcurrentSkipListMap<>();
final PersistentOfflineTopicStats offlineTopicStats = new PersistentOfflineTopicStats(managedLedgerName,
brokerName);
// calculate total managed ledger size and number of entries without loading the topic
readLedgerMeta(factory, topicName, ledgers);
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : ledgers.values()) {
numberOfEntries += ls.getEntries();
totalSize += ls.getSize();
if (accurate) {
offlineTopicStats.addLedgerDetails(ls.getEntries(), ls.getTimestamp(), ls.getSize(), ls.getLedgerId());
}
}
offlineTopicStats.totalMessages = numberOfEntries;
offlineTopicStats.storageSize = totalSize;
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of entries - {} and size - {}", managedLedgerName, numberOfEntries, totalSize);
}
// calculate per cursor message backlog
calculateCursorBacklogs(factory, topicName, ledgers, offlineTopicStats);
offlineTopicStats.statGeneratedAt.setTime(System.currentTimeMillis());
return offlineTopicStats;
}
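
    /**
     * Reads the ledger list from the managed ledger metadata and fills {@code ledgers}. The last
     * ledger may still be open, so it is opened without recovery to read its LAC and obtain an
     * up-to-date entry count and size.
     */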
private void readLedgerMeta(final ManagedLedgerFactoryImpl factory, final TopicName topicName,
final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers) throws Exception {
String managedLedgerName = topicName.getPersistenceNamingEncoding();
MetaStore store = factory.getMetaStore();
BookKeeper bk = factory.getBookKeeper();
final CountDownLatch mlMetaCounter = new CountDownLatch(1);
store.getManagedLedgerInfo(managedLedgerName, false /* createIfMissing */,
new MetaStore.MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
@Override
public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : mlInfo.getLedgerInfoList()) {
ledgers.put(ls.getLedgerId(), ls);
}
                    // determine the number of entries in the last ledger by opening it without
                    // recovery and reading its last-add-confirmed entry id
if (!ledgers.isEmpty()) {
final long id = ledgers.lastKey();
AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Opened ledger {}: {}", managedLedgerName, id,
BKException.getMessage(rc));
}
if (rc == BKException.Code.OK) {
MLDataFormats.ManagedLedgerInfo.LedgerInfo info =
MLDataFormats.ManagedLedgerInfo.LedgerInfo
.newBuilder().setLedgerId(id).setEntries(lh.getLastAddConfirmed() + 1)
.setSize(lh.getLength()).setTimestamp(System.currentTimeMillis()).build();
ledgers.put(id, info);
mlMetaCounter.countDown();
} else if (Errors.isNoSuchLedgerExistsException(rc)) {
log.warn("[{}] Ledger not found: {}", managedLedgerName, ledgers.lastKey());
ledgers.remove(ledgers.lastKey());
mlMetaCounter.countDown();
} else {
log.error("[{}] Failed to open ledger {}: {}", managedLedgerName, id,
BKException.getMessage(rc));
mlMetaCounter.countDown();
}
};
if (log.isDebugEnabled()) {
log.debug("[{}] Opening ledger {}", managedLedgerName, id);
}
try {
bk.asyncOpenLedgerNoRecovery(id, digestType, password, opencb, null);
} catch (Exception e) {
log.warn("[{}] Failed to open ledger {}: {}", managedLedgerName, id, e);
mlMetaCounter.countDown();
}
} else {
log.warn("[{}] Ledger list empty", managedLedgerName);
mlMetaCounter.countDown();
}
}
@Override
public void operationFailed(ManagedLedgerException.MetaStoreException e) {
log.warn("[{}] Unable to obtain managed ledger metadata - {}", managedLedgerName, e);
mlMetaCounter.countDown();
}
});
if (accurate) {
            // block for as long as the operation takes to complete
mlMetaCounter.await();
} else {
mlMetaCounter.await(META_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
}
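
    /**
     * Computes the backlog of every cursor (subscription) of the topic. For each cursor the last
     * acknowledged position is read from the cursor ledger (or from the mark-delete position in
     * the cursor metadata when no cursor ledger exists), and the backlog is the number of entries
     * between that position and the end of the last data ledger.
     */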
private void calculateCursorBacklogs(final ManagedLedgerFactoryImpl factory, final TopicName topicName,
final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers,
final PersistentOfflineTopicStats offlineTopicStats) throws Exception {
if (ledgers.isEmpty()) {
return;
}
String managedLedgerName = topicName.getPersistenceNamingEncoding();
MetaStore store = factory.getMetaStore();
BookKeeper bk = factory.getBookKeeper();
final CountDownLatch allCursorsCounter = new CountDownLatch(1);
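        // sentinel backlog value recorded for cursors whose position could not be read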
final long errorInReadingCursor = -1;
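        // cursors whose ledger reported an invalid LAC; these are retried below with
        // BookKeeperAdmin when accurate stats are requested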
ConcurrentOpenHashMap<String, Long> ledgerRetryMap =
ConcurrentOpenHashMap.<String, Long>newBuilder().build();
final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.lastEntry().getValue();
final PositionImpl lastLedgerPosition = new PositionImpl(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
if (log.isDebugEnabled()) {
log.debug("[{}] Last ledger position {}", managedLedgerName, lastLedgerPosition);
}
store.getCursors(managedLedgerName, new MetaStore.MetaStoreCallback<List<String>>() {
@Override
public void operationComplete(List<String> cursors, Stat v) {
// Load existing cursors
if (log.isDebugEnabled()) {
log.debug("[{}] Found {} cursors", managedLedgerName, cursors.size());
}
if (cursors.isEmpty()) {
allCursorsCounter.countDown();
return;
}
final CountDownLatch cursorCounter = new CountDownLatch(cursors.size());
for (final String cursorName : cursors) {
// determine subscription position from cursor ledger
if (log.isDebugEnabled()) {
log.debug("[{}] Loading cursor {}", managedLedgerName, cursorName);
}
AsyncCallback.OpenCallback cursorLedgerOpenCb = (rc, lh, ctx1) -> {
long ledgerId = lh.getId();
if (log.isDebugEnabled()) {
log.debug("[{}] Opened cursor ledger {} for cursor {}. rc={}", managedLedgerName, ledgerId,
cursorName, rc);
}
if (rc != BKException.Code.OK) {
log.warn("[{}] Error opening metadata ledger {} for cursor {}: {}", managedLedgerName,
ledgerId, cursorName, BKException.getMessage(rc));
cursorCounter.countDown();
return;
}
long lac = lh.getLastAddConfirmed();
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor {} LAC {} read from ledger {}", managedLedgerName, cursorName, lac,
ledgerId);
}
if (lac == LedgerHandle.INVALID_ENTRY_ID) {
                            // save the ledger id and cursor name so the read can be retried
                            // outside of this callback: an attempt to read an entry from the same
                            // cursor ledger here would block behind the current open operation
                            // until it completes
ledgerRetryMap.put(cursorName, ledgerId);
log.info("[{}] Cursor {} LAC {} read from ledger {}", managedLedgerName, cursorName, lac,
ledgerId);
cursorCounter.countDown();
return;
}
final long entryId = lac;
// read last acked message position for subscription
lh.asyncReadEntries(entryId, entryId, new AsyncCallback.ReadCallback() {
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
Object ctx) {
try {
if (log.isDebugEnabled()) {
log.debug("readComplete rc={} entryId={}", rc, entryId);
}
if (rc != BKException.Code.OK) {
log.warn("[{}] Error reading from metadata ledger {} for cursor {}: {}",
managedLedgerName, ledgerId, cursorName, BKException.getMessage(rc));
// indicate that this cursor should be excluded
offlineTopicStats.addCursorDetails(cursorName, errorInReadingCursor,
lh.getId());
} else {
LedgerEntry entry = seq.nextElement();
MLDataFormats.PositionInfo positionInfo;
try {
positionInfo = MLDataFormats.PositionInfo.parseFrom(entry.getEntry());
} catch (InvalidProtocolBufferException e) {
log.warn(
"[{}] Error reading position from metadata ledger {} for cursor {}: {}",
managedLedgerName, ledgerId, cursorName, e);
offlineTopicStats.addCursorDetails(cursorName, errorInReadingCursor,
lh.getId());
return;
}
final PositionImpl lastAckedMessagePosition = new PositionImpl(positionInfo);
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor {} MD {} read last ledger position {}",
managedLedgerName, cursorName, lastAckedMessagePosition,
lastLedgerPosition);
}
// calculate cursor backlog
Range<PositionImpl> range = Range.openClosed(lastAckedMessagePosition,
lastLedgerPosition);
if (log.isDebugEnabled()) {
log.debug("[{}] Calculating backlog for cursor {} using range {}",
managedLedgerName, cursorName, range);
}
long cursorBacklog = getNumberOfEntries(range, ledgers);
offlineTopicStats.messageBacklog += cursorBacklog;
offlineTopicStats.addCursorDetails(cursorName, cursorBacklog, lh.getId());
}
} finally {
cursorCounter.countDown();
}
}
}, null);
}; // end of cursor meta read callback
store.asyncGetCursorInfo(managedLedgerName, cursorName,
new MetaStore.MetaStoreCallback<MLDataFormats.ManagedCursorInfo>() {
@Override
public void operationComplete(MLDataFormats.ManagedCursorInfo info,
Stat stat) {
long cursorLedgerId = info.getCursorsLedgerId();
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor {} meta-data read ledger id {}", managedLedgerName,
cursorName, cursorLedgerId);
}
if (cursorLedgerId != -1) {
bk.asyncOpenLedgerNoRecovery(cursorLedgerId, digestType, password,
cursorLedgerOpenCb, null);
} else {
PositionImpl lastAckedMessagePosition = new PositionImpl(
info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId());
Range<PositionImpl> range = Range.openClosed(lastAckedMessagePosition,
lastLedgerPosition);
if (log.isDebugEnabled()) {
log.debug("[{}] Calculating backlog for cursor {} using range {}",
managedLedgerName, cursorName, range);
}
long cursorBacklog = getNumberOfEntries(range, ledgers);
offlineTopicStats.messageBacklog += cursorBacklog;
offlineTopicStats.addCursorDetails(cursorName, cursorBacklog, cursorLedgerId);
cursorCounter.countDown();
}
}
@Override
public void operationFailed(ManagedLedgerException.MetaStoreException e) {
log.warn("[{}] Unable to obtain cursor ledger for cursor {}: {}", managedLedgerName,
cursorName, e);
cursorCounter.countDown();
}
});
} // for every cursor find backlog
try {
if (accurate) {
cursorCounter.await();
} else {
cursorCounter.await(META_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
} catch (Exception e) {
log.warn("[{}] Error reading subscription positions{}", managedLedgerName, e);
} finally {
allCursorsCounter.countDown();
}
}
@Override
public void operationFailed(ManagedLedgerException.MetaStoreException e) {
log.warn("[{}] Failed to get the cursors list", managedLedgerName, e);
allCursorsCounter.countDown();
}
});
if (accurate) {
allCursorsCounter.await();
} else {
allCursorsCounter.await(META_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
        // retry cursors whose ledger LAC was -1, reading the cursor position via BookKeeperAdmin
if (accurate && ledgerRetryMap.size() > 0) {
ledgerRetryMap.forEach((cursorName, ledgerId) -> {
if (log.isDebugEnabled()) {
log.debug("Cursor {} Ledger {} Trying to obtain MD from BkAdmin", cursorName, ledgerId);
}
PositionImpl lastAckedMessagePosition = tryGetMDPosition(bk, ledgerId, cursorName);
if (lastAckedMessagePosition == null) {
log.warn("[{}] Cursor {} read from ledger {}. Unable to determine cursor position",
managedLedgerName, cursorName, ledgerId);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor {} read from ledger using bk admin {}. position {}", managedLedgerName,
cursorName, ledgerId, lastAckedMessagePosition);
}
// calculate cursor backlog
Range<PositionImpl> range = Range.openClosed(lastAckedMessagePosition, lastLedgerPosition);
if (log.isDebugEnabled()) {
log.debug("[{}] Calculating backlog for cursor {} using range {}", managedLedgerName,
cursorName, range);
}
long cursorBacklog = getNumberOfEntries(range, ledgers);
offlineTopicStats.messageBacklog += cursorBacklog;
offlineTopicStats.addCursorDetails(cursorName, cursorBacklog, ledgerId);
}
});
}
}
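
    /**
     * Reads the cursor ledger with {@link BookKeeperAdmin} and returns the position stored in its
     * last readable entry, or {@code null} if the position could not be determined.
     */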
private PositionImpl tryGetMDPosition(BookKeeper bookKeeper, long ledgerId, String cursorName) {
BookKeeperAdmin bookKeeperAdmin = null;
long lastEntry = LedgerHandle.INVALID_ENTRY_ID;
PositionImpl lastAckedMessagePosition = null;
try {
bookKeeperAdmin = new BookKeeperAdmin(bookKeeper);
for (LedgerEntry ledgerEntry : bookKeeperAdmin.readEntries(ledgerId, 0, lastEntry)) {
lastEntry = ledgerEntry.getEntryId();
if (log.isDebugEnabled()) {
log.debug(" Read entry {} from ledger {} for cursor {}", lastEntry, ledgerId, cursorName);
}
MLDataFormats.PositionInfo positionInfo = MLDataFormats.PositionInfo.parseFrom(ledgerEntry.getEntry());
lastAckedMessagePosition = new PositionImpl(positionInfo);
if (log.isDebugEnabled()) {
log.debug("Cursor {} read position {}", cursorName, lastAckedMessagePosition);
}
}
} catch (Exception e) {
log.warn("Unable to determine LAC for ledgerId {} for cursor {}: {}", ledgerId, cursorName, e);
} finally {
if (bookKeeperAdmin != null) {
try {
bookKeeperAdmin.close();
} catch (Exception e) {
log.warn("Unable to close bk admin for ledgerId {} for cursor {}", ledgerId, cursorName, e);
}
}
}
return lastAckedMessagePosition;
}
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerOfflineBacklog.class);
}