AMQ-8023 - retain sync add call, duplicate sub suppression depends on it, regression in AMQ3274Test
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 2850029..6441c92 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -33,6 +33,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -1151,7 +1152,8 @@
protected void addSubscription(final DemandSubscription sub) throws IOException {
if (sub != null) {
// Serialize with remove operations such that new sub does not cause remove/purge to fail
- serialExecutor.execute(new Runnable() {
+ // remain synchronous b/c duplicate suppression depends on add completion
+ FutureTask syncTask = new FutureTask(new Runnable() {
@Override
public void run() {
try {
@@ -1161,7 +1163,14 @@
LOG.debug("detail", e);
}
}
- });
+ }, null);
+ try {
+ serialExecutor.execute(syncTask);
+ syncTask.get();
+ } catch (Exception e) {
+ LOG.warn("failed to execute add sub command: {}, cause: {}", sub.getLocalInfo(), e);
+ LOG.debug("detail", e);
+ }
}
}