package org.apache.helix.controller.changedetector;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;

import org.apache.helix.AccessOption;
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.ResourceConfig;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * This test contains a series of unit tests for ResourceChangeDetector.
 */
public class TestResourceChangeDetector extends ZkTestBase {

  // Every change type these tests expect ResourceChangeDetector to report, ClusterConfig
  // included. The per-type count checks in the tests below iterate over this list.
  private static final ChangeType[] RESOURCE_CHANGE_TYPES = {
      ChangeType.IDEAL_STATE, ChangeType.INSTANCE_CONFIG, ChangeType.LIVE_INSTANCE,
      ChangeType.RESOURCE_CONFIG, ChangeType.CLUSTER_CONFIG
  };

  private static final String CLUSTER_NAME = TestHelper.getTestClassName();
  private static final String RESOURCE_NAME = "TestDB";
  private static final String NEW_RESOURCE_NAME = "TestDB2";
  private static final String STATE_MODEL = "MasterSlave";
  // Number of change types expected after the very first snapshot update. Derived from the list
  // above so the count and the list cannot drift apart.
  private static final int NUM_CHANGE_TYPES = RESOURCE_CHANGE_TYPES.length;
  private static final int NUM_RESOURCES = 1;
  private static final int NUM_PARTITIONS = 10;
  private static final int NUM_REPLICAS = 3;
  private static final int NUM_NODES = 5;

  // Data provider that the tests refresh from the data accessor and hand to the detector
  private ResourceControllerDataProvider _dataProvider;
  // Detector under test; shared across the dependent test methods so snapshots carry over
  private ResourceChangeDetector _resourceChangeDetector;
  private ClusterControllerManager _controller;
  // Individual slots are replaced when a participant is restarted; the array itself never is
  private final MockParticipantManager[] _participants = new MockParticipantManager[NUM_NODES];
  private HelixDataAccessor _dataAccessor;
  private PropertyKey.Builder _keyBuilder;

  /**
   * Creates the test cluster, starts one controller and NUM_NODES participants, and builds the
   * accessor, key builder, detector and data provider used by every test in this class.
   * @throws Exception if the parent setup or any of the cluster components fails to start
   */
  @BeforeClass
  public void beforeClass() throws Exception {
    super.beforeClass();

    // Participants are named <hostPrefix>_<basePort + index>
    final int basePort = 12918;
    final String hostPrefix = "localhost";

    // Set up the cluster with a single rebalanced resource
    TestHelper.setupCluster(CLUSTER_NAME, ZK_ADDR, basePort, hostPrefix,
        RESOURCE_NAME, // resource name prefix
        NUM_RESOURCES, NUM_PARTITIONS, NUM_NODES, NUM_REPLICAS, STATE_MODEL,
        true); // do rebalance

    // Controller first, then the participants
    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, "controller_0");
    _controller.syncStart();

