| /* |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.karaf.cellar.hazelcast.internal.osgi; |
| |
| import com.hazelcast.core.HazelcastInstance; |
| import org.apache.aries.proxy.ProxyManager; |
| import org.apache.karaf.cellar.core.ClusterManager; |
| import org.apache.karaf.cellar.core.GroupManager; |
| import org.apache.karaf.cellar.core.Node; |
| import org.apache.karaf.cellar.core.Synchronizer; |
| import org.apache.karaf.cellar.core.command.BasicCommandStore; |
| import org.apache.karaf.cellar.core.command.ClusteredExecutionContext; |
| import org.apache.karaf.cellar.core.command.CommandStore; |
| import org.apache.karaf.cellar.core.command.ExecutionContext; |
| import org.apache.karaf.cellar.core.control.*; |
| import org.apache.karaf.cellar.core.discovery.DiscoveryService; |
| import org.apache.karaf.cellar.core.discovery.DiscoveryTask; |
| import org.apache.karaf.cellar.core.event.*; |
| import org.apache.karaf.cellar.core.management.CellarGroupMBean; |
| import org.apache.karaf.cellar.core.management.CellarMBean; |
| import org.apache.karaf.cellar.core.management.CellarNodeMBean; |
| import org.apache.karaf.cellar.core.utils.CombinedClassLoader; |
| import org.apache.karaf.cellar.hazelcast.*; |
| import org.apache.karaf.cellar.hazelcast.factory.HazelcastConfigurationManager; |
| import org.apache.karaf.cellar.hazelcast.factory.HazelcastServiceFactory; |
| import org.apache.karaf.cellar.hazelcast.management.internal.CellarGroupMBeanImpl; |
| import org.apache.karaf.cellar.hazelcast.management.internal.CellarMBeanImpl; |
| import org.apache.karaf.cellar.hazelcast.management.internal.CellarNodeMBeanImpl; |
| import org.apache.karaf.util.tracker.BaseActivator; |
| import org.apache.karaf.util.tracker.annotation.Managed; |
| import org.apache.karaf.util.tracker.annotation.ProvideService; |
| import org.apache.karaf.util.tracker.annotation.RequireService; |
| import org.apache.karaf.util.tracker.annotation.Services; |
| import org.osgi.framework.ServiceReference; |
| import org.osgi.framework.ServiceRegistration; |
| import org.osgi.service.cm.ConfigurationAdmin; |
| import org.osgi.service.cm.ManagedService; |
| import org.osgi.service.cm.SynchronousConfigurationListener; |
| import org.osgi.util.tracker.ServiceTracker; |
| import org.osgi.util.tracker.ServiceTrackerCustomizer; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.*; |
| |
| @Services( |
| provides = { |
| @ProvideService(HazelcastInstance.class), |
| @ProvideService(ClusterManager.class), |
| @ProvideService(GroupManager.class), |
| @ProvideService(EventTransportFactory.class), |
| @ProvideService(EventProducer.class), |
| @ProvideService(ExecutionContext.class), |
| @ProvideService(EventHandler.class), |
| @ProvideService(CommandStore.class), |
| @ProvideService(CellarMBean.class), |
| @ProvideService(CellarNodeMBean.class), |
| @ProvideService(CellarGroupMBean.class) |
| }, |
| requires = { |
| @RequireService(ConfigurationAdmin.class), |
| @RequireService(ProxyManager.class), |
| @RequireService(EventHandlerRegistry.class) |
| } |
| ) |
| @Managed("org.apache.karaf.cellar.discovery") |
| public class Activator extends BaseActivator implements ManagedService { |
| |
| private final static Logger LOGGER = LoggerFactory.getLogger(Activator.class); |
| |
| private CombinedClassLoader combinedClassLoader; |
| private HazelcastServiceFactory hazelcastServiceFactory; |
| private List<DiscoveryService> discoveryServices = new ArrayList<DiscoveryService>(); |
| private List<Synchronizer> synchronizers = new ArrayList<Synchronizer>(); |
| private HazelcastInstance hazelcastInstance; |
| private HazelcastGroupManager groupManager; |
| private DiscoveryTask discoveryTask; |
| private CellarExtender extender; |
| private TopicProducer producer; |
| private TopicConsumer consumer; |
| private ServiceTracker<DiscoveryService, DiscoveryService> discoveryServiceTracker; |
| private ServiceTracker<Synchronizer, Synchronizer> synchronizerServiceTracker; |
| |
| private volatile ServiceRegistration coreMBeanRegistration; |
| private volatile ServiceRegistration nodeMBeanRegistration; |
| private volatile ServiceRegistration groupMBeanRegistration; |
| |
| private HashMap updatedConfig; |
| |
| private EventHandlerRegistryDispatcher dispatcher; |
| |
| @Override |
| public void doStart() throws Exception { |
| |
| ConfigurationAdmin configurationAdmin = getTrackedService(ConfigurationAdmin.class); |
| if (configurationAdmin == null) |
| return; |
| EventHandlerRegistry eventHandlerRegistry = getTrackedService(EventHandlerRegistry.class); |
| if (eventHandlerRegistry == null) |
| return; |
| ProxyManager proxyManager = getTrackedService(ProxyManager.class); |
| if (proxyManager == null) |
| return; |
| |
| LOGGER.debug("CELLAR HAZELCAST: init combined class loader"); |
| combinedClassLoader = new CombinedClassLoader(); |
| combinedClassLoader.init(); |
| |
| LOGGER.debug("CELLAR HAZELCAST: start the discovery service tracker"); |
| discoveryServiceTracker = new ServiceTracker<DiscoveryService, DiscoveryService>(bundleContext, DiscoveryService.class, new ServiceTrackerCustomizer<DiscoveryService, DiscoveryService>() { |
| @Override |
| public DiscoveryService addingService(ServiceReference<DiscoveryService> serviceReference) { |
| DiscoveryService service = bundleContext.getService(serviceReference); |
| discoveryServices.add(service); |
| return service; |
| } |
| |
| @Override |
| public void modifiedService(ServiceReference<DiscoveryService> serviceReference, DiscoveryService discoveryService) { |
| // nothing to do |
| } |
| |
| @Override |
| public void removedService(ServiceReference<DiscoveryService> serviceReference, DiscoveryService discoveryService) { |
| discoveryServices.remove(discoveryService); |
| bundleContext.ungetService(serviceReference); |
| } |
| }); |
| discoveryServiceTracker.open(); |
| |
| LOGGER.debug("CELLAR HAZELCAST: init Cellar extender"); |
| extender = new CellarExtender(); |
| extender.setCombinedClassLoader(combinedClassLoader); |
| extender.setBundleContext(bundleContext); |
| extender.init(); |
| |
| LOGGER.debug("CELLAR HAZELCAST: init dispatcher"); |
| dispatcher = new EventHandlerRegistryDispatcher(); |
| dispatcher.setHandlerRegistry(eventHandlerRegistry); |
| dispatcher.init(); |
| |
| LOGGER.debug("CELLAR HAZELCAST: create Hazelcast configuration manager"); |
| HazelcastConfigurationManager hazelcastConfigurationManager = new HazelcastConfigurationManager(); |
| hazelcastConfigurationManager.setDiscoveryServices(discoveryServices); |
| |
| LOGGER.debug("CELLAR HAZELCAST: init Hazelcast service factory"); |
| hazelcastServiceFactory = new HazelcastServiceFactory(); |
| hazelcastServiceFactory.setCombinedClassLoader(combinedClassLoader); |
| hazelcastServiceFactory.setConfigurationManager(hazelcastConfigurationManager); |
| hazelcastServiceFactory.setBundleContext(bundleContext); |
| hazelcastServiceFactory.init(); |
| if (updatedConfig != null) { |
| // we have outstanding configuration update: do it now |
| updated(updatedConfig); |
| updatedConfig = null; |
| } |
| |
| LOGGER.debug("CELLAR HAZELCAST: register Hazelcast instance"); |
| hazelcastInstance = hazelcastServiceFactory.getInstance(); |
| register(HazelcastInstance.class, hazelcastInstance); |
| |
| LOGGER.debug("CELLAR HAZELCAST: init discovery task"); |
| discoveryTask = new DiscoveryTask(); |
| discoveryTask.setDiscoveryServices(discoveryServices); |
| discoveryTask.setConfigurationAdmin(configurationAdmin); |
| discoveryTask.init(); |
| |
| LOGGER.debug("CELLAR HAZELCAST: register Hazelcast cluster manager"); |
| HazelcastClusterManager clusterManager = new HazelcastClusterManager(); |
| clusterManager.setInstance(hazelcastInstance); |
| clusterManager.setConfigurationAdmin(configurationAdmin); |
| clusterManager.setCombinedClassLoader(combinedClassLoader); |
| register(ClusterManager.class, clusterManager); |
| |
| LOGGER.debug("CELLAR HAZELCAST: create Hazelcast event transport factory"); |
| HazelcastEventTransportFactory eventTransportFactory = new HazelcastEventTransportFactory(); |
| eventTransportFactory.setCombinedClassLoader(combinedClassLoader); |
| eventTransportFactory.setConfigurationAdmin(configurationAdmin); |
| eventTransportFactory.setInstance(hazelcastInstance); |
| eventTransportFactory.setDispatcher(dispatcher); |
| register(EventTransportFactory.class, eventTransportFactory); |
| |
| LOGGER.debug("CELLAR HAZELCAST: init Hazelcast group manager"); |
| groupManager = new HazelcastGroupManager(); |
| groupManager.setInstance(hazelcastInstance); |
| groupManager.setCombinedClassLoader(combinedClassLoader); |
| groupManager.setBundleContext(bundleContext); |
| groupManager.setConfigurationAdmin(configurationAdmin); |
| groupManager.setEventTransportFactory(eventTransportFactory); |
| groupManager.init(); |
| register(new Class[]{GroupManager.class, SynchronousConfigurationListener.class}, groupManager); |
| |
| LOGGER.debug("CELLAR HAZELCAST: create Cellar membership listener"); |
| CellarMembershipListener membershipListener = new CellarMembershipListener(hazelcastInstance); |
| membershipListener.setSynchronizers(synchronizers); |
| membershipListener.setGroupManager(groupManager); |
| |
| Node node = clusterManager.getNode(); |
| |
| LOGGER.debug("CELLAR HAZELCAST: init topic consumer"); |
| consumer = new TopicConsumer(); |
| consumer.setInstance(hazelcastInstance); |
| consumer.setDispatcher(dispatcher); |
| consumer.setNode(node); |
| consumer.setConfigurationAdmin(configurationAdmin); |
| consumer.init(); |
| |
| LOGGER.debug("CELLAR HAZELCAST: init topic producer"); |
| producer = new TopicProducer(); |
| producer.setInstance(hazelcastInstance); |
| producer.setNode(node); |
| producer.setConfigurationAdmin(configurationAdmin); |
| producer.init(); |
| register(EventProducer.class, producer); |
| |
| LOGGER.debug("CELLAR HAZELCAST: register basic command store"); |
| CommandStore commandStore = new BasicCommandStore(); |
| register(CommandStore.class, commandStore); |
| |
| LOGGER.debug("CELLAR HAZELCAST: register clustered execution context"); |
| ClusteredExecutionContext executionContext = new ClusteredExecutionContext(); |
| executionContext.setProducer(producer); |
| executionContext.setCommandStore(commandStore); |
| register(ExecutionContext.class, executionContext); |
| |
| LOGGER.debug("CELLAR HAZELCAST: register producer switch command handler"); |
| ProducerSwitchCommandHandler producerSwitchCommandHandler = new ProducerSwitchCommandHandler(); |
| producerSwitchCommandHandler.setProducer(producer); |
| producerSwitchCommandHandler.setConfigurationAdmin(configurationAdmin); |
| register(EventHandler.class, producerSwitchCommandHandler); |
| |
| LOGGER.debug("CELLAR HAZELCAST: register producer switch result handler"); |
| ProducerSwitchResultHandler producerSwitchResultHandler = new ProducerSwitchResultHandler(); |
| producerSwitchResultHandler.setCommandStore(commandStore); |
| register(EventHandler.class, producerSwitchResultHandler); |
| |
| LOGGER.debug("CELLAR HAZELCAST: register consumer switch command handler"); |
| ConsumerSwitchCommandHandler consumerSwitchCommandHandler = new ConsumerSwitchCommandHandler(); |
| consumerSwitchCommandHandler.setProducer(producer); |
| consumerSwitchCommandHandler.setConsumer(consumer); |
| consumerSwitchCommandHandler.setConfigurationAdmin(configurationAdmin); |
| register(EventHandler.class, consumerSwitchCommandHandler); |
| |
| LOGGER.debug("CELLAR HAZELCAST; register consumer switch result handler"); |
| ConsumerSwitchResultHandler consumerSwitchResultHandler = new ConsumerSwitchResultHandler(); |
| consumerSwitchResultHandler.setCommandStore(commandStore); |
| register(EventHandler.class, consumerSwitchResultHandler); |
| |
| LOGGER.debug("CELLAR HAZELCAST: register manage handlers command handler"); |
| ManageHandlersCommandHandler manageHandlersCommandHandler = new ManageHandlersCommandHandler(); |
| manageHandlersCommandHandler.setConfigurationAdmin(configurationAdmin); |
| manageHandlersCommandHandler.setProducer(producer); |
| manageHandlersCommandHandler.setProxyManager(proxyManager); |
| register(EventHandler.class, manageHandlersCommandHandler); |
| |
| LOGGER.debug("CELLAR HAZELCAST: register manage handlers result handler"); |
| ManageHandlersResultHandler manageHandlersResultHandler = new ManageHandlersResultHandler(); |
| manageHandlersResultHandler.setCommandStore(commandStore); |
| register(EventHandler.class, manageHandlersResultHandler); |
| |
| LOGGER.debug("CELLAR HAZELCAST: register manage group command handler"); |
| ManageGroupCommandHandler manageGroupCommandHandler = new ManageGroupCommandHandler(); |
| manageGroupCommandHandler.setProducer(producer); |
| manageGroupCommandHandler.setClusterManager(clusterManager); |
| manageGroupCommandHandler.setGroupManager(groupManager); |
| register(EventHandler.class, manageGroupCommandHandler); |
| |
| LOGGER.debug("CELLAR HAZELCAST: register manage group result handler"); |
| ManageGroupResultHandler manageGroupResultHandler = new ManageGroupResultHandler(); |
| manageGroupResultHandler.setCommandStore(commandStore); |
| register(EventHandler.class, manageGroupResultHandler); |
| |
| LOGGER.debug("CELLAR HAZELCAST: register shutdown command handler"); |
| ShutdownCommandHandler shutdownCommandHandler = new ShutdownCommandHandler(); |
| shutdownCommandHandler.setBundleContext(bundleContext); |
| shutdownCommandHandler.setProducer(producer); |
| shutdownCommandHandler.setClusterManager(clusterManager); |
| shutdownCommandHandler.setGroupManager(groupManager); |
| register(EventHandler.class, shutdownCommandHandler); |
| |
| LOGGER.debug("CELLAR HAZELCAST: register shutdown command result handler"); |
| ShutdownResultHandler shutdownResultHandler = new ShutdownResultHandler(); |
| shutdownResultHandler.setCommandStore(commandStore); |
| register(EventHandler.class, shutdownCommandHandler); |
| |
| LOGGER.debug("CELLAR HAZELCAST: start the synchronizer service tracker"); |
| synchronizerServiceTracker = new ServiceTracker<Synchronizer, Synchronizer>(bundleContext, Synchronizer.class, new ServiceTrackerCustomizer<Synchronizer, Synchronizer>() { |
| @Override |
| public Synchronizer addingService(ServiceReference<Synchronizer> serviceReference) { |
| Synchronizer service = bundleContext.getService(serviceReference); |
| synchronizers.add(service); |
| return service; |
| } |
| |
| @Override |
| public void modifiedService(ServiceReference<Synchronizer> serviceReference, Synchronizer synchronizer) { |
| // nothing to do |
| } |
| |
| @Override |
| public void removedService(ServiceReference<Synchronizer> serviceReference, Synchronizer synchronizer) { |
| synchronizers.remove(synchronizer); |
| bundleContext.ungetService(serviceReference); |
| } |
| }); |
| synchronizerServiceTracker.open(); |
| |
| LOGGER.debug("CELLAR HAZELCAST: register Cellar Core MBean"); |
| CellarMBeanImpl cellarMBean = new CellarMBeanImpl(); |
| cellarMBean.setBundleContext(bundleContext); |
| cellarMBean.setClusterManager(clusterManager); |
| cellarMBean.setGroupManager(groupManager); |
| cellarMBean.setExecutionContext(executionContext); |
| Hashtable props = new Hashtable(); |
| props.put("jmx.objectname", "org.apache.karaf.cellar:type=core,name=" + System.getProperty("karaf.name")); |
| coreMBeanRegistration = bundleContext.registerService(getInterfaceNames(cellarMBean), cellarMBean, props); |
| |
| LOGGER.debug("CELLAR HAZELCAST: register Cellar Node MBean"); |
| CellarNodeMBeanImpl cellarNodeMBean = new CellarNodeMBeanImpl(); |
| cellarNodeMBean.setClusterManager(clusterManager); |
| cellarNodeMBean.setExecutionContext(executionContext); |
| props = new Hashtable(); |
| props.put("jmx.objectname", "org.apache.karaf.cellar:type=node,name=" + System.getProperty("karaf.name")); |
| nodeMBeanRegistration = bundleContext.registerService(getInterfaceNames(cellarNodeMBean), cellarNodeMBean, props); |
| |
| LOGGER.debug("CELLAR HAZELCAST: register Cellar Group MBean"); |
| CellarGroupMBeanImpl cellarGroupMBean = new CellarGroupMBeanImpl(); |
| cellarGroupMBean.setClusterManager(clusterManager); |
| cellarGroupMBean.setExecutionContext(executionContext); |
| cellarGroupMBean.setGroupManager(groupManager); |
| props = new Hashtable(); |
| props.put("jmx.objectname", "org.apache.karaf.cellar:type=group,name=" + System.getProperty("karaf.name")); |
| groupMBeanRegistration = bundleContext.registerService(getInterfaceNames(cellarGroupMBean), cellarGroupMBean, props); |
| } |
| |
| @Override |
| public void doStop() { |
| super.doStop(); |
| |
| if (groupMBeanRegistration != null) { |
| groupMBeanRegistration.unregister(); |
| groupMBeanRegistration = null; |
| } |
| if (nodeMBeanRegistration != null) { |
| nodeMBeanRegistration.unregister(); |
| nodeMBeanRegistration = null; |
| } |
| if (coreMBeanRegistration != null) { |
| coreMBeanRegistration.unregister(); |
| coreMBeanRegistration = null; |
| } |
| if (synchronizerServiceTracker != null) { |
| synchronizerServiceTracker.close(); |
| synchronizerServiceTracker = null; |
| } |
| if (groupManager != null) { |
| try { |
| groupManager.destroy(); |
| } catch (Exception e) { |
| LOGGER.trace("Error occured destroying the group manager", e); |
| } |
| groupManager = null; |
| } |
| if (hazelcastServiceFactory != null) { |
| hazelcastServiceFactory.destroy(); |
| hazelcastServiceFactory = null; |
| } |
| if (hazelcastInstance != null) { |
| hazelcastInstance.shutdown(); |
| hazelcastInstance = null; |
| } |
| if (discoveryTask != null) { |
| discoveryTask.destroy(); |
| discoveryTask = null; |
| } |
| if (producer != null) { |
| producer.destroy(); |
| producer = null; |
| } |
| if (consumer != null) { |
| consumer.destroy(); |
| consumer = null; |
| } |
| if (extender != null) { |
| extender.destroy(); |
| extender = null; |
| } |
| if (discoveryServiceTracker != null) { |
| discoveryServiceTracker.close(); |
| discoveryServiceTracker = null; |
| } |
| if (combinedClassLoader != null) { |
| combinedClassLoader.destroy(); |
| combinedClassLoader = null; |
| } |
| if (dispatcher != null) { |
| dispatcher.destroy(); |
| dispatcher = null; |
| } |
| } |
| |
| @Override |
| public void updated(Dictionary config) { |
| if (config == null) { |
| return; |
| } |
| HashMap map = new HashMap(); |
| for (Enumeration keys = config.keys(); keys.hasMoreElements();) { |
| Object key = keys.nextElement(); |
| map.put(key, config.get(key)); |
| } |
| if (hazelcastServiceFactory != null) { |
| updated(map); |
| } else { |
| // postpone configuration update |
| updatedConfig = map; |
| } |
| } |
| |
| private void updated(HashMap config) { |
| try { |
| hazelcastServiceFactory.update(config); |
| } catch (Exception e) { |
| LOGGER.error("Can't update Hazelcast service factory", e); |
| } |
| } |
| } |