/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.vortex.driver;

import org.apache.commons.lang.SerializationUtils;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.evaluator.*;
import org.apache.reef.driver.task.RunningTask;
import org.apache.reef.driver.task.TaskConfiguration;
import org.apache.reef.driver.task.TaskMessage;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.vortex.api.VortexStart;
import org.apache.reef.vortex.common.TaskletFailureReport;
import org.apache.reef.vortex.common.TaskletResultReport;
import org.apache.reef.vortex.common.WorkerReport;
import org.apache.reef.vortex.evaluator.VortexWorker;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.impl.SingleThreadStage;
import org.apache.reef.wake.impl.ThreadPoolStage;
import org.apache.reef.wake.time.event.StartTime;

import javax.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * REEF Driver for Vortex.
 *
 * On start it requests the configured number of evaluators, launches the user's
 * {@link VortexStart} and kicks off the pending-tasklet scheduler. Every allocated
 * evaluator gets a {@link VortexWorker} task; once that task is running it is
 * registered with the {@link VortexMaster} as a worker. Reports sent back by workers
 * are deserialized and forwarded to the master, and failed evaluators are replaced
 * one-for-one until {@code MAX_NUM_OF_FAILURES} is reached.
 */
@Unit
@DriverSide
final class VortexDriver {
  private static final Logger LOG = Logger.getLogger(VortexDriver.class.getName());
  // Number of evaluator failures at which the driver gives up and throws.
  private static final int MAX_NUM_OF_FAILURES = 5;
  private static final int SCHEDULER_EVENT = 0; // Dummy number to comply with onNext() interface

  // Evaluator failures observed so far; compared against MAX_NUM_OF_FAILURES.
  private final AtomicInteger numberOfFailures = new AtomicInteger(0);
  private final EvaluatorRequestor evaluatorRequestor; // for requesting resources
  private final VortexMaster vortexMaster; // Vortex Master
  private final VortexRequestor vortexRequestor; // For sending Commands to remote workers

  // Resource configuration used when requesting worker evaluators
  private final int evalMem;   // memory per evaluator, passed to EvaluatorRequest#setMemory
  private final int evalNum;   // number of evaluators in the initial request
  private final int evalCores; // cores per evaluator; also used as the worker's thread count

  private final EStage<VortexStart> vortexStartEStage;
  private final VortexStart vortexStart;
  private final EStage<Integer> pendingTaskletSchedulerEStage;

  /**
   * Injectable constructor; wires the driver's collaborators and creates the two stages
   * that run the user's start code and the pending-tasklet scheduler.
   *
   * @param evaluatorRequestor used to request evaluators
   * @param vortexRequestor handed to each {@link VortexWorkerManager} for talking to its worker
   * @param vortexMaster notified about worker allocation/preemption and tasklet completion/failure
   * @param vortexStart the event fed to the start stage when the driver starts
   * @param vortexStartExecutor handler backing the start stage
   * @param pendingTaskletScheduler handler backing the scheduler stage
   * @param workerMem memory to request per evaluator
   * @param workerNum number of evaluators to request initially
   * @param workerCores cores to request per evaluator
   * @param numOfStartThreads number of threads given to the start stage
   */
  @Inject
  private VortexDriver(final EvaluatorRequestor evaluatorRequestor,
                       final VortexRequestor vortexRequestor,
                       final VortexMaster vortexMaster,
                       final VortexStart vortexStart,
                       final VortexStartExecutor vortexStartExecutor,
                       final PendingTaskletScheduler pendingTaskletScheduler,
                       @Parameter(VortexMasterConf.WorkerMem.class) final int workerMem,
                       @Parameter(VortexMasterConf.WorkerNum.class) final int workerNum,
                       @Parameter(VortexMasterConf.WorkerCores.class) final int workerCores,
                       @Parameter(VortexMasterConf.NumberOfVortexStartThreads.class) final int numOfStartThreads) {
    this.vortexStartEStage = new ThreadPoolStage<>(vortexStartExecutor, numOfStartThreads);
    this.vortexStart = vortexStart;
    // NOTE(review): the second argument is presumably the stage's queue capacity - confirm against Wake.
    this.pendingTaskletSchedulerEStage = new SingleThreadStage<>(pendingTaskletScheduler, 1);
    this.evaluatorRequestor = evaluatorRequestor;
    this.vortexMaster = vortexMaster;
    this.vortexRequestor = vortexRequestor;
    this.evalMem = workerMem;
    this.evalNum = workerNum;
    this.evalCores = workerCores;
  }

