/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache;

import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.ANY_INIT;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelException;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryDestroyedException;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.FixedPartitionAttributes;
import org.apache.geode.cache.FixedPartitionResolver;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.PartitionResolver;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.partition.PartitionNotAvailableException;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache.util.CacheWriterAdapter;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.LocalRegion.InitializationLevel;
import org.apache.geode.internal.cache.partitioned.Bucket;
import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
import org.apache.geode.logging.internal.log4j.api.LogService;

public class PartitionedRegionHelper {
  private static final Logger logger = LogService.getLogger();

  /** Number of bytes in one megabyte (1024 * 1024). */
  static final long BYTES_PER_MB = 1024 * 1024;

  /**
   * Name of the administrative root region used for storing Partitioned Region meta data sub
   * regions.
   */
  public static final String PR_ROOT_REGION_NAME = "__PR";

  /** Name of the DistributedLockService that PartitionedRegions use. */
  public static final String PARTITION_LOCK_SERVICE_NAME = "__PRLS";

  /** Name prefix used when creating bucket regions. */
  static final String BUCKET_REGION_PREFIX = "_B_";

  /**
   * Name of the system property holding the time to wait for ownership (ms).
   * <p>
   * This should not be used normally. Internally, GemFire uses global locks to modify shared
   * meta-data and this property controls the delay before giving up trying to acquire a global
   * lock.
   */
  static final String VM_OWNERSHIP_WAIT_TIME_PROPERTY =
      DistributionConfig.GEMFIRE_PREFIX + "VM_OWNERSHIP_WAIT_TIME";

  /** Default ownership wait time: wait forever. */
  static final long VM_OWNERSHIP_WAIT_TIME_DEFAULT = Long.MAX_VALUE;

  // NOTE(review): presumably the key under which the highest assigned partitioned region id is
  // stored - confirm against the code that reads it.
  static final String MAX_PARTITIONED_REGION_ID = "MAX_PARTITIONED_REGION_ID";

  /** Default wait per retry iteration, in milliseconds. */
  public static final int DEFAULT_WAIT_PER_RETRY_ITERATION = 100; // milliseconds

  /** Default total wait across retry iterations, in milliseconds (one hour). */
  public static final int DEFAULT_TOTAL_WAIT_RETRY_ITERATION = 60 * 60 * 1000; // milliseconds

  /** Data policy used by default for partitioned regions. */
  @Immutable
  public static final DataPolicy DEFAULT_DATA_POLICY = DataPolicy.PARTITION;

  /**
   * Unmodifiable set of the data policies accepted for partitioned regions. It is populated by the
   * static initializer with {@link #DEFAULT_DATA_POLICY} and
   * {@link DataPolicy#PERSISTENT_PARTITION}.
   */
  @Immutable
  public static final Set ALLOWED_DATA_POLICIES;

  // NOTE(review): monitor object; its users are outside this part of the file - confirm what it
  // guards before relying on it.
  static final Object dlockMonitor = new Object();

  static {
    Set policies = new HashSet();
    policies.add(DEFAULT_DATA_POLICY);
    policies.add(DataPolicy.PERSISTENT_PARTITION);
    // policies.add(DataPolicy.NORMAL);
    ALLOWED_DATA_POLICIES = Collections.unmodifiableSet(policies);
  }

  /**
   * Cleans the config meta data of a failed or closed PartitionedRegion node. This variant always
   * asks the four-argument overload to acquire and release the RegionLock around the removal.
   *
   * @param failedNode the PartitionedRegion node that failed or was closed
   * @param regionIdentifier identifies the PartitionedRegion whose meta data needs cleaning
   * @param cache the GemFire cache
   */
  static void removeGlobalMetadataForFailedNode(Node failedNode, String regionIdentifier,
      InternalCache cache) {
    final boolean acquireRegionLock = true;
    removeGlobalMetadataForFailedNode(failedNode, regionIdentifier, cache, acquireRegionLock);
  }

  /**
   * Cleans the config meta data of a failed or closed PartitionedRegion node.
   * <p>
   * The PartitionRegionConfig stored in the PR root region under {@code regionIdentifier} is
   * checked once without the lock (cheap early exit) and then re-read after the optional lock has
   * been taken. If the failed node is the last one, the entry is destroyed; otherwise the node is
   * removed from the config and the updated config is put back.
   *
   * @param failedNode The failed PartitionedRegion Node
   * @param regionIdentifier The PartitionedRegion for which the cleanup is required
   * @param cache GemFire cache.
   * @param lock True if this removal should acquire and release the RegionLock
   */
  static void removeGlobalMetadataForFailedNode(Node failedNode, String regionIdentifier,
      InternalCache cache, final boolean lock) {
    Region root = PartitionedRegionHelper.getPRRoot(cache, false);
    if (root == null) {
      return; // no partitioned region info to clean up
    }
    PartitionRegionConfig prConfig = (PartitionRegionConfig) root.get(regionIdentifier);
    if (null == prConfig || !prConfig.containsNode(failedNode)) {
      return;
    }

    final PartitionedRegion.RegionLock rl =
        PartitionedRegion.getRegionLock(regionIdentifier, cache);
    // Acquire before entering the try block: if lock() itself fails, the finally clause must not
    // attempt to release a lock that was never obtained.
    if (lock) {
      rl.lock();
    }
    try {
      // Re-read the config now that the (optional) lock is held; it may have changed meanwhile.
      prConfig = (PartitionRegionConfig) root.get(regionIdentifier);
      if (prConfig != null && prConfig.containsNode(failedNode)) {
        if (logger.isDebugEnabled()) {
          logger.debug("Cleaning up config for pr {} node {}", regionIdentifier, failedNode);
        }
        if ((prConfig.getNumberOfNodes() - 1) <= 0) {
          // The failed node is the only one left: drop the whole entry.
          if (logger.isDebugEnabled()) {
            logger.debug("No nodes left but failed node {} destroying entry {} nodes {}",
                failedNode, regionIdentifier, prConfig.getNodes());
          }
          try {
            root.destroy(regionIdentifier);
          } catch (EntryNotFoundException e) {
            logger.warn(
                String.format("Got EntryNotFoundException in destroy Op for allPRRegion key, %s",
                    regionIdentifier),
                e);
          }
        } else {
          prConfig.removeNode(failedNode);
          if (prConfig.getNumberOfNodes() == 0) {
            root.destroy(regionIdentifier);
          } else {
            // We can't go backwards, or we potentially lose data
            root.put(regionIdentifier, prConfig);
          }
        }
      }
    } finally {
      if (lock) {
        rl.unlock();
      }
    }
  }

