blob: 336ea3913859eea8d81eec9ff8946e4360504c40 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.apache.hadoop.test.Whitebox.getInternalState;
import static org.apache.hadoop.test.Whitebox.setInternalState;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
import org.slf4j.event.Level;
/**
* The RPC interface of the {@link getRouter()} implemented by
* {@link RouterRpcServer}.
*/
public class TestRouterRpcMultiDestination extends TestRouterRpc {
@Override
public void testSetup() throws Exception {
MiniRouterDFSCluster cluster = getCluster();
// Create mock locations
getCluster().installMockLocations();
List<RouterContext> routers = cluster.getRouters();
// Add extra location to the root mount / such that the root mount points:
// /
// ns0 -> /
// ns1 -> /
for (RouterContext rc : routers) {
Router router = rc.getRouter();
MockResolver resolver = (MockResolver) router.getSubclusterResolver();
resolver.addLocation("/", cluster.getNameservices().get(1), "/");
}
// Create a mount that points to 2 dirs in the same ns:
// /same
// ns0 -> /
// ns0 -> /target-ns0
for (RouterContext rc : routers) {
Router router = rc.getRouter();
MockResolver resolver = (MockResolver) router.getSubclusterResolver();
List<String> nss = cluster.getNameservices();
String ns0 = nss.get(0);
resolver.addLocation("/same", ns0, "/");
resolver.addLocation("/same", ns0, cluster.getNamenodePathForNS(ns0));
}
// Delete all files via the NNs and verify
cluster.deleteAllFiles();
// Create test fixtures on NN
cluster.createTestDirectoriesNamenode();
// Wait to ensure NN has fully created its test directories
Thread.sleep(100);
// Pick a NS, namenode and getRouter() for this test
RouterContext router = cluster.getRandomRouter();
this.setRouter(router);
String ns = cluster.getRandomNameservice();
this.setNs(ns);
this.setNamenode(cluster.getNamenode(ns, null));
// Create a test file on a single NN that is accessed via a getRouter() path
// with 2 destinations. All tests should failover to the alternate
// destination if the wrong NN is attempted first.
Random r = new Random();
String randomString = "testfile-" + r.nextInt();
setNamenodeFile("/" + randomString);
setRouterFile("/" + randomString);
FileSystem nnFs = getNamenodeFileSystem();
FileSystem routerFs = getRouterFileSystem();
createFile(nnFs, getNamenodeFile(), 32);
verifyFileExists(nnFs, getNamenodeFile());
verifyFileExists(routerFs, getRouterFile());
}
private void testListing(String path) throws IOException {
// Collect the mount table entries for this path
Set<String> requiredPaths = new TreeSet<>();
RouterContext rc = getRouterContext();
Router router = rc.getRouter();
FileSubclusterResolver subclusterResolver = router.getSubclusterResolver();
List<String> mountList = subclusterResolver.getMountPoints(path);
if (mountList != null) {
requiredPaths.addAll(mountList);
}
// Get files/dirs from the Namenodes
PathLocation location = subclusterResolver.getDestinationForPath(path);
for (RemoteLocation loc : location.getDestinations()) {
String nsId = loc.getNameserviceId();
String dest = loc.getDest();
NamenodeContext nn = getCluster().getNamenode(nsId, null);
FileSystem fs = nn.getFileSystem();
FileStatus[] files = fs.listStatus(new Path(dest));
for (FileStatus file : files) {
String pathName = file.getPath().getName();
requiredPaths.add(pathName);
}
}
// Get files/dirs from the Router
DirectoryListing listing =
getRouterProtocol().getListing(path, HdfsFileStatus.EMPTY_NAME, false);
Iterator<String> requiredPathsIterator = requiredPaths.iterator();
// Match each path returned and verify order returned
HdfsFileStatus[] partialListing = listing.getPartialListing();
for (HdfsFileStatus fileStatus : listing.getPartialListing()) {
String fileName = requiredPathsIterator.next();
String currentFile = fileStatus.getFullPath(new Path(path)).getName();
assertEquals(currentFile, fileName);
}
// Verify the total number of results found/matched
assertEquals(
requiredPaths + " doesn't match " + Arrays.toString(partialListing),
requiredPaths.size(), partialListing.length);
}
/**
* Verify the metric ProxyOp with RemoteException.
*/
@Test
public void testProxyOpWithRemoteException() throws IOException {
final String testPath = "/proxy_op/remote_exception.txt";
final FederationRPCMetrics metrics = getRouterContext().
getRouter().getRpcServer().getRPCMetrics();
String ns1 = getCluster().getNameservices().get(1);
final FileSystem fileSystem1 = getCluster().
getNamenode(ns1, null).getFileSystem();
try {
// Create the test file in ns1.
createFile(fileSystem1, testPath, 32);
long beforeProxyOp = metrics.getProxyOps();
// First retry nn0 with remoteException then nn1.
getRouterProtocol().getBlockLocations(testPath, 0, 1);
assertEquals(2, metrics.getProxyOps() - beforeProxyOp);
} finally {
fileSystem1.delete(new Path(testPath), true);
}
}
@Override
public void testProxyListFiles() throws IOException, InterruptedException,
URISyntaxException, NoSuchMethodException, SecurityException {
// Verify that the root listing is a union of the mount table destinations
// and the files stored at all nameservices mounted at the root (ns0 + ns1)
// / -->
// /ns0 (from mount table)
// /ns1 (from mount table)
// /same (from the mount table)
// all items in / of ns0 from mapping of / -> ns0:::/)
// all items in / of ns1 from mapping of / -> ns1:::/)
testListing("/");
// Verify that the "/same" mount point lists the contents of both dirs in
// the same ns
// /same -->
// /target-ns0 (from root of ns0)
// /testdir (from contents of /target-ns0)
testListing("/same");
// List a non-existing path and validate error response with NN behavior
ClientProtocol namenodeProtocol =
getCluster().getRandomNamenode().getClient().getNamenode();
Method m = ClientProtocol.class.getMethod(
"getListing", String.class, byte[].class, boolean.class);
String badPath = "/unknownlocation/unknowndir";
compareResponses(getRouterProtocol(), namenodeProtocol, m,
new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false});
}
@Override
public void testProxyRenameFiles() throws IOException, InterruptedException {
super.testProxyRenameFiles();
List<String> nss = getCluster().getNameservices();
String ns0 = nss.get(0);
String ns1 = nss.get(1);
// Rename a file from ns0 into the root (mapped to both ns0 and ns1)
String testDir0 = getCluster().getFederatedTestDirectoryForNS(ns0);
String filename0 = testDir0 + "/testrename";
String renamedFile = "/testrename";
testRename(getRouterContext(), filename0, renamedFile, false);
testRename2(getRouterContext(), filename0, renamedFile, false);
// Rename a file from ns1 into the root (mapped to both ns0 and ns1)
String testDir1 = getCluster().getFederatedTestDirectoryForNS(ns1);
String filename1 = testDir1 + "/testrename";
testRename(getRouterContext(), filename1, renamedFile, false);
testRename2(getRouterContext(), filename1, renamedFile, false);
}
/**
* Verify some rpc with previous block not null.
*/
@Test
public void testPreviousBlockNotNull()
throws IOException, URISyntaxException {
final GenericTestUtils.LogCapturer stateChangeLog =
GenericTestUtils.LogCapturer.captureLogs(NameNode.stateChangeLog);
GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.DEBUG);
final GenericTestUtils.LogCapturer nameNodeLog =
GenericTestUtils.LogCapturer.captureLogs(NameNode.LOG);
GenericTestUtils.setLogLevel(NameNode.LOG, Level.DEBUG);
final FederationRPCMetrics metrics = getRouterContext().
getRouter().getRpcServer().getRPCMetrics();
final ClientProtocol clientProtocol = getRouterProtocol();
final EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.CREATE,
CreateFlag.OVERWRITE);
final String clientName = getRouterContext().getClient().getClientName();
final String testPath = "/getAdditionalData/test.txt";
final String ns1 = getCluster().getNameservices().get(1);
final FileSystem fileSystem1 = getCluster().
getNamenode(ns1, null).getFileSystem();
try {
// Create the test file in NS1.
createFile(fileSystem1, testPath, 32);
// Crate the test file via Router to get file status.
HdfsFileStatus status = clientProtocol.create(
testPath, new FsPermission("777"), clientName,
new EnumSetWritable<>(createFlag), true, (short) 1,
(long) 1024, CryptoProtocolVersion.supported(), null, null);
long proxyNumCreate = metrics.getProcessingOps();
// Add a block via router and previous block is null.
LocatedBlock blockOne = clientProtocol.addBlock(
testPath, clientName, null, null,
status.getFileId(), null, null);
assertNotNull(blockOne);
long proxyNumAddBlock = metrics.getProcessingOps();
assertEquals(2, proxyNumAddBlock - proxyNumCreate);
stateChangeLog.clearOutput();
// Add a block via router and previous block is not null.
LocatedBlock blockTwo = clientProtocol.addBlock(
testPath, clientName, blockOne.getBlock(), null,
status.getFileId(), null, null);
assertNotNull(blockTwo);
long proxyNumAddBlock2 = metrics.getProcessingOps();
assertEquals(1, proxyNumAddBlock2 - proxyNumAddBlock);
assertTrue(stateChangeLog.getOutput().contains("BLOCK* getAdditionalBlock: " + testPath));
nameNodeLog.clearOutput();
// Get additionalDatanode via router and block is not null.
DatanodeInfo[] exclusions = DatanodeInfo.EMPTY_ARRAY;
LocatedBlock newBlock = clientProtocol.getAdditionalDatanode(
testPath, status.getFileId(), blockTwo.getBlock(),
blockTwo.getLocations(), blockTwo.getStorageIDs(), exclusions,
1, clientName);
assertNotNull(newBlock);
long proxyNumAdditionalDatanode = metrics.getProcessingOps();
assertEquals(1, proxyNumAdditionalDatanode - proxyNumAddBlock2);
assertTrue(nameNodeLog.getOutput().contains("getAdditionalDatanode: src=" + testPath));
stateChangeLog.clearOutput();
// Complete the file via router and last block is not null.
clientProtocol.complete(testPath, clientName,
newBlock.getBlock(), status.getFileId());
long proxyNumComplete = metrics.getProcessingOps();
assertEquals(1, proxyNumComplete - proxyNumAdditionalDatanode);
assertTrue(stateChangeLog.getOutput().contains("DIR* NameSystem.completeFile: " + testPath));
} finally {
clientProtocol.delete(testPath, true);
}
}
/**
* Test recoverLease when the result is false.
*/
@Test
public void testRecoverLease() throws Exception {
Path testPath = new Path("/recovery/test_recovery_lease");
DistributedFileSystem routerFs =
(DistributedFileSystem) getRouterFileSystem();
FSDataOutputStream fsDataOutputStream = null;
try {
fsDataOutputStream = routerFs.create(testPath);
fsDataOutputStream.write("hello world".getBytes());
fsDataOutputStream.hflush();
boolean result = routerFs.recoverLease(testPath);
assertFalse(result);
} finally {
IOUtils.closeStream(fsDataOutputStream);
routerFs.delete(testPath, true);
}
}
/**
* Test isFileClosed when the result is false.
*/
@Test
public void testIsFileClosed() throws Exception {
Path testPath = new Path("/is_file_closed.txt");
DistributedFileSystem routerFs =
(DistributedFileSystem) getRouterFileSystem();
FSDataOutputStream fsDataOutputStream = null;
try {
fsDataOutputStream = routerFs.create(testPath);
fsDataOutputStream.write("hello world".getBytes());
fsDataOutputStream.hflush();
boolean result = routerFs.isFileClosed(testPath);
assertFalse(result);
} finally {
IOUtils.closeStream(fsDataOutputStream);
routerFs.delete(testPath, true);
}
}
@Test
public void testGetContentSummaryEc() throws Exception {
DistributedFileSystem routerDFS =
(DistributedFileSystem) getRouterFileSystem();
Path dir = new Path("/");
String expectedECPolicy = "RS-6-3-1024k";
try {
routerDFS.setErasureCodingPolicy(dir, expectedECPolicy);
assertEquals(expectedECPolicy,
routerDFS.getContentSummary(dir).getErasureCodingPolicy());
} finally {
routerDFS.unsetErasureCodingPolicy(dir);
}
}
@Test
public void testSubclusterDown() throws Exception {
final int totalFiles = 6;
List<RouterContext> routers = getCluster().getRouters();
// Test the behavior when everything is fine
FileSystem fs = getRouterFileSystem();
FileStatus[] files = fs.listStatus(new Path("/"));
assertEquals(totalFiles, files.length);
// Simulate one of the subclusters is in standby
NameNode nn0 = getCluster().getNamenode("ns0", null).getNamenode();
FSNamesystem ns0 = nn0.getNamesystem();
HAContext nn0haCtx = (HAContext)getInternalState(ns0, "haContext");
HAContext mockCtx = mock(HAContext.class);
doThrow(new StandbyException("Mock")).when(mockCtx).checkOperation(any());
setInternalState(ns0, "haContext", mockCtx);
// router0 should throw an exception
RouterContext router0 = routers.get(0);
RouterRpcServer router0RPCServer = router0.getRouter().getRpcServer();
RouterClientProtocol router0ClientProtocol =
router0RPCServer.getClientProtocolModule();
setInternalState(router0ClientProtocol, "allowPartialList", false);
try {
router0.getFileSystem().listStatus(new Path("/"));
fail("I should throw an exception");
} catch (RemoteException re) {
GenericTestUtils.assertExceptionContains(
"No namenode available to invoke getListing", re);
}
// router1 should report partial results
RouterContext router1 = routers.get(1);
files = router1.getFileSystem().listStatus(new Path("/"));
assertTrue("Found " + files.length + " items, we should have less",
files.length < totalFiles);
// Restore the HA context and the Router
setInternalState(ns0, "haContext", nn0haCtx);
setInternalState(router0ClientProtocol, "allowPartialList", true);
}
@Test
public void testCallerContextWithMultiDestinations() throws IOException {
GenericTestUtils.LogCapturer auditLog =
GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.AUDIT_LOG);
// set client context
CallerContext.setCurrent(
new CallerContext.Builder("clientContext").build());
// assert the initial caller context as expected
assertEquals("clientContext", CallerContext.getCurrent().getContext());
DistributedFileSystem routerFs =
(DistributedFileSystem) getRouterFileSystem();
// create a directory via the router
Path dirPath = new Path("/test_caller_context_with_multi_destinations");
routerFs.mkdirs(dirPath);
// invoke concurrently in RouterRpcClient
routerFs.listStatus(dirPath);
// invoke sequentially in RouterRpcClient
routerFs.getFileStatus(dirPath);
String auditFlag = "src=" + dirPath.toString();
String clientIpInfo = "clientIp:"
+ InetAddress.getLocalHost().getHostAddress();
for (String line : auditLog.getOutput().split("\n")) {
if (line.contains(auditFlag)) {
// assert origin caller context exist in audit log
String callerContext = line.substring(line.indexOf("callerContext="));
assertTrue(callerContext.contains("clientContext"));
// assert client ip info exist in caller context
assertTrue(callerContext.contains(clientIpInfo));
// assert client ip info appears only once in caller context
assertEquals(callerContext.indexOf(clientIpInfo),
callerContext.lastIndexOf(clientIpInfo));
}
}
// clear client context
CallerContext.setCurrent(null);
}
}