merge up to r1765852 from trunk
diff --git a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
index 48e3f9f..9b0fb56 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
@@ -61,52 +61,52 @@
     private static final class Sample
     {
         private final long _sampleId;
-        private final AtomicLong _count = new AtomicLong();
-        private final AtomicLong _total;
-        private final long _peak;
-        private final long _lastRate;
+        private final AtomicLong _sampleTotal = new AtomicLong();
+        private final AtomicLong _cumulativeTotal;
+        private final long _peakTotal;
+        private final long _previousSampleTotal;
         private final long _start;
         private final long _period;
 
         private Sample(final long period)
         {
             _period = period;
-            _total = new AtomicLong();
-            _peak = 0L;
-            _lastRate = 0L;
+            _cumulativeTotal = new AtomicLong(); // running total; samples built from this one reuse the same counter object
+            _peakTotal = 0L; // no earlier sample exists, so there is no peak yet
+            _previousSampleTotal = 0L; // no earlier sample exists to report
             _start = System.currentTimeMillis();
-            _sampleId = _start / period;
+            _sampleId = 0; // ids count whole periods elapsed since _start, so the first sample is 0
 
         }
 
         private Sample(final long timestamp, Sample priorSample)
         {
             _period = priorSample._period;
-            _sampleId = timestamp / _period;
-            _total = priorSample._total;
-            _peak = priorSample.getRate() > priorSample.getPeak() ? priorSample.getRate() : priorSample.getPeak();
-            _lastRate = priorSample.getRate();
+            _cumulativeTotal = priorSample._cumulativeTotal; // same AtomicLong instance as the prior sample, not a copy
+            _peakTotal = priorSample.getSampleTotal() > priorSample.getPeakSampleTotal() ? priorSample.getSampleTotal() : priorSample.getPeakSampleTotal(); // larger of the prior sample's own total and the peak it carried
+            _previousSampleTotal = priorSample.getSampleTotal(); // snapshot of the prior sample's total at the moment this sample is created
             _start = priorSample._start;
+            _sampleId = (timestamp - _start) / _period; // whole periods elapsed since _start; assigned last because it reads _start and _period
         }
 
-        public long getTotal()
+        public long getCumulativeTotal()
         {
-            return _total.get();
+            return _cumulativeTotal.get(); // this counter is inherited from the prior sample by the two-argument constructor
         }
 
-        public long getRate()
+        public long getSampleTotal()
         {
-            return _count.get();
+            return _sampleTotal.get(); // sum of the values recorded against this sample's own id
         }
 
-        public long getPeak()
+        public long getPeakSampleTotal()
         {
-            return _peak;
+            return _peakTotal; // fixed at construction: the largest per-sample total carried forward from earlier samples
         }
 
-        public long getLastRate()
+        public long getPreviousSampleTotal()
         {
-            return _lastRate;
+            return _previousSampleTotal; // fixed at construction: the prior sample's total as it stood when this sample was created
         }
 
         public long getStart()
@@ -118,15 +118,15 @@
         {
             if(timestamp >= _start)
             {
-                long eventSampleId = timestamp / _period;
+                long eventSampleId = (timestamp - _start) / _period;
                 if(eventSampleId > _sampleId)
                 {
                     return false;
                 }
-                _total.addAndGet(value);
+                _cumulativeTotal.addAndGet(value);
                 if(eventSampleId == _sampleId)
                 {
-                    _count.addAndGet(value);
+                    _sampleTotal.addAndGet(value);
                 }
                 return true;
             }
@@ -179,7 +179,7 @@
     public double getPeak()
     {
         update();
-        return (double) getSample().getPeak() / ((double) _period / 1000.0d);
+        return (double) getSample().getPeakSampleTotal() / ((double) _period / 1000.0d); // peak per-sample total divided by (_period / 1000); presumably a per-second rate with _period in ms - confirm
     }
 
     private Sample getSample()
@@ -190,12 +190,12 @@
     public double getRate()
     {
         update();
-        return (double) getSample().getLastRate() / ((double) _period / 1000.0d);
+        return (double) getSample().getPreviousSampleTotal() / ((double) _period / 1000.0d); // previous sample's total divided by (_period / 1000); presumably a per-second rate with _period in ms - confirm
     }
 
     public long getTotal()
     {
-        return getSample().getTotal();
+        return getSample().getCumulativeTotal(); // unlike getPeak()/getRate(), no update() call precedes this read
     }
 
     public long getStart()
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
index 8855e45..faa327f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
@@ -22,16 +22,20 @@
 
 import java.io.IOException;
 import java.nio.channels.ServerSocketChannel;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.configuration.CommonProperties;
 import org.apache.qpid.transport.TransportException;
 
 public class NetworkConnectionScheduler
@@ -204,7 +208,31 @@
 
     public void cancelAcceptingSocket(final ServerSocketChannel serverSocket)
     {
-        _selectorThread.cancelAcceptingSocket(serverSocket);
+        Future<Void> result = cancelAcceptingSocketAsync(serverSocket); // hand the cancellation to the selector thread, then wait on the returned future below
+        try
+        {
+            result.get(Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, // wait limit comes from this system property...
+                                          CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT), // ...or this default when the property is unset or not an integer
+                       TimeUnit.MILLISECONDS); // the limit is interpreted as milliseconds
+        }
+        catch (InterruptedException e)
+        {
+            LOGGER.warn("Cancellation of accepting socket was interrupted");
+            Thread.currentThread().interrupt(); // restore the interrupt flag so callers further up can observe it
+        }
+        catch (ExecutionException e)
+        {
+            LOGGER.warn("Cancellation of accepting socket failed", e.getCause()); // log the underlying failure rather than the ExecutionException wrapper
+        }
+        catch (TimeoutException e)
+        {
+            LOGGER.warn("Cancellation of accepting socket timed out"); // only the wait is abandoned; nothing here withdraws the pending cancellation
+        }
+    }
+
+    private Future<Void> cancelAcceptingSocketAsync(final ServerSocketChannel serverSocket)
+    {
+        return _selectorThread.cancelAcceptingSocket(serverSocket); // non-blocking: returns the future without waiting on it
     }
 
     public void addConnection(final SchedulableConnection connection)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
index f535347..157e15b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
@@ -36,10 +36,12 @@
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -431,8 +433,9 @@
         _selectionTasks[0].wakeup();
     }
 
