blob: 4c809c75fecc0ea6944dad2c3977edfde1ab0fc2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.jms.domain;
import com.google.common.base.Preconditions;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.commons.lang.StringUtils;
public class JmsBaseConnection implements Connection {
private final AtomicBoolean started = new AtomicBoolean(false);
protected String clientID;
protected ExceptionListener exceptionListener;
protected CommonContext context;
protected JmsBaseSession session;
public JmsBaseConnection(Map<String, String> connectionParams) {
this.clientID = UUID.randomUUID().toString();
context = new CommonContext();
//At lease one should be set
context.setProducerId(connectionParams.get(CommonConstant.PRODUCERID));
context.setConsumerId(connectionParams.get(CommonConstant.CONSUMERID));
//optional
context.setProvider(connectionParams.get(CommonConstant.PROVIDER));
String nameServer = connectionParams.get(CommonConstant.NAMESERVER);
String consumerThreadNums = connectionParams.get(CommonConstant.CONSUME_THREAD_NUMS);
String sendMsgTimeoutMillis = connectionParams.get(CommonConstant.SEND_TIMEOUT_MILLIS);
String instanceName = connectionParams.get(CommonConstant.INSTANCE_NAME);
if (StringUtils.isNotEmpty(nameServer)) {
context.setNameServer(nameServer);
}
if (StringUtils.isNotEmpty(instanceName)) {
context.setInstanceName(connectionParams.get(CommonConstant.INSTANCE_NAME));
}
if (StringUtils.isNotEmpty(consumerThreadNums)) {
context.setConsumeThreadNums(Integer.parseInt(consumerThreadNums));
}
if (StringUtils.isNotEmpty(sendMsgTimeoutMillis)) {
context.setSendMsgTimeoutMillis(Integer.parseInt(sendMsgTimeoutMillis));
}
}
@Override
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
Preconditions.checkArgument(!transacted, "Not support transaction Session !");
Preconditions.checkArgument(Session.AUTO_ACKNOWLEDGE == acknowledgeMode,
"Not support this acknowledge mode: " + acknowledgeMode);
if (null != this.session) {
return this.session;
}
synchronized (this) {
if (null != this.session) {
return this.session;
}
this.session = new JmsBaseSession(this, transacted, acknowledgeMode, context);
if (isStarted()) {
this.session.start();
}
return this.session;
}
}
@Override
public String getClientID() throws JMSException {
return this.clientID;
}
@Override
public void setClientID(String clientID) throws JMSException {
this.clientID = clientID;
}
@Override
public ConnectionMetaData getMetaData() throws JMSException {
return new JmsBaseConnectionMetaData();
}
@Override
public ExceptionListener getExceptionListener() throws JMSException {
return this.exceptionListener;
}
@Override
public void setExceptionListener(ExceptionListener listener) throws JMSException {
this.exceptionListener = listener;
}
@Override
public void start() throws JMSException {
if (started.compareAndSet(false, true)) {
if (this.session != null) {
this.session.start();
}
}
}
@Override
public void stop() throws JMSException {
//Stop the connection before closing it.
//Do nothing here.
}
@Override
public void close() throws JMSException {
if (started.compareAndSet(true, false)) {
if (this.session != null) {
this.session.close();
}
}
}
@Override
public ConnectionConsumer createConnectionConsumer(Destination destination,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages) throws JMSException {
throw new UnsupportedOperationException("Unsupported");
}
@Override
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
String messageSelector,
ServerSessionPool sessionPool,
int maxMessages) throws JMSException {
throw new UnsupportedOperationException("Unsupported");
}
/**
* Whether the connection is started.
*
* @return whether the connection is started.
*/
public boolean isStarted() {
return started.get();
}
}