NIFI-8120 Added RuntimeException handling on HttpContextMap.complete()

NIFI-8120 Renamed exception variable and reordered log statements

This closes #4747.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
index 41c6ecd..a0d3f4f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
@@ -65,8 +65,6 @@
 @SeeAlso(value = {HandleHttpRequest.class}, classNames = {"org.apache.nifi.http.StandardHttpContextMap", "org.apache.nifi.ssl.StandardSSLContextService"})
 public class HandleHttpResponse extends AbstractProcessor {
 
-    public static final Pattern NUMBER_PATTERN = Pattern.compile("[0-9]+");
-
     public static final PropertyDescriptor STATUS_CODE = new PropertyDescriptor.Builder()
             .name("HTTP Status Code")
             .description("The HTTP Status Code to use when responding to the HTTP Request. See Section 10 of RFC 2616 for more information.")
@@ -136,25 +134,25 @@
 
         final String contextIdentifier = flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
         if (contextIdentifier == null) {
-            session.transfer(flowFile, REL_FAILURE);
             getLogger().warn("Failed to respond to HTTP request for {} because FlowFile did not have an '" + HTTPUtils.HTTP_CONTEXT_ID + "' attribute",
                     new Object[]{flowFile});
+            session.transfer(flowFile, REL_FAILURE);
             return;
         }
 
         final String statusCodeValue = context.getProperty(STATUS_CODE).evaluateAttributeExpressions(flowFile).getValue();
         if (!isNumber(statusCodeValue)) {
-            session.transfer(flowFile, REL_FAILURE);
             getLogger().error("Failed to respond to HTTP request for {} because status code was '{}', which is not a valid number", new Object[]{flowFile, statusCodeValue});
+            session.transfer(flowFile, REL_FAILURE);
             return;
         }
 
         final HttpContextMap contextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class);
         final HttpServletResponse response = contextMap.getResponse(contextIdentifier);
         if (response == null) {
-            session.transfer(flowFile, REL_FAILURE);
             getLogger().error("Failed to respond to HTTP request for {} because FlowFile had an '{}' attribute of {} but could not find an HTTP Response Object for this identifier",
                     new Object[]{flowFile, HTTPUtils.HTTP_CONTEXT_ID, contextIdentifier});
+            session.transfer(flowFile, REL_FAILURE);
             return;
         }
 
@@ -192,27 +190,31 @@
             session.exportTo(flowFile, response.getOutputStream());
             response.flushBuffer();
         } catch (final ProcessException e) {
-            session.transfer(flowFile, REL_FAILURE);
             getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, e});
-            contextMap.complete(contextIdentifier);
+            try {
+                contextMap.complete(contextIdentifier);
+            } catch (final RuntimeException ce) {
+                getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[]{flowFile, ce});
+            }
+            session.transfer(flowFile, REL_FAILURE);
             return;
         } catch (final Exception e) {
-            session.transfer(flowFile, REL_FAILURE);
             getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
             return;
         }
 
         try {
             contextMap.complete(contextIdentifier);
-        } catch (final IllegalStateException ise) {
-            getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[]{flowFile, ise});
+        } catch (final RuntimeException ce) {
+            getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[]{flowFile, ce});
             session.transfer(flowFile, REL_FAILURE);
             return;
         }
 
         session.getProvenanceReporter().send(flowFile, HTTPUtils.getURI(flowFile.getAttributes()), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-        session.transfer(flowFile, REL_SUCCESS);
         getLogger().info("Successfully responded to HTTP Request for {} with status code {}", new Object[]{flowFile, statusCode});
+        session.transfer(flowFile, REL_SUCCESS);
     }
 
     private static boolean isNumber(final String value) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
index 02b1d1c..a2d6b28 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
@@ -49,46 +49,52 @@
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 public class TestHandleHttpResponse {
 
+    private static final String CONTEXT_MAP_ID = MockHttpContextMap.class.getSimpleName();
+
+    private static final String HTTP_REQUEST_ID = "HTTP-Request-Identifier";
+
+    private static final int HTTP_STATUS_CREATED = HttpServletResponse.SC_CREATED;
+
+    private static final String FLOW_FILE_CONTENT = "TESTING";
+
     @Test
     public void testEnsureCompleted() throws InitializationException {
         final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
 
-        final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "");
-        runner.addControllerService("http-context-map", contextMap);
+        final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, null, null);
+        runner.addControllerService(CONTEXT_MAP_ID, contextMap);
         runner.enableControllerService(contextMap);
-        runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
+        runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
         runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
         runner.setProperty("my-attr", "${my-attr}");
         runner.setProperty("no-valid-attr", "${no-valid-attr}");
 
         final Map<String, String> attributes = new HashMap<>();
-        attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
+        attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
         attributes.put(HTTPUtils.HTTP_REQUEST_URI, "/test");
         attributes.put(HTTPUtils.HTTP_LOCAL_NAME, "server");
         attributes.put(HTTPUtils.HTTP_PORT, "8443");
         attributes.put(HTTPUtils.HTTP_REMOTE_HOST, "client");
         attributes.put(HTTPUtils.HTTP_SSL_CERT, "sslDN");
         attributes.put("my-attr", "hello");
