/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.transport;

import java.net.InetAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Reservoir;
import com.codahale.metrics.Snapshot;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.metrics.DecayingEstimatedHistogramReservoir;
import org.apache.cassandra.net.AbstractMessageHandler;
import org.apache.cassandra.net.ResourceLimits;

/**
 * Static entry point for the limits applied to the bytes of in-flight native transport requests.
 * <p>
 * Two reserves are maintained: a single global one ({@code GLOBAL_LIMIT}) and one per remote
 * address. The per-address state lives in reference-counted {@link Allocator} instances cached in
 * {@code PER_ENDPOINT_ALLOCATORS}; an instance is dropped from the cache once its last reference
 * has been released.
 */
public class ClientResourceLimits
{
    private static final Logger logger = LoggerFactory.getLogger(ClientResourceLimits.class);

    // Global reserve shared by every Allocator, initialised from the configured global limit
    private static final ResourceLimits.Concurrent GLOBAL_LIMIT = new ResourceLimits.Concurrent(getGlobalLimit());
    // Wait queue built on the global reserve; handed out through ResourceProvider.Default
    private static final AbstractMessageHandler.WaitQueue GLOBAL_QUEUE = AbstractMessageHandler.WaitQueue.global(GLOBAL_LIMIT);
    // One Allocator per remote address, created on demand and removed when its ref count is exhausted
    private static final ConcurrentMap<InetAddress, Allocator> PER_ENDPOINT_ALLOCATORS = new ConcurrentHashMap<>();

    /**
     * Obtain the shared {@link Allocator} for a remote address, creating and caching it if absent.
     * The returned instance has had its reference count incremented, so every call must eventually
     * be paired with a call to {@link Allocator#release()}.
     *
     * @param endpoint the remote address of the client
     * @return the acquired allocator for that address
     */
    public static Allocator getAllocatorForEndpoint(InetAddress endpoint)
    {
        while (true)
        {
            Allocator result = PER_ENDPOINT_ALLOCATORS.computeIfAbsent(endpoint, Allocator::new);
            if (result.acquire())
                return result;

            // acquire() fails only once the cached instance has been retired by a concurrent
            // release(); make sure it is gone from the cache (only if it is still this same
            // instance) and retry with a fresh one.
            PER_ENDPOINT_ALLOCATORS.remove(endpoint, result);
        }
    }

    /**
     * @return the global limit as currently held by {@link DatabaseDescriptor}
     */
    public static long getGlobalLimit()
    {
        return DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes();
    }

    /**
     * Update the global limit, both in {@link DatabaseDescriptor} and on the live global reserve.
     *
     * @param newLimit the new global limit
     */
    public static void setGlobalLimit(long newLimit)
    {
        DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(newLimit);
        // the value handed back by setLimit is reported as the previous limit
        long existingLimit = GLOBAL_LIMIT.setLimit(getGlobalLimit());
        logger.info("Changed native_transport_max_concurrent_requests_in_bytes from {} to {}", existingLimit, newLimit);
    }

    /**
     * @return the amount currently in use from the global reserve
     */
    public static long getCurrentGlobalUsage()
    {
        return GLOBAL_LIMIT.using();
    }

    /**
     * @return the per-address limit as currently held by {@link DatabaseDescriptor}
     */
    public static long getEndpointLimit()
    {
        return DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp();
    }

    /**
     * Update the per-address limit, both in {@link DatabaseDescriptor} and on every allocator
     * currently held in the cache.
     *
     * @param newLimit the new per-address limit
     */
    public static void setEndpointLimit(long newLimit)
    {
        // Capture the previous value once, before anything is modified. It is deliberately not
        // replaced by the values returned from the per-allocator setLimit calls below: an
        // allocator created after the descriptor update already carries the new limit, which
        // would make the message read "from <new> to <new>".
        long existingLimit = getEndpointLimit();
        DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(newLimit); // ensure new instances get the new limit
        for (Allocator allocator : PER_ENDPOINT_ALLOCATORS.values())
            allocator.endpointAndGlobal.endpoint().setLimit(newLimit);
        logger.info("Changed native_transport_max_concurrent_requests_in_bytes_per_ip from {} to {}", existingLimit, newLimit);
    }

