METRON-2148 Stellar REST POST function (merrimanr) closes apache/metron#1440
diff --git a/metron-stellar/stellar-common/README.md b/metron-stellar/stellar-common/README.md
index 5e48b1c..9f1634b 100644
--- a/metron-stellar/stellar-common/README.md
+++ b/metron-stellar/stellar-common/README.md
@@ -963,6 +963,16 @@
   * Input:
     * url - URI to the REST service
     * rest_config - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter of the same name. Default is the empty Map, meaning no overrides.
+    * query_parameters - Optional - Map (in curly braces) of name:value pairs that will be added to the request as query parameters
+  * Returns: JSON results as a Map
+  
+### `REST_POST`
+  * Description: Performs a REST POST request and parses the JSON results into a map.
+  * Input:
+    * url - URI to the REST service
+    * post_data - POST data that will be sent in the POST request.  Must be well-formed JSON unless the 'enforce.json' property is set to false.
+    * rest_config - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter of the same name. Default is the empty Map, meaning no overrides.
+    * query_parameters - Optional - Map (in curly braces) of name:value pairs that will be added to the request as query parameters
   * Returns: JSON results as a Map
 
 ### `ROUND`
@@ -1649,15 +1659,64 @@
 
 ## Stellar REST Client
 
-Stellar provides a REST Client with the `REST_GET` function.  This function depends on the Apache HttComponents library for
-executing Http requests.  The syntax is:
+Stellar provides a REST Client with the `REST_GET` and `REST_POST` functions.  This function depends on the Apache HttComponents library for
+executing Http requests.  
+
+### REST GET Syntax
+The REST_GET function requires a URI along with an optional configuration and an optional map of query parameters.  The syntax is:
 ```
-REST_GET( uri , optional config )
+REST_GET( uri , optional config , optional query parameters )
+```
+
+### REST POST Syntax
+The REST_POST function requires a URI and POST data along with an optional configuration and an optional map of query parameters.  The syntax is:
+```
+REST_POST( uri , data, optional config , optional query parameters )
 ```
 
 ### Configuration
 
-The second argument is an optional Map of settings.  The following settings are available:
+Stellar REST functions can be configured several different ways.  Sensible defaults are set for applicable settings with the option to override settings at different levels.
+For REST_GET, configuration settings are applied in this order (last has highest priority):
+1. Default settings
+2. Settings stored in the Global Config for all Stellar REST functions
+3. Settings stored in the Global Config for all Stellar REST_GET calls
+4. Settings passed into the function call as an argument
+
+For REST_POST, configuration settings are applied in this order (last has highest priority):
+1. Default settings
+2. Settings stored in the Global Config for all Stellar REST functions
+3. Settings stored in the Global Config for all Stellar REST_POST calls
+4. Settings passed into the function call as an argument
+
+For example, assume the Global Config is set to:
+```
+{
+  "stellar.rest.settings": {
+    "proxy.basic.auth.user": "global_proxy_user",
+    "basic.auth.user": "global_user",
+    "empty.content.override": "global content override"
+  },
+  "stellar.rest.get.settings": {
+    "basic.auth.user": "rest_get_user",
+    "empty.content.override": "rest get content override"
+  }
+}
+```
+and the function call is:
+```
+REST_GET('some uri', { "empty.content.override": "function config override" } )
+```
+After the various settings are applied in order of priority, the final configuration is:
+```
+{
+  "proxy.basic.auth.user": "global_proxy_user",
+  "basic.auth.user": "rest_get_user",
+  "empty.content.override": "function config override"
+}
+```
+
+The following is a list of settings that are available:
 
 * basic.auth.user - User name for basic authentication.
 * basic.auth.password.path - Path to the basic authentication password file stored in HDFS.
@@ -1665,31 +1724,19 @@
 * proxy.port - Proxy port.
 * proxy.basic.auth.user - User name for proxy basic authentication.
 * proxy.basic.auth.password.path - Path to the proxy basic authentication password file stored in HDFS.
-* timeout - Stellar enforced hard timeout for the total request time. Defaults to 1000 ms.  HttpClient timeouts alone are insufficient to guarantee the hard timeout.
+* timeout - Stellar enforced hard timeout (in milliseconds) for the total request time. HttpClient timeouts alone are insufficient to guarantee the hard timeout. (Defaults to `1000`)
 * connect.timeout - Connect timeout exposed by the HttpClient object.
 * connection.request.timeout - Connection request timeout exposed by the HttpClient object.
 * socket.timeout - Socket timeout exposed by the HttpClient object.
-* response.codes.allowed - A list of response codes that are allowed.  All others will be treated as errors.  Defaults to `200`.
-* empty.content.override - The default value that will be returned on a successful request with empty content.  Defaults to null.
-* error.value.override - The default value that will be returned on an error.  Defaults to null.
+* response.codes.allowed - A list of response codes that are allowed.  All others will be treated as errors.  (Defaults to `200`)
+* empty.content.override - The default value that will be returned on a successful request with empty content.  (Defaults to null)
+* error.value.override - The default value that will be returned on an error.  (Defaults to null)
 * pooling.max.total - The maximum number of connections in the connection pool.
 * pooling.default.max.per.route - The default maximum number of connections per route in the connection pool.
+* verify.content.length - Setting this to true will verify the actual body content length equals the content length header. (Defaults to false)
+* enforce.json - Setting this to true will verify POST data is well-formed JSON. (Defaults to true)
 
-This Map of settings can also be stored in the global config `stellar.rest.settings` property.  For example, to configure basic authentication
-settings you would add this property to the global config:
-
-```
-{
-  "stellar.rest.settings": {
-    "basic.auth.user": "user",
-    "basic.auth.password.path": "/password/path"
-  }
-}
-```
-
-Any settings passed into the expression will take precedence over the global config settings.  The global config settings will take precedence over the defaults.
-
-For security purposes, passwords are read from a file in HDFS.  Passwords are read as is including any new lines or spaces. Be careful not to include these in the file unless they are specifically part of the password.
+For security purposes, all passwords are read from a file in HDFS.  Passwords are read as is including any new lines or spaces. Be careful not to include these in the file unless they are specifically part of the password.
 
 ### Security
 
