VYSPER-330: replace PriorityQueue (where iteration is not ordered) with ArrayList and handle sorting on addition. add unit tests for RequestsWindow
git-svn-id: https://svn.apache.org/repos/asf/mina/vysper/trunk@1401208 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/RequestsWindow.java b/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/RequestsWindow.java
index ea8249a..44d15c0 100644
--- a/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/RequestsWindow.java
+++ b/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/RequestsWindow.java
@@ -22,9 +22,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
-import java.util.PriorityQueue;
-import java.util.Queue;
+import java.util.List;
/**
*/
@@ -47,8 +48,16 @@
*/
private long currentProcessingRequest = -1;
-
- protected final Queue<BoshRequest> queue = new PriorityQueue<BoshRequest>();
+ /**
+ * queue holding all BoshRequests, sorted ascendingly by RID.
+ *
+ * NOTE: the list does not enforce the order itself, it must be sorted when adding a new request!
+ * but the list always iterates ordered over Requests (which is the advantage over PriorityQueue, which
+ * needs additional object creation and CPU cycles on iteration time for ordered processing).
+ * since this queue normally only holds 0, 1, 2 or only a handful of elements, and normally new additions
+ * are in-order, sorting is cheap.
+ */
+ protected final List<BoshRequest> queue = new ArrayList<BoshRequest>();
protected String sessionId;
@@ -83,9 +92,10 @@
LOGGER.warn("SID = " + sessionId + " - " + "queueing duplicated rid in requests window: " + rid);
}
queue.add(br);
+ Collections.sort(queue);
latestAddionTimestamp = System.currentTimeMillis();
- if (highestContinuousRid < 0) {
+ if (highestContinuousRid < 0 || queue.size() == 1) {
highestContinuousRid = rid;
}
while (containsRid(highestContinuousRid + 1)) {
@@ -102,8 +112,8 @@
}
public synchronized Long firstRid() {
- final BoshRequest first = queue.peek();
- if (first == null) return null;
+ if (queue.isEmpty()) return null;
+ final BoshRequest first = queue.get(0);
return first.getRid();
}
@@ -116,18 +126,39 @@
if (queue.isEmpty()) return null;
String ridSeq = logRequestWindow();
-
- currentProcessingRequest = Math.max(currentProcessingRequest, queue.peek().getRid());
+
+ final Long lowestRid = queue.get(0).getRid();
+ currentProcessingRequest = Math.max(currentProcessingRequest, lowestRid);
+ if (currentProcessingRequest > highestContinuousRid) {
+ reinitHCRFromQueue();
+ }
+ // still a problem?
if (currentProcessingRequest > highestContinuousRid) {
LOGGER.debug("SID = " + sessionId + " - using RID = NULL, not current = " + currentProcessingRequest + " " + ridSeq);
return null;
}
- final BoshRequest nextRequest = queue.poll();
+ final BoshRequest nextRequest = queue.remove(0);
LOGGER.debug("SID = " + sessionId + " - using RID = " + (nextRequest == null ? "NULL" : Long.toString(currentProcessingRequest)) + " " + ridSeq);
return nextRequest;
}
-
+
+ private synchronized void reinitHCRFromQueue() {
+ highestContinuousRid = -1;
+ if (queue.isEmpty()) return;
+
+ final Iterator<BoshRequest> iterator = queue.iterator();
+ while (iterator.hasNext()) {
+ BoshRequest boshRequest = iterator.next();
+ final Long rid = boshRequest.getRid();
+ if (highestContinuousRid == -1) {
+ highestContinuousRid = rid;
+ } else if (highestContinuousRid + 1 == rid) {
+ highestContinuousRid = rid;
+ }
+ }
+ }
+
public synchronized boolean containsRid(Long rid) {
for (BoshRequest item : queue) {
if (rid.equals(item.getRid())) return true;
diff --git a/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/RequestsWindowTest.java b/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/RequestsWindowTest.java
index 1e60be6..329ca55 100644
--- a/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/RequestsWindowTest.java
+++ b/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/RequestsWindowTest.java
@@ -37,6 +37,17 @@
queueNewRequest(2L); // 2 after 3
assertHighestContinuous(3); // HCR is correct
+
+ assertRID(1L, requestsWindow.pollNext());
+ assertRID(2L, requestsWindow.pollNext());
+ assertRID(3L, requestsWindow.pollNext());
+ }
+
+ public void testAllowDuplicateRIDs() {
+ requestsWindow = new RequestsWindow("1");
+ queueNewRequest(100L);
+ queueNewRequest(100L); // another 100
+ assertEquals(2, requestsWindow.size());
}
public void testRequestGap() {
@@ -49,9 +60,47 @@
// now, 3 is highest continous, 4 is missing, 5 is highest: [3 HCR, GAP, 5]
- final BoshRequest boshRequest = requestsWindow.pollNext();
- assertHighestContinuous(3); // BUG! should go to 5
- System.out.println("reqWin = " + requestsWindow.logRequestWindow()); // HCR is not in line
+ // now, fetch 3
+ final BoshRequest boshRequest3 = requestsWindow.pollNext();
+ assertRID(3L, boshRequest3);
+ final BoshRequest boshRequest5 = requestsWindow.pollNext();
+ assertHighestContinuous(5);
+ System.out.println("reqWin = " + requestsWindow.logRequestWindow());
+ }
+
+ public void testRequest2Gaps() {
+ requestsWindow = new RequestsWindow("1");
+ queueNewRequest(3L);
+ assertHighestContinuous(3);
+
+ queueNewRequest(5L);
+ queueNewRequest(6L);
+ assertHighestContinuous(3);
+
+ queueNewRequest(8L);
+ queueNewRequest(9L);
+ queueNewRequest(10L);
+ assertHighestContinuous(3);
+
+ // now we have request [3, 5, 6, 8, 9, 10]
+
+ // now, fetch 3
+ final BoshRequest boshRequest3 = requestsWindow.pollNext();
+ assertRID(3L, boshRequest3);
+
+ assertRID(5L, requestsWindow.pollNext());
+ assertRID(6L, requestsWindow.pollNext());
+ assertHighestContinuous(6);
+
+ assertRID(8L, requestsWindow.pollNext());
+ assertRID(9L, requestsWindow.pollNext());
+ assertRID(10L, requestsWindow.pollNext());
+ assertHighestContinuous(10);
+
+ }
+
+ private void assertRID(final long expectedRid, BoshRequest boshRequest) {
+ assertEquals(expectedRid, (long)boshRequest.getRid());
}
private void assertHighestContinuous(final int expected) {