  /**
   * Returns the region that is the root for all Partitioned Region meta data on this node,
   * creating it when it does not exist yet.
   *
   * @param cache the cache that hosts the root region
   * @return the Partitioned Region meta data root region
   */
  public static LocalRegion getPRRoot(final InternalCache cache) {
    final boolean createIfAbsent = true;
    return getPRRoot(cache, createIfAbsent);
  }

  /**
   * Return a region that is the root for all PartitionedRegion meta data on this Node.
   * <p>
   * The region is looked up by {@link #PR_ROOT_REGION_NAME}. When it is absent and
   * {@code createIfAbsent} is true it is created with scope DISTRIBUTED_ACK and data policy
   * REPLICATE, with a {@code FixedPartitionAttributesListener} attached. If the boolean system
   * property {@code DistributionConfig.GEMFIRE_PREFIX + "PRDebug"} is set, a debug-logging cache
   * listener and a cache writer that rejects non-advancing config updates are installed as well.
   *
   * @param cache the cache that hosts (or will host) the root region
   * @param createIfAbsent whether to create the root region when it does not exist
   * @return the DISTRIBUTED_ACK scoped root region used for PartitionedRegion administration, or
   *         null if it does not exist and {@code createIfAbsent} is false
   */
  public static LocalRegion getPRRoot(final InternalCache cache, boolean createIfAbsent) {
    DistributedRegion root = (DistributedRegion) cache.getRegion(PR_ROOT_REGION_NAME, true);
    if (root == null) {
      if (!createIfAbsent) {
        return null;
      }
      if (logger.isDebugEnabled()) {
        logger.debug("Creating root Partitioned Admin Region {}",
            PartitionedRegionHelper.PR_ROOT_REGION_NAME);
      }
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.DISTRIBUTED_ACK);
      factory.setDataPolicy(DataPolicy.REPLICATE);
      factory.addCacheListener(new FixedPartitionAttributesListener());
      // Extra diagnostics and consistency checking, enabled only via the PRDebug system property.
      if (Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "PRDebug")) {
        // Listener that logs every create/update/destroy on the root region at debug level.
        factory.addCacheListener(new CacheListenerAdapter() {
          @Override
          public void afterCreate(EntryEvent event) {
            if (logger.isDebugEnabled()) {
              logger.debug(
                  "Create Event for allPR: key = {} oldVal = {} newVal = {} Op = {} origin = {} isNetSearch = {}",
                  event.getKey(), event.getOldValue(), event.getNewValue(), event.getOperation(),
                  event.getDistributedMember(), event.getOperation().isNetSearch());
            }
          }

          @Override
          public void afterUpdate(EntryEvent event) {
            if (logger.isDebugEnabled()) {
              logger.debug(
                  "Update Event for allPR: key = {} oldVal = {} newVal = {} Op = {} origin = {} isNetSearch = {}",
                  event.getKey(), event.getOldValue(), event.getNewValue(), event.getOperation(),
                  event.getDistributedMember(), event.getOperation().isNetSearch());
            }
          }

          @Override
          public void afterDestroy(EntryEvent event) {
            if (logger.isDebugEnabled()) {
              logger.debug(
                  "Destroy Event for allPR: key = {} oldVal = {} newVal = {} Op = {} origin = {} isNetSearch = {}",
                  event.getKey(), event.getOldValue(), event.getNewValue(), event.getOperation(),
                  event.getDistributedMember(), event.getOperation().isNetSearch());
            }
          }
        });

        // Writer that vetoes an update whose new config is a different object than the old one
        // and does not carry a greater node list version.
        factory.setCacheWriter(new CacheWriterAdapter() {
          @Override
          public void beforeUpdate(EntryEvent event) throws CacheWriterException {
            // the prConfig node list must advance (otherwise meta data becomes out of sync)
            final PartitionRegionConfig newConf = (PartitionRegionConfig) event.getNewValue();
            final PartitionRegionConfig oldConf = (PartitionRegionConfig) event.getOldValue();
            if (newConf != oldConf && !newConf.isGreaterNodeListVersion(oldConf)) {
              throw new CacheWriterException(
                  String.format(
                      "New PartitionedRegionConfig %s does not have newer version than previous %s",
                      new Object[] {newConf, oldConf}));
            }
          }
        });
      }

      RegionAttributes ra = factory.create();
      // Create anonymous stats holder for Partitioned Region meta data
      // NOTE(review): every getCachePerfStats() call constructs a new CachePerfStats instance -
      // confirm that callers invoke it only once.
      final HasCachePerfStats prMetaStatsHolder = new HasCachePerfStats() {
        @Override
        public CachePerfStats getCachePerfStats() {
          return new CachePerfStats(cache.getDistributedSystem(), "RegionStats-partitionMetaData",
              cache.getStatisticsClock());
        }
      };

