/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.karaf.cellar.hazelcast.internal.osgi;

import com.hazelcast.core.HazelcastInstance;
import org.apache.aries.proxy.ProxyManager;
import org.apache.karaf.cellar.core.ClusterManager;
import org.apache.karaf.cellar.core.GroupManager;
import org.apache.karaf.cellar.core.Node;
import org.apache.karaf.cellar.core.Synchronizer;
import org.apache.karaf.cellar.core.command.BasicCommandStore;
import org.apache.karaf.cellar.core.command.ClusteredExecutionContext;
import org.apache.karaf.cellar.core.command.CommandStore;
import org.apache.karaf.cellar.core.command.ExecutionContext;
import org.apache.karaf.cellar.core.control.*;
import org.apache.karaf.cellar.core.discovery.DiscoveryService;
import org.apache.karaf.cellar.core.discovery.DiscoveryTask;
import org.apache.karaf.cellar.core.event.*;
import org.apache.karaf.cellar.core.management.CellarGroupMBean;
import org.apache.karaf.cellar.core.management.CellarMBean;
import org.apache.karaf.cellar.core.management.CellarNodeMBean;
import org.apache.karaf.cellar.core.utils.CombinedClassLoader;
import org.apache.karaf.cellar.hazelcast.*;
import org.apache.karaf.cellar.hazelcast.factory.HazelcastConfigurationManager;
import org.apache.karaf.cellar.hazelcast.factory.HazelcastServiceFactory;
import org.apache.karaf.cellar.hazelcast.management.internal.CellarGroupMBeanImpl;
import org.apache.karaf.cellar.hazelcast.management.internal.CellarMBeanImpl;
import org.apache.karaf.cellar.hazelcast.management.internal.CellarNodeMBeanImpl;
import org.apache.karaf.util.tracker.BaseActivator;
import org.apache.karaf.util.tracker.annotation.Managed;
import org.apache.karaf.util.tracker.annotation.ProvideService;
import org.apache.karaf.util.tracker.annotation.RequireService;
import org.apache.karaf.util.tracker.annotation.Services;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.cm.ConfigurationAdmin;
import org.osgi.service.cm.ConfigurationListener;
import org.osgi.service.cm.ManagedService;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

@Services(
        provides = {
                @ProvideService(HazelcastInstance.class),
                @ProvideService(ClusterManager.class),
                @ProvideService(GroupManager.class),
                @ProvideService(EventTransportFactory.class),
                @ProvideService(EventProducer.class),
                @ProvideService(ExecutionContext.class),
                @ProvideService(EventHandler.class),
                @ProvideService(CommandStore.class),
                @ProvideService(CellarMBean.class),
                @ProvideService(CellarNodeMBean.class),
                @ProvideService(CellarGroupMBean.class)
        },
        requires = {
                @RequireService(ConfigurationAdmin.class),
                @RequireService(ProxyManager.class),
                @RequireService(EventHandlerRegistry.class)
        }
)
@Managed("org.apache.karaf.cellar.discovery")
public class Activator extends BaseActivator implements ManagedService {

    private final static Logger LOGGER = LoggerFactory.getLogger(Activator.class);

    private CombinedClassLoader combinedClassLoader;
    private HazelcastServiceFactory hazelcastServiceFactory;
    private List<DiscoveryService> discoveryServices = new ArrayList<DiscoveryService>();
    private List<Synchronizer> synchronizers = new ArrayList<Synchronizer>();
    private HazelcastInstance hazelcastInstance;
    private HazelcastGroupManager groupManager;
    private DiscoveryTask discoveryTask;
    private CellarExtender extender;
    private TopicProducer producer;
    private TopicConsumer consumer;
    private ServiceTracker<DiscoveryService, DiscoveryService> discoveryServiceTracker;
    private ServiceTracker<Synchronizer, Synchronizer> synchronizerServiceTracker;

