blob: 401d56bb9f521c03b8a7faec8f939b39edddf427 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.resource.channel;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.client.urlconnection.HTTPSProperties;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.proxy.BufferedRequest;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.util.DeploymentProperties;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status.Family;
import javax.ws.rs.core.UriBuilder;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Properties;
/**
* A Channel implementation for HTTP.
*/
public class HTTPChannel extends AbstractChannel {
private static final Logger LOG = LoggerFactory.getLogger(HTTPChannel.class);
private static final HttpServletRequest DEFAULT_NULL_REQUEST = new NullServletRequest();
private static final Properties DEPLOYMENT_PROPERTIES = DeploymentProperties.get();
private static final String DO_AS_PARAM = "doAs";
private String colo;
private String serviceName;
private Class service;
/* Name of the HTTP cookie used for the authentication token between Prism and Falcon server.
*/
private static final String AUTH_COOKIE = "hadoop.auth";
private static final String AUTH_COOKIE_EQ = AUTH_COOKIE + "=";
protected static final KerberosAuthenticator AUTHENTICATOR = new KerberosAuthenticator();
protected static final HostnameVerifier ALL_TRUSTING_HOSTNAME_VERIFIER = new HostnameVerifier() {
public boolean verify(String hostname, SSLSession sslSession) {
return true;
}
};
public void init(String inColo, String inServiceName) throws FalconException {
this.colo = inColo;
this.serviceName = inServiceName;
try {
String proxyClassName = DEPLOYMENT_PROPERTIES.getProperty(serviceName + ".proxy");
service = Class.forName(proxyClassName);
LOG.info("Service: {}", serviceName);
} catch (Exception e) {
throw new FalconException("Unable to initialize channel for " + serviceName, e);
}
}
private String getFalconEndPoint() {
String prefixPath = DEPLOYMENT_PROPERTIES.getProperty(serviceName + ".path");
String falconEndPoint = RuntimeProperties.get().getProperty("falcon." + colo + ".endpoint");
return falconEndPoint + "/" + prefixPath;
}
@SuppressWarnings("unchecked")
@Override
public <T> T invoke(String methodName, Object... args) throws FalconException {
HttpServletRequest incomingRequest = null;
try {
Method method = getMethod(service, methodName, args);
String urlPrefix = getFalconEndPoint();
final String url = urlPrefix + "/" + pathValue(method, args);
LOG.debug("Executing {}", url);
incomingRequest = getIncomingRequest(args);
incomingRequest.getInputStream().reset();
String httpMethod = getHttpMethod(method);
String mimeType = getConsumes(method);
String accept = MediaType.WILDCARD;
final String user = CurrentUser.getUser();
String doAsUser = incomingRequest.getParameter(DO_AS_PARAM);
WebResource resource = getClient()
.resource(UriBuilder.fromUri(url).build().normalize())
.queryParam("user.name", user);
if (doAsUser != null) {
resource = resource.queryParam("doAs", doAsUser);
}
AuthenticatedURL.Token authenticationToken = null;
if (SecurityUtil.isSecurityEnabled()) {
UserGroupInformation ugiLoginUser = UserGroupInformation.getCurrentUser();
LOG.debug("Security is enabled. Using DoAs : " + ugiLoginUser.getUserName());
authenticationToken = ugiLoginUser.doAs(new PrivilegedExceptionAction<AuthenticatedURL.Token>() {
@Override
public AuthenticatedURL.Token run() throws Exception {
return getToken(url + PseudoAuthenticator.USER_NAME + "=" + user, getClient());
}
});
}
ClientResponse response = resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(accept).type(mimeType)
.method(httpMethod, ClientResponse.class,
(isPost(httpMethod) ? incomingRequest.getInputStream() : null));
incomingRequest.getInputStream().reset();
Family status = response.getClientResponseStatus().getFamily();
if (status == Family.INFORMATIONAL || status == Family.SUCCESSFUL) {
return (T) response.getEntity(method.getReturnType());
} else if (response.getClientResponseStatus().getStatusCode()
== Response.Status.BAD_REQUEST.getStatusCode()) {
LOG.error("Request failed: {}", response.getClientResponseStatus().getStatusCode());
throw FalconWebException.newAPIException(response.
getEntity(APIResult.class).getMessage());
} else {
LOG.error("Request failed: {}", response.getClientResponseStatus().getStatusCode());
throw new FalconException(response.getEntity(String.class));
}
} catch (FalconWebException falconWebException) {
LOG.error("Request failed", falconWebException);
throw falconWebException;
} catch (Throwable e) {
LOG.error("Request failed", e);
throw new FalconException(e);
} finally {
try {
if (incomingRequest != null) {
incomingRequest.getInputStream().reset();
}
} catch (IOException e) {
LOG.error("Error in HTTPChannel", e);
}
}
}
protected Client getClient() throws Exception {
return Client.create(new DefaultClientConfig());
}
private boolean isPost(String httpMethod) {
return httpMethod.equals("POST") || httpMethod.equals("PUT");
}
private String pathValue(Method method, Object... args) throws FalconException {
Path pathParam = method.getAnnotation(Path.class);
if (pathParam == null) {
throw new FalconException("No path param mentioned for " + method);
}
String pathValue = pathParam.value();
Annotation[][] paramAnnotations = method.getParameterAnnotations();
StringBuilder queryString = new StringBuilder("?");
for (int index = 0; index < args.length; index++) {
if (args[index] instanceof String || args[index] instanceof Boolean || args[index] instanceof Integer
|| args[index] instanceof List) {
String arg = String.valueOf(args[index]);
for (int annotation = 0; annotation < paramAnnotations[index].length; annotation++) {
Annotation paramAnnotation = paramAnnotations[index][annotation];
String annotationClass = paramAnnotation.annotationType().getName();
if (annotationClass.equals(QueryParam.class.getName())) {
if (args[index] instanceof List){
List lifecycle = (List) args[index];
if (lifecycle.size() > 0 && lifecycle.get(0) instanceof LifeCycle) {
List<LifeCycle> lifecycles = lifecycle;
for (LifeCycle l : lifecycles) {
queryString.append(getAnnotationValue(paramAnnotation, "value")).append("=")
.append(l.name()).append("&");
}
}
}else{
queryString.append(getAnnotationValue(paramAnnotation, "value")).
append('=').append(arg).append("&");
}
} else if (annotationClass.equals(PathParam.class.getName())) {
pathValue = pathValue.replace("{"
+ getAnnotationValue(paramAnnotation, "value") + "}", arg);
}
}
}
}
return pathValue + queryString.toString();
}
private String getAnnotationValue(Annotation paramAnnotation,
String annotationAttribute) throws FalconException {
try {
return String.valueOf(paramAnnotation.annotationType().
getMethod(annotationAttribute).invoke(paramAnnotation));
} catch (Exception e) {
throw new FalconException("Unable to get attribute value for "
+ paramAnnotation + "[" + annotationAttribute + "]");
}
}
private HttpServletRequest getIncomingRequest(Object... args) {
for (Object arg : args) {
if (arg instanceof HttpServletRequest) {
return (HttpServletRequest) arg;
}
}
return new BufferedRequest(DEFAULT_NULL_REQUEST);
}
private String getHttpMethod(Method method) {
PUT put = method.getAnnotation(PUT.class);
if (put != null) {
return HttpMethod.PUT;
}
POST post = method.getAnnotation(POST.class);
if (post != null) {
return HttpMethod.POST;
}
DELETE delete = method.getAnnotation(DELETE.class);
if (delete != null) {
return HttpMethod.DELETE;
}
return HttpMethod.GET;
}
private String getConsumes(Method method) {
Consumes consumes = method.getAnnotation(Consumes.class);
if (consumes == null || consumes.value() == null) {
return MediaType.TEXT_PLAIN;
}
return consumes.value()[0];
}
protected AuthenticatedURL.Token getToken(String baseUrl, Client client) throws FalconException {
AuthenticatedURL.Token currentToken = new AuthenticatedURL.Token();
try {
URL url = new URL(baseUrl);
// using KerberosAuthenticator which falls back to PsuedoAuthenticator
// instead of passing authentication type from the command line - bad factory
HTTPSProperties httpsProperties = ((HTTPSProperties)
client.getProperties().get(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES));
SSLContext sslContext = null;
if (httpsProperties != null) {
sslContext = httpsProperties.getSSLContext();
}
if (sslContext != null) {
HttpsURLConnection.setDefaultSSLSocketFactory(sslContext.getSocketFactory());
HttpsURLConnection.setDefaultHostnameVerifier(ALL_TRUSTING_HOSTNAME_VERIFIER);
}
new AuthenticatedURL(AUTHENTICATOR).openConnection(url, currentToken);
} catch (Exception ex) {
throw new FalconException("Could not authenticate, " + ex.getMessage(), ex);
}
return currentToken;
}
}