blob: d3d34216190872fa3a9949587dbd5f854e779f54 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.countContents;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.deleteFile;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileStatus;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.TEST_STRING;
import static org.apache.hadoop.ipc.CallerContext.PROXY_USER_PORT;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.SafeModeAction;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
import org.apache.hadoop.hdfs.server.federation.metrics.RBFMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.Supplier;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
/**
* Test the RPC interface of the {@link Router} implemented by
* {@link RouterRpcServer}.
* Tests covering the functionality of RouterRPCServer with
* multi nameServices.
*/
public class TestRouterRpc {
/** Logger shared by the tests in this class. */
private static final Logger LOG =
    LoggerFactory.getLogger(TestRouterRpc.class);

/** Number of subclusters given to the MiniRouterDFSCluster constructor. */
private static final int NUM_SUBCLUSTERS = 2;

/**
 * Datanodes per nameservice. We need at least 6 DNs to test Erasure Coding
 * with RS-6-3-64k.
 */
private static final int NUM_DNS = 6;

/** Orders erasure coding policies by the name of their policy. */
private static final Comparator<ErasureCodingPolicyInfo> EC_POLICY_CMP =
    (ec0, ec1) ->
        ec0.getPolicy().getName().compareTo(ec1.getPolicy().getName());
/** Federated HDFS cluster shared by all the tests in this class. */
private static MiniRouterDFSCluster cluster;
/** Router picked at random by testSetup for the current test. */
private RouterContext router;
/** Nameservice under test; testSetup sets it to the first nameservice. */
private String ns;
/** Namenode under test, as passed to setNamenode. */
private NamenodeContext namenode;
/** Client interface to the Router. */
private ClientProtocol routerProtocol;
/** Client interface to the Namenode. */
private ClientProtocol nnProtocol;
/** NameNodeProtocol interface to the Router. */
private NamenodeProtocol routerNamenodeProtocol;
/** NameNodeProtocol interface to the Namenode of the first nameservice. */
private NamenodeProtocol nnNamenodeProtocol;
/** NameNodeProtocol interface to the Namenode of the second nameservice. */
private NamenodeProtocol nnNamenodeProtocol1;
/** Filesystem interface to the Router. */
private FileSystem routerFS;
/** Filesystem interface to the Namenode. */
private FileSystem nnFS;
/** Path of the per-test file as seen through the Router. */
private String routerFile;
/** Path of the per-test file as seen by the Namenode. */
private String nnFile;
/**
 * Start the federated mini cluster shared by all the tests in this class.
 * The Namenodes and Datanodes are started first, then the Routers, and
 * finally the Namenodes are registered with the Routers.
 *
 * @throws Exception If the cluster or the Routers cannot be started.
 */
@BeforeClass
public static void globalSetUp() throws Exception {
// First set of Namenode overrides: caller context, load and audit settings
Configuration namenodeConf = new Configuration();
namenodeConf.setBoolean(DFSConfigKeys.HADOOP_CALLER_CONTEXT_ENABLED_KEY,
true);
namenodeConf.set(HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY, "256");
// It's very easy to become overloaded for some specific dn in this small
// cluster, which will cause the EC file block allocation failure. To avoid
// this issue, we disable considerLoad option.
namenodeConf.setBoolean(DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, false);
namenodeConf.setBoolean(DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY, true);
cluster = new MiniRouterDFSCluster(false, NUM_SUBCLUSTERS);
cluster.setNumDatanodesPerNameservice(NUM_DNS);
cluster.addNamenodeOverrides(namenodeConf);
cluster.setIndependentDNs();
// Second set of Namenode overrides: proxy users and the listing limit
Configuration conf = new Configuration();
// Setup proxy users.
conf.set("hadoop.proxyuser.testRealUser.groups", "*");
conf.set("hadoop.proxyuser.testRealUser.hosts", "*");
String loginUser = UserGroupInformation.getLoginUser().getUserName();
conf.set(String.format("hadoop.proxyuser.%s.groups", loginUser), "*");
conf.set(String.format("hadoop.proxyuser.%s.hosts", loginUser), "*");
// Enable IP proxy users.
conf.set(DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS, "placeholder");
// NOTE(review): presumably a small limit so listings need several
// batches -- confirm
conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 5);
cluster.addNamenodeOverrides(conf);
// Start NNs and DNs and wait until ready
cluster.startCluster();
// Start routers with only an RPC service
Configuration routerConf = new RouterConfigBuilder()
.metrics()
.rpc()
.build();
// We decrease the DN cache times to make the test faster
routerConf.setTimeDuration(
RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
cluster.addRouterOverrides(routerConf);
cluster.startRouters();
// Register and verify all NNs with all routers
cluster.registerNamenodes();
cluster.waitNamenodeRegistration();
// We decrease the DN heartbeat expire interval to make them dead faster.
// Only namesystems 0 and 1 are tuned, which matches NUM_SUBCLUSTERS = 2.
cluster.getCluster().getNamesystem(0).getBlockManager()
.getDatanodeManager().setHeartbeatInterval(1);
cluster.getCluster().getNamesystem(1).getBlockManager()
.getDatanodeManager().setHeartbeatInterval(1);
cluster.getCluster().getNamesystem(0).getBlockManager()
.getDatanodeManager().setHeartbeatExpireInterval(3000);
cluster.getCluster().getNamesystem(1).getBlockManager()
.getDatanodeManager().setHeartbeatExpireInterval(3000);
}
/**
 * Shut down the federated mini cluster once all the tests have run.
 */
@AfterClass
public static void tearDown() {
  // globalSetUp may have failed before the cluster was assigned; do not
  // pile a NullPointerException on top of the original failure.
  if (cluster != null) {
    cluster.shutdown();
  }
}
/**
 * Reset the cluster before each test: reinstall the mock locations, wipe
 * the files, recreate the test directories, pick the Router and Namenode
 * under test and create one randomly named file on the Namenode.
 *
 * @throws Exception If the fixtures cannot be created.
 */
@Before
public void testSetup() throws Exception {
  // Create mock locations
  cluster.installMockLocations();
  // Delete all files via the NNs and verify
  cluster.deleteAllFiles();
  // Create test fixtures on NN
  cluster.createTestDirectoriesNamenode();
  // Wait to ensure NN has fully created its test directories
  Thread.sleep(100);
  // Random router for this test
  RouterContext rndRouter = cluster.getRandomRouter();
  this.setRouter(rndRouter);
  // Pick a namenode for this test
  String ns0 = cluster.getNameservices().get(0);
  this.setNs(ns0);
  this.setNamenode(cluster.getNamenode(ns0, null));
  // Create a test file on the NN
  Random rnd = new Random();
  String randomFile = "testfile-" + rnd.nextInt();
  this.nnFile =
      cluster.getNamenodeTestDirectoryForNS(ns) + "/" + randomFile;
  this.routerFile =
      cluster.getFederatedTestDirectoryForNS(ns) + "/" + randomFile;
  createFile(nnFS, nnFile, 32);
  // Fail here rather than in an unrelated test if the fixture is missing
  assertTrue("Test file " + nnFile + " was not created",
      verifyFileExists(nnFS, nnFile));
}
/**
 * Walk a standalone RouterRpcServer through its service life cycle
 * (init, start, stop) and check the reported state after each step.
 *
 * @throws IOException If the server or the Router cannot be closed.
 */
@Test
public void testRpcService() throws IOException {
  Router testRouter = new Router();
  try {
    List<String> nss = cluster.getNameservices();
    String ns0 = nss.get(0);
    Configuration routerConfig =
        cluster.generateRouterConfiguration(ns0, null);
    RouterRpcServer server = new RouterRpcServer(routerConfig, testRouter,
        testRouter.getNamenodeResolver(),
        testRouter.getSubclusterResolver());
    try {
      server.init(routerConfig);
      assertEquals(STATE.INITED, server.getServiceState());
      server.start();
      assertEquals(STATE.STARTED, server.getServiceState());
      server.stop();
      assertEquals(STATE.STOPPED, server.getServiceState());
    } finally {
      // Release the server even when one of the assertions above fails
      server.close();
    }
  } finally {
    testRouter.close();
  }
}
/**
 * @return The federated mini cluster shared by the tests.
 */
protected MiniRouterDFSCluster getCluster() {
  return cluster;
}
/**
 * @return The Router context used by the current test.
 */
protected RouterContext getRouterContext() {
  return router;
}
/**
 * Select the Router under test and refresh every client handle that
 * points at it (ClientProtocol, FileSystem and NamenodeProtocol).
 *
 * @param r Router context to use from now on.
 * @throws IOException If a client to the Router cannot be created.
 * @throws URISyntaxException If the Router URI is malformed.
 */
protected void setRouter(RouterContext r)
    throws IOException, URISyntaxException {
  router = r;
  routerProtocol = r.getClient().getNamenode();
  routerFS = r.getFileSystem();
  // The NamenodeProtocol proxy targets the URI of the Router file system
  Configuration routerConf = r.getConf();
  routerNamenodeProtocol = NameNodeProxies.createProxy(
      routerConf, r.getFileSystem().getUri(), NamenodeProtocol.class)
      .getProxy();
}
/**
 * @return The file system client connected to the Router.
 */
protected FileSystem getRouterFileSystem() {
  return routerFS;
}
/**
 * @return The file system client connected to the Namenode.
 */
protected FileSystem getNamenodeFileSystem() {
  return nnFS;
}
/**
 * @return The ClientProtocol handle connected to the Router.
 */
protected ClientProtocol getRouterProtocol() {
  return routerProtocol;
}
/**
 * @return The ClientProtocol handle connected to the Namenode.
 */
protected ClientProtocol getNamenodeProtocol() {
  return nnProtocol;
}
/**
 * @return The Namenode context used by the current test.
 */
protected NamenodeContext getNamenode() {
  return namenode;
}
/**
 * @param filename Path of the test file as seen by the Namenode.
 */
protected void setNamenodeFile(String filename) {
  nnFile = filename;
}
/**
 * @return Path of the test file as seen by the Namenode.
 */
protected String getNamenodeFile() {
  return nnFile;
}
/**
 * @param filename Path of the test file as seen through the Router.
 */
protected void setRouterFile(String filename) {
  routerFile = filename;
}
/**
 * @return Path of the test file as seen through the Router.
 */
protected String getRouterFile() {
  return routerFile;
}
/**
 * Select the Namenode under test and refresh the client handles that point
 * at it. The two NamenodeProtocol proxies are always built for the
 * Namenodes of the first and second nameservices, whatever {@code nn} is.
 *
 * @param nn Namenode context to use from now on.
 * @throws IOException If a client to a Namenode cannot be created.
 * @throws URISyntaxException If a Namenode URI is malformed.
 */
protected void setNamenode(NamenodeContext nn)
    throws IOException, URISyntaxException {
  namenode = nn;
  nnProtocol = nn.getClient().getNamenode();
  nnFS = nn.getFileSystem();

  // Namenode from the default namespace
  String defaultNsId = cluster.getNameservices().get(0);
  NamenodeContext defaultNn = cluster.getNamenode(defaultNsId, null);
  Configuration defaultConf = defaultNn.getConf();
  nnNamenodeProtocol = NameNodeProxies.createProxy(
      defaultConf, defaultNn.getFileSystem().getUri(),
      NamenodeProtocol.class).getProxy();

  // Namenode from the other namespace
  String otherNsId = cluster.getNameservices().get(1);
  NamenodeContext otherNn = cluster.getNamenode(otherNsId, null);
  Configuration otherConf = otherNn.getConf();
  nnNamenodeProtocol1 = NameNodeProxies.createProxy(
      otherConf, otherNn.getFileSystem().getUri(),
      NamenodeProtocol.class).getProxy();
}
/**
 * @return Identifier of the nameservice used by the current test.
 */
protected String getNs() {
  return ns;
}
/**
 * @param nameservice Identifier of the nameservice to use in the test.
 */
protected void setNs(String nameservice) {
  ns = nameservice;
}
/**
 * Invoke the same method with the same parameters on two protocols and
 * check that both behave alike: equal return values and, when the call
 * fails, the same class of underlying failure.
 *
 * @param protocol1 First protocol to invoke the method on.
 * @param protocol2 Second protocol to invoke the method on.
 * @param m Method to invoke through reflection.
 * @param paramList Parameters passed to both invocations.
 */
protected static void compareResponses(
    ClientProtocol protocol1, ClientProtocol protocol2,
    Method m, Object[] paramList) {
  Object return1 = null;
  Exception exception1 = null;
  try {
    return1 = m.invoke(protocol1, paramList);
  } catch (Exception ex) {
    exception1 = ex;
  }
  Object return2 = null;
  Exception exception2 = null;
  try {
    return2 = m.invoke(protocol2, paramList);
  } catch (Exception ex) {
    exception2 = ex;
  }
  assertEquals(return1, return2);
  if (exception1 == null && exception2 == null) {
    return;
  }
  // A failure on one side only (e.g. a void method, where both returns are
  // null) must be reported as an assertion failure, not as an NPE.
  assertNotNull(m.getName() + " failed only on the second protocol: "
      + exception2, exception1);
  assertNotNull(m.getName() + " failed only on the first protocol: "
      + exception1, exception2);
  // Reflection wraps the real failure as the cause; fall back to the
  // exception itself when there is no cause to compare.
  Throwable failure1 =
      exception1.getCause() != null ? exception1.getCause() : exception1;
  Throwable failure2 =
      exception2.getCause() != null ? exception2.getCause() : exception2;
  assertEquals(failure1.getClass(), failure2.getClass());
}
/**
 * Check that listing the root through the Router returns, in order, the
 * union of the mount points under the root and the root entries of the
 * first nameservice, and that listing an unknown path fails the same way
 * through the Router and the Namenode.
 */
@Test
public void testProxyListFiles() throws IOException, InterruptedException,
    URISyntaxException, NoSuchMethodException, SecurityException {
  // Verify that the root listing is a union of the mount table destinations
  // and the files stored at all nameservices mounted at the root (ns0 + ns1)
  //
  // / -->
  // /ns0 (from mount table)
  // /ns1 (from mount table)
  // all items in / of ns0 (default NS)
  // Collect the mount table entries from the root mount point
  Set<String> requiredPaths = new TreeSet<>();
  FileSubclusterResolver fileResolver =
      router.getRouter().getSubclusterResolver();
  for (String mount : fileResolver.getMountPoints("/")) {
    requiredPaths.add(mount);
  }
  // Collect all files/dirs on the root path of the default NS
  String defaultNs = cluster.getNameservices().get(0);
  NamenodeContext nn = cluster.getNamenode(defaultNs, null);
  FileStatus[] rootFiles = nn.getFileSystem().listStatus(new Path("/"));
  for (FileStatus file : rootFiles) {
    requiredPaths.add(file.getPath().getName());
  }
  // Fetch listing
  DirectoryListing listing =
      routerProtocol.getListing("/", HdfsFileStatus.EMPTY_NAME, false);
  HdfsFileStatus[] partialListing = listing.getPartialListing();
  // Verify the total number of results first, so that a longer listing is
  // reported as a count mismatch instead of a NoSuchElementException
  assertEquals(requiredPaths.size(), partialListing.length);
  // Match each path returned and verify order returned
  Iterator<String> requiredPathsIterator = requiredPaths.iterator();
  for (HdfsFileStatus f : partialListing) {
    String expectedName = requiredPathsIterator.next();
    String currentFile = f.getFullPath(new Path("/")).getName();
    assertEquals(expectedName, currentFile);
  }
  // List a path that doesn't exist and validate error response with NN
  // behavior.
  Method m = ClientProtocol.class.getMethod(
      "getListing", String.class, byte[].class, boolean.class);
  String badPath = "/unknownlocation/unknowndir";
  compareResponses(routerProtocol, nnProtocol, m,
      new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false});
}
/**
 * List a directory through the Router whose entries come from both the
 * mount table and the file system, and check that they are merged in order.
 */
