/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.clustermanager;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.samza.Partition;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.system.SystemStreamPartition;
import org.junit.Assert;
import org.junit.Test;


public class TestStandbyAllocator {

  @Test
  public void testWithNoStandby() {
    JobModel jobModel = getJobModelWithStandby(1, 1, 1, Optional.empty());
    List<String> containerConstraints = StandbyTaskUtil.getStandbyContainerConstraints("0", jobModel);
    Assert.assertEquals("Constrained container count should be 0", 0, containerConstraints.size());
  }

  @Test
  public void testWithStandby() {
    testWithStandby(2, 1, 2);
    testWithStandby(10, 1, 2);

    testWithStandby(2, 10, 2);
    testWithStandby(2, 10, 4);

    testWithStandby(10, 1, 4);
    testWithStandby(10, 10, 4);
  }


  public void testWithStandby(int nContainers, int nTasks, int replicationFactor) {
    JobModel jobModel = getJobModelWithStandby(nContainers, nTasks, replicationFactor, Optional.empty());

    for (String containerID : jobModel.getContainers().keySet()) {
      List<String> containerConstraints = StandbyTaskUtil.getStandbyContainerConstraints(containerID, jobModel);

      Assert.assertTrue("Constrained container should be valid containers",
          jobModel.getContainers().keySet().containsAll(containerConstraints));

      Assert.assertEquals("Constrained container count should be (replication factor-1)", replicationFactor - 1,
          containerConstraints.size());

      Assert.assertFalse("Constrained containers list should not have the container itself",
          containerConstraints.contains(containerID));

      containerConstraints.forEach(containerConstraintID -> {
        Assert.assertTrue("Constrained containers IDs should correspond to the active container",
            containerID.split("-")[0].equals(containerConstraintID.split("-")[0]));
      });
    }
  }

  // Helper method to create a jobmodel with given number of containers, tasks and replication factor
  public static JobModel getJobModelWithStandby(int nContainers, int nTasks, int replicationFactor, Optional<LocalityManager> localityManager) {
    Map<String, ContainerModel> containerModels = new HashMap<>();
    int taskID = 0;

    for (int j = 0; j < nContainers; j++) {
      Map<TaskName, TaskModel> tasks = new HashMap<>();
      for (int i = 0; i < nTasks; i++) {
        TaskModel taskModel = getTaskModel(taskID++);
        tasks.put(taskModel.getTaskName(), taskModel);
      }
      containerModels.put(String.valueOf(j), new ContainerModel(String.valueOf(j), tasks));
    }

    Map<String, ContainerModel> standbyContainerModels = new HashMap<>();
    for (int i = 0; i < replicationFactor - 1; i++) {
      for (String containerID : containerModels.keySet()) {
        String standbyContainerId = StandbyTaskUtil.getStandbyContainerId(containerID, i);
        Map<TaskName, TaskModel> standbyTasks = getStandbyTasks(containerModels.get(containerID).getTasks(), i);
        standbyContainerModels.put(standbyContainerId, new ContainerModel(standbyContainerId, standbyTasks));
      }
    }

    containerModels.putAll(standbyContainerModels);
    return new JobModel(new MapConfig(), containerModels, localityManager.orElse(null));
  }

  // Helper method that creates a taskmodel with one input ssp
  private static TaskModel getTaskModel(int partitionNum) {
    return new TaskModel(new TaskName("Partition " + partitionNum),
        Collections.singleton(new SystemStreamPartition("test-system", "test-stream", new Partition(partitionNum))),
        new Partition(partitionNum), TaskMode.Active);
  }

  // Helper method to create standby-taskModels from active-taskModels
  private static Map<TaskName, TaskModel> getStandbyTasks(Map<TaskName, TaskModel> tasks, int replicaNum) {
    Map<TaskName, TaskModel> standbyTasks = new HashMap<>();
    tasks.forEach((taskName, taskModel) -> {
      TaskName standbyTaskName = StandbyTaskUtil.getStandbyTaskName(taskName, replicaNum);
      standbyTasks.put(standbyTaskName,
          new TaskModel(standbyTaskName, taskModel.getSystemStreamPartitions(), taskModel.getChangelogPartition(),
              TaskMode.Standby));
    });
    return standbyTasks;
  }
}
