// blob: ce320f462bb09a73cd336db8d925d1d03f20c061 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Date;
import java.util.List;
import java.util.Random;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.test.GenericTestUtils;
import org.mockito.internal.util.reflection.Whitebox;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Supplier;
/**
 * Helper utilities for testing HDFS Federation.
 *
 * <p>All members are static; the class cannot be instantiated.
 */
public final class FederationTestUtils {

  private static final Logger LOG =
      LoggerFactory.getLogger(FederationTestUtils.class);

  /** Nameservice identifiers used by the federation tests. */
  public static final String[] NAMESERVICES = {"ns0", "ns1"};
  /** Namenode identifiers used by the federation tests. */
  public static final String[] NAMENODES = {"nn0", "nn1", "nn2", "nn3"};
  /** Router identifiers used by the federation tests. */
  public static final String[] ROUTERS =
      {"router0", "router1", "router2", "router3"};

  private FederationTestUtils() {
    // Utility class
  }

  /**
   * Reflectively invoke a public method and assert on the exception it
   * raises.
   *
   * <p>An exception thrown by the invoked method is unwrapped from the
   * {@link InvocationTargetException}. Any other failure of the reflective
   * call itself (for example the method not being found) is also treated as
   * the triggered exception.
   *
   * @param obj Object on which the method is invoked.
   * @param methodName Name of the public method to invoke.
   * @param exceptionClass Exact class of the expected exception (subclasses
   *          do not match), or null if no exception is expected.
   * @param parameterTypes Parameter types used to look up the method.
   * @param arguments Arguments passed to the method.
   */
  public static void verifyException(Object obj, String methodName,
      Class<? extends Exception> exceptionClass, Class<?>[] parameterTypes,
      Object[] arguments) {

    Throwable triggeredException = null;
    try {
      Method m = obj.getClass().getMethod(methodName, parameterTypes);
      m.invoke(obj, arguments);
    } catch (InvocationTargetException ex) {
      // The method itself failed: report the underlying cause
      triggeredException = ex.getTargetException();
    } catch (Exception e) {
      triggeredException = e;
    }

    if (exceptionClass != null) {
      assertNotNull("No exception was triggered, expected exception "
          + exceptionClass.getName(), triggeredException);
      assertEquals(exceptionClass, triggeredException.getClass());
    } else {
      assertNull("Exception was triggered but no exception was expected",
          triggeredException);
    }
  }

  /**
   * Create a namenode status report for testing. The report is built with
   * three "localhost:port" addresses using random ports in [0, 10000) and a
   * web address string derived from the identifiers.
   *
   * @param ns Nameservice identifier.
   * @param nn Namenode identifier.
   * @param state HA state to set in the report. If null, the report is
   *          returned without HA state or namespace information.
   * @return The generated report.
   */
  public static NamenodeStatusReport createNamenodeReport(String ns, String nn,
      HAServiceState state) {
    Random rand = new Random();
    NamenodeStatusReport report = new NamenodeStatusReport(ns, nn,
        "localhost:" + rand.nextInt(10000), "localhost:" + rand.nextInt(10000),
        "localhost:" + rand.nextInt(10000), "testwebaddress-" + ns + nn);
    if (state == null) {
      // Unavailable, no additional info
      return report;
    }
    report.setHAServiceState(state);
    NamespaceInfo nsInfo = new NamespaceInfo(
        1, "tesclusterid", ns, 0, "testbuildvesion", "testsoftwareversion");
    report.setNamespaceInfo(nsInfo);
    return report;
  }

