(TWILL-258) Use loopback address for ZK server. Also fixes some race conditions in unit tests

- Fix a race condition in the LocationCacheTest
  - There is a small delay in the current timestamp in the
    LocationCacheCleaner.start and the one in the test method.
- Fix race condition in LogLevelChangeTestRun
  - The test assumes after the root logger level changed, the other logger levels also changed
    in the resource report, which is not true
  - The test is not checking the log levels for all runnnable instances

This closes #68 on Github.

Signed-off-by: Terence Yim <chtyim@apache.org>
diff --git a/.travis.yml b/.travis.yml
index 55101b7..af74548 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -31,7 +31,7 @@
     - /^branch\-.*$/
     - /^feature\/.*$/
 
-script: mvn --batch-mode test -P $PROFILE -Dsurefire.redirectTestOutputToFile=false -Dtwill.zk.server.localhost=false
+script: mvn --batch-mode test -P $PROFILE -Dsurefire.redirectTestOutputToFile=false
 
 install: mvn --batch-mode install -P $PROFILE -DskipTests=true
 
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
index 0738218..fed76a5 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/LocationCacheCleaner.java
@@ -144,7 +144,9 @@
               }
               // If the location is already pending for cleanup, this won't update the expire time as
               // the comparison of PendingCleanup is only by location.
-              pendingCleanups.add(new PendingCleanup(location, expireTime));
+              if (pendingCleanups.add(new PendingCleanup(location, expireTime))) {
+                LOG.debug("Pending deletion of location {} with expiration time at {}", location, expireTime);
+              }
             }
           }
         } catch (IOException e) {
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
index 8ed9ae4..63ca28b 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LocationCacheTest.java
@@ -109,12 +109,17 @@
     newTwillRunner.start();
 
     // Force a cleanup using the antique expiry. The list of locations that need to be cleanup was already
-    // collected when the new twill runner was started
+    // collected when the new twill runner was started.
+    // Need to add some time in addition to the antique expiry time because the cache cleaner collects
+    // pending list asynchronously, which the "current" time it uses to calculate the expiration time might be
+    // later than the System.currentTimeMillis() call in the next line.
     ((YarnTwillRunnerService) newTwillRunner)
-      .forceLocationCacheCleanup(System.currentTimeMillis() + Configs.Defaults.LOCATION_CACHE_ANTIQUE_EXPIRY_MS);
+      .forceLocationCacheCleanup(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30) +
+                                   Configs.Defaults.LOCATION_CACHE_ANTIQUE_EXPIRY_MS);
 
     // Now there shouldn't be any file under the current session cache directory