@Test
public void testProxyListFilesLargeDir() throws IOException {
  // Mount table used by this test:
  // /parent        -> ns0 /parent
  // /parent/file-0 -> ns0 /parent/file-0 (mount table only)
  // /parent/file-7 -> ns0 /parent/file-7 (mount table and file system)
  for (RouterContext routerContext : cluster.getRouters()) {
    MockResolver mockResolver =
        (MockResolver) routerContext.getRouter().getSubclusterResolver();
    mockResolver.addLocation("/parent", ns, "/parent");
    mockResolver.addLocation("/parent/file-0", ns, "/parent/file-0");
    mockResolver.addLocation("/parent/file-7", ns, "/parent/file-7");
  }

  // Nothing exists in the subcluster yet: only the two mount points show up
  final Path parent = new Path("/parent");
  FileStatus[] listed = routerFS.listStatus(parent);
  assertEquals(2, listed.length);
  assertEquals("file-0", listed[0].getPath().getName());
  assertEquals("file-7", listed[1].getPath().getName());

  // Create file-1 .. file-8 directly on the Namenode
  FileSystem nnFileSystem = cluster.getNamenode(ns, null).getFileSystem();
  for (int fileId = 1; fileId <= 8; fileId++) {
    createFile(nnFileSystem, "/parent/file-" + fileId, 32);
  }

  // file-0 .. file-8 must now be listed in order
  listed = routerFS.listStatus(parent);
  assertEquals(9, listed.length);
  int position = 0;
  while (position < 9) {
    assertEquals("file-" + position, listed[position].getPath().getName());
    position++;
  }

  // file-9 is added as a mount point only; it does not exist on the NN
  for (RouterContext routerContext : cluster.getRouters()) {
    MockResolver mockResolver =
        (MockResolver) routerContext.getRouter().getSubclusterResolver();
    mockResolver.addLocation("/parent/file-9", ns, "/parent/file-9");
  }
  assertFalse(verifyFileExists(nnFileSystem, "/parent/file-9"));

  // The listing now ends with the file-9 mount point
  listed = routerFS.listStatus(parent);
  assertEquals(10, listed.length);
  position = 0;
  while (position < 10) {
    assertEquals("file-" + position, listed[position].getPath().getName());
    position++;
  }
}
/**
 * Add a directory on the Namenode under the federated test directory of
 * the nameservice and check the root listing seen through the Router.
 */
@Test
public void testProxyListFilesWithConflict()
    throws IOException, InterruptedException {
  // Add a directory to the namespace that conflicts with a mount point
  FileSystem nnFs = cluster.getNamenode(ns, null).getFileSystem();
  String conflictingDir = cluster.getFederatedTestDirectoryForNS(ns);
  addDirectory(nnFs, conflictingDir);

  FileSystem routerFs = router.getFileSystem();
  // NOTE(review): both counts are taken after the directory was added, so
  // this only checks that two consecutive listings agree -- confirm intent
  int firstCount = countContents(routerFs, "/");
  // Root file system now for NS X:
  // / ->
  // /ns0 (mount table)
  // /ns1 (mount table)
  // /target-ns0 (the target folder for the NS0 mapped to /
  // /nsX (local directory that duplicates mount table)
  int secondCount = countContents(routerFs, "/");
  assertEquals(firstCount, secondCount);

  // Verify that each root path is readable and contains one test directory
  String federatedPath = cluster.getFederatedPathForNS(ns);
  assertEquals(1, countContents(routerFs, federatedPath));

  // Verify that real folder for the ns contains a single test directory
  String namenodePath = cluster.getNamenodePathForNS(ns);
  assertEquals(1, countContents(nnFs, namenodePath));
}
/**
 * Create a file through the Router, rename it with
 * {@code ClientProtocol#rename} and check the outcome. The file left
 * behind (source on failure, destination on success) is deleted at the end.
 *
 * @param testRouter Router to send the requests to.
 * @param filename Path of the file to create and rename.
 * @param renamedFile Destination path of the rename.
 * @param exceptionExpected If the rename is expected to throw.
 * @throws IOException If the file cannot be created, checked or deleted.
 */
protected void testRename(RouterContext testRouter, String filename,
    String renamedFile, boolean exceptionExpected) throws IOException {
  createFile(testRouter.getFileSystem(), filename, 32);
  // The rename outcome means nothing unless the source really exists
  assertTrue(verifyFileExists(testRouter.getFileSystem(), filename));
  // rename
  boolean exceptionThrown = false;
  try {
    DFSClient client = testRouter.getClient();
    ClientProtocol clientProtocol = client.getNamenode();
    clientProtocol.rename(filename, renamedFile);
  } catch (Exception ex) {
    // Keep the failure visible in the log instead of dropping it
    LOG.info("Rename of {} to {} failed", filename, renamedFile, ex);
    exceptionThrown = true;
  }
  assertEquals(exceptionExpected, exceptionThrown);
  if (exceptionExpected) {
    // Error was expected: the source file is still there to clean up
    FileContext fileContext = testRouter.getFileContext();
    assertTrue(fileContext.delete(new Path(filename), true));
  } else {
    // verify
    assertTrue(verifyFileExists(testRouter.getFileSystem(), renamedFile));
    // delete
    FileContext fileContext = testRouter.getFileContext();
    assertTrue(fileContext.delete(new Path(renamedFile), true));
  }
}
/**
 * Create a file through the Router, rename it with
 * {@code ClientProtocol#rename2} (no rename options) and check the
 * outcome. The file left behind (source on failure, destination on
 * success) is deleted at the end.
 *
 * @param testRouter Router to send the requests to.
 * @param filename Path of the file to create and rename.
 * @param renamedFile Destination path of the rename.
 * @param exceptionExpected If the rename is expected to throw.
 * @throws IOException If the file cannot be created, checked or deleted.
 */
protected void testRename2(RouterContext testRouter, String filename,
    String renamedFile, boolean exceptionExpected) throws IOException {
  createFile(testRouter.getFileSystem(), filename, 32);
  // The rename outcome means nothing unless the source really exists
  assertTrue(verifyFileExists(testRouter.getFileSystem(), filename));
  // rename
  boolean exceptionThrown = false;
  try {
    DFSClient client = testRouter.getClient();
    ClientProtocol clientProtocol = client.getNamenode();
    clientProtocol.rename2(filename, renamedFile, new Options.Rename[] {});
  } catch (Exception ex) {
    // Keep the failure visible in the log instead of dropping it
    LOG.info("Rename2 of {} to {} failed", filename, renamedFile, ex);
    exceptionThrown = true;
  }
  assertEquals(exceptionExpected, exceptionThrown);
  if (exceptionExpected) {
    // Error was expected: the source file is still there to clean up
    FileContext fileContext = testRouter.getFileContext();
    assertTrue(fileContext.delete(new Path(filename), true));
  } else {
    // verify
    assertTrue(verifyFileExists(testRouter.getFileSystem(), renamedFile));
    // delete
    FileContext fileContext = testRouter.getFileContext();
    assertTrue(fileContext.delete(new Path(renamedFile), true));
  }
}
/**
 * Rename files through the Router: within one nameservice (expected to
 * work) and across two nameservices (expected to fail), with both the
 * rename and rename2 calls.
 */
@Test
public void testProxyRenameFiles() throws IOException, InterruptedException {
  Thread.sleep(5000);
  List<String> nameservices = cluster.getNameservices();
  String firstNs = nameservices.get(0);
  String secondNs = nameservices.get(1);

  // Rename within the same namespace
  // /ns0/testdir/testrename -> /ns0/testdir/testrename-append
  String source =
      cluster.getFederatedTestDirectoryForNS(firstNs) + "/testrename";
  String sameNsTarget = source + "-append";
  testRename(router, source, sameNsTarget, false);
  testRename2(router, source, sameNsTarget, false);

  // Rename a file to a destination that is in a different namespace (fails)
  String crossNsSource =
      cluster.getFederatedTestDirectoryForNS(firstNs) + "/testrename";
  String crossNsTarget =
      cluster.getFederatedTestDirectoryForNS(secondNs) + "/testrename";
  testRename(router, crossNsSource, crossNsTarget, true);
  testRename2(router, crossNsSource, crossNsTarget, true);
}
/**
 * Change the owner and group of the test file through the Router, check
 * the change on the Namenode, and check that a bad path fails the same way
 * through the Router and the Namenode.
 */
@Test
public void testProxyChownFiles() throws Exception {
  String newUsername = "TestUser";
  String newGroup = "TestGroup";
  // change owner
  routerProtocol.setOwner(routerFile, newUsername, newGroup);
  // Verify with NN (expected value first, so failure messages read right)
  FileStatus file = getFileStatus(namenode.getFileSystem(), nnFile);
  assertEquals(newUsername, file.getOwner());
  assertEquals(newGroup, file.getGroup());
  // Bad request and validate router response matches NN response.
  Method m = ClientProtocol.class.getMethod("setOwner", String.class,
      String.class, String.class);
  String badPath = "/unknownlocation/unknowndir";
  compareResponses(routerProtocol, nnProtocol, m,
      new Object[] {badPath, newUsername, newGroup});
}
/**
 * Wait until the statistics reported by the Router match the sum of the
 * statistics reported by each Namenode.
 */
@Test
public void testProxyGetStats() throws Exception {
  // Some of the statistics are out of sync because of the mini cluster,
  // so the comparison is retried until it succeeds or times out
  Supplier<Boolean> check = () -> {
    try {
      long[] combinedData = routerProtocol.getStats();
      long[] individualData = getAggregateStats();
      int len = Math.min(combinedData.length, individualData.length);
      int i = 0;
      while (i < len) {
        if (combinedData[i] != individualData[i]) {
          LOG.error("Stats for {} don't match: {} != {}",
              i, combinedData[i], individualData[i]);
          return false;
        }
        i++;
      }
      return true;
    } catch (Exception e) {
      LOG.error("Cannot get stats: {}", e.getMessage());
      return false;
    }
  };
  GenericTestUtils.waitFor(check, 500, 5 * 1000);
}
/**
 * Add up, position by position, the statistics of every nameservice.
 * @return Aggregated statistics.
 * @throws Exception If it cannot get the stats from a Namenode.
 */
