[REEF-1761] Race condition in NetworkMessagingTestService

Summary of changes:
   * Fix the race condition in `NetworkMessagingTestService.MessageHandler.onNext()`
   * Disable benchmarking in `NetworkConnectionServiceTest` if logging is too detailed
   * Remove some excessive logging in `NetworkMessagingTestService`
   * Make output of `NetworkConnectionServiceMessage.toString()` shorter
   * Use proper assertions in `NetworkMessagingTestService`
   * Other minor refactoring

JIRA: [REEF-1761](https://issues.apache.org/jira/browse/REEF-1761)

Pull request:
  This closes #1280
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java
index c0fea17..5e29e08 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java
@@ -101,15 +101,7 @@
    * @return a string representation of this object
    */
   public String toString() {
-    final StringBuilder builder = new StringBuilder();
-    builder.append("NSMessage");
-    builder.append(" remoteID=");
-    builder.append(destId);
-    builder.append(" message=[| ");
-    for (final T message : messages) {
-      builder.append(message).append(" |");
-    }
-    builder.append("]");
-    return builder.toString();
+    return String.format("%s[%s -> %s]: size %d",
+        this.getClass().getSimpleName(), this.srcId, this.destId, this.messages.size());
   }
 }
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java
index 75e2c5e..e65d904 100644
--- a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java
@@ -28,6 +28,7 @@
 import org.apache.reef.wake.remote.Codec;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.junit.Assume;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -179,7 +180,11 @@
    */
   @Test
   public void testMessagingNetworkConnServiceRate() throws Exception {
+
+    Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
+
     LOG.log(Level.FINEST, name.getMethodName());
+
     final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
 
     for (final int size : messageSizes) {
@@ -220,7 +225,11 @@
    */
   @Test
   public void testMessagingNetworkConnServiceRateDisjoint() throws Exception {
+
+    Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
+
     LOG.log(Level.FINEST, name.getMethodName());
+
     final BlockingQueue<Object> barrier = new LinkedBlockingQueue<>();
 
     final int numThreads = 4;
@@ -277,7 +286,11 @@
 
   @Test
   public void testMultithreadedSharedConnMessagingNetworkConnServiceRate() throws Exception {
+
+    Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
+
     LOG.log(Level.FINEST, name.getMethodName());
+
     final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024};
 
     for (final int size : messageSizes) {
@@ -331,6 +344,9 @@
    */
   @Test
   public void testMessagingNetworkConnServiceBatchingRate() throws Exception {
+
+    Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
+
     LOG.log(Level.FINEST, name.getMethodName());
 
     final int batchSize = 1024 * 1024;
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java
index 86acc86..5f16af0 100644
--- a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java
@@ -38,6 +38,8 @@
 import org.apache.reef.wake.IdentifierFactory;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
+import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -147,6 +149,9 @@
    */
   @Test
   public void testMessagingNetworkServiceRate() throws Exception {
+
+    Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
+
     LOG.log(Level.FINEST, name.getMethodName());
 
     final IdentifierFactory factory = new StringIdentifierFactory();
@@ -233,6 +238,9 @@
    */
   @Test
   public void testMessagingNetworkServiceRateDisjoint() throws Exception {
+
+    Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
+
     LOG.log(Level.FINEST, name.getMethodName());
 
     final IdentifierFactory factory = new StringIdentifierFactory();
@@ -342,6 +350,9 @@
 
   @Test
   public void testMultithreadedSharedConnMessagingNetworkServiceRate() throws Exception {
+
+    Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
+
     LOG.log(Level.FINEST, name.getMethodName());
 
     final IdentifierFactory factory = new StringIdentifierFactory();
@@ -444,6 +455,9 @@
    */
   @Test
   public void testMessagingNetworkServiceBatchingRate() throws Exception {
+
+    Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
+
     LOG.log(Level.FINEST, name.getMethodName());
 
     final IdentifierFactory factory = new StringIdentifierFactory();
@@ -535,7 +549,7 @@
     private final String name;
     private final int expected;
     private final Monitor monitor;
-    private AtomicInteger count = new AtomicInteger(0);
+    private final AtomicInteger count = new AtomicInteger(0);
 
     MessageHandler(final String name, final Monitor monitor, final int expected) {
       this.name = name;
@@ -546,17 +560,13 @@
     @Override
     public void onNext(final Message<T> value) {
 
-      count.incrementAndGet();
+      final int currentCount = count.incrementAndGet();
 
-      LOG.log(Level.FINEST,
-          "OUT: {0} received {1} from {2} to {3}",
-          new Object[]{name, value.getData(), value.getSrcId(), value.getDestId()});
+      LOG.log(Level.FINER, "{0} Message {1}/{2} :: {3}", new Object[] {name, currentCount, expected, value});
 
-      for (final T obj : value.getData()) {
-        LOG.log(Level.FINEST, "OUT: data: {0}", obj);
-      }
+      Assert.assertTrue(currentCount <= expected);
 
-      if (count.get() == expected) {
+      if (currentCount >= expected) {
         monitor.mnotify();
       }
     }
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/NetworkMessagingTestService.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/NetworkMessagingTestService.java
index 59fd0c8..7308bb0 100644
--- a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/NetworkMessagingTestService.java
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/NetworkMessagingTestService.java
@@ -34,6 +34,7 @@
 import org.apache.reef.wake.IdentifierFactory;
 import org.apache.reef.wake.remote.Codec;
 import org.apache.reef.wake.remote.transport.LinkListener;
+import org.junit.Assert;
 
 import java.net.SocketAddress;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -99,11 +100,12 @@
   }
 
   public static final class MessageHandler<T> implements EventHandler<Message<T>> {
+
     private final int expected;
     private final Monitor monitor;
     private final Identifier expectedSrcId;
     private final Identifier expectedDestId;
-    private AtomicInteger count = new AtomicInteger(0);
+    private final AtomicInteger count = new AtomicInteger(0);
 
     public MessageHandler(final Monitor monitor,
                           final int expected,
@@ -117,20 +119,15 @@
 
     @Override
     public void onNext(final Message<T> value) {
-      count.incrementAndGet();
-      LOG.log(Level.FINE, "Count: {0}", count.get());
-      LOG.log(Level.FINE,
-          "OUT: {0} received {1} from {2} to {3}",
-          new Object[]{value, value.getSrcId(), value.getDestId()});
 
-      for (final T obj : value.getData()) {
-        LOG.log(Level.FINE, "OUT: data: {0}", obj);
-      }
+      final int currentCount = count.incrementAndGet();
+      LOG.log(Level.FINER, "Message {0}/{1} :: {2}", new Object[] {currentCount, expected, value});
 
-      assert value.getSrcId().equals(expectedSrcId);
-      assert value.getDestId().equals(expectedDestId);
+      Assert.assertEquals(expectedSrcId, value.getSrcId());
+      Assert.assertEquals(expectedDestId, value.getDestId());
+      Assert.assertTrue(currentCount <= expected);
 
-      if (count.get() == expected) {
+      if (currentCount >= expected) {
         monitor.mnotify();
       }
     }
@@ -139,11 +136,11 @@
   public static final class TestListener<T> implements LinkListener<Message<T>> {
     @Override
     public void onSuccess(final Message<T> message) {
-      LOG.log(Level.FINE, "success: " + message);
+      LOG.log(Level.FINER, "Success: message {0}", message);
     }
     @Override
     public void onException(final Throwable cause, final SocketAddress remoteAddress, final Message<T> message) {
-      LOG.log(Level.WARNING, "exception: " + cause + message);
+      LOG.log(Level.WARNING, "Exception: message " + message, cause);
       throw new RuntimeException(cause);
     }
   }