package org.apache.helix.integration;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.HashMap;
import java.util.Map;

import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixRollbackException;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.MockTask;
import org.apache.helix.integration.task.TaskTestBase;
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.mock.participant.MockDelayMSStateModel;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.Message;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class TestStateTransitionCancellation extends TaskTestBase {
  // TODO: Replace the thread sleep with synchronized condition check
  private ConfigAccessor _configAccessor;
  private ZkHelixClusterVerifier _verifier;

  @BeforeClass
  public void beforeClass() throws Exception {
    _participants = new MockParticipantManager[_numNodes];
    _numDbs = 1;
    _numPartitions = 20;
    _numNodes = 2;
    _numReplicas = 2;
    _verifier =
        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient)
            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
            .build();

    _gSetupTool.addCluster(CLUSTER_NAME, true);
    setupParticipants();
    setupDBs();

    registerParticipants(_participants, _numNodes, _startPort, 0, -3000000L);

    // start controller
    String controllerName = CONTROLLER_PREFIX + "_0";
    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
    _controller.syncStart();

    createManagers();
    _configAccessor = new ConfigAccessor(_gZkClient);
  }

  @Test
  public void testCancellationWhenDisableResource() throws InterruptedException {
    // Enable cancellation
    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
    clusterConfig.stateTransitionCancelEnabled(true);
    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);

    // Wait for assignment done
    Thread.sleep(2000);

    // Disable the resource
    _gSetupTool.getClusterManagementTool().enableResource(CLUSTER_NAME,
        WorkflowGenerator.DEFAULT_TGT_DB, false);

    // Wait for pipeline reaching final stage
    Assert.assertTrue(_verifier.verifyByPolling());
    ExternalView externalView = _gSetupTool.getClusterManagementTool()
        .getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
    for (String partition : externalView.getPartitionSet()) {
      for (String currentState : externalView.getStateMap(partition).values()) {
        Assert.assertEquals(currentState, "OFFLINE");
      }
    }
  }

  @Test
  public void testDisableCancellationWhenDisableResource() throws InterruptedException {
    // Disable cancellation
    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
    clusterConfig.stateTransitionCancelEnabled(false);
    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);

    // Reenable resource
    stateCleanUp();
    _gSetupTool.getClusterManagementTool().enableResource(CLUSTER_NAME,
        WorkflowGenerator.DEFAULT_TGT_DB, true);

    // Wait for assignment done
    Thread.sleep(2000);

    // Disable the resource
    _gSetupTool.getClusterManagementTool().enableResource(CLUSTER_NAME,
        WorkflowGenerator.DEFAULT_TGT_DB, false);

    // Wait for pipeline reaching final stage
    Thread.sleep(2000L);
    ExternalView externalView = _gSetupTool.getClusterManagementTool()
        .getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
    for (String partition : externalView.getPartitionSet()) {
      Assert.assertTrue(externalView.getStateMap(partition).values().contains("SLAVE"));
    }
  }

  @Test
  public void testRebalancingCauseCancellation() throws InterruptedException {
    // Enable cancellation
    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
    clusterConfig.stateTransitionCancelEnabled(true);
    clusterConfig.setPersistBestPossibleAssignment(true);
    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);

    // Reenable resource
    stateCleanUp();
    _gSetupTool.getClusterManagementTool().enableResource(CLUSTER_NAME,
        WorkflowGenerator.DEFAULT_TGT_DB, true);

    // Wait for assignment done
    Thread.sleep(2000);
    int numNodesToStart = 10;
    for (int i = 0; i < numNodesToStart; i++) {
      String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + _numNodes + i);
      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
    }
    MockParticipantManager[] newParticipants = new MockParticipantManager[numNodesToStart];
    registerParticipants(newParticipants, numNodesToStart, _startPort + _numNodes, 1000, -3000000L);

    // Wait for pipeline reaching final stage
    Thread.sleep(2000L);
    int numOfMasters = 0;
    ExternalView externalView = _gSetupTool.getClusterManagementTool()
        .getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
    for (String partition : externalView.getPartitionSet()) {
      if (externalView.getStateMap(partition).values().contains("MASTER")) {
        numOfMasters++;
      }
    }

    for (MockParticipantManager participant : newParticipants) {
      participant.syncStop();
    }

    // Either partial of state transitions have been cancelled or all the Slave -> Master
    // reassigned to other cluster
    Assert.assertTrue((numOfMasters > 0 && numOfMasters <= _numPartitions));
  }

  private void stateCleanUp() {
    InternalMockDelayMSStateModel._cancelledFirstTime = true;
    InternalMockDelayMSStateModel._cancelledStatic = false;
  }

  @StateModelInfo(initialState = "OFFLINE", states = {
      "MASTER", "SLAVE", "ERROR"
  })
  public static class InternalMockDelayMSStateModel extends StateModel {
    private static Logger LOG = LoggerFactory.getLogger(MockDelayMSStateModel.class);
    private long _delay;
    public static boolean _cancelledStatic;
    public static boolean _cancelledFirstTime;

    public InternalMockDelayMSStateModel(long delay) {
      _delay = delay;
      _cancelledStatic = false;
      _cancelledFirstTime = true;
    }

    @Transition(to = "SLAVE", from = "OFFLINE")
    public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
      if (_delay > 0) {
        try {
          Thread.sleep(_delay);
        } catch (InterruptedException e) {
          LOG.error("Failed to sleep for " + _delay);
        }
      }
      LOG.info("Become SLAVE from OFFLINE");
    }

    @Transition(to = "MASTER", from = "SLAVE")
    public void onBecomeMasterFromSlave(Message message, NotificationContext context)
        throws InterruptedException, HelixRollbackException {
      if (_cancelledFirstTime && _delay < 0) {
        while (!_cancelledStatic) {
          Thread.sleep(Math.abs(1000L));
        }
        _cancelledFirstTime = false;
        throw new HelixRollbackException("EX");
      }
      LOG.error("Become MASTER from SLAVE");
    }

    @Transition(to = "SLAVE", from = "MASTER")
    public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
      LOG.info("Become Slave from Master");
    }

    @Transition(to = "OFFLINE", from = "SLAVE")
    public void onBecomeOfflineFromSlave(Message message, NotificationContext context) {
      LOG.info("Become OFFLINE from SLAVE");
    }

    @Transition(to = "DROPPED", from = "OFFLINE")
    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
      LOG.info("Become DROPPED FROM OFFLINE");
    }

    @Override
    public void cancel() {
      _cancelledStatic = true;
    }

    @Override
    public boolean isCancelled() {
      return _cancelledStatic;
    }
  }

  public class InMockDelayMSStateModelFactory
      extends StateModelFactory<InternalMockDelayMSStateModel> {
    private long _delay;

    @Override
    public InternalMockDelayMSStateModel createNewStateModel(String resourceName,
        String partitionKey) {
      InternalMockDelayMSStateModel model = new InternalMockDelayMSStateModel(_delay);
      return model;
    }

    public InMockDelayMSStateModelFactory setDelay(long delay) {
      _delay = delay;
      return this;
    }
  }

  private void registerParticipants(MockParticipantManager[] participants, int numNodes,
      int startPort, long sleepTime, long delay) throws InterruptedException {
    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
      @Override
      public Task createNewTask(TaskCallbackContext context) {
        return new MockTask(context);
      }
    });

    for (int i = 0; i < numNodes; i++) {
      String instanceName = PARTICIPANT_PREFIX + "_" + (startPort + i);
      participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);

      // add a state model with non-OFFLINE initial state
      StateMachineEngine stateMach = participants[i].getStateMachineEngine();
      stateMach.registerStateModelFactory("Task",
          new TaskStateModelFactory(participants[i], taskFactoryReg));
      InMockDelayMSStateModelFactory delayFactory =
          new InMockDelayMSStateModelFactory().setDelay(delay);
      stateMach.registerStateModelFactory(MASTER_SLAVE_STATE_MODEL, delayFactory);

      participants[i].syncStart();

      if (sleepTime > 0) {
        Thread.sleep(sleepTime);
      }
    }
  }
}