private long[] getAggregateStats() throws Exception {
  long[] totals = new long[10];
  for (String nameservice : cluster.getNameservices()) {
    ClientProtocol nnClient =
        cluster.getNamenode(nameservice, null).getClient().getNamenode();
    long[] nsStats = nnClient.getStats();
    int position = 0;
    for (long value : nsStats) {
      totals[position] += value;
      position++;
    }
  }
  return totals;
}
/**
 * Check that the Datanode report from the Router holds the same Datanodes,
 * mapped to the same subclusters, as the reports from each Namenode.
 */
@Test
public void testProxyGetDatanodeReport() throws Exception {
  DatanodeInfo[] routerReport =
      routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
  assertEquals(0, routerProtocol.getSlowDatanodeReport().length);

  // Xfer port -> subcluster, as seen through the Router; the subcluster
  // is the first component of the network location
  final Map<Integer, String> routerDNMap = new TreeMap<>();
  for (DatanodeInfo dn : routerReport) {
    String[] locationParts = dn.getNetworkLocation().split("/");
    routerDNMap.put(dn.getXferPort(), locationParts[1]);
  }

  // Xfer port -> nameservice, as seen by each Namenode
  final Map<Integer, String> nnDNMap = new TreeMap<>();
  for (String nameservice : cluster.getNameservices()) {
    ClientProtocol nnClient =
        cluster.getNamenode(nameservice, null).getClient().getNamenode();
    DatanodeInfo[] nnReport =
        nnClient.getDatanodeReport(DatanodeReportType.ALL);
    // Collect unique DNs based on their xfer port
    for (DatanodeInfo info : nnReport) {
      nnDNMap.put(info.getXferPort(), nameservice);
    }
  }
  assertEquals(nnDNMap, routerDNMap);
}
/**
 * Check that the Router reports as many Datanode storage reports as there
 * are distinct Datanodes reported by the Namenodes.
 */
@Test
public void testProxyGetDatanodeStorageReport()
    throws IOException, InterruptedException, URISyntaxException {
  DatanodeStorageReport[] routerReports =
      routerProtocol.getDatanodeStorageReport(DatanodeReportType.ALL);

  // Distinct Datanodes over all the nameservices, keyed by toString()
  Set<String> distinctDatanodes = new HashSet<>();
  for (String nameservice : cluster.getNameservices()) {
    ClientProtocol nnClient =
        cluster.getNamenode(nameservice, null).getClient().getNamenode();
    DatanodeStorageReport[] nnReports =
        nnClient.getDatanodeStorageReport(DatanodeReportType.ALL);
    for (DatanodeStorageReport nnReport : nnReports) {
      distinctDatanodes.add(nnReport.getDatanodeInfo().toString());
    }
  }
  assertEquals(routerReports.length, distinctDatanodes.size());
}
/**
 * Create a directory at the root through the Router, check that it shows
 * up in the root listing and in exactly one Namenode, and check that a bad
 * path fails the same way through the Router and the Namenode.
 */
@Test
public void testProxyMkdir() throws Exception {
  final Path root = new Path("/");
  // Root listing before the directory is created
  FileStatus[] before = routerFS.listStatus(root);

  // Create a directory via the router at the root level
  String dirPath = "/testdir";
  FsPermission permission = new FsPermission("705");
  routerProtocol.mkdirs(dirPath, permission, false);

  // The root listing through the router must have grown by one entry
  FileStatus[] after = routerFS.listStatus(root);
  String mismatch = Arrays.toString(after) + " should be "
      + Arrays.toString(before) + " + " + dirPath;
  assertEquals(mismatch, before.length + 1, after.length);
  assertTrue(verifyFileExists(routerFS, dirPath));

  // Verify the directory is present in only 1 Namenode
  int namenodesWithDir = 0;
  for (NamenodeContext nnContext : cluster.getNamenodes()) {
    boolean present = verifyFileExists(nnContext.getFileSystem(), dirPath);
    if (present) {
      namenodesWithDir++;
    }
  }
  assertEquals(1, namenodesWithDir);
  assertTrue(deleteFile(routerFS, dirPath));

  // Validate router failure response matches NN failure response.
  Method mkdirs = ClientProtocol.class.getMethod("mkdirs", String.class,
      FsPermission.class, boolean.class);
  Object[] badArgs = {"/unknownlocation/unknowndir", permission, false};
  compareResponses(routerProtocol, nnProtocol, mkdirs, badArgs);
}
/**
 * Change the permission of the test file through the Router, check it on
 * the Namenode, and check that a bad path fails the same way through the
 * Router and the Namenode.
 */
@Test
public void testProxyChmodFiles() throws Exception {
  FsPermission readOnly = new FsPermission("444");

  // Change the permission through the router
  routerProtocol.setPermission(routerFile, readOnly);

  // The Namenode must report the new permission
  FileStatus nnStatus = getFileStatus(namenode.getFileSystem(), nnFile);
  assertEquals(readOnly, nnStatus.getPermission());

  // Validate router failure response matches NN failure response.
  Method setPermission = ClientProtocol.class.getMethod(
      "setPermission", String.class, FsPermission.class);
  Object[] badArgs = {"/unknownlocation/unknowndir", readOnly};
  compareResponses(routerProtocol, nnProtocol, setPermission, badArgs);
}
/**
 * Raise the replication of the test file from 1 to 2 through the Router,
 * check it on the Namenode, and check that a bad path fails the same way
 * through the Router and the Namenode.
 */
@Test
public void testProxySetReplication() throws Exception {
  final short newReplication = (short) 2;

  // The file starts with a replication of 1 according to the NN
  FileStatus nnStatus = getFileStatus(nnFS, nnFile);
  assertEquals(1, nnStatus.getReplication());

  // Increase the replication through the router
  routerProtocol.setReplication(routerFile, newReplication);

  // The NN must report the new replication
  nnStatus = getFileStatus(nnFS, nnFile);
  assertEquals(2, nnStatus.getReplication());

  // Validate router failure response matches NN failure response.
  Method setReplication = ClientProtocol.class.getMethod(
      "setReplication", String.class, short.class);
  Object[] badArgs = {"/unknownlocation/unknowndir", newReplication};
  compareResponses(routerProtocol, nnProtocol, setReplication, badArgs);
}
/**
 * Truncate the test file to zero bytes through the Router, check the
 * length on the Namenode, and check that a bad path fails the same way
 * through the Router and the Namenode.
 */
@Test
public void testProxyTruncateFile() throws Exception {
  final String clientName = "testclient";

  // The file is not empty according to the NN
  FileStatus nnStatus = getFileStatus(nnFS, nnFile);
  assertTrue(nnStatus.getLen() > 0);

  // Truncate to 0 bytes via router
  routerProtocol.truncate(routerFile, 0, clientName);

  // The NN must now report an empty file
  nnStatus = getFileStatus(nnFS, nnFile);
  assertEquals(0, nnStatus.getLen());

  // Validate router failure response matches NN failure response.
  Method truncate = ClientProtocol.class.getMethod(
      "truncate", String.class, long.class, String.class);
  Object[] badArgs = {"/unknownlocation/unknowndir", 0L, clientName};
  compareResponses(routerProtocol, nnProtocol, truncate, badArgs);
}
/**
 * Allow and then disallow snapshots on a directory through the Router and
 * check the snapshottable flag of the directory inode on the first
 * Namenode after each step.
 */
@Test
public void testAllowDisallowSnapshots() throws Exception {
  // Create a directory via the router at the root level
  String dirPath = "/testdir";
  String filePath1 = "/sample";
  FsPermission permission = new FsPermission("705");
  routerProtocol.mkdirs(dirPath, permission, false);
  try {
    createFile(routerFS, filePath1, 32);
    // Check that initially doesn't allow snapshots
    NamenodeContext nnContext = cluster.getNamenodes().get(0);
    NameNode nn = nnContext.getNamenode();
    FSNamesystem fsn = NameNodeAdapter.getNamesystem(nn);
    FSDirectory fsdir = fsn.getFSDirectory();
    INodeDirectory dirNode = fsdir.getINode4Write(dirPath).asDirectory();
    assertFalse(dirNode.isSnapshottable());
    // Allow snapshots and verify the folder allows them
    routerProtocol.allowSnapshot(dirPath);
    dirNode = fsdir.getINode4Write(dirPath).asDirectory();
    assertTrue(dirNode.isSnapshottable());
    // Disallow snapshot on dir and verify does not allow snapshots anymore
    routerProtocol.disallowSnapshot(dirPath);
    dirNode = fsdir.getINode4Write(dirPath).asDirectory();
    assertFalse(dirNode.isSnapshottable());
  } finally {
    // Cleanup, also when an assertion fails, so nothing leaks into the
    // following tests; the sample file is removed as well
    routerProtocol.delete(dirPath, true);
    routerProtocol.delete(filePath1, false);
  }
}
/**
 * Create, rename and delete a snapshot through the Router on a folder
 * under a dedicated mount point, checking the snapshot path after each
 * step. The folder and the mount point are removed at the end.
 */
@Test
public void testManageSnapshot() throws Exception {
  final String mountPoint = "/mntsnapshot";
  final String snapshotFolder = mountPoint + "/folder";
  final String snapPath = snapshotFolder + "/.snapshot/snap";
  final String renamedSnapPath = snapshotFolder + "/.snapshot/newsnap";

  LOG.info("Setup a mount point for snapshots: {}", mountPoint);
  MockResolver resolver =
      (MockResolver) router.getRouter().getSubclusterResolver();
  String ns0 = cluster.getNameservices().get(0);
  resolver.addLocation(mountPoint, ns0, "/");

  FsPermission permission = new FsPermission("777");
  routerProtocol.mkdirs(snapshotFolder, permission, false);
  try {
    // Populate the folder with subfolder1 .. subfolder9
    for (int id = 1; id < 10; id++) {
      routerProtocol.mkdirs(
          snapshotFolder + "/subfolder" + id, permission, false);
    }

    LOG.info("Create the snapshot: {}", snapshotFolder);
    routerProtocol.allowSnapshot(snapshotFolder);
    String snapshotName =
        routerProtocol.createSnapshot(snapshotFolder, "snap");
    assertEquals(snapPath, snapshotName);
    assertTrue(verifyFileExists(routerFS, snapPath));

    LOG.info("Rename the snapshot and check it changed");
    routerProtocol.renameSnapshot(snapshotFolder, "snap", "newsnap");
    assertFalse(verifyFileExists(routerFS, snapPath));
    assertTrue(verifyFileExists(routerFS, renamedSnapPath));

    // The old snapshot name is gone, so deleting it must be rejected
    LambdaTestUtils.intercept(SnapshotException.class,
        "Cannot delete snapshot snap from path " + snapshotFolder + ":",
        () -> routerFS.deleteSnapshot(new Path(snapshotFolder), "snap"));

    LOG.info("Delete the snapshot and check it is not there");
    routerProtocol.deleteSnapshot(snapshotFolder, "newsnap");
    assertFalse(verifyFileExists(routerFS, renamedSnapPath));
  } finally {
    // Cleanup
    assertTrue(routerProtocol.delete(snapshotFolder, true));
    assertTrue(resolver.removeLocation(mountPoint, ns0, "/"));
  }
}
/**
 * Takes two snapshots of a directory (a sub-directory is created in between)
 * and checks the snapshottable directory listing, the snapshot listing and
 * both flavours of the snapshot diff report as returned by the router.
 */
@Test
public void testGetSnapshotListing() throws IOException {
  final String snapshotPath = "/testGetSnapshotListing";
  final String childDir = snapshotPath + "/subdir";
  final String firstSnapshot = "snap1";
  final String secondSnapshot = "snap2";
  final FsPermission dirPermission = new FsPermission("705");

  // Create the directory via the router and make it snapshottable
  routerProtocol.mkdirs(snapshotPath, dirPermission, false);
  routerProtocol.allowSnapshot(snapshotPath);

  // Two snapshots, with a new child directory created between them
  routerProtocol.createSnapshot(snapshotPath, firstSnapshot);
  routerProtocol.mkdirs(childDir, dirPermission, false);
  routerProtocol.createSnapshot(snapshotPath, secondSnapshot);

  // The router must list exactly our directory as snapshottable
  final SnapshottableDirectoryStatus[] snapshottableDirs =
      routerProtocol.getSnapshottableDirListing();
  assertEquals(1, snapshottableDirs.length);
  assertEquals(
      snapshotPath, snapshottableDirs[0].getFullPath().toString());

  // The router must list both snapshots, in creation order
  final SnapshotStatus[] listing =
      routerProtocol.getSnapshotListing(snapshotPath);
  assertEquals(2, listing.length);
  final Path expectedFirst = SnapshotTestHelper.getSnapshotRoot(
      new Path(snapshotPath), firstSnapshot);
  assertEquals(expectedFirst, listing[0].getFullPath());
  final Path expectedSecond = SnapshotTestHelper.getSnapshotRoot(
      new Path(snapshotPath), secondSnapshot);
  assertEquals(expectedSecond, listing[1].getFullPath());

  // Diff report between the two snapshots
  final SnapshotDiffReport report = routerProtocol.getSnapshotDiffReport(
      snapshotPath, firstSnapshot, secondSnapshot);
  assertEquals(2, report.getDiffList().size());

  // Diff report listing between the two snapshots
  final byte[] startPath = {};
  final SnapshotDiffReportListing reportListing =
      routerProtocol.getSnapshotDiffReportListing(
          snapshotPath, firstSnapshot, secondSnapshot, startPath, -1);
  assertEquals(1, reportListing.getModifyList().size());
  assertEquals(1, reportListing.getCreateList().size());

  // Cleanup
  routerProtocol.deleteSnapshot(snapshotPath, firstSnapshot);
  routerProtocol.deleteSnapshot(snapshotPath, secondSnapshot);
  routerProtocol.disallowSnapshot(snapshotPath);
}
/**
 * Fetches the block locations of the shared test file through the router
 * and compares router and namenode responses for a bad path.
 */
