blob: 9f986def1ce1162f88cc314c1b2df4056fa0055b [file] [log] [blame]
/*
* Copyright 2004,2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.axis2.transport.jms;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.jms.Session;
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.transport.base.BaseConstants;
import org.apache.axis2.transport.base.threads.WorkerPool;
/**
 * Static factory that builds a {@link ServiceTaskManager} configured from the string
 * parameters of a service and of the JMS connection factory it is bound to.
 * <p>
 * For every setting the service level parameter is consulted first; the connection factory
 * parameter of the same name is only used when the service does not define one.
 */
public class ServiceTaskManagerFactory {

    /**
     * Parameters that are consumed while configuring the task manager and are therefore
     * taken out of its JMS property bag once configuration is complete.
     * <p>
     * NOTE(review): {@code PARAM_DEST_TYPE} and {@code PARAM_SESSION_ACK} are also read during
     * configuration but have never been part of this list; that behaviour is kept as is -
     * confirm whether leaving them in the property bag is intentional.
     */
    private static final String[] PROCESSED_PARAMS = {
        JMSConstants.PARAM_CONFAC_JNDI_NAME,
        JMSConstants.PARAM_DESTINATION,
        JMSConstants.PARAM_JMS_SPEC_VER,
        BaseConstants.PARAM_TRANSACTIONALITY,
        BaseConstants.PARAM_CACHE_USER_TXN,
        BaseConstants.PARAM_USER_TXN_JNDI_NAME,
        JMSConstants.PARAM_SESSION_TRANSACTED,
        JMSConstants.PARAM_MSG_SELECTOR,
        JMSConstants.PARAM_SUB_DURABLE,
        JMSConstants.PARAM_DURABLE_SUB_NAME,
        JMSConstants.PARAM_CACHE_LEVEL,
        JMSConstants.PARAM_PUBSUB_NO_LOCAL,
        JMSConstants.PARAM_RCV_TIMEOUT,
        JMSConstants.PARAM_CONCURRENT_CONSUMERS,
        JMSConstants.PARAM_MAX_CONSUMERS,
        JMSConstants.PARAM_IDLE_TASK_LIMIT,
        JMSConstants.PARAM_MAX_MSGS_PER_TASK,
        JMSConstants.PARAM_RECON_INIT_DURATION,
        JMSConstants.PARAM_RECON_MAX_DURATION,
        JMSConstants.PARAM_RECON_FACTOR
    };

    /** Not instantiable; the class only offers static factory methods. */
    private ServiceTaskManagerFactory() {}

