blob: f7fe99a74e4f3e5533cfbbdf7f9038c435d21568 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.test;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests that several threads can concurrently read entries from ledgers
* that were previously written and closed.
*/
public class MultipleThreadReadTest extends BookKeeperClusterTestCase {
/** Logger used by the test and by the writer/reader threads it spawns. */
private static final Logger LOG = LoggerFactory.getLogger(MultipleThreadReadTest.class);
/** Digest type used when creating and opening ledgers; assigned in the constructor. */
BookKeeper.DigestType digestType;
/** Ledger password, encoded explicitly as UTF-8 instead of the platform default charset. */
byte[] ledgerPassword = "aaa".getBytes(UTF_8);
/** Number of entries each writer adds to its ledger and each reader expects to read back. */
private final int entriesPerLedger = 100;
/** Shared flag holder; the worker threads' uncaught-exception handlers set {@code failed} on it. */
final SyncObj mainSyncObj = new SyncObj();
/**
 * Small mutable holder for a completion counter and a failure flag.
 * Code in this file synchronizes on the instance itself when touching its fields.
 */
class SyncObj {
    /** Number of successfully completed operations; starts at zero. */
    volatile int counter = 0;
    /** Set once any operation has failed; starts out false. */
    boolean failed = false;

    public SyncObj() {
        // Initial state is fully described by the field initializers above.
    }
}
/**
 * Client used to open the ledgers handed to the reader threads.
 * Created in {@code setUp()} and closed in {@code tearDown()}.
 */
BookKeeperTestClient readBkc;
/**
 * Configures the test case: passes 6 to the base cluster test case
 * (presumably the number of bookies - confirm against BookKeeperClusterTestCase),
 * sets the add-entry timeout on the base client configuration to 20
 * (units not visible here - TODO confirm) and selects CRC32 digests.
 */
public MultipleThreadReadTest() {
    super(6);
    baseClientConf.setAddEntryTimeout(20);
    digestType = BookKeeper.DigestType.CRC32;
}
/**
 * Runs the base set-up and then creates the dedicated client that
 * the reader threads' ledger handles are opened from.
 */
@Override
public void setUp() throws Exception {
    super.setUp();
    final BookKeeperTestClient readerClient = new BookKeeperTestClient(baseClientConf);
    this.readBkc = readerClient;
}
/**
 * Builds (but does not start) a thread that asynchronously adds {@code entriesPerLedger}
 * entries of the form {@code Entry-<tNo>-<index>} to {@code lh}, waits until every add
 * callback has fired (or one has failed), records the outcome and then closes the handle.
 *
 * @param tNo          writer number embedded in every entry payload
 * @param lh           ledger handle to write to; the returned thread closes it when done
 * @param resultHolder set to {@code true} only if all adds completed with
 *                     {@code BKException.Code.OK} and the wait was not interrupted
 * @return the unstarted writer thread; an uncaught exception in it sets
 *         {@code mainSyncObj.failed}
 */
private Thread getWriterThread(final int tNo, final LedgerHandle lh, final AtomicBoolean resultHolder) {
    Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
            final SyncObj tSync = new SyncObj();
            for (int j = 0; j < entriesPerLedger; j++) {
                // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
                final byte[] entry = ("Entry-" + tNo + "-" + j).getBytes(UTF_8);
                lh.asyncAddEntry(entry, new AsyncCallback.AddCallback() {
                    @Override
                    public void addComplete(int rc, LedgerHandle ledgerHandle, long eid, Object o) {
                        SyncObj syncObj = (SyncObj) o;
                        synchronized (syncObj) {
                            if (rc != BKException.Code.OK) {
                                LOG.error("Add entry {} failed : rc = {}", new String(entry, UTF_8), rc);
                                syncObj.failed = true;
                            } else {
                                syncObj.counter++;
                            }
                            syncObj.notify();
                        }
                    }
                }, tSync);
            }
            synchronized (tSync) {
                while (!tSync.failed && tSync.counter < entriesPerLedger) {
                    try {
                        tSync.wait();
                    } catch (InterruptedException e) {
                        // Restoring the interrupt flag makes the next wait() throw immediately,
                        // so the write must be marked failed here to leave the loop instead of
                        // spinning while holding the monitor the callbacks need.
                        LOG.error("Interrupted while waiting for add callbacks on ledger {} : ",
                                lh.getId(), e);
                        Thread.currentThread().interrupt();
                        tSync.failed = true;
                    }
                }
                resultHolder.set(!tSync.failed);
            }
            // close this handle
            try {
                lh.close();
            } catch (InterruptedException ie) {
                LOG.error("Interrupted on closing ledger handle {} : ", lh.getId(), ie);
                Thread.currentThread().interrupt();
            } catch (BKException bke) {
                LOG.error("Error on closing ledger handle {} : ", lh.getId(), bke);
            }
        }
    }, "WriteThread(Lid=" + lh.getId() + ")");
    t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread thread, Throwable throwable) {
            // Log the cause (as the reader threads do) so a failure can be diagnosed.
            LOG.error("Uncaught exception in thread {} : ", thread.getName(), throwable);
            synchronized (mainSyncObj) {
                mainSyncObj.failed = true;
            }
        }
    });
    return t;
}
/**
 * Builds (but does not start) a thread that reads entries {@code 0 .. entriesPerLedger - 1}
 * from {@code lh} in batches of up to 10 and checks that each batch contains exactly the
 * requested entries, in order, with payload {@code Entry-<ledgerNumber>-<entryId>}.
 *
 * @param tNo          reader number, used only in the thread name
 * @param lh           ledger handle to read from
 * @param ledgerNumber writer number that was embedded in the ledger's entry payloads
 * @param resultHolder updated after every batch; ends up {@code true} only if every batch
 *                     was read and verified successfully
 * @return the unstarted reader thread; an uncaught exception in it sets
 *         {@code mainSyncObj.failed}
 */