@Test
public void testProxyGetBlockLocations() throws Exception {
  // The test file is expected to consist of a single block
  final LocatedBlocks routerLocations =
      routerProtocol.getBlockLocations(routerFile, 0, 1024);
  final int blockCount = routerLocations.getLocatedBlocks().size();
  assertEquals(1, blockCount);

  // Router and NN must respond alike for a path that does not exist
  final Method getLocationsMethod = ClientProtocol.class.getMethod(
      "getBlockLocations", String.class, long.class, long.class);
  final String missingPath = "/unknownlocation/unknowndir";
  final Object[] callArgs = {missingPath, (long) 0, (long) 0};
  compareResponses(routerProtocol, nnProtocol, getLocationsMethod, callArgs);
}
/**
 * Sets a storage policy on the shared test file through the router and
 * verifies through the namenode that the policy changed. Also compares the
 * router and namenode responses for a bad path / bad policy.
 */
@Test
public void testProxyStoragePolicy() throws Exception {
  // Query initial policy via NN
  HdfsFileStatus status = namenode.getClient().getFileInfo(nnFile);

  // Set a random policy via router
  BlockStoragePolicy[] policies = namenode.getClient().getStoragePolicies();
  BlockStoragePolicy policy = policies[0];
  // One generator is enough; there is no need to allocate one per iteration
  Random rand = new Random();
  while (policy.isCopyOnCreateFile()) {
    // Pick a non copy on create policy
    policy = policies[rand.nextInt(policies.length)];
  }
  routerProtocol.setStoragePolicy(routerFile, policy.getName());

  // Verify policy via NN
  HdfsFileStatus newStatus = namenode.getClient().getFileInfo(nnFile);
  // assertEquals reports both ids on failure, unlike assertTrue(a == b)
  assertEquals(policy.getId(), newStatus.getStoragePolicy());
  assertTrue("Storage policy did not change from its initial value",
      newStatus.getStoragePolicy() != status.getStoragePolicy());

  // Validate router failure response matches NN failure response.
  Method m = ClientProtocol.class.getMethod("setStoragePolicy", String.class,
      String.class);
  String badPath = "/unknownlocation/unknowndir";
  compareResponses(routerProtocol, nnProtocol,
      m, new Object[] {badPath, "badpolicy"});
}
/**
 * Creates a file through the router and checks get, set and unset of its
 * storage policy through the router, cross-checking the results against
 * the namenode.
 */
@Test
public void testProxyGetAndUnsetStoragePolicy() throws Exception {
  String file = "/testGetStoragePolicy";
  String nnFilePath = cluster.getNamenodeTestDirectoryForNS(ns) + file;
  String routerFilePath = cluster.getFederatedTestDirectoryForNS(ns) + file;
  createFile(routerFS, routerFilePath, 32);

  // Get storage policy via router
  BlockStoragePolicy policy = routerProtocol.getStoragePolicy(routerFilePath);
  // Verify default policy is HOT
  assertEquals(HdfsConstants.HOT_STORAGE_POLICY_NAME, policy.getName());
  assertEquals(HdfsConstants.HOT_STORAGE_POLICY_ID, policy.getId());

  // Get storage policies via router
  BlockStoragePolicy[] policies = routerProtocol.getStoragePolicies();
  BlockStoragePolicy[] nnPolicies = namenode.getClient().getStoragePolicies();
  // Verify policies returned by router are the same as those returned by NN
  assertArrayEquals(nnPolicies, policies);

  BlockStoragePolicy newPolicy = policies[0];
  // One generator is enough; there is no need to allocate one per iteration
  Random rand = new Random();
  while (newPolicy.isCopyOnCreateFile()) {
    // Pick a non copy on create policy. Because if copyOnCreateFile is set
    // then the policy cannot be changed after file creation.
    newPolicy = policies[rand.nextInt(policies.length)];
  }
  routerProtocol.setStoragePolicy(routerFilePath, newPolicy.getName());

  // Get storage policy via router
  policy = routerProtocol.getStoragePolicy(routerFilePath);
  // Verify the router now reports the policy that was just set
  assertEquals(newPolicy.getName(), policy.getName());
  assertEquals(newPolicy.getId(), policy.getId());

  // Verify policy via NN
  BlockStoragePolicy nnPolicy =
      namenode.getClient().getStoragePolicy(nnFilePath);
  assertEquals(nnPolicy.getName(), policy.getName());
  assertEquals(nnPolicy.getId(), policy.getId());

  // Unset storage policy via router
  routerProtocol.unsetStoragePolicy(routerFilePath);

  // Get storage policy: it must be back to HOT
  policy = routerProtocol.getStoragePolicy(routerFilePath);
  assertEquals(HdfsConstants.HOT_STORAGE_POLICY_NAME, policy.getName());
  assertEquals(HdfsConstants.HOT_STORAGE_POLICY_ID, policy.getId());

  // Verify policy via NN
  nnPolicy = namenode.getClient().getStoragePolicy(nnFilePath);
  assertEquals(nnPolicy.getName(), policy.getName());
  assertEquals(nnPolicy.getId(), policy.getId());
}
/**
 * Checks that the router returns the same storage policies as the namenode,
 * both with the resolver's namespace enabled and with it disabled. The
 * resolver flag is always reset afterwards.
 */
@Test
public void testListStoragePolicies() throws IOException, URISyntaxException {
  final MockResolver mountResolver =
      (MockResolver) router.getRouter().getSubclusterResolver();
  try {
    final BlockStoragePolicy[] expected =
        namenode.getClient().getStoragePolicies();

    // Default namespace specified
    final BlockStoragePolicy[] withNamespace =
        routerProtocol.getStoragePolicies();
    assertArrayEquals(expected, withNamespace);

    // Default namespace unspecified
    mountResolver.setDisableNamespace(true);
    final BlockStoragePolicy[] withoutNamespace =
        routerProtocol.getStoragePolicies();
    assertArrayEquals(expected, withoutNamespace);
  } finally {
    mountResolver.setDisableNamespace(false);
  }
}
/**
 * Checks that the block size in the server defaults reported by the router
 * equals the one reported by the namenode, both with the resolver's
 * namespace enabled and with it disabled.
 */
@Test
public void testGetServerDefaults() throws IOException, URISyntaxException {
  final MockResolver mountResolver =
      (MockResolver) router.getRouter().getSubclusterResolver();
  try {
    final FsServerDefaults nnDefaults =
        namenode.getClient().getServerDefaults();

    // Default namespace specified
    final long sizeWithNamespace =
        routerProtocol.getServerDefaults().getBlockSize();
    assertEquals(nnDefaults.getBlockSize(), sizeWithNamespace);

    // Default namespace unspecified
    mountResolver.setDisableNamespace(true);
    final long sizeWithoutNamespace =
        routerProtocol.getServerDefaults().getBlockSize();
    assertEquals(nnDefaults.getBlockSize(), sizeWithoutNamespace);
  } finally {
    mountResolver.setDisableNamespace(false);
  }
}
/**
 * Compares the preferred block size of the shared test file as reported by
 * the namenode and by the router, then compares both responses for a bad
 * path.
 */
@Test
public void testProxyGetPreferedBlockSize() throws Exception {
  // Ask the NN first, then the router, and expect the same answer
  final long sizeFromNamenode = nnProtocol.getPreferredBlockSize(nnFile);
  final long sizeFromRouter = routerProtocol.getPreferredBlockSize(routerFile);
  assertEquals(sizeFromRouter, sizeFromNamenode);

  // Router and NN must respond alike for a path that does not exist
  final Method blockSizeMethod = ClientProtocol.class.getMethod(
      "getPreferredBlockSize", String.class);
  final String missingPath = "/unknownlocation/unknowndir";
  final Object[] callArgs = {missingPath};
  compareResponses(routerProtocol, nnProtocol, blockSizeMethod, callArgs);
}
/**
 * Concatenates {@code source} onto {@code target} through the router and
 * asserts whether the call raised an IOException.
 *
 * @param source File to append to the target.
 * @param target File receiving the source blocks.
 * @param failureExpected If the concat is expected to throw an IOException.
 */
private void testConcat(
    String source, String target, boolean failureExpected) {
  boolean concatFailed;
  try {
    // Concat the source file into the target via router
    routerProtocol.concat(target, new String[] {source});
    concatFailed = false;
  } catch (IOException ex) {
    concatFailed = true;
  }
  assertEquals(failureExpected, concatFailed);
}
/**
 * Checks concat through the router: concatenating across two different
 * nameservices must fail, concatenating inside one nameservice must succeed
 * and grow the target file. Also compares the router and namenode responses
 * for a bad path.
 */
@Test
public void testProxyConcatFile() throws Exception {
  // Create a stub file in the primary ns
  String sameNameservice = ns;
  String existingFile =
      cluster.getFederatedTestDirectoryForNS(sameNameservice) +
      "_concatfile";
  int existingFileSize = 32;
  createFile(routerFS, existingFile, existingFileSize);

  // Identify an alternate nameservice that doesn't match the existing file
  String alternateNameservice = null;
  for (String n : cluster.getNameservices()) {
    if (!n.equals(sameNameservice)) {
      alternateNameservice = n;
      break;
    }
  }
  // Fail fast instead of building test paths from a null nameservice
  assertNotNull("No alternate nameservice found besides " + sameNameservice,
      alternateNameservice);

  // Create new files, must be a full block to use concat. One file is in the
  // same namespace as the target file, the other is in a different namespace.
  String altRouterFile =
      cluster.getFederatedTestDirectoryForNS(alternateNameservice) +
      "_newfile";
  String sameRouterFile =
      cluster.getFederatedTestDirectoryForNS(sameNameservice) +
      "_newfile";
  createFile(routerFS, altRouterFile, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
  createFile(routerFS, sameRouterFile, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);

  // Concat in different namespaces, fails
  testConcat(existingFile, altRouterFile, true);

  // Concat in same namespaces, succeeds
  testConcat(existingFile, sameRouterFile, false);

  // Check target file length
  FileStatus status = getFileStatus(routerFS, sameRouterFile);
  assertEquals(
      existingFileSize + DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT,
      status.getLen());

  // Validate router failure response matches NN failure response.
  Method m = ClientProtocol.class.getMethod(
      "concat", String.class, String[].class);
  String badPath = "/unknownlocation/unknowndir";
  compareResponses(routerProtocol, nnProtocol, m,
      new Object[] {badPath, new String[] {routerFile}});
}
/**
 * Appends a test string to the shared test file through a router client and
 * verifies the new length through the namenode. Also compares the router
 * and namenode responses for an append on a bad path.
 */
@Test
public void testProxyAppend() throws Exception {
  // Append a test string via router
  EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.APPEND);
  DFSClient routerClient = getRouterContext().getClient();
  // try-with-resources closes the stream even if the write fails
  try (HdfsDataOutputStream stream =
      routerClient.append(routerFile, 1024, createFlag, null, null)) {
    stream.writeBytes(TEST_STRING);
  }

  // Verify file size via NN
  FileStatus status = getFileStatus(nnFS, nnFile);
  assertTrue(status.getLen() > TEST_STRING.length());

  // Validate router failure response matches NN failure response.
  Method m = ClientProtocol.class.getMethod("append", String.class,
      String.class, EnumSetWritable.class);
  String badPath = "/unknownlocation/unknowndir";
  EnumSetWritable<CreateFlag> createFlagWritable =
      new EnumSetWritable<>(createFlag);
  compareResponses(routerProtocol, nnProtocol, m,
      new Object[] {badPath, "testClient", createFlagWritable});
}
/**
 * Uses the low-level protocol calls on the router to create a file, add a
 * block to it and request an additional datanode for that block.
 */
