APEXMALHAR-2544 Flume test synchronization bug.
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/sink/FlumeSinkTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/sink/FlumeSinkTest.java
index e7007cb..d9d51ec 100644
--- a/flume/src/test/java/org/apache/apex/malhar/flume/sink/FlumeSinkTest.java
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/sink/FlumeSinkTest.java
@@ -23,6 +23,8 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
@@ -49,28 +51,23 @@
@SuppressWarnings("SleepWhileInLoop")
public void testServer() throws InterruptedException, IOException
{
+ final CountDownLatch countdown = new CountDownLatch(1);
Discovery<byte[]> discovery = new Discovery<byte[]>()
{
@Override
- public synchronized void unadvertise(Service<byte[]> service)
+ public void unadvertise(Service<byte[]> service)
{
logger.info("Unadvertise invoked");
- notify();
+ countdown.countDown();
}
@Override
- public synchronized void advertise(Service<byte[]> service)
+ public void advertise(Service<byte[]> service)
{
logger.info("Advertise invoked");
- // There is a race condition that is breaking the test (Advertise invoked before discover started, hence sleep
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- logger.warn("Not able to sleep because of interruption " + e.getMessage(),e);
- }
port = service.getPort();
logger.debug("listening at {}", service);
- notify();
+ countdown.countDown();
}
@Override
@@ -79,7 +76,7 @@
{
logger.info("Discover invoked");
try {
- wait();
+ countdown.await(30, TimeUnit.SECONDS);
logger.info("Discover wait completed");
} catch (InterruptedException ie) {
throw new RuntimeException(ie);