blob: 72930f7acee278b9c534c4722c6e37eb924c4da9 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import java.security.GeneralSecurityException;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.TimedGenericCallback;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Read only ledger handle. This ledger handle allows you to
 * read from a ledger but not to write to it. It overrides all
 * the public write operations from LedgerHandle so that they fail with
 * {@code IllegalOpException}.
 * It should be returned for BookKeeper#openLedger operations.
 *
 * <p>The handle can optionally register itself as a {@link LedgerMetadataListener}
 * so that metadata changes reported by the ledger manager are applied to the
 * locally held metadata. It also drives ledger recovery, see
 * {@link #recover(GenericCallback, ReadEntryListener, boolean)}.
 */
class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListener {
    private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyLedgerHandle.class);

    /** Guards {@link #newEnsemblesFromRecovery}. Final so the monitor can never be swapped out. */
    private final Object metadataLock = new Object();

    /**
     * Ensembles chosen while recovery is replacing failed bookies, keyed by the first entry id
     * they apply to. They are only written to the ledger metadata in {@link #closeRecovered()}.
     */
    private final NavigableMap<Long, List<BookieSocketAddress>> newEnsemblesFromRecovery = new TreeMap<>();

    /**
     * Task that applies a newer metadata version, received through
     * {@link #onChanged(long, Versioned)}, to this handle.
     */
    class MetadataUpdater extends SafeRunnable {

        final Versioned<LedgerMetadata> newMetadata;

        MetadataUpdater(Versioned<LedgerMetadata> metadata) {
            this.newMetadata = metadata;
        }

        /**
         * Keeps trying to swap the handle's metadata for {@link #newMetadata} for as long as the
         * handle's current version is older. Stops as soon as the swap succeeds or the current
         * version is no longer older than the new one.
         */
        @Override
        public void safeRun() {
            while (true) {
                Versioned<LedgerMetadata> currentMetadata = getVersionedLedgerMetadata();
                Version.Occurred occurred = currentMetadata.getVersion().compare(newMetadata.getVersion());
                if (Version.Occurred.BEFORE != occurred) {
                    // Local metadata is not older than the update, nothing to apply.
                    break;
                }
                synchronized (ReadOnlyLedgerHandle.this) {
                    // A false return means the swap was not applied; loop to re-read and re-compare.
                    if (setLedgerMetadata(currentMetadata, newMetadata)) {
                        LOG.info("Updated ledger metadata for ledger {} to {}, version {}.",
                                ledgerId, newMetadata.getValue().toSafeString(), newMetadata.getVersion());
                        break;
                    }
                }
            }
        }

        @Override
        public String toString() {
            return String.format("MetadataUpdater(%d)", ledgerId);
        }
    }

    /**
     * Creates a read only handle.
     *
     * @param clientCtx client context giving access to the ledger manager, worker pool, etc.
     * @param ledgerId id of the ledger
     * @param metadata versioned metadata the handle starts with
     * @param digestType digest type passed through to {@link LedgerHandle}
     * @param password ledger password passed through to {@link LedgerHandle}
     * @param watch when true, this handle is registered as a metadata listener for the ledger
     * @throws GeneralSecurityException propagated from the {@link LedgerHandle} constructor
     * @throws NumberFormatException propagated from the {@link LedgerHandle} constructor
     */
    ReadOnlyLedgerHandle(ClientContext clientCtx,
                         long ledgerId, Versioned<LedgerMetadata> metadata,
                         BookKeeper.DigestType digestType, byte[] password,
                         boolean watch)
            throws GeneralSecurityException, NumberFormatException {
        super(clientCtx, ledgerId, metadata, digestType, password, WriteFlag.NONE);
        if (watch) {
            clientCtx.getLedgerManager().registerLedgerMetadataListener(ledgerId, this);
        }
    }

    /**
     * Closes the handle. The only action taken here is to unregister this handle
     * as a metadata listener.
     */
    @Override
    public void close()
            throws InterruptedException, BKException {
        clientCtx.getLedgerManager().unregisterLedgerMetadataListener(ledgerId, this);
    }

    /**
     * Asynchronous variant of {@link #close()}: unregisters the metadata listener and
     * then immediately completes the callback with {@code OK}.
     *
     * @param cb callback notified of completion
     * @param ctx opaque context handed back to the callback
     */
    @Override
    public void asyncClose(CloseCallback cb, Object ctx) {
        clientCtx.getLedgerManager().unregisterLedgerMetadataListener(ledgerId, this);
        cb.closeComplete(BKException.Code.OK, this, ctx);
    }

    /**
     * Always fails, see {@link #addEntry(byte[], int, int)}.
     */
    @Override
    public long addEntry(byte[] data) throws InterruptedException, BKException {
        return addEntry(data, 0, data.length);
    }

    /**
     * Writes are not allowed on a read only handle.
     *
     * @throws BKException always, created from {@code IllegalOpException}
     */
    @Override
    public long addEntry(byte[] data, int offset, int length)
            throws InterruptedException, BKException {
        LOG.error("Tried to add entry on a Read-Only ledger handle, ledgerid={}", ledgerId);
        throw BKException.create(BKException.Code.IllegalOpException);
    }

    /**
     * Always fails, see {@link #asyncAddEntry(byte[], int, int, AddCallback, Object)}.
     */
    @Override
    public void asyncAddEntry(final byte[] data, final AddCallback cb,
                              final Object ctx) {
        asyncAddEntry(data, 0, data.length, cb, ctx);
    }

    /**
     * Writes are not allowed on a read only handle; the callback is completed
     * with {@code IllegalOpException} and {@link LedgerHandle#INVALID_ENTRY_ID}.
     */
    @Override
    public void asyncAddEntry(final byte[] data, final int offset, final int length,
                              final AddCallback cb, final Object ctx) {
        LOG.error("Tried to add entry on a Read-Only ledger handle, ledgerid={}", ledgerId);
        cb.addComplete(BKException.Code.IllegalOpException, this,
                LedgerHandle.INVALID_ENTRY_ID, ctx);
    }

    /**
     * Metadata listener callback. Updates for other ledgers and null updates are ignored.
     * When the received version is newer than the one currently held, a
     * {@link MetadataUpdater} is submitted to the main worker pool, ordered by ledger id.
     *
     * @param lid id of the ledger whose metadata changed
     * @param newMetadata the new versioned metadata, may be null
     */
    @Override
    public void onChanged(long lid, Versioned<LedgerMetadata> newMetadata) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received ledger metadata update on {} : {}", lid, newMetadata);
        }
        if (this.ledgerId != lid) {
            return;
        }
        if (null == newMetadata) {
            return;
        }
        Versioned<LedgerMetadata> currentMetadata = getVersionedLedgerMetadata();
        Version.Occurred occurred = currentMetadata.getVersion().compare(newMetadata.getVersion());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Try to update metadata from {} to {} : {}",
                    currentMetadata, newMetadata, occurred);
        }
        if (Version.Occurred.BEFORE == occurred) { // the metadata is updated
            try {
                clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new MetadataUpdater(newMetadata));
            } catch (RejectedExecutionException ree) {
                // The update is dropped; pass the exception as the trailing argument so the
                // cause of the rejection ends up in the log.
                LOG.error("Failed on submitting updater to update ledger metadata on ledger {} : {}",
                        ledgerId, newMetadata, ree);
            }
        }
    }

    @Override
    public String toString() {
        return String.format("ReadOnlyLedgerHandle(lid = %d, id = %d)", ledgerId, super.hashCode());
    }

    @Override
    protected void initializeWriteHandleState() {
        // Essentially a noop, we don't want to set up write handle state here for a ReadOnlyLedgerHandle
        explicitLacFlushPolicy = ExplicitLacFlushPolicy.VOID_EXPLICITLAC_FLUSH_POLICY;
    }

    /**
     * Reads the last entry of the ledger. The last confirmed entry id is fetched first;
     * a negative value is reported as {@code NoSuchEntryException}, otherwise that single
     * entry is read. A failure to obtain the last confirmed id is passed on to the callback
     * with the same return code.
     *
     * @param cb callback receiving the read result
     * @param ctx opaque context handed back to the callback
     */
    @Override
    public void asyncReadLastEntry(ReadCallback cb, Object ctx) {
        asyncReadLastConfirmed(new ReadLastConfirmedCallback() {
            @Override
            public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
                if (rc == BKException.Code.OK) {
                    if (lastConfirmed < 0) {
                        // Ledger was empty, so there is no last entry to read
                        cb.readComplete(BKException.Code.NoSuchEntryException, ReadOnlyLedgerHandle.this, null, ctx);
                    } else {
                        asyncReadEntriesInternal(lastConfirmed, lastConfirmed, cb, ctx, false);
                    }
                } else {
                    // Argument order must follow the placeholders: ledger id first, then LAC.
                    LOG.error("ReadException in asyncReadLastEntry, ledgerId: {}, lac: {}, rc:{}",
                            ledgerId, lastConfirmed, rc);
                    cb.readComplete(rc, ReadOnlyLedgerHandle.this, null, ctx);
                }
            }
        }, ctx);
    }

    /**
     * For a read only ledger handle, this method will only ever be called during recovery,
     * when we are reading forward from LAC and writing back those entries. As such,
     * unlike with LedgerHandle, we do not want to persist changes to the metadata as they occur,
     * but rather, we want to defer the persistence until recovery has completed, and do it all
     * on the close.
     *
     * <p>The replacement ensemble is remembered in memory, keyed by {@code LAC + 1}, and the
     * pending writes are re-sent to the replaced bookies. If no replacement bookie can be
     * found the pending adds are failed.
     *
     * @param failedBookies failed bookies, keyed by their index in the ensemble
     */
    @Override
    void handleBookieFailure(final Map<Integer, BookieSocketAddress> failedBookies) {
        // handleBookieFailure should always run in the ordered executor thread for this
        // ledger, so this synchronized should be unnecessary, but putting it here now
        // just in case (can be removed when we validate threads)
        synchronized (metadataLock) {
            String logContext = String.format("[RecoveryEnsembleChange(ledger:%d)]", ledgerId);

            long lac = getLastAddConfirmed();
            LedgerMetadata metadata = getLedgerMetadata();
            List<BookieSocketAddress> currentEnsemble = getCurrentEnsemble();
            try {
                List<BookieSocketAddress> newEnsemble = EnsembleUtils.replaceBookiesInEnsemble(
                        clientCtx.getBookieWatcher(), metadata, currentEnsemble, failedBookies, logContext);
                Set<Integer> replaced = EnsembleUtils.diffEnsemble(currentEnsemble, newEnsemble);
                if (!replaced.isEmpty()) {
                    newEnsemblesFromRecovery.put(lac + 1, newEnsemble);
                    unsetSuccessAndSendWriteRequest(newEnsemble, replaced);
                }
            } catch (BKException.BKNotEnoughBookiesException e) {
                LOG.error("Could not get additional bookie to remake ensemble, closing ledger: {}", ledgerId);

                handleUnrecoverableErrorDuringAdd(e.getCode());
            }
        }
    }

    /**
     * Fails all pending adds with the given return code.
     *
     * @param rc return code handed to the pending adds
     */
    @Override
    void handleUnrecoverableErrorDuringAdd(int rc) {
        errorOutPendingAdds(rc);
    }

    /**
     * Recover the ledger without an entry listener and without forcing recovery.
     *
     * @param finalCb callback after recovery is done.
     */
    void recover(GenericCallback<Void> finalCb) {
        recover(finalCb, null, false);
    }

    /**
     * Recover the ledger.
     *
     * <p>The metadata is first moved to the in-recovery state where required. If the resulting
     * metadata is already closed, the recovery reads are skipped; otherwise a
     * {@link LedgerRecoveryOp} is run. In both cases {@link #closeRecovered()} is invoked
     * afterwards and the callback is completed with {@code OK}, or with the code derived from
     * the failure.
     *
     * @param finalCb
     *          callback after recovery is done.
     * @param listener
     *          read entry listener on recovery reads.
     * @param forceRecovery
     *          force the recovery procedure even the ledger metadata shows the ledger is closed.
     */
    void recover(GenericCallback<Void> finalCb,
                 final @VisibleForTesting ReadEntryListener listener,
                 final boolean forceRecovery) {
        final GenericCallback<Void> cb = new TimedGenericCallback<Void>(
            finalCb,
            BKException.Code.OK,
            clientCtx.getClientStats().getRecoverOpLogger());

        MetadataUpdateLoop.NeedsUpdatePredicate needsUpdate =
            (metadata) -> metadata.getState() == LedgerMetadata.State.OPEN;
        if (forceRecovery) {
            // in the force recovery case, we want to update the metadata
            // to IN_RECOVERY, even if the ledger is already closed
            needsUpdate = (metadata) -> metadata.getState() != LedgerMetadata.State.IN_RECOVERY;
        }
        new MetadataUpdateLoop(
                clientCtx.getLedgerManager(), getId(),
                this::getVersionedLedgerMetadata,
                needsUpdate,
                (metadata) -> LedgerMetadataBuilder.from(metadata).withInRecoveryState().build(),
                this::setLedgerMetadata)
            .run()
            .thenCompose((metadata) -> {
                    if (metadata.getValue().isClosed()) {
                        return CompletableFuture.completedFuture(ReadOnlyLedgerHandle.this);
                    } else {
                        return new LedgerRecoveryOp(ReadOnlyLedgerHandle.this, clientCtx)
                            .setEntryListener(listener)
                            .initiate();
                    }
            })
            .thenCompose((ignore) -> closeRecovered())
            .whenComplete((ignore, ex) -> {
                    if (ex != null) {
                        cb.operationComplete(
                                BKException.getExceptionCode(ex, BKException.Code.UnexpectedConditionException), null);
                    } else {
                        cb.operationComplete(BKException.Code.OK, null);
                    }
            });
    }

    /**
     * Closes the ledger in the metadata store after recovery. While the metadata is in the
     * in-recovery state, the ensembles collected in {@link #newEnsemblesFromRecovery} are
     * merged in and the ledger is marked closed with the handle's last add confirmed and
     * length. The collected ensembles are cleared once the update has completed successfully.
     *
     * @return future of the metadata update
     */
    CompletableFuture<Versioned<LedgerMetadata>> closeRecovered() {
        long lac, len;
        // Snapshot both values under the handle lock so they are consistent with each other.
        synchronized (this) {
            lac = lastAddConfirmed;
            len = length;
        }
        LOG.info("Closing recovered ledger {} at entry {}", getId(), lac);
        CompletableFuture<Versioned<LedgerMetadata>> f = new MetadataUpdateLoop(
                clientCtx.getLedgerManager(), getId(),
                this::getVersionedLedgerMetadata,
                (metadata) -> metadata.getState() == LedgerMetadata.State.IN_RECOVERY,
                (metadata) -> {
                    LedgerMetadataBuilder builder = LedgerMetadataBuilder.from(metadata);
                    Long lastEnsembleKey = LedgerMetadataUtils.getLastEnsembleKey(metadata);

                    synchronized (metadataLock) {
                        newEnsemblesFromRecovery.entrySet().forEach(
                                (e) -> {
                                    checkState(e.getKey() >= lastEnsembleKey,
                                               "Once a ledger is in recovery, noone can add ensembles without closing");
                                    // Occurs when a bookie need to be replaced at very start of recovery
                                    if (lastEnsembleKey.equals(e.getKey())) {
                                        builder.replaceEnsembleEntry(e.getKey(), e.getValue());
                                    } else {
                                        builder.newEnsembleEntry(e.getKey(), e.getValue());
                                    }
                                });
                    }
                    return builder.withClosedState().withLastEntryId(lac).withLength(len).build();
                },
                this::setLedgerMetadata).run();
        // thenRun only fires on successful completion, so the ensembles are kept if the update fails.
        f.thenRun(() -> {
                synchronized (metadataLock) {
                    newEnsemblesFromRecovery.clear();
                }
            });
        return f;
    }

    /**
     * Returns the most recent ensemble picked during recovery if there is one,
     * otherwise the current ensemble as reported by {@link LedgerHandle}.
     */
    @Override
    List<BookieSocketAddress> getCurrentEnsemble() {
        synchronized (metadataLock) {
            if (!newEnsemblesFromRecovery.isEmpty()) {
                return newEnsemblesFromRecovery.lastEntry().getValue();
            } else {
                return super.getCurrentEnsemble();
            }
        }
    }
}