blob: 3352fd00ccf7bbf5630907f2bb51b3f3b60aa24a [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Random;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestFavoredNodesEndToEnd {
{
GenericTestUtils.setLogLevel(LogFactory.getLog(BlockPlacementPolicy.class),
Level.ALL);
}
private static MiniDFSCluster cluster;
private static Configuration conf;
private final static int NUM_DATA_NODES = 10;
private final static int NUM_FILES = 10;
private final static byte[] SOME_BYTES = new String("foo").getBytes();
private static DistributedFileSystem dfs;
private static ArrayList<DataNode> datanodes;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES)
.build();
cluster.waitClusterUp();
dfs = cluster.getFileSystem();
datanodes = cluster.getDataNodes();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
@Test(timeout=180000)
public void testFavoredNodesEndToEnd() throws Exception {
//create 10 files with random preferred nodes
for (int i = 0; i < NUM_FILES; i++) {
Random rand = new Random(System.currentTimeMillis() + i);
//pass a new created rand so as to get a uniform distribution each time
//without too much collisions (look at the do-while loop in getDatanodes)
InetSocketAddress datanode[] = getDatanodes(rand);
Path p = new Path("/filename"+i);
FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
4096, (short)3, 4096L, null, datanode);
out.write(SOME_BYTES);
out.close();
BlockLocation[] locations = getBlockLocations(p);
//verify the files got created in the right nodes
for (BlockLocation loc : locations) {
String[] hosts = loc.getNames();
String[] hosts1 = getStringForInetSocketAddrs(datanode);
assertTrue(compareNodes(hosts, hosts1));
}
}
}
@Test(timeout=180000)
public void testWhenFavoredNodesNotPresent() throws Exception {
//when we ask for favored nodes but the nodes are not there, we should
//get some other nodes. In other words, the write to hdfs should not fail
//and if we do getBlockLocations on the file, we should see one blklocation
//and three hosts for that
InetSocketAddress arbitraryAddrs[] = new InetSocketAddress[3];
for (int i = 0; i < 3; i++) {
arbitraryAddrs[i] = getArbitraryLocalHostAddr();
}
Path p = new Path("/filename-foo-bar");
FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
4096, (short)3, 4096L, null, arbitraryAddrs);
out.write(SOME_BYTES);
out.close();
getBlockLocations(p);
}
@Test(timeout=180000)
public void testWhenSomeNodesAreNotGood() throws Exception {
// 4 favored nodes
final InetSocketAddress addrs[] = new InetSocketAddress[4];
final String[] hosts = new String[addrs.length];
for (int i = 0; i < addrs.length; i++) {
addrs[i] = datanodes.get(i).getXferAddress();
hosts[i] = addrs[i].getAddress().getHostAddress() + ":" + addrs[i].getPort();
}
//make some datanode not "good" so that even if the client prefers it,
//the namenode would not give it as a replica to write to
DatanodeInfo d = cluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanodeByXferAddr(
addrs[0].getAddress().getHostAddress(), addrs[0].getPort());
//set the decommission status to true so that
//BlockPlacementPolicyDefault.isGoodTarget returns false for this dn
d.setDecommissioned();
Path p = new Path("/filename-foo-bar-baz");
final short replication = (short)3;
FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
4096, replication, 4096L, null, addrs);
out.write(SOME_BYTES);
out.close();
//reset the state
d.stopDecommission();
BlockLocation[] locations = getBlockLocations(p);
Assert.assertEquals(replication, locations[0].getNames().length);;
//also make sure that the datanode[0] is not in the list of hosts
for (int i = 0; i < replication; i++) {
final String loc = locations[0].getNames()[i];
int j = 0;
for(; j < hosts.length && !loc.equals(hosts[j]); j++);
Assert.assertTrue("j=" + j, j > 0);
Assert.assertTrue("loc=" + loc + " not in host list "
+ Arrays.asList(hosts) + ", j=" + j, j < hosts.length);
}
}
@Test(timeout = 180000)
public void testFavoredNodesEndToEndForAppend() throws Exception {
// create 10 files with random preferred nodes
for (int i = 0; i < NUM_FILES; i++) {
Random rand = new Random(System.currentTimeMillis() + i);
// pass a new created rand so as to get a uniform distribution each time
// without too much collisions (look at the do-while loop in getDatanodes)
InetSocketAddress datanode[] = getDatanodes(rand);
Path p = new Path("/filename" + i);
// create and close the file.
dfs.create(p, FsPermission.getDefault(), true, 4096, (short) 3, 4096L,
null, null).close();
// re-open for append
FSDataOutputStream out = dfs.append(p, EnumSet.of(CreateFlag.APPEND),
4096, null, datanode);
out.write(SOME_BYTES);
out.close();
BlockLocation[] locations = getBlockLocations(p);
// verify the files got created in the right nodes
for (BlockLocation loc : locations) {
String[] hosts = loc.getNames();
String[] hosts1 = getStringForInetSocketAddrs(datanode);
assertTrue(compareNodes(hosts, hosts1));
}
}
}
@Test(timeout = 180000)
public void testCreateStreamBuilderFavoredNodesEndToEnd() throws Exception {
//create 10 files with random preferred nodes
for (int i = 0; i < NUM_FILES; i++) {
Random rand = new Random(System.currentTimeMillis() + i);
//pass a new created rand so as to get a uniform distribution each time
//without too much collisions (look at the do-while loop in getDatanodes)
InetSocketAddress[] dns = getDatanodes(rand);
Path p = new Path("/filename"+i);
FSDataOutputStream out =
dfs.createFile(p).favoredNodes(dns).build();
out.write(SOME_BYTES);
out.close();
BlockLocation[] locations = getBlockLocations(p);
//verify the files got created in the right nodes
for (BlockLocation loc : locations) {
String[] hosts = loc.getNames();
String[] hosts1 = getStringForInetSocketAddrs(dns);
assertTrue(compareNodes(hosts, hosts1));
}
}
}
private BlockLocation[] getBlockLocations(Path p) throws Exception {
DFSTestUtil.waitReplication(dfs, p, (short)3);
BlockLocation[] locations = dfs.getClient().getBlockLocations(
p.toUri().getPath(), 0, Long.MAX_VALUE);
assertTrue(locations.length == 1 && locations[0].getHosts().length == 3);
return locations;
}
private String[] getStringForInetSocketAddrs(InetSocketAddress[] datanode) {
String strs[] = new String[datanode.length];
for (int i = 0; i < datanode.length; i++) {
strs[i] = datanode[i].getAddress().getHostAddress() + ":" +
datanode[i].getPort();
}
return strs;
}
private boolean compareNodes(String[] dnList1, String[] dnList2) {
for (int i = 0; i < dnList1.length; i++) {
boolean matched = false;
for (int j = 0; j < dnList2.length; j++) {
if (dnList1[i].equals(dnList2[j])) {
matched = true;
break;
}
}
if (matched == false) {
fail(dnList1[i] + " not a favored node");
}
}
return true;
}
private InetSocketAddress[] getDatanodes(Random rand) {
//Get some unique random indexes
int idx1 = rand.nextInt(NUM_DATA_NODES);
int idx2;
do {
idx2 = rand.nextInt(NUM_DATA_NODES);
} while (idx1 == idx2);
int idx3;
do {
idx3 = rand.nextInt(NUM_DATA_NODES);
} while (idx2 == idx3 || idx1 == idx3);
InetSocketAddress[] addrs = new InetSocketAddress[3];
addrs[0] = datanodes.get(idx1).getXferAddress();
addrs[1] = datanodes.get(idx2).getXferAddress();
addrs[2] = datanodes.get(idx3).getXferAddress();
return addrs;
}
private InetSocketAddress getArbitraryLocalHostAddr()
throws UnknownHostException{
Random rand = new Random(System.currentTimeMillis());
int port = rand.nextInt(65535);
while (true) {
boolean conflict = false;
for (DataNode d : datanodes) {
if (d.getXferAddress().getPort() == port) {
port = rand.nextInt(65535);
conflict = true;
}
}
if (conflict == false) {
break;
}
}
return new InetSocketAddress(InetAddress.getLocalHost(), port);
}
}