/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sling.distribution.resources.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;


import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.component.impl.DistributionComponent;
import org.apache.sling.distribution.component.impl.DistributionComponentKind;
import org.apache.sling.distribution.component.impl.DistributionComponentProvider;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.queue.impl.ErrorQueueDispatchingStrategy;
import org.apache.sling.distribution.packaging.DistributionPackageInfo;
import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.DistributionQueueStatus;
import org.apache.sling.distribution.resources.DistributionResourceTypes;
import org.apache.sling.distribution.resources.impl.common.SimplePathInfo;

/**
 * Extended service resource provider that exposes children resources of a distribution agent, like
 * .../agents/agentName/queues/queueName/queueItem, .../agents/agentName/log and
 * .../agents/agentName/status.
 *
 * Child resources are only provided for components of kind {@link DistributionComponentKind#AGENT};
 * for every other kind the overridden methods return {@code null}.
 */
public class ExtendedDistributionServiceResourceProvider extends DistributionServiceResourceProvider {

    private static final String QUEUES_PATH = "queues";
    private static final String LOG_PATH = "log";
    private static final String STATUS_PATH = "status";


    /** Maximum number of queue items handed out by a single {@link QueueItemsIterator}. */
    private static final int MAX_QUEUE_LENGTH = 5000;

    /** Number of queue items requested from the queue per fetch. */
    private static final int MAX_QUEUE_CHUNK = 100;


    /**
     * Creates the provider; all arguments are passed unchanged to the superclass constructor.
     *
     * @param kind the component kind handled by this provider
     * @param componentProvider the provider used to look up distribution components
     * @param resourceRoot the root path of the exposed resources
     */
    public ExtendedDistributionServiceResourceProvider(String kind,
                                                       DistributionComponentProvider componentProvider,
                                                       String resourceRoot) {
        super(kind, componentProvider, resourceRoot);
    }


    /**
     * Builds the properties of a child resource of an agent.
     *
     * The child resource name is matched by prefix against "queues", "log" and "status", in that order.
     *
     * @param component the component owning the child resource
     * @param childResourceName the name (relative path) of the child resource, may be {@code null}
     * @return the properties of the child resource, or {@code null} if the component is not an agent,
     *         its service is {@code null}, the name is {@code null} or the name matches no known child
     */
    @Override
    protected Map<String, Object> getChildResourceProperties(DistributionComponent<?> component, String childResourceName) {
        if (DistributionComponentKind.AGENT != component.getKind()) {
            return null;
        }

        DistributionAgent agent = (DistributionAgent) component.getService();
        if (agent == null || childResourceName == null) {
            return null;
        }

        if (childResourceName.startsWith(QUEUES_PATH)) {
            SimplePathInfo queuePathInfo = SimplePathInfo.parsePathInfo(QUEUES_PATH, childResourceName);
            return getQueueProperties(agent, queuePathInfo);
        }

        if (childResourceName.startsWith(LOG_PATH)) {
            Map<String, Object> result = new HashMap<String, Object>();
            result.put(SLING_RESOURCE_TYPE, DistributionResourceTypes.LOG_RESOURCE_TYPE);

            // the log itself is handed over so that the resource can be adapted to it
            DistributionLog distributionLog = agent.getLog();
            result.put(INTERNAL_ADAPTABLE, distributionLog);

            return result;
        }

        if (childResourceName.startsWith(STATUS_PATH)) {
            Map<String, Object> result = new HashMap<String, Object>();
            DistributionAgentState agentState = agent.getState();

            result.put("state", agentState.name());
            return result;
        }

        return null;
    }

    /**
     * Lists the names of the children of an agent resource.
     *
     * @param component the component owning the child resource
     * @param childResourceName the name of the child resource whose children are requested;
     *                          {@code null} requests the direct children of the agent itself
     * @return "queues", "log" and "status" for the direct children of an agent whose service is
     *         available, {@code null} in every other case
     */
    @Override
    protected Iterable<String> getChildResourceChildren(DistributionComponent<?> component, String childResourceName) {
        if (DistributionComponentKind.AGENT != component.getKind()) {
            return null;
        }

        DistributionAgent agent = (DistributionAgent) component.getService();
        if (agent == null || childResourceName != null) {
            return null;
        }

        List<String> nameList = new ArrayList<String>();
        nameList.add(QUEUES_PATH);
        nameList.add(LOG_PATH);
        nameList.add(STATUS_PATH);

        return nameList;
    }

