GEODE-8202: Two-step serial gw sender threads start (#5900)

* GEODE-8202: Two-step serial gw sender threads start
diff --git a/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
index 6be6066..7247362 100644
--- a/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
+++ b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
@@ -22,13 +22,17 @@
 import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.StringTokenizer;
+import java.util.Vector;
 
 import com.palantir.docker.compose.DockerComposeRule;
 import org.junit.BeforeClass;
@@ -71,14 +75,8 @@
  * traffic directed to the 2324 port to the receivers in a round robin fashion.
  *
  * - Another site consisting of a 1-server, 1-locator Geode cluster.
- * The server hosts a partition region (region-wan) and has a parallel gateway receiver
+ * The server hosts a partition region (region-wan) and has a gateway receiver
  * to send events to the remote site.
- *
- * The aim of the tests is verify that when several gateway receivers in a remote site
- * share the same port and hostname-for-senders, the pings sent from the gateway senders
- * reach the right gateway receiver and not just any of the receivers. Failure to do this
- * may result in the closing of connections by a gateway receiver for not having
- * received the ping in time.
  */
 @Category({WanTest.class})
 public class SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
@@ -122,6 +120,13 @@
     super();
   }
 
+  /**
+   * The aim of this test is verify that when several gateway receivers in a remote site
+   * share the same port and hostname-for-senders, the pings sent from the gateway senders
+   * reach the right gateway receiver and not just any of the receivers. Failure to do this
+   * may result in the closing of connections by a gateway receiver for not having
+   * received the ping in time.
+   */
   @Test
   public void testPingsToReceiversWithSamePortAndHostnameForSendersReachTheRightReceivers()
       throws InterruptedException {
@@ -159,6 +164,105 @@
     assertEquals(0, senderPoolDisconnects);
   }
 
