| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.bookkeeper.replication; |
| |
| import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertTrue; |
| |
| import io.netty.buffer.ByteBuf; |
| |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.FilenameFilter; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.net.UnknownHostException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.apache.bookkeeper.bookie.Bookie; |
| import org.apache.bookkeeper.bookie.BookieAccessor; |
| import org.apache.bookkeeper.bookie.BookieException; |
| import org.apache.bookkeeper.bookie.BookieImpl; |
| import org.apache.bookkeeper.bookie.IndexPersistenceMgr; |
| import org.apache.bookkeeper.client.AsyncCallback.AddCallback; |
| import org.apache.bookkeeper.client.BKException; |
| import org.apache.bookkeeper.client.BookKeeper; |
| import org.apache.bookkeeper.client.BookKeeper.DigestType; |
| import org.apache.bookkeeper.client.BookKeeperAdmin; |
| import org.apache.bookkeeper.client.LedgerHandle; |
| import org.apache.bookkeeper.conf.ServerConfiguration; |
| import org.apache.bookkeeper.meta.LedgerManagerFactory; |
| import org.apache.bookkeeper.meta.LedgerUnderreplicationManager; |
| import org.apache.bookkeeper.meta.MetadataBookieDriver; |
| import org.apache.bookkeeper.meta.MetadataDrivers; |
| import org.apache.bookkeeper.net.BookieId; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; |
| import org.apache.bookkeeper.replication.ReplicationException.BKAuditException; |
| import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; |
| import org.apache.bookkeeper.stats.NullStatsLogger; |
| import org.apache.bookkeeper.stats.StatsLogger; |
| import org.apache.bookkeeper.test.BookKeeperClusterTestCase; |
| import org.apache.bookkeeper.test.TestStatsProvider; |
| import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger; |
| import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger; |
| import org.apache.zookeeper.KeeperException; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
/**
 * This test verifies that the periodic check on the auditor
 * will pick up on missing data in the client.
 */
