AMQ-8023 - retain sync add call, duplicate sub suppression depends on it, regression in AMQ3274Test
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 2850029..6441c92 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -33,6 +33,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -1151,7 +1152,8 @@
     protected void addSubscription(final DemandSubscription sub) throws IOException {
         if (sub != null) {
             // Serialize with remove operations such that new sub does not cause remove/purge to fail
-            serialExecutor.execute(new Runnable() {
+            // remain synchronous b/c duplicate suppression depends on add completion
+            FutureTask syncTask = new FutureTask(new Runnable() {
                 @Override
                 public void run() {
                     try {
@@ -1161,7 +1163,14 @@
                         LOG.debug("detail", e);
                     }
                 }
-            });
+            }, null);
+            try {
+                serialExecutor.execute(syncTask);
+                syncTask.get();
+            } catch (Exception e) {
+                LOG.warn("failed to execute add sub command: {}, cause: {}", sub.getLocalInfo(), e);
+                LOG.debug("detail", e);
+            }
         }
     }