+  @Test
+  public void testSerialGatewaySenderThreadsConnectToSameReceiver() {
+    String senderId = "ln";
+    String regionName = "region-wan";
+    final int remoteLocPort = 20334;
+
+    int locPort = createLocator(VM.getVM(0), 1, remoteLocPort);
+
+    VM vm1 = VM.getVM(1);
+    createCache(vm1, locPort);
+
+    createGatewaySender(vm1, senderId, 2, false, 5,
+        3, GatewaySender.DEFAULT_ORDER_POLICY);
+
+    createPartitionedRegion(vm1, regionName, senderId, 0, 10);
+
+    assertTrue(allDispatchersConnectedToSameReceiver(1));
+    assertTrue(allDispatchersConnectedToSameReceiver(2));
+
+  }
+
+  @Test
+  public void testTwoSendersWithSameIdShouldUseSameValueForEnforceThreadsConnectToSameServer() {
+    String senderId = "ln";
+    final int remoteLocPort = 20334;
+
+    int locPort = createLocator(VM.getVM(0), 1, remoteLocPort);
+
+    VM vm1 = VM.getVM(1);
+    createCache(vm1, locPort);
+
+    VM vm2 = VM.getVM(2);
+    createCache(vm2, locPort);
+
+    createGatewaySender(vm1, senderId, 2, false, 5,
+        3, GatewaySender.DEFAULT_ORDER_POLICY);
+
+    Exception exception =
+        assertThrows(Exception.class, () -> createGatewaySender(vm2, senderId, 2, false, 5,
+            3, GatewaySender.DEFAULT_ORDER_POLICY, false));
+    assertEquals(exception.getCause().getMessage(), "Cannot create Gateway Sender " + senderId
+        + " with enforceThreadsConnectSameReceiver false because another cache has the same Gateway Sender defined with enforceThreadsConnectSameReceiver true");
+
+  }
+
+  private boolean allDispatchersConnectedToSameReceiver(int server) {
+
+    String gfshOutput = runListGatewayReceiversCommandInServer(server);
+    Vector<String> sendersConnectedToServer = parseSendersConnectedFromGfshOutput(gfshOutput);
+    String firstSenderId = "";
+    for (String senderId : sendersConnectedToServer) {
+      if (firstSenderId.equals("")) {
+        firstSenderId = senderId;
+      } else {
+        assertEquals("Found two different senders (" + firstSenderId + " and " + senderId
+            + ") connected to same receiver in server " + server, firstSenderId, senderId);
+      }
+    }
+    return true;
+  }
+
+
+  private String runListGatewayReceiversCommandInServer(int serverN) {
+    String result = "";
+    try {
+      result = docker.get().exec(options("-T"), "locator",
+          arguments("gfsh", "run",
+              "--file=/geode/scripts/geode-list-gateway-receivers-server" + serverN + ".gfsh"));
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } finally {
+      return result;
+    }
+  }
+
+  private Vector<String> parseSendersConnectedFromGfshOutput(String gfshOutput) {
+    String lines[] = gfshOutput.split(System.getProperty("line.separator"));
+    final String sendersConnectedColumnHeader = "Senders Connected";
+    String receiverInfo = null;
+    for (int i = 0; i < lines.length; i++) {
+      if (lines[i].contains(sendersConnectedColumnHeader)) {
+        receiverInfo = lines[i + 2];
+        break;
+      }
+    }
+    assertNotNull(
+        "Error parsing gfsh output. '" + sendersConnectedColumnHeader + "' column header not found",
+        receiverInfo);
+    String[] tableRow = receiverInfo.split("\\|");
+    String sendersConnectedColumnValue = tableRow[3].trim();
+    Vector<String> senders = new Vector<String>();
+    for (String sender : sendersConnectedColumnValue.split(",")) {
+      senders.add(sender.trim());
+    }
+    return senders;
+  }
+
   private int createLocator(VM memberVM, int dsId, int remoteLocPort) {
     return memberVM.invoke("create locator", () -> {
       Properties props = new Properties();
@@ -182,6 +286,14 @@
       boolean isParallel, Integer batchSize,
       int numDispatchers,
       GatewaySender.OrderPolicy orderPolicy) {
+    createGatewaySender(vm, dsName, remoteDsId, isParallel, batchSize, numDispatchers, orderPolicy,
+        true);
+  }
+
+  public static void createGatewaySender(VM vm, String dsName, int remoteDsId,
+      boolean isParallel, Integer batchSize,
+      int numDispatchers,
+      GatewaySender.OrderPolicy orderPolicy, boolean enforceThreadsConnectToSameReceiver) {
     vm.invoke(() -> {
       final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
       try {
@@ -191,6 +303,7 @@
         gateway.setBatchSize(batchSize);
         gateway.setDispatcherThreads(numDispatchers);
         gateway.setOrderPolicy(orderPolicy);
+        gateway.setEnforceThreadsConnectSameReceiver(enforceThreadsConnectToSameReceiver);
         gateway.create(dsName, remoteDsId);
 
       } finally {
diff --git a/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server1.gfsh b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server1.gfsh
new file mode 100644
index 0000000..a0d61bb
--- /dev/null
+++ b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server1.gfsh
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set variable --name=APP_RESULT_VIEWER --value=200
+connect --locator=locator[20334]
+list gateways --receivers-only --member=server1
diff --git a/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server2.gfsh b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server2.gfsh
new file mode 100644
index 0000000..37a16dc
--- /dev/null
+++ b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server2.gfsh
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set variable --name=APP_RESULT_VIEWER --value=200
+connect --locator=locator[20334]
+list gateways --receivers-only --member=server2
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 65d8796..50ce9e4 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1910,10 +1910,12 @@
 fromData,225
 toData,254
 
-org/apache/geode/internal/cache/wan/GatewaySenderAdvisor$GatewaySenderProfile,4
-fromData,283
+org/apache/geode/internal/cache/wan/GatewaySenderAdvisor$GatewaySenderProfile,6
+fromData,17
+fromDataPre_GEODE_1_14_0_0,293
 fromDataPre_GFE_8_0_0_0,188
-toData,271
+toData,17
+toDataPre_GEODE_1_14_0_0,281
 toDataPre_GFE_8_0_0_0,236
 
 org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackArgument,2
diff --git a/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java b/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
index f1b119b..85fe90e 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
@@ -2656,6 +2656,8 @@
     protected String orderPolicy;
     @XmlAttribute(name = "group-transaction-events")
     protected Boolean groupTransactionEvents;
+    @XmlAttribute(name = "enforce-threads-connect-same-receiver")
+    protected Boolean enforceThreadsConnectSameReceiver;
 
     /**
      * Gets the value of the gatewayEventFilters property.
@@ -3100,6 +3102,27 @@
       this.orderPolicy = value;
     }
 
+    /**
+     * Sets the value of the enforceThreadsConnectSameReceiver property.
+     *
+     * allowed object is
+     * {@link Boolean }
+     *
+     */
+    public void setEnforceThreadsConnectSameReceiver(Boolean value) {
+      this.enforceThreadsConnectSameReceiver = value;
+    }
+
+    /**
+     * Gets the value of the enforceThreadsConnectSameReceiver property.
+     *
+     * possible object is
+     * {@link Boolean }
+     *
+     */
+    public Boolean getEnforceThreadsConnectSameReceiver() {
+      return this.enforceThreadsConnectSameReceiver;
+    }
   }
 
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
index ffacf4b..5e0e9f1 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
@@ -147,6 +147,8 @@
 
   boolean DEFAULT_IS_FOR_INTERNAL_USE = false;
 
+  boolean DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER = false;
+
   /**
    * Retry a connection from sender to receiver after specified time interval (in milliseconds) in
    * case receiver is not up and running. Default is set to 1000 milliseconds i.e. 1 second.
@@ -449,4 +451,12 @@
    *
    */
   void destroy();
+
+  /**
+   * Returns enforceThreadsConnectSameReceiver boolean property for this GatewaySender.
+   *
+   * @return enforceThreadsConnectSameReceiver boolean property for this GatewaySender
+   *
+   */
+  boolean getEnforceThreadsConnectSameReceiver();
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java
index 7c99214..6c9e92b 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java
@@ -191,6 +191,19 @@
   GatewaySenderFactory setGatewayEventSubstitutionFilter(GatewayEventSubstitutionFilter filter);
 
   /**
+   * If true, receiver member id is checked by all dispatcher threads when the connection is
+   * established to ensure they connect to the same receiver. Instead of starting all dispatcher
+   * threads in parallel, one thread is started first, and after that the rest are started in
+   * parallel. Default is false.
+   *
+   * @param enforceThreadsConnectSameReceiver boolean if true threads will verify if they are
+   *        connected to the same receiver
+   *
+   */
+  GatewaySenderFactory setEnforceThreadsConnectSameReceiver(
+      boolean enforceThreadsConnectSameReceiver);
+
+  /**
    * Creates a <code>GatewaySender</code> to communicate with remote distributed system
    *
    * @param id unique id for this SerialGatewaySender
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 7f6c8a1..4ea2c6d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -171,6 +171,8 @@
 
   private ServerLocation serverLocation;
 
+  private String expectedReceiverUniqueId = "";
+
   protected Object queuedEventsSync = new Object();
 
   protected volatile boolean enqueuedAllTempQueueEvents = false;
@@ -237,6 +239,8 @@
 
   private final StatisticsClock statisticsClock;
 
+  protected boolean enforceThreadsConnectSameReceiver;
+
   protected AbstractGatewaySender() {
     statisticsClock = disabledClock();
   }
@@ -275,6 +279,7 @@
     this.maxMemoryPerDispatcherQueue = this.queueMemory / this.dispatcherThreads;
     this.serialNumber = DistributionAdvisor.createSerialNumber();
     this.isMetaQueue = attrs.isMetaQueue();
+    this.enforceThreadsConnectSameReceiver = attrs.getEnforceThreadsConnectSameReceiver();
     if (!(this.cache instanceof CacheCreation)) {
       this.myDSId = this.cache.getInternalDistributedSystem().getDistributionManager()
           .getDistributedSystemId();
@@ -500,6 +505,11 @@
   }
 
   @Override
+  public boolean getEnforceThreadsConnectSameReceiver() {
+    return this.enforceThreadsConnectSameReceiver;
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (obj == null) {
       return false;
@@ -1429,6 +1439,14 @@
     }
   }
 
+  public void setExpectedReceiverUniqueId(String expectedReceiverUniqueId) {
+    this.expectedReceiverUniqueId = expectedReceiverUniqueId;
+  }
+
+  public String getExpectedReceiverUniqueId() {
+    return this.expectedReceiverUniqueId;
+  }
+
   /**
    * Has a reference to a GatewayEventImpl and has a timeout value.
    */
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 0609ec9..294121a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -154,6 +154,14 @@
     this.threadMonitoring = tMonitoring;
   }
 
+  public void setExpectedReceiverUniqueId(String uniqueId) {
+    this.sender.setExpectedReceiverUniqueId(uniqueId);
+  }
+
+  public String getExpectedReceiverUniqueId() {
+    return this.sender.getExpectedReceiverUniqueId();
+  }
+
   public Object getRunningStateLock() {
     return runningStateLock;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
index adf80cb..6af0866 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
@@ -232,6 +232,15 @@
               "Cannot create Gateway Sender %s with isDiskSynchronous %s because another cache has the same Gateway Sender defined with isDiskSynchronous %s",
               sp.Id, sp.isDiskSynchronous, sender.isDiskSynchronous()));
     }
+    if (sp.getDistributedMember().getVersion().isNotOlderThan(KnownVersion.GEODE_1_14_0)) {
+      if (sp.enforceThreadsConnectSameReceiver != sender.getEnforceThreadsConnectSameReceiver()) {
+        throw new IllegalStateException(
+            String.format(
+                "Cannot create Gateway Sender %s with enforceThreadsConnectSameReceiver %s because another cache has the same Gateway Sender defined with enforceThreadsConnectSameReceiver %s",
+                sp.Id, sp.enforceThreadsConnectSameReceiver,
+                sender.getEnforceThreadsConnectSameReceiver()));
+      }
+    }
   }
 
   /**
@@ -532,6 +541,8 @@
 
     public ServerLocation serverLocation;
 
+    public boolean enforceThreadsConnectSameReceiver = false;
+
     public GatewaySenderProfile(InternalDistributedMember memberId, int version) {
       super(memberId, version);
     }
@@ -541,6 +552,12 @@
     @Override
     public void fromData(DataInput in,
         DeserializationContext context) throws IOException, ClassNotFoundException {
+      fromDataPre_GEODE_1_14_0_0(in, context);
+      this.enforceThreadsConnectSameReceiver = in.readBoolean();
+    }
+
+    public void fromDataPre_GEODE_1_14_0_0(DataInput in,
+        DeserializationContext context) throws IOException, ClassNotFoundException {
       super.fromData(in, context);
       this.Id = DataSerializer.readString(in);
       this.startTime = in.readLong();
@@ -578,11 +595,18 @@
         this.serverLocation = new ServerLocation();
         InternalDataSerializer.invokeFromData(this.serverLocation, in);
       }
+      this.enforceThreadsConnectSameReceiver = in.readBoolean();
     }
 
     @Override
     public void toData(DataOutput out,
         SerializationContext context) throws IOException {
+      toDataPre_GEODE_1_14_0_0(out, context);
+      out.writeBoolean(enforceThreadsConnectSameReceiver);
+    }
+
+    public void toDataPre_GEODE_1_14_0_0(DataOutput out,
+        SerializationContext context) throws IOException {
       super.toData(out, context);
       DataSerializer.writeString(Id, out);
       out.writeLong(startTime);
@@ -617,6 +641,7 @@
       if (serverLocationFound) {
         InternalDataSerializer.invokeToData(serverLocation, out);
       }
+      out.writeBoolean(enforceThreadsConnectSameReceiver);
     }
 
     public void fromDataPre_GFE_8_0_0_0(DataInput in, DeserializationContext context)
@@ -684,7 +709,7 @@
 
     @Immutable
     private static final KnownVersion[] serializationVersions =
-        new KnownVersion[] {KnownVersion.GFE_80};
+        new KnownVersion[] {KnownVersion.GFE_80, KnownVersion.GEODE_1_14_0};
 
     @Override
     public KnownVersion[] getSerializationVersions() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java
index 1457776..581b576 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java
@@ -85,6 +85,9 @@
 
   public boolean forwardExpirationDestroy = GatewaySender.DEFAULT_FORWARD_EXPIRATION_DESTROY;
 
+  public boolean enforceThreadsConnectSameReceiver =
+      GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER;
+
   public int getSocketBufferSize() {
     return this.socketBufferSize;
   }
@@ -205,4 +208,7 @@
     return this.forwardExpirationDestroy;
   }
 
+  public boolean getEnforceThreadsConnectSameReceiver() {
+    return this.enforceThreadsConnectSameReceiver;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index 06f74ae..7adf996 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -179,12 +179,27 @@
 
   @Override
   public void run() {
-    for (int i = 0; i < this.processors.size(); i++) {
-      if (logger.isDebugEnabled()) {
+    boolean isDebugEnabled = logger.isDebugEnabled();
+    if (this.sender.getEnforceThreadsConnectSameReceiver()) {
+      this.processors.get(0).start();
+      waitForRunningStatus(this.processors.get(0));
+      String receiverUniqueId = this.processors.get(0).getExpectedReceiverUniqueId();
+      if (isDebugEnabled) {
+        logger.debug("First dispatcher is connected to " + receiverUniqueId);
+      }
+      for (int j = 1; j < this.processors.size(); j++) {
+        this.processors.get(j).setExpectedReceiverUniqueId(receiverUniqueId);
+      }
+    }
+
+    for (int i = this.sender.getEnforceThreadsConnectSameReceiver() ? 1 : 0; i < this.processors
+        .size(); i++) {
+      if (isDebugEnabled) {
         logger.debug("Starting the serialProcessor {}", i);
       }
       this.processors.get(i).start();
     }
+
     try {
       waitForRunningStatus();
     } catch (GatewaySenderException e) {
@@ -205,7 +220,7 @@
       try {
         serialProcessor.join();
       } catch (InterruptedException e) {
-        if (logger.isDebugEnabled()) {
+        if (isDebugEnabled) {
           logger.debug("Got InterruptedException while waiting for child threads to finish.");
           Thread.currentThread().interrupt();
         }
@@ -219,24 +234,28 @@
     throw new UnsupportedOperationException();
   }
 
-  private void waitForRunningStatus() {
-    for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
-      synchronized (serialProcessor.getRunningStateLock()) {
-        while (serialProcessor.getException() == null && serialProcessor.isStopped()) {
-          try {
-            serialProcessor.getRunningStateLock().wait();
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-          }
-        }
-        Exception ex = serialProcessor.getException();
-        if (ex != null) {
-          throw new GatewaySenderException(
-              String.format("Could not start a gateway sender %s because of exception %s",
-                  new Object[] {this.sender.getId(), ex.getMessage()}),
-              ex.getCause());
+  private void waitForRunningStatus(SerialGatewaySenderEventProcessor serialProcessor) {
+    synchronized (serialProcessor.getRunningStateLock()) {
+      while (serialProcessor.getException() == null && serialProcessor.isStopped()) {
+        try {
+          serialProcessor.getRunningStateLock().wait();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
         }
       }
+      Exception ex = serialProcessor.getException();
+      if (ex != null) {
+        throw new GatewaySenderException(
+            String.format("Could not start a gateway sender %s because of exception %s",
+                new Object[] {this.sender.getId(), ex.getMessage()}),
+            ex.getCause());
+      }
+    }
+  }
+
+  private void waitForRunningStatus() {
+    for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
+      waitForRunningStatus(serialProcessor);
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java
index 1ac84a4..9970c55 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java
@@ -433,8 +433,11 @@
   protected static final String ORDER_POLICY = "order-policy";
   /** The name of the <code>remote-distributed-system</code> attribute */
   protected static final String REMOTE_DISTRIBUTED_SYSTEM_ID = "remote-distributed-system-id";
+  /** The name of the <code>group-transaction-events</code> attribute */
   protected static final String GROUP_TRANSACTION_EVENTS = "group-transaction-events";
-
+  /** The name of the <code>enforce-threads-connect-same-receiver</code> attribute */
+  protected static final String ENFORCE_THREADS_CONNECT_SAME_RECEIVER =
+      "enforce-threads-connect-same-receiver";
 
   /** The name of the <code>bind-address</code> attribute */
   protected static final String BIND_ADDRESS = "bind-address";
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java
index ea19b88..04ac07e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java
@@ -1394,6 +1394,16 @@
       }
     }
 
