PROTON-2599 Fix race that was causing some intermittent test failures
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java
index 1bf4a4b..6161585 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java
@@ -20,6 +20,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.qpid.protonj2.client.DeliveryState;
import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryStateException;
@@ -30,14 +32,24 @@
/**
* Base type used to provide some common plumbing for Tracker types
+ *
+ * @param <SenderType> The client sender type that created this tracker
+ * @param <TrackerType> The actual type of tracker that is being implemented
*/
public abstract class ClientTrackable<SenderType extends ClientSenderLinkType<?>, TrackerType> {
protected final SenderType sender;
protected final OutgoingDelivery delivery;
+ @SuppressWarnings("rawtypes")
+ protected static final AtomicIntegerFieldUpdater<ClientTrackable> REMOTELY_SETTLED_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(ClientTrackable.class, "remotelySettled");
+ @SuppressWarnings("rawtypes")
+ protected static final AtomicReferenceFieldUpdater<ClientTrackable, DeliveryState> REMOTEL_DELIVERY_STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(ClientTrackable.class, DeliveryState.class, "remoteDeliveryState");
+
private ClientFuture<TrackerType> remoteSettlementFuture;
- private volatile boolean remotelySettled;
+ private volatile int remotelySettled;
private volatile DeliveryState remoteDeliveryState;
/**
@@ -69,7 +81,7 @@
}
public boolean remoteSettled() {
- return remotelySettled;
+ return remotelySettled > 0;
}
public TrackerType disposition(DeliveryState state, boolean settle) throws ClientException {
@@ -199,10 +211,10 @@
//----- Internal Event hooks for delivery updates
private void processDeliveryUpdated(OutgoingDelivery delivery) {
- remotelySettled = delivery.isRemotelySettled();
- remoteDeliveryState = ClientDeliveryState.fromProtonType(delivery.getRemoteState());
-
if (delivery.isRemotelySettled()) {
+ REMOTELY_SETTLED_UPDATER.lazySet(this, 1);
+ REMOTEL_DELIVERY_STATE_UPDATER.lazySet(this, ClientDeliveryState.fromProtonType(delivery.getRemoteState()));
+
if (sender.options.autoSettle()) {
delivery.settle();
}
@@ -212,6 +224,8 @@
remoteSettlementFuture.complete(self());
}
}
+ } else {
+ REMOTEL_DELIVERY_STATE_UPDATER.set(this, ClientDeliveryState.fromProtonType(delivery.getRemoteState()));
}
}
}