/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import net.jcip.annotations.NotThreadSafe;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.CacheStats.PageRounder;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.event.Level;
import java.util.function.Supplier;
import org.apache.hadoop.thirdparty.com.google.common.primitives.Ints;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY;
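/**
* Tests that the DataNode's FsDatasetCache caches and uncaches block
* replicas in response to DNA_CACHE / DNA_UNCACHE commands delivered through
* simulated NameNode heartbeat responses.
*/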
@NotThreadSafe
public class TestFsDatasetCache {
private static final org.slf4j.Logger LOG =
LoggerFactory.getLogger(TestFsDatasetCache.class);
// Most Linux installs default to a 64KB limit on locked memory
public static final long CACHE_CAPACITY = 64 * 1024;
// mlock always locks entire pages, so to avoid dealing with partial-page
// rounding, use the OS page size as the block size.
private static final long PAGE_SIZE =
NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
private static final long BLOCK_SIZE = PAGE_SIZE;
private static Configuration conf;
private static MiniDFSCluster cluster = null;
private static FileSystem fs;
private static NameNode nn;
private static FSImage fsImage;
private static DataNode dn;
private static FsDatasetSpi<?> fsd;
private static DatanodeProtocolClientSideTranslatorPB spyNN;
/**
* Used to pause DN BPServiceActor threads. BPSA threads acquire the
* shared read lock. The test acquires the write lock for exclusive access.
*/
private static ReadWriteLock lock = new ReentrantReadWriteLock(true);
private static final PageRounder rounder = new PageRounder();
private static CacheManipulator prevCacheManipulator;
private static DataNodeFaultInjector oldInjector;
static {
GenericTestUtils.setLogLevel(
LoggerFactory.getLogger(FsDatasetCache.class), Level.DEBUG);
}
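/**
* Install a fault injector so that every BPServiceActor thread holds the
* shared read lock while offering service. Tests can then take the write
* lock to pause heartbeat traffic while swapping in a stubbed heartbeat
* response (see setHeartbeatResponse).
*/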
@BeforeClass
public static void setUpClass() throws Exception {
oldInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
@Override
public void startOfferService() throws Exception {
lock.readLock().lock();
}
@Override
public void endOfferService() throws Exception {
lock.readLock().unlock();
}
});
}
@AfterClass
public static void tearDownClass() throws Exception {
DataNodeFaultInjector.set(oldInjector);
}
@Before
public void setUp() throws Exception {
conf = new HdfsConfiguration();
conf.setLong(
DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 100);
conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 500);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
CACHE_CAPACITY);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 10);
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
nn = cluster.getNameNode();
fsImage = nn.getFSImage();
dn = cluster.getDataNodes().get(0);
fsd = dn.getFSDataset();
spyNN = InternalDataNodeTestUtils.spyOnBposToNN(dn, nn);
}
@After
public void tearDown() throws Exception {
// Verify that each test uncached whatever it cached. This cleanup is
// required so that file descriptors are not leaked across tests.
DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
if (fs != null) {
fs.close();
fs = null;
}
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
// Restore the original CacheManipulator
NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
}
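/**
* Stubs the spied NameNode proxy so that the DataNode's subsequent
* heartbeats receive the given commands. The write lock is held while the
* stub is installed so that no BPServiceActor thread sends a heartbeat
* mid-update.
*/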
private static void setHeartbeatResponse(DatanodeCommand[] cmds)
throws Exception {
lock.writeLock().lock();
try {
NNHAStatusHeartbeat ha = new NNHAStatusHeartbeat(HAServiceState.ACTIVE,
fsImage.getLastAppliedOrWrittenTxId());
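// Random odd value (guaranteed nonzero) used as the full block report
// lease id.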
HeartbeatResponse response =
new HeartbeatResponse(cmds, ha, null,
ThreadLocalRandom.current().nextLong() | 1L);
doReturn(response).when(spyNN).sendHeartbeat(
(DatanodeRegistration) any(),
(StorageReport[]) any(), anyLong(), anyLong(),
anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
anyBoolean(), any(SlowPeerReports.class),
any(SlowDiskReports.class));
} finally {
lock.writeLock().unlock();
}
}
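// Helpers that wrap block locations into DNA_CACHE / DNA_UNCACHE commands.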
private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {
return cacheBlocks(new HdfsBlockLocation[] {loc});
}
private static DatanodeCommand[] cacheBlocks(HdfsBlockLocation[] locs) {
return new DatanodeCommand[] {
getResponse(locs, DatanodeProtocol.DNA_CACHE)
};
}
private static DatanodeCommand[] uncacheBlock(HdfsBlockLocation loc) {
return uncacheBlocks(new HdfsBlockLocation[] {loc});
}
private static DatanodeCommand[] uncacheBlocks(HdfsBlockLocation[] locs) {
return new DatanodeCommand[] {
getResponse(locs, DatanodeProtocol.DNA_UNCACHE)
};
}
/**
* Creates a cache or uncache DatanodeCommand from an array of block
* locations.
*/
private static DatanodeCommand getResponse(HdfsBlockLocation[] locs,
int action) {
String bpid = locs[0].getLocatedBlock().getBlock().getBlockPoolId();
long[] blocks = new long[locs.length];
for (int i=0; i<locs.length; i++) {
blocks[i] = locs[i].getLocatedBlock().getBlock().getBlockId();
}
return new BlockIdCommand(action, bpid, blocks);
}
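/**
* Returns the on-disk length of each block replica, measured by opening the
* replica through the dataset's block input stream.
*/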
private static long[] getBlockSizes(HdfsBlockLocation[] locs)
throws Exception {
long[] sizes = new long[locs.length];
for (int i=0; i<locs.length; i++) {
HdfsBlockLocation loc = locs[i];
String bpid = loc.getLocatedBlock().getBlock().getBlockPoolId();
Block block = loc.getLocatedBlock().getBlock().getLocalBlock();
ExtendedBlock extBlock = new ExtendedBlock(bpid, block);
FileInputStream blockInputStream = null;
FileChannel blockChannel = null;
try {
blockInputStream =
(FileInputStream)fsd.getBlockInputStream(extBlock, 0);
blockChannel = blockInputStream.getChannel();
sizes[i] = blockChannel.size();
} finally {
IOUtils.cleanupWithLogger(LOG, blockChannel, blockInputStream);
}
}
return sizes;
}
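/**
* Caches and then uncaches each block of a small file one at a time,
* verifying the dataset's reported cache usage and the BlocksCached /
* BlocksUncached DataNode metrics after every simulated NameNode command.
*/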
private void testCacheAndUncacheBlock() throws Exception {
LOG.info("beginning testCacheAndUncacheBlock");
final int NUM_BLOCKS = 5;
DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
assertEquals(0, fsd.getNumBlocksCached());
// Write a test file
final Path testFile = new Path("/testCacheBlock");
final long testFileLen = BLOCK_SIZE*NUM_BLOCKS;
DFSTestUtil.createFile(fs, testFile, testFileLen, (short)1, 0xABBAL);
// Get the details of the written file
HdfsBlockLocation[] locs =
(HdfsBlockLocation[])fs.getFileBlockLocations(testFile, 0, testFileLen);
assertEquals("Unexpected number of blocks", NUM_BLOCKS, locs.length);
final long[] blockSizes = getBlockSizes(locs);
// Check initial state
final long cacheCapacity = fsd.getCacheCapacity();
long cacheUsed = fsd.getCacheUsed();
long current = 0;
assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
assertEquals("Unexpected amount of cache used", current, cacheUsed);
MetricsRecordBuilder dnMetrics;
long numCacheCommands = 0;
long numUncacheCommands = 0;
// Cache each block in succession, checking each time
for (int i=0; i<NUM_BLOCKS; i++) {
setHeartbeatResponse(cacheBlock(locs[i]));
current = DFSTestUtil.verifyExpectedCacheUsage(
current + blockSizes[i], i + 1, fsd);
dnMetrics = getMetrics(dn.getMetrics().name());
long cmds = MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
assertTrue("Expected more cache requests from the NN ("
+ cmds + " <= " + numCacheCommands + ")",
cmds > numCacheCommands);
numCacheCommands = cmds;
}
// Uncache each block in succession, again checking each time
for (int i=0; i<NUM_BLOCKS; i++) {
setHeartbeatResponse(uncacheBlock(locs[i]));
current = DFSTestUtil.
verifyExpectedCacheUsage(current - blockSizes[i],
NUM_BLOCKS - 1 - i, fsd);
dnMetrics = getMetrics(dn.getMetrics().name());
long cmds = MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
assertTrue("Expected more uncache requests from the NN",
cmds > numUncacheCommands);
numUncacheCommands = cmds;
}
LOG.info("finishing testCacheAndUncacheBlock");
}
@Test(timeout=600000)
public void testCacheAndUncacheBlockSimple() throws Exception {
testCacheAndUncacheBlock();
}
/**
* Run testCacheAndUncacheBlock with some failures injected into the mlock
* call. This tests the ability of the NameNode to resend commands.
*/
@Test(timeout=600000)
public void testCacheAndUncacheBlockWithRetries() throws Exception {
// We don't have to save the previous cacheManipulator
// because it will be reinstalled by the @After method.
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator() {
private final Set<String> seenIdentifiers = new HashSet<String>();
@Override
public void mlock(String identifier,
ByteBuffer mmap, long length) throws IOException {
if (seenIdentifiers.contains(identifier)) {
// mlock succeeds the second time.
LOG.info("mlocking " + identifier);
return;
}
seenIdentifiers.add(identifier);
throw new IOException("injecting IOException during mlock of " +
identifier);
}
});
testCacheAndUncacheBlock();
}
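/**
* Caches files until the configured max locked memory is exhausted, verifies
* that the last file fails to cache (logged by FsDatasetCache and counted in
* NumBlocksFailedToCache), and then uncaches the files that were cached.
*/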
@Test(timeout=600000)
public void testFilesExceedMaxLockedMemory() throws Exception {
LOG.info("beginning testFilesExceedMaxLockedMemory");
// Create some test files that will exceed total cache capacity
final int numFiles = 5;
final long fileSize = CACHE_CAPACITY / (numFiles-1);
final Path[] testFiles = new Path[numFiles];
final HdfsBlockLocation[][] fileLocs = new HdfsBlockLocation[numFiles][];
final long[] fileSizes = new long[numFiles];
for (int i=0; i<numFiles; i++) {
testFiles[i] = new Path("/testFilesExceedMaxLockedMemory-" + i);
DFSTestUtil.createFile(fs, testFiles[i], fileSize, (short)1, 0xDFAL);
fileLocs[i] = (HdfsBlockLocation[])fs.getFileBlockLocations(
testFiles[i], 0, fileSize);
// Get the file size (sum of blocks)
long[] sizes = getBlockSizes(fileLocs[i]);
for (int j=0; j<sizes.length; j++) {
fileSizes[i] += sizes[j];
}
}
// Cache the first n-1 files
long total = 0;
DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
for (int i=0; i<numFiles-1; i++) {
setHeartbeatResponse(cacheBlocks(fileLocs[i]));
total = DFSTestUtil.verifyExpectedCacheUsage(
rounder.roundUp(total + fileSizes[i]), 4 * (i + 1), fsd);
}
// The nth file should fail to cache because it would exceed the cache capacity
final LogVerificationAppender appender = new LogVerificationAppender();
final Logger logger = Logger.getRootLogger();
logger.addAppender(appender);
setHeartbeatResponse(cacheBlocks(fileLocs[numFiles-1]));
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
// Check the log message reported by FsDatasetCache when the cache
// capacity is exceeded.
int lines = appender.countLinesWithMessage(
"could not reserve more bytes in the cache: ");
return lines > 0;
}
}, 500, 30000);
// Also check the metrics for the failure
assertTrue("Expected more than 0 failed cache attempts",
fsd.getNumBlocksFailedToCache() > 0);
// Uncache the n-1 files
int curCachedBlocks = 16;
for (int i=0; i<numFiles-1; i++) {
setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
long uncachedBytes = rounder.roundUp(fileSizes[i]);
total -= uncachedBytes;
curCachedBlocks -= uncachedBytes / BLOCK_SIZE;
DFSTestUtil.verifyExpectedCacheUsage(total, curCachedBlocks, fsd);
}
LOG.info("finishing testFilesExceedMaxLockedMemory");
}
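/**
* Slows down mlock so that caching is still in progress when the uncache
* command arrives, then verifies that the pending caching work is cancelled
* and the reported cache usage drops back to zero.
*/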
@Test(timeout=600000)
public void testUncachingBlocksBeforeCachingFinishes() throws Exception {
LOG.info("beginning testUncachingBlocksBeforeCachingFinishes");
final int NUM_BLOCKS = 5;
DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
// Write a test file
final Path testFile = new Path("/testCacheBlock");
final long testFileLen = BLOCK_SIZE*NUM_BLOCKS;
DFSTestUtil.createFile(fs, testFile, testFileLen, (short)1, 0xABBAL);
// Get the details of the written file
HdfsBlockLocation[] locs =
(HdfsBlockLocation[])fs.getFileBlockLocations(testFile, 0, testFileLen);
assertEquals("Unexpected number of blocks", NUM_BLOCKS, locs.length);
final long[] blockSizes = getBlockSizes(locs);
// Check initial state
final long cacheCapacity = fsd.getCacheCapacity();
long cacheUsed = fsd.getCacheUsed();
long current = 0;
assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
assertEquals("Unexpected amount of cache used", current, cacheUsed);
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator() {
@Override
public void mlock(String identifier,
ByteBuffer mmap, long length) throws IOException {
LOG.info("An mlock operation is starting on " + identifier);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Assert.fail();
}
}
});
// Start caching each block in succession. The usedBytes amount should
// increase even though caching does not complete on any of the blocks yet.
for (int i=0; i<NUM_BLOCKS; i++) {
setHeartbeatResponse(cacheBlock(locs[i]));
current = DFSTestUtil.verifyExpectedCacheUsage(
current + blockSizes[i], i + 1, fsd);
}
setHeartbeatResponse(new DatanodeCommand[] {
getResponse(locs, DatanodeProtocol.DNA_UNCACHE)
});
// Wait until all in-flight caching tasks have finished cancelling.
current = DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
LOG.info("finishing testUncachingBlocksBeforeCachingFinishes");
}
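/**
* Sends an uncache command for a block that was never cached and expects
* the NumBlocksFailedToUncache metric to increase.
*/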
@Test(timeout=60000)
public void testUncacheUnknownBlock() throws Exception {
// Create a file
Path fileName = new Path("/testUncacheUnknownBlock");
int fileLen = 4096;
DFSTestUtil.createFile(fs, fileName, fileLen, (short)1, 0xFDFD);
HdfsBlockLocation[] locs = (HdfsBlockLocation[])fs.getFileBlockLocations(
fileName, 0, fileLen);
// Try to uncache it without caching it first
setHeartbeatResponse(uncacheBlocks(locs));
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return fsd.getNumBlocksFailedToUncache() > 0;
}
}, 100, 10000);
}
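/**
* Caches a file whose blocks are smaller than the OS page size and checks
* that each block still accounts for a full page of cache usage, since
* mlock operates on whole pages.
*/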
@Test(timeout=600000)
public void testPageRounder() throws Exception {
// Write a small file
Path fileName = new Path("/testPageRounder");
final int smallBlocks = 512; // This should be smaller than the page size
assertTrue("Page size should be greater than smallBlocks!",
PAGE_SIZE > smallBlocks);
final int numBlocks = 5;
final int fileLen = smallBlocks * numBlocks;
FSDataOutputStream out =
fs.create(fileName, false, 4096, (short)1, smallBlocks);
out.write(new byte[fileLen]);
out.close();
HdfsBlockLocation[] locs = (HdfsBlockLocation[])fs.getFileBlockLocations(
fileName, 0, fileLen);
// Cache the file and check the sizes match the page size
setHeartbeatResponse(cacheBlocks(locs));
DFSTestUtil.verifyExpectedCacheUsage(PAGE_SIZE * numBlocks, numBlocks, fsd);
// Uncache and check that it decrements by the page size too
setHeartbeatResponse(uncacheBlocks(locs));
DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
}
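/**
* Caches and uncaches a file through real cache directives, then waits to
* confirm that no additional cache or uncache commands are processed: the
* BlocksCached and BlocksUncached counters must stay at exactly one.
*/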
@Test(timeout=60000)
public void testUncacheQuiesces() throws Exception {
// Create a file
Path fileName = new Path("/testUncacheQuiesces");
int fileLen = 4096;
DFSTestUtil.createFile(fs, fileName, fileLen, (short)1, 0xFDFD);
// Cache it
DistributedFileSystem dfs = cluster.getFileSystem();
dfs.addCachePool(new CachePoolInfo("pool"));
dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
.setPool("pool").setPath(fileName).setReplication((short)3).build());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
long blocksCached =
MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
return blocksCached > 0;
}
}, 1000, 30000);
// Uncache it
dfs.removeCacheDirective(1);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
long blocksUncached =
MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
return blocksUncached > 0;
}
}, 1000, 30000);
// Make sure that no additional messages were sent
Thread.sleep(10000);
MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
MetricsAsserts.assertCounter("BlocksCached", 1l, dnMetrics);
MetricsAsserts.assertCounter("BlocksUncached", 1l, dnMetrics);
}
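/**
* Fills the cache with one big file, verifies that a small file cannot be
* cached while the cache is full, then removes the big directive and checks
* that the small file does get cached (regression test for HDFS-6107).
*/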
@Test(timeout=60000)
public void testReCacheAfterUncache() throws Exception {
final int TOTAL_BLOCKS_PER_CACHE =
Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE);
BlockReaderTestUtil.enableHdfsCachingTracing();
Assert.assertEquals(0, CACHE_CAPACITY % BLOCK_SIZE);
// Create a small file
final Path SMALL_FILE = new Path("/smallFile");
DFSTestUtil.createFile(fs, SMALL_FILE,
BLOCK_SIZE, (short)1, 0xcafe);
// Create a file that will take up the whole cache
final Path BIG_FILE = new Path("/bigFile");
DFSTestUtil.createFile(fs, BIG_FILE,
TOTAL_BLOCKS_PER_CACHE * BLOCK_SIZE, (short)1, 0xbeef);
final DistributedFileSystem dfs = cluster.getFileSystem();
dfs.addCachePool(new CachePoolInfo("pool"));
final long bigCacheDirectiveId =
dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
.setPool("pool").setPath(BIG_FILE).setReplication((short)1).build());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
long blocksCached =
MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
if (blocksCached != TOTAL_BLOCKS_PER_CACHE) {
LOG.info("waiting for " + TOTAL_BLOCKS_PER_CACHE + " to " +
"be cached. Right now only " + blocksCached + " blocks are cached.");
return false;
}
LOG.info(TOTAL_BLOCKS_PER_CACHE + " blocks are now cached.");
return true;
}
}, 1000, 30000);
// Try to cache a smaller file. It should fail.
final long shortCacheDirectiveId =
dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
.setPool("pool").setPath(SMALL_FILE).setReplication((short)1).build());
Thread.sleep(10000);
MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
Assert.assertEquals(TOTAL_BLOCKS_PER_CACHE,
MetricsAsserts.getLongCounter("BlocksCached", dnMetrics));
// Uncache the big file and verify that the small file can now be
// cached (regression test for HDFS-6107)
dfs.removeCacheDirective(bigCacheDirectiveId);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
RemoteIterator<CacheDirectiveEntry> iter;
try {
iter = dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().build());
CacheDirectiveEntry entry;
do {
entry = iter.next();
} while (entry.getInfo().getId() != shortCacheDirectiveId);
if (entry.getStats().getFilesCached() != 1) {
LOG.info("waiting for directive " + shortCacheDirectiveId +
" to be cached. stats = " + entry.getStats());
return false;
}
LOG.info("directive " + shortCacheDirectiveId + " has been cached.");
} catch (IOException e) {
Assert.fail("unexpected exception" + e.toString());
}
return true;
}
}, 1000, 30000);
dfs.removeCacheDirective(shortCacheDirectiveId);
}
}