+    // enforce-threads-connect-same-receiver
+    if (version.compareTo(CacheXmlVersion.GEODE_1_0) >= 0) {
+      if (generateDefaults()
+          || sender
+              .getEnforceThreadsConnectSameReceiver() != GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER) {
+        atts.addAttribute("", "", ENFORCE_THREADS_CONNECT_SAME_RECEIVER, "",
+            String.valueOf(sender.getEnforceThreadsConnectSameReceiver()));
+      }
+    }
+
     handler.startElement("", GATEWAY_SENDER, GATEWAY_SENDER, atts);
 
     for (GatewayEventFilter gef : sender.getGatewayEventFilters()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java
index c775086..ce4d211 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java
@@ -707,6 +707,17 @@
       gatewaySenderFactory
           .setGroupTransactionEvents(Boolean.parseBoolean(groupTransactionEvents));
     }
+
+    String enforceThreadsConnectSameReceiver = atts.getValue(ENFORCE_THREADS_CONNECT_SAME_RECEIVER);
+    if (enforceThreadsConnectSameReceiver == null) {
+      gatewaySenderFactory
+          .setEnforceThreadsConnectSameReceiver(
+              GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER);
+    } else {
+      gatewaySenderFactory
+          .setEnforceThreadsConnectSameReceiver(
+              Boolean.parseBoolean(enforceThreadsConnectSameReceiver));
+    }
   }
 
   private void startGatewayReceiver(Attributes atts) {
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
index 4ce4416..3b08621 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
@@ -2267,6 +2267,10 @@
       "GatewaySender \"{0}\" created on \"{1}\"";
   public static final String CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS =
       "Gateway Sender cannot be created until all members are the current version";
+  public static final String CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER =
+      "enforce-threads-connect-same-receiver";
+  public static final String CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER__HELP =
+      "Whether or not the sender threads have to verify the receiver member id to verify if they are connected to the same server.";
 
   /* start gateway-sender */
   public static final String START_GATEWAYSENDER = "start gateway-sender";
diff --git a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
index 53b57f2..db6841a 100755
--- a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
+++ b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
@@ -203,6 +203,7 @@
             <xsd:attribute name="dispatcher-threads" type="xsd:string" use="optional" />
             <xsd:attribute name="order-policy" type="xsd:string" use="optional" />
             <xsd:attribute name="group-transaction-events" type="xsd:boolean" use="optional" />
+            <xsd:attribute name="enforce-threads-connect-same-receiver" type="xsd:boolean" use="optional" />
           </xsd:complexType>
         </xsd:element>
 
diff --git a/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb b/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb
index 30b4da1..11b4192 100644
--- a/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb
+++ b/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb
@@ -638,7 +638,12 @@
 <p>Only allowed to be set on gateway senders with the <code class="ph codeph">parallel</code> attribute set to false and <code class="ph codeph">dispatcher-threads</code> attribute equal to 1, or on gateway senders with the <code class="ph codeph">parallel</code> attribute set to true. Also, the  <code class="ph codeph">enable-batch-conflation</code> attribute of the gateway sender must be set to false.</p>
 <p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p>
 <p><b>Note:</b> If the above condition is not fulfilled or under very high load traffic conditions, it may not be guaranteed that all the events for a transaction will be sent in the same batch, even if <code class="ph codeph">group-transaction-events</code> is enabled. The number of batches sent with incomplete transactions can be retrieved from the <code class="ph codeph">GatewaySenderMXBean</code> bean.</p></td>
+<td>false</td>
 </td>
+</tr>
+<tr>
+<td><span class="keyword parmname">\-\-enforce-threads-connect-same-receiver</span></td>
+<td>This parameter applies only to serial gateway senders. If true, receiver member id is checked by all dispatcher threads when the connection is established to ensure they connect to the same receiver. Instead of starting all dispatcher threads in parallel, one thread is started first, and after that the rest are started in parallel.</td>
 <td>false</td>
 </tr>
 </tbody>
diff --git a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
index a6b40b9..474153b 100644
--- a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
+++ b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
@@ -137,14 +137,20 @@
           help = CliStrings.CREATE_GATEWAYSENDER__GATEWAYEVENTFILTER__HELP) String[] gatewayEventFilters,
 
       @CliOption(key = CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER,
-          help = CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER__HELP) String[] gatewayTransportFilter) {
+          help = CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER__HELP) String[] gatewayTransportFilter,
+
+      @CliOption(key = CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER,
+          specifiedDefaultValue = "true",
+          unspecifiedDefaultValue = "false",
+          help = CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER__HELP) Boolean enforceThreadsConnectSameReceiver) {
 
     CacheConfig.GatewaySender configuration =
         buildConfiguration(id, remoteDistributedSystemId, parallel, manualStart,
             socketBufferSize, socketReadTimeout, enableBatchConflation, batchSize,
             batchTimeInterval, enablePersistence, diskStoreName, diskSynchronous, maxQueueMemory,
             alertThreshold, dispatcherThreads, orderPolicy == null ? null : orderPolicy.name(),
-            gatewayEventFilters, gatewayTransportFilter, groupTransactionEvents);
+            gatewayEventFilters, gatewayTransportFilter, groupTransactionEvents,
+            enforceThreadsConnectSameReceiver);
 
     GatewaySenderFunctionArgs gatewaySenderFunctionArgs =
         new GatewaySenderFunctionArgs(configuration);
