NIFI-8770 Use queue drainTo() on shutdown in HandleHttpRequest
- Refactored response handling to use shared sendError() method
- Standardized request logging to include HTTP Method and URI
Signed-off-by: Nathan Gough <thenatog@gmail.com>
This closes #5218.
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
index b9a26f8..96af6c2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
@@ -56,10 +56,6 @@
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-
import javax.net.ssl.SSLContext;
import javax.servlet.AsyncContext;
import javax.servlet.DispatcherType;
@@ -73,7 +69,6 @@
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.URI;
-import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.security.Principal;
import java.security.cert.X509Certificate;
@@ -84,6 +79,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
@@ -93,6 +89,11 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
+import static javax.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE;
+import static javax.servlet.http.HttpServletResponse.SC_METHOD_NOT_ALLOWED;
+import static javax.servlet.http.HttpServletResponse.SC_NOT_FOUND;
+import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST;
+
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"http", "https", "request", "listen", "ingress", "web service"})
@CapabilityDescription("Starts an HTTP Server and listens for HTTP Requests. For each request, creates a FlowFile and transfers to 'success'. "
@@ -322,10 +323,10 @@
private volatile Server server;
private volatile boolean ready;
- private AtomicBoolean initialized = new AtomicBoolean(false);
private volatile BlockingQueue<HttpRequestContainer> containerQueue;
- private AtomicBoolean runOnPrimary = new AtomicBoolean(false);
- private AtomicReference<Set<String>> parameterToAttributesReference = new AtomicReference<>(null);
+ private final AtomicBoolean initialized = new AtomicBoolean(false);
+ private final AtomicBoolean runOnPrimary = new AtomicBoolean(false);
+ private final AtomicReference<Set<String>> parameterToAttributesReference = new AtomicReference<>(null);
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -343,7 +344,7 @@
}
synchronized void initializeServer(final ProcessContext context) throws Exception {
- if(initialized.get()){
+ if (initialized.get()) {
return;
}
runOnPrimary.set(context.getExecutionNode().equals(ExecutionNode.PRIMARY));
@@ -357,10 +358,10 @@
final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
final boolean need;
final boolean want;
- if (CLIENT_NEED.equals(clientAuthValue)) {
+ if (CLIENT_NEED.getValue().equals(clientAuthValue)) {
need = true;
want = false;
- } else if (CLIENT_WANT.equals(clientAuthValue)) {
+ } else if (CLIENT_WANT.getValue().equals(clientAuthValue)) {
need = false;
want = true;
} else {
@@ -458,66 +459,40 @@
server.setHandler(new AbstractHandler() {
@Override
- public void handle(final String target, final Request baseRequest, final HttpServletRequest request, final HttpServletResponse response)
- throws IOException, ServletException {
-
+ public void handle(final String target, final Request baseRequest, final HttpServletRequest request, final HttpServletResponse response) {
final String requestUri = request.getRequestURI();
- if (!allowedMethods.contains(request.getMethod().toUpperCase())) {
- getLogger().info("Sending back METHOD_NOT_ALLOWED response to {}; method was {}; request URI was {}",
- new Object[]{request.getRemoteAddr(), request.getMethod(), requestUri});
- response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
+ final String method = request.getMethod().toUpperCase();
+ if (!allowedMethods.contains(method)) {
+ sendError(SC_METHOD_NOT_ALLOWED, "Method Not Allowed", request, response);
return;
}
if (pathPattern != null) {
- final URI uri;
- try {
- uri = new URI(requestUri);
- } catch (final URISyntaxException e) {
- throw new ServletException(e);
- }
-
+ final URI uri = URI.create(requestUri);
if (!pathPattern.matcher(uri.getPath()).matches()) {
- getLogger().info("Sending back NOT_FOUND response to {}; request was {} {}",
- new Object[]{request.getRemoteAddr(), request.getMethod(), requestUri});
- response.sendError(HttpServletResponse.SC_NOT_FOUND);
+ sendError(SC_NOT_FOUND, "Path Not Found", request, response);
return;
}
}
- // If destination queues full, send back a 503: Service Unavailable.
if (context.getAvailableRelationships().isEmpty()) {
- getLogger().warn("Request from {} cannot be processed, processor downstream queue is full; responding with SERVICE_UNAVAILABLE",
- new Object[]{request.getRemoteAddr()});
-
- response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor queue is full");
+ sendError(SC_SERVICE_UNAVAILABLE, "No Available Relationships", request, response);
return;
} else if (!ready) {
- getLogger().warn("Request from {} cannot be processed, processor is being shut down; responding with SERVICE_UNAVAILABLE",
- new Object[]{request.getRemoteAddr()});
-
- response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor is shutting down");
+ sendError(SC_SERVICE_UNAVAILABLE, "Server Not Ready", request, response);
return;
}
- // Right now, that information, though, is only in the ProcessSession, not the ProcessContext,
- // so it is not known to us. Should see if it can be added to the ProcessContext.
final AsyncContext async = baseRequest.startAsync();
-
// disable timeout handling on AsyncContext, timeout will be handled in HttpContextMap
async.setTimeout(0);
- final boolean added = containerQueue.offer(new HttpRequestContainer(request, response, async));
-
+ final HttpRequestContainer container = new HttpRequestContainer(request, response, async);
+ final boolean added = containerQueue.offer(container);
if (added) {
- getLogger().debug("Added Http Request to queue for {} {} from {}",
- new Object[]{request.getMethod(), requestUri, request.getRemoteAddr()});
+ getLogger().debug("Request Queued: Method [{}] URI [{}] Address [{}]", method, requestUri, request.getRemoteAddr());
} else {
- getLogger().warn("Request from {} cannot be processed, container queue is full; responding with SERVICE_UNAVAILABLE",
- new Object[]{request.getRemoteAddr()});
-
- response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Container queue is full");
- async.complete();
+ sendError(SC_SERVICE_UNAVAILABLE, "Request Queue Full", container);
}
}
});
@@ -525,7 +500,9 @@
this.server = server;
server.start();
- getLogger().info("Server started and listening on port " + getPort());
+ for (final Connector connector : server.getConnectors()) {
+ getLogger().info("Started Connector {}", connector);
+ }
initialized.set(true);
ready = true;
@@ -561,54 +538,50 @@
public void shutdown() throws Exception {
ready = false;
- if (server != null) {
- getLogger().debug("Shutting down server");
- rejectPendingRequests();
- server.stop();
- server.destroy();
- server.join();
- clearInit();
- getLogger().info("Shut down {}", new Object[]{server});
- }
- }
+ if (server == null) {
+ getLogger().debug("Server not configured");
+ } else {
+ if (server.isStopped()) {
+ getLogger().debug("Server Stopped {}", server);
+ } else {
+ for (final Connector connector : server.getConnectors()) {
+ getLogger().debug("Stopping Connector {}", connector);
+ }
- void rejectPendingRequests() {
- HttpRequestContainer container;
- while ((container = getNextContainer()) != null) {
- try {
- getLogger().warn("Rejecting request from {} during cleanup after processor shutdown; responding with SERVICE_UNAVAILABLE",
- new Object[]{container.getRequest().getRemoteAddr()});
+ drainContainerQueue();
+ server.stop();
+ server.destroy();
+ server.join();
+ clearInit();
- HttpServletResponse response = container.getResponse();
- response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor is shutting down");
- container.getContext().complete();
- } catch (final IOException e) {
- getLogger().warn("Failed to send HTTP response to {} due to {}",
- new Object[]{container.getRequest().getRemoteAddr(), e});
+ for (final Connector connector : server.getConnectors()) {
+ getLogger().info("Stopped Connector {}", connector);
+ }
}
}
}
- private HttpRequestContainer getNextContainer() {
- HttpRequestContainer container;
- try {
- container = containerQueue.poll(2, TimeUnit.SECONDS);
- } catch (final InterruptedException e) {
- getLogger().warn("Interrupted while polling for " + HttpRequestContainer.class.getSimpleName() + " during cleanup.");
- container = null;
+ void drainContainerQueue() {
+ if (containerQueue.isEmpty()) {
+ getLogger().debug("No Pending Requests Queued");
+ } else {
+ final List<HttpRequestContainer> pendingContainers = new ArrayList<>();
+ containerQueue.drainTo(pendingContainers);
+ getLogger().warn("Pending Requests Queued [{}]", pendingContainers.size());
+ for (final HttpRequestContainer container : pendingContainers) {
+ sendError(SC_SERVICE_UNAVAILABLE, "Stopping Server", container);
+ }
}
-
- return container;
}
@OnPrimaryNodeStateChange
- public void onPrimaryNodeChange(final PrimaryNodeState newState) {
- if (runOnPrimary.get() && newState.equals(PrimaryNodeState.PRIMARY_NODE_REVOKED)) {
+ public void onPrimaryNodeChange(final PrimaryNodeState state) {
+ if (runOnPrimary.get() && state.equals(PrimaryNodeState.PRIMARY_NODE_REVOKED)) {
+ getLogger().info("Server Shutdown Started: Primary Node State Changed [{}]", state);
try {
shutdown();
- } catch (final Exception shutdownException) {
- getLogger().warn("Processor is configured to run only on Primary Node, but failed to shutdown HTTP server following revocation of primary node status due to {}",
- shutdownException);
+ } catch (final Exception e) {
+ getLogger().warn("Server Shutdown Failed: Primary Node State Changed [{}]", state, e);
}
}
}
@@ -616,7 +589,7 @@
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
try {
- if(!initialized.get()) {
+ if (!initialized.get()) {
initializeServer(context);
}
} catch (Exception e) {
@@ -626,7 +599,7 @@
// shutdown to release any resources allocated during the failed initialization
shutdown();
} catch (final Exception shutdownException) {
- getLogger().debug("Failed to shutdown following a failed initialization: " + shutdownException);
+ getLogger().debug("Server Shutdown Failed after Initialization Failed", shutdownException);
}
throw new ProcessException("Failed to initialize the server", e);
@@ -647,14 +620,14 @@
final long start = System.nanoTime();
final HttpServletRequest request = container.getRequest();
- if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains(MIME_TYPE__MULTIPART_FORM_DATA)) {
+ if (StringUtils.contains(request.getContentType(), MIME_TYPE__MULTIPART_FORM_DATA)) {
final long requestMaxSize = context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue();
final int readBufferSize = context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
String tempDir = System.getProperty("java.io.tmpdir");
request.setAttribute(Request.MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, requestMaxSize, requestMaxSize, readBufferSize));
List<Part> parts = null;
try {
- parts = ImmutableList.copyOf(request.getParts());
+ parts = Collections.unmodifiableList(new ArrayList<>(request.getParts()));
int allPartsCount = parts.size();
final String contextIdentifier = UUID.randomUUID().toString();
for (int i = 0; i < allPartsCount; i++) {
@@ -663,22 +636,21 @@
try (OutputStream flowFileOut = session.write(flowFile)) {
StreamUtils.copy(part.getInputStream(), flowFileOut);
} catch (IOException e) {
- handleFlowContentStreamingError(session, container, request, Optional.of(flowFile), e);
+ handleFlowContentStreamingError(session, container, Optional.of(flowFile), e);
return;
}
- flowFile = savePartAttributes(context, session, part, flowFile, i, allPartsCount);
+ flowFile = savePartAttributes(session, part, flowFile, i, allPartsCount);
flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier);
if (i == 0) {
// each one of multipart comes from a single request, thus registering only once per loop.
- boolean requestRegistrationSuccess = registerRequest(context, session, container, start, request, flowFile);
+ boolean requestRegistrationSuccess = registerRequest(context, session, container, flowFile);
if (!requestRegistrationSuccess)
break;
}
- forwardFlowFile(context, session, container, start, request, flowFile);
+ forwardFlowFile(session, start, request, flowFile);
}
} catch (IOException | ServletException | IllegalStateException e) {
- handleFlowContentStreamingError(session, container, request, Optional.absent(), e);
- return;
+ handleFlowContentStreamingError(session, container, Optional.empty(), e);
} finally {
if (parts != null) {
for (Part part : parts) {
@@ -695,18 +667,18 @@
try (OutputStream flowFileOut = session.write(flowFile)) {
StreamUtils.copy(request.getInputStream(), flowFileOut);
} catch (final IOException e) {
- handleFlowContentStreamingError(session, container, request, Optional.of(flowFile), e);
+ handleFlowContentStreamingError(session, container, Optional.of(flowFile), e);
return;
}
final String contextIdentifier = UUID.randomUUID().toString();
flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier);
- boolean requestRegistrationSuccess = registerRequest(context, session, container, start, request, flowFile);
+ boolean requestRegistrationSuccess = registerRequest(context, session, container, flowFile);
if (requestRegistrationSuccess)
- forwardFlowFile(context, session, container, start, request, flowFile);
+ forwardFlowFile(session, start, request, flowFile);
}
}
- private FlowFile savePartAttributes(ProcessContext context, ProcessSession session, Part part, FlowFile flowFile, final int i, final int allPartsCount) {
+ private FlowFile savePartAttributes(ProcessSession session, Part part, FlowFile flowFile, final int i, final int allPartsCount) {
final Map<String, String> attributes = new HashMap<>();
for (String headerName : part.getHeaderNames()) {
final String headerValue = part.getHeader(headerName);
@@ -817,76 +789,53 @@
putAttribute(attributes, "http.principal.name", principal.getName());
}
- final X509Certificate certs[] = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
- final String subjectDn;
+ final X509Certificate[] certs = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
if (certs != null && certs.length > 0) {
final X509Certificate cert = certs[0];
- subjectDn = cert.getSubjectDN().getName();
+ final String subjectDn = cert.getSubjectDN().getName();
final String issuerDn = cert.getIssuerDN().getName();
putAttribute(attributes, HTTPUtils.HTTP_SSL_CERT, subjectDn);
putAttribute(attributes, "http.issuer.dn", issuerDn);
- } else {
- subjectDn = null;
}
return session.putAllAttributes(flowFile, attributes);
}
- private void forwardFlowFile(final ProcessContext context, final ProcessSession session,
- HttpRequestContainer container, final long start, final HttpServletRequest request, FlowFile flowFile) {
+ private void forwardFlowFile(final ProcessSession session, final long start, final HttpServletRequest request, final FlowFile flowFile) {
final long receiveMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- String subjectDn = flowFile.getAttribute(HTTPUtils.HTTP_SSL_CERT);
+ final String subjectDn = flowFile.getAttribute(HTTPUtils.HTTP_SSL_CERT);
session.getProvenanceReporter().receive(flowFile, HTTPUtils.getURI(flowFile.getAttributes()),
"Received from " + request.getRemoteAddr() + (subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis);
session.transfer(flowFile, REL_SUCCESS);
- getLogger().info("Transferring {} to 'success'; received from {}", new Object[]{flowFile, request.getRemoteAddr()});
+ getLogger().debug("Transferred {} to [{}] Remote Address [{}] ", flowFile, REL_SUCCESS, request.getRemoteAddr());
}
private boolean registerRequest(final ProcessContext context, final ProcessSession session,
- HttpRequestContainer container, final long start, final HttpServletRequest request, FlowFile flowFile) {
+ final HttpRequestContainer container, final FlowFile flowFile) {
final HttpContextMap contextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class);
- String contextIdentifier = flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
+ final String contextIdentifier = flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
+ final HttpServletRequest request = container.getRequest();
final boolean registered = contextMap.register(contextIdentifier, request, container.getResponse(), container.getContext());
- if (registered)
- return true;
-
- getLogger().warn("Received request from {} but could not process it because too many requests are already outstanding; responding with SERVICE_UNAVAILABLE",
- new Object[]{request.getRemoteAddr()});
-
- try {
- container.getResponse().sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "HttpContextMap is full");
- container.getContext().complete();
- } catch (final Exception e) {
- getLogger().warn("Failed to respond with SERVICE_UNAVAILABLE message to {} due to {}",
- new Object[]{request.getRemoteAddr(), e});
+ if (registered) {
+ return true;
}
session.remove(flowFile);
+ sendError(SC_SERVICE_UNAVAILABLE, "Request Registration Failed", container);
return false;
}
-
- protected void handleFlowContentStreamingError(final ProcessSession session, HttpRequestContainer container,
- final HttpServletRequest request, Optional<FlowFile> flowFile, final Exception e) {
- // There may be many reasons which can produce an IOException on the HTTP stream and in some of them, eg.
- // bad requests, the connection to the client is not closed. In order to address also these cases, we try
- // and answer with a BAD_REQUEST, which lets the client know that the request has not been correctly
- // processed and makes it aware that the connection can be closed.
- getLogger().error("Failed to receive content from HTTP Request from {} due to {}",
- new Object[]{request.getRemoteAddr(), e});
- if (flowFile.isPresent())
- session.remove(flowFile.get());
-
- try {
- HttpServletResponse response = container.getResponse();
- response.sendError(HttpServletResponse.SC_BAD_REQUEST);
- container.getContext().complete();
- } catch (final IOException ioe) {
- getLogger().warn("Failed to send HTTP response to {} due to {}",
- new Object[]{request.getRemoteAddr(), ioe});
- }
+ protected void handleFlowContentStreamingError(final ProcessSession session, final HttpRequestContainer container, final Optional<FlowFile> flowFile, final Exception e) {
+ // There may be many reasons which can produce an IOException on the HTTP stream and in some of them, eg.
+ // bad requests, the connection to the client is not closed. In order to address also these cases, we try
+ // and answer with a BAD_REQUEST, which lets the client know that the request has not been correctly
+ // processed and makes it aware that the connection can be closed.
+ final HttpServletRequest request = container.getRequest();
+ getLogger().error("Stream Processing Failed: Method [{}] URI [{}] Address [{}]", request.getMethod(), request.getRequestURI(), request.getRemoteAddr(), e);
+ flowFile.ifPresent(session::remove);
+ sendError(SC_BAD_REQUEST, "Stream Processing Failed", container);
}
private void putAttribute(final Map<String, String> map, final String key, final Object value) {
@@ -905,8 +854,34 @@
map.put(key, value);
}
- private static class HttpRequestContainer {
+ private void sendError(final int statusCode, final String message, final HttpRequestContainer container) {
+ sendError(statusCode, message, container.getRequest(), container.getResponse());
+ final AsyncContext asyncContext = container.getContext();
+ try {
+ asyncContext.complete();
+ } catch (final RuntimeException e) {
+ final HttpServletRequest request = container.getRequest();
+ final String method = request.getMethod();
+ final String uri = request.getRequestURI();
+ final String remoteAddr = request.getRemoteAddr();
+ getLogger().error("Complete Request Failed: Method [{}] URI [{}] Address [{}]", method, uri, remoteAddr, e);
+ }
+ }
+ private void sendError(final int statusCode, final String message, final HttpServletRequest request, final HttpServletResponse response) {
+ final String method = request.getMethod();
+ final String uri = request.getRequestURI();
+ final String remoteAddr = request.getRemoteAddr();
+
+ try {
+ response.sendError(statusCode, message);
+ getLogger().warn("Send Error Completed: HTTP {} [{}] Method [{}] URI [{}] Address [{}]", statusCode, message, method, uri, remoteAddr);
+ } catch (final Exception e) {
+ getLogger().error("Send Error Failed: HTTP {} [{}] Method [{}] URI [{}] Address [{}]", statusCode, message, method, uri, remoteAddr, e);
+ }
+ }
+
+ private static class HttpRequestContainer {
private final HttpServletRequest request;
private final HttpServletResponse response;
private final AsyncContext context;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java
index 67954f1..9e98ef3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java
@@ -17,23 +17,31 @@
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
+import java.io.PrintWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
@@ -41,10 +49,6 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import com.google.api.client.util.Charsets;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
-import com.google.common.io.Files;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.MediaType;
@@ -53,17 +57,21 @@
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.http.HttpContextMap;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.HTTPUtils;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
+import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -179,11 +187,11 @@
.addFormDataPart("p1", "v1")
.addFormDataPart("p2", "v2")
.addFormDataPart("file1", "my-file-text.txt",
- RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World")))
+ RequestBody.create(createTextFile("Hello", "World"), MediaType.parse("text/plain")))
.addFormDataPart("file2", "my-file-data.json",
- RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }")))
+ RequestBody.create(createTextFile( "{ \"name\":\"John\", \"age\":30 }"), MediaType.parse("application/json")))
.addFormDataPart("file3", "my-file-binary.bin",
- RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100)))
+ RequestBody.create(generateRandomBinaryData(), MediaType.parse("application/octet-stream")))
.build();
Request request = new Request.Builder()
@@ -324,7 +332,6 @@
// We cannot rely on the order we sent them in.
for (int i = 1; i < 4; i++) {
MockFlowFile mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", String.format("p%d", i));
- String contextId = mff.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
mff.assertAttributeEquals("http.multipart.name", String.format("p%d", i));
mff.assertAttributeExists("http.param.p1");
mff.assertAttributeEquals("http.param.p1", "v1");
@@ -364,11 +371,11 @@
.addFormDataPart("p1", "v1")
.addFormDataPart("p2", "v2")
.addFormDataPart("file1", "my-file-text.txt",
- RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World")))
+ RequestBody.create(createTextFile("my-file-text.txt", "Hello", "World"), MediaType.parse("text/plain")))
.addFormDataPart("file2", "my-file-data.json",
- RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }")))
+ RequestBody.create(createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }"), MediaType.parse("application/json")))
.addFormDataPart("file3", "my-file-binary.bin",
- RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100)))
+ RequestBody.create(generateRandomBinaryData(), MediaType.parse("application/octet-stream")))
.build();
Request request = new Request.Builder()
@@ -383,12 +390,12 @@
Callback callback = new Callback() {
@Override
- public void onFailure(Call call, IOException e) {
+ public void onFailure(@NotNull Call call, @NotNull IOException e) {
// Not going to happen
}
@Override
- public void onResponse(Call call, Response response) {
+ public void onResponse(@NotNull Call call, @NotNull Response response) {
responseCode.set(response.code());
resultReady.countDown();
}
@@ -409,25 +416,27 @@
Assert.assertEquals(503, responseCode.get());
}
- private byte[] generateRandomBinaryData(int i) {
+ private byte[] generateRandomBinaryData() {
byte[] bytes = new byte[100];
new Random().nextBytes(bytes);
return bytes;
}
- private File createTextFile(String fileName, String... lines) throws IOException {
- File file = new File(fileName);
+ private File createTextFile(String... lines) throws IOException {
+ File file = new File(getClass().getSimpleName());
file.deleteOnExit();
- for (String string : lines) {
- Files.append(string, file, Charsets.UTF_8);
+ try (final PrintWriter writer = new PrintWriter(new FileWriter(file))) {
+ for (final String line : lines) {
+ writer.println(line);
+ }
}
return file;
}
protected MockFlowFile findFlowFile(List<MockFlowFile> flowFilesForRelationship, String attributeName, String attributeValue) {
- Optional<MockFlowFile> optional = Iterables.tryFind(flowFilesForRelationship, ff -> ff.getAttribute(attributeName).equals(attributeValue));
+ Optional<MockFlowFile> optional = flowFilesForRelationship.stream().filter(ff -> ff.getAttribute(attributeName).equals(attributeValue)).findFirst();
Assert.assertTrue(optional.isPresent());
return optional.get();
}
@@ -450,7 +459,6 @@
contextMap.setRegisterSuccessfully(false);
final int[] responseCode = new int[1];
- responseCode[0] = 0;
final Thread httpThread = new Thread(new Runnable() {
@Override
public void run() {
@@ -543,12 +551,12 @@
Callback callback = new Callback() {
@Override
- public void onFailure(Call call, IOException e) {
+ public void onFailure(@NotNull Call call, @NotNull IOException e) {
// Will only happen once for the first non-rejected request, but not important
}
@Override
- public void onResponse(Call call, Response response) throws IOException {
+ public void onResponse(@NotNull Call call, @NotNull Response response) {
responses.add(response);
cleanupDone.countDown();
}
@@ -583,10 +591,64 @@
assertEquals(responses.size(), nrOfRequests - 1);
for (Response response : responses) {
assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, response.code());
- assertTrue("Unexpected HTTP response for rejected requests", new String(response.body().bytes()).contains("Processor is shutting down"));
}
}
+ @Test(timeout = 15000)
+ public void testOnPrimaryNodeChangePrimaryNodeRevoked() throws Exception {
+ processor = new HandleHttpRequest();
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+ final int port = NetworkUtils.getAvailableTcpPort();
+ runner.setProperty(HandleHttpRequest.PORT, Integer.toString(port));
+
+ final MockHttpContextMap contextMap = new MockHttpContextMap();
+ final String contextMapId = MockHttpContextMap.class.getSimpleName();
+ runner.addControllerService(contextMapId, contextMap);
+ runner.enableControllerService(contextMap);
+ runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, contextMapId);
+
+ final ProcessContext processContext = spy(runner.getProcessContext());
+ when(processContext.getExecutionNode()).thenReturn(ExecutionNode.PRIMARY);
+ processor.initializeServer(processContext);
+
+ final OkHttpClient client = new OkHttpClient.Builder().build();
+
+ final String url = String.format("http://localhost:%d", port);
+ final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+ final CountDownLatch requestCompleted = new CountDownLatch(1);
+ final CountDownLatch requestStarted = new CountDownLatch(1);
+
+ final AtomicReference<IOException> requestException = new AtomicReference<>();
+ final AtomicInteger responseStatus = new AtomicInteger();
+ executorService.execute(() -> {
+ final Request request = new Request.Builder().url(url).get().build();
+ final Call call = client.newCall(request);
+ call.enqueue(new Callback() {
+ @Override
+ public void onFailure(@NotNull Call call, @NotNull IOException e) {
+ requestException.set(e);
+ requestCompleted.countDown();
+ }
+
+ @Override
+ public void onResponse(@NotNull Call call, @NotNull Response response) {
+ responseStatus.set(response.code());
+ requestCompleted.countDown();
+ }
+ });
+ requestStarted.countDown();
+ });
+
+ requestStarted.await();
+ Thread.sleep(1000);
+ processor.onPrimaryNodeChange(PrimaryNodeState.PRIMARY_NODE_REVOKED);
+ requestCompleted.await();
+
+ assertNull("HTTP Request Exception found", requestException.get());
+ assertEquals("HTTP Status not matched", HttpServletResponse.SC_SERVICE_UNAVAILABLE, responseStatus.get());
+ }
+
@Test
public void testSecure() throws Exception {
secureTest(false);
@@ -610,7 +672,7 @@
runner.enableControllerService(contextMap);
runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
- final RestrictedSSLContextService sslContextService = Mockito.mock(RestrictedSSLContextService.class);
+ final RestrictedSSLContextService sslContextService = mock(RestrictedSSLContextService.class);
final String serviceIdentifier = RestrictedSSLContextService.class.getName();
Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
Mockito.when(sslContextService.createContext()).thenReturn(keyStoreSslContext);
@@ -683,7 +745,7 @@
}
@Override
- void rejectPendingRequests() {
+ void drainContainerQueue() {
// Skip this, otherwise it would wait to make sure there are no more requests
}
};
@@ -691,7 +753,7 @@
private void sendRequest(HttpURLConnection connection, CountDownLatch requestSent) throws Exception {
Future<InputStream> executionFuture = Executors.newSingleThreadExecutor()
- .submit(() -> connection.getInputStream());
+ .submit(connection::getInputStream);
requestSent.countDown();
@@ -701,12 +763,12 @@
private void sendRequest(OkHttpClient client, Request request, CountDownLatch requestSent) {
Callback callback = new Callback() {
@Override
- public void onFailure(Call call, IOException e) {
+ public void onFailure(@NotNull Call call, @NotNull IOException e) {
// We (may) get a timeout as the processor doesn't answer unless there is some kind of error
}
@Override
- public void onResponse(Call call, Response response) throws IOException {
+ public void onResponse(@NotNull Call call, @NotNull Response response) {
// Not called as the processor doesn't answer unless there is some kind of error
}
};
@@ -747,10 +809,6 @@
return responseMap.size();
}
- public boolean isRegisterSuccessfully() {
- return registerSuccessfully;
- }
-
public void setRegisterSuccessfully(boolean registerSuccessfully) {
this.registerSuccessfully = registerSuccessfully;
}