@Test
public void testProxyGetAdditionalDatanode()
    throws IOException, InterruptedException, URISyntaxException {
  // All calls use the same client name so that they share the lease
  final String leaseHolder = getRouterContext().getClient().getClientName();
  final String targetFile = routerFile + "_additionalDatanode";
  final EnumSetWritable<CreateFlag> createOnly =
      new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE));

  // Open the file via router
  final HdfsFileStatus created = routerProtocol.create(
      targetFile, new FsPermission("777"), leaseHolder, createOnly,
      true, (short) 1, (long) 1024, CryptoProtocolVersion.supported(),
      null, null);
  final long fileId = created.getFileId();

  // Add a block via router (requires client to have same lease)
  final LocatedBlock firstBlock = routerProtocol.addBlock(
      targetFile, leaseHolder, null, null, fileId, null, null);

  // Ask for one more datanode for that block, excluding none
  final DatanodeInfo[] noExclusions = DatanodeInfo.EMPTY_ARRAY;
  final LocatedBlock extendedBlock = routerProtocol.getAdditionalDatanode(
      targetFile, fileId, firstBlock.getBlock(),
      firstBlock.getLocations(), firstBlock.getStorageIDs(),
      noExclusions, 1, leaseHolder);
  assertNotNull(extendedBlock);
}
/**
 * Creates a file through the router as a different remote user and verifies
 * through the namenode that the file is owned by that user.
 */
@Test
public void testProxyCreateFileAlternateUser()
    throws IOException, URISyntaxException, InterruptedException {
  // Create via Router
  String routerDir = cluster.getFederatedTestDirectoryForNS(ns);
  String namenodeDir = cluster.getNamenodeTestDirectoryForNS(ns);
  String newRouterFile = routerDir + "/unknownuser";
  String newNamenodeFile = namenodeDir + "/unknownuser";
  String username = "unknownuser";

  // Allow all user access to dir
  namenode.getFileContext().setPermission(
      new Path(namenodeDir), new FsPermission("777"));

  UserGroupInformation ugi = UserGroupInformation.createRemoteUser(username);
  DFSClient client = getRouterContext().getClient(ugi);
  // Close the returned stream right away so it is not left open
  client.create(newRouterFile, true).close();

  // Fetch via NN and check user
  FileStatus status = getFileStatus(nnFS, newNamenodeFile);
  // Expected value goes first so that a failure message reads correctly
  assertEquals(username, status.getOwner());
}
/**
 * Requests the located blocks of the test file as a different remote user,
 * once against the namenode and once against the router, and checks that
 * both calls fail with the same exception class.
 */
@Test
public void testProxyGetFileInfoAcessException() throws IOException {
  final UserGroupInformation otherUser =
      UserGroupInformation.createRemoteUser("unknownuser");

  // Call the NN and remember the exception it raises
  Exception fromNamenode = null;
  try {
    final String nnTestFile = cluster.getNamenodeTestFileForNS(ns);
    namenode.getClient(otherUser).getLocatedBlocks(nnTestFile, 0);
  } catch (Exception e) {
    fromNamenode = e;
  }
  assertNotNull(fromNamenode);

  // Call the router and remember the exception it raises
  Exception fromRouter = null;
  try {
    final String routerTestFile = cluster.getFederatedTestFileForNS(ns);
    getRouterContext().getClient(otherUser)
        .getLocatedBlocks(routerTestFile, 0);
  } catch (Exception e) {
    fromRouter = e;
  }
  assertNotNull(fromRouter);

  // Both sides must fail with the same kind of exception
  final Class<?> routerFailureType = fromRouter.getClass();
  final Class<?> namenodeFailureType = fromNamenode.getClass();
  assertEquals(routerFailureType, namenodeFailureType);
}
/**
 * Checks that the NamespaceInfo returned by the router matches the one of
 * the namenode, both with the resolver's namespace enabled and with it
 * disabled. In the latter case the router answer may come from either
 * namenode.
 */
@Test
public void testProxyVersionRequest() throws Exception {
  MockResolver resolver =
      (MockResolver) router.getRouter().getSubclusterResolver();
  try {
    // Check with default namespace specified.
    NamespaceInfo rVersion = routerNamenodeProtocol.versionRequest();
    NamespaceInfo nnVersion = nnNamenodeProtocol.versionRequest();
    NamespaceInfo nnVersion1 = nnNamenodeProtocol1.versionRequest();
    compareVersion(rVersion, nnVersion);

    // Check with default namespace unspecified.
    resolver.setDisableNamespace(true);
    // Ask the router again; otherwise the check below would only re-verify
    // the answer obtained while the namespace was still enabled.
    rVersion = routerNamenodeProtocol.versionRequest();
    // Verify the NamespaceInfo is of nn0 or nn1
    boolean isNN0 =
        rVersion.getBlockPoolID().equals(nnVersion.getBlockPoolID());
    compareVersion(rVersion, isNN0 ? nnVersion : nnVersion1);
  } finally {
    resolver.setDisableNamespace(false);
  }
}
/**
 * Asserts that two NamespaceInfo objects agree on block pool ID, namespace
 * ID, cluster ID, layout version and creation time.
 *
 * @param rVersion NamespaceInfo obtained through the router.
 * @param nnVersion NamespaceInfo obtained from the namenode (expected).
 */
private void compareVersion(NamespaceInfo rVersion, NamespaceInfo nnVersion) {
  final String expectedBlockPool = nnVersion.getBlockPoolID();
  assertEquals(expectedBlockPool, rVersion.getBlockPoolID());

  final int expectedNamespaceId = nnVersion.getNamespaceID();
  assertEquals(expectedNamespaceId, rVersion.getNamespaceID());

  final String expectedCluster = nnVersion.getClusterID();
  assertEquals(expectedCluster, rVersion.getClusterID());

  final int expectedLayout = nnVersion.getLayoutVersion();
  assertEquals(expectedLayout, rVersion.getLayoutVersion());

  final long expectedCTime = nnVersion.getCTime();
  assertEquals(expectedCTime, rVersion.getCTime());
}
/**
 * Checks that the block keys returned by the router match those of the
 * namenode, both with the resolver's namespace enabled and with it disabled.
 */
@Test
public void testProxyGetBlockKeys() throws Exception {
  final MockResolver mountResolver =
      (MockResolver) router.getRouter().getSubclusterResolver();
  try {
    // Default namespace specified
    final ExportedBlockKeys keysWithNamespace =
        routerNamenodeProtocol.getBlockKeys();
    final ExportedBlockKeys namenodeKeys = nnNamenodeProtocol.getBlockKeys();
    compareBlockKeys(keysWithNamespace, namenodeKeys);

    // Default namespace unspecified
    mountResolver.setDisableNamespace(true);
    final ExportedBlockKeys keysWithoutNamespace =
        routerNamenodeProtocol.getBlockKeys();
    compareBlockKeys(keysWithoutNamespace, namenodeKeys);
  } finally {
    mountResolver.setDisableNamespace(false);
  }
}
/**
 * Asserts that two sets of exported block keys agree on the current key,
 * the key update interval and the token lifetime.
 *
 * @param rKeys Block keys obtained through the router.
 * @param nnKeys Block keys obtained from the namenode (expected).
 */
private void compareBlockKeys(ExportedBlockKeys rKeys,
    ExportedBlockKeys nnKeys) {
  assertEquals(
      nnKeys.getCurrentKey(),
      rKeys.getCurrentKey());
  assertEquals(
      nnKeys.getKeyUpdateInterval(),
      rKeys.getKeyUpdateInterval());
  assertEquals(
      nnKeys.getTokenLifetime(),
      rKeys.getTokenLifetime());
}
/**
 * Requests the blocks of the first reported datanode through the router and
 * directly from the namenode and checks that both return the same block IDs
 * in the same order.
 */
@Test
public void testProxyGetBlocks() throws Exception {
  // Use the first datanode of the router's full report
  final DatanodeInfo firstDatanode =
      routerProtocol.getDatanodeReport(DatanodeReportType.ALL)[0];

  // Same request against the router and against the NN
  final BlocksWithLocations viaRouter =
      routerNamenodeProtocol.getBlocks(firstDatanode, 1024, 0, 0);
  final BlocksWithLocations viaNamenode =
      nnNamenodeProtocol.getBlocks(firstDatanode, 1024, 0, 0);
  final BlockWithLocations[] routerBlocks = viaRouter.getBlocks();
  final BlockWithLocations[] nnBlocks = viaNamenode.getBlocks();

  // Same number of blocks and pairwise identical block IDs
  assertEquals(nnBlocks.length, routerBlocks.length);
  int position = 0;
  while (position < routerBlocks.length) {
    final long expectedId = nnBlocks[position].getBlock().getBlockId();
    final long actualId = routerBlocks[position].getBlock().getBlockId();
    assertEquals(expectedId, actualId);
    position++;
  }
}
/**
 * Checks the transaction ID returned by the router: with the resolver's
 * namespace enabled it must equal the one of the first namenode protocol,
 * with it disabled it must equal the one of either namenode protocol.
 */
@Test
public void testProxyGetTransactionID() throws IOException {
  final MockResolver mountResolver =
      (MockResolver) router.getRouter().getSubclusterResolver();
  try {
    // Default namespace specified
    final long txIdWithNamespace = routerNamenodeProtocol.getTransactionID();
    final long txIdNn0 = nnNamenodeProtocol.getTransactionID();
    final long txIdNn1 = nnNamenodeProtocol1.getTransactionID();
    assertEquals(txIdNn0, txIdWithNamespace);

    // Default namespace unspecified: the answer may come from nn0 or nn1
    mountResolver.setDisableNamespace(true);
    final long txIdWithoutNamespace =
        routerNamenodeProtocol.getTransactionID();
    assertThat(txIdWithoutNamespace).isIn(txIdNn0, txIdNn1);
  } finally {
    mountResolver.setDisableNamespace(false);
  }
}
/**
 * Checks the most recent checkpoint transaction ID returned by the router:
 * with the resolver's namespace enabled it must equal the one of the first
 * namenode protocol, with it disabled it must equal the one of either
 * namenode protocol.
 */
@Test
public void testProxyGetMostRecentCheckpointTxId() throws IOException {
  MockResolver resolver =
      (MockResolver) router.getRouter().getSubclusterResolver();
  try {
    // Check with default namespace specified.
    long routerCheckPointId =
        routerNamenodeProtocol.getMostRecentCheckpointTxId();
    long nnCheckPointId = nnNamenodeProtocol.getMostRecentCheckpointTxId();
    long nnCheckPointId1 = nnNamenodeProtocol1.getMostRecentCheckpointTxId();
    assertEquals(nnCheckPointId, routerCheckPointId);

    // Check with default namespace unspecified.
    resolver.setDisableNamespace(true);
    routerCheckPointId = routerNamenodeProtocol.getMostRecentCheckpointTxId();
    // Verify the checkpoint ID is of nn0 or nn1; without this assertion the
    // second call was only checked for not throwing.
    assertThat(routerCheckPointId).isIn(nnCheckPointId, nnCheckPointId1);
  } finally {
    resolver.setDisableNamespace(false);
  }
}
/**
 * Compares the safe mode state reported by the router and by the namenode,
 * then enters and leaves safe mode through the router and checks the value
 * returned by each call.
 */
@Test
public void testProxySetSafemode() throws Exception {
  final HdfsConstants.SafeModeAction get =
      HdfsConstants.SafeModeAction.SAFEMODE_GET;
  final HdfsConstants.SafeModeAction enter =
      HdfsConstants.SafeModeAction.SAFEMODE_ENTER;
  final HdfsConstants.SafeModeAction leave =
      HdfsConstants.SafeModeAction.SAFEMODE_LEAVE;

  // Router and NN must agree on the state with isChecked=false ...
  boolean stateFromRouter = routerProtocol.setSafeMode(get, false);
  boolean stateFromNamenode = nnProtocol.setSafeMode(get, false);
  assertEquals(stateFromNamenode, stateFromRouter);

  // ... and with isChecked=true
  stateFromRouter = routerProtocol.setSafeMode(get, true);
  stateFromNamenode = nnProtocol.setSafeMode(get, true);
  assertEquals(stateFromNamenode, stateFromRouter);

  // Walk through off -> enter -> on -> leave -> off via the router
  assertFalse(routerProtocol.setSafeMode(get, false));
  assertTrue(routerProtocol.setSafeMode(enter, false));
  assertTrue(routerProtocol.setSafeMode(get, false));
  assertFalse(routerProtocol.setSafeMode(leave, false));
  assertFalse(routerProtocol.setSafeMode(get, false));
}
/**
 * Checks that restoreFailedStorage("check") returns the same value through
 * the router as it does directly on the namenode.
 */
@Test
public void testProxyRestoreFailedStorage() throws Exception {
  // Router first, then NN, and both answers must match
  final boolean viaRouter = routerProtocol.restoreFailedStorage("check");
  final boolean viaNamenode = nnProtocol.restoreFailedStorage("check");
  assertEquals(viaNamenode, viaRouter);
}
/**
 * Opens a stream on {@code testPath}, writes to it and checks that renewing
 * the client lease increases the router's proxy operation counter by
 * exactly one.
 *
 * @param dfs File system used to open the stream and renew the lease.
 * @param rpcMetrics Router RPC metrics providing the proxy op counter.
 * @param testPath File to write to.
 * @param createFlag True to create the file, false to append to it.
 * @throws Exception If the file cannot be written or the lease not renewed.
 */
