| /** Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapreduce.security; |
| |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Mockito.mock; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.security.NoSuchAlgorithmException; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import javax.crypto.KeyGenerator; |
| import javax.crypto.spec.SecretKeySpec; |
| |
| import org.apache.commons.codec.binary.Base64; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.viewfs.ViewFileSystem; |
| import org.apache.hadoop.hdfs.HftpFileSystem; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; |
| import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; |
| import org.apache.hadoop.hdfs.server.namenode.NameNode; |
| import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; |
| import org.apache.hadoop.io.IntWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.Master; |
| import org.apache.hadoop.mapred.MiniMRCluster; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.MRJobConfig; |
| import org.apache.hadoop.mapreduce.SleepJob; |
| import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.SecurityUtil; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.security.token.TokenIdentifier; |
| import org.apache.hadoop.util.ToolRunner; |
| import org.codehaus.jackson.map.ObjectMapper; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| public class TestTokenCache { |
| private static final int NUM_OF_KEYS = 10; |
| |
| // my sleep class - adds check for tokenCache |
| static class MySleepMapper extends SleepJob.SleepMapper { |
| /** |
| * attempts to access tokenCache as from client |
| */ |
| @Override |
| public void map(IntWritable key, IntWritable value, Context context) |
| throws IOException, InterruptedException { |
| // get token storage and a key |
| Credentials ts = context.getCredentials(); |
| byte[] key1 = ts.getSecretKey(new Text("alias1")); |
| Collection<Token<? extends TokenIdentifier>> dts = ts.getAllTokens(); |
| int dts_size = 0; |
| if(dts != null) |
| dts_size = dts.size(); |
| |
| |
| if(dts_size != 2) { // one job token and one delegation token |
| throw new RuntimeException("tokens are not available"); // fail the test |
| } |
| |
| |
| if(key1 == null || ts == null || ts.numberOfSecretKeys() != NUM_OF_KEYS) { |
| throw new RuntimeException("secret keys are not available"); // fail the test |
| } |
| super.map(key, value, context); |
| } |
| } |
| |
| class MySleepJob extends SleepJob { |
| @Override |
| public Job createJob(int numMapper, int numReducer, |
| long mapSleepTime, int mapSleepCount, |
| long reduceSleepTime, int reduceSleepCount) |
| throws IOException { |
| Job job = super.createJob(numMapper, numReducer, |
| mapSleepTime, mapSleepCount, |
| reduceSleepTime, reduceSleepCount); |
| |
| job.setMapperClass(MySleepMapper.class); |
| //Populate tokens here because security is disabled. |
| populateTokens(job); |
| return job; |
| } |
| |
| private void populateTokens(Job job) { |
| // Credentials in the job will not have delegation tokens |
| // because security is disabled. Fetch delegation tokens |
| // and populate the credential in the job. |
| try { |
| Credentials ts = job.getCredentials(); |
| Path p1 = new Path("file1"); |
| p1 = p1.getFileSystem(job.getConfiguration()).makeQualified(p1); |
| Credentials cred = new Credentials(); |
| TokenCache.obtainTokensForNamenodesInternal(cred, new Path[] { p1 }, |
| job.getConfiguration()); |
| for (Token<? extends TokenIdentifier> t : cred.getAllTokens()) { |
| ts.addToken(new Text("Hdfs"), t); |
| } |
| } catch (IOException e) { |
| Assert.fail("Exception " + e); |
| } |
| } |
| } |
| |
| private static MiniMRCluster mrCluster; |
| private static MiniDFSCluster dfsCluster; |
| private static final Path TEST_DIR = |
| new Path(System.getProperty("test.build.data","/tmp"), "sleepTest"); |
| private static final Path tokenFileName = new Path(TEST_DIR, "tokenFile.json"); |
| private static int numSlaves = 1; |
| private static JobConf jConf; |
| private static ObjectMapper mapper = new ObjectMapper(); |
| private static Path p1; |
| private static Path p2; |
| |
| @BeforeClass |
| public static void setUp() throws Exception { |
| |
| Configuration conf = new Configuration(); |
| conf.set("hadoop.security.auth_to_local", "RULE:[2:$1]"); |
| dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null); |
| jConf = new JobConf(conf); |
| mrCluster = new MiniMRCluster(0, 0, numSlaves, |
| dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null, |
| jConf); |
| |
| createTokenFileJson(); |
| verifySecretKeysInJSONFile(); |
| NameNodeAdapter.getDtSecretManager(dfsCluster.getNamesystem()).startThreads(); |
| FileSystem fs = dfsCluster.getFileSystem(); |
| |
| p1 = new Path("file1"); |
| p2 = new Path("file2"); |
| |
| p1 = fs.makeQualified(p1); |
| } |
| |
| @AfterClass |
| public static void tearDown() throws Exception { |
| if(mrCluster != null) |
| mrCluster.shutdown(); |
| mrCluster = null; |
| if(dfsCluster != null) |
| dfsCluster.shutdown(); |
| dfsCluster = null; |
| } |
| |
| // create jason file and put some keys into it.. |
| private static void createTokenFileJson() throws IOException { |
| Map<String, String> map = new HashMap<String, String>(); |
| |
| try { |
| KeyGenerator kg = KeyGenerator.getInstance("HmacSHA1"); |
| for(int i=0; i<NUM_OF_KEYS; i++) { |
| SecretKeySpec key = (SecretKeySpec) kg.generateKey(); |
| byte [] enc_key = key.getEncoded(); |
| map.put("alias"+i, new String(Base64.encodeBase64(enc_key))); |
| |
| } |
| } catch (NoSuchAlgorithmException e) { |
| throw new IOException(e); |
| } |
| |
| try { |
| File p = new File(tokenFileName.getParent().toString()); |
| p.mkdirs(); |
| // convert to JSON and save to the file |
| mapper.writeValue(new File(tokenFileName.toString()), map); |
| |
| } catch (Exception e) { |
| System.out.println("failed with :" + e.getLocalizedMessage()); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| private static void verifySecretKeysInJSONFile() throws IOException { |
| Map<String, String> map; |
| map = mapper.readValue(new File(tokenFileName.toString()), Map.class); |
| assertEquals("didn't read JSON correctly", map.size(), NUM_OF_KEYS); |
| } |
| |
| /** |
| * run a distributed job and verify that TokenCache is available |
| * @throws IOException |
| */ |
| @Test |
| public void testTokenCache() throws IOException { |
| |
| System.out.println("running dist job"); |
| |
| // make sure JT starts |
| jConf = mrCluster.createJobConf(); |
| |
| // provide namenodes names for the job to get the delegation tokens for |
| String nnUri = dfsCluster.getURI(0).toString(); |
| jConf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri); |
| // job tracker principla id.. |
| jConf.set(JTConfig.JT_USER_NAME, "jt_id/foo@BAR"); |
| |
| // using argument to pass the file name |
| String[] args = { |
| "-tokenCacheFile", tokenFileName.toString(), |
| "-m", "1", "-r", "1", "-mt", "1", "-rt", "1" |
| }; |
| |
| int res = -1; |
| try { |
| res = ToolRunner.run(jConf, new MySleepJob(), args); |
| } catch (Exception e) { |
| System.out.println("Job failed with" + e.getLocalizedMessage()); |
| e.printStackTrace(System.out); |
| fail("Job failed"); |
| } |
| assertEquals("dist job res is not 0", res, 0); |
| } |
| |
| /** |
| * run a local job and verify that TokenCache is available |
| * @throws NoSuchAlgorithmException |
| * @throws IOException |
| */ |
| @Test |
| public void testLocalJobTokenCache() throws NoSuchAlgorithmException, IOException { |
| |
| System.out.println("running local job"); |
| // this is local job |
| String[] args = {"-m", "1", "-r", "1", "-mt", "1", "-rt", "1"}; |
| jConf.set("mapreduce.job.credentials.json", tokenFileName.toString()); |
| |
| int res = -1; |
| try { |
| res = ToolRunner.run(jConf, new MySleepJob(), args); |
| } catch (Exception e) { |
| System.out.println("Job failed with" + e.getLocalizedMessage()); |
| e.printStackTrace(System.out); |
| fail("local Job failed"); |
| } |
| assertEquals("local job res is not 0", res, 0); |
| } |
| |
/**
 * Obtains tokens for the two test paths and verifies that exactly one HDFS
 * delegation token, keyed by the namenode service name, ends up in the
 * credentials.
 */
@Test
public void testGetTokensForNamenodes() throws IOException {

  Credentials credentials = new Credentials();
  TokenCache.obtainTokensForNamenodesInternal(credentials, new Path[] { p1,
      p2 }, jConf);

  // this token is keyed by hostname:port key.
  String fs_addr =
      SecurityUtil.buildDTServiceName(p1.toUri(), NameNode.DEFAULT_PORT);
  Token<DelegationTokenIdentifier> nnt = TokenCache.getDelegationToken(
      credentials, fs_addr);
  System.out.println("dt for " + p1 + "(" + fs_addr + ")" + " = " + nnt);
  assertNotNull("Token for nn is null", nnt);

  // verify the size
  Collection<Token<? extends TokenIdentifier>> tns = credentials.getAllTokens();
  assertEquals("number of tokens is not 1", 1, tns.size());

  boolean found = false;
  for (Token<? extends TokenIdentifier> t : tns) {
    if (t.getKind().equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND) &&
        t.getService().equals(new Text(fs_addr))) {
      found = true;
    }
  }
  // Assert once, after every token has been examined; asserting inside the
  // loop would fail on a non-matching first token and never run at all for
  // an empty collection.
  assertTrue("didn't find token for " + p1, found);
}
| |
| @Test |
| public void testGetTokensForHftpFS() throws IOException, URISyntaxException { |
| HftpFileSystem hfs = mock(HftpFileSystem.class); |
| |
| DelegationTokenSecretManager dtSecretManager = |
| NameNodeAdapter.getDtSecretManager(dfsCluster.getNamesystem()); |
| String renewer = "renewer"; |
| jConf.set(JTConfig.JT_USER_NAME,renewer); |
| DelegationTokenIdentifier dtId = |
| new DelegationTokenIdentifier(new Text("user"), new Text(renewer), null); |
| final Token<DelegationTokenIdentifier> t = |
| new Token<DelegationTokenIdentifier>(dtId, dtSecretManager); |
| |
| final URI uri = new URI("hftp://host:2222/file1"); |
| final String fs_addr = |
| SecurityUtil.buildDTServiceName(uri, NameNode.DEFAULT_PORT); |
| t.setService(new Text(fs_addr)); |
| |
| //when(hfs.getUri()).thenReturn(uri); |
| Mockito.doAnswer(new Answer<URI>(){ |
| @Override |
| public URI answer(InvocationOnMock invocation) |
| throws Throwable { |
| return uri; |
| }}).when(hfs).getUri(); |
| |
| //when(hfs.getDelegationToken()).thenReturn((Token<? extends TokenIdentifier>) t); |
| Mockito.doAnswer(new Answer<Token<DelegationTokenIdentifier>>(){ |
| @Override |
| public Token<DelegationTokenIdentifier> answer(InvocationOnMock invocation) |
| throws Throwable { |
| return t; |
| }}).when(hfs).getDelegationToken(renewer); |
| |
| //when(hfs.getDelegationTokens()).thenReturn((Token<? extends TokenIdentifier>) t); |
| Mockito.doAnswer(new Answer<List<Token<DelegationTokenIdentifier>>>(){ |
| @Override |
| public List<Token<DelegationTokenIdentifier>> answer(InvocationOnMock invocation) |
| throws Throwable { |
| return Collections.singletonList(t); |
| }}).when(hfs).getDelegationTokens(renewer); |
| |
| //when(hfs.getCanonicalServiceName).thenReturn(fs_addr); |
| Mockito.doAnswer(new Answer<String>(){ |
| @Override |
| public String answer(InvocationOnMock invocation) |
| throws Throwable { |
| return fs_addr; |
| }}).when(hfs).getCanonicalServiceName(); |
| |
| Credentials credentials = new Credentials(); |
| Path p = new Path(uri.toString()); |
| System.out.println("Path for hftp="+ p + "; fs_addr="+fs_addr + "; rn=" + renewer); |
| TokenCache.obtainTokensForNamenodesInternal(hfs, credentials, jConf); |
| |
| Collection<Token<? extends TokenIdentifier>> tns = credentials.getAllTokens(); |
| assertEquals("number of tokens is not 1", 1, tns.size()); |
| |
| boolean found = false; |
| for(Token<? extends TokenIdentifier> tt: tns) { |
| System.out.println("token="+tt); |
| if(tt.getKind().equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND) && |
| tt.getService().equals(new Text(fs_addr))) { |
| found = true; |
| assertEquals("different token", tt, t); |
| } |
| assertTrue("didn't find token for " + p, found); |
| } |
| } |
| |
| /** |
| * verify _HOST substitution |
| * @throws IOException |
| */ |
| @Test |
| public void testGetJTPrincipal() throws IOException { |
| String serviceName = "jt/"; |
| String hostName = "foo"; |
| String domainName = "@BAR"; |
| Configuration conf = new Configuration(); |
| conf.set(JTConfig.JT_IPC_ADDRESS, hostName + ":8888"); |
| conf.set(JTConfig.JT_USER_NAME, serviceName + SecurityUtil.HOSTNAME_PATTERN |
| + domainName); |
| assertEquals("Failed to substitute HOSTNAME_PATTERN with hostName", |
| serviceName + hostName + domainName, Master.getMasterPrincipal(conf)); |
| } |
| |
| @Test |
| public void testGetTokensForViewFS() throws IOException, URISyntaxException { |
| Configuration conf = new Configuration(jConf); |
| FileSystem dfs = dfsCluster.getFileSystem(); |
| String serviceName = dfs.getCanonicalServiceName(); |
| |
| Path p1 = new Path("/mount1"); |
| Path p2 = new Path("/mount2"); |
| p1 = dfs.makeQualified(p1); |
| p2 = dfs.makeQualified(p2); |
| |
| conf.set("fs.viewfs.mounttable.default.link./dir1", p1.toString()); |
| conf.set("fs.viewfs.mounttable.default.link./dir2", p2.toString()); |
| Credentials credentials = new Credentials(); |
| Path lp1 = new Path("viewfs:///dir1"); |
| Path lp2 = new Path("viewfs:///dir2"); |
| Path[] paths = new Path[2]; |
| paths[0] = lp1; |
| paths[1] = lp2; |
| TokenCache.obtainTokensForNamenodesInternal(credentials, paths, conf); |
| |
| Collection<Token<? extends TokenIdentifier>> tns = |
| credentials.getAllTokens(); |
| assertEquals("number of tokens is not 1", 1, tns.size()); |
| |
| boolean found = false; |
| for (Token<? extends TokenIdentifier> tt : tns) { |
| System.out.println("token=" + tt); |
| if (tt.getKind().equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND) |
| && tt.getService().equals(new Text(serviceName))) { |
| found = true; |
| } |
| assertTrue("didn't find token for [" + lp1 + ", " + lp2 + "]", found); |
| } |
| } |
| } |