/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.pulsar.metadata.api.Stat;
/**
* Detailed design can be found in <a href="https://github.com/apache/pulsar/issues/16153">PIP-180</a>.
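*
* <p>A shadow managed ledger does not create ledgers of its own: it mirrors the ledger list of the source
* managed ledger, opens the source's last ledger in read-only mode, and performs "shadow writes" whose
* positions are taken from the source (carried in the add-operation context) instead of appending entries
* to a new ledger.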
*/
@Slf4j
public class ShadowManagedLedgerImpl extends ManagedLedgerImpl {
private final String sourceMLName;
private volatile Stat sourceLedgersStat;
public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper,
MetaStore store, ManagedLedgerConfig config,
OrderedScheduler scheduledExecutor,
String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker);
this.sourceMLName = config.getShadowSourceName();
}
/**
 * ShadowManagedLedger init steps:
 * 1. this.initialize : read the source managedLedgerInfo
 * 2. super.initialize : read its own managedLedgerInfo
 * 3. this.initializeBookKeeper
 * 4. super.initializeCursors
 */
@Override
synchronized void initialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) {
log.info("Opening shadow managed ledger {} with source={}", name, sourceMLName);
executor.execute(() -> doInitialize(callback, ctx));
}
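/**
 * Reads the source managed ledger's info, copies its ledger list, opens the last source ledger in
 * read-only mode and then continues with the regular {@code ManagedLedgerImpl} initialization. A watch is
 * also registered so that later updates of the source info are handled by
 * {@link #processSourceManagedLedgerInfo}.
 */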
private void doInitialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) {
// Fetch the list of existing ledgers in the source managed ledger
store.watchManagedLedgerInfo(sourceMLName, (managedLedgerInfo, stat) ->
executor.execute(() -> processSourceManagedLedgerInfo(managedLedgerInfo, stat))
);
store.getManagedLedgerInfo(sourceMLName, false, null, new MetaStore.MetaStoreCallback<>() {
@Override
public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Source ML info:{}", name, sourceMLName, mlInfo);
}
if (sourceLedgersStat != null && sourceLedgersStat.getVersion() >= stat.getVersion()) {
log.warn("Newer version of mlInfo is already processed. Previous stat={}, current stat={}",
sourceLedgersStat, stat);
return;
}
sourceLedgersStat = stat;
if (mlInfo.getLedgerInfoCount() == 0) {
// Unlikely, since the shadow topic is created only after the source topic exists.
log.warn("[{}] Source topic ledger list is empty! source={},mlInfo={},stat={}", name, sourceMLName,
mlInfo, stat);
ShadowManagedLedgerImpl.super.initialize(callback, ctx);
return;
}
if (mlInfo.hasTerminatedPosition()) {
lastConfirmedEntry = new PositionImpl(mlInfo.getTerminatedPosition());
log.info("[{}][{}] Recovering managed ledger terminated at {}", name, sourceMLName,
lastConfirmedEntry);
}
for (LedgerInfo ls : mlInfo.getLedgerInfoList()) {
ledgers.put(ls.getLedgerId(), ls);
}
final long lastLedgerId = ledgers.lastKey();
mbean.startDataLedgerOpenOp();
AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> executor.execute(() -> {
mbean.endDataLedgerOpenOp();
if (log.isDebugEnabled()) {
log.debug("[{}] Opened source ledger {}", name, lastLedgerId);
}
if (rc == BKException.Code.OK) {
LedgerInfo info =
LedgerInfo.newBuilder()
.setLedgerId(lastLedgerId)
.setEntries(lh.getLastAddConfirmed() + 1)
.setSize(lh.getLength())
.setTimestamp(clock.millis()).build();
ledgers.put(lastLedgerId, info);
// Always treat the last ledger as the one currently open in the source.
STATE_UPDATER.set(ShadowManagedLedgerImpl.this, State.LedgerOpened);
currentLedger = lh;
if (managedLedgerInterceptor != null) {
managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh)
.thenRun(() -> ShadowManagedLedgerImpl.super.initialize(callback, ctx))
.exceptionally(ex -> {
callback.initializeFailed(
new ManagedLedgerException.ManagedLedgerInterceptException(
ex.getCause()));
return null;
});
} else {
ShadowManagedLedgerImpl.super.initialize(callback, ctx);
}
} else if (isNoSuchLedgerExistsException(rc)) {
log.warn("[{}] Source ledger not found: {}", name, lastLedgerId);
ledgers.remove(lastLedgerId);
ShadowManagedLedgerImpl.super.initialize(callback, ctx);
} else {
log.error("[{}] Failed to open source ledger {}: {}", name, lastLedgerId,
BKException.getMessage(rc));
callback.initializeFailed(createManagedLedgerException(rc));
}
});
// Open the ledger in read-only mode (no recovery).
bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType, config.getPassword(), opencb, null);
}
@Override
public void operationFailed(ManagedLedgerException.MetaStoreException e) {
if (e instanceof ManagedLedgerException.MetadataNotFoundException) {
callback.initializeFailed(new ManagedLedgerException.ManagedLedgerNotFoundException(e));
} else {
callback.initializeFailed(new ManagedLedgerException(e));
}
}
});
}
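/**
 * Computes the total entry count and size from the ledger list copied from the source, drops empty
 * ledgers other than the current one, initializes the last confirmed entry and then persists this shadow
 * ledger's own metadata before initializing cursors.
 */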
@Override
protected synchronized void initializeBookKeeper(ManagedLedgerInitializeLedgerCallback callback) {
if (log.isDebugEnabled()) {
log.debug("[{}] initializing bookkeeper for shadowManagedLedger; ledgers {}", name, ledgers);
}
// Calculate total entries and size
Iterator<LedgerInfo> iterator = ledgers.values().iterator();
while (iterator.hasNext()) {
LedgerInfo li = iterator.next();
if (li.getEntries() > 0) {
NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, li.getEntries());
TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize());
} else if (li.getLedgerId() != currentLedger.getId()) {
// Keep the current (last) ledger even if it is empty.
iterator.remove();
}
}
initLastConfirmedEntry();
// Save it back to ensure all nodes exist and properties are persisted.
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStore.MetaStoreCallback<>() {
@Override
public void operationComplete(Void result, Stat stat) {
ledgersStat = stat;
initializeCursors(callback);
}
@Override
public void operationFailed(ManagedLedgerException.MetaStoreException e) {
handleBadVersion(e);
callback.initializeFailed(new ManagedLedgerException(e));
}
});
}
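/**
 * Derives {@code lastConfirmedEntry} from the current ledger's last-add-confirmed position, walking back
 * through empty ledgers until a ledger with entries is found.
 */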
private void initLastConfirmedEntry() {
if (currentLedger == null) {
return;
}
lastConfirmedEntry = new PositionImpl(currentLedger.getId(), currentLedger.getLastAddConfirmed());
// Skip empty ledgers and find the last ledger that contains a message, if possible.
while (lastConfirmedEntry.getEntryId() == -1) {
Map.Entry<Long, LedgerInfo> formerLedger = ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
if (formerLedger != null) {
LedgerInfo ledgerInfo = formerLedger.getValue();
lastConfirmedEntry = PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
} else {
break;
}
}
}
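/**
 * Shadow variant of add-entry: the target position in the source ledgers is carried in the operation
 * context. Entries whose ledger is already opened here are shadow-written immediately; entries for a
 * newer source ledger stay pending until {@link #updateLedgersIdsComplete} is triggered by a source
 * metadata update.
 */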
@Override
protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
if (!beforeAddEntry(addOperation)) {
return;
}
if (state != State.LedgerOpened) {
addOperation.failed(new ManagedLedgerException("Managed ledger is not opened"));
return;
}
if (addOperation.getCtx() == null || !(addOperation.getCtx() instanceof Position position)) {
addOperation.failed(new ManagedLedgerException("Illegal addOperation context object."));
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Add entry into shadow ledger lh={} entries={}, pos=({},{})",
name, currentLedger.getId(), currentLedgerEntries, position.getLedgerId(), position.getEntryId());
}
pendingAddEntries.add(addOperation);
if (position.getLedgerId() <= currentLedger.getId()) {
// Position falls within already-opened source ledgers; initiate the shadow write now.
if (position.getLedgerId() == currentLedger.getId()) {
addOperation.setLedger(currentLedger);
}
currentLedgerEntries = position.getEntryId();
currentLedgerSize += addOperation.data.readableBytes();
addOperation.initiateShadowWrite();
} // Operations with a ledgerId beyond currentLedger are processed later, in `updateLedgersIdsComplete`.
lastAddEntryTimeMs = System.currentTimeMillis();
}
/**
 * Terminate is not allowed on a shadow topic.
 * @param callback
 * @param ctx
 */
@Override
public synchronized void asyncTerminate(AsyncCallbacks.TerminateCallback callback, Object ctx) {
callback.terminateFailed(new ManagedLedgerException("Terminate is not allowed on shadow topic."), ctx);
}
/**
 * Handle source ManagedLedgerInfo updates.
 * Update types:
 * 1. New ledgers added.
 * 2. Old ledgers deleted.
 * 3. Offload info of old ledgers updated (including ledgers deleted from bookies by the offloader).
 */
private synchronized void processSourceManagedLedgerInfo(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] new SourceManagedLedgerInfo:{}, prevStat={},stat={}", name, sourceMLName, mlInfo,
sourceLedgersStat, stat);
}
if (sourceLedgersStat != null && sourceLedgersStat.getVersion() >= stat.getVersion()) {
log.warn("Newer version of mlInfo is already processed. Previous stat={}, current stat={}",
sourceLedgersStat, stat);
return;
}
sourceLedgersStat = stat;
if (mlInfo.hasTerminatedPosition()) {
lastConfirmedEntry = new PositionImpl(mlInfo.getTerminatedPosition());
log.info("[{}][{}] Process managed ledger terminated at {}", name, sourceMLName, lastConfirmedEntry);
}
TreeMap<Long, LedgerInfo> newLedgerInfos = new TreeMap<>();
for (LedgerInfo ls : mlInfo.getLedgerInfoList()) {
newLedgerInfos.put(ls.getLedgerId(), ls);
}
for (Map.Entry<Long, LedgerInfo> ledgerInfoEntry : newLedgerInfos.entrySet()) {
Long ledgerId = ledgerInfoEntry.getKey();
LedgerInfo ledgerInfo = ledgerInfoEntry.getValue();
if (ledgerInfo.getEntries() > 0) {
LedgerInfo oldLedgerInfo = ledgers.put(ledgerId, ledgerInfo);
if (oldLedgerInfo == null) {
log.info("[{}]Read new ledger info from source,ledgerId={}", name, ledgerId);
} else {
if (!oldLedgerInfo.equals(ledgerInfo)) {
log.info("[{}] Old ledger info updated in source,ledgerId={}", name, ledgerId);
// ledger deleted from bookkeeper by offloader.
if (ledgerInfo.hasOffloadContext()
&& ledgerInfo.getOffloadContext().getBookkeeperDeleted()
&& (!oldLedgerInfo.hasOffloadContext() || !oldLedgerInfo.getOffloadContext()
.getBookkeeperDeleted())) {
log.info("[{}] Old ledger removed from bookkeeper by offloader in source,ledgerId={}", name,
ledgerId);
invalidateReadHandle(ledgerId);
}
}
}
}
}
Long lastLedgerId = newLedgerInfos.lastKey();
// open the last ledger.
if (lastLedgerId != null && !(currentLedger != null && currentLedger.getId() == lastLedgerId)) {
ledgers.put(lastLedgerId, newLedgerInfos.get(lastLedgerId));
mbean.startDataLedgerOpenOp();
// Open the ledger in read-only mode (no recovery).
bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType, config.getPassword(),
(rc, lh, ctx1) -> executor.execute(() -> {
mbean.endDataLedgerOpenOp();
if (log.isDebugEnabled()) {
log.debug("[{}] Opened new source ledger {}", name, lastLedgerId);
}
if (rc == BKException.Code.OK) {
LedgerInfo info = LedgerInfo.newBuilder()
.setLedgerId(lastLedgerId)
.setEntries(lh.getLastAddConfirmed() + 1)
.setSize(lh.getLength())
.setTimestamp(clock.millis()).build();
ledgers.put(lastLedgerId, info);
currentLedger = lh;
currentLedgerEntries = 0;
currentLedgerSize = 0;
initLastConfirmedEntry();
updateLedgersIdsComplete(null);
maybeUpdateCursorBeforeTrimmingConsumedLedger();
} else if (isNoSuchLedgerExistsException(rc)) {
log.warn("[{}] Source ledger not found: {}", name, lastLedgerId);
ledgers.remove(lastLedgerId);
} else {
log.error("[{}] Failed to open source ledger {}: {}", name, lastLedgerId,
BKException.getMessage(rc));
}
}), null);
}
// Handle ledgers deleted from the source.
List<LedgerInfo> ledgersToDelete = new ArrayList<>(ledgers.headMap(newLedgerInfos.firstKey(), false).values());
if (!ledgersToDelete.isEmpty()) {
log.info("[{}]ledgers deleted in source, size={}", name, ledgersToDelete.size());
try {
advanceCursorsIfNecessary(ledgersToDelete);
} catch (ManagedLedgerException.LedgerNotExistException e) {
log.info("[{}] First non deleted Ledger is not found, advanceCursors fails", name);
}
doDeleteLedgers(ledgersToDelete);
}
}
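/**
 * Stops watching the source managed ledger info before closing.
 */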
@Override
public synchronized void asyncClose(AsyncCallbacks.CloseCallback callback, Object ctx) {
store.unwatchManagedLedgerInfo(sourceMLName);
super.asyncClose(callback, ctx);
}
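/**
 * Invoked after a new source ledger has been opened; resumes the pending shadow writes whose positions
 * now fall within the opened ledgers.
 */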
@Override
protected synchronized void updateLedgersIdsComplete(LedgerHandle originalCurrentLedger) {
STATE_UPDATER.set(this, State.LedgerOpened);
updateLastLedgerCreatedTimeAndScheduleRolloverTask();
if (log.isDebugEnabled()) {
log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size());
}
createNewOpAddEntryForNewLedger();
// Process all the pending addEntry requests
for (OpAddEntry op : pendingAddEntries) {
Position position = (Position) op.getCtx();
if (position.getLedgerId() <= currentLedger.getId()) {
if (position.getLedgerId() == currentLedger.getId()) {
op.setLedger(currentLedger);
} else {
op.setLedger(null);
}
currentLedgerEntries = position.getEntryId();
currentLedgerSize += op.data.readableBytes();
op.initiateShadowWrite();
} else {
break;
}
}
}
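/**
 * A shadow managed ledger never rolls over ledgers on its own, so only the creation timestamp is
 * recorded and no rollover task is scheduled.
 */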
@Override
protected void updateLastLedgerCreatedTimeAndScheduleRolloverTask() {
this.lastLedgerCreatedTimestamp = clock.millis();
}
}