private void testRenewLeaseInternal(DistributedFileSystem dfs,
    FederationRPCMetrics rpcMetrics, Path testPath, boolean createFlag)
    throws Exception {
  // try-with-resources closes the stream and keeps the primary exception
  // if both the body and close() fail
  try (FSDataOutputStream outputStream =
      createFlag ? dfs.create(testPath) : dfs.append(testPath)) {
    outputStream.write("hello world. \n".getBytes());
    long proxyOpBeforeRenewLease = rpcMetrics.getProxyOps();
    assertTrue(dfs.getClient().renewLease());
    long proxyOpAfterRenewLease = rpcMetrics.getProxyOps();
    assertEquals((proxyOpBeforeRenewLease + 1), proxyOpAfterRenewLease);
  }
}
/**
 * Runs the renew-lease check on a file created in a directory that has the
 * RS-6-3-1024k erasure coding policy set, and verifies the file reports
 * that policy.
 */
@Test
public void testRenewLeaseForECFile() throws Exception {
  final String policyName = "RS-6-3-1024k";
  final FederationRPCMetrics rpcMetrics =
      router.getRouterRpcServer().getRPCMetrics();

  // Install a mount point to a different path to check
  final MockResolver mountResolver =
      (MockResolver) router.getRouter().getSubclusterResolver();
  final String firstNs = cluster.getNameservices().get(0);
  mountResolver.addLocation("/testRenewLease0", firstNs, "/testRenewLease0");

  // Stop LeaseRenewer
  final DistributedFileSystem dfs = (DistributedFileSystem) routerFS;
  dfs.getClient().getLeaseRenewer().interruptAndJoin();

  // Prepare the parent directory with the EC policy
  final Path ecFile = new Path("/testRenewLease0/ecDirectory/test_ec.txt");
  final Path ecDir = ecFile.getParent();
  dfs.mkdirs(ecDir);
  dfs.setErasureCodingPolicy(ecDir, policyName);

  testRenewLeaseInternal(dfs, rpcMetrics, ecFile, true);

  // The created file must carry the directory's EC policy
  final ErasureCodingPolicy filePolicy = dfs.getErasureCodingPolicy(ecFile);
  assertNotNull(filePolicy);
  assertEquals(policyName, filePolicy.getName());
}
/**
 * Runs the renew-lease check on a file below a mount point, first while
 * creating the file and then while appending to it.
 */
@Test
public void testRenewLeaseForReplicaFile() throws Exception {
  final FederationRPCMetrics rpcMetrics =
      router.getRouterRpcServer().getRPCMetrics();

  // Install a mount point to a different path to check
  final MockResolver mountResolver =
      (MockResolver) router.getRouter().getSubclusterResolver();
  final String firstNs = cluster.getNameservices().get(0);
  mountResolver.addLocation("/testRenewLease0", firstNs, "/testRenewLease0");

  // Stop LeaseRenewer
  final DistributedFileSystem dfs = (DistributedFileSystem) routerFS;
  dfs.getClient().getLeaseRenewer().interruptAndJoin();

  // Test Replica File: create first, append second
  final Path replicaFile = new Path("/testRenewLease0/test_replica.txt");
  for (boolean create : new boolean[] {true, false}) {
    testRenewLeaseInternal(dfs, rpcMetrics, replicaFile, create);
  }
}
/**
 * Opens one stream in each of two mount points that map to different
 * nameservices and checks that a single lease renewal increases the
 * router's proxy operation counter by two.
 */
@Test
public void testRenewLeaseWithMultiStream() throws Exception {
  final FederationRPCMetrics rpcMetrics =
      router.getRouterRpcServer().getRPCMetrics();

  // Install a mount point to a different path to check
  final MockResolver mountResolver =
      (MockResolver) router.getRouter().getSubclusterResolver();
  final String firstNs = cluster.getNameservices().get(0);
  final String secondNs = cluster.getNameservices().get(1);
  mountResolver.addLocation("/testRenewLease0", firstNs, "/testRenewLease0");
  mountResolver.addLocation("/testRenewLease1", secondNs, "/testRenewLease1");

  // Stop LeaseRenewer
  final DistributedFileSystem dfs = (DistributedFileSystem) routerFS;
  dfs.getClient().getLeaseRenewer().interruptAndJoin();

  final Path fileInFirstNs = new Path("/testRenewLease0/test1.txt");
  final Path fileInSecondNs = new Path("/testRenewLease1/test1.txt");
  try (FSDataOutputStream firstStream = dfs.create(fileInFirstNs);
      FSDataOutputStream secondStream = dfs.create(fileInSecondNs)) {
    firstStream.write("hello world \n".getBytes());
    secondStream.write("hello world \n".getBytes());

    // One renewal with two open streams is expected to cost two proxy ops
    final long opsBefore = rpcMetrics.getProxyOps();
    assertTrue(dfs.getClient().renewLease());
    final long opsAfter = rpcMetrics.getProxyOps();
    assertEquals((opsBefore + 2), opsAfter);
  }
}
/**
 * Checks that mkdirs on a mount point whose nameservice has been disabled
 * in the namenode resolver raises a NoLocationException. The disabled
 * namespaces are cleared again afterwards.
 */
@Test
public void testMkdirWithDisableNameService() throws Exception {
  // Mount /mnt on the first nameservice
  final MockResolver mountResolver =
      (MockResolver) router.getRouter().getSubclusterResolver();
  final String firstNs = cluster.getNameservices().get(0);
  mountResolver.addLocation("/mnt", firstNs, "/");

  // Disable that nameservice in the namenode resolver
  final MockResolver namenodeResolver =
      (MockResolver) router.getRouter().getNamenodeResolver();
  namenodeResolver.disableNamespace(firstNs);
  try {
    final FsPermission allAccess = new FsPermission("777");
    final RouterRpcServer routerRpcServer = router.getRouter().getRpcServer();
    LambdaTestUtils.intercept(NoLocationException.class,
        () -> routerRpcServer.mkdirs("/mnt/folder0/folder1", allAccess, true));
  } finally {
    namenodeResolver.clearDisableNamespaces();
  }
}
/**
 * Checks that IOExceptions raised for operations below the "/mnt" mount
 * point contain the path as seen through the router, and checks one corner
 * case of RouterRpcClient#processExceptionMsg directly.
 */
@Test
public void testProxyExceptionMessages() throws IOException {
  // Install a mount point to a different path to check
  final MockResolver mountResolver =
      (MockResolver) router.getRouter().getSubclusterResolver();
  final String firstNs = cluster.getNameservices().get(0);
  mountResolver.addLocation("/mnt", firstNs, "/");

  final FsPermission allAccess = new FsPermission("777");

  // mkdirs without an existing parent
  try {
    routerProtocol.mkdirs("/mnt/folder0/folder1", allAccess, false);
    fail("mkdirs for non-existing parent folder should have failed");
  } catch (IOException mkdirsFailure) {
    assertExceptionContains("/mnt/folder0", mkdirsFailure,
        "Wrong path in exception for mkdirs");
  }

  // setPermission on a missing file
  try {
    routerProtocol.setPermission("/mnt/testfile.txt", allAccess);
    fail("setPermission for non-existing file should have failed");
  } catch (IOException setPermissionFailure) {
    assertExceptionContains("/mnt/testfile.txt", setPermissionFailure,
        "Wrong path in exception for setPermission");
  }

  // mkdirs followed by a non-recursive delete
  try {
    routerProtocol.mkdirs("/mnt/folder0/folder1", allAccess, false);
    routerProtocol.delete("/mnt/folder0", false);
    fail("delete for non-existing file should have failed");
  } catch (IOException deleteFailure) {
    assertExceptionContains("/mnt/folder0", deleteFailure,
        "Wrong path in exception for delete");
  }

  mountResolver.cleanRegistrations();

  // Check corner cases
  final String rewritten = RouterRpcClient.processExceptionMsg(
      "Parent directory doesn't exist: /a/a/b", "/a", "/ns1/a");
  assertEquals("Parent directory doesn't exist: /ns1/a/a/b", rewritten);
}
/**
 * Create a file for each NameSpace, then find their 1st block and mark one of
 * the replica as corrupt through BlockManager#findAndMarkBlockAsCorrupt.
 *
 * After all NameNode received the corrupt replica report, the
 * replicatedBlockStats.getCorruptBlocks() should equal to the sum of
 * corruptBlocks of all NameSpaces.
 */
@Test
public void testGetReplicatedBlockStats() throws Exception {
  String testFile = "/test-file";
  for (String nsid : cluster.getNameservices()) {
    NamenodeContext context = cluster.getNamenode(nsid, null);
    NameNode nameNode = context.getNamenode();
    FSNamesystem namesystem = nameNode.getNamesystem();
    BlockManager bm = namesystem.getBlockManager();
    FileSystem fileSystem = context.getFileSystem();

    // create a test file
    createFile(fileSystem, testFile, 1024);

    // mark a replica as corrupt
    LocatedBlock block = NameNodeAdapter
        .getBlockLocations(nameNode, testFile, 0, 1024).get(0);
    namesystem.writeLock();
    try {
      bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
          "STORAGE_ID", "TEST");
    } finally {
      // Always release the lock, even if marking the block fails
      namesystem.writeUnlock();
    }
    BlockManagerTestUtil.updateState(bm);
    DFSTestUtil.waitCorruptReplicas(fileSystem, namesystem,
        new Path(testFile), block.getBlock(), 1);

    // save the getReplicatedBlockStats result
    ReplicatedBlockStats stats =
        context.getClient().getNamenode().getReplicatedBlockStats();
    assertEquals(1, stats.getCorruptBlocks());
  }
  ReplicatedBlockStats routerStat = routerProtocol.getReplicatedBlockStats();
  assertEquals("There should be 1 corrupt blocks for each NN",
      cluster.getNameservices().size(), routerStat.getCorruptBlocks());
}
/**
 * End-to-end check of the erasure coding calls exposed by the router:
 * listing policies and codecs, setting a policy on a directory, adding,
 * disabling, enabling and removing a custom policy, and comparing the EC
 * block group stats against the aggregated namenode stats.
 */
@Test
public void testErasureCoding() throws Exception {
  LOG.info("List the available erasurce coding policies");
  ErasureCodingPolicyInfo[] policies = checkErasureCodingPolicies();
  for (ErasureCodingPolicyInfo policy : policies) {
    LOG.info(" {}", policy);
  }

  LOG.info("List the erasure coding codecs");
  Map<String, String> codecsRouter = routerProtocol.getErasureCodingCodecs();
  Map<String, String> codecsNamenode = nnProtocol.getErasureCodingCodecs();
  assertTrue(Maps.difference(codecsRouter, codecsNamenode).areEqual());
  for (Entry<String, String> entry : codecsRouter.entrySet()) {
    LOG.info(" {}: {}", entry.getKey(), entry.getValue());
  }

  LOG.info("Create a testing directory via the router at the root level");
  String dirPath = "/testec";
  String filePath1 = dirPath + "/testfile1";
  FsPermission permission = new FsPermission("755");
  routerProtocol.mkdirs(dirPath, permission, false);
  createFile(routerFS, filePath1, 32);
  assertTrue(verifyFileExists(routerFS, filePath1));
  DFSClient file1Protocol = getFileDFSClient(filePath1);

  LOG.info("The policy for the new file should not be set");
  assertNull(routerProtocol.getErasureCodingPolicy(filePath1));
  assertNull(file1Protocol.getErasureCodingPolicy(filePath1));

  String policyName = "RS-6-3-1024k";
  LOG.info("Set policy \"{}\" for \"{}\"", policyName, dirPath);
  routerProtocol.setErasureCodingPolicy(dirPath, policyName);

  String filePath2 = dirPath + "/testfile2";
  LOG.info("Create {} in the path with the new EC policy", filePath2);
  createFile(routerFS, filePath2, 32);
  assertTrue(verifyFileExists(routerFS, filePath2));
  DFSClient file2Protocol = getFileDFSClient(filePath2);

  LOG.info("Check that the policy is set for {}", filePath2);
  ErasureCodingPolicy policyRouter1 =
      routerProtocol.getErasureCodingPolicy(filePath2);
  ErasureCodingPolicy policyNamenode1 =
      file2Protocol.getErasureCodingPolicy(filePath2);
  assertNotNull(policyRouter1);
  assertEquals(policyName, policyRouter1.getName());
  assertEquals(policyName, policyNamenode1.getName());

  LOG.info("Create a new erasure coding policy");
  String newPolicyName = "RS-6-3-128k";
  ECSchema ecSchema = new ECSchema(ErasureCodeConstants.RS_CODEC_NAME, 6, 3);
  ErasureCodingPolicy ecPolicy = new ErasureCodingPolicy(
      newPolicyName,
      ecSchema,
      128 * 1024,
      (byte) -1);
  ErasureCodingPolicy[] newPolicies = new ErasureCodingPolicy[] {
      ecPolicy
  };
  AddErasureCodingPolicyResponse[] responses =
      routerProtocol.addErasureCodingPolicies(newPolicies);
  assertEquals(1, responses.length);
  assertTrue(responses[0].isSucceed());
  routerProtocol.disableErasureCodingPolicy(newPolicyName);

  LOG.info("The new policy should be there and disabled");
  policies = checkErasureCodingPolicies();
  boolean found = false;
  for (ErasureCodingPolicyInfo policy : policies) {
    // Pass the policy as a logger argument; concatenating it to the format
    // string left the "{}" placeholder unresolved in the output.
    LOG.info(" {}", policy);
    if (policy.getPolicy().getName().equals(newPolicyName)) {
      found = true;
      assertEquals(ErasureCodingPolicyState.DISABLED, policy.getState());
      break;
    }
  }
  assertTrue(found);

  LOG.info("Set the test folder to use the new policy");
  routerProtocol.enableErasureCodingPolicy(newPolicyName);
  routerProtocol.setErasureCodingPolicy(dirPath, newPolicyName);

  LOG.info("Create a file in the path with the new EC policy");
  String filePath3 = dirPath + "/testfile3";
  createFile(routerFS, filePath3, 32);
  assertTrue(verifyFileExists(routerFS, filePath3));
  DFSClient file3Protocol = getFileDFSClient(filePath3);
  ErasureCodingPolicy policyRouterFile3 =
      routerProtocol.getErasureCodingPolicy(filePath3);
  assertEquals(newPolicyName, policyRouterFile3.getName());
  ErasureCodingPolicy policyNamenodeFile3 =
      file3Protocol.getErasureCodingPolicy(filePath3);
  assertEquals(newPolicyName, policyNamenodeFile3.getName());

  LOG.info("Remove the policy and check the one for the test folder");
  routerProtocol.removeErasureCodingPolicy(newPolicyName);
  ErasureCodingPolicy policyRouter3 =
      routerProtocol.getErasureCodingPolicy(filePath3);
  assertEquals(newPolicyName, policyRouter3.getName());
  ErasureCodingPolicy policyNamenode3 =
      file3Protocol.getErasureCodingPolicy(filePath3);
  assertEquals(newPolicyName, policyNamenode3.getName());

  LOG.info("Check the stats");
  ECBlockGroupStats statsRouter = routerProtocol.getECBlockGroupStats();
  ECBlockGroupStats statsNamenode = getNamenodeECBlockGroupStats();
  assertEquals(statsNamenode, statsRouter);
}
/**
 * Collect the EC block group stats of every namenode in the cluster and
 * merge them into a single result.
 * @return EC stats merged over all namenodes.
 * @throws Exception If the stats of a namenode cannot be fetched.
 */
