PROTON-2660 Create specialized tracking map for unsettled deliveries
Track deliveries in structure that creates little to no garbage compared
to the tree based strucutre currently used.
diff --git a/protonj2-performance-tests/src/main/java/org/apache/qpid/protonj2/engine/util/MapBenchmarkBase.java b/protonj2-performance-tests/src/main/java/org/apache/qpid/protonj2/engine/util/MapBenchmarkBase.java
index 712f26f..0a3ce98 100644
--- a/protonj2-performance-tests/src/main/java/org/apache/qpid/protonj2/engine/util/MapBenchmarkBase.java
+++ b/protonj2-performance-tests/src/main/java/org/apache/qpid/protonj2/engine/util/MapBenchmarkBase.java
@@ -23,6 +23,7 @@
import org.apache.qpid.protonj2.types.UnsignedInteger;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
@@ -55,7 +56,7 @@
protected Map<UnsignedInteger, String> map;
protected Map<UnsignedInteger, String> filledMap;
- @Setup
+ @Setup(Level.Invocation)
public void init() {
this.random.setSeed(System.currentTimeMillis());
this.map = createMap();
diff --git a/protonj2-performance-tests/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMapBenchmark.java b/protonj2-performance-tests/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMapBenchmark.java
index 994fdee..2cc0aed 100644
--- a/protonj2-performance-tests/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMapBenchmark.java
+++ b/protonj2-performance-tests/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMapBenchmark.java
@@ -20,6 +20,7 @@
import org.apache.qpid.protonj2.types.UnsignedInteger;
import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.RunnerException;
@@ -37,7 +38,7 @@
private SplayMap<String> sqFilledMap;
@Override
- @Setup
+ @Setup(Level.Invocation)
public void init() {
super.init();
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonIncomingDelivery.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonIncomingDelivery.java
index e3e63e2..65ef02f 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonIncomingDelivery.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonIncomingDelivery.java
@@ -303,6 +303,10 @@
return deliveryId;
}
+ int getDeliveryIdInt() {
+ return (int) deliveryId;
+ }
+
ProtonIncomingDelivery aborted() {
aborted = true;
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonOutgoingDelivery.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonOutgoingDelivery.java
index b550504..e6bc954 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonOutgoingDelivery.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonOutgoingDelivery.java
@@ -277,6 +277,10 @@
return deliveryId;
}
+ int getDeliveryIdInt() {
+ return (int) deliveryId;
+ }
+
void setDeliveryId(long deliveryId) {
this.deliveryId = deliveryId;
}
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java
index 6b74d14..a0d4bd9 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java
@@ -31,8 +31,7 @@
import org.apache.qpid.protonj2.engine.Session;
import org.apache.qpid.protonj2.engine.exceptions.ProtocolViolationException;
import org.apache.qpid.protonj2.engine.util.DeliveryIdTracker;
-import org.apache.qpid.protonj2.engine.util.LinkedSplayMap;
-import org.apache.qpid.protonj2.engine.util.SplayMap;
+import org.apache.qpid.protonj2.engine.util.UnsettledMap;
import org.apache.qpid.protonj2.types.UnsignedInteger;
import org.apache.qpid.protonj2.types.transport.Attach;
import org.apache.qpid.protonj2.types.transport.DeliveryState;
@@ -54,7 +53,8 @@
private final ProtonSessionIncomingWindow sessionWindow;
private final DeliveryIdTracker currentDeliveryId = new DeliveryIdTracker();
- private final SplayMap<ProtonIncomingDelivery> unsettled = new LinkedSplayMap<>();
+ private final UnsettledMap<ProtonIncomingDelivery> unsettled =
+ new UnsettledMap<ProtonIncomingDelivery>(ProtonIncomingDelivery::getDeliveryIdInt);
private DeliveryState defaultDeliveryState;
private LinkCreditState drainStateSnapshot;
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java
index a0ba9f6..e883969 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java
@@ -30,8 +30,7 @@
import org.apache.qpid.protonj2.engine.Sender;
import org.apache.qpid.protonj2.engine.Session;
import org.apache.qpid.protonj2.engine.util.DeliveryIdTracker;
-import org.apache.qpid.protonj2.engine.util.LinkedSplayMap;
-import org.apache.qpid.protonj2.engine.util.SplayMap;
+import org.apache.qpid.protonj2.engine.util.UnsettledMap;
import org.apache.qpid.protonj2.types.UnsignedInteger;
import org.apache.qpid.protonj2.types.transport.Attach;
import org.apache.qpid.protonj2.types.transport.DeliveryState;
@@ -48,7 +47,8 @@
private final ProtonSessionOutgoingWindow sessionWindow;
private final DeliveryIdTracker currentDeliveryId = new DeliveryIdTracker();
- private final SplayMap<ProtonOutgoingDelivery> unsettled = new LinkedSplayMap<>();
+ private final UnsettledMap<ProtonOutgoingDelivery> unsettled =
+ new UnsettledMap<ProtonOutgoingDelivery>(ProtonOutgoingDelivery::getDeliveryIdInt);
private EventHandler<OutgoingDelivery> deliveryUpdatedEventHandler = null;
private EventHandler<Sender> linkCreditUpdatedHandler = null;
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
index e6ec2a5..59bf6be 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
@@ -16,14 +16,10 @@
*/
package org.apache.qpid.protonj2.engine.impl;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NavigableMap;
-
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.engine.exceptions.ProtocolViolationException;
import org.apache.qpid.protonj2.engine.util.SequenceNumber;
-import org.apache.qpid.protonj2.engine.util.SplayMap;
+import org.apache.qpid.protonj2.engine.util.UnsettledMap;
import org.apache.qpid.protonj2.types.UnsignedInteger;
import org.apache.qpid.protonj2.types.transport.Begin;
import org.apache.qpid.protonj2.types.transport.Disposition;
@@ -61,7 +57,8 @@
private long maxFrameSize;
private long incomingBytes;
- private SplayMap<ProtonIncomingDelivery> unsettled = new SplayMap<>();
+ private UnsettledMap<ProtonIncomingDelivery> unsettled =
+ new UnsettledMap<>(ProtonIncomingDelivery::getDeliveryIdInt);
public ProtonSessionIncomingWindow(ProtonSession session) {
this.session = session;
@@ -174,33 +171,17 @@
return disposition;
}
- private static void handleRangedDisposition(NavigableMap<UnsignedInteger, ProtonIncomingDelivery> unsettled, Disposition disposition) {
- final UnsignedInteger first = UnsignedInteger.valueOf(disposition.getFirst());
- final UnsignedInteger last = UnsignedInteger.valueOf(disposition.getLast());
-
- // Dispositions cover a contiguous range in the map requires a single sub-map
- // which we can iterate whereas a range that wraps requires two iterations over
- // a split between the higher portion and the lower portion of the map.
- if (first.compareTo(last) <= 0) {
- handleDispositions(unsettled.subMap(first, true, last, true), disposition);
+ private static void handleRangedDisposition(UnsettledMap<ProtonIncomingDelivery> unsettled, Disposition disposition) {
+ // Dispositions cover a contiguous range in the map and since the tracker always moves forward
+ // when appending new deliveries the range can wrap without needing a second iteration.
+ if (disposition.getSettled()) {
+ unsettled.removeEach((int) disposition.getFirst(), (int) disposition.getLast(), (delivery) -> {
+ delivery.getLink().remoteDisposition(disposition, delivery);
+ });
} else {
- handleDispositions(unsettled.tailMap(first, true), disposition);
- handleDispositions(unsettled.headMap(last, true), disposition);
- }
- }
-
- private static void handleDispositions(Map<UnsignedInteger, ProtonIncomingDelivery> deliveries, Disposition disposition) {
- final boolean settled = disposition.getSettled();
-
- Iterator<ProtonIncomingDelivery> deliveriesIter = deliveries.values().iterator();
- while (deliveriesIter.hasNext()) {
- ProtonIncomingDelivery delivery = deliveriesIter.next();
-
- if (settled) {
- deliveriesIter.remove();
- }
-
- delivery.getLink().remoteDisposition(disposition, delivery);
+ unsettled.forEach((int) disposition.getFirst(), (int) disposition.getLast(), (delivery) -> {
+ delivery.getLink().remoteDisposition(disposition, delivery);
+ });
}
}
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java
index b60ef17..c25e5b4 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java
@@ -16,16 +16,12 @@
*/
package org.apache.qpid.protonj2.engine.impl;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NavigableMap;
import java.util.Set;
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.engine.OutgoingAMQPEnvelope;
-import org.apache.qpid.protonj2.engine.util.SplayMap;
+import org.apache.qpid.protonj2.engine.util.UnsettledMap;
import org.apache.qpid.protonj2.types.DeliveryTag;
-import org.apache.qpid.protonj2.types.UnsignedInteger;
import org.apache.qpid.protonj2.types.transport.Begin;
import org.apache.qpid.protonj2.types.transport.Disposition;
import org.apache.qpid.protonj2.types.transport.Flow;
@@ -64,7 +60,8 @@
private Runnable outgoingFrameWriteComplete = this::handleOutgoingFrameWriteComplete;
- private final SplayMap<ProtonOutgoingDelivery> unsettled = new SplayMap<>();
+ private final UnsettledMap<ProtonOutgoingDelivery> unsettled =
+ new UnsettledMap<ProtonOutgoingDelivery>(ProtonOutgoingDelivery::getDeliveryIdInt);
public ProtonSessionOutgoingWindow(ProtonSession session) {
this.session = session;
@@ -237,33 +234,17 @@
return disposition;
}
- private static void handleRangedDisposition(NavigableMap<UnsignedInteger, ProtonOutgoingDelivery> unsettled, Disposition disposition) {
- final UnsignedInteger first = UnsignedInteger.valueOf(disposition.getFirst());
- final UnsignedInteger last = UnsignedInteger.valueOf(disposition.getLast());
-
- // Dispositions cover a contiguous range in the map requires a single sub-map
- // which we can iterate whereas a range that wraps requires two iterations over
- // a split between the higher portion and the lower portion of the map.
- if (first.compareTo(last) <= 0) {
- handleDispositions(unsettled.subMap(first, true, last, true), disposition);
+ private static void handleRangedDisposition(UnsettledMap<ProtonOutgoingDelivery> unsettled, Disposition disposition) {
+ // Dispositions cover a contiguous range in the map and since the tracker always moves forward
+ // when appending new deliveries the range can wrap without needing a second iteration.
+ if (disposition.getSettled()) {
+ unsettled.removeEach((int) disposition.getFirst(), (int) disposition.getLast(), (delivery) -> {
+ delivery.getLink().remoteDisposition(disposition, delivery);
+ });
} else {
- handleDispositions(unsettled.tailMap(first, true), disposition);
- handleDispositions(unsettled.headMap(last, true), disposition);
- }
- }
-
- private static void handleDispositions(Map<UnsignedInteger, ProtonOutgoingDelivery> deliveries, Disposition disposition) {
- final boolean settled = disposition.getSettled();
-
- Iterator<ProtonOutgoingDelivery> deliveriesIter = deliveries.values().iterator();
- while (deliveriesIter.hasNext()) {
- ProtonOutgoingDelivery delivery = deliveriesIter.next();
-
- if (settled) {
- deliveriesIter.remove();
- }
-
- delivery.getLink().remoteDisposition(disposition, delivery);
+ unsettled.forEach((int) disposition.getFirst(), (int) disposition.getLast(), (delivery) -> {
+ delivery.getLink().remoteDisposition(disposition, delivery);
+ });
}
}
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/UnsettledMap.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/UnsettledMap.java
new file mode 100644
index 0000000..cf362df
--- /dev/null
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/UnsettledMap.java
@@ -0,0 +1,1128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.engine.util;
+
+import java.util.AbstractCollection;
+import java.util.AbstractSet;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import org.apache.qpid.protonj2.types.UnsignedInteger;
+
+/**
+ * A specialized collection like entity that is used to keep track of unsettled
+ * incoming and outgoing deliveries for AMQP links and the sessions that manage
+ * those links.
+ *
+ * @param <Delivery> The delivery type being tracker (incoming or outgoing)
+ */
+public class UnsettledMap<Delivery> implements Map<UnsignedInteger, Delivery> {
+
+ public interface UnsettledGetDeliveryId<Delivery> {
+ int getDeliveryId(Delivery delivery);
+ }
+
+ private final double BUCKET_LOAD_FACTOR_MULTIPLIER = 0.30;
+
+ private static final int UNSETTLED_INITIAL_BUCKETS = 2;
+ private static final int UNSETTLED_BUCKET_SIZE = 256;
+
+ // Always full buckets used in operations that needs unwritable bounds
+ private final UnsettledBucket<Delivery> ALWAYS_FULL_BUCKET = new UnsettledBucket<>();
+
+ private final UnsettledGetDeliveryId<Delivery> deliveryIdSupplier;
+ private final int bucketCapacity;
+ private final int bucketLowWaterMark;
+
+ private int totalEntries;
+ private int modCount;
+
+ private int current;
+ private UnsettledBucket<Delivery>[] buckets;
+
+ public UnsettledMap(UnsettledGetDeliveryId<Delivery> idSupplier) {
+ this(idSupplier, UNSETTLED_INITIAL_BUCKETS, UNSETTLED_BUCKET_SIZE);
+ }
+
+ public UnsettledMap(UnsettledGetDeliveryId<Delivery> idSupplier, int initialBuckets) {
+ this(idSupplier, initialBuckets, UNSETTLED_BUCKET_SIZE);
+ }
+
+ @SuppressWarnings("unchecked")
+ public UnsettledMap(UnsettledGetDeliveryId<Delivery> idSupplier, int initialBuckets, int bucketSize) {
+ this.deliveryIdSupplier = idSupplier;
+ this.bucketCapacity = bucketSize;
+ this.bucketLowWaterMark = (int) (bucketSize * BUCKET_LOAD_FACTOR_MULTIPLIER);
+
+ if (bucketSize < 1) {
+ throw new IllegalArgumentException("The bucket size must be greater than zero");
+ }
+
+ if (initialBuckets < 1) {
+ throw new IllegalArgumentException("The initial number of buckets must be at least 1");
+ }
+
+ buckets = new UnsettledBucket[initialBuckets];
+ for (int i = 0; i < buckets.length; ++i) {
+ buckets[i] = new UnsettledBucket<>(bucketSize, deliveryIdSupplier);
+ }
+ }
+
+ @Override
+ public void putAll(Map<? extends UnsignedInteger, ? extends Delivery> source) {
+ for (Entry<? extends UnsignedInteger, ? extends Delivery> entry : source.entrySet()) {
+ put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public void clear() {
+ for (int i = 0; i < buckets.length && buckets[i].isReadable(); ++i) {
+ buckets[i].clear(); // Ensure any referenced deliveries are cleared
+ }
+
+ current = totalEntries = 0;
+ }
+
+ @Override
+ public Delivery put(UnsignedInteger key, Delivery value) {
+ return put(key.intValue(), value);
+ }
+
+ /**
+ * Adds the given key and value pair in this tracking structure at the end of the current series.
+ * <p>
+ * If the map previously contained a mapping for the key, the old value is not replaced by the specified
+ * value unlike a traditional map as this structure is tracking the running series of values. This would
+ * imply that duplicates can exist in the tracker, however given the likelihood of this occurring in the
+ * normal flow of deliveries should be considered extremely low.
+ *
+ * @param deliveryId
+ * The delivery ID of the delivery being added to this tracker.
+ * @param delivery
+ * The delivery that is being added to the tracker
+ *
+ * @return null in all cases as this tracker does not check for duplicates.
+ */
+ public Delivery put(int deliveryId, Delivery delivery) {
+ if (!buckets[current].put(deliveryId, delivery)) {
+ // Always move to next bucket or create one so that current is always
+ // position on a writable bucket.
+ if (++current == buckets.length) {
+ // Create a new bucket of entries since we don't have any more space free yet
+ // and update the chain with the newly create bucket. We disregard the max segments
+ // here since we always need the extra space regardless of how many pending unsettled
+ // deliveries there are.
+ buckets = Arrays.copyOf(buckets, current + 1);
+ buckets[current] = new UnsettledBucket<>(bucketCapacity, deliveryIdSupplier);
+ }
+
+ // Moved on after overflow so we know this one will work.
+ buckets[current].put(deliveryId, delivery);
+ }
+
+ totalEntries++;
+ modCount++;
+
+ return null;
+ }
+
+ @Override
+ public int size() {
+ return totalEntries;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return totalEntries == 0;
+ }
+
+ @Override
+ public Delivery get(Object key) {
+ return get(Number.class.cast(key).intValue());
+ }
+
+ public Delivery get(int deliveryId) {
+ if (totalEntries == 0) {
+ return null;
+ }
+
+ // Search every bucket because delivery IDs can wrap around, but we can
+ // stop at the first empty bucket as all buckets following it must also
+ // be empty buckets.
+ for (int i = 0; i <= current; ++i) {
+ if (buckets[i].isInRange(deliveryId)) {
+ final Delivery result = buckets[i].get(deliveryId);
+ if (result != null) {
+ return result;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public Delivery remove(Object key) {
+ return removeValue(Number.class.cast(key).intValue());
+ }
+
+ public Delivery remove(int deliveryId) {
+ return removeValue(deliveryId);
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return containsKey(Number.class.cast(key).intValue());
+ }
+
+ public boolean containsKey(int key) {
+ if (totalEntries > 0) {
+ for (int i = 0; i <= current; ++i) {
+ if (buckets[i].isInRange(key)) {
+ if (buckets[i].get(key) != null) {
+ return true;
+ }
+ }
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ if (totalEntries > 0 && value != null) {
+ for (int i = 0; i <= current; ++i) {
+ for (int j = buckets[i].readOffset; j < buckets[i].writeOffset; ++j) {
+ if (value.equals(buckets[i].entryAt(j))) {
+ return true;
+ }
+ }
+ }
+ }
+
+ return false;
+ }
+
+ public void forEach(Consumer<Delivery> action) {
+ Objects.requireNonNull(action);
+
+ if (totalEntries == 0) {
+ return;
+ }
+
+ for (int i = 0; i <= current; ++i) {
+ for (int j = buckets[i].readOffset; j < buckets[i].writeOffset; ++j) {
+ action.accept(buckets[i].entryAt(j));
+ }
+ }
+ }
+
+ /**
+ * Visits each entry within the given range and invokes the provided action
+ * on each delivery in the tracker.
+ *
+ * @param first
+ * The first entry to visit.
+ * @param last
+ * The last entry to visit.
+ * @param action
+ * The action to invoke on each visited entry.
+ */
+ public void forEach(int first, int last, Consumer<Delivery> action) {
+ Objects.requireNonNull(action);
+
+ if (totalEntries == 0) {
+ return;
+ }
+
+ boolean foundFirst = false;
+ boolean foundLast = false;
+
+ for (int i = 0; i <= current && !foundLast; ++i) {
+ if (!foundFirst && !buckets[i].isInRange(first)) {
+ continue;
+ }
+
+ for (int j = buckets[i].readOffset; j < buckets[i].writeOffset && !foundLast; ++j) {
+ final Delivery delivery = buckets[i].entryAt(j);
+ final int deliveryId = deliveryIdSupplier.getDeliveryId(delivery);
+
+ foundFirst = foundFirst || deliveryId == first;
+ foundLast = deliveryId == last;
+
+ if (foundFirst) {
+ action.accept(delivery);
+ }
+ }
+ }
+ }
+
+ /**
+ * Remove each entry within the given range of delivery IDs. For each entry
+ * removed the provided action is triggered allowing the caller to be notified
+ * of each removal.
+ *
+ * @param first
+ * The first entry to remove
+ * @param last
+ * The last entry to remove
+ * @param action
+ * The action to invoke on each remove.
+ */
+ public void removeEach(int first, int last, Consumer<Delivery> action) {
+ Objects.requireNonNull(action);
+
+ if (totalEntries == 0) {
+ return;
+ }
+
+ boolean foundFirst = false;
+ boolean foundLast = false;
+ int removeStart = 0;
+ int removeEnd = 0;
+
+ for (int i = 0; i <= current && !foundLast; ++i) {
+ if (!foundFirst && !buckets[i].isInRange(first)) {
+ continue;
+ }
+
+ final UnsettledBucket<Delivery> bucket = buckets[i];
+
+ for (int j = removeStart = removeEnd = bucket.readOffset; j < bucket.writeOffset && !foundLast; ) {
+ final Delivery delivery = bucket.entryAt(j);
+ final int deliveryId = deliveryIdSupplier.getDeliveryId(delivery);
+
+ foundFirst = foundFirst || deliveryId == first;
+ foundLast = deliveryId == last;
+
+ if (foundFirst) {
+ action.accept(delivery);
+ removeEnd = ++j;
+ } else {
+ removeStart = removeEnd = ++j;
+ }
+ }
+
+ // We found first so this iteration did clear some elements from the current bucket
+ // and we need to check that this removal cleared a full bucket which means the index
+ // needs to shift back one since we will have recycled the empty bucket. When the last
+ // index is found we should also attempt to compact the bucket with the previous one
+ // to reduce fragmentation.
+ if (foundFirst) {
+ i = removeRange(i, removeStart, removeEnd, foundLast) ? --i : i;
+ }
+ }
+ }
+
+ private boolean removeRange(int bucketIndex, int start, int end, boolean compact) {
+ final UnsettledBucket<Delivery> bucket = buckets[bucketIndex];
+ final int removals = end - start;
+
+ this.totalEntries -= removals;
+ this.modCount++;
+
+ if (removals == bucket.entries) {
+ return recycleBucket(bucketIndex);
+ } else {
+ System.arraycopy(bucket.deliveries, end, bucket.deliveries, start, bucket.writeOffset - end);
+ Arrays.fill(bucket.deliveries, bucket.writeOffset - removals, bucket.writeOffset, null);
+
+ bucket.writeOffset = bucket.writeOffset - removals;
+ bucket.entries -= removals;
+ bucket.highestDeliveryId = bucket.entryIdAt(bucket.writeOffset - 1);
+
+ if (compact) {
+ return tryCompact(bucketIndex);
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public void forEach(BiConsumer<? super UnsignedInteger, ? super Delivery> action) {
+ Objects.requireNonNull(action);
+
+ if (totalEntries == 0) {
+ return;
+ }
+
+ for (int i = 0; i <= current; ++i) {
+ for (int j = buckets[i].readOffset; j < buckets[i].writeOffset; ++j) {
+ final Delivery delivery = buckets[i].entryAt(j);
+ action.accept(UnsignedInteger.valueOf(deliveryIdSupplier.getDeliveryId(delivery)), delivery);
+ }
+ }
+ }
+
+ @Override
+ public Collection<Delivery> values() {
+ if (values == null) {
+ values = new UnsettledTackingMapValues();
+ }
+
+ return values;
+ }
+
+ @Override
+ public Set<UnsignedInteger> keySet() {
+ if (keySet == null) {
+ keySet = new UnsettledTackingMapKeys();
+ }
+
+ return this.keySet;
+ }
+
+ @Override
+ public Set<Entry<UnsignedInteger, Delivery>> entrySet() {
+ if (entrySet == null) {
+ entrySet = new UnsettledTackingMapEntries();
+ }
+
+ return this.entrySet;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof Map)) {
+ return false;
+ }
+
+ Map<?,?> m = (Map<?,?>) o;
+ if (m.size() != size()) {
+ return false;
+ }
+
+ try {
+ for (int i = 0; i <= current; ++i) {
+ for (int j = buckets[i].readOffset; j < buckets[i].writeOffset; ++j) {
+ final Delivery delivery = buckets[i].entryAt(j);
+ if (!delivery.equals(m.get(deliveryIdSupplier.getDeliveryId(delivery)))) {
+ return false;
+ }
+ }
+ }
+ } catch (ClassCastException | NullPointerException ignored) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "UnsettledMap: { size=" + totalEntries +
+ " buckets=" + buckets.length +
+ " bucket-capacity=" + bucketCapacity + " }";
+ }
+
+ //----- Internal UnsettledMap API
+
+ private boolean recycleBucket(int index) {
+ buckets[index].clear(); // Drop all content and reset as empty bucket
+
+ // If the current bucket is cleared then we can just start writing in it again
+ if (index < current) {
+ current--;
+ final int next = index + 1;
+ UnsettledBucket<Delivery> recycled = buckets[index];
+ System.arraycopy(buckets, next, buckets, index, buckets.length - next);
+ buckets[buckets.length - 1] = recycled;
+ return true;
+ }
+
+ return false;
+ }
+
+ private Delivery removeValue(int deliveryId) {
+ if (totalEntries > 0) {
+ Delivery result = null;
+
+ for (int i = 0; i <= current; ++i) {
+ if (buckets[i].isInRange(deliveryId) && (result = buckets[i].remove(deliveryId)) != null) {
+ totalEntries--;
+ modCount++;
+
+ if (buckets[i].entries <= bucketLowWaterMark) {
+ tryCompact(i);
+ }
+
+ return result;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ // Called from iteration APIs which requires the method to return the location of the next
+ // entry once removal and possible bucket compaction is completed.
+ private long removeAt(int bucketIndex, int bucketEntry) {
+ if (bucketIndex >= buckets.length || bucketEntry >= UNSETTLED_BUCKET_SIZE) {
+ throw new IndexOutOfBoundsException(String.format(
+ "Cannot remove an entry from segment %d at index %d which is outside the tracked bounds", bucketIndex, bucketEntry));
+ }
+
+ final UnsettledBucket<Delivery> bucket = buckets[bucketIndex];
+
+ bucket.removeAt(bucketEntry);
+ totalEntries--;
+ modCount++;
+
+ long result = -1;
+
+ if (bucket.isReadable()) {
+ final int nextBucketIndex = bucketIndex + 1;
+ final int prevBucketIndex = bucketIndex - 1;
+
+ final UnsettledBucket<Delivery> nextBucket =
+ bucketIndex == current || Integer.compareUnsigned(bucket.highestDeliveryId, buckets[nextBucketIndex].lowestDeliveryId) > 0 ?
+ ALWAYS_FULL_BUCKET : buckets[nextBucketIndex];
+ final UnsettledBucket<Delivery> prevBucket =
+ bucketIndex == 0 || Integer.compareUnsigned(bucket.lowestDeliveryId, buckets[prevBucketIndex].highestDeliveryId) < 0 ?
+ ALWAYS_FULL_BUCKET : buckets[prevBucketIndex];
+
+ // As soon as compaction is possible move elements from this bucket into previous and next
+ // which reduces search times as there are fewer buckets to traverse/
+ if (nextBucket.getFreeSpace() + prevBucket.getFreeSpace() >= bucket.entries) {
+ final int toCopyBackward = Math.min(prevBucket.getFreeSpace(), bucket.entries);
+ final int nextEntryOffset = ++bucketEntry - (bucket.readOffset + toCopyBackward);
+
+ if (nextEntryOffset < 0) {
+ // Moved into the previous bucket so the index being negative
+ // give us the located when added to the previous write offset
+ result = (long) (prevBucketIndex) << 32;
+ result = prevBucket.writeOffset + nextEntryOffset;
+ } else if (nextBucket.entries + (bucket.entries - toCopyBackward) > 0) {
+ // Moved into the next bucket gives of the raw index so long
+ // as we compact entries to zero (otherwise it is the read offset
+ result = (long) bucketIndex << 32;
+ result |= nextEntryOffset;
+ }
+
+ doCompaction(bucket, prevBucket, nextBucket);
+ recycleBucket(bucketIndex);
+ } else {
+ // If the element removed is not the last in this bucket then the next is just
+ // the next element otherwise it is the first element of the next bucket if it
+ // non-empty otherwise we reached the end of the elements.
+ if (++bucketEntry < bucket.writeOffset) {
+ result = (long) bucketIndex << 32;
+ result |= bucketEntry;
+ } else if (nextBucketIndex <= current && buckets[nextBucketIndex].entries > 0) {
+ result = (long) nextBucketIndex << 32;
+ result |= buckets[nextBucketIndex].readOffset;
+ }
+ }
+ } else {
+ recycleBucket(bucketIndex);
+
+ // The bucket was empty and will be recycled shifting down all buckets following it and if
+ // there is a next non-empty bucket then the next entry is the first entry in that bucket.
+ if (bucketIndex <= current && buckets[bucketIndex].entries > 0) {
+ result = (long) bucketIndex << 32;
+ result |= buckets[bucketIndex].readOffset;
+ }
+ }
+
+ return result;
+ }
+
+ private final void doCompaction(UnsettledBucket<Delivery> bucket, UnsettledBucket<Delivery> prev, UnsettledBucket<Delivery> next) {
+ if (prev.getFreeSpace() > 0) {
+ final int toCopy = Math.min(bucket.entries, prev.getFreeSpace());
+
+ // We always compact to zero which makes some of the other updates simpler and
+ // can allow for easier compaction in the future.
+ if (prev.readOffset != 0) {
+ System.arraycopy(prev.deliveries, prev.readOffset, prev.deliveries, 0, prev.entries);
+ if (prev.writeOffset > prev.entries + toCopy) {
+ // Ensure no dangling entries after compaction
+ Arrays.fill(prev.deliveries, prev.entries, prev.writeOffset, null);
+ }
+
+ prev.writeOffset -= prev.readOffset;
+ prev.readOffset = 0;
+ }
+
+ System.arraycopy(bucket.deliveries, bucket.readOffset, prev.deliveries, prev.writeOffset, toCopy);
+
+ prev.entries += toCopy;
+ prev.writeOffset = prev.entries;
+ prev.highestDeliveryId = prev.entryIdAt(prev.writeOffset - 1);
+
+ bucket.entries -= toCopy;
+ bucket.writeOffset -= toCopy;
+ bucket.readOffset += toCopy;
+ }
+
+ // We didn't get them all into the previous bucket but we know that if we are
+ // here then there must be space ahead to accept the rest as we already checked.
+ if (bucket.entries > 0) {
+ if (next.readOffset != bucket.entries) {
+ System.arraycopy(next.deliveries, next.readOffset, next.deliveries, bucket.entries, next.entries);
+ if (next.readOffset < bucket.entries) {
+ // Ensure no dangling entries after compaction
+ Arrays.fill(next.deliveries, bucket.entries + next.entries, next.deliveries.length, null);
+ }
+ }
+
+ System.arraycopy(bucket.deliveries, bucket.readOffset, next.deliveries, 0, bucket.entries);
+
+ next.readOffset = 0;
+ next.entries += bucket.entries;
+ next.writeOffset = next.entries;
+ next.lowestDeliveryId = next.entryIdAt(0);
+ // We set this since the next bucket could be empty in some cases so it would
+ // need to be initialized.
+ next.highestDeliveryId = next.entryIdAt(next.writeOffset - 1);
+ }
+ }
+
+ // This variant is called from the Map API remove methods and doesn't need to track bucket
+ // compaction results or next element locations which increases performance in this case.
+ private boolean tryCompact(int bucketIndex) {
+ final UnsettledBucket<Delivery> bucket = buckets[bucketIndex];
+
+ final int nextBucketIndex = bucketIndex + 1;
+ final int prevBucketIndex = bucketIndex - 1;
+
+ if (bucket.isReadable()) {
+ final UnsettledBucket<Delivery> nextBucket =
+ bucketIndex == current || Integer.compareUnsigned(bucket.highestDeliveryId, buckets[nextBucketIndex].lowestDeliveryId) > 0 ?
+ ALWAYS_FULL_BUCKET : buckets[nextBucketIndex];
+ final UnsettledBucket<Delivery> prevBucket =
+ bucketIndex == 0 || Integer.compareUnsigned(bucket.lowestDeliveryId, buckets[prevBucketIndex].highestDeliveryId) < 0 ?
+ ALWAYS_FULL_BUCKET : buckets[prevBucketIndex];
+
+ // As soon as compaction is possible move elements from this bucket into previous and next
+ // which reduces search times as there are fewer buckets to traverse/
+ if (nextBucket.getFreeSpace() + prevBucket.getFreeSpace() >= bucket.entries) {
+ doCompaction(bucket, prevBucket, nextBucket);
+ return recycleBucket(bucketIndex);
+ }
+ } else {
+ return recycleBucket(bucketIndex);
+ }
+
+ return false;
+ }
+
+ //----- Internal bucket of delivery sequence
+
+ private static class UnsettledBucket<Delivery> {
+
+ private int readOffset;
+ private int writeOffset;
+ private int entries;
+ private int lowestDeliveryId = 0;
+ private int highestDeliveryId = 0;
+
+ private final Object[] deliveries;
+ private final UnsettledGetDeliveryId<Delivery> deliveryIdSupplier;
+
+ private UnsettledBucket() {
+ this.deliveryIdSupplier = null;
+ this.deliveries = new Object[0];
+ this.highestDeliveryId = UnsignedInteger.MAX_VALUE.intValue();
+ }
+
+ public UnsettledBucket(int bucketCapacity, UnsettledGetDeliveryId<Delivery> idSupplier) {
+ this.deliveryIdSupplier = idSupplier;
+ this.deliveries = new Object[bucketCapacity];
+ }
+
+ public boolean isReadable() {
+ return entries > 0;
+ }
+
+ public int getFreeSpace() {
+ return deliveries.length - entries;
+ }
+
+ public boolean isFull() {
+ return writeOffset == deliveries.length;
+ }
+
+ public boolean isInRange(int deliveryId) {
+ return Integer.compareUnsigned(deliveryId, lowestDeliveryId) >= 0 &&
+ Integer.compareUnsigned(deliveryId, highestDeliveryId) <= 0;
+ }
+
+ public boolean put(int deliveryId, Delivery delivery) {
+ // Reject an addition if full or if the delivery ID to be added is less that
+ // the highest id we have tracked as that likely indicates a roll-over of the
+ // IDs and must go into the next bucket so that this bucket is kept in order.
+ if (isFull() || (Integer.compareUnsigned(deliveryId, highestDeliveryId) <= 0 && entries > 0)) {
+ return false;
+ }
+
+ if (entries == 0) {
+ lowestDeliveryId = deliveryId;
+ }
+
+ highestDeliveryId = deliveryId;
+ deliveries[writeOffset++] = delivery;
+ entries++;
+
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Delivery get(int deliveryId) {
+ Delivery delivery;
+
+ // Be optimistic and assume the result is in the first entry and then if not search
+ // beyond that entry for the result.
+ if (deliveryIdSupplier.getDeliveryId(delivery = (Delivery) deliveries[readOffset]) == deliveryId) {
+ return delivery;
+ } else {
+ final int location = search(deliveryId, readOffset + 1, writeOffset);
+ if (location >= 0) {
+ return (Delivery) deliveries[location];
+ }
+ }
+
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Delivery entryAt(int index) {
+ return (Delivery) deliveries[index];
+ }
+
+ @SuppressWarnings("unchecked")
+ public int entryIdAt(int index) {
+ return deliveryIdSupplier.getDeliveryId((Delivery) deliveries[index]);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Delivery remove(int deliveryId) {
+ // Be optimistic and assume the result is in the first entry and then if not search
+ // beyond that entry for the result.
+ if (deliveryIdSupplier.getDeliveryId((Delivery) deliveries[readOffset]) == deliveryId) {
+ return removeAt(readOffset);
+ } else {
+ final int location = search(deliveryId, readOffset + 1, writeOffset);
+ if (location >= 0) {
+ return removeAt(location);
+ }
+ }
+
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Delivery removeAt(int bucketEntry) {
+ final Delivery delivery = (Delivery) deliveries[bucketEntry];
+
+ entries--;
+
+ // If not the readOffset we compact the entries to avoid null gaps in the entries
+ // which complicates searches and makes bulk assignments or copies impossible.
+ if (bucketEntry != readOffset) {
+ System.arraycopy(deliveries, readOffset, deliveries, readOffset + 1, bucketEntry - readOffset);
+ deliveries[readOffset++] = null;
+ // If we remove the last entry then we can reduce the highest delivery ID in this
+ // bucket to avoid false positive matches when randomly accessing elements unless
+ // unordered in which case there could be duplicate entries
+ if (bucketEntry == writeOffset) {
+ highestDeliveryId = deliveryIdSupplier.getDeliveryId((Delivery) deliveries[writeOffset - 1]);
+ }
+ } else {
+ deliveries[readOffset++] = null;
+ // We removed the first element meaning we now must increase the lowest entry to
+ // avoid false positives when accessing randomly unless unordered since there could
+ // be duplicates
+ if (entries > 0) {
+ lowestDeliveryId = deliveryIdSupplier.getDeliveryId((Delivery) deliveries[readOffset]);
+ }
+ }
+
+ return delivery;
+ }
+
+ public void clear() {
+ if (entries != 0) {
+ Arrays.fill(deliveries, null);
+ entries = 0;
+ }
+
+ // Ensures the first put always assigns this
+ lowestDeliveryId = UnsignedInteger.MAX_VALUE.intValue();
+ highestDeliveryId = 0;
+ writeOffset = 0;
+ readOffset = 0;
+ }
+
+ @Override
+ public String toString() {
+ return "UnsettledBucket { size=" + entries +
+ " roff=" + readOffset +
+ " woff=" + writeOffset +
+ " lowID=" + lowestDeliveryId +
+ " highID=" + highestDeliveryId + " }";
+ }
+
+ private static int BINARY_SEARCH_THRESHOLD = 64;
+
+ // Given the behavior of binary search it doesn't make sense to employ
+ // it on spans under a certain number if elements
+ private int search(int deliveryId, int fromIndex, int toIndex) {
+ if ((toIndex - fromIndex) < BINARY_SEARCH_THRESHOLD) {
+ return linearSearch(deliveryId, fromIndex, toIndex);
+ } else {
+ return binarySearch(deliveryId, fromIndex, toIndex);
+ }
+ }
+
+ // Must use our own to avoid boxing for unsigned integer comparison.
+ // fromIndex is inclusive since we search from readOffset normally
+ // toIndex is exclusive since we search to writeOffset normally.
+ @SuppressWarnings("unchecked")
+ private int binarySearch(int deliveryId, int fromIndex, int toIndex) {
+ int low = fromIndex;
+ int high = toIndex - 1;
+
+ while (low <= high) {
+ final int mid = (low + high) >>> 1;
+ final int midDeliveryId = deliveryIdSupplier.getDeliveryId((Delivery) deliveries[mid]);
+ final int cmp = UnsignedInteger.compare(midDeliveryId, deliveryId);
+
+ if (cmp < 0) {
+ low = mid + 1;
+ } else if (cmp > 0) {
+ high = mid - 1;
+ } else {
+ return mid; // first matching delivery
+ }
+ }
+
+ return -1; // signal that delivery ID is not in this bucket
+ }
+
+ // Must use our own to avoid boxing for unsigned integer comparison.
+ // fromIndex is inclusive since we search from readOffset normally
+ // toIndex is exclusive since we search to writeOffset normally.
+ @SuppressWarnings("unchecked")
+ private int linearSearch(int deliveryId, int fromIndex, int toIndex) {
+ for (int i = fromIndex; i < toIndex; ++i) {
+ final int idAtIndex = deliveryIdSupplier.getDeliveryId((Delivery) deliveries[i]);
+ final int comp = UnsignedInteger.compare(idAtIndex, deliveryId);
+
+ if (comp == 0) {
+ return i;
+ } else if (comp > 0) {
+ // Can't be in this bucket because we already found a larger value
+ break;
+ }
+ }
+
+ return -1;
+ }
+ }
+
+ //----- Internal cached values for the various collection type access objects
+
+ // Once requested we will create an store a single instance to a collection
+ // with no state for each of the key, values ,entries types. Since the types do
+ // not have state the trivial race on create is not important to the eventual
+ // outcome of having a cached instance.
+
+ protected Set<UnsignedInteger> keySet;
+ protected Collection<Delivery> values;
+ protected Set<Entry<UnsignedInteger, Delivery>> entrySet;
+
+ //----- Unsettled Tracking Map Collection types
+
+ private final class UnsettledTackingMapValues extends AbstractCollection<Delivery> {
+
+ @Override
+ public Iterator<Delivery> iterator() {
+ return new UnsettledTrackingMapValuesIterator(0);
+ }
+
+ @Override
+ public int size() {
+ return UnsettledMap.this.totalEntries;
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ return UnsettledMap.this.containsValue(o);
+ }
+
+ @Override
+ public boolean remove(Object target) {
+ @SuppressWarnings("unchecked")
+ final int targetId = UnsettledMap.this.deliveryIdSupplier.getDeliveryId((Delivery) target);
+
+ return UnsettledMap.this.remove(targetId) != null;
+ }
+
+ @Override
+ public void clear() {
+ UnsettledMap.this.clear();
+ }
+ }
+
+ private final class UnsettledTackingMapKeys extends AbstractSet<UnsignedInteger> {
+
+ @Override
+ public Iterator<UnsignedInteger> iterator() {
+ return new UnsettledTrackingMapKeysIterator(0);
+ }
+
+ @Override
+ public int size() {
+ return UnsettledMap.this.totalEntries;
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ return UnsettledMap.this.containsKey(o);
+ }
+
+ @Override
+ public boolean remove(Object target) {
+ return UnsettledMap.this.remove(Number.class.cast(target).intValue()) != null;
+ }
+
+ @Override
+ public void clear() {
+ UnsettledMap.this.clear();
+ }
+ }
+
+ private final class UnsettledTackingMapEntries extends AbstractSet<Map.Entry<UnsignedInteger, Delivery>> {
+
+ @Override
+ public Iterator<Map.Entry<UnsignedInteger, Delivery>> iterator() {
+ return new UnsettledTrackingMapEntryIterator(0);
+ }
+
+ @Override
+ public int size() {
+ return UnsettledMap.this.totalEntries;
+ }
+
+ @Override
+ public boolean contains(Object target) {
+ if (target instanceof Map.Entry) {
+ @SuppressWarnings("unchecked")
+ final Entry<? extends UnsignedInteger, ? extends Delivery> entry =
+ (Entry<? extends UnsignedInteger, ? extends Delivery>) target;
+ return UnsettledMap.this.containsKey(entry.getKey());
+ }
+
+ return false;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public boolean remove(Object target) {
+ if (target instanceof Map.Entry) {
+ final Entry<? extends UnsignedInteger, ? extends Delivery> entry =
+ (Entry<? extends UnsignedInteger, ? extends Delivery>) target;
+ return UnsettledMap.this.remove(entry.getKey()) != null;
+ }
+
+ return false;
+ }
+
+ @Override
+ public void clear() {
+ UnsettledMap.this.clear();
+ }
+ }
+
+ //----- Map Iterator implementation for EntrySet, KeySet and Values collections
+
+ // Base class iterator that can be used for the collections returned from the Map
+ private abstract class UnsettledTrackingMapIterator<T> implements Iterator<T> {
+
+ protected int currentBucket;
+ protected int readOffset;
+
+ protected T lastReturned;
+ protected int lastReturnedbucket;
+ protected int lastReturnedbucketIndex;
+
+ protected int expectedModCount;
+
+ public UnsettledTrackingMapIterator(int startAt) {
+ this.currentBucket = buckets[startAt].isReadable() ? startAt : -1;
+ this.readOffset = buckets[startAt].isReadable() ? buckets[startAt].readOffset : -1;
+ this.expectedModCount = UnsettledMap.this.modCount;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return readOffset >= 0;
+ }
+
+ @Override
+ public T next() {
+ if (readOffset == -1) {
+ throw new NoSuchElementException();
+ }
+ if (expectedModCount != UnsettledMap.this.modCount) {
+ throw new ConcurrentModificationException();
+ }
+
+ lastReturnedbucket = currentBucket;
+ lastReturnedbucketIndex = readOffset;
+ lastReturned = entryAt(currentBucket, readOffset);
+ successor();
+
+ return lastReturned;
+ }
+
+ protected abstract T entryAt(int bucketIndex, int bucketEntry);
+
+ @Override
+ public void remove() {
+ if (lastReturned == null) {
+ throw new IllegalStateException("Cannot remove entry when next has not been called");
+ }
+ if (modCount != expectedModCount) {
+ throw new ConcurrentModificationException();
+ }
+
+ long next = UnsettledMap.this.removeAt(lastReturnedbucket, lastReturnedbucketIndex);
+ if (next >= 0) {
+ currentBucket = (int) (next >> 32);
+ readOffset = (int) (next & 0xFFFFFFFF);
+ } else {
+ readOffset = -1;
+ }
+
+ expectedModCount = modCount;
+ lastReturned = null;
+ }
+
+ private void successor() {
+ if (++readOffset == buckets[currentBucket].writeOffset) {
+ currentBucket++;
+ if (currentBucket < buckets.length && buckets[currentBucket].isReadable()) {
+ readOffset = buckets[currentBucket].readOffset;
+ } else {
+ readOffset = -1;
+ }
+ }
+ }
+ }
+
+ private final class UnsettledTrackingMapValuesIterator extends UnsettledTrackingMapIterator<Delivery> {
+
+ public UnsettledTrackingMapValuesIterator(int startAt) {
+ super(startAt);
+ }
+
+ @Override
+ protected Delivery entryAt(int bucketIndex, int bucketEntry) {
+ return buckets[currentBucket].entryAt(bucketEntry);
+ }
+ }
+
+ private final class UnsettledTrackingMapKeysIterator extends UnsettledTrackingMapIterator<UnsignedInteger> {
+
+ public UnsettledTrackingMapKeysIterator(int startAt) {
+ super(startAt);
+ }
+
+ @Override
+ protected UnsignedInteger entryAt(int bucketIndex, int bucketEntry) {
+ return UnsignedInteger.valueOf(deliveryIdSupplier.getDeliveryId(buckets[currentBucket].entryAt(bucketEntry)));
+ }
+ }
+
+ private final class UnsettledTrackingMapEntryIterator extends UnsettledTrackingMapIterator<Entry<UnsignedInteger, Delivery>> {
+
+ public UnsettledTrackingMapEntryIterator(int startAt) {
+ super(startAt);
+ }
+
+ @Override
+ protected Entry<UnsignedInteger, Delivery> entryAt(int bucketIndex, int bucketEntry) {
+ final Delivery delivery = buckets[currentBucket].entryAt(bucketEntry);
+
+ return new ImmutableUnsettledTrackingkMapEntry<Delivery>(deliveryIdSupplier.getDeliveryId(delivery), delivery);
+ }
+ }
+
+ /**
+ * An immutable {@link Map} entry that can be used when exposing raw entry mappings
+ * via the {@link Map} API.
+ *
+ * @param <Delivery> Type of the value portion of this immutable entry.
+ */
+ public static class ImmutableUnsettledTrackingkMapEntry<Delivery> implements Map.Entry<UnsignedInteger, Delivery> {
+
+ private final int key;
+ private final Delivery value;
+
+ /**
+ * Create a new immutable {@link Map} entry.
+ *
+ * @param key
+ * The inner {@link Map} key that is wrapped.
+ * @param value
+ * The inner {@link Map} value that is wrapped.
+ */
+ public ImmutableUnsettledTrackingkMapEntry(int key, Delivery value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public UnsignedInteger getKey() {
+ return UnsignedInteger.valueOf(key);
+ }
+
+ /**
+ * @return the primitive integer view of the unsigned key.
+ */
+ public int getPrimitiveKey() {
+ return key;
+ }
+
+ @Override
+ public Delivery getValue() {
+ return value;
+ }
+
+ @Override
+ public Delivery setValue(Delivery value) {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/util/UnsettledMapTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/util/UnsettledMapTest.java
new file mode 100644
index 0000000..e6889b0
--- /dev/null
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/util/UnsettledMapTest.java
@@ -0,0 +1,1939 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.engine.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.protonj2.logging.ProtonLogger;
+import org.apache.qpid.protonj2.logging.ProtonLoggerFactory;
+import org.apache.qpid.protonj2.types.UnsignedInteger;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for the unsettled delivery tacker
+ */
+public class UnsettledMapTest {
+
+ protected static final ProtonLogger LOG = ProtonLoggerFactory.getLogger(SplayMapTest.class);
+
+ protected long seed;
+ protected Random random;
+ protected UnsignedInteger uintArray[] = new UnsignedInteger[1000];
+ protected DeliveryType objArray[] = new DeliveryType[1000];
+ protected UnsettledMap<DeliveryType> tracker;
+
+ @BeforeEach
+ public void setUp() {
+ seed = System.nanoTime();
+ random = new Random();
+ random.setSeed(seed);
+
+ tracker = new UnsettledMap<>(DeliveryType::getDeliveryId);
+
+ for (int i = 1; i <= objArray.length; i++) {
+ UnsignedInteger x = uintArray[i - 1] = UnsignedInteger.valueOf(i);
+ DeliveryType y = objArray[i - 1] = new DeliveryType(UnsignedInteger.valueOf(i).intValue());
+ tracker.put(x, y);
+ }
+ }
+
+ protected UnsettledMap<DeliveryType> createMap() {
+ return new UnsettledMap<>(DeliveryType::getDeliveryId);
+ }
+
+ protected UnsettledMap<DeliveryType> createMap(int numBuckets, int bucketSize) {
+ return new UnsettledMap<>(DeliveryType::getDeliveryId, numBuckets, bucketSize);
+ }
+
+ /**
+ * Simple delivery type used for this test
+ */
+ private class DeliveryType {
+
+ private final int deliveryId;
+
+ public DeliveryType(int deliveryid) {
+ this.deliveryId = deliveryid;
+ }
+
+ public int getDeliveryId() {
+ return deliveryId;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof DeliveryType) {
+ DeliveryType otherType = (DeliveryType) other;
+ return otherType.deliveryId == deliveryId;
+ }
+
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "DeliveryType: { " + deliveryId + " }";
+ }
+ }
+
+ @Test
+ public void testCreateUnsettledTracker() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+ assertEquals(0, tracker.size());
+ assertTrue(tracker.isEmpty());
+ }
+
+ @Test
+ public void testContainsKeyOnEmptyMap() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ assertFalse(tracker.containsKey(0));
+ assertFalse(tracker.containsKey(UnsignedInteger.ZERO));
+ }
+
+ @Test
+ public void testGetWhenEmpty() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ assertNull(tracker.get(0));
+ }
+
+ @Test
+ public void testGet() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(0, new DeliveryType(0));
+ tracker.put(1, new DeliveryType(1));
+ tracker.put(2, new DeliveryType(2));
+ tracker.put(65535, new DeliveryType(65535));
+ tracker.put(-1, new DeliveryType(-1));
+
+ assertEquals(new DeliveryType(0), tracker.get(0));
+ assertEquals(new DeliveryType(1), tracker.get(1));
+ assertEquals(new DeliveryType(2), tracker.get(2));
+ assertEquals(new DeliveryType(65535), tracker.get(65535));
+ assertEquals(new DeliveryType(-1), tracker.get(-1));
+
+ assertNull(tracker.get(3));
+
+ assertEquals(5, tracker.size());
+ }
+
+ @Test
+ public void testGetUnsignedInteger() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(0, new DeliveryType(0));
+ tracker.put(1, new DeliveryType(1));
+ tracker.put(-3, new DeliveryType(-3));
+
+ assertEquals(new DeliveryType(0), tracker.get(UnsignedInteger.valueOf(0)));
+ assertEquals(new DeliveryType(1), tracker.get(UnsignedInteger.valueOf(1)));
+ assertEquals(new DeliveryType(-3), tracker.get(UnsignedInteger.valueOf(-3)));
+
+ assertNull(tracker.get(3));
+
+ assertEquals(3, tracker.size());
+ }
+
+ @Test
+ public void testContainsKey() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(0, new DeliveryType(0));
+ tracker.put(1, new DeliveryType(1));
+ tracker.put(-3, new DeliveryType(-3));
+
+ assertTrue(tracker.containsKey(0));
+ assertFalse(tracker.containsKey(3));
+
+ assertEquals(3, tracker.size());
+ }
+
+ @Test
+ public void testContainsKeyUnsignedInteger() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(UnsignedInteger.MAX_VALUE.intValue(), new DeliveryType(UnsignedInteger.MAX_VALUE.intValue()));
+ tracker.put(0, new DeliveryType(0));
+ tracker.put(1, new DeliveryType(1));
+
+ assertTrue(tracker.containsKey(0));
+ assertFalse(tracker.containsKey(3));
+
+ assertEquals(3, tracker.size());
+ }
+
+ @Test
+ public void testContainsValue() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(0, new DeliveryType(0));
+ tracker.put(1, new DeliveryType(1));
+ tracker.put(-3, new DeliveryType(-3));
+
+ assertTrue(tracker.containsValue(new DeliveryType(0)));
+ assertFalse(tracker.containsValue(new DeliveryType(4)));
+
+ assertEquals(3, tracker.size());
+ }
+
+ @Test
+ public void testContainsValueOnEmptyMap() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ assertFalse(tracker.containsValue(new DeliveryType(0)));
+ }
+
+ @Test
+ public void testRemoveIsIdempotent() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(0, new DeliveryType(0));
+ tracker.put(1, new DeliveryType(1));
+ tracker.put(2, new DeliveryType(2));
+
+ assertEquals(3, tracker.size());
+
+ assertEquals(new DeliveryType(0), tracker.remove(0));
+ assertEquals(null, tracker.remove(0));
+
+ assertEquals(2, tracker.size());
+
+ assertEquals(new DeliveryType(1), tracker.remove(1));
+ assertEquals(null, tracker.remove(1));
+
+ assertEquals(1, tracker.size());
+
+ assertEquals(new DeliveryType(2), tracker.remove(2));
+ assertEquals(null, tracker.remove(2));
+
+ assertEquals(0, tracker.size());
+ }
+
+ @Test
+ public void testRemoveValueNotInMap() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(0, new DeliveryType(0));
+ tracker.put(1, new DeliveryType(1));
+ tracker.put(9, new DeliveryType(9));
+ tracker.put(7, new DeliveryType(7));
+ tracker.put(-1, new DeliveryType(-1));
+
+ assertNull(tracker.remove(5));
+ }
+
+ @Test
+ public void testRemoveFirstEntryTwice() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(0, new DeliveryType(0));
+ tracker.put(16, new DeliveryType(16));
+
+ assertNotNull(tracker.remove(0));
+ assertNull(tracker.remove(0));
+ }
+
+ @SuppressWarnings("unlikely-arg-type")
+ @Test
+ public void testRemoveWithInvalidType() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(0, new DeliveryType(0));
+
+ try {
+ tracker.remove("foo");
+ fail("Should not accept incompatible types");
+ } catch (ClassCastException ccex) {}
+ }
+
+ @Test
+ public void testRemoveUnsignedInteger() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(0, new DeliveryType(0));
+ tracker.put(1, new DeliveryType(1));
+ tracker.put(UnsignedInteger.valueOf(9), new DeliveryType(9));
+ tracker.put(7, new DeliveryType(7));
+ tracker.put(UnsignedInteger.valueOf(-1), new DeliveryType(-1));
+
+ assertEquals(5, tracker.size());
+ assertNull(tracker.remove(UnsignedInteger.valueOf(5)));
+ assertEquals(5, tracker.size());
+ assertEquals(new DeliveryType(9), tracker.remove(UnsignedInteger.valueOf(9)));
+ assertEquals(4, tracker.size());
+ }
+
+ @SuppressWarnings("unlikely-arg-type")
+ @Test
+ public void testRemoveInteger() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(0, new DeliveryType(0));
+ tracker.put(1, new DeliveryType(1));
+ tracker.put(UnsignedInteger.valueOf(9), new DeliveryType(9));
+ tracker.put(7, new DeliveryType(7));
+ tracker.put(UnsignedInteger.valueOf(-1), new DeliveryType(-1));
+
+ assertEquals(5, tracker.size());
+ assertNull(tracker.remove(Integer.valueOf(5)));
+ assertEquals(5, tracker.size());
+ assertEquals(new DeliveryType(9), tracker.remove(Integer.valueOf(9)));
+ assertEquals(4, tracker.size());
+ }
+
+ @Test
+ public void testRemoveEntriesFromMiddleBucket() {
+ // Start with three buckets of size two
+ UnsettledMap<DeliveryType> tracker = createMap(3, 2);
+
+ tracker.put(1, new DeliveryType(1));
+ tracker.put(2, new DeliveryType(2));
+ tracker.put(3, new DeliveryType(3));
+ tracker.put(4, new DeliveryType(4));
+ tracker.put(5, new DeliveryType(5));
+ tracker.put(6, new DeliveryType(6));
+
+ assertEquals(6, tracker.size());
+
+ tracker.remove(3);
+ tracker.remove(4);
+
+ assertEquals(4, tracker.size());
+
+ assertTrue(tracker.containsKey(1));
+ assertTrue(tracker.containsKey(2));
+ assertTrue(tracker.containsKey(5));
+ assertTrue(tracker.containsKey(6));
+
+ assertFalse(tracker.containsKey(3));
+ assertFalse(tracker.containsKey(4));
+
+ tracker.put(7, new DeliveryType(7));
+ tracker.put(8, new DeliveryType(8));
+
+ assertEquals(6, tracker.size());
+ }
+
+ @Test
+ public void testInsert() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(0, new DeliveryType(0));
+ tracker.put(1, new DeliveryType(1));
+ tracker.put(2, new DeliveryType(2));
+ tracker.put(3, new DeliveryType(3));
+ tracker.put(5, new DeliveryType(5));
+ tracker.put(9, new DeliveryType(9));
+ tracker.put(7, new DeliveryType(7));
+ tracker.put(-1, new DeliveryType(-1));
+
+ assertEquals(8, tracker.size());
+ }
+
+ @Test
+ public void testInsertUnsignedInteger() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(UnsignedInteger.valueOf(0), new DeliveryType(0));
+ tracker.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+ tracker.put(UnsignedInteger.valueOf(2), new DeliveryType(2));
+ tracker.put(UnsignedInteger.valueOf(3), new DeliveryType(3));
+ tracker.put(UnsignedInteger.valueOf(5), new DeliveryType(5));
+ tracker.put(UnsignedInteger.valueOf(9), new DeliveryType(9));
+ tracker.put(UnsignedInteger.valueOf(7), new DeliveryType(7));
+ tracker.put(UnsignedInteger.valueOf(-1), new DeliveryType(-1));
+
+ assertEquals(8, tracker.size());
+ }
+
+ @Test
+ public void testPutAll() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ Map<UnsignedInteger, DeliveryType> hashmap = new TreeMap<>();
+
+ hashmap.put(UnsignedInteger.valueOf(0), new DeliveryType(0));
+ hashmap.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+ hashmap.put(UnsignedInteger.valueOf(2), new DeliveryType(2));
+ hashmap.put(UnsignedInteger.valueOf(3), new DeliveryType(3));
+ hashmap.put(UnsignedInteger.valueOf(5), new DeliveryType(5));
+ hashmap.put(UnsignedInteger.valueOf(9), new DeliveryType(9));
+ hashmap.put(UnsignedInteger.valueOf(7), new DeliveryType(7));
+ hashmap.put(UnsignedInteger.valueOf(-1), new DeliveryType(-1));
+
+ tracker.putAll(hashmap);
+
+ assertEquals(8, tracker.size());
+
+ assertEquals(new DeliveryType(0), tracker.get(0));
+ assertEquals(new DeliveryType(1), tracker.get(1));
+ assertEquals(new DeliveryType(2), tracker.get(2));
+ assertEquals(new DeliveryType(3), tracker.get(3));
+ assertEquals(new DeliveryType(5), tracker.get(5));
+ assertEquals(new DeliveryType(9), tracker.get(9));
+ assertEquals(new DeliveryType(7), tracker.get(7));
+ assertEquals(new DeliveryType(-1), tracker.get(-1));
+ }
+
+ @Test
+ public void testPutIfAbsent() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(UnsignedInteger.valueOf(0), new DeliveryType(0));
+ tracker.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+ tracker.put(UnsignedInteger.valueOf(2), new DeliveryType(2));
+ tracker.put(UnsignedInteger.valueOf(3), new DeliveryType(3));
+ tracker.put(UnsignedInteger.valueOf(5), new DeliveryType(5));
+ tracker.put(UnsignedInteger.valueOf(7), new DeliveryType(7));
+ tracker.put(UnsignedInteger.valueOf(9), new DeliveryType(9));
+ tracker.put(UnsignedInteger.valueOf(-1), new DeliveryType(-1));
+
+ assertEquals(8, tracker.size());
+
+ assertEquals(new DeliveryType(0), tracker.get(0));
+ assertEquals(new DeliveryType(1), tracker.get(1));
+ assertEquals(new DeliveryType(2), tracker.get(2));
+ assertEquals(new DeliveryType(3), tracker.get(3));
+ assertEquals(new DeliveryType(5), tracker.get(5));
+ assertEquals(new DeliveryType(7), tracker.get(7));
+ assertEquals(new DeliveryType(9), tracker.get(9));
+ assertEquals(new DeliveryType(-1), tracker.get(-1));
+
+ assertNotNull(tracker.putIfAbsent(UnsignedInteger.valueOf(0), new DeliveryType(0)));
+ assertNotNull(tracker.putIfAbsent(UnsignedInteger.valueOf(1), new DeliveryType(1)));
+ assertNotNull(tracker.putIfAbsent(UnsignedInteger.valueOf(2), new DeliveryType(2)));
+ assertNotNull(tracker.putIfAbsent(UnsignedInteger.valueOf(3), new DeliveryType(3)));
+ assertNotNull(tracker.putIfAbsent(UnsignedInteger.valueOf(5), new DeliveryType(5)));
+ assertNotNull(tracker.putIfAbsent(UnsignedInteger.valueOf(7), new DeliveryType(7)));
+ assertNotNull(tracker.putIfAbsent(UnsignedInteger.valueOf(9), new DeliveryType(9)));
+ assertNotNull(tracker.putIfAbsent(UnsignedInteger.valueOf(-1), new DeliveryType(-1)));
+
+ assertEquals(8, tracker.size());
+
+ assertEquals(new DeliveryType(0), tracker.get(0));
+ assertEquals(new DeliveryType(1), tracker.get(1));
+ assertEquals(new DeliveryType(2), tracker.get(2));
+ assertEquals(new DeliveryType(3), tracker.get(3));
+ assertEquals(new DeliveryType(5), tracker.get(5));
+ assertEquals(new DeliveryType(7), tracker.get(7));
+ assertEquals(new DeliveryType(9), tracker.get(9));
+ assertEquals(new DeliveryType(-1), tracker.get(-1));
+ }
+
+ @Test
+ public void testAddedDeliveriesUpdatesSizeValue() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+ assertEquals(0, tracker.size());
+
+ DeliveryType delivery1 = new DeliveryType(0);
+ DeliveryType delivery2 = new DeliveryType(1);
+
+ tracker.put(delivery1.getDeliveryId(), delivery1);
+ assertEquals(1, tracker.size());
+
+ tracker.put(delivery2.getDeliveryId(), delivery2);
+ assertEquals(2, tracker.size());
+ }
+
+ @Test
+ public void testAddThenRemoveDelivery() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+ assertEquals(0, tracker.size());
+
+ DeliveryType delivery1 = new DeliveryType(127);
+ DeliveryType delivery2 = new DeliveryType(32);
+
+ tracker.put(delivery1.getDeliveryId(), delivery1);
+ assertEquals(1, tracker.size());
+ tracker.remove(delivery1.getDeliveryId());
+ assertEquals(0, tracker.size());
+
+ tracker.put(delivery2.getDeliveryId(), delivery2);
+ assertEquals(1, tracker.size());
+ tracker.remove(delivery2.getDeliveryId());
+ assertEquals(0, tracker.size());
+ }
+
+ @Test
+ public void testAddThenRemoveMultipleDeliveriesInSequence() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+ assertEquals(0, tracker.size());
+
+ DeliveryType delivery1 = new DeliveryType(Integer.MAX_VALUE);
+ DeliveryType delivery2 = new DeliveryType(-1);
+
+ tracker.put(delivery1.getDeliveryId(), delivery1);
+ tracker.put(delivery2.getDeliveryId(), delivery2);
+
+ assertEquals(2, tracker.size());
+ assertNotNull(tracker.remove(delivery1.getDeliveryId()));
+ assertEquals(1, tracker.size());
+ assertNotNull(tracker.remove(delivery2.getDeliveryId()));
+ assertEquals(0, tracker.size());
+ }
+
+ @Test
+ public void testAddThenClearMultipleDeliveriesAddedInSequence() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+ assertEquals(0, tracker.size());
+
+ DeliveryType delivery1 = new DeliveryType(0);
+ DeliveryType delivery2 = new DeliveryType(1);
+ DeliveryType delivery3 = new DeliveryType(2);
+ DeliveryType delivery4 = new DeliveryType(3);
+
+ tracker.put(delivery1.getDeliveryId(), delivery1);
+ tracker.put(delivery2.getDeliveryId(), delivery2);
+ tracker.put(delivery3.getDeliveryId(), delivery3);
+ tracker.put(delivery4.getDeliveryId(), delivery4);
+
+ assertEquals(4, tracker.size());
+ tracker.clear();
+ assertEquals(0, tracker.size());
+ }
+
+ @Test
+ public void testGetOneDeliveryInBetweenOthersThatWereAdded() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+ assertEquals(0, tracker.size());
+
+ DeliveryType delivery1 = new DeliveryType(0);
+ DeliveryType delivery2 = new DeliveryType(1);
+ DeliveryType delivery3 = new DeliveryType(2);
+ DeliveryType delivery4 = new DeliveryType(3);
+
+ tracker.put(delivery1.getDeliveryId(), delivery1);
+ tracker.put(delivery2.getDeliveryId(), delivery2);
+ tracker.put(delivery3.getDeliveryId(), delivery3);
+ tracker.put(delivery4.getDeliveryId(), delivery4);
+
+ assertEquals(4, tracker.size());
+ assertEquals(delivery3, tracker.get(delivery3.getDeliveryId()));
+ assertEquals(4, tracker.size());
+ }
+
+ @Test
+ public void testAddLargeSeriesOfDeliveriesAndThenEnumerateOverThemWithGet() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+ assertEquals(0, tracker.size());
+
+ final int COUNT = 4080;
+
+ for (int i = 0; i < COUNT; ++i) {
+ tracker.put(i, new DeliveryType(i));
+ }
+
+ assertEquals(COUNT, tracker.size());
+
+ for (int i = 0; i < COUNT; ++i) {
+ assertEquals(i, tracker.get(i).getDeliveryId());
+ }
+ }
+
+ @Test
+ public void testAddLargeSeriesOfDeliveriesAndThenIterateOverThemWithValues() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+ assertEquals(0, tracker.size());
+
+ final int COUNT = 4080;
+
+ for (int i = 0; i < COUNT; ++i) {
+ tracker.put(i, new DeliveryType(i));
+ }
+
+ assertEquals(COUNT, tracker.size());
+
+ Collection<DeliveryType> values = tracker.values();
+
+ int index = 0;
+
+ for (DeliveryType delivery : values) {
+ assertEquals(index++, delivery.getDeliveryId());
+ }
+
+ assertEquals(index, COUNT);
+ }
+
+ @Test
+ public void testRemoveAllViaIteration() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+ assertEquals(0, tracker.size());
+
+ final int COUNT = 16;
+
+ for (int i = 0; i < COUNT; ++i) {
+ tracker.put(i, new DeliveryType(i));
+ }
+
+ assertEquals(COUNT, tracker.size());
+
+ Collection<DeliveryType> values = tracker.values();
+ assertEquals(COUNT, values.size());
+ Iterator<DeliveryType> iter = values.iterator();
+
+ int index = 0;
+
+ while (iter.hasNext()) {
+ assertEquals(index++, iter.next().getDeliveryId());
+ iter.remove();
+ }
+
+ assertEquals(index, COUNT);
+ assertEquals(0, tracker.size());
+ }
+
+ @Test
+ public void testAddLargeSeriesOfDeliveriesAndThenRemoveAllViaIteration() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+ assertEquals(0, tracker.size());
+
+ final int COUNT = 4080;
+
+ for (int i = 0; i < COUNT; ++i) {
+ tracker.put(i, new DeliveryType(i));
+ }
+
+ assertEquals(COUNT, tracker.size());
+
+ Collection<DeliveryType> values = tracker.values();
+ assertEquals(COUNT, values.size());
+ Iterator<DeliveryType> iter = values.iterator();
+
+ int index = 0;
+
+ while (iter.hasNext()) {
+ assertEquals(index++, iter.next().getDeliveryId());
+ iter.remove();
+ }
+
+ assertEquals(index, COUNT);
+ assertEquals(0, tracker.size());
+ }
+
+ @Test
+ public void testIteratorRemoveInChunks() {
+ UnsettledMap<DeliveryType> tracker = createMap(3, 6);
+ assertEquals(0, tracker.size());
+
+ final int COUNT = 18;
+
+ for (int i = 0; i < COUNT; ++i) {
+ tracker.put(i, new DeliveryType(i));
+ }
+
+ assertEquals(COUNT, tracker.size());
+
+ Collection<DeliveryType> values = tracker.values();
+ assertEquals(COUNT, values.size());
+ Iterator<DeliveryType> iter = values.iterator();
+
+ int index = 0;
+ int count = 0;
+
+ while (iter.hasNext()) {
+ assertEquals(index++, iter.next().getDeliveryId());
+
+ if (count++ < COUNT / 6) {
+ iter.remove();
+ }
+
+ if (count == 6) {
+ count = 0;
+ }
+ }
+
+ assertEquals(index, COUNT);
+ assertEquals(COUNT / 2, tracker.size());
+ }
+
+ @Test
+ public void testRemoveUsingIteratorFromFullMiddleBucket() {
+ UnsettledMap<DeliveryType> tracker = createMap(3, 6);
+ assertEquals(0, tracker.size());
+
+ final int COUNT = 18;
+
+ for (int i = 0; i < COUNT; ++i) {
+ tracker.put(i, new DeliveryType(i));
+ }
+
+ assertEquals(COUNT, tracker.size());
+
+ // Remove enough from front and back buckets that
+ // a drain of the middle should compress the chain
+
+ // Front
+ tracker.remove(0);
+ tracker.remove(1);
+ tracker.remove(2);
+ // Back
+ tracker.remove(15);
+ tracker.remove(16);
+ tracker.remove(17);
+
+ Collection<DeliveryType> values = tracker.values();
+ Iterator<DeliveryType> iter = values.iterator();
+
+ // Skip elements from first section
+ iter.next();
+ iter.next();
+ iter.next();
+
+ for (int i = 6; i < 12; ++i) {
+ assertEquals(i, iter.next().getDeliveryId());
+ iter.remove();
+ }
+
+ assertEquals(6, tracker.size());
+ }
+
+ @Test
+ public void testForEachDeliveryIteratesOverLargeSeriesOfDeliveries() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+ assertEquals(0, tracker.size());
+
+ final int COUNT = 4080;
+
+ for (int i = 0; i < COUNT; ++i) {
+ tracker.put(i, new DeliveryType(i));
+ }
+
+ assertEquals(COUNT, tracker.size());
+
+ final AtomicInteger index = new AtomicInteger();
+
+ tracker.forEach((delivery) -> index.incrementAndGet());
+
+ assertEquals(index.get(), COUNT);
+ }
+
+ @Test
+ public void testRangedForEachDeliveryIteratesOverSmallSeriesOfDeliveries() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+ assertEquals(0, tracker.size());
+
+ final int COUNT = 512;
+
+ for (int i = 0; i < COUNT; ++i) {
+ tracker.put(i, new DeliveryType(i));
+ }
+
+ assertEquals(COUNT, tracker.size());
+
+ final AtomicInteger index = new AtomicInteger();
+
+ tracker.forEach(260, 262, (delivery) -> index.incrementAndGet());
+
+ assertEquals(index.get(), 3);
+ }
+
+ @Test
+ public void testRangedForEachDeliveryIteratesSeriesWhenValuesOverflowIntRange() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+ assertEquals(0, tracker.size());
+
+ tracker.put(0, new DeliveryType(0));
+ tracker.put(1, new DeliveryType(1));
+ tracker.put(Integer.MAX_VALUE, new DeliveryType(Integer.MAX_VALUE));
+ tracker.put(Integer.MAX_VALUE + 1, new DeliveryType(Integer.MAX_VALUE + 1));
+ tracker.put(Integer.MAX_VALUE + 2, new DeliveryType(Integer.MAX_VALUE + 2));
+ tracker.put(Integer.MAX_VALUE + 3, new DeliveryType(Integer.MAX_VALUE + 3));
+ tracker.put(Integer.MAX_VALUE + 4, new DeliveryType(Integer.MAX_VALUE + 4));
+
+ final AtomicInteger index = new AtomicInteger();
+
+ tracker.forEach(Integer.MAX_VALUE, Integer.MAX_VALUE + 2, (delivery) -> index.incrementAndGet());
+
+ assertEquals(index.get(), 3);
+ }
+
+ @Test
+ public void testValuesCollection() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(0, new DeliveryType(0));
+ tracker.put(1, new DeliveryType(1));
+ tracker.put(2, new DeliveryType(2));
+ tracker.put(3, new DeliveryType(3));
+
+ Collection<DeliveryType> values = tracker.values();
+ assertNotNull(values);
+ assertEquals(4, values.size());
+ assertFalse(values.isEmpty());
+ assertSame(values, tracker.values());
+ }
+
+ @Test
+ public void testValuesIteration() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int[] intValues = {0, 1, 2, 3};
+
+ for (int entry : intValues) {
+ tracker.put(entry, new DeliveryType(entry));
+ }
+
+ Collection<DeliveryType> values = tracker.values();
+ Iterator<DeliveryType> iterator = values.iterator();
+ assertNotNull(iterator);
+ assertTrue(iterator.hasNext());
+
+ int counter = 0;
+ while (iterator.hasNext()) {
+ assertEquals(new DeliveryType(intValues[counter++]), iterator.next());
+ }
+
+ // Check that we really did iterate.
+ assertEquals(intValues.length, counter);
+ }
+
+ @Test
+ public void testValuesIterationRemove() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int[] intValues = {0, 1, 2, 3};
+
+ for (int entry : intValues) {
+ tracker.put(entry, new DeliveryType(entry));
+ }
+
+ Collection<DeliveryType> values = tracker.values();
+ Iterator<DeliveryType> iterator = values.iterator();
+ assertNotNull(iterator);
+ assertTrue(iterator.hasNext());
+
+ int counter = 0;
+ while (iterator.hasNext()) {
+ assertEquals(new DeliveryType(intValues[counter++]), iterator.next());
+ iterator.remove();
+ }
+
+ // Check that we really did iterate.
+ assertEquals(intValues.length, counter);
+ assertTrue(tracker.isEmpty());
+ assertEquals(0, tracker.size());
+ }
+
+ @Test
+ public void testValuesIterationFollowUnsignedOrderingExpectations() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int[] inputValues = {3, 0, -1, 1, -2, 2};
+
+ for (int entry : inputValues) {
+ tracker.put(entry, new DeliveryType(entry));
+ }
+
+ Collection<DeliveryType> values = tracker.values();
+ Iterator<DeliveryType> iterator = values.iterator();
+ assertNotNull(iterator);
+ assertTrue(iterator.hasNext());
+
+ int counter = 0;
+ while (iterator.hasNext()) {
+ assertEquals(new DeliveryType(inputValues[counter++]), iterator.next());
+ }
+
+ // Check that we really did iterate.
+ assertEquals(inputValues.length, counter);
+ }
+
+ @Test
+ public void testValuesIterationFailsWhenConcurrentlyModified() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int[] inputValues = {1, 2, 3, 5, 7, 9, 11};
+
+ for (int entry : inputValues) {
+ tracker.put(entry, new DeliveryType(entry));
+ }
+
+ Collection<DeliveryType> values = tracker.values();
+ Iterator<DeliveryType> iterator = values.iterator();
+ assertNotNull(iterator);
+ assertTrue(iterator.hasNext());
+
+ tracker.remove(3);
+
+ try {
+ iterator.next();
+ fail("Should not iterate when modified outside of iterator");
+ } catch (ConcurrentModificationException cme) {}
+ }
+
+ @Test
+ public void testValuesIterationOnEmptyTree() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+ Collection<DeliveryType> values = tracker.values();
+ Iterator<DeliveryType> iterator = values.iterator();
+
+ assertFalse(iterator.hasNext());
+ try {
+ iterator.next();
+ fail("Should have thrown a NoSuchElementException");
+ } catch (NoSuchElementException nse) {
+ }
+ }
+
+ @Test
+ public void testKeySetReturned() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(0, new DeliveryType(0));
+ tracker.put(1, new DeliveryType(1));
+ tracker.put(2, new DeliveryType(2));
+ tracker.put(3, new DeliveryType(3));
+
+ Set<UnsignedInteger> keys = tracker.keySet();
+ assertNotNull(keys);
+ assertEquals(4, keys.size());
+ assertFalse(keys.isEmpty());
+ assertSame(keys, tracker.keySet());
+ }
+
+ @Test
+ public void testKeysIterationRemove() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int[] intValues = {0, 1, 2, 3};
+
+ for (int entry : intValues) {
+ tracker.put(entry, new DeliveryType(entry));
+ }
+
+ Collection<UnsignedInteger> keys = tracker.keySet();
+ Iterator<UnsignedInteger> iterator = keys.iterator();
+ assertNotNull(iterator);
+ assertTrue(iterator.hasNext());
+
+ int counter = 0;
+ while (iterator.hasNext()) {
+ assertEquals(UnsignedInteger.valueOf(intValues[counter++]), iterator.next());
+ }
+
+ // Check that we really did iterate.
+ assertEquals(intValues.length, counter);
+ }
+
+ @Test
+ public void testKeysIterationRemoveContract() {
+ Set<UnsignedInteger> set = tracker.keySet();
+ Iterator<UnsignedInteger> iter = set.iterator();
+ iter.next();
+ iter.remove();
+
+ // No remove allowed again until next is called
+ assertThrows(IllegalStateException.class, () -> iter.remove());
+
+ iter.next();
+ iter.remove();
+
+ assertEquals(998, tracker.size());
+
+ iter.next();
+ assertNotNull(tracker.remove(999));
+
+ assertThrows(ConcurrentModificationException.class, () -> iter.remove());
+ }
+
+ @Test
+ public void testKeysIteration() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int[] intValues = {0, 1, 2, 3};
+
+ for (int entry : intValues) {
+ tracker.put(entry, new DeliveryType(entry));
+ }
+
+ Collection<UnsignedInteger> keys = tracker.keySet();
+ Iterator<UnsignedInteger> iterator = keys.iterator();
+ assertNotNull(iterator);
+ assertTrue(iterator.hasNext());
+
+ int counter = 0;
+ while (iterator.hasNext()) {
+ assertEquals(UnsignedInteger.valueOf(intValues[counter++]), iterator.next());
+ iterator.remove();
+ }
+
+ // Check that we really did iterate.
+ assertEquals(intValues.length, counter);
+ assertTrue(tracker.isEmpty());
+ assertEquals(0, tracker.size());
+ }
+
+ @Test
+ public void testKeysIterationFollowsUnsignedOrderingExpectations() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int[] inputValues = {3, 0, -1, 1, -2, 2};
+
+ for (int entry : inputValues) {
+ tracker.put(entry, new DeliveryType(entry));
+ }
+
+ Collection<UnsignedInteger> keys = tracker.keySet();
+ Iterator<UnsignedInteger> iterator = keys.iterator();
+ assertNotNull(iterator);
+ assertTrue(iterator.hasNext());
+
+ int counter = 0;
+ while (iterator.hasNext()) {
+ assertEquals(UnsignedInteger.valueOf(inputValues[counter++]), iterator.next());
+ }
+
+ // Check that we really did iterate.
+ assertEquals(inputValues.length, counter);
+ }
+
+ @Test
+ public void testKeysIterationFailsWhenConcurrentlyModified() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int[] inputValues = {1, 3, 5, 7, 9, 11, 13};
+
+ for (int entry : inputValues) {
+ tracker.put(entry, new DeliveryType(entry));
+ }
+
+ Collection<UnsignedInteger> keys = tracker.keySet();
+ Iterator<UnsignedInteger> iterator = keys.iterator();
+ assertNotNull(iterator);
+ assertTrue(iterator.hasNext());
+
+ tracker.remove(3);
+
+ try {
+ iterator.next();
+ fail("Should not iterate when modified outside of iterator");
+ } catch (ConcurrentModificationException cme) {}
+ }
+
+ @Test
+ public void testKeysIterationOnEmptyTree() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+ Collection<UnsignedInteger> keys = tracker.keySet();
+ Iterator<UnsignedInteger> iterator = keys.iterator();
+
+ assertFalse(iterator.hasNext());
+ try {
+ iterator.next();
+ fail("Should have thrown a NoSuchElementException");
+ } catch (NoSuchElementException nse) {
+ }
+ }
+
+ @Test
+ public void testKeySetRemoveAllFromCollection() {
+ final Collection<UnsignedInteger> collection = Arrays.asList(uintArray);
+
+ assertTrue(tracker.keySet().removeAll(collection));
+ assertEquals(0, tracker.size());
+ assertFalse(tracker.keySet().iterator().hasNext());
+
+ // Second attempt should do nothing.
+ assertFalse(tracker.keySet().removeAll(collection));
+ }
+
+ @Test
+ public void testKeySetRetainAllFromCollectionAtZero() {
+ doTestKeySetRetainAllFromCollection(0);
+ }
+
+ @Test
+ public void testKeySetRetainAllFromCollectionAtOne() {
+ doTestKeySetRetainAllFromCollection(1);
+ }
+
+ @Test
+ public void testKeySetRetainAllFromCollectionAtTwoHundered() {
+ doTestKeySetRetainAllFromCollection(200);
+ }
+
+ @Test
+ public void testKeySetRetainAllFromCollectionAtFiveHundred() {
+ doTestKeySetRetainAllFromCollection(500);
+ }
+
+ private void doTestKeySetRetainAllFromCollection(int index) {
+ final Collection<UnsignedInteger> collection = new ArrayList<>();
+ collection.add(uintArray[index]);
+
+ assertEquals(1000, tracker.size());
+
+ final Set<UnsignedInteger> keys = tracker.keySet();
+
+ keys.retainAll(collection);
+ assertEquals(1, tracker.size());
+ keys.removeAll(collection);
+ assertEquals(0, tracker.size());
+ tracker.put(1, new DeliveryType(1));
+ assertEquals(1, tracker.size());
+ keys.clear();
+ assertEquals(0, tracker.size());
+ }
+
+ @Test
+ public void TestKeySetRetainAllFromCollectionWhenMapHasCustomBucketsAndRetainedIsInFirstBucket() {
+ // Start with three buckets of size three
+ UnsettledMap<DeliveryType> tracker = createMap(3, 3);
+
+ tracker.put(1, new DeliveryType(1)); // First
+ tracker.put(2, new DeliveryType(2));
+ tracker.put(3, new DeliveryType(3));
+ tracker.put(4, new DeliveryType(4)); // Second
+ tracker.put(5, new DeliveryType(5));
+ tracker.put(6, new DeliveryType(6));
+ tracker.put(7, new DeliveryType(7)); // Third
+ tracker.put(8, new DeliveryType(8));
+ tracker.put(9, new DeliveryType(9));
+
+ assertEquals(9, tracker.size());
+
+ final Collection<UnsignedInteger> collection = new ArrayList<>();
+ collection.add(UnsignedInteger.valueOf(1)); // Retain element from bucket one
+
+ final Set<UnsignedInteger> keys = tracker.keySet();
+
+ keys.retainAll(collection);
+ assertEquals(1, tracker.size());
+ keys.removeAll(collection);
+ assertEquals(0, tracker.size());
+ tracker.put(1, new DeliveryType(1));
+ assertEquals(1, tracker.size());
+ keys.clear();
+ assertEquals(0, tracker.size());
+ }
+
+ @Test
+ public void TestKeySetRetainAllFromCollectionWhenMapHasCustomBucketsAndRetainedIsInLastBucket() {
+ // Start with three buckets of size three
+ UnsettledMap<DeliveryType> tracker = createMap(3, 3);
+
+ tracker.put(1, new DeliveryType(1)); // First
+ tracker.put(2, new DeliveryType(2));
+ tracker.put(3, new DeliveryType(3));
+ tracker.put(4, new DeliveryType(4)); // Second
+ tracker.put(5, new DeliveryType(5));
+ tracker.put(6, new DeliveryType(6));
+ tracker.put(7, new DeliveryType(7)); // Third
+ tracker.put(8, new DeliveryType(8));
+ tracker.put(9, new DeliveryType(9));
+
+ assertEquals(9, tracker.size());
+
+ final Collection<UnsignedInteger> collection = new ArrayList<>();
+ collection.add(UnsignedInteger.valueOf(7)); // Retain element from bucket three
+
+ final Set<UnsignedInteger> keys = tracker.keySet();
+
+ keys.retainAll(collection);
+ assertEquals(1, tracker.size());
+ keys.removeAll(collection);
+ assertEquals(0, tracker.size());
+ tracker.put(1, new DeliveryType(1));
+ assertEquals(1, tracker.size());
+ keys.clear();
+ assertEquals(0, tracker.size());
+ }
+
+ @Test
+ public void tesEntrySetReturned() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(0, new DeliveryType(0));
+ tracker.put(1, new DeliveryType(1));
+ tracker.put(2, new DeliveryType(2));
+ tracker.put(3, new DeliveryType(3));
+
+ Set<Entry<UnsignedInteger, DeliveryType>> entries= tracker.entrySet();
+ assertNotNull(entries);
+ assertEquals(4, entries.size());
+ assertFalse(entries.isEmpty());
+ assertSame(entries, tracker.entrySet());
+ }
+
+ @Test
+ public void tesEntrySetContains() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ tracker.put(0, new DeliveryType(0));
+ tracker.put(1, new DeliveryType(1));
+ tracker.put(2, new DeliveryType(2));
+ tracker.put(3, new DeliveryType(3));
+
+ Set<Entry<UnsignedInteger, DeliveryType>> entries = tracker.entrySet();
+ assertNotNull(entries);
+ assertEquals(4, entries.size());
+ assertFalse(entries.isEmpty());
+ assertSame(entries, tracker.entrySet());
+
+ OutsideEntry<UnsignedInteger, DeliveryType> entry1 = new OutsideEntry<>(UnsignedInteger.valueOf(0), new DeliveryType(0));
+ OutsideEntry<UnsignedInteger, DeliveryType> entry2 = new OutsideEntry<>(UnsignedInteger.valueOf(7), new DeliveryType(7));
+
+ assertTrue(entries.contains(entry1));
+ assertFalse(entries.contains(entry2));
+ }
+
+ @Test
+ public void testEntryIteration() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int[] intValues = {0, 1, 2, 3};
+
+ for (int entry : intValues) {
+ tracker.put(entry, new DeliveryType(entry));
+ }
+
+ Set<Entry<UnsignedInteger, DeliveryType>> entries= tracker.entrySet();
+ Iterator<Entry<UnsignedInteger, DeliveryType>> iterator = entries.iterator();
+ assertNotNull(iterator);
+ assertTrue(iterator.hasNext());
+
+ int counter = 0;
+ while (iterator.hasNext()) {
+ Entry<UnsignedInteger, DeliveryType> entry = iterator.next();
+ assertNotNull(entry);
+ assertEquals(UnsignedInteger.valueOf(intValues[counter]), entry.getKey());
+ assertEquals(new DeliveryType(intValues[counter++]), entry.getValue());
+ }
+
+ // Check that we really did iterate.
+ assertEquals(intValues.length, counter);
+ }
+
+ @Test
+ public void testEntryIterationRemove() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int[] intValues = {0, 1, 2, 3};
+
+ for (int entry : intValues) {
+ tracker.put(entry, new DeliveryType(entry));
+ }
+
+ Set<Entry<UnsignedInteger, DeliveryType>> entries= tracker.entrySet();
+ Iterator<Entry<UnsignedInteger, DeliveryType>> iterator = entries.iterator();
+ assertNotNull(iterator);
+ assertTrue(iterator.hasNext());
+
+ int counter = 0;
+ while (iterator.hasNext()) {
+ Entry<UnsignedInteger, DeliveryType> entry = iterator.next();
+ assertNotNull(entry);
+ assertEquals(UnsignedInteger.valueOf(intValues[counter]), entry.getKey());
+ assertEquals(new DeliveryType(intValues[counter++]), entry.getValue());
+ iterator.remove();
+ }
+
+ // Check that we really did iterate.
+ assertEquals(intValues.length, counter);
+ assertTrue(tracker.isEmpty());
+ assertEquals(0, tracker.size());
+ }
+
+ @Test
+ public void testEntryIterationFollowsInterstionOrderingExpectations() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int[] inputValues = {3, 0, -1, 1, -2, 2};
+
+ for (int entry : inputValues) {
+ tracker.put(entry, new DeliveryType(entry));
+ }
+
+ Set<Entry<UnsignedInteger, DeliveryType>> entries= tracker.entrySet();
+ Iterator<Entry<UnsignedInteger, DeliveryType>> iterator = entries.iterator();
+ assertNotNull(iterator);
+ assertTrue(iterator.hasNext());
+
+ int counter = 0;
+ while (iterator.hasNext()) {
+ Entry<UnsignedInteger, DeliveryType> entry = iterator.next();
+ assertNotNull(entry);
+ assertEquals(UnsignedInteger.valueOf(inputValues[counter]), entry.getKey());
+ assertEquals(new DeliveryType(inputValues[counter++]), entry.getValue());
+ }
+
+ // Check that we really did iterate.
+ assertEquals(inputValues.length, counter);
+ }
+
+ @Test
+ public void testEntryIterationFailsWhenConcurrentlyModified() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int[] inputValues = {2, 3, 5, 9, 12, 42};
+
+ for (int entry : inputValues) {
+ tracker.put(entry, new DeliveryType(entry));
+ }
+
+ Set<Entry<UnsignedInteger, DeliveryType>> entries= tracker.entrySet();
+ Iterator<Entry<UnsignedInteger, DeliveryType>> iterator = entries.iterator();
+ assertNotNull(iterator);
+ assertTrue(iterator.hasNext());
+
+ tracker.remove(3);
+
+ try {
+ iterator.next();
+ fail("Should not iterate when modified outside of iterator");
+ } catch (ConcurrentModificationException cme) {}
+ }
+
+ @Test
+ public void testEntrySetIterationOnEmptyTree() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+ Set<Entry<UnsignedInteger, DeliveryType>> entries= tracker.entrySet();
+ Iterator<Entry<UnsignedInteger, DeliveryType>> iterator = entries.iterator();
+
+ assertFalse(iterator.hasNext());
+ try {
+ iterator.next();
+ fail("Should have thrown a NoSuchElementException");
+ } catch (NoSuchElementException nse) {
+ }
+ }
+
+ @Test
+ public void testForEachEntry() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int[] inputValues = {3, 0, -1, 1, -2, 2};
+
+ for (int entry : inputValues) {
+ tracker.put(entry, new DeliveryType(entry));
+ }
+
+ final SequenceNumber index = new SequenceNumber(0);
+ tracker.forEach((value) -> {
+ int i = index.getAndIncrement().intValue();
+ assertEquals(new DeliveryType(inputValues[i]), value);
+ });
+
+ assertEquals(index.intValue(), inputValues.length);
+ }
+
+ @Test
+ public void testRandomProduceAndConsumeWithBacklog() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int ITERATIONS = 8192;
+
+ try {
+ for (int i = 0; i < ITERATIONS; ++i) {
+ tracker.put(UnsignedInteger.valueOf(i), new DeliveryType(i));
+ }
+
+ for (int i = 0; i < ITERATIONS; ++i) {
+ int p = random.nextInt(ITERATIONS);
+ int c = random.nextInt(ITERATIONS);
+
+ tracker.put(UnsignedInteger.valueOf(p), new DeliveryType(p));
+ tracker.remove(UnsignedInteger.valueOf(c));
+ }
+ } catch (Throwable error) {
+ dumpRandomDataSet(ITERATIONS, true);
+ throw error;
+ }
+ }
+
+ @Test
+ public void testRandomPutAndGetIntoEmptyMap() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int ITERATIONS = 8192;
+
+ try {
+ for (int i = 0; i < ITERATIONS; ++i) {
+ int p = random.nextInt(ITERATIONS);
+ int c = random.nextInt(ITERATIONS);
+
+ tracker.put(UnsignedInteger.valueOf(p), new DeliveryType(p));
+ tracker.remove(UnsignedInteger.valueOf(c));
+ }
+ } catch (Throwable error) {
+ dumpRandomDataSet(ITERATIONS, true);
+ throw error;
+ }
+ }
+
+ @Test
+ public void testRandomPutAndGetIntoEmptyMapWithCustomBucketSize() {
+ UnsettledMap<DeliveryType> tracker = createMap(2, 4);
+
+ final int ITERATIONS = 8192;
+
+ try {
+ for (int i = 0; i < ITERATIONS; ++i) {
+ int p = random.nextInt(ITERATIONS);
+ int c = random.nextInt(ITERATIONS);
+
+ tracker.put(UnsignedInteger.valueOf(p), new DeliveryType(p));
+ tracker.remove(UnsignedInteger.valueOf(c));
+ }
+ } catch (Throwable error) {
+ dumpRandomDataSet(ITERATIONS, true);
+ throw error;
+ }
+ }
+
+ @Test
+ public void testPutEntriesWithDuplicateIdsIntoMapThenRemoveInSameOrder() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int[] ids = new int[] { 4, 7, 5, 0, 7, 7, 0 };
+
+ for(int id : ids) {
+ tracker.put(id, new DeliveryType(id));
+ }
+
+ for (int id : ids) {
+ assertEquals(new DeliveryType(id), tracker.get(id));
+ }
+
+ for (int id : ids) {
+ assertEquals(new DeliveryType(id), tracker.remove(id));
+ }
+
+ assertTrue(tracker.isEmpty());
+ }
+
+ @Test
+ public void testPutThatBreaksOrderLeavesMapUsable() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int[] ids = new int[] { 1, 2, 0 };
+
+ for(int id : ids) {
+ tracker.put(id, new DeliveryType(id));
+ }
+
+ for (int id : ids) {
+ assertEquals(new DeliveryType(id), tracker.get(id));
+ }
+
+ for (int id : ids) {
+ assertEquals(new DeliveryType(id), tracker.remove(id));
+ }
+
+ assertTrue(tracker.isEmpty());
+ }
+
+ @Test
+ public void testPutInSeriesAndRemoveAllValuesRandomly() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ List<UnsignedInteger> values = new ArrayList<>();
+ List<UnsignedInteger> removes = new ArrayList<>();
+
+ final int ITERATIONS = 8192;
+
+ for (int i = 0; i < ITERATIONS; ++i) {
+ values.add(UnsignedInteger.valueOf(i));
+ }
+
+ removes.addAll(values);
+ Collections.shuffle(removes, random);
+
+ try {
+ for (UnsignedInteger id : values) {
+ tracker.put(id, new DeliveryType(id.intValue()));
+ }
+
+ assertEquals(ITERATIONS, tracker.size());
+
+ for (UnsignedInteger id : values) {
+ assertEquals(new DeliveryType(id.intValue()), tracker.get(id));
+ }
+
+ for (UnsignedInteger id : removes) {
+ assertEquals(new DeliveryType(id.intValue()), tracker.remove(id));
+ }
+
+ assertTrue(tracker.isEmpty());
+ } catch (Throwable error) {
+ dumpRandomDataSet(ITERATIONS, true);
+ throw error;
+ }
+ }
+
+ @Test
+ public void testPutInRandomOrderAndRemoveAllValuesInSeries() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ List<UnsignedInteger> values = new ArrayList<>();
+ List<UnsignedInteger> removes = new ArrayList<>();
+
+ final int ITERATIONS = 8192;
+
+ for (int i = 0; i < ITERATIONS; ++i) {
+ values.add(UnsignedInteger.valueOf(i));
+ }
+
+ removes.addAll(values);
+ Collections.shuffle(values, random);
+
+ try {
+ for (UnsignedInteger id : values) {
+ tracker.put(id, new DeliveryType(id.intValue()));
+ }
+
+ assertEquals(ITERATIONS, tracker.size());
+
+ for (UnsignedInteger id : values) {
+ assertEquals(new DeliveryType(id.intValue()), tracker.get(id));
+ }
+
+ for (UnsignedInteger id : removes) {
+ assertEquals(new DeliveryType(id.intValue()), tracker.remove(id));
+ }
+
+ assertTrue(tracker.isEmpty());
+ } catch (Throwable error) {
+ dumpRandomDataSet(ITERATIONS, true);
+ throw error;
+ }
+ }
+
+ @Test
+ public void testPutInRandomOrderAndRemoveAllValuesInRandomOrder() {
+ UnsettledMap<DeliveryType> tracker = createMap();
+
+ List<UnsignedInteger> values = new ArrayList<>();
+ List<UnsignedInteger> removes = new ArrayList<>();
+
+ final int ITERATIONS = 8192;
+
+ for (int i = 0; i < ITERATIONS; ++i) {
+ values.add(UnsignedInteger.valueOf(i));
+ }
+
+ removes.addAll(values);
+ Collections.shuffle(values, random);
+ Collections.shuffle(removes, random);
+
+ try {
+ for (UnsignedInteger id : values) {
+ tracker.put(id, new DeliveryType(id.intValue()));
+ }
+
+ assertEquals(ITERATIONS, tracker.size());
+
+ for (UnsignedInteger id : values) {
+ assertEquals(new DeliveryType(id.intValue()), tracker.get(id));
+ }
+
+ for (UnsignedInteger id : removes) {
+ assertEquals(new DeliveryType(id.intValue()), tracker.remove(id));
+ }
+
+ assertTrue(tracker.isEmpty());
+ } catch (Throwable error) {
+ dumpRandomDataSet(ITERATIONS, true);
+ throw error;
+ }
+ }
+
+ @Test
+ public void testPutRandomValueIntoMapThenRemoveInSameOrder() {
+ final UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int ITERATIONS = 8192;
+
+ try {
+ for (int i = 0; i < ITERATIONS; ++i) {
+ final int index = random.nextInt(ITERATIONS);
+ tracker.put(index, new DeliveryType(index));
+ }
+
+ // Reset to verify insertions
+ random.setSeed(seed);
+
+ for (int i = 0; i < ITERATIONS; ++i) {
+ final int index = random.nextInt(ITERATIONS);
+ assertEquals(new DeliveryType(index), tracker.get(index));
+ }
+
+ // Reset to remove
+ random.setSeed(seed);
+
+ for (int i = 0; i < ITERATIONS; ++i) {
+ final int index = random.nextInt(ITERATIONS);
+ assertEquals(new DeliveryType(index), tracker.remove(index));
+ }
+
+ assertTrue(tracker.isEmpty());
+ } catch (Throwable error) {
+ dumpRandomDataSet(ITERATIONS, true);
+ throw error;
+ }
+ }
+
+ @Test
+ public void testPutInSeriesAndClear() {
+ final UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int LOOPS = 16;
+ final int ITERATIONS = 8192;
+
+ int putDeliveryId = 0;
+ int getDeliveryId = 0;
+
+ for (int loop = 0; loop < LOOPS; loop++) {
+ try {
+ for (int i = 0; i < ITERATIONS; ++i, putDeliveryId++) {
+ tracker.put(putDeliveryId, new DeliveryType(putDeliveryId));
+ }
+
+ for (int i = 0; i < ITERATIONS; ++i, getDeliveryId++) {
+ assertEquals(new DeliveryType(getDeliveryId), tracker.get(getDeliveryId));
+ }
+
+ tracker.clear();
+
+ assertTrue(tracker.isEmpty());
+ } catch (Throwable error) {
+ dumpRandomDataSet(ITERATIONS, true);
+ throw error;
+ }
+ }
+ }
+
+ @Test
+ public void testPutInSeriesAndRemoveInSeries() {
+ final UnsettledMap<DeliveryType> tracker = createMap();
+
+ final int LOOPS = 16;
+ final int ITERATIONS = 8192;
+
+ int putDeliveryId = 0;
+ int getDeliveryId = 0;
+ int removeDeliveryId = 0;
+
+ for (int loop = 0; loop < LOOPS; loop++) {
+ try {
+ for (int i = 0; i < ITERATIONS; ++i, putDeliveryId++) {
+ tracker.put(putDeliveryId, new DeliveryType(putDeliveryId));
+ }
+
+ for (int i = 0; i < ITERATIONS; ++i, getDeliveryId++) {
+ assertEquals(new DeliveryType(getDeliveryId), tracker.get(getDeliveryId));
+ }
+
+ for (int i = 0; i < ITERATIONS; ++i, removeDeliveryId++) {
+ assertEquals(new DeliveryType(removeDeliveryId), tracker.remove(removeDeliveryId));
+ }
+
+ assertTrue(tracker.isEmpty());
+ } catch (Throwable error) {
+ dumpRandomDataSet(ITERATIONS, true);
+ throw error;
+ }
+ }
+ }
+
+ @Test
+ public void testEqualsJDKMapTypes() {
+ Map<UnsignedInteger, DeliveryType> m1 = createMap();
+ Map<UnsignedInteger, DeliveryType> m2 = createMap();
+
+ m1.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+ m1.put(UnsignedInteger.valueOf(2), new DeliveryType(2));
+ m2.put(UnsignedInteger.valueOf(3), new DeliveryType(3));
+ m2.put(UnsignedInteger.valueOf(4), new DeliveryType(4));
+
+ assertNotEquals(m1, m2, "Maps should not be equal 1");
+ assertNotEquals(m2, m1, "Maps should not be equal 2");
+
+ // comparing UnsettledMap3 with HashMap with equal values
+ m1 = createMap();
+ m2 = new HashMap<>();
+ m1.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+ m2.put(UnsignedInteger.valueOf(2), new DeliveryType(2));
+ assertNotEquals(m1, m2, "Maps should not be equal 3");
+ assertNotEquals(m2, m1, "Maps should not be equal 4");
+
+ // comparing UnsettledMap3 with differing objects inside values
+ m1 = createMap();
+ m2 = createMap();
+ m1.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+ m2.put(UnsignedInteger.valueOf(2), new DeliveryType(2));
+ assertNotEquals(m1, m2, "Maps should not be equal 5");
+ assertNotEquals(m2, m1, "Maps should not be equal 6");
+
+ // comparing UnsettledMap3 with same objects inside values
+ m1 = createMap();
+ m2 = createMap();
+ m1.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+ m2.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+ assertTrue(m1.equals(m2), "Maps should be equal 7");
+ assertTrue(m2.equals(m1), "Maps should be equal 7");
+ }
+
+ @Test
+ public void testEntrySetContains() {
+ UnsettledMap<DeliveryType> first = createMap();
+ UnsettledMap<DeliveryType> second = createMap();
+
+ first.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+ Object[] entry = first.entrySet().toArray();
+ assertFalse(second.entrySet().contains(entry[0]),
+ "Empty map should not contain anything from first map");
+
+ second.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+ assertTrue(second.entrySet().containsAll(first.entrySet()),
+ "entrySet().containsAll(...) should work with values");
+
+ first.clear();
+ first.put(UnsignedInteger.valueOf(1), new DeliveryType(1));
+ entry = first.entrySet().toArray();
+ assertTrue(second.entrySet().contains(entry[0]),
+ "new valued entry with same delivery ID should equal old valued entry");
+ first.put(UnsignedInteger.valueOf(2), new DeliveryType(2));
+ entry = first.entrySet().toArray();
+ assertFalse(second.entrySet().contains(entry[1]),
+ "additional value in first should not match any in second");
+ }
+
+ @Test
+ public void testValues() {
+ Collection<DeliveryType> vals = tracker.values();
+ vals.iterator();
+ assertEquals(vals.size(), objArray.length, "Returned collection of incorrect size");
+ for (DeliveryType element : objArray) {
+ assertTrue(vals.contains(element), "Collection contains incorrect elements");
+ }
+
+ assertEquals(1000, vals.size());
+ int j = 0;
+ for (Iterator<DeliveryType> iter = vals.iterator(); iter.hasNext(); j++) {
+ DeliveryType element = iter.next();
+ assertNotNull(element);
+ }
+ assertEquals(1000, j);
+
+ UnsettledMap<DeliveryType> myMap = new UnsettledMap<DeliveryType>(DeliveryType::getDeliveryId);
+ for (int i = 0; i < 100; i++) {
+ myMap.put(uintArray[i], objArray[i]);
+ }
+ Collection<DeliveryType> values = myMap.values();
+ assertEquals(100, values.size());
+ assertTrue(values.remove(new DeliveryType(1)));
+ assertTrue(!myMap.containsKey(UnsignedInteger.ONE), "Removing from the values collection should remove from the original map");
+ assertTrue(!myMap.containsValue(new DeliveryType(1)), "Removing from the values collection should remove from the original map");
+ assertEquals(99, values.size());
+ j = 0;
+ for (Iterator<DeliveryType> iter = values.iterator(); iter.hasNext(); j++) {
+ iter.next();
+ }
+ assertEquals(99, j);
+ }
+
+ @Test
+ public void testRemoveValueUsingValuesIteratorAndCheckAllOtherValuesRemain() {
+ Iterator<DeliveryType> iterator = tracker.values().iterator();
+
+ DeliveryType removed = null;
+
+ for (int i = 0; i < 10; ++i) {
+ removed = iterator.next();
+ }
+
+ iterator.remove();
+
+ for (UnsignedInteger id : uintArray) {
+ if (id.intValue() != removed.getDeliveryId()) {
+ assertTrue(tracker.containsKey(id.intValue()));
+ } else {
+ assertFalse(tracker.containsKey(id.intValue()));
+ }
+ }
+ }
+
+ @Test
+ public void testRemoveRangeRemovesNoValues() {
+ final int afterLast = uintArray.length + 1; // Entries are one based
+
+ final AtomicBoolean removed = new AtomicBoolean();
+
+ tracker.removeEach(afterLast, afterLast + 10, (delivery) -> removed.set(true));
+
+ assertFalse(removed.get());
+ }
+
+ @Test
+ public void testRemoveRangeRemovesLastValue() {
+ final int lastEntry = uintArray.length; // Entries are one based
+
+ final AtomicInteger removed = new AtomicInteger();
+
+ tracker.removeEach(lastEntry, lastEntry, (delivery) -> removed.incrementAndGet());
+
+ assertEquals(1, removed.get());
+ }
+
+ @Test
+ public void testRemoveRangeRemovesLastValueAndRangeOutsideOfActualEntries() {
+ final int lastEntry = uintArray.length; // Entries are one based
+
+ final AtomicInteger removed = new AtomicInteger();
+
+ tracker.removeEach(lastEntry, lastEntry + 10, (delivery) -> removed.incrementAndGet());
+
+ assertEquals(1, removed.get());
+ }
+
+ @Test
+ public void testRemoveAllEntriesFromFirstBucket() {
+ doTestRemoveEach(0, 15);
+ }
+
+ @Test
+ public void testRemoveAllEntriesFromMiddleBucket() {
+ doTestRemoveEach(16, 31);
+ }
+
+ @Test
+ public void testRemoveAllEntriesFromEndBucket() {
+ doTestRemoveEach(32, 47);
+ }
+
+ @Test
+ public void testRemoveEntriesSpanningThreeBuckets() {
+ doTestRemoveEach(8, 39);
+ }
+
+ @Test
+ public void testRemoveAllEntriesWithClosedRange() {
+ doTestRemoveEach(0, 47);
+ }
+
+ @Test
+ public void testRemoveAllEntriesWithOpenRange() {
+ doTestRemoveEach(0, 64);
+ }
+
+ public void doTestRemoveEach(int start, int end) {
+ final int numBuckets = 3;
+ final int bucketSize = 16;
+ final int numEntries = numBuckets * bucketSize;
+ final int numRemoved = Math.min(end - start + 1, numEntries);
+
+ UnsettledMap<DeliveryType> map = createMap(numBuckets, bucketSize);
+
+ for (int i = 0; i < numEntries; ++i) {
+ map.put(i, new DeliveryType(i));
+ }
+
+ assertEquals(numEntries, map.size());
+
+ final AtomicInteger removed = new AtomicInteger();
+
+ map.removeEach(start, end, (delivery) -> removed.incrementAndGet());
+
+ assertEquals(numRemoved, removed.get());
+ assertEquals(numEntries - numRemoved, map.size());
+ }
+
+ @Test
+ public void testRemoveAllEntriesInSmallChunks() {
+ final AtomicInteger removed = new AtomicInteger();
+
+ for (int i = 0; i < uintArray.length; i += 2) {
+ tracker.removeEach(uintArray[i].intValue(), uintArray[i+1].intValue(), (delivery) -> removed.incrementAndGet());
+ }
+
+ assertEquals(uintArray.length, removed.get());
+ }
+
+ protected void dumpRandomDataSet(int iterations, boolean bounded) {
+ final int[] dataSet = new int[iterations];
+
+ random.setSeed(seed);
+
+ for (int i = 0; i < iterations; ++i) {
+ if (bounded) {
+ dataSet[i] = random.nextInt(iterations);
+ } else {
+ dataSet[i] = random.nextInt();
+ }
+ }
+
+ LOG.info("Random seed was: {}" , seed);
+ LOG.info("Entries in data set: {}", dataSet);
+ }
+
+ protected static class OutsideEntry<K, V> implements Map.Entry<K, V> {
+
+ private final K key;
+ private V value;
+
+ public OutsideEntry(K key, V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public V setValue(V value) {
+ V oldValue = this.value;
+ this.value = value;
+ return oldValue;
+ }
+
+ @Override
+ public V getValue() {
+ return value;
+ }
+
+ @Override
+ public K getKey() {
+ return key;
+ }
+ }
+}