| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.namenode; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.spy; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import com.google.common.base.Supplier; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.fs.permission.PermissionStatus; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DFSTestUtil; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; |
| import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; |
| import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; |
| import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream; |
| import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.util.Time; |
| import org.apache.log4j.Level; |
| import org.mockito.Mockito; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| import org.junit.runners.Parameterized.Parameters; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| /** |
| * This class tests various synchronization bugs in FSEditLog rolling |
| * and namespace saving. |
| */ |
| @RunWith(Parameterized.class) |
| public class TestEditLogRace { |
| static { |
| GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL); |
| } |
| |
| @Parameters |
| public static Collection<Object[]> data() { |
| Collection<Object[]> params = new ArrayList<Object[]>(); |
| params.add(new Object[]{ false }); |
| params.add(new Object[]{ true }); |
| return params; |
| } |
| |
| private static boolean useAsyncEditLog; |
| |
| public TestEditLogRace(boolean useAsyncEditLog) { |
| TestEditLogRace.useAsyncEditLog = useAsyncEditLog; |
| } |
| |
| private static final String NAME_DIR = MiniDFSCluster.getBaseDirectory() + "name-0-1"; |
| |
| private static final Log LOG = LogFactory.getLog(TestEditLogRace.class); |
| |
| // This test creates NUM_THREADS threads and each thread continuously writes |
| // transactions |
| static final int NUM_THREADS = 16; |
| |
| /** |
| * The number of times to roll the edit log during the test. Since this |
| * tests for a race condition, higher numbers are more likely to find |
| * a bug if it exists, but the test will take longer. |
| */ |
| static final int NUM_ROLLS = 30; |
| |
| /** |
| * The number of times to save the fsimage and create an empty edit log. |
| */ |
| static final int NUM_SAVE_IMAGE = 30; |
| |
| private final List<Transactions> workers = new ArrayList<Transactions>(); |
| |
| private static final int NUM_DATA_NODES = 1; |
| |
| /** |
| * Several of the test cases work by introducing a sleep |
| * into an operation that is usually fast, and then verifying |
| * that another operation blocks for at least this amount of time. |
| * This value needs to be significantly longer than the average |
| * time for an fsync() or enterSafeMode(). |
| */ |
| private static final int BLOCK_TIME = 4; // 4 sec pretty generous |
| |
| // |
| // an object that does a bunch of transactions |
| // |
| static class Transactions implements Runnable { |
| final NamenodeProtocols nn; |
| final MiniDFSCluster cluster; |
| FileSystem fs; |
| short replication = 3; |
| long blockSize = 64; |
| volatile boolean stopped = false; |
| volatile Thread thr; |
| final AtomicReference<Throwable> caught; |
| |
| Transactions(MiniDFSCluster cluster, AtomicReference<Throwable> caught) { |
| this.cluster = cluster; |
| this.nn = cluster.getNameNodeRpc(); |
| try { |
| this.fs = cluster.getFileSystem(); |
| } catch (IOException e) { |
| caught.set(e); |
| } |
| this.caught = caught; |
| } |
| |
| // add a bunch of transactions. |
| @Override |
| public void run() { |
| thr = Thread.currentThread(); |
| FsPermission p = new FsPermission((short)0777); |
| |
| int i = 0; |
| while (!stopped) { |
| try { |
| String dirname = "/thr-" + thr.getId() + "-dir-" + i; |
| if (i % 2 == 0) { |
| Path dirnamePath = new Path(dirname); |
| fs.mkdirs(dirnamePath); |
| fs.delete(dirnamePath, true); |
| } else { |
| nn.mkdirs(dirname, p, true); |
| nn.delete(dirname, true); |
| } |
| } catch (SafeModeException sme) { |
| // This is OK - the tests will bring NN in and out of safemode |
| } catch (Throwable e) { |
| // This is OK - the tests will bring NN in and out of safemode |
| if (e instanceof RemoteException && |
| ((RemoteException)e).getClassName() |
| .contains("SafeModeException")) { |
| return; |
| } |
| LOG.warn("Got error in transaction thread", e); |
| caught.compareAndSet(null, e); |
| break; |
| } |
| i++; |
| } |
| } |
| |
| public void stop() { |
| stopped = true; |
| } |
| |
| public Thread getThread() { |
| return thr; |
| } |
| } |
| |
| private void startTransactionWorkers(MiniDFSCluster cluster, |
| AtomicReference<Throwable> caughtErr) { |
| // Create threads and make them run transactions concurrently. |
| for (int i = 0; i < NUM_THREADS; i++) { |
| Transactions trans = new Transactions(cluster, caughtErr); |
| new Thread(trans, "TransactionThread-" + i).start(); |
| workers.add(trans); |
| } |
| } |
| |
| private void stopTransactionWorkers() { |
| // wait for all transactions to get over |
| for (Transactions worker : workers) { |
| worker.stop(); |
| } |
| |
| for (Transactions worker : workers) { |
| Thread thr = worker.getThread(); |
| try { |
| if (thr != null) thr.join(); |
| } catch (InterruptedException ignored) {} |
| } |
| } |
| |
| /** |
| * Tests rolling edit logs while transactions are ongoing. |
| */ |
| @Test |
| public void testEditLogRolling() throws Exception { |
| // start a cluster |
| Configuration conf = getConf(); |
| final MiniDFSCluster cluster = |
| new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build(); |
| FileSystem fileSys = null; |
| |
| |
| AtomicReference<Throwable> caughtErr = new AtomicReference<Throwable>(); |
| try { |
| cluster.waitActive(); |
| fileSys = cluster.getFileSystem(); |
| final NamenodeProtocols nn = cluster.getNameNode().getRpcServer(); |
| FSImage fsimage = cluster.getNamesystem().getFSImage(); |
| StorageDirectory sd = fsimage.getStorage().getStorageDir(0); |
| |
| startTransactionWorkers(cluster, caughtErr); |
| |
| long previousLogTxId = 1; |
| |
| for (int i = 0; i < NUM_ROLLS && caughtErr.get() == null; i++) { |
| try { |
| Thread.sleep(20); |
| } catch (InterruptedException e) {} |
| |
| LOG.info("Starting roll " + i + "."); |
| CheckpointSignature sig = nn.rollEditLog(); |
| |
| long nextLog = sig.curSegmentTxId; |
| String logFileName = NNStorage.getFinalizedEditsFileName( |
| previousLogTxId, nextLog - 1); |
| previousLogTxId += verifyEditLogs(cluster.getNamesystem(), fsimage, |
| logFileName, previousLogTxId); |
| |
| assertEquals(previousLogTxId, nextLog); |
| |
| File expectedLog = NNStorage.getInProgressEditsFile(sd, previousLogTxId); |
| assertTrue("Expect " + expectedLog + " to exist", expectedLog.exists()); |
| } |
| } finally { |
| stopTransactionWorkers(); |
| if (caughtErr.get() != null) { |
| throw new RuntimeException(caughtErr.get()); |
| } |
| |
| if(fileSys != null) fileSys.close(); |
| if(cluster != null) cluster.shutdown(); |
| } |
| } |
| |
| private long verifyEditLogs(FSNamesystem namesystem, FSImage fsimage, |
| String logFileName, long startTxId) |
| throws IOException { |
| |
| long numEdits = -1; |
| |
| // Verify that we can read in all the transactions that we have written. |
| // If there were any corruptions, it is likely that the reading in |
| // of these transactions will throw an exception. |
| for (StorageDirectory sd : |
| fsimage.getStorage().dirIterable(NameNodeDirType.EDITS)) { |
| |
| File editFile = new File(sd.getCurrentDir(), logFileName); |
| |
| System.out.println("Verifying file: " + editFile); |
| FSEditLogLoader loader = new FSEditLogLoader(namesystem, startTxId); |
| long numEditsThisLog = loader.loadFSEdits( |
| new EditLogFileInputStream(editFile), startTxId); |
| |
| System.out.println("Number of edits: " + numEditsThisLog); |
| assertTrue(numEdits == -1 || numEditsThisLog == numEdits); |
| numEdits = numEditsThisLog; |
| } |
| |
| assertTrue(numEdits != -1); |
| return numEdits; |
| } |
| |
| /** |
| * Tests saving fs image while transactions are ongoing. |
| */ |
| @Test |
| public void testSaveNamespace() throws Exception { |
| // start a cluster |
| Configuration conf = getConf(); |
| MiniDFSCluster cluster = null; |
| FileSystem fileSys = null; |
| |
| AtomicReference<Throwable> caughtErr = new AtomicReference<Throwable>(); |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build(); |
| cluster.waitActive(); |
| fileSys = cluster.getFileSystem(); |
| final FSNamesystem namesystem = cluster.getNamesystem(); |
| |
| FSImage fsimage = namesystem.getFSImage(); |
| FSEditLog editLog = fsimage.getEditLog(); |
| |
| startTransactionWorkers(cluster, caughtErr); |
| |
| for (int i = 0; i < NUM_SAVE_IMAGE && caughtErr.get() == null; i++) { |
| try { |
| Thread.sleep(20); |
| } catch (InterruptedException ignored) {} |
| |
| |
| LOG.info("Save " + i + ": entering safe mode"); |
| namesystem.enterSafeMode(false); |
| |
| // Verify edit logs before the save |
| // They should start with the first edit after the checkpoint |
| long logStartTxId = fsimage.getStorage().getMostRecentCheckpointTxId() + 1; |
| verifyEditLogs(namesystem, fsimage, |
| NNStorage.getInProgressEditsFileName(logStartTxId), |
| logStartTxId); |
| |
| |
| LOG.info("Save " + i + ": saving namespace"); |
| namesystem.saveNamespace(); |
| LOG.info("Save " + i + ": leaving safemode"); |
| |
| long savedImageTxId = fsimage.getStorage().getMostRecentCheckpointTxId(); |
| |
| // Verify that edit logs post save got finalized and aren't corrupt |
| verifyEditLogs(namesystem, fsimage, |
| NNStorage.getFinalizedEditsFileName(logStartTxId, savedImageTxId), |
| logStartTxId); |
| |
| // The checkpoint id should be 1 less than the last written ID, since |
| // the log roll writes the "BEGIN" transaction to the new log. |
| assertEquals(fsimage.getStorage().getMostRecentCheckpointTxId(), |
| editLog.getLastWrittenTxId() - 1); |
| |
| namesystem.leaveSafeMode(false); |
| LOG.info("Save " + i + ": complete"); |
| } |
| } finally { |
| stopTransactionWorkers(); |
| if (caughtErr.get() != null) { |
| throw new RuntimeException(caughtErr.get()); |
| } |
| if(fileSys != null) fileSys.close(); |
| if(cluster != null) cluster.shutdown(); |
| } |
| } |
| |
| private Configuration getConf() { |
| Configuration conf = new HdfsConfiguration(); |
| conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, |
| useAsyncEditLog); |
| FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); |
| conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); |
| conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, NAME_DIR); |
| conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, NAME_DIR); |
| conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false); |
| return conf; |
| } |
| |
| |
| /** |
| * The logSync() method in FSEditLog is unsynchronized whiel syncing |
| * so that other threads can concurrently enqueue edits while the prior |
| * sync is ongoing. This test checks that the log is saved correctly |
| * if the saveImage occurs while the syncing thread is in the unsynchronized middle section. |
| * |
| * This replicates the following manual test proposed by Konstantin: |
| * I start the name-node in debugger. |
| * I do -mkdir and stop the debugger in logSync() just before it does flush. |
| * Then I enter safe mode with another client |
| * I start saveNamepsace and stop the debugger in |
| * FSImage.saveFSImage() -> FSEditLog.createEditLogFile() |
| * -> EditLogFileOutputStream.create() -> |
| * after truncating the file but before writing LAYOUT_VERSION into it. |
| * Then I let logSync() run. |
| * Then I terminate the name-node. |
| * After that the name-node wont start, since the edits file is broken. |
| */ |
| @Test |
| public void testSaveImageWhileSyncInProgress() throws Exception { |
| Configuration conf = getConf(); |
| NameNode.initMetrics(conf, NamenodeRole.NAMENODE); |
| DFSTestUtil.formatNameNode(conf); |
| final FSNamesystem namesystem = FSNamesystem.loadFromDisk(conf); |
| |
| try { |
| FSImage fsimage = namesystem.getFSImage(); |
| FSEditLog editLog = fsimage.getEditLog(); |
| |
| JournalAndStream jas = editLog.getJournals().get(0); |
| EditLogFileOutputStream spyElos = |
| spy((EditLogFileOutputStream)jas.getCurrentStream()); |
| jas.setCurrentStreamForTests(spyElos); |
| |
| final AtomicReference<Throwable> deferredException = |
| new AtomicReference<Throwable>(); |
| final CountDownLatch waitToEnterFlush = new CountDownLatch(1); |
| |
| final Thread doAnEditThread = new Thread() { |
| @Override |
| public void run() { |
| try { |
| LOG.info("Starting mkdirs"); |
| namesystem.mkdirs("/test", |
| new PermissionStatus("test","test", new FsPermission((short)00755)), |
| true); |
| LOG.info("mkdirs complete"); |
| } catch (Throwable ioe) { |
| LOG.fatal("Got exception", ioe); |
| deferredException.set(ioe); |
| waitToEnterFlush.countDown(); |
| } |
| } |
| }; |
| |
| Answer<Void> blockingFlush = new Answer<Void>() { |
| @Override |
| public Void answer(InvocationOnMock invocation) throws Throwable { |
| LOG.info("Flush called"); |
| if (useAsyncEditLog || Thread.currentThread() == doAnEditThread) { |
| LOG.info("edit thread: Telling main thread we made it to flush section..."); |
| // Signal to main thread that the edit thread is in the racy section |
| waitToEnterFlush.countDown(); |
| LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs"); |
| Thread.sleep(BLOCK_TIME*1000); |
| LOG.info("Going through to flush. This will allow the main thread to continue."); |
| } |
| invocation.callRealMethod(); |
| LOG.info("Flush complete"); |
| return null; |
| } |
| }; |
| doAnswer(blockingFlush).when(spyElos).flush(); |
| |
| doAnEditThread.start(); |
| // Wait for the edit thread to get to the logsync unsynchronized section |
| LOG.info("Main thread: waiting to enter flush..."); |
| waitToEnterFlush.await(); |
| assertNull(deferredException.get()); |
| LOG.info("Main thread: detected that logSync is in unsynchronized section."); |
| LOG.info("Trying to enter safe mode."); |
| LOG.info("This should block for " + BLOCK_TIME + "sec, since flush will sleep that long"); |
| |
| long st = Time.now(); |
| namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER); |
| long et = Time.now(); |
| LOG.info("Entered safe mode"); |
| // Make sure we really waited for the flush to complete! |
| assertTrue(et - st > (BLOCK_TIME - 1)*1000); |
| |
| // Once we're in safe mode, save namespace. |
| namesystem.saveNamespace(); |
| |
| LOG.info("Joining on edit thread..."); |
| doAnEditThread.join(); |
| assertNull(deferredException.get()); |
| |
| // We did 3 edits: begin, txn, and end |
| assertEquals(3, verifyEditLogs(namesystem, fsimage, |
| NNStorage.getFinalizedEditsFileName(1, 3), |
| 1)); |
| // after the save, just the one "begin" |
| assertEquals(1, verifyEditLogs(namesystem, fsimage, |
| NNStorage.getInProgressEditsFileName(4), |
| 4)); |
| } finally { |
| LOG.info("Closing nn"); |
| if(namesystem != null) namesystem.close(); |
| } |
| } |
| |
| /** |
| * Most of the FSNamesystem methods have a synchronized section where they |
| * update the name system itself and write to the edit log, and then |
| * unsynchronized, they call logSync. This test verifies that, if an |
| * operation has written to the edit log but not yet synced it, |
| * we wait for that sync before entering safe mode. |
| */ |
| @Test |
| public void testSaveRightBeforeSync() throws Exception { |
| Configuration conf = getConf(); |
| NameNode.initMetrics(conf, NamenodeRole.NAMENODE); |
| DFSTestUtil.formatNameNode(conf); |
| final FSNamesystem namesystem = FSNamesystem.loadFromDisk(conf); |
| |
| try { |
| FSImage fsimage = namesystem.getFSImage(); |
| final FSEditLog editLog = fsimage.getEditLog(); |
| |
| final AtomicReference<Throwable> deferredException = |
| new AtomicReference<Throwable>(); |
| final CountDownLatch sleepingBeforeSync = new CountDownLatch(1); |
| |
| final Thread doAnEditThread = new Thread() { |
| @Override |
| public void run() { |
| try { |
| LOG.info("Starting setOwner"); |
| namesystem.writeLock(); |
| try { |
| editLog.logSetOwner("/","test","test"); |
| } finally { |
| namesystem.writeUnlock(); |
| } |
| sleepingBeforeSync.countDown(); |
| LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs"); |
| Thread.sleep(BLOCK_TIME*1000); |
| editLog.logSync(); |
| LOG.info("edit thread: logSync complete"); |
| } catch (Throwable ioe) { |
| LOG.fatal("Got exception", ioe); |
| deferredException.set(ioe); |
| sleepingBeforeSync.countDown(); |
| } |
| } |
| }; |
| doAnEditThread.setDaemon(true); |
| doAnEditThread.start(); |
| LOG.info("Main thread: waiting to just before logSync..."); |
| sleepingBeforeSync.await(200, TimeUnit.MILLISECONDS); |
| assertNull(deferredException.get()); |
| LOG.info("Main thread: detected that logSync about to be called."); |
| LOG.info("Trying to enter safe mode."); |
| |
| long st = Time.now(); |
| namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER); |
| long et = Time.now(); |
| LOG.info("Entered safe mode after "+(et-st)+"ms"); |
| |
| // Make sure we didn't wait for the thread that did a logEdit but |
| // not logSync. Going into safemode does a logSyncAll that will flush |
| // its edit. |
| assertTrue(et - st < (BLOCK_TIME/2)*1000); |
| |
| // Once we're in safe mode, save namespace. |
| namesystem.saveNamespace(); |
| |
| LOG.info("Joining on edit thread..."); |
| doAnEditThread.join(); |
| assertNull(deferredException.get()); |
| |
| // We did 3 edits: begin, txn, and end |
| assertEquals(3, verifyEditLogs(namesystem, fsimage, |
| NNStorage.getFinalizedEditsFileName(1, 3), |
| 1)); |
| // after the save, just the one "begin" |
| assertEquals(1, verifyEditLogs(namesystem, fsimage, |
| NNStorage.getInProgressEditsFileName(4), |
| 4)); |
| } finally { |
| LOG.info("Closing nn"); |
| if(namesystem != null) namesystem.close(); |
| } |
| } |
| |
| @Test(timeout=180000) |
| public void testDeadlock() throws Throwable { |
| GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.INFO); |
| |
| Configuration conf = getConf(); |
| NameNode.initMetrics(conf, NamenodeRole.NAMENODE); |
| DFSTestUtil.formatNameNode(conf); |
| final FSNamesystem namesystem = FSNamesystem.loadFromDisk(conf); |
| |
| final AtomicBoolean done = new AtomicBoolean(false); |
| final Semaphore blockerSemaphore = new Semaphore(0); |
| final CountDownLatch startSpamLatch = new CountDownLatch(1); |
| |
| ExecutorService executor = Executors.newCachedThreadPool(); |
| try { |
| final FSEditLog editLog = namesystem.getEditLog(); |
| |
| FSEditLogOp.OpInstanceCache cache = editLog.cache.get(); |
| final FSEditLogOp op = FSEditLogOp.SetOwnerOp.getInstance(cache) |
| .setSource("/").setUser("u").setGroup("g"); |
| // don't reset fields so instance can be reused. |
| final FSEditLogOp reuseOp = Mockito.spy(op); |
| Mockito.doNothing().when(reuseOp).reset(); |
| |
| // only job is spam edits. it will fill the queue when the test |
| // loop injects the blockingOp. |
| Future[] logSpammers = new Future[16]; |
| for (int i=0; i < logSpammers.length; i++) { |
| final int ii = i; |
| logSpammers[i] = executor.submit(new Callable() { |
| @Override |
| public Void call() throws Exception { |
| Thread.currentThread().setName("Log spammer " + ii); |
| // wait until a blocking edit op notifies us to go. |
| startSpamLatch.await(); |
| for (int i = 0; !done.get() && i < 1000000; i++) { |
| // do not logSync here because we need to congest the queue. |
| editLog.logEdit(reuseOp); |
| if (i % 2048 == 0) { |
| LOG.info("thread[" + ii +"] edits=" + i); |
| } |
| } |
| assertTrue("too many edits", done.get()); |
| return null; |
| } |
| }); |
| } |
| |
| // the tx id is set while the edit log monitor is held, so this will |
| // effectively stall the async processing thread which will cause the |
| // edit queue to fill up. |
| final FSEditLogOp blockingOp = Mockito.spy(op); |
| doAnswer( |
| new Answer<Void>() { |
| @Override |
| public Void answer(InvocationOnMock invocation) throws Throwable { |
| // flip the latch to unleash the spamming threads to congest |
| // the queue. |
| startSpamLatch.countDown(); |
| // wait until unblocked after a synchronized thread is started. |
| blockerSemaphore.acquire(); |
| invocation.callRealMethod(); |
| return null; |
| } |
| } |
| ).when(blockingOp).setTransactionId(Mockito.anyLong()); |
| // don't reset fields so instance can be reused. |
| Mockito.doNothing().when(blockingOp).reset(); |
| |
| // repeatedly overflow the queue and verify it doesn't deadlock. |
| for (int i = 0; i < 8; i++) { |
| // when the blockingOp is logged, it triggers the latch to unleash the |
| // spammers to overflow the edit queue, then waits for a permit |
| // from blockerSemaphore that will be released at the bottom of |
| // this loop. |
| Future blockingEdit = executor.submit(new Callable() { |
| @Override |
| public Void call() throws Exception { |
| Thread.currentThread().setName("Log blocker"); |
| editLog.logEdit(blockingOp); |
| editLog.logSync(); |
| return null; |
| } |
| }); |
| |
| // wait for spammers to seize up the edit log. |
| final long startTxId = editLog.getLastWrittenTxIdWithoutLock(); |
| final long[] txIds = { startTxId, startTxId, startTxId }; |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override |
| public Boolean get() { |
| txIds[0] = txIds[1]; |
| txIds[1] = txIds[2]; |
| txIds[2] = editLog.getLastWrittenTxIdWithoutLock(); |
| return (txIds[0] == txIds[1] && |
| txIds[1] == txIds[2] && |
| txIds[2] > startTxId); |
| } |
| }, 100, 10000); |
| |
| // callers that synchronize on the edit log while the queue is full |
| // are prone to deadlock if the locking is incorrect. at this point: |
| // 1. the blocking edit is holding the log's monitor. |
| // 2. the spammers have filled the queue. |
| // 3. the spammers are blocked waiting to queue another edit. |
| // Now we'll start another thread to synchronize on the log (simulates |
| // what log rolling does), unblock the op currently holding the |
| // monitor, and ensure deadlock does not occur. |
| final CountDownLatch readyLatch = new CountDownLatch(1); |
| Future synchedEdits = executor.submit(new Callable() { |
| @Override |
| public Void call() throws Exception { |
| Thread.currentThread().setName("Log synchronizer"); |
| // the sync is CRUCIAL for this test. it's what causes edit |
| // log rolling to deadlock when queue is full. |
| readyLatch.countDown(); |
| synchronized (editLog) { |
| editLog.logEdit(reuseOp); |
| editLog.logSync(); |
| } |
| return null; |
| } |
| }); |
| // unblock the edit jammed in setting its txid. queued edits should |
| // start flowing and the synced edits should complete. |
| readyLatch.await(); |
| blockerSemaphore.release(); |
| blockingEdit.get(); |
| synchedEdits.get(); |
| } |
| |
| // tell spammers to stop. |
| done.set(true); |
| for (int i=0; i < logSpammers.length; i++) { |
| logSpammers[i].get(); |
| } |
| // just make sure everything can be synced. |
| editLog.logSyncAll(); |
| } finally { |
| LOG.info("Closing nn"); |
| executor.shutdownNow(); |
| if (namesystem != null) { |
| namesystem.getFSImage().getStorage().close(); |
| namesystem.close(); |
| } |
| } |
| } |
| } |