blob: 60276caea4fab63c530d863edf84718fde2ec6a3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas;
import org.codehaus.jackson.jaxrs.JacksonJaxbJsonProvider;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter;
import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
import com.sun.jersey.multipart.BodyPart;
import com.sun.jersey.multipart.FormDataBodyPart;
import com.sun.jersey.multipart.FormDataMultiPart;
import com.sun.jersey.multipart.MultiPart;
import com.sun.jersey.multipart.file.FileDataBodyPart;
import com.sun.jersey.multipart.file.StreamDataBodyPart;
import com.sun.jersey.multipart.impl.MultiPartWriter;
import org.apache.atlas.model.impexp.AtlasServer;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.security.SecureClientUtils;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.Cookie;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
public abstract class AtlasBaseClient {
public static final String BASE_URI = "api/atlas/";
public static final String TYPES = "types";
public static final String ADMIN_VERSION = "admin/version";
public static final String ADMIN_STATUS = "admin/status";
public static final String ADMIN_METRICS = "admin/metrics";
public static final String ADMIN_IMPORT = "admin/import";
public static final String ADMIN_EXPORT = "admin/export";
public static final String ADMIN_SERVER_TEMPLATE = "%sadmin/server/%s";
public static final String QUERY = "query";
public static final String LIMIT = "limit";
public static final String OFFSET = "offset";
public static final API API_STATUS = new API(BASE_URI + ADMIN_STATUS, HttpMethod.GET, Response.Status.OK);
public static final API API_VERSION = new API(BASE_URI + ADMIN_VERSION, HttpMethod.GET, Response.Status.OK);
public static final API API_METRICS = new API(BASE_URI + ADMIN_METRICS, HttpMethod.GET, Response.Status.OK);
static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8";
static final String UNKNOWN_STATUS = "Unknown status";
static final String ATLAS_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
// Setting the default value based on testing failovers while client code like quickstart is running.
static final int DEFAULT_NUM_RETRIES = 4;
static final String ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
// Setting the default value based on testing failovers while client code like quickstart is running.
// With number of retries, this gives a total time of about 20s for the server to start.
static final int DEFAULT_SLEEP_BETWEEN_RETRIES_MS = 5000;
private static final Logger LOG = LoggerFactory.getLogger(AtlasBaseClient.class);
private static final API IMPORT = new API(BASE_URI + ADMIN_IMPORT, HttpMethod.POST, Response.Status.OK, MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON);
private static final API EXPORT = new API(BASE_URI + ADMIN_EXPORT, HttpMethod.POST, Response.Status.OK, MediaType.APPLICATION_JSON, MediaType.APPLICATION_OCTET_STREAM);
private static final String IMPORT_REQUEST_PARAMTER = "request";
private static final String IMPORT_DATA_PARAMETER = "data";
protected WebResource service;
protected Configuration configuration;
private String basicAuthUser;
private String basicAuthPassword;
private AtlasClientContext atlasClientContext;
private boolean retryEnabled = false;
private Cookie cookie = null;
private SecureClientUtils clientUtils;
protected AtlasBaseClient() {
}
protected AtlasBaseClient(String[] baseUrl, String[] basicAuthUserNamePassword) {
if (basicAuthUserNamePassword != null) {
if (basicAuthUserNamePassword.length > 0) {
this.basicAuthUser = basicAuthUserNamePassword[0];
}
if (basicAuthUserNamePassword.length > 1) {
this.basicAuthPassword = basicAuthUserNamePassword[1];
}
}
initializeState(baseUrl, null, null);
}
protected AtlasBaseClient(String... baseUrls) throws AtlasException {
this(getCurrentUGI(), baseUrls);
}
protected AtlasBaseClient(UserGroupInformation ugi, String[] baseUrls) {
this(ugi, ugi.getShortUserName(), baseUrls);
}
protected AtlasBaseClient(UserGroupInformation ugi, String doAsUser, String[] baseUrls) {
initializeState(baseUrls, ugi, doAsUser);
}
protected AtlasBaseClient(String[] baseUrls, Cookie cookie) {
this.cookie = cookie;
initializeState(baseUrls, null, null);
}
@VisibleForTesting
protected AtlasBaseClient(WebResource service, Configuration configuration) {
this.service = service;
this.configuration = configuration;
}
@VisibleForTesting
protected AtlasBaseClient(Configuration configuration, String[] baseUrl, String[] basicAuthUserNamePassword) {
if (basicAuthUserNamePassword != null) {
if (basicAuthUserNamePassword.length > 0) {
this.basicAuthUser = basicAuthUserNamePassword[0];
}
if (basicAuthUserNamePassword.length > 1) {
this.basicAuthPassword = basicAuthUserNamePassword[1];
}
}
initializeState(configuration, baseUrl, null, null);
}
protected static UserGroupInformation getCurrentUGI() throws AtlasException {
try {
return UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new AtlasException(e);
}
}
public void setCookie(Cookie cookie) {
this.cookie = cookie;
}
public boolean isServerReady() throws AtlasServiceException {
WebResource resource = getResource(API_VERSION.getNormalizedPath());
try {
callAPIWithResource(API_VERSION, resource, null, JSONObject.class);
return true;
} catch (ClientHandlerException che) {
return false;
} catch (AtlasServiceException ase) {
if (ase.getStatus() != null && ase.getStatus().equals(ClientResponse.Status.SERVICE_UNAVAILABLE)) {
LOG.warn("Received SERVICE_UNAVAILABLE, server is not yet ready");
return false;
}
throw ase;
}
}
/**
* Return status of the service instance the client is pointing to.
*
* @return One of the values in ServiceState.ServiceStateValue or {@link #UNKNOWN_STATUS} if
* there is a JSON parse exception
* @throws AtlasServiceException if there is a HTTP error.
*/
public String getAdminStatus() throws AtlasServiceException {
String result = AtlasBaseClient.UNKNOWN_STATUS;
WebResource resource = getResource(service, API_STATUS.getNormalizedPath());
JSONObject response = callAPIWithResource(API_STATUS, resource, null, JSONObject.class);
try {
result = response.getString("Status");
} catch (JSONException e) {
LOG.error("Exception while parsing admin status response. Returned response {}", response.toString(), e);
}
return result;
}
/**
* @return Return metrics of the service instance the client is pointing to
* @throws AtlasServiceException
*/
public AtlasMetrics getAtlasMetrics() throws AtlasServiceException {
return callAPI(API_METRICS, AtlasMetrics.class, null);
}
public <T> T callAPI(API api, Class<T> responseType, Object requestObject, String... params)
throws AtlasServiceException {
return callAPIWithResource(api, getResource(api, params), requestObject, responseType);
}
public <T> T callAPI(API api, GenericType<T> responseType, Object requestObject, String... params)
throws AtlasServiceException {
return callAPIWithResource(api, getResource(api, params), requestObject, responseType);
}
public <T> T callAPI(API api, Class<T> responseType, Object requestBody,
MultivaluedMap<String, String> queryParams, String... params) throws AtlasServiceException {
WebResource resource = getResource(api, queryParams, params);
return callAPIWithResource(api, resource, requestBody, responseType);
}
public <T> T callAPI(API api, Class<T> responseType, MultivaluedMap<String, String> queryParams, String... params)
throws AtlasServiceException {
WebResource resource = getResource(api, queryParams, params);
return callAPIWithResource(api, resource, null, responseType);
}
public <T> T callAPI(API api, GenericType<T> responseType, MultivaluedMap<String, String> queryParams, String... params)
throws AtlasServiceException {
WebResource resource = getResource(api, queryParams, params);
return callAPIWithResource(api, resource, null, responseType);
}
public <T> T callAPI(API api, Class<T> responseType, MultivaluedMap<String, String> queryParams)
throws AtlasServiceException {
return callAPIWithResource(api, getResource(api, queryParams), null, responseType);
}
public <T> T callAPI(API api, Class<T> responseType, String queryParamKey, List<String> queryParamValues)
throws AtlasServiceException {
return callAPIWithResource(api, getResource(api, queryParamKey, queryParamValues), null, responseType);
}
@VisibleForTesting
protected Client getClient(Configuration configuration, UserGroupInformation ugi, String doAsUser) {
DefaultClientConfig config = new DefaultClientConfig();
// Enable POJO mapping feature
config.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, Boolean.TRUE);
config.getClasses().add(MultiPartWriter.class);
config.getClasses().add(JacksonJaxbJsonProvider.class);
int readTimeout = configuration.getInt("atlas.client.readTimeoutMSecs", 60000);
int connectTimeout = configuration.getInt("atlas.client.connectTimeoutMSecs", 60000);
if (configuration.getBoolean(TLS_ENABLED, false)) {
// create an SSL properties configuration if one doesn't exist. SSLFactory expects a file, so forced
// to create a
// configuration object, persist it, then subsequently pass in an empty configuration to SSLFactory
try {
SecureClientUtils.persistSSLClientConfiguration(configuration, System.getProperty("atlas.conf") );
} catch (Exception e) {
LOG.info("Error processing client configuration.", e);
}
}
final URLConnectionClientHandler handler;
clientUtils = new SecureClientUtils();
boolean isKerberosEnabled = AuthenticationUtil.isKerberosAuthenticationEnabled(ugi);
if (isKerberosEnabled) {
handler = clientUtils.getClientConnectionHandler(config, configuration, doAsUser, ugi);
} else {
if (configuration.getBoolean(TLS_ENABLED, false)) {
handler = clientUtils.getUrlConnectionClientHandler();
} else {
handler = new URLConnectionClientHandler();
}
}
Client client = new Client(handler, config);
client.setReadTimeout(readTimeout);
client.setConnectTimeout(connectTimeout);
return client;
}
public void close() {
if (clientUtils != null) {
clientUtils.destroyFactory();
}
}
@VisibleForTesting
protected String determineActiveServiceURL(String[] baseUrls, Client client) {
if (baseUrls.length == 0) {
throw new IllegalArgumentException("Base URLs cannot be null or empty");
}
final String baseUrl;
AtlasServerEnsemble atlasServerEnsemble = new AtlasServerEnsemble(baseUrls);
if (atlasServerEnsemble.hasSingleInstance()) {
baseUrl = atlasServerEnsemble.firstURL();
LOG.info("Client has only one service URL, will use that for all actions: {}", baseUrl);
} else {
try {
baseUrl = selectActiveServerAddress(client, atlasServerEnsemble);
} catch (AtlasServiceException e) {
LOG.error("None of the passed URLs are active: {}", atlasServerEnsemble, e);
throw new IllegalArgumentException("None of the passed URLs are active " + atlasServerEnsemble, e);
}
}
return baseUrl;
}
protected Configuration getClientProperties() {
try {
if (configuration == null) {
configuration = ApplicationProperties.get();
}
} catch (AtlasException e) {
LOG.error("Exception while loading configuration.", e);
}
return configuration;
}
protected WebResource getResource(String path, String... pathParams) {
return getResource(service, path, pathParams);
}
protected <T> T callAPIWithResource(API api, WebResource resource, Object requestObject, Class<T> responseType) throws AtlasServiceException {
GenericType<T> genericType = null;
if (responseType != null) {
genericType = new GenericType<>(responseType);
}
return callAPIWithResource(api, resource, requestObject, genericType);
}
protected <T> T callAPIWithResource(API api, WebResource resource, Object requestObject, GenericType<T> responseType) throws AtlasServiceException {
ClientResponse clientResponse = null;
int i = 0;
do {
LOG.info("------------------------------------------------------");
LOG.info("Call : {} {}", api.getMethod(), api.getNormalizedPath());
LOG.info("Content-type : {} ", api.getConsumes());
LOG.info("Accept : {} ", api.getProduces());
if (requestObject != null) {
LOG.info("Request : {}", requestObject);
}
WebResource.Builder requestBuilder = resource.getRequestBuilder();
// Set content headers
requestBuilder
.accept(api.getProduces())
.type(api.getConsumes());
// Set cookie if present
if (cookie != null) {
requestBuilder.cookie(cookie);
}
clientResponse = requestBuilder.method(api.getMethod(), ClientResponse.class, requestObject);
if (LOG.isDebugEnabled()) {
LOG.debug("API {} returned status {}", resource.getURI(), clientResponse.getStatus());
}
if (clientResponse.getStatus() == api.getExpectedStatus().getStatusCode()) {
if (responseType == null) {
return null;
}
try {
if(api.getProduces().equals(MediaType.APPLICATION_OCTET_STREAM)) {
return (T) clientResponse.getEntityInputStream();
} else if (responseType.getRawClass().equals(JSONObject.class)) {
String stringEntity = clientResponse.getEntity(String.class);
try {
JSONObject jsonObject = new JSONObject(stringEntity);
LOG.debug("Response = {}", jsonObject);
LOG.info("------------------------------------------------------");
return (T) jsonObject;
} catch (JSONException e) {
throw new AtlasServiceException(api, e);
}
} else {
T entity = clientResponse.getEntity(responseType);
LOG.debug("Response = {}", entity);
LOG.info("------------------------------------------------------");
return entity;
}
} catch (ClientHandlerException e) {
throw new AtlasServiceException(api, e);
}
} else if (clientResponse.getStatus() != ClientResponse.Status.SERVICE_UNAVAILABLE.getStatusCode()) {
break;
} else {
LOG.error("Got a service unavailable when calling: {}, will retry..", resource);
sleepBetweenRetries();
}
i++;
} while (i < getNumberOfRetries());
throw new AtlasServiceException(api, clientResponse);
}
protected WebResource getResource(API api, String... pathParams) {
return getResource(service, api, pathParams);
}
protected WebResource getResource(API api, MultivaluedMap<String, String> queryParams, String... pathParams) {
WebResource resource = service.path(api.getNormalizedPath());
resource = appendPathParams(resource, pathParams);
resource = appendQueryParams(queryParams, resource);
return resource;
}
protected WebResource getResource(API api, MultivaluedMap<String, String> queryParams) {
return getResource(service, api, queryParams);
}
protected abstract API formatPathParameters(API api, String... params);
void initializeState(String[] baseUrls, UserGroupInformation ugi, String doAsUser) {
initializeState(getClientProperties(), baseUrls, ugi, doAsUser);
}
void initializeState(Configuration configuration, String[] baseUrls, UserGroupInformation ugi, String doAsUser) {
this.configuration = configuration;
Client client = getClient(configuration, ugi, doAsUser);
if ((!AuthenticationUtil.isKerberosAuthenticationEnabled()) && basicAuthUser != null && basicAuthPassword != null) {
final HTTPBasicAuthFilter authFilter = new HTTPBasicAuthFilter(basicAuthUser, basicAuthPassword);
client.addFilter(authFilter);
}
String activeServiceUrl = determineActiveServiceURL(baseUrls, client);
atlasClientContext = new AtlasClientContext(baseUrls, client, ugi, doAsUser);
service = client.resource(UriBuilder.fromUri(activeServiceUrl).build());
}
void sleepBetweenRetries() {
try {
Thread.sleep(getSleepBetweenRetriesMs());
} catch (InterruptedException e) {
LOG.error("Interrupted from sleeping between retries.", e);
}
}
int getNumberOfRetries() {
return configuration.getInt(AtlasBaseClient.ATLAS_CLIENT_HA_RETRIES_KEY, AtlasBaseClient.DEFAULT_NUM_RETRIES);
}
public InputStream exportData(AtlasExportRequest request) throws AtlasServiceException {
try {
return (InputStream) callAPI(EXPORT, Object.class, request);
} catch (Exception e) {
LOG.error("error writing to file", e);
throw new AtlasServiceException(e);
}
}
public void exportData(AtlasExportRequest request, String absolutePath) throws AtlasServiceException {
OutputStream fileOutputStream = null;
try {
InputStream inputStream = exportData(request);
fileOutputStream = new FileOutputStream(new File(absolutePath));
byte[] buffer = new byte[8 * 1024];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
fileOutputStream.write(buffer, 0, bytesRead);
}
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(fileOutputStream);
} catch (Exception e) {
LOG.error("error writing to file", e);
throw new AtlasServiceException(e);
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
LOG.error("error closing file", e);
throw new AtlasServiceException(e);
}
}
}
}
public AtlasImportResult importData(AtlasImportRequest request, String absoluteFilePath) throws AtlasServiceException {
return performImportData(getImportRequestBodyPart(request),
new FileDataBodyPart(IMPORT_DATA_PARAMETER, new File(absoluteFilePath)));
}
public AtlasImportResult importData(AtlasImportRequest request, InputStream stream) throws AtlasServiceException {
return performImportData(getImportRequestBodyPart(request),
new StreamDataBodyPart(IMPORT_DATA_PARAMETER, stream));
}
private AtlasImportResult performImportData(BodyPart requestPart, BodyPart filePart) throws AtlasServiceException {
MultiPart multipartEntity = new FormDataMultiPart()
.bodyPart(requestPart)
.bodyPart(filePart);
return callAPI(IMPORT, AtlasImportResult.class, multipartEntity);
}
private FormDataBodyPart getImportRequestBodyPart(AtlasImportRequest request) {
return new FormDataBodyPart(IMPORT_REQUEST_PARAMTER, AtlasType.toJson(request), MediaType.APPLICATION_JSON_TYPE);
}
public AtlasServer getServer(String serverName) throws AtlasServiceException {
API api = new API(String.format(ADMIN_SERVER_TEMPLATE, BASE_URI, serverName), HttpMethod.GET, Response.Status.OK);
return callAPI(api, AtlasServer.class, null);
}
boolean isRetryableException(ClientHandlerException che) {
return che.getCause().getClass().equals(IOException.class)
|| che.getCause().getClass().equals(ConnectException.class);
}
void handleClientHandlerException(ClientHandlerException che) {
if (isRetryableException(che)) {
atlasClientContext.getClient().destroy();
LOG.warn("Destroyed current context while handling ClientHandlerEception.");
LOG.warn("Will retry and create new context.");
sleepBetweenRetries();
initializeState(atlasClientContext.getBaseUrls(), atlasClientContext.getUgi(),
atlasClientContext.getDoAsUser());
return;
}
throw che;
}
@VisibleForTesting
JSONObject callAPIWithRetries(API api, Object requestObject, ResourceCreator resourceCreator)
throws AtlasServiceException {
for (int i = 0; i < getNumberOfRetries(); i++) {
WebResource resource = resourceCreator.createResource();
try {
LOG.debug("Using resource {} for {} times", resource.getURI(), i + 1);
return callAPIWithResource(api, resource, requestObject, JSONObject.class);
} catch (ClientHandlerException che) {
if (i == (getNumberOfRetries() - 1)) {
throw che;
}
LOG.warn("Handled exception in calling api {}", api.getNormalizedPath(), che);
LOG.warn("Exception's cause: {}", che.getCause().getClass());
handleClientHandlerException(che);
}
}
throw new AtlasServiceException(api, new RuntimeException("Could not get response after retries."));
}
@VisibleForTesting
void setConfiguration(Configuration configuration) {
this.configuration = configuration;
}
@VisibleForTesting
void setService(WebResource resource) {
this.service = resource;
}
private String selectActiveServerAddress(Client client, AtlasServerEnsemble serverEnsemble)
throws AtlasServiceException {
List<String> serverInstances = serverEnsemble.getMembers();
String activeServerAddress = null;
for (String serverInstance : serverInstances) {
LOG.info("Trying with address {}", serverInstance);
activeServerAddress = getAddressIfActive(client, serverInstance);
if (activeServerAddress != null) {
LOG.info("Found service {} as active service.", serverInstance);
break;
}
}
if (activeServerAddress != null)
return activeServerAddress;
else
throw new AtlasServiceException(API_STATUS, new RuntimeException("Could not find any active instance"));
}
private String getAddressIfActive(Client client, String serverInstance) {
String activeServerAddress = null;
for (int i = 0; i < getNumberOfRetries(); i++) {
try {
service = client.resource(UriBuilder.fromUri(serverInstance).build());
String adminStatus = getAdminStatus();
if (StringUtils.equals(adminStatus, "ACTIVE")) {
activeServerAddress = serverInstance;
break;
} else {
LOG.info("attempt #{}: Service {} - is not active. status={}", (i + 1), serverInstance, adminStatus);
}
} catch (Exception e) {
LOG.error("attempt #{}: Service {} - could not get status", (i + 1), serverInstance, e);
}
sleepBetweenRetries();
}
return activeServerAddress;
}
private WebResource getResource(WebResource service, String path, String... pathParams) {
WebResource resource = service.path(path);
resource = appendPathParams(resource, pathParams);
return resource;
}
private int getSleepBetweenRetriesMs() {
return configuration.getInt(AtlasBaseClient.ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, AtlasBaseClient.DEFAULT_SLEEP_BETWEEN_RETRIES_MS);
}
// Modify URL to include the path params
private WebResource getResource(WebResource service, API api, String... pathParams) {
WebResource resource = service.path(api.getNormalizedPath());
resource = appendPathParams(resource, pathParams);
return resource;
}
private WebResource getResource(API api, String queryParamKey, List<String> queryParamValues) {
WebResource resource = service.path(api.getNormalizedPath());
for (String queryParamValue : queryParamValues) {
if (StringUtils.isNotBlank(queryParamKey) && StringUtils.isNotBlank(queryParamValue)) {
resource = resource.queryParam(queryParamKey, queryParamValue);
}
}
return resource;
}
private WebResource appendPathParams(WebResource resource, String[] pathParams) {
if (pathParams != null) {
for (String pathParam : pathParams) {
resource = resource.path(pathParam);
}
}
return resource;
}
// Modify URL to include the query params
private WebResource getResource(WebResource service, API api, MultivaluedMap<String, String> queryParams) {
WebResource resource = service.path(api.getNormalizedPath());
resource = appendQueryParams(queryParams, resource);
return resource;
}
private WebResource appendQueryParams(MultivaluedMap<String, String> queryParams, WebResource resource) {
if (null != queryParams && !queryParams.isEmpty()) {
for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
for (String value : entry.getValue()) {
if (StringUtils.isNotBlank(value)) {
resource = resource.queryParam(entry.getKey(), value);
}
}
}
}
return resource;
}
public static class API {
private final String method;
private final String path;
private final String consumes;
private final String produces;
private final Response.Status status;
public API(String path, String method, Response.Status status) {
this(path, method, status, JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON);
}
public API(String path, String method, Response.Status status, String consumes, String produces) {
this.path = path;
this.method = method;
this.status = status;
this.consumes = consumes;
this.produces = produces;
}
public String getMethod() {
return method;
}
public String getPath() {
return path;
}
public String getNormalizedPath() {
return Paths.get(path).normalize().toString();
}
public Response.Status getExpectedStatus() {
return status;
}
public String getConsumes() {
return consumes;
}
public String getProduces() {
return produces;
}
}
/**
* A class to capture input state while creating the client.
* <p>
* The information here will be reused when the client is re-initialized on switch-over
* in case of High Availability.
*/
private class AtlasClientContext {
private String[] baseUrls;
private Client client;
private String doAsUser;
private UserGroupInformation ugi;
public AtlasClientContext(String[] baseUrls, Client client, UserGroupInformation ugi, String doAsUser) {
this.baseUrls = baseUrls;
this.client = client;
this.ugi = ugi;
this.doAsUser = doAsUser;
}
public Client getClient() {
return client;
}
public String[] getBaseUrls() {
return baseUrls;
}
public String getDoAsUser() {
return doAsUser;
}
public UserGroupInformation getUgi() {
return ugi;
}
}
}