/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.cache.client.internal;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.logging.log4j.Logger;

import org.apache.geode.InternalGemFireException;
import org.apache.geode.cache.FixedPartitionAttributes;
import org.apache.geode.cache.PartitionResolver;
import org.apache.geode.cache.Region;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.ClassPathLoader;
import org.apache.geode.internal.cache.BucketServerLocation66;
import org.apache.geode.internal.cache.FixedPartitionAttributesImpl;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * Stores the information such as partition attributes and meta data details
 *
 *
 * @since GemFire 6.5
 *
 */
public class ClientPartitionAdvisor {

  private static final Logger logger = LogService.getLogger();

  private final ConcurrentMap<Integer, List<BucketServerLocation66>> bucketServerLocationsMap =
      new ConcurrentHashMap<Integer, List<BucketServerLocation66>>();

  private final int totalNumBuckets;

  private String serverGroup = "";

  private final String colocatedWith;

  private PartitionResolver partitionResolver = null;

  private Map<String, List<Integer>> fixedPAMap = null;

  private boolean fpaAttrsCompletes = false;

  private Random random = new Random();

  @SuppressWarnings("unchecked")
  public ClientPartitionAdvisor(int totalNumBuckets, String colocatedWith,
      String partitionResolverName, Set<FixedPartitionAttributes> fpaSet) {

    this.totalNumBuckets = totalNumBuckets;
    this.colocatedWith = colocatedWith;
    try {
      if (partitionResolverName != null) {
        this.partitionResolver = (PartitionResolver) ClassPathLoader.getLatest()
            .forName(partitionResolverName).newInstance();
      }
    } catch (Exception e) {
      if (logger.isErrorEnabled()) {
        logger.error(e.getMessage(), e);
      }

      throw new InternalGemFireException(
          String.format("Cannot create an instance of PartitionResolver : %s",
              partitionResolverName));
    }
    if (fpaSet != null) {
      fixedPAMap = new ConcurrentHashMap<String, List<Integer>>();
      int totalFPABuckets = 0;
      for (FixedPartitionAttributes fpa : fpaSet) {
        List attrList = new ArrayList();
        totalFPABuckets += fpa.getNumBuckets();
        attrList.add(fpa.getNumBuckets());
        attrList.add(((FixedPartitionAttributesImpl) fpa).getStartingBucketID());
        fixedPAMap.put(fpa.getPartitionName(), attrList);
      }
      if (totalFPABuckets == this.totalNumBuckets) {
        this.fpaAttrsCompletes = true;
      }
    }
  }

  public ServerLocation adviseServerLocation(int bucketId) {
    if (this.bucketServerLocationsMap.containsKey(bucketId)) {
      List<BucketServerLocation66> locations = this.bucketServerLocationsMap.get(bucketId);
      List<BucketServerLocation66> locationsCopy = new ArrayList<BucketServerLocation66>(locations);

      if (locationsCopy.isEmpty()) {
        return null;
      }
      if (locationsCopy.size() == 1) {
        return locationsCopy.get(0);
      }
      int index = random.nextInt(locationsCopy.size());
      return locationsCopy.get(index);
    }
    return null;
  }

  public ServerLocation adviseRandomServerLocation() {
    ArrayList<Integer> bucketList = new ArrayList<Integer>(this.bucketServerLocationsMap.keySet());
    int size = bucketList.size();
    if (size > 0) {
      List<BucketServerLocation66> locations =
          this.bucketServerLocationsMap.get(bucketList.get(random.nextInt(size)));
      if (locations != null) {
        List<BucketServerLocation66> serverList = new ArrayList<BucketServerLocation66>(locations);
        if (serverList.size() == 0)
          return null;
        return serverList.get(0);
      }
    }
    return null;
  }

  public List<BucketServerLocation66> adviseServerLocations(int bucketId) {
    if (this.bucketServerLocationsMap.containsKey(bucketId)) {
      List<BucketServerLocation66> locationsCopy =
          new ArrayList<BucketServerLocation66>(this.bucketServerLocationsMap.get(bucketId));
      return locationsCopy;
    }
    return null;
  }