-    public void cancelAcceptingSocket(final ServerSocketChannel socketChannel)
+    public Future<Void> cancelAcceptingSocket(final ServerSocketChannel socketChannel)
     {
+        final SettableFuture<Void> cancellationResult = SettableFuture.create();
         _tasks.add(new Runnable()
         {
             @Override
@@ -443,14 +446,33 @@
                     LOGGER.debug("Cancelling selector on accepting port {} ",
                                  socketChannel.socket().getLocalSocketAddress());
                 }
-                SelectionKey selectionKey = socketChannel.keyFor(_selectionTasks[0].getSelector());
-                if (selectionKey != null)
+
+                try
                 {
-                    selectionKey.cancel();
+                    SelectionKey selectionKey = null;
+                    try
+                    {
+                        selectionKey = socketChannel.register(_selectionTasks[0].getSelector(), 0);
+                    }
+                    catch (ClosedChannelException e)
+                    {
+                        LOGGER.error("Failed to deregister selector on accepting port {}",
+                                     socketChannel.socket().getLocalSocketAddress(), e);
+                    }
+
+                    if (selectionKey != null)
+                    {
+                        selectionKey.cancel();
+                    }
+                }
+                finally
+                {
+                    cancellationResult.set(null);
                 }
             }
         });
         _selectionTasks[0].wakeup();
+        return cancellationResult;
     }
 
     @Override
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index 0e4d494..b308435 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -20,6 +20,10 @@
 */
 package org.apache.qpid.server.protocol.v0_10;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
@@ -31,10 +35,6 @@
 import org.apache.qpid.transport.MessageTransfer;
 import org.apache.qpid.transport.Struct;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 public class MessageMetaData_0_10 implements StorableMessageMetaData
 {
     private Header _header;
@@ -83,7 +83,7 @@
         return TYPE;
     }
 
-    public int getStorableSize()
+    public synchronized int getStorableSize()
     {
         QpidByteBuffer buf = _encoded;
 
@@ -93,13 +93,12 @@
             _encoded = buf;
         }
 
-        //TODO -- need to add stuff
         return buf.limit();
     }
 
     private QpidByteBuffer encodeAsBuffer()
     {
-        ServerEncoder encoder = new ServerEncoder(ENCODER_SIZE);
+        ServerEncoder encoder = new ServerEncoder(ENCODER_SIZE, false);
 
         encoder.writeInt64(_arrivalTime);
         encoder.writeInt32(_bodySize);
@@ -141,28 +140,18 @@
         return buf;
     }
 
