blob: cbc11b27b2b056cbd57c2783aeb889f5d5904095 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.tools.federation.RouterAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests router rpc with multiple destination mount table resolver.
*/
public class TestRouterRPCMultipleDestinationMountTableResolver {
private static final List<String> NS_IDS = Arrays.asList("ns0", "ns1", "ns2");
private static StateStoreDFSCluster cluster;
private static RouterContext routerContext;
private static MountTableResolver resolver;
private static DistributedFileSystem nnFs0;
private static DistributedFileSystem nnFs1;
private static DistributedFileSystem nnFs2;
private static DistributedFileSystem routerFs;
private static RouterRpcServer rpcServer;
@BeforeClass
public static void setUp() throws Exception {
// Build and start a federated cluster
cluster = new StateStoreDFSCluster(false, 3,
MultipleDestinationMountTableResolver.class);
Configuration routerConf =
new RouterConfigBuilder().stateStore().admin().quota().rpc().build();
Configuration hdfsConf = new Configuration(false);
hdfsConf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
cluster.addRouterOverrides(routerConf);
cluster.addNamenodeOverrides(hdfsConf);
cluster.startCluster();
cluster.startRouters();
cluster.waitClusterUp();
routerContext = cluster.getRandomRouter();
resolver =
(MountTableResolver) routerContext.getRouter().getSubclusterResolver();
nnFs0 = (DistributedFileSystem) cluster
.getNamenode(cluster.getNameservices().get(0), null).getFileSystem();
nnFs1 = (DistributedFileSystem) cluster
.getNamenode(cluster.getNameservices().get(1), null).getFileSystem();
nnFs2 = (DistributedFileSystem) cluster
.getNamenode(cluster.getNameservices().get(2), null).getFileSystem();
routerFs = (DistributedFileSystem) routerContext.getFileSystem();
rpcServer =routerContext.getRouter().getRpcServer();
}
@AfterClass
public static void tearDown() {
if (cluster != null) {
cluster.stopRouter(routerContext);
cluster.shutdown();
cluster = null;
}
}
/**
* SetUp the mount entry , directories and file to verify invocation.
* @param order The order that the mount entry needs to follow.
* @throws Exception On account of any exception encountered during setting up
* the environment.
*/
public void setupOrderMountPath(DestinationOrder order) throws Exception {
Map<String, String> destMap = new HashMap<>();
destMap.put("ns0", "/tmp");
destMap.put("ns1", "/tmp");
nnFs0.mkdirs(new Path("/tmp"));
nnFs1.mkdirs(new Path("/tmp"));
MountTable addEntry = MountTable.newInstance("/mount", destMap);
addEntry.setDestOrder(order);
assertTrue(addMountTable(addEntry));
routerFs.mkdirs(new Path("/mount/dir/dir"));
DFSTestUtil.createFile(routerFs, new Path("/mount/dir/file"), 100L, (short) 1,
1024L);
DFSTestUtil.createFile(routerFs, new Path("/mount/file"), 100L, (short) 1,
1024L);
}
@After
public void resetTestEnvironment() throws IOException {
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTableManager = client.getMountTableManager();
RemoveMountTableEntryRequest req2 =
RemoveMountTableEntryRequest.newInstance("/mount");
mountTableManager.removeMountTableEntry(req2);
nnFs0.delete(new Path("/tmp"), true);
nnFs1.delete(new Path("/tmp"), true);
}
@Test
public void testInvocationSpaceOrder() throws Exception {
setupOrderMountPath(DestinationOrder.SPACE);
boolean isDirAll = rpcServer.isPathAll("/mount/dir");
assertTrue(isDirAll);
testInvocation(isDirAll);
}
@Test
public void testInvocationHashAllOrder() throws Exception {
setupOrderMountPath(DestinationOrder.HASH_ALL);
boolean isDirAll = rpcServer.isPathAll("/mount/dir");
assertTrue(isDirAll);
testInvocation(isDirAll);
}
@Test
public void testInvocationRandomOrder() throws Exception {
setupOrderMountPath(DestinationOrder.RANDOM);
boolean isDirAll = rpcServer.isPathAll("/mount/dir");
assertTrue(isDirAll);
testInvocation(isDirAll);
}
@Test
public void testInvocationHashOrder() throws Exception {
setupOrderMountPath(DestinationOrder.HASH);
boolean isDirAll = rpcServer.isPathAll("/mount/dir");
assertFalse(isDirAll);
testInvocation(isDirAll);
}
@Test
public void testInvocationLocalOrder() throws Exception {
setupOrderMountPath(DestinationOrder.LOCAL);
boolean isDirAll = rpcServer.isPathAll("/mount/dir");
assertFalse(isDirAll);
testInvocation(isDirAll);
}
/**
* Verifies the invocation of API's at directory level , file level and at
* mount level.
* @param dirAll if true assumes that the mount entry creates directory on all
* locations.
* @throws IOException
*/
private void testInvocation(boolean dirAll) throws IOException {
// Verify invocation on nested directory and file.
Path mountDir = new Path("/mount/dir/dir");
Path nameSpaceFile = new Path("/tmp/dir/file");
Path mountFile = new Path("/mount/dir/file");
Path mountEntry = new Path("/mount");
Path mountDest = new Path("/tmp");
Path nameSpaceDir = new Path("/tmp/dir/dir");
final String name = "user.a1";
final byte[] value = {0x31, 0x32, 0x33};
testDirectoryAndFileLevelInvocation(dirAll, mountDir, nameSpaceFile,
mountFile, nameSpaceDir, name, value);
// Verify invocation on non nested directory and file.
mountDir = new Path("/mount/dir");
nameSpaceFile = new Path("/tmp/file");
mountFile = new Path("/mount/file");
nameSpaceDir = new Path("/tmp/dir");
testDirectoryAndFileLevelInvocation(dirAll, mountDir, nameSpaceFile,
mountFile, nameSpaceDir, name, value);
// Check invocation directly for a mount point.
// Verify owner and permissions.
routerFs.setOwner(mountEntry, "testuser", "testgroup");
routerFs.setPermission(mountEntry,
FsPermission.createImmutable((short) 777));
assertEquals("testuser", routerFs.getFileStatus(mountEntry).getOwner());
assertEquals("testuser", nnFs0.getFileStatus(mountDest).getOwner());
assertEquals("testuser", nnFs1.getFileStatus(mountDest).getOwner());
assertEquals((short) 777,
routerFs.getFileStatus(mountEntry).getPermission().toShort());
assertEquals((short) 777,
nnFs0.getFileStatus(mountDest).getPermission().toShort());
assertEquals((short) 777,
nnFs1.getFileStatus(mountDest).getPermission().toShort());
//Verify storage policy.
routerFs.setStoragePolicy(mountEntry, "COLD");
assertEquals("COLD", routerFs.getStoragePolicy(mountEntry).getName());
assertEquals("COLD", nnFs0.getStoragePolicy(mountDest).getName());
assertEquals("COLD", nnFs1.getStoragePolicy(mountDest).getName());
routerFs.unsetStoragePolicy(mountEntry);
assertEquals("HOT", routerFs.getStoragePolicy(mountDest).getName());
assertEquals("HOT", nnFs0.getStoragePolicy(mountDest).getName());
assertEquals("HOT", nnFs1.getStoragePolicy(mountDest).getName());
//Verify erasure coding policy.
routerFs.setErasureCodingPolicy(mountEntry, "RS-6-3-1024k");
assertEquals("RS-6-3-1024k",
routerFs.getErasureCodingPolicy(mountEntry).getName());
assertEquals("RS-6-3-1024k",
nnFs0.getErasureCodingPolicy(mountDest).getName());
assertEquals("RS-6-3-1024k",
nnFs1.getErasureCodingPolicy(mountDest).getName());
routerFs.unsetErasureCodingPolicy(mountEntry);
assertNull(routerFs.getErasureCodingPolicy(mountDest));
assertNull(nnFs0.getErasureCodingPolicy(mountDest));
assertNull(nnFs1.getErasureCodingPolicy(mountDest));
//Verify xAttr.
routerFs.setXAttr(mountEntry, name, value);
assertArrayEquals(value, routerFs.getXAttr(mountEntry, name));
assertArrayEquals(value, nnFs0.getXAttr(mountDest, name));
assertArrayEquals(value, nnFs1.getXAttr(mountDest, name));
routerFs.removeXAttr(mountEntry, name);
assertEquals(0, routerFs.getXAttrs(mountEntry).size());
assertEquals(0, nnFs0.getXAttrs(mountDest).size());
assertEquals(0, nnFs1.getXAttrs(mountDest).size());
}
/**
* SetUp to verify invocations on directories and file.
*/
private void testDirectoryAndFileLevelInvocation(boolean dirAll,
Path mountDir, Path nameSpaceFile, Path mountFile, Path nameSpaceDir,
final String name, final byte[] value) throws IOException {
// Check invocation for a directory.
routerFs.setOwner(mountDir, "testuser", "testgroup");
routerFs.setPermission(mountDir, FsPermission.createImmutable((short) 777));
routerFs.setStoragePolicy(mountDir, "COLD");
routerFs.setErasureCodingPolicy(mountDir, "RS-6-3-1024k");
routerFs.setXAttr(mountDir, name, value);
// Verify the directory level invocations were checked in case of mounts not
// creating directories in all subclusters.
boolean checkedDir1 = verifyDirectoryLevelInvocations(dirAll, nameSpaceDir,
nnFs0, name, value);
boolean checkedDir2 = verifyDirectoryLevelInvocations(dirAll, nameSpaceDir,
nnFs1, name, value);
assertTrue("The file didn't existed in either of the subclusters.",
checkedDir1 || checkedDir2);
routerFs.unsetStoragePolicy(mountDir);
routerFs.removeXAttr(mountDir, name);
routerFs.unsetErasureCodingPolicy(mountDir);
checkedDir1 =
verifyDirectoryLevelUnsetInvocations(dirAll, nnFs0, nameSpaceDir);
checkedDir2 =
verifyDirectoryLevelUnsetInvocations(dirAll, nnFs1, nameSpaceDir);
assertTrue("The file didn't existed in either of the subclusters.",
checkedDir1 || checkedDir2);
// Check invocation for a file.
routerFs.setOwner(mountFile, "testuser", "testgroup");
routerFs.setPermission(mountFile,
FsPermission.createImmutable((short) 777));
routerFs.setStoragePolicy(mountFile, "COLD");
routerFs.setReplication(mountFile, (short) 2);
routerFs.setXAttr(mountFile, name, value);
verifyFileLevelInvocations(nameSpaceFile, nnFs0, mountFile, name, value);
verifyFileLevelInvocations(nameSpaceFile, nnFs1, mountFile, name, value);
}
/**
* Verify invocations of API's unseting values at the directory level.
* @param dirAll true if the mount entry order creates directory in all
* locations.
* @param nameSpaceDir path of the directory in the namespace.
* @param nnFs file system where the directory level invocation needs to be
* tested.
* @throws IOException
*/
private boolean verifyDirectoryLevelUnsetInvocations(boolean dirAll,
DistributedFileSystem nnFs, Path nameSpaceDir) throws IOException {
boolean checked = false;
if (dirAll || nnFs.exists(nameSpaceDir)) {
checked = true;
assertEquals("HOT", nnFs.getStoragePolicy(nameSpaceDir).getName());
assertNull(nnFs.getErasureCodingPolicy(nameSpaceDir));
assertEquals(0, nnFs.getXAttrs(nameSpaceDir).size());
}
return checked;
}
/**
* Verify file level invocations.
* @param nameSpaceFile path of the file in the namespace.
* @param nnFs the file system where the file invocation needs to checked.
* @param mountFile path of the file w.r.t. mount table.
* @param name name of Xattr.
* @param value value of Xattr.
* @throws IOException
*/
private void verifyFileLevelInvocations(Path nameSpaceFile,
DistributedFileSystem nnFs, Path mountFile, final String name,
final byte[] value) throws IOException {
if (nnFs.exists(nameSpaceFile)) {
assertEquals("testuser", nnFs.getFileStatus(nameSpaceFile).getOwner());
assertEquals((short) 777,
nnFs.getFileStatus(nameSpaceFile).getPermission().toShort());
assertEquals("COLD", nnFs.getStoragePolicy(nameSpaceFile).getName());
assertEquals((short) 2,
nnFs.getFileStatus(nameSpaceFile).getReplication());
assertArrayEquals(value, nnFs.getXAttr(nameSpaceFile, name));
routerFs.unsetStoragePolicy(mountFile);
routerFs.removeXAttr(mountFile, name);
assertEquals(0, nnFs.getXAttrs(nameSpaceFile).size());
assertEquals("HOT", nnFs.getStoragePolicy(nameSpaceFile).getName());
}
}
/**
* Verify invocations at the directory level.
* @param dirAll true if the mount entry order creates directory in all
* locations.
* @param nameSpaceDir path of the directory in the namespace.
* @param nnFs file system where the directory level invocation needs to be
* tested.
* @param name name for the Xattr.
* @param value value for the Xattr.
* @return true, if directory existed and successful verification of
* invocations.
* @throws IOException
*/
private boolean verifyDirectoryLevelInvocations(boolean dirAll,
Path nameSpaceDir, DistributedFileSystem nnFs, final String name,
final byte[] value) throws IOException {
boolean checked = false;
if (dirAll || nnFs.exists(nameSpaceDir)) {
checked = true;
assertEquals("testuser", nnFs.getFileStatus(nameSpaceDir).getOwner());
assertEquals("COLD", nnFs.getStoragePolicy(nameSpaceDir).getName());
assertEquals("RS-6-3-1024k",
nnFs.getErasureCodingPolicy(nameSpaceDir).getName());
assertArrayEquals(value, nnFs.getXAttr(nameSpaceDir, name));
assertEquals((short) 777,
nnFs.getFileStatus(nameSpaceDir).getPermission().toShort());
}
return checked;
}
/**
* Add a mount table entry to the mount table through the admin API.
* @param entry Mount table entry to add.
* @return If it was successfully added.
* @throws IOException + * Problems adding entries.
*/
private boolean addMountTable(final MountTable entry) throws IOException {
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTableManager = client.getMountTableManager();
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance(entry);
AddMountTableEntryResponse addResponse =
mountTableManager.addMountTableEntry(addRequest);
// Reload the Router cache
resolver.loadCache(true);
return addResponse.getStatus();
}
@Test
public void testECMultipleDestinations() throws Exception {
setupOrderMountPath(DestinationOrder.HASH_ALL);
Path mountPath = new Path("/mount/dir");
routerFs.setErasureCodingPolicy(mountPath, "RS-6-3-1024k");
assertTrue(routerFs.getFileStatus(mountPath).isErasureCoded());
}
@Test
public void testACLMultipleDestinations() throws Exception {
setupOrderMountPath(DestinationOrder.HASH_ALL);
Path mountPath = new Path("/mount/dir/dir");
Path nsPath = new Path("/tmp/dir/dir");
List<AclEntry> aclSpec = Collections.singletonList(
AclEntry.parseAclEntry("default:USER:TestUser:rwx", true));
routerFs.setAcl(mountPath, aclSpec);
assertEquals(5, nnFs0.getAclStatus(nsPath).getEntries().size());
assertEquals(5, nnFs1.getAclStatus(nsPath).getEntries().size());
aclSpec = Collections
.singletonList(AclEntry.parseAclEntry("USER:User:rwx::", true));
routerFs.modifyAclEntries(mountPath, aclSpec);
assertEquals(7, nnFs0.getAclStatus(nsPath).getEntries().size());
assertEquals(7, nnFs1.getAclStatus(nsPath).getEntries().size());
routerFs.removeAclEntries(mountPath, aclSpec);
assertEquals(6, nnFs0.getAclStatus(nsPath).getEntries().size());
assertEquals(6, nnFs1.getAclStatus(nsPath).getEntries().size());
routerFs.modifyAclEntries(mountPath, aclSpec);
routerFs.removeDefaultAcl(mountPath);
assertEquals(2, nnFs0.getAclStatus(nsPath).getEntries().size());
assertEquals(2, nnFs1.getAclStatus(nsPath).getEntries().size());
routerFs.removeAcl(mountPath);
assertEquals(0, nnFs0.getAclStatus(nsPath).getEntries().size());
assertEquals(0, nnFs1.getAclStatus(nsPath).getEntries().size());
}
@Test
public void testGetDestinationHashAll() throws Exception {
testGetDestination(DestinationOrder.HASH_ALL,
Arrays.asList("ns1"),
Arrays.asList("ns1"),
Arrays.asList("ns1", "ns0"));
}
@Test
public void testGetDestinationHash() throws Exception {
testGetDestination(DestinationOrder.HASH,
Arrays.asList("ns1"),
Arrays.asList("ns1"),
Arrays.asList("ns1"));
}
@Test
public void testGetDestinationRandom() throws Exception {
testGetDestination(DestinationOrder.RANDOM,
null, null, Arrays.asList("ns0", "ns1"));
}
@Test
public void testIsMultiDestDir() throws Exception {
RouterClientProtocol client =
routerContext.getRouter().getRpcServer().getClientProtocolModule();
setupOrderMountPath(DestinationOrder.HASH_ALL);
// Should be true only for directory and false for all other cases.
assertTrue(client.isMultiDestDirectory("/mount/dir"));
assertFalse(client.isMultiDestDirectory("/mount/nodir"));
assertFalse(client.isMultiDestDirectory("/mount/dir/file"));
routerFs.createSymlink(new Path("/mount/dir/file"),
new Path("/mount/dir/link"), true);
assertFalse(client.isMultiDestDirectory("/mount/dir/link"));
routerFs.createSymlink(new Path("/mount/dir/dir"),
new Path("/mount/dir/linkDir"), true);
assertFalse(client.isMultiDestDirectory("/mount/dir/linkDir"));
resetTestEnvironment();
// Test single directory destination. Should be false for the directory.
setupOrderMountPath(DestinationOrder.HASH);
assertFalse(client.isMultiDestDirectory("/mount/dir"));
}
/**
* Verifies the snapshot location returned after snapshot operations is in
* accordance to the mount path.
*/
@Test
public void testSnapshotPathResolution() throws Exception {
// Create a mount entry with non isPathAll order, to call
// invokeSequential.
Map<String, String> destMap = new HashMap<>();
destMap.put("ns0", "/tmp_ns0");
destMap.put("ns1", "/tmp_ns1");
nnFs0.mkdirs(new Path("/tmp_ns0"));
nnFs1.mkdirs(new Path("/tmp_ns1"));
MountTable addEntry = MountTable.newInstance("/mountSnap", destMap);
addEntry.setDestOrder(DestinationOrder.HASH);
assertTrue(addMountTable(addEntry));
// Create the actual directory in the destination second in sequence of
// invokeSequential.
nnFs0.mkdirs(new Path("/tmp_ns0/snapDir"));
Path snapDir = new Path("/mountSnap/snapDir");
Path snapshotPath = new Path("/mountSnap/snapDir/.snapshot/snap");
routerFs.allowSnapshot(snapDir);
// Verify the snapshot path returned after createSnapshot is as per mount
// path.
Path snapshot = routerFs.createSnapshot(snapDir, "snap");
assertEquals(snapshotPath, snapshot);
// Verify the snapshot path returned as part of snapshotListing is as per
// mount path.
SnapshotStatus[] snapshots = routerFs.getSnapshotListing(snapDir);
assertEquals(snapshotPath, snapshots[0].getFullPath());
}
@Test
public void testRenameMultipleDestDirectories() throws Exception {
// Test renaming directories using rename API.
verifyRenameOnMultiDestDirectories(DestinationOrder.HASH_ALL, false);
resetTestEnvironment();
verifyRenameOnMultiDestDirectories(DestinationOrder.RANDOM, false);
resetTestEnvironment();
verifyRenameOnMultiDestDirectories(DestinationOrder.SPACE, false);
resetTestEnvironment();
// Test renaming directories using rename2 API.
verifyRenameOnMultiDestDirectories(DestinationOrder.HASH_ALL, true);
resetTestEnvironment();
verifyRenameOnMultiDestDirectories(DestinationOrder.RANDOM, true);
resetTestEnvironment();
verifyRenameOnMultiDestDirectories(DestinationOrder.SPACE, true);
}
@Test
public void testClearQuota() throws Exception {
long nsQuota = 5;
long ssQuota = 100;
Path path = new Path("/router_test");
nnFs0.mkdirs(path);
nnFs1.mkdirs(path);
MountTable addEntry = MountTable.newInstance("/router_test",
Collections.singletonMap("ns0", "/router_test"));
addEntry.setQuota(new RouterQuotaUsage.Builder().build());
assertTrue(addMountTable(addEntry));
RouterQuotaUpdateService updateService =
routerContext.getRouter().getQuotaCacheUpdateService();
updateService.periodicInvoke();
//set quota and validate the quota
RouterAdmin admin = getRouterAdmin();
String[] argv = new String[] {"-setQuota", path.toString(), "-nsQuota",
String.valueOf(nsQuota), "-ssQuota", String.valueOf(ssQuota)};
assertEquals(0, ToolRunner.run(admin, argv));
updateService.periodicInvoke();
resolver.loadCache(true);
ContentSummary cs = routerFs.getContentSummary(path);
assertEquals(nsQuota, cs.getQuota());
assertEquals(ssQuota, cs.getSpaceQuota());
//clear quota and validate the quota
argv = new String[] {"-clrQuota", path.toString()};
assertEquals(0, ToolRunner.run(admin, argv));
updateService.periodicInvoke();
resolver.loadCache(true);
//quota should be cleared
ContentSummary cs1 = routerFs.getContentSummary(path);
assertEquals(-1, cs1.getQuota());
assertEquals(-1, cs1.getSpaceQuota());
}
@Test
public void testContentSummaryWithMultipleDest() throws Exception {
MountTable addEntry;
long nsQuota = 5;
long ssQuota = 100;
Path path = new Path("/testContentSummaryWithMultipleDest");
Map<String, String> destMap = new HashMap<>();
destMap.put("ns0", "/testContentSummaryWithMultipleDest");
destMap.put("ns1", "/testContentSummaryWithMultipleDest");
nnFs0.mkdirs(path);
nnFs1.mkdirs(path);
addEntry =
MountTable.newInstance("/testContentSummaryWithMultipleDest", destMap);
addEntry.setQuota(
new RouterQuotaUsage.Builder().quota(nsQuota).spaceQuota(ssQuota)
.build());
assertTrue(addMountTable(addEntry));
RouterQuotaUpdateService updateService =
routerContext.getRouter().getQuotaCacheUpdateService();
updateService.periodicInvoke();
ContentSummary cs = routerFs.getContentSummary(path);
assertEquals(nsQuota, cs.getQuota());
assertEquals(ssQuota, cs.getSpaceQuota());
ContentSummary ns0Cs = nnFs0.getContentSummary(path);
assertEquals(nsQuota, ns0Cs.getQuota());
assertEquals(ssQuota, ns0Cs.getSpaceQuota());
ContentSummary ns1Cs = nnFs1.getContentSummary(path);
assertEquals(nsQuota, ns1Cs.getQuota());
assertEquals(ssQuota, ns1Cs.getSpaceQuota());
}
@Test
public void testContentSummaryMultipleDestWithMaxValue()
throws Exception {
MountTable addEntry;
long nsQuota = Long.MAX_VALUE - 2;
long ssQuota = Long.MAX_VALUE - 2;
Path path = new Path("/testContentSummaryMultipleDestWithMaxValue");
Map<String, String> destMap = new HashMap<>();
destMap.put("ns0", "/testContentSummaryMultipleDestWithMaxValue");
destMap.put("ns1", "/testContentSummaryMultipleDestWithMaxValue");
nnFs0.mkdirs(path);
nnFs1.mkdirs(path);
addEntry = MountTable
.newInstance("/testContentSummaryMultipleDestWithMaxValue", destMap);
addEntry.setQuota(
new RouterQuotaUsage.Builder().quota(nsQuota).spaceQuota(ssQuota)
.build());
assertTrue(addMountTable(addEntry));
RouterQuotaUpdateService updateService =
routerContext.getRouter().getQuotaCacheUpdateService();
updateService.periodicInvoke();
ContentSummary cs = routerFs.getContentSummary(path);
assertEquals(nsQuota, cs.getQuota());
assertEquals(ssQuota, cs.getSpaceQuota());
}
/**
* Test RouterRpcServer#invokeAtAvailableNs on mount point with multiple destinations
* and making a one of the destination's subcluster unavailable.
*/
@Test
public void testInvokeAtAvailableNs() throws IOException {
// Create a mount point with multiple destinations.
Path path = new Path("/testInvokeAtAvailableNs");
Map<String, String> destMap = new HashMap<>();
destMap.put("ns0", "/testInvokeAtAvailableNs");
destMap.put("ns1", "/testInvokeAtAvailableNs");
nnFs0.mkdirs(path);
nnFs1.mkdirs(path);
MountTable addEntry =
MountTable.newInstance("/testInvokeAtAvailableNs", destMap);
addEntry.setQuota(new RouterQuotaUsage.Builder().build());
addEntry.setDestOrder(DestinationOrder.RANDOM);
addEntry.setFaultTolerant(true);
assertTrue(addMountTable(addEntry));
// Make one subcluster unavailable.
MiniDFSCluster dfsCluster = cluster.getCluster();
dfsCluster.shutdownNameNode(0);
dfsCluster.shutdownNameNode(1);
try {
// Verify that #invokeAtAvailableNs works by calling #getServerDefaults.
RemoteMethod method = new RemoteMethod("getServerDefaults");
FsServerDefaults serverDefaults =
rpcServer.invokeAtAvailableNs(method, FsServerDefaults.class);
assertNotNull(serverDefaults);
} finally {
dfsCluster.restartNameNode(0);
dfsCluster.restartNameNode(1);
}
}
/**
* Test write on mount point with multiple destinations
* and making a one of the destination's subcluster unavailable.
*/
@Test
public void testWriteWithUnavailableSubCluster() throws IOException {
//create a mount point with multiple destinations
Path path = new Path("/testWriteWithUnavailableSubCluster");
Map<String, String> destMap = new HashMap<>();
destMap.put("ns0", "/testWriteWithUnavailableSubCluster");
destMap.put("ns1", "/testWriteWithUnavailableSubCluster");
nnFs0.mkdirs(path);
nnFs1.mkdirs(path);
MountTable addEntry =
MountTable.newInstance("/testWriteWithUnavailableSubCluster", destMap);
addEntry.setQuota(new RouterQuotaUsage.Builder().build());
addEntry.setDestOrder(DestinationOrder.RANDOM);
addEntry.setFaultTolerant(true);
assertTrue(addMountTable(addEntry));
//make one subcluster unavailable and perform write on mount point
MiniDFSCluster dfsCluster = cluster.getCluster();
dfsCluster.shutdownNameNode(0);
FSDataOutputStream out = null;
Path filePath = new Path(path, "aa");
try {
out = routerFs.create(filePath);
out.write("hello".getBytes());
out.hflush();
assertTrue(routerFs.exists(filePath));
} finally {
IOUtils.closeStream(out);
dfsCluster.restartNameNode(0);
}
}
/**
* Test rename a dir from src dir (mapped to both ns0 and ns1) to ns0.
*/
@Test
public void testRenameWithMultiDestinations() throws Exception {
//create a mount point with multiple destinations
String srcDir = "/mount-source-dir";
Path path = new Path(srcDir);
Map<String, String> destMap = new HashMap<>();
destMap.put("ns0", srcDir);
destMap.put("ns1", srcDir);
nnFs0.mkdirs(path);
nnFs1.mkdirs(path);
MountTable addEntry =
MountTable.newInstance(srcDir, destMap);
addEntry.setDestOrder(DestinationOrder.RANDOM);
assertTrue(addMountTable(addEntry));
//create a mount point with a single destinations ns0
String targetDir = "/ns0_test";
nnFs0.mkdirs(new Path(targetDir));
MountTable addDstEntry = MountTable.newInstance(targetDir,
Collections.singletonMap("ns0", targetDir));
assertTrue(addMountTable(addDstEntry));
//mkdir sub dirs in srcDir mapping ns0 & ns1
routerFs.mkdirs(new Path(srcDir + "/dir1"));
routerFs.mkdirs(new Path(srcDir + "/dir1/dir_1"));
routerFs.mkdirs(new Path(srcDir + "/dir1/dir_2"));
routerFs.mkdirs(new Path(targetDir));
//try to rename sub dir in srcDir (mapping to ns0 & ns1) to targetDir
// (mapping ns0)
LambdaTestUtils.intercept(IOException.class, "The number of" +
" remote locations for both source and target should be same.",
() -> {
routerFs.rename(new Path(srcDir + "/dir1/dir_1"),
new Path(targetDir));
});
}
/**
* Test to verify rename operation on directories in case of multiple
* destinations.
* @param order order to be followed by the mount entry.
* @param isRename2 true if the verification is to be done using rename2(..)
* method.
* @throws Exception on account of any exception during test execution.
*/
private void verifyRenameOnMultiDestDirectories(DestinationOrder order,
boolean isRename2) throws Exception {
setupOrderMountPath(order);
Path src = new Path("/mount/dir/dir");
Path nnSrc = new Path("/tmp/dir/dir");
Path dst = new Path("/mount/dir/subdir");
Path nnDst = new Path("/tmp/dir/subdir");
Path fileSrc = new Path("/mount/dir/dir/file");
Path nnFileSrc = new Path("/tmp/dir/dir/file");
Path fileDst = new Path("/mount/dir/subdir/file");
Path nnFileDst = new Path("/tmp/dir/subdir/file");
DFSTestUtil.createFile(routerFs, fileSrc, 100L, (short) 1, 1024L);
if (isRename2) {
routerFs.rename(src, dst, Rename.NONE);
} else {
assertTrue(routerFs.rename(src, dst));
}
assertTrue(nnFs0.exists(nnDst));
assertTrue(nnFs1.exists(nnDst));
assertFalse(nnFs0.exists(nnSrc));
assertFalse(nnFs1.exists(nnSrc));
assertFalse(routerFs.exists(fileSrc));
assertTrue(routerFs.exists(fileDst));
assertTrue(nnFs0.exists(nnFileDst) || nnFs1.exists(nnFileDst));
assertFalse(nnFs0.exists(nnFileSrc) || nnFs1.exists(nnFileSrc));
// Verify rename file.
Path fileRenamed = new Path("/mount/dir/subdir/renamedFile");
Path nnFileRenamed = new Path("/tmp/dir/subdir/renamedFile");
if (isRename2) {
routerFs.rename(fileDst, fileRenamed, Rename.NONE);
} else {
assertTrue(routerFs.rename(fileDst, fileRenamed));
}
assertTrue(routerFs.exists(fileRenamed));
assertFalse(routerFs.exists(fileDst));
assertTrue(nnFs0.exists(nnFileRenamed) || nnFs1.exists(nnFileRenamed));
assertFalse(nnFs0.exists(nnFileDst) || nnFs1.exists(nnFileDst));
// Verify rename when one source directory is not present.
Path dst1 = new Path("/mount/dir/renameddir");
Path nnDst1 = new Path("/tmp/dir/renameddir");
nnFs1.delete(nnDst, true);
if (isRename2) {
routerFs.rename(dst, dst1, Rename.NONE);
} else {
assertTrue(routerFs.rename(dst, dst1));
}
assertTrue(nnFs0.exists(nnDst1));
assertFalse(nnFs0.exists(nnDst));
// Verify rename when one destination directory is already present.
Path src1 = new Path("/mount/dir");
Path dst2 = new Path("/mount/OneDest");
Path nnDst2 = new Path("/tmp/OneDest");
nnFs0.mkdirs(nnDst2);
if (isRename2) {
routerFs.rename(src1, dst2, Rename.NONE);
} else {
assertTrue(routerFs.rename(src1, dst2));
}
assertTrue(nnFs0.exists(nnDst2));
assertTrue(nnFs1.exists(nnDst2));
}
/**
* Generic test for getting the destination subcluster.
* @param order DestinationOrder of the mount point.
* @param expectFileLocation Expected subclusters of a file. null for any.
* @param expectNoFileLocation Expected subclusters of a non-existing file.
* @param expectDirLocation Expected subclusters of a nested directory.
* @throws Exception If the test cannot run.
*/
private void testGetDestination(DestinationOrder order,
List<String> expectFileLocation,
List<String> expectNoFileLocation,
List<String> expectDirLocation) throws Exception {
setupOrderMountPath(order);
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTableManager = client.getMountTableManager();
// If the file exists, it should be in the expected subcluster
final String pathFile = "dir/file";
final Path pathRouterFile = new Path("/mount", pathFile);
final Path pathLocalFile = new Path("/tmp", pathFile);
FileStatus fileStatus = routerFs.getFileStatus(pathRouterFile);
assertTrue(fileStatus + " should be a file", fileStatus.isFile());
GetDestinationResponse respFile = mountTableManager.getDestination(
GetDestinationRequest.newInstance(pathRouterFile));
if (expectFileLocation != null) {
assertEquals(expectFileLocation, respFile.getDestinations());
assertPathStatus(expectFileLocation, pathLocalFile, false);
} else {
Collection<String> dests = respFile.getDestinations();
assertPathStatus(dests, pathLocalFile, false);
}
// If the file does not exist, it should give us the expected subclusters
final String pathNoFile = "dir/no-file";
final Path pathRouterNoFile = new Path("/mount", pathNoFile);
final Path pathLocalNoFile = new Path("/tmp", pathNoFile);
LambdaTestUtils.intercept(FileNotFoundException.class,
() -> routerFs.getFileStatus(pathRouterNoFile));
GetDestinationResponse respNoFile = mountTableManager.getDestination(
GetDestinationRequest.newInstance(pathRouterNoFile));
if (expectNoFileLocation != null) {
assertEquals(expectNoFileLocation, respNoFile.getDestinations());
}
assertPathStatus(Collections.emptyList(), pathLocalNoFile, false);
// If the folder exists, it should be in the expected subcluster
final String pathNestedDir = "dir/dir";
final Path pathRouterNestedDir = new Path("/mount", pathNestedDir);
final Path pathLocalNestedDir = new Path("/tmp", pathNestedDir);
FileStatus dirStatus = routerFs.getFileStatus(pathRouterNestedDir);
assertTrue(dirStatus + " should be a directory", dirStatus.isDirectory());
GetDestinationResponse respDir = mountTableManager.getDestination(
GetDestinationRequest.newInstance(pathRouterNestedDir));
assertEqualsCollection(expectDirLocation, respDir.getDestinations());
assertPathStatus(expectDirLocation, pathLocalNestedDir, true);
}
/**
* Assert that the status of a file in the subcluster is the expected one.
* @param expectedLocations Subclusters where the file is expected to exist.
* @param path Path of the file/directory to check.
* @param isDir If the path is expected to be a directory.
* @throws Exception If the file cannot be checked.
*/
private void assertPathStatus(Collection<String> expectedLocations,
Path path, boolean isDir) throws Exception {
for (String nsId : NS_IDS) {
final FileSystem fs = getFileSystem(nsId);
if (expectedLocations.contains(nsId)) {
assertTrue(path + " should exist in " + nsId, fs.exists(path));
final FileStatus status = fs.getFileStatus(path);
if (isDir) {
assertTrue(path + " should be a directory", status.isDirectory());
} else {
assertTrue(path + " should be a file", status.isFile());
}
} else {
assertFalse(path + " should not exist in " + nsId, fs.exists(path));
}
}
}
/**
* Assert if two collections are equal without checking the order.
* @param col1 First collection to compare.
* @param col2 Second collection to compare.
*/
private static void assertEqualsCollection(
Collection<String> col1, Collection<String> col2) {
assertEquals(new TreeSet<>(col1), new TreeSet<>(col2));
}
/**
* Get the filesystem for each subcluster.
* @param nsId Identifier of the name space (subcluster).
* @return The FileSystem for
*/
private static FileSystem getFileSystem(final String nsId) {
if (nsId.equals("ns0")) {
return nnFs0;
}
if (nsId.equals("ns1")) {
return nnFs1;
}
if (nsId.equals("ns2")) {
return nnFs2;
}
return null;
}
private RouterAdmin getRouterAdmin() {
Router router = routerContext.getRouter();
Configuration configuration = routerContext.getConf();
InetSocketAddress routerSocket = router.getAdminServerAddress();
configuration.setSocketAddr(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
routerSocket);
return new RouterAdmin(configuration);
}
}