NIFI-9255: Support asynchronous session commits in ListenHTTP (#5419)
NIFI-9255: Support asynchronous session commits in ListenHTTP
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
index 0f6a51d..70ab08b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java
@@ -38,6 +38,7 @@
import org.apache.nifi.util.FlowFileUnpackagerV3;
import org.eclipse.jetty.server.Request;
+import javax.servlet.AsyncContext;
import javax.servlet.MultipartConfigElement;
import javax.servlet.ServletConfig;
import javax.servlet.ServletContext;
@@ -242,7 +243,7 @@
private void handleException(final HttpServletRequest request, final HttpServletResponse response,
final ProcessSession session, final String foundSubject, final String foundIssuer, final Throwable t) throws IOException {
session.rollback();
- logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] IssuerDN [{}] due to {}", new Object[]{request.getRemoteHost(), foundSubject, foundIssuer, t});
+ logger.error("Unable to receive file from Remote Host: [{}] SubjectDN [{}] IssuerDN [{}] due to {}", request.getRemoteHost(), foundSubject, foundIssuer, t);
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, t.toString());
}
@@ -410,15 +411,25 @@
response.getOutputStream().write(ackUri.getBytes("UTF-8"));
if (logger.isDebugEnabled()) {
logger.debug("Ingested {} from Remote Host: [{}] Port [{}] SubjectDN [{}] IssuerDN [{}]; placed hold on these {} files with ID {}",
- new Object[]{flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer, flowFileSet.size(), uuid});
+ flowFileSet, request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer, flowFileSet.size(), uuid);
}
} else {
- response.setStatus(this.returnCode);
logger.info("Received from Remote Host: [{}] Port [{}] SubjectDN [{}] IssuerDN [{}]; transferring to 'success'",
- new Object[]{request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer});
+ request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer);
session.transfer(flowFileSet, ListenHTTP.RELATIONSHIP_SUCCESS);
- session.commitAsync();
+
+ final AsyncContext asyncContext = request.startAsync();
+ session.commitAsync(() -> {
+ response.setStatus(this.returnCode);
+ asyncContext.complete();
+ }, t -> {
+ logger.error("Failed to commit session. Returning error response to Remote Host: [{}] Port [{}] SubjectDN [{}] IssuerDN [{}]",
+ request.getRemoteHost(), request.getRemotePort(), foundSubject, foundIssuer, t);
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ asyncContext.complete();
+ }
+ );
}
}