blob: a2d6b2816aae87844db73fc5fd8d7f3b70abae3c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.http.HttpContextMap;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.util.HTTPUtils;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
public class TestHandleHttpResponse {
private static final String CONTEXT_MAP_ID = MockHttpContextMap.class.getSimpleName();
private static final String HTTP_REQUEST_ID = "HTTP-Request-Identifier";
private static final int HTTP_STATUS_CREATED = HttpServletResponse.SC_CREATED;
private static final String FLOW_FILE_CONTENT = "TESTING";
@Test
public void testEnsureCompleted() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, null, null);
runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty("my-attr", "${my-attr}");
runner.setProperty("no-valid-attr", "${no-valid-attr}");
final Map<String, String> attributes = new HashMap<>();
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put(HTTPUtils.HTTP_REQUEST_URI, "/test");
attributes.put(HTTPUtils.HTTP_LOCAL_NAME, "server");
attributes.put(HTTPUtils.HTTP_PORT, "8443");
attributes.put(HTTPUtils.HTTP_REMOTE_HOST, "client");
attributes.put(HTTPUtils.HTTP_SSL_CERT, "sslDN");
attributes.put("my-attr", "hello");
attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 1);
assertEquals(1, runner.getProvenanceEvents().size());
assertEquals(ProvenanceEventType.SEND, runner.getProvenanceEvents().get(0).getEventType());
assertEquals("https://client@server:8443/test", runner.getProvenanceEvents().get(0).getTransitUri());
assertEquals(FLOW_FILE_CONTENT, contextMap.outputStream.toString());
assertEquals("hello", contextMap.headersSent.get("my-attr"));
assertNull(contextMap.headersSent.get("no-valid-attr"));
assertEquals(HTTP_STATUS_CREATED, contextMap.statusCode);
assertEquals(1, contextMap.getCompletionCount());
assertTrue(contextMap.headersWithNoValue.isEmpty());
}
@Test
public void testRegexHeaders() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, null, null);
runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty(HandleHttpResponse.ATTRIBUTES_AS_HEADERS_REGEX, "^(my.*)$");
final Map<String, String> attributes = new HashMap<>();
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put(HTTPUtils.HTTP_REQUEST_URI, "/test");
attributes.put(HTTPUtils.HTTP_LOCAL_NAME, "server");
attributes.put(HTTPUtils.HTTP_PORT, "8443");
attributes.put(HTTPUtils.HTTP_REMOTE_HOST, "client");
attributes.put(HTTPUtils.HTTP_SSL_CERT, "sslDN");
attributes.put("my-attr", "hello");
attributes.put("my-blank-attr", "");
attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_SUCCESS, 1);
assertEquals(1, runner.getProvenanceEvents().size());
assertEquals(ProvenanceEventType.SEND, runner.getProvenanceEvents().get(0).getEventType());
assertEquals("https://client@server:8443/test", runner.getProvenanceEvents().get(0).getTransitUri());
assertEquals(FLOW_FILE_CONTENT, contextMap.outputStream.toString());
assertEquals("hello", contextMap.headersSent.get("my-attr"));
assertNull(contextMap.headersSent.get("my-blank-attr"));
assertEquals(HTTP_STATUS_CREATED, contextMap.statusCode);
assertEquals(1, contextMap.getCompletionCount());
assertTrue(contextMap.headersWithNoValue.isEmpty());
}
@Test
public void testResponseFlowFileAccessException() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, new FlowFileAccessException("Access Problem"), null);
runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty("my-attr", "${my-attr}");
runner.setProperty("no-valid-attr", "${no-valid-attr}");
final Map<String, String> attributes = new HashMap<>();
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put("my-attr", "hello");
attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_FAILURE, 1);
assertEquals(0, contextMap.getCompletionCount());
}
@Test
public void testResponseProcessException() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, new ProcessException(), null);
runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty("my-attr", "${my-attr}");
runner.setProperty("no-valid-attr", "${no-valid-attr}");
final Map<String, String> attributes = new HashMap<>();
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put("my-attr", "hello");
attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_FAILURE, 1);
assertEquals(1, contextMap.getCompletionCount());
}
@Test
public void testResponseProcessExceptionThenIllegalStateException() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, new ProcessException(), new IllegalStateException());
runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
runner.setProperty("my-attr", "${my-attr}");
runner.setProperty("no-valid-attr", "${no-valid-attr}");
final Map<String, String> attributes = new HashMap<>();
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put("my-attr", "hello");
attributes.put("status.code", Integer.toString(HTTP_STATUS_CREATED));
runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_FAILURE, 1);
assertEquals(0, contextMap.getCompletionCount());
}
@Test
public void testStatusCodeEmpty() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpResponse.class);
final MockHttpContextMap contextMap = new MockHttpContextMap(HTTP_REQUEST_ID, null, null);
runner.addControllerService(CONTEXT_MAP_ID, contextMap);
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpResponse.HTTP_CONTEXT_MAP, CONTEXT_MAP_ID);
runner.setProperty(HandleHttpResponse.STATUS_CODE, "${status.code}");
final Map<String, String> attributes = new HashMap<>();
attributes.put(HTTPUtils.HTTP_CONTEXT_ID, HTTP_REQUEST_ID);
attributes.put("my-attr", "hello");
runner.enqueue(FLOW_FILE_CONTENT.getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(HandleHttpResponse.REL_FAILURE, 1);
assertEquals(0, contextMap.getCompletionCount());
}
private static class MockHttpContextMap extends AbstractControllerService implements HttpContextMap {
private final String id;
private final AtomicInteger completedCount = new AtomicInteger(0);
private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
private final ConcurrentMap<String, String> headersSent = new ConcurrentHashMap<>();
private final Exception responseException;
private final RuntimeException completeException;
private volatile int statusCode = -1;
private final List<String> headersWithNoValue = new CopyOnWriteArrayList<>();
public MockHttpContextMap(final String expectedIdentifier, final Exception responseException, final RuntimeException completeException) {
this.id = expectedIdentifier;
this.responseException = responseException;
this.completeException = completeException;
}
@Override
public boolean register(String identifier, HttpServletRequest request, HttpServletResponse response, AsyncContext context) {
return true;
}
@Override
public HttpServletResponse getResponse(final String identifier) {
if (!id.equals(identifier)) {
Assert.fail("attempting to respond to wrong request; should have been " + id + " but was " + identifier);
}
try {
final HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
if (responseException == null) {
Mockito.when(response.getOutputStream()).thenReturn(new ServletOutputStream() {
@Override
public boolean isReady() {
return true;
}
@Override
public void setWriteListener(WriteListener writeListener) {
}
@Override
public void write(int b) {
outputStream.write(b);
}
@Override
public void write(byte[] b) throws IOException {
outputStream.write(b);
}
@Override
public void write(byte[] b, int off, int len) {
outputStream.write(b, off, len);
}
});
} else {
Mockito.when(response.getOutputStream()).thenThrow(responseException);
}
Mockito.doAnswer(invocation -> {
final String key = invocation.getArgument(0);
final String value = invocation.getArgument(1);
if (value == null) {
headersWithNoValue.add(key);
} else {
headersSent.put(key, value);
}
return null;
}).when(response).setHeader(Mockito.any(String.class), Mockito.any(String.class));
Mockito.doAnswer(invocation -> {
statusCode = invocation.getArgument(0);
return null;
}).when(response).setStatus(Mockito.anyInt());
return response;
} catch (final Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
return null;
}
}
@Override
public void complete(final String identifier) {
if (!id.equals(identifier)) {
Assert.fail("attempting to respond to wrong request; should have been " + id + " but was " + identifier);
}
if (completeException != null) {
throw completeException;
}
completedCount.incrementAndGet();
}
public int getCompletionCount() {
return completedCount.get();
}
@Override
public long getRequestTimeout(TimeUnit timeUnit) {
return timeUnit.convert(30000, TimeUnit.MILLISECONDS);
}
}
}