blob: e4801cf2fb49e485e6332809d0a22a3634f4282a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Unit tests for parallel reading.
 *
 * <p>Each test writes a small ledger whose entries carry their own index as a decimal string,
 * then reads it back through {@link PendingReadOp} with {@code parallelRead(true)} and checks
 * either the returned payloads or the failure code.
 */
public class TestParallelRead extends BookKeeperClusterTestCase {

    private static final Logger LOG = LoggerFactory.getLogger(TestParallelRead.class);

    final DigestType digestType;
    // Explicit charset so the password bytes do not depend on the platform default.
    final byte[] passwd = "parallel-read".getBytes(StandardCharsets.UTF_8);

    /**
     * Configures the cluster test base class with the value 6 (presumably the number of
     * bookies to start - confirm against {@code BookKeeperClusterTestCase}) and selects CRC32
     * as the digest type for every ledger used by these tests.
     */
    public TestParallelRead() {
        super(6);
        this.digestType = DigestType.CRC32;
    }

    /**
     * Creates a ledger, writes {@code numEntries} entries to it and closes it.
     *
     * <p>Entry {@code i} contains the decimal string representation of {@code i}.
     *
     * @param ensemble ensemble size passed to {@code createLedger}
     * @param writeQuorum write quorum size passed to {@code createLedger}
     * @param ackQuorum ack quorum size passed to {@code createLedger}
     * @param numEntries number of entries to write
     * @return the id of the closed ledger
     * @throws Exception if creating, writing to or closing the ledger fails
     */
    long getLedgerToRead(int ensemble, int writeQuorum, int ackQuorum, int numEntries)
            throws Exception {
        // try-with-resources closes the handle even when an add fails part way through.
        try (LedgerHandle lh = bkc.createLedger(ensemble, writeQuorum, ackQuorum, digestType, passwd)) {
            for (int i = 0; i < numEntries; i++) {
                lh.addEntry(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
            }
            return lh.getId();
        }
    }

    /**
     * Builds a non-recovery read op for entries {@code from..to} (both inclusive as passed to
     * {@link PendingReadOp}) using the shared test client's context.
     */
    PendingReadOp createReadOp(LedgerHandle lh, long from, long to) {
        return new PendingReadOp(lh, bkc.getClientCtx(), from, to, false);
    }

    /**
     * Builds a recovery read op (last constructor flag set to {@code true}) for entries
     * {@code from..to} using the shared test client's context.
     */
    PendingReadOp createRecoveryReadOp(LedgerHandle lh, long from, long to) {
        return new PendingReadOp(lh, bkc.getClientCtx(), from, to, true);
    }

    /**
     * Creates an additional client against the test cluster's metadata service with the read
     * entry timeout set to 30000 (unit as defined by {@code ClientConfiguration}).
     *
     * <p>NOTE(review): the tests below only create and close this client; ledgers are opened
     * and read through {@code bkc}, so this configuration does not appear to influence the
     * reads. Kept to preserve the original test behavior - confirm whether the reads were
     * meant to go through this client instead.
     */
    private BookKeeper createSecondaryClient() throws Exception {
        ClientConfiguration newConf = new ClientConfiguration()
                .setReadEntryTimeout(30000);
        newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
        return new BookKeeper(newConf);
    }

    /**
     * Decodes the payload of {@code entry} as a decimal integer.
     */
    private static int entryValue(LedgerEntry entry) {
        return Integer.parseInt(new String(entry.getEntryBytes(), StandardCharsets.UTF_8));
    }

    /**
     * Drains {@code entries}, asserting that the k-th entry returned (counting from 0) carries
     * the payload {@code k}, and closing every entry after it has been checked.
     *
     * @param entries iterator over the entries returned by a read
     * @return the number of entries consumed
     * @throws Exception if closing an entry fails
     */
    private static int verifySequentialEntries(Iterator<LedgerEntry> entries) throws Exception {
        int numReads = 0;
        while (entries.hasNext()) {
            try (LedgerEntry entry = entries.next()) {
                assertNotNull(entry);
                assertEquals(numReads, entryValue(entry));
            }
            ++numReads;
        }
        return numReads;
    }

    /**
     * Reads every entry individually and then all entries in one op, with all bookies up,
     * and verifies the payloads.
     */
    @Test
    public void testNormalParallelRead() throws Exception {
        int numEntries = 10;
        long id = getLedgerToRead(5, 2, 2, numEntries);

        try (LedgerHandle lh = bkc.openLedger(id, digestType, passwd)) {
            // read single entry
            for (int i = 0; i < numEntries; i++) {
                PendingReadOp readOp = createReadOp(lh, i, i);
                readOp.parallelRead(true).submit();
                Iterator<LedgerEntry> entries = readOp.future().get().iterator();
                assertTrue(entries.hasNext());
                try (LedgerEntry entry = entries.next()) {
                    assertNotNull(entry);
                    assertEquals(i, entryValue(entry));
                }
                assertFalse(entries.hasNext());
            }

            // read multiple entries
            PendingReadOp readOp = createReadOp(lh, 0, numEntries - 1);
            readOp.parallelRead(true).submit();
            int numReads = verifySequentialEntries(readOp.future().get().iterator());
            assertEquals(numEntries, numReads);
        }
    }

    /**
     * Waits for {@code future} and asserts that it fails with a {@link BKException} carrying
     * {@code expectedRc}.
     *
     * @param future the future expected to complete exceptionally
     * @param expectedRc the expected {@link BKException} code
     */
    private static <T> void expectFail(CompletableFuture<T> future, int expectedRc) {
        try {
            result(future);
        } catch (BKException bke) {
            assertEquals("Unexpected failure code", expectedRc, bke.getCode());
            return;
        } catch (Exception e) {
            // Keep the real cause attached so an unexpected failure type is diagnosable.
            throw new AssertionError(
                    "Expected a BKException with code " + expectedRc + " but got " + e, e);
        }
        fail("Expect to fail");
    }

    /**
     * Reading a range that includes entry ids beyond the 10 written entries must fail with
     * {@code NoSuchEntryException}, both for a single-entry and a multi-entry read.
     */
    @Test
    public void testParallelReadMissingEntries() throws Exception {
        int numEntries = 10;
        long id = getLedgerToRead(5, 2, 2, numEntries);

        try (LedgerHandle lh = bkc.openLedger(id, digestType, passwd)) {
            // read single entry
            PendingReadOp readOp = createReadOp(lh, 11, 11);
            readOp.parallelRead(true).submit();
            expectFail(readOp.future(), Code.NoSuchEntryException);

            // read multiple entries
            readOp = createReadOp(lh, 8, 11);
            readOp.parallelRead(true).submit();
            expectFail(readOp.future(), Code.NoSuchEntryException);
        }
    }

    /**
     * A parallel recovery read of an entry that was never written must fail with
     * {@code NoSuchEntryException} while two bookies of the ensemble are suspended.
     */
    @Test
    public void testFailParallelRecoveryReadMissingEntryImmediately() throws Exception {
        int numEntries = 1;
        long id = getLedgerToRead(5, 5, 3, numEntries);

        BookKeeper newBk = createSecondaryClient();
        try (LedgerHandle lh = bkc.openLedger(id, digestType, passwd)) {
            List<BookieId> ensemble = lh.getLedgerMetadata().getEnsembleAt(10);
            CountDownLatch latch1 = new CountDownLatch(1);
            CountDownLatch latch2 = new CountDownLatch(1);
            try {
                // sleep two bookies
                sleepBookie(ensemble.get(0), latch1);
                sleepBookie(ensemble.get(1), latch2);

                PendingReadOp readOp = createRecoveryReadOp(lh, 10, 10);
                readOp.parallelRead(true).submit();
                // would fail immediately if found missing entries don't cover ack quorum
                expectFail(readOp.future(), Code.NoSuchEntryException);
            } finally {
                // Always release the latches handed to sleepBookie, even if an assertion fails.
                latch1.countDown();
                latch2.countDown();
            }
        } finally {
            newBk.close();
        }
    }

    /**
     * With two bookies of the ensemble killed (write quorum 3), a parallel read of all
     * entries must still return every entry with the expected payload.
     */
    @Test
    public void testParallelReadWithFailedBookies() throws Exception {
        int numEntries = 10;
        long id = getLedgerToRead(5, 3, 3, numEntries);

        BookKeeper newBk = createSecondaryClient();
        try (LedgerHandle lh = bkc.openLedger(id, digestType, passwd)) {
            List<BookieId> ensemble = lh.getLedgerMetadata().getEnsembleAt(5);
            // kill two bookies
            killBookie(ensemble.get(0));
            killBookie(ensemble.get(1));

            // read multiple entries
            PendingReadOp readOp = createReadOp(lh, 0, numEntries - 1);
            readOp.parallelRead(true).submit();
            // Entries are closed as they are verified, matching testNormalParallelRead.
            int numReads = verifySequentialEntries(readOp.future().get().iterator());
            assertEquals(numEntries, numReads);
        } finally {
            newBk.close();
        }
    }

    /**
     * With three bookies of the ensemble killed (write quorum 3), a parallel read of all
     * entries must fail with {@code BookieHandleNotAvailableException}.
     */
    @Test
    public void testParallelReadFailureWithFailedBookies() throws Exception {
        int numEntries = 10;
        long id = getLedgerToRead(5, 3, 3, numEntries);

        BookKeeper newBk = createSecondaryClient();
        try (LedgerHandle lh = bkc.openLedger(id, digestType, passwd)) {
            List<BookieId> ensemble = lh.getLedgerMetadata().getEnsembleAt(5);
            // kill three bookies; presumably this leaves at least one entry without a live
            // replica, since the write quorum is 3
            killBookie(ensemble.get(0));
            killBookie(ensemble.get(1));
            killBookie(ensemble.get(2));

            // read multiple entries
            PendingReadOp readOp = createReadOp(lh, 0, numEntries - 1);
            readOp.parallelRead(true).submit();
            expectFail(readOp.future(), Code.BookieHandleNotAvailableException);
        } finally {
            newBk.close();
        }
    }
}