blob: 423a164ca214d2f7ca1a3e922c3a817446b8e642 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.jms.impl;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.sling.jms.ConnectionFactoryService;
import org.apache.sling.mom.MessageFilter;
import org.apache.sling.mom.Subscriber;
import org.apache.sling.mom.TopicManager;
import org.apache.sling.mom.Types;
import org.jetbrains.annotations.NotNull;
import org.osgi.framework.ServiceReference;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link TopicManager} implementation that publishes messages to JMS topics and delivers incoming topic
 * messages to {@link Subscriber} services registered using the OSGi whiteboard pattern. It uses the
 * {@link ConnectionFactoryService} to obtain the JMS connection.
 *
 * <p>One connection and one listening session are held per component instance; every publish uses its own
 * short-lived session.</p>
 */
@Component(immediate = true, service = TopicManager.class)
public class JMSTopicManager implements TopicManager {

    private static final Logger LOGGER = LoggerFactory.getLogger(JMSTopicManager.class);

    /**
     * Holds all active Subscriber registrations, keyed by the service reference that produced them.
     */
    private final Map<ServiceReference<Subscriber>, SubscriberHolder> registrations =
            new ConcurrentHashMap<ServiceReference<Subscriber>, SubscriberHolder>();

    /**
     * Subscribers that were bound while no listening session existed yet. Declarative Services binds
     * references before it calls the activate method, so these are registered at the end of
     * {@link #activate()}. Only accessed from synchronized methods.
     */
    private final List<ServiceReference<Subscriber>> pendingSubscribers =
            new ArrayList<ServiceReference<Subscriber>>();

    @Reference
    private ConnectionFactoryService connectionFactoryService;

    // A single connection is maintained open per instance of this component.
    private Connection connection;

    // A single session is used for listening to messages. Separate sessions are opened for sending to avoid
    // synchronisation on sending operations.
    // NOTE(review): this session is created as transacted and is never committed in this class - confirm that
    // this is intended for the topic consumers.
    private Session session;

    /**
     * Opens the JMS connection and the listening session, starts the connection and then registers every
     * subscriber that was bound before activation.
     *
     * @throws JMSException if the connection or session cannot be created or started; the connection is
     *                      closed again in that case so it does not leak.
     */
    @Activate
    public synchronized void activate() throws JMSException {
        Connection newConnection = connectionFactoryService.getConnectionFactory().createConnection();
        boolean started = false;
        try {
            Session listenerSession = newConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            newConnection.start();
            connection = newConnection;
            session = listenerSession;
            started = true;
        } finally {
            if (!started) {
                // Activation failed part way through: release the connection that was already opened.
                try {
                    newConnection.close();
                } catch (JMSException e) {
                    LOGGER.warn("Unable to close connection after failed activation ", e);
                }
            }
        }
        registerPendingSubscribers();
    }

    /**
     * Closes all subscriber registrations and then stops and closes the connection.
     *
     * @throws JMSException if stopping or closing the connection fails.
     */
    @Deactivate
    public synchronized void deactivate() throws JMSException {
        pendingSubscribers.clear();
        for (SubscriberHolder holder : registrations.values()) {
            holder.close();
        }
        registrations.clear();
        // don't close the session, there is a bug in JMS which means an already closed session wont go quietly
        // and the hook that shuts down an embedded connection still gets fired when OSGi shuts down even with
        // a flag to prevent it. connection.stop and close are clean.
        if (connection != null) {
            try {
                connection.stop();
            } finally {
                // Always attempt the close, even when stop() failed.
                connection.close();
            }
        }
    }

    /**
     * Serialises the message to JSON and sends it as a text message to the named topic, using a fresh
     * transacted session that is committed after the send. JMS failures are logged and not propagated.
     *
     * @param name        the topic to publish to.
     * @param commandName not used by this implementation.
     * @param message     the message payload, converted with {@code Json.toJson}.
     */
    @Override
    public void publish(Types.TopicName name, Types.CommandName commandName, Map<String, Object> message) {
        Session publishSession = null;
        try {
            // use a fresh session per message.
            publishSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            TextMessage textMessage = publishSession.createTextMessage(Json.toJson(message));
            textMessage.setJMSType(JMSMessageTypes.JSON.toString());
            publishSession.createProducer(publishSession.createTopic(name.toString())).send(textMessage);
            publishSession.commit();
        } catch (JMSException e) {
            LOGGER.error("Unable to send message to topic " + name, e);
        } finally {
            // Close exactly once on every path, including runtime exceptions raised while building the message.
            if (publishSession != null) {
                try {
                    publishSession.close();
                } catch (JMSException e1) {
                    LOGGER.warn("Unable to close session ", e1);
                }
            }
        }
    }