    /**
     * Build a snapshot of the current per-address usage. A new histogram is populated on every
     * call with one sample per cached allocator, the sample being that allocator's endpoint usage.
     *
     * @return a snapshot of the freshly populated histogram
     */
    public static Snapshot getCurrentIpUsage()
    {
        DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir();
        for (Allocator allocator : PER_ENDPOINT_ALLOCATORS.values())
        {
            histogram.update(allocator.endpointAndGlobal.endpoint().using());
        }
        return histogram.getSnapshot();
    }

    /**
     * This will recompute the ip usage histo on each query of the snapshot when requested instead of trying to keep
     * a histogram up to date with each request.
     * <p>
     * The returned reservoir reports the number of cached allocators as its size, delegates
     * {@code getSnapshot()} to {@link #getCurrentIpUsage()}, and rejects {@code update(long)}
     * with an {@link IllegalStateException}.
     */
    public static Reservoir ipUsageReservoir()
    {
        return new Reservoir()
        {
            public int size()
            {
                return PER_ENDPOINT_ALLOCATORS.size();
            }

            public void update(long l)
            {
                // samples are derived from the allocators on demand, never pushed in
                throw new IllegalStateException();
            }

            public Snapshot getSnapshot()
            {
                return getCurrentIpUsage();
            }
        };
    }

    /**
     * Used during protocol negotiation in {@link InitialConnectionHandler} and then if protocol V4 or earlier is
     * selected. V5 connections convert this to a {@link ResourceProvider} and use the global/endpoint
     * limits and queues directly.
     */
    static class Allocator
    {
        // Reference count: starts at 0, is incremented by acquire() and decremented by release().
        // Releasing the final reference sets it to -1, after which acquire() can never succeed
        // again for this instance.
        private final AtomicInteger refCount = new AtomicInteger(0);
        private final InetAddress endpoint;

        private final ResourceLimits.EndpointAndGlobal endpointAndGlobal;
        // This is not used or exposed by instances of this class, but it is used for V5+
        // client connections so it's initialized here as the same queue must be shared
        // across all clients from the same remote address.
        private final AbstractMessageHandler.WaitQueue waitQueue;

        /**
         * Create the allocator for a remote address, with a new endpoint reserve sized from
         * {@link #getEndpointLimit()} and paired with the shared global reserve.
         */
        private Allocator(InetAddress endpoint)
        {
            this.endpoint = endpoint;
            ResourceLimits.Concurrent limit = new ResourceLimits.Concurrent(getEndpointLimit());
            endpointAndGlobal = new ResourceLimits.EndpointAndGlobal(limit, GLOBAL_LIMIT);
            waitQueue = AbstractMessageHandler.WaitQueue.endpoint(limit);
        }

        /**
         * Increment the reference count unless it is already negative.
         *
         * @return true if the count is positive after the update, i.e. a reference was taken;
         * false if the count was negative and has been left untouched
         */
        private boolean acquire()
        {
            return 0 < refCount.updateAndGet(i -> i < 0 ? i : i + 1);
        }

        /**
         * Decrement the reference count, possibly removing the instance from the cache
         * if this is its final reference
         */
        void release()
        {
            // a count of 1 goes straight to -1 rather than 0, so that a concurrent acquire()
            // cannot take a new reference on an instance that is about to be removed
            if (-1 == refCount.updateAndGet(i -> i == 1 ? -1 : i - 1))
                PER_ENDPOINT_ALLOCATORS.remove(endpoint, this);
        }

