blob: d172033d7b94e2a9eefbe2aa6a8b23e472fddb06 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_HOST;
import static org.apache.nifi.processors.standard.util.HTTPUtils.PROXY_PORT;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.UnknownHostException;
import java.security.Principal;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.security.auth.x500.X500Principal;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response.Status;
import org.apache.http.Header;
import org.apache.http.HttpException;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.NoHttpResponseException;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.ManagedHttpClientConnection;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.ContentProducer;
import org.apache.http.entity.EntityTemplate;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpCoreContext;
import org.apache.http.util.EntityUtils;
import org.apache.http.util.VersionInfo;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.StandardFlowFileMediaType;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.HTTPUtils;
import org.apache.nifi.security.util.CertificateUtils;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
import org.apache.nifi.stream.io.StreamThrottler;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FlowFilePackager;
import org.apache.nifi.util.FlowFilePackagerV1;
import org.apache.nifi.util.FlowFilePackagerV2;
import org.apache.nifi.util.FlowFilePackagerV3;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
@Deprecated
@DeprecationNotice(alternatives = {InvokeHTTP.class}, reason = "This processor is deprecated and may be removed in future releases.")
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"http", "https", "remote", "copy", "archive"})
@CapabilityDescription("Please be aware this processor is deprecated and may be removed in the near future. Use InvokeHTTP instead. Performs an HTTP Post with the content of the FlowFile. "
+ "Uses a connection pool with max number of connections equal to "
+ "the number of possible endpoints multiplied by the Concurrent Tasks configuration.")
public class PostHTTP extends AbstractProcessor {
// HTTP header set on every POST to describe the payload (see onTrigger).
public static final String CONTENT_TYPE_HEADER = "Content-Type";
// NOTE(review): not referenced in the visible part of this file; presumably used when probing the destination.
public static final String ACCEPT = "Accept";
// NOTE(review): not referenced in the visible part of this file; presumably used when probing the destination.
public static final String ACCEPT_ENCODING = "Accept-Encoding";
// Fallback Content-Type used when the evaluated Content-Type property is blank and content is sent raw.
public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
// Header sent with value "true" on every POST to request an acknowledgement (hold) URI from the receiver.
public static final String FLOWFILE_CONFIRMATION_HEADER = "x-prefer-acknowledge-uri";
// Response header carrying the hold URI when the receiver answers 303 See Other.
public static final String LOCATION_HEADER_NAME = "Location";
// Response header whose value must equal LOCATION_URI_INTENT_VALUE for the Location header to be trusted as a hold URI.
public static final String LOCATION_URI_INTENT_NAME = "x-location-uri-intent";
public static final String LOCATION_URI_INTENT_VALUE = "flowfile-hold";
// Header set to "true" when a FlowFile package is GZIP-compressed (used instead of Content-Encoding in that case).
public static final String GZIPPED_HEADER = "flowfile-gzipped";
// Standard Content-Encoding header/value, used when raw (non-packaged) content is GZIP-compressed.
public static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
public static final String CONTENT_ENCODING_GZIP_VALUE = "gzip";
// Header announcing the transfer protocol version this client speaks (PROTOCOL_VERSION).
public static final String PROTOCOL_VERSION_HEADER = "x-nifi-transfer-protocol-version";
// Header carrying a random per-trigger transaction id.
public static final String TRANSACTION_ID_HEADER = "x-nifi-transaction-id";
public static final String PROTOCOL_VERSION = "3";
// HttpContext attribute holding the TLS peer's subject DN; populated by the response interceptor and
// reported in the provenance SEND event.
public static final String REMOTE_DN = "remote.dn";
// NOTE(review): not referenced in the visible part of this file; presumably a log template used further down.
private static final String FLOW_FILE_CONNECTION_LOG = "Connection to URI {} will be using Content Type {} if sending data as FlowFile";
// Destination URL; evaluated per FlowFile. Must match http(s)://... and be a well-formed URL.
// Only FlowFiles whose evaluated URL is identical are batched into one POST.
public static final PropertyDescriptor URL = new PropertyDescriptor.Builder()
.name("URL")
.description("The URL to POST to. The URL may be defined using the Attribute Expression Language. "
+ "A separate connection pool will be created for each unique host:port combination.")
.required(true)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("https?\\://.*")))
.addValidator(StandardValidators.URL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
// When true, content + attributes are wrapped with a FlowFilePackager (V3, V2 or V1 depending on what
// the destination accepts); when false, raw content is streamed.
public static final PropertyDescriptor SEND_AS_FLOWFILE = new PropertyDescriptor.Builder()
.name("Send as FlowFile")
.description("If true, will package the FlowFile's contents and attributes together and send the FlowFile Package; otherwise, will send only the FlowFile's content")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
// Used as the HttpClient connect timeout and as the connection manager's default SocketConfig SO_TIMEOUT.
public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
.name("Connection Timeout")
.description("How long to wait when attempting to connect to the remote server before giving up")
.required(true)
.defaultValue("30 sec")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
// Used as the per-request socket timeout and as the timeout for leasing a pooled connection.
// NOTE(review): the description speaks of "receiving" a "partial file", which reads as if copied from a GET
// processor; it does not mention the connection-lease usage.
public static final PropertyDescriptor DATA_TIMEOUT = new PropertyDescriptor.Builder()
.name("Data Timeout")
.description("How long to wait between receiving segments of data from the remote server before giving up and discarding the partial file")
.required(true)
.defaultValue("30 sec")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
// When set, registered (with PASSWORD) as credentials for AuthScope.ANY.
public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("Username")
.description("Username required to access the URL")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("Password required to access the URL")
.required(false)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
// Defaults to the Apache HttpClient version string computed at class-load time.
public static final PropertyDescriptor USER_AGENT = new PropertyDescriptor.Builder()
.name("User Agent")
.description("What to report as the User Agent when we connect to the remote server")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue(VersionInfo.getUserAgent("Apache-HttpClient", "org.apache.http.client", HttpClientBuilder.class))
.build();
// 0 disables compression; GZIP is only applied when the destination reports that it accepts it.
public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder()
.name("Compression Level")
.description("Determines the GZIP Compression Level to use when sending the file; the value must be in the range of 0-9. A value of 0 indicates that the file will not be GZIP'ed")
.required(true)
.addValidator(StandardValidators.createLongValidator(0, 9, true))
.defaultValue("0")
.build();
// Only applied when sending raw content (not as FlowFile) and the batch holds exactly one FlowFile.
public static final PropertyDescriptor ATTRIBUTES_AS_HEADERS_REGEX = new PropertyDescriptor.Builder()
.name("Attributes to Send as HTTP Headers (Regex)")
.description("Specifies the Regular Expression that determines the names of FlowFile attributes that should be sent as HTTP Headers")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.required(false)
.build();
// Bytes/second cap enforced by a LeakyBucketStreamThrottler shared by all concurrent tasks.
// NOTE(review): converted with Double.intValue(), so rates above Integer.MAX_VALUE bytes/s saturate.
public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder()
.name("Max Data to Post per Second")
.description("The maximum amount of data to send per second; this allows the bandwidth to be throttled to a specified data rate; if not specified, the data rate is not throttled")
.required(false)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
// Batching only happens when sending as FlowFile and the destination accepts the V3 or V2 (streaming) format.
// NOTE(review): the description says "If not specified..." yet a default of 100 MB is always applied.
public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Max Batch Size")
.description("If the Send as FlowFile property is true, specifies the max data size for a batch of FlowFiles to send in a single "
+ "HTTP POST. If not specified, each FlowFile will be sent separately. If the Send as FlowFile property is false, this "
+ "property is ignored")
.required(false)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("100 MB")
.build();
// customValidate requires this to be set when compression level is 0 and content is sent raw.
public static final PropertyDescriptor CHUNKED_ENCODING = new PropertyDescriptor.Builder()
.name("Use Chunked Encoding")
.description("Specifies whether or not to use Chunked Encoding to send the data. This property is ignored in the event the contents are compressed "
+ "or sent as FlowFiles.")
.allowableValues("true", "false")
.build();
// Required (by customValidate) when the URL starts with "https".
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
// Evaluated against the first FlowFile of the batch; a blank result falls back to DEFAULT_CONTENT_TYPE.
public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
.name("Content-Type")
.description("The Content-Type to specify for the content of the FlowFile being POSTed if " + SEND_AS_FLOWFILE.getName() + " is false. "
+ "In the case of an empty value after evaluating an expression language expression, Content-Type defaults to " + DEFAULT_CONTENT_TYPE)
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("${" + CoreAttributes.MIME_TYPE.key() + "}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
/** FlowFiles that the destination accepted (2xx response, or a confirmed hold) are routed here. */
public static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("Files that are successfully sent will be transferred to success")
        .build();
/** FlowFiles that could not be delivered are penalized and routed here. */
public static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("Files that fail to send will be transferred to failure")
        .build();
// Immutable relationship set and property list, built once in init().
private Set<Relationship> relationships;
private List<PropertyDescriptor> properties;
// Optional bandwidth throttler: created in onScheduled() when Max Data Rate is set; closed and cleared in onStopped().
private final AtomicReference<StreamThrottler> throttlerRef = new AtomicReference<>();
// Cache of destination capabilities (FlowFile versions / gzip) keyed by base URL (scheme + authority);
// a new key also grows the connection pool's max total. Cleared in onStopped().
private final ConcurrentMap<String, DestinationAccepts> destinationAcceptsMap = new ConcurrentHashMap<>();
// HTTP machinery built in onScheduled() and shared by all concurrent tasks.
private volatile PoolingHttpClientConnectionManager connManager;
private volatile CloseableHttpClient client;
private volatile RequestConfig requestConfig;
// Set as the user token on the HttpClientContext created for each onTrigger() invocation; HttpClient uses the
// token when choosing an available, reusable connection from the pool.
private volatile Principal principal;
/**
 * Builds the immutable relationship set and the ordered list of supported property descriptors.
 *
 * @param context the initialization context (not used)
 */
@Override
protected void init(final ProcessorInitializationContext context) {
    final Set<Relationship> rels = new HashSet<>();
    Collections.addAll(rels, REL_SUCCESS, REL_FAILURE);
    this.relationships = Collections.unmodifiableSet(rels);

    // Order matters: it is the order in which properties are presented to the user.
    final List<PropertyDescriptor> descriptors = new ArrayList<>();
    Collections.addAll(descriptors,
            URL,
            MAX_BATCH_SIZE,
            MAX_DATA_RATE,
            SSL_CONTEXT_SERVICE,
            USERNAME,
            PASSWORD,
            SEND_AS_FLOWFILE,
            CHUNKED_ENCODING,
            COMPRESSION_LEVEL,
            CONNECTION_TIMEOUT,
            DATA_TIMEOUT,
            ATTRIBUTES_AS_HEADERS_REGEX,
            USER_AGENT,
            HTTPUtils.PROXY_CONFIGURATION_SERVICE,
            PROXY_HOST,
            PROXY_PORT,
            CONTENT_TYPE);
    this.properties = Collections.unmodifiableList(descriptors);
}
/**
 * @return the unmodifiable set of relationships built in {@code init}
 */
@Override
public Set<Relationship> getRelationships() {
    return this.relationships;
}
/**
 * @return the unmodifiable, ordered list of property descriptors built in {@code init}
 */
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return this.properties;
}
/**
 * Cross-property validation.
 * <ul>
 *   <li>An https URL requires an SSL Context Service.</li>
 *   <li>When content is sent raw and uncompressed, the chunked-encoding choice must be explicit.</li>
 *   <li>Proxy properties are checked by {@link HTTPUtils#validateProxyProperties}.</li>
 * </ul>
 *
 * @param context the validation context
 * @return the (possibly empty) collection of validation failures
 */
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
    final Collection<ValidationResult> problems = new ArrayList<>();

    final String urlValue = context.getProperty(URL).getValue();
    if (urlValue.startsWith("https") && context.getProperty(SSL_CONTEXT_SERVICE).getValue() == null) {
        final ValidationResult missingSsl = new ValidationResult.Builder()
                .subject("SSL Context")
                .valid(false)
                .explanation("URL is set to HTTPS protocol but no SSLContext has been specified")
                .build();
        problems.add(missingSsl);
    }

    final boolean sendingPackaged = context.getProperty(SEND_AS_FLOWFILE).asBoolean();
    final int level = context.getProperty(COMPRESSION_LEVEL).asInteger();
    final boolean chunkingConfigured = context.getProperty(CHUNKED_ENCODING).isSet();
    final boolean rawUncompressed = level == 0 && !sendingPackaged;
    if (rawUncompressed && !chunkingConfigured) {
        final String chunkedName = CHUNKED_ENCODING.getName();
        problems.add(new ValidationResult.Builder()
                .subject(chunkedName)
                .valid(false)
                .explanation("if compression level is 0 and not sending as a FlowFile, then the '" + chunkedName + "' property must be set")
                .build());
    }

    HTTPUtils.validateProxyProperties(context, problems);
    return problems;
}
/**
 * Releases the resources acquired in {@code onScheduled}. Specifically, it:
 * <ul>
 *   <li>clears the destination-capability cache;</li>
 *   <li>shuts down the connection pool and closes the HTTP client;</li>
 *   <li>closes the optional throttler.</li>
 * </ul>
 * Any of these may be absent: {@code onScheduled} can fail before they are created, for example when
 * SSLContext creation throws on the very first start. In that case the framework still invokes this
 * method, so every resource is null-checked.
 */
