blob: 474e6e7c2450b390a18ee34dd47f785fcc4a601a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.io.network;
import org.apache.commons.lang3.StringUtils;
import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.impl.NetworkService;
import org.apache.reef.io.network.impl.NetworkServiceParameters;
import org.apache.reef.io.network.naming.NameResolver;
import org.apache.reef.io.network.naming.NameResolverConfiguration;
import org.apache.reef.io.network.naming.NameServer;
import org.apache.reef.io.network.naming.NameServerParameters;
import org.apache.reef.io.network.util.StringIdentifierFactory;
import org.apache.reef.io.network.util.Monitor;
import org.apache.reef.io.network.util.StringCodec;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.Identifier;
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import java.net.InetSocketAddress;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Network service test.
*/
public class NetworkServiceTest {
/** Logger used by the tests and by the nested handlers. */
private static final Logger LOG = Logger.getLogger(NetworkServiceTest.class.getName());

/** Address provider created in the constructor and bound into each test's name server injector. */
private final LocalAddressProvider localAddressProvider;

/** Address reported by {@link #localAddressProvider}; used for name server and transport endpoints. */
private final String localAddress;

/**
 * Obtains a {@link LocalAddressProvider} from a fresh Tang injector and caches
 * the local address it reports.
 *
 * @throws InjectionException propagated from Tang if the provider cannot be injected.
 */
public NetworkServiceTest() throws InjectionException {
  final Injector defaultInjector = Tang.Factory.getTang().newInjector();
  this.localAddressProvider = defaultInjector.getInstance(LocalAddressProvider.class);
  this.localAddress = this.localAddressProvider.getLocalAddress();
}

/** JUnit rule that exposes the name of the running test method (logged at the start of each test). */
@Rule
public TestName name = new TestName();
/**
 * NetworkService messaging test.
 *
 * <p>Starts a name server, creates two NetworkService instances ("task1" and "task2")
 * that share one name resolver, registers both with the name server and writes ten
 * string messages from "task1" to "task2". The test then waits on a {@link Monitor}
 * that the receiving handler notifies once it has counted the expected number of
 * messages.
 *
 * <p>NOTE(review): the two NetworkService instances are not closed here; confirm
 * whether they need an explicit shutdown.
 *
 * @throws Exception if injection, registration or the connection fails.
 */
@Test
public void testMessagingNetworkService() throws Exception {
  LOG.log(Level.FINEST, name.getMethodName());

  final IdentifierFactory idFactory = new StringIdentifierFactory();

  final Injector serverInjector = Tang.Factory.getTang().newInjector();
  serverInjector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, idFactory);
  serverInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);

  try (NameServer nameServer = serverInjector.getInstance(NameServer.class)) {
    final int nameServerPort = nameServer.getPort();
    final int expectedMessages = 10;
    final Monitor done = new Monitor();

    // network service
    final String receiverName = "task2";
    final String senderName = "task1";

    // Point the name resolver at the name server started above.
    final Configuration resolverModule = NameResolverConfiguration.CONF
        .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, this.localAddress)
        .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
        .build();
    final Configuration nameResolverConf =
        Tang.Factory.getTang().newConfigurationBuilder(resolverModule).build();
    final Injector resolverInjector = Tang.Factory.getTang().newInjector(nameResolverConf);

    LOG.log(Level.FINEST, "=== Test network service receiver start");
    LOG.log(Level.FINEST, "=== Test network service sender start");

    try (NameResolver nameResolver = resolverInjector.getInstance(NameResolver.class)) {
      // Bindings shared by both services; each service gets its own fork below.
      resolverInjector.bindVolatileParameter(
          NetworkServiceParameters.NetworkServiceIdentifierFactory.class, idFactory);
      resolverInjector.bindVolatileInstance(NameResolver.class, nameResolver);
      resolverInjector.bindVolatileParameter(
          NetworkServiceParameters.NetworkServiceCodec.class, new StringCodec());
      resolverInjector.bindVolatileParameter(
          NetworkServiceParameters.NetworkServiceTransportFactory.class,
          serverInjector.getInstance(MessagingTransportFactory.class));
      resolverInjector.bindVolatileParameter(
          NetworkServiceParameters.NetworkServiceExceptionHandler.class, new ExceptionHandler());

      // Receiver: its handler notifies the monitor after expectedMessages messages.
      final Injector receiverInjector = resolverInjector.forkInjector();
      receiverInjector.bindVolatileParameter(
          NetworkServiceParameters.NetworkServiceHandler.class,
          new MessageHandler<String>(receiverName, done, expectedMessages));
      final NetworkService<String> receiver = receiverInjector.getInstance(NetworkService.class);

      // Sender: expects no incoming messages, hence no monitor and an expected count of zero.
      final Injector senderInjector = resolverInjector.forkInjector();
      senderInjector.bindVolatileParameter(
          NetworkServiceParameters.NetworkServiceHandler.class,
          new MessageHandler<String>(senderName, null, 0));
      final NetworkService<String> sender = senderInjector.getInstance(NetworkService.class);

      receiver.registerId(idFactory.getNewInstance(receiverName));
      final int receiverPort = receiver.getTransport().getListeningPort();
      nameServer.register(idFactory.getNewInstance(receiverName),
          new InetSocketAddress(this.localAddress, receiverPort));

      sender.registerId(idFactory.getNewInstance(senderName));
      final int senderPort = sender.getTransport().getListeningPort();
      nameServer.register(idFactory.getNewInstance(senderName),
          new InetSocketAddress(this.localAddress, senderPort));

      final Identifier destination = idFactory.getNewInstance(receiverName);

      try (Connection<String> link = sender.newConnection(destination)) {
        link.open();
        int sent = 0;
        while (sent < expectedMessages) {
          link.write("hello! " + sent);
          sent++;
        }
        done.mwait();
      } catch (final NetworkException failure) {
        failure.printStackTrace();
        throw new RuntimeException(failure);
      }
    }
  }
}
/**
 * NetworkService messaging rate benchmark for a single sender and receiver.
 *
 * <p>For each message size in a fixed list, a fresh name resolver and a fresh pair of
 * NetworkService instances ("task1" sending to "task2") are created against one shared
 * name server. The sender writes {@code numMessages} strings of {@code size} characters
 * and then waits on a {@link Monitor} that the receiving handler notifies after counting
 * {@code numMessages} messages. Elapsed wall-clock time is turned into a messages/s and
 * bytes/s figure and logged.
 *
 * <p>The test is skipped when FINEST is loggable for this class's logger.
 *
 * <p>NOTE(review): the throughput is logged at FINEST, which is never loggable when the
 * test actually runs, so the figures appear to be discarded; confirm the intended level.
 *
 * @throws Exception if injection, registration or the connection fails.
 */
