/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.disruptor;

import java.util.HashMap;
import java.util.Map;

import org.apache.camel.Endpoint;
import org.apache.camel.impl.UriEndpointComponent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An implementation of the <a href="https://github.com/sirchia/camel-disruptor">Disruptor component</a>
 * for asynchronous SEDA exchanges on an
 * <a href="https://github.com/LMAX-Exchange/disruptor">LMAX Disruptor</a> within a CamelContext
 */
public class DisruptorComponent extends UriEndpointComponent {
    
    public static final int DEFAULT_BUFFER_SIZE = 1024;
    public static final int MAX_CONCURRENT_CONSUMERS = 500;

    private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorComponent.class);

    private int bufferSize = -1;
    //for SEDA compatibility only
    private int queueSize = -1;

    private int defaultConcurrentConsumers = 1;
    private boolean defaultMultipleConsumers;
    private DisruptorProducerType defaultProducerType = DisruptorProducerType.Multi;
    private DisruptorWaitStrategy defaultWaitStrategy = DisruptorWaitStrategy.Blocking;
    private boolean defaultBlockWhenFull = true;

    //synchronized access guarded by this
    private final Map<String, DisruptorReference> disruptors = new HashMap<String, DisruptorReference>();

    public DisruptorComponent() {
        super(DisruptorEndpoint.class);
    }

    @Override
    protected Endpoint createEndpoint(final String uri, final String remaining,
                                      final Map<String, Object> parameters) throws Exception {

        final int concurrentConsumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, defaultConcurrentConsumers);
        final boolean limitConcurrentConsumers = getAndRemoveParameter(parameters, "limitConcurrentConsumers", Boolean.class, true);

        if (limitConcurrentConsumers && concurrentConsumers > MAX_CONCURRENT_CONSUMERS) {
            throw new IllegalArgumentException(
                    "The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than "
                            + MAX_CONCURRENT_CONSUMERS + " was " + concurrentConsumers);
        }

        if (concurrentConsumers < 0) {
            throw new IllegalArgumentException("concurrentConsumers found to be " + concurrentConsumers 
                                               + ", must be greater than 0");
        }

        int size = 0;
        if (parameters.containsKey("size")) {
            size = getAndRemoveParameter(parameters, "size", int.class);
            if (size <= 0) {
                throw new IllegalArgumentException("size found to be " + size + ", must be greater than 0");
            }
        }

        // Check if the pollTimeout argument is set (may be the case if Disruptor component is used as drop-in
        // replacement for the SEDA component.
        if (parameters.containsKey("pollTimeout")) {
            throw new IllegalArgumentException("The 'pollTimeout' argument is not supported by the Disruptor component");
        }

        final DisruptorWaitStrategy waitStrategy = getAndRemoveParameter(parameters, "waitStrategy", DisruptorWaitStrategy.class, defaultWaitStrategy);
        final DisruptorProducerType producerType = getAndRemoveParameter(parameters, "producerType", DisruptorProducerType.class, defaultProducerType);
        final boolean multipleConsumers = getAndRemoveParameter(parameters, "multipleConsumers", boolean.class, defaultMultipleConsumers);
        final boolean blockWhenFull = getAndRemoveParameter(parameters, "blockWhenFull", boolean.class, defaultBlockWhenFull);

        final DisruptorReference disruptorReference = getOrCreateDisruptor(uri, remaining, size, producerType, waitStrategy);
        final DisruptorEndpoint disruptorEndpoint = new DisruptorEndpoint(uri, this, disruptorReference, concurrentConsumers, multipleConsumers, blockWhenFull);
        disruptorEndpoint.setWaitStrategy(waitStrategy);
        disruptorEndpoint.setProducerType(producerType);
        disruptorEndpoint.configureProperties(parameters);

        return disruptorEndpoint;
    }

    private DisruptorReference getOrCreateDisruptor(final String uri, final String name, final int size,
                                                    final DisruptorProducerType producerType,
                                                    final DisruptorWaitStrategy waitStrategy) throws Exception {
        final String key = getDisruptorKey(uri);

        int sizeToUse;
        if (size > 0) {
            sizeToUse = size;
        } else if (bufferSize > 0) {
            sizeToUse = bufferSize;
        } else if (queueSize > 0) {
            sizeToUse = queueSize;
        } else {
            sizeToUse = DEFAULT_BUFFER_SIZE;
        }
        sizeToUse = powerOfTwo(sizeToUse);

        synchronized (this) {
            DisruptorReference ref = getDisruptors().get(key);
            if (ref == null) {
                LOGGER.debug("Creating new disruptor for key {}", key);
                ref = new DisruptorReference(this, uri, name, sizeToUse, producerType, waitStrategy);
                getDisruptors().put(key, ref);
            } else {
                //if size was explicitly requested, the size to use should match the retrieved DisruptorReference
                if (size != 0 && ref.getBufferSize() != sizeToUse) {
                    // there is already a queue, so make sure the size matches
                    throw new IllegalArgumentException(
                            "Cannot use existing queue " + key + " as the existing queue size "
                                    + ref.getBufferSize() + " does not match given queue size " + sizeToUse);
                }
                LOGGER.debug("Reusing disruptor {} for key {}", ref, key);
            }

            return ref;
        }
    }

    private static int powerOfTwo(int size) {
        size--;
        size |= size >> 1;
        size |= size >> 2;
        size |= size >> 4;
        size |= size >> 8;
        size |= size >> 16;
        size++;
        return size;
    }

    public static String getDisruptorKey(String uri) {
        if (uri.contains("?")) {
            // strip parameters
            uri = uri.substring(0, uri.indexOf('?'));
        }
        return uri;
    }

    @Override
    protected void doStop() throws Exception {
        synchronized (this) {
            getDisruptors().clear();
        }
        super.doStop();
    }

    public Map<String, DisruptorReference> getDisruptors() {
        return disruptors;
    }

    public int getDefaultConcurrentConsumers() {
        return defaultConcurrentConsumers;
    }

    /**
     * To configure the default number of concurrent consumers
     */
    public void setDefaultConcurrentConsumers(final int defaultConcurrentConsumers) {
        this.defaultConcurrentConsumers = defaultConcurrentConsumers;
    }

    public boolean isDefaultMultipleConsumers() {
        return defaultMultipleConsumers;
    }

    /**
     * To configure the default value for multiple consumers
     */
    public void setDefaultMultipleConsumers(final boolean defaultMultipleConsumers) {
        this.defaultMultipleConsumers = defaultMultipleConsumers;
    }

    public DisruptorProducerType getDefaultProducerType() {
        return defaultProducerType;
    }

    /**
     * To configure the default value for DisruptorProducerType
     * <p/>
     * The default value is Multi.
     */
    public void setDefaultProducerType(final DisruptorProducerType defaultProducerType) {
        this.defaultProducerType = defaultProducerType;
    }

    public DisruptorWaitStrategy getDefaultWaitStrategy() {
        return defaultWaitStrategy;
    }

    /**
     * To configure the default value for DisruptorWaitStrategy
     * <p/>
     * The default value is Blocking.
     */
    public void setDefaultWaitStrategy(final DisruptorWaitStrategy defaultWaitStrategy) {
        this.defaultWaitStrategy = defaultWaitStrategy;
    }

    public boolean isDefaultBlockWhenFull() {
        return defaultBlockWhenFull;
    }

    /**
     * To configure the default value for block when full
     * <p/>
     * The default value is true.
     */
    public void setDefaultBlockWhenFull(boolean defaultBlockWhenFull) {
        this.defaultBlockWhenFull = defaultBlockWhenFull;
    }

    /**
     * To configure the ring buffer size
     */
    @Deprecated
    public void setQueueSize(final int size) {
        queueSize = size;
    }

    @Deprecated
    public int getQueueSize() {
        return queueSize;
    }

    /**
     * To configure the ring buffer size
     */
    public void setBufferSize(final int size) {
        bufferSize = size;
    }

    public int getBufferSize() {
        return bufferSize;
    }

    public void onShutdownEndpoint(DisruptorEndpoint disruptorEndpoint) {
        String disruptorKey = getDisruptorKey(disruptorEndpoint.getEndpointUri());
        DisruptorReference disruptorReference = getDisruptors().get(disruptorKey);

        if (disruptorReference.getEndpointCount() == 0) {
            //the last disruptor has been removed, we can delete the disruptor
            getDisruptors().remove(disruptorKey);
        }
    }
}
