/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.raid;

import org.junit.Test;
import static org.junit.Assert.*;

import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.RaidDFSUtil;
import org.apache.hadoop.hdfs.TestRaidDfs;

public class TestBlockFixerDistConcurrency extends TestBlockFixer {
  /**
   * Tests that two block-fixing jobs can run concurrently
   * (dist block fixer).
   */
  @Test
  public void testConcurrentJobs() throws Exception {
    LOG.info("Test testConcurrentJobs started.");
    long blockSize = 8192L;
    int stripeLength = 3;
    mySetup(stripeLength, -1); // never har
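    // with stripeLength 3, parity covers stripes of three consecutive data
    // blocks; blocks 0, 4 and 6 (corrupted below) therefore sit in three
    // different stripes, so each stripe loses only one block and can be
    // reconstructed from its parity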
    Path file1 = new Path("/user/dhruba/raidtest/file1");
    Path file2 = new Path("/user/dhruba/raidtest/file2");
    Path destPath = new Path("/destraid/user/dhruba/raidtest");
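    // createTestFilePartialLastBlock writes a 20-block file (replication 1,
    // so a corrupted block has no healthy replica to fall back on) whose
    // last block is only partially filled, and returns its CRC so the
    // repaired files can be verified bit-for-bit at the end of the test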
    long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
                                                           1, 20, blockSize);
    long crc2 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file2,
                                                           1, 20, blockSize);
    long file1Len = fileSys.getFileStatus(file1).getLen();
    long file2Len = fileSys.getFileStatus(file2).getLen();
    LOG.info("Test testConcurrentJobs created test files");

    // create an instance of the RaidNode
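    // raid.blockfix.interval: look for corrupt files every second;
    // raid.blockfix.classname: fix blocks with the MapReduce-based
    // DistBlockFixer, so each round of fixing runs as a separate job;
    // raid.blockfix.filespertask: hand at most two corrupt files to a
    // single map task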
    Configuration localConf = new Configuration(conf);
    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
    localConf.setInt("raid.blockfix.interval", 1000);
    localConf.set("raid.blockfix.classname",
                  "org.apache.hadoop.raid.DistBlockFixer");
    localConf.setLong("raid.blockfix.filespertask", 2L);
    try {
      cnode = RaidNode.createRaidNode(null, localConf);
      TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
      TestRaidDfs.waitForFileRaided(LOG, fileSys, file2, destPath);
      cnode.stop();
      cnode.join();

      FileStatus file1Stat = fileSys.getFileStatus(file1);
      FileStatus file2Stat = fileSys.getFileStatus(file2);
      DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
      LocatedBlocks file1Loc =
        RaidDFSUtil.getBlockLocations(dfs, file1.toUri().getPath(),
                                      0, file1Stat.getLen());
      LocatedBlocks file2Loc =
        RaidDFSUtil.getBlockLocations(dfs, file2.toUri().getPath(),
                                      0, file2Stat.getLen());

      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
      assertEquals("no corrupt files expected", 0, corruptFiles.length);
      assertEquals("filesFixed() should return 0 before fixing files",
                   0, cnode.blockFixer.filesFixed());
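      // the first RaidNode instance was stopped above so the files can be
      // corrupted without a fixer racing the test; a fresh RaidNode is
      // created once file1's corruption has been reported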
      // corrupt file1
      int[] corruptBlockIdxs = new int[]{0, 4, 6};
      for (int idx: corruptBlockIdxs)
        corruptBlock(file1Loc.get(idx).getBlock());
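      // report the damaged replicas to the NameNode right away rather than
      // waiting for a periodic block scan to discover them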
      reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);
      cnode = RaidNode.createRaidNode(null, localConf);
      DistBlockFixer blockFixer = (DistBlockFixer) cnode.blockFixer;
      long start = System.currentTimeMillis();

      while (blockFixer.jobsRunning() < 1 &&
             System.currentTimeMillis() - start < 240000) {
        LOG.info("Test testConcurrentJobs waiting for fixing job 1 to start");
        Thread.sleep(10);
      }
      assertEquals("job 1 not running", 1, blockFixer.jobsRunning());
      // corrupt file2
      for (int idx: corruptBlockIdxs)
        corruptBlock(file2Loc.get(idx).getBlock());
      reportCorruptBlocks(dfs, file2, corruptBlockIdxs, blockSize);

      while (blockFixer.jobsRunning() < 2 &&
             System.currentTimeMillis() - start < 240000) {
        LOG.info("Test testConcurrentJobs waiting for fixing job 2 to start");
        Thread.sleep(10);
      }
      assertEquals("2 jobs not running", 2, blockFixer.jobsRunning());
      // wait until both files are fixed
      while (blockFixer.filesFixed() < 2 &&
             System.currentTimeMillis() - start < 240000) {
        LOG.info("Test testConcurrentJobs waiting for files to be fixed.");
        Thread.sleep(10);
      }
      assertEquals("files not fixed", 2, blockFixer.filesFixed());
      dfs = getDFS(conf, dfs);
      try {
        Thread.sleep(5 * 1000);
      } catch (InterruptedException ignore) {
      }
      assertTrue("file1 not fixed",
                 TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
      assertTrue("file2 not fixed",
                 TestRaidDfs.validateFile(dfs, file2, file2Len, crc2));
    } catch (Exception e) {
      LOG.info("Test testConcurrentJobs exception " +
               StringUtils.stringifyException(e));
      throw e;
    } finally {
      myTearDown();
    }
  }

  /**
   * Tests that the distributed block fixer obeys
   * the limit on how many files to fix simultaneously.
   */
  @Test
  public void testMaxPendingFiles() throws Exception {
    LOG.info("Test testMaxPendingFiles started.");
    long blockSize = 8192L;
    int stripeLength = 3;
    mySetup(stripeLength, -1); // never har

    Path file1 = new Path("/user/dhruba/raidtest/file1");
    Path file2 = new Path("/user/dhruba/raidtest/file2");
    Path destPath = new Path("/destraid/user/dhruba/raidtest");
    long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
                                                           1, 20, blockSize);
    long crc2 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file2,
                                                           1, 20, blockSize);
    long file1Len = fileSys.getFileStatus(file1).getLen();
    long file2Len = fileSys.getFileStatus(file2).getLen();
    LOG.info("Test testMaxPendingFiles created test files");
    // create an instance of the RaidNode
    Configuration localConf = new Configuration(conf);
    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
    localConf.setInt("raid.blockfix.interval", 1000);
    localConf.set("raid.blockfix.classname",
                  "org.apache.hadoop.raid.DistBlockFixer");
    localConf.setLong("raid.blockfix.filespertask", 2L);
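    // raid.blockfix.maxpendingfiles caps the number of corrupt files that
    // may be assigned to in-flight jobs at one, forcing the second file to
    // wait until the first job completes; that limit is what this test
    // asserts on below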
    localConf.setLong("raid.blockfix.maxpendingfiles", 1L);
    try {
      cnode = RaidNode.createRaidNode(null, localConf);
      TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
      TestRaidDfs.waitForFileRaided(LOG, fileSys, file2, destPath);
      cnode.stop();
      cnode.join();

      FileStatus file1Stat = fileSys.getFileStatus(file1);
      FileStatus file2Stat = fileSys.getFileStatus(file2);
      DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
      LocatedBlocks file1Loc =
        RaidDFSUtil.getBlockLocations(dfs, file1.toUri().getPath(),
                                      0, file1Stat.getLen());
      LocatedBlocks file2Loc =
        RaidDFSUtil.getBlockLocations(dfs, file2.toUri().getPath(),
                                      0, file2Stat.getLen());

      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
      assertEquals("no corrupt files expected", 0, corruptFiles.length);
      assertEquals("filesFixed() should return 0 before fixing files",
                   0, cnode.blockFixer.filesFixed());
      // corrupt file1
      int[] corruptBlockIdxs = new int[]{0, 4, 6};
      for (int idx: corruptBlockIdxs)
        corruptBlock(file1Loc.get(idx).getBlock());
      reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);
      corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
      cnode = RaidNode.createRaidNode(null, localConf);
      DistBlockFixer blockFixer = (DistBlockFixer) cnode.blockFixer;
      long start = System.currentTimeMillis();

      while (blockFixer.jobsRunning() < 1 &&
             System.currentTimeMillis() - start < 240000) {
        LOG.info("Test testMaxPendingFiles waiting for fixing job 1 to start");
        Thread.sleep(10);
      }
      assertEquals("job not running", 1, blockFixer.jobsRunning());
      // corrupt file2
      for (int idx: corruptBlockIdxs)
        corruptBlock(file2Loc.get(idx).getBlock());
      reportCorruptBlocks(dfs, file2, corruptBlockIdxs, blockSize);
      corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);

      // wait until both files are fixed
      while (blockFixer.filesFixed() < 2 &&
             System.currentTimeMillis() - start < 240000) {
        // make sure the block fixer does not start a second job while
        // the first one is still running
        assertTrue("too many jobs running", blockFixer.jobsRunning() <= 1);
        Thread.sleep(10);
      }
      assertEquals("files not fixed", 2, blockFixer.filesFixed());
      dfs = getDFS(conf, dfs);
      try {
        Thread.sleep(5 * 1000);
      } catch (InterruptedException ignore) {
      }
      assertTrue("file1 not fixed",
                 TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
      assertTrue("file2 not fixed",
                 TestRaidDfs.validateFile(dfs, file2, file2Len, crc2));
    } catch (Exception e) {
      LOG.info("Test testMaxPendingFiles exception " +
               StringUtils.stringifyException(e));
      throw e;
    } finally {
      myTearDown();
    }
  }
}