blob: 354322a497547c5bac834bbf631a673986e62bfa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.stellar.dsl.functions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.apache.metron.stellar.common.utils.ConversionUtils;
import org.apache.metron.stellar.common.utils.JSONUtils;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.ParseException;
import org.apache.metron.stellar.dsl.Stellar;
import org.apache.metron.stellar.dsl.StellarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static java.lang.String.format;
import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
import static org.apache.metron.stellar.dsl.functions.RestConfig.POOLING_DEFAULT_MAX_PER_RUOTE;
import static org.apache.metron.stellar.dsl.functions.RestConfig.POOLING_MAX_TOTAL;
import static org.apache.metron.stellar.dsl.functions.RestConfig.STELLAR_REST_SETTINGS;
/**
* Defines functions that enable REST requests with proper result and error handling. Depends on an
* Apache HttpComponents client being supplied as a Stellar HTTP_CLIENT capability. Exposes various Http settings
* including authentication, proxy and timeouts through the global config with the option to override any settings
* through a config object supplied in the expression.
*/
public class RestFunctions {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
* Get an argument from a list of arguments.
*
* @param index The index within the list of arguments.
* @param clazz The type expected.
* @param args All of the arguments.
* @param <T> The type of the argument expected.
*/
public static <T> T getArg(int index, Class<T> clazz, List<Object> args) {
if(index >= args.size()) {
throw new IllegalArgumentException(format("Expected at least %d argument(s), found %d", index+1, args.size()));
}
return ConversionUtils.convert(args.get(index), clazz);
}
@Stellar(
namespace = "REST",
name = "GET",
description = "Performs a REST GET request and parses the JSON results into a map.",
params = {
"url - URI to the REST service",
"rest_config - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter " +
"of the same name. Default is the empty Map, meaning no overrides."
},
returns = "JSON results as a Map")
public static class RestGet implements StellarFunction {
/**
* Whether the function has been initialized.
*/
private boolean initialized = false;
/**
* The CloseableHttpClient.
*/
private CloseableHttpClient httpClient;
/**
* Executor used to impose a hard request timeout.
*/
private ScheduledExecutorService scheduledExecutorService;
/**
* Initialize the function by creating a ScheduledExecutorService and looking up the CloseableHttpClient from the
* Stellar context.
* @param context
*/
@Override
public void initialize(Context context) {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
httpClient = getHttpClient(context);
initialized = true;
}
@Override
public boolean isInitialized() {
return initialized;
}
/**
* Apply the function.
* @param args The function arguments including uri and rest config.
* @param context Stellar context
*/
@Override
public Object apply(List<Object> args, Context context) throws ParseException {
RestConfig restConfig = new RestConfig();
try {
URI uri = new URI(getArg(0, String.class, args));
restConfig = getRestConfig(args, getGlobalConfig(context));
HttpHost target = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
Optional<HttpHost> proxy = getProxy(restConfig);
HttpClientContext httpClientContext = getHttpClientContext(restConfig, target, proxy);
HttpGet httpGet = new HttpGet(uri);
httpGet.addHeader("Accept", "application/json");
httpGet.setConfig(getRequestConfig(restConfig, proxy));
return doGet(restConfig, httpGet, httpClientContext);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e.getMessage(), e);
} catch (IOException e) {
LOG.error(e.getMessage(), e);
return restConfig.getErrorValueOverride();
}
}
@Override
public void close() throws IOException {
if (httpClient != null) {
httpClient.close();
}
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdown();
}
}
/**
* Retrieves the ClosableHttpClient from a pooling connection manager.
*
* @param context The execution context.
* @return A ClosableHttpClient.
*/
protected CloseableHttpClient getHttpClient(Context context) {
RestConfig restConfig = getRestConfig(Collections.emptyList(), getGlobalConfig(context));
PoolingHttpClientConnectionManager cm = getConnectionManager(restConfig);
return HttpClients.custom()
.setConnectionManager(cm)
.build();
}
protected PoolingHttpClientConnectionManager getConnectionManager(RestConfig restConfig) {
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
if (restConfig.containsKey(POOLING_MAX_TOTAL)) {
cm.setMaxTotal(restConfig.getPoolingMaxTotal());
}
if (restConfig.containsKey(POOLING_DEFAULT_MAX_PER_RUOTE)) {
cm.setDefaultMaxPerRoute(restConfig.getPoolingDefaultMaxPerRoute());
}
return cm;
}
/**
* Only used for testing.
* @param httpClient
*/
protected void setHttpClient(CloseableHttpClient httpClient) {
this.httpClient = httpClient;
}
/**
* Perform the HttpClient get and handle the results. A configurable list of status codes are accepted and the
* response content (expected to be json) is parsed into a Map. Values returned on errors and when response content
* is also configurable. The rest config "timeout" setting is imposed in this method and will abort the get request
* if exceeded.
*
* @param restConfig
* @param httpGet
* @param httpClientContext
* @return
* @throws IOException
*/
protected Object doGet(RestConfig restConfig, HttpGet httpGet, HttpClientContext httpClientContext) throws IOException {
// Schedule a command to abort the httpGet request if the timeout is exceeded
ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(httpGet::abort, restConfig.getTimeout(), TimeUnit.MILLISECONDS);
CloseableHttpResponse response;
try {
response = httpClient.execute(httpGet, httpClientContext);
} catch(Exception e) {
// Report a timeout if the httpGet request was aborted. Otherwise rethrow exception.
if (httpGet.isAborted()) {
throw new IOException(String.format("Total Stellar REST request time to %s exceeded the configured timeout of %d ms.", httpGet.getURI().toString(), restConfig.getTimeout()));
} else {
throw e;
}
}
// Cancel the future if the request finished within the timeout
if (!scheduledFuture.isDone()) {
scheduledFuture.cancel(true);
}
int statusCode = response.getStatusLine().getStatusCode();
if (restConfig.getResponseCodesAllowed().contains(statusCode)) {
HttpEntity httpEntity = response.getEntity();
// Parse the reponse if present, return the empty value override if not
if (httpEntity != null && httpEntity.getContentLength() > 0) {
String json = EntityUtils.toString(response.getEntity());
return JSONUtils.INSTANCE.load(json, JSONUtils.MAP_SUPPLIER);
}
return restConfig.getEmptyContentOverride();
} else {
throw new IOException(String.format("Stellar REST request to %s expected status code to be one of %s but " +
"failed with http status code %d: %s",
httpGet.getURI().toString(),
restConfig.getResponseCodesAllowed().toString(),
statusCode,
EntityUtils.toString(response.getEntity())));
}
}
private Map<String, Object> getGlobalConfig(Context context) {
Optional<Object> globalCapability = context.getCapability(GLOBAL_CONFIG, false);
return globalCapability.map(o -> (Map<String, Object>) o).orElseGet(HashMap::new);
}
/**
* Build the RestConfig object using the following order of precedence:
* <ul>
* <li>rest config supplied as an expression parameter</li>
* <li>rest config stored in the global config</li>
* <li>default rest config</li>
* </ul>
* Only settings specified in the rest config will override lower priority config settings.
* @param args
* @param globalConfig
* @return
* @throws IOException
*/
protected RestConfig getRestConfig(List<Object> args, Map<String, Object> globalConfig) {
Map<String, Object> globalRestConfig = (Map<String, Object>) globalConfig.get(STELLAR_REST_SETTINGS);
Map<String, Object> functionRestConfig = null;
if (args.size() > 1) {
functionRestConfig = getArg(1, Map.class, args);
}
// Add settings in order of precedence
RestConfig restConfig = new RestConfig();
if (globalRestConfig != null) {
restConfig.putAll(globalRestConfig);
}
if (functionRestConfig != null) {
restConfig.putAll(functionRestConfig);
}
return restConfig;
}
/**
* Returns the proxy HttpHost object if the proxy rest config settings are set.
* @param restConfig
* @return
*/
protected Optional<HttpHost> getProxy(RestConfig restConfig) {
Optional<HttpHost> proxy = Optional.empty();
if (restConfig.getProxyHost() != null && restConfig.getProxyPort() != null) {
proxy = Optional.of(new HttpHost(restConfig.getProxyHost(), restConfig.getProxyPort(), "http"));
}
return proxy;
}
/**
* Builds the RequestConfig object by setting HttpClient settings defined in the rest config.
* @param restConfig
* @param proxy
* @return
*/
protected RequestConfig getRequestConfig(RestConfig restConfig, Optional<HttpHost> proxy) {
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
if (restConfig.getConnectTimeout() != null) {
requestConfigBuilder.setConnectTimeout(restConfig.getConnectTimeout());
}
if (restConfig.getConnectionRequestTimeout() != null) {
requestConfigBuilder.setConnectionRequestTimeout(restConfig.getConnectionRequestTimeout());
}
if (restConfig.getSocketTimeout() != null) {
requestConfigBuilder.setSocketTimeout(restConfig.getSocketTimeout());
}
proxy.ifPresent(requestConfigBuilder::setProxy);
return requestConfigBuilder.build();
}
/**
* Builds the HttpClientContext object by setting the basic auth and/or proxy basic auth credentials when the
* necessary rest config settings are configured. Passwords are stored in HDFS.
* @param restConfig
* @param target
* @param proxy
* @return
* @throws IOException
*/
protected HttpClientContext getHttpClientContext(RestConfig restConfig, HttpHost target, Optional<HttpHost> proxy) throws IOException {
HttpClientContext httpClientContext = HttpClientContext.create();
boolean credentialsAdded = false;
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// Add the basic auth credentials if the rest config settings are present
if (restConfig.getBasicAuthUser() != null && restConfig.getBasicAuthPasswordPath() != null) {
String password = new String(readBytes(new Path(restConfig.getBasicAuthPasswordPath())), StandardCharsets.UTF_8);
credentialsProvider.setCredentials(
new AuthScope(target),
new UsernamePasswordCredentials(restConfig.getBasicAuthUser(), password));
credentialsAdded = true;
}
// Add the proxy basic auth credentials if the rest config settings are present
if (proxy.isPresent() && restConfig.getProxyBasicAuthUser() != null &&
restConfig.getProxyBasicAuthPasswordPath() != null) {
String password = new String(readBytes(new Path(restConfig.getProxyBasicAuthPasswordPath())), StandardCharsets.UTF_8);
credentialsProvider.setCredentials(
new AuthScope(proxy.get()),
new UsernamePasswordCredentials(restConfig.getProxyBasicAuthUser(), password));
credentialsAdded = true;
}
if (credentialsAdded) {
httpClientContext.setCredentialsProvider(credentialsProvider);
}
return httpClientContext;
}
/**
* Read bytes from a HDFS path.
* @param inPath
* @return
* @throws IOException
*/
private byte[] readBytes(Path inPath) throws IOException {
FileSystem fs = FileSystem.get(inPath.toUri(), new Configuration());
try (FSDataInputStream inputStream = fs.open(inPath)) {
return IOUtils.toByteArray(inputStream);
}
}
}
}