Merge pull request #76 from lukeabsent/AMQNET-722

AMQNET 722 little addition 
diff --git a/docs/configuration.md b/docs/configuration.md
index 8d0dd17..c414126 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -24,6 +24,11 @@
 - **nms.clientIdPrefix** Optional prefix value that is used for generated Client ID values when a new Connection is created for the JMS ConnectionFactory. The default prefix is 'ID:'.
 - **nms.connectionIdPrefix** Optional prefix value that is used for generated Connection ID values when a new Connection is created for the JMS ConnectionFactory. This connection ID is used when logging some information from the JMS Connection object so a configurable prefix can make breadcrumbing the logs easier. The default prefix is 'ID:'.
 - **nms.maxNewConnectionRatePerSec** Allowed approximated rate for how fast connection factory is allowed to create new connection. If there is more request, they will have to wait. Default value is -1 which means unlimited.
+- **nms.prefetchPolicy.all** Link credit value that will be assigned to new consumers of every category (queue, topic, queue browser and durable topic). It is effectively the number of messages a consumer can receive without acknowledging them. The default value is 1000.
+- **nms.prefetchPolicy.queuePrefetch** Link credit value that will be assigned to new consumers of a queue. The default value is 1000.
+- **nms.prefetchPolicy.topicPrefetch** Link credit value that will be assigned to new consumers of a topic. The default value is 1000.
+- **nms.prefetchPolicy.queueBrowserPrefetch** Link credit value that will be assigned to new queue browsers. The default value is 1000.
+- **nms.prefetchPolicy.durableTopicPrefetch** Link credit value that will be assigned to new consumers of a durable topic. The default value is 1000.
 
 ### TCP Transport Configuration options
 When connected to a remote using plain TCP these options configure the behaviour of the underlying socket. These options are appended to the connection URI along with the other configuration options, for example:
diff --git a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
index b656037..103a4cb 100644
--- a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
+++ b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
@@ -34,6 +34,13 @@
         public static readonly int DEFAULT_IDLE_TIMEOUT;
         public static readonly ushort DEFAULT_CHANNEL_MAX;
         public static readonly int DEFAULT_MAX_FRAME_SIZE;
+        public static readonly PrefetchPolicyInfo DEFAULT_PREFETCH_POLICY = new PrefetchPolicyInfo()
+        {
+            QueuePrefetch = 1000,
+            TopicPrefetch = 1000,
+            DurableTopicPrefetch = 1000,
+            QueueBrowserPrefetch = 1000
+        };
         public static double DEFAULT_MAX_NEW_CONNECTION_RATE_PER_SEC = -1;
 
         static NmsConnectionInfo()
@@ -70,6 +77,11 @@
         public bool DelayedDeliverySupported { get; set; }
         
         public bool SharedSubsSupported { get; set; }
+
+        public PrefetchPolicyInfo PrefetchPolicy { get; set; } = DEFAULT_PREFETCH_POLICY;
+        
+        
+        
         
 
         public void SetClientId(string clientId, bool explicitClientId)
@@ -101,4 +113,36 @@
             return $"[{nameof(NmsConnectionInfo)}] {nameof(Id)}: {Id}, {nameof(ConfiguredUri)}: {ConfiguredUri}";
         }
     }
