/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.router.ConnectionManager;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.Whitebox;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Helper utilities for testing HDFS Federation.
*/
public final class FederationTestUtils {
private static final Logger LOG =
LoggerFactory.getLogger(FederationTestUtils.class);
public static final String[] NAMESERVICES = {"ns0", "ns1"};
public static final String[] NAMENODES = {"nn0", "nn1", "nn2", "nn3"};
public static final String[] ROUTERS =
{"router0", "router1", "router2", "router3"};
private FederationTestUtils() {
// Utility class
}
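/**
* Invoke a method on an object via reflection and verify the exception it
* triggers.
* @param obj Object to invoke the method on.
* @param methodName Name of the method to invoke.
* @param exceptionClass Expected exception class, or null if no exception
*                       is expected.
* @param parameterTypes Parameter types of the method.
* @param arguments Arguments to invoke the method with.
*/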
public static void verifyException(Object obj, String methodName,
Class<? extends Exception> exceptionClass, Class<?>[] parameterTypes,
Object[] arguments) {
Throwable triggeredException = null;
try {
Method m = obj.getClass().getMethod(methodName, parameterTypes);
m.invoke(obj, arguments);
} catch (InvocationTargetException ex) {
triggeredException = ex.getTargetException();
} catch (Exception e) {
triggeredException = e;
}
if (exceptionClass != null) {
assertNotNull("No exception was triggered, expected exception"
+ exceptionClass.getName(), triggeredException);
assertEquals(exceptionClass, triggeredException.getClass());
} else {
assertNull("Exception was triggered but no exception was expected",
triggeredException);
}
}
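/**
* Create a namenode status report with a random localhost RPC address.
* @param ns Nameservice identifier.
* @param nn Namenode identifier.
* @param state HA service state, or null to simulate an unavailable namenode.
* @return Namenode status report.
*/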
public static NamenodeStatusReport createNamenodeReport(String ns, String nn,
HAServiceState state) {
Random rand = new Random();
return createNamenodeReport(ns, nn, "localhost:"
+ rand.nextInt(10000), state);
}
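/**
* Create a namenode status report for the given RPC address.
* @param ns Nameservice identifier.
* @param nn Namenode identifier.
* @param rpcAddress RPC address of the namenode.
* @param state HA service state, or null to simulate an unavailable namenode.
* @return Namenode status report.
*/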
public static NamenodeStatusReport createNamenodeReport(String ns, String nn,
String rpcAddress, HAServiceState state) {
Random rand = new Random();
NamenodeStatusReport report = new NamenodeStatusReport(ns, nn, rpcAddress,
"localhost:" + rand.nextInt(10000),
"localhost:" + rand.nextInt(10000), "http",
"testwebaddress-" + ns + nn);
if (state == null) {
// Unavailable, no additional info
return report;
}
report.setHAServiceState(state);
NamespaceInfo nsInfo = new NamespaceInfo(
1, "testclusterid", ns, 0, "testbuildversion", "testsoftwareversion");
report.setNamespaceInfo(nsInfo);
return report;
}
/**
* Wait for a namenode to be registered with a particular state.
* @param resolver Active namenode resolver.
* @param nsId Nameservice identifier.
* @param nnId Namenode identifier.
* @param state State to check for.
* @throws Exception Failed to verify State Store registration of namenode
* nsId:nnId for state.
*/
public static void waitNamenodeRegistered(
final ActiveNamenodeResolver resolver,
final String nsId, final String nnId,
final FederationNamenodeServiceState state) throws Exception {
GenericTestUtils.waitFor(() -> {
try {
List<? extends FederationNamenodeContext> namenodes =
resolver.getNamenodesForNameserviceId(nsId, false);
if (namenodes != null) {
for (FederationNamenodeContext namenode : namenodes) {
// Check if this is the Namenode we are checking
if (namenode.getNamenodeId().equals(nnId)) {
return state == null || namenode.getState().equals(state);
}
}
}
} catch (IOException e) {
// Ignore
}
return false;
}, 1000, 60 * 1000);
}
/**
* Wait for a namenode to be registered with a particular state.
* @param resolver Active namenode resolver.
* @param nsId Nameservice identifier.
* @param state State to check for.
* @throws Exception Failed to verify State Store registration of namenode
* nsId for state.
*/
public static void waitNamenodeRegistered(
final ActiveNamenodeResolver resolver, final String nsId,
final FederationNamenodeServiceState state) throws Exception {
GenericTestUtils.waitFor(() -> {
try {
List<? extends FederationNamenodeContext> nns =
resolver.getNamenodesForNameserviceId(nsId, false);
for (FederationNamenodeContext nn : nns) {
if (nn.getState().equals(state)) {
return true;
}
}
} catch (IOException e) {
// Ignore
}
return false;
}, 1000, 20 * 1000);
}
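/**
* Check if two dates are equal within a given precision.
* @param d1 First date.
* @param d2 Second date.
* @param precision Maximum allowed difference in milliseconds.
* @return True if the dates differ by less than the precision.
*/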
public static boolean verifyDate(Date d1, Date d2, long precision) {
return Math.abs(d1.getTime() - d2.getTime()) < precision;
}
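/**
* Get an MXBean proxy from the platform MBean server.
* @param name Object name of the bean.
* @param obj Interface class of the bean.
* @return Proxy for the MXBean.
* @throws MalformedObjectNameException If the name is not a valid object
*                                      name.
*/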
public static <T> T getBean(String name, Class<T> obj)
throws MalformedObjectNameException {
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName poolName = new ObjectName(name);
return JMX.newMXBeanProxy(mBeanServer, poolName, obj);
}
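/**
* Create a directory with 777 permissions and verify that it exists.
* @param context File system.
* @param path Path of the directory to create.
* @return True if the directory exists after creation.
* @throws IOException If the directory cannot be created.
*/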
public static boolean addDirectory(FileSystem context, String path)
throws IOException {
context.mkdirs(new Path(path), new FsPermission("777"));
return verifyFileExists(context, path);
}
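/**
* Get the status of a file.
* @param context File system.
* @param path Path of the file.
* @return File status.
* @throws IOException If the status cannot be retrieved.
*/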
public static FileStatus getFileStatus(FileSystem context, String path)
throws IOException {
return context.getFileStatus(new Path(path));
}
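/**
* Check if a file exists.
* @param context File system.
* @param path Path of the file.
* @return True if the file exists.
*/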
public static boolean verifyFileExists(FileSystem context, String path) {
try {
FileStatus status = getFileStatus(context, path);
if (status != null) {
return true;
}
} catch (Exception e) {
return false;
}
return false;
}
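/**
* Check if a directory contains a file.
* @param context File system.
* @param testPath Path of the directory to check.
* @param targetFile Name of the file to look for.
* @return True if the directory contains the file.
* @throws IOException If the directory cannot be listed.
*/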
public static boolean checkForFileInDirectory(
FileSystem context, String testPath, String targetFile)
throws IOException {
FileStatus[] fileStatus = context.listStatus(new Path(testPath));
String verifyPath = testPath + "/" + targetFile;
if (testPath.equals("/")) {
verifyPath = testPath + targetFile;
}
for (FileStatus f : fileStatus) {
String file =
Path.getPathWithoutSchemeAndAuthority(f.getPath()).toString();
if (file.equals(verifyPath)) {
return true;
}
}
return false;
}
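/**
* Count the entries in a directory.
* @param context File system.
* @param testPath Path of the directory to list.
* @return Number of entries in the directory.
* @throws IOException If the directory cannot be listed.
*/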
public static int countContents(FileSystem context, String testPath)
throws IOException {
Path path = new Path(testPath);
FileStatus[] fileStatus = context.listStatus(path);
return fileStatus.length;
}
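/**
* Create a file and write the given number of bytes to it.
* @param fs File system.
* @param path Path of the file to create.
* @param length Number of bytes to write.
* @throws IOException If the file cannot be created.
*/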
public static void createFile(FileSystem fs, String path, long length)
throws IOException {
FsPermission permissions = new FsPermission("700");
FSDataOutputStream writeStream = fs.create(new Path(path), permissions,
true, 1000, (short) 1, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null);
for (int i = 0; i < length; i++) {
writeStream.write(i);
}
writeStream.close();
}
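/**
* Read the contents of a file as a string.
* @param fs File system.
* @param path Path of the file to read.
* @return Contents of the file without line separators.
* @throws IOException If the file cannot be read.
*/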
public static String readFile(FileSystem fs, String path) throws IOException {
// Read the file from the filesystem via the active namenode
Path fileName = new Path(path);
StringBuilder data = new StringBuilder();
try (InputStreamReader reader = new InputStreamReader(fs.open(fileName));
BufferedReader bufferedReader = new BufferedReader(reader)) {
String line;
while ((line = bufferedReader.readLine()) != null) {
data.append(line);
}
}
return data.toString();
}
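/**
* Delete a file or directory recursively.
* @param fs File system.
* @param path Path to delete.
* @return True if the deletion succeeded.
* @throws IOException If the path cannot be deleted.
*/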
public static boolean deleteFile(FileSystem fs, String path)
throws IOException {
return fs.delete(new Path(path), true);
}
/**
* Simulate a slow Namenode by adding a sleep to its operation check.
* @param nn Namenode to slow down.
* @param seconds Number of seconds to sleep in each operation check.
* @throws Exception If we cannot add the sleep time.
*/
public static void simulateSlowNamenode(final NameNode nn, final int seconds)
throws Exception {
FSNamesystem namesystem = nn.getNamesystem();
HAContext haContext = namesystem.getHAContext();
HAContext spyHAContext = spy(haContext);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
LOG.info("Simulating slow namenode {}", invocation.getMock());
try {
Thread.sleep(seconds * 1000L);
} catch (InterruptedException e) {
LOG.error("Simulating a slow namenode aborted");
Thread.currentThread().interrupt();
}
return null;
}
}).when(spyHAContext).checkOperation(any(OperationCategory.class));
Whitebox.setInternalState(namesystem, "haContext", spyHAContext);
}
/**
* Wait for a number of Routers to be registered in the State Store.
*
* @param stateManager Router store to check for the registrations.
* @param routerCount Number of Routers expected to be registered.
* @param timeout Max wait time in milliseconds.
* @throws Exception If the Routers are not registered within the timeout.
*/
public static void waitRouterRegistered(RouterStore stateManager,
long routerCount, int timeout) throws Exception {
GenericTestUtils.waitFor(() -> {
try {
List<RouterState> cachedRecords = stateManager.getCachedRecords();
if (cachedRecords.size() == routerCount) {
return true;
}
} catch (IOException e) {
// Ignore
}
return false;
}, 100, timeout);
}
/**
* Simulate a RouterRpcServer whose RouterRpcClient ConnectionManager throws
* an IOException when getConnection is called, so the RouterRpcClient gets
* a null connection.
* @param server RouterRpcServer to modify.
* @throws IOException If the spy cannot be installed.
*/
public static void simulateThrowExceptionRouterRpcServer(
final RouterRpcServer server) throws IOException {
RouterRpcClient rpcClient = server.getRPCClient();
ConnectionManager connectionManager =
new ConnectionManager(server.getConfig());
ConnectionManager spyConnectionManager = spy(connectionManager);
doAnswer(invocation -> {
LOG.info("Simulating connectionManager throw IOException {}",
invocation.getMock());
throw new IOException("Simulate connectionManager throw IOException");
}).when(spyConnectionManager).getConnection(
any(UserGroupInformation.class), any(String.class), any(Class.class),
any(String.class));
Whitebox.setInternalState(rpcClient, "connectionManager",
spyConnectionManager);
}
/**
* Switch the namenodes of all HDFS name services to standby.
* @param cluster a federated HDFS cluster
*/
public static void transitionClusterNSToStandby(
StateStoreDFSCluster cluster) {
// Name services of the cluster
List<String> nameServiceList = cluster.getNameservices();
// Change namenodes of each name service to standby
for (String nameService : nameServiceList) {
List<NamenodeContext> nnList = cluster.getNamenodes(nameService);
for (NamenodeContext namenodeContext : nnList) {
cluster.switchToStandby(nameService, namenodeContext.getNamenodeId());
}
}
}
/**
* Switch the namenode at the given index of every HDFS name service to
* active.
* @param cluster a federated HDFS cluster
* @param index index of the namenode to make active in each name service
*/
public static void transitionClusterNSToActive(
StateStoreDFSCluster cluster, int index) {
// Name services of the cluster
List<String> nameServiceList = cluster.getNameservices();
// Change the index namenode of each name service to active
for (String nameService : nameServiceList) {
List<NamenodeContext> listNamenodeContext =
cluster.getNamenodes(nameService);
cluster.switchToActive(nameService,
listNamenodeContext.get(index).getNamenodeId());
}
}
/**
* Get the HDFS file system for an RPC port.
* @param rpcPort RPC port.
* @return HDFS file system.
* @throws IOException If it cannot create the file system.
*/
public static FileSystem getFileSystem(int rpcPort) throws IOException {
Configuration conf = new HdfsConfiguration();
URI uri = URI.create("hdfs://localhost:" + rpcPort);
return DistributedFileSystem.get(uri, conf);
}
/**
* Get the file system for HDFS for a Router.
* @param router Router.
* @return HDFS file system.
* @throws IOException If it cannot create the file system.
*/
public static FileSystem getFileSystem(final Router router)
throws IOException {
InetSocketAddress rpcAddress = router.getRpcServerAddress();
int rpcPort = rpcAddress.getPort();
return getFileSystem(rpcPort);
}
/**
* Get the admin interface for a Router.
* @param router Router.
* @return Admin interface.
* @throws IOException If it cannot create the admin interface.
*/
public static RouterClient getAdminClient(
final Router router) throws IOException {
Configuration conf = new HdfsConfiguration();
InetSocketAddress routerSocket = router.getAdminServerAddress();
return new RouterClient(routerSocket, conf);
}
/**
* Add a mount table entry in some name services and wait until it is
* available. If there are multiple Routers, use
* {@link #createMountTableEntry(List, String, DestinationOrder, Collection)}
* instead because this method does not refresh the mount tables of the
* other Routers.
* @param router Router to change.
* @param mountPoint Name of the mount point.
* @param order Order of the mount table entry.
* @param nsIds Name service identifiers.
* @throws Exception If the entry could not be created.
*/
public static void createMountTableEntry(
final Router router,
final String mountPoint, final DestinationOrder order,
Collection<String> nsIds) throws Exception {
createMountTableEntry(
Collections.singletonList(router), mountPoint, order, nsIds);
}
/**
* Add a mount table entry in some name services and wait until it is
* available.
* @param routers List of routers.
* @param mountPoint Name of the mount point.
* @param order Order of the mount table entry.
* @param nsIds Name service identifiers.
* @throws Exception If the entry could not be created.
*/
public static void createMountTableEntry(
final List<Router> routers,
final String mountPoint,
final DestinationOrder order,
final Collection<String> nsIds) throws Exception {
Router router = routers.get(0);
RouterClient admin = getAdminClient(router);
MountTableManager mountTable = admin.getMountTableManager();
Map<String, String> destMap = new HashMap<>();
for (String nsId : nsIds) {
destMap.put(nsId, mountPoint);
}
MountTable newEntry = MountTable.newInstance(mountPoint, destMap);
newEntry.setDestOrder(order);
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance(newEntry);
AddMountTableEntryResponse addResponse =
mountTable.addMountTableEntry(addRequest);
boolean created = addResponse.getStatus();
assertTrue(created);
refreshRoutersCaches(routers);
// Check for the path
GetMountTableEntriesRequest getRequest =
GetMountTableEntriesRequest.newInstance(mountPoint);
GetMountTableEntriesResponse getResponse =
mountTable.getMountTableEntries(getRequest);
List<MountTable> entries = getResponse.getEntries();
assertEquals("Too many entries: " + entries, 1, entries.size());
assertEquals(mountPoint, entries.get(0).getSourcePath());
}
/**
* Refresh the caches of a set of Routers.
* @param routers List of Routers.
*/
public static void refreshRoutersCaches(final List<Router> routers) {
for (final Router router : routers) {
StateStoreService stateStore = router.getStateStore();
stateStore.refreshCaches(true);
}
}
}