PROTON-1958: use a per-Timer counter value such that its Tasks naturally order if their deadlines are equal
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
index 11bb6b8..d360ee7 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
@@ -21,8 +21,6 @@
package org.apache.qpid.proton.reactor.impl;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.engine.impl.RecordImpl;
import org.apache.qpid.proton.reactor.Reactor;
@@ -32,13 +30,12 @@
private final long deadline;
private final int counter;
private boolean cancelled = false;
- private final AtomicInteger count = new AtomicInteger();
private Record attachments = new RecordImpl();
private Reactor reactor;
- public TaskImpl(long deadline) {
+ public TaskImpl(long deadline, int counter) {
this.deadline = deadline;
- this.counter = count.getAndIncrement();
+ this.counter = counter;
}
@Override
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
index b8df19d..0351196 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
@@ -22,6 +22,7 @@
package org.apache.qpid.proton.reactor.impl;
import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Event.Type;
@@ -32,13 +33,14 @@
private CollectorImpl collector;
private PriorityQueue<TaskImpl> tasks = new PriorityQueue<TaskImpl>();
+ private AtomicInteger counter = new AtomicInteger();
public Timer(Collector collector) {
this.collector = (CollectorImpl)collector;
}
Task schedule(long deadline) {
- TaskImpl task = new TaskImpl(deadline);
+ TaskImpl task = new TaskImpl(deadline, counter.incrementAndGet());
tasks.add(task);
return task;
}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
index 387446e..fde9219 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
@@ -33,6 +33,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
import junit.framework.AssertionFailedError;
@@ -487,6 +488,36 @@
taskHandler.assertEvents(Type.TIMER_TASK);
}
+ @Test
+ public void scheduleWithEqualDeadline() throws IOException {
+ final int count = 10;
+ final ArrayList<Integer> taskRunOrder = new ArrayList<Integer>();
+
+ class TaskHandler extends BaseHandler {
+ private int _counter;
+
+ private TaskHandler(int counter) {
+ _counter = counter;
+ }
+
+ @Override
+ public void onTimerTask(Event event) {
+ taskRunOrder.add(_counter);
+ }
+ }
+
+ final List<Integer> expectedOrder = new ArrayList<>();
+ for(int i = 0 ; i < count; i++) {
+ reactor.schedule(0, new TaskHandler(i));
+ expectedOrder.add(i);
+ }
+
+ reactor.run();
+ reactor.free();
+
+ assertEquals(expectedOrder, taskRunOrder);
+ }
+
private class BarfException extends RuntimeException {
private static final long serialVersionUID = -5891140258375562884L;
}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/TaskImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/TaskImplTest.java
new file mode 100644
index 0000000..23159d9
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/TaskImplTest.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.reactor.impl;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TaskImplTest {
+
+ @Test
+ public void testCompareToWithSameObject() {
+ TaskImpl task = new TaskImpl(1000, 1);
+ Assert.assertEquals(0, task.compareTo(task));
+ }
+
+ @Test
+ public void testCompareToWithDifferentDeadlines() {
+ TaskImpl task1 = new TaskImpl(1000, 1);
+ TaskImpl task2 = new TaskImpl(2000, 2);
+
+ Assert.assertTrue(task1.compareTo(task2) < 0);
+ Assert.assertTrue(task2.compareTo(task1) > 0);
+ }
+
+ @Test
+ public void testCompareToWithSameDeadlines() {
+ int deadline = 1000;
+ TaskImpl task1 = new TaskImpl(deadline, 1);
+ TaskImpl task2 = new TaskImpl(deadline, 2);
+
+ Assert.assertTrue("Expected task1 to order 'less' due to being created first", task1.compareTo(task2) < 0);
+ Assert.assertTrue("Expected task2 to order 'greater' due to being created second", task1.compareTo(task2) < 0);
+ Assert.assertTrue(task2.compareTo(task1) > 0);
+ }
+}