// blob: 5c8cb14e65295828de69be7f49f8e36b168acc7b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Supplier;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Mechanism to safely update the metadata of a ledger.
*
* <p>The loop takes the following steps:
* 1. Check if the metadata needs to be changed.
* 2. Make a copy of the metadata and modify it.
* 3. Write the modified copy to the metadata store.
* 3.1 If the write succeeds, go to 6.
* 3.2 If the write fails because of a failed compare and swap, go to 4.
* 4. Read the metadata back from the store.
* 5. Update the local copy of the metadata with the metadata read in 4, go to 1.
* 6. Update the local copy of the metadata with the metadata which has just been written.
*
* <p>All mutating operations are compare-and-swap operations. If the compare fails, another
* iteration of the loop begins.
*/
class MetadataUpdateLoop {
    static final Logger LOG = LoggerFactory.getLogger(MetadataUpdateLoop.class);

    /** Ledger manager through which metadata is read and written. */
    private final LedgerManager lm;
    /** Id of the ledger whose metadata this loop operates on. */
    private final long ledgerId;
    /** Supplies the caller's current local copy of the metadata. */
    private final Supplier<Versioned<LedgerMetadata>> currentLocalValue;
    /** Decides whether the metadata still needs to be transformed. */
    private final NeedsUpdatePredicate needsTransformation;
    /** Produces the modified metadata from the current one. */
    private final MetadataTransform transform;
    /** Compare-and-swap style updater for the caller's local copy. */
    private final LocalValueUpdater updateLocalValue;
    /** Prefix used in every log line emitted by this loop instance. */
    private final String logContext;

    /** Number of write loop iterations started so far; only reported in debug logging. */
    private volatile int writeLoopCount = 0;
    private static final AtomicIntegerFieldUpdater<MetadataUpdateLoop> WRITE_LOOP_COUNT_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(MetadataUpdateLoop.class, "writeLoopCount");

    /**
     * Decides whether a given metadata still needs to be updated.
     */
    @FunctionalInterface
    interface NeedsUpdatePredicate {
        boolean needsUpdate(LedgerMetadata metadata) throws Exception;
    }

    /**
     * Produces a transformed metadata from the given one.
     */
    @FunctionalInterface
    interface MetadataTransform {
        LedgerMetadata transform(LedgerMetadata metadata) throws Exception;
    }

    /**
     * Replaces the local value with {@code newValue} if it currently matches {@code oldValue}.
     */
    @FunctionalInterface
    interface LocalValueUpdater {
        boolean updateValue(Versioned<LedgerMetadata> oldValue, Versioned<LedgerMetadata> newValue);
    }

    /**
     * Construct the loop. This takes a set of functions which may be called multiple times
     * during the loop.
     *
     * @param lm the ledger manager used for reading and writing metadata
     * @param ledgerId the id of the ledger we will be operating on
     * @param currentLocalValue should return the current local value of the metadata
     * @param needsTransformation should return true, if the metadata needs to be modified.
     *                            should throw an exception, if this update doesn't make sense.
     * @param transform takes a metadata objects, transforms, and returns it, without modifying
     *                  the original
     * @param updateLocalValue if the local value matches the first parameter, update it to the
     *                         second parameter and return true, return false otherwise
     */
    MetadataUpdateLoop(LedgerManager lm,
                       long ledgerId,
                       Supplier<Versioned<LedgerMetadata>> currentLocalValue,
                       NeedsUpdatePredicate needsTransformation,
                       MetadataTransform transform,
                       LocalValueUpdater updateLocalValue) {
        this.lm = lm;
        this.ledgerId = ledgerId;
        this.currentLocalValue = currentLocalValue;
        this.needsTransformation = needsTransformation;
        this.transform = transform;
        this.updateLocalValue = updateLocalValue;
        this.logContext = String.format("UpdateLoop(ledgerId=%d,loopId=%08x)",
                                        ledgerId, System.identityHashCode(this));
    }

    /**
     * Start the update loop from the current local value.
     *
     * @return a future completed with the metadata once the update has been written (or was
     *         found to be unnecessary), or completed exceptionally if the update failed
     */
    CompletableFuture<Versioned<LedgerMetadata>> run() {
        CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
        writeLoop(currentLocalValue.get(), promise);
        return promise;
    }