  /**
   * Driver started: requests the initial evaluators, then hands {@link VortexStart}
   * and the scheduler event to their respective stages.
   */
  final class StartHandler implements EventHandler<StartTime> {
    @Override
    public void onNext(final StartTime startTime) {
      // Initial Evaluator Request
      evaluatorRequestor.submit(EvaluatorRequest.newBuilder()
          .setNumber(evalNum)
          .setMemory(evalMem)
          .setNumberOfCores(evalCores)
          .build());

      // Run Vortex Start
      vortexStartEStage.onNext(vortexStart);

      // Run Scheduler
      pendingTaskletSchedulerEStage.onNext(SCHEDULER_EVENT);
    }
  }

  /**
   * Container allocated: submits a {@link VortexWorker} task to the new evaluator.
   */
  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
    @Override
    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
      LOG.log(Level.INFO, "Container allocated");
      // The task identifier is derived from the evaluator id.
      final String workerId = allocatedEvaluator.getId() + "_vortex_worker";

      final Configuration workerConfiguration = VortexWorkerConf.CONF
          .set(VortexWorkerConf.NUM_OF_THREADS, evalCores) // NUM_OF_THREADS = evalCores
          .build();

      final Configuration taskConfiguration = TaskConfiguration.CONF
          .set(TaskConfiguration.IDENTIFIER, workerId)
          .set(TaskConfiguration.TASK, VortexWorker.class)
          .set(TaskConfiguration.ON_SEND_MESSAGE, VortexWorker.class)
          .set(TaskConfiguration.ON_MESSAGE, VortexWorker.DriverMessageHandler.class)
          .set(TaskConfiguration.ON_CLOSE, VortexWorker.TaskCloseHandler.class)
          .build();

      allocatedEvaluator.submitTask(Configurations.merge(workerConfiguration, taskConfiguration));
    }
  }

  /**
   * Worker task up and running: registers it with the {@link VortexMaster}.
   */
  final class RunningTaskHandler implements EventHandler<RunningTask> {
    @Override
    public void onNext(final RunningTask reefTask) {
      LOG.log(Level.INFO, "Worker up and running");
      vortexMaster.workerAllocated(new VortexWorkerManager(vortexRequestor, reefTask));
    }
  }

  /**
   * Message received from a worker: deserializes the {@link WorkerReport} and forwards
   * the tasklet result or failure to the {@link VortexMaster}.
   */
  final class TaskMessageHandler implements EventHandler<TaskMessage> {
    @Override
    public void onNext(final TaskMessage taskMessage) {
      final String workerId = taskMessage.getId();
      final WorkerReport workerReport = (WorkerReport) SerializationUtils.deserialize(taskMessage.get());
      switch (workerReport.getType()) {
      case TaskletResult:
        final TaskletResultReport taskletResultReport = (TaskletResultReport) workerReport;
        vortexMaster.taskletCompleted(workerId, taskletResultReport.getTaskletId(), taskletResultReport.getResult());
        break;
      case TaskletFailure:
        final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) workerReport;
        vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletId(), taskletFailureReport.getException());
        break;
      default:
        throw new RuntimeException("Unknown Report");
      }
    }
  }

  /**
   * Evaluator preempted: counts the failure, requests a replacement evaluator and, when a
   * failed task is attached to the event, reports that worker as preempted to the master.
   * Throws a {@link RuntimeException} once {@code MAX_NUM_OF_FAILURES} failures have occurred.
   * TODO[REEF-501]: Distinguish different types of FailedEvaluator in Vortex.
   */
  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
    @Override
    public void onNext(final FailedEvaluator failedEvaluator) {
      LOG.log(Level.INFO, "Evaluator preempted");
      if (numberOfFailures.incrementAndGet() >= MAX_NUM_OF_FAILURES) {
        throw new RuntimeException("Exceeded max number of failures");
      }

      // We request a new evaluator to take the place of the preempted one
      evaluatorRequestor.submit(EvaluatorRequest.newBuilder()
          .setNumber(1)
          .setMemory(evalMem)
          .setNumberOfCores(evalCores)
          .build());

      // getFailedTask() is optional; presumably it is empty when the evaluator failed before
      // its worker task was running. In that case there is no task id to report, so skip the
      // master notification instead of failing on the missing value.
      if (failedEvaluator.getFailedTask().isPresent()) {
        vortexMaster.workerPreempted(failedEvaluator.getFailedTask().get().getId());
      } else {
        LOG.log(Level.WARNING, "Evaluator {0} failed without a failed task; no worker reported as preempted",
            failedEvaluator.getId());
      }
    }
  }
}