  public ServerLocation advisePrimaryServerLocation(int bucketId) {
    if (this.bucketServerLocationsMap.containsKey(bucketId)) {
      List<BucketServerLocation66> locations = this.bucketServerLocationsMap.get(bucketId);
      List<BucketServerLocation66> locationsCopy = new ArrayList<BucketServerLocation66>(locations);
      for (BucketServerLocation66 loc : locationsCopy) {
        if (loc.isPrimary()) {
          return loc;
        }
      }
    }
    return null;
  }

  public void updateBucketServerLocations(int bucketId,
      List<BucketServerLocation66> bucketServerLocations, ClientMetadataService cms) {
    List<BucketServerLocation66> locationCopy = new ArrayList<BucketServerLocation66>();
    List<BucketServerLocation66> locations;

    boolean honourSeverGroup = cms.honourServerGroup();

    if (this.serverGroup.length() != 0 && honourSeverGroup) {
      for (BucketServerLocation66 s : bucketServerLocations) {
        String[] groups = s.getServerGroups();
        if (groups.length > 0) {
          for (String str : groups) {
            if (str.equals(this.serverGroup)) {
              locationCopy.add(s);
              break;
            }
          }
        } else {
          locationCopy.add(s);
        }
      }
      locations = Collections.unmodifiableList(locationCopy);
    } else {
      locations = Collections.unmodifiableList(bucketServerLocations);
    }

    this.bucketServerLocationsMap.put(bucketId, locations);
  }

  public void removeBucketServerLocation(ServerLocation serverLocation) {
    Iterator<Map.Entry<Integer, List<BucketServerLocation66>>> iter =
        this.bucketServerLocationsMap.entrySet().iterator();
    while (iter.hasNext()) {
      Map.Entry<Integer, List<BucketServerLocation66>> entry = iter.next();
      Integer key = entry.getKey();
      List<BucketServerLocation66> oldLocations = entry.getValue();
      List<BucketServerLocation66> newLocations =
          new ArrayList<BucketServerLocation66>(oldLocations);
      // if this serverLocation contains in the list the remove the
      // serverLocation and update the map with new List
      while (newLocations.remove(serverLocation)
          && !this.bucketServerLocationsMap.replace(key, oldLocations, newLocations)) {
        oldLocations = this.bucketServerLocationsMap.get(key);
        newLocations = new ArrayList<BucketServerLocation66>(oldLocations);
      }
    }
  }

  public Map<Integer, List<BucketServerLocation66>> getBucketServerLocationsMap_TEST_ONLY() {
    return this.bucketServerLocationsMap;
  }

  /**
   * This method returns total number of buckets for a PartitionedRegion.
   *
   * @return total number of buckets for a PartitionedRegion.
   */

  public int getTotalNumBuckets() {
    return this.totalNumBuckets;
  }

  /**
   * @return the serverGroup
   */
  public String getServerGroup() {
    return this.serverGroup;
  }


  public void setServerGroup(String group) {
    this.serverGroup = group;
  }

  /**
   * Returns name of the colocated PartitionedRegion on CacheServer
   */
  public String getColocatedWith() {
    return this.colocatedWith;
  }

  /**
   * Returns the PartitionResolver set for custom partitioning
   *
   * @return <code>PartitionResolver</code> for the PartitionedRegion
   */
  public PartitionResolver getPartitionResolver() {
    return this.partitionResolver;
  }

  public Set<String> getFixedPartitionNames() {
    return this.fixedPAMap.keySet();
  }

  public int assignFixedBucketId(Region region, String partition, Object resolveKey) {
    if (this.fixedPAMap.containsKey(partition)) {
      List<Integer> attList = this.fixedPAMap.get(partition);
      int hc = resolveKey.hashCode();
      int bucketId = Math.abs(hc % (attList.get(0)));
      int partitionBucketID = bucketId + attList.get(1);
      return partitionBucketID;
    } else {
      // We don't know as we might not have got the all FPAttributes
      // from the FPR, So don't throw the exception but send the request
      // to the server and update the FPA attributes
      // This exception should be thrown from the server as we will
      // not be sure of partition not available unless we contact the server.
      return -1;
    }
  }

  public Map<String, List<Integer>> getFixedPAMap() {
    return this.fixedPAMap;
  }

  public void updateFixedPAMap(Map<String, List<Integer>> map) {
    this.fixedPAMap.putAll(map);
  }

  public boolean isFPAAttrsComplete() {
    return this.fpaAttrsCompletes;
  }
}
