/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.bookkeeper.mledger.impl;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.pulsar.metadata.api.Stat;
public class ReadOnlyManagedLedgerImpl extends ManagedLedgerImpl {
/**
 * Creates a read-only managed ledger.
 *
 * <p>All arguments are forwarded unchanged to the {@link ManagedLedgerImpl} constructor; this
 * constructor performs no additional work of its own.
 *
 * @param factory           forwarded to the superclass constructor
 * @param bookKeeper        forwarded to the superclass constructor
 * @param store             forwarded to the superclass constructor
 * @param config            forwarded to the superclass constructor
 * @param scheduledExecutor forwarded to the superclass constructor
 * @param name              forwarded to the superclass constructor
 */
public ReadOnlyManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
        ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
        String name) {
    super(factory, bookKeeper, store, config, scheduledExecutor, name);
}
/**
 * Asynchronously loads this managed ledger's metadata for read-only use.
 *
 * <p>Asks the metadata store for the {@link ManagedLedgerInfo} registered under {@code name}, sets
 * {@code state} to {@code State.LedgerOpened} and copies every returned {@link LedgerInfo} into
 * {@code ledgers}. When the last ledger in the map reports zero entries, a replacement
 * {@link LedgerInfo} is built from the ledger's last-add-confirmed value and length and put back
 * into {@code ledgers}.
 *
 * <p>Failure paths visible here complete the returned future exceptionally: a
 * {@link MetadataNotFoundException} reported by the store is rewrapped as
 * {@link ManagedLedgerNotFoundException}; every other failure is wrapped in a
 * {@link ManagedLedgerException}.
 *
 * <p>NOTE(review): this listing looks truncated. Closing braces are absent, the builder chains are
 * never terminated, the receiver of the first {@code .thenAccept(...)} is missing, and no statement
 * completes {@code future} on a success path. Confirm against the upstream file before editing the
 * logic.
 *
 * @return the future created at the top of this method
 */
CompletableFuture<Void> initialize() {
CompletableFuture<Void> future = new CompletableFuture<>();
// Fetch the list of existing ledgers in the managed ledger
store.getManagedLedgerInfo(name, false, new MetaStoreCallback<ManagedLedgerInfo>() {
// Invoked by the store with the loaded metadata.
public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
state = State.LedgerOpened;
// Index every ledger descriptor by its ledger id.
for (LedgerInfo ls : mlInfo.getLedgerInfoList()) {
ledgers.put(ls.getLedgerId(), ls);
// Last ledger stat may be zeroed, we must update it
if (ledgers.size() > 0 && ledgers.lastEntry().getValue().getEntries() == 0) {
long lastLedgerId = ledgers.lastKey();
// Fetch last add confirmed for last ledger
// NOTE(review): the expression this .thenAccept(...) is chained onto is not present in this
// listing; presumably an asynchronous open of lastLedgerId that yields readHandle — TODO confirm.
.thenAccept(readHandle -> {
readHandle.readLastAddConfirmedAsync().thenAccept(lastAddConfirmed -> {
// Rebuild the descriptor: entry count is lastAddConfirmed + 1, size is the handle's length.
LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lastLedgerId)
.setEntries(lastAddConfirmed + 1).setSize(readHandle.getLength())
ledgers.put(lastLedgerId, info);
}).exceptionally(ex -> {
// Failure of the last-add-confirmed read: an IllegalArgumentException cause is treated as
// "ledger is empty"; anything else fails the returned future.
if (ex instanceof CompletionException
&& ex.getCause() instanceof IllegalArgumentException) {
// The last ledger was empty, so we cannot read the last add confirmed.
LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lastLedgerId)
ledgers.put(lastLedgerId, info);
} else {
future.completeExceptionally(new ManagedLedgerException(ex));
return null;
}).exceptionally(ex -> {
// Second handler: an ArrayIndexOutOfBoundsException cause is likewise treated as
// "ledger is empty" and recorded with zero entries; anything else fails the returned future.
// NOTE(review): appears to guard the outer chain rather than the inner read — TODO confirm.
if (ex instanceof CompletionException
&& ex.getCause() instanceof ArrayIndexOutOfBoundsException) {
// The last ledger was empty, so we cannot read the last add confirmed.
LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lastLedgerId).setEntries(0)
ledgers.put(lastLedgerId, info);
} else {
future.completeExceptionally(new ManagedLedgerException(ex));
return null;
} else {
// The read-only managed ledger is ready to use
// Invoked by the store when the metadata lookup fails.
public void operationFailed(MetaStoreException e) {
// Missing metadata is surfaced as "managed ledger not found"; other errors are wrapped generically.
if (e instanceof MetadataNotFoundException) {
future.completeExceptionally(new ManagedLedgerNotFoundException(e));
} else {
future.completeExceptionally(new ManagedLedgerException(e));
return future;
ReadOnlyCursor createReadOnlyCursor(PositionImpl startPosition) {
if (ledgers.isEmpty()) {
lastConfirmedEntry = PositionImpl.EARLIEST;
} else if (ledgers.lastEntry().getValue().getEntries() > 0) {
// Last ledger has some of the entries
lastConfirmedEntry = new PositionImpl(ledgers.lastKey(), ledgers.lastEntry().getValue().getEntries() - 1);
} else {
// Last ledger is empty. If there is a previous ledger, position on the last entry of that ledger
if (ledgers.size() > 1) {
long lastLedgerId = ledgers.lastKey();
LedgerInfo li = ledgers.headMap(lastLedgerId, false).lastEntry().getValue();
lastConfirmedEntry = new PositionImpl(li.getLedgerId(), li.getEntries() - 1);
} else {
lastConfirmedEntry = PositionImpl.EARLIEST;
return new ReadOnlyCursorImpl(bookKeeper, config, this, startPosition, "read-only-cursor");
/**
 * Reads the entry at {@code position} and reports the outcome through {@code callback}.
 *
 * <p>On success of the preceding asynchronous step, the work is delegated to the four-argument
 * {@code asyncReadEntry(ledger, position, callback, ctx)} overload. On failure the error is logged
 * and {@code callback.readEntryFailed} is invoked with the failure's cause converted by
 * {@link ManagedLedgerException#getManagedLedgerException}.
 *
 * <p>NOTE(review): this listing looks truncated. The expression that {@code .thenAccept(...)} is
 * chained onto is missing (presumably a lookup of the ledger handle for {@code position} — TODO
 * confirm), the {@code log.error} call has an empty argument slot, and the closing braces are
 * absent.
 *
 * @param position position of the entry to read
 * @param callback receives the failure notification here and is passed on to the overload
 * @param ctx      opaque context passed back to {@code callback}
 */
public void asyncReadEntry(PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
.thenAccept((ledger) -> asyncReadEntry(ledger, position, callback, ctx))
.exceptionally((ex) -> {
// NOTE(review): the format string has four placeholders but only three arguments are visible;
// the empty slot after the format string is presumably the managed ledger name — TODO confirm.
log.error("[{}] Error opening ledger for reading at position {} - {}. Op: {}",,
position, ex.getMessage(), callback);
// The callback receives the cause of ex, not ex itself.
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
return null;
public long getNumberOfEntries() {
return getNumberOfEntries(Range.openClosed(PositionImpl.EARLIEST, getLastPosition()));
protected boolean isReadOnly() {
return true;