-    public int writeToBuffer(QpidByteBuffer dest)
+    public synchronized int writeToBuffer(QpidByteBuffer dest) // shares this instance's monitor with getStorableSize() and clearEncodedForm()
     {
-        QpidByteBuffer buf = _encoded;
-
-        if(buf == null)
+        if (_encoded == null) // encode lazily when no cached encoded form exists
         {
-            buf = encodeAsBuffer();
-            _encoded = buf;
+            _encoded = encodeAsBuffer();
         }
-
-        buf = buf.duplicate();
-
-        buf.position(0);
-
-        if(dest.remaining() < buf.limit())
-        {
-            buf.limit(dest.remaining());
-        }
-        dest.put(buf);
-        final int length = buf.limit();
-        buf.dispose();
-        return length;
+        dest.put(_encoded); // NOTE(review): the removed code truncated to dest.remaining(); nothing here does - confirm dest is always large enough
+        final int bytesWritten = _encoded.limit(); // read the limit before clearEncodedForm() disposes the buffer
+        // We have special knowledge that the encoded form is no longer needed after this call,
+        // so free it here to reduce the memory usage associated with the metadata (QPID-7465)
+        clearEncodedForm();
+        return bytesWritten;
     }
 
     public int getContentSize()
@@ -178,13 +167,17 @@
     @Override
     public void dispose()
     {
-
+        clearEncodedForm(); // release the cached encoded buffer, if one exists
     }
 
     @Override
-    public void clearEncodedForm()
+    public synchronized void clearEncodedForm()
     {
-
+        if (_encoded != null) // nothing to release when no encoded form is cached, so repeated calls are harmless
+        {
+            _encoded.dispose();
+            _encoded = null; // drop the reference so the null checks elsewhere re-encode instead of reusing a disposed buffer
+        }
     }
 
     public String getRoutingKey()
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
index bd42b3f..655eafe 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
@@ -30,21 +30,23 @@
 
 public final class ServerEncoder extends AbstractEncoder
 {
-    public static final int DEFAULT_CAPACITY = 8192;
+    public static final int DEFAULT_CAPACITY = 256 * 1024;
+    private final boolean _useDirectMemory;
     private final int _threshold;
     private QpidByteBuffer _out;
     private int _initialCapacity;
 
     public ServerEncoder()
     {
-        this(DEFAULT_CAPACITY);
+        this(QpidByteBuffer.getPooledBufferSize(), true); // defaults: initial capacity from the pooled buffer size, useDirectMemory = true
     }
 
-    public ServerEncoder(int capacity)
+    public ServerEncoder(int capacity, boolean useDirectMemory)
     {
-        _initialCapacity = capacity;
-        _threshold = capacity/16;
-        _out = QpidByteBuffer.allocateDirect(capacity);
+        _useDirectMemory = useDirectMemory; // remembered so later reallocations use the same kind of buffer
+        _initialCapacity = (capacity > 0 ? capacity : DEFAULT_CAPACITY); // a non-positive capacity falls back to DEFAULT_CAPACITY
+        _threshold = Math.min(_initialCapacity/16, 256); // one sixteenth of the initial capacity, capped at 256
+        _out = QpidByteBuffer.allocate(useDirectMemory, _initialCapacity); // presumably the flag selects direct vs heap memory - confirm against QpidByteBuffer.allocate
     }
 
     public void init()
@@ -52,7 +54,7 @@
         if(_out.capacity() < _threshold)
         {
             _out.dispose();
-            _out = QpidByteBuffer.allocateDirect(_initialCapacity);
+            _out = QpidByteBuffer.allocate(_useDirectMemory, _initialCapacity);
         }
         else
         {
@@ -81,7 +83,7 @@
     {
         QpidByteBuffer old = _out;
         int capacity = old.capacity();
-        _out = QpidByteBuffer.allocateDirect(Math.max(Math.max(capacity + size, 2*capacity), _initialCapacity));
+        _out = QpidByteBuffer.allocate(_useDirectMemory, Math.max(Math.max(capacity + size, 2 * capacity), _initialCapacity));
         old.flip();
         _out.put(old);
         old.dispose();
diff --git a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
index 17f5abd..eec2a05 100644
--- a/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
+++ b/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
@@ -804,6 +804,11 @@
         _isPoolInitialized = true;
     }
 
+    public static int getPooledBufferSize()
+    {
+        return _pooledBufferSize; // NOTE(review): the value returned before the pool is initialised is not visible in this hunk - confirm callers cope with it
+    }
+
     private static final class BufferInputStream extends InputStream
     {
         private final QpidByteBuffer _qpidByteBuffer;