blob: 28956928d4db62d50be08023cdacf8eb8ce84ee8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ImpersonationProvider;
import org.apache.hadoop.tools.fedbalance.DistCpProcedure;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.security.PrivilegedExceptionAction;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_IMPERSONATION_PROVIDER_CLASS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_MAP;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE;
import static org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING;
import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_PRINCIPAL;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Basic tests of router federation rename in a Kerberos-secured environment.
* Renames a directory across namespaces.
*/
public class TestRouterFederationRenameInKerberosEnv
extends ClientBaseWithFixes {
/** Number of subclusters passed to the MiniRouterDFSCluster constructor. */
private static final int NUM_SUBCLUSTERS = 2;
/** Number of DataNodes configured per nameservice. */
private static final int NUM_DNS = 6;
/** Kerberos principal the test client logs in as. */
private static String clientPrincipal = "client@EXAMPLE.COM";
/**
 * Kerberos principal configured for the Router, the NameNodes and the
 * DataNodes (and used as the RM principal in the Router configuration).
 * NOTE(review): System.getenv().get("USERNAME") returns null when that
 * variable is not set, which makes the principal start with "null" --
 * confirm this is intended on platforms that only export USER.
 */
private static String serverPrincipal = System.getenv().get("USERNAME")
+ "/localhost@EXAMPLE.COM";
/**
 * Absolute path of the keytab file; the file name is a random UUID under
 * the directory named by the "test.dir" system property ("target" when the
 * property is unset).
 */
private static String keytab = new File(
System.getProperty("test.dir", "target"),
UUID.randomUUID().toString())
.getAbsolutePath();
/**
 * Security-related configuration filled in by globalSetUp() and applied to
 * both the NameNodes and the Routers in setUp().
 */
private static Configuration baseConf = new Configuration(false);
/** KDC started in globalSetUp() and stopped in globalTearDown(). */
private static MiniKdc kdc;
/** Federated HDFS cluster. */
private MiniRouterDFSCluster cluster;
/** Random Router for this federated cluster. */
private RouterContext router;
/**
 * Starts the MiniKdc, registers the client and server principals in the
 * shared keytab and fills {@code baseConf} with the Kerberos settings that
 * the NameNodes, DataNodes and Routers are later started with.
 *
 * @throws Exception if the KDC cannot be started or the principals cannot
 *     be created.
 */
@BeforeClass
public static void globalSetUp() throws Exception {
  // Bring up the KDC and put both principals into one keytab.
  File kdcWorkDir = new File(System.getProperty("test.dir", "target"));
  kdc = new MiniKdc(MiniKdc.createConf(), kdcWorkDir);
  kdc.start();
  File keytabFile = new File(keytab);
  kdc.createPrincipal(keytabFile, clientPrincipal, serverPrincipal);

  // Switch the base configuration to Kerberos authentication.
  baseConf.setBoolean(DFSConfigKeys.HADOOP_CALLER_CONTEXT_ENABLED_KEY, true);
  SecurityUtil.setAuthenticationMethod(KERBEROS, baseConf);

  // Every daemon shares the same server principal ...
  baseConf.set(RBFConfigKeys.DFS_ROUTER_KERBEROS_PRINCIPAL_KEY,
      serverPrincipal);
  baseConf.set(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
      serverPrincipal);
  baseConf.set(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY,
      serverPrincipal);

  // ... and the same keytab file.
  baseConf.set(RBFConfigKeys.DFS_ROUTER_KEYTAB_FILE_KEY, keytab);
  baseConf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
  baseConf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, keytab);

  // Data transfer and block access settings for the secured DataNodes.
  baseConf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
  baseConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY,
      DFS_DATA_TRANSFER_PROTECTION_DEFAULT);
  baseConf.setBoolean(IGNORE_SECURE_PORTS_FOR_TESTING_KEY, true);

  // Impersonation is decided by the test-only provider declared below.
  baseConf.setClass(HADOOP_SECURITY_IMPERSONATION_PROVIDER_CLASS,
      AllowUserImpersonationProvider.class, ImpersonationProvider.class);

  DistCpProcedure.enableForTest();
}
/**
* {@link ImpersonationProvider} that confirms the user doing the
* impersonating is the same as the user running the MiniCluster.
*/
private static class AllowUserImpersonationProvider extends Configured
implements ImpersonationProvider {
public void init(String configurationPrefix) {
// Do nothing
}
public void authorize(UserGroupInformation user, InetAddress remoteAddress)
throws AuthorizationException {
try {
if (!user.getRealUser().getShortUserName()
.equals(UserGroupInformation.getCurrentUser().getShortUserName())) {
throw new AuthorizationException();
}
} catch (IOException ioe) {
throw new AuthorizationException(ioe);
}
}
}
/**
 * Stops the MiniKdc and switches DistCpProcedure back out of test mode.
 * The KDC may be missing when globalSetUp() failed early, and the test mode
 * is reset even if stopping the KDC throws.
 */
