blob: 76d77ff3b0e178fe98f1d9151dd9a7094f1c2f37 [file] [log] [blame]
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.apache.airavata.allocation.manager.messaging;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.DBEventManagerConstants;
import org.apache.airavata.common.utils.DBEventService;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessagingFactory;
import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.Subscriber;
import org.apache.airavata.model.dbevent.DBEventMessage;
import org.apache.airavata.model.dbevent.DBEventMessageContext;
import org.apache.airavata.model.dbevent.DBEventSubscriber;
import org.apache.airavata.model.dbevent.DBEventType;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.allocation.manager.models.AllocationManagerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
*
* @author madrinathapa
*/
public class AllocationServiceDBEventMessagingFactory {
private final static Logger log = LoggerFactory.getLogger(AllocationServiceDBEventMessagingFactory.class);
private static Publisher dbEventPublisher;
private static Subscriber allocationServiceDBEventSubscriber;
/**
* Get publisher for DB Event queue
* Change access specifier as required
* @return
* @throws AiravataException
*/
private static Publisher getDBEventPublisher() throws AiravataException {
if(null == dbEventPublisher){
synchronized (AllocationServiceDBEventMessagingFactory.class){
if(null == dbEventPublisher){
log.info("Creating DB Event publisher.....");
dbEventPublisher = MessagingFactory.getDBEventPublisher();
log.info("DB Event publisher created");
}
}
}
return dbEventPublisher;
}
public static Subscriber getDBEventSubscriber() throws AiravataException, AllocationManagerException {
if(null == allocationServiceDBEventSubscriber){
synchronized (AllocationServiceDBEventMessagingFactory.class){
if(null == allocationServiceDBEventSubscriber){
log.info("Creating DB Event publisher.....");
allocationServiceDBEventSubscriber = MessagingFactory.getDBEventSubscriber(new AllocationServiceDBEventHandler(), "allocation");
log.info("DB Event publisher created");
}
}
}
return allocationServiceDBEventSubscriber;
}
/**
* Register Allocation Manager service with stated publishers
* @param publishers
* @return
* @throws AiravataException
*/
public static boolean registerAllocationServiceWithPublishers(List<String> publishers) throws AiravataException {
for(String publisher : publishers){
log.info("Sending service discovery message. Publisher : " + publisher + ", Subscriber : " + "allocation");
DBEventSubscriber dbEventSubscriber = new DBEventSubscriber("allocation");
DBEventMessageContext dbEventMessageContext = new DBEventMessageContext();
dbEventMessageContext.setSubscriber(dbEventSubscriber);
DBEventMessage dbEventMessage = new DBEventMessage(DBEventType.SUBSCRIBER, dbEventMessageContext, publisher);
MessageContext messageContext = new MessageContext(dbEventMessage, MessageType.DB_EVENT, "", "");
getDBEventPublisher().publish(messageContext, DBEventManagerConstants.getRoutingKey(DBEventService.DB_EVENT.toString()));
}
return true;
}
}