PROTON-2600 Fix potential leak of tracked deliveries in the session

Ensure that if the receiver sets the disposition immediately in the
handler we don't add the delivery to the tracking map.
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
index 3ecfd5e..d3fbae2 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
@@ -2810,4 +2810,80 @@
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    @Test
+    public void testSyncReceiverAutoSettlingOnEachIncomingDelivery() throws Exception {
+        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World!"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond().withNextOutgoingId(0);
+            peer.expectAttach().respond();
+            peer.expectFlow().withLinkCredit(3);
+            peer.remoteTransfer().withDeliveryId(0)
+                                 .withDeliveryTag(new byte[] {1})
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.expectDisposition().withFirst(0)
+                                    .withSettled(true)
+                                    .withState().accepted();
+            peer.remoteTransfer().withDeliveryId(1)
+                                 .withDeliveryTag(new byte[] {2})
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.expectDisposition().withFirst(1)
+                                    .withSettled(true)
+                                    .withState().accepted();
+            peer.remoteTransfer().withDeliveryId(2)
+                                 .withDeliveryTag(new byte[] {3})
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.expectDisposition().withFirst(2)
+                                    .withSettled(true)
+                                    .withState().accepted();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            ReceiverOptions options = new ReceiverOptions();
+            options.creditWindow(0);
+            options.autoSettle(true);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
+            Session session = connection.openSession();
+            Receiver receiver = session.openReceiver("test-queue", options);
+
+            receiver.addCredit(3);
+
+            Delivery delivery1 = receiver.receive();
+            Wait.assertTrue(() -> delivery1.settled());
+            Delivery delivery2 = receiver.receive();
+            Wait.assertTrue(() -> delivery2.settled());
+            Delivery delivery3 = receiver.receive();
+            Wait.assertTrue(() -> delivery3.settled());
+
+            assertNotNull(delivery1);
+            assertNotNull(delivery2);
+            assertNotNull(delivery3);
+
+            peer.waitForScriptToComplete();
+            peer.expectDetach().respond();
+            peer.expectEnd().respond();
+            peer.expectClose().respond();
+
+            receiver.close();
+            session.close();
+            connection.close();
+
+            // Check post conditions and done.
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
 }
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
index 5e43a56..e6ec2a5 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
@@ -141,7 +141,7 @@
         nextIncomingId++;
 
         ProtonIncomingDelivery delivery = link.remoteTransfer(transfer, payload);
