DRILL-2574: SendingAccountor can suffer from lost updates
SendingAccountor
- atomically get and set the message count to wait for
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
index 8794188..21fc800 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
@@ -27,10 +27,10 @@
* TODO: Need to update to use long for number of pending messages.
*/
public class SendingAccountor {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SendingAccountor.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SendingAccountor.class);
private final AtomicInteger batchesSent = new AtomicInteger(0);
- private Semaphore wait = new Semaphore(0);
+ private final Semaphore wait = new Semaphore(0);
public void increment() {
batchesSent.incrementAndGet();
@@ -42,8 +42,10 @@
public synchronized void waitForSendComplete() {
try {
- wait.acquire(batchesSent.get());
- batchesSent.set(0);
+ int waitForBatches;
+ while((waitForBatches = batchesSent.getAndSet(0)) != 0) {
+ wait.acquire(waitForBatches);
+ }
} catch (InterruptedException e) {
logger.warn("Failure while waiting for send complete.", e);
// TODO InterruptedException