// blob: ce66692fe62a8d4496fe2e1d33e4aee47c7c4576 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.bridge.driver.client;
import com.google.common.collect.Sets;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.bridge.driver.client.parameters.DriverClientDispatchThreadCount;
import org.apache.reef.bridge.driver.client.parameters.ClientDriverStopHandler;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextMessage;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.CompletedEvaluator;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.parameters.*;
import org.apache.reef.driver.restart.DriverRestartCompleted;
import org.apache.reef.driver.restart.DriverRestarted;
import org.apache.reef.driver.task.*;
import org.apache.reef.runtime.common.utils.DispatchingEStage;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.time.event.StartTime;
import org.apache.reef.wake.time.event.StopTime;
import javax.inject.Inject;
import java.util.Set;
/**
 * Async dispatch of client driver events.
 * <p>
 * Events handed to the {@code dispatch*} methods are passed on to
 * {@link DispatchingEStage} instances. The one exception is the stop event:
 * stop handlers are invoked directly on the caller's thread by
 * {@link #dispatch(StopTime)}.
 */
@Private
public final class DriverClientDispatcher {

  /**
   * Exception handler.
   */
  private final DriverClientExceptionHandler exceptionHandler;

  /**
   * Dispatcher used for application provided event handlers.
   */
  private final DispatchingEStage applicationDispatcher;

  /**
   * Dispatcher for client close events.
   */
  private final DispatchingEStage clientCloseDispatcher;

  /**
   * Dispatcher for client close with message events.
   * Kept apart from {@link #clientMessageDispatcher}: both register their
   * handlers under {@code byte[].class}.
   */
  private final DispatchingEStage clientCloseWithMessageDispatcher;

  /**
   * Dispatcher for client messages.
   */
  private final DispatchingEStage clientMessageDispatcher;

  /**
   * The alarm dispatcher.
   */
  private final DispatchingEStage alarmDispatcher;

  /**
   * Driver restart dispatcher.
   * Kept apart from {@link #applicationDispatcher}: {@code RunningTask},
   * {@code ActiveContext} and {@code FailedEvaluator} are registered on both,
   * each time with a different handler set.
   */
  private final DispatchingEStage driverRestartDispatcher;

  /**
   * Synchronous set of stop handlers.
   */
  private final Set<EventHandler<StopTime>> stopHandlers;

