VYSPER-330: replace PriorityQueue (where iteration is not ordered) with ArrayList and handle sorting on addition. add unit tests for RequestsWindow

git-svn-id: https://svn.apache.org/repos/asf/mina/vysper/trunk@1401208 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/RequestsWindow.java b/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/RequestsWindow.java
index ea8249a..44d15c0 100644
--- a/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/RequestsWindow.java
+++ b/server/extensions/xep0124-xep0206-bosh/src/main/java/org/apache/vysper/xmpp/extension/xep0124/RequestsWindow.java
@@ -22,9 +22,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.PriorityQueue;
-import java.util.Queue;
+import java.util.List;
 
 /**
  */
@@ -47,8 +48,16 @@
      */
     private long currentProcessingRequest = -1;
 
-    
-    protected final Queue<BoshRequest> queue = new PriorityQueue<BoshRequest>();
+    /**
+     * queue holding all BoshRequests, sorted ascendingly by RID.
+     * 
+     * NOTE: the list does not enforce the order itself, it must be sorted when adding a new request!
+     * but the list always iterates ordered over Requests (which is the advantage over PriorityQueue, which 
+     * needs additional object creation and CPU cycles on iteration time for ordered processing).
+     * since this queue normally only holds 0, 1, 2 or only a handful of elements, and normally new additions 
+     * are in-order, sorting is cheap.
+     */
+    protected final List<BoshRequest> queue = new ArrayList<BoshRequest>();
 
     protected String sessionId;
     
@@ -83,9 +92,10 @@
             LOGGER.warn("SID = " + sessionId + " - " + "queueing duplicated rid in requests window: " + rid);
         }
         queue.add(br);
+        Collections.sort(queue);
         latestAddionTimestamp = System.currentTimeMillis();
 
-        if (highestContinuousRid < 0) {
+        if (highestContinuousRid < 0 || queue.size() == 1) {
             highestContinuousRid = rid;
         }
         while (containsRid(highestContinuousRid + 1)) {
@@ -102,8 +112,8 @@
     }
     
     public synchronized Long firstRid() {
-        final BoshRequest first = queue.peek();
-        if (first == null) return null;
+        if (queue.isEmpty()) return null;
+        final BoshRequest first = queue.get(0);
         return first.getRid();
     }
 
@@ -116,18 +126,39 @@
         if (queue.isEmpty()) return null;
         
         String ridSeq = logRequestWindow();
-    
-        currentProcessingRequest = Math.max(currentProcessingRequest, queue.peek().getRid());
+
+        final Long lowestRid = queue.get(0).getRid();
+        currentProcessingRequest = Math.max(currentProcessingRequest, lowestRid);
+        if (currentProcessingRequest > highestContinuousRid) {
+            reinitHCRFromQueue();
+        }
+        // still a problem?
         if (currentProcessingRequest > highestContinuousRid) {
             LOGGER.debug("SID = " + sessionId + " -  using RID = NULL, not current = " + currentProcessingRequest + " " + ridSeq);
             return null; 
         }
 
-        final BoshRequest nextRequest = queue.poll();
+        final BoshRequest nextRequest = queue.remove(0);
         LOGGER.debug("SID = " + sessionId + " - using RID = " + (nextRequest == null ? "NULL" : Long.toString(currentProcessingRequest)) + " " + ridSeq);
         return nextRequest;
     }
-    
+
+    private synchronized void reinitHCRFromQueue() {
+        highestContinuousRid = -1;
+        if (queue.isEmpty()) return;
+
+        final Iterator<BoshRequest> iterator = queue.iterator();
+        while (iterator.hasNext()) {
+            BoshRequest boshRequest = iterator.next();            
+            final Long rid = boshRequest.getRid();
+            if (highestContinuousRid == -1) {
+                highestContinuousRid = rid;
+            } else if (highestContinuousRid + 1 == rid) {
+                highestContinuousRid = rid;
+            }
+        }
+    }
+
     public synchronized boolean containsRid(Long rid) {
         for (BoshRequest item : queue) {
             if (rid.equals(item.getRid())) return true;
diff --git a/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/RequestsWindowTest.java b/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/RequestsWindowTest.java
index 1e60be6..329ca55 100644
--- a/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/RequestsWindowTest.java
+++ b/server/extensions/xep0124-xep0206-bosh/src/test/java/org/apache/vysper/xmpp/extension/xep0124/RequestsWindowTest.java
@@ -37,6 +37,17 @@
 
         queueNewRequest(2L); // 2 after 3
         assertHighestContinuous(3); // HCR is correct
+        
+        assertRID(1L, requestsWindow.pollNext());
+        assertRID(2L, requestsWindow.pollNext());
+        assertRID(3L, requestsWindow.pollNext());
+    }
+    
+    public void testAllowDuplicateRIDs() {
+        requestsWindow = new RequestsWindow("1");
+        queueNewRequest(100L);
+        queueNewRequest(100L); // another 100
+        assertEquals(2, requestsWindow.size());
     }
 
     public void testRequestGap() {
@@ -49,9 +60,47 @@
 
         // now, 3 is highest continous, 4 is missing, 5 is highest: [3 HCR, GAP, 5]
         
-        final BoshRequest boshRequest = requestsWindow.pollNext();
-        assertHighestContinuous(3); // BUG! should go to 5
-        System.out.println("reqWin = " + requestsWindow.logRequestWindow()); // HCR is not in line
+        // now, fetch 3
+        final BoshRequest boshRequest3 = requestsWindow.pollNext();
+        assertRID(3L, boshRequest3);
+        final BoshRequest boshRequest5 = requestsWindow.pollNext();
+        assertHighestContinuous(5);
+        System.out.println("reqWin = " + requestsWindow.logRequestWindow());
+    }
+    
+    public void testRequest2Gaps() {
+        requestsWindow = new RequestsWindow("1");
+        queueNewRequest(3L);
+        assertHighestContinuous(3);
+
+        queueNewRequest(5L);
+        queueNewRequest(6L);
+        assertHighestContinuous(3);
+
+        queueNewRequest(8L);
+        queueNewRequest(9L);
+        queueNewRequest(10L);
+        assertHighestContinuous(3);
+
+        // now we have request [3, 5, 6, 8, 9, 10]
+        
+        // now, fetch 3
+        final BoshRequest boshRequest3 = requestsWindow.pollNext();
+        assertRID(3L, boshRequest3);
+
+        assertRID(5L, requestsWindow.pollNext());
+        assertRID(6L, requestsWindow.pollNext());
+        assertHighestContinuous(6);
+
+        assertRID(8L, requestsWindow.pollNext());
+        assertRID(9L, requestsWindow.pollNext());
+        assertRID(10L, requestsWindow.pollNext());
+        assertHighestContinuous(10);
+        
+    }
+
+    private void assertRID(final long expectedRid, BoshRequest boshRequest) {
+        assertEquals(expectedRid, (long)boshRequest.getRid());
     }
 
     private void assertHighestContinuous(final int expected) {