blob: b61a07edb51c5489d84bdd4b90f8f98b3d5ffd66 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ranger.services.yarn.client;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
import org.apache.ranger.plugin.client.BaseClient;
import org.apache.ranger.plugin.client.HadoopException;
import org.apache.ranger.services.yarn.client.json.model.YarnSchedulerResponse;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
public class YarnClient extends BaseClient {
private static final Logger LOG = Logger.getLogger(YarnClient.class);
private static final String EXPECTED_MIME_TYPE = "application/json";
private static final String YARN_LIST_API_ENDPOINT = "/ws/v1/cluster/scheduler";
private static final String errMessage = " You can still save the repository and start creating "
+ "policies, but you would not be able to use autocomplete for "
+ "resource names. Check ranger_admin.log for more info.";
String yarnQUrl;
String userName;
String password;
public YarnClient(String serviceName, Map<String, String> configs) {
super(serviceName,configs,"yarn-client");
this.yarnQUrl = configs.get("yarn.url");
this.userName = configs.get("username");
this.password = configs.get("password");
if (this.yarnQUrl == null || this.yarnQUrl.isEmpty()) {
LOG.error("No value found for configuration 'yarn.url'. YARN resource lookup will fail");
}
if (this.userName == null || this.userName.isEmpty()) {
LOG.error("No value found for configuration 'username'. YARN resource lookup will fail");
}
if (this.password == null || this.password.isEmpty()) {
LOG.error("No value found for configuration 'password'. YARN resource lookup will fail");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Yarn Client is build with url [" + this.yarnQUrl + "] user: [" + this.userName + "], password: [" + "*********" + "]");
}
}
public List<String> getQueueList(final String queueNameMatching, final List<String> existingQueueList) {
if (LOG.isDebugEnabled()) {
LOG.debug("Getting Yarn queue list for queueNameMatching : " + queueNameMatching);
}
final String errMsg = errMessage;
List<String> ret = null;
Callable<List<String>> callableYarnQListGetter = new Callable<List<String>>() {
@Override
public List<String> call() {
List<String> yarnQueueListGetter = null;
Subject subj = getLoginSubject();
if (subj != null) {
yarnQueueListGetter = Subject.doAs(subj, new PrivilegedAction<List<String>>() {
@Override
public List<String> run() {
if (yarnQUrl == null || yarnQUrl.trim().isEmpty()) {
return null;
}
String[] yarnQUrls = yarnQUrl.trim().split("[,;]");
if(yarnQUrls == null || yarnQUrls.length == 0)
{
return null;
}
Client client = Client.create();
ClientResponse response = null;
for(String currentUrl : yarnQUrls)
{
if(currentUrl == null || currentUrl.trim().isEmpty())
{
continue;
}
String url = currentUrl.trim() + YARN_LIST_API_ENDPOINT;
try {
response = getQueueResponse(url, client);
if (response != null) {
if(response.getStatus() == 200)
{
break;
}
else{
response.close();
}
}
} catch (Throwable t) {
String msgDesc = "Exception while getting Yarn Queue List."
+ " URL : " + url;
LOG.error(msgDesc, t);
}
}
List<String> lret = new ArrayList<String>();
try {
if (response != null && response.getStatus() == 200) {
String jsonString = response.getEntity(String.class);
Gson gson = new GsonBuilder().setPrettyPrinting().create();
YarnSchedulerResponse yarnQResponse = gson.fromJson(jsonString, YarnSchedulerResponse.class);
if (yarnQResponse != null) {
List<String> yarnQueueList = yarnQResponse.getQueueNames();
if (yarnQueueList != null) {
for ( String yarnQueueName : yarnQueueList) {
if ( existingQueueList != null && existingQueueList.contains(yarnQueueName)) {
continue;
}
if (queueNameMatching == null || queueNameMatching.isEmpty()
|| yarnQueueName.startsWith(queueNameMatching)) {
if (LOG.isDebugEnabled()) {
LOG.debug("getQueueList():Adding yarnQueue " + yarnQueueName);
}
lret.add(yarnQueueName);
}
}
}
}
} else {
String msgDesc = "Unable to get a valid response for "
+ "expected mime type : [" + EXPECTED_MIME_TYPE
+ "] URL : " + yarnQUrl + " - got null response.";
LOG.error(msgDesc);
HadoopException hdpException = new HadoopException(msgDesc);
hdpException.generateResponseDataMap(false, msgDesc,
msgDesc + errMsg, null, null);
throw hdpException;
}
} catch (HadoopException he) {
throw he;
} catch (Throwable t) {
String msgDesc = "Exception while getting Yarn Queue List."
+ " URL : " + yarnQUrl;
HadoopException hdpException = new HadoopException(msgDesc,
t);
LOG.error(msgDesc, t);
hdpException.generateResponseDataMap(false,
BaseClient.getMessage(t), msgDesc + errMsg, null,
null);
throw hdpException;
} finally {
if (response != null) {
response.close();
}
if (client != null) {
client.destroy();
}
}
return lret;
}
private ClientResponse getQueueResponse(String url, Client client) {
if (LOG.isDebugEnabled()) {
LOG.debug("getQueueResponse():calling " + url);
}
WebResource webResource = client.resource(url);
ClientResponse response = webResource.accept(EXPECTED_MIME_TYPE)
.get(ClientResponse.class);
if (response != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("getQueueResponse():response.getStatus()= " + response.getStatus());
}
if (response.getStatus() != 200) {
LOG.info("getQueueResponse():response.getStatus()= " + response.getStatus() + " for URL " + url + ", failed to get queue list");
String jsonString = response.getEntity(String.class);
LOG.info(jsonString);
}
}
return response;
}
} );
}
return yarnQueueListGetter;
}
};
try {
ret = timedTask(callableYarnQListGetter, 5, TimeUnit.SECONDS);
} catch ( Throwable t) {
LOG.error("Unable to get Yarn Queue list from [" + yarnQUrl + "]", t);
String msgDesc = "Unable to get a valid response for "
+ "expected mime type : [" + EXPECTED_MIME_TYPE
+ "] URL : " + yarnQUrl;
HadoopException hdpException = new HadoopException(msgDesc,
t);
LOG.error(msgDesc, t);
hdpException.generateResponseDataMap(false,
BaseClient.getMessage(t), msgDesc + errMsg, null,
null);
throw hdpException;
}
return ret;
}
public static Map<String, Object> connectionTest(String serviceName,
Map<String, String> configs) {
String errMsg = errMessage;
boolean connectivityStatus = false;
Map<String, Object> responseData = new HashMap<String, Object>();
YarnClient yarnClient = getYarnClient(serviceName,
configs);
List<String> strList = getYarnResource(yarnClient, "",null);
if (strList != null && strList.size() > 0 ) {
if (LOG.isDebugEnabled()) {
LOG.debug("TESTING list size" + strList.size() + " Yarn Queues");
}
connectivityStatus = true;
}
if (connectivityStatus) {
String successMsg = "ConnectionTest Successful";
BaseClient.generateResponseDataMap(connectivityStatus, successMsg,
successMsg, null, null, responseData);
} else {
String failureMsg = "Unable to retrieve any Yarn Queues using given parameters.";
BaseClient.generateResponseDataMap(connectivityStatus, failureMsg,
failureMsg + errMsg, null, null, responseData);
}
return responseData;
}
public static YarnClient getYarnClient(String serviceName,
Map<String, String> configs) {
YarnClient yarnClient = null;
if (LOG.isDebugEnabled()) {
LOG.debug("Getting YarnClient for datasource: " + serviceName);
}
String errMsg = errMessage;
if (configs == null || configs.isEmpty()) {
String msgDesc = "Could not connect as Connection ConfigMap is empty.";
LOG.error(msgDesc);
HadoopException hdpException = new HadoopException(msgDesc);
hdpException.generateResponseDataMap(false, msgDesc, msgDesc
+ errMsg, null, null);
throw hdpException;
} else {
yarnClient = new YarnClient (serviceName, configs);
}
return yarnClient;
}
public static List<String> getYarnResource (final YarnClient yarnClient,
String yarnQname, List<String> existingQueueName) {
List<String> resultList = new ArrayList<String>();
String errMsg = errMessage;
try {
if (yarnClient == null) {
String msgDesc = "Unable to get Yarn Queue : YarnClient is null.";
LOG.error(msgDesc);
HadoopException hdpException = new HadoopException(msgDesc);
hdpException.generateResponseDataMap(false, msgDesc, msgDesc
+ errMsg, null, null);
throw hdpException;
}
if (yarnQname != null) {
String finalyarnQueueName = yarnQname.trim();
resultList = yarnClient
.getQueueList(finalyarnQueueName,existingQueueName);
if (resultList != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Returning list of " + resultList.size() + " Yarn Queues");
}
}
}
}
catch (HadoopException he) {
throw he;
}
catch (Throwable t) {
String msgDesc = "getYarnResource: Unable to get Yarn resources.";
LOG.error(msgDesc, t);
HadoopException hdpException = new HadoopException(msgDesc);
hdpException.generateResponseDataMap(false,
BaseClient.getMessage(t), msgDesc + errMsg, null, null);
throw hdpException;
}
return resultList;
}
public static <T> T timedTask(Callable<T> callableObj, long timeout,
TimeUnit timeUnit) throws Exception {
return callableObj.call();
}
}