| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.dfs; |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileReader; |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.Random; |
| |
| import org.apache.commons.logging.impl.Log4JLogger; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.BlockLocation; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.log4j.Level; |
| |
| |
| /** |
| * This class tests that a file need not be closed before its |
| * data can be read by another client. |
| */ |
| public class TestFileCreation extends junit.framework.TestCase { |
/** Directory prefix for test files: the simple class name wrapped in slashes. */
static final String DIR = "/" + TestFileCreation.class.getSimpleName() + "/";

/** Seed for the pseudo-random file contents; writers and checkers both build a Random from it. */
static final long seed = 0xDEADBEEFL;
/** Block size, in bytes, requested when files are created. */
static final int blockSize = 8192;
/** Number of complete blocks in the standard test file. */
static final int numBlocks = 2;
/** Size of the standard test file: the complete blocks plus one extra byte. */
static final int fileSize = blockSize * numBlocks + 1;
/** When true, the tests configure the cluster to use SimulatedFSDataset. */
boolean simulatedStorage = false;

// Instance initializer: raise the lease manager and namesystem loggers to
// Level.ALL. (DataNode.LOG could be raised the same way if needed.)
{
  Log4JLogger leaseLog = (Log4JLogger) LeaseManager.LOG;
  leaseLog.getLogger().setLevel(Level.ALL);
  Log4JLogger namesystemLog = (Log4JLogger) FSNamesystem.LOG;
  namesystemLog.getLogger().setLevel(Level.ALL);
}
| |
| // The test file is 2 times the blocksize plus one. This means that when the |
| // entire file is written, the first two blocks definitely get flushed to |
| // the datanodes. |
| |
| // creates a file but does not close it |
| static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl) |
| throws IOException { |
| System.out.println("createFile: Created " + name + " with " + repl + " replica."); |
| FSDataOutputStream stm = fileSys.create(name, true, |
| fileSys.getConf().getInt("io.file.buffer.size", 4096), |
| (short)repl, (long)blockSize); |
| return stm; |
| } |
| |
| // |
| // writes to file but does not close it |
| // |
| private void writeFile(FSDataOutputStream stm) throws IOException { |
| byte[] buffer = new byte[fileSize]; |
| Random rand = new Random(seed); |
| rand.nextBytes(buffer); |
| stm.write(buffer); |
| } |
| |
| // |
| // writes specified bytes to file. |
| // |
| static void writeFile(FSDataOutputStream stm, int size) throws IOException { |
| byte[] buffer = new byte[fileSize]; |
| Random rand = new Random(seed); |
| rand.nextBytes(buffer); |
| stm.write(buffer, 0, size); |
| } |
| |
| // |
| // verify that the data written to the full blocks are sane |
| // |
| private void checkFile(FileSystem fileSys, Path name, int repl) |
| throws IOException { |
| boolean done = false; |
| |
| // wait till all full blocks are confirmed by the datanodes. |
| while (!done) { |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) {} |
| done = true; |
| BlockLocation[] locations = fileSys.getFileBlockLocations(name, 0, |
| fileSize); |
| if (locations.length < numBlocks) { |
| done = false; |
| continue; |
| } |
| for (int idx = 0; idx < locations.length; idx++) { |
| if (locations[idx].getHosts().length < repl) { |
| done = false; |
| break; |
| } |
| } |
| } |
| FSDataInputStream stm = fileSys.open(name); |
| byte[] expected = new byte[numBlocks * blockSize]; |
| if (simulatedStorage) { |
| for (int i= 0; i < expected.length; i++) { |
| expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE; |
| } |
| } else { |
| Random rand = new Random(seed); |
| rand.nextBytes(expected); |
| } |
| // do a sanity check. Read the file |
| byte[] actual = new byte[numBlocks * blockSize]; |
| stm.readFully(0, actual); |
| checkData(actual, 0, expected, "Read 1"); |
| } |
| |
| private void checkData(byte[] actual, int from, byte[] expected, String message) { |
| for (int idx = 0; idx < actual.length; idx++) { |
| assertEquals(message+" byte "+(from+idx)+" differs. expected "+ |
| expected[from+idx]+" actual "+actual[idx], |
| expected[from+idx], actual[idx]); |
| actual[idx] = 0; |
| } |
| } |
| |
| /** |
| * Test that file data becomes available before file is closed. |
| */ |
| public void testFileCreation() throws IOException { |
| Configuration conf = new Configuration(); |
| if (simulatedStorage) { |
| conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); |
| } |
| MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); |
| FileSystem fs = cluster.getFileSystem(); |
| try { |
| |
| // |
| // check that / exists |
| // |
| Path path = new Path("/"); |
| System.out.println("Path : \"" + path.toString() + "\""); |
| System.out.println(fs.getFileStatus(path).isDir()); |
| assertTrue("/ should be a directory", |
| fs.getFileStatus(path).isDir() == true); |
| |
| // create a new file in home directory. Do not close it. |
| // |
| Path file1 = new Path("filestatus.dat"); |
| FSDataOutputStream stm = createFile(fs, file1, 1); |
| |
| // verify that file exists in FS namespace |
| assertTrue(file1 + " should be a file", |
| fs.getFileStatus(file1).isDir() == false); |
| System.out.println("Path : \"" + file1 + "\""); |
| |
| // write to file |
| writeFile(stm); |
| |
| // Make sure a client can read it before it is closed. |
| checkFile(fs, file1, 1); |
| |
| // verify that file size has changed |
| long len = fs.getFileStatus(file1).getLen(); |
| assertTrue(file1 + " should be of size " + (numBlocks * blockSize) + |
| " but found to be of size " + len, |
| len == numBlocks * blockSize); |
| |
| stm.close(); |
| |
| // verify that file size has changed to the full size |
| len = fs.getFileStatus(file1).getLen(); |
| assertTrue(file1 + " should be of size " + fileSize + |
| " but found to be of size " + len, |
| len == fileSize); |
| |
| |
| // Check storage usage |
| // can't check capacities for real storage since the OS file system may be changing under us. |
| if (simulatedStorage) { |
| DataNode dn = cluster.getDataNodes().get(0); |
| assertEquals(fileSize, dn.getFSDataset().getDfsUsed()); |
| assertEquals(SimulatedFSDataset.DEFAULT_CAPACITY-fileSize, dn.getFSDataset().getRemaining()); |
| } |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Test deleteOnExit |
| */ |
| public void testDeleteOnExit() throws IOException { |
| Configuration conf = new Configuration(); |
| if (simulatedStorage) { |
| conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); |
| } |
| MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); |
| FileSystem fs = cluster.getFileSystem(); |
| FileSystem localfs = FileSystem.getLocal(conf); |
| |
| try { |
| |
| // Creates files in HDFS and local file system. |
| // |
| Path file1 = new Path("filestatus.dat"); |
| Path file2 = new Path("filestatus2.dat"); |
| Path file3 = new Path("filestatus3.dat"); |
| FSDataOutputStream stm1 = createFile(fs, file1, 1); |
| FSDataOutputStream stm2 = createFile(fs, file2, 1); |
| FSDataOutputStream stm3 = createFile(localfs, file3, 1); |
| System.out.println("DeleteOnExit: Created files."); |
| |
| // write to files and close. Purposely, do not close file2. |
| writeFile(stm1); |
| writeFile(stm3); |
| stm1.close(); |
| stm2.close(); |
| stm3.close(); |
| |
| // set delete on exit flag on files. |
| fs.deleteOnExit(file1); |
| fs.deleteOnExit(file2); |
| localfs.deleteOnExit(file3); |
| |
| // close the file system. This should make the above files |
| // disappear. |
| fs.close(); |
| localfs.close(); |
| fs = null; |
| localfs = null; |
| |
| // reopen file system and verify that file does not exist. |
| fs = cluster.getFileSystem(); |
| localfs = FileSystem.getLocal(conf); |
| |
| assertTrue(file1 + " still exists inspite of deletOnExit set.", |
| !fs.exists(file1)); |
| assertTrue(file2 + " still exists inspite of deletOnExit set.", |
| !fs.exists(file2)); |
| assertTrue(file3 + " still exists inspite of deletOnExit set.", |
| !localfs.exists(file3)); |
| System.out.println("DeleteOnExit successful."); |
| |
| } finally { |
| IOUtils.closeStream(fs); |
| IOUtils.closeStream(localfs); |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Test that file data does not become corrupted even in the face of errors. |
| */ |
| public void testFileCreationError1() throws IOException { |
| Configuration conf = new Configuration(); |
| conf.setInt("heartbeat.recheck.interval", 1000); |
| conf.setInt("dfs.heartbeat.interval", 1); |
| if (simulatedStorage) { |
| conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); |
| } |
| // create cluster |
| MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); |
| FileSystem fs = cluster.getFileSystem(); |
| cluster.waitActive(); |
| InetSocketAddress addr = new InetSocketAddress("localhost", |
| cluster.getNameNodePort()); |
| DFSClient client = new DFSClient(addr, conf); |
| |
| try { |
| |
| // create a new file. |
| // |
| Path file1 = new Path("/filestatus.dat"); |
| FSDataOutputStream stm = createFile(fs, file1, 1); |
| |
| // verify that file exists in FS namespace |
| assertTrue(file1 + " should be a file", |
| fs.getFileStatus(file1).isDir() == false); |
| System.out.println("Path : \"" + file1 + "\""); |
| |
| // kill the datanode |
| cluster.shutdownDataNodes(); |
| |
| // wait for the datanode to be declared dead |
| while (true) { |
| DatanodeInfo[] info = client.datanodeReport( |
| FSConstants.DatanodeReportType.LIVE); |
| if (info.length == 0) { |
| break; |
| } |
| System.out.println("testFileCreationError1: waiting for datanode " + |
| " to die."); |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| } |
| } |
| |
| // write 1 byte to file. |
| // This should fail because all datanodes are dead. |
| byte[] buffer = new byte[1]; |
| Random rand = new Random(seed); |
| rand.nextBytes(buffer); |
| try { |
| stm.write(buffer); |
| stm.close(); |
| } catch (Exception e) { |
| System.out.println("Encountered expected exception"); |
| } |
| |
| // verify that no blocks are associated with this file |
| // bad block allocations were cleaned up earlier. |
| LocatedBlocks locations = client.namenode.getBlockLocations( |
| file1.toString(), 0, Long.MAX_VALUE); |
| System.out.println("locations = " + locations.locatedBlockCount()); |
| assertTrue("Error blocks were not cleaned up", |
| locations.locatedBlockCount() == 0); |
| } finally { |
| cluster.shutdown(); |
| client.close(); |
| } |
| } |
| |
| /** |
| * Test that the filesystem removes the last block from a file if its |
| * lease expires. |
| */ |
| public void testFileCreationError2() throws IOException { |
| long leasePeriod = 1000; |
| System.out.println("testFileCreationError2 start"); |
| Configuration conf = new Configuration(); |
| conf.setInt("heartbeat.recheck.interval", 1000); |
| conf.setInt("dfs.heartbeat.interval", 1); |
| if (simulatedStorage) { |
| conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); |
| } |
| // create cluster |
| MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); |
| DistributedFileSystem dfs = null; |
| try { |
| cluster.waitActive(); |
| dfs = (DistributedFileSystem)cluster.getFileSystem(); |
| DFSClient client = dfs.dfs; |
| |
| // create a new file. |
| // |
| Path file1 = new Path("/filestatus.dat"); |
| createFile(dfs, file1, 1); |
| System.out.println("testFileCreationError2: " |
| + "Created file filestatus.dat with one replicas."); |
| |
| LocatedBlocks locations = client.namenode.getBlockLocations( |
| file1.toString(), 0, Long.MAX_VALUE); |
| System.out.println("testFileCreationError2: " |
| + "The file has " + locations.locatedBlockCount() + " blocks."); |
| |
| // add another block to the file |
| LocatedBlock location = client.namenode.addBlock(file1.toString(), |
| client.clientName); |
| System.out.println("testFileCreationError2: " |
| + "Added block " + location.getBlock()); |
| |
| locations = client.namenode.getBlockLocations(file1.toString(), |
| 0, Long.MAX_VALUE); |
| int count = locations.locatedBlockCount(); |
| System.out.println("testFileCreationError2: " |
| + "The file now has " + count + " blocks."); |
| |
| // set the soft and hard limit to be 1 second so that the |
| // namenode triggers lease recovery |
| cluster.setLeasePeriod(leasePeriod, leasePeriod); |
| |
| // wait for the lease to expire |
| try { |
| Thread.sleep(5 * leasePeriod); |
| } catch (InterruptedException e) { |
| } |
| |
| // verify that the last block was synchronized. |
| locations = client.namenode.getBlockLocations(file1.toString(), |
| 0, Long.MAX_VALUE); |
| System.out.println("testFileCreationError2: " |
| + "locations = " + locations.locatedBlockCount()); |
| assertEquals(0, locations.locatedBlockCount()); |
| System.out.println("testFileCreationError2 successful"); |
| } finally { |
| IOUtils.closeStream(dfs); |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Test that file leases are persisted across namenode restarts. |
| * This test is currently not triggered because more HDFS work is |
| * is needed to handle persistent leases. |
| */ |
| public void xxxtestFileCreationNamenodeRestart() throws IOException { |
| Configuration conf = new Configuration(); |
| final int MAX_IDLE_TIME = 2000; // 2s |
| conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME); |
| conf.setInt("heartbeat.recheck.interval", 1000); |
| conf.setInt("dfs.heartbeat.interval", 1); |
| if (simulatedStorage) { |
| conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); |
| } |
| |
| // create cluster |
| MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); |
| FileSystem fs = null; |
| try { |
| cluster.waitActive(); |
| fs = cluster.getFileSystem(); |
| final int nnport = cluster.getNameNodePort(); |
| |
| // create a new file. |
| Path file1 = new Path("/filestatus.dat"); |
| FSDataOutputStream stm = createFile(fs, file1, 1); |
| System.out.println("testFileCreationNamenodeRestart: " |
| + "Created file " + file1); |
| |
| // write two full blocks. |
| writeFile(stm, numBlocks * blockSize); |
| stm.sync(); |
| |
| // rename file wile keeping it open. |
| Path fileRenamed = new Path("/filestatusRenamed.dat"); |
| fs.rename(file1, fileRenamed); |
| System.out.println("testFileCreationNamenodeRestart: " |
| + "Renamed file " + file1 + " to " + |
| fileRenamed); |
| file1 = fileRenamed; |
| |
| // create another new file. |
| // |
| Path file2 = new Path("/filestatus2.dat"); |
| FSDataOutputStream stm2 = createFile(fs, file2, 1); |
| System.out.println("testFileCreationNamenodeRestart: " |
| + "Created file " + file2); |
| |
| // create yet another new file with full path name. |
| // rename it while open |
| // |
| Path file3 = new Path("/user/home/fullpath.dat"); |
| FSDataOutputStream stm3 = createFile(fs, file3, 1); |
| System.out.println("testFileCreationNamenodeRestart: " |
| + "Created file " + file3); |
| Path file4 = new Path("/user/home/fullpath4.dat"); |
| FSDataOutputStream stm4 = createFile(fs, file4, 1); |
| System.out.println("testFileCreationNamenodeRestart: " |
| + "Created file " + file4); |
| |
| fs.mkdirs(new Path("/bin")); |
| fs.rename(new Path("/user/home"), new Path("/bin")); |
| Path file3new = new Path("/bin/home/fullpath.dat"); |
| System.out.println("testFileCreationNamenodeRestart: " |
| + "Renamed file " + file3 + " to " + |
| file3new); |
| Path file4new = new Path("/bin/home/fullpath4.dat"); |
| System.out.println("testFileCreationNamenodeRestart: " |
| + "Renamed file " + file4 + " to " + |
| file4new); |
| |
| // restart cluster with the same namenode port as before. |
| // This ensures that leases are persisted in fsimage. |
| cluster.shutdown(); |
| try { |
| Thread.sleep(2*MAX_IDLE_TIME); |
| } catch (InterruptedException e) { |
| } |
| cluster = new MiniDFSCluster(nnport, conf, 1, false, true, |
| null, null, null); |
| cluster.waitActive(); |
| |
| // restart cluster yet again. This triggers the code to read in |
| // persistent leases from fsimage. |
| cluster.shutdown(); |
| try { |
| Thread.sleep(5000); |
| } catch (InterruptedException e) { |
| } |
| cluster = new MiniDFSCluster(nnport, conf, 1, false, true, |
| null, null, null); |
| cluster.waitActive(); |
| fs = cluster.getFileSystem(); |
| |
| // instruct the dfsclient to use a new filename when it requests |
| // new blocks for files that were renamed. |
| DFSClient.DFSOutputStream dfstream = (DFSClient.DFSOutputStream) |
| (stm.getWrappedStream()); |
| dfstream.setTestFilename(file1.toString()); |
| dfstream = (DFSClient.DFSOutputStream) (stm3.getWrappedStream()); |
| dfstream.setTestFilename(file3new.toString()); |
| dfstream = (DFSClient.DFSOutputStream) (stm4.getWrappedStream()); |
| dfstream.setTestFilename(file4new.toString()); |
| |
| // write 1 byte to file. This should succeed because the |
| // namenode should have persisted leases. |
| byte[] buffer = new byte[1]; |
| Random rand = new Random(seed); |
| rand.nextBytes(buffer); |
| stm.write(buffer); |
| stm.close(); |
| stm2.write(buffer); |
| stm2.close(); |
| stm3.close(); |
| stm4.close(); |
| |
| // verify that new block is associated with this file |
| DFSClient client = ((DistributedFileSystem)fs).dfs; |
| LocatedBlocks locations = client.namenode.getBlockLocations( |
| file1.toString(), 0, Long.MAX_VALUE); |
| System.out.println("locations = " + locations.locatedBlockCount()); |
| assertTrue("Error blocks were not cleaned up for file " + file1, |
| locations.locatedBlockCount() == 3); |
| |
| // verify filestatus2.dat |
| locations = client.namenode.getBlockLocations( |
| file2.toString(), 0, Long.MAX_VALUE); |
| System.out.println("locations = " + locations.locatedBlockCount()); |
| assertTrue("Error blocks were not cleaned up for file " + file2, |
| locations.locatedBlockCount() == 1); |
| } finally { |
| IOUtils.closeStream(fs); |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Test that all open files are closed when client dies abnormally. |
| */ |
| public void testDFSClientDeath() throws IOException { |
| Configuration conf = new Configuration(); |
| System.out.println("Testing adbornal client death."); |
| if (simulatedStorage) { |
| conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); |
| } |
| MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); |
| FileSystem fs = cluster.getFileSystem(); |
| DistributedFileSystem dfs = (DistributedFileSystem) fs; |
| DFSClient dfsclient = dfs.dfs; |
| try { |
| |
| // create a new file in home directory. Do not close it. |
| // |
| Path file1 = new Path("/clienttest.dat"); |
| FSDataOutputStream stm = createFile(fs, file1, 1); |
| System.out.println("Created file clienttest.dat"); |
| |
| // write to file |
| writeFile(stm); |
| |
| // close the dfsclient before closing the output stream. |
| // This should close all existing file. |
| dfsclient.close(); |
| |
| try { |
| fs.close(); |
| fs = null; |
| } catch (IOException e) { |
| } |
| |
| // reopen file system and verify that file exists. |
| fs = cluster.getFileSystem(); |
| assertTrue(file1 + " does not exist.", fs.exists(file1)); |
| |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Test that file data becomes available before file is closed. |
| */ |
| public void testFileCreationSimulated() throws IOException { |
| simulatedStorage = true; |
| testFileCreation(); |
| simulatedStorage = false; |
| } |
| |
| /** |
| * Test creating two files at the same time. |
| */ |
| public void testConcurrentFileCreation() throws IOException { |
| Configuration conf = new Configuration(); |
| MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); |
| |
| try { |
| FileSystem fs = cluster.getFileSystem(); |
| |
| Path[] p = {new Path("/foo"), new Path("/bar")}; |
| |
| //write 2 files at the same time |
| FSDataOutputStream[] out = {fs.create(p[0]), fs.create(p[1])}; |
| int i = 0; |
| for(; i < 100; i++) { |
| out[0].write(i); |
| out[1].write(i); |
| } |
| out[0].close(); |
| for(; i < 200; i++) {out[1].write(i);} |
| out[1].close(); |
| |
| //verify |
| FSDataInputStream[] in = {fs.open(p[0]), fs.open(p[1])}; |
| for(i = 0; i < 100; i++) {assertEquals(i, in[0].read());} |
| for(i = 0; i < 200; i++) {assertEquals(i, in[1].read());} |
| } finally { |
| if (cluster != null) {cluster.shutdown();} |
| } |
| } |
| |
| /** |
| * Create a file, write something, fsync but not close. |
| * Then change lease period and wait for lease recovery. |
| * Finally, read the block directly from each Datanode and verify the content. |
| */ |
| public void testLeaseExpireHardLimit() throws Exception { |
| System.out.println("testLeaseExpireHardLimit start"); |
| final long leasePeriod = 1000; |
| final int DATANODE_NUM = 3; |
| |
| Configuration conf = new Configuration(); |
| conf.setInt("heartbeat.recheck.interval", 1000); |
| conf.setInt("dfs.heartbeat.interval", 1); |
| |
| // create cluster |
| MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null); |
| DistributedFileSystem dfs = null; |
| try { |
| cluster.waitActive(); |
| dfs = (DistributedFileSystem)cluster.getFileSystem(); |
| |
| // create a new file. |
| final String f = DIR + "foo"; |
| final Path fpath = new Path(f); |
| FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM); |
| out.write("something".getBytes()); |
| out.sync(); |
| |
| // set the soft and hard limit to be 1 second so that the |
| // namenode triggers lease recovery |
| cluster.setLeasePeriod(leasePeriod, leasePeriod); |
| // wait for the lease to expire |
| try {Thread.sleep(5 * leasePeriod);} catch (InterruptedException e) {} |
| |
| LocatedBlocks locations = dfs.dfs.namenode.getBlockLocations( |
| f, 0, Long.MAX_VALUE); |
| assertEquals(1, locations.locatedBlockCount()); |
| LocatedBlock locatedblock = locations.getLocatedBlocks().get(0); |
| int successcount = 0; |
| for(DatanodeInfo datanodeinfo: locatedblock.getLocations()) { |
| DataNode datanode = cluster.getDataNode(datanodeinfo.ipcPort); |
| FSDataset dataset = (FSDataset)datanode.data; |
| Block b = dataset.getStoredBlock(locatedblock.getBlock().blkid); |
| File blockfile = dataset.findBlockFile(b.getBlockId()); |
| System.out.println("blockfile=" + blockfile); |
| if (blockfile != null) { |
| BufferedReader in = new BufferedReader(new FileReader(blockfile)); |
| assertEquals("something", in.readLine()); |
| in.close(); |
| successcount++; |
| } |
| } |
| System.out.println("successcount=" + successcount); |
| assertTrue(successcount > 0); |
| } finally { |
| IOUtils.closeStream(dfs); |
| cluster.shutdown(); |
| } |
| |
| System.out.println("testLeaseExpireHardLimit successful"); |
| } |
| |
| /** Test lease recovery Triggered by DFSClient. */ |
| public void testClientTriggeredLeaseRecovery() throws Exception { |
| final int REPLICATION = 3; |
| Configuration conf = new Configuration(); |
| conf.setInt("dfs.datanode.handler.count", 1); |
| conf.setInt("dfs.replication", REPLICATION); |
| MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION, true, null); |
| |
| try { |
| final FileSystem fs = cluster.getFileSystem(); |
| final Path dir = new Path("/wrwelkj"); |
| |
| SlowWriter[] slowwriters = new SlowWriter[10]; |
| for(int i = 0; i < slowwriters.length; i++) { |
| slowwriters[i] = new SlowWriter(fs, new Path(dir, "file" + i)); |
| } |
| |
| try { |
| for(int i = 0; i < slowwriters.length; i++) { |
| slowwriters[i].start(); |
| } |
| |
| //stop a datanode, it should have least recover. |
| cluster.stopDataNode(new Random().nextInt(REPLICATION)); |
| |
| //let the slow writer writes a few more seconds |
| System.out.println("Wait a few seconds"); |
| Thread.sleep(5000); |
| } |
| finally { |
| for(int i = 0; i < slowwriters.length; i++) { |
| if (slowwriters[i] != null) { |
| slowwriters[i].interrupt(); |
| } |
| } |
| for(int i = 0; i < slowwriters.length; i++) { |
| if (slowwriters[i] != null) { |
| slowwriters[i].join(); |
| } |
| } |
| } |
| |
| //Verify the file |
| System.out.println("Verify the file"); |
| for(int i = 0; i < slowwriters.length; i++) { |
| System.out.println(slowwriters[i].filepath + ": length=" |
| + fs.getFileStatus(slowwriters[i].filepath).getLen()); |
| FSDataInputStream in = null; |
| try { |
| in = fs.open(slowwriters[i].filepath); |
| for(int j = 0, x; (x = in.read()) != -1; j++) { |
| assertEquals(j, x); |
| } |
| } |
| finally { |
| IOUtils.closeStream(in); |
| } |
| } |
| } finally { |
| if (cluster != null) {cluster.shutdown();} |
| } |
| } |
| |
| static class SlowWriter extends Thread { |
| final FileSystem fs; |
| final Path filepath; |
| |
| SlowWriter(FileSystem fs, Path filepath) { |
| super(SlowWriter.class.getSimpleName() + ":" + filepath); |
| this.fs = fs; |
| this.filepath = filepath; |
| } |
| |
| public void run() { |
| FSDataOutputStream out = null; |
| int i = 0; |
| try { |
| out = fs.create(filepath); |
| for(; ; i++) { |
| System.out.println(getName() + " writes " + i); |
| out.write(i); |
| out.sync(); |
| sleep(100); |
| } |
| } |
| catch(Exception e) { |
| System.out.println(getName() + " dies: e=" + e); |
| } |
| finally { |
| System.out.println(getName() + ": i=" + i); |
| IOUtils.closeStream(out); |
| } |
| } |
| } |
| } |