Fix an easy to fail unit-test

 - The test has race condition. The mutual discovery
   doesn’t works well as one runnable can be finished
   and not discoverable anymore before another one
   tries to discover.
 - Switch to use an Echo server / client runnable,
   which only the client needs to discover the server.

This closes #77 on GitHub.

Signed-off-by: Terence Yim <chtyim@apache.org>
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
index 59fe835..77f53ad 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
@@ -17,8 +17,10 @@
  */
 package org.apache.twill.yarn;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.io.LineReader;
 import org.apache.twill.api.AbstractTwillRunnable;
-import org.apache.twill.api.Command;
 import org.apache.twill.api.TwillApplication;
 import org.apache.twill.api.TwillContext;
 import org.apache.twill.api.TwillController;
@@ -34,9 +36,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
+import java.io.Reader;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -51,18 +61,16 @@
     TwillController controller = twillRunner
       .prepare(new ServiceApplication())
       .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
-      .withArguments("r1", "12345")
-      .withArguments("r2", "45678")
+      .withApplicationArguments("echo")
       .start();
 
-    ServiceDiscovered completed = controller.discoverService("completed");
-    Assert.assertTrue(waitForSize(completed, 2, 120));
-    controller.sendCommand(Command.Builder.of("done").build());
-    controller.awaitTerminated(120, TimeUnit.SECONDS);
+    ServiceDiscovered discovered = controller.discoverService("discovered");
+    Assert.assertTrue(waitForSize(discovered, 1, 120));
+    controller.terminate().get();
   }
 
   /**
-   * An application that contains two {@link ServiceRunnable}.
+   * An application that contains an EchoServer and an EchoClient.
    */
   public static final class ServiceApplication implements TwillApplication {
 
@@ -71,66 +79,63 @@
       return TwillSpecification.Builder.with()
         .setName("ServiceApp")
         .withRunnable()
-          .add("r1", new ServiceRunnable()).noLocalFiles()
-          .add("r2", new ServiceRunnable()).noLocalFiles()
+          .add("server", new EchoServer()).noLocalFiles()
+          .add("client", new EchoClient()).noLocalFiles()
         .anyOrder()
         .build();
     }
   }
 
   /**
-   * A Runnable that will announce on service and wait for announcement from another instance in the same service.
+   * A runnable to discover the echo server and issue a call to it.
    */
-  public static final class ServiceRunnable extends AbstractTwillRunnable {
+  public static final class EchoClient extends AbstractTwillRunnable {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ServiceRunnable.class);
-    private static final String SERVICE_NAME = "service";
-    private final CountDownLatch stopLatch = new CountDownLatch(1);
+    private static final Logger LOG = LoggerFactory.getLogger(EchoClient.class);
+
+    private final CountDownLatch completion = new CountDownLatch(1);
 
     @Override
     public void run() {
-      final int port = Integer.parseInt(getContext().getArguments()[0]);
-      Cancellable cancelService = getContext().announce(SERVICE_NAME, port);
-
-      final CountDownLatch discoveredLatch = new CountDownLatch(1);
-
-      ServiceDiscovered serviceDiscovered = getContext().discover(SERVICE_NAME);
+      final BlockingQueue<Discoverable> discoverables = new LinkedBlockingQueue<>();
+      ServiceDiscovered serviceDiscovered = getContext().discover(getContext().getApplicationArguments()[0]);
       serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
         @Override
         public void onChange(ServiceDiscovered serviceDiscovered) {
-          // Try to find a discoverable that is not this instance
-          for (Discoverable discoverable : serviceDiscovered) {
-            int discoveredPort = discoverable.getSocketAddress().getPort();
-            if (SERVICE_NAME.equals(discoverable.getName()) && discoveredPort != port) {
-              LOG.info("{}: Service discovered at {}", getContext().getSpecification().getName(), discoveredPort);
-              discoveredLatch.countDown();
-            }
-          }
+          Iterables.addAll(discoverables, serviceDiscovered);
         }
       }, Threads.SAME_THREAD_EXECUTOR);
 
       try {
-        discoveredLatch.await();
-      } catch (InterruptedException e) {
-        LOG.warn("Interrupted.", e);
-      }
+        Discoverable discoverable = discoverables.poll(120, TimeUnit.SECONDS);
+        // Make a call to the echo server
+        InetSocketAddress address = discoverable.getSocketAddress();
+        Socket socket = new Socket(address.getAddress(), address.getPort());
+        String message = "Hello World";
+        try (PrintWriter printer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), "UTF-8"))) {
+          printer.println(message);
+          printer.flush();
 
-      // Announce the "complete" service so that the driver knows this runnable has discovered the other
-      Cancellable cancelCompleted = getContext().announce("completed", port);
-      try {
-        stopLatch.await();
-        cancelService.cancel();
-        cancelCompleted.cancel();
+          try (Reader reader = new InputStreamReader(socket.getInputStream(), "UTF-8")) {
+            LineReader lineReader = new LineReader(reader);
+            String line = lineReader.readLine();
+            Preconditions.checkState(message.equals(line), "Expected %s, got %s", message, line);
+          }
+        }
+
+        Cancellable cancellable = getContext().announce("discovered", 12345);
+        completion.await();
+        cancellable.cancel();
       } catch (InterruptedException e) {
         LOG.warn("Interrupted.", e);
+      } catch (IOException e) {
+        LOG.error("Failed to talk to server", e);
       }
     }
 
     @Override
-    public void handleCommand(Command command) throws Exception {
-      if ("done".equals(command.getCommand())) {
-        stopLatch.countDown();
-      }
+    public void stop() {
+      completion.countDown();
     }
   }
 }
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
index fd38576..e9e6a99 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
@@ -19,7 +19,6 @@
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
 import org.apache.twill.api.AbstractTwillRunnable;
 import org.apache.twill.api.TwillContext;
 import org.apache.twill.common.Cancellable;
@@ -34,6 +33,7 @@
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -57,10 +57,13 @@
                ", id: " + context.getInstanceId() +
                ", count: " + context.getInstanceCount());
 
-      final List<Cancellable> cancellables = ImmutableList.of(
-        context.announce(context.getApplicationArguments()[0], serverSocket.getLocalPort()),
-        context.announce(context.getArguments()[0], serverSocket.getLocalPort())
-      );
+      // Announce with service names as specified in app arguments and runnable arguments
+      final List<Cancellable> cancellables = new ArrayList<>();
+      for (String[] args : new String[][] {context.getApplicationArguments(), context.getArguments()}) {
+        if (args.length > 0) {
+          cancellables.add(context.announce(args[0], serverSocket.getLocalPort()));
+        }
+      }
       canceller = new Cancellable() {
         @Override
         public void cancel() {