@@ -228,7 +234,8 @@
       String orderPolicy,
       String[] gatewayEventFilters,
       String[] gatewayTransportFilters,
-      Boolean groupTransactionEvents) {
+      Boolean groupTransactionEvents,
+      Boolean enforceThreadsConnectSameReceiver) {
     CacheConfig.GatewaySender sender = new CacheConfig.GatewaySender();
     sender.setId(id);
     sender.setRemoteDistributedSystemId(int2string(remoteDSId));
@@ -253,7 +260,7 @@
     if (gatewayTransportFilters != null) {
       sender.getGatewayTransportFilters().addAll(stringsToDeclarableTypes(gatewayTransportFilters));
     }
-
+    sender.setEnforceThreadsConnectSameReceiver(enforceThreadsConnectSameReceiver);
     return sender;
   }
 
@@ -284,6 +291,10 @@
       Boolean batchConflationEnabled =
           (Boolean) parseResult
               .getParamValue(CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION);
+      Boolean enforceThreadsConnectSameReceiver =
+          (Boolean) parseResult
+              .getParamValue(
+                  CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER);
 
       if (dispatcherThreads != null && dispatcherThreads > 1 && orderPolicy == null) {
         return ResultModel.createError(
@@ -306,6 +317,14 @@
             "Gateway Sender cannot be created with both --group-transaction-events and --enable-batch-conflation.");
       }
 