@AfterClass
public static void globalTearDown() {
  try {
    if (kdc != null) {
      kdc.stop();
    }
  } finally {
    DistCpProcedure.disableForTest();
  }
}
/**
 * Runs the inherited ClientBaseWithFixes teardown and then shuts down the
 * federated cluster. The cluster is shut down even when the inherited
 * teardown throws, and is skipped when setUp() failed before creating it.
 *
 * @throws Exception if the inherited teardown fails.
 */
@After
@Override
public void tearDown() throws Exception {
  try {
    super.tearDown();
  } finally {
    if (cluster != null) {
      cluster.shutdown();
      // Avoid shutting the same cluster down twice.
      cluster = null;
    }
  }
}
/**
 * Builds a fresh secured federation for each test: starts the NameNodes and
 * DataNodes with {@code baseConf}, starts the Routers with federation rename
 * enabled, registers the NameNodes, installs the mock mount locations and
 * test directories, and finally picks a random Router to test against.
 *
 * @throws Exception if any part of the cluster fails to start.
 */
@Before
@Override
public void setUp() throws Exception {
  super.setUp();

  // NameNodes and DataNodes first; startCluster() waits until they are ready.
  cluster = new MiniRouterDFSCluster(false, NUM_SUBCLUSTERS);
  cluster.setNumDatanodesPerNameservice(NUM_DNS);
  cluster.addNamenodeOverrides(baseConf);
  cluster.setIndependentDNs();
  cluster.startCluster();

  // Router settings: federation rename on, journal kept on the second NN.
  String journalUri = "hdfs://"
      + cluster.getCluster().getNameNode(1).getClientNamenodeAddress()
      + "/journal";
  Configuration routerOverrides = new RouterConfigBuilder()
      .metrics()
      .rpc()
      .routerRenameOption()
      .set(SCHEDULER_JOURNAL_URI, journalUri)
      .set(DFS_ROUTER_FEDERATION_RENAME_MAP, "1")
      .set(DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH, "1")
      .set(ZK_DTSM_ZK_CONNECTION_STRING, hostPort)
      .set(ZK_DTSM_ZK_AUTH_TYPE, "none")
      .set(RM_PRINCIPAL, serverPrincipal)
      .build();
  // A short DN report cache keeps the test fast.
  routerOverrides.setTimeDuration(
      RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);

  // Security settings go in first, the rename-specific ones on top.
  cluster.addRouterOverrides(baseConf);
  cluster.addRouterOverrides(routerOverrides);
  cluster.startRouters();

  // Every NameNode has to be known to every Router before continuing.
  cluster.registerNamenodes();
  cluster.waitNamenodeRegistration();

  // Shorten heartbeat and expiry on both NameNodes so DataNodes are
  // declared dead faster.
  for (int nnIndex = 0; nnIndex < 2; nnIndex++) {
    cluster.getCluster().getNamesystem(nnIndex).getBlockManager()
        .getDatanodeManager().setHeartbeatInterval(1);
    cluster.getCluster().getNamesystem(nnIndex).getBlockManager()
        .getDatanodeManager().setHeartbeatExpireInterval(3000);
  }

  // Mock mount locations and the test fixtures on the NameNodes.
  cluster.installMockLocations();
  cluster.createTestDirectoriesNamenode();

  // The tests talk to one randomly chosen Router.
  setRouter(cluster.getRandomRouter());
}
/**
 * Prepares the source side of a rename: opens up the permissions of both
 * parent directories, creates the source directory with one 32-byte file in
 * it, and asserts that both were actually created.
 *
 * @param fs file system used to create the fixtures.
 * @param path source directory to create.
 * @param renamedPath rename destination; only its parent is touched here.
 * @throws IOException if a file system operation fails.
 */
