package org.apache.helix.integration.task;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.common.collect.ImmutableMap;
import org.apache.helix.TestHelper;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskResult;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowConfig;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class TestStopWorkflow extends TaskTestBase {

  @BeforeClass
  public void beforeClass() throws Exception {
    _numPartitions = 1;
    super.beforeClass();
  }

  @Test
  public void testStopWorkflow() throws InterruptedException {
    stopTestSetup(5);

    String jobQueueName = TestHelper.getTestMethodName();
    JobConfig.Builder jobBuilder =
        JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG).setWorkflow(jobQueueName)
            .setJobCommandConfigMap(ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "1"));

    JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName);
    jobQueue.enqueueJob("job1_will_succeed", jobBuilder);
    jobQueue.enqueueJob("job2_will_fail", jobBuilder);
    _driver.start(jobQueue.build());

    // job1 should succeed and job2 should fail, wait until that happens
    _driver.pollForJobState(jobQueueName,
        TaskUtil.getNamespacedJobName(jobQueueName, "job2_will_fail"), TaskState.FAILED);

    Assert.assertEquals(TaskState.IN_PROGRESS,
        _driver.getWorkflowContext(jobQueueName).getWorkflowState());

    // Now stop the workflow, and it should be stopped because all jobs have completed or failed.
    _driver.waitToStop(jobQueueName, 4000);
    _driver.pollForWorkflowState(jobQueueName, TaskState.STOPPED);

    Assert.assertEquals(TaskState.STOPPED,
        _driver.getWorkflowContext(jobQueueName).getWorkflowState());

    cleanupParticipants(5);
  }

  /**
   * Tests that stopping a workflow does result in its task ending up in STOPPED state.
   * @throws InterruptedException
   */
  @Test(dependsOnMethods = "testStopWorkflow")
  public void testStopTask() throws Exception {
    stopTestSetup(1);

    String workflowName = TestHelper.getTestMethodName();
    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
    WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(workflowName);
    configBuilder.setAllowOverlapJobAssignment(true);
    workflowBuilder.setWorkflowConfig(configBuilder.build());

    for (int i = 0; i < 1; i++) {
      List<TaskConfig> taskConfigs = new ArrayList<>();
      taskConfigs.add(new TaskConfig("StopTask", new HashMap<>()));
      JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand("Dummy")
          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(new HashMap<>());
      workflowBuilder.addJob("JOB" + i, jobConfigBulider);
    }

    _driver.start(workflowBuilder.build());
    _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);

    // Stop the workflow
    _driver.stop(workflowName);
    _driver.pollForWorkflowState(workflowName, TaskState.STOPPED);

    Assert.assertEquals(TaskDriver.getWorkflowContext(_manager, workflowName).getWorkflowState(),
        TaskState.STOPPED);

    cleanupParticipants(1);
  }

  /**
   * Tests that stop() indeed frees up quotas for tasks belonging to the stopped workflow.
   * @throws InterruptedException
   */
  @Test(dependsOnMethods = "testStopTask")
  public void testStopTaskForQuota() throws Exception {
    stopTestSetup(1);

    String workflowNameToStop = TestHelper.getTestMethodName();
    Workflow.Builder workflowBuilderToStop = new Workflow.Builder(workflowNameToStop);
    WorkflowConfig.Builder configBuilderToStop = new WorkflowConfig.Builder(workflowNameToStop);
    configBuilderToStop.setAllowOverlapJobAssignment(true);
    workflowBuilderToStop.setWorkflowConfig(configBuilderToStop.build());

    // First create 50 jobs so that all 40 threads will be taken up
    for (int i = 0; i < 50; i++) {
      List<TaskConfig> taskConfigs = new ArrayList<>();
      taskConfigs.add(new TaskConfig("StopTask", new HashMap<>()));
      JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand("Dummy")
          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(new HashMap<>());
      workflowBuilderToStop.addJob("JOB" + i, jobConfigBulider);
    }

    _driver.start(workflowBuilderToStop.build());
    _driver.pollForWorkflowState(workflowNameToStop, TaskState.IN_PROGRESS);

    // Stop the workflow
    _driver.stop(workflowNameToStop);

    _driver.pollForWorkflowState(workflowNameToStop, TaskState.STOPPED);
    Assert.assertEquals(
        TaskDriver.getWorkflowContext(_manager, workflowNameToStop).getWorkflowState(),
        TaskState.STOPPED); // Check that the workflow has been stopped

    // Generate another workflow to be completed this time around
    String workflowToComplete = TestHelper.getTestMethodName() + "ToComplete";
    Workflow.Builder workflowBuilderToComplete = new Workflow.Builder(workflowToComplete);
    WorkflowConfig.Builder configBuilderToComplete = new WorkflowConfig.Builder(workflowToComplete);
    configBuilderToComplete.setAllowOverlapJobAssignment(true);
    workflowBuilderToComplete.setWorkflowConfig(configBuilderToComplete.build());

    // Create 20 jobs that should complete
    for (int i = 0; i < 20; i++) {
      List<TaskConfig> taskConfigs = new ArrayList<>();
      taskConfigs.add(new TaskConfig("CompleteTask", new HashMap<>()));
      JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand("Dummy")
          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(new HashMap<>());
      workflowBuilderToComplete.addJob("JOB" + i, jobConfigBulider);
    }

    // Start the workflow to be completed
    _driver.start(workflowBuilderToComplete.build());
    _driver.pollForWorkflowState(workflowToComplete, TaskState.COMPLETED);
    Assert.assertEquals(
        TaskDriver.getWorkflowContext(_manager, workflowToComplete).getWorkflowState(),
        TaskState.COMPLETED);

    cleanupParticipants(1);
  }

  /**
   * Test that there is no thread leak when stopping and resuming.
   * @throws InterruptedException
   */
  @Test(dependsOnMethods = "testStopTaskForQuota")
  public void testResumeTaskForQuota() throws Exception {
    stopTestSetup(1);

    String workflowName_1 = TestHelper.getTestMethodName();
    Workflow.Builder workflowBuilder_1 = new Workflow.Builder(workflowName_1);
    WorkflowConfig.Builder configBuilder_1 = new WorkflowConfig.Builder(workflowName_1);
    configBuilder_1.setAllowOverlapJobAssignment(true);
    workflowBuilder_1.setWorkflowConfig(configBuilder_1.build());

    // 30 jobs run first
    for (int i = 0; i < 30; i++) {
      List<TaskConfig> taskConfigs = new ArrayList<>();
      taskConfigs.add(new TaskConfig("StopTask", new HashMap<>()));
      JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand("Dummy")
          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(new HashMap<>());
      workflowBuilder_1.addJob("JOB" + i, jobConfigBulider);
    }

    _driver.start(workflowBuilder_1.build());

    // Check the jobs are in progress and the tasks are running.
    // Each job has one task. Hence, we just check the state of the partition 0.
    for (int i = 0; i < 30; i++) {
      String jobName = workflowName_1 + "_JOB" + i;
      _driver.pollForJobState(workflowName_1, jobName, TaskState.IN_PROGRESS);
      boolean isTaskInRunningState = TestHelper.verify(() -> {
        JobContext jobContext = _driver.getJobContext(jobName);
        String state = jobContext.getMapField(0).get("STATE");
        return (state!= null && state.equals("RUNNING"));
      }, TestHelper.WAIT_DURATION);
      Assert.assertTrue(isTaskInRunningState);
    }

    _driver.stop(workflowName_1);
    _driver.pollForWorkflowState(workflowName_1, TaskState.STOPPED);

    _driver.resume(workflowName_1);

    // Check the jobs are in progress and the tasks are running.
    // Each job has one task. Hence, we just check the state of the partition 0.
    for (int i = 0; i < 30; i++) {
      String jobName = workflowName_1 + "_JOB" + i;
      _driver.pollForJobState(workflowName_1, jobName, TaskState.IN_PROGRESS);
      boolean isTaskInRunningState = TestHelper.verify(() -> {
        JobContext jobContext = _driver.getJobContext(jobName);
        String state = jobContext.getMapField(0).get("STATE");
        return (state!= null && state.equals("RUNNING"));
      }, TestHelper.WAIT_DURATION);
      Assert.assertTrue(isTaskInRunningState);
    }

    // By now there should only be 30 threads occupied

    String workflowName_2 = TestHelper.getTestMethodName() + "_2";
    Workflow.Builder workflowBuilder_2 = new Workflow.Builder(workflowName_2);
    WorkflowConfig.Builder configBuilder_2 = new WorkflowConfig.Builder(workflowName_2);
    configBuilder_2.setAllowOverlapJobAssignment(true);
    workflowBuilder_2.setWorkflowConfig(configBuilder_2.build());

    // Try to run 10 jobs that complete
    int numJobs = 10;
    for (int i = 0; i < numJobs; i++) {
      List<TaskConfig> taskConfigs = new ArrayList<>();
      taskConfigs.add(new TaskConfig("CompleteTask", new HashMap<>()));
      JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand("Dummy")
          .addTaskConfigs(taskConfigs).setJobCommandConfigMap(new HashMap<>());
      workflowBuilder_2.addJob("JOB" + i, jobConfigBulider);
    }

    // If these jobs complete successfully, then that means there is no thread leak
    _driver.start(workflowBuilder_2.build());
    Assert.assertEquals(_driver.pollForWorkflowState(workflowName_2, TaskState.COMPLETED),
        TaskState.COMPLETED);

    cleanupParticipants(1);
  }

  /**
   * Sets up an environment to make stop task testing easy. Shuts down all Participants and starts
   * only one Participant.
   */
  private void stopTestSetup(int numNodes) {
    // Set task callbacks
    Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
    TaskFactory taskFactory = StopTask::new;
    TaskFactory taskFactoryComplete = MockTask::new;
    taskFactoryReg.put("StopTask", taskFactory);
    taskFactoryReg.put("CompleteTask", taskFactoryComplete);

    stopParticipants();

    for (int i = 0; i < numNodes; i++) {
      String instanceName = _participants[i].getInstanceName();
      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
      // Register a Task state model factory.
      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
      stateMachine.registerStateModelFactory("Task",
          new TaskStateModelFactory(_participants[i], taskFactoryReg));

      _participants[i].syncStart();
    }
  }

  private void cleanupParticipants(int numNodes) {
    for (int i = 0; i < numNodes; i++) {
      if (_participants[i] != null && _participants[i].isConnected()) {
        _participants[i].syncStop();
      }
    }
  }

  /**
   * A mock task class that models a short-lived task to be stopped.
   */
  private class StopTask extends MockTask {
    private boolean _stopFlag = false;

    StopTask(TaskCallbackContext context) {
      super(context);
    }

    @Override
    public TaskResult run() {
      _stopFlag = false;
      while (!_stopFlag) {
        try {
          Thread.sleep(1000L);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }

      // This wait is to prevent the task from completing before being stopped
      try {
        Thread.sleep(500L);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }

      return new TaskResult(TaskResult.Status.CANCELED, "");
    }

    @Override
    public void cancel() {
      _stopFlag = true;
    }
  }
}