        /**
         * Attempt to allocate a number of permits representing bytes towards the inflight
         * limits. To succeed, it must be possible to allocate from both the per-endpoint
         * and global reserves.
         *
         * @param amount number permits to allocate
         * @return outcome SUCCESS if the allocation was successful. In the case of failure,
         * either INSUFFICIENT_GLOBAL or INSUFFICIENT_ENDPOINT to indicate which reserve rejected
         * the allocation request.
         */
        ResourceLimits.Outcome tryAllocate(long amount)
        {
            return endpointAndGlobal.tryAllocate(amount);
        }

        /**
         * Force an allocation of a number of permits representing bytes from the inflight
         * limits. Permits will be acquired from both the per-endpoint and global reserves
         * which may lead to either or both reserves going over their limits.
         *
         * @param amount number permits to allocate
         */
        void allocate(long amount)
        {
            endpointAndGlobal.allocate(amount);
        }

        /**
         * Release a number of permits representing bytes back to both the per-endpoint and
         * global limits for inflight requests.
         *
         * @param amount number of permits to release
         * @return outcome, ABOVE_LIMIT if either reserve is above its configured limit after
         * the operation completes, or BELOW_LIMIT if neither is.
         */
        ResourceLimits.Outcome release(long amount)
        {
            return endpointAndGlobal.release(amount);
        }

        /**
         * @return the amount currently in use from this allocator's endpoint reserve
         */
        @VisibleForTesting
        long endpointUsing()
        {
            return endpointAndGlobal.endpoint().using();
        }

        /**
         * @return the amount currently in use from the global reserve
         */
        @VisibleForTesting
        long globallyUsing()
        {
            return endpointAndGlobal.global().using();
        }

        /**
         * @return the current usage and limit of the endpoint reserve followed by those of the
         * global reserve, each rendered as {@code using/limit}
         */
        public String toString()
        {
            return String.format("InflightEndpointRequestPayload: %d/%d, InflightOverallRequestPayload: %d/%d",
                                 endpointAndGlobal.endpoint().using(),
                                 endpointAndGlobal.endpoint().limit(),
                                 endpointAndGlobal.global().using(),
                                 endpointAndGlobal.global().limit());
        }
    }

    /**
     * Used in protocol V5 and later by the AbstractMessageHandler/CQLMessageHandler hierarchy.
     * This hides the allocate/tryAllocate/release(long) methods of {@link Allocator} and exposes
     * the endpoint and global limits, along with their corresponding
     * {@link org.apache.cassandra.net.AbstractMessageHandler.WaitQueue} directly.
     * Provided as an interface and single implementation for testing (see CQLConnectionTest)
     */
    interface ResourceProvider
    {
        ResourceLimits.Limit globalLimit();
        AbstractMessageHandler.WaitQueue globalWaitQueue();
        ResourceLimits.Limit endpointLimit();
        AbstractMessageHandler.WaitQueue endpointWaitQueue();
        void release();

        /**
         * Implementation backed by an {@link Allocator}: the limits and the endpoint wait queue
         * come from the wrapped allocator, the global wait queue is the shared static one, and
         * {@link #release()} gives up the reference held on the allocator.
         */
        static class Default implements ResourceProvider
        {
            private final Allocator limits;

            Default(Allocator limits)
            {
                this.limits = limits;
            }

            public ResourceLimits.Limit globalLimit()
            {
                return limits.endpointAndGlobal.global();
            }

            public AbstractMessageHandler.WaitQueue globalWaitQueue()
            {
                return GLOBAL_QUEUE;
            }

            public ResourceLimits.Limit endpointLimit()
            {
                return limits.endpointAndGlobal.endpoint();
            }

            public AbstractMessageHandler.WaitQueue endpointWaitQueue()
            {
                return limits.waitQueue;
            }

            public void release()
            {
                // drops the allocator reference (Allocator.release()), not any byte permits
                limits.release();
            }
        }
    }
}
