/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.EnumSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
public class TestLazyPersistFiles extends LazyPersistTestCase {
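// Size of the executor pool shared by the concurrent writer tasks.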
private static final int THREADPOOL_SIZE = 10;
/**
* Appending to a lazy persist file must be denied.
* @throws IOException
*/
@Test
public void testAppendIsDenied() throws IOException {
getClusterBuilder().build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
try {
client.append(path.toString(), BUFFER_LENGTH,
EnumSet.of(CreateFlag.APPEND), null, null).close();
fail("Append to LazyPersist file did not fail as expected");
} catch (Throwable t) {
LOG.info("Got expected exception ", t);
}
}
/**
* Truncating a lazy persist file must be denied.
* @throws IOException
*/
@Test
public void testTruncateIsDenied() throws IOException {
getClusterBuilder().build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
try {
client.truncate(path.toString(), BLOCK_SIZE/2);
fail("Truncate to LazyPersist file did not fail as expected");
} catch (Throwable t) {
LOG.info("Got expected exception ", t);
}
}
/**
* If one or more replicas of a lazyPersist file are lost, then the file
* must be discarded by the NN, instead of being kept around as a
* 'corrupt' file.
*/
@Test
public void testCorruptFilesAreDiscarded()
throws IOException, InterruptedException, TimeoutException {
getClusterBuilder().setRamDiskReplicaCapacity(2).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Stop the DataNode and sleep for the time it takes the NN to
// detect the DN as being dead.
cluster.shutdownDataNodes();
Thread.sleep(30000L);
assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1));
// Next, wait for the replication monitor to mark the file as corrupt
Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
// Wait for the LazyPersistFileScrubber to run
Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
// Ensure that path1 does not exist anymore; the NN should have
// discarded it rather than keeping it around as a corrupt file.
Assert.assertFalse(fs.exists(path1));
// There should be no blocks left that need replication.
assertThat(cluster.getNameNode()
.getNamesystem()
.getBlockManager()
.getUnderReplicatedBlocksCount(),
is(0L));
}
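/**
* With the LazyPersistFileScrubber disabled, a lazy persist file whose
* replicas are lost must not be deleted by the NN.
*/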
@Test
public void testDisableLazyPersistFileScrubber()
throws IOException, InterruptedException, TimeoutException {
getClusterBuilder().setRamDiskReplicaCapacity(2).disableScrubber().build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Stop the DataNode and sleep for the time it takes the NN to
// detect the DN as being dead.
cluster.shutdownDataNodes();
Thread.sleep(30000L);
// Next, wait for the replication monitor to mark the file as corrupt
Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
// Wait for the LazyPersistFileScrubber to run
Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
// Ensure that path1 still exists.
Assert.assertTrue(fs.exists(path1));
}
/**
* If the NN is restarted, lazyPersist files should not be deleted.
*/
@Test
public void testFileShouldNotDiscardedIfNNRestarted()
throws IOException, InterruptedException, TimeoutException {
getClusterBuilder().setRamDiskReplicaCapacity(2).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
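// Take the replicas offline, then restart the NN; the file must survive.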
cluster.shutdownDataNodes();
cluster.restartNameNodes();
// Wait for the replication monitor to mark the file as corrupt.
Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
long corruptBlkCount = Iterators.size(cluster.getNameNode()
.getNamesystem().getBlockManager().getCorruptReplicaBlockIterator());
// Check that the block was detected as corrupt.
assertThat(corruptBlkCount, is(1L));
// Ensure that path1 still exists.
Assert.assertTrue(fs.exists(path1));
}
/**
* Concurrently read from the same node and verify the contents.
*/
@Test
public void testConcurrentRead()
throws Exception {
getClusterBuilder().setRamDiskReplicaCapacity(2).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
final Path path1 = new Path("/" + METHOD_NAME + ".dat");
final int SEED = 0xFADED;
final int NUM_TASKS = 5;
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Read from multiple clients concurrently.
final CountDownLatch latch = new CountDownLatch(NUM_TASKS);
final AtomicBoolean testFailed = new AtomicBoolean(false);
Runnable readerRunnable = new Runnable() {
@Override
public void run() {
try {
Assert.assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
} catch (Throwable e) {
LOG.error("readerRunnable error", e);
testFailed.set(true);
} finally {
latch.countDown();
}
}
};
Thread[] threads = new Thread[NUM_TASKS];
for (int i = 0; i < NUM_TASKS; i++) {
threads[i] = new Thread(readerRunnable);
threads[i].start();
}
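// Let the reader threads run concurrently for a while before joining them.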
Thread.sleep(500);
for (int i = 0; i < NUM_TASKS; i++) {
Uninterruptibles.joinUninterruptibly(threads[i]);
}
Assert.assertFalse(testFailed.get());
}
/**
* Concurrent writes with eviction: RAM_DISK can hold 9 replicas while
* 4 threads write 5 replicas each (20 total), forcing eviction.
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testConcurrentWrites()
throws IOException, InterruptedException {
getClusterBuilder().setRamDiskReplicaCapacity(9).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
final int SEED = 0xFADED;
final int NUM_WRITERS = 4;
final int NUM_WRITER_PATHS = 5;
Path[][] paths = new Path[NUM_WRITERS][NUM_WRITER_PATHS];
for (int i = 0; i < NUM_WRITERS; i++) {
for (int j = 0; j < NUM_WRITER_PATHS; j++) {
paths[i][j] =
new Path("/" + METHOD_NAME + ".Writer" + i + ".File." + j + ".dat");
}
}
final CountDownLatch latch = new CountDownLatch(NUM_WRITERS);
final AtomicBoolean testFailed = new AtomicBoolean(false);
ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE);
for (int i = 0; i < NUM_WRITERS; i++) {
Runnable writer = new WriterRunnable(i, paths[i], SEED, latch, testFailed);
executor.execute(writer);
}
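// Allow several lazy writer passes so replicas can be persisted to disk.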
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
triggerBlockReport();
// Wait for all writers to finish, then release the thread pool.
latch.await();
executor.shutdown();
assertThat(testFailed.get(), is(false));
}
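/**
* Writes a series of random test files, counting down the shared latch
* on completion and setting the failure flag on any IOException.
*/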
class WriterRunnable implements Runnable {
private final int id;
private final Path[] paths;
private final int seed;
private final CountDownLatch latch;
private final AtomicBoolean bFail;
public WriterRunnable(int threadIndex, Path[] paths,
int seed, CountDownLatch latch,
AtomicBoolean bFail) {
id = threadIndex;
this.paths = paths;
this.seed = seed;
this.latch = latch;
this.bFail = bFail;
System.out.println("Creating Writer: " + id);
}
@Override
public void run() {
LOG.info("Writer " + id + " starting...");
int i = 0;
try {
for (i = 0; i < paths.length; i++) {
makeRandomTestFile(paths[i], BLOCK_SIZE, true, seed);
// Eviction may fail while some blocks are not yet persisted, so the
// storage type check below is intentionally disabled.
// ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
}
} catch (IOException e) {
bFail.set(true);
LOG.error("Writer exception: writer id:" + id +
" testfile: " + paths[i].toString() +
" " + e);
} finally {
latch.countDown();
}
}
}
}