/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.fs;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.ServerSocket;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the HDFS fix from HBASE-6435.
 * <p>
 * Please don't add new subtests that involve starting / stopping a MiniDFSCluster in this class.
 * When a MiniDFSCluster is stopped, shutdown hooks are cleared in Hadoop's ShutdownHookManager
 * on Hadoop 3, which leads to a 'Failed suppression of fs shutdown hook' error in the region
 * server.
 */
@Category({MiscTests.class, LargeTests.class})
public class TestBlockReorder {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBlockReorder.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockReorder.class);

  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtility htu;
  private DistributedFileSystem dfs;

  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";

  @Rule
  public TestName name = new TestName();

@Before
public void setUp() throws Exception {
htu = new HBaseTestingUtility();
htu.getConfiguration().setInt("dfs.blocksize", 1024);// For the test with multiple blocks
htu.getConfiguration().setInt("dfs.replication", 3);
htu.startMiniDFSCluster(3,
new String[]{"/r1", "/r2", "/r3"}, new String[]{host1, host2, host3});
conf = htu.getConfiguration();
cluster = htu.getDFSCluster();
dfs = (DistributedFileSystem) FileSystem.get(conf);
}

  @After
  public void tearDown() throws Exception {
    htu.shutdownMiniCluster();
  }

  /**
   * Test that we can add a hook, and that the hook is used when we try to read the file in HDFS.
   */
@Test
public void testBlockLocationReorder() throws Exception {
Path p = new Path("hello");
    Assert.assertTrue(cluster.getDataNodes().size() > 1);
final int repCount = 2;
// Let's write the file
FSDataOutputStream fop = dfs.create(p, (short) repCount);
final double toWrite = 875.5613;
fop.writeDouble(toWrite);
fop.close();
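    // A double is 8 bytes, well under the 1 KB block size, so the file fits in a single block.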
// Let's check we can read it when everybody's there
long start = System.currentTimeMillis();
FSDataInputStream fin = dfs.open(p);
    Assert.assertEquals(toWrite, fin.readDouble(), 0.0);
long end = System.currentTimeMillis();
LOG.info("readtime= " + (end - start));
fin.close();
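    // Sanity bound: with every replica alive, the read should be near-instantaneous.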
Assert.assertTrue((end - start) < 30 * 1000);
    // Let's kill the datanode serving the first location. (Once the hook is in place, the
    // first location returned will change.) First get the block location, then its port.
FileStatus f = dfs.getFileStatus(p);
BlockLocation[] lbs;
    do {
      lbs = dfs.getFileBlockLocations(f, 0, 1);
      // Loop until HDFS reports the single block with all of its repCount replicas.
    } while (lbs.length != 1 || lbs[0].getHosts().length != repCount);
final String name = lbs[0].getNames()[0];
Assert.assertTrue(name.indexOf(':') > 0);
String portS = name.substring(name.indexOf(':') + 1);
final int port = Integer.parseInt(portS);
LOG.info("port= " + port);
int ipcPort = -1;
    // Let's find the DN to kill. cluster.getDataNodes(int) does not index by port, so we
    // need to iterate over the datanodes ourselves.
boolean ok = false;
final String lookup = lbs[0].getHosts()[0];
StringBuilder sb = new StringBuilder();
for (DataNode dn : cluster.getDataNodes()) {
final String dnName = getHostName(dn);
sb.append(dnName).append(' ');
if (lookup.equals(dnName)) {
ok = true;
LOG.info("killing datanode " + name + " / " + lookup);
ipcPort = dn.ipcServer.getListenerAddress().getPort();
dn.shutdown();
LOG.info("killed datanode " + name + " / " + lookup);
break;
}
}
Assert.assertTrue(
"didn't find the server to kill, was looking for " + lookup + " found " + sb, ok);
LOG.info("ipc port= " + ipcPort);
// Add the hook, with an implementation checking that we don't use the port we've just killed.
Assert.assertTrue(HFileSystem.addLocationsOrderInterceptor(conf,
new HFileSystem.ReorderBlocks() {
@Override
public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
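          // The DFS client tries locations in order, so moving the dead host out of
          // slot 0 is enough; the locations array is reordered in place.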
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
if (lb.getLocations().length > 1) {
DatanodeInfo[] infos = lb.getLocations();
if (infos[0].getHostName().equals(lookup)) {
LOG.info("HFileSystem bad host, inverting");
DatanodeInfo tmp = infos[0];
infos[0] = infos[1];
infos[1] = tmp;
}
}
}
}
}));
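
    // Occupy both ports of the datanode we just killed: a bound ServerSocket accepts the TCP
    // connection but never answers, so a client that still tries this replica first hangs
    // until its read timeout instead of failing fast with "connection refused".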
final int retries = 10;
ServerSocket ss = null;
ServerSocket ssI;
try {
ss = new ServerSocket(port);// We're taking the port to have a timeout issue later.
ssI = new ServerSocket(ipcPort);
} catch (BindException be) {
LOG.warn("Got bind exception trying to set up socket on " + port + " or " + ipcPort +
", this means that the datanode has not closed the socket or" +
" someone else took it. It may happen, skipping this test for this time.", be);
if (ss != null) {
ss.close();
}
return;
}
    // Without the reorder, the read would now fail with a timeout. Unfortunately the client
    // does not always connect to the same box, so we try 'retries' times; with the reorder in
    // place a read should never take more than a few milliseconds.
for (int i = 0; i < retries; i++) {
start = System.currentTimeMillis();
fin = dfs.open(p);
      Assert.assertEquals(toWrite, fin.readDouble(), 0.0);
fin.close();
end = System.currentTimeMillis();
LOG.info("HFileSystem readtime= " + (end - start));
Assert.assertFalse("We took too much time to read", (end - start) > 60000);
}
ss.close();
ssI.close();
}

  /**
   * Gets the datanode hostname via reflection, using getDisplayName (hadoop 2) or falling back
   * to getHostName (hadoop 1), so the test works against either version.
   */
private String getHostName(DataNode dn) throws InvocationTargetException, IllegalAccessException {
Method m;
try {
m = DataNode.class.getMethod("getDisplayName");
} catch (NoSuchMethodException e) {
try {
m = DataNode.class.getMethod("getHostName");
} catch (NoSuchMethodException e1) {
throw new RuntimeException(e1);
}
}
String res = (String) m.invoke(dn);
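    // getDisplayName may return "host:port"; keep only the host part.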
if (res.contains(":")) {
return res.split(":")[0];
} else {
return res;
}
}
}