@Test
public void testMessagingNetworkServiceRate() throws Exception {
  Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
  LOG.log(Level.FINEST, name.getMethodName());

  final IdentifierFactory idFactory = new StringIdentifierFactory();

  final Injector serverInjector = Tang.Factory.getTang().newInjector();
  serverInjector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, idFactory);
  serverInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);

  try (NameServer nameServer = serverInjector.getInstance(NameServer.class)) {
    final int nameServerPort = nameServer.getPort();
    final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};

    for (int sizeIndex = 0; sizeIndex < messageSizes.length; sizeIndex++) {
      final int size = messageSizes[sizeIndex];
      // Fewer messages for payloads above 512 characters.
      final int numMessages = 300000 / (Math.max(1, size / 512));
      final Monitor done = new Monitor();

      // network service
      final String receiverName = "task2";
      final String senderName = "task1";

      final Configuration resolverModule = NameResolverConfiguration.CONF
          .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, this.localAddress)
          .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
          .build();
      final Configuration nameResolverConf =
          Tang.Factory.getTang().newConfigurationBuilder(resolverModule).build();
      final Injector resolverInjector = Tang.Factory.getTang().newInjector(nameResolverConf);

      LOG.log(Level.FINEST, "=== Test network service receiver start");
      LOG.log(Level.FINEST, "=== Test network service sender start");

      try (NameResolver nameResolver = resolverInjector.getInstance(NameResolver.class)) {
        // Bindings shared by both services; each service gets its own fork below.
        resolverInjector.bindVolatileParameter(
            NetworkServiceParameters.NetworkServiceIdentifierFactory.class, idFactory);
        resolverInjector.bindVolatileInstance(NameResolver.class, nameResolver);
        resolverInjector.bindVolatileParameter(
            NetworkServiceParameters.NetworkServiceCodec.class, new StringCodec());
        resolverInjector.bindVolatileParameter(
            NetworkServiceParameters.NetworkServiceTransportFactory.class,
            serverInjector.getInstance(MessagingTransportFactory.class));
        resolverInjector.bindVolatileParameter(
            NetworkServiceParameters.NetworkServiceExceptionHandler.class, new ExceptionHandler());

        // Receiver: its handler notifies the monitor after numMessages messages.
        final Injector receiverInjector = resolverInjector.forkInjector();
        receiverInjector.bindVolatileParameter(
            NetworkServiceParameters.NetworkServiceHandler.class,
            new MessageHandler<String>(receiverName, done, numMessages));
        final NetworkService<String> receiver = receiverInjector.getInstance(NetworkService.class);

        // Sender: expects no incoming messages, hence no monitor and an expected count of zero.
        final Injector senderInjector = resolverInjector.forkInjector();
        senderInjector.bindVolatileParameter(
            NetworkServiceParameters.NetworkServiceHandler.class,
            new MessageHandler<String>(senderName, null, 0));
        final NetworkService<String> sender = senderInjector.getInstance(NetworkService.class);

        receiver.registerId(idFactory.getNewInstance(receiverName));
        final int receiverPort = receiver.getTransport().getListeningPort();
        nameServer.register(idFactory.getNewInstance(receiverName),
            new InetSocketAddress(this.localAddress, receiverPort));

        sender.registerId(idFactory.getNewInstance(senderName));
        final int senderPort = sender.getTransport().getListeningPort();
        nameServer.register(idFactory.getNewInstance(senderName),
            new InetSocketAddress(this.localAddress, senderPort));

        final Identifier destination = idFactory.getNewInstance(receiverName);
        final String payload = StringUtils.repeat('1', size);

        // The measured interval covers opening the connection, sending and closing it.
        final long start = System.currentTimeMillis();
        try (Connection<String> link = sender.newConnection(destination)) {
          link.open();
          int sent = 0;
          while (sent < numMessages) {
            link.write(payload);
            sent++;
          }
          done.mwait();
        } catch (final NetworkException failure) {
          failure.printStackTrace();
          throw new RuntimeException(failure);
        }
        final long end = System.currentTimeMillis();

        final long elapsedMillis = end - start;
        final double runtime = (double) elapsedMillis / 1000;
        final double messagesPerSecond = numMessages / runtime;
        final double bytesPerSecond = ((double) numMessages * 2 * size) / runtime; // x2 for unicode chars
        LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + messagesPerSecond +
            " bandwidth(bytes/s): " + bytesPerSecond);
      }
    }
  }
}
/**
 * NetworkService messaging rate benchmark with disjoint sender/receiver pairs.
 *
 * <p>Four workers run concurrently against one shared name server. Each worker builds
 * its own name resolver and its own pair of NetworkService instances ("task1-N" sending
 * to "task2-N"), writes {@code numMessages} strings of {@code size} characters and waits
 * on its own {@link Monitor}, which the receiving handler notifies after counting
 * {@code numMessages} messages.
 *
 * <p>The test fails if the workers do not terminate within 100 seconds or if any worker
 * throws; a worker failure surfaces as an {@link ExecutionException}.
 *
 * <p>The timer is started after the workers have been submitted. Workers begin running
 * as soon as they are submitted, so their setup overlaps with the measured interval.
 *
 * <p>The test is skipped when FINEST is loggable for this class's logger.
 *
 * <p>NOTE(review): the throughput is logged at FINEST, which is never loggable when the
 * test actually runs, so the figures appear to be discarded; confirm the intended level.
 *
 * @throws Exception if the name server cannot be started, the workers time out, or a
 *     worker fails.
 */
