blob: 95a5949bdfad47c296122e292fa46935c62b69b9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.client;
import com.google.common.collect.Lists;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.versioning.Versioned;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Ledger close tests using mocks rather than a real cluster.
 */
public class LedgerClose2Test {
    private static final Logger log = LoggerFactory.getLogger(LedgerClose2Test.class);

    private static final BookieId b1 = new BookieSocketAddress("b1", 3181).toBookieId();
    private static final BookieId b2 = new BookieSocketAddress("b2", 3181).toBookieId();
    private static final BookieId b3 = new BookieSocketAddress("b3", 3181).toBookieId();
    private static final BookieId b4 = new BookieSocketAddress("b4", 3181).toBookieId();
    private static final BookieId b5 = new BookieSocketAddress("b5", 3181).toBookieId();

    /**
     * Issues an append right after an asynchronous close has been started, on 1000 fresh ledgers.
     * Either the append succeeds, in which case the closed metadata must record its entry id as
     * the last entry, or it fails with BKLedgerClosedException, in which case the closed metadata
     * must carry {@code LedgerHandle.INVALID_ENTRY_ID} as the last entry.
     */
    @Test
    public void testTryAddAfterCloseHasBeenCalled() throws Exception {
        MockClientContext clientCtx = MockClientContext.create();

        for (int i = 0; i < 1000; i++) {
            Versioned<LedgerMetadata> md = ClientUtil.setupLedger(clientCtx, i,
                    LedgerMetadataBuilder.create().newEnsembleEntry(0L, Lists.newArrayList(b1, b2, b3)));
            LedgerHandle lh = new LedgerHandle(clientCtx, i, md, BookKeeper.DigestType.CRC32C,
                    ClientUtil.PASSWD, WriteFlag.NONE);
            CompletableFuture<?> closeFuture = lh.closeAsync();
            try {
                long eid = lh.append("entry".getBytes(StandardCharsets.UTF_8));

                // if it succeeds, it should be in the final ledger
                closeFuture.get();
                Assert.assertTrue(lh.getLedgerMetadata().isClosed());
                Assert.assertEquals(eid, lh.getLedgerMetadata().getLastEntryId());
            } catch (BKException.BKLedgerClosedException bke) {
                // the append lost the race with close, so nothing may have been recorded
                closeFuture.get();
                Assert.assertTrue(lh.getLedgerMetadata().isClosed());
                Assert.assertEquals(LedgerHandle.INVALID_ENTRY_ID, lh.getLedgerMetadata().getLastEntryId());
            }
        }
    }

    /**
     * Changes the first ensemble in the stored metadata while the handle's closing metadata
     * write is held back, then releases the write. The close must still succeed and the
     * resulting metadata must contain both the externally modified first ensemble and the
     * handle's own second ensemble.
     */
    @Test
    public void testMetadataChangedDuringClose() throws Exception {
        MockClientContext clientCtx = MockClientContext.create();
        Versioned<LedgerMetadata> md = ClientUtil.setupLedger(clientCtx, 10L,
                LedgerMetadataBuilder.create()
                        .withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2)
                        .newEnsembleEntry(0L, Lists.newArrayList(b1, b2, b3)));

        LedgerHandle lh = new LedgerHandle(clientCtx, 10L, md, BookKeeper.DigestType.CRC32C,
                ClientUtil.PASSWD, WriteFlag.NONE);
        lh.append("entry1".getBytes(StandardCharsets.UTF_8));

        // make b4 available and fail b3 before writing the second entry
        clientCtx.getMockRegistrationClient().addBookies(b4).get();
        clientCtx.getMockBookieClient().errorBookies(b3);
        lh.append("entry2".getBytes(StandardCharsets.UTF_8));

