/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.controller.helix.core.assignment.segment.strategy;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Segment assignment strategy for replica-group based instance partitions.
 *
 * <p>When no partition column is configured, or the instance partitions contain a single partition, every segment is
 * assigned within instance partition 0. Otherwise the segment's partition id is looked up through
 * {@link SegmentAssignmentUtils} and mapped onto the instance partitions by taking it modulo the number of instance
 * partitions.
 *
 * <p>An instance holds the state of exactly one table, populated by {@link #init(HelixManager, TableConfig)}, which
 * must be called before any assignment method.
 */
class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentStrategy {
  private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaGroupSegmentAssignmentStrategy.class);

  // NOTE: Everything below is per-table state written by init(). These must be instance fields: if they were static,
  //       initializing the strategy for one table would overwrite the table name, partition column and Helix manager
  //       used by the strategy instances of all other tables in the same JVM.
  private HelixManager _helixManager;
  private String _tableName;
  private String _partitionColumn;
  private int _replication;
  private TableConfig _tableConfig;

  /**
   * Initializes the strategy from the table config.
   *
   * @param helixManager Helix manager handed to {@link SegmentAssignmentUtils} for segment partition lookups
   * @param tableConfig table config providing the table name, table type, replication and the optional
   *                    replica-group strategy config (partition column)
   * @throws IllegalStateException if the table config has no validation config
   */
  @Override
  public void init(HelixManager helixManager, TableConfig tableConfig) {
    _helixManager = helixManager;
    _tableConfig = tableConfig;
    _tableName = tableConfig.getTableName();
    SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig();
    Preconditions.checkState(validationAndRetentionConfig != null, "Validation Config is null");
    _replication = validationAndRetentionConfig.getReplicationNumber();
    // The replica-group strategy config is optional; without it there is no partition column
    ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
        validationAndRetentionConfig.getReplicaGroupStrategyConfig();
    _partitionColumn = replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null;
    if (_partitionColumn == null) {
      LOGGER.info("Initialized ReplicaGroupSegmentAssignmentStrategy "
          + "with replication: {} without partition column for table: {} ", _replication, _tableName);
    } else {
      LOGGER.info("Initialized ReplicaGroupSegmentAssignmentStrategy "
          + "with replication: {} and partition column: {} for table: {}", _replication, _partitionColumn, _tableName);
    }
  }

  /**
   * Assigns the segment for the replica-group based segment assignment strategy and returns the assigned instances.
   *
   * @param segmentName name of the segment to assign
   * @param currentAssignment current segment assignment of the table (segment name to instance state map)
   * @param instancePartitions instance partitions to assign the segment to
   * @param instancePartitionsType type of the instance partitions (not used by this strategy)
   * @return instances the segment is assigned to
   */
  @Override
  public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
      InstancePartitions instancePartitions, InstancePartitionsType instancePartitionsType) {
    int numPartitions = instancePartitions.getNumPartitions();
    checkReplication(instancePartitions, _replication, _tableName);
    int partitionId;
    if (_partitionColumn == null || numPartitions == 1) {
      // Non-partitioned table, or only one instance partition to choose from
      partitionId = 0;
    } else {
      // Uniformly spray the segment partitions over the instance partitions
      if (_tableConfig.getTableType() == TableType.OFFLINE) {
        partitionId = SegmentAssignmentUtils
            .getOfflineSegmentPartitionId(segmentName, _tableName, _helixManager, _partitionColumn) % numPartitions;
      } else {
        partitionId = SegmentAssignmentUtils
            .getRealtimeSegmentPartitionId(segmentName, _tableName, _helixManager, _partitionColumn) % numPartitions;
      }
    }
    return SegmentAssignmentUtils.assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionId);
  }

  /**
   * Re-computes the assignment of all segments in the current assignment for the given instance partitions.
   *
   * @param currentAssignment current segment assignment of the table (segment name to instance state map)
   * @param instancePartitions instance partitions to assign the segments to
   * @param instancePartitionsType type of the instance partitions (not used by this strategy)
   * @return new segment assignment
   */
  @Override
  public Map<String, Map<String, String>> reassignSegments(Map<String, Map<String, String>> currentAssignment,
      InstancePartitions instancePartitions, InstancePartitionsType instancePartitionsType) {
    int numPartitions = instancePartitions.getNumPartitions();

    checkReplication(instancePartitions, _replication, _tableName);

    if (_partitionColumn == null || numPartitions == 1) {
      // NOTE: Shuffle the segments within the current assignment to avoid moving only new segments to the new added
      //       servers, which might cause hotspot servers because queries tend to hit the new segments. Use the
      //       table name hash as the random seed for the shuffle so that the result is deterministic.
      List<String> segments = new ArrayList<>(currentAssignment.keySet());
      Collections.shuffle(segments, new Random(_tableName.hashCode()));

      // All segments are rebalanced within instance partition 0; the new assignment is sorted by segment name
      Map<String, Map<String, String>> newAssignment = new TreeMap<>();
      SegmentAssignmentUtils
          .rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, 0, segments, newAssignment);
      return newAssignment;
    } else {
      // Group the segments by the instance partition they belong to
      Map<Integer, List<String>> instancePartitionIdToSegmentsMap;
      if (_tableConfig.getTableType() == TableType.OFFLINE) {
        instancePartitionIdToSegmentsMap = SegmentAssignmentUtils
            .getOfflineInstancePartitionIdToSegmentsMap(currentAssignment.keySet(), numPartitions, _tableName,
                _helixManager, _partitionColumn);
      } else {
        instancePartitionIdToSegmentsMap = SegmentAssignmentUtils
            .getRealtimeInstancePartitionIdToSegmentsMap(currentAssignment.keySet(), numPartitions, _tableName,
                _helixManager, _partitionColumn);
      }

      // NOTE: Shuffle the segments within the current assignment to avoid moving only new segments to the new added
      //       servers, which might cause hotspot servers because queries tend to hit the new segments. Use the
      //       table name hash as the random seed for the shuffle so that the result is deterministic.
      Random random = new Random(_tableName.hashCode());
      for (List<String> segments : instancePartitionIdToSegmentsMap.values()) {
        Collections.shuffle(segments, random);
      }

      return SegmentAssignmentUtils
          .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, instancePartitionIdToSegmentsMap);
    }
  }

  /**
   * Helper method to check whether the number of replica-groups matches the table replication for replica-group based
   * instance partitions. Logs a warning if they do not match; the warning states that the number of replica-groups
   * inside the instance partitions is the one being used. This method itself only logs and never throws on a
   * mismatch. The mismatch can happen when the table is not configured correctly (table replication and
   * numReplicaGroups do not match, or replication changed without reassigning instances).
   *
   * @param instancePartitions instance partitions to check
   * @param replication replication configured in the table config
   * @param tableName table name, used in the warning message only
   */
  private static void checkReplication(InstancePartitions instancePartitions, int replication, String tableName) {
    int numReplicaGroups = instancePartitions.getNumReplicaGroups();
    if (numReplicaGroups != replication) {
      LOGGER.warn(
          "Number of replica-groups in instance partitions {}: {} does not match replication in table config: {} for "
              + "table: {}, using: {}", instancePartitions.getInstancePartitionsName(), numReplicaGroups, replication,
          tableName, numReplicaGroups);
    }
  }
}
