adding activator class for messaging and calling terminate of event recievers in de-activation of te bundle
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
index 85142e3..93f391f 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
@@ -59,13 +59,6 @@
}
}
-// public void execute() {
-// super.execute();
-// if (log.isInfoEnabled()) {
-// log.info("Load balancer topology receiver thread started");
-// }
-// }
-
public void initializeTopology() {
if (initialized) {
return;
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index 3786af8..b235208 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -25,7 +25,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.services.DistributedObjectProvider;
-import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.load.balancer.common.event.receivers.LoadBalancerCommonApplicationSignUpEventReceiver;
import org.apache.stratos.load.balancer.common.statistics.notifier.LoadBalancerStatisticsNotifier;
import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
@@ -38,7 +37,6 @@
import org.apache.stratos.load.balancer.event.receivers.LoadBalancerTopologyEventReceiver;
import org.apache.stratos.load.balancer.exception.TenantAwareLoadBalanceEndpointException;
import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsCollector;
-import org.apache.stratos.load.balancer.util.LoadBalancerConstants;
import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
@@ -63,7 +61,6 @@
import java.io.File;
import java.util.Collection;
-import java.util.concurrent.ExecutorService;
/**
* @scr.component name="org.apache.stratos.load.balancer.internal.LoadBalancerServiceComponent" immediate="true"
@@ -90,7 +87,6 @@
private static final Log log = LogFactory.getLog(LoadBalancerServiceComponent.class);
private boolean activated = false;
- private ExecutorService executorService;
private LoadBalancerTopologyEventReceiver topologyEventReceiver;
private TenantEventReceiver tenantEventReceiver;
private LoadBalancerDomainMappingEventReceiver domainMappingEventReceiver;
@@ -124,11 +120,6 @@
// Configure topology filters
TopologyFilterConfigurator.configure(configuration);
- int threadPoolSize = Integer.getInteger(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_SIZE_KEY,
- LoadBalancerConstants.LOAD_BALANCER_DEFAULT_THREAD_POOL_SIZE);
- executorService = StratosThreadPool.getExecutorService(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_ID,
- threadPoolSize);
-
TopologyProvider topologyProvider = LoadBalancerConfiguration.getInstance().getTopologyProvider();
if (topologyProvider == null) {
topologyProvider = new TopologyProvider();
@@ -137,18 +128,18 @@
if (configuration.isMultiTenancyEnabled() || configuration.isDomainMappingEnabled()) {
// Start tenant & application signup event receivers
- startTenantEventReceiver(executorService);
- startApplicationSignUpEventReceiver(executorService, topologyProvider);
+ startTenantEventReceiver();
+ startApplicationSignUpEventReceiver(topologyProvider);
}
if (configuration.isDomainMappingEnabled()) {
// Start domain mapping event receiver
- startDomainMappingEventReceiver(executorService, topologyProvider);
+ startDomainMappingEventReceiver(topologyProvider);
}
if (configuration.isTopologyEventListenerEnabled()) {
// Start topology receiver
- startTopologyEventReceiver(executorService, topologyProvider);
+ startTopologyEventReceiver(topologyProvider);
}
if (configuration.isCepStatsPublisherEnabled()) {
@@ -167,43 +158,28 @@
}
}
- private void startDomainMappingEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+ private void startDomainMappingEventReceiver(TopologyProvider topologyProvider) {
if (domainMappingEventReceiver != null) {
return;
}
domainMappingEventReceiver = new LoadBalancerDomainMappingEventReceiver(topologyProvider);
-// domainMappingEventReceiver.setExecutorService(executorService);
-// domainMappingEventReceiver.execute();
-// if (log.isInfoEnabled()) {
-// log.info("Domain mapping event receiver thread started");
-// }
}
- private void startApplicationSignUpEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+ private void startApplicationSignUpEventReceiver(TopologyProvider topologyProvider) {
if (applicationSignUpEventReceiver != null) {
return;
}
applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider);
-// applicationSignUpEventReceiver.setExecutorService(executorService);
-// applicationSignUpEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Application signup event receiver thread started");
- }
}
- private void startTopologyEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+ private void startTopologyEventReceiver(TopologyProvider topologyProvider) {
if (topologyEventReceiver != null) {
return;
}
topologyEventReceiver = new LoadBalancerTopologyEventReceiver(topologyProvider);
-// topologyEventReceiver.setExecutorService(executorService);
-// topologyEventReceiver.execute();
-// if (log.isInfoEnabled()) {
-// log.info("Topology receiver thread started");
-// }
if (log.isInfoEnabled()) {
if (TopologyServiceFilter.getInstance().isActive()) {
@@ -223,14 +199,8 @@
}
}
- private void startTenantEventReceiver(ExecutorService executorService) {
-
+ private void startTenantEventReceiver() {
tenantEventReceiver = TenantEventReceiver.getInstance();
-// tenantEventReceiver.setExecutorService(executorService);
-// tenantEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Tenant event receiver thread started");
- }
}
private void startStatisticsNotifier(TopologyProvider topologyProvider) {
@@ -256,33 +226,6 @@
log.warn("An error occurred while removing endpoint deployer", e);
}
- // Terminate topology receiver
-// if (topologyEventReceiver != null) {
-// try {
-// topologyEventReceiver.terminate();
-// } catch (Exception e) {
-// log.warn("An error occurred while terminating topology event receiver", e);
-// }
-// }
-
- // Terminate application signup event receiver
-// if (applicationSignUpEventReceiver != null) {
-// try {
-// applicationSignUpEventReceiver.terminate();
-// } catch (Exception e) {
-// log.warn("An error occurred while terminating application signup event receiver", e);
-// }
-// }
-
- // Terminate domain mapping event receiver
-// if (domainMappingEventReceiver != null) {
-// try {
-// domainMappingEventReceiver.terminate();
-// } catch (Exception e) {
-// log.warn("An error occurred while terminating domain mapping event receiver", e);
-// }
-// }
-
// Terminate statistics notifier
if (statisticsNotifier != null) {
try {
@@ -291,15 +234,6 @@
log.warn("An error occurred while terminating health statistics notifier", e);
}
}
-
- // Shutdown executor service
- if (executorService != null) {
- try {
- executorService.shutdownNow();
- } catch (Exception e) {
- log.warn("An error occurred while shutting down load balancer executor service", e);
- }
- }
}
/**
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
new file mode 100644
index 0000000..c97125b
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.internal;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver;
+import org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpEventReceiver;
+import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
+import org.apache.stratos.messaging.message.receiver.domain.mapping.DomainMappingEventReceiver;
+import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventReceiver;
+import org.apache.stratos.messaging.message.receiver.initializer.InitializerEventReceiver;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
+import org.osgi.service.component.ComponentContext;
+
+/**
+ * @scr.component name="org.apache.stratos.messaging.internal.MessagingServiceComponent"
+ * immediate="true"
+ */
+public class MessagingServiceComponent {
+
+ private static final Log log = LogFactory.getLog(MessagingServiceComponent.class);
+
+ protected void activate(ComponentContext context) {
+ try {
+ log.info("Messaging Service bundle activated");
+ } catch (Exception e) {
+ log.error("Could not activate Messaging Service component", e);
+ }
+ }
+
+ protected void deactivate(ComponentContext context) {
+ // deactivate all message receivers
+ try {
+ ApplicationSignUpEventReceiver.getInstance().terminate();
+ ApplicationsEventReceiver.getInstance().terminate();
+ ClusterStatusEventReceiver.getInstance().terminate();
+ DomainMappingEventReceiver.getInstance().terminate();
+ HealthStatEventReceiver.getInstance().terminate();
+ InitializerEventReceiver.getInstance().terminate();
+ TenantEventReceiver.getInstance().terminate();
+ TopologyEventReceiver.getInstance().terminate();
+ log.info("Messaging Service component is deactivated");
+ } catch (Exception e) {
+ log.error("Could not de-activate Messaging Service component", e);
+ }
+ }
+}
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index e191799..be42b43 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -87,6 +87,11 @@
}
}
+ public void terminate() {
+ eventSubscriber.terminate();
+ messageDelegator.terminate();
+ }
+
public boolean isSubscribed() {
return ((eventSubscriber != null) && (eventSubscriber.isSubscribed()));
}
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
index 4e4c04b..6de99c0 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
@@ -64,6 +64,11 @@
return instance;
}
+ public void terminate() {
+ eventSubscriber.terminate();
+ messageDelegator.terminate();
+ }
+
private void execute() {
try {
// Start topic subscriber thread
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
index ba124a7..a9d2602 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -82,4 +82,9 @@
}
}
}
+
+ public void terminate() {
+ eventSubscriber.terminate();
+ messageDelegator.terminate();
+ }
}