    /**
     * Create a ServiceTaskManager for the service passed in and its corresponding
     * JMSConnectionFactory.
     * <p>
     * All parameters of the connection factory and all string valued parameters of the
     * service are first copied into the task manager's JMS property bag (connection factory
     * first, then service). The individual settings are then resolved and applied, and
     * finally the parameters listed in {@link #PROCESSED_PARAMS} are removed from the bag.
     *
     * @param jcf the connection factory whose parameters act as fallback for settings the
     *            service does not define itself
     * @param service the service to create the task manager for; its name is used as the
     *            service name and, when no destination parameter is given, as the destination
     *            JNDI name
     * @param workerPool the worker pool handed over to the created task manager
     * @return the configured task manager
     * @throws AxisJMSException if the connection factory JNDI name is defined neither on the
     *            service nor on the connection factory, or if a parameter holds a value that
     *            cannot be interpreted
     */
    public static ServiceTaskManager createTaskManagerForService(JMSConnectionFactory jcf,
        AxisService service, WorkerPool workerPool) {

        String name = service.getName();
        Map<String, String> svc = getServiceStringParameters(service.getParameters());
        Map<String, String> cf = jcf.getParameters();

        ServiceTaskManager stm = new ServiceTaskManager();

        stm.setServiceName(name);
        // connection factory properties go in first, service properties are added after them
        stm.addJmsProperties(cf);
        stm.addJmsProperties(svc);

        stm.setConnFactoryJNDIName(
            getRqdStringProperty(JMSConstants.PARAM_CONFAC_JNDI_NAME, svc, cf));

        // without an explicit destination the service name doubles as the destination name
        String destName = getOptionalStringProperty(JMSConstants.PARAM_DESTINATION, svc, cf);
        if (destName == null) {
            destName = name;
        }
        stm.setDestinationJNDIName(destName);
        stm.setDestinationType(getDestinationType(svc, cf));

        stm.setJmsSpec11(
            getJMSSpecVersion(svc, cf));
        stm.setTransactionality(
            getTransactionality(svc, cf));
        stm.setCacheUserTransaction(
            getOptionalBooleanProperty(BaseConstants.PARAM_CACHE_USER_TXN, svc, cf));
        stm.setUserTransactionJNDIName(
            getOptionalStringProperty(BaseConstants.PARAM_USER_TXN_JNDI_NAME, svc, cf));
        stm.setSessionTransacted(
            getOptionalBooleanProperty(JMSConstants.PARAM_SESSION_TRANSACTED, svc, cf));
        stm.setSessionAckMode(
            getSessionAck(svc, cf));
        stm.setMessageSelector(
            getOptionalStringProperty(JMSConstants.PARAM_MSG_SELECTOR, svc, cf));
        stm.setSubscriptionDurable(
            getOptionalBooleanProperty(JMSConstants.PARAM_SUB_DURABLE, svc, cf));
        stm.setDurableSubscriberName(
            getOptionalStringProperty(JMSConstants.PARAM_DURABLE_SUB_NAME, svc, cf));

        stm.setCacheLevel(
            getCacheLevel(svc, cf));
        stm.setPubSubNoLocal(
            getOptionalBooleanProperty(JMSConstants.PARAM_PUBSUB_NO_LOCAL, svc, cf));

        // numeric settings are only applied when present, so that the task manager keeps
        // whatever value it was constructed with otherwise
        Integer value = getOptionalIntProperty(JMSConstants.PARAM_RCV_TIMEOUT, svc, cf);
        if (value != null) {
            stm.setReceiveTimeout(value);
        }
        value = getOptionalIntProperty(JMSConstants.PARAM_CONCURRENT_CONSUMERS, svc, cf);
        if (value != null) {
            stm.setConcurrentConsumers(value);
        }
        value = getOptionalIntProperty(JMSConstants.PARAM_MAX_CONSUMERS, svc, cf);
        if (value != null) {
            stm.setMaxConcurrentConsumers(value);
        }
        value = getOptionalIntProperty(JMSConstants.PARAM_IDLE_TASK_LIMIT, svc, cf);
        if (value != null) {
            stm.setIdleTaskExecutionLimit(value);
        }
        value = getOptionalIntProperty(JMSConstants.PARAM_MAX_MSGS_PER_TASK, svc, cf);
        if (value != null) {
            stm.setMaxMessagesPerTask(value);
        }
        value = getOptionalIntProperty(JMSConstants.PARAM_RECON_INIT_DURATION, svc, cf);
        if (value != null) {
            stm.setInitialReconnectDuration(value);
        }
        value = getOptionalIntProperty(JMSConstants.PARAM_RECON_MAX_DURATION, svc, cf);
        if (value != null) {
            stm.setMaxReconnectDuration(value);
        }
        Double dValue = getOptionalDoubleProperty(JMSConstants.PARAM_RECON_FACTOR, svc, cf);
        if (dValue != null) {
            stm.setReconnectionProgressionFactor(dValue);
        }

        stm.setWorkerPool(workerPool);

        // remove processed properties from property bag
        for (String processed : PROCESSED_PARAMS) {
            stm.removeJmsProperties(processed);
        }

        return stm;
    }

    /**
     * Collect the string valued parameters of a service into a name to value map.
     * Parameters whose value is not a {@link String} are skipped.
     *
     * @param list the parameters of the service
     * @return a new map holding the name and value of every string valued parameter
     */
    private static Map<String, String> getServiceStringParameters(List<Parameter> list) {
        Map<String, String> map = new HashMap<String, String>();
        for (Parameter p : list) {
            Object paramValue = p.getValue();
            if (paramValue instanceof String) {
                map.put(p.getName(), (String) paramValue);
            }
        }
        return map;
    }