@OnStopped
public void onStopped() {
    destinationAcceptsMap.clear();

    // Read the volatile fields once so the null checks and the calls see the same instances.
    final PoolingHttpClientConnectionManager manager = connManager;
    final CloseableHttpClient httpClient = client;
    try {
        if (manager != null) {
            manager.shutdown();
        }
        if (httpClient != null) {
            httpClient.close();
        }
    } catch (final IOException e) {
        getLogger().error("Could not properly shutdown connections", e);
    }

    final StreamThrottler throttler = throttlerRef.getAndSet(null);
    if (throttler != null) {
        try {
            throttler.close();
        } catch (final IOException e) {
            getLogger().error("Failed to close StreamThrottler", e);
        }
    }
}
/**
 * Builds the shared HTTP machinery used by every concurrent task:
 * <ul>
 *   <li>the optional bandwidth throttler;</li>
 *   <li>the client principal used as the connection-pool user token;</li>
 *   <li>a pooling connection manager (TLS-capable when an SSL Context Service is configured);</li>
 *   <li>the HttpClient, with a response interceptor that records the TLS peer DN and a retry handler;</li>
 *   <li>the per-request RequestConfig.</li>
 * </ul>
 *
 * @param context the process context
 * @throws ProcessException if the SSLContext cannot be created
 */
