Introduced an event publisher pool to avoid large number of connections being created to message broker
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
index 3bf73e7..a1be237 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/Main.java
@@ -19,13 +19,15 @@
package org.apache.stratos.cartridge.agent;
-import java.lang.reflect.Constructor;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.PropertyConfigurator;
import org.apache.stratos.cartridge.agent.config.CartridgeAgentConfiguration;
import org.apache.stratos.cartridge.agent.config.configurator.JndiConfigurator;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
+import org.apache.stratos.messaging.util.Constants;
+
+import java.lang.reflect.Constructor;
/**
* Cartridge agent main class.
@@ -37,6 +39,20 @@
public static void main(String[] args) {
try {
+ // Add shutdown hook
+ final Thread mainThread = Thread.currentThread();
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ try {
+ // Close event publisher connections to message broker
+ EventPublisherPool.close(Constants.INSTANCE_STATUS_TOPIC);
+ mainThread.join();
+ } catch (Exception e) {
+ log.error(e);
+ }
+ }
+ });
+
// Configure log4j properties
if(log.isDebugEnabled()) {
log.debug("Configuring log4j.properties file path");
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
index 367f61d..9c2e21f 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/event/publisher/CartridgeAgentEventPublisher.java
@@ -27,6 +27,7 @@
import org.apache.stratos.cartridge.agent.statistics.publisher.HealthStatisticsNotifier;
import org.apache.stratos.cartridge.agent.util.ExtensionUtils;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent;
import org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceModeEvent;
import org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent;
@@ -55,7 +56,7 @@
CartridgeAgentConfiguration.getInstance().getPartitionId(),
CartridgeAgentConfiguration.getInstance().getMemberId());
- EventPublisher eventPublisher = new EventPublisher(Constants.INSTANCE_STATUS_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
eventPublisher.publish(event);
setStarted(true);
if (log.isInfoEnabled()) {
@@ -82,7 +83,8 @@
CartridgeAgentConfiguration.getInstance().getPartitionId(),
CartridgeAgentConfiguration.getInstance().getMemberId());
- EventPublisher eventPublisher = new EventPublisher(Constants.INSTANCE_STATUS_TOPIC);
+ // Event publisher connection will
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
eventPublisher.publish(event);
if (log.isInfoEnabled()) {
log.info("Instance activated event published");
@@ -118,7 +120,7 @@
CartridgeAgentConfiguration.getInstance().getPartitionId(),
CartridgeAgentConfiguration.getInstance().getMemberId());
- EventPublisher eventPublisher = new EventPublisher(Constants.INSTANCE_STATUS_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
eventPublisher.publish(event);
setReadyToShutdown(true);
if (log.isInfoEnabled()) {
@@ -143,7 +145,7 @@
CartridgeAgentConfiguration.getInstance().getPartitionId(),
CartridgeAgentConfiguration.getInstance().getMemberId());
- EventPublisher eventPublisher = new EventPublisher(Constants.INSTANCE_STATUS_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_STATUS_TOPIC);
eventPublisher.publish(event);
setMaintenance(true);
if (log.isInfoEnabled()) {
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
index 9cb2869..77e3ac4 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java
@@ -32,19 +32,16 @@
import org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageListener;
import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.util.Constants;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.ComponentContext;
-import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.ntask.core.service.TaskService;
import org.wso2.carbon.registry.core.exceptions.RegistryException;
import org.wso2.carbon.registry.core.service.RegistryService;
import org.wso2.carbon.registry.core.session.UserRegistry;
import org.wso2.carbon.utils.ConfigurationContextService;
-import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
-
-import java.util.List;
/**
* Registering Cloud Controller Service.
@@ -57,7 +54,24 @@
* interface=
* "org.wso2.carbon.registry.core.service.RegistryService"
* cardinality="1..1" policy="dynamic" bind="setRegistryService"
- * unbind="unsetRegistryService"
+ * unbind="unsetR/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */egistryService"
* @scr.reference name="config.context.service"
* interface="org.wso2.carbon.utils.ConfigurationContextService"
* cardinality="1..1" policy="dynamic"
@@ -87,7 +101,7 @@
Thread tdelegator = new Thread(delegator);
tdelegator.start();
- // Register cloud controller service
+ // Register cloud controller service E
BundleContext bundleContext = context.getBundleContext();
bundleContext.registerService(CloudControllerService.class.getName(), new CloudControllerServiceImpl(), null);
@@ -151,11 +165,8 @@
}
protected void deactivate(ComponentContext ctx) {
-
- List<EventPublisher> publishers = dataHolder.getAllEventPublishers();
- for (EventPublisher topicPublisher : publishers) {
- topicPublisher.close();
- }
+ // Close event publisher connections to message broker
+ EventPublisherPool.close(Constants.TOPOLOGY_TOPIC);
}
}
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java
index 970e2c0..9b05b5d 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/runtime/FasterLookUpDataHolder.java
@@ -22,12 +22,10 @@
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.cloud.controller.pojo.*;
import org.apache.stratos.cloud.controller.registry.RegistryManager;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -88,12 +86,6 @@
private transient DataPublisherConfig dataPubConfig;
private boolean enableTopologySync;
private transient TopologyConfig topologyConfig;
-
- /**
- * Key - name of the topic
- * Value - corresponding EventPublisher
- */
- private transient Map<String, EventPublisher> topicToPublisherMap = new HashMap<String, EventPublisher>();
private transient AsyncDataPublisher dataPublisher;
private String streamId;
@@ -244,18 +236,6 @@
public void setTopologyConfig(TopologyConfig topologyConfig) {
this.topologyConfig = topologyConfig;
}
-
- public EventPublisher getEventPublisher(String topic){
- return topicToPublisherMap.get(topic);
- }
-
- public List<EventPublisher> getAllEventPublishers() {
- return new ArrayList<EventPublisher>(topicToPublisherMap.values());
- }
-
- public void addEventPublisher(EventPublisher publisher, String topicName) {
- topicToPublisherMap.put(topicName, publisher);
- }
public DataPublisherConfig getDataPubConfig() {
return dataPubConfig;
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
index c039af7..86237d8 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java
@@ -23,6 +23,7 @@
import org.apache.stratos.cloud.controller.pojo.ClusterContext;
import org.apache.stratos.cloud.controller.pojo.PortMapping;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Port;
import org.apache.stratos.messaging.domain.topology.ServiceType;
@@ -162,7 +163,7 @@
}
public static void publishEvent(Event event) {
- EventPublisher eventPublisher = new EventPublisher(Constants.TOPOLOGY_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TOPOLOGY_TOPIC);
eventPublisher.publish(event);
}
}
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
index c843fdb..6857bd6 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/ADCManagementServerComponent.java
@@ -26,7 +26,7 @@
import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
import org.apache.stratos.manager.topology.receiver.StratosManagerTopologyEventReceiver;
import org.apache.stratos.manager.utils.CartridgeConfigFileReader;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
import org.apache.stratos.messaging.util.Constants;
import org.osgi.service.component.ComponentContext;
@@ -65,7 +65,6 @@
protected void activate(ComponentContext componentContext) throws Exception {
try {
CartridgeConfigFileReader.readProperties();
- DataHolder.setEventPublisher(new EventPublisher(Constants.INSTANCE_NOTIFIER_TOPIC));
// Schedule complete tenant event synchronizer
if(log.isDebugEnabled()) {
@@ -172,6 +171,9 @@
}
protected void deactivate(ComponentContext context) {
+ // Close event publisher connections to message broker
+ EventPublisherPool.close(Constants.INSTANCE_NOTIFIER_TOPIC);
+ EventPublisherPool.close(Constants.TENANT_TOPIC);
//terminate Stratos Manager Topology Receiver
stratosManagerTopologyEventReceiver.terminate();
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java
index 496a54c..07f21de 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/DataHolder.java
@@ -20,7 +20,6 @@
package org.apache.stratos.manager.internal;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.wso2.carbon.registry.core.service.RegistryService;
import org.wso2.carbon.user.core.service.RealmService;
import org.wso2.carbon.utils.CarbonUtils;
@@ -34,8 +33,6 @@
private static RealmService realmService;
private static RegistryService registryService;
- //private static TopologyManagementService topologyMgtService;
- private static EventPublisher eventPublisher;
public static RealmService getRealmService() {
return realmService;
@@ -70,13 +67,4 @@
public static void setRegistryService(RegistryService registryService) {
DataHolder.registryService = registryService;
}
-
- public static EventPublisher getEventPublisher() {
- return eventPublisher;
- }
-
- public static void setEventPublisher(EventPublisher eventPublisher) {
- DataHolder.eventPublisher = eventPublisher;
- }
-
}
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
index f8aea72..e10d4ff 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/InstanceNotificationPublisher.java
@@ -20,13 +20,14 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.manager.internal.DataHolder;
import org.apache.stratos.manager.repository.Repository;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.event.Event;
import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent;
-import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent;
import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupClusterEvent;
+import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent;
+import org.apache.stratos.messaging.util.Constants;
/**
* Creating the relevant instance notification event and publish it to the instances.
@@ -38,7 +39,7 @@
}
private void publish(Event event) {
- EventPublisher depsyncEventPublisher = DataHolder.getEventPublisher();
+ EventPublisher depsyncEventPublisher = EventPublisherPool.getPublisher(Constants.INSTANCE_NOTIFIER_TOPIC);
depsyncEventPublisher.publish(event);
}
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
index 2df631a..8213ed9 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantEventPublisher.java
@@ -25,6 +25,7 @@
import org.apache.stratos.common.exception.StratosException;
import org.apache.stratos.common.listeners.TenantMgtListener;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.domain.tenant.Tenant;
import org.apache.stratos.messaging.event.tenant.TenantCreatedEvent;
import org.apache.stratos.messaging.event.tenant.TenantRemovedEvent;
@@ -49,7 +50,7 @@
}
Tenant tenant = new Tenant(tenantInfo.getTenantId(), tenantInfo.getTenantDomain());
TenantCreatedEvent event = new TenantCreatedEvent(tenant);
- EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
eventPublisher.publish(event);
}
catch (Exception e) {
@@ -64,7 +65,7 @@
log.info(String.format("Publishing tenant updated event: [tenant-id] %d [tenant-domain] %s", tenantInfo.getTenantId(), tenantInfo.getTenantDomain()));
}
TenantUpdatedEvent event = new TenantUpdatedEvent(tenantInfo.getTenantId(), tenantInfo.getTenantDomain());
- EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
eventPublisher.publish(event);
}
catch (Exception e) {
@@ -79,7 +80,7 @@
log.info(String.format("Publishing tenant removed event: [tenant-id] %d", tenantId));
}
TenantRemovedEvent event = new TenantRemovedEvent(tenantId);
- EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
eventPublisher.publish(event);
}
catch (Exception e) {
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
index af5cd5f..3eac3f5 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/publisher/TenantSynzhronizerTask.java
@@ -25,6 +25,7 @@
import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
import org.apache.stratos.manager.subscription.CartridgeSubscription;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.domain.tenant.Tenant;
import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
import org.apache.stratos.messaging.util.Constants;
@@ -81,7 +82,7 @@
tenants.add(tenant);
}
CompleteTenantEvent event = new CompleteTenantEvent(tenants);
- EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
eventPublisher.publish(event);
} catch (Exception e) {
if (log.isErrorEnabled()) {
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
index cd50fd8..a5c5517 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/subscription/utils/CartridgeSubscriptionUtils.java
@@ -39,6 +39,7 @@
import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
import org.apache.stratos.manager.subscriber.Subscriber;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent;
import org.apache.stratos.messaging.event.tenant.TenantUnSubscribedEvent;
import org.apache.stratos.messaging.util.Constants;
@@ -166,7 +167,7 @@
log.info(String.format("Publishing tenant subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName));
}
TenantSubscribedEvent subscribedEvent = new TenantSubscribedEvent(tenantId, serviceName);
- EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
eventPublisher.publish(subscribedEvent);
} catch (Exception e) {
if (log.isErrorEnabled()) {
@@ -196,7 +197,7 @@
log.info(String.format("Publishing tenant un-subscribed event: [tenant-id] %d [service] %s", tenantId, serviceName));
}
TenantUnSubscribedEvent event = new TenantUnSubscribedEvent(tenantId, serviceName);
- EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
eventPublisher.publish(event);
} catch (Exception e) {
if (log.isErrorEnabled()) {
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
index 1e11142..5d39956 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisher.java
@@ -34,7 +34,7 @@
/**
* @param topicName topic name of this publisher instance.
*/
- public EventPublisher(String topicName) {
+ EventPublisher(String topicName) {
super(topicName);
}
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
new file mode 100644
index 0000000..175d09b
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/EventPublisherPool.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.broker.publish;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Event publisher instance pool.
+ */
+public class EventPublisherPool {
+ private static final Log log = LogFactory.getLog(EventPublisherPool.class);
+ private static Map<String, EventPublisher> topicNameEventPublisherMap = new HashMap<String, EventPublisher>();
+
+ public static EventPublisher getPublisher(String topicName) {
+ synchronized (EventPublisherPool.class) {
+ if(topicNameEventPublisherMap.containsKey(topicName)) {
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Event publisher fetched from pool: [topic] %s", topicName));
+ }
+ return topicNameEventPublisherMap.get(topicName);
+ }
+ EventPublisher eventPublisher = new EventPublisher(topicName);
+ topicNameEventPublisherMap.put(topicName, eventPublisher);
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Event publisher instance created: [topic] %s", topicName));
+ }
+ return eventPublisher;
+ }
+ }
+
+ public static void close(String topicName) {
+ synchronized (EventPublisherPool.class) {
+ if(topicNameEventPublisherMap.containsKey(topicName)) {
+ topicNameEventPublisherMap.get(topicName).close();
+ topicNameEventPublisherMap.remove(topicName);
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Event publisher closed and removed from pool: [topic] %s", topicName));
+ }
+ }
+ else {
+ if(log.isWarnEnabled()) {
+ log.warn(String.format("Event publisher instance not found in pool: [topic] %s", topicName));
+ }
+ }
+ }
+ }
+}
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
index 6614e75..004be13 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/broker/publish/TopicPublisher.java
@@ -19,18 +19,18 @@
package org.apache.stratos.messaging.broker.publish;
-import java.util.Enumeration;
-import java.util.Properties;
-
-import javax.jms.*;
-
+import com.google.gson.Gson;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.broker.connect.TopicConnector;
import org.apache.stratos.messaging.publish.MessagePublisher;
-import com.google.gson.Gson;
-import org.apache.stratos.messaging.util.Constants;
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import java.util.Enumeration;
+import java.util.Properties;
/**
* Any instance who needs to publish data to a topic, should communicate with
@@ -53,7 +53,7 @@
* @param aTopicName
* topic name of this publisher instance.
*/
- public TopicPublisher(String aTopicName) {
+ TopicPublisher(String aTopicName) {
super(aTopicName);
connector = new TopicConnector();
}