blob: b24df2bfce929f4cd4bb2e85fb81805fdc5804ca [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import com.google.common.collect.HashMultiset;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.net.unix.DomainSocket;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.ReadableByteChannel;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
public class TestPeerCache {
static final Logger LOG = LoggerFactory.getLogger(TestPeerCache.class);
private static class FakePeer implements Peer {
private boolean closed = false;
private final boolean hasDomain;
private final DatanodeID dnId;
public FakePeer(DatanodeID dnId, boolean hasDomain) {
this.dnId = dnId;
this.hasDomain = hasDomain;
}
@Override
public ReadableByteChannel getInputStreamChannel() {
throw new UnsupportedOperationException();
}
@Override
public void setReadTimeout(int timeoutMs) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public int getReceiveBufferSize() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean getTcpNoDelay() throws IOException {
return false;
}
@Override
public void setWriteTimeout(int timeoutMs) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean isClosed() {
return closed;
}
@Override
public void close() throws IOException {
closed = true;
}
@Override
public String getRemoteAddressString() {
return dnId.getInfoAddr();
}
@Override
public String getLocalAddressString() {
return "127.0.0.1:123";
}
@Override
public InputStream getInputStream() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public OutputStream getOutputStream() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean isLocal() {
return true;
}
@Override
public String toString() {
return "FakePeer(dnId=" + dnId + ")";
}
@Override
public DomainSocket getDomainSocket() {
if (!hasDomain) return null;
// Return a mock which throws an exception whenever any function is
// called.
return Mockito.mock(DomainSocket.class,
new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation)
throws Throwable {
throw new RuntimeException("injected fault.");
} });
}
@Override
public boolean equals(Object o) {
if (!(o instanceof FakePeer)) return false;
FakePeer other = (FakePeer)o;
return hasDomain == other.hasDomain &&
dnId.equals(other.dnId);
}
@Override
public int hashCode() {
return dnId.hashCode() ^ (hasDomain ? 1 : 0);
}
@Override
public boolean hasSecureChannel() {
return false;
}
}
@Test
public void testAddAndRetrieve() throws Exception {
PeerCache cache = new PeerCache(3, 100000);
DatanodeID dnId = new DatanodeID("192.168.0.1",
"fakehostname", "fake_datanode_id",
100, 101, 102, 103);
FakePeer peer = new FakePeer(dnId, false);
cache.put(dnId, peer);
assertTrue(!peer.isClosed());
assertEquals(1, cache.size());
assertEquals(peer, cache.get(dnId, false));
assertEquals(0, cache.size());
cache.close();
}
@Test
public void testExpiry() throws Exception {
final int CAPACITY = 3;
final int EXPIRY_PERIOD = 10;
PeerCache cache = new PeerCache(CAPACITY, EXPIRY_PERIOD);
DatanodeID dnIds[] = new DatanodeID[CAPACITY];
FakePeer peers[] = new FakePeer[CAPACITY];
for (int i = 0; i < CAPACITY; ++i) {
dnIds[i] = new DatanodeID("192.168.0.1",
"fakehostname_" + i, "fake_datanode_id",
100, 101, 102, 103);
peers[i] = new FakePeer(dnIds[i], false);
}
for (int i = 0; i < CAPACITY; ++i) {
cache.put(dnIds[i], peers[i]);
}
// Wait for the peers to expire
Thread.sleep(EXPIRY_PERIOD * 50);
assertEquals(0, cache.size());
// make sure that the peers were closed when they were expired
for (int i = 0; i < CAPACITY; ++i) {
assertTrue(peers[i].isClosed());
}
// sleep for another second and see if
// the daemon thread runs fine on empty cache
Thread.sleep(EXPIRY_PERIOD * 50);
cache.close();
}
@Test
public void testEviction() throws Exception {
final int CAPACITY = 3;
PeerCache cache = new PeerCache(CAPACITY, 100000);
DatanodeID dnIds[] = new DatanodeID[CAPACITY + 1];
FakePeer peers[] = new FakePeer[CAPACITY + 1];
for (int i = 0; i < dnIds.length; ++i) {
dnIds[i] = new DatanodeID("192.168.0.1",
"fakehostname_" + i, "fake_datanode_id_" + i,
100, 101, 102, 103);
peers[i] = new FakePeer(dnIds[i], false);
}
for (int i = 0; i < CAPACITY; ++i) {
cache.put(dnIds[i], peers[i]);
}
// Check that the peers are cached
assertEquals(CAPACITY, cache.size());
// Add another entry and check that the first entry was evicted
cache.put(dnIds[CAPACITY], peers[CAPACITY]);
assertEquals(CAPACITY, cache.size());
assertSame(null, cache.get(dnIds[0], false));
// Make sure that the other entries are still there
for (int i = 1; i < CAPACITY; ++i) {
Peer peer = cache.get(dnIds[i], false);
assertSame(peers[i], peer);
assertTrue(!peer.isClosed());
peer.close();
}
assertEquals(1, cache.size());
cache.close();
}
@Test
public void testMultiplePeersWithSameKey() throws Exception {
final int CAPACITY = 3;
PeerCache cache = new PeerCache(CAPACITY, 100000);
DatanodeID dnId = new DatanodeID("192.168.0.1",
"fakehostname", "fake_datanode_id",
100, 101, 102, 103);
HashMultiset<FakePeer> peers = HashMultiset.create(CAPACITY);
for (int i = 0; i < CAPACITY; ++i) {
FakePeer peer = new FakePeer(dnId, false);
peers.add(peer);
cache.put(dnId, peer);
}
// Check that all of the peers ended up in the cache
assertEquals(CAPACITY, cache.size());
while (!peers.isEmpty()) {
Peer peer = cache.get(dnId, false);
assertTrue(peer != null);
assertTrue(!peer.isClosed());
peers.remove(peer);
}
assertEquals(0, cache.size());
cache.close();
}
@Test
public void testDomainSocketPeers() throws Exception {
final int CAPACITY = 3;
PeerCache cache = new PeerCache(CAPACITY, 100000);
DatanodeID dnId = new DatanodeID("192.168.0.1",
"fakehostname", "fake_datanode_id",
100, 101, 102, 103);
HashMultiset<FakePeer> peers = HashMultiset.create(CAPACITY);
for (int i = 0; i < CAPACITY; ++i) {
FakePeer peer = new FakePeer(dnId, i == CAPACITY - 1);
peers.add(peer);
cache.put(dnId, peer);
}
// Check that all of the peers ended up in the cache
assertEquals(CAPACITY, cache.size());
// Test that get(requireDomainPeer=true) finds the peer with the
// domain socket.
Peer peer = cache.get(dnId, true);
assertTrue(peer.getDomainSocket() != null);
peers.remove(peer);
// Test that get(requireDomainPeer=true) returns null when there are
// no more peers with domain sockets.
peer = cache.get(dnId, true);
assertTrue(peer == null);
// Check that all of the other peers ended up in the cache.
while (!peers.isEmpty()) {
peer = cache.get(dnId, false);
assertTrue(peer != null);
assertTrue(!peer.isClosed());
peers.remove(peer);
}
assertEquals(0, cache.size());
cache.close();
}
}