QPID-8225 : Fix incorrect implementation of infinite credit
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 8d42d2b..c58f897 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -513,11 +513,10 @@
switch (unit)
{
case MESSAGE:
-
creditManager.addCredit(value, 0L);
break;
case BYTE:
- creditManager.addCredit(0l, value);
+ creditManager.addCredit(0L, value);
break;
}
updateNotifyWorkDesired();
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
index 698351c..6cee5dd 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
@@ -21,8 +21,12 @@
package org.apache.qpid.server.protocol.v0_10;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class CreditCreditManager implements FlowCreditManager_0_10
{
+ private static final Logger LOG = LoggerFactory.getLogger(CreditCreditManager.class);
private volatile long _bytesCredit;
private volatile long _messageCredit;
@@ -41,22 +45,51 @@
@Override
public synchronized void addCredit(final long messageCredit, final long bytesCredit)
{
- if(_messageCredit >= 0L && messageCredit > 0L)
+
+ if(_messageCredit >= 0L)
{
- _messageCredit += messageCredit;
+ if(messageCredit == INFINITE_CREDIT)
+ {
+ _messageCredit = -1L;
+ }
+ else
+ {
+ _messageCredit += messageCredit;
+ if (_messageCredit < 0L)
+ {
+ LOG.warn("Message credit wraparound: attempt to add {} message credit to existing total of {}",
+ messageCredit,
+ _messageCredit - messageCredit);
+ _messageCredit = Long.MAX_VALUE;
+ }
+ }
}
- if(_bytesCredit >= 0L && bytesCredit > 0L)
+ if(_bytesCredit >= 0L)
{
- _bytesCredit += bytesCredit;
+ if(bytesCredit == INFINITE_CREDIT)
+ {
+ _bytesCredit = -1L;
+ }
+ else
+ {
+ _bytesCredit += bytesCredit;
+ if (_bytesCredit < 0L)
+ {
+ LOG.warn("Bytes credit wraparound: attempt to add {} bytes credit to existing total of {}",
+ bytesCredit,
+ _bytesCredit - bytesCredit);
+ _bytesCredit = Long.MAX_VALUE;
+ }
+ }
}
}
@Override
public synchronized void clearCredit()
{
- _bytesCredit = 0l;
- _messageCredit = 0l;
+ _bytesCredit = 0L;
+ _messageCredit = 0L;
}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java
index a8ee98e..a4c32fd 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java
@@ -24,6 +24,8 @@
public interface FlowCreditManager_0_10 extends FlowCreditManager
{
+ long INFINITE_CREDIT = 0xFFFFFFFFL;
+
void addCredit(long count, long bytes);
void clearCredit();
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
index d5c09d4..3f6f086 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
@@ -140,30 +140,43 @@
@Override
public synchronized void addCredit(long count, long bytes)
{
- if(bytes > 0)
+ if(bytes == INFINITE_CREDIT)
+ {
+ _bytesCreditLimit = -1L;
+ }
+ else if(_bytesCreditLimit >= 0L)
{
_bytesCreditLimit += bytes;
+ if (_bytesCreditLimit < 0L)
+ {
+ LOGGER.warn("Bytes credit wraparound: attempt to add {} bytes credit to existing total of {}",
+ bytes,
+ _bytesCreditLimit - bytes);
+ _bytesCreditLimit = Long.MAX_VALUE;
+ }
}
- else if(bytes == -1)
+
+ if(count == INFINITE_CREDIT)
{
- _bytesCreditLimit = -1;
+ _messageCreditLimit = -1L;
}
-
-
- if(count > 0)
+ else if(_messageCreditLimit >= 0L)
{
_messageCreditLimit += count;
- }
- else if(count == -1)
- {
- _messageCreditLimit = -1;
+ if (_messageCreditLimit < 0L)
+ {
+ LOGGER.warn("Message credit wraparound: attempt to add {} message credit to existing total of {}",
+ count,
+ _messageCreditLimit - count);
+ _messageCreditLimit = Long.MAX_VALUE;
+ }
}
}
@Override
public synchronized void clearCredit()
{
- _bytesCreditLimit = 0l;
- _messageCreditLimit = 0l;
+ _bytesCreditLimit = 0L;
+ _messageCreditLimit = 0L;
}
}