[REEF-1651] Improve logging in REEF Remote Manager and related classes
Summary of changes:
* Improve logging in the `RemoteManager` and around
* Implement `.toString()` methods for some remote messages
* Log unhandled exceptions in `DefaultErrorHandler`
* Multiple code readability improvements
JIRA:
[REEF-1651](https://issues.apache.org/jira/browse/REEF-1651)
Pull request:
This closes #1166
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
index abdecd7..c090dec 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStartHandler.java
@@ -38,29 +38,33 @@
* This is bound to the start event of the clock and dispatches it to the appropriate application code.
*/
public final class DriverStartHandler implements EventHandler<StartTime> {
+
private static final Logger LOG = Logger.getLogger(DriverStartHandler.class.getName());
private final Set<EventHandler<StartTime>> startHandlers;
private final Set<EventHandler<DriverRestarted>> restartHandlers;
private final Set<EventHandler<DriverRestarted>> serviceRestartHandlers;
+
private final DriverRestartManager driverRestartManager;
@Inject
- DriverStartHandler(@Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
- final Set<EventHandler<StartTime>> startHandlers,
- @Parameter(DriverRestartHandler.class)
- final Set<EventHandler<DriverRestarted>> restartHandlers,
- @Parameter(ServiceDriverRestartedHandlers.class)
- final Set<EventHandler<DriverRestarted>> serviceRestartHandlers,
- final DriverRestartManager driverRestartManager) {
+ private DriverStartHandler(
+ @Parameter(org.apache.reef.driver.parameters.DriverStartHandler.class)
+ final Set<EventHandler<StartTime>> startHandlers,
+ @Parameter(DriverRestartHandler.class)
+ final Set<EventHandler<DriverRestarted>> restartHandlers,
+ @Parameter(ServiceDriverRestartedHandlers.class)
+ final Set<EventHandler<DriverRestarted>> serviceRestartHandlers,
+ final DriverRestartManager driverRestartManager) {
+
this.startHandlers = startHandlers;
this.restartHandlers = restartHandlers;
this.serviceRestartHandlers = serviceRestartHandlers;
this.driverRestartManager = driverRestartManager;
- LOG.log(Level.FINE, "Instantiated `DriverStartHandler with StartHandlers [{0}], RestartHandlers [{1}]," +
- "and ServiceRestartHandlers [{2}].",
- new String[] {this.startHandlers.toString(), this.restartHandlers.toString(),
- this.serviceRestartHandlers.toString()});
+
+ LOG.log(Level.FINE,
+ "Instantiated DriverStartHandler: StartHandlers:{0} RestartHandlers:{1} ServiceRestartHandlers:{2}",
+ new Object[] {this.startHandlers, this.restartHandlers, this.serviceRestartHandlers});
}
@Override
@@ -73,19 +77,20 @@
}
private void onRestart(final StartTime startTime) {
- if (this.restartHandlers.size() > 0) {
- final List<EventHandler<DriverRestarted>> orderedRestartHandlers =
- new ArrayList<>(this.serviceRestartHandlers.size() + this.restartHandlers.size());
- orderedRestartHandlers.addAll(this.serviceRestartHandlers);
- orderedRestartHandlers.addAll(this.restartHandlers);
-
- // This can only be called after calling client restart handlers because REEF.NET
- // JobDriver requires making this call to set up the InterOp handlers.
- this.driverRestartManager.onRestart(startTime, orderedRestartHandlers);
- } else {
- throw new DriverFatalRuntimeException("Driver restart happened, but no ON_DRIVER_RESTART handler is bound.");
+ if (this.restartHandlers.isEmpty()) {
+ throw new DriverFatalRuntimeException("Driver restarted, but no ON_DRIVER_RESTART handler is bound.");
}
+
+ final List<EventHandler<DriverRestarted>> orderedRestartHandlers =
+ new ArrayList<>(this.serviceRestartHandlers.size() + this.restartHandlers.size());
+
+ orderedRestartHandlers.addAll(this.serviceRestartHandlers);
+ orderedRestartHandlers.addAll(this.restartHandlers);
+
+ // This can only be called after calling client restart handlers because REEF.NET
+ // JobDriver requires making this call to set up the InterOp handlers.
+ this.driverRestartManager.onRestart(startTime, orderedRestartHandlers);
}
private void onStart(final StartTime startTime) {
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultErrorHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultErrorHandler.java
index 476cc75..4b234a7 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultErrorHandler.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/DefaultErrorHandler.java
@@ -21,18 +21,23 @@
import org.apache.reef.wake.EventHandler;
import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* The default RemoteConfiguration.ErrorHandler.
*/
final class DefaultErrorHandler implements EventHandler<Throwable> {
+ private static final Logger LOG = Logger.getLogger(DefaultErrorHandler.class.getName());
+
@Inject
- DefaultErrorHandler() {
+ private DefaultErrorHandler() {
}
@Override
public void onNext(final Throwable value) {
+ LOG.log(Level.SEVERE, "No error handler in RemoteManager", value);
throw new RuntimeException("No error handler bound for RemoteManager.", value);
}
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java
index 17b9b13..310de6d 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java
@@ -18,7 +18,6 @@
*/
package org.apache.reef.wake.remote;
-
import org.apache.reef.tang.annotations.DefaultImplementation;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.Stage;
@@ -29,9 +28,6 @@
*/
@DefaultImplementation(DefaultRemoteManagerImplementation.class)
public interface RemoteManager extends Stage {
- /**
- * Constructor that takes a Codec<T>
- */
/**
* Returns an event handler that can be used to send messages of type T to the
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
index d525c51..6012ba1 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
@@ -52,60 +52,61 @@
* The timeout used for the execute running in close().
*/
private static final long CLOSE_EXECUTOR_TIMEOUT = 10000; //ms
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private final String name;
- private final Transport transport;
- private final RemoteSenderStage reSendStage;
- private final EStage<TransportEvent> reRecvStage;
- private final HandlerContainer handlerContainer;
- private final RemoteSeqNumGenerator seqGen = new RemoteSeqNumGenerator();
- private RemoteIdentifier myIdentifier;
+
/**
* Indicates a hostname that isn't set or known.
*/
public static final String UNKNOWN_HOST_NAME = NettyMessagingTransport.UNKNOWN_HOST_NAME;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final RemoteSeqNumGenerator seqGen = new RemoteSeqNumGenerator();
+
+ private final String name;
+ private final Transport transport;
+ private final RemoteSenderStage reSendStage;
+ private final EStage<TransportEvent> reRecvStage;
+ private final HandlerContainer handlerContainer;
+
+ private RemoteIdentifier myIdentifier;
+
@Inject
private <T> DefaultRemoteManagerImplementation(
- @Parameter(RemoteConfiguration.ManagerName.class) final String name,
- @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress,
- @Parameter(RemoteConfiguration.Port.class) final int listeningPort,
- @Parameter(RemoteConfiguration.MessageCodec.class) final Codec<T> codec,
- @Parameter(RemoteConfiguration.ErrorHandler.class) final EventHandler<Throwable> errorHandler,
- @Parameter(RemoteConfiguration.OrderingGuarantee.class) final boolean orderingGuarantee,
- @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries,
- @Parameter(RemoteConfiguration.RetryTimeout.class) final int retryTimeout,
- final LocalAddressProvider localAddressProvider,
- final TransportFactory tpFactory,
- final TcpPortProvider tcpPortProvider) {
+ @Parameter(RemoteConfiguration.ManagerName.class) final String name,
+ @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress,
+ @Parameter(RemoteConfiguration.Port.class) final int listeningPort,
+ @Parameter(RemoteConfiguration.MessageCodec.class) final Codec<T> codec,
+ @Parameter(RemoteConfiguration.ErrorHandler.class) final EventHandler<Throwable> errorHandler,
+ @Parameter(RemoteConfiguration.OrderingGuarantee.class) final boolean orderingGuarantee,
+ @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries,
+ @Parameter(RemoteConfiguration.RetryTimeout.class) final int retryTimeout,
+ final LocalAddressProvider localAddressProvider,
+ final TransportFactory tpFactory,
+ final TcpPortProvider tcpPortProvider) {
this.name = name;
this.handlerContainer = new HandlerContainer<>(name, codec);
this.reRecvStage = orderingGuarantee ?
- new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) :
- new RemoteReceiverStage(this.handlerContainer, errorHandler, 10);
+ new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) :
+ new RemoteReceiverStage(this.handlerContainer, errorHandler, 10);
- this.transport = tpFactory.newInstance(
- hostAddress, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout,
- tcpPortProvider);
+ this.transport = tpFactory.newInstance(hostAddress, listeningPort,
+ this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout, tcpPortProvider);
this.handlerContainer.setTransport(this.transport);
- this.myIdentifier = new SocketRemoteIdentifier(
- (InetSocketAddress) this.transport.getLocalAddress());
+ this.myIdentifier = new SocketRemoteIdentifier((InetSocketAddress)this.transport.getLocalAddress());
this.reSendStage = new RemoteSenderStage(codec, this.transport, 10);
StageManager.instance().register(this);
- LOG.log(Level.FINEST, "RemoteManager {0} instantiated id {1} counter {2} listening on {3}:{4}. " +
- "Binding address provided by {5}",
- new Object[]{this.name, this.myIdentifier, COUNTER.incrementAndGet(),
- this.transport.getLocalAddress().toString(),
- this.transport.getListeningPort(), localAddressProvider}
- );
- }
+ final int counter = COUNTER.incrementAndGet();
+
+ LOG.log(Level.FINEST,
+ "RemoteManager {0} instantiated id {1} counter {2} listening on {3} Binding address provided by {4}",
+ new Object[] {this.name, this.myIdentifier, counter, this.transport.getLocalAddress(), localAddressProvider});
+ }
/**
* Returns a proxy event handler for a remote identifier and a message type.
@@ -116,7 +117,7 @@
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "RemoteManager: {0} destinationIdentifier: {1} messageType: {2}",
- new Object[]{this.name, destinationIdentifier, messageType.getName()});
+ new Object[] {this.name, destinationIdentifier, messageType.getName()});
}
return new ProxyEventHandler<>(this.myIdentifier, destinationIdentifier,
@@ -131,11 +132,12 @@
public <T, U extends T> AutoCloseable registerHandler(
final RemoteIdentifier sourceIdentifier,
final Class<U> messageType, final EventHandler<T> theHandler) {
+
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "RemoteManager: {0} remoteid: {1} messageType: {2} handler: {3}",
- new Object[]{this.name, sourceIdentifier, messageType.getName(),
- theHandler.getClass().getName()});
+ new Object[] {this.name, sourceIdentifier, messageType.getName(), theHandler.getClass().getName()});
}
+
return this.handlerContainer.registerHandler(sourceIdentifier, messageType, theHandler);
}
@@ -145,10 +147,12 @@
@Override
public <T, U extends T> AutoCloseable registerHandler(
final Class<U> messageType, final EventHandler<RemoteMessage<T>> theHandler) {
+
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "RemoteManager: {0} messageType: {1} handler: {2}",
- new Object[]{this.name, messageType.getName(), theHandler.getClass().getName()});
+ new Object[] {this.name, messageType.getName(), theHandler.getClass().getName()});
}
+
return this.handlerContainer.registerHandler(messageType, theHandler);
}
@@ -162,10 +166,11 @@
@Override
public void close() {
- if (closed.compareAndSet(false, true)) {
+
+ if (this.closed.compareAndSet(false, true)) {
LOG.log(Level.FINE, "RemoteManager: {0} Closing remote manager id: {1}",
- new Object[]{this.name, this.myIdentifier});
+ new Object[] {this.name, this.myIdentifier});
final Runnable closeRunnable = new Runnable() {
@Override
@@ -194,12 +199,13 @@
LOG.log(Level.SEVERE, "Unable to close the remote receiver stage", e);
}
}
-
};
final ExecutorService closeExecutor = Executors.newSingleThreadExecutor();
+
closeExecutor.submit(closeRunnable);
closeExecutor.shutdown();
+
if (!closeExecutor.isShutdown()) {
LOG.log(Level.SEVERE, "close executor did not shutdown properly.");
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
index d972d03..26a7747 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java
@@ -52,8 +52,12 @@
private Transport transport;
HandlerContainer(final String name, final Codec<T> codec) {
+
this.name = name;
this.codec = codec;
+
+ LOG.log(Level.FINER, "Instantiated HandlerContainer {0} with codec {1}",
+ new String[] {this.name, this.codec.getClass().getCanonicalName()});
}
void setTransport(final Transport transport) {
@@ -141,7 +145,7 @@
unsubscribeClass = new SubscriptionHandler.Unsubscriber<Class<? extends T>>() {
@Override
public void unsubscribe(final Class<? extends T> token) {
- LOG.log(Level.FINER, "Unsubscribe: {0} class {1}", new Object[] {name, token.getName()});
+ LOG.log(Level.FINER, "Unsubscribe: {0} class {1}", new Object[] {name, token.getCanonicalName()});
msgTypeToHandlerMap.remove(token);
}
};
@@ -152,7 +156,7 @@
@Override
public void unsubscribe(final Tuple2<RemoteIdentifier, Class<? extends T>> token) {
LOG.log(Level.FINER, "Unsubscribe: {0} tuple {1},{2}",
- new Object[] {name, token.getT1(), token.getT2().getName()});
+ new Object[] {name, token.getT1(), token.getT2().getCanonicalName()});
tupleToHandlerMap.remove(token);
}
};
@@ -168,41 +172,49 @@
};
/**
- * Dispatches a message.
- *
- * @param value
+ * Dispatch message received from the remote to proper event handler.
+ * @param value Remote message, encoded as byte[].
*/
- @SuppressWarnings("checkstyle:diamondoperatorforvariabledefinition")
@Override
+ @SuppressWarnings("checkstyle:diamondoperatorforvariabledefinition")
public synchronized void onNext(final RemoteEvent<byte[]> value) {
- LOG.log(Level.FINER, "RemoteManager: {0} value: {1}", new Object[]{this.name, value});
+ LOG.log(Level.FINER, "RemoteManager: {0} value: {1}", new Object[] {this.name, value});
final T decodedEvent = this.codec.decode(value.getEvent());
final Class<?> clazz = decodedEvent.getClass();
+ LOG.log(Level.FINEST, "RemoteManager: {0} decoded event {1} :: {2}",
+ new Object[] {this.name, clazz.getCanonicalName(), decodedEvent});
+
// check remote identifier and message type
- final SocketRemoteIdentifier id =
- new SocketRemoteIdentifier((InetSocketAddress) value.remoteAddress());
+ final SocketRemoteIdentifier id = new SocketRemoteIdentifier((InetSocketAddress)value.remoteAddress());
final Tuple2<RemoteIdentifier, Class<?>> tuple = new Tuple2<RemoteIdentifier, Class<?>>(id, clazz);
final EventHandler<T> tupleHandler = (EventHandler<T>) this.tupleToHandlerMap.get(tuple);
+
if (tupleHandler != null) {
- LOG.log(Level.FINER, "Tuple handler: {0}", tuple);
+
+ LOG.log(Level.FINER, "Tuple handler: {0},{1}",
+ new Object[] {tuple.getT1(), tuple.getT2().getCanonicalName()});
+
tupleHandler.onNext(decodedEvent);
+
} else {
- final EventHandler<RemoteMessage<? extends T>> messageHandler =
- this.msgTypeToHandlerMap.get(clazz);
- if (messageHandler != null) {
- LOG.log(Level.FINER, "Message handler: {0}", clazz);
- messageHandler.onNext(new DefaultRemoteMessage(id, decodedEvent));
- } else {
+
+ final EventHandler<RemoteMessage<? extends T>> messageHandler = this.msgTypeToHandlerMap.get(clazz);
+
+ if (messageHandler == null) {
final RuntimeException ex = new RemoteRuntimeException(
- "Unknown message type in dispatch: " + clazz.getName() + " from " + id);
+ "Unknown message type in dispatch: " + clazz.getCanonicalName() + " from " + id);
LOG.log(Level.WARNING, "Unknown message type in dispatch.", ex);
throw ex;
}
+
+ LOG.log(Level.FINER, "Message handler: {0}", clazz.getCanonicalName());
+
+ messageHandler.onNext(new DefaultRemoteMessage(id, decodedEvent));
}
}
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java
index 55adbc6..3ece567 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java
@@ -29,7 +29,9 @@
private final T event;
private final long seq;
+
//private static final AtomicLong curSeq = new AtomicLong(0);
+
private SocketAddress localAddr;
private SocketAddress remoteAddr;
@@ -108,17 +110,8 @@
* @return a string representation of this object
*/
public String toString() {
- final StringBuilder builder = new StringBuilder();
- builder.append("RemoteEvent");
- builder.append(" localAddr=");
- builder.append(localAddr);
- builder.append(" remoteAddr=");
- builder.append(remoteAddr);
- builder.append(" seq=");
- builder.append(seq);
- builder.append(" event=");
- builder.append(event);
- return builder.toString();
+ return String.format(
+ "RemoteEvent localAddr=%s remoteAddr=%s seq=%d event=%s:%s",
+ this.localAddr, this.remoteAddr, this.seq, this.event.getClass().getCanonicalName(), this.event);
}
-
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java
index 5f6b15e..8565944 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java
@@ -22,11 +22,11 @@
import java.net.SocketAddress;
-
/**
* Event sent from a remote node.
*/
public class TransportEvent {
+
private final byte[] data;
private final SocketAddress localAddr;
private final SocketAddress remoteAddr;
@@ -65,6 +65,13 @@
}
}
+ @Override
+ public String toString() {
+ return String.format(
+ "TransportEvent: {local: %s remote: %s size: %d bytes}",
+ this.localAddr, this.remoteAddr, this.data.length);
+ }
+
/**
* Gets the data.
*
@@ -102,5 +109,4 @@
public SocketAddress getRemoteAddress() {
return remoteAddr;
}
-
}