package org.apache.helix.controller.stages;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.StageContext;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Resource;
import org.apache.helix.tools.DefaultIdealStateCalculator;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

public class TestResourceComputationStage extends BaseStageTest {
  /**
   * Case where we have one resource in IdealState
   * @throws Exception
   */
  @Test
  public void testSimple() throws Exception {
    int nodes = 5;
    List<String> instances = new ArrayList<String>();
    for (int i = 0; i < nodes; i++) {
      instances.add("localhost_" + i);
    }
    int partitions = 10;
    int replicas = 1;
    String resourceName = "testResource";
    ZNRecord record =
        DefaultIdealStateCalculator.calculateIdealState(instances, partitions, replicas,
            resourceName, "MASTER", "SLAVE");
    IdealState idealState = new IdealState(record);
    idealState.setStateModelDefRef("MasterSlave");

    HelixDataAccessor accessor = manager.getHelixDataAccessor();
    Builder keyBuilder = accessor.keyBuilder();
    accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
    event.addAttribute(AttributeName.ControllerDataProvider.name(),
        new ResourceControllerDataProvider());
    ResourceComputationStage stage = new ResourceComputationStage();
    runStage(event, new ReadClusterDataStage());
    runStage(event, stage);

    Map<String, Resource> resource = event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
    AssertJUnit.assertEquals(1, resource.size());

    AssertJUnit.assertEquals(resource.keySet().iterator().next(), resourceName);
    AssertJUnit.assertEquals(resource.values().iterator().next().getResourceName(), resourceName);
    AssertJUnit.assertEquals(resource.values().iterator().next().getStateModelDefRef(),
        idealState.getStateModelDefRef());
    AssertJUnit
        .assertEquals(resource.values().iterator().next().getPartitions().size(), partitions);
  }

  @Test
  public void testMultipleResources() throws Exception {
    // List<IdealState> idealStates = new ArrayList<IdealState>();
    String[] resources = new String[] {
        "testResource1", "testResource2"
    };
    List<IdealState> idealStates = setupIdealState(5, resources, 10, 1, RebalanceMode.SEMI_AUTO);
    event.addAttribute(AttributeName.ControllerDataProvider.name(),
        new ResourceControllerDataProvider());
    ResourceComputationStage stage = new ResourceComputationStage();
    runStage(event, new ReadClusterDataStage());
    runStage(event, stage);

    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
    AssertJUnit.assertEquals(resources.length, resourceMap.size());

    for (int i = 0; i < resources.length; i++) {
      String resourceName = resources[i];
      IdealState idealState = idealStates.get(i);
      AssertJUnit.assertTrue(resourceMap.containsKey(resourceName));
      AssertJUnit.assertEquals(resourceMap.get(resourceName).getResourceName(), resourceName);
      AssertJUnit.assertEquals(resourceMap.get(resourceName).getStateModelDefRef(),
          idealState.getStateModelDefRef());
      AssertJUnit.assertEquals(resourceMap.get(resourceName).getPartitions().size(),
          idealState.getNumPartitions());
    }
  }

  @Test
  public void testMultipleResourcesWithSomeDropped() throws Exception {
    int nodes = 5;
    List<String> instances = new ArrayList<String>();
    for (int i = 0; i < nodes; i++) {
      instances.add("localhost_" + i);
    }
    String[] resources = new String[] {
        "testResource1", "testResource2"
    };
    List<IdealState> idealStates = new ArrayList<IdealState>();
    for (int i = 0; i < resources.length; i++) {
      int partitions = 10;
      int replicas = 1;
      String resourceName = resources[i];
      ZNRecord record =
          DefaultIdealStateCalculator.calculateIdealState(instances, partitions, replicas,
              resourceName, "MASTER", "SLAVE");
      IdealState idealState = new IdealState(record);
      idealState.setStateModelDefRef("MasterSlave");

      HelixDataAccessor accessor = manager.getHelixDataAccessor();
      Builder keyBuilder = accessor.keyBuilder();
      accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);

      idealStates.add(idealState);
    }
    // ADD A LIVE INSTANCE WITH A CURRENT STATE THAT CONTAINS RESOURCE WHICH NO
    // LONGER EXISTS IN IDEALSTATE
    String instanceName = "localhost_" + 3;
    LiveInstance liveInstance = new LiveInstance(instanceName);
    String sessionId = UUID.randomUUID().toString();
    liveInstance.setSessionId(sessionId);

    HelixDataAccessor accessor = manager.getHelixDataAccessor();
    Builder keyBuilder = accessor.keyBuilder();
    accessor.setProperty(keyBuilder.liveInstance(instanceName), liveInstance);

