PROTON-2660 Create specialized tracking map for unsettled deliveries

Track deliveries in structure that creates little to no garbage compared
to the tree based strucutre currently used.
diff --git a/protonj2-performance-tests/src/main/java/org/apache/qpid/protonj2/engine/util/MapBenchmarkBase.java b/protonj2-performance-tests/src/main/java/org/apache/qpid/protonj2/engine/util/MapBenchmarkBase.java
index 712f26f..0a3ce98 100644
--- a/protonj2-performance-tests/src/main/java/org/apache/qpid/protonj2/engine/util/MapBenchmarkBase.java
+++ b/protonj2-performance-tests/src/main/java/org/apache/qpid/protonj2/engine/util/MapBenchmarkBase.java
@@ -23,6 +23,7 @@
 import org.apache.qpid.protonj2.types.UnsignedInteger;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.Mode;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
@@ -55,7 +56,7 @@
     protected Map<UnsignedInteger, String> map;
     protected Map<UnsignedInteger, String> filledMap;
 
-    @Setup
+    @Setup(Level.Invocation)
     public void init() {
         this.random.setSeed(System.currentTimeMillis());
         this.map = createMap();
diff --git a/protonj2-performance-tests/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMapBenchmark.java b/protonj2-performance-tests/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMapBenchmark.java
index 994fdee..2cc0aed 100644
--- a/protonj2-performance-tests/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMapBenchmark.java
+++ b/protonj2-performance-tests/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMapBenchmark.java
@@ -20,6 +20,7 @@
 
 import org.apache.qpid.protonj2.types.UnsignedInteger;
 import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.infra.Blackhole;
 import org.openjdk.jmh.runner.RunnerException;
@@ -37,7 +38,7 @@
     private SplayMap<String> sqFilledMap;
 
     @Override
-    @Setup
+    @Setup(Level.Invocation)
     public void init() {
         super.init();
 
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonIncomingDelivery.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonIncomingDelivery.java
index e3e63e2..65ef02f 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonIncomingDelivery.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonIncomingDelivery.java
@@ -303,6 +303,10 @@
         return deliveryId;
     }
 
+    int getDeliveryIdInt() {
+        return (int) deliveryId;
+    }
+
     ProtonIncomingDelivery aborted() {
         aborted = true;
 
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonOutgoingDelivery.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonOutgoingDelivery.java
index b550504..e6bc954 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonOutgoingDelivery.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonOutgoingDelivery.java
@@ -277,6 +277,10 @@
         return deliveryId;
     }
 
+    int getDeliveryIdInt() {
+        return (int) deliveryId;
+    }
+
     void setDeliveryId(long deliveryId) {
         this.deliveryId = deliveryId;
     }
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java
index 6b74d14..a0d4bd9 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java
@@ -31,8 +31,7 @@
 import org.apache.qpid.protonj2.engine.Session;
 import org.apache.qpid.protonj2.engine.exceptions.ProtocolViolationException;
 import org.apache.qpid.protonj2.engine.util.DeliveryIdTracker;
-import org.apache.qpid.protonj2.engine.util.LinkedSplayMap;
-import org.apache.qpid.protonj2.engine.util.SplayMap;
+import org.apache.qpid.protonj2.engine.util.UnsettledMap;
 import org.apache.qpid.protonj2.types.UnsignedInteger;
 import org.apache.qpid.protonj2.types.transport.Attach;
 import org.apache.qpid.protonj2.types.transport.DeliveryState;
@@ -54,7 +53,8 @@
 
     private final ProtonSessionIncomingWindow sessionWindow;
     private final DeliveryIdTracker currentDeliveryId = new DeliveryIdTracker();
-    private final SplayMap<ProtonIncomingDelivery> unsettled = new LinkedSplayMap<>();
+    private final UnsettledMap<ProtonIncomingDelivery> unsettled =
+        new UnsettledMap<ProtonIncomingDelivery>(ProtonIncomingDelivery::getDeliveryIdInt);
 
     private DeliveryState defaultDeliveryState;
     private LinkCreditState drainStateSnapshot;
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java
index a0ba9f6..e883969 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java
@@ -30,8 +30,7 @@
 import org.apache.qpid.protonj2.engine.Sender;
 import org.apache.qpid.protonj2.engine.Session;
 import org.apache.qpid.protonj2.engine.util.DeliveryIdTracker;
-import org.apache.qpid.protonj2.engine.util.LinkedSplayMap;
-import org.apache.qpid.protonj2.engine.util.SplayMap;
+import org.apache.qpid.protonj2.engine.util.UnsettledMap;
 import org.apache.qpid.protonj2.types.UnsignedInteger;
 import org.apache.qpid.protonj2.types.transport.Attach;
 import org.apache.qpid.protonj2.types.transport.DeliveryState;
@@ -48,7 +47,8 @@
 
     private final ProtonSessionOutgoingWindow sessionWindow;
     private final DeliveryIdTracker currentDeliveryId = new DeliveryIdTracker();
-    private final SplayMap<ProtonOutgoingDelivery> unsettled = new LinkedSplayMap<>();
+    private final UnsettledMap<ProtonOutgoingDelivery> unsettled =
+        new UnsettledMap<ProtonOutgoingDelivery>(ProtonOutgoingDelivery::getDeliveryIdInt);
 
     private EventHandler<OutgoingDelivery> deliveryUpdatedEventHandler = null;
     private EventHandler<Sender> linkCreditUpdatedHandler = null;
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
index e6ec2a5..59bf6be 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
@@ -16,14 +16,10 @@
  */
 package org.apache.qpid.protonj2.engine.impl;
 
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NavigableMap;
-
 import org.apache.qpid.protonj2.buffer.ProtonBuffer;
 import org.apache.qpid.protonj2.engine.exceptions.ProtocolViolationException;
 import org.apache.qpid.protonj2.engine.util.SequenceNumber;
-import org.apache.qpid.protonj2.engine.util.SplayMap;
+import org.apache.qpid.protonj2.engine.util.UnsettledMap;
 import org.apache.qpid.protonj2.types.UnsignedInteger;
 import org.apache.qpid.protonj2.types.transport.Begin;
 import org.apache.qpid.protonj2.types.transport.Disposition;
@@ -61,7 +57,8 @@
     private long maxFrameSize;
     private long incomingBytes;
 
-    private SplayMap<ProtonIncomingDelivery> unsettled = new SplayMap<>();
+    private UnsettledMap<ProtonIncomingDelivery> unsettled =
+        new UnsettledMap<>(ProtonIncomingDelivery::getDeliveryIdInt);
 
     public ProtonSessionIncomingWindow(ProtonSession session) {
         this.session = session;
@@ -174,33 +171,17 @@
         return disposition;
     }
 
-    private static void handleRangedDisposition(NavigableMap<UnsignedInteger, ProtonIncomingDelivery> unsettled, Disposition disposition) {
-        final UnsignedInteger first = UnsignedInteger.valueOf(disposition.getFirst());
-        final UnsignedInteger last = UnsignedInteger.valueOf(disposition.getLast());
-
-        // Dispositions cover a contiguous range in the map requires a single sub-map
-        // which we can iterate whereas a range that wraps requires two iterations over
-        // a split between the higher portion and the lower portion of the map.
-        if (first.compareTo(last) <= 0) {
-            handleDispositions(unsettled.subMap(first, true, last, true), disposition);
+    private static void handleRangedDisposition(UnsettledMap<ProtonIncomingDelivery> unsettled, Disposition disposition) {
+        // Dispositions cover a contiguous range in the map and since the tracker always moves forward
+        // when appending new deliveries the range can wrap without needing a second iteration.
+        if (disposition.getSettled()) {
+            unsettled.removeEach((int) disposition.getFirst(), (int) disposition.getLast(), (delivery) -> {
+                delivery.getLink().remoteDisposition(disposition, delivery);
+            });
         } else {
-            handleDispositions(unsettled.tailMap(first, true), disposition);
-            handleDispositions(unsettled.headMap(last, true), disposition);
-        }
-    }
-
-    private static void handleDispositions(Map<UnsignedInteger, ProtonIncomingDelivery> deliveries, Disposition disposition) {
-        final boolean settled = disposition.getSettled();
-
-        Iterator<ProtonIncomingDelivery> deliveriesIter = deliveries.values().iterator();
-        while (deliveriesIter.hasNext()) {
-            ProtonIncomingDelivery delivery = deliveriesIter.next();
-
-            if (settled) {
-                deliveriesIter.remove();
-            }
-
-            delivery.getLink().remoteDisposition(disposition, delivery);
+            unsettled.forEach((int) disposition.getFirst(), (int) disposition.getLast(), (delivery) -> {
+                delivery.getLink().remoteDisposition(disposition, delivery);
+            });
         }
     }
 
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java
index b60ef17..c25e5b4 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java
@@ -16,16 +16,12 @@
  */
 package org.apache.qpid.protonj2.engine.impl;
 
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NavigableMap;
 import java.util.Set;
 
 import org.apache.qpid.protonj2.buffer.ProtonBuffer;
 import org.apache.qpid.protonj2.engine.OutgoingAMQPEnvelope;
-import org.apache.qpid.protonj2.engine.util.SplayMap;
+import org.apache.qpid.protonj2.engine.util.UnsettledMap;
 import org.apache.qpid.protonj2.types.DeliveryTag;
-import org.apache.qpid.protonj2.types.UnsignedInteger;
 import org.apache.qpid.protonj2.types.transport.Begin;
 import org.apache.qpid.protonj2.types.transport.Disposition;
 import org.apache.qpid.protonj2.types.transport.Flow;
@@ -64,7 +60,8 @@
 
     private Runnable outgoingFrameWriteComplete = this::handleOutgoingFrameWriteComplete;
 
-    private final SplayMap<ProtonOutgoingDelivery> unsettled = new SplayMap<>();
+    private final UnsettledMap<ProtonOutgoingDelivery> unsettled =
+        new UnsettledMap<ProtonOutgoingDelivery>(ProtonOutgoingDelivery::getDeliveryIdInt);
 
     public ProtonSessionOutgoingWindow(ProtonSession session) {
         this.session = session;
@@ -237,33 +234,17 @@
         return disposition;
     }
 
-    private static void handleRangedDisposition(NavigableMap<UnsignedInteger, ProtonOutgoingDelivery> unsettled, Disposition disposition) {
-        final UnsignedInteger first = UnsignedInteger.valueOf(disposition.getFirst());
-        final UnsignedInteger last = UnsignedInteger.valueOf(disposition.getLast());
-
-        // Dispositions cover a contiguous range in the map requires a single sub-map
-        // which we can iterate whereas a range that wraps requires two iterations over
-        // a split between the higher portion and the lower portion of the map.
-        if (first.compareTo(last) <= 0) {
-            handleDispositions(unsettled.subMap(first, true, last, true), disposition);
+    private static void handleRangedDisposition(UnsettledMap<ProtonOutgoingDelivery> unsettled, Disposition disposition) {
+        // Dispositions cover a contiguous range in the map and since the tracker always moves forward
+        // when appending new deliveries the range can wrap without needing a second iteration.
+        if (disposition.getSettled()) {
+            unsettled.removeEach((int) disposition.getFirst(), (int) disposition.getLast(), (delivery) -> {
+                delivery.getLink().remoteDisposition(disposition, delivery);
+            });
         } else {
-            handleDispositions(unsettled.tailMap(first, true), disposition);
-            handleDispositions(unsettled.headMap(last, true), disposition);
-        }
-    }
-
-    private static void handleDispositions(Map<UnsignedInteger, ProtonOutgoingDelivery> deliveries, Disposition disposition) {
-        final boolean settled = disposition.getSettled();
-
-        Iterator<ProtonOutgoingDelivery> deliveriesIter = deliveries.values().iterator();
-        while (deliveriesIter.hasNext()) {
-            ProtonOutgoingDelivery delivery = deliveriesIter.next();
-
-            if (settled) {
-                deliveriesIter.remove();
-            }
-
-            delivery.getLink().remoteDisposition(disposition, delivery);
+            unsettled.forEach((int) disposition.getFirst(), (int) disposition.getLast(), (delivery) -> {
+                delivery.getLink().remoteDisposition(disposition, delivery);
+            });
         }
     }
 
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/UnsettledMap.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/UnsettledMap.java
new file mode 100644
index 0000000..cf362df
--- /dev/null
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/UnsettledMap.java
@@ -0,0 +1,1128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.engine.util;
+
+import java.util.AbstractCollection;
+import java.util.AbstractSet;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import org.apache.qpid.protonj2.types.UnsignedInteger;
+
+/**
+ * A specialized collection like entity that is used to keep track of unsettled
+ * incoming and outgoing deliveries for AMQP links and the sessions that manage
+ * those links.
+ *
+ * @param <Delivery> The delivery type being tracker (incoming or outgoing)
+ */
+public class UnsettledMap<Delivery> implements Map<UnsignedInteger, Delivery> {
+
+    public interface UnsettledGetDeliveryId<Delivery> {
+        int getDeliveryId(Delivery delivery);
+    }
+
+    private final double BUCKET_LOAD_FACTOR_MULTIPLIER = 0.30;
+
+    private static final int UNSETTLED_INITIAL_BUCKETS = 2;
+    private static final int UNSETTLED_BUCKET_SIZE = 256;
+
+    // Always full buckets used in operations that needs unwritable bounds
+    private final UnsettledBucket<Delivery> ALWAYS_FULL_BUCKET = new UnsettledBucket<>();
+
+    private final UnsettledGetDeliveryId<Delivery> deliveryIdSupplier;
+    private final int bucketCapacity;
+    private final int bucketLowWaterMark;
+
+    private int totalEntries;
+    private int modCount;
+
+    private int current;
+    private UnsettledBucket<Delivery>[] buckets;
+
+    public UnsettledMap(UnsettledGetDeliveryId<Delivery> idSupplier) {
+        this(idSupplier, UNSETTLED_INITIAL_BUCKETS, UNSETTLED_BUCKET_SIZE);
+    }
+
+    public UnsettledMap(UnsettledGetDeliveryId<Delivery> idSupplier, int initialBuckets) {
+        this(idSupplier, initialBuckets, UNSETTLED_BUCKET_SIZE);
+    }
+
+    @SuppressWarnings("unchecked")
+    public UnsettledMap(UnsettledGetDeliveryId<Delivery> idSupplier, int initialBuckets, int bucketSize) {
+        this.deliveryIdSupplier = idSupplier;
+        this.bucketCapacity = bucketSize;
+        this.bucketLowWaterMark = (int) (bucketSize * BUCKET_LOAD_FACTOR_MULTIPLIER);
+
+        if (bucketSize < 1) {
+            throw new IllegalArgumentException("The bucket size must be greater than zero");
+        }
+
+        if (initialBuckets < 1) {
+            throw new IllegalArgumentException("The initial number of buckets must be at least 1");
+        }
+
+        buckets = new UnsettledBucket[initialBuckets];
+        for (int i = 0; i < buckets.length; ++i) {
+            buckets[i] = new UnsettledBucket<>(bucketSize, deliveryIdSupplier);
+        }
+    }
+
+    @Override
+    public void putAll(Map<? extends UnsignedInteger, ? extends Delivery> source) {
+        for (Entry<? extends UnsignedInteger, ? extends Delivery> entry : source.entrySet()) {
+            put(entry.getKey(), entry.getValue());
+        }
+    }
+
+    @Override
+    public void clear() {
+        for (int i = 0; i < buckets.length && buckets[i].isReadable(); ++i) {
+            buckets[i].clear(); // Ensure any referenced deliveries are cleared
+        }
+
+        current = totalEntries = 0;
+    }
+
+    @Override
+    public Delivery put(UnsignedInteger key, Delivery value) {
+        return put(key.intValue(), value);
+    }
+
+    /**
+     * Adds the given key and value pair in this tracking structure at the end of the current series.
+     * <p>
+     * If the map previously contained a mapping for the key, the old value is not replaced by the specified
+     * value unlike a traditional map as this structure is tracking the running series of values. This would
+     * imply that duplicates can exist in the tracker, however given the likelihood of this occurring in the
+     * normal flow of deliveries should be considered extremely low.
+     *
+     * @param deliveryId
+     * 		The delivery ID of the delivery being added to this tracker.
+     * @param delivery
+     * 		The delivery that is being added to the tracker
+     *
+     * @return null in all cases as this tracker does not check for duplicates.
+     */
+    public Delivery put(int deliveryId, Delivery delivery) {
+        if (!buckets[current].put(deliveryId, delivery)) {
+            // Always move to next bucket or create one so that current is always
+            // position on a writable bucket.
+            if (++current == buckets.length) {
+                // Create a new bucket of entries since we don't have any more space free yet
+                // and update the chain with the newly create bucket. We disregard the max segments
+                // here since we always need the extra space regardless of how many pending unsettled
+                // deliveries there are.
+                buckets = Arrays.copyOf(buckets, current + 1);
+                buckets[current] = new UnsettledBucket<>(bucketCapacity, deliveryIdSupplier);
+            }
+
+            // Moved on after overflow so we know this one will work.
+            buckets[current].put(deliveryId, delivery);
+        }
+
+        totalEntries++;
+        modCount++;
+
+        return null;
+    }
+
+    @Override
+    public int size() {
+        return totalEntries;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return totalEntries == 0;
+    }
+
+    @Override
+    public Delivery get(Object key) {
+        return get(Number.class.cast(key).intValue());
+    }
+
+    public Delivery get(int deliveryId) {
+        if (totalEntries == 0) {
+            return null;
+        }
+
+        // Search every bucket because delivery IDs can wrap around, but we can
+        // stop at the first empty bucket as all buckets following it must also
+        // be empty buckets.
+        for (int i = 0; i <= current; ++i) {
+            if (buckets[i].isInRange(deliveryId)) {
+                final Delivery result = buckets[i].get(deliveryId);
+                if (result != null) {
+                    return result;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    @Override
+    public Delivery remove(Object key) {
+        return removeValue(Number.class.cast(key).intValue());
+    }
+
+    public Delivery remove(int deliveryId) {
+        return removeValue(deliveryId);
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return containsKey(Number.class.cast(key).intValue());
+    }
+
+    public boolean containsKey(int key) {
+        if (totalEntries > 0) {
+            for (int i = 0; i <= current; ++i) {
+                if (buckets[i].isInRange(key)) {
+                    if (buckets[i].get(key) != null) {
+                        return true;
+                    }
+                }
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        if (totalEntries > 0 && value != null) {
+            for (int i = 0; i <= current; ++i) {
+                for (int j = buckets[i].readOffset; j < buckets[i].writeOffset; ++j) {
+                    if (value.equals(buckets[i].entryAt(j))) {
+                        return true;
+                    }
+                }
+            }
+        }
+
+        return false;
+    }
+
+    public void forEach(Consumer<Delivery> action) {
+        Objects.requireNonNull(action);
+
+        if (totalEntries == 0) {
+            return;
+        }
+
+        for (int i = 0; i <= current; ++i) {
+            for (int j = buckets[i].readOffset; j < buckets[i].writeOffset; ++j) {
+                action.accept(buckets[i].entryAt(j));
+            }
+        }
+    }
+
+    /**
+     * Visits each entry within the given range and invokes the provided action
+     * on each delivery in the tracker.
+     *
+     * @param first
+     * 		The first entry to visit.
+     * @param last
+     * 		The last entry to visit.
+     * @param action
+     * 		The action to invoke on each visited entry.
+     */
+    public void forEach(int first, int last, Consumer<Delivery> action) {
+        Objects.requireNonNull(action);
+
+        if (totalEntries == 0) {
+            return;
+        }
+
+        boolean foundFirst = false;
+        boolean foundLast = false;
+
+        for (int i = 0; i <= current && !foundLast; ++i) {
+            if (!foundFirst && !buckets[i].isInRange(first)) {
+                continue;
+            }
+
+            for (int j = buckets[i].readOffset; j < buckets[i].writeOffset && !foundLast; ++j) {
+                final Delivery delivery = buckets[i].entryAt(j);
+                final int deliveryId = deliveryIdSupplier.getDeliveryId(delivery);
+
+                foundFirst = foundFirst || deliveryId == first;
+                foundLast = deliveryId == last;
+
+                if (foundFirst) {
+                    action.accept(delivery);
+                }
+            }
+        }
+    }
+
+    /**
+     * Remove each entry within the given range of delivery IDs. For each entry
+     * removed the provided action is triggered allowing the caller to be notified
+     * of each removal.
+     *
+     * @param first
+     * 		The first entry to remove
+     * @param last
+     *      The last entry to remove
+     * @param action
+     * 		The action to invoke on each remove.
+     */
+    public void removeEach(int first, int last, Consumer<Delivery> action) {
+        Objects.requireNonNull(action);
+
+        if (totalEntries == 0) {
+            return;
+        }
+
+        boolean foundFirst = false;
+        boolean foundLast = false;
+        int removeStart = 0;
+        int removeEnd = 0;
+
+        for (int i = 0; i <= current && !foundLast; ++i) {
+            if (!foundFirst && !buckets[i].isInRange(first)) {
+                continue;
+            }
+
+            final UnsettledBucket<Delivery> bucket = buckets[i];
+
+            for (int j = removeStart = removeEnd = bucket.readOffset; j < bucket.writeOffset && !foundLast; ) {
+                final Delivery delivery = bucket.entryAt(j);
+                final int deliveryId = deliveryIdSupplier.getDeliveryId(delivery);
+
+                foundFirst = foundFirst || deliveryId == first;
+                foundLast = deliveryId == last;
+
+                if (foundFirst) {
+                    action.accept(delivery);
+                    removeEnd = ++j;
+                } else {
+                    removeStart = removeEnd = ++j;
+                }
+            }
+
+            // We found first so this iteration did clear some elements from the current bucket
+            // and we need to check that this removal cleared a full bucket which means the index
+            // needs to shift back one since we will have recycled the empty bucket. When the last
+            // index is found we should also attempt to compact the bucket with the previous one
+            // to reduce fragmentation.
+            if (foundFirst) {
+                i = removeRange(i, removeStart, removeEnd, foundLast) ? --i : i;
+            }
+        }
+    }
+
+    private boolean removeRange(int bucketIndex, int start, int end, boolean compact) {
+        final UnsettledBucket<Delivery> bucket = buckets[bucketIndex];
+        final int removals = end - start;
+
+        this.totalEntries -= removals;
+        this.modCount++;
+
+        if (removals == bucket.entries) {
+            return recycleBucket(bucketIndex);
+        } else {
+            System.arraycopy(bucket.deliveries, end, bucket.deliveries, start, bucket.writeOffset - end);
+            Arrays.fill(bucket.deliveries, bucket.writeOffset - removals, bucket.writeOffset, null);
+
+            bucket.writeOffset = bucket.writeOffset - removals;
+            bucket.entries -= removals;
+            bucket.highestDeliveryId = bucket.entryIdAt(bucket.writeOffset - 1);
+
+            if (compact) {
+                return tryCompact(bucketIndex);
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public void forEach(BiConsumer<? super UnsignedInteger, ? super Delivery> action) {
+        Objects.requireNonNull(action);
+
+        if (totalEntries == 0) {
+            return;
+        }
+
+        for (int i = 0; i <= current; ++i) {
+            for (int j = buckets[i].readOffset; j < buckets[i].writeOffset; ++j) {
+                final Delivery delivery = buckets[i].entryAt(j);
+                action.accept(UnsignedInteger.valueOf(deliveryIdSupplier.getDeliveryId(delivery)), delivery);
+            }
+        }
+    }
+
+    @Override
+    public Collection<Delivery> values() {
+        if (values == null) {
+            values = new UnsettledTackingMapValues();
+        }
+
+        return values;
+    }
+
+    @Override
+    public Set<UnsignedInteger> keySet() {
+        if (keySet == null) {
+            keySet = new UnsettledTackingMapKeys();
+        }
+
+        return this.keySet;
+    }
+
+    @Override
+    public Set<Entry<UnsignedInteger, Delivery>> entrySet() {
+        if (entrySet == null) {
+            entrySet = new UnsettledTackingMapEntries();
+        }
+
+        return this.entrySet;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof Map)) {
+            return false;
+        }
+
+        Map<?,?> m = (Map<?,?>) o;
+        if (m.size() != size()) {
+            return false;
+        }
+
+        try {
+            for (int i = 0; i <= current; ++i) {
+                for (int j = buckets[i].readOffset; j < buckets[i].writeOffset; ++j) {
+                    final Delivery delivery = buckets[i].entryAt(j);
+                    if (!delivery.equals(m.get(deliveryIdSupplier.getDeliveryId(delivery)))) {
+                        return false;
+                    }
+                }
+            }
+        } catch (ClassCastException | NullPointerException ignored) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "UnsettledMap: { size=" + totalEntries +
+                              " buckets=" + buckets.length +
+                              " bucket-capacity=" + bucketCapacity + " }";
+    }
+
+    //----- Internal UnsettledMap API
+
+    private boolean recycleBucket(int index) {
+        buckets[index].clear();  // Drop all content and reset as empty bucket
+
+        // If the current bucket is cleared then we can just start writing in it again
+        if (index < current) {
+            current--;
+            final int next = index + 1;
+            UnsettledBucket<Delivery> recycled = buckets[index];
+            System.arraycopy(buckets, next, buckets, index, buckets.length - next);
+            buckets[buckets.length - 1] = recycled;
+            return true;
+        }
+
+        return false;
+    }
+
+    private Delivery removeValue(int deliveryId) {
+        if (totalEntries > 0) {
+            Delivery result = null;
+
+            for (int i = 0; i <= current; ++i) {
+                if (buckets[i].isInRange(deliveryId) && (result = buckets[i].remove(deliveryId)) != null) {
+                    totalEntries--;
+                    modCount++;
+
+                    if (buckets[i].entries <= bucketLowWaterMark) {
+                        tryCompact(i);
+                    }
+
+                    return result;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    // Called from iteration APIs which requires the method to return the location of the next
+    // entry once removal and possible bucket compaction is completed.
+    private long removeAt(int bucketIndex, int bucketEntry) {
+        if (bucketIndex >= buckets.length || bucketEntry >= UNSETTLED_BUCKET_SIZE) {
+            throw new IndexOutOfBoundsException(String.format(
+                "Cannot remove an entry from segment %d at index %d which is outside the tracked bounds", bucketIndex, bucketEntry));
+        }
+
+        final UnsettledBucket<Delivery> bucket = buckets[bucketIndex];
+
+        bucket.removeAt(bucketEntry);
+        totalEntries--;
+        modCount++;
+
+        long result = -1;
+
+        if (bucket.isReadable()) {
+            final int nextBucketIndex = bucketIndex + 1;
+            final int prevBucketIndex = bucketIndex - 1;
+
+            final UnsettledBucket<Delivery> nextBucket =
+                bucketIndex == current || Integer.compareUnsigned(bucket.highestDeliveryId, buckets[nextBucketIndex].lowestDeliveryId) > 0 ?
+                    ALWAYS_FULL_BUCKET : buckets[nextBucketIndex];
+            final UnsettledBucket<Delivery> prevBucket =
+                bucketIndex == 0 || Integer.compareUnsigned(bucket.lowestDeliveryId, buckets[prevBucketIndex].highestDeliveryId) < 0 ?
+                    ALWAYS_FULL_BUCKET : buckets[prevBucketIndex];
+
+            // As soon as compaction is possible move elements from this bucket into previous and next
+            // which reduces search times as there are fewer buckets to traverse/
+            if (nextBucket.getFreeSpace() + prevBucket.getFreeSpace() >= bucket.entries) {
+                final int toCopyBackward = Math.min(prevBucket.getFreeSpace(), bucket.entries);
+                final int nextEntryOffset = ++bucketEntry - (bucket.readOffset + toCopyBackward);
+
+                if (nextEntryOffset < 0) {
+                    // Moved into the previous bucket so the index being negative
+                    // give us the located when added to the previous write offset
+                    result = (long) (prevBucketIndex) << 32;
+                    result = prevBucket.writeOffset + nextEntryOffset;
+                } else if (nextBucket.entries + (bucket.entries - toCopyBackward) > 0) {
+                    // Moved into the next bucket gives of the raw index so long
+                    // as we compact entries to zero (otherwise it is the read offset
+                    result = (long) bucketIndex << 32;
+                    result |= nextEntryOffset;
+                }
+
+                doCompaction(bucket, prevBucket, nextBucket);
+                recycleBucket(bucketIndex);
+            } else {
+                // If the element removed is not the last in this bucket then the next is just
+                // the next element otherwise it is the first element of the next bucket if it
+                // non-empty otherwise we reached the end of the elements.
+                if (++bucketEntry < bucket.writeOffset) {
+                    result = (long) bucketIndex << 32;
+                    result |= bucketEntry;
+                } else if (nextBucketIndex <= current && buckets[nextBucketIndex].entries > 0) {
+                    result = (long) nextBucketIndex << 32;
+                    result |= buckets[nextBucketIndex].readOffset;
+                }
+            }
+        } else {
+            recycleBucket(bucketIndex);
+
+            // The bucket was empty and will be recycled shifting down all buckets following it and if
+            // there is a next non-empty bucket then the next entry is the first entry in that bucket.
+            if (bucketIndex <= current && buckets[bucketIndex].entries > 0) {
+                result = (long) bucketIndex << 32;
+                result |= buckets[bucketIndex].readOffset;
+            }
+        }
+
+        return result;
+    }
+
+    private final void doCompaction(UnsettledBucket<Delivery> bucket, UnsettledBucket<Delivery> prev, UnsettledBucket<Delivery> next) {
+        if (prev.getFreeSpace() > 0) {
+            final int toCopy = Math.min(bucket.entries, prev.getFreeSpace());
+
+            // We always compact to zero which makes some of the other updates simpler and
+            // can allow for easier compaction in the future.
+            if (prev.readOffset != 0) {
+                System.arraycopy(prev.deliveries, prev.readOffset, prev.deliveries, 0, prev.entries);
+                if (prev.writeOffset > prev.entries + toCopy) {
+                    // Ensure no dangling entries after compaction
+                    Arrays.fill(prev.deliveries, prev.entries, prev.writeOffset, null);
+                }
+
+                prev.writeOffset -= prev.readOffset;
+                prev.readOffset = 0;
+            }
+
+            System.arraycopy(bucket.deliveries, bucket.readOffset, prev.deliveries, prev.writeOffset, toCopy);
+
+            prev.entries += toCopy;
+            prev.writeOffset = prev.entries;
+            prev.highestDeliveryId = prev.entryIdAt(prev.writeOffset - 1);
+
+            bucket.entries -= toCopy;
+            bucket.writeOffset -= toCopy;
+            bucket.readOffset += toCopy;
+        }
+
+        // We didn't get them all into the previous bucket but we know that if we are
+        // here then there must be space ahead to accept the rest as we already checked.
+        if (bucket.entries > 0) {
+            if (next.readOffset != bucket.entries) {
+                System.arraycopy(next.deliveries, next.readOffset, next.deliveries, bucket.entries, next.entries);
+                if (next.readOffset < bucket.entries) {
+                    // Ensure no dangling entries after compaction
+                    Arrays.fill(next.deliveries, bucket.entries + next.entries, next.deliveries.length, null);
+                }
+            }
+
+            System.arraycopy(bucket.deliveries, bucket.readOffset, next.deliveries, 0, bucket.entries);
+
+            next.readOffset = 0;
+            next.entries += bucket.entries;
+            next.writeOffset = next.entries;
+            next.lowestDeliveryId = next.entryIdAt(0);
+            // We set this since the next bucket could be empty in some cases so it would
+            // need to be initialized.
+            next.highestDeliveryId = next.entryIdAt(next.writeOffset - 1);
+        }
+    }
+
+    // This variant is called from the Map API remove methods and doesn't need to track bucket
+    // compaction results or next element locations which increases performance in this case.
+    private boolean tryCompact(int bucketIndex) {
+        final UnsettledBucket<Delivery> bucket = buckets[bucketIndex];
+
+        final int nextBucketIndex = bucketIndex + 1;
+        final int prevBucketIndex = bucketIndex - 1;
+
+        if (bucket.isReadable()) {
+            final UnsettledBucket<Delivery> nextBucket =
+                bucketIndex == current || Integer.compareUnsigned(bucket.highestDeliveryId, buckets[nextBucketIndex].lowestDeliveryId) > 0 ?
+                    ALWAYS_FULL_BUCKET : buckets[nextBucketIndex];
+            final UnsettledBucket<Delivery> prevBucket =
+                bucketIndex == 0 || Integer.compareUnsigned(bucket.lowestDeliveryId, buckets[prevBucketIndex].highestDeliveryId) < 0 ?
+                    ALWAYS_FULL_BUCKET : buckets[prevBucketIndex];
+
+            // As soon as compaction is possible move elements from this bucket into previous and next
+            // which reduces search times as there are fewer buckets to traverse/
+            if (nextBucket.getFreeSpace() + prevBucket.getFreeSpace() >= bucket.entries) {
+                doCompaction(bucket, prevBucket, nextBucket);
+                return recycleBucket(bucketIndex);
+            }
+        } else {
+            return recycleBucket(bucketIndex);
+        }
+
+        return false;
+    }
+
+    //----- Internal bucket of delivery sequence
+
+    private static class UnsettledBucket<Delivery> {
+
+        private int readOffset;
+        private int writeOffset;
+        private int entries;
+        private int lowestDeliveryId = 0;
+        private int highestDeliveryId = 0;
+
+        private final Object[] deliveries;
+        private final UnsettledGetDeliveryId<Delivery> deliveryIdSupplier;
+
+        private UnsettledBucket() {
+            this.deliveryIdSupplier = null;
+            this.deliveries = new Object[0];
+            this.highestDeliveryId = UnsignedInteger.MAX_VALUE.intValue();
+        }
+
+        public UnsettledBucket(int bucketCapacity, UnsettledGetDeliveryId<Delivery> idSupplier) {
+            this.deliveryIdSupplier = idSupplier;
+            this.deliveries = new Object[bucketCapacity];
+        }
+
+        public boolean isReadable() {
+            return entries > 0;
+        }
+
+        public int getFreeSpace() {
+            return deliveries.length - entries;
+        }
+
+        public boolean isFull() {
+            return writeOffset == deliveries.length;
+        }
+
+        public boolean isInRange(int deliveryId) {
+            return Integer.compareUnsigned(deliveryId, lowestDeliveryId) >= 0 &&
+                   Integer.compareUnsigned(deliveryId, highestDeliveryId) <= 0;
+        }
+
+        public boolean put(int deliveryId, Delivery delivery) {
+            // Reject an addition if full or if the delivery ID to be added is less that
+            // the highest id we have tracked as that likely indicates a roll-over of the
+            // IDs and must go into the next bucket so that this bucket is kept in order.
+            if (isFull() || (Integer.compareUnsigned(deliveryId, highestDeliveryId) <= 0 && entries > 0)) {
+                return false;
+            }
+
+            if (entries == 0) {
+                lowestDeliveryId = deliveryId;
+            }
+
+            highestDeliveryId = deliveryId;
+            deliveries[writeOffset++] = delivery;
+            entries++;
+
+            return true;
+        }
+
+        @SuppressWarnings("unchecked")
+        public Delivery get(int deliveryId) {
+            Delivery delivery;
+
+            // Be optimistic and assume the result is in the first entry and then if not search
+            // beyond that entry for the result.
+            if (deliveryIdSupplier.getDeliveryId(delivery = (Delivery) deliveries[readOffset]) == deliveryId) {
+                return delivery;
+            } else {
+                final int location = search(deliveryId, readOffset + 1, writeOffset);
+                if (location >= 0) {
+                    return (Delivery) deliveries[location];
+                }
+            }
+
+            return null;
+        }
+
+        @SuppressWarnings("unchecked")
+        public Delivery entryAt(int index) {
+            return (Delivery) deliveries[index];
+        }
+
+        @SuppressWarnings("unchecked")
+        public int entryIdAt(int index) {
+            return deliveryIdSupplier.getDeliveryId((Delivery) deliveries[index]);
+        }
+
+        @SuppressWarnings("unchecked")
+        public Delivery remove(int deliveryId) {
+            // Be optimistic and assume the result is in the first entry and then if not search
+            // beyond that entry for the result.
+            if (deliveryIdSupplier.getDeliveryId((Delivery) deliveries[readOffset]) == deliveryId) {
+                return removeAt(readOffset);
+            } else {
+                final int location = search(deliveryId, readOffset + 1, writeOffset);
+                if (location >= 0) {
+                    return removeAt(location);
+                }
+            }
+
+            return null;
+        }
+
+        @SuppressWarnings("unchecked")
+        public Delivery removeAt(int bucketEntry) {
+            final Delivery delivery = (Delivery) deliveries[bucketEntry];
+
+            entries--;
+
+            // If not the readOffset we compact the entries to avoid null gaps in the entries
+            // which complicates searches and makes bulk assignments or copies impossible.
+            if (bucketEntry != readOffset) {
+                System.arraycopy(deliveries, readOffset, deliveries, readOffset + 1, bucketEntry - readOffset);
+                deliveries[readOffset++] = null;
+                // If we remove the last entry then we can reduce the highest delivery ID in this
+                // bucket to avoid false positive matches when randomly accessing elements unless
+                // unordered in which case there could be duplicate entries
+                if (bucketEntry == writeOffset) {
+                    highestDeliveryId = deliveryIdSupplier.getDeliveryId((Delivery) deliveries[writeOffset - 1]);
+                }
+            } else {
+                deliveries[readOffset++] = null;
+                // We removed the first element meaning we now must increase the lowest entry to
+                // avoid false positives when accessing randomly unless unordered since there could
+                // be duplicates
+                if (entries > 0) {
+                    lowestDeliveryId = deliveryIdSupplier.getDeliveryId((Delivery) deliveries[readOffset]);
+                }
+            }
+
+            return delivery;
+        }
+
+        public void clear() {
+            if (entries != 0) {
+                Arrays.fill(deliveries, null);
+                entries = 0;
+            }
+
+            // Ensures the first put always assigns this
+            lowestDeliveryId = UnsignedInteger.MAX_VALUE.intValue();
+            highestDeliveryId = 0;
+            writeOffset = 0;
+            readOffset = 0;
+        }
+
+        @Override
+        public String toString() {
+            return "UnsettledBucket { size=" + entries +
+                                    " roff=" + readOffset +
+                                    " woff=" + writeOffset +
+                                    " lowID=" + lowestDeliveryId +
+                                    " highID=" + highestDeliveryId + " }";
+        }
+
+        private static int BINARY_SEARCH_THRESHOLD = 64;
+
+        // Given the behavior of binary search it doesn't make sense to employ
+        // it on spans under a certain number if elements
+        private int search(int deliveryId, int fromIndex, int toIndex) {
+            if ((toIndex - fromIndex) < BINARY_SEARCH_THRESHOLD) {
+                return linearSearch(deliveryId, fromIndex, toIndex);
+            } else {
+                return binarySearch(deliveryId, fromIndex, toIndex);
+            }
+         }
+
+        // Must use our own to avoid boxing for unsigned integer comparison.
+        // fromIndex is inclusive since we search from readOffset normally
+        // toIndex is exclusive since we search to writeOffset normally.
+        @SuppressWarnings("unchecked")
+        private int binarySearch(int deliveryId, int fromIndex, int toIndex) {
+            int low = fromIndex;
+            int high = toIndex - 1;
+
+            while (low <= high) {
+                final int mid = (low + high) >>> 1;
+                final int midDeliveryId = deliveryIdSupplier.getDeliveryId((Delivery) deliveries[mid]);
+                final int cmp = UnsignedInteger.compare(midDeliveryId, deliveryId);
+
+                if (cmp < 0) {
+                    low = mid + 1;
+                } else if (cmp > 0) {
+                    high = mid - 1;
+                } else {
+                    return mid;  // first matching delivery
+                }
+            }
+
+            return -1; // signal that delivery ID is not in this bucket
+        }
+
+        // Must use our own to avoid boxing for unsigned integer comparison.
+        // fromIndex is inclusive since we search from readOffset normally
+        // toIndex is exclusive since we search to writeOffset normally.
+        @SuppressWarnings("unchecked")
+        private int linearSearch(int deliveryId, int fromIndex, int toIndex) {
+            for (int i = fromIndex; i < toIndex; ++i) {
+                final int idAtIndex = deliveryIdSupplier.getDeliveryId((Delivery) deliveries[i]);
+                final int comp = UnsignedInteger.compare(idAtIndex, deliveryId);
+
+                if (comp == 0) {
+                    return i;
+                } else if (comp > 0) {
+                    // Can't be in this bucket because we already found a larger value
+                    break;
+                }
+            }
+
+            return -1;
+        }
+    }
+
+    //----- Internal cached values for the various collection type access objects
+
+    // Once requested we will create an store a single instance to a collection
+    // with no state for each of the key, values ,entries types. Since the types do
+    // not have state the trivial race on create is not important to the eventual
+    // outcome of having a cached instance.
+
+    protected Set<UnsignedInteger> keySet;
+    protected Collection<Delivery> values;
+    protected Set<Entry<UnsignedInteger, Delivery>> entrySet;
+
+    //----- Unsettled Tracking Map Collection types
+
+    private final class UnsettledTackingMapValues extends AbstractCollection<Delivery> {
+
+        @Override
+        public Iterator<Delivery> iterator() {
+            return new UnsettledTrackingMapValuesIterator(0);
+        }
+
+        @Override
+        public int size() {
+            return UnsettledMap.this.totalEntries;
+        }
+
+        @Override
+        public boolean contains(Object o) {
+            return UnsettledMap.this.containsValue(o);
+        }
+
+        @Override
+        public boolean remove(Object target) {
+            @SuppressWarnings("unchecked")
+            final int targetId = UnsettledMap.this.deliveryIdSupplier.getDeliveryId((Delivery) target);
+
+            return UnsettledMap.this.remove(targetId) != null;
+        }
+
+        @Override
+        public void clear() {
+            UnsettledMap.this.clear();
+        }
+    }
+
+    private final class UnsettledTackingMapKeys extends AbstractSet<UnsignedInteger> {
+
+        @Override
+        public Iterator<UnsignedInteger> iterator() {
+            return new UnsettledTrackingMapKeysIterator(0);
+        }
+
+        @Override
+        public int size() {
+            return UnsettledMap.this.totalEntries;
+        }
+
+        @Override
+        public boolean contains(Object o) {
+            return UnsettledMap.this.containsKey(o);
+        }
+
+        @Override
+        public boolean remove(Object target) {
+            return UnsettledMap.this.remove(Number.class.cast(target).intValue()) != null;
+        }
+
+        @Override
+        public void clear() {
+            UnsettledMap.this.clear();
+        }
+    }
+
+    private final class UnsettledTackingMapEntries extends AbstractSet<Map.Entry<UnsignedInteger, Delivery>> {
+
+        @Override
+        public Iterator<Map.Entry<UnsignedInteger, Delivery>> iterator() {
+            return new UnsettledTrackingMapEntryIterator(0);
+        }
+
+        @Override
+        public int size() {
+            return UnsettledMap.this.totalEntries;
+        }
+
+        @Override
+        public boolean contains(Object target) {
+            if (target instanceof Map.Entry) {
+                @SuppressWarnings("unchecked")
+                final Entry<? extends UnsignedInteger, ? extends Delivery> entry =
+                    (Entry<? extends UnsignedInteger, ? extends Delivery>) target;
+                return UnsettledMap.this.containsKey(entry.getKey());
+            }
+
+            return false;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public boolean remove(Object target) {
+            if (target instanceof Map.Entry) {
+                final Entry<? extends UnsignedInteger, ? extends Delivery> entry =
+                    (Entry<? extends UnsignedInteger, ? extends Delivery>) target;
+                return UnsettledMap.this.remove(entry.getKey()) != null;
+            }
+
+            return false;
+        }
+
+        @Override
+        public void clear() {
+            UnsettledMap.this.clear();
+        }
+    }
+
+    //----- Map Iterator implementation for EntrySet, KeySet and Values collections
+
+    // Base class iterator that can be used for the collections returned from the Map
+    private abstract class UnsettledTrackingMapIterator<T> implements Iterator<T> {
+
+        protected int currentBucket;
+        protected int readOffset;
+
+        protected T lastReturned;
+        protected int lastReturnedbucket;
+        protected int lastReturnedbucketIndex;
+
+        protected int expectedModCount;
+
+        public UnsettledTrackingMapIterator(int startAt) {
+            this.currentBucket = buckets[startAt].isReadable() ? startAt : -1;
+            this.readOffset = buckets[startAt].isReadable() ? buckets[startAt].readOffset : -1;
+            this.expectedModCount = UnsettledMap.this.modCount;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return readOffset >= 0;
+        }
+
+        @Override
+        public T next() {
+            if (readOffset == -1) {
+                throw new NoSuchElementException();
+            }
+            if (expectedModCount != UnsettledMap.this.modCount) {
+                throw new ConcurrentModificationException();
+            }
+
+            lastReturnedbucket = currentBucket;
+            lastReturnedbucketIndex = readOffset;
+            lastReturned = entryAt(currentBucket, readOffset);
+            successor();
+
+            return lastReturned;
+        }
+
+        protected abstract T entryAt(int bucketIndex, int bucketEntry);
+
+        @Override
+        public void remove() {
+            if (lastReturned == null) {
+                throw new IllegalStateException("Cannot remove entry when next has not been called");
+            }
+            if (modCount != expectedModCount) {
+                throw new ConcurrentModificationException();
+            }
+
+            long next = UnsettledMap.this.removeAt(lastReturnedbucket, lastReturnedbucketIndex);
+            if (next >= 0) {
+                currentBucket = (int) (next >> 32);
+                readOffset = (int) (next & 0xFFFFFFFF);
+            } else {
+                readOffset = -1;
+            }
+
+            expectedModCount = modCount;
+            lastReturned = null;
+        }
+
+        private void successor() {
+            if (++readOffset == buckets[currentBucket].writeOffset) {
+                currentBucket++;
+                if (currentBucket < buckets.length && buckets[currentBucket].isReadable()) {
+                    readOffset = buckets[currentBucket].readOffset;
+                } else {
+                    readOffset = -1;
+                }
+            }
+        }
+    }
+
+    private final class UnsettledTrackingMapValuesIterator extends UnsettledTrackingMapIterator<Delivery> {
+
+        public UnsettledTrackingMapValuesIterator(int startAt) {
+            super(startAt);
+        }
+
+        @Override
+        protected Delivery entryAt(int bucketIndex, int bucketEntry) {
+            return buckets[currentBucket].entryAt(bucketEntry);
+        }
+    }
+
+    private final class UnsettledTrackingMapKeysIterator extends UnsettledTrackingMapIterator<UnsignedInteger> {
+
+        public UnsettledTrackingMapKeysIterator(int startAt) {
+            super(startAt);
+        }
+
+        @Override
+        protected UnsignedInteger entryAt(int bucketIndex, int bucketEntry) {
+            return UnsignedInteger.valueOf(deliveryIdSupplier.getDeliveryId(buckets[currentBucket].entryAt(bucketEntry)));
+        }
+    }
+
+    private final class UnsettledTrackingMapEntryIterator extends UnsettledTrackingMapIterator<Entry<UnsignedInteger, Delivery>> {
+
+        public UnsettledTrackingMapEntryIterator(int startAt) {
+            super(startAt);
+        }
+
+        @Override
+        protected Entry<UnsignedInteger, Delivery> entryAt(int bucketIndex, int bucketEntry) {
+            final Delivery delivery = buckets[currentBucket].entryAt(bucketEntry);
+
+            return new ImmutableUnsettledTrackingkMapEntry<Delivery>(deliveryIdSupplier.getDeliveryId(delivery), delivery);
+        }
+    }
+
+    /**
+     * An immutable {@link Map} entry that can be used when exposing raw entry mappings
+     * via the {@link Map} API.
+     *
+     * @param <Delivery> Type of the value portion of this immutable entry.
+     */
+    public static class ImmutableUnsettledTrackingkMapEntry<Delivery> implements Map.Entry<UnsignedInteger, Delivery> {
+
+        private final int key;
+        private final Delivery value;
+
+        /**
+         * Create a new immutable {@link Map} entry.
+         *
+         * @param key
+         * 		The inner {@link Map} key that is wrapped.
+         * @param value
+         * 		The inner {@link Map} value that is wrapped.
+         */
+        public ImmutableUnsettledTrackingkMapEntry(int key, Delivery value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public UnsignedInteger getKey() {
+            return UnsignedInteger.valueOf(key);
+        }
+
+        /**
+         * @return the primitive integer view of the unsigned key.
+         */
+        public int getPrimitiveKey() {
+            return key;
+        }
+
+        @Override
+        public Delivery getValue() {
+            return value;
+        }
+
+        @Override
+        public Delivery setValue(Delivery value) {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/util/UnsettledMapTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/util/UnsettledMapTest.java
new file mode 100644
index 0000000..e6889b0
--- /dev/null
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/util/UnsettledMapTest.java
@@ -0,0 +1,1939 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.engine.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.protonj2.logging.ProtonLogger;
+import org.apache.qpid.protonj2.logging.ProtonLoggerFactory;
+import org.apache.qpid.protonj2.types.UnsignedInteger;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for the unsettled delivery tacker
+ */
+public class UnsettledMapTest {
+
+    protected static final ProtonLogger LOG = ProtonLoggerFactory.getLogger(SplayMapTest.class);
+
+    protected long seed;
+    protected Random random;
+    protected UnsignedInteger uintArray[] = new UnsignedInteger[1000];
+    protected DeliveryType objArray[] = new DeliveryType[1000];
+    protected UnsettledMap<DeliveryType> tracker;
+
+    @BeforeEach
+    public void setUp() {
+        seed = System.nanoTime();
+        random = new Random();
+        random.setSeed(seed);
+
+        tracker = new UnsettledMap<>(DeliveryType::getDeliveryId);
+
+        for (int i = 1; i <= objArray.length; i++) {
+            UnsignedInteger x = uintArray[i - 1] = UnsignedInteger.valueOf(i);
+            DeliveryType y = objArray[i - 1] = new DeliveryType(UnsignedInteger.valueOf(i).intValue());
+            tracker.put(x, y);
+        }
+    }
+
+    protected UnsettledMap<DeliveryType> createMap() {
+        return new UnsettledMap<>(DeliveryType::getDeliveryId);
+    }
+
+    protected UnsettledMap<DeliveryType> createMap(int numBuckets, int bucketSize) {
+        return new UnsettledMap<>(DeliveryType::getDeliveryId, numBuckets, bucketSize);
+    }
+
+    /**
+     * Simple delivery type used for this test
+     */
+    private class DeliveryType {
+
+        private final int deliveryId;
+
+        public DeliveryType(int deliveryid) {
+            this.deliveryId = deliveryid;
+        }
+
+        public int getDeliveryId() {
+            return deliveryId;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (other instanceof DeliveryType) {
+                DeliveryType otherType = (DeliveryType) other;
+                return otherType.deliveryId == deliveryId;
+            }
+
+            return false;
+        }
+
+        @Override
+        public String toString() {
+            return "DeliveryType: { " + deliveryId + " }";
+        }
+    }
+
+    @Test
+    public void testCreateUnsettledTracker() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+        assertEquals(0, tracker.size());
+        assertTrue(tracker.isEmpty());
+    }
+
+    @Test
+    public void testContainsKeyOnEmptyMap() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        assertFalse(tracker.containsKey(0));
+        assertFalse(tracker.containsKey(UnsignedInteger.ZERO));
+    }
+
+    @Test
+    public void testGetWhenEmpty() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        assertNull(tracker.get(0));
+    }
+
+    @Test
+    public void testGet() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(0, new DeliveryType(0));
+        tracker.put(1, new DeliveryType(1));
+        tracker.put(2, new DeliveryType(2));
+        tracker.put(65535, new DeliveryType(65535));
+        tracker.put(-1, new DeliveryType(-1));
+
+        assertEquals(new DeliveryType(0), tracker.get(0));
+        assertEquals(new DeliveryType(1), tracker.get(1));
+        assertEquals(new DeliveryType(2), tracker.get(2));
+        assertEquals(new DeliveryType(65535), tracker.get(65535));
+        assertEquals(new DeliveryType(-1), tracker.get(-1));
+
+        assertNull(tracker.get(3));
+
+        assertEquals(5, tracker.size());
+    }
+
+    @Test
+    public void testGetUnsignedInteger() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(0, new DeliveryType(0));
+        tracker.put(1, new DeliveryType(1));
+        tracker.put(-3, new DeliveryType(-3));
+
+        assertEquals(new DeliveryType(0), tracker.get(UnsignedInteger.valueOf(0)));
+        assertEquals(new DeliveryType(1), tracker.get(UnsignedInteger.valueOf(1)));
+        assertEquals(new DeliveryType(-3), tracker.get(UnsignedInteger.valueOf(-3)));
+
+        assertNull(tracker.get(3));
+
+        assertEquals(3, tracker.size());
+    }
+
+    @Test
+    public void testContainsKey() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(0, new DeliveryType(0));
+        tracker.put(1, new DeliveryType(1));
+        tracker.put(-3, new DeliveryType(-3));
+
+        assertTrue(tracker.containsKey(0));
+        assertFalse(tracker.containsKey(3));
+
+        assertEquals(3, tracker.size());
+    }
+
+    @Test
+    public void testContainsKeyUnsignedInteger() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(UnsignedInteger.MAX_VALUE.intValue(), new DeliveryType(UnsignedInteger.MAX_VALUE.intValue()));
+        tracker.put(0, new DeliveryType(0));
+        tracker.put(1, new DeliveryType(1));
+
+        assertTrue(tracker.containsKey(0));
+        assertFalse(tracker.containsKey(3));
+
+        assertEquals(3, tracker.size());
+    }
+
+    @Test
+    public void testContainsValue() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(0, new DeliveryType(0));
+        tracker.put(1, new DeliveryType(1));
+        tracker.put(-3, new DeliveryType(-3));
+
+        assertTrue(tracker.containsValue(new DeliveryType(0)));
+        assertFalse(tracker.containsValue(new DeliveryType(4)));
+
+        assertEquals(3, tracker.size());
+    }
+
+    @Test
+    public void testContainsValueOnEmptyMap() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        assertFalse(tracker.containsValue(new DeliveryType(0)));
+    }
+
+    @Test
+    public void testRemoveIsIdempotent() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(0, new DeliveryType(0));
+        tracker.put(1, new DeliveryType(1));
+        tracker.put(2, new DeliveryType(2));
+
+        assertEquals(3, tracker.size());
+
+        assertEquals(new DeliveryType(0), tracker.remove(0));
+        assertEquals(null, tracker.remove(0));
+
+        assertEquals(2, tracker.size());
+
+        assertEquals(new DeliveryType(1), tracker.remove(1));
+        assertEquals(null, tracker.remove(1));
+
+        assertEquals(1, tracker.size());
+
+        assertEquals(new DeliveryType(2), tracker.remove(2));
+        assertEquals(null, tracker.remove(2));
+
+        assertEquals(0, tracker.size());
+    }
+
+    @Test
+    public void testRemoveValueNotInMap() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(0, new DeliveryType(0));
+        tracker.put(1, new DeliveryType(1));
+        tracker.put(9, new DeliveryType(9));
+        tracker.put(7, new DeliveryType(7));
+        tracker.put(-1, new DeliveryType(-1));
+
+        assertNull(tracker.remove(5));
+    }
+
+    @Test
+    public void testRemoveFirstEntryTwice() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(0, new DeliveryType(0));
+        tracker.put(16, new DeliveryType(16));
+
+        assertNotNull(tracker.remove(0));
+        assertNull(tracker.remove(0));
+    }
+
+    @SuppressWarnings("unlikely-arg-type")
+    @Test
+    public void testRemoveWithInvalidType() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(0, new DeliveryType(0));
+
+        try {
+            tracker.remove("foo");
+            fail("Should not accept incompatible types");
+        } catch (ClassCastException ccex) {}
+    }
+
+    @Test
+    public void testRemoveUnsignedInteger() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(0, new DeliveryType(0));
+        tracker.put(1, new DeliveryType(1));
+        tracker.put(UnsignedInteger.valueOf(9), new DeliveryType(9));
+        tracker.put(7, new DeliveryType(7));
+        tracker.put(UnsignedInteger.valueOf(-1), new DeliveryType(-1));
+
+        assertEquals(5, tracker.size());
+        assertNull(tracker.remove(UnsignedInteger.valueOf(5)));
+        assertEquals(5, tracker.size());
+        assertEquals(new DeliveryType(9), tracker.remove(UnsignedInteger.valueOf(9)));
+        assertEquals(4, tracker.size());
+    }
+
+    @SuppressWarnings("unlikely-arg-type")
+    @Test
+    public void testRemoveInteger() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(0, new DeliveryType(0));
+        tracker.put(1, new DeliveryType(1));
+        tracker.put(UnsignedInteger.valueOf(9), new DeliveryType(9));
+        tracker.put(7, new DeliveryType(7));
+        tracker.put(UnsignedInteger.valueOf(-1), new DeliveryType(-1));
+
+        assertEquals(5, tracker.size());
+        assertNull(tracker.remove(Integer.valueOf(5)));
+        assertEquals(5, tracker.size());
+        assertEquals(new DeliveryType(9), tracker.remove(Integer.valueOf(9)));
+        assertEquals(4, tracker.size());
+    }
+
+    @Test
+    public void testRemoveEntriesFromMiddleBucket() {
+        // Start with three buckets of size two
+        UnsettledMap<DeliveryType> tracker = createMap(3, 2);
+
+        tracker.put(1, new DeliveryType(1));
+        tracker.put(2, new DeliveryType(2));
+        tracker.put(3, new DeliveryType(3));
+        tracker.put(4, new DeliveryType(4));
+        tracker.put(5, new DeliveryType(5));
+        tracker.put(6, new DeliveryType(6));
+
+        assertEquals(6, tracker.size());
+
+        tracker.remove(3);
+        tracker.remove(4);
+
+        assertEquals(4, tracker.size());
+
+        assertTrue(tracker.containsKey(1));
+        assertTrue(tracker.containsKey(2));
+        assertTrue(tracker.containsKey(5));
+        assertTrue(tracker.containsKey(6));
+
+        assertFalse(tracker.containsKey(3));
+        assertFalse(tracker.containsKey(4));
+
+        tracker.put(7, new DeliveryType(7));
+        tracker.put(8, new DeliveryType(8));
+
+        assertEquals(6, tracker.size());
+    }
+
+    @Test
+    public void testInsert() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(0, new DeliveryType(0));
+        tracker.put(1, new DeliveryType(1));
+        tracker.put(2, new DeliveryType(2));
+        tracker.put(3, new DeliveryType(3));
+        tracker.put(5, new DeliveryType(5));
+        tracker.put(9, new DeliveryType(9));
+        tracker.put(7, new DeliveryType(7));
+        tracker.put(-1, new DeliveryType(-1));
+
+        assertEquals(8, tracker.size());
+    }
+
+    @Test
+    public void testInsertUnsignedInteger() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(UnsignedInteger.valueOf(0), new DeliveryType(0));
+        tracker.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+        tracker.put(UnsignedInteger.valueOf(2), new DeliveryType(2));
+        tracker.put(UnsignedInteger.valueOf(3), new DeliveryType(3));
+        tracker.put(UnsignedInteger.valueOf(5), new DeliveryType(5));
+        tracker.put(UnsignedInteger.valueOf(9), new DeliveryType(9));
+        tracker.put(UnsignedInteger.valueOf(7), new DeliveryType(7));
+        tracker.put(UnsignedInteger.valueOf(-1), new DeliveryType(-1));
+
+        assertEquals(8, tracker.size());
+    }
+
+    @Test
+    public void testPutAll() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        Map<UnsignedInteger, DeliveryType> hashmap = new TreeMap<>();
+
+        hashmap.put(UnsignedInteger.valueOf(0), new DeliveryType(0));
+        hashmap.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+        hashmap.put(UnsignedInteger.valueOf(2), new DeliveryType(2));
+        hashmap.put(UnsignedInteger.valueOf(3), new DeliveryType(3));
+        hashmap.put(UnsignedInteger.valueOf(5), new DeliveryType(5));
+        hashmap.put(UnsignedInteger.valueOf(9), new DeliveryType(9));
+        hashmap.put(UnsignedInteger.valueOf(7), new DeliveryType(7));
+        hashmap.put(UnsignedInteger.valueOf(-1), new DeliveryType(-1));
+
+        tracker.putAll(hashmap);
+
+        assertEquals(8, tracker.size());
+
+        assertEquals(new DeliveryType(0), tracker.get(0));
+        assertEquals(new DeliveryType(1), tracker.get(1));
+        assertEquals(new DeliveryType(2), tracker.get(2));
+        assertEquals(new DeliveryType(3), tracker.get(3));
+        assertEquals(new DeliveryType(5), tracker.get(5));
+        assertEquals(new DeliveryType(9), tracker.get(9));
+        assertEquals(new DeliveryType(7), tracker.get(7));
+        assertEquals(new DeliveryType(-1), tracker.get(-1));
+    }
+
+    @Test
+    public void testPutIfAbsent() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(UnsignedInteger.valueOf(0), new DeliveryType(0));
+        tracker.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+        tracker.put(UnsignedInteger.valueOf(2), new DeliveryType(2));
+        tracker.put(UnsignedInteger.valueOf(3), new DeliveryType(3));
+        tracker.put(UnsignedInteger.valueOf(5), new DeliveryType(5));
+        tracker.put(UnsignedInteger.valueOf(7), new DeliveryType(7));
+        tracker.put(UnsignedInteger.valueOf(9), new DeliveryType(9));
+        tracker.put(UnsignedInteger.valueOf(-1), new DeliveryType(-1));
+
+        assertEquals(8, tracker.size());
+
+        assertEquals(new DeliveryType(0), tracker.get(0));
+        assertEquals(new DeliveryType(1), tracker.get(1));
+        assertEquals(new DeliveryType(2), tracker.get(2));
+        assertEquals(new DeliveryType(3), tracker.get(3));
+        assertEquals(new DeliveryType(5), tracker.get(5));
+        assertEquals(new DeliveryType(7), tracker.get(7));
+        assertEquals(new DeliveryType(9), tracker.get(9));
+        assertEquals(new DeliveryType(-1), tracker.get(-1));
+
+        assertNotNull(tracker.putIfAbsent(UnsignedInteger.valueOf(0), new DeliveryType(0)));
+        assertNotNull(tracker.putIfAbsent(UnsignedInteger.valueOf(1), new DeliveryType(1)));
+        assertNotNull(tracker.putIfAbsent(UnsignedInteger.valueOf(2), new DeliveryType(2)));
+        assertNotNull(tracker.putIfAbsent(UnsignedInteger.valueOf(3), new DeliveryType(3)));
+        assertNotNull(tracker.putIfAbsent(UnsignedInteger.valueOf(5), new DeliveryType(5)));
+        assertNotNull(tracker.putIfAbsent(UnsignedInteger.valueOf(7), new DeliveryType(7)));
+        assertNotNull(tracker.putIfAbsent(UnsignedInteger.valueOf(9), new DeliveryType(9)));
+        assertNotNull(tracker.putIfAbsent(UnsignedInteger.valueOf(-1), new DeliveryType(-1)));
+
+        assertEquals(8, tracker.size());
+
+        assertEquals(new DeliveryType(0), tracker.get(0));
+        assertEquals(new DeliveryType(1), tracker.get(1));
+        assertEquals(new DeliveryType(2), tracker.get(2));
+        assertEquals(new DeliveryType(3), tracker.get(3));
+        assertEquals(new DeliveryType(5), tracker.get(5));
+        assertEquals(new DeliveryType(7), tracker.get(7));
+        assertEquals(new DeliveryType(9), tracker.get(9));
+        assertEquals(new DeliveryType(-1), tracker.get(-1));
+    }
+
+    @Test
+    public void testAddedDeliveriesUpdatesSizeValue() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+        assertEquals(0, tracker.size());
+
+        DeliveryType delivery1 = new DeliveryType(0);
+        DeliveryType delivery2 = new DeliveryType(1);
+
+        tracker.put(delivery1.getDeliveryId(), delivery1);
+        assertEquals(1, tracker.size());
+
+        tracker.put(delivery2.getDeliveryId(), delivery2);
+        assertEquals(2, tracker.size());
+    }
+
+    @Test
+    public void testAddThenRemoveDelivery() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+        assertEquals(0, tracker.size());
+
+        DeliveryType delivery1 = new DeliveryType(127);
+        DeliveryType delivery2 = new DeliveryType(32);
+
+        tracker.put(delivery1.getDeliveryId(), delivery1);
+        assertEquals(1, tracker.size());
+        tracker.remove(delivery1.getDeliveryId());
+        assertEquals(0, tracker.size());
+
+        tracker.put(delivery2.getDeliveryId(), delivery2);
+        assertEquals(1, tracker.size());
+        tracker.remove(delivery2.getDeliveryId());
+        assertEquals(0, tracker.size());
+    }
+
+    @Test
+    public void testAddThenRemoveMultipleDeliveriesInSequence() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+        assertEquals(0, tracker.size());
+
+        DeliveryType delivery1 = new DeliveryType(Integer.MAX_VALUE);
+        DeliveryType delivery2 = new DeliveryType(-1);
+
+        tracker.put(delivery1.getDeliveryId(), delivery1);
+        tracker.put(delivery2.getDeliveryId(), delivery2);
+
+        assertEquals(2, tracker.size());
+        assertNotNull(tracker.remove(delivery1.getDeliveryId()));
+        assertEquals(1, tracker.size());
+        assertNotNull(tracker.remove(delivery2.getDeliveryId()));
+        assertEquals(0, tracker.size());
+    }
+
+    @Test
+    public void testAddThenClearMultipleDeliveriesAddedInSequence() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+        assertEquals(0, tracker.size());
+
+        DeliveryType delivery1 = new DeliveryType(0);
+        DeliveryType delivery2 = new DeliveryType(1);
+        DeliveryType delivery3 = new DeliveryType(2);
+        DeliveryType delivery4 = new DeliveryType(3);
+
+        tracker.put(delivery1.getDeliveryId(), delivery1);
+        tracker.put(delivery2.getDeliveryId(), delivery2);
+        tracker.put(delivery3.getDeliveryId(), delivery3);
+        tracker.put(delivery4.getDeliveryId(), delivery4);
+
+        assertEquals(4, tracker.size());
+        tracker.clear();
+        assertEquals(0, tracker.size());
+    }
+
+    @Test
+    public void testGetOneDeliveryInBetweenOthersThatWereAdded() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+        assertEquals(0, tracker.size());
+
+        DeliveryType delivery1 = new DeliveryType(0);
+        DeliveryType delivery2 = new DeliveryType(1);
+        DeliveryType delivery3 = new DeliveryType(2);
+        DeliveryType delivery4 = new DeliveryType(3);
+
+        tracker.put(delivery1.getDeliveryId(), delivery1);
+        tracker.put(delivery2.getDeliveryId(), delivery2);
+        tracker.put(delivery3.getDeliveryId(), delivery3);
+        tracker.put(delivery4.getDeliveryId(), delivery4);
+
+        assertEquals(4, tracker.size());
+        assertEquals(delivery3, tracker.get(delivery3.getDeliveryId()));
+        assertEquals(4, tracker.size());
+    }
+
+    @Test
+    public void testAddLargeSeriesOfDeliveriesAndThenEnumerateOverThemWithGet() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+        assertEquals(0, tracker.size());
+
+        final int COUNT = 4080;
+
+        for (int i = 0; i < COUNT; ++i) {
+            tracker.put(i, new DeliveryType(i));
+        }
+
+        assertEquals(COUNT, tracker.size());
+
+        for (int i = 0; i < COUNT; ++i) {
+            assertEquals(i, tracker.get(i).getDeliveryId());
+        }
+    }
+
+    @Test
+    public void testAddLargeSeriesOfDeliveriesAndThenIterateOverThemWithValues() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+        assertEquals(0, tracker.size());
+
+        final int COUNT = 4080;
+
+        for (int i = 0; i < COUNT; ++i) {
+            tracker.put(i, new DeliveryType(i));
+        }
+
+        assertEquals(COUNT, tracker.size());
+
+        Collection<DeliveryType> values = tracker.values();
+
+        int index = 0;
+
+        for (DeliveryType delivery : values) {
+            assertEquals(index++, delivery.getDeliveryId());
+        }
+
+        assertEquals(index, COUNT);
+    }
+
+    @Test
+    public void testRemoveAllViaIteration() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+        assertEquals(0, tracker.size());
+
+        final int COUNT = 16;
+
+        for (int i = 0; i < COUNT; ++i) {
+            tracker.put(i, new DeliveryType(i));
+        }
+
+        assertEquals(COUNT, tracker.size());
+
+        Collection<DeliveryType> values = tracker.values();
+        assertEquals(COUNT, values.size());
+        Iterator<DeliveryType> iter = values.iterator();
+
+        int index = 0;
+
+        while (iter.hasNext()) {
+            assertEquals(index++, iter.next().getDeliveryId());
+            iter.remove();
+        }
+
+        assertEquals(index, COUNT);
+        assertEquals(0, tracker.size());
+    }
+
+    @Test
+    public void testAddLargeSeriesOfDeliveriesAndThenRemoveAllViaIteration() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+        assertEquals(0, tracker.size());
+
+        final int COUNT = 4080;
+
+        for (int i = 0; i < COUNT; ++i) {
+            tracker.put(i, new DeliveryType(i));
+        }
+
+        assertEquals(COUNT, tracker.size());
+
+        Collection<DeliveryType> values = tracker.values();
+        assertEquals(COUNT, values.size());
+        Iterator<DeliveryType> iter = values.iterator();
+
+        int index = 0;
+
+        while (iter.hasNext()) {
+            assertEquals(index++, iter.next().getDeliveryId());
+            iter.remove();
+        }
+
+        assertEquals(index, COUNT);
+        assertEquals(0, tracker.size());
+    }
+
+    @Test
+    public void testIteratorRemoveInChunks() {
+        UnsettledMap<DeliveryType> tracker = createMap(3, 6);
+        assertEquals(0, tracker.size());
+
+        final int COUNT = 18;
+
+        for (int i = 0; i < COUNT; ++i) {
+            tracker.put(i, new DeliveryType(i));
+        }
+
+        assertEquals(COUNT, tracker.size());
+
+        Collection<DeliveryType> values = tracker.values();
+        assertEquals(COUNT, values.size());
+        Iterator<DeliveryType> iter = values.iterator();
+
+        int index = 0;
+        int count = 0;
+
+        while (iter.hasNext()) {
+            assertEquals(index++, iter.next().getDeliveryId());
+
+            if (count++ < COUNT / 6) {
+                iter.remove();
+            }
+
+            if (count == 6) {
+                count = 0;
+            }
+        }
+
+        assertEquals(index, COUNT);
+        assertEquals(COUNT / 2, tracker.size());
+    }
+
+    @Test
+    public void testRemoveUsingIteratorFromFullMiddleBucket() {
+        UnsettledMap<DeliveryType> tracker = createMap(3, 6);
+        assertEquals(0, tracker.size());
+
+        final int COUNT = 18;
+
+        for (int i = 0; i < COUNT; ++i) {
+            tracker.put(i, new DeliveryType(i));
+        }
+
+        assertEquals(COUNT, tracker.size());
+
+        // Remove enough from front and back buckets that
+        // a drain of the middle should compress the chain
+
+        // Front
+        tracker.remove(0);
+        tracker.remove(1);
+        tracker.remove(2);
+        // Back
+        tracker.remove(15);
+        tracker.remove(16);
+        tracker.remove(17);
+
+        Collection<DeliveryType> values = tracker.values();
+        Iterator<DeliveryType> iter = values.iterator();
+
+        // Skip elements from first section
+        iter.next();
+        iter.next();
+        iter.next();
+
+        for (int i = 6; i < 12; ++i) {
+            assertEquals(i, iter.next().getDeliveryId());
+            iter.remove();
+        }
+
+        assertEquals(6, tracker.size());
+    }
+
+    @Test
+    public void testForEachDeliveryIteratesOverLargeSeriesOfDeliveries() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+        assertEquals(0, tracker.size());
+
+        final int COUNT = 4080;
+
+        for (int i = 0; i < COUNT; ++i) {
+            tracker.put(i, new DeliveryType(i));
+        }
+
+        assertEquals(COUNT, tracker.size());
+
+        final AtomicInteger index = new AtomicInteger();
+
+        tracker.forEach((delivery) -> index.incrementAndGet());
+
+        assertEquals(index.get(), COUNT);
+    }
+
+    @Test
+    public void testRangedForEachDeliveryIteratesOverSmallSeriesOfDeliveries() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+        assertEquals(0, tracker.size());
+
+        final int COUNT = 512;
+
+        for (int i = 0; i < COUNT; ++i) {
+            tracker.put(i, new DeliveryType(i));
+        }
+
+        assertEquals(COUNT, tracker.size());
+
+        final AtomicInteger index = new AtomicInteger();
+
+        tracker.forEach(260, 262, (delivery) -> index.incrementAndGet());
+
+        assertEquals(index.get(), 3);
+    }
+
+    @Test
+    public void testRangedForEachDeliveryIteratesSeriesWhenValuesOverflowIntRange() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+        assertEquals(0, tracker.size());
+
+        tracker.put(0, new DeliveryType(0));
+        tracker.put(1, new DeliveryType(1));
+        tracker.put(Integer.MAX_VALUE, new DeliveryType(Integer.MAX_VALUE));
+        tracker.put(Integer.MAX_VALUE + 1, new DeliveryType(Integer.MAX_VALUE + 1));
+        tracker.put(Integer.MAX_VALUE + 2, new DeliveryType(Integer.MAX_VALUE + 2));
+        tracker.put(Integer.MAX_VALUE + 3, new DeliveryType(Integer.MAX_VALUE + 3));
+        tracker.put(Integer.MAX_VALUE + 4, new DeliveryType(Integer.MAX_VALUE + 4));
+
+        final AtomicInteger index = new AtomicInteger();
+
+        tracker.forEach(Integer.MAX_VALUE, Integer.MAX_VALUE + 2, (delivery) -> index.incrementAndGet());
+
+        assertEquals(index.get(), 3);
+    }
+
+    @Test
+    public void testValuesCollection() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(0, new DeliveryType(0));
+        tracker.put(1, new DeliveryType(1));
+        tracker.put(2, new DeliveryType(2));
+        tracker.put(3, new DeliveryType(3));
+
+        Collection<DeliveryType> values = tracker.values();
+        assertNotNull(values);
+        assertEquals(4, values.size());
+        assertFalse(values.isEmpty());
+        assertSame(values, tracker.values());
+    }
+
+    @Test
+    public void testValuesIteration() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int[] intValues = {0, 1, 2, 3};
+
+        for (int entry : intValues) {
+            tracker.put(entry, new DeliveryType(entry));
+        }
+
+        Collection<DeliveryType> values = tracker.values();
+        Iterator<DeliveryType> iterator = values.iterator();
+        assertNotNull(iterator);
+        assertTrue(iterator.hasNext());
+
+        int counter = 0;
+        while (iterator.hasNext()) {
+            assertEquals(new DeliveryType(intValues[counter++]), iterator.next());
+        }
+
+        // Check that we really did iterate.
+        assertEquals(intValues.length, counter);
+    }
+
+    @Test
+    public void testValuesIterationRemove() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int[] intValues = {0, 1, 2, 3};
+
+        for (int entry : intValues) {
+            tracker.put(entry, new DeliveryType(entry));
+        }
+
+        Collection<DeliveryType> values = tracker.values();
+        Iterator<DeliveryType> iterator = values.iterator();
+        assertNotNull(iterator);
+        assertTrue(iterator.hasNext());
+
+        int counter = 0;
+        while (iterator.hasNext()) {
+            assertEquals(new DeliveryType(intValues[counter++]), iterator.next());
+            iterator.remove();
+        }
+
+        // Check that we really did iterate.
+        assertEquals(intValues.length, counter);
+        assertTrue(tracker.isEmpty());
+        assertEquals(0, tracker.size());
+    }
+
+    @Test
+    public void testValuesIterationFollowUnsignedOrderingExpectations() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int[] inputValues = {3, 0, -1, 1, -2, 2};
+
+        for (int entry : inputValues) {
+            tracker.put(entry, new DeliveryType(entry));
+        }
+
+        Collection<DeliveryType> values = tracker.values();
+        Iterator<DeliveryType> iterator = values.iterator();
+        assertNotNull(iterator);
+        assertTrue(iterator.hasNext());
+
+        int counter = 0;
+        while (iterator.hasNext()) {
+            assertEquals(new DeliveryType(inputValues[counter++]), iterator.next());
+        }
+
+        // Check that we really did iterate.
+        assertEquals(inputValues.length, counter);
+    }
+
+    @Test
+    public void testValuesIterationFailsWhenConcurrentlyModified() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int[] inputValues = {1, 2, 3, 5, 7, 9, 11};
+
+        for (int entry : inputValues) {
+            tracker.put(entry, new DeliveryType(entry));
+        }
+
+        Collection<DeliveryType> values = tracker.values();
+        Iterator<DeliveryType> iterator = values.iterator();
+        assertNotNull(iterator);
+        assertTrue(iterator.hasNext());
+
+        tracker.remove(3);
+
+        try {
+            iterator.next();
+            fail("Should not iterate when modified outside of iterator");
+        } catch (ConcurrentModificationException cme) {}
+    }
+
+    @Test
+    public void testValuesIterationOnEmptyTree() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+        Collection<DeliveryType> values = tracker.values();
+        Iterator<DeliveryType> iterator = values.iterator();
+
+        assertFalse(iterator.hasNext());
+        try {
+            iterator.next();
+            fail("Should have thrown a NoSuchElementException");
+        } catch (NoSuchElementException nse) {
+        }
+    }
+
+    @Test
+    public void testKeySetReturned() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(0, new DeliveryType(0));
+        tracker.put(1, new DeliveryType(1));
+        tracker.put(2, new DeliveryType(2));
+        tracker.put(3, new DeliveryType(3));
+
+        Set<UnsignedInteger> keys = tracker.keySet();
+        assertNotNull(keys);
+        assertEquals(4, keys.size());
+        assertFalse(keys.isEmpty());
+        assertSame(keys, tracker.keySet());
+    }
+
+    @Test
+    public void testKeysIterationRemove() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int[] intValues = {0, 1, 2, 3};
+
+        for (int entry : intValues) {
+            tracker.put(entry, new DeliveryType(entry));
+        }
+
+        Collection<UnsignedInteger> keys = tracker.keySet();
+        Iterator<UnsignedInteger> iterator = keys.iterator();
+        assertNotNull(iterator);
+        assertTrue(iterator.hasNext());
+
+        int counter = 0;
+        while (iterator.hasNext()) {
+            assertEquals(UnsignedInteger.valueOf(intValues[counter++]), iterator.next());
+        }
+
+        // Check that we really did iterate.
+        assertEquals(intValues.length, counter);
+    }
+
+    @Test
+    public void testKeysIterationRemoveContract() {
+        Set<UnsignedInteger> set = tracker.keySet();
+        Iterator<UnsignedInteger> iter = set.iterator();
+        iter.next();
+        iter.remove();
+
+        // No remove allowed again until next is called
+        assertThrows(IllegalStateException.class, () -> iter.remove());
+
+        iter.next();
+        iter.remove();
+
+        assertEquals(998, tracker.size());
+
+        iter.next();
+        assertNotNull(tracker.remove(999));
+
+        assertThrows(ConcurrentModificationException.class, () -> iter.remove());
+    }
+
+    @Test
+    public void testKeysIteration() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int[] intValues = {0, 1, 2, 3};
+
+        for (int entry : intValues) {
+            tracker.put(entry, new DeliveryType(entry));
+        }
+
+        Collection<UnsignedInteger> keys = tracker.keySet();
+        Iterator<UnsignedInteger> iterator = keys.iterator();
+        assertNotNull(iterator);
+        assertTrue(iterator.hasNext());
+
+        int counter = 0;
+        while (iterator.hasNext()) {
+            assertEquals(UnsignedInteger.valueOf(intValues[counter++]), iterator.next());
+            iterator.remove();
+        }
+
+        // Check that we really did iterate.
+        assertEquals(intValues.length, counter);
+        assertTrue(tracker.isEmpty());
+        assertEquals(0, tracker.size());
+    }
+
+    @Test
+    public void testKeysIterationFollowsUnsignedOrderingExpectations() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int[] inputValues = {3, 0, -1, 1, -2, 2};
+
+        for (int entry : inputValues) {
+            tracker.put(entry, new DeliveryType(entry));
+        }
+
+        Collection<UnsignedInteger> keys = tracker.keySet();
+        Iterator<UnsignedInteger> iterator = keys.iterator();
+        assertNotNull(iterator);
+        assertTrue(iterator.hasNext());
+
+        int counter = 0;
+        while (iterator.hasNext()) {
+            assertEquals(UnsignedInteger.valueOf(inputValues[counter++]), iterator.next());
+        }
+
+        // Check that we really did iterate.
+        assertEquals(inputValues.length, counter);
+    }
+
+    @Test
+    public void testKeysIterationFailsWhenConcurrentlyModified() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int[] inputValues = {1, 3, 5, 7, 9, 11, 13};
+
+        for (int entry : inputValues) {
+            tracker.put(entry, new DeliveryType(entry));
+        }
+
+        Collection<UnsignedInteger> keys = tracker.keySet();
+        Iterator<UnsignedInteger> iterator = keys.iterator();
+        assertNotNull(iterator);
+        assertTrue(iterator.hasNext());
+
+        tracker.remove(3);
+
+        try {
+            iterator.next();
+            fail("Should not iterate when modified outside of iterator");
+        } catch (ConcurrentModificationException cme) {}
+    }
+
+    @Test
+    public void testKeysIterationOnEmptyTree() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+        Collection<UnsignedInteger> keys = tracker.keySet();
+        Iterator<UnsignedInteger> iterator = keys.iterator();
+
+        assertFalse(iterator.hasNext());
+        try {
+            iterator.next();
+            fail("Should have thrown a NoSuchElementException");
+        } catch (NoSuchElementException nse) {
+        }
+    }
+
+    @Test
+    public void testKeySetRemoveAllFromCollection() {
+        final Collection<UnsignedInteger> collection = Arrays.asList(uintArray);
+
+        assertTrue(tracker.keySet().removeAll(collection));
+        assertEquals(0, tracker.size());
+        assertFalse(tracker.keySet().iterator().hasNext());
+
+        // Second attempt should do nothing.
+        assertFalse(tracker.keySet().removeAll(collection));
+    }
+
+    @Test
+    public void testKeySetRetainAllFromCollectionAtZero() {
+        doTestKeySetRetainAllFromCollection(0);
+    }
+
+    @Test
+    public void testKeySetRetainAllFromCollectionAtOne() {
+        doTestKeySetRetainAllFromCollection(1);
+    }
+
+    @Test
+    public void testKeySetRetainAllFromCollectionAtTwoHundered() {
+        doTestKeySetRetainAllFromCollection(200);
+    }
+
+    @Test
+    public void testKeySetRetainAllFromCollectionAtFiveHundred() {
+        doTestKeySetRetainAllFromCollection(500);
+    }
+
+    private void doTestKeySetRetainAllFromCollection(int index) {
+        final Collection<UnsignedInteger> collection = new ArrayList<>();
+        collection.add(uintArray[index]);
+
+        assertEquals(1000, tracker.size());
+
+        final Set<UnsignedInteger> keys = tracker.keySet();
+
+        keys.retainAll(collection);
+        assertEquals(1, tracker.size());
+        keys.removeAll(collection);
+        assertEquals(0, tracker.size());
+        tracker.put(1, new DeliveryType(1));
+        assertEquals(1, tracker.size());
+        keys.clear();
+        assertEquals(0, tracker.size());
+    }
+
+    @Test
+    public void TestKeySetRetainAllFromCollectionWhenMapHasCustomBucketsAndRetainedIsInFirstBucket() {
+        // Start with three buckets of size three
+        UnsettledMap<DeliveryType> tracker = createMap(3, 3);
+
+        tracker.put(1, new DeliveryType(1)); // First
+        tracker.put(2, new DeliveryType(2));
+        tracker.put(3, new DeliveryType(3));
+        tracker.put(4, new DeliveryType(4)); // Second
+        tracker.put(5, new DeliveryType(5));
+        tracker.put(6, new DeliveryType(6));
+        tracker.put(7, new DeliveryType(7)); // Third
+        tracker.put(8, new DeliveryType(8));
+        tracker.put(9, new DeliveryType(9));
+
+        assertEquals(9, tracker.size());
+
+        final Collection<UnsignedInteger> collection = new ArrayList<>();
+        collection.add(UnsignedInteger.valueOf(1));  // Retain element from bucket one
+
+        final Set<UnsignedInteger> keys = tracker.keySet();
+
+        keys.retainAll(collection);
+        assertEquals(1, tracker.size());
+        keys.removeAll(collection);
+        assertEquals(0, tracker.size());
+        tracker.put(1, new DeliveryType(1));
+        assertEquals(1, tracker.size());
+        keys.clear();
+        assertEquals(0, tracker.size());
+    }
+
+    @Test
+    public void TestKeySetRetainAllFromCollectionWhenMapHasCustomBucketsAndRetainedIsInLastBucket() {
+        // Start with three buckets of size three
+        UnsettledMap<DeliveryType> tracker = createMap(3, 3);
+
+        tracker.put(1, new DeliveryType(1)); // First
+        tracker.put(2, new DeliveryType(2));
+        tracker.put(3, new DeliveryType(3));
+        tracker.put(4, new DeliveryType(4)); // Second
+        tracker.put(5, new DeliveryType(5));
+        tracker.put(6, new DeliveryType(6));
+        tracker.put(7, new DeliveryType(7)); // Third
+        tracker.put(8, new DeliveryType(8));
+        tracker.put(9, new DeliveryType(9));
+
+        assertEquals(9, tracker.size());
+
+        final Collection<UnsignedInteger> collection = new ArrayList<>();
+        collection.add(UnsignedInteger.valueOf(7));  // Retain element from bucket three
+
+        final Set<UnsignedInteger> keys = tracker.keySet();
+
+        keys.retainAll(collection);
+        assertEquals(1, tracker.size());
+        keys.removeAll(collection);
+        assertEquals(0, tracker.size());
+        tracker.put(1, new DeliveryType(1));
+        assertEquals(1, tracker.size());
+        keys.clear();
+        assertEquals(0, tracker.size());
+    }
+
+    @Test
+    public void tesEntrySetReturned() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(0, new DeliveryType(0));
+        tracker.put(1, new DeliveryType(1));
+        tracker.put(2, new DeliveryType(2));
+        tracker.put(3, new DeliveryType(3));
+
+        Set<Entry<UnsignedInteger, DeliveryType>> entries= tracker.entrySet();
+        assertNotNull(entries);
+        assertEquals(4, entries.size());
+        assertFalse(entries.isEmpty());
+        assertSame(entries, tracker.entrySet());
+    }
+
+    @Test
+    public void tesEntrySetContains() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        tracker.put(0, new DeliveryType(0));
+        tracker.put(1, new DeliveryType(1));
+        tracker.put(2, new DeliveryType(2));
+        tracker.put(3, new DeliveryType(3));
+
+        Set<Entry<UnsignedInteger, DeliveryType>> entries = tracker.entrySet();
+        assertNotNull(entries);
+        assertEquals(4, entries.size());
+        assertFalse(entries.isEmpty());
+        assertSame(entries, tracker.entrySet());
+
+        OutsideEntry<UnsignedInteger, DeliveryType> entry1 = new OutsideEntry<>(UnsignedInteger.valueOf(0), new DeliveryType(0));
+        OutsideEntry<UnsignedInteger, DeliveryType> entry2 = new OutsideEntry<>(UnsignedInteger.valueOf(7), new DeliveryType(7));
+
+        assertTrue(entries.contains(entry1));
+        assertFalse(entries.contains(entry2));
+    }
+
+    @Test
+    public void testEntryIteration() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int[] intValues = {0, 1, 2, 3};
+
+        for (int entry : intValues) {
+            tracker.put(entry, new DeliveryType(entry));
+        }
+
+        Set<Entry<UnsignedInteger, DeliveryType>> entries= tracker.entrySet();
+        Iterator<Entry<UnsignedInteger, DeliveryType>> iterator = entries.iterator();
+        assertNotNull(iterator);
+        assertTrue(iterator.hasNext());
+
+        int counter = 0;
+        while (iterator.hasNext()) {
+            Entry<UnsignedInteger, DeliveryType> entry = iterator.next();
+            assertNotNull(entry);
+            assertEquals(UnsignedInteger.valueOf(intValues[counter]), entry.getKey());
+            assertEquals(new DeliveryType(intValues[counter++]), entry.getValue());
+        }
+
+        // Check that we really did iterate.
+        assertEquals(intValues.length, counter);
+    }
+
+    @Test
+    public void testEntryIterationRemove() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int[] intValues = {0, 1, 2, 3};
+
+        for (int entry : intValues) {
+            tracker.put(entry, new DeliveryType(entry));
+        }
+
+        Set<Entry<UnsignedInteger, DeliveryType>> entries= tracker.entrySet();
+        Iterator<Entry<UnsignedInteger, DeliveryType>> iterator = entries.iterator();
+        assertNotNull(iterator);
+        assertTrue(iterator.hasNext());
+
+        int counter = 0;
+        while (iterator.hasNext()) {
+            Entry<UnsignedInteger, DeliveryType> entry = iterator.next();
+            assertNotNull(entry);
+            assertEquals(UnsignedInteger.valueOf(intValues[counter]), entry.getKey());
+            assertEquals(new DeliveryType(intValues[counter++]), entry.getValue());
+            iterator.remove();
+        }
+
+        // Check that we really did iterate.
+        assertEquals(intValues.length, counter);
+        assertTrue(tracker.isEmpty());
+        assertEquals(0, tracker.size());
+    }
+
+    @Test
+    public void testEntryIterationFollowsInterstionOrderingExpectations() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int[] inputValues = {3, 0, -1, 1, -2, 2};
+
+        for (int entry : inputValues) {
+            tracker.put(entry, new DeliveryType(entry));
+        }
+
+        Set<Entry<UnsignedInteger, DeliveryType>> entries= tracker.entrySet();
+        Iterator<Entry<UnsignedInteger, DeliveryType>> iterator = entries.iterator();
+        assertNotNull(iterator);
+        assertTrue(iterator.hasNext());
+
+        int counter = 0;
+        while (iterator.hasNext()) {
+            Entry<UnsignedInteger, DeliveryType> entry = iterator.next();
+            assertNotNull(entry);
+            assertEquals(UnsignedInteger.valueOf(inputValues[counter]), entry.getKey());
+            assertEquals(new DeliveryType(inputValues[counter++]), entry.getValue());
+        }
+
+        // Check that we really did iterate.
+        assertEquals(inputValues.length, counter);
+    }
+
+    @Test
+    public void testEntryIterationFailsWhenConcurrentlyModified() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int[] inputValues = {2, 3, 5, 9, 12, 42};
+
+        for (int entry : inputValues) {
+            tracker.put(entry, new DeliveryType(entry));
+        }
+
+        Set<Entry<UnsignedInteger, DeliveryType>> entries= tracker.entrySet();
+        Iterator<Entry<UnsignedInteger, DeliveryType>> iterator = entries.iterator();
+        assertNotNull(iterator);
+        assertTrue(iterator.hasNext());
+
+        tracker.remove(3);
+
+        try {
+            iterator.next();
+            fail("Should not iterate when modified outside of iterator");
+        } catch (ConcurrentModificationException cme) {}
+    }
+
+    @Test
+    public void testEntrySetIterationOnEmptyTree() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+        Set<Entry<UnsignedInteger, DeliveryType>> entries= tracker.entrySet();
+        Iterator<Entry<UnsignedInteger, DeliveryType>> iterator = entries.iterator();
+
+        assertFalse(iterator.hasNext());
+        try {
+            iterator.next();
+            fail("Should have thrown a NoSuchElementException");
+        } catch (NoSuchElementException nse) {
+        }
+    }
+
+    @Test
+    public void testForEachEntry() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int[] inputValues = {3, 0, -1, 1, -2, 2};
+
+        for (int entry : inputValues) {
+            tracker.put(entry, new DeliveryType(entry));
+        }
+
+        final SequenceNumber index = new SequenceNumber(0);
+        tracker.forEach((value) -> {
+            int i = index.getAndIncrement().intValue();
+            assertEquals(new DeliveryType(inputValues[i]), value);
+        });
+
+        assertEquals(index.intValue(), inputValues.length);
+    }
+
+    @Test
+    public void testRandomProduceAndConsumeWithBacklog() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int ITERATIONS = 8192;
+
+        try {
+            for (int i = 0; i < ITERATIONS; ++i) {
+                tracker.put(UnsignedInteger.valueOf(i), new DeliveryType(i));
+            }
+
+            for (int i = 0; i < ITERATIONS; ++i) {
+                int p = random.nextInt(ITERATIONS);
+                int c = random.nextInt(ITERATIONS);
+
+                tracker.put(UnsignedInteger.valueOf(p), new DeliveryType(p));
+                tracker.remove(UnsignedInteger.valueOf(c));
+            }
+        } catch (Throwable error) {
+            dumpRandomDataSet(ITERATIONS, true);
+            throw error;
+        }
+    }
+
+    @Test
+    public void testRandomPutAndGetIntoEmptyMap() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int ITERATIONS = 8192;
+
+        try {
+            for (int i = 0; i < ITERATIONS; ++i) {
+                int p = random.nextInt(ITERATIONS);
+                int c = random.nextInt(ITERATIONS);
+
+                tracker.put(UnsignedInteger.valueOf(p), new DeliveryType(p));
+                tracker.remove(UnsignedInteger.valueOf(c));
+            }
+        } catch (Throwable error) {
+            dumpRandomDataSet(ITERATIONS, true);
+            throw error;
+        }
+    }
+
+    @Test
+    public void testRandomPutAndGetIntoEmptyMapWithCustomBucketSize() {
+        UnsettledMap<DeliveryType> tracker = createMap(2, 4);
+
+        final int ITERATIONS = 8192;
+
+        try {
+            for (int i = 0; i < ITERATIONS; ++i) {
+                int p = random.nextInt(ITERATIONS);
+                int c = random.nextInt(ITERATIONS);
+
+                tracker.put(UnsignedInteger.valueOf(p), new DeliveryType(p));
+                tracker.remove(UnsignedInteger.valueOf(c));
+            }
+        } catch (Throwable error) {
+            dumpRandomDataSet(ITERATIONS, true);
+            throw error;
+        }
+    }
+
+    @Test
+    public void testPutEntriesWithDuplicateIdsIntoMapThenRemoveInSameOrder() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int[] ids = new int[] { 4, 7, 5, 0, 7, 7, 0 };
+
+        for(int id : ids) {
+            tracker.put(id, new DeliveryType(id));
+        }
+
+        for (int id : ids) {
+            assertEquals(new DeliveryType(id), tracker.get(id));
+        }
+
+        for (int id : ids) {
+            assertEquals(new DeliveryType(id), tracker.remove(id));
+        }
+
+        assertTrue(tracker.isEmpty());
+    }
+
+    @Test
+    public void testPutThatBreaksOrderLeavesMapUsable() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int[] ids = new int[] { 1, 2, 0 };
+
+        for(int id : ids) {
+            tracker.put(id, new DeliveryType(id));
+        }
+
+        for (int id : ids) {
+            assertEquals(new DeliveryType(id), tracker.get(id));
+        }
+
+        for (int id : ids) {
+            assertEquals(new DeliveryType(id), tracker.remove(id));
+        }
+
+        assertTrue(tracker.isEmpty());
+    }
+
+    @Test
+    public void testPutInSeriesAndRemoveAllValuesRandomly() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        List<UnsignedInteger> values = new ArrayList<>();
+        List<UnsignedInteger> removes = new ArrayList<>();
+
+        final int ITERATIONS = 8192;
+
+        for (int i = 0; i < ITERATIONS; ++i) {
+            values.add(UnsignedInteger.valueOf(i));
+        }
+
+        removes.addAll(values);
+        Collections.shuffle(removes, random);
+
+        try {
+            for (UnsignedInteger id : values) {
+                tracker.put(id, new DeliveryType(id.intValue()));
+            }
+
+            assertEquals(ITERATIONS, tracker.size());
+
+            for (UnsignedInteger id : values) {
+                assertEquals(new DeliveryType(id.intValue()), tracker.get(id));
+            }
+
+            for (UnsignedInteger id : removes) {
+                assertEquals(new DeliveryType(id.intValue()), tracker.remove(id));
+            }
+
+            assertTrue(tracker.isEmpty());
+        } catch (Throwable error) {
+            dumpRandomDataSet(ITERATIONS, true);
+            throw error;
+        }
+    }
+
+    @Test
+    public void testPutInRandomOrderAndRemoveAllValuesInSeries() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        List<UnsignedInteger> values = new ArrayList<>();
+        List<UnsignedInteger> removes = new ArrayList<>();
+
+        final int ITERATIONS = 8192;
+
+        for (int i = 0; i < ITERATIONS; ++i) {
+            values.add(UnsignedInteger.valueOf(i));
+        }
+
+        removes.addAll(values);
+        Collections.shuffle(values, random);
+
+        try {
+            for (UnsignedInteger id : values) {
+                tracker.put(id, new DeliveryType(id.intValue()));
+            }
+
+            assertEquals(ITERATIONS, tracker.size());
+
+            for (UnsignedInteger id : values) {
+                assertEquals(new DeliveryType(id.intValue()), tracker.get(id));
+            }
+
+            for (UnsignedInteger id : removes) {
+                assertEquals(new DeliveryType(id.intValue()), tracker.remove(id));
+            }
+
+            assertTrue(tracker.isEmpty());
+        } catch (Throwable error) {
+            dumpRandomDataSet(ITERATIONS, true);
+            throw error;
+        }
+    }
+
+    @Test
+    public void testPutInRandomOrderAndRemoveAllValuesInRandomOrder() {
+        UnsettledMap<DeliveryType> tracker = createMap();
+
+        List<UnsignedInteger> values = new ArrayList<>();
+        List<UnsignedInteger> removes = new ArrayList<>();
+
+        final int ITERATIONS = 8192;
+
+        for (int i = 0; i < ITERATIONS; ++i) {
+            values.add(UnsignedInteger.valueOf(i));
+        }
+
+        removes.addAll(values);
+        Collections.shuffle(values, random);
+        Collections.shuffle(removes, random);
+
+        try {
+            for (UnsignedInteger id : values) {
+                tracker.put(id, new DeliveryType(id.intValue()));
+            }
+
+            assertEquals(ITERATIONS, tracker.size());
+
+            for (UnsignedInteger id : values) {
+                assertEquals(new DeliveryType(id.intValue()), tracker.get(id));
+            }
+
+            for (UnsignedInteger id : removes) {
+                assertEquals(new DeliveryType(id.intValue()), tracker.remove(id));
+            }
+
+            assertTrue(tracker.isEmpty());
+        } catch (Throwable error) {
+            dumpRandomDataSet(ITERATIONS, true);
+            throw error;
+        }
+    }
+
+    @Test
+    public void testPutRandomValueIntoMapThenRemoveInSameOrder() {
+        final UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int ITERATIONS = 8192;
+
+        try {
+            for (int i = 0; i < ITERATIONS; ++i) {
+                final int index = random.nextInt(ITERATIONS);
+                tracker.put(index, new DeliveryType(index));
+            }
+
+            // Reset to verify insertions
+            random.setSeed(seed);
+
+            for (int i = 0; i < ITERATIONS; ++i) {
+                final int index = random.nextInt(ITERATIONS);
+                assertEquals(new DeliveryType(index), tracker.get(index));
+            }
+
+            // Reset to remove
+            random.setSeed(seed);
+
+            for (int i = 0; i < ITERATIONS; ++i) {
+                final int index = random.nextInt(ITERATIONS);
+                assertEquals(new DeliveryType(index), tracker.remove(index));
+            }
+
+            assertTrue(tracker.isEmpty());
+        } catch (Throwable error) {
+            dumpRandomDataSet(ITERATIONS, true);
+            throw error;
+        }
+    }
+
+    @Test
+    public void testPutInSeriesAndClear() {
+        final UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int LOOPS = 16;
+        final int ITERATIONS = 8192;
+
+        int putDeliveryId = 0;
+        int getDeliveryId = 0;
+
+        for (int loop = 0; loop < LOOPS; loop++) {
+            try {
+                for (int i = 0; i < ITERATIONS; ++i, putDeliveryId++) {
+                    tracker.put(putDeliveryId, new DeliveryType(putDeliveryId));
+                }
+
+                for (int i = 0; i < ITERATIONS; ++i, getDeliveryId++) {
+                    assertEquals(new DeliveryType(getDeliveryId), tracker.get(getDeliveryId));
+                }
+
+                tracker.clear();
+
+                assertTrue(tracker.isEmpty());
+            } catch (Throwable error) {
+                dumpRandomDataSet(ITERATIONS, true);
+                throw error;
+            }
+        }
+    }
+
+    @Test
+    public void testPutInSeriesAndRemoveInSeries() {
+        final UnsettledMap<DeliveryType> tracker = createMap();
+
+        final int LOOPS = 16;
+        final int ITERATIONS = 8192;
+
+        int putDeliveryId = 0;
+        int getDeliveryId = 0;
+        int removeDeliveryId = 0;
+
+        for (int loop = 0; loop < LOOPS; loop++) {
+            try {
+                for (int i = 0; i < ITERATIONS; ++i, putDeliveryId++) {
+                    tracker.put(putDeliveryId, new DeliveryType(putDeliveryId));
+                }
+
+                for (int i = 0; i < ITERATIONS; ++i, getDeliveryId++) {
+                    assertEquals(new DeliveryType(getDeliveryId), tracker.get(getDeliveryId));
+                }
+
+                for (int i = 0; i < ITERATIONS; ++i, removeDeliveryId++) {
+                    assertEquals(new DeliveryType(removeDeliveryId), tracker.remove(removeDeliveryId));
+                }
+
+                assertTrue(tracker.isEmpty());
+            } catch (Throwable error) {
+                dumpRandomDataSet(ITERATIONS, true);
+                throw error;
+            }
+        }
+    }
+
+    @Test
+    public void testEqualsJDKMapTypes() {
+        Map<UnsignedInteger, DeliveryType> m1 = createMap();
+        Map<UnsignedInteger, DeliveryType> m2 = createMap();
+
+        m1.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+        m1.put(UnsignedInteger.valueOf(2), new DeliveryType(2));
+        m2.put(UnsignedInteger.valueOf(3), new DeliveryType(3));
+        m2.put(UnsignedInteger.valueOf(4), new DeliveryType(4));
+
+        assertNotEquals(m1, m2, "Maps should not be equal 1");
+        assertNotEquals(m2, m1, "Maps should not be equal 2");
+
+        // comparing UnsettledMap3 with HashMap with equal values
+        m1 = createMap();
+        m2 = new HashMap<>();
+        m1.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+        m2.put(UnsignedInteger.valueOf(2), new DeliveryType(2));
+        assertNotEquals(m1, m2, "Maps should not be equal 3");
+        assertNotEquals(m2, m1, "Maps should not be equal 4");
+
+        // comparing UnsettledMap3 with differing objects inside values
+        m1 = createMap();
+        m2 = createMap();
+        m1.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+        m2.put(UnsignedInteger.valueOf(2), new DeliveryType(2));
+        assertNotEquals(m1, m2, "Maps should not be equal 5");
+        assertNotEquals(m2, m1, "Maps should not be equal 6");
+
+        // comparing UnsettledMap3 with same objects inside values
+        m1 = createMap();
+        m2 = createMap();
+        m1.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+        m2.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+        assertTrue(m1.equals(m2), "Maps should be equal 7");
+        assertTrue(m2.equals(m1), "Maps should be equal 7");
+    }
+
+    @Test
+    public void testEntrySetContains() {
+        UnsettledMap<DeliveryType> first = createMap();
+        UnsettledMap<DeliveryType> second = createMap();
+
+        first.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+        Object[] entry = first.entrySet().toArray();
+        assertFalse(second.entrySet().contains(entry[0]),
+            "Empty map should not contain anything from first map");
+
+        second.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+        assertTrue(second.entrySet().containsAll(first.entrySet()),
+            "entrySet().containsAll(...) should work with values");
+
+        first.clear();
+        first.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+        entry = first.entrySet().toArray();
+        assertTrue(second.entrySet().contains(entry[0]),
+            "new valued entry with same delivery ID should equal old valued entry");
+        first.put(UnsignedInteger.valueOf(2), new DeliveryType(2));
+        entry = first.entrySet().toArray();
+        assertFalse(second.entrySet().contains(entry[1]),
+            "additional value in first should not match any in second");
+    }
+
+    @Test
+    public void testValues() {
+        Collection<DeliveryType> vals = tracker.values();
+        vals.iterator();
+        assertEquals(vals.size(), objArray.length, "Returned collection of incorrect size");
+        for (DeliveryType element : objArray) {
+            assertTrue(vals.contains(element), "Collection contains incorrect elements");
+        }
+
+        assertEquals(1000, vals.size());
+        int j = 0;
+        for (Iterator<DeliveryType> iter = vals.iterator(); iter.hasNext(); j++) {
+            DeliveryType element = iter.next();
+            assertNotNull(element);
+        }
+        assertEquals(1000, j);
+
+        UnsettledMap<DeliveryType> myMap = new UnsettledMap<DeliveryType>(DeliveryType::getDeliveryId);
+        for (int i = 0; i < 100; i++) {
+            myMap.put(uintArray[i], objArray[i]);
+        }
+        Collection<DeliveryType> values = myMap.values();
+        assertEquals(100, values.size());
+        assertTrue(values.remove(new DeliveryType(1)));
+        assertTrue(!myMap.containsKey(UnsignedInteger.ONE), "Removing from the values collection should remove from the original map");
+        assertTrue(!myMap.containsValue(new DeliveryType(1)), "Removing from the values collection should remove from the original map");
+        assertEquals(99, values.size());
+        j = 0;
+        for (Iterator<DeliveryType> iter = values.iterator(); iter.hasNext(); j++) {
+            iter.next();
+        }
+        assertEquals(99, j);
+    }
+
+    @Test
+    public void testRemoveValueUsingValuesIteratorAndCheckAllOtherValuesRemain() {
+        Iterator<DeliveryType> iterator = tracker.values().iterator();
+
+        DeliveryType removed = null;
+
+        for (int i = 0; i < 10; ++i) {
+            removed = iterator.next();
+        }
+
+        iterator.remove();
+
+        for (UnsignedInteger id : uintArray) {
+            if (id.intValue() != removed.getDeliveryId()) {
+                assertTrue(tracker.containsKey(id.intValue()));
+            } else {
+                assertFalse(tracker.containsKey(id.intValue()));
+            }
+        }
+    }
+
+    @Test
+    public void testRemoveRangeRemovesNoValues() {
+        final int afterLast = uintArray.length + 1; // Entries are one based
+
+        final AtomicBoolean removed = new AtomicBoolean();
+
+        tracker.removeEach(afterLast, afterLast + 10, (delivery) -> removed.set(true));
+
+        assertFalse(removed.get());
+    }
+
+    @Test
+    public void testRemoveRangeRemovesLastValue() {
+        final int lastEntry = uintArray.length; // Entries are one based
+
+        final AtomicInteger removed = new AtomicInteger();
+
+        tracker.removeEach(lastEntry, lastEntry, (delivery) -> removed.incrementAndGet());
+
+        assertEquals(1, removed.get());
+    }
+
+    @Test
+    public void testRemoveRangeRemovesLastValueAndRangeOutsideOfActualEntries() {
+        final int lastEntry = uintArray.length; // Entries are one based
+
+        final AtomicInteger removed = new AtomicInteger();
+
+        tracker.removeEach(lastEntry, lastEntry + 10, (delivery) -> removed.incrementAndGet());
+
+        assertEquals(1, removed.get());
+    }
+
+    @Test
+    public void testRemoveAllEntriesFromFirstBucket() {
+        doTestRemoveEach(0, 15);
+    }
+
+    @Test
+    public void testRemoveAllEntriesFromMiddleBucket() {
+        doTestRemoveEach(16, 31);
+    }
+
+    @Test
+    public void testRemoveAllEntriesFromEndBucket() {
+        doTestRemoveEach(32, 47);
+    }
+
+    @Test
+    public void testRemoveEntriesSpanningThreeBuckets() {
+        doTestRemoveEach(8, 39);
+    }
+
+    @Test
+    public void testRemoveAllEntriesWithClosedRange() {
+        doTestRemoveEach(0, 47);
+    }
+
+    @Test
+    public void testRemoveAllEntriesWithOpenRange() {
+        doTestRemoveEach(0, 64);
+    }
+
+    public void doTestRemoveEach(int start, int end) {
+        final int numBuckets = 3;
+        final int bucketSize = 16;
+        final int numEntries = numBuckets * bucketSize;
+        final int numRemoved = Math.min(end - start + 1, numEntries);
+
+        UnsettledMap<DeliveryType> map = createMap(numBuckets, bucketSize);
+
+        for (int i = 0; i < numEntries; ++i) {
+            map.put(i, new DeliveryType(i));
+        }
+
+        assertEquals(numEntries, map.size());
+
+        final AtomicInteger removed = new AtomicInteger();
+
+        map.removeEach(start, end, (delivery) -> removed.incrementAndGet());
+
+        assertEquals(numRemoved, removed.get());
+        assertEquals(numEntries - numRemoved, map.size());
+    }
+
+    @Test
+    public void testRemoveAllEntriesInSmallChunks() {
+        final AtomicInteger removed = new AtomicInteger();
+
+        for (int i = 0; i < uintArray.length; i += 2) {
+            tracker.removeEach(uintArray[i].intValue(), uintArray[i+1].intValue(), (delivery) -> removed.incrementAndGet());
+        }
+
+        assertEquals(uintArray.length, removed.get());
+    }
+
+    protected void dumpRandomDataSet(int iterations, boolean bounded) {
+        final int[] dataSet = new int[iterations];
+
+        random.setSeed(seed);
+
+        for (int i = 0; i < iterations; ++i) {
+            if (bounded) {
+                dataSet[i] = random.nextInt(iterations);
+            } else {
+                dataSet[i] = random.nextInt();
+            }
+        }
+
+        LOG.info("Random seed was: {}" , seed);
+        LOG.info("Entries in data set: {}", dataSet);
+    }
+
+    protected static class OutsideEntry<K, V> implements Map.Entry<K, V> {
+
+        private final K key;
+        private V value;
+
+        public OutsideEntry(K key, V value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public V setValue(V value) {
+            V oldValue = this.value;
+            this.value = value;
+            return oldValue;
+        }
+
+        @Override
+        public V getValue() {
+            return value;
+        }
+
+        @Override
+        public K getKey() {
+            return key;
+        }
+    }
+}