NIFI-2259: HTTP Site-to-Site can't handle DEST_FULL

HTTP Site-to-Site can't handle TRANSACTION_FINISHED_BUT_DESTINATION_FULL
scenario as expected.

That happens if the remote NiFi's input port destination relationship
becomes full during Site-to-Site client sends data. The data which has
already sent to the remote NiFi has to be committed successfully.
However, the remote NiFi returns 503 as a response of commit HTTP
request. Because it does check port availability.

The port availability check shouldn't be called at commit request, since
the session at source NiFi has already been committed. The remote NiFi
should commit its session as well, and return
TRANSACTION_FINISHED_BUT_DESTINATION_FULL response.

This fix makes a remote NiFi to keep the handshaken properties when it holds
transaction to be committed. Then if a transaction already has
handshaken properties, then use it, instead of doing a handshake process
again.
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
index 08fb188..e08a3ac 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/HttpRemoteSiteListener.java
@@ -18,6 +18,7 @@
 
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.remote.protocol.FlowFileTransaction;
+import org.apache.nifi.remote.protocol.HandshakenProperties;
 import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
@@ -90,10 +91,12 @@
 
     private class TransactionWrapper {
         private final FlowFileTransaction transaction;
+        private final HandshakenProperties handshakenProperties;
         private long lastCommunicationAt;
 
-        private TransactionWrapper(final FlowFileTransaction transaction) {
+        private TransactionWrapper(final FlowFileTransaction transaction, final HandshakenProperties handshakenProperties) {
             this.transaction = transaction;
+            this.handshakenProperties = handshakenProperties;
             this.lastCommunicationAt = System.currentTimeMillis();
         }
 
@@ -166,13 +169,17 @@
 
     public String createTransaction() {
         final String transactionId = UUID.randomUUID().toString();
-        transactions.put(transactionId, new TransactionWrapper(null));
+        transactions.put(transactionId, new TransactionWrapper(null, null));
         logger.debug("Created a new transaction: {}", transactionId);
         return transactionId;
     }
 
     public boolean isTransactionActive(final String transactionId) {
         TransactionWrapper transaction = transactions.get(transactionId);
+        return isTransactionActive(transaction);
+    }
+
+    private boolean isTransactionActive(TransactionWrapper transaction) {
         if (transaction == null) {
             return false;
         }
@@ -182,7 +189,23 @@
         return true;
     }
 
-    public void holdTransaction(final String transactionId, final FlowFileTransaction transaction) throws IllegalStateException {
+    /**
+     * @param transactionId transactionId to check
+     * @return Returns a HandshakenProperties instance which is created when this transaction is started,
+     *          only if the transaction is active,
+     *          and it holds a HandshakenProperties,
+     *          otherwise return null
+     */
+    public HandshakenProperties getHandshakenProperties(final String transactionId) {
+        TransactionWrapper transaction = transactions.get(transactionId);
+        if (isTransactionActive(transaction)) {
+            return transaction.handshakenProperties;
+        }
+        return null;
+    }
+
+    public void holdTransaction(final String transactionId, final FlowFileTransaction transaction,
+                                final HandshakenProperties handshakenProperties) throws IllegalStateException {
         // We don't check expiration of the transaction here, to support large file transport or slow network.
         // The availability of current transaction is already checked when the HTTP request was received at SiteToSiteResource.
         TransactionWrapper currentTransaction = transactions.remove(transactionId);
@@ -197,7 +220,7 @@
         logger.debug("Holding a transaction: {}", transactionId);
         // Server has received or sent all data, and transaction TTL count down starts here.
         // However, if the client doesn't consume data fast enough, server might expire and rollback the transaction.
-        transactions.put(transactionId, new TransactionWrapper(transaction));
+        transactions.put(transactionId, new TransactionWrapper(transaction, handshakenProperties));
     }
 
     public FlowFileTransaction finalizeTransaction(final String transactionId) throws IllegalStateException {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
index ebbee17..660c498 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/http/StandardHttpFlowFileServerProtocol.java
@@ -34,6 +34,7 @@
 import org.apache.nifi.remote.protocol.ResponseCode;
 import org.apache.nifi.stream.io.ByteArrayInputStream;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.util.StringUtils;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -65,11 +66,21 @@
 
     @Override
     protected HandshakenProperties doHandshake(Peer peer) throws IOException, HandshakeException {
-        HandshakenProperties confirmed = new HandshakenProperties();
 
         HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
-        confirmed.setCommsIdentifier(commsSession.getTransactionId());
-        validateHandshakeRequest(confirmed, peer, commsSession.getHandshakeParams());
+        final String transactionId = commsSession.getTransactionId();
+
+        HandshakenProperties confirmed = null;
+        if (!StringUtils.isEmpty(transactionId)) {
+            // If handshake is already done, use it.
+            confirmed = transactionManager.getHandshakenProperties(transactionId);
+        }
+        if (confirmed == null) {
+            // If it's not, then do handshake.
+            confirmed = new HandshakenProperties();
+            confirmed.setCommsIdentifier(transactionId);
+            validateHandshakeRequest(confirmed, peer, commsSession.getHandshakeParams());
+        }
 
         logger.debug("{} Done handshake, confirmed={}", this, confirmed);
         return confirmed;
@@ -168,7 +179,7 @@
         HttpServerCommunicationsSession commSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
         String transactionId = commSession.getTransactionId();
         logger.debug("{} Holding transaction. transactionId={}", this, transactionId);
-        transactionManager.holdTransaction(transactionId, transaction);
+        transactionManager.holdTransaction(transactionId, transaction, handshakenProperties);
 
         return transaction.getFlowFilesSent().size();
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java
index ded2042..6ab8988 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestHttpRemoteSiteListener.java
@@ -18,6 +18,7 @@
 
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.remote.protocol.FlowFileTransaction;
+import org.apache.nifi.remote.protocol.HandshakenProperties;
 import org.apache.nifi.util.NiFiProperties;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -45,7 +46,9 @@
 
         ProcessSession processSession = Mockito.mock(ProcessSession.class);
         FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null);
-        transactionManager.holdTransaction(transactionId, transaction);
+        transactionManager.holdTransaction(transactionId, transaction, new HandshakenProperties());
+
+        assertNotNull(transactionManager.getHandshakenProperties(transactionId));
 
         transaction = transactionManager.finalizeTransaction(transactionId);
         assertNotNull(transaction);
@@ -63,10 +66,10 @@
 
         ProcessSession processSession = Mockito.mock(ProcessSession.class);
         FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null);
-        transactionManager.holdTransaction(transactionId, transaction);
+        transactionManager.holdTransaction(transactionId, transaction, null);
 
         try {
-            transactionManager.holdTransaction(transactionId, transaction);
+            transactionManager.holdTransaction(transactionId, transaction, null);
             fail("The same transaction id can't hold another transaction");
         } catch (IllegalStateException e) {
         }
@@ -83,7 +86,7 @@
         ProcessSession processSession = Mockito.mock(ProcessSession.class);
         FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null);
         try {
-            transactionManager.holdTransaction(transactionId, transaction);
+            transactionManager.holdTransaction(transactionId, transaction, null);
         } catch (IllegalStateException e) {
             fail("Transaction can be held even if the transaction id is not valid anymore," +
                     " in order to support large file or slow network.");
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
index 3034e1e..e2de588 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
@@ -316,8 +316,6 @@
 
         HttpFlowFileServerProtocol serverProtocol = getHttpFlowFileServerProtocol(versionNegotiator);
         HttpRemoteSiteListener.getInstance().setupServerProtocol(serverProtocol);
-        // TODO: How should I pass cluster information?
-        // serverProtocol.setNodeInformant(clusterManager);
         serverProtocol.handshake(peer);
         return serverProtocol;
     }
@@ -833,15 +831,6 @@
             return result;
         }
 
-        // TODO: NCM no longer exists.
-        /*
-        if (properties.isClusterManager()) {
-            result.errResponse = responseCreator.nodeTypeErrorResponse(req.getPathInfo() + " is not available on a NiFi Cluster Manager.");
-            return result;
-        }
-        */
-
-
         try {
             result.transportProtocolVersion = negotiateTransportProtocolVersion(req, transportProtocolVersionNegotiator);
         } catch (BadRequestException e) {