  /**
   * Wait for a namenode to be registered with a particular state.
   * The resolver is polled through {@link GenericTestUtils#waitFor} with a
   * check interval of 1000 and a limit of 20 * 1000 (milliseconds).
   *
   * @param resolver Active namenode resolver.
   * @param nsId Nameservice identifier.
   * @param nnId Namenode identifier.
   * @param state State to check for. If null, any state is accepted as soon
   *          as the namenode is registered.
   * @throws Exception Failed to verify State Store registration of namenode
   *           nsId:nnId for state.
   */
  public static void waitNamenodeRegistered(
      final ActiveNamenodeResolver resolver,
      final String nsId, final String nnId,
      final FederationNamenodeServiceState state) throws Exception {

    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        try {
          List<? extends FederationNamenodeContext> namenodes =
              resolver.getNamenodesForNameserviceId(nsId);
          if (namenodes != null) {
            for (FederationNamenodeContext namenode : namenodes) {
              // Check if this is the Namenode we are checking; the
              // comparison tolerates a null identifier on either side
              boolean sameId = nnId == null
                  ? namenode.getNamenodeId() == null
                  : nnId.equals(namenode.getNamenodeId());
              if (sameId) {
                return state == null || namenode.getState().equals(state);
              }
            }
          }
        } catch (IOException e) {
          // Best effort: the registration may not be available yet, retry
          LOG.debug("Cannot get the namenodes for {}, retrying", nsId, e);
        }
        return false;
      }
    }, 1000, 20 * 1000);
  }

  /**
   * Wait for any namenode of a nameservice to be registered with a
   * particular state. The resolver is polled through
   * {@link GenericTestUtils#waitFor} with a check interval of 1000 and a
   * limit of 20 * 1000 (milliseconds).
   *
   * @param resolver Active namenode resolver.
   * @param nsId Nameservice identifier.
   * @param state State to check for.
   * @throws Exception Failed to verify State Store registration of namenode
   *           nsId for state.
   */
  public static void waitNamenodeRegistered(
      final ActiveNamenodeResolver resolver, final String nsId,
      final FederationNamenodeServiceState state) throws Exception {

    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        try {
          List<? extends FederationNamenodeContext> nns =
              resolver.getNamenodesForNameserviceId(nsId);
          // Nothing registered yet: keep waiting instead of failing
          if (nns != null) {
            for (FederationNamenodeContext nn : nns) {
              if (nn.getState().equals(state)) {
                return true;
              }
            }
          }
        } catch (IOException e) {
          // Best effort: the registration may not be available yet, retry
          LOG.debug("Cannot get the namenodes for {}, retrying", nsId, e);
        }
        return false;
      }
    }, 1000, 20 * 1000);
  }

  /**
   * Check if two dates are closer to each other than a given precision.
   *
   * @param d1 First date.
   * @param d2 Second date.
   * @param precision Exclusive upper bound for the difference, in
   *          milliseconds.
   * @return If the absolute difference is strictly less than the precision.
   */
  public static boolean verifyDate(Date d1, Date d2, long precision) {
    return Math.abs(d1.getTime() - d2.getTime()) < precision;
  }

  /**
   * Get an MXBean proxy from the platform MBean server.
   *
   * @param <T> Type of the MXBean interface.
   * @param name Object name of the bean.
   * @param obj MXBean interface implemented by the proxy.
   * @return Proxy for the bean registered under the given name.
   * @throws MalformedObjectNameException If the name is not a valid object
   *           name.
   */
  public static <T> T getBean(String name, Class<T> obj)
      throws MalformedObjectNameException {
    MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
    ObjectName objectName = new ObjectName(name);
    return JMX.newMXBeanProxy(mBeanServer, objectName, obj);
  }

  /**
   * Create a directory with permissions "777" and check that it exists.
   *
   * @param context File system where the directory is created.
   * @param path Path of the directory.
   * @return If the path exists after the creation attempt.
   * @throws IOException If the directory cannot be created.
   */
  public static boolean addDirectory(FileSystem context, String path)
      throws IOException {
    context.mkdirs(new Path(path), new FsPermission("777"));
    return verifyFileExists(context, path);
  }

  /**
   * Get the status of a path.
   *
   * @param context File system to query.
   * @param path Path to check.
   * @return Status of the path as reported by the file system.
   * @throws IOException If the status cannot be retrieved.
   */
  public static FileStatus getFileStatus(FileSystem context, String path)
      throws IOException {
    return context.getFileStatus(new Path(path));
  }

  /**
   * Check if a path exists. Any exception while getting the status is
   * interpreted as the path not existing.
   *
   * @param context File system to query.
   * @param path Path to check.
   * @return If a status could be retrieved for the path.
   */
  public static boolean verifyFileExists(FileSystem context, String path) {
    try {
      return getFileStatus(context, path) != null;
    } catch (Exception e) {
      // Best effort check: a failure counts as "does not exist"
      return false;
    }
  }

  /**
   * Check if a directory listing contains a particular entry. Entries are
   * compared by their path without scheme and authority.
   *
   * @param context File system to query.
   * @param testPath Directory to list.
   * @param targetFile Name of the entry to look for, relative to testPath.
   * @return If the listing of testPath contains targetFile.
   * @throws IOException If the directory cannot be listed.
   */
  public static boolean checkForFileInDirectory(
      FileSystem context, String testPath, String targetFile)
      throws IOException, AccessControlException, FileNotFoundException,
      UnsupportedFileSystemException, IllegalArgumentException {

    // Avoid a double slash when the directory is the root
    String verifyPath = testPath.equals("/")
        ? testPath + targetFile
        : testPath + "/" + targetFile;

    for (FileStatus status : context.listStatus(new Path(testPath))) {
      String file =
          Path.getPathWithoutSchemeAndAuthority(status.getPath()).toString();
      if (file.equals(verifyPath)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Count the entries in the listing of a path.
   *
   * @param context File system to query.
   * @param testPath Path to list.
   * @return Number of entries returned by the listing.
   * @throws IOException If the path cannot be listed.
   */
  public static int countContents(FileSystem context, String testPath)
      throws IOException {
    Path path = new Path(testPath);
    FileStatus[] fileStatus = context.listStatus(path);
    return fileStatus.length;
  }

  /**
   * Create (or overwrite) a file with permissions "700" and fill it with
   * {@code length} bytes. The byte at offset i is the low-order byte of i.
   *
   * @param fs File system where the file is created.
   * @param path Path of the file.
   * @param length Number of bytes to write.
   * @throws IOException If the file cannot be created or written.
   */
  public static void createFile(FileSystem fs, String path, long length)
      throws IOException {
    FsPermission permissions = new FsPermission("700");
    // try-with-resources closes the stream even if a write fails
    try (FSDataOutputStream writeStream = fs.create(new Path(path),
        permissions, true, 1000, (short) 1,
        DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null)) {
      // long counter: an int counter would overflow for large lengths
      for (long i = 0; i < length; i++) {
        writeStream.write((int) i);
      }
    }
  }

  /**
   * Read the whole content of a text file. The lines are concatenated
   * without their line terminators and the bytes are decoded with the
   * platform default charset.
   *
   * @param fs File system to read from.
   * @param path Path of the file.
   * @return Content of the file without line terminators.
   * @throws IOException If the file cannot be opened or read.
   */
  public static String readFile(FileSystem fs, String path) throws IOException {
    // Read the file from the filesystem via the active namenode
    Path fileName = new Path(path);
    StringBuilder data = new StringBuilder();
    // try-with-resources closes both readers even if reading fails
    try (InputStreamReader reader = new InputStreamReader(fs.open(fileName));
        BufferedReader bufferedReader = new BufferedReader(reader)) {
      String line;
      while ((line = bufferedReader.readLine()) != null) {
        data.append(line);
      }
    }
    return data.toString();
  }

  /**
   * Delete a path recursively.
   *
   * @param fs File system to delete from.
   * @param path Path to delete.
   * @return Result reported by the file system for the deletion.
   * @throws IOException If the deletion fails.
   */
  public static boolean deleteFile(FileSystem fs, String path)
      throws IOException {
    return fs.delete(new Path(path), true);
  }

  /**
   * Simulate that a Namenode is slow by adding a sleep to the check operation
   * in the NN. The "haContext" field of the namesystem is replaced with a
   * spy whose {@code checkOperation} sleeps and then returns without
   * executing the original check.
   *
   * @param nn Namenode to simulate slow.
   * @param seconds Number of seconds to add to the Namenode.
   * @throws Exception If we cannot add the sleep time.
   */
  public static void simulateSlowNamenode(final NameNode nn, final int seconds)
      throws Exception {
    FSNamesystem namesystem = nn.getNamesystem();
    HAContext haContext = namesystem.getHAContext();
    HAContext spyHAContext = spy(haContext);
    doAnswer(new Answer<Object>() {
      @Override
      public Object answer(InvocationOnMock invocation) throws Throwable {
        LOG.info("Simulating slow namenode {}", invocation.getMock());
        try {
          // 1000L: do the multiplication in long to avoid int overflow
          Thread.sleep(seconds * 1000L);
        } catch (InterruptedException e) {
          LOG.error("Simulating a slow namenode aborted");
          // Restore the interrupt status for the code up the stack
          Thread.currentThread().interrupt();
        }
        return null;
      }
    }).when(spyHAContext).checkOperation(any(OperationCategory.class));
    Whitebox.setInternalState(namesystem, "haContext", spyHAContext);
  }
}