    /**
     * Binds a Subscriber service. If the listening session is not available yet the reference is parked and
     * registered once {@link #activate()} has created the session.
     *
     * @param serviceRef reference to the Subscriber service being bound.
     */
    // Register Subscribers using OSGi Whiteboard pattern
    @Reference(service = Subscriber.class,
            cardinality = ReferenceCardinality.MULTIPLE,
            policy = ReferencePolicy.DYNAMIC,
            unbind = "removeSubscriber")
    public synchronized void addSubscriber(ServiceReference<Subscriber> serviceRef) {
        if (registrations.containsKey(serviceRef) || pendingSubscribers.contains(serviceRef)) {
            LOGGER.error("Registration for service reference is already present {}",serviceRef);
            return;
        }
        if (session == null) {
            // No session to create consumers on yet; defer instead of failing with a NullPointerException.
            pendingSubscribers.add(serviceRef);
            return;
        }
        register(serviceRef);
    }

    /**
     * Registers every subscriber that was bound before the session existed. A failure for one subscriber is
     * logged and does not prevent the remaining ones from being registered.
     */
    private void registerPendingSubscribers() {
        List<ServiceReference<Subscriber>> deferred = new ArrayList<ServiceReference<Subscriber>>(pendingSubscribers);
        pendingSubscribers.clear();
        for (ServiceReference<Subscriber> serviceRef : deferred) {
            try {
                register(serviceRef);
            } catch (RuntimeException e) {
                LOGGER.error("Unable to register subscriber " + serviceRef, e);
            }
        }
    }

    /**
     * Looks up the Subscriber service and creates its consumers on the listening session.
     *
     * @param serviceRef reference to the Subscriber service.
     * @throws IllegalArgumentException if the service declares no topic names or the JMS consumers cannot be
     *                                  created.
     */
    private void register(ServiceReference<Subscriber> serviceRef) {
        Subscriber subscriber = serviceRef.getBundle().getBundleContext().getService(serviceRef);
        if (subscriber == null) {
            LOGGER.warn("Subscriber service is no longer available for {}", serviceRef);
            return;
        }
        registrations.put(serviceRef, new SubscriberHolder(session, subscriber, getServiceProperties(serviceRef)));
    }

    /**
     * Copies the service properties of the reference into an unmodifiable map.
     *
     * @param serviceRef the service reference to read the properties from.
     * @return an unmodifiable snapshot of the service properties.
     */
    private Map<String, Object> getServiceProperties(ServiceReference<Subscriber> serviceRef) {
        Map<String, Object> properties = new HashMap<String, Object>();
        for (String key : serviceRef.getPropertyKeys()) {
            properties.put(key, serviceRef.getProperty(key));
        }
        return Collections.unmodifiableMap(properties);
    }

    /**
     * Unbinds a Subscriber service, closing its consumers if it had been registered.
     *
     * @param serviceRef reference to the Subscriber service being unbound.
     */
    public synchronized void removeSubscriber(ServiceReference<Subscriber> serviceRef) {
        pendingSubscribers.remove(serviceRef);
        SubscriberHolder subscriberHolder = registrations.remove(serviceRef);
        if (subscriberHolder != null) {
            subscriberHolder.close();
        }
    }

    /**
     * Owns the {@link FilteredTopicSubscriber} created for one Subscriber service.
     */
    private static class SubscriberHolder implements Closeable {

        /** Filter used for subscribers that do not implement {@link MessageFilter}: accepts every message. */
        private static final MessageFilter ACCEPT_ALL = new MessageFilter() {
            @Override
            public boolean accept(Types.Name name, Map<String, Object> mapMessage) {
                return true;
            }
        };

        private final FilteredTopicSubscriber filteredTopicSubscriber;

        /**
         * @param session    the session used to create the topic consumers.
         * @param subscriber the subscriber that receives the messages; also used as the filter when it
         *                   implements {@link MessageFilter}.
         * @param properties service properties; {@code Subscriber.TOPIC_NAMES_PROP} must hold at least one
         *                   topic name, as a {@code String[]}, a single {@code String} or an {@code Iterable}.
         * @throws IllegalArgumentException if no topic name is configured or the consumers cannot be created.
         */
        public SubscriberHolder(Session session, Subscriber subscriber, Map<String, Object> properties) {
            LOGGER.info("Creating Subscriber holder for {} ", subscriber.getClass());
            String[] topicNames = toTopicNames(properties.get(Subscriber.TOPIC_NAMES_PROP));
            if (topicNames.length == 0) {
                throw new IllegalArgumentException("At least one valid topic name in property " + Subscriber.TOPIC_NAMES_PROP + " is required for Subscriber registration");
            }
            MessageFilter filter = subscriber instanceof MessageFilter ? (MessageFilter) subscriber : ACCEPT_ALL;
            try {
                filteredTopicSubscriber = new FilteredTopicSubscriber(session, subscriber, topicNames, filter);
            } catch (JMSException e) {
                throw new IllegalArgumentException("Unable to register Subscriber with JMS ", e);
            }
        }

