merge up to r1765609 from trunk
diff --git a/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java b/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java
index 1b43237..802da1d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java
@@ -24,6 +24,7 @@
 
 import org.apache.qpid.server.model.ConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Model;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 
@@ -58,18 +59,24 @@
         {
             if(object.isDurable() && child.isDurable())
             {
-                child.addChangeListener(this);
-                _store.update(true, child.asObjectRecord());
-
-                Class<? extends ConfiguredObject> categoryClass = child.getCategoryClass();
-                Collection<Class<? extends ConfiguredObject>> childTypes =
-                        child.getModel().getChildTypes(categoryClass);
-
-                for (Class<? extends ConfiguredObject> childClass : childTypes)
+                Model model = child.getModel();
+                Collection<Class<? extends ConfiguredObject>> parentTypes =
+                        model.getParentTypes(child.getCategoryClass());
+                if(parentTypes.size() == 1 || parentTypes.iterator().next().equals(object.getCategoryClass()))
                 {
-                    for (ConfiguredObject<?> grandchild : child.getChildren(childClass))
+                    child.addChangeListener(this);
+                    _store.update(true, child.asObjectRecord());
+
+                    Class<? extends ConfiguredObject> categoryClass = child.getCategoryClass();
+                    Collection<Class<? extends ConfiguredObject>> childTypes =
+                            model.getChildTypes(categoryClass);
+
+                    for (Class<? extends ConfiguredObject> childClass : childTypes)
                     {
-                        childAdded(child, grandchild);
+                        for (ConfiguredObject<?> grandchild : child.getChildren(childClass))
+                        {
+                            childAdded(child, grandchild);
+                        }
                     }
                 }
             }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java b/broker-core/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
index 124fb0d..afe6482 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
@@ -59,12 +59,14 @@
         }
     }
 
-    protected final void setSuspended(final boolean suspended)
+    protected final boolean setSuspended(final boolean suspended)
     {
         if(_suspended.compareAndSet(!suspended, suspended))
         {
             notifyListeners(suspended);
+            return true;
         }
+        return false;
     }
 
     protected final void notifyIncreaseBytesCredit()
diff --git a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
index 9979c8d..48e3f9f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
@@ -18,12 +18,12 @@
  */
 package org.apache.qpid.server.stats;
 
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Date;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * This class collects statistics and counts the total, rate per second and
  * peak rate per second values for the events that are registered with it. 
@@ -36,15 +36,7 @@
 
     private static final String COUNTER = "counter";
     private static final AtomicLong _counterIds = new AtomicLong(0L);
-    
-    private long _peak = 0L;
-    private long _total = 0L;
-    private long _temp = 0L;
-    private long _last = 0L;
-    private long _rate = 0L;
 
-    private long _start;
-    
     private final long _period;
     private final String _name;
 
@@ -63,10 +55,91 @@
         _period = period;
         _name = name + "-" + + _counterIds.incrementAndGet();
 
-        _start = System.currentTimeMillis();
-        _last = _start / _period;
+        _currentSample.set(new Sample(period));
     }
