[REEF-1761] Race condition in NetworkMessagingTestService
Summary of changes:
* Fix the race condition in `NetworkMessagingTestService.MessageHandler.onNext()`
* Disable benchmarking in `NetworkConnectionServiceTest` if logging is too detailed
* Remove some excessive logging in `NetworkMessagingTestService`
* Make output of `NetworkConnectionServiceMessage.toString()` shorter
* Use proper assertions in `NetworkMessagingTestService`
* Other minor refactoring
JIRA: [REEF-1761](https://issues.apache.org/jira/browse/REEF-1761)
Pull request:
This closes #1280
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java
index c0fea17..5e29e08 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceMessage.java
@@ -101,15 +101,7 @@
* @return a string representation of this object
*/
public String toString() {
- final StringBuilder builder = new StringBuilder();
- builder.append("NSMessage");
- builder.append(" remoteID=");
- builder.append(destId);
- builder.append(" message=[| ");
- for (final T message : messages) {
- builder.append(message).append(" |");
- }
- builder.append("]");
- return builder.toString();
+ return String.format("%s[%s -> %s]: size %d",
+ this.getClass().getSimpleName(), this.srcId, this.destId, this.messages.size());
}
}
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java
index 75e2c5e..e65d904 100644
--- a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java
@@ -28,6 +28,7 @@
import org.apache.reef.wake.remote.Codec;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@@ -179,7 +180,11 @@
*/
@Test
public void testMessagingNetworkConnServiceRate() throws Exception {
+
+ Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
+
LOG.log(Level.FINEST, name.getMethodName());
+
final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
for (final int size : messageSizes) {
@@ -220,7 +225,11 @@
*/
@Test
public void testMessagingNetworkConnServiceRateDisjoint() throws Exception {
+
+ Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
+
LOG.log(Level.FINEST, name.getMethodName());
+
final BlockingQueue<Object> barrier = new LinkedBlockingQueue<>();
final int numThreads = 4;
@@ -277,7 +286,11 @@
@Test
public void testMultithreadedSharedConnMessagingNetworkConnServiceRate() throws Exception {
+
+ Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
+
LOG.log(Level.FINEST, name.getMethodName());
+
final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024};
for (final int size : messageSizes) {
@@ -331,6 +344,9 @@
*/
@Test
public void testMessagingNetworkConnServiceBatchingRate() throws Exception {
+
+ Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
+
LOG.log(Level.FINEST, name.getMethodName());
final int batchSize = 1024 * 1024;
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java
index 86acc86..5f16af0 100644
--- a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java
@@ -38,6 +38,8 @@
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
+import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@@ -147,6 +149,9 @@
*/
@Test
public void testMessagingNetworkServiceRate() throws Exception {
+
+ Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
+
LOG.log(Level.FINEST, name.getMethodName());
final IdentifierFactory factory = new StringIdentifierFactory();
@@ -233,6 +238,9 @@
*/
@Test
public void testMessagingNetworkServiceRateDisjoint() throws Exception {
+
+ Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
+
LOG.log(Level.FINEST, name.getMethodName());
final IdentifierFactory factory = new StringIdentifierFactory();
@@ -342,6 +350,9 @@
@Test
public void testMultithreadedSharedConnMessagingNetworkServiceRate() throws Exception {
+
+ Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
+
LOG.log(Level.FINEST, name.getMethodName());
final IdentifierFactory factory = new StringIdentifierFactory();
@@ -444,6 +455,9 @@
*/
@Test
public void testMessagingNetworkServiceBatchingRate() throws Exception {
+
+ Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
+
LOG.log(Level.FINEST, name.getMethodName());
final IdentifierFactory factory = new StringIdentifierFactory();
@@ -535,7 +549,7 @@
private final String name;
private final int expected;
private final Monitor monitor;
- private AtomicInteger count = new AtomicInteger(0);
+ private final AtomicInteger count = new AtomicInteger(0);
MessageHandler(final String name, final Monitor monitor, final int expected) {
this.name = name;
@@ -546,17 +560,13 @@
@Override
public void onNext(final Message<T> value) {
- count.incrementAndGet();
+ final int currentCount = count.incrementAndGet();
- LOG.log(Level.FINEST,
- "OUT: {0} received {1} from {2} to {3}",
- new Object[]{name, value.getData(), value.getSrcId(), value.getDestId()});
+ LOG.log(Level.FINER, "{0} Message {1}/{2} :: {3}", new Object[] {name, currentCount, expected, value});
- for (final T obj : value.getData()) {
- LOG.log(Level.FINEST, "OUT: data: {0}", obj);
- }
+ Assert.assertTrue(currentCount <= expected);
- if (count.get() == expected) {
+ if (currentCount >= expected) {
monitor.mnotify();
}
}
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/NetworkMessagingTestService.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/NetworkMessagingTestService.java
index 59fd0c8..7308bb0 100644
--- a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/NetworkMessagingTestService.java
+++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/NetworkMessagingTestService.java
@@ -34,6 +34,7 @@
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.remote.Codec;
import org.apache.reef.wake.remote.transport.LinkListener;
+import org.junit.Assert;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicInteger;
@@ -99,11 +100,12 @@
}
public static final class MessageHandler<T> implements EventHandler<Message<T>> {
+
private final int expected;
private final Monitor monitor;
private final Identifier expectedSrcId;
private final Identifier expectedDestId;
- private AtomicInteger count = new AtomicInteger(0);
+ private final AtomicInteger count = new AtomicInteger(0);
public MessageHandler(final Monitor monitor,
final int expected,
@@ -117,20 +119,15 @@
@Override
public void onNext(final Message<T> value) {
- count.incrementAndGet();
- LOG.log(Level.FINE, "Count: {0}", count.get());
- LOG.log(Level.FINE,
- "OUT: {0} received {1} from {2} to {3}",
- new Object[]{value, value.getSrcId(), value.getDestId()});
- for (final T obj : value.getData()) {
- LOG.log(Level.FINE, "OUT: data: {0}", obj);
- }
+ final int currentCount = count.incrementAndGet();
+ LOG.log(Level.FINER, "Message {0}/{1} :: {2}", new Object[] {currentCount, expected, value});
- assert value.getSrcId().equals(expectedSrcId);
- assert value.getDestId().equals(expectedDestId);
+ Assert.assertEquals(expectedSrcId, value.getSrcId());
+ Assert.assertEquals(expectedDestId, value.getDestId());
+ Assert.assertTrue(currentCount <= expected);
- if (count.get() == expected) {
+ if (currentCount >= expected) {
monitor.mnotify();
}
}
@@ -139,11 +136,11 @@
public static final class TestListener<T> implements LinkListener<Message<T>> {
@Override
public void onSuccess(final Message<T> message) {
- LOG.log(Level.FINE, "success: " + message);
+ LOG.log(Level.FINER, "Success: message {0}", message);
}
@Override
public void onException(final Throwable cause, final SocketAddress remoteAddress, final Message<T> message) {
- LOG.log(Level.WARNING, "exception: " + cause + message);
+ LOG.log(Level.WARNING, "Exception: message " + message, cause);
throw new RuntimeException(cause);
}
}