@Test
public void testMessagingNetworkServiceRateDisjoint() throws Exception {
  Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
  LOG.log(Level.FINEST, name.getMethodName());
  final IdentifierFactory factory = new StringIdentifierFactory();
  final Injector injector = Tang.Factory.getTang().newInjector();
  injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory);
  injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);
  try (NameServer server = injector.getInstance(NameServer.class)) {
    final int nameServerPort = server.getPort();
    final int numThreads = 4;
    final int size = 2000;
    final int numMessages = 300000 / (Math.max(1, size / 512));
    final int totalNumMessages = numMessages * numThreads;
    final ExecutorService executor = Executors.newCachedThreadPool();
    // Keep the futures: they are the only place a worker failure is recorded.
    final Future<?>[] workers = new Future<?>[numThreads];
    for (int t = 0; t < numThreads; t++) {
      final int tt = t;
      workers[t] = executor.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          final Monitor monitor = new Monitor();
          // network service
          final String name2 = "task2-" + tt;
          final String name1 = "task1-" + tt;
          final Configuration nameResolverConf =
              Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
                  .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
                  .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
                  .build())
                  .build();
          // Per-worker injector; distinct from the name server injector of the enclosing method.
          final Injector workerInjector = Tang.Factory.getTang().newInjector(nameResolverConf);
          LOG.log(Level.FINEST, "=== Test network service receiver start");
          LOG.log(Level.FINEST, "=== Test network service sender start");
          try (NameResolver nameResolver = workerInjector.getInstance(NameResolver.class)) {
            workerInjector.bindVolatileParameter(
                NetworkServiceParameters.NetworkServiceIdentifierFactory.class, factory);
            workerInjector.bindVolatileInstance(NameResolver.class, nameResolver);
            workerInjector.bindVolatileParameter(
                NetworkServiceParameters.NetworkServiceCodec.class, new StringCodec());
            workerInjector.bindVolatileParameter(
                NetworkServiceParameters.NetworkServiceTransportFactory.class,
                workerInjector.getInstance(MessagingTransportFactory.class));
            workerInjector.bindVolatileParameter(
                NetworkServiceParameters.NetworkServiceExceptionHandler.class,
                new ExceptionHandler());
            final Injector injectorNs2 = workerInjector.forkInjector();
            injectorNs2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class,
                new MessageHandler<String>(name2, monitor, numMessages));
            final NetworkService<String> ns2 = injectorNs2.getInstance(NetworkService.class);
            final Injector injectorNs1 = workerInjector.forkInjector();
            injectorNs1.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class,
                new MessageHandler<String>(name1, null, 0));
            final NetworkService<String> ns1 = injectorNs1.getInstance(NetworkService.class);
            ns2.registerId(factory.getNewInstance(name2));
            final int port2 = ns2.getTransport().getListeningPort();
            server.register(factory.getNewInstance(name2), new InetSocketAddress(localAddress, port2));
            ns1.registerId(factory.getNewInstance(name1));
            final int port1 = ns1.getTransport().getListeningPort();
            server.register(factory.getNewInstance(name1), new InetSocketAddress(localAddress, port1));
            final Identifier destId = factory.getNewInstance(name2);
            final String message = StringUtils.repeat('1', size);
            // Any exception here propagates out of call() and is stored in the Future.
            try (Connection<String> conn = ns1.newConnection(destId)) {
              conn.open();
              for (int i = 0; i < numMessages; i++) {
                conn.write(message);
              }
              monitor.mwait();
            }
          }
          return null;
        }
      });
    }
    // start and time
    final long start = System.currentTimeMillis();
    executor.shutdown();
    final boolean finished = executor.awaitTermination(100, TimeUnit.SECONDS);
    final long end = System.currentTimeMillis();
    if (!finished) {
      // The name server is closed when this block exits; do not leave workers running against it.
      executor.shutdownNow();
    }
    Assert.assertTrue("Benchmark workers did not finish within 100 seconds", finished);
    for (final Future<?> worker : workers) {
      // All workers are done at this point, so get() returns at once or rethrows the failure.
      worker.get();
    }
    final double runtime = ((double) end - start) / 1000;
    LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + totalNumMessages / runtime +
        " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime); // x2 for unicode chars
  }
}
/**
 * NetworkService messaging rate benchmark with several threads writing to one connection.
 *
 * <p>For each message size, one sender ("task1") and one receiver ("task2") are created
 * against a shared name server. A single connection from sender to receiver is opened
 * and {@code numThreads} tasks each write {@code numMessages} strings of {@code size}
 * characters to it. The test then waits on a {@link Monitor} that the receiving handler
 * notifies after counting {@code numMessages * numThreads} messages.
 *
 * <p>If a writer task throws, the failure is rethrown as an {@link ExecutionException}
 * before the test waits on the monitor, so a failed writer fails the test instead of
 * leaving it waiting for messages that were never sent.
 *
 * <p>The test is skipped when FINEST is loggable for this class's logger.
 *
 * <p>NOTE(review): the throughput is logged at FINEST, which is never loggable when the
 * test actually runs, so the figures appear to be discarded; confirm the intended level.
 *
 * @throws Exception if injection, registration, the connection or a writer task fails.
 */