        /**
         * Normalises the topic names property. OSGi service properties holding several strings may be
         * supplied as an array, a single value or a collection.
         *
         * @param value the raw property value, possibly null.
         * @return the topic names; empty when the value is null or of an unsupported type.
         */
        private static String[] toTopicNames(Object value) {
            if (value instanceof String[]) {
                return (String[]) value;
            }
            List<String> names = new ArrayList<String>();
            if (value instanceof String) {
                names.add((String) value);
            } else if (value instanceof Iterable) {
                for (Object element : (Iterable<?>) value) {
                    if (element != null) {
                        names.add(element.toString());
                    }
                }
            }
            return names.toArray(new String[names.size()]);
        }

        @Override
        public void close() {
            try {
                filteredTopicSubscriber.close();
            } catch (IOException e) {
                LOGGER.warn("Unable to close topic subscriber ", e);
            }
        }
    }

    /**
     * This listens to topic messages, and applies the message filter prior to sending to the subscriber.
     * Although JMS has its own filtering language, this is JMS specific and since we don't want to expose
     * implementation details in the API either explicitly or out of band, the JMS specific filters can't be
     * used. As a replacement the API provides the MessageFilter API.
     */
    private static final class FilteredTopicSubscriber implements Closeable, MessageListener {

        private final Subscriber subscriber;
        private final MessageFilter filter;
        private final List<MessageConsumer> consumers = new ArrayList<MessageConsumer>();

        /**
         * Creates one consumer per topic name on the given session, with this object as message listener.
         *
         * @throws JMSException if a consumer cannot be created; consumers created before the failure are
         *                      closed again.
         */
        public FilteredTopicSubscriber(@NotNull Session session,
                                       @NotNull Subscriber subscriber,
                                       @NotNull String[] topicNames,
                                       @NotNull MessageFilter filter) throws JMSException {
            this.subscriber = subscriber;
            this.filter = filter;
            boolean complete = false;
            try {
                for (String topicName : topicNames) {
                    MessageConsumer consumer = session.createConsumer(session.createTopic(topicName));
                    // Track the consumer before attaching the listener so it is closed if that call fails.
                    consumers.add(consumer);
                    consumer.setMessageListener(this);
                }
                complete = true;
            } finally {
                if (!complete) {
                    closeConsumers();
                }
            }
        }

        /**
         * Delivers JSON text messages received on a topic to the subscriber when the filter accepts them.
         * Other messages are ignored. No exception is allowed to escape to the JMS provider; failures are
         * logged instead.
         */
        @Override
        public void onMessage(Message message) {
            try {
                LOGGER.info("Got message {} ", message);
                Destination destination = message.getJMSDestination();
                if (!(destination instanceof Topic) || !isJson(message.getJMSType())) {
                    return;
                }
                if (!(message instanceof TextMessage)) {
                    LOGGER.warn("Ignoring message of type JSON that is not a text message {} ", message);
                    return;
                }
                Map<String, Object> mapMessage = Json.toMap(((TextMessage) message).getText());
                Types.TopicName topicName = Types.topicName(((Topic) destination).getTopicName());
                if (filter.accept(topicName, mapMessage)) {
                    subscriber.onMessage(topicName, mapMessage);
                }
            } catch (JMSException e) {
                LOGGER.warn("Failed to deliver message ",e);
            } catch (RuntimeException e) {
                // A message listener must not throw; a failing subscriber or a malformed payload is logged.
                LOGGER.warn("Failed to deliver message ",e);
            }
        }

        /**
         * Tells whether the JMS type header names the JSON message type. A missing or unknown type is treated
         * as "not JSON" (assumes {@code JMSMessageTypes.valueOf} rejects unknown names with an
         * IllegalArgumentException, as an enum does).
         */
        private static boolean isJson(String type) {
            if (type == null) {
                return false;
            }
            try {
                return JMSMessageTypes.JSON.equals(JMSMessageTypes.valueOf(type));
            } catch (IllegalArgumentException e) {
                return false;
            }
        }

        @Override
        public void close() throws IOException {
            closeConsumers();
        }

        /** Closes every consumer, logging failures, and forgets them so repeated calls are harmless. */
        private void closeConsumers() {
            for (MessageConsumer consumer : consumers) {
                try {
                    LOGGER.info("Closing consumer on dispose {} ",consumer);
                    consumer.close();
                } catch (JMSException e) {
                    LOGGER.warn(e.getMessage(), e);
                }
            }
            consumers.clear();
        }
    }
}