-    Assert.assertTrue(currentSessionCache.list().isEmpty());
+    List<Location> locations = currentSessionCache.list();
+    Assert.assertTrue("Location is not empty " + locations, locations.isEmpty());
   }
 
   /**
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java
index 6df6d11..a1d8ae6 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java
@@ -37,6 +37,7 @@
 import java.io.PrintWriter;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -172,20 +173,20 @@
 
     // assert that log level is DEBUG
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG), 1);
 
     waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG), 1);
 
     // change the log level to INFO
     controller.updateLogLevels(ImmutableMap.of(Logger.ROOT_LOGGER_NAME, LogEntry.Level.INFO)).get();
 
     // assert log level has changed to INFO
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.INFO, ImmutableMap.of("ROOT", LogEntry.Level.INFO));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.INFO, ImmutableMap.of("ROOT", LogEntry.Level.INFO), 1);
 
     waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.INFO, ImmutableMap.of("ROOT", LogEntry.Level.INFO));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.INFO, ImmutableMap.of("ROOT", LogEntry.Level.INFO), 1);
 
     // change the log level of LogLevelTestRunnable to WARN,
     // change the log level of LogLevelTestSecondRunnable to TRACE
@@ -195,16 +196,16 @@
     controller.updateLogLevels(LogLevelTestSecondRunnable.class.getSimpleName(), logLevelSecondRunnable).get();
 
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, ImmutableMap.of("ROOT", LogEntry.Level.WARN));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, ImmutableMap.of("ROOT", LogEntry.Level.WARN), 1);
     waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.TRACE, ImmutableMap.of("ROOT", LogEntry.Level.TRACE));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.TRACE, ImmutableMap.of("ROOT", LogEntry.Level.TRACE), 1);
 
     // change a particular logger to log level warn and reset it back.
     logLevelFirstRunnable = ImmutableMap.of("test", LogEntry.Level.WARN);
     controller.updateLogLevels(LogLevelTestRunnable.class.getSimpleName(), logLevelFirstRunnable).get();
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
                     20L, TimeUnit.SECONDS, LogEntry.Level.WARN,
-                    ImmutableMap.of("ROOT", LogEntry.Level.WARN, "test", LogEntry.Level.WARN));
+                    ImmutableMap.of("ROOT", LogEntry.Level.WARN, "test", LogEntry.Level.WARN), 1);
     logLevelFirstRunnable = new HashMap<>();
     logLevelFirstRunnable.put("test", null);
     controller.updateLogLevels(LogLevelTestRunnable.class.getSimpleName(), logLevelFirstRunnable).get();
@@ -212,13 +213,13 @@
     result.put("ROOT", LogEntry.Level.WARN);
     result.put("test", null);
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, result);
+                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, result, 1);
 
     // reset the log level for a particular logger of LogLevelTestRunnable
     controller.resetRunnableLogLevels(LogLevelTestRunnable.class.getSimpleName(), "test").get();
     result.remove("test");
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, result);
+                    20L, TimeUnit.SECONDS, LogEntry.Level.WARN, result, 1);
 
     // change the log level of LogLevelTestSecondRunnable to INFO and change instances of it to test if the log level
     // request get applied to container started up later
@@ -228,14 +229,14 @@
     controller.changeInstances(LogLevelTestSecondRunnable.class.getSimpleName(), 2).get();
     TimeUnit.SECONDS.sleep(5);
     waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(), 20L, TimeUnit.SECONDS,
-                    LogEntry.Level.INFO, logLevelSecondRunnable);
+                    LogEntry.Level.INFO, logLevelSecondRunnable, 2);
 
     // reset the log levels back to default.
     controller.resetLogLevels().get();
     waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG), 1);
     waitForLogLevel(controller, LogLevelTestSecondRunnable.class.getSimpleName(),
-                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG));
+                    20L, TimeUnit.SECONDS, LogEntry.Level.DEBUG, ImmutableMap.of("ROOT", LogEntry.Level.DEBUG), 2);
 
     // stop
     controller.terminate().get(120, TimeUnit.SECONDS);
@@ -248,29 +249,37 @@
   // could return null if the application has not fully started.
   private void waitForLogLevel(TwillController controller, String runnable, long timeout,
                                TimeUnit timeoutUnit, LogEntry.Level expected,
-                               Map<String, LogEntry.Level> expectedArgs) throws InterruptedException {
+                               Map<String, LogEntry.Level> expectedArgs,
+                               int expectedInstances) throws InterruptedException {
 
     Stopwatch stopwatch = new Stopwatch();
     stopwatch.start();
-    LogEntry.Level actual = null;
-    Map<String, LogEntry.Level> actualArgs = null;
-    boolean stopped = false;
-    do {
+    while (stopwatch.elapsedTime(timeoutUnit) < timeout) {
       ResourceReport report = controller.getResourceReport();
+
       if (report == null || report.getRunnableResources(runnable) == null) {
+        TimeUnit.MILLISECONDS.sleep(100);
         continue;
       }
+
+      int matchCount = 0;
       for (TwillRunResources resources : report.getRunnableResources(runnable)) {
-        actual = resources.getLogLevels().get(Logger.ROOT_LOGGER_NAME);
-        actualArgs = resources.getLogLevels();
-        if (actual != null && actual.equals(expected)) {
-          stopped = true;
-          break;
+        LogEntry.Level actual = resources.getLogLevels().get(Logger.ROOT_LOGGER_NAME);
+        Map<String, LogEntry.Level> actualArgs = resources.getLogLevels();
+        if (Objects.equals(expected, actual) && Objects.equals(expectedArgs, actualArgs)) {
+          matchCount++;
+        } else {
+          LOG.info("Log levels not match for {}. {} != {} or {} != {}",
+                   runnable, expected, actual, expectedArgs, actualArgs);
         }
       }
+
+      if (matchCount == expectedInstances) {
+        return;
+      }
       TimeUnit.MILLISECONDS.sleep(100);
-    } while (!stopped && stopwatch.elapsedTime(timeoutUnit) < timeout);
-    Assert.assertEquals(expected, actual);
-    Assert.assertEquals(expectedArgs, actualArgs);
+    }
+
+    Assert.fail("Timeout waiting for expected log levels");
   }
 }
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
index f962b68..d18d5ed 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java
@@ -18,7 +18,6 @@
 package org.apache.twill.internal.zookeeper;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -32,7 +31,6 @@
 import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.concurrent.Executor;
 
 /**
@@ -103,17 +101,11 @@
   }
 
   private InetSocketAddress getAddress(int port) {
-    try {
-      int socketPort = port < 0 ? 0 : port;
-      // This property is needed so that in certain CI environment (e.g. Travis-CI) it can only works properly if
-      // it is binded to the wildcard (0.0.0.0) address
-      if (Boolean.parseBoolean(System.getProperties().getProperty("twill.zk.server.localhost", "true"))) {
-        return new InetSocketAddress(InetAddress.getLocalHost(), socketPort);
-      } else {
-        return new InetSocketAddress(socketPort);
-      }
-    } catch (UnknownHostException e) {
-      throw Throwables.propagate(e);
+    int socketPort = port < 0 ? 0 : port;
+    if (Boolean.parseBoolean(System.getProperties().getProperty("twill.zk.server.localhost", "true"))) {
+      return new InetSocketAddress(InetAddress.getLoopbackAddress(), socketPort);
+    } else {
+      return new InetSocketAddress(socketPort);
     }
   }