blob: 7ba0edcecc6c0fbe2bb5db4f59f63992ee4732fa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.shortcircuit;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
import static org.hamcrest.CoreMatchers.equalTo;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import net.jcip.annotations.NotThreadSafe;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.CacheVisitor;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaCreator;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.HashMultimap;
@NotThreadSafe
public class TestShortCircuitCache {
static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class);
private static class TestFileDescriptorPair {
final TemporarySocketDirectory dir = new TemporarySocketDirectory();
final FileInputStream[] fis;
public TestFileDescriptorPair() throws IOException {
fis = new FileInputStream[2];
for (int i = 0; i < 2; i++) {
String name = dir.getDir() + "/file" + i;
FileOutputStream fos = new FileOutputStream(name);
if (i == 0) {
// write 'data' file
fos.write(1);
} else {
// write 'metadata' file
BlockMetadataHeader header =
new BlockMetadataHeader((short)1,
DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4));
DataOutputStream dos = new DataOutputStream(fos);
BlockMetadataHeader.writeHeader(dos, header);
dos.close();
}
fos.close();
fis[i] = new FileInputStream(name);
}
}
public FileInputStream[] getFileInputStreams() {
return fis;
}
public void close() throws IOException {
IOUtils.cleanup(LOG, fis);
dir.close();
}
public boolean compareWith(FileInputStream data, FileInputStream meta) {
return ((data == fis[0]) && (meta == fis[1]));
}
}
private static class SimpleReplicaCreator
implements ShortCircuitReplicaCreator {
private final int blockId;
private final ShortCircuitCache cache;
private final TestFileDescriptorPair pair;
SimpleReplicaCreator(int blockId, ShortCircuitCache cache,
TestFileDescriptorPair pair) {
this.blockId = blockId;
this.cache = cache;
this.pair = pair;
}
@Override
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
try {
ExtendedBlockId key = new ExtendedBlockId(blockId, "test_bp1");
return new ShortCircuitReplicaInfo(
new ShortCircuitReplica(key,
pair.getFileInputStreams()[0], pair.getFileInputStreams()[1],
cache, Time.monotonicNow(), null));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@Test(timeout=60000)
public void testCreateAndDestroy() throws Exception {
ShortCircuitCache cache =
new ShortCircuitCache(10, 1, 10, 1, 1, 10000, 0);
cache.close();
}
@Test(timeout=60000)
public void testAddAndRetrieve() throws Exception {
final ShortCircuitCache cache =
new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000, 0);
final TestFileDescriptorPair pair = new TestFileDescriptorPair();
ShortCircuitReplicaInfo replicaInfo1 =
cache.fetchOrCreate(new ExtendedBlockId(123, "test_bp1"),
new SimpleReplicaCreator(123, cache, pair));
Preconditions.checkNotNull(replicaInfo1.getReplica());
Preconditions.checkState(replicaInfo1.getInvalidTokenException() == null);
pair.compareWith(replicaInfo1.getReplica().getDataStream(),
replicaInfo1.getReplica().getMetaStream());
ShortCircuitReplicaInfo replicaInfo2 =
cache.fetchOrCreate(new ExtendedBlockId(123, "test_bp1"),
new ShortCircuitReplicaCreator() {
@Override
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
Assert.fail("expected to use existing entry.");
return null;
}
});
Preconditions.checkNotNull(replicaInfo2.getReplica());
Preconditions.checkState(replicaInfo2.getInvalidTokenException() == null);
Preconditions.checkState(replicaInfo1 == replicaInfo2);
pair.compareWith(replicaInfo2.getReplica().getDataStream(),
replicaInfo2.getReplica().getMetaStream());
replicaInfo1.getReplica().unref();
replicaInfo2.getReplica().unref();
// Even after the reference count falls to 0, we still keep the replica
// around for a while (we have configured the expiry period to be really,
// really long here)
ShortCircuitReplicaInfo replicaInfo3 =
cache.fetchOrCreate(
new ExtendedBlockId(123, "test_bp1"), new ShortCircuitReplicaCreator() {
@Override
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
Assert.fail("expected to use existing entry.");
return null;
}
});
Preconditions.checkNotNull(replicaInfo3.getReplica());
Preconditions.checkState(replicaInfo3.getInvalidTokenException() == null);
replicaInfo3.getReplica().unref();
pair.close();
cache.close();
}
@Test(timeout=100000)
public void testExpiry() throws Exception {
final ShortCircuitCache cache =
new ShortCircuitCache(2, 1, 1, 10000000, 1, 10000000, 0);
final TestFileDescriptorPair pair = new TestFileDescriptorPair();
ShortCircuitReplicaInfo replicaInfo1 =
cache.fetchOrCreate(
new ExtendedBlockId(123, "test_bp1"),
new SimpleReplicaCreator(123, cache, pair));
Preconditions.checkNotNull(replicaInfo1.getReplica());
Preconditions.checkState(replicaInfo1.getInvalidTokenException() == null);
pair.compareWith(replicaInfo1.getReplica().getDataStream(),
replicaInfo1.getReplica().getMetaStream());
replicaInfo1.getReplica().unref();
final MutableBoolean triedToCreate = new MutableBoolean(false);
do {
Thread.sleep(10);
ShortCircuitReplicaInfo replicaInfo2 =
cache.fetchOrCreate(
new ExtendedBlockId(123, "test_bp1"), new ShortCircuitReplicaCreator() {
@Override
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
triedToCreate.setValue(true);
return null;
}
});
if ((replicaInfo2 != null) && (replicaInfo2.getReplica() != null)) {
replicaInfo2.getReplica().unref();
}
} while (triedToCreate.isFalse());
cache.close();
}
@Test(timeout=60000)
public void testEviction() throws Exception {
final ShortCircuitCache cache =
new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10000, 0);
final TestFileDescriptorPair pairs[] = new TestFileDescriptorPair[] {
new TestFileDescriptorPair(),
new TestFileDescriptorPair(),
new TestFileDescriptorPair(),
};
ShortCircuitReplicaInfo replicaInfos[] = new ShortCircuitReplicaInfo[] {
null,
null,
null
};
for (int i = 0; i < pairs.length; i++) {
replicaInfos[i] = cache.fetchOrCreate(
new ExtendedBlockId(i, "test_bp1"),
new SimpleReplicaCreator(i, cache, pairs[i]));
Preconditions.checkNotNull(replicaInfos[i].getReplica());
Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
pairs[i].compareWith(replicaInfos[i].getReplica().getDataStream(),
replicaInfos[i].getReplica().getMetaStream());
}
// At this point, we have 3 replicas in use.
// Let's close them all.
for (int i = 0; i < pairs.length; i++) {
replicaInfos[i].getReplica().unref();
}
// The last two replicas should still be cached.
for (int i = 1; i < pairs.length; i++) {
final Integer iVal = i;
replicaInfos[i] = cache.fetchOrCreate(
new ExtendedBlockId(i, "test_bp1"),
new ShortCircuitReplicaCreator() {
@Override
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
Assert.fail("expected to use existing entry for " + iVal);
return null;
}
});
Preconditions.checkNotNull(replicaInfos[i].getReplica());
Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
pairs[i].compareWith(replicaInfos[i].getReplica().getDataStream(),
replicaInfos[i].getReplica().getMetaStream());
}
// The first (oldest) replica should not be cached.
final MutableBoolean calledCreate = new MutableBoolean(false);
replicaInfos[0] = cache.fetchOrCreate(
new ExtendedBlockId(0, "test_bp1"),
new ShortCircuitReplicaCreator() {
@Override
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
calledCreate.setValue(true);
return null;
}
});
Preconditions.checkState(replicaInfos[0].getReplica() == null);
Assert.assertTrue(calledCreate.isTrue());
// Clean up
for (int i = 1; i < pairs.length; i++) {
replicaInfos[i].getReplica().unref();
}
for (int i = 0; i < pairs.length; i++) {
pairs[i].close();
}
cache.close();
}
@Test(timeout=60000)
public void testTimeBasedStaleness() throws Exception {
// Set up the cache with a short staleness time.
final ShortCircuitCache cache =
new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10, 0);
final TestFileDescriptorPair pairs[] = new TestFileDescriptorPair[] {
new TestFileDescriptorPair(),
new TestFileDescriptorPair(),
};
ShortCircuitReplicaInfo replicaInfos[] = new ShortCircuitReplicaInfo[] {
null,
null
};
final long HOUR_IN_MS = 60 * 60 * 1000;
for (int i = 0; i < pairs.length; i++) {
final Integer iVal = i;
final ExtendedBlockId key = new ExtendedBlockId(i, "test_bp1");
replicaInfos[i] = cache.fetchOrCreate(key,
new ShortCircuitReplicaCreator() {
@Override
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
try {
return new ShortCircuitReplicaInfo(
new ShortCircuitReplica(key,
pairs[iVal].getFileInputStreams()[0],
pairs[iVal].getFileInputStreams()[1],
cache, Time.monotonicNow() + (iVal * HOUR_IN_MS), null));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
Preconditions.checkNotNull(replicaInfos[i].getReplica());
Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
pairs[i].compareWith(replicaInfos[i].getReplica().getDataStream(),
replicaInfos[i].getReplica().getMetaStream());
}
// Keep trying to getOrCreate block 0 until it goes stale (and we must re-create.)
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
ShortCircuitReplicaInfo info = cache.fetchOrCreate(
new ExtendedBlockId(0, "test_bp1"), new ShortCircuitReplicaCreator() {
@Override
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
return null;
}
});
if (info.getReplica() != null) {
info.getReplica().unref();
return false;
}
return true;
}
}, 500, 60000);
// Make sure that second replica did not go stale.
ShortCircuitReplicaInfo info = cache.fetchOrCreate(
new ExtendedBlockId(1, "test_bp1"), new ShortCircuitReplicaCreator() {
@Override
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
Assert.fail("second replica went stale, despite 1 " +
"hour staleness time.");
return null;
}
});
info.getReplica().unref();
// Clean up
for (int i = 1; i < pairs.length; i++) {
replicaInfos[i].getReplica().unref();
}
cache.close();
}
private static Configuration createShortCircuitConf(String testName,
TemporarySocketDirectory sockDir) {
Configuration conf = new Configuration();
conf.set(DFS_CLIENT_CONTEXT, testName);
conf.setLong(DFS_BLOCK_SIZE_KEY, 4096);
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
testName).getAbsolutePath());
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
false);
conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
DFSInputStream.tcpReadsDisabledForTesting = true;
DomainSocket.disableBindPathValidation();
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
return conf;
}
private static DomainPeer getDomainPeerToDn(Configuration conf)
throws IOException {
DomainSocket sock =
DomainSocket.connect(conf.get(DFS_DOMAIN_SOCKET_PATH_KEY));
return new DomainPeer(sock);
}
@Test(timeout=60000)
public void testAllocShm() throws Exception {
BlockReaderTestUtil.enableShortCircuitShmTracing();
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Configuration conf = createShortCircuitConf("testAllocShm", sockDir);
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache();
cache.getDfsClientShmManager().visit(new Visitor() {
@Override
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
throws IOException {
// The ClientShmManager starts off empty
Assert.assertEquals(0, info.size());
}
});
DomainPeer peer = getDomainPeerToDn(conf);
MutableBoolean usedPeer = new MutableBoolean(false);
ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
final DatanodeInfo datanode = new DatanodeInfoBuilder()
.setNodeID(cluster.getDataNodes().get(0).getDatanodeId())
.build();
// Allocating the first shm slot requires using up a peer.
Slot slot = cache.allocShmSlot(datanode, peer, usedPeer,
blockId, "testAllocShm_client");
Assert.assertNotNull(slot);
Assert.assertTrue(usedPeer.booleanValue());
cache.getDfsClientShmManager().visit(new Visitor() {
@Override
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
throws IOException {
// The ClientShmManager starts off empty
Assert.assertEquals(1, info.size());
PerDatanodeVisitorInfo vinfo = info.get(datanode);
Assert.assertFalse(vinfo.disabled);
Assert.assertEquals(0, vinfo.full.size());
Assert.assertEquals(1, vinfo.notFull.size());
}
});
cache.scheduleSlotReleaser(slot);
// Wait for the slot to be released, and the shared memory area to be
// closed. Since we didn't register this shared memory segment on the
// server, it will also be a test of how well the server deals with
// bogus client behavior.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
final MutableBoolean done = new MutableBoolean(false);
try {
cache.getDfsClientShmManager().visit(new Visitor() {
@Override
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
throws IOException {
done.setValue(info.get(datanode).full.isEmpty() &&
info.get(datanode).notFull.isEmpty());
}
});
} catch (IOException e) {
LOG.error("error running visitor", e);
}
return done.booleanValue();
}
}, 10, 60000);
cluster.shutdown();
sockDir.close();
}
@Test(timeout=60000)
public void testShmBasedStaleness() throws Exception {
BlockReaderTestUtil.enableShortCircuitShmTracing();
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Configuration conf = createShortCircuitConf("testShmBasedStaleness", sockDir);
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache();
String TEST_FILE = "/test_file";
final int TEST_FILE_LEN = 8193;
final int SEED = 0xFADED;
DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
(short)1, SEED);
FSDataInputStream fis = fs.open(new Path(TEST_FILE));
int first = fis.read();
final ExtendedBlock block =
DFSTestUtil.getFirstBlock(fs, new Path(TEST_FILE));
Assert.assertTrue(first != -1);
cache.accept(new CacheVisitor() {
@Override
public void visit(int numOutstandingMmaps,
Map<ExtendedBlockId, ShortCircuitReplica> replicas,
Map<ExtendedBlockId, InvalidToken> failedLoads,
LinkedMap evictable,
LinkedMap evictableMmapped) {
ShortCircuitReplica replica = replicas.get(
ExtendedBlockId.fromExtendedBlock(block));
Assert.assertNotNull(replica);
Assert.assertTrue(replica.getSlot().isValid());
}
});
// Stop the Namenode. This will close the socket keeping the client's
// shared memory segment alive, and make it stale.
cluster.getDataNodes().get(0).shutdown();
cache.accept(new CacheVisitor() {
@Override
public void visit(int numOutstandingMmaps,
Map<ExtendedBlockId, ShortCircuitReplica> replicas,
Map<ExtendedBlockId, InvalidToken> failedLoads,
LinkedMap evictable,
LinkedMap evictableMmapped) {
ShortCircuitReplica replica = replicas.get(
ExtendedBlockId.fromExtendedBlock(block));
Assert.assertNotNull(replica);
Assert.assertFalse(replica.getSlot().isValid());
}
});
cluster.shutdown();
sockDir.close();
}
/**
* Test unlinking a file whose blocks we are caching in the DFSClient.
* The DataNode will notify the DFSClient that the replica is stale via the
* ShortCircuitShm.
*/
@Test(timeout=60000)
public void testUnlinkingReplicasInFileDescriptorCache() throws Exception {
BlockReaderTestUtil.enableShortCircuitShmTracing();
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Configuration conf = createShortCircuitConf(
"testUnlinkingReplicasInFileDescriptorCache", sockDir);
// We don't want the CacheCleaner to time out short-circuit shared memory
// segments during the test, so set the timeout really high.
conf.setLong(HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
1000000000L);
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache();
cache.getDfsClientShmManager().visit(new Visitor() {
@Override
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
throws IOException {
// The ClientShmManager starts off empty.
Assert.assertEquals(0, info.size());
}
});
final Path TEST_PATH = new Path("/test_file");
final int TEST_FILE_LEN = 8193;
final int SEED = 0xFADE0;
DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LEN,
(short)1, SEED);
byte contents[] = DFSTestUtil.readFileBuffer(fs, TEST_PATH);
byte expected[] = DFSTestUtil.
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected));
// Loading this file brought the ShortCircuitReplica into our local
// replica cache.
final DatanodeInfo datanode = new DatanodeInfoBuilder()
.setNodeID(cluster.getDataNodes().get(0).getDatanodeId())
.build();
cache.getDfsClientShmManager().visit(new Visitor() {
@Override
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
throws IOException {
Assert.assertTrue(info.get(datanode).full.isEmpty());
Assert.assertFalse(info.get(datanode).disabled);
Assert.assertEquals(1, info.get(datanode).notFull.values().size());
DfsClientShm shm =
info.get(datanode).notFull.values().iterator().next();
Assert.assertFalse(shm.isDisconnected());
}
});
// Remove the file whose blocks we just read.
fs.delete(TEST_PATH, false);
// Wait for the replica to be purged from the DFSClient's cache.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
MutableBoolean done = new MutableBoolean(true);
@Override
public Boolean get() {
try {
done.setValue(true);
cache.getDfsClientShmManager().visit(new Visitor() {
@Override
public void visit(HashMap<DatanodeInfo,
PerDatanodeVisitorInfo> info) throws IOException {
Assert.assertTrue(info.get(datanode).full.isEmpty());
Assert.assertFalse(info.get(datanode).disabled);
Assert.assertEquals(1,
info.get(datanode).notFull.values().size());
DfsClientShm shm = info.get(datanode).notFull.values().
iterator().next();
// Check that all slots have been invalidated.
for (Iterator<Slot> iter = shm.slotIterator();
iter.hasNext(); ) {
Slot slot = iter.next();
if (slot.isValid()) {
done.setValue(false);
}
}
}
});
} catch (IOException e) {
LOG.error("error running visitor", e);
}
return done.booleanValue();
}
}, 10, 60000);
cluster.shutdown();
sockDir.close();
}
static private void checkNumberOfSegmentsAndSlots(final int expectedSegments,
final int expectedSlots, final ShortCircuitRegistry registry)
throws InterruptedException, TimeoutException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return registry.visit(new ShortCircuitRegistry.Visitor() {
@Override
public boolean accept(HashMap<ShmId, RegisteredShm> segments,
HashMultimap<ExtendedBlockId, Slot> slots) {
return (expectedSegments == segments.size()) &&
(expectedSlots == slots.size());
}
});
}
}, 100, 10000);
}
public static class TestCleanupFailureInjector
extends BlockReaderFactory.FailureInjector {
@Override
public void injectRequestFileDescriptorsFailure() throws IOException {
throw new IOException("injected I/O error");
}
}
// Regression test for HDFS-7915
@Test(timeout=60000)
public void testDataXceiverCleansUpSlotsOnFailure() throws Exception {
BlockReaderTestUtil.enableShortCircuitShmTracing();
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Configuration conf = createShortCircuitConf(
"testDataXceiverCleansUpSlotsOnFailure", sockDir);
conf.setLong(
HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
1000000000L);
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
final Path TEST_PATH1 = new Path("/test_file1");
final Path TEST_PATH2 = new Path("/test_file2");
final int TEST_FILE_LEN = 4096;
final int SEED = 0xFADE1;
DFSTestUtil.createFile(fs, TEST_PATH1, TEST_FILE_LEN,
(short)1, SEED);
DFSTestUtil.createFile(fs, TEST_PATH2, TEST_FILE_LEN,
(short)1, SEED);
// The first read should allocate one shared memory segment and slot.
DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
// The second read should fail, and we should only have 1 segment and 1 slot
// left.
BlockReaderFactory.setFailureInjectorForTesting(
new TestCleanupFailureInjector());
try {
DFSTestUtil.readFileBuffer(fs, TEST_PATH2);
} catch (Throwable t) {
GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
"testing, but we failed to do a non-TCP read.", t);
}
checkNumberOfSegmentsAndSlots(1, 1,
cluster.getDataNodes().get(0).getShortCircuitRegistry());
cluster.shutdown();
sockDir.close();
}
// Regression test for HADOOP-11802
@Test(timeout=60000)
public void testDataXceiverHandlesRequestShortCircuitShmFailure()
throws Exception {
BlockReaderTestUtil.enableShortCircuitShmTracing();
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Configuration conf = createShortCircuitConf(
"testDataXceiverHandlesRequestShortCircuitShmFailure", sockDir);
conf.setLong(HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
1000000000L);
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
final Path TEST_PATH1 = new Path("/test_file1");
DFSTestUtil.createFile(fs, TEST_PATH1, 4096,
(short)1, 0xFADE1);
LOG.info("Setting failure injector and performing a read which " +
"should fail...");
DataNodeFaultInjector failureInjector = Mockito.mock(DataNodeFaultInjector.class);
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
throw new IOException("injected error into sendShmResponse");
}
}).when(failureInjector).sendShortCircuitShmResponse();
DataNodeFaultInjector prevInjector = DataNodeFaultInjector.get();
DataNodeFaultInjector.set(failureInjector);
try {
// The first read will try to allocate a shared memory segment and slot.
// The shared memory segment allocation will fail because of the failure
// injector.
DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
Assert.fail("expected readFileBuffer to fail, but it succeeded.");
} catch (Throwable t) {
GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
"testing, but we failed to do a non-TCP read.", t);
}
checkNumberOfSegmentsAndSlots(0, 0,
cluster.getDataNodes().get(0).getShortCircuitRegistry());
LOG.info("Clearing failure injector and performing another read...");
DataNodeFaultInjector.set(prevInjector);
fs.getClient().getClientContext().getDomainSocketFactory().clearPathMap();
// The second read should succeed.
DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
// We should have added a new short-circuit shared memory segment and slot.
checkNumberOfSegmentsAndSlots(1, 1,
cluster.getDataNodes().get(0).getShortCircuitRegistry());
cluster.shutdown();
sockDir.close();
}
public static class TestPreReceiptVerificationFailureInjector
extends BlockReaderFactory.FailureInjector {
@Override
public boolean getSupportsReceiptVerification() {
return false;
}
}
// Regression test for HDFS-8070
@Test(timeout=60000)
public void testPreReceiptVerificationDfsClientCanDoScr() throws Exception {
BlockReaderTestUtil.enableShortCircuitShmTracing();
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Configuration conf = createShortCircuitConf(
"testPreReceiptVerificationDfsClientCanDoScr", sockDir);
conf.setLong(
HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
1000000000L);
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
BlockReaderFactory.setFailureInjectorForTesting(
new TestPreReceiptVerificationFailureInjector());
final Path TEST_PATH1 = new Path("/test_file1");
DFSTestUtil.createFile(fs, TEST_PATH1, 4096, (short)1, 0xFADE2);
final Path TEST_PATH2 = new Path("/test_file2");
DFSTestUtil.createFile(fs, TEST_PATH2, 4096, (short)1, 0xFADE2);
DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
DFSTestUtil.readFileBuffer(fs, TEST_PATH2);
checkNumberOfSegmentsAndSlots(1, 2,
cluster.getDataNodes().get(0).getShortCircuitRegistry());
cluster.shutdown();
sockDir.close();
}
}