@OnScheduled
public void onScheduled(final ProcessContext context) {
    final Double bytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B);
    this.throttlerRef.set(bytesPerSecond == null ? null : new LeakyBucketStreamThrottler(bytesPerSecond.intValue()));

    String hostname = "unknown";
    try {
        hostname = InetAddress.getLocalHost().getCanonicalHostName();
    } catch (final UnknownHostException ignored) {
        // best effort: keep "unknown" when the local host name cannot be resolved
    }
    principal = new X500Principal("CN=" + hostname + ", OU=unknown, O=unknown, C=unknown");

    // setup the PoolingHttpClientConnectionManager
    final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    if (sslContextService == null) {
        connManager = new PoolingHttpClientConnectionManager();
    } else {
        final SSLContext sslContext;
        try {
            sslContext = sslContextService.createContext();
        } catch (final Exception e) {
            throw new ProcessException(e);
        }
        final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext);
        // Also use a plain socket factory for regular http connections (especially proxies)
        final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
                .register("https", sslsf)
                .register("http", PlainConnectionSocketFactory.getSocketFactory())
                .build();
        connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
    }

    final int connectionTimeoutMillis = context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
    final int dataTimeoutMillis = context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();

    // Default SO_TIMEOUT for newly created sockets.
    // NOTE(review): the RequestConfig below also sets a per-request socket timeout (Data Timeout).
    connManager.setDefaultSocketConfig(SocketConfig.custom().setSoTimeout(connectionTimeoutMillis).build());
    // the +1 here accommodates math error calculating excess connections in AbstractConnPool.getPoolEntryBlocking()
    connManager.setDefaultMaxPerRoute(context.getMaxConcurrentTasks() + 1);
    // max total connections will get set in onTrigger(), because a new route will require increasing this
    connManager.setMaxTotal(1);
    // enable inactivity check, to detect and close idle connections
    connManager.setValidateAfterInactivity(30_000);

    // setup the HttpClientBuilder
    final HttpClientBuilder clientBuilder = HttpClientBuilder.create();
    clientBuilder.setConnectionManager(connManager);
    clientBuilder.setUserAgent(context.getProperty(USER_AGENT).getValue());
    // Record the TLS peer's subject DN in the HttpContext so it can be reported in provenance.
    clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() {
        @Override
        public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
            final HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext);
            final ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class);
            if (conn == null || !conn.isOpen()) {
                return;
            }
            final SSLSession sslSession = conn.getSSLSession();
            if (sslSession == null) {
                return; // plain http: nothing to record
            }
            final Certificate[] certChain = sslSession.getPeerCertificates();
            if (certChain == null || certChain.length == 0) {
                throw new SSLPeerUnverifiedException("No certificates found");
            }
            try {
                final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certChain[0]);
                httpContext.setAttribute(REMOTE_DN, cert.getSubjectDN().getName().trim());
            } catch (final CertificateException e) {
                final String msg = "Could not extract subject DN from SSL session peer certificate";
                getLogger().warn(msg, e);
                // SSLPeerUnverifiedException has no cause constructor; attach the cause explicitly
                final SSLPeerUnverifiedException unverified = new SSLPeerUnverifiedException(msg);
                unverified.initCause(e);
                throw unverified;
            }
        }
    });

    final HttpRequestRetryHandler retryHandler = (exception, attempt, httpContext) -> {
        if (attempt > 3 || !isScheduled()) {
            return false;
        }
        final HttpClientContext clientContext = HttpClientContext.adapt(httpContext);
        // A heavily loaded remote listener can manifest as NoHttpResponseExceptions here.
        // When this happens, take a 5 second snooze before retrying to give the remote a short break.
        if (exception instanceof NoHttpResponseException) {
            if (getLogger().isDebugEnabled()) {
                getLogger().debug("Sleeping for 5 secs then retrying {} request for remote server {}",
                        new Object[]{clientContext.getRequest().getRequestLine().getMethod(), clientContext.getTargetHost()});
            }
            try {
                Thread.sleep(5000);
            } catch (final InterruptedException e) {
                // restore the interrupt status so the calling thread can still observe it
                Thread.currentThread().interrupt();
                return false;
            }
            return true;
        }
        // do not retry more serious exceptions
        return false;
    };
    clientBuilder.setRetryHandler(retryHandler);
    clientBuilder.disableContentCompression();

    // set the credentials if appropriate
    final String username = context.getProperty(USERNAME).getValue();
    final String password = context.getProperty(PASSWORD).getValue();
    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
    if (username != null) {
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
    }

    // Set the proxy if specified
    HTTPUtils.setProxy(context, clientBuilder, credentialsProvider);

    // complete the HTTPClient build
    client = clientBuilder.build();

    // Per-request configuration: Data Timeout bounds both pool leasing and socket reads; redirects are disabled
    // because 303 responses are handled explicitly as FlowFile holds.
    requestConfig = RequestConfig.custom()
            .setConnectionRequestTimeout(dataTimeoutMillis)
            .setConnectTimeout(connectionTimeoutMillis)
            .setRedirectsEnabled(false)
            .setSocketTimeout(dataTimeoutMillis)
            .build();
}
/**
 * Returns the scheme and authority of the given URL, e.g. {@code https://host:8443} for
 * {@code https://host:8443/contentListener?x=1}.
 *
 * The result is used as the key for the destination-capability cache and for counting distinct
 * connection-pool routes. The authority ends at the first '/', '?' or '#' after "://". The previous
 * fixed offset of 9 skipped the terminating slash for one-character http hosts
 * ({@code http://a/x} was returned unchanged) and kept query strings when there was no path.
 *
 * @param url an absolute URL (already validated to start with http:// or https://)
 * @return the scheme + authority prefix, or the whole URL if it has no path, query or fragment
 */
