QPID-8322: [Broker-J] Fix credit restoration in window credit manager when infinite credit limit is used
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index c6c312c..d1004ce 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -22,7 +22,6 @@
import java.io.IOException;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,16 +68,15 @@
private final String _targetAddress;
- private FlowCreditManager_0_10 _creditManager;
+ private volatile FlowCreditManager_0_10 _creditManager;
private final MessageAcceptMode _acceptMode;
private final MessageAcquireMode _acquireMode;
- private MessageFlowMode _flowMode;
+ private volatile MessageFlowMode _flowMode;
private final ServerSession _session;
- private final AtomicBoolean _stopped = new AtomicBoolean(true);
- private int _deferredMessageCredit;
- private long _deferredSizeCredit;
+ private volatile int _deferredMessageCredit;
+ private volatile long _deferredSizeCredit;
private final StateChangeListener<MessageInstance, EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, EntryState>()
{
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
index 3f6f086..818617b 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
@@ -63,18 +63,24 @@
@Override
public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
{
- _messageUsed -= messageCredit;
- if (_messageUsed < 0L)
+ if (_messageCreditLimit >= 0L)
{
- LOGGER.error("Message credit used value was negative: " + _messageUsed);
- _messageUsed = 0;
+ _messageUsed -= messageCredit;
+ if (_messageUsed < 0L)
+ {
+ LOGGER.warn("Message credit used value was negative: " + _messageUsed);
+ _messageUsed = 0;
+ }
}
- _bytesUsed -= bytesCredit;
- if (_bytesUsed < 0L)
+ if (_bytesCreditLimit >= 0L)
{
- LOGGER.error("Bytes credit used value was negative: " + _bytesUsed);
- _bytesUsed = 0;
+ _bytesUsed -= bytesCredit;
+ if (_bytesUsed < 0L)
+ {
+ LOGGER.warn("Bytes credit used value was negative: " + _bytesUsed);
+ _bytesUsed = 0;
+ }
}
}
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
index 3f1a403..d9ac43d 100644
--- a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
@@ -96,4 +96,19 @@
assertEquals("unexpected credit value", (long) 1, _creditManager.getMessageCredit());
assertTrue("Manager should 'haveCredit'", _creditManager.hasCredit());
}
+
+ @Test
+ public void testRestoreCreditWhenInfiniteBytesCredit()
+ {
+ _creditManager.addCredit(1, WindowCreditManager.INFINITE_CREDIT);
+
+ _creditManager.useCreditForMessage(10);
+ assertEquals(0, _creditManager.getMessageCredit());
+ assertEquals(Long.MAX_VALUE, _creditManager.getBytesCredit());
+
+ _creditManager.restoreCredit(1, 10);
+
+ assertEquals(1, _creditManager.getMessageCredit());
+ assertEquals(Long.MAX_VALUE, _creditManager.getBytesCredit());
+ }
}