    /**
     * Run one iteration of the write loop: check whether an update is needed, transform the
     * metadata and submit the conditional write.
     *
     * @param currentLocal the versioned metadata this iteration starts from
     * @param promise the future to complete when the loop terminates
     */
    private void writeLoop(Versioned<LedgerMetadata> currentLocal,
                           CompletableFuture<Versioned<LedgerMetadata>> promise) {
        LOG.debug("{} starting write loop iteration, attempt {}",
                  logContext, WRITE_LOOP_COUNT_UPDATER.incrementAndGet(this));
        try {
            if (!needsTransformation.needsUpdate(currentLocal.getValue())) {
                LOG.debug("{} Update not needed, completing", logContext);
                promise.complete(currentLocal);
                return;
            }

            LedgerMetadata transformed = transform.transform(currentLocal.getValue());
            lm.writeLedgerMetadata(ledgerId, transformed, currentLocal.getVersion())
                .whenComplete((writtenMetadata, ex) ->
                              onWriteComplete(currentLocal, writtenMetadata, ex, promise));
        } catch (Exception e) {
            LOG.error("{} Exception updating", logContext, e);
            promise.completeExceptionally(e);
        }
    }

    /**
     * Handle the outcome of a metadata write submitted by {@link #writeLoop}.
     *
     * <p>The stage returned by {@code whenComplete} is discarded by the caller, so anything
     * thrown from this handler would otherwise be lost and {@code promise} would never
     * complete. The whole body is therefore guarded and failures are routed to the promise.
     *
     * @param currentLocal the versioned metadata the write was based on
     * @param writtenMetadata the metadata returned by the store, if the write succeeded
     * @param ex the failure reported by the store, or null on success
     * @param promise the future to complete when the loop terminates
     */
    private void onWriteComplete(Versioned<LedgerMetadata> currentLocal,
                                 Versioned<LedgerMetadata> writtenMetadata,
                                 Throwable ex,
                                 CompletableFuture<Versioned<LedgerMetadata>> promise) {
        try {
            if (ex == null) {
                if (updateLocalValue.updateValue(currentLocal, writtenMetadata)) {
                    LOG.debug("{} success", logContext);
                    promise.complete(writtenMetadata);
                } else {
                    LOG.debug("{} local value changed while we were writing, try again", logContext);
                    writeLoop(currentLocalValue.get(), promise);
                }
            } else if (ex instanceof BKException.BKMetadataVersionException) {
                LOG.info("{} conflict writing metadata to store, update local value and try again",
                         logContext);
                updateLocalValueFromStore(ledgerId).whenComplete((readMetadata, readEx) -> {
                    if (readEx == null) {
                        // writeLoop reports its own failures through the promise
                        writeLoop(readMetadata, promise);
                    } else {
                        promise.completeExceptionally(readEx);
                    }
                });
            } else {
                LOG.error("{} Error writing metadata to store", logContext, ex);
                promise.completeExceptionally(ex);
            }
        } catch (Exception e) {
            // e.g. a user supplied callback threw; fail the loop instead of hanging the caller
            LOG.error("{} Exception updating", logContext, e);
            promise.completeExceptionally(e);
        }
    }

    /**
     * Read the metadata from the store and bring the local value up to date with it.
     *
     * @param ledgerId the id of the ledger to read
     * @return a future completed with the local value after the refresh
     */
    private CompletableFuture<Versioned<LedgerMetadata>> updateLocalValueFromStore(long ledgerId) {
        CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
        readLoop(ledgerId, promise);
        return promise;
    }

    /**
     * Run one iteration of the read loop: snapshot the local value, read from the store and
     * try to swap the local value for what was read, retrying if the local value moved.
     *
     * @param ledgerId the id of the ledger to read
     * @param promise the future to complete with the refreshed local value
     */
    private void readLoop(long ledgerId, CompletableFuture<Versioned<LedgerMetadata>> promise) {
        Versioned<LedgerMetadata> current = currentLocalValue.get();

        lm.readLedgerMetadata(ledgerId).whenComplete((read, exception) -> {
            // Guarded for the same reason as onWriteComplete: the stage returned by
            // whenComplete is dropped, so a throw here would leave the promise pending forever.
            try {
                if (exception != null) {
                    LOG.error("{} Failed to read metadata from store",
                              logContext, exception);
                    promise.completeExceptionally(exception);
                } else if (current.getVersion().compare(read.getVersion()) == Version.Occurred.CONCURRENTLY) {
                    // no update needed, these are the same in the immutable world
                    promise.complete(current);
                } else if (updateLocalValue.updateValue(current, read)) {
                    // updated local value successfully
                    promise.complete(read);
                } else {
                    // local value changed while we were reading,
                    // look at new value, and try to read again
                    readLoop(ledgerId, promise);
                }
            } catch (Exception e) {
                LOG.error("{} Exception handling metadata read from store", logContext, e);
                promise.completeExceptionally(e);
            }
        });
    }
}