    /**
     * Look up a mandatory property, preferring the service level value.
     *
     * @param key the parameter name
     * @param svcMap the service level parameters
     * @param cfMap the connection factory level parameters
     * @return the value found, never null
     * @throws AxisJMSException if neither map defines the property
     */
    private static String getRqdStringProperty(String key, Map<String,String> svcMap,
        Map<String,String> cfMap) {

        String value = getOptionalStringProperty(key, svcMap, cfMap);
        if (value == null) {
            throw new AxisJMSException(
                "Required service/connection factory property : " + key + " is not defined");
        }
        return value;
    }

    /**
     * Look up an optional property, preferring the service level value. Every other lookup
     * in this class goes through this method, so the precedence rule lives in one place.
     *
     * @param key the parameter name
     * @param svcMap the service level parameters
     * @param cfMap the connection factory level parameters
     * @return the service level value, else the connection factory level value, else null
     */
    private static String getOptionalStringProperty(String key, Map<String,String> svcMap,
        Map<String,String> cfMap) {

        String value = svcMap.get(key);
        if (value == null) {
            value = cfMap.get(key);
        }
        return value;
    }

    /**
     * Look up an optional boolean property.
     *
     * @param key the parameter name
     * @param svcMap the service level parameters
     * @param cfMap the connection factory level parameters
     * @return null if the property is not defined, otherwise the result of
     *         {@link Boolean#valueOf(String)} on its value
     */
    private static Boolean getOptionalBooleanProperty(String key, Map<String,String> svcMap,
        Map<String,String> cfMap) {

        String value = getOptionalStringProperty(key, svcMap, cfMap);
        return value == null ? null : Boolean.valueOf(value);
    }

    /**
     * Look up an optional integer property.
     *
     * @param key the parameter name
     * @param svcMap the service level parameters
     * @param cfMap the connection factory level parameters
     * @return the parsed value, or null if the property is not defined
     * @throws AxisJMSException if the value is not a valid integer
     */
    private static Integer getOptionalIntProperty(String key, Map<String,String> svcMap,
        Map<String,String> cfMap) {

        String value = getOptionalStringProperty(key, svcMap, cfMap);
        if (value == null) {
            return null;
        }
        try {
            return Integer.valueOf(value);
        } catch (NumberFormatException e) {
            throw new AxisJMSException("Invalid value : " + value + " for " + key);
        }
    }

    /**
     * Look up an optional floating point property.
     *
     * @param key the parameter name
     * @param svcMap the service level parameters
     * @param cfMap the connection factory level parameters
     * @return the parsed value, or null if the property is not defined
     * @throws AxisJMSException if the value is not a valid double
     */
    private static Double getOptionalDoubleProperty(String key, Map<String,String> svcMap,
        Map<String,String> cfMap) {

        String value = getOptionalStringProperty(key, svcMap, cfMap);
        if (value == null) {
            return null;
        }
        try {
            return Double.valueOf(value);
        } catch (NumberFormatException e) {
            throw new AxisJMSException("Invalid value : " + value + " for " + key);
        }
    }

    /**
     * Resolve the transactionality setting.
     *
     * @param svcMap the service level parameters
     * @param cfMap the connection factory level parameters
     * @return {@code BaseConstants.TRANSACTION_NONE} if the parameter is not defined, otherwise
     *         the constant matching the (case insensitive) JTA or local option
     * @throws AxisJMSException if the parameter holds any other value
     */
    private static int getTransactionality(Map<String,String> svcMap, Map<String,String> cfMap) {

        String key = BaseConstants.PARAM_TRANSACTIONALITY;
        String val = getOptionalStringProperty(key, svcMap, cfMap);

        if (val == null) {
            return BaseConstants.TRANSACTION_NONE;
        }
        if (BaseConstants.STR_TRANSACTION_JTA.equalsIgnoreCase(val)) {
            return BaseConstants.TRANSACTION_JTA;
        }
        if (BaseConstants.STR_TRANSACTION_LOCAL.equalsIgnoreCase(val)) {
            return BaseConstants.TRANSACTION_LOCAL;
        }
        // report the name of the offending parameter, not one of its permitted values
        throw new AxisJMSException("Invalid option : " + val + " for parameter : " + key);
    }