    /**
     * Builds the properties of a resource below .../queues.
     *
     * @param agent the agent owning the queues
     * @param queueInfo the parsed path below "queues"
     * @return for the root: the list of queue names; for a queue: its status, the id of its head entry
     *         and a lazy iterator over its items; for a queue item: the item properties. The map is
     *         empty when the addressed queue (or item) does not exist, and {@code null} is returned
     *         when the path is neither root, main nor child.
     */
    private Map<String, Object> getQueueProperties(DistributionAgent agent, SimplePathInfo queueInfo) {
        if (queueInfo.isRoot()) {
            Map<String, Object> result = new HashMap<String, Object>();

            List<String> nameList = new ArrayList<String>();
            for (String name : agent.getQueueNames()) {
                nameList.add(name);
            }
            result.put(ITEMS, nameList.toArray(new String[nameList.size()]));

            result.put(SLING_RESOURCE_TYPE, DistributionResourceTypes.AGENT_QUEUE_LIST_RESOURCE_TYPE);
            return result;
        } else if (queueInfo.isMain()) {
            String queueName = queueInfo.getMainResourceName();
            Map<String, Object> result = new HashMap<String, Object>();

            DistributionQueue queue = agent.getQueue(queueName);

            if (queue != null) {
                DistributionQueueStatus queueStatus = queue.getStatus();
                result.put(SLING_RESOURCE_TYPE, DistributionResourceTypes.AGENT_QUEUE_RESOURCE_TYPE);

                result.put("state", queueStatus.getState().name());
                result.put("empty", queueStatus.isEmpty());
                result.put("itemsCount", queueStatus.getItemsCount());

                if (queueName.startsWith(ErrorQueueDispatchingStrategy.ERROR_PREFIX)) {
                    // strip only the leading prefix; String.replace would also remove any later
                    // occurrence of the prefix text inside the queue name
                    String retryQueue = queueName.substring(ErrorQueueDispatchingStrategy.ERROR_PREFIX.length());
                    result.put("retryQueue", retryQueue);
                }

                // ITEMS only carries the id of the head entry (if any); the full listing is
                // provided lazily through the iterator stored under INTERNAL_ITEMS_ITERATOR
                List<String> nameList = new ArrayList<String>();

                DistributionQueueEntry entry = queue.getHead();
                if (entry != null) {
                    nameList.add(entry.getId());
                }

                result.put(ITEMS, nameList.toArray(new String[nameList.size()]));
                result.put(INTERNAL_ITEMS_ITERATOR, new QueueItemsIterator(queue));
                result.put(INTERNAL_ADAPTABLE, queue);
            }


            return result;

        } else if (queueInfo.isChild()) {
            String queueName = queueInfo.getMainResourceName();

            DistributionQueue queue = agent.getQueue(queueName);
            if (queue == null) {
                return new HashMap<String, Object>();
            }

            String itemId = queueInfo.getChildResourceName();
            return getItemProperties(queue.getItem(itemId));
        }

        return null;
    }


    /**
     * Builds the properties describing a single queue entry.
     *
     * @param entry the queue entry, may be {@code null}
     * @return a new mutable map with the entry properties, empty if {@code entry} is {@code null}
     */
    private Map<String, Object> getItemProperties(DistributionQueueEntry entry) {
        Map<String, Object> result = new HashMap<String, Object>();

        if (entry == null) {
            return result;
        }

        result.put(SLING_RESOURCE_TYPE, DistributionResourceTypes.AGENT_QUEUE_ITEM_RESOURCE_TYPE);

        DistributionQueueItem item = entry.getItem();
        DistributionPackageInfo packageInfo = DistributionPackageUtils.fromQueueItem(item);

        result.put("id", entry.getId());
        result.put("size", item.getSize());
        result.put("paths", packageInfo.getPaths());
        result.put("action", packageInfo.getRequestType());
        result.put("userid", packageInfo.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_USER, String.class));

        DistributionQueueItemStatus status = entry.getStatus();
        result.put("attempts", status.getAttempts());
        result.put("time", status.getEntered().getTime());
        result.put("state", status.getItemState().name());

        return result;
    }


    /**
     * Iterator over the property maps of the items of a queue.
     *
     * Items are requested from the queue in chunks of {@link #MAX_QUEUE_CHUNK}, using the number of
     * items already returned as the offset, and at most {@link #MAX_QUEUE_LENGTH} items are returned.
     */
    class QueueItemsIterator implements Iterator<Map<String, Object>> {

        private final DistributionQueue queue;
        /** Iterator over the current chunk; {@code null} until the first chunk has been fetched. */
        private Iterator<DistributionQueueEntry> items;
        /** Number of items returned by {@link #next()} so far; also the offset of the next fetch. */
        int fetched = 0;

        QueueItemsIterator(DistributionQueue queue) {
            this.queue = queue;
        }

        /**
         * Fetches the next chunk from the queue when the current one is exhausted.
         *
         * @return {@code true} if another item is available and the length limit is not yet reached
         */
        @Override
        public boolean hasNext() {

            // '>=' so that exactly MAX_QUEUE_LENGTH items are returned at most
            if (fetched >= MAX_QUEUE_LENGTH) {
                return false;
            }

            boolean shouldFetch = items == null || !items.hasNext();

            if (shouldFetch) {
                items = queue.getItems(fetched, MAX_QUEUE_CHUNK).iterator();
            }

            return items.hasNext();
        }

        /**
         * @return the properties of the next queue item, with its id stored under INTERNAL_NAME
         * @throws NoSuchElementException if no further item is available
         */
        @Override
        public Map<String, Object> next() {
            // also covers callers that never invoked hasNext(): fetches the chunk and enforces the limit
            if (!hasNext()) {
                throw new NoSuchElementException("No more queue items");
            }

            DistributionQueueEntry queueEntry = items.next();
            Map<String, Object> itemProperties = getItemProperties(queueEntry);
            itemProperties.put(INTERNAL_NAME, queueEntry.getId());

            fetched++;
            return itemProperties;
        }

        /**
         * Delegates to the iterator of the current chunk.
         * NOTE(review): whether this removes anything from the underlying queue depends on what
         * queue.getItems() returns - confirm before relying on it.
         *
         * @throws IllegalStateException if no chunk has been fetched yet
         */
        @Override
        public void remove() {
            if (items == null) {
                throw new IllegalStateException("next() has not been called yet");
            }
            items.remove();
        }
    }

}