+      if (parallel && enforceThreadsConnectSameReceiver) {
+        return ResultModel
+            .createError(
+                "Option --" + CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER
+                    + " only applies to serial gateway senders.");
+
+      }
+
       return ResultModel.createInfo("");
     }
   }
diff --git a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java
index 00efaa5..9d7e75f 100644
--- a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java
+++ b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java
@@ -173,6 +173,13 @@
             gatewayTransportFilterKlass, CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER));
       }
     }
+
+    Boolean enforceThreadsConnectSameReceiver =
+        gatewaySenderCreateArgs.getEnforceThreadsConnectSameReceiver();
+    if (enforceThreadsConnectSameReceiver != null) {
+      gateway.setEnforceThreadsConnectSameReceiver(enforceThreadsConnectSameReceiver);
+    }
+
     return gateway.create(gatewaySenderCreateArgs.getId(),
         gatewaySenderCreateArgs.getRemoteDistributedSystemId());
   }
diff --git a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs.java b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs.java
index 2b08ef9..dd70bda 100644
--- a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs.java
+++ b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs.java
@@ -46,6 +46,7 @@
   // array of fully qualified class names of the filters
   private final List<String> gatewayEventFilters;
   private final List<String> gatewayTransportFilters;
+  private final Boolean enforceThreadsConnectSameReceiver;
 
   public GatewaySenderFunctionArgs(CacheConfig.GatewaySender sender) {
     this.id = sender.getId();
@@ -77,6 +78,7 @@
                 .stream().map(DeclarableType::getClassName)
                 .collect(Collectors.toList()))
             .orElse(null);
+    this.enforceThreadsConnectSameReceiver = sender.getEnforceThreadsConnectSameReceiver();
   }
 
   private Integer string2int(String x) {
@@ -158,4 +160,8 @@
   public List<String> getGatewayTransportFilter() {
     return this.gatewayTransportFilters;
   }
+
+  public Boolean getEnforceThreadsConnectSameReceiver() {
+    return this.enforceThreadsConnectSameReceiver;
+  }
 }
diff --git a/geode-gfsh/src/main/resources/org/apache/geode/management/internal/sanctioned-geode-gfsh-serializables.txt b/geode-gfsh/src/main/resources/org/apache/geode/management/internal/sanctioned-geode-gfsh-serializables.txt
index c60d599..b368b1f 100644
--- a/geode-gfsh/src/main/resources/org/apache/geode/management/internal/sanctioned-geode-gfsh-serializables.txt
+++ b/geode-gfsh/src/main/resources/org/apache/geode/management/internal/sanctioned-geode-gfsh-serializables.txt
@@ -65,7 +65,7 @@
 org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction,true,8746830191680509335
 org/apache/geode/management/internal/cli/functions/GatewaySenderDestroyFunction,true,1
 org/apache/geode/management/internal/cli/functions/GatewaySenderDestroyFunctionArgs,true,3848480256348119530,id:java/lang/String,ifExists:boolean
