blob: e76ee545449fb7851444f08f7cbe70efff7d2601 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.MockLedgerManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.commons.lang3.tuple.Triple;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test metadata update loop.
*/
public class MetadataUpdateLoopTest {
static final Logger LOG = LoggerFactory.getLogger(MetadataUpdateLoopTest.class);
/**
* Test that we can update the metadata using the update loop.
*/
@Test
public void testBasicUpdate() throws Exception {
try (LedgerManager lm = new MockLedgerManager()) {
LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(5)
.withDigestType(DigestType.CRC32C).withPassword(new byte[0])
.newEnsembleEntry(0L, Lists.newArrayList(
new BookieSocketAddress("0.0.0.0:3181"),
new BookieSocketAddress("0.0.0.1:3181"),
new BookieSocketAddress("0.0.0.2:3181"),
new BookieSocketAddress("0.0.0.3:3181"),
new BookieSocketAddress("0.0.0.4:3181"))).build();
long ledgerId = 1234L;
Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
AtomicReference<Versioned<LedgerMetadata>> reference = new AtomicReference<>(writtenMetadata);
BookieSocketAddress newAddress = new BookieSocketAddress("0.0.0.5:3181");
MetadataUpdateLoop loop = new MetadataUpdateLoop(
lm,
ledgerId,
reference::get,
(currentMetadata) -> true,
(currentMetadata) -> {
List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsembleAt(0L));
ensemble.set(0, newAddress);
return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
},
reference::compareAndSet);
loop.run().get();
Assert.assertNotEquals(reference.get(), writtenMetadata);
Assert.assertEquals(reference.get().getValue().getEnsembleAt(0L).get(0), newAddress);
}
}
/**
* Test that when 2 update loops conflict when making diffent updates to the metadata,
* both will eventually succeed, and both updates will be reflected in the final metadata.
*/
@Test
public void testConflictOnWrite() throws Exception {
try (BlockableMockLedgerManager lm = spy(new BlockableMockLedgerManager())) {
lm.blockWrites();
long ledgerId = 1234L;
BookieSocketAddress b0 = new BookieSocketAddress("0.0.0.0:3181");
BookieSocketAddress b1 = new BookieSocketAddress("0.0.0.1:3181");
BookieSocketAddress b2 = new BookieSocketAddress("0.0.0.2:3181");
BookieSocketAddress b3 = new BookieSocketAddress("0.0.0.3:3181");
LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2)
.withDigestType(DigestType.CRC32C).withPassword(new byte[0])
.withWriteQuorumSize(2).newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
Versioned<LedgerMetadata> writtenMetadata =
lm.createLedgerMetadata(ledgerId, initMeta).get();
AtomicReference<Versioned<LedgerMetadata>> reference1 = new AtomicReference<>(writtenMetadata);
CompletableFuture<Versioned<LedgerMetadata>> loop1 = new MetadataUpdateLoop(
lm,
ledgerId,
reference1::get,
(currentMetadata) -> currentMetadata.getEnsembleAt(0L).contains(b0),
(currentMetadata) -> {
List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsembleAt(0L));
ensemble.set(0, b2);
return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
},
reference1::compareAndSet).run();
AtomicReference<Versioned<LedgerMetadata>> reference2 = new AtomicReference<>(writtenMetadata);
CompletableFuture<Versioned<LedgerMetadata>> loop2 = new MetadataUpdateLoop(
lm,
ledgerId,
reference2::get,
(currentMetadata) -> currentMetadata.getEnsembleAt(0L).contains(b1),
(currentMetadata) -> {
List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsembleAt(0L));
ensemble.set(1, b3);
return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
},
reference2::compareAndSet).run();
lm.releaseWrites();
Versioned<LedgerMetadata> l1meta = loop1.get();
Versioned<LedgerMetadata> l2meta = loop2.get();
Assert.assertEquals(l1meta, reference1.get());
Assert.assertEquals(l2meta, reference2.get());
Assert.assertEquals(l1meta.getVersion().compare(l2meta.getVersion()), Version.Occurred.BEFORE);
Assert.assertEquals(l1meta.getValue().getEnsembleAt(0L).get(0), b2);
Assert.assertEquals(l1meta.getValue().getEnsembleAt(0L).get(1), b1);
Assert.assertEquals(l2meta.getValue().getEnsembleAt(0L).get(0), b2);
Assert.assertEquals(l2meta.getValue().getEnsembleAt(0L).get(1), b3);
verify(lm, times(3)).writeLedgerMetadata(anyLong(), any(), any());
}
}
/**
* Test that when 2 updates loops try to make the same modification, and they
* conflict on the write to the store, the one that receives the conflict won't
* try to write again, as the value is now correct.
*/
@Test
public void testConflictOnWriteBothWritingSame() throws Exception {
try (BlockableMockLedgerManager lm = spy(new BlockableMockLedgerManager())) {
lm.blockWrites();
long ledgerId = 1234L;
BookieSocketAddress b0 = new BookieSocketAddress("0.0.0.0:3181");
BookieSocketAddress b1 = new BookieSocketAddress("0.0.0.1:3181");
BookieSocketAddress b2 = new BookieSocketAddress("0.0.0.2:3181");
LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2)
.withDigestType(DigestType.CRC32C).withPassword(new byte[0])
.withWriteQuorumSize(2).newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
AtomicReference<Versioned<LedgerMetadata>> reference = new AtomicReference<>(writtenMetadata);
CompletableFuture<Versioned<LedgerMetadata>> loop1 = new MetadataUpdateLoop(
lm,
ledgerId,
reference::get,
(currentMetadata) -> currentMetadata.getEnsembleAt(0L).contains(b0),
(currentMetadata) -> {
List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsembleAt(0L));
ensemble.set(0, b2);
return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
},
reference::compareAndSet).run();
CompletableFuture<Versioned<LedgerMetadata>> loop2 = new MetadataUpdateLoop(
lm,
ledgerId,
reference::get,
(currentMetadata) -> currentMetadata.getEnsembleAt(0L).contains(b0),
(currentMetadata) -> {
List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsembleAt(0L));
ensemble.set(0, b2);
return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
},
reference::compareAndSet).run();
lm.releaseWrites();
Assert.assertEquals(loop1.get(), loop2.get());
Assert.assertEquals(loop1.get(), reference.get());
Assert.assertEquals(reference.get().getValue().getEnsembleAt(0L).get(0), b2);
Assert.assertEquals(reference.get().getValue().getEnsembleAt(0L).get(1), b1);
verify(lm, times(2)).writeLedgerMetadata(anyLong(), any(), any());
}
}
/**
* Test that when 2 update loops both manage to write, but conflict on
* updating the local value.
*/
@Test
public void testConflictOnLocalUpdate() throws Exception {
try (DeferCallbacksMockLedgerManager lm = spy(new DeferCallbacksMockLedgerManager(1))) {
long ledgerId = 1234L;
BookieSocketAddress b0 = new BookieSocketAddress("0.0.0.0:3181");
BookieSocketAddress b1 = new BookieSocketAddress("0.0.0.1:3181");
BookieSocketAddress b2 = new BookieSocketAddress("0.0.0.2:3181");
BookieSocketAddress b3 = new BookieSocketAddress("0.0.0.3:3181");
LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2)
.withDigestType(DigestType.CRC32C).withPassword(new byte[0])
.withWriteQuorumSize(2).newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
AtomicReference<Versioned<LedgerMetadata>> reference = new AtomicReference<>(writtenMetadata);
CompletableFuture<Versioned<LedgerMetadata>> loop1 = new MetadataUpdateLoop(
lm,
ledgerId,
reference::get,
(currentMetadata) -> currentMetadata.getEnsembleAt(0L).contains(b0),
(currentMetadata) -> {
List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsembleAt(0L));
ensemble.set(0, b2);
return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
},
reference::compareAndSet).run();
lm.waitForWriteCount(1);
CompletableFuture<Versioned<LedgerMetadata>> loop2 = new MetadataUpdateLoop(
lm,
ledgerId,
reference::get,
(currentMetadata) -> currentMetadata.getEnsembleAt(0L).contains(b1),
(currentMetadata) -> {
List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsembleAt(0L));
ensemble.set(1, b3);
return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
},
reference::compareAndSet).run();
Assert.assertEquals(loop2.get(), reference.get());
lm.runDeferred();
Assert.assertEquals(loop1.get(), reference.get());
Assert.assertEquals(reference.get().getValue().getEnsembleAt(0L).get(0), b2);
Assert.assertEquals(reference.get().getValue().getEnsembleAt(0L).get(1), b3);
verify(lm, times(3)).writeLedgerMetadata(anyLong(), any(), any());
}
}
private static BookieSocketAddress address(String s) {
try {
return new BookieSocketAddress(s);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* Hammer test. Kick off a lot of metadata updates concurrently with a ledger manager
* that runs callbacks on random threads, and validate all updates complete eventually,
* and that the final metadata reflects all the updates.
*/
@Test
public void testHammer() throws Exception {
try (NonDeterministicMockLedgerManager lm = new NonDeterministicMockLedgerManager()) {
long ledgerId = 1234L;
int ensembleSize = 100;
List<BookieSocketAddress> initialEnsemble = IntStream.range(0, ensembleSize)
.mapToObj((i) -> address(String.format("0.0.0.%d:3181", i)))
.collect(Collectors.toList());
LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(ensembleSize)
.withDigestType(DigestType.CRC32C).withPassword(new byte[0])
.newEnsembleEntry(0L, initialEnsemble).build();
Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
AtomicReference<Versioned<LedgerMetadata>> reference = new AtomicReference<>(writtenMetadata);
List<BookieSocketAddress> replacementBookies = IntStream.range(0, ensembleSize)
.mapToObj((i) -> address(String.format("0.0.%d.1:3181", i)))
.collect(Collectors.toList());
List<CompletableFuture<Versioned<LedgerMetadata>>> loops = IntStream.range(0, ensembleSize)
.mapToObj((i) -> new MetadataUpdateLoop(
lm,
ledgerId,
reference::get,
(currentMetadata) -> currentMetadata.getEnsembleAt(0L).contains(initialEnsemble.get(i)),
(currentMetadata) -> {
List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsembleAt(0L));
ensemble.set(i, replacementBookies.get(i));
return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
},
reference::compareAndSet).run())
.collect(Collectors.toList());
loops.forEach((l) -> l.join());
Assert.assertEquals(reference.get().getValue().getEnsembleAt(0L), replacementBookies);
}
}
/**
* Test that if we have two conflicting updates, only one of the loops will complete.
* The other will throw an exception.
*/
@Test
public void testNewestValueCannotBeUsedAfterReadBack() throws Exception {
try (BlockableMockLedgerManager lm = spy(new BlockableMockLedgerManager())) {
lm.blockWrites();
long ledgerId = 1234L;
BookieSocketAddress b0 = new BookieSocketAddress("0.0.0.0:3181");
BookieSocketAddress b1 = new BookieSocketAddress("0.0.0.1:3181");
LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(1)
.withDigestType(DigestType.CRC32C).withPassword(new byte[0])
.withWriteQuorumSize(1).withAckQuorumSize(1)
.newEnsembleEntry(0L, Lists.newArrayList(b0)).build();
Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
AtomicReference<Versioned<LedgerMetadata>> reference = new AtomicReference<>(writtenMetadata);
CompletableFuture<Versioned<LedgerMetadata>> loop1 = new MetadataUpdateLoop(
lm,
ledgerId,
reference::get,
(currentMetadata) -> !currentMetadata.isClosed(),
(currentMetadata) -> {
return LedgerMetadataBuilder.from(currentMetadata)
.withClosedState().withLastEntryId(10L).withLength(100L).build();
},
reference::compareAndSet).run();
CompletableFuture<Versioned<LedgerMetadata>> loop2 = new MetadataUpdateLoop(
lm,
ledgerId,
reference::get,
(currentMetadata) -> {
if (currentMetadata.isClosed()) {
throw new BKException.BKLedgerClosedException();
} else {
return currentMetadata.getEnsembleAt(0L).contains(b0);
}
},
(currentMetadata) -> {
List<BookieSocketAddress> ensemble = Lists.newArrayList(currentMetadata.getEnsembleAt(0L));
ensemble.set(0, b1);
return LedgerMetadataBuilder.from(currentMetadata).replaceEnsembleEntry(0L, ensemble).build();
},
reference::compareAndSet).run();
lm.releaseWrites();
Versioned<LedgerMetadata> l1meta = loop1.get();
try {
loop2.get();
Assert.fail("Update loop should have failed");
} catch (ExecutionException ee) {
Assert.assertEquals(ee.getCause().getClass(), BKException.BKLedgerClosedException.class);
}
Assert.assertEquals(l1meta, reference.get());
Assert.assertEquals(l1meta.getValue().getEnsembleAt(0L).get(0), b0);
Assert.assertTrue(l1meta.getValue().isClosed());
verify(lm, times(2)).writeLedgerMetadata(anyLong(), any(), any());
}
}
static class NonDeterministicMockLedgerManager extends MockLedgerManager {
final ExecutorService cbExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("non-deter-%d").build());
@Override
public void executeCallback(Runnable r) {
cbExecutor.execute(r);
}
@Override
public void close() {
cbExecutor.shutdownNow();
super.close();
}
}
static class DeferCallbacksMockLedgerManager extends MockLedgerManager {
int writeCount = 0;
final int numToDefer;
List<Triple<CompletableFuture<Versioned<LedgerMetadata>>, Versioned<LedgerMetadata>, Throwable>> deferred =
Lists.newArrayList();
DeferCallbacksMockLedgerManager(int numToDefer) {
this.numToDefer = numToDefer;
}
synchronized void runDeferred() {
deferred.forEach((d) -> {
Throwable t = d.getRight();
if (t != null) {
d.getLeft().completeExceptionally(t);
} else {
d.getLeft().complete(d.getMiddle());
}
});
}
synchronized void waitForWriteCount(int count) throws Exception {
while (writeCount < count) {
wait();
}
}
@Override
public synchronized CompletableFuture<Versioned<LedgerMetadata>> writeLedgerMetadata(
long ledgerId, LedgerMetadata metadata,
Version currentVersion) {
CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
super.writeLedgerMetadata(ledgerId, metadata, currentVersion)
.whenComplete((written, exception) -> {
synchronized (DeferCallbacksMockLedgerManager.this) {
if (writeCount++ < numToDefer) {
LOG.info("Added to deferals");
deferred.add(Triple.of(promise, written, exception));
} else {
LOG.info("Completing {}", numToDefer);
if (exception != null) {
promise.completeExceptionally(exception);
} else {
promise.complete(written);
}
}
DeferCallbacksMockLedgerManager.this.notifyAll();
}
});
return promise;
}
}
@Data
@AllArgsConstructor
static class DeferredUpdate {
final CompletableFuture<Versioned<LedgerMetadata>> promise;
final long ledgerId;
final LedgerMetadata metadata;
final Version currentVersion;
}
static class BlockableMockLedgerManager extends MockLedgerManager {
boolean blocking = false;
List<DeferredUpdate> reqs = Lists.newArrayList();
synchronized void blockWrites() {
blocking = true;
}
synchronized void releaseWrites() {
blocking = false;
reqs.forEach((r) -> {
super.writeLedgerMetadata(r.getLedgerId(), r.getMetadata(),
r.getCurrentVersion())
.whenComplete((written, exception) -> {
if (exception != null) {
r.getPromise().completeExceptionally(exception);
} else {
r.getPromise().complete(written);
}
});
});
}
@Override
public synchronized CompletableFuture<Versioned<LedgerMetadata>> writeLedgerMetadata(
long ledgerId, LedgerMetadata metadata, Version currentVersion) {
if (blocking) {
CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
reqs.add(new DeferredUpdate(promise, ledgerId, metadata, currentVersion));
return promise;
} else {
return super.writeLedgerMetadata(ledgerId, metadata, currentVersion);
}
}
}
}