-    
+
+    private static final class Sample
+    {
+        private final long _sampleId;
+        private final AtomicLong _count = new AtomicLong();
+        private final AtomicLong _total;
+        private final long _peak;
+        private final long _lastRate;
+        private final long _start;
+        private final long _period;
+
+        private Sample(final long period)
+        {
+            _period = period;
+            _total = new AtomicLong();
+            _peak = 0L;
+            _lastRate = 0L;
+            _start = System.currentTimeMillis();
+            _sampleId = _start / period;
+
+        }
+
+        private Sample(final long timestamp, Sample priorSample)
+        {
+            _period = priorSample._period;
+            _sampleId = timestamp / _period;
+            _total = priorSample._total;
+            _lastRate = priorSample.getRate(); // read once: other threads may still be adding to the prior sample
+            _peak = Math.max(_lastRate, priorSample.getPeak()); // peak derived from the same snapshot as _lastRate
+            _start = priorSample._start;
+        }
+
+        public long getTotal()
+        {
+            return _total.get();
+        }
+
+        public long getRate()
+        {
+            return _count.get();
+        }
+
+        public long getPeak()
+        {
+            return _peak;
+        }
+
+        public long getLastRate()
+        {
+            return _lastRate;
+        }
+
+        public long getStart()
+        {
+            return _start;
+        }
+
+        public boolean add(final long value, final long timestamp)
+        {
+            if(timestamp >= _start)
+            {
+                long eventSampleId = timestamp / _period;
+                if(eventSampleId > _sampleId)
+                {
+                    return false;
+                }
+                _total.addAndGet(value);
+                if(eventSampleId == _sampleId)
+                {
+                    _count.addAndGet(value);
+                }
+                return true;
+            }
+            else
+            {
+                // ignore - event occurred before reset;
+                return true;
+            }
+        }
+    }
+
+    private final AtomicReference<Sample> _currentSample = new AtomicReference<>(); // holder is never reassigned; only its content changes
+
 
     public void registerEvent(long value)
     {
@@ -75,22 +148,12 @@
 
     public void registerEvent(long value, long timestamp)
     {
-        long thisSample = (timestamp / _period);
-        synchronized (this)
+        Sample currentSample;
+
+        while(!(currentSample = getSample()).add(value, timestamp))
         {
-            if (thisSample > _last)
-            {
-                _last = thisSample;
-                _rate = _temp;
-                _temp = 0L;
-                if (_rate > _peak)
-                {
-                    _peak = _rate;
-                }
-            }
-            
-            _total += value;
-            _temp += value;
+            Sample nextSample = new Sample(timestamp, currentSample);
+            _currentSample.compareAndSet(currentSample, nextSample);
         }
     }
     
@@ -109,40 +172,37 @@
     public void reset()
     {
         _log.info("Resetting statistics for counter: " + _name);
-        _peak = 0L;
-        _rate = 0L;
-        _total = 0L;
-        _start = System.currentTimeMillis();
-        _last = _start / _period;
+
+        _currentSample.set(new Sample(_period));
     }
 
     public double getPeak()
     {
         update();
-        return (double) _peak / ((double) _period / 1000.0d);
+        return (double) getSample().getPeak() / ((double) _period / 1000.0d);
+    }
+
+    private Sample getSample()
+    {
+        return _currentSample.get();
     }
 
     public double getRate()
     {
         update();
-        return (double) _rate / ((double) _period / 1000.0d);
+        return (double) getSample().getLastRate() / ((double) _period / 1000.0d);
     }
 
     public long getTotal()
     {
-        return _total;
+        return getSample().getTotal();
     }
 
     public long getStart()
     {
-        return _start;
+        return getSample().getStart();
     }
 
-    public Date getStartTime()
-    {
-        return new Date(_start);
-    }
-    
     public String getName()
     {
         return _name;
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
index ca80208..ecfd82a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
@@ -33,6 +33,9 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -298,13 +301,59 @@
         }
         else
         {
-            data = build(_rootClass, rootId);
+            data = build(_rootClass, rootId, createChildMap());
         }
 
         save(data);
     }
 