@Test
public void testMultithreadedSharedConnMessagingNetworkServiceRate() throws Exception {
  Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
  LOG.log(Level.FINEST, name.getMethodName());
  final IdentifierFactory factory = new StringIdentifierFactory();
  final Injector injector = Tang.Factory.getTang().newInjector();
  injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory);
  injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);
  try (NameServer server = injector.getInstance(NameServer.class)) {
    final int nameServerPort = server.getPort();
    final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024};
    for (final int size : messageSizes) {
      final int numMessages = 300000 / (Math.max(1, size / 512));
      final int numThreads = 2;
      final int totalNumMessages = numMessages * numThreads;
      final Monitor monitor = new Monitor();
      // network service
      final String name2 = "task2";
      final String name1 = "task1";
      final Configuration nameResolverConf =
          Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
              .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, this.localAddress)
              .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
              .build())
              .build();
      final Injector injector2 = Tang.Factory.getTang().newInjector(nameResolverConf);
      LOG.log(Level.FINEST, "=== Test network service receiver start");
      LOG.log(Level.FINEST, "=== Test network service sender start");
      try (NameResolver nameResolver = injector2.getInstance(NameResolver.class)) {
        injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class, factory);
        injector2.bindVolatileInstance(NameResolver.class, nameResolver);
        injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceCodec.class, new StringCodec());
        injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceTransportFactory.class,
            injector.getInstance(MessagingTransportFactory.class));
        injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class,
            new ExceptionHandler());
        // The receiver must see the messages of all writer tasks before it notifies the monitor.
        final Injector injectorNs2 = injector2.forkInjector();
        injectorNs2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class,
            new MessageHandler<String>(name2, monitor, totalNumMessages));
        final NetworkService<String> ns2 = injectorNs2.getInstance(NetworkService.class);
        final Injector injectorNs1 = injector2.forkInjector();
        injectorNs1.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class,
            new MessageHandler<String>(name1, null, 0));
        final NetworkService<String> ns1 = injectorNs1.getInstance(NetworkService.class);
        ns2.registerId(factory.getNewInstance(name2));
        final int port2 = ns2.getTransport().getListeningPort();
        server.register(factory.getNewInstance(name2), new InetSocketAddress(this.localAddress, port2));
        ns1.registerId(factory.getNewInstance(name1));
        final int port1 = ns1.getTransport().getListeningPort();
        server.register(factory.getNewInstance(name1), new InetSocketAddress(this.localAddress, port1));
        final Identifier destId = factory.getNewInstance(name2);
        try (Connection<String> conn = ns1.newConnection(destId)) {
          conn.open();
          final String message = StringUtils.repeat('1', size);
          final ExecutorService executor = Executors.newCachedThreadPool();
          // Keep the futures: they are the only place a writer failure is recorded.
          final Future<?>[] writers = new Future<?>[numThreads];
          final long start = System.currentTimeMillis();
          for (int i = 0; i < numThreads; i++) {
            writers[i] = executor.submit(new Runnable() {
              @Override
              public void run() {
                for (int sent = 0; sent < numMessages; sent++) {
                  conn.write(message);
                }
              }
            });
          }
          executor.shutdown();
          executor.awaitTermination(30, TimeUnit.SECONDS);
          for (final Future<?> writer : writers) {
            // Blocks until the writer is done and rethrows anything it threw.
            writer.get();
          }
          monitor.mwait();
          final long end = System.currentTimeMillis();
          final double runtime = ((double) end - start) / 1000;
          LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + totalNumMessages / runtime +
              " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime); // x2 for unicode chars
        }
      }
    }
  }
}
/**
 * NetworkService messaging rate benchmark for batched messages.
 *
 * <p>For each nominal application message size, one sender ("task1") and one receiver
 * ("task2") are created against a shared name server. The sender writes
 * {@code numMessages} strings of {@code batchSize} characters each, then waits on a
 * {@link Monitor} that the receiving handler notifies after counting {@code numMessages}
 * messages. The reported rate treats every batch as {@code batchSize / size} application
 * messages of {@code size} characters.
 *
 * <p>The test is skipped when FINEST is loggable for this class's logger.
 *
 * <p>NOTE(review): the throughput is logged at FINEST, which is never loggable when the
 * test actually runs, so the figures appear to be discarded; confirm the intended level.
 *
 * @throws Exception if injection, registration or the connection fails.
 */
