QPID-8225 : Fix incorrect implementation of infinite credit
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 8d42d2b..c58f897 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -513,11 +513,10 @@
         switch (unit)
         {
             case MESSAGE:
-
                 creditManager.addCredit(value, 0L);
                 break;
             case BYTE:
-                creditManager.addCredit(0l, value);
+                creditManager.addCredit(0L, value);
                 break;
         }
         updateNotifyWorkDesired();
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
index 698351c..6cee5dd 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
@@ -21,8 +21,12 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class CreditCreditManager implements FlowCreditManager_0_10
 {
+    private static final Logger LOG = LoggerFactory.getLogger(CreditCreditManager.class);
     private volatile long _bytesCredit;
     private volatile long _messageCredit;
 
@@ -41,22 +45,51 @@
     @Override
     public synchronized void addCredit(final long messageCredit, final long bytesCredit)
     {
-        if(_messageCredit >= 0L && messageCredit > 0L)
+
+        if(_messageCredit >= 0L)
         {
-            _messageCredit += messageCredit;
+            if(messageCredit == INFINITE_CREDIT)
+            {
+                _messageCredit = -1L;
+            }
+            else
+            {
+                _messageCredit += messageCredit;
+                if (_messageCredit < 0L)
+                {
+                    LOG.warn("Message credit wraparound: attempt to add {} message credit to existing total of {}",
+                             messageCredit,
+                             _messageCredit - messageCredit);
+                    _messageCredit = Long.MAX_VALUE;
+                }
+            }
         }
 
-        if(_bytesCredit >= 0L && bytesCredit > 0L)
+        if(_bytesCredit >= 0L)
         {
-            _bytesCredit += bytesCredit;
+            if(bytesCredit == INFINITE_CREDIT)
+            {
+                _bytesCredit = -1L;
+            }
+            else
+            {
+                _bytesCredit += bytesCredit;
+                if (_bytesCredit < 0L)
+                {
+                    LOG.warn("Bytes credit wraparound: attempt to add {} bytes credit to existing total of {}",
+                             bytesCredit,
+                             _bytesCredit - bytesCredit);
+                    _bytesCredit = Long.MAX_VALUE;
+                }
+            }
         }
     }
 
     @Override
     public synchronized void clearCredit()
     {
-        _bytesCredit = 0l;
-        _messageCredit = 0l;
+        _bytesCredit = 0L;
+        _messageCredit = 0L;
     }
 
 
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java
index a8ee98e..a4c32fd 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/FlowCreditManager_0_10.java
@@ -24,6 +24,8 @@
 
 public interface FlowCreditManager_0_10 extends FlowCreditManager
 {
+    long INFINITE_CREDIT = 0xFFFFFFFFL;
+
     void addCredit(long count, long bytes);
 
     void clearCredit();
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
index d5c09d4..3f6f086 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
@@ -140,30 +140,43 @@
     @Override
     public synchronized void addCredit(long count, long bytes)
     {
-        if(bytes > 0)
+        if(bytes == INFINITE_CREDIT)
+        {
+            _bytesCreditLimit = -1L;
+        }
+        else if(_bytesCreditLimit >= 0L)
         {
             _bytesCreditLimit += bytes;
+            if (_bytesCreditLimit < 0L)
+            {
+                LOGGER.warn("Bytes credit wraparound: attempt to add {} bytes credit to existing total of {}",
+                         bytes,
+                         _bytesCreditLimit - bytes);
+                _bytesCreditLimit = Long.MAX_VALUE;
+            }
         }
-        else if(bytes == -1)
+
+        if(count == INFINITE_CREDIT)
         {
-            _bytesCreditLimit = -1;
+            _messageCreditLimit = -1L;
         }
-
-
-        if(count > 0)
+        else if(_messageCreditLimit >= 0L)
         {
             _messageCreditLimit += count;
-        }
-        else if(count == -1)
-        {
-            _messageCreditLimit = -1;
+            if (_messageCreditLimit < 0L)
+            {
+                LOGGER.warn("Message credit wraparound: attempt to add {} message credit to existing total of {}",
+                         count,
+                         _messageCreditLimit - count);
+                _messageCreditLimit = Long.MAX_VALUE;
+            }
         }
     }
 
     @Override
     public synchronized void clearCredit()
     {
-        _bytesCreditLimit = 0l;
-        _messageCreditLimit = 0l;
+        _bytesCreditLimit = 0L;
+        _messageCreditLimit = 0L;
     }
 }