Merge APEXMALHAR-2544.flumetest
diff --git a/flume/src/test/java/org/apache/apex/malhar/flume/sink/FlumeSinkTest.java b/flume/src/test/java/org/apache/apex/malhar/flume/sink/FlumeSinkTest.java
index e7007cb..d9d51ec 100644
--- a/flume/src/test/java/org/apache/apex/malhar/flume/sink/FlumeSinkTest.java
+++ b/flume/src/test/java/org/apache/apex/malhar/flume/sink/FlumeSinkTest.java
@@ -23,6 +23,8 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -49,28 +51,23 @@
   @SuppressWarnings("SleepWhileInLoop")
   public void testServer() throws InterruptedException, IOException
   {
+    final CountDownLatch countdown = new CountDownLatch(1);
     Discovery<byte[]> discovery = new Discovery<byte[]>()
     {
       @Override
-      public synchronized void unadvertise(Service<byte[]> service)
+      public void unadvertise(Service<byte[]> service)
       {
         logger.info("Unadvertise invoked");
-        notify();
+        countdown.countDown();
       }
 
       @Override
-      public synchronized void advertise(Service<byte[]> service)
+      public void advertise(Service<byte[]> service)
       {
         logger.info("Advertise invoked");
-        // There is a race condition that is breaking the test (Advertise invoked before discover started, hence sleep
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          logger.warn("Not able to sleep because of interruption " + e.getMessage(),e);
-        }
         port = service.getPort();
         logger.debug("listening at {}", service);
-        notify();
+        countdown.countDown();
       }
 
       @Override
@@ -79,7 +76,7 @@
       {
         logger.info("Discover invoked");
         try {
-          wait();
+          countdown.await(30, TimeUnit.SECONDS);
           logger.info("Discover wait completed");
         } catch (InterruptedException ie) {
           throw new RuntimeException(ie);