-    private Map<String, Object> build(final Class<? extends ConfiguredObject> type, final UUID id)
+    private Map<UUID, Map<String, SortedSet<ConfiguredObjectRecord>>> createChildMap()
+    {
+        Model model = _parent.getModel();
+        Map<UUID, Map<String, SortedSet<ConfiguredObjectRecord>>> map = new HashMap<>();
+        // Index: primary-parent id -> record type -> records ordered by CONFIGURED_OBJECT_RECORD_COMPARATOR.
+        for(ConfiguredObjectRecord record : _objectsById.values())
+        {
+            int parentCount = record.getParents().size();
+            if (parentCount == 0)
+            {
+                continue; // records without parents are never nested under another record
+            }
+            Collection<Class<? extends ConfiguredObject>> parentTypes =
+                    model.getParentTypes(_classNameMapping.get(record.getType()));
+            if (parentTypes != null && !parentTypes.isEmpty())
+            {
+                // Only the first parent type reported by the model is used to place the record.
+                final Class<? extends ConfiguredObject> primaryParentCategory =
+                        parentTypes.iterator().next();
+
+                String parentCategoryName = primaryParentCategory.getSimpleName();
+
+                UUID parentId = record.getParents().get(parentCategoryName);
+                // Records that do not reference a parent of the primary category are left out of the index.
+                if (parentId != null)
+                {
+                    Map<String, SortedSet<ConfiguredObjectRecord>> childMap = map.get(parentId);
+                    if (childMap == null)
+                    {
+                        childMap = new TreeMap<>();
+                        map.put(parentId, childMap);
+                    }
+                    SortedSet<ConfiguredObjectRecord> children = childMap.get(record.getType());
+                    if (children == null)
+                    {
+                        children = new TreeSet<>(CONFIGURED_OBJECT_RECORD_COMPARATOR);
+                        childMap.put(record.getType(), children);
+                    }
+                    children.add(record);
+                }
+            }
+        }
+        return map;
+    }
+
+    private Map<String, Object> build(final Class<? extends ConfiguredObject> type, final UUID id,
+                                      Map<UUID, Map<String, SortedSet<ConfiguredObjectRecord>>> childMap)
     {
         ConfiguredObjectRecord record = _objectsById.get(id);
         Map<String,Object> map = new LinkedHashMap<String, Object>();
@@ -332,41 +381,24 @@
 
         Collections.sort(childClasses, CATEGORY_CLASS_COMPARATOR);
 
-        for(Class<? extends ConfiguredObject> childClass : childClasses)
+        final Map<String, SortedSet<ConfiguredObjectRecord>> allChildren = childMap.get(id);
+        if(allChildren != null && !allChildren.isEmpty())
         {
-            // only add if this is the "first" parent
-            if(_parent.getModel().getParentTypes(childClass).iterator().next() == type)
+            for(Map.Entry<String, SortedSet<ConfiguredObjectRecord>> entry : allChildren.entrySet())
             {
-                String singularName = childClass.getSimpleName().toLowerCase();
+                String singularName = entry.getKey().toLowerCase();
                 String attrName = singularName + (singularName.endsWith("s") ? "es" : "s");
-                List<UUID> childIds = _idsByType.get(childClass.getSimpleName());
-                if(childIds != null)
+                final SortedSet<ConfiguredObjectRecord> sortedChildren = entry.getValue();
+                List<Map<String,Object>> entities = new ArrayList<Map<String, Object>>();
+
+                for(ConfiguredObjectRecord childRecord : sortedChildren)
                 {
-                    List<Map<String,Object>> entities = new ArrayList<Map<String, Object>>();
-                    List<ConfiguredObjectRecord> sortedChildren = new ArrayList<>();
-                    for(UUID childId : childIds)
-                    {
-                        ConfiguredObjectRecord childRecord = _objectsById.get(childId);
+                    entities.add(build(_classNameMapping.get(entry.getKey()), childRecord.getId(), childMap));
+                }
 
-                        final UUID parent = childRecord.getParents().get(type.getSimpleName());
-                        String parentId = parent.toString();
-                        if(id.toString().equals(parentId))
-                        {
-                            sortedChildren.add(childRecord);
-                        }
-                    }
-
-                    Collections.sort(sortedChildren, CONFIGURED_OBJECT_RECORD_COMPARATOR);
-
-                    for(ConfiguredObjectRecord childRecord : sortedChildren)
-                    {
-                        entities.add(build(childClass, childRecord.getId()));
-                    }
-
-                    if(!entities.isEmpty())
-                    {
-                        map.put(attrName,entities);
-                    }
+                if(!entities.isEmpty())
+                {
+                    map.put(attrName,entities);
                 }
             }
         }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
index 7722ac3..ff64fe8 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
@@ -87,9 +87,9 @@
             public void performAction(final ProtocolEngine object)
             {
                 NetworkConnectionScheduler scheduler = getScheduler();
-                if(scheduler != null)
+                if(scheduler != null && !_scheduled.get()) // schedule via the null-checked local, not a second getScheduler() read
                 {
                     scheduler.schedule(NonBlockingConnection.this);
                 }
             }
         });