-org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs,true,4636678328980816780,alertThreshold:java/lang/Integer,batchSize:java/lang/Integer,batchTimeInterval:java/lang/Integer,diskStoreName:java/lang/String,diskSynchronous:java/lang/Boolean,dispatcherThreads:java/lang/Integer,enableBatchConflation:java/lang/Boolean,enablePersistence:java/lang/Boolean,gatewayEventFilters:java/util/List,gatewayTransportFilters:java/util/List,groupTransactionEvents:java/lang/Boolean,id:java/lang/String,manualStart:java/lang/Boolean,maxQueueMemory:java/lang/Integer,orderPolicy:java/lang/String,parallel:java/lang/Boolean,remoteDSId:java/lang/Integer,socketBufferSize:java/lang/Integer,socketReadTimeout:java/lang/Integer
+org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs,true,4636678328980816780,alertThreshold:java/lang/Integer,batchSize:java/lang/Integer,batchTimeInterval:java/lang/Integer,diskStoreName:java/lang/String,diskSynchronous:java/lang/Boolean,dispatcherThreads:java/lang/Integer,enableBatchConflation:java/lang/Boolean,enablePersistence:java/lang/Boolean,gatewayEventFilters:java/util/List,gatewayTransportFilters:java/util/List,groupTransactionEvents:java/lang/Boolean,id:java/lang/String,manualStart:java/lang/Boolean,maxQueueMemory:java/lang/Integer,orderPolicy:java/lang/String,parallel:java/lang/Boolean,remoteDSId:java/lang/Integer,socketBufferSize:java/lang/Integer,socketReadTimeout:java/lang/Integer,enforceThreadsConnectSameReceiver:java/lang/Boolean
 org/apache/geode/management/internal/cli/functions/GetMemberConfigInformationFunction,true,1
 org/apache/geode/management/internal/cli/functions/GetRegionDescriptionFunction,true,1
 org/apache/geode/management/internal/cli/functions/GetRegionsFunction,true,1
diff --git a/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java b/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
index 2784ba5..585158d 100644
--- a/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
+++ b/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
@@ -275,7 +275,7 @@
     assertThat(argsArgumentCaptor.getValue().getGatewayEventFilter()).isNotNull().isEmpty();
     assertThat(argsArgumentCaptor.getValue().getGatewayTransportFilter()).isNotNull().isEmpty();
     assertThat(argsArgumentCaptor.getValue().mustGroupTransactionEvents()).isNotNull();
-
+    assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isFalse();
   }
 
   @Test
@@ -347,4 +347,70 @@
     assertThat(argsArgumentCaptor.getValue().mustGroupTransactionEvents()).isFalse();
 
   }
+
+  @Test
+  public void testEnforceThreadsConnectSameReceiverCannotBeUsedForParallelSenders() {
+    gfsh.executeAndAssertThat(command,
+        "create gateway-sender --id=1 --remote-distributed-system-id=1 --parallel --enforce-threads-connect-same-receiver")
+        .statusIsError()
+        .containsOutput(
+            "Option --" + CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER
+                + " only applies to serial gateway senders.");
+  }
+
+  @Test
+  public void testEnforceThreadsConnectSameReceiverIsTrueWhenUsedWithoutValue() {
+    doReturn(mock(Set.class)).when(command).getMembers(any(), any());
+    cliFunctionResult =
+        new CliFunctionResult("member", CliFunctionResult.StatusState.OK, "cliFunctionResult");
+    functionResults.add(cliFunctionResult);
+    gfsh.executeAndAssertThat(command,
+        "create gateway-sender --id=1 --remote-distributed-system-id=1 --enforce-threads-connect-same-receiver")
+        .statusIsSuccess();
+    verify(command).executeAndGetFunctionResult(any(), argsArgumentCaptor.capture(), any());
+
+    assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isTrue();
+  }
+
+  @Test
+  public void testEnforceThreadsConnectSameReceiverIsFalseWhenSetToFalse() {
+    doReturn(mock(Set.class)).when(command).getMembers(any(), any());
+    cliFunctionResult =
+        new CliFunctionResult("member", CliFunctionResult.StatusState.OK, "cliFunctionResult");
+    functionResults.add(cliFunctionResult);
+    gfsh.executeAndAssertThat(command,
+        "create gateway-sender --id=1 --remote-distributed-system-id=1 --enforce-threads-connect-same-receiver=false")
+        .statusIsSuccess();
+    verify(command).executeAndGetFunctionResult(any(), argsArgumentCaptor.capture(), any());
+
+    assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isFalse();
+  }
+
+  @Test
+  public void testEnforceThreadsConnectSameReceiverIsTrueWhenSetToTrue() {
+    doReturn(mock(Set.class)).when(command).getMembers(any(), any());
+    cliFunctionResult =
+        new CliFunctionResult("member", CliFunctionResult.StatusState.OK, "cliFunctionResult");
+    functionResults.add(cliFunctionResult);
+    gfsh.executeAndAssertThat(command,
+        "create gateway-sender --id=1 --remote-distributed-system-id=1 --enforce-threads-connect-same-receiver=true")
+        .statusIsSuccess();
+    verify(command).executeAndGetFunctionResult(any(), argsArgumentCaptor.capture(), any());
+
+    assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isTrue();
+  }
+
+  @Test
+  public void testEnforceThreadsConnectSameReceiverIsFalseByDefault() {
+    doReturn(mock(Set.class)).when(command).getMembers(any(), any());
+    cliFunctionResult =
+        new CliFunctionResult("member", CliFunctionResult.StatusState.OK, "cliFunctionResult");
+    functionResults.add(cliFunctionResult);
+    gfsh.executeAndAssertThat(command,
+        "create gateway-sender --id=1 --remote-distributed-system-id=1")
+        .statusIsSuccess();
+    verify(command).executeAndGetFunctionResult(any(), argsArgumentCaptor.capture(), any());
+
+    assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isFalse();
+  }
 }
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java
index 734ec8e..854329e 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java
@@ -112,7 +112,7 @@
       String xml = locator.getConfigurationPersistenceService().getConfiguration("cluster")
           .getCacheXmlContent();
       assertThat(xml).contains(
-          "<gateway-sender id=\"ln\" remote-distributed-system-id=\"2\" parallel=\"false\" manual-start=\"false\" enable-batch-conflation=\"false\" enable-persistence=\"false\" disk-synchronous=\"true\" group-transaction-events=\"false\"/>");
+          "<gateway-sender id=\"ln\" remote-distributed-system-id=\"2\" parallel=\"false\" manual-start=\"false\" enable-batch-conflation=\"false\" enable-persistence=\"false\" disk-synchronous=\"true\" group-transaction-events=\"false\" enforce-threads-connect-same-receiver=\"false\"/>");
     });
 
     // destroy gateway sender and verify AEQs cleaned up
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 1199fb9..4b7e330 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -18,8 +18,10 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.Vector;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
+import java.util.regex.Pattern;
 
 import org.apache.logging.log4j.Logger;
 
@@ -63,6 +65,9 @@
 
   private ReentrantReadWriteLock connectionLifeCycleLock = new ReentrantReadWriteLock();
 
