NIFI-8120 Added RuntimeException handling on HttpContextMap.complete()
NIFI-8120 Renamed exception variable and reordered log statements
This closes #4747.
Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
index 41c6ecd..a0d3f4f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpResponse.java
@@ -65,8 +65,6 @@
@SeeAlso(value = {HandleHttpRequest.class}, classNames = {"org.apache.nifi.http.StandardHttpContextMap", "org.apache.nifi.ssl.StandardSSLContextService"})
public class HandleHttpResponse extends AbstractProcessor {
- public static final Pattern NUMBER_PATTERN = Pattern.compile("[0-9]+");
-
public static final PropertyDescriptor STATUS_CODE = new PropertyDescriptor.Builder()
.name("HTTP Status Code")
.description("The HTTP Status Code to use when responding to the HTTP Request. See Section 10 of RFC 2616 for more information.")
@@ -136,25 +134,25 @@
final String contextIdentifier = flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
if (contextIdentifier == null) {
- session.transfer(flowFile, REL_FAILURE);
getLogger().warn("Failed to respond to HTTP request for {} because FlowFile did not have an '" + HTTPUtils.HTTP_CONTEXT_ID + "' attribute",
new Object[]{flowFile});
+ session.transfer(flowFile, REL_FAILURE);
return;
}
final String statusCodeValue = context.getProperty(STATUS_CODE).evaluateAttributeExpressions(flowFile).getValue();
if (!isNumber(statusCodeValue)) {
- session.transfer(flowFile, REL_FAILURE);
getLogger().error("Failed to respond to HTTP request for {} because status code was '{}', which is not a valid number", new Object[]{flowFile, statusCodeValue});
+ session.transfer(flowFile, REL_FAILURE);
return;
}
final HttpContextMap contextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class);
final HttpServletResponse response = contextMap.getResponse(contextIdentifier);
if (response == null) {
- session.transfer(flowFile, REL_FAILURE);
getLogger().error("Failed to respond to HTTP request for {} because FlowFile had an '{}' attribute of {} but could not find an HTTP Response Object for this identifier",
new Object[]{flowFile, HTTPUtils.HTTP_CONTEXT_ID, contextIdentifier});
+ session.transfer(flowFile, REL_FAILURE);
return;
}
@@ -192,27 +190,31 @@
session.exportTo(flowFile, response.getOutputStream());
response.flushBuffer();
} catch (final ProcessException e) {
- session.transfer(flowFile, REL_FAILURE);
getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, e});
- contextMap.complete(contextIdentifier);
+ try {
+ contextMap.complete(contextIdentifier);
+ } catch (final RuntimeException ce) {
+ getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[]{flowFile, ce});
+ }
+ session.transfer(flowFile, REL_FAILURE);
return;
} catch (final Exception e) {
- session.transfer(flowFile, REL_FAILURE);
getLogger().error("Failed to respond to HTTP request for {} due to {}", new Object[]{flowFile, e});
+ session.transfer(flowFile, REL_FAILURE);
return;
}
try {
contextMap.complete(contextIdentifier);
- } catch (final IllegalStateException ise) {
- getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[]{flowFile, ise});
+ } catch (final RuntimeException ce) {
+ getLogger().error("Failed to complete HTTP Transaction for {} due to {}", new Object[]{flowFile, ce});
session.transfer(flowFile, REL_FAILURE);
return;
}
session.getProvenanceReporter().send(flowFile, HTTPUtils.getURI(flowFile.getAttributes()), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully responded to HTTP Request for {} with status code {}", new Object[]{flowFile, statusCode});
+ session.transfer(flowFile, REL_SUCCESS);
}
private static boolean isNumber(final String value) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
index 02b1d1c..a2d6b28 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestHandleHttpResponse.java
@@ -49,46 +49,52 @@
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
public class TestHandleHttpResponse {
+ private static final String CONTEXT_MAP_ID = MockHttpContextMap.class.getSimpleName();
+
+ private static final String HTTP_REQUEST_ID = "HTTP-Request-Identifier";
+
+ private static final int HTTP_STATUS_CREATED = HttpServletResponse.SC_CREATED;
+
+ private static final String FLOW_FILE_CONTENT = "TESTING";
+
@Test
public void testEnsureCompleted() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
- final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "");
- runner.addControllerService("http-context-map", contextMap);
+ final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, null, null);
+ runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
- runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
+ runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty("my-attr", "${my-attr}");
runner.setProperty("no-valid-attr", "${no-valid-attr}");
final Map<String, String> attributes = new HashMap<>();
- attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
+ attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put(HTTPUtils.HTTP_REQUEST_URI, "/test");
attributes.put(HTTPUtils.HTTP_LOCAL_NAME, "server");
attributes.put(HTTPUtils.HTTP_PORT, "8443");
attributes.put(HTTPUtils.HTTP_REMOTE_HOST, "client");
attributes.put(HTTPUtils.HTTP_SSL_CERT, "sslDN");
attributes.put("my-attr", "hello");
- attributes.put("status.code", "201");
+ attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
- runner.enqueue("hello".getBytes(), attributes);
+ runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 1);
- assertTrue(runner.getProvenanceEvents().size() == 1);
+ assertEquals(1, runner.getProvenanceEvents().size());
assertEquals(ProvenanceEventType.SEND, runner.getProvenanceEvents().get(0).getEventType());
assertEquals("https://client@server:8443/test", runner.getProvenanceEvents().get(0).getTransitUri());
- assertEquals("hello", contextMap.baos.toString());
+ assertEquals(FLOW_FILE_CONTENT, contextMap.outputStream.toString());
assertEquals("hello", contextMap.headersSent.get("my-attr"));
assertNull(contextMap.headersSent.get("no-valid-attr"));
- assertEquals(201, contextMap.statusCode);
+ assertEquals(HTTP_STATUS_CREATED, contextMap.statusCode);
assertEquals(1, contextMap.getCompletionCount());
assertTrue(contextMap.headersWithNoValue.isEmpty());
}
@@ -97,15 +103,15 @@
public void testRegexHeaders() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
- final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "");
- runner.addControllerService("http-context-map", contextMap);
+ final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, null, null);
+ runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
- runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
+ runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty(HandleHttpResponse.ATTRIBUTES_AS_HEADERS_REGEX, "^(my.*)$");
final Map<String, String> attributes = new HashMap<>();
- attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
+ attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put(HTTPUtils.HTTP_REQUEST_URI, "/test");
attributes.put(HTTPUtils.HTTP_LOCAL_NAME, "server");
attributes.put(HTTPUtils.HTTP_PORT, "8443");
@@ -113,43 +119,43 @@
attributes.put(HTTPUtils.HTTP_SSL_CERT, "sslDN");
attributes.put("my-attr", "hello");
attributes.put("my-blank-attr", "");
- attributes.put("status.code", "201");
+ attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
- runner.enqueue("hello".getBytes(), attributes);
+ runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 1);
- assertTrue(runner.getProvenanceEvents().size() == 1);
+ assertEquals(1, runner.getProvenanceEvents().size());
assertEquals(ProvenanceEventType.SEND, runner.getProvenanceEvents().get(0).getEventType());
assertEquals("https://client@server:8443/test", runner.getProvenanceEvents().get(0).getTransitUri());
- assertEquals("hello", contextMap.baos.toString());
+ assertEquals(FLOW_FILE_CONTENT, contextMap.outputStream.toString());
assertEquals("hello", contextMap.headersSent.get("my-attr"));
assertNull(contextMap.headersSent.get("my-blank-attr"));
- assertEquals(201, contextMap.statusCode);
+ assertEquals(HTTP_STATUS_CREATED, contextMap.statusCode);
assertEquals(1, contextMap.getCompletionCount());
assertTrue(contextMap.headersWithNoValue.isEmpty());
}
@Test
- public void testWithExceptionThrown() throws InitializationException {
+ public void testResponseFlowFileAccessException() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
- final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "FlowFileAccessException");
- runner.addControllerService("http-context-map", contextMap);
+ final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, new FlowFileAccessException("Access Problem"), null);
+ runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
- runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
+ runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty("my-attr", "${my-attr}");
runner.setProperty("no-valid-attr", "${no-valid-attr}");
final Map<String, String> attributes = new HashMap<>();
- attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
+ attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put("my-attr", "hello");
- attributes.put("status.code", "201");
+ attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
- runner.enqueue("hello".getBytes(), attributes);
+ runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
@@ -158,23 +164,23 @@
}
@Test
- public void testCannotWriteResponse() throws InitializationException {
+ public void testResponseProcessException() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
- final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "ProcessException");
- runner.addControllerService("http-context-map", contextMap);
+ final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, new ProcessException(), null);
+ runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
- runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
+ runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty("my-attr", "${my-attr}");
runner.setProperty("no-valid-attr", "${no-valid-attr}");
final Map<String, String> attributes = new HashMap<>();
- attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
+ attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put("my-attr", "hello");
- attributes.put("status.code", "201");
+ attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
- runner.enqueue("hello".getBytes(), attributes);
+ runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
@@ -183,20 +189,45 @@
}
@Test
+ public void testResponseProcessExceptionThenIllegalStateException() throws InitializationException {
+ final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
+
+ final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, new ProcessException(), new IllegalStateException());
+ runner.addControllerService(CONTEXT_MAP_ID, contextMap);
+ runner.enableControllerService(contextMap);
+ runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
+ runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
+ runner.setProperty("my-attr", "${my-attr}");
+ runner.setProperty("no-valid-attr", "${no-valid-attr}");
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
+ attributes.put("my-attr", "hello");
+ attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
+
+ runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_FAILURE, 1);
+ assertEquals(0, contextMap.getCompletionCount());
+ }
+
+ @Test
public void testStatusCodeEmpty() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
- final MockHttpContextMap contextMap = new MockHttpContextMap("my-id", "");
- runner.addControllerService("http-context-map", contextMap);
+ final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, null, null);
+ runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
- runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, "http-context-map");
+ runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
final Map<String, String> attributes = new HashMap<>();
- attributes.put(HTTPUtils.HTTP_CONTEXT_ID, "my-id");
+ attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put("my-attr", "hello");
- runner.enqueue("hello".getBytes(), attributes);
+ runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
@@ -208,16 +239,18 @@
private final String id;
private final AtomicInteger completedCount = new AtomicInteger(0);
- private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
private final ConcurrentMap<String, String> headersSent = new ConcurrentHashMap<>();
- private final String shouldThrowExceptionClass;
+ private final Exception responseException;
+ private final RuntimeException completeException;
private volatile int statusCode = -1;
private final List<String> headersWithNoValue = new CopyOnWriteArrayList<>();
- public MockHttpContextMap(final String expectedIdentifier, final String shouldThrowExceptionClass) {
+ public MockHttpContextMap(final String expectedIdentifier, final Exception responseException, final RuntimeException completeException) {
this.id = expectedIdentifier;
- this.shouldThrowExceptionClass = shouldThrowExceptionClass;
+ this.responseException = responseException;
+ this.completeException = completeException;
}
@Override
@@ -233,11 +266,7 @@
try {
final HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
- if(shouldThrowExceptionClass != null && shouldThrowExceptionClass.equals("FlowFileAccessException")) {
- Mockito.when(response.getOutputStream()).thenThrow(new FlowFileAccessException("exception"));
- } else if(shouldThrowExceptionClass != null && shouldThrowExceptionClass.equals("ProcessException")) {
- Mockito.when(response.getOutputStream()).thenThrow(new ProcessException("exception"));
- } else {
+ if (responseException == null) {
Mockito.when(response.getOutputStream()).thenReturn(new ServletOutputStream() {
@Override
public boolean isReady() {
@@ -249,43 +278,39 @@
}
@Override
- public void write(int b) throws IOException {
- baos.write(b);
+ public void write(int b) {
+ outputStream.write(b);
}
@Override
public void write(byte[] b) throws IOException {
- baos.write(b);
+ outputStream.write(b);
}
@Override
- public void write(byte[] b, int off, int len) throws IOException {
- baos.write(b, off, len);
+ public void write(byte[] b, int off, int len) {
+ outputStream.write(b, off, len);
}
});
+ } else {
+ Mockito.when(response.getOutputStream()).thenThrow(responseException);
}
- Mockito.doAnswer(new Answer<Object>() {
- @Override
- public Object answer(final InvocationOnMock invocation) throws Throwable {
- final String key = invocation.getArgument(0);
- final String value = invocation.getArgument(1);
- if (value == null) {
- headersWithNoValue.add(key);
- } else {
- headersSent.put(key, value);
- }
-
- return null;
+ Mockito.doAnswer(invocation -> {
+ final String key = invocation.getArgument(0);
+ final String value = invocation.getArgument(1);
+ if (value == null) {
+ headersWithNoValue.add(key);
+ } else {
+ headersSent.put(key, value);
}
+
+ return null;
}).when(response).setHeader(Mockito.any(String.class), Mockito.any(String.class));
- Mockito.doAnswer(new Answer<Object>() {
- @Override
- public Object answer(final InvocationOnMock invocation) throws Throwable {
- statusCode = invocation.getArgument(0);
- return null;
- }
+ Mockito.doAnswer(invocation -> {
+ statusCode = invocation.getArgument(0);
+ return null;
}).when(response).setStatus(Mockito.anyInt());
return response;
@@ -302,6 +327,10 @@
Assert.fail("attempting to respond to wrong request; should have been " + id + " but was " + identifier);
}
+ if (completeException != null) {
+ throw completeException;
+ }
+
completedCount.incrementAndGet();
}