    String oldResource = "testResourceOld";
    CurrentState currentState = new CurrentState(oldResource);
    currentState.setState("testResourceOld_0", "OFFLINE");
    currentState.setState("testResourceOld_1", "SLAVE");
    currentState.setState("testResourceOld_2", "MASTER");
    currentState.setStateModelDefRef("MasterSlave");
    accessor.setProperty(keyBuilder.currentState(instanceName, sessionId, oldResource),
        currentState);

    String oldTaskResource = "testTaskResourceOld";
    CurrentState taskCurrentState = new CurrentState(oldTaskResource);
    taskCurrentState.setState("testTaskResourceOld_0", "RUNNING");
    taskCurrentState.setState("testTaskResourceOld_1", "FINISHED");
    taskCurrentState.setStateModelDefRef("Task");
    accessor.setProperty(keyBuilder.taskCurrentState(instanceName, sessionId, oldTaskResource),
        taskCurrentState);

    event.addAttribute(AttributeName.ControllerDataProvider.name(),
        new ResourceControllerDataProvider());
    ResourceComputationStage stage = new ResourceComputationStage();
    runStage(event, new ReadClusterDataStage());
    runStage(event, stage);

    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
    // +1 because it will have one for current state
    AssertJUnit.assertEquals(resources.length + 1, resourceMap.size());

    for (int i = 0; i < resources.length; i++) {
      String resourceName = resources[i];
      IdealState idealState = idealStates.get(i);
      AssertJUnit.assertTrue(resourceMap.containsKey(resourceName));
      AssertJUnit.assertEquals(resourceMap.get(resourceName).getResourceName(), resourceName);
      AssertJUnit.assertEquals(resourceMap.get(resourceName).getStateModelDefRef(),
          idealState.getStateModelDefRef());
      AssertJUnit.assertEquals(resourceMap.get(resourceName).getPartitions().size(),
          idealState.getNumPartitions());
    }
    // Test the data derived from CurrentState
    AssertJUnit.assertTrue(resourceMap.containsKey(oldResource));
    AssertJUnit.assertEquals(resourceMap.get(oldResource).getResourceName(), oldResource);
    AssertJUnit.assertEquals(resourceMap.get(oldResource).getStateModelDefRef(),
        currentState.getStateModelDefRef());
    AssertJUnit.assertEquals(resourceMap.get(oldResource).getPartitions().size(), currentState
        .getPartitionStateMap().size());
    AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_0"));
    AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_1"));
    AssertJUnit.assertNotNull(resourceMap.get(oldResource).getPartition("testResourceOld_2"));


    event.addAttribute(AttributeName.ControllerDataProvider.name(),
        new WorkflowControllerDataProvider());
    runStage(event, new ReadClusterDataStage());
    runStage(event, stage);

    resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
    // +2 because it will have current state and task current state
    AssertJUnit.assertEquals(resources.length + 2, resourceMap.size());

    Resource taskResource = resourceMap.get(oldTaskResource);
    AssertJUnit.assertNotNull(taskResource);
    AssertJUnit.assertEquals(taskResource.getResourceName(), oldTaskResource);
    AssertJUnit
        .assertEquals(taskResource.getStateModelDefRef(), taskCurrentState.getStateModelDefRef());
    AssertJUnit.assertEquals(taskResource.getPartitions().size(),
        taskCurrentState.getPartitionStateMap().size());
    AssertJUnit.assertNotNull(taskResource.getPartition("testTaskResourceOld_0"));
    AssertJUnit.assertNotNull(taskResource.getPartition("testTaskResourceOld_1"));
  }

  @Test
  public void testNull() {
    ResourceComputationStage stage = new ResourceComputationStage();
    StageContext context = new StageContext();
    stage.init(context);
    stage.preProcess();
    boolean exceptionCaught = false;
    try {
      stage.process(event);
    } catch (Exception e) {
      exceptionCaught = true;
    }
    AssertJUnit.assertTrue(exceptionCaught);
    stage.postProcess();
  }

  // public void testEmptyCluster()
  // {
  // ClusterEvent event = new ClusterEvent("sampleEvent");
  // ClusterManager manager = new Mocks.MockManager();
  // event.addAttribute("clustermanager", manager);
  // ResourceComputationStage stage = new ResourceComputationStage();
  // StageContext context = new StageContext();
  // stage.init(context);
  // stage.preProcess();
  // boolean exceptionCaught = false;
  // try
  // {
  // stage.process(event);
  // } catch (Exception e)
  // {
  // exceptionCaught = true;
  // }
  // Assert.assertTrue(exceptionCaught);
  // stage.postProcess();
  // }

}