+
+    public class PrefetchPolicyInfo
+    {
+        public int QueuePrefetch { get; set; }
+        public int TopicPrefetch { get; set; }
+        public int QueueBrowserPrefetch { get; set; }
+        public int DurableTopicPrefetch { get; set; }
+
+        public int All
+        {
+            set => QueuePrefetch = TopicPrefetch = QueueBrowserPrefetch = DurableTopicPrefetch = value;
+        }
+
+        internal PrefetchPolicyInfo Clone()
+        {
+            return (PrefetchPolicyInfo) this.MemberwiseClone();
+        }
+        
+        public int GetLinkCredit(IDestination destination, bool browser, bool durable)
+        {
+            if (destination.IsTopic)
+            {
+                if (durable) return DurableTopicPrefetch;
+                else return TopicPrefetch;
+            }
+            else
+            {
+                if (browser) return QueueBrowserPrefetch;
+                else return QueuePrefetch;
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/NmsConsumerInfo.cs b/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
index 70cf529..b5506c3 100644
--- a/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
+++ b/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
@@ -21,8 +21,6 @@
 {
     public class NmsConsumerInfo : INmsResource<NmsConsumerId>
     {
-        public static readonly int DEFAULT_CREDIT = 200;
-        
         public NmsConsumerInfo(NmsConsumerId consumerId)
         {
             Id = consumerId ?? throw new ArgumentNullException(nameof(consumerId), "Consumer ID cannot be null");
@@ -39,7 +37,7 @@
         public bool IsShared { get; set; }
         public bool LocalMessageExpiry { get; set; }
         public bool IsBrowser { get; set; }
-        public int LinkCredit { get; set; } = DEFAULT_CREDIT;
+        public int LinkCredit { get; set; }
         
         public bool HasSelector() => !string.IsNullOrWhiteSpace(Selector);
 
diff --git a/src/NMS.AMQP/NmsConnectionFactory.cs b/src/NMS.AMQP/NmsConnectionFactory.cs
index 6729bee..6212ddc 100644
--- a/src/NMS.AMQP/NmsConnectionFactory.cs
+++ b/src/NMS.AMQP/NmsConnectionFactory.cs
@@ -169,6 +169,11 @@
         public string ClientId { get; set; }

 

         /// <summary>

+        /// Gets or sets the prefetch policy (link credit values) applied to new consumers

+        /// </summary>

+        public PrefetchPolicyInfo PrefetchPolicy { get; set; } = NmsConnectionInfo.DEFAULT_PREFETCH_POLICY.Clone();

+

+        /// <summary>

         /// Sets the desired max rate of creating new connections by this factory.

         ///

         /// NOTE: During creating new connection if the rate is too high system will

@@ -333,7 +338,8 @@
                 RequestTimeout = RequestTimeout,

                 SendTimeout = SendTimeout,

                 CloseTimeout = CloseTimeout,

-                LocalMessageExpiry = LocalMessageExpiry

+                LocalMessageExpiry = LocalMessageExpiry,

+                PrefetchPolicy = PrefetchPolicy.Clone()

             };

 

             bool userSpecifiedClientId = ClientId != null;

diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index 63f6896..c214ee0 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -50,7 +50,7 @@
             {
                 session.Connection.CheckConsumeFromTemporaryDestination((NmsTemporaryDestination) destination);
             }
-
+         
             Info = new NmsConsumerInfo(consumerId)
             {
                 Destination = destination,
@@ -61,8 +61,8 @@
                 IsShared = IsSharedSubscription,
                 IsDurable = IsDurableSubscription,
                 IsBrowser =  IsBrowser,
-                LocalMessageExpiry = Session.Connection.ConnectionInfo.LocalMessageExpiry
-
+                LocalMessageExpiry = Session.Connection.ConnectionInfo.LocalMessageExpiry,
+                LinkCredit = Session.Connection.ConnectionInfo.PrefetchPolicy.GetLinkCredit(destination, IsBrowser, IsDurableSubscription)
             };
             deliveryTask = new MessageDeliveryTask(this);
         }
diff --git a/src/NMS.AMQP/Util/PropertyUtil.cs b/src/NMS.AMQP/Util/PropertyUtil.cs
index db7cd65..ac26ec7 100644
--- a/src/NMS.AMQP/Util/PropertyUtil.cs
+++ b/src/NMS.AMQP/Util/PropertyUtil.cs
@@ -52,28 +52,45 @@
 
         public static void SetProperties(object obj, StringDictionary properties, string propertyPrefix = PROPERTY_PREFIX)
         {
-            Dictionary<string, PropertyInfo> props = GetPropertiesForClass(obj);
             foreach (string rawkey in properties.Keys)
             {
-                string key = RemovePrefix(propertyPrefix, rawkey);
-                Tracer.DebugFormat("Searching for Property: \"{0}\"", key);
-                if (props.ContainsKey(key))
-                {
-                    Tracer.DebugFormat(
-                        "Assigning Property {0} to {1}.{2} with value {3}",
-                        key, obj.GetType().Namespace, obj.GetType().Name, properties[rawkey]
-                        );
-#if NET40
-                    if (props[key].GetSetMethod() != null)
-#else
-                    if(props[key].SetMethod!=null)
-#endif
-                        props[key].SetValue(obj, ConvertType(props[key].PropertyType, properties[rawkey]), null);
-                }
+                Tracer.DebugFormat("Searching for Property: \"{0}\"", rawkey);
+                var (currentObject, propertyInfo) = GetPropertyInfo(obj, propertyPrefix, rawkey);
 
+                if (propertyInfo != null)
+                {
+#if NET40
+                    if (propertyInfo.GetSetMethod() != null)
+#else
+                    if (propertyInfo.SetMethod != null)
+#endif
+                    {
+                        Tracer.DebugFormat(
+                            "Assigning Property {0} to {1}.{2} with value {3}",
+                            rawkey, obj.GetType().Namespace, obj.GetType().Name, properties[rawkey]
+                        );
+                        propertyInfo.SetValue(currentObject, ConvertType(propertyInfo.PropertyType, properties[rawkey]), null);
+                    }
+
+                }
             }
         }
 
+        private static (object,PropertyInfo) GetPropertyInfo(object obj, string propertyPrefix, string rawkey)
+        {
+            Object currentObject = null;
+            PropertyInfo propertyInfo = null;
+            
+            foreach (var propertyName in rawkey.Split(new[] {'.'}, StringSplitOptions.RemoveEmptyEntries))
+            {
+                currentObject = currentObject == null ? obj : propertyInfo.GetValue(currentObject);
+                string key = RemovePrefix(propertyPrefix, propertyName);
+                propertyInfo = GetPropertiesForClass(currentObject)[key];
+            }
+
+            return (currentObject, propertyInfo);
+        }
+
         public static StringDictionary GetProperties(object obj, string propertyPrefix = PROPERTY_PREFIX)
         {
             StringDictionary result = new StringDictionary();
diff --git a/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs b/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
index bf309c4..c742457 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
+++ b/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
@@ -48,10 +48,14 @@
             connection.Start();
             return connection;
         }
-        
-        protected IConnection CreateAmqpConnection(string clientId = null)
+
+        protected IConnection CreateAmqpConnection(string clientId = null, string options = null)
         {
             string brokerUri = Environment.GetEnvironmentVariable("NMS_AMQP_TEST_URI") ?? "amqp://127.0.0.1:5672";
+            if (options != null)
+            {
+                brokerUri += "?" + options;
+            }
             string userName = Environment.GetEnvironmentVariable("NMS_AMQP_TEST_CU") ?? "admin";
             string password = Environment.GetEnvironmentVariable("NMS_AMQP_TEST_CPWD") ?? "admin";
 
diff --git a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
index dc8c260..7b2de68 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
+++ b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
@@ -60,7 +60,47 @@
             messageConsumer.Close();
             
         }
+        
+        [Test, Timeout(60_000)]
+        public void TestConsumerCredit()
+        {
+            PurgeQueue(TimeSpan.FromMilliseconds(500));
 
+            Connection = CreateAmqpConnection(options: "nms.prefetchPolicy.all=3");
+            Connection.Start();
+
+            ISession session = Connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+            IQueue queue = session.GetQueue(TestName);
+            IMessageConsumer messageConsumer = session.CreateConsumer(queue);
+            ConcurrentBag<IMessage> messages = new ConcurrentBag<IMessage>();
+            CountdownEvent countdownReceived = new CountdownEvent(4);
+            messageConsumer.Listener += message =>
+            {
+                messages.Add(message);
+                try
+                {
+                    countdownReceived.Signal();
+                }
+                catch (Exception)
+                {
+                    // Signal() throws once the count has already reached zero; we don't care about extra messages
+                }
+            };
+            
+            IMessageProducer producer = session.CreateProducer(queue);
+            Enumerable.Range(0, 100).ToList().ForEach(nr => producer.Send(session.CreateTextMessage("hello")));
+            
+            // Wait for a fourth message to be received, which should never happen with a credit of 3
+            Assert.IsFalse(countdownReceived.Wait(500));
+            Assert.AreEqual(3, messages.Count);
+            
+            // Once we ack messages we should start receiving another 3
+            messages.ToList().ForEach(m => m.Acknowledge());
+            // We just wait to see if 4th message arrived
+            Assert.IsTrue(countdownReceived.Wait(500));
+        }
+
+        
         [Test, Timeout(60_000)]
         public void TestSelectorsWithJMSType()
         {
diff --git a/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs b/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
index 3962e36..096e53f 100644
--- a/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
+++ b/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
@@ -120,7 +120,8 @@
                                 "&nms.requestTimeout=1000" +
                                 "&nms.sendTimeout=1000" +
                                 "&nms.closeTimeout=2000" +
-                                "&nms.localMessageExpiry=false";
+                                "&nms.localMessageExpiry=false" +
+                                "&nms.prefetchPolicy.all=55";
 
             NmsConnectionFactory factory = new NmsConnectionFactory(new Uri(configuredUri));
 
@@ -132,6 +133,46 @@
             Assert.AreEqual(1000, factory.RequestTimeout);
             Assert.AreEqual(1000, factory.SendTimeout);
             Assert.AreEqual(2000, factory.CloseTimeout);
+            Assert.AreEqual(55, factory.PrefetchPolicy.QueuePrefetch);
+            Assert.AreEqual(55, factory.PrefetchPolicy.TopicPrefetch);
+            Assert.AreEqual(55, factory.PrefetchPolicy.DurableTopicPrefetch);
+            Assert.AreEqual(55, factory.PrefetchPolicy.QueueBrowserPrefetch);
+            Assert.IsFalse(factory.LocalMessageExpiry);
+        }
+        
+        [Test]
+        public void TestSetPrefetchPolicyPropertiesFromUri()
+        {
+            string baseUri = "amqp://localhost:1234";
+            string configuredUri = baseUri +
+                                   "?nms.username=user" +
+                                   "&nms.password=password" +
+                                   "&nms.clientId=client" +
+                                   "&nms.connectionIdPrefix=ID:TEST" +
+                                   "&nms.clientIDPrefix=clientId" +
+                                   "&nms.requestTimeout=1000" +
+                                   "&nms.sendTimeout=1000" +
+                                   "&nms.closeTimeout=2000" +
+                                   "&nms.localMessageExpiry=false" +
+                                   "&nms.prefetchPolicy.queuePrefetch=11" +
+                                   "&nms.prefetchPolicy.topicPrefetch=22" +
+                                   "&nms.prefetchPolicy.durableTopicPrefetch=33" +
+                                   "&nms.prefetchPolicy.queueBrowserPrefetch=44";
+
+            NmsConnectionFactory factory = new NmsConnectionFactory(new Uri(configuredUri));
+
+            Assert.AreEqual("user", factory.UserName);
+            Assert.AreEqual("password", factory.Password);
+            Assert.AreEqual("client", factory.ClientId);
+            Assert.AreEqual("ID:TEST", factory.ConnectionIdPrefix);
+            Assert.AreEqual("clientId", factory.ClientIdPrefix);
+            Assert.AreEqual(1000, factory.RequestTimeout);
+            Assert.AreEqual(1000, factory.SendTimeout);
+            Assert.AreEqual(2000, factory.CloseTimeout);
+            Assert.AreEqual(11, factory.PrefetchPolicy.QueuePrefetch);
+            Assert.AreEqual(22, factory.PrefetchPolicy.TopicPrefetch);
+            Assert.AreEqual(33, factory.PrefetchPolicy.DurableTopicPrefetch);
+            Assert.AreEqual(44, factory.PrefetchPolicy.QueueBrowserPrefetch);
             Assert.IsFalse(factory.LocalMessageExpiry);
         }
 
diff --git a/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs b/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
index b4f9885..5445d5e 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
@@ -59,6 +59,130 @@
         }
 
         [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditAll()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = await EstablishConnectionAsync(testPeer, "nms.prefetchPolicy.all=5");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => Assert.AreEqual(5, credit));
+
+                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = await session.GetQueueAsync("myQueue");
+                IMessageConsumer consumer = await session.CreateConsumerAsync(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                await consumer.CloseAsync();
+
+                testPeer.ExpectClose();
+                await connection.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditQueuePrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = await EstablishConnectionAsync(testPeer, "nms.prefetchPolicy.queuePrefetch=6");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => Assert.AreEqual(6, credit));
+
+                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = await session.GetQueueAsync("myQueue");
+                IMessageConsumer consumer = await session.CreateConsumerAsync(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                await consumer.CloseAsync();
+
+                testPeer.ExpectClose();
+                await connection.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditTopicPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = await EstablishConnectionAsync(testPeer, "nms.prefetchPolicy.topicPrefetch=7");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => Assert.AreEqual(7, credit));
+
+                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
+                ITopic topic = await session.GetTopicAsync("myTopic");
+                IMessageConsumer consumer = await session.CreateConsumerAsync(topic);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                await consumer.CloseAsync();
+
+                testPeer.ExpectClose();
+                await connection.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditDurableTopicPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = await EstablishConnectionAsync(testPeer, "nms.prefetchPolicy.durableTopicPrefetch=8");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => Assert.AreEqual(8, credit));
+
+                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
+                ITopic topic = await session.GetTopicAsync("myTopic");
+                IMessageConsumer consumer = await session.CreateDurableConsumerAsync(topic, "durableName");
+
+                testPeer.ExpectDetach(expectClosed: false, sendResponse: true, replyClosed: false);
+                await consumer.CloseAsync();
+
+                testPeer.ExpectClose();
+                await connection.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditQueueBrowserPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = await EstablishConnectionAsync(testPeer, "nms.prefetchPolicy.queueBrowserPrefetch=9");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach(Assert.IsNotNull, Assert.IsNotNull, Assert.IsNotNull, true);
+                testPeer.ExpectLinkFlow(false, false, credit => Assert.AreEqual(9, credit));
+
+                ISession session = await connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = await session.GetQueueAsync("myQueue");
+                IQueueBrowser consumer = await session.CreateBrowserAsync(queue);
+                
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                
+                // To cause actual creation of consumer, after iteration consumer would be closed
+                foreach (var o in consumer)
+                {
+                }
+
+                testPeer.ExpectClose();
+                await connection.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
         public async Task TestRemotelyCloseConsumer()
         {
             Mock<INmsConnectionListener> mockConnectionListener = new Mock<INmsConnectionListener>();
diff --git a/test/Apache-NMS-AMQP-Test/Integration/Async/FailoverIntegrationTestAsync.cs b/test/Apache-NMS-AMQP-Test/Integration/Async/FailoverIntegrationTestAsync.cs
index 9cae2d9..0f5397d 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/Async/FailoverIntegrationTestAsync.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/Async/FailoverIntegrationTestAsync.cs
@@ -23,6 +23,7 @@
 using Amqp.Types;
 using Apache.NMS;
 using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Meta;
 using Moq;
 using NLog;
 using NMS.AMQP.Test.TestAmqp;
@@ -617,7 +618,7 @@
                 finalPeer.ExpectBegin();
                 finalPeer.ExpectBegin();
                 finalPeer.ExpectReceiverAttach();
-                finalPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false, creditMatcher: credit => Assert.AreEqual(credit, 200));
+                finalPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false, creditMatcher: credit => Assert.AreEqual(NmsConnectionInfo.DEFAULT_PREFETCH_POLICY.QueuePrefetch, credit));
                 finalPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
                 finalPeer.ExpectClose();
 
diff --git a/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
index a27b268..87659da 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
@@ -57,6 +57,131 @@
                 testPeer.WaitForAllMatchersToComplete(1000);
             }
         }
