| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.webapp; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.OutputStream; |
| import java.io.StringWriter; |
| import java.net.HttpURLConnection; |
| import java.net.URL; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.concurrent.Callable; |
| |
| import javax.ws.rs.core.MediaType; |
| import javax.xml.bind.JAXBContext; |
| import javax.xml.bind.Marshaller; |
| |
| import org.apache.commons.io.IOUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeysPublic; |
| import org.apache.hadoop.minikdc.MiniKdc; |
| import org.apache.hadoop.security.AuthenticationFilterInitializer; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.authentication.KerberosTestUtils; |
| import org.apache.hadoop.security.authentication.server.AuthenticationFilter; |
| import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; |
| import org.apache.hadoop.yarn.server.resourcemanager.MockRM; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo; |
| import org.codehaus.jettison.json.JSONObject; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import com.sun.jersey.api.client.ClientResponse.Status; |
| import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_PROXY_USER_PREFIX; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| |
| @RunWith(Parameterized.class) |
| public class TestRMWebServicesDelegationTokenAuthentication { |
| |
| private static final File testRootDir = new File("target", |
| TestRMWebServicesDelegationTokenAuthentication.class.getName() + "-root"); |
| private static File httpSpnegoKeytabFile = new File( |
| KerberosTestUtils.getKeytabFile()); |
| private static final String SUN_SECURITY_KRB5_RCACHE_KEY = |
| "sun.security.krb5.rcache"; |
| |
| private static String httpSpnegoPrincipal = KerberosTestUtils |
| .getServerPrincipal(); |
| |
| private static boolean miniKDCStarted = false; |
| private static MiniKdc testMiniKDC; |
| private static MockRM rm; |
| private static String sunSecurityKrb5RcacheValue; |
| |
| String delegationTokenHeader; |
| |
| // use published header name |
| final static String OldDelegationTokenHeader = |
| "Hadoop-YARN-Auth-Delegation-Token"; |
| |
| // alternate header name |
| final static String NewDelegationTokenHeader = |
| DelegationTokenAuthenticator.DELEGATION_TOKEN_HEADER; |
| |
| @BeforeClass |
| public static void setUp() { |
| try { |
| // Disabling kerberos replay cache to avoid "Request is a replay" errors |
| // caused by frequent webservice calls |
| sunSecurityKrb5RcacheValue = |
| System.getProperty(SUN_SECURITY_KRB5_RCACHE_KEY); |
| System.setProperty(SUN_SECURITY_KRB5_RCACHE_KEY, "none"); |
| testMiniKDC = new MiniKdc(MiniKdc.createConf(), testRootDir); |
| setupKDC(); |
| setupAndStartRM(); |
| } catch (Exception e) { |
| assertTrue("Couldn't create MiniKDC", false); |
| } |
| } |
| |
| @AfterClass |
| public static void tearDown() { |
| if (testMiniKDC != null) { |
| testMiniKDC.stop(); |
| } |
| if (rm != null) { |
| rm.stop(); |
| } |
| if (sunSecurityKrb5RcacheValue == null) { |
| System.clearProperty(SUN_SECURITY_KRB5_RCACHE_KEY); |
| } else { |
| System.setProperty(SUN_SECURITY_KRB5_RCACHE_KEY, |
| sunSecurityKrb5RcacheValue); |
| } |
| } |
| |
| @Parameterized.Parameters |
| public static Collection<Object[]> headers() { |
| return Arrays.asList(new Object[][] { {OldDelegationTokenHeader}, {NewDelegationTokenHeader}}); |
| } |
| |
| public TestRMWebServicesDelegationTokenAuthentication(String header) throws Exception { |
| super(); |
| this.delegationTokenHeader = header; |
| } |
| |
| private static void setupAndStartRM() throws Exception { |
| Configuration rmconf = new Configuration(); |
| rmconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, |
| YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); |
| rmconf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, |
| ResourceScheduler.class); |
| rmconf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); |
| String httpPrefix = "hadoop.http.authentication."; |
| rmconf.setStrings(httpPrefix + "type", "kerberos"); |
| rmconf.set(httpPrefix + KerberosAuthenticationHandler.PRINCIPAL, |
| httpSpnegoPrincipal); |
| rmconf.set(httpPrefix + KerberosAuthenticationHandler.KEYTAB, |
| httpSpnegoKeytabFile.getAbsolutePath()); |
| // use any file for signature secret |
| rmconf.set(httpPrefix + AuthenticationFilter.SIGNATURE_SECRET + ".file", |
| httpSpnegoKeytabFile.getAbsolutePath()); |
| rmconf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, |
| "kerberos"); |
| rmconf.setBoolean(YarnConfiguration.RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER, |
| true); |
| rmconf.set("hadoop.http.filter.initializers", |
| AuthenticationFilterInitializer.class.getName()); |
| rmconf.set(YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY, |
| httpSpnegoPrincipal); |
| rmconf.set(YarnConfiguration.RM_KEYTAB, |
| httpSpnegoKeytabFile.getAbsolutePath()); |
| rmconf.set(YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY, |
| httpSpnegoKeytabFile.getAbsolutePath()); |
| rmconf.set(YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY, |
| httpSpnegoPrincipal); |
| rmconf.set(YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY, |
| httpSpnegoKeytabFile.getAbsolutePath()); |
| rmconf.setBoolean("mockrm.webapp.enabled", true); |
| rmconf.set(RM_PROXY_USER_PREFIX + "client.hosts", "*"); |
| rmconf.set(RM_PROXY_USER_PREFIX + "client.groups", "*"); |
| UserGroupInformation.setConfiguration(rmconf); |
| rm = new MockRM(rmconf); |
| rm.start(); |
| |
| } |
| |
| private static void setupKDC() throws Exception { |
| if (!miniKDCStarted) { |
| testMiniKDC.start(); |
| getKdc().createPrincipal(httpSpnegoKeytabFile, "HTTP/localhost", |
| "client", UserGroupInformation.getLoginUser().getShortUserName(), |
| "client2"); |
| miniKDCStarted = true; |
| } |
| } |
| |
| private static MiniKdc getKdc() { |
| return testMiniKDC; |
| } |
| |
| // Test that you can authenticate with only delegation tokens |
| // 1. Get a delegation token using Kerberos auth(this ends up |
| // testing the fallback authenticator) |
| // 2. Submit an app without kerberos or delegation-token |
| // - we should get an UNAUTHORIZED response |
| // 3. Submit same app with delegation-token |
| // - we should get OK response |
| // - confirm owner of the app is the user whose |
| // delegation-token we used |
| |
| @Test |
| public void testDelegationTokenAuth() throws Exception { |
| final String token = getDelegationToken("test"); |
| |
| ApplicationSubmissionContextInfo app = |
| new ApplicationSubmissionContextInfo(); |
| String appid = "application_123_0"; |
| app.setApplicationId(appid); |
| String requestBody = getMarshalledAppInfo(app); |
| |
| URL url = new URL("http://localhost:8088/ws/v1/cluster/apps"); |
| HttpURLConnection conn = (HttpURLConnection) url.openConnection(); |
| setupConn(conn, "POST", "application/xml", requestBody); |
| |
| // this should fail with unauthorized because only |
| // auth is kerberos or delegation token |
| try { |
| conn.getInputStream(); |
| fail("we should not be here"); |
| } catch (IOException e) { |
| assertEquals(Status.UNAUTHORIZED.getStatusCode(), conn.getResponseCode()); |
| } |
| |
| conn = (HttpURLConnection) url.openConnection(); |
| conn.setRequestProperty(delegationTokenHeader, token); |
| setupConn(conn, "POST", MediaType.APPLICATION_XML, requestBody); |
| |
| // this should not fail |
| try { |
| conn.getInputStream(); |
| } |
| catch(IOException ie) { |
| InputStream errorStream = conn.getErrorStream(); |
| String error = ""; |
| BufferedReader reader = null; |
| reader = new BufferedReader(new InputStreamReader(errorStream, "UTF8")); |
| for (String line; (line = reader.readLine()) != null;) { |
| error += line; |
| } |
| reader.close(); |
| errorStream.close(); |
| fail("Response " + conn.getResponseCode() + "; " + error); |
| } |
| |
| boolean appExists = |
| rm.getRMContext().getRMApps() |
| .containsKey(ApplicationId.fromString(appid)); |
| assertTrue(appExists); |
| RMApp actualApp = |
| rm.getRMContext().getRMApps() |
| .get(ApplicationId.fromString(appid)); |
| String owner = actualApp.getUser(); |
| assertEquals("client", owner); |
| } |
| |
| // Test to make sure that cancelled delegation tokens |
| // are rejected |
| @Test |
| public void testCancelledDelegationToken() throws Exception { |
| String token = getDelegationToken("client"); |
| cancelDelegationToken(token); |
| ApplicationSubmissionContextInfo app = |
| new ApplicationSubmissionContextInfo(); |
| String appid = "application_123_0"; |
| app.setApplicationId(appid); |
| String requestBody = getMarshalledAppInfo(app); |
| |
| URL url = new URL("http://localhost:8088/ws/v1/cluster/apps"); |
| HttpURLConnection conn = (HttpURLConnection) url.openConnection(); |
| conn.setRequestProperty(delegationTokenHeader, token); |
| setupConn(conn, "POST", MediaType.APPLICATION_XML, requestBody); |
| |
| // this should fail with unauthorized because only |
| // auth is kerberos or delegation token |
| try { |
| conn.getInputStream(); |
| fail("Authentication should fail with expired delegation tokens"); |
| } catch (IOException e) { |
| assertEquals(Status.FORBIDDEN.getStatusCode(), conn.getResponseCode()); |
| } |
| } |
| |
| // Test to make sure that we can't do delegation token |
| // functions using just delegation token auth |
| @Test |
| public void testDelegationTokenOps() throws Exception { |
| String token = getDelegationToken("client"); |
| String createRequest = "{\"renewer\":\"test\"}"; |
| String renewRequest = "{\"token\": \"" + token + "\"}"; |
| |
| // first test create and renew |
| String[] requests = { createRequest, renewRequest }; |
| for (String requestBody : requests) { |
| URL url = new URL("http://localhost:8088/ws/v1/cluster/delegation-token"); |
| HttpURLConnection conn = (HttpURLConnection) url.openConnection(); |
| conn.setRequestProperty(delegationTokenHeader, token); |
| setupConn(conn, "POST", MediaType.APPLICATION_JSON, requestBody); |
| try { |
| conn.getInputStream(); |
| fail("Creation/Renewing delegation tokens should not be " |
| + "allowed with token auth"); |
| } catch (IOException e) { |
| assertEquals(Status.FORBIDDEN.getStatusCode(), conn.getResponseCode()); |
| } |
| } |
| |
| // test cancel |
| URL url = new URL("http://localhost:8088/ws/v1/cluster/delegation-token"); |
| HttpURLConnection conn = (HttpURLConnection) url.openConnection(); |
| conn.setRequestProperty(delegationTokenHeader, token); |
| conn.setRequestProperty(RMWebServices.DELEGATION_TOKEN_HEADER, token); |
| setupConn(conn, "DELETE", null, null); |
| try { |
| conn.getInputStream(); |
| fail("Cancelling delegation tokens should not be allowed with token auth"); |
| } catch (IOException e) { |
| assertEquals(Status.FORBIDDEN.getStatusCode(), conn.getResponseCode()); |
| } |
| } |
| |
| // Superuser "client" should be able to get a delegation token |
| // for user "client2" when authenticated using Kerberos |
| // The request shouldn't work when authenticated using DelegationTokens |
| @Test |
| public void testDoAs() throws Exception { |
| |
| KerberosTestUtils.doAsClient(new Callable<Void>() { |
| @Override |
| public Void call() throws Exception { |
| String token = ""; |
| String owner = ""; |
| String renewer = "renewer"; |
| String body = "{\"renewer\":\"" + renewer + "\"}"; |
| URL url = |
| new URL("http://localhost:8088/ws/v1/cluster/delegation-token?doAs=client2"); |
| HttpURLConnection conn = (HttpURLConnection) url.openConnection(); |
| setupConn(conn, "POST", MediaType.APPLICATION_JSON, body); |
| InputStream response = conn.getInputStream(); |
| assertEquals(Status.OK.getStatusCode(), conn.getResponseCode()); |
| BufferedReader reader = null; |
| try { |
| reader = new BufferedReader(new InputStreamReader(response, "UTF8")); |
| for (String line; (line = reader.readLine()) != null;) { |
| JSONObject obj = new JSONObject(line); |
| if (obj.has("token")) { |
| token = obj.getString("token"); |
| } |
| if(obj.has("owner")) { |
| owner = obj.getString("owner"); |
| } |
| } |
| } finally { |
| IOUtils.closeQuietly(reader); |
| IOUtils.closeQuietly(response); |
| } |
| Assert.assertEquals("client2", owner); |
| Token<RMDelegationTokenIdentifier> realToken = new Token<RMDelegationTokenIdentifier>(); |
| realToken.decodeFromUrlString(token); |
| Assert.assertEquals("client2", realToken.decodeIdentifier().getOwner().toString()); |
| return null; |
| } |
| }); |
| |
| // this should not work |
| final String token = getDelegationToken("client"); |
| String renewer = "renewer"; |
| String body = "{\"renewer\":\"" + renewer + "\"}"; |
| URL url = |
| new URL("http://localhost:8088/ws/v1/cluster/delegation-token?doAs=client2"); |
| HttpURLConnection conn = (HttpURLConnection) url.openConnection(); |
| conn.setRequestProperty(delegationTokenHeader, token); |
| setupConn(conn, "POST", MediaType.APPLICATION_JSON, body); |
| try { |
| conn.getInputStream(); |
| fail("Client should not be allowed to impersonate using delegation tokens"); |
| } |
| catch(IOException ie) { |
| assertEquals(Status.FORBIDDEN.getStatusCode(), conn.getResponseCode()); |
| } |
| |
| // this should also fail due to client2 not being a super user |
| KerberosTestUtils.doAs("client2@EXAMPLE.COM", new Callable<Void>() { |
| @Override |
| public Void call() throws Exception { |
| String renewer = "renewer"; |
| String body = "{\"renewer\":\"" + renewer + "\"}"; |
| URL url = |
| new URL( |
| "http://localhost:8088/ws/v1/cluster/delegation-token?doAs=client"); |
| HttpURLConnection conn = (HttpURLConnection) url.openConnection(); |
| setupConn(conn, "POST", MediaType.APPLICATION_JSON, body); |
| try { |
| conn.getInputStream(); |
| fail("Non superuser client should not be allowed to carry out doAs"); |
| } |
| catch (IOException ie) { |
| assertEquals(Status.FORBIDDEN.getStatusCode(), conn.getResponseCode()); |
| } |
| return null; |
| } |
| }); |
| |
| } |
| |
| private String getDelegationToken(final String renewer) throws Exception { |
| return KerberosTestUtils.doAsClient(new Callable<String>() { |
| @Override |
| public String call() throws Exception { |
| String ret = null; |
| String body = "{\"renewer\":\"" + renewer + "\"}"; |
| URL url = |
| new URL("http://localhost:8088/ws/v1/cluster/delegation-token"); |
| HttpURLConnection conn = (HttpURLConnection) url.openConnection(); |
| setupConn(conn, "POST", MediaType.APPLICATION_JSON, body); |
| InputStream response = conn.getInputStream(); |
| assertEquals(Status.OK.getStatusCode(), conn.getResponseCode()); |
| BufferedReader reader = null; |
| try { |
| reader = new BufferedReader(new InputStreamReader(response, "UTF8")); |
| for (String line; (line = reader.readLine()) != null;) { |
| JSONObject obj = new JSONObject(line); |
| if (obj.has("token")) { |
| reader.close(); |
| response.close(); |
| ret = obj.getString("token"); |
| break; |
| } |
| } |
| } finally { |
| IOUtils.closeQuietly(reader); |
| IOUtils.closeQuietly(response); |
| } |
| return ret; |
| } |
| }); |
| } |
| |
| private void cancelDelegationToken(final String tokenString) throws Exception { |
| |
| KerberosTestUtils.doAsClient(new Callable<Void>() { |
| @Override |
| public Void call() throws Exception { |
| URL url = |
| new URL("http://localhost:8088/ws/v1/cluster/delegation-token"); |
| HttpURLConnection conn = (HttpURLConnection) url.openConnection(); |
| conn.setRequestProperty(RMWebServices.DELEGATION_TOKEN_HEADER, |
| tokenString); |
| setupConn(conn, "DELETE", null, null); |
| InputStream response = conn.getInputStream(); |
| assertEquals(Status.OK.getStatusCode(), conn.getResponseCode()); |
| response.close(); |
| return null; |
| } |
| }); |
| } |
| |
| static String getMarshalledAppInfo(ApplicationSubmissionContextInfo appInfo) |
| throws Exception { |
| |
| StringWriter writer = new StringWriter(); |
| JAXBContext context = |
| JAXBContext.newInstance(ApplicationSubmissionContextInfo.class); |
| Marshaller m = context.createMarshaller(); |
| m.marshal(appInfo, writer); |
| return writer.toString(); |
| } |
| |
| static void setupConn(HttpURLConnection conn, String method, |
| String contentType, String body) throws Exception { |
| conn.setRequestMethod(method); |
| conn.setDoOutput(true); |
| conn.setRequestProperty("Accept-Charset", "UTF8"); |
| if (contentType != null && !contentType.isEmpty()) { |
| conn.setRequestProperty("Content-Type", contentType + ";charset=UTF8"); |
| if (body != null && !body.isEmpty()) { |
| OutputStream stream = conn.getOutputStream(); |
| stream.write(body.getBytes("UTF8")); |
| stream.close(); |
| } |
| } |
| } |
| } |