@Test
public void testMessagingNetworkServiceBatchingRate() throws Exception {
  Assume.assumeFalse("Use log level INFO to run benchmarking", LOG.isLoggable(Level.FINEST));
  LOG.log(Level.FINEST, name.getMethodName());
  final IdentifierFactory factory = new StringIdentifierFactory();
  final Injector injector = Tang.Factory.getTang().newInjector();
  injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory);
  injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);
  try (NameServer server = injector.getInstance(NameServer.class)) {
    final int nameServerPort = server.getPort();
    final int batchSize = 1024 * 1024;
    final int[] messageSizes = {32, 64, 512};
    for (final int size : messageSizes) {
      final int numMessages = 300 / (Math.max(1, size / 512));
      final Monitor monitor = new Monitor();
      // network service
      final String name2 = "task2";
      final String name1 = "task1";
      final Configuration nameResolverConf =
          Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
              .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, this.localAddress)
              .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
              .build())
              .build();
      final Injector injector2 = Tang.Factory.getTang().newInjector(nameResolverConf);
      LOG.log(Level.FINEST, "=== Test network service receiver start");
      LOG.log(Level.FINEST, "=== Test network service sender start");
      try (NameResolver nameResolver = injector2.getInstance(NameResolver.class)) {
        injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class, factory);
        injector2.bindVolatileInstance(NameResolver.class, nameResolver);
        injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceCodec.class, new StringCodec());
        injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceTransportFactory.class,
            injector.getInstance(MessagingTransportFactory.class));
        injector2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceExceptionHandler.class,
            new ExceptionHandler());
        final Injector injectorNs2 = injector2.forkInjector();
        injectorNs2.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class,
            new MessageHandler<String>(name2, monitor, numMessages));
        final NetworkService<String> ns2 = injectorNs2.getInstance(NetworkService.class);
        final Injector injectorNs1 = injector2.forkInjector();
        injectorNs1.bindVolatileParameter(NetworkServiceParameters.NetworkServiceHandler.class,
            new MessageHandler<String>(name1, null, 0));
        final NetworkService<String> ns1 = injectorNs1.getInstance(NetworkService.class);
        ns2.registerId(factory.getNewInstance(name2));
        final int port2 = ns2.getTransport().getListeningPort();
        server.register(factory.getNewInstance(name2), new InetSocketAddress(this.localAddress, port2));
        ns1.registerId(factory.getNewInstance(name1));
        final int port1 = ns1.getTransport().getListeningPort();
        server.register(factory.getNewInstance(name1), new InetSocketAddress(this.localAddress, port1));
        final Identifier destId = factory.getNewInstance(name2);
        // Each write carries one whole batch, independent of the nominal message size.
        final String message = StringUtils.repeat('1', batchSize);
        final long start = System.currentTimeMillis();
        try (Connection<String> conn = ns1.newConnection(destId)) {
          conn.open();
          for (int i = 0; i < numMessages; i++) {
            conn.write(message);
          }
          monitor.mwait();
        } catch (final NetworkException e) {
          e.printStackTrace();
          throw new RuntimeException(e);
        }
        final long end = System.currentTimeMillis();
        final double runtime = ((double) end - start) / 1000;
        // Widen before multiplying: numMessages * batchSize must not be evaluated in int,
        // where it would silently overflow for larger batch sizes or message counts.
        final long numAppMessages = (long) numMessages * batchSize / size;
        LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + numAppMessages / runtime +
            " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / runtime); // x2 for unicode chars
      }
    }
  }
}
/**
 * Test message handler.
 *
 * <p>Counts every message delivered to it, asserts that no more than the expected
 * number arrive, and notifies the supplied monitor once the expected count is reached.
 * The tests pass a {@code null} monitor together with an expected count of zero for
 * services that should not receive anything.
 *
 * @param <T> payload type of the received messages.
 */