private ECBlockGroupStats getNamenodeECBlockGroupStats() throws Exception {
  final List<ECBlockGroupStats> perNamenodeStats = new ArrayList<>();
  for (NamenodeContext context : cluster.getNamenodes()) {
    // Query each namenode directly through its own client
    perNamenodeStats.add(
        context.getClient().getNamenode().getECBlockGroupStats());
  }
  return ECBlockGroupStats.merge(perNamenodeStats);
}
/**
 * Roll the edits through the Router and check that the value returned
 * matches the current edit log transaction id reported by the Router.
 */
@Test
public void testGetCurrentTXIDandRollEdits() throws IOException {
  final long txIdAfterRoll = routerProtocol.rollEdits();
  final long txIdNow = routerProtocol.getCurrentEditLogTxid();
  assertEquals(txIdAfterRoll, txIdNow);
}
/**
 * Save the namespace through the Router while the file systems with index
 * 0 and 1 of the mini cluster are in safe mode.
 *
 * Safe mode is left in a {@code finally} block so that a failure here does
 * not leave the shared cluster in safe mode for the tests that run next.
 */
@Test
public void testSaveNamespace() throws IOException {
  try {
    cluster.getCluster().getFileSystem(0)
        .setSafeMode(SafeModeAction.ENTER);
    cluster.getCluster().getFileSystem(1)
        .setSafeMode(SafeModeAction.ENTER);

    boolean saved = routerProtocol.saveNamespace(0, 0);
    assertTrue(saved);
  } finally {
    // Always restore the cluster, even if the save or the check failed
    cluster.getCluster().getFileSystem(0)
        .setSafeMode(SafeModeAction.LEAVE);
    cluster.getCluster().getFileSystem(1)
        .setSafeMode(SafeModeAction.LEAVE);
  }
}
/**
 * Check the live nodes reported by the {@link NamenodeBeanMetrics}:
 * <ol>
 * <li>The report should be cached, since gathering the metrics is
 * expensive.</li>
 * <li>The cached report should be updated regularly.</li>
 * <li>Without any subcluster available, the report should be empty.</li>
 * </ol>
 */
@Test
public void testNamenodeMetrics() throws Exception {
  final NamenodeBeanMetrics nnMetrics =
      router.getRouter().getNamenodeMetrics();
  final String initialReport = nnMetrics.getLiveNodes();

  // Every datanode of every subcluster must be listed
  JSONObject liveNodes = new JSONObject(initialReport);
  assertEquals(NUM_SUBCLUSTERS * NUM_DNS, liveNodes.names().length());

  // An immediate second call should return the cached report
  String cachedReport = nnMetrics.getLiveNodes();
  assertEquals(initialReport, cachedReport);

  // Poll until the report differs from the first one
  GenericTestUtils.waitFor(
      () -> !initialReport.equals(nnMetrics.getLiveNodes()),
      500, 5 * 1000);

  // By now the cache must hold a new value
  final String refreshedReport = nnMetrics.getLiveNodes();
  assertNotEquals(initialReport, refreshedReport);

  // Drop all the registrations: the report should become an empty object
  MockResolver mockResolver =
      (MockResolver) router.getRouter().getNamenodeResolver();
  mockResolver.cleanRegistrations();
  mockResolver.setDisableRegistration(true);
  try {
    GenericTestUtils.waitFor(
        () -> !refreshedReport.equals(nnMetrics.getLiveNodes()),
        500, 5 * 1000);
    assertEquals("{}", nnMetrics.getLiveNodes());
  } finally {
    // Register the Namenodes again for the rest of the tests
    mockResolver.setDisableRegistration(false);
    cluster.registerNamenodes();
    cluster.waitNamenodeRegistration();
  }
}
/**
 * Check the values returned by the {@link RBFMetrics} methods when the
 * Router has no State Store: each of them is expected to be empty or zero.
 */
@Test
public void testRBFMetricsMethodsRelayOnStateStore() {
  // Precondition: this Router runs without a State Store
  assertNull(router.getRouter().getStateStore());

  final RBFMetrics rbfMetrics = router.getRouter().getMetrics();
  assertEquals("{}", rbfMetrics.getNamenodes());
  assertEquals("[]", rbfMetrics.getMountTable());
  assertEquals("{}", rbfMetrics.getRouters());
  assertEquals(0, rbfMetrics.getNumNamenodes());
  assertEquals(0, rbfMetrics.getNumExpiredNamenodes());

  // The next 2 methods rely on {@link RBFMetrics#getNamespaceInfo()}
  assertEquals("[]", rbfMetrics.getClusterId());
  assertEquals("[]", rbfMetrics.getBlockPoolId());

  // The remaining methods rely on
  // {@link RBFMetrics#getActiveNamenodeRegistration()}
  assertEquals("{}", rbfMetrics.getNameservices());
  assertEquals(0, rbfMetrics.getNumLiveNodes());
}
/**
 * Check that the {@link NamenodeBeanMetrics} report an empty JSON object
 * for the nodes entering maintenance.
 */
@Test
public void testNamenodeMetricsEnteringMaintenanceNodes() throws IOException {
  final String enteringMaintenance =
      router.getRouter().getNamenodeMetrics().getEnteringMaintenanceNodes();
  assertEquals("{}", enteringMaintenance);
}
/**
 * Exercise the cache administration commands through the Router: first the
 * cache pool operations (add, list, modify, remove) and then the cache
 * directive operations (add, list, modify, remove).
 */
@Test
public void testCacheAdmin() throws Exception {
  final String poolName = "Check";
  final Path cachedPath = new Path("/ns1/dir");
  final DistributedFileSystem dfs = (DistributedFileSystem) routerFS;

  // Verify cache pool commands.
  final CachePoolInfo poolInfo = new CachePoolInfo(poolName);
  poolInfo.setOwnerName("Owner");

  // Add a cache pool and read it back.
  routerProtocol.addCachePool(poolInfo);
  RemoteIterator<CachePoolEntry> poolsAfterAdd = dfs.listCachePools();
  assertTrue(poolsAfterAdd.hasNext());
  CachePoolInfo addedPool = poolsAfterAdd.next().getInfo();
  assertEquals("Owner", addedPool.getOwnerName());

  // Modify the owner of the cache pool.
  poolInfo.setOwnerName("new Owner");
  routerProtocol.modifyCachePool(poolInfo);
  RemoteIterator<CachePoolEntry> poolsAfterModify = dfs.listCachePools();
  assertTrue(poolsAfterModify.hasNext());
  CachePoolInfo modifiedPool = poolsAfterModify.next().getInfo();
  assertEquals("new Owner", modifiedPool.getOwnerName());

  // Remove the cache pool.
  routerProtocol.removeCachePool(poolName);
  RemoteIterator<CachePoolEntry> poolsAfterRemove = dfs.listCachePools();
  assertFalse(poolsAfterRemove.hasNext());

  // Verify cache directive commands; they need a pool and a path.
  poolInfo.setOwnerName("Owner");
  routerProtocol.addCachePool(poolInfo);
  dfs.mkdirs(cachedPath);

  // Add a cache directive.
  CacheDirectiveInfo newDirective = new CacheDirectiveInfo.Builder()
      .setPath(cachedPath)
      .setReplication((short) 1)
      .setPool(poolName)
      .build();
  final long directiveId = dfs.addCacheDirective(newDirective);

  // List the cache directives for the path.
  CacheDirectiveInfo pathFilter =
      new CacheDirectiveInfo.Builder().setPath(cachedPath).build();
  assertTrue(dfs.listCacheDirectives(pathFilter).hasNext());
  assertEquals(poolName,
      dfs.listCacheDirectives(pathFilter).next().getInfo().getPool());

  // Modify the replication of the cache directive.
  CacheDirectiveInfo updatedDirective = new CacheDirectiveInfo.Builder()
      .setReplication((short) 2)
      .setId(directiveId)
      .setPath(cachedPath)
      .build();
  dfs.modifyCacheDirective(updatedDirective);
  assertEquals((short) 2, (short) dfs.listCacheDirectives(pathFilter).next()
      .getInfo().getReplication());

  // Remove the cache directive.
  dfs.removeCacheDirective(directiveId);
  assertFalse(dfs.listCacheDirectives(pathFilter).hasNext());
}
/**
 * Register a testing user with two groups and check that the Router RPC
 * server reports those groups for that user.
 */
@Test
public void testgetGroupsForUser() throws IOException {
  final String userName = "user";
  final String[] expectedGroups = {"bar", "group2"};
  UserGroupInformation.createUserForTesting(
      userName, new String[] {"bar", "group2"});

  final String[] actualGroups =
      router.getRouter().getRpcServer().getGroupsForUser(userName);
  assertArrayEquals(expectedGroups, actualGroups);
}
/**
 * Check the cached datanode report of the Router RPC server: it is served
 * from a cache, the cache is refreshed after a datanode is stopped, and it
 * is refreshed again once that datanode is restarted.
 *
 * A failed fetch while polling is logged and counted as "not updated yet"
 * instead of ending the wait early or causing a NullPointerException. The
 * stopped datanode is restarted in a {@code finally} block so that a failed
 * assertion does not leave the shared cluster one datanode short.
 */
@Test
public void testGetCachedDatanodeReport() throws Exception {
  RouterRpcServer rpcServer = router.getRouter().getRpcServer();
  final DatanodeInfo[] datanodeReport =
      rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);

  // We should have 12 nodes in total
  assertEquals(12, datanodeReport.length);

  // We should be caching this information
  DatanodeInfo[] datanodeReport1 =
      rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
  assertArrayEquals(datanodeReport1, datanodeReport);

  // Stop one datanode
  MiniDFSCluster miniDFSCluster = getCluster().getCluster();
  DataNodeProperties dnprop = miniDFSCluster.stopDataNode(0);
  try {
    // We wait until the cached value is updated
    GenericTestUtils.waitFor(() -> {
      try {
        DatanodeInfo[] dn =
            rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
        return !Arrays.equals(datanodeReport, dn);
      } catch (IOException ex) {
        // A failed fetch says nothing about the cache; keep polling
        LOG.error("Error on getCachedDatanodeReport", ex);
        return false;
      }
    }, 500, 5 * 1000);

    // The cache should be updated now
    final DatanodeInfo[] datanodeReport2 =
        rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
    assertEquals(datanodeReport.length - 1, datanodeReport2.length);
  } finally {
    // Restart the DN we just stopped, even if the checks above failed
    miniDFSCluster.restartDataNode(dnprop);
    miniDFSCluster.waitActive();
  }

  // We wait until the restarted datanode shows up in the cached report
  GenericTestUtils.waitFor(() -> {
    try {
      DatanodeInfo[] dn =
          rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
      return dn != null && datanodeReport.length == dn.length;
    } catch (IOException ex) {
      LOG.error("Error on getCachedDatanodeReport", ex);
      return false;
    }
  }, 100, 10 * 1000);

  // The cache should be updated now
  final DatanodeInfo[] datanodeReport3 =
      rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
  assertEquals(datanodeReport.length, datanodeReport3.length);
}
/**
 * Fetch the erasure coding policies through {@code routerProtocol} and
 * through {@code nnProtocol} and verify that, once sorted, both report the
 * same policies.
 * @return The erasure coding policies obtained from the Router, sorted
 *         with {@code EC_POLICY_CMP}.
 * @throws IOException If the policies cannot be fetched.
 */