      try {
        root = (DistributedRegion) cache.createVMRegion(PR_ROOT_REGION_NAME, ra,
            new InternalRegionArguments().setIsUsedForPartitionedRegionAdmin(true)
                .setInternalRegion(true).setCachePerfStatsHolder(prMetaStatsHolder));
        // The membership listener is registered only on the path that created the region here.
        root.getDistributionAdvisor().addMembershipListener(new MemberFailureListener(cache));
      } catch (RegionExistsException ignore) {
        // we avoid this before hand, but yet we have to catch it
        root = (DistributedRegion) cache.getRegion(PR_ROOT_REGION_NAME, true);
      } catch (IOException ieo) {
        // Reported through a failing assertion; only the exception's string form is kept.
        Assert.assertTrue(false, "IOException creating Partitioned Region root: " + ieo);
      } catch (ClassNotFoundException cne) {
        Assert.assertTrue(false, "ClassNotFoundExcpetion creating Partitioned Region root: " + cne);
      }
    }
    Assert.assertTrue(root != null,
        "Can not obtain internal Partitioned Region configuration root");
    return root;
  }

  // TODO rebalancing - this code was added here in the merge of -r22804:23093 from trunk
  // because of changes made on trunk that require this method, which was removed on
  // prRebalancing. It probably needs refactoring.
  // The idea here is to remove meta data from the partitioned region for a node that
  // has left the cache.
  // A couple of options that didn't work:
  // - Remove metadata in the region advisor for the PR instead. This doesn't work because
  // a member can close its cache and then recreate the same region. Another member
  // might end up removing metadata after the region is recreated, leading to inconsistent
  // metadata.
  // - Remove metadata on cache closure in the member that is closing. This didn't work because
  // we can't do region operations (to remove metadata) after isClosing is set to true. Removing
  // metadata before isClosing is set to true results in operations being silently ignored
  // because of inconsistent metadata and regions.
  /**
   * Cleans the config meta data for a DistributedMember which has left the DistributedSystem,
   * handling one PartitionedRegion at a time.
   * <p>
   * Nothing is done when the cache is null, a cancel is in progress, or the PR root region does
   * not exist. The keys of the root region are visited in shuffled order. A failure while cleaning
   * one region is logged at debug level and does not stop the remaining regions from being
   * processed.
   *
   * @param cache the cache whose Partitioned Region meta data is cleaned; may be null
   * @param failedMemId the member that has left
   */
  public static void cleanUpMetaDataOnNodeFailure(InternalCache cache,
      DistributedMember failedMemId) {
    try {
      if (cache == null || cache.getCancelCriterion().isCancelInProgress()) {
        return;
      }

      if (logger.isDebugEnabled()) {
        logger.debug("Cleaning PartitionedRegion meta data for memberId={}", failedMemId);
      }
      final Region prRoot = PartitionedRegionHelper.getPRRoot(cache, false);
      if (prRoot == null) {
        return;
      }

      // Work on a private, shuffled snapshot of the region names.
      final ArrayList<String> regionNames = new ArrayList<String>(prRoot.keySet());
      if (regionNames.size() > 1) {
        Collections.shuffle(regionNames, PartitionedRegion.RANDOM);
      }
      final Iterator<String> names = regionNames.iterator();
      while (names.hasNext()) {
        final String prName = names.next();
        try {
          cleanUpMetaDataForRegion(cache, prName, failedMemId, null);
        } catch (CancelException ignore) {
          // okay to ignore this - metadata will be cleaned up by cache close operation
        } catch (Exception e) {
          if (logger.isDebugEnabled()) {
            logger.debug("Got exception in cleaning up metadata. {}", e.getMessage(), e);
          }
        }
      }
    } catch (CancelException ignore) {
      // ignore
    }
  }

  /**
   * Cleans the config meta data of one PartitionedRegion for a member that has left.
   * <p>
   * The region's config is read from the PR root region and searched for a node whose member id
   * equals {@code failedMemId}. For the first match, the actual cleanup is handed to the PR meta
   * data cleanup thread pool, and {@code postCleanupTask} runs on that pool after the cleanup. In
   * every other case (no root region, no config, no matching node, cancel in progress, or an
   * exception before the hand-off) {@code postCleanupTask} runs on the calling thread before this
   * method exits.
   *
   * @param cache the cache whose meta data is cleaned
   * @param prName key of the PartitionedRegion config in the PR root region
   * @param failedMemId the member that has left
   * @param postCleanupTask task to run once cleanup is done or found unnecessary; may be null
   */
  public static void cleanUpMetaDataForRegion(final InternalCache cache, final String prName,
      final DistributedMember failedMemId, final Runnable postCleanupTask) {
    // Stays true until the post-cleanup task has been handed over to the asynchronous job.
    boolean postCleanupPending = true;
    try {
      final Region prRoot = PartitionedRegionHelper.getPRRoot(cache, false);
      if (prRoot == null) {
        return;
      }

      final PartitionRegionConfig config;
      try {
        config = (PartitionRegionConfig) prRoot.get(prName);
      } catch (EntryDestroyedException ignore) {
        return;
      }
      if (config == null) {
        // Guards against an NPE that was observed here after the pr->rem merge.
        return;
      }

      final Set<Node> nodes = config.getNodes();
      if (nodes == null) {
        return;
      }

      for (final Node candidate : nodes) {
        if (cache.getCancelCriterion().isCancelInProgress()) {
          return;
        }
        if (!candidate.getMemberId().equals(failedMemId)) {
          continue;
        }
        // Do the cleanup in another thread so we don't have the advisor locked.
        // Fix for #45365, we don't schedule an asynchronous task until
        // we have determined the node to remove (Which includes the
        // serial number).
        final Runnable cleanupJob = new Runnable() {
          @Override
          public void run() {
            cleanPartitionedRegionMetaDataForNode(cache, candidate, config, prName);
            if (postCleanupTask != null) {
              postCleanupTask.run();
            }
          }
        };
        cache.getDistributionManager().getExecutors().getPrMetaDataCleanupThreadPool()
            .execute(cleanupJob);
        postCleanupPending = false;
        return;
      }
    } finally {
      if (postCleanupPending && postCleanupTask != null) {
        postCleanupTask.run();
      }
    }
  }

  /**
   * Cleans the config meta data for a Node that hosted a PartitionedRegion by removing the node
   * from the global meta data under the RegionLock, with debug logging before and after.
   *
   * @param cache the cache whose meta data is cleaned
   * @param node the node to remove
   * @param prConf the region's config as seen by the caller; only used for logging here
   * @param regionIdentifier identifies the PartitionedRegion being cleaned
   */
  private static void cleanPartitionedRegionMetaDataForNode(InternalCache cache, Node node,
      PartitionRegionConfig prConf, String regionIdentifier) {
    if (logger.isDebugEnabled()) {
      logger.debug(
          "Cleaning PartitionedRegion meta data for node={} for Partitioned Region={} configuration={}",
          node, regionIdentifier, prConf);
    }

    // Same as the three-argument overload: remove with the RegionLock held.
    removeGlobalMetadataForFailedNode(node, regionIdentifier, cache, true);

    if (logger.isDebugEnabled()) {
      logger.debug("Done Cleaning PartitionedRegion meta data for memberId={} for {}", node,
          regionIdentifier);
    }
  }

  /**
   * Runs hashCode() on given key producing a long value and then finds absolute value of the
   * modulus with bucketSize. For better key distribution, possibly use MD5 or SHA or any unique ID
   * generator for the hash function.
   *
   * @param pr the partitioned region on which to operate
   * @param key the key on which to determine the hash key
   * @return the bucket id the key hashes to
   */
  // private static int NOSIGN = 0x7fffffff;
  /*
   * public static int getHashKey(PartitionedObject key) { PartitionedRegion pRegion =
   * (PartitionedRegion)entryOp.getRegion(); RoutingResolver resolver =
   * pRegion.getRoutingResolver();
   *
   * int totalNumberOfBuckets = pRegion.getTotalNumberOfBuckets(); Object resolveKey = null; if
   * (resolver == null) { resolveKey = key; } else {
   *
   * //resolveKey = resolver.getPartitionKey(key); resolveKey =
   * resolver.getRoutingObject((EntryOperation)key); } int hc = resolveKey.hashCode(); int bucketId
   * = hc % totalNumberOfBuckets; // Force positive bucket ids only return Math.abs(bucketId); // We
   * should use the same hash code spreader as most other java.util hash tables. // h += ~(h << 9);
   * // h ^= (h >>> 14); // h += (h << 4); // h ^= (h >>> 10); // h &= NOSIGN; // return h %
   * totalNumberOfBuckets; }
   **/

  /**
   * Picks the PartitionResolver to use for an operation. Candidates are tried in this order: the
   * resolver configured on the region, the key itself, and finally the callback argument.
   *
   * @param pr the partitioned region on which to operate
   * @param key the key of the operation; used if it implements PartitionResolver
   * @param callbackArgument the callback argument; used if it implements PartitionResolver
   * @return the chosen resolver, or null if none of the candidates is a resolver
   */
  private static PartitionResolver getResolver(PartitionedRegion pr, Object key,
      Object callbackArgument) {
    final PartitionResolver regionResolver = pr.getPartitionResolver();
    if (regionResolver != null) {
      return regionResolver;
    }
    // instanceof is false for null, so no separate null checks are needed below.
    if (key instanceof PartitionResolver) {
      return (PartitionResolver) key;
    }
    if (callbackArgument instanceof PartitionResolver) {
      return (PartitionResolver) callbackArgument;
    }
    return null;
  }

  /**
   * Determines the bucket id for an operation described by its individual parts, without
   * requiring the caller to build an EntryOperation first.
   * <p>
   * If the computation fails with an IllegalStateException, the cache's cancel criterion and the
   * region's closed/destroyed state are checked first, and only then is the original exception
   * rethrown.
   *
   * @param pr the partitioned region on which to operate
   * @param operation operation
   * @param key the key on which to determine the hash key
   * @param value the value of the operation, passed on to the EntryOperation given to the resolver
   * @param callbackArgument the callbackArgument is passed to <code>PartitionResolver</code> to get
   *        Routing object
   * @return the bucket id the key/routing object hashes to
   */
  public static int getHashKey(PartitionedRegion pr, Operation operation, Object key, Object value,
      Object callbackArgument) {
    // avoid creating EntryOperation if there is no resolver
    final EntryOperation noEvent = null;
    try {
      return getHashKey(noEvent, pr, operation, key, value, callbackArgument);
    } catch (IllegalStateException hashFailure) {
      // bug #43651 - check for shutdown before throwing this
      pr.getCache().getCancelCriterion().checkCancelInProgress(hashFailure);
      // GEODE-2282 - check for region destroyed before throwing it
      pr.checkClosedOrDestroyed();
      throw hashFailure;
    }
  }

  /**
   * Determines the bucket id for an already constructed entry operation. The region, key and
   * callback argument are all taken from the event.
   *
   * @param event entry event created for this entry operation
   * @return the bucket id the key/routing object hashes to
   */
  public static int getHashKey(EntryOperation event) {
    // Everything except the event is left null; the six-argument overload reads it from the event.
    return getHashKey(event, null, null, null, null, null);
  }

  /**
   * Determines the bucket id for an entry operation.
   * <p>
   * When {@code event} is non-null, the region, key and callback argument are taken from it and
   * the corresponding parameters are ignored. For a fixed partitioned region, a
   * FixedPartitionResolver names the partition and the bucket is chosen inside that partition. For
   * any other region, the routing object (or the key itself when there is no resolver) is hashed
   * over the region's total number of buckets.
   *
   * @param event entry event created for this entry operation; can be null
   * @param pr the partitioned region on which to operate
   * @param operation operation
   * @param key the key on which to determine the hash key
   * @param value the value of the operation; only used to build an EntryOperation when
   *        {@code event} is null and a resolver has to be consulted
   * @param callbackArgument the callbackArgument is passed to <code>PartitionResolver</code> to get
   *        Routing object
   * @return the bucket id the key/routing object hashes to
   * @throws IllegalStateException if the key or the routing object to hash is null, or if a fixed
   *         partitioned region has no usable FixedPartitionResolver or the resolver returns a null
   *         partition name
   * @throws PartitionNotAvailableException if the named fixed partition is not available
   */
  private static int getHashKey(EntryOperation event, PartitionedRegion pr, Operation operation,
      Object key, Object value, Object callbackArgument) {
    // avoid creating EntryOperation if there is no resolver
    if (event != null) {
      pr = (PartitionedRegion) event.getRegion();
      key = event.getKey();
      callbackArgument = event.getCallbackArgument();
    }

    PartitionResolver resolver = getResolver(pr, key, callbackArgument);
    Object resolveKey = null;
    if (pr.isFixedPartitionedRegion()) {
      String partition = null;
      if (resolver instanceof FixedPartitionResolver) {
        Map<String, Integer[]> partitionMap = pr.getPartitionsMap();
        if (event == null) {
          event = new EntryOperationImpl(pr, operation, key, value, callbackArgument);
        }
        partition =
            ((FixedPartitionResolver) resolver).getPartitionName(event, partitionMap.keySet());
        if (partition == null) {
          Object[] prms = new Object[] {pr.getName(), resolver};
          throw new IllegalStateException(
              String.format("For region %s, partition resolver %s returned partition name null",
                  prms));
        }
        Integer[] bucketArray = partitionMap.get(partition);
        if (bucketArray == null) {
          Object[] prms = new Object[] {pr.getName(), partition};
          throw new PartitionNotAvailableException(
              String.format(
                  "For FixedPartitionedRegion %s, partition %s is not available on any datastore.",
                  prms));
        }
        // A single-bucket partition needs no routing object: the partition name is hashed instead.
        int numBukets = bucketArray[1];
        resolveKey = (numBukets == 1) ? partition : resolver.getRoutingObject(event);
        if (resolveKey == null) {
          // Same failure as on the non-fixed path below, instead of a NullPointerException later
          // when the routing object's hashCode() is taken.
          throw new IllegalStateException(
              "The RoutingObject returned by PartitionResolver is null.");
        }
      } else if (resolver == null) {
        throw new IllegalStateException(
            String.format(
                "For FixedPartitionedRegion %s, FixedPartitionResolver is not available (neither through the partition attribute partition-resolver nor key/callbackArg implementing FixedPartitionResolver)",
                pr.getName()));
      } else {
        Object[] prms = new Object[] {pr.getName(), resolver};
        throw new IllegalStateException(
            String.format(
                "For FixedPartitionedRegion %s, Resolver defined %s is not an instance of FixedPartitionResolver",
                prms));
      }
      return assignFixedBucketId(pr, partition, resolveKey);
    } else {
      // Calculate resolveKey.
      if (resolver == null) {
        // no custom partitioning at all
        resolveKey = key;
        if (resolveKey == null) {
          throw new IllegalStateException("attempting to hash null");
        }
      } else {
        if (event == null) {
          event = new EntryOperationImpl(pr, operation, key, value, callbackArgument);
        }
        resolveKey = resolver.getRoutingObject(event);
        if (resolveKey == null) {
          throw new IllegalStateException(
              "The RoutingObject returned by PartitionResolver is null.");
        }
      }
      // Finally, calculate the hash.
      return getHashKey(pr, resolveKey);
    }
  }

  /**
   * Computes the bucket id of {@code resolveKey} inside a named fixed partition.
   * <p>
   * The partition's starting bucket id and bucket count are taken from the region's partitions
   * map if it has an entry for the partition; otherwise from the local fixed partition attributes,
   * and failing that from the fixed partition attributes advised by the region advisor. The result
   * is {@code startingBucketID + abs(hashCode % numBuckets)}.
   *
   * @param pr the fixed partitioned region
   * @param partition name of the partition the key belongs to
   * @param resolveKey the object whose hashCode selects the bucket within the partition
   * @return the bucket id within the region
   * @throws PartitionNotAvailableException if no source knows the partition
   * @throws IllegalStateException if the partition is known but has zero buckets
   */
  private static int assignFixedBucketId(PartitionedRegion pr, String partition,
      Object resolveKey) {
    int startingBucketID = 0;
    int partitionNumBuckets = 0;

    // One lookup instead of containsKey() followed by get(): the two calls could disagree and
    // leave a null array to be dereferenced.
    final Integer[] partitionDetails = pr.getPartitionsMap().get(partition);
    boolean isPartitionAvailable = partitionDetails != null;
    if (isPartitionAvailable) {
      startingBucketID = partitionDetails[0];
      partitionNumBuckets = partitionDetails[1];
    } else {
      // Fall back to the locally configured fixed partitions ...
      List<FixedPartitionAttributesImpl> localFPAs = pr.getFixedPartitionAttributesImpl();
      if (localFPAs != null) {
        for (FixedPartitionAttributesImpl fpa : localFPAs) {
          if (fpa.getPartitionName().equals(partition)) {
            isPartitionAvailable = true;
            partitionNumBuckets = fpa.getNumBuckets();
            startingBucketID = fpa.getStartingBucketID();
            break;
          }
        }
      }

      // ... and then to the fixed partitions advised by the region advisor.
      if (!isPartitionAvailable) {
        List<FixedPartitionAttributesImpl> remoteFPAs =
            pr.getRegionAdvisor().adviseAllFixedPartitionAttributes();
        for (FixedPartitionAttributesImpl fpa : remoteFPAs) {
          if (fpa.getPartitionName().equals(partition)) {
            isPartitionAvailable = true;
            partitionNumBuckets = fpa.getNumBuckets();
            startingBucketID = fpa.getStartingBucketID();
            break;
          }
        }
      }
    }

    if (!isPartitionAvailable) {
      Object[] prms = new Object[] {pr.getName(), partition};
      throw new PartitionNotAvailableException(
          String.format("For region %s, partition name %s is not available on any datastore.",
              prms));
    }

    // Checked for every source, so a zero bucket count never reaches the modulo below.
    if (partitionNumBuckets == 0) {
      Object[] prms = new Object[] {pr.getName(), partition};
      throw new IllegalStateException(
          String.format(
              "For region %s, For partition %s partition-num-buckets is set to 0. Buckets cann not be created on this partition.",
              prms));
    }

    int hc = resolveKey.hashCode();
    // The remainder's magnitude is below partitionNumBuckets, so Math.abs cannot overflow here.
    int bucketId = Math.abs(hc % partitionNumBuckets);
    int partitionBucketID = bucketId + startingBucketID;
    assert partitionBucketID != KeyInfo.UNKNOWN_BUCKET;
    return partitionBucketID;
  }

  /**
   * Hashes a routing object over the total number of buckets of the given region.
   *
   * @param pr the partitioned region that supplies the total number of buckets
   * @param routingObject the object whose hashCode selects the bucket
   * @return the bucket id the routing object hashes to
   */
  public static int getHashKey(PartitionedRegion pr, Object routingObject) {
    final int totalNumBuckets = pr.getTotalNumberOfBuckets();
    return getHashKey(routingObject, totalNumBuckets);
  }

  /**
   * Maps a routing object onto a bucket id: the absolute value of its hashCode modulo the number
   * of buckets.
   *
   * @param routingObject the object whose hashCode selects the bucket
   * @param totalNumBuckets the number of buckets to distribute over
   * @return a non-negative bucket id smaller than {@code totalNumBuckets}
   */
  public static int getHashKey(Object routingObject, int totalNumBuckets) {
    // The remainder may be negative for a negative hashCode; force positive bucket ids only.
    return Math.abs(routingObject.hashCode() % totalNumBuckets);
  }

  /**
   * Looks up a region by name and returns it only if it is a PartitionedRegion.
   *
   * @param prName name passed to {@code cache.getRegion}
   * @param cache the cache to search
   * @return the PartitionedRegion, or null if there is no such region or it is of another kind
   */
  public static PartitionedRegion getPartitionedRegion(String prName, Cache cache) {
    final Region found = cache.getRegion(prName);
    // instanceof is false for null, which covers the "no such region" case as well.
    return (found instanceof PartitionedRegion) ? (PartitionedRegion) found : null;
  }

  /**
   * Tells whether a region full path denotes a bucket region, i.e. whether a bucket name can be
   * parsed out of it.
   *
   * @param fullPath full region path to inspect
   * @return true if {@link #getBucketName(String)} finds a bucket name in the path
   */
  public static boolean isBucketRegion(String fullPath) {
    final String bucketName = getBucketName(fullPath);
    return bucketName != null;
  }

  /**
   * Finds a ProxyBucketRegion by parsing the region fullPath.
   *
   * @param cache the cache to search; null yields null
   * @param fullPath full region path to parse
   * @param postInit true if caller should wait for bucket initialization to complete
   * @return ProxyBucketRegion as Bucket or null if not found
   * @throws PRLocallyDestroyedException if the PartitionRegion is locally destroyed
   */
  public static Bucket getProxyBucketRegion(Cache cache, String fullPath, boolean postInit)
      throws PRLocallyDestroyedException {
    if (cache == null) {
      return null;
    }

    // fullPath = /__PR/_B_1_10
    final String bucketName = getBucketName(fullPath);
    if (bucketName == null) {
      return null;
    }
    final String prPath = getPRPath(bucketName);

    // The lookup runs with the thread's init level requirement set to ANY_INIT; the previous
    // requirement is restored afterwards no matter how the lookup ends.
    final InitializationLevel previousLevel = LocalRegion.setThreadInitLevelRequirement(ANY_INIT);
    final Region candidate;
    try {
      candidate = cache.getRegion(prPath);
    } finally {
      LocalRegion.setThreadInitLevelRequirement(previousLevel);
    }
    if (!(candidate instanceof PartitionedRegion)) {
      // Covers both a missing region and a region of another kind.
      return null;
    }

    final PartitionedRegion pr = (PartitionedRegion) candidate;
    final int bucketId = getBucketId(bucketName);
    final RegionAdvisor advisor = (RegionAdvisor) pr.getDistributionAdvisor();
    if (postInit) {
      return advisor.getBucketPostInit(bucketId);
    }
    if (!advisor.areBucketsInitialized()) {
      // While the RegionAdvisor may be available, it's bucket meta-data may not be constructed yet
      return null;
    }
    return advisor.getBucket(bucketId);
  }

  /**
   * Path fragment that marks a bucket region inside a full region path: the PR root region name,
   * the region separator, then the bucket name prefix. {@link #getBucketName(String)} searches for
   * it to locate the bucket name.
   */
  private static final String BUCKET_FULL_PATH_PREFIX =
      PR_ROOT_REGION_NAME + Region.SEPARATOR + BUCKET_REGION_PREFIX;

  /**
   * Extracts the bucket name from a region full path. The bucket name is everything that follows
   * the PR root region name and its separator, starting with the bucket name prefix.
   *
   * @param bucketFullPath full region path to parse
   * @return the bucket name, or null if the path is null, empty or contains no bucket name
   */
  public static String getBucketName(String bucketFullPath) {
    if (bucketFullPath == null || bucketFullPath.isEmpty()) {
      return null;
    }

    final int prefixStart = bucketFullPath.indexOf(BUCKET_FULL_PATH_PREFIX);
    if (prefixStart == -1) {
      if (logger.isDebugEnabled()) {
        logger.debug("getBucketString no match fullPath={}", bucketFullPath);
      }
      return null;
    }

    // Skip the root region name and the separator, but keep the bucket name prefix itself.
    final int bucketNameStart =
        prefixStart + PR_ROOT_REGION_NAME.length() + Region.SEPARATOR.length();
    return bucketFullPath.substring(bucketNameStart);
  }

  /**
   * Builds the full path of a bucket region: separator, PR root region name, separator, and the
   * bucket name derived from the PR path and the bucket id.
   *
   * @param prFullPath full path of the partitioned region
   * @param bucketId id of the bucket
   * @return the bucket region's full path, or null if no bucket name could be built
   */
  public static String getBucketFullPath(String prFullPath, int bucketId) {
    final String bucketName = getBucketName(prFullPath, bucketId);
    if (bucketName == null) {
      return null;
    }
    return Region.SEPARATOR + PR_ROOT_REGION_NAME + Region.SEPARATOR + bucketName;
  }

  /**
   * Escapes a partitioned region full path for use inside a bucket name: every "_" is doubled
   * first, then every region separator character is replaced by a single '_'.
   *
   * @param prFullPath full path of the partitioned region
   * @return the escaped path
   */
  public static String escapePRPath(String prFullPath) {
    // Order matters: underscores are doubled before separators are turned into underscores.
    return prFullPath.replace("_", "__").replace(Region.SEPARATOR_CHAR, '_');
  }


  /**
   * Two consecutive region separators. {@link #unescapePRPath(String)} replaces this sequence
   * with a single "_".
   */
  public static final String TWO_SEPARATORS = Region.SEPARATOR + Region.SEPARATOR;

  /**
   * Reverses {@link #escapePRPath(String)}: every '_' becomes a region separator character, then
   * every pair of consecutive separators becomes a single "_".
   *
   * @param escapedPath a path produced by {@link #escapePRPath(String)}
   * @return the unescaped path
   */
  public static String unescapePRPath(String escapedPath) {
    final String allSeparators = escapedPath.replace('_', Region.SEPARATOR_CHAR);
    return allSeparators.replace(TWO_SEPARATORS, "_");
  }

  /**
   * Builds the name of a bucket region: the bucket name prefix, the escaped PR path, the bucket
   * name separator and the bucket id.
   *
   * @param prPath path of the partitioned region
   * @param bucketId id of the bucket
   * @return the bucket region name
   */
  public static String getBucketName(String prPath, int bucketId) {
    final String escapedPath = escapePRPath(prPath);
    return BUCKET_REGION_PREFIX + escapedPath + PartitionedRegion.BUCKET_NAME_SEPARATOR + bucketId;
  }


  /**
   * Returns the PR path given the bucketName (see getBucketName). The path is the unescaped text
   * between the bucket name prefix and the last bucket name separator.
   *
   * @param bucketName a bucket name such as _B_PRNAME_10
   * @return the path of the partitioned region that owns the bucket
   */
  public static String getPRPath(String bucketName) {
    // bucketName = _B_PRNAME_10
    final int idSeparatorAt = bucketName.lastIndexOf(PartitionedRegion.BUCKET_NAME_SEPARATOR);
    Assert.assertTrue(idSeparatorAt > -1, "getProxyBucketRegion failed on " + bucketName);
    final int pathStart = BUCKET_REGION_PREFIX.length();
    final String escapedPath = bucketName.substring(pathStart, idSeparatorAt);
    return unescapePRPath(escapedPath);
  }

  /**
   * Returns the bucket id given the bucketName (see getBucketName). The id is the text following
   * the last bucket name separator, parsed as an integer.
   *
   * @param bucketName name of the bucket
   * @return the parsed bucket id
   * @throws NumberFormatException if the text after the last separator is not a parsable integer
   */
  public static int getBucketId(String bucketName) {
    // bucketName = _B_PRNAME_10
    int idStart = bucketName.lastIndexOf(PartitionedRegion.BUCKET_NAME_SEPARATOR) + 1;
    return Integer.parseInt(bucketName.substring(idStart));
  }

  /**
   * Tells whether the region named by the given full path is a sub-region. For "/REGION1" the
   * result is false; for "/REGION1/REGION2" the result is true, because REGION2 is a sub-region
   * of REGION1.
   *
   * @param fullPath full path of the region, may be null
   * @return true if a separator occurs after the leading one, false otherwise (including when
   *         fullPath is null)
   */
  public static boolean isSubRegion(String fullPath) {
    if (fullPath == null) {
      return false;
    }
    // The search starts one separator-length into the path, so a leading separator is not counted.
    return fullPath.indexOf(Region.SEPARATOR, Region.SEPARATOR.length()) >= 0;
  }

  /**
   * Utility method to print warnings when the nodeList in the b2n region is found empty, which
   * signifies a potential data loss scenario. Nothing is logged unless the boolean system property
   * {@code DistributionConfig.GEMFIRE_PREFIX + "PRDebug"} is set to true, or when no
   * {@link PartitionRegionConfig} is found in the PR root for the given region.
   *
   * @param partitionedRegion the partitioned region owning the bucket
   * @param bucketId Id of Bucket whose nodeList in b2n is empty.
   * @param callingMethod methodName of the calling method.
   */
  public static void logForDataLoss(PartitionedRegion partitionedRegion, int bucketId,
      String callingMethod) {
    boolean prDebugEnabled = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "PRDebug");
    if (!prDebugEnabled) {
      return;
    }

    Region prRoot = PartitionedRegionHelper.getPRRoot(partitionedRegion.getCache());
    PartitionRegionConfig config =
        (PartitionRegionConfig) prRoot.get(partitionedRegion.getRegionIdentifier());
    if (config == null) {
      return;
    }

    Set memberIds = partitionedRegion.getDistributionManager().getDistributionManagerIds();
    logger.warn(
        "DATALOSS (  {}  ) :: Size of nodeList After verifyBucketNodes for bucket ID,  {}  is 0",
        callingMethod, bucketId);
    logger.warn("DATALOSS (  {}  ) :: NodeList from prConfig,  {}",
        callingMethod, printCollection(config.getNodes()));
    logger.warn("DATALOSS (  {}  ) :: Current Membership List,  {}",
        callingMethod, printCollection(memberIds));
  }

  /**
   * Utility method to print a collection. Elements are rendered in iteration order, separated by
   * ", " and enclosed in square brackets.
   *
   * @param c the collection to print, may be null
   * @return the bracketed element list, or "[null]" when the collection itself is null
   */
  public static String printCollection(Collection c) {
    if (c == null) {
      return "[null]";
    }

    StringBuilder text = new StringBuilder("[");
    // Empty before the first element, ", " before every following one.
    String separator = "";
    for (Object element : c) {
      text.append(separator).append(element);
      separator = ", ";
    }
    return text.append("]").toString();
  }

  /**
   * Finds the fixed partition attributes that contain the given bucket. The attributes configured
   * locally on the region are searched first; if none of them has the bucket, the attributes
   * supplied by the region advisor are searched.
   *
   * @param pr the partitioned region to search
   * @param bucketId id of the bucket to look up
   * @return the first fixed partition attributes found that has the bucket
   * @throws PartitionNotAvailableException if neither list contains attributes for the bucket
   */
  public static FixedPartitionAttributesImpl getFixedPartitionAttributesForBucket(
      PartitionedRegion pr, int bucketId) {
    List<FixedPartitionAttributesImpl> localAttributes = pr.getFixedPartitionAttributesImpl();
    if (localAttributes != null) {
      for (FixedPartitionAttributesImpl candidate : localAttributes) {
        if (candidate.hasBucket(bucketId)) {
          return candidate;
        }
      }
    }

    for (FixedPartitionAttributesImpl candidate : pr.getRegionAdvisor()
        .adviseAllFixedPartitionAttributes()) {
      if (candidate.hasBucket(bucketId)) {
        return candidate;
      }
    }

    String message = String.format(
        "For FixedPartitionedRegion %s, Fixed partition is not defined for bucket id %s on any datastore",
        pr.getName(), bucketId);
    throw new PartitionNotAvailableException(message);
  }

  /**
   * Collects the partition names of the region's local fixed partition attributes (when present)
   * together with those supplied by the region advisor.
   *
   * @param region the partitioned region to inspect
   * @return an unmodifiable set of the collected partition names
   */
  private static Set<String> getAllAvailablePartitions(PartitionedRegion region) {
    Set<String> partitionNames = new HashSet<>();

    List<FixedPartitionAttributesImpl> localAttributes = region.getFixedPartitionAttributesImpl();
    if (localAttributes != null) {
      for (FixedPartitionAttributesImpl attributes : localAttributes) {
        partitionNames.add(attributes.getPartitionName());
      }
    }

    for (FixedPartitionAttributes attributes : region.getRegionAdvisor()
        .adviseAllFixedPartitionAttributes()) {
      partitionNames.add(attributes.getPartitionName());
    }

    return Collections.unmodifiableSet(partitionNames);
  }

  /**
   * Gathers the region's local fixed partition attributes (when present) and the ones supplied by
   * the region advisor into a single set.
   *
   * @param region the partitioned region to inspect
   * @return a new, modifiable set holding the local and advised fixed partition attributes
   */
  public static Set<FixedPartitionAttributes> getAllFixedPartitionAttributes(
      PartitionedRegion region) {
    Set<FixedPartitionAttributes> combined = new HashSet<>();

    List<FixedPartitionAttributesImpl> localAttributes = region.getFixedPartitionAttributesImpl();
    if (localAttributes != null) {
      combined.addAll(localAttributes);
    }

    combined.addAll(region.getRegionAdvisor().adviseAllFixedPartitionAttributes());
    return combined;
  }

  /**
   * Membership listener that reacts only to member departure: it hands the departed member and
   * the cache to {@link PartitionedRegionHelper#cleanUpMetaDataOnNodeFailure}. All other
   * membership callbacks are no-ops.
   */
  private static class MemberFailureListener implements MembershipListener {

    /** Cache passed to the metadata clean-up when a member departs. */
    InternalCache cache;

    MemberFailureListener(InternalCache cache) {
      this.cache = cache;
    }

    @Override
    public void memberJoined(DistributionManager distributionManager,
        InternalDistributedMember id) {
      // intentionally empty
    }

    @Override
    public void memberDeparted(DistributionManager distributionManager,
        final InternalDistributedMember id, boolean crashed) {
      // The crashed flag is not consulted; clean-up runs for every departure.
      PartitionedRegionHelper.cleanUpMetaDataOnNodeFailure(this.cache, id);
    }

    @Override
    public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id,
        InternalDistributedMember whoSuspected, String reason) {
      // intentionally empty
    }

    @Override
    public void quorumLost(DistributionManager distributionManager,
        Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {
      // intentionally empty
    }

  }

  /**
   * Cache listener that, on entry creation and update, reads the new value as a
   * {@link PartitionRegionConfig} and, when that config carries elder fixed partition attributes,
   * copies their partition name, starting bucket id and bucket count into the partitions map of
   * the partitioned region identified by the config's PR id.
   */
  static class FixedPartitionAttributesListener extends CacheListenerAdapter {
    private static final Logger logger = LogService.getLogger();

    @Override
    public void afterCreate(EntryEvent event) {
      applyNewConfig(event);
    }

    @Override
    public void afterUpdate(EntryEvent event) {
      applyNewConfig(event);
    }

    /** Shared handling for create and update events. */
    private void applyNewConfig(EntryEvent event) {
      PartitionRegionConfig prConfig = (PartitionRegionConfig) event.getNewValue();
      if (prConfig.getElderFPAs().isEmpty()) {
        return;
      }
      updatePartitionMap(prConfig);
    }

    /**
     * Stores, for every elder fixed partition attribute of the config, an array of
     * {starting bucket id, number of buckets} under the partition name in the region's partitions
     * map. Does nothing if no region is found for the PR id.
     */
    private void updatePartitionMap(PartitionRegionConfig prConfig) {
      try {
        PartitionedRegion region = PartitionedRegion.getPRFromId(prConfig.getPRId());
        if (region == null) {
          return;
        }

        Map<String, Integer[]> partitions = region.getPartitionsMap();
        for (FixedPartitionAttributesImpl elderFpa : prConfig.getElderFPAs()) {
          String partitionName = elderFpa.getPartitionName();
          Integer[] bucketRange = {elderFpa.getStartingBucketID(), elderFpa.getNumBuckets()};
          partitions.put(partitionName, bucketRange);
        }
      } catch (PRLocallyDestroyedException e) {
        logger.debug("PRLocallyDestroyedException : Region ={} is locally destroyed on this node",
            prConfig.getPRId(), e);
      }
    }
  }
}