private String getBaseUrl(final String url) {
    final int schemeSeparator = url.indexOf("://");
    final int authorityStart = schemeSeparator < 0 ? 0 : schemeSeparator + 3;
    for (int i = authorityStart; i < url.length(); i++) {
        final char c = url.charAt(i);
        if (c == '/' || c == '?' || c == '#') {
            return url.substring(0, i);
        }
    }
    return url;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile firstFlowFile = session.get();
if (firstFlowFile == null) {
return;
}
final ComponentLog logger = getLogger();
final String url = context.getProperty(URL).evaluateAttributeExpressions(firstFlowFile).getValue();
try {
new java.net.URL(url);
} catch (final MalformedURLException e) {
logger.error("After substituting attribute values for {}, URL is {}; this is not a valid URL, so routing to failure",
new Object[]{firstFlowFile, url});
firstFlowFile = session.penalize(firstFlowFile);
session.transfer(firstFlowFile, REL_FAILURE);
return;
}
final List<FlowFile> toSend = new ArrayList<>();
toSend.add(firstFlowFile);
final boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean();
final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
final StreamThrottler throttler = throttlerRef.get();
final Double maxBatchBytes = context.getProperty(MAX_BATCH_SIZE).asDataSize(DataUnit.B);
final AtomicLong bytesToSend = new AtomicLong(firstFlowFile.getSize());
DestinationAccepts destinationAccepts = null;
final String transactionId = UUID.randomUUID().toString();
final HttpClientContext httpClientContext = HttpClientContext.create();
httpClientContext.setUserToken(principal);
// determine whether or not destination accepts flowfile/gzip
final String baseUrl = getBaseUrl(url);
destinationAccepts = destinationAcceptsMap.get(baseUrl);
if (destinationAccepts == null) {
try {
destinationAccepts = getDestinationAcceptance(sendAsFlowFile, url, transactionId, httpClientContext);
if (null == destinationAcceptsMap.putIfAbsent(baseUrl, destinationAccepts)) {
// url indicates a new route, so increase the max allowed open connections
connManager.setMaxTotal(connManager.getMaxTotal() + connManager.getDefaultMaxPerRoute());
}
} catch (final IOException e) {
firstFlowFile = session.penalize(firstFlowFile);
session.transfer(firstFlowFile, REL_FAILURE);
logger.error("Unable to communicate with destination {} to determine whether or not it can accept "
+ "flowfiles/gzip; routing {} to failure due to {}", new Object[]{url, firstFlowFile, e});
return;
}
}
// if we are sending as flowfile and the destination accepts V3 or V2 (streaming) format,
// then we can get more flowfiles from the session up to MAX_BATCH_SIZE for the same URL
if (sendAsFlowFile && (destinationAccepts.isFlowFileV3Accepted() || destinationAccepts.isFlowFileV2Accepted())) {
toSend.addAll(session.get(new FlowFileFilter() {
@Override
public FlowFileFilterResult filter(FlowFile flowFile) {
// if over MAX_BATCH_SIZE, then stop adding files
if (bytesToSend.get() + flowFile.getSize() > maxBatchBytes) {
return FlowFileFilterResult.REJECT_AND_TERMINATE;
}
// check URL to see if this flowfile can be included in the batch
final String urlToCheck = context.getProperty(URL).evaluateAttributeExpressions(flowFile).getValue();
if (url.equals(urlToCheck)) {
bytesToSend.addAndGet(flowFile.getSize());
return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
} else {
return FlowFileFilterResult.REJECT_AND_CONTINUE;
}
}
}));
}
final HttpPost post = new HttpPost(url);
final DestinationAccepts accepts = destinationAccepts;
final boolean isDestinationLegacyNiFi = accepts.getProtocolVersion() == null;
final EntityTemplate entity = new EntityTemplate(new ContentProducer() {
@Override
public void writeTo(final OutputStream rawOut) throws IOException {
final OutputStream throttled = throttler == null ? rawOut : throttler.newThrottledOutputStream(rawOut);
OutputStream wrappedOut = new BufferedOutputStream(throttled);
if (compressionLevel > 0 && accepts.isGzipAccepted()) {
wrappedOut = new GZIPOutputStream(wrappedOut, compressionLevel);
}
try (final OutputStream out = wrappedOut) {
final FlowFilePackager packager;
if (!sendAsFlowFile) {
packager = null;
} else if (accepts.isFlowFileV3Accepted()) {
packager = new FlowFilePackagerV3();
} else if (accepts.isFlowFileV2Accepted()) {
packager = new FlowFilePackagerV2();
} else if (accepts.isFlowFileV1Accepted()) {
packager = new FlowFilePackagerV1();
} else {
packager = null;
}
for (final FlowFile flowFile : toSend) {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
// if none of the above conditions is met, we should never get here, because
// we will have already verified that at least 1 of the FlowFile packaging
// formats is acceptable if sending as FlowFile.
if (packager == null) {
StreamUtils.copy(in, out);
} else {
final Map<String, String> flowFileAttributes;
if (isDestinationLegacyNiFi) {
// Old versions of NiFi expect nf.file.name and nf.file.path to indicate filename & path;
// in order to maintain backward compatibility, we copy the filename & path to those attribute keys.
flowFileAttributes = new HashMap<>(flowFile.getAttributes());
flowFileAttributes.put("nf.file.name", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
flowFileAttributes.put("nf.file.path", flowFile.getAttribute(CoreAttributes.PATH.key()));
} else {
flowFileAttributes = flowFile.getAttributes();
}
packager.packageFlowFile(in, out, flowFileAttributes, flowFile.getSize());
}
}
}
});
}
out.flush();
} catch (ProcessException pe) {
// Pull out IOExceptions so that HTTPClient can properly do what it needs to do
Throwable t = pe.getCause();
if (t != null && t instanceof IOException) {
IOException ioe = new IOException(t.getMessage());
ioe.setStackTrace(t.getStackTrace());
throw ioe;
}
throw pe;
}
}
}) {
@Override
public long getContentLength() {
if (compressionLevel == 0 && !sendAsFlowFile && !context.getProperty(CHUNKED_ENCODING).asBoolean()) {
return toSend.get(0).getSize();
} else {
return -1;
}
}
};
final String flowFileDescription = toSend.size() <= 10 ? toSend.toString() : toSend.size() + " FlowFiles";
if (context.getProperty(CHUNKED_ENCODING).isSet()) {
entity.setChunked(context.getProperty(CHUNKED_ENCODING).asBoolean());
}
post.setEntity(entity);
post.setConfig(requestConfig);
final String contentType;
if (sendAsFlowFile) {
if (accepts.isFlowFileV3Accepted()) {
contentType = StandardFlowFileMediaType.VERSION_3.getMediaType();
} else if (accepts.isFlowFileV2Accepted()) {
contentType = StandardFlowFileMediaType.VERSION_2.getMediaType();
} else if (accepts.isFlowFileV1Accepted()) {
contentType = StandardFlowFileMediaType.VERSION_1.getMediaType();
} else {
logger.error("Cannot send {} to {} because the destination does not accept FlowFiles and this processor is "
+ "configured to deliver FlowFiles; routing to failure",
new Object[] {flowFileDescription, url});
for (FlowFile flowFile : toSend) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
return;
}
} else {
final String contentTypeValue = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(toSend.get(0)).getValue();
contentType = StringUtils.isBlank(contentTypeValue) ? DEFAULT_CONTENT_TYPE : contentTypeValue;
}
final String attributeHeaderRegex = context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue();
if (attributeHeaderRegex != null && !sendAsFlowFile && toSend.size() == 1) {
final Pattern pattern = Pattern.compile(attributeHeaderRegex);
final Map<String, String> attributes = toSend.get(0).getAttributes();
for (final Map.Entry<String, String> entry : attributes.entrySet()) {
final String key = entry.getKey();
if (pattern.matcher(key).matches()) {
post.setHeader(entry.getKey(), entry.getValue());
}
}
}
post.setHeader(CONTENT_TYPE_HEADER, contentType);
post.setHeader(FLOWFILE_CONFIRMATION_HEADER, "true");
post.setHeader(PROTOCOL_VERSION_HEADER, PROTOCOL_VERSION);
post.setHeader(TRANSACTION_ID_HEADER, transactionId);
if (compressionLevel > 0 && accepts.isGzipAccepted()) {
if (sendAsFlowFile) {
post.setHeader(GZIPPED_HEADER, "true");
} else {
post.setHeader(CONTENT_ENCODING_HEADER, CONTENT_ENCODING_GZIP_VALUE);
}
}
// Do the actual POST
final String uploadDataRate;
final long uploadMillis;
CloseableHttpResponse response = null;
try {
final StopWatch stopWatch = new StopWatch(true);
response = client.execute(post, httpClientContext);
stopWatch.stop();
uploadDataRate = stopWatch.calculateDataRate(bytesToSend.get());
uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
} catch (final IOException | ProcessException e) {
logger.error("Failed to Post {} due to {}; transferring to failure", new Object[]{flowFileDescription, e});
for (FlowFile flowFile : toSend) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
return;
} finally {
if (response != null) {
try {
// consume input stream entirely, ignoring its contents. If we
// don't do this, the Connection will not be returned to the pool
EntityUtils.consume(response.getEntity());
} catch (final IOException ignore) {
}
}
}
// If we get a 'SEE OTHER' status code and an HTTP header that indicates that the intent
// of the Location URI is a flowfile hold, we will store this holdUri. This prevents us
// from posting to some other webservice and then attempting to delete some resource to which
// we are redirected
final int responseCode = response.getStatusLine().getStatusCode();
final String responseReason = response.getStatusLine().getReasonPhrase();
String holdUri = null;
if (responseCode == HttpServletResponse.SC_SEE_OTHER) {
final Header locationUriHeader = response.getFirstHeader(LOCATION_URI_INTENT_NAME);
if (locationUriHeader != null) {
if (LOCATION_URI_INTENT_VALUE.equals(locationUriHeader.getValue())) {
final Header holdUriHeader = response.getFirstHeader(LOCATION_HEADER_NAME);
if (holdUriHeader != null) {
holdUri = holdUriHeader.getValue();
}
}
}
if (holdUri == null) {
logger.error("Failed to Post {} to {}: sent content and received status code {}:{} but no Hold URI",
new Object[]{flowFileDescription, url, responseCode, responseReason});
for (FlowFile flowFile : toSend) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
return;
}
}
if (holdUri == null) {
if (responseCode == HttpServletResponse.SC_SERVICE_UNAVAILABLE) {
logger.error("Failed to Post {} to {}: response code was {}:{}",
new Object[]{flowFileDescription, url, responseCode, responseReason});
for (FlowFile flowFile : toSend) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
return;
}
if (responseCode >= 300) {
logger.error("Failed to Post {} to {}: response code was {}:{}",
new Object[]{flowFileDescription, url, responseCode, responseReason});
for (FlowFile flowFile : toSend) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
return;
}
logger.info("Successfully Posted {} to {} in {} at a rate of {}",
new Object[]{flowFileDescription, url, FormatUtils.formatMinutesSeconds(uploadMillis, TimeUnit.MILLISECONDS), uploadDataRate});
for (final FlowFile flowFile : toSend) {
session.getProvenanceReporter().send(flowFile, url, "Remote DN=" + httpClientContext.getAttribute(REMOTE_DN), uploadMillis, true);
session.transfer(flowFile, REL_SUCCESS);
}
return;
}
//
// the response indicated a Hold URI; delete the Hold.
//
// determine the full URI of the Flow File's Hold; Unfortunately, the responses that are returned have
// changed over the past, so we have to take into account a few different possibilities.
String fullHoldUri = holdUri;
if (holdUri.startsWith("/contentListener")) {
// If the Hold URI that we get starts with /contentListener, it may not really be /contentListener,
// as this really indicates that it should be whatever we posted to -- if posting directly to the
// ListenHTTP component, it will be /contentListener, but if posting to a proxy/load balancer, we may
// be posting to some other URL.
fullHoldUri = url + holdUri.substring(16);
} else if (holdUri.startsWith("/")) {
// URL indicates the full path but not hostname or port; use the same hostname & port that we posted
// to but use the full path indicated by the response.
int firstSlash = url.indexOf("/", 8);
if (firstSlash < 0) {
firstSlash = url.length();
}
final String beforeSlash = url.substring(0, firstSlash);
fullHoldUri = beforeSlash + holdUri;
} else if (!holdUri.startsWith("http")) {
// Absolute URL
fullHoldUri = url + (url.endsWith("/") ? "" : "/") + holdUri;
}
final HttpDelete delete = new HttpDelete(fullHoldUri);
delete.setHeader(TRANSACTION_ID_HEADER, transactionId);
delete.setConfig(requestConfig);
HttpResponse holdResponse = null;
try {
holdResponse = client.execute(delete, httpClientContext);
final int holdStatusCode = holdResponse.getStatusLine().getStatusCode();
final String holdReason = holdResponse.getStatusLine().getReasonPhrase();
if (holdStatusCode >= 300) {
logger.error("Failed to delete Hold that destination placed on {}: got response code {}:{}; routing to failure",
new Object[]{flowFileDescription, holdStatusCode, holdReason});
for (FlowFile flowFile : toSend) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
return;
}
logger.info("Successfully Posted {} to {} in {} at a rate of {}",
new Object[]{flowFileDescription, url, FormatUtils.formatMinutesSeconds(uploadMillis, TimeUnit.MILLISECONDS), uploadDataRate});
for (final FlowFile flowFile : toSend) {
session.getProvenanceReporter().send(flowFile, url, "Remote DN=" + httpClientContext.getAttribute(REMOTE_DN), uploadMillis, true);
session.transfer(flowFile, REL_SUCCESS);
}
return;
} catch (final IOException e) {
logger.warn("Failed to delete Hold that destination placed on {} due to {}; routing to failure", new Object[]{flowFileDescription, e});
for (FlowFile flowFile : toSend) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
} finally {
if (null != holdResponse) {
try {
// consume input stream entirely, ignoring its contents. If we
// don't do this, the Connection will not be returned to the pool
EntityUtils.consume(holdResponse.getEntity());
} catch (IOException ignore) {}
}
}
}
/**
 * Probes the destination with an HTTP HEAD request to learn which FlowFile packaging formats, inline GZIP
 * compression, and protocol version it advertises.
 *
 * <p>Outcomes by response status:
 * <ul>
 *   <li>405 (Method Not Allowed): FlowFile v1 is assumed accepted only when {@code sendAsFlowFile} is true;
 *       no GZIP and no protocol version are assumed.</li>
 *   <li>200 (OK): when {@code sendAsFlowFile} is true, the {@code Accept} header is scanned for the FlowFile v3/v2
 *       media types and the protocol-version header is parsed (a non-numeric value is treated as absent). In every
 *       200 case the {@code Accept-Encoding} header is scanned for a {@code gzip} token.</li>
 *   <li>Any other status: a warning is logged and nothing is assumed to be accepted.</li>
 * </ul>
 *
 * @param sendAsFlowFile whether data will be sent packaged as FlowFiles; controls whether the transaction id header
 *                       is sent and whether FlowFile capabilities are inspected
 * @param uri            the destination URI to probe
 * @param transactionId  transaction id sent with the HEAD request when {@code sendAsFlowFile} is true
 * @param httpContext    the HTTP context used to execute the request
 * @return the capabilities advertised by the destination
 * @throws IOException if executing the HEAD request fails
 */