-        attributes.put("status.code", "201");
+        attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
 
-        runner.enqueue("hello".getBytes(), attributes);
+        runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
 
         runner.run();
 
         runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 1);
-        assertTrue(runner.getProvenanceEvents().size() == 1);
+        assertEquals(1, runner.getProvenanceEvents().size());
         assertEquals(ProvenanceEventType.SEND, runner.getProvenanceEvents().get(0).getEventType());
         assertEquals("https://client@server:8443/test", runner.getProvenanceEvents().get(0).getTransitUri());
 
-        assertEquals("hello", contextMap.baos.toString());
+        assertEquals(FLOW_FILE_CONTENT, contextMap.outputStream.toString());
         assertEquals("hello", contextMap.headersSent.get("my-attr"));
         assertNull(contextMap.headersSent.get("no-valid-attr"));
-        assertEquals(201, contextMap.statusCode);
+        assertEquals(HTTP_STATUS_CREATED, contextMap.statusCode);
         assertEquals(1, contextMap.getCompletionCount());
         assertTrue(contextMap.headersWithNoValue.isEmpty());
     }
@@ -97,15 +103,15 @@
     public void testRegexHeaders() throws InitializationException {
         final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
 
-        final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "");
-        runner.addControllerService("http-context-map", contextMap);
+        final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, null, null);
+        runner.addControllerService(CONTEXT_MAP_ID, contextMap);
         runner.enableControllerService(contextMap);
-        runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
+        runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
         runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
         runner.setProperty(HandleHttpResponse.ATTRIBUTES_AS_HEADERS_REGEX, "^(my.*)$");
 
         final Map<String, String> attributes = new HashMap<>();
-        attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
+        attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
         attributes.put(HTTPUtils.HTTP_REQUEST_URI, "/test");
         attributes.put(HTTPUtils.HTTP_LOCAL_NAME, "server");
         attributes.put(HTTPUtils.HTTP_PORT, "8443");
@@ -113,43 +119,43 @@
         attributes.put(HTTPUtils.HTTP_SSL_CERT, "sslDN");
         attributes.put("my-attr", "hello");
         attributes.put("my-blank-attr", "");
-        attributes.put("status.code", "201");
+        attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
 
-        runner.enqueue("hello".getBytes(), attributes);
+        runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
 
         runner.run();
 
         runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 1);
-        assertTrue(runner.getProvenanceEvents().size() == 1);
+        assertEquals(1, runner.getProvenanceEvents().size());
         assertEquals(ProvenanceEventType.SEND, runner.getProvenanceEvents().get(0).getEventType());
         assertEquals("https://client@server:8443/test", runner.getProvenanceEvents().get(0).getTransitUri());
 
-        assertEquals("hello", contextMap.baos.toString());
+        assertEquals(FLOW_FILE_CONTENT, contextMap.outputStream.toString());
         assertEquals("hello", contextMap.headersSent.get("my-attr"));
         assertNull(contextMap.headersSent.get("my-blank-attr"));
-        assertEquals(201, contextMap.statusCode);
+        assertEquals(HTTP_STATUS_CREATED, contextMap.statusCode);
         assertEquals(1, contextMap.getCompletionCount());
         assertTrue(contextMap.headersWithNoValue.isEmpty());
     }
 
     @Test
-    public void testWithExceptionThrown() throws InitializationException {
+    public void testResponseFlowFileAccessException() throws InitializationException {
         final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
 
-        final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "FlowFileAccessException");
-        runner.addControllerService("http-context-map", contextMap);
+        final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, new FlowFileAccessException("Access Problem"), null);
+        runner.addControllerService(CONTEXT_MAP_ID, contextMap);
         runner.enableControllerService(contextMap);
-        runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
+        runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
         runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
         runner.setProperty("my-attr", "${my-attr}");
         runner.setProperty("no-valid-attr", "${no-valid-attr}");
 
         final Map<String, String> attributes = new HashMap<>();
-        attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
+        attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
         attributes.put("my-attr", "hello");
-        attributes.put("status.code", "201");
+        attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
 
-        runner.enqueue("hello".getBytes(), attributes);
+        runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
 
         runner.run();
 
@@ -158,23 +164,23 @@
     }
 
     @Test
-    public void testCannotWriteResponse() throws InitializationException {
+    public void testResponseProcessException() throws InitializationException {
         final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
 
-        final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "ProcessException");
-        runner.addControllerService("http-context-map", contextMap);
+        final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, new ProcessException(), null);
+        runner.addControllerService(CONTEXT_MAP_ID, contextMap);
         runner.enableControllerService(contextMap);
-        runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
+        runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
         runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
         runner.setProperty("my-attr", "${my-attr}");
         runner.setProperty("no-valid-attr", "${no-valid-attr}");
 
         final Map<String, String> attributes = new HashMap<>();
-        attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
+        attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
         attributes.put("my-attr", "hello");
-        attributes.put("status.code", "201");
+        attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
 
-        runner.enqueue("hello".getBytes(), attributes);
+        runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
 
         runner.run();
 
