| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.namenode; |
| |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Mockito.doNothing; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.BufferedInputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.RandomAccessFile; |
| import java.nio.channels.FileChannel; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.SortedMap; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DFSTestUtil; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.StripedFileTestUtil; |
| import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; |
| import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; |
| import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; |
| import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; |
| import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation; |
| import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.io.erasurecode.ECSchema; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.test.GenericTestUtils.LogCapturer; |
| import org.apache.hadoop.test.PathUtils; |
| import org.apache.hadoop.util.FakeTimer; |
| import org.slf4j.event.Level; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| import org.junit.runners.Parameterized.Parameters; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.collect.Maps; |
| import org.apache.hadoop.thirdparty.com.google.common.io.Files; |
| |
| @RunWith(Parameterized.class) |
| public class TestFSEditLogLoader { |
| @Parameters |
| public static Collection<Object[]> data() { |
| Collection<Object[]> params = new ArrayList<Object[]>(); |
| params.add(new Object[]{ Boolean.FALSE }); |
| params.add(new Object[]{ Boolean.TRUE }); |
| return params; |
| } |
| |
| private static boolean useAsyncEditLog; |
| public TestFSEditLogLoader(Boolean async) { |
| useAsyncEditLog = async; |
| } |
| |
| private static Configuration getConf() { |
| Configuration conf = new HdfsConfiguration(); |
| conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, |
| useAsyncEditLog); |
| return conf; |
| } |
| |
| static { |
| GenericTestUtils.setLogLevel(FSImage.LOG, Level.TRACE); |
| GenericTestUtils.setLogLevel(FSEditLogLoader.LOG, Level.TRACE); |
| } |
| |
| private static final File TEST_DIR = PathUtils.getTestDir(TestFSEditLogLoader.class); |
| |
| private static final int NUM_DATA_NODES = 0; |
| private static final String FAKE_EDIT_STREAM_NAME = "FAKE_STREAM"; |
| |
| private final ErasureCodingPolicy testECPolicy |
| = StripedFileTestUtil.getDefaultECPolicy(); |
| |
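  /**
   * Test that the error message produced when edit log replay fails includes
   * the offsets of the most recently decoded opcodes.
   */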
| @Test |
| public void testDisplayRecentEditLogOpCodes() throws IOException { |
    // start a cluster
    Configuration conf = getConf();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(NUM_DATA_NODES)
        .enableManagedDfsDirsRedundancy(false).build();
    cluster.waitActive();
    FileSystem fileSys = cluster.getFileSystem();
| final FSNamesystem namesystem = cluster.getNamesystem(); |
| |
| FSImage fsimage = namesystem.getFSImage(); |
| for (int i = 0; i < 20; i++) { |
| fileSys.mkdirs(new Path("/tmp/tmp" + i)); |
| } |
| StorageDirectory sd = fsimage.getStorage().dirIterator(NameNodeDirType.EDITS).next(); |
| cluster.shutdown(); |
| |
| File editFile = FSImageTestUtil.findLatestEditsLog(sd).getFile(); |
| assertTrue("Should exist: " + editFile, editFile.exists()); |
| |
| // Corrupt the edits file. |
| long fileLen = editFile.length(); |
| RandomAccessFile rwf = new RandomAccessFile(editFile, "rw"); |
| rwf.seek(fileLen - 40); |
| for (int i = 0; i < 20; i++) { |
| rwf.write(FSEditLogOpCodes.OP_DELETE.getOpCode()); |
| } |
| rwf.close(); |
| |
| StringBuilder bld = new StringBuilder(); |
| bld.append("^Error replaying edit log at offset \\d+. "); |
| bld.append("Expected transaction ID was \\d+\n"); |
| bld.append("Recent opcode offsets: (\\d+\\s*){4}$"); |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES) |
| .enableManagedDfsDirsRedundancy(false).format(false).build(); |
| fail("should not be able to start"); |
| } catch (IOException e) { |
| assertTrue("error message contains opcodes message", |
| e.getMessage().matches(bld.toString())); |
| } |
| } |
| |
| /** |
| * Test that, if the NN restarts with a new minimum replication, |
| * any files created with the old replication count will get |
| * automatically bumped up to the new minimum upon restart. |
| */ |
| @Test |
| public void testReplicationAdjusted() throws Exception { |
| // start a cluster |
| Configuration conf = getConf(); |
| // Replicate and heartbeat fast to shave a few seconds off test |
| conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1); |
| conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); |
| |
| MiniDFSCluster cluster = null; |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) |
| .build(); |
| cluster.waitActive(); |
| FileSystem fs = cluster.getFileSystem(); |
| |
| // Create a file with replication count 1 |
| Path p = new Path("/testfile"); |
| DFSTestUtil.createFile(fs, p, 10, /*repl*/ (short)1, 1); |
| DFSTestUtil.waitReplication(fs, p, (short)1); |
| |
| // Shut down and restart cluster with new minimum replication of 2 |
| cluster.shutdown(); |
| cluster = null; |
| |
| conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, 2); |
| |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) |
| .format(false).build(); |
| cluster.waitActive(); |
| fs = cluster.getFileSystem(); |
| |
| // The file should get adjusted to replication 2 when |
| // the edit log is replayed. |
| DFSTestUtil.waitReplication(fs, p, (short)2); |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| /** |
| * Corrupt the byte at the given offset in the given file, |
| * by subtracting 1 from it. |
| */ |
| private void corruptByteInFile(File file, long offset) |
| throws IOException { |
| RandomAccessFile raf = new RandomAccessFile(file, "rw"); |
| try { |
| raf.seek(offset); |
| int origByte = raf.read(); |
| raf.seek(offset); |
| raf.writeByte(origByte - 1); |
| } finally { |
| IOUtils.closeStream(raf); |
| } |
| } |
| |
| /** |
| * Truncate the given file to the given length |
| */ |
| private void truncateFile(File logFile, long newLength) |
| throws IOException { |
| RandomAccessFile raf = new RandomAccessFile(logFile, "rw"); |
| raf.setLength(newLength); |
| raf.close(); |
| } |
| |
| /** |
   * Return the length of the given file in bytes, excluding the trailing
   * run of 0xFF (OP_INVALID) bytes.
| * This seeks to the end of the file and reads chunks backwards until |
| * it finds a non-0xFF byte. |
| * @throws IOException if the file cannot be read |
| */ |
| private static long getNonTrailerLength(File f) throws IOException { |
| final int chunkSizeToRead = 256*1024; |
| FileInputStream fis = new FileInputStream(f); |
    try {
      byte[] buf = new byte[chunkSizeToRead];
| |
| FileChannel fc = fis.getChannel(); |
| long size = fc.size(); |
| long pos = size - (size % chunkSizeToRead); |
| |
| while (pos >= 0) { |
| fc.position(pos); |
| |
| int readLen = (int) Math.min(size - pos, chunkSizeToRead); |
| IOUtils.readFully(fis, buf, 0, readLen); |
| for (int i = readLen - 1; i >= 0; i--) { |
| if (buf[i] != FSEditLogOpCodes.OP_INVALID.getOpCode()) { |
| return pos + i + 1; // + 1 since we count this byte! |
| } |
| } |
| |
| pos -= chunkSizeToRead; |
| } |
| return 0; |
| } finally { |
| fis.close(); |
| } |
| } |
| |
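  /**
   * Test that PositionTrackingInputStream enforces its byte limit for both
   * single-byte and array reads, and that mark/reset honor the limiter.
   */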
| @Test |
| public void testStreamLimiter() throws IOException { |
| final File LIMITER_TEST_FILE = new File(TEST_DIR, "limiter.test"); |
| |
| FileOutputStream fos = new FileOutputStream(LIMITER_TEST_FILE); |
| try { |
| fos.write(0x12); |
| fos.write(0x12); |
| fos.write(0x12); |
| } finally { |
| fos.close(); |
| } |
| |
| FileInputStream fin = new FileInputStream(LIMITER_TEST_FILE); |
| BufferedInputStream bin = new BufferedInputStream(fin); |
| FSEditLogLoader.PositionTrackingInputStream tracker = |
| new FSEditLogLoader.PositionTrackingInputStream(bin); |
| try { |
| tracker.setLimit(2); |
| tracker.mark(100); |
| tracker.read(); |
| tracker.read(); |
| try { |
| tracker.read(); |
| fail("expected to get IOException after reading past the limit"); |
| } catch (IOException e) { |
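        // expected: the read went past the configured limit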
| } |
| tracker.reset(); |
| tracker.mark(100); |
      byte[] arr = new byte[3];
| try { |
| tracker.read(arr); |
| fail("expected to get IOException after reading past the limit"); |
| } catch (IOException e) { |
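        // expected: the array read went past the configured limit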
| } |
| tracker.reset(); |
| arr = new byte[2]; |
| tracker.read(arr); |
| } finally { |
| tracker.close(); |
| } |
| } |
| |
| /** |
| * Create an unfinalized edit log for testing purposes |
| * |
| * @param testDir Directory to create the edit log in |
| * @param numTx Number of transactions to add to the new edit log |
   * @param offsetToTxId A map to populate, from byte offsets in the edit
   *                     log file to the transaction IDs written there.
   * @return The new in-progress edit log file.
| * @throws IOException |
| */ |
| static private File prepareUnfinalizedTestEditLog(File testDir, int numTx, |
| SortedMap<Long, Long> offsetToTxId) throws IOException { |
| File inProgressFile = new File(testDir, NNStorage.getInProgressEditsFileName(1)); |
| FSEditLog fsel = null, spyLog = null; |
| try { |
| fsel = FSImageTestUtil.createStandaloneEditLog(testDir); |
| spyLog = spy(fsel); |
| // Normally, the in-progress edit log would be finalized by |
| // FSEditLog#endCurrentLogSegment. For testing purposes, we |
| // disable that here. |
| doNothing().when(spyLog).endCurrentLogSegment(true); |
| spyLog.openForWrite(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); |
| assertTrue("should exist: " + inProgressFile, inProgressFile.exists()); |
| |
| for (int i = 0; i < numTx; i++) { |
| long trueOffset = getNonTrailerLength(inProgressFile); |
| long thisTxId = spyLog.getLastWrittenTxId() + 1; |
| offsetToTxId.put(trueOffset, thisTxId); |
| System.err.println("txid " + thisTxId + " at offset " + trueOffset); |
| spyLog.logDelete("path" + i, i, false); |
| spyLog.logSync(); |
| } |
| } finally { |
| if (spyLog != null) { |
| spyLog.close(); |
| } else if (fsel != null) { |
| fsel.close(); |
| } |
| } |
| return inProgressFile; |
| } |
| |
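  /**
   * Test that scanning an edit log whose header has been overwritten reports
   * a corrupt header.
   */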
| @Test |
| public void testValidateEditLogWithCorruptHeader() throws IOException { |
| File testDir = new File(TEST_DIR, "testValidateEditLogWithCorruptHeader"); |
| SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap(); |
| File logFile = prepareUnfinalizedTestEditLog(testDir, 2, offsetToTxId); |
| RandomAccessFile rwf = new RandomAccessFile(logFile, "rw"); |
| try { |
| rwf.seek(0); |
| rwf.writeLong(42); // corrupt header |
| } finally { |
| rwf.close(); |
| } |
| EditLogValidation validation = |
| EditLogFileInputStream.scanEditLog(logFile, Long.MAX_VALUE, true); |
| assertTrue(validation.hasCorruptHeader()); |
| } |
| |
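  /**
   * Test that edit log validation still reports the correct end transaction
   * ID when individual transactions are corrupted or the log is truncated.
   */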
| @Test |
| public void testValidateEditLogWithCorruptBody() throws IOException { |
| File testDir = new File(TEST_DIR, "testValidateEditLogWithCorruptBody"); |
| SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap(); |
| final int NUM_TXNS = 20; |
| File logFile = prepareUnfinalizedTestEditLog(testDir, NUM_TXNS, |
| offsetToTxId); |
| // Back up the uncorrupted log |
| File logFileBak = new File(testDir, logFile.getName() + ".bak"); |
| Files.copy(logFile, logFileBak); |
| EditLogValidation validation = |
| EditLogFileInputStream.scanEditLog(logFile, Long.MAX_VALUE, true); |
    assertFalse(validation.hasCorruptHeader());
| // We expect that there will be an OP_START_LOG_SEGMENT, followed by |
| // NUM_TXNS opcodes, followed by an OP_END_LOG_SEGMENT. |
| assertEquals(NUM_TXNS + 1, validation.getEndTxId()); |
| // Corrupt each edit and verify that validation continues to work |
| for (Map.Entry<Long, Long> entry : offsetToTxId.entrySet()) { |
| long txOffset = entry.getKey(); |
| long txId = entry.getValue(); |
| |
| // Restore backup, corrupt the txn opcode |
| Files.copy(logFileBak, logFile); |
| corruptByteInFile(logFile, txOffset); |
| validation = EditLogFileInputStream.scanEditLog(logFile, |
| Long.MAX_VALUE, true); |
| long expectedEndTxId = (txId == (NUM_TXNS + 1)) ? |
| NUM_TXNS : (NUM_TXNS + 1); |
| assertEquals("Failed when corrupting txn opcode at " + txOffset, |
| expectedEndTxId, validation.getEndTxId()); |
      assertFalse(validation.hasCorruptHeader());
| } |
| |
| // Truncate right before each edit and verify that validation continues |
| // to work |
| for (Map.Entry<Long, Long> entry : offsetToTxId.entrySet()) { |
| long txOffset = entry.getKey(); |
| long txId = entry.getValue(); |
| |
      // Restore backup, truncate the log just before this txn
| Files.copy(logFileBak, logFile); |
| truncateFile(logFile, txOffset); |
| validation = EditLogFileInputStream.scanEditLog(logFile, |
| Long.MAX_VALUE, true); |
| long expectedEndTxId = (txId == 0) ? |
| HdfsServerConstants.INVALID_TXID : (txId - 1); |
| assertEquals("Failed when corrupting txid " + txId + " txn opcode " + |
| "at " + txOffset, expectedEndTxId, validation.getEndTxId()); |
      assertFalse(validation.hasCorruptHeader());
| } |
| } |
| |
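  /**
   * Test that validating an edit log truncated down to its header yields
   * INVALID_TXID as the end transaction ID instead of failing.
   */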
| @Test |
| public void testValidateEmptyEditLog() throws IOException { |
| File testDir = new File(TEST_DIR, "testValidateEmptyEditLog"); |
| SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap(); |
| File logFile = prepareUnfinalizedTestEditLog(testDir, 0, offsetToTxId); |
| // Truncate the file so that there is nothing except the header and |
| // layout flags section. |
| truncateFile(logFile, 8); |
| EditLogValidation validation = |
| EditLogFileInputStream.scanEditLog(logFile, Long.MAX_VALUE, true); |
    assertFalse(validation.hasCorruptHeader());
| assertEquals(HdfsServerConstants.INVALID_TXID, validation.getEndTxId()); |
| } |
| |
| private static final Map<Byte, FSEditLogOpCodes> byteToEnum = |
| new HashMap<Byte, FSEditLogOpCodes>(); |
| static { |
| for(FSEditLogOpCodes opCode : FSEditLogOpCodes.values()) { |
| byteToEnum.put(opCode.getOpCode(), opCode); |
| } |
| } |
| |
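  /** Reference reverse lookup of an opcode byte, for testFSEditLogOpCodes. */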
| private static FSEditLogOpCodes fromByte(byte opCode) { |
| return byteToEnum.get(opCode); |
| } |
| |
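  /**
   * Test that FSEditLogOpCodes#fromByte agrees with the reference lookup
   * above for every defined opcode and every possible byte value.
   */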
| @Test |
| public void testFSEditLogOpCodes() throws IOException { |
| //try all codes |
| for(FSEditLogOpCodes c : FSEditLogOpCodes.values()) { |
| final byte code = c.getOpCode(); |
| assertEquals("c=" + c + ", code=" + code, |
| c, FSEditLogOpCodes.fromByte(code)); |
| } |
| |
| //try all byte values |
| for(int b = 0; b < (1 << Byte.SIZE); b++) { |
| final byte code = (byte)b; |
| assertEquals("b=" + b + ", code=" + code, |
| fromByte(code), FSEditLogOpCodes.fromByte(code)); |
| } |
| } |
| |
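  /**
   * Test that an OP_ADD_BLOCK edit for a striped block is replayed
   * correctly when the NameNode restarts.
   */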
| @Test |
  public void testAddNewStripedBlock() throws IOException {
| // start a cluster |
| Configuration conf = new HdfsConfiguration(); |
| MiniDFSCluster cluster = null; |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9) |
| .build(); |
| cluster.waitActive(); |
| DistributedFileSystem fs = cluster.getFileSystem(); |
| FSNamesystem fns = cluster.getNamesystem(); |
| fs.enableErasureCodingPolicy(testECPolicy.getName()); |
| |
| String testDir = "/ec"; |
| String testFile = "testfile_001"; |
| String testFilePath = testDir + "/" + testFile; |
| String clientName = "testUser1"; |
| String clientMachine = "testMachine1"; |
| long blkId = 1; |
| long blkNumBytes = 1024; |
| long timestamp = 1426222918; |
| short blockNum = (short) testECPolicy.getNumDataUnits(); |
| short parityNum = (short) testECPolicy.getNumParityUnits(); |
| |
      // set the erasure coding policy on the directory
| fs.mkdir(new Path(testDir), new FsPermission("755")); |
| fs.getClient().getNamenode().setErasureCodingPolicy( |
| testDir, testECPolicy.getName()); |
| |
| // Create a file with striped block |
| Path p = new Path(testFilePath); |
| DFSTestUtil.createFile(fs, p, 0, (short) 1, 1); |
| |
| fns.enterSafeMode(false); |
| fns.saveNamespace(0, 0); |
| fns.leaveSafeMode(false); |
| |
| // Add a striped block to the file |
| BlockInfoStriped stripedBlk = new BlockInfoStriped( |
| new Block(blkId, blkNumBytes, timestamp), testECPolicy); |
| INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath); |
| file.toUnderConstruction(clientName, clientMachine); |
| file.addBlock(stripedBlk); |
| fns.getEditLog().logAddBlock(testFilePath, file); |
| TestINodeFile.toCompleteFile(file); |
| |
      // If the loaded block is the same as the one added above, the edit
      // log has been successfully applied to the fsimage.
| cluster.restartNameNodes(); |
| cluster.waitActive(); |
| fns = cluster.getNamesystem(); |
| |
| INodeFile inodeLoaded = (INodeFile)fns.getFSDirectory() |
| .getINode(testFilePath); |
| |
| assertTrue(inodeLoaded.isStriped()); |
| |
| BlockInfo[] blks = inodeLoaded.getBlocks(); |
| assertEquals(1, blks.length); |
| assertEquals(blkId, blks[0].getBlockId()); |
| assertEquals(blkNumBytes, blks[0].getNumBytes()); |
| assertEquals(timestamp, blks[0].getGenerationStamp()); |
| assertEquals(blockNum, ((BlockInfoStriped)blks[0]).getDataBlockNum()); |
| assertEquals(parityNum, ((BlockInfoStriped)blks[0]).getParityBlockNum()); |
| |
| cluster.shutdown(); |
| cluster = null; |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
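  /**
   * Test that an OP_UPDATE_BLOCKS edit for a striped block is replayed
   * correctly when the NameNode restarts.
   */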
| @Test |
  public void testUpdateStripedBlocks() throws IOException {
| // start a cluster |
| Configuration conf = new HdfsConfiguration(); |
| MiniDFSCluster cluster = null; |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9) |
| .build(); |
| cluster.waitActive(); |
| DistributedFileSystem fs = cluster.getFileSystem(); |
| FSNamesystem fns = cluster.getNamesystem(); |
| fs.enableErasureCodingPolicy(testECPolicy.getName()); |
| |
| String testDir = "/ec"; |
| String testFile = "testfile_002"; |
| String testFilePath = testDir + "/" + testFile; |
| String clientName = "testUser2"; |
| String clientMachine = "testMachine2"; |
| long blkId = 1; |
| long blkNumBytes = 1024; |
| long timestamp = 1426222918; |
| short blockNum = (short) testECPolicy.getNumDataUnits(); |
| short parityNum = (short) testECPolicy.getNumParityUnits(); |
| |
      // set the erasure coding policy on the directory
| fs.mkdir(new Path(testDir), new FsPermission("755")); |
| fs.getClient().getNamenode().setErasureCodingPolicy( |
| testDir, testECPolicy.getName()); |
| |
      // create a file with a striped block
| Path p = new Path(testFilePath); |
| DFSTestUtil.createFile(fs, p, 0, (short) 1, 1); |
| BlockInfoStriped stripedBlk = new BlockInfoStriped( |
| new Block(blkId, blkNumBytes, timestamp), testECPolicy); |
| INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath); |
| file.toUnderConstruction(clientName, clientMachine); |
| file.addBlock(stripedBlk); |
| fns.getEditLog().logAddBlock(testFilePath, file); |
| TestINodeFile.toCompleteFile(file); |
| fns.enterSafeMode(false); |
| fns.saveNamespace(0, 0); |
| fns.leaveSafeMode(false); |
| |
      // update the last block
      long newBlkNumBytes = 1024 * 8;
      long newTimestamp = 1426222918 + 3600;
| file.toUnderConstruction(clientName, clientMachine); |
| file.getLastBlock().setNumBytes(newBlkNumBytes); |
| file.getLastBlock().setGenerationStamp(newTimestamp); |
| fns.getEditLog().logUpdateBlocks(testFilePath, file, true); |
| TestINodeFile.toCompleteFile(file); |
| |
      // After the NameNode restarts, if the loaded block is the same as the
      // one updated above (new size and generation stamp), the edit log has
      // been successfully applied to the fsimage.
| cluster.restartNameNodes(); |
| cluster.waitActive(); |
| fns = cluster.getNamesystem(); |
| |
| INodeFile inodeLoaded = (INodeFile)fns.getFSDirectory() |
| .getINode(testFilePath); |
| |
| assertTrue(inodeLoaded.isStriped()); |
| |
| BlockInfo[] blks = inodeLoaded.getBlocks(); |
| assertEquals(1, blks.length); |
| assertTrue(blks[0].isStriped()); |
| assertEquals(blkId, blks[0].getBlockId()); |
| assertEquals(newBlkNumBytes, blks[0].getNumBytes()); |
| assertEquals(newTimestamp, blks[0].getGenerationStamp()); |
| assertEquals(blockNum, ((BlockInfoStriped)blks[0]).getDataBlockNum()); |
| assertEquals(parityNum, ((BlockInfoStriped)blks[0]).getParityBlockNum()); |
| |
| cluster.shutdown(); |
| cluster = null; |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
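  /**
   * Test that replaying an OP_ADD_BLOCK edit for a contiguous block whose ID
   * lies in the striped (negative) range sets hasNonEcBlockUsingStripedID.
   */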
| @Test |
  public void testHasNonEcBlockUsingStripedIDForAddBlock()
      throws IOException {
| // start a cluster |
| Configuration conf = new HdfsConfiguration(); |
| MiniDFSCluster cluster = null; |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9) |
| .build(); |
| cluster.waitActive(); |
| DistributedFileSystem fs = cluster.getFileSystem(); |
| FSNamesystem fns = cluster.getNamesystem(); |
| |
| String testDir = "/test_block_manager"; |
| String testFile = "testfile_addblock"; |
| String testFilePath = testDir + "/" + testFile; |
| String clientName = "testUser_addblock"; |
| String clientMachine = "testMachine_addblock"; |
      long blkId = -1; // negative IDs fall in the striped block ID range
| long blkNumBytes = 1024; |
| long timestamp = 1426222918; |
| |
| fs.mkdir(new Path(testDir), new FsPermission("755")); |
| Path p = new Path(testFilePath); |
| |
      // Check whether hasNonEcBlockUsingStripedID is set
      // after loading an OP_ADD_BLOCK edit
| DFSTestUtil.createFile(fs, p, 0, (short) 1, 1); |
| BlockInfoContiguous cBlk = new BlockInfoContiguous( |
| new Block(blkId, blkNumBytes, timestamp), (short)3); |
| INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath); |
| file.toUnderConstruction(clientName, clientMachine); |
| file.addBlock(cBlk); |
| fns.getEditLog().logAddBlock(testFilePath, file); |
| TestINodeFile.toCompleteFile(file); |
| cluster.restartNameNodes(); |
| cluster.waitActive(); |
| fns = cluster.getNamesystem(); |
| assertTrue(fns.getBlockManager().hasNonEcBlockUsingStripedID()); |
| |
| cluster.shutdown(); |
| cluster = null; |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
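  /**
   * Test that replaying an OP_UPDATE_BLOCKS edit that moves a contiguous
   * block into the striped (negative) ID range sets
   * hasNonEcBlockUsingStripedID.
   */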
| @Test |
| public void testHasNonEcBlockUsingStripedIDForUpdateBlocks() |
      throws IOException {
| // start a cluster |
| Configuration conf = new HdfsConfiguration(); |
| MiniDFSCluster cluster = null; |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9) |
| .build(); |
| cluster.waitActive(); |
| DistributedFileSystem fs = cluster.getFileSystem(); |
| FSNamesystem fns = cluster.getNamesystem(); |
| |
| String testDir = "/test_block_manager"; |
| String testFile = "testfile_002"; |
| String testFilePath = testDir + "/" + testFile; |
| String clientName = "testUser2"; |
| String clientMachine = "testMachine1"; |
| long blkId = 100; |
| long blkNumBytes = 1024; |
| long timestamp = 1426222918; |
| |
| fs.mkdir(new Path(testDir), new FsPermission("755")); |
| Path p = new Path(testFilePath); |
| |
| DFSTestUtil.createFile(fs, p, 0, (short) 1, 1); |
| BlockInfoContiguous cBlk = new BlockInfoContiguous( |
| new Block(blkId, blkNumBytes, timestamp), (short)3); |
| INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath); |
| file.toUnderConstruction(clientName, clientMachine); |
| file.addBlock(cBlk); |
| TestINodeFile.toCompleteFile(file); |
| |
      long newBlkNumBytes = 1024 * 8;
      long newTimestamp = 1426222918 + 3600;
| file.toUnderConstruction(clientName, clientMachine); |
      file.getLastBlock().setBlockId(-100); // striped (negative) ID range
| file.getLastBlock().setNumBytes(newBlkNumBytes); |
| file.getLastBlock().setGenerationStamp(newTimestamp); |
| fns.getEditLog().logUpdateBlocks(testFilePath, file, true); |
| TestINodeFile.toCompleteFile(file); |
| cluster.restartNameNodes(); |
| cluster.waitActive(); |
| fns = cluster.getNamesystem(); |
| assertTrue(fns.getBlockManager().hasNonEcBlockUsingStripedID()); |
| |
| cluster.shutdown(); |
| cluster = null; |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
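  /**
   * Test that adding, enabling, disabling, and removing an erasure coding
   * policy are all persisted in the edit log and reapplied on restart.
   */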
| @Test |
| public void testErasureCodingPolicyOperations() throws IOException { |
| // start a cluster |
| Configuration conf = new HdfsConfiguration(); |
| final int blockSize = 16 * 1024; |
| conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); |
| MiniDFSCluster cluster = null; |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9) |
| .build(); |
| cluster.waitActive(); |
| DistributedFileSystem fs = cluster.getFileSystem(); |
| |
| // 1. add new policy |
| ECSchema schema = new ECSchema("rs", 5, 3); |
| int cellSize = 2 * 1024; |
| ErasureCodingPolicy newPolicy = |
| new ErasureCodingPolicy(schema, cellSize, (byte) 0); |
| ErasureCodingPolicy[] policyArray = new ErasureCodingPolicy[]{newPolicy}; |
| AddErasureCodingPolicyResponse[] responses = |
| fs.addErasureCodingPolicies(policyArray); |
| assertEquals(1, responses.length); |
| assertTrue(responses[0].isSucceed()); |
| newPolicy = responses[0].getPolicy(); |
| |
| // Restart NameNode without saving namespace |
| cluster.restartNameNodes(); |
| cluster.waitActive(); |
| |
| // check if new policy is reapplied through edit log |
| ErasureCodingPolicy ecPolicy = |
| ErasureCodingPolicyManager.getInstance().getByID(newPolicy.getId()); |
| assertEquals(ErasureCodingPolicyState.DISABLED, |
| DFSTestUtil.getECPolicyState(ecPolicy)); |
| |
| // 2. enable policy |
| fs.enableErasureCodingPolicy(newPolicy.getName()); |
| cluster.restartNameNodes(); |
| cluster.waitActive(); |
| ecPolicy = |
| ErasureCodingPolicyManager.getInstance().getByID(newPolicy.getId()); |
| assertEquals(ErasureCodingPolicyState.ENABLED, |
| DFSTestUtil.getECPolicyState(ecPolicy)); |
| |
| // create a new file, use the policy |
| final Path dirPath = new Path("/striped"); |
| final Path filePath = new Path(dirPath, "file"); |
| final int fileLength = blockSize * newPolicy.getNumDataUnits(); |
| fs.mkdirs(dirPath); |
| fs.setErasureCodingPolicy(dirPath, newPolicy.getName()); |
| final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength); |
| DFSTestUtil.writeFile(fs, filePath, bytes); |
| |
| // 3. disable policy |
| fs.disableErasureCodingPolicy(newPolicy.getName()); |
| cluster.restartNameNodes(); |
| cluster.waitActive(); |
| ecPolicy = |
| ErasureCodingPolicyManager.getInstance().getByID(newPolicy.getId()); |
| assertEquals(ErasureCodingPolicyState.DISABLED, |
| DFSTestUtil.getECPolicyState(ecPolicy)); |
| // read file |
| DFSTestUtil.readFileAsBytes(fs, filePath); |
| |
| // 4. remove policy |
| fs.removeErasureCodingPolicy(newPolicy.getName()); |
| cluster.restartNameNodes(); |
| cluster.waitActive(); |
| ecPolicy = |
| ErasureCodingPolicyManager.getInstance().getByID(newPolicy.getId()); |
| assertEquals(ErasureCodingPolicyState.REMOVED, |
| DFSTestUtil.getECPolicyState(ecPolicy)); |
| // read file |
| DFSTestUtil.readFileAsBytes(fs, filePath); |
| |
| cluster.shutdown(); |
| cluster = null; |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
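  /**
   * Test that "Start loading edits file" messages are logged at most once
   * per LOAD_EDIT_LOG_INTERVAL_MS, with a summary of suppressed messages
   * once logging resumes.
   */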
| @Test |
| public void testLoadFSEditLogThrottling() throws Exception { |
| FSNamesystem namesystem = mock(FSNamesystem.class); |
| namesystem.dir = mock(FSDirectory.class); |
| |
| FakeTimer timer = new FakeTimer(); |
| FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0, timer); |
| FSEditLogLoader.LOAD_EDITS_LOG_HELPER.reset(); |
| |
| LogCapturer capture = LogCapturer.captureLogs(FSImage.LOG); |
| loader.loadFSEdits(getFakeEditLogInputStream(1, 10), 1); |
| assertTrue(capture.getOutput().contains("Start loading edits file " + |
| FAKE_EDIT_STREAM_NAME)); |
| assertTrue(capture.getOutput().contains("Loaded 1 edits file(s)")); |
| assertFalse(capture.getOutput().contains("suppressed")); |
| |
| timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS / 2); |
| capture.clearOutput(); |
| loader.loadFSEdits(getFakeEditLogInputStream(11, 20), 11); |
| assertFalse(capture.getOutput().contains("Start loading edits file")); |
| assertFalse(capture.getOutput().contains("edits file(s)")); |
| |
| timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS); |
| capture.clearOutput(); |
| loader.loadFSEdits(getFakeEditLogInputStream(21, 30), 21); |
| assertTrue(capture.getOutput().contains("Start loading edits file " + |
| FAKE_EDIT_STREAM_NAME)); |
| assertTrue(capture.getOutput().contains("suppressed logging 1 times")); |
| assertTrue(capture.getOutput().contains("Loaded 2 edits file(s)")); |
| assertTrue(capture.getOutput().contains("total size 2.0")); |
| } |
| |
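  /** Build a mock EditLogInputStream spanning the given transaction range. */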
| private EditLogInputStream getFakeEditLogInputStream(long startTx, long endTx) |
| throws IOException { |
| EditLogInputStream fakeStream = mock(EditLogInputStream.class); |
| when(fakeStream.getName()).thenReturn(FAKE_EDIT_STREAM_NAME); |
| when(fakeStream.getFirstTxId()).thenReturn(startTx); |
| when(fakeStream.getLastTxId()).thenReturn(endTx); |
| when(fakeStream.length()).thenReturn(1L); |
| return fakeStream; |
| } |
| |
| } |