registry of message busses
diff --git a/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml b/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml
index fb0e878..ef6adab 100644
--- a/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml
+++ b/core/src/main/resources/META-INF/cloudstack/compute/spring-core-lifecycle-compute-context-inheritable.xml
@@ -40,6 +40,11 @@
</bean>
<bean class="org.apache.cloudstack.spring.lifecycle.registry.RegistryLifecycle">
+ <property name="registry" ref="eventBussesRegistry" />
+ <property name="typeClass" value="org.apache.cloudstack.framework.events.EventBus" />
+ </bean>
+
+ <bean class="org.apache.cloudstack.spring.lifecycle.registry.RegistryLifecycle">
<property name="registry" ref="hypervisorGurusRegistry" />
<property name="typeClass" value="com.cloud.hypervisor.HypervisorGuru" />
</bean>
diff --git a/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml b/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml
index a36d124..bfe722f 100644
--- a/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml
+++ b/core/src/main/resources/META-INF/cloudstack/core/spring-core-registry-core-context.xml
@@ -287,11 +287,16 @@
<property name="excludeKey" value="api.commands.exclude" />
</bean>
+ <bean id="eventBussesRegistry"
+ class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
+ <property name="excludeKey" value="event.busses.exclude" />
+ </bean>
+
<bean id="hypervisorGurusRegistry"
- class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
+ class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
<property name="excludeKey" value="hypervisor.gurus.exclude" />
</bean>
-
+
<bean id="vpcProvidersRegistry"
class="org.apache.cloudstack.spring.lifecycle.registry.ExtensionRegistry">
<property name="excludeKey" value="vpc.providers.exclude" />
diff --git a/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java b/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java
index 1e1251d..1686935 100644
--- a/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java
+++ b/engine/components-api/src/main/java/com/cloud/network/NetworkStateListener.java
@@ -24,16 +24,13 @@
import javax.inject.Inject;
+import com.cloud.utils.component.ComponentContext;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
-import org.apache.cloudstack.framework.events.EventBus;
-import org.apache.cloudstack.framework.events.EventBusException;
-import org.apache.log4j.Logger;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
+import org.apache.cloudstack.framework.events.EventDistributor;
import com.cloud.event.EventCategory;
import com.cloud.network.Network.Event;
import com.cloud.network.Network.State;
-import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.fsm.StateListener;
import com.cloud.utils.fsm.StateMachine2;
@@ -42,14 +39,16 @@
@Inject
private ConfigurationDao _configDao;
- private static EventBus s_eventBus = null;
-
- private static final Logger s_logger = Logger.getLogger(NetworkStateListener.class);
+ private EventDistributor eventDistributor;
public NetworkStateListener(ConfigurationDao configDao) {
_configDao = configDao;
}
+ public void setEventDistributor(EventDistributor eventDistributor) {
+ this.eventDistributor = eventDistributor;
+ }
+
@Override
public boolean preStateTransitionEvent(State oldState, Event event, State newState, Network vo, boolean status, Object opaque) {
pubishOnEventBus(event.name(), "preStateTransitionEvent", vo, oldState, newState);
@@ -66,36 +65,30 @@
}
private void pubishOnEventBus(String event, String status, Network vo, State oldState, State newState) {
+ String configKey = "publish.resource.state.events";
+ String value = _configDao.getValue(configKey);
+ boolean configValue = Boolean.parseBoolean(value);
+ if(!configValue)
+ return;
+ if (eventDistributor == null) {
+ setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
+ }
- String configKey = "publish.resource.state.events";
- String value = _configDao.getValue(configKey);
- boolean configValue = Boolean.parseBoolean(value);
- if(!configValue)
- return;
- try {
- s_eventBus = ComponentContext.getComponent(EventBus.class);
- } catch (NoSuchBeanDefinitionException nbe) {
- return; // no provider is configured to provide events bus, so just return
- }
+ String resourceName = getEntityFromClassName(Network.class.getName());
+ org.apache.cloudstack.framework.events.Event eventMsg =
+ new org.apache.cloudstack.framework.events.Event("management-server", EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName, vo.getUuid());
+ Map<String, String> eventDescription = new HashMap<>();
+ eventDescription.put("resource", resourceName);
+ eventDescription.put("id", vo.getUuid());
+ eventDescription.put("old-state", oldState.name());
+ eventDescription.put("new-state", newState.name());
- String resourceName = getEntityFromClassName(Network.class.getName());
- org.apache.cloudstack.framework.events.Event eventMsg =
- new org.apache.cloudstack.framework.events.Event("management-server", EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName, vo.getUuid());
- Map<String, String> eventDescription = new HashMap<String, String>();
- eventDescription.put("resource", resourceName);
- eventDescription.put("id", vo.getUuid());
- eventDescription.put("old-state", oldState.name());
- eventDescription.put("new-state", newState.name());
+ String eventDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z").format(new Date());
+ eventDescription.put("eventDateTime", eventDate);
- String eventDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z").format(new Date());
- eventDescription.put("eventDateTime", eventDate);
+ eventMsg.setDescription(eventDescription);
- eventMsg.setDescription(eventDescription);
- try {
- s_eventBus.publish(eventMsg);
- } catch (EventBusException e) {
- s_logger.warn("Failed to publish state change event on the event bus.");
- }
+ eventDistributor.publish(eventMsg);
}
private String getEntityFromClassName(String entityClassName) {
diff --git a/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java b/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java
index 4a3eaf9..bd1ea2a 100644
--- a/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java
+++ b/framework/events/src/main/java/org/apache/cloudstack/framework/events/Event.java
@@ -31,11 +31,11 @@
String description;
public Event(String eventSource, String eventCategory, String eventType, String resourceType, String resourceUUID) {
- this.eventCategory = eventCategory;
- this.eventType = eventType;
- this.eventSource = eventSource;
- this.resourceType = resourceType;
- this.resourceUUID = resourceUUID;
+ setEventCategory(eventCategory);
+ setEventType(eventType);
+ setEventSource(eventSource);
+ setResourceType(resourceType);
+ setResourceUUID(resourceUUID);
}
public String getEventCategory() {
@@ -68,7 +68,7 @@
public void setDescription(Object message) {
Gson gson = new Gson();
- this.description = gson.toJson(message).toString();
+ this.description = gson.toJson(message);
}
public void setDescription(String description) {
diff --git a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributor.java b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributor.java
new file mode 100644
index 0000000..4f47753
--- /dev/null
+++ b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.events;
+
+import com.cloud.utils.component.Manager;
+
+import java.util.List;
+
+public interface EventDistributor extends Manager {
+ /**
+ * publish an event on to the event busses
+ *
+ * @param event event that needs to be published on the event bus
+ */
+ List<EventBusException> publish(Event event);
+}
diff --git a/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
new file mode 100644
index 0000000..e92d36b
--- /dev/null
+++ b/framework/events/src/main/java/org/apache/cloudstack/framework/events/EventDistributorImpl.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.events;
+
+import com.cloud.utils.component.ManagerBase;
+import org.apache.log4j.Logger;
+
+import javax.annotation.PostConstruct;
+import java.util.ArrayList;
+import java.util.List;
+
+public class EventDistributorImpl extends ManagerBase implements EventDistributor {
+ private static final Logger LOGGER = Logger.getLogger(EventDistributorImpl.class);
+
+ public void setEventBusses(List<EventBus> eventBusses) {
+ this.eventBusses = eventBusses;
+ }
+
+ List<EventBus> eventBusses;
+
+ @PostConstruct
+ public void init() {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace(String.format("testing %d event busses", eventBusses.size()));
+ }
+ publish(new Event("server", "NONE","starting", "server", "NONE"));
+ }
+
+ @Override
+ public List<EventBusException> publish(Event event) {
+ LOGGER.info(String.format("publishing %s to %d event busses", (event == null ? "<none>" : event.getDescription()), eventBusses.size()));
+ List<EventBusException> exceptions = new ArrayList<>();
+ if (event == null) {
+ return exceptions;
+ }
+ for (EventBus bus : eventBusses) {
+ try {
+ bus.publish(event);
+ } catch (EventBusException e) {
+ LOGGER.warn(String.format("no publish for bus %s of event %s", bus.getClass().getName(), event.getDescription()));
+ exceptions.add(e);
+ }
+ }
+ return exceptions;
+ }
+
+}
diff --git a/framework/events/src/main/resources/META-INF.cloudstack.core/spring-framework-event-core-context.xml b/framework/events/src/main/resources/META-INF.cloudstack.core/spring-framework-event-core-context.xml
new file mode 100644
index 0000000..45eb666
--- /dev/null
+++ b/framework/events/src/main/resources/META-INF.cloudstack.core/spring-framework-event-core-context.xml
@@ -0,0 +1,34 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:context="http://www.springframework.org/schema/context"
+ xmlns:aop="http://www.springframework.org/schema/aop"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
+ http://www.springframework.org/schema/context
+ http://www.springframework.org/schema/context/spring-context.xsd"
+>
+ <bean id="eventDistributor"
+ class="org.apache.cloudstack.framework.events.EventDistributorImpl" >
+ <property name="eventBusses"
+ value="#{eventBussesRegistry.registered}" />
+ </bean>
+</beans>
diff --git a/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java b/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
index b7d74df..5538a50 100644
--- a/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
+++ b/plugins/event-bus/inmemory/src/main/java/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java
@@ -62,6 +62,10 @@
if (subscriber == null || topic == null) {
throw new EventBusException("Invalid EventSubscriber/EventTopic object passed.");
}
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource()));
+ }
+
UUID subscriberId = UUID.randomUUID();
subscribers.put(subscriberId, new Pair<EventTopic, EventSubscriber>(topic, subscriber));
@@ -70,6 +74,9 @@
@Override
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
+ }
if (subscriberId == null) {
throw new EventBusException("Cannot unregister a null subscriberId.");
}
@@ -87,7 +94,11 @@
@Override
public void publish(Event event) throws EventBusException {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace(String.format("publish \'%s\'", event.getDescription()));
+ }
if (subscribers == null || subscribers.isEmpty()) {
+ s_logger.trace("no subscribers, no publish");
return; // no subscriber to publish to, so just return
}
diff --git a/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java b/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
index 17a58a5..7d48a39 100644
--- a/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
+++ b/plugins/event-bus/kafka/src/main/java/org/apache/cloudstack/mom/kafka/KafkaEventBus.java
@@ -89,19 +89,29 @@
@Override
public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource()));
+ }
+
/* NOOP */
return UUID.randomUUID();
}
@Override
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
+ }
/* NOOP */
}
@Override
public void publish(Event event) throws EventBusException {
- ProducerRecord<String, String> record = new ProducerRecord<String,String>(_topic, event.getResourceUUID(), event.getDescription());
- _producer.send(record);
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace(String.format("publish \'%s\'", event.getDescription()));
+ }
+ ProducerRecord<String, String> newRecord = new ProducerRecord<>(_topic, event.getResourceUUID(), event.getDescription());
+ _producer.send(newRecord);
}
@Override
diff --git a/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java b/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
index f54c769..5e5589a 100644
--- a/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
+++ b/plugins/event-bus/rabbitmq/src/main/java/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
@@ -187,11 +187,14 @@
*/
@Override
public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {
-
if (subscriber == null || topic == null) {
throw new EventBusException("Invalid EventSubscriber/EventTopic object passed.");
}
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("subscribing \'%s\' to events of type \'%s\' from \'%s\'",subscriber.toString(), topic.getEventType(), topic.getEventSource()));
+ }
+
// create a UUID, that will be used for managing subscriptions and also used as queue name
// for on the queue used for the subscriber on the AMQP broker
UUID queueId = UUID.randomUUID();
@@ -252,6 +255,9 @@
@Override
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("unsubscribing \'%s\'",subscriberId));
+ }
try {
String classname = subscriber.getClass().getName();
String queueName = UUID.nameUUIDFromBytes(classname.getBytes()).toString();
@@ -267,6 +273,9 @@
// publish event on to the exchange created on AMQP server
@Override
public void publish(Event event) throws EventBusException {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace(String.format("publish \'%s\'", event.getDescription()));
+ }
String routingKey = createRoutingKey(event);
String eventDescription = event.getDescription();
diff --git a/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java b/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java
index 78ec013..d6a2865 100644
--- a/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java
+++ b/plugins/network-elements/juniper-contrail/src/main/java/org/apache/cloudstack/network/contrail/management/EventUtils.java
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.cloudstack.framework.events.EventDistributor;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
@@ -42,15 +43,21 @@
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.component.ComponentMethodInterceptor;
+
@Component
public class EventUtils {
private static final Logger s_logger = Logger.getLogger(EventUtils.class);
+ private static EventDistributor eventDistributor;
protected static EventBus s_eventBus = null;
public EventUtils() {
}
+ public static void setEventDistributor(EventDistributor eventDistributorImpl) {
+ eventDistributor = eventDistributorImpl;
+ }
+
private static void publishOnMessageBus(String eventCategory, String eventType, String details, Event.State state) {
if (state != com.cloud.event.Event.State.Completed) {
@@ -58,6 +65,7 @@
}
try {
+ setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
s_eventBus = ComponentContext.getComponent(EventBus.class);
} catch (NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
@@ -66,18 +74,16 @@
org.apache.cloudstack.framework.events.Event event =
new org.apache.cloudstack.framework.events.Event(ManagementService.Name, eventCategory, eventType, EventTypes.getEntityForEvent(eventType), null);
- Map<String, String> eventDescription = new HashMap<String, String>();
+ Map<String, String> eventDescription = new HashMap<>();
eventDescription.put("event", eventType);
eventDescription.put("status", state.toString());
eventDescription.put("details", details);
event.setDescription(eventDescription);
- try {
- s_eventBus.publish(event);
- } catch (EventBusException evx) {
- String errMsg = "Failed to publish contrail event.";
- s_logger.warn(errMsg, evx);
+ List<EventBusException> exceptions = eventDistributor.publish(event);
+ for (EventBusException ex : exceptions) {
+ String errMsg = "Failed to publish event.";
+ s_logger.warn(errMsg, ex);
}
-
}
public static class EventInterceptor implements ComponentMethodInterceptor, MethodInterceptor {
@@ -118,7 +124,7 @@
}
protected List<ActionEvent> getActionEvents(Method m) {
- List<ActionEvent> result = new ArrayList<ActionEvent>();
+ List<ActionEvent> result = new ArrayList<>();
ActionEvents events = m.getAnnotation(ActionEvents.class);
diff --git a/server/src/main/java/com/cloud/api/ApiServer.java b/server/src/main/java/com/cloud/api/ApiServer.java
index b602ed2..0310fa3 100644
--- a/server/src/main/java/com/cloud/api/ApiServer.java
+++ b/server/src/main/java/com/cloud/api/ApiServer.java
@@ -95,8 +95,8 @@
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
-import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
+import org.apache.cloudstack.framework.events.EventDistributor;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
@@ -134,7 +134,6 @@
import org.apache.http.protocol.ResponseServer;
import org.apache.log4j.Logger;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
import com.cloud.api.dispatch.DispatchChainFactory;
@@ -198,25 +197,25 @@
private static final String CONTROL_CHARACTERS = "[\000-\011\013-\014\016-\037\177]";
@Inject
+ private AccountManager accountMgr;
+ @Inject
+ private APIAuthenticationManager authManager;
+ @Inject
private ApiDispatcher dispatcher;
@Inject
- private DispatchChainFactory dispatchChainFactory;
+ private AsyncJobManager asyncMgr;
@Inject
- private AccountManager accountMgr;
+ private DispatchChainFactory dispatchChainFactory;
@Inject
private DomainManager domainMgr;
@Inject
private DomainDao domainDao;
@Inject
- private UUIDManager uuidMgr;
- @Inject
- private AsyncJobManager asyncMgr;
- @Inject
private EntityManager entityMgr;
@Inject
- private APIAuthenticationManager authManager;
- @Inject
private ProjectDao projectDao;
+ @Inject
+ private UUIDManager uuidMgr;
private List<PluggableService> pluggableServices;
@@ -225,6 +224,7 @@
@Inject
private ApiAsyncJobDispatcher asyncDispatcher;
+ private EventDistributor eventDistributor = null;
private static int s_workerCount = 0;
private static Map<String, List<Class<?>>> s_apiNameCmdClassMap = new HashMap<String, List<Class<?>>>();
@@ -291,6 +291,10 @@
return true;
}
+ public void setEventDistributor(EventDistributor eventDistributor) {
+ this.eventDistributor = eventDistributor;
+ }
+
@MessageHandler(topic = AsyncJob.Topics.JOB_EVENT_PUBLISH)
public void handleAsyncJobPublishEvent(String subject, String senderAddress, Object args) {
assert (args != null);
@@ -302,12 +306,8 @@
if (s_logger.isTraceEnabled())
s_logger.trace("Handle asyjob publish event " + jobEvent);
-
- EventBus eventBus = null;
- try {
- eventBus = ComponentContext.getComponent(EventBus.class);
- } catch (NoSuchBeanDefinitionException nbe) {
- return; // no provider is configured to provide events bus, so just return
+ if (eventDistributor == null) {
+ setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
}
if (!job.getDispatcher().equalsIgnoreCase("ApiAsyncJobDispatcher")) {
@@ -320,7 +320,7 @@
// Get the event type from the cmdInfo json string
String info = job.getCmdInfo();
String cmdEventType = "unknown";
- Map<String, Object> cmdInfoObj = new HashMap<String, Object>();
+ Map<String, Object> cmdInfoObj = new HashMap<>();
if (info != null) {
Type type = new TypeToken<Map<String, String>>(){}.getType();
Map<String, String> cmdInfo = ApiGsonHelper.getBuilder().create().fromJson(info, type);
@@ -348,7 +348,7 @@
org.apache.cloudstack.framework.events.Event event = new org.apache.cloudstack.framework.events.Event("management-server", EventCategory.ASYNC_JOB_CHANGE_EVENT.getName(),
jobEvent, instanceType, instanceUuid);
- Map<String, Object> eventDescription = new HashMap<String, Object>();
+ Map<String, Object> eventDescription = new HashMap<>();
eventDescription.put("command", job.getCmd());
eventDescription.put("user", userJobOwner.getUuid());
eventDescription.put("account", jobOwner.getUuid());
@@ -369,12 +369,10 @@
eventDescription.put("domainname", domain.getName());
}
event.setDescription(eventDescription);
-
- try {
- eventBus.publish(event);
- } catch (EventBusException evx) {
- String errMsg = "Failed to publish async job event on the event bus.";
- s_logger.warn(errMsg, evx);
+ List<EventBusException> exceptions = eventDistributor.publish(event);
+ for (EventBusException ex : exceptions) {
+ String errMsg = "Failed to publish event.";
+ s_logger.warn(errMsg, ex);
}
}
diff --git a/server/src/main/java/com/cloud/hypervisor/HypervisorGuruManagerImpl.java b/server/src/main/java/com/cloud/hypervisor/HypervisorGuruManagerImpl.java
index a5f1f9f..03c2f48 100644
--- a/server/src/main/java/com/cloud/hypervisor/HypervisorGuruManagerImpl.java
+++ b/server/src/main/java/com/cloud/hypervisor/HypervisorGuruManagerImpl.java
@@ -40,7 +40,7 @@
HostDao _hostDao;
List<HypervisorGuru> _hvGuruList;
- Map<HypervisorType, HypervisorGuru> _hvGurus = new ConcurrentHashMap<HypervisorType, HypervisorGuru>();
+ Map<HypervisorType, HypervisorGuru> _hvGurus = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
diff --git a/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java b/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java
index c68b05c..8133547 100644
--- a/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java
+++ b/server/src/main/java/com/cloud/storage/listener/SnapshotStateListener.java
@@ -25,13 +25,11 @@
import javax.annotation.PostConstruct;
import javax.inject.Inject;
-import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
-import org.apache.cloudstack.framework.events.EventBus;
-import org.apache.cloudstack.framework.events.EventBusException;
-import org.apache.log4j.Logger;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
+import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.framework.events.EventDistributor;
+
import com.cloud.configuration.Config;
import com.cloud.event.EventCategory;
import com.cloud.server.ManagementService;
@@ -46,13 +44,12 @@
@Component
public class SnapshotStateListener implements StateListener<State, Event, SnapshotVO> {
- protected static EventBus s_eventBus = null;
protected static ConfigurationDao s_configDao;
@Inject
private ConfigurationDao configDao;
- private static final Logger s_logger = Logger.getLogger(SnapshotStateListener.class);
+ private EventDistributor eventDistributor = null;
public SnapshotStateListener() {
@@ -63,6 +60,10 @@
s_configDao = configDao;
}
+ public void setEventDistributor(EventDistributor eventDistributor) {
+ this.eventDistributor = eventDistributor;
+ }
+
@Override
public boolean preStateTransitionEvent(State oldState, Event event, State newState, SnapshotVO vo, boolean status, Object opaque) {
pubishOnEventBus(event.name(), "preStateTransitionEvent", vo, oldState, newState);
@@ -83,17 +84,15 @@
if(!configValue) {
return;
}
- try {
- s_eventBus = ComponentContext.getComponent(EventBus.class);
- } catch (NoSuchBeanDefinitionException nbe) {
- return; // no provider is configured to provide events bus, so just return
+ if (eventDistributor == null) {
+ setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
}
String resourceName = getEntityFromClassName(Snapshot.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
new org.apache.cloudstack.framework.events.Event(ManagementService.Name, EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName,
vo.getUuid());
- Map<String, String> eventDescription = new HashMap<String, String>();
+ Map<String, String> eventDescription = new HashMap<>();
eventDescription.put("resource", resourceName);
eventDescription.put("id", vo.getUuid());
eventDescription.put("old-state", oldState.name());
@@ -103,11 +102,7 @@
eventDescription.put("eventDateTime", eventDate);
eventMsg.setDescription(eventDescription);
- try {
- s_eventBus.publish(eventMsg);
- } catch (EventBusException e) {
- s_logger.warn("Failed to publish state change event on the event bus.");
- }
+ eventDistributor.publish(eventMsg);
}
private String getEntityFromClassName(String entityClassName) {
diff --git a/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java b/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java
index d2a4dc9..2177afe 100644
--- a/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java
+++ b/server/src/main/java/com/cloud/storage/listener/VolumeStateListener.java
@@ -22,41 +22,40 @@
import java.util.HashMap;
import java.util.Map;
-import com.cloud.event.EventTypes;
-import com.cloud.event.UsageEventUtils;
-import com.cloud.utils.fsm.StateMachine2;
-import com.cloud.vm.VMInstanceVO;
-import com.cloud.vm.VirtualMachine;
-import com.cloud.vm.dao.VMInstanceDao;
-import org.apache.log4j.Logger;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
-
-import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
-import org.apache.cloudstack.framework.events.EventBus;
-import org.apache.cloudstack.framework.events.EventBusException;
-
import com.cloud.configuration.Config;
import com.cloud.event.EventCategory;
+import com.cloud.event.EventTypes;
+import com.cloud.event.UsageEventUtils;
import com.cloud.server.ManagementService;
import com.cloud.storage.Volume;
import com.cloud.storage.Volume.Event;
import com.cloud.storage.Volume.State;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.fsm.StateListener;
+import com.cloud.utils.fsm.StateMachine2;
+import com.cloud.vm.VMInstanceVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.dao.VMInstanceDao;
+
+import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.framework.events.EventDistributor;
public class VolumeStateListener implements StateListener<State, Event, Volume> {
- protected static EventBus s_eventBus = null;
protected ConfigurationDao _configDao;
protected VMInstanceDao _vmInstanceDao;
- private static final Logger s_logger = Logger.getLogger(VolumeStateListener.class);
+ private EventDistributor eventDistributor;
public VolumeStateListener(ConfigurationDao configDao, VMInstanceDao vmInstanceDao) {
this._configDao = configDao;
this._vmInstanceDao = vmInstanceDao;
}
+ public void setEventDistributor(EventDistributor eventDistributor) {
+ this.eventDistributor = eventDistributor;
+ }
+
@Override
public boolean preStateTransitionEvent(State oldState, Event event, State newState, Volume vo, boolean status, Object opaque) {
pubishOnEventBus(event.name(), "preStateTransitionEvent", vo, oldState, newState);
@@ -92,23 +91,21 @@
return true;
}
- private void pubishOnEventBus(String event, String status, Volume vo, State oldState, State newState) {
+ private void pubishOnEventBus(String event, String status, Volume vo, State oldState, State newState) {
String configKey = Config.PublishResourceStateEvent.key();
String value = _configDao.getValue(configKey);
boolean configValue = Boolean.parseBoolean(value);
if(!configValue)
return;
- try {
- s_eventBus = ComponentContext.getComponent(EventBus.class);
- } catch (NoSuchBeanDefinitionException nbe) {
- return; // no provider is configured to provide events bus, so just return
+ if (eventDistributor == null) {
+ setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
}
String resourceName = getEntityFromClassName(Volume.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
new org.apache.cloudstack.framework.events.Event(ManagementService.Name, EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName,
- vo.getUuid());
+ vo.getUuid());
Map<String, String> eventDescription = new HashMap<String, String>();
eventDescription.put("resource", resourceName);
eventDescription.put("id", vo.getUuid());
@@ -119,11 +116,7 @@
eventDescription.put("eventDateTime", eventDate);
eventMsg.setDescription(eventDescription);
- try {
- s_eventBus.publish(eventMsg);
- } catch (EventBusException e) {
- s_logger.warn("Failed to state change event on the event bus.");
- }
+ eventDistributor.publish(eventMsg);
}
private String getEntityFromClassName(String entityClassName) {
diff --git a/server/src/main/java/com/cloud/vm/UserVmStateListener.java b/server/src/main/java/com/cloud/vm/UserVmStateListener.java
index e9f7e7c..8d39727 100644
--- a/server/src/main/java/com/cloud/vm/UserVmStateListener.java
+++ b/server/src/main/java/com/cloud/vm/UserVmStateListener.java
@@ -24,15 +24,6 @@
import javax.inject.Inject;
-import com.cloud.server.ManagementService;
-import com.cloud.utils.fsm.StateMachine2;
-import com.cloud.vm.dao.UserVmDao;
-import org.apache.log4j.Logger;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
-
-import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
-import org.apache.cloudstack.framework.events.EventBus;
-
import com.cloud.configuration.Config;
import com.cloud.event.EventCategory;
import com.cloud.event.EventTypes;
@@ -41,11 +32,17 @@
import com.cloud.network.dao.NetworkDao;
import com.cloud.network.dao.NetworkVO;
import com.cloud.service.dao.ServiceOfferingDao;
+import com.cloud.server.ManagementService;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.fsm.StateListener;
+import com.cloud.utils.fsm.StateMachine2;
import com.cloud.vm.VirtualMachine.Event;
import com.cloud.vm.VirtualMachine.State;
import com.cloud.vm.dao.NicDao;
+import com.cloud.vm.dao.UserVmDao;
+
+import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.framework.events.EventDistributor;
public class UserVmStateListener implements StateListener<State, VirtualMachine.Event, VirtualMachine> {
@@ -56,9 +53,7 @@
@Inject protected UserVmDao _userVmDao;
@Inject protected UserVmManager _userVmMgr;
@Inject protected ConfigurationDao _configDao;
- private static final Logger s_logger = Logger.getLogger(UserVmStateListener.class);
-
- protected static EventBus s_eventBus = null;
+ private EventDistributor eventDistributor;
public UserVmStateListener(UsageEventDao usageEventDao, NetworkDao networkDao, NicDao nicDao, ServiceOfferingDao offeringDao, UserVmDao userVmDao, UserVmManager userVmMgr,
ConfigurationDao configDao) {
@@ -71,6 +66,10 @@
this._configDao = configDao;
}
+ public void setEventDistributor(EventDistributor eventDistributor) {
+ this.eventDistributor = eventDistributor;
+ }
+
@Override
public boolean preStateTransitionEvent(State oldState, Event event, State newState, VirtualMachine vo, boolean status, Object opaque) {
pubishOnEventBus(event.name(), "preStateTransitionEvent", vo, oldState, newState);
@@ -128,17 +127,15 @@
boolean configValue = Boolean.parseBoolean(value);
if(!configValue)
return;
- try {
- s_eventBus = ComponentContext.getComponent(EventBus.class);
- } catch (NoSuchBeanDefinitionException nbe) {
- return; // no provider is configured to provide events bus, so just return
+ if (eventDistributor == null) {
+ setEventDistributor(ComponentContext.getComponent(EventDistributor.class));
}
String resourceName = getEntityFromClassName(VirtualMachine.class.getName());
org.apache.cloudstack.framework.events.Event eventMsg =
new org.apache.cloudstack.framework.events.Event(ManagementService.Name, EventCategory.RESOURCE_STATE_CHANGE_EVENT.getName(), event, resourceName,
vo.getUuid());
- Map<String, String> eventDescription = new HashMap<String, String>();
+ Map<String, String> eventDescription = new HashMap<>();
eventDescription.put("resource", resourceName);
eventDescription.put("id", vo.getUuid());
eventDescription.put("old-state", oldState.name());
@@ -149,12 +146,7 @@
eventDescription.put("eventDateTime", eventDate);
eventMsg.setDescription(eventDescription);
- try {
- s_eventBus.publish(eventMsg);
- } catch (org.apache.cloudstack.framework.events.EventBusException e) {
- s_logger.warn("Failed to publish state change event on the event bus.");
- }
-
+ eventDistributor.publish(eventMsg);
}
private String getEntityFromClassName(String entityClassName) {
diff --git a/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml b/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
index ca3a77b..ab699cd 100644
--- a/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
+++ b/server/src/main/resources/META-INF/cloudstack/core/spring-server-core-managers-context.xml
@@ -144,6 +144,10 @@
<property name="staticNatElements" value="#{staticNatServiceProvidersRegistry.registered}" />
</bean>
+ <bean id="eventDistributor" class="org.apache.cloudstack.framework.events.EventDistributorImpl" >
+ <property name="eventBusses" value="#{eventBussesRegistry.registered}" />
+ </bean>
+
<bean id="hypervisorGuruManagerImpl" class="com.cloud.hypervisor.HypervisorGuruManagerImpl" >
<property name="hvGuruList" value="#{hypervisorGurusRegistry.registered}" />
</bean>
diff --git a/utils/src/main/java/com/cloud/utils/component/ComponentContext.java b/utils/src/main/java/com/cloud/utils/component/ComponentContext.java
index 8486dbf..a03d21d 100644
--- a/utils/src/main/java/com/cloud/utils/component/ComponentContext.java
+++ b/utils/src/main/java/com/cloud/utils/component/ComponentContext.java
@@ -100,7 +100,7 @@
s_logger.info("Running SystemIntegrityChecker " + entry.getKey());
try {
entry.getValue().check();
- } catch (Throwable e) {
+ } catch (RuntimeException e) {
s_logger.error("System integrity check failed. Refuse to startup", e);
System.exit(1);
}
@@ -178,6 +178,13 @@
return (T)s_appContext.getBean(name);
}
+ /**
+ * only ever used to get the event bus
+ *
+ * @param beanType the component type to return
+ * @return one of the component registered for the requested type
+ * @param <T>
+ */
public static <T> T getComponent(Class<T> beanType) {
assert (s_appContext != null);
Map<String, T> matchedTypes = getComponentsOfType(beanType);