-        if (!delivery.isRemotelySettled() && delivery.isFirstTransfer()) {
+        if (!delivery.isSettled() && !delivery.isRemotelySettled() && delivery.isFirstTransfer()) {
             unsettled.put((int) delivery.getDeliveryId(), delivery);
         }
 
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMap.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMap.java
index 7cffd29..240ca81 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMap.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMap.java
@@ -55,7 +55,11 @@
     protected static final Comparator<UnsignedInteger> COMPARATOR = new UnsignedComparator();
     protected static final Comparator<UnsignedInteger> REVERSE_COMPARATOR = Collections.reverseOrder(COMPARATOR);
 
-    protected final RingQueue<SplayedEntry<E>> entryPool = new RingQueue<>(64);
+    protected final int DEFAULT_ENTRY_POOL_SIZE = 64;
+
+    protected final RingQueue<SplayedEntry<E>> entryPool = new RingQueue<>(DEFAULT_ENTRY_POOL_SIZE);
+
+    protected int entriesInExistence = 0;
 
     /**
      * Root node which can be null if the tree has no elements (size == 0)
@@ -154,14 +158,14 @@
         E oldValue = null;
 
         if (root == null) {
-            root = entryPool.poll(SplayMap::createEntry).initialize(key, value);
+            root = entryPool.poll(() -> createEntry()).initialize(key, value);
         } else {
             root = splay(root, key);
             if (root.key == key) {
                 oldValue = root.value;
                 root.value = value;
             } else {
-                final SplayedEntry<E> node = entryPool.poll(SplayMap::createEntry).initialize(key, value);
+                final SplayedEntry<E> node = entryPool.poll(() -> createEntry()).initialize(key, value);
 
                 if (compare(key, root.key) < 0) {
                     shiftRootRightOf(node);
@@ -198,13 +202,13 @@
      */
     public E putIfAbsent(int key, E value) {
         if (root == null) {
-            root = entryPool.poll(SplayMap::createEntry).initialize(key, value);
+            root = entryPool.poll(() -> createEntry()).initialize(key, value);
         } else {
             root = splay(root, key);
             if (root.key == key) {
                 return root.value;
             } else {
-                final SplayedEntry<E> node = entryPool.poll(SplayMap::createEntry).initialize(key, value);
+                final SplayedEntry<E> node = entryPool.poll(() -> createEntry()).initialize(key, value);
 
                 if (compare(key, root.key) < 0) {
                     shiftRootRightOf(node);
@@ -857,7 +861,7 @@
         return Integer.compareUnsigned(lhs, rhs);
     }
 
-    private static <E> SplayedEntry<E> createEntry() {
+    private SplayedEntry<E> createEntry() {
         return new SplayedEntry<>();
     }
 
@@ -1307,6 +1311,8 @@
     /**
      * An immutable {@link Map} entry that can be used when exposing raw entry mappings
      * via the {@link Map} API.
+     *
+     * @param <E> Type of the value portion of this immutable entry.
      */
     public static class ImmutableSplayMapEntry<E> implements Map.Entry<UnsignedInteger, E> {
 
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
index bedfcf6..b8be5c3 100644
--- a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
@@ -4592,4 +4592,66 @@
 
         assertNull(failure);
     }
+
+    @Test
+    public void testReceiveDeliveriesAndSendDispositionUponReceipt() {
+        Engine engine = EngineFactory.PROTON.createNonSaslEngine();
+        engine.errorHandler(result -> failure = result.failureCause());
+        ProtonTestConnector peer = createTestPeer(engine);
+
+        final byte[] payload = new byte[] { 1 };
+
+        peer.expectAMQPHeader().respondWithAMQPHeader();
+        peer.expectOpen().respond().withContainerId("driver");
+        peer.expectBegin().respond().withNextOutgoingId(0);
+        peer.expectAttach().respond();
+        peer.expectFlow().withLinkCredit(3);
+        peer.remoteTransfer().withDeliveryId(0)
+                             .withDeliveryTag(new byte[] {1})
+                             .withMore(false)
+                             .withMessageFormat(0)
+                             .withPayload(payload).queue();
+        peer.expectDisposition().withFirst(0)
+                                .withSettled(true)
+                                .withState().accepted();
+        peer.remoteTransfer().withDeliveryId(1)
+                             .withDeliveryTag(new byte[] {2})
+                             .withMore(false)
+                             .withMessageFormat(0)
+                             .withPayload(payload).queue();
+        peer.expectDisposition().withFirst(1)
+                                .withSettled(true)
+                                .withState().accepted();
+        peer.remoteTransfer().withDeliveryId(2)
+                             .withDeliveryTag(new byte[] {3})
+                             .withMore(false)
+                             .withMessageFormat(0)
+                             .withPayload(payload).queue();
+        peer.expectDisposition().withFirst(2)
+                                .withSettled(true)
+                                .withState().accepted();
+
+        Connection connection = engine.start().open();
+        Session session = connection.session().open();
+        Receiver receiver = session.receiver("receiver");
+        receiver.deliveryReadHandler((delivery) -> {
+            delivery.disposition(Accepted.getInstance(), true);
+        });
+
+        receiver.addCredit(3);
+        receiver.open();
+
+        peer.waitForScriptToComplete();
+        peer.expectDetach().respond();
+        peer.expectEnd().respond();
+        peer.expectClose().respond();
+
+        receiver.close();
+        session.close();
+        connection.close();
+
+        // Check post conditions and done.
+        peer.waitForScriptToComplete();
+        assertNull(failure);
+    }
 }