blob: 3a2772f305dc1fc45e1d3504fec6e02828fe785d [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.reef.wake.remote.impl;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.Codec;
import org.apache.reef.wake.remote.RemoteIdentifier;
import org.apache.reef.wake.remote.RemoteMessage;
import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
import org.apache.reef.wake.remote.transport.Transport;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
* Main logic to dispatch messages.
* An event handler that receives a remote message with a binary payload,
* decodes a message from the blob, and dispatches that message to a proper handler.
final class HandlerContainer<T> implements EventHandler<RemoteEvent<byte[]>> {
private static final Logger LOG = Logger.getLogger(HandlerContainer.class.getName());
private final ConcurrentMap<Class<? extends T>,
EventHandler<RemoteMessage<? extends T>>> msgTypeToHandlerMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Tuple2<RemoteIdentifier, Class<? extends T>>,
EventHandler<? extends T>> tupleToHandlerMap = new ConcurrentHashMap<>();
private final Codec<T> codec;
private final String name;
private Transport transport;
HandlerContainer(final String name, final Codec<T> codec) { = name;
this.codec = codec;
LOG.log(Level.FINER, "Instantiated {0}", this);
public String toString() {
return String.format("HandlerContainer: {name:%s codec:%s}",, this.codec.getClass().getCanonicalName());
void setTransport(final Transport transport) {
this.transport = transport;
* Subscribe for events from a given source and message type.
* @param sourceIdentifier An identifier of an event source.
* @param messageType Java class of messages to dispatch.
* @param theHandler Message handler.
* @return A new subscription object that will cancel its subscription on .close()
public AutoCloseable registerHandler(
final RemoteIdentifier sourceIdentifier,
final Class<? extends T> messageType,
final EventHandler<? extends T> theHandler) {
final Tuple2<RemoteIdentifier, Class<? extends T>> tuple =
new Tuple2<RemoteIdentifier, Class<? extends T>>(sourceIdentifier, messageType);
this.tupleToHandlerMap.put(tuple, theHandler);
"Add handler for tuple: {0},{1}",
new Object[] {tuple.getT1(), tuple.getT2().getCanonicalName()});
return new SubscriptionHandler<>(tuple, this.unsubscribeTuple);
* Subscribe for events of a given message type.
* @param messageType Java class of messages to dispatch.
* @param theHandler Message handler.
* @return A new subscription object that will cancel its subscription on .close()
public AutoCloseable registerHandler(
final Class<? extends T> messageType,
final EventHandler<RemoteMessage<? extends T>> theHandler) {
this.msgTypeToHandlerMap.put(messageType, theHandler);
LOG.log(Level.FINER, "Add handler for class: {0}", messageType.getCanonicalName());
return new SubscriptionHandler<>(messageType, this.unsubscribeClass);
* Specify handler for error messages.
* @param theHandler Error handler.
* @return A new subscription object that will cancel its subscription on .close()
public AutoCloseable registerErrorHandler(final EventHandler<Exception> theHandler) {
return new SubscriptionHandler<>(
new Exception("Token for finding the error handler subscription"), this.unsubscribeException);
* Unsubscribes a handler.
* @param subscription
* @throws org.apache.reef.wake.remote.exception.RemoteRuntimeException if the Subscription type is unknown.
* @deprecated [REEF-1544] Prefer using SubscriptionHandler and the corresponding methods
* instead of the old Subscription class. Remove method after release 0.16.
public void unsubscribe(final Subscription<T> subscription) {
final T token = subscription.getToken();
LOG.log(Level.FINER, "RemoteManager: {0} token {1}", new Object[]{, token});
if (token instanceof Exception) {
} else if (token instanceof Tuple2) {
} else if (token instanceof Class) {
} else {
throw new RemoteRuntimeException(
"Unknown subscription type: " + subscription.getClass().getCanonicalName());
/** Unsubscribe from messages of a given class. */
private final SubscriptionHandler.Unsubscriber<Class<? extends T>>
unsubscribeClass = new SubscriptionHandler.Unsubscriber<Class<? extends T>>() {
public void unsubscribe(final Class<? extends T> token) {
LOG.log(Level.FINER, "Unsubscribe: {0} class {1}", new Object[] {name, token.getCanonicalName()});
/** Unsubscribe from event from a certain source and message type. */
private final SubscriptionHandler.Unsubscriber<Tuple2<RemoteIdentifier, Class<? extends T>>>
unsubscribeTuple = new SubscriptionHandler.Unsubscriber<Tuple2<RemoteIdentifier, Class<? extends T>>>() {
public void unsubscribe(final Tuple2<RemoteIdentifier, Class<? extends T>> token) {
LOG.log(Level.FINER, "Unsubscribe: {0} tuple {1},{2}",
new Object[] {name, token.getT1(), token.getT2().getCanonicalName()});
/** Unsubscribe from error messages. */
private final SubscriptionHandler.Unsubscriber<Exception>
unsubscribeException = new SubscriptionHandler.Unsubscriber<Exception>() {
public void unsubscribe(final Exception token) {
LOG.log(Level.FINER, "Unsubscribe: {0} exception {1}", new Object[] {name, token});
* Dispatch message received from the remote to proper event handler.
* @param value Remote message, encoded as byte[].
public synchronized void onNext(final RemoteEvent<byte[]> value) {
LOG.log(Level.FINER, "RemoteManager: {0} value: {1}", new Object[] {, value});
final T decodedEvent = this.codec.decode(value.getEvent());
final Class<?> clazz = decodedEvent.getClass();
LOG.log(Level.FINEST, "RemoteManager: {0} decoded event {1} :: {2}",
new Object[] {, clazz.getCanonicalName(), decodedEvent});
// check remote identifier and message type
final SocketRemoteIdentifier id = new SocketRemoteIdentifier((InetSocketAddress)value.remoteAddress());
final Tuple2<RemoteIdentifier, Class<?>> tuple = new Tuple2<RemoteIdentifier, Class<?>>(id, clazz);
final EventHandler<T> tupleHandler = (EventHandler<T>) this.tupleToHandlerMap.get(tuple);
if (tupleHandler != null) {
LOG.log(Level.FINER, "Tuple handler: {0},{1}",
new Object[] {tuple.getT1(), tuple.getT2().getCanonicalName()});
} else {
final EventHandler<RemoteMessage<? extends T>> messageHandler = this.msgTypeToHandlerMap.get(clazz);
if (messageHandler == null) {
final RuntimeException ex = new RemoteRuntimeException(
"Unknown message type in dispatch: " + clazz.getCanonicalName() + " from " + id);
LOG.log(Level.WARNING, "Unknown message type in dispatch.", ex);
throw ex;
LOG.log(Level.FINER, "Message handler: {0}", clazz.getCanonicalName());
messageHandler.onNext(new DefaultRemoteMessage(id, decodedEvent));