| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.namenode.ha; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.AbstractFileSystem; |
| import org.apache.hadoop.fs.CommonConfigurationKeysPublic; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsAction; |
| import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; |
| import org.apache.hadoop.hdfs.*; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants; |
| import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; |
| import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; |
| import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; |
| import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; |
| import org.apache.hadoop.hdfs.server.namenode.NameNode; |
| import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.ipc.RetriableException; |
| import org.apache.hadoop.ipc.StandbyException; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.SecurityUtilTestHelper; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.security.token.TokenIdentifier; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.test.Whitebox; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.slf4j.event.Level; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.DataInputStream; |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.net.URI; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.Collection; |
| import java.util.HashSet; |
| |
| import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.OBSERVER_PROBE_RETRY_PERIOD_KEY; |
| import static org.junit.Assert.*; |
| |
| /** |
| * Test case for client support of delegation tokens in an HA cluster. |
| * See HDFS-2904 for more info. |
| **/ |
public class TestDelegationTokensWithHA {
  // NOTE(review): static Configuration shared by all tests and re-mutated in
  // setupCluster() before each run; settings from one test can leak into the
  // next within the same JVM.
  private static final Configuration conf = new Configuration();
  private static final Logger LOG =
      LoggerFactory.getLogger(TestDelegationTokensWithHA.class);
  private static MiniDFSCluster cluster;  // two-NameNode HA cluster, no DNs
  private static NameNode nn0;            // transitioned to active in setup
  private static NameNode nn1;            // standby after setup
  private static FileSystem fs;           // failover-configured client FS
  // Delegation token secret manager taken from nn0; used to validate and
  // renew tokens directly against the NameNode.
  private static DelegationTokenSecretManager dtSecretManager;
  private static DistributedFileSystem dfs; // same instance as fs, downcast

  // Handshake flag for EditLogTailerForTest: the tailer blocks in
  // catchupDuringFailover() until a test sets this true and calls
  // notifyAll() on this test instance's monitor.
  private volatile boolean catchup = false;

  /**
   * Builds a fresh two-NameNode HA mini cluster with zero DataNodes,
   * configures client failover for it, transitions nn0 to active, and
   * captures nn0's delegation token secret manager for direct checks.
   */
  @Before
  public void setupCluster() throws Exception {
    // Use IP-based token service names so tokens match IPC socket addresses
    // (testHAUtilClonesDelegationTokens later flips this to host-based).
    SecurityUtilTestHelper.setTokenServiceUseIp(true);

    // Force delegation tokens to be used even without Kerberos security.
    conf.setBoolean(
        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
    // Map "JobTracker@*FOO.COM" principals to the short name "JobTracker".
    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL,
        "RULE:[2:$1@$0](JobTracker@.*FOO.COM)s/@.*//" + "DEFAULT");

    cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(MiniDFSNNTopology.simpleHATopology())
        .numDataNodes(0)
        .build();
    cluster.waitActive();

    String logicalName = HATestUtil.getLogicalHostname(cluster);
    HATestUtil.setFailoverConfigurations(cluster, conf, logicalName, null, 0);

    nn0 = cluster.getNameNode(0);
    nn1 = cluster.getNameNode(1);
    fs = HATestUtil.configureFailoverFs(cluster, conf);
    dfs = (DistributedFileSystem)fs;

    cluster.transitionToActive(0);
    dtSecretManager = NameNodeAdapter.getDtSecretManager(
        nn0.getNamesystem());
  }

  /** Shuts down the mini cluster after each test, if it was started. */
  @After
  public void shutdownCluster() throws IOException {
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
  }

