Add fencing to recovery reads to avoid data loss issue
Descriptions of the changes in this PR:
### Motivation
Adding the fencing flag to recovery reads avoids a data loss scenario as described in [issue 2614](https://github.com/apache/bookkeeper/issues/2614)
### Changes
Added the fencing flag to recovery reads. Refactored some mocking unit test code to introduce fencing and allow two writers to share the same bookie state. Added a new unit test to verify the fix. You can recreate the initial problem by removing the fencing flag from recovery reads and running the new unit test.
Master Issue: #2614
Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Andrey Yegorov <andrey.yegorov@datastax.com>
This closes #2616 from Vanlightly/fix-fencing
(cherry picked from commit 017307bc67431a7616861ad09927c4e3327633d1)
Signed-off-by: Enrico Olivelli <eolivelli@apache.org>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index e6c011e..b48761e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -568,9 +568,14 @@
lh.throttler.acquire();
}
- int flags = isRecoveryRead ? BookieProtocol.FLAG_HIGH_PRIORITY : BookieProtocol.FLAG_NONE;
- clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
- this, new ReadContext(bookieIndex, to, entry), flags);
+ if (isRecoveryRead) {
+ int flags = BookieProtocol.FLAG_HIGH_PRIORITY | BookieProtocol.FLAG_DO_FENCING;
+ clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
+ this, new ReadContext(bookieIndex, to, entry), flags, lh.ledgerKey);
+ } else {
+ clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId,
+ this, new ReadContext(bookieIndex, to, entry), BookieProtocol.FLAG_NONE);
+ }
}
@Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
index af6d980..38e4879 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
@@ -21,16 +21,23 @@
package org.apache.bookkeeper.client;
import com.google.common.collect.Lists;
+
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallbackFuture;
+import org.apache.bookkeeper.proto.MockBookies;
import org.apache.bookkeeper.versioning.Versioned;
import org.junit.Assert;
import org.junit.Test;
@@ -86,7 +93,7 @@
Versioned<LedgerMetadata> md = setupLedger(clientCtx, 1, Lists.newArrayList(b1, b2, b3));
- clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+ clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 0L, -1L);
clientCtx.getMockBookieClient().setPreWriteHook(
(bookie, ledgerId, entryId) -> FutureUtils.exception(new BKException.BKWriteException()));
@@ -110,7 +117,7 @@
CompletableFuture<Void> writingBack = new CompletableFuture<>();
CompletableFuture<Void> blocker = new CompletableFuture<>();
- clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+ clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 0L, -1L);
// will block recovery at the writeback phase
clientCtx.getMockBookieClient().setPreWriteHook(
(bookie, ledgerId, entryId) -> {
@@ -149,7 +156,7 @@
CompletableFuture<Void> writingBack = new CompletableFuture<>();
CompletableFuture<Void> blocker = new CompletableFuture<>();
CompletableFuture<Void> failing = new CompletableFuture<>();
- clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+ clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 0L, -1L);
// will block recovery at the writeback phase
clientCtx.getMockBookieClient().setPreWriteHook(
(bookie, ledgerId, entryId) -> {
@@ -195,7 +202,7 @@
CompletableFuture<Void> writingBack = new CompletableFuture<>();
CompletableFuture<Void> blocker = new CompletableFuture<>();
CompletableFuture<Void> failing = new CompletableFuture<>();
- clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+ clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 0L, -1L);
clientCtx.getMockBookieClient().errorBookies(b2);
ReadOnlyLedgerHandle lh = new ReadOnlyLedgerHandle(
@@ -216,8 +223,8 @@
clientCtx.getMockRegistrationClient().addBookies(b4).get();
Versioned<LedgerMetadata> md = setupLedger(clientCtx, 1, Lists.newArrayList(b1, b2, b3));
- clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
- clientCtx.getMockBookieClient().seedEntries(b3, 1L, 1L, -1L);
+ clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 0L, -1L);
+ clientCtx.getMockBookieClient().getMockBookies().seedEntries(b3, 1L, 1L, -1L);
clientCtx.getMockBookieClient().setPreWriteHook(
(bookie, ledgerId, entryId) -> {
if (bookie.equals(b2) && entryId == 1L) {
@@ -248,7 +255,7 @@
clientCtx.getMockRegistrationClient().addBookies(b4, b5).get();
Versioned<LedgerMetadata> md = setupLedger(clientCtx, 1, Lists.newArrayList(b1, b2, b3));
- clientCtx.getMockBookieClient().seedEntries(b1, 1L, 0L, -1L);
+ clientCtx.getMockBookieClient().getMockBookies().seedEntries(b1, 1L, 0L, -1L);
clientCtx.getMockBookieClient().setPreWriteHook(
(bookie, ledgerId, entryId) -> {
if (bookie.equals(b1) || bookie.equals(b2)) {
@@ -271,4 +278,217 @@
Assert.assertTrue(lh.getLedgerMetadata().getAllEnsembles().get(0L).contains(b5));
Assert.assertEquals(lh.getLastAddConfirmed(), 0L);
}
+
+ /**
+ * This test verifies the fix for the data loss scenario found by the TLA+ specification, specifically
+ * the invariant violation that metadata and writer can diverge. The scenario is that the original writer
+ * can commit an entry e that will later be lost because a second writer can close the ledger at e-1.
+ * The cause is that fencing was originally only performed on LAC reads which is not enough to prevent
+ * the 1st writer from reaching Ack Quorum after the 2nd writer has closed the ledger. The fix has
+ * been to fence on recovery reads also.
+ */
+ @Test
+ public void testFirstWriterCannotCommitWriteAfter2ndWriterCloses() throws Exception {
+ /*
+ This test uses CompletableFutures to control the sequence of actions performed by
+ two writers. There are different sets of futures:
+ - block*: These futures block the various reads, writes and metadata updates until the
+ test thread is ready for them to be executed. Thus ensuring the right sequence
+ of events occur.
+ - reachedStepN: These futures block in the test thread to ensure that we only unblock
+ an action when the prior one has been executed and we are already blocked
+ on the next action in the sequence.
+ */
+
+ // Setup w1
+ CompletableFuture<Void> reachedStep1 = new CompletableFuture<>();
+ CompletableFuture<Void> reachedStep2 = new CompletableFuture<>();
+ CompletableFuture<Void> reachedStep3 = new CompletableFuture<>();
+ CompletableFuture<Void> reachedStep4 = new CompletableFuture<>();
+ CompletableFuture<Void> reachedStep5 = new CompletableFuture<>();
+ CompletableFuture<Void> reachedStep6 = new CompletableFuture<>();
+ CompletableFuture<Void> reachedStep7 = new CompletableFuture<>();
+ CompletableFuture<Void> reachedStep8 = new CompletableFuture<>();
+ CompletableFuture<Void> reachedStep9 = new CompletableFuture<>();
+
+ MockBookies mockBookies = new MockBookies();
+ MockClientContext clientCtx1 = MockClientContext.create(mockBookies);
+ Versioned<LedgerMetadata> md1 = setupLedger(clientCtx1, 1, Lists.newArrayList(b1, b2, b3));
+
+ CompletableFuture<Void> blockB1Write = new CompletableFuture<>();
+ CompletableFuture<Void> blockB2Write = new CompletableFuture<>();
+ CompletableFuture<Void> blockB3Write = new CompletableFuture<>();
+ clientCtx1.getMockBookieClient().setPreWriteHook(
+ (bookie, ledgerId, entryId) -> {
+ // ignore seed entries e0 and e1
+ if (entryId < 2) {
+ return FutureUtils.value(null);
+ }
+
+ if (!reachedStep1.isDone()) {
+ reachedStep1.complete(null);
+ }
+
+ if (bookie.equals(b1)) {
+ return blockB1Write;
+ } else if (bookie.equals(b2)) {
+ reachedStep9.complete(null);
+ return blockB2Write;
+ } else if (bookie.equals(b3)) {
+ reachedStep3.complete(null);
+ return blockB3Write;
+ } else {
+ return FutureUtils.value(null);
+ }
+ });
+
+ LedgerHandle w1 = new LedgerHandle(clientCtx1, 1, md1,
+ BookKeeper.DigestType.CRC32C,
+ ClientUtil.PASSWD, WriteFlag.NONE);
+ w1.addEntry("e0".getBytes(StandardCharsets.UTF_8));
+ w1.addEntry("e1".getBytes(StandardCharsets.UTF_8));
+
+ // Setup w2
+ MockClientContext clientCtx2 = MockClientContext.create(mockBookies);
+ Versioned<LedgerMetadata> md2 = setupLedger(clientCtx2, 1, Lists.newArrayList(b1, b2, b3));
+
+ CompletableFuture<Void> blockB1ReadLac = new CompletableFuture<>();
+ CompletableFuture<Void> blockB2ReadLac = new CompletableFuture<>();
+ CompletableFuture<Void> blockB3ReadLac = new CompletableFuture<>();
+
+ CompletableFuture<Void> blockB1ReadEntry0 = new CompletableFuture<>();
+ CompletableFuture<Void> blockB2ReadEntry0 = new CompletableFuture<>();
+ CompletableFuture<Void> blockB3ReadEntry0 = new CompletableFuture<>();
+
+ AtomicBoolean isB1LacRead = new AtomicBoolean(true);
+ AtomicBoolean isB2LacRead = new AtomicBoolean(true);
+ AtomicBoolean isB3LacRead = new AtomicBoolean(true);
+
+ clientCtx2.getMockBookieClient().setPreReadHook(
+ (bookie, ledgerId, entryId) -> {
+ if (bookie.equals(b1)) {
+ if (isB1LacRead.get()) {
+ isB1LacRead.set(false);
+ reachedStep2.complete(null);
+ return blockB1ReadLac;
+ } else {
+ reachedStep6.complete(null);
+ return blockB1ReadEntry0;
+ }
+ } else if (bookie.equals(b2)) {
+ if (isB2LacRead.get()) {
+ try {
+ isB2LacRead.set(false);
+ reachedStep4.complete(null);
+ blockB2ReadLac.get(); // block this read - it does not succeed
+ } catch (Throwable t){}
+ return FutureUtils.exception(new BKException.BKWriteException());
+ } else {
+ reachedStep7.complete(null);
+ return blockB2ReadEntry0;
+ }
+ } else if (bookie.equals(b3)) {
+ if (isB3LacRead.get()) {
+ isB3LacRead.set(false);
+ reachedStep5.complete(null);
+ return blockB3ReadLac;
+ } else {
+ return blockB3ReadEntry0;
+ }
+ } else {
+ return FutureUtils.value(null);
+ }
+ });
+
+ AtomicInteger w2MetaUpdates = new AtomicInteger(0);
+ CompletableFuture<Void> blockW2StartingRecovery = new CompletableFuture<>();
+ CompletableFuture<Void> blockW2ClosingLedger = new CompletableFuture<>();
+ clientCtx2.getMockLedgerManager().setPreWriteHook((ledgerId, metadata) -> {
+ if (w2MetaUpdates.get() == 0) {
+ w2MetaUpdates.incrementAndGet();
+ return blockW2StartingRecovery;
+ } else {
+ reachedStep8.complete(null);
+ return blockW2ClosingLedger;
+ }
+ });
+
+ ReadOnlyLedgerHandle w2 = new ReadOnlyLedgerHandle(
+ clientCtx2, 1L, md2, BookKeeper.DigestType.CRC32C, PASSWD, false);
+
+ // Start an async add entry, blocked for now.
+ CompletableFuture<Object> w1WriteFuture = new CompletableFuture<>();
+ AtomicInteger writeResult = new AtomicInteger(0);
+ w1.asyncAddEntry("e2".getBytes(), (int rc, LedgerHandle lh1, long entryId, Object ctx) -> {
+ if (rc == BKException.Code.OK) {
+ writeResult.set(1);
+ } else {
+ writeResult.set(2);
+ }
+ SyncCallbackUtils.finish(rc, null, w1WriteFuture);
+ }, null);
+
+ // Step 1. w2 starts recovery
+ stepBlock(reachedStep1);
+ GenericCallbackFuture<Void> recoveryPromise = new GenericCallbackFuture<>();
+ w2.recover(recoveryPromise, null, false);
+ blockW2StartingRecovery.complete(null);
+
+ // Step 2. w2 fencing read LAC reaches B1
+ stepBlock(reachedStep2);
+ blockB1ReadLac.complete(null);
+
+ // Step 3. w1 add e2 reaches B3
+ stepBlock(reachedStep3);
+ blockB3Write.complete(null);
+
+ // Step 4. w2 fencing LAC read does not reach B2 or it fails
+ stepBlock(reachedStep4);
+ blockB2ReadLac.complete(null);
+
+ // Step 5. w2 fencing LAC read reaches B3
+ stepBlock(reachedStep5);
+ blockB3ReadLac.complete(null);
+
+ // Step 6. w2 sends read e0 to b1, gets NoSuchLedger
+ stepBlock(reachedStep6);
+ blockB1ReadEntry0.complete(null);
+
+ // Step 7. w2 sends read e0 to b2, gets NoSuchLedger
+ stepBlock(reachedStep7);
+ blockB2ReadEntry0.complete(null);
+
+ // Step 8. w2 closes ledger because (Qw-Qa)+1 bookies confirmed they do not have it
+ // last entry id set to 0
+ stepBlock(reachedStep8);
+ blockW2ClosingLedger.complete(null);
+
+ // Step 9. w1 add e2 reaches b2 (which was fenced by a recovery read)
+ stepBlock(reachedStep9);
+ blockB2Write.complete(null);
+
+ // Step 10. w1 write fails to reach AckQuorum
+ try {
+ w1WriteFuture.get(200, TimeUnit.MILLISECONDS);
+ Assert.fail("The write to b2 should have failed as it was fenced by the recovery read of step 7");
+ } catch (ExecutionException e) {
+ Assert.assertTrue(e.getCause() instanceof BKException.BKLedgerFencedException);
+ }
+
+ // w1 received negative acknowledgement of e2 being written
+ Assert.assertEquals(1, w1.getLedgerMetadata().getAllEnsembles().size());
+ Assert.assertEquals(2, writeResult.get());
+ Assert.assertEquals(1L, w1.getLastAddConfirmed());
+
+ // w2 closed the ledger with only the original entries, not the third one
+ // i.e. there is no divergence between w1, w2 and metadata
+ Assert.assertEquals(1, w2.getLedgerMetadata().getAllEnsembles().size());
+ Assert.assertEquals(1L, w2.getLastAddConfirmed());
+ }
+
+ private void stepBlock(CompletableFuture<Void> reachedStepFuture) {
+ try {
+ reachedStepFuture.get();
+ } catch (Exception e) {}
+ }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
index 68d0502..ae3475e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
@@ -35,6 +35,7 @@
import org.apache.bookkeeper.meta.MockLedgerManager;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.MockBookieClient;
+import org.apache.bookkeeper.proto.MockBookies;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.mockito.Mockito;
@@ -55,7 +56,7 @@
private MockRegistrationClient regClient;
private ByteBufAllocator allocator;
- static MockClientContext create() throws Exception {
+ static MockClientContext create(MockBookies mockBookies) throws Exception {
ClientConfiguration conf = new ClientConfiguration();
OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder().name("mock-executor").numThreads(1).build();
MockRegistrationClient regClient = new MockRegistrationClient();
@@ -67,17 +68,22 @@
bookieWatcherImpl.initialBlockingBookieRead();
return new MockClientContext()
- .setConf(ClientInternalConf.fromConfig(conf))
- .setLedgerManager(new MockLedgerManager())
- .setBookieWatcher(bookieWatcherImpl)
- .setPlacementPolicy(placementPolicy)
- .setRegistrationClient(regClient)
- .setBookieClient(new MockBookieClient(scheduler))
- .setByteBufAllocator(UnpooledByteBufAllocator.DEFAULT)
- .setMainWorkerPool(scheduler)
- .setScheduler(scheduler)
- .setClientStats(BookKeeperClientStats.newInstance(NullStatsLogger.INSTANCE))
- .setIsClientClosed(() -> false);
+ .setConf(ClientInternalConf.fromConfig(conf))
+ .setLedgerManager(new MockLedgerManager())
+ .setBookieWatcher(bookieWatcherImpl)
+ .setPlacementPolicy(placementPolicy)
+ .setRegistrationClient(regClient)
+ .setBookieClient(new MockBookieClient(scheduler, mockBookies))
+ .setByteBufAllocator(UnpooledByteBufAllocator.DEFAULT)
+ .setMainWorkerPool(scheduler)
+ .setScheduler(scheduler)
+ .setClientStats(BookKeeperClientStats.newInstance(NullStatsLogger.INSTANCE))
+ .setIsClientClosed(() -> false);
+ }
+
+ static MockClientContext create() throws Exception {
+ MockBookies mockBookies = new MockBookies();
+ return create(mockBookies);
}
static MockClientContext copyOf(ClientContext other) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
index fa3d1ba..f655328 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
@@ -20,23 +20,20 @@
*/
package org.apache.bookkeeper.proto;
+import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_RECOVERY_ADD;
import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import io.netty.buffer.UnpooledByteBufAllocator;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -49,14 +46,13 @@
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
-import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
-import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
import org.apache.bookkeeper.util.ByteBufList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Mock implementation of BookieClient.
*/
@@ -64,9 +60,9 @@
static final Logger LOG = LoggerFactory.getLogger(MockBookieClient.class);
final OrderedExecutor executor;
- final ConcurrentHashMap<BookieId, ConcurrentHashMap<Long, LedgerData>> data = new ConcurrentHashMap<>();
+ final MockBookies mockBookies;
final Set<BookieId> errorBookies =
- Collections.newSetFromMap(new ConcurrentHashMap<BookieId, Boolean>());
+ Collections.newSetFromMap(new ConcurrentHashMap<>());
/**
* Runs before or after an operation. Can stall the operation or error it.
@@ -82,6 +78,13 @@
public MockBookieClient(OrderedExecutor executor) {
this.executor = executor;
+ this.mockBookies = new MockBookies();
+ }
+
+ public MockBookieClient(OrderedExecutor executor,
+ MockBookies mockBookies) {
+ this.executor = executor;
+ this.mockBookies = mockBookies;
}
public void setPreReadHook(Hook hook) {
@@ -110,14 +113,12 @@
}
}
- public void seedEntries(BookieId bookie, long ledgerId, long entryId, long lac) throws Exception {
- DigestManager digestManager = DigestManager.instantiate(ledgerId, new byte[0], DigestType.CRC32C,
- UnpooledByteBufAllocator.DEFAULT, false);
- ByteBuf entry = ByteBufList.coalesce(digestManager.computeDigestAndPackageForSending(
- entryId, lac, 0, Unpooled.buffer(10)));
+ public boolean isErrored(BookieId bookieId) {
+ return errorBookies.contains(bookieId);
+ }
- LedgerData ledger = getBookieData(bookie).computeIfAbsent(ledgerId, LedgerData::new);
- ledger.addEntry(entryId, entry);
+ public MockBookies getMockBookies() {
+ return mockBookies;
}
@Override
@@ -140,9 +141,9 @@
ForceLedgerCallback cb, Object ctx) {
executor.executeOrdered(ledgerId,
safeRun(() -> {
- cb.forceLedgerComplete(BKException.Code.IllegalOpException,
- ledgerId, addr, ctx);
- }));
+ cb.forceLedgerComplete(BKException.Code.IllegalOpException,
+ ledgerId, addr, ctx);
+ }));
}
@Override
@@ -150,9 +151,9 @@
long lac, ByteBufList toSend, WriteLacCallback cb, Object ctx) {
executor.executeOrdered(ledgerId,
safeRun(() -> {
- cb.writeLacComplete(BKException.Code.IllegalOpException,
- ledgerId, addr, ctx);
- }));
+ cb.writeLacComplete(BKException.Code.IllegalOpException,
+ ledgerId, addr, ctx);
+ }));
}
@Override
@@ -161,23 +162,33 @@
int options, boolean allowFastFail, EnumSet<WriteFlag> writeFlags) {
toSend.retain();
preWriteHook.runHook(addr, ledgerId, entryId)
- .thenComposeAsync(
- (ignore) -> {
- LOG.info("[{};L{}] write entry {}", addr, ledgerId, entryId);
- if (errorBookies.contains(addr)) {
- LOG.warn("[{};L{}] erroring write {}", addr, ledgerId, entryId);
- return FutureUtils.exception(new BKException.BKWriteException());
- }
- LedgerData ledger = getBookieData(addr).computeIfAbsent(ledgerId, LedgerData::new);
- ledger.addEntry(entryId, copyData(toSend));
- toSend.release();
- return FutureUtils.value(null);
- }, executor.chooseThread(ledgerId))
- .thenCompose((res) -> postWriteHook.runHook(addr, ledgerId, entryId))
- .whenCompleteAsync((res, ex) -> {
+ .thenComposeAsync(
+ (ignore) -> {
+ LOG.info("[{};L{}] write entry {}", addr, ledgerId, entryId);
+ if (isErrored(addr)) {
+ LOG.warn("[{};L{}] erroring write {}", addr, ledgerId, entryId);
+ return FutureUtils.exception(new BKException.BKWriteException());
+ }
+
+ try {
+ if ((options & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD) {
+ mockBookies.recoveryAddEntry(addr, ledgerId, entryId, copyData(toSend));
+ } else {
+ mockBookies.addEntry(addr, ledgerId, entryId, copyData(toSend));
+ }
+ } catch (BKException bke) {
+ return FutureUtils.exception(bke);
+ } finally {
+ toSend.release();
+ }
+
+ return FutureUtils.value(null);
+ }, executor.chooseThread(ledgerId))
+ .thenCompose((res) -> postWriteHook.runHook(addr, ledgerId, entryId))
+ .whenCompleteAsync((res, ex) -> {
if (ex != null) {
cb.writeComplete(BKException.getExceptionCode(ex, BKException.Code.WriteException),
- ledgerId, entryId, addr, ctx);
+ ledgerId, entryId, addr, ctx);
} else {
cb.writeComplete(BKException.Code.OK, ledgerId, entryId, addr, ctx);
}
@@ -188,9 +199,9 @@
public void readLac(BookieId addr, long ledgerId, ReadLacCallback cb, Object ctx) {
executor.executeOrdered(ledgerId,
safeRun(() -> {
- cb.readLacComplete(BKException.Code.IllegalOpException,
- ledgerId, null, null, ctx);
- }));
+ cb.readLacComplete(BKException.Code.IllegalOpException,
+ ledgerId, null, null, ctx);
+ }));
}
@Override
@@ -198,35 +209,28 @@
ReadEntryCallback cb, Object ctx, int flags, byte[] masterKey,
boolean allowFastFail) {
preReadHook.runHook(addr, ledgerId, entryId)
- .thenComposeAsync((res) -> {
+ .thenComposeAsync((res) -> {
LOG.info("[{};L{}] read entry {}", addr, ledgerId, entryId);
- if (errorBookies.contains(addr)) {
+ if (isErrored(addr)) {
LOG.warn("[{};L{}] erroring read {}", addr, ledgerId, entryId);
return FutureUtils.exception(new BKException.BKReadException());
}
- LedgerData ledger = getBookieData(addr).get(ledgerId);
- if (ledger == null) {
- LOG.warn("[{};L{}] ledger not found", addr, ledgerId);
- return FutureUtils.exception(new BKException.BKNoSuchLedgerExistsException());
+ try {
+ ByteBuf entry = mockBookies.readEntry(addr, flags, ledgerId, entryId);
+ return FutureUtils.value(entry);
+ } catch (BKException bke) {
+ return FutureUtils.exception(bke);
}
-
- ByteBuf entry = ledger.getEntry(entryId);
- if (entry == null) {
- LOG.warn("[{};L{}] entry({}) not found", addr, ledgerId, entryId);
- return FutureUtils.exception(new BKException.BKNoSuchEntryException());
- }
-
- return FutureUtils.value(entry);
}, executor.chooseThread(ledgerId))
- .thenCompose((buf) -> postReadHook.runHook(addr, ledgerId, entryId).thenApply((res) -> buf))
- .whenCompleteAsync((res, ex) -> {
+ .thenCompose((buf) -> postReadHook.runHook(addr, ledgerId, entryId).thenApply((res) -> buf))
+ .whenCompleteAsync((res, ex) -> {
if (ex != null) {
cb.readEntryComplete(BKException.getExceptionCode(ex, BKException.Code.ReadException),
- ledgerId, entryId, null, ctx);
+ ledgerId, entryId, null, ctx);
} else {
cb.readEntryComplete(BKException.Code.OK,
- ledgerId, entryId, res.slice(), ctx);
+ ledgerId, entryId, res.slice(), ctx);
}
}, executor.chooseThread(ledgerId));
}
@@ -242,9 +246,9 @@
Object ctx) {
executor.executeOrdered(ledgerId,
safeRun(() -> {
- cb.readEntryComplete(BKException.Code.IllegalOpException,
- ledgerId, entryId, null, ctx);
- }));
+ cb.readEntryComplete(BKException.Code.IllegalOpException,
+ ledgerId, entryId, null, ctx);
+ }));
}
@Override
@@ -252,14 +256,14 @@
GetBookieInfoCallback cb, Object ctx) {
executor.executeOrdered(addr,
safeRun(() -> {
- cb.getBookieInfoComplete(BKException.Code.IllegalOpException,
- null, ctx);
- }));
+ cb.getBookieInfoComplete(BKException.Code.IllegalOpException,
+ null, ctx);
+ }));
}
@Override
public CompletableFuture<AvailabilityOfEntriesOfLedger> getListOfEntriesOfLedger(BookieId address,
- long ledgerId) {
+ long ledgerId) {
FutureGetListOfEntriesOfLedger futureResult = new FutureGetListOfEntriesOfLedger(ledgerId);
executor.executeOrdered(address, safeRun(() -> {
futureResult
@@ -277,10 +281,6 @@
public void close() {
}
- private ConcurrentHashMap<Long, LedgerData> getBookieData(BookieId addr) {
- return data.computeIfAbsent(addr, (key) -> new ConcurrentHashMap<>());
- }
-
private static ByteBuf copyData(ByteBufList list) {
ByteBuf buf = Unpooled.buffer(list.readableBytes());
for (int i = 0; i < list.size(); i++) {
@@ -288,29 +288,4 @@
}
return buf;
}
-
- private static class LedgerData {
- final long ledgerId;
- private TreeMap<Long, ByteBuf> entries = new TreeMap<>();
- LedgerData(long ledgerId) {
- this.ledgerId = ledgerId;
- }
-
- void addEntry(long entryId, ByteBuf entry) {
- entries.put(entryId, entry);
- }
-
- ByteBuf getEntry(long entryId) {
- if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
- Map.Entry<Long, ByteBuf> lastEntry = entries.lastEntry();
- if (lastEntry != null) {
- return lastEntry.getValue();
- } else {
- return null;
- }
- } else {
- return entries.get(entryId);
- }
- }
- }
-}
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java
new file mode 100644
index 0000000..c670e87
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiPredicate;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.DistributionSchedule;
+import org.apache.bookkeeper.client.RoundRobinDistributionSchedule;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
+import org.apache.bookkeeper.util.ByteBufList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mocks an ensemble of bookies and can be shared between more than one MockBookieClient
+ * so that it can be used to check two writers accessing the same ledger.
+ */
+public class MockBookies {
+ static final Logger LOG = LoggerFactory.getLogger(MockBookies.class);
+ final ConcurrentHashMap<BookieId, ConcurrentHashMap<Long, MockLedgerData>> data = new ConcurrentHashMap<>();
+
+ public void seedLedgerForBookie(BookieId bookieId, long ledgerId,
+ LedgerMetadata metadata) throws Exception {
+ seedLedgerBase(ledgerId, metadata, (bookie, entry) -> bookie.equals(bookieId));
+ }
+
+ public void seedLedger(long ledgerId, LedgerMetadata metadata) throws Exception {
+ seedLedgerBase(ledgerId, metadata, (bookie, entry) -> true);
+ }
+
+ public void seedLedgerBase(long ledgerId, LedgerMetadata metadata,
+ BiPredicate<BookieId, Long> shouldSeed) throws Exception {
+ DistributionSchedule schedule = new RoundRobinDistributionSchedule(metadata.getWriteQuorumSize(),
+ metadata.getAckQuorumSize(),
+ metadata.getEnsembleSize());
+ long lastEntry = metadata.isClosed()
+ ? metadata.getLastEntryId() : metadata.getAllEnsembles().lastEntry().getKey() - 1;
+ long lac = -1;
+ for (long e = 0; e <= lastEntry; e++) {
+ List<BookieId> ensemble = metadata.getEnsembleAt(e);
+ DistributionSchedule.WriteSet ws = schedule.getWriteSet(e);
+ for (int i = 0; i < ws.size(); i++) {
+ BookieId bookieId = ensemble.get(ws.get(i));
+ if (shouldSeed.test(bookieId, e)) {
+ seedEntries(bookieId, ledgerId, e, lac);
+ }
+ }
+ lac = e;
+ }
+ }
+
+ public void seedEntries(BookieId bookieId, long ledgerId, long entryId, long lac) throws Exception {
+ ByteBuf entry = generateEntry(ledgerId, entryId, lac);
+ MockLedgerData ledger = getBookieData(bookieId).computeIfAbsent(ledgerId, MockLedgerData::new);
+ ledger.addEntry(entryId, entry);
+ }
+
+ public ByteBuf generateEntry(long ledgerId, long entryId, long lac) throws Exception {
+ DigestManager digestManager = DigestManager.instantiate(ledgerId, new byte[0],
+ DataFormats.LedgerMetadataFormat.DigestType.CRC32C,
+ UnpooledByteBufAllocator.DEFAULT, false);
+ return ByteBufList.coalesce(digestManager.computeDigestAndPackageForSending(
+ entryId, lac, 0, Unpooled.buffer(10)));
+
+ }
+
+ public void addEntry(BookieId bookieId, long ledgerId, long entryId, ByteBuf entry) throws BKException {
+ MockLedgerData ledger = getBookieData(bookieId).computeIfAbsent(ledgerId, MockLedgerData::new);
+ if (ledger.isFenced()) {
+ throw new BKException.BKLedgerFencedException();
+ }
+ ledger.addEntry(entryId, entry);
+ }
+
+ public void recoveryAddEntry(BookieId bookieId, long ledgerId, long entryId, ByteBuf entry) throws BKException {
+ MockLedgerData ledger = getBookieData(bookieId).computeIfAbsent(ledgerId, MockLedgerData::new);
+ ledger.addEntry(entryId, entry);
+ }
+
+ public ByteBuf readEntry(BookieId bookieId, int flags, long ledgerId, long entryId) throws BKException {
+ MockLedgerData ledger = getBookieData(bookieId).get(ledgerId);
+
+ if (ledger == null) {
+ LOG.warn("[{};L{}] ledger not found", bookieId, ledgerId);
+ throw new BKException.BKNoSuchLedgerExistsException();
+ }
+
+ if ((flags & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING) {
+ ledger.fence();
+ }
+
+ ByteBuf entry = ledger.getEntry(entryId);
+ if (entry == null) {
+ LOG.warn("[{};L{}] entry({}) not found", bookieId, ledgerId, entryId);
+ throw new BKException.BKNoSuchEntryException();
+ }
+
+ return entry;
+ }
+
+ public ConcurrentHashMap<Long, MockLedgerData> getBookieData(BookieId bookieId) {
+ return data.computeIfAbsent(bookieId, (key) -> new ConcurrentHashMap<>());
+ }
+
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockLedgerData.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockLedgerData.java
new file mode 100644
index 0000000..05447f0
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockLedgerData.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ Mock ledger data.
+ */
+public class MockLedgerData {
+ final long ledgerId;
+ boolean isFenced;
+ private TreeMap<Long, ByteBuf> entries = new TreeMap<>();
+
+ MockLedgerData(long ledgerId) {
+ this.ledgerId = ledgerId;
+ }
+
+ boolean isFenced() {
+ return isFenced;
+ }
+
+ void fence() {
+ isFenced = true;
+ }
+
+ void addEntry(long entryId, ByteBuf entry) {
+ entries.put(entryId, entry);
+ }
+
+ ByteBuf getEntry(long entryId) {
+ if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
+ Map.Entry<Long, ByteBuf> lastEntry = entries.lastEntry();
+ if (lastEntry != null) {
+ return lastEntry.getValue();
+ } else {
+ return null;
+ }
+ } else {
+ return entries.get(entryId);
+ }
+ }
+}