+  protected static final String maxAttemptsReachedConnectingServerIdExceptionMessage =
+      "Reached max attempts number trying to connect to desired server id";
+
   /*
    * Called after each attempt at processing an outbound (dispatch) or inbound (ack)
    * message, whether the attempt is successful or not. The purpose is testability.
@@ -86,7 +91,6 @@
   public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor eventProcessor) {
     this.processor = eventProcessor;
     this.sender = eventProcessor.getSender();
-    // this.ackReaderThread = new AckReaderThread(sender);
     try {
       initializeConnection();
     } catch (GatewaySenderException e) {
@@ -362,11 +366,70 @@
     }
   }
 
+  Connection retryInitializeConnection(Connection con) {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    String connectedServerId = con.getEndpoint().getMemberId().getUniqueId();
+    String expectedServerId = this.processor.getExpectedReceiverUniqueId();
+
+    if (expectedServerId.equals("")) {
+      if (isDebugEnabled) {
+        logger.debug("First dispatcher connected to server " + connectedServerId);
+      }
+      this.processor.setExpectedReceiverUniqueId(connectedServerId);
+      return con;
+    }
+
+    int attempt = 0;
+    final int attemptsPerServer = 5;
+    int maxAttempts = attemptsPerServer;
+    Vector<String> notExpectedServerIds = new Vector<String>();
+    boolean connectedToExpectedReceiver = connectedServerId.equals(expectedServerId);
+    while (!connectedToExpectedReceiver) {
+
+      if (isDebugEnabled) {
+        logger.debug("Dispatcher wants to connect to [" + expectedServerId
+            + "] but got connection to [" + connectedServerId + "]");
+      }
+      attempt++;
+      if (!notExpectedServerIds.contains(connectedServerId)) {
+        if (isDebugEnabled) {
+          logger.debug(
+              "Increasing dispatcher connection max retries number due to connection to unknown server ("
+                  + connectedServerId + ")");
+        }
+        notExpectedServerIds.add(connectedServerId);
+        maxAttempts += attemptsPerServer;
+      }
+
+      if (attempt >= maxAttempts) {
+        throw new ServerConnectivityException(maxAttemptsReachedConnectingServerIdExceptionMessage
+            + " [" + expectedServerId + "] (" + maxAttempts + " attempts).");
+      }
+
+      con.destroy();
+      this.sender.getProxy().returnConnection(con);
+      con = this.sender.getProxy().acquireConnection();
+
+      connectedServerId = con.getEndpoint().getMemberId().getUniqueId();
+      if (connectedServerId.equals(expectedServerId)) {
+        connectedToExpectedReceiver = true;
+      }
+    }
+
+    if (isDebugEnabled) {
+      logger.debug("Dispatcher connected to expected endpoint " + connectedServerId
+          + " after " + attempt + " retries.");
+    }
+    return con;
+  }
+
   /**
    * Initializes the <code>Connection</code>.
    *
    */