protected void prepareEnv(FileSystem fs, Path path, Path renamedPath)
    throws IOException {
  // Set permission of both parents to 777 (rwxrwxrwx).
  FsPermission allAccess = FsPermission.createImmutable((short) 0777);
  fs.setPermission(path.getParent(), allAccess);
  fs.setPermission(renamedPath.getParent(), allAccess);
  // Create src path and file.
  fs.mkdirs(path);
  String file = path.toString() + "/file";
  createFile(fs, file, 32);
  // verifyFileExists() only reports a boolean, so the result must be
  // asserted for the check to have any effect.
  assertTrue("Source directory was not created: " + path,
      verifyFileExists(fs, path.toString()));
  assertTrue("Source file was not created: " + file,
      verifyFileExists(fs, file));
}
/**
 * Creates the source directory, runs the supplied rename action and checks
 * the outcome: on success the source must be gone and the file must exist
 * under the destination; on an expected failure the source file must still
 * exist and the destination must not. Both paths are deleted afterwards.
 *
 * @param testRouter Router used for the fixtures and the verification.
 * @param path source directory.
 * @param renamedPath destination directory.
 * @param exceptionExpected whether the rename action is expected to throw.
 * @param call the action that performs the rename.
 * @throws IOException if preparing, verifying or cleaning up fails.
 */
protected void testRenameDir(RouterContext testRouter, String path,
    String renamedPath, boolean exceptionExpected, Callable<Object> call)
    throws IOException {
  prepareEnv(testRouter.getFileSystem(), new Path(path),
      new Path(renamedPath));
  // rename
  boolean exceptionThrown = false;
  try {
    call.call();
    assertFalse("Source still exists after rename: " + path,
        verifyFileExists(testRouter.getFileSystem(), path));
    assertTrue("File is missing under the destination: " + renamedPath,
        verifyFileExists(testRouter.getFileSystem(), renamedPath + "/file"));
  } catch (Exception ex) {
    if (!exceptionExpected) {
      // Fail right away and keep the cause, instead of reducing it to a
      // bare boolean that hides why the rename failed.
      throw new AssertionError("Unexpected failure renaming " + path
          + " to " + renamedPath, ex);
    }
    exceptionThrown = true;
    assertTrue("Source file is gone after a failed rename: " + path,
        verifyFileExists(testRouter.getFileSystem(), path + "/file"));
    assertFalse("Destination exists after a failed rename: " + renamedPath,
        verifyFileExists(testRouter.getFileSystem(), renamedPath));
  } finally {
    FileContext fileContext = testRouter.getFileContext();
    fileContext.delete(new Path(path), true);
    fileContext.delete(new Path(renamedPath), true);
  }
  if (exceptionExpected) {
    // Error was expected.
    assertTrue("Rename of " + path + " to " + renamedPath
        + " was expected to fail but succeeded", exceptionThrown);
  }
}
/**
 * Selects the Router that the tests in this class talk to.
 *
 * @param r Router context to use from now on.
 * @throws IOException not thrown by this implementation; kept as part of
 *     the declared signature.
 */
protected void setRouter(RouterContext r) throws IOException {
  router = r;
}
/**
 * Logs in as the client principal from the keytab and renames a directory
 * from the first nameservice to the second one through the Router's
 * ClientProtocol; the rename is expected to succeed.
 *
 * @throws IOException if preparing or verifying the rename fails.
 */
@Test
public void testClientRename() throws IOException {
  String srcNs = cluster.getNameservices().get(0);
  String dstNs = cluster.getNameservices().get(1);

  // Source and destination live in different namespaces.
  String srcDir =
      cluster.getFederatedTestDirectoryForNS(srcNs) + "/" + getMethodName();
  String dstDir =
      cluster.getFederatedTestDirectoryForNS(dstNs) + "/" + getMethodName();

  // The rename itself, issued directly against the Router's ClientProtocol.
  PrivilegedExceptionAction<Boolean> renameThroughRouter = () -> {
    router.getClient().getNamenode().rename(srcDir, dstDir);
    return null;
  };

  // Run the rename as the Kerberos client principal.
  Callable<Object> renameAsClient = () -> {
    UserGroupInformation clientUgi = UserGroupInformation
        .loginUserFromKeytabAndReturnUGI(clientPrincipal, keytab);
    clientUgi.doAs(renameThroughRouter);
    return null;
  };

  testRenameDir(router, srcDir, dstDir, false, renameAsClient);
}
}