    /**
     * Resolve the destination type.
     *
     * @param svcMap the service level parameters
     * @param cfMap the connection factory level parameters
     * @return {@code JMSConstants.TOPIC} if the parameter (case insensitively) names a topic,
     *         {@code JMSConstants.QUEUE} in every other case, including when it is not defined
     */
    private static int getDestinationType(Map<String,String> svcMap, Map<String,String> cfMap) {

        String val = getOptionalStringProperty(JMSConstants.PARAM_DEST_TYPE, svcMap, cfMap);
        if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(val)) {
            return JMSConstants.TOPIC;
        }
        return JMSConstants.QUEUE;
    }

    /**
     * Resolve the session acknowledgement mode. The symbolic names are matched case
     * insensitively; any other value is expected to be the numeric mode itself.
     *
     * @param svcMap the service level parameters
     * @param cfMap the connection factory level parameters
     * @return the acknowledgement mode, {@code Session.AUTO_ACKNOWLEDGE} if not defined
     * @throws AxisJMSException if the value is neither a known name nor an integer
     */
    private static int getSessionAck(Map<String,String> svcMap, Map<String,String> cfMap) {

        String val = getOptionalStringProperty(JMSConstants.PARAM_SESSION_ACK, svcMap, cfMap);

        if (val == null || "AUTO_ACKNOWLEDGE".equalsIgnoreCase(val)) {
            return Session.AUTO_ACKNOWLEDGE;
        } else if ("CLIENT_ACKNOWLEDGE".equalsIgnoreCase(val)) {
            return Session.CLIENT_ACKNOWLEDGE;
        } else if ("DUPS_OK_ACKNOWLEDGE".equalsIgnoreCase(val)) {
            return Session.DUPS_OK_ACKNOWLEDGE;
        } else if ("SESSION_TRANSACTED".equalsIgnoreCase(val)) {
            // literal kept from the original code, which had the constant commented out -
            // presumably so the class also compiles against a JMS API without it (TODO confirm)
            return 0; //Session.SESSION_TRANSACTED;
        } else {
            try {
                return Integer.parseInt(val);
            } catch (NumberFormatException ignore) {
                throw new AxisJMSException("Invalid session acknowledgement mode : " + val);
            }
        }
    }

    /**
     * Resolve the cache level. The level names are matched case insensitively.
     *
     * @param svcMap the service level parameters
     * @param cfMap the connection factory level parameters
     * @return the constant for the named level, {@code JMSConstants.CACHE_AUTO} if the
     *         parameter is not defined
     * @throws AxisJMSException if the parameter holds an unknown level name
     */
    private static int getCacheLevel(Map<String,String> svcMap, Map<String,String> cfMap) {

        String val = getOptionalStringProperty(JMSConstants.PARAM_CACHE_LEVEL, svcMap, cfMap);

        if ("none".equalsIgnoreCase(val)) {
            return JMSConstants.CACHE_NONE;
        } else if ("connection".equalsIgnoreCase(val)) {
            return JMSConstants.CACHE_CONNECTION;
        } else if ("session".equalsIgnoreCase(val)) {
            return JMSConstants.CACHE_SESSION;
        } else if ("consumer".equalsIgnoreCase(val)) {
            return JMSConstants.CACHE_CONSUMER;
        } else if ("producer".equalsIgnoreCase(val)) {
            return JMSConstants.CACHE_PRODUCER;
        } else if (val != null) {
            throw new AxisJMSException("Invalid cache level : " + val);
        }
        return JMSConstants.CACHE_AUTO;
    }

    /**
     * Decide whether the JMS 1.1 API is to be used.
     *
     * @param svcMap the service level parameters
     * @param cfMap the connection factory level parameters
     * @return true if the spec version parameter is not defined or is exactly "1.1",
     *         false for any other value
     */
    private static boolean getJMSSpecVersion(Map<String,String> svcMap, Map<String,String> cfMap) {

        String val = getOptionalStringProperty(JMSConstants.PARAM_JMS_SPEC_VER, svcMap, cfMap);
        return val == null || "1.1".equals(val);
    }
}