blob: 3df63239396744be9649cbc2daad5f80fc7e8963 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cocoon.components.slide.impl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.slide.common.NamespaceAccessToken;
import org.apache.slide.common.ServiceAccessException;
import org.apache.slide.common.SlideToken;
import org.apache.slide.content.AbstractContentInterceptor;
import org.apache.slide.content.NodeRevisionContent;
import org.apache.slide.content.NodeRevisionDescriptor;
import org.apache.slide.content.NodeRevisionDescriptors;
import org.apache.slide.lock.ObjectLockedException;
import org.apache.slide.security.AccessDeniedException;
import org.apache.slide.structure.LinkedObjectNotFoundException;
import org.apache.slide.structure.ObjectNotFoundException;
import org.apache.slide.util.logger.Logger;
/**
* A ContentInterceptor for Slide that publishes
* invalidation events to a JMS topic.
*/
public class JMSContentInterceptor extends AbstractContentInterceptor {
// ---------------------------------------------------- constants
private static final String LOG_CHANNEL = "JMSContentInterceptor";
private static final String PARAM_TOPIC_FACTORY = "topic-factory";
private static final String PARAM_TOPIC = "topic";
private static final String PARAM_PERSISTENT = "persistent-delivery";
private static final String PARAM_PRIORITY = "priority";
private static final String PARAM_TIME_TO_LIVE = "time-to-live";
private static final String PARAM_INITIAL_CONTEXT_FACTORY = Context.INITIAL_CONTEXT_FACTORY;
private static final String PARAM_PROVIDER_URL = Context.PROVIDER_URL;
private static final String DEFAULT_TOPIC_FACTORY = "JmsTopicConnectionFactory";
private static final String DEFAULT_TOPIC = "topic1";
private static final String DEFAULT_PERSISTENT = "false";
private static final String DEFAULT_PRIORITY = "4";
private static final String DEFAULT_TIME_TO_LIVE = "1000";
private static final String DEFAULT_INITIAL_CONTEXT_FACTORY = "org.exolab.jms.jndi.InitialContextFactory";
private static final String DEFAULT_PROVIDER_URL = "rmi://localhost:1099/";
// ---------------------------------------------------- member variables
// JMS objects
private TopicConnection m_connection;
private TopicSession m_session;
private Topic m_topic;
private TopicPublisher m_publisher;
// configuration options
private String m_topicFactoryName;
private String m_topicName;
private int m_deliveryMode;
private int m_priority;
private long m_timeToLive;
private Hashtable m_jndiProps;
// queue of messages to be published
private List m_queue = Collections.synchronizedList(new ArrayList());
// the started state of the interceptor
private boolean m_started = false;
// ---------------------------------------------------- lifecycle
public JMSContentInterceptor() {
}
/**
* Configure the interceptor.
* <p>
* The following parameters are recognized:
* <ul>
* <li>
* <code>java.naming.factory.initial</code> [<code>org.exolab.jms.jndi.InitialContextFactory</code>]
* - initial jndi context factory.
* </li>
* <li>
* <code>java.naming.provider.url</code> [<code>rmi://localhost:1099/</code>] - jndi provider url.
* </li>
* <li>
* <code>topic-factory<code> [<code>JmsTopicConnectionFactory</code>] - the JNDI lookup name
* of the JMS TopicConnectionFactory.
* </li>
* <li>
* <code>topic</code> [<code>topic1</code>] - the name of the topic to publish messages to.
* </li>
* <li>
* <code>persistent-delivery</code> [<code>false</code>] - message delivery mode.
* </li>
* <li>
* <code>priority</code> [<code>4</code>] - message priority.
* </li>
* <li>
* <code>time-to-live</code> [<code>1000</code>] - message lifetime in ms.
* </li>
* </ul>
* </p>
*/
public void setParameters(Hashtable params) {
super.setParameters(params);
// parse the JMS related parameters
m_topicFactoryName = getParameter(PARAM_TOPIC_FACTORY,DEFAULT_TOPIC_FACTORY);
m_topicName = getParameter(PARAM_TOPIC,DEFAULT_TOPIC);
boolean persistent = Boolean.valueOf(getParameter(PARAM_PERSISTENT,DEFAULT_PERSISTENT)).booleanValue();
m_deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
m_priority = Integer.valueOf(getParameter(PARAM_PRIORITY,DEFAULT_PRIORITY)).intValue();
m_timeToLive = Long.valueOf(getParameter(PARAM_TIME_TO_LIVE,DEFAULT_TIME_TO_LIVE)).longValue();
// parse the JNDI related parameters
m_jndiProps = new Hashtable();
m_jndiProps.put(Context.INITIAL_CONTEXT_FACTORY,
getParameter(PARAM_INITIAL_CONTEXT_FACTORY,
DEFAULT_INITIAL_CONTEXT_FACTORY)
);
m_jndiProps.put(Context.PROVIDER_URL,
getParameter(PARAM_PROVIDER_URL,
DEFAULT_PROVIDER_URL)
);
}
/**
* Sets up the JMS connection.
*/
public void setNamespace(NamespaceAccessToken nat) {
super.setNamespace(nat);
// setup the JMS connection and session
try {
Context context = new InitialContext(m_jndiProps);
TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory)
context.lookup(m_topicFactoryName);
m_connection = topicConnectionFactory.createTopicConnection();
m_connection.start();
m_session = m_connection.createTopicSession(false,Session.DUPS_OK_ACKNOWLEDGE);
m_topic = m_session.createTopic(m_topicName);
m_publisher = m_session.createPublisher(m_topic);
/* a background thread does the actual publishing
* of messages. This is because JMS Session
* and its derivatives are single threaded
* and so if we were to publish messages synchronously
* we'd have to create a new Session
* each time we need to publish a message.
*/
Thread t = new Thread(new Runnable() {
public void run() {
m_started = true;
while (m_started) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// continue
}
if (m_queue.size() == 0) continue;
List list = m_queue;
m_queue = Collections.synchronizedList(new ArrayList());
Iterator iter = list.iterator();
while (iter.hasNext()) {
String msg = (String) iter.next();
if (getLogger().isEnabled(Logger.INFO)) {
getLogger().log("Sending message: " + msg,Logger.INFO);
}
try {
m_publisher.publish(m_session.createTextMessage(msg),
m_deliveryMode,m_priority,m_timeToLive);
}
catch (JMSException e) {
getLogger().log("Failure sending JMS message.",e,LOG_CHANNEL,Logger.ERROR);
}
}
}
}
});
t.setPriority(Thread.NORM_PRIORITY);
t.start();
} catch (NamingException e) {
getLogger().log("Failure while connecting to JMS server.",e,LOG_CHANNEL,Logger.ERROR);
} catch (JMSException e) {
getLogger().log("Failure while connecting to JMS server.",e,LOG_CHANNEL,Logger.ERROR);
}
}
private String getParameter(String name, String defaultValue) {
String value = super.getParameter(name);
if (value == null) {
value = defaultValue;
}
return value;
}
// ---------------------------------------------------- interception methods
public void postRemoveContent(
SlideToken slideToken,
NodeRevisionDescriptors revisions,
NodeRevisionDescriptor revision)
throws
AccessDeniedException,
ObjectNotFoundException,
LinkedObjectNotFoundException,
ObjectLockedException,
ServiceAccessException {
if (!m_started) {
return;
}
if (revisions == null) {
return;
}
queueMessage(revisions.getUri(),"remove");
}
public void postStoreContent(
SlideToken slideToken,
NodeRevisionDescriptors revisions,
NodeRevisionDescriptor revision,
NodeRevisionContent content)
throws
AccessDeniedException,
ObjectNotFoundException,
LinkedObjectNotFoundException,
ObjectLockedException,
ServiceAccessException {
if (!m_started) {
return;
}
if (revisions == null) {
return;
}
queueMessage(revisions.getUri(),"store");
}
private void queueMessage(String uri, String type) {
String msg = "slide-interceptor:" + type + "|" + getNamespace().getName() + uri;
m_queue.add(msg);
}
private Logger getLogger() {
return getNamespace().getLogger();
}
}