diff --git a/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java b/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
index 5e8bf33..8f10241 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
@@ -75,6 +75,7 @@
         when(child.getCategoryClass()).thenReturn(VirtualHost.class);
         Model model = mock(Model.class);
         when(model.getChildTypes(any(Class.class))).thenReturn(Collections.<Class<? extends ConfiguredObject>>emptyList());
+        when(model.getParentTypes(eq(VirtualHost.class))).thenReturn(Collections.<Class<? extends ConfiguredObject>>singleton(Broker.class));
         when(child.getModel()).thenReturn(model);
         when(child.isDurable()).thenReturn(true);
         _listener.childAdded(broker, child);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java b/broker-core/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
index a36f47a..5db0c31 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
@@ -92,39 +92,6 @@
         Thread.sleep(1000);
         assertEquals(2000.0, counter.getPeak());
     }
- 
-    /**
-     * Test that peak rate is reported correctly for out-of-order messages,
-     * and the total is also unaffected.
-     */
-    public void testPeakOutOfOrder() throws Exception
-    {
-        StatisticsCounter counter = new StatisticsCounter("test", 1000L);
-        long start = counter.getStart();
-        assertEquals(0.0, counter.getPeak());
-        counter.registerEvent(1000, start + 2500);
-        Thread.sleep(1500);
-        assertEquals(0.0, counter.getPeak());
-        counter.registerEvent(2000, start + 1500);
-
-        // make sure, that getPeak invocation occurs at "start + 2500"
-        // if test thread over-sleeps for 500+ mls
-        // the peak value can be incremented and test will fail
-        long sleep = start + 2500 - System.currentTimeMillis();
-        Thread.sleep(sleep < 0 ? 0 : sleep);
-        assertEquals(0.0, counter.getPeak());
-        counter.registerEvent(1000, start + 500);
-        Thread.sleep(1500);
-        assertEquals(4000.0, counter.getPeak());
-        Thread.sleep(2000);
-        assertEquals(4000.0, counter.getPeak());
-        counter.registerEvent(1000, start + 500);
-        assertEquals(4000.0, counter.getPeak());
-        Thread.sleep(2000);
-        counter.registerEvent(1000);
-        assertEquals(4000.0, counter.getPeak());
-        assertEquals(6000, counter.getTotal());
-    }
 
     /**
      * Test the current rate is generated correctly.
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
index 9570c85..982b12c 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
@@ -61,17 +61,17 @@
             notifyIncrease = notifyIncrease && bytesCredit>0;
             _bytesCredit += bytesCredit;
 
-
-
-            if(notifyIncrease)
-            {
-                notifyIncreaseBytesCredit();
-            }
+        }
+        else
+        {
+            notifyIncrease = false;
         }
 
+        if(!setSuspended(!hasCredit()) && notifyIncrease)
+        {
+            notifyIncreaseBytesCredit();
 
-
-        setSuspended(!hasCredit());
+        }
 
     }
 
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
index a37cb29..bd42b3f 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
@@ -49,18 +49,15 @@
 
     public void init()
     {
-        _out.position(_out.limit());
-        _out.limit(_out.capacity());
-        QpidByteBuffer old = _out;
-        if(_out.remaining() < _threshold)
+        if(_out.capacity() < _threshold)
         {
+            _out.dispose();
             _out = QpidByteBuffer.allocateDirect(_initialCapacity);
         }
         else
         {
-            _out = _out.slice();
+            _out.clear();
         }
-        old.dispose();
     }
 
     public QpidByteBuffer getBuffer()
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
index f9776b5..e512ee9 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
@@ -71,37 +71,32 @@
     public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
     {
         _messageUsed -= messageCredit;
-        if(_messageUsed < 0L)
+        if (_messageUsed < 0L)
         {
-            LOGGER.error("Message credit used value was negative: "+ _messageUsed);
+            LOGGER.error("Message credit used value was negative: " + _messageUsed);
             _messageUsed = 0;
         }
 
         boolean notifyIncrease = true;
 
-        if(_messageCreditLimit > 0L)
+        if (_messageCreditLimit > 0L)
         {
             notifyIncrease = (_messageUsed != _messageCreditLimit);
         }
 
         _bytesUsed -= bytesCredit;
-        if(_bytesUsed < 0L)
+        if (_bytesUsed < 0L)
         {
-            LOGGER.error("Bytes credit used value was negative: "+ _bytesUsed);
+            LOGGER.error("Bytes credit used value was negative: " + _bytesUsed);
             _bytesUsed = 0;
         }
 
-        if(_bytesCreditLimit > 0L)
+        notifyIncrease = notifyIncrease && bytesCredit > 0 && _bytesCreditLimit > 0L ;
+
+        if (!setSuspended(!hasCredit()) && notifyIncrease)
         {
-            notifyIncrease = notifyIncrease && bytesCredit>0;
-
-            if(notifyIncrease)
-            {
-                notifyIncreaseBytesCredit();
-            }
+            notifyIncreaseBytesCredit();
         }
-
-        setSuspended(!hasCredit());
     }
 
 
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
index ffd6f16..4e68e1f 100644
--- a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
@@ -33,6 +33,7 @@
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.BrokerModel;
 import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.port.AmqpPort;
@@ -86,6 +87,8 @@
         when(modelConnection.getContextProvider()).thenReturn(_virtualHost);
         when(modelConnection.getBroker()).thenReturn((Broker)broker);
         when(modelConnection.getEventLogger()).thenReturn(mock(EventLogger.class));
+        when(modelConnection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
+        when(modelConnection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
         Subject subject = new Subject();
         when(modelConnection.getSubject()).thenReturn(subject);
         when(modelConnection.getMaxMessageSize()).thenReturn(1024l);
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
index b5f04a7..e61fbea 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
@@ -90,12 +90,14 @@
                     _bytesCreditLimit));
         }
 
-        if (_bytesCreditLimit != 0 && _bytesCredit > 0 && bytesCredit > 0 && hadCredit)
+        boolean suspended = !hasCredit();
+        if(!setSuspended(suspended))
         {
-            notifyIncreaseBytesCredit();
+            if (!suspended && _bytesCreditLimit != 0 && _bytesCredit > 0 && bytesCredit > 0 && hadCredit)
+            {
+                notifyIncreaseBytesCredit();
+            }
         }
-
-        setSuspended(!hasCredit());
     }
 
     public synchronized boolean hasCredit()
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
index 708e164..968d26b 100644
--- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
@@ -51,6 +51,7 @@
 import org.apache.qpid.server.model.BrokerModel;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
@@ -115,6 +116,8 @@
         when(_amqConnection.getBroker()).thenReturn((Broker) _broker);
         when(_amqConnection.getMethodRegistry()).thenReturn(new MethodRegistry(ProtocolVersion.v0_9));
         when(_amqConnection.getContextProvider()).thenReturn(_virtualHost);
+        when(_amqConnection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
+        when(_amqConnection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
         when(_amqConnection.getEventLogger()).thenReturn(mock(EventLogger.class));
         _messageDestination = mock(MessageDestination.class);
     }