+        
+             [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditAll()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                INMSContext context = await EstablishNMSContextAsync(testPeer, "nms.prefetchPolicy.all=5");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => Assert.AreEqual(5, credit));
+
+                IQueue queue = await context.GetQueueAsync("myQueue");
+                INMSConsumer consumer = await context.CreateConsumerAsync(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                testPeer.ExpectEnd();
+                await consumer.CloseAsync();
+
+                testPeer.ExpectClose();
+                await context.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditQueuePrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                INMSContext context = await EstablishNMSContextAsync(testPeer, "nms.prefetchPolicy.queuePrefetch=6");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => Assert.AreEqual(6, credit));
+
+                IQueue queue = await context.GetQueueAsync("myQueue");
+                INMSConsumer consumer = await context.CreateConsumerAsync(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                testPeer.ExpectEnd();
+                await consumer.CloseAsync();
+
+                testPeer.ExpectClose();
+                await context.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditTopicPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                INMSContext context = await EstablishNMSContextAsync(testPeer, "nms.prefetchPolicy.topicPrefetch=7");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => Assert.AreEqual(7, credit));
+
+                ITopic topic = await context.GetTopicAsync("myTopic");
+                INMSConsumer consumer = await context.CreateConsumerAsync(topic);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                testPeer.ExpectEnd();
+                await consumer.CloseAsync();
+
+                testPeer.ExpectClose();
+                await context.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditDurableTopicPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                INMSContext context = await EstablishNMSContextAsync(testPeer, "nms.prefetchPolicy.durableTopicPrefetch=8");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => Assert.AreEqual(8, credit));
+
+                ITopic topic = await context.GetTopicAsync("myTopic");
+                INMSConsumer consumer = await context.CreateDurableConsumerAsync(topic, "durableName");
+
+                testPeer.ExpectDetach(expectClosed: false, sendResponse: true, replyClosed: false);
+                testPeer.ExpectEnd();
+                await consumer.CloseAsync();
+
+                testPeer.ExpectClose();
+                await context.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditQueueBrowserPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                INMSContext context = await EstablishNMSContextAsync(testPeer, "nms.prefetchPolicy.queueBrowserPrefetch=9");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach(Assert.IsNotNull, Assert.IsNotNull, Assert.IsNotNull, true);
+                testPeer.ExpectLinkFlow(false, false, credit => Assert.AreEqual(9, credit));
+
+                IQueue queue = await context.GetQueueAsync("myQueue");
+                IQueueBrowser consumer = await context.CreateBrowserAsync(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                testPeer.ExpectEnd();
+
+                // To cause actual creation of consumer, after iteration consumer would be closed
+                foreach (var o in consumer)
+                {
+                }
+
+                testPeer.ExpectClose();
+                await context.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+        
 
         // TODO No connection Listener in NMSContext
         // [Test, Timeout(20_000)]
diff --git a/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
index f42ba51..6bfb1f5 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
@@ -59,6 +59,131 @@
         }
 
         [Test, Timeout(20_000)]
