/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.common.driver.evaluator;

import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextMessage;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.CompletedEvaluator;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.parameters.*;
import org.apache.reef.driver.task.*;
import org.apache.reef.runtime.common.driver.DriverExceptionHandler;
import org.apache.reef.runtime.common.utils.DispatchingEStage;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;

import javax.inject.Inject;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Central dispatcher for all Evaluator related events. This exists once per Evaluator.
 */
public final class EvaluatorMessageDispatcher implements AutoCloseable {

  private static final Logger LOG = Logger.getLogger(EvaluatorMessageDispatcher.class.getName());

  private final String evaluatorIdentifier;

  /**
   * Dispatcher used for application provided event handlers.
   */
  private final DispatchingEStage applicationDispatcher;

  /**
   * Dispatcher used for service provided event handlers.
   */
  private final DispatchingEStage serviceDispatcher;

  /**
   * Dispatcher used for application provided driver-restart specific event handlers.
   */
  private final DispatchingEStage driverRestartApplicationDispatcher;

  /**
   * Dispatcher used for service provided driver-restart specific event handlers.
   */
  private final DispatchingEStage driverRestartServiceDispatcher;

  @Inject
  private EvaluatorMessageDispatcher(
      // Application-provided Context event handlers
      @Parameter(ContextActiveHandlers.class) final Set<EventHandler<ActiveContext>> contextActiveHandlers,
      @Parameter(ContextClosedHandlers.class) final Set<EventHandler<ClosedContext>> contextClosedHandlers,
      @Parameter(ContextFailedHandlers.class) final Set<EventHandler<FailedContext>> contextFailedHandlers,
      @Parameter(ContextMessageHandlers.class) final Set<EventHandler<ContextMessage>> contextMessageHandlers,
      // Service-provided Context event handlers
      @Parameter(ServiceContextActiveHandlers.class)
      final Set<EventHandler<ActiveContext>> serviceContextActiveHandlers,
      @Parameter(ServiceContextClosedHandlers.class)
      final Set<EventHandler<ClosedContext>> serviceContextClosedHandlers,
      @Parameter(ServiceContextFailedHandlers.class)
      final Set<EventHandler<FailedContext>> serviceContextFailedHandlers,
      @Parameter(ServiceContextMessageHandlers.class)
      final Set<EventHandler<ContextMessage>> serviceContextMessageHandlers,
      // Application-provided Task event handlers
      @Parameter(TaskRunningHandlers.class) final Set<EventHandler<RunningTask>> taskRunningHandlers,
      @Parameter(TaskCompletedHandlers.class) final Set<EventHandler<CompletedTask>> taskCompletedHandlers,
      @Parameter(TaskSuspendedHandlers.class) final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
      @Parameter(TaskMessageHandlers.class) final Set<EventHandler<TaskMessage>> taskMessageEventHandlers,
      @Parameter(TaskFailedHandlers.class) final Set<EventHandler<FailedTask>> taskExceptionEventHandlers,
      // Service-provided Task event handlers
      @Parameter(ServiceTaskRunningHandlers.class) final Set<EventHandler<RunningTask>> serviceTaskRunningEventHandlers,
      @Parameter(ServiceTaskCompletedHandlers.class)
      final Set<EventHandler<CompletedTask>> serviceTaskCompletedEventHandlers,
      @Parameter(ServiceTaskSuspendedHandlers.class)
      final Set<EventHandler<SuspendedTask>> serviceTaskSuspendedEventHandlers,
      @Parameter(ServiceTaskMessageHandlers.class) final Set<EventHandler<TaskMessage>> serviceTaskMessageEventHandlers,
      @Parameter(ServiceTaskFailedHandlers.class) final Set<EventHandler<FailedTask>> serviceTaskExceptionEventHandlers,
      // Application-provided Evaluator event handlers
      @Parameter(EvaluatorAllocatedHandlers.class)
      final Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers,
      @Parameter(EvaluatorFailedHandlers.class) final Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
      @Parameter(EvaluatorCompletedHandlers.class)
      final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
      // Service-provided Evaluator event handlers
      @Parameter(ServiceEvaluatorAllocatedHandlers.class)
      final Set<EventHandler<AllocatedEvaluator>> serviceEvaluatorAllocatedEventHandlers,
      @Parameter(ServiceEvaluatorFailedHandlers.class)
      final Set<EventHandler<FailedEvaluator>> serviceEvaluatorFailedHandlers,
      @Parameter(ServiceEvaluatorCompletedHandlers.class)
      final Set<EventHandler<CompletedEvaluator>> serviceEvaluatorCompletedHandlers,

      // Application event handlers specific to a Driver restart
      @Parameter(DriverRestartTaskRunningHandlers.class)
      final Set<EventHandler<RunningTask>> driverRestartTaskRunningHandlers,
      @Parameter(DriverRestartContextActiveHandlers.class)
      final Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers,
      @Parameter(DriverRestartFailedEvaluatorHandlers.class)
      final Set<EventHandler<FailedEvaluator>> driverRestartEvaluatorFailedHandlers,

      // Service-provided event handlers specific to a Driver restart
      @Parameter(ServiceDriverRestartTaskRunningHandlers.class)
      final Set<EventHandler<RunningTask>> serviceDriverRestartTaskRunningHandlers,
      @Parameter(ServiceDriverRestartContextActiveHandlers.class)
      final Set<EventHandler<ActiveContext>> serviceDriverRestartActiveContextHandlers,
      @Parameter(ServiceDriverRestartFailedEvaluatorHandlers.class)
      final Set<EventHandler<FailedEvaluator>> serviceDriverRestartFailedEvaluatorHandlers,

      @Parameter(EvaluatorDispatcherThreads.class) final int numberOfThreads,
      @Parameter(EvaluatorManager.EvaluatorIdentifier.class) final String evaluatorIdentifier,
      final DriverExceptionHandler driverExceptionHandler,
      final IdlenessCallbackEventHandlerFactory idlenessCallbackEventHandlerFactory) {

    LOG.log(Level.FINER, "Creating message dispatcher for {0}", evaluatorIdentifier);

    this.evaluatorIdentifier = evaluatorIdentifier;
    this.serviceDispatcher = new DispatchingEStage(
        driverExceptionHandler, numberOfThreads, "EvaluatorMessageDispatcher:" + evaluatorIdentifier);

    this.applicationDispatcher = new DispatchingEStage(this.serviceDispatcher);
    this.driverRestartApplicationDispatcher = new DispatchingEStage(this.serviceDispatcher);
    this.driverRestartServiceDispatcher = new DispatchingEStage(this.serviceDispatcher);

    // Application Context event handlers
    this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers);
    this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers);
    this.applicationDispatcher.register(FailedContext.class, contextFailedHandlers);
    this.applicationDispatcher.register(ContextMessage.class, contextMessageHandlers);

    // Service Context event handlers
    this.serviceDispatcher.register(ActiveContext.class, serviceContextActiveHandlers);
    this.serviceDispatcher.register(ClosedContext.class, serviceContextClosedHandlers);
    this.serviceDispatcher.register(FailedContext.class, serviceContextFailedHandlers);
    this.serviceDispatcher.register(ContextMessage.class, serviceContextMessageHandlers);

    // Application Task event handlers.
    this.applicationDispatcher.register(RunningTask.class, taskRunningHandlers);
    this.applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers);
    this.applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers);
    this.applicationDispatcher.register(TaskMessage.class, taskMessageEventHandlers);
    this.applicationDispatcher.register(FailedTask.class, taskExceptionEventHandlers);

    // Service Task event handlers
    this.serviceDispatcher.register(RunningTask.class, serviceTaskRunningEventHandlers);
    this.serviceDispatcher.register(CompletedTask.class, serviceTaskCompletedEventHandlers);
    this.serviceDispatcher.register(SuspendedTask.class, serviceTaskSuspendedEventHandlers);
    this.serviceDispatcher.register(TaskMessage.class, serviceTaskMessageEventHandlers);
    this.serviceDispatcher.register(FailedTask.class, serviceTaskExceptionEventHandlers);

    // Application Evaluator event handlers
    this.applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers);

    // Service Evaluator event handlers
    this.serviceDispatcher.register(FailedEvaluator.class, serviceEvaluatorFailedHandlers);
    this.serviceDispatcher.register(CompletedEvaluator.class, serviceEvaluatorCompletedHandlers);
    this.serviceDispatcher.register(AllocatedEvaluator.class, serviceEvaluatorAllocatedEventHandlers);

    // Application event handlers specific to a Driver restart
    this.driverRestartApplicationDispatcher.register(RunningTask.class, driverRestartTaskRunningHandlers);
    this.driverRestartApplicationDispatcher.register(ActiveContext.class, driverRestartActiveContextHandlers);

    final Set<EventHandler<FailedEvaluator>> driverRestartEvaluatorFailedCallbackHandlers = new HashSet<>();
    for (final EventHandler<FailedEvaluator> evaluatorFailedHandler : driverRestartEvaluatorFailedHandlers) {
      driverRestartEvaluatorFailedCallbackHandlers.add(
          idlenessCallbackEventHandlerFactory.createIdlenessCallbackWrapperHandler(evaluatorFailedHandler));
    }

    this.driverRestartApplicationDispatcher.register(
        FailedEvaluator.class, driverRestartEvaluatorFailedCallbackHandlers);

    // Service event handlers specific to a Driver restart
    this.driverRestartServiceDispatcher.register(RunningTask.class, serviceDriverRestartTaskRunningHandlers);
    this.driverRestartServiceDispatcher.register(ActiveContext.class, serviceDriverRestartActiveContextHandlers);
    this.driverRestartServiceDispatcher.register(FailedEvaluator.class, serviceDriverRestartFailedEvaluatorHandlers);

    final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedCallbackHandlers = new HashSet<>();
    for (final EventHandler<CompletedEvaluator> evaluatorCompletedHandler : evaluatorCompletedHandlers) {
      evaluatorCompletedCallbackHandlers.add(
          idlenessCallbackEventHandlerFactory.createIdlenessCallbackWrapperHandler(evaluatorCompletedHandler));
    }
    this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedCallbackHandlers);

    final Set<EventHandler<FailedEvaluator>> evaluatorFailedCallbackHandlers = new HashSet<>();
    for (final EventHandler<FailedEvaluator> evaluatorFailedHandler : evaluatorFailedHandlers) {
      evaluatorFailedCallbackHandlers.add(
          idlenessCallbackEventHandlerFactory.createIdlenessCallbackWrapperHandler(evaluatorFailedHandler));
    }
    this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedCallbackHandlers);

    LOG.log(Level.FINE, "Instantiated 'EvaluatorMessageDispatcher'");
  }

  public void onEvaluatorAllocated(final AllocatedEvaluator allocatedEvaluator) {
    this.dispatch(AllocatedEvaluator.class, allocatedEvaluator);
  }

  public void onEvaluatorFailed(final FailedEvaluator failedEvaluator) {
    this.dispatch(FailedEvaluator.class, failedEvaluator);
  }

  public void onEvaluatorCompleted(final CompletedEvaluator completedEvaluator) {
    this.dispatch(CompletedEvaluator.class, completedEvaluator);
  }

  public void onTaskRunning(final RunningTask runningTask) {
    this.dispatch(RunningTask.class, runningTask);
  }

  public void onTaskCompleted(final CompletedTask completedTask) {
    this.dispatch(CompletedTask.class, completedTask);
  }

  public void onTaskSuspended(final SuspendedTask suspendedTask) {
    this.dispatch(SuspendedTask.class, suspendedTask);
  }

  public void onTaskMessage(final TaskMessage taskMessage) {
    this.dispatch(TaskMessage.class, taskMessage);
  }

  public void onTaskFailed(final FailedTask failedTask) {
    this.dispatch(FailedTask.class, failedTask);
  }

  public void onContextActive(final ActiveContext activeContext) {
    this.dispatch(ActiveContext.class, activeContext);
  }

  public void onContextClose(final ClosedContext closedContext) {
    this.dispatch(ClosedContext.class, closedContext);
  }

  public void onContextFailed(final FailedContext failedContext) {
    this.dispatch(FailedContext.class, failedContext);
  }

  public void onContextMessage(final ContextMessage contextMessage) {
    this.dispatch(ContextMessage.class, contextMessage);
  }

  public void onDriverRestartTaskRunning(final RunningTask runningTask) {
    this.dispatchForRestartedDriver(RunningTask.class, runningTask);
  }

  public void onDriverRestartContextActive(final ActiveContext activeContext) {
    this.dispatchForRestartedDriver(ActiveContext.class, activeContext);
  }

  public void onDriverRestartEvaluatorFailed(final FailedEvaluator failedEvaluator) {
    this.dispatchForRestartedDriver(FailedEvaluator.class, failedEvaluator);
  }

  boolean isEmpty() {
    return this.applicationDispatcher.isEmpty();
  }

  private <T, U extends T> void dispatch(final Class<T> type, final U message) {
    this.serviceDispatcher.onNext(type, message);
    this.applicationDispatcher.onNext(type, message);
  }

  private <T, U extends T> void dispatchForRestartedDriver(final Class<T> type, final U message) {
    this.driverRestartServiceDispatcher.onNext(type, message);
    this.driverRestartApplicationDispatcher.onNext(type, message);
  }

  @Override
  public void close() {
    LOG.log(Level.FINER, "Closing message dispatcher for {0}", this.evaluatorIdentifier);
    // This effectively closes all dispatchers as they share the same stage.
    this.serviceDispatcher.close();
    if (!this.serviceDispatcher.isClosed()) {
      LOG.log(Level.SEVERE,
          "Closing message dispatcher for {0}: ThreadPool for service dispatcher failed to close",
          this.evaluatorIdentifier);
    }
  }
}