  /**
   * Creates a dispatcher without driver restart handlers. The restart
   * dispatcher is still created, but nothing is registered on it.
   *
   * @param driverExceptionHandler exception handler given to the application dispatcher
   * @param alarmDispatchHandler handler registered for alarm ids
   * @param numberOfThreads thread count given to the application dispatcher
   * @param startHandlers application start handlers
   * @param stopHandlers application stop handlers; stored and called synchronously
   * @param contextActiveHandlers application active context handlers
   * @param contextClosedHandlers application closed context handlers
   * @param contextFailedHandlers application failed context handlers
   * @param contextMessageHandlers application context message handlers
   * @param taskRunningHandlers application running task handlers
   * @param taskCompletedHandlers application completed task handlers
   * @param taskSuspendedHandlers application suspended task handlers
   * @param taskMessageEventHandlers application task message handlers
   * @param taskExceptionEventHandlers application failed task handlers
   * @param evaluatorAllocatedHandlers application allocated evaluator handlers
   * @param evaluatorFailedHandlers application failed evaluator handlers
   * @param evaluatorCompletedHandlers application completed evaluator handlers
   * @param clientCloseHandlers client close handlers
   * @param clientCloseWithMessageHandlers client close with message handlers
   * @param clientMessageHandlers client message handlers
   */
  @Inject
  private DriverClientDispatcher(
      final DriverClientExceptionHandler driverExceptionHandler,
      final IAlarmDispatchHandler alarmDispatchHandler,
      @Parameter(DriverClientDispatchThreadCount.class)
      final Integer numberOfThreads,
      // Application-provided start and stop handlers
      @Parameter(DriverStartHandler.class)
      final Set<EventHandler<StartTime>> startHandlers,
      @Parameter(ClientDriverStopHandler.class)
      final Set<EventHandler<StopTime>> stopHandlers,
      // Application-provided Context event handlers
      @Parameter(ContextActiveHandlers.class)
      final Set<EventHandler<ActiveContext>> contextActiveHandlers,
      @Parameter(ContextClosedHandlers.class)
      final Set<EventHandler<ClosedContext>> contextClosedHandlers,
      @Parameter(ContextFailedHandlers.class)
      final Set<EventHandler<FailedContext>> contextFailedHandlers,
      @Parameter(ContextMessageHandlers.class)
      final Set<EventHandler<ContextMessage>> contextMessageHandlers,
      // Application-provided Task event handlers
      @Parameter(TaskRunningHandlers.class)
      final Set<EventHandler<RunningTask>> taskRunningHandlers,
      @Parameter(TaskCompletedHandlers.class)
      final Set<EventHandler<CompletedTask>> taskCompletedHandlers,
      @Parameter(TaskSuspendedHandlers.class)
      final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
      @Parameter(TaskMessageHandlers.class)
      final Set<EventHandler<TaskMessage>> taskMessageEventHandlers,
      @Parameter(TaskFailedHandlers.class)
      final Set<EventHandler<FailedTask>> taskExceptionEventHandlers,
      // Application-provided Evaluator event handlers
      @Parameter(EvaluatorAllocatedHandlers.class)
      final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers,
      @Parameter(EvaluatorFailedHandlers.class)
      final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
      @Parameter(EvaluatorCompletedHandlers.class)
      final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
      // Client handlers
      @Parameter(ClientCloseHandlers.class)
      final Set<EventHandler<Void>> clientCloseHandlers,
      @Parameter(ClientCloseWithMessageHandlers.class)
      final Set<EventHandler<byte[]>> clientCloseWithMessageHandlers,
      @Parameter(ClientMessageHandlers.class)
      final Set<EventHandler<byte[]>> clientMessageHandlers) {
    this.exceptionHandler = driverExceptionHandler;
    this.applicationDispatcher = new DispatchingEStage(
        driverExceptionHandler, numberOfThreads, "ClientDriverDispatcher");
    // Application start and stop handlers
    this.applicationDispatcher.register(StartTime.class, startHandlers);
    this.stopHandlers = stopHandlers; // must be called synchronously
    // Application Context event handlers
    this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers);
    this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers);
    this.applicationDispatcher.register(FailedContext.class, contextFailedHandlers);
    this.applicationDispatcher.register(ContextMessage.class, contextMessageHandlers);
    // Application Task event handlers.
    this.applicationDispatcher.register(RunningTask.class, taskRunningHandlers);
    this.applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers);
    this.applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers);
    this.applicationDispatcher.register(TaskMessage.class, taskMessageEventHandlers);
    this.applicationDispatcher.register(FailedTask.class, taskExceptionEventHandlers);
    // Application Evaluator event handlers
    this.applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers);
    this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedHandlers);
    this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedHandlers);
    // Client event handlers. Each of the dispatchers below is constructed from
    // the application dispatcher but carries its own handler registrations.
    this.clientCloseDispatcher = new DispatchingEStage(this.applicationDispatcher);
    this.clientCloseDispatcher.register(Void.class, clientCloseHandlers);
    this.clientCloseWithMessageDispatcher = new DispatchingEStage(this.applicationDispatcher);
    this.clientCloseWithMessageDispatcher.register(byte[].class, clientCloseWithMessageHandlers);
    this.clientMessageDispatcher = new DispatchingEStage(this.applicationDispatcher);
    this.clientMessageDispatcher.register(byte[].class, clientMessageHandlers);
    // Alarm event handlers
    this.alarmDispatcher = new DispatchingEStage(this.applicationDispatcher);
    this.alarmDispatcher.register(String.class,
        Sets.newHashSet((EventHandler<String>)alarmDispatchHandler));
    // Driver restart dispatcher
    this.driverRestartDispatcher = new DispatchingEStage(this.applicationDispatcher);
  }

  /**
   * Creates a dispatcher that additionally registers driver restart handlers.
   * All parameters shared with the other constructor are passed to it unchanged.
   *
   * @param driverRestartHandlers handlers for the driver restarted event
   * @param driverRestartTaskRunningHandlers running task handlers used on restart
   * @param driverRestartActiveContextHandlers active context handlers used on restart
   * @param driverRestartCompletedHandlers handlers for the driver restart completed event
   * @param driverRestartFailedEvaluatorHandlers failed evaluator handlers used on restart
   */
  @Inject
  private DriverClientDispatcher(
      final DriverClientExceptionHandler driverExceptionHandler,
      final IAlarmDispatchHandler alarmDispatchHandler,
      @Parameter(DriverClientDispatchThreadCount.class)
      final Integer numberOfThreads,
      // Application-provided start and stop handlers
      @Parameter(DriverStartHandler.class)
      final Set<EventHandler<StartTime>> startHandlers,
      @Parameter(ClientDriverStopHandler.class)
      final Set<EventHandler<StopTime>> stopHandlers,
      // Application-provided Context event handlers
      @Parameter(ContextActiveHandlers.class)
      final Set<EventHandler<ActiveContext>> contextActiveHandlers,
      @Parameter(ContextClosedHandlers.class)
      final Set<EventHandler<ClosedContext>> contextClosedHandlers,
      @Parameter(ContextFailedHandlers.class)
      final Set<EventHandler<FailedContext>> contextFailedHandlers,
      @Parameter(ContextMessageHandlers.class)
      final Set<EventHandler<ContextMessage>> contextMessageHandlers,
      // Application-provided Task event handlers
      @Parameter(TaskRunningHandlers.class)
      final Set<EventHandler<RunningTask>> taskRunningHandlers,
      @Parameter(TaskCompletedHandlers.class)
      final Set<EventHandler<CompletedTask>> taskCompletedHandlers,
      @Parameter(TaskSuspendedHandlers.class)
      final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
      @Parameter(TaskMessageHandlers.class)
      final Set<EventHandler<TaskMessage>> taskMessageEventHandlers,
      @Parameter(TaskFailedHandlers.class)
      final Set<EventHandler<FailedTask>> taskExceptionEventHandlers,
      // Application-provided Evaluator event handlers
      @Parameter(EvaluatorAllocatedHandlers.class)
      final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers,
      @Parameter(EvaluatorFailedHandlers.class)
      final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
      @Parameter(EvaluatorCompletedHandlers.class)
      final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
      // Client handlers
      @Parameter(ClientCloseHandlers.class)
      final Set<EventHandler<Void>> clientCloseHandlers,
      @Parameter(ClientCloseWithMessageHandlers.class)
      final Set<EventHandler<byte[]>> clientCloseWithMessageHandlers,
      @Parameter(ClientMessageHandlers.class)
      final Set<EventHandler<byte[]>> clientMessageHandlers,
      // Driver restart handlers
      @Parameter(DriverRestartHandler.class)
      final Set<EventHandler<DriverRestarted>> driverRestartHandlers,
      @Parameter(DriverRestartTaskRunningHandlers.class)
      final Set<EventHandler<RunningTask>> driverRestartTaskRunningHandlers,
      @Parameter(DriverRestartContextActiveHandlers.class)
      final Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers,
      @Parameter(DriverRestartCompletedHandlers.class)
      final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers,
      @Parameter(DriverRestartFailedEvaluatorHandlers.class)
      final Set<EventHandler<FailedEvaluator>> driverRestartFailedEvaluatorHandlers) {
    this(
        driverExceptionHandler,
        alarmDispatchHandler,
        numberOfThreads,
        startHandlers,
        stopHandlers,
        contextActiveHandlers,
        contextClosedHandlers,
        contextFailedHandlers,
        contextMessageHandlers,
        taskRunningHandlers,
        taskCompletedHandlers,
        taskSuspendedHandlers,
        taskMessageEventHandlers,
        taskExceptionEventHandlers,
        evaluatorAllocatedHandlers,
        evaluatorFailedHandlers,
        evaluatorCompletedHandlers,
        clientCloseHandlers,
        clientCloseWithMessageHandlers,
        clientMessageHandlers);
    // Register driver restart handlers.
    this.driverRestartDispatcher.register(DriverRestarted.class, driverRestartHandlers);
    this.driverRestartDispatcher.register(RunningTask.class, driverRestartTaskRunningHandlers);
    this.driverRestartDispatcher.register(ActiveContext.class, driverRestartActiveContextHandlers);
    this.driverRestartDispatcher.register(DriverRestartCompleted.class, driverRestartCompletedHandlers);
    this.driverRestartDispatcher.register(FailedEvaluator.class, driverRestartFailedEvaluatorHandlers);
  }

  /**
   * Passes a driver restarted event to the driver restart dispatcher.
   * @param driverRestarted the restart event
   */
  public void dispatchRestart(final DriverRestarted driverRestarted) {
    this.driverRestartDispatcher.onNext(DriverRestarted.class, driverRestarted);
  }

  /**
   * Passes a running task to the driver restart dispatcher.
   * @param task the running task
   */
  public void dispatchRestart(final RunningTask task) {
    this.driverRestartDispatcher.onNext(RunningTask.class, task);
  }

  /**
   * Passes an active context to the driver restart dispatcher.
   * @param context the active context
   */
  public void dispatchRestart(final ActiveContext context) {
    this.driverRestartDispatcher.onNext(ActiveContext.class, context);
  }

  /**
   * Passes a driver restart completed event to the driver restart dispatcher.
   * @param completed the restart completed event
   */
  public void dispatchRestart(final DriverRestartCompleted completed) {
    this.driverRestartDispatcher.onNext(DriverRestartCompleted.class, completed);
  }

  /**
   * Passes a failed evaluator to the driver restart dispatcher.
   * @param evaluator the failed evaluator
   */
  public void dispatchRestart(final FailedEvaluator evaluator) {
    this.driverRestartDispatcher.onNext(FailedEvaluator.class, evaluator);
  }

  /**
   * Passes the start time event to the application dispatcher.
   * @param startTime start time
   */
  public void dispatch(final StartTime startTime) {
    this.applicationDispatcher.onNext(StartTime.class, startTime);
  }

  /**
   * We must implement this synchronously in order to catch exceptions and
   * forward them back via the bridge before the server shuts down, after
   * this method returns.
   * <p>
   * Every stop handler is invoked, even if an earlier one fails. The handlers
   * live in a {@link Set}, so their order is unspecified; stopping at the
   * first failure would skip an arbitrary subset of them.
   *
   * @param stopTime stop time
   * @return {@code null} if all handlers returned normally; otherwise the
   *     first throwable raised, with any later ones attached as suppressed.
   *     This method itself never throws.
   */
  @SuppressWarnings("checkstyle:illegalCatch")
  public Throwable dispatch(final StopTime stopTime) {
    Throwable failure = null;
    try {
      for (final EventHandler<StopTime> handler : this.stopHandlers) {
        try {
          handler.onNext(stopTime);
        } catch (final Throwable t) {
          // Remember the failure but keep going so the other handlers still run.
          failure = mergeFailure(failure, t);
        }
      }
    } catch (final Throwable t) {
      // Failure of the iteration itself; reported like a handler failure so
      // that the caller still gets a return value rather than an exception.
      failure = mergeFailure(failure, t);
    }
    return failure;
  }

  /**
   * Combines a newly caught throwable with the one recorded so far.
   * @param first the throwable recorded so far, or {@code null} if none
   * @param next the throwable just caught
   * @return {@code next} if nothing was recorded yet, otherwise {@code first}
   *     with {@code next} added as suppressed
   */
  private static Throwable mergeFailure(final Throwable first, final Throwable next) {
    if (first == null) {
      return next;
    }
    if (first != next) { // addSuppressed rejects self-suppression
      first.addSuppressed(next);
    }
    return first;
  }

  /**
   * Passes an active context to the application dispatcher.
   * @param context the active context
   */
  public void dispatch(final ActiveContext context) {
    this.applicationDispatcher.onNext(ActiveContext.class, context);
  }

  /**
   * Passes a closed context to the application dispatcher.
   * @param context the closed context
   */
  public void dispatch(final ClosedContext context) {
    this.applicationDispatcher.onNext(ClosedContext.class, context);
  }

  /**
   * Passes a failed context to the application dispatcher.
   * @param context the failed context
   */
  public void dispatch(final FailedContext context) {
    this.applicationDispatcher.onNext(FailedContext.class, context);
  }

  /**
   * Passes a context message to the application dispatcher.
   * @param message the context message
   */
  public void dispatch(final ContextMessage message) {
    this.applicationDispatcher.onNext(ContextMessage.class, message);
  }

  /**
   * Passes an allocated evaluator to the application dispatcher.
   * @param evaluator the allocated evaluator
   */
  public void dispatch(final AllocatedEvaluator evaluator) {
    this.applicationDispatcher.onNext(AllocatedEvaluator.class, evaluator);
  }

  /**
   * Passes a failed evaluator to the application dispatcher.
   * @param evaluator the failed evaluator
   */
  public void dispatch(final FailedEvaluator evaluator) {
    this.applicationDispatcher.onNext(FailedEvaluator.class, evaluator);
  }

  /**
   * Passes a completed evaluator to the application dispatcher.
   * @param evaluator the completed evaluator
   */
  public void dispatch(final CompletedEvaluator evaluator) {
    this.applicationDispatcher.onNext(CompletedEvaluator.class, evaluator);
  }

  /**
   * Passes a running task to the application dispatcher.
   * @param task the running task
   */
  public void dispatch(final RunningTask task) {
    this.applicationDispatcher.onNext(RunningTask.class, task);
  }

  /**
   * Passes a completed task to the application dispatcher.
   * @param task the completed task
   */
  public void dispatch(final CompletedTask task) {
    this.applicationDispatcher.onNext(CompletedTask.class, task);
  }

  /**
   * Passes a failed task to the application dispatcher.
   * @param task the failed task
   */
  public void dispatch(final FailedTask task) {
    this.applicationDispatcher.onNext(FailedTask.class, task);
  }

  /**
   * Passes a suspended task to the application dispatcher.
   * @param task the suspended task
   */
  public void dispatch(final SuspendedTask task) {
    this.applicationDispatcher.onNext(SuspendedTask.class, task);
  }

  /**
   * Passes a task message to the application dispatcher.
   * @param message the task message
   */
  public void dispatch(final TaskMessage message) {
    this.applicationDispatcher.onNext(TaskMessage.class, message);
  }

  /**
   * Signals a client close to the client close dispatcher. The event carries
   * no payload, so {@code null} is passed as the message.
   */
  public void clientCloseDispatch() {
    this.clientCloseDispatcher.onNext(Void.class, null);
  }

  /**
   * Passes a client close message to the client close with message dispatcher.
   * @param message the message sent along with the close request
   */
  public void clientCloseWithMessageDispatch(final byte[] message) {
    this.clientCloseWithMessageDispatcher.onNext(byte[].class, message);
  }

  /**
   * Passes a client message to the client message dispatcher.
   * @param message the client message
   */
  public void clientMessageDispatch(final byte[] message) {
    this.clientMessageDispatcher.onNext(byte[].class, message);
  }

  /**
   * Passes an alarm id to the alarm dispatcher.
   * @param alarmId the alarm identifier
   */
  public void dispatchAlarm(final String alarmId) {
    this.alarmDispatcher.onNext(String.class, alarmId);
  }
}