blob: 3a2772f305dc1fc45e1d3504fec6e02828fe785d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.wake.remote.impl;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.Codec;
import org.apache.reef.wake.remote.RemoteIdentifier;
import org.apache.reef.wake.remote.RemoteMessage;
import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
import org.apache.reef.wake.remote.transport.Transport;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Main logic to dispatch messages.
* An event handler that receives a remote message with a binary payload,
* decodes a message from the blob, and dispatches that message to a proper handler.
*/
final class HandlerContainer<T> implements EventHandler<RemoteEvent<byte[]>> {
private static final Logger LOG = Logger.getLogger(HandlerContainer.class.getName());
private final ConcurrentMap<Class<? extends T>,
EventHandler<RemoteMessage<? extends T>>> msgTypeToHandlerMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Tuple2<RemoteIdentifier, Class<? extends T>>,
EventHandler<? extends T>> tupleToHandlerMap = new ConcurrentHashMap<>();
private final Codec<T> codec;
private final String name;
private Transport transport;
HandlerContainer(final String name, final Codec<T> codec) {
this.name = name;
this.codec = codec;
LOG.log(Level.FINER, "Instantiated {0}", this);
}
@Override
public String toString() {
return String.format("HandlerContainer: {name:%s codec:%s}",
this.name, this.codec.getClass().getCanonicalName());
}
void setTransport(final Transport transport) {
this.transport = transport;
}
/**
* Subscribe for events from a given source and message type.
* @param sourceIdentifier An identifier of an event source.
* @param messageType Java class of messages to dispatch.
* @param theHandler Message handler.
* @return A new subscription object that will cancel its subscription on .close()
*/
@SuppressWarnings("checkstyle:diamondoperatorforvariabledefinition")
public AutoCloseable registerHandler(
final RemoteIdentifier sourceIdentifier,
final Class<? extends T> messageType,
final EventHandler<? extends T> theHandler) {
final Tuple2<RemoteIdentifier, Class<? extends T>> tuple =
new Tuple2<RemoteIdentifier, Class<? extends T>>(sourceIdentifier, messageType);
this.tupleToHandlerMap.put(tuple, theHandler);
LOG.log(Level.FINER,
"Add handler for tuple: {0},{1}",
new Object[] {tuple.getT1(), tuple.getT2().getCanonicalName()});
return new SubscriptionHandler<>(tuple, this.unsubscribeTuple);
}
/**
* Subscribe for events of a given message type.
* @param messageType Java class of messages to dispatch.
* @param theHandler Message handler.
* @return A new subscription object that will cancel its subscription on .close()
*/
public AutoCloseable registerHandler(
final Class<? extends T> messageType,
final EventHandler<RemoteMessage<? extends T>> theHandler) {
this.msgTypeToHandlerMap.put(messageType, theHandler);
LOG.log(Level.FINER, "Add handler for class: {0}", messageType.getCanonicalName());
return new SubscriptionHandler<>(messageType, this.unsubscribeClass);
}
/**
* Specify handler for error messages.
* @param theHandler Error handler.
* @return A new subscription object that will cancel its subscription on .close()
*/
public AutoCloseable registerErrorHandler(final EventHandler<Exception> theHandler) {
this.transport.registerErrorHandler(theHandler);
return new SubscriptionHandler<>(
new Exception("Token for finding the error handler subscription"), this.unsubscribeException);
}
/**
* Unsubscribes a handler.
*
* @param subscription
* @throws org.apache.reef.wake.remote.exception.RemoteRuntimeException if the Subscription type is unknown.
* @deprecated [REEF-1544] Prefer using SubscriptionHandler and the corresponding methods
* instead of the old Subscription class. Remove method after release 0.16.
*/
public void unsubscribe(final Subscription<T> subscription) {
final T token = subscription.getToken();
LOG.log(Level.FINER, "RemoteManager: {0} token {1}", new Object[]{this.name, token});
if (token instanceof Exception) {
this.transport.registerErrorHandler(null);
} else if (token instanceof Tuple2) {
this.tupleToHandlerMap.remove(token);
} else if (token instanceof Class) {
this.msgTypeToHandlerMap.remove(token);
} else {
throw new RemoteRuntimeException(
"Unknown subscription type: " + subscription.getClass().getCanonicalName());
}
}
/** Unsubscribe from messages of a given class. */
private final SubscriptionHandler.Unsubscriber<Class<? extends T>>
unsubscribeClass = new SubscriptionHandler.Unsubscriber<Class<? extends T>>() {
@Override
public void unsubscribe(final Class<? extends T> token) {
LOG.log(Level.FINER, "Unsubscribe: {0} class {1}", new Object[] {name, token.getCanonicalName()});
msgTypeToHandlerMap.remove(token);
}
};
/** Unsubscribe from event from a certain source and message type. */
private final SubscriptionHandler.Unsubscriber<Tuple2<RemoteIdentifier, Class<? extends T>>>
unsubscribeTuple = new SubscriptionHandler.Unsubscriber<Tuple2<RemoteIdentifier, Class<? extends T>>>() {
@Override
public void unsubscribe(final Tuple2<RemoteIdentifier, Class<? extends T>> token) {
LOG.log(Level.FINER, "Unsubscribe: {0} tuple {1},{2}",
new Object[] {name, token.getT1(), token.getT2().getCanonicalName()});
tupleToHandlerMap.remove(token);
}
};
/** Unsubscribe from error messages. */
private final SubscriptionHandler.Unsubscriber<Exception>
unsubscribeException = new SubscriptionHandler.Unsubscriber<Exception>() {
@Override
public void unsubscribe(final Exception token) {
LOG.log(Level.FINER, "Unsubscribe: {0} exception {1}", new Object[] {name, token});
transport.registerErrorHandler(null);
}
};
/**
* Dispatch message received from the remote to proper event handler.
* @param value Remote message, encoded as byte[].
*/
@Override
@SuppressWarnings("checkstyle:diamondoperatorforvariabledefinition")
public synchronized void onNext(final RemoteEvent<byte[]> value) {
LOG.log(Level.FINER, "RemoteManager: {0} value: {1}", new Object[] {this.name, value});
final T decodedEvent = this.codec.decode(value.getEvent());
final Class<?> clazz = decodedEvent.getClass();
LOG.log(Level.FINEST, "RemoteManager: {0} decoded event {1} :: {2}",
new Object[] {this.name, clazz.getCanonicalName(), decodedEvent});
// check remote identifier and message type
final SocketRemoteIdentifier id = new SocketRemoteIdentifier((InetSocketAddress)value.remoteAddress());
final Tuple2<RemoteIdentifier, Class<?>> tuple = new Tuple2<RemoteIdentifier, Class<?>>(id, clazz);
final EventHandler<T> tupleHandler = (EventHandler<T>) this.tupleToHandlerMap.get(tuple);
if (tupleHandler != null) {
LOG.log(Level.FINER, "Tuple handler: {0},{1}",
new Object[] {tuple.getT1(), tuple.getT2().getCanonicalName()});
tupleHandler.onNext(decodedEvent);
} else {
final EventHandler<RemoteMessage<? extends T>> messageHandler = this.msgTypeToHandlerMap.get(clazz);
if (messageHandler == null) {
final RuntimeException ex = new RemoteRuntimeException(
"Unknown message type in dispatch: " + clazz.getCanonicalName() + " from " + id);
LOG.log(Level.WARNING, "Unknown message type in dispatch.", ex);
throw ex;
}
LOG.log(Level.FINER, "Message handler: {0}", clazz.getCanonicalName());
messageHandler.onNext(new DefaultRemoteMessage(id, decodedEvent));
}
}
}