private Thread getReaderThread(final int tNo, final LedgerHandle lh, final int ledgerNumber,
                               final AtomicBoolean resultHolder) {
    Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
            long startEntryId = 0;
            long endEntryId;
            long eid = 0;
            while (startEntryId <= entriesPerLedger - 1) {
                endEntryId = Math.min(startEntryId + 10 - 1, entriesPerLedger - 1);
                final long numEntries = (endEntryId - startEntryId) + 1;
                boolean success = true;
                try {
                    Enumeration<LedgerEntry> list = lh.readEntries(startEntryId, endEntryId);
                    for (int j = 0; j < numEntries; j++) {
                        LedgerEntry e;
                        try {
                            e = list.nextElement();
                        } catch (NoSuchElementException exception) {
                            // A short read is a test failure; log it so it is not silent.
                            LOG.error("Found fewer entries than requested on reading ({}-{}) from ledger {}.",
                                    startEntryId, endEntryId, lh.getId());
                            success = false;
                            break;
                        }
                        long curEid = eid++;
                        if (e.getEntryId() != curEid) {
                            LOG.error("Expected entry id {} for ledger {} but {} found.",
                                    curEid, lh.getId(), e.getEntryId());
                            success = false;
                            break;
                        }
                        byte[] data = e.getEntry();
                        // Compare using UTF-8 explicitly rather than the platform default charset.
                        byte[] expected = ("Entry-" + ledgerNumber + "-" + e.getEntryId()).getBytes(UTF_8);
                        if (!Arrays.equals(expected, data)) {
                            LOG.error("Expected entry data 'Entry-{}-{}' but {} found for ledger {}.",
                                    ledgerNumber, e.getEntryId(), new String(data, UTF_8), lh.getId());
                            success = false;
                            break;
                        }
                    }
                    if (success) {
                        // The batch must not contain anything beyond the requested range.
                        success = !list.hasMoreElements();
                        if (!success) {
                            LOG.error("Found more entries returned on reading ({}-{}) from ledger {}.",
                                    startEntryId, endEntryId, lh.getId());
                        }
                    }
                } catch (InterruptedException ie) {
                    LOG.error("Interrupted on reading entries ({} - {}) from ledger {} : ",
                            startEntryId, endEntryId, lh.getId(), ie);
                    Thread.currentThread().interrupt();
                    success = false;
                } catch (BKException bke) {
                    LOG.error("Failed on reading entries ({} - {}) from ledger {} : ",
                            startEntryId, endEntryId, lh.getId(), bke);
                    success = false;
                }
                resultHolder.set(success);
                if (!success) {
                    break;
                }
                startEntryId = endEntryId + 1;
            }
        }
    }, "ReadThread(Tid =" + tNo + ", Lid=" + lh.getId() + ")");
    t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread thread, Throwable throwable) {
            LOG.error("Uncaught exception in thread {} : ", thread.getName(), throwable);
            synchronized (mainSyncObj) {
                mainSyncObj.failed = true;
            }
        }
    });
    return t;
}
/**
 * Writes {@code entriesPerLedger} entries to each of {@code numLedgers} new ledgers and then
 * reads them back with {@code numThreads} concurrent reader threads.
 *
 * <p>One writer thread is started per ledger and all writers are joined before any reader
 * starts. Reader thread {@code i} reads ledger {@code i % numLedgers}, so ledger {@code L} is
 * read by threads {@code L, L + numLedgers, L + 2 * numLedgers, ...}; every reader gets its
 * own handle opened through the shared {@code readBkc} client.
 *
 * @param numLedgers number of ledgers to create; must be positive
 * @param numThreads number of reader threads; must be a positive multiple of {@code numLedgers}
 * @throws java.io.IOException declared for callers; BookKeeper errors and interruptions seen
 *         here are reported through {@code fail(...)} instead
 */
