Fix an easy to fail unit-test
- The test has race condition. The mutual discovery
doesn’t works well as one runnable can be finished
and not discoverable anymore before another one
tries to discover.
- Switch to use an Echo server / client runnable,
which only the client needs to discover the server.
This closes #77 on GitHub.
Signed-off-by: Terence Yim <chtyim@apache.org>
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
index 59fe835..77f53ad 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ServiceDiscoveryTestRun.java
@@ -17,8 +17,10 @@
*/
package org.apache.twill.yarn;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.io.LineReader;
import org.apache.twill.api.AbstractTwillRunnable;
-import org.apache.twill.api.Command;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillContext;
import org.apache.twill.api.TwillController;
@@ -34,9 +36,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
import java.io.PrintWriter;
+import java.io.Reader;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -51,18 +61,16 @@
TwillController controller = twillRunner
.prepare(new ServiceApplication())
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
- .withArguments("r1", "12345")
- .withArguments("r2", "45678")
+ .withApplicationArguments("echo")
.start();
- ServiceDiscovered completed = controller.discoverService("completed");
- Assert.assertTrue(waitForSize(completed, 2, 120));
- controller.sendCommand(Command.Builder.of("done").build());
- controller.awaitTerminated(120, TimeUnit.SECONDS);
+ ServiceDiscovered discovered = controller.discoverService("discovered");
+ Assert.assertTrue(waitForSize(discovered, 1, 120));
+ controller.terminate().get();
}
/**
- * An application that contains two {@link ServiceRunnable}.
+ * An application that contains an EchoServer and an EchoClient.
*/
public static final class ServiceApplication implements TwillApplication {
@@ -71,66 +79,63 @@
return TwillSpecification.Builder.with()
.setName("ServiceApp")
.withRunnable()
- .add("r1", new ServiceRunnable()).noLocalFiles()
- .add("r2", new ServiceRunnable()).noLocalFiles()
+ .add("server", new EchoServer()).noLocalFiles()
+ .add("client", new EchoClient()).noLocalFiles()
.anyOrder()
.build();
}
}
/**
- * A Runnable that will announce on service and wait for announcement from another instance in the same service.
+ * A runnable to discover the echo server and issue a call to it.
*/
- public static final class ServiceRunnable extends AbstractTwillRunnable {
+ public static final class EchoClient extends AbstractTwillRunnable {
- private static final Logger LOG = LoggerFactory.getLogger(ServiceRunnable.class);
- private static final String SERVICE_NAME = "service";
- private final CountDownLatch stopLatch = new CountDownLatch(1);
+ private static final Logger LOG = LoggerFactory.getLogger(EchoClient.class);
+
+ private final CountDownLatch completion = new CountDownLatch(1);
@Override
public void run() {
- final int port = Integer.parseInt(getContext().getArguments()[0]);
- Cancellable cancelService = getContext().announce(SERVICE_NAME, port);
-
- final CountDownLatch discoveredLatch = new CountDownLatch(1);
-
- ServiceDiscovered serviceDiscovered = getContext().discover(SERVICE_NAME);
+ final BlockingQueue<Discoverable> discoverables = new LinkedBlockingQueue<>();
+ ServiceDiscovered serviceDiscovered = getContext().discover(getContext().getApplicationArguments()[0]);
serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() {
@Override
public void onChange(ServiceDiscovered serviceDiscovered) {
- // Try to find a discoverable that is not this instance
- for (Discoverable discoverable : serviceDiscovered) {
- int discoveredPort = discoverable.getSocketAddress().getPort();
- if (SERVICE_NAME.equals(discoverable.getName()) && discoveredPort != port) {
- LOG.info("{}: Service discovered at {}", getContext().getSpecification().getName(), discoveredPort);
- discoveredLatch.countDown();
- }
- }
+ Iterables.addAll(discoverables, serviceDiscovered);
}
}, Threads.SAME_THREAD_EXECUTOR);
try {
- discoveredLatch.await();
- } catch (InterruptedException e) {
- LOG.warn("Interrupted.", e);
- }
+ Discoverable discoverable = discoverables.poll(120, TimeUnit.SECONDS);
+ // Make a call to the echo server
+ InetSocketAddress address = discoverable.getSocketAddress();
+ Socket socket = new Socket(address.getAddress(), address.getPort());
+ String message = "Hello World";
+ try (PrintWriter printer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), "UTF-8"))) {
+ printer.println(message);
+ printer.flush();
- // Announce the "complete" service so that the driver knows this runnable has discovered the other
- Cancellable cancelCompleted = getContext().announce("completed", port);
- try {
- stopLatch.await();
- cancelService.cancel();
- cancelCompleted.cancel();
+ try (Reader reader = new InputStreamReader(socket.getInputStream(), "UTF-8")) {
+ LineReader lineReader = new LineReader(reader);
+ String line = lineReader.readLine();
+ Preconditions.checkState(message.equals(line), "Expected %s, got %s", message, line);
+ }
+ }
+
+ Cancellable cancellable = getContext().announce("discovered", 12345);
+ completion.await();
+ cancellable.cancel();
} catch (InterruptedException e) {
LOG.warn("Interrupted.", e);
+ } catch (IOException e) {
+ LOG.error("Failed to talk to server", e);
}
}
@Override
- public void handleCommand(Command command) throws Exception {
- if ("done".equals(command.getCommand())) {
- stopLatch.countDown();
- }
+ public void stop() {
+ completion.countDown();
}
}
}
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
index fd38576..e9e6a99 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/SocketServer.java
@@ -19,7 +19,6 @@
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
import org.apache.twill.api.AbstractTwillRunnable;
import org.apache.twill.api.TwillContext;
import org.apache.twill.common.Cancellable;
@@ -34,6 +33,7 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -57,10 +57,13 @@
", id: " + context.getInstanceId() +
", count: " + context.getInstanceCount());
- final List<Cancellable> cancellables = ImmutableList.of(
- context.announce(context.getApplicationArguments()[0], serverSocket.getLocalPort()),
- context.announce(context.getArguments()[0], serverSocket.getLocalPort())
- );
+ // Announce with service names as specified in app arguments and runnable arguments
+ final List<Cancellable> cancellables = new ArrayList<>();
+ for (String[] args : new String[][] {context.getApplicationArguments(), context.getArguments()}) {
+ if (args.length > 0) {
+ cancellables.add(context.announce(args[0], serverSocket.getLocalPort()));
+ }
+ }
canceller = new Cancellable() {
@Override
public void cancel() {