/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests the use of the resolvers that write in all subclusters from the
 * Router. It supports:
 * <ul>
 * <li>HashResolver
 * <li>RandomResolver
 * <li>AvailableSpaceResolver
 * </ul>
 */
public class TestRouterAllResolver {
/** Directory that will be in a HASH_ALL mount point. */
private static final String TEST_DIR_HASH_ALL = "/hashall";
/** Directory that will be in a RANDOM mount point. */
private static final String TEST_DIR_RANDOM = "/random";
/** Directory that will be in a SPACE mount point. */
private static final String TEST_DIR_SPACE = "/space";
/** Number of namespaces. */
private static final int NUM_NAMESPACES = 2;
/** Mini HDFS clusters with Routers and State Store. */
private static StateStoreDFSCluster cluster;
/** Router for testing. */
private static RouterContext routerContext;
/** Router/federated filesystem. */
private static FileSystem routerFs;
/** Filesystem for each namespace. */
private static List<FileSystem> nsFss = new LinkedList<>();

@Before
public void setup() throws Exception {
// 2 nameservices with 1 namenode each (no HA needed for this test)
cluster = new StateStoreDFSCluster(
false, NUM_NAMESPACES, MultipleDestinationMountTableResolver.class);
// Start NNs and DNs and wait until ready
cluster.startCluster();
// Build and start a Router with: State Store + Admin + RPC
Configuration routerConf = new RouterConfigBuilder()
.stateStore()
.admin()
.rpc()
.build();
cluster.addRouterOverrides(routerConf);
cluster.startRouters();
routerContext = cluster.getRandomRouter();
// Register and verify all NNs with all routers
cluster.registerNamenodes();
cluster.waitNamenodeRegistration();
// Setup the test mount points
createMountTableEntry(TEST_DIR_HASH_ALL, DestinationOrder.HASH_ALL);
createMountTableEntry(TEST_DIR_RANDOM, DestinationOrder.RANDOM);
createMountTableEntry(TEST_DIR_SPACE, DestinationOrder.SPACE);
// Get filesystems for the federated view and for each namespace
routerFs = routerContext.getFileSystem();
for (String nsId : cluster.getNameservices()) {
List<NamenodeContext> nns = cluster.getNamenodes(nsId);
for (NamenodeContext nn : nns) {
FileSystem nnFs = nn.getFileSystem();
nsFss.add(nnFs);
}
}
assertEquals(NUM_NAMESPACES, nsFss.size());
}

@After
public void cleanup() {
cluster.shutdown();
cluster = null;
routerContext = null;
routerFs = null;
nsFss.clear();
}
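
/**
 * HASH_ALL hashes the full path of each new file to choose its destination
 * subcluster, so creations spread across namespaces.
 */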
@Test
public void testHashAll() throws Exception {
testAll(TEST_DIR_HASH_ALL);
}
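
/**
 * RANDOM picks a random destination subcluster for each new file.
 */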
@Test
public void testRandomAll() throws Exception {
testAll(TEST_DIR_RANDOM);
}
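
/**
 * SPACE favors the subcluster with the most available space when placing
 * new files.
 */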
@Test
public void testSpaceAll() throws Exception {
testAll(TEST_DIR_SPACE);
}

/**
 * Tests that the resolver spreads files across subclusters in the whole
 * tree.
 * @param path Mount point to test.
 * @throws Exception If the resolver is not working.
 */
private void testAll(final String path) throws Exception {
// Create directories at different levels
routerFs.mkdirs(new Path(path + "/dir0"));
routerFs.mkdirs(new Path(path + "/dir1"));
routerFs.mkdirs(new Path(path + "/dir2/dir20"));
routerFs.mkdirs(new Path(path + "/dir2/dir21"));
routerFs.mkdirs(new Path(path + "/dir2/dir22"));
routerFs.mkdirs(new Path(path + "/dir2/dir22/dir220"));
routerFs.mkdirs(new Path(path + "/dir2/dir22/dir221"));
routerFs.mkdirs(new Path(path + "/dir2/dir22/dir222"));
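// The 8 mkdirs calls create 9 directories (dir2 is created implicitly)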
assertDirsEverywhere(path, 9);
// Create 14 files at different levels of the tree
createTestFile(routerFs, path + "/dir0/file1.txt");
createTestFile(routerFs, path + "/dir0/file2.txt");
createTestFile(routerFs, path + "/dir1/file2.txt");
createTestFile(routerFs, path + "/dir1/file3.txt");
createTestFile(routerFs, path + "/dir2/dir20/file4.txt");
createTestFile(routerFs, path + "/dir2/dir20/file5.txt");
createTestFile(routerFs, path + "/dir2/dir21/file6.txt");
createTestFile(routerFs, path + "/dir2/dir21/file7.txt");
createTestFile(routerFs, path + "/dir2/dir22/file8.txt");
createTestFile(routerFs, path + "/dir2/dir22/file9.txt");
createTestFile(routerFs, path + "/dir2/dir22/dir220/file10.txt");
createTestFile(routerFs, path + "/dir2/dir22/dir220/file11.txt");
createTestFile(routerFs, path + "/dir2/dir22/dir220/file12.txt");
createTestFile(routerFs, path + "/dir2/dir22/dir220/file13.txt");
assertDirsEverywhere(path, 9);
assertFilesDistributed(path, 14);
// Test append
String testFile = path + "/dir2/dir22/dir220/file-append.txt";
createTestFile(routerFs, testFile);
Path testFilePath = new Path(testFile);
assertTrue("Created file is too small",
routerFs.getFileStatus(testFilePath).getLen() > 50);
appendTestFile(routerFs, testFile);
assertTrue("Append file is too small",
routerFs.getFileStatus(testFilePath).getLen() > 110);
assertDirsEverywhere(path, 9);
assertFilesDistributed(path, 15);
// Test truncate
String testTruncateFile = path + "/dir2/dir22/dir220/file-truncate.txt";
createTestFile(routerFs, testTruncateFile);
Path testTruncateFilePath = new Path(testTruncateFile);
routerFs.truncate(testTruncateFilePath, 10);
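// Truncating to the middle of a block triggers an asynchronous block
// recovery; wait for it to finish before checking the new length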
TestFileTruncate.checkBlockRecovery(testTruncateFilePath,
(DistributedFileSystem) routerFs);
assertEquals("Truncate file fails", 10,
routerFs.getFileStatus(testTruncateFilePath).getLen());
assertDirsEverywhere(path, 9);
assertFilesDistributed(path, 16);
// Removing a directory should remove it from every subcluster
routerFs.delete(new Path(path + "/dir2/dir22/dir220"), true);
assertDirsEverywhere(path, 8);
assertFilesDistributed(path, 10);
// Removing all subdirectories
routerFs.delete(new Path(path + "/dir0"), true);
routerFs.delete(new Path(path + "/dir1"), true);
routerFs.delete(new Path(path + "/dir2"), true);
assertDirsEverywhere(path, 0);
assertFilesDistributed(path, 0);
}

/**
 * Directories in mount points that write to all subclusters must exist in
 * every namespace.
 * @param path Path to check under.
 * @param expectedNumDirs Expected number of directories.
 * @throws IOException If it cannot check the directories.
 */
private void assertDirsEverywhere(String path, int expectedNumDirs)
throws IOException {
// Check for the directories in each filesystem
List<FileStatus> files = listRecursive(routerFs, path);
int numDirs = 0;
for (FileStatus file : files) {
if (file.isDirectory()) {
numDirs++;
Path dirPath = file.getPath();
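// Strip the router's scheme and authority so the path resolves in each namespace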
Path checkPath = getRelativePath(dirPath);
for (FileSystem nsFs : nsFss) {
FileStatus status = nsFs.getFileStatus(checkPath);
assertTrue(file + " should be a directory",
status.isDirectory());
}
}
}
assertEquals(expectedNumDirs, numDirs);
}

/**
* Check that the files are somewhat spread across namespaces.
* @param path Path to check under.
* @param expectedNumFiles Number of files expected.
* @throws IOException If the files cannot be checked.
*/
private void assertFilesDistributed(String path, int expectedNumFiles)
throws IOException {
// Check where the files went
List<FileStatus> routerFiles = listRecursive(routerFs, path);
List<List<FileStatus>> nssFiles = new LinkedList<>();
for (FileSystem nsFs : nsFss) {
List<FileStatus> nsFiles = listRecursive(nsFs, path);
nssFiles.add(nsFiles);
}
// We should see all the files in the federated view
int numRouterFiles = getNumTxtFiles(routerFiles);
assertEquals(expectedNumFiles, numRouterFiles);
// The files should be spread across subclusters: each namespace gets at least one
List<Integer> numNsFiles = new LinkedList<>();
int sumNsFiles = 0;
for (int i = 0; i < NUM_NAMESPACES; i++) {
List<FileStatus> nsFiles = nssFiles.get(i);
int numFiles = getNumTxtFiles(nsFiles);
numNsFiles.add(numFiles);
sumNsFiles += numFiles;
}
assertEquals(numRouterFiles, sumNsFiles);
if (expectedNumFiles > 0) {
for (int numFiles : numNsFiles) {
assertTrue("Files not distributed: " + numNsFiles, numFiles > 0);
}
}
}

/**
* Create a test file in the filesystem and check if it was written.
* @param fs Filesystem.
* @param filename Name of the file to create.
* @throws IOException If it cannot create the file.
*/
private static void createTestFile(
final FileSystem fs, final String filename) throws IOException {
final Path path = new Path(filename);
// Write the data
FSDataOutputStream os = fs.create(path);
os.writeUTF("Test data " + filename);
os.close();
// Read the data and check
FSDataInputStream is = fs.open(path);
String read = is.readUTF();
assertEquals("Test data " + filename, read);
is.close();
}

/**
* Append to a test file in the filesystem and check if we appended.
* @param fs Filesystem.
* @param filename Name of the file to append to.
* @throws IOException If it cannot append to the file.
*/
private static void appendTestFile(
final FileSystem fs, final String filename) throws IOException {
final Path path = new Path(filename);
// Write the data
FSDataOutputStream os = fs.append(path);
os.writeUTF("Test append data " + filename);
os.close();
// Read back the original data and check it
FSDataInputStream is = fs.open(path);
String read = is.readUTF();
assertEquals("Test data " + filename, read);
// Read the appended data and check it
read = is.readUTF();
assertEquals("Test append data " + filename, read);
is.close();
}

/**
* Count the number of text files in a list.
* @param files File list.
* @return Number of .txt files.
*/
private static int getNumTxtFiles(final List<FileStatus> files) {
int numFiles = 0;
for (FileStatus file : files) {
if (file.getPath().getName().endsWith(".txt")) {
numFiles++;
}
}
return numFiles;
}

/**
* Get the relative path within a filesystem (removes the filesystem prefix).
* @param path Path to check.
* @return File within the filesystem.
*/
private static Path getRelativePath(final Path path) {
URI uri = path.toUri();
String uriPath = uri.getPath();
return new Path(uriPath);
}

/**
* List the files and directories under a path.
* @param fs Filesystem to check in.
* @param path Path to check for.
* @return List of files and directories found.
* @throws IOException If it cannot list the files.
*/
private List<FileStatus> listRecursive(
final FileSystem fs, final String path) throws IOException {
List<FileStatus> ret = new LinkedList<>();
List<Path> temp = new LinkedList<>();
temp.add(new Path(path));
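// Breadth-first traversal: take a directory from the queue and add its children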
while (!temp.isEmpty()) {
Path p = temp.remove(0);
for (FileStatus fileStatus : fs.listStatus(p)) {
ret.add(fileStatus);
if (fileStatus.isDirectory()) {
temp.add(fileStatus.getPath());
}
}
}
return ret;
}

/**
 * Add a mount table entry spanning all nameservices and verify it is
 * available through the Router.
 * @param mountPoint Name of the mount point.
 * @param order Order of the mount table entry.
 * @throws Exception If the entry could not be created.
 */
private void createMountTableEntry(
final String mountPoint, final DestinationOrder order) throws Exception {
RouterClient admin = routerContext.getAdminClient();
MountTableManager mountTable = admin.getMountTableManager();
Map<String, String> destMap = new HashMap<>();
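// Point the mount point at the same path in every nameservice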
for (String nsId : cluster.getNameservices()) {
destMap.put(nsId, mountPoint);
}
MountTable newEntry = MountTable.newInstance(mountPoint, destMap);
newEntry.setDestOrder(order);
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance(newEntry);
AddMountTableEntryResponse addResponse =
mountTable.addMountTableEntry(addRequest);
boolean created = addResponse.getStatus();
assertTrue(created);
// Refresh the caches to get the mount table
Router router = routerContext.getRouter();
StateStoreService stateStore = router.getStateStore();
stateStore.refreshCaches(true);
// Check for the path
GetMountTableEntriesRequest getRequest =
GetMountTableEntriesRequest.newInstance(mountPoint);
GetMountTableEntriesResponse getResponse =
mountTable.getMountTableEntries(getRequest);
List<MountTable> entries = getResponse.getEntries();
assertEquals(1, entries.size());
assertEquals(mountPoint, entries.get(0).getSourcePath());
}
}