PROTON-2564 Avoid system calls for waiter notification if none waiting

Reduces overhead when no waiters are present and deliveries are being
queued into the prefetch buffer.
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
index 91fbaeb..2bbb8f4 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java
@@ -18,7 +18,6 @@
 
 import java.util.ArrayDeque;
 import java.util.Deque;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.qpid.protonj2.client.Delivery;
@@ -38,6 +37,8 @@
 
     private volatile int state = STOPPED;
 
+    private int waiters = 0;
+
     private final Deque<ClientDelivery> queue;
 
     /**
@@ -54,7 +55,9 @@
     public void enqueueFirst(ClientDelivery envelope) {
         synchronized (queue) {
             queue.addFirst(envelope);
-            queue.notify();
+            if (waiters > 0) {
+                queue.notify();
+            }
         }
     }
 
@@ -62,7 +65,9 @@
     public void enqueue(ClientDelivery envelope) {
         synchronized (queue) {
             queue.addLast(envelope);
-            queue.notify();
+            if (waiters > 0) {
+                queue.notify();
+            }
         }
     }
 
@@ -72,10 +77,20 @@
             // Wait until the receiver is ready to deliver messages.
             while (timeout != 0 && isRunning() && queue.isEmpty()) {
                 if (timeout == -1) {
-                    queue.wait();
+                    waiters++;
+                    try {
+                        queue.wait();
+                    } finally {
+                        waiters--;
+                    }
                 } else {
                     long start = System.currentTimeMillis();
-                    queue.wait(TimeUnit.MILLISECONDS.toMillis(timeout));
+                    waiters++;
+                    try {
+                        queue.wait(timeout);
+                    } finally {
+                        waiters--;
+                    }
                     timeout = Math.max(timeout + start - System.currentTimeMillis(), 0);
                 }
             }
@@ -103,7 +118,9 @@
     public void start() {
         if (STATE_FIELD_UPDATER.compareAndSet(this, STOPPED, RUNNING)) {
             synchronized (queue) {
-                queue.notifyAll();
+                if (waiters > 0) {
+                    queue.notifyAll();
+                }
             }
         }
     }
@@ -112,7 +129,9 @@
     public void stop() {
         if (STATE_FIELD_UPDATER.compareAndSet(this, RUNNING, STOPPED)) {
             synchronized (queue) {
-                queue.notifyAll();
+                if (waiters > 0) {
+                    queue.notifyAll();
+                }
             }
         }
     }
@@ -121,7 +140,9 @@
     public void close() {
         if (STATE_FIELD_UPDATER.getAndSet(this, CLOSED) > CLOSED) {
             synchronized (queue) {
-                queue.notifyAll();
+                if (waiters > 0) {
+                    queue.notifyAll();
+                }
             }
         }
     }