+  @VisibleForTesting
   void initializeConnection() throws GatewaySenderException, GemFireSecurityException {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
     if (ackReaderThread != null) {
       ackReaderThread.shutDownAckReaderConnection(connection);
     }
@@ -397,26 +460,24 @@
           synchronized (this.sender.getLockForConcurrentDispatcher()) {
             ServerLocation server = this.sender.getServerLocation();
             if (server != null) {
-              if (logger.isDebugEnabled()) {
+              if (isDebugEnabled) {
                 logger.debug("ServerLocation is: {}. Connecting to this serverLocation...", server);
               }
               con = this.sender.getProxy().acquireConnection(server);
             } else {
-              if (logger.isDebugEnabled()) {
+              if (isDebugEnabled) {
                 logger.debug("ServerLocation is null. Creating new connection. ");
               }
               con = this.sender.getProxy().acquireConnection();
-              // Acquired connection from pool!! Update the server location
-              // information in the sender and
-              // distribute the information to other senders ONLY IF THIS SENDER
-              // IS
-              // PRIMARY
-              if (this.sender.isPrimary()) {
-                if (sender.getServerLocation() == null) {
-                  sender.setServerLocation(con.getServer());
-                }
-                new UpdateAttributesProcessor(this.sender).distribute(false);
+            }
+            if (this.sender.getEnforceThreadsConnectSameReceiver()) {
+              con = retryInitializeConnection(con);
+            }
+            if (this.sender.isPrimary()) {
+              if (sender.getServerLocation() == null) {
+                sender.setServerLocation(con.getServer());
               }
+              new UpdateAttributesProcessor(this.sender).distribute(false);
             }
           }
         }
@@ -486,6 +547,12 @@
                 "No available connection was found, but the following active servers exist: %s",
                 buffer.toString());
       }
+      if (this.sender.getEnforceThreadsConnectSameReceiver() && e.getMessage() != null) {
+        if (Pattern.compile(maxAttemptsReachedConnectingServerIdExceptionMessage + ".*")
+            .matcher(e.getMessage()).find()) {
+          ioMsg += " " + e.getMessage();
+        }
+      }
       IOException ex = new IOException(ioMsg);
       gse = new GatewaySenderException(
           String.format("%s : Could not connect due to: %s",
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
index 2a7cfd7..c0d2051 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
@@ -199,6 +199,13 @@
   }
 
   @Override
+  public GatewaySenderFactory setEnforceThreadsConnectSameReceiver(
+      boolean enforceThreadsConnectSameReceiver) {
+    this.attrs.enforceThreadsConnectSameReceiver = enforceThreadsConnectSameReceiver;
+    return this;
+  }
+
+  @Override
   public GatewaySender create(String id, int remoteDSId) {
     int myDSId = InternalDistributedSystem.getAnyInstance().getDistributionManager()
         .getDistributedSystemId();
@@ -291,7 +298,6 @@
       if (this.cache instanceof GemFireCacheImpl) {
         sender = new SerialGatewaySenderImpl(cache, statisticsClock, attrs);
         this.cache.addGatewaySender(sender);
-
         if (!this.attrs.isManualStart()) {
           sender.start();
         }
@@ -394,5 +400,7 @@
     }
     this.attrs.eventSubstitutionFilter = senderCreation.getGatewayEventSubstitutionFilter();
     this.attrs.groupTransactionEvents = senderCreation.mustGroupTransactionEvents();
+    this.attrs.enforceThreadsConnectSameReceiver =
+        senderCreation.getEnforceThreadsConnectSameReceiver();
   }
 }
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
index f18ce81..ef3e599 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
@@ -35,7 +35,7 @@
   @Override
   public void initializeEventDispatcher() {
     if (logger.isDebugEnabled()) {
-      logger.debug(" Creating the GatewayEventRemoteDispatcher");
+      logger.debug("Creating the GatewayEventRemoteDispatcher");
     }
     // In case of serial there is a way to create gatewaySender and attach
     // asyncEventListener. Not sure of the use-case but there are dunit tests
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
index 3474b4a..97436a2 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -228,6 +228,7 @@
     pf.dispatcherThreads = getDispatcherThreads();
     pf.orderPolicy = getOrderPolicy();
     pf.serverLocation = this.getServerLocation();
+    pf.enforceThreadsConnectSameReceiver = getEnforceThreadsConnectSameReceiver();
   }
 
   @Override
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
index 7cfe1f5..8b35ab1 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
@@ -14,9 +14,11 @@
  */
 package org.apache.geode.internal.cache.wan;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -24,11 +26,64 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
 import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.Endpoint;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.distributed.internal.ServerLocationAndMemberId;
+import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
 
 public class GatewaySenderEventRemoteDispatcherJUnitTest {
+
+  @Mock
+  private AbstractGatewaySender senderMock;
+
+  @Mock
+  private AbstractGatewaySenderEventProcessor eventProcessorMock;
+
+  @InjectMocks
+  private GatewaySenderEventRemoteDispatcher eventDispatcher;
+
+  @Mock
+  private PoolImpl poolMock;
+
+  @Mock
+  private Connection connectionMock;
+
+  @Mock
+  private ServerQueueStatus serverQueueStatusMock;
+
+  @Mock
+  private Endpoint endpointMock;
+
+  @Mock
+  private DistributedMember memberIdMock;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    when(eventProcessorMock.getSender()).thenReturn(senderMock);
+
+    when(senderMock.isParallel()).thenReturn(false);
+    when(senderMock.getLockForConcurrentDispatcher()).thenReturn(new Object());
+    when(senderMock.getProxy()).thenReturn(poolMock);
+
+    when(poolMock.isDestroyed()).thenReturn(false);
+    when(poolMock.acquireConnection()).thenReturn(connectionMock);
+
+    when(connectionMock.getQueueStatus()).thenReturn(serverQueueStatusMock);
+  }
+
   @Test
   public void getConnectionShouldShutdownTheAckThreadReaderWhenEventProcessorIsShutDown() {
     AbstractGatewaySender sender = mock(AbstractGatewaySender.class);
@@ -46,7 +101,7 @@
   }
 
   @Test
-  public void shuttingDownAckThreadReaderConnectionShouldshutdownTheAckThreadReader() {
+  public void shuttingDownAckThreadReaderConnectionShouldShutdownTheAckThreadReader() {
     AbstractGatewaySender sender = mock(AbstractGatewaySender.class);
     AbstractGatewaySenderEventProcessor eventProcessor =
         mock(AbstractGatewaySenderEventProcessor.class);
@@ -77,4 +132,161 @@
     verify(dispatcher, times(1)).initializeConnection();
     verify(dispatcher, times(2)).getConnectionLifeCycleLock();
   }
+
+  @Test
+  public void initializeConnectionWithParallelSenderDoesNotRetryInitializeConnection() {
+    when(senderMock.isParallel()).thenReturn(true);
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(0)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).setServerLocation(any());
+    verify(poolMock, times(1)).acquireConnection();
+    verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock);
+  }
+
+  @Test
+  public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameRecieverFalseDoesNotRetryInitializeConnection() {
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(false);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("receiverId");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(poolMock, times(1)).acquireConnection();
+    verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock);
+  }
+
+  @Test
+  public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndNoExpectedReceiverIdSetsReceiverIdAndDoesNotReacquireConnection() {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("receiverId");
+    when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(1)).acquireConnection();
+    verify(eventProcessorMock, times(1)).setExpectedReceiverUniqueId("receiverId");
+  }
+
+  @Test
+  public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndConnectedToExpectedReceiverDoesNotReacquireConnection() {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("expectedId");
+    when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(1)).acquireConnection();
+    verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+  }
+
+  @Test
+  public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndConnectedToExpectedReceiverOnSecondTryReacquiresConnectionOnce() {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("notExpectedId").thenReturn("expectedId");
+    when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(2)).acquireConnection();
+    verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+
+  }
+
+  @Test
+  public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndMaxRetriesExceededAndNoServersAvailableThrowsException() {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("notExpectedId");
+    when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+
+    String expectedExceptionMessage =
+        "There are no active servers. "
+            + GatewaySenderEventRemoteDispatcher.maxAttemptsReachedConnectingServerIdExceptionMessage
+            + " [expectedId] (10 attempts)";
+    assertThatThrownBy(() -> {
+      dispatcherSpy.initializeConnection();
+    }).isInstanceOf(GatewaySenderException.class).hasMessageContaining(expectedExceptionMessage);
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(2)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(10)).acquireConnection();
+    verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+  }
+
+  @Test
+  public void initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndMaxRetriesExceededAndServersAvailableThrowsException() {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("notExpectedId");
+    when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+    List<ServerLocationAndMemberId> currentServers = new ArrayList<>();
+    currentServers.add(new ServerLocationAndMemberId(new ServerLocation("host1", 1), "id1"));
+    currentServers.add(new ServerLocationAndMemberId(new ServerLocation("host2", 2), "id2"));
+    when(poolMock.getCurrentServers()).thenReturn(currentServers);
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+
+    String expectedExceptionMessage =
+        "No available connection was found, but the following active servers exist: host1:1@id1, host2:2@id2 "
+            + GatewaySenderEventRemoteDispatcher.maxAttemptsReachedConnectingServerIdExceptionMessage
+            + " [expectedId] (10 attempts)";
+    assertThatThrownBy(() -> {
+      dispatcherSpy.initializeConnection();
+    }).isInstanceOf(GatewaySenderException.class).hasMessageContaining(expectedExceptionMessage);
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(2)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(10)).acquireConnection();
+    verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+  }
+
 }