/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.cxf.dosgi.discovery.zookeeper.subscribe;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.cxf.dosgi.discovery.zookeeper.ZooKeeperDiscovery;
import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils;
import org.apache.zookeeper.ZooKeeper;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.cxf.dosgi.discovery.local.util.Utils.matchFilter;

/**
 * Manages the EndpointListeners and the scopes they are interested in.
 * For each scope with interested EndpointListeners an InterfaceMonitor is created.
 * The InterfaceMonitor calls back when it detects added or removed external Endpoints.
 * These events are then forwarded to all interested EndpointListeners.
 * <p>
 * Methods that modify the bookkeeping maps are synchronized on this instance. The callbacks
 * handed to an InterfaceMonitor are deliberately not synchronized; they only iterate the
 * per-scope {@link CopyOnWriteArrayList} of listeners.
 */
public class InterfaceMonitorManager {

    private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class);

    private final BundleContext bctx;
    private final ZooKeeper zk;
    // map of EndpointListeners and the scopes they are interested in
    private final Map<ServiceReference<EndpointListener>, List<String>> endpointListenerScopes =
            new HashMap<ServiceReference<EndpointListener>, List<String>>();
    // map of scopes and their interest data
    private final Map<String, Interest> interests = new HashMap<String, Interest>();

    /**
     * Per-scope state: the listeners interested in the scope and the monitor watching it.
     */
    protected static class Interest {
        // copy-on-write so that monitor callbacks can iterate without holding the manager's lock
        List<ServiceReference<EndpointListener>> endpointListeners =
            new CopyOnWriteArrayList<ServiceReference<EndpointListener>>();
        InterfaceMonitor monitor;
    }

    /**
     * Creates a manager without any registered interests.
     *
     * @param bctx bundle context used to obtain and release EndpointListener services
     *             and passed to the scope filter matching
     * @param zk ZooKeeper handle passed on to every InterfaceMonitor that is created
     */
    public InterfaceMonitorManager(BundleContext bctx, ZooKeeper zk) {
        this.bctx = bctx;
        this.zk = zk;
    }

    /**
     * Registers interest for every scope reported by {@code Utils.getScopes} for the given listener.
     * Listeners flagged as belonging to this discovery implementation are skipped.
     *
     * @param endpointListener reference to the listener whose scopes should be monitored
     */
    public void addInterest(ServiceReference<EndpointListener> endpointListener) {
        if (isOurOwnEndpointListener(endpointListener)) {
            LOG.debug("Skipping our own EndpointListener");
            return;
        }

        LOG.info("updating EndpointListener interests: {}", endpointListener);
        if (LOG.isDebugEnabled()) {
            LOG.debug("updated EndpointListener properties: {}", Utils.getProperties(endpointListener));
        }
        for (String scope : Utils.getScopes(endpointListener)) {
            String objClass = Utils.getObjectClass(scope);
            LOG.debug("Adding interest in scope {}, objectClass {}", scope, objClass);
            addInterest(endpointListener, scope, objClass);
        }
    }

    /**
     * Tells whether the listener carries the {@code DISCOVERY_ZOOKEEPER_ID} service property
     * with a value that parses to boolean {@code true}. A missing property yields {@code false}.
     */
    private static boolean isOurOwnEndpointListener(ServiceReference<EndpointListener> endpointListener) {
        return Boolean.parseBoolean(String.valueOf(
                endpointListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
    }

    /**
     * Adds the listener to the interest of the given scope.
     * <p>
     * If the scope is not monitored yet, a new monitor is created and started. Otherwise the
     * listener is notified of every endpoint the existing monitor currently knows.
     *
     * @param endpointListener reference to the interested listener
     * @param scope the scope (filter) the listener is interested in
     * @param objClass the object class handed to the InterfaceMonitor created for the scope
     */
    public synchronized void addInterest(ServiceReference<EndpointListener> endpointListener,
                                         String scope, String objClass) {
        // get or create interest for given scope and add listener to it
        Interest interest = interests.get(scope);
        if (interest == null) {
            // create interest, add listener and start monitor
            interest = new Interest();
            interests.put(scope, interest);
            interest.endpointListeners.add(endpointListener); // add it before monitor starts so we don't miss events
            interest.monitor = createInterfaceMonitor(scope, objClass, interest);
            interest.monitor.start();
        } else {
            // interest already exists, so just add listener to it
            if (!interest.endpointListeners.contains(endpointListener)) {
                interest.endpointListeners.add(endpointListener);
            }
            // Arrays.asList with a parameterized element type creates a generic varargs array,
            // which older compilers report as unchecked; the list is only read here.
            @SuppressWarnings("unchecked")
            List<ServiceReference<EndpointListener>> onlyThisListener = Arrays.asList(endpointListener);
            // notify listener of all known endpoints for given scope
            // (as EndpointListener contract requires of all added/modified listeners)
            for (EndpointDescription endpoint : interest.monitor.getEndpoints()) {
                notifyListeners(endpoint, scope, true, onlyThisListener);
            }
        }

        // add scope to listener's scopes list
        List<String> scopes = endpointListenerScopes.get(endpointListener);
        if (scopes == null) {
            scopes = new ArrayList<String>(1);
            endpointListenerScopes.put(endpointListener, scopes);
        }
        if (!scopes.contains(scope)) {
            scopes.add(scope);
        }
    }

    /**
     * Removes the listener from all scopes it was registered for. A scope that is left
     * without listeners has its monitor closed and is forgotten.
     *
     * @param endpointListener reference to the listener that is no longer interested
     */
    public synchronized void removeInterest(ServiceReference<EndpointListener> endpointListener) {
        LOG.info("removing EndpointListener interests: {}", endpointListener);
        // look up and drop the listener's scope list in one step
        List<String> scopes = endpointListenerScopes.remove(endpointListener);
        if (scopes == null) {
            return;
        }

        for (String scope : scopes) {
            Interest interest = interests.get(scope);
            if (interest != null) {
                interest.endpointListeners.remove(endpointListener);
                if (interest.endpointListeners.isEmpty()) {
                    interest.monitor.close();
                    interests.remove(scope);
                }
            }
        }
    }

    /**
     * Creates (but does not start) the monitor for a scope. Its callbacks forward added and
     * removed endpoints to the listeners currently registered in the given interest.
     *
     * @param scope the scope the monitor is created for
     * @param objClass the object class passed to the InterfaceMonitor
     * @param interest the interest whose listeners receive the forwarded events
     * @return the new, not yet started monitor
     */
    protected InterfaceMonitor createInterfaceMonitor(final String scope, String objClass, final Interest interest) {
        // holding this object's lock in the callbacks can lead to a deadlock with InterfaceMonitor
        EndpointListener endpointListener = new EndpointListener() {

            public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
                notifyListeners(endpoint, scope, false, interest.endpointListeners);
            }

            public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
                notifyListeners(endpoint, scope, true, interest.endpointListeners);
            }
        };
        return new InterfaceMonitor(zk, objClass, endpointListener, scope);
    }

    /**
     * Notifies each of the given listeners about an added or removed endpoint, provided the
     * endpoint matches the scope. Listener services that cannot be obtained are skipped.
     *
     * @param endpoint the endpoint that was added or removed
     * @param currentScope the scope the event belongs to; also passed to the listener as matched filter
     * @param isAdded {@code true} for an added endpoint, {@code false} for a removed one
     * @param endpointListeners references to the listeners to notify
     */
    private void notifyListeners(EndpointDescription endpoint, String currentScope, boolean isAdded,
            List<ServiceReference<EndpointListener>> endpointListeners) {
        for (ServiceReference<EndpointListener> endpointListenerRef : endpointListeners) {
            EndpointListener endpointListener = bctx.getService(endpointListenerRef);
            if (endpointListener == null) {
                // getService yields null e.g. when the service is already unregistered; there is
                // nothing to call and nothing to unget, and the other listeners must still be served
                LOG.debug("EndpointListener service {} is not available, skipping notification",
                          endpointListenerRef);
                continue;
            }
            try {
                LOG.trace("matching {} against {}", endpoint, currentScope);
                if (matchFilter(bctx, currentScope, endpoint)) {
                    LOG.debug("Matched {} against {}", endpoint, currentScope);
                    notifyListener(endpoint, currentScope, isAdded, endpointListenerRef.getBundle(),
                                   endpointListener);
                }
            } finally {
                // release the service obtained above, even if the listener threw
                bctx.ungetService(endpointListenerRef);
            }
        }
    }

    /**
     * Invokes the matching callback on a single listener, unless its registering bundle is gone.
     *
     * @param endpoint the endpoint that was added or removed
     * @param currentScope the scope passed to the listener as matched filter
     * @param isAdded selects {@code endpointAdded} ({@code true}) or {@code endpointRemoved} ({@code false})
     * @param endpointListenerBundle bundle of the listener's service reference; {@code null} means
     *                               the service was unregistered and the call is skipped
     * @param endpointListener the listener service to call
     */
    private void notifyListener(EndpointDescription endpoint, String currentScope, boolean isAdded,
                                Bundle endpointListenerBundle, EndpointListener endpointListener) {
        if (endpointListenerBundle == null) {
            LOG.info("listening service was unregistered, ignoring");
        } else if (isAdded) {
            LOG.info("calling EndpointListener.endpointAdded: " + endpointListener + " from bundle "
                    + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint);
            endpointListener.endpointAdded(endpoint, currentScope);
        } else {
            LOG.info("calling EndpointListener.endpointRemoved: " + endpointListener + " from bundle "
                    + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint);
            endpointListener.endpointRemoved(endpoint, currentScope);
        }
    }

    /**
     * Closes all monitors and forgets every interest and listener.
     */
    public synchronized void close() {
        for (Interest interest : interests.values()) {
            interest.monitor.close();
        }
        interests.clear();
        endpointListenerScopes.clear();
    }

    /**
     * Only for test case!
     *
     * @return the live (not copied) map of scopes to their interest data
     */
    protected synchronized Map<String, Interest> getInterests() {
        return interests;
    }

    /**
     * Only for test case!
     *
     * @return the live (not copied) map of listeners to the scopes they are registered for
     */
    protected synchronized Map<ServiceReference<EndpointListener>, List<String>> getEndpointListenerScopes() {
        return endpointListenerScopes;
    }
}
