blob: 7bd03d96fc456eddb7395755376977c75b2e5627 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.replication;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieAccessor;
import org.apache.bookkeeper.bookie.LedgerCacheImpl;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
 * This test verifies that the periodic check run by the auditor
 * will pick up on missing or corrupted ledger data in the cluster
 */
public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
    private final static Logger LOG = LoggerFactory
        .getLogger(AuditorPeriodicCheckTest.class);

    // one auditor elector per bookie, keyed by the bookie's address string
    private HashMap<String, AuditorElector> auditorElectors = new HashMap<String, AuditorElector>();
    // ZooKeeper handles created for the electors; closed in tearDown
    private List<ZooKeeper> zkClients = new LinkedList<ZooKeeper>();

    private final static int CHECK_INTERVAL = 1; // run every second

    public AuditorPeriodicCheckTest() {
        super(3); // 3-bookie cluster
        baseConf.setPageLimit(1); // to make it easy to push ledger out of cache
    }

    /**
     * Starts an auditor elector for every bookie in the cluster, each with
     * its own ZooKeeper connection and a periodic check interval of
     * {@link #CHECK_INTERVAL} seconds.
     */
    @Before
    @Override
    public void setUp() throws Exception {
        super.setUp();

        for (int i = 0; i < numBookies; i++) {
            ServerConfiguration conf = new ServerConfiguration(bsConfs.get(i));
            conf.setAuditorPeriodicCheckInterval(CHECK_INTERVAL);

            String addr = StringUtils.addrToString(bs.get(i).getLocalAddress());

            ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000);
            ZooKeeper zk = ZkUtils.createConnectedZookeeperClient(
                    zkUtil.getZooKeeperConnectString(), w);
            zkClients.add(zk);

            AuditorElector auditorElector = new AuditorElector(addr,
                                                               conf, zk);
            auditorElectors.put(addr, auditorElector);
            auditorElector.start();
            LOG.debug("Starting Auditor Elector");
        }
    }

    /**
     * Shuts down the electors first, then closes the ZooKeeper handles
     * they were using, before tearing down the cluster.
     */
    @After
    @Override
    public void tearDown() throws Exception {
        for (AuditorElector e : auditorElectors.values()) {
            e.shutdown();
        }
        for (ZooKeeper zk : zkClients) {
            zk.close();
        }
        zkClients.clear();
        super.tearDown();
    }

    /**
     * test that the periodic checking will detect corruptions in
     * the bookie entry log
     */
    @Test(timeout=30000)
    public void testEntryLogCorruption() throws Exception {
        LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(bsConfs.get(0), zkc);
        LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
        // keep the auditor from marking anything while we write and corrupt
        underReplicationManager.disableLedgerReplication();

        LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
        long ledgerId = lh.getId();
        for (int i = 0; i < 100; i++) {
            lh.addEntry("testdata".getBytes());
        }
        lh.close();

        // flush so the entries hit the entry log files on disk
        BookieAccessor.forceFlush(bs.get(0).getBookie());

        File ledgerDir = bsConfs.get(0).getLedgerDirs()[0];
        ledgerDir = Bookie.getCurrentDirectory(ledgerDir);
        // corrupt of entryLogs: overwrite every .log file with garbage
        File[] entryLogs = ledgerDir.listFiles(new FilenameFilter() {
                public boolean accept(File dir, String name) {
                    return name.endsWith(".log");
                }
            });
        ByteBuffer junk = ByteBuffer.allocate(1024*1024);
        for (File f : entryLogs) {
            FileOutputStream out = new FileOutputStream(f);
            out.getChannel().write(junk);
            out.close();
        }
        restartBookies(); // restart to clear read buffers

        underReplicationManager.enableLedgerReplication();
        // poll until the periodic check flags the corrupted ledger
        long underReplicatedLedger = -1;
        for (int i = 0; i < 10; i++) {
            underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate();
            if (underReplicatedLedger != -1) {
                break;
            }
            Thread.sleep(CHECK_INTERVAL * 1000);
        }
        assertEquals("Ledger should be under replicated", ledgerId, underReplicatedLedger);
        underReplicationManager.close();
    }

    /**
     * test that the period checker will detect corruptions in
     * the bookie index files
     */
    @Test(timeout=30000)
    public void testIndexCorruption() throws Exception {
        LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(bsConfs.get(0), zkc);
        LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();

        LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
        long ledgerToCorrupt = lh.getId();
        for (int i = 0; i < 100; i++) {
            lh.addEntry("testdata".getBytes());
        }
        lh.close();

        // push ledgerToCorrupt out of page cache (bookie is configured to only use 1 page)
        lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
        for (int i = 0; i < 100; i++) {
            lh.addEntry("testdata".getBytes());
        }
        lh.close();

        // flush so the index file for ledgerToCorrupt is written out
        BookieAccessor.forceFlush(bs.get(0).getBookie());

        File ledgerDir = bsConfs.get(0).getLedgerDirs()[0];
        ledgerDir = Bookie.getCurrentDirectory(ledgerDir);

        // corrupt the index file for the first ledger
        File index = new File(ledgerDir, LedgerCacheImpl.getLedgerName(ledgerToCorrupt));
        LOG.info("file to corrupt {}", index);
        ByteBuffer junk = ByteBuffer.allocate(1024*1024);
        FileOutputStream out = new FileOutputStream(index);
        out.getChannel().write(junk);
        out.close();

        // poll until the periodic check flags the corrupted ledger
        long underReplicatedLedger = -1;
        for (int i = 0; i < 10; i++) {
            underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate();
            if (underReplicatedLedger != -1) {
                break;
            }
            Thread.sleep(CHECK_INTERVAL * 1000);
        }
        assertEquals("Ledger should be under replicated", ledgerToCorrupt, underReplicatedLedger);
        underReplicationManager.close();
    }

    /**
     * Test that the period checker will not run when auto replication has been disabled
     */
    @Test(timeout=60000)
    public void testPeriodicCheckWhenDisabled() throws Exception {
        LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(bsConfs.get(0), zkc);
        final LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
        final int numLedgers = 100;

        for (int i = 0; i < numLedgers; i++) {
            LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());

            for (int j = 0; j < 100; j++) {
                lh.addEntry("testdata".getBytes());
            }
            lh.close();
        }

        underReplicationManager.disableLedgerReplication();

        // replace bookie 0 with one that counts reads and always fails them;
        // while replication is disabled the checker should never read from it
        final AtomicInteger numReads = new AtomicInteger(0);
        ServerConfiguration conf = killBookie(0);

        Bookie deadBookie = new Bookie(conf) {
            @Override
            public ByteBuffer readEntry(long ledgerId, long entryId)
                    throws IOException, NoLedgerException {
                // we want to disable during checking
                numReads.incrementAndGet();
                throw new IOException("Fake I/O exception");
            }
        };
        bsConfs.add(conf);
        bs.add(startBookie(conf, deadBookie));

        Thread.sleep(CHECK_INTERVAL * 2000);
        assertEquals("Nothing should have tried to read", 0, numReads.get());
        underReplicationManager.enableLedgerReplication();
        Thread.sleep(CHECK_INTERVAL * 2000); // give it time to run

        underReplicationManager.disableLedgerReplication();
        // give it time to stop, from this point nothing new should be marked
        Thread.sleep(CHECK_INTERVAL * 2000);

        // drain everything that was marked while replication was enabled
        int numUnderreplicated = 0;
        long underReplicatedLedger = -1;
        do {
            underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate();
            if (underReplicatedLedger == -1) {
                break;
            }
            numUnderreplicated++;

            underReplicationManager.markLedgerReplicated(underReplicatedLedger);
        } while (underReplicatedLedger != -1);

        Thread.sleep(CHECK_INTERVAL * 2000); // give a chance to run again (it shouldn't, it's disabled)

        // ensure that nothing is marked as underreplicated
        underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate();
        assertEquals("There should be no underreplicated ledgers", -1, underReplicatedLedger);

        // BUGFIX: second argument was numUnderreplicated, so the log always
        // printed "N of N"; it should report the total ledger count
        LOG.info("{} of {} ledgers underreplicated", numUnderreplicated, numLedgers);
        assertTrue("All should be underreplicated",
                numUnderreplicated <= numLedgers && numUnderreplicated > 0);
    }

    /**
     * Test that the period check will succeed if a ledger is deleted midway
     */
    @Test(timeout=60000)
    public void testPeriodicCheckWhenLedgerDeleted() throws Exception {
        // stop the elected auditors so we can drive an Auditor directly
        for (AuditorElector e : auditorElectors.values()) {
            e.shutdown();
        }

        final int numLedgers = 100;
        List<Long> ids = new LinkedList<Long>();
        for (int i = 0; i < numLedgers; i++) {
            LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
            ids.add(lh.getId());
            for (int j = 0; j < 10; j++) {
                lh.addEntry("testdata".getBytes());
            }
            lh.close();
        }

        final Auditor auditor = new Auditor(
                StringUtils.addrToString(Bookie.getBookieAddress(bsConfs.get(0))),
                bsConfs.get(0), zkc);
        final AtomicBoolean exceptionCaught = new AtomicBoolean(false);
        final CountDownLatch latch = new CountDownLatch(1);
        // run full checks in a loop while the main thread deletes the ledgers,
        // so deletions race with the check; no exception should escape
        Thread t = new Thread() {
                public void run() {
                    try {
                        latch.countDown();
                        for (int i = 0; i < numLedgers; i++) {
                            auditor.checkAllLedgers();
                        }
                    } catch (Exception e) {
                        LOG.error("Caught exception while checking all ledgers", e);
                        exceptionCaught.set(true);
                    }
                }
            };
        t.start();
        latch.await();
        for (Long id : ids) {
            bkc.deleteLedger(id);
        }
        t.join();
        assertFalse("Shouldn't have thrown exception", exceptionCaught.get());
    }
}