class MessageHandler<T> implements EventHandler<Message<T>> {
  /** Label used as the first field of each log line. */
  private final String handlerName;
  /** Number of messages this handler should receive. */
  private final int expectedCount;
  /** Monitor notified once {@link #expectedCount} messages have arrived. */
  private final Monitor completion;
  /** Messages received so far. */
  private final AtomicInteger received = new AtomicInteger();

  MessageHandler(final String name, final Monitor monitor, final int expected) {
    this.handlerName = name;
    this.completion = monitor;
    this.expectedCount = expected;
  }

  /**
   * Records one received message and notifies the monitor when the expected count is reached.
   *
   * @param value the received message; only used for logging.
   */
  @Override
  public void onNext(final Message<T> value) {
    final int seenSoFar = this.received.incrementAndGet();
    final Object[] logArgs = {this.handlerName, seenSoFar, this.expectedCount, value};
    LOG.log(Level.FINER, "{0} Message {1}/{2} :: {3}", logArgs);

    // Receiving more than expected is a test failure, checked before the monitor is touched.
    Assert.assertTrue(seenSoFar <= this.expectedCount);
    if (seenSoFar < this.expectedCount) {
      return;
    }
    this.completion.mnotify();
  }
}
/**
 * Test exception handler.
 *
 * <p>Prints each reported exception to standard error; the exception is not rethrown.
 */
class ExceptionHandler implements EventHandler<Exception> {
  /**
   * Prints the string form of the reported exception to {@code System.err}.
   *
   * @param error the reported exception.
   */
  @Override
  public void onNext(final Exception error) {
    final String description = String.valueOf(error);
    System.err.println(description);
  }
}
}