PROTON-2564 Avoid system calls for waiter notification if none waiting
Reduces overhead when no waiters are present and deliveries are being
queued into the prefetch buffer.
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
index 91fbaeb..2bbb8f4 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
@@ -18,7 +18,6 @@
import java.util.ArrayDeque;
import java.util.Deque;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.qpid.protonj2.client.Delivery;
@@ -38,6 +37,8 @@
private volatile int state = STOPPED;
+ private int waiters = 0;
+
private final Deque<ClientDelivery> queue;
/**
@@ -54,7 +55,9 @@
public void enqueueFirst(ClientDelivery envelope) {
synchronized (queue) {
queue.addFirst(envelope);
- queue.notify();
+ if (waiters > 0) {
+ queue.notify();
+ }
}
}
@@ -62,7 +65,9 @@
public void enqueue(ClientDelivery envelope) {
synchronized (queue) {
queue.addLast(envelope);
- queue.notify();
+ if (waiters > 0) {
+ queue.notify();
+ }
}
}
@@ -72,10 +77,20 @@
// Wait until the receiver is ready to deliver messages.
while (timeout != 0 && isRunning() && queue.isEmpty()) {
if (timeout == -1) {
- queue.wait();
+ waiters++;
+ try {
+ queue.wait();
+ } finally {
+ waiters--;
+ }
} else {
long start = System.currentTimeMillis();
- queue.wait(TimeUnit.MILLISECONDS.toMillis(timeout));
+ waiters++;
+ try {
+ queue.wait(timeout);
+ } finally {
+ waiters--;
+ }
timeout = Math.max(timeout + start - System.currentTimeMillis(), 0);
}
}
@@ -103,7 +118,9 @@
public void start() {
if (STATE_FIELD_UPDATER.compareAndSet(this, STOPPED, RUNNING)) {
synchronized (queue) {
- queue.notifyAll();
+ if (waiters > 0) {
+ queue.notifyAll();
+ }
}
}
}
@@ -112,7 +129,9 @@
public void stop() {
if (STATE_FIELD_UPDATER.compareAndSet(this, RUNNING, STOPPED)) {
synchronized (queue) {
- queue.notifyAll();
+ if (waiters > 0) {
+ queue.notifyAll();
+ }
}
}
}
@@ -121,7 +140,9 @@
public void close() {
if (STATE_FIELD_UPDATER.getAndSet(this, CLOSED) > CLOSED) {
synchronized (queue) {
- queue.notifyAll();
+ if (waiters > 0) {
+ queue.notifyAll();
+ }
}
}
}