    private volatile ServiceRegistration coreMBeanRegistration;
    private volatile ServiceRegistration nodeMBeanRegistration;
    private volatile ServiceRegistration groupMBeanRegistration;

    private HashMap updatedConfig;

    @Override
    public void doStart() throws Exception {

        ConfigurationAdmin configurationAdmin = getTrackedService(ConfigurationAdmin.class);
        if (configurationAdmin == null)
            return;
        EventHandlerRegistry eventHandlerRegistry = getTrackedService(EventHandlerRegistry.class);
        if (eventHandlerRegistry == null)
            return;
        ProxyManager proxyManager = getTrackedService(ProxyManager.class);
        if (proxyManager == null)
            return;

        LOGGER.debug("CELLAR HAZELCAST: init combined class loader");
        combinedClassLoader = new CombinedClassLoader();
        combinedClassLoader.init();

        LOGGER.debug("CELLAR HAZELCAST: start the discovery service tracker");
        discoveryServiceTracker = new ServiceTracker<DiscoveryService, DiscoveryService>(bundleContext, DiscoveryService.class, new ServiceTrackerCustomizer<DiscoveryService, DiscoveryService>() {
            @Override
            public DiscoveryService addingService(ServiceReference<DiscoveryService> serviceReference) {
                DiscoveryService service = bundleContext.getService(serviceReference);
                discoveryServices.add(service);
                return service;
            }

            @Override
            public void modifiedService(ServiceReference<DiscoveryService> serviceReference, DiscoveryService discoveryService) {
                // nothing to do
            }

            @Override
            public void removedService(ServiceReference<DiscoveryService> serviceReference, DiscoveryService discoveryService) {
                discoveryServices.remove(discoveryService);
                bundleContext.ungetService(serviceReference);
            }
        });
        discoveryServiceTracker.open();

        LOGGER.debug("CELLAR HAZELCAST: init Cellar extender");
        extender = new CellarExtender();
        extender.setCombinedClassLoader(combinedClassLoader);
        extender.setBundleContext(bundleContext);
        extender.init();

        LOGGER.debug("CELLAR HAZELCAST: init dispatcher");
        EventHandlerRegistryDispatcher dispatcher = new EventHandlerRegistryDispatcher();
        dispatcher.setHandlerRegistry(eventHandlerRegistry);
        dispatcher.init();

        LOGGER.debug("CELLAR HAZELCAST: create Hazelcast configuration manager");
        HazelcastConfigurationManager hazelcastConfigurationManager = new HazelcastConfigurationManager();
        hazelcastConfigurationManager.setDiscoveryServices(discoveryServices);

        LOGGER.debug("CELLAR HAZELCAST: init Hazelcast service factory");
        hazelcastServiceFactory = new HazelcastServiceFactory();
        hazelcastServiceFactory.setCombinedClassLoader(combinedClassLoader);
        hazelcastServiceFactory.setConfigurationManager(hazelcastConfigurationManager);
        hazelcastServiceFactory.setBundleContext(bundleContext);
        hazelcastServiceFactory.init();
        if (updatedConfig != null) {
            // we have outstanding configuration update: do it now
            updated(updatedConfig);
            updatedConfig = null;
        }

        LOGGER.debug("CELLAR HAZELCAST: register Hazelcast instance");
        hazelcastInstance = hazelcastServiceFactory.getInstance();
        register(HazelcastInstance.class, hazelcastInstance);

        LOGGER.debug("CELLAR HAZELCAST: init discovery task");
        discoveryTask = new DiscoveryTask();
        discoveryTask.setDiscoveryServices(discoveryServices);
        discoveryTask.setConfigurationAdmin(configurationAdmin);
        discoveryTask.init();

        LOGGER.debug("CELLAR HAZELCAST: register Hazelcast cluster manager");
        HazelcastClusterManager clusterManager = new HazelcastClusterManager();
        clusterManager.setInstance(hazelcastInstance);
        clusterManager.setConfigurationAdmin(configurationAdmin);
        clusterManager.setCombinedClassLoader(combinedClassLoader);
        register(ClusterManager.class, clusterManager);

        LOGGER.debug("CELLAR HAZELCAST: create Hazelcast event transport factory");
        HazelcastEventTransportFactory eventTransportFactory = new HazelcastEventTransportFactory();
        eventTransportFactory.setCombinedClassLoader(combinedClassLoader);
        eventTransportFactory.setConfigurationAdmin(configurationAdmin);
        eventTransportFactory.setInstance(hazelcastInstance);
        eventTransportFactory.setDispatcher(dispatcher);
        register(EventTransportFactory.class, eventTransportFactory);

        LOGGER.debug("CELLAR HAZELCAST: init Hazelcast group manager");
        groupManager = new HazelcastGroupManager();
        groupManager.setInstance(hazelcastInstance);
        groupManager.setCombinedClassLoader(combinedClassLoader);
        groupManager.setBundleContext(bundleContext);
        groupManager.setConfigurationAdmin(configurationAdmin);
        groupManager.setEventTransportFactory(eventTransportFactory);
        groupManager.init();
        register(new Class[]{GroupManager.class, ConfigurationListener.class}, groupManager);

        LOGGER.debug("CELLAR HAZELCAST: create Cellar membership listener");
        CellarMembershipListener membershipListener = new CellarMembershipListener(hazelcastInstance);
        membershipListener.setSynchronizers(synchronizers);
        membershipListener.setGroupManager(groupManager);

        Node node = clusterManager.getNode();

        LOGGER.debug("CELLAR HAZELCAST: init topic consumer");
        consumer = new TopicConsumer();
        consumer.setInstance(hazelcastInstance);
        consumer.setDispatcher(dispatcher);
        consumer.setNode(node);
        consumer.setConfigurationAdmin(configurationAdmin);
        consumer.init();

        LOGGER.debug("CELLAR HAZELCAST: init topic producer");
        producer = new TopicProducer();
        producer.setInstance(hazelcastInstance);
        producer.setNode(node);
        producer.setConfigurationAdmin(configurationAdmin);
        producer.init();
        register(EventProducer.class, producer);

        LOGGER.debug("CELLAR HAZELCAST: register basic command store");
        CommandStore commandStore = new BasicCommandStore();
        register(CommandStore.class, commandStore);

        LOGGER.debug("CELLAR HAZELCAST: register clustered execution context");
        ClusteredExecutionContext executionContext = new ClusteredExecutionContext();
        executionContext.setProducer(producer);
        executionContext.setCommandStore(commandStore);
        register(ExecutionContext.class, executionContext);

        LOGGER.debug("CELLAR HAZELCAST: register producer switch command handler");
        ProducerSwitchCommandHandler producerSwitchCommandHandler = new ProducerSwitchCommandHandler();
        producerSwitchCommandHandler.setProducer(producer);
        producerSwitchCommandHandler.setConfigurationAdmin(configurationAdmin);
        register(EventHandler.class, producerSwitchCommandHandler);

        LOGGER.debug("CELLAR HAZELCAST: register producer switch result handler");
        ProducerSwitchResultHandler producerSwitchResultHandler = new ProducerSwitchResultHandler();
        producerSwitchResultHandler.setCommandStore(commandStore);
        register(EventHandler.class, producerSwitchResultHandler);

        LOGGER.debug("CELLAR HAZELCAST: register consumer switch command handler");
        ConsumerSwitchCommandHandler consumerSwitchCommandHandler = new ConsumerSwitchCommandHandler();
        consumerSwitchCommandHandler.setProducer(producer);
        consumerSwitchCommandHandler.setConsumer(consumer);
        consumerSwitchCommandHandler.setConfigurationAdmin(configurationAdmin);
        register(EventHandler.class, consumerSwitchCommandHandler);

        LOGGER.debug("CELLAR HAZELCAST; register consumer switch result handler");
        ConsumerSwitchResultHandler consumerSwitchResultHandler = new ConsumerSwitchResultHandler();
        consumerSwitchResultHandler.setCommandStore(commandStore);
        register(EventHandler.class, consumerSwitchResultHandler);

        LOGGER.debug("CELLAR HAZELCAST: register manage handlers command handler");
        ManageHandlersCommandHandler manageHandlersCommandHandler = new ManageHandlersCommandHandler();
        manageHandlersCommandHandler.setConfigurationAdmin(configurationAdmin);
        manageHandlersCommandHandler.setProducer(producer);
        manageHandlersCommandHandler.setProxyManager(proxyManager);
        register(EventHandler.class, manageHandlersCommandHandler);

        LOGGER.debug("CELLAR HAZELCAST: register manage handlers result handler");
        ManageHandlersResultHandler manageHandlersResultHandler = new ManageHandlersResultHandler();
        manageHandlersResultHandler.setCommandStore(commandStore);
        register(EventHandler.class, manageHandlersResultHandler);

        LOGGER.debug("CELLAR HAZELCAST: register manage group command handler");
        ManageGroupCommandHandler manageGroupCommandHandler = new ManageGroupCommandHandler();
        manageGroupCommandHandler.setProducer(producer);
        manageGroupCommandHandler.setClusterManager(clusterManager);
        manageGroupCommandHandler.setGroupManager(groupManager);
        register(EventHandler.class, manageGroupCommandHandler);

        LOGGER.debug("CELLAR HAZELCAST: register manage group result handler");
        ManageGroupResultHandler manageGroupResultHandler = new ManageGroupResultHandler();
        manageGroupResultHandler.setCommandStore(commandStore);
        register(EventHandler.class, manageGroupResultHandler);

        LOGGER.debug("CELLAR HAZELCAST: register shutdown command handler");
        ShutdownCommandHandler shutdownCommandHandler = new ShutdownCommandHandler();
        shutdownCommandHandler.setBundleContext(bundleContext);
        shutdownCommandHandler.setProducer(producer);
        shutdownCommandHandler.setClusterManager(clusterManager);
        shutdownCommandHandler.setGroupManager(groupManager);
        register(EventHandler.class, shutdownCommandHandler);

        LOGGER.debug("CELLAR HAZELCAST: register shutdown command result handler");
        ShutdownResultHandler shutdownResultHandler = new ShutdownResultHandler();
        shutdownResultHandler.setCommandStore(commandStore);
        register(EventHandler.class, shutdownCommandHandler);

        LOGGER.debug("CELLAR HAZELCAST: start the synchronizer service tracker");
        synchronizerServiceTracker = new ServiceTracker<Synchronizer, Synchronizer>(bundleContext, Synchronizer.class, new ServiceTrackerCustomizer<Synchronizer, Synchronizer>() {
            @Override
            public Synchronizer addingService(ServiceReference<Synchronizer> serviceReference) {
                Synchronizer service = bundleContext.getService(serviceReference);
                synchronizers.add(service);
                return service;
            }

            @Override
            public void modifiedService(ServiceReference<Synchronizer> serviceReference, Synchronizer synchronizer) {
                // nothing to do
            }

            @Override
            public void removedService(ServiceReference<Synchronizer> serviceReference, Synchronizer synchronizer) {
                synchronizers.remove(synchronizer);
                bundleContext.ungetService(serviceReference);
            }
        });
        synchronizerServiceTracker.open();

        LOGGER.debug("CELLAR HAZELCAST: register Cellar Core MBean");
        CellarMBeanImpl cellarMBean = new CellarMBeanImpl();
        cellarMBean.setBundleContext(bundleContext);
        cellarMBean.setClusterManager(clusterManager);
        cellarMBean.setGroupManager(groupManager);
        cellarMBean.setExecutionContext(executionContext);
        Hashtable props = new Hashtable();
        props.put("jmx.objectname", "org.apache.karaf.cellar:type=core,name=" + System.getProperty("karaf.name"));
        coreMBeanRegistration = bundleContext.registerService(getInterfaceNames(cellarMBean), cellarMBean, props);

        LOGGER.debug("CELLAR HAZELCAST: register Cellar Node MBean");
        CellarNodeMBeanImpl cellarNodeMBean = new CellarNodeMBeanImpl();
        cellarNodeMBean.setClusterManager(clusterManager);
        cellarNodeMBean.setExecutionContext(executionContext);
        props = new Hashtable();
        props.put("jmx.objectname", "org.apache.karaf.cellar:type=node,name=" + System.getProperty("karaf.name"));
        nodeMBeanRegistration = bundleContext.registerService(getInterfaceNames(cellarNodeMBean), cellarNodeMBean, props);

        LOGGER.debug("CELLAR HAZELCAST: register Cellar Group MBean");
        CellarGroupMBeanImpl cellarGroupMBean = new CellarGroupMBeanImpl();
        cellarGroupMBean.setClusterManager(clusterManager);
        cellarGroupMBean.setExecutionContext(executionContext);
        cellarGroupMBean.setGroupManager(groupManager);
        props = new Hashtable();
        props.put("jmx.objectname", "org.apache.karaf.cellar:type=group,name=" + System.getProperty("karaf.name"));
        groupMBeanRegistration = bundleContext.registerService(getInterfaceNames(cellarGroupMBean), cellarGroupMBean, props);
    }

