| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.processors.standard; |
| |
| import com.burgstaller.okhttp.AuthenticationCacheInterceptor; |
| import com.burgstaller.okhttp.CachingAuthenticatorDecorator; |
| import com.burgstaller.okhttp.digest.CachingAuthenticator; |
| import com.burgstaller.okhttp.digest.DigestAuthenticator; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.Proxy; |
| import java.net.Proxy.Type; |
| import java.net.URL; |
| import java.nio.charset.Charset; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.Files; |
| import java.security.Principal; |
| import java.time.ZoneOffset; |
| import java.time.ZonedDateTime; |
| import java.time.format.DateTimeFormatter; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| import javax.annotation.Nullable; |
| import javax.net.ssl.SSLContext; |
| import javax.net.ssl.SSLSocketFactory; |
| import javax.net.ssl.X509TrustManager; |
| |
| import okhttp3.Cache; |
| import okhttp3.ConnectionPool; |
| import okhttp3.Credentials; |
| import okhttp3.Handshake; |
| import okhttp3.Headers; |
| import okhttp3.MediaType; |
| import okhttp3.MultipartBody; |
| import okhttp3.MultipartBody.Builder; |
| import okhttp3.OkHttpClient; |
| import okhttp3.Protocol; |
| import okhttp3.Request; |
| import okhttp3.RequestBody; |
| import okhttp3.Response; |
| import okhttp3.ResponseBody; |
| import okio.BufferedSink; |
| import org.apache.commons.io.input.TeeInputStream; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.nifi.annotation.behavior.DynamicProperties; |
| import org.apache.nifi.annotation.behavior.DynamicProperty; |
| import org.apache.nifi.annotation.behavior.InputRequirement; |
| import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; |
| import org.apache.nifi.annotation.behavior.SupportsBatching; |
| import org.apache.nifi.annotation.behavior.WritesAttribute; |
| import org.apache.nifi.annotation.behavior.WritesAttributes; |
| import org.apache.nifi.annotation.documentation.CapabilityDescription; |
| import org.apache.nifi.annotation.documentation.Tags; |
| import org.apache.nifi.annotation.lifecycle.OnScheduled; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.components.ValidationContext; |
| import org.apache.nifi.components.ValidationResult; |
| import org.apache.nifi.expression.AttributeExpression; |
| import org.apache.nifi.expression.ExpressionLanguageScope; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.flowfile.attributes.CoreAttributes; |
| import org.apache.nifi.logging.ComponentLog; |
| import org.apache.nifi.processor.AbstractProcessor; |
| import org.apache.nifi.processor.DataUnit; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSession; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.processor.exception.ProcessException; |
| import org.apache.nifi.processor.util.StandardValidators; |
| import org.apache.nifi.processors.standard.util.ProxyAuthenticator; |
| import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream; |
| import org.apache.nifi.proxy.ProxyConfiguration; |
| import org.apache.nifi.proxy.ProxySpec; |
| import org.apache.nifi.security.util.SslContextFactory; |
| import org.apache.nifi.security.util.TlsConfiguration; |
| import org.apache.nifi.security.util.TlsException; |
| import org.apache.nifi.ssl.SSLContextService; |
| import org.apache.nifi.stream.io.StreamUtils; |
| |
| import static org.apache.commons.lang3.StringUtils.trimToEmpty; |
| |
| @SupportsBatching |
| @Tags({"http", "https", "rest", "client"}) |
| @InputRequirement(Requirement.INPUT_ALLOWED) |
| @CapabilityDescription("An HTTP client processor which can interact with a configurable HTTP Endpoint. The destination URL and HTTP Method are configurable." |
| + " FlowFile attributes are converted to HTTP headers and the FlowFile contents are included as the body of the request (if the HTTP Method is PUT, POST or PATCH).") |
| @WritesAttributes({ |
| @WritesAttribute(attribute = InvokeHTTP.STATUS_CODE, description = "The status code that is returned"), |
| @WritesAttribute(attribute = InvokeHTTP.STATUS_MESSAGE, description = "The status message that is returned"), |
| @WritesAttribute(attribute = InvokeHTTP.RESPONSE_BODY, description = "In the instance where the status code received is not a success (2xx) " |
| + "then the response body will be put to the 'invokehttp.response.body' attribute of the request FlowFile."), |
| @WritesAttribute(attribute = InvokeHTTP.REQUEST_URL, description = "The original request URL"), |
| @WritesAttribute(attribute = InvokeHTTP.RESPONSE_URL, description = "The URL that was ultimately requested after any redirects were followed"), |
| @WritesAttribute(attribute = InvokeHTTP.TRANSACTION_ID, description = "The transaction ID that is returned after reading the response"), |
| @WritesAttribute(attribute = InvokeHTTP.REMOTE_DN, description = "The DN of the remote server"), |
| @WritesAttribute(attribute = InvokeHTTP.EXCEPTION_CLASS, description = "The Java exception class raised when the processor fails"), |
| @WritesAttribute(attribute = InvokeHTTP.EXCEPTION_MESSAGE, description = "The Java exception message raised when the processor fails"), |
| @WritesAttribute(attribute = "user-defined", description = "If the 'Put Response Body In Attribute' property is set then whatever it is set to " |
| + "will become the attribute key and the value would be the body of the HTTP response.")}) |
| @DynamicProperties({ |
| @DynamicProperty(name = "Header Name", value = "Attribute Expression Language", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, |
| description = |
| "Send request header with a key matching the Dynamic Property Key and a value created by evaluating " |
| + "the Attribute Expression Language set in the value of the Dynamic Property."), |
| @DynamicProperty(name = "post:form:<NAME>", value = "Attribute Expression Language", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, |
| description = |
| "When the HTTP Method is POST, dynamic properties with the property name in the form of post:form:<NAME>," |
| + " where the <NAME> will be the form data name, will be used to fill out the multipart form parts." |
| + " If send message body is false, the flowfile will not be sent, but any other form data will be.") |
| }) |
| public class InvokeHTTP extends AbstractProcessor { |
| public final static String STATUS_CODE = "invokehttp.status.code"; |
| public final static String STATUS_MESSAGE = "invokehttp.status.message"; |
| public final static String RESPONSE_BODY = "invokehttp.response.body"; |
| public final static String REQUEST_URL = "invokehttp.request.url"; |
| public final static String RESPONSE_URL = "invokehttp.response.url"; |
| public final static String TRANSACTION_ID = "invokehttp.tx.id"; |
| public final static String REMOTE_DN = "invokehttp.remote.dn"; |
| public final static String EXCEPTION_CLASS = "invokehttp.java.exception.class"; |
| public final static String EXCEPTION_MESSAGE = "invokehttp.java.exception.message"; |
| |
| public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream"; |
| |
| public static final String FORM_BASE = "post:form"; |
| |
| // Set of flowfile attributes which we generally always ignore during |
| // processing, including when converting http headers, copying attributes, etc. |
| // This set includes our strings defined above as well as some standard flowfile |
| // attributes. |
| public static final Set<String> IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( |
| STATUS_CODE, STATUS_MESSAGE, RESPONSE_BODY, REQUEST_URL, RESPONSE_URL, TRANSACTION_ID, REMOTE_DN, |
| EXCEPTION_CLASS, EXCEPTION_MESSAGE, |
| "uuid", "filename", "path"))); |
| |
| public static final String HTTP = "http"; |
| public static final String HTTPS = "https"; |
| |
| private static final Pattern DYNAMIC_FORM_PARAMETER_NAME = Pattern.compile("post:form:(?<formDataName>.*)$"); |
| private static final String FORM_DATA_NAME_GROUP = "formDataName"; |
| |
| // properties |
| public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder() |
| .name("HTTP Method") |
| .description("HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS). Arbitrary methods are also supported. " |
| + "Methods other than POST, PUT and PATCH will be sent without a message body.") |
| .required(true) |
| .defaultValue("GET") |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder() |
| .name("Remote URL") |
| .description("Remote URL which will be connected to, including scheme, host, port, path.") |
| .required(true) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .addValidator(StandardValidators.URL_VALIDATOR) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder() |
| .name("Connection Timeout") |
| .description("Max wait time for connection to remote service.") |
| .required(true) |
| .defaultValue("5 secs") |
| .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder() |
| .name("Read Timeout") |
| .description("Max wait time for response from remote service.") |
| .required(true) |
| .defaultValue("15 secs") |
| .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_IDLE_TIMEOUT = new PropertyDescriptor.Builder() |
| .name("idle-timeout") |
| .displayName("Idle Timeout") |
| .description("Max idle time before closing connection to the remote service.") |
| .required(true) |
| .defaultValue("5 mins") |
| .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.MILLISECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS)) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_MAX_IDLE_CONNECTIONS = new PropertyDescriptor.Builder() |
| .name("max-idle-connections") |
| .displayName("Max Idle Connections") |
| .description("Max number of idle connections to keep open.") |
| .required(true) |
| .defaultValue("5") |
| .addValidator(StandardValidators.INTEGER_VALIDATOR) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_DATE_HEADER = new PropertyDescriptor.Builder() |
| .name("Include Date Header") |
| .description("Include an RFC-2616 Date header in the request.") |
| .required(true) |
| .defaultValue("True") |
| .allowableValues("True", "False") |
| .addValidator(StandardValidators.BOOLEAN_VALIDATOR) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_FOLLOW_REDIRECTS = new PropertyDescriptor.Builder() |
| .name("Follow Redirects") |
| .description("Follow HTTP redirects issued by remote server.") |
| .required(true) |
| .defaultValue("True") |
| .allowableValues("True", "False") |
| .addValidator(StandardValidators.BOOLEAN_VALIDATOR) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder() |
| .name("Attributes to Send") |
| .description("Regular expression that defines which attributes to send as HTTP headers in the request. " |
| + "If not defined, no attributes are sent as headers. Also any dynamic properties set will be sent as headers. " |
| + "The dynamic property key will be the header key and the dynamic property value will be interpreted as expression " |
| + "language will be the header value.") |
| .required(false) |
| .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_USERAGENT = new PropertyDescriptor.Builder() |
| .name("Useragent") |
| .displayName("Useragent") |
| .description("The Useragent identifier sent along with each request") |
| .required(false) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() |
| .name("SSL Context Service") |
| .description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections." |
| + " It is also used to connect to HTTPS Proxy.") |
| .required(false) |
| .identifiesControllerService(SSLContextService.class) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_PROXY_TYPE = new PropertyDescriptor.Builder() |
| .name("Proxy Type") |
| .displayName("Proxy Type") |
| .description("The type of the proxy we are connecting to. Must be either " + HTTP + " or " + HTTPS) |
| .defaultValue(HTTP) |
| .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) |
| .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_PROXY_HOST = new PropertyDescriptor.Builder() |
| .name("Proxy Host") |
| .description("The fully qualified hostname or IP address of the proxy server") |
| .required(false) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_PROXY_PORT = new PropertyDescriptor.Builder() |
| .name("Proxy Port") |
| .description("The port of the proxy server") |
| .required(false) |
| .addValidator(StandardValidators.PORT_VALIDATOR) |
| .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_PROXY_USER = new PropertyDescriptor.Builder() |
| .name("invokehttp-proxy-user") |
| .displayName("Proxy Username") |
| .description("Username to set when authenticating against proxy") |
| .required(false) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_PROXY_PASSWORD = new PropertyDescriptor.Builder() |
| .name("invokehttp-proxy-password") |
| .displayName("Proxy Password") |
| .description("Password to set when authenticating against proxy") |
| .required(false) |
| .sensitive(true) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_CONTENT_TYPE = new PropertyDescriptor.Builder() |
| .name("Content-Type") |
| .description("The Content-Type to specify for when content is being transmitted through a PUT, POST or PATCH. " |
| + "In the case of an empty value after evaluating an expression language expression, Content-Type defaults to " + DEFAULT_CONTENT_TYPE) |
| .required(true) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .defaultValue("${" + CoreAttributes.MIME_TYPE.key() + "}") |
| .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_SEND_BODY = new PropertyDescriptor.Builder() |
| .name("send-message-body") |
| .displayName("Send Message Body") |
| .description("If true, sends the HTTP message body on POST/PUT/PATCH requests (default). If false, suppresses the message body and content-type header for these requests.") |
| .defaultValue("true") |
| .allowableValues("true", "false") |
| .required(false) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_FORM_BODY_FORM_NAME = new PropertyDescriptor.Builder() |
| .name("form-body-form-name") |
| .displayName("FlowFile Form Data Name") |
| .description("When Send Message Body is true, and FlowFile Form Data Name is set, " |
| + " the FlowFile will be sent as the message body in multipart/form format with this value " |
| + "as the form data name.") |
| .required(false) |
| .addValidator( |
| StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_SET_FORM_FILE_NAME = new PropertyDescriptor.Builder() |
| .name("set-form-filename") |
| .displayName("Set FlowFile Form Data File Name") |
| .description( |
| "When Send Message Body is true, FlowFile Form Data Name is set, " |
| + "and Set FlowFile Form Data File Name is true, the FlowFile's fileName value " |
| + "will be set as the filename property of the form data.") |
| .required(false) |
| .defaultValue("true") |
| .allowableValues("true", "false") |
| .build(); |
| |
| // Per RFC 7235, 2617, and 2616. |
| // basic-credentials = base64-user-pass |
| // base64-user-pass = userid ":" password |
| // userid = *<TEXT excluding ":"> |
| // password = *TEXT |
| // |
| // OCTET = <any 8-bit sequence of data> |
| // CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)> |
| // LWS = [CRLF] 1*( SP | HT ) |
| // TEXT = <any OCTET except CTLs but including LWS> |
| // |
| // Per RFC 7230, username & password in URL are now disallowed in HTTP and HTTPS URIs. |
| public static final PropertyDescriptor PROP_BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder() |
| .name("Basic Authentication Username") |
| .displayName("Basic Authentication Username") |
| .description("The username to be used by the client to authenticate against the Remote URL. Cannot include control characters (0-31), ':', or DEL (127).") |
| .required(false) |
| .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$"))) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder() |
| .name("Basic Authentication Password") |
| .displayName("Basic Authentication Password") |
| .description("The password to be used by the client to authenticate against the Remote URL.") |
| .required(false) |
| .sensitive(true) |
| .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$"))) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_PUT_OUTPUT_IN_ATTRIBUTE = new PropertyDescriptor.Builder() |
| .name("Put Response Body In Attribute") |
| .description("If set, the response body received back will be put into an attribute of the original FlowFile instead of a separate " |
| + "FlowFile. The attribute key to put to is determined by evaluating value of this property. ") |
| .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .build(); |
| |
| public static final PropertyDescriptor PROP_PUT_ATTRIBUTE_MAX_LENGTH = new PropertyDescriptor.Builder() |
| .name("Max Length To Put In Attribute") |
| .description("If routing the response body to an attribute of the original (by setting the \"Put response body in attribute\" " |
| + "property or by receiving an error status code), the number of characters put to the attribute value will be at " |
| + "most this amount. This is important because attributes are held in memory and large attributes will quickly " |
| + "cause out of memory issues. If the output goes longer than this value, it will be truncated to fit. " |
| + "Consider making this smaller if able.") |
| .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) |
| .defaultValue("256") |
| .build(); |
| |
| public static final PropertyDescriptor PROP_DIGEST_AUTH = new PropertyDescriptor.Builder() |
| .name("Digest Authentication") |
| .displayName("Use Digest Authentication") |
| .description("Whether to communicate with the website using Digest Authentication. 'Basic Authentication Username' and 'Basic Authentication Password' are used " |
| + "for authentication.") |
| .required(false) |
| .defaultValue("false") |
| .allowableValues("true", "false") |
| .build(); |
| |
| public static final PropertyDescriptor PROP_OUTPUT_RESPONSE_REGARDLESS = new PropertyDescriptor.Builder() |
| .name("Always Output Response") |
| .description("Will force a response FlowFile to be generated and routed to the 'Response' relationship regardless of what the server status code received is " |
| + "or if the processor is configured to put the server response body in the request attribute. In the later configuration a request FlowFile with the " |
| + "response body in the attribute and a typical response FlowFile will be emitted to their respective relationships.") |
| .required(false) |
| .defaultValue("false") |
| .allowableValues("true", "false") |
| .build(); |
| |
| public static final PropertyDescriptor PROP_ADD_HEADERS_TO_REQUEST = new PropertyDescriptor.Builder() |
| .name("Add Response Headers to Request") |
| .description("Enabling this property saves all the response headers to the original request. This may be when the response headers are needed " |
| + "but a response is not generated due to the status code received.") |
| .required(false) |
| .defaultValue("false") |
| .allowableValues("true", "false") |
| .build(); |
| |
| public static final PropertyDescriptor PROP_USE_CHUNKED_ENCODING = new PropertyDescriptor.Builder() |
| .name("Use Chunked Encoding") |
| .description("When POST'ing, PUT'ing or PATCH'ing content set this property to true in order to not pass the 'Content-length' header and instead send 'Transfer-Encoding' with " |
| + "a value of 'chunked'. This will enable the data transfer mechanism which was introduced in HTTP 1.1 to pass data of unknown lengths in chunks.") |
| .required(true) |
| .defaultValue("false") |
| .allowableValues("true", "false") |
| .build(); |
| |
| public static final PropertyDescriptor PROP_PENALIZE_NO_RETRY = new PropertyDescriptor.Builder() |
| .name("Penalize on \"No Retry\"") |
| .description("Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.") |
| .required(false) |
| .defaultValue("false") |
| .allowableValues("true", "false") |
| .build(); |
| |
| public static final PropertyDescriptor PROP_USE_ETAG = new PropertyDescriptor.Builder() |
| .name("use-etag") |
| .description("Enable HTTP entity tag (ETag) support for HTTP requests.") |
| .displayName("Use HTTP ETag") |
| .required(true) |
| .defaultValue("false") |
| .allowableValues("true", "false") |
| .build(); |
| |
| public static final PropertyDescriptor PROP_ETAG_MAX_CACHE_SIZE = new PropertyDescriptor.Builder() |
| .name("etag-max-cache-size") |
| .description("The maximum size that the ETag cache should be allowed to grow to. The default size is 10MB.") |
| .displayName("Maximum ETag Cache Size") |
| .required(true) |
| .defaultValue("10MB") |
| .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) |
| .build(); |
| |
| public static final PropertyDescriptor IGNORE_RESPONSE_CONTENT = new PropertyDescriptor.Builder() |
| .name("ignore-response-content") |
| .description("If true, the processor will not write the response's content into the flow file.") |
| .displayName("Ignore response's content") |
| .required(true) |
| .defaultValue("false") |
| .allowableValues("true", "false") |
| .build(); |
| |
| public static final PropertyDescriptor DISABLE_HTTP2_PROTOCOL = new PropertyDescriptor.Builder() |
| .name("disable-http2") |
| .description("Determines whether or not to disable use of the HTTP/2 protocol version. If disabled, only HTTP/1.1 is supported.") |
| .displayName("Disable HTTP/2") |
| .required(true) |
| .defaultValue("False") |
| .allowableValues("True", "False") |
| .build(); |
| |
| private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH, ProxySpec.SOCKS}; |
| public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE |
| = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS); |
| |
| public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( |
| PROP_METHOD, |
| PROP_URL, |
| PROP_SSL_CONTEXT_SERVICE, |
| PROP_CONNECT_TIMEOUT, |
| PROP_READ_TIMEOUT, |
| PROP_IDLE_TIMEOUT, |
| PROP_MAX_IDLE_CONNECTIONS, |
| PROP_DATE_HEADER, |
| PROP_FOLLOW_REDIRECTS, |
| DISABLE_HTTP2_PROTOCOL, |
| PROP_ATTRIBUTES_TO_SEND, |
| PROP_USERAGENT, |
| PROP_BASIC_AUTH_USERNAME, |
| PROP_BASIC_AUTH_PASSWORD, |
| PROXY_CONFIGURATION_SERVICE, |
| PROP_PROXY_HOST, |
| PROP_PROXY_PORT, |
| PROP_PROXY_TYPE, |
| PROP_PROXY_USER, |
| PROP_PROXY_PASSWORD, |
| PROP_PUT_OUTPUT_IN_ATTRIBUTE, |
| PROP_PUT_ATTRIBUTE_MAX_LENGTH, |
| PROP_DIGEST_AUTH, |
| PROP_OUTPUT_RESPONSE_REGARDLESS, |
| PROP_ADD_HEADERS_TO_REQUEST, |
| PROP_CONTENT_TYPE, |
| PROP_SEND_BODY, |
| PROP_USE_CHUNKED_ENCODING, |
| PROP_PENALIZE_NO_RETRY, |
| PROP_USE_ETAG, |
| PROP_ETAG_MAX_CACHE_SIZE, |
| IGNORE_RESPONSE_CONTENT, |
| PROP_FORM_BODY_FORM_NAME, |
| PROP_SET_FORM_FILE_NAME)); |
| |
| // relationships |
| public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder() |
| .name("Original") |
| .description("The original FlowFile will be routed upon success (2xx status codes). It will have new attributes detailing the " |
| + "success of the request.") |
| .build(); |
| |
| public static final Relationship REL_RESPONSE = new Relationship.Builder() |
| .name("Response") |
| .description("A Response FlowFile will be routed upon success (2xx status codes). If the 'Output Response Regardless' property " |
| + "is true then the response will be sent to this relationship regardless of the status code received.") |
| .build(); |
| |
| public static final Relationship REL_RETRY = new Relationship.Builder() |
| .name("Retry") |
| .description("The original FlowFile will be routed on any status code that can be retried (5xx status codes). It will have new " |
| + "attributes detailing the request.") |
| .build(); |
| |
| public static final Relationship REL_NO_RETRY = new Relationship.Builder() |
| .name("No Retry") |
| .description("The original FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes). " |
| + "It will have new attributes detailing the request.") |
| .build(); |
| |
| public static final Relationship REL_FAILURE = new Relationship.Builder() |
| .name("Failure") |
| .description("The original FlowFile will be routed on any type of connection failure, timeout or general exception. " |
| + "It will have new attributes detailing the request.") |
| .build(); |
| |
| public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( |
| REL_SUCCESS_REQ, REL_RESPONSE, REL_RETRY, REL_NO_RETRY, REL_FAILURE))); |
| |
| // RFC 2616 Date Time Formatter with hard-coded GMT Zone |
| // https://tools.ietf.org/html/rfc2616#section-3.3 - date format header should not be localized |
| private static final DateTimeFormatter RFC_2616_DATE_TIME = DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.US); |
| |
| // Multiple Header Delimiter |
| private static final String MULTIPLE_HEADER_DELIMITER = ", "; |
| |
| private volatile Set<String> dynamicPropertyNames = new HashSet<>(); |
| |
| private volatile Pattern regexAttributesToSend = null; |
| |
| private volatile boolean useChunked = false; |
| |
| private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>(); |
| |
| @Override |
| protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { |
| return PROPERTIES; |
| } |
| |
| @Override |
| protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) { |
| if (propertyDescriptorName.startsWith(FORM_BASE)) { |
| |
| Matcher matcher = DYNAMIC_FORM_PARAMETER_NAME.matcher(propertyDescriptorName); |
| if (matcher.matches()) { |
| return new PropertyDescriptor.Builder() |
| .required(false) |
| .name(propertyDescriptorName) |
| .description("Form Data " + matcher.group(FORM_DATA_NAME_GROUP)) |
| .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) |
| .dynamic(true) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .build(); |
| } |
| return null; |
| } |
| |
| return new PropertyDescriptor.Builder() |
| .required(false) |
| .name(propertyDescriptorName) |
| .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) |
| .dynamic(true) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .build(); |
| } |
| |
| @Override |
| public Set<Relationship> getRelationships() { |
| return RELATIONSHIPS; |
| } |
| |
| @Override |
| public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { |
| if (descriptor.isDynamic()) { |
| final Set<String> newDynamicPropertyNames = new HashSet<>(dynamicPropertyNames); |
| if (newValue == null) { |
| newDynamicPropertyNames.remove(descriptor.getName()); |
| } else if (oldValue == null) { // new property |
| newDynamicPropertyNames.add(descriptor.getName()); |
| } |
| this.dynamicPropertyNames = Collections.unmodifiableSet(newDynamicPropertyNames); |
| } else { |
| // compile the attributes-to-send filter pattern |
| if (PROP_ATTRIBUTES_TO_SEND.getName().equalsIgnoreCase(descriptor.getName())) { |
| if (newValue == null || newValue.isEmpty()) { |
| regexAttributesToSend = null; |
| } else { |
| final String trimmedValue = StringUtils.trimToEmpty(newValue); |
| regexAttributesToSend = Pattern.compile(trimmedValue); |
| } |
| } |
| } |
| } |
| |
| @Override |
| protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { |
| final List<ValidationResult> results = new ArrayList<>(3); |
| final boolean proxyHostSet = validationContext.getProperty(PROP_PROXY_HOST).isSet(); |
| final boolean proxyPortSet = validationContext.getProperty(PROP_PROXY_PORT).isSet(); |
| |
| if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) { |
| results.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build()); |
| } |
| |
| final boolean proxyUserSet = validationContext.getProperty(PROP_PROXY_USER).isSet(); |
| final boolean proxyPwdSet = validationContext.getProperty(PROP_PROXY_PASSWORD).isSet(); |
| |
| if ((proxyUserSet && !proxyPwdSet) || (!proxyUserSet && proxyPwdSet)) { |
| results.add(new ValidationResult.Builder().subject("Proxy User and Password").valid(false).explanation("If Proxy Username or Proxy Password is set, both must be set").build()); |
| } |
| if (proxyUserSet && !proxyHostSet) { |
| results.add(new ValidationResult.Builder().subject("Proxy").valid(false).explanation("If Proxy username is set, proxy host must be set").build()); |
| } |
| |
| final String proxyType = validationContext.getProperty(PROP_PROXY_TYPE).evaluateAttributeExpressions().getValue(); |
| |
| if (!HTTP.equals(proxyType) && !HTTPS.equals(proxyType)) { |
| results.add(new ValidationResult.Builder().subject(PROP_PROXY_TYPE.getDisplayName()).valid(false) |
| .explanation(PROP_PROXY_TYPE.getDisplayName() + " must be either " + HTTP + " or " + HTTPS).build()); |
| } |
| |
| if (HTTPS.equals(proxyType) |
| && !validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).isSet()) { |
| results.add(new ValidationResult.Builder().subject("SSL Context Service").valid(false).explanation("If Proxy Type is HTTPS, SSL Context Service must be set").build()); |
| } |
| |
| ProxyConfiguration.validateProxySpec(validationContext, results, PROXY_SPECS); |
| |
| // Check for dynamic properties for form components. |
| // Even if the flowfile is not sent, we may still send form parameters. |
| boolean hasFormData = false; |
| for (final PropertyDescriptor descriptor : validationContext.getProperties().keySet()) { |
| final Matcher matcher = DYNAMIC_FORM_PARAMETER_NAME.matcher(descriptor.getName()); |
| if (matcher.matches()) { |
| hasFormData = true; |
| break; |
| } |
| } |
| |
| // if form data exists, and send body is true, Flowfile Form Data Name must be set. |
| final boolean sendBody = validationContext.getProperty(PROP_SEND_BODY).asBoolean(); |
| final boolean contentNameSet = validationContext.getProperty(PROP_FORM_BODY_FORM_NAME).isSet(); |
| if (hasFormData) { |
| if (sendBody && !contentNameSet) { |
| final String explanation = String.format("[%s] is required when Form Data properties are configured and [%s] is enabled", |
| PROP_FORM_BODY_FORM_NAME.getDisplayName(), |
| PROP_SEND_BODY.getDisplayName()); |
| results.add(new ValidationResult.Builder() |
| .subject(PROP_FORM_BODY_FORM_NAME.getDisplayName()) |
| .valid(false) |
| .explanation(explanation) |
| .build()); |
| } |
| } |
| if (!sendBody && contentNameSet) { |
| final String explanation = String.format("[%s] must be [true] when Form Data properties are configured and [%s] is configured", |
| PROP_SEND_BODY.getDisplayName(), |
| PROP_FORM_BODY_FORM_NAME.getDisplayName()); |
| results.add(new ValidationResult.Builder() |
| .subject(PROP_FORM_BODY_FORM_NAME.getDisplayName()) |
| .valid(false) |
| .explanation(explanation) |
| .build()); |
| } |
| |
| return results; |
| } |
| |
| @OnScheduled |
| public void setUpClient(final ProcessContext context) throws TlsException, IOException { |
| okHttpClientAtomicReference.set(null); |
| |
| OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder(); |
| |
| final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> { |
| final ProxyConfiguration componentProxyConfig = new ProxyConfiguration(); |
| final String proxyHost = context.getProperty(PROP_PROXY_HOST).evaluateAttributeExpressions().getValue(); |
| final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).evaluateAttributeExpressions().asInteger(); |
| if (proxyHost != null && proxyPort != null) { |
| componentProxyConfig.setProxyType(Type.HTTP); |
| componentProxyConfig.setProxyServerHost(proxyHost); |
| componentProxyConfig.setProxyServerPort(proxyPort); |
| final String proxyUsername = trimToEmpty(context.getProperty(PROP_PROXY_USER).evaluateAttributeExpressions().getValue()); |
| final String proxyPassword = context.getProperty(PROP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue(); |
| componentProxyConfig.setProxyUserName(proxyUsername); |
| componentProxyConfig.setProxyUserPassword(proxyPassword); |
| } |
| return componentProxyConfig; |
| }); |
| |
| final Proxy proxy = proxyConfig.createProxy(); |
| if (!Type.DIRECT.equals(proxy.type())) { |
| okHttpClientBuilder.proxy(proxy); |
| if (proxyConfig.hasCredential()) { |
| ProxyAuthenticator proxyAuthenticator = new ProxyAuthenticator(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword()); |
| okHttpClientBuilder.proxyAuthenticator(proxyAuthenticator); |
| } |
| } |
| |
| // configure ETag cache if enabled |
| final boolean etagEnabled = context.getProperty(PROP_USE_ETAG).asBoolean(); |
| if (etagEnabled) { |
| final int maxCacheSizeBytes = context.getProperty(PROP_ETAG_MAX_CACHE_SIZE).asDataSize(DataUnit.B).intValue(); |
| okHttpClientBuilder.cache(new Cache(getETagCacheDir(), maxCacheSizeBytes)); |
| } |
| |
| // Configure whether HTTP/2 protocol should be disabled |
| if (context.getProperty(DISABLE_HTTP2_PROTOCOL).asBoolean()) { |
| okHttpClientBuilder.protocols(Collections.singletonList(Protocol.HTTP_1_1)); |
| } |
| |
| // Set timeouts |
| okHttpClientBuilder.connectTimeout((context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS); |
| okHttpClientBuilder.readTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS); |
| |
| // Set connectionpool limits |
| okHttpClientBuilder.connectionPool( |
| new ConnectionPool( |
| context.getProperty(PROP_MAX_IDLE_CONNECTIONS).asInteger(), |
| context.getProperty(PROP_IDLE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS |
| ) |
| ); |
| |
| // Set whether to follow redirects |
| okHttpClientBuilder.followRedirects(context.getProperty(PROP_FOLLOW_REDIRECTS).asBoolean()); |
| |
| // Apply the TLS configuration if present |
| final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); |
| if (sslService != null) { |
| final SSLContext sslContext = sslService.createContext(); |
| final SSLSocketFactory socketFactory = sslContext.getSocketFactory(); |
| final TlsConfiguration tlsConfiguration = sslService.createTlsConfiguration(); |
| final X509TrustManager trustManager = SslContextFactory.getX509TrustManager(tlsConfiguration); |
| okHttpClientBuilder.sslSocketFactory(socketFactory, trustManager); |
| } |
| |
| setAuthenticator(okHttpClientBuilder, context); |
| |
| useChunked = context.getProperty(PROP_USE_CHUNKED_ENCODING).asBoolean(); |
| |
| okHttpClientAtomicReference.set(okHttpClientBuilder.build()); |
| } |
| |
| private void setAuthenticator(OkHttpClient.Builder okHttpClientBuilder, ProcessContext context) { |
| final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue()); |
| |
| // If the username/password properties are set then check if digest auth is being used |
| if (!authUser.isEmpty() && "true".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) { |
| final String authPass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue()); |
| |
| /* |
| * OkHttp doesn't have built-in Digest Auth Support. A ticket for adding it is here[1] but they authors decided instead to rely on a 3rd party lib. |
| * |
| * [1] https://github.com/square/okhttp/issues/205#issuecomment-154047052 |
| */ |
| final Map<String, CachingAuthenticator> authCache = new ConcurrentHashMap<>(); |
| com.burgstaller.okhttp.digest.Credentials credentials = new com.burgstaller.okhttp.digest.Credentials(authUser, authPass); |
| final DigestAuthenticator digestAuthenticator = new DigestAuthenticator(credentials); |
| |
| okHttpClientBuilder.interceptors().add(new AuthenticationCacheInterceptor(authCache)); |
| okHttpClientBuilder.authenticator(new CachingAuthenticatorDecorator(digestAuthenticator, authCache)); |
| } |
| } |
| |
| @Override |
| public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { |
| OkHttpClient okHttpClient = okHttpClientAtomicReference.get(); |
| |
| FlowFile requestFlowFile = session.get(); |
| |
| // Checking to see if the property to put the body of the response in an attribute was set |
| boolean putToAttribute = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).isSet(); |
| if (requestFlowFile == null) { |
| if (context.hasNonLoopConnection()) { |
| return; |
| } |
| |
| String request = context.getProperty(PROP_METHOD).evaluateAttributeExpressions().getValue().toUpperCase(); |
| if ("POST".equals(request) || "PUT".equals(request) || "PATCH".equals(request)) { |
| return; |
| } else if (putToAttribute) { |
| requestFlowFile = session.create(); |
| } |
| } |
| |
| // Setting some initial variables |
| final int maxAttributeSize = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger(); |
| final ComponentLog logger = getLogger(); |
| |
| // Every request/response cycle has a unique transaction id which will be stored as a flowfile attribute. |
| final UUID txId = UUID.randomUUID(); |
| |
| FlowFile responseFlowFile = null; |
| try { |
| // read the url property from the context |
| final String urlProperty = trimToEmpty(context.getProperty(PROP_URL).evaluateAttributeExpressions(requestFlowFile).getValue()); |
| final URL url = new URL(urlProperty); |
| |
| Request httpRequest = configureRequest(context, session, requestFlowFile, url); |
| |
| // log request |
| logRequest(logger, httpRequest); |
| |
| // emit send provenance event if successfully sent to the server |
| if (httpRequest.body() != null) { |
| session.getProvenanceReporter().send(requestFlowFile, url.toExternalForm(), true); |
| } |
| |
| final long startNanos = System.nanoTime(); |
| |
| try (Response responseHttp = okHttpClient.newCall(httpRequest).execute()) { |
| // output the raw response headers (DEBUG level only) |
| logResponse(logger, url, responseHttp); |
| |
| // store the status code and message |
| int statusCode = responseHttp.code(); |
| String statusMessage = responseHttp.message(); |
| |
| // Create a map of the status attributes that are always written to the request and response FlowFiles |
| Map<String, String> statusAttributes = new HashMap<>(); |
| statusAttributes.put(STATUS_CODE, String.valueOf(statusCode)); |
| statusAttributes.put(STATUS_MESSAGE, statusMessage); |
| statusAttributes.put(REQUEST_URL, url.toExternalForm()); |
| statusAttributes.put(RESPONSE_URL, responseHttp.request().url().toString()); |
| statusAttributes.put(TRANSACTION_ID, txId.toString()); |
| |
| if (requestFlowFile != null) { |
| requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes); |
| } |
| |
| // If the property to add the response headers to the request flowfile is true then add them |
| if (context.getProperty(PROP_ADD_HEADERS_TO_REQUEST).asBoolean() && requestFlowFile != null) { |
| // write the response headers as attributes |
| // this will overwrite any existing flowfile attributes |
| requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(responseHttp)); |
| } |
| |
| boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null; |
| boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute) || context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean(); |
| ResponseBody responseBody = responseHttp.body(); |
| boolean bodyExists = responseBody != null && !context.getProperty(IGNORE_RESPONSE_CONTENT).asBoolean(); |
| |
| InputStream responseBodyStream = null; |
| SoftLimitBoundedByteArrayOutputStream outputStreamToRequestAttribute = null; |
| TeeInputStream teeInputStream = null; |
| try { |
| responseBodyStream = bodyExists ? responseBody.byteStream() : null; |
| if (responseBodyStream != null && outputBodyToRequestAttribute && outputBodyToResponseContent) { |
| outputStreamToRequestAttribute = new SoftLimitBoundedByteArrayOutputStream(maxAttributeSize); |
| teeInputStream = new TeeInputStream(responseBodyStream, outputStreamToRequestAttribute); |
| } |
| |
| if (outputBodyToResponseContent) { |
| /* |
| * If successful and putting to response flowfile, store the response body as the flowfile payload |
| * we include additional flowfile attributes including the response headers and the status codes. |
| */ |
| |
| // clone the flowfile to capture the response |
| if (requestFlowFile != null) { |
| responseFlowFile = session.create(requestFlowFile); |
| } else { |
| responseFlowFile = session.create(); |
| } |
| |
| // write attributes to response flowfile |
| responseFlowFile = session.putAllAttributes(responseFlowFile, statusAttributes); |
| |
| // write the response headers as attributes |
| // this will overwrite any existing flowfile attributes |
| responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(responseHttp)); |
| |
| // transfer the message body to the payload |
| // can potentially be null in edge cases |
| if (bodyExists) { |
| // write content type attribute to response flowfile if it is available |
| final MediaType contentType = responseBody.contentType(); |
| if (contentType != null) { |
| responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), contentType.toString()); |
| } |
| if (teeInputStream != null) { |
| responseFlowFile = session.importFrom(teeInputStream, responseFlowFile); |
| } else { |
| responseFlowFile = session.importFrom(responseBodyStream, responseFlowFile); |
| } |
| |
| // emit provenance event |
| final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); |
| if (requestFlowFile != null) { |
| session.getProvenanceReporter().fetch(responseFlowFile, url.toExternalForm(), millis); |
| } else { |
| session.getProvenanceReporter().receive(responseFlowFile, url.toExternalForm(), millis); |
| } |
| } |
| } |
| |
| // if not successful and request flowfile is not null, store the response body into a flowfile attribute |
| if (outputBodyToRequestAttribute && bodyExists) { |
| String attributeKey = context.getProperty(PROP_PUT_OUTPUT_IN_ATTRIBUTE).evaluateAttributeExpressions(requestFlowFile).getValue(); |
| if (attributeKey == null) { |
| attributeKey = RESPONSE_BODY; |
| } |
| byte[] outputBuffer; |
| int size; |
| |
| if (outputStreamToRequestAttribute != null) { |
| outputBuffer = outputStreamToRequestAttribute.getBuffer(); |
| size = outputStreamToRequestAttribute.size(); |
| } else { |
| outputBuffer = new byte[maxAttributeSize]; |
| size = StreamUtils.fillBuffer(responseBodyStream, outputBuffer, false); |
| } |
| String bodyString = new String(outputBuffer, 0, size, getCharsetFromMediaType(responseBody.contentType())); |
| requestFlowFile = session.putAttribute(requestFlowFile, attributeKey, bodyString); |
| |
| final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); |
| session.getProvenanceReporter().modifyAttributes(requestFlowFile, "The " + attributeKey + " has been added. The value of which is the body of a http call to " |
| + url.toExternalForm() + ". It took " + millis + "millis,"); |
| } |
| } finally { |
| if (outputStreamToRequestAttribute != null) { |
| outputStreamToRequestAttribute.close(); |
| } |
| if (teeInputStream != null) { |
| teeInputStream.close(); |
| } else if (responseBodyStream != null) { |
| responseBodyStream.close(); |
| } |
| } |
| |
| route(requestFlowFile, responseFlowFile, session, context, statusCode); |
| |
| } |
| } catch (final Exception e) { |
| // penalize or yield |
| if (requestFlowFile != null) { |
| logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, e); |
| requestFlowFile = session.penalize(requestFlowFile); |
| requestFlowFile = session.putAttribute(requestFlowFile, EXCEPTION_CLASS, e.getClass().getName()); |
| requestFlowFile = session.putAttribute(requestFlowFile, EXCEPTION_MESSAGE, e.getMessage()); |
| // transfer original to failure |
| session.transfer(requestFlowFile, REL_FAILURE); |
| } else { |
| logger.error("Yielding processor due to exception encountered as a source processor: {}", e); |
| context.yield(); |
| } |
| |
| |
| // cleanup response flowfile, if applicable |
| if (responseFlowFile != null) { |
| session.remove(responseFlowFile); |
| } |
| } |
| } |
| |
| |
| private Request configureRequest(final ProcessContext context, final ProcessSession session, final FlowFile requestFlowFile, URL url) { |
| final Request.Builder requestBuilder = new Request.Builder(); |
| |
| requestBuilder.url(url); |
| final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue()); |
| |
| // If the username/password properties are set then check if digest auth is being used |
| if (!authUser.isEmpty() && "false".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) { |
| final String authPass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue()); |
| |
| String credential = Credentials.basic(authUser, authPass); |
| requestBuilder.header("Authorization", credential); |
| } |
| |
| // set the request method |
| String method = trimToEmpty(context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile).getValue()).toUpperCase(); |
| switch (method) { |
| case "GET": |
| requestBuilder.get(); |
| break; |
| case "POST": |
| RequestBody requestBody = getRequestBodyToSend(session, context, requestFlowFile); |
| requestBuilder.post(requestBody); |
| break; |
| case "PUT": |
| requestBody = getRequestBodyToSend(session, context, requestFlowFile); |
| requestBuilder.put(requestBody); |
| break; |
| case "PATCH": |
| requestBody = getRequestBodyToSend(session, context, requestFlowFile); |
| requestBuilder.patch(requestBody); |
| break; |
| case "HEAD": |
| requestBuilder.head(); |
| break; |
| case "DELETE": |
| requestBuilder.delete(); |
| break; |
| default: |
| requestBuilder.method(method, null); |
| } |
| |
| String userAgent = trimToEmpty(context.getProperty(PROP_USERAGENT).evaluateAttributeExpressions(requestFlowFile).getValue()); |
| requestBuilder.addHeader("User-Agent", userAgent); |
| |
| setHeaderProperties(context, requestBuilder, requestFlowFile); |
| |
| return requestBuilder.build(); |
| } |
| |
| private RequestBody getRequestBodyToSend(final ProcessSession session, final ProcessContext context, |
| final FlowFile requestFlowFile) { |
| |
| boolean sendBody = context.getProperty(PROP_SEND_BODY).asBoolean(); |
| |
| String evalContentType = context.getProperty(PROP_CONTENT_TYPE) |
| .evaluateAttributeExpressions(requestFlowFile).getValue(); |
| final String contentType = StringUtils.isBlank(evalContentType) ? DEFAULT_CONTENT_TYPE : evalContentType; |
| String contentKey = context.getProperty(PROP_FORM_BODY_FORM_NAME).evaluateAttributeExpressions(requestFlowFile).getValue(); |
| |
| // Check for dynamic properties for form components. |
| // Even if the flowfile is not sent, we may still send form parameters. |
| Map<String, PropertyDescriptor> propertyDescriptors = new HashMap<>(); |
| for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { |
| Matcher matcher = DYNAMIC_FORM_PARAMETER_NAME.matcher(entry.getKey().getName()); |
| if (matcher.matches()) { |
| propertyDescriptors.put(matcher.group(FORM_DATA_NAME_GROUP), entry.getKey()); |
| } |
| } |
| |
| RequestBody requestBody = new RequestBody() { |
| @Nullable |
| @Override |
| public MediaType contentType() { |
| return MediaType.parse(contentType); |
| } |
| |
| @Override |
| public void writeTo(BufferedSink sink) { |
| session.exportTo(requestFlowFile, sink.outputStream()); |
| } |
| |
| @Override |
| public long contentLength() { |
| return useChunked ? -1 : requestFlowFile.getSize(); |
| } |
| }; |
| |
| if (propertyDescriptors.size() > 0 || StringUtils.isNotEmpty(contentKey)) { |
| // we have form data |
| MultipartBody.Builder builder = new Builder().setType(MultipartBody.FORM); |
| boolean useFileName = context.getProperty(PROP_SET_FORM_FILE_NAME).asBoolean(); |
| String contentFileName = null; |
| if (useFileName) { |
| contentFileName = requestFlowFile.getAttribute(CoreAttributes.FILENAME.key()); |
| } |
| // loop through the dynamic form parameters |
| for (final Map.Entry<String, PropertyDescriptor> entry : propertyDescriptors.entrySet()) { |
| final String propValue = context.getProperty(entry.getValue().getName()) |
| .evaluateAttributeExpressions(requestFlowFile).getValue(); |
| builder.addFormDataPart(entry.getKey(), propValue); |
| } |
| if (sendBody) { |
| builder.addFormDataPart(contentKey, contentFileName, requestBody); |
| } |
| return builder.build(); |
| } else if (sendBody) { |
| return requestBody; |
| } |
| return RequestBody.create(new byte[0], null); |
| } |
| |
| private void setHeaderProperties(final ProcessContext context, final Request.Builder requestBuilder, final FlowFile requestFlowFile) { |
| // check if we should send the a Date header with the request |
| if (context.getProperty(PROP_DATE_HEADER).asBoolean()) { |
| final ZonedDateTime universalCoordinatedTimeNow = ZonedDateTime.now(ZoneOffset.UTC); |
| requestBuilder.addHeader("Date", RFC_2616_DATE_TIME.format(universalCoordinatedTimeNow)); |
| } |
| |
| for (String headerKey : dynamicPropertyNames) { |
| String headerValue = context.getProperty(headerKey).evaluateAttributeExpressions(requestFlowFile).getValue(); |
| |
| // don't include dynamic form data properties |
| if (DYNAMIC_FORM_PARAMETER_NAME.matcher(headerKey).matches()) { |
| continue; |
| } |
| |
| requestBuilder.addHeader(headerKey, headerValue); |
| } |
| |
| // iterate through the flowfile attributes, adding any attribute that |
| // matches the attributes-to-send pattern. if the pattern is not set |
| // (it's an optional property), ignore that attribute entirely |
| if (regexAttributesToSend != null && requestFlowFile != null) { |
| Map<String, String> attributes = requestFlowFile.getAttributes(); |
| Matcher m = regexAttributesToSend.matcher(""); |
| for (Map.Entry<String, String> entry : attributes.entrySet()) { |
| String headerKey = trimToEmpty(entry.getKey()); |
| |
| // don't include any of the ignored attributes |
| if (IGNORED_ATTRIBUTES.contains(headerKey)) { |
| continue; |
| } |
| |
| // check if our attribute key matches the pattern |
| // if so, include in the request as a header |
| m.reset(headerKey); |
| if (m.matches()) { |
| String headerVal = trimToEmpty(entry.getValue()); |
| requestBuilder.addHeader(headerKey, headerVal); |
| } |
| } |
| } |
| } |
| |
| |
| private void route(FlowFile request, FlowFile response, ProcessSession session, ProcessContext context, int statusCode) { |
| // check if we should yield the processor |
| if (!isSuccess(statusCode) && request == null) { |
| context.yield(); |
| } |
| |
| // If the property to output the response flowfile regardless of status code is set then transfer it |
| boolean responseSent = false; |
| if (context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean()) { |
| session.transfer(response, REL_RESPONSE); |
| responseSent = true; |
| } |
| |
| // transfer to the correct relationship |
| // 2xx -> SUCCESS |
| if (isSuccess(statusCode)) { |
| // we have two flowfiles to transfer |
| if (request != null) { |
| session.transfer(request, REL_SUCCESS_REQ); |
| } |
| if (response != null && !responseSent) { |
| session.transfer(response, REL_RESPONSE); |
| } |
| |
| // 5xx -> RETRY |
| } else if (statusCode / 100 == 5) { |
| if (request != null) { |
| request = session.penalize(request); |
| session.transfer(request, REL_RETRY); |
| } |
| |
| // 1xx, 3xx, 4xx -> NO RETRY |
| } else { |
| if (request != null) { |
| if (context.getProperty(PROP_PENALIZE_NO_RETRY).asBoolean()) { |
| request = session.penalize(request); |
| } |
| session.transfer(request, REL_NO_RETRY); |
| } |
| } |
| |
| } |
| |
| private boolean isSuccess(int statusCode) { |
| return statusCode / 100 == 2; |
| } |
| |
| private void logRequest(ComponentLog logger, Request request) { |
| logger.debug("\nRequest to remote service:\n\t{}\n{}", |
| request.url().url().toExternalForm(), getLogString(request.headers().toMultimap())); |
| } |
| |
| private void logResponse(ComponentLog logger, URL url, Response response) { |
| logger.debug("\nResponse from remote service:\n\t{}\n{}", |
| url.toExternalForm(), getLogString(response.headers().toMultimap())); |
| } |
| |
| private String getLogString(Map<String, List<String>> map) { |
| StringBuilder sb = new StringBuilder(); |
| for (Map.Entry<String, List<String>> entry : map.entrySet()) { |
| List<String> list = entry.getValue(); |
| if (!list.isEmpty()) { |
| sb.append("\t"); |
| sb.append(entry.getKey()); |
| sb.append(": "); |
| if (list.size() == 1) { |
| sb.append(list.get(0)); |
| } else { |
| sb.append(list.toString()); |
| } |
| sb.append("\n"); |
| } |
| } |
| return sb.toString(); |
| } |
| |
| /** |
| * Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings. |
| */ |
| private Map<String, String> convertAttributesFromHeaders(final Response responseHttp) { |
| // create a new hashmap to store the values from the connection |
| final Map<String, String> attributes = new HashMap<>(); |
| final Headers headers = responseHttp.headers(); |
| headers.names().forEach((key) -> { |
| final List<String> values = headers.values(key); |
| // we ignore any headers with no actual values (rare) |
| if (!values.isEmpty()) { |
| // create a comma separated string from the values, this is stored in the map |
| final String value = StringUtils.join(values, MULTIPLE_HEADER_DELIMITER); |
| attributes.put(key, value); |
| } |
| }); |
| |
| final Handshake handshake = responseHttp.handshake(); |
| if (handshake != null) { |
| final Principal principal = handshake.peerPrincipal(); |
| if (principal != null) { |
| attributes.put(REMOTE_DN, principal.getName()); |
| } |
| } |
| |
| return attributes; |
| } |
| |
| private Charset getCharsetFromMediaType(MediaType contentType) { |
| return contentType != null ? contentType.charset(StandardCharsets.UTF_8) : StandardCharsets.UTF_8; |
| } |
| |
| /** |
| * Retrieve the directory in which OkHttp should cache responses. This method opts |
| * to use a temp directory to write the cache, which means that the cache will be written |
| * to a new location each time this processor is scheduled. |
| * <p> |
| * Ref: https://github.com/square/okhttp/wiki/Recipes#response-caching |
| * |
| * @return the directory in which the ETag cache should be written |
| */ |
| private static File getETagCacheDir() throws IOException { |
| return Files.createTempDirectory(InvokeHTTP.class.getSimpleName()).toFile(); |
| } |
| } |