/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.indexing.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.server.DruidNode;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * TaskRunner implemention for the CliIndexer task execution service, which runs all tasks in a single process.
 *
 * Two thread pools are used:
 * - A task execution pool, sized to number of worker slots. This is used to setup and execute the Task run() methods.
 * - A control thread pool, sized to number of worker slots. The control threads are responsible for running graceful
 *   shutdown on the Task objects. Only one shutdown per-task can be running at a given time,
 *   so we allocate one control thread per worker slot.
 *
 * Note that separate task logs are not currently supported, all task log entries will be written to the Indexer
 * process log instead.
 */
public class ThreadingTaskRunner
    extends BaseRestorableTaskRunner<ThreadingTaskRunner.ThreadingTaskRunnerWorkItem>
    implements TaskLogStreamer, QuerySegmentWalker
{
  private static final EmittingLogger LOGGER = new EmittingLogger(ThreadingTaskRunner.class);

  private final TaskToolboxFactory toolboxFactory;
  private final TaskLogPusher taskLogPusher;
  private final DruidNode node;
  private final AppenderatorsManager appenderatorsManager;
  private final MultipleFileTaskReportFileWriter taskReportFileWriter;
  private final ListeningExecutorService taskExecutor;
  private final ListeningExecutorService controlThreadExecutor;
  private final WorkerConfig workerConfig;

  private volatile boolean stopping = false;

  @Inject
  public ThreadingTaskRunner(
      TaskToolboxFactory toolboxFactory,
      TaskConfig taskConfig,
      WorkerConfig workerConfig,
      TaskLogPusher taskLogPusher,
      ObjectMapper jsonMapper,
      AppenderatorsManager appenderatorsManager,
      TaskReportFileWriter taskReportFileWriter,
      @Self DruidNode node
  )
  {
    super(jsonMapper, taskConfig);
    this.toolboxFactory = toolboxFactory;
    this.taskLogPusher = taskLogPusher;
    this.node = node;
    this.appenderatorsManager = appenderatorsManager;
    this.taskReportFileWriter = (MultipleFileTaskReportFileWriter) taskReportFileWriter;
    this.workerConfig = workerConfig;
    this.taskExecutor = MoreExecutors.listeningDecorator(
        Execs.multiThreaded(workerConfig.getCapacity(), "threading-task-runner-executor-%d")
    );
    this.controlThreadExecutor = MoreExecutors.listeningDecorator(
        Execs.multiThreaded(workerConfig.getCapacity(), "threading-task-runner-control-%d")
    );
  }

  @Override
  public Optional<ByteSource> streamTaskLog(String taskid, long offset)
  {
    // task logs will appear in the main indexer log, streaming individual task logs is not supported
    return Optional.absent();
  }

  @Override
  public void start()
  {
    // Nothing to start.
  }

  @Override
  public ListenableFuture<TaskStatus> run(Task task)
  {
    synchronized (tasks) {
      tasks.computeIfAbsent(
          task.getId(), k ->
              new ThreadingTaskRunnerWorkItem(
                  task,
                  taskExecutor.submit(
                      new Callable<TaskStatus>() {
                        @Override
                        public TaskStatus call()
                        {
                          final String attemptUUID = UUID.randomUUID().toString();
                          final File taskDir = taskConfig.getTaskDir(task.getId());
                          final File attemptDir = new File(taskDir, attemptUUID);

                          final TaskLocation taskLocation = TaskLocation.create(
                              node.getHost(),
                              node.getPlaintextPort(),
                              node.getTlsPort()
                          );

                          final ThreadingTaskRunnerWorkItem taskWorkItem;

                          try {
                            if (!attemptDir.mkdirs()) {
                              throw new IOE("Could not create directories: %s", attemptDir);
                            }

                            final File taskFile = new File(taskDir, "task.json");
                            final File reportsFile = new File(attemptDir, "report.json");
                            taskReportFileWriter.add(task.getId(), reportsFile);

                            // time to adjust process holders
                            synchronized (tasks) {
                              taskWorkItem = tasks.get(task.getId());

                              if (taskWorkItem == null) {
                                LOGGER.makeAlert("TaskInfo disappeared").addData("task", task.getId()).emit();
                                throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
                              }

                              if (taskWorkItem.shutdown) {
                                throw new IllegalStateException("Task has been shut down!");
                              }
                            }


                            if (!taskFile.exists()) {
                              jsonMapper.writeValue(taskFile, task);
                            }

                            // This will block for a while. So we append the thread information with more details
                            final String priorThreadName = Thread.currentThread().getName();
                            Thread.currentThread()
                                  .setName(StringUtils.format("[%s]-%s", task.getId(), priorThreadName));

                            TaskStatus taskStatus;
                            final TaskToolbox toolbox = toolboxFactory.build(task);
                            TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation);
                            TaskRunnerUtils.notifyStatusChanged(
                                listeners,
                                task.getId(),
                                TaskStatus.running(task.getId())
                            );

                            taskWorkItem.setState(RunnerTaskState.RUNNING);
                            try {
                              taskStatus = task.run(toolbox);
                            }
                            catch (Throwable t) {
                              LOGGER.error(t, "Exception caught while running the task.");
                              taskStatus = TaskStatus.failure(
                                  task.getId(),
                                  "Failed with an exception. See indexer logs for more details."
                              );
                            }
                            finally {
                              taskWorkItem.setState(RunnerTaskState.NONE);
                              Thread.currentThread().setName(priorThreadName);
                              if (reportsFile.exists()) {
                                taskLogPusher.pushTaskReports(task.getId(), reportsFile);
                              }
                            }

                            TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), taskStatus);
                            return taskStatus;
                          }
                          catch (Throwable t) {
                            LOGGER.error(t, "Exception caught during execution");
                            throw new RuntimeException(t);
                          }
                          finally {
                            try {
                              taskReportFileWriter.delete(task.getId());
                              appenderatorsManager.removeAppenderatorsForTask(task.getId(), task.getDataSource());

                              synchronized (tasks) {
                                tasks.remove(task.getId());
                                if (!stopping) {
                                  saveRunningTasks();
                                }
                              }

                              try {
                                if (!stopping && taskDir.exists()) {
                                  FileUtils.deleteDirectory(taskDir);
                                  LOGGER.info("Removed task directory: %s", taskDir);
                                }
                              }
                              catch (Exception e) {
                                LOGGER.makeAlert(e, "Failed to delete task directory")
                                      .addData("taskDir", taskDir.toString())
                                      .addData("task", task.getId())
                                      .emit();
                              }
                            }
                            catch (Exception e) {
                              LOGGER.error(e, "Suppressing exception caught while cleaning up task");
                            }
                          }
                        }
                      }
                  )
              )
      );
      saveRunningTasks();
      return tasks.get(task.getId()).getResult();
    }
  }

  @Override
  public void shutdown(String taskid, String reason)
  {
    LOGGER.info("Shutdown [%s] because: [%s]", taskid, reason);
    final ThreadingTaskRunnerWorkItem taskInfo;

    synchronized (tasks) {
      taskInfo = tasks.get(taskid);

      if (taskInfo == null) {
        LOGGER.info("Ignoring request to cancel unknown task: %s", taskid);
        return;
      }

      if (taskInfo.shutdown) {
        LOGGER.info(
            "Task [%s] is already shutting down, ignoring duplicate shutdown request with reason [%s]",
            taskid,
            reason
        );
      } else {
        taskInfo.shutdown = true;
        scheduleTaskShutdown(taskInfo);
      }
    }
  }

  /**
   * Submits a callable to the control thread pool that attempts a task graceful shutdown,
   * if shutdown is not already scheduled.
   *
   * The shutdown will wait for the configured timeout and then interrupt the thread if the timeout is exceeded.
   */
  private ListenableFuture scheduleTaskShutdown(ThreadingTaskRunnerWorkItem taskInfo)
  {
    synchronized (tasks) {
      if (taskInfo.shutdownFuture != null) {
        return taskInfo.shutdownFuture;
      }

      taskInfo.shutdownFuture = controlThreadExecutor.submit(
          new Callable<Void>()
          {
            @Override
            public Void call()
            {
              LOGGER.info("Stopping thread for task: %s", taskInfo.getTaskId());
              taskInfo.getTask().stopGracefully(taskConfig);

              try {
                taskInfo.getResult().get(
                    taskConfig.getGracefulShutdownTimeout().toStandardDuration().getMillis(),
                    TimeUnit.MILLISECONDS
                );
              }
              catch (TimeoutException e) {
                // Note that we can't truly force a hard termination of the task, interrupting the thread
                // running the task to hopefully have it stop.
                // In the future we may want to add a forceful shutdown method to the Task interface.
                taskInfo.getResult().cancel(true);
              }
              catch (Exception e) {
                LOGGER.info(e, "Encountered exception while waiting for task [%s] shutdown", taskInfo.getTaskId());
                if (taskInfo.shutdownFuture != null) {
                  taskInfo.shutdownFuture.cancel(true);
                }
              }
              return null;
            }
          }
      );

      return taskInfo.shutdownFuture;
    }
  }

  /**
   * First shuts down the task execution pool and then schedules a graceful shutdown attempt for each active task.
   *
   * After the tasks shutdown gracefully or the graceful shutdown timeout is exceeded, the control thread pool
   * will be terminated (also waiting for the graceful shutdown period for this termination).
   */
  @Override
  public void stop()
  {
    stopping = true;
    taskExecutor.shutdown();

    List<ListenableFuture<Void>> shutdownFutures = new ArrayList<>();
    synchronized (tasks) {
      for (ThreadingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
        shutdownFutures.add(scheduleTaskShutdown(taskWorkItem));
      }
    }
    controlThreadExecutor.shutdown();
    try {
      ListenableFuture<List<Void>> shutdownFuture = Futures.successfulAsList(shutdownFutures);
      shutdownFuture.get();
    }
    catch (Exception e) {
      LOGGER.error(e, "Encountered exception when stopping all tasks.");
    }

    final DateTime start = DateTimes.nowUtc();
    final long gracefulShutdownMillis = taskConfig.getGracefulShutdownTimeout().toStandardDuration().getMillis();

    LOGGER.info("Waiting up to %,dms for shutdown.", gracefulShutdownMillis);
    if (gracefulShutdownMillis > 0) {
      try {
        final boolean terminated = controlThreadExecutor.awaitTermination(
            gracefulShutdownMillis,
            TimeUnit.MILLISECONDS
        );
        final long elapsed = System.currentTimeMillis() - start.getMillis();
        if (terminated) {
          LOGGER.info("Finished stopping in %,dms.", elapsed);
        } else {
          final Set<String> stillRunning;
          synchronized (tasks) {
            stillRunning = ImmutableSet.copyOf(tasks.keySet());
          }
          LOGGER.makeAlert("Failed to stop task threads")
                .addData("stillRunning", stillRunning)
                .addData("elapsed", elapsed)
                .emit();

          LOGGER.warn(
              "Executor failed to stop after %,dms, not waiting for it! Tasks still running: [%s]",
              elapsed,
              Joiner.on("; ").join(stillRunning)
          );
        }
      }
      catch (InterruptedException e) {
        LOGGER.warn(e, "Interrupted while waiting for executor to finish.");
        Thread.currentThread().interrupt();
      }
    } else {
      LOGGER.warn("Ran out of time, not waiting for executor to finish!");
    }

    appenderatorsManager.shutdown();
  }

  @Override
  public Collection<TaskRunnerWorkItem> getRunningTasks()
  {
    return getTasks(RunnerTaskState.RUNNING);
  }

  @Override
  public Collection<TaskRunnerWorkItem> getPendingTasks()
  {
    return getTasks(RunnerTaskState.PENDING);
  }

  @Nullable
  @Override
  public RunnerTaskState getRunnerTaskState(String taskId)
  {
    final ThreadingTaskRunnerWorkItem workItem = tasks.get(taskId);
    return workItem == null ? null : workItem.getState();
  }

  private Collection<TaskRunnerWorkItem> getTasks(RunnerTaskState state)
  {
    synchronized (tasks) {
      final List<TaskRunnerWorkItem> ret = new ArrayList<>();
      for (final ThreadingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
        if (taskWorkItem.getState() == state) {
          ret.add(taskWorkItem);
        }
      }
      return ret;
    }
  }

  @Override
  public Optional<ScalingStats> getScalingStats()
  {
    return Optional.absent();
  }

  @Override
  public long getTotalTaskSlotCount()
  {
    return workerConfig.getCapacity();
  }

  @Override
  public long getIdleTaskSlotCount()
  {
    return Math.max(getTotalTaskSlotCount() - getUsedTaskSlotCount(), 0);
  }

  @Override
  public long getUsedTaskSlotCount()
  {
    return getRunningTasks().size();
  }

  @Override
  public long getLazyTaskSlotCount()
  {
    return 0;
  }

  @Override
  public long getBlacklistedTaskSlotCount()
  {
    return 0;
  }

  @Override
  public <T> QueryRunner<T> getQueryRunnerForIntervals(
      Query<T> query,
      Iterable<Interval> intervals
  )
  {
    return appenderatorsManager.getQueryRunnerForIntervals(query, intervals);
  }

  @Override
  public <T> QueryRunner<T> getQueryRunnerForSegments(
      Query<T> query,
      Iterable<SegmentDescriptor> specs
  )
  {
    return appenderatorsManager.getQueryRunnerForSegments(query, specs);
  }

  protected static class ThreadingTaskRunnerWorkItem extends TaskRunnerWorkItem
  {
    private final Task task;
    private volatile boolean shutdown = false;
    private volatile ListenableFuture shutdownFuture;
    private volatile RunnerTaskState state;

    private ThreadingTaskRunnerWorkItem(
        Task task,
        ListenableFuture<TaskStatus> statusFuture
    )
    {
      super(task.getId(), statusFuture);
      this.task = task;
      this.state = RunnerTaskState.PENDING;
    }

    public Task getTask()
    {
      return task;
    }

    @Override
    public TaskLocation getLocation()
    {
      return null;
    }

    @Override
    public String getTaskType()
    {
      return task.getType();
    }

    @Override
    public String getDataSource()
    {
      return task.getDataSource();
    }

    public RunnerTaskState getState()
    {
      return state;
    }

    public void setState(RunnerTaskState state)
    {
      this.state = state;
    }
  }
}