+        public void TestConsumerCreditAll()
+        {
+            using (var peer = new TestAmqpPeer())
+            {
+                // nms.prefetchPolicy.all=5 is expected as a credit of 5 on the consumer's link flow.
+                IConnection connection = EstablishConnection(peer, "nms.prefetchPolicy.all=5");
+                peer.ExpectBegin();
+                peer.ExpectReceiverAttach();
+                peer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false, creditMatcher: credit => Assert.AreEqual(5, credit));
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IMessageConsumer consumer = session.CreateConsumer(session.GetQueue("myQueue"));
+
+                peer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                consumer.Close();
+
+                peer.ExpectClose();
+                connection.Close();
+
+                peer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditQueuePrefetch()
+        {
+            using (var peer = new TestAmqpPeer())
+            {
+                // nms.prefetchPolicy.queuePrefetch=6 is expected as a credit of 6 on the queue consumer's link flow.
+                IConnection connection = EstablishConnection(peer, "nms.prefetchPolicy.queuePrefetch=6");
+                peer.ExpectBegin();
+                peer.ExpectReceiverAttach();
+                peer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false, creditMatcher: credit => Assert.AreEqual(6, credit));
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IMessageConsumer consumer = session.CreateConsumer(session.GetQueue("myQueue"));
+
+                peer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                consumer.Close();
+
+                peer.ExpectClose();
+                connection.Close();
+
+                peer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditTopicPrefetch()
+        {
+            using (var peer = new TestAmqpPeer())
+            {
+                // nms.prefetchPolicy.topicPrefetch=7 is expected as a credit of 7 on the topic consumer's link flow.
+                IConnection connection = EstablishConnection(peer, "nms.prefetchPolicy.topicPrefetch=7");
+                peer.ExpectBegin();
+                peer.ExpectReceiverAttach();
+                peer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false, creditMatcher: credit => Assert.AreEqual(7, credit));
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IMessageConsumer consumer = session.CreateConsumer(session.GetTopic("myTopic"));
+
+                peer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                consumer.Close();
+
+                peer.ExpectClose();
+                connection.Close();
+
+                peer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditDurableTopicPrefetch()
+        {
+            using (var peer = new TestAmqpPeer())
+            {
+                // nms.prefetchPolicy.durableTopicPrefetch=8 is expected as a credit of 8 on the durable consumer's link flow.
+                IConnection connection = EstablishConnection(peer, "nms.prefetchPolicy.durableTopicPrefetch=8");
+                peer.ExpectBegin();
+                peer.ExpectReceiverAttach();
+                peer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false, creditMatcher: credit => Assert.AreEqual(8, credit));
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IMessageConsumer consumer = session.CreateDurableConsumer(session.GetTopic("myTopic"), "durableName");
+
+                peer.ExpectDetach(expectClosed: false, sendResponse: true, replyClosed: false);
+                consumer.Close();
+
+                peer.ExpectClose();
+                connection.Close();
+
+                peer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditQueueBrowserPrefetch()
+        {
+            using (var peer = new TestAmqpPeer())
+            {
+                // nms.prefetchPolicy.queueBrowserPrefetch=9 is expected as a credit of 9 on the browser link flow.
+                IConnection connection = EstablishConnection(peer, "nms.prefetchPolicy.queueBrowserPrefetch=9");
+                peer.ExpectBegin();
+                peer.ExpectReceiverAttach(Assert.IsNotNull, Assert.IsNotNull, Assert.IsNotNull, true);
+                peer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false, creditMatcher: credit => Assert.AreEqual(9, credit));
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueueBrowser browser = session.CreateBrowser(session.GetQueue("myQueue"));
+
+                peer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+
+                // Creating the browser alone does not create a consumer: enumerating it is what
+                // causes the actual consumer creation, and once the iteration is over that
+                // consumer is closed again - hence the detach expectation registered above.
+                foreach (var ignored in browser) { }
+
+                peer.ExpectClose();
+                connection.Close();
+
+                peer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+        
+
+        [Test, Timeout(20_000)]
         public void TestRemotelyCloseConsumer()
         {
             Mock<INmsConnectionListener> mockConnectionListener = new Mock<INmsConnectionListener>();
diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
index 42919cc..6d53c45 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
@@ -23,6 +23,7 @@
 using Amqp.Types;
 using Apache.NMS;
 using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Meta;
 using Moq;
 using NLog;
 using NMS.AMQP.Test.TestAmqp;
@@ -617,7 +618,7 @@
                 finalPeer.ExpectBegin();
                 finalPeer.ExpectBegin();
                 finalPeer.ExpectReceiverAttach();
-                finalPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false, creditMatcher: credit => Assert.AreEqual(credit, 200));
+                finalPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false, creditMatcher: credit => Assert.AreEqual(NmsConnectionInfo.DEFAULT_PREFETCH_POLICY.QueuePrefetch, credit));
                 finalPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
                 finalPeer.ExpectClose();
 
diff --git a/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs
index 542f720..0230f44 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs
@@ -58,6 +58,131 @@
             }
         }
 
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditAll()
+        {
+            using (var peer = new TestAmqpPeer())
+            {
+                // nms.prefetchPolicy.all=5 is expected as a credit of 5 on the consumer's link flow.
+                INMSContext context = EstablishNMSContext(peer, "nms.prefetchPolicy.all=5");
+                peer.ExpectBegin();
+                peer.ExpectReceiverAttach();
+                peer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false, creditMatcher: credit => Assert.AreEqual(5, credit));
+
+                INMSConsumer consumer = context.CreateConsumer(context.GetQueue("myQueue"));
+
+                peer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                peer.ExpectEnd();
+                consumer.Close();
+
+                peer.ExpectClose();
+                context.Close();
+
+                peer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditQueuePrefetch()
+        {
+            using (var peer = new TestAmqpPeer())
+            {
+                // nms.prefetchPolicy.queuePrefetch=6 is expected as a credit of 6 on the queue consumer's link flow.
+                INMSContext context = EstablishNMSContext(peer, "nms.prefetchPolicy.queuePrefetch=6");
+                peer.ExpectBegin();
+                peer.ExpectReceiverAttach();
+                peer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false, creditMatcher: credit => Assert.AreEqual(6, credit));
+
+                INMSConsumer consumer = context.CreateConsumer(context.GetQueue("myQueue"));
+
+                peer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                peer.ExpectEnd();
+                consumer.Close();
+
+                peer.ExpectClose();
+                context.Close();
+
+                peer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditTopicPrefetch()
+        {
+            using (var peer = new TestAmqpPeer())
+            {
+                // nms.prefetchPolicy.topicPrefetch=7 is expected as a credit of 7 on the topic consumer's link flow.
+                INMSContext context = EstablishNMSContext(peer, "nms.prefetchPolicy.topicPrefetch=7");
+                peer.ExpectBegin();
+                peer.ExpectReceiverAttach();
+                peer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false, creditMatcher: credit => Assert.AreEqual(7, credit));
+
+                INMSConsumer consumer = context.CreateConsumer(context.GetTopic("myTopic"));
+
+                peer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                peer.ExpectEnd();
+                consumer.Close();
+
+                peer.ExpectClose();
+                context.Close();
+
+                peer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditDurableTopicPrefetch()
+        {
+            using (var peer = new TestAmqpPeer())
+            {
+                // nms.prefetchPolicy.durableTopicPrefetch=8 is expected as a credit of 8 on the durable consumer's link flow.
+                INMSContext context = EstablishNMSContext(peer, "nms.prefetchPolicy.durableTopicPrefetch=8");
+                peer.ExpectBegin();
+                peer.ExpectReceiverAttach();
+                peer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false, creditMatcher: credit => Assert.AreEqual(8, credit));
+
+                INMSConsumer consumer = context.CreateDurableConsumer(context.GetTopic("myTopic"), "durableName");
+
+                peer.ExpectDetach(expectClosed: false, sendResponse: true, replyClosed: false);
+                peer.ExpectEnd();
+                consumer.Close();
+
+                peer.ExpectClose();
+                context.Close();
+
+                peer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditQueueBrowserPrefetch()
+        {
+            using (var peer = new TestAmqpPeer())
+            {
+                // nms.prefetchPolicy.queueBrowserPrefetch=9 is expected as a credit of 9 on the browser link flow.
+                INMSContext context = EstablishNMSContext(peer, "nms.prefetchPolicy.queueBrowserPrefetch=9");
+                peer.ExpectBegin();
+                peer.ExpectReceiverAttach(Assert.IsNotNull, Assert.IsNotNull, Assert.IsNotNull, true);
+                peer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: false, creditMatcher: credit => Assert.AreEqual(9, credit));
+
+                IQueueBrowser browser = context.CreateBrowser(context.GetQueue("myQueue"));
+
+                peer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true);
+                peer.ExpectEnd();
+
+                // Creating the browser alone does not create a consumer: enumerating it is what
+                // causes the actual consumer creation, and once the iteration is over that
+                // consumer is closed again - hence the detach/end expectations registered above.
+                foreach (var ignored in browser) { }
+
+                peer.ExpectClose();
+                context.Close();
+
+                peer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+        
+       
         // TODO No connection Listener in context
         // [Test, Timeout(20_000)]
         // public void TestRemotelyCloseConsumer()