NIFI-8770 Use queue drainTo() on shutdown in HandleHttpRequest

- Refactored response handling to use shared sendError() method
- Standardized request logging to include HTTP Method and URI

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #5218.
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
index b9a26f8..96af6c2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
@@ -56,10 +56,6 @@
 import org.eclipse.jetty.server.handler.AbstractHandler;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-
 import javax.net.ssl.SSLContext;
 import javax.servlet.AsyncContext;
 import javax.servlet.DispatcherType;
@@ -73,7 +69,6 @@
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URLDecoder;
 import java.security.Principal;
 import java.security.cert.X509Certificate;
@@ -84,6 +79,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
@@ -93,6 +89,11 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
+import static javax.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE;
+import static javax.servlet.http.HttpServletResponse.SC_METHOD_NOT_ALLOWED;
+import static javax.servlet.http.HttpServletResponse.SC_NOT_FOUND;
+import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST;
+
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"http", "https", "request", "listen", "ingress", "web service"})
 @CapabilityDescription("Starts an HTTP Server and listens for HTTP Requests. For each request, creates a FlowFile and transfers to 'success'. "
@@ -322,10 +323,10 @@
 
     private volatile Server server;
     private volatile boolean ready;
-    private AtomicBoolean initialized = new AtomicBoolean(false);
     private volatile BlockingQueue<HttpRequestContainer> containerQueue;
-    private AtomicBoolean runOnPrimary = new AtomicBoolean(false);
-    private AtomicReference<Set<String>> parameterToAttributesReference = new AtomicReference<>(null);
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
+    private final AtomicBoolean runOnPrimary = new AtomicBoolean(false);
+    private final AtomicReference<Set<String>> parameterToAttributesReference = new AtomicReference<>(null);
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -343,7 +344,7 @@
     }
 
     synchronized void initializeServer(final ProcessContext context) throws Exception {
-        if(initialized.get()){
+        if (initialized.get()) {
             return;
         }
         runOnPrimary.set(context.getExecutionNode().equals(ExecutionNode.PRIMARY));
@@ -357,10 +358,10 @@
         final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
         final boolean need;
         final boolean want;
-        if (CLIENT_NEED.equals(clientAuthValue)) {
+        if (CLIENT_NEED.getValue().equals(clientAuthValue)) {
             need = true;
             want = false;
-        } else if (CLIENT_WANT.equals(clientAuthValue)) {
+        } else if (CLIENT_WANT.getValue().equals(clientAuthValue)) {
             need = false;
             want = true;
         } else {
@@ -458,66 +459,40 @@
 
         server.setHandler(new AbstractHandler() {
             @Override
-            public void handle(final String target, final Request baseRequest, final HttpServletRequest request, final HttpServletResponse response)
-                    throws IOException, ServletException {
-
+            public void handle(final String target, final Request baseRequest, final HttpServletRequest request, final HttpServletResponse response) {
                 final String requestUri = request.getRequestURI();
-                if (!allowedMethods.contains(request.getMethod().toUpperCase())) {
-                    getLogger().info("Sending back METHOD_NOT_ALLOWED response to {}; method was {}; request URI was {}",
-                            new Object[]{request.getRemoteAddr(), request.getMethod(), requestUri});
-                    response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
+                final String method = request.getMethod().toUpperCase();
+                if (!allowedMethods.contains(method)) {
+                    sendError(SC_METHOD_NOT_ALLOWED, "Method Not Allowed", request, response);
                     return;
                 }
 
                 if (pathPattern != null) {
-                    final URI uri;
-                    try {
-                        uri = new URI(requestUri);
-                    } catch (final URISyntaxException e) {
-                        throw new ServletException(e);
-                    }
-
+                    final URI uri = URI.create(requestUri);
                     if (!pathPattern.matcher(uri.getPath()).matches()) {
-                        getLogger().info("Sending back NOT_FOUND response to {}; request was {} {}",
-                                new Object[]{request.getRemoteAddr(), request.getMethod(), requestUri});
-                        response.sendError(HttpServletResponse.SC_NOT_FOUND);
+                        sendError(SC_NOT_FOUND, "Path Not Found", request, response);
                         return;
                     }
                 }
 
-                // If destination queues full, send back a 503: Service Unavailable.
                 if (context.getAvailableRelationships().isEmpty()) {
-                    getLogger().warn("Request from {} cannot be processed, processor downstream queue is full; responding with SERVICE_UNAVAILABLE",
-                            new Object[]{request.getRemoteAddr()});
-
-                    response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor queue is full");
+                    sendError(SC_SERVICE_UNAVAILABLE, "No Available Relationships", request, response);
                     return;
                 } else if (!ready) {
-                    getLogger().warn("Request from {} cannot be processed, processor is being shut down; responding with SERVICE_UNAVAILABLE",
-                        new Object[]{request.getRemoteAddr()});
-
-                    response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor is shutting down");
+                    sendError(SC_SERVICE_UNAVAILABLE, "Server Not Ready", request, response);
                     return;
                 }
 
-                // Right now, that information, though, is only in the ProcessSession, not the ProcessContext,
-                // so it is not known to us. Should see if it can be added to the ProcessContext.
                 final AsyncContext async = baseRequest.startAsync();
-
                 // disable timeout handling on AsyncContext, timeout will be handled in HttpContextMap
                 async.setTimeout(0);
 
-                final boolean added = containerQueue.offer(new HttpRequestContainer(request, response, async));
-
+                final HttpRequestContainer container = new HttpRequestContainer(request, response, async);
+                final boolean added = containerQueue.offer(container);
                 if (added) {
-                    getLogger().debug("Added Http Request to queue for {} {} from {}",
-                            new Object[]{request.getMethod(), requestUri, request.getRemoteAddr()});
+                    getLogger().debug("Request Queued: Method [{}] URI [{}] Address [{}]", method, requestUri, request.getRemoteAddr());
                 } else {
-                    getLogger().warn("Request from {} cannot be processed, container queue is full; responding with SERVICE_UNAVAILABLE",
-                            new Object[]{request.getRemoteAddr()});
-
-                    response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Container queue is full");
-                    async.complete();
+                    sendError(SC_SERVICE_UNAVAILABLE, "Request Queue Full", container);
                 }
             }
         });
@@ -525,7 +500,9 @@
         this.server = server;
         server.start();
 
-        getLogger().info("Server started and listening on port " + getPort());
+        for (final Connector connector : server.getConnectors()) {
+            getLogger().info("Started Connector {}", connector);
+        }
 
         initialized.set(true);
         ready = true;
@@ -561,54 +538,50 @@
     public void shutdown() throws Exception {
         ready = false;
 
-        if (server != null) {
-            getLogger().debug("Shutting down server");
-            rejectPendingRequests();
-            server.stop();
-            server.destroy();
-            server.join();
-            clearInit();
-            getLogger().info("Shut down {}", new Object[]{server});
-        }
-    }
+        if (server == null) {
+            getLogger().debug("Server not configured");
+        } else {
+            if (server.isStopped()) {
+                getLogger().debug("Server Stopped {}", server);
+            } else {
+                for (final Connector connector : server.getConnectors()) {
+                    getLogger().debug("Stopping Connector {}", connector);
+                }
 
-    void rejectPendingRequests() {
-        HttpRequestContainer container;
-        while ((container = getNextContainer()) != null) {
-            try {
-                getLogger().warn("Rejecting request from {} during cleanup after processor shutdown; responding with SERVICE_UNAVAILABLE",
-                    new Object[]{container.getRequest().getRemoteAddr()});
+                drainContainerQueue();
+                server.stop();
+                server.destroy();
+                server.join();
+                clearInit();
 
-                HttpServletResponse response = container.getResponse();
-                response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Processor is shutting down");
-                container.getContext().complete();
-            } catch (final IOException e) {
-                getLogger().warn("Failed to send HTTP response to {} due to {}",
-                    new Object[]{container.getRequest().getRemoteAddr(), e});
+                for (final Connector connector : server.getConnectors()) {
+                    getLogger().info("Stopped Connector {}", connector);
+                }
             }
         }
     }
 
-    private HttpRequestContainer getNextContainer() {
-        HttpRequestContainer container;
-        try {
-            container = containerQueue.poll(2, TimeUnit.SECONDS);
-        } catch (final InterruptedException e) {
-            getLogger().warn("Interrupted while polling for " + HttpRequestContainer.class.getSimpleName() + " during cleanup.");
-            container = null;
+    void drainContainerQueue() {
+        if (containerQueue.isEmpty()) {
+            getLogger().debug("No Pending Requests Queued");
+        } else {
+            final List<HttpRequestContainer> pendingContainers = new ArrayList<>();
+            containerQueue.drainTo(pendingContainers);
+            getLogger().warn("Pending Requests Queued [{}]", pendingContainers.size());
+            for (final HttpRequestContainer container : pendingContainers) {
+                sendError(SC_SERVICE_UNAVAILABLE, "Stopping Server", container);
+            }
         }
-
-        return container;
     }
 
     @OnPrimaryNodeStateChange
-    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
-        if (runOnPrimary.get() && newState.equals(PrimaryNodeState.PRIMARY_NODE_REVOKED)) {
+    public void onPrimaryNodeChange(final PrimaryNodeState state) {
+        if (runOnPrimary.get() && state.equals(PrimaryNodeState.PRIMARY_NODE_REVOKED)) {
+            getLogger().info("Server Shutdown Started: Primary Node State Changed [{}]", state);
             try {
                 shutdown();
-            } catch (final Exception shutdownException) {
-                getLogger().warn("Processor is configured to run only on Primary Node, but failed to shutdown HTTP server following revocation of primary node status due to {}",
-                        shutdownException);
+            } catch (final Exception e) {
+                getLogger().warn("Server Shutdown Failed: Primary Node State Changed [{}]", state, e);
             }
         }
     }
@@ -616,7 +589,7 @@
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         try {
-            if(!initialized.get()) {
+            if (!initialized.get()) {
                 initializeServer(context);
             }
         } catch (Exception e) {
@@ -626,7 +599,7 @@
                 // shutdown to release any resources allocated during the failed initialization
                 shutdown();
             } catch (final Exception shutdownException) {
-                getLogger().debug("Failed to shutdown following a failed initialization: " + shutdownException);
+                getLogger().debug("Server Shutdown Failed after Initialization Failed", shutdownException);
             }
 
             throw new ProcessException("Failed to initialize the server", e);
@@ -647,14 +620,14 @@
         final long start = System.nanoTime();
         final HttpServletRequest request = container.getRequest();
 
-        if (!Strings.isNullOrEmpty(request.getContentType()) && request.getContentType().contains(MIME_TYPE__MULTIPART_FORM_DATA)) {
+        if (StringUtils.contains(request.getContentType(), MIME_TYPE__MULTIPART_FORM_DATA)) {
           final long requestMaxSize = context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue();
           final int readBufferSize = context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
           String tempDir = System.getProperty("java.io.tmpdir");
           request.setAttribute(Request.MULTIPART_CONFIG_ELEMENT, new MultipartConfigElement(tempDir, requestMaxSize, requestMaxSize, readBufferSize));
           List<Part> parts = null;
           try {
-            parts = ImmutableList.copyOf(request.getParts());
+            parts = Collections.unmodifiableList(new ArrayList<>(request.getParts()));
             int allPartsCount = parts.size();
             final String contextIdentifier = UUID.randomUUID().toString();
             for (int i = 0; i < allPartsCount; i++) {
@@ -663,22 +636,21 @@
               try (OutputStream flowFileOut = session.write(flowFile)) {
                 StreamUtils.copy(part.getInputStream(), flowFileOut);
               } catch (IOException e) {
-                handleFlowContentStreamingError(session, container, request, Optional.of(flowFile), e);
+                handleFlowContentStreamingError(session, container, Optional.of(flowFile), e);
                 return;
               }
-              flowFile = savePartAttributes(context, session, part, flowFile, i, allPartsCount);
+              flowFile = savePartAttributes(session, part, flowFile, i, allPartsCount);
               flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier);
               if (i == 0) {
                 // each one of multipart comes from a single request, thus registering only once per loop.
-                boolean requestRegistrationSuccess = registerRequest(context, session, container, start, request, flowFile);
+                boolean requestRegistrationSuccess = registerRequest(context, session, container, flowFile);
                 if (!requestRegistrationSuccess)
                   break;
               }
-              forwardFlowFile(context, session, container, start, request, flowFile);
+              forwardFlowFile(session, start, request, flowFile);
             }
           } catch (IOException | ServletException | IllegalStateException e) {
-            handleFlowContentStreamingError(session, container, request, Optional.absent(), e);
-            return;
+            handleFlowContentStreamingError(session, container, Optional.empty(), e);
           } finally {
             if (parts != null) {
               for (Part part : parts) {
@@ -695,18 +667,18 @@
           try (OutputStream flowFileOut = session.write(flowFile)) {
             StreamUtils.copy(request.getInputStream(), flowFileOut);
           } catch (final IOException e) {
-            handleFlowContentStreamingError(session, container, request, Optional.of(flowFile), e);
+            handleFlowContentStreamingError(session, container, Optional.of(flowFile), e);
             return;
           }
           final String contextIdentifier = UUID.randomUUID().toString();
           flowFile = saveRequestAttributes(context, session, request, flowFile, contextIdentifier);
-          boolean requestRegistrationSuccess = registerRequest(context, session, container, start, request, flowFile);
+          boolean requestRegistrationSuccess = registerRequest(context, session, container, flowFile);
           if (requestRegistrationSuccess)
-            forwardFlowFile(context, session, container, start, request, flowFile);
+            forwardFlowFile(session, start, request, flowFile);
         }
     }
 
-    private FlowFile savePartAttributes(ProcessContext context, ProcessSession session, Part part, FlowFile flowFile, final int i, final int allPartsCount) {
+    private FlowFile savePartAttributes(ProcessSession session, Part part, FlowFile flowFile, final int i, final int allPartsCount) {
       final Map<String, String> attributes = new HashMap<>();
       for (String headerName : part.getHeaderNames()) {
         final String headerValue = part.getHeader(headerName);
@@ -817,76 +789,53 @@
           putAttribute(attributes, "http.principal.name", principal.getName());
       }
 
-      final X509Certificate certs[] = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
-      final String subjectDn;
+      final X509Certificate[] certs = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate");
       if (certs != null && certs.length > 0) {
           final X509Certificate cert = certs[0];
-          subjectDn = cert.getSubjectDN().getName();
+          final String subjectDn = cert.getSubjectDN().getName();
           final String issuerDn = cert.getIssuerDN().getName();
 
           putAttribute(attributes, HTTPUtils.HTTP_SSL_CERT, subjectDn);
           putAttribute(attributes, "http.issuer.dn", issuerDn);
-      } else {
-          subjectDn = null;
       }
 
       return session.putAllAttributes(flowFile, attributes);
     }
 
-    private void forwardFlowFile(final ProcessContext context, final ProcessSession session,
-        HttpRequestContainer container, final long start, final HttpServletRequest request, FlowFile flowFile) {
+    private void forwardFlowFile(final ProcessSession session, final long start, final HttpServletRequest request, final FlowFile flowFile) {
       final long receiveMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-      String subjectDn = flowFile.getAttribute(HTTPUtils.HTTP_SSL_CERT);
+      final String subjectDn = flowFile.getAttribute(HTTPUtils.HTTP_SSL_CERT);
       session.getProvenanceReporter().receive(flowFile, HTTPUtils.getURI(flowFile.getAttributes()),
           "Received from " + request.getRemoteAddr() + (subjectDn == null ? "" : " with DN=" + subjectDn), receiveMillis);
       session.transfer(flowFile, REL_SUCCESS);
-      getLogger().info("Transferring {} to 'success'; received from {}", new Object[]{flowFile, request.getRemoteAddr()});
+      getLogger().debug("Transferred {} to [{}] Remote Address [{}] ", flowFile, REL_SUCCESS, request.getRemoteAddr());
     }
 
 
     private boolean registerRequest(final ProcessContext context, final ProcessSession session,
-        HttpRequestContainer container, final long start, final HttpServletRequest request, FlowFile flowFile) {
+                                    final HttpRequestContainer container, final FlowFile flowFile) {
         final HttpContextMap contextMap = context.getProperty(HTTP_CONTEXT_MAP).asControllerService(HttpContextMap.class);
-        String contextIdentifier = flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
+        final String contextIdentifier = flowFile.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
+        final HttpServletRequest request = container.getRequest();
         final boolean registered = contextMap.register(contextIdentifier, request, container.getResponse(), container.getContext());
-        if (registered)
-          return true;
-
-        getLogger().warn("Received request from {} but could not process it because too many requests are already outstanding; responding with SERVICE_UNAVAILABLE",
-            new Object[]{request.getRemoteAddr()});
-
-        try {
-          container.getResponse().sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "HttpContextMap is full");
-          container.getContext().complete();
-        } catch (final Exception e) {
-          getLogger().warn("Failed to respond with SERVICE_UNAVAILABLE message to {} due to {}",
-              new Object[]{request.getRemoteAddr(), e});
+        if (registered) {
+            return true;
         }
 
         session.remove(flowFile);
+        sendError(SC_SERVICE_UNAVAILABLE, "Request Registration Failed", container);
         return false;
     }
 
-
-    protected void handleFlowContentStreamingError(final ProcessSession session, HttpRequestContainer container,
-        final HttpServletRequest request, Optional<FlowFile> flowFile, final Exception e) {
-      // There may be many reasons which can produce an IOException on the HTTP stream and in some of them, eg.
-      // bad requests, the connection to the client is not closed. In order to address also these cases, we try
-      // and answer with a BAD_REQUEST, which lets the client know that the request has not been correctly
-      // processed and makes it aware that the connection can be closed.
-      getLogger().error("Failed to receive content from HTTP Request from {} due to {}",
-              new Object[]{request.getRemoteAddr(), e});
-      if (flowFile.isPresent())
-        session.remove(flowFile.get());
-
-      try {
-          HttpServletResponse response = container.getResponse();
-          response.sendError(HttpServletResponse.SC_BAD_REQUEST);
-          container.getContext().complete();
-      } catch (final IOException ioe) {
-          getLogger().warn("Failed to send HTTP response to {} due to {}",
-                  new Object[]{request.getRemoteAddr(), ioe});
-      }
+    protected void handleFlowContentStreamingError(final ProcessSession session, final HttpRequestContainer container, final Optional<FlowFile> flowFile, final Exception e) {
+        // There may be many reasons which can produce an IOException on the HTTP stream and in some of them, eg.
+        // bad requests, the connection to the client is not closed. In order to address also these cases, we try
+        // and answer with a BAD_REQUEST, which lets the client know that the request has not been correctly
+        // processed and makes it aware that the connection can be closed.
+        final HttpServletRequest request = container.getRequest();
+        getLogger().error("Stream Processing Failed: Method [{}] URI [{}] Address [{}]", request.getMethod(), request.getRequestURI(), request.getRemoteAddr(), e);
+        flowFile.ifPresent(session::remove);
+        sendError(SC_BAD_REQUEST, "Stream Processing Failed", container);
     }
 
     private void putAttribute(final Map<String, String> map, final String key, final Object value) {
@@ -905,8 +854,34 @@
         map.put(key, value);
     }
 
-    private static class HttpRequestContainer {
+    private void sendError(final int statusCode, final String message, final HttpRequestContainer container) {
+        sendError(statusCode, message, container.getRequest(), container.getResponse());
+        final AsyncContext asyncContext = container.getContext();
+        try {
+            asyncContext.complete();
+        } catch (final RuntimeException e) {
+            final HttpServletRequest request = container.getRequest();
+            final String method = request.getMethod();
+            final String uri = request.getRequestURI();
+            final String remoteAddr = request.getRemoteAddr();
+            getLogger().error("Complete Request Failed: Method [{}] URI [{}] Address [{}]", method, uri, remoteAddr, e);
+        }
+    }
 
+    private void sendError(final int statusCode, final String message, final HttpServletRequest request, final HttpServletResponse response) {
+        final String method = request.getMethod();
+        final String uri = request.getRequestURI();
+        final String remoteAddr = request.getRemoteAddr();
+
+        try {
+            response.sendError(statusCode, message);
+            getLogger().warn("Send Error Completed: HTTP {} [{}] Method [{}] URI [{}] Address [{}]", statusCode, message, method, uri, remoteAddr);
+        } catch (final Exception e) {
+            getLogger().error("Send Error Failed: HTTP {} [{}] Method [{}] URI [{}] Address [{}]", statusCode, message, method, uri, remoteAddr, e);
+        }
+    }
+
+    private static class HttpRequestContainer {
         private final HttpServletRequest request;
         private final HttpServletResponse response;
         private final AsyncContext context;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java
index 67954f1..9e98ef3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ITestHandleHttpRequest.java
@@ -17,23 +17,31 @@
 package org.apache.nifi.processors.standard;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.PrintWriter;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.IntStream;
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLContext;
@@ -41,10 +49,6 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import com.google.api.client.util.Charsets;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
-import com.google.common.io.Files;
 import okhttp3.Call;
 import okhttp3.Callback;
 import okhttp3.MediaType;
@@ -53,17 +57,21 @@
 import okhttp3.Request;
 import okhttp3.RequestBody;
 import okhttp3.Response;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.http.HttpContextMap;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.standard.util.HTTPUtils;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.scheduling.ExecutionNode;
 import org.apache.nifi.security.util.TlsException;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.nifi.web.util.ssl.SslContextUtils;
+import org.jetbrains.annotations.NotNull;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -179,11 +187,11 @@
                             .addFormDataPart("p1", "v1")
                             .addFormDataPart("p2", "v2")
                             .addFormDataPart("file1", "my-file-text.txt",
-                                    RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World")))
+                                    RequestBody.create(createTextFile("Hello", "World"), MediaType.parse("text/plain")))
                             .addFormDataPart("file2", "my-file-data.json",
-                                    RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }")))
+                                    RequestBody.create(createTextFile( "{ \"name\":\"John\", \"age\":30 }"), MediaType.parse("application/json")))
                             .addFormDataPart("file3", "my-file-binary.bin",
-                                    RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100)))
+                                    RequestBody.create(generateRandomBinaryData(), MediaType.parse("application/octet-stream")))
                             .build();
 
                     Request request = new Request.Builder()
@@ -324,7 +332,6 @@
         // We cannot rely on the order we sent them in.
         for (int i = 1; i < 4; i++) {
             MockFlowFile mff = findFlowFile(flowFilesForRelationship, "http.multipart.name", String.format("p%d", i));
-            String contextId = mff.getAttribute(HTTPUtils.HTTP_CONTEXT_ID);
             mff.assertAttributeEquals("http.multipart.name", String.format("p%d", i));
             mff.assertAttributeExists("http.param.p1");
             mff.assertAttributeEquals("http.param.p1", "v1");
@@ -364,11 +371,11 @@
                             .addFormDataPart("p1", "v1")
                             .addFormDataPart("p2", "v2")
                             .addFormDataPart("file1", "my-file-text.txt",
-                                    RequestBody.create(MediaType.parse("text/plain"), createTextFile("my-file-text.txt", "Hello", "World")))
+                                    RequestBody.create(createTextFile("my-file-text.txt", "Hello", "World"), MediaType.parse("text/plain")))
                             .addFormDataPart("file2", "my-file-data.json",
-                                    RequestBody.create(MediaType.parse("application/json"), createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }")))
+                                    RequestBody.create(createTextFile("my-file-text.txt", "{ \"name\":\"John\", \"age\":30 }"), MediaType.parse("application/json")))
                             .addFormDataPart("file3", "my-file-binary.bin",
-                                    RequestBody.create(MediaType.parse("application/octet-stream"), generateRandomBinaryData(100)))
+                                    RequestBody.create(generateRandomBinaryData(), MediaType.parse("application/octet-stream")))
                             .build();
 
                     Request request = new Request.Builder()
@@ -383,12 +390,12 @@
 
                     Callback callback = new Callback() {
                         @Override
-                        public void onFailure(Call call, IOException e) {
+                        public void onFailure(@NotNull Call call, @NotNull IOException e) {
                             // Not going to happen
                         }
 
                         @Override
-                        public void onResponse(Call call, Response response) {
+                        public void onResponse(@NotNull Call call, @NotNull Response response) {
                             responseCode.set(response.code());
                             resultReady.countDown();
                         }
@@ -409,25 +416,27 @@
         Assert.assertEquals(503, responseCode.get());
     }
 
-    private byte[] generateRandomBinaryData(int i) {
+    private byte[] generateRandomBinaryData() {
         byte[] bytes = new byte[100];
         new Random().nextBytes(bytes);
         return bytes;
     }
 
 
-    private File createTextFile(String fileName, String... lines) throws IOException {
-        File file = new File(fileName);
+    private File createTextFile(String... lines) throws IOException {
+        File file = new File(getClass().getSimpleName());
         file.deleteOnExit();
-        for (String string : lines) {
-            Files.append(string, file, Charsets.UTF_8);
+        try (final PrintWriter writer = new PrintWriter(new FileWriter(file))) {
+            for (final String line : lines) {
+                writer.println(line);
+            }
         }
         return file;
     }
 
 
     protected MockFlowFile findFlowFile(List<MockFlowFile> flowFilesForRelationship, String attributeName, String attributeValue) {
-        Optional<MockFlowFile> optional = Iterables.tryFind(flowFilesForRelationship, ff -> ff.getAttribute(attributeName).equals(attributeValue));
+        Optional<MockFlowFile> optional = flowFilesForRelationship.stream().filter(ff -> ff.getAttribute(attributeName).equals(attributeValue)).findFirst();
         Assert.assertTrue(optional.isPresent());
         return optional.get();
     }
@@ -450,7 +459,6 @@
         contextMap.setRegisterSuccessfully(false);
 
         final int[] responseCode = new int[1];
-        responseCode[0] = 0;
         final Thread httpThread = new Thread(new Runnable() {
             @Override
             public void run() {
@@ -543,12 +551,12 @@
 
                     Callback callback = new Callback() {
                         @Override
-                        public void onFailure(Call call, IOException e) {
+                        public void onFailure(@NotNull Call call, @NotNull IOException e) {
                             // Will only happen once for the first non-rejected request, but not important
                         }
 
                         @Override
-                        public void onResponse(Call call, Response response) throws IOException {
+                        public void onResponse(@NotNull Call call, @NotNull Response response) {
                             responses.add(response);
                             cleanupDone.countDown();
                         }
@@ -583,10 +591,64 @@
         assertEquals(responses.size(), nrOfRequests - 1);
         for (Response response : responses) {
             assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, response.code());
-            assertTrue("Unexpected HTTP response for rejected requests", new String(response.body().bytes()).contains("Processor is shutting down"));
         }
     }
 
+    @Test(timeout = 15000)
+    public void testOnPrimaryNodeChangePrimaryNodeRevoked() throws Exception {
+        processor = new HandleHttpRequest();
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        final int port = NetworkUtils.getAvailableTcpPort();
+        runner.setProperty(HandleHttpRequest.PORT, Integer.toString(port));
+
+        final MockHttpContextMap contextMap = new MockHttpContextMap();
+        final String contextMapId = MockHttpContextMap.class.getSimpleName();
+        runner.addControllerService(contextMapId, contextMap);
+        runner.enableControllerService(contextMap);
+        runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, contextMapId);
+
+        final ProcessContext processContext = spy(runner.getProcessContext());
+        when(processContext.getExecutionNode()).thenReturn(ExecutionNode.PRIMARY);
+        processor.initializeServer(processContext);
+
+        final OkHttpClient client = new OkHttpClient.Builder().build();
+
+        final String url = String.format("http://localhost:%d", port);
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+        final CountDownLatch requestCompleted = new CountDownLatch(1);
+        final CountDownLatch requestStarted = new CountDownLatch(1);
+
+        final AtomicReference<IOException> requestException = new AtomicReference<>();
+        final AtomicInteger responseStatus = new AtomicInteger();
+        executorService.execute(() -> {
+            final Request request = new Request.Builder().url(url).get().build();
+            final Call call = client.newCall(request);
+            call.enqueue(new Callback() {
+                @Override
+                public void onFailure(@NotNull Call call, @NotNull IOException e) {
+                    requestException.set(e);
+                    requestCompleted.countDown();
+                }
+
+                @Override
+                public void onResponse(@NotNull Call call, @NotNull Response response) {
+                    responseStatus.set(response.code());
+                    requestCompleted.countDown();
+                }
+            });
+            requestStarted.countDown();
+        });
+
+        requestStarted.await();
+        Thread.sleep(1000);
+        processor.onPrimaryNodeChange(PrimaryNodeState.PRIMARY_NODE_REVOKED);
+        requestCompleted.await();
+
+        assertNull("HTTP Request Exception found", requestException.get());
+        assertEquals("HTTP Status not matched", HttpServletResponse.SC_SERVICE_UNAVAILABLE, responseStatus.get());
+    }
+
     @Test
     public void testSecure() throws Exception {
         secureTest(false);
@@ -610,7 +672,7 @@
         runner.enableControllerService(contextMap);
         runner.setProperty(HandleHttpRequest.HTTP_CONTEXT_MAP, "http-context-map");
 
-        final RestrictedSSLContextService sslContextService = Mockito.mock(RestrictedSSLContextService.class);
+        final RestrictedSSLContextService sslContextService = mock(RestrictedSSLContextService.class);
         final String serviceIdentifier = RestrictedSSLContextService.class.getName();
         Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
         Mockito.when(sslContextService.createContext()).thenReturn(keyStoreSslContext);
@@ -683,7 +745,7 @@
             }
 
             @Override
-            void rejectPendingRequests() {
+            void drainContainerQueue() {
                 // Skip this, otherwise it would wait to make sure there are no more requests
             }
         };
@@ -691,7 +753,7 @@
 
     private void sendRequest(HttpURLConnection connection, CountDownLatch requestSent) throws Exception {
         Future<InputStream> executionFuture = Executors.newSingleThreadExecutor()
-                .submit(() -> connection.getInputStream());
+                .submit(connection::getInputStream);
 
         requestSent.countDown();
 
@@ -701,12 +763,12 @@
     private void sendRequest(OkHttpClient client, Request request, CountDownLatch requestSent) {
         Callback callback = new Callback() {
             @Override
-            public void onFailure(Call call, IOException e) {
+            public void onFailure(@NotNull Call call, @NotNull IOException e) {
                 // We (may) get a timeout as the processor doesn't answer unless there is some kind of error
             }
 
             @Override
-            public void onResponse(Call call, Response response) throws IOException {
+            public void onResponse(@NotNull Call call, @NotNull Response response) {
                 // Not called as the processor doesn't answer unless there is some kind of error
             }
         };
@@ -747,10 +809,6 @@
             return responseMap.size();
         }
 
-        public boolean isRegisterSuccessfully() {
-            return registerSuccessfully;
-        }
-
         public void setRegisterSuccessfully(boolean registerSuccessfully) {
             this.registerSuccessfully = registerSuccessfully;
         }