[REEF-1905] Create RemoteManager with given host and port; clean up the RM factory API
Summary of changes:
* Add a new `.getInstance()` method to create RM by a given host name and port number
* remove duplicate code from `DefaultRemoteManagerImplementation` and call one uber-method for all `.getInstance()` versions
JIRA: [REEF-1905](https://issues.apache.org/jira/browse/REEF-1905)
Pull request:
This closes #1394
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultRemoteManagerFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultRemoteManagerFactory.java
index 1e097b6..0a0393c 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultRemoteManagerFactory.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultRemoteManagerFactory.java
@@ -34,7 +34,7 @@
*/
final class DefaultRemoteManagerFactory implements RemoteManagerFactory {
- private final Injector injector;
+ private final Injector injector = Tang.Factory.getTang().newInjector();
private final Codec<?> codec;
private final EventHandler<Throwable> errorHandler;
@@ -55,7 +55,7 @@
final LocalAddressProvider localAddressProvider,
final TransportFactory tpFactory,
final TcpPortProvider tcpPortProvider) {
- this.injector = Tang.Factory.getTang().newInjector();
+
this.codec = codec;
this.errorHandler = errorHandler;
this.orderingGuarantee = orderingGuarantee;
@@ -67,125 +67,85 @@
}
@Override
- public RemoteManager getInstance(final String name) {
- try {
- final Injector newInjector = injector.forkInjector();
- newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name);
- newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, this.codec);
- newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, this.errorHandler);
- newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, this.orderingGuarantee);
- newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, this.numberOfTries);
- newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, this.retryTimeout);
- newInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);
- newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory);
- newInjector.bindVolatileInstance(TcpPortProvider.class, this.tcpPortProvider);
- return newInjector.getInstance(RemoteManager.class);
- } catch (InjectionException e) {
- throw new RuntimeException(e);
- }
+ public RemoteManager getInstance(final String newRmName) {
+ return getInstance(newRmName, 0, this.codec, this.errorHandler);
}
@Override
- @SuppressWarnings("checkstyle:hiddenfield")
- public <T> RemoteManager getInstance(final String name,
- final String hostAddress,
- final int listeningPort,
- final Codec<T> codec,
- final EventHandler<Throwable> errorHandler,
- final boolean orderingGuarantee,
- final int numberOfTries,
- final int retryTimeout,
- final LocalAddressProvider localAddressProvider,
- final TcpPortProvider tcpPortProvider) {
- try {
- final Injector newInjector = injector.forkInjector();
- newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name);
- newInjector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress);
- newInjector.bindVolatileParameter(RemoteConfiguration.Port.class, listeningPort);
- newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, codec);
- newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, errorHandler);
- newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, orderingGuarantee);
- newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, numberOfTries);
- newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, retryTimeout);
- newInjector.bindVolatileInstance(LocalAddressProvider.class, localAddressProvider);
- newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory);
- newInjector.bindVolatileInstance(TcpPortProvider.class, tcpPortProvider);
- return newInjector.getInstance(RemoteManager.class);
- } catch (InjectionException e) {
- throw new RuntimeException(e);
- }
+ public <T> RemoteManager getInstance(final String newRmName,
+ final String newHostAddress,
+ final int newListeningPort,
+ final Codec<T> newCodec) {
+ return getInstance(newRmName, newHostAddress, newListeningPort, newCodec,
+ this.errorHandler, this.orderingGuarantee, this.numberOfTries, this.retryTimeout,
+ this.localAddressProvider, this.tcpPortProvider);
}
@Override
- @SuppressWarnings("checkstyle:hiddenfield")
- public <T> RemoteManager getInstance(final String name,
- final String hostAddress,
- final int listeningPort,
- final Codec<T> codec,
- final EventHandler<Throwable> errorHandler,
- final boolean orderingGuarantee,
- final int numberOfTries,
- final int retryTimeout) {
- try {
- final Injector newInjector = injector.forkInjector();
- newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name);
- newInjector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress);
- newInjector.bindVolatileParameter(RemoteConfiguration.Port.class, listeningPort);
- newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, codec);
- newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, errorHandler);
- newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, orderingGuarantee);
- newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, numberOfTries);
- newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, retryTimeout);
- newInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);
- newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory);
- newInjector.bindVolatileInstance(TcpPortProvider.class, this.tcpPortProvider);
- return newInjector.getInstance(RemoteManager.class);
- } catch (InjectionException e) {
- throw new RuntimeException(e);
- }
+ public <T> RemoteManager getInstance(final String newRmName,
+ final String newHostAddress,
+ final int newListeningPort,
+ final Codec<T> newCodec,
+ final EventHandler<Throwable> newErrorHandler,
+ final boolean newOrderingGuarantee,
+ final int newNumberOfTries,
+ final int newRetryTimeout) {
+ return getInstance(newRmName, newHostAddress, newListeningPort, newCodec, newErrorHandler,
+ newOrderingGuarantee, newNumberOfTries, newRetryTimeout, this.localAddressProvider, this.tcpPortProvider);
}
@Override
- @SuppressWarnings("checkstyle:hiddenfield")
- public <T> RemoteManager getInstance(
- final String name, final Codec<T> codec, final EventHandler<Throwable> errorHandler) {
- try {
- final Injector newInjector = injector.forkInjector();
- newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name);
- newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, codec);
- newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, errorHandler);
- newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, this.orderingGuarantee);
- newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, this.numberOfTries);
- newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, this.retryTimeout);
- newInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);
- newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory);
- newInjector.bindVolatileInstance(TcpPortProvider.class, this.tcpPortProvider);
- return newInjector.getInstance(RemoteManager.class);
- } catch (InjectionException e) {
- throw new RuntimeException(e);
- }
+ public <T> RemoteManager getInstance(final String newRmName,
+ final Codec<T> newCodec,
+ final EventHandler<Throwable> newErrorHandler) {
+ return getInstance(newRmName, 0, newCodec, newErrorHandler);
}
@Override
- @SuppressWarnings("checkstyle:hiddenfield")
- public <T> RemoteManager getInstance(final String name,
- final int listeningPort,
- final Codec<T> codec,
- final EventHandler<Throwable> errorHandler) {
+ public <T> RemoteManager getInstance(final String newRmName,
+ final int newListeningPort,
+ final Codec<T> newCodec,
+ final EventHandler<Throwable> newErrorHandler) {
+ return getInstance(newRmName, null, newListeningPort, newCodec, newErrorHandler, this.orderingGuarantee,
+ this.numberOfTries, this.retryTimeout, this.localAddressProvider, this.tcpPortProvider);
+ }
+
+ @Override
+ public <T> RemoteManager getInstance(final String newRmName,
+ final String newHostAddress,
+ final int newListeningPort,
+ final Codec<T> newCodec,
+ final EventHandler<Throwable> newErrorHandler,
+ final boolean newOrderingGuarantee,
+ final int newNumberOfTries,
+ final int newRetryTimeout,
+ final LocalAddressProvider newLocalAddressProvider,
+ final TcpPortProvider newTcpPortProvider) {
try {
+
final Injector newInjector = injector.forkInjector();
- newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, name);
- newInjector.bindVolatileParameter(RemoteConfiguration.Port.class, listeningPort);
- newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, codec);
- newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, errorHandler);
- newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, this.orderingGuarantee);
- newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, this.numberOfTries);
- newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, this.retryTimeout);
- newInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);
+
+ if (newHostAddress != null) {
+ newInjector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, newHostAddress);
+ }
+
+ if (newListeningPort > 0) {
+ newInjector.bindVolatileParameter(RemoteConfiguration.Port.class, newListeningPort);
+ }
+
+ newInjector.bindVolatileParameter(RemoteConfiguration.ManagerName.class, newRmName);
+ newInjector.bindVolatileParameter(RemoteConfiguration.MessageCodec.class, newCodec);
+ newInjector.bindVolatileParameter(RemoteConfiguration.ErrorHandler.class, newErrorHandler);
+ newInjector.bindVolatileParameter(RemoteConfiguration.OrderingGuarantee.class, newOrderingGuarantee);
+ newInjector.bindVolatileParameter(RemoteConfiguration.NumberOfTries.class, newNumberOfTries);
+ newInjector.bindVolatileParameter(RemoteConfiguration.RetryTimeout.class, newRetryTimeout);
+ newInjector.bindVolatileInstance(LocalAddressProvider.class, newLocalAddressProvider);
newInjector.bindVolatileInstance(TransportFactory.class, this.transportFactory);
- newInjector.bindVolatileInstance(TcpPortProvider.class, this.tcpPortProvider);
+ newInjector.bindVolatileInstance(TcpPortProvider.class, newTcpPortProvider);
+
return newInjector.getInstance(RemoteManager.class);
- } catch (InjectionException e) {
+
+ } catch (final InjectionException e) {
throw new RuntimeException(e);
}
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java
index 04c928d..60bd2b8 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java
@@ -62,6 +62,19 @@
final EventHandler<Throwable> errorHandler);
/**
+ * @param name the name of the returned RemoteManager to instantiate.
+ * @param hostAddress the address the returned RemoteManager binds to.
+ * @param listeningPort the port on which the returned RemoteManager listens.
+ * @param codec the codec to use to decode the messages sent to / by this RemoteManager.
+ * @param <T> the message type sent / received by the returned RemoteManager.
+ * @return a new instance of RemoteManager with all parameters but the given one injected via Tang.
+ */
+ <T> RemoteManager getInstance(final String name,
+ final String hostAddress,
+ final int listeningPort,
+ final Codec<T> codec);
+
+ /**
* The old constructor of DefaultRemoteManagerImplementation. Avoid if you can.
*
* @param name the name of the returned RemoteManager to instantiate.