blob: f60161d243493c2b7b5a6e0b5b1bdeed3f27444e [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.SortedLedgerStorage;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test reading the last confirmed and entry.
*/
@RunWith(Parameterized.class)
public class TestReadLastConfirmedAndEntry extends BookKeeperClusterTestCase {
private static final Logger logger = LoggerFactory.getLogger(TestReadLastConfirmedAndEntry.class);
final BookKeeper.DigestType digestType;
public TestReadLastConfirmedAndEntry(Class<? extends LedgerStorage> storageClass) {
super(3);
this.digestType = BookKeeper.DigestType.CRC32;
this.baseConf.setAllowEphemeralPorts(false);
this.baseConf.setLedgerStorageClass(storageClass.getName());
}
@Parameters
public static Collection<Object[]> configs() {
return Arrays.asList(new Object[][] {
{ InterleavedLedgerStorage.class },
{ SortedLedgerStorage.class },
{ DbLedgerStorage.class },
});
}
static class FakeBookie extends BookieImpl {
final long expectedEntryToFail;
final boolean stallOrRespondNull;
public FakeBookie(ServerConfiguration conf, long expectedEntryToFail, boolean stallOrRespondNull)
throws InterruptedException, BookieException, KeeperException, IOException {
super(conf);
this.expectedEntryToFail = expectedEntryToFail;
this.stallOrRespondNull = stallOrRespondNull;
}
@Override
public ByteBuf readEntry(long ledgerId, long entryId)
throws IOException, NoLedgerException {
if (entryId == expectedEntryToFail) {
if (stallOrRespondNull) {
try {
Thread.sleep(600000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// ignore
}
} else {
throw new NoEntryException(ledgerId, entryId);
}
}
return super.readEntry(ledgerId, entryId);
}
}
@Test
public void testAdvancedLacWithEmptyResponse() throws Exception {
byte[] passwd = "advanced-lac-with-empty-response".getBytes(UTF_8);
ClientConfiguration newConf = new ClientConfiguration();
newConf.addConfiguration(baseClientConf);
newConf.setAddEntryTimeout(9999999);
newConf.setReadEntryTimeout(9999999);
// stop existing bookies
stopAllBookies();
// add fake bookies
long expectedEntryIdToFail = 2;
for (int i = 0; i < numBookies; i++) {
ServerConfiguration conf = newServerConfiguration();
Bookie b = new FakeBookie(conf, expectedEntryIdToFail, i != 0);
bs.add(startBookie(conf, b));
bsConfs.add(conf);
}
// create bookkeeper
BookKeeper newBk = new BookKeeper(newConf);
// create ledger to write some data
LedgerHandle lh = newBk.createLedger(3, 3, 2, digestType, passwd);
for (int i = 0; i <= expectedEntryIdToFail; i++) {
lh.addEntry("test".getBytes(UTF_8));
}
// open ledger to tail reading
LedgerHandle newLh = newBk.openLedgerNoRecovery(lh.getId(), digestType, passwd);
long lac = newLh.readLastConfirmed();
assertEquals(expectedEntryIdToFail - 1, lac);
Enumeration<LedgerEntry> entries = newLh.readEntries(0, lac);
int numReads = 0;
long expectedEntryId = 0L;
while (entries.hasMoreElements()) {
LedgerEntry entry = entries.nextElement();
assertEquals(expectedEntryId++, entry.getEntryId());
++numReads;
}
assertEquals(lac + 1, numReads);
final AtomicInteger rcHolder = new AtomicInteger(-12345);
final AtomicLong lacHolder = new AtomicLong(lac);
final AtomicReference<LedgerEntry> entryHolder = new AtomicReference<LedgerEntry>(null);
final CountDownLatch latch = new CountDownLatch(1);
newLh.asyncReadLastConfirmedAndEntry(newLh.getLastAddConfirmed() + 1, 99999, false,
new AsyncCallback.ReadLastConfirmedAndEntryCallback() {
@Override
public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) {
rcHolder.set(rc);
lacHolder.set(lastConfirmed);
entryHolder.set(entry);
latch.countDown();
}
}, null);
lh.addEntry("another test".getBytes(UTF_8));
latch.await();
assertEquals(expectedEntryIdToFail, lacHolder.get());
assertNull(entryHolder.get());
assertEquals(BKException.Code.OK, rcHolder.get());
}
static class SlowReadLacBookie extends BookieImpl {
private final long lacToSlowRead;
private final CountDownLatch readLatch;
public SlowReadLacBookie(ServerConfiguration conf,
long lacToSlowRead, CountDownLatch readLatch)
throws IOException, KeeperException, InterruptedException, BookieException {
super(conf);
this.lacToSlowRead = lacToSlowRead;
this.readLatch = readLatch;
}
@Override
public long readLastAddConfirmed(long ledgerId) throws IOException {
long lac = super.readLastAddConfirmed(ledgerId);
logger.info("Last Add Confirmed for ledger {} is {}", ledgerId, lac);
if (lacToSlowRead == lac) {
logger.info("Suspend returning lac {} for ledger {}", lac, ledgerId);
try {
readLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// no-op
}
}
return super.readLastAddConfirmed(ledgerId);
}
}
static class ReadLastConfirmedAndEntryResult implements AsyncCallback.ReadLastConfirmedAndEntryCallback {
int rc = -1234;
long lac = -1234L;
LedgerEntry entry = null;
final CountDownLatch doneLatch = new CountDownLatch(1);
@Override
public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) {
this.rc = rc;
this.lac = lastConfirmed;
this.entry = entry;
doneLatch.countDown();
}
void await() throws InterruptedException {
doneLatch.await();
}
}
@Test
public void testRaceOnLastAddConfirmed() throws Exception {
byte[] passwd = "race-on-last-add-confirmed".getBytes(UTF_8);
ClientConfiguration newConf = new ClientConfiguration();
newConf.addConfiguration(baseClientConf);
newConf.setAddEntryTimeout(9999999);
newConf.setReadEntryTimeout(9999999);
final long lacToSlowRead = 0L;
final CountDownLatch readLatch = new CountDownLatch(1);
// stop first bookie
ServerConfiguration bsConf = killBookie(0);
// start it with a slow bookie
Bookie b = new SlowReadLacBookie(bsConf, lacToSlowRead, readLatch);
bs.add(startBookie(bsConf, b));
bsConfs.add(bsConf);
// create bookkeeper
BookKeeper newBk = new BookKeeper(newConf);
// create ledger
LedgerHandle lh = newBk.createLedger(3, 3, 3, digestType, passwd);
// 0) write entry 0
lh.addEntry("entry-0".getBytes(UTF_8));
// open ledger to read
LedgerHandle readLh = newBk.openLedgerNoRecovery(lh.getId(), digestType, passwd);
// 1) wait entry 0 to be committed
ReadLastConfirmedAndEntryResult readResult = new ReadLastConfirmedAndEntryResult();
readLh.asyncReadLastConfirmedAndEntry(0L, 9999999, true, readResult, null);
// 2) write entry 1 to commit entry 0 => lac = 0
lh.addEntry("entry-1".getBytes(UTF_8));
readResult.await();
assertEquals(BKException.Code.OK, readResult.rc);
assertEquals(0L, readResult.lac);
assertEquals(0L, readResult.entry.getEntryId());
assertEquals("entry-0", new String(readResult.entry.getEntry(), UTF_8));
// 3) write entry 2 to commit entry 1 => lac = 1
lh.addEntry("entry-2".getBytes(UTF_8));
// 4) count down read latch to trigger previous readLacAndEntry request
readLatch.countDown();
// 5) due to piggyback, the lac is updated to lac = 1
while (readLh.getLastAddConfirmed() < 1L) {
Thread.sleep(100);
}
// 6) write entry 3 to commit entry 2 => lac = 2
lh.addEntry("entry-3".getBytes(UTF_8));
// 7) readLastConfirmedAndEntry for next entry (we are expecting to read entry 1)
readResult = new ReadLastConfirmedAndEntryResult();
readLh.asyncReadLastConfirmedAndEntry(1L, 9999999, true, readResult, null);
readResult.await();
assertEquals(BKException.Code.OK, readResult.rc);
assertEquals(2L, readResult.lac);
assertEquals(1L, readResult.entry.getEntryId());
assertEquals("entry-1", new String(readResult.entry.getEntry(), UTF_8));
lh.close();
readLh.close();
newBk.close();
}
}