  /**
   * Test that, when using ObserverReadProxyProvider with DT authentication,
   * the ORPP gracefully handles when the Standby NN throws a StandbyException.
   */
  @Test(timeout = 300000)
  public void testObserverReadProxyProviderWithDT() throws Exception {
    // Make the first node standby, so that the ORPP will try it first
    // instead of just using and succeeding on the active
    conf.setInt(OBSERVER_PROBE_RETRY_PERIOD_KEY, 0);
    cluster.transitionToStandby(0);
    cluster.transitionToActive(1);

    HATestUtil.setFailoverConfigurations(cluster, conf,
        HATestUtil.getLogicalHostname(cluster), 0,
        ObserverReadProxyProvider.class);
    // Disable the FS cache so FileSystem.get() below builds a new client
    // that picks up the ORPP configuration.
    conf.setBoolean("fs.hdfs.impl.disable.cache", true);

    dfs = (DistributedFileSystem) FileSystem.get(conf);
    final UserGroupInformation ugi = UserGroupInformation
        .createRemoteUser("JobTracker");
    final Token<DelegationTokenIdentifier> token =
        getDelegationToken(dfs, ugi.getShortUserName());
    ugi.addToken(token);
    // Recreate the DFS, this time authenticating using a DT
    dfs = ugi.doAs((PrivilegedExceptionAction<DistributedFileSystem>)
        () -> (DistributedFileSystem) FileSystem.get(conf));

    GenericTestUtils.setLogLevel(ObserverReadProxyProvider.LOG, Level.DEBUG);
    GenericTestUtils.LogCapturer logCapture = GenericTestUtils.LogCapturer
        .captureLogs(ObserverReadProxyProvider.LOG);
    try {
      // A read through the ORPP should log (not propagate) the standby's
      // StandbyException and then reach the active (nn1).
      dfs.access(new Path("/"), FsAction.READ);
      assertTrue(logCapture.getOutput()
          .contains("threw StandbyException when fetching HAState"));
      // NOTE(review): the result of isSentToAnyOfNameNodes is discarded
      // here; if it returns a boolean this line verifies nothing — confirm
      // its signature (it may assert internally).
      HATestUtil.isSentToAnyOfNameNodes(dfs, cluster, 1);

      // With nn0 fully down, the ORPP should log the connection failure
      // and still serve the read from the remaining node.
      cluster.shutdownNameNode(0);
      logCapture.clearOutput();
      dfs.access(new Path("/"), FsAction.READ);
      assertTrue(logCapture.getOutput().contains("Failed to connect to"));
    } finally {
      logCapture.stopCapturing();
    }
  }

  /**
   * Exercises the delegation token lifecycle through the DFS API: fetch,
   * renew (directly and via client conf), renew after failover, and cancel.
   * Also checks that renewal with a conf lacking the logical nameservice
   * fails with a clear error.
   */
  @Test(timeout = 300000)
  public void testDelegationTokenDFSApi() throws Exception {
    final Token<DelegationTokenIdentifier> token =
        getDelegationToken(fs, "JobTracker");
    DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
    byte[] tokenId = token.getIdentifier();
    identifier.readFields(new DataInputStream(
        new ByteArrayInputStream(tokenId)));

    // Ensure that it's present in the NN's secret manager and can
    // be renewed directly from there.
    LOG.info("A valid token should have non-null password, " +
        "and should be renewed successfully");
    assertTrue(null != dtSecretManager.retrievePassword(identifier));
    dtSecretManager.renewToken(token, "JobTracker");

    // Use the client conf with the failover info present to check
    // renewal.
    Configuration clientConf = dfs.getConf();
    doRenewOrCancel(token, clientConf, TokenTestAction.RENEW);

    // Using a configuration that doesn't have the logical nameservice
    // configured should result in a reasonable error message.
    Configuration emptyConf = new Configuration();
    try {
      doRenewOrCancel(token, emptyConf, TokenTestAction.RENEW);
      fail("Did not throw trying to renew with an empty conf!");
    } catch (IOException ioe) {
      GenericTestUtils.assertExceptionContains(
          "Unable to map logical nameservice URI", ioe);
    }


    // Ensure that the token can be renewed again after a failover.
    cluster.transitionToStandby(0);
    cluster.transitionToActive(1);
    doRenewOrCancel(token, clientConf, TokenTestAction.RENEW);

    doRenewOrCancel(token, clientConf, TokenTestAction.CANCEL);
  }

