PROTON-2541 Finish work on providing more common base receiver API

Pulls up most of the common and repeated bits in receiver
implementations into the base receiver link type.
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
index a042c91..7bb52e0 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
@@ -17,6 +17,7 @@
 
 package org.apache.qpid.protonj2.client.impl;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
@@ -47,7 +48,7 @@
 public abstract class ClientLinkType<LinkType extends Link<LinkType>,
                                      ProtonType extends org.apache.qpid.protonj2.engine.Link<ProtonType>> implements Link<LinkType> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ClientLinkType.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     @SuppressWarnings("rawtypes")
     protected static final AtomicIntegerFieldUpdater<ClientLinkType> CLOSED_UPDATER =
@@ -246,20 +247,20 @@
         return openFuture;
     }
 
-    LinkType remotelyClosedHandler(Consumer<LinkType> handler) {
+    final LinkType remotelyClosedHandler(Consumer<LinkType> handler) {
         this.linkRemotelyClosedHandler = handler;
         return self();
     }
 
-    String getId() {
+    final String getId() {
         return linkId;
     }
 
-    void setFailureCause(ClientException failureCause) {
+    final void setFailureCause(ClientException failureCause) {
         this.failureCause = failureCause;
     }
 
-    ClientException getFailureCause() {
+    final ClientException getFailureCause() {
         if (failureCause == null) {
             return session.getFailureCause();
         } else {
@@ -267,11 +268,11 @@
         }
     }
 
-    boolean isClosed() {
+    final boolean isClosed() {
         return closed > 0;
     }
 
-    boolean isDynamic() {
+    final boolean isDynamic() {
         if (protonLink().isSender()) {
             return protonLink().getTarget() != null && protonLink().<org.apache.qpid.protonj2.types.messaging.Target>getTarget().isDynamic();
         } else {
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
index e025d35..91c7270 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
@@ -16,8 +16,8 @@
  */
 package org.apache.qpid.protonj2.client.impl;
 
+import java.lang.invoke.MethodHandles;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.qpid.protonj2.client.Delivery;
@@ -26,13 +26,11 @@
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
 import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
 import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
-import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
 import org.apache.qpid.protonj2.client.futures.ClientFuture;
 import org.apache.qpid.protonj2.client.util.FifoDeliveryQueue;
 import org.apache.qpid.protonj2.engine.IncomingDelivery;
 import org.apache.qpid.protonj2.types.messaging.Accepted;
 import org.apache.qpid.protonj2.types.messaging.Released;
-import org.apache.qpid.protonj2.types.transport.DeliveryState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,10 +39,7 @@
  */
 public final class ClientReceiver extends ClientReceiverLinkType<Receiver> implements Receiver {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ClientReceiver.class);
-
-    private ClientFuture<Receiver> drainingFuture;
-    private ScheduledFuture<?> drainingTimeout;
+    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     private final ReceiverOptions options;
     private final FifoDeliveryQueue messageQueue;
@@ -75,9 +70,9 @@
             ClientDelivery delivery = messageQueue.dequeue(units.toMillis(timeout));
             if (delivery != null) {
                 if (options.autoAccept()) {
-                    asyncApplyDisposition(delivery.protonDelivery(), Accepted.getInstance(), options.autoSettle());
-                } else {
-                    asyncReplenishCreditIfNeeded();
+                    disposition(delivery.protonDelivery(), Accepted.getInstance(), options.autoSettle());
+                } else if (options.creditWindow() > 0) {
+                    executor.execute(() -> replenishCreditIfNeeded());
                 }
 
                 return delivery;
@@ -100,8 +95,8 @@
         if (delivery != null) {
             if (options.autoAccept()) {
                 delivery.disposition(org.apache.qpid.protonj2.client.DeliveryState.accepted(), options.autoSettle());
-            } else {
-                asyncReplenishCreditIfNeeded();
+            } else if (options.creditWindow() > 0) {
+                executor.execute(() -> replenishCreditIfNeeded());
             }
         } else {
             checkClosedOrFailed();
@@ -172,24 +167,14 @@
     //----- Internal API for the ClientReceiver and other Client objects
 
     @Override
-    void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException {
-        checkClosedOrFailed();
-        asyncApplyDisposition(delivery, state, settle);
-    }
-
-    @Override
-    boolean isDynamic() {
-        return protonReceiver.getSource() != null && protonReceiver.getSource().isDynamic();
-    }
-
-    @Override
     protected Receiver self() {
         return this;
     }
 
     //----- Handlers for proton receiver events
 
-    private void handleDeliveryReceived(IncomingDelivery delivery) {
+    @Override
+    protected void handleDeliveryRead(IncomingDelivery delivery) {
         LOG.trace("Delivery data was received: {}", delivery);
 
         if (delivery.getDefaultDeliveryState() == null) {
@@ -197,47 +182,17 @@
         }
 
         if (!delivery.isPartial()) {
-            LOG.trace("{} has incoming Message(s).", this);
+            LOG.trace("Receiver {} has incoming Message(s).", linkId);
             messageQueue.enqueue(new ClientDelivery(this, delivery));
         } else {
             delivery.claimAvailableBytes();
         }
     }
 
-    private void handleDeliveryAborted(IncomingDelivery delivery) {
-        LOG.trace("Delivery data was aborted: {}", delivery);
-        delivery.settle();
-        replenishCreditIfNeeded();
-    }
-
-    private void handleDeliveryStateRemotelyUpdated(IncomingDelivery delivery) {
-        LOG.trace("Delivery remote state was updated: {}", delivery);
-    }
-
-    private void handleReceiverCreditUpdated(org.apache.qpid.protonj2.engine.Receiver receiver) {
-        LOG.trace("Receiver credit update by remote: {}", receiver);
-
-        if (drainingFuture != null) {
-            if (receiver.getCredit() == 0) {
-                drainingFuture.complete(this);
-                if (drainingTimeout != null) {
-                    drainingTimeout.cancel(false);
-                    drainingTimeout = null;
-                }
-            }
-        }
-    }
-
     //----- Private implementation details
 
-    private void asyncApplyDisposition(IncomingDelivery delivery, DeliveryState state, boolean settle) {
-        executor.execute(() -> {
-            session.getTransactionContext().disposition(delivery, state, settle);
-            replenishCreditIfNeeded();
-        });
-    }
-
-    private void replenishCreditIfNeeded() {
+    @Override
+    protected void replenishCreditIfNeeded() {
         int creditWindow = options.creditWindow();
         if (creditWindow > 0) {
             int currentCredit = protonReceiver.getCredit();
@@ -247,7 +202,7 @@
                 if (potentialPrefetch <= creditWindow * 0.7) {
                     int additionalCredit = creditWindow - potentialPrefetch;
 
-                    LOG.trace("Consumer granting additional credit: {}", additionalCredit);
+                    LOG.trace("Receiver {} granting additional credit: {}", linkId, additionalCredit);
                     try {
                         protonReceiver.addCredit(additionalCredit);
                     } catch (Exception ex) {
@@ -258,13 +213,6 @@
         }
     }
 
-    private void asyncReplenishCreditIfNeeded() {
-        int creditWindow = options.creditWindow();
-        if (creditWindow > 0) {
-            executor.execute(() -> replenishCreditIfNeeded());
-        }
-    }
-
     @Override
     protected void recreateLinkForReconnect() {
         int previousCredit = protonReceiver.getCredit() + messageQueue.size();
@@ -288,35 +236,8 @@
     }
 
     @Override
-    protected void linkSpecificCleanupHandler(ClientException failureCause) {
-        if (drainingTimeout != null) {
-            drainingFuture.failed(
-                failureCause != null ? failureCause : new ClientResourceRemotelyClosedException("The Receiver has been closed"));
-            drainingTimeout.cancel(false);
-            drainingTimeout = null;
-        }
-    }
-
-    @Override
-    protected void linkSpecificLocalOpenHandler() {
-        protonReceiver.deliveryStateUpdatedHandler(this::handleDeliveryStateRemotelyUpdated)
-                      .deliveryReadHandler(this::handleDeliveryReceived)
-                      .deliveryAbortedHandler(this::handleDeliveryAborted)
-                      .creditStateUpdateHandler(this::handleReceiverCreditUpdated);
-    }
-
-    @Override
     protected void linkSpecificLocalCloseHandler() {
         messageQueue.stop();  // Ensure blocked receivers are all unblocked.
-    }
-
-    @Override
-    protected void linkSpecificRemoteOpenHandler() {
-        replenishCreditIfNeeded();
-    }
-
-    @Override
-    protected void linkSpecificRemoteCloseHandler() {
-        // Nothing needed for receiver link remote close
+        messageQueue.clear();
     }
 }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java
index a2a00fa..76df24e 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java
@@ -17,12 +17,19 @@
 
 package org.apache.qpid.protonj2.client.impl;
 
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.ScheduledFuture;
+
 import org.apache.qpid.protonj2.client.Link;
 import org.apache.qpid.protonj2.client.LinkOptions;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
+import org.apache.qpid.protonj2.client.futures.ClientFuture;
 import org.apache.qpid.protonj2.engine.IncomingDelivery;
 import org.apache.qpid.protonj2.engine.Receiver;
 import org.apache.qpid.protonj2.types.transport.DeliveryState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Base class for client link types that wrap a proton receiver to provide
@@ -30,6 +37,11 @@
  */
 public abstract class ClientReceiverLinkType<ReceiverType extends Link<ReceiverType>> extends ClientLinkType<ReceiverType, Receiver> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    protected ClientFuture<ReceiverType> drainingFuture;
+    protected ScheduledFuture<?> drainingTimeout;
+
     protected Receiver protonReceiver;
 
     protected ClientReceiverLinkType(ClientSession session, String linkId, LinkOptions<?> options, Receiver protonReceiver) {
@@ -55,6 +67,78 @@
      *
      * @throws ClientException if an error occurs while applying the disposition to the delivery.
      */
-    abstract void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException;
+    void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException {
+        checkClosedOrFailed();
+        executor.execute(() -> {
+            session.getTransactionContext().disposition(delivery, state, settle);
+            replenishCreditIfNeeded();
+        });
+    }
 
+    //----- Abstract API that receiver must implement as they are implementation specific
+
+    protected abstract void replenishCreditIfNeeded();
+
+    protected abstract void handleDeliveryRead(IncomingDelivery delivery);
+
+    //----- API that receiver may override if they need additional handling
+
+    protected void handleDeliveryAborted(IncomingDelivery delivery) {
+        LOG.trace("Delivery data was aborted: {}", delivery);
+        delivery.settle();
+        replenishCreditIfNeeded();
+    }
+
+    protected void handleDeliveryStateRemotelyUpdated(IncomingDelivery delivery) {
+        LOG.trace("Delivery remote state was updated: {}", delivery);
+    }
+
+    protected void handleReceiverCreditUpdated(org.apache.qpid.protonj2.engine.Receiver receiver) {
+        LOG.trace("Receiver credit update by remote: {}", receiver);
+
+        if (drainingFuture != null) {
+            if (receiver.getCredit() == 0) {
+                drainingFuture.complete(self());
+                if (drainingTimeout != null) {
+                    drainingTimeout.cancel(false);
+                    drainingTimeout = null;
+                }
+            }
+        }
+    }
+
+    //----- Default receiver link handling of proton engine events
+
+    @Override
+    protected void linkSpecificLocalOpenHandler() {
+        protonLink().deliveryStateUpdatedHandler(this::handleDeliveryStateRemotelyUpdated)
+                    .deliveryReadHandler(this::handleDeliveryRead)
+                    .deliveryAbortedHandler(this::handleDeliveryAborted)
+                    .creditStateUpdateHandler(this::handleReceiverCreditUpdated);
+    }
+
+    @Override
+    protected void linkSpecificRemoteOpenHandler() {
+        replenishCreditIfNeeded();
+    }
+
+    @Override
+    protected void linkSpecificLocalCloseHandler() {
+        // Nothing needed for local close handling
+    }
+
+    @Override
+    protected void linkSpecificRemoteCloseHandler() {
+        // Nothing needed for receiver link remote close
+    }
+
+    @Override
+    protected void linkSpecificCleanupHandler(ClientException failureCause) {
+        if (drainingTimeout != null) {
+            drainingFuture.failed(
+                failureCause != null ? failureCause : new ClientResourceRemotelyClosedException("The Receiver has been closed"));
+            drainingTimeout.cancel(false);
+            drainingTimeout = null;
+        }
+    }
 }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
index 9ab35c8..067b4b8 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
@@ -16,6 +16,7 @@
  */
 package org.apache.qpid.protonj2.client.impl;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -34,7 +35,6 @@
 import org.apache.qpid.protonj2.client.futures.ClientFuture;
 import org.apache.qpid.protonj2.engine.IncomingDelivery;
 import org.apache.qpid.protonj2.types.messaging.Released;
-import org.apache.qpid.protonj2.types.transport.DeliveryState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +43,7 @@
  */
 public final class ClientStreamReceiver extends ClientReceiverLinkType<StreamReceiver> implements StreamReceiver {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ClientReceiver.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     private ClientFuture<StreamReceiver> drainingFuture;
     private ScheduledFuture<?> drainingTimeout;
@@ -102,7 +102,9 @@
                     }
                 } else {
                     receive.complete(new ClientStreamDelivery(this, delivery));
-                    asyncReplenishCreditIfNeeded();
+                    if (options.creditWindow() > 0) {
+                        executor.execute(() -> replenishCreditIfNeeded());
+                    }
                 }
             }
         });
@@ -177,17 +179,10 @@
 
         executor.execute(() -> {
             if (notClosedOrFailed(request)) {
-                int queued = 0;
-
                 // Scan for an unsettled delivery that isn't yet assigned to a client delivery
                 // either it is a complete delivery or the initial stage of the next incoming
-                for (IncomingDelivery unsettled : protonReceiver.unsettled()) {
-                    if (unsettled.getLinkedResource() == null) {
-                        queued++;
-                    }
-                }
-
-                request.complete(queued);
+                request.complete((int)
+                    protonReceiver.unsettled().stream().filter(delivery -> delivery.getLinkedResource() == null).count());
             }
         });
 
@@ -200,9 +195,15 @@
         return options;
     }
 
+    @Override
+    protected StreamReceiver self() {
+        return this;
+    }
+
     //----- Handlers for proton receiver events
 
-    private void handleDeliveryRead(IncomingDelivery delivery) {
+    @Override
+    protected void handleDeliveryRead(IncomingDelivery delivery) {
         LOG.trace("Delivery data was received: {}", delivery);
         if (delivery.getDefaultDeliveryState() == null) {
             delivery.setDefaultDeliveryState(Released.getInstance());
@@ -223,31 +224,9 @@
                     entry.getKey().complete(new ClientStreamDelivery(this, delivery));
                 } finally {
                     entries.remove();
-                    asyncReplenishCreditIfNeeded();
-                }
-            }
-        }
-    }
-
-    private void handleDeliveryAborted(IncomingDelivery delivery) {
-        LOG.trace("Delivery data was aborted: {}", delivery);
-        delivery.settle();
-        replenishCreditIfNeeded();
-    }
-
-    private void handleDeliveryStateRemotelyUpdated(IncomingDelivery delivery) {
-        LOG.trace("Delivery remote state was updated: {}", delivery);
-    }
-
-    private void handleReceiverCreditUpdated(org.apache.qpid.protonj2.engine.Receiver receiver) {
-        LOG.trace("Receiver credit update by remote: {}", receiver);
-
-        if (drainingFuture != null) {
-            if (receiver.getCredit() == 0) {
-                drainingFuture.complete(this);
-                if (drainingTimeout != null) {
-                    drainingTimeout.cancel(false);
-                    drainingTimeout = null;
+                    if (options.creditWindow() > 0) {
+                        executor.execute(() -> replenishCreditIfNeeded());
+                    }
                 }
             }
         }
@@ -256,26 +235,7 @@
     //----- Private implementation details
 
     @Override
-    void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException {
-        checkClosedOrFailed();
-        asyncApplyDisposition(delivery, state, settle);
-    }
-
-    private void asyncApplyDisposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException {
-        executor.execute(() -> {
-            session.getTransactionContext().disposition(delivery, state, settle);
-            replenishCreditIfNeeded();
-        });
-    }
-
-    private void asyncReplenishCreditIfNeeded() {
-        int creditWindow = options.creditWindow();
-        if (creditWindow > 0) {
-            executor.execute(() -> replenishCreditIfNeeded());
-        }
-    }
-
-    private void replenishCreditIfNeeded() {
+    protected void replenishCreditIfNeeded() {
         int creditWindow = options.creditWindow();
         if (creditWindow > 0) {
             int currentCredit = protonReceiver.getCredit();
@@ -325,40 +285,7 @@
             }
         });
 
-        if (drainingTimeout != null) {
-            drainingFuture.failed(
-                failureCause != null ? failureCause : new ClientResourceRemotelyClosedException("The Receiver has been closed"));
-            drainingTimeout.cancel(false);
-            drainingTimeout = null;
-        }
-    }
-
-    @Override
-    protected StreamReceiver self() {
-        return this;
-    }
-
-    @Override
-    protected void linkSpecificLocalOpenHandler() {
-        protonReceiver.deliveryStateUpdatedHandler(this::handleDeliveryStateRemotelyUpdated)
-                      .deliveryReadHandler(this::handleDeliveryRead)
-                      .deliveryAbortedHandler(this::handleDeliveryAborted)
-                      .creditStateUpdateHandler(this::handleReceiverCreditUpdated);
-    }
-
-    @Override
-    protected void linkSpecificLocalCloseHandler() {
-        // Nothing needed for local close handling
-    }
-
-    @Override
-    protected void linkSpecificRemoteOpenHandler() {
-        replenishCreditIfNeeded();
-    }
-
-    @Override
-    protected void linkSpecificRemoteCloseHandler() {
-        // Nothing needed for remote close handling
+        super.linkSpecificCleanupHandler(failureCause);
     }
 
     @Override