| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.gateway.dispatch; |
| |
| import org.apache.hadoop.gateway.SpiGatewayMessages; |
| import org.apache.hadoop.gateway.SpiGatewayResources; |
| import org.apache.hadoop.gateway.audit.api.Action; |
| import org.apache.hadoop.gateway.audit.api.ActionOutcome; |
| import org.apache.hadoop.gateway.audit.api.AuditServiceFactory; |
| import org.apache.hadoop.gateway.audit.api.Auditor; |
| import org.apache.hadoop.gateway.audit.api.ResourceType; |
| import org.apache.hadoop.gateway.audit.log4j.audit.AuditConstants; |
| import org.apache.hadoop.gateway.config.Configure; |
| import org.apache.hadoop.gateway.config.Default; |
| import org.apache.hadoop.gateway.config.GatewayConfig; |
| import org.apache.hadoop.gateway.i18n.messages.MessagesFactory; |
| import org.apache.hadoop.gateway.i18n.resources.ResourcesFactory; |
| import org.apache.http.Header; |
| import org.apache.http.HttpEntity; |
| import org.apache.http.HttpResponse; |
| import org.apache.http.HttpStatus; |
| import org.apache.http.client.HttpClient; |
| import org.apache.http.client.methods.HttpDelete; |
| import org.apache.http.client.methods.HttpGet; |
| import org.apache.http.client.methods.HttpOptions; |
| import org.apache.http.client.methods.HttpPost; |
| import org.apache.http.client.methods.HttpPut; |
| import org.apache.http.client.methods.HttpUriRequest; |
| import org.apache.http.entity.ContentType; |
| import org.apache.http.entity.InputStreamEntity; |
| import org.apache.http.message.BasicHeader; |
| import org.apache.http.util.EntityUtils; |
| |
| import javax.servlet.http.HttpServletRequest; |
| import javax.servlet.http.HttpServletResponse; |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.HashSet; |
| import java.util.Set; |
| |
| /** |
| * |
| */ |
| public class DefaultDispatch extends AbstractGatewayDispatch { |
| |
| // private static final String CT_APP_WWW_FORM_URL_ENCODED = "application/x-www-form-urlencoded"; |
| // private static final String CT_APP_XML = "application/xml"; |
| protected static final String Q_DELEGATION_EQ = "?delegation="; |
| protected static final String AMP_DELEGATION_EQ = "&delegation="; |
| protected static final String COOKIE = "Cookie"; |
| protected static final String SET_COOKIE = "Set-Cookie"; |
| protected static final String WWW_AUTHENTICATE = "WWW-Authenticate"; |
| protected static final String NEGOTIATE = "Negotiate"; |
| |
| protected static SpiGatewayMessages LOG = MessagesFactory.get(SpiGatewayMessages.class); |
| protected static SpiGatewayResources RES = ResourcesFactory.get(SpiGatewayResources.class); |
| protected static Auditor auditor = AuditServiceFactory.getAuditService().getAuditor(AuditConstants.DEFAULT_AUDITOR_NAME, |
| AuditConstants.KNOX_SERVICE_NAME, AuditConstants.KNOX_COMPONENT_NAME); |
| |
| protected AppCookieManager appCookieManager; |
| |
| private int replayBufferSize = 0; |
| private Set<String> outboundResponseExcludeHeaders; |
| |
| @Override |
| public void init() { |
| setAppCookieManager(new AppCookieManager()); |
| outboundResponseExcludeHeaders = new HashSet<String>(); |
| outboundResponseExcludeHeaders.add(SET_COOKIE); |
| outboundResponseExcludeHeaders.add(WWW_AUTHENTICATE); |
| } |
| |
| @Override |
| public void destroy() { |
| |
| } |
| |
| public void setAppCookieManager(AppCookieManager appCookieManager) { |
| this.appCookieManager = appCookieManager; |
| } |
| |
| protected void executeRequest( |
| HttpUriRequest outboundRequest, |
| HttpServletRequest inboundRequest, |
| HttpServletResponse outboundResponse) |
| throws IOException { |
| HttpResponse inboundResponse = executeOutboundRequest(outboundRequest); |
| writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse); |
| } |
| |
| protected HttpResponse executeOutboundRequest(HttpUriRequest outboundRequest) throws IOException { |
| LOG.dispatchRequest(outboundRequest.getMethod(), outboundRequest.getURI()); |
| HttpResponse inboundResponse = null; |
| |
| try { |
| String query = outboundRequest.getURI().getQuery(); |
| if (!"true".equals(System.getProperty(GatewayConfig.HADOOP_KERBEROS_SECURED))) { |
| // Hadoop cluster not Kerberos enabled |
| addCredentialsToRequest(outboundRequest); |
| inboundResponse = client.execute(outboundRequest); |
| } else if (query.contains(Q_DELEGATION_EQ) || |
| // query string carries delegation token |
| query.contains(AMP_DELEGATION_EQ)) { |
| inboundResponse = client.execute(outboundRequest); |
| } else { |
| // Kerberos secured, no delegation token in query string |
| inboundResponse = executeKerberosDispatch(outboundRequest, client); |
| } |
| } catch (IOException e) { |
| // we do not want to expose back end host. port end points to clients, see JIRA KNOX-58 |
| LOG.dispatchServiceConnectionException(outboundRequest.getURI(), e); |
| auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI, ActionOutcome.FAILURE); |
| throw new IOException(RES.dispatchConnectionError()); |
| } finally { |
| if (inboundResponse != null) { |
| int statusCode = inboundResponse.getStatusLine().getStatusCode(); |
| if (statusCode != 201) { |
| LOG.dispatchResponseStatusCode(statusCode); |
| } else { |
| Header location = inboundResponse.getFirstHeader("Location"); |
| if (location == null) { |
| LOG.dispatchResponseStatusCode(statusCode); |
| } else { |
| LOG.dispatchResponseCreatedStatusCode(statusCode, location.getValue()); |
| } |
| } |
| auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI, ActionOutcome.SUCCESS, RES.responseStatus(statusCode)); |
| } else { |
| auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI, ActionOutcome.UNAVAILABLE); |
| } |
| |
| } |
| return inboundResponse; |
| } |
| |
| protected void writeOutboundResponse(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse) throws IOException { |
| // Copy the client respond header to the server respond. |
| outboundResponse.setStatus(inboundResponse.getStatusLine().getStatusCode()); |
| Header[] headers = inboundResponse.getAllHeaders(); |
| Set<String> excludeHeaders = getOutboundResponseExcludeHeaders(); |
| boolean hasExcludeHeaders = false; |
| if ((excludeHeaders != null) && !(excludeHeaders.isEmpty())) { |
| hasExcludeHeaders = true; |
| } |
| for ( Header header : headers ) { |
| String name = header.getName(); |
| if (hasExcludeHeaders && excludeHeaders.contains(name)) { |
| continue; |
| } |
| String value = header.getValue(); |
| outboundResponse.addHeader(name, value); |
| } |
| |
| HttpEntity entity = inboundResponse.getEntity(); |
| if ( entity != null ) { |
| Header contentType = entity.getContentType(); |
| if ( contentType != null ) { |
| outboundResponse.setContentType(contentType.getValue()); |
| } |
| //KM[ If this is set here it ends up setting the content length to the content returned from the server. |
| // This length might not match if the the content is rewritten. |
| // long contentLength = entity.getContentLength(); |
| // if( contentLength <= Integer.MAX_VALUE ) { |
| // outboundResponse.setContentLength( (int)contentLength ); |
| // } |
| //] |
| InputStream stream = entity.getContent(); |
| try { |
| writeResponse( inboundRequest, outboundResponse, stream ); |
| } finally { |
| closeInboundResponse( inboundResponse, stream ); |
| } |
| } |
| } |
| |
| protected void closeInboundResponse( HttpResponse response, InputStream stream ) throws IOException { |
| try { |
| stream.close(); |
| } finally { |
| if( response instanceof Closeable ) { |
| ( (Closeable)response).close(); |
| } |
| } |
| } |
| |
| /** |
| * This method provides a hook for specialized credential propagation |
| * in subclasses. |
| * |
| * @param outboundRequest |
| */ |
| protected void addCredentialsToRequest(HttpUriRequest outboundRequest) { |
| } |
| |
| protected HttpResponse executeKerberosDispatch(HttpUriRequest outboundRequest, |
| HttpClient client) throws IOException { |
| HttpResponse inboundResponse; |
| String appCookie = appCookieManager.getCachedAppCookie(); |
| if (appCookie != null) { |
| outboundRequest.addHeader(new BasicHeader(COOKIE, appCookie)); |
| } |
| inboundResponse = client.execute(outboundRequest); |
| // if inBoundResponse has status 401 and header WWW-Authenticate: Negoitate |
| // refresh hadoop.auth.cookie and attempt one more time |
| int statusCode = inboundResponse.getStatusLine().getStatusCode(); |
| if (statusCode == HttpStatus.SC_UNAUTHORIZED) { |
| Header[] wwwAuthHeaders = inboundResponse.getHeaders(WWW_AUTHENTICATE); |
| if (wwwAuthHeaders != null && wwwAuthHeaders.length != 0 && |
| wwwAuthHeaders[0].getValue().trim().startsWith(NEGOTIATE)) { |
| //need to consume the previous inbound response first |
| EntityUtils.consume(inboundResponse.getEntity()); |
| |
| appCookie = appCookieManager.getAppCookie(outboundRequest, true); |
| outboundRequest.removeHeaders(COOKIE); |
| outboundRequest.addHeader(new BasicHeader(COOKIE, appCookie)); |
| inboundResponse = client.execute(outboundRequest); |
| } else { |
| // no supported authentication type found |
| // we would let the original response propagate |
| } |
| } else { |
| // not a 401 Unauthorized status code |
| // we would let the original response propagate |
| } |
| return inboundResponse; |
| } |
| |
| protected HttpEntity createRequestEntity(HttpServletRequest request) |
| throws IOException { |
| |
| String contentType = request.getContentType(); |
| int contentLength = request.getContentLength(); |
| InputStream contentStream = request.getInputStream(); |
| |
| HttpEntity entity; |
| if (contentType == null) { |
| entity = new InputStreamEntity(contentStream, contentLength); |
| } else { |
| entity = new InputStreamEntity(contentStream, contentLength, ContentType.parse(contentType)); |
| } |
| |
| |
| if ("true".equals(System.getProperty(GatewayConfig.HADOOP_KERBEROS_SECURED))) { |
| |
| //Check if delegation token is supplied in the request |
| boolean delegationTokenPresent = false; |
| String queryString = request.getQueryString(); |
| if (queryString != null) { |
| delegationTokenPresent = queryString.startsWith("delegation=") || |
| queryString.contains("&delegation="); |
| } |
| if (!delegationTokenPresent && getReplayBufferSize() > 0) { |
| entity = new CappedBufferHttpEntity(entity, getReplayBufferSize() * 1024); |
| } |
| } |
| |
| return entity; |
| } |
| |
| @Override |
| public void doGet(URI url, HttpServletRequest request, HttpServletResponse response) |
| throws IOException, URISyntaxException { |
| HttpGet method = new HttpGet(url); |
| // https://issues.apache.org/jira/browse/KNOX-107 - Service URLs not rewritten for WebHDFS GET redirects |
| method.getParams().setBooleanParameter("http.protocol.handle-redirects", false); |
| copyRequestHeaderFields(method, request); |
| executeRequest(method, request, response); |
| } |
| |
| @Override |
| public void doOptions(URI url, HttpServletRequest request, HttpServletResponse response) |
| throws IOException, URISyntaxException { |
| HttpOptions method = new HttpOptions(url); |
| executeRequest(method, request, response); |
| } |
| |
| @Override |
| public void doPut(URI url, HttpServletRequest request, HttpServletResponse response) |
| throws IOException, URISyntaxException { |
| HttpPut method = new HttpPut(url); |
| HttpEntity entity = createRequestEntity(request); |
| method.setEntity(entity); |
| copyRequestHeaderFields(method, request); |
| executeRequest(method, request, response); |
| } |
| |
| @Override |
| public void doPost(URI url, HttpServletRequest request, HttpServletResponse response) |
| throws IOException, URISyntaxException { |
| HttpPost method = new HttpPost(url); |
| HttpEntity entity = createRequestEntity(request); |
| method.setEntity(entity); |
| copyRequestHeaderFields(method, request); |
| executeRequest(method, request, response); |
| } |
| |
| @Override |
| public void doDelete(URI url, HttpServletRequest request, HttpServletResponse response) |
| throws IOException, URISyntaxException { |
| HttpDelete method = new HttpDelete(url); |
| copyRequestHeaderFields(method, request); |
| executeRequest(method, request, response); |
| } |
| |
| protected int getReplayBufferSize() { |
| return replayBufferSize; |
| } |
| |
| @Configure |
| protected void setReplayBufferSize(@Default("8") int size) { |
| replayBufferSize = size; |
| } |
| |
| public Set<String> getOutboundResponseExcludeHeaders() { |
| return outboundResponseExcludeHeaders; |
| } |
| } |