blob: 23c83eec71c848e38371ea380b905dceea2646af [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
///*
// *
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing,
// * software distributed under the License is distributed on an
// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// * KIND, either express or implied. See the License for the
// * specific language governing permissions and limitations
// * under the License.
// *
// */
//
//package org.apache.airavata.messaging.core.impl;
//
//
//import com.rabbitmq.client.*;
//import org.apache.airavata.common.exception.AiravataException;
//import org.apache.airavata.common.exception.ApplicationSettingsException;
//import org.apache.airavata.common.utils.AiravataUtils;
//import org.apache.airavata.common.utils.ServerSettings;
//import org.apache.airavata.common.utils.ThriftUtils;
//import org.apache.airavata.messaging.core.Subscriber;
//import org.apache.airavata.messaging.core.MessageContext;
//import org.apache.airavata.messaging.core.MessageHandler;
//import org.apache.airavata.messaging.core.MessagingConstants;
//import org.apache.airavata.model.messaging.event.*;
//import org.apache.thrift.TBase;
//import org.apache.thrift.TException;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//
//import javax.annotation.Nonnull;
//import java.io.IOException;
//import java.util.ArrayList;
//import java.util.HashMap;
//import java.util.List;
//import java.util.Map;
//
//public class RabbitMQStatusSubscriber implements Subscriber {
// public static final String EXCHANGE_TYPE = "topic";
// private static Logger log = LoggerFactory.getLogger(RabbitMQStatusSubscriber.class);
//
// private String exchangeName;
// private String url;
// private Connection connection;
// private Channel channel;
// private int prefetchCount;
// private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
//
// public RabbitMQStatusSubscriber() throws AiravataException {
// try {
// url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
// exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
// prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
// createConnection();
// } catch (ApplicationSettingsException e) {
// String message = "Failed to read the required properties from airavata to initialize rabbitmq";
// log.error(message, e);
// throw new AiravataException(message, e);
// }
// }
//
// public RabbitMQStatusSubscriber(String brokerUrl, String exchangeName) throws AiravataException {
// this.exchangeName = exchangeName;
// this.url = brokerUrl;
//
// createConnection();
// }
//
// private void createConnection() throws AiravataException {
// try {
// ConnectionFactory connectionFactory = new ConnectionFactory();
// connectionFactory.setUri(url);
// connectionFactory.setAutomaticRecoveryEnabled(true);
// connection = connectionFactory.newConnection();
// connection.addShutdownListener(new ShutdownListener() {
// public void shutdownCompleted(ShutdownSignalException cause) {
// }
// });
// log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
//
// channel = connection.createChannel();
// channel.basicQos(prefetchCount);
// channel.exchangeDeclare(exchangeName, EXCHANGE_TYPE, false);
//
// } catch (Exception e) {
// String msg = "could not open channel for exchange " + exchangeName;
// log.error(msg);
// throw new AiravataException(msg, e);
// }
// }
//
// public String listen(final MessageHandler handler) throws AiravataException {
// try {
// Map<String, Object> props = handler.getProperties();
// final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
// if (routing == null) {
// throw new IllegalArgumentException("The routing key must be present");
// }
//
// List<String> keys = new ArrayList<String>();
// if (routing instanceof List) {
// for (Object o : (List)routing) {
// keys.add(o.toString());
// }
// } else if (routing instanceof String) {
// keys.add((String) routing);
// }
//
// String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
// String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
// if (queueName == null) {
// if (!channel.isOpen()) {
// channel = connection.createChannel();
// channel.exchangeDeclare(exchangeName, "topic", false);
// }
// queueName = channel.queueDeclare().getQueue();
// } else {
// channel.queueDeclare(queueName, true, false, false, null);
// }
//
// final String id = getId(keys, queueName);
// if (queueDetailsMap.containsKey(id)) {
// throw new IllegalStateException("This subscriber is already defined for this Subscriber, " +
// "cannot define the same subscriber twice");
// }
//
// if (consumerTag == null) {
// consumerTag = "default";
// }
//
// // bind all the routing keys
// for (String routingKey : keys) {
// channel.queueBind(queueName, exchangeName, routingKey);
// }
//
// channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) {
// @Override
// public void handleDelivery(String consumerTag,
// Envelope envelope,
// AMQP.BasicProperties properties,
// byte[] body) {
// Message message = new Message();
//
// try {
// ThriftUtils.createThriftFromBytes(body, message);
// TBase event = null;
// String gatewayId = null;
//
// if (message.getMessageType().equals(MessageType.EXPERIMENT)) {
// ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
// ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
// log.debug(" Message Received with message id '" + message.getMessageId()
// + "' and with message type '" + message.getMessageType() + "' with status " +
// experimentStatusChangeEvent.getState());
// event = experimentStatusChangeEvent;
// gatewayId = experimentStatusChangeEvent.getGatewayId();
// } else if (message.getMessageType().equals(MessageType.PROCESS)) {
// ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent();
// ThriftUtils.createThriftFromBytes(message.getEvent(), processStatusChangeEvent);
// log.debug("Message Received with message id :" + message.getMessageId() + " and with " +
// "message type " + message.getMessageType() + " with status " +
// processStatusChangeEvent.getState());
// event = processStatusChangeEvent;
// gatewayId = processStatusChangeEvent.getProcessIdentity().getGatewayId();
// } else if (message.getMessageType().equals(MessageType.TASK)) {
// TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
// ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
// log.debug(" Message Received with message id '" + message.getMessageId()
// + "' and with message type '" + message.getMessageType() + "' with status " +
// taskStatusChangeEvent.getState());
// event = taskStatusChangeEvent;
// gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId();
// }else if (message.getMessageType() == MessageType.PROCESSOUTPUT) {
// TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent();
// ThriftUtils.createThriftFromBytes(message.getEvent(), taskOutputChangeEvent);
// log.debug(" Message Received with message id '" + message.getMessageId() + "' and with message type '" + message.getMessageType());
// event = taskOutputChangeEvent;
// gatewayId = taskOutputChangeEvent.getTaskIdentity().getGatewayId();
// } else if (message.getMessageType().equals(MessageType.JOB)) {
// JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
// ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
// log.debug(" Message Received with message id '" + message.getMessageId()
// + "' and with message type '" + message.getMessageType() + "' with status " +
// jobStatusChangeEvent.getState());
// event = jobStatusChangeEvent;
// gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId();
// } else if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
// TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
// ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
// log.debug(" Message Received with message id '" + message.getMessageId()
// + "' and with message type '" + message.getMessageType() + "' for experimentId: " +
// taskSubmitEvent.getExperimentId() + " and taskId: " + taskSubmitEvent.getTaskId());
// event = taskSubmitEvent;
// gatewayId = taskSubmitEvent.getGatewayId();
// } else if (message.getMessageType().equals(MessageType.TERMINATEPROCESS)) {
// TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
// ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
// log.debug(" Message Received with message id '" + message.getMessageId()
// + "' and with message type '" + message.getMessageType() + "' for experimentId: " +
// taskTerminateEvent.getExperimentId() + " and taskId: " + taskTerminateEvent.getTaskId());
// event = taskTerminateEvent;
// gatewayId = null;
// }
// MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
// messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
// messageContext.setIsRedeliver(envelope.isRedeliver());
// handler.onMessage(messageContext);
// } catch (TException e) {
// String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
// log.warn(msg, e);
// }
// }
// });
// // save the name for deleting the queue
// queueDetailsMap.put(id, new QueueDetails(queueName, keys));
// return id;
// } catch (Exception e) {
// String msg = "could not open channel for exchange " + exchangeName;
// log.error(msg);
// throw new AiravataException(msg, e);
// }
// }
//
// public void stopListen(final String id) throws AiravataException {
// QueueDetails details = queueDetailsMap.get(id);
// if (details != null) {
// try {
// for (String key : details.getRoutingKeys()) {
// channel.queueUnbind(details.getQueueName(), exchangeName, key);
// }
// channel.queueDelete(details.getQueueName(), true, true);
// } catch (IOException e) {
// String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + exchangeName;
// log.debug(msg);
// }
// }
// }
//
// /**
// * Private class for holding some information about the consumers registered
// */
// private class QueueDetails {
// String queueName;
//
// List<String> routingKeys;
//
// private QueueDetails(String queueName, List<String> routingKeys) {
// this.queueName = queueName;
// this.routingKeys = routingKeys;
// }
//
// public String getQueueName() {
// return queueName;
// }
//
// public List<String> getRoutingKeys() {
// return routingKeys;
// }
// }
//
// private String getId(List<String> routingKeys, String queueName) {
// String id = "";
// for (String key : routingKeys) {
// id = id + "_" + key;
// }
// return id + "_" + queueName;
// }
//
// public void close() {
// if (connection != null) {
// try {
// connection.close();
// } catch (IOException ignore) {
// }
// }
// }
//}