blob: 2daca8632053e87eaddc9f535deb7c72042c612f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.base.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
/**
* Ensure that the DN reserves disk space equivalent to a full block for
* replica being written (RBW) & Replica being copied from another DN.
*/
public class TestSpaceReservation {
static final Log LOG = LogFactory.getLog(TestSpaceReservation.class);
private static final int DU_REFRESH_INTERVAL_MSEC = 500;
private static final int STORAGES_PER_DATANODE = 1;
private static final int BLOCK_SIZE = 1024 * 1024;
private static final int SMALL_BLOCK_SIZE = 1024;
protected MiniDFSCluster cluster;
private Configuration conf;
private DistributedFileSystem fs = null;
private DFSClient client = null;
FsVolumeReference singletonVolumeRef = null;
FsVolumeImpl singletonVolume = null;
private DataNodeFaultInjector old = null;
private static Random rand = new Random();
@Before
public void before() {
conf = new HdfsConfiguration();
}
private void initConfig(int blockSize) {
// Refresh disk usage information frequently.
conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC);
conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
// Disable the scanner
conf.setInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
}
static {
GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.ALL);
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
}
/**
*
* @param blockSize
* @param perVolumeCapacity limit the capacity of each volume to the given
* value. If negative, then don't limit.
* @throws IOException
*/
private void startCluster(int blockSize, int numDatanodes, long perVolumeCapacity) throws IOException {
initConfig(blockSize);
cluster = new MiniDFSCluster
.Builder(conf)
.storagesPerDatanode(STORAGES_PER_DATANODE)
.numDataNodes(numDatanodes)
.build();
fs = cluster.getFileSystem();
client = fs.getClient();
cluster.waitActive();
if (perVolumeCapacity >= 0) {
try (FsDatasetSpi.FsVolumeReferences volumes =
cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
singletonVolumeRef = volumes.get(0).obtainReference();
}
singletonVolume = ((FsVolumeImpl) singletonVolumeRef.getVolume());
singletonVolume.setCapacityForTesting(perVolumeCapacity);
}
}
@After
public void shutdownCluster() throws IOException {
if (singletonVolumeRef != null) {
singletonVolumeRef.close();
singletonVolumeRef = null;
}
if (client != null) {
client.close();
client = null;
}
if (fs != null) {
fs.close();
fs = null;
}
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
if (old != null) {
DataNodeFaultInjector.set(old);
}
}
private void createFileAndTestSpaceReservation(
final String fileNamePrefix, final int fileBlockSize)
throws IOException, InterruptedException {
// Enough for 1 block + meta files + some delta.
final long configuredCapacity = fileBlockSize * 2 - 1;
startCluster(BLOCK_SIZE, 1, configuredCapacity);
FSDataOutputStream out = null;
Path path = new Path("/" + fileNamePrefix + ".dat");
try {
out = fs.create(path, false, 4096, (short) 1, fileBlockSize);
byte[] buffer = new byte[rand.nextInt(fileBlockSize / 4)];
out.write(buffer);
out.hsync();
int bytesWritten = buffer.length;
// Check that space was reserved for a full block minus the bytesWritten.
assertThat(singletonVolume.getReservedForReplicas(),
is((long) fileBlockSize - bytesWritten));
out.close();
out = null;
// Check that the reserved space has been released since we closed the
// file.
assertThat(singletonVolume.getReservedForReplicas(), is(0L));
// Reopen the file for appends and write 1 more byte.
out = fs.append(path);
out.write(buffer);
out.hsync();
bytesWritten += buffer.length;
// Check that space was again reserved for a full block minus the
// bytesWritten so far.
assertThat(singletonVolume.getReservedForReplicas(),
is((long) fileBlockSize - bytesWritten));
// Write once again and again verify the available space. This ensures
// that the reserved space is progressively adjusted to account for bytes
// written to disk.
out.write(buffer);
out.hsync();
bytesWritten += buffer.length;
assertThat(singletonVolume.getReservedForReplicas(),
is((long) fileBlockSize - bytesWritten));
} finally {
if (out != null) {
out.close();
}
}
}
@Test (timeout=300000)
public void testWithDefaultBlockSize()
throws IOException, InterruptedException {
createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE);
}
@Test (timeout=300000)
public void testWithNonDefaultBlockSize()
throws IOException, InterruptedException {
// Same test as previous one, but with a non-default block size.
createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE * 2);
}
@Rule
public ExpectedException thrown = ExpectedException.none();
@Test (timeout=300000)
public void testWithLimitedSpace() throws IOException {
// Cluster with just enough space for a full block + meta.
startCluster(BLOCK_SIZE, 1, 2 * BLOCK_SIZE - 1);
final String methodName = GenericTestUtils.getMethodName();
Path file1 = new Path("/" + methodName + ".01.dat");
Path file2 = new Path("/" + methodName + ".02.dat");
// Create two files.
FSDataOutputStream os1 = null, os2 = null;
try {
os1 = fs.create(file1);
os2 = fs.create(file2);
// Write one byte to the first file.
byte[] data = new byte[1];
os1.write(data);
os1.hsync();
// Try to write one byte to the second file.
// The block allocation must fail.
thrown.expect(RemoteException.class);
os2.write(data);
os2.hsync();
} finally {
if (os1 != null) {
os1.close();
}
// os2.close() will fail as no block was allocated.
}
}
/**
* Ensure that reserved space is released when the client goes away
* unexpectedly.
*
* The verification is done for each replica in the write pipeline.
*
* @throws IOException
*/
@Test(timeout=300000)
public void testSpaceReleasedOnUnexpectedEof()
throws IOException, InterruptedException, TimeoutException {
final short replication = 3;
startCluster(BLOCK_SIZE, replication, -1);
final String methodName = GenericTestUtils.getMethodName();
final Path file = new Path("/" + methodName + ".01.dat");
// Write 1 byte to the file and kill the writer.
FSDataOutputStream os = fs.create(file, replication);
os.write(new byte[1]);
os.hsync();
DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());
// Ensure all space reserved for the replica was released on each
// DataNode.
for (DataNode dn : cluster.getDataNodes()) {
try (FsDatasetSpi.FsVolumeReferences volumes =
dn.getFSDataset().getFsVolumeReferences()) {
final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return (volume.getReservedForReplicas() == 0);
}
}, 500, Integer.MAX_VALUE); // Wait until the test times out.
}
}
}
@SuppressWarnings("unchecked")
@Test(timeout = 30000)
public void testRBWFileCreationError() throws Exception {
final short replication = 1;
startCluster(BLOCK_SIZE, replication, -1);
final FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
.get(0).getFSDataset().getFsVolumeReferences().get(0);
final String methodName = GenericTestUtils.getMethodName();
final Path file = new Path("/" + methodName + ".01.dat");
// Mock BlockPoolSlice so that RBW file creation gives IOExcception
BlockPoolSlice blockPoolSlice = Mockito.mock(BlockPoolSlice.class);
Mockito.when(blockPoolSlice.createRbwFile((Block) Mockito.any()))
.thenThrow(new IOException("Synthetic IO Exception Throgh MOCK"));
Field field = FsVolumeImpl.class.getDeclaredField("bpSlices");
field.setAccessible(true);
Map<String, BlockPoolSlice> bpSlices = (Map<String, BlockPoolSlice>) field
.get(fsVolumeImpl);
bpSlices.put(fsVolumeImpl.getBlockPoolList()[0], blockPoolSlice);
try {
// Write 1 byte to the file
FSDataOutputStream os = fs.create(file, replication);
os.write(new byte[1]);
os.hsync();
os.close();
fail("Expecting IOException file creation failure");
} catch (IOException e) {
// Exception can be ignored (expected)
}
// Ensure RBW space reserved is released
assertTrue(
"Expected ZERO but got " + fsVolumeImpl.getReservedForReplicas(),
fsVolumeImpl.getReservedForReplicas() == 0);
// Reserve some bytes to verify double clearing space should't happen
fsVolumeImpl.reserveSpaceForReplica(1000);
try {
// Write 1 byte to the file
FSDataOutputStream os = fs.create(new Path("/" + methodName + ".02.dat"),
replication);
os.write(new byte[1]);
os.hsync();
os.close();
fail("Expecting IOException file creation failure");
} catch (IOException e) {
// Exception can be ignored (expected)
}
// Ensure RBW space reserved is released only once
assertTrue(fsVolumeImpl.getReservedForReplicas() == 1000);
}
@Test(timeout = 30000)
public void testReservedSpaceInJMXBean() throws Exception {
final short replication = 1;
startCluster(BLOCK_SIZE, replication, -1);
final String methodName = GenericTestUtils.getMethodName();
final Path file = new Path("/" + methodName + ".01.dat");
try (FSDataOutputStream os = fs.create(file, replication)) {
// Write 1 byte to the file
os.write(new byte[1]);
os.hsync();
final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
final ObjectName mxbeanName = new ObjectName(
"Hadoop:service=DataNode,name=DataNodeInfo");
final String volumeInfo = (String) mbs.getAttribute(mxbeanName,
"VolumeInfo");
// verify reserved space for Replicas in JMX bean volume info
assertTrue(volumeInfo.contains("reservedSpaceForReplicas"));
}
}
@Test(timeout = 300000)
public void testTmpSpaceReserve() throws Exception {
final short replication = 2;
startCluster(BLOCK_SIZE, replication, -1);
final int byteCount1 = 100;
final int byteCount2 = 200;
final String methodName = GenericTestUtils.getMethodName();
// Test positive scenario
{
final Path file = new Path("/" + methodName + ".01.dat");
try (FSDataOutputStream os = fs.create(file, (short) 1)) {
// Write test data to the file
os.write(new byte[byteCount1]);
os.hsync();
}
BlockLocation[] blockLocations = fs.getFileBlockLocations(file, 0, 10);
String firstReplicaNode = blockLocations[0].getNames()[0];
int newReplicaDNIndex = 0;
if (firstReplicaNode.equals(cluster.getDataNodes().get(0)
.getDisplayName())) {
newReplicaDNIndex = 1;
}
FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
.get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0);
performReReplication(file, true);
assertEquals("Wrong reserve space for Tmp ", byteCount1,
fsVolumeImpl.getRecentReserved());
assertEquals("Reserved Tmp space is not released", 0,
fsVolumeImpl.getReservedForReplicas());
}
// Test when file creation fails
{
final Path file = new Path("/" + methodName + ".01.dat");
try (FSDataOutputStream os = fs.create(file, (short) 1)) {
// Write test data to the file
os.write(new byte[byteCount2]);
os.hsync();
}
BlockLocation[] blockLocations = fs.getFileBlockLocations(file, 0, 10);
String firstReplicaNode = blockLocations[0].getNames()[0];
int newReplicaDNIndex = 0;
if (firstReplicaNode.equals(cluster.getDataNodes().get(0)
.getDisplayName())) {
newReplicaDNIndex = 1;
}
BlockPoolSlice blockPoolSlice = Mockito.mock(BlockPoolSlice.class);
Mockito.when(blockPoolSlice.createTmpFile((Block) Mockito.any()))
.thenThrow(new IOException("Synthetic IO Exception Throgh MOCK"));
final FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
.get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0);
// Reserve some bytes to verify double clearing space should't happen
fsVolumeImpl.reserveSpaceForReplica(1000);
Field field = FsVolumeImpl.class.getDeclaredField("bpSlices");
field.setAccessible(true);
@SuppressWarnings("unchecked")
Map<String, BlockPoolSlice> bpSlices = (Map<String, BlockPoolSlice>) field
.get(fsVolumeImpl);
bpSlices.put(fsVolumeImpl.getBlockPoolList()[0], blockPoolSlice);
performReReplication(file, false);
assertEquals("Wrong reserve space for Tmp ", byteCount2,
fsVolumeImpl.getRecentReserved());
assertEquals("Tmp space is not released OR released twice", 1000,
fsVolumeImpl.getReservedForReplicas());
}
}
private void performReReplication(Path filePath, boolean waitForSuccess)
throws Exception {
fs.setReplication(filePath, (short) 2);
Thread.sleep(4000);
BlockLocation[] blockLocations = fs.getFileBlockLocations(filePath, 0, 10);
if (waitForSuccess) {
// Wait for the re replication
while (blockLocations[0].getNames().length < 2) {
Thread.sleep(2000);
blockLocations = fs.getFileBlockLocations(filePath, 0, 10);
}
}
}
/**
* Stress test to ensure we are not leaking reserved space.
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=600000)
public void stressTest() throws IOException, InterruptedException {
final int numWriters = 5;
startCluster(SMALL_BLOCK_SIZE, 1, SMALL_BLOCK_SIZE * numWriters * 10);
Writer[] writers = new Writer[numWriters];
// Start a few writers and let them run for a while.
for (int i = 0; i < numWriters; ++i) {
writers[i] = new Writer(client, SMALL_BLOCK_SIZE);
writers[i].start();
}
Thread.sleep(60000);
// Stop the writers.
for (Writer w : writers) {
w.stopWriter();
}
int filesCreated = 0;
int numFailures = 0;
for (Writer w : writers) {
w.join();
filesCreated += w.getFilesCreated();
numFailures += w.getNumFailures();
}
LOG.info("Stress test created " + filesCreated +
" files and hit " + numFailures + " failures");
// Check no space was leaked.
assertThat(singletonVolume.getReservedForReplicas(), is(0L));
}
private static class Writer extends Daemon {
private volatile boolean keepRunning;
private final DFSClient localClient;
private int filesCreated = 0;
private int numFailures = 0;
byte[] data;
Writer(DFSClient client, int blockSize) throws IOException {
localClient = client;
keepRunning = true;
filesCreated = 0;
numFailures = 0;
// At least some of the files should span a block boundary.
data = new byte[blockSize * 2];
}
@Override
public void run() {
/**
* Create a file, write up to 3 blocks of data and close the file.
* Do this in a loop until we are told to stop.
*/
while (keepRunning) {
OutputStream os = null;
try {
String filename = "/file-" + rand.nextLong();
os = localClient.create(filename, false);
os.write(data, 0, rand.nextInt(data.length));
IOUtils.closeQuietly(os);
os = null;
localClient.delete(filename, false);
Thread.sleep(50); // Sleep for a bit to avoid killing the system.
++filesCreated;
} catch (IOException ioe) {
// Just ignore the exception and keep going.
++numFailures;
} catch (InterruptedException ie) {
return;
} finally {
if (os != null) {
IOUtils.closeQuietly(os);
}
}
}
}
public void stopWriter() {
keepRunning = false;
}
public int getFilesCreated() {
return filesCreated;
}
public int getNumFailures() {
return numFailures;
}
}
@Test(timeout = 30000)
public void testReservedSpaceForAppend() throws Exception {
final short replication = 3;
startCluster(BLOCK_SIZE, replication, -1);
final String methodName = GenericTestUtils.getMethodName();
final Path file = new Path("/" + methodName + ".01.dat");
// Write 1 byte to the file and kill the writer.
FSDataOutputStream os = fs.create(file, replication);
os.write(new byte[1024]);
os.close();
final Path file2 = new Path("/" + methodName + ".02.dat");
// Write 1 byte to the file and keep it open.
FSDataOutputStream os2 = fs.create(file2, replication);
os2.write(new byte[1]);
os2.hflush();
int expectedFile2Reserved = BLOCK_SIZE - 1;
checkReservedSpace(expectedFile2Reserved);
// append one byte and verify reservedspace before and after closing
os = fs.append(file);
os.write(new byte[1]);
os.hflush();
int expectedFile1Reserved = BLOCK_SIZE - 1025;
checkReservedSpace(expectedFile2Reserved + expectedFile1Reserved);
os.close();
checkReservedSpace(expectedFile2Reserved);
// append one byte and verify reservedspace before and after abort
os = fs.append(file);
os.write(new byte[1]);
os.hflush();
expectedFile1Reserved--;
checkReservedSpace(expectedFile2Reserved + expectedFile1Reserved);
DFSTestUtil.abortStream(((DFSOutputStream) os.getWrappedStream()));
checkReservedSpace(expectedFile2Reserved);
}
@Test(timeout = 30000)
public void testReservedSpaceForPipelineRecovery() throws Exception {
final short replication = 3;
startCluster(BLOCK_SIZE, replication, -1);
final String methodName = GenericTestUtils.getMethodName();
final Path file = new Path("/" + methodName + ".01.dat");
old = DataNodeFaultInjector.get();
// Fault injector to fail connection to mirror first time.
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
private int tries = 0;
@Override
public void failMirrorConnection() throws IOException {
if (tries++ == 0) {
throw new IOException("Failing Mirror for space reservation");
}
}
});
// Write 1 byte to the file and kill the writer.
FSDataOutputStream os = fs.create(file, replication);
os.write(new byte[1]);
os.close();
// Ensure all space reserved for the replica was released on each
// DataNode.
cluster.triggerBlockReports();
for (final DataNode dn : cluster.getDataNodes()) {
try (FsDatasetSpi.FsVolumeReferences volumes =
dn.getFSDataset().getFsVolumeReferences()) {
final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LOG.info("dn " + dn.getDisplayName() + " space : "
+ volume.getReservedForReplicas());
return (volume.getReservedForReplicas() == 0);
}
}, 100, Integer.MAX_VALUE); // Wait until the test times out.
}
}
}
private void checkReservedSpace(final long expectedReserved) throws TimeoutException,
InterruptedException, IOException {
for (final DataNode dn : cluster.getDataNodes()) {
try (FsDatasetSpi.FsVolumeReferences volumes = dn.getFSDataset()
.getFsVolumeReferences()) {
final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LOG.info(
"dn " + dn.getDisplayName() + " space : " + volume
.getReservedForReplicas() + ", Expected ReservedSpace :"
+ expectedReserved);
return (volume.getReservedForReplicas() == expectedReserved);
}
}, 100, 3000);
}
}
}
@Test(timeout = 60000)
public void testReservedSpaceForLeaseRecovery() throws Exception {
final short replication = 3;
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2);
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY,
1000);
startCluster(BLOCK_SIZE, replication, -1);
final String methodName = GenericTestUtils.getMethodName();
final Path file = new Path("/" + methodName + ".01.dat");
// Write to the file and kill the writer.
FSDataOutputStream os = fs.create(file, replication);
os.write(new byte[8192]);
os.hflush();
os.close();
/*
* Reset the pipeline for the append in such a way that, datanode which is
* down is one of the mirror, not the first datanode.
*/
HdfsBlockLocation blockLocation = (HdfsBlockLocation) fs.getClient()
.getBlockLocations(file.toString(), 0, BLOCK_SIZE)[0];
LocatedBlock lastBlock = blockLocation.getLocatedBlock();
// stop 3rd node.
cluster.stopDataNode(lastBlock.getLocations()[2].getName());
try {
os = fs.append(file);
DFSTestUtil.setPipeline((DFSOutputStream) os.getWrappedStream(),
lastBlock);
os.writeBytes("hi");
os.hsync();
} catch (IOException e) {
// Append will fail due to not able to replace datanodes in 3 nodes
// cluster.
LOG.info("", e);
}
DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());
/*
* There is a chance that stopped DN could be chosen as primary for
* recovery. If so, then recovery will not happen in time. So mark stopped
* node as dead to exclude that node.
*/
cluster.setDataNodeDead(lastBlock.getLocations()[2]);
fs.recoverLease(file);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
return fs.isFileClosed(file);
} catch (IOException e) {
return false;
}
}
}, 500, 30000);
checkReservedSpace(0);
}
}