  /**
   * Edit log tailer whose catchupDuringFailover() blocks until the enclosing
   * test sets {@link #catchup} and notifies on the test instance's monitor.
   * This lets a test hold nn1 in the "transitioning to active" state.
   */
  private class EditLogTailerForTest extends EditLogTailer {
    public EditLogTailerForTest(FSNamesystem namesystem, Configuration conf) {
      super(namesystem, conf);
    }

    public void catchupDuringFailover() throws IOException {
      synchronized (TestDelegationTokensWithHA.this) {
        while (!catchup) {
          try {
            LOG.info("The editlog tailer is waiting to catchup...");
            TestDelegationTokensWithHA.this.wait();
          } catch (InterruptedException e) {}
          // Interrupt deliberately ignored: the tailer must keep waiting
          // until the test releases it via catchup/notifyAll. (Re-asserting
          // the interrupt here would make wait() spin.)
        }
      }
      super.catchupDuringFailover();
    }
  }

  /**
   * Test if correct exception (StandbyException or RetriableException) can be
   * thrown during the NN failover.
   */
  @Test(timeout = 300000)
  public void testDelegationTokenDuringNNFailover() throws Exception {
    EditLogTailer editLogTailer = nn1.getNamesystem().getEditLogTailer();
    // stop the editLogTailer of nn1
    editLogTailer.stop();
    Configuration conf = (Configuration) Whitebox.getInternalState(
        editLogTailer, "conf");
    // Install the blocking tailer so nn1's transition to active stalls in
    // catchupDuringFailover() until we set catchup below.
    nn1.getNamesystem().setEditLogTailerForTests(
        new EditLogTailerForTest(nn1.getNamesystem(), conf));

    // create token
    final Token<DelegationTokenIdentifier> token =
        getDelegationToken(fs, "JobTracker");
    DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
    byte[] tokenId = token.getIdentifier();
    identifier.readFields(new DataInputStream(
        new ByteArrayInputStream(tokenId)));

    // Ensure that it's present in the nn0 secret manager and can
    // be renewed directly from there.
    LOG.info("A valid token should have non-null password, " +
        "and should be renewed successfully");
    assertTrue(null != dtSecretManager.retrievePassword(identifier));
    dtSecretManager.renewToken(token, "JobTracker");

    // transition nn0 to standby
    cluster.transitionToStandby(0);

    // Renewing against a standby must fail with StandbyException.
    try {
      cluster.getNameNodeRpc(0).renewDelegationToken(token);
      fail("StandbyException is expected since nn0 is in standby state");
    } catch (StandbyException e) {
      GenericTestUtils.assertExceptionContains(
          HAServiceState.STANDBY.toString(), e);
    }

    // Start nn1's transition to active on another thread; it will block in
    // EditLogTailerForTest.catchupDuringFailover() until we release it.
    new Thread() {
      @Override
      public void run() {
        try {
          cluster.transitionToActive(1);
        } catch (Exception e) {
          LOG.error("Transition nn1 to active failed", e);
        }
      }
    }.start();

    // NOTE(review): fixed sleep assumes the transition thread has reached
    // the blocking tailer within 1s — potential flakiness.
    Thread.sleep(1000);
    try {
      nn1.getNamesystem().verifyToken(token.decodeIdentifier(),
          token.getPassword());
      fail("RetriableException/StandbyException is expected since nn1 is in transition");
    } catch (IOException e) {
      assertTrue(e instanceof StandbyException
          || e instanceof RetriableException);
      LOG.info("Got expected exception", e);
    }

    // Release the blocked tailer so nn1 can finish becoming active.
    catchup = true;
    synchronized (this) {
      this.notifyAll();
    }

    // Once nn1 is active, normal renew/cancel should succeed again.
    Configuration clientConf = dfs.getConf();
    doRenewOrCancel(token, clientConf, TokenTestAction.RENEW);
    doRenewOrCancel(token, clientConf, TokenTestAction.CANCEL);
  }