        CompletableFuture<Void> closeInProgress = new CompletableFuture<>();
        CompletableFuture<Void> blockClose = new CompletableFuture<>();
        clientCtx.getMockLedgerManager().setPreWriteHook((ledgerId, metadata) -> {
            // hold back any metadata write that marks the ledger closed; let everything else through
            if (metadata.isClosed()) {
                closeInProgress.complete(null);
                return blockClose;
            } else {
                return FutureUtils.value(null);
            }
        });
        CompletableFuture<?> closeFuture = lh.closeAsync();
        closeInProgress.get();

        // modify the stored metadata underneath the pending close
        ClientUtil.transformMetadata(clientCtx, 10L,
                (metadata) -> LedgerMetadataBuilder.from(metadata).replaceEnsembleEntry(
                        0L, Lists.newArrayList(b4, b2, b5)).build());

        blockClose.complete(null);
        closeFuture.get();

        Assert.assertTrue(lh.getLedgerMetadata().isClosed());
        Assert.assertEquals(2, lh.getLedgerMetadata().getAllEnsembles().size());
        Assert.assertEquals(Lists.newArrayList(b4, b2, b5), lh.getLedgerMetadata().getAllEnsembles().get(0L));
        Assert.assertEquals(Lists.newArrayList(b1, b2, b4), lh.getLedgerMetadata().getAllEnsembles().get(1L));
        Assert.assertEquals(1L, lh.getLedgerMetadata().getLastEntryId());
    }

    /**
     * Closes the stored metadata externally, with the same last entry id and length the handle
     * knows about, while the handle's own first closing write is held back. The handle's close
     * must succeed and report that same last entry id and length.
     */
    @Test
    public void testMetadataCloseWithCorrectLengthDuringClose() throws Exception {
        MockClientContext clientCtx = MockClientContext.create();
        Versioned<LedgerMetadata> md = ClientUtil.setupLedger(clientCtx, 10L,
                LedgerMetadataBuilder.create()
                        .withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2)
                        .newEnsembleEntry(0L, Lists.newArrayList(b1, b2, b3)));

        LedgerHandle lh = new LedgerHandle(clientCtx, 10L, md, BookKeeper.DigestType.CRC32C,
                ClientUtil.PASSWD, WriteFlag.NONE);
        long lac = lh.append("entry1".getBytes(StandardCharsets.UTF_8));
        long length = lh.getLength();

        CompletableFuture<Void> closeInProgress = new CompletableFuture<>();
        CompletableFuture<Void> blockClose = new CompletableFuture<>();
        clientCtx.getMockLedgerManager().setPreWriteHook((ledgerId, metadata) -> {
            // block the write trying to do the first close
            if (!closeInProgress.isDone() && metadata.isClosed()) {
                closeInProgress.complete(null);
                return blockClose;
            } else {
                return FutureUtils.value(null);
            }
        });
        CompletableFuture<?> closeFuture = lh.closeAsync();
        closeInProgress.get();

        // close the stored metadata with exactly what this handle has written
        ClientUtil.transformMetadata(clientCtx, 10L,
                (metadata) -> LedgerMetadataBuilder.from(metadata)
                        .withClosedState().withLastEntryId(lac).withLength(length).build());

        blockClose.complete(null);
        closeFuture.get();

        Assert.assertTrue(lh.getLedgerMetadata().isClosed());
        Assert.assertEquals(1, lh.getLedgerMetadata().getAllEnsembles().size());
        Assert.assertEquals(Lists.newArrayList(b1, b2, b3), lh.getLedgerMetadata().getAllEnsembles().get(0L));
        Assert.assertEquals(lac, lh.getLedgerMetadata().getLastEntryId());
        Assert.assertEquals(length, lh.getLedgerMetadata().getLength());
    }

    /**
     * Closes the stored metadata externally, with a last entry id and length that differ from
     * what the handle knows about, while the handle's own first closing write is held back.
     * The handle's close must fail with BKMetadataVersionException.
     */
    @Test
    public void testMetadataCloseWithDifferentLengthDuringClose() throws Exception {
        MockClientContext clientCtx = MockClientContext.create();
        Versioned<LedgerMetadata> md = ClientUtil.setupLedger(clientCtx, 10L,
                LedgerMetadataBuilder.create()
                        .withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2)
                        .newEnsembleEntry(0L, Lists.newArrayList(b1, b2, b3)));

        LedgerHandle lh = new LedgerHandle(clientCtx, 10L, md, BookKeeper.DigestType.CRC32C,
                ClientUtil.PASSWD, WriteFlag.NONE);
        long lac = lh.append("entry1".getBytes(StandardCharsets.UTF_8));
        long length = lh.getLength();

        CompletableFuture<Void> closeInProgress = new CompletableFuture<>();
        CompletableFuture<Void> blockClose = new CompletableFuture<>();
        clientCtx.getMockLedgerManager().setPreWriteHook((ledgerId, metadata) -> {
            // block the write trying to do the first close
            if (!closeInProgress.isDone() && metadata.isClosed()) {
                closeInProgress.complete(null);
                return blockClose;
            } else {
                return FutureUtils.value(null);
            }
        });
        CompletableFuture<?> closeFuture = lh.closeAsync();
        closeInProgress.get();

        /* close with different length. can happen in cases where there's a write outstanding */
        ClientUtil.transformMetadata(clientCtx, 10L,
                (metadata) -> LedgerMetadataBuilder.from(metadata)
                        .withClosedState().withLastEntryId(lac + 1).withLength(length + 100).build());

        blockClose.complete(null);
        try {
            closeFuture.get();
            Assert.fail("Close should fail. Ledger has been closed in a state we don't know how to untangle");
        } catch (ExecutionException ee) {
            Assert.assertEquals(BKException.BKMetadataVersionException.class, ee.getCause().getClass());
        }
    }

    /**
     * Marks the stored metadata as in-recovery while the handle's closing write is held back.
     * The handle's close must still succeed and record the handle's own last entry id and length.
     */
    @Test
    public void testMetadataCloseMarkedInRecoveryWhileClosing() throws Exception {
        MockClientContext clientCtx = MockClientContext.create();
        Versioned<LedgerMetadata> md = ClientUtil.setupLedger(clientCtx, 10L,
                LedgerMetadataBuilder.create()
                        .withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2)
                        .newEnsembleEntry(0L, Lists.newArrayList(b1, b2, b3)));

        LedgerHandle lh = new LedgerHandle(clientCtx, 10L, md, BookKeeper.DigestType.CRC32C,
                ClientUtil.PASSWD, WriteFlag.NONE);
        long lac = lh.append("entry1".getBytes(StandardCharsets.UTF_8));
        long length = lh.getLength();

        CompletableFuture<Void> closeInProgress = new CompletableFuture<>();
        CompletableFuture<Void> blockClose = new CompletableFuture<>();
        clientCtx.getMockLedgerManager().setPreWriteHook((ledgerId, metadata) -> {
            // block the write trying to do the first close
            if (metadata.isClosed()) {
                closeInProgress.complete(null);
                return blockClose;
            } else {
                return FutureUtils.value(null);
            }
        });
        CompletableFuture<?> closeFuture = lh.closeAsync();
        closeInProgress.get();

        ClientUtil.transformMetadata(clientCtx, 10L,
                (metadata) -> LedgerMetadataBuilder.from(metadata).withInRecoveryState().build());

        blockClose.complete(null);
        closeFuture.get(); // should override in recovery, since this handle knows what it has written

        Assert.assertTrue(lh.getLedgerMetadata().isClosed());
        Assert.assertEquals(1, lh.getLedgerMetadata().getAllEnsembles().size());
        Assert.assertEquals(Lists.newArrayList(b1, b2, b3), lh.getLedgerMetadata().getAllEnsembles().get(0L));
        Assert.assertEquals(lac, lh.getLedgerMetadata().getLastEntryId());
        Assert.assertEquals(length, lh.getLedgerMetadata().getLength());
    }

    /**
     * Closes the handle while an add is stuck waiting on the bookies. The pending add must fail
     * with BKLedgerClosedException and the ledger must be closed with no entries and zero length.
     */
    @Test
    public void testCloseWhileAddInProgress() throws Exception {
        MockClientContext clientCtx = MockClientContext.create();
        Versioned<LedgerMetadata> md = ClientUtil.setupLedger(clientCtx, 10L,
                LedgerMetadataBuilder.create()
                        .withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2)
                        .newEnsembleEntry(0L, Lists.newArrayList(b1, b2, b3)));

        // block all entry writes from completing
        CompletableFuture<Void> writesHittingBookies = new CompletableFuture<>();
        clientCtx.getMockBookieClient().setPreWriteHook((bookie, ledgerId, entryId) -> {
            writesHittingBookies.complete(null);
            // a future that is never completed, so the write stays pending
            return new CompletableFuture<Void>();
        });

        LedgerHandle lh = new LedgerHandle(clientCtx, 10L, md, BookKeeper.DigestType.CRC32C,
                ClientUtil.PASSWD, WriteFlag.NONE);
        CompletableFuture<?> future = lh.appendAsync("entry1".getBytes(StandardCharsets.UTF_8));
        writesHittingBookies.get();

        lh.close();
        try {
            future.get();
            Assert.fail("That write shouldn't have succeeded");
        } catch (ExecutionException ee) {
            Assert.assertEquals(BKException.BKLedgerClosedException.class, ee.getCause().getClass());
        }

        Assert.assertTrue(lh.getLedgerMetadata().isClosed());
        Assert.assertEquals(1, lh.getLedgerMetadata().getAllEnsembles().size());
        Assert.assertEquals(Lists.newArrayList(b1, b2, b3), lh.getLedgerMetadata().getAllEnsembles().get(0L));
        Assert.assertEquals(LedgerHandle.INVALID_ENTRY_ID, lh.getLedgerMetadata().getLastEntryId());
        Assert.assertEquals(0L, lh.getLedgerMetadata().getLength());
    }

    /**
     * Calls closeAsync twice on one handle while both bookie write completions and metadata
     * writes are held back by never-completed futures. Neither close future may be done after
     * a short wait.
     */
    @Test
    public void testDoubleCloseOnHandle() throws Exception {
        long ledgerId = 123L;
        MockClientContext clientCtx = MockClientContext.create();

        Versioned<LedgerMetadata> md = ClientUtil.setupLedger(clientCtx, ledgerId,
                LedgerMetadataBuilder.create()
                        .withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(3)
                        .newEnsembleEntry(0L, Lists.newArrayList(b1, b2, b3)));

        // neither promise is ever completed by this test
        CompletableFuture<Void> metadataPromise = new CompletableFuture<>();
        CompletableFuture<Void> clientPromise = new CompletableFuture<>();

        LedgerHandle writer = new LedgerHandle(clientCtx, ledgerId, md,
                BookKeeper.DigestType.CRC32C,
                ClientUtil.PASSWD, WriteFlag.NONE);
        writer.append("entry1".getBytes(StandardCharsets.UTF_8));

        log.info("block writes from completing on bookies and metadata");
        clientCtx.getMockBookieClient().setPostWriteHook((bookie, lid, eid) -> clientPromise);
        clientCtx.getMockLedgerManager().setPreWriteHook((lid, metadata) -> metadataPromise);

        log.info("try to add another entry, it will block");
        writer.appendAsync("entry2".getBytes(StandardCharsets.UTF_8));

        log.info("attempt one close, should block forever");
        CompletableFuture<Void> firstClose = writer.closeAsync();

        log.info("attempt second close, should not finish before first one");
        CompletableFuture<Void> secondClose = writer.closeAsync();

        Thread.sleep(500); // give it a chance to complete, the request jumps around threads
        Assert.assertFalse(firstClose.isDone());
        Assert.assertFalse(secondClose.isDone());
    }
}