| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.reef.bridge.driver.client; |
| |
| import com.google.common.collect.Sets; |
| import org.apache.reef.annotations.audience.Private; |
| import org.apache.reef.bridge.driver.client.parameters.DriverClientDispatchThreadCount; |
| import org.apache.reef.bridge.driver.client.parameters.ClientDriverStopHandler; |
| import org.apache.reef.driver.context.ActiveContext; |
| import org.apache.reef.driver.context.ClosedContext; |
| import org.apache.reef.driver.context.ContextMessage; |
| import org.apache.reef.driver.context.FailedContext; |
| import org.apache.reef.driver.evaluator.AllocatedEvaluator; |
| import org.apache.reef.driver.evaluator.CompletedEvaluator; |
| import org.apache.reef.driver.evaluator.FailedEvaluator; |
| import org.apache.reef.driver.parameters.*; |
| import org.apache.reef.driver.restart.DriverRestartCompleted; |
| import org.apache.reef.driver.restart.DriverRestarted; |
| import org.apache.reef.driver.task.*; |
| import org.apache.reef.runtime.common.utils.DispatchingEStage; |
| import org.apache.reef.tang.annotations.Parameter; |
| import org.apache.reef.wake.EventHandler; |
| import org.apache.reef.wake.time.event.StartTime; |
| import org.apache.reef.wake.time.event.StopTime; |
| |
| import javax.inject.Inject; |
| import java.util.Set; |
| |
| /** |
| * Async dispatch of client driver events. |
| */ |
| @Private |
| public final class DriverClientDispatcher { |
| |
| /** |
| * Exception handler. |
| */ |
| private final DriverClientExceptionHandler exceptionHandler; |
| |
| /** |
| * Dispatcher used for application provided event handlers. |
| */ |
| private final DispatchingEStage applicationDispatcher; |
| |
| /** |
| * Dispatcher for client close events. |
| */ |
| private final DispatchingEStage clientCloseDispatcher; |
| |
| /** |
| * Dispatcher for client close with message events. |
| */ |
| private final DispatchingEStage clientCloseWithMessageDispatcher; |
| |
| /** |
| * Dispatcher for client messages. |
| */ |
| private final DispatchingEStage clientMessageDispatcher; |
| |
| /** |
| * The alarm dispatcher. |
| */ |
| private final DispatchingEStage alarmDispatcher; |
| |
| /** |
| * Driver restart dispatcher. |
| */ |
| private final DispatchingEStage driverRestartDispatcher; |
| |
| |
| /** |
| * Synchronous set of stop handlers. |
| */ |
| private final Set<EventHandler<StopTime>> stopHandlers; |
| |
| @Inject |
| private DriverClientDispatcher( |
| final DriverClientExceptionHandler driverExceptionHandler, |
| final IAlarmDispatchHandler alarmDispatchHandler, |
| @Parameter(DriverClientDispatchThreadCount.class) |
| final Integer numberOfThreads, |
| // Application-provided start and stop handlers |
| @Parameter(DriverStartHandler.class) |
| final Set<EventHandler<StartTime>> startHandlers, |
| @Parameter(ClientDriverStopHandler.class) |
| final Set<EventHandler<StopTime>> stopHandlers, |
| // Application-provided Context event handlers |
| @Parameter(ContextActiveHandlers.class) |
| final Set<EventHandler<ActiveContext>> contextActiveHandlers, |
| @Parameter(ContextClosedHandlers.class) |
| final Set<EventHandler<ClosedContext>> contextClosedHandlers, |
| @Parameter(ContextFailedHandlers.class) |
| final Set<EventHandler<FailedContext>> contextFailedHandlers, |
| @Parameter(ContextMessageHandlers.class) |
| final Set<EventHandler<ContextMessage>> contextMessageHandlers, |
| // Application-provided Task event handlers |
| @Parameter(TaskRunningHandlers.class) |
| final Set<EventHandler<RunningTask>> taskRunningHandlers, |
| @Parameter(TaskCompletedHandlers.class) |
| final Set<EventHandler<CompletedTask>> taskCompletedHandlers, |
| @Parameter(TaskSuspendedHandlers.class) |
| final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers, |
| @Parameter(TaskMessageHandlers.class) |
| final Set<EventHandler<TaskMessage>> taskMessageEventHandlers, |
| @Parameter(TaskFailedHandlers.class) |
| final Set<EventHandler<FailedTask>> taskExceptionEventHandlers, |
| // Application-provided Evaluator event handlers |
| @Parameter(EvaluatorAllocatedHandlers.class) |
| final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers, |
| @Parameter(EvaluatorFailedHandlers.class) |
| final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers, |
| @Parameter(EvaluatorCompletedHandlers.class) |
| final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers, |
| // Client handlers |
| @Parameter(ClientCloseHandlers.class) |
| final Set<EventHandler<Void>> clientCloseHandlers, |
| @Parameter(ClientCloseWithMessageHandlers.class) |
| final Set<EventHandler<byte[]>> clientCloseWithMessageHandlers, |
| @Parameter(ClientMessageHandlers.class) |
| final Set<EventHandler<byte[]>> clientMessageHandlers) { |
| this.exceptionHandler = driverExceptionHandler; |
| this.applicationDispatcher = new DispatchingEStage( |
| driverExceptionHandler, numberOfThreads, "ClientDriverDispatcher"); |
| // Application start and stop handlers |
| this.applicationDispatcher.register(StartTime.class, startHandlers); |
| this.stopHandlers = stopHandlers; // must be called synchronously |
| // Application Context event handlers |
| this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers); |
| this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers); |
| this.applicationDispatcher.register(FailedContext.class, contextFailedHandlers); |
| this.applicationDispatcher.register(ContextMessage.class, contextMessageHandlers); |
| |
| // Application Task event handlers. |
| this.applicationDispatcher.register(RunningTask.class, taskRunningHandlers); |
| this.applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers); |
| this.applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers); |
| this.applicationDispatcher.register(TaskMessage.class, taskMessageEventHandlers); |
| this.applicationDispatcher.register(FailedTask.class, taskExceptionEventHandlers); |
| |
| // Application Evaluator event handlers |
| this.applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers); |
| this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedHandlers); |
| this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedHandlers); |
| |
| // Client event handlers; |
| this.clientCloseDispatcher = new DispatchingEStage(this.applicationDispatcher); |
| this.clientCloseDispatcher.register(Void.class, clientCloseHandlers); |
| |
| this.clientCloseWithMessageDispatcher = new DispatchingEStage(this.applicationDispatcher); |
| this.clientCloseWithMessageDispatcher.register(byte[].class, clientCloseWithMessageHandlers); |
| |
| this.clientMessageDispatcher = new DispatchingEStage(this.applicationDispatcher); |
| this.clientMessageDispatcher.register(byte[].class, clientMessageHandlers); |
| |
| // Alarm event handlers |
| this.alarmDispatcher = new DispatchingEStage(this.applicationDispatcher); |
| this.alarmDispatcher.register(String.class, |
| Sets.newHashSet((EventHandler<String>)alarmDispatchHandler)); |
| |
| // Driver restart dispatcher |
| this.driverRestartDispatcher = new DispatchingEStage(this.applicationDispatcher); |
| } |
| |
| @Inject |
| private DriverClientDispatcher( |
| final DriverClientExceptionHandler driverExceptionHandler, |
| final IAlarmDispatchHandler alarmDispatchHandler, |
| @Parameter(DriverClientDispatchThreadCount.class) |
| final Integer numberOfThreads, |
| // Application-provided start and stop handlers |
| @Parameter(DriverStartHandler.class) |
| final Set<EventHandler<StartTime>> startHandlers, |
| @Parameter(ClientDriverStopHandler.class) |
| final Set<EventHandler<StopTime>> stopHandlers, |
| // Application-provided Context event handlers |
| @Parameter(ContextActiveHandlers.class) |
| final Set<EventHandler<ActiveContext>> contextActiveHandlers, |
| @Parameter(ContextClosedHandlers.class) |
| final Set<EventHandler<ClosedContext>> contextClosedHandlers, |
| @Parameter(ContextFailedHandlers.class) |
| final Set<EventHandler<FailedContext>> contextFailedHandlers, |
| @Parameter(ContextMessageHandlers.class) |
| final Set<EventHandler<ContextMessage>> contextMessageHandlers, |
| // Application-provided Task event handlers |
| @Parameter(TaskRunningHandlers.class) |
| final Set<EventHandler<RunningTask>> taskRunningHandlers, |
| @Parameter(TaskCompletedHandlers.class) |
| final Set<EventHandler<CompletedTask>> taskCompletedHandlers, |
| @Parameter(TaskSuspendedHandlers.class) |
| final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers, |
| @Parameter(TaskMessageHandlers.class) |
| final Set<EventHandler<TaskMessage>> taskMessageEventHandlers, |
| @Parameter(TaskFailedHandlers.class) |
| final Set<EventHandler<FailedTask>> taskExceptionEventHandlers, |
| // Application-provided Evaluator event handlers |
| @Parameter(EvaluatorAllocatedHandlers.class) |
| final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers, |
| @Parameter(EvaluatorFailedHandlers.class) |
| final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers, |
| @Parameter(EvaluatorCompletedHandlers.class) |
| final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers, |
| // Client handlers |
| @Parameter(ClientCloseHandlers.class) |
| final Set<EventHandler<Void>> clientCloseHandlers, |
| @Parameter(ClientCloseWithMessageHandlers.class) |
| final Set<EventHandler<byte[]>> clientCloseWithMessageHandlers, |
| @Parameter(ClientMessageHandlers.class) |
| final Set<EventHandler<byte[]>> clientMessageHandlers, |
| // Driver restart handlers |
| @Parameter(DriverRestartHandler.class) |
| final Set<EventHandler<DriverRestarted>> driverRestartHandlers, |
| @Parameter(DriverRestartTaskRunningHandlers.class) |
| final Set<EventHandler<RunningTask>> driverRestartTaskRunningHandlers, |
| @Parameter(DriverRestartContextActiveHandlers.class) |
| final Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers, |
| @Parameter(DriverRestartCompletedHandlers.class) |
| final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers, |
| @Parameter(DriverRestartFailedEvaluatorHandlers.class) |
| final Set<EventHandler<FailedEvaluator>> driverRestartFailedEvaluatorHandlers) { |
| this( |
| driverExceptionHandler, |
| alarmDispatchHandler, |
| numberOfThreads, |
| startHandlers, |
| stopHandlers, |
| contextActiveHandlers, |
| contextClosedHandlers, |
| contextFailedHandlers, |
| contextMessageHandlers, |
| taskRunningHandlers, |
| taskCompletedHandlers, |
| taskSuspendedHandlers, |
| taskMessageEventHandlers, |
| taskExceptionEventHandlers, |
| evaluatorAllocatedHandlers, |
| evaluatorFailedHandlers, |
| evaluatorCompletedHandlers, |
| clientCloseHandlers, |
| clientCloseWithMessageHandlers, |
| clientMessageHandlers); |
| // Register driver restart handlers. |
| this.driverRestartDispatcher.register(DriverRestarted.class, driverRestartHandlers); |
| this.driverRestartDispatcher.register(RunningTask.class, driverRestartTaskRunningHandlers); |
| this.driverRestartDispatcher.register(ActiveContext.class, driverRestartActiveContextHandlers); |
| this.driverRestartDispatcher.register(DriverRestartCompleted.class, driverRestartCompletedHandlers); |
| this.driverRestartDispatcher.register(FailedEvaluator.class, driverRestartFailedEvaluatorHandlers); |
| } |
| |
| public void dispatchRestart(final DriverRestarted driverRestarted) { |
| this.driverRestartDispatcher.onNext(DriverRestarted.class, driverRestarted); |
| } |
| |
| public void dispatchRestart(final RunningTask task) { |
| this.driverRestartDispatcher.onNext(RunningTask.class, task); |
| } |
| |
| public void dispatchRestart(final ActiveContext context) { |
| this.driverRestartDispatcher.onNext(ActiveContext.class, context); |
| } |
| |
| public void dispatchRestart(final DriverRestartCompleted completed) { |
| this.driverRestartDispatcher.onNext(DriverRestartCompleted.class, completed); |
| } |
| |
| public void dispatchRestart(final FailedEvaluator evaluator) { |
| this.driverRestartDispatcher.onNext(FailedEvaluator.class, evaluator); |
| } |
| |
| public void dispatch(final StartTime startTime) { |
| this.applicationDispatcher.onNext(StartTime.class, startTime); |
| } |
| |
| /** |
| * We must implement this synchronously in order to catch exceptions and |
| * forward them back via the bridge before the server shuts down, after |
| * this method returns. |
| * @param stopTime stop time |
| */ |
| @SuppressWarnings("checkstyle:illegalCatch") |
| public Throwable dispatch(final StopTime stopTime) { |
| try { |
| for (final EventHandler<StopTime> handler : stopHandlers) { |
| handler.onNext(stopTime); |
| } |
| return null; |
| } catch (Throwable t) { |
| return t; |
| } |
| } |
| |
| public void dispatch(final ActiveContext context) { |
| this.applicationDispatcher.onNext(ActiveContext.class, context); |
| } |
| |
| public void dispatch(final ClosedContext context) { |
| this.applicationDispatcher.onNext(ClosedContext.class, context); |
| } |
| |
| public void dispatch(final FailedContext context) { |
| this.applicationDispatcher.onNext(FailedContext.class, context); |
| } |
| |
| public void dispatch(final ContextMessage message) { |
| this.applicationDispatcher.onNext(ContextMessage.class, message); |
| } |
| |
| public void dispatch(final AllocatedEvaluator evaluator) { |
| this.applicationDispatcher.onNext(AllocatedEvaluator.class, evaluator); |
| } |
| |
| public void dispatch(final FailedEvaluator evaluator) { |
| this.applicationDispatcher.onNext(FailedEvaluator.class, evaluator); |
| } |
| |
| public void dispatch(final CompletedEvaluator evaluator) { |
| this.applicationDispatcher.onNext(CompletedEvaluator.class, evaluator); |
| } |
| |
| public void dispatch(final RunningTask task) { |
| this.applicationDispatcher.onNext(RunningTask.class, task); |
| } |
| |
| public void dispatch(final CompletedTask task) { |
| this.applicationDispatcher.onNext(CompletedTask.class, task); |
| } |
| |
| public void dispatch(final FailedTask task) { |
| this.applicationDispatcher.onNext(FailedTask.class, task); |
| } |
| |
| public void dispatch(final SuspendedTask task) { |
| this.applicationDispatcher.onNext(SuspendedTask.class, task); |
| } |
| |
| public void dispatch(final TaskMessage message) { |
| this.applicationDispatcher.onNext(TaskMessage.class, message); |
| } |
| |
| public void clientCloseDispatch() { |
| this.clientCloseDispatcher.onNext(Void.class, null); |
| } |
| |
| public void clientCloseWithMessageDispatch(final byte[] message) { |
| this.clientCloseWithMessageDispatcher.onNext(byte[].class, message); |
| } |
| |
| public void clientMessageDispatch(final byte[] message) { |
| this.clientMessageDispatcher.onNext(byte[].class, message); |
| } |
| |
| public void dispatchAlarm(final String alarmId) { |
| this.alarmDispatcher.onNext(String.class, alarmId); |
| } |
| } |