blob: 5058527983f72b144c855bb46c3624dfa436f446 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.stellar.dsl.functions;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.apache.metron.stellar.common.utils.ConversionUtils;
import org.apache.metron.stellar.common.utils.JSONUtils;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.ParseException;
import org.apache.metron.stellar.dsl.Stellar;
import org.apache.metron.stellar.dsl.StellarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static java.lang.String.format;
import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
import static org.apache.metron.stellar.dsl.functions.RestConfig.*;
/**
* Defines functions that enable REST requests with proper result and error handling. Depends on an
* Apache HttpComponents client being supplied as a Stellar HTTP_CLIENT capability. Exposes various Http settings
* including authentication, proxy and timeouts through the global config with the option to override any settings
* through a config object supplied in the expression.
*/
public class RestFunctions {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
* The CloseableHttpClient.
*/
private static CloseableHttpClient closeableHttpClient;
/**
* Executor used to impose a hard request timeout.
*/
private static ScheduledExecutorService scheduledExecutorService;
/**
* Initialize a single HttpClient to be shared by REST functions.
* @param context
*/
private static synchronized void initializeHttpClient(Context context) {
if (closeableHttpClient == null) {
closeableHttpClient = getHttpClient(context);
}
}
/**
* Close the shared HttpClient.
*/
private static synchronized void closeHttpClient() throws IOException {
if (closeableHttpClient != null) {
closeableHttpClient.close();
closeableHttpClient = null;
}
}
/**
* Initialize a single ExecutorService to be shared by REST functions.
*/
private static synchronized void initializeExecutorService() {
if (scheduledExecutorService == null) {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
}
}
/**
* Shutdown the shared ExecutorService.
*/
private static synchronized void closeExecutorService() {
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdown();
scheduledExecutorService = null;
}
}
@Stellar(
namespace = "REST",
name = "GET",
description = "Performs a REST GET request and parses the JSON results into a map.",
params = {
"url - URI to the REST service",
"rest_config - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter " +
"of the same name. Default is the empty Map, meaning no overrides.",
"query_parameters - Optional - Map (in curly braces) of name:value pairs that will be added to the request as query parameters"
},
returns = "JSON results as a Map")
public static class RestGet implements StellarFunction {
/**
* Whether the function has been initialized.
*/
private boolean initialized = false;
/**
* Initialize the function by creating a ScheduledExecutorService and looking up the CloseableHttpClient from the
* Stellar context.
* @param context
*/
@Override
public void initialize(Context context) {
initializeExecutorService();
initializeHttpClient(context);
initialized = true;
}
@Override
public boolean isInitialized() {
return initialized;
}
/**
* Apply the function.
* @param args The function arguments including uri and rest config.
* @param context Stellar context
*/
@Override
public Object apply(List<Object> args, Context context) throws ParseException {
String uriString = getArg(0, String.class, args);
Map<String, Object> functionRestConfig = null;
Map<String, Object> queryParameters = new HashMap<>();
if (args.size() > 1) {
functionRestConfig = getArg(1, Map.class, args);
if (args.size() == 3) {
queryParameters = getArg(2, Map.class, args);
}
}
// Build the RestConfig by applying settins in order of precedence
Map<String, Object> globalRestConfig = (Map<String, Object>) getGlobalConfig(context).get(STELLAR_REST_SETTINGS);
Map<String, Object> getRestConfig = (Map<String, Object>) getGlobalConfig(context).get(STELLAR_REST_GET_SETTINGS);
RestConfig restConfig = buildRestConfig(globalRestConfig, getRestConfig, functionRestConfig);
try {
HttpGet httpGet = buildGetRequest(uriString, queryParameters);
return executeRequest(restConfig, httpGet);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e.getMessage(), e);
} catch (IOException e) {
LOG.error(e.getMessage(), e);
return restConfig.getErrorValueOverride();
}
}
@Override
public void close() throws IOException {
closeHttpClient();
closeExecutorService();
}
private HttpGet buildGetRequest(String uri, Map<String, Object> queryParameters) throws URISyntaxException {
HttpGet httpGet = new HttpGet(getURI(uri, queryParameters));
httpGet.addHeader("Accept", "application/json");
return httpGet;
}
}
@Stellar(
namespace = "REST",
name = "POST",
description = "Performs a REST POST request and parses the JSON results into a map.",
params = {
"url - URI to the REST service",
"post_data - POST data that will be sent in the POST request. Must be well-formed JSON unless the 'enforce.json' property is set to false.",
"rest_config - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter " +
"of the same name. Default is the empty Map, meaning no overrides.",
"query_parameters - Optional - Map (in curly braces) of name:value pairs that will be added to the request as query parameters"
},
returns = "JSON results as a Map")
public static class RestPost implements StellarFunction {
/**
* Whether the function has been initialized.
*/
private boolean initialized = false;
/**
* Initialize the function by creating a ScheduledExecutorService and looking up the CloseableHttpClient from the
* Stellar context.
* @param context
*/
@Override
public void initialize(Context context) {
initializeExecutorService();
initializeHttpClient(context);
initialized = true;
}
@Override
public boolean isInitialized() {
return initialized;
}
@Override
public Object apply(List<Object> args, Context context) throws ParseException {
String uriString = getArg(0, String.class, args);
Object dataObject = getArg(1, Object.class, args);
Map<String, Object> functionRestConfig = null;
Map<String, Object> queryParameters = new HashMap<>();
if (args.size() > 2) {
functionRestConfig = getArg(2, Map.class, args);
if (args.size() == 4) {
queryParameters = getArg(3, Map.class, args);
}
}
// Build the RestConfig by applying settins in order of precedence
Map<String, Object> globalRestConfig = (Map<String, Object>) getGlobalConfig(context).get(STELLAR_REST_SETTINGS);
Map<String, Object> postRestConfig = (Map<String, Object>) getGlobalConfig(context).get(STELLAR_REST_POST_SETTINGS);
RestConfig restConfig = buildRestConfig(globalRestConfig, postRestConfig, functionRestConfig);
try {
HttpPost httpPost = buildPostRequest(restConfig, uriString, dataObject, queryParameters);
return executeRequest(restConfig, httpPost);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e.getMessage(), e);
} catch (IOException e) {
LOG.error(e.getMessage(), e);
return restConfig.getErrorValueOverride();
}
}
@Override
public void close() throws IOException {
closeHttpClient();
closeExecutorService();
}
private HttpPost buildPostRequest(RestConfig restConfig, String uriString, Object dataObject, Map<String, Object> queryParameters) throws JsonProcessingException, URISyntaxException, UnsupportedEncodingException {
String body = getPostData(restConfig, dataObject);
URI uri = getURI(uriString, queryParameters);
HttpPost httpPost = new HttpPost(uri);
httpPost.setEntity(new StringEntity(body));
httpPost.addHeader("Accept", "application/json");
httpPost.addHeader("Content-type", "application/json");
return httpPost;
}
/**
* Serializes the supplied POST data to be sent in the POST request. Checks for well-formed JSON by default unless 'enforce.json' is set to false.
* @param restConfig RestConfig
* @param arg POST data
* @return Serialized POST data
* @throws JsonProcessingException
*/
private String getPostData(RestConfig restConfig, Object arg) throws JsonProcessingException {
String data = "";
if (arg == null) {
return data;
}
if (arg instanceof Map) {
data = JSONUtils.INSTANCE.toJSON(arg, false);
} else {
data = arg.toString();
if (restConfig.enforceJson()) {
try {
JSONUtils.INSTANCE.toJSONObject(data);
} catch (org.json.simple.parser.ParseException e) {
throw new IllegalArgumentException(String.format("POST data '%s' must be properly formatted JSON. " +
"Set the '%s' property to false to disable this check.", data, RestConfig.ENFORCE_JSON));
}
}
}
return data;
}
}
/**
* Get an argument from a list of arguments.
*
* @param index The index within the list of arguments.
* @param clazz The type expected.
* @param args All of the arguments.
* @param <T> The type of the argument expected.
*/
public static <T> T getArg(int index, Class<T> clazz, List<Object> args) {
if(index >= args.size()) {
throw new IllegalArgumentException(format("Expected at least %d argument(s), found %d", index+1, args.size()));
}
return ConversionUtils.convert(args.get(index), clazz);
}
/**
* Retrieves the ClosableHttpClient from a pooling connection manager.
*
* @param context The execution context.
* @return A ClosableHttpClient.
*/
protected static CloseableHttpClient getHttpClient(Context context) {
RestConfig restConfig = buildRestConfig(getGlobalConfig(context));
PoolingHttpClientConnectionManager cm = getConnectionManager(restConfig);
return HttpClients.custom()
.setConnectionManager(cm)
.build();
}
protected static PoolingHttpClientConnectionManager getConnectionManager(RestConfig restConfig) {
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
if (restConfig.containsKey(POOLING_MAX_TOTAL)) {
cm.setMaxTotal(restConfig.getPoolingMaxTotal());
}
if (restConfig.containsKey(POOLING_DEFAULT_MAX_PER_RUOTE)) {
cm.setDefaultMaxPerRoute(restConfig.getPoolingDefaultMaxPerRoute());
}
return cm;
}
@SuppressWarnings("unchecked")
private static Map<String, Object> getGlobalConfig(Context context) {
Optional<Object> globalCapability = context.getCapability(GLOBAL_CONFIG, false);
return globalCapability.map(o -> (Map<String, Object>) o).orElseGet(HashMap::new);
}
/**
* Build the RestConfig by applying settings in order of precedence (last item in the input list has highest priority).
* Only settings specified in the rest config will override lower priority config settings.
* @param configs
* @return
* @throws IOException
*/
@SuppressWarnings("unchecked")
protected static RestConfig buildRestConfig(Map<String, Object>... configs) {
RestConfig restConfig = new RestConfig();
// Add settings in order of precedence
for(Map<String, Object> config: configs) {
if (config != null) {
restConfig.putAll(config);
}
}
return restConfig;
}
/**
* Builds a URI from the supplied URI string and adds query parameters.
* @param uriString
* @param queryParameters
* @return
* @throws URISyntaxException
*/
private static URI getURI(String uriString, Map<String, Object> queryParameters) throws URISyntaxException {
URIBuilder uriBuilder = new URIBuilder(uriString);
if (queryParameters != null) {
for(Map.Entry<String, Object> entry: queryParameters.entrySet()) {
uriBuilder.setParameter(entry.getKey(), (String) entry.getValue());
}
}
return uriBuilder.build();
}
/**
* Returns the proxy HttpHost object if the proxy rest config settings are set.
* @param restConfig
* @return
*/
protected static Optional<HttpHost> getProxy(RestConfig restConfig) {
Optional<HttpHost> proxy = Optional.empty();
if (restConfig.getProxyHost() != null && restConfig.getProxyPort() != null) {
proxy = Optional.of(new HttpHost(restConfig.getProxyHost(), restConfig.getProxyPort(), "http"));
}
return proxy;
}
/**
* Builds the RequestConfig object by setting HttpClient settings defined in the rest config.
* @param restConfig
* @param proxy
* @return
*/
protected static RequestConfig getRequestConfig(RestConfig restConfig, Optional<HttpHost> proxy) {
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
if (restConfig.getConnectTimeout() != null) {
requestConfigBuilder.setConnectTimeout(restConfig.getConnectTimeout());
}
if (restConfig.getConnectionRequestTimeout() != null) {
requestConfigBuilder.setConnectionRequestTimeout(restConfig.getConnectionRequestTimeout());
}
if (restConfig.getSocketTimeout() != null) {
requestConfigBuilder.setSocketTimeout(restConfig.getSocketTimeout());
}
proxy.ifPresent(requestConfigBuilder::setProxy);
return requestConfigBuilder.build();
}
/**
* Builds the HttpClientContext object by setting the basic auth and/or proxy basic auth credentials when the
* necessary rest config settings are configured. Passwords are stored in HDFS.
* @param restConfig
* @param target
* @param proxy
* @return
* @throws IOException
*/
protected static HttpClientContext getHttpClientContext(RestConfig restConfig, HttpHost target, Optional<HttpHost> proxy) throws IOException {
HttpClientContext httpClientContext = HttpClientContext.create();
boolean credentialsAdded = false;
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// Add the basic auth credentials if the rest config settings are present
if (restConfig.getBasicAuthUser() != null && restConfig.getBasicAuthPasswordPath() != null) {
String password = new String(readBytes(new Path(restConfig.getBasicAuthPasswordPath())), StandardCharsets.UTF_8);
credentialsProvider.setCredentials(
new AuthScope(target),
new UsernamePasswordCredentials(restConfig.getBasicAuthUser(), password));
credentialsAdded = true;
}
// Add the proxy basic auth credentials if the rest config settings are present
if (proxy.isPresent() && restConfig.getProxyBasicAuthUser() != null &&
restConfig.getProxyBasicAuthPasswordPath() != null) {
String password = new String(readBytes(new Path(restConfig.getProxyBasicAuthPasswordPath())), StandardCharsets.UTF_8);
credentialsProvider.setCredentials(
new AuthScope(proxy.get()),
new UsernamePasswordCredentials(restConfig.getProxyBasicAuthUser(), password));
credentialsAdded = true;
}
if (credentialsAdded) {
httpClientContext.setCredentialsProvider(credentialsProvider);
}
return httpClientContext;
}
/**
* Read bytes from a HDFS path.
* @param inPath
* @return
* @throws IOException
*/
private static byte[] readBytes(Path inPath) throws IOException {
FileSystem fs = FileSystem.get(inPath.toUri(), new Configuration());
try (FSDataInputStream inputStream = fs.open(inPath)) {
return IOUtils.toByteArray(inputStream);
}
}
/**
* Perform the HttpClient request and handle the results. A configurable list of status codes are accepted and the
* response content (expected to be json) is parsed into a Map. Values returned on errors and when response content
* is also configurable. The rest config "timeout" setting is imposed in this method and will abort the get request
* if exceeded.
*
* @param restConfig
* @param httpRequestBase
* @return
* @throws IOException
*/
protected static Object executeRequest(RestConfig restConfig, HttpRequestBase httpRequestBase) throws IOException {
URI uri = httpRequestBase.getURI();
HttpHost target = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
Optional<HttpHost> proxy = getProxy(restConfig);
HttpClientContext httpClientContext = getHttpClientContext(restConfig, target, proxy);
httpRequestBase.setConfig(getRequestConfig(restConfig, proxy));
// Schedule a command to abort the request if the timeout is exceeded
ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(httpRequestBase::abort, restConfig.getTimeout(), TimeUnit.MILLISECONDS);
CloseableHttpResponse response;
try {
response = closeableHttpClient.execute(httpRequestBase, httpClientContext);
} catch(Exception e) {
// Report a timeout if the httpGet request was aborted. Otherwise rethrow exception.
if (httpRequestBase.isAborted()) {
throw new IOException(String.format("Total Stellar REST request time to %s exceeded the configured timeout of %d ms.", httpRequestBase.getURI().toString(), restConfig.getTimeout()));
} else {
throw e;
}
}
// Cancel the future if the request finished within the timeout
if (!scheduledFuture.isDone()) {
scheduledFuture.cancel(true);
}
int statusCode = response.getStatusLine().getStatusCode();
LOG.debug("request = {}; response = {}", httpRequestBase, response);
if (restConfig.getResponseCodesAllowed().contains(statusCode)) {
HttpEntity httpEntity = response.getEntity();
// Parse the response if present, return the empty value override if not
Optional<Object> parsedResponse = parseResponse(restConfig, httpRequestBase, httpEntity);
return parsedResponse.orElseGet(restConfig::getEmptyContentOverride);
} else {
throw new IOException(String.format("Stellar REST request to %s expected status code to be one of %s but " +
"failed with http status code %d: %s",
httpRequestBase.getURI().toString(),
restConfig.getResponseCodesAllowed().toString(),
statusCode,
EntityUtils.toString(response.getEntity())));
}
}
/**
* Parses the Http response into a Map and checks for content length.
* @param restConfig
* @param httpUriRequest
* @param httpEntity
* @return
* @throws IOException
*/
protected static Optional<Object> parseResponse(RestConfig restConfig, HttpUriRequest httpUriRequest, HttpEntity httpEntity) throws IOException {
Optional<Object> parsedResponse = Optional.empty();
if (httpEntity != null) {
int actualContentLength = 0;
String json = EntityUtils.toString(httpEntity);
if (json != null && !json.isEmpty()) {
actualContentLength = json.length();
parsedResponse = Optional.of(JSONUtils.INSTANCE.load(json, JSONUtils.MAP_SUPPLIER));
}
if (restConfig.verifyContentLength() && actualContentLength != httpEntity.getContentLength()) {
throw new IOException(String.format("Stellar REST request to %s returned incorrect or missing content length. " +
"Content length in the response was %d but the actual body content length was %d.",
httpUriRequest.getURI().toString(),
httpEntity.getContentLength(),
actualContentLength));
}
}
return parsedResponse;
}
/**
* Only used for testing.
* @param httpClient
*/
protected static void setCloseableHttpClient(CloseableHttpClient httpClient) {
closeableHttpClient = httpClient;
}
/**
* Only used for testing.
* @param executorService
*/
protected static void setScheduledExecutorService(ScheduledExecutorService executorService) {
scheduledExecutorService = executorService;
}
}