Release rocketmq-jms 1.0.0 version
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..485dee6
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..07a5fa6
--- /dev/null
+++ b/README.md
@@ -0,0 +1,32 @@
+# RocketMQ Externals
+
+This repository hosts RocketMQ external projects, with the purpose of growing the RocketMQ community.
+
+## RocketMQ-Console-Ng
+A console for RocketMQ
+
+## RocketMQ-JMS
+RocketMQ-JMS is an implementation of the JMS specification that uses Apache RocketMQ as the broker. JMS 1.1 support is in progress, and JMS 2.0 is the final target.
+
+## RocketMQ-Flume-Ng
+
+This project is used to receive and send messages between
+[RocketMQ](http://rocketmq.incubator.apache.org/) and [Flume-ng](https://github.com/apache/flume)
+
+1. Firstly, please get familiar with [RocketMQ](http://rocketmq.incubator.apache.org/) and [Flume-ng](https://github.com/apache/flume).
+2. Ensure that the jar related to [RocketMQ](http://rocketmq.incubator.apache.org/dowloading/releases) exists in the local Maven repository.
+3. Execute the following command in the rocketmq-flume root directory:
+
+ `mvn clean install dependency:copy-dependencies`
+
+4. Copy the jars that rocketmq-flume depends on to `$FLUME_HOME/lib` (the specific jars will be given later).
+
+
+## RocketMQ-Spark
+
+Apache Spark-Streaming integration with RocketMQ. Both push and pull consumer modes are provided.
+For more details, please refer to the rocketmq-spark README.md.
+
+## RocketMQ-Docker
+Apache RocketMQ Docker provides a Dockerfile and bash scripts for building and running the Docker image.
+
diff --git a/rocketmq-jms/.gitignore b/rocketmq-jms/.gitignore
new file mode 100644
index 0000000..d2e5aaf
--- /dev/null
+++ b/rocketmq-jms/.gitignore
@@ -0,0 +1,5 @@
+.idea/
+*.iml
+*.ipr
+*.iws
+target/
diff --git a/rocketmq-jms/.travis.yml b/rocketmq-jms/.travis.yml
new file mode 100644
index 0000000..9f430b2
--- /dev/null
+++ b/rocketmq-jms/.travis.yml
@@ -0,0 +1,43 @@
+notifications:
+  email:
+    recipients:
+      - zhangke.huangshan@gmail.com
+      - zhendongliu92@gmail.com
+    on_success: change
+    on_failure: always
+
+language: java
+
+matrix:
+  include:
+    # On OSX, run with default JDK only.
+    # - os: osx
+    # On Linux, run with specific JDKs only.
+    # - os: linux
+    #   env: CUSTOM_JDK="oraclejdk8"
+    - os: linux
+      env: CUSTOM_JDK="oraclejdk7"
+    #- os: linux
+    #  env: CUSTOM_JDK="openjdk7"
+
+# Append the Maven JVM options to ~/.mavenrc, then select the JDK for the current OS.
+before_install:
+  - echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m -XX:+BytecodeVerificationLocal"' >> ~/.mavenrc
+  - cat ~/.mavenrc
+  - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi
+  - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi
+
+#os:
+#  - linux
+#  - osx
+#jdk:
+#  - oraclejdk8
+#  - oraclejdk7
+#  - openjdk7
+
+script:
+  - travis_retry mvn -B clean install jacoco:report coveralls:report
+
+#after_success:
+#  - mvn clean install
+#  - mvn sonar:sonar
diff --git a/rocketmq-jms/README.md b/rocketmq-jms/README.md
new file mode 100644
index 0000000..a05e27e
--- /dev/null
+++ b/rocketmq-jms/README.md
@@ -0,0 +1,31 @@
+# RocketMQ-JMS [![Build Status](https://travis-ci.org/rocketmq/rocketmq-jms.svg?branch=master)](https://travis-ci.org/rocketmq/rocketmq-jms) [![Coverage Status](https://coveralls.io/repos/github/rocketmq/rocketmq-jms/badge.svg?branch=master)](https://coveralls.io/github/rocketmq/rocketmq-jms?branch=master)
+
+
+## Introduction
+RocketMQ-JMS is an implementation of the JMS specification that uses Apache RocketMQ as the broker.
+JMS 1.1 support is in progress, and JMS 2.0 is the final target.
+
+The first version of RocketMQ-JMS will be released soon, and new features will be developed on the "v1.1" branch.
+Please visit the [issue board](https://github.com/rocketmq/rocketmq-jms/issues) to see the features planned for the next version.
+
+
+## Building
+
+ > cd rocketmq-jms
+ > mvn clean install
+
+ **run unit test:**
+ > mvn test
+
+ **run integration test:**
+ > mvn verify
+
+ **see jacoco code coverage report**
+ > open core/target/site/jacoco/index.html
+ > open core/target/site/jacoco-it/index.html
+ > open spring/target/site/jacoco-it/index.html
+
+
+## Guidelines
+
+ Please see [Coding Guidelines Introduction](http://rocketmq.apache.org/docs/code-guidelines/)
diff --git a/rocketmq-jms/core/pom.xml b/rocketmq-jms/core/pom.xml
new file mode 100644
index 0000000..c1ed1d0
--- /dev/null
+++ b/rocketmq-jms/core/pom.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.rocketmq</groupId>
+        <artifactId>rocketmq-jms-all</artifactId>
+        <version>1.0.0</version>
+    </parent>
+
+    <artifactId>rocketmq-jms</artifactId>
+    <version>1.0.0</version>
+
+    <build>
+        <plugins>
+            <!-- Besides the main jar, package the test classes as a test-jar. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java
new file mode 100644
index 0000000..80a8b64
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+/** Names of the connection parameters that {@code JmsBaseConnection} reads from its parameter map. */
+public interface CommonConstant {
+    /** Name server value stored in the connection context. */
+    String NAMESERVER = "nameServer";
+    /** Name used to distinguish client JVM processes. */
+    String INSTANCE_NAME = "instanceName";
+    /** Provider; optional. */
+    String PROVIDER = "provider";
+    /** Producer id; either this or {@link #CONSUMERID} has to be supplied. */
+    String PRODUCERID = "producerId";
+    /** Consumer id; either this or {@link #PRODUCERID} has to be supplied. */
+    String CONSUMERID = "consumerId";
+    /** Size of the consumer thread pool, given as an integer string. */
+    String CONSUME_THREAD_NUMS = "consumeThreadNums";
+    /** Timeout for sending a message, given as an integer string. */
+    String SEND_TIMEOUT_MILLIS = "sendMsgTimeoutMillis";
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java
new file mode 100644
index 0000000..c8e4276
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import org.apache.commons.lang.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+
+/**
+ * Mutable holder for the settings of one JMS connection: credentials, producer/consumer
+ * ids, name server and client tuning values.
+ */
+public class CommonContext {
+    private String accessKey;
+    private String secretKey;
+
+    private String consumerId;
+    private String producerId;
+
+    private String provider;
+    private String appId;
+    private String nameServer;
+
+    /** MQ type. */
+    private String mqType;
+
+    /** Used to distinguish client JVM processes. */
+    private String instanceName;
+
+    /** Size of the consumer thread pool. */
+    private int consumeThreadNums;
+
+    /** Timeout for sending a message; -1 unless set explicitly. */
+    private int sendMsgTimeoutMillis = -1;
+
+    /**
+     * @return the appId
+     */
+    public String getAppId() {
+        return appId;
+    }
+
+    /**
+     * @param appId the appId to set
+     */
+    public void setAppId(String appId) {
+        this.appId = appId;
+    }
+
+    /**
+     * @return the provider
+     */
+    public String getProvider() {
+        return provider;
+    }
+
+    /**
+     * @param provider the provider to set
+     */
+    public void setProvider(String provider) {
+        this.provider = provider;
+    }
+
+    /**
+     * @return the instanceName
+     */
+    public String getInstanceName() {
+        return instanceName;
+    }
+
+    /**
+     * @param instanceName the instanceName to set
+     */
+    public void setInstanceName(String instanceName) {
+        this.instanceName = instanceName;
+    }
+
+    /**
+     * @return the accessKey
+     */
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    /**
+     * @param accessKey the accessKey to set
+     */
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    /**
+     * @return the secretKey
+     */
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    /**
+     * @param secretKey the secretKey to set
+     */
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    /**
+     * @return the size of the consumer thread pool
+     */
+    public int getConsumeThreadNums() {
+        return consumeThreadNums;
+    }
+
+    /**
+     * @param consumeThreadNums the size of the consumer thread pool to set
+     */
+    public void setConsumeThreadNums(int consumeThreadNums) {
+        this.consumeThreadNums = consumeThreadNums;
+    }
+
+    public String getConsumerId() {
+        return consumerId;
+    }
+
+    public void setConsumerId(String consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    public String getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(String producerId) {
+        this.producerId = producerId;
+    }
+
+    public int getSendMsgTimeoutMillis() {
+        return sendMsgTimeoutMillis;
+    }
+
+    public void setSendMsgTimeoutMillis(int sendMsgTimeoutMillis) {
+        this.sendMsgTimeoutMillis = sendMsgTimeoutMillis;
+    }
+
+    public String getMqType() {
+        return mqType;
+    }
+
+    public void setMqType(String mqType) {
+        this.mqType = mqType;
+    }
+
+    public String getNameServer() {
+        return nameServer;
+    }
+
+    public void setNameServer(String nameServer) {
+        this.nameServer = nameServer;
+    }
+
+    /**
+     * Dumps all fields except {@code secretKey}, which must not end up in log output.
+     */
+    @Override
+    public String toString() {
+        return new ReflectionToStringBuilder(this, ToStringStyle.DEFAULT_STYLE)
+            .setExcludeFieldNames(new String[] {"secretKey"})
+            .toString();
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java
new file mode 100644
index 0000000..4c809c7
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * JMS {@link Connection} backed by one lazily created, shared {@link JmsBaseSession}.
+ */
+public class JmsBaseConnection implements Connection {
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    protected String clientID;
+    protected ExceptionListener exceptionListener;
+    protected CommonContext context;
+    // volatile so the unsynchronized first check in createSession() sees a fully built session.
+    protected volatile JmsBaseSession session;
+
+    /**
+     * @param connectionParams connection settings keyed by the {@link CommonConstant} names
+     * @throws NumberFormatException if the thread count or send timeout is not an integer
+     */
+    public JmsBaseConnection(Map<String, String> connectionParams) {
+        this.clientID = UUID.randomUUID().toString();
+        context = new CommonContext();
+
+        // At least one of these should be set.
+        context.setProducerId(connectionParams.get(CommonConstant.PRODUCERID));
+        context.setConsumerId(connectionParams.get(CommonConstant.CONSUMERID));
+
+        // Optional.
+        context.setProvider(connectionParams.get(CommonConstant.PROVIDER));
+
+        String nameServer = connectionParams.get(CommonConstant.NAMESERVER);
+        String consumerThreadNums = connectionParams.get(CommonConstant.CONSUME_THREAD_NUMS);
+        String sendMsgTimeoutMillis = connectionParams.get(CommonConstant.SEND_TIMEOUT_MILLIS);
+        String instanceName = connectionParams.get(CommonConstant.INSTANCE_NAME);
+
+        if (StringUtils.isNotEmpty(nameServer)) {
+            context.setNameServer(nameServer);
+        }
+        if (StringUtils.isNotEmpty(instanceName)) {
+            context.setInstanceName(instanceName);
+        }
+        if (StringUtils.isNotEmpty(consumerThreadNums)) {
+            context.setConsumeThreadNums(Integer.parseInt(consumerThreadNums));
+        }
+        if (StringUtils.isNotEmpty(sendMsgTimeoutMillis)) {
+            context.setSendMsgTimeoutMillis(Integer.parseInt(sendMsgTimeoutMillis));
+        }
+    }
+
+    /**
+     * Returns the shared session, creating it on the first call.
+     *
+     * @throws IllegalArgumentException if {@code transacted} is true or the acknowledge mode
+     *                                  is not {@link Session#AUTO_ACKNOWLEDGE}
+     */
+    @Override
+    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        Preconditions.checkArgument(!transacted, "Not support transaction Session !");
+        Preconditions.checkArgument(Session.AUTO_ACKNOWLEDGE == acknowledgeMode,
+            "Not support this acknowledge mode: " + acknowledgeMode);
+
+        if (null != this.session) {
+            return this.session;
+        }
+        synchronized (this) {
+            if (null != this.session) {
+                return this.session;
+            }
+            this.session = new JmsBaseSession(this, transacted, acknowledgeMode, context);
+            if (isStarted()) {
+                this.session.start();
+            }
+            return this.session;
+        }
+    }
+
+    @Override
+    public String getClientID() throws JMSException {
+        return this.clientID;
+    }
+
+    @Override
+    public void setClientID(String clientID) throws JMSException {
+        this.clientID = clientID;
+    }
+
+    @Override
+    public ConnectionMetaData getMetaData() throws JMSException {
+        return new JmsBaseConnectionMetaData();
+    }
+
+    @Override
+    public ExceptionListener getExceptionListener() throws JMSException {
+        return this.exceptionListener;
+    }
+
+    @Override
+    public void setExceptionListener(ExceptionListener listener) throws JMSException {
+        this.exceptionListener = listener;
+    }
+
+    @Override
+    public void start() throws JMSException {
+        if (started.compareAndSet(false, true) && this.session != null) {
+            this.session.start();
+        }
+    }
+
+    @Override
+    public void stop() throws JMSException {
+        // Intentionally a no-op: callers stop the connection before closing it,
+        // and the session is only released by close().
+    }
+
+    @Override
+    public void close() throws JMSException {
+        if (started.compareAndSet(true, false) && this.session != null) {
+            this.session.close();
+        }
+    }
+
+    @Override
+    public ConnectionConsumer createConnectionConsumer(Destination destination,
+        String messageSelector,
+        ServerSessionPool sessionPool,
+        int maxMessages) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
+        String messageSelector,
+        ServerSessionPool sessionPool,
+        int maxMessages) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /** @return whether {@link #start()} has been called without a later {@link #close()} */
+    public boolean isStarted() {
+        return started.get();
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
new file mode 100644
index 0000000..1b9da06
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import java.net.URI;
+import java.util.Map;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import org.apache.rocketmq.jms.util.URISpecParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ConnectionFactory} that lazily creates one {@link JmsBaseConnection} and returns that
+ * same shared instance from every {@code createConnection} call.
+ */
+public class JmsBaseConnectionFactory implements ConnectionFactory {
+    private static final Logger logger = LoggerFactory.getLogger(JmsBaseConnectionFactory.class);
+    /** Synchronization monitor for the shared Connection. */
+    private final Object connectionMonitor = new Object();
+    /** Can be configured in a consistent way without too much URL hacking. */
+    protected URI connectionUri;
+    /** Parameters parsed from the connection URI. */
+    protected Map<String, String> connectionParams;
+    /** The shared connection; null until it is first requested. */
+    protected JmsBaseConnection connection;
+
+    public JmsBaseConnectionFactory() {
+    }
+
+    public JmsBaseConnectionFactory(URI connectionUri) {
+        setConnectionUri(connectionUri);
+    }
+
+    /**
+     * Stores the URI together with the parameters parsed from it.
+     *
+     * @throws NullPointerException  if {@code connectionUri} is null
+     * @throws IllegalStateException if parameters were parsed but hold neither consumerId nor producerId
+     */
+    public void setConnectionUri(URI connectionUri) {
+        Preconditions.checkNotNull(connectionUri, "Please set URI !");
+        this.connectionUri = connectionUri;
+        this.connectionParams = URISpecParser.parseURI(connectionUri.toString());
+
+        if (null != connectionParams) {
+            Preconditions.checkState(null != connectionParams.get(CommonConstant.CONSUMERID) ||
+                null != connectionParams.get(CommonConstant.PRODUCERID), "Please set consumerId or ProducerId !");
+        }
+    }
+
+    /** Returns the shared connection, creating it on the first call. */
+    @Override
+    public Connection createConnection() throws JMSException {
+        synchronized (this.connectionMonitor) {
+            if (this.connection == null) {
+                initConnection();
+            }
+            return this.connection;
+        }
+    }
+
+    /**
+     * Same as {@link #createConnection()}; the credentials are not used.
+     *
+     * @param userName ignored
+     * @param password ignored
+     * @return the shared JMS Connection
+     * @throws JMSException if thrown while creating the connection
+     */
+    @Override
+    public Connection createConnection(String userName, String password) throws JMSException {
+        logger.debug("Using userName and Password to create a connection.");
+        return this.createConnection();
+    }
+
+    /**
+     * Initialize the underlying shared Connection.
+     * <p>
+     * Closes and reInitializes the Connection if an underlying Connection is present already.
+     *
+     * @throws javax.jms.JMSException if thrown by JMS API methods
+     */
+    protected void initConnection() throws JMSException {
+        synchronized (this.connectionMonitor) {
+            if (this.connection != null) {
+                closeConnection(this.connection);
+            }
+            this.connection = doCreateConnection();
+            logger.debug("Established shared JMS Connection: {}", this.connection);
+        }
+    }
+
+    /**
+     * Close the given Connection.
+     *
+     * @param con the Connection to close
+     */
+    protected void closeConnection(Connection con) {
+        logger.debug("Closing shared JMS Connection: {}", con);
+        try {
+            try {
+                con.stop();
+            }
+            finally {
+                con.close();
+            }
+        }
+        catch (Throwable ex) {
+            // Best-effort cleanup: a failure to stop or close is logged and never propagated,
+            // so initConnection() can still go on to create the replacement connection.
+            logger.error("Could not close shared JMS Connection.", ex);
+        }
+    }
+
+    /**
+     * Create a JMS Connection
+     *
+     * @return the new JMS Connection
+     * @throws javax.jms.JMSException if thrown by JMS API methods
+     * @throws IllegalStateException if no connection parameters have been set
+     */
+    protected JmsBaseConnection doCreateConnection() throws JMSException {
+        Preconditions.checkState(null != this.connectionParams && this.connectionParams.size() > 0,
+            "Connection Parameters can not be null!");
+        this.connection = new JmsBaseConnection(this.connectionParams);
+        return connection;
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
new file mode 100644
index 0000000..ee549aa
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Enumeration;
+import java.util.Properties;
+import java.util.Vector;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.jms.ConnectionMetaData;
+import javax.jms.JMSException;
+
+/** {@link ConnectionMetaData} whose version values are resolved once, in the static initializer. */
+public class JmsBaseConnectionMetaData implements ConnectionMetaData {
+    public static final String JMS_VERSION;
+    public static final int JMS_MAJOR_VERSION;
+    public static final int JMS_MINOR_VERSION;
+
+    public static final String PROVIDER_VERSION;
+    public static final int PROVIDER_MAJOR_VERSION;
+    public static final int PROVIDER_MINOR_VERSION;
+    public static final String PROVIDER_NAME = "Apache RocketMQ";
+    public static final JmsBaseConnectionMetaData INSTANCE = new JmsBaseConnectionMetaData();
+    public static InputStream resourceStream;
+
+    static {
+        Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*");
+        String jmsVersion = null;
+        int jmsMajor = 0;
+        int jmsMinor = 0;
+        try {
+            Package p = Package.getPackage("javax.jms");
+            if (p != null) {
+                jmsVersion = p.getImplementationVersion();
+                // A package without an implementation version is matched as "" and keeps 0.0.
+                Matcher m = pattern.matcher(jmsVersion == null ? "" : jmsVersion);
+                if (m.matches()) {
+                    jmsMajor = Integer.parseInt(m.group(1));
+                    jmsMinor = Integer.parseInt(m.group(2));
+                }
+            }
+        }
+        catch (Throwable e) {
+            // Best effort: the JMS version fields keep whatever was resolved before the failure.
+        }
+        JMS_VERSION = jmsVersion;
+        JMS_MAJOR_VERSION = jmsMajor;
+        JMS_MINOR_VERSION = jmsMinor;
+
+        String providerVersion = null;
+        int providerMajor = 0;
+        int providerMinor = 0;
+        Properties properties = new Properties();
+        try {
+            resourceStream = JmsBaseConnectionMetaData.class.getResourceAsStream("/application.conf");
+            // A missing resource or missing "version" entry must not abort class initialization.
+            if (resourceStream != null) {
+                properties.load(resourceStream);
+            }
+            providerVersion = properties.getProperty("version");
+            Matcher m = pattern.matcher(providerVersion == null ? "" : providerVersion);
+            if (m.matches()) {
+                providerMajor = Integer.parseInt(m.group(1));
+                providerMinor = Integer.parseInt(m.group(2));
+            }
+        }
+        catch (IOException e) {
+            e.printStackTrace();
+        }
+        PROVIDER_VERSION = providerVersion;
+        PROVIDER_MAJOR_VERSION = providerMajor;
+        PROVIDER_MINOR_VERSION = providerMinor;
+    }
+
+    public String getJMSVersion() throws JMSException {
+        return JMS_VERSION;
+    }
+
+    public int getJMSMajorVersion() throws JMSException {
+        return JMS_MAJOR_VERSION;
+    }
+
+    public int getJMSMinorVersion() throws JMSException {
+        return JMS_MINOR_VERSION;
+    }
+
+    public String getJMSProviderName() throws JMSException {
+        return PROVIDER_NAME;
+    }
+
+    public String getProviderVersion() throws JMSException {
+        return PROVIDER_VERSION;
+    }
+
+    public int getProviderMajorVersion() throws JMSException {
+        return PROVIDER_MAJOR_VERSION;
+    }
+
+    public int getProviderMinorVersion() throws JMSException {
+        return PROVIDER_MINOR_VERSION;
+    }
+
+    /** Lists the JMS-defined property names, spelled exactly as in {@code JmsBaseConstant}. */
+    public Enumeration<?> getJMSXPropertyNames() throws JMSException {
+        Vector<String> jmxProperties = new Vector<String>();
+        jmxProperties.add("jmsXUserID");
+        jmxProperties.add("jmsXAppID");
+        jmxProperties.add("jmsXGroupID");
+        jmxProperties.add("jmsXGroupSeq");
+        jmxProperties.add("jmsXState");
+        jmxProperties.add("jmsXDeliveryCount");
+        jmxProperties.add("jmsXProducerTXID");
+        jmxProperties.add("jmsXConsumerTXID");
+        jmxProperties.add("jmsXRcvTimestamp");
+        return jmxProperties.elements();
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java
new file mode 100644
index 0000000..f0bca28
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+/**
+ * String names for the JMS message headers and JMS-defined properties, plus default header values.
+ */
+public interface JmsBaseConstant {
+    //------------------------ names of the JMS message headers ------------------------
+    String JMS_DESTINATION = "jmsDestination";
+    String JMS_DELIVERY_MODE = "jmsDeliveryMode";
+    String JMS_EXPIRATION = "jmsExpiration";
+    String JMS_DELIVERY_TIME = "jmsDeliveryTime";
+    String JMS_PRIORITY = "jmsPriority";
+    String JMS_MESSAGE_ID = "jmsMessageID";
+    String JMS_TIMESTAMP = "jmsTimestamp";
+    String JMS_CORRELATION_ID = "jmsCorrelationID";
+    String JMS_REPLY_TO = "jmsReplyTo";
+    String JMS_TYPE = "jmsType";
+    String JMS_REDELIVERED = "jmsRedelivered";
+
+    //------------------------ names of the JMS-defined properties ------------------------
+    /**
+     * Identity of the user who sends the message.
+     */
+    String JMS_XUSER_ID = "jmsXUserID";
+    /**
+     * Identity of the application that sends the message.
+     */
+    String JMS_XAPP_ID = "jmsXAppID";
+    /**
+     * Number of delivery attempts made for the message.
+     */
+    String JMS_XDELIVERY_COUNT = "jmsXDeliveryCount";
+    /**
+     * Identity of the message group the message belongs to.
+     */
+    String JMS_XGROUP_ID = "jmsXGroupID";
+    /**
+     * Sequence number of the message within its group; the first message is 1, the second 2, ...
+     */
+    String JMS_XGROUP_SEQ = "jmsXGroupSeq";
+    /**
+     * Identifier of the transaction within which the message was produced.
+     */
+    String JMS_XPRODUCER_TXID = "jmsXProducerTXID";
+    /**
+     * Identifier of the transaction within which the message was consumed.
+     */
+    String JMS_XCONSUMER_TXID = "jmsXConsumerTXID";
+    /**
+     * Time at which JMS delivered the message to the consumer.
+     */
+    String JMS_XRCV_TIMESTAMP = "jmsXRcvTimestamp";
+    /**
+     * State of a message copy held in a message warehouse: 1 (waiting), 2 (ready), 3 (expired) or
+     * 4 (retained). It only matters for warehouse lookups, for which JMS provides no API.
+     */
+    String JMS_XSTATE = "jmsXState";
+
+    //------------------------ default values of JMS headers ------------------------
+    /**
+     * Default time to live: three days, written out as milliseconds.
+     */
+    long DEFAULT_TIME_TO_LIVE = 3L * 24 * 60 * 60 * 1000;
+
+    /**
+     * Default value of the JMS type header.
+     */
+    String DEFAULT_JMS_TYPE = "rocketmq";
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java
new file mode 100644
index 0000000..b62e928
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.MapMaker;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+/**
+ * JMS {@link MessageConsumer} backed by a RocketMQ push consumer.
+ * <p/>
+ * Consumers created with the same consumerId share one {@link RMQPushConsumerExt}; the shared instance is
+ * reference-counted and shut down when the last JMS consumer using it is closed. Only asynchronous delivery through
+ * a {@link MessageListener} is supported; the synchronous <CODE>receive</CODE> variants are unsupported.
+ */
+public class JmsBaseMessageConsumer implements MessageConsumer {
+
+    /** Guards every change to {@link #consumerMap} and to the reference counts of the shared consumers. */
+    private static final Object LOCK_OBJECT = new Object();
+    //all shared consumers
+    private static final ConcurrentMap<String/**consumerId*/, RMQPushConsumerExt> consumerMap = new MapMaker().makeMap();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private CommonContext context;
+    private Destination destination;
+    private MessageListener messageListener;
+
+    /**
+     * Creates the JMS consumer and registers it on the shared RocketMQ consumer of its consumerId, creating that
+     * shared consumer first when none exists yet. If the connection is already started, the shared consumer is
+     * started immediately.
+     *
+     * @param destination the destination to consume from
+     * @param commonContext configuration (consumerId, name server, instance name, thread numbers)
+     * @param connection the owning connection, consulted for its started state
+     * @throws JMSException if the shared RocketMQ consumer can not be started
+     */
+    public JmsBaseMessageConsumer(Destination destination, CommonContext commonContext,
+        JmsBaseConnection connection) throws JMSException {
+        // Validate before touching shared state so that a bad argument can not leak a reference.
+        Preconditions.checkNotNull(connection, "Connection can not be null!");
+        synchronized (LOCK_OBJECT) {
+            checkArgs(destination, commonContext);
+            final String consumerId = context.getConsumerId();
+
+            RMQPushConsumerExt consumerExt = consumerMap.get(consumerId);
+            if (null == consumerExt) {
+                consumerExt = new RMQPushConsumerExt(createPushConsumer());
+                consumerMap.put(consumerId, consumerExt);
+            }
+            consumerExt.incrementAndGet();
+
+            //If the connection has been started, start the consumer right now.
+            if (connection.isStarted()) {
+                try {
+                    consumerExt.start();
+                }
+                catch (MQClientException mqe) {
+                    // The constructor is about to fail, so nobody will ever call close() on this instance:
+                    // give the reference back here, otherwise the shared consumer could never be released.
+                    if (0 == consumerExt.decrementAndGet()) {
+                        consumerExt.close();
+                        consumerMap.remove(consumerId);
+                    }
+                    JMSException jmsException = new JMSException("Start consumer failed " + consumerId);
+                    jmsException.initCause(mqe);
+                    throw jmsException;
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Builds a not-yet-started RocketMQ push consumer configured from the context. Optional settings are applied
+     * only when the context provides them; the batch size is fixed to one message per listener callback.
+     */
+    private DefaultMQPushConsumer createPushConsumer() {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(context.getConsumerId());
+        if (context.getConsumeThreadNums() > 0) {
+            consumer.setConsumeThreadMax(context.getConsumeThreadNums());
+            consumer.setConsumeThreadMin(context.getConsumeThreadNums());
+        }
+        if (!Strings.isNullOrEmpty(context.getNameServer())) {
+            consumer.setNamesrvAddr(context.getNameServer());
+        }
+        if (!Strings.isNullOrEmpty(context.getInstanceName())) {
+            consumer.setInstanceName(context.getInstanceName());
+        }
+        consumer.setConsumeMessageBatchMaxSize(1);
+        return consumer;
+    }
+
+    /**
+     * Validates the constructor arguments and stores them in the corresponding fields.
+     *
+     * @throws NullPointerException if the context, its consumerId or the destination is null
+     */
+    private void checkArgs(Destination destination, CommonContext context) throws JMSException {
+        // Check the objects themselves first so that a null argument fails with a descriptive message.
+        Preconditions.checkNotNull(context, "Context can not be null!");
+        Preconditions.checkNotNull(destination, "Destination can not be null!");
+        Preconditions.checkNotNull(context.getConsumerId(), "ConsumerId can not be null!");
+        Preconditions.checkNotNull(destination.toString(), "Destination can not be null!");
+        this.context = context;
+        this.destination = destination;
+    }
+
+    /**
+     * Message selectors are not supported, so there is never one to report.
+     *
+     * @return always null
+     */
+    @Override
+    public String getMessageSelector() throws JMSException {
+        return null;
+    }
+
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        return this.messageListener;
+    }
+
+    /**
+     * Subscribes the shared RocketMQ consumer to this consumer's topic and message type and routes the messages
+     * of that topic to the given listener. Does nothing when the shared consumer is no longer registered (for
+     * instance after {@link #close()}).
+     *
+     * @param listener the listener receiving the messages
+     * @throws javax.jms.InvalidDestinationException if the destination is not a {@link JmsBaseTopic}
+     * @throws JMSException if the subscription is rejected
+     */
+    @Override
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId());
+        if (null == rocketmqConsumerExt) {
+            return;
+        }
+        if (!(destination instanceof JmsBaseTopic)) {
+            throw new javax.jms.InvalidDestinationException("Unsupported destination: " + destination);
+        }
+        JmsBaseTopic topic = (JmsBaseTopic) destination;
+        try {
+            rocketmqConsumerExt.subscribe(topic.getMessageTopic(), topic.getMessageType(), listener);
+        }
+        catch (MQClientException mqe) {
+            // Keep the original message, but also keep the cause so the failure stays diagnosable.
+            JMSException jmsException = new JMSException(mqe.getMessage());
+            jmsException.initCause(mqe);
+            throw jmsException;
+        }
+        // Only remember the listener once the subscription has actually been accepted.
+        this.messageListener = listener;
+    }
+
+    @Override
+    public Message receive() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    @Override
+    public Message receive(long timeout) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    @Override
+    public Message receiveNoWait() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    /**
+     * Releases this consumer's reference on the shared RocketMQ consumer and shuts the shared consumer down when
+     * this was the last reference. Calling it more than once has no further effect.
+     */
+    @Override
+    public void close() throws JMSException {
+        synchronized (LOCK_OBJECT) {
+            if (closed.compareAndSet(false, true)) {
+                RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId());
+                if (null != rocketmqConsumerExt && 0 == rocketmqConsumerExt.decrementAndGet()) {
+                    rocketmqConsumerExt.close();
+                    consumerMap.remove(context.getConsumerId());
+                }
+            }
+        }
+    }
+
+    /**
+     * Start the consumer to get message from the Broker.
+     *
+     * @throws JMSException if the underlying RocketMQ consumer fails to start
+     */
+    public void startConsumer() throws JMSException {
+        RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId());
+        if (null != rocketmqConsumerExt) {
+            try {
+                rocketmqConsumerExt.start();
+            }
+            catch (MQClientException mqe) {
+                throw ExceptionUtil.convertToJmsException(mqe, "Start consumer failed");
+            }
+        }
+    }
+
+    public Destination getDestination() throws JMSException {
+        return this.destination;
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java
new file mode 100644
index 0000000..8dd82f0
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.MapMaker;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentMap;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.jms.domain.message.JmsBaseMessage;
+import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
+import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+import org.apache.rocketmq.jms.util.MessageConverter;
+import org.apache.rocketmq.jms.util.MsgConvertUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JMS {@link MessageProducer} backed by a RocketMQ producer.
+ * <p/>
+ * Producers created with the same producerId share one started {@link MQProducer}. Delivery mode, priority and
+ * time-to-live are fixed to the provider defaults; the corresponding setters are unsupported.
+ */
+public class JmsBaseMessageProducer implements MessageProducer {
+
+    /** Guards creation and registration of the shared producers. */
+    private static final Object LOCK_OBJECT = new Object();
+    /** All shared, already started producers, keyed by producerId. */
+    private static final ConcurrentMap<String, MQProducer> producerMap = new MapMaker().makeMap();
+    private static final Logger logger = LoggerFactory.getLogger(JmsBaseMessageProducer.class);
+    private CommonContext context;
+
+    private Destination destination;
+
+    /**
+     * Creates the JMS producer and, when no producer is registered for the producerId yet, creates, configures,
+     * starts and registers the shared RocketMQ producer.
+     *
+     * @param destination the default destination of this producer
+     * @param context configuration (producerId, name server, instance name, send timeout)
+     * @throws JMSException if the RocketMQ producer can not be started
+     */
+    public JmsBaseMessageProducer(Destination destination, CommonContext context) throws JMSException {
+        synchronized (LOCK_OBJECT) {
+            checkArgs(destination, context);
+
+            if (null == producerMap.get(this.context.getProducerId())) {
+                DefaultMQProducer producer = new DefaultMQProducer(context.getProducerId());
+                if (!Strings.isNullOrEmpty(context.getNameServer())) {
+                    producer.setNamesrvAddr(context.getNameServer());
+                }
+                if (!Strings.isNullOrEmpty(context.getInstanceName())) {
+                    producer.setInstanceName(context.getInstanceName());
+                }
+                if (context.getSendMsgTimeoutMillis() > 0) {
+                    producer.setSendMsgTimeout(context.getSendMsgTimeoutMillis());
+                }
+                try {
+                    producer.start();
+                }
+                catch (MQClientException mqe) {
+                    throw ExceptionUtil.convertToJmsException(mqe, String.format("Start producer failed:%s", context.getProducerId()));
+                }
+                // Register only after a successful start so the map never holds an unusable producer.
+                producerMap.putIfAbsent(this.context.getProducerId(), producer);
+            }
+
+        }
+    }
+
+    /**
+     * Validates the constructor arguments and stores them in the corresponding fields.
+     *
+     * @throws NullPointerException if the context, its producerId or the destination is null
+     */
+    private void checkArgs(Destination destination, CommonContext context) throws JMSException {
+        // Check the objects themselves first so that a null argument fails with a descriptive message.
+        Preconditions.checkNotNull(context, "Context can not be null!");
+        Preconditions.checkNotNull(destination, "Destination can not be null!");
+        Preconditions.checkNotNull(context.getProducerId(), "ProducerId can not be null!");
+        Preconditions.checkNotNull(destination.toString(), "Destination can not be null!");
+        this.context = context;
+        this.destination = destination;
+    }
+
+    @Override
+    public boolean getDisableMessageID() throws JMSException {
+        return false;
+    }
+
+    @Override
+    public void setDisableMessageID(boolean value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public boolean getDisableMessageTimestamp() throws JMSException {
+        return false;
+    }
+
+    @Override
+    public void setDisableMessageTimestamp(boolean value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public int getDeliveryMode() throws JMSException {
+        return javax.jms.Message.DEFAULT_DELIVERY_MODE;
+    }
+
+    @Override
+    public void setDeliveryMode(int deliveryMode) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public int getPriority() throws JMSException {
+        return javax.jms.Message.DEFAULT_PRIORITY;
+    }
+
+    @Override
+    public void setPriority(int defaultPriority) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public long getTimeToLive() throws JMSException {
+        return JmsBaseConstant.DEFAULT_TIME_TO_LIVE;
+    }
+
+    @Override
+    public void setTimeToLive(long timeToLive) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public Destination getDestination() throws JMSException {
+        return this.destination;
+    }
+
+    /**
+     * Does nothing: the underlying RocketMQ producer is shared between JMS producers and is left running.
+     */
+    @Override
+    public void close() throws JMSException {
+        //Nothing to do
+    }
+
+    @Override
+    public void send(javax.jms.Message message) throws JMSException {
+        this.send(getDestination(), message);
+    }
+
+    /**
+     * Send the message to the defined Destination success---return normally. Exception---throw out JMSException.
+     * <p/>
+     * On success the JMS message id header of the message is set to <CODE>"ID:" + RocketMQ message id</CODE>.
+     *
+     * @param destination see <CODE>Destination</CODE>
+     * @param message the message to be sent; must have been created by this provider
+     * @throws javax.jms.MessageFormatException if the message is null or was not created by this provider
+     * @throws javax.jms.JMSException if conversion or sending fails, or the broker does not report SEND_OK
+     */
+    @Override
+    public void send(Destination destination, javax.jms.Message message) throws JMSException {
+        if (!(message instanceof JmsBaseMessage)) {
+            // Fail with the JMS-defined exception instead of a ClassCastException / NullPointerException.
+            throw new javax.jms.MessageFormatException("Unsupported message: "
+                + (message == null ? "null" : message.getClass().getName()));
+        }
+        JmsBaseMessage jmsMsg = (JmsBaseMessage) message;
+        initJMSHeaders(jmsMsg, destination);
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketmqMsg = MessageConverter.convert2RMQMessage(jmsMsg);
+
+            MQProducer producer = producerMap.get(context.getProducerId());
+
+            if (producer == null) {
+                throw new Exception("producer is null ");
+            }
+            SendResult sendResult = producer.send(rocketmqMsg);
+            if (sendResult == null || sendResult.getSendStatus() != SendStatus.SEND_OK) {
+                throw new Exception("SendResult is " + (sendResult == null ? "null" : sendResult.toString()));
+            }
+            jmsMsg.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:" + sendResult.getMsgId());
+        }
+        catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // The broad catch would otherwise silently clear the thread's interrupt request.
+                Thread.currentThread().interrupt();
+            }
+            logger.error("Send rocketmq message failure !", e);
+            //if fail to send the message, throw out JMSException
+            JMSException jmsException = new JMSException("Send rocketmq message failure!");
+            jmsException.setLinkedException(e);
+            // Also expose the failure through the standard getCause() chain.
+            jmsException.initCause(e);
+            throw jmsException;
+        }
+    }
+
+    @Override
+    public void send(javax.jms.Message message, int deliveryMode, int priority,
+        long timeToLive) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public void send(Destination destination, javax.jms.Message message, int deliveryMode,
+        int priority, long timeToLive) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Init the JmsMessage Headers.
+     * <p/>
+     * <P>JMS providers init message's headers. Do not allow user to set these by yourself.
+     *
+     * @param jmsMsg message
+     * @param destination the destination recorded in the JMS destination header
+     * @throws javax.jms.JMSException
+     * @see <CODE>Destination</CODE>
+     */
+    private void initJMSHeaders(JmsBaseMessage jmsMsg, Destination destination) throws JMSException {
+        // Read the clock once so that timestamp and expiration are derived from the same instant.
+        final long now = System.currentTimeMillis();
+
+        //JMS_DESTINATION default:"topic:message"
+        jmsMsg.setHeader(JmsBaseConstant.JMS_DESTINATION, destination);
+        //JMS_DELIVERY_MODE default : javax.jms.Message.DEFAULT_DELIVERY_MODE
+        jmsMsg.setHeader(JmsBaseConstant.JMS_DELIVERY_MODE, javax.jms.Message.DEFAULT_DELIVERY_MODE);
+        //JMS_TIMESTAMP default : current time
+        jmsMsg.setHeader(JmsBaseConstant.JMS_TIMESTAMP, now);
+        //JMS_EXPIRATION = currentTime + time_to_live (JmsBaseConstant.DEFAULT_TIME_TO_LIVE)
+        jmsMsg.setHeader(JmsBaseConstant.JMS_EXPIRATION, now + JmsBaseConstant.DEFAULT_TIME_TO_LIVE);
+        //JMS_PRIORITY default : javax.jms.Message.DEFAULT_PRIORITY
+        jmsMsg.setHeader(JmsBaseConstant.JMS_PRIORITY, javax.jms.Message.DEFAULT_PRIORITY);
+        //JMS_TYPE default : JmsBaseConstant.DEFAULT_JMS_TYPE
+        jmsMsg.setHeader(JmsBaseConstant.JMS_TYPE, JmsBaseConstant.DEFAULT_JMS_TYPE);
+        //JMS_REPLY_TO,JMS_CORRELATION_ID default : null
+        //JMS_MESSAGE_ID is set by sendResult.
+        //JMS_REDELIVERED is set by broker.
+    }
+
+    /**
+     * Init the RocketMQ message headers.
+     * <p/>
+     * <P>Copies the JMS message's properties first and its headers second (so a header wins over a property of
+     * the same name) into a new {@link Properties}, each value rendered with <CODE>toString()</CODE>, then adds
+     * the message model, topic and message type entries. Entries whose value is null are skipped, because
+     * {@link Properties} can not store null values.
+     *
+     * @param jmsMsg message
+     * @param topic the RocketMQ topic; must not be null
+     * @param messageType the message type (tag); must not be null
+     * @return the flattened properties
+     * @throws javax.jms.JMSException
+     */
+    public static Properties initRocketMQHeaders(JmsBaseMessage jmsMsg,
+        String topic, String messageType) throws JMSException {
+        Properties userProperties = new Properties();
+
+        //Jms userProperties to properties
+        copyNonNullEntries(jmsMsg.getProperties(), userProperties);
+        //Jms systemProperties to ROCKETMQ properties
+        copyNonNullEntries(jmsMsg.getHeaders(), userProperties);
+
+        //Jms message Model
+        if (jmsMsg instanceof JmsBytesMessage) {
+            userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_BYTES);
+        }
+        else if (jmsMsg instanceof JmsObjectMessage) {
+            userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_OBJ);
+        }
+        else if (jmsMsg instanceof JmsTextMessage) {
+            userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_TEXT);
+        }
+
+        //message topic and tag
+        userProperties.setProperty(MsgConvertUtil.MSG_TOPIC, topic);
+        userProperties.setProperty(MsgConvertUtil.MSG_TYPE, messageType);
+
+        return userProperties;
+    }
+
+    /**
+     * Copies every entry with a non-null key and value from source into target, converting values with toString().
+     */
+    private static void copyNonNullEntries(Map<String, Object> source, Properties target) {
+        for (Map.Entry<String, Object> entry : source.entrySet()) {
+            if (entry.getKey() != null && entry.getValue() != null) {
+                target.setProperty(entry.getKey(), entry.getValue().toString());
+            }
+        }
+    }
+
+}
+
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java
new file mode 100644
index 0000000..5bf7005
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
+import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+/**
+ * JMS {@link Session} for the RocketMQ provider.
+ * <p/>
+ * Creates bytes, object and text messages, topics, producers and consumers. Transactions, queues, browsers, message
+ * selectors, durable subscribers and session-level listeners are not supported. The session remembers the consumers
+ * it created so that they can be started and closed together.
+ */
+public class JmsBaseSession implements Session {
+    protected CommonContext context;
+    protected JmsBaseConnection connection;
+    protected CopyOnWriteArrayList<JmsBaseMessageConsumer> consumerList =
+        new CopyOnWriteArrayList<JmsBaseMessageConsumer>();
+    private boolean transacted = true;
+    private int acknowledgeMode = AUTO_ACKNOWLEDGE;
+
+    /**
+     * @param connection the owning connection, handed to every consumer created by this session
+     * @param transacted value reported by {@link #getTransacted()}
+     * @param acknowledgeMode value reported by {@link #getAcknowledgeMode()}
+     * @param context configuration handed to every producer and consumer created by this session
+     */
+    public JmsBaseSession(JmsBaseConnection connection, boolean transacted,
+        int acknowledgeMode, CommonContext context) {
+        this.context = context;
+        this.acknowledgeMode = acknowledgeMode;
+        this.transacted = transacted;
+        this.connection = connection;
+    }
+
+    @Override
+    public BytesMessage createBytesMessage() throws JMSException {
+        return new JmsBytesMessage();
+    }
+
+    @Override
+    public MapMessage createMapMessage() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public Message createMessage() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage() throws JMSException {
+        return new JmsObjectMessage();
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
+        return new JmsObjectMessage(object);
+    }
+
+    @Override
+    public StreamMessage createStreamMessage() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public TextMessage createTextMessage() throws JMSException {
+        return new JmsTextMessage();
+    }
+
+    @Override
+    public TextMessage createTextMessage(String text) throws JMSException {
+        return new JmsTextMessage(text);
+    }
+
+    @Override
+    public boolean getTransacted() throws JMSException {
+        return this.transacted;
+    }
+
+    @Override
+    public int getAcknowledgeMode() {
+        return this.acknowledgeMode;
+    }
+
+    @Override
+    public void commit() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public void rollback() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Closes every consumer created by this session and forgets them. All consumers are attempted even when one of
+     * them fails to close; the first failure is rethrown afterwards.
+     *
+     * @throws JMSException the first failure reported while closing a consumer
+     */
+    @Override
+    public void close() throws JMSException {
+        JMSException firstFailure = null;
+        for (JmsBaseMessageConsumer messageConsumer : consumerList) {
+            try {
+                messageConsumer.close();
+            }
+            catch (JMSException e) {
+                // Keep going: one broken consumer must not keep the others (and their resources) alive.
+                if (null == firstFailure) {
+                    firstFailure = e;
+                }
+            }
+        }
+        consumerList.clear();
+        if (null != firstFailure) {
+            throw firstFailure;
+        }
+    }
+
+    @Override
+    public void recover() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Session-level listeners are not supported, so there is never one to report.
+     *
+     * @return always null
+     */
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        return null;
+    }
+
+    @Override
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public void run() {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public MessageProducer createProducer(Destination destination) throws JMSException {
+        return new JmsBaseMessageProducer(destination, context);
+    }
+
+    /**
+     * Create a MessageConsumer.
+     * <p/>
+     * <P>Create a durable consumer to the specified destination. The consumer is remembered by the session so that
+     * {@link #start()} and {@link #close()} reach it.
+     *
+     * @param destination Equals to Topic:MessageType in ROCKETMQ
+     * @throws javax.jms.JMSException
+     * @see <CODE>Destination</CODE>
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination) throws JMSException {
+        JmsBaseMessageConsumer messageConsumer = new
+            JmsBaseMessageConsumer(destination, this.context, this.connection);
+        this.consumerList.addIfAbsent(messageConsumer);
+        return messageConsumer;
+    }
+
+    /**
+     * Create a MessageConsumer with messageSelector.
+     * <p/>
+     * <P>ROCKETMQ-JMS do not support using messageSelector to filter messages
+     *
+     * @param destination Equals to Topic:MessageType in ROCKETMQ
+     * @param messageSelector For filtering messages
+     * @throws javax.jms.JMSException
+     * @see <CODE>Destination</CODE>
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String messageSelector)
+        throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+
+    }
+
+    /**
+     * Create a MessageConsumer with messageSelector.
+     * <p/>
+     * <P>ROCKETMQ-JMS do not support using messageSelector to filter messages and do not support this mechanism to reject
+     * messages from localhost.
+     *
+     * @param destination Equals to Topic:MessageType in ROCKETMQ
+     * @param messageSelector For filtering messages
+     * @param noLocal If true: reject messages from localhost
+     * @throws javax.jms.JMSException
+     * @see <CODE>Destination</CODE>
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String messageSelector,
+        boolean noLocal) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public Queue createQueue(String queueName) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Creates a topic from a name of the form <CODE>messageTopic</CODE> or <CODE>messageTopic:messageType</CODE>.
+     *
+     * @param topicName the topic name, optionally followed by ':' and the message type
+     * @return the topic; its message type is "*" when the name carries none
+     * @throws NullPointerException if topicName is null
+     * @throws IllegalStateException if the name does not split into one or two ':'-separated parts
+     */
+    @Override
+    public Topic createTopic(String topicName) throws JMSException {
+        Preconditions.checkNotNull(topicName);
+        List<String> msgTuple = Arrays.asList(topicName.split(":"));
+
+        Preconditions.checkState(msgTuple.size() >= 1 && msgTuple.size() <= 2,
+            "Destination must match messageTopic:messageType !");
+
+        //If messageType is null, use * instead.
+        if (1 == msgTuple.size()) {
+            return new JmsBaseTopic(msgTuple.get(0), "*");
+        }
+        return new JmsBaseTopic(msgTuple.get(0), msgTuple.get(1));
+    }
+
+    /**
+     * Create a MessageConsumer with durable subscription.
+     * <p/>
+     * <P>When using <CODE>createConsumer(Destination)</CODE> method, one creates a MessageConsumer with a durable
+     * subscription. So use <CODE>createConsumer(Destination)</CODE> instead of these method.
+     *
+     * @param topic destination
+     * @throws javax.jms.JMSException
+     * @see <CODE>Topic</CODE>
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name)
+        throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Create a MessageConsumer with durable subscription.
+     * <p/>
+     * <P>When using <CODE>createConsumer(Destination)</CODE> method, one creates a MessageConsumer with a durable
+     * subscription. So use <CODE>createConsumer(Destination)</CODE> instead of these method.
+     *
+     * @param topic destination
+     * @throws javax.jms.JMSException
+     * @see <CODE>Topic</CODE>
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name,
+        String messageSelector,
+        boolean noLocal) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public QueueBrowser createBrowser(Queue queue) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Creates a temporary queue handle whose name is a random UUID. The name is generated once, so repeated
+     * <CODE>getQueueName()</CODE> calls on the same handle return the same value. <CODE>delete()</CODE> is a no-op.
+     */
+    @Override
+    public TemporaryQueue createTemporaryQueue() throws JMSException {
+        // Generate the identity once; a destination whose name changes on every call can not be addressed.
+        final String queueName = UUID.randomUUID().toString();
+        return new TemporaryQueue() {
+            public void delete() throws JMSException {
+            }
+
+            public String getQueueName() throws JMSException {
+                return queueName;
+            }
+        };
+    }
+
+    /**
+     * Creates a temporary topic handle whose name is a random UUID. The name is generated once, so repeated
+     * <CODE>getTopicName()</CODE> calls on the same handle return the same value. <CODE>delete()</CODE> is a no-op.
+     */
+    @Override
+    public TemporaryTopic createTemporaryTopic() throws JMSException {
+        // Generate the identity once; a destination whose name changes on every call can not be addressed.
+        final String topicName = UUID.randomUUID().toString();
+        return new TemporaryTopic() {
+            public void delete() throws JMSException {
+            }
+
+            public String getTopicName() throws JMSException {
+                return topicName;
+            }
+        };
+    }
+
+    @Override
+    public void unsubscribe(String name) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Starts every consumer created by this session.
+     *
+     * @throws JMSException if a consumer fails to start; consumers after it are not attempted
+     */
+    public void start() throws JMSException {
+        for (JmsBaseMessageConsumer messageConsumer : consumerList) {
+            messageConsumer.startConsumer();
+        }
+    }
+
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java
new file mode 100644
index 0000000..b7e2fab
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import javax.jms.JMSException;
+import javax.jms.Topic;
+
+/**
+ * JMS {@link Topic} made of a message topic and a message type; its name is <CODE>messageTopic:messageType</CODE>.
+ */
+public class JmsBaseTopic implements Topic {
+
+    private String messageTopic;
+    private String messageType;
+
+    /**
+     * @param messageTopic the topic part of the name; must not be null
+     * @param messageType the type part of the name; must not be null
+     * @throws NullPointerException if either argument is null
+     */
+    public JmsBaseTopic(String messageTopic, String messageType) {
+        // checkNotNull hands its argument back, so validation and assignment collapse into one statement each.
+        this.messageTopic = Preconditions.checkNotNull(messageTopic);
+        this.messageType = Preconditions.checkNotNull(messageType);
+    }
+
+    /**
+     * @return the same text as {@link #toString()}
+     */
+    public String getTopicName() throws JMSException {
+        return toString();
+    }
+
+    /**
+     * @return the message topic and the message type joined by ':'
+     */
+    public String toString() {
+        final String topicPart = getMessageTopic();
+        final String typePart = getMessageType();
+        return Joiner.on(":").join(topicPart, typePart);
+    }
+
+    public String getMessageTopic() {
+        return this.messageTopic;
+    }
+
+    public String getMessageType() {
+        return this.messageType;
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java
new file mode 100644
index 0000000..7a8a9f7
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.util.MessageConverter;
+
+/**
+ * Wraps a RocketMQ {@link MQPushConsumer} so that several JMS consumers can share it.
+ * <p/>
+ * It keeps a reference count maintained by its users, a started flag, and a table that maps each subscribed topic
+ * to the JMS listener receiving that topic's messages.
+ */
+public class RMQPushConsumerExt {
+    private final MQPushConsumer consumer;
+    private final ConcurrentHashMap<String/* Topic */, javax.jms.MessageListener> subscribeTable = new ConcurrentHashMap<String, javax.jms.MessageListener>();
+
+    private final AtomicInteger referenceCount = new AtomicInteger(0);
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    public RMQPushConsumerExt(MQPushConsumer consumer) {
+        this.consumer = consumer;
+    }
+
+    public MQPushConsumer getConsumer() {
+        return consumer;
+    }
+
+    public int incrementAndGet() {
+        return referenceCount.incrementAndGet();
+    }
+
+    public int decrementAndGet() {
+        return referenceCount.decrementAndGet();
+    }
+
+    public int getReferenceCount() {
+        return referenceCount.get();
+    }
+
+    /**
+     * Registers the dispatching listener and starts the wrapped consumer. Does nothing when already started.
+     *
+     * @throws MQClientException if the wrapped consumer is null or fails to start; in the latter case this
+     * wrapper is left in the not-started state so that start can be attempted again
+     */
+    public void start() throws MQClientException {
+        if (consumer == null) {
+            throw new MQClientException(-1, "consumer is null");
+        }
+
+        if (this.started.compareAndSet(false, true)) {
+            try {
+                this.consumer.registerMessageListener(new MessageListenerImpl());
+                this.consumer.start();
+            } catch (MQClientException e) {
+                // Undo the flag: otherwise every later start() would be skipped although nothing is running.
+                this.started.set(false);
+                throw e;
+            } catch (RuntimeException e) {
+                this.started.set(false);
+                throw e;
+            }
+        }
+    }
+
+
+    /**
+     * Shuts the wrapped consumer down if this wrapper is currently marked as started.
+     */
+    public void close() {
+        if (this.started.compareAndSet(true, false)) {
+            this.consumer.shutdown();
+        }
+    }
+
+    /**
+     * Subscribes the wrapped consumer to a topic and routes that topic's messages to the given listener. A
+     * listener previously registered for the topic is replaced.
+     *
+     * @param topic the topic to subscribe to
+     * @param subExpression the subscription expression handed to the wrapped consumer
+     * @param listener the JMS listener receiving the topic's messages
+     * @throws MQClientException if topic or listener is null, or the wrapped consumer rejects the subscription;
+     * on rejection the listener table is restored to its previous content
+     */
+    public void subscribe(String topic, String subExpression, javax.jms.MessageListener listener) throws MQClientException {
+        if (null == topic) {
+            throw new MQClientException(-1, "topic is null");
+        }
+
+        if (null == listener) {
+            throw new MQClientException(-1, "listener is null");
+        }
+
+        // Register the listener before subscribing so that a message arriving right away already finds it.
+        javax.jms.MessageListener previous = this.subscribeTable.put(topic, listener);
+        try {
+            this.consumer.subscribe(topic, subExpression);
+        } catch (MQClientException e) {
+            // Roll the table back so it never advertises a listener for a subscription that was not made.
+            if (null == previous) {
+                this.subscribeTable.remove(topic, listener);
+            } else {
+                this.subscribeTable.replace(topic, listener, previous);
+            }
+            throw new MQClientException("consumer subscribe exception", e);
+        }
+    }
+
+    /**
+     * Cancels the subscription of a topic and forgets its listener. A null topic is ignored.
+     */
+    public void unsubscribe(String topic) {
+        if (null != topic) {
+            this.consumer.unsubscribe(topic);
+            // Drop the listener as well, otherwise the table keeps it reachable forever.
+            this.subscribeTable.remove(topic);
+        }
+    }
+
+    /**
+     * RocketMQ listener that converts each received message to a JMS message and hands it to the JMS listener
+     * registered for the message's topic.
+     */
+    class MessageListenerImpl implements MessageListenerConcurrently {
+
+        /**
+         * Dispatches every message of the batch in order.
+         *
+         * @return CONSUME_SUCCESS when all messages were handled; RECONSUME_LATER as soon as conversion or a
+         * listener throws, which asks for redelivery of the batch
+         * @throws RuntimeException if no listener is registered for a message's topic
+         */
+        @Override
+        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgsRMQList, ConsumeConcurrentlyContext contextRMQ) {
+            // Handle the whole batch: looking only at the first element would acknowledge the remaining
+            // messages without ever delivering them when the batch size is larger than one.
+            for (MessageExt msgRMQ : msgsRMQList) {
+                javax.jms.MessageListener listener = RMQPushConsumerExt.this.subscribeTable.get(msgRMQ.getTopic());
+                if (null == listener) {
+                    throw new RuntimeException("MessageListener is null");
+                }
+
+                try {
+                    listener.onMessage(MessageConverter.convert2JMSMessage(msgRMQ));
+                }
+                catch (Exception e) {
+                    // NOTE(review): the failure is not logged here because this class has no logger in scope.
+                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                }
+            }
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        }
+    }
+
+
+    public boolean isStarted() {
+        return started.get();
+    }
+
+
+    public boolean isClosed() {
+        return !isStarted();
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java
new file mode 100644
index 0000000..ea4b49e
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.BaseEncoding;
+import java.io.Serializable;
+import java.util.Enumeration;
+import java.util.Map;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.rocketmq.jms.domain.JmsBaseConstant;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+/**
+ * Base implementation of {@link Message} that keeps headers and properties in two in-memory maps.
+ * The JMS header setters are not supported; they delegate to {@link ExceptionUtil#handleUnSupportedException()}.
+ */
+public class JmsBaseMessage implements Message {
+    /**
+     * Message properties
+     */
+    protected Map<String, Object> properties = Maps.newHashMap();
+    /**
+     * Message headers
+     */
+    protected Map<String, Object> headers = Maps.newHashMap();
+    /**
+     * Message body
+     */
+    protected Serializable body;
+
+    @Override
+    public String getJMSMessageID() {
+        return (String) headers.get(JmsBaseConstant.JMS_MESSAGE_ID);
+    }
+
+    /**
+     * Sets the message ID.
+     * <p/>
+     * <P>JMS providers set this field when a message is sent. Users are not allowed to set the message ID themselves.
+     *
+     * @param id the ID of the message
+     * @see javax.jms.Message#getJMSMessageID()
+     */
+    @Override
+    public void setJMSMessageID(String id) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public long getJMSTimestamp() {
+        return numericHeader(JmsBaseConstant.JMS_TIMESTAMP, 0L);
+    }
+
+    @Override
+    public void setJMSTimestamp(long timestamp) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public byte[] getJMSCorrelationIDAsBytes() {
+        String jmsCorrelationID = getJMSCorrelationID();
+        if (jmsCorrelationID != null) {
+            try {
+                return BaseEncoding.base64().decode(jmsCorrelationID);
+            } catch (Exception e) {
+                // Not valid Base64: fall back to the raw bytes of the string (platform charset).
+                return jmsCorrelationID.getBytes();
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void setJMSCorrelationIDAsBytes(byte[] correlationID) {
+        String encodedText = BaseEncoding.base64().encode(correlationID);
+        setJMSCorrelationID(encodedText);
+    }
+
+    @Override
+    public String getJMSCorrelationID() {
+        if (headers.containsKey(JmsBaseConstant.JMS_CORRELATION_ID)) {
+            return (String) headers.get(JmsBaseConstant.JMS_CORRELATION_ID);
+        }
+        return null;
+    }
+
+    @Override
+    public void setJMSCorrelationID(String correlationID) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public Destination getJMSReplyTo() {
+        if (headers.containsKey(JmsBaseConstant.JMS_REPLY_TO)) {
+            return (Destination) headers.get(JmsBaseConstant.JMS_REPLY_TO);
+        }
+        return null;
+    }
+
+    @Override
+    public void setJMSReplyTo(Destination replyTo) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this);
+    }
+
+    @Override
+    public Destination getJMSDestination() {
+        if (headers.containsKey(JmsBaseConstant.JMS_DESTINATION)) {
+            return (Destination) headers.get(JmsBaseConstant.JMS_DESTINATION);
+        }
+        return null;
+    }
+
+    @Override
+    public void setJMSDestination(Destination destination) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> T getBody(Class<T> clazz) throws JMSException {
+        if (clazz.isInstance(body)) {
+            return (T) body;
+        }
+        else {
+            throw new IllegalArgumentException("The class " + clazz
+                + " is unknown to this implementation");
+        }
+    }
+
+    @Override
+    public int getJMSDeliveryMode() {
+        return (int) numericHeader(JmsBaseConstant.JMS_DELIVERY_MODE, 0);
+    }
+
+    /**
+     * Sets the <CODE>DeliveryMode</CODE> value for this message.
+     * <p/>
+     * <P>JMS providers set this field when a message is sent. ROCKETMQ only supports DeliveryMode.PERSISTENT mode, so
+     * users are not allowed to set it themselves, but the mode can be read through <CODE>getJMSDeliveryMode</CODE>.
+     *
+     * @param deliveryMode the delivery mode for this message
+     * @see javax.jms.Message#getJMSDeliveryMode()
+     * @see javax.jms.DeliveryMode
+     */
+    @Override
+    public void setJMSDeliveryMode(int deliveryMode) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public boolean isBodyAssignableTo(Class<?> clazz) throws JMSException {
+        return clazz.isInstance(body);
+    }
+
+    @Override
+    public boolean getJMSRedelivered() {
+        return headers.containsKey(JmsBaseConstant.JMS_REDELIVERED)
+            && (Boolean) headers.get(JmsBaseConstant.JMS_REDELIVERED);
+    }
+
+    @Override
+    public void setJMSRedelivered(boolean redelivered) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    /**
+     * copy meta data from source message
+     *
+     * @param sourceMessage source message
+     */
+    public void copyMetaData(JmsBaseMessage sourceMessage) {
+        if (!sourceMessage.getHeaders().isEmpty()) {
+            for (Map.Entry<String, Object> entry : sourceMessage.getHeaders().entrySet()) {
+                if (!headerExits(entry.getKey())) {
+                    setHeader(entry.getKey(), entry.getValue());
+                }
+            }
+        }
+        if (!sourceMessage.getProperties().isEmpty()) {
+            for (Map.Entry<String, Object> entry : sourceMessage.getProperties().entrySet()) {
+                if (!propertyExists(entry.getKey())) {
+                    setObjectProperty(entry.getKey(), entry.getValue());
+                }
+            }
+        }
+    }
+
+    @Override
+    public String getJMSType() {
+        return (String) headers.get(JmsBaseConstant.JMS_TYPE);
+    }
+
+    @Override
+    public void setJMSType(String type) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public Map<String, Object> getHeaders() {
+        return this.headers;
+    }
+
+    @Override
+    public long getJMSExpiration() {
+        return numericHeader(JmsBaseConstant.JMS_EXPIRATION, 0L);
+    }
+
+    @Override
+    public void setJMSExpiration(long expiration) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public boolean headerExits(String name) {
+        return this.headers.containsKey(name);
+    }
+
+    @Override
+    public int getJMSPriority() {
+        return (int) numericHeader(JmsBaseConstant.JMS_PRIORITY, 5);
+    }
+
+    @Override
+    public void setJMSPriority(int priority) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void setHeader(String name, Object value) {
+        this.headers.put(name, value);
+    }
+
+    public Map<String, Object> getProperties() {
+        return this.properties;
+    }
+
+    public void setProperties(Map<String, Object> properties) {
+        this.properties = properties;
+    }
+
+    @Override
+    public void acknowledge() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    @Override
+    public void clearProperties() {
+        this.properties.clear();
+    }
+
+    @Override
+    public void clearBody() {
+        this.body = null;
+    }
+
+    @Override
+    public boolean propertyExists(String name) {
+        return properties.containsKey(name);
+    }
+
+    @Override
+    public boolean getBooleanProperty(String name) throws JMSException {
+        Object value = getObjectProperty(name);
+        if (value == null) {
+            return false;
+        }
+        return value instanceof Boolean ? (Boolean) value : Boolean.valueOf(value.toString());
+    }
+
+    @Override
+    public byte getByteProperty(String name) throws JMSException {
+        Object value = getObjectProperty(name);
+        if (value == null) {
+            return 0;
+        }
+        return value instanceof Byte ? (Byte) value : Byte.valueOf(value.toString());
+    }
+
+    @Override
+    public short getShortProperty(String name) throws JMSException {
+        Object value = getObjectProperty(name);
+        if (value == null) {
+            return 0;
+        }
+        return value instanceof Short ? (Short) value : Short.valueOf(value.toString());
+    }
+
+    @Override
+    public int getIntProperty(String name) throws JMSException {
+        Object value = getObjectProperty(name);
+        if (value == null) {
+            return 0;
+        }
+        return value instanceof Integer ? (Integer) value : Integer.valueOf(value.toString());
+    }
+
+    @Override
+    public long getLongProperty(String name) throws JMSException {
+        Object value = getObjectProperty(name);
+        if (value == null) {
+            return 0L;
+        }
+        return value instanceof Long ? (Long) value : Long.valueOf(value.toString());
+    }
+
+    @Override
+    public float getFloatProperty(String name) throws JMSException {
+        Object value = getObjectProperty(name);
+        if (value == null) {
+            return 0f;
+        }
+        return value instanceof Float ? (Float) value : Float.valueOf(value.toString());
+    }
+
+    @Override
+    public double getDoubleProperty(String name) throws JMSException {
+        Object value = getObjectProperty(name);
+        if (value == null) {
+            return 0d;
+        }
+        return value instanceof Double ? (Double) value : Double.valueOf(value.toString());
+    }
+
+    @Override
+    public String getStringProperty(String name) throws JMSException {
+        // A key mapped to null (possible through setProperties) reads like an absent property instead of an NPE.
+        Object value = getObjectProperty(name);
+        return value == null ? null : value.toString();
+    }
+
+    @Override
+    public Object getObjectProperty(String name) throws JMSException {
+        return this.properties.get(name);
+    }
+
+    /**
+     * Returns the property names as a snapshot, so later changes to the properties do not affect the enumeration.
+     * Reading past the last name raises NoSuchElementException, as the Enumeration contract requires.
+     */
+    @Override
+    public Enumeration<?> getPropertyNames() throws JMSException {
+        // Fully qualified names keep this class's import list unchanged.
+        return java.util.Collections.enumeration(new java.util.ArrayList<String>(this.properties.keySet()));
+    }
+
+    @Override
+    public void setBooleanProperty(String name, boolean value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setByteProperty(String name, byte value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setShortProperty(String name, short value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setIntProperty(String name, int value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setLongProperty(String name, long value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setFloatProperty(String name, float value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setDoubleProperty(String name, double value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setStringProperty(String name, String value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setObjectProperty(String name, Object value) {
+        if (value instanceof Number || value instanceof String || value instanceof Boolean) {
+            this.properties.put(name, value);
+        }
+        else {
+            throw new IllegalArgumentException(
+                "Value should be boolean, byte, short, int, long, float, double, and String.");
+        }
+    }
+
+    /**
+     * Reads a numeric header that may be stored as a {@link Number} or as decimal text. MessageConverter copies
+     * RocketMQ string properties into the headers, so a direct cast to Long or Integer fails for received messages.
+     *
+     * @param name header key
+     * @param defaultValue value returned when the header is absent or mapped to null
+     * @return the header value as a long
+     * @throws NumberFormatException if the stored text is not a valid decimal number
+     */
+    private long numericHeader(String name, long defaultValue) {
+        Object value = headers.get(name);
+        if (value == null) {
+            return defaultValue;
+        }
+        // toString() covers String values as well as any other type that prints as a number.
+        return value instanceof Number ? ((Number) value).longValue() : Long.parseLong(value.toString().trim());
+    }
+
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java
new file mode 100644
index 0000000..b1e85b0
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+/**
+ * The <CODE>BytesMessage</CODE> methods are based largely on those found in <CODE>java.io.DataInputStream</CODE> and
+ * <CODE>java.io.DataOutputStream</CODE>. <P> Notice:Although the JMS API allows the use of message properties with byte
+ * messages, they are typically not used, since the inclusion of properties may affect the format. <P>
+ */
+public class JmsBytesMessage extends JmsBaseMessage implements BytesMessage {
+    private DataInputStream dataAsInput;
+    private DataOutputStream dataAsOutput;
+    private ByteArrayOutputStream bytesOut;
+    private byte[] bytesIn;
+
+    /**
+     * Message created for reading.
+     *
+     * @param data received payload; null is treated as an empty body
+     */
+    public JmsBytesMessage(byte[] data) {
+        this.bytesIn = data == null ? new byte[0] : data;
+        dataAsInput = new DataInputStream(new ByteArrayInputStream(this.bytesIn));
+    }
+
+    /**
+     * Message created to be sent
+     */
+    public JmsBytesMessage() {
+        bytesOut = new ByteArrayOutputStream();
+        dataAsOutput = new DataOutputStream(bytesOut);
+    }
+
+    public long getBodyLength() throws JMSException {
+        return getData().length;
+    }
+
+    /**
+     * @return a copy of the bytes written so far for an outgoing message, otherwise the received array itself
+     */
+    public byte[] getData() {
+        if (bytesOut != null) {
+            // Write mode: snapshot of the output buffer.
+            return bytesOut.toByteArray();
+        }
+        else {
+            return bytesIn;
+        }
+    }
+
+    public boolean readBoolean() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public byte readByte() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public int readUnsignedByte() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public short readShort() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public int readUnsignedShort() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public char readChar() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public int readInt() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public long readLong() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public float readFloat() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public double readDouble() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public String readUTF() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public int readBytes(byte[] value) throws JMSException {
+        return readBytes(value, value.length);
+    }
+
+    /**
+     * Reads up to {@code length} bytes of the body into {@code value}.
+     *
+     * @return number of bytes read, or -1 if bytes were requested but the end of the body was already reached
+     * @throws IndexOutOfBoundsException if {@code length} is negative or larger than {@code value.length}
+     */
+    public int readBytes(byte[] value, int length) throws JMSException {
+        // A negative length used to slip through and be reported as end-of-stream (-1).
+        if (length < 0 || length > value.length) {
+            throw new IndexOutOfBoundsException("length must be between 0 and the length of value");
+        }
+        if (dataAsInput == null) {
+            throw new MessageNotReadableException("Message is not readable! ");
+        }
+        try {
+            int offset = 0;
+            while (offset < length) {
+                int read = dataAsInput.read(value, offset, length - offset);
+                if (read < 0) {
+                    break;
+                }
+                offset += read;
+            }
+            // Nothing read although bytes were requested: the stream is exhausted.
+            return offset == 0 && length != 0 ? -1 : offset;
+        } catch (IOException e) {
+            throw handleInputException(e);
+        }
+    }
+
+    public void writeBoolean(boolean value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeByte(byte value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeShort(short value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeChar(char value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeInt(int value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeLong(long value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeFloat(float value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeDouble(double value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeUTF(String value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeBytes(byte[] value) throws JMSException {
+        if (dataAsOutput == null) {
+            throw new MessageNotWriteableException("Message is not writable! ");
+        }
+        try {
+            dataAsOutput.write(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
+        if (dataAsOutput == null) {
+            throw new MessageNotWriteableException("Message is not writable! ");
+        }
+        try {
+            dataAsOutput.write(value, offset, length);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeObject(Object value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void reset() throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    private JMSException handleOutputException(final IOException e) {
+        JMSException ex = new JMSException(e.getMessage());
+        ex.initCause(e);
+        ex.setLinkedException(e);
+        return ex;
+    }
+
+    private JMSException handleInputException(final IOException e) {
+        JMSException ex;
+        if (e instanceof EOFException) {
+            ex = new MessageEOFException(e.getMessage());
+        }
+        else {
+            ex = new MessageFormatException(e.getMessage());
+        }
+        ex.initCause(e);
+        ex.setLinkedException(e);
+        return ex;
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java
new file mode 100644
index 0000000..f67da14
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import java.io.Serializable;
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+
+public class JmsObjectMessage extends JmsBaseMessage implements ObjectMessage {
+    /** Creates an object message with no payload. */
+    public JmsObjectMessage() {
+    }
+
+    /** Creates an object message whose body is {@code object}. */
+    public JmsObjectMessage(Serializable object) {
+        body = object;
+    }
+
+    public Serializable getObject() throws JMSException {
+        return body;
+    }
+
+    public void setObject(Serializable object) throws JMSException {
+        body = object;
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java
new file mode 100644
index 0000000..ce19b51
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+public class JmsTextMessage extends JmsBaseMessage implements TextMessage {
+    /** Text payload; setText stores the same value in the inherited body field. */
+    private String text;
+
+    public JmsTextMessage() {
+    }
+
+    public JmsTextMessage(String text) {
+        setText(text);
+    }
+
+    public void clearBody() {
+        super.clearBody();
+        text = null;
+    }
+
+    public String getText() throws JMSException {
+        return text;
+    }
+
+    /** Stores {@code text} both in this class and as the base message body. */
+    public void setText(String text) {
+        this.text = text;
+        this.body = text;
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java
new file mode 100644
index 0000000..bd926e5
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import com.google.common.base.Preconditions;
+import javax.jms.JMSException;
+
+public class ExceptionUtil {
+    /** True when the JVM runs with {@code -Dskip.set.exception=true}; unsupported calls then return silently. */
+    public static final boolean SKIP_SET_EXCEPTION
+        = Boolean.getBoolean("skip.set.exception");
+
+    /** Rejects an unsupported operation unless {@link #SKIP_SET_EXCEPTION} is set. */
+    public static void handleUnSupportedException() {
+        if (SKIP_SET_EXCEPTION) { return; }
+        throw new UnsupportedOperationException("Operation unsupported! If you want to skip this Exception,"
+            + " use '-Dskip.set.exception=true' in JVM options.");
+    }
+
+    /** Wraps {@code e} in a {@link JMSException} whose message is {@code extra}; both arguments must be non-null. */
+    public static JMSException convertToJmsException(Exception e, String extra) {
+        JMSException wrapped = new JMSException(Preconditions.checkNotNull(extra));
+        wrapped.initCause(Preconditions.checkNotNull(e));
+        return wrapped;
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java
new file mode 100644
index 0000000..3cf03f9
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import javax.jms.BytesMessage;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.jms.domain.JmsBaseConstant;
+import org.apache.rocketmq.jms.domain.JmsBaseTopic;
+import org.apache.rocketmq.jms.domain.message.JmsBaseMessage;
+import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
+import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+
+import static org.apache.rocketmq.jms.domain.JmsBaseMessageProducer.initRocketMQHeaders;
+
+public class MessageConverter {
+    /**
+     * Extracts the payload bytes of a JMS message that is about to be sent.
+     * @param jmsMessage text, object or bytes message; a bytes message must be a {@link JmsBytesMessage}
+     * @return body bytes produced by MsgConvertUtil for text/object messages, or the bytes message's data
+     * @throws IllegalArgumentException if the body is empty or the message type is not supported
+     */
+    public static byte[] getContentFromJms(javax.jms.Message jmsMessage) throws Exception {
+        byte[] content;
+        if (jmsMessage instanceof TextMessage) {
+            String text = ((TextMessage) jmsMessage).getText();
+            if (StringUtils.isEmpty(text)) {
+                throw new IllegalArgumentException("Message body length is zero");
+            }
+            content = MsgConvertUtil.string2Bytes(text, Charsets.UTF_8.toString());
+        } else if (jmsMessage instanceof ObjectMessage) {
+            Object payload = ((ObjectMessage) jmsMessage).getObject();
+            if (payload == null) {
+                throw new IllegalArgumentException("Message body length is zero");
+            }
+            content = MsgConvertUtil.objectSerialize(payload);
+        } else if (jmsMessage instanceof BytesMessage) {
+            JmsBytesMessage bytesMessage = (JmsBytesMessage) jmsMessage;
+            if (bytesMessage.getBodyLength() == 0) {
+                throw new IllegalArgumentException("Message body length is zero");
+            }
+            content = bytesMessage.getData();
+        } else {
+            throw new IllegalArgumentException("Unknown message type " + jmsMessage.getJMSType());
+        }
+        return content;
+    }
+
+    /**
+     * Builds the JMS representation of a message received from RocketMQ.
+     * <p>
+     * The body type follows the JMS_MSGMODEL user property. Known JMS header properties become headers
+     * (numeric ones as Integer/Long so the typed getters of {@link JmsBaseMessage} can read them); everything
+     * else becomes a message property.
+     *
+     * @param msg message delivered by the RocketMQ consumer
+     * @return the converted message
+     */
+    public static JmsBaseMessage convert2JMSMessage(MessageExt msg) throws Exception {
+        String model = msg.getUserProperty(MsgConvertUtil.JMS_MSGMODEL);
+        JmsBaseMessage message;
+        if (MsgConvertUtil.MSGMODEL_OBJ.equals(model)) {
+            message = new JmsObjectMessage(MsgConvertUtil.objectDeserialize(msg.getBody()));
+        } else if (MsgConvertUtil.MSGMODEL_TEXT.equals(model)) {
+            message = new JmsTextMessage(MsgConvertUtil.bytes2String(msg.getBody(), Charsets.UTF_8.toString()));
+        } else {
+            // Explicit bytesMessage, or a plain RocketMQ producer that sends without setting JMS_MSGMODEL.
+            message = new JmsBytesMessage(msg.getBody());
+        }
+        //-------------------------set headers-------------------------
+        message.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:" + msg.getMsgId());
+        message.setHeader(JmsBaseConstant.JMS_REDELIVERED, Boolean.valueOf(msg.getReconsumeTimes() > 0));
+        Map<String, Object> properties = new HashMap<String, Object>();
+        Map<String, String> propertiesMap = msg.getProperties();
+        if (propertiesMap != null) {
+            for (Map.Entry<String, String> entry : propertiesMap.entrySet()) {
+                String properName = entry.getKey();
+                String properValue = entry.getValue();
+                if (JmsBaseConstant.JMS_DESTINATION.equals(properName)) {
+                    if (null != properValue) {
+                        // A value without a ':' separator used to fail with an out-of-bounds error; skip it instead.
+                        List<String> msgTuple = Arrays.asList(properValue.split(":"));
+                        if (msgTuple.size() >= 2) {
+                            message.setHeader(JmsBaseConstant.JMS_DESTINATION,
+                                new JmsBaseTopic(msgTuple.get(0), msgTuple.get(1)));
+                        }
+                    }
+                } else if (JmsBaseConstant.JMS_DELIVERY_MODE.equals(properName)
+                    || JmsBaseConstant.JMS_PRIORITY.equals(properName)) {
+                    message.setHeader(properName, parseNumericHeader(properValue, false));
+                } else if (JmsBaseConstant.JMS_TIMESTAMP.equals(properName)
+                    || JmsBaseConstant.JMS_EXPIRATION.equals(properName)) {
+                    message.setHeader(properName, parseNumericHeader(properValue, true));
+                } else if (JmsBaseConstant.JMS_CORRELATION_ID.equals(properName)
+                    || JmsBaseConstant.JMS_TYPE.equals(properName)) {
+                    message.setHeader(properName, properValue);
+                } else if (!JmsBaseConstant.JMS_MESSAGE_ID.equals(properName)
+                    && !JmsBaseConstant.JMS_REDELIVERED.equals(properName)) {
+                    // JMS_MESSAGE_ID and JMS_REDELIVERED were already derived from msg itself above.
+                    properties.put(properName, properValue);
+                }
+            }
+        }
+        message.setProperties(properties);
+        return message;
+    }
+
+    /**
+     * Builds the RocketMQ message for a JMS message that carries a JmsBaseTopic in its JMS_DESTINATION header.
+     */
+    public static Message convert2RMQMessage(JmsBaseMessage jmsMsg) throws Exception {
+        Message rocketmqMsg = new MessageExt();
+        // 1. Transform message body
+        rocketmqMsg.setBody(MessageConverter.getContentFromJms(jmsMsg));
+
+        // 2. Transform topic and messageType
+        JmsBaseTopic destination = (JmsBaseTopic) jmsMsg.getHeaders().get(JmsBaseConstant.JMS_DESTINATION);
+        Preconditions.checkNotNull(destination, "JMS_DESTINATION header must be set before sending a message");
+        String topic = destination.getMessageTopic();
+        rocketmqMsg.setTopic(topic);
+        String messageType = destination.getMessageType();
+        Preconditions.checkState(!messageType.contains("||"),
+            "'||' can not be in the destination when sending a message");
+        rocketmqMsg.setTags(messageType);
+
+        // 3. Transform message properties
+        Properties properties = initRocketMQHeaders(jmsMsg, topic, messageType);
+        for (String name : properties.stringPropertyNames()) {
+            String value = properties.getProperty(name);
+            if (MessageConst.PROPERTY_KEYS.equals(name)) {
+                rocketmqMsg.setKeys(value);
+            } else if (MessageConst.PROPERTY_TAGS.equals(name)) {
+                rocketmqMsg.setTags(value);
+            } else if (MessageConst.PROPERTY_DELAY_TIME_LEVEL.equals(name)) {
+                rocketmqMsg.setDelayTimeLevel(Integer.parseInt(value));
+            } else if (MessageConst.PROPERTY_WAIT_STORE_MSG_OK.equals(name)) {
+                rocketmqMsg.setWaitStoreMsgOK(Boolean.parseBoolean(value));
+            } else if (MessageConst.PROPERTY_BUYER_ID.equals(name)) {
+                rocketmqMsg.setBuyerId(value);
+            } else {
+                rocketmqMsg.putUserProperty(name, value);
+            }
+        }
+        return rocketmqMsg;
+    }
+
+    /** Parses a numeric header as Long or Integer; text that is not a valid number (or null) is kept unchanged. */
+    private static Object parseNumericHeader(String text, boolean asLong) {
+        try {
+            return asLong ? (Object) Long.valueOf(text) : (Object) Integer.valueOf(text);
+        } catch (NumberFormatException e) {
+            return text;
+        }
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java
new file mode 100644
index 0000000..ec55bbc
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/** Constants plus Java-serialization and charset helpers used when converting message bodies. */
+public class MsgConvertUtil {
+
+    public static final byte[] EMPTY_BYTES = new byte[0];
+    public static final String EMPTY_STRING = "";
+
+    public static final String JMS_MSGMODEL = "jmsMsgModel";
+    /**
+     * Compatibility field for the scenario "Notify client tries to receive an ObjectMessage sent by a JMS client":
+     * sets the notify-out message model; the value can be textMessage or objectMessage.
+     */
+    public static final String COMPATIBLE_FIELD_MSGMODEL = "notifyOutMsgModel";
+
+    public static final String MSGMODEL_TEXT = "textMessage";
+    public static final String MSGMODEL_BYTES = "bytesMessage";
+    public static final String MSGMODEL_OBJ = "objectMessage";
+
+    public static final String MSG_TOPIC = "msgTopic";
+    public static final String MSG_TYPE = "msgType";
+
+    /** Serializes {@code object} with Java serialization and returns the resulting bytes. */
+    public static byte[] objectSerialize(Object object) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(object);
+        oos.close(); // flushes buffered data into baos; closing baos itself has no effect, so it is omitted
+        return baos.toByteArray();
+    }
+
+    /** Deserializes one object from {@code bytes}; Java deserialization is only safe for trusted input. */
+    public static Serializable objectDeserialize(byte[] bytes) throws IOException, ClassNotFoundException {
+        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
+        try {
+            // Read BEFORE closing: previously both streams were closed first and then read from.
+            return (Serializable) ois.readObject();
+        } finally {
+            ois.close();
+        }
+    }
+
+    /** Encodes {@code s} with {@code charset}; a null string gives EMPTY_BYTES, a failed encoding gives null. */
+    public static final byte[] string2Bytes(String s, String charset) {
+        if (null == s) {
+            return EMPTY_BYTES;
+        }
+        try {
+            return s.getBytes(charset);
+        } catch (Exception e) {
+            return null; // deliberate best-effort kept from the original: unknown or null charset yields null
+        }
+    }
+
+    /** Decodes {@code bs} with {@code charset}; a null array gives EMPTY_STRING, a failed decoding gives null. */
+    public static final String bytes2String(byte[] bs, String charset) {
+        if (null == bs) {
+            return EMPTY_STRING;
+        }
+        try {
+            return new String(bs, charset);
+        } catch (Exception e) {
+            return null; // deliberate best-effort kept from the original, mirrors string2Bytes
+        }
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java
new file mode 100644
index 0000000..9b29928
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.jms.domain.CommonConstant;
+
+public abstract class URISpecParser {
+
+    private static final String DEFAULT_BROKER = "rocketmq";
+
+    /**
+     * Parses a connection URI of the form broker://ip:port?key1=value1&key2=value2.
+     *
+     * @param uri connection URI; the part before the first ':' must be "rocketmq", the query string is optional
+     * @return map holding the provider (the broker scheme) plus every key=value pair of the query string
+     * @throws IllegalArgumentException if uri is blank, contains no ':' or the broker is not "rocketmq"
+     */
+    public static Map<String, String> parseURI(String uri) {
+        Preconditions.checkArgument(null != uri && !uri.trim().isEmpty(), "Uri can not be empty!");
+        int schemeEnd = uri.indexOf(":");
+        // Without this guard substring(0, -1) would throw StringIndexOutOfBoundsException.
+        Preconditions.checkArgument(schemeEnd >= 0, "Uri must look like broker://ip:port?key=value");
+        String broker = uri.substring(0, schemeEnd);
+        if (!broker.equals(DEFAULT_BROKER)) {
+            throw new IllegalArgumentException("Broker must be rocketmq");
+        }
+        Map<String, String> results = Maps.newHashMap();
+        results.put(CommonConstant.PROVIDER, broker);
+        int queryStart = uri.indexOf("?");
+        // No '?' means no parameters; previously the whole URI was scanned as if it were the query string.
+        String queryStr = queryStart < 0 ? "" : uri.substring(queryStart + 1);
+        if (StringUtils.isNotEmpty(queryStr)) {
+            for (String param : queryStr.split("&")) {
+                if (param.contains("=")) {
+                    String[] values = param.split("=", 2);
+                    results.put(values[0], values[1]);
+                }
+            }
+        }
+        return results;
+    }
+}
diff --git a/rocketmq-jms/core/src/main/resources/application.conf b/rocketmq-jms/core/src/main/resources/application.conf
new file mode 100644
index 0000000..713c915
--- /dev/null
+++ b/rocketmq-jms/core/src/main/resources/application.conf
@@ -0,0 +1 @@
+version = ${project.version}
\ No newline at end of file
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java
new file mode 100644
index 0000000..d77b13e
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import org.junit.Assert;
+
+public class JmsTestListener implements MessageListener {
+    // Counts consumed messages and counts down an optional latch when the expected number is reached.
+    private int expectd;
+    private CountDownLatch latch;
+    private AtomicInteger consumedNum = new AtomicInteger(0);
+
+    public JmsTestListener() {
+        this(10);
+    }
+    public JmsTestListener(int expectd) {
+        this(expectd, null);
+    }
+    public JmsTestListener(int expected, CountDownLatch latch) {
+        this.expectd = expected;
+        this.latch = latch;
+    }
+    /** Asserts that the message and its JMS message id are present, then updates the counter and latch. */
+    @Override
+    public void onMessage(Message message) {
+        try {
+            Assert.assertNotNull(message);
+            Assert.assertNotNull(message.getJMSMessageID());
+            boolean reachedTarget = consumedNum.incrementAndGet() == expectd;
+            if (reachedTarget && latch != null) {
+                latch.countDown();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public int getConsumedNum() {
+        return consumedNum.get();
+    }
+
+    public void setLatch(CountDownLatch latch) {
+        this.latch = latch;
+    }
+
+    public void setExpectd(int expectd) {
+        this.expectd = expectd;
+    }
+}
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java
new file mode 100644
index 0000000..855cb19
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.jms.domain.JmsBaseMessageConsumer;
+import org.apache.rocketmq.jms.domain.JmsBaseMessageProducer;
+import org.apache.rocketmq.jms.domain.RMQPushConsumerExt;
+import org.junit.Assert;
+
+public class JmsTestUtil {
+    /** Reflectively reads the static "producerMap" of JmsBaseMessageProducer and returns the entry for producerId. */
+    public static MQProducer getMQProducer(String producerId) throws Exception {
+        Assert.assertNotNull(producerId);
+        Field registryField = JmsBaseMessageProducer.class.getDeclaredField("producerMap");
+        registryField.setAccessible(true);
+        Object registry = registryField.get(null);
+        return ((ConcurrentMap<String, MQProducer>) registry).get(producerId);
+    }
+    /** Reflectively reads the static "consumerMap" of JmsBaseMessageConsumer and returns the entry for consumerId. */
+    public static RMQPushConsumerExt getRMQPushConsumerExt(String consumerId) throws Exception {
+        Assert.assertNotNull(consumerId);
+        Field registryField = JmsBaseMessageConsumer.class.getDeclaredField("consumerMap");
+        registryField.setAccessible(true);
+        Object registry = registryField.get(null);
+        return ((ConcurrentMap<String, RMQPushConsumerExt>) registry).get(consumerId);
+    }
+    public static void checkConsumerState(String consumerId, boolean isNull, boolean isStarted) throws Exception {
+        RMQPushConsumerExt found = getRMQPushConsumerExt(consumerId);
+        if (isNull) {
+            Assert.assertNull(found);
+            return;
+        }
+        Assert.assertNotNull(found);
+        Assert.assertEquals(isStarted, found.isStarted());
+    }
+}
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java
new file mode 100644
index 0000000..9fe9f5e
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JmsBytesMessageTest {
+
+    private byte[] receiveData = "receive data test".getBytes();
+    private byte[] sendData = "send data test".getBytes();
+
+    @Test
+    public void testGetData() throws Exception {
+        JmsBytesMessage readMessage = new JmsBytesMessage(receiveData);
+
+        // A message built from received bytes exposes exactly those bytes.
+        Assert.assertEquals(new String(receiveData), new String(readMessage.getData()));
+
+        JmsBytesMessage sendMessage = new JmsBytesMessage();
+        sendMessage.writeBytes(sendData, 0, sendData.length);
+
+        // A message filled through writeBytes exposes what was written.
+        Assert.assertEquals(new String(sendData), new String(sendMessage.getData()));
+
+    }
+
+    @Test
+    public void testGetBodyLength() throws Exception {
+
+        JmsBytesMessage bytesMessage = new JmsBytesMessage(receiveData);
+
+        // Expected value goes first so that JUnit failure messages read correctly.
+        Assert.assertEquals(receiveData.length, bytesMessage.getBodyLength());
+    }
+
+    @Test
+    public void testReadBytes() throws Exception {
+        JmsBytesMessage bytesMessage = new JmsBytesMessage(receiveData);
+
+        Assert.assertEquals(receiveData.length, bytesMessage.getBodyLength());
+        byte[] receiveValue = new byte[receiveData.length];
+        bytesMessage.readBytes(receiveValue);
+
+        // Reading into a buffer of body length yields the complete body.
+        Assert.assertEquals(new String(receiveData), new String(receiveValue));
+
+    }
+
+    @Test
+    public void testReadBytes1() throws Exception {
+        JmsBytesMessage bytesMessage = new JmsBytesMessage(receiveData);
+
+        byte[] receiveValue1 = new byte[2];
+        bytesMessage.readBytes(receiveValue1, 2);
+        // The first partial read returns the first two bytes.
+        Assert.assertEquals(new String(receiveData).substring(0, 2), new String(receiveValue1));
+
+        byte[] receiveValue2 = new byte[2];
+        bytesMessage.readBytes(receiveValue2, 2);
+        // The second partial read continues where the first one stopped.
+        Assert.assertEquals(new String(receiveData).substring(2, 4), new String(receiveValue2));
+
+    }
+
+    @Test
+    public void testWriteBytes() throws Exception {
+        JmsBytesMessage jmsBytesMessage = new JmsBytesMessage();
+        jmsBytesMessage.writeBytes(sendData);
+
+        // writeBytes(byte[]) stores the whole array.
+        Assert.assertEquals(new String(sendData), new String(jmsBytesMessage.getData()));
+
+    }
+
+    @Test
+    public void testException() throws Exception {
+        JmsBytesMessage jmsBytesMessage = new JmsBytesMessage();
+
+        byte[] receiveValue = new byte[receiveData.length];
+        // Disabled: noted by the author to throw NullPointerException. TODO(review): assert the expected exception.
+//        jmsBytesMessage.readBytes(receiveValue);
+
+        JmsBytesMessage sendMessage = new JmsBytesMessage(sendData);
+        // Disabled: noted by the author to throw NullPointerException. TODO(review): assert the expected exception.
+//        sendMessage.writeBytes("hello again".getBytes());
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java
new file mode 100644
index 0000000..b570142
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java
@@ -0,0 +1,52 @@
+package org.apache.rocketmq.jms.domain.message;
+
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.domain.JmsBaseConstant;
+import org.apache.rocketmq.jms.domain.JmsBaseTopic;
+import org.apache.rocketmq.jms.util.MessageConverter;
+import org.apache.rocketmq.jms.util.MsgConvertUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JmsMessageConvertTest {
+    @Test
+    public void testCovert2RMQ() throws Exception {
+        // Build the JMS message that will be converted.
+        String topicName = "TestTopic";
+        String tag = "TagA";
+
+        JmsBaseMessage outbound = new JmsTextMessage("testText");
+        outbound.setHeader(JmsBaseConstant.JMS_DESTINATION, new JmsBaseTopic(topicName, tag));
+        outbound.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:null");
+        outbound.setHeader(JmsBaseConstant.JMS_REDELIVERED, Boolean.FALSE);
+
+        outbound.setObjectProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_TEXT);
+        outbound.setObjectProperty(MsgConvertUtil.MSG_TOPIC, topicName);
+        outbound.setObjectProperty(MsgConvertUtil.MSG_TYPE, tag);
+        outbound.setObjectProperty(MessageConst.PROPERTY_TAGS, tag);
+        outbound.setObjectProperty(MessageConst.PROPERTY_KEYS, tag);
+
+        // JMS -> RocketMQ
+        MessageExt rmqMessage = (MessageExt)MessageConverter.convert2RMQMessage(outbound);
+
+        System.out.println(rmqMessage);
+
+        // RocketMQ -> JMS again
+        JmsBaseMessage inbound = MessageConverter.convert2JMSMessage(rmqMessage);
+
+        JmsTextMessage sent = (JmsTextMessage) outbound;
+        JmsTextMessage received = (JmsTextMessage) inbound;
+        // NOTE(review): the keys looped over below were stored with setObjectProperty but are read via getHeaders() - confirm intended.
+        Assert.assertEquals(sent.getText(), received.getText());
+        Assert.assertEquals(sent.getJMSDestination().toString(), received.getJMSDestination().toString());
+        Assert.assertEquals(sent.getJMSMessageID(), received.getJMSMessageID());
+        Assert.assertEquals(sent.getJMSRedelivered(), received.getJMSRedelivered());
+        String[] comparedKeys = {MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSG_TOPIC, MsgConvertUtil.MSG_TYPE,
+            MessageConst.PROPERTY_TAGS, MessageConst.PROPERTY_KEYS};
+        for (String key : comparedKeys) {
+            Assert.assertEquals(sent.getHeaders().get(key), received.getHeaders().get(key));
+        }
+
+    }
+}
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java
new file mode 100644
index 0000000..6951976
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import java.io.Serializable;
+import javax.jms.JMSException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JmsObjectMessageTest {
+
+    /** A JmsObjectMessage must hand back an object equal to the one it was built from. */
+    @Test
+    public void testGetObject() throws JMSException {
+        // JMSException is propagated: catching and printing it would let a failing getObject() pass silently.
+        JmsObjectMessage jmsObjectMessage = new JmsObjectMessage(new User("jack", 20));
+        Assert.assertEquals(new User("jack", 20), jmsObjectMessage.getObject());
+    }
+
+    /** getBody(Object.class) must expose the same payload as getObject(). */
+    @Test
+    public void testGetBody() throws JMSException {
+        JmsObjectMessage jmsObjectMessage = new JmsObjectMessage(new User("jack", 20));
+        User user = (User)jmsObjectMessage.getBody(Object.class);
+        Assert.assertEquals("jack", user.getName());
+        Assert.assertEquals(20, user.getAge());
+        Assert.assertEquals(jmsObjectMessage.getObject(), jmsObjectMessage.getBody(Object.class));
+    }
+
+    /**
+     * Serializable payload fixture. Declared static so that it carries no hidden reference to the
+     * enclosing test instance, which is not Serializable and would break Java serialization.
+     */
+    private static class User implements Serializable {
+        private String name;
+        private int age;
+
+        private User(String name, int age) {
+            this.name = name;
+            this.age = age;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            User other = (User) obj;
+            return age == other.age && (name != null ? name.equals(other.name) : other.name == null);
+        }
+
+        /** Kept consistent with equals(): equal users must share a hash code. */
+        @Override
+        public int hashCode() {
+            return 31 * (name != null ? name.hashCode() : 0) + age;
+        }
+
+        public int getAge() {
+            return age;
+        }
+
+        public void setAge(int age) {
+            this.age = age;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java
new file mode 100644
index 0000000..d3c8287
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import javax.jms.JMSException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JmsTextMessageTest {
+    private String text = "jmsTextMessage test";
+
+    /**
+     * getBody(String.class) must return the text the message was constructed with.
+     * JMSException is propagated so that a failing call fails the test instead of being printed and ignored.
+     */
+    @Test
+    public void testGetBody() throws JMSException {
+        JmsTextMessage jmsTextMessage = new JmsTextMessage(text);
+        // Expected value first, so that JUnit failure messages read correctly.
+        Assert.assertEquals(text, jmsTextMessage.getBody(String.class));
+    }
+
+    /**
+     * getText() must return exactly what setText() stored.
+     * JMSException is propagated so that a failing call fails the test instead of being printed and ignored.
+     */
+    @Test
+    public void testSetGetText() throws JMSException {
+        JmsTextMessage jmsTextMessage = new JmsTextMessage();
+        jmsTextMessage.setText(text);
+        // Expected value first, so that JUnit failure messages read correctly.
+        Assert.assertEquals(text, jmsTextMessage.getText());
+    }
+
+}
\ No newline at end of file
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java
new file mode 100644
index 0000000..02fe111
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.jms.domain.CommonConstant;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IntegrationTestBase {
+    public static Logger logger = LoggerFactory.getLogger(IntegrationTestBase.class);
+
+    protected static Random random = new Random();
+    protected static final String SEP = File.separator;
+
+    // Shared fixture values used by the integration tests.
+    protected static String topic = "jms-test";
+    protected static String topic2 = "jms-test-2";
+    protected static String messageType = "TagA";
+    protected static String producerId = "PID-jms-test";
+    protected static String consumerId = "CID-jms-test";
+    protected static String consumerId2 = "CID-jms-test-2";
+    protected static String nameServer;
+    protected static String text = "English test";
+    protected static int consumeThreadNums = 16;
+
+    // Everything registered in the lists below is shut down or deleted by the JVM shutdown hook
+    // installed in the static initializer of this class.
+
+    protected static final String BROKER_NAME_PREFIX = "TestBrokerName_";
+    protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0);
+    protected static final List<File> TMPE_FILES = new ArrayList<File>();
+    protected static final List<BrokerController> BROKER_CONTROLLERS = new ArrayList<BrokerController>();
+    protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new ArrayList<NamesrvController>();
+
+    /** Picks a store directory path under user.home that does not exist yet and registers it for cleanup. */
+    private static String createBaseDir() {
+        String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID();
+        final File file = new File(baseDir);
+        if (file.exists()) {
+            System.out.println(String.format("[%s] has already existed, please bake up and remove it for integration tests", baseDir));
+            System.exit(1);
+        }
+        TMPE_FILES.add(file);
+        return baseDir;
+    }
+
+    /** Starts a name server on a random port in [9000, 10000); exits the JVM when startup throws. */
+    public static NamesrvController createAndStartNamesrv() {
+        String baseDir = createBaseDir();
+        NamesrvConfig namesrvConfig = new NamesrvConfig();
+        NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig();
+        namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + "kvConfig.json");
+        nameServerNettyServerConfig.setListenPort(9000 + random.nextInt(1000));
+        NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig);
+        try {
+            Assert.assertTrue(namesrvController.initialize());
+            logger.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort());
+            namesrvController.start();
+        } catch (Exception e) {
+            logger.error("Name Server start failed", e); // keep the cause; it used to be discarded
+            System.out.println("Name Server start failed");
+            System.exit(1);
+        }
+        NAMESRV_CONTROLLERS.add(namesrvController);
+        return namesrvController;
+    }
+
+    /** Starts a broker on random ports that registers with the name server at nsAddr; exits the JVM when startup throws. */
+    public static BrokerController createAndStartBroker(String nsAddr) {
+        String baseDir = createBaseDir();
+        BrokerConfig brokerConfig = new BrokerConfig();
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        MessageStoreConfig storeConfig = new MessageStoreConfig();
+        brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
+        brokerConfig.setBrokerIP1("127.0.0.1");
+        brokerConfig.setNamesrvAddr(nsAddr);
+        storeConfig.setStorePathRootDir(baseDir);
+        storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
+        storeConfig.setHaListenPort(8000 + random.nextInt(1000));
+        nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
+        BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
+        try {
+            Assert.assertTrue(brokerController.initialize());
+            logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
+            brokerController.start();
+        } catch (Exception e) {
+            logger.error("Broker start failed", e); // keep the cause; it used to be discarded
+            System.out.println("Broker start failed");
+            System.exit(1);
+        }
+        BROKER_CONTROLLERS.add(brokerController);
+        return brokerController;
+    }
+
+    // Admin client used by createTopic(); created and started in the static initializer below.
+    protected static DefaultMQAdminExt defaultMQAdminExt;
+
+    static {
+        // Clean up the environment when the JVM exits.
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override public void run() {
+                if (defaultMQAdminExt != null) {
+                    defaultMQAdminExt.shutdown();
+                }
+                for (NamesrvController namesrvController: NAMESRV_CONTROLLERS) {
+                    if (namesrvController != null) {
+                        namesrvController.shutdown();
+                    }
+                }
+                for (BrokerController brokerController: BROKER_CONTROLLERS) {
+                    if (brokerController != null) {
+                        brokerController.shutdown();
+                    }
+                }
+                for (File file : TMPE_FILES) {
+                    deleteFile(file);
+                }
+            }
+        });
+
+        // Bring up one name server and one broker, then an admin client to create the test topic.
+        NamesrvController namesrvController = IntegrationTestBase.createAndStartNamesrv();
+        nameServer = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();
+        BrokerController brokerController = createAndStartBroker(nameServer);
+
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExt.setNamesrvAddr(nameServer);
+        try {
+            defaultMQAdminExt.start();
+        } catch (MQClientException e) {
+            logger.error("DefaultMQAdminExt start failed", e); // keep the cause; it used to be discarded
+            System.out.println("DefaultMQAdminExt start failed");
+            System.exit(1);
+        }
+
+        createTopic(topic, brokerController.getBrokerAddr());
+    }
+
+    /** Recursively deletes file (a regular file or a directory tree); paths that do not exist are ignored. */
+    public static void deleteFile(File file) {
+        if (!file.exists()) {
+            return;
+        }
+        File[] children = file.isDirectory() ? file.listFiles() : null;
+        if (children != null) { // listFiles() returns null when the directory cannot be read
+            for (File child : children) {
+                deleteFile(child);
+            }
+        }
+        if (file.isFile() || file.isDirectory()) {
+            file.delete();
+        }
+    }
+    /** Creates or updates topic with 4 read and 4 write queues on the broker at addr; failures are only logged. */
+    public static void createTopic(String topic, String addr) {
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName(topic);
+        topicConfig.setReadQueueNums(4);
+        topicConfig.setWriteQueueNums(4);
+        try {
+            defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+        } catch (Exception e) {
+            logger.error("Create topic:{} addr:{} failed", topic, addr, e); // args now match placeholder order
+        }
+    }
+}
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java
new file mode 100644
index 0000000..367700a
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.jms.JmsTestListener;
+import org.apache.rocketmq.jms.JmsTestUtil;
+import org.apache.rocketmq.jms.domain.CommonConstant;
+import org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.rocketmq.jms.JmsTestUtil.getRMQPushConsumerExt;
+
+public class JmsClientIT extends IntegrationTestBase {
+
+    /**
+     * Checks that every option carried in the connection URI is applied to the underlying
+     * RocketMQ push consumer and producer.
+     */
+    @Test
+    public void testConfigInURI() throws Exception {
+        Connection connection = createConnection(producerId, consumerId);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        connection.start();
+        try {
+            Destination destination = session.createTopic(topic + ":" + messageType);
+            session.createConsumer(destination);
+            session.createProducer(destination);
+
+            DefaultMQPushConsumer rmqPushConsumer = (DefaultMQPushConsumer) getRMQPushConsumerExt(consumerId).getConsumer();
+            Assert.assertNotNull(rmqPushConsumer);
+            Assert.assertEquals(consumerId, rmqPushConsumer.getConsumerGroup());
+            Assert.assertEquals("JMS_TEST", rmqPushConsumer.getInstanceName());
+            Assert.assertEquals(consumeThreadNums, rmqPushConsumer.getConsumeThreadMax());
+            Assert.assertEquals(consumeThreadNums, rmqPushConsumer.getConsumeThreadMin());
+            Assert.assertEquals(nameServer, rmqPushConsumer.getNamesrvAddr());
+
+            DefaultMQProducer mqProducer = (DefaultMQProducer) JmsTestUtil.getMQProducer(producerId);
+            Assert.assertNotNull(mqProducer);
+            Assert.assertEquals(producerId, mqProducer.getProducerGroup());
+            Assert.assertEquals("JMS_TEST", mqProducer.getInstanceName());
+            Assert.assertEquals(10 * 1000, mqProducer.getSendMsgTimeout());
+            Assert.assertEquals(nameServer, mqProducer.getNamesrvAddr());
+
+            Thread.sleep(2000);
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Opens a connection whose URI carries the given producer and consumer groups together with
+     * the shared name server, consume-thread count, send timeout (10 s) and instance name.
+     */
+    private Connection createConnection(String producerGroup, String consumerGroup) throws Exception {
+        JmsBaseConnectionFactory connectionFactory = new JmsBaseConnectionFactory(new
+            URI(String.format("rocketmq://xxx?%s=%s&%s=%s&%s=%s&%s=%s&%s=%s&%s=%s",
+            CommonConstant.PRODUCERID, producerGroup,
+            CommonConstant.CONSUMERID, consumerGroup,
+            CommonConstant.NAMESERVER, nameServer,
+            CommonConstant.CONSUME_THREAD_NUMS, consumeThreadNums,
+            CommonConstant.SEND_TIMEOUT_MILLIS, 10 * 1000,
+            CommonConstant.INSTANCE_NAME, "JMS_TEST")));
+        return connectionFactory.createConnection();
+    }
+
+    /**
+     * Two consumers on one session, each on its own topic: 10 messages are sent to each topic
+     * and each listener must end up with exactly 10.
+     */
+    @Test
+    public void testProducerAndConsume_TwoConsumer() throws Exception {
+        Connection connection = createConnection(producerId, consumerId);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destinationA = session.createTopic("TopicA");
+        Destination destinationB = session.createTopic("TopicB");
+        final CountDownLatch countDownLatch = new CountDownLatch(2);
+        JmsTestListener listenerA = new JmsTestListener(10, countDownLatch);
+        JmsTestListener listenerB = new JmsTestListener(10, countDownLatch);
+
+        try {
+            //two consumers
+            MessageConsumer messageConsumerA = session.createConsumer(destinationA);
+            messageConsumerA.setMessageListener(listenerA);
+            MessageConsumer messageConsumerB = session.createConsumer(destinationB);
+            messageConsumerB.setMessageListener(listenerB);
+            //producer
+            MessageProducer messageProducer = session.createProducer(destinationA);
+            connection.start();
+
+            for (int i = 0; i < 10; i++) {
+                TextMessage message = session.createTextMessage(text + i);
+                Assert.assertNull(message.getJMSMessageID());
+                messageProducer.send(message);
+                Assert.assertNotNull(message.getJMSMessageID());
+            }
+            for (int i = 0; i < 10; i++) {
+                TextMessage message = session.createTextMessage(text + i);
+                Assert.assertNull(message.getJMSMessageID());
+                messageProducer.send(destinationB, message);
+                Assert.assertNotNull(message.getJMSMessageID());
+            }
+
+            if (countDownLatch.await(30, TimeUnit.SECONDS)) {
+                Thread.sleep(2000);
+            }
+            Assert.assertEquals(10, listenerA.getConsumedNum());
+            Assert.assertEquals(10, listenerB.getConsumedNum());
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Tag filtering: 20 messages are sent with tagA and 20 with tagB. The consumer subscribed to
+     * "topic:tagA" must see only its 20, while the consumer on plain "topic" (a different
+     * consumer group on its own connection) must see all 40.
+     */
+    @Test
+    public void testProducerAndConsume_TagFilter() throws Exception {
+        Connection connection = createConnection(producerId, consumerId);
+        Connection anotherConnection = createConnection(producerId, consumerId + "other");
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session anotherSession = anotherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Destination destinationA = session.createTopic("topic:tagA");
+        Destination destinationB = session.createTopic("topic:tagB");
+        final CountDownLatch countDownLatch = new CountDownLatch(2);
+        // Expected counts match the totals asserted below (assumes JmsTestListener releases the latch at its expected count).
+        JmsTestListener listenerForTagA = new JmsTestListener(20, countDownLatch);
+        JmsTestListener listenerForAll = new JmsTestListener(40, countDownLatch);
+        try {
+            session.createConsumer(destinationA).setMessageListener(listenerForTagA);
+            anotherSession.createConsumer(session.createTopic("topic")).setMessageListener(listenerForAll);
+            MessageProducer messageProducer = session.createProducer(destinationA);
+            connection.start();
+            anotherConnection.start();
+
+            for (int i = 0; i < 20; i++) {
+                TextMessage message = session.createTextMessage(text + i);
+                Assert.assertNull(message.getJMSMessageID());
+                messageProducer.send(message);
+                Assert.assertNotNull(message.getJMSMessageID());
+            }
+            for (int i = 0; i < 20; i++) {
+                TextMessage message = session.createTextMessage(text + i);
+                Assert.assertNull(message.getJMSMessageID());
+                messageProducer.send(destinationB, message);
+                Assert.assertNotNull(message.getJMSMessageID());
+            }
+
+            if (countDownLatch.await(30, TimeUnit.SECONDS)) {
+                Thread.sleep(2000);
+            }
+            Assert.assertEquals(20, listenerForTagA.getConsumedNum());
+            Assert.assertEquals(40, listenerForAll.getConsumedNum());
+        } finally {
+            try {
+                connection.close();
+            } finally {
+                anotherConnection.close(); // still closed when the first close throws
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java
new file mode 100644
index 0000000..6cbb7b1
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import java.net.URI;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory;
+import org.apache.rocketmq.jms.domain.JmsBaseMessageConsumer;
+import org.apache.rocketmq.jms.domain.RMQPushConsumerExt;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.jms.JmsTestUtil.checkConsumerState;
+import static org.apache.rocketmq.jms.JmsTestUtil.getRMQPushConsumerExt;
+
+public class JmsConsumerIT extends IntegrationTestBase {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /** Listener shared by both tests: each delivered message must be non-null and carry a JMS message id. */
+    private MessageListener listener = new MessageListener() {
+        @Override
+        public void onMessage(Message message) {
+            try {
+                Assert.assertNotNull(message);
+                Assert.assertNotNull(message.getJMSMessageID());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    };
+
+    /** Opens a connection configured with only the consumer group and the name server address. */
+    private Connection openConnection() throws Exception {
+        String uri = "rocketmq://xxx?consumerId=" + consumerId + "&nameServer=" + nameServer;
+        return new JmsBaseConnectionFactory(new URI(uri)).createConnection();
+    }
+
+    /**
+     * Starts two consumers individually and then starts the connection twice, checking the
+     * consumer state for the group after each step; the repeated starts must not fail.
+     */
+    @Test
+    public void testStartIdempotency() throws Exception {
+        Connection conn = openConnection();
+        Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        checkConsumerState(consumerId, true, false);
+        try {
+            Destination firstTopic = jmsSession.createTopic(topic + ":" + messageType);
+            MessageConsumer firstConsumer = jmsSession.createConsumer(firstTopic);
+            firstConsumer.setMessageListener(listener);
+
+            checkConsumerState(consumerId, false, false);
+
+            ((JmsBaseMessageConsumer) firstConsumer).startConsumer();
+            checkConsumerState(consumerId, false, true);
+
+            Destination secondTopic = jmsSession.createTopic(topic2 + ":" + messageType);
+            MessageConsumer secondConsumer = jmsSession.createConsumer(secondTopic);
+            secondConsumer.setMessageListener(listener);
+
+            ((JmsBaseMessageConsumer) secondConsumer).startConsumer();
+            checkConsumerState(consumerId, false, true);
+
+            //the start is idempotent
+            conn.start();
+            conn.start();
+
+            Thread.sleep(5000);
+        } finally {
+            conn.close();
+        }
+    }
+
+    /** Each consumer created for the group adds one reference; closing the session drops the count to zero. */
+    @Test
+    public void testReferenceCount() throws Exception {
+        Connection conn = openConnection();
+        Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        conn.start();
+        try {
+            Destination sharedTopic = jmsSession.createTopic(topic + ":" + messageType);
+            MessageConsumer firstConsumer = jmsSession.createConsumer(sharedTopic);
+            firstConsumer.setMessageListener(listener);
+
+            RMQPushConsumerExt pushConsumerExt = getRMQPushConsumerExt(consumerId);
+            Assert.assertNotNull(pushConsumerExt);
+            Assert.assertEquals(1, pushConsumerExt.getReferenceCount());
+
+            jmsSession.createConsumer(sharedTopic);
+            Assert.assertEquals(2, pushConsumerExt.getReferenceCount());
+
+            jmsSession.createConsumer(jmsSession.createTopic(topic + ":" + messageType));
+            Assert.assertEquals(3, pushConsumerExt.getReferenceCount());
+
+            jmsSession.close();
+
+            Assert.assertEquals(0, pushConsumerExt.getReferenceCount());
+            Assert.assertFalse(pushConsumerExt.isStarted());
+            Assert.assertNull(getRMQPushConsumerExt(consumerId));
+
+            Thread.sleep(5000);
+        } finally {
+            conn.close();
+        }
+    }
+}
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/util/URISpecParserTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/util/URISpecParserTest.java
new file mode 100644
index 0000000..af57f67
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/util/URISpecParserTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import java.util.Map;
+import org.junit.Test;
+
+public class URISpecParserTest {
+
+ @Test
+ public void parseURI_NormalTest() {
+ Map<String, String> result = URISpecParser.parseURI("rocketmq://localhost");
+ System.out.println(result);
+
+ result = URISpecParser
+ .parseURI("rocketmq://xxx?appId=test&consumerId=testGroup");
+ System.out.println(result);
+
+ result = URISpecParser.parseURI("rocketmq:!@#$%^&*()//localhost?appId=test!@#$%^&*()");
+ System.out.println(result);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void parseURI_AbnormalTest() {
+ URISpecParser.parseURI("metaq3://localhost?appId=test");
+ }
+
+}
\ No newline at end of file
diff --git a/rocketmq-jms/pom.xml b/rocketmq-jms/pom.xml
new file mode 100644
index 0000000..2ecbd42
--- /dev/null
+++ b/rocketmq-jms/pom.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-jms-all</artifactId>
+ <packaging>pom</packaging>
+ <version>1.0.0</version>
+ <modules>
+ <module>spring</module>
+ <module>core</module>
+ </modules>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <!-- Maven switches: tests are not skipped, Javadoc generation is skipped -->
+ <maven.test.skip>false</maven.test.skip>
+ <maven.javadoc.skip>true</maven.javadoc.skip>
+ <!-- Compiler level; the maven-compiler-plugin configuration reads both values -->
+ <maven.compiler.source>1.6</maven.compiler.source>
+ <maven.compiler.target>1.6</maven.compiler.target>
+ <surefire.version>2.19.1</surefire.version> <!-- used for both maven-surefire-plugin and maven-failsafe-plugin -->
+ <rocketmq.version>4.0.0-incubating</rocketmq.version> <!-- used for rocketmq-client, rocketmq-namesrv and rocketmq-broker -->
+
+ </properties>
+ <dependencies> <!-- declared directly rather than under dependencyManagement, so the modules inherit them -->
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms-api</artifactId>
+ <version>1.1-rev-1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>18.0</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.6</version>
+ </dependency>
+
+ <!-- test scope: JUnit, plus the name server and broker that the integration test base class starts -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ <version>4.12</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-namesrv</artifactId>
+ <version>${rocketmq.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-broker</artifactId>
+ <version>${rocketmq.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.5.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin> <!-- JaCoCo: separate execution data for unit tests (jacoco.exec) and integration tests (jacoco-it.exec) -->
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ <version>0.7.8</version>
+ <executions>
+ <execution>
+ <id>default-prepare-agent</id>
+ <goals>
+ <goal>prepare-agent</goal>
+ </goals>
+ <configuration>
+ <destFile>${project.build.directory}/jacoco.exec</destFile>
+ </configuration>
+ </execution>
+ <execution>
+ <id>default-prepare-agent-integration</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>prepare-agent-integration</goal>
+ </goals>
+ <configuration>
+ <destFile>${project.build.directory}/jacoco-it.exec</destFile>
+ <propertyName>failsafeArgLine</propertyName> <!-- read back by the maven-failsafe-plugin argLine -->
+ </configuration>
+ </execution>
+ <execution>
+ <id>default-report</id>
+ <goals>
+ <goal>report</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>default-report-integration</id>
+ <goals>
+ <goal>report-integration</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- surefire and failsafe are pinned to the same ${surefire.version} -->
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${surefire.version}</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>${surefire.version}</version>
+ <configuration>
+ <forkCount>1</forkCount>
+ <reuseForks>true</reuseForks>
+ <argLine>@{failsafeArgLine}</argLine> <!-- @{...} is resolved when the plugin runs, after JaCoCo has set the property -->
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.eluder.coveralls</groupId>
+ <artifactId>coveralls-maven-plugin</artifactId>
+ <version>4.3.0</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <executions>
+ <execution>
+ <id>verify</id>
+ <phase>verify</phase>
+ <configuration>
+ <configLocation>style/rmq_checkstyle.xml</configLocation> <!-- relative path; NOTE(review): confirm it resolves from each module -->
+ <encoding>UTF-8</encoding>
+ <consoleOutput>true</consoleOutput>
+ <failsOnError>true</failsOnError>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering> <!-- Maven property placeholders in main resources are substituted at build time -->
+ </resource>
+ </resources>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/rocketmq-jms/spring/pom.xml b/rocketmq-jms/spring/pom.xml
new file mode 100644
index 0000000..b2afe12
--- /dev/null
+++ b/rocketmq-jms/spring/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-jms-all</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>1.0.0</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>rocketmq-jms-spring</artifactId>
+ <version>1.0.0</version>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <spring-version>4.1.4.RELEASE</spring-version> <!-- version of the optional spring-jms dependency -->
+ </properties>
+
+ <dependencies>
+ <dependency> <!-- the JMS implementation this module integrates with Spring -->
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>rocketmq-jms</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+
+ <!-- spring 2.5.6 (alternative, kept commented out) -->
+ <!--
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring</artifactId>
+ <version>2.5.6.SEC03</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-test</artifactId>
+ <version>2.5.6.SEC03</version>
+ <scope>test</scope>
+ </dependency>
+ -->
+ <!-- spring 3 or 4: compiled against ${spring-version} -->
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-jms</artifactId>
+ <version>${spring-version}</version>
+ <optional>true</optional> <!-- optional: not passed on transitively to users of this artifact -->
+ </dependency>
+
+ <!-- for test: the rocketmq-jms test-jar (the tests here extend IntegrationTestBase) and TestNG -->
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>rocketmq-jms</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <version>6.8.21</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/rocketmq-jms/spring/src/main/java/org/apache/rocketmq/jms/spring/SimpleExMessageListenerContainer.java b/rocketmq-jms/spring/src/main/java/org/apache/rocketmq/jms/spring/SimpleExMessageListenerContainer.java
new file mode 100644
index 0000000..efbfbe9
--- /dev/null
+++ b/rocketmq-jms/spring/src/main/java/org/apache/rocketmq/jms/spring/SimpleExMessageListenerContainer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.spring;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import org.springframework.jms.listener.SimpleMessageListenerContainer;
+
+
+public class SimpleExMessageListenerContainer extends SimpleMessageListenerContainer {
+
+    // Only stored and returned by the accessors below; no other code in this class reads it.
+    private String cacheLevelName;
+
+    /**
+     * Builds the consumer for this container's destination and registers the container's
+     * message listener on it.
+     * <p>
+     * The destination object is used when one has been set; otherwise the destination name is
+     * resolved against the given session.
+     *
+     * @param session the JMS Session the consumer is created on
+     * @return the consumer, with the listener already attached
+     * @throws JMSException if thrown by JMS methods
+     */
+    protected MessageConsumer createListenerConsumer(final Session session) throws JMSException {
+        Destination configured = getDestination();
+        Destination target = configured != null
+            ? configured
+            : resolveDestinationName(session, getDestinationName());
+
+        MessageConsumer listenerConsumer = createConsumer(session, target);
+        // The cast means the configured listener has to be a plain javax.jms.MessageListener.
+        MessageListener delegate = (MessageListener) super.getMessageListener();
+        listenerConsumer.setMessageListener(delegate);
+        return listenerConsumer;
+    }
+
+    /**
+     * Creates a JMS MessageConsumer for the given Session and Destination.
+     * <p>
+     * Message selectors and other consumer options are not supported at present, so only the
+     * destination is passed to the session.
+     *
+     * @param session the JMS Session to create a MessageConsumer for
+     * @param destination the JMS Destination to create a MessageConsumer for
+     * @return the new JMS MessageConsumer
+     * @throws JMSException if thrown by JMS API methods
+     */
+    protected MessageConsumer createConsumer(Session session, Destination destination)
+        throws JMSException {
+        return session.createConsumer(destination);
+    }
+
+    /**
+     * Returns the cache level name last passed to {@link #setCacheLevelName(String)}.
+     *
+     * @return the cacheLevelName, or {@code null} if it was never set
+     */
+    public String getCacheLevelName() {
+        return cacheLevelName;
+    }
+
+    /**
+     * Stores the cache level name.
+     *
+     * @param cacheLevelName the cacheLevelName to set
+     */
+    public void setCacheLevelName(String cacheLevelName) {
+        this.cacheLevelName = cacheLevelName;
+    }
+}
diff --git a/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/JmsConsumeIT.java b/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/JmsConsumeIT.java
new file mode 100644
index 0000000..03da06e
--- /dev/null
+++ b/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/JmsConsumeIT.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.spring;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.rocketmq.jms.JmsTestListener;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+import org.springframework.jms.core.ProducerCallback;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class JmsConsumeIT extends JmsProduceIT {
+
+    /** Sends 30 text messages to the "baseTopic" bean and expects the "messageListener" bean to consume all 30. */
+    @Test
+    public void testConsume() throws Exception {
+        final Topic topic = (Topic) consumeContext.getBean("baseTopic");
+        JmsTestListener messageListener = (JmsTestListener) consumeContext.getBean("messageListener");
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        messageListener.setLatch(countDownLatch);
+        messageListener.setExpectd(30);
+        consumeContext.start();
+
+        for (int i = 0; i < 30; i++) {
+            jmsTemplate.execute(topic, new ProducerCallback() {
+                @Override
+                public Object doInJms(Session session, MessageProducer producer) throws JMSException {
+                    JmsTextMessage message = (JmsTextMessage) session.createTextMessage("hello world,kafka, haha");
+                    producer.send(topic, message);
+                    Assert.assertNotNull(message.getJMSMessageID());
+                    return message;
+                }
+            });
+        }
+        if (countDownLatch.await(30, TimeUnit.SECONDS)) {
+            Thread.sleep(2000);
+        }
+        Assert.assertEquals(messageListener.getConsumedNum(), 30); // TestNG order is (actual, expected)
+        consumeContext.close();
+    }
+}
diff --git a/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/JmsProduceIT.java b/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/JmsProduceIT.java
new file mode 100644
index 0000000..bcb968b
--- /dev/null
+++ b/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/JmsProduceIT.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.spring;
+
+import com.google.common.collect.Lists;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
+import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.ProducerCallback;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class JmsProduceIT extends SpringTestBase {
+
+    protected JmsTemplate jmsTemplate = (JmsTemplate) produceContext.getBean("jmsTemplate");
+
+    private Topic destination = (Topic) produceContext.getBean("destination");
+
+    /**
+     * Sends one text, one object and one bytes message; each must have a JMS message id after the send.
+     */
+    @Test
+    public void simpleSendTest() throws Exception {
+        //Send text message
+        jmsTemplate.execute(destination, new ProducerCallback() {
+            @Override
+            public Object doInJms(Session session, MessageProducer producer) throws JMSException {
+                JmsTextMessage textMessage = (JmsTextMessage) session.createTextMessage("hello world,kafka");
+                producer.send(destination, textMessage);
+                Assert.assertNotNull(textMessage.getJMSMessageID());
+                return textMessage;
+            }
+        });
+
+        //Send object message
+        jmsTemplate.execute(destination, new ProducerCallback() {
+            @Override
+            public Object doInJms(Session session, MessageProducer producer) throws JMSException {
+                JmsObjectMessage objectMessage = (JmsObjectMessage) session.createObjectMessage(Lists.newArrayList(1, 2, 3));
+                producer.send(destination, objectMessage);
+                Assert.assertNotNull(objectMessage.getJMSMessageID());
+                return objectMessage;
+            }
+        });
+
+        //Send byte message
+        sendBytes("Von,Test");
+    }
+
+    /** Sends a bytes message; the annotation asks TestNG for 20 invocations on a pool of 2 threads. */
+    @Test(threadPoolSize = 2, invocationCount = 20)
+    public void multiSenderTest() throws Exception {
+        sendBytes("Von,Multi thread sender test");
+    }
+
+    /**
+     * Writes the bytes of {@code payload} into a new bytes message, sends it to
+     * {@code destination} and checks that the message has a JMS message id afterwards.
+     */
+    private void sendBytes(final String payload) {
+        jmsTemplate.execute(destination, new ProducerCallback() {
+            @Override
+            public Object doInJms(Session session, MessageProducer producer) throws JMSException {
+                JmsBytesMessage bytesMessage = (JmsBytesMessage) session.createBytesMessage();
+                bytesMessage.writeBytes(payload.getBytes());
+                producer.send(destination, bytesMessage);
+                Assert.assertNotNull(bytesMessage.getJMSMessageID());
+                return bytesMessage;
+            }
+        });
+    }
+}
diff --git a/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/SpringTestBase.java b/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/SpringTestBase.java
new file mode 100644
index 0000000..d830a3e
--- /dev/null
+++ b/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/SpringTestBase.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.spring;
+
+import org.apache.rocketmq.jms.domain.CommonConstant;
+import org.apache.rocketmq.jms.integration.IntegrationTestBase;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/** Base for Spring tests: exports RMQ_JMS_URL, then loads the producer and consumer contexts once. */
+public class SpringTestBase extends IntegrationTestBase {
+    protected static final ClassPathXmlApplicationContext produceContext;
+    protected static final ClassPathXmlApplicationContext consumeContext;
+    static {
+        // Set the property before building the contexts; both XML files read ${RMQ_JMS_URL}.
+        System.setProperty("RMQ_JMS_URL", String.format(
+            "rocketmq://xxx?%s=%s&%s=%s&%s=%s&%s=%s&%s=%s&%s=%s",
+            CommonConstant.PRODUCERID, producerId,
+            CommonConstant.CONSUMERID, consumerId,
+            CommonConstant.NAMESERVER, nameServer,
+            CommonConstant.CONSUME_THREAD_NUMS, consumeThreadNums,
+            CommonConstant.SEND_TIMEOUT_MILLIS, 10 * 1000,
+            CommonConstant.INSTANCE_NAME, "JMS_TEST"));
+        produceContext = new ClassPathXmlApplicationContext("classpath:producer.xml");
+        consumeContext = new ClassPathXmlApplicationContext("classpath:consumer.xml");
+    }
+}
diff --git a/rocketmq-jms/spring/src/test/resources/consumer.xml b/rocketmq-jms/spring/src/test/resources/consumer.xml
new file mode 100644
index 0000000..0d46d8f
--- /dev/null
+++ b/rocketmq-jms/spring/src/test/resources/consumer.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
+ xmlns:context="http://www.springframework.org/schema/context"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
+
+ <context:property-placeholder/>
+
+ <!-- Consumer config -->
+ <bean id="jmsConsumerConnectionFactory"
+ class="org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory">
+ <property name="connectionUri" value="${RMQ_JMS_URL}" />
+ </bean>
+
+ <bean id="baseTopic"
+ class="org.apache.rocketmq.jms.domain.JmsBaseTopic">
+ <constructor-arg index="0" value="RMQ_JMS_TEST_CONSUME" /><!--topic -->
+ <constructor-arg index="1" value="baseType" /><!-- messageType -->
+ </bean>
+
+ <bean id="messageListener"
+ class="org.apache.rocketmq.jms.JmsTestListener"/>
+
+
+ <jms:listener-container
+ container-class="org.apache.rocketmq.jms.spring.SimpleExMessageListenerContainer"
+ connection-factory="jmsConsumerConnectionFactory" destination-type="durableTopic">
+ <jms:listener destination="RMQ_JMS_TEST_CONSUME:baseType"
+ ref="messageListener" method="onMessage" />
+ </jms:listener-container>
+
+</beans>
\ No newline at end of file
diff --git a/rocketmq-jms/spring/src/test/resources/producer.xml b/rocketmq-jms/spring/src/test/resources/producer.xml
new file mode 100644
index 0000000..a99343a
--- /dev/null
+++ b/rocketmq-jms/spring/src/test/resources/producer.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:context="http://www.springframework.org/schema/context"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
+ <context:property-placeholder/>
+ <!-- Producer config -->
+ <bean id="jmsProducerConnectionFactory"
+ class="org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory">
+ <property name="connectionUri" value="${RMQ_JMS_URL}" />
+ </bean>
+
+ <bean id="destination"
+ class="org.apache.rocketmq.jms.domain.JmsBaseTopic">
+ <constructor-arg index="0" value="RMQ_JMS_TEST_PRODUCE" /><!--topic -->
+ <constructor-arg index="1" value="baseType" /><!-- messageType -->
+ </bean>
+
+ <bean id="jmsTemplate"
+ class="org.springframework.jms.core.JmsTemplate">
+ <property name="connectionFactory" ref="jmsProducerConnectionFactory" />
+ </bean>
+
+
+</beans>
\ No newline at end of file
diff --git a/rocketmq-jms/style/copyright/Apache.xml b/rocketmq-jms/style/copyright/Apache.xml
new file mode 100644
index 0000000..2db86d0
--- /dev/null
+++ b/rocketmq-jms/style/copyright/Apache.xml
@@ -0,0 +1,24 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<component name="CopyrightManager">
+ <copyright>
+ <option name="myName" value="Apache"/>
+ <option name="notice"
+ value="Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the &quot;License&quot;); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an &quot;AS IS&quot; BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License."/>
+ </copyright>
+</component>
\ No newline at end of file
diff --git a/rocketmq-jms/style/copyright/profiles_settings.xml b/rocketmq-jms/style/copyright/profiles_settings.xml
new file mode 100644
index 0000000..4c0e521
--- /dev/null
+++ b/rocketmq-jms/style/copyright/profiles_settings.xml
@@ -0,0 +1,64 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<component name="CopyrightManager">
+ <settings default="Apache">
+ <module2copyright>
+ <element module="All" copyright="Apache"/>
+ </module2copyright>
+ <LanguageOptions name="GSP">
+ <option name="fileTypeOverride" value="3"/>
+ <option name="prefixLines" value="false"/>
+ </LanguageOptions>
+ <LanguageOptions name="HTML">
+ <option name="fileTypeOverride" value="3"/>
+ <option name="prefixLines" value="false"/>
+ </LanguageOptions>
+ <LanguageOptions name="JAVA">
+ <option name="fileTypeOverride" value="3"/>
+ <option name="addBlankAfter" value="false"/>
+ </LanguageOptions>
+ <LanguageOptions name="JSP">
+ <option name="fileTypeOverride" value="3"/>
+ <option name="prefixLines" value="false"/>
+ </LanguageOptions>
+ <LanguageOptions name="JSPX">
+ <option name="fileTypeOverride" value="3"/>
+ <option name="prefixLines" value="false"/>
+ </LanguageOptions>
+ <LanguageOptions name="MXML">
+ <option name="fileTypeOverride" value="3"/>
+ <option name="prefixLines" value="false"/>
+ </LanguageOptions>
+ <LanguageOptions name="Properties">
+ <option name="fileTypeOverride" value="3"/>
+ <option name="block" value="false"/>
+ </LanguageOptions>
+ <LanguageOptions name="SPI">
+ <option name="fileTypeOverride" value="3"/>
+ <option name="block" value="false"/>
+ </LanguageOptions>
+ <LanguageOptions name="XML">
+ <option name="fileTypeOverride" value="3"/>
+ <option name="prefixLines" value="false"/>
+ </LanguageOptions>
+ <LanguageOptions name="__TEMPLATE__">
+ <option name="separateBefore" value="true"/>
+ <option name="lenBefore" value="1"/>
+ </LanguageOptions>
+ </settings>
+</component>
\ No newline at end of file
diff --git a/rocketmq-jms/style/rmq_checkstyle.xml b/rocketmq-jms/style/rmq_checkstyle.xml
new file mode 100644
index 0000000..e3155cc
--- /dev/null
+++ b/rocketmq-jms/style/rmq_checkstyle.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE module PUBLIC
+ "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+ "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+<!--Refer http://checkstyle.sourceforge.net/reports/google-java-style.html#s2.2-file-encoding -->
+<module name="Checker">
+
+ <property name="localeLanguage" value="en"/>
+
+ <!--Reports tab characters in source files; by default only the first instance in each file is reported-->
+ <module name="FileTabCharacter"/>
+
+ <!-- header -->
+ <module name="RegexpHeader">
+ <property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/>
+ </module>
+
+ <module name="RegexpSingleline">
+ <property name="format" value="System\.out\.println"/>
+ <property name="message" value="Prohibit invoking System.out.println in source code !"/>
+ </module>
+
+ <module name="RegexpSingleline">
+ <property name="format" value="//FIXME"/>
+ <property name="message" value="Recommended fix FIXME task !"/>
+ </module>
+
+ <module name="RegexpSingleline">
+ <property name="format" value="//TODO"/>
+ <property name="message" value="Recommended fix TODO task !"/>
+ </module>
+
+ <module name="RegexpSingleline">
+ <property name="format" value="@alibaba"/>
+ <property name="message" value="Recommended remove @alibaba keyword!"/>
+ </module>
+ <module name="RegexpSingleline">
+ <property name="format" value="@taobao"/>
+ <property name="message" value="Recommended remove @taobao keyword!"/>
+ </module>
+ <module name="RegexpSingleline">
+ <property name="format" value="@author"/>
+ <property name="message" value="Recommended remove @author tag in javadoc!"/>
+ </module>
+
+ <module name="RegexpSingleline">
+ <property name="format"
+ value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/>
+ <property name="message" value="Not allow chinese character !"/>
+ </module>
+
+ <module name="FileLength">
+ <property name="max" value="3000"/>
+ </module>
+
+ <module name="TreeWalker">
+
+ <module name="UnusedImports">
+ <property name="processJavadoc" value="true"/>
+ </module>
+ <module name="RedundantImport"/>
+
+ <!--<module name="IllegalImport" />-->
+
+ <!--Checks that classes that override equals() also override hashCode()-->
+ <module name="EqualsHashCode"/>
+ <!--Checks for over-complicated boolean expressions. Currently finds code like if (topic == true), topic || true, !false, etc.-->
+ <module name="SimplifyBooleanExpression"/>
+ <module name="OneStatementPerLine"/>
+ <module name="UnnecessaryParentheses"/>
+ <!--Checks for over-complicated boolean return statements, e.g. "if (valid()) return false; else return true;"-->
+ <module name="SimplifyBooleanReturn"/>
+
+ <!--Check that the default is after all the cases in a switch statement-->
+ <module name="DefaultComesLast"/>
+ <!--Detects empty statements (standalone ";" semicolon)-->
+ <module name="EmptyStatement"/>
+ <!--Checks that long constants are defined with an upper ell-->
+ <module name="UpperEll"/>
+ <module name="ConstantName">
+ <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
+ </module>
+ <!--Checks that local, non-final variable names conform to a format specified by the format property-->
+ <module name="LocalVariableName"/>
+ <!--Validates identifiers for local, final variables, including catch parameters-->
+ <module name="LocalFinalVariableName"/>
+ <!--Validates identifiers for non-static fields-->
+ <module name="MemberName"/>
+ <!--Validates identifiers for class type parameters-->
+ <module name="ClassTypeParameterName">
+ <property name="format" value="^[A-Z0-9]*$"/>
+ </module>
+ <!--Validates identifiers for method type parameters-->
+ <module name="MethodTypeParameterName">
+ <property name="format" value="^[A-Z0-9]*$"/>
+ </module>
+ <module name="PackageName"/>
+ <module name="ParameterName"/>
+ <module name="StaticVariableName"/>
+ <module name="TypeName"/>
+ <!--Checks that there are no import statements that use the * notation-->
+ <module name="AvoidStarImport"/>
+
+ <!--whitespace-->
+ <module name="GenericWhitespace"/>
+ <module name="NoWhitespaceBefore"/>
+ <module name="WhitespaceAfter"/>
+ <module name="NoWhitespaceAfter"/>
+ <module name="WhitespaceAround">
+ <property name="allowEmptyConstructors" value="true"/>
+ <property name="allowEmptyMethods" value="true"/>
+ </module>
+ <module name="Indentation"/>
+ <module name="MethodParamPad"/>
+ <module name="ParenPad"/>
+ <module name="TypecastParenPad"/>
+ </module>
+</module>
diff --git a/rocketmq-jms/style/rmq_codeStyle.xml b/rocketmq-jms/style/rmq_codeStyle.xml
new file mode 100644
index 0000000..cd95ee6
--- /dev/null
+++ b/rocketmq-jms/style/rmq_codeStyle.xml
@@ -0,0 +1,157 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<code_scheme name="rocketmq">
+ <option name="USE_SAME_INDENTS" value="true"/>
+ <option name="IGNORE_SAME_INDENTS_FOR_LANGUAGES" value="true"/>
+ <option name="OTHER_INDENT_OPTIONS">
+ <value>
+ <option name="INDENT_SIZE" value="4"/>
+ <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+ <option name="TAB_SIZE" value="4"/>
+ <option name="USE_TAB_CHARACTER" value="false"/>
+ <option name="SMART_TABS" value="false"/>
+ <option name="LABEL_INDENT_SIZE" value="0"/>
+ <option name="LABEL_INDENT_ABSOLUTE" value="false"/>
+ <option name="USE_RELATIVE_INDENTS" value="false"/>
+ </value>
+ </option>
+ <option name="PREFER_LONGER_NAMES" value="false"/>
+ <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
+ <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
+ <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND">
+ <value/>
+ </option>
+ <option name="IMPORT_LAYOUT_TABLE">
+ <value>
+ <package name="" withSubpackages="true" static="false"/>
+ <emptyLine/>
+ <package name="" withSubpackages="true" static="true"/>
+ </value>
+ </option>
+ <option name="JD_ALIGN_PARAM_COMMENTS" value="false"/>
+ <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false"/>
+ <option name="JD_P_AT_EMPTY_LINES" value="false"/>
+ <option name="JD_KEEP_INVALID_TAGS" value="false"/>
+ <option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true"/>
+ <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+ <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+ <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+ <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+ <option name="ELSE_ON_NEW_LINE" value="true"/>
+ <option name="WHILE_ON_NEW_LINE" value="true"/>
+ <option name="CATCH_ON_NEW_LINE" value="true"/>
+ <option name="FINALLY_ON_NEW_LINE" value="true"/>
+ <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+ <option name="ALIGN_MULTILINE_FOR" value="false"/>
+ <option name="SPACE_AFTER_TYPE_CAST" value="false"/>
+ <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+ <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+ <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+ <option name="LABELED_STATEMENT_WRAP" value="1"/>
+ <option name="WRAP_COMMENTS" value="true"/>
+ <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+ <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+ <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+ <JavaCodeStyleSettings>
+ <option name="CLASS_NAMES_IN_JAVADOC" value="3"/>
+ </JavaCodeStyleSettings>
+ <XML>
+ <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/>
+ </XML>
+ <ADDITIONAL_INDENT_OPTIONS fileType="haml">
+ <option name="INDENT_SIZE" value="2"/>
+ </ADDITIONAL_INDENT_OPTIONS>
+ <codeStyleSettings language="Groovy">
+ <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+ <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+ <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+ <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+ <option name="ELSE_ON_NEW_LINE" value="true"/>
+ <option name="CATCH_ON_NEW_LINE" value="true"/>
+ <option name="FINALLY_ON_NEW_LINE" value="true"/>
+ <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+ <option name="ALIGN_MULTILINE_FOR" value="false"/>
+ <option name="SPACE_AFTER_TYPE_CAST" value="false"/>
+ <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+ <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+ <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+ <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+ <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+ <indentOptions>
+ <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+ </indentOptions>
+ </codeStyleSettings>
+ <codeStyleSettings language="HOCON">
+ <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+ <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+ </codeStyleSettings>
+ <codeStyleSettings language="JAVA">
+ <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+ <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+ <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+ <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+ <option name="ELSE_ON_NEW_LINE" value="true"/>
+ <option name="WHILE_ON_NEW_LINE" value="true"/>
+ <option name="CATCH_ON_NEW_LINE" value="true"/>
+ <option name="FINALLY_ON_NEW_LINE" value="true"/>
+ <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+ <option name="ALIGN_MULTILINE_FOR" value="false"/>
+ <option name="SPACE_AFTER_TYPE_CAST" value="false"/>
+ <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+ <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+ <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+ <option name="LABELED_STATEMENT_WRAP" value="1"/>
+ <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+ <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+ <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+ <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+ <indentOptions>
+ <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+ </indentOptions>
+ </codeStyleSettings>
+ <codeStyleSettings language="JSON">
+ <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+ <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+ </codeStyleSettings>
+ <codeStyleSettings language="Scala">
+ <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+ <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+ <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+ <option name="ELSE_ON_NEW_LINE" value="true"/>
+ <option name="WHILE_ON_NEW_LINE" value="true"/>
+ <option name="CATCH_ON_NEW_LINE" value="true"/>
+ <option name="FINALLY_ON_NEW_LINE" value="true"/>
+ <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+ <option name="ALIGN_MULTILINE_FOR" value="false"/>
+ <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+ <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+ <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+ <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+ <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+ <indentOptions>
+ <option name="INDENT_SIZE" value="4"/>
+ <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+ <option name="TAB_SIZE" value="4"/>
+ </indentOptions>
+ </codeStyleSettings>
+ <codeStyleSettings language="XML">
+ <indentOptions>
+ <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+ </indentOptions>
+ </codeStyleSettings>
+</code_scheme>
\ No newline at end of file