// blob: 011e2cc9855247505f772d19075905acf1d60752 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ranger.tagsync.sink.tagadmin;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Cookie;
import javax.ws.rs.core.NewCookie;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ranger.admin.client.datatype.RESTResponse;
import org.apache.ranger.plugin.util.RangerRESTClient;
import org.apache.ranger.plugin.util.ServiceTags;
import org.apache.ranger.tagsync.model.TagSink;
import org.apache.ranger.tagsync.process.TagSyncConfig;
import com.sun.jersey.api.client.ClientResponse;
public class TagAdminRESTSink implements TagSink, Runnable {
private static final Log LOG = LogFactory.getLog(TagAdminRESTSink.class);
private static final String REST_PREFIX = "/service";
private static final String MODULE_PREFIX = "/tags";
private static final String REST_URL_IMPORT_SERVICETAGS_RESOURCE = REST_PREFIX + MODULE_PREFIX + "/importservicetags/";
private long rangerAdminConnectionCheckInterval;
private Cookie sessionId=null;
private boolean isValidRangerCookie=false;
List<NewCookie> cookieList=new ArrayList<>();
private boolean isRangerCookieEnabled;
private String rangerAdminCookieName;
private RangerRESTClient tagRESTClient = null;
private boolean isKerberized;
private BlockingQueue<UploadWorkItem> uploadWorkItems;
private Thread myThread = null;
@Override
public boolean initialize(Properties properties) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> TagAdminRESTSink.initialize()");
}
boolean ret = false;
String restUrl = TagSyncConfig.getTagAdminRESTUrl(properties);
String sslConfigFile = TagSyncConfig.getTagAdminRESTSslConfigFile(properties);
String userName = TagSyncConfig.getTagAdminUserName(properties);
String password = TagSyncConfig.getTagAdminPassword(properties);
rangerAdminConnectionCheckInterval = TagSyncConfig.getTagAdminConnectionCheckInterval(properties);
isKerberized = TagSyncConfig.getTagsyncKerberosIdentity(properties) != null;
isRangerCookieEnabled = TagSyncConfig.isTagSyncRangerCookieEnabled(properties);
rangerAdminCookieName=TagSyncConfig.getRangerAdminCookieName(properties);
sessionId=null;
if (LOG.isDebugEnabled()) {
LOG.debug("restUrl=" + restUrl);
LOG.debug("sslConfigFile=" + sslConfigFile);
LOG.debug("userName=" + userName);
LOG.debug("rangerAdminConnectionCheckInterval=" + rangerAdminConnectionCheckInterval);
LOG.debug("isKerberized=" + isKerberized);
}
if (StringUtils.isNotBlank(restUrl)) {
tagRESTClient = new RangerRESTClient(restUrl, sslConfigFile, TagSyncConfig.getInstance());
if (!isKerberized) {
tagRESTClient.setBasicAuthInfo(userName, password);
}
// Build and cache REST client. This will catch any errors in building REST client up-front
tagRESTClient.getClient();
uploadWorkItems = new LinkedBlockingQueue<UploadWorkItem>();
ret = true;
} else {
LOG.error("No value specified for property 'ranger.tagsync.tagadmin.rest.url'!");
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== TagAdminRESTSink.initialize(), result=" + ret);
}
return ret;
}
@Override
public ServiceTags upload(ServiceTags toUpload) throws Exception {
if(LOG.isDebugEnabled()) {
LOG.debug("==> upload() ");
}
UploadWorkItem uploadWorkItem = new UploadWorkItem(toUpload);
uploadWorkItems.put(uploadWorkItem);
// Wait until message is successfully delivered
ServiceTags ret = uploadWorkItem.waitForUpload();
if(LOG.isDebugEnabled()) {
LOG.debug("<== upload()");
}
return ret;
}
private ServiceTags doUpload(ServiceTags serviceTags) throws Exception {
if(isKerberized) {
try{
UserGroupInformation userGroupInformation = UserGroupInformation.getLoginUser();
if (userGroupInformation != null) {
try {
userGroupInformation.checkTGTAndReloginFromKeytab();
} catch (IOException ioe) {
LOG.error("Error renewing TGT and relogin", ioe);
userGroupInformation = null;
}
}
if (userGroupInformation != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Using Principal = " + userGroupInformation.getUserName());
}
final ServiceTags serviceTag = serviceTags;
ServiceTags ret = userGroupInformation.doAs(new PrivilegedAction<ServiceTags>() {
@Override
public ServiceTags run() {
try {
return uploadServiceTags(serviceTag);
} catch (Exception e) {
LOG.error("Upload of service-tags failed with message ", e);
}
return null;
}
});
return ret;
} else {
LOG.error("Failed to get UserGroupInformation.getLoginUser()");
return null; // This will cause retries !!!
}
}catch(Exception e){
LOG.error("Upload of service-tags failed with message ", e);
}
return null;
}else{
return uploadServiceTags(serviceTags);
}
}
private ServiceTags uploadServiceTags(ServiceTags serviceTags) throws Exception {
if(LOG.isDebugEnabled()) {
LOG.debug("==> doUpload()");
}
ClientResponse response = null;
if (isRangerCookieEnabled) {
response = uploadServiceTagsUsingCookie(serviceTags);
} else {
response = tagRESTClient.put(REST_URL_IMPORT_SERVICETAGS_RESOURCE, null, serviceTags);
}
if(response == null || response.getStatus() != HttpServletResponse.SC_NO_CONTENT) {
RESTResponse resp = RESTResponse.fromClientResponse(response);
LOG.error("Upload of service-tags failed with message " + resp.getMessage());
if (response == null || resp.getHttpStatusCode() != HttpServletResponse.SC_BAD_REQUEST) {
// NOT an application error
throw new Exception("Upload of service-tags failed with response: " + response);
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("<== doUpload()");
}
return serviceTags;
}
private ClientResponse uploadServiceTagsUsingCookie(ServiceTags serviceTags) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> uploadServiceTagCache()");
}
ClientResponse clientResponse = null;
if (sessionId != null && isValidRangerCookie) {
clientResponse = tryWithCookie(serviceTags);
} else {
clientResponse = tryWithCred(serviceTags);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== uploadServiceTagCache()");
}
return clientResponse;
}
private ClientResponse tryWithCred(ServiceTags serviceTags) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> tryWithCred");
}
ClientResponse clientResponsebyCred = uploadTagsWithCred(serviceTags);
if (clientResponsebyCred != null && clientResponsebyCred.getStatus() != HttpServletResponse.SC_NO_CONTENT
&& clientResponsebyCred.getStatus() != HttpServletResponse.SC_BAD_REQUEST
&& clientResponsebyCred.getStatus() != HttpServletResponse.SC_OK) {
sessionId = null;
clientResponsebyCred = null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== tryWithCred");
}
return clientResponsebyCred;
}
private ClientResponse tryWithCookie(ServiceTags serviceTags) {
ClientResponse clientResponsebySessionId = uploadTagsWithCookie(serviceTags);
if (clientResponsebySessionId != null
&& clientResponsebySessionId.getStatus() != HttpServletResponse.SC_NO_CONTENT
&& clientResponsebySessionId.getStatus() != HttpServletResponse.SC_BAD_REQUEST
&& clientResponsebySessionId.getStatus() != HttpServletResponse.SC_OK) {
sessionId = null;
isValidRangerCookie = false;
clientResponsebySessionId = null;
}
return clientResponsebySessionId;
}
private synchronized ClientResponse uploadTagsWithCred(ServiceTags serviceTags) {
if (sessionId == null) {
tagRESTClient.resetClient();
ClientResponse response = null;
try {
response = tagRESTClient.put(REST_URL_IMPORT_SERVICETAGS_RESOURCE, null, serviceTags);
} catch (Exception e) {
LOG.error("Failed to get response, Error is : "+e.getMessage());
}
if (response != null) {
if (!(response.toString().contains(REST_URL_IMPORT_SERVICETAGS_RESOURCE))) {
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
} else if (response.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) {
LOG.warn("Credentials response from ranger is 401.");
} else if (response.getStatus() == HttpServletResponse.SC_OK
|| response.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
cookieList = response.getCookies();
// save cookie received from credentials session login
for (NewCookie cookie : cookieList) {
if (cookie.getName().equalsIgnoreCase(rangerAdminCookieName)) {
sessionId = cookie.toCookie();
isValidRangerCookie = true;
break;
} else {
isValidRangerCookie = false;
}
}
}
}
return response;
} else {
ClientResponse clientResponsebySessionId = uploadTagsWithCookie(serviceTags);
if (!(clientResponsebySessionId.toString().contains(REST_URL_IMPORT_SERVICETAGS_RESOURCE))) {
clientResponsebySessionId.setStatus(HttpServletResponse.SC_NOT_FOUND);
}
return clientResponsebySessionId;
}
}
private ClientResponse uploadTagsWithCookie(ServiceTags serviceTags) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> uploadTagsWithCookie");
}
ClientResponse response = null;
try {
response = tagRESTClient.put(REST_URL_IMPORT_SERVICETAGS_RESOURCE, serviceTags, sessionId);
} catch (Exception e) {
LOG.error("Failed to get response, Error is : "+e.getMessage());
}
if (response != null) {
if (!(response.toString().contains(REST_URL_IMPORT_SERVICETAGS_RESOURCE))) {
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
sessionId = null;
isValidRangerCookie = false;
} else if (response.getStatus() == HttpServletResponse.SC_UNAUTHORIZED) {
sessionId = null;
isValidRangerCookie = false;
} else if (response.getStatus() == HttpServletResponse.SC_NO_CONTENT
|| response.getStatus() == HttpServletResponse.SC_OK) {
List<NewCookie> respCookieList = response.getCookies();
for (NewCookie respCookie : respCookieList) {
if (respCookie.getName().equalsIgnoreCase(rangerAdminCookieName)) {
if (!(sessionId.getValue().equalsIgnoreCase(respCookie.toCookie().getValue()))) {
sessionId = respCookie.toCookie();
}
isValidRangerCookie = true;
break;
}
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== uploadTagsWithCookie");
}
return response;
}
@Override
public boolean start() {
myThread = new Thread(this);
myThread.setDaemon(true);
myThread.start();
return true;
}
@Override
public void stop() {
if (myThread != null && myThread.isAlive()) {
myThread.interrupt();
}
}
@Override
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("==> TagAdminRESTSink.run()");
}
while (true) {
UploadWorkItem uploadWorkItem;
try {
uploadWorkItem = uploadWorkItems.take();
ServiceTags toUpload = uploadWorkItem.getServiceTags();
boolean doRetry;
do {
doRetry = false;
try {
ServiceTags uploaded = doUpload(toUpload);
if (uploaded == null) { // Treat this as if an Exception is thrown by doUpload
doRetry = true;
Thread.sleep(rangerAdminConnectionCheckInterval);
} else {
// ServiceTags uploaded successfully
uploadWorkItem.uploadCompleted(uploaded);
}
} catch (InterruptedException interrupted) {
LOG.error("Caught exception..: ", interrupted);
return;
} catch (Exception exception) {
doRetry = true;
Thread.sleep(rangerAdminConnectionCheckInterval);
}
} while (doRetry);
}
catch (InterruptedException exception) {
LOG.error("Interrupted..: ", exception);
return;
}
}
}
static class UploadWorkItem {
private ServiceTags serviceTags;
private BlockingQueue<ServiceTags> uploadedServiceTags;
ServiceTags getServiceTags() {
return serviceTags;
}
ServiceTags waitForUpload() throws InterruptedException {
return uploadedServiceTags.take();
}
void uploadCompleted(ServiceTags uploaded) throws InterruptedException {
// ServiceTags uploaded successfully
uploadedServiceTags.put(uploaded);
}
UploadWorkItem(ServiceTags serviceTags) {
setServiceTags(serviceTags);
uploadedServiceTags = new ArrayBlockingQueue<ServiceTags>(1);
}
void setServiceTags(ServiceTags serviceTags) {
this.serviceTags = serviceTags;
}
}
}