private DestinationAccepts getDestinationAcceptance(final boolean sendAsFlowFile, final String uri, final String transactionId, final HttpContext httpContext) throws IOException {
    final HttpHead head = new HttpHead(uri);
    head.setConfig(requestConfig);
    if (sendAsFlowFile) {
        head.addHeader(TRANSACTION_ID_HEADER, transactionId);
    }
    final HttpResponse response = client.execute(head, httpContext);
    try {
        // we assume that the destination can support FlowFile v1 always when the processor is also configured to send as a FlowFile
        // otherwise, we do not bother to make any determinations concerning this compatibility
        final boolean acceptsFlowFileV1 = sendAsFlowFile;
        boolean acceptsFlowFileV2 = false;
        boolean acceptsFlowFileV3 = false;
        boolean acceptsGzip = false;
        Integer protocolVersion = null;
        final int statusCode = response.getStatusLine().getStatusCode();
        if (statusCode == Status.METHOD_NOT_ALLOWED.getStatusCode()) {
            return new DestinationAccepts(acceptsFlowFileV3, acceptsFlowFileV2, acceptsFlowFileV1, false, null);
        } else if (statusCode == Status.OK.getStatusCode()) {
            Header[] headers = response.getHeaders(ACCEPT);
            // If configured to send as a flowfile, determine the capabilities of the endpoint
            if (sendAsFlowFile) {
                if (headers != null) {
                    for (final Header header : headers) {
                        for (final String accepted : header.getValue().split(",")) {
                            final String trimmed = accepted.trim();
                            if (trimmed.equals(StandardFlowFileMediaType.VERSION_3.getMediaType())) {
                                acceptsFlowFileV3 = true;
                            } else if (trimmed.equals(StandardFlowFileMediaType.VERSION_2.getMediaType())) {
                                acceptsFlowFileV2 = true;
                            }
                        }
                    }
                }
                final Header destinationVersion = response.getFirstHeader(PROTOCOL_VERSION_HEADER);
                if (destinationVersion != null) {
                    try {
                        protocolVersion = Integer.valueOf(destinationVersion.getValue());
                    } catch (final NumberFormatException e) {
                        // nothing to do here really.... it's an invalid value, so treat the same as if not specified
                    }
                }
                if (getLogger().isDebugEnabled()) {
                    if (acceptsFlowFileV3) {
                        getLogger().debug(FLOW_FILE_CONNECTION_LOG, new Object[]{uri, StandardFlowFileMediaType.VERSION_3.getMediaType()});
                    } else if (acceptsFlowFileV2) {
                        getLogger().debug(FLOW_FILE_CONNECTION_LOG, new Object[]{uri, StandardFlowFileMediaType.VERSION_2.getMediaType()});
                    } else if (acceptsFlowFileV1) {
                        getLogger().debug(FLOW_FILE_CONNECTION_LOG, new Object[]{uri, StandardFlowFileMediaType.VERSION_1.getMediaType()});
                    }
                }
            }
            headers = response.getHeaders(ACCEPT_ENCODING);
            if (headers != null) {
                for (final Header header : headers) {
                    for (final String accepted : header.getValue().split(",")) {
                        // Trim each token (as done for the Accept header above) so that comma-separated lists
                        // written with whitespace, e.g. "deflate, gzip", are still recognized.
                        if (accepted.trim().equalsIgnoreCase("gzip")) {
                            acceptsGzip = true;
                        }
                    }
                }
            }
            if (getLogger().isDebugEnabled()) {
                if (acceptsGzip) {
                    getLogger().debug("Connection to URI " + uri + " indicates that inline GZIP compression is supported");
                } else {
                    getLogger().debug("Connection to URI " + uri + " indicates that it does NOT support inline GZIP compression");
                }
            }
            return new DestinationAccepts(acceptsFlowFileV3, acceptsFlowFileV2, acceptsFlowFileV1, acceptsGzip, protocolVersion);
        } else {
            getLogger().warn("Unable to communicate with destination; when attempting to perform an HTTP HEAD, got unexpected response code of "
                    + statusCode + ": " + response.getStatusLine().getReasonPhrase());
            return new DestinationAccepts(false, false, false, false, null);
        }
    } finally {
        // Consume whatever entity the response carries (EntityUtils.consume is a no-op for a null entity), as is done
        // for the POST and DELETE responses, so that the connection is returned to the pool.
        try {
            EntityUtils.consume(response.getEntity());
        } catch (final IOException ignore) {
            // best-effort cleanup; must not mask the computed result or an exception thrown above
        }
    }
}
/**
 * Immutable record of what a destination advertised: which FlowFile packaging versions it accepts,
 * whether it accepts inline GZIP compression, and the protocol version it reported.
 */