  /**
   * Verifies that both the long (principal-style) and short forms of the
   * token renewer can renew, and the long form can cancel, via doAs.
   * Relies on the auth_to_local rule set in setupCluster().
   */
  @Test(timeout = 300000)
  public void testDelegationTokenWithDoAs() throws Exception {
    final Token<DelegationTokenIdentifier> token =
        getDelegationToken(fs, "JobTracker");
    final UserGroupInformation longUgi = UserGroupInformation
        .createRemoteUser("JobTracker/foo.com@FOO.COM");
    final UserGroupInformation shortUgi = UserGroupInformation
        .createRemoteUser("JobTracker");
    longUgi.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        // try renew with long name
        token.renew(conf);
        return null;
      }
    });
    shortUgi.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        token.renew(conf);
        return null;
      }
    });
    longUgi.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        token.cancel(conf);
        return null;
      }
    });
  }

  /**
   * Verifies that cloning a logical-URI token produces per-NameNode copies
   * that the token selector finds for each physical IPC address, and that
   * re-cloning works after switching from IP- to host-based service names.
   */
  @Test(timeout = 300000)
  public void testHAUtilClonesDelegationTokens() throws Exception {
    final Token<DelegationTokenIdentifier> token =
        getDelegationToken(fs, "JobTracker");

    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("test");

    URI haUri = new URI("hdfs://my-ha-uri/");
    token.setService(HAUtilClient.buildTokenServiceForLogicalUri(haUri,
        HdfsConstants.HDFS_URI_SCHEME));
    ugi.addToken(token);

    // Physical addresses of both NameNodes under the logical URI.
    Collection<InetSocketAddress> nnAddrs = new HashSet<InetSocketAddress>();
    nnAddrs.add(new InetSocketAddress("localhost",
        nn0.getNameNodeAddress().getPort()));
    nnAddrs.add(new InetSocketAddress("localhost",
        nn1.getNameNodeAddress().getPort()));
    HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);

    // 1 logical token + 2 cloned physical tokens.
    Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();
    assertEquals(3, tokens.size());

    LOG.info("Tokens:\n" + Joiner.on("\n").join(tokens));
    DelegationTokenSelector dts = new DelegationTokenSelector();

    // check that the token selected for one of the physical IPC addresses
    // matches the one we received
    for (InetSocketAddress addr : nnAddrs) {
      Text ipcDtService = SecurityUtil.buildTokenService(addr);
      Token<DelegationTokenIdentifier> token2 =
          dts.selectToken(ipcDtService, ugi.getTokens());
      assertNotNull(token2);
      assertArrayEquals(token.getIdentifier(), token2.getIdentifier());
      assertArrayEquals(token.getPassword(), token2.getPassword());
    }

    // switch to host-based tokens, shouldn't match existing tokens
    SecurityUtilTestHelper.setTokenServiceUseIp(false);
    for (InetSocketAddress addr : nnAddrs) {
      Text ipcDtService = SecurityUtil.buildTokenService(addr);
      Token<DelegationTokenIdentifier> token2 =
          dts.selectToken(ipcDtService, ugi.getTokens());
      assertNull(token2);
    }

    // reclone the tokens, and see if they match now
    HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs);
    for (InetSocketAddress addr : nnAddrs) {
      Text ipcDtService = SecurityUtil.buildTokenService(addr);
      Token<DelegationTokenIdentifier> token2 =
          dts.selectToken(ipcDtService, ugi.getTokens());
      assertNotNull(token2);
      assertArrayEquals(token.getIdentifier(), token2.getIdentifier());
      assertArrayEquals(token.getPassword(), token2.getPassword());
    }
  }

  /**
   * HDFS-3062: DistributedFileSystem.getCanonicalServiceName() throws an
   * exception if the URI is a logical URI. This bug fails the combination of
   * ha + mapred + security.
   */
  @Test(timeout = 300000)
  public void testDFSGetCanonicalServiceName() throws Exception {
    URI hAUri = HATestUtil.getLogicalUri(cluster);
    String haService = HAUtilClient.buildTokenServiceForLogicalUri(hAUri,
        HdfsConstants.HDFS_URI_SCHEME).toString();
    assertEquals(haService, dfs.getCanonicalServiceName());
    final String renewer = UserGroupInformation.getCurrentUser().getShortUserName();
    final Token<DelegationTokenIdentifier> token =
        getDelegationToken(dfs, renewer);
    assertEquals(haService, token.getService().toString());
    // make sure the logical uri is handled correctly
    token.renew(dfs.getConf());
    token.cancel(dfs.getConf());
  }

  /**
   * Same as testDFSGetCanonicalServiceName but via the AbstractFileSystem
   * (FileContext) API instead of FileSystem.
   */
  @Test(timeout = 300000)
  public void testHdfsGetCanonicalServiceName() throws Exception {
    Configuration conf = dfs.getConf();
    URI haUri = HATestUtil.getLogicalUri(cluster);
    AbstractFileSystem afs = AbstractFileSystem.createFileSystem(haUri, conf);
    String haService = HAUtilClient.buildTokenServiceForLogicalUri(haUri,
        HdfsConstants.HDFS_URI_SCHEME).toString();
    assertEquals(haService, afs.getCanonicalServiceName());
    Token<?> token = afs.getDelegationTokens(
        UserGroupInformation.getCurrentUser().getShortUserName()).get(0);
    assertEquals(haService, token.getService().toString());
    // make sure the logical uri is handled correctly
    token.renew(conf);
    token.cancel(conf);
  }

  /**
   * Verifies that after cancelling one token and adding a fresh one to the
   * current UGI, an already-created FileSystem picks up and uses the new
   * token for subsequent RPCs.
   */
  @Test(timeout = 300000)
  public void testCancelAndUpdateDelegationTokens() throws Exception {
    // Create UGI with token1
    String user = UserGroupInformation.getCurrentUser().getShortUserName();
    UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser(user);

    ugi1.doAs(new PrivilegedExceptionAction<Void>() {
      public Void run() throws Exception {
        final Token<DelegationTokenIdentifier> token1 =
            getDelegationToken(fs, "JobTracker");
        UserGroupInformation.getCurrentUser()
            .addToken(token1.getService(), token1);

        FileSystem fs1 = HATestUtil.configureFailoverFs(cluster, conf);

        // Cancel token1
        doRenewOrCancel(token1, conf, TokenTestAction.CANCEL);

        // Update UGI with token2
        final Token<DelegationTokenIdentifier> token2 =
            getDelegationToken(fs, "JobTracker");
        UserGroupInformation.getCurrentUser()
            .addToken(token2.getService(), token2);

        // Check whether token2 works
        fs1.listFiles(new Path("/"), false);
        return null;
      }
    });
  }

  /**
   * Fetches exactly one delegation token from the given FileSystem for the
   * given renewer, asserting that exactly one was issued.
   *
   * @param fs file system to request the token from
   * @param renewer short name allowed to renew the token
   * @return the issued HDFS delegation token
   * @throws IOException if the token cannot be obtained
   */
  @SuppressWarnings("unchecked")
  private Token<DelegationTokenIdentifier> getDelegationToken(FileSystem fs,
      String renewer) throws IOException {
    final Token<?> tokens[] = fs.addDelegationTokens(renewer, null);
    assertEquals(1, tokens.length);
    return (Token<DelegationTokenIdentifier>) tokens[0];
  }

  /** Token lifecycle operation performed by {@link #doRenewOrCancel}. */
  enum TokenTestAction {
    RENEW, CANCEL;
  }

  /**
   * Renews or cancels the given token as the remote user "JobTracker",
   * using the supplied configuration to resolve the token service.
   */
  private static void doRenewOrCancel(
      final Token<DelegationTokenIdentifier> token, final Configuration conf,
      final TokenTestAction action)
      throws IOException, InterruptedException {
    UserGroupInformation.createRemoteUser("JobTracker").doAs(
        new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            switch (action) {
            case RENEW:
              token.renew(conf);
              break;
            case CANCEL:
              token.cancel(conf);
              break;
            default:
              fail("bad action:" + action);
            }
            return null;
          }
        });
  }
}