blob: dabf2b6088c22d361bc08dd43c931abf776519d8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.client;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.WriteAdvHandle;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.net.BookieId;
import org.junit.Test;
/**
 * Client side tests on deferred sync write flag.
 *
 * <p>Each test creates a ledger with {@link WriteFlag#DEFERRED_SYNC} using the helpers
 * inherited from {@link MockBookKeeperTestCase}, then exercises writes and
 * {@code force()} on the returned handle.
 */
public class DeferredSyncTest extends MockBookKeeperTestCase {

    /** Password used for every ledger created by these tests. */
    static final byte[] PASSWORD = "password".getBytes();

    /**
     * Payload written for every entry. It is shared between tests, so each write
     * passes {@code DATA.retainedDuplicate()} instead of the buffer itself.
     */
    static final ByteBuf DATA = Unpooled.wrappedBuffer("foobar".getBytes());

    /** Number of entries appended by the tests that write a batch of entries. */
    static final int NUM_ENTRIES = 100;

    /**
     * Appends {@link #NUM_ENTRIES} entries and checks that LastAddPushed follows
     * the writes while LastAddConfirmed stays at {@code -1} as long as no
     * {@code force()} is issued.
     */
    @Test
    public void testAddEntryLastAddConfirmedDoesNotAdvance() throws Exception {
        try (WriteHandle wh = result(newCreateLedgerOp()
                .withEnsembleSize(3)
                .withWriteQuorumSize(3)
                .withAckQuorumSize(2)
                .withPassword(PASSWORD)
                .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                .execute())) {
            for (int i = 0; i < NUM_ENTRIES - 1; i++) {
                result(wh.appendAsync(DATA.retainedDuplicate()));
            }
            long lastEntryID = result(wh.appendAsync(DATA.retainedDuplicate()));
            assertEquals(NUM_ENTRIES - 1, lastEntryID);
            assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed());
            // no force() was issued, so nothing has been confirmed yet
            assertEquals(-1, wh.getLastAddConfirmed());
        }
    }

    /**
     * Appends {@link #NUM_ENTRIES} entries, checks that LastAddConfirmed is still
     * {@code -1}, then issues a {@code force()} and checks that LastAddConfirmed
     * has reached the last written entry.
     */
    @Test
    public void testAddEntryLastAddConfirmedAdvanceWithForce() throws Exception {
        try (WriteHandle wh = result(newCreateLedgerOp()
                .withEnsembleSize(3)
                .withWriteQuorumSize(3)
                .withAckQuorumSize(2)
                .withPassword(PASSWORD)
                .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                .execute())) {
            for (int i = 0; i < NUM_ENTRIES - 1; i++) {
                result(wh.appendAsync(DATA.retainedDuplicate()));
            }
            long lastEntryID = result(wh.appendAsync(DATA.retainedDuplicate()));
            assertEquals(NUM_ENTRIES - 1, lastEntryID);
            assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed());
            assertEquals(-1, wh.getLastAddConfirmed());
            result(wh.force());
            assertEquals(NUM_ENTRIES - 1, wh.getLastAddConfirmed());
        }
    }

    /**
     * Writes entries with explicit, out-of-order ids on a {@link WriteAdvHandle}
     * and checks the value of LastAddConfirmed after each {@code force()}.
     */
    @Test
    public void testForceOnWriteAdvHandle() throws Exception {
        try (WriteAdvHandle wh = result(newCreateLedgerOp()
                .withEnsembleSize(3)
                .withWriteQuorumSize(3)
                .withAckQuorumSize(2)
                .withPassword(PASSWORD)
                .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                .makeAdv()
                .execute())) {
            // entry 1 is deliberately left out for now
            CompletableFuture<Long> w0 = wh.writeAsync(0, DATA.retainedDuplicate());
            CompletableFuture<Long> w2 = wh.writeAsync(2, DATA.retainedDuplicate());
            CompletableFuture<Long> w3 = wh.writeAsync(3, DATA.retainedDuplicate());
            result(w0);
            result(wh.force());
            assertEquals(0, wh.getLastAddConfirmed());

            // fill the gap: once entry 3 is done, entries 1 and 2 must be done as well
            CompletableFuture<Long> w1 = wh.writeAsync(1, DATA.retainedDuplicate());
            result(w3);
            assertTrue(w1.isDone());
            assertTrue(w2.isDone());

            // entry 4 is missing when this force is issued
            CompletableFuture<Long> w5 = wh.writeAsync(5, DATA.retainedDuplicate());
            result(wh.force());
            assertEquals(3, wh.getLastAddConfirmed());

            // fill the second gap: same check as for entries 1 and 2 above
            CompletableFuture<Long> w4 = wh.writeAsync(4, DATA.retainedDuplicate());
            result(w5);
            assertTrue(w4.isDone());
            result(wh.force());
            assertEquals(5, wh.getLastAddConfirmed());
        }
    }

    /**
     * Kills one bookie of a 3-bookie ensemble (write quorum 2, ack quorum 2) and
     * checks that a write still succeeds while {@code force()} fails with
     * {@link BKException.BKBookieException}; once the bookie is started again,
     * {@code force()} must succeed.
     */
    @Test
    public void testForceRequiresFullEnsemble() throws Exception {
        try (WriteHandle wh = result(newCreateLedgerOp()
                .withEnsembleSize(3)
                .withWriteQuorumSize(2)
                .withAckQuorumSize(2)
                .withPassword(PASSWORD)
                .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                .execute())) {
            for (int i = 0; i < NUM_ENTRIES - 1; i++) {
                result(wh.appendAsync(DATA.retainedDuplicate()));
            }
            long lastEntryID = result(wh.appendAsync(DATA.retainedDuplicate()));
            assertEquals(NUM_ENTRIES - 1, lastEntryID);
            assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed());
            assertEquals(-1, wh.getLastAddConfirmed());

            BookieId bookieAddress = wh.getLedgerMetadata().getEnsembleAt(wh.getLastAddPushed()).get(0);
            killBookie(bookieAddress);

            // write should succeed (we still have 2 bookies out of 3)
            result(wh.appendAsync(DATA.retainedDuplicate()));

            // force cannot go, it must be acknowledged by all of the bookies in the ensemble
            try {
                result(wh.force());
                // without this the test would pass even if force() ignored the dead bookie
                fail("force must not succeed while a bookie of the ensemble is down");
            } catch (BKException.BKBookieException expected) {
                // expected: the killed bookie cannot acknowledge the force request
            }

            // bookie comes up again, force must succeed
            startKilledBookie(bookieAddress);
            result(wh.force());
        }
    }

    /**
     * Checks that a {@code force()} only advances LastAddConfirmed up to the last
     * entry written before the force was issued, even if another entry is written
     * while the force is still waiting for a bookie.
     */
    @Test
    public void testForceWillAdvanceLacOnlyUpToLastAcknoledgedWrite() throws Exception {
        try (WriteHandle wh = result(newCreateLedgerOp()
                .withEnsembleSize(3)
                .withWriteQuorumSize(3)
                .withAckQuorumSize(3)
                .withPassword(PASSWORD)
                .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                .execute())) {
            for (int i = 0; i < NUM_ENTRIES - 1; i++) {
                result(wh.appendAsync(DATA.retainedDuplicate()));
            }
            long lastEntryIdBeforeSuspend = result(wh.appendAsync(DATA.retainedDuplicate()));
            assertEquals(NUM_ENTRIES - 1, lastEntryIdBeforeSuspend);
            assertEquals(-1, wh.getLastAddConfirmed());

            // one bookie will stop sending acks for forceLedger
            BookieId bookieAddress = wh.getLedgerMetadata().getEnsembleAt(wh.getLastAddPushed()).get(0);
            suspendBookieForceLedgerAcks(bookieAddress);

            // start a force (it is awaited further down, after the acks are resumed);
            // LastAddConfirmed must not have moved at this point
            CompletableFuture<?> forceResult = wh.force();
            assertEquals(-1, wh.getLastAddConfirmed());

            // send an entry and receive ack
            long lastEntry = wh.append(DATA.retainedDuplicate());

            // receive the ack for forceLedger
            resumeBookieWriteAcks(bookieAddress);
            result(forceResult);

            // now LastAddConfirmed will be equal to the last confirmed entry
            // before force() started
            assertEquals(lastEntryIdBeforeSuspend, wh.getLastAddConfirmed());

            // a new force covers the entry written in the meantime
            result(wh.force());
            assertEquals(lastEntry, wh.getLastAddConfirmed());
        }
    }

    /**
     * Kills the only bookie of a single-bookie ensemble, starts a new bookie and
     * checks that a further write fails with {@link BKException.BKWriteException}
     * instead of moving to the new bookie.
     */
    @Test
    public void testForbiddenEnsembleChange() throws Exception {
        try (WriteHandle wh = result(newCreateLedgerOp()
                .withEnsembleSize(1)
                .withWriteQuorumSize(1)
                .withAckQuorumSize(1)
                .withPassword(PASSWORD)
                .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                .execute())) {
            for (int i = 0; i < NUM_ENTRIES - 1; i++) {
                wh.append(DATA.retainedDuplicate());
            }
            assertEquals(1, availableBookies.size());

            // kill the only bookie in the ensemble
            killBookie(wh.getLedgerMetadata().getEnsembleAt(wh.getLastAddPushed()).get(0));
            assertEquals(0, availableBookies.size());

            // a replacement bookie is available, but it must not be used
            startNewBookie();
            assertEquals(1, availableBookies.size());

            try {
                // we cannot switch to the new bookie with DEFERRED_SYNC
                wh.append(DATA.retainedDuplicate());
                fail("since ensemble change is disable we cannot be able to write any more");
            } catch (BKException.BKWriteException ex) {
                // expected
            }

            LedgerHandle lh = (LedgerHandle) wh;
            assertFalse(lh.hasDelayedWriteFailedBookies());
        }
    }

    /**
     * Closes a freshly created handle and checks that a subsequent {@code force()}
     * fails with {@link BKException.BKLedgerClosedException}.
     */
    @Test(expected = BKException.BKLedgerClosedException.class)
    public void testCannotIssueForceOnClosedLedgerHandle() throws Exception {
        WriteHandle wh = result(newCreateLedgerOp()
                .withEnsembleSize(1)
                .withWriteQuorumSize(1)
                .withAckQuorumSize(1)
                .withPassword(PASSWORD)
                .withWriteFlags(WriteFlag.DEFERRED_SYNC)
                .execute());
        wh.close();
        // the handle is already closed: this call is expected to throw
        result(wh.force());
    }
}