blob: 74ac16708f87e1fe3d806ceb3adc2917600716fd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Random;
/**
* Ensure that the DN reserves disk space equivalent to a full block for
* replica being written (RBW).
*/
public class TestRbwSpaceReservation {
static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class);
private static final short REPL_FACTOR = 1;
private static final int DU_REFRESH_INTERVAL_MSEC = 500;
private static final int STORAGES_PER_DATANODE = 1;
private static final int BLOCK_SIZE = 1024 * 1024;
private static final int SMALL_BLOCK_SIZE = 1024;
protected MiniDFSCluster cluster;
private Configuration conf;
private DistributedFileSystem fs = null;
private DFSClient client = null;
FsVolumeImpl singletonVolume = null;
private static Random rand = new Random();
private void initConfig(int blockSize) {
conf = new HdfsConfiguration();
// Refresh disk usage information frequently.
conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC);
conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
// Disable the scanner
conf.setInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
}
static {
((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
}
private void startCluster(int blockSize, long perVolumeCapacity) throws IOException {
initConfig(blockSize);
cluster = new MiniDFSCluster
.Builder(conf)
.storagesPerDatanode(STORAGES_PER_DATANODE)
.numDataNodes(REPL_FACTOR)
.build();
fs = cluster.getFileSystem();
client = fs.getClient();
cluster.waitActive();
if (perVolumeCapacity >= 0) {
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
assertThat(volumes.size(), is(1));
singletonVolume = ((FsVolumeImpl) volumes.get(0));
singletonVolume.setCapacityForTesting(perVolumeCapacity);
}
}
@After
public void shutdownCluster() throws IOException {
if (client != null) {
client.close();
client = null;
}
if (fs != null) {
fs.close();
fs = null;
}
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
private void createFileAndTestSpaceReservation(
final String fileNamePrefix, final int fileBlockSize)
throws IOException, InterruptedException {
// Enough for 1 block + meta files + some delta.
final long configuredCapacity = fileBlockSize * 2 - 1;
startCluster(BLOCK_SIZE, configuredCapacity);
FSDataOutputStream out = null;
Path path = new Path("/" + fileNamePrefix + ".dat");
try {
out = fs.create(path, false, 4096, (short) 1, fileBlockSize);
byte[] buffer = new byte[rand.nextInt(fileBlockSize / 4)];
out.write(buffer);
out.hsync();
int bytesWritten = buffer.length;
// Check that space was reserved for a full block minus the bytesWritten.
assertThat(singletonVolume.getReservedForRbw(),
is((long) fileBlockSize - bytesWritten));
out.close();
out = null;
// Check that the reserved space has been released since we closed the
// file.
assertThat(singletonVolume.getReservedForRbw(), is(0L));
// Reopen the file for appends and write 1 more byte.
out = fs.append(path);
out.write(buffer);
out.hsync();
bytesWritten += buffer.length;
// Check that space was again reserved for a full block minus the
// bytesWritten so far.
assertThat(singletonVolume.getReservedForRbw(),
is((long) fileBlockSize - bytesWritten));
// Write once again and again verify the available space. This ensures
// that the reserved space is progressively adjusted to account for bytes
// written to disk.
out.write(buffer);
out.hsync();
bytesWritten += buffer.length;
assertThat(singletonVolume.getReservedForRbw(),
is((long) fileBlockSize - bytesWritten));
} finally {
if (out != null) {
out.close();
}
}
}
@Test (timeout=300000)
public void testWithDefaultBlockSize()
throws IOException, InterruptedException {
createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE);
}
@Test (timeout=300000)
public void testWithNonDefaultBlockSize()
throws IOException, InterruptedException {
// Same test as previous one, but with a non-default block size.
createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE * 2);
}
/**
* Stress test to ensure we are not leaking reserved space.
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=600000)
public void stressTest() throws IOException, InterruptedException {
final int numWriters = 5;
startCluster(SMALL_BLOCK_SIZE, SMALL_BLOCK_SIZE * numWriters * 10);
Writer[] writers = new Writer[numWriters];
// Start a few writers and let them run for a while.
for (int i = 0; i < numWriters; ++i) {
writers[i] = new Writer(client, SMALL_BLOCK_SIZE);
writers[i].start();
}
Thread.sleep(60000);
// Stop the writers.
for (Writer w : writers) {
w.stopWriter();
}
int filesCreated = 0;
int numFailures = 0;
for (Writer w : writers) {
w.join();
filesCreated += w.getFilesCreated();
numFailures += w.getNumFailures();
}
LOG.info("Stress test created " + filesCreated +
" files and hit " + numFailures + " failures");
// Check no space was leaked.
assertThat(singletonVolume.getReservedForRbw(), is(0L));
}
private static class Writer extends Daemon {
private volatile boolean keepRunning;
private final DFSClient localClient;
private int filesCreated = 0;
private int numFailures = 0;
byte[] data;
Writer(DFSClient client, int blockSize) throws IOException {
localClient = client;
keepRunning = true;
filesCreated = 0;
numFailures = 0;
// At least some of the files should span a block boundary.
data = new byte[blockSize * 2];
}
@Override
public void run() {
/**
* Create a file, write up to 3 blocks of data and close the file.
* Do this in a loop until we are told to stop.
*/
while (keepRunning) {
OutputStream os = null;
try {
String filename = "/file-" + rand.nextLong();
os = localClient.create(filename, false);
os.write(data, 0, rand.nextInt(data.length));
IOUtils.closeQuietly(os);
os = null;
localClient.delete(filename, false);
Thread.sleep(50); // Sleep for a bit to avoid killing the system.
++filesCreated;
} catch (IOException ioe) {
// Just ignore the exception and keep going.
++numFailures;
} catch (InterruptedException ie) {
return;
} finally {
if (os != null) {
IOUtils.closeQuietly(os);
}
}
}
}
public void stopWriter() {
keepRunning = false;
}
public int getFilesCreated() {
return filesCreated;
}
public int getNumFailures() {
return numFailures;
}
}
}