private ErasureCodingPolicyInfo[] checkErasureCodingPolicies()
    throws IOException {
  final ErasureCodingPolicyInfo[] fromRouter =
      routerProtocol.getErasureCodingPolicies();
  assertNotNull(fromRouter);
  final ErasureCodingPolicyInfo[] fromNamenode =
      nnProtocol.getErasureCodingPolicies();

  // Sort both sides so the comparison does not depend on the returned order
  Arrays.sort(fromRouter, EC_POLICY_CMP);
  Arrays.sort(fromNamenode, EC_POLICY_CMP);
  assertArrayEquals(fromRouter, fromNamenode);

  return fromRouter;
}
/**
 * Find the Namenode for a particular file and return the DFSClient.
 *
 * Each nameservice of the cluster is probed in turn. A failure while
 * probing one nameservice is logged and the search moves on to the next.
 * @param path Path of the file to check.
 * @return The DFSClient to the Namenode holding the file, or {@code null}
 *         if no nameservice reports the file.
 */
private DFSClient getFileDFSClient(final String path) {
  for (String nsId : cluster.getNameservices()) {
    LOG.info("Checking {} for {}", nsId, path);
    NamenodeContext nn = cluster.getNamenode(nsId, null);
    try {
      DFSClient nnClientProtocol = nn.getClient();
      if (nnClientProtocol.getFileInfo(path) != null) {
        return nnClientProtocol;
      }
    } catch (Exception e) {
      // Best effort: keep looking in the other nameservices, but leave a
      // trace so an unexpected failure is not completely hidden
      LOG.debug("Cannot check {} in {}", path, nsId, e);
    }
  }
  return null;
}
/**
 * Create a directory through the Router with a client caller context set
 * and check that the Namenode audit log contains the client context
 * together with the clientIp, clientId and clientCallId fields.
 */
@Test
public void testMkdirsWithCallerContext() throws IOException {
  final GenericTestUtils.LogCapturer auditCapture =
      GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.AUDIT_LOG);

  // No caller context may be set when the test starts
  assertNull(CallerContext.getCurrent());

  // Install the client context for this thread
  CallerContext clientContext =
      new CallerContext.Builder("clientContext").build();
  CallerContext.setCurrent(clientContext);

  // Create a directory via the router
  final String newDir = "/test_dir_with_callercontext";
  final FsPermission dirPermission = new FsPermission("755");
  routerProtocol.mkdirs(newDir, dirPermission, false);

  // The audit log should contain "callerContext=clientIp:...,clientContext"
  // followed by the client id and call id
  final String auditOutput = auditCapture.getOutput();
  final String[] expectedFragments = {
      "callerContext=clientIp:",
      ",clientContext",
      ",clientId",
      ",clientCallId",
  };
  for (String fragment : expectedFragments) {
    assertTrue(auditOutput.contains(fragment));
  }
  assertTrue(verifyFileExists(routerFS, newDir));
}
/**
 * List a path through the Router as a proxy user and check that the
 * Namenode audit log shows the proxy user in the ugi field and carries the
 * real user and the proxy user port.
 */
@Test
public void testRealUserPropagationInCallerContext()
    throws IOException, InterruptedException {
  final GenericTestUtils.LogCapturer auditCapture =
      GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.AUDIT_LOG);

  // No caller context may be set when the test starts
  assertNull(CallerContext.getCurrent());

  final UserGroupInformation routerUser =
      UserGroupInformation.getLoginUser();
  final UserGroupInformation realUser = UserGroupInformation
      .createUserForTesting("testRealUser", new String[]{"group"});
  final UserGroupInformation proxyUser =
      UserGroupInformation.createProxyUser("testProxyUser", realUser);

  // Open the Router file system as the proxy user and issue one call
  PrivilegedExceptionAction<FileSystem> openRouterFs =
      () -> router.getFileSystem();
  FileSystem proxyFs = proxyUser.doAs(openRouterFs);
  proxyFs.listStatus(new Path("/"));
  final String auditOutput = auditCapture.getOutput();

  // The login user, used as the router's user, differs from the real user.
  assertNotEquals(routerUser.getUserName(), realUser.getUserName());

  // The ugi field shows the proxy user going via the login user.
  String expectedUgi = String.format(
      "ugi=%s (auth:PROXY) via %s (auth:SIMPLE)",
      proxyUser.getUserName(),
      routerUser.getUserName());
  assertTrue("The login user is the proxyUser in the UGI field",
      auditOutput.contains(expectedUgi));

  // The real user travels in the caller context.
  String expectedRealUser =
      String.format("realUser:%s", realUser.getUserName());
  assertTrue("The audit log should contain the real user.",
      auditOutput.contains(expectedRealUser));
  assertTrue("The audit log should contain the proxyuser port.",
      auditOutput.contains(PROXY_USER_PORT));
}
/**
 * Set the balancer bandwidth through the Router to twice the default and
 * wait until the first datanode of the cluster reports the new value.
 */
@Test
public void testSetBalancerBandwidth() throws Exception {
  final long initialBandwidth =
      DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT;
  final long doubledBandwidth = initialBandwidth * 2;
  routerProtocol.setBalancerBandwidth(doubledBandwidth);

  final List<DataNode> dataNodes = cluster.getCluster().getDataNodes();
  GenericTestUtils.waitFor(
      () -> dataNodes.get(0).getBalancerBandwidth() == doubledBandwidth,
      100, 60 * 1000);
}
/**
 * Check the clientIp and clientPort fields of the caller context seen in
 * the Namenode audit log: they are present when the client does not set
 * them, and values supplied by the client itself do not show up.
 */
@Test
public void testAddClientIpPortToCallerContext() throws IOException {
  final GenericTestUtils.LogCapturer auditCapture =
      GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.AUDIT_LOG);

  // Case 1: the client does not set clientIp and clientPort.
  CallerContext plainContext =
      new CallerContext.Builder("clientContext").build();
  CallerContext.setCurrent(plainContext);

  // Create a directory via the router.
  final String testDir = "/test";
  routerProtocol.mkdirs(testDir, new FsPermission("755"), false);

  // The audit log should contain "clientIp:" and "clientPort:".
  assertTrue(auditCapture.getOutput().contains("clientIp:"));
  assertTrue(auditCapture.getOutput().contains("clientPort:"));
  assertTrue(verifyFileExists(routerFS, testDir));
  auditCapture.clearOutput();

  // Case 2: the client sets its own clientIp and clientPort.
  CallerContext clientSuppliedContext = new CallerContext.Builder(
      "clientContext,clientIp:1.1.1.1,clientPort:1234").build();
  CallerContext.setCurrent(clientSuppliedContext);

  // Query the directory via the router.
  routerProtocol.getFileInfo(testDir);

  // The clientIp and clientPort set by the client must not reach the
  // audit log.
  assertFalse(auditCapture.getOutput().contains("clientIp:1.1.1.1"));
  assertFalse(auditCapture.getOutput().contains("clientPort:1234"));
}
/**
 * Check the clientId and clientCallId fields of the caller context seen in
 * the Namenode audit log: they are present when the client does not set
 * them, and values supplied by the client itself do not show up.
 */
@Test
public void testAddClientIdAndCallIdToCallerContext() throws IOException {
  final GenericTestUtils.LogCapturer auditCapture =
      GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.AUDIT_LOG);

  // Case 1: the client does not set clientId and clientCallId.
  CallerContext plainContext =
      new CallerContext.Builder("clientContext").build();
  CallerContext.setCurrent(plainContext);

  // Create a directory via the router.
  final String testDir = "/test";
  routerProtocol.mkdirs(testDir, new FsPermission("755"), false);

  // The audit log should contain "clientId:" and "clientCallId:".
  assertTrue(auditCapture.getOutput().contains("clientId:"));
  assertTrue(auditCapture.getOutput().contains("clientCallId:"));
  assertTrue(verifyFileExists(routerFS, testDir));
  auditCapture.clearOutput();

  // Case 2: the client sets its own clientId and clientCallId.
  CallerContext clientSuppliedContext = new CallerContext.Builder(
      "clientContext,clientId:mockClientId,clientCallId:4321").build();
  CallerContext.setCurrent(clientSuppliedContext);

  // Query the directory via the router.
  routerProtocol.getFileInfo(testDir);

  // The clientId and clientCallId set by the client must not reach the
  // audit log.
  assertFalse(auditCapture.getOutput().contains("clientId:mockClientId"));
  assertFalse(auditCapture.getOutput().contains("clientCallId:4321"));
}
/**
 * Check the snapshot counters of the content summary obtained through the
 * Router: they are zero right after taking a snapshot and reflect the
 * deleted entries once files and a directory covered by the snapshot are
 * removed.
 */
@Test
public void testContentSummaryWithSnapshot() throws Exception {
  final DistributedFileSystem dfs = (DistributedFileSystem) routerFS;
  final String snapshotName = "s1";
  final Path rootDir = new Path("/testdir");
  final Path childDir = new Path(rootDir, "subdir");
  final Path fileInRoot = new Path(rootDir, "file");
  final Path fileInChild = new Path(childDir, "file2");

  // Build the directory tree with one file at each level.
  dfs.mkdirs(rootDir);
  dfs.mkdirs(childDir);
  createFile(dfs, fileInRoot.toString(), 32);
  createFile(dfs, fileInChild.toString(), 16);

  // Allow snapshots on the root directory and take one.
  dfs.allowSnapshot(rootDir);
  dfs.createSnapshot(rootDir, snapshotName);
  try {
    // Nothing was deleted yet: the snapshot counters should be 0.
    ContentSummary beforeDelete = dfs.getContentSummary(rootDir);
    assertEquals(0, beforeDelete.getSnapshotDirectoryCount());
    assertEquals(0, beforeDelete.getSnapshotFileCount());

    // Delete the file and the subdir (2 files and 1 directory in total).
    dfs.delete(fileInRoot, true);
    dfs.delete(childDir, true);

    // The deleted entries are now counted in the snapshot counters.
    ContentSummary afterDelete = dfs.getContentSummary(rootDir);
    assertEquals(1, afterDelete.getSnapshotDirectoryCount());
    assertEquals(2, afterDelete.getSnapshotFileCount());
  } finally {
    // Cleanup: drop the snapshot, disallow snapshots and remove the tree.
    dfs.deleteSnapshot(rootDir, snapshotName);
    dfs.disallowSnapshot(rootDir);
    dfs.delete(rootDir, true);
  }
}
/**
 * Check that the node usage of the {@link RBFMetrics} can be switched off:
 * with the switch on, getting the node usage adds 2 proxied operations to
 * the RPC metrics; with the switch off it adds none and reports "0.00%"
 * for every statistic.
 *
 * The switch is turned back on in a {@code finally} block so that a failed
 * assertion does not leave it off on the Router shared with other tests.
 */
@Test
public void testDisableNodeUsageInRBFMetrics() throws JSONException {
  RBFMetrics rbfMetrics = router.getRouter().getMetrics();
  FederationRPCMetrics federationRPCMetrics =
      router.getRouter().getRpcServer().getRPCMetrics();

  // Enabled (initial state): the node usage is gathered through proxy ops
  long proxyOpBefore = federationRPCMetrics.getProxyOps();
  String nodeUsageEnable = rbfMetrics.getNodeUsage();
  assertNotNull(nodeUsageEnable);
  long proxyOpAfterWithEnable = federationRPCMetrics.getProxyOps();
  assertEquals(proxyOpBefore + 2, proxyOpAfterWithEnable);

  // Disabled: no proxy op is issued and all the statistics are zeroed
  long proxyOpAfterWithDisable;
  rbfMetrics.setEnableGetDNUsage(false);
  try {
    String nodeUsageDisable = rbfMetrics.getNodeUsage();
    assertNotNull(nodeUsageDisable);
    proxyOpAfterWithDisable = federationRPCMetrics.getProxyOps();
    assertEquals(proxyOpAfterWithEnable, proxyOpAfterWithDisable);
    JSONObject jsonObject = new JSONObject(nodeUsageDisable);
    JSONObject json = jsonObject.getJSONObject("nodeUsage");
    assertEquals("0.00%", json.get("min"));
    assertEquals("0.00%", json.get("median"));
    assertEquals("0.00%", json.get("max"));
    assertEquals("0.00%", json.get("stdDev"));
  } finally {
    // Always turn the switch back on, even if a check above failed
    rbfMetrics.setEnableGetDNUsage(true);
  }

  // Re-enabled: the proxy ops are issued again
  String nodeUsageWithReEnable = rbfMetrics.getNodeUsage();
  assertNotNull(nodeUsageWithReEnable);
  long proxyOpAfterWithReEnable = federationRPCMetrics.getProxyOps();
  assertEquals(proxyOpAfterWithDisable + 2, proxyOpAfterWithReEnable);
}
}