/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.affinity;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

/**
 * Affinity interface implementation.
 */
public class GridCacheAffinityImpl<K, V> implements Affinity<K> {
    /** */
    public static final String FAILED_TO_FIND_CACHE_ERR_MSG = "Failed to find cache (cache was not started " +
        "yet or cache was already stopped): ";

    /** Cache context. */
    private GridCacheContext<K, V> cctx;

    /** Logger. */
    private IgniteLogger log;

    /**
     * @param cctx Context.
     */
    public GridCacheAffinityImpl(GridCacheContext<K, V> cctx) {
        this.cctx = cctx;

        log = cctx.logger(getClass());
    }

    /** {@inheritDoc} */
    @Override public int partitions() {
        return cctx.group().affinityFunction().partitions();
    }

    /** {@inheritDoc} */
    @Override public int partition(K key) {
        A.notNull(key, "key");

        return cctx.affinity().partition(key);
    }

    /** {@inheritDoc} */
    @Override public boolean isPrimary(ClusterNode n, K key) {
        A.notNull(n, "n", key, "key");

        return cctx.affinity().primaryByKey(n, key, topologyVersion());
    }

    /** {@inheritDoc} */
    @Override public boolean isBackup(ClusterNode n, K key) {
        A.notNull(n, "n", key, "key");

        return cctx.affinity().backupsByKey(key, topologyVersion()).contains(n);
    }

    /** {@inheritDoc} */
    @Override public boolean isPrimaryOrBackup(ClusterNode n, K key) {
        A.notNull(n, "n", key, "key");

        return cctx.affinity().partitionBelongs(n, cctx.affinity().partition(key), topologyVersion());
    }

    /** {@inheritDoc} */
    @Override public int[] primaryPartitions(ClusterNode n) {
        A.notNull(n, "n");

        Set<Integer> parts = cctx.affinity().primaryPartitions(n.id(), topologyVersion());

        return U.toIntArray(parts);
    }

    /** {@inheritDoc} */
    @Override public int[] backupPartitions(ClusterNode n) {
        A.notNull(n, "n");

        Set<Integer> parts = cctx.affinity().backupPartitions(n.id(), topologyVersion());

        return U.toIntArray(parts);
    }

    /** {@inheritDoc} */
    @Override public int[] allPartitions(ClusterNode n) {
        A.notNull(n, "p");

        Collection<Integer> parts = new HashSet<>();

        AffinityTopologyVersion topVer = topologyVersion();

        for (int partsCnt = partitions(), part = 0; part < partsCnt; part++) {
            for (ClusterNode affNode : cctx.affinity().nodesByPartition(part, topVer)) {
                if (n.id().equals(affNode.id())) {
                    parts.add(part);

                    break;
                }
            }
        }

        return U.toIntArray(parts);
    }

    /** {@inheritDoc} */
    @Override public ClusterNode mapPartitionToNode(int part) {
        A.ensure(part >= 0 && part < partitions(), "part >= 0 && part < total partitions");

        return F.first(cctx.affinity().nodesByPartition(part, topologyVersion()));
    }

    /** {@inheritDoc} */
    @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> parts) {
        A.notNull(parts, "parts");

        Map<Integer, ClusterNode> map = new HashMap<>();

        if (!F.isEmpty(parts)) {
            for (int p : parts)
                map.put(p, mapPartitionToNode(p));
        }

        return map;
    }

    /** {@inheritDoc} */
    @Override public Object affinityKey(K key) {
        A.notNull(key, "key");

        if (key instanceof CacheObject && !(key instanceof BinaryObject)) {
            CacheObjectContext ctx = cctx.cacheObjectContext();

            if (ctx == null)
                throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name());

            key = ((CacheObject)key).value(ctx, false);
        }

        CacheConfiguration ccfg = cctx.config();

        if (ccfg == null)
            throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name());

        return ccfg.getAffinityMapper().affinityKey(key);
    }

    /** {@inheritDoc} */
    @Override @Nullable public ClusterNode mapKeyToNode(K key) {
        A.notNull(key, "key");

        return F.first(mapKeysToNodes(F.asList(key)).keySet());
    }

    /** {@inheritDoc} */
    @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys) {
        A.notNull(keys, "keys");

        AffinityTopologyVersion topVer = topologyVersion();

        int nodesCnt;

        if (!cctx.isLocal())
            nodesCnt = cctx.discovery().cacheGroupAffinityNodes(cctx.groupId(), topVer).size();
        else
            nodesCnt = 1;

        // Must return empty map if no alive nodes present or keys is empty.
        Map<ClusterNode, Collection<K>> res = new HashMap<>(nodesCnt, 1.0f);

        for (K key : keys) {
            ClusterNode primary = cctx.affinity().primaryByKey(key, topVer);

            if (primary == null)
                throw new IgniteException("Failed to get primary node [topVer=" + topVer + ", key=" + key + ']');

            Collection<K> mapped = res.get(primary);

            if (mapped == null) {
                mapped = new ArrayList<>(Math.max(keys.size() / nodesCnt, 16));

                res.put(primary, mapped);
            }

            mapped.add(key);
        }

        return res;
    }

    /** {@inheritDoc} */
    @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key) {
        A.notNull(key, "key");

        return cctx.affinity().nodesByPartition(partition(key), topologyVersion());
    }

    /** {@inheritDoc} */
    @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part) {
        A.ensure(part >= 0 && part < partitions(), "part >= 0 && part < total partitions");

        return cctx.affinity().nodesByPartition(part, topologyVersion());
    }

    /**
     * Gets current topology version.
     *
     * @return Topology version.
     */
    private AffinityTopologyVersion topologyVersion() {
        return cctx.affinity().affinityTopologyVersion();
    }
}
