blob: 3f5f00b4d1097670cd8569bb186a25fe4ff908f6 [file] [log] [blame]
/*
* Copyright 2004,2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.axis2.transport.jms;
import org.apache.axis2.AxisFault;
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.description.ParameterInclude;
import org.apache.axis2.transport.base.BaseConstants;
import org.apache.axis2.transport.base.ParamUtils;
import org.apache.axis2.transport.base.ProtocolEndpoint;
import org.apache.axis2.transport.base.threads.WorkerPool;
import org.apache.axis2.transport.jms.ctype.ContentTypeRuleFactory;
import org.apache.axis2.transport.jms.ctype.ContentTypeRuleSet;
import org.apache.axis2.transport.jms.ctype.MessageTypeRule;
import org.apache.axis2.transport.jms.ctype.PropertyRule;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import javax.jms.BytesMessage;
import javax.jms.TextMessage;
import javax.naming.Context;
/**
* Class that links an Axis2 service to a JMS destination. Additionally, it contains
* all the required information to process incoming JMS messages and to inject them
* into Axis2.
*/
public class JMSEndpoint extends ProtocolEndpoint {
private static final Log log = LogFactory.getLog(JMSEndpoint.class);
private final JMSListener listener;
private final WorkerPool workerPool;
private JMSConnectionFactory cf;
private String jndiDestinationName;
private int destinationType = JMSConstants.GENERIC;
private String jndiReplyDestinationName;
private String replyDestinationType = JMSConstants.DESTINATION_TYPE_GENERIC;
private Set<EndpointReference> endpointReferences = new HashSet<EndpointReference>();
private ContentTypeRuleSet contentTypeRuleSet;
private ServiceTaskManager serviceTaskManager;
public JMSEndpoint(JMSListener listener, WorkerPool workerPool) {
this.listener = listener;
this.workerPool = workerPool;
}
public String getJndiDestinationName() {
return jndiDestinationName;
}
private void setDestinationType(String destinationType) {
if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(destinationType)) {
this.destinationType = JMSConstants.TOPIC;
} else if (JMSConstants.DESTINATION_TYPE_QUEUE.equalsIgnoreCase(destinationType)) {
this.destinationType = JMSConstants.QUEUE;
} else {
this.destinationType = JMSConstants.GENERIC;
}
}
private void setReplyDestinationType(String destinationType) {
if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(destinationType)) {
this.replyDestinationType = JMSConstants.DESTINATION_TYPE_TOPIC;
} else if (JMSConstants.DESTINATION_TYPE_QUEUE.equalsIgnoreCase(destinationType)) {
this.replyDestinationType = JMSConstants.DESTINATION_TYPE_QUEUE;
} else {
this.replyDestinationType = JMSConstants.DESTINATION_TYPE_GENERIC;
}
}
public String getJndiReplyDestinationName() {
return jndiReplyDestinationName;
}
public String getReplyDestinationType() {
return replyDestinationType;
}
@Override
public EndpointReference[] getEndpointReferences(AxisService service, String ip) {
return endpointReferences.toArray(new EndpointReference[endpointReferences.size()]);
}
private void computeEPRs() {
List<EndpointReference> eprs = new ArrayList<EndpointReference>();
for (Object o : getService().getParameters()) {
Parameter p = (Parameter) o;
if (JMSConstants.PARAM_PUBLISH_EPR.equals(p.getName()) && p.getValue() instanceof String) {
if ("legacy".equalsIgnoreCase((String) p.getValue())) {
// if "legacy" specified, compute and replace it
endpointReferences.add(
new EndpointReference(getEPR()));
} else {
endpointReferences.add(new EndpointReference((String) p.getValue()));
}
}
}
if (eprs.isEmpty()) {
// if nothing specified, compute and return legacy EPR
endpointReferences.add(new EndpointReference(getEPR()));
}
}
/**
* Get the EPR for the given JMS connection factory and destination
* the form of the URL is
* jms:/<destination>?[<key>=<value>&]*
* Credentials Context.SECURITY_PRINCIPAL, Context.SECURITY_CREDENTIALS
* JMSConstants.PARAM_JMS_USERNAME and JMSConstants.PARAM_JMS_USERNAME are filtered
*
* @return the EPR as a String
*/
private String getEPR() {
StringBuffer sb = new StringBuffer();
sb.append(
JMSConstants.JMS_PREFIX).append(jndiDestinationName);
sb.append("?").
append(JMSConstants.PARAM_DEST_TYPE).append("=").append(
destinationType == JMSConstants.TOPIC ?
JMSConstants.DESTINATION_TYPE_TOPIC : JMSConstants.DESTINATION_TYPE_QUEUE);
if (contentTypeRuleSet != null) {
String contentTypeProperty = contentTypeRuleSet.getDefaultContentTypeProperty();
if (contentTypeProperty != null) {
sb.append("&");
sb.append(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM);
sb.append("=");
sb.append(contentTypeProperty);
}
}
for (Map.Entry<String,String> entry : cf.getParameters().entrySet()) {
if (!Context.SECURITY_PRINCIPAL.equalsIgnoreCase(entry.getKey()) &&
!Context.SECURITY_CREDENTIALS.equalsIgnoreCase(entry.getKey()) &&
!JMSConstants.PARAM_JMS_USERNAME.equalsIgnoreCase(entry.getKey()) &&
!JMSConstants.PARAM_JMS_PASSWORD.equalsIgnoreCase(entry.getKey())) {
sb.append("&").append(
entry.getKey()).append("=").append(entry.getValue());
}
}
return sb.toString();
}
public ContentTypeRuleSet getContentTypeRuleSet() {
return contentTypeRuleSet;
}
public JMSConnectionFactory getCf() {
return cf;
}
public ServiceTaskManager getServiceTaskManager() {
return serviceTaskManager;
}
public void setServiceTaskManager(ServiceTaskManager serviceTaskManager) {
this.serviceTaskManager = serviceTaskManager;
}
@Override
public boolean loadConfiguration(ParameterInclude params) throws AxisFault {
// We only support endpoints configured at service level
if (!(params instanceof AxisService)) {
return false;
}
AxisService service = (AxisService)params;
cf = listener.getConnectionFactory(service);
if (cf == null) {
return false;
}
Parameter destParam = service.getParameter(JMSConstants.PARAM_DESTINATION);
if (destParam != null) {
jndiDestinationName = (String)destParam.getValue();
} else {
// Assume that the JNDI destination name is the same as the service name
jndiDestinationName = service.getName();
}
Parameter destTypeParam = service.getParameter(JMSConstants.PARAM_DEST_TYPE);
if (destTypeParam != null) {
String paramValue = (String) destTypeParam.getValue();
if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) ||
JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) ) {
setDestinationType(paramValue);
} else {
throw new AxisFault("Invalid destinaton type value " + paramValue);
}
} else {
log.debug("JMS destination type not given. default queue");
destinationType = JMSConstants.QUEUE;
}
Parameter replyDestTypeParam = service.getParameter(JMSConstants.PARAM_REPLY_DEST_TYPE);
if (replyDestTypeParam != null) {
String paramValue = (String) replyDestTypeParam.getValue();
if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) ||
JMSConstants.DESTINATION_TYPE_TOPIC.equals(paramValue) ) {
setReplyDestinationType(paramValue);
} else {
throw new AxisFault("Invalid destinaton type value " + paramValue);
}
} else {
log.debug("JMS reply destination type not given. default queue");
replyDestinationType = JMSConstants.DESTINATION_TYPE_QUEUE;
}
jndiReplyDestinationName = ParamUtils.getOptionalParam(service,
JMSConstants.PARAM_REPLY_DESTINATION);
Parameter contentTypeParam = service.getParameter(JMSConstants.CONTENT_TYPE_PARAM);
if (contentTypeParam == null) {
contentTypeRuleSet = new ContentTypeRuleSet();
contentTypeRuleSet.addRule(new PropertyRule(BaseConstants.CONTENT_TYPE));
contentTypeRuleSet.addRule(new MessageTypeRule(BytesMessage.class, "application/octet-stream"));
contentTypeRuleSet.addRule(new MessageTypeRule(TextMessage.class, "text/plain"));
} else {
contentTypeRuleSet = ContentTypeRuleFactory.parse(contentTypeParam);
}
computeEPRs(); // compute service EPR and keep for later use
serviceTaskManager = ServiceTaskManagerFactory.createTaskManagerForService(cf, service, workerPool);
serviceTaskManager.setJmsMessageReceiver(new JMSMessageReceiver(listener, cf, this));
return true;
}
}