merge up to r1765609 from trunk
diff --git a/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java b/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java
index 1b43237..802da1d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java
@@ -24,6 +24,7 @@
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -58,18 +59,24 @@
{
if(object.isDurable() && child.isDurable())
{
- child.addChangeListener(this);
- _store.update(true, child.asObjectRecord());
-
- Class<? extends ConfiguredObject> categoryClass = child.getCategoryClass();
- Collection<Class<? extends ConfiguredObject>> childTypes =
- child.getModel().getChildTypes(categoryClass);
-
- for (Class<? extends ConfiguredObject> childClass : childTypes)
+ Model model = child.getModel();
+ Collection<Class<? extends ConfiguredObject>> parentTypes =
+ model.getParentTypes(child.getCategoryClass());
+ if(parentTypes.size() == 1 || parentTypes.iterator().next().equals(object.getCategoryClass()))
{
- for (ConfiguredObject<?> grandchild : child.getChildren(childClass))
+ child.addChangeListener(this);
+ _store.update(true, child.asObjectRecord());
+
+ Class<? extends ConfiguredObject> categoryClass = child.getCategoryClass();
+ Collection<Class<? extends ConfiguredObject>> childTypes =
+ model.getChildTypes(categoryClass);
+
+ for (Class<? extends ConfiguredObject> childClass : childTypes)
{
- childAdded(child, grandchild);
+ for (ConfiguredObject<?> grandchild : child.getChildren(childClass))
+ {
+ childAdded(child, grandchild);
+ }
}
}
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java b/broker-core/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
index 124fb0d..afe6482 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
@@ -59,12 +59,14 @@
}
}
- protected final void setSuspended(final boolean suspended)
+ protected final boolean setSuspended(final boolean suspended)
{
if(_suspended.compareAndSet(!suspended, suspended))
{
notifyListeners(suspended);
+ return true; // the flag flipped and listeners were notified with the new value
}
+ return false; // the compare-and-set did not apply, so no listener was notified
}
protected final void notifyIncreaseBytesCredit()
diff --git a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
index 9979c8d..48e3f9f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
@@ -18,12 +18,12 @@
*/
package org.apache.qpid.server.stats;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Date;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* This class collects statistics and counts the total, rate per second and
* peak rate per second values for the events that are registered with it.
@@ -36,15 +36,7 @@
private static final String COUNTER = "counter";
private static final AtomicLong _counterIds = new AtomicLong(0L);
-
- private long _peak = 0L;
- private long _total = 0L;
- private long _temp = 0L;
- private long _last = 0L;
- private long _rate = 0L;
- private long _start;
-
private final long _period;
private final String _name;
@@ -63,10 +55,91 @@
_period = period;
_name = name + "-" + + _counterIds.incrementAndGet();
- _start = System.currentTimeMillis();
- _last = _start / _period;
+ _currentSample.set(new Sample(period));
}
-
+
+ private static final class Sample // state of one sampling period: peak/last rate are fixed at creation, the counters are live
+ {
+ private final long _sampleId;
+ private final AtomicLong _count = new AtomicLong();
+ private final AtomicLong _total;
+ private final long _peak;
+ private final long _lastRate;
+ private final long _start;
+ private final long _period;
+ // Creates the initial sample: zero total, peak and rate, started at the current wall-clock time.
+ private Sample(final long period)
+ {
+ _period = period;
+ _total = new AtomicLong();
+ _peak = 0L;
+ _lastRate = 0L;
+ _start = System.currentTimeMillis();
+ _sampleId = _start / period;
+ }
+ // Creates the successor of priorSample for the period that contains timestamp.
+ private Sample(final long timestamp, Sample priorSample)
+ {
+ _period = priorSample._period;
+ _sampleId = timestamp / _period;
+ _total = priorSample._total; // the same counter object is carried over, so the total keeps accumulating
+ final long priorRate = priorSample.getRate(); // read once: other threads may still be adding to priorSample
+ _peak = Math.max(priorRate, priorSample.getPeak());
+ _lastRate = priorRate; // same value as used for the peak, so the peak is never below the last rate
+ _start = priorSample._start;
+ }
+ // Accessors. getRate() is the count accumulated so far within this sample's own period.
+ public long getTotal()
+ {
+ return _total.get();
+ }
+
+ public long getRate()
+ {
+ return _count.get();
+ }
+
+ public long getPeak()
+ {
+ return _peak;
+ }
+
+ public long getLastRate()
+ {
+ return _lastRate;
+ }
+
+ public long getStart()
+ {
+ return _start;
+ }
+ // Records an event. Returns false when the event belongs to a later period, i.e. the caller must roll over first.
+ public boolean add(final long value, final long timestamp)
+ {
+ if(timestamp >= _start)
+ {
+ long eventSampleId = timestamp / _period;
+ if(eventSampleId > _sampleId)
+ {
+ return false;
+ }
+ _total.addAndGet(value);
+ if(eventSampleId == _sampleId)
+ {
+ _count.addAndGet(value);
+ }
+ return true;
+ }
+ else
+ {
+ // ignore - event occurred before reset;
+ return true;
+ }
+ }
+ }
+
+ private final AtomicReference<Sample> _currentSample = new AtomicReference<>(); // holder is never reassigned; only its content is swapped
+
public void registerEvent(long value)
{
@@ -75,22 +148,12 @@
public void registerEvent(long value, long timestamp)
{
- long thisSample = (timestamp / _period);
- synchronized (this)
+ Sample currentSample;
+ // add() returns false when the event belongs to a later period than the current sample; roll over and retry
+ while(!(currentSample = getSample()).add(value, timestamp))
{
- if (thisSample > _last)
- {
- _last = thisSample;
- _rate = _temp;
- _temp = 0L;
- if (_rate > _peak)
- {
- _peak = _rate;
- }
- }
-
- _total += value;
- _temp += value;
+ Sample nextSample = new Sample(timestamp, currentSample); // successor sample for the period containing timestamp
+ _currentSample.compareAndSet(currentSample, nextSample); // result ignored: the loop re-reads the current sample either way
}
}
@@ -109,40 +172,37 @@
public void reset()
{
_log.info("Resetting statistics for counter: " + _name);
- _peak = 0L;
- _rate = 0L;
- _total = 0L;
- _start = System.currentTimeMillis();
- _last = _start / _period;
+ // a fresh Sample starts with zero total, peak and rate and takes the current time as its start
+ _currentSample.set(new Sample(_period));
}
public double getPeak()
{
update();
- return (double) _peak / ((double) _period / 1000.0d);
+ return (double) getSample().getPeak() / ((double) _period / 1000.0d); // per-period peak divided by (period / 1000)
+ }
+
+ private Sample getSample()
+ {
+ return _currentSample.get(); // the sample that is current at the moment of the call
}
public double getRate()
{
update();
- return (double) _rate / ((double) _period / 1000.0d);
+ return (double) getSample().getLastRate() / ((double) _period / 1000.0d); // uses the rate carried over from the prior sample, not the count still accumulating
}
public long getTotal()
{
- return _total;
+ return getSample().getTotal(); // successor samples share one total counter; only reset() installs a new one
}
public long getStart()
{
- return _start;
+ return getSample().getStart(); // start time is copied from sample to sample; it changes only on construction or reset()
}
- public Date getStartTime()
- {
- return new Date(_start);
- }
-
public String getName()
{
return _name;
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
index ca80208..ecfd82a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
@@ -33,6 +33,9 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.UUID;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -298,13 +301,59 @@
}
else
{
- data = build(_rootClass, rootId);
+ data = build(_rootClass, rootId, createChildMap());
}
save(data);
}
- private Map<String, Object> build(final Class<? extends ConfiguredObject> type, final UUID id)
+ private Map<UUID, Map<String, SortedSet<ConfiguredObjectRecord>>> createChildMap()
+ {
+ Model model = _parent.getModel();
+ Map<UUID, Map<String, SortedSet<ConfiguredObjectRecord>>> map = new HashMap<>();
+ // Index every stored record under its primary parent's id, grouped by record type and sorted within each type.
+ for(ConfiguredObjectRecord record : _objectsById.values())
+ {
+ // a record that names no parents cannot be nested under anything
+ if (record.getParents().isEmpty())
+ {
+ continue;
+ }
+ Collection<Class<? extends ConfiguredObject>> parentTypes =
+ model.getParentTypes(_classNameMapping.get(record.getType()));
+ if (parentTypes != null && !parentTypes.isEmpty())
+ {
+ // only the first parent type reported by the model decides where the record is nested
+ final Class<? extends ConfiguredObject> primaryParentCategory =
+ parentTypes.iterator().next();
+
+ String parentCategoryName = primaryParentCategory.getSimpleName();
+
+ UUID parentId = record.getParents().get(parentCategoryName);
+
+ if (parentId != null)
+ {
+ Map<String, SortedSet<ConfiguredObjectRecord>> childMap = map.get(parentId);
+ if (childMap == null)
+ {
+ childMap = new TreeMap<>();
+ map.put(parentId, childMap);
+ }
+ SortedSet<ConfiguredObjectRecord> children = childMap.get(record.getType());
+ if (children == null)
+ {
+ children = new TreeSet<>(CONFIGURED_OBJECT_RECORD_COMPARATOR);
+ childMap.put(record.getType(), children);
+ }
+ children.add(record);
+ }
+ }
+ }
+ return map;
+ }
+
+ private Map<String, Object> build(final Class<? extends ConfiguredObject> type, final UUID id,
+ Map<UUID, Map<String, SortedSet<ConfiguredObjectRecord>>> childMap)
{
ConfiguredObjectRecord record = _objectsById.get(id);
Map<String,Object> map = new LinkedHashMap<String, Object>();
@@ -332,41 +381,24 @@
Collections.sort(childClasses, CATEGORY_CLASS_COMPARATOR);
- for(Class<? extends ConfiguredObject> childClass : childClasses)
+ final Map<String, SortedSet<ConfiguredObjectRecord>> allChildren = childMap.get(id);
+ if(allChildren != null && !allChildren.isEmpty())
{
- // only add if this is the "first" parent
- if(_parent.getModel().getParentTypes(childClass).iterator().next() == type)
+ for(Map.Entry<String, SortedSet<ConfiguredObjectRecord>> entry : allChildren.entrySet())
{
- String singularName = childClass.getSimpleName().toLowerCase();
+ String singularName = entry.getKey().toLowerCase();
String attrName = singularName + (singularName.endsWith("s") ? "es" : "s");
- List<UUID> childIds = _idsByType.get(childClass.getSimpleName());
- if(childIds != null)
+ final SortedSet<ConfiguredObjectRecord> sortedChildren = entry.getValue();
+ List<Map<String,Object>> entities = new ArrayList<Map<String, Object>>();
+
+ for(ConfiguredObjectRecord childRecord : sortedChildren)
{
- List<Map<String,Object>> entities = new ArrayList<Map<String, Object>>();
- List<ConfiguredObjectRecord> sortedChildren = new ArrayList<>();
- for(UUID childId : childIds)
- {
- ConfiguredObjectRecord childRecord = _objectsById.get(childId);
+ entities.add(build(_classNameMapping.get(entry.getKey()), childRecord.getId(), childMap));
+ }
- final UUID parent = childRecord.getParents().get(type.getSimpleName());
- String parentId = parent.toString();
- if(id.toString().equals(parentId))
- {
- sortedChildren.add(childRecord);
- }
- }
-
- Collections.sort(sortedChildren, CONFIGURED_OBJECT_RECORD_COMPARATOR);
-
- for(ConfiguredObjectRecord childRecord : sortedChildren)
- {
- entities.add(build(childClass, childRecord.getId()));
- }
-
- if(!entities.isEmpty())
- {
- map.put(attrName,entities);
- }
+ if(!entities.isEmpty())
+ {
+ map.put(attrName,entities);
}
}
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
index 7722ac3..ff64fe8 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
@@ -87,9 +87,9 @@
public void performAction(final ProtocolEngine object)
{
NetworkConnectionScheduler scheduler = getScheduler();
- if(scheduler != null)
+ if(scheduler != null && !_scheduled.get()) // the call below stays on this null-checked local; getScheduler() is not read a second time
{
scheduler.schedule(NonBlockingConnection.this);
}
}
});
diff --git a/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java b/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
index 5e8bf33..8f10241 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
@@ -75,6 +75,7 @@
when(child.getCategoryClass()).thenReturn(VirtualHost.class);
Model model = mock(Model.class);
when(model.getChildTypes(any(Class.class))).thenReturn(Collections.<Class<? extends ConfiguredObject>>emptyList());
+ when(model.getParentTypes(eq(VirtualHost.class))).thenReturn(Collections.<Class<? extends ConfiguredObject>>singleton(Broker.class));
when(child.getModel()).thenReturn(model);
when(child.isDurable()).thenReturn(true);
_listener.childAdded(broker, child);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java b/broker-core/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
index a36f47a..5db0c31 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/stats/StatisticsCounterTest.java
@@ -92,39 +92,6 @@
Thread.sleep(1000);
assertEquals(2000.0, counter.getPeak());
}
-
- /**
- * Test that peak rate is reported correctly for out-of-order messages,
- * and the total is also unaffected.
- */
- public void testPeakOutOfOrder() throws Exception
- {
- StatisticsCounter counter = new StatisticsCounter("test", 1000L);
- long start = counter.getStart();
- assertEquals(0.0, counter.getPeak());
- counter.registerEvent(1000, start + 2500);
- Thread.sleep(1500);
- assertEquals(0.0, counter.getPeak());
- counter.registerEvent(2000, start + 1500);
-
- // make sure, that getPeak invocation occurs at "start + 2500"
- // if test thread over-sleeps for 500+ mls
- // the peak value can be incremented and test will fail
- long sleep = start + 2500 - System.currentTimeMillis();
- Thread.sleep(sleep < 0 ? 0 : sleep);
- assertEquals(0.0, counter.getPeak());
- counter.registerEvent(1000, start + 500);
- Thread.sleep(1500);
- assertEquals(4000.0, counter.getPeak());
- Thread.sleep(2000);
- assertEquals(4000.0, counter.getPeak());
- counter.registerEvent(1000, start + 500);
- assertEquals(4000.0, counter.getPeak());
- Thread.sleep(2000);
- counter.registerEvent(1000);
- assertEquals(4000.0, counter.getPeak());
- assertEquals(6000, counter.getTotal());
- }
/**
* Test the current rate is generated correctly.
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
index 9570c85..982b12c 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
@@ -61,17 +61,17 @@
notifyIncrease = notifyIncrease && bytesCredit>0;
_bytesCredit += bytesCredit;
-
-
- if(notifyIncrease)
- {
- notifyIncreaseBytesCredit();
- }
+ }
+ else
+ {
+ notifyIncrease = false;
}
+ if(!setSuspended(!hasCredit()) && notifyIncrease)
+ {
+ notifyIncreaseBytesCredit();
-
- setSuspended(!hasCredit());
+ }
}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
index a37cb29..bd42b3f 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
@@ -49,18 +49,15 @@
public void init()
{
- _out.position(_out.limit());
- _out.limit(_out.capacity());
- QpidByteBuffer old = _out;
- if(_out.remaining() < _threshold)
+ if(_out.capacity() < _threshold) // decision is now based on the buffer's total capacity
{
+ _out.dispose(); // dispose of the old buffer before the field is pointed at a new allocation
_out = QpidByteBuffer.allocateDirect(_initialCapacity);
}
else
{
- _out = _out.slice();
+ _out.clear(); // keep the same buffer; presumably resets position/limit like java.nio.Buffer.clear() - TODO confirm
}
- old.dispose();
}
public QpidByteBuffer getBuffer()
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
index f9776b5..e512ee9 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
@@ -71,37 +71,32 @@
public synchronized void restoreCredit(final long messageCredit, final long bytesCredit)
{
_messageUsed -= messageCredit;
- if(_messageUsed < 0L)
+ if (_messageUsed < 0L)
{
- LOGGER.error("Message credit used value was negative: "+ _messageUsed);
+ LOGGER.error("Message credit used value was negative: " + _messageUsed);
_messageUsed = 0;
}
boolean notifyIncrease = true;
- if(_messageCreditLimit > 0L)
+ if (_messageCreditLimit > 0L)
{
notifyIncrease = (_messageUsed != _messageCreditLimit);
}
_bytesUsed -= bytesCredit;
- if(_bytesUsed < 0L)
+ if (_bytesUsed < 0L)
{
- LOGGER.error("Bytes credit used value was negative: "+ _bytesUsed);
+ LOGGER.error("Bytes credit used value was negative: " + _bytesUsed);
_bytesUsed = 0;
}
- if(_bytesCreditLimit > 0L)
+ notifyIncrease = notifyIncrease && bytesCredit > 0 && _bytesCreditLimit > 0L ; // requires bytes actually restored and a byte limit in force
+ // setSuspended() returns true when the suspended flag changed; the byte-credit notification is sent only when it did not
+ if (!setSuspended(!hasCredit()) && notifyIncrease)
{
- notifyIncrease = notifyIncrease && bytesCredit>0;
-
- if(notifyIncrease)
- {
- notifyIncreaseBytesCredit();
- }
+ notifyIncreaseBytesCredit();
}
-
- setSuspended(!hasCredit());
}
diff --git a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
index ffd6f16..4e68e1f 100644
--- a/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
+++ b/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
@@ -33,6 +33,7 @@
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.port.AmqpPort;
@@ -86,6 +87,8 @@
when(modelConnection.getContextProvider()).thenReturn(_virtualHost);
when(modelConnection.getBroker()).thenReturn((Broker)broker);
when(modelConnection.getEventLogger()).thenReturn(mock(EventLogger.class));
+ when(modelConnection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
+ when(modelConnection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
Subject subject = new Subject();
when(modelConnection.getSubject()).thenReturn(subject);
when(modelConnection.getMaxMessageSize()).thenReturn(1024l);
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
index b5f04a7..e61fbea 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
@@ -90,12 +90,14 @@
_bytesCreditLimit));
}
- if (_bytesCreditLimit != 0 && _bytesCredit > 0 && bytesCredit > 0 && hadCredit)
+ boolean suspended = !hasCredit();
+ if(!setSuspended(suspended))
{
- notifyIncreaseBytesCredit();
+ if (!suspended && _bytesCreditLimit != 0 && _bytesCredit > 0 && bytesCredit > 0 && hadCredit)
+ {
+ notifyIncreaseBytesCredit();
+ }
}
-
- setSuspended(!hasCredit());
}
public synchronized boolean hasCredit()
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
index 708e164..968d26b 100644
--- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
@@ -51,6 +51,7 @@
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
@@ -115,6 +116,8 @@
when(_amqConnection.getBroker()).thenReturn((Broker) _broker);
when(_amqConnection.getMethodRegistry()).thenReturn(new MethodRegistry(ProtocolVersion.v0_9));
when(_amqConnection.getContextProvider()).thenReturn(_virtualHost);
+ when(_amqConnection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
+ when(_amqConnection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
when(_amqConnection.getEventLogger()).thenReturn(mock(EventLogger.class));
_messageDestination = mock(MessageDestination.class);
}