@@ -1719,6 +1766,11 @@
 {args={}, headers={Accept=application/json, Accept-Encoding=gzip,deflate, Cache-Control=max-age=259200, Connection=close, Host=httpbin.org, User-Agent=Apache-HttpClient/4.3.2 (java 1.5)}, origin=127.0.0.1, 136.62.241.236, url=http://httpbin.org/get}
 ```
 
+Perform a POST request with additional query parameters:
+```
+
+```
+
 ### Latency
 
 Performing a REST request will introduce latency in a streaming pipeline.  Therefore this function should only be used for low volume telemetries that are unlikely to be
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/JSONUtils.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/JSONUtils.java
index 9fb1c3f..90620a8 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/JSONUtils.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/JSONUtils.java
@@ -116,6 +116,10 @@
    * Transforms a bean (aka POJO) to a JSONObject.
    */
   public JSONObject toJSONObject(Object o) throws JsonProcessingException, ParseException {
-    return (JSONObject) _parser.get().parse(toJSON(o, false));
+    return toJSONObject(toJSON(o, false));
+  }
+
+  public JSONObject toJSONObject(String json) throws ParseException {
+    return (JSONObject) _parser.get().parse(json);
   }
 }
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestConfig.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestConfig.java
index 610717e..a32faec 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestConfig.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestConfig.java
@@ -20,6 +20,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * A Map containing the Stellar REST settings.
@@ -32,6 +33,16 @@
   public final static String STELLAR_REST_SETTINGS = "stellar.rest.settings";
 
   /**
+   * A global config prefix used for storing Stellar REST GET settings.
+   */
+  public final static String STELLAR_REST_GET_SETTINGS = "stellar.rest.get.settings";
+
+  /**
+   * A global config prefix used for storing Stellar REST POST settings.
+   */
+  public final static String STELLAR_REST_POST_SETTINGS = "stellar.rest.post.settings";
+
+  /**
    * User name for basic authentication.
    */
   public final static String BASIC_AUTH_USER = "basic.auth.user";
@@ -99,14 +110,20 @@
   public final static String POOLING_DEFAULT_MAX_PER_RUOTE = "pooling.default.max.per.route";
 
   /**
-   * Setting this to true will verify the actual body content length equals the content length header
+   * Setting this to true will verify the actual body content length equals the content length header.
    */
   public final static String VERIFY_CONTENT_LENGTH = "verify.content.length";
 
+  /**
+   * Setting this to true will verify POST data is well-formed JSON.
+   */
+  public final static String ENFORCE_JSON = "enforce.json";
+
   public RestConfig() {
     put(TIMEOUT, 1000);
     put(RESPONSE_CODES_ALLOWED, Collections.singletonList(200));
     put(VERIFY_CONTENT_LENGTH, false);
+    put(ENFORCE_JSON, true);
   }
 
   public String getBasicAuthUser() {
@@ -173,4 +190,8 @@
   public Boolean verifyContentLength() {
     return (Boolean) get(VERIFY_CONTENT_LENGTH);
   }
+
+  public Boolean enforceJson() {
+    return (Boolean) get(ENFORCE_JSON);
+  }
 }
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestFunctions.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestFunctions.java
index d6b03ce..5058527 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestFunctions.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestFunctions.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.stellar.dsl.functions;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -28,9 +29,10 @@
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.*;
 import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
@@ -46,11 +48,11 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.lang.invoke.MethodHandles;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -62,9 +64,7 @@
 
 import static java.lang.String.format;
 import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.POOLING_DEFAULT_MAX_PER_RUOTE;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.POOLING_MAX_TOTAL;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.STELLAR_REST_SETTINGS;
+import static org.apache.metron.stellar.dsl.functions.RestConfig.*;
 
 /**
  * Defines functions that enable REST requests with proper result and error handling.  Depends on an
@@ -77,6 +77,250 @@
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   /**
+   * The CloseableHttpClient.
+   */
+  private static CloseableHttpClient closeableHttpClient;
+
+  /**
+   * Executor used to impose a hard request timeout.
+   */
+  private static ScheduledExecutorService scheduledExecutorService;
+
+  /**
+   * Initialize a single HttpClient to be shared by REST functions.
+   * @param context
+   */
+  private static synchronized void initializeHttpClient(Context context) {
+    if (closeableHttpClient == null) {
+      closeableHttpClient = getHttpClient(context);
+    }
+  }
+
+  /**
+   * Close the shared HttpClient.
+   */
+  private static synchronized void closeHttpClient() throws IOException {
+    if (closeableHttpClient != null) {
+      closeableHttpClient.close();
+      closeableHttpClient = null;
+    }
+  }
+
+  /**
+   * Initialize a single ExecutorService to be shared by REST functions.
+   */
+  private static synchronized void initializeExecutorService() {
+    if (scheduledExecutorService == null) {
+      scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+    }
+  }
+
+  /**
+   * Shutdown the shared ExecutorService.
+   */
+  private static synchronized void closeExecutorService() {
+    if (scheduledExecutorService != null) {
+      scheduledExecutorService.shutdown();
+      scheduledExecutorService = null;
+    }
+  }
+
+  @Stellar(
+          namespace = "REST",
+          name = "GET",
+          description = "Performs a REST GET request and parses the JSON results into a map.",
+          params = {
+                  "url - URI to the REST service",
+                  "rest_config - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter " +
+                          "of the same name. Default is the empty Map, meaning no overrides.",
+                  "query_parameters - Optional - Map (in curly braces) of name:value pairs that will be added to the request as query parameters"
+          },
+          returns = "JSON results as a Map")
+  public static class RestGet implements StellarFunction {
+
+    /**
+     * Whether the function has been initialized.
+     */
+    private boolean initialized = false;
+
+    /**
+     * Initialize the function by creating a ScheduledExecutorService and looking up the CloseableHttpClient from the
+     * Stellar context.
+     * @param context
+     */
+    @Override
+    public void initialize(Context context) {
+      initializeExecutorService();
+      initializeHttpClient(context);
+      initialized = true;
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return initialized;
+    }
+
+    /**
+     * Apply the function.
+     * @param args The function arguments including uri and rest config.
+     * @param context Stellar context
+     */
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      String uriString = getArg(0, String.class, args);
+      Map<String, Object> functionRestConfig = null;
+      Map<String, Object> queryParameters = new HashMap<>();
+      if (args.size() > 1) {
+        functionRestConfig = getArg(1, Map.class, args);
+        if (args.size() == 3) {
+          queryParameters = getArg(2, Map.class, args);
+        }
+      }
+
+      // Build the RestConfig by applying settins in order of precedence
+      Map<String, Object> globalRestConfig = (Map<String, Object>) getGlobalConfig(context).get(STELLAR_REST_SETTINGS);
+      Map<String, Object> getRestConfig = (Map<String, Object>) getGlobalConfig(context).get(STELLAR_REST_GET_SETTINGS);
+      RestConfig restConfig = buildRestConfig(globalRestConfig, getRestConfig, functionRestConfig);
+
+      try {
+        HttpGet httpGet = buildGetRequest(uriString, queryParameters);
+        return executeRequest(restConfig, httpGet);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage(), e);
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+        return restConfig.getErrorValueOverride();
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      closeHttpClient();
+      closeExecutorService();
+    }
+
+    private HttpGet buildGetRequest(String uri, Map<String, Object> queryParameters) throws URISyntaxException {
+      HttpGet httpGet = new HttpGet(getURI(uri, queryParameters));
+      httpGet.addHeader("Accept", "application/json");
+
+      return httpGet;
+    }
+  }
+
+  @Stellar(
+          namespace = "REST",
+          name = "POST",
+          description = "Performs a REST POST request and parses the JSON results into a map.",
+          params = {
+                  "url - URI to the REST service",
+                  "post_data - POST data that will be sent in the POST request.  Must be well-formed JSON unless the 'enforce.json' property is set to false.",
+                  "rest_config - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter " +
+                          "of the same name. Default is the empty Map, meaning no overrides.",
+                  "query_parameters - Optional - Map (in curly braces) of name:value pairs that will be added to the request as query parameters"
+
+          },
+          returns = "JSON results as a Map")
+  public static class RestPost implements StellarFunction {
+
+    /**
+     * Whether the function has been initialized.
+     */
+    private boolean initialized = false;
+
+    /**
+     * Initialize the function by creating a ScheduledExecutorService and looking up the CloseableHttpClient from the
+     * Stellar context.
+     * @param context
+     */
+    @Override
+    public void initialize(Context context) {
+      initializeExecutorService();
+      initializeHttpClient(context);
+      initialized = true;
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return initialized;
+    }
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      String uriString = getArg(0, String.class, args);
+      Object dataObject = getArg(1, Object.class, args);
+      Map<String, Object> functionRestConfig = null;
+      Map<String, Object> queryParameters = new HashMap<>();
+      if (args.size() > 2) {
+        functionRestConfig = getArg(2, Map.class, args);
+        if (args.size() == 4) {
+          queryParameters = getArg(3, Map.class, args);
+        }
+      }
+
+      // Build the RestConfig by applying settins in order of precedence
+      Map<String, Object> globalRestConfig = (Map<String, Object>) getGlobalConfig(context).get(STELLAR_REST_SETTINGS);
+      Map<String, Object> postRestConfig = (Map<String, Object>) getGlobalConfig(context).get(STELLAR_REST_POST_SETTINGS);
+      RestConfig restConfig = buildRestConfig(globalRestConfig, postRestConfig, functionRestConfig);
+
+      try {
+        HttpPost httpPost = buildPostRequest(restConfig, uriString, dataObject, queryParameters);
+        return executeRequest(restConfig, httpPost);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(e.getMessage(), e);
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+        return restConfig.getErrorValueOverride();
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      closeHttpClient();
+      closeExecutorService();
+    }
+
+    private HttpPost buildPostRequest(RestConfig restConfig, String uriString, Object dataObject, Map<String, Object> queryParameters) throws JsonProcessingException, URISyntaxException, UnsupportedEncodingException {
+      String body = getPostData(restConfig, dataObject);
+
+      URI uri = getURI(uriString, queryParameters);
+      HttpPost httpPost = new HttpPost(uri);
+      httpPost.setEntity(new StringEntity(body));
+      httpPost.addHeader("Accept", "application/json");
+      httpPost.addHeader("Content-type", "application/json");
+
+      return httpPost;
+    }
+
+    /**
+     * Serializes the supplied POST data to be sent in the POST request.  Checks for well-formed JSON by default unless 'enforce.json' is set to false.
+     * @param restConfig RestConfig
+     * @param arg POST data
+     * @return Serialized POST data
+     * @throws JsonProcessingException
+     */
+    private String getPostData(RestConfig restConfig, Object arg) throws JsonProcessingException {
+      String data = "";
+      if (arg == null) {
+        return data;
+      }
+      if (arg instanceof Map) {
+        data = JSONUtils.INSTANCE.toJSON(arg, false);
+      } else {
+        data = arg.toString();
+        if (restConfig.enforceJson()) {
+          try {
+            JSONUtils.INSTANCE.toJSONObject(data);
+          } catch (org.json.simple.parser.ParseException e) {
+            throw new IllegalArgumentException(String.format("POST data '%s' must be properly formatted JSON.  " +
+                    "Set the '%s' property to false to disable this check.", data, RestConfig.ENFORCE_JSON));
+          }
+        }
+      }
+      return data;
+    }
+  }
+
+  /**
    * Get an argument from a list of arguments.
    *
    * @param index The index within the list of arguments.
@@ -93,316 +337,257 @@
     return ConversionUtils.convert(args.get(index), clazz);
   }
 
-  @Stellar(
-          namespace = "REST",
-          name = "GET",
-          description = "Performs a REST GET request and parses the JSON results into a map.",
-          params = {
-                  "url - URI to the REST service",
-                  "rest_config - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter " +
-                          "of the same name. Default is the empty Map, meaning no overrides."
-          },
-          returns = "JSON results as a Map")
-  public static class RestGet implements StellarFunction {
+  /**
+   * Retrieves the ClosableHttpClient from a pooling connection manager.
+   *
+   * @param context The execution context.
+   * @return A ClosableHttpClient.
+   */
+  protected static CloseableHttpClient getHttpClient(Context context) {
+    RestConfig restConfig = buildRestConfig(getGlobalConfig(context));
 
-    /**
-     * Whether the function has been initialized.
-     */
-    private boolean initialized = false;
+    PoolingHttpClientConnectionManager cm = getConnectionManager(restConfig);
 
-    /**
-     * The CloseableHttpClient.
-     */
-    private CloseableHttpClient httpClient;
+    return HttpClients.custom()
+            .setConnectionManager(cm)
+            .build();
+  }
 
-    /**
-     * Executor used to impose a hard request timeout.
-     */
-    private ScheduledExecutorService scheduledExecutorService;
-
-    /**
-     * Initialize the function by creating a ScheduledExecutorService and looking up the CloseableHttpClient from the
-     * Stellar context.
-     * @param context
-     */
-    @Override
-    public void initialize(Context context) {
-      scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
-      httpClient = getHttpClient(context);
-      initialized = true;
+  protected static PoolingHttpClientConnectionManager getConnectionManager(RestConfig restConfig) {
+    PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
+    if (restConfig.containsKey(POOLING_MAX_TOTAL)) {
+      cm.setMaxTotal(restConfig.getPoolingMaxTotal());
     }
-
-    @Override
-    public boolean isInitialized() {
-      return initialized;
+    if (restConfig.containsKey(POOLING_DEFAULT_MAX_PER_RUOTE)) {
+      cm.setDefaultMaxPerRoute(restConfig.getPoolingDefaultMaxPerRoute());
     }
+    return cm;
+  }
 
-    /**
-     * Apply the function.
-     * @param args The function arguments including uri and rest config.
-     * @param context Stellar context
-     */
-    @Override
-    public Object apply(List<Object> args, Context context) throws ParseException {
-      RestConfig restConfig = new RestConfig();
-      try {
-        URI uri = new URI(getArg(0, String.class, args));
-        restConfig = getRestConfig(args, getGlobalConfig(context));
+  @SuppressWarnings("unchecked")
+  private static Map<String, Object> getGlobalConfig(Context context) {
+    Optional<Object> globalCapability = context.getCapability(GLOBAL_CONFIG, false);
+    return globalCapability.map(o -> (Map<String, Object>) o).orElseGet(HashMap::new);
+  }
 
-        HttpHost target = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
-        Optional<HttpHost> proxy = getProxy(restConfig);
-        HttpClientContext httpClientContext = getHttpClientContext(restConfig, target, proxy);
+  /**
+   * Build the RestConfig by applying settings in order of precedence (last item in the input list has highest priority).
+   * Only settings specified in the rest config will override lower priority config settings.
+   * @param configs
+   * @return
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  protected static RestConfig buildRestConfig(Map<String, Object>... configs) {
+    RestConfig restConfig = new RestConfig();
 
-        HttpGet httpGet = new HttpGet(uri);
-        httpGet.addHeader("Accept", "application/json");
-        httpGet.setConfig(getRequestConfig(restConfig, proxy));
-
-        return doGet(restConfig, httpGet, httpClientContext);
-      } catch (URISyntaxException e) {
-        throw new IllegalArgumentException(e.getMessage(), e);
-      } catch (IOException e) {
-        LOG.error(e.getMessage(), e);
-        return restConfig.getErrorValueOverride();
+    // Add settings in order of precedence
+    for(Map<String, Object> config: configs) {
+      if (config != null) {
+        restConfig.putAll(config);
       }
     }
+    return restConfig;
+  }
 
-    @Override
-    public void close() throws IOException {
-      if (httpClient != null) {
-        httpClient.close();
-      }
-      if (scheduledExecutorService != null) {
-        scheduledExecutorService.shutdown();
+  /**
+   * Builds a URI from the supplied URI string and adds query parameters.
+   * @param uriString
+   * @param queryParameters
+   * @return
+   * @throws URISyntaxException
+   */
+  private static URI getURI(String uriString, Map<String, Object> queryParameters) throws URISyntaxException {
+    URIBuilder uriBuilder = new URIBuilder(uriString);
+    if (queryParameters != null) {
+      for(Map.Entry<String, Object> entry: queryParameters.entrySet()) {
+        uriBuilder.setParameter(entry.getKey(), (String) entry.getValue());
       }
     }
+    return uriBuilder.build();
+  }
 
-    /**
-     * Retrieves the ClosableHttpClient from a pooling connection manager.
-     *
-     * @param context The execution context.
-     * @return A ClosableHttpClient.
-     */
-    protected CloseableHttpClient getHttpClient(Context context) {
-      RestConfig restConfig = getRestConfig(Collections.emptyList(), getGlobalConfig(context));
+  /**
+   * Returns the proxy HttpHost object if the proxy rest config settings are set.
+   * @param restConfig
+   * @return
+   */
+  protected static Optional<HttpHost> getProxy(RestConfig restConfig) {
+    Optional<HttpHost> proxy = Optional.empty();
+    if (restConfig.getProxyHost() != null && restConfig.getProxyPort() != null) {
+      proxy = Optional.of(new HttpHost(restConfig.getProxyHost(), restConfig.getProxyPort(), "http"));
+    }
+    return proxy;
+  }
 
-      PoolingHttpClientConnectionManager cm = getConnectionManager(restConfig);
-
-      return HttpClients.custom()
-              .setConnectionManager(cm)
-              .build();
+  /**
+   * Builds the RequestConfig object by setting HttpClient settings defined in the rest config.
+   * @param restConfig
+   * @param proxy
+   * @return
+   */
+  protected static RequestConfig getRequestConfig(RestConfig restConfig, Optional<HttpHost> proxy) {
+    RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
+    if (restConfig.getConnectTimeout() != null) {
+      requestConfigBuilder.setConnectTimeout(restConfig.getConnectTimeout());
+    }
+    if (restConfig.getConnectionRequestTimeout() != null) {
+      requestConfigBuilder.setConnectionRequestTimeout(restConfig.getConnectionRequestTimeout());
+    }
+    if (restConfig.getSocketTimeout() != null) {
+      requestConfigBuilder.setSocketTimeout(restConfig.getSocketTimeout());
     }
 
-    protected PoolingHttpClientConnectionManager getConnectionManager(RestConfig restConfig) {
-      PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
-      if (restConfig.containsKey(POOLING_MAX_TOTAL)) {
-        cm.setMaxTotal(restConfig.getPoolingMaxTotal());
-      }
-      if (restConfig.containsKey(POOLING_DEFAULT_MAX_PER_RUOTE)) {
-        cm.setDefaultMaxPerRoute(restConfig.getPoolingDefaultMaxPerRoute());
-      }
-      return cm;
+    proxy.ifPresent(requestConfigBuilder::setProxy);
+    return requestConfigBuilder.build();
+  }
+
+  /**
+   * Builds the HttpClientContext object by setting the basic auth and/or proxy basic auth credentials when the
+   * necessary rest config settings are configured.  Passwords are stored in HDFS.
+   * @param restConfig
+   * @param target
+   * @param proxy
+   * @return
+   * @throws IOException
+   */
+  protected static HttpClientContext getHttpClientContext(RestConfig restConfig, HttpHost target, Optional<HttpHost> proxy) throws IOException {
+    HttpClientContext httpClientContext = HttpClientContext.create();
+    boolean credentialsAdded = false;
+    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+
+    // Add the basic auth credentials if the rest config settings are present
+    if (restConfig.getBasicAuthUser() != null && restConfig.getBasicAuthPasswordPath() != null) {
+      String password = new String(readBytes(new Path(restConfig.getBasicAuthPasswordPath())), StandardCharsets.UTF_8);
+      credentialsProvider.setCredentials(
+              new AuthScope(target),
+              new UsernamePasswordCredentials(restConfig.getBasicAuthUser(), password));
+      credentialsAdded = true;
     }
 
-    /**
-     * Only used for testing.
-     * @param httpClient
-     */
-    protected void setHttpClient(CloseableHttpClient httpClient) {
-      this.httpClient = httpClient;
+    // Add the proxy basic auth credentials if the rest config settings are present
+    if (proxy.isPresent() && restConfig.getProxyBasicAuthUser() != null &&
+            restConfig.getProxyBasicAuthPasswordPath() != null) {
+      String password = new String(readBytes(new Path(restConfig.getProxyBasicAuthPasswordPath())), StandardCharsets.UTF_8);
+      credentialsProvider.setCredentials(
+              new AuthScope(proxy.get()),
+              new UsernamePasswordCredentials(restConfig.getProxyBasicAuthUser(), password));
+      credentialsAdded = true;
     }
+    if (credentialsAdded) {
+      httpClientContext.setCredentialsProvider(credentialsProvider);
+    }
+    return httpClientContext;
+  }
 
-    /**
-     * Perform the HttpClient get and handle the results.  A configurable list of status codes are accepted and the
-     * response content (expected to be json) is parsed into a Map.  Values returned on errors and when response content
-     * is also configurable.  The rest config "timeout" setting is imposed in this method and will abort the get request
-     * if exceeded.
-     *
-     * @param restConfig
-     * @param httpGet
-     * @param httpClientContext
-     * @return
-     * @throws IOException
-     */
-    protected Object doGet(RestConfig restConfig, HttpGet httpGet, HttpClientContext httpClientContext) throws IOException {
+  /**
+   * Read bytes from a HDFS path.
+   * @param inPath
+   * @return
+   * @throws IOException
+   */
+  private static byte[] readBytes(Path inPath) throws IOException {
+    FileSystem fs = FileSystem.get(inPath.toUri(), new Configuration());
+    try (FSDataInputStream inputStream = fs.open(inPath)) {
+      return IOUtils.toByteArray(inputStream);
+    }
+  }
 
-      // Schedule a command to abort the httpGet request if the timeout is exceeded
-      ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(httpGet::abort, restConfig.getTimeout(), TimeUnit.MILLISECONDS);
-      CloseableHttpResponse response;
-      try {
-        response = httpClient.execute(httpGet, httpClientContext);
-      } catch(Exception e) {
-        // Report a timeout if the httpGet request was aborted.  Otherwise rethrow exception.
-        if (httpGet.isAborted()) {
-          throw new IOException(String.format("Total Stellar REST request time to %s exceeded the configured timeout of %d ms.", httpGet.getURI().toString(), restConfig.getTimeout()));
-        } else {
-          throw e;
-        }
-      }
+  /**
+   * Perform the HttpClient request and handle the results.  A configurable list of status codes are accepted and the
+   * response content (expected to be json) is parsed into a Map.  Values returned on errors and when response content
+   * is also configurable.  The rest config "timeout" setting is imposed in this method and will abort the get request
+   * if exceeded.
+   *
+   * @param restConfig
+   * @param httpRequestBase
+   * @return
+   * @throws IOException
+   */
+  protected static Object executeRequest(RestConfig restConfig, HttpRequestBase httpRequestBase) throws IOException {
+    URI uri = httpRequestBase.getURI();
+    HttpHost target = new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
+    Optional<HttpHost> proxy = getProxy(restConfig);
+    HttpClientContext httpClientContext = getHttpClientContext(restConfig, target, proxy);
+    httpRequestBase.setConfig(getRequestConfig(restConfig, proxy));
 
-      // Cancel the future if the request finished within the timeout
-      if (!scheduledFuture.isDone()) {
-        scheduledFuture.cancel(true);
-      }
-      int statusCode = response.getStatusLine().getStatusCode();
-      LOG.debug("request = {}; response = {}", httpGet, response);
-      if (restConfig.getResponseCodesAllowed().contains(statusCode)) {
-        HttpEntity httpEntity = response.getEntity();
-
-        // Parse the response if present, return the empty value override if not
-        Optional<Object> parsedResponse = parseResponse(restConfig, httpGet, httpEntity);
-        return parsedResponse.orElseGet(restConfig::getEmptyContentOverride);
+    // Schedule a command to abort the request if the timeout is exceeded
+    ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(httpRequestBase::abort, restConfig.getTimeout(), TimeUnit.MILLISECONDS);
+    CloseableHttpResponse response;
+    try {
+      response = closeableHttpClient.execute(httpRequestBase, httpClientContext);
+    } catch(Exception e) {
+      // Report a timeout if the httpGet request was aborted.  Otherwise rethrow exception.
+      if (httpRequestBase.isAborted()) {
+        throw new IOException(String.format("Total Stellar REST request time to %s exceeded the configured timeout of %d ms.", httpRequestBase.getURI().toString(), restConfig.getTimeout()));
       } else {
-        throw new IOException(String.format("Stellar REST request to %s expected status code to be one of %s but " +
-                "failed with http status code %d: %s",
-                httpGet.getURI().toString(),
-                restConfig.getResponseCodesAllowed().toString(),
-                statusCode,
-                EntityUtils.toString(response.getEntity())));
+        throw e;
       }
     }
 
-    @SuppressWarnings("unchecked")
-    private Map<String, Object> getGlobalConfig(Context context) {
-      Optional<Object> globalCapability = context.getCapability(GLOBAL_CONFIG, false);
-      return globalCapability.map(o -> (Map<String, Object>) o).orElseGet(HashMap::new);
+    // Cancel the future if the request finished within the timeout
+    if (!scheduledFuture.isDone()) {
+      scheduledFuture.cancel(true);
     }
+    int statusCode = response.getStatusLine().getStatusCode();
+    LOG.debug("request = {}; response = {}", httpRequestBase, response);
+    if (restConfig.getResponseCodesAllowed().contains(statusCode)) {
+      HttpEntity httpEntity = response.getEntity();
 
-    /**
-     * Build the RestConfig object using the following order of precedence:
-     * <ul>
-     *   <li>rest config supplied as an expression parameter</li>
-     *   <li>rest config stored in the global config</li>
-     *   <li>default rest config</li>
-     * </ul>
-     * Only settings specified in the rest config will override lower priority config settings.
-     * @param args
-     * @param globalConfig
-     * @return
-     * @throws IOException
-     */
-    @SuppressWarnings("unchecked")
-    protected RestConfig getRestConfig(List<Object> args, Map<String, Object> globalConfig) {
-      Map<String, Object> globalRestConfig = (Map<String, Object>) globalConfig.get(STELLAR_REST_SETTINGS);
-      Map<String, Object> functionRestConfig = null;
-      if (args.size() > 1) {
-        functionRestConfig = getArg(1, Map.class, args);
-      }
-
-      // Add settings in order of precedence
-      RestConfig restConfig = new RestConfig();
-      if (globalRestConfig != null) {
-        restConfig.putAll(globalRestConfig);
-      }
-      if (functionRestConfig != null) {
-        restConfig.putAll(functionRestConfig);
-      }
-      return restConfig;
+      // Parse the response if present, return the empty value override if not
+      Optional<Object> parsedResponse = parseResponse(restConfig, httpRequestBase, httpEntity);
+      return parsedResponse.orElseGet(restConfig::getEmptyContentOverride);
+    } else {
+      throw new IOException(String.format("Stellar REST request to %s expected status code to be one of %s but " +
+                      "failed with http status code %d: %s",
+              httpRequestBase.getURI().toString(),
+              restConfig.getResponseCodesAllowed().toString(),
+              statusCode,
+              EntityUtils.toString(response.getEntity())));
     }
+  }
 
-    /**
-     * Returns the proxy HttpHost object if the proxy rest config settings are set.
-     * @param restConfig
-     * @return
-     */
-    protected Optional<HttpHost> getProxy(RestConfig restConfig) {
-      Optional<HttpHost> proxy = Optional.empty();
-      if (restConfig.getProxyHost() != null && restConfig.getProxyPort() != null) {
-        proxy = Optional.of(new HttpHost(restConfig.getProxyHost(), restConfig.getProxyPort(), "http"));
+  /**
+   * Parses the Http response into a Map and checks for content length.
+   * @param restConfig
+   * @param httpUriRequest
+   * @param httpEntity
+   * @return
+   * @throws IOException
+   */
+  protected static Optional<Object> parseResponse(RestConfig restConfig, HttpUriRequest httpUriRequest, HttpEntity httpEntity) throws IOException {
+    Optional<Object> parsedResponse = Optional.empty();
+    if (httpEntity != null) {
+      int actualContentLength = 0;
+      String json = EntityUtils.toString(httpEntity);
+      if (json != null && !json.isEmpty()) {
+        actualContentLength = json.length();
+        parsedResponse = Optional.of(JSONUtils.INSTANCE.load(json, JSONUtils.MAP_SUPPLIER));
       }
-      return proxy;
-    }
-
-    /**
-     * Builds the RequestConfig object by setting HttpClient settings defined in the rest config.
-     * @param restConfig
-     * @param proxy
-     * @return
-     */
-    protected RequestConfig getRequestConfig(RestConfig restConfig, Optional<HttpHost> proxy) {
-      RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
-      if (restConfig.getConnectTimeout() != null) {
-        requestConfigBuilder.setConnectTimeout(restConfig.getConnectTimeout());
-      }
-      if (restConfig.getConnectionRequestTimeout() != null) {
-        requestConfigBuilder.setConnectionRequestTimeout(restConfig.getConnectionRequestTimeout());
-      }
-      if (restConfig.getSocketTimeout() != null) {
-        requestConfigBuilder.setSocketTimeout(restConfig.getSocketTimeout());
-      }
-
-      proxy.ifPresent(requestConfigBuilder::setProxy);
-      return requestConfigBuilder.build();
-    }
-
-    /**
-     * Builds the HttpClientContext object by setting the basic auth and/or proxy basic auth credentials when the
-     * necessary rest config settings are configured.  Passwords are stored in HDFS.
-     * @param restConfig
-     * @param target
-     * @param proxy
-     * @return
-     * @throws IOException
-     */
-    protected HttpClientContext getHttpClientContext(RestConfig restConfig, HttpHost target, Optional<HttpHost> proxy) throws IOException {
-      HttpClientContext httpClientContext = HttpClientContext.create();
-      boolean credentialsAdded = false;
-      CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-
-      // Add the basic auth credentials if the rest config settings are present
-      if (restConfig.getBasicAuthUser() != null && restConfig.getBasicAuthPasswordPath() != null) {
-        String password = new String(readBytes(new Path(restConfig.getBasicAuthPasswordPath())), StandardCharsets.UTF_8);
-        credentialsProvider.setCredentials(
-                new AuthScope(target),
-                new UsernamePasswordCredentials(restConfig.getBasicAuthUser(), password));
-        credentialsAdded = true;
-      }
-
-      // Add the proxy basic auth credentials if the rest config settings are present
-      if (proxy.isPresent() && restConfig.getProxyBasicAuthUser() != null &&
-              restConfig.getProxyBasicAuthPasswordPath() != null) {
-        String password = new String(readBytes(new Path(restConfig.getProxyBasicAuthPasswordPath())), StandardCharsets.UTF_8);
-        credentialsProvider.setCredentials(
-                new AuthScope(proxy.get()),
-                new UsernamePasswordCredentials(restConfig.getProxyBasicAuthUser(), password));
-        credentialsAdded = true;
-      }
-      if (credentialsAdded) {
-        httpClientContext.setCredentialsProvider(credentialsProvider);
-      }
-      return httpClientContext;
-    }
-
-    protected Optional<Object> parseResponse(RestConfig restConfig, HttpGet httpGet, HttpEntity httpEntity) throws IOException {
-      Optional<Object> parsedResponse = Optional.empty();
-      if (httpEntity != null) {
-        int actualContentLength = 0;
-        String json = EntityUtils.toString(httpEntity);
-        if (json != null && !json.isEmpty()) {
-          actualContentLength = json.length();
-          parsedResponse = Optional.of(JSONUtils.INSTANCE.load(json, JSONUtils.MAP_SUPPLIER));
-        }
-        if (restConfig.verifyContentLength() && actualContentLength != httpEntity.getContentLength()) {
-          throw new IOException(String.format("Stellar REST request to %s returned incorrect or missing content length. " +
-                          "Content length in the response was %d but the actual body content length was %d.",
-                  httpGet.getURI().toString(),
-                  httpEntity.getContentLength(),
-                  actualContentLength));
-        }
-      }
-      return parsedResponse;
-    }
-
-    /**
-     * Read bytes from a HDFS path.
-     * @param inPath
-     * @return
-     * @throws IOException
-     */
-    private byte[] readBytes(Path inPath) throws IOException {
-      FileSystem fs = FileSystem.get(inPath.toUri(), new Configuration());
-      try (FSDataInputStream inputStream = fs.open(inPath)) {
-        return IOUtils.toByteArray(inputStream);
+      if (restConfig.verifyContentLength() && actualContentLength != httpEntity.getContentLength()) {
+        throw new IOException(String.format("Stellar REST request to %s returned incorrect or missing content length. " +
+                        "Content length in the response was %d but the actual body content length was %d.",
+                httpUriRequest.getURI().toString(),
+                httpEntity.getContentLength(),
+                actualContentLength));
       }
     }
+    return parsedResponse;
+  }
+
+  /**
+   * Only used for testing.
+   * @param httpClient
+   */
+  protected static void setCloseableHttpClient(CloseableHttpClient httpClient) {
+    closeableHttpClient = httpClient;
+  }
+
+  /**
+   * Only used for testing.
+   * @param executorService
+   */
+  protected static void setScheduledExecutorService(ScheduledExecutorService executorService) {
+    scheduledExecutorService = executorService;
   }
 }
diff --git a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsIntegrationTest.java b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsIntegrationTest.java
new file mode 100644
index 0000000..865453e
--- /dev/null
+++ b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsIntegrationTest.java
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.stellar.dsl.functions;
+
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.io.FileUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.mockserver.client.server.MockServerClient;
+import org.mockserver.junit.MockServerRule;
+import org.mockserver.junit.ProxyRule;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.metron.stellar.common.utils.StellarProcessorUtils.run;
+import static org.apache.metron.stellar.dsl.functions.RestConfig.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+public class RestFunctionsIntegrationTest {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Rule
+  public TemporaryFolder tempDir = new TemporaryFolder();
+
+  @Rule
+  public MockServerRule mockServerRule = new MockServerRule(this);
+
+  @Rule
+  public ProxyRule proxyRule = new ProxyRule(1080, this);
+
+  private MockServerClient mockServerClient;
+  private String baseUri;
+  private String getUri;
+  private String emptyGetUri;
+  private String postUri;
+  private String emptyPostUri;
+  private Context context;
+
+  private File basicAuthPasswordFile;
+  private String basicAuthPassword = "password";
+  private File proxyBasicAuthPasswordFile;
+  private String proxyAuthPassword = "proxyPassword";
+
+  @Before
+  public void setup() throws Exception {
+    context = new Context.Builder()
+            .with(Context.Capabilities.GLOBAL_CONFIG, HashMap::new)
+            .build();
+
+    // Store the passwords in the local file system
+    basicAuthPasswordFile = tempDir.newFile("basicAuth.txt");
+    FileUtils.writeStringToFile(basicAuthPasswordFile, basicAuthPassword, StandardCharsets.UTF_8);
+    proxyBasicAuthPasswordFile = tempDir.newFile("proxyBasicAuth.txt");
+    FileUtils.writeStringToFile(proxyBasicAuthPasswordFile, proxyAuthPassword, StandardCharsets.UTF_8);
+
+    // By default, the mock server expects a GET request with the path set to /get
+    baseUri = String.format("http://localhost:%d", mockServerRule.getPort());
+    getUri = baseUri + "/get";
+    emptyGetUri = baseUri + "/get/empty";
+    postUri = baseUri + "/post";
+    emptyPostUri = baseUri + "/post/empty";
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get"))
+            .respond(response()
+                    .withBody("{\"get\":\"success\"}"));
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get/empty"))
+            .respond(response()
+                    .withStatusCode(404));
+
+    mockServerClient.when(
+            request()
+                    .withMethod("POST")
+                    .withPath("/post")
+                    .withBody("{\"key\":\"value\"}"))
+            .respond(response()
+                    .withBody("{\"post\":\"success\"}"));
+    mockServerClient.when(
+            request()
+                    .withMethod("POST")
+                    .withPath("/post/empty"))
+            .respond(response()
+                    .withStatusCode(404));
+  }
+
+  /**
+   * The REST_GET function should perform a get request and parse the results.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void restGetShouldSucceed() throws Exception {
+    Map<String, Object> actual = (Map<String, Object>) run(String.format("REST_GET('%s')", getUri), context);
+
+    assertEquals(1, actual.size());
+    assertEquals("success", actual.get("get"));
+  }
+
+  /**
+   * The REST_GET function should perform a get request and parse the results.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void restGetShouldSucceedWithQueryParameters() throws Exception {
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get/with/query/parameters")
+                    .withQueryStringParameter("key", "value"))
+            .respond(response()
+                    .withBody("{\"get.with.query.parameters\":\"success\"}"));
+
+    Map<String, Object> variables = ImmutableMap.of("queryParameters", ImmutableMap.of("key", "value"));
+    Map<String, Object> actual = (Map<String, Object>) run(String.format("REST_GET('%s', {}, queryParameters)",
+            baseUri + "/get/with/query/parameters"), variables, context);
+
+    assertEquals(1, actual.size());
+    assertEquals("success", actual.get("get.with.query.parameters"));
+  }
+
+  /**
+   * The REST_GET function should perform a get request using a proxy and parse the results.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void restGetShouldSucceedWithProxy() {
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get"))
+            .respond(response()
+                    .withBody("{\"proxyGet\":\"success\"}"));
+
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> new HashMap<String, Object>() {{
+      put(PROXY_HOST, "localhost");
+      put(PROXY_PORT, proxyRule.getHttpPort());
+    }});
+
+    Map<String, Object> actual = (Map<String, Object>) run(String.format("REST_GET('%s')", getUri), context);
+
+    assertEquals(1, actual.size());
+    assertEquals("success", actual.get("proxyGet"));
+  }
+
+  /**
+   * The REST_GET function should handle an error status code and return null by default.
+   */
+  @Test
+  public void restGetShouldHandleErrorStatusCode() {
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get"))
+            .respond(response()
+                    .withStatusCode(403));
+
+    assertNull(run(String.format("REST_GET('%s')", getUri), context));
+  }
+
+  /**
+   * {
+   *   "response.codes.allowed": [200,404],
+   *   "empty.content.override": "function config override"
+   * }
+   */
+  @Multiline
+  private String emptyContentOverride;
+
+  /**
+   * The REST_GET function should return the empty content override setting when status is allowed and content is empty.
+   */
+  @Test
+  public void restGetShouldReturnEmptyContentOverride() {
+    assertEquals("function config override", run(String.format("REST_GET('%s', %s)", emptyGetUri, emptyContentOverride), context));
+  }
+
+  /**
+   * {
+   *   "error.value.override": "error message"
+   * }
+   */
+  @Multiline
+  private String errorValueOverride;
+
+  /**
+   * The REST_GET function should return the error value override setting on error.
+   */
+  @Test
+  public void restGetShouldReturnErrorValueOverride() {
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get"))
+            .respond(response()
+                    .withStatusCode(500));
+
+    Object result = run(String.format("REST_GET('%s', %s)", getUri, errorValueOverride), context);
+    assertEquals("error message" , result);
+  }
+
+  /**
+   * The REST_GET function should timeout and return null.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void restGetShouldTimeout() {
+    String uri = String.format("http://localhost:%d/get", mockServerRule.getPort());
+
+    mockServerClient.when(
+            request()
+                    .withMethod("GET")
+                    .withPath("/get"))
+            .respond(response()
+                    .withBody("{\"get\":\"success\"}"));
+
+    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
+      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
+        put(TIMEOUT, 1);
+      }});
+    }};
+
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
+
+    Map<String, Object> actual = (Map<String, Object>) run(String.format("REST_GET('%s')", uri), context);
+    assertNull(actual);
+  }
+
+  /**
+   * {
+   * "timeout": 1
+   * }
+   */
+  @Multiline
+  private String timeoutConfig;
+
+  /**
+   * The REST_GET function should honor the function supplied timeout setting.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void restGetShouldTimeoutWithSuppliedTimeout() {
+    String expression = String.format("REST_GET('%s', %s)", getUri, timeoutConfig);
+    Map<String, Object> actual = (Map<String, Object>) run(expression, context);
+    assertNull(actual);
+  }
+
+  /**
+   * The REST_GET function should throw an exception on a malformed uri.
+   * @throws IllegalArgumentException
+   * @throws IOException
+   */
+  @Test
+  public void restGetShouldHandleURISyntaxException() throws IllegalArgumentException, IOException {
+    thrown.expect(ParseException.class);
+    thrown.expectMessage("Unable to parse REST_GET('some invalid uri'): Unable to parse: REST_GET('some invalid uri') due to: Illegal character in path at index 4: some invalid uri");
+
+    run("REST_GET('some invalid uri')", context);
+  }
+
+
+
+  /**
+   * The REST_GET function should throw an exception when the required uri parameter is missing.
+   */
+  @Test
+  public void restGetShouldThrownExceptionOnMissingParameter() {
+    thrown.expect(ParseException.class);
+    thrown.expectMessage("Unable to parse REST_GET(): Unable to parse: REST_GET() due to: Expected at least 1 argument(s), found 0");
+
+    run("REST_GET()", context);
+  }
+
+  /**
+   * Global config Stellar REST settings should take precedence over defaults in the REST_GET function.
+   */
+  @Test
+  public void restGetShouldUseGlobalConfig() {
+    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
+      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
+        put(RESPONSE_CODES_ALLOWED, Arrays.asList(200, 404));
+        put(EMPTY_CONTENT_OVERRIDE, "global config override");
+      }});
+    }};
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
+
+    assertEquals("global config override", run(String.format("REST_GET('%s')", emptyGetUri), context));
+  }
+
+  /**
+   * Global config Stellar REST GET settings should take precedence over general Stellar REST settings in the REST_GET function.
+   */
+  @Test
+  public void restGetShouldUseGetConfig() {
+    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
+      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
+        put(RESPONSE_CODES_ALLOWED, Arrays.asList(200, 404));
+        put(EMPTY_CONTENT_OVERRIDE, "global config override");
+      }});
+      put(STELLAR_REST_GET_SETTINGS, new HashMap<String, Object>() {{
+        put(EMPTY_CONTENT_OVERRIDE, "get config override");
+      }});
+    }};
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
+
+    assertEquals("get config override", run(String.format("REST_GET('%s')", emptyGetUri), context));
+  }
+
+  /**
+   * Settings passed into the function should take precedence over all other settings in the REST_GET function.
+   */
+  @Test
+  public void restGetShouldUseFunctionConfig() {
+    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
+      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
+        put(RESPONSE_CODES_ALLOWED, Arrays.asList(200, 404));
+        put(EMPTY_CONTENT_OVERRIDE, "global config override");
+      }});
+      put(STELLAR_REST_GET_SETTINGS, new HashMap<String, Object>() {{
+        put(EMPTY_CONTENT_OVERRIDE, "get config override");
+      }});
+    }};
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
+
+    assertEquals("function config override", run(String.format("REST_GET('%s', %s)", emptyGetUri, emptyContentOverride), context));
+  }
+
+  /**
+   * The REST_POST function should perform a get request and parse the results.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void restPostShouldSucceed() throws Exception {
+    Map<String, Object> actual = (Map<String, Object>) run(String.format("REST_POST('%s', '{\"key\":\"value\"}')", postUri), context);
+
+    assertEquals(1, actual.size());
+    assertEquals("success", actual.get("post"));
+  }
+
+  /**
+   * The REST_POST function should perform a get request and parse the results.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void restPostShouldSucceedWithQueryParameters() throws Exception {
+    mockServerClient.when(
+            request()
+                    .withMethod("POST")
+                    .withPath("/post/with/query/parameters")
+                    .withQueryStringParameter("key", "value"))
+            .respond(response()
+                    .withBody("{\"post.with.query.parameters\":\"success\"}"));
+
+    Map<String, Object> variables = ImmutableMap.of("queryParameters", ImmutableMap.of("key", "value"));
+    Map<String, Object> actual = (Map<String, Object>) run(String.format("REST_POST('%s', {}, {}, queryParameters)",
+            baseUri + "/post/with/query/parameters"), variables, context);
+
+    assertEquals(1, actual.size());
+    assertEquals("success", actual.get("post.with.query.parameters"));
+  }
+
+  /**
+   * The REST_POST function should perform a get request and parse the results.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void restPostShouldSucceedWithStellarMap() throws Exception {
+    Map<String, Object> variables = ImmutableMap.of("body", ImmutableMap.of("key", "value"));
+    Map<String, Object> actual = (Map<String, Object>) run(String.format("REST_POST('%s', body)", postUri), variables, context);
+
+    assertEquals(1, actual.size());
+    assertEquals("success", actual.get("post"));
+  }
+
+  /**
+   * The REST_POST function should throw an exception on a malformed uri.
+   * @throws IllegalArgumentException
+   * @throws IOException
+   */
+  @Test
+  public void restPostShouldHandleURISyntaxException() throws IllegalArgumentException, IOException {
+    thrown.expect(ParseException.class);
+    thrown.expectMessage("Unable to parse REST_POST('some invalid uri', {}): Unable to parse: REST_POST('some invalid uri', {}) due to: Illegal character in path at index 4: some invalid uri");
+
+    run("REST_POST('some invalid uri', {})", context);
+  }
+
+  /**
+   * The REST_POST function should throw an exception when POST data is not well-formed JSON and 'enforce.json' is set to true.
+   * @throws IllegalArgumentException
+   * @throws IOException
+   */
+  @Test
+  public void restPostShouldThrowExceptionOnMalformedJson() throws IllegalArgumentException, IOException {
+    thrown.expect(ParseException.class);
+    thrown.expectMessage(String.format("Unable to parse: REST_POST('%s', 'malformed json') due to: POST data 'malformed json' must be properly formatted JSON.  " +
+            "Set the 'enforce.json' property to false to disable this check.", postUri));
+
+    run(String.format("REST_POST('%s', 'malformed json')", postUri), context);
+  }
+
+  /**
+   * Global config Stellar REST settings should take precedence over defaults in the REST_POST function.
+   */
+  @Test
+  public void restPostShouldUseGlobalConfig() {
+    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
+      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
+        put(RESPONSE_CODES_ALLOWED, Arrays.asList(200, 404));
+        put(EMPTY_CONTENT_OVERRIDE, "global config override");
+      }});
+    }};
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
+
+    assertEquals("global config override", run(String.format("REST_POST('%s', {})", emptyGetUri), context));
+  }
+
+  /**
+   * Global config Stellar REST POST settings should take precedence over general Stellar REST settings in the REST_POST function.
+   */
+  @Test
+  public void restPostShouldUseGetConfig() {
+    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
+      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
+        put(RESPONSE_CODES_ALLOWED, Arrays.asList(200, 404));
+        put(EMPTY_CONTENT_OVERRIDE, "global config override");
+      }});
+      put(STELLAR_REST_POST_SETTINGS, new HashMap<String, Object>() {{
+        put(EMPTY_CONTENT_OVERRIDE, "post config override");
+      }});
+    }};
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
+
+    assertEquals("post config override", run(String.format("REST_POST('%s', {})", emptyGetUri), context));
+  }
+
+  /**
+   * Settings passed into the function should take precedence over all other settings in the REST_POST function.
+   */
+  @Test
+  public void restPostShouldUseFunctionConfig() {
+    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
+      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
+        put(RESPONSE_CODES_ALLOWED, Arrays.asList(200, 404));
+        put(EMPTY_CONTENT_OVERRIDE, "global config override");
+      }});
+      put(STELLAR_REST_POST_SETTINGS, new HashMap<String, Object>() {{
+        put(EMPTY_CONTENT_OVERRIDE, "post config override");
+      }});
+    }};
+    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
+
+    assertEquals("function config override", run(String.format("REST_POST('%s', {}, %s)", emptyGetUri, emptyContentOverride), context));
+  }
+
+}
diff --git a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsTest.java b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsTest.java
index 2008a95..a746321 100644
--- a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsTest.java
+++ b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/RestFunctionsTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.metron.stellar.dsl.functions;
 
-import org.adrianwalker.multilinestring.Multiline;
 import org.apache.commons.io.FileUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHost;
@@ -26,60 +25,34 @@
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.client.protocol.HttpClientContext;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.ParseException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
-import org.mockserver.client.server.MockServerClient;
-import org.mockserver.junit.MockServerRule;
-import org.mockserver.junit.ProxyRule;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
 
-import static org.apache.metron.stellar.common.utils.StellarProcessorUtils.run;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.BASIC_AUTH_PASSWORD_PATH;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.BASIC_AUTH_USER;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.CONNECTION_REQUEST_TIMEOUT;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.CONNECT_TIMEOUT;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.POOLING_DEFAULT_MAX_PER_RUOTE;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.POOLING_MAX_TOTAL;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.PROXY_BASIC_AUTH_PASSWORD_PATH;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.PROXY_BASIC_AUTH_USER;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.PROXY_HOST;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.PROXY_PORT;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.SOCKET_TIMEOUT;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.STELLAR_REST_SETTINGS;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.TIMEOUT;
-import static org.apache.metron.stellar.dsl.functions.RestConfig.VERIFY_CONTENT_LENGTH;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.apache.metron.stellar.dsl.functions.RestConfig.*;
+import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-import static org.mockserver.model.HttpRequest.request;
-import static org.mockserver.model.HttpResponse.response;
+import static org.mockito.Mockito.*;
 
 /**
  * Tests the RestFunctions class.
@@ -90,18 +63,13 @@
   public ExpectedException thrown = ExpectedException.none();
 
   @Rule
-  public MockServerRule mockServerRule = new MockServerRule(this);
+  public TemporaryFolder tempDir = new TemporaryFolder();
 
-  @Rule
-  public ProxyRule proxyRule = new ProxyRule(1080, this);
-
-  private MockServerClient mockServerClient;
-  private String getUri;
   private Context context;
 
-  private String basicAuthPasswordPath = "./target/basicAuth.txt";
+  private File basicAuthPasswordFile;
   private String basicAuthPassword = "password";
-  private String proxyBasicAuthPasswordPath = "./target/proxyBasicAuth.txt";
+  private File proxyBasicAuthPasswordFile;
   private String proxyAuthPassword = "proxyPassword";
 
   @Before
@@ -111,116 +79,10 @@
             .build();
 
     // Store the passwords in the local file system
-    FileUtils.writeStringToFile(new File(basicAuthPasswordPath), basicAuthPassword, StandardCharsets.UTF_8);
-    FileUtils.writeStringToFile(new File(proxyBasicAuthPasswordPath), proxyAuthPassword, StandardCharsets.UTF_8);
-
-    // By default, the mock server expects a GET request with the path set to /get
-    getUri = String.format("http://localhost:%d/get", mockServerRule.getPort());
-    mockServerClient.when(
-            request()
-                    .withMethod("GET")
-                    .withPath("/get"))
-            .respond(response()
-                    .withBody("{\"get\":\"success\"}"));
-  }
-
-  /**
-   * The REST_GET function should perform a get request and parse the results.
-   */
-  @Test
-  @SuppressWarnings("unchecked")
-  public void restGetShouldSucceed() throws Exception {
-    Map<String, Object> actual = (Map<String, Object>) run(String.format("REST_GET('%s')", getUri), context);
-
-    assertEquals(1, actual.size());
-    assertEquals("success", actual.get("get"));
-  }
-
-  /**
-   * The REST_GET function should perform a get request using a proxy and parse the results.
-   */
-  @Test
-  @SuppressWarnings("unchecked")
-  public void restGetShouldSucceedWithProxy() {
-    mockServerClient.when(
-            request()
-                    .withMethod("GET")
-                    .withPath("/get"))
-            .respond(response()
-                    .withBody("{\"proxyGet\":\"success\"}"));
-
-    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> new HashMap<String, Object>() {{
-      put(PROXY_HOST, "localhost");
-      put(PROXY_PORT, proxyRule.getHttpPort());
-    }});
-
-    Map<String, Object> actual = (Map<String, Object>) run(String.format("REST_GET('%s')", getUri), context);
-
-    assertEquals(1, actual.size());
-    assertEquals("success", actual.get("proxyGet"));
-  }
-
-  /**
-   * The REST_GET function should handle an error status code and return null by default.
-   */
-  @Test
-  public void restGetShouldHandleErrorStatusCode() {
-      mockServerClient.when(
-              request()
-                      .withMethod("GET")
-                      .withPath("/get"))
-              .respond(response()
-                      .withStatusCode(403));
-
-      assertNull(run(String.format("REST_GET('%s')", getUri), context));
-  }
-
-  /**
-   * {
-   *   "response.codes.allowed": [200,404],
-   *   "empty.content.override": {}
-   * }
-   */
-  @Multiline
-  private String emptyContentOverride;
-
-  /**
-   * The REST_GET function should return the empty content override setting when status is allowed and content is empty.
-   */
-  @Test
-  public void restGetShouldReturnEmptyContentOverride() {
-      mockServerClient.when(
-              request()
-                      .withMethod("GET")
-                      .withPath("/get"))
-              .respond(response()
-                      .withStatusCode(404));
-
-    assertEquals(new HashMap<>(), run(String.format("REST_GET('%s', %s)", getUri, emptyContentOverride), context));
-  }
-
-  /**
-   * {
-   *   "error.value.override": "error message"
-   * }
-   */
-  @Multiline
-  private String errorValueOverride;
-
-  /**
-   * The REST_GET function should return the error value override setting on error.
-   */
-  @Test
-  public void restGetShouldReturnErrorValueOverride() {
-    mockServerClient.when(
-            request()
-                    .withMethod("GET")
-                    .withPath("/get"))
-            .respond(response()
-                    .withStatusCode(500));
-
-    Object result = run(String.format("REST_GET('%s', %s)", getUri, errorValueOverride), context);
-    assertEquals("error message" , result);
+    basicAuthPasswordFile = tempDir.newFile("basicAuth.txt");
+    FileUtils.writeStringToFile(basicAuthPasswordFile, basicAuthPassword, StandardCharsets.UTF_8);
+    proxyBasicAuthPasswordFile = tempDir.newFile("proxyBasicAuth.txt");
+    FileUtils.writeStringToFile(proxyBasicAuthPasswordFile, proxyAuthPassword, StandardCharsets.UTF_8);
   }
 
   /**
@@ -228,11 +90,9 @@
    */
   @Test
   public void restGetShouldGetProxy() {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
-
     {
       RestConfig restConfig = new RestConfig();
-      Optional<HttpHost> actual = restGet.getProxy(restConfig);
+      Optional<HttpHost> actual = RestFunctions.getProxy(restConfig);
 
       assertEquals(Optional.empty(), actual);
     }
@@ -240,15 +100,15 @@
     {
       RestConfig restConfig = new RestConfig();
       restConfig.put(PROXY_HOST, "localhost");
-      Optional<HttpHost> actual = restGet.getProxy(restConfig);
+      Optional<HttpHost> actual = RestFunctions.getProxy(restConfig);
 
       assertEquals(Optional.empty(), actual);
     }
 
     {
       RestConfig restConfig = new RestConfig();
-      restConfig.put(PROXY_PORT, proxyRule.getHttpPort());
-      Optional<HttpHost> actual = restGet.getProxy(restConfig);
+      restConfig.put(PROXY_PORT, 3128);
+      Optional<HttpHost> actual = RestFunctions.getProxy(restConfig);
 
       assertEquals(Optional.empty(), actual);
     }
@@ -256,125 +116,36 @@
     {
       RestConfig restConfig = new RestConfig();
       restConfig.put(PROXY_HOST, "localhost");
-      restConfig.put(PROXY_PORT, proxyRule.getHttpPort());
-      Optional<HttpHost> actual = restGet.getProxy(restConfig);
+      restConfig.put(PROXY_PORT, 3128);
+      Optional<HttpHost> actual = RestFunctions.getProxy(restConfig);
 
-      assertEquals(new HttpHost("localhost", proxyRule.getHttpPort()), actual.get());
+      assertEquals(new HttpHost("localhost", 3128), actual.get());
     }
   }
 
   /**
-   * The REST_GET function should return settings in the correct order of precedence.
+   * RestConfig should be built with settings in the correct order of precedence.
    * @throws Exception
    */
   @Test
-  public void restGetShouldGetRestConfig() throws Exception {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
-
-    {
-      // Test for default timeout
-      RestConfig restConfig = restGet.getRestConfig(Collections.singletonList("uri"), new HashMap<>());
-
-      assertEquals(3, restConfig.size());
-      assertEquals(1000, restConfig.getTimeout().intValue());
-      assertEquals(Collections.singletonList(200), restConfig.getResponseCodesAllowed());
-      assertNull(restConfig.getBasicAuthUser());
-    }
-
-    Map<String, Object> globalRestConfig = new HashMap<String, Object>() {{
-      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
-        put(SOCKET_TIMEOUT, 2000);
-        put(BASIC_AUTH_USER, "globalUser");
-        put(PROXY_HOST, "globalHost");
-      }});
+  public void restShouldBuildRestConfig() throws Exception {
+    Map<String, Object> config = new HashMap<String, Object>() {{
+      put(BASIC_AUTH_USER, "user");
+      put(PROXY_BASIC_AUTH_USER, "proxyUser");
     }};
 
-    // Global config settings should take effect
-    {
-      RestConfig restConfig = restGet.getRestConfig(Collections.singletonList("uri"), globalRestConfig);
-
-      assertEquals(6, restConfig.size());
-      assertEquals(1000, restConfig.getTimeout().intValue());
-      assertEquals(Collections.singletonList(200), restConfig.getResponseCodesAllowed());
-      assertEquals(2000, restConfig.getSocketTimeout().intValue());
-      assertEquals("globalUser", restConfig.getBasicAuthUser());
-      assertEquals("globalHost", restConfig.getProxyHost());
-    }
-
-    Map<String, Object> functionRestConfig = new HashMap<String, Object>() {{
-      put(SOCKET_TIMEOUT, 1);
-      put(BASIC_AUTH_USER, "functionUser");
-      put(TIMEOUT, 100);
+    Map<String, Object> priorityConfig = new HashMap<String, Object>() {{
+      put(BASIC_AUTH_USER, "priorityUser");
     }};
 
-
-    // Function call settings should override global settings
-    {
-      RestConfig restConfig = restGet.getRestConfig(Arrays.asList("uri", functionRestConfig), globalRestConfig);
-
-      assertEquals(6, restConfig.size());
-      assertEquals(Collections.singletonList(200), restConfig.getResponseCodesAllowed());
-      assertEquals(100, restConfig.getTimeout().intValue());
-      assertEquals(1, restConfig.getSocketTimeout().intValue());
-      assertEquals("functionUser", restConfig.getBasicAuthUser());
-      assertEquals("globalHost", restConfig.getProxyHost());
-    }
-
-    functionRestConfig = new HashMap<String, Object>() {{
-      put(BASIC_AUTH_USER, "functionUser");
-      put(TIMEOUT, 100);
-    }};
-
-    // New function call settings should take effect with global settings staying the same
-    {
-      RestConfig restConfig = restGet.getRestConfig(Arrays.asList("uri", functionRestConfig), globalRestConfig);
-
-      assertEquals(6, restConfig.size());
-      assertEquals(Collections.singletonList(200), restConfig.getResponseCodesAllowed());
-      assertEquals(100, restConfig.getTimeout().intValue());
-      assertEquals(2000, restConfig.getSocketTimeout().intValue());
-      assertEquals("functionUser", restConfig.getBasicAuthUser());
-      assertEquals("globalHost", restConfig.getProxyHost());
-    }
-
-    globalRestConfig = new HashMap<String, Object>() {{
-      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
-        put(SOCKET_TIMEOUT, 2000);
-        put(BASIC_AUTH_USER, "globalUser");
-      }});
-    }};
-
-    // New global settings should take effect with function call settings staying the same
-    {
-      RestConfig restConfig = restGet.getRestConfig(Arrays.asList("uri", functionRestConfig), globalRestConfig);
-
-      assertEquals(5, restConfig.size());
-      assertEquals(Collections.singletonList(200), restConfig.getResponseCodesAllowed());
-      assertEquals(100, restConfig.getTimeout().intValue());
-      assertEquals(2000, restConfig.getSocketTimeout().intValue());
-      assertEquals("functionUser", restConfig.getBasicAuthUser());
-    }
-
-    // Should fall back to global settings on missing function call config
-    {
-      RestConfig restConfig = restGet.getRestConfig(Collections.singletonList("uri"), globalRestConfig);
-
-      assertEquals(5, restConfig.size());
-      assertEquals(Collections.singletonList(200), restConfig.getResponseCodesAllowed());
-      assertEquals(1000, restConfig.getTimeout().intValue());
-      assertEquals(2000, restConfig.getSocketTimeout().intValue());
-      assertEquals("globalUser", restConfig.getBasicAuthUser());
-    }
-
-    // Should fall back to default settings on missing global settings
-    {
-      RestConfig restConfig = restGet.getRestConfig(Collections.singletonList("uri"), new HashMap<>());
-
-      assertEquals(3, restConfig.size());
-      assertEquals(Collections.singletonList(200), restConfig.getResponseCodesAllowed());
-      assertEquals(1000, restConfig.getTimeout().intValue());
-    }
-
+    RestConfig restConfig = RestFunctions.buildRestConfig(config, priorityConfig);
+    assertEquals(6, restConfig.size());
+    assertEquals(Collections.singletonList(200), restConfig.getResponseCodesAllowed());
+    assertEquals("priorityUser", restConfig.getBasicAuthUser());
+    assertEquals("proxyUser", restConfig.getProxyBasicAuthUser());
+    assertTrue(restConfig.enforceJson());
+    assertEquals(1000, restConfig.getTimeout().intValue());
+    assertFalse(restConfig.verifyContentLength());
   }
 
   /**
@@ -382,10 +153,8 @@
    */
   @Test
   public void restGetShouldGetRequestConfig() {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
-
     {
-      RequestConfig actual = restGet.getRequestConfig(new RestConfig(), Optional.empty());
+      RequestConfig actual = RestFunctions.getRequestConfig(new RestConfig(), Optional.empty());
       RequestConfig expected = RequestConfig.custom().build();
 
       assertEquals(expected.getConnectTimeout(), actual.getConnectTimeout());
@@ -399,10 +168,10 @@
       restConfig.put(CONNECT_TIMEOUT, 1);
       restConfig.put(CONNECTION_REQUEST_TIMEOUT, 2);
       restConfig.put(SOCKET_TIMEOUT, 3);
-      HttpHost proxy = new HttpHost("localhost", proxyRule.getHttpPort());
+      HttpHost proxy = new HttpHost("localhost", 3128);
       Optional<HttpHost> proxyOptional = Optional.of(proxy);
 
-      RequestConfig actual = restGet.getRequestConfig(restConfig, proxyOptional);
+      RequestConfig actual = RestFunctions.getRequestConfig(restConfig, proxyOptional);
       RequestConfig expected = RequestConfig.custom()
               .setConnectTimeout(1)
               .setConnectionRequestTimeout(2)
@@ -424,13 +193,12 @@
    */
   @Test
   public void restGetShouldGetHttpClientContext() throws Exception {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
-    HttpHost target = new HttpHost("localhost", mockServerRule.getPort());
-    HttpHost proxy = new HttpHost("localhost", proxyRule.getHttpPort());
+    HttpHost target = new HttpHost("localhost", 8080);
+    HttpHost proxy = new HttpHost("localhost", 3128);
 
     {
       RestConfig restConfig = new RestConfig();
-      HttpClientContext actual = restGet.getHttpClientContext(restConfig, target, Optional.empty());
+      HttpClientContext actual = RestFunctions.getHttpClientContext(restConfig, target, Optional.empty());
 
       assertNull(actual.getCredentialsProvider());
     }
@@ -438,9 +206,9 @@
     {
       RestConfig restConfig = new RestConfig();
       restConfig.put(BASIC_AUTH_USER, "user");
-      restConfig.put(BASIC_AUTH_PASSWORD_PATH, basicAuthPasswordPath);
+      restConfig.put(BASIC_AUTH_PASSWORD_PATH, basicAuthPasswordFile.getAbsolutePath());
 
-      HttpClientContext actual = restGet.getHttpClientContext(restConfig, target, Optional.empty());
+      HttpClientContext actual = RestFunctions.getHttpClientContext(restConfig, target, Optional.empty());
       HttpClientContext expected = HttpClientContext.create();
       CredentialsProvider expectedCredentialsProvider = new BasicCredentialsProvider();
       expectedCredentialsProvider.setCredentials(
@@ -457,9 +225,9 @@
     {
       RestConfig restConfig = new RestConfig();
       restConfig.put(PROXY_BASIC_AUTH_USER, "proxyUser");
-      restConfig.put(PROXY_BASIC_AUTH_PASSWORD_PATH, proxyBasicAuthPasswordPath);
+      restConfig.put(PROXY_BASIC_AUTH_PASSWORD_PATH, proxyBasicAuthPasswordFile.getAbsolutePath());
 
-      HttpClientContext actual = restGet.getHttpClientContext(restConfig, target, Optional.of(proxy));
+      HttpClientContext actual = RestFunctions.getHttpClientContext(restConfig, target, Optional.of(proxy));
       HttpClientContext expected = HttpClientContext.create();
       CredentialsProvider expectedCredentialsProvider = new BasicCredentialsProvider();
       expectedCredentialsProvider.setCredentials(
@@ -476,11 +244,11 @@
     {
       RestConfig restConfig = new RestConfig();
       restConfig.put(BASIC_AUTH_USER, "user");
-      restConfig.put(BASIC_AUTH_PASSWORD_PATH, basicAuthPasswordPath);
+      restConfig.put(BASIC_AUTH_PASSWORD_PATH, basicAuthPasswordFile.getAbsolutePath());
       restConfig.put(PROXY_BASIC_AUTH_USER, "proxyUser");
-      restConfig.put(PROXY_BASIC_AUTH_PASSWORD_PATH, proxyBasicAuthPasswordPath);
+      restConfig.put(PROXY_BASIC_AUTH_PASSWORD_PATH, proxyBasicAuthPasswordFile.getAbsolutePath());
 
-      HttpClientContext actual = restGet.getHttpClientContext(restConfig, target, Optional.of(proxy));
+      HttpClientContext actual = RestFunctions.getHttpClientContext(restConfig, target, Optional.of(proxy));
       HttpClientContext expected = HttpClientContext.create();
       CredentialsProvider expectedCredentialsProvider = new BasicCredentialsProvider();
       expectedCredentialsProvider.setCredentials(
@@ -499,161 +267,109 @@
   }
 
   /**
-   * The REST_GET function should timeout and return null.
-   */
-  @Test
-  @SuppressWarnings("unchecked")
-  public void restGetShouldTimeout() {
-    String uri = String.format("http://localhost:%d/get", mockServerRule.getPort());
-
-    mockServerClient.when(
-            request()
-                    .withMethod("GET")
-                    .withPath("/get"))
-            .respond(response()
-                    .withBody("{\"get\":\"success\"}"));
-
-    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
-      put(STELLAR_REST_SETTINGS, new HashMap<String, Object>() {{
-        put(TIMEOUT, 1);
-      }});
-    }};
-
-    context.addCapability(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
-
-    Map<String, Object> actual = (Map<String, Object>) run(String.format("REST_GET('%s')", uri), context);
-    assertNull(actual);
-  }
-
-  /**
-   * {
-   * "timeout": 1
-   * }
-   */
-  @Multiline
-  private String timeoutConfig;
-
-  /**
-   * The REST_GET function should honor the function supplied timeout setting.
-   */
-  @Test
-  @SuppressWarnings("unchecked")
-  public void restGetShouldTimeoutWithSuppliedTimeout() {
-    String expression = String.format("REST_GET('%s', %s)", getUri, timeoutConfig);
-    Map<String, Object> actual = (Map<String, Object>) run(expression, context);
-    assertNull(actual);
-  }
-
-  /**
-   * The REST_GET function should throw an exception on a malformed uri.
-   * @throws IllegalArgumentException
-   * @throws IOException
-   */
-  @Test
-  public void restGetShouldHandleURISyntaxException() throws IllegalArgumentException, IOException {
-    thrown.expect(ParseException.class);
-    thrown.expectMessage("Unable to parse REST_GET('some invalid uri'): Unable to parse: REST_GET('some invalid uri') due to: Illegal character in path at index 4: some invalid uri");
-
-    run("REST_GET('some invalid uri')", context);
-  }
-
-  /**
    * The REST_GET function should handle IOExceptions and return null.
    * @throws IllegalArgumentException
    * @throws IOException
    */
   @Test
   public void restGetShouldHandleIOException() throws IllegalArgumentException, IOException {
-    RestFunctions.RestGet restGet = spy(RestFunctions.RestGet.class);
-    doThrow(new IOException("io exception")).when(restGet).doGet(any(RestConfig.class), any(HttpGet.class), any(HttpClientContext.class));
+    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
+    CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+    ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
 
-    Object result = restGet.apply(Collections.singletonList(getUri), context);
+    RestFunctions.setCloseableHttpClient(httpClient);
+    RestFunctions.setScheduledExecutorService(executorService);
+
+    when(httpClient.execute(any(HttpRequestBase.class), any(HttpClientContext.class))).thenThrow(new IOException("io exception"));
+
+    Object result = restGet.apply(Collections.singletonList("http://www.host.com:8080/some/uri"), context);
     Assert.assertNull(result);
   }
 
-  /**
-   * The REST_GET function should throw an exception when the required uri parameter is missing.
-   */
-  @Test
-  public void restGetShouldThrownExceptionOnMissingParameter() {
-    thrown.expect(ParseException.class);
-    thrown.expectMessage("Unable to parse REST_GET(): Unable to parse: REST_GET() due to: Expected at least 1 argument(s), found 0");
-
-    run("REST_GET()", context);
-  }
-
   @Test
   public void restGetShouldGetPoolingConnectionManager() {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
-
     RestConfig restConfig = new RestConfig();
     restConfig.put(POOLING_MAX_TOTAL, 5);
     restConfig.put(POOLING_DEFAULT_MAX_PER_RUOTE, 2);
 
-    PoolingHttpClientConnectionManager cm = restGet.getConnectionManager(restConfig);
+    PoolingHttpClientConnectionManager cm = RestFunctions.getConnectionManager(restConfig);
 
     assertEquals(5, cm.getMaxTotal());
     assertEquals(2, cm.getDefaultMaxPerRoute());
   }
 
   @Test
-  public void restGetShouldCloseHttpClient() throws Exception {
+  public void restGetShouldClose() throws Exception {
     RestFunctions.RestGet restGet = new RestFunctions.RestGet();
     CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+    ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
 
-    restGet.setHttpClient(httpClient);
+    RestFunctions.setCloseableHttpClient(httpClient);
+    RestFunctions.setScheduledExecutorService(executorService);
     restGet.close();
 
     verify(httpClient, times(1)).close();
+    verify(executorService, times(1)).shutdown();
+    verifyNoMoreInteractions(httpClient);
+  }
+
+  @Test
+  public void restPostShouldClose() throws Exception {
+    RestFunctions.RestPost restPost = new RestFunctions.RestPost();
+    CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+    ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
+
+    RestFunctions.setCloseableHttpClient(httpClient);
+    RestFunctions.setScheduledExecutorService(executorService);
+    restPost.close();
+
+    verify(httpClient, times(1)).close();
+    verify(executorService, times(1)).shutdown();
     verifyNoMoreInteractions(httpClient);
   }
 
   @Test
   public void restGetShouldParseResponse() throws Exception {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
     RestConfig restConfig = new RestConfig();
     HttpGet httpGet = mock(HttpGet.class);
     HttpEntity httpEntity = mock(HttpEntity.class);
 
     // return successfully parsed response
     when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream("{\"get\":\"success\"}".getBytes()));
-    Optional<Object> actual = restGet.parseResponse(restConfig, httpGet, httpEntity);
+    Optional<Object> actual = RestFunctions.parseResponse(restConfig, httpGet, httpEntity);
     assertTrue(actual.isPresent());
     assertEquals("success", ((Map<String, Object>) actual.get()).get("get"));
   }
 
   @Test
   public void restGetShouldParseResponseOnNullHttpEntity() throws Exception {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
     RestConfig restConfig = new RestConfig();
     HttpGet httpGet = mock(HttpGet.class);
 
     // return empty on null httpEntity
-    assertEquals(Optional.empty(), restGet.parseResponse(restConfig, httpGet, null));
+    assertEquals(Optional.empty(), RestFunctions.parseResponse(restConfig, httpGet, null));
   }
 
   @Test
   public void restGetShouldParseResponseOnNullContent() throws Exception {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
     RestConfig restConfig = new RestConfig();
     HttpGet httpGet = mock(HttpGet.class);
     HttpEntity httpEntity = mock(HttpEntity.class);
 
     // return empty on null content
     when(httpEntity.getContent()).thenReturn(null);
-    assertEquals(Optional.empty(), restGet.parseResponse(restConfig, httpGet, httpEntity));
+    assertEquals(Optional.empty(), RestFunctions.parseResponse(restConfig, httpGet, httpEntity));
   }
 
   @Test
   public void restGetShouldParseResponseOnEmptyInputStream() throws Exception {
-    RestFunctions.RestGet restGet = new RestFunctions.RestGet();
     RestConfig restConfig = new RestConfig();
     HttpGet httpGet = mock(HttpGet.class);
     HttpEntity httpEntity = mock(HttpEntity.class);
 
     // return empty on empty input stream
     when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream("".getBytes()));
-    assertEquals(Optional.empty(), restGet.parseResponse(restConfig, httpGet, httpEntity));
+    assertEquals(Optional.empty(), RestFunctions.parseResponse(restConfig, httpGet, httpEntity));
   }
 
   @Test
@@ -670,7 +386,6 @@
     when(httpGet.getURI()).thenReturn(new URI("uri"));
     when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream("{\"get\":\"success\"}".getBytes()));
     when(httpEntity.getContentLength()).thenReturn(-1L);
-    restGet.parseResponse(restConfig, httpGet, httpEntity);
+    RestFunctions.parseResponse(restConfig, httpGet, httpEntity);
   }
-
 }