| public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase { |
| private static final Logger LOG = LoggerFactory |
| .getLogger(AuditorPeriodicCheckTest.class); |
| |
| private MetadataBookieDriver driver; |
| private HashMap<String, AuditorElector> auditorElectors = new HashMap<String, AuditorElector>(); |
| |
| private static final int CHECK_INTERVAL = 1; // run every second |
| |
/**
 * Builds the test case, passing {@code 3} to the cluster test-case base class.
 */
public AuditorPeriodicCheckTest() {
    super(3);
    // A one-page limit makes it easy to push a ledger out of the cache.
    baseConf.setPageLimit(1);
}
| |
| @Before |
| @Override |
| public void setUp() throws Exception { |
| super.setUp(); |
| |
| for (int i = 0; i < numBookies; i++) { |
| ServerConfiguration conf = new ServerConfiguration(bsConfs.get(i)); |
| conf.setAuditorPeriodicCheckInterval(CHECK_INTERVAL); |
| |
| String addr = bs.get(i).getBookieId().toString(); |
| |
| AuditorElector auditorElector = new AuditorElector(addr, conf); |
| auditorElectors.put(addr, auditorElector); |
| auditorElector.start(); |
| LOG.debug("Starting Auditor Elector"); |
| } |
| |
| driver = MetadataDrivers.getBookieDriver( |
| URI.create(bsConfs.get(0).getMetadataServiceUri())); |
| driver.initialize( |
| bsConfs.get(0), |
| () -> {}, |
| NullStatsLogger.INSTANCE); |
| } |
| |
| @After |
| @Override |
| public void tearDown() throws Exception { |
| if (null != driver) { |
| driver.close(); |
| } |
| |
| for (AuditorElector e : auditorElectors.values()) { |
| e.shutdown(); |
| } |
| super.tearDown(); |
| } |
| |
| /** |
| * test that the periodic checking will detect corruptions in |
| * the bookie entry log. |
| */ |
| @Test |
| public void testEntryLogCorruption() throws Exception { |
| LedgerManagerFactory mFactory = driver.getLedgerManagerFactory(); |
| LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager(); |
| underReplicationManager.disableLedgerReplication(); |
| |
| LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes()); |
| long ledgerId = lh.getId(); |
| for (int i = 0; i < 100; i++) { |
| lh.addEntry("testdata".getBytes()); |
| } |
| lh.close(); |
| |
| BookieAccessor.forceFlush((BookieImpl) bs.get(0).getBookie()); |
| |
| |
| File ledgerDir = bsConfs.get(0).getLedgerDirs()[0]; |
| ledgerDir = BookieImpl.getCurrentDirectory(ledgerDir); |
| // corrupt of entryLogs |
| File[] entryLogs = ledgerDir.listFiles(new FilenameFilter() { |
| public boolean accept(File dir, String name) { |
| return name.endsWith(".log"); |
| } |
| }); |
| ByteBuffer junk = ByteBuffer.allocate(1024 * 1024); |
| for (File f : entryLogs) { |
| FileOutputStream out = new FileOutputStream(f); |
| out.getChannel().write(junk); |
| out.close(); |
| } |
| restartBookies(); // restart to clear read buffers |
| |
| underReplicationManager.enableLedgerReplication(); |
| long underReplicatedLedger = -1; |
| for (int i = 0; i < 10; i++) { |
| underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate(); |
| if (underReplicatedLedger != -1) { |
| break; |
| } |
| Thread.sleep(CHECK_INTERVAL * 1000); |
| } |
| assertEquals("Ledger should be under replicated", ledgerId, underReplicatedLedger); |
| underReplicationManager.close(); |
| } |
| |
| /** |
| * test that the period checker will detect corruptions in |
| * the bookie index files. |
| */ |
| @Test |
| public void testIndexCorruption() throws Exception { |
| LedgerManagerFactory mFactory = driver.getLedgerManagerFactory(); |
| |
| LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager(); |
| |
| LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes()); |
| long ledgerToCorrupt = lh.getId(); |
| for (int i = 0; i < 100; i++) { |
| lh.addEntry("testdata".getBytes()); |
| } |
| lh.close(); |
| |
| // push ledgerToCorrupt out of page cache (bookie is configured to only use 1 page) |
| lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes()); |
| for (int i = 0; i < 100; i++) { |
| lh.addEntry("testdata".getBytes()); |
| } |
| lh.close(); |
| |
| BookieAccessor.forceFlush((BookieImpl) bs.get(0).getBookie()); |
| |
| File ledgerDir = bsConfs.get(0).getLedgerDirs()[0]; |
| ledgerDir = BookieImpl.getCurrentDirectory(ledgerDir); |
| |
| // corrupt of entryLogs |
| File index = new File(ledgerDir, IndexPersistenceMgr.getLedgerName(ledgerToCorrupt)); |
| LOG.info("file to corrupt{}" , index); |
| ByteBuffer junk = ByteBuffer.allocate(1024 * 1024); |
| FileOutputStream out = new FileOutputStream(index); |
| out.getChannel().write(junk); |
| out.close(); |
| |
| long underReplicatedLedger = -1; |
| for (int i = 0; i < 15; i++) { |
| underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate(); |
| if (underReplicatedLedger != -1) { |
| break; |
| } |
| Thread.sleep(CHECK_INTERVAL * 1000); |
| } |
| assertEquals("Ledger should be under replicated", ledgerToCorrupt, underReplicatedLedger); |
| underReplicationManager.close(); |
| } |
| |
| /** |
| * Test that the period checker will not run when auto replication has been disabled. |
| */ |
| @Test |
| public void testPeriodicCheckWhenDisabled() throws Exception { |
| LedgerManagerFactory mFactory = driver.getLedgerManagerFactory(); |
| final LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager(); |
| final int numLedgers = 10; |
| final int numMsgs = 2; |
| final CountDownLatch completeLatch = new CountDownLatch(numMsgs * numLedgers); |
| final AtomicInteger rc = new AtomicInteger(BKException.Code.OK); |
| |
| List<LedgerHandle> lhs = new ArrayList<LedgerHandle>(); |
| for (int i = 0; i < numLedgers; i++) { |
| LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes()); |
| lhs.add(lh); |
| for (int j = 0; j < 2; j++) { |
| lh.asyncAddEntry("testdata".getBytes(), new AddCallback() { |
| public void addComplete(int rc2, LedgerHandle lh, long entryId, Object ctx) { |
| if (rc.compareAndSet(BKException.Code.OK, rc2)) { |
| LOG.info("Failed to add entry : {}", BKException.getMessage(rc2)); |
| } |
| completeLatch.countDown(); |
| } |
| }, null); |
| } |
| } |
| completeLatch.await(); |
| if (rc.get() != BKException.Code.OK) { |
| throw BKException.create(rc.get()); |
| } |
| |
| for (LedgerHandle lh : lhs) { |
| lh.close(); |
| } |
| |
| underReplicationManager.disableLedgerReplication(); |
| |
| final AtomicInteger numReads = new AtomicInteger(0); |
| ServerConfiguration conf = killBookie(0); |
| |
| Bookie deadBookie = new BookieImpl(conf) { |
| @Override |
| public ByteBuf readEntry(long ledgerId, long entryId) |
| throws IOException, NoLedgerException { |
| // we want to disable during checking |
| numReads.incrementAndGet(); |
| throw new IOException("Fake I/O exception"); |
| } |
| }; |
| bsConfs.add(conf); |
| bs.add(startBookie(conf, deadBookie)); |
| |
| Thread.sleep(CHECK_INTERVAL * 2000); |
| assertEquals("Nothing should have tried to read", 0, numReads.get()); |
| underReplicationManager.enableLedgerReplication(); |
| Thread.sleep(CHECK_INTERVAL * 2000); // give it time to run |
| |
| underReplicationManager.disableLedgerReplication(); |
| // give it time to stop, from this point nothing new should be marked |
| Thread.sleep(CHECK_INTERVAL * 2000); |
| |
| int numUnderreplicated = 0; |
| long underReplicatedLedger = -1; |
| do { |
| underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate(); |
| if (underReplicatedLedger == -1) { |
| break; |
| } |
| numUnderreplicated++; |
| |
| underReplicationManager.markLedgerReplicated(underReplicatedLedger); |
| } while (underReplicatedLedger != -1); |
| |
| Thread.sleep(CHECK_INTERVAL * 2000); // give a chance to run again (it shouldn't, it's disabled) |
| |
| // ensure that nothing is marked as underreplicated |
| underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate(); |
| assertEquals("There should be no underreplicated ledgers", -1, underReplicatedLedger); |
| |
| LOG.info("{} of {} ledgers underreplicated", numUnderreplicated, numUnderreplicated); |
| assertTrue("All should be underreplicated", |
| numUnderreplicated <= numLedgers && numUnderreplicated > 0); |
| } |
| |
| /** |
| * Test that the period check will succeed if a ledger is deleted midway. |
| */ |
| @Test |
| public void testPeriodicCheckWhenLedgerDeleted() throws Exception { |
| for (AuditorElector e : auditorElectors.values()) { |
| e.shutdown(); |
| } |
| |
| final int numLedgers = 10; |
| List<Long> ids = new LinkedList<Long>(); |
| for (int i = 0; i < numLedgers; i++) { |
| LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes()); |
| ids.add(lh.getId()); |
| for (int j = 0; j < 2; j++) { |
| lh.addEntry("testdata".getBytes()); |
| } |
| lh.close(); |
| } |
| |
| try (final Auditor auditor = new Auditor( |
| BookieImpl.getBookieId(bsConfs.get(0)).toString(), |
| bsConfs.get(0), NullStatsLogger.INSTANCE)) { |
| final AtomicBoolean exceptionCaught = new AtomicBoolean(false); |
| final CountDownLatch latch = new CountDownLatch(1); |
| Thread t = new Thread() { |
| public void run() { |
| try { |
| latch.countDown(); |
| for (int i = 0; i < numLedgers; i++) { |
| auditor.checkAllLedgers(); |
| } |
| } catch (Exception e) { |
| LOG.error("Caught exception while checking all ledgers", e); |
| exceptionCaught.set(true); |
| } |
| } |
| }; |
| t.start(); |
| latch.await(); |
| for (Long id : ids) { |
| bkc.deleteLedger(id); |
| } |
| t.join(); |
| assertFalse("Shouldn't have thrown exception", exceptionCaught.get()); |
| } |
| } |
| |
| @Test |
| public void testInitialDelayOfCheckAllLedgers() throws Exception { |
| for (AuditorElector e : auditorElectors.values()) { |
| e.shutdown(); |
| } |
| |
| final int numLedgers = 10; |
| List<Long> ids = new LinkedList<Long>(); |
| for (int i = 0; i < numLedgers; i++) { |
| LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes()); |
| ids.add(lh.getId()); |
| for (int j = 0; j < 2; j++) { |
| lh.addEntry("testdata".getBytes()); |
| } |
| lh.close(); |
| } |
| |
| LedgerManagerFactory mFactory = driver.getLedgerManagerFactory(); |
| LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager(); |
| |
| ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0)); |
| validateInitialDelayOfCheckAllLedgers(urm, -1, 1000, servConf, bkc); |
| validateInitialDelayOfCheckAllLedgers(urm, 999, 1000, servConf, bkc); |
| validateInitialDelayOfCheckAllLedgers(urm, 1001, 1000, servConf, bkc); |
| } |
| |
| void validateInitialDelayOfCheckAllLedgers(LedgerUnderreplicationManager urm, long timeSinceLastExecutedInSecs, |
| long auditorPeriodicCheckInterval, ServerConfiguration servConf, BookKeeper bkc) |
| throws UnavailableException, UnknownHostException, InterruptedException { |
| TestStatsProvider statsProvider = new TestStatsProvider(); |
| TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE); |
| TestOpStatsLogger checkAllLedgersStatsLogger = (TestOpStatsLogger) statsLogger |
| .getOpStatsLogger(ReplicationStats.CHECK_ALL_LEDGERS_TIME); |
| servConf.setAuditorPeriodicCheckInterval(auditorPeriodicCheckInterval); |
| servConf.setAuditorPeriodicPlacementPolicyCheckInterval(0); |
| servConf.setAuditorPeriodicBookieCheckInterval(0); |
| |
| final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc, false, |
| statsLogger); |
| CountDownLatch latch = auditor.getLatch(); |
| assertEquals("CHECK_ALL_LEDGERS_TIME SuccessCount", 0, checkAllLedgersStatsLogger.getSuccessCount()); |
| long curTimeBeforeStart = System.currentTimeMillis(); |
| long checkAllLedgersCTime = -1; |
| long initialDelayInMsecs = -1; |
| long nextExpectedCheckAllLedgersExecutionTime = -1; |
| long bufferTimeInMsecs = 12000L; |
| if (timeSinceLastExecutedInSecs == -1) { |
| /* |
| * if we are setting checkAllLedgersCTime to -1, it means that |
| * checkAllLedgers hasn't run before. So initialDelay for |
| * checkAllLedgers should be 0. |
| */ |
| checkAllLedgersCTime = -1; |
| initialDelayInMsecs = 0; |
| } else { |
| checkAllLedgersCTime = curTimeBeforeStart - timeSinceLastExecutedInSecs * 1000L; |
| initialDelayInMsecs = timeSinceLastExecutedInSecs > auditorPeriodicCheckInterval ? 0 |
| : (auditorPeriodicCheckInterval - timeSinceLastExecutedInSecs) * 1000L; |
| } |
| /* |
| * next checkAllLedgers should happen atleast after |
| * nextExpectedCheckAllLedgersExecutionTime. |
| */ |
| nextExpectedCheckAllLedgersExecutionTime = curTimeBeforeStart + initialDelayInMsecs; |
| |
| urm.setCheckAllLedgersCTime(checkAllLedgersCTime); |
| auditor.start(); |
| /* |
| * since auditorPeriodicCheckInterval are higher values (in the order of |
| * 100s of seconds), its ok bufferTimeInMsecs to be ` 10 secs. |
| */ |
| assertTrue("checkAllLedgers should have executed with initialDelay " + initialDelayInMsecs, |
| latch.await(initialDelayInMsecs + bufferTimeInMsecs, TimeUnit.MILLISECONDS)); |
| for (int i = 0; i < 10; i++) { |
| Thread.sleep(100); |
| if (checkAllLedgersStatsLogger.getSuccessCount() >= 1) { |
| break; |
| } |
| } |
| assertEquals("CHECK_ALL_LEDGERS_TIME SuccessCount", 1, checkAllLedgersStatsLogger.getSuccessCount()); |
| long currentCheckAllLedgersCTime = urm.getCheckAllLedgersCTime(); |
| assertTrue( |
| "currentCheckAllLedgersCTime: " + currentCheckAllLedgersCTime |
| + " should be greater than nextExpectedCheckAllLedgersExecutionTime: " |
| + nextExpectedCheckAllLedgersExecutionTime, |
| currentCheckAllLedgersCTime > nextExpectedCheckAllLedgersExecutionTime); |
| assertTrue( |
| "currentCheckAllLedgersCTime: " + currentCheckAllLedgersCTime |
| + " should be lesser than nextExpectedCheckAllLedgersExecutionTime+bufferTimeInMsecs: " |
| + (nextExpectedCheckAllLedgersExecutionTime + bufferTimeInMsecs), |
| currentCheckAllLedgersCTime < (nextExpectedCheckAllLedgersExecutionTime + bufferTimeInMsecs)); |
| auditor.close(); |
| } |
| |
| @Test |
| public void testInitialDelayOfPlacementPolicyCheck() throws Exception { |
| for (AuditorElector e : auditorElectors.values()) { |
| e.shutdown(); |
| } |
| |
| final int numLedgers = 10; |
| List<Long> ids = new LinkedList<Long>(); |
| for (int i = 0; i < numLedgers; i++) { |
| LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes()); |
| ids.add(lh.getId()); |
| for (int j = 0; j < 2; j++) { |
| lh.addEntry("testdata".getBytes()); |
| } |
| lh.close(); |
| } |
| |
| LedgerManagerFactory mFactory = driver.getLedgerManagerFactory(); |
| LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager(); |
| |
| ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0)); |
| validateInitialDelayOfPlacementPolicyCheck(urm, -1, 1000, servConf, bkc); |
| validateInitialDelayOfPlacementPolicyCheck(urm, 999, 1000, servConf, bkc); |
| validateInitialDelayOfPlacementPolicyCheck(urm, 1001, 1000, servConf, bkc); |
| } |
| |
| void validateInitialDelayOfPlacementPolicyCheck(LedgerUnderreplicationManager urm, long timeSinceLastExecutedInSecs, |
| long auditorPeriodicPlacementPolicyCheckInterval, ServerConfiguration servConf, BookKeeper bkc) |
| throws UnavailableException, UnknownHostException, InterruptedException { |
| TestStatsProvider statsProvider = new TestStatsProvider(); |
| TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE); |
| TestOpStatsLogger placementPolicyCheckStatsLogger = (TestOpStatsLogger) statsLogger |
| .getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME); |
| servConf.setAuditorPeriodicPlacementPolicyCheckInterval(auditorPeriodicPlacementPolicyCheckInterval); |
| servConf.setAuditorPeriodicCheckInterval(0); |
| servConf.setAuditorPeriodicBookieCheckInterval(0); |
| |
| final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc, false, |
| statsLogger); |
| CountDownLatch latch = auditor.getLatch(); |
| assertEquals("PLACEMENT_POLICY_CHECK_TIME SuccessCount", 0, placementPolicyCheckStatsLogger.getSuccessCount()); |
| long curTimeBeforeStart = System.currentTimeMillis(); |
| long placementPolicyCheckCTime = -1; |
| long initialDelayInMsecs = -1; |
| long nextExpectedPlacementPolicyCheckExecutionTime = -1; |
| long bufferTimeInMsecs = 20000L; |
| if (timeSinceLastExecutedInSecs == -1) { |
| /* |
| * if we are setting placementPolicyCheckCTime to -1, it means that |
| * placementPolicyCheck hasn't run before. So initialDelay for |
| * placementPolicyCheck should be 0. |
| */ |
| placementPolicyCheckCTime = -1; |
| initialDelayInMsecs = 0; |
| } else { |
| placementPolicyCheckCTime = curTimeBeforeStart - timeSinceLastExecutedInSecs * 1000L; |
| initialDelayInMsecs = timeSinceLastExecutedInSecs > auditorPeriodicPlacementPolicyCheckInterval ? 0 |
| : (auditorPeriodicPlacementPolicyCheckInterval - timeSinceLastExecutedInSecs) * 1000L; |
| } |
| /* |
| * next placementPolicyCheck should happen atleast after |
| * nextExpectedPlacementPolicyCheckExecutionTime. |
| */ |
| nextExpectedPlacementPolicyCheckExecutionTime = curTimeBeforeStart + initialDelayInMsecs; |
| |
| urm.setPlacementPolicyCheckCTime(placementPolicyCheckCTime); |
| auditor.start(); |
| /* |
| * since auditorPeriodicPlacementPolicyCheckInterval are higher values (in the |
| * order of 100s of seconds), its ok bufferTimeInMsecs to be ` 20 secs. |
| */ |
| assertTrue("placementPolicyCheck should have executed with initialDelay " + initialDelayInMsecs, |
| latch.await(initialDelayInMsecs + bufferTimeInMsecs, TimeUnit.MILLISECONDS)); |
| for (int i = 0; i < 20; i++) { |
| Thread.sleep(100); |
| if (placementPolicyCheckStatsLogger.getSuccessCount() >= 1) { |
| break; |
| } |
| } |
| assertEquals("PLACEMENT_POLICY_CHECK_TIME SuccessCount", 1, placementPolicyCheckStatsLogger.getSuccessCount()); |
| long currentPlacementPolicyCheckCTime = urm.getPlacementPolicyCheckCTime(); |
| assertTrue( |
| "currentPlacementPolicyCheckCTime: " + currentPlacementPolicyCheckCTime |
| + " should be greater than nextExpectedPlacementPolicyCheckExecutionTime: " |
| + nextExpectedPlacementPolicyCheckExecutionTime, |
| currentPlacementPolicyCheckCTime > nextExpectedPlacementPolicyCheckExecutionTime); |
| assertTrue( |
| "currentPlacementPolicyCheckCTime: " + currentPlacementPolicyCheckCTime |
| + " should be lesser than nextExpectedPlacementPolicyCheckExecutionTime+bufferTimeInMsecs: " |
| + (nextExpectedPlacementPolicyCheckExecutionTime + bufferTimeInMsecs), |
| currentPlacementPolicyCheckCTime < (nextExpectedPlacementPolicyCheckExecutionTime + bufferTimeInMsecs)); |
| auditor.close(); |
| } |
| |
| @Test |
| public void testInitialDelayOfReplicasCheck() throws Exception { |
| for (AuditorElector e : auditorElectors.values()) { |
| e.shutdown(); |
| } |
| |
| LedgerHandle lh = bkc.createLedger(3, 2, DigestType.CRC32, "passwd".getBytes()); |
| for (int j = 0; j < 5; j++) { |
| lh.addEntry("testdata".getBytes()); |
| } |
| lh.close(); |
| |
| long ledgerId = 100000L; |
| lh = bkc.createLedgerAdv(ledgerId, 3, 2, 2, DigestType.CRC32, "passwd".getBytes(), null); |
| lh.close(); |
| |
| ledgerId = 100001234L; |
| lh = bkc.createLedgerAdv(ledgerId, 3, 3, 2, DigestType.CRC32, "passwd".getBytes(), null); |
| for (int j = 0; j < 4; j++) { |
| lh.addEntry(j, "testdata".getBytes()); |
| } |
| lh.close(); |
| |
| ledgerId = 991234L; |
| lh = bkc.createLedgerAdv(ledgerId, 3, 2, 2, DigestType.CRC32, "passwd".getBytes(), null); |
| lh.addEntry(0, "testdata".getBytes()); |
| lh.close(); |
| |
| LedgerManagerFactory mFactory = driver.getLedgerManagerFactory(); |
| LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager(); |
| |
| ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0)); |
| validateInitialDelayOfReplicasCheck(urm, -1, 1000, servConf, bkc); |
| validateInitialDelayOfReplicasCheck(urm, 999, 1000, servConf, bkc); |
| validateInitialDelayOfReplicasCheck(urm, 1001, 1000, servConf, bkc); |
| } |
| |
| void validateInitialDelayOfReplicasCheck(LedgerUnderreplicationManager urm, long timeSinceLastExecutedInSecs, |
| long auditorPeriodicReplicasCheckInterval, ServerConfiguration servConf, BookKeeper bkc) |
| throws UnavailableException, UnknownHostException, InterruptedException { |
| TestStatsProvider statsProvider = new TestStatsProvider(); |
| TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE); |
| TestOpStatsLogger replicasCheckStatsLogger = (TestOpStatsLogger) statsLogger |
| .getOpStatsLogger(ReplicationStats.REPLICAS_CHECK_TIME); |
| servConf.setAuditorPeriodicReplicasCheckInterval(auditorPeriodicReplicasCheckInterval); |
| servConf.setAuditorPeriodicCheckInterval(0); |
| servConf.setAuditorPeriodicBookieCheckInterval(0); |
| final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc, false, |
| statsLogger); |
| CountDownLatch latch = auditor.getLatch(); |
| assertEquals("REPLICAS_CHECK_TIME SuccessCount", 0, replicasCheckStatsLogger.getSuccessCount()); |
| long curTimeBeforeStart = System.currentTimeMillis(); |
| long replicasCheckCTime = -1; |
| long initialDelayInMsecs = -1; |
| long nextExpectedReplicasCheckExecutionTime = -1; |
| long bufferTimeInMsecs = 20000L; |
| if (timeSinceLastExecutedInSecs == -1) { |
| /* |
| * if we are setting replicasCheckCTime to -1, it means that |
| * replicasCheck hasn't run before. So initialDelay for |
| * replicasCheck should be 0. |
| */ |
| replicasCheckCTime = -1; |
| initialDelayInMsecs = 0; |
| } else { |
| replicasCheckCTime = curTimeBeforeStart - timeSinceLastExecutedInSecs * 1000L; |
| initialDelayInMsecs = timeSinceLastExecutedInSecs > auditorPeriodicReplicasCheckInterval ? 0 |
| : (auditorPeriodicReplicasCheckInterval - timeSinceLastExecutedInSecs) * 1000L; |
| } |
| /* |
| * next replicasCheck should happen atleast after |
| * nextExpectedReplicasCheckExecutionTime. |
| */ |
| nextExpectedReplicasCheckExecutionTime = curTimeBeforeStart + initialDelayInMsecs; |
| |
| urm.setReplicasCheckCTime(replicasCheckCTime); |
| auditor.start(); |
| /* |
| * since auditorPeriodicReplicasCheckInterval are higher values (in the |
| * order of 100s of seconds), its ok bufferTimeInMsecs to be ` 20 secs. |
| */ |
| assertTrue("replicasCheck should have executed with initialDelay " + initialDelayInMsecs, |
| latch.await(initialDelayInMsecs + bufferTimeInMsecs, TimeUnit.MILLISECONDS)); |
| for (int i = 0; i < 20; i++) { |
| Thread.sleep(100); |
| if (replicasCheckStatsLogger.getSuccessCount() >= 1) { |
| break; |
| } |
| } |
| assertEquals("REPLICAS_CHECK_TIME SuccessCount", 1, replicasCheckStatsLogger.getSuccessCount()); |
| long currentReplicasCheckCTime = urm.getReplicasCheckCTime(); |
| assertTrue( |
| "currentReplicasCheckCTime: " + currentReplicasCheckCTime |
| + " should be greater than nextExpectedReplicasCheckExecutionTime: " |
| + nextExpectedReplicasCheckExecutionTime, |
| currentReplicasCheckCTime > nextExpectedReplicasCheckExecutionTime); |
| assertTrue( |
| "currentReplicasCheckCTime: " + currentReplicasCheckCTime |
| + " should be lesser than nextExpectedReplicasCheckExecutionTime+bufferTimeInMsecs: " |
| + (nextExpectedReplicasCheckExecutionTime + bufferTimeInMsecs), |
| currentReplicasCheckCTime < (nextExpectedReplicasCheckExecutionTime + bufferTimeInMsecs)); |
| auditor.close(); |
| } |
| |
| static class TestAuditor extends Auditor { |
| |
| final AtomicReference<CountDownLatch> latchRef = new AtomicReference<CountDownLatch>(new CountDownLatch(1)); |
| |
| public TestAuditor(String bookieIdentifier, ServerConfiguration conf, BookKeeper bkc, boolean ownBkc, |
| StatsLogger statsLogger) throws UnavailableException { |
| super(bookieIdentifier, conf, bkc, ownBkc, statsLogger); |
| } |
| |
| public TestAuditor(String bookieIdentifier, ServerConfiguration conf, BookKeeper bkc, boolean ownBkc, |
| BookKeeperAdmin bkadmin, boolean ownadmin, StatsLogger statsLogger) throws UnavailableException { |
| super(bookieIdentifier, conf, bkc, ownBkc, bkadmin, ownadmin, statsLogger); |
| } |
| |
| public TestAuditor(final String bookieIdentifier, ServerConfiguration conf, StatsLogger statsLogger) |
| throws UnavailableException { |
| super(bookieIdentifier, conf, statsLogger); |
| } |
| |
| void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException { |
| super.checkAllLedgers(); |
| latchRef.get().countDown(); |
| } |
| |
| void placementPolicyCheck() throws BKAuditException { |
| super.placementPolicyCheck(); |
| latchRef.get().countDown(); |
| } |
| |
| void replicasCheck() throws BKAuditException { |
| super.replicasCheck(); |
| latchRef.get().countDown(); |
| } |
| |
| CountDownLatch getLatch() { |
| return latchRef.get(); |
| } |
| |
| void setLatch(CountDownLatch latch) { |
| latchRef.set(latch); |
| } |
| } |
| |
| private BookieId replaceBookieWithWriteFailingBookie(LedgerHandle lh) throws Exception { |
| int bookieIdx = -1; |
| Long entryId = lh.getLedgerMetadata().getAllEnsembles().firstKey(); |
| List<BookieId> curEnsemble = lh.getLedgerMetadata().getAllEnsembles().get(entryId); |
| |
| // Identify a bookie in the current ledger ensemble to be replaced |
| BookieId replacedBookie = null; |
| for (int i = 0; i < numBookies; i++) { |
| if (curEnsemble.contains(bs.get(i).getBookieId())) { |
| bookieIdx = i; |
| replacedBookie = bs.get(i).getBookieId(); |
| break; |
| } |
| } |
| assertNotEquals("Couldn't find ensemble bookie in bookie list", -1, bookieIdx); |
| |
| LOG.info("Killing bookie " + bs.get(bookieIdx).getBookieId()); |
| ServerConfiguration conf = killBookie(bookieIdx); |
| Bookie writeFailingBookie = new BookieImpl(conf) { |
| @Override |
| public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb, |
| Object ctx, byte[] masterKey) |
| throws IOException, BookieException { |
| try { |
| LOG.info("Failing write to entry "); |
| // sleep a bit so that writes to other bookies succeed before |
| // the client hears about the failure on this bookie. If the |
| // client gets ack-quorum number of acks first, it won't care |
| // about any failures and won't reform the ensemble. |
| Thread.sleep(100); |
| throw new IOException(); |
| } catch (InterruptedException ie) { |
| // ignore, only interrupted if shutting down, |
| // and an exception would spam the logs |
| Thread.currentThread().interrupt(); |
| } |
| } |
| }; |
| bsConfs.add(conf); |
| bs.add(startBookie(conf, writeFailingBookie)); |
| return replacedBookie; |
| } |
| |
| /* |
| * Validates that the periodic ledger check will fix entries with a failed write. |
| */ |
| @Test |
| public void testFailedWriteRecovery() throws Exception { |
| LedgerManagerFactory mFactory = driver.getLedgerManagerFactory(); |
| LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager(); |
| underReplicationManager.disableLedgerReplication(); |
| |
| LedgerHandle lh = bkc.createLedger(2, 2, 1, DigestType.CRC32, "passwd".getBytes()); |
| |
| // kill one of the bookies and replace it with one that rejects write; |
| // This way we get into the under replication state |
| BookieId replacedBookie = replaceBookieWithWriteFailingBookie(lh); |
| |
| // Write a few entries; this should cause under replication |
| byte[] data = "foobar".getBytes(); |
| data = "foobar".getBytes(); |
| lh.addEntry(data); |
| lh.addEntry(data); |
| lh.addEntry(data); |
| |
| lh.close(); |
| |
| // enable under replication detection and wait for it to report |
| // under replicated ledger |
| underReplicationManager.enableLedgerReplication(); |
| long underReplicatedLedger = -1; |
| for (int i = 0; i < 5; i++) { |
| underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate(); |
| if (underReplicatedLedger != -1) { |
| break; |
| } |
| Thread.sleep(CHECK_INTERVAL * 1000); |
| } |
| assertEquals("Ledger should be under replicated", lh.getId(), underReplicatedLedger); |
| |
| // now start the replication workers |
| List<ReplicationWorker> l = new ArrayList<ReplicationWorker>(); |
| for (int i = 0; i < numBookies; i++) { |
| ReplicationWorker rw = new ReplicationWorker(bsConfs.get(i), NullStatsLogger.INSTANCE); |
| rw.start(); |
| l.add(rw); |
| } |
| underReplicationManager.close(); |
| |
| // Wait for ensemble to change after replication |
| Thread.sleep(3000); |
| for (ReplicationWorker rw : l) { |
| rw.shutdown(); |
| } |
| |
| // check that ensemble has changed and the bookie that rejected writes has |
| // been replaced in the ensemble |
| LedgerHandle newLh = bkc.openLedger(lh.getId(), DigestType.CRC32, "passwd".getBytes()); |
| for (Map.Entry<Long, ? extends List<BookieId>> e : |
| newLh.getLedgerMetadata().getAllEnsembles().entrySet()) { |
| List<BookieId> ensemble = e.getValue(); |
| assertFalse("Ensemble hasn't been updated", ensemble.contains(replacedBookie)); |
| } |
| newLh.close(); |
| } |
| } |