private static class DestinationAccepts {
    private final boolean flowFileV3;
    private final boolean flowFileV2;
    private final boolean flowFileV1;
    private final boolean gzip;
    // May be null, e.g. when the destination reported no (valid) protocol version.
    private final Integer protocolVersion;

    /**
     * @param acceptsV3   whether FlowFile v3 packaging is accepted
     * @param acceptsV2   whether FlowFile v2 packaging is accepted
     * @param acceptsV1   whether FlowFile v1 packaging is accepted
     * @param acceptsGzip whether inline GZIP compression is accepted
     * @param version     protocol version reported by the destination; may be {@code null}
     */
    public DestinationAccepts(final boolean acceptsV3, final boolean acceptsV2, final boolean acceptsV1, final boolean acceptsGzip, final Integer version) {
        flowFileV3 = acceptsV3;
        flowFileV2 = acceptsV2;
        flowFileV1 = acceptsV1;
        gzip = acceptsGzip;
        protocolVersion = version;
    }

    /** @return whether FlowFile v3 packaging is accepted */
    public boolean isFlowFileV3Accepted() {
        return this.flowFileV3;
    }

    /** @return whether FlowFile v2 packaging is accepted */
    public boolean isFlowFileV2Accepted() {
        return this.flowFileV2;
    }

    /** @return whether FlowFile v1 packaging is accepted */
    public boolean isFlowFileV1Accepted() {
        return this.flowFileV1;
    }

    /** @return whether inline GZIP compression is accepted */
    public boolean isGzipAccepted() {
        return this.gzip;
    }

    /** @return the reported protocol version, possibly {@code null} */
    public Integer getProtocolVersion() {
        return this.protocolVersion;
    }
}
}