    for (int idx = 0; idx < NUM_NODES; idx++) {
      MockParticipantManager participant =
          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, hostPrefix + "_" + (basePort + idx));
      // Store before starting so the slot is populated even if the start throws
      _participants[idx] = participant;
      participant.syncStart();
    }

    _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
    _keyBuilder = _dataAccessor.keyBuilder();
    _resourceChangeDetector = new ResourceChangeDetector();

    // Data provider that the tests refresh by hand
    _dataProvider = new ResourceControllerDataProvider(CLUSTER_NAME);
  }

  /**
   * Stops all still-connected participants and the controller, deletes the test cluster and
   * verifies that the cluster root path is gone.
   * @throws Exception if stopping a component, deleting the cluster or the verification fails
   */
  @AfterClass
  public void afterClass() throws Exception {
    for (MockParticipantManager participant : _participants) {
      if (participant != null && participant.isConnected()) {
        participant.syncStop();
      }
    }
    _controller.syncStop();
    deleteCluster(CLUSTER_NAME);
    // Wait for the condition we actually want (the path no longer exists). TestHelper.verify is
    // expected to return as soon as the predicate holds or the timeout elapses (confirm against
    // TestHelper). Polling for "exists" and asserting false would always wait out the whole
    // timeout on success and would fail on a transient leftover node.
    Assert.assertTrue(TestHelper.verify(() -> !_dataAccessor.getBaseDataAccessor()
            .exists("/" + CLUSTER_NAME, AccessOption.PERSISTENT), 20000L),
        "Cluster " + CLUSTER_NAME + " still exists after deletion!");
  }

  /**
   * Tests the first snapshot update of the change detector. Every change type must be reported,
   * and all existing ideal states, live instances, instance configs and the cluster config must
   * show up as additions.
   */
  @Test
  public void testResourceChangeDetectorInit() {
    _dataProvider.refresh(_dataAccessor);
    _resourceChangeDetector.updateSnapshots(_dataProvider);

    int detectedTypeCount = _resourceChangeDetector.getChangeTypes().size();
    Assert.assertEquals(detectedTypeCount, NUM_CHANGE_TYPES,
        "Not all change types have been detected for ResourceChangeDetector!");

    // One addition per resource
    checkDetectionCounts(ChangeType.IDEAL_STATE, NUM_RESOURCES, 0, 0);

    // One addition per node, both for live instances and for instance configs
    ChangeType[] perInstanceTypes = {ChangeType.LIVE_INSTANCE, ChangeType.INSTANCE_CONFIG};
    for (ChangeType perInstanceType : perInstanceTypes) {
      checkDetectionCounts(perInstanceType, NUM_NODES, 0, 0);
    }

    // Exactly one cluster config item
    checkDetectionCounts(ChangeType.CLUSTER_CONFIG, 1, 0, 0);
  }

  /**
   * Adds a new resource (IdealState plus ResourceConfig) and checks that the detector reports
   * exactly one addition for each of those two types and nothing for the others.
   */
  @Test(dependsOnMethods = "testResourceChangeDetectorInit")
  public void testAddResource() {
    // Write the new IdealState and an empty ResourceConfig for it
    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, NEW_RESOURCE_NAME,
        NUM_PARTITIONS, STATE_MODEL);
    _dataAccessor.setProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME),
        new ResourceConfig(NEW_RESOURCE_NAME));

    // Flag both types as changed by hand, then refresh the provider and update the detector
    _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE);
    _dataProvider.notifyDataChange(ChangeType.RESOURCE_CONFIG);
    _dataProvider.refresh(_dataAccessor);
    _resourceChangeDetector.updateSnapshots(_dataProvider);

    checkChangeTypes(ChangeType.IDEAL_STATE, ChangeType.RESOURCE_CONFIG);

    // Only the two touched types may report an addition
    for (ChangeType type : RESOURCE_CHANGE_TYPES) {
      boolean touched = type == ChangeType.IDEAL_STATE || type == ChangeType.RESOURCE_CONFIG;
      checkDetectionCounts(type, touched ? 1 : 0, 0, 0);
    }

    // The reported ResourceConfig addition must be the new resource
    boolean newResourceReported = _resourceChangeDetector
        .getAdditionsByType(ChangeType.RESOURCE_CONFIG).contains(NEW_RESOURCE_NAME);
    Assert.assertTrue(newResourceReported);
  }

  /**
   * Changes the ResourceConfig of the new resource and checks that the detector reports it as
   * the single modification.
   */
  @Test(dependsOnMethods = "testAddResource")
  public void testModifyResource() {
    // Add a simple field to the stored resource config and write it back
    PropertyKey configKey = _keyBuilder.resourceConfig(NEW_RESOURCE_NAME);
    ResourceConfig modifiedConfig = _dataAccessor.getProperty(configKey);
    modifiedConfig.getRecord().setSimpleField("Did I change?", "Yes!");
    _dataAccessor.updateProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME), modifiedConfig);

    // Flag the change, refresh the provider and update the detector
    _dataProvider.notifyDataChange(ChangeType.RESOURCE_CONFIG);
    _dataProvider.refresh(_dataAccessor);
    _resourceChangeDetector.updateSnapshots(_dataProvider);

    checkChangeTypes(ChangeType.RESOURCE_CONFIG);

    // Exactly one modification, and only for RESOURCE_CONFIG
    for (ChangeType type : RESOURCE_CHANGE_TYPES) {
      int expectedChanges = (type == ChangeType.RESOURCE_CONFIG) ? 1 : 0;
      checkDetectionCounts(type, 0, expectedChanges, 0);
    }

    boolean modifiedResourceReported = _resourceChangeDetector
        .getChangesByType(ChangeType.RESOURCE_CONFIG).contains(NEW_RESOURCE_NAME);
    Assert.assertTrue(modifiedResourceReported);
  }

  /**
   * Removes the IdealState and ResourceConfig of the new resource and checks that the detector
   * reports one removal for each of those two types.
   */
  @Test(dependsOnMethods = "testModifyResource")
  public void testDeleteResource() {
    // Drop both records of the resource added earlier
    _dataAccessor.removeProperty(_keyBuilder.idealStates(NEW_RESOURCE_NAME));
    _dataAccessor.removeProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME));

    // Flag both types, refresh the provider and update the detector
    _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE);
    _dataProvider.notifyDataChange(ChangeType.RESOURCE_CONFIG);
    _dataProvider.refresh(_dataAccessor);
    _resourceChangeDetector.updateSnapshots(_dataProvider);

    checkChangeTypes(ChangeType.RESOURCE_CONFIG, ChangeType.IDEAL_STATE);

    // Only the two touched types may report a removal
    for (ChangeType type : RESOURCE_CHANGE_TYPES) {
      boolean touched = type == ChangeType.IDEAL_STATE || type == ChangeType.RESOURCE_CONFIG;
      checkDetectionCounts(type, 0, 0, touched ? 1 : 0);
    }
  }

  /**
   * Stops one participant and then starts a fresh one under the same instance name. The detector
   * must report a LIVE_INSTANCE removal after the stop and a LIVE_INSTANCE addition after the
   * restart, with no other type affected in either phase.
   */
  @Test(dependsOnMethods = "testDeleteResource")
  public void testDisconnectReconnectInstance() {
    // Phase 1: stop the first participant
    _participants[0].syncStop();
    _dataProvider.notifyDataChange(ChangeType.LIVE_INSTANCE);
    _dataProvider.refresh(_dataAccessor);
    _resourceChangeDetector.updateSnapshots(_dataProvider);

    checkChangeTypes(ChangeType.LIVE_INSTANCE);
    for (ChangeType type : RESOURCE_CHANGE_TYPES) {
      int expectedRemovals = (type == ChangeType.LIVE_INSTANCE) ? 1 : 0;
      checkDetectionCounts(type, 0, 0, expectedRemovals);
    }

    // Phase 2: bring the same instance back with a new manager
    MockParticipantManager restarted =
        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, "localhost_12918");
    _participants[0] = restarted;
    restarted.syncStart();
    _dataProvider.notifyDataChange(ChangeType.LIVE_INSTANCE);
    _dataProvider.refresh(_dataAccessor);
    _resourceChangeDetector.updateSnapshots(_dataProvider);

    checkChangeTypes(ChangeType.LIVE_INSTANCE);
    for (ChangeType type : RESOURCE_CHANGE_TYPES) {
      int expectedAdditions = (type == ChangeType.LIVE_INSTANCE) ? 1 : 0;
      checkDetectionCounts(type, expectedAdditions, 0, 0);
    }
  }

  /**
   * Stops a participant and drops its instance from the cluster. The detector must report one
   * removal for LIVE_INSTANCE and one for INSTANCE_CONFIG, and nothing for the other types.
   */
  @Test(dependsOnMethods = "testDisconnectReconnectInstance")
  public void testRemoveInstance() {
    // Stop the participant first, then drop the instance using its stored config
    MockParticipantManager removedParticipant = _participants[0];
    removedParticipant.syncStop();
    PropertyKey instanceConfigKey =
        _keyBuilder.instanceConfig(removedParticipant.getInstanceName());
    InstanceConfig removedConfig = _dataAccessor.getProperty(instanceConfigKey);
    _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, removedConfig);

    // Flag both types, refresh the provider and update the detector
    _dataProvider.notifyDataChange(ChangeType.LIVE_INSTANCE);
    _dataProvider.notifyDataChange(ChangeType.INSTANCE_CONFIG);
    _dataProvider.refresh(_dataAccessor);
    _resourceChangeDetector.updateSnapshots(_dataProvider);

    checkChangeTypes(ChangeType.LIVE_INSTANCE, ChangeType.INSTANCE_CONFIG);

    // Only the two instance-related types may report a removal
    for (ChangeType type : RESOURCE_CHANGE_TYPES) {
      boolean touched = type == ChangeType.LIVE_INSTANCE || type == ChangeType.INSTANCE_CONFIG;
      checkDetectionCounts(type, 0, 0, touched ? 1 : 0);
    }
  }

  /**
   * Changes the topology of the cluster config and checks that the detector reports a single
   * CLUSTER_CONFIG modification and nothing for the other types.
   */
  @Test(dependsOnMethods = "testRemoveInstance")
  public void testModifyClusterConfig() {
    // Write back a cluster config with a different topology
    ClusterConfig modifiedConfig = _dataAccessor.getProperty(_keyBuilder.clusterConfig());
    modifiedConfig.setTopology("Change");
    _dataAccessor.updateProperty(_keyBuilder.clusterConfig(), modifiedConfig);

    // Flag the change, refresh the provider and update the detector
    _dataProvider.notifyDataChange(ChangeType.CLUSTER_CONFIG);
    _dataProvider.refresh(_dataAccessor);
    _resourceChangeDetector.updateSnapshots(_dataProvider);

    checkChangeTypes(ChangeType.CLUSTER_CONFIG);

    // One modification for CLUSTER_CONFIG, all other counts stay at zero
    for (ChangeType type : RESOURCE_CHANGE_TYPES) {
      int expectedChanges = (type == ChangeType.CLUSTER_CONFIG) ? 1 : 0;
      checkDetectionCounts(type, 0, expectedChanges, 0);
    }
  }

  /**
   * Checks that updating the snapshots without any preceding data change yields no change types
   * and no additions, modifications or removals.
   */
  @Test(dependsOnMethods = "testModifyClusterConfig")
  public void testNoChange() {
    // Two rounds, to make sure the "no change" result is stable across consecutive updates
    int remainingRounds = 2;
    while (remainingRounds-- > 0) {
      _dataProvider.refresh(_dataAccessor);
      _resourceChangeDetector.updateSnapshots(_dataProvider);

      int detectedTypeCount = _resourceChangeDetector.getChangeTypes().size();
      Assert.assertEquals(detectedTypeCount, 0);

      // No type may report anything
      for (ChangeType type : RESOURCE_CHANGE_TYPES) {
        checkDetectionCounts(type, 0, 0, 0);
      }
    }
  }

  /**
   * Modifies a map field of a FULL_AUTO IdealState while best possible assignment persistence is
   * enabled, and checks that a detector created with the ignore option set to true still lists
   * IDEAL_STATE as a change type but reports no added, changed or removed item for it.
   */
  @Test(dependsOnMethods = "testNoChange")
  public void testIgnoreControllerGeneratedFields() {
    // Enable persisting of the best possible assignment so that the mapping fields of the
    // IdealState are considered as fields that are modified by Helix logic.
    ClusterConfig persistingConfig = _dataAccessor.getProperty(_keyBuilder.clusterConfig());
    persistingConfig.setPersistBestPossibleAssignment(true);
    _dataAccessor.updateProperty(_keyBuilder.clusterConfig(), persistingConfig);

    // Create a new FULL_AUTO IdealState with an empty mapping for one partition.
    // NOTE: the method-name lookup stays directly in this method body.
    String resourceName = "Resource" + TestHelper.getTestMethodName();
    _gSetupTool.getClusterManagementTool()
        .addResource(CLUSTER_NAME, resourceName, NUM_PARTITIONS, STATE_MODEL);
    PropertyKey idealStateKey = _keyBuilder.idealStates(resourceName);
    IdealState fullAutoIdealState = _dataAccessor.getProperty(idealStateKey);
    fullAutoIdealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
    fullAutoIdealState.getRecord().getMapFields().put("Partition1", new HashMap<>());
    _dataAccessor.updateProperty(_keyBuilder.idealStates(resourceName), fullAutoIdealState);
    _dataProvider.notifyDataChange(ChangeType.CLUSTER_CONFIG);
    _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE);
    _dataProvider.refresh(_dataAccessor);

    // Take the baseline snapshot with a detector whose ignore option is true
    ResourceChangeDetector ignoringDetector = new ResourceChangeDetector(true);
    ignoringDetector.updateSnapshots(_dataProvider);

    // Now modify the mapping field and let the detector see the new data
    fullAutoIdealState.getRecord().getMapFields()
        .put("Partition1", Collections.singletonMap("foo", "bar"));
    _dataAccessor.updateProperty(_keyBuilder.idealStates(resourceName), fullAutoIdealState);
    _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE);
    _dataProvider.refresh(_dataAccessor);
    ignoringDetector.updateSnapshots(_dataProvider);

    // The type is listed as changed, but no individual item is reported
    Assert.assertEquals(ignoringDetector.getChangeTypes(),
        Collections.singleton(ChangeType.IDEAL_STATE));
    int additions = ignoringDetector.getAdditionsByType(ChangeType.IDEAL_STATE).size();
    int changes = ignoringDetector.getChangesByType(ChangeType.IDEAL_STATE).size();
    int removals = ignoringDetector.getRemovalsByType(ChangeType.IDEAL_STATE).size();
    Assert.assertEquals(additions + changes + removals, 0);
  }

  /**
   * Checks that resetting the snapshots of a detector makes it report all existing ideal states
   * as detections again on the next update.
   */
  @Test(dependsOnMethods = "testIgnoreControllerGeneratedFields")
  public void testResetSnapshots() {
    // A brand new detector sees the existing data as detections
    ResourceChangeDetector freshDetector = new ResourceChangeDetector();
    Assert.assertEquals(refreshAndCountIdealStateDetections(freshDetector), 2);

    // Updating again with the same data, since nothing changed, the result will be empty.
    Assert.assertEquals(refreshAndCountIdealStateDetections(freshDetector), 0);

    // After a reset, all the data in the data provider will be treated as new changes
    freshDetector.resetSnapshots();
    Assert.assertEquals(refreshAndCountIdealStateDetections(freshDetector), 2);
  }

  /**
   * Flags IDEAL_STATE as changed, refreshes the data provider, updates the given detector and
   * returns how many ideal state items it reports in total.
   * @param detector the detector to update and query
   * @return the sum of added, changed and removed IDEAL_STATE items reported by the detector
   */
  private int refreshAndCountIdealStateDetections(ResourceChangeDetector detector) {
    _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE);
    _dataProvider.refresh(_dataAccessor);
    detector.updateSnapshots(_dataProvider);

    int additions = detector.getAdditionsByType(ChangeType.IDEAL_STATE).size();
    int changes = detector.getChangesByType(ChangeType.IDEAL_STATE).size();
    int removals = detector.getRemovalsByType(ChangeType.IDEAL_STATE).size();
    return additions + changes + removals;
  }

  /**
   * Asserts that each of the given change types appears in the change types reported by the
   * shared detector. Additional reported types are not treated as a failure.
   * @param types the change types that must be present
   */
  private void checkChangeTypes(ChangeType... types) {
    // Fetch once; the detector is not updated while this check runs
    Collection<ChangeType> detectedTypes = _resourceChangeDetector.getChangeTypes();
    for (ChangeType type : types) {
      Assert.assertTrue(detectedTypes.contains(type),
          "Expected change type " + type + " was not detected; detected: " + detectedTypes);
    }
  }

  /**
   * Asserts how many added, changed and removed items the shared detector reports for one
   * change type. The failure message names the change type and the mismatching category,
   * because this helper is mostly called in loops over several change types.
   * @param changeType the change type to query
   * @param additions the expected number of added items
   * @param changes the expected number of changed items
   * @param deletions the expected number of removed items
   */
  private void checkDetectionCounts(ChangeType changeType, int additions, int changes,
      int deletions) {
    Assert.assertEquals(_resourceChangeDetector.getAdditionsByType(changeType).size(), additions,
        "Unexpected number of additions for " + changeType);
    Assert.assertEquals(_resourceChangeDetector.getChangesByType(changeType).size(), changes,
        "Unexpected number of changes for " + changeType);
    Assert.assertEquals(_resourceChangeDetector.getRemovalsByType(changeType).size(), deletions,
        "Unexpected number of removals for " + changeType);
  }
}