    @Override
    public void doStop() {
        super.doStop();

        if (groupMBeanRegistration != null) {
            groupMBeanRegistration.unregister();
            groupMBeanRegistration = null;
        }
        if (nodeMBeanRegistration != null) {
            nodeMBeanRegistration.unregister();
            nodeMBeanRegistration = null;
        }
        if (coreMBeanRegistration != null) {
            coreMBeanRegistration.unregister();
            coreMBeanRegistration = null;
        }
        if (synchronizerServiceTracker != null) {
            synchronizerServiceTracker.close();
            synchronizerServiceTracker = null;
        }
        if (groupManager != null) {
            try {
                groupManager.destroy();
            } catch (Exception e) {
                LOGGER.trace("Error occured destroying the group manager", e);
            }
            groupManager = null;
        }
        if (hazelcastServiceFactory != null) {
            hazelcastServiceFactory.destroy();
            hazelcastServiceFactory = null;
        }
        if (hazelcastInstance != null) {
            hazelcastInstance.shutdown();
            hazelcastInstance = null;
        }
        if (discoveryTask != null) {
            discoveryTask.destroy();
            discoveryTask = null;
        }
        if (producer != null) {
            producer.destroy();
            producer = null;
        }
        if (consumer != null) {
            consumer.destroy();
            consumer = null;
        }
        if (extender != null) {
            extender.destroy();
            extender = null;
        }
        if (discoveryServiceTracker != null) {
            discoveryServiceTracker.close();
            discoveryServiceTracker = null;
        }
        if (combinedClassLoader != null) {
            combinedClassLoader.destroy();
            combinedClassLoader = null;
        }
    }

    @Override
    public void updated(Dictionary config) {
        if (config == null) {
            return;
        }
        HashMap map = new HashMap();
        for (Enumeration keys = config.keys(); keys.hasMoreElements();) {
            Object key = keys.nextElement();
            map.put(key, config.get(key));
        }
        if (hazelcastServiceFactory != null) {
            updated(map);
        } else {
            // postpone configuration update
            updatedConfig = map;
        }
    }

    private void updated(HashMap config) {
        try {
            hazelcastServiceFactory.update(config);
        } catch (Exception e) {
            LOGGER.error("Can't update Hazelcast service factory", e);
        }
    }
}