public void multiLedgerMultiThreadRead(final int numLedgers,
                                       final int numThreads) throws IOException {
    assertTrue("numThreads must be a positive multiple of numLedgers",
            numLedgers > 0 && numThreads >= numLedgers && numThreads % numLedgers == 0);
    try {
        final List<LedgerHandle> oldLedgerHandles = new ArrayList<LedgerHandle>();
        final List<Long> ledgerIds = new ArrayList<Long>();
        List<Thread> threadList = new ArrayList<Thread>();
        List<AtomicBoolean> writeResults = new ArrayList<AtomicBoolean>();
        // Start write threads.
        // Only one thread writes to a ledger, so just use numLedgers instead.
        for (int i = 0; i < numLedgers; i++) {
            LedgerHandle lh = bkc.createLedger(digestType, ledgerPassword);
            oldLedgerHandles.add(lh);
            ledgerIds.add(lh.getId());
            AtomicBoolean writeResult = new AtomicBoolean(false);
            writeResults.add(writeResult);
            Thread t = getWriterThread(i, lh, writeResult);
            threadList.add(t);
            t.start();
        }
        // Wait for the threads to complete
        for (Thread t : threadList) {
            t.join();
        }
        synchronized (mainSyncObj) {
            if (mainSyncObj.failed) {
                fail("Test failed because we encountered uncaught exception on adding entries.");
            }
        }
        for (int i = 0; i < numLedgers; i++) {
            assertTrue("Failed on adding entries for ledger " + oldLedgerHandles.get(i).getId(),
                    writeResults.get(i).get());
        }
        // Close the ledger handles.
        for (LedgerHandle lh : oldLedgerHandles) {
            try {
                lh.close();
            } catch (BKException.BKLedgerClosedException ignored) {
                // The writer thread already closed this handle, so "already closed" is harmless.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.error("Interrupted while closing ledger handle {} : ", lh.getId(), e);
                fail("Error while closing handle.");
            } catch (Exception e) {
                LOG.error("Error while closing ledger handle {} : ", lh.getId(), e);
                fail("Error while closing handle.");
            }
        }
        // Now try to read.
        synchronized (mainSyncObj) {
            mainSyncObj.failed = false;
        }
        threadList.clear();
        List<AtomicBoolean> readResults = new ArrayList<AtomicBoolean>();
        final List<LedgerHandle> readHandles = new ArrayList<LedgerHandle>();
        for (int i = 0; i < numThreads; i++) {
            final int ledgerNumber = i % numLedgers;
            LedgerHandle readHandle = readBkc.openLedger(ledgerIds.get(ledgerNumber),
                    digestType, ledgerPassword);
            readHandles.add(readHandle);
            AtomicBoolean readResult = new AtomicBoolean(false);
            readResults.add(readResult);
            Thread t = getReaderThread(i, readHandle, ledgerNumber, readResult);
            threadList.add(t);
            t.start();
        }
        // Wait for the threads to complete.
        for (Thread t : threadList) {
            t.join();
        }
        // Release the read handles; a failure here is logged but does not change the verdict.
        for (LedgerHandle readHandle : readHandles) {
            try {
                readHandle.close();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted while closing read handle for ledger {} : ", readHandle.getId(), ie);
            } catch (BKException bke) {
                LOG.warn("Error while closing read handle for ledger {} : ", readHandle.getId(), bke);
            }
        }
        synchronized (mainSyncObj) {
            if (mainSyncObj.failed) {
                fail("Test failed because we encountered uncaught exception on reading entries");
            }
        }
        for (AtomicBoolean readResult : readResults) {
            assertTrue("Failed on read entries", readResult.get());
        }
    } catch (BKException e) {
        LOG.error("Test failed", e);
        fail("Test failed due to BookKeeper exception");
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        LOG.error("Test failed", e);
        fail("Test failed due to interruption");
    }
}
/** Reads 10 ledgers with 20 reader threads, i.e. two readers per ledger. */
@Test
public void test10Ledgers20ThreadsRead() throws IOException {
    final int ledgers = 10;
    final int readers = 20;
    multiLedgerMultiThreadRead(ledgers, readers);
}
/** Reads 10 ledgers with 200 reader threads, i.e. twenty readers per ledger. */
@Test
public void test10Ledgers200ThreadsRead() throws IOException {
    final int ledgers = 10;
    final int readers = 200;
    multiLedgerMultiThreadRead(ledgers, readers);
}
/** Reads a single ledger with 20 reader threads, all sharing that one ledger. */
@Test
public void test1Ledger20ThreadsRead() throws IOException {
    final int ledgers = 1;
    final int readers = 20;
    multiLedgerMultiThreadRead(ledgers, readers);
}
/**
 * Closes the reader client and then runs the base tear-down.
 *
 * <p>The base tear-down runs in a {@code finally} block so that it is not skipped when
 * closing the reader client throws, and the client is null-checked because {@code setUp()}
 * may have failed before it was created.
 */
@Override
public void tearDown() throws Exception {
    try {
        if (readBkc != null) {
            readBkc.close();
        }
    } finally {
        super.tearDown();
    }
}
}