@@ -183,20 +189,45 @@
     }
 
     @Test
+    public void testResponseProcessExceptionThenIllegalStateException() throws InitializationException {
+        final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
+
+        final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, new ProcessException(), new IllegalStateException());
+        runner.addControllerService(CONTEXT_MAP_ID, contextMap);
+        runner.enableControllerService(contextMap);
+        runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
+        runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
+        runner.setProperty("my-attr", "${my-attr}");
+        runner.setProperty("no-valid-attr", "${no-valid-attr}");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
+        attributes.put("my-attr", "hello");
+        attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
+
+        runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_FAILURE, 1);
+        assertEquals(0, contextMap.getCompletionCount());
+    }
+
+    @Test
     public void testStatusCodeEmpty() throws InitializationException {
         final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
 
-        final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "");
-        runner.addControllerService("http-context-map", contextMap);
+        final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, null, null);
+        runner.addControllerService(CONTEXT_MAP_ID, contextMap);
         runner.enableControllerService(contextMap);
-        runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
+        runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
         runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
 
         final Map<String, String> attributes = new HashMap<>();
-        attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
+        attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
         attributes.put("my-attr", "hello");
 
-        runner.enqueue("hello".getBytes(), attributes);
+        runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
 
         runner.run();
 
@@ -208,16 +239,18 @@
 
         private final String id;
         private final AtomicInteger completedCount = new AtomicInteger(0);
-        private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         private final ConcurrentMap<String, String> headersSent = new ConcurrentHashMap<>();
-        private final String shouldThrowExceptionClass;
+        private final Exception responseException;
+        private final RuntimeException completeException;
         private volatile int statusCode = -1;
 
         private final List<String> headersWithNoValue = new CopyOnWriteArrayList<>();
 
-        public MockHttpContextMap(final String expectedIdentifier, final String shouldThrowExceptionClass) {
+        public MockHttpContextMap(final String expectedIdentifier, final Exception responseException, final RuntimeException completeException) {
             this.id = expectedIdentifier;
-            this.shouldThrowExceptionClass = shouldThrowExceptionClass;
+            this.responseException = responseException;
+            this.completeException = completeException;
         }
 
         @Override
@@ -233,11 +266,7 @@
 
             try {
                 final HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
-                if(shouldThrowExceptionClass != null && shouldThrowExceptionClass.equals("FlowFileAccessException")) {
-                    Mockito.when(response.getOutputStream()).thenThrow(new FlowFileAccessException("exception"));
-                } else if(shouldThrowExceptionClass != null && shouldThrowExceptionClass.equals("ProcessException")) {
-                        Mockito.when(response.getOutputStream()).thenThrow(new ProcessException("exception"));
-                } else {
+                if (responseException == null) {
                     Mockito.when(response.getOutputStream()).thenReturn(new ServletOutputStream() {
                         @Override
                         public boolean isReady() {
@@ -249,43 +278,39 @@
                         }
 
                         @Override
-                        public void write(int b) throws IOException {
-                            baos.write(b);
+                        public void write(int b) {
+                            outputStream.write(b);
                         }
 
                         @Override
                         public void write(byte[] b) throws IOException {
-                            baos.write(b);
+                            outputStream.write(b);
                         }
 
                         @Override
-                        public void write(byte[] b, int off, int len) throws IOException {
-                            baos.write(b, off, len);
+                        public void write(byte[] b, int off, int len) {
+                            outputStream.write(b, off, len);
                         }
                     });
+                } else {
+                    Mockito.when(response.getOutputStream()).thenThrow(responseException);
                 }
 
-                Mockito.doAnswer(new Answer<Object>() {
-                    @Override
-                    public Object answer(final InvocationOnMock invocation) throws Throwable {
-                        final String key = invocation.getArgument(0);
-                        final String value = invocation.getArgument(1);
-                        if (value == null) {
-                            headersWithNoValue.add(key);
-                        } else {
-                            headersSent.put(key, value);
-                        }
-
-                        return null;
+                Mockito.doAnswer(invocation -> {
+                    final String key = invocation.getArgument(0);
+                    final String value = invocation.getArgument(1);
+                    if (value == null) {
+                        headersWithNoValue.add(key);
+                    } else {
+                        headersSent.put(key, value);
                     }
+
+                    return null;
                 }).when(response).setHeader(Mockito.any(String.class), Mockito.any(String.class));
 
-                Mockito.doAnswer(new Answer<Object>() {
-                    @Override
-                    public Object answer(final InvocationOnMock invocation) throws Throwable {
-                        statusCode = invocation.getArgument(0);
-                        return null;
-                    }
+                Mockito.doAnswer(invocation -> {
+                    statusCode = invocation.getArgument(0);
+                    return null;
                 }).when(response).setStatus(Mockito.anyInt());
 
                 return response;
@@ -302,6 +327,10 @@
                 Assert.fail("attempting to respond to wrong request; should have been " + id + " but was " + identifier);
             }
 
+            if (completeException != null) {
+                throw completeException;
+            }
+
             completedCount.incrementAndGet();
         }