blob: 83c2a58e5a3651ca69b03ba6fa83369b1885ee5f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.LoginException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestViewFileSystemHdfs.class);
private static MiniDFSCluster cluster;
private static Path defaultWorkingDirectory;
private static Path defaultWorkingDirectory2;
private static final Configuration CONF = new Configuration();
private static FileSystem fHdfs;
private static FileSystem fHdfs2;
private FileSystem fsTarget2;
Path targetTestRoot2;
@Override
protected FileSystemTestHelper createFileSystemHelper() {
return new FileSystemTestHelper("/tmp/TestViewFileSystemHdfs");
}
@BeforeClass
public static void clusterSetupAtBegining() throws IOException,
LoginException, URISyntaxException {
SupportsBlocks = true;
CONF.setBoolean(
DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
cluster =
new MiniDFSCluster.Builder(CONF).nnTopology(
MiniDFSNNTopology.simpleFederatedTopology(2))
.numDataNodes(2)
.build();
cluster.waitClusterUp();
fHdfs = cluster.getFileSystem(0);
fHdfs2 = cluster.getFileSystem(1);
fHdfs.getConf().set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
FsConstants.VIEWFS_URI.toString());
fHdfs2.getConf().set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
FsConstants.VIEWFS_URI.toString());
defaultWorkingDirectory = fHdfs.makeQualified( new Path("/user/" +
UserGroupInformation.getCurrentUser().getShortUserName()));
defaultWorkingDirectory2 = fHdfs2.makeQualified( new Path("/user/" +
UserGroupInformation.getCurrentUser().getShortUserName()));
fHdfs.mkdirs(defaultWorkingDirectory);
fHdfs2.mkdirs(defaultWorkingDirectory2);
}
@AfterClass
public static void ClusterShutdownAtEnd() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
@Override
@Before
public void setUp() throws Exception {
// create the test root on local_fs
fsTarget = fHdfs;
fsTarget2 = fHdfs2;
targetTestRoot2 = new FileSystemTestHelper().getAbsoluteTestRootPath(fsTarget2);
super.setUp();
}
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
}
@Override
void setupMountPoints() {
super.setupMountPoints();
ConfigUtil.addLink(conf, "/mountOnNn2", new Path(targetTestRoot2,
"mountOnNn2").toUri());
}
// Overriden test helper methods - changed values based on hdfs and the
// additional mount.
@Override
int getExpectedDirPaths() {
return 8;
}
@Override
int getExpectedMountPoints() {
return 9;
}
@Override
int getExpectedDelegationTokenCount() {
return 2; // Mount points to 2 unique hdfs
}
@Override
int getExpectedDelegationTokenCountWithCredentials() {
return 2;
}
//Rename should fail on across different fileSystems
@Test
public void testRenameAccorssFilesystem() throws IOException {
//data is mountpoint in nn1
Path mountDataRootPath = new Path("/data");
//mountOnNn2 is nn2 mountpoint
Path fsTargetFilePath = new Path("/mountOnNn2");
Path filePath = new Path(mountDataRootPath + "/ttest");
Path hdfFilepath = new Path(fsTargetFilePath + "/ttest2");
fsView.create(filePath);
try {
fsView.rename(filePath, hdfFilepath);
ContractTestUtils.fail("Should thrown IOE on Renames across filesytems");
} catch (IOException e) {
GenericTestUtils
.assertExceptionContains("Renames across Mount points not supported",
e);
}
}
@Test
public void testTargetFileSystemLazyInitializationWithUgi() throws Exception {
final Map<String, FileSystem> map = new HashMap<>();
final Path user1Path = new Path("/data/user1");
// Scenario - 1: Create FileSystem with the current user context
// Both mkdir and delete should be successful
FileSystem fs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
fs.mkdirs(user1Path);
fs.delete(user1Path, false);
// Scenario - 2: Create FileSystem with the a different user context
final UserGroupInformation userUgi = UserGroupInformation
.createUserForTesting("user1@HADOOP.COM", new String[]{"hadoop"});
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String doAsUserName = ugi.getUserName();
assertEquals(doAsUserName, "user1@HADOOP.COM");
FileSystem viewFS = FileSystem.get(FsConstants.VIEWFS_URI, conf);
map.put("user1", viewFS);
return null;
}
});
// Try creating a directory with the file context created by a different ugi
// Though we are running the mkdir with the current user context, the
// target filesystem will be instantiated by the ugi with which the
// file context was created.
try {
FileSystem otherfs = map.get("user1");
otherfs.mkdirs(user1Path);
fail("This mkdir should fail");
} catch (AccessControlException ex) {
// Exception is expected as the FileSystem was created with ugi of user1
// So when we are trying to access the /user/user1 path for the first
// time, the corresponding file system is initialized and it tries to
// execute the task with ugi with which the FileSystem was created.
}
// Change the permission of /data path so that user1 can create a directory
fsTarget.setOwner(new Path(targetTestRoot, "data"),
"user1", "test2");
// set permission of target to allow rename to target
fsTarget.setPermission(new Path(targetTestRoot, "data"),
new FsPermission("775"));
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws IOException {
FileSystem viewFS = FileSystem.get(FsConstants.VIEWFS_URI, conf);
map.put("user1", viewFS);
return null;
}
});
// Although we are running with current user context, and current user
// context does not have write permission, we are able to create the
// directory as its using ugi of user1 which has write permission.
FileSystem otherfs = map.get("user1");
otherfs.mkdirs(user1Path);
String owner = otherfs.getFileStatus(user1Path).getOwner();
assertEquals("The owner did not match ", owner, userUgi.getShortUserName());
otherfs.delete(user1Path, false);
}
public void testNflyClosestRepair() throws Exception {
testNflyRepair(NflyFSystem.NflyKey.repairOnRead);
}
@Test
public void testNflyMostRecentRepair() throws Exception {
testNflyRepair(NflyFSystem.NflyKey.readMostRecent);
}
private void testNflyRepair(NflyFSystem.NflyKey repairKey)
throws Exception {
LOG.info("Starting testNflyWriteSimpleFailover");
final URI uri1 = targetTestRoot.toUri();
final URI uri2 = targetTestRoot2.toUri();
final URI[] testUris = new URI[] {
new URI(uri1.getScheme(), uri1.getAuthority(), "/", null, null),
new URI(uri2.getScheme(), uri2.getAuthority(), "/", null, null)
};
final Configuration testConf = new Configuration(conf);
testConf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
final String testString = "Hello Nfly!";
final Path nflyRoot = new Path("/nflyroot");
ConfigUtil.addLinkNfly(testConf,
Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE,
nflyRoot.toString(),
"minReplication=2," + repairKey + "=true", testUris);
final FileSystem nfly = FileSystem.get(URI.create("viewfs:///"), testConf);
// wd = /nflyroot/user/<user>
nfly.setWorkingDirectory(new Path(nflyRoot
+ nfly.getWorkingDirectory().toUri().getPath()));
// 1. test mkdirs
final Path testDir = new Path("testdir1/sub1/sub3");
final Path testDir_tmp = new Path("testdir1/sub1/sub3_temp");
assertTrue(testDir + ": Failed to create!", nfly.mkdirs(testDir));
// Test renames
assertTrue(nfly.rename(testDir, testDir_tmp));
assertTrue(nfly.rename(testDir_tmp, testDir));
for (final URI testUri : testUris) {
final FileSystem fs = FileSystem.get(testUri, testConf);
assertTrue(testDir + " should exist!", fs.exists(testDir));
}
// 2. test write
final Path testFile = new Path("test.txt");
final FSDataOutputStream fsDos = nfly.create(testFile);
try {
fsDos.writeUTF(testString);
} finally {
fsDos.close();
}
for (final URI testUri : testUris) {
final FileSystem fs = FileSystem.get(testUri, testConf);
final FSDataInputStream fsdis = fs.open(testFile);
try {
assertEquals("Wrong file content", testString, fsdis.readUTF());
} finally {
fsdis.close();
}
}
// 3. test reads when one unavailable
//
// bring one NN down and read through nfly should still work
//
for (int i = 0; i < cluster.getNumNameNodes(); i++) {
cluster.shutdownNameNode(i);
FSDataInputStream fsDis = null;
try {
fsDis = nfly.open(testFile);
assertEquals("Wrong file content", testString, fsDis.readUTF());
} finally {
IOUtils.cleanupWithLogger(LOG, fsDis);
cluster.restartNameNode(i);
}
}
// both nodes are up again, test repair
final FileSystem fs1 = FileSystem.get(testUris[0], conf);
assertTrue(fs1.delete(testFile, false));
assertFalse(fs1.exists(testFile));
FSDataInputStream fsDis = null;
try {
fsDis = nfly.open(testFile);
assertEquals("Wrong file content", testString, fsDis.readUTF());
assertTrue(fs1.exists(testFile));
} finally {
IOUtils.cleanupWithLogger(LOG, fsDis);
}
// test most recent repair
if (repairKey == NflyFSystem.NflyKey.readMostRecent) {
final FileSystem fs2 = FileSystem.get(testUris[0], conf);
final long expectedMtime = fs2.getFileStatus(testFile)
.getModificationTime();
for (final URI testUri : testUris) {
final FileSystem fs = FileSystem.get(testUri, conf);
fs.setTimes(testFile, 1L, 1L);
assertEquals(testUri + "Set mtime failed!", 1L,
fs.getFileStatus(testFile).getModificationTime());
assertEquals("nfly file status wrong", expectedMtime,
nfly.getFileStatus(testFile).getModificationTime());
FSDataInputStream fsDis2 = null;
try {
fsDis2 = nfly.open(testFile);
assertEquals("Wrong file content", testString, fsDis2.readUTF());
// repair is done, now trying via normal fs
//
assertEquals("Repair most recent failed!", expectedMtime,
fs.getFileStatus(testFile).getModificationTime());
} finally